You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

490 lines
18 KiB

  1. # Copyright 2019 New Vector Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from typing import Any, List, Optional, Sequence
  15. from twisted.test.proto_helpers import MemoryReactor
  16. from synapse.api.constants import EventTypes, Membership
  17. from synapse.events import EventBase
  18. from synapse.replication.tcp.commands import RdataCommand
  19. from synapse.replication.tcp.streams._base import _STREAM_UPDATE_TARGET_ROW_COUNT
  20. from synapse.replication.tcp.streams.events import (
  21. EventsStreamCurrentStateRow,
  22. EventsStreamEventRow,
  23. EventsStreamRow,
  24. )
  25. from synapse.rest import admin
  26. from synapse.rest.client import login, room
  27. from synapse.server import HomeServer
  28. from synapse.util import Clock
  29. from tests.replication._base import BaseStreamTestCase
  30. from tests.test_utils.event_injection import inject_event, inject_member_event
  31. class EventsStreamTestCase(BaseStreamTestCase):
  32. servlets = [
  33. admin.register_servlets,
  34. login.register_servlets,
  35. room.register_servlets,
  36. ]
  37. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  38. super().prepare(reactor, clock, hs)
  39. self.user_id = self.register_user("u1", "pass")
  40. self.user_tok = self.login("u1", "pass")
  41. self.reconnect()
  42. self.room_id = self.helper.create_room_as(tok=self.user_tok)
  43. self.test_handler.received_rdata_rows.clear()
  44. def test_update_function_event_row_limit(self) -> None:
  45. """Test replication with many non-state events
  46. Checks that all events are correctly replicated when there are lots of
  47. event rows to be replicated.
  48. """
  49. # disconnect, so that we can stack up some changes
  50. self.disconnect()
  51. # generate lots of non-state events. We inject them using inject_event
  52. # so that they are not send out over replication until we call self.replicate().
  53. events = [
  54. self._inject_test_event()
  55. for _ in range(_STREAM_UPDATE_TARGET_ROW_COUNT + 1)
  56. ]
  57. # also one state event
  58. state_event = self._inject_state_event()
  59. # check we're testing what we think we are: no rows should yet have been
  60. # received
  61. self.assertEqual([], self.test_handler.received_rdata_rows)
  62. # now reconnect to pull the updates
  63. self.reconnect()
  64. self.replicate()
  65. # we should have received all the expected rows in the right order (as
  66. # well as various cache invalidation updates which we ignore)
  67. received_rows = [
  68. row for row in self.test_handler.received_rdata_rows if row[0] == "events"
  69. ]
  70. for event in events:
  71. stream_name, token, row = received_rows.pop(0)
  72. self.assertEqual("events", stream_name)
  73. self.assertIsInstance(row, EventsStreamRow)
  74. self.assertEqual(row.type, "ev")
  75. self.assertIsInstance(row.data, EventsStreamEventRow)
  76. self.assertEqual(row.data.event_id, event.event_id)
  77. stream_name, token, row = received_rows.pop(0)
  78. self.assertIsInstance(row, EventsStreamRow)
  79. self.assertIsInstance(row.data, EventsStreamEventRow)
  80. self.assertEqual(row.data.event_id, state_event.event_id)
  81. stream_name, token, row = received_rows.pop(0)
  82. self.assertEqual("events", stream_name)
  83. self.assertIsInstance(row, EventsStreamRow)
  84. self.assertEqual(row.type, "state")
  85. self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
  86. self.assertEqual(row.data.event_id, state_event.event_id)
  87. self.assertEqual([], received_rows)
  88. def test_update_function_huge_state_change(self) -> None:
  89. """Test replication with many state events
  90. Ensures that all events are correctly replicated when there are lots of
  91. state change rows to be replicated.
  92. """
  93. # we want to generate lots of state changes at a single stream ID.
  94. #
  95. # We do this by having two branches in the DAG. On one, we have a moderator
  96. # which that generates lots of state; on the other, we de-op the moderator,
  97. # thus invalidating all the state.
  98. OTHER_USER = "@other_user:localhost"
  99. # have the user join
  100. self.get_success(
  101. inject_member_event(self.hs, self.room_id, OTHER_USER, Membership.JOIN)
  102. )
  103. # Update existing power levels with mod at PL50
  104. pls = self.helper.get_state(
  105. self.room_id, EventTypes.PowerLevels, tok=self.user_tok
  106. )
  107. pls["users"][OTHER_USER] = 50
  108. self.helper.send_state(
  109. self.room_id,
  110. EventTypes.PowerLevels,
  111. pls,
  112. tok=self.user_tok,
  113. )
  114. # this is the point in the DAG where we make a fork
  115. fork_point: Sequence[str] = self.get_success(
  116. self.hs.get_datastores().main.get_latest_event_ids_in_room(self.room_id)
  117. )
  118. events = [
  119. self._inject_state_event(sender=OTHER_USER)
  120. for _ in range(_STREAM_UPDATE_TARGET_ROW_COUNT)
  121. ]
  122. self.replicate()
  123. # all those events and state changes should have landed
  124. self.assertGreaterEqual(
  125. len(self.test_handler.received_rdata_rows), 2 * len(events)
  126. )
  127. # disconnect, so that we can stack up the changes
  128. self.disconnect()
  129. self.test_handler.received_rdata_rows.clear()
  130. # a state event which doesn't get rolled back, to check that the state
  131. # before the huge update comes through ok
  132. state1 = self._inject_state_event()
  133. # roll back all the state by de-modding the user
  134. prev_events = fork_point
  135. pls["users"][OTHER_USER] = 0
  136. pl_event = self.get_success(
  137. inject_event(
  138. self.hs,
  139. prev_event_ids=list(prev_events),
  140. type=EventTypes.PowerLevels,
  141. state_key="",
  142. sender=self.user_id,
  143. room_id=self.room_id,
  144. content=pls,
  145. )
  146. )
  147. # one more bit of state that doesn't get rolled back
  148. state2 = self._inject_state_event()
  149. # check we're testing what we think we are: no rows should yet have been
  150. # received
  151. self.assertEqual([], self.test_handler.received_rdata_rows)
  152. # now reconnect to pull the updates
  153. self.reconnect()
  154. self.replicate()
  155. # we should have received all the expected rows in the right order (as
  156. # well as various cache invalidation updates which we ignore)
  157. #
  158. # we expect:
  159. #
  160. # - two rows for state1
  161. # - the PL event row, plus state rows for the PL event and each
  162. # of the states that got reverted.
  163. # - two rows for state2
  164. received_rows = [
  165. row for row in self.test_handler.received_rdata_rows if row[0] == "events"
  166. ]
  167. # first check the first two rows, which should be state1
  168. stream_name, token, row = received_rows.pop(0)
  169. self.assertEqual("events", stream_name)
  170. self.assertIsInstance(row, EventsStreamRow)
  171. self.assertEqual(row.type, "ev")
  172. self.assertIsInstance(row.data, EventsStreamEventRow)
  173. self.assertEqual(row.data.event_id, state1.event_id)
  174. stream_name, token, row = received_rows.pop(0)
  175. self.assertIsInstance(row, EventsStreamRow)
  176. self.assertEqual(row.type, "state")
  177. self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
  178. self.assertEqual(row.data.event_id, state1.event_id)
  179. # now the last two rows, which should be state2
  180. stream_name, token, row = received_rows.pop(-2)
  181. self.assertEqual("events", stream_name)
  182. self.assertIsInstance(row, EventsStreamRow)
  183. self.assertEqual(row.type, "ev")
  184. self.assertIsInstance(row.data, EventsStreamEventRow)
  185. self.assertEqual(row.data.event_id, state2.event_id)
  186. stream_name, token, row = received_rows.pop(-1)
  187. self.assertIsInstance(row, EventsStreamRow)
  188. self.assertEqual(row.type, "state")
  189. self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
  190. self.assertEqual(row.data.event_id, state2.event_id)
  191. # that should leave us with the rows for the PL event
  192. self.assertEqual(len(received_rows), len(events) + 2)
  193. stream_name, token, row = received_rows.pop(0)
  194. self.assertEqual("events", stream_name)
  195. self.assertIsInstance(row, EventsStreamRow)
  196. self.assertEqual(row.type, "ev")
  197. self.assertIsInstance(row.data, EventsStreamEventRow)
  198. self.assertEqual(row.data.event_id, pl_event.event_id)
  199. # the state rows are unsorted
  200. state_rows: List[EventsStreamCurrentStateRow] = []
  201. for stream_name, _, row in received_rows:
  202. self.assertEqual("events", stream_name)
  203. self.assertIsInstance(row, EventsStreamRow)
  204. self.assertEqual(row.type, "state")
  205. self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
  206. state_rows.append(row.data)
  207. state_rows.sort(key=lambda r: r.state_key)
  208. sr = state_rows.pop(0)
  209. self.assertEqual(sr.type, EventTypes.PowerLevels)
  210. self.assertEqual(sr.event_id, pl_event.event_id)
  211. for sr in state_rows:
  212. self.assertEqual(sr.type, "test_state_event")
  213. # "None" indicates the state has been deleted
  214. self.assertIsNone(sr.event_id)
  215. def test_update_function_state_row_limit(self) -> None:
  216. """Test replication with many state events over several stream ids."""
  217. # we want to generate lots of state changes, but for this test, we want to
  218. # spread out the state changes over a few stream IDs.
  219. #
  220. # We do this by having two branches in the DAG. On one, we have four moderators,
  221. # each of which that generates lots of state; on the other, we de-op the users,
  222. # thus invalidating all the state.
  223. NUM_USERS = 4
  224. STATES_PER_USER = _STREAM_UPDATE_TARGET_ROW_COUNT // 4 + 1
  225. user_ids = ["@user%i:localhost" % (i,) for i in range(NUM_USERS)]
  226. # have the users join
  227. for u in user_ids:
  228. self.get_success(
  229. inject_member_event(self.hs, self.room_id, u, Membership.JOIN)
  230. )
  231. # Update existing power levels with mod at PL50
  232. pls = self.helper.get_state(
  233. self.room_id, EventTypes.PowerLevels, tok=self.user_tok
  234. )
  235. pls["users"].update({u: 50 for u in user_ids})
  236. self.helper.send_state(
  237. self.room_id,
  238. EventTypes.PowerLevels,
  239. pls,
  240. tok=self.user_tok,
  241. )
  242. # this is the point in the DAG where we make a fork
  243. fork_point: Sequence[str] = self.get_success(
  244. self.hs.get_datastores().main.get_latest_event_ids_in_room(self.room_id)
  245. )
  246. events: List[EventBase] = []
  247. for user in user_ids:
  248. events.extend(
  249. self._inject_state_event(sender=user) for _ in range(STATES_PER_USER)
  250. )
  251. self.replicate()
  252. # all those events and state changes should have landed
  253. self.assertGreaterEqual(
  254. len(self.test_handler.received_rdata_rows), 2 * len(events)
  255. )
  256. # disconnect, so that we can stack up the changes
  257. self.disconnect()
  258. self.test_handler.received_rdata_rows.clear()
  259. # now roll back all that state by de-modding the users
  260. prev_events = fork_point
  261. pl_events = []
  262. for u in user_ids:
  263. pls["users"][u] = 0
  264. e = self.get_success(
  265. inject_event(
  266. self.hs,
  267. prev_event_ids=list(prev_events),
  268. type=EventTypes.PowerLevels,
  269. state_key="",
  270. sender=self.user_id,
  271. room_id=self.room_id,
  272. content=pls,
  273. )
  274. )
  275. prev_events = [e.event_id]
  276. pl_events.append(e)
  277. # check we're testing what we think we are: no rows should yet have been
  278. # received
  279. self.assertEqual([], self.test_handler.received_rdata_rows)
  280. # now reconnect to pull the updates
  281. self.reconnect()
  282. self.replicate()
  283. # we should have received all the expected rows in the right order (as
  284. # well as various cache invalidation updates which we ignore)
  285. received_rows = [
  286. row for row in self.test_handler.received_rdata_rows if row[0] == "events"
  287. ]
  288. self.assertGreaterEqual(len(received_rows), len(events))
  289. for i in range(NUM_USERS):
  290. # for each user, we expect the PL event row, followed by state rows for
  291. # the PL event and each of the states that got reverted.
  292. stream_name, token, row = received_rows.pop(0)
  293. self.assertEqual("events", stream_name)
  294. self.assertIsInstance(row, EventsStreamRow)
  295. self.assertEqual(row.type, "ev")
  296. self.assertIsInstance(row.data, EventsStreamEventRow)
  297. self.assertEqual(row.data.event_id, pl_events[i].event_id)
  298. # the state rows are unsorted
  299. state_rows: List[EventsStreamCurrentStateRow] = []
  300. for _ in range(STATES_PER_USER + 1):
  301. stream_name, token, row = received_rows.pop(0)
  302. self.assertEqual("events", stream_name)
  303. self.assertIsInstance(row, EventsStreamRow)
  304. self.assertEqual(row.type, "state")
  305. self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
  306. state_rows.append(row.data)
  307. state_rows.sort(key=lambda r: r.state_key)
  308. sr = state_rows.pop(0)
  309. self.assertEqual(sr.type, EventTypes.PowerLevels)
  310. self.assertEqual(sr.event_id, pl_events[i].event_id)
  311. for sr in state_rows:
  312. self.assertEqual(sr.type, "test_state_event")
  313. # "None" indicates the state has been deleted
  314. self.assertIsNone(sr.event_id)
  315. self.assertEqual([], received_rows)
  316. def test_backwards_stream_id(self) -> None:
  317. """
  318. Test that RDATA that comes after the current position should be discarded.
  319. """
  320. # disconnect, so that we can stack up some changes
  321. self.disconnect()
  322. # Generate an events. We inject them using inject_event so that they are
  323. # not send out over replication until we call self.replicate().
  324. event = self._inject_test_event()
  325. # check we're testing what we think we are: no rows should yet have been
  326. # received
  327. self.assertEqual([], self.test_handler.received_rdata_rows)
  328. # now reconnect to pull the updates
  329. self.reconnect()
  330. self.replicate()
  331. # We should have received the expected single row (as well as various
  332. # cache invalidation updates which we ignore).
  333. received_rows = [
  334. row for row in self.test_handler.received_rdata_rows if row[0] == "events"
  335. ]
  336. # There should be a single received row.
  337. self.assertEqual(len(received_rows), 1)
  338. stream_name, token, row = received_rows[0]
  339. self.assertEqual("events", stream_name)
  340. self.assertIsInstance(row, EventsStreamRow)
  341. self.assertEqual(row.type, "ev")
  342. self.assertIsInstance(row.data, EventsStreamEventRow)
  343. self.assertEqual(row.data.event_id, event.event_id)
  344. # Reset the data.
  345. self.test_handler.received_rdata_rows = []
  346. # Save the current token for later.
  347. worker_events_stream = self.worker_hs.get_replication_streams()["events"]
  348. prev_token = worker_events_stream.current_token("master")
  349. # Manually send an old RDATA command, which should get dropped. This
  350. # re-uses the row from above, but with an earlier stream token.
  351. self.hs.get_replication_command_handler().send_command(
  352. RdataCommand("events", "master", 1, row)
  353. )
  354. # No updates have been received (because it was discard as old).
  355. received_rows = [
  356. row for row in self.test_handler.received_rdata_rows if row[0] == "events"
  357. ]
  358. self.assertEqual(len(received_rows), 0)
  359. # Ensure the stream has not gone backwards.
  360. current_token = worker_events_stream.current_token("master")
  361. self.assertGreaterEqual(current_token, prev_token)
  362. event_count = 0
  363. def _inject_test_event(
  364. self, body: Optional[str] = None, sender: Optional[str] = None, **kwargs: Any
  365. ) -> EventBase:
  366. if sender is None:
  367. sender = self.user_id
  368. if body is None:
  369. body = "event %i" % (self.event_count,)
  370. self.event_count += 1
  371. return self.get_success(
  372. inject_event(
  373. self.hs,
  374. room_id=self.room_id,
  375. sender=sender,
  376. type="test_event",
  377. content={"body": body},
  378. **kwargs,
  379. )
  380. )
  381. def _inject_state_event(
  382. self,
  383. body: Optional[str] = None,
  384. state_key: Optional[str] = None,
  385. sender: Optional[str] = None,
  386. ) -> EventBase:
  387. if sender is None:
  388. sender = self.user_id
  389. if state_key is None:
  390. state_key = "state_%i" % (self.event_count,)
  391. self.event_count += 1
  392. if body is None:
  393. body = "state event %s" % (state_key,)
  394. return self.get_success(
  395. inject_event(
  396. self.hs,
  397. room_id=self.room_id,
  398. sender=sender,
  399. type="test_state_event",
  400. state_key=state_key,
  401. content={"body": body},
  402. )
  403. )