You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

271 lines
9.6 KiB

  1. # Copyright 2020 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. from unittest.mock import AsyncMock, Mock
  16. from netaddr import IPSet
  17. from synapse.api.constants import EventTypes, Membership
  18. from synapse.events.builder import EventBuilderFactory
  19. from synapse.handlers.typing import TypingWriterHandler
  20. from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
  21. from synapse.rest.admin import register_servlets_for_client_rest_resource
  22. from synapse.rest.client import login, room
  23. from synapse.types import UserID, create_requester
  24. from tests.replication._base import BaseMultiWorkerStreamTestCase
  25. from tests.server import get_clock
  26. logger = logging.getLogger(__name__)
  27. class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
  28. """
  29. Various tests for federation sending on workers.
  30. Federation sending is disabled by default, it will be enabled in each test by
  31. updating 'federation_sender_instances'.
  32. """
  33. servlets = [
  34. login.register_servlets,
  35. register_servlets_for_client_rest_resource,
  36. room.register_servlets,
  37. ]
  38. def setUp(self) -> None:
  39. super().setUp()
  40. reactor, _ = get_clock()
  41. self.matrix_federation_agent = MatrixFederationAgent(
  42. reactor,
  43. tls_client_options_factory=None,
  44. user_agent=b"SynapseInTrialTest/0.0.0",
  45. ip_allowlist=None,
  46. ip_blocklist=IPSet(),
  47. )
  48. def test_send_event_single_sender(self) -> None:
  49. """Test that using a single federation sender worker correctly sends a
  50. new event.
  51. """
  52. mock_client = Mock(spec=["put_json"])
  53. mock_client.put_json = AsyncMock(return_value={})
  54. mock_client.agent = self.matrix_federation_agent
  55. self.make_worker_hs(
  56. "synapse.app.generic_worker",
  57. {
  58. "worker_name": "federation_sender1",
  59. "federation_sender_instances": ["federation_sender1"],
  60. },
  61. federation_http_client=mock_client,
  62. )
  63. user = self.register_user("user", "pass")
  64. token = self.login("user", "pass")
  65. room = self.create_room_with_remote_server(user, token)
  66. mock_client.put_json.reset_mock()
  67. self.create_and_send_event(room, UserID.from_string(user))
  68. self.replicate()
  69. # Assert that the event was sent out over federation.
  70. mock_client.put_json.assert_called()
  71. self.assertEqual(mock_client.put_json.call_args[0][0], "other_server")
  72. self.assertTrue(mock_client.put_json.call_args[1]["data"].get("pdus"))
  73. def test_send_event_sharded(self) -> None:
  74. """Test that using two federation sender workers correctly sends
  75. new events.
  76. """
  77. mock_client1 = Mock(spec=["put_json"])
  78. mock_client1.put_json = AsyncMock(return_value={})
  79. mock_client1.agent = self.matrix_federation_agent
  80. self.make_worker_hs(
  81. "synapse.app.generic_worker",
  82. {
  83. "worker_name": "federation_sender1",
  84. "federation_sender_instances": [
  85. "federation_sender1",
  86. "federation_sender2",
  87. ],
  88. },
  89. federation_http_client=mock_client1,
  90. )
  91. mock_client2 = Mock(spec=["put_json"])
  92. mock_client2.put_json = AsyncMock(return_value={})
  93. mock_client2.agent = self.matrix_federation_agent
  94. self.make_worker_hs(
  95. "synapse.app.generic_worker",
  96. {
  97. "worker_name": "federation_sender2",
  98. "federation_sender_instances": [
  99. "federation_sender1",
  100. "federation_sender2",
  101. ],
  102. },
  103. federation_http_client=mock_client2,
  104. )
  105. user = self.register_user("user2", "pass")
  106. token = self.login("user2", "pass")
  107. sent_on_1 = False
  108. sent_on_2 = False
  109. for i in range(20):
  110. server_name = "other_server_%d" % (i,)
  111. room = self.create_room_with_remote_server(user, token, server_name)
  112. mock_client1.reset_mock()
  113. mock_client2.reset_mock()
  114. self.create_and_send_event(room, UserID.from_string(user))
  115. self.replicate()
  116. if mock_client1.put_json.called:
  117. sent_on_1 = True
  118. mock_client2.put_json.assert_not_called()
  119. self.assertEqual(mock_client1.put_json.call_args[0][0], server_name)
  120. self.assertTrue(mock_client1.put_json.call_args[1]["data"].get("pdus"))
  121. elif mock_client2.put_json.called:
  122. sent_on_2 = True
  123. mock_client1.put_json.assert_not_called()
  124. self.assertEqual(mock_client2.put_json.call_args[0][0], server_name)
  125. self.assertTrue(mock_client2.put_json.call_args[1]["data"].get("pdus"))
  126. else:
  127. raise AssertionError(
  128. "Expected send transaction from one or the other sender"
  129. )
  130. if sent_on_1 and sent_on_2:
  131. break
  132. self.assertTrue(sent_on_1)
  133. self.assertTrue(sent_on_2)
  134. def test_send_typing_sharded(self) -> None:
  135. """Test that using two federation sender workers correctly sends
  136. new typing EDUs.
  137. """
  138. mock_client1 = Mock(spec=["put_json"])
  139. mock_client1.put_json = AsyncMock(return_value={})
  140. mock_client1.agent = self.matrix_federation_agent
  141. self.make_worker_hs(
  142. "synapse.app.generic_worker",
  143. {
  144. "worker_name": "federation_sender1",
  145. "federation_sender_instances": [
  146. "federation_sender1",
  147. "federation_sender2",
  148. ],
  149. },
  150. federation_http_client=mock_client1,
  151. )
  152. mock_client2 = Mock(spec=["put_json"])
  153. mock_client2.put_json = AsyncMock(return_value={})
  154. mock_client2.agent = self.matrix_federation_agent
  155. self.make_worker_hs(
  156. "synapse.app.generic_worker",
  157. {
  158. "worker_name": "federation_sender2",
  159. "federation_sender_instances": [
  160. "federation_sender1",
  161. "federation_sender2",
  162. ],
  163. },
  164. federation_http_client=mock_client2,
  165. )
  166. user = self.register_user("user3", "pass")
  167. token = self.login("user3", "pass")
  168. typing_handler = self.hs.get_typing_handler()
  169. assert isinstance(typing_handler, TypingWriterHandler)
  170. sent_on_1 = False
  171. sent_on_2 = False
  172. for i in range(20):
  173. server_name = "other_server_%d" % (i,)
  174. room = self.create_room_with_remote_server(user, token, server_name)
  175. mock_client1.reset_mock()
  176. mock_client2.reset_mock()
  177. self.get_success(
  178. typing_handler.started_typing(
  179. target_user=UserID.from_string(user),
  180. requester=create_requester(user),
  181. room_id=room,
  182. timeout=20000,
  183. )
  184. )
  185. self.replicate()
  186. if mock_client1.put_json.called:
  187. sent_on_1 = True
  188. mock_client2.put_json.assert_not_called()
  189. self.assertEqual(mock_client1.put_json.call_args[0][0], server_name)
  190. self.assertTrue(mock_client1.put_json.call_args[1]["data"].get("edus"))
  191. elif mock_client2.put_json.called:
  192. sent_on_2 = True
  193. mock_client1.put_json.assert_not_called()
  194. self.assertEqual(mock_client2.put_json.call_args[0][0], server_name)
  195. self.assertTrue(mock_client2.put_json.call_args[1]["data"].get("edus"))
  196. else:
  197. raise AssertionError(
  198. "Expected send transaction from one or the other sender"
  199. )
  200. if sent_on_1 and sent_on_2:
  201. break
  202. self.assertTrue(sent_on_1)
  203. self.assertTrue(sent_on_2)
  204. def create_room_with_remote_server(
  205. self, user: str, token: str, remote_server: str = "other_server"
  206. ) -> str:
  207. room = self.helper.create_room_as(user, tok=token)
  208. store = self.hs.get_datastores().main
  209. federation = self.hs.get_federation_event_handler()
  210. prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room))
  211. room_version = self.get_success(store.get_room_version(room))
  212. factory = EventBuilderFactory(self.hs)
  213. factory.hostname = remote_server
  214. user_id = UserID("user", remote_server).to_string()
  215. event_dict = {
  216. "type": EventTypes.Member,
  217. "state_key": user_id,
  218. "content": {"membership": Membership.JOIN},
  219. "sender": user_id,
  220. "room_id": room,
  221. }
  222. builder = factory.for_room_version(room_version, event_dict)
  223. join_event = self.get_success(
  224. builder.build(prev_event_ids=list(prev_event_ids), auth_event_ids=None)
  225. )
  226. self.get_success(federation.on_send_membership_event(remote_server, join_event))
  227. self.replicate()
  228. return room