Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.
 
 
 
 
 
 

805 řádky
30 KiB

  1. # Copyright 2017 Vector Creations Ltd
  2. # Copyright 2020, 2022 The Matrix.org Foundation C.I.C.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. from typing import (
  17. TYPE_CHECKING,
  18. Any,
  19. Awaitable,
  20. Dict,
  21. Iterable,
  22. Iterator,
  23. List,
  24. Optional,
  25. Set,
  26. Tuple,
  27. TypeVar,
  28. Union,
  29. )
  30. from prometheus_client import Counter
  31. from typing_extensions import Deque
  32. from twisted.internet.protocol import ReconnectingClientFactory
  33. from synapse.metrics import LaterGauge
  34. from synapse.metrics.background_process_metrics import run_as_background_process
  35. from synapse.replication.tcp.commands import (
  36. ClearUserSyncsCommand,
  37. Command,
  38. FederationAckCommand,
  39. PositionCommand,
  40. RdataCommand,
  41. RemoteServerUpCommand,
  42. ReplicateCommand,
  43. UserIpCommand,
  44. UserSyncCommand,
  45. )
  46. from synapse.replication.tcp.context import ClientContextFactory
  47. from synapse.replication.tcp.protocol import IReplicationConnection
  48. from synapse.replication.tcp.streams import (
  49. STREAMS_MAP,
  50. AccountDataStream,
  51. BackfillStream,
  52. CachesStream,
  53. EventsStream,
  54. FederationStream,
  55. PresenceFederationStream,
  56. PresenceStream,
  57. ReceiptsStream,
  58. Stream,
  59. ToDeviceStream,
  60. TypingStream,
  61. )
  62. if TYPE_CHECKING:
  63. from synapse.server import HomeServer
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# number of updates received for each RDATA stream
# (incremented in `ReplicationCommandHandler.on_RDATA`, labelled by stream name)
inbound_rdata_count = Counter(
    "synapse_replication_tcp_protocol_inbound_rdata_count", "", ["stream_name"]
)
# incremented once for every USER_SYNC command passed to `on_USER_SYNC`
user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "")
# incremented once for every FEDERATION_ACK command passed to `on_FEDERATION_ACK`
federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "")
# NOTE(review): not referenced anywhere in the visible part of this module;
# presumably kept so the metric stays registered -- confirm before removing.
remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "")
# incremented once for every USER_IP command passed to `on_USER_IP`
user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "")


