You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

751 lines
28 KiB

  1. # Copyright 2019 New Vector Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import abc
  15. import logging
  16. from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple
  17. from prometheus_client import Counter
  18. from twisted.internet import defer
  19. import synapse.metrics
  20. from synapse.api.presence import UserPresenceState
  21. from synapse.events import EventBase
  22. from synapse.federation.sender.per_destination_queue import PerDestinationQueue
  23. from synapse.federation.sender.transaction_manager import TransactionManager
  24. from synapse.federation.units import Edu
  25. from synapse.handlers.presence import get_interested_remotes
  26. from synapse.logging.context import (
  27. make_deferred_yieldable,
  28. preserve_fn,
  29. run_in_background,
  30. )
  31. from synapse.metrics import (
  32. LaterGauge,
  33. event_processing_loop_counter,
  34. event_processing_loop_room_count,
  35. events_processed_counter,
  36. )
  37. from synapse.metrics.background_process_metrics import run_as_background_process
  38. from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
  39. from synapse.util.metrics import Measure, measure_func
  40. if TYPE_CHECKING:
  41. from synapse.events.presence_router import PresenceRouter
  42. from synapse.server import HomeServer
  43. logger = logging.getLogger(__name__)
  44. sent_pdus_destination_dist_count = Counter(
  45. "synapse_federation_client_sent_pdu_destinations:count",
  46. "Number of PDUs queued for sending to one or more destinations",
  47. )
  48. sent_pdus_destination_dist_total = Counter(
  49. "synapse_federation_client_sent_pdu_destinations:total",
  50. "Total number of PDUs queued for sending across all destinations",
  51. )
  52. # Time (in s) after Synapse's startup that we will begin to wake up destinations
  53. # that have catch-up outstanding.
  54. CATCH_UP_STARTUP_DELAY_SEC = 15
  55. # Time (in s) to wait in between waking up each destination, i.e. one destination
  56. # will be woken up every <x> seconds after Synapse's startup until we have woken
  57. # every destination has outstanding catch-up.
  58. CATCH_UP_STARTUP_INTERVAL_SEC = 5
  59. class AbstractFederationSender(metaclass=abc.ABCMeta):
  60. @abc.abstractmethod
  61. def notify_new_events(self, max_token: RoomStreamToken) -> None:
  62. """This gets called when we have some new events we might want to
  63. send out to other servers.
  64. """
  65. raise NotImplementedError()
  66. @abc.abstractmethod
  67. async def send_read_receipt(self, receipt: ReadReceipt) -> None:
  68. """Send a RR to any other servers in the room
  69. Args:
  70. receipt: receipt to be sent
  71. """
  72. raise NotImplementedError()
  73. @abc.abstractmethod
  74. def send_presence(self, states: List[UserPresenceState]) -> None:
  75. """Send the new presence states to the appropriate destinations.
  76. This actually queues up the presence states ready for sending and
  77. triggers a background task to process them and send out the transactions.
  78. """
  79. raise NotImplementedError()
  80. @abc.abstractmethod
  81. def send_presence_to_destinations(
  82. self, states: Iterable[UserPresenceState], destinations: Iterable[str]
  83. ) -> None:
  84. """Send the given presence states to the given destinations.
  85. Args:
  86. destinations:
  87. """
  88. raise NotImplementedError()
  89. @abc.abstractmethod
  90. def build_and_send_edu(
  91. self,
  92. destination: str,
  93. edu_type: str,
  94. content: JsonDict,
  95. key: Optional[Hashable] = None,
  96. ) -> None:
  97. """Construct an Edu object, and queue it for sending
  98. Args:
  99. destination: name of server to send to
  100. edu_type: type of EDU to send
  101. content: content of EDU
  102. key: clobbering key for this edu
  103. """
  104. raise NotImplementedError()
  105. @abc.abstractmethod
  106. def send_device_messages(self, destination: str) -> None:
  107. raise NotImplementedError()
  108. @abc.abstractmethod
  109. def wake_destination(self, destination: str) -> None:
  110. """Called when we want to retry sending transactions to a remote.
  111. This is mainly useful if the remote server has been down and we think it
  112. might have come back.
  113. """
  114. raise NotImplementedError()
  115. @abc.abstractmethod
  116. def get_current_token(self) -> int:
  117. raise NotImplementedError()
  118. @abc.abstractmethod
  119. def federation_ack(self, instance_name: str, token: int) -> None:
  120. raise NotImplementedError()
  121. @abc.abstractmethod
  122. async def get_replication_rows(
  123. self, instance_name: str, from_token: int, to_token: int, target_row_count: int
  124. ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
  125. raise NotImplementedError()
  126. class FederationSender(AbstractFederationSender):
  127. def __init__(self, hs: "HomeServer"):
  128. self.hs = hs
  129. self.server_name = hs.hostname
  130. self.store = hs.get_datastore()
  131. self.state = hs.get_state_handler()
  132. self.clock = hs.get_clock()
  133. self.is_mine_id = hs.is_mine_id
  134. self._presence_router = None # type: Optional[PresenceRouter]
  135. self._transaction_manager = TransactionManager(hs)
  136. self._instance_name = hs.get_instance_name()
  137. self._federation_shard_config = hs.config.worker.federation_shard_config
  138. # map from destination to PerDestinationQueue
  139. self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue]
  140. LaterGauge(
  141. "synapse_federation_transaction_queue_pending_destinations",
  142. "",
  143. [],
  144. lambda: sum(
  145. 1
  146. for d in self._per_destination_queues.values()
  147. if d.transmission_loop_running
  148. ),
  149. )
  150. # Map of user_id -> UserPresenceState for all the pending presence
  151. # to be sent out by user_id. Entries here get processed and put in
  152. # pending_presence_by_dest
  153. self.pending_presence = {} # type: Dict[str, UserPresenceState]
  154. LaterGauge(
  155. "synapse_federation_transaction_queue_pending_pdus",
  156. "",
  157. [],
  158. lambda: sum(
  159. d.pending_pdu_count() for d in self._per_destination_queues.values()
  160. ),
  161. )
  162. LaterGauge(
  163. "synapse_federation_transaction_queue_pending_edus",
  164. "",
  165. [],
  166. lambda: sum(
  167. d.pending_edu_count() for d in self._per_destination_queues.values()
  168. ),
  169. )
  170. self._is_processing = False
  171. self._last_poked_id = -1
  172. self._processing_pending_presence = False
  173. # map from room_id to a set of PerDestinationQueues which we believe are
  174. # awaiting a call to flush_read_receipts_for_room. The presence of an entry
  175. # here for a given room means that we are rate-limiting RR flushes to that room,
  176. # and that there is a pending call to _flush_rrs_for_room in the system.
  177. self._queues_awaiting_rr_flush_by_room = (
  178. {}
  179. ) # type: Dict[str, Set[PerDestinationQueue]]
  180. self._rr_txn_interval_per_room_ms = (
  181. 1000.0 / hs.config.federation_rr_transactions_per_room_per_second
  182. )
  183. # wake up destinations that have outstanding PDUs to be caught up
  184. self._catchup_after_startup_timer = self.clock.call_later(
  185. CATCH_UP_STARTUP_DELAY_SEC,
  186. run_as_background_process,
  187. "wake_destinations_needing_catchup",
  188. self._wake_destinations_needing_catchup,
  189. )
  190. self._external_cache = hs.get_external_cache()
  191. def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
  192. """Get or create a PerDestinationQueue for the given destination
  193. Args:
  194. destination: server_name of remote server
  195. """
  196. queue = self._per_destination_queues.get(destination)
  197. if not queue:
  198. queue = PerDestinationQueue(self.hs, self._transaction_manager, destination)
  199. self._per_destination_queues[destination] = queue
  200. return queue
  201. def notify_new_events(self, max_token: RoomStreamToken) -> None:
  202. """This gets called when we have some new events we might want to
  203. send out to other servers.
  204. """
  205. # We just use the minimum stream ordering and ignore the vector clock
  206. # component. This is safe to do as long as we *always* ignore the vector
  207. # clock components.
  208. current_id = max_token.stream
  209. self._last_poked_id = max(current_id, self._last_poked_id)
  210. if self._is_processing:
  211. return
  212. # fire off a processing loop in the background
  213. run_as_background_process(
  214. "process_event_queue_for_federation", self._process_event_queue_loop
  215. )
  216. async def _process_event_queue_loop(self) -> None:
  217. try:
  218. self._is_processing = True
  219. while True:
  220. last_token = await self.store.get_federation_out_pos("events")
  221. next_token, events = await self.store.get_all_new_events_stream(
  222. last_token, self._last_poked_id, limit=100
  223. )
  224. logger.debug("Handling %s -> %s", last_token, next_token)
  225. if not events and next_token >= self._last_poked_id:
  226. break
  227. async def handle_event(event: EventBase) -> None:
  228. # Only send events for this server.
  229. send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
  230. is_mine = self.is_mine_id(event.sender)
  231. if not is_mine and send_on_behalf_of is None:
  232. return
  233. if not event.internal_metadata.should_proactively_send():
  234. return
  235. destinations = None # type: Optional[Set[str]]
  236. if not event.prev_event_ids():
  237. # If there are no prev event IDs then the state is empty
  238. # and so no remote servers in the room
  239. destinations = set()
  240. else:
  241. # We check the external cache for the destinations, which is
  242. # stored per state group.
  243. sg = await self._external_cache.get(
  244. "event_to_prev_state_group", event.event_id
  245. )
  246. if sg:
  247. destinations = await self._external_cache.get(
  248. "get_joined_hosts", str(sg)
  249. )
  250. if destinations is None:
  251. try:
  252. # Get the state from before the event.
  253. # We need to make sure that this is the state from before
  254. # the event and not from after it.
  255. # Otherwise if the last member on a server in a room is
  256. # banned then it won't receive the event because it won't
  257. # be in the room after the ban.
  258. destinations = await self.state.get_hosts_in_room_at_events(
  259. event.room_id, event_ids=event.prev_event_ids()
  260. )
  261. except Exception:
  262. logger.exception(
  263. "Failed to calculate hosts in room for event: %s",
  264. event.event_id,
  265. )
  266. return
  267. destinations = {
  268. d
  269. for d in destinations
  270. if self._federation_shard_config.should_handle(
  271. self._instance_name, d
  272. )
  273. }
  274. if send_on_behalf_of is not None:
  275. # If we are sending the event on behalf of another server
  276. # then it already has the event and there is no reason to
  277. # send the event to it.
  278. destinations.discard(send_on_behalf_of)
  279. logger.debug("Sending %s to %r", event, destinations)
  280. if destinations:
  281. await self._send_pdu(event, destinations)
  282. now = self.clock.time_msec()
  283. ts = await self.store.get_received_ts(event.event_id)
  284. synapse.metrics.event_processing_lag_by_event.labels(
  285. "federation_sender"
  286. ).observe((now - ts) / 1000)
  287. async def handle_room_events(events: Iterable[EventBase]) -> None:
  288. with Measure(self.clock, "handle_room_events"):
  289. for event in events:
  290. await handle_event(event)
  291. events_by_room = {} # type: Dict[str, List[EventBase]]
  292. for event in events:
  293. events_by_room.setdefault(event.room_id, []).append(event)
  294. await make_deferred_yieldable(
  295. defer.gatherResults(
  296. [
  297. run_in_background(handle_room_events, evs)
  298. for evs in events_by_room.values()
  299. ],
  300. consumeErrors=True,
  301. )
  302. )
  303. await self.store.update_federation_out_pos("events", next_token)
  304. if events:
  305. now = self.clock.time_msec()
  306. ts = await self.store.get_received_ts(events[-1].event_id)
  307. synapse.metrics.event_processing_lag.labels(
  308. "federation_sender"
  309. ).set(now - ts)
  310. synapse.metrics.event_processing_last_ts.labels(
  311. "federation_sender"
  312. ).set(ts)
  313. events_processed_counter.inc(len(events))
  314. event_processing_loop_room_count.labels("federation_sender").inc(
  315. len(events_by_room)
  316. )
  317. event_processing_loop_counter.labels("federation_sender").inc()
  318. synapse.metrics.event_processing_positions.labels(
  319. "federation_sender"
  320. ).set(next_token)
  321. finally:
  322. self._is_processing = False
  323. async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
  324. # We loop through all destinations to see whether we already have
  325. # a transaction in progress. If we do, stick it in the pending_pdus
  326. # table and we'll get back to it later.
  327. destinations = set(destinations)
  328. destinations.discard(self.server_name)
  329. logger.debug("Sending to: %s", str(destinations))
  330. if not destinations:
  331. return
  332. sent_pdus_destination_dist_total.inc(len(destinations))
  333. sent_pdus_destination_dist_count.inc()
  334. assert pdu.internal_metadata.stream_ordering
  335. # track the fact that we have a PDU for these destinations,
  336. # to allow us to perform catch-up later on if the remote is unreachable
  337. # for a while.
  338. await self.store.store_destination_rooms_entries(
  339. destinations,
  340. pdu.room_id,
  341. pdu.internal_metadata.stream_ordering,
  342. )
  343. for destination in destinations:
  344. self._get_per_destination_queue(destination).send_pdu(pdu)
  345. async def send_read_receipt(self, receipt: ReadReceipt) -> None:
  346. """Send a RR to any other servers in the room
  347. Args:
  348. receipt: receipt to be sent
  349. """
  350. # Some background on the rate-limiting going on here.
  351. #
  352. # It turns out that if we attempt to send out RRs as soon as we get them from
  353. # a client, then we end up trying to do several hundred Hz of federation
  354. # transactions. (The number of transactions scales as O(N^2) on the size of a
  355. # room, since in a large room we have both more RRs coming in, and more servers
  356. # to send them to.)
  357. #
  358. # This leads to a lot of CPU load, and we end up getting behind. The solution
  359. # currently adopted is as follows:
  360. #
  361. # The first receipt in a given room is sent out immediately, at time T0. Any
  362. # further receipts are, in theory, batched up for N seconds, where N is calculated
  363. # based on the number of servers in the room to achieve a transaction frequency
  364. # of around 50Hz. So, for example, if there were 100 servers in the room, then
  365. # N would be 100 / 50Hz = 2 seconds.
  366. #
  367. # Then, after T+N, we flush out any receipts that have accumulated, and restart
  368. # the timer to flush out more receipts at T+2N, etc. If no receipts accumulate,
  369. # we stop the cycle and go back to the start.
  370. #
  371. # However, in practice, it is often possible to flush out receipts earlier: in
  372. # particular, if we are sending a transaction to a given server anyway (for
  373. # example, because we have a PDU or a RR in another room to send), then we may
  374. # as well send out all of the pending RRs for that server. So it may be that
  375. # by the time we get to T+N, we don't actually have any RRs left to send out.
  376. # Nevertheless we continue to buffer up RRs for the room in question until we
  377. # reach the point that no RRs arrive between timer ticks.
  378. #
  379. # For even more background, see https://github.com/matrix-org/synapse/issues/4730.
  380. room_id = receipt.room_id
  381. # Work out which remote servers should be poked and poke them.
  382. domains_set = await self.state.get_current_hosts_in_room(room_id)
  383. domains = [
  384. d
  385. for d in domains_set
  386. if d != self.server_name
  387. and self._federation_shard_config.should_handle(self._instance_name, d)
  388. ]
  389. if not domains:
  390. return
  391. queues_pending_flush = self._queues_awaiting_rr_flush_by_room.get(room_id)
  392. # if there is no flush yet scheduled, we will send out these receipts with
  393. # immediate flushes, and schedule the next flush for this room.
  394. if queues_pending_flush is not None:
  395. logger.debug("Queuing receipt for: %r", domains)
  396. else:
  397. logger.debug("Sending receipt to: %r", domains)
  398. self._schedule_rr_flush_for_room(room_id, len(domains))
  399. for domain in domains:
  400. queue = self._get_per_destination_queue(domain)
  401. queue.queue_read_receipt(receipt)
  402. # if there is already a RR flush pending for this room, then make sure this
  403. # destination is registered for the flush
  404. if queues_pending_flush is not None:
  405. queues_pending_flush.add(queue)
  406. else:
  407. queue.flush_read_receipts_for_room(room_id)
  408. def _schedule_rr_flush_for_room(self, room_id: str, n_domains: int) -> None:
  409. # that is going to cause approximately len(domains) transactions, so now back
  410. # off for that multiplied by RR_TXN_INTERVAL_PER_ROOM
  411. backoff_ms = self._rr_txn_interval_per_room_ms * n_domains
  412. logger.debug("Scheduling RR flush in %s in %d ms", room_id, backoff_ms)
  413. self.clock.call_later(backoff_ms, self._flush_rrs_for_room, room_id)
  414. self._queues_awaiting_rr_flush_by_room[room_id] = set()
  415. def _flush_rrs_for_room(self, room_id: str) -> None:
  416. queues = self._queues_awaiting_rr_flush_by_room.pop(room_id)
  417. logger.debug("Flushing RRs in %s to %s", room_id, queues)
  418. if not queues:
  419. # no more RRs arrived for this room; we are done.
  420. return
  421. # schedule the next flush
  422. self._schedule_rr_flush_for_room(room_id, len(queues))
  423. for queue in queues:
  424. queue.flush_read_receipts_for_room(room_id)
  425. @preserve_fn # the caller should not yield on this
  426. async def send_presence(self, states: List[UserPresenceState]) -> None:
  427. """Send the new presence states to the appropriate destinations.
  428. This actually queues up the presence states ready for sending and
  429. triggers a background task to process them and send out the transactions.
  430. """
  431. if not self.hs.config.use_presence:
  432. # No-op if presence is disabled.
  433. return
  434. # First we queue up the new presence by user ID, so multiple presence
  435. # updates in quick succession are correctly handled.
  436. # We only want to send presence for our own users, so lets always just
  437. # filter here just in case.
  438. self.pending_presence.update(
  439. {state.user_id: state for state in states if self.is_mine_id(state.user_id)}
  440. )
  441. # We then handle the new pending presence in batches, first figuring
  442. # out the destinations we need to send each state to and then poking it
  443. # to attempt a new transaction. We linearize this so that we don't
  444. # accidentally mess up the ordering and send multiple presence updates
  445. # in the wrong order
  446. if self._processing_pending_presence:
  447. return
  448. self._processing_pending_presence = True
  449. try:
  450. while True:
  451. states_map = self.pending_presence
  452. self.pending_presence = {}
  453. if not states_map:
  454. break
  455. await self._process_presence_inner(list(states_map.values()))
  456. except Exception:
  457. logger.exception("Error sending presence states to servers")
  458. finally:
  459. self._processing_pending_presence = False
  460. def send_presence_to_destinations(
  461. self, states: Iterable[UserPresenceState], destinations: Iterable[str]
  462. ) -> None:
  463. """Send the given presence states to the given destinations.
  464. destinations (list[str])
  465. """
  466. if not states or not self.hs.config.use_presence:
  467. # No-op if presence is disabled.
  468. return
  469. for destination in destinations:
  470. if destination == self.server_name:
  471. continue
  472. if not self._federation_shard_config.should_handle(
  473. self._instance_name, destination
  474. ):
  475. continue
  476. self._get_per_destination_queue(destination).send_presence(states)
  477. @measure_func("txnqueue._process_presence")
  478. async def _process_presence_inner(self, states: List[UserPresenceState]) -> None:
  479. """Given a list of states populate self.pending_presence_by_dest and
  480. poke to send a new transaction to each destination
  481. """
  482. # We pull the presence router here instead of __init__
  483. # to prevent a dependency cycle:
  484. #
  485. # AuthHandler -> Notifier -> FederationSender
  486. # -> PresenceRouter -> ModuleApi -> AuthHandler
  487. if self._presence_router is None:
  488. self._presence_router = self.hs.get_presence_router()
  489. assert self._presence_router is not None
  490. hosts_and_states = await get_interested_remotes(
  491. self.store,
  492. self._presence_router,
  493. states,
  494. self.state,
  495. )
  496. for destinations, states in hosts_and_states:
  497. for destination in destinations:
  498. if destination == self.server_name:
  499. continue
  500. if not self._federation_shard_config.should_handle(
  501. self._instance_name, destination
  502. ):
  503. continue
  504. self._get_per_destination_queue(destination).send_presence(states)
  505. def build_and_send_edu(
  506. self,
  507. destination: str,
  508. edu_type: str,
  509. content: JsonDict,
  510. key: Optional[Hashable] = None,
  511. ) -> None:
  512. """Construct an Edu object, and queue it for sending
  513. Args:
  514. destination: name of server to send to
  515. edu_type: type of EDU to send
  516. content: content of EDU
  517. key: clobbering key for this edu
  518. """
  519. if destination == self.server_name:
  520. logger.info("Not sending EDU to ourselves")
  521. return
  522. if not self._federation_shard_config.should_handle(
  523. self._instance_name, destination
  524. ):
  525. return
  526. edu = Edu(
  527. origin=self.server_name,
  528. destination=destination,
  529. edu_type=edu_type,
  530. content=content,
  531. )
  532. self.send_edu(edu, key)
  533. def send_edu(self, edu: Edu, key: Optional[Hashable]) -> None:
  534. """Queue an EDU for sending
  535. Args:
  536. edu: edu to send
  537. key: clobbering key for this edu
  538. """
  539. if not self._federation_shard_config.should_handle(
  540. self._instance_name, edu.destination
  541. ):
  542. return
  543. queue = self._get_per_destination_queue(edu.destination)
  544. if key:
  545. queue.send_keyed_edu(edu, key)
  546. else:
  547. queue.send_edu(edu)
  548. def send_device_messages(self, destination: str) -> None:
  549. if destination == self.server_name:
  550. logger.warning("Not sending device update to ourselves")
  551. return
  552. if not self._federation_shard_config.should_handle(
  553. self._instance_name, destination
  554. ):
  555. return
  556. self._get_per_destination_queue(destination).attempt_new_transaction()
  557. def wake_destination(self, destination: str) -> None:
  558. """Called when we want to retry sending transactions to a remote.
  559. This is mainly useful if the remote server has been down and we think it
  560. might have come back.
  561. """
  562. if destination == self.server_name:
  563. logger.warning("Not waking up ourselves")
  564. return
  565. if not self._federation_shard_config.should_handle(
  566. self._instance_name, destination
  567. ):
  568. return
  569. self._get_per_destination_queue(destination).attempt_new_transaction()
  570. @staticmethod
  571. def get_current_token() -> int:
  572. # Dummy implementation for case where federation sender isn't offloaded
  573. # to a worker.
  574. return 0
  575. def federation_ack(self, instance_name: str, token: int) -> None:
  576. # It is not expected that this gets called on FederationSender.
  577. raise NotImplementedError()
  578. @staticmethod
  579. async def get_replication_rows(
  580. instance_name: str, from_token: int, to_token: int, target_row_count: int
  581. ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
  582. # Dummy implementation for case where federation sender isn't offloaded
  583. # to a worker.
  584. return [], 0, False
  585. async def _wake_destinations_needing_catchup(self) -> None:
  586. """
  587. Wakes up destinations that need catch-up and are not currently being
  588. backed off from.
  589. In order to reduce load spikes, adds a delay between each destination.
  590. """
  591. last_processed = None # type: Optional[str]
  592. while True:
  593. destinations_to_wake = (
  594. await self.store.get_catch_up_outstanding_destinations(last_processed)
  595. )
  596. if not destinations_to_wake:
  597. # finished waking all destinations!
  598. self._catchup_after_startup_timer = None
  599. break
  600. last_processed = destinations_to_wake[-1]
  601. destinations_to_wake = [
  602. d
  603. for d in destinations_to_wake
  604. if self._federation_shard_config.should_handle(self._instance_name, d)
  605. ]
  606. for destination in destinations_to_wake:
  607. logger.info(
  608. "Destination %s has outstanding catch-up, waking up.",
  609. last_processed,
  610. )
  611. self.wake_destination(destination)
  612. await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC)