# Copyright 2023 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from twisted.internet import defer from twisted.test.proto_helpers import MemoryReactor from synapse.server import HomeServer from synapse.util import Clock from tests import unittest from tests.replication._base import BaseMultiWorkerStreamTestCase class WorkerLockTestCase(unittest.HomeserverTestCase): def prepare( self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer ) -> None: self.worker_lock_handler = self.hs.get_worker_locks_handler() def test_wait_for_lock_locally(self) -> None: """Test waiting for a lock on a single worker""" lock1 = self.worker_lock_handler.acquire_lock("name", "key") self.get_success(lock1.__aenter__()) lock2 = self.worker_lock_handler.acquire_lock("name", "key") d2 = defer.ensureDeferred(lock2.__aenter__()) self.assertNoResult(d2) self.get_success(lock1.__aexit__(None, None, None)) self.get_success(d2) self.get_success(lock2.__aexit__(None, None, None)) class WorkerLockWorkersTestCase(BaseMultiWorkerStreamTestCase): def prepare( self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer ) -> None: self.main_worker_lock_handler = self.hs.get_worker_locks_handler() def test_wait_for_lock_worker(self) -> None: """Test waiting for a lock on another worker""" worker = self.make_worker_hs( "synapse.app.generic_worker", extra_config={ "redis": {"enabled": True}, }, ) worker_lock_handler = worker.get_worker_locks_handler() lock1 = self.main_worker_lock_handler.acquire_lock("name", "key") self.get_success(lock1.__aenter__()) lock2 = worker_lock_handler.acquire_lock("name", "key") d2 = defer.ensureDeferred(lock2.__aenter__()) self.assertNoResult(d2) self.get_success(lock1.__aexit__(None, None, None)) self.get_success(d2) self.get_success(lock2.__aexit__(None, None, None))