# Copyright 2023 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import random
from types import TracebackType
from typing import (
    TYPE_CHECKING,
    AsyncContextManager,
    Collection,
    Dict,
    Optional,
    Tuple,
    Type,
    Union,
)
from weakref import WeakSet

import attr

from twisted.internet import defer
from twisted.internet.interfaces import IReactorTime

from synapse.logging.context import PreserveLoggingContext
from synapse.logging.opentracing import start_active_span
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage.databases.main.lock import Lock, LockStore
from synapse.util.async_helpers import timeout_deferred

if TYPE_CHECKING:
    from synapse.logging.opentracing import opentracing
    from synapse.server import HomeServer


# This lock is used to avoid creating an event while we are purging the room.
# We take a read lock when creating an event, and a write one when purging a room.
# This is because it is fine to create several events concurrently, since referenced events
# will not disappear under our feet as long as we don't delete the room.
NEW_EVENT_DURING_PURGE_LOCK_NAME = "new_event_during_purge_lock"
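
# A minimal usage sketch of this lock (illustrative only: `worker_lock_handler`
# and `room_id` are placeholder names, not defined in this module). Event
# creation takes the read side, so many events can be created concurrently;
# purging takes the write side, which excludes event creation for that room:
#
#     async with worker_lock_handler.acquire_read_write_lock(
#         NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False
#     ):
#         ...  # create and persist the event
#
#     async with worker_lock_handler.acquire_read_write_lock(
#         NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=True
#     ):
#         ...  # purge the room's events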


class WorkerLocksHandler:
    """A class for waiting on taking out locks, rather than using the storage
    functions directly (which don't support awaiting).
    """

    def __init__(self, hs: "HomeServer") -> None:
        self._reactor = hs.get_reactor()
        self._store = hs.get_datastores().main
        self._clock = hs.get_clock()
        self._notifier = hs.get_notifier()
        self._instance_name = hs.get_instance_name()

        # Map from lock name/key to set of `WaitingLock` that are active for
        # that lock.
        self._locks: Dict[
            Tuple[str, str], WeakSet[Union[WaitingLock, WaitingMultiLock]]
        ] = {}

        self._clock.looping_call(self._cleanup_locks, 30_000)

        self._notifier.add_lock_released_callback(self._on_lock_released)

    def acquire_lock(self, lock_name: str, lock_key: str) -> "WaitingLock":
        """Acquire a standard lock, returns a context manager that will block
        until the lock is acquired.

        Note: Care must be taken to avoid deadlocks. In particular, this
        function does *not* timeout.

        Usage:
            async with handler.acquire_lock(name, key):
                # Do work while holding the lock...
        """

        lock = WaitingLock(
            reactor=self._reactor,
            store=self._store,
            handler=self,
            lock_name=lock_name,
            lock_key=lock_key,
            write=None,
        )

        self._locks.setdefault((lock_name, lock_key), WeakSet()).add(lock)

        return lock

    def acquire_read_write_lock(
        self,
        lock_name: str,
        lock_key: str,
        *,
        write: bool,
    ) -> "WaitingLock":
        """Acquire a read/write lock, returns a context manager that will block
        until the lock is acquired.

        Note: Care must be taken to avoid deadlocks. In particular, this
        function does *not* timeout.

        Usage:
            async with handler.acquire_read_write_lock(name, key, write=True):
                # Do work while holding the lock...
        """

        lock = WaitingLock(
            reactor=self._reactor,
            store=self._store,
            handler=self,
            lock_name=lock_name,
            lock_key=lock_key,
            write=write,
        )

        self._locks.setdefault((lock_name, lock_key), WeakSet()).add(lock)

        return lock

    def acquire_multi_read_write_lock(
        self,
        lock_names: Collection[Tuple[str, str]],
        *,
        write: bool,
    ) -> "WaitingMultiLock":
        """Acquires multiple read/write locks at once, returns a context manager
        that will block until all the locks are acquired.

        This will try to acquire all locks at once, and will never hold on to a
        subset of the locks. (This avoids accidentally creating deadlocks.)

        Note: Care must be taken to avoid deadlocks. In particular, this
        function does *not* timeout.
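
        Usage (placeholder names and keys, mirroring the other acquire_*
        helpers above):
            async with handler.acquire_multi_read_write_lock(
                [(lock_name_1, lock_key_1), (lock_name_2, lock_key_2)],
                write=True,
            ):
                # Do work while holding all the locks...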
  116. """
  117. lock = WaitingMultiLock(
  118. lock_names=lock_names,
  119. write=write,
  120. reactor=self._reactor,
  121. store=self._store,
  122. handler=self,
  123. )
  124. for lock_name, lock_key in lock_names:
  125. self._locks.setdefault((lock_name, lock_key), WeakSet()).add(lock)
  126. return lock
    def notify_lock_released(self, lock_name: str, lock_key: str) -> None:
        """Notify that a lock has been released.

        Pokes both the notifier and replication.
        """

        self._notifier.notify_lock_released(self._instance_name, lock_name, lock_key)

    def _on_lock_released(
        self, instance_name: str, lock_name: str, lock_key: str
    ) -> None:
        """Called when a lock has been released.

        Wakes up any locks that might be waiting on this.
        """
        locks = self._locks.get((lock_name, lock_key))
        if not locks:
            return

        def _wake_deferred(deferred: defer.Deferred) -> None:
            if not deferred.called:
                deferred.callback(None)

        for lock in locks:
            self._clock.call_later(0, _wake_deferred, lock.deferred)

    @wrap_as_background_process("_cleanup_locks")
    async def _cleanup_locks(self) -> None:
        """Periodically cleans out stale entries in the locks map"""
        self._locks = {key: value for key, value in self._locks.items() if value}


@attr.s(auto_attribs=True, eq=False)
class WaitingLock:
    reactor: IReactorTime
    store: LockStore
    handler: WorkerLocksHandler
    lock_name: str
    lock_key: str
    write: Optional[bool]

    deferred: "defer.Deferred[None]" = attr.Factory(defer.Deferred)

    _inner_lock: Optional[Lock] = None
    _retry_interval: float = 0.1
    _lock_span: "opentracing.Scope" = attr.Factory(
        lambda: start_active_span("WaitingLock.lock")
    )

    async def __aenter__(self) -> None:
        self._lock_span.__enter__()

        with start_active_span("WaitingLock.waiting_for_lock"):
            while self._inner_lock is None:
                self.deferred = defer.Deferred()

                if self.write is not None:
                    lock = await self.store.try_acquire_read_write_lock(
                        self.lock_name, self.lock_key, write=self.write
                    )
                else:
                    lock = await self.store.try_acquire_lock(
                        self.lock_name, self.lock_key
                    )

                if lock:
                    self._inner_lock = lock
                    break

                try:
                    # Wait until we get notified that the lock might have been
                    # released (by the deferred being resolved). We also
                    # periodically wake up in case the lock was released but we
                    # weren't notified.
                    with PreserveLoggingContext():
                        await timeout_deferred(
                            deferred=self.deferred,
                            timeout=self._get_next_retry_interval(),
                            reactor=self.reactor,
                        )
                except Exception:
                    pass

        return await self._inner_lock.__aenter__()

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> Optional[bool]:
        assert self._inner_lock

        self.handler.notify_lock_released(self.lock_name, self.lock_key)

        try:
            r = await self._inner_lock.__aexit__(exc_type, exc, tb)
        finally:
            self._lock_span.__exit__(exc_type, exc, tb)

        return r

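    # Returns the current retry interval with ±10% jitter (so concurrent
    # waiters don't all re-poll the database in lockstep), and bumps the stored
    # interval used for the next attempt.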
    def _get_next_retry_interval(self) -> float:
        next = self._retry_interval
        self._retry_interval = max(5, next * 2)
        return next * random.uniform(0.9, 1.1)


@attr.s(auto_attribs=True, eq=False)
class WaitingMultiLock:
    lock_names: Collection[Tuple[str, str]]
    write: bool

    reactor: IReactorTime
    store: LockStore
    handler: WorkerLocksHandler

    deferred: "defer.Deferred[None]" = attr.Factory(defer.Deferred)

    _inner_lock_cm: Optional[AsyncContextManager] = None
    _retry_interval: float = 0.1
    _lock_span: "opentracing.Scope" = attr.Factory(
        lambda: start_active_span("WaitingLock.lock")
    )

    async def __aenter__(self) -> None:
        self._lock_span.__enter__()

        with start_active_span("WaitingLock.waiting_for_lock"):
            while self._inner_lock_cm is None:
                self.deferred = defer.Deferred()

                lock_cm = await self.store.try_acquire_multi_read_write_lock(
                    self.lock_names, write=self.write
                )

                if lock_cm:
                    self._inner_lock_cm = lock_cm
                    break

                try:
                    # Wait until we get notified that the lock might have been
                    # released (by the deferred being resolved). We also
                    # periodically wake up in case the lock was released but we
                    # weren't notified.
                    with PreserveLoggingContext():
                        await timeout_deferred(
                            deferred=self.deferred,
                            timeout=self._get_next_retry_interval(),
                            reactor=self.reactor,
                        )
                except Exception:
                    pass

        assert self._inner_lock_cm
        await self._inner_lock_cm.__aenter__()
        return

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> Optional[bool]:
        assert self._inner_lock_cm

        for lock_name, lock_key in self.lock_names:
            self.handler.notify_lock_released(lock_name, lock_key)

        try:
            r = await self._inner_lock_cm.__aexit__(exc_type, exc, tb)
        finally:
            self._lock_span.__exit__(exc_type, exc, tb)

        return r

    def _get_next_retry_interval(self) -> float:
        next = self._retry_interval
        self._retry_interval = max(5, next * 2)
        return next * random.uniform(0.9, 1.1)