You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

808 lines
26 KiB

  1. # Copyright 2016-2021 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from typing import Optional, Tuple
  15. from twisted.test.proto_helpers import MemoryReactor
  16. from synapse.api.constants import MAIN_TIMELINE, RelationTypes
  17. from synapse.rest import admin
  18. from synapse.rest.client import login, room
  19. from synapse.server import HomeServer
  20. from synapse.storage.databases.main.event_push_actions import NotifCounts
  21. from synapse.types import JsonDict
  22. from synapse.util import Clock
  23. from tests.unittest import HomeserverTestCase
  24. class EventPushActionsStoreTestCase(HomeserverTestCase):
  25. servlets = [
  26. admin.register_servlets,
  27. room.register_servlets,
  28. login.register_servlets,
  29. ]
  30. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  31. self.store = hs.get_datastores().main
  32. persist_events_store = hs.get_datastores().persist_events
  33. assert persist_events_store is not None
  34. self.persist_events_store = persist_events_store
  35. def _create_users_and_room(self) -> Tuple[str, str, str, str, str]:
  36. """
  37. Creates two users and a shared room.
  38. Returns:
  39. Tuple of (user 1 ID, user 1 token, user 2 ID, user 2 token, room ID).
  40. """
  41. # Create a user to receive notifications and send receipts.
  42. user_id = self.register_user("user1235", "pass")
  43. token = self.login("user1235", "pass")
  44. # And another users to send events.
  45. other_id = self.register_user("other", "pass")
  46. other_token = self.login("other", "pass")
  47. # Create a room and put both users in it.
  48. room_id = self.helper.create_room_as(user_id, tok=token)
  49. self.helper.join(room_id, other_id, tok=other_token)
  50. return user_id, token, other_id, other_token, room_id
  51. def test_get_unread_push_actions_for_user_in_range(self) -> None:
  52. """Test getting unread push actions for HTTP and email pushers."""
  53. user_id, token, _, other_token, room_id = self._create_users_and_room()
  54. # Create two events, one of which is a highlight.
  55. first_event_id = self.helper.send_event(
  56. room_id,
  57. type="m.room.message",
  58. content={"msgtype": "m.text", "body": "msg"},
  59. tok=other_token,
  60. )["event_id"]
  61. second_event_id = self.helper.send_event(
  62. room_id,
  63. type="m.room.message",
  64. content={
  65. "msgtype": "m.text",
  66. "body": user_id,
  67. "m.relates_to": {
  68. "rel_type": RelationTypes.THREAD,
  69. "event_id": first_event_id,
  70. },
  71. },
  72. tok=other_token,
  73. )["event_id"]
  74. # Fetch unread actions for HTTP pushers.
  75. http_actions = self.get_success(
  76. self.store.get_unread_push_actions_for_user_in_range_for_http(
  77. user_id, 0, 1000, 20
  78. )
  79. )
  80. self.assertEqual(2, len(http_actions))
  81. # Fetch unread actions for email pushers.
  82. email_actions = self.get_success(
  83. self.store.get_unread_push_actions_for_user_in_range_for_email(
  84. user_id, 0, 1000, 20
  85. )
  86. )
  87. self.assertEqual(2, len(email_actions))
  88. # Send a receipt, which should clear the first action.
  89. self.get_success(
  90. self.store.insert_receipt(
  91. room_id,
  92. "m.read",
  93. user_id=user_id,
  94. event_ids=[first_event_id],
  95. thread_id=None,
  96. data={},
  97. )
  98. )
  99. http_actions = self.get_success(
  100. self.store.get_unread_push_actions_for_user_in_range_for_http(
  101. user_id, 0, 1000, 20
  102. )
  103. )
  104. self.assertEqual(1, len(http_actions))
  105. email_actions = self.get_success(
  106. self.store.get_unread_push_actions_for_user_in_range_for_email(
  107. user_id, 0, 1000, 20
  108. )
  109. )
  110. self.assertEqual(1, len(email_actions))
  111. # Send a thread receipt to clear the thread action.
  112. self.get_success(
  113. self.store.insert_receipt(
  114. room_id,
  115. "m.read",
  116. user_id=user_id,
  117. event_ids=[second_event_id],
  118. thread_id=first_event_id,
  119. data={},
  120. )
  121. )
  122. http_actions = self.get_success(
  123. self.store.get_unread_push_actions_for_user_in_range_for_http(
  124. user_id, 0, 1000, 20
  125. )
  126. )
  127. self.assertEqual([], http_actions)
  128. email_actions = self.get_success(
  129. self.store.get_unread_push_actions_for_user_in_range_for_email(
  130. user_id, 0, 1000, 20
  131. )
  132. )
  133. self.assertEqual([], email_actions)
  134. def test_count_aggregation(self) -> None:
  135. # Create a user to receive notifications and send receipts.
  136. user_id, token, _, other_token, room_id = self._create_users_and_room()
  137. last_event_id = ""
  138. def _assert_counts(notif_count: int, highlight_count: int) -> None:
  139. counts = self.get_success(
  140. self.store.db_pool.runInteraction(
  141. "get-unread-counts",
  142. self.store._get_unread_counts_by_receipt_txn,
  143. room_id,
  144. user_id,
  145. )
  146. )
  147. self.assertEqual(
  148. counts.main_timeline,
  149. NotifCounts(
  150. notify_count=notif_count,
  151. unread_count=0,
  152. highlight_count=highlight_count,
  153. ),
  154. )
  155. self.assertEqual(counts.threads, {})
  156. aggregate_counts = self.get_success(
  157. self.store.db_pool.runInteraction(
  158. "get-aggregate-unread-counts",
  159. self.store._get_unread_counts_by_room_for_user_txn,
  160. user_id,
  161. )
  162. )
  163. self.assertEqual(aggregate_counts[room_id], notif_count)
  164. def _create_event(highlight: bool = False) -> str:
  165. result = self.helper.send_event(
  166. room_id,
  167. type="m.room.message",
  168. content={"msgtype": "m.text", "body": user_id if highlight else "msg"},
  169. tok=other_token,
  170. )
  171. nonlocal last_event_id
  172. last_event_id = result["event_id"]
  173. return last_event_id
  174. def _rotate() -> None:
  175. self.get_success(self.store._rotate_notifs())
  176. def _mark_read(event_id: str) -> None:
  177. self.get_success(
  178. self.store.insert_receipt(
  179. room_id,
  180. "m.read",
  181. user_id=user_id,
  182. event_ids=[event_id],
  183. thread_id=None,
  184. data={},
  185. )
  186. )
  187. _assert_counts(0, 0)
  188. _create_event()
  189. _assert_counts(1, 0)
  190. _rotate()
  191. _assert_counts(1, 0)
  192. event_id = _create_event()
  193. _assert_counts(2, 0)
  194. _rotate()
  195. _assert_counts(2, 0)
  196. _create_event()
  197. _mark_read(event_id)
  198. _assert_counts(1, 0)
  199. _mark_read(last_event_id)
  200. _assert_counts(0, 0)
  201. _create_event()
  202. _assert_counts(1, 0)
  203. _rotate()
  204. _assert_counts(1, 0)
  205. # Delete old event push actions, this should not affect the (summarised) count.
  206. #
  207. # All event push actions are kept for 24 hours, so need to move forward
  208. # in time.
  209. self.pump(60 * 60 * 24)
  210. self.get_success(self.store._remove_old_push_actions_that_have_rotated())
  211. # Double check that the event push actions have been cleared (i.e. that
  212. # any results *must* come from the summary).
  213. result = self.get_success(
  214. self.store.db_pool.simple_select_list(
  215. table="event_push_actions",
  216. keyvalues={"1": 1},
  217. retcols=("event_id",),
  218. desc="",
  219. )
  220. )
  221. self.assertEqual(result, [])
  222. _assert_counts(1, 0)
  223. _mark_read(last_event_id)
  224. _assert_counts(0, 0)
  225. event_id = _create_event(True)
  226. _assert_counts(1, 1)
  227. _rotate()
  228. _assert_counts(1, 1)
  229. # Check that adding another notification and rotating after highlight
  230. # works.
  231. _create_event()
  232. _rotate()
  233. _assert_counts(2, 1)
  234. # Check that sending read receipts at different points results in the
  235. # right counts.
  236. _mark_read(event_id)
  237. _assert_counts(1, 0)
  238. _mark_read(last_event_id)
  239. _assert_counts(0, 0)
  240. _create_event(True)
  241. _assert_counts(1, 1)
  242. _mark_read(last_event_id)
  243. _assert_counts(0, 0)
  244. _rotate()
  245. _assert_counts(0, 0)
  246. def test_count_aggregation_threads(self) -> None:
  247. """
  248. This is essentially the same test as test_count_aggregation, but adds
  249. events to the main timeline and to a thread.
  250. """
  251. user_id, token, _, other_token, room_id = self._create_users_and_room()
  252. thread_id: str
  253. last_event_id = ""
  254. def _assert_counts(
  255. notif_count: int,
  256. highlight_count: int,
  257. thread_notif_count: int,
  258. thread_highlight_count: int,
  259. ) -> None:
  260. counts = self.get_success(
  261. self.store.db_pool.runInteraction(
  262. "get-unread-counts",
  263. self.store._get_unread_counts_by_receipt_txn,
  264. room_id,
  265. user_id,
  266. )
  267. )
  268. self.assertEqual(
  269. counts.main_timeline,
  270. NotifCounts(
  271. notify_count=notif_count,
  272. unread_count=0,
  273. highlight_count=highlight_count,
  274. ),
  275. )
  276. if thread_notif_count or thread_highlight_count:
  277. self.assertEqual(
  278. counts.threads,
  279. {
  280. thread_id: NotifCounts(
  281. notify_count=thread_notif_count,
  282. unread_count=0,
  283. highlight_count=thread_highlight_count,
  284. ),
  285. },
  286. )
  287. else:
  288. self.assertEqual(counts.threads, {})
  289. aggregate_counts = self.get_success(
  290. self.store.db_pool.runInteraction(
  291. "get-aggregate-unread-counts",
  292. self.store._get_unread_counts_by_room_for_user_txn,
  293. user_id,
  294. )
  295. )
  296. self.assertEqual(
  297. aggregate_counts[room_id], notif_count + thread_notif_count
  298. )
  299. def _create_event(
  300. highlight: bool = False, thread_id: Optional[str] = None
  301. ) -> str:
  302. content: JsonDict = {
  303. "msgtype": "m.text",
  304. "body": user_id if highlight else "msg",
  305. }
  306. if thread_id:
  307. content["m.relates_to"] = {
  308. "rel_type": "m.thread",
  309. "event_id": thread_id,
  310. }
  311. result = self.helper.send_event(
  312. room_id,
  313. type="m.room.message",
  314. content=content,
  315. tok=other_token,
  316. )
  317. nonlocal last_event_id
  318. last_event_id = result["event_id"]
  319. return last_event_id
  320. def _rotate() -> None:
  321. self.get_success(self.store._rotate_notifs())
  322. def _mark_read(event_id: str, thread_id: str = MAIN_TIMELINE) -> None:
  323. self.get_success(
  324. self.store.insert_receipt(
  325. room_id,
  326. "m.read",
  327. user_id=user_id,
  328. event_ids=[event_id],
  329. thread_id=thread_id,
  330. data={},
  331. )
  332. )
  333. _assert_counts(0, 0, 0, 0)
  334. thread_id = _create_event()
  335. _assert_counts(1, 0, 0, 0)
  336. _rotate()
  337. _assert_counts(1, 0, 0, 0)
  338. _create_event(thread_id=thread_id)
  339. _assert_counts(1, 0, 1, 0)
  340. _rotate()
  341. _assert_counts(1, 0, 1, 0)
  342. _create_event()
  343. _assert_counts(2, 0, 1, 0)
  344. _rotate()
  345. _assert_counts(2, 0, 1, 0)
  346. event_id = _create_event(thread_id=thread_id)
  347. _assert_counts(2, 0, 2, 0)
  348. _rotate()
  349. _assert_counts(2, 0, 2, 0)
  350. _create_event()
  351. _create_event(thread_id=thread_id)
  352. _mark_read(event_id)
  353. _assert_counts(1, 0, 3, 0)
  354. _mark_read(event_id, thread_id)
  355. _assert_counts(1, 0, 1, 0)
  356. _mark_read(last_event_id)
  357. _mark_read(last_event_id, thread_id)
  358. _assert_counts(0, 0, 0, 0)
  359. _create_event()
  360. _create_event(thread_id=thread_id)
  361. _assert_counts(1, 0, 1, 0)
  362. _rotate()
  363. _assert_counts(1, 0, 1, 0)
  364. # Delete old event push actions, this should not affect the (summarised) count.
  365. self.get_success(self.store._remove_old_push_actions_that_have_rotated())
  366. _assert_counts(1, 0, 1, 0)
  367. _mark_read(last_event_id)
  368. _mark_read(last_event_id, thread_id)
  369. _assert_counts(0, 0, 0, 0)
  370. _create_event(True)
  371. _assert_counts(1, 1, 0, 0)
  372. _rotate()
  373. _assert_counts(1, 1, 0, 0)
  374. event_id = _create_event(True, thread_id)
  375. _assert_counts(1, 1, 1, 1)
  376. _rotate()
  377. _assert_counts(1, 1, 1, 1)
  378. # Check that adding another notification and rotating after highlight
  379. # works.
  380. _create_event()
  381. _rotate()
  382. _assert_counts(2, 1, 1, 1)
  383. _create_event(thread_id=thread_id)
  384. _rotate()
  385. _assert_counts(2, 1, 2, 1)
  386. # Check that sending read receipts at different points results in the
  387. # right counts.
  388. _mark_read(event_id)
  389. _assert_counts(1, 0, 2, 1)
  390. _mark_read(event_id, thread_id)
  391. _assert_counts(1, 0, 1, 0)
  392. _mark_read(last_event_id)
  393. _assert_counts(0, 0, 1, 0)
  394. _mark_read(last_event_id, thread_id)
  395. _assert_counts(0, 0, 0, 0)
  396. _create_event(True)
  397. _create_event(True, thread_id)
  398. _assert_counts(1, 1, 1, 1)
  399. _mark_read(last_event_id)
  400. _mark_read(last_event_id, thread_id)
  401. _assert_counts(0, 0, 0, 0)
  402. _rotate()
  403. _assert_counts(0, 0, 0, 0)
  404. def test_count_aggregation_mixed(self) -> None:
  405. """
  406. This is essentially the same test as test_count_aggregation_threads, but
  407. sends both unthreaded and threaded receipts.
  408. """
  409. user_id, token, _, other_token, room_id = self._create_users_and_room()
  410. thread_id: str
  411. last_event_id = ""
  412. def _assert_counts(
  413. notif_count: int,
  414. highlight_count: int,
  415. thread_notif_count: int,
  416. thread_highlight_count: int,
  417. ) -> None:
  418. counts = self.get_success(
  419. self.store.db_pool.runInteraction(
  420. "get-unread-counts",
  421. self.store._get_unread_counts_by_receipt_txn,
  422. room_id,
  423. user_id,
  424. )
  425. )
  426. self.assertEqual(
  427. counts.main_timeline,
  428. NotifCounts(
  429. notify_count=notif_count,
  430. unread_count=0,
  431. highlight_count=highlight_count,
  432. ),
  433. )
  434. if thread_notif_count or thread_highlight_count:
  435. self.assertEqual(
  436. counts.threads,
  437. {
  438. thread_id: NotifCounts(
  439. notify_count=thread_notif_count,
  440. unread_count=0,
  441. highlight_count=thread_highlight_count,
  442. ),
  443. },
  444. )
  445. else:
  446. self.assertEqual(counts.threads, {})
  447. aggregate_counts = self.get_success(
  448. self.store.db_pool.runInteraction(
  449. "get-aggregate-unread-counts",
  450. self.store._get_unread_counts_by_room_for_user_txn,
  451. user_id,
  452. )
  453. )
  454. self.assertEqual(
  455. aggregate_counts[room_id], notif_count + thread_notif_count
  456. )
  457. def _create_event(
  458. highlight: bool = False, thread_id: Optional[str] = None
  459. ) -> str:
  460. content: JsonDict = {
  461. "msgtype": "m.text",
  462. "body": user_id if highlight else "msg",
  463. }
  464. if thread_id:
  465. content["m.relates_to"] = {
  466. "rel_type": "m.thread",
  467. "event_id": thread_id,
  468. }
  469. result = self.helper.send_event(
  470. room_id,
  471. type="m.room.message",
  472. content=content,
  473. tok=other_token,
  474. )
  475. nonlocal last_event_id
  476. last_event_id = result["event_id"]
  477. return last_event_id
  478. def _rotate() -> None:
  479. self.get_success(self.store._rotate_notifs())
  480. def _mark_read(event_id: str, thread_id: Optional[str] = None) -> None:
  481. self.get_success(
  482. self.store.insert_receipt(
  483. room_id,
  484. "m.read",
  485. user_id=user_id,
  486. event_ids=[event_id],
  487. thread_id=thread_id,
  488. data={},
  489. )
  490. )
  491. _assert_counts(0, 0, 0, 0)
  492. thread_id = _create_event()
  493. _assert_counts(1, 0, 0, 0)
  494. _rotate()
  495. _assert_counts(1, 0, 0, 0)
  496. _create_event(thread_id=thread_id)
  497. _assert_counts(1, 0, 1, 0)
  498. _rotate()
  499. _assert_counts(1, 0, 1, 0)
  500. _create_event()
  501. _assert_counts(2, 0, 1, 0)
  502. _rotate()
  503. _assert_counts(2, 0, 1, 0)
  504. event_id = _create_event(thread_id=thread_id)
  505. _assert_counts(2, 0, 2, 0)
  506. _rotate()
  507. _assert_counts(2, 0, 2, 0)
  508. _create_event()
  509. _create_event(thread_id=thread_id)
  510. _mark_read(event_id)
  511. _assert_counts(1, 0, 1, 0)
  512. _mark_read(last_event_id, MAIN_TIMELINE)
  513. _mark_read(last_event_id, thread_id)
  514. _assert_counts(0, 0, 0, 0)
  515. _create_event()
  516. _create_event(thread_id=thread_id)
  517. _assert_counts(1, 0, 1, 0)
  518. _rotate()
  519. _assert_counts(1, 0, 1, 0)
  520. # Delete old event push actions, this should not affect the (summarised) count.
  521. self.get_success(self.store._remove_old_push_actions_that_have_rotated())
  522. _assert_counts(1, 0, 1, 0)
  523. _mark_read(last_event_id)
  524. _assert_counts(0, 0, 0, 0)
  525. _create_event(True)
  526. _assert_counts(1, 1, 0, 0)
  527. _rotate()
  528. _assert_counts(1, 1, 0, 0)
  529. event_id = _create_event(True, thread_id)
  530. _assert_counts(1, 1, 1, 1)
  531. _rotate()
  532. _assert_counts(1, 1, 1, 1)
  533. # Check that adding another notification and rotating after highlight
  534. # works.
  535. _create_event()
  536. _rotate()
  537. _assert_counts(2, 1, 1, 1)
  538. _create_event(thread_id=thread_id)
  539. _rotate()
  540. _assert_counts(2, 1, 2, 1)
  541. # Check that sending read receipts at different points results in the
  542. # right counts.
  543. _mark_read(event_id)
  544. _assert_counts(1, 0, 1, 0)
  545. _mark_read(event_id, MAIN_TIMELINE)
  546. _assert_counts(1, 0, 1, 0)
  547. _mark_read(last_event_id, MAIN_TIMELINE)
  548. _assert_counts(0, 0, 1, 0)
  549. _mark_read(last_event_id, thread_id)
  550. _assert_counts(0, 0, 0, 0)
  551. _create_event(True)
  552. _create_event(True, thread_id)
  553. _assert_counts(1, 1, 1, 1)
  554. _mark_read(last_event_id)
  555. _assert_counts(0, 0, 0, 0)
  556. _rotate()
  557. _assert_counts(0, 0, 0, 0)
  558. def test_recursive_thread(self) -> None:
  559. """
  560. Events related to events in a thread should still be considered part of
  561. that thread.
  562. """
  563. # Create a user to receive notifications and send receipts.
  564. user_id = self.register_user("user1235", "pass")
  565. token = self.login("user1235", "pass")
  566. # And another users to send events.
  567. other_id = self.register_user("other", "pass")
  568. other_token = self.login("other", "pass")
  569. # Create a room and put both users in it.
  570. room_id = self.helper.create_room_as(user_id, tok=token)
  571. self.helper.join(room_id, other_id, tok=other_token)
  572. # Update the user's push rules to care about reaction events.
  573. self.get_success(
  574. self.store.add_push_rule(
  575. user_id,
  576. "related_events",
  577. priority_class=5,
  578. conditions=[
  579. {"kind": "event_match", "key": "type", "pattern": "m.reaction"}
  580. ],
  581. actions=["notify"],
  582. )
  583. )
  584. def _create_event(type: str, content: JsonDict) -> str:
  585. result = self.helper.send_event(
  586. room_id, type=type, content=content, tok=other_token
  587. )
  588. return result["event_id"]
  589. def _assert_counts(notif_count: int, thread_notif_count: int) -> None:
  590. counts = self.get_success(
  591. self.store.db_pool.runInteraction(
  592. "get-unread-counts",
  593. self.store._get_unread_counts_by_receipt_txn,
  594. room_id,
  595. user_id,
  596. )
  597. )
  598. self.assertEqual(
  599. counts.main_timeline,
  600. NotifCounts(
  601. notify_count=notif_count, unread_count=0, highlight_count=0
  602. ),
  603. )
  604. if thread_notif_count:
  605. self.assertEqual(
  606. counts.threads,
  607. {
  608. thread_id: NotifCounts(
  609. notify_count=thread_notif_count,
  610. unread_count=0,
  611. highlight_count=0,
  612. ),
  613. },
  614. )
  615. else:
  616. self.assertEqual(counts.threads, {})
  617. # Create a root event.
  618. thread_id = _create_event(
  619. "m.room.message", {"msgtype": "m.text", "body": "msg"}
  620. )
  621. _assert_counts(1, 0)
  622. # Reply, creating a thread.
  623. reply_id = _create_event(
  624. "m.room.message",
  625. {
  626. "msgtype": "m.text",
  627. "body": "msg",
  628. "m.relates_to": {
  629. "rel_type": "m.thread",
  630. "event_id": thread_id,
  631. },
  632. },
  633. )
  634. _assert_counts(1, 1)
  635. # Create an event related to a thread event, this should still appear in
  636. # the thread.
  637. _create_event(
  638. type="m.reaction",
  639. content={
  640. "m.relates_to": {
  641. "rel_type": "m.annotation",
  642. "event_id": reply_id,
  643. "key": "A",
  644. }
  645. },
  646. )
  647. _assert_counts(1, 2)
  648. def test_find_first_stream_ordering_after_ts(self) -> None:
  649. def add_event(so: int, ts: int) -> None:
  650. self.get_success(
  651. self.store.db_pool.simple_insert(
  652. "events",
  653. {
  654. "stream_ordering": so,
  655. "received_ts": ts,
  656. "event_id": "event%i" % so,
  657. "type": "",
  658. "room_id": "",
  659. "content": "",
  660. "processed": True,
  661. "outlier": False,
  662. "topological_ordering": 0,
  663. "depth": 0,
  664. },
  665. )
  666. )
  667. # start with the base case where there are no events in the table
  668. r = self.get_success(self.store.find_first_stream_ordering_after_ts(11))
  669. self.assertEqual(r, 0)
  670. # now with one event
  671. add_event(2, 10)
  672. r = self.get_success(self.store.find_first_stream_ordering_after_ts(9))
  673. self.assertEqual(r, 2)
  674. r = self.get_success(self.store.find_first_stream_ordering_after_ts(10))
  675. self.assertEqual(r, 2)
  676. r = self.get_success(self.store.find_first_stream_ordering_after_ts(11))
  677. self.assertEqual(r, 3)
  678. # add a bunch of dummy events to the events table
  679. for stream_ordering, ts in (
  680. (3, 110),
  681. (4, 120),
  682. (5, 120),
  683. (10, 130),
  684. (20, 140),
  685. ):
  686. add_event(stream_ordering, ts)
  687. r = self.get_success(self.store.find_first_stream_ordering_after_ts(110))
  688. self.assertEqual(r, 3, "First event after 110ms should be 3, was %i" % r)
  689. # 4 and 5 are both after 120: we want 4 rather than 5
  690. r = self.get_success(self.store.find_first_stream_ordering_after_ts(120))
  691. self.assertEqual(r, 4, "First event after 120ms should be 4, was %i" % r)
  692. r = self.get_success(self.store.find_first_stream_ordering_after_ts(129))
  693. self.assertEqual(r, 10, "First event after 129ms should be 10, was %i" % r)
  694. # check we can get the last event
  695. r = self.get_success(self.store.find_first_stream_ordering_after_ts(140))
  696. self.assertEqual(r, 20, "First event after 14ms should be 20, was %i" % r)
  697. # off the end
  698. r = self.get_success(self.store.find_first_stream_ordering_after_ts(160))
  699. self.assertEqual(r, 21)
  700. # check we can find an event at ordering zero
  701. add_event(0, 5)
  702. r = self.get_success(self.store.find_first_stream_ordering_after_ts(1))
  703. self.assertEqual(r, 0)