Ви не можете вибрати більше 25 тем Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.
 
 
 
 
 
 

219 рядки
8.6 KiB

  1. # Copyright 2014-2016 OpenMarket Ltd
  2. # Copyright 2017-2018 New Vector Ltd
  3. # Copyright 2019 The Matrix.org Foundation C.I.C.
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import logging
  17. from abc import ABCMeta
  18. from typing import TYPE_CHECKING, Any, Collection, Dict, Iterable, Optional, Union
  19. from synapse.storage.database import make_in_list_sql_clause # noqa: F401; noqa: F401
  20. from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
  21. from synapse.types import get_domain_from_id
  22. from synapse.util import json_decoder
  23. from synapse.util.caches.descriptors import CachedFunction
  24. if TYPE_CHECKING:
  25. from synapse.server import HomeServer
  26. logger = logging.getLogger(__name__)
# some of our subclasses have abstract methods, so we use the ABCMeta metaclass.
class SQLBaseStore(metaclass=ABCMeta):
    """Base class for data stores that holds helper functions.

    Note that multiple instances of this class will exist as there will be one
    per data store (and not one per physical database).
    """

    # The database connection pool shared by this data store; set in __init__.
    db_pool: DatabasePool

    def __init__(
        self,
        database: DatabasePool,
        db_conn: LoggingDatabaseConnection,
        hs: "HomeServer",
    ):
        self.hs = hs
        self._clock = hs.get_clock()
        self.database_engine = database.engine
        self.db_pool = database
        # Caches registered externally (e.g. by modules) at runtime, keyed by
        # name; consulted by `_attempt_to_invalidate_cache` when no attribute
        # of that name exists on `self`.
        self.external_cached_functions: Dict[str, CachedFunction] = {}

    def process_replication_rows(  # noqa: B027 (no-op by design)
        self,
        stream_name: str,
        instance_name: str,
        token: int,
        rows: Iterable[Any],
    ) -> None:
        """
        Used by storage classes to invalidate caches based on incoming replication data. These
        must not update any ID generators, use `process_replication_position`.
        """

    def process_replication_position(  # noqa: B027 (no-op by design)
        self,
        stream_name: str,
        instance_name: str,
        token: int,
    ) -> None:
        """
        Used by storage classes to advance ID generators based on incoming replication data. This
        is called after process_replication_rows such that caches are invalidated before any token
        positions advance.
        """

    def _invalidate_state_caches(
        self, room_id: str, members_changed: Collection[str]
    ) -> None:
        """Invalidates caches that are based on the current state, but does
        not stream invalidations down replication.

        Args:
            room_id: Room where state changed
            members_changed: The user_ids of members that have changed
        """
        # XXX: If you add something to this function make sure you add it to
        # `_invalidate_state_caches_all` as well.

        # If there were any membership changes, purge the appropriate caches.
        # Per-host caches are invalidated only for the hosts whose members
        # actually changed.
        for host in {get_domain_from_id(u) for u in members_changed}:
            self._attempt_to_invalidate_cache("is_host_joined", (room_id, host))
            self._attempt_to_invalidate_cache("is_host_invited", (room_id, host))

        if members_changed:
            self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
            self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,))
            self._attempt_to_invalidate_cache(
                "get_users_in_room_with_profiles", (room_id,)
            )
            self._attempt_to_invalidate_cache(
                "get_number_joined_users_in_room", (room_id,)
            )
            self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))

            # There's no easy way of invalidating this cache for just the users
            # that have changed, so we just clear the entire thing.
            self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)

        # Per-user caches, invalidated only for the changed members.
        for user_id in members_changed:
            self._attempt_to_invalidate_cache(
                "get_user_in_room_with_profile", (room_id, user_id)
            )
            self._attempt_to_invalidate_cache(
                "get_rooms_for_user_with_stream_ordering", (user_id,)
            )
            self._attempt_to_invalidate_cache("get_rooms_for_user", (user_id,))

        # Purge other caches based on room state.
        self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
        self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))

    def _invalidate_state_caches_all(self, room_id: str) -> None:
        """Invalidates caches that are based on the current state, but does
        not stream invalidations down replication.

        Same as `_invalidate_state_caches`, except that works when we don't know
        which memberships have changed.  Since the changed members are unknown,
        caches keyed on (room_id, member) or (member,) are cleared entirely
        (key=None) rather than per-entry.

        Args:
            room_id: Room where state changed
        """
        self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))
        self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
        self._attempt_to_invalidate_cache("is_host_invited", None)
        self._attempt_to_invalidate_cache("is_host_joined", None)
        self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,))
        self._attempt_to_invalidate_cache("get_users_in_room_with_profiles", (room_id,))
        self._attempt_to_invalidate_cache("get_number_joined_users_in_room", (room_id,))
        self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
        self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)
        self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None)
        self._attempt_to_invalidate_cache(
            "get_rooms_for_user_with_stream_ordering", None
        )
        self._attempt_to_invalidate_cache("get_rooms_for_user", None)
        self._attempt_to_invalidate_cache("get_room_summary", (room_id,))

    def _attempt_to_invalidate_cache(
        self, cache_name: str, key: Optional[Collection[Any]]
    ) -> bool:
        """Attempts to invalidate the cache of the given name, ignoring if the
        cache doesn't exist. Mainly used for invalidating caches on workers,
        where they may not have the cache.

        Note that this function does not invalidate any remote caches, only the
        local in-memory ones. Any remote invalidation must be performed before
        calling this.

        Args:
            cache_name
            key: Entry to invalidate. If None then invalidates the entire
                cache.

        Returns:
            True if a cache of that name was found (and invalidated), False
            otherwise.
        """
        try:
            cache = getattr(self, cache_name)
        except AttributeError:
            # Check if an externally defined module cache has been registered
            cache = self.external_cached_functions.get(cache_name)
            if not cache:
                # We probably haven't pulled in the cache in this worker,
                # which is fine.
                return False

        if key is None:
            cache.invalidate_all()
        else:
            # Prefer any local-only invalidation method. Invalidating any non-local
            # cache must be done before this.
            invalidate_method = getattr(cache, "invalidate_local", cache.invalidate)
            invalidate_method(tuple(key))

        return True

    def register_external_cached_function(
        self, cache_name: str, func: CachedFunction
    ) -> None:
        # Make `func` discoverable by `_attempt_to_invalidate_cache` under
        # `cache_name`, even though it is not an attribute of this store.
        self.external_cached_functions[cache_name] = func
  164. def db_to_json(db_content: Union[memoryview, bytes, bytearray, str]) -> Any:
  165. """
  166. Take some data from a database row and return a JSON-decoded object.
  167. Args:
  168. db_content: The JSON-encoded contents from the database.
  169. Returns:
  170. The object decoded from JSON.
  171. """
  172. # psycopg2 on Python 3 returns memoryview objects, which we need to
  173. # cast to bytes to decode
  174. if isinstance(db_content, memoryview):
  175. db_content = db_content.tobytes()
  176. # Decode it to a Unicode string before feeding it to the JSON decoder, since
  177. # it only supports handling strings
  178. if isinstance(db_content, (bytes, bytearray)):
  179. db_content = db_content.decode("utf8")
  180. try:
  181. return json_decoder.decode(db_content)
  182. except Exception:
  183. logging.warning("Tried to decode '%r' as JSON and failed", db_content)
  184. raise