# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
The Federation Sender is responsible for sending Persistent Data Units (PDUs)
and Ephemeral Data Units (EDUs) to other homeservers using
the `/send` Federation API.


## How do PDUs get sent?

The Federation Sender is made aware of new PDUs due to `FederationSender.notify_new_events`.

When the sender is notified about a newly-persisted PDU that originates from this homeserver
and is not an out-of-band event, we pass the PDU to the `_PerDestinationQueue` for each
remote homeserver that is in the room at that point in the DAG.


### Per-Destination Queues

There is one `PerDestinationQueue` per 'destination' homeserver.
The `PerDestinationQueue` maintains the following information about the destination:

- whether the destination is currently in [catch-up mode (see below)](#catch-up-mode);
- a queue of PDUs to be sent to the destination; and
- a queue of EDUs to be sent to the destination (not considered in this section).

Upon a new PDU being enqueued, `attempt_new_transaction` is called to start a new
transaction if there is not already one in progress.
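
As a rough mental model only (the real `PerDestinationQueue` in
`per_destination_queue.py` holds considerably more state than this), the
per-destination bookkeeping can be sketched as:

```python
from typing import Any, List


class PerDestinationQueueSketch:
    # Illustrative sketch only; not the real PerDestinationQueue class.
    def __init__(self, destination: str) -> None:
        self.destination = destination
        self.catching_up = False            # is this destination in catch-up mode?
        self.pending_pdus: List[Any] = []   # PDUs queued for this destination
        self.pending_edus: List[Any] = []   # EDUs queued for this destination
        self.transmission_loop_running = False

    def send_pdu(self, pdu: Any) -> None:
        self.pending_pdus.append(pdu)
        self.attempt_new_transaction()

    def attempt_new_transaction(self) -> None:
        # At most one transaction may be in flight per destination, so this is
        # a no-op if a transmission loop is already running.
        if self.transmission_loop_running:
            return
        self.transmission_loop_running = True
        # ... kick off the transaction transmission loop in the background ...
```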

### Transactions and the Transaction Transmission Loop

Each federation HTTP request to the `/send` endpoint is referred to as a 'transaction'.
The body of the HTTP request contains a list of PDUs and EDUs to send to the destination.

The *Transaction Transmission Loop* (`_transaction_transmission_loop`) is responsible
for emptying the queued PDUs (and EDUs) from a `PerDestinationQueue` by sending
them to the destination.

There can only be one transaction in flight for a given destination at any time.
(Other than preventing us from overloading the destination, this also makes it easier to
reason about, because we process events sequentially for each destination.
This is useful for *Catch-Up Mode*, described later.)

The loop continues so long as there is anything to send. At each iteration of the loop, we:

- dequeue up to 50 PDUs (and up to 100 EDUs).
- make the `/send` request to the destination homeserver with the dequeued PDUs and EDUs.
- if successful, make note of the fact that we succeeded in transmitting PDUs up to
  the given `stream_ordering` of the latest PDU, by advancing the destination's
  `last_successful_stream_ordering`.
- if unsuccessful, back off from the remote homeserver for some time.

If we have been unsuccessful for too long (when the backoff interval grows to exceed 1 hour),
the in-memory queues are emptied and we enter [*Catch-Up Mode*, described below](#catch-up-mode).
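
One pass of the loop can be sketched roughly as follows. This is illustrative
pseudocode only, reusing `PerDestinationQueueSketch` from above;
`send_transaction` and `backoff_exceeds_one_hour` are hypothetical stand-ins
for the real transaction manager and retry bookkeeping:

```python
from typing import Any, List

MAX_PDUS_PER_TRANSACTION = 50
MAX_EDUS_PER_TRANSACTION = 100


async def send_transaction(destination: str, pdus: List[Any], edus: List[Any]) -> None:
    ...  # hypothetical stand-in for the real `/send` request


def backoff_exceeds_one_hour(destination: str) -> bool:
    return False  # hypothetical stand-in for the back-off bookkeeping


async def transaction_transmission_loop(queue: "PerDestinationQueueSketch") -> None:
    try:
        while queue.pending_pdus or queue.pending_edus:
            pdus = queue.pending_pdus[:MAX_PDUS_PER_TRANSACTION]
            edus = queue.pending_edus[:MAX_EDUS_PER_TRANSACTION]
            try:
                # One `/send` HTTP request (a 'transaction') per iteration.
                await send_transaction(queue.destination, pdus, edus)
            except Exception:
                # Back off from the destination. If the back-off interval has
                # grown past an hour, drop the in-memory queues and switch the
                # destination into catch-up mode.
                if backoff_exceeds_one_hour(queue.destination):
                    queue.pending_pdus.clear()
                    queue.pending_edus.clear()
                    queue.catching_up = True
                return
            del queue.pending_pdus[: len(pdus)]
            del queue.pending_edus[: len(edus)]
            # (In the real code, the stream_ordering of the newest PDU sent is
            # recorded as last_successful_stream_ordering at this point.)
    finally:
        queue.transmission_loop_running = False
```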

### Catch-Up Mode

When the `PerDestinationQueue` has the catch-up flag set, the *Catch-Up Transmission Loop*
(`_catch_up_transmission_loop`) is used in lieu of the regular `_transaction_transmission_loop`.
(Only once catch-up mode has been exited can the regular transaction transmission behaviour
be resumed.)

*Catch-Up Mode*, entered upon Synapse startup or once a homeserver has fallen behind due to
connection problems, is responsible for sending PDUs that have been missed by the destination
homeserver. (PDUs can be missed because the `PerDestinationQueue` is volatile — i.e. resets
on startup — and it does not hold PDUs forever if `/send` requests to the destination fail.)

The catch-up mechanism makes use of the `last_successful_stream_ordering` column in the
`destinations` table (which gives the `stream_ordering` of the most recent successfully
sent PDU) and the `stream_ordering` column in the `destination_rooms` table (which gives,
for each room, the `stream_ordering` of the most recent PDU that needs to be sent to this
destination).

Each iteration of the loop pulls out 50 `destination_rooms` entries with the oldest
`stream_ordering`s that are greater than the `last_successful_stream_ordering`.
In other words, from the set of latest PDUs in each room to be sent to the destination,
the 50 oldest such PDUs are pulled out.

These PDUs could, in principle, now be directly sent to the destination. However, as an
optimisation intended to prevent overloading destination homeservers, we instead attempt
to send the latest forward extremities so long as the destination homeserver is still
eligible to receive those.
This reduces load on the destination **in aggregate** because all Synapse homeservers
will behave according to this principle and therefore avoid sending lots of different PDUs
at different points in the DAG to a recovering homeserver.

*This optimisation is not currently valid in rooms which are partial-state on this homeserver,
since we are unable to determine whether the destination homeserver is eligible to receive
the latest forward extremities unless this homeserver sent those PDUs — in this case, we
just send the latest PDUs originating from this server and skip this optimisation.*

Whilst PDUs are sent through this mechanism, the position of `last_successful_stream_ordering`
is advanced as normal.

Once there are no longer any rooms containing outstanding PDUs to be sent to the destination
*that are not already in the `PerDestinationQueue` because they arrived since Catch-Up Mode
was enabled*, Catch-Up Mode is exited and we return to `_transaction_transmission_loop`.
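
Very roughly, and with hypothetical helpers standing in for the real store and
queue methods, one catch-up iteration looks like this:

```python
from typing import Any, List, Tuple

CATCH_UP_BATCH_SIZE = 50


async def get_catch_up_room_entries(
    destination: str, after: int, limit: int
) -> List[Tuple[str, int]]:
    # Hypothetical stand-in: the `limit` oldest (room_id, stream_ordering)
    # rows in `destination_rooms` whose stream_ordering is greater than `after`.
    return []


async def choose_pdus_to_send(destination: str, room_id: str) -> List[Any]:
    # Hypothetical stand-in: the room's latest forward extremities if the
    # destination is eligible to receive them (not valid in partial-state
    # rooms), otherwise the latest PDU recorded for the room.
    return []


async def catch_up_transmission_loop(destination: str, last_successful: int) -> None:
    # Illustrative sketch of _catch_up_transmission_loop, not the real code.
    while True:
        rooms = await get_catch_up_room_entries(
            destination, last_successful, CATCH_UP_BATCH_SIZE
        )
        if not rooms:
            # Caught up: the regular transmission loop takes over from here.
            break
        for room_id, stream_ordering in rooms:
            pdus = await choose_pdus_to_send(destination, room_id)
            # ... send `pdus` in a normal transaction; on success,
            # `last_successful_stream_ordering` advances as usual ...
            last_successful = stream_ordering
```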

#### A note on failures and back-offs

If a remote server is unreachable over federation, we back off from that server,
with an exponentially-increasing retry interval.
We don't automatically retry once the interval elapses; instead, we simply prevent new
attempts until the back-off has cleared.
Once the back-off is cleared and a new PDU or EDU arrives for transmission, the transmission
loop resumes and empties the queue by making federation requests.

If the backoff grows too large (> 1 hour), the in-memory queue is emptied (to prevent
unbounded growth) and Catch-Up Mode is entered.

It is worth noting that the back-off for a remote server is cleared once an inbound
request from that remote server is received (see `notify_remote_server_up`).
At this point, the transaction transmission loop is also started up, to proactively
send missed PDUs and EDUs to the destination (i.e. you don't need to wait for a new PDU
or EDU, destined for that destination, to be created in order to send out missed PDUs and
EDUs).
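
As an illustrative sketch only (the real retry bookkeeping is shared with the
rest of Synapse and differs in detail; the starting interval here is an
assumption), the back-off gate behaves roughly like this:

```python
import time
from typing import Dict

_retry_interval: Dict[str, float] = {}  # destination -> current interval (s)
_retry_at: Dict[str, float] = {}        # destination -> earliest next attempt


def can_attempt(destination: str) -> bool:
    # We never retry automatically; we simply refuse new attempts until the
    # back-off has expired.
    return time.time() >= _retry_at.get(destination, 0.0)


def record_failure(destination: str) -> None:
    # Double the interval on each consecutive failure (starting value is an
    # assumption for this sketch).
    interval = _retry_interval.get(destination, 30.0) * 2
    _retry_interval[destination] = interval
    _retry_at[destination] = time.time() + interval


def on_inbound_request(destination: str) -> None:
    # Receiving any request from the destination clears the back-off (cf.
    # `notify_remote_server_up`), after which the transmission loop can be
    # kicked off straight away to deliver anything that was missed.
    _retry_interval.pop(destination, None)
    _retry_at.pop(destination, None)
```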
"""

import abc
import logging
from collections import OrderedDict
from typing import (
    TYPE_CHECKING,
    Collection,
    Dict,
    Hashable,
    Iterable,
    List,
    Optional,
    Set,
    Tuple,
)

import attr
from prometheus_client import Counter
from typing_extensions import Literal

from twisted.internet import defer
from twisted.internet.interfaces import IDelayedCall

import synapse.metrics
from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager
from synapse.federation.units import Edu
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import (
    LaterGauge,
    event_processing_loop_counter,
    event_processing_loop_room_count,
    events_processed_counter,
)
from synapse.metrics.background_process_metrics import (
    run_as_background_process,
    wrap_as_background_process,
)
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
from synapse.util import Clock
from synapse.util.metrics import Measure

if TYPE_CHECKING:
    from synapse.events.presence_router import PresenceRouter
    from synapse.server import HomeServer

logger = logging.getLogger(__name__)

sent_pdus_destination_dist_count = Counter(
    "synapse_federation_client_sent_pdu_destinations_count",
    "Number of PDUs queued for sending to one or more destinations",
)

sent_pdus_destination_dist_total = Counter(
    "synapse_federation_client_sent_pdu_destinations",
    "Total number of PDUs queued for sending across all destinations",
)

# Time (in s) after Synapse's startup that we will begin to wake up destinations
# that have catch-up outstanding.
CATCH_UP_STARTUP_DELAY_SEC = 15

# Time (in s) to wait in between waking up each destination, i.e. one destination
# will be woken up every <x> seconds after Synapse's startup until we have woken
# every destination that has outstanding catch-up.
CATCH_UP_STARTUP_INTERVAL_SEC = 5


class AbstractFederationSender(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def notify_new_events(self, max_token: RoomStreamToken) -> None:
        """This gets called when we have some new events we might want to
        send out to other servers.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    async def send_read_receipt(self, receipt: ReadReceipt) -> None:
        """Send a RR to any other servers in the room

        Args:
            receipt: receipt to be sent
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def send_presence_to_destinations(
        self, states: Iterable[UserPresenceState], destinations: Iterable[str]
    ) -> None:
        """Send the given presence states to the given destinations.

        Args:
            states: the presence states to send
            destinations: the destinations to send the presence states to
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def build_and_send_edu(
        self,
        destination: str,
        edu_type: str,
        content: JsonDict,
        key: Optional[Hashable] = None,
    ) -> None:
        """Construct an Edu object, and queue it for sending

        Args:
            destination: name of server to send to
            edu_type: type of EDU to send
            content: content of EDU
            key: clobbering key for this edu
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def send_device_messages(self, destination: str, immediate: bool = True) -> None:
        """Tells the sender that a new device message is ready to be sent to the
        destination. The `immediate` flag specifies whether we should attempt to
        send the messages immediately, or whether sending can be delayed for a
        short while (to aid performance).
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def wake_destination(self, destination: str) -> None:
        """Called when we want to retry sending transactions to a remote.

        This is mainly useful if the remote server has been down and we think it
        might have come back.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def get_current_token(self) -> int:
        raise NotImplementedError()

    @abc.abstractmethod
    def federation_ack(self, instance_name: str, token: int) -> None:
        raise NotImplementedError()

    @abc.abstractmethod
    async def get_replication_rows(
        self, instance_name: str, from_token: int, to_token: int, target_row_count: int
    ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
        raise NotImplementedError()


@attr.s
class _DestinationWakeupQueue:
    """A queue of destinations that need to be woken up due to new updates.

    Staggers waking up of per destination queues to ensure that we don't attempt
    to start TLS connections with many hosts all at once, leading to pinned CPU.
    """

    # The maximum duration in seconds between queuing up a destination and it
    # being woken up.
    _MAX_TIME_IN_QUEUE = 30.0

    # The maximum duration in seconds between waking up consecutive destination
    # queues.
    _MAX_DELAY = 0.1

    sender: "FederationSender" = attr.ib()
    clock: Clock = attr.ib()
    queue: "OrderedDict[str, Literal[None]]" = attr.ib(factory=OrderedDict)
    processing: bool = attr.ib(default=False)

    def add_to_queue(self, destination: str) -> None:
        """Add a destination to the queue to be woken up."""

        self.queue[destination] = None

        if not self.processing:
            self._handle()

    @wrap_as_background_process("_DestinationWakeupQueue.handle")
    async def _handle(self) -> None:
        """Background process to drain the queue."""

        if not self.queue:
            return

        assert not self.processing
        self.processing = True

        try:
            # We start with a delay that should drain the queue quickly enough that
            # we process all destinations in the queue in _MAX_TIME_IN_QUEUE
            # seconds.
            #
            # We also add an upper bound to the delay, to gracefully handle the
            # case where the queue only has a few entries in it.
            current_sleep_seconds = min(
                self._MAX_DELAY, self._MAX_TIME_IN_QUEUE / len(self.queue)
            )

            while self.queue:
                destination, _ = self.queue.popitem(last=False)

                queue = self.sender._get_per_destination_queue(destination)

                if not queue._new_data_to_send:
                    # The per destination queue has already been woken up.
                    continue

                queue.attempt_new_transaction()

                await self.clock.sleep(current_sleep_seconds)

                if not self.queue:
                    break

                # More destinations may have been added to the queue, so we may
                # need to reduce the delay to ensure everything gets processed
                # within _MAX_TIME_IN_QUEUE seconds.
                current_sleep_seconds = min(
                    current_sleep_seconds, self._MAX_TIME_IN_QUEUE / len(self.queue)
                )
        finally:
            self.processing = False


class FederationSender(AbstractFederationSender):
    def __init__(self, hs: "HomeServer"):
        self.hs = hs
        self.server_name = hs.hostname

        self.store = hs.get_datastores().main
        self.state = hs.get_state_handler()

        self._storage_controllers = hs.get_storage_controllers()

        self.clock = hs.get_clock()
        self.is_mine_id = hs.is_mine_id

        self._presence_router: Optional["PresenceRouter"] = None
        self._transaction_manager = TransactionManager(hs)

        self._instance_name = hs.get_instance_name()
        self._federation_shard_config = hs.config.worker.federation_shard_config

        # map from destination to PerDestinationQueue
        self._per_destination_queues: Dict[str, PerDestinationQueue] = {}

        LaterGauge(
            "synapse_federation_transaction_queue_pending_destinations",
            "",
            [],
            lambda: sum(
                1
                for d in self._per_destination_queues.values()
                if d.transmission_loop_running
            ),
        )

        LaterGauge(
            "synapse_federation_transaction_queue_pending_pdus",
            "",
            [],
            lambda: sum(
                d.pending_pdu_count() for d in self._per_destination_queues.values()
            ),
        )
        LaterGauge(
            "synapse_federation_transaction_queue_pending_edus",
            "",
            [],
            lambda: sum(
                d.pending_edu_count() for d in self._per_destination_queues.values()
            ),
        )

        self._is_processing = False
        self._last_poked_id = -1

        # map from room_id to a set of PerDestinationQueues which we believe are
        # awaiting a call to flush_read_receipts_for_room. The presence of an entry
        # here for a given room means that we are rate-limiting RR flushes to that room,
        # and that there is a pending call to _flush_rrs_for_room in the system.
        self._queues_awaiting_rr_flush_by_room: Dict[str, Set[PerDestinationQueue]] = {}

        self._rr_txn_interval_per_room_ms = (
            1000.0
            / hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
        )

        # wake up destinations that have outstanding PDUs to be caught up
        self._catchup_after_startup_timer: Optional[
            IDelayedCall
        ] = self.clock.call_later(
            CATCH_UP_STARTUP_DELAY_SEC,
            run_as_background_process,
            "wake_destinations_needing_catchup",
            self._wake_destinations_needing_catchup,
        )

        self._external_cache = hs.get_external_cache()

        self._destination_wakeup_queue = _DestinationWakeupQueue(self, self.clock)

    def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
        """Get or create a PerDestinationQueue for the given destination

        Args:
            destination: server_name of remote server
        """
        queue = self._per_destination_queues.get(destination)
        if not queue:
            queue = PerDestinationQueue(self.hs, self._transaction_manager, destination)
            self._per_destination_queues[destination] = queue
        return queue

    def notify_new_events(self, max_token: RoomStreamToken) -> None:
        """This gets called when we have some new events we might want to
        send out to other servers.
        """
        # We just use the minimum stream ordering and ignore the vector clock
        # component. This is safe to do as long as we *always* ignore the vector
        # clock components.
        current_id = max_token.stream

        self._last_poked_id = max(current_id, self._last_poked_id)

        if self._is_processing:
            return

        # fire off a processing loop in the background
        run_as_background_process(
            "process_event_queue_for_federation", self._process_event_queue_loop
        )

    async def _process_event_queue_loop(self) -> None:
        try:
            self._is_processing = True
            while True:
                last_token = await self.store.get_federation_out_pos("events")
                (
                    next_token,
                    event_to_received_ts,
                ) = await self.store.get_all_new_event_ids_stream(
                    last_token, self._last_poked_id, limit=100
                )

                event_ids = event_to_received_ts.keys()

                event_entries = await self.store.get_unredacted_events_from_cache_or_db(
                    event_ids
                )

                logger.debug(
                    "Handling %i -> %i: %i events to send (current id %i)",
                    last_token,
                    next_token,
                    len(event_entries),
                    self._last_poked_id,
                )

                if not event_entries and next_token >= self._last_poked_id:
                    logger.debug("All events processed")
                    break

                async def handle_event(event: EventBase) -> None:
                    # Only send events for this server.
                    send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
                    is_mine = self.is_mine_id(event.sender)
                    if not is_mine and send_on_behalf_of is None:
                        logger.debug("Not sending remote-origin event %s", event)
                        return

                    # We also want to not send out-of-band membership events.
                    #
                    # OOB memberships are used in three (and a half) situations:
                    #
                    # (1) invite events which we have received over federation. Those
                    #     will have a `sender` on a different server, so will be
                    #     skipped by the "is_mine" test above anyway.
                    #
                    # (2) rejections of invites to federated rooms - either remotely
                    #     or locally generated. (Such rejections are normally
                    #     created via federation, in which case the remote server is
                    #     responsible for sending out the rejection. If that fails,
                    #     we'll create a leave event locally, but that's only really
                    #     for the benefit of the invited user - we don't have enough
                    #     information to send it out over federation).
                    #
                    # (2a) rescinded knocks. These are identical to rejected invites.
                    #
                    # (3) knock events which we have sent over federation. As with
                    #     invite rejections, the remote server should send them out to
                    #     the federation.
                    #
                    # So, in all the above cases, we want to ignore such events.
                    #
                    # OOB memberships are always(?) outliers anyway, so if we *don't*
                    # ignore them, we'll get an exception further down when we try to
                    # fetch the membership list for the room.
                    #
                    # Arguably, we could equivalently ignore all outliers here, since
                    # in theory the only way for an outlier with a local `sender` to
                    # exist is by being an OOB membership (via one of (2), (2a) or (3)
                    # above).
                    #
                    if event.internal_metadata.is_out_of_band_membership():
                        logger.debug("Not sending OOB membership event %s", event)
                        return

                    # Finally, there are some other events that we should not send out
                    # until someone asks for them. They are explicitly flagged as such
                    # with `proactively_send: False`.
                    if not event.internal_metadata.should_proactively_send():
                        logger.debug(
                            "Not sending event with proactively_send=false: %s", event
                        )
                        return

                    destinations: Optional[Collection[str]] = None
                    if not event.prev_event_ids():
                        # If there are no prev event IDs then the state is empty
                        # and so no remote servers in the room
                        destinations = set()

                    if destinations is None:
                        # During partial join we use the set of servers that we got
                        # when beginning the join. It's still possible that we send
                        # events to servers that left the room in the meantime, but
                        # we consider that an acceptable risk since it is only our own
                        # events that we leak and not other server's ones.
                        partial_state_destinations = (
                            await self.store.get_partial_state_servers_at_join(
                                event.room_id
                            )
                        )

                        if partial_state_destinations is not None:
                            destinations = partial_state_destinations

                    if destinations is None:
                        # We check the external cache for the destinations, which is
                        # stored per state group.
                        sg = await self._external_cache.get(
                            "event_to_prev_state_group", event.event_id
                        )
                        if sg:
                            destinations = await self._external_cache.get(
                                "get_joined_hosts", str(sg)
                            )

                            if destinations is None:
                                # Add logging to help track down #13444
                                logger.info(
                                    "Unexpectedly did not have cached destinations for %s / %s",
                                    sg,
                                    event.event_id,
                                )
                        else:
                            # Add logging to help track down #13444
                            logger.info(
                                "Unexpectedly did not have cached prev group for %s",
                                event.event_id,
                            )

                    if destinations is None:
                        try:
                            # Get the state from before the event.
                            # We need to make sure that this is the state from before
                            # the event and not from after it.
                            # Otherwise if the last member on a server in a room is
                            # banned then it won't receive the event because it won't
                            # be in the room after the ban.
                            destinations = await self.state.get_hosts_in_room_at_events(
                                event.room_id, event_ids=event.prev_event_ids()
                            )
                        except Exception:
                            logger.exception(
                                "Failed to calculate hosts in room for event: %s",
                                event.event_id,
                            )
                            return

                    sharded_destinations = {
                        d
                        for d in destinations
                        if self._federation_shard_config.should_handle(
                            self._instance_name, d
                        )
                    }

                    if send_on_behalf_of is not None:
                        # If we are sending the event on behalf of another server
                        # then it already has the event and there is no reason to
                        # send the event to it.
                        sharded_destinations.discard(send_on_behalf_of)

                    logger.debug("Sending %s to %r", event, sharded_destinations)

                    if sharded_destinations:
                        await self._send_pdu(event, sharded_destinations)

                        now = self.clock.time_msec()
                        ts = event_to_received_ts[event.event_id]
                        assert ts is not None
                        synapse.metrics.event_processing_lag_by_event.labels(
                            "federation_sender"
                        ).observe((now - ts) / 1000)

                async def handle_room_events(events: List[EventBase]) -> None:
                    logger.debug(
                        "Handling %i events in room %s", len(events), events[0].room_id
                    )
                    with Measure(self.clock, "handle_room_events"):
                        for event in events:
                            await handle_event(event)

                events_by_room: Dict[str, List[EventBase]] = {}

                for event_id in event_ids:
                    # `event_entries` is unsorted, so we have to iterate over `event_ids`
                    # to ensure the events are in the right order
                    event_cache = event_entries.get(event_id)
                    if event_cache:
                        event = event_cache.event
                        events_by_room.setdefault(event.room_id, []).append(event)

                await make_deferred_yieldable(
                    defer.gatherResults(
                        [
                            run_in_background(handle_room_events, evs)
                            for evs in events_by_room.values()
                        ],
                        consumeErrors=True,
                    )
                )

                logger.debug("Successfully handled up to %i", next_token)
                await self.store.update_federation_out_pos("events", next_token)

                if event_entries:
                    now = self.clock.time_msec()
                    ts = max(t for t in event_to_received_ts.values() if t)
                    assert ts is not None

                    synapse.metrics.event_processing_lag.labels(
                        "federation_sender"
                    ).set(now - ts)
                    synapse.metrics.event_processing_last_ts.labels(
                        "federation_sender"
                    ).set(ts)

                    events_processed_counter.inc(len(event_entries))

                    event_processing_loop_room_count.labels("federation_sender").inc(
                        len(events_by_room)
                    )

                event_processing_loop_counter.labels("federation_sender").inc()

                synapse.metrics.event_processing_positions.labels(
                    "federation_sender"
                ).set(next_token)

        finally:
            self._is_processing = False

    async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
        # We loop through all destinations to see whether we already have
        # a transaction in progress. If we do, stick it in the pending_pdus
        # table and we'll get back to it later.

        destinations = set(destinations)
        destinations.discard(self.server_name)
        logger.debug("Sending to: %s", str(destinations))

        if not destinations:
            return

        sent_pdus_destination_dist_total.inc(len(destinations))
        sent_pdus_destination_dist_count.inc()

        assert pdu.internal_metadata.stream_ordering

        # track the fact that we have a PDU for these destinations,
        # to allow us to perform catch-up later on if the remote is unreachable
        # for a while.
        await self.store.store_destination_rooms_entries(
            destinations,
            pdu.room_id,
            pdu.internal_metadata.stream_ordering,
        )

        for destination in destinations:
            self._get_per_destination_queue(destination).send_pdu(pdu)

    async def send_read_receipt(self, receipt: ReadReceipt) -> None:
        """Send a RR to any other servers in the room

        Args:
            receipt: receipt to be sent
        """

        # Some background on the rate-limiting going on here.
        #
        # It turns out that if we attempt to send out RRs as soon as we get them from
        # a client, then we end up trying to do several hundred Hz of federation
        # transactions. (The number of transactions scales as O(N^2) on the size of a
        # room, since in a large room we have both more RRs coming in, and more servers
        # to send them to.)
        #
        # This leads to a lot of CPU load, and we end up getting behind. The solution
        # currently adopted is as follows:
        #
        # The first receipt in a given room is sent out immediately, at time T0. Any
        # further receipts are, in theory, batched up for N seconds, where N is calculated
        # based on the number of servers in the room to achieve a transaction frequency
        # of around 50Hz. So, for example, if there were 100 servers in the room, then
        # N would be 100 / 50Hz = 2 seconds.
        #
        # Then, after T+N, we flush out any receipts that have accumulated, and restart
        # the timer to flush out more receipts at T+2N, etc. If no receipts accumulate,
        # we stop the cycle and go back to the start.
        #
        # However, in practice, it is often possible to flush out receipts earlier: in
        # particular, if we are sending a transaction to a given server anyway (for
        # example, because we have a PDU or a RR in another room to send), then we may
        # as well send out all of the pending RRs for that server. So it may be that
        # by the time we get to T+N, we don't actually have any RRs left to send out.
        # Nevertheless we continue to buffer up RRs for the room in question until we
        # reach the point that no RRs arrive between timer ticks.
        #
        # For even more background, see https://github.com/matrix-org/synapse/issues/4730.

        room_id = receipt.room_id

        # Work out which remote servers should be poked and poke them.
        domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
            room_id
        )
        domains = [
            d
            for d in domains_set
            if d != self.server_name
            and self._federation_shard_config.should_handle(self._instance_name, d)
        ]
        if not domains:
            return

        queues_pending_flush = self._queues_awaiting_rr_flush_by_room.get(room_id)

        # if there is no flush yet scheduled, we will send out these receipts with
        # immediate flushes, and schedule the next flush for this room.
        if queues_pending_flush is not None:
            logger.debug("Queuing receipt for: %r", domains)
        else:
            logger.debug("Sending receipt to: %r", domains)
            self._schedule_rr_flush_for_room(room_id, len(domains))

        for domain in domains:
            queue = self._get_per_destination_queue(domain)
            queue.queue_read_receipt(receipt)

            # if there is already a RR flush pending for this room, then make sure this
            # destination is registered for the flush
            if queues_pending_flush is not None:
                queues_pending_flush.add(queue)
            else:
                queue.flush_read_receipts_for_room(room_id)

    def _schedule_rr_flush_for_room(self, room_id: str, n_domains: int) -> None:
        # that is going to cause approximately len(domains) transactions, so now back
        # off for that multiplied by RR_TXN_INTERVAL_PER_ROOM
        backoff_ms = self._rr_txn_interval_per_room_ms * n_domains

        logger.debug("Scheduling RR flush in %s in %d ms", room_id, backoff_ms)
        self.clock.call_later(backoff_ms, self._flush_rrs_for_room, room_id)
        self._queues_awaiting_rr_flush_by_room[room_id] = set()

    def _flush_rrs_for_room(self, room_id: str) -> None:
        queues = self._queues_awaiting_rr_flush_by_room.pop(room_id)
        logger.debug("Flushing RRs in %s to %s", room_id, queues)

        if not queues:
            # no more RRs arrived for this room; we are done.
            return

        # schedule the next flush
        self._schedule_rr_flush_for_room(room_id, len(queues))

        for queue in queues:
            queue.flush_read_receipts_for_room(room_id)

    def send_presence_to_destinations(
        self, states: Iterable[UserPresenceState], destinations: Iterable[str]
    ) -> None:
        """Send the given presence states to the given destinations.

        Args:
            states: the presence states to send
            destinations: the destinations to send the presence states to
        """
        if not states or not self.hs.config.server.use_presence:
            # No-op if presence is disabled.
            return

        # Ensure we only send out presence states for local users.
        for state in states:
            assert self.is_mine_id(state.user_id)

        for destination in destinations:
            if destination == self.server_name:
                continue
            if not self._federation_shard_config.should_handle(
                self._instance_name, destination
            ):
                continue

            self._get_per_destination_queue(destination).send_presence(
                states, start_loop=False
            )

            self._destination_wakeup_queue.add_to_queue(destination)

    def build_and_send_edu(
        self,
        destination: str,
        edu_type: str,
        content: JsonDict,
        key: Optional[Hashable] = None,
    ) -> None:
        """Construct an Edu object, and queue it for sending

        Args:
            destination: name of server to send to
            edu_type: type of EDU to send
            content: content of EDU
            key: clobbering key for this edu
        """
        if destination == self.server_name:
            logger.info("Not sending EDU to ourselves")
            return

        if not self._federation_shard_config.should_handle(
            self._instance_name, destination
        ):
            return

        edu = Edu(
            origin=self.server_name,
            destination=destination,
            edu_type=edu_type,
            content=content,
        )

        self.send_edu(edu, key)

    def send_edu(self, edu: Edu, key: Optional[Hashable]) -> None:
        """Queue an EDU for sending

        Args:
            edu: edu to send
            key: clobbering key for this edu
        """
        if not self._federation_shard_config.should_handle(
            self._instance_name, edu.destination
        ):
            return

        queue = self._get_per_destination_queue(edu.destination)
        if key:
            queue.send_keyed_edu(edu, key)
        else:
            queue.send_edu(edu)

    def send_device_messages(self, destination: str, immediate: bool = True) -> None:
        if destination == self.server_name:
            logger.warning("Not sending device update to ourselves")
            return

        if not self._federation_shard_config.should_handle(
            self._instance_name, destination
        ):
            return

        if immediate:
            self._get_per_destination_queue(destination).attempt_new_transaction()
        else:
            self._get_per_destination_queue(destination).mark_new_data()
            self._destination_wakeup_queue.add_to_queue(destination)

    def wake_destination(self, destination: str) -> None:
        """Called when we want to retry sending transactions to a remote.

        This is mainly useful if the remote server has been down and we think it
        might have come back.
        """
        if destination == self.server_name:
            logger.warning("Not waking up ourselves")
            return

        if not self._federation_shard_config.should_handle(
            self._instance_name, destination
        ):
            return

        self._get_per_destination_queue(destination).attempt_new_transaction()

    @staticmethod
    def get_current_token() -> int:
        # Dummy implementation for case where federation sender isn't offloaded
        # to a worker.
        return 0

    def federation_ack(self, instance_name: str, token: int) -> None:
        # It is not expected that this gets called on FederationSender.
        raise NotImplementedError()

    @staticmethod
    async def get_replication_rows(
        instance_name: str, from_token: int, to_token: int, target_row_count: int
    ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
        # Dummy implementation for case where federation sender isn't offloaded
        # to a worker.
        return [], 0, False

    async def _wake_destinations_needing_catchup(self) -> None:
        """
        Wakes up destinations that need catch-up and are not currently being
        backed off from.

        In order to reduce load spikes, adds a delay between each destination.
        """

        last_processed: Optional[str] = None

        while True:
            destinations_to_wake = (
                await self.store.get_catch_up_outstanding_destinations(last_processed)
            )

            if not destinations_to_wake:
                # finished waking all destinations!
                self._catchup_after_startup_timer = None
                break

            last_processed = destinations_to_wake[-1]

            destinations_to_wake = [
                d
                for d in destinations_to_wake
                if self._federation_shard_config.should_handle(self._instance_name, d)
            ]

            for destination in destinations_to_wake:
                logger.info(
                    "Destination %s has outstanding catch-up, waking up.",
                    destination,
                )
                self.wake_destination(destination)
                await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC)