You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

244 lines
8.7 KiB

  1. # Copyright 2020 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. from twisted.test.proto_helpers import MemoryReactor
  16. from synapse.api.constants import ReceiptTypes
  17. from synapse.rest import admin
  18. from synapse.rest.client import login, receipts, room, sync
  19. from synapse.server import HomeServer
  20. from synapse.storage.util.id_generators import MultiWriterIdGenerator
  21. from synapse.types import StreamToken
  22. from synapse.util import Clock
  23. from tests.replication._base import BaseMultiWorkerStreamTestCase
  24. from tests.server import make_request
  25. logger = logging.getLogger(__name__)
  26. class ReceiptsShardTestCase(BaseMultiWorkerStreamTestCase):
  27. """Checks receipts sharding works"""
  28. servlets = [
  29. admin.register_servlets_for_client_rest_resource,
  30. room.register_servlets,
  31. login.register_servlets,
  32. sync.register_servlets,
  33. receipts.register_servlets,
  34. ]
  35. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  36. # Register a user who sends a message that we'll get notified about
  37. self.other_user_id = self.register_user("otheruser", "pass")
  38. self.other_access_token = self.login("otheruser", "pass")
  39. self.room_creator = self.hs.get_room_creation_handler()
  40. self.store = hs.get_datastores().main
  41. def default_config(self) -> dict:
  42. conf = super().default_config()
  43. conf["stream_writers"] = {"receipts": ["worker1", "worker2"]}
  44. conf["instance_map"] = {
  45. "main": {"host": "testserv", "port": 8765},
  46. "worker1": {"host": "testserv", "port": 1001},
  47. "worker2": {"host": "testserv", "port": 1002},
  48. }
  49. return conf
  50. def test_basic(self) -> None:
  51. """Simple test to ensure that receipts can be sent on multiple
  52. workers.
  53. """
  54. worker1 = self.make_worker_hs(
  55. "synapse.app.generic_worker",
  56. {"worker_name": "worker1"},
  57. )
  58. worker1_site = self._hs_to_site[worker1]
  59. worker2 = self.make_worker_hs(
  60. "synapse.app.generic_worker",
  61. {"worker_name": "worker2"},
  62. )
  63. worker2_site = self._hs_to_site[worker2]
  64. user_id = self.register_user("user", "pass")
  65. access_token = self.login("user", "pass")
  66. # Create a room
  67. room_id = self.helper.create_room_as(user_id, tok=access_token)
  68. # The other user joins
  69. self.helper.join(
  70. room=room_id, user=self.other_user_id, tok=self.other_access_token
  71. )
  72. # First user sends a message, the other users sends a receipt.
  73. response = self.helper.send(room_id, body="Hi!", tok=self.other_access_token)
  74. event_id = response["event_id"]
  75. channel = make_request(
  76. reactor=self.reactor,
  77. site=worker1_site,
  78. method="POST",
  79. path=f"/rooms/{room_id}/receipt/{ReceiptTypes.READ}/{event_id}",
  80. access_token=access_token,
  81. content={},
  82. )
  83. self.assertEqual(200, channel.code)
  84. # Now we do it again using the second worker
  85. response = self.helper.send(room_id, body="Hi!", tok=self.other_access_token)
  86. event_id = response["event_id"]
  87. channel = make_request(
  88. reactor=self.reactor,
  89. site=worker2_site,
  90. method="POST",
  91. path=f"/rooms/{room_id}/receipt/{ReceiptTypes.READ}/{event_id}",
  92. access_token=access_token,
  93. content={},
  94. )
  95. self.assertEqual(200, channel.code)
  96. def test_vector_clock_token(self) -> None:
  97. """Tests that using a stream token with a vector clock component works
  98. correctly with basic /sync usage.
  99. """
  100. worker_hs1 = self.make_worker_hs(
  101. "synapse.app.generic_worker",
  102. {"worker_name": "worker1"},
  103. )
  104. worker1_site = self._hs_to_site[worker_hs1]
  105. worker_hs2 = self.make_worker_hs(
  106. "synapse.app.generic_worker",
  107. {"worker_name": "worker2"},
  108. )
  109. worker2_site = self._hs_to_site[worker_hs2]
  110. sync_hs = self.make_worker_hs(
  111. "synapse.app.generic_worker",
  112. {"worker_name": "sync"},
  113. )
  114. sync_hs_site = self._hs_to_site[sync_hs]
  115. user_id = self.register_user("user", "pass")
  116. access_token = self.login("user", "pass")
  117. store = self.hs.get_datastores().main
  118. room_id = self.helper.create_room_as(user_id, tok=access_token)
  119. # The other user joins
  120. self.helper.join(
  121. room=room_id, user=self.other_user_id, tok=self.other_access_token
  122. )
  123. response = self.helper.send(room_id, body="Hi!", tok=self.other_access_token)
  124. first_event = response["event_id"]
  125. # Do an initial sync so that we're up to date.
  126. channel = make_request(
  127. self.reactor, sync_hs_site, "GET", "/sync", access_token=access_token
  128. )
  129. next_batch = channel.json_body["next_batch"]
  130. # We now gut wrench into the events stream MultiWriterIdGenerator on
  131. # worker2 to mimic it getting stuck persisting a receipt. This ensures
  132. # that when we send an event on worker1 we end up in a state where
  133. # worker2 events stream position lags that on worker1, resulting in a
  134. # receipts token with a non-empty instance map component.
  135. #
  136. # Worker2's receipts stream position will not advance until we call
  137. # __aexit__ again.
  138. worker_store2 = worker_hs2.get_datastores().main
  139. assert isinstance(worker_store2._receipts_id_gen, MultiWriterIdGenerator)
  140. actx = worker_store2._receipts_id_gen.get_next()
  141. self.get_success(actx.__aenter__())
  142. channel = make_request(
  143. reactor=self.reactor,
  144. site=worker1_site,
  145. method="POST",
  146. path=f"/rooms/{room_id}/receipt/{ReceiptTypes.READ}/{first_event}",
  147. access_token=access_token,
  148. content={},
  149. )
  150. self.assertEqual(200, channel.code)
  151. # Assert that the current stream token has an instance map component, as
  152. # we are trying to test vector clock tokens.
  153. receipts_token = store.get_max_receipt_stream_id()
  154. self.assertGreater(len(receipts_token.instance_map), 0)
  155. # Check that syncing still gets the new receipt, despite the gap in the
  156. # stream IDs.
  157. channel = make_request(
  158. self.reactor,
  159. sync_hs_site,
  160. "GET",
  161. f"/sync?since={next_batch}",
  162. access_token=access_token,
  163. )
  164. # We should only see the new event and nothing else
  165. self.assertIn(room_id, channel.json_body["rooms"]["join"])
  166. events = channel.json_body["rooms"]["join"][room_id]["ephemeral"]["events"]
  167. self.assertEqual(len(events), 1)
  168. self.assertIn(first_event, events[0]["content"])
  169. # Get the next batch and makes sure its a vector clock style token.
  170. vector_clock_token = channel.json_body["next_batch"]
  171. parsed_token = self.get_success(
  172. StreamToken.from_string(store, vector_clock_token)
  173. )
  174. self.assertGreaterEqual(len(parsed_token.receipt_key.instance_map), 1)
  175. # Now that we've got a vector clock token we finish the fake persisting
  176. # a receipt we started above.
  177. self.get_success(actx.__aexit__(None, None, None))
  178. # Now try and send another receipts to the other worker.
  179. response = self.helper.send(room_id, body="Hi!", tok=self.other_access_token)
  180. second_event = response["event_id"]
  181. channel = make_request(
  182. reactor=self.reactor,
  183. site=worker2_site,
  184. method="POST",
  185. path=f"/rooms/{room_id}/receipt/{ReceiptTypes.READ}/{second_event}",
  186. access_token=access_token,
  187. content={},
  188. )
  189. channel = make_request(
  190. self.reactor,
  191. sync_hs_site,
  192. "GET",
  193. f"/sync?since={vector_clock_token}",
  194. access_token=access_token,
  195. )
  196. self.assertIn(room_id, channel.json_body["rooms"]["join"])
  197. events = channel.json_body["rooms"]["join"][room_id]["ephemeral"]["events"]
  198. self.assertEqual(len(events), 1)
  199. self.assertIn(second_event, events[0]["content"])