# the type of the entries in _command_queues_by_stream: a deque of
# (command, connection the command arrived on) pairs.
_StreamCommandQueue = Deque[
    Tuple[Union[RdataCommand, PositionCommand], IReplicationConnection]
]
  77. class ReplicationCommandHandler:
  78. """Handles incoming commands from replication as well as sending commands
  79. back out to connections.
  80. """
  81. def __init__(self, hs: "HomeServer"):
  82. self._replication_data_handler = hs.get_replication_data_handler()
  83. self._presence_handler = hs.get_presence_handler()
  84. self._store = hs.get_datastores().main
  85. self._notifier = hs.get_notifier()
  86. self._clock = hs.get_clock()
  87. self._instance_id = hs.get_instance_id()
  88. self._instance_name = hs.get_instance_name()
  89. # Additional Redis channel suffixes to subscribe to.
  90. self._channels_to_subscribe_to: List[str] = []
  91. self._is_presence_writer = (
  92. hs.get_instance_name() in hs.config.worker.writers.presence
  93. )
  94. self._streams: Dict[str, Stream] = {
  95. stream.NAME: stream(hs) for stream in STREAMS_MAP.values()
  96. }
  97. # List of streams that this instance is the source of
  98. self._streams_to_replicate: List[Stream] = []
  99. for stream in self._streams.values():
  100. if hs.config.redis.redis_enabled and stream.NAME == CachesStream.NAME:
  101. # All workers can write to the cache invalidation stream when
  102. # using redis.
  103. self._streams_to_replicate.append(stream)
  104. continue
  105. if isinstance(stream, (EventsStream, BackfillStream)):
  106. # Only add EventStream and BackfillStream as a source on the
  107. # instance in charge of event persistence.
  108. if hs.get_instance_name() in hs.config.worker.writers.events:
  109. self._streams_to_replicate.append(stream)
  110. continue
  111. if isinstance(stream, ToDeviceStream):
  112. # Only add ToDeviceStream as a source on instances in charge of
  113. # sending to device messages.
  114. if hs.get_instance_name() in hs.config.worker.writers.to_device:
  115. self._streams_to_replicate.append(stream)
  116. continue
  117. if isinstance(stream, TypingStream):
  118. # Only add TypingStream as a source on the instance in charge of
  119. # typing.
  120. if hs.get_instance_name() in hs.config.worker.writers.typing:
  121. self._streams_to_replicate.append(stream)
  122. continue
  123. if isinstance(stream, AccountDataStream):
  124. # Only add AccountDataStream and TagAccountDataStream as a source on the
  125. # instance in charge of account_data persistence.
  126. if hs.get_instance_name() in hs.config.worker.writers.account_data:
  127. self._streams_to_replicate.append(stream)
  128. continue
  129. if isinstance(stream, ReceiptsStream):
  130. # Only add ReceiptsStream as a source on the instance in charge of
  131. # receipts.
  132. if hs.get_instance_name() in hs.config.worker.writers.receipts:
  133. self._streams_to_replicate.append(stream)
  134. continue
  135. if isinstance(stream, (PresenceStream, PresenceFederationStream)):
  136. # Only add PresenceStream as a source on the instance in charge
  137. # of presence.
  138. if self._is_presence_writer:
  139. self._streams_to_replicate.append(stream)
  140. continue
  141. # Only add any other streams if we're on master.
  142. if hs.config.worker.worker_app is not None:
  143. continue
  144. if (
  145. stream.NAME == FederationStream.NAME
  146. and hs.config.worker.send_federation
  147. ):
  148. # We only support federation stream if federation sending
  149. # has been disabled on the master.
  150. continue
  151. self._streams_to_replicate.append(stream)
  152. # Map of stream name to batched updates. See RdataCommand for info on
  153. # how batching works.
  154. self._pending_batches: Dict[str, List[Any]] = {}
  155. # The factory used to create connections.
  156. self._factory: Optional[ReconnectingClientFactory] = None
  157. # The currently connected connections. (The list of places we need to send
  158. # outgoing replication commands to.)
  159. self._connections: List[IReplicationConnection] = []
  160. LaterGauge(
  161. "synapse_replication_tcp_resource_total_connections",
  162. "",
  163. [],
  164. lambda: len(self._connections),
  165. )
  166. # When POSITION or RDATA commands arrive, we stick them in a queue and process
  167. # them in order in a separate background process.
  168. # the streams which are currently being processed by _unsafe_process_queue
  169. self._processing_streams: Set[str] = set()
  170. # for each stream, a queue of commands that are awaiting processing, and the
  171. # connection that they arrived on.
  172. self._command_queues_by_stream = {
  173. stream_name: _StreamCommandQueue() for stream_name in self._streams
  174. }
  175. # For each connection, the incoming stream names that have received a POSITION
  176. # from that connection.
  177. self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {}
  178. LaterGauge(
  179. "synapse_replication_tcp_command_queue",
  180. "Number of inbound RDATA/POSITION commands queued for processing",
  181. ["stream_name"],
  182. lambda: {
  183. (stream_name,): len(queue)
  184. for stream_name, queue in self._command_queues_by_stream.items()
  185. },
  186. )
  187. self._is_master = hs.config.worker.worker_app is None
  188. self._federation_sender = None
  189. if self._is_master and not hs.config.worker.send_federation:
  190. self._federation_sender = hs.get_federation_sender()
  191. self._server_notices_sender = None
  192. if self._is_master:
  193. self._server_notices_sender = hs.get_server_notices_sender()
  194. if hs.config.redis.redis_enabled:
  195. # If we're using Redis, it's the background worker that should
  196. # receive USER_IP commands and store the relevant client IPs.
  197. self._should_insert_client_ips = hs.config.worker.run_background_tasks
  198. else:
  199. # If we're NOT using Redis, this must be handled by the master
  200. self._should_insert_client_ips = hs.get_instance_name() == "master"
  201. if self._is_master or self._should_insert_client_ips:
  202. self.subscribe_to_channel("USER_IP")
  203. def subscribe_to_channel(self, channel_name: str) -> None:
  204. """
  205. Indicates that we wish to subscribe to a Redis channel by name.
  206. (The name will later be prefixed with the server name; i.e. subscribing
  207. to the 'ABC' channel actually subscribes to 'example.com/ABC' Redis-side.)
  208. Raises:
  209. - If replication has already started, then it's too late to subscribe
  210. to new channels.
  211. """
  212. if self._factory is not None:
  213. # We don't allow subscribing after the fact to avoid the chance
  214. # of missing an important message because we didn't subscribe in time.
  215. raise RuntimeError(
  216. "Cannot subscribe to more channels after replication started."
  217. )
  218. if channel_name not in self._channels_to_subscribe_to:
  219. self._channels_to_subscribe_to.append(channel_name)
  220. def _add_command_to_stream_queue(
  221. self, conn: IReplicationConnection, cmd: Union[RdataCommand, PositionCommand]
  222. ) -> None:
  223. """Queue the given received command for processing
  224. Adds the given command to the per-stream queue, and processes the queue if
  225. necessary
  226. """
  227. stream_name = cmd.stream_name
  228. queue = self._command_queues_by_stream.get(stream_name)
  229. if queue is None:
  230. logger.error("Got %s for unknown stream: %s", cmd.NAME, stream_name)
  231. return
  232. queue.append((cmd, conn))
  233. # if we're already processing this stream, there's nothing more to do:
  234. # the new entry on the queue will get picked up in due course
  235. if stream_name in self._processing_streams:
  236. return
  237. # fire off a background process to start processing the queue.
  238. run_as_background_process(
  239. "process-replication-data", self._unsafe_process_queue, stream_name
  240. )
  241. async def _unsafe_process_queue(self, stream_name: str) -> None:
  242. """Processes the command queue for the given stream, until it is empty
  243. Does not check if there is already a thread processing the queue, hence "unsafe"
  244. """
  245. assert stream_name not in self._processing_streams
  246. self._processing_streams.add(stream_name)
  247. try:
  248. queue = self._command_queues_by_stream.get(stream_name)
  249. while queue:
  250. cmd, conn = queue.popleft()
  251. try:
  252. await self._process_command(cmd, conn, stream_name)
  253. except Exception:
  254. logger.exception("Failed to handle command %s", cmd)
  255. finally:
  256. self._processing_streams.discard(stream_name)
  257. async def _process_command(
  258. self,
  259. cmd: Union[PositionCommand, RdataCommand],
  260. conn: IReplicationConnection,
  261. stream_name: str,
  262. ) -> None:
  263. if isinstance(cmd, PositionCommand):
  264. await self._process_position(stream_name, conn, cmd)
  265. elif isinstance(cmd, RdataCommand):
  266. await self._process_rdata(stream_name, conn, cmd)
  267. else:
  268. # This shouldn't be possible
  269. raise Exception("Unrecognised command %s in stream queue", cmd.NAME)
  270. def start_replication(self, hs: "HomeServer") -> None:
  271. """Helper method to start replication."""
  272. from synapse.replication.tcp.redis import RedisDirectTcpReplicationClientFactory
  273. # First let's ensure that we have a ReplicationStreamer started.
  274. hs.get_replication_streamer()
  275. # We need two connections to redis, one for the subscription stream and
  276. # one to send commands to (as you can't send further redis commands to a
  277. # connection after SUBSCRIBE is called).
  278. # First create the connection for sending commands.
  279. outbound_redis_connection = hs.get_outbound_redis_connection()
  280. # Now create the factory/connection for the subscription stream.
  281. self._factory = RedisDirectTcpReplicationClientFactory(
  282. hs,
  283. outbound_redis_connection,
  284. channel_names=self._channels_to_subscribe_to,
  285. )
  286. reactor = hs.get_reactor()
  287. redis_config = hs.config.redis
  288. if redis_config.redis_path is not None:
  289. reactor.connectUNIX(
  290. redis_config.redis_path,
  291. self._factory,
  292. timeout=30,
  293. checkPID=False,
  294. )
  295. elif hs.config.redis.redis_use_tls:
  296. ssl_context_factory = ClientContextFactory(hs.config.redis)
  297. reactor.connectSSL(
  298. redis_config.redis_host,
  299. redis_config.redis_port,
  300. self._factory,
  301. ssl_context_factory,
  302. timeout=30,
  303. bindAddress=None,
  304. )
  305. else:
  306. reactor.connectTCP(
  307. redis_config.redis_host,
  308. redis_config.redis_port,
  309. self._factory,
  310. timeout=30,
  311. bindAddress=None,
  312. )
  313. def get_streams(self) -> Dict[str, Stream]:
  314. """Get a map from stream name to all streams."""
  315. return self._streams
  316. def get_streams_to_replicate(self) -> List[Stream]:
  317. """Get a list of streams that this instances replicates."""
  318. return self._streams_to_replicate
  319. def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand) -> None:
  320. self.send_positions_to_connection(conn)
  321. def send_positions_to_connection(self, conn: IReplicationConnection) -> None:
  322. """Send current position of all streams this process is source of to
  323. the connection.
  324. """
  325. # We respond with current position of all streams this instance
  326. # replicates.
  327. for stream in self.get_streams_to_replicate():
  328. # Note that we use the current token as the prev token here (rather
  329. # than stream.last_token), as we can't be sure that there have been
  330. # no rows written between last token and the current token (since we
  331. # might be racing with the replication sending bg process).
  332. current_token = stream.current_token(self._instance_name)
  333. self.send_command(
  334. PositionCommand(
  335. stream.NAME,
  336. self._instance_name,
  337. current_token,
  338. current_token,
  339. )
  340. )
  341. def on_USER_SYNC(
  342. self, conn: IReplicationConnection, cmd: UserSyncCommand
  343. ) -> Optional[Awaitable[None]]:
  344. user_sync_counter.inc()
  345. if self._is_presence_writer:
  346. return self._presence_handler.update_external_syncs_row(
  347. cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
  348. )
  349. else:
  350. return None
  351. def on_CLEAR_USER_SYNC(
  352. self, conn: IReplicationConnection, cmd: ClearUserSyncsCommand
  353. ) -> Optional[Awaitable[None]]:
  354. if self._is_presence_writer:
  355. return self._presence_handler.update_external_syncs_clear(cmd.instance_id)
  356. else:
  357. return None
  358. def on_FEDERATION_ACK(
  359. self, conn: IReplicationConnection, cmd: FederationAckCommand
  360. ) -> None:
  361. federation_ack_counter.inc()
  362. if self._federation_sender:
  363. self._federation_sender.federation_ack(cmd.instance_name, cmd.token)
  364. def on_USER_IP(
  365. self, conn: IReplicationConnection, cmd: UserIpCommand
  366. ) -> Optional[Awaitable[None]]:
  367. user_ip_cache_counter.inc()
  368. if self._is_master or self._should_insert_client_ips:
  369. # We make a point of only returning an awaitable if there's actually
  370. # something to do; on_USER_IP is not an async function, but
  371. # _handle_user_ip is.
  372. # If on_USER_IP returns an awaitable, it gets scheduled as a
  373. # background process (see `BaseReplicationStreamProtocol.handle_command`).
  374. return self._handle_user_ip(cmd)
  375. else:
  376. # Returning None when this process definitely has nothing to do
  377. # reduces the overhead of handling the USER_IP command, which is
  378. # currently broadcast to all workers regardless of utility.
  379. return None
  380. async def _handle_user_ip(self, cmd: UserIpCommand) -> None:
  381. """
  382. Handles a User IP, branching depending on whether we are the main process
  383. and/or the background worker.
  384. """
  385. if self._is_master:
  386. assert self._server_notices_sender is not None
  387. await self._server_notices_sender.on_user_ip(cmd.user_id)
  388. if self._should_insert_client_ips:
  389. await self._store.insert_client_ip(
  390. cmd.user_id,
  391. cmd.access_token,
  392. cmd.ip,
  393. cmd.user_agent,
  394. cmd.device_id,
  395. cmd.last_seen,
  396. )
  397. def on_RDATA(self, conn: IReplicationConnection, cmd: RdataCommand) -> None:
  398. if cmd.instance_name == self._instance_name:
  399. # Ignore RDATA that are just our own echoes
  400. return
  401. stream_name = cmd.stream_name
  402. inbound_rdata_count.labels(stream_name).inc()
  403. # We put the received command into a queue here for two reasons:
  404. # 1. so we don't try and concurrently handle multiple rows for the
  405. # same stream, and
  406. # 2. so we don't race with getting a POSITION command and fetching
  407. # missing RDATA.
  408. self._add_command_to_stream_queue(conn, cmd)
  409. async def _process_rdata(
  410. self, stream_name: str, conn: IReplicationConnection, cmd: RdataCommand
  411. ) -> None:
  412. """Process an RDATA command
  413. Called after the command has been popped off the queue of inbound commands
  414. """
  415. try:
  416. row = STREAMS_MAP[stream_name].parse_row(cmd.row)
  417. except Exception as e:
  418. raise Exception(
  419. "Failed to parse RDATA: %r %r" % (stream_name, cmd.row)
  420. ) from e
  421. # make sure that we've processed a POSITION for this stream *on this
  422. # connection*. (A POSITION on another connection is no good, as there
  423. # is no guarantee that we have seen all the intermediate updates.)
  424. sbc = self._streams_by_connection.get(conn)
  425. if not sbc or stream_name not in sbc:
  426. # Let's drop the row for now, on the assumption we'll receive a
  427. # `POSITION` soon and we'll catch up correctly then.
  428. logger.debug(
  429. "Discarding RDATA for unconnected stream %s -> %s",
  430. stream_name,
  431. cmd.token,
  432. )
  433. return
  434. if cmd.token is None:
  435. # I.e. this is part of a batch of updates for this stream (in
  436. # which case batch until we get an update for the stream with a non
  437. # None token).
  438. self._pending_batches.setdefault(stream_name, []).append(row)
  439. return
  440. # Check if this is the last of a batch of updates
  441. rows = self._pending_batches.pop(stream_name, [])
  442. rows.append(row)
  443. stream = self._streams[stream_name]
  444. # Find where we previously streamed up to.
  445. current_token = stream.current_token(cmd.instance_name)
  446. # Discard this data if this token is earlier than the current
  447. # position. Note that streams can be reset (in which case you
  448. # expect an earlier token), but that must be preceded by a
  449. # POSITION command.
  450. if cmd.token <= current_token:
  451. logger.debug(
  452. "Discarding RDATA from stream %s at position %s before previous position %s",
  453. stream_name,
  454. cmd.token,
  455. current_token,
  456. )
  457. else:
  458. await self.on_rdata(stream_name, cmd.instance_name, cmd.token, rows)
  459. async def on_rdata(
  460. self, stream_name: str, instance_name: str, token: int, rows: list
  461. ) -> None:
  462. """Called to handle a batch of replication data with a given stream token.
  463. Args:
  464. stream_name: name of the replication stream for this batch of rows
  465. instance_name: the instance that wrote the rows.
  466. token: stream token for this batch of rows
  467. rows: a list of Stream.ROW_TYPE objects as returned by
  468. Stream.parse_row.
  469. """
  470. logger.debug("Received rdata %s (%s) -> %s", stream_name, instance_name, token)
  471. await self._replication_data_handler.on_rdata(
  472. stream_name, instance_name, token, rows
  473. )
  474. def on_POSITION(self, conn: IReplicationConnection, cmd: PositionCommand) -> None:
  475. if cmd.instance_name == self._instance_name:
  476. # Ignore POSITION that are just our own echoes
  477. return
  478. logger.debug("Handling '%s %s'", cmd.NAME, cmd.to_line())
  479. self._add_command_to_stream_queue(conn, cmd)
  480. async def _process_position(
  481. self, stream_name: str, conn: IReplicationConnection, cmd: PositionCommand
  482. ) -> None:
  483. """Process a POSITION command
  484. Called after the command has been popped off the queue of inbound commands
  485. """
  486. stream = self._streams[stream_name]
  487. # We're about to go and catch up with the stream, so remove from set
  488. # of connected streams.
  489. for streams in self._streams_by_connection.values():
  490. streams.discard(stream_name)
  491. # We clear the pending batches for the stream as the fetching of the
  492. # missing updates below will fetch all rows in the batch.
  493. self._pending_batches.pop(stream_name, [])
  494. # Find where we previously streamed up to.
  495. current_token = stream.current_token(cmd.instance_name)
  496. # If the position token matches our current token then we're up to
  497. # date and there's nothing to do. Otherwise, fetch all updates
  498. # between then and now.
  499. missing_updates = cmd.prev_token != current_token
  500. while missing_updates:
  501. # Note: There may very well not be any new updates, but we check to
  502. # make sure. This can particularly happen for the event stream where
  503. # event persisters continuously send `POSITION`. See `resource.py`
  504. # for why this can happen.
  505. logger.info(
  506. "Fetching replication rows for '%s' between %i and %i",
  507. stream_name,
  508. current_token,
  509. cmd.new_token,
  510. )
  511. (updates, current_token, missing_updates) = await stream.get_updates_since(
  512. cmd.instance_name, current_token, cmd.new_token
  513. )
  514. # TODO: add some tests for this
  515. # Some streams return multiple rows with the same stream IDs,
  516. # which need to be processed in batches.
  517. for token, rows in _batch_updates(updates):
  518. await self.on_rdata(
  519. stream_name,
  520. cmd.instance_name,
  521. token,
  522. [stream.parse_row(row) for row in rows],
  523. )
  524. logger.info("Caught up with stream '%s' to %i", stream_name, cmd.new_token)
  525. # We've now caught up to position sent to us, notify handler.
  526. await self._replication_data_handler.on_position(
  527. cmd.stream_name, cmd.instance_name, cmd.new_token
  528. )
  529. self._streams_by_connection.setdefault(conn, set()).add(stream_name)
  530. def on_REMOTE_SERVER_UP(
  531. self, conn: IReplicationConnection, cmd: RemoteServerUpCommand
  532. ) -> None:
  533. """Called when get a new REMOTE_SERVER_UP command."""
  534. self._replication_data_handler.on_remote_server_up(cmd.data)
  535. self._notifier.notify_remote_server_up(cmd.data)
  536. def new_connection(self, connection: IReplicationConnection) -> None:
  537. """Called when we have a new connection."""
  538. self._connections.append(connection)
  539. # If we are connected to replication as a client (rather than a server)
  540. # we need to reset the reconnection delay on the client factory (which
  541. # is used to do exponential back off when the connection drops).
  542. #
  543. # Ideally we would reset the delay when we've "fully established" the
  544. # connection (for some definition thereof) to stop us from tightlooping
  545. # on reconnection if something fails after this point and we drop the
  546. # connection. Unfortunately, we don't really have a better definition of
  547. # "fully established" than the connection being established.
  548. if self._factory:
  549. self._factory.resetDelay()
  550. # Tell the other end if we have any users currently syncing.
  551. currently_syncing = (
  552. self._presence_handler.get_currently_syncing_users_for_replication()
  553. )
  554. now = self._clock.time_msec()
  555. for user_id in currently_syncing:
  556. connection.send_command(
  557. UserSyncCommand(self._instance_id, user_id, True, now)
  558. )
  559. def lost_connection(self, connection: IReplicationConnection) -> None:
  560. """Called when a connection is closed/lost."""
  561. # we no longer need _streams_by_connection for this connection.
  562. streams = self._streams_by_connection.pop(connection, None)
  563. if streams:
  564. logger.info(
  565. "Lost replication connection; streams now disconnected: %s", streams
  566. )
  567. try:
  568. self._connections.remove(connection)
  569. except ValueError:
  570. pass
  571. def connected(self) -> bool:
  572. """Do we have any replication connections open?
  573. Is used by e.g. `ReplicationStreamer` to no-op if nothing is connected.
  574. """
  575. return bool(self._connections)
  576. def send_command(self, cmd: Command) -> None:
  577. """Send a command to all connected connections.
  578. Args:
  579. cmd
  580. """
  581. if self._connections:
  582. for connection in self._connections:
  583. try:
  584. connection.send_command(cmd)
  585. except Exception:
  586. # We probably want to catch some types of exceptions here
  587. # and log them as warnings (e.g. connection gone), but I
  588. # can't find what those exception types they would be.
  589. logger.exception(
  590. "Failed to write command %s to connection %s",
  591. cmd.NAME,
  592. connection,
  593. )
  594. else:
  595. logger.warning("Dropping command as not connected: %r", cmd.NAME)
  596. def send_federation_ack(self, token: int) -> None:
  597. """Ack data for the federation stream. This allows the master to drop
  598. data stored purely in memory.
  599. """
  600. self.send_command(FederationAckCommand(self._instance_name, token))
  601. def send_user_sync(
  602. self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int
  603. ) -> None:
  604. """Poke the master that a user has started/stopped syncing."""
  605. self.send_command(
  606. UserSyncCommand(instance_id, user_id, is_syncing, last_sync_ms)
  607. )
  608. def send_user_ip(
  609. self,
  610. user_id: str,
  611. access_token: str,
  612. ip: str,
  613. user_agent: str,
  614. device_id: Optional[str],
  615. last_seen: int,
  616. ) -> None:
  617. """Tell the master that the user made a request."""
  618. cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen)
  619. self.send_command(cmd)
  620. def send_remote_server_up(self, server: str) -> None:
  621. self.send_command(RemoteServerUpCommand(server))
  622. def stream_update(self, stream_name: str, token: Optional[int], data: Any) -> None:
  623. """Called when a new update is available to stream to Redis subscribers.
  624. We need to check if the client is interested in the stream or not
  625. """
  626. self.send_command(RdataCommand(stream_name, self._instance_name, token, data))
  627. UpdateToken = TypeVar("UpdateToken")
  628. UpdateRow = TypeVar("UpdateRow")
  629. def _batch_updates(
  630. updates: Iterable[Tuple[UpdateToken, UpdateRow]]
  631. ) -> Iterator[Tuple[UpdateToken, List[UpdateRow]]]:
  632. """Collect stream updates with the same token together
  633. Given a series of updates returned by Stream.get_updates_since(), collects
  634. the updates which share the same stream_id together.
  635. For example:
  636. [(1, a), (1, b), (2, c), (3, d), (3, e)]
  637. becomes:
  638. [
  639. (1, [a, b]),
  640. (2, [c]),
  641. (3, [d, e]),
  642. ]
  643. """
  644. update_iter = iter(updates)
  645. first_update = next(update_iter, None)
  646. if first_update is None:
  647. # empty input
  648. return
  649. current_batch_token = first_update[0]
  650. current_batch = [first_update[1]]
  651. for token, row in update_iter:
  652. if token != current_batch_token:
  653. # different token to the previous row: flush the previous
  654. # batch and start anew
  655. yield current_batch_token, current_batch
  656. current_batch_token = token
  657. current_batch = []
  658. current_batch.append(row)
  659. # flush the final batch
  660. yield current_batch_token, current_batch