# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018-2019 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import logging
from collections import OrderedDict, namedtuple
from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple

import attr
from prometheus_client import Counter

from twisted.internet import defer

import synapse.metrics
from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
from synapse.api.room_versions import RoomVersions
from synapse.crypto.event_signing import compute_event_reference_hash
from synapse.events import EventBase  # noqa: F401
from synapse.events.snapshot import EventContext  # noqa: F401
from synapse.logging.utils import log_function
from synapse.storage._base import db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.search import SearchEntry
from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.types import StateMap, get_domain_from_id
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.iterutils import batch_iter

if TYPE_CHECKING:
    from synapse.server import HomeServer
    from synapse.storage.databases.main import DataStore

logger = logging.getLogger(__name__)

persist_event_counter = Counter("synapse_storage_events_persisted_events", "")
event_counter = Counter(
    "synapse_storage_events_persisted_events_sep",
    "",
    ["type", "origin_type", "origin_entity"],
)

STATE_EVENT_TYPES_TO_MARK_UNREAD = {
    EventTypes.Topic,
    EventTypes.Name,
    EventTypes.RoomAvatar,
    EventTypes.Tombstone,
}


def should_count_as_unread(event: EventBase, context: EventContext) -> bool:
    # Exclude rejected and soft-failed events.
    if context.rejected or event.internal_metadata.is_soft_failed():
        return False

    # Exclude notices.
    if (
        not event.is_state()
        and event.type == EventTypes.Message
        and event.content.get("msgtype") == "m.notice"
    ):
        return False

    # Exclude edits.
    relates_to = event.content.get("m.relates_to", {})
    if relates_to.get("rel_type") == RelationTypes.REPLACE:
        return False

    # Mark events that have a non-empty string body as unread.
    body = event.content.get("body")
    if isinstance(body, str) and body:
        return True

    # Mark some state events as unread.
    if event.is_state() and event.type in STATE_EVENT_TYPES_TO_MARK_UNREAD:
        return True

    # Mark encrypted events as unread.
    if not event.is_state() and event.type == EventTypes.Encrypted:
        return True

    return False
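
# Illustration (hedged; hypothetical content dicts, not real EventBase
# constructors): a plain message with content
#   {"msgtype": "m.text", "body": "hi"}
# counts as unread, while the same event with msgtype "m.notice", or with
#   {"m.relates_to": {"rel_type": "m.replace", ...}}
# does not.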


def encode_json(json_object):
    """
    Encode a Python object as JSON and return it in a Unicode string.
    """
    out = frozendict_json_encoder.encode(json_object)
    if isinstance(out, bytes):
        out = out.decode("utf8")
    return out


_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))


@attr.s(slots=True)
class DeltaState:
    """Deltas to use to update the `current_state_events` table.

    Attributes:
        to_delete: List of type/state_keys to delete from current state
        to_insert: Map of state to upsert into current state
        no_longer_in_room: The server is no longer in the room, so the room
            should e.g. be removed from the `current_state_events` table.
    """

    to_delete = attr.ib(type=List[Tuple[str, str]])
    to_insert = attr.ib(type=StateMap[str])
    no_longer_in_room = attr.ib(type=bool, default=False)
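
# Illustrative only (hypothetical IDs): a delta in which a remote user's
# membership stops being current state while a new topic event becomes
# current state could look like:
#
#   DeltaState(
#       to_delete=[(EventTypes.Member, "@alice:remote.example")],
#       to_insert={(EventTypes.Topic, ""): "$new_topic_event_id"},
#   )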


class PersistEventsStore:
    """Contains all the functions for writing events to the database.

    Should only be instantiated on one process (when using a worker mode setup).

    Note: This is not part of the `DataStore` mixin.
    """

    def __init__(
        self, hs: "HomeServer", db: DatabasePool, main_data_store: "DataStore"
    ):
        self.hs = hs
        self.db_pool = db
        self.store = main_data_store
        self.database_engine = db.engine
        self._clock = hs.get_clock()

        self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
        self.is_mine_id = hs.is_mine_id

        # Ideally we'd move these ID gens here, unfortunately some other ID
        # generators are chained off them so doing so is a bit of a PITA.
        self._backfill_id_gen = self.store._backfill_id_gen  # type: StreamIdGenerator
        self._stream_id_gen = self.store._stream_id_gen  # type: StreamIdGenerator

        # This should only exist on instances that are configured to write
        assert (
            hs.config.worker.writers.events == hs.get_instance_name()
        ), "Can only instantiate EventsStore on master"

    @defer.inlineCallbacks
    def _persist_events_and_state_updates(
        self,
        events_and_contexts: List[Tuple[EventBase, EventContext]],
        current_state_for_room: Dict[str, StateMap[str]],
        state_delta_for_room: Dict[str, DeltaState],
        new_forward_extremeties: Dict[str, List[str]],
        backfilled: bool = False,
    ):
        """Persist a set of events alongside updates to the current state and
        forward extremities tables.

        Args:
            events_and_contexts: the events to persist
            current_state_for_room: Map from room_id to the current state of
                the room based on forward extremities
            state_delta_for_room: Map from room_id to the delta to apply to
                room state
            new_forward_extremities: Map from room_id to list of event IDs
                that are the new forward extremities of the room.
            backfilled: True if the events were backfilled

        Returns:
            Deferred: resolves when the events have been persisted
        """

        # We want to calculate the stream orderings as late as possible, as
        # we only notify after all events with a lesser stream ordering have
        # been persisted. I.e. if we spend 10s inside the with block then
        # that will delay all subsequent events from being notified about.
        # Hence why we do it down here rather than wrapping the entire
        # function.
        #
        # It's safe to do this after calculating the state deltas etc as we
        # only need to protect the *persistence* of the events. This is to
        # ensure that queries of the form "fetch events since X" don't
        # return events and stream positions after events that are still in
        # flight, as otherwise subsequent requests "fetch events since Y"
        # will not return those events.
        #
        # Note: Multiple instances of this function cannot be in flight at
        # the same time for the same room.
        if backfilled:
            stream_ordering_manager = self._backfill_id_gen.get_next_mult(
                len(events_and_contexts)
            )
        else:
            stream_ordering_manager = self._stream_id_gen.get_next_mult(
                len(events_and_contexts)
            )

        with stream_ordering_manager as stream_orderings:
            for (event, context), stream in zip(events_and_contexts, stream_orderings):
                event.internal_metadata.stream_ordering = stream

            yield self.db_pool.runInteraction(
                "persist_events",
                self._persist_events_txn,
                events_and_contexts=events_and_contexts,
                backfilled=backfilled,
                state_delta_for_room=state_delta_for_room,
                new_forward_extremeties=new_forward_extremeties,
            )
            persist_event_counter.inc(len(events_and_contexts))

            if not backfilled:
                # backfilled events have negative stream orderings, so we don't
                # want to set the event_persisted_position to that.
                synapse.metrics.event_persisted_position.set(
                    events_and_contexts[-1][0].internal_metadata.stream_ordering
                )

            for event, context in events_and_contexts:
                if context.app_service:
                    origin_type = "local"
                    origin_entity = context.app_service.id
                elif self.hs.is_mine_id(event.sender):
                    origin_type = "local"
                    origin_entity = "*client*"
                else:
                    origin_type = "remote"
                    origin_entity = get_domain_from_id(event.sender)

                event_counter.labels(event.type, origin_type, origin_entity).inc()

                self.store.get_unread_message_count_for_user.invalidate_many(
                    (event.room_id,),
                )

            for room_id, new_state in current_state_for_room.items():
                self.store.get_current_state_ids.prefill((room_id,), new_state)

            for room_id, latest_event_ids in new_forward_extremeties.items():
                self.store.get_latest_event_ids_in_room.prefill(
                    (room_id,), list(latest_event_ids)
                )
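
    # Note on stream orderings (a sketch, not normative): `get_next_mult(n)`
    # reserves a contiguous block of orderings, e.g. [105, 106, 107] for three
    # live events, while the backfill generator counts downwards, so backfilled
    # events get negative orderings and never move `event_persisted_position`.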

    @defer.inlineCallbacks
    def _get_events_which_are_prevs(self, event_ids):
        """Filter the supplied list of event_ids to get those which are prev_events of
        existing (non-outlier/rejected) events.

        Args:
            event_ids (Iterable[str]): event ids to filter

        Returns:
            Deferred[List[str]]: filtered event ids
        """
        results = []

        def _get_events_which_are_prevs_txn(txn, batch):
            sql = """
            SELECT prev_event_id, internal_metadata
            FROM event_edges
                INNER JOIN events USING (event_id)
                LEFT JOIN rejections USING (event_id)
                LEFT JOIN event_json USING (event_id)
            WHERE
                NOT events.outlier
                AND rejections.event_id IS NULL
                AND
            """

            clause, args = make_in_list_sql_clause(
                self.database_engine, "prev_event_id", batch
            )

            txn.execute(sql + clause, args)
            results.extend(r[0] for r in txn if not db_to_json(r[1]).get("soft_failed"))

        for chunk in batch_iter(event_ids, 100):
            yield self.db_pool.runInteraction(
                "_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
            )

        return results

    @defer.inlineCallbacks
    def _get_prevs_before_rejected(self, event_ids):
        """Get soft-failed ancestors to remove from the extremities.

        Given a set of events, find all those that have been soft-failed or
        rejected. Returns those soft failed/rejected events and their prev
        events (whether soft-failed/rejected or not), and recurses up the
        prev-event graph until it finds no more soft-failed/rejected events.

        This is used to find extremities that are ancestors of new events, but
        are separated by soft failed events.

        Args:
            event_ids (Iterable[str]): Events to find prev events for. Note
                that these must have already been persisted.

        Returns:
            Deferred[set[str]]
        """

        # The set of event_ids to return. This includes all soft-failed events
        # and their prev events.
        existing_prevs = set()

        def _get_prevs_before_rejected_txn(txn, batch):
            to_recursively_check = batch

            while to_recursively_check:
                sql = """
                SELECT
                    event_id, prev_event_id, internal_metadata,
                    rejections.event_id IS NOT NULL
                FROM event_edges
                    INNER JOIN events USING (event_id)
                    LEFT JOIN rejections USING (event_id)
                    LEFT JOIN event_json USING (event_id)
                WHERE
                    NOT events.outlier
                    AND
                """

                clause, args = make_in_list_sql_clause(
                    self.database_engine, "event_id", to_recursively_check
                )

                txn.execute(sql + clause, args)
                to_recursively_check = []

                for event_id, prev_event_id, metadata, rejected in txn:
                    if prev_event_id in existing_prevs:
                        continue

                    soft_failed = db_to_json(metadata).get("soft_failed")
                    if soft_failed or rejected:
                        to_recursively_check.append(prev_event_id)
                        existing_prevs.add(prev_event_id)

        for chunk in batch_iter(event_ids, 100):
            yield self.db_pool.runInteraction(
                "_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
            )

        return existing_prevs
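
    # Worked example (hypothetical event IDs): if candidate extremity E3 is
    # soft-failed with prev event E2, and E2 is also soft-failed with prev
    # event E1 (accepted), the loop above returns {E2, E1}, allowing the
    # accepted ancestor E1 to stand in for E3 as an extremity.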

    @log_function
    def _persist_events_txn(
        self,
        txn: LoggingTransaction,
        events_and_contexts: List[Tuple[EventBase, EventContext]],
        backfilled: bool,
        state_delta_for_room: Dict[str, DeltaState] = {},
        new_forward_extremeties: Dict[str, List[str]] = {},
    ):
        """Insert some number of room events into the necessary database tables.

        Rejected events are only inserted into the events table, the events_json table,
        and the rejections table. Things reading from those tables will need to check
        whether the event was rejected.

        Args:
            txn
            events_and_contexts: events to persist
            backfilled: True if the events were backfilled
            state_delta_for_room: The current-state delta for each room.
            new_forward_extremeties: The new forward extremities for each room.
                For each room, a list of the event ids which are the forward
                extremities.
        """
        all_events_and_contexts = events_and_contexts

        min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering
        max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering

        self._update_forward_extremities_txn(
            txn,
            new_forward_extremities=new_forward_extremeties,
            max_stream_order=max_stream_order,
        )

        # Ensure that we don't have the same event twice.
        events_and_contexts = self._filter_events_and_contexts_for_duplicates(
            events_and_contexts
        )

        self._update_room_depths_txn(
            txn, events_and_contexts=events_and_contexts, backfilled=backfilled
        )

        # _update_outliers_txn filters out any events which have already been
        # persisted, and returns the filtered list.
        events_and_contexts = self._update_outliers_txn(
            txn, events_and_contexts=events_and_contexts
        )

        # From this point onwards the events are only events that we haven't
        # seen before.

        self._store_event_txn(txn, events_and_contexts=events_and_contexts)

        # Insert into event_to_state_groups.
        self._store_event_state_mappings_txn(txn, events_and_contexts)

        # We want to store event_auth mappings for rejected events, as they're
        # used in state res v2.
        # This is only necessary if the rejected event appears in an accepted
        # event's auth chain, but it's easier for now just to store them (and
        # it doesn't take much storage compared to storing the entire event
        # anyway).
        self.db_pool.simple_insert_many_txn(
            txn,
            table="event_auth",
            values=[
                {
                    "event_id": event.event_id,
                    "room_id": event.room_id,
                    "auth_id": auth_id,
                }
                for event, _ in events_and_contexts
                for auth_id in event.auth_event_ids()
                if event.is_state()
            ],
        )

        # _store_rejected_events_txn filters out any events which were
        # rejected, and returns the filtered list.
        events_and_contexts = self._store_rejected_events_txn(
            txn, events_and_contexts=events_and_contexts
        )

        # From this point onwards the events are only ones that weren't
        # rejected.

        self._update_metadata_tables_txn(
            txn,
            events_and_contexts=events_and_contexts,
            all_events_and_contexts=all_events_and_contexts,
            backfilled=backfilled,
        )

        # We call this last as it assumes we've inserted the events into
        # room_memberships, where applicable.
        self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)

    def _update_current_state_txn(
        self,
        txn: LoggingTransaction,
        state_delta_by_room: Dict[str, DeltaState],
        stream_id: int,
    ):
        for room_id, delta_state in state_delta_by_room.items():
            to_delete = delta_state.to_delete
            to_insert = delta_state.to_insert

            if delta_state.no_longer_in_room:
                # Server is no longer in the room so we delete the room from
                # current_state_events, being careful we've already updated the
                # rooms.room_version column (which gets populated in a
                # background task).
                self._upsert_room_version_txn(txn, room_id)

                # Before deleting we populate the current_state_delta_stream
                # so that async background tasks get told what happened.
                sql = """
                    INSERT INTO current_state_delta_stream
                        (stream_id, room_id, type, state_key, event_id, prev_event_id)
                    SELECT ?, room_id, type, state_key, null, event_id
                        FROM current_state_events
                        WHERE room_id = ?
                """
                txn.execute(sql, (stream_id, room_id))

                self.db_pool.simple_delete_txn(
                    txn, table="current_state_events", keyvalues={"room_id": room_id},
                )
            else:
                # We're still in the room, so we update the current state as normal.

                # First we add entries to the current_state_delta_stream. We
                # do this before updating the current_state_events table so
                # that we can use it to calculate the `prev_event_id`. (This
                # allows us to not have to pull out the existing state
                # unnecessarily).
                #
                # The stream_id for the update is chosen to be the minimum of
                # the stream_ids for the batch of events that we are persisting;
                # that means we do not end up in a situation where workers see
                # events before the current_state_delta updates.
                #
                sql = """
                    INSERT INTO current_state_delta_stream
                    (stream_id, room_id, type, state_key, event_id, prev_event_id)
                    SELECT ?, ?, ?, ?, ?, (
                        SELECT event_id FROM current_state_events
                        WHERE room_id = ? AND type = ? AND state_key = ?
                    )
                """
                txn.executemany(
                    sql,
                    (
                        (
                            stream_id,
                            room_id,
                            etype,
                            state_key,
                            to_insert.get((etype, state_key)),
                            room_id,
                            etype,
                            state_key,
                        )
                        for etype, state_key in itertools.chain(to_delete, to_insert)
                    ),
                )
                # Now we actually update the current_state_events table

                txn.executemany(
                    "DELETE FROM current_state_events"
                    " WHERE room_id = ? AND type = ? AND state_key = ?",
                    (
                        (room_id, etype, state_key)
                        for etype, state_key in itertools.chain(to_delete, to_insert)
                    ),
                )

                # We include the membership in the current state table, hence we do
                # a lookup when we insert. This assumes that all events have already
                # been inserted into room_memberships.
                txn.executemany(
                    """INSERT INTO current_state_events
                        (room_id, type, state_key, event_id, membership)
                    VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
                    """,
                    [
                        (room_id, key[0], key[1], ev_id, ev_id)
                        for key, ev_id in to_insert.items()
                    ],
                )

            # We now update `local_current_membership`. We do this regardless
            # of whether we're still in the room or not to handle the case where
            # e.g. we just got banned (where we need to record that fact here).

            # Note: Do we really want to delete rows here (that we do not
            # subsequently reinsert below)? While technically correct it means
            # we have no record of the fact the user *was* a member of the
            # room but got, say, state reset out of it.
            if to_delete or to_insert:
                txn.executemany(
                    "DELETE FROM local_current_membership"
                    " WHERE room_id = ? AND user_id = ?",
                    (
                        (room_id, state_key)
                        for etype, state_key in itertools.chain(to_delete, to_insert)
                        if etype == EventTypes.Member and self.is_mine_id(state_key)
                    ),
                )

            if to_insert:
                txn.executemany(
                    """INSERT INTO local_current_membership
                        (room_id, user_id, event_id, membership)
                    VALUES (?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
                    """,
                    [
                        (room_id, key[1], ev_id, ev_id)
                        for key, ev_id in to_insert.items()
                        if key[0] == EventTypes.Member and self.is_mine_id(key[1])
                    ],
                )

            txn.call_after(
                self.store._curr_state_delta_stream_cache.entity_has_changed,
                room_id,
                stream_id,
            )

            # Invalidate the various caches

            # Figure out the changes of membership to invalidate the
            # `get_rooms_for_user` cache.
            # We find out which membership events we may have deleted
            # and which we have added, then we invalidate the caches for all
            # those users.
            members_changed = {
                state_key
                for ev_type, state_key in itertools.chain(to_delete, to_insert)
                if ev_type == EventTypes.Member
            }

            for member in members_changed:
                txn.call_after(
                    self.store.get_rooms_for_user_with_stream_ordering.invalidate,
                    (member,),
                )

            self.store._invalidate_state_caches_and_stream(
                txn, room_id, members_changed
            )

    def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str):
        """Update the room version in the database based off current state
        events.

        This is used when we're about to delete current state and we want to
        ensure that the `rooms.room_version` column is up to date.
        """

        sql = """
            SELECT json FROM event_json
            INNER JOIN current_state_events USING (room_id, event_id)
            WHERE room_id = ? AND type = ? AND state_key = ?
        """
        txn.execute(sql, (room_id, EventTypes.Create, ""))

        row = txn.fetchone()
        if row:
            event_json = db_to_json(row[0])
            content = event_json.get("content", {})
            creator = content.get("creator")
            room_version_id = content.get("room_version", RoomVersions.V1.identifier)

            self.db_pool.simple_upsert_txn(
                txn,
                table="rooms",
                keyvalues={"room_id": room_id},
                values={"room_version": room_version_id},
                insertion_values={"is_public": False, "creator": creator},
            )

    def _update_forward_extremities_txn(
        self, txn, new_forward_extremities, max_stream_order
    ):
        for room_id, new_extrem in new_forward_extremities.items():
            self.db_pool.simple_delete_txn(
                txn, table="event_forward_extremities", keyvalues={"room_id": room_id}
            )
            txn.call_after(
                self.store.get_latest_event_ids_in_room.invalidate, (room_id,)
            )

        self.db_pool.simple_insert_many_txn(
            txn,
            table="event_forward_extremities",
            values=[
                {"event_id": ev_id, "room_id": room_id}
                for room_id, new_extrem in new_forward_extremities.items()
                for ev_id in new_extrem
            ],
        )

        # We now insert into stream_ordering_to_exterm a mapping from room_id,
        # new stream_ordering to new forward extremities in the room.
        # This allows us to later efficiently look up the forward extremities
        # for a room before a given stream_ordering
        self.db_pool.simple_insert_many_txn(
            txn,
            table="stream_ordering_to_exterm",
            values=[
                {
                    "room_id": room_id,
                    "event_id": event_id,
                    "stream_ordering": max_stream_order,
                }
                for room_id, new_extrem in new_forward_extremities.items()
                for event_id in new_extrem
            ],
        )

    @classmethod
    def _filter_events_and_contexts_for_duplicates(cls, events_and_contexts):
        """Ensure that we don't have the same event twice.

        Pick the earliest non-outlier if there is one, else the earliest one.

        Args:
            events_and_contexts (list[(EventBase, EventContext)]):

        Returns:
            list[(EventBase, EventContext)]: filtered list
        """
        new_events_and_contexts = OrderedDict()
        for event, context in events_and_contexts:
            prev_event_context = new_events_and_contexts.get(event.event_id)
            if prev_event_context:
                if not event.internal_metadata.is_outlier():
                    if prev_event_context[0].internal_metadata.is_outlier():
                        # To ensure correct ordering we pop, as OrderedDict is
                        # ordered by first insertion.
                        new_events_and_contexts.pop(event.event_id, None)
                        new_events_and_contexts[event.event_id] = (event, context)
            else:
                new_events_and_contexts[event.event_id] = (event, context)
        return list(new_events_and_contexts.values())
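
    # Sketch: given [(A, outlier), (B, ...), (A, non-outlier)] this returns
    # [B, A(non-outlier)] -- the outlier copy of A is popped and the
    # non-outlier copy re-inserted, so it now sits after B in insertion order.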

    def _update_room_depths_txn(self, txn, events_and_contexts, backfilled):
        """Update min_depth for each room

        Args:
            txn (twisted.enterprise.adbapi.Connection): db connection
            events_and_contexts (list[(EventBase, EventContext)]): events
                we are persisting
            backfilled (bool): True if the events were backfilled
        """
        depth_updates = {}
        for event, context in events_and_contexts:
            # Remove any existing cache entries for the event_ids
            txn.call_after(self.store._invalidate_get_event_cache, event.event_id)
            if not backfilled:
                txn.call_after(
                    self.store._events_stream_cache.entity_has_changed,
                    event.room_id,
                    event.internal_metadata.stream_ordering,
                )

            if not event.internal_metadata.is_outlier() and not context.rejected:
                depth_updates[event.room_id] = max(
                    event.depth, depth_updates.get(event.room_id, event.depth)
                )

        for room_id, depth in depth_updates.items():
            self._update_min_depth_for_room_txn(txn, room_id, depth)

    def _update_outliers_txn(self, txn, events_and_contexts):
        """Update any outliers with new event info.

        This turns outliers into ex-outliers (unless the new event was
        rejected).

        Args:
            txn (twisted.enterprise.adbapi.Connection): db connection
            events_and_contexts (list[(EventBase, EventContext)]): events
                we are persisting

        Returns:
            list[(EventBase, EventContext)] new list, without events which
            are already in the events table.
        """
        txn.execute(
            "SELECT event_id, outlier FROM events WHERE event_id in (%s)"
            % (",".join(["?"] * len(events_and_contexts)),),
            [event.event_id for event, _ in events_and_contexts],
        )

        have_persisted = {event_id: outlier for event_id, outlier in txn}

        to_remove = set()
        for event, context in events_and_contexts:
            if event.event_id not in have_persisted:
                continue

            to_remove.add(event)

            if context.rejected:
                # If the event is rejected then we don't care if the event
                # was an outlier or not.
                continue

            outlier_persisted = have_persisted[event.event_id]
            if not event.internal_metadata.is_outlier() and outlier_persisted:
                # We received a copy of an event that we had already stored as
                # an outlier in the database. We now have some state at that
                # event, so we need to update the state_groups table with that
                # state.

                # insert into event_to_state_groups.
                try:
                    self._store_event_state_mappings_txn(txn, ((event, context),))
                except Exception:
                    logger.exception(
                        "Failed to store state mappings for ex-outlier %s",
                        event.event_id,
                    )
                    raise

                metadata_json = encode_json(event.internal_metadata.get_dict())

                sql = "UPDATE event_json SET internal_metadata = ? WHERE event_id = ?"
                txn.execute(sql, (metadata_json, event.event_id))

                # Add an entry to the ex_outlier_stream table to replicate the
                # change in outlier status to our workers.
                stream_order = event.internal_metadata.stream_ordering
                state_group_id = context.state_group
                self.db_pool.simple_insert_txn(
                    txn,
                    table="ex_outlier_stream",
                    values={
                        "event_stream_ordering": stream_order,
                        "event_id": event.event_id,
                        "state_group": state_group_id,
                    },
                )

                sql = "UPDATE events SET outlier = ? WHERE event_id = ?"
                txn.execute(sql, (False, event.event_id))

                # Update the event_backward_extremities table now that this
                # event isn't an outlier any more.
                self._update_backward_extremeties(txn, [event])

        return [ec for ec in events_and_contexts if ec[0] not in to_remove]
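
    # De-outliering sketch: an event previously stored with outlier = TRUE
    # (no state known) that arrives again over normal federation traffic gets
    # its state mapped in event_to_state_groups, its internal_metadata
    # rewritten, outlier flipped to FALSE, and a row in ex_outlier_stream so
    # workers see the transition.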

    def _store_event_txn(self, txn, events_and_contexts):
        """Insert new events into the event and event_json tables

        Args:
            txn (twisted.enterprise.adbapi.Connection): db connection
            events_and_contexts (list[(EventBase, EventContext)]): events
                we are persisting
        """

        if not events_and_contexts:
            # nothing to do here
            return

        def event_dict(event):
            d = event.get_dict()
            d.pop("redacted", None)
            d.pop("redacted_because", None)
            return d

        self.db_pool.simple_insert_many_txn(
            txn,
            table="event_json",
            values=[
                {
                    "event_id": event.event_id,
                    "room_id": event.room_id,
                    "internal_metadata": encode_json(
                        event.internal_metadata.get_dict()
                    ),
                    "json": encode_json(event_dict(event)),
                    "format_version": event.format_version,
                }
                for event, _ in events_and_contexts
            ],
        )

        self.db_pool.simple_insert_many_txn(
            txn,
            table="events",
            values=[
                {
                    "stream_ordering": event.internal_metadata.stream_ordering,
                    "topological_ordering": event.depth,
                    "depth": event.depth,
                    "event_id": event.event_id,
                    "room_id": event.room_id,
                    "type": event.type,
                    "processed": True,
                    "outlier": event.internal_metadata.is_outlier(),
                    "origin_server_ts": int(event.origin_server_ts),
                    "received_ts": self._clock.time_msec(),
                    "sender": event.sender,
                    "contains_url": (
                        "url" in event.content and isinstance(event.content["url"], str)
                    ),
                    "count_as_unread": should_count_as_unread(event, context),
                }
                for event, context in events_and_contexts
            ],
        )

        for event, _ in events_and_contexts:
            if not event.internal_metadata.is_redacted():
                # If we're persisting an unredacted event we go and ensure
                # that we mark any redactions that reference this event as
                # requiring censoring.
                self.db_pool.simple_update_txn(
                    txn,
                    table="redactions",
                    keyvalues={"redacts": event.event_id},
                    updatevalues={"have_censored": False},
                )

    def _store_rejected_events_txn(self, txn, events_and_contexts):
        """Add rows to the 'rejections' table for received events which were
        rejected

        Args:
            txn (twisted.enterprise.adbapi.Connection): db connection
            events_and_contexts (list[(EventBase, EventContext)]): events
                we are persisting

        Returns:
            list[(EventBase, EventContext)] new list, without the rejected
                events.
        """
        # Remove the rejected events from the list now that we've added them
        # to the events table and the events_json table.
        to_remove = set()
        for event, context in events_and_contexts:
            if context.rejected:
                # Insert the event_id into the rejections table
                self._store_rejections_txn(txn, event.event_id, context.rejected)
                to_remove.add(event)

        return [ec for ec in events_and_contexts if ec[0] not in to_remove]

    def _update_metadata_tables_txn(
        self, txn, events_and_contexts, all_events_and_contexts, backfilled
    ):
        """Update all the miscellaneous tables for new events

        Args:
            txn (twisted.enterprise.adbapi.Connection): db connection
            events_and_contexts (list[(EventBase, EventContext)]): events
                we are persisting
            all_events_and_contexts (list[(EventBase, EventContext)]): all
                events that we were going to persist. This includes events
                we've already persisted, etc, that wouldn't appear in
                events_and_contexts.
            backfilled (bool): True if the events were backfilled
        """

        # Insert all the push actions into the event_push_actions table.
        self._set_push_actions_for_event_and_users_txn(
            txn,
            events_and_contexts=events_and_contexts,
            all_events_and_contexts=all_events_and_contexts,
        )

        if not events_and_contexts:
            # nothing to do here
            return

        for event, context in events_and_contexts:
            if event.type == EventTypes.Redaction and event.redacts is not None:
                # Remove the entries in the event_push_actions table for the
                # redacted event.
                self._remove_push_actions_for_event_id_txn(
                    txn, event.room_id, event.redacts
                )

                # Remove from relations table.
                self._handle_redaction(txn, event.redacts)

        # Update the event_forward_extremities, event_backward_extremities and
        # event_edges tables.
        self._handle_mult_prev_events(
            txn, events=[event for event, _ in events_and_contexts]
        )

        for event, _ in events_and_contexts:
            if event.type == EventTypes.Name:
                # Insert into the event_search table.
                self._store_room_name_txn(txn, event)
            elif event.type == EventTypes.Topic:
                # Insert into the event_search table.
                self._store_room_topic_txn(txn, event)
            elif event.type == EventTypes.Message:
                # Insert into the event_search table.
                self._store_room_message_txn(txn, event)
            elif event.type == EventTypes.Redaction and event.redacts is not None:
                # Insert into the redactions table.
                self._store_redaction(txn, event)
            elif event.type == EventTypes.Retention:
                # Update the room_retention table.
                self._store_retention_policy_for_room_txn(txn, event)

            self._handle_event_relations(txn, event)

            # Store the labels for this event.
            labels = event.content.get(EventContentFields.LABELS)
            if labels:
                self.insert_labels_for_event_txn(
                    txn, event.event_id, labels, event.room_id, event.depth
                )

            if self._ephemeral_messages_enabled:
                # If there's an expiry timestamp on the event, store it.
                expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER)
                if isinstance(expiry_ts, int) and not event.is_state():
                    self._insert_event_expiry_txn(txn, event.event_id, expiry_ts)

        # Insert into the room_memberships table.
        self._store_room_members_txn(
            txn,
            [
                event
                for event, _ in events_and_contexts
                if event.type == EventTypes.Member
            ],
            backfilled=backfilled,
        )

        # Insert event_reference_hashes table.
        self._store_event_reference_hashes_txn(
            txn, [event for event, _ in events_and_contexts]
        )

        state_events_and_contexts = [
            ec for ec in events_and_contexts if ec[0].is_state()
        ]

        state_values = []
        for event, context in state_events_and_contexts:
            vals = {
                "event_id": event.event_id,
                "room_id": event.room_id,
                "type": event.type,
                "state_key": event.state_key,
            }

            # TODO: How does this work with backfilling?
            if hasattr(event, "replaces_state"):
                vals["prev_state"] = event.replaces_state

            state_values.append(vals)

        self.db_pool.simple_insert_many_txn(
            txn, table="state_events", values=state_values
        )

        # Prefill the event cache
        self._add_to_cache(txn, events_and_contexts)

    def _add_to_cache(self, txn, events_and_contexts):
        to_prefill = []

        rows = []
        N = 200
        for i in range(0, len(events_and_contexts), N):
            ev_map = {e[0].event_id: e[0] for e in events_and_contexts[i : i + N]}
            if not ev_map:
                break

            sql = (
                "SELECT "
                " e.event_id as event_id, "
                " r.redacts as redacts,"
                " rej.event_id as rejects "
                " FROM events as e"
                " LEFT JOIN rejections as rej USING (event_id)"
                " LEFT JOIN redactions as r ON e.event_id = r.redacts"
                " WHERE "
            )

            clause, args = make_in_list_sql_clause(
                self.database_engine, "e.event_id", list(ev_map)
            )

            txn.execute(sql + clause, args)
            rows = self.db_pool.cursor_to_dict(txn)
            for row in rows:
                event = ev_map[row["event_id"]]
                if not row["rejects"] and not row["redacts"]:
                    to_prefill.append(
                        _EventCacheEntry(event=event, redacted_event=None)
                    )

        def prefill():
            for cache_entry in to_prefill:
                self.store._get_event_cache.prefill(
                    (cache_entry[0].event_id,), cache_entry
                )

        txn.call_after(prefill)

    def _store_redaction(self, txn, event):
        # invalidate the cache for the redacted event
        txn.call_after(self.store._invalidate_get_event_cache, event.redacts)

        self.db_pool.simple_insert_txn(
            txn,
            table="redactions",
            values={
                "event_id": event.event_id,
                "redacts": event.redacts,
                "received_ts": self._clock.time_msec(),
            },
        )

    def insert_labels_for_event_txn(
        self, txn, event_id, labels, room_id, topological_ordering
    ):
        """Store the mapping between an event's ID and its labels, with one row per
        (event_id, label) tuple.

        Args:
            txn (LoggingTransaction): The transaction to execute.
            event_id (str): The event's ID.
            labels (list[str]): A list of text labels.
            room_id (str): The ID of the room the event was sent to.
            topological_ordering (int): The position of the event in the room's topology.
        """
        return self.db_pool.simple_insert_many_txn(
            txn=txn,
            table="event_labels",
            values=[
                {
                    "event_id": event_id,
                    "label": label,
                    "room_id": room_id,
                    "topological_ordering": topological_ordering,
                }
                for label in labels
            ],
        )
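
    # E.g. (hypothetical values): labels=["#work", "#fun"] for event "$ev" in
    # room "!room" at depth 5 yields two event_labels rows:
    #   ("$ev", "#work", "!room", 5) and ("$ev", "#fun", "!room", 5).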

    def _insert_event_expiry_txn(self, txn, event_id, expiry_ts):
        """Save the expiry timestamp associated with a given event ID.

        Args:
            txn (LoggingTransaction): The database transaction to use.
            event_id (str): The event ID the expiry timestamp is associated with.
            expiry_ts (int): The timestamp at which to expire (delete) the event.
        """
        return self.db_pool.simple_insert_txn(
            txn=txn,
            table="event_expiry",
            values={"event_id": event_id, "expiry_ts": expiry_ts},
        )

    def _store_event_reference_hashes_txn(self, txn, events):
        """Store a hash for a PDU

        Args:
            txn (cursor):
            events (list): list of Events.
        """

        vals = []
        for event in events:
            ref_alg, ref_hash_bytes = compute_event_reference_hash(event)
            vals.append(
                {
                    "event_id": event.event_id,
                    "algorithm": ref_alg,
                    "hash": memoryview(ref_hash_bytes),
                }
            )

        self.db_pool.simple_insert_many_txn(
            txn, table="event_reference_hashes", values=vals
        )

    def _store_room_members_txn(self, txn, events, backfilled):
        """Store room membership events in the database.
        """
        self.db_pool.simple_insert_many_txn(
            txn,
            table="room_memberships",
            values=[
                {
                    "event_id": event.event_id,
                    "user_id": event.state_key,
                    "sender": event.user_id,
                    "room_id": event.room_id,
                    "membership": event.membership,
                    "display_name": event.content.get("displayname", None),
                    "avatar_url": event.content.get("avatar_url", None),
                }
                for event in events
            ],
        )

        for event in events:
            txn.call_after(
                self.store._membership_stream_cache.entity_has_changed,
                event.state_key,
                event.internal_metadata.stream_ordering,
            )
            txn.call_after(
                self.store.get_invited_rooms_for_local_user.invalidate,
                (event.state_key,),
            )

            # We update the local_current_membership table only if the event is
            # "current", i.e., it's something that has just happened.
            #
            # This will usually get updated by the `current_state_events` handling,
            # unless it's an outlier, and an outlier is only "current" if it's an
            # "out of band membership", like a remote invite or a rejection of a
            # remote invite.
            if (
                self.is_mine_id(event.state_key)
                and not backfilled
                and event.internal_metadata.is_outlier()
                and event.internal_metadata.is_out_of_band_membership()
            ):
                self.db_pool.simple_upsert_txn(
                    txn,
                    table="local_current_membership",
                    keyvalues={"room_id": event.room_id, "user_id": event.state_key},
                    values={
                        "event_id": event.event_id,
                        "membership": event.membership,
                    },
                )

    def _handle_event_relations(self, txn, event):
        """Handles inserting relation data during persistence of events

        Args:
            txn
            event (EventBase)
        """
        relation = event.content.get("m.relates_to")
        if not relation:
            # No relations
            return

        rel_type = relation.get("rel_type")
        if rel_type not in (
            RelationTypes.ANNOTATION,
            RelationTypes.REFERENCE,
            RelationTypes.REPLACE,
        ):
            # Unknown relation type
            return

        parent_id = relation.get("event_id")
        if not parent_id:
            # Invalid relation
            return

        aggregation_key = relation.get("key")

        self.db_pool.simple_insert_txn(
            txn,
            table="event_relations",
            values={
                "event_id": event.event_id,
                "relates_to_id": parent_id,
                "relation_type": rel_type,
                "aggregation_key": aggregation_key,
            },
        )

        txn.call_after(self.store.get_relations_for_event.invalidate_many, (parent_id,))
        txn.call_after(
            self.store.get_aggregation_groups_for_event.invalidate_many, (parent_id,)
        )

        if rel_type == RelationTypes.REPLACE:
            txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))
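
    # Example relation content (hedged): an annotation such as
    #   {"m.relates_to": {"rel_type": "m.annotation",
    #                     "event_id": "$parent", "key": "👍"}}
    # produces an event_relations row of
    #   (event_id, "$parent", "m.annotation", "👍").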

    def _handle_redaction(self, txn, redacted_event_id):
        """Handles receiving a redaction and checking whether we need to remove
        any redacted relations from the database.

        Args:
            txn
            redacted_event_id (str): The event that was redacted.
        """

        self.db_pool.simple_delete_txn(
            txn, table="event_relations", keyvalues={"event_id": redacted_event_id}
        )

    def _store_room_topic_txn(self, txn, event):
        if hasattr(event, "content") and "topic" in event.content:
            self.store_event_search_txn(
                txn, event, "content.topic", event.content["topic"]
            )

    def _store_room_name_txn(self, txn, event):
        if hasattr(event, "content") and "name" in event.content:
            self.store_event_search_txn(
                txn, event, "content.name", event.content["name"]
            )

    def _store_room_message_txn(self, txn, event):
        if hasattr(event, "content") and "body" in event.content:
            self.store_event_search_txn(
                txn, event, "content.body", event.content["body"]
            )

    def _store_retention_policy_for_room_txn(self, txn, event):
        if hasattr(event, "content") and (
            "min_lifetime" in event.content or "max_lifetime" in event.content
        ):
            if (
                "min_lifetime" in event.content
                and not isinstance(event.content.get("min_lifetime"), int)
            ) or (
                "max_lifetime" in event.content
                and not isinstance(event.content.get("max_lifetime"), int)
            ):
                # Ignore the event if one of the values isn't an integer.
                return

            self.db_pool.simple_insert_txn(
                txn=txn,
                table="room_retention",
                values={
                    "room_id": event.room_id,
                    "event_id": event.event_id,
                    "min_lifetime": event.content.get("min_lifetime"),
                    "max_lifetime": event.content.get("max_lifetime"),
                },
            )

            self.store._invalidate_cache_and_stream(
                txn, self.store.get_retention_policy_for_room, (event.room_id,)
            )

    def store_event_search_txn(self, txn, event, key, value):
        """Add event to the search table

        Args:
            txn (cursor):
            event (EventBase):
            key (str):
            value (str):
        """
        self.store.store_search_entries_txn(
            txn,
            (
                SearchEntry(
                    key=key,
                    value=value,
                    event_id=event.event_id,
                    room_id=event.room_id,
                    stream_ordering=event.internal_metadata.stream_ordering,
                    origin_server_ts=event.origin_server_ts,
                ),
            ),
        )

    def _set_push_actions_for_event_and_users_txn(
        self, txn, events_and_contexts, all_events_and_contexts
    ):
        """Handles moving push actions from staging table to main
        event_push_actions table for all events in `events_and_contexts`.

        Also ensures that all events in `all_events_and_contexts` are removed
        from the push action staging area.

        Args:
            events_and_contexts (list[(EventBase, EventContext)]): events
                we are persisting
            all_events_and_contexts (list[(EventBase, EventContext)]): all
                events that we were going to persist. This includes events
                we've already persisted, etc, that wouldn't appear in
                events_and_contexts.
        """

        sql = """
            INSERT INTO event_push_actions (
                room_id, event_id, user_id, actions, stream_ordering,
                topological_ordering, notif, highlight
            )
            SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight
            FROM event_push_actions_staging
            WHERE event_id = ?
        """

        if events_and_contexts:
            txn.executemany(
                sql,
                (
                    (
                        event.room_id,
                        event.internal_metadata.stream_ordering,
                        event.depth,
                        event.event_id,
                    )
                    for event, _ in events_and_contexts
                ),
            )

        for event, _ in events_and_contexts:
            user_ids = self.db_pool.simple_select_onecol_txn(
                txn,
                table="event_push_actions_staging",
                keyvalues={"event_id": event.event_id},
                retcol="user_id",
            )

            for uid in user_ids:
                txn.call_after(
                    self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many,
                    (event.room_id, uid),
                )

        # Now we delete the staging area for *all* events that were being
        # persisted.
        txn.executemany(
            "DELETE FROM event_push_actions_staging WHERE event_id = ?",
            ((event.event_id,) for event, _ in all_events_and_contexts),
        )
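
    # Flow sketch: the push rule evaluator writes per-user rows into
    # event_push_actions_staging before persistence; the INSERT ... SELECT
    # above copies each row into event_push_actions with the room, stream and
    # topological coordinates filled in, and the final DELETE clears staging
    # rows for *every* event in the batch (hence `all_events_and_contexts`,
    # which also covers events filtered out earlier).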

    def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
        # Sad that we have to blow away the cache for the whole room here
        txn.call_after(
            self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many,
            (room_id,),
        )
        txn.execute(
            "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
            (room_id, event_id),
        )

    def _store_rejections_txn(self, txn, event_id, reason):
        self.db_pool.simple_insert_txn(
            txn,
            table="rejections",
            values={
                "event_id": event_id,
                "reason": reason,
                "last_check": self._clock.time_msec(),
            },
        )

    def _store_event_state_mappings_txn(
        self, txn, events_and_contexts: Iterable[Tuple[EventBase, EventContext]]
    ):
        state_groups = {}
        for event, context in events_and_contexts:
            if event.internal_metadata.is_outlier():
                continue

            # if the event was rejected, just give it the same state as its
            # predecessor.
            if context.rejected:
                state_groups[event.event_id] = context.state_group_before_event
                continue

            state_groups[event.event_id] = context.state_group

        self.db_pool.simple_insert_many_txn(
            txn,
            table="event_to_state_groups",
            values=[
                {"state_group": state_group_id, "event_id": event_id}
                for event_id, state_group_id in state_groups.items()
            ],
        )

        for event_id, state_group_id in state_groups.items():
            txn.call_after(
                self.store._get_state_group_for_event.prefill,
                (event_id,),
                state_group_id,
            )

    def _update_min_depth_for_room_txn(self, txn, room_id, depth):
        min_depth = self.store._get_min_depth_interaction(txn, room_id)

        if min_depth is not None and depth >= min_depth:
            return

        self.db_pool.simple_upsert_txn(
            txn,
            table="room_depth",
            keyvalues={"room_id": room_id},
            values={"min_depth": depth},
        )

    def _handle_mult_prev_events(self, txn, events):
        """
        For the given events, update the event edges table and forward and
        backward extremities tables.
        """
        self.db_pool.simple_insert_many_txn(
            txn,
            table="event_edges",
            values=[
                {
                    "event_id": ev.event_id,
                    "prev_event_id": e_id,
                    "room_id": ev.room_id,
                    "is_state": False,
                }
                for ev in events
                for e_id in ev.prev_event_ids()
            ],
        )

        self._update_backward_extremeties(txn, events)

    def _update_backward_extremeties(self, txn, events):
        """Updates the event_backward_extremities table based on the new/updated
        events being persisted.

        This is called for new events *and* for events that were outliers, but
        are now being persisted as non-outliers.

        Forward extremities are handled when we first start persisting the events.
        """
        events_by_room = {}
        for ev in events:
            events_by_room.setdefault(ev.room_id, []).append(ev)

        query = (
            "INSERT INTO event_backward_extremities (event_id, room_id)"
            " SELECT ?, ? WHERE NOT EXISTS ("
            " SELECT 1 FROM event_backward_extremities"
            " WHERE event_id = ? AND room_id = ?"
            " )"
            " AND NOT EXISTS ("
            " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
            " AND outlier = ?"
            " )"
        )

        txn.executemany(
            query,
            [
                (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
                for ev in events
                for e_id in ev.prev_event_ids()
                if not ev.internal_metadata.is_outlier()
            ],
        )

        query = (
            "DELETE FROM event_backward_extremities"
            " WHERE event_id = ? AND room_id = ?"
        )
        txn.executemany(
            query,
            [
                (ev.event_id, ev.room_id)
                for ev in events
                if not ev.internal_metadata.is_outlier()
            ],
        )
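
    # Sketch: persisting a non-outlier event N with prev_events = [P] inserts
    # P as a backward extremity unless P is already one or P is itself stored
    # as a non-outlier; N itself is then removed from the backward-extremity
    # set, since we now have it.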