# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import random
from types import TracebackType
from typing import TYPE_CHECKING, Any, Optional, Type

from synapse.api.errors import CodeMessageException
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage import DataStore
from synapse.types import StrCollection
from synapse.util import Clock

if TYPE_CHECKING:
    from synapse.notifier import Notifier
    from synapse.replication.tcp.handler import ReplicationCommandHandler

logger = logging.getLogger(__name__)


class NotRetryingDestination(Exception):
    def __init__(self, retry_last_ts: int, retry_interval: int, destination: str):
        """Raised by the limiter (and federation client) to indicate that we are
        deliberately not attempting to contact a given server.

        Args:
            retry_last_ts: the unix ts in milliseconds of our last attempt
                to contact the server. 0 indicates that the last attempt was
                successful or that we've never actually attempted to connect.
            retry_interval: the time in milliseconds to wait until the next
                attempt.
            destination: the domain in question
        """
        msg = f"Not retrying server {destination} because we tried it recently retry_last_ts={retry_last_ts} and we won't check for another retry_interval={retry_interval}ms."
        super().__init__(msg)

        self.retry_last_ts = retry_last_ts
        self.retry_interval = retry_interval
        self.destination = destination

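# A minimal illustration of reading the timing data carried by
# NotRetryingDestination in caller code; the surrounding try/except and the
# `destination`, `clock` and `store` objects are hypothetical stand-ins:
#
#     try:
#         limiter = await get_retry_limiter(destination, clock, store)
#     except NotRetryingDestination as e:
#         # The earliest time (ms since the epoch) at which a retry is allowed.
#         retry_at_ms = e.retry_last_ts + e.retry_interval
#         logger.debug("Not retrying %s until %d", e.destination, retry_at_ms)
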
async def get_retry_limiter(
    destination: str,
    clock: Clock,
    store: DataStore,
    ignore_backoff: bool = False,
    **kwargs: Any,
) -> "RetryDestinationLimiter":
    """For a given destination check if we have previously failed to
    send a request there and are waiting before retrying the destination.
    If we are not ready to retry the destination, this will raise a
    NotRetryingDestination exception. Otherwise, will return a Context Manager
    that will mark the destination as down if an exception is thrown (excluding
    CodeMessageException with code < 500)

    Args:
        destination: name of homeserver
        clock: timing source
        store: datastore
        ignore_backoff: true to ignore the historical backoff data and
            try the request anyway. We will still reset the retry_interval on success.

    Example usage:

        try:
            limiter = await get_retry_limiter(destination, clock, store)
            with limiter:
                response = await do_request()
        except NotRetryingDestination:
            # We aren't ready to retry that destination.
            raise
    """
    failure_ts = None
    retry_last_ts, retry_interval = (0, 0)

    retry_timings = await store.get_destination_retry_timings(destination)

    if retry_timings:
        failure_ts = retry_timings.failure_ts
        retry_last_ts = retry_timings.retry_last_ts
        retry_interval = retry_timings.retry_interval

        now = int(clock.time_msec())

        if not ignore_backoff and retry_last_ts + retry_interval > now:
            raise NotRetryingDestination(
                retry_last_ts=retry_last_ts,
                retry_interval=retry_interval,
                destination=destination,
            )

    # if we are ignoring the backoff data, we should also not increment the backoff
    # when we get another failure - otherwise a server can very quickly reach the
    # maximum backoff even though it might only have been down briefly
    backoff_on_failure = not ignore_backoff

    return RetryDestinationLimiter(
        destination,
        clock,
        store,
        failure_ts,
        retry_interval,
        backoff_on_failure=backoff_on_failure,
        **kwargs,
    )

async def filter_destinations_by_retry_limiter(
    destinations: StrCollection,
    clock: Clock,
    store: DataStore,
    retry_due_within_ms: int = 0,
) -> StrCollection:
    """Filter down the list of destinations to only those that are either
    alive or due for a retry (within `retry_due_within_ms`)
    """
    if not destinations:
        return destinations

    retry_timings = await store.get_destination_retry_timings_batch(destinations)

    now = int(clock.time_msec())

    return [
        destination
        for destination, timings in retry_timings.items()
        if timings is None
        or timings.retry_last_ts + timings.retry_interval <= now + retry_due_within_ms
    ]

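# A sketch of how a sending loop might use the filter above to skip
# destinations that are still backing off; `destinations`, `clock`, `store`
# and the `_send_to` coroutine are hypothetical stand-ins for whatever the
# caller already has:
#
#     async def send_to_reachable_destinations(
#         destinations: StrCollection, clock: Clock, store: DataStore
#     ) -> None:
#         # Keep destinations that are alive or whose retry is due within a minute.
#         reachable = await filter_destinations_by_retry_limiter(
#             destinations, clock, store, retry_due_within_ms=60_000
#         )
#         for destination in reachable:
#             await _send_to(destination)
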
class RetryDestinationLimiter:
    def __init__(
        self,
        destination: str,
        clock: Clock,
        store: DataStore,
        failure_ts: Optional[int],
        retry_interval: int,
        backoff_on_404: bool = False,
        backoff_on_failure: bool = True,
        notifier: Optional["Notifier"] = None,
        replication_client: Optional["ReplicationCommandHandler"] = None,
        backoff_on_all_error_codes: bool = False,
    ):
        """Marks the destination as "down" if an exception is thrown in the
        context, except for CodeMessageException with code < 500.

        If no exception is raised, marks the destination as "up".

        Args:
            destination
            clock
            store
            failure_ts: when this destination started failing (in ms since
                the epoch), or zero if the last request was successful
            retry_interval: The next retry interval taken from the
                database in milliseconds, or zero if the last request was
                successful.
            backoff_on_404: Back off if we get a 404
            backoff_on_failure: set to False if we should not increase the
                retry interval on a failure.
            notifier: A notifier used to mark servers as up.
            replication_client: A replication client used to mark servers as up.
            backoff_on_all_error_codes: Whether we should back off on any
                error code.
        """
        self.clock = clock
        self.store = store
        self.destination = destination

        self.failure_ts = failure_ts
        self.retry_interval = retry_interval
        self.backoff_on_404 = backoff_on_404
        self.backoff_on_failure = backoff_on_failure
        self.backoff_on_all_error_codes = backoff_on_all_error_codes

        self.notifier = notifier
        self.replication_client = replication_client

        self.destination_min_retry_interval_ms = (
            self.store.hs.config.federation.destination_min_retry_interval_ms
        )
        self.destination_retry_multiplier = (
            self.store.hs.config.federation.destination_retry_multiplier
        )
        self.destination_max_retry_interval_ms = (
            self.store.hs.config.federation.destination_max_retry_interval_ms
        )

    def __enter__(self) -> None:
        pass

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        success = exc_type is None
        valid_err_code = False

        if exc_type is None:
            valid_err_code = True
        elif not issubclass(exc_type, Exception):
            # avoid treating exceptions which don't derive from Exception as
            # failures; this is mostly so as not to catch defer._DefGen.
            valid_err_code = True
        elif isinstance(exc_val, CodeMessageException):
            # Some error codes are perfectly fine for some APIs, whereas other
            # APIs may expect to never receive e.g. a 404. It's important to
            # handle 404 as some remote servers will return a 404 when the HS
            # has been decommissioned.
            # If we get a 401, then we should probably back off since they
            # won't accept our requests for at least a while.
            # 429 is us being aggressively rate limited, so let's rate limit
            # ourselves.
            if self.backoff_on_all_error_codes:
                valid_err_code = False
            elif exc_val.code == 404 and self.backoff_on_404:
                valid_err_code = False
            elif exc_val.code in (401, 429):
                valid_err_code = False
            elif exc_val.code < 500:
                valid_err_code = True
            else:
                valid_err_code = False

        # Whether previous requests to the destination had been failing.
        previously_failing = bool(self.failure_ts)

        if success:
            # We connected successfully.
            if not self.retry_interval:
                return

            logger.debug(
                "Connection to %s was successful; clearing backoff", self.destination
            )
            self.failure_ts = None
            retry_last_ts = 0
            self.retry_interval = 0
        elif valid_err_code:
            # We got a potentially valid error code back. We don't reset the
            # timers though, as the other side might actually be down anyway
            # (e.g. some deprovisioned servers will always return a 404 or 403,
            # and we don't want to keep resetting the retry timers for them).
            return
        elif not self.backoff_on_failure:
            return
        else:
            # We couldn't connect.
            if self.retry_interval:
                self.retry_interval = int(
                    self.retry_interval
                    * self.destination_retry_multiplier
                    * random.uniform(0.8, 1.4)
                )

                if self.retry_interval >= self.destination_max_retry_interval_ms:
                    self.retry_interval = self.destination_max_retry_interval_ms
            else:
                self.retry_interval = self.destination_min_retry_interval_ms

            logger.info(
                "Connection to %s was unsuccessful (%s(%s)); backoff now %i",
                self.destination,
                exc_type,
                exc_val,
                self.retry_interval,
            )
            retry_last_ts = int(self.clock.time_msec())

            if self.failure_ts is None:
                self.failure_ts = retry_last_ts

        # Whether the current request to the destination had been failing.
        currently_failing = bool(self.failure_ts)

        async def store_retry_timings() -> None:
            try:
                await self.store.set_destination_retry_timings(
                    self.destination,
                    self.failure_ts,
                    retry_last_ts,
                    self.retry_interval,
                )

                # If the server was previously failing, but is no longer.
                if previously_failing and not currently_failing:
                    if self.notifier:
                        # Inform the relevant places that the remote server is back up.
                        self.notifier.notify_remote_server_up(self.destination)

                    if self.replication_client:
                        # Inform other workers that the remote server is up.
                        self.replication_client.send_remote_server_up(self.destination)
            except Exception:
                logger.exception("Failed to store destination_retry_timings")

        # we deliberately do this in the background.
        run_as_background_process("store_retry_timings", store_retry_timings)
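

# Worked example of the backoff applied in RetryDestinationLimiter.__exit__,
# assuming purely illustrative config values (the real ones come from the
# `federation` section of the homeserver config):
#
#     destination_min_retry_interval_ms = 600_000 (10 minutes)
#     destination_retry_multiplier      = 2
#     destination_max_retry_interval_ms = 604_800_000 (one week)
#
#   * first failure:   retry_interval = 600_000 ms (the configured minimum)
#   * second failure:  600_000 * 2 * uniform(0.8, 1.4) -> roughly 16 to 28 minutes
#   * later failures:  keep multiplying (with jitter) until the interval is
#                      clamped to destination_max_retry_interval_ms
#   * any success:     failure_ts, retry_last_ts and retry_interval are reset,
#                      and the destination is reported as up if it had been failing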