Use the newer foo_instances configuration instead of the deprecated flags to enable specific features (e.g. start_pushers).tags/v1.74.0rc1
@@ -0,0 +1 @@ | |||
Modernize unit tests configuration related to workers. |
@@ -126,6 +126,13 @@ class PresenceRouterTestModule: | |||
class PresenceRouterTestCase(FederatingHomeserverTestCase): | |||
""" | |||
Test cases using a custom PresenceRouter | |||
By default in test cases, federation sending is disabled. This class re-enables it | |||
for the main process by setting `federation_sender_instances` to None. | |||
""" | |||
servlets = [ | |||
admin.register_servlets, | |||
login.register_servlets, | |||
@@ -150,6 +157,11 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase): | |||
self.sync_handler = self.hs.get_sync_handler() | |||
self.module_api = homeserver.get_module_api() | |||
def default_config(self) -> JsonDict: | |||
config = super().default_config() | |||
config["federation_sender_instances"] = None | |||
return config | |||
@override_config( | |||
{ | |||
"presence": { | |||
@@ -162,7 +174,6 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase): | |||
}, | |||
} | |||
}, | |||
"send_federation": True, | |||
} | |||
) | |||
def test_receiving_all_presence_legacy(self): | |||
@@ -180,7 +191,6 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase): | |||
}, | |||
}, | |||
], | |||
"send_federation": True, | |||
} | |||
) | |||
def test_receiving_all_presence(self): | |||
@@ -290,7 +300,6 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase): | |||
}, | |||
} | |||
}, | |||
"send_federation": True, | |||
} | |||
) | |||
def test_send_local_online_presence_to_with_module_legacy(self): | |||
@@ -310,7 +319,6 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase): | |||
}, | |||
}, | |||
], | |||
"send_federation": True, | |||
} | |||
) | |||
def test_send_local_online_presence_to_with_module(self): | |||
@@ -7,13 +7,21 @@ from synapse.federation.sender import PerDestinationQueue, TransactionManager | |||
from synapse.federation.units import Edu | |||
from synapse.rest import admin | |||
from synapse.rest.client import login, room | |||
from synapse.types import JsonDict | |||
from synapse.util.retryutils import NotRetryingDestination | |||
from tests.test_utils import event_injection, make_awaitable | |||
from tests.unittest import FederatingHomeserverTestCase, override_config | |||
from tests.unittest import FederatingHomeserverTestCase | |||
class FederationCatchUpTestCases(FederatingHomeserverTestCase): | |||
""" | |||
Tests cases of catching up over federation. | |||
By default for test cases federation sending is disabled. This Test class has it | |||
re-enabled for the main process. | |||
""" | |||
servlets = [ | |||
admin.register_servlets, | |||
room.register_servlets, | |||
@@ -42,6 +50,11 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase): | |||
self.record_transaction | |||
) | |||
def default_config(self) -> JsonDict: | |||
config = super().default_config() | |||
config["federation_sender_instances"] = None | |||
return config | |||
async def record_transaction(self, txn, json_cb): | |||
if self.is_online: | |||
data = json_cb() | |||
@@ -79,7 +92,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase): | |||
)[0] | |||
return {"event_id": event_id, "stream_ordering": stream_ordering} | |||
@override_config({"send_federation": True}) | |||
def test_catch_up_destination_rooms_tracking(self): | |||
""" | |||
Tests that we populate the `destination_rooms` table as needed. | |||
@@ -105,7 +117,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase): | |||
self.assertEqual(row_2["event_id"], event_id_2) | |||
self.assertEqual(row_1["stream_ordering"], row_2["stream_ordering"] - 1) | |||
@override_config({"send_federation": True}) | |||
def test_catch_up_last_successful_stream_ordering_tracking(self): | |||
""" | |||
Tests that we populate the `destination_rooms` table as needed. | |||
@@ -163,7 +174,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase): | |||
"Send succeeded but not marked as last_successful_stream_ordering", | |||
) | |||
@override_config({"send_federation": True}) # critical to federate | |||
def test_catch_up_from_blank_state(self): | |||
""" | |||
Runs an overall test of federation catch-up from scratch. | |||
@@ -260,7 +270,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase): | |||
return per_dest_queue, results_list | |||
@override_config({"send_federation": True}) | |||
def test_catch_up_loop(self): | |||
""" | |||
Tests the behaviour of _catch_up_transmission_loop. | |||
@@ -325,7 +334,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase): | |||
event_5.internal_metadata.stream_ordering, | |||
) | |||
@override_config({"send_federation": True}) | |||
def test_catch_up_on_synapse_startup(self): | |||
""" | |||
Tests the behaviour of get_catch_up_outstanding_destinations and | |||
@@ -424,7 +432,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase): | |||
# - all destinations are woken exactly once; they appear once in woken. | |||
self.assertCountEqual(woken, server_names[:-1]) | |||
@override_config({"send_federation": True}) | |||
def test_not_latest_event(self): | |||
"""Test that we send the latest event in the room even if its not ours.""" | |||
@@ -25,10 +25,17 @@ from synapse.rest.client import login | |||
from synapse.types import JsonDict, ReadReceipt | |||
from tests.test_utils import make_awaitable | |||
from tests.unittest import HomeserverTestCase, override_config | |||
from tests.unittest import HomeserverTestCase | |||
class FederationSenderReceiptsTestCases(HomeserverTestCase): | |||
""" | |||
Test federation sending to update receipts. | |||
By default for test cases federation sending is disabled. This Test class has it | |||
re-enabled for the main process. | |||
""" | |||
def make_homeserver(self, reactor, clock): | |||
hs = self.setup_test_homeserver( | |||
federation_transport_client=Mock(spec=["send_transaction"]), | |||
@@ -44,7 +51,11 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase): | |||
return hs | |||
@override_config({"send_federation": True}) | |||
def default_config(self) -> JsonDict: | |||
config = super().default_config() | |||
config["federation_sender_instances"] = None | |||
return config | |||
def test_send_receipts(self): | |||
mock_send_transaction = ( | |||
self.hs.get_federation_transport_client().send_transaction | |||
@@ -87,7 +98,6 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase): | |||
], | |||
) | |||
@override_config({"send_federation": True}) | |||
def test_send_receipts_thread(self): | |||
mock_send_transaction = ( | |||
self.hs.get_federation_transport_client().send_transaction | |||
@@ -164,7 +174,6 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase): | |||
], | |||
) | |||
@override_config({"send_federation": True}) | |||
def test_send_receipts_with_backoff(self): | |||
"""Send two receipts in quick succession; the second should be flushed, but | |||
only after 20ms""" | |||
@@ -251,6 +260,13 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase): | |||
class FederationSenderDevicesTestCases(HomeserverTestCase): | |||
""" | |||
Test federation sending to update devices. | |||
By default for test cases federation sending is disabled. This Test class has it | |||
re-enabled for the main process. | |||
""" | |||
servlets = [ | |||
admin.register_servlets, | |||
login.register_servlets, | |||
@@ -265,7 +281,8 @@ class FederationSenderDevicesTestCases(HomeserverTestCase): | |||
def default_config(self): | |||
c = super().default_config() | |||
c["send_federation"] = True | |||
# Enable federation sending on the main process. | |||
c["federation_sender_instances"] = None | |||
return c | |||
def prepare(self, reactor, clock, hs): | |||
@@ -992,7 +992,8 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): | |||
def default_config(self): | |||
config = super().default_config() | |||
config["send_federation"] = True | |||
# Enable federation sending on the main process. | |||
config["federation_sender_instances"] = None | |||
return config | |||
def prepare(self, reactor, clock, hs): | |||
@@ -200,7 +200,8 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): | |||
], | |||
) | |||
@override_config({"send_federation": True}) | |||
# Enable federation sending on the main process. | |||
@override_config({"federation_sender_instances": None}) | |||
def test_started_typing_remote_send(self) -> None: | |||
self.room_members = [U_APPLE, U_ONION] | |||
@@ -305,7 +306,8 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): | |||
self.assertEqual(events[0], []) | |||
self.assertEqual(events[1], 0) | |||
@override_config({"send_federation": True}) | |||
# Enable federation sending on the main process. | |||
@override_config({"federation_sender_instances": None}) | |||
def test_stopped_typing(self) -> None: | |||
self.room_members = [U_APPLE, U_BANANA, U_ONION] | |||
@@ -56,7 +56,8 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): | |||
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: | |||
config = self.default_config() | |||
config["update_user_directory"] = True | |||
# Re-enables updating the user directory, as that function is needed below. | |||
config["update_user_directory_from_worker"] = None | |||
self.appservice = ApplicationService( | |||
token="i_am_an_app_service", | |||
@@ -1045,7 +1046,9 @@ class TestUserDirSearchDisabled(unittest.HomeserverTestCase): | |||
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: | |||
config = self.default_config() | |||
config["update_user_directory"] = True | |||
# Re-enables updating the user directory, as that function is needed below. It | |||
# will be force disabled later | |||
config["update_user_directory_from_worker"] = None | |||
hs = self.setup_test_homeserver(config=config) | |||
self.config = hs.config | |||
@@ -336,7 +336,8 @@ class ModuleApiTestCase(HomeserverTestCase): | |||
# Test sending local online presence to users from the main process | |||
_test_sending_local_online_presence_to_local_user(self, test_with_workers=False) | |||
@override_config({"send_federation": True}) | |||
# Enable federation sending on the main process. | |||
@override_config({"federation_sender_instances": None}) | |||
def test_send_local_online_presence_to_federation(self): | |||
"""Tests that send_local_presence_to_users sends local online presence to remote users.""" | |||
# Create a user who will send presence updates | |||
@@ -66,7 +66,6 @@ class EmailPusherTests(HomeserverTestCase): | |||
"riot_base_url": None, | |||
} | |||
config["public_baseurl"] = "http://aaa" | |||
config["start_pushers"] = True | |||
hs = self.setup_test_homeserver(config=config) | |||
@@ -11,7 +11,7 @@ | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
from typing import Any, Dict, List, Optional, Tuple | |||
from typing import List, Optional, Tuple | |||
from unittest.mock import Mock | |||
from twisted.internet.defer import Deferred | |||
@@ -41,11 +41,6 @@ class HTTPPusherTests(HomeserverTestCase): | |||
user_id = True | |||
hijack_auth = False | |||
def default_config(self) -> Dict[str, Any]: | |||
config = super().default_config() | |||
config["start_pushers"] = True | |||
return config | |||
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: | |||
self.push_attempts: List[Tuple[Deferred, str, dict]] = [] | |||
@@ -307,7 +307,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase): | |||
stream to the master HS. | |||
Args: | |||
worker_app: Type of worker, e.g. `synapse.app.federation_sender`. | |||
worker_app: Type of worker, e.g. `synapse.app.generic_worker`. | |||
extra_config: Any extra config to use for this instances. | |||
**kwargs: Options that get passed to `self.setup_test_homeserver`, | |||
useful to e.g. pass some mocks for things like `federation_http_client` | |||
@@ -22,9 +22,8 @@ class FederationStreamTestCase(BaseStreamTestCase): | |||
def _get_worker_hs_config(self) -> dict: | |||
# enable federation sending on the worker | |||
config = super()._get_worker_hs_config() | |||
# TODO: make it so we don't need both of these | |||
config["send_federation"] = False | |||
config["worker_app"] = "synapse.app.federation_sender" | |||
config["worker_name"] = "federation_sender1" | |||
config["federation_sender_instances"] = ["federation_sender1"] | |||
return config | |||
def test_catchup(self): | |||
@@ -38,7 +38,7 @@ class WorkerAuthenticationTestCase(BaseMultiWorkerStreamTestCase): | |||
def _get_worker_hs_config(self) -> dict: | |||
config = self.default_config() | |||
config["worker_app"] = "synapse.app.client_reader" | |||
config["worker_app"] = "synapse.app.generic_worker" | |||
config["worker_replication_host"] = "testserv" | |||
config["worker_replication_http_port"] = "8765" | |||
@@ -53,7 +53,7 @@ class WorkerAuthenticationTestCase(BaseMultiWorkerStreamTestCase): | |||
4. Return the final request. | |||
""" | |||
worker_hs = self.make_worker_hs("synapse.app.client_reader") | |||
worker_hs = self.make_worker_hs("synapse.app.generic_worker") | |||
site = self._hs_to_site[worker_hs] | |||
channel_1 = make_request( | |||
@@ -22,20 +22,20 @@ logger = logging.getLogger(__name__) | |||
class ClientReaderTestCase(BaseMultiWorkerStreamTestCase): | |||
"""Test using one or more client readers for registration.""" | |||
"""Test using one or more generic workers for registration.""" | |||
servlets = [register.register_servlets] | |||
def _get_worker_hs_config(self) -> dict: | |||
config = self.default_config() | |||
config["worker_app"] = "synapse.app.client_reader" | |||
config["worker_app"] = "synapse.app.generic_worker" | |||
config["worker_replication_host"] = "testserv" | |||
config["worker_replication_http_port"] = "8765" | |||
return config | |||
def test_register_single_worker(self): | |||
"""Test that registration works when using a single client reader worker.""" | |||
worker_hs = self.make_worker_hs("synapse.app.client_reader") | |||
"""Test that registration works when using a single generic worker.""" | |||
worker_hs = self.make_worker_hs("synapse.app.generic_worker") | |||
site = self._hs_to_site[worker_hs] | |||
channel_1 = make_request( | |||
@@ -64,9 +64,9 @@ class ClientReaderTestCase(BaseMultiWorkerStreamTestCase): | |||
self.assertEqual(channel_2.json_body["user_id"], "@user:test") | |||
def test_register_multi_worker(self): | |||
"""Test that registration works when using multiple client reader workers.""" | |||
worker_hs_1 = self.make_worker_hs("synapse.app.client_reader") | |||
worker_hs_2 = self.make_worker_hs("synapse.app.client_reader") | |||
"""Test that registration works when using multiple generic workers.""" | |||
worker_hs_1 = self.make_worker_hs("synapse.app.generic_worker") | |||
worker_hs_2 = self.make_worker_hs("synapse.app.generic_worker") | |||
site_1 = self._hs_to_site[worker_hs_1] | |||
channel_1 = make_request( | |||
@@ -25,8 +25,9 @@ from tests.unittest import HomeserverTestCase | |||
class FederationAckTestCase(HomeserverTestCase): | |||
def default_config(self) -> dict: | |||
config = super().default_config() | |||
config["worker_app"] = "synapse.app.federation_sender" | |||
config["send_federation"] = False | |||
config["worker_app"] = "synapse.app.generic_worker" | |||
config["worker_name"] = "federation_sender1" | |||
config["federation_sender_instances"] = ["federation_sender1"] | |||
return config | |||
def make_homeserver(self, reactor, clock): | |||
@@ -27,17 +27,19 @@ logger = logging.getLogger(__name__) | |||
class FederationSenderTestCase(BaseMultiWorkerStreamTestCase): | |||
""" | |||
Various tests for federation sending on workers. | |||
Federation sending is disabled by default, it will be enabled in each test by | |||
updating 'federation_sender_instances'. | |||
""" | |||
servlets = [ | |||
login.register_servlets, | |||
register_servlets_for_client_rest_resource, | |||
room.register_servlets, | |||
] | |||
def default_config(self): | |||
conf = super().default_config() | |||
conf["send_federation"] = False | |||
return conf | |||
def test_send_event_single_sender(self): | |||
"""Test that using a single federation sender worker correctly sends a | |||
new event. | |||
@@ -46,8 +48,11 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase): | |||
mock_client.put_json.return_value = make_awaitable({}) | |||
self.make_worker_hs( | |||
"synapse.app.federation_sender", | |||
{"send_federation": False}, | |||
"synapse.app.generic_worker", | |||
{ | |||
"worker_name": "federation_sender1", | |||
"federation_sender_instances": ["federation_sender1"], | |||
}, | |||
federation_http_client=mock_client, | |||
) | |||
@@ -73,11 +78,13 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase): | |||
mock_client1 = Mock(spec=["put_json"]) | |||
mock_client1.put_json.return_value = make_awaitable({}) | |||
self.make_worker_hs( | |||
"synapse.app.federation_sender", | |||
"synapse.app.generic_worker", | |||
{ | |||
"send_federation": True, | |||
"worker_name": "sender1", | |||
"federation_sender_instances": ["sender1", "sender2"], | |||
"worker_name": "federation_sender1", | |||
"federation_sender_instances": [ | |||
"federation_sender1", | |||
"federation_sender2", | |||
], | |||
}, | |||
federation_http_client=mock_client1, | |||
) | |||
@@ -85,11 +92,13 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase): | |||
mock_client2 = Mock(spec=["put_json"]) | |||
mock_client2.put_json.return_value = make_awaitable({}) | |||
self.make_worker_hs( | |||
"synapse.app.federation_sender", | |||
"synapse.app.generic_worker", | |||
{ | |||
"send_federation": True, | |||
"worker_name": "sender2", | |||
"federation_sender_instances": ["sender1", "sender2"], | |||
"worker_name": "federation_sender2", | |||
"federation_sender_instances": [ | |||
"federation_sender1", | |||
"federation_sender2", | |||
], | |||
}, | |||
federation_http_client=mock_client2, | |||
) | |||
@@ -136,11 +145,13 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase): | |||
mock_client1 = Mock(spec=["put_json"]) | |||
mock_client1.put_json.return_value = make_awaitable({}) | |||
self.make_worker_hs( | |||
"synapse.app.federation_sender", | |||
"synapse.app.generic_worker", | |||
{ | |||
"send_federation": True, | |||
"worker_name": "sender1", | |||
"federation_sender_instances": ["sender1", "sender2"], | |||
"worker_name": "federation_sender1", | |||
"federation_sender_instances": [ | |||
"federation_sender1", | |||
"federation_sender2", | |||
], | |||
}, | |||
federation_http_client=mock_client1, | |||
) | |||
@@ -148,11 +159,13 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase): | |||
mock_client2 = Mock(spec=["put_json"]) | |||
mock_client2.put_json.return_value = make_awaitable({}) | |||
self.make_worker_hs( | |||
"synapse.app.federation_sender", | |||
"synapse.app.generic_worker", | |||
{ | |||
"send_federation": True, | |||
"worker_name": "sender2", | |||
"federation_sender_instances": ["sender1", "sender2"], | |||
"worker_name": "federation_sender2", | |||
"federation_sender_instances": [ | |||
"federation_sender1", | |||
"federation_sender2", | |||
], | |||
}, | |||
federation_http_client=mock_client2, | |||
) | |||
@@ -38,11 +38,6 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase): | |||
self.other_user_id = self.register_user("otheruser", "pass") | |||
self.other_access_token = self.login("otheruser", "pass") | |||
def default_config(self): | |||
conf = super().default_config() | |||
conf["start_pushers"] = False | |||
return conf | |||
def _create_pusher_and_send_msg(self, localpart): | |||
# Create a user that will get push notifications | |||
user_id = self.register_user(localpart, "pass") | |||
@@ -92,8 +87,8 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase): | |||
) | |||
self.make_worker_hs( | |||
"synapse.app.pusher", | |||
{"start_pushers": False}, | |||
"synapse.app.generic_worker", | |||
{"worker_name": "pusher1", "pusher_instances": ["pusher1"]}, | |||
proxied_blacklisted_http_client=http_client_mock, | |||
) | |||
@@ -122,9 +117,8 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase): | |||
) | |||
self.make_worker_hs( | |||
"synapse.app.pusher", | |||
"synapse.app.generic_worker", | |||
{ | |||
"start_pushers": True, | |||
"worker_name": "pusher1", | |||
"pusher_instances": ["pusher1", "pusher2"], | |||
}, | |||
@@ -137,9 +131,8 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase): | |||
) | |||
self.make_worker_hs( | |||
"synapse.app.pusher", | |||
"synapse.app.generic_worker", | |||
{ | |||
"start_pushers": True, | |||
"worker_name": "pusher2", | |||
"pusher_instances": ["pusher1", "pusher2"], | |||
}, | |||
@@ -125,7 +125,8 @@ def default_config( | |||
""" | |||
config_dict = { | |||
"server_name": name, | |||
"send_federation": False, | |||
# Setting this to an empty list turns off federation sending. | |||
"federation_sender_instances": [], | |||
"media_store_path": "media", | |||
# the test signing key is just an arbitrary ed25519 key to keep the config | |||
# parser happy | |||
@@ -183,8 +184,9 @@ def default_config( | |||
# rooms will fail. | |||
"default_room_version": DEFAULT_ROOM_VERSION, | |||
# disable user directory updates, because they get done in the | |||
# background, which upsets the test runner. | |||
"update_user_directory": False, | |||
# background, which upsets the test runner. Setting this to an | |||
# (obviously) fake worker name disables updating the user directory. | |||
"update_user_directory_from_worker": "does_not_exist_worker_name", | |||
"caches": {"global_factor": 1, "sync_response_cache_duration": 0}, | |||
"listeners": [{"port": 0, "type": "http"}], | |||
} | |||