Du kan inte välja fler än 25 ämnen Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.
 
 
 
 
 
 

2252 rader
81 KiB

  1. # Copyright 2016 OpenMarket Ltd
  2. # Copyright 2019 New Vector Ltd
  3. # Copyright 2019,2020 The Matrix.org Foundation C.I.C.
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import logging
  17. from typing import (
  18. TYPE_CHECKING,
  19. Any,
  20. Collection,
  21. Dict,
  22. Iterable,
  23. List,
  24. Mapping,
  25. Optional,
  26. Set,
  27. Tuple,
  28. cast,
  29. )
  30. from typing_extensions import Literal
  31. from synapse.api.constants import EduTypes
  32. from synapse.api.errors import Codes, StoreError
  33. from synapse.logging.opentracing import (
  34. get_active_span_text_map,
  35. set_tag,
  36. trace,
  37. whitelisted_homeserver,
  38. )
  39. from synapse.metrics.background_process_metrics import wrap_as_background_process
  40. from synapse.replication.tcp.streams._base import DeviceListsStream
  41. from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
  42. from synapse.storage.database import (
  43. DatabasePool,
  44. LoggingDatabaseConnection,
  45. LoggingTransaction,
  46. make_tuple_comparison_clause,
  47. )
  48. from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyWorkerStore
  49. from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
  50. from synapse.storage.types import Cursor
  51. from synapse.storage.util.id_generators import (
  52. AbstractStreamIdGenerator,
  53. StreamIdGenerator,
  54. )
  55. from synapse.types import JsonDict, StrCollection, get_verify_key_from_cross_signing_key
  56. from synapse.util import json_decoder, json_encoder
  57. from synapse.util.caches.descriptors import cached, cachedList
  58. from synapse.util.caches.lrucache import LruCache
  59. from synapse.util.caches.stream_change_cache import (
  60. AllEntitiesChangedResult,
  61. StreamChangeCache,
  62. )
  63. from synapse.util.cancellation import cancellable
  64. from synapse.util.iterutils import batch_iter
  65. from synapse.util.stringutils import shortstr
  66. if TYPE_CHECKING:
  67. from synapse.server import HomeServer
  68. logger = logging.getLogger(__name__)
  69. issue_8631_logger = logging.getLogger("synapse.8631_debug")
  70. DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = (
  71. "drop_device_list_streams_non_unique_indexes"
  72. )
  73. BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES = "remove_dup_outbound_pokes"
  74. class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
  75. def __init__(
  76. self,
  77. database: DatabasePool,
  78. db_conn: LoggingDatabaseConnection,
  79. hs: "HomeServer",
  80. ):
  81. super().__init__(database, db_conn, hs)
  82. # In the worker store this is an ID tracker which we overwrite in the non-worker
  83. # class below that is used on the main process.
  84. self._device_list_id_gen = StreamIdGenerator(
  85. db_conn,
  86. hs.get_replication_notifier(),
  87. "device_lists_stream",
  88. "stream_id",
  89. extra_tables=[
  90. ("user_signature_stream", "stream_id"),
  91. ("device_lists_outbound_pokes", "stream_id"),
  92. ("device_lists_changes_in_room", "stream_id"),
  93. ("device_lists_remote_pending", "stream_id"),
  94. ("device_lists_changes_converted_stream_position", "stream_id"),
  95. ],
  96. is_writer=hs.config.worker.worker_app is None,
  97. )
  98. device_list_max = self._device_list_id_gen.get_current_token()
  99. device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict(
  100. db_conn,
  101. "device_lists_stream",
  102. entity_column="user_id",
  103. stream_column="stream_id",
  104. max_value=device_list_max,
  105. limit=10000,
  106. )
  107. self._device_list_stream_cache = StreamChangeCache(
  108. "DeviceListStreamChangeCache",
  109. min_device_list_id,
  110. prefilled_cache=device_list_prefill,
  111. )
  112. (
  113. user_signature_stream_prefill,
  114. user_signature_stream_list_id,
  115. ) = self.db_pool.get_cache_dict(
  116. db_conn,
  117. "user_signature_stream",
  118. entity_column="from_user_id",
  119. stream_column="stream_id",
  120. max_value=device_list_max,
  121. limit=1000,
  122. )
  123. self._user_signature_stream_cache = StreamChangeCache(
  124. "UserSignatureStreamChangeCache",
  125. user_signature_stream_list_id,
  126. prefilled_cache=user_signature_stream_prefill,
  127. )
  128. (
  129. device_list_federation_prefill,
  130. device_list_federation_list_id,
  131. ) = self.db_pool.get_cache_dict(
  132. db_conn,
  133. "device_lists_outbound_pokes",
  134. entity_column="destination",
  135. stream_column="stream_id",
  136. max_value=device_list_max,
  137. limit=10000,
  138. )
  139. self._device_list_federation_stream_cache = StreamChangeCache(
  140. "DeviceListFederationStreamChangeCache",
  141. device_list_federation_list_id,
  142. prefilled_cache=device_list_federation_prefill,
  143. )
  144. if hs.config.worker.run_background_tasks:
  145. self._clock.looping_call(
  146. self._prune_old_outbound_device_pokes, 60 * 60 * 1000
  147. )
  148. def process_replication_rows(
  149. self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
  150. ) -> None:
  151. if stream_name == DeviceListsStream.NAME:
  152. self._invalidate_caches_for_devices(token, rows)
  153. return super().process_replication_rows(stream_name, instance_name, token, rows)
  154. def process_replication_position(
  155. self, stream_name: str, instance_name: str, token: int
  156. ) -> None:
  157. if stream_name == DeviceListsStream.NAME:
  158. self._device_list_id_gen.advance(instance_name, token)
  159. super().process_replication_position(stream_name, instance_name, token)
  160. def _invalidate_caches_for_devices(
  161. self, token: int, rows: Iterable[DeviceListsStream.DeviceListsStreamRow]
  162. ) -> None:
  163. for row in rows:
  164. if row.is_signature:
  165. self._user_signature_stream_cache.entity_has_changed(row.entity, token)
  166. continue
  167. # The entities are either user IDs (starting with '@') whose devices
  168. # have changed, or remote servers that we need to tell about
  169. # changes.
  170. if row.entity.startswith("@"):
  171. self._device_list_stream_cache.entity_has_changed(row.entity, token)
  172. self.get_cached_devices_for_user.invalidate((row.entity,))
  173. self._get_cached_user_device.invalidate((row.entity,))
  174. self.get_device_list_last_stream_id_for_remote.invalidate((row.entity,))
  175. else:
  176. self._device_list_federation_stream_cache.entity_has_changed(
  177. row.entity, token
  178. )
  179. def get_device_stream_token(self) -> int:
  180. return self._device_list_id_gen.get_current_token()
  181. async def count_devices_by_users(
  182. self, user_ids: Optional[Collection[str]] = None
  183. ) -> int:
  184. """Retrieve number of all devices of given users.
  185. Only returns number of devices that are not marked as hidden.
  186. Args:
  187. user_ids: The IDs of the users which owns devices
  188. Returns:
  189. Number of devices of this users.
  190. """
  191. def count_devices_by_users_txn(
  192. txn: LoggingTransaction, user_ids: Collection[str]
  193. ) -> int:
  194. sql = """
  195. SELECT count(*)
  196. FROM devices
  197. WHERE
  198. hidden = '0' AND
  199. """
  200. clause, args = make_in_list_sql_clause(
  201. txn.database_engine, "user_id", user_ids
  202. )
  203. txn.execute(sql + clause, args)
  204. return cast(Tuple[int], txn.fetchone())[0]
  205. if not user_ids:
  206. return 0
  207. return await self.db_pool.runInteraction(
  208. "count_devices_by_users", count_devices_by_users_txn, user_ids
  209. )
  210. async def get_device(
  211. self, user_id: str, device_id: str
  212. ) -> Optional[Dict[str, Any]]:
  213. """Retrieve a device. Only returns devices that are not marked as
  214. hidden.
  215. Args:
  216. user_id: The ID of the user which owns the device
  217. device_id: The ID of the device to retrieve
  218. Returns:
  219. A dict containing the device information, or `None` if the device does not
  220. exist.
  221. """
  222. return await self.db_pool.simple_select_one(
  223. table="devices",
  224. keyvalues={"user_id": user_id, "device_id": device_id, "hidden": False},
  225. retcols=("user_id", "device_id", "display_name"),
  226. desc="get_device",
  227. allow_none=True,
  228. )
  229. async def get_device_opt(
  230. self, user_id: str, device_id: str
  231. ) -> Optional[Dict[str, Any]]:
  232. """Retrieve a device. Only returns devices that are not marked as
  233. hidden.
  234. Args:
  235. user_id: The ID of the user which owns the device
  236. device_id: The ID of the device to retrieve
  237. Returns:
  238. A dict containing the device information, or None if the device does not exist.
  239. """
  240. return await self.db_pool.simple_select_one(
  241. table="devices",
  242. keyvalues={"user_id": user_id, "device_id": device_id, "hidden": False},
  243. retcols=("user_id", "device_id", "display_name"),
  244. desc="get_device",
  245. allow_none=True,
  246. )
  247. async def get_devices_by_user(self, user_id: str) -> Dict[str, Dict[str, str]]:
  248. """Retrieve all of a user's registered devices. Only returns devices
  249. that are not marked as hidden.
  250. Args:
  251. user_id:
  252. Returns:
  253. A mapping from device_id to a dict containing "device_id", "user_id"
  254. and "display_name" for each device.
  255. """
  256. devices = await self.db_pool.simple_select_list(
  257. table="devices",
  258. keyvalues={"user_id": user_id, "hidden": False},
  259. retcols=("user_id", "device_id", "display_name"),
  260. desc="get_devices_by_user",
  261. )
  262. return {d["device_id"]: d for d in devices}
  263. async def get_devices_by_auth_provider_session_id(
  264. self, auth_provider_id: str, auth_provider_session_id: str
  265. ) -> List[Dict[str, Any]]:
  266. """Retrieve the list of devices associated with a SSO IdP session ID.
  267. Args:
  268. auth_provider_id: The SSO IdP ID as defined in the server config
  269. auth_provider_session_id: The session ID within the IdP
  270. Returns:
  271. A list of dicts containing the device_id and the user_id of each device
  272. """
  273. return await self.db_pool.simple_select_list(
  274. table="device_auth_providers",
  275. keyvalues={
  276. "auth_provider_id": auth_provider_id,
  277. "auth_provider_session_id": auth_provider_session_id,
  278. },
  279. retcols=("user_id", "device_id"),
  280. desc="get_devices_by_auth_provider_session_id",
  281. )
  282. @trace
  283. async def get_device_updates_by_remote(
  284. self, destination: str, from_stream_id: int, limit: int
  285. ) -> Tuple[int, List[Tuple[str, JsonDict]]]:
  286. """Get a stream of device updates to send to the given remote server.
  287. Args:
  288. destination: The host the device updates are intended for
  289. from_stream_id: The minimum stream_id to filter updates by, exclusive
  290. limit: Maximum number of device updates to return
  291. Returns:
  292. - The current stream id (i.e. the stream id of the last update included
  293. in the response); and
  294. - The list of updates, where each update is a pair of EDU type and
  295. EDU contents.
  296. """
  297. now_stream_id = self.get_device_stream_token()
  298. has_changed = self._device_list_federation_stream_cache.has_entity_changed(
  299. destination, int(from_stream_id)
  300. )
  301. if not has_changed:
  302. # debugging for https://github.com/matrix-org/synapse/issues/14251
  303. issue_8631_logger.debug(
  304. "%s: no change between %i and %i",
  305. destination,
  306. from_stream_id,
  307. now_stream_id,
  308. )
  309. return now_stream_id, []
  310. updates = await self.db_pool.runInteraction(
  311. "get_device_updates_by_remote",
  312. self._get_device_updates_by_remote_txn,
  313. destination,
  314. from_stream_id,
  315. now_stream_id,
  316. limit,
  317. )
  318. # We need to ensure `updates` doesn't grow too big.
  319. # Currently: `len(updates) <= limit`.
  320. # Return an empty list if there are no updates
  321. if not updates:
  322. return now_stream_id, []
  323. if issue_8631_logger.isEnabledFor(logging.DEBUG):
  324. data = {(user, device): stream_id for user, device, stream_id, _ in updates}
  325. issue_8631_logger.debug(
  326. "device updates need to be sent to %s: %s", destination, data
  327. )
  328. # get the cross-signing keys of the users in the list, so that we can
  329. # determine which of the device changes were cross-signing keys
  330. users = {r[0] for r in updates}
  331. master_key_by_user = {}
  332. self_signing_key_by_user = {}
  333. for user in users:
  334. cross_signing_key = await self.get_e2e_cross_signing_key(user, "master")
  335. if cross_signing_key:
  336. key_id, verify_key = get_verify_key_from_cross_signing_key(
  337. cross_signing_key
  338. )
  339. # verify_key is a VerifyKey from signedjson, which uses
  340. # .version to denote the portion of the key ID after the
  341. # algorithm and colon, which is the device ID
  342. master_key_by_user[user] = {
  343. "key_info": cross_signing_key,
  344. "device_id": verify_key.version,
  345. }
  346. cross_signing_key = await self.get_e2e_cross_signing_key(
  347. user, "self_signing"
  348. )
  349. if cross_signing_key:
  350. key_id, verify_key = get_verify_key_from_cross_signing_key(
  351. cross_signing_key
  352. )
  353. self_signing_key_by_user[user] = {
  354. "key_info": cross_signing_key,
  355. "device_id": verify_key.version,
  356. }
  357. # Perform the equivalent of a GROUP BY
  358. #
  359. # Iterate through the updates list and copy non-duplicate
  360. # (user_id, device_id) entries into a map, with the value being
  361. # the max stream_id across each set of duplicate entries
  362. #
  363. # maps (user_id, device_id) -> (stream_id, opentracing_context)
  364. #
  365. # opentracing_context contains the opentracing metadata for the request
  366. # that created the poke
  367. #
  368. # The most recent request's opentracing_context is used as the
  369. # context which created the Edu.
  370. # This is the stream ID that we will return for the consumer to resume
  371. # following this stream later.
  372. last_processed_stream_id = from_stream_id
  373. # A map of (user ID, device ID) to (stream ID, context).
  374. query_map: Dict[Tuple[str, str], Tuple[int, Optional[str]]] = {}
  375. cross_signing_keys_by_user: Dict[str, Dict[str, object]] = {}
  376. for user_id, device_id, update_stream_id, update_context in updates:
  377. # Calculate the remaining length budget.
  378. # Note that, for now, each entry in `cross_signing_keys_by_user`
  379. # gives rise to two device updates in the result, so those cost twice
  380. # as much (and are the whole reason we need to separately calculate
  381. # the budget; we know len(updates) <= limit otherwise!)
  382. # N.B. len() on dicts is cheap since they store their size.
  383. remaining_length_budget = limit - (
  384. len(query_map) + 2 * len(cross_signing_keys_by_user)
  385. )
  386. assert remaining_length_budget >= 0
  387. is_master_key_update = (
  388. user_id in master_key_by_user
  389. and device_id == master_key_by_user[user_id]["device_id"]
  390. )
  391. is_self_signing_key_update = (
  392. user_id in self_signing_key_by_user
  393. and device_id == self_signing_key_by_user[user_id]["device_id"]
  394. )
  395. is_cross_signing_key_update = (
  396. is_master_key_update or is_self_signing_key_update
  397. )
  398. if (
  399. is_cross_signing_key_update
  400. and user_id not in cross_signing_keys_by_user
  401. ):
  402. # This will give rise to 2 device updates.
  403. # If we don't have the budget, stop here!
  404. if remaining_length_budget < 2:
  405. break
  406. if is_master_key_update:
  407. result = cross_signing_keys_by_user.setdefault(user_id, {})
  408. result["master_key"] = master_key_by_user[user_id]["key_info"]
  409. elif is_self_signing_key_update:
  410. result = cross_signing_keys_by_user.setdefault(user_id, {})
  411. result["self_signing_key"] = self_signing_key_by_user[user_id][
  412. "key_info"
  413. ]
  414. else:
  415. key = (user_id, device_id)
  416. if key not in query_map and remaining_length_budget < 1:
  417. # We don't have space for a new entry
  418. break
  419. previous_update_stream_id, _ = query_map.get(key, (0, None))
  420. if update_stream_id > previous_update_stream_id:
  421. # FIXME If this overwrites an older update, this discards the
  422. # previous OpenTracing context.
  423. # It might make it harder to track down issues using OpenTracing.
  424. # If there's a good reason why it doesn't matter, a comment here
  425. # about that would not hurt.
  426. query_map[key] = (update_stream_id, update_context)
  427. # As this update has been added to the response, advance the stream
  428. # position.
  429. last_processed_stream_id = update_stream_id
  430. # In the worst case scenario, each update is for a distinct user and is
  431. # added either to the query_map or to cross_signing_keys_by_user,
  432. # but not both:
  433. # len(query_map) + len(cross_signing_keys_by_user) <= len(updates) here,
  434. # so len(query_map) + len(cross_signing_keys_by_user) <= limit.
  435. results = await self._get_device_update_edus_by_remote(
  436. destination, from_stream_id, query_map
  437. )
  438. # len(results) <= len(query_map) here,
  439. # so len(results) + len(cross_signing_keys_by_user) <= limit.
  440. # Add the updated cross-signing keys to the results list
  441. for user_id, result in cross_signing_keys_by_user.items():
  442. result["user_id"] = user_id
  443. results.append((EduTypes.SIGNING_KEY_UPDATE, result))
  444. # also send the unstable version
  445. # FIXME: remove this when enough servers have upgraded
  446. # and remove the length budgeting above.
  447. results.append(("org.matrix.signing_key_update", result))
  448. if issue_8631_logger.isEnabledFor(logging.DEBUG):
  449. for user_id, edu in results:
  450. issue_8631_logger.debug(
  451. "device update to %s for %s from %s to %s: %s",
  452. destination,
  453. user_id,
  454. from_stream_id,
  455. last_processed_stream_id,
  456. edu,
  457. )
  458. return last_processed_stream_id, results
  459. def _get_device_updates_by_remote_txn(
  460. self,
  461. txn: LoggingTransaction,
  462. destination: str,
  463. from_stream_id: int,
  464. now_stream_id: int,
  465. limit: int,
  466. ) -> List[Tuple[str, str, int, Optional[str]]]:
  467. """Return device update information for a given remote destination
  468. Args:
  469. txn: The transaction to execute
  470. destination: The host the device updates are intended for
  471. from_stream_id: The minimum stream_id to filter updates by, exclusive
  472. now_stream_id: The maximum stream_id to filter updates by, inclusive
  473. limit: Maximum number of device updates to return
  474. Returns:
  475. List of device update tuples:
  476. - user_id
  477. - device_id
  478. - stream_id
  479. - opentracing_context
  480. """
  481. # get the list of device updates that need to be sent
  482. sql = """
  483. SELECT user_id, device_id, stream_id, opentracing_context FROM device_lists_outbound_pokes
  484. WHERE destination = ? AND ? < stream_id AND stream_id <= ?
  485. ORDER BY stream_id
  486. LIMIT ?
  487. """
  488. txn.execute(sql, (destination, from_stream_id, now_stream_id, limit))
  489. return cast(List[Tuple[str, str, int, Optional[str]]], txn.fetchall())
  490. async def _get_device_update_edus_by_remote(
  491. self,
  492. destination: str,
  493. from_stream_id: int,
  494. query_map: Dict[Tuple[str, str], Tuple[int, Optional[str]]],
  495. ) -> List[Tuple[str, dict]]:
  496. """Returns a list of device update EDUs as well as E2EE keys
  497. Args:
  498. destination: The host the device updates are intended for
  499. from_stream_id: The minimum stream_id to filter updates by, exclusive
  500. query_map: Dictionary mapping (user_id, device_id) to
  501. (update stream_id, the relevant json-encoded opentracing context)
  502. Returns:
  503. List of objects representing a device update EDU.
  504. Postconditions:
  505. The returned list has a length not exceeding that of the query_map:
  506. len(result) <= len(query_map)
  507. """
  508. devices = (
  509. await self.get_e2e_device_keys_and_signatures(
  510. # Because these are (user_id, device_id) tuples with all
  511. # device_ids not being None, the returned list's length will not
  512. # exceed that of query_map.
  513. query_map.keys(),
  514. include_all_devices=True,
  515. include_deleted_devices=True,
  516. )
  517. if query_map
  518. else {}
  519. )
  520. results = []
  521. for user_id, user_devices in devices.items():
  522. # The prev_id for the first row is always the last row before
  523. # `from_stream_id`
  524. prev_id = await self._get_last_device_update_for_remote_user(
  525. destination, user_id, from_stream_id
  526. )
  527. # make sure we go through the devices in stream order
  528. device_ids = sorted(
  529. user_devices.keys(),
  530. key=lambda i: query_map[(user_id, i)][0],
  531. )
  532. for device_id in device_ids:
  533. device = user_devices[device_id]
  534. stream_id, opentracing_context = query_map[(user_id, device_id)]
  535. result = {
  536. "user_id": user_id,
  537. "device_id": device_id,
  538. "prev_id": [prev_id] if prev_id else [],
  539. "stream_id": stream_id,
  540. }
  541. if opentracing_context != "{}":
  542. result["org.matrix.opentracing_context"] = opentracing_context
  543. prev_id = stream_id
  544. if device is not None:
  545. keys = device.keys
  546. if keys:
  547. result["keys"] = keys
  548. device_display_name = None
  549. if (
  550. self.hs.config.federation.allow_device_name_lookup_over_federation
  551. ):
  552. device_display_name = device.display_name
  553. if device_display_name:
  554. result["device_display_name"] = device_display_name
  555. else:
  556. result["deleted"] = True
  557. results.append((EduTypes.DEVICE_LIST_UPDATE, result))
  558. return results
  559. async def _get_last_device_update_for_remote_user(
  560. self, destination: str, user_id: str, from_stream_id: int
  561. ) -> int:
  562. def f(txn: LoggingTransaction) -> int:
  563. prev_sent_id_sql = """
  564. SELECT coalesce(max(stream_id), 0) as stream_id
  565. FROM device_lists_outbound_last_success
  566. WHERE destination = ? AND user_id = ? AND stream_id <= ?
  567. """
  568. txn.execute(prev_sent_id_sql, (destination, user_id, from_stream_id))
  569. rows = txn.fetchall()
  570. return rows[0][0]
  571. return await self.db_pool.runInteraction(
  572. "get_last_device_update_for_remote_user", f
  573. )
  574. async def mark_as_sent_devices_by_remote(
  575. self, destination: str, stream_id: int
  576. ) -> None:
  577. """Mark that updates have successfully been sent to the destination."""
  578. await self.db_pool.runInteraction(
  579. "mark_as_sent_devices_by_remote",
  580. self._mark_as_sent_devices_by_remote_txn,
  581. destination,
  582. stream_id,
  583. )
  584. def _mark_as_sent_devices_by_remote_txn(
  585. self, txn: LoggingTransaction, destination: str, stream_id: int
  586. ) -> None:
  587. # We update the device_lists_outbound_last_success with the successfully
  588. # poked users.
  589. sql = """
  590. SELECT user_id, coalesce(max(o.stream_id), 0)
  591. FROM device_lists_outbound_pokes as o
  592. WHERE destination = ? AND o.stream_id <= ?
  593. GROUP BY user_id
  594. """
  595. txn.execute(sql, (destination, stream_id))
  596. rows = txn.fetchall()
  597. self.db_pool.simple_upsert_many_txn(
  598. txn=txn,
  599. table="device_lists_outbound_last_success",
  600. key_names=("destination", "user_id"),
  601. key_values=[(destination, user_id) for user_id, _ in rows],
  602. value_names=("stream_id",),
  603. value_values=((stream_id,) for _, stream_id in rows),
  604. )
  605. # Delete all sent outbound pokes
  606. sql = """
  607. DELETE FROM device_lists_outbound_pokes
  608. WHERE destination = ? AND stream_id <= ?
  609. """
  610. txn.execute(sql, (destination, stream_id))
  611. async def add_user_signature_change_to_streams(
  612. self, from_user_id: str, user_ids: List[str]
  613. ) -> int:
  614. """Persist that a user has made new signatures
  615. Args:
  616. from_user_id: the user who made the signatures
  617. user_ids: the users who were signed
  618. Returns:
  619. The new stream ID.
  620. """
  621. async with self._device_list_id_gen.get_next() as stream_id:
  622. await self.db_pool.runInteraction(
  623. "add_user_sig_change_to_streams",
  624. self._add_user_signature_change_txn,
  625. from_user_id,
  626. user_ids,
  627. stream_id,
  628. )
  629. return stream_id
  630. def _add_user_signature_change_txn(
  631. self,
  632. txn: LoggingTransaction,
  633. from_user_id: str,
  634. user_ids: List[str],
  635. stream_id: int,
  636. ) -> None:
  637. txn.call_after(
  638. self._user_signature_stream_cache.entity_has_changed,
  639. from_user_id,
  640. stream_id,
  641. )
  642. self.db_pool.simple_insert_txn(
  643. txn,
  644. "user_signature_stream",
  645. values={
  646. "stream_id": stream_id,
  647. "from_user_id": from_user_id,
  648. "user_ids": json_encoder.encode(user_ids),
  649. },
  650. )
  651. @trace
  652. @cancellable
  653. async def get_user_devices_from_cache(
  654. self, user_ids: Set[str], user_and_device_ids: List[Tuple[str, str]]
  655. ) -> Tuple[Set[str], Dict[str, Mapping[str, JsonDict]]]:
  656. """Get the devices (and keys if any) for remote users from the cache.
  657. Args:
  658. user_ids: users which should have all device IDs returned
  659. user_and_device_ids: List of (user_id, device_ids)
  660. Returns:
  661. A tuple of (user_ids_not_in_cache, results_map), where
  662. user_ids_not_in_cache is a set of user_ids and results_map is a
  663. mapping of user_id -> device_id -> device_info.
  664. """
  665. unique_user_ids = user_ids | {user_id for user_id, _ in user_and_device_ids}
  666. user_map = await self.get_device_list_last_stream_id_for_remotes(
  667. list(unique_user_ids)
  668. )
  669. # We go and check if any of the users need to have their device lists
  670. # resynced. If they do then we remove them from the cached list.
  671. users_needing_resync = await self.get_user_ids_requiring_device_list_resync(
  672. unique_user_ids
  673. )
  674. user_ids_in_cache = {
  675. user_id for user_id, stream_id in user_map.items() if stream_id
  676. } - users_needing_resync
  677. user_ids_not_in_cache = unique_user_ids - user_ids_in_cache
  678. # First fetch all the users which all devices are to be returned.
  679. results: Dict[str, Mapping[str, JsonDict]] = {}
  680. for user_id in user_ids:
  681. if user_id in user_ids_in_cache:
  682. results[user_id] = await self.get_cached_devices_for_user(user_id)
  683. # Then fetch all device-specific requests, but skip users we've already
  684. # fetched all devices for.
  685. device_specific_results: Dict[str, Dict[str, JsonDict]] = {}
  686. for user_id, device_id in user_and_device_ids:
  687. if user_id in user_ids_in_cache and user_id not in user_ids:
  688. device = await self._get_cached_user_device(user_id, device_id)
  689. device_specific_results.setdefault(user_id, {})[device_id] = device
  690. results.update(device_specific_results)
  691. set_tag("in_cache", str(results))
  692. set_tag("not_in_cache", str(user_ids_not_in_cache))
  693. return user_ids_not_in_cache, results
  694. @cached(num_args=2, tree=True)
  695. async def _get_cached_user_device(self, user_id: str, device_id: str) -> JsonDict:
  696. content = await self.db_pool.simple_select_one_onecol(
  697. table="device_lists_remote_cache",
  698. keyvalues={"user_id": user_id, "device_id": device_id},
  699. retcol="content",
  700. desc="_get_cached_user_device",
  701. )
  702. return db_to_json(content)
  703. @cached()
  704. async def get_cached_devices_for_user(self, user_id: str) -> Mapping[str, JsonDict]:
  705. devices = await self.db_pool.simple_select_list(
  706. table="device_lists_remote_cache",
  707. keyvalues={"user_id": user_id},
  708. retcols=("device_id", "content"),
  709. desc="get_cached_devices_for_user",
  710. )
  711. return {
  712. device["device_id"]: db_to_json(device["content"]) for device in devices
  713. }
  714. def get_cached_device_list_changes(
  715. self,
  716. from_key: int,
  717. ) -> AllEntitiesChangedResult:
  718. """Get set of users whose devices have changed since `from_key`, or None
  719. if that information is not in our cache.
  720. """
  721. return self._device_list_stream_cache.get_all_entities_changed(from_key)
  722. @cancellable
  723. async def get_all_devices_changed(
  724. self,
  725. from_key: int,
  726. to_key: int,
  727. ) -> Set[str]:
  728. """Get all users whose devices have changed in the given range.
  729. Args:
  730. from_key: The minimum device lists stream token to query device list
  731. changes for, exclusive.
  732. to_key: The maximum device lists stream token to query device list
  733. changes for, inclusive.
  734. Returns:
  735. The set of user_ids whose devices have changed since `from_key`
  736. (exclusive) until `to_key` (inclusive).
  737. """
  738. result = self._device_list_stream_cache.get_all_entities_changed(from_key)
  739. if result.hit:
  740. # We know which users might have changed devices.
  741. if not result.entities:
  742. # If no users then we can return early.
  743. return set()
  744. # Otherwise we need to filter down the list
  745. return await self.get_users_whose_devices_changed(
  746. from_key, result.entities, to_key
  747. )
  748. # If the cache didn't tell us anything, we just need to query the full
  749. # range.
  750. sql = """
  751. SELECT DISTINCT user_id FROM device_lists_stream
  752. WHERE ? < stream_id AND stream_id <= ?
  753. """
  754. rows = await self.db_pool.execute(
  755. "get_all_devices_changed",
  756. None,
  757. sql,
  758. from_key,
  759. to_key,
  760. )
  761. return {u for u, in rows}
  762. @cancellable
  763. async def get_users_whose_devices_changed(
  764. self,
  765. from_key: int,
  766. user_ids: Collection[str],
  767. to_key: Optional[int] = None,
  768. ) -> Set[str]:
  769. """Get set of users whose devices have changed since `from_key` that
  770. are in the given list of user_ids.
  771. Args:
  772. from_key: The minimum device lists stream token to query device list changes for,
  773. exclusive.
  774. user_ids: If provided, only check if these users have changed their device lists.
  775. Otherwise changes from all users are returned.
  776. to_key: The maximum device lists stream token to query device list changes for,
  777. inclusive.
  778. Returns:
  779. The set of user_ids whose devices have changed since `from_key` (exclusive)
  780. until `to_key` (inclusive).
  781. """
  782. # Get set of users who *may* have changed. Users not in the returned
  783. # list have definitely not changed.
  784. user_ids_to_check = self._device_list_stream_cache.get_entities_changed(
  785. user_ids, from_key
  786. )
  787. # If an empty set was returned, there's nothing to do.
  788. if not user_ids_to_check:
  789. return set()
  790. if to_key is None:
  791. to_key = self._device_list_id_gen.get_current_token()
  792. def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]:
  793. sql = """
  794. SELECT DISTINCT user_id FROM device_lists_stream
  795. WHERE ? < stream_id AND stream_id <= ? AND %s
  796. """
  797. changes: Set[str] = set()
  798. # Query device changes with a batch of users at a time
  799. for chunk in batch_iter(user_ids_to_check, 100):
  800. clause, args = make_in_list_sql_clause(
  801. txn.database_engine, "user_id", chunk
  802. )
  803. txn.execute(sql % (clause,), [from_key, to_key] + args)
  804. changes.update(user_id for user_id, in txn)
  805. return changes
  806. return await self.db_pool.runInteraction(
  807. "get_users_whose_devices_changed", _get_users_whose_devices_changed_txn
  808. )
  809. async def get_users_whose_signatures_changed(
  810. self, user_id: str, from_key: int
  811. ) -> Set[str]:
  812. """Get the users who have new cross-signing signatures made by `user_id` since
  813. `from_key`.
  814. Args:
  815. user_id: the user who made the signatures
  816. from_key: The device lists stream token
  817. Returns:
  818. A set of user IDs with updated signatures.
  819. """
  820. if self._user_signature_stream_cache.has_entity_changed(user_id, from_key):
  821. sql = """
  822. SELECT DISTINCT user_ids FROM user_signature_stream
  823. WHERE from_user_id = ? AND stream_id > ?
  824. """
  825. rows = await self.db_pool.execute(
  826. "get_users_whose_signatures_changed", None, sql, user_id, from_key
  827. )
  828. return {user for row in rows for user in db_to_json(row[0])}
  829. else:
  830. return set()
    async def get_all_device_list_changes_for_remotes(
        self, instance_name: str, last_id: int, current_id: int, limit: int
    ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
        """Get updates for device lists replication stream.

        Args:
            instance_name: The writer we want to fetch updates from. Unused
                here since there is only ever one writer.
            last_id: The token to fetch updates from. Exclusive.
            current_id: The token to fetch updates up to. Inclusive.
            limit: The requested limit for the number of rows to return. The
                function may return more or fewer rows.

        Returns:
            A tuple consisting of: the updates, a token to use to fetch
            subsequent updates, and whether we returned fewer rows than exists
            between the requested tokens due to the limit.

            The token returned can be used in a subsequent call to this
            function to get further updates.

            The updates are a list of 2-tuples of stream ID and the row data,
            where the row data is a 1-tuple holding either a user ID (from
            `device_lists_stream`) or a destination (from
            `device_lists_outbound_pokes`).
        """

        # An empty token range cannot contain any updates.
        if last_id == current_id:
            return [], current_id, False

        def _get_all_device_list_changes_for_remotes(
            txn: Cursor,
        ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
            # This query Does The Right Thing where it'll correctly apply the
            # bounds to the inner queries.
            sql = """
                SELECT stream_id, entity FROM (
                    SELECT stream_id, user_id AS entity FROM device_lists_stream
                    UNION ALL
                    SELECT stream_id, destination AS entity FROM device_lists_outbound_pokes
                ) AS e
                WHERE ? < stream_id AND stream_id <= ?
                ORDER BY stream_id ASC
                LIMIT ?
            """

            txn.execute(sql, (last_id, current_id, limit))
            # Split each (stream_id, entity) row into the stream ID and the
            # remaining columns as a tuple.
            updates = [(row[0], row[1:]) for row in txn]
            limited = False
            upto_token = current_id
            if len(updates) >= limit:
                # We hit the limit, so only claim progress up to the last row
                # returned.
                # NOTE(review): the next call filters with `? < stream_id`, so if
                # more rows share this final stream_id than fit under LIMIT, the
                # remainder would be skipped. Confirm whether the two unioned
                # tables can share stream_ids in a way that triggers this.
                upto_token = updates[-1][0]
                limited = True

            return updates, upto_token, limited

        return await self.db_pool.runInteraction(
            "get_all_device_list_changes_for_remotes",
            _get_all_device_list_changes_for_remotes,
        )
  879. @cached(max_entries=10000)
  880. async def get_device_list_last_stream_id_for_remote(
  881. self, user_id: str
  882. ) -> Optional[str]:
  883. """Get the last stream_id we got for a user. May be None if we haven't
  884. got any information for them.
  885. """
  886. return await self.db_pool.simple_select_one_onecol(
  887. table="device_lists_remote_extremeties",
  888. keyvalues={"user_id": user_id},
  889. retcol="stream_id",
  890. desc="get_device_list_last_stream_id_for_remote",
  891. allow_none=True,
  892. )
  893. @cachedList(
  894. cached_method_name="get_device_list_last_stream_id_for_remote",
  895. list_name="user_ids",
  896. )
  897. async def get_device_list_last_stream_id_for_remotes(
  898. self, user_ids: Iterable[str]
  899. ) -> Dict[str, Optional[str]]:
  900. rows = await self.db_pool.simple_select_many_batch(
  901. table="device_lists_remote_extremeties",
  902. column="user_id",
  903. iterable=user_ids,
  904. retcols=("user_id", "stream_id"),
  905. desc="get_device_list_last_stream_id_for_remotes",
  906. )
  907. results: Dict[str, Optional[str]] = {user_id: None for user_id in user_ids}
  908. results.update({row["user_id"]: row["stream_id"] for row in rows})
  909. return results
  910. async def get_user_ids_requiring_device_list_resync(
  911. self,
  912. user_ids: Optional[Collection[str]] = None,
  913. ) -> Set[str]:
  914. """Given a list of remote users return the list of users that we
  915. should resync the device lists for. If None is given instead of a list,
  916. return every user that we should resync the device lists for.
  917. Returns:
  918. The IDs of users whose device lists need resync.
  919. """
  920. if user_ids:
  921. rows = await self.db_pool.simple_select_many_batch(
  922. table="device_lists_remote_resync",
  923. column="user_id",
  924. iterable=user_ids,
  925. retcols=("user_id",),
  926. desc="get_user_ids_requiring_device_list_resync_with_iterable",
  927. )
  928. else:
  929. rows = await self.db_pool.simple_select_list(
  930. table="device_lists_remote_resync",
  931. keyvalues=None,
  932. retcols=("user_id",),
  933. desc="get_user_ids_requiring_device_list_resync",
  934. )
  935. return {row["user_id"] for row in rows}
  936. async def mark_remote_users_device_caches_as_stale(
  937. self, user_ids: StrCollection
  938. ) -> None:
  939. """Records that the server has reason to believe the cache of the devices
  940. for the remote users is out of date.
  941. """
  942. def _mark_remote_users_device_caches_as_stale_txn(
  943. txn: LoggingTransaction,
  944. ) -> None:
  945. # TODO add insertion_values support to simple_upsert_many and use
  946. # that!
  947. for user_id in user_ids:
  948. self.db_pool.simple_upsert_txn(
  949. txn,
  950. table="device_lists_remote_resync",
  951. keyvalues={"user_id": user_id},
  952. values={},
  953. insertion_values={"added_ts": self._clock.time_msec()},
  954. )
  955. await self.db_pool.runInteraction(
  956. "mark_remote_users_device_caches_as_stale",
  957. _mark_remote_users_device_caches_as_stale_txn,
  958. )
  959. async def mark_remote_user_device_cache_as_valid(self, user_id: str) -> None:
  960. # Remove the database entry that says we need to resync devices, after a resync
  961. await self.db_pool.simple_delete(
  962. table="device_lists_remote_resync",
  963. keyvalues={"user_id": user_id},
  964. desc="mark_remote_user_device_cache_as_valid",
  965. )
  966. async def handle_potentially_left_users(self, user_ids: Set[str]) -> None:
  967. """Given a set of remote users check if the server still shares a room with
  968. them. If not then mark those users' device cache as stale.
  969. """
  970. if not user_ids:
  971. return
  972. await self.db_pool.runInteraction(
  973. "_handle_potentially_left_users",
  974. self.handle_potentially_left_users_txn,
  975. user_ids,
  976. )
  977. def handle_potentially_left_users_txn(
  978. self,
  979. txn: LoggingTransaction,
  980. user_ids: Set[str],
  981. ) -> None:
  982. """Given a set of remote users check if the server still shares a room with
  983. them. If not then mark those users' device cache as stale.
  984. """
  985. if not user_ids:
  986. return
  987. joined_users = self.get_users_server_still_shares_room_with_txn(txn, user_ids)
  988. left_users = user_ids - joined_users
  989. for user_id in left_users:
  990. self.mark_remote_user_device_list_as_unsubscribed_txn(txn, user_id)
  991. async def mark_remote_user_device_list_as_unsubscribed(self, user_id: str) -> None:
  992. """Mark that we no longer track device lists for remote user."""
  993. await self.db_pool.runInteraction(
  994. "mark_remote_user_device_list_as_unsubscribed",
  995. self.mark_remote_user_device_list_as_unsubscribed_txn,
  996. user_id,
  997. )
  998. def mark_remote_user_device_list_as_unsubscribed_txn(
  999. self,
  1000. txn: LoggingTransaction,
  1001. user_id: str,
  1002. ) -> None:
  1003. self.db_pool.simple_delete_txn(
  1004. txn,
  1005. table="device_lists_remote_extremeties",
  1006. keyvalues={"user_id": user_id},
  1007. )
  1008. self._invalidate_cache_and_stream(
  1009. txn, self.get_device_list_last_stream_id_for_remote, (user_id,)
  1010. )
  1011. async def get_dehydrated_device(
  1012. self, user_id: str
  1013. ) -> Optional[Tuple[str, JsonDict]]:
  1014. """Retrieve the information for a dehydrated device.
  1015. Args:
  1016. user_id: the user whose dehydrated device we are looking for
  1017. Returns:
  1018. a tuple whose first item is the device ID, and the second item is
  1019. the dehydrated device information
  1020. """
  1021. # FIXME: make sure device ID still exists in devices table
  1022. row = await self.db_pool.simple_select_one(
  1023. table="dehydrated_devices",
  1024. keyvalues={"user_id": user_id},
  1025. retcols=["device_id", "device_data"],
  1026. allow_none=True,
  1027. )
  1028. return (
  1029. (row["device_id"], json_decoder.decode(row["device_data"])) if row else None
  1030. )
  1031. def _store_dehydrated_device_txn(
  1032. self, txn: LoggingTransaction, user_id: str, device_id: str, device_data: str
  1033. ) -> Optional[str]:
  1034. old_device_id = self.db_pool.simple_select_one_onecol_txn(
  1035. txn,
  1036. table="dehydrated_devices",
  1037. keyvalues={"user_id": user_id},
  1038. retcol="device_id",
  1039. allow_none=True,
  1040. )
  1041. self.db_pool.simple_upsert_txn(
  1042. txn,
  1043. table="dehydrated_devices",
  1044. keyvalues={"user_id": user_id},
  1045. values={"device_id": device_id, "device_data": device_data},
  1046. )
  1047. return old_device_id
  1048. async def store_dehydrated_device(
  1049. self, user_id: str, device_id: str, device_data: JsonDict
  1050. ) -> Optional[str]:
  1051. """Store a dehydrated device for a user.
  1052. Args:
  1053. user_id: the user that we are storing the device for
  1054. device_id: the ID of the dehydrated device
  1055. device_data: the dehydrated device information
  1056. Returns:
  1057. device id of the user's previous dehydrated device, if any
  1058. """
  1059. return await self.db_pool.runInteraction(
  1060. "store_dehydrated_device_txn",
  1061. self._store_dehydrated_device_txn,
  1062. user_id,
  1063. device_id,
  1064. json_encoder.encode(device_data),
  1065. )
  1066. async def remove_dehydrated_device(self, user_id: str, device_id: str) -> bool:
  1067. """Remove a dehydrated device.
  1068. Args:
  1069. user_id: the user that the dehydrated device belongs to
  1070. device_id: the ID of the dehydrated device
  1071. """
  1072. count = await self.db_pool.simple_delete(
  1073. "dehydrated_devices",
  1074. {"user_id": user_id, "device_id": device_id},
  1075. desc="remove_dehydrated_device",
  1076. )
  1077. return count >= 1
    @wrap_as_background_process("prune_old_outbound_device_pokes")
    async def _prune_old_outbound_device_pokes(
        self, prune_age: int = 24 * 60 * 60 * 1000
    ) -> None:
        """Delete old entries out of the device_lists_outbound_pokes to ensure
        that we don't fill up due to dead servers.

        Normally, we try to send device updates as a delta since a previous known point:
        this is done by setting the prev_id in the m.device_list_update EDU. However,
        for that to work, we have to have a complete record of each change to
        each device, which can add up to quite a lot of data.

        An alternative mechanism is that, if the remote server sees that it has missed
        an entry in the stream_id sequence for a given user, it will request a full
        list of that user's devices. Hence, we can reduce the amount of data we have to
        store (and transmit in some future transaction), by clearing almost everything
        for a given destination out of the database, and having the remote server
        resync.

        All we need to do is make sure we keep at least one row for each
        (user, destination) pair, to remind us to send a m.device_list_update EDU for
        that user when the destination comes back. It doesn't matter which device
        we keep.

        Args:
            prune_age: age in milliseconds. Only (user, destination) pairs whose
                oldest poke is older than this, and which have more than one
                row, are pruned. Defaults to one day.
        """
        # Cutoff timestamp in ms; named for the default prune_age of one day.
        yesterday = self._clock.time_msec() - prune_age

        def _prune_txn(txn: LoggingTransaction) -> None:
            # look for (user, destination) pairs which have an update older than
            # the cutoff.
            #
            # For each pair, we also need to know the most recent stream_id, and
            # an arbitrary device_id at that stream_id.
            select_sql = """
            SELECT
                dlop1.destination,
                dlop1.user_id,
                MAX(dlop1.stream_id) AS stream_id,
                (SELECT MIN(dlop2.device_id) AS device_id FROM
                    device_lists_outbound_pokes dlop2
                    WHERE dlop2.destination = dlop1.destination AND
                      dlop2.user_id=dlop1.user_id AND
                      dlop2.stream_id=MAX(dlop1.stream_id)
                )
            FROM device_lists_outbound_pokes dlop1
                GROUP BY destination, user_id
                HAVING min(ts) < ? AND count(*) > 1
            """

            txn.execute(select_sql, (yesterday,))
            # Each row is (destination, user_id, max stream_id, device_id).
            rows = txn.fetchall()

            if not rows:
                return

            logger.info(
                "Pruning old outbound device list updates for %i users/destinations: %s",
                len(rows),
                shortstr((row[0], row[1]) for row in rows),
            )

            # we want to keep the update with the highest stream_id for each user.
            #
            # there might be more than one update (with different device_ids) with the
            # same stream_id, so we also delete all but one rows with the max stream id.
            delete_sql = """
                DELETE FROM device_lists_outbound_pokes
                WHERE destination = ? AND user_id = ? AND (
                    stream_id < ? OR
                    (stream_id = ? AND device_id != ?)
                )
            """
            count = 0
            for destination, user_id, stream_id, device_id in rows:
                txn.execute(
                    delete_sql, (destination, user_id, stream_id, stream_id, device_id)
                )
                count += txn.rowcount

            # Since we've deleted unsent deltas, we need to remove the entry
            # of last successful sent so that the prev_ids are correctly set.
            sql = """
                DELETE FROM device_lists_outbound_last_success
                WHERE destination = ? AND user_id = ?
            """
            txn.execute_batch(sql, ((row[0], row[1]) for row in rows))

            logger.info("Pruned %d device list outbound pokes", count)

        await self.db_pool.runInteraction(
            "_prune_old_outbound_device_pokes",
            _prune_txn,
        )
  1159. async def get_local_devices_not_accessed_since(
  1160. self, since_ms: int
  1161. ) -> Dict[str, List[str]]:
  1162. """Retrieves local devices that haven't been accessed since a given date.
  1163. Args:
  1164. since_ms: the timestamp to select on, every device with a last access date
  1165. from before that time is returned.
  1166. Returns:
  1167. A dictionary with an entry for each user with at least one device matching
  1168. the request, which value is a list of the device ID(s) for the corresponding
  1169. device(s).
  1170. """
  1171. def get_devices_not_accessed_since_txn(
  1172. txn: LoggingTransaction,
  1173. ) -> List[Dict[str, str]]:
  1174. sql = """
  1175. SELECT user_id, device_id
  1176. FROM devices WHERE last_seen < ? AND hidden = FALSE
  1177. """
  1178. txn.execute(sql, (since_ms,))
  1179. return self.db_pool.cursor_to_dict(txn)
  1180. rows = await self.db_pool.runInteraction(
  1181. "get_devices_not_accessed_since",
  1182. get_devices_not_accessed_since_txn,
  1183. )
  1184. devices: Dict[str, List[str]] = {}
  1185. for row in rows:
  1186. # Remote devices are never stale from our point of view.
  1187. if self.hs.is_mine_id(row["user_id"]):
  1188. user_devices = devices.setdefault(row["user_id"], [])
  1189. user_devices.append(row["device_id"])
  1190. return devices
  1191. @cached()
  1192. async def _get_min_device_lists_changes_in_room(self) -> int:
  1193. """Returns the minimum stream ID that we have entries for
  1194. `device_lists_changes_in_room`
  1195. """
  1196. return await self.db_pool.simple_select_one_onecol(
  1197. table="device_lists_changes_in_room",
  1198. keyvalues={},
  1199. retcol="COALESCE(MIN(stream_id), 0)",
  1200. desc="get_min_device_lists_changes_in_room",
  1201. )
  1202. @cancellable
  1203. async def get_device_list_changes_in_rooms(
  1204. self, room_ids: Collection[str], from_id: int
  1205. ) -> Optional[Set[str]]:
  1206. """Return the set of users whose devices have changed in the given rooms
  1207. since the given stream ID.
  1208. Returns None if the given stream ID is too old.
  1209. """
  1210. if not room_ids:
  1211. return set()
  1212. min_stream_id = await self._get_min_device_lists_changes_in_room()
  1213. if min_stream_id > from_id:
  1214. return None
  1215. sql = """
  1216. SELECT DISTINCT user_id FROM device_lists_changes_in_room
  1217. WHERE {clause} AND stream_id >= ?
  1218. """
  1219. def _get_device_list_changes_in_rooms_txn(
  1220. txn: LoggingTransaction,
  1221. clause: str,
  1222. args: List[Any],
  1223. ) -> Set[str]:
  1224. txn.execute(sql.format(clause=clause), args)
  1225. return {user_id for user_id, in txn}
  1226. changes = set()
  1227. for chunk in batch_iter(room_ids, 1000):
  1228. clause, args = make_in_list_sql_clause(
  1229. self.database_engine, "room_id", chunk
  1230. )
  1231. args.append(from_id)
  1232. changes |= await self.db_pool.runInteraction(
  1233. "get_device_list_changes_in_rooms",
  1234. _get_device_list_changes_in_rooms_txn,
  1235. clause,
  1236. args,
  1237. )
  1238. return changes
  1239. async def get_device_list_changes_in_room(
  1240. self, room_id: str, min_stream_id: int
  1241. ) -> Collection[Tuple[str, str]]:
  1242. """Get all device list changes that happened in the room since the given
  1243. stream ID.
  1244. Returns:
  1245. Collection of user ID/device ID tuples of all devices that have
  1246. changed
  1247. """
  1248. sql = """
  1249. SELECT DISTINCT user_id, device_id FROM device_lists_changes_in_room
  1250. WHERE room_id = ? AND stream_id > ?
  1251. """
  1252. def get_device_list_changes_in_room_txn(
  1253. txn: LoggingTransaction,
  1254. ) -> Collection[Tuple[str, str]]:
  1255. txn.execute(sql, (room_id, min_stream_id))
  1256. return cast(Collection[Tuple[str, str]], txn.fetchall())
  1257. return await self.db_pool.runInteraction(
  1258. "get_device_list_changes_in_room",
  1259. get_device_list_changes_in_room_txn,
  1260. )
  1261. class DeviceBackgroundUpdateStore(SQLBaseStore):
  1262. def __init__(
  1263. self,
  1264. database: DatabasePool,
  1265. db_conn: LoggingDatabaseConnection,
  1266. hs: "HomeServer",
  1267. ):
  1268. super().__init__(database, db_conn, hs)
  1269. self.db_pool.updates.register_background_index_update(
  1270. "device_lists_stream_idx",
  1271. index_name="device_lists_stream_user_id",
  1272. table="device_lists_stream",
  1273. columns=["user_id", "device_id"],
  1274. )
  1275. # create a unique index on device_lists_remote_cache
  1276. self.db_pool.updates.register_background_index_update(
  1277. "device_lists_remote_cache_unique_idx",
  1278. index_name="device_lists_remote_cache_unique_id",
  1279. table="device_lists_remote_cache",
  1280. columns=["user_id", "device_id"],
  1281. unique=True,
  1282. )
  1283. # And one on device_lists_remote_extremeties
  1284. self.db_pool.updates.register_background_index_update(
  1285. "device_lists_remote_extremeties_unique_idx",
  1286. index_name="device_lists_remote_extremeties_unique_idx",
  1287. table="device_lists_remote_extremeties",
  1288. columns=["user_id"],
  1289. unique=True,
  1290. )
  1291. # once they complete, we can remove the old non-unique indexes.
  1292. self.db_pool.updates.register_background_update_handler(
  1293. DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES,
  1294. self._drop_device_list_streams_non_unique_indexes,
  1295. )
  1296. # clear out duplicate device list outbound pokes
  1297. self.db_pool.updates.register_background_update_handler(
  1298. BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES,
  1299. self._remove_duplicate_outbound_pokes,
  1300. )
  1301. self.db_pool.updates.register_background_index_update(
  1302. "device_lists_changes_in_room_by_room_index",
  1303. index_name="device_lists_changes_in_room_by_room_idx",
  1304. table="device_lists_changes_in_room",
  1305. columns=["room_id", "stream_id"],
  1306. )
  1307. async def _drop_device_list_streams_non_unique_indexes(
  1308. self, progress: JsonDict, batch_size: int
  1309. ) -> int:
  1310. def f(conn: LoggingDatabaseConnection) -> None:
  1311. txn = conn.cursor()
  1312. txn.execute("DROP INDEX IF EXISTS device_lists_remote_cache_id")
  1313. txn.execute("DROP INDEX IF EXISTS device_lists_remote_extremeties_id")
  1314. txn.close()
  1315. await self.db_pool.runWithConnection(f)
  1316. await self.db_pool.updates._end_background_update(
  1317. DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES
  1318. )
  1319. return 1
  1320. async def _remove_duplicate_outbound_pokes(
  1321. self, progress: JsonDict, batch_size: int
  1322. ) -> int:
  1323. # for some reason, we have accumulated duplicate entries in
  1324. # device_lists_outbound_pokes, which makes prune_outbound_device_list_pokes less
  1325. # efficient.
  1326. #
  1327. # For each duplicate, we delete all the existing rows and put one back.
  1328. KEY_COLS = ["stream_id", "destination", "user_id", "device_id"]
  1329. last_row = progress.get(
  1330. "last_row",
  1331. {"stream_id": 0, "destination": "", "user_id": "", "device_id": ""},
  1332. )
  1333. def _txn(txn: LoggingTransaction) -> int:
  1334. clause, args = make_tuple_comparison_clause(
  1335. [(x, last_row[x]) for x in KEY_COLS]
  1336. )
  1337. sql = """
  1338. SELECT stream_id, destination, user_id, device_id, MAX(ts) AS ts
  1339. FROM device_lists_outbound_pokes
  1340. WHERE %s
  1341. GROUP BY %s
  1342. HAVING count(*) > 1
  1343. ORDER BY %s
  1344. LIMIT ?
  1345. """ % (
  1346. clause, # WHERE
  1347. ",".join(KEY_COLS), # GROUP BY
  1348. ",".join(KEY_COLS), # ORDER BY
  1349. )
  1350. txn.execute(sql, args + [batch_size])
  1351. rows = self.db_pool.cursor_to_dict(txn)
  1352. row = None
  1353. for row in rows:
  1354. self.db_pool.simple_delete_txn(
  1355. txn,
  1356. "device_lists_outbound_pokes",
  1357. {x: row[x] for x in KEY_COLS},
  1358. )
  1359. row["sent"] = False
  1360. self.db_pool.simple_insert_txn(
  1361. txn,
  1362. "device_lists_outbound_pokes",
  1363. row,
  1364. )
  1365. if row:
  1366. self.db_pool.updates._background_update_progress_txn(
  1367. txn,
  1368. BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES,
  1369. {"last_row": row},
  1370. )
  1371. return len(rows)
  1372. rows = await self.db_pool.runInteraction(
  1373. BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES, _txn
  1374. )
  1375. if not rows:
  1376. await self.db_pool.updates._end_background_update(
  1377. BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES
  1378. )
  1379. return rows
  1380. class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
  1381. # Because we have write access, this will be a StreamIdGenerator
  1382. # (see DeviceWorkerStore.__init__)
  1383. _device_list_id_gen: AbstractStreamIdGenerator
  1384. def __init__(
  1385. self,
  1386. database: DatabasePool,
  1387. db_conn: LoggingDatabaseConnection,
  1388. hs: "HomeServer",
  1389. ):
  1390. super().__init__(database, db_conn, hs)
  1391. # Map of (user_id, device_id) -> bool. If there is an entry that implies
  1392. # the device exists.
  1393. self.device_id_exists_cache: LruCache[
  1394. Tuple[str, str], Literal[True]
  1395. ] = LruCache(cache_name="device_id_exists", max_size=10000)
  1396. async def store_device(
  1397. self,
  1398. user_id: str,
  1399. device_id: str,
  1400. initial_device_display_name: Optional[str],
  1401. auth_provider_id: Optional[str] = None,
  1402. auth_provider_session_id: Optional[str] = None,
  1403. ) -> bool:
  1404. """Ensure the given device is known; add it to the store if not
  1405. Args:
  1406. user_id: id of user associated with the device
  1407. device_id: id of device
  1408. initial_device_display_name: initial displayname of the device.
  1409. Ignored if device exists.
  1410. auth_provider_id: The SSO IdP the user used, if any.
  1411. auth_provider_session_id: The session ID (sid) got from a OIDC login.
  1412. Returns:
  1413. Whether the device was inserted or an existing device existed with that ID.
  1414. Raises:
  1415. StoreError: if the device is already in use
  1416. """
  1417. key = (user_id, device_id)
  1418. if self.device_id_exists_cache.get(key, None):
  1419. return False
  1420. try:
  1421. inserted = await self.db_pool.simple_upsert(
  1422. "devices",
  1423. keyvalues={
  1424. "user_id": user_id,
  1425. "device_id": device_id,
  1426. },
  1427. values={},
  1428. insertion_values={
  1429. "display_name": initial_device_display_name,
  1430. "hidden": False,
  1431. },
  1432. desc="store_device",
  1433. )
  1434. if not inserted:
  1435. # if the device already exists, check if it's a real device, or
  1436. # if the device ID is reserved by something else
  1437. hidden = await self.db_pool.simple_select_one_onecol(
  1438. "devices",
  1439. keyvalues={"user_id": user_id, "device_id": device_id},
  1440. retcol="hidden",
  1441. )
  1442. if hidden:
  1443. raise StoreError(400, "The device ID is in use", Codes.FORBIDDEN)
  1444. if auth_provider_id and auth_provider_session_id:
  1445. await self.db_pool.simple_insert(
  1446. "device_auth_providers",
  1447. values={
  1448. "user_id": user_id,
  1449. "device_id": device_id,
  1450. "auth_provider_id": auth_provider_id,
  1451. "auth_provider_session_id": auth_provider_session_id,
  1452. },
  1453. desc="store_device_auth_provider",
  1454. )
  1455. self.device_id_exists_cache.set(key, True)
  1456. return inserted
  1457. except StoreError:
  1458. raise
  1459. except Exception as e:
  1460. logger.error(
  1461. "store_device with device_id=%s(%r) user_id=%s(%r)"
  1462. " display_name=%s(%r) failed: %s",
  1463. type(device_id).__name__,
  1464. device_id,
  1465. type(user_id).__name__,
  1466. user_id,
  1467. type(initial_device_display_name).__name__,
  1468. initial_device_display_name,
  1469. e,
  1470. )
  1471. raise StoreError(500, "Problem storing device.")
  1472. async def delete_devices(self, user_id: str, device_ids: List[str]) -> None:
  1473. """Deletes several devices.
  1474. Args:
  1475. user_id: The ID of the user which owns the devices
  1476. device_ids: The IDs of the devices to delete
  1477. """
  1478. def _delete_devices_txn(txn: LoggingTransaction) -> None:
  1479. self.db_pool.simple_delete_many_txn(
  1480. txn,
  1481. table="devices",
  1482. column="device_id",
  1483. values=device_ids,
  1484. keyvalues={"user_id": user_id, "hidden": False},
  1485. )
  1486. self.db_pool.simple_delete_many_txn(
  1487. txn,
  1488. table="device_inbox",
  1489. column="device_id",
  1490. values=device_ids,
  1491. keyvalues={"user_id": user_id},
  1492. )
  1493. self.db_pool.simple_delete_many_txn(
  1494. txn,
  1495. table="device_auth_providers",
  1496. column="device_id",
  1497. values=device_ids,
  1498. keyvalues={"user_id": user_id},
  1499. )
  1500. await self.db_pool.runInteraction("delete_devices", _delete_devices_txn)
  1501. for device_id in device_ids:
  1502. self.device_id_exists_cache.invalidate((user_id, device_id))
  1503. async def update_device(
  1504. self, user_id: str, device_id: str, new_display_name: Optional[str] = None
  1505. ) -> None:
  1506. """Update a device. Only updates the device if it is not marked as
  1507. hidden.
  1508. Args:
  1509. user_id: The ID of the user which owns the device
  1510. device_id: The ID of the device to update
  1511. new_display_name: new displayname for device; None to leave unchanged
  1512. Raises:
  1513. StoreError: if the device is not found
  1514. """
  1515. updates = {}
  1516. if new_display_name is not None:
  1517. updates["display_name"] = new_display_name
  1518. if not updates:
  1519. return None
  1520. await self.db_pool.simple_update_one(
  1521. table="devices",
  1522. keyvalues={"user_id": user_id, "device_id": device_id, "hidden": False},
  1523. updatevalues=updates,
  1524. desc="update_device",
  1525. )
  1526. async def update_remote_device_list_cache_entry(
  1527. self, user_id: str, device_id: str, content: JsonDict, stream_id: str
  1528. ) -> None:
  1529. """Updates a single device in the cache of a remote user's devicelist.
  1530. Note: assumes that we are the only thread that can be updating this user's
  1531. device list.
  1532. Args:
  1533. user_id: User to update device list for
  1534. device_id: ID of decivice being updated
  1535. content: new data on this device
  1536. stream_id: the version of the device list
  1537. """
  1538. await self.db_pool.runInteraction(
  1539. "update_remote_device_list_cache_entry",
  1540. self._update_remote_device_list_cache_entry_txn,
  1541. user_id,
  1542. device_id,
  1543. content,
  1544. stream_id,
  1545. )
  1546. def _update_remote_device_list_cache_entry_txn(
  1547. self,
  1548. txn: LoggingTransaction,
  1549. user_id: str,
  1550. device_id: str,
  1551. content: JsonDict,
  1552. stream_id: str,
  1553. ) -> None:
  1554. """Delete, update or insert a cache entry for this (user, device) pair."""
  1555. if content.get("deleted"):
  1556. self.db_pool.simple_delete_txn(
  1557. txn,
  1558. table="device_lists_remote_cache",
  1559. keyvalues={"user_id": user_id, "device_id": device_id},
  1560. )
  1561. txn.call_after(self.device_id_exists_cache.invalidate, (user_id, device_id))
  1562. else:
  1563. self.db_pool.simple_upsert_txn(
  1564. txn,
  1565. table="device_lists_remote_cache",
  1566. keyvalues={"user_id": user_id, "device_id": device_id},
  1567. values={"content": json_encoder.encode(content)},
  1568. )
  1569. txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id))
  1570. txn.call_after(self.get_cached_devices_for_user.invalidate, (user_id,))
  1571. txn.call_after(
  1572. self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,)
  1573. )
  1574. self.db_pool.simple_upsert_txn(
  1575. txn,
  1576. table="device_lists_remote_extremeties",
  1577. keyvalues={"user_id": user_id},
  1578. values={"stream_id": stream_id},
  1579. )
  1580. async def update_remote_device_list_cache(
  1581. self, user_id: str, devices: List[dict], stream_id: int
  1582. ) -> None:
  1583. """Replace the entire cache of the remote user's devices.
  1584. Note: assumes that we are the only thread that can be updating this user's
  1585. device list.
  1586. Args:
  1587. user_id: User to update device list for
  1588. devices: list of device objects supplied over federation
  1589. stream_id: the version of the device list
  1590. """
  1591. await self.db_pool.runInteraction(
  1592. "update_remote_device_list_cache",
  1593. self._update_remote_device_list_cache_txn,
  1594. user_id,
  1595. devices,
  1596. stream_id,
  1597. )
  1598. def _update_remote_device_list_cache_txn(
  1599. self, txn: LoggingTransaction, user_id: str, devices: List[dict], stream_id: int
  1600. ) -> None:
  1601. """Replace the list of cached devices for this user with the given list."""
  1602. self.db_pool.simple_delete_txn(
  1603. txn, table="device_lists_remote_cache", keyvalues={"user_id": user_id}
  1604. )
  1605. self.db_pool.simple_insert_many_txn(
  1606. txn,
  1607. table="device_lists_remote_cache",
  1608. keys=("user_id", "device_id", "content"),
  1609. values=[
  1610. (user_id, content["device_id"], json_encoder.encode(content))
  1611. for content in devices
  1612. ],
  1613. )
  1614. txn.call_after(self.get_cached_devices_for_user.invalidate, (user_id,))
  1615. txn.call_after(self._get_cached_user_device.invalidate, (user_id,))
  1616. txn.call_after(
  1617. self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,)
  1618. )
  1619. self.db_pool.simple_upsert_txn(
  1620. txn,
  1621. table="device_lists_remote_extremeties",
  1622. keyvalues={"user_id": user_id},
  1623. values={"stream_id": stream_id},
  1624. )
  1625. async def add_device_change_to_streams(
  1626. self,
  1627. user_id: str,
  1628. device_ids: Collection[str],
  1629. room_ids: Collection[str],
  1630. ) -> Optional[int]:
  1631. """Persist that a user's devices have been updated, and which hosts
  1632. (if any) should be poked.
  1633. Args:
  1634. user_id: The ID of the user whose device changed.
  1635. device_ids: The IDs of any changed devices. If empty, this function will
  1636. return None.
  1637. room_ids: The rooms that the user is in
  1638. Returns:
  1639. The maximum stream ID of device list updates that were added to the database, or
  1640. None if no updates were added.
  1641. """
  1642. if not device_ids:
  1643. return None
  1644. context = get_active_span_text_map()
  1645. def add_device_changes_txn(
  1646. txn: LoggingTransaction, stream_ids: List[int]
  1647. ) -> None:
  1648. self._add_device_change_to_stream_txn(
  1649. txn,
  1650. user_id,
  1651. device_ids,
  1652. stream_ids,
  1653. )
  1654. self._add_device_outbound_room_poke_txn(
  1655. txn,
  1656. user_id,
  1657. device_ids,
  1658. room_ids,
  1659. stream_ids,
  1660. context,
  1661. )
  1662. async with self._device_list_id_gen.get_next_mult(
  1663. len(device_ids)
  1664. ) as stream_ids:
  1665. await self.db_pool.runInteraction(
  1666. "add_device_change_to_stream",
  1667. add_device_changes_txn,
  1668. stream_ids,
  1669. )
  1670. return stream_ids[-1]
  1671. def _add_device_change_to_stream_txn(
  1672. self,
  1673. txn: LoggingTransaction,
  1674. user_id: str,
  1675. device_ids: Collection[str],
  1676. stream_ids: List[int],
  1677. ) -> None:
  1678. txn.call_after(
  1679. self._device_list_stream_cache.entity_has_changed,
  1680. user_id,
  1681. stream_ids[-1],
  1682. )
  1683. min_stream_id = stream_ids[0]
  1684. # Delete older entries in the table, as we really only care about
  1685. # when the latest change happened.
  1686. txn.execute_batch(
  1687. """
  1688. DELETE FROM device_lists_stream
  1689. WHERE user_id = ? AND device_id = ? AND stream_id < ?
  1690. """,
  1691. [(user_id, device_id, min_stream_id) for device_id in device_ids],
  1692. )
  1693. self.db_pool.simple_insert_many_txn(
  1694. txn,
  1695. table="device_lists_stream",
  1696. keys=("stream_id", "user_id", "device_id"),
  1697. values=[
  1698. (stream_id, user_id, device_id)
  1699. for stream_id, device_id in zip(stream_ids, device_ids)
  1700. ],
  1701. )
  1702. def _add_device_outbound_poke_to_stream_txn(
  1703. self,
  1704. txn: LoggingTransaction,
  1705. user_id: str,
  1706. device_id: str,
  1707. hosts: Collection[str],
  1708. stream_ids: List[int],
  1709. context: Optional[Dict[str, str]],
  1710. ) -> None:
  1711. for host in hosts:
  1712. txn.call_after(
  1713. self._device_list_federation_stream_cache.entity_has_changed,
  1714. host,
  1715. stream_ids[-1],
  1716. )
  1717. now = self._clock.time_msec()
  1718. stream_id_iterator = iter(stream_ids)
  1719. encoded_context = json_encoder.encode(context)
  1720. mark_sent = not self.hs.is_mine_id(user_id)
  1721. values = [
  1722. (
  1723. destination,
  1724. next(stream_id_iterator),
  1725. user_id,
  1726. device_id,
  1727. mark_sent,
  1728. now,
  1729. encoded_context if whitelisted_homeserver(destination) else "{}",
  1730. )
  1731. for destination in hosts
  1732. ]
  1733. self.db_pool.simple_insert_many_txn(
  1734. txn,
  1735. table="device_lists_outbound_pokes",
  1736. keys=(
  1737. "destination",
  1738. "stream_id",
  1739. "user_id",
  1740. "device_id",
  1741. "sent",
  1742. "ts",
  1743. "opentracing_context",
  1744. ),
  1745. values=values,
  1746. )
  1747. # debugging for https://github.com/matrix-org/synapse/issues/14251
  1748. if issue_8631_logger.isEnabledFor(logging.DEBUG):
  1749. issue_8631_logger.debug(
  1750. "Recorded outbound pokes for %s:%s with device stream ids %s",
  1751. user_id,
  1752. device_id,
  1753. {
  1754. stream_id: destination
  1755. for (destination, stream_id, _, _, _, _, _) in values
  1756. },
  1757. )
  1758. def _add_device_outbound_room_poke_txn(
  1759. self,
  1760. txn: LoggingTransaction,
  1761. user_id: str,
  1762. device_ids: Iterable[str],
  1763. room_ids: Collection[str],
  1764. stream_ids: List[int],
  1765. context: Dict[str, str],
  1766. ) -> None:
  1767. """Record the user in the room has updated their device."""
  1768. encoded_context = json_encoder.encode(context)
  1769. # The `device_lists_changes_in_room.stream_id` column matches the
  1770. # corresponding `stream_id` of the update in the `device_lists_stream`
  1771. # table, i.e. all rows persisted for the same device update will have
  1772. # the same `stream_id` (but different room IDs).
  1773. self.db_pool.simple_insert_many_txn(
  1774. txn,
  1775. table="device_lists_changes_in_room",
  1776. keys=(
  1777. "user_id",
  1778. "device_id",
  1779. "room_id",
  1780. "stream_id",
  1781. "converted_to_destinations",
  1782. "opentracing_context",
  1783. ),
  1784. values=[
  1785. (
  1786. user_id,
  1787. device_id,
  1788. room_id,
  1789. stream_id,
  1790. # We only need to calculate outbound pokes for local users
  1791. not self.hs.is_mine_id(user_id),
  1792. encoded_context,
  1793. )
  1794. for room_id in room_ids
  1795. for device_id, stream_id in zip(device_ids, stream_ids)
  1796. ],
  1797. )
  1798. async def get_uncoverted_outbound_room_pokes(
  1799. self, start_stream_id: int, start_room_id: str, limit: int = 10
  1800. ) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]:
  1801. """Get device list changes by room that have not yet been handled and
  1802. written to `device_lists_outbound_pokes`.
  1803. Args:
  1804. start_stream_id: Together with `start_room_id`, indicates the position after
  1805. which to return device list changes.
  1806. start_room_id: Together with `start_stream_id`, indicates the position after
  1807. which to return device list changes.
  1808. limit: The maximum number of device list changes to return.
  1809. Returns:
  1810. A list of user ID, device ID, room ID, stream ID and optional opentracing
  1811. context, in order of ascending (stream ID, room ID).
  1812. """
  1813. sql = """
  1814. SELECT user_id, device_id, room_id, stream_id, opentracing_context
  1815. FROM device_lists_changes_in_room
  1816. WHERE
  1817. (stream_id, room_id) > (?, ?) AND
  1818. stream_id <= ? AND
  1819. NOT converted_to_destinations
  1820. ORDER BY stream_id ASC, room_id ASC
  1821. LIMIT ?
  1822. """
  1823. def get_uncoverted_outbound_room_pokes_txn(
  1824. txn: LoggingTransaction,
  1825. ) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]:
  1826. txn.execute(
  1827. sql,
  1828. (
  1829. start_stream_id,
  1830. start_room_id,
  1831. # Avoid returning rows if there may be uncommitted device list
  1832. # changes with smaller stream IDs.
  1833. self._device_list_id_gen.get_current_token(),
  1834. limit,
  1835. ),
  1836. )
  1837. return [
  1838. (
  1839. user_id,
  1840. device_id,
  1841. room_id,
  1842. stream_id,
  1843. db_to_json(opentracing_context),
  1844. )
  1845. for user_id, device_id, room_id, stream_id, opentracing_context in txn
  1846. ]
  1847. return await self.db_pool.runInteraction(
  1848. "get_uncoverted_outbound_room_pokes", get_uncoverted_outbound_room_pokes_txn
  1849. )
  1850. async def add_device_list_outbound_pokes(
  1851. self,
  1852. user_id: str,
  1853. device_id: str,
  1854. room_id: str,
  1855. hosts: Collection[str],
  1856. context: Optional[Dict[str, str]],
  1857. ) -> None:
  1858. """Queue the device update to be sent to the given set of hosts,
  1859. calculated from the room ID.
  1860. """
  1861. if not hosts:
  1862. return
  1863. def add_device_list_outbound_pokes_txn(
  1864. txn: LoggingTransaction, stream_ids: List[int]
  1865. ) -> None:
  1866. self._add_device_outbound_poke_to_stream_txn(
  1867. txn,
  1868. user_id=user_id,
  1869. device_id=device_id,
  1870. hosts=hosts,
  1871. stream_ids=stream_ids,
  1872. context=context,
  1873. )
  1874. async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids:
  1875. return await self.db_pool.runInteraction(
  1876. "add_device_list_outbound_pokes",
  1877. add_device_list_outbound_pokes_txn,
  1878. stream_ids,
  1879. )
  1880. async def add_remote_device_list_to_pending(
  1881. self, user_id: str, device_id: str
  1882. ) -> None:
  1883. """Add a device list update to the table tracking remote device list
  1884. updates during partial joins.
  1885. """
  1886. async with self._device_list_id_gen.get_next() as stream_id:
  1887. await self.db_pool.simple_upsert(
  1888. table="device_lists_remote_pending",
  1889. keyvalues={
  1890. "user_id": user_id,
  1891. "device_id": device_id,
  1892. },
  1893. values={"stream_id": stream_id},
  1894. desc="add_remote_device_list_to_pending",
  1895. )
  1896. async def get_pending_remote_device_list_updates_for_room(
  1897. self, room_id: str
  1898. ) -> Collection[Tuple[str, str]]:
  1899. """Get the set of remote device list updates from the pending table for
  1900. the room.
  1901. """
  1902. min_device_stream_id = await self.db_pool.simple_select_one_onecol(
  1903. table="partial_state_rooms",
  1904. keyvalues={
  1905. "room_id": room_id,
  1906. },
  1907. retcol="device_lists_stream_id",
  1908. desc="get_pending_remote_device_list_updates_for_room_device",
  1909. )
  1910. sql = """
  1911. SELECT user_id, device_id FROM device_lists_remote_pending AS d
  1912. INNER JOIN current_state_events AS c ON
  1913. type = 'm.room.member'
  1914. AND state_key = user_id
  1915. AND membership = 'join'
  1916. WHERE
  1917. room_id = ? AND stream_id > ?
  1918. """
  1919. def get_pending_remote_device_list_updates_for_room_txn(
  1920. txn: LoggingTransaction,
  1921. ) -> Collection[Tuple[str, str]]:
  1922. txn.execute(sql, (room_id, min_device_stream_id))
  1923. return cast(Collection[Tuple[str, str]], txn.fetchall())
  1924. return await self.db_pool.runInteraction(
  1925. "get_pending_remote_device_list_updates_for_room",
  1926. get_pending_remote_device_list_updates_for_room_txn,
  1927. )
  1928. async def get_device_change_last_converted_pos(self) -> Tuple[int, str]:
  1929. """
  1930. Get the position of the last row in `device_list_changes_in_room` that has been
  1931. converted to `device_lists_outbound_pokes`.
  1932. Rows with a strictly greater position where `converted_to_destinations` is
  1933. `FALSE` have not been converted.
  1934. """
  1935. row = await self.db_pool.simple_select_one(
  1936. table="device_lists_changes_converted_stream_position",
  1937. keyvalues={},
  1938. retcols=["stream_id", "room_id"],
  1939. desc="get_device_change_last_converted_pos",
  1940. )
  1941. return row["stream_id"], row["room_id"]
  1942. async def set_device_change_last_converted_pos(
  1943. self,
  1944. stream_id: int,
  1945. room_id: str,
  1946. ) -> None:
  1947. """
  1948. Set the position of the last row in `device_list_changes_in_room` that has been
  1949. converted to `device_lists_outbound_pokes`.
  1950. """
  1951. await self.db_pool.simple_update_one(
  1952. table="device_lists_changes_converted_stream_position",
  1953. keyvalues={},
  1954. updatevalues={"stream_id": stream_id, "room_id": room_id},
  1955. desc="set_device_change_last_converted_pos",
  1956. )