# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import (
    TYPE_CHECKING,
    Any,
    Collection,
    Dict,
    Iterable,
    List,
    Mapping,
    Optional,
    Sequence,
    Tuple,
    cast,
)

from synapse.api.constants import EduTypes
from synapse.replication.tcp.streams import ReceiptsStream
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
    DatabasePool,
    LoggingDatabaseConnection,
    LoggingTransaction,
)
from synapse.storage.engines import PostgresEngine
from synapse.storage.engines._base import IsolationLevel
from synapse.storage.util.id_generators import (
    AbstractStreamIdGenerator,
    MultiWriterIdGenerator,
    StreamIdGenerator,
)
from synapse.types import JsonDict, JsonMapping
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache

if TYPE_CHECKING:
    from synapse.server import HomeServer

logger = logging.getLogger(__name__)


class ReceiptsWorkerStore(SQLBaseStore):
    def __init__(
        self,
        database: DatabasePool,
        db_conn: LoggingDatabaseConnection,
        hs: "HomeServer",
    ):
        self._instance_name = hs.get_instance_name()

        # In the worker store this is an ID tracker which we overwrite in the non-worker
        # class below that is used on the main process.
        self._receipts_id_gen: AbstractStreamIdGenerator

        if isinstance(database.engine, PostgresEngine):
            self._can_write_to_receipts = (
                self._instance_name in hs.config.worker.writers.receipts
            )

            self._receipts_id_gen = MultiWriterIdGenerator(
                db_conn=db_conn,
                db=database,
                notifier=hs.get_replication_notifier(),
                stream_name="receipts",
                instance_name=self._instance_name,
                tables=[("receipts_linearized", "instance_name", "stream_id")],
                sequence_name="receipts_sequence",
                writers=hs.config.worker.writers.receipts,
            )
        else:
            self._can_write_to_receipts = True

            # Multiple writers are not supported for SQLite.
            #
            # We shouldn't be running in worker mode with SQLite, but it's useful
            # to support it for unit tests.
            self._receipts_id_gen = StreamIdGenerator(
                db_conn,
                hs.get_replication_notifier(),
                "receipts_linearized",
                "stream_id",
                is_writer=hs.get_instance_name() in hs.config.worker.writers.receipts,
            )

        super().__init__(database, db_conn, hs)

        max_receipts_stream_id = self.get_max_receipt_stream_id()
        receipts_stream_prefill, min_receipts_stream_id = self.db_pool.get_cache_dict(
            db_conn,
            "receipts_linearized",
            entity_column="room_id",
            stream_column="stream_id",
            max_value=max_receipts_stream_id,
            limit=10000,
        )
        self._receipts_stream_cache = StreamChangeCache(
            "ReceiptsRoomChangeCache",
            min_receipts_stream_id,
            prefilled_cache=receipts_stream_prefill,
        )
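
    # Illustrative sketch (added commentary, not part of the original module):
    # the stream change cache built above lets read paths skip the database
    # for rooms with no new receipts since a given stream position, roughly:
    #
    #     if not self._receipts_stream_cache.has_entity_changed(
    #         "!room:example.org", from_key
    #     ):
    #         return []  # no receipts newer than from_key; skip the query
    #
    # `get_linearized_receipts_for_room` below uses exactly this pattern.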

    def get_max_receipt_stream_id(self) -> int:
        """Get the current max stream ID for receipts stream"""
        return self._receipts_id_gen.get_current_token()

    def get_last_unthreaded_receipt_for_user_txn(
        self,
        txn: LoggingTransaction,
        user_id: str,
        room_id: str,
        receipt_types: Collection[str],
    ) -> Optional[Tuple[str, int]]:
        """
        Fetch the event ID and stream_ordering for the latest unthreaded receipt
        in a room with one of the given receipt types.

        Args:
            user_id: The user to fetch receipts for.
            room_id: The room ID to fetch the receipt for.
            receipt_types: The receipt types to fetch.

        Returns:
            The event ID and stream ordering of the latest receipt, if one exists.
        """

        clause, args = make_in_list_sql_clause(
            self.database_engine, "receipt_type", receipt_types
        )

        sql = f"""
            SELECT event_id, stream_ordering
            FROM receipts_linearized
            INNER JOIN events USING (room_id, event_id)
            WHERE {clause}
            AND user_id = ?
            AND room_id = ?
            AND thread_id IS NULL
            ORDER BY stream_ordering DESC
            LIMIT 1
        """

        args.extend((user_id, room_id))
        txn.execute(sql, args)

        return cast(Optional[Tuple[str, int]], txn.fetchone())
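
    # Illustrative sketch (hypothetical IDs; not part of the original module):
    # a transaction method like the one above is normally driven through
    # `runInteraction`, e.g.:
    #
    #     receipt = await store.db_pool.runInteraction(
    #         "get_last_unthreaded_receipt",
    #         store.get_last_unthreaded_receipt_for_user_txn,
    #         user_id="@alice:example.org",
    #         room_id="!room:example.org",
    #         receipt_types=("m.read", "m.read.private"),
    #     )
    #     # receipt is None or an (event_id, stream_ordering) tuple.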

    async def get_receipts_for_user(
        self, user_id: str, receipt_types: Iterable[str]
    ) -> Dict[str, str]:
        """
        Fetch the event IDs for the latest receipts sent by the given user.

        Args:
            user_id: The user to fetch receipts for.
            receipt_types: The receipt types to check.

        Returns:
            A map of room ID to the event ID of the latest receipt for that room.

            If the user has not sent a receipt to a room then it will not appear
            in the returned dictionary.
        """
        results = await self.get_receipts_for_user_with_orderings(
            user_id, receipt_types
        )

        # Reduce the result to room ID -> event ID.
        return {
            room_id: room_result["event_id"] for room_id, room_result in results.items()
        }
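
    # Illustrative sketch (hypothetical values; not part of the original
    # module): `get_receipts_for_user` above flattens the richer result of
    # `get_receipts_for_user_with_orderings` below, e.g.:
    #
    #     await store.get_receipts_for_user("@alice:example.org", ("m.read",))
    #     # => {"!room:example.org": "$event_id"}
    #
    #     await store.get_receipts_for_user_with_orderings(
    #         "@alice:example.org", ("m.read",)
    #     )
    #     # => {"!room:example.org": {"event_id": "$event_id",
    #     #                           "topological_ordering": 10,
    #     #                           "stream_ordering": 1234}}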

    async def get_receipts_for_user_with_orderings(
        self, user_id: str, receipt_types: Iterable[str]
    ) -> JsonDict:
        """
        Fetch receipts for all rooms that the given user is joined to.

        Args:
            user_id: The user to fetch receipts for.
            receipt_types: The receipt types to fetch. Earlier receipt types
                are given priority if multiple receipts point to the same event.

        Returns:
            A map of room ID to the latest receipt (for the given types).
        """
        results: JsonDict = {}
        for receipt_type in receipt_types:
            partial_result = await self._get_receipts_for_user_with_orderings(
                user_id, receipt_type
            )
            for room_id, room_result in partial_result.items():
                # If the room has not yet been seen, or the receipt is newer,
                # use it.
                if (
                    room_id not in results
                    or results[room_id]["stream_ordering"]
                    < room_result["stream_ordering"]
                ):
                    results[room_id] = room_result

        return results

    @cached()
    async def _get_receipts_for_user_with_orderings(
        self, user_id: str, receipt_type: str
    ) -> JsonMapping:
        """
        Fetch receipts for all rooms that the given user is joined to.

        Args:
            user_id: The user to fetch receipts for.
            receipt_type: The receipt type to fetch.

        Returns:
            A map of room ID to the latest receipt information.
        """

        def f(txn: LoggingTransaction) -> List[Tuple[str, str, int, int]]:
            sql = (
                "SELECT rl.room_id, rl.event_id,"
                " e.topological_ordering, e.stream_ordering"
                " FROM receipts_linearized AS rl"
                " INNER JOIN events AS e USING (room_id, event_id)"
                " WHERE rl.room_id = e.room_id"
                " AND rl.event_id = e.event_id"
                " AND user_id = ?"
                " AND receipt_type = ?"
            )
            txn.execute(sql, (user_id, receipt_type))
            return cast(List[Tuple[str, str, int, int]], txn.fetchall())

        rows = await self.db_pool.runInteraction(
            "get_receipts_for_user_with_orderings", f
        )
        return {
            row[0]: {
                "event_id": row[1],
                "topological_ordering": row[2],
                "stream_ordering": row[3],
            }
            for row in rows
        }

    async def get_linearized_receipts_for_rooms(
        self, room_ids: Iterable[str], to_key: int, from_key: Optional[int] = None
    ) -> List[JsonMapping]:
        """Get receipts for multiple rooms for sending to clients.

        Args:
            room_ids: The room IDs to fetch receipts of.
            to_key: Max stream id to fetch receipts up to.
            from_key: Min stream id to fetch receipts from. None fetches
                from the start.

        Returns:
            A list of receipts.
        """
        room_ids = set(room_ids)

        if from_key is not None:
            # Only ask the database about rooms where there have been new
            # receipts added since `from_key`.
            room_ids = self._receipts_stream_cache.get_entities_changed(
                room_ids, from_key
            )

        results = await self._get_linearized_receipts_for_rooms(
            room_ids, to_key, from_key=from_key
        )

        return [ev for res in results.values() for ev in res]

    async def get_linearized_receipts_for_room(
        self, room_id: str, to_key: int, from_key: Optional[int] = None
    ) -> Sequence[JsonMapping]:
        """Get receipts for a single room for sending to clients.

        Args:
            room_id: The room ID.
            to_key: Max stream id to fetch receipts up to.
            from_key: Min stream id to fetch receipts from. None fetches
                from the start.

        Returns:
            A list of receipts.
        """
        if from_key is not None:
            # Check the cache first to see if any new receipts have been added
            # since `from_key`. If not we can no-op.
            if not self._receipts_stream_cache.has_entity_changed(room_id, from_key):
                return []

        return await self._get_linearized_receipts_for_room(room_id, to_key, from_key)

    @cached(tree=True)
    async def _get_linearized_receipts_for_room(
        self, room_id: str, to_key: int, from_key: Optional[int] = None
    ) -> Sequence[JsonMapping]:
        """See get_linearized_receipts_for_room"""

        def f(txn: LoggingTransaction) -> List[Dict[str, Any]]:
            if from_key:
                sql = (
                    "SELECT * FROM receipts_linearized WHERE"
                    " room_id = ? AND stream_id > ? AND stream_id <= ?"
                )
                txn.execute(sql, (room_id, from_key, to_key))
            else:
                sql = (
                    "SELECT * FROM receipts_linearized WHERE"
                    " room_id = ? AND stream_id <= ?"
                )
                txn.execute(sql, (room_id, to_key))

            rows = self.db_pool.cursor_to_dict(txn)
            return rows

        rows = await self.db_pool.runInteraction("get_linearized_receipts_for_room", f)

        if not rows:
            return []

        content: JsonDict = {}
        for row in rows:
            content.setdefault(row["event_id"], {}).setdefault(row["receipt_type"], {})[
                row["user_id"]
            ] = db_to_json(row["data"])

        return [{"type": EduTypes.RECEIPT, "room_id": room_id, "content": content}]
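
    # Illustrative sketch (hypothetical values; not part of the original
    # module): the single-element list returned above is an `m.receipt` EDU,
    # shaped roughly like:
    #
    #     [{
    #         "type": "m.receipt",
    #         "room_id": "!room:example.org",
    #         "content": {
    #             "$event_id": {
    #                 "m.read": {
    #                     "@alice:example.org": {"ts": 1661384801651},
    #                 },
    #             },
    #         },
    #     }]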

    @cachedList(
        cached_method_name="_get_linearized_receipts_for_room",
        list_name="room_ids",
        num_args=3,
    )
    async def _get_linearized_receipts_for_rooms(
        self, room_ids: Collection[str], to_key: int, from_key: Optional[int] = None
    ) -> Mapping[str, Sequence[JsonMapping]]:
        if not room_ids:
            return {}

        def f(txn: LoggingTransaction) -> List[Dict[str, Any]]:
            if from_key:
                sql = """
                    SELECT * FROM receipts_linearized WHERE
                    stream_id > ? AND stream_id <= ? AND
                """
                clause, args = make_in_list_sql_clause(
                    self.database_engine, "room_id", room_ids
                )

                txn.execute(sql + clause, [from_key, to_key] + list(args))
            else:
                sql = """
                    SELECT * FROM receipts_linearized WHERE
                    stream_id <= ? AND
                """

                clause, args = make_in_list_sql_clause(
                    self.database_engine, "room_id", room_ids
                )

                txn.execute(sql + clause, [to_key] + list(args))

            return self.db_pool.cursor_to_dict(txn)

        txn_results = await self.db_pool.runInteraction(
            "_get_linearized_receipts_for_rooms", f
        )

        results: JsonDict = {}
        for row in txn_results:
            # We want a single event per room, since we want to batch the
            # receipts by room, event and type.
            room_event = results.setdefault(
                row["room_id"],
                {"type": EduTypes.RECEIPT, "room_id": row["room_id"], "content": {}},
            )

            # The content is of the form:
            # {"$foo:bar": { "read": { "@user:host": <receipt> }, .. }, .. }
            event_entry = room_event["content"].setdefault(row["event_id"], {})
            receipt_type = event_entry.setdefault(row["receipt_type"], {})

            receipt_type[row["user_id"]] = db_to_json(row["data"])
            if row["thread_id"]:
                receipt_type[row["user_id"]]["thread_id"] = row["thread_id"]

        results = {
            room_id: [results[room_id]] if room_id in results else []
            for room_id in room_ids
        }
        return results

    @cached(
        num_args=2,
    )
    async def get_linearized_receipts_for_all_rooms(
        self, to_key: int, from_key: Optional[int] = None
    ) -> Mapping[str, JsonMapping]:
        """Get receipts for all rooms between two stream_ids, up
        to a limit of the latest 100 read receipts.

        Args:
            to_key: Max stream id to fetch receipts up to.
            from_key: Min stream id to fetch receipts from. None fetches
                from the start.

        Returns:
            A dictionary of room ID to the latest receipt EDU for that room.
        """

        def f(txn: LoggingTransaction) -> List[Dict[str, Any]]:
            if from_key:
                sql = """
                    SELECT * FROM receipts_linearized WHERE
                    stream_id > ? AND stream_id <= ?
                    ORDER BY stream_id DESC
                    LIMIT 100
                """
                txn.execute(sql, [from_key, to_key])
            else:
                sql = """
                    SELECT * FROM receipts_linearized WHERE
                    stream_id <= ?
                    ORDER BY stream_id DESC
                    LIMIT 100
                """

                txn.execute(sql, [to_key])

            return self.db_pool.cursor_to_dict(txn)

        txn_results = await self.db_pool.runInteraction(
            "get_linearized_receipts_for_all_rooms", f
        )

        results: JsonDict = {}
        for row in txn_results:
            # We want a single event per room, since we want to batch the
            # receipts by room, event and type.
            room_event = results.setdefault(
                row["room_id"],
                {"type": EduTypes.RECEIPT, "room_id": row["room_id"], "content": {}},
            )

            # The content is of the form:
            # {"$foo:bar": { "read": { "@user:host": <receipt> }, .. }, .. }
            event_entry = room_event["content"].setdefault(row["event_id"], {})
            receipt_type = event_entry.setdefault(row["receipt_type"], {})

            receipt_type[row["user_id"]] = db_to_json(row["data"])

        return results

    async def get_users_sent_receipts_between(
        self, last_id: int, current_id: int
    ) -> List[str]:
        """Get all users who sent receipts between `last_id` exclusive and
        `current_id` inclusive.

        Returns:
            The list of users.
        """

        if last_id == current_id:
            return []

        def _get_users_sent_receipts_between_txn(txn: LoggingTransaction) -> List[str]:
            sql = """
                SELECT DISTINCT user_id FROM receipts_linearized
                WHERE ? < stream_id AND stream_id <= ?
            """
            txn.execute(sql, (last_id, current_id))

            return [r[0] for r in txn]

        return await self.db_pool.runInteraction(
            "get_users_sent_receipts_between", _get_users_sent_receipts_between_txn
        )

    async def get_all_updated_receipts(
        self, instance_name: str, last_id: int, current_id: int, limit: int
    ) -> Tuple[
        List[Tuple[int, Tuple[str, str, str, str, Optional[str], JsonDict]]], int, bool
    ]:
        """Get updates for receipts replication stream.

        Args:
            instance_name: The writer we want to fetch updates from. Unused
                here since there is only ever one writer.
            last_id: The token to fetch updates from. Exclusive.
            current_id: The token to fetch updates up to. Inclusive.
            limit: The requested limit for the number of rows to return. The
                function may return more or fewer rows.

        Returns:
            A tuple consisting of: the updates, a token to use to fetch
            subsequent updates, and whether we returned fewer rows than exist
            between the requested tokens due to the limit.

            The token returned can be used in a subsequent call to this
            function to get further updates.

            The updates are a list of 2-tuples of stream ID and the row data.
        """

        if last_id == current_id:
            return [], current_id, False

        def get_all_updated_receipts_txn(
            txn: LoggingTransaction,
        ) -> Tuple[
            List[Tuple[int, Tuple[str, str, str, str, Optional[str], JsonDict]]],
            int,
            bool,
        ]:
            sql = """
                SELECT stream_id, room_id, receipt_type, user_id, event_id, thread_id, data
                FROM receipts_linearized
                WHERE ? < stream_id AND stream_id <= ?
                ORDER BY stream_id ASC
                LIMIT ?
            """
            txn.execute(sql, (last_id, current_id, limit))

            updates = cast(
                List[Tuple[int, Tuple[str, str, str, str, Optional[str], JsonDict]]],
                [(r[0], r[1:6] + (db_to_json(r[6]),)) for r in txn],
            )

            limited = False
            upper_bound = current_id

            if len(updates) == limit:
                limited = True
                upper_bound = updates[-1][0]

            return updates, upper_bound, limited

        return await self.db_pool.runInteraction(
            "get_all_updated_receipts", get_all_updated_receipts_txn
        )
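
    # Illustrative sketch (hypothetical values; not part of the original
    # module): each update returned above is a 2-tuple of stream ID and row
    # data, i.e. (stream_id, (room_id, receipt_type, user_id, event_id,
    # thread_id, data)), for example:
    #
    #     (1234, ("!room:example.org", "m.read", "@alice:example.org",
    #             "$event_id", None, {"ts": 1661384801651}))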

    def invalidate_caches_for_receipt(
        self, room_id: str, receipt_type: str, user_id: str
    ) -> None:
        self._get_receipts_for_user_with_orderings.invalidate((user_id, receipt_type))
        self._get_linearized_receipts_for_room.invalidate((room_id,))

        # We use this method to invalidate so that we don't end up with circular
        # dependencies between the receipts and push action stores.
        self._attempt_to_invalidate_cache(
            "get_unread_event_push_actions_by_room_for_user", (room_id,)
        )

    def process_replication_rows(
        self,
        stream_name: str,
        instance_name: str,
        token: int,
        rows: Iterable[Any],
    ) -> None:
        if stream_name == ReceiptsStream.NAME:
            self._receipts_id_gen.advance(instance_name, token)
            for row in rows:
                self.invalidate_caches_for_receipt(
                    row.room_id, row.receipt_type, row.user_id
                )
                self._receipts_stream_cache.entity_has_changed(row.room_id, token)

        return super().process_replication_rows(stream_name, instance_name, token, rows)

    def process_replication_position(
        self, stream_name: str, instance_name: str, token: int
    ) -> None:
        if stream_name == ReceiptsStream.NAME:
            self._receipts_id_gen.advance(instance_name, token)
        super().process_replication_position(stream_name, instance_name, token)

    def _insert_linearized_receipt_txn(
        self,
        txn: LoggingTransaction,
        room_id: str,
        receipt_type: str,
        user_id: str,
        event_id: str,
        thread_id: Optional[str],
        data: JsonDict,
        stream_id: int,
    ) -> Optional[int]:
        """Inserts a receipt into the database if it's newer than the current one.

        Returns:
            None if the receipt is older than the current receipt
            otherwise, the rx timestamp of the event that the receipt corresponds to
                (or 0 if the event is unknown)
        """
        assert self._can_write_to_receipts

        res = self.db_pool.simple_select_one_txn(
            txn,
            table="events",
            retcols=["stream_ordering", "received_ts"],
            keyvalues={"event_id": event_id},
            allow_none=True,
        )

        stream_ordering = int(res["stream_ordering"]) if res else None
        rx_ts = res["received_ts"] if res else 0

        # We don't want to clobber receipts for more recent events, so we
        # have to compare orderings of existing receipts
        if stream_ordering is not None:
            if thread_id is None:
                thread_clause = "r.thread_id IS NULL"
                thread_args: Tuple[str, ...] = ()
            else:
                thread_clause = "r.thread_id = ?"
                thread_args = (thread_id,)

            sql = f"""
                SELECT stream_ordering, event_id FROM events
                INNER JOIN receipts_linearized AS r USING (event_id, room_id)
                WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ? AND {thread_clause}
            """
            txn.execute(
                sql,
                (
                    room_id,
                    receipt_type,
                    user_id,
                )
                + thread_args,
            )

            for so, eid in txn:
                if int(so) >= stream_ordering:
                    logger.debug(
                        "Ignoring new receipt for %s in favour of existing "
                        "one for later event %s",
                        event_id,
                        eid,
                    )
                    return None

        txn.call_after(
            self.invalidate_caches_for_receipt, room_id, receipt_type, user_id
        )

        txn.call_after(
            self._receipts_stream_cache.entity_has_changed, room_id, stream_id
        )

        keyvalues = {
            "room_id": room_id,
            "receipt_type": receipt_type,
            "user_id": user_id,
        }
        where_clause = ""
        if thread_id is None:
            where_clause = "thread_id IS NULL"
        else:
            keyvalues["thread_id"] = thread_id

        self.db_pool.simple_upsert_txn(
            txn,
            table="receipts_linearized",
            keyvalues=keyvalues,
            values={
                "stream_id": stream_id,
                "event_id": event_id,
                "event_stream_ordering": stream_ordering,
                "data": json_encoder.encode(data),
            },
            where_clause=where_clause,
        )

        return rx_ts
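
    # Illustrative note (added commentary with hypothetical IDs; not part of
    # the original module): the upsert above keys unthreaded receipts on
    # (room_id, receipt_type, user_id) restricted by a `thread_id IS NULL`
    # where-clause, and threaded receipts on (room_id, receipt_type, user_id,
    # thread_id). A user can therefore hold one unthreaded receipt plus one
    # receipt per thread in the same room:
    #
    #     {"room_id": "!r:x", "receipt_type": "m.read", "user_id": "@a:x"}
    #     {"room_id": "!r:x", "receipt_type": "m.read", "user_id": "@a:x",
    #      "thread_id": "$thread_root"}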

    def _graph_to_linear(
        self, txn: LoggingTransaction, room_id: str, event_ids: List[str]
    ) -> str:
        """
        Generate a linearized event from a list of events (i.e. a list of forward
        extremities in the room).

        This should allow for calculation of the correct read receipt even if
        servers have different event ordering.

        Args:
            txn: The transaction
            room_id: The room ID the events are in.
            event_ids: The list of event IDs to linearize.

        Returns:
            The linearized event ID.
        """
        # TODO: Make this better.
        clause, args = make_in_list_sql_clause(
            self.database_engine, "event_id", event_ids
        )

        sql = """
            SELECT event_id FROM events
            WHERE room_id = ? AND stream_ordering IN (
                SELECT max(stream_ordering) FROM events WHERE %s
            )
        """ % (
            clause,
        )

        txn.execute(sql, [room_id] + list(args))

        rows = txn.fetchall()
        if rows:
            return rows[0][0]
        else:
            raise RuntimeError("Unrecognized event_ids: %r" % (event_ids,))

    async def insert_receipt(
        self,
        room_id: str,
        receipt_type: str,
        user_id: str,
        event_ids: List[str],
        thread_id: Optional[str],
        data: dict,
    ) -> Optional[Tuple[int, int]]:
        """Insert a receipt, either from local client or remote server.

        Automatically does conversion between linearized and graph
        representations.

        Returns:
            The new receipts stream ID and token, if the receipt is newer than
            what was previously persisted. None, otherwise.
        """
        assert self._can_write_to_receipts

        if not event_ids:
            return None

        if len(event_ids) == 1:
            linearized_event_id = event_ids[0]
        else:
            # We need to map the given points in the event graph to a single
            # linearized event.
            linearized_event_id = await self.db_pool.runInteraction(
                "insert_receipt_conv", self._graph_to_linear, room_id, event_ids
            )

        async with self._receipts_id_gen.get_next() as stream_id:
            event_ts = await self.db_pool.runInteraction(
                "insert_linearized_receipt",
                self._insert_linearized_receipt_txn,
                room_id,
                receipt_type,
                user_id,
                linearized_event_id,
                thread_id,
                data,
                stream_id=stream_id,
                # Read committed is actually beneficial here because we check for a receipt with
                # greater stream order, and checking the very latest data at select time is better
                # than the data at transaction start time.
                isolation_level=IsolationLevel.READ_COMMITTED,
            )

        # If the receipt was older than the currently persisted one, nothing to do.
        if event_ts is None:
            return None

        now = self._clock.time_msec()
        logger.debug(
            "Receipt %s for event %s in %s (%i ms old)",
            receipt_type,
            linearized_event_id,
            room_id,
            now - event_ts,
        )

        await self._insert_graph_receipt(
            room_id,
            receipt_type,
            user_id,
            event_ids,
            thread_id,
            data,
        )

        max_persisted_id = self._receipts_id_gen.get_current_token()

        return stream_id, max_persisted_id
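
    # Illustrative sketch (hypothetical values; an assumption rather than part
    # of the original module): a caller such as the receipts handler would
    # persist an incoming read receipt roughly like so:
    #
    #     result = await store.insert_receipt(
    #         room_id="!room:example.org",
    #         receipt_type="m.read",
    #         user_id="@alice:example.org",
    #         event_ids=["$event_id"],
    #         thread_id=None,
    #         data={"ts": 1661384801651},
    #     )
    #     if result is not None:
    #         stream_id, max_persisted_id = result  # notify replication/clients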

    async def _insert_graph_receipt(
        self,
        room_id: str,
        receipt_type: str,
        user_id: str,
        event_ids: List[str],
        thread_id: Optional[str],
        data: JsonDict,
    ) -> None:
        assert self._can_write_to_receipts

        keyvalues = {
            "room_id": room_id,
            "receipt_type": receipt_type,
            "user_id": user_id,
        }
        where_clause = ""
        if thread_id is None:
            where_clause = "thread_id IS NULL"
        else:
            keyvalues["thread_id"] = thread_id

        await self.db_pool.simple_upsert(
            desc="insert_graph_receipt",
            table="receipts_graph",
            keyvalues=keyvalues,
            values={
                "event_ids": json_encoder.encode(event_ids),
                "data": json_encoder.encode(data),
            },
            where_clause=where_clause,
        )

        self._get_receipts_for_user_with_orderings.invalidate((user_id, receipt_type))

        # FIXME: This shouldn't invalidate the whole cache
        self._get_linearized_receipts_for_room.invalidate((room_id,))


class ReceiptsBackgroundUpdateStore(SQLBaseStore):
    POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering"
    RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME = "receipts_linearized_unique_index"
    RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME = "receipts_graph_unique_index"

    def __init__(
        self,
        database: DatabasePool,
        db_conn: LoggingDatabaseConnection,
        hs: "HomeServer",
    ):
        super().__init__(database, db_conn, hs)

        self.db_pool.updates.register_background_update_handler(
            self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
            self._populate_receipt_event_stream_ordering,
        )
        self.db_pool.updates.register_background_update_handler(
            self.RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME,
            self._background_receipts_linearized_unique_index,
        )
        self.db_pool.updates.register_background_update_handler(
            self.RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME,
            self._background_receipts_graph_unique_index,
        )

    async def _populate_receipt_event_stream_ordering(
        self, progress: JsonDict, batch_size: int
    ) -> int:
        def _populate_receipt_event_stream_ordering_txn(
            txn: LoggingTransaction,
        ) -> bool:
            if "max_stream_id" in progress:
                max_stream_id = progress["max_stream_id"]
            else:
                txn.execute("SELECT max(stream_id) FROM receipts_linearized")
                res = txn.fetchone()
                if res is None or res[0] is None:
                    return True
                else:
                    max_stream_id = res[0]

            start = progress.get("stream_id", 0)
            stop = start + batch_size

            sql = """
                UPDATE receipts_linearized
                SET event_stream_ordering = (
                    SELECT stream_ordering
                    FROM events
                    WHERE event_id = receipts_linearized.event_id
                )
                WHERE stream_id >= ? AND stream_id < ?
            """
            txn.execute(sql, (start, stop))

            self.db_pool.updates._background_update_progress_txn(
                txn,
                self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
                {
                    "stream_id": stop,
                    "max_stream_id": max_stream_id,
                },
            )

            return stop > max_stream_id

        finished = await self.db_pool.runInteraction(
            "_populate_receipt_event_stream_ordering_txn",
            _populate_receipt_event_stream_ordering_txn,
        )

        if finished:
            await self.db_pool.updates._end_background_update(
                self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING
            )

        return batch_size

    async def _background_receipts_linearized_unique_index(
        self, progress: dict, batch_size: int
    ) -> int:
        """Removes duplicate receipts and adds a unique index on
        `(room_id, receipt_type, user_id)` to `receipts_linearized`, for non-thread
        receipts."""

        def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
            ROW_ID_NAME = self.database_engine.row_id_name

            # Identify any duplicate receipts arising from
            # https://github.com/matrix-org/synapse/issues/14406.
            # The following query takes less than a minute on matrix.org.
            sql = """
                SELECT MAX(stream_id), room_id, receipt_type, user_id
                FROM receipts_linearized
                WHERE thread_id IS NULL
                GROUP BY room_id, receipt_type, user_id
                HAVING COUNT(*) > 1
            """
            txn.execute(sql)
            duplicate_keys = cast(List[Tuple[int, str, str, str]], list(txn))

            # Then remove duplicate receipts, keeping the one with the highest
            # `stream_id`. Since there might be duplicate rows with the same
            # `stream_id`, we delete by the ctid instead.
            for stream_id, room_id, receipt_type, user_id in duplicate_keys:
                sql = f"""
                    SELECT {ROW_ID_NAME}
                    FROM receipts_linearized
                    WHERE
                        room_id = ? AND
                        receipt_type = ? AND
                        user_id = ? AND
                        thread_id IS NULL AND
                        stream_id = ?
                    LIMIT 1
                """
                txn.execute(sql, (room_id, receipt_type, user_id, stream_id))
                row_id = cast(Tuple[str], txn.fetchone())[0]

                sql = f"""
                    DELETE FROM receipts_linearized
                    WHERE
                        room_id = ? AND
                        receipt_type = ? AND
                        user_id = ? AND
                        thread_id IS NULL AND
                        {ROW_ID_NAME} != ?
                """
                txn.execute(sql, (room_id, receipt_type, user_id, row_id))

        await self.db_pool.runInteraction(
            self.RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME,
            _remote_duplicate_receipts_txn,
        )

        await self.db_pool.updates.create_index_in_background(
            index_name="receipts_linearized_unique_index",
            table="receipts_linearized",
            columns=["room_id", "receipt_type", "user_id"],
            where_clause="thread_id IS NULL",
            unique=True,
        )

        await self.db_pool.updates._end_background_update(
            self.RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME
        )

        return 1

    async def _background_receipts_graph_unique_index(
        self, progress: dict, batch_size: int
    ) -> int:
        """Removes duplicate receipts and adds a unique index on
        `(room_id, receipt_type, user_id)` to `receipts_graph`, for non-thread
        receipts."""

        def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
            # Identify any duplicate receipts arising from
            # https://github.com/matrix-org/synapse/issues/14406.
            # We expect the following query to use the per-thread receipt index and take
            # less than a minute.
            sql = """
                SELECT room_id, receipt_type, user_id FROM receipts_graph
                WHERE thread_id IS NULL
                GROUP BY room_id, receipt_type, user_id
                HAVING COUNT(*) > 1
            """
            txn.execute(sql)
            duplicate_keys = cast(List[Tuple[str, str, str]], list(txn))

            # Then remove all duplicate receipts.
            # We could be clever and try to keep the latest receipt out of every set of
            # duplicates, but it's far simpler to remove them all.
            for room_id, receipt_type, user_id in duplicate_keys:
                sql = """
                    DELETE FROM receipts_graph
                    WHERE
                        room_id = ? AND
                        receipt_type = ? AND
                        user_id = ? AND
                        thread_id IS NULL
                """
                txn.execute(sql, (room_id, receipt_type, user_id))

        await self.db_pool.runInteraction(
            self.RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME,
            _remote_duplicate_receipts_txn,
        )

        await self.db_pool.updates.create_index_in_background(
            index_name="receipts_graph_unique_index",
            table="receipts_graph",
            columns=["room_id", "receipt_type", "user_id"],
            where_clause="thread_id IS NULL",
            unique=True,
        )

        await self.db_pool.updates._end_background_update(
            self.RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME
        )

        return 1


class ReceiptsStore(ReceiptsWorkerStore, ReceiptsBackgroundUpdateStore):
    pass