# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
The Federation Sender is responsible for sending Persistent Data Units (PDUs)
and Ephemeral Data Units (EDUs) to other homeservers using
the `/send` Federation API.


## How do PDUs get sent?

The Federation Sender is made aware of new PDUs via `FederationSender.notify_new_events`.
When the sender is notified about a newly-persisted PDU that originates from this homeserver
and is not an out-of-band event, we pass the PDU to the `PerDestinationQueue` for each
remote homeserver that is in the room at that point in the DAG.


### Per-Destination Queues

There is one `PerDestinationQueue` per 'destination' homeserver.
The `PerDestinationQueue` maintains the following information about the destination:

- whether the destination is currently in [catch-up mode (see below)](#catch-up-mode);
- a queue of PDUs to be sent to the destination; and
- a queue of EDUs to be sent to the destination (not considered in this section).

Upon a new PDU being enqueued, `attempt_new_transaction` is called to start a new
transaction if there is not already one in progress.
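
As a rough illustration of the bookkeeping described above, the following minimal
sketch models a per-destination queue. `ToyDestinationQueue`, `loop_running` and
`loops_started` are hypothetical names invented for this example; this is not the
real `PerDestinationQueue`, which does considerably more.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToyDestinationQueue:
    # Hypothetical stand-in for a per-destination queue (illustration only).
    destination: str
    catching_up: bool = False
    pending_pdus: List[str] = field(default_factory=list)
    pending_edus: List[str] = field(default_factory=list)
    loop_running: bool = False
    loops_started: int = 0

    def send_pdu(self, pdu: str) -> None:
        # Enqueue the PDU, then try to start a transaction for it.
        self.pending_pdus.append(pdu)
        self.attempt_new_transaction()

    def attempt_new_transaction(self) -> None:
        # Only start a transmission loop if one is not already in progress.
        if self.loop_running:
            return
        self.loop_running = True
        self.loops_started += 1


queue = ToyDestinationQueue("remote.example.org")
queue.send_pdu("$event1")
queue.send_pdu("$event2")
```

In this sketch the second `send_pdu` call only appends to the queue, because a
loop is already marked as running.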


### Transactions and the Transaction Transmission Loop

Each federation HTTP request to the `/send` endpoint is referred to as a 'transaction'.
The body of the HTTP request contains a list of PDUs and EDUs to send to the destination.

The *Transaction Transmission Loop* (`_transaction_transmission_loop`) is responsible
for emptying the queued PDUs (and EDUs) from a `PerDestinationQueue` by sending
them to the destination.

There can only be one transaction in flight for a given destination at any time.
(As well as preventing us from overloading the destination, this makes the sender
easier to reason about, because events are processed sequentially for each destination.
This is useful for *Catch-Up Mode*, described later.)

The loop continues so long as there is anything to send. At each iteration of the loop, we:

- dequeue up to 50 PDUs (and up to 100 EDUs);
- make the `/send` request to the destination homeserver with the dequeued PDUs and EDUs;
- if successful, make note of the fact that we succeeded in transmitting PDUs up to
  the `stream_ordering` of the latest PDU sent; and
- if unsuccessful, back off from the remote homeserver for some time.
  If we have been unsuccessful for too long (when the backoff interval grows to exceed 1 hour),
  the in-memory queues are emptied and we enter [*Catch-Up Mode*, described below](#catch-up-mode).
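
The batching performed by the loop can be sketched as follows. This is a
simplified, synchronous model rather than the real `_transaction_transmission_loop`:
`drain_queue` and its `send` callback are hypothetical, and on failure the sketch
simply stops where the real sender would back off.

```python
from typing import Callable, List, Optional, Tuple

# Per-transaction limits quoted in the list above.
MAX_PDUS_PER_TRANSACTION = 50
MAX_EDUS_PER_TRANSACTION = 100

Pdu = Tuple[int, str]  # (stream_ordering, event_id)


def drain_queue(
    pdus: List[Pdu],
    edus: List[str],
    send: Callable[[List[Pdu], List[str]], bool],
) -> Tuple[Optional[int], int]:
    # Returns (stream_ordering of the last PDU sent, number of transactions).
    last_successful: Optional[int] = None
    transactions = 0
    while pdus or edus:
        pdu_batch = pdus[:MAX_PDUS_PER_TRANSACTION]
        edu_batch = edus[:MAX_EDUS_PER_TRANSACTION]
        if not send(pdu_batch, edu_batch):
            # The real sender would back off here; the sketch just stops.
            break
        del pdus[: len(pdu_batch)]
        del edus[: len(edu_batch)]
        transactions += 1
        if pdu_batch:
            last_successful = pdu_batch[-1][0]
    return last_successful, transactions


pending_pdus = [(n, f"$event{n}") for n in range(1, 121)]
pending_edus = [f"edu{n}" for n in range(30)]
last_ordering, sent = drain_queue(pending_pdus, pending_edus, lambda p, e: True)
```

With 120 queued PDUs and an always-successful `send`, the sketch makes three
transactions (50, 50 and 20 PDUs) and records the `stream_ordering` of the last one.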


### Catch-Up Mode

When the `PerDestinationQueue` has the catch-up flag set, the *Catch-Up Transmission Loop*
(`_catch_up_transmission_loop`) is used in lieu of the regular `_transaction_transmission_loop`.
(Only once the catch-up mode has been exited can the regular transaction transmission behaviour
be resumed.)

*Catch-Up Mode*, entered upon Synapse startup or once a homeserver has fallen behind due to
connection problems, is responsible for sending PDUs that have been missed by the destination
homeserver. (PDUs can be missed because the `PerDestinationQueue` is volatile, i.e. it resets
on startup, and it does not hold PDUs forever if `/send` requests to the destination fail.)

The catch-up mechanism makes use of the `last_successful_stream_ordering` column in the
`destinations` table (which gives the `stream_ordering` of the most recent successfully
sent PDU) and the `stream_ordering` column in the `destination_rooms` table (which gives,
for each room, the `stream_ordering` of the most recent PDU that needs to be sent to this
destination).

Each iteration of the loop pulls out 50 `destination_rooms` entries with the oldest
`stream_ordering`s that are greater than the `last_successful_stream_ordering`.
In other words, from the set of latest PDUs in each room to be sent to the destination,
the 50 oldest such PDUs are pulled out.
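
The selection step can be illustrated with an in-memory stand-in for the
`destination_rooms` table. The helper `pick_catchup_rooms` and the dictionary
layout are assumptions made for this sketch; the real implementation queries the
database.

```python
from typing import Dict, List, Tuple


def pick_catchup_rooms(
    destination_rooms: Dict[str, int],  # room_id -> latest stream_ordering to send
    last_successful_stream_ordering: int,
    limit: int = 50,
) -> List[Tuple[str, int]]:
    # Keep only rooms with PDUs newer than the last one successfully sent.
    outstanding = [
        (room_id, stream_ordering)
        for room_id, stream_ordering in destination_rooms.items()
        if stream_ordering > last_successful_stream_ordering
    ]
    # Oldest first, then take at most `limit` entries.
    outstanding.sort(key=lambda entry: entry[1])
    return outstanding[:limit]


rooms = {
    "!a:example.org": 10,
    "!b:example.org": 42,
    "!c:example.org": 17,
    "!d:example.org": 99,
}
batch = pick_catchup_rooms(rooms, last_successful_stream_ordering=15, limit=2)
```

Here `!a:example.org` is skipped because its latest PDU was already sent, and the
two oldest outstanding rooms are returned first.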

These PDUs could, in principle, now be directly sent to the destination. However, as an
optimisation intended to prevent overloading destination homeservers, we instead attempt
to send the latest forward extremities so long as the destination homeserver is still
eligible to receive those.
This reduces load on the destination **in aggregate** because all Synapse homeservers
will behave according to this principle and therefore avoid sending lots of different PDUs
at different points in the DAG to a recovering homeserver.
*This optimisation is not currently valid in rooms which are partial-state on this homeserver,
since we are unable to determine whether the destination homeserver is eligible to receive
the latest forward extremities unless this homeserver sent those PDUs. In this case, we
just send the latest PDUs originating from this server and skip this optimisation.*

Whilst PDUs are sent through this mechanism, the position of `last_successful_stream_ordering`
is advanced as normal.
Once there are no longer any rooms containing outstanding PDUs to be sent to the destination
*that are not already in the `PerDestinationQueue` because they arrived since Catch-Up Mode
was enabled*, Catch-Up Mode is exited and we return to `_transaction_transmission_loop`.


#### A note on failures and back-offs

If a remote server is unreachable over federation, we back off from that server,
with an exponentially-increasing retry interval.
We automatically retry after the retry interval expires (roughly: the logic that does so
is triggered every minute).
If the backoff grows too large (> 1 hour), the in-memory queue is emptied (to prevent
unbounded growth) and Catch-Up Mode is entered.

It is worth noting that the back-off for a remote server is cleared once an inbound
request from that remote server is received (see `notify_remote_server_up`).
At this point, the transaction transmission loop is also started up, to proactively
send missed PDUs and EDUs to the destination (i.e. you don't need to wait for a new PDU
or EDU, destined for that destination, to be created in order to send out missed PDUs and
EDUs).
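
A minimal sketch of how an exponentially-increasing retry interval eventually
crosses the one-hour threshold is shown below. The starting interval (10 minutes)
and the multiplier (2) are assumptions chosen for illustration and are not taken
from this module; only the one-hour threshold comes from the text above.

```python
from typing import List

ONE_HOUR_MS = 60 * 60 * 1000  # threshold quoted in the text above


def next_retry_interval(
    current_ms: int,
    min_ms: int = 10 * 60 * 1000,  # assumed starting interval
    multiplier: float = 2.0,  # assumed growth factor
) -> int:
    # The first failure starts at the minimum; later failures grow the interval.
    if current_ms == 0:
        return min_ms
    return int(current_ms * multiplier)


def should_enter_catchup(interval_ms: int) -> bool:
    # Past this point the in-memory queue is emptied and catch-up takes over.
    return interval_ms > ONE_HOUR_MS


interval = 0
history: List[int] = []
while not should_enter_catchup(interval):
    interval = next_retry_interval(interval)
    history.append(interval)
```

Under these assumed parameters the interval exceeds one hour on the fourth
consecutive failure (10, 20, 40, then 80 minutes).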
"""

import abc
import logging
from collections import OrderedDict
from typing import (
    TYPE_CHECKING,
    Collection,
    Dict,
    Hashable,
    Iterable,
    List,
    Optional,
    Set,
    Tuple,
)

import attr
from prometheus_client import Counter
from typing_extensions import Literal

from twisted.internet import defer

import synapse.metrics
from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
from synapse.federation.sender.per_destination_queue import (
    CATCHUP_RETRY_INTERVAL,
    PerDestinationQueue,
)
from synapse.federation.sender.transaction_manager import TransactionManager
from synapse.federation.units import Edu
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import (
    LaterGauge,
    event_processing_loop_counter,
    event_processing_loop_room_count,
    events_processed_counter,
)
from synapse.metrics.background_process_metrics import (
    run_as_background_process,
    wrap_as_background_process,
)
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection
from synapse.util import Clock
from synapse.util.metrics import Measure
from synapse.util.retryutils import filter_destinations_by_retry_limiter

if TYPE_CHECKING:
    from synapse.events.presence_router import PresenceRouter
    from synapse.server import HomeServer

logger = logging.getLogger(__name__)

sent_pdus_destination_dist_count = Counter(
    "synapse_federation_client_sent_pdu_destinations_count",
    "Number of PDUs queued for sending to one or more destinations",
)

sent_pdus_destination_dist_total = Counter(
    "synapse_federation_client_sent_pdu_destinations",
    "Total number of PDUs queued for sending across all destinations",
)

# Time (in s) to wait before trying to wake up destinations that have
# catch-up outstanding. This will also be the delay applied at startup
# before trying the same.
# Please note that rate limiting still applies, so while the loop is
# executed every X seconds the destinations may not be woken up because
# they are being rate limited following previous attempt failures.
WAKEUP_RETRY_PERIOD_SEC = 60

# Time (in s) to wait in between waking up each destination, i.e. one destination
# will be woken up every <x> seconds until we have woken every destination
# that has outstanding catch-up.
WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC = 5


class AbstractFederationSender(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def notify_new_events(self, max_token: RoomStreamToken) -> None:
        """This gets called when we have some new events we might want to
        send out to other servers.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    async def send_read_receipt(self, receipt: ReadReceipt) -> None:
        """Send a read receipt (RR) to any other servers in the room

        Args:
            receipt: receipt to be sent
        """
        raise NotImplementedError()

    @abc.abstractmethod
    async def send_presence_to_destinations(
        self, states: Iterable[UserPresenceState], destinations: Iterable[str]
    ) -> None:
        """Send the given presence states to the given destinations.

        Args:
            states: the presence states to send
            destinations: the server names to send them to
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def build_and_send_edu(
        self,
        destination: str,
        edu_type: str,
        content: JsonDict,
        key: Optional[Hashable] = None,
    ) -> None:
        """Construct an Edu object, and queue it for sending

        Args:
            destination: name of server to send to
            edu_type: type of EDU to send
            content: content of EDU
            key: clobbering key for this edu
        """
        raise NotImplementedError()

    @abc.abstractmethod
    async def send_device_messages(
        self, destinations: StrCollection, immediate: bool = True
    ) -> None:
        """Tells the sender that a new device message is ready to be sent to the
        destinations. The `immediate` flag specifies whether we should try to
        send the messages immediately, or whether sending can be delayed for a
        short while (to aid performance).
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def wake_destination(self, destination: str) -> None:
        """Called when we want to retry sending transactions to a remote.

        This is mainly useful if the remote server has been down and we think it
        might have come back.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def get_current_token(self) -> int:
        raise NotImplementedError()

    @abc.abstractmethod
    def federation_ack(self, instance_name: str, token: int) -> None:
        raise NotImplementedError()

    @abc.abstractmethod
    async def get_replication_rows(
        self, instance_name: str, from_token: int, to_token: int, target_row_count: int
    ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
        raise NotImplementedError()


@attr.s
class _DestinationWakeupQueue:
    """A queue of destinations that need to be woken up due to new updates.

    Staggers waking up of per destination queues to ensure that we don't attempt
    to start TLS connections with many hosts all at once, leading to pinned CPU.
    """

    # The maximum duration in seconds between queuing up a destination and it
    # being woken up.
    _MAX_TIME_IN_QUEUE = 30.0

    # The maximum duration in seconds between waking up consecutive destination
    # queues.
    _MAX_DELAY = 0.1

    sender: "FederationSender" = attr.ib()
    clock: Clock = attr.ib()
    queue: "OrderedDict[str, Literal[None]]" = attr.ib(factory=OrderedDict)
    processing: bool = attr.ib(default=False)

    def add_to_queue(self, destination: str) -> None:
        """Add a destination to the queue to be woken up."""

        self.queue[destination] = None

        if not self.processing:
            self._handle()

    @wrap_as_background_process("_DestinationWakeupQueue.handle")
    async def _handle(self) -> None:
        """Background process to drain the queue."""

        if not self.queue:
            return

        assert not self.processing
        self.processing = True

        try:
            # We start with a delay that should drain the queue quickly enough that
            # we process all destinations in the queue in _MAX_TIME_IN_QUEUE
            # seconds.
            #
            # We also add an upper bound to the delay, to gracefully handle the
            # case where the queue only has a few entries in it.
            current_sleep_seconds = min(
                self._MAX_DELAY, self._MAX_TIME_IN_QUEUE / len(self.queue)
            )

            while self.queue:
                destination, _ = self.queue.popitem(last=False)

                queue = self.sender._get_per_destination_queue(destination)

                if not queue._new_data_to_send:
                    # The per destination queue has already been woken up.
                    continue

                queue.attempt_new_transaction()

                await self.clock.sleep(current_sleep_seconds)

                if not self.queue:
                    break

                # More destinations may have been added to the queue, so we may
                # need to reduce the delay to ensure everything gets processed
                # within _MAX_TIME_IN_QUEUE seconds.
                current_sleep_seconds = min(
                    current_sleep_seconds, self._MAX_TIME_IN_QUEUE / len(self.queue)
                )

        finally:
            self.processing = False


class FederationSender(AbstractFederationSender):
    def __init__(self, hs: "HomeServer"):
        self.hs = hs
        self.server_name = hs.hostname

        self.store = hs.get_datastores().main
        self.state = hs.get_state_handler()

        self._storage_controllers = hs.get_storage_controllers()

        self.clock = hs.get_clock()
        self.is_mine_id = hs.is_mine_id
        self.is_mine_server_name = hs.is_mine_server_name

        self._presence_router: Optional["PresenceRouter"] = None
        self._transaction_manager = TransactionManager(hs)

        self._instance_name = hs.get_instance_name()
        self._federation_shard_config = hs.config.worker.federation_shard_config

        # map from destination to PerDestinationQueue
        self._per_destination_queues: Dict[str, PerDestinationQueue] = {}

        LaterGauge(
            "synapse_federation_transaction_queue_pending_destinations",
            "",
            [],
            lambda: sum(
                1
                for d in self._per_destination_queues.values()
                if d.transmission_loop_running
            ),
        )

        LaterGauge(
            "synapse_federation_transaction_queue_pending_pdus",
            "",
            [],
            lambda: sum(
                d.pending_pdu_count() for d in self._per_destination_queues.values()
            ),
        )
        LaterGauge(
            "synapse_federation_transaction_queue_pending_edus",
            "",
            [],
            lambda: sum(
                d.pending_edu_count() for d in self._per_destination_queues.values()
            ),
        )

        self._is_processing = False
        self._last_poked_id = -1

        # map from room_id to a set of PerDestinationQueues which we believe are
        # awaiting a call to flush_read_receipts_for_room. The presence of an entry
        # here for a given room means that we are rate-limiting RR flushes to that room,
        # and that there is a pending call to _flush_rrs_for_room in the system.
        self._queues_awaiting_rr_flush_by_room: Dict[str, Set[PerDestinationQueue]] = {}

        self._rr_txn_interval_per_room_ms = (
            1000.0
            / hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
        )

        # Regularly wake up destinations that have outstanding PDUs to be caught up
        self.clock.looping_call(
            run_as_background_process,
            WAKEUP_RETRY_PERIOD_SEC * 1000.0,
            "wake_destinations_needing_catchup",
            self._wake_destinations_needing_catchup,
        )

        self._external_cache = hs.get_external_cache()

        self._destination_wakeup_queue = _DestinationWakeupQueue(self, self.clock)

    def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
        """Get or create a PerDestinationQueue for the given destination

        Args:
            destination: server_name of remote server
        """
        queue = self._per_destination_queues.get(destination)
        if not queue:
            queue = PerDestinationQueue(self.hs, self._transaction_manager, destination)
            self._per_destination_queues[destination] = queue
        return queue

    def notify_new_events(self, max_token: RoomStreamToken) -> None:
        """This gets called when we have some new events we might want to
        send out to other servers.
        """
        # We just use the minimum stream ordering and ignore the vector clock
        # component. This is safe to do as long as we *always* ignore the vector
        # clock components.
        current_id = max_token.stream

        self._last_poked_id = max(current_id, self._last_poked_id)

        if self._is_processing:
            return

        # fire off a processing loop in the background
        run_as_background_process(
            "process_event_queue_for_federation", self._process_event_queue_loop
        )

    async def _process_event_queue_loop(self) -> None:
        try:
            self._is_processing = True
            while True:
                last_token = await self.store.get_federation_out_pos("events")
                (
                    next_token,
                    event_to_received_ts,
                ) = await self.store.get_all_new_event_ids_stream(
                    last_token, self._last_poked_id, limit=100
                )

                event_ids = event_to_received_ts.keys()
                event_entries = await self.store.get_unredacted_events_from_cache_or_db(
                    event_ids
                )

                logger.debug(
                    "Handling %i -> %i: %i events to send (current id %i)",
                    last_token,
                    next_token,
                    len(event_entries),
                    self._last_poked_id,
                )

                if not event_entries and next_token >= self._last_poked_id:
                    logger.debug("All events processed")
                    break

                async def handle_event(event: EventBase) -> None:
                    # Only send events for this server.
                    send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
                    is_mine = self.is_mine_id(event.sender)
                    if not is_mine and send_on_behalf_of is None:
                        logger.debug("Not sending remote-origin event %s", event)
                        return

                    # We also want to not send out-of-band membership events.
                    #
                    # OOB memberships are used in three (and a half) situations:
                    #
                    # (1) invite events which we have received over federation. Those
                    #     will have a `sender` on a different server, so will be
                    #     skipped by the "is_mine" test above anyway.
                    #
                    # (2) rejections of invites to federated rooms - either remotely
                    #     or locally generated. (Such rejections are normally
                    #     created via federation, in which case the remote server is
                    #     responsible for sending out the rejection. If that fails,
                    #     we'll create a leave event locally, but that's only really
                    #     for the benefit of the invited user - we don't have enough
                    #     information to send it out over federation).
                    #
                    # (2a) rescinded knocks. These are identical to rejected invites.
                    #
                    # (3) knock events which we have sent over federation. As with
                    #     invite rejections, the remote server should send them out to
                    #     the federation.
                    #
                    # So, in all the above cases, we want to ignore such events.
                    #
                    # OOB memberships are always(?) outliers anyway, so if we *don't*
                    # ignore them, we'll get an exception further down when we try to
                    # fetch the membership list for the room.
                    #
                    # Arguably, we could equivalently ignore all outliers here, since
                    # in theory the only way for an outlier with a local `sender` to
                    # exist is by being an OOB membership (via one of (2), (2a) or (3)
                    # above).
                    #
                    if event.internal_metadata.is_out_of_band_membership():
                        logger.debug("Not sending OOB membership event %s", event)
                        return

                    # Finally, there are some other events that we should not send out
                    # until someone asks for them. They are explicitly flagged as such
                    # with `proactively_send: False`.
                    if not event.internal_metadata.should_proactively_send():
                        logger.debug(
                            "Not sending event with proactively_send=false: %s", event
                        )
                        return

                    destinations: Optional[Collection[str]] = None
                    if not event.prev_event_ids():
                        # If there are no prev event IDs then the state is empty
                        # and so there are no remote servers in the room
                        destinations = set()

                    if destinations is None:
                        # During a partial join we use the set of servers that we got
                        # when beginning the join. It's still possible that we send
                        # events to servers that left the room in the meantime, but
                        # we consider that an acceptable risk since it is only our own
                        # events that we leak and not other servers' events.
                        partial_state_destinations = (
                            await self.store.get_partial_state_servers_at_join(
                                event.room_id
                            )
                        )

                        if partial_state_destinations is not None:
                            destinations = partial_state_destinations

                    if destinations is None:
                        # We check the external cache for the destinations, which is
                        # stored per state group.

                        sg = await self._external_cache.get(
                            "event_to_prev_state_group", event.event_id
                        )
                        if sg:
                            destinations = await self._external_cache.get(
                                "get_joined_hosts", str(sg)
                            )
                            if destinations is None:
                                # Add logging to help track down https://github.com/matrix-org/synapse/issues/13444
                                logger.info(
                                    "Unexpectedly did not have cached destinations for %s / %s",
                                    sg,
                                    event.event_id,
                                )
                        else:
                            # Add logging to help track down https://github.com/matrix-org/synapse/issues/13444
                            logger.info(
                                "Unexpectedly did not have cached prev group for %s",
                                event.event_id,
                            )

                    if destinations is None:
                        try:
                            # Get the state from before the event.
                            # We need to make sure that this is the state from before
                            # the event and not from after it.
                            # Otherwise if the last member on a server in a room is
                            # banned then it won't receive the event because it won't
                            # be in the room after the ban.
                            destinations = await self.state.get_hosts_in_room_at_events(
                                event.room_id, event_ids=event.prev_event_ids()
                            )
                        except Exception:
                            logger.exception(
                                "Failed to calculate hosts in room for event: %s",
                                event.event_id,
                            )
                            return

                    sharded_destinations = {
                        d
                        for d in destinations
                        if self._federation_shard_config.should_handle(
                            self._instance_name, d
                        )
                    }

                    if send_on_behalf_of is not None:
                        # If we are sending the event on behalf of another server
                        # then it already has the event and there is no reason to
                        # send the event to it.
                        sharded_destinations.discard(send_on_behalf_of)

                    logger.debug("Sending %s to %r", event, sharded_destinations)

                    if sharded_destinations:
                        await self._send_pdu(event, sharded_destinations)

                        now = self.clock.time_msec()
                        ts = event_to_received_ts[event.event_id]
                        assert ts is not None
                        synapse.metrics.event_processing_lag_by_event.labels(
                            "federation_sender"
                        ).observe((now - ts) / 1000)

                async def handle_room_events(events: List[EventBase]) -> None:
                    logger.debug(
                        "Handling %i events in room %s", len(events), events[0].room_id
                    )
                    with Measure(self.clock, "handle_room_events"):
                        for event in events:
                            await handle_event(event)

                events_by_room: Dict[str, List[EventBase]] = {}

                for event_id in event_ids:
                    # `event_entries` is unsorted, so we have to iterate over `event_ids`
                    # to ensure the events are in the right order
                    event_cache = event_entries.get(event_id)
                    if event_cache:
                        event = event_cache.event
                        events_by_room.setdefault(event.room_id, []).append(event)

                await make_deferred_yieldable(
                    defer.gatherResults(
                        [
                            run_in_background(handle_room_events, evs)
                            for evs in events_by_room.values()
                        ],
                        consumeErrors=True,
                    )
                )

                logger.debug("Successfully handled up to %i", next_token)
                await self.store.update_federation_out_pos("events", next_token)

                if event_entries:
                    now = self.clock.time_msec()
                    ts = max(t for t in event_to_received_ts.values() if t)
                    assert ts is not None

                    synapse.metrics.event_processing_lag.labels(
                        "federation_sender"
                    ).set(now - ts)
                    synapse.metrics.event_processing_last_ts.labels(
                        "federation_sender"
                    ).set(ts)

                    events_processed_counter.inc(len(event_entries))

                    event_processing_loop_room_count.labels("federation_sender").inc(
                        len(events_by_room)
                    )

                event_processing_loop_counter.labels("federation_sender").inc()

                synapse.metrics.event_processing_positions.labels(
                    "federation_sender"
                ).set(next_token)

        finally:
            self._is_processing = False

    async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
        # We loop through all destinations to see whether we already have
        # a transaction in progress. If we do, stick it in the pending_pdus
        # table and we'll get back to it later.

        destinations = set(destinations)
        destinations.discard(self.server_name)
        logger.debug("Sending to: %s", str(destinations))

        if not destinations:
            return

        sent_pdus_destination_dist_total.inc(len(destinations))
        sent_pdus_destination_dist_count.inc()

        assert pdu.internal_metadata.stream_ordering

        # track the fact that we have a PDU for these destinations,
        # to allow us to perform catch-up later on if the remote is unreachable
        # for a while.
        await self.store.store_destination_rooms_entries(
            destinations,
            pdu.room_id,
            pdu.internal_metadata.stream_ordering,
        )

        destinations = await filter_destinations_by_retry_limiter(
            destinations,
            clock=self.clock,
            store=self.store,
            retry_due_within_ms=CATCHUP_RETRY_INTERVAL,
        )

        for destination in destinations:
            self._get_per_destination_queue(destination).send_pdu(pdu)

    async def send_read_receipt(self, receipt: ReadReceipt) -> None:
        """Send a read receipt (RR) to any other servers in the room

        Args:
            receipt: receipt to be sent
        """

        # Some background on the rate-limiting going on here.
        #
        # It turns out that if we attempt to send out RRs as soon as we get them from
        # a client, then we end up trying to do several hundred Hz of federation
        # transactions. (The number of transactions scales as O(N^2) on the size of a
        # room, since in a large room we have both more RRs coming in, and more servers
        # to send them to.)
        #
        # This leads to a lot of CPU load, and we end up getting behind. The solution
        # currently adopted is as follows:
        #
        # The first receipt in a given room is sent out immediately, at time T0. Any
        # further receipts are, in theory, batched up for N seconds, where N is calculated
        # based on the number of servers in the room to achieve a transaction frequency
        # of around 50Hz. So, for example, if there were 100 servers in the room, then
        # N would be 100 / 50Hz = 2 seconds.
        #
        # Then, at T0+N, we flush out any receipts that have accumulated, and restart
        # the timer to flush out more receipts at T0+2N, etc. If no receipts accumulate,
        # we stop the cycle and go back to the start.
        #
        # However, in practice, it is often possible to flush out receipts earlier: in
        # particular, if we are sending a transaction to a given server anyway (for
        # example, because we have a PDU or a RR in another room to send), then we may
        # as well send out all of the pending RRs for that server. So it may be that
        # by the time we get to T0+N, we don't actually have any RRs left to send out.
        # Nevertheless we continue to buffer up RRs for the room in question until we
        # reach the point that no RRs arrive between timer ticks.
        #
        # For even more background, see https://github.com/matrix-org/synapse/issues/4730.

        room_id = receipt.room_id

        # Work out which remote servers should be poked and poke them.
        domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
            room_id
        )
        domains: StrCollection = [
            d
            for d in domains_set
            if not self.is_mine_server_name(d)
            and self._federation_shard_config.should_handle(self._instance_name, d)
        ]

        domains = await filter_destinations_by_retry_limiter(
            domains,
            clock=self.clock,
            store=self.store,
            retry_due_within_ms=CATCHUP_RETRY_INTERVAL,
        )

        if not domains:
            return

        queues_pending_flush = self._queues_awaiting_rr_flush_by_room.get(room_id)

        # if there is no flush yet scheduled, we will send out these receipts with
        # immediate flushes, and schedule the next flush for this room.
        if queues_pending_flush is not None:
            logger.debug("Queuing receipt for: %r", domains)
        else:
            logger.debug("Sending receipt to: %r", domains)
            self._schedule_rr_flush_for_room(room_id, len(domains))

        for domain in domains:
            queue = self._get_per_destination_queue(domain)
            queue.queue_read_receipt(receipt)

            # if there is already a RR flush pending for this room, then make sure this
            # destination is registered for the flush
            if queues_pending_flush is not None:
                queues_pending_flush.add(queue)
            else:
                queue.flush_read_receipts_for_room(room_id)
  672. def _schedule_rr_flush_for_room(self, room_id: str, n_domains: int) -> None:
  673. # that is going to cause approximately len(domains) transactions, so now back
  674. # off for that multiplied by RR_TXN_INTERVAL_PER_ROOM
  675. backoff_ms = self._rr_txn_interval_per_room_ms * n_domains
  676. logger.debug("Scheduling RR flush in %s in %d ms", room_id, backoff_ms)
  677. self.clock.call_later(backoff_ms, self._flush_rrs_for_room, room_id)
  678. self._queues_awaiting_rr_flush_by_room[room_id] = set()
  679. def _flush_rrs_for_room(self, room_id: str) -> None:
  680. queues = self._queues_awaiting_rr_flush_by_room.pop(room_id)
  681. logger.debug("Flushing RRs in %s to %s", room_id, queues)
  682. if not queues:
  683. # no more RRs arrived for this room; we are done.
  684. return
  685. # schedule the next flush
  686. self._schedule_rr_flush_for_room(room_id, len(queues))
  687. for queue in queues:
  688. queue.flush_read_receipts_for_room(room_id)
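# Illustration only, not part of this module: a minimal, self-contained sketch of
# the read-receipt batching implemented by the three methods above. `FakeClock`
# and `RRBatcher` are hypothetical names; destinations are plain strings standing
# in for per-destination queues, and "flushing" just appends to a list. The first
# receipt for a room is flushed immediately and starts a timer; receipts arriving
# before the timer fires are only registered, and the timer is re-armed until a
# tick passes with nothing registered.

```python
from typing import Callable, Dict, List, Set, Tuple


class FakeClock:
    """Hypothetical stand-in for a clock: records timers, fires them on demand."""

    def __init__(self) -> None:
        self.timers: List[Tuple[float, Callable[..., None], tuple]] = []

    def call_later(self, delay: float, callback: Callable[..., None], *args) -> None:
        self.timers.append((delay, callback, args))

    def fire_next(self) -> None:
        _delay, callback, args = self.timers.pop(0)
        callback(*args)


class RRBatcher:
    """Sketch of per-room read-receipt batching (assumed simplification)."""

    def __init__(self, clock: FakeClock, interval_per_room: float) -> None:
        self.clock = clock
        self.interval_per_room = interval_per_room
        # room_id -> destinations registered for the next scheduled flush
        self.awaiting: Dict[str, Set[str]] = {}
        # (room_id, destination) pairs, in the order they were flushed
        self.flushed: List[Tuple[str, str]] = []

    def on_receipt(self, room_id: str, domains: List[str]) -> None:
        pending = self.awaiting.get(room_id)
        if pending is None:
            # No flush scheduled yet: flush now and start the timer.
            self._schedule(room_id, len(domains))
            self.flushed.extend((room_id, d) for d in domains)
        else:
            # A flush is already pending: just register the destinations.
            pending.update(domains)

    def _schedule(self, room_id: str, n_domains: int) -> None:
        # Back off in proportion to the number of transactions just caused.
        self.clock.call_later(
            self.interval_per_room * n_domains, self._flush, room_id
        )
        self.awaiting[room_id] = set()

    def _flush(self, room_id: str) -> None:
        queues = self.awaiting.pop(room_id)
        if not queues:
            # Nothing arrived since the last tick: stop the cycle.
            return
        self._schedule(room_id, len(queues))
        self.flushed.extend((room_id, d) for d in sorted(queues))
```

# Note that an empty set (flush scheduled, nothing registered yet) is distinct
# from a missing entry (no flush scheduled), which is why the checks above test
# `is None` rather than truthiness.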
  689. async def send_presence_to_destinations(
  690. self, states: Iterable[UserPresenceState], destinations: Iterable[str]
  691. ) -> None:
  692. """Send the given presence states to the given destinations.
  693. All states must belong to local users; our own server name is skipped.
  694. """
  695. if not states or not self.hs.config.server.track_presence:
  696. # No-op if there are no states to send or presence tracking is disabled.
  697. return
  698. # Ensure we only send out presence states for local users.
  699. for state in states:
  700. assert self.is_mine_id(state.user_id)
  701. destinations = await filter_destinations_by_retry_limiter(
  702. [
  703. d
  704. for d in destinations
  705. if self._federation_shard_config.should_handle(self._instance_name, d)
  706. ],
  707. clock=self.clock,
  708. store=self.store,
  709. retry_due_within_ms=CATCHUP_RETRY_INTERVAL,
  710. )
  711. for destination in destinations:
  712. if self.is_mine_server_name(destination):
  713. continue
  714. self._get_per_destination_queue(destination).send_presence(
  715. states, start_loop=False
  716. )
  717. self._destination_wakeup_queue.add_to_queue(destination)
  718. def build_and_send_edu(
  719. self,
  720. destination: str,
  721. edu_type: str,
  722. content: JsonDict,
  723. key: Optional[Hashable] = None,
  724. ) -> None:
  725. """Construct an Edu object, and queue it for sending
  726. Args:
  727. destination: name of server to send to
  728. edu_type: type of EDU to send
  729. content: content of EDU
  730. key: clobbering key for this edu
  731. """
  732. if self.is_mine_server_name(destination):
  733. logger.info("Not sending EDU to ourselves")
  734. return
  735. if not self._federation_shard_config.should_handle(
  736. self._instance_name, destination
  737. ):
  738. return
  739. edu = Edu(
  740. origin=self.server_name,
  741. destination=destination,
  742. edu_type=edu_type,
  743. content=content,
  744. )
  745. self.send_edu(edu, key)
  746. def send_edu(self, edu: Edu, key: Optional[Hashable]) -> None:
  747. """Queue an EDU for sending
  748. Args:
  749. edu: edu to send
  750. key: clobbering key for this edu
  751. """
  752. if not self._federation_shard_config.should_handle(
  753. self._instance_name, edu.destination
  754. ):
  755. return
  756. queue = self._get_per_destination_queue(edu.destination)
  757. if key:
  758. queue.send_keyed_edu(edu, key)
  759. else:
  760. queue.send_edu(edu)
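# Illustration only, not part of this module: a minimal sketch of what a
# "clobbering key" means for queued EDUs. `EduQueueSketch` is a hypothetical
# class and EDUs are plain dicts; the real per-destination queue is more
# involved. A keyed EDU replaces any unsent EDU queued under the same key, while
# unkeyed EDUs simply accumulate.

```python
from typing import Dict, Hashable, List, Optional


class EduQueueSketch:
    """Hypothetical queue showing keyed (clobbering) vs. unkeyed EDUs."""

    def __init__(self) -> None:
        self.pending: List[dict] = []
        self.pending_keyed: Dict[Hashable, dict] = {}

    def send(self, edu: dict, key: Optional[Hashable] = None) -> None:
        if key:
            # A later EDU with the same key overwrites the earlier one.
            self.pending_keyed[key] = edu
        else:
            self.pending.append(edu)

    def drain(self) -> List[dict]:
        """Return everything queued (keyed first) and reset the queue."""
        edus = list(self.pending_keyed.values()) + self.pending
        self.pending_keyed = {}
        self.pending = []
        return edus
```

# Usage: two typing updates sent under the key ("!room", "@user") collapse into
# the most recent one, so a slow destination only receives the latest state.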
  761. async def send_device_messages(
  762. self, destinations: StrCollection, immediate: bool = True
  763. ) -> None:
  764. destinations = await filter_destinations_by_retry_limiter(
  765. [
  766. destination
  767. for destination in destinations
  768. if self._federation_shard_config.should_handle(
  769. self._instance_name, destination
  770. )
  771. and not self.is_mine_server_name(destination)
  772. ],
  773. clock=self.clock,
  774. store=self.store,
  775. retry_due_within_ms=CATCHUP_RETRY_INTERVAL,
  776. )
  777. for destination in destinations:
  778. if immediate:
  779. self._get_per_destination_queue(destination).attempt_new_transaction()
  780. else:
  781. self._get_per_destination_queue(destination).mark_new_data()
  782. self._destination_wakeup_queue.add_to_queue(destination)
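# Illustration only, not part of this module: the `should_handle` checks used
# throughout this class decide whether this federation sender instance owns a
# given destination. One plausible way to make such a decision is to hash the
# destination name and take it modulo the number of instances, sketched below.
# `pick_instance` and this standalone `should_handle` are hypothetical helpers;
# the hashing scheme is an assumption and the real shard config may differ.

```python
import hashlib
from typing import List


def pick_instance(instances: List[str], destination: str) -> str:
    """Deterministically map a destination to exactly one instance name."""
    digest = hashlib.sha256(destination.encode("utf8")).digest()
    index = int.from_bytes(digest, byteorder="little") % len(instances)
    return instances[index]


def should_handle(instances: List[str], instance_name: str, destination: str) -> bool:
    """True if `instance_name` is the instance that owns `destination`."""
    return pick_instance(instances, destination) == instance_name
```

# With a scheme like this every destination is handled by exactly one sender, so
# each method can filter its inputs independently without coordination.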
  783. def wake_destination(self, destination: str) -> None:
  784. """Called when we want to retry sending transactions to a remote.
  785. This is mainly useful if the remote server has been down and we think it
  786. might have come back.
  787. """
  788. if self.is_mine_server_name(destination):
  789. logger.warning("Not waking up ourselves")
  790. return
  791. if not self._federation_shard_config.should_handle(
  792. self._instance_name, destination
  793. ):
  794. return
  795. self._get_per_destination_queue(destination).attempt_new_transaction()
  796. @staticmethod
  797. def get_current_token() -> int:
  798. # Dummy implementation for case where federation sender isn't offloaded
  799. # to a worker.
  800. return 0
  801. def federation_ack(self, instance_name: str, token: int) -> None:
  802. # It is not expected that this gets called on FederationSender.
  803. raise NotImplementedError()
  804. @staticmethod
  805. async def get_replication_rows(
  806. instance_name: str, from_token: int, to_token: int, target_row_count: int
  807. ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
  808. # Dummy implementation for case where federation sender isn't offloaded
  809. # to a worker.
  810. return [], 0, False
  811. async def _wake_destinations_needing_catchup(self) -> None:
  812. """
  813. Wakes up destinations that need catch-up and are not currently being
  814. backed off from.
  815. In order to reduce load spikes, adds a delay between each destination.
  816. """
  817. last_processed: Optional[str] = None
  818. while True:
  819. destinations_to_wake = (
  820. await self.store.get_catch_up_outstanding_destinations(last_processed)
  821. )
  822. if not destinations_to_wake:
  823. # finished waking all destinations!
  824. break
  825. last_processed = destinations_to_wake[-1]
  826. destinations_to_wake = [
  827. d
  828. for d in destinations_to_wake
  829. if self._federation_shard_config.should_handle(self._instance_name, d)
  830. ]
  831. for destination in destinations_to_wake:
  832. logger.info(
  833. "Destination %s has outstanding catch-up, waking up.",
  834. destination,
  835. )
  836. self.wake_destination(destination)
  837. await self.clock.sleep(WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC)
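# Illustration only, not part of this module: a minimal sketch of the paging loop
# above. `fetch_page` is a hypothetical stand-in for the store call and assumes
# it returns a bounded, sorted page of destinations strictly after the cursor.
# The point shown is that the cursor is taken from the unfiltered page, so a page
# whose entries all belong to other shards still advances the loop.

```python
from typing import Callable, Iterable, List, Optional


def fetch_page(
    all_destinations: Iterable[str], after: Optional[str], limit: int = 2
) -> List[str]:
    """Hypothetical store query: next `limit` destinations after the cursor."""
    ordered = sorted(all_destinations)
    if after is not None:
        ordered = [d for d in ordered if d > after]
    return ordered[:limit]


def destinations_to_wake(
    all_destinations: Iterable[str],
    should_handle: Callable[[str], bool],
    limit: int = 2,
) -> List[str]:
    """Page through every destination, keeping those this shard handles."""
    all_destinations = list(all_destinations)
    woken: List[str] = []
    last_processed: Optional[str] = None
    while True:
        page = fetch_page(all_destinations, last_processed, limit)
        if not page:
            # Finished paging through all destinations.
            break
        # Advance the cursor before filtering, or a fully filtered page
        # would be fetched again forever.
        last_processed = page[-1]
        woken.extend(d for d in page if should_handle(d))
    return woken
```

# The sleep between wake-ups in the real loop is omitted here; it only spreads
# the load and does not affect which destinations are visited.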