# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2017 Vector Creations Ltd
# Copyright 2018-2019 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This module is responsible for getting events from the DB for pagination
and event streaming.

The order it returns events in depends on whether we are streaming forwards or
are paginating backwards. We do this because we want to handle out-of-order
messages nicely, while still returning them in the correct order when we
paginate backwards.

This is implemented by keeping two ordering columns: stream_ordering and
topological_ordering. Stream ordering is basically insertion/received order
(except for events from backfill requests). The topological_ordering is a
weak ordering of events based on the pdu graph.

This means that we have to have two different types of tokens, depending on
what sort order was used:
    - stream tokens are of the form: "s%d", which maps directly to the column
    - topological tokens: "t%d-%d", where the integers map to the topological
      and stream ordering columns respectively.
"""
import logging
from typing import (
    TYPE_CHECKING,
    Any,
    Collection,
    Dict,
    List,
    Optional,
    Set,
    Tuple,
    cast,
    overload,
)

import attr
from immutabledict import immutabledict
from typing_extensions import Literal

from twisted.internet import defer

from synapse.api.constants import Direction
from synapse.api.filtering import Filter
from synapse.events import EventBase
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import trace
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
    DatabasePool,
    LoggingDatabaseConnection,
    LoggingTransaction,
    make_in_list_sql_clause,
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import PersistedEventPosition, RoomStreamToken
from synapse.util.caches.descriptors import cached
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.cancellation import cancellable

if TYPE_CHECKING:
    from synapse.server import HomeServer

logger = logging.getLogger(__name__)

MAX_STREAM_SIZE = 1000

_STREAM_TOKEN = "stream"
_TOPOLOGICAL_TOKEN = "topological"


# Used as return values for pagination APIs
@attr.s(slots=True, frozen=True, auto_attribs=True)
class _EventDictReturn:
    event_id: str
    topological_ordering: Optional[int]
    stream_ordering: int


@attr.s(slots=True, frozen=True, auto_attribs=True)
class _EventsAround:
    events_before: List[EventBase]
    events_after: List[EventBase]
    start: RoomStreamToken
    end: RoomStreamToken
def generate_pagination_where_clause(
    direction: Direction,
    column_names: Tuple[str, str],
    from_token: Optional[Tuple[Optional[int], int]],
    to_token: Optional[Tuple[Optional[int], int]],
    engine: BaseDatabaseEngine,
) -> str:
    """Creates an SQL expression to bound the columns by the pagination
    tokens.

    For example creates an SQL expression like:

        (6, 7) >= (topological_ordering, stream_ordering)
        AND (5, 3) < (topological_ordering, stream_ordering)

    would be generated for dir=b, from_token=(6, 7) and to_token=(5, 3).

    Note that tokens are considered to be after the row they are in, e.g. if
    a row A has a token T, then we consider A to be before T. This convention
    is important when figuring out inequalities for the generated SQL, and
    produces the following result:
        - If paginating forwards then we exclude any rows matching the from
          token, but include those that match the to token.
        - If paginating backwards then we include any rows matching the from
          token, but exclude those that match the to token.

    Args:
        direction: Whether we're paginating backwards or forwards.
        column_names: The column names to bound. Must *not* be user defined as
            these get inserted directly into the SQL statement without escapes.
        from_token: The start point for the pagination. This is an exclusive
            minimum bound if direction is forwards, and an inclusive maximum bound if
            direction is backwards.
        to_token: The end point for the pagination. This is an inclusive
            maximum bound if direction is forwards, and an exclusive minimum bound if
            direction is backwards.
        engine: The database engine to generate the clauses for

    Returns:
        The sql expression
    """
    where_clause = []
    if from_token:
        where_clause.append(
            _make_generic_sql_bound(
                bound=">=" if direction == Direction.BACKWARDS else "<",
                column_names=column_names,
                values=from_token,
                engine=engine,
            )
        )

    if to_token:
        where_clause.append(
            _make_generic_sql_bound(
                bound="<" if direction == Direction.BACKWARDS else ">=",
                column_names=column_names,
                values=to_token,
                engine=engine,
            )
        )

    return " AND ".join(where_clause)
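# A minimal usage sketch of the above (values taken from the docstring's
# example; the exact SQL shown assumes a Postgres engine, which supports
# row-value comparisons directly):
#
#   generate_pagination_where_clause(
#       direction=Direction.BACKWARDS,
#       column_names=("topological_ordering", "stream_ordering"),
#       from_token=(6, 7),
#       to_token=(5, 3),
#       engine=engine,  # assumed here to be a PostgresEngine instance
#   )
#   # -> "((6,7) >= (topological_ordering,stream_ordering))
#   #     AND ((5,3) < (topological_ordering,stream_ordering))"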
def generate_pagination_bounds(
    direction: Direction,
    from_token: Optional[RoomStreamToken],
    to_token: Optional[RoomStreamToken],
) -> Tuple[
    str, Optional[Tuple[Optional[int], int]], Optional[Tuple[Optional[int], int]]
]:
    """
    Generate a start and end point for this page of events.

    Args:
        direction: Whether pagination is going forwards or backwards.
        from_token: The token to start pagination at, or None to start at the first value.
        to_token: The token to end pagination at, or None to not limit the end point.

    Returns:
        A three tuple of:

            ASC or DESC for sorting of the query.

            The starting position as a tuple of ints representing
            (topological position, stream position) or None if no from_token was
            provided. The topological position may be None for live tokens.

            The end position in the same format as the starting position, or None
            if no to_token was provided.
    """

    # Tokens really represent positions between elements, but we use
    # the convention of pointing to the event before the gap. Hence
    # we have a bit of asymmetry when it comes to equalities.
    if direction == Direction.BACKWARDS:
        order = "DESC"
    else:
        order = "ASC"

    # The bounds for the stream tokens are complicated by the fact
    # that we need to handle the instance_map part of the tokens. We do this
    # by fetching all events between the min stream token and the maximum
    # stream token (as returned by `RoomStreamToken.get_max_stream_pos`) and
    # then filtering the results.
    from_bound: Optional[Tuple[Optional[int], int]] = None
    if from_token:
        if from_token.topological is not None:
            from_bound = from_token.as_historical_tuple()
        elif direction == Direction.BACKWARDS:
            from_bound = (
                None,
                from_token.get_max_stream_pos(),
            )
        else:
            from_bound = (
                None,
                from_token.stream,
            )

    to_bound: Optional[Tuple[Optional[int], int]] = None
    if to_token:
        if to_token.topological is not None:
            to_bound = to_token.as_historical_tuple()
        elif direction == Direction.BACKWARDS:
            to_bound = (
                None,
                to_token.stream,
            )
        else:
            to_bound = (
                None,
                to_token.get_max_stream_pos(),
            )

    return order, from_bound, to_bound
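# For example (hypothetical tokens): paginating backwards from the historical
# token "t6-7" with no end point yields ("DESC", (6, 7), None), whereas a
# *live* from_token instead yields a from_bound of (None, <max stream pos>),
# the deliberate over-fetch that `_filter_results` later narrows down.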
def generate_next_token(
    direction: Direction, last_topo_ordering: int, last_stream_ordering: int
) -> RoomStreamToken:
    """
    Generate the next room stream token based on the currently returned data.

    Args:
        direction: Whether pagination is going forwards or backwards.
        last_topo_ordering: The last topological ordering being returned.
        last_stream_ordering: The last stream ordering being returned.

    Returns:
        A new RoomStreamToken to return to the client.
    """
    if direction == Direction.BACKWARDS:
        # Tokens are positions between events.
        # This token points *after* the last event in the chunk.
        # We need it to point to the event before it in the chunk
        # when we are going backwards so we subtract one from the
        # stream part.
        last_stream_ordering -= 1
    return RoomStreamToken(last_topo_ordering, last_stream_ordering)
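# E.g. (made-up orderings): after returning a backwards chunk whose last row
# is at (topological=5, stream=3), generate_next_token(Direction.BACKWARDS,
# 5, 3) yields RoomStreamToken(5, 2), pointing just *before* that row so the
# next page continues from the right place.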
def _make_generic_sql_bound(
    bound: str,
    column_names: Tuple[str, str],
    values: Tuple[Optional[int], int],
    engine: BaseDatabaseEngine,
) -> str:
    """Create an SQL expression that bounds the given column names by the
    values, e.g. create the equivalent of `(1, 2) < (col1, col2)`.

    Only works with two columns.

    Older versions of SQLite don't support that syntax so we have to expand it
    out manually.

    Args:
        bound: The comparison operator to use. One of ">", "<", ">=",
            "<=", where the values are on the left and columns on the right.
        column_names: The column names. Must *not* be user defined
            as these get inserted directly into the SQL statement without
            escapes.
        values: The values to bound the columns by. If
            the first value is None then only creates a bound on the second
            column.
        engine: The database engine to generate the SQL for

    Returns:
        The SQL statement
    """

    assert bound in (">", "<", ">=", "<=")

    name1, name2 = column_names
    val1, val2 = values

    if val1 is None:
        val2 = int(val2)
        return "(%d %s %s)" % (val2, bound, name2)

    val1 = int(val1)
    val2 = int(val2)

    if isinstance(engine, PostgresEngine):
        # Postgres doesn't optimise ``(x < a) OR (x=a AND y<b)`` as well
        # as it optimises ``(x,y) < (a,b)`` on multicolumn indexes. So we
        # use the latter form when running against postgres.
        return "((%d,%d) %s (%s,%s))" % (val1, val2, bound, name1, name2)

    # We want to generate queries of e.g. the form:
    #
    #   (val1 < name1 OR (val1 = name1 AND val2 <= name2))
    #
    # which is equivalent to (val1, val2) < (name1, name2)

    return """(
        {val1:d} {strict_bound} {name1}
        OR ({val1:d} = {name1} AND {val2:d} {bound} {name2})
    )""".format(
        name1=name1,
        val1=val1,
        name2=name2,
        val2=val2,
        strict_bound=bound[0],  # The first bound must always be a strict inequality
        bound=bound,
    )
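# Example outputs (illustrative) for bound="<" and column_names=("c1", "c2"):
#   values=(1, 2) on Postgres: ((1,2) < (c1,c2))
#   values=(1, 2) on SQLite:   (1 < c1 OR (1 = c1 AND 2 < c2))
#   values=(None, 2):          (2 < c2), i.e. only the second column is bounded.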
def _filter_results(
    lower_token: Optional[RoomStreamToken],
    upper_token: Optional[RoomStreamToken],
    instance_name: str,
    topological_ordering: int,
    stream_ordering: int,
) -> bool:
    """Returns True if the event persisted by the given instance at the given
    topological/stream_ordering falls between the two tokens (taking a None
    token to mean unbounded).

    Used to filter results from fetching events in the DB against the given
    tokens. This is necessary to handle the case where the tokens include
    position maps, which we handle by fetching more than necessary from the DB
    and then filtering (rather than attempting to construct a complicated SQL
    query).
    """

    event_historical_tuple = (
        topological_ordering,
        stream_ordering,
    )

    if lower_token:
        if lower_token.topological is not None:
            # If these are historical tokens we compare the `(topological, stream)`
            # tuples.
            if event_historical_tuple <= lower_token.as_historical_tuple():
                return False
        else:
            # If these are live tokens we compare the stream ordering against the
            # writer's stream position.
            if stream_ordering <= lower_token.get_stream_pos_for_instance(
                instance_name
            ):
                return False

    if upper_token:
        if upper_token.topological is not None:
            if upper_token.as_historical_tuple() < event_historical_tuple:
                return False
        else:
            if upper_token.get_stream_pos_for_instance(instance_name) < stream_ordering:
                return False

    return True
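# For instance (hypothetical values): with a historical lower_token at
# (topological=5, stream=3), an event at exactly (5, 3) is filtered out
# (tokens sit *after* their row) while an event at (5, 4) passes; with a live
# lower_token the comparison instead uses the named writer's stream position
# from the token's instance map.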
def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
    # NB: This may create SQL clauses that don't optimise well (and we don't
    # have indices on all possible clauses). E.g. it may create
    # "room_id == X AND room_id != X", which postgres doesn't optimise.

    if not event_filter:
        return "", []

    clauses = []
    args = []

    if event_filter.types:
        clauses.append(
            "(%s)" % " OR ".join("event.type = ?" for _ in event_filter.types)
        )
        args.extend(event_filter.types)

    for typ in event_filter.not_types:
        clauses.append("event.type != ?")
        args.append(typ)

    if event_filter.senders:
        clauses.append(
            "(%s)" % " OR ".join("event.sender = ?" for _ in event_filter.senders)
        )
        args.extend(event_filter.senders)

    for sender in event_filter.not_senders:
        clauses.append("event.sender != ?")
        args.append(sender)

    if event_filter.rooms:
        clauses.append(
            "(%s)" % " OR ".join("event.room_id = ?" for _ in event_filter.rooms)
        )
        args.extend(event_filter.rooms)

    for room_id in event_filter.not_rooms:
        clauses.append("event.room_id != ?")
        args.append(room_id)

    if event_filter.contains_url:
        clauses.append("event.contains_url = ?")
        args.append(event_filter.contains_url)

    # We're only applying the "labels" filter on the database query, because applying the
    # "not_labels" filter via a SQL query is non-trivial. Instead, we let
    # event_filter.check_fields apply it, which is not as efficient but makes the
    # implementation simpler.
    if event_filter.labels:
        clauses.append("(%s)" % " OR ".join("label = ?" for _ in event_filter.labels))
        args.extend(event_filter.labels)

    # Filter on relation_senders / relation types from the joined tables.
    if event_filter.related_by_senders:
        clauses.append(
            "(%s)"
            % " OR ".join(
                "related_event.sender = ?" for _ in event_filter.related_by_senders
            )
        )
        args.extend(event_filter.related_by_senders)

    if event_filter.related_by_rel_types:
        clauses.append(
            "(%s)"
            % " OR ".join(
                "relation_type = ?" for _ in event_filter.related_by_rel_types
            )
        )
        args.extend(event_filter.related_by_rel_types)

    if event_filter.rel_types:
        clauses.append(
            "(%s)"
            % " OR ".join(
                "event_relation.relation_type = ?" for _ in event_filter.rel_types
            )
        )
        args.extend(event_filter.rel_types)

    if event_filter.not_rel_types:
        clauses.append(
            "((%s) OR event_relation.relation_type IS NULL)"
            % " AND ".join(
                "event_relation.relation_type != ?" for _ in event_filter.not_rel_types
            )
        )
        args.extend(event_filter.not_rel_types)

    return " AND ".join(clauses), args
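# A rough sketch of the output (assuming a Filter whose `types` is
# ["m.room.message"] and whose `not_senders` is ["@spam:example.com"], both
# hypothetical values):
#   clause: "(event.type = ?) AND event.sender != ?"
#   args:   ["m.room.message", "@spam:example.com"]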
class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
    def __init__(
        self,
        database: DatabasePool,
        db_conn: LoggingDatabaseConnection,
        hs: "HomeServer",
    ):
        super().__init__(database, db_conn, hs)

        self._instance_name = hs.get_instance_name()
        self._send_federation = hs.should_send_federation()
        self._federation_shard_config = hs.config.worker.federation_shard_config

        # If we're a process that sends federation we may need to reset the
        # `federation_stream_position` table to match the current sharding
        # config. We don't do this now as otherwise two processes could conflict
        # during startup which would cause one to die.
        self._need_to_reset_federation_stream_positions = self._send_federation

        events_max = self.get_room_max_stream_ordering()
        event_cache_prefill, min_event_val = self.db_pool.get_cache_dict(
            db_conn,
            "events",
            entity_column="room_id",
            stream_column="stream_ordering",
            max_value=events_max,
        )
        self._events_stream_cache = StreamChangeCache(
            "EventsRoomStreamChangeCache",
            min_event_val,
            prefilled_cache=event_cache_prefill,
        )
        self._membership_stream_cache = StreamChangeCache(
            "MembershipStreamChangeCache", events_max
        )

        self._stream_order_on_start = self.get_room_max_stream_ordering()
        self._min_stream_order_on_start = self.get_room_min_stream_ordering()

    def get_room_max_stream_ordering(self) -> int:
        """Get the stream_ordering of regular events that we have committed up to

        Returns the maximum stream id such that all stream ids less than or
        equal to it have been successfully persisted.
        """
        return self._stream_id_gen.get_current_token()

    def get_room_min_stream_ordering(self) -> int:
        """Get the stream_ordering of backfilled events that we have committed up to

        Backfilled events use *negative* stream orderings, so this returns the
        minimum negative stream id such that all stream ids greater than or
        equal to it have been successfully persisted.
        """
        return self._backfill_id_gen.get_current_token()

    def get_room_max_token(self) -> RoomStreamToken:
        """Get a `RoomStreamToken` that marks the current maximum persisted
        position of the events stream. Useful to get a token that represents
        "now".

        The token returned is a "live" token that may have an instance_map
        component.
        """

        min_pos = self._stream_id_gen.get_current_token()

        positions = {}
        if isinstance(self._stream_id_gen, MultiWriterIdGenerator):
            # The `min_pos` is the minimum position that we know all instances
            # have finished persisting to, so we only care about instances whose
            # positions are ahead of that. (Instance positions can be behind the
            # min position as there are times we can work out that the minimum
            # position is ahead of the naive minimum across all current
            # positions. See MultiWriterIdGenerator for details)
            positions = {
                i: p
                for i, p in self._stream_id_gen.get_positions().items()
                if p > min_pos
            }

        return RoomStreamToken(None, min_pos, immutabledict(positions))
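    # Illustrative shape of the returned live token (made-up worker name and
    # positions): if the minimum persisted position is 10 but a writer
    # "event_writer1" has persisted up to 12, this returns
    # RoomStreamToken(None, 10, immutabledict({"event_writer1": 12})).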
    async def get_room_events_stream_for_rooms(
        self,
        room_ids: Collection[str],
        from_key: RoomStreamToken,
        to_key: RoomStreamToken,
        limit: int = 0,
        order: str = "DESC",
    ) -> Dict[str, Tuple[List[EventBase], RoomStreamToken]]:
        """Get new room events in stream ordering since `from_key`.

        Args:
            room_ids
            from_key: Token from which no events are returned before
            to_key: Token from which no events are returned after. (This
                is typically the current stream token)
            limit: Maximum number of events to return
            order: Either "DESC" or "ASC". Determines which events are
                returned when the result is limited. If "DESC" then the most
                recent `limit` events are returned, otherwise returns the
                oldest `limit` events.

        Returns:
            A map from room id to a tuple containing:
                - list of recent events in the room
                - stream ordering key for the start of the chunk of events returned.
        """
        room_ids = self._events_stream_cache.get_entities_changed(
            room_ids, from_key.stream
        )

        if not room_ids:
            return {}

        results = {}
        room_ids = list(room_ids)
        for rm_ids in (room_ids[i : i + 20] for i in range(0, len(room_ids), 20)):
            res = await make_deferred_yieldable(
                defer.gatherResults(
                    [
                        run_in_background(
                            self.get_room_events_stream_for_room,
                            room_id,
                            from_key,
                            to_key,
                            limit,
                            order=order,
                        )
                        for room_id in rm_ids
                    ],
                    consumeErrors=True,
                )
            )
            results.update(dict(zip(rm_ids, res)))

        return results

    def get_rooms_that_changed(
        self, room_ids: Collection[str], from_key: RoomStreamToken
    ) -> Set[str]:
        """Given a list of rooms and a token, return rooms where there may have
        been changes.
        """
        from_id = from_key.stream
        return {
            room_id
            for room_id in room_ids
            if self._events_stream_cache.has_entity_changed(room_id, from_id)
        }

    async def get_room_events_stream_for_room(
        self,
        room_id: str,
        from_key: RoomStreamToken,
        to_key: RoomStreamToken,
        limit: int = 0,
        order: str = "DESC",
    ) -> Tuple[List[EventBase], RoomStreamToken]:
        """Get new room events in stream ordering since `from_key`.

        Args:
            room_id
            from_key: Token from which no events are returned before
            to_key: Token from which no events are returned after. (This
                is typically the current stream token)
            limit: Maximum number of events to return
            order: Either "DESC" or "ASC". Determines which events are
                returned when the result is limited. If "DESC" then the most
                recent `limit` events are returned, otherwise returns the
                oldest `limit` events.

        Returns:
            The list of events (in ascending stream order) and the token from the start
            of the chunk of events returned.
        """
        if from_key == to_key:
            return [], from_key

        has_changed = self._events_stream_cache.has_entity_changed(
            room_id, from_key.stream
        )

        if not has_changed:
            return [], from_key

        def f(txn: LoggingTransaction) -> List[_EventDictReturn]:
            # To handle tokens with a non-empty instance_map we fetch more
            # results than necessary and then filter down
            min_from_id = from_key.stream
            max_to_id = to_key.get_max_stream_pos()

            sql = """
                SELECT event_id, instance_name, topological_ordering, stream_ordering
                FROM events
                WHERE
                    room_id = ?
                    AND not outlier
                    AND stream_ordering > ? AND stream_ordering <= ?
                ORDER BY stream_ordering %s LIMIT ?
            """ % (
                order,
            )
            txn.execute(sql, (room_id, min_from_id, max_to_id, 2 * limit))

            rows = [
                _EventDictReturn(event_id, None, stream_ordering)
                for event_id, instance_name, topological_ordering, stream_ordering in txn
                if _filter_results(
                    from_key,
                    to_key,
                    instance_name,
                    topological_ordering,
                    stream_ordering,
                )
            ][:limit]
            return rows

        rows = await self.db_pool.runInteraction("get_room_events_stream_for_room", f)

        ret = await self.get_events_as_list(
            [r.event_id for r in rows], get_prev_content=True
        )

        self._set_before_and_after(ret, rows, topo_order=False)

        if order.lower() == "desc":
            ret.reverse()

        if rows:
            key = RoomStreamToken(None, min(r.stream_ordering for r in rows))
        else:
            # Assume we didn't get anything because there was nothing to
            # get.
            key = from_key

        return ret, key

    @cancellable
    async def get_membership_changes_for_user(
        self,
        user_id: str,
        from_key: RoomStreamToken,
        to_key: RoomStreamToken,
        excluded_rooms: Optional[List[str]] = None,
    ) -> List[EventBase]:
        """Fetch membership events for a given user.

        All such events whose stream ordering `s` lies in the range
        `from_key < s <= to_key` are returned. Events are ordered by ascending stream
        order.
        """
        # Start by ruling out cases where a DB query is not necessary.
        if from_key == to_key:
            return []

        if from_key:
            has_changed = self._membership_stream_cache.has_entity_changed(
                user_id, int(from_key.stream)
            )
            if not has_changed:
                return []

        def f(txn: LoggingTransaction) -> List[_EventDictReturn]:
            # To handle tokens with a non-empty instance_map we fetch more
            # results than necessary and then filter down
            min_from_id = from_key.stream
            max_to_id = to_key.get_max_stream_pos()

            args: List[Any] = [user_id, min_from_id, max_to_id]

            ignore_room_clause = ""
            if excluded_rooms is not None and len(excluded_rooms) > 0:
                ignore_room_clause = "AND e.room_id NOT IN (%s)" % ",".join(
                    "?" for _ in excluded_rooms
                )
                args = args + excluded_rooms

            sql = """
                SELECT m.event_id, instance_name, topological_ordering, stream_ordering
                FROM events AS e, room_memberships AS m
                WHERE e.event_id = m.event_id
                    AND m.user_id = ?
                    AND e.stream_ordering > ? AND e.stream_ordering <= ?
                    %s
                ORDER BY e.stream_ordering ASC
            """ % (
                ignore_room_clause,
            )

            txn.execute(sql, args)

            rows = [
                _EventDictReturn(event_id, None, stream_ordering)
                for event_id, instance_name, topological_ordering, stream_ordering in txn
                if _filter_results(
                    from_key,
                    to_key,
                    instance_name,
                    topological_ordering,
                    stream_ordering,
                )
            ]

            return rows

        rows = await self.db_pool.runInteraction("get_membership_changes_for_user", f)

        ret = await self.get_events_as_list(
            [r.event_id for r in rows], get_prev_content=True
        )

        self._set_before_and_after(ret, rows, topo_order=False)

        return ret

    async def get_recent_events_for_room(
        self, room_id: str, limit: int, end_token: RoomStreamToken
    ) -> Tuple[List[EventBase], RoomStreamToken]:
        """Get the most recent events in the room in topological ordering.

        Args:
            room_id
            limit
            end_token: The stream token representing now.

        Returns:
            A list of events and a token pointing to the start of the returned
            events. The events returned are in ascending topological order.
        """
        rows, token = await self.get_recent_event_ids_for_room(
            room_id, limit, end_token
        )

        events = await self.get_events_as_list(
            [r.event_id for r in rows], get_prev_content=True
        )

        self._set_before_and_after(events, rows)

        return events, token

    async def get_recent_event_ids_for_room(
        self, room_id: str, limit: int, end_token: RoomStreamToken
    ) -> Tuple[List[_EventDictReturn], RoomStreamToken]:
        """Get the most recent events in the room in topological ordering.

        Args:
            room_id
            limit
            end_token: The stream token representing now.

        Returns:
            A list of _EventDictReturn and a token pointing to the start of the
            returned events. The events returned are in ascending order.
        """
        # Allow a zero limit here, and no-op.
        if limit == 0:
            return [], end_token

        rows, token = await self.db_pool.runInteraction(
            "get_recent_event_ids_for_room",
            self._paginate_room_events_txn,
            room_id,
            from_token=end_token,
            limit=limit,
        )

        # We want to return the results in ascending order.
        rows.reverse()

        return rows, token

    async def get_room_event_before_stream_ordering(
        self, room_id: str, stream_ordering: int
    ) -> Optional[Tuple[int, int, str]]:
        """Gets details of the most recent event in a room at or before a
        stream ordering

        Args:
            room_id:
            stream_ordering:

        Returns:
            A tuple of (stream ordering, topological ordering, event_id)
        """

        def _f(txn: LoggingTransaction) -> Optional[Tuple[int, int, str]]:
            sql = """
                SELECT stream_ordering, topological_ordering, event_id
                FROM events
                LEFT JOIN rejections USING (event_id)
                WHERE room_id = ?
                    AND stream_ordering <= ?
                    AND NOT outlier
                    AND rejections.event_id IS NULL
                ORDER BY stream_ordering DESC
                LIMIT 1
            """
            txn.execute(sql, (room_id, stream_ordering))
            return cast(Optional[Tuple[int, int, str]], txn.fetchone())

        return await self.db_pool.runInteraction(
            "get_room_event_before_stream_ordering", _f
        )

    async def get_last_event_in_room_before_stream_ordering(
        self,
        room_id: str,
        end_token: RoomStreamToken,
    ) -> Optional[str]:
        """Returns the ID of the last event in a room at or before a stream ordering

        Args:
            room_id
            end_token: The token used to stream from

        Returns:
            The ID of the most recent event, or None if there are no events in the room
            before this stream ordering.
        """

        def get_last_event_in_room_before_stream_ordering_txn(
            txn: LoggingTransaction,
        ) -> Optional[str]:
            # We need to handle the fact that the stream tokens can be vector
            # clocks. We do this by getting all rows between the minimum and
            # maximum stream ordering in the token, plus one row less than the
            # minimum stream ordering. We then filter the results against the
            # token and return the first row that matches.

            sql = """
                SELECT * FROM (
                    SELECT instance_name, stream_ordering, topological_ordering, event_id
                    FROM events
                    LEFT JOIN rejections USING (event_id)
                    WHERE room_id = ?
                        AND ? < stream_ordering AND stream_ordering <= ?
                        AND NOT outlier
                        AND rejections.event_id IS NULL
                    ORDER BY stream_ordering DESC
                ) AS a
                UNION
                SELECT * FROM (
                    SELECT instance_name, stream_ordering, topological_ordering, event_id
                    FROM events
                    LEFT JOIN rejections USING (event_id)
                    WHERE room_id = ?
                        AND stream_ordering <= ?
                        AND NOT outlier
                        AND rejections.event_id IS NULL
                    ORDER BY stream_ordering DESC
                    LIMIT 1
                ) AS b
            """
            txn.execute(
                sql,
                (
                    room_id,
                    end_token.stream,
                    end_token.get_max_stream_pos(),
                    room_id,
                    end_token.stream,
                ),
            )

            for instance_name, stream_ordering, topological_ordering, event_id in txn:
                if _filter_results(
                    lower_token=None,
                    upper_token=end_token,
                    instance_name=instance_name,
                    topological_ordering=topological_ordering,
                    stream_ordering=stream_ordering,
                ):
                    return event_id

            return None

        return await self.db_pool.runInteraction(
            "get_last_event_in_room_before_stream_ordering",
            get_last_event_in_room_before_stream_ordering_txn,
        )

    async def get_current_room_stream_token_for_room_id(
        self, room_id: str
    ) -> RoomStreamToken:
        """Returns the current position of the rooms stream (historic token)."""
        stream_ordering = self.get_room_max_stream_ordering()
        topo = await self.db_pool.runInteraction(
            "_get_max_topological_txn", self._get_max_topological_txn, room_id
        )
        return RoomStreamToken(topo, stream_ordering)

    @overload
    def get_stream_id_for_event_txn(
        self,
        txn: LoggingTransaction,
        event_id: str,
        allow_none: Literal[False] = False,
    ) -> int:
        ...

    @overload
    def get_stream_id_for_event_txn(
        self,
        txn: LoggingTransaction,
        event_id: str,
        allow_none: bool = False,
    ) -> Optional[int]:
        ...

    def get_stream_id_for_event_txn(
        self,
        txn: LoggingTransaction,
        event_id: str,
        allow_none: bool = False,
    ) -> Optional[int]:
        # Type ignore: we pass keyvalues a Dict[str, str]; the function wants
        # Dict[str, Any]. I think mypy is unhappy because Dict is invariant?
        return self.db_pool.simple_select_one_onecol_txn(  # type: ignore[call-overload]
            txn=txn,
            table="events",
            keyvalues={"event_id": event_id},
            retcol="stream_ordering",
            allow_none=allow_none,
        )

    async def get_position_for_event(self, event_id: str) -> PersistedEventPosition:
        """Get the persisted position for an event"""
        row = await self.db_pool.simple_select_one(
            table="events",
            keyvalues={"event_id": event_id},
            retcols=("stream_ordering", "instance_name"),
            desc="get_position_for_event",
        )

        return PersistedEventPosition(
            row["instance_name"] or "master", row["stream_ordering"]
        )

    async def get_topological_token_for_event(self, event_id: str) -> RoomStreamToken:
        """The stream token for an event

        Args:
            event_id: The id of the event to look up a stream token for.

        Raises:
            StoreError if the event wasn't in the database.

        Returns:
            A `RoomStreamToken` topological token.
        """
        row = await self.db_pool.simple_select_one(
            table="events",
            keyvalues={"event_id": event_id},
            retcols=("stream_ordering", "topological_ordering"),
            desc="get_topological_token_for_event",
        )
        return RoomStreamToken(row["topological_ordering"], row["stream_ordering"])

    async def get_current_topological_token(self, room_id: str, stream_key: int) -> int:
        """Gets the topological token in a room after or at the given stream
        ordering.

        Args:
            room_id
            stream_key
        """
        if isinstance(self.database_engine, PostgresEngine):
            min_function = "LEAST"
        elif isinstance(self.database_engine, Sqlite3Engine):
            min_function = "MIN"
        else:
            raise RuntimeError(f"Unknown database engine {self.database_engine}")

        # This query used to be
        #    SELECT COALESCE(MIN(topological_ordering), 0) FROM events
        #    WHERE room_id = ? and events.stream_ordering >= {stream_key}
        # which returns 0 if the stream_key is newer than any event in
        # the room. That's not wrong, but it seems to interact oddly with backfill,
        # requiring a second call to /messages to actually backfill from a remote
        # homeserver.
        #
        # Instead, rollback the stream ordering to that after the most recent event in
        # this room.
        sql = f"""
            WITH fallback(max_stream_ordering) AS (
                SELECT MAX(stream_ordering)
                FROM events
                WHERE room_id = ?
            )
            SELECT COALESCE(MIN(topological_ordering), 0) FROM events
            WHERE
                room_id = ?
                AND events.stream_ordering >= {min_function}(
                    ?,
                    (SELECT max_stream_ordering FROM fallback)
                )
        """

        row = await self.db_pool.execute(
            "get_current_topological_token", None, sql, room_id, room_id, stream_key
        )
        return row[0][0] if row else 0

    def _get_max_topological_txn(self, txn: LoggingTransaction, room_id: str) -> int:
        txn.execute(
            "SELECT MAX(topological_ordering) FROM events WHERE room_id = ?",
            (room_id,),
        )

        rows = txn.fetchall()
        # An aggregate function like MAX() will always return one row per group
        # so we can safely rely on the lookup here. For example, when we look up
        # a `room_id` which does not exist, `rows` will look like `[(None,)]`
        return rows[0][0] if rows[0][0] is not None else 0

    @staticmethod
    def _set_before_and_after(
        events: List[EventBase], rows: List[_EventDictReturn], topo_order: bool = True
    ) -> None:
        """Inserts ordering information into events' internal metadata from
        the DB rows.

        Args:
            events
            rows
            topo_order: Whether the events were ordered topologically or by stream
                ordering. If true then all rows should have a non null
                topological_ordering.
        """
        for event, row in zip(events, rows):
            stream = row.stream_ordering
            if topo_order and row.topological_ordering:
                topo: Optional[int] = row.topological_ordering
            else:
                topo = None
            internal = event.internal_metadata
            internal.before = RoomStreamToken(topo, stream - 1)
            internal.after = RoomStreamToken(topo, stream)
            internal.order = (int(topo) if topo else 0, int(stream))
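    # E.g. (made-up row): for a row at (topological=4, stream=9) with
    # topo_order=True, the event's internal metadata gets
    # before=RoomStreamToken(4, 8) and after=RoomStreamToken(4, 9), matching
    # the "tokens sit after their row" convention used throughout this module.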
    async def get_events_around(
        self,
        room_id: str,
        event_id: str,
        before_limit: int,
        after_limit: int,
        event_filter: Optional[Filter] = None,
    ) -> _EventsAround:
        """Retrieve events and pagination tokens around a given event in a
        room.
        """

        results = await self.db_pool.runInteraction(
            "get_events_around",
            self._get_events_around_txn,
            room_id,
            event_id,
            before_limit,
            after_limit,
            event_filter,
        )

        events_before = await self.get_events_as_list(
            list(results["before"]["event_ids"]), get_prev_content=True
        )

        events_after = await self.get_events_as_list(
            list(results["after"]["event_ids"]), get_prev_content=True
        )

        return _EventsAround(
            events_before=events_before,
            events_after=events_after,
            start=results["before"]["token"],
            end=results["after"]["token"],
        )

    def _get_events_around_txn(
        self,
        txn: LoggingTransaction,
        room_id: str,
        event_id: str,
        before_limit: int,
        after_limit: int,
        event_filter: Optional[Filter],
    ) -> dict:
        """Retrieves event_ids and pagination tokens around a given event in a
        room.

        Args:
            room_id
            event_id
            before_limit
            after_limit
            event_filter

        Returns:
            dict
        """

        results = self.db_pool.simple_select_one_txn(
            txn,
            "events",
            keyvalues={"event_id": event_id, "room_id": room_id},
            retcols=["stream_ordering", "topological_ordering"],
        )
        # This cannot happen as `allow_none=False`.
        assert results is not None

        # Paginating backwards includes the event at the token, but paginating
        # forward doesn't.
        before_token = RoomStreamToken(
            results["topological_ordering"] - 1, results["stream_ordering"]
        )

        after_token = RoomStreamToken(
            results["topological_ordering"], results["stream_ordering"]
        )

        rows, start_token = self._paginate_room_events_txn(
            txn,
            room_id,
            before_token,
            direction=Direction.BACKWARDS,
            limit=before_limit,
            event_filter=event_filter,
        )
        events_before = [r.event_id for r in rows]

        rows, end_token = self._paginate_room_events_txn(
            txn,
            room_id,
            after_token,
            direction=Direction.FORWARDS,
            limit=after_limit,
            event_filter=event_filter,
        )
        events_after = [r.event_id for r in rows]

        return {
            "before": {"event_ids": events_before, "token": start_token},
            "after": {"event_ids": events_after, "token": end_token},
        }

    async def get_all_new_event_ids_stream(
        self,
        from_id: int,
        current_id: int,
        limit: int,
    ) -> Tuple[int, Dict[str, Optional[int]]]:
        """Get all new events

        Returns all event ids with from_id < stream_ordering <= current_id.

        Args:
            from_id: the stream_ordering of the last event we processed
            current_id: the stream_ordering of the most recently processed event
            limit: the maximum number of events to return

        Returns:
            A tuple of (next_id, event_to_received_ts), where `next_id`
            is the next value to pass as `from_id` (it will either be the
            stream_ordering of the last returned event, or, if fewer than `limit`
            events were found, the `current_id`). The `event_to_received_ts` is
            a dictionary mapping event ID to the event `received_ts`, sorted by ascending
            stream_ordering.
        """

        def get_all_new_event_ids_stream_txn(
            txn: LoggingTransaction,
        ) -> Tuple[int, Dict[str, Optional[int]]]:
            sql = (
                "SELECT e.stream_ordering, e.event_id, e.received_ts"
                " FROM events AS e"
                " WHERE"
                " ? < e.stream_ordering AND e.stream_ordering <= ?"
                " ORDER BY e.stream_ordering ASC"
                " LIMIT ?"
            )

            txn.execute(sql, (from_id, current_id, limit))
            rows = txn.fetchall()

            upper_bound = current_id
            if len(rows) == limit:
                upper_bound = rows[-1][0]

            event_to_received_ts: Dict[str, Optional[int]] = {
                row[1]: row[2] for row in rows
            }
            return upper_bound, event_to_received_ts

        upper_bound, event_to_received_ts = await self.db_pool.runInteraction(
            "get_all_new_event_ids_stream", get_all_new_event_ids_stream_txn
        )

        return upper_bound, event_to_received_ts

    async def get_federation_out_pos(self, typ: str) -> int:
        if self._need_to_reset_federation_stream_positions:
            await self.db_pool.runInteraction(
                "_reset_federation_positions_txn", self._reset_federation_positions_txn
            )
            self._need_to_reset_federation_stream_positions = False

        return await self.db_pool.simple_select_one_onecol(
            table="federation_stream_position",
            retcol="stream_id",
            keyvalues={"type": typ, "instance_name": self._instance_name},
            desc="get_federation_out_pos",
        )

    async def update_federation_out_pos(self, typ: str, stream_id: int) -> None:
        if self._need_to_reset_federation_stream_positions:
            await self.db_pool.runInteraction(
                "_reset_federation_positions_txn", self._reset_federation_positions_txn
            )
            self._need_to_reset_federation_stream_positions = False

        await self.db_pool.simple_update_one(
            table="federation_stream_position",
            keyvalues={"type": typ, "instance_name": self._instance_name},
            updatevalues={"stream_id": stream_id},
            desc="update_federation_out_pos",
        )

    def _reset_federation_positions_txn(self, txn: LoggingTransaction) -> None:
        """Fiddles with the `federation_stream_position` table to make it match
        the configured federation sender instances during start up.
        """

        # The federation sender instances may have changed, so we need to
        # massage the `federation_stream_position` table to have a row per type
        # per instance sending federation. If there is a mismatch we update the
        # table with the correct rows using the *minimum* stream ID seen. This
        # may result in resending of events/EDUs to remote servers, but that is
        # preferable to dropping them.

        if not self._send_federation:
            return

        # Pull out the configured instances. If we don't have a shard config then
        # we assume that we're the only instance sending.
        configured_instances = self._federation_shard_config.instances
        if not configured_instances:
            configured_instances = [self._instance_name]
        elif self._instance_name not in configured_instances:
            return

        instances_in_table = self.db_pool.simple_select_onecol_txn(
            txn,
            table="federation_stream_position",
            keyvalues={},
            retcol="instance_name",
        )
        if set(instances_in_table) == set(configured_instances):
            # Nothing to do
            return

        sql = """
            SELECT type, MIN(stream_id) FROM federation_stream_position
            GROUP BY type
        """
        txn.execute(sql)
        min_positions = {typ: pos for typ, pos in txn}  # Map from type -> min position

        # Ensure we do actually have some values here
        assert set(min_positions) == {"federation", "events"}

        sql = """
            DELETE FROM federation_stream_position
            WHERE NOT (%s)
        """
        clause, args = make_in_list_sql_clause(
            txn.database_engine, "instance_name", configured_instances
        )
        txn.execute(sql % (clause,), args)

        for typ, stream_id in min_positions.items():
            self.db_pool.simple_upsert_txn(
                txn,
                table="federation_stream_position",
                keyvalues={"type": typ, "instance_name": self._instance_name},
                values={"stream_id": stream_id},
            )

    def has_room_changed_since(self, room_id: str, stream_id: int) -> bool:
        return self._events_stream_cache.has_entity_changed(room_id, stream_id)

    def _paginate_room_events_txn(
        self,
        txn: LoggingTransaction,
        room_id: str,
        from_token: RoomStreamToken,
        to_token: Optional[RoomStreamToken] = None,
        direction: Direction = Direction.BACKWARDS,
        limit: int = -1,
        event_filter: Optional[Filter] = None,
    ) -> Tuple[List[_EventDictReturn], RoomStreamToken]:
        """Returns list of events before or after a given token.

        Args:
            txn
            room_id
            from_token: The token used to stream from
            to_token: A token which if given limits the results to only those before
            direction: Indicates whether we are paginating forwards or backwards
                from `from_key`.
            limit: The maximum number of events to return.
            event_filter: If provided filters the events to
                those that match the filter.

        Returns:
            A list of _EventDictReturn and a token that points to the end of the
            result set. If no events are returned then the end of the stream has
            been reached (i.e. there are no events between `from_token` and
            `to_token`), or `limit` is zero.
        """

        args = [False, room_id]

        order, from_bound, to_bound = generate_pagination_bounds(
            direction, from_token, to_token
        )

        bounds = generate_pagination_where_clause(
            direction=direction,
            column_names=("event.topological_ordering", "event.stream_ordering"),
            from_token=from_bound,
            to_token=to_bound,
            engine=self.database_engine,
        )

        filter_clause, filter_args = filter_to_clause(event_filter)

        if filter_clause:
            bounds += " AND " + filter_clause
            args.extend(filter_args)

        # We fetch more events as we'll filter the result set
        args.append(int(limit) * 2)

        select_keywords = "SELECT"
        join_clause = ""

        # Using DISTINCT in this SELECT query is quite expensive, because it
        # requires the engine to sort on the entire (not limited) result set,
        # i.e. the entire events table. Only use it in scenarios that could result
        # in the same event ID occurring multiple times in the results.
        needs_distinct = False
        if event_filter and event_filter.labels:
            # If we're not filtering on a label, then joining on event_labels will
            # return as many rows for a single event as the number of labels it
            # has. To avoid this, only join if we're filtering on at least one label.
            join_clause += """
                LEFT JOIN event_labels
                USING (event_id, room_id, topological_ordering)
            """
            if len(event_filter.labels) > 1:
                # Multiple labels could cause the same event to appear multiple times.
                needs_distinct = True

        # If there is a relation_senders or relation_types filter, join to the
        # relations table to get events related to the current event.
        if event_filter and (
            event_filter.related_by_senders or event_filter.related_by_rel_types
        ):
            # Filtering by relations could cause the same event to appear multiple
            # times (since there's no limit on the number of relations to an event).
            needs_distinct = True
            join_clause += """
                LEFT JOIN event_relations AS relation ON (event.event_id = relation.relates_to_id)
            """
            if event_filter.related_by_senders:
                join_clause += """
                    LEFT JOIN events AS related_event ON (relation.event_id = related_event.event_id)
                """

        # If there is a not_rel_types filter, join to the relations table to get
        # the event's relation information.
        if event_filter and (event_filter.rel_types or event_filter.not_rel_types):
            join_clause += """
                LEFT JOIN event_relations AS event_relation USING (event_id)
            """

        if needs_distinct:
            select_keywords += " DISTINCT"

        sql = """
            %(select_keywords)s
                event.event_id, event.instance_name,
                event.topological_ordering, event.stream_ordering
            FROM events AS event
            %(join_clause)s
            WHERE event.outlier = ? AND event.room_id = ? AND %(bounds)s
            ORDER BY event.topological_ordering %(order)s,
            event.stream_ordering %(order)s LIMIT ?
        """ % {
            "select_keywords": select_keywords,
            "join_clause": join_clause,
            "bounds": bounds,
            "order": order,
        }

        txn.execute(sql, args)

        # Filter the result set.
        rows = [
            _EventDictReturn(event_id, topological_ordering, stream_ordering)
            for event_id, instance_name, topological_ordering, stream_ordering in txn
            if _filter_results(
                lower_token=to_token
                if direction == Direction.BACKWARDS
                else from_token,
                upper_token=from_token
                if direction == Direction.BACKWARDS
                else to_token,
                instance_name=instance_name,
                topological_ordering=topological_ordering,
                stream_ordering=stream_ordering,
            )
        ][:limit]

        if rows:
            assert rows[-1].topological_ordering is not None
            next_token = generate_next_token(
                direction, rows[-1].topological_ordering, rows[-1].stream_ordering
            )
        else:
            # TODO (erikj): We should work out what to do here instead.
            next_token = to_token if to_token else from_token

        return rows, next_token
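    # Note on the `2 * limit` over-fetch in the query above: because rows are
    # post-filtered against instance_map-aware tokens by `_filter_results`,
    # the SQL requests twice as many rows as needed and the Python side then
    # truncates to `limit`. This is a heuristic; an unusually high proportion
    # of filtered-out rows could still yield fewer than `limit` results.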
    @trace
    async def paginate_room_events(
        self,
        room_id: str,
        from_key: RoomStreamToken,
        to_key: Optional[RoomStreamToken] = None,
        direction: Direction = Direction.BACKWARDS,
        limit: int = -1,
        event_filter: Optional[Filter] = None,
    ) -> Tuple[List[EventBase], RoomStreamToken]:
        """Returns list of events before or after a given token.

        Args:
            room_id
            from_key: The token used to stream from
            to_key: A token which if given limits the results to only those before
            direction: Indicates whether we are paginating forwards or backwards
                from `from_key`.
            limit: The maximum number of events to return.
            event_filter: If provided filters the events to those that match the filter.

        Returns:
            The results as a list of events and a token that points to the end
            of the result set. If no events are returned then the end of the
            stream has been reached (i.e. there are no events between `from_key`
            and `to_key`).
        """

        rows, token = await self.db_pool.runInteraction(
            "paginate_room_events",
            self._paginate_room_events_txn,
            room_id,
            from_key,
            to_key,
            direction,
            limit,
            event_filter,
        )

        events = await self.get_events_as_list(
            [r.event_id for r in rows], get_prev_content=True
        )

        self._set_before_and_after(events, rows)

        return events, token

    @cached()
    async def get_id_for_instance(self, instance_name: str) -> int:
        """Get a unique, immutable ID that corresponds to the given Synapse worker instance."""

        def _get_id_for_instance_txn(txn: LoggingTransaction) -> int:
            instance_id = self.db_pool.simple_select_one_onecol_txn(
                txn,
                table="instance_map",
                keyvalues={"instance_name": instance_name},
                retcol="instance_id",
                allow_none=True,
            )
            if instance_id is not None:
                return instance_id

            # If we don't have an entry upsert one.
            #
            # We could do this before the first check, and rely on the cache for
            # efficiency, but each UPSERT causes the next ID to increment which
            # can quickly bloat the size of the generated IDs for new instances.
            self.db_pool.simple_upsert_txn(
                txn,
                table="instance_map",
                keyvalues={"instance_name": instance_name},
                values={},
            )

            return self.db_pool.simple_select_one_onecol_txn(
                txn,
                table="instance_map",
                keyvalues={"instance_name": instance_name},
                retcol="instance_id",
            )

        return await self.db_pool.runInteraction(
            "get_id_for_instance", _get_id_for_instance_txn
        )

    @cached()
    async def get_name_from_instance_id(self, instance_id: int) -> str:
        """Get the instance name from an ID previously returned by
        `get_id_for_instance`.
        """

        return await self.db_pool.simple_select_one_onecol(
            table="instance_map",
            keyvalues={"instance_id": instance_id},
            retcol="instance_name",
            desc="get_name_from_instance_id",
        )