You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

827 lines
30 KiB

  1. # Copyright 2017 Vector Creations Ltd
  2. # Copyright 2020, 2022 The Matrix.org Foundation C.I.C.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. from typing import (
  17. TYPE_CHECKING,
  18. Any,
  19. Awaitable,
  20. Deque,
  21. Dict,
  22. Iterable,
  23. Iterator,
  24. List,
  25. Optional,
  26. Set,
  27. Tuple,
  28. TypeVar,
  29. Union,
  30. )
  31. from prometheus_client import Counter
  32. from twisted.internet.protocol import ReconnectingClientFactory
  33. from synapse.metrics import LaterGauge
  34. from synapse.metrics.background_process_metrics import run_as_background_process
  35. from synapse.replication.tcp.commands import (
  36. ClearUserSyncsCommand,
  37. Command,
  38. FederationAckCommand,
  39. LockReleasedCommand,
  40. PositionCommand,
  41. RdataCommand,
  42. RemoteServerUpCommand,
  43. ReplicateCommand,
  44. UserIpCommand,
  45. UserSyncCommand,
  46. )
  47. from synapse.replication.tcp.context import ClientContextFactory
  48. from synapse.replication.tcp.protocol import IReplicationConnection
  49. from synapse.replication.tcp.streams import (
  50. STREAMS_MAP,
  51. AccountDataStream,
  52. BackfillStream,
  53. CachesStream,
  54. EventsStream,
  55. FederationStream,
  56. PresenceFederationStream,
  57. PresenceStream,
  58. ReceiptsStream,
  59. Stream,
  60. ToDeviceStream,
  61. TypingStream,
  62. )
  63. if TYPE_CHECKING:
  64. from synapse.server import HomeServer
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# number of updates received for each RDATA stream
# (incremented in `on_RDATA`, labelled with the stream name)
inbound_rdata_count = Counter(
    "synapse_replication_tcp_protocol_inbound_rdata_count", "", ["stream_name"]
)
# incremented once for every USER_SYNC command handled by `on_USER_SYNC`
user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "")
# incremented once for every FEDERATION_ACK command handled by `on_FEDERATION_ACK`
federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "")
# NOTE(review): not referenced in the part of this file visible here -- confirm
# whether it is still used elsewhere before removing.
remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "")
# incremented once for every USER_IP command handled by `on_USER_IP`
user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "")


# the type of the entries in _command_queues_by_stream: a deque of
# (command, connection the command arrived on) pairs
_StreamCommandQueue = Deque[
    Tuple[Union[RdataCommand, PositionCommand], IReplicationConnection]
]
  78. class ReplicationCommandHandler:
  79. """Handles incoming commands from replication as well as sending commands
  80. back out to connections.
  81. """
    def __init__(self, hs: "HomeServer"):
        """Build the handler's state from the homeserver.

        Works out which streams this instance is the source of, creates the
        per-stream inbound command queues and connection bookkeeping, registers
        the gauges, and (where relevant) subscribes to the USER_IP channel and
        registers the lock-released callback.

        Args:
            hs: the homeserver whose config and handlers are consulted.
        """
        self._replication_data_handler = hs.get_replication_data_handler()
        self._presence_handler = hs.get_presence_handler()
        self._store = hs.get_datastores().main
        self._notifier = hs.get_notifier()
        self._clock = hs.get_clock()
        self._instance_id = hs.get_instance_id()
        self._instance_name = hs.get_instance_name()

        # Additional Redis channel suffixes to subscribe to.
        self._channels_to_subscribe_to: List[str] = []

        # True if this instance is listed as a presence writer in the config.
        self._is_presence_writer = (
            hs.get_instance_name() in hs.config.worker.writers.presence
        )

        # One instance of every known stream, keyed by stream name.
        self._streams: Dict[str, Stream] = {
            stream.NAME: stream(hs) for stream in STREAMS_MAP.values()
        }

        # List of streams that this instance is the source of
        self._streams_to_replicate: List[Stream] = []

        # Each stream is matched against the checks below in order; the first
        # check that applies decides whether it is added, and `continue` stops
        # it from falling through to the later "master only" default.
        for stream in self._streams.values():
            if hs.config.redis.redis_enabled and stream.NAME == CachesStream.NAME:
                # All workers can write to the cache invalidation stream when
                # using redis.
                self._streams_to_replicate.append(stream)
                continue

            if isinstance(stream, (EventsStream, BackfillStream)):
                # Only add EventStream and BackfillStream as a source on the
                # instance in charge of event persistence.
                if hs.get_instance_name() in hs.config.worker.writers.events:
                    self._streams_to_replicate.append(stream)

                continue

            if isinstance(stream, ToDeviceStream):
                # Only add ToDeviceStream as a source on instances in charge of
                # sending to device messages.
                if hs.get_instance_name() in hs.config.worker.writers.to_device:
                    self._streams_to_replicate.append(stream)

                continue

            if isinstance(stream, TypingStream):
                # Only add TypingStream as a source on the instance in charge of
                # typing.
                if hs.get_instance_name() in hs.config.worker.writers.typing:
                    self._streams_to_replicate.append(stream)

                continue

            if isinstance(stream, AccountDataStream):
                # Only add AccountDataStream as a source on the
                # instance in charge of account_data persistence.
                if hs.get_instance_name() in hs.config.worker.writers.account_data:
                    self._streams_to_replicate.append(stream)

                continue

            if isinstance(stream, ReceiptsStream):
                # Only add ReceiptsStream as a source on the instance in charge of
                # receipts.
                if hs.get_instance_name() in hs.config.worker.writers.receipts:
                    self._streams_to_replicate.append(stream)

                continue

            if isinstance(stream, (PresenceStream, PresenceFederationStream)):
                # Only add PresenceStream and PresenceFederationStream as a
                # source on the instance in charge of presence.
                if self._is_presence_writer:
                    self._streams_to_replicate.append(stream)

                continue

            # Only add any other streams if we're on master.
            if hs.config.worker.worker_app is not None:
                continue

            if (
                stream.NAME == FederationStream.NAME
                and hs.config.worker.send_federation
            ):
                # We only support federation stream if federation sending
                # has been disabled on the master.
                continue

            self._streams_to_replicate.append(stream)

        # Map of stream name to batched updates. See RdataCommand for info on
        # how batching works.
        self._pending_batches: Dict[str, List[Any]] = {}

        # The factory used to create connections. Stays None until
        # `start_replication` is called; `subscribe_to_channel` uses that to
        # detect that replication has already started.
        self._factory: Optional[ReconnectingClientFactory] = None

        # The currently connected connections. (The list of places we need to send
        # outgoing replication commands to.)
        self._connections: List[IReplicationConnection] = []

        # Gauge reporting the current number of connections.
        LaterGauge(
            "synapse_replication_tcp_resource_total_connections",
            "",
            [],
            lambda: len(self._connections),
        )

        # When POSITION or RDATA commands arrive, we stick them in a queue and process
        # them in order in a separate background process.

        # the streams which are currently being processed by _unsafe_process_queue
        self._processing_streams: Set[str] = set()

        # for each stream, a queue of commands that are awaiting processing, and the
        # connection that they arrived on.
        self._command_queues_by_stream = {
            stream_name: _StreamCommandQueue() for stream_name in self._streams
        }

        # For each connection, the incoming stream names that have received a POSITION
        # from that connection.
        self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {}

        # Gauge reporting the length of each stream's inbound command queue.
        LaterGauge(
            "synapse_replication_tcp_command_queue",
            "Number of inbound RDATA/POSITION commands queued for processing",
            ["stream_name"],
            lambda: {
                (stream_name,): len(queue)
                for stream_name, queue in self._command_queues_by_stream.items()
            },
        )

        # "Master" here means no worker app is configured.
        self._is_master = hs.config.worker.worker_app is None

        # Only set on the master when it is not itself sending federation;
        # `on_FEDERATION_ACK` forwards acks to it when present.
        self._federation_sender = None
        if self._is_master and not hs.config.worker.send_federation:
            self._federation_sender = hs.get_federation_sender()

        # Only set on the master; used by `_handle_user_ip`.
        self._server_notices_sender = None
        if self._is_master:
            self._server_notices_sender = hs.get_server_notices_sender()

        if hs.config.redis.redis_enabled:
            # If we're using Redis, it's the background worker that should
            # receive USER_IP commands and store the relevant client IPs.
            self._should_insert_client_ips = hs.config.worker.run_background_tasks
        else:
            # If we're NOT using Redis, this must be handled by the master
            self._should_insert_client_ips = hs.get_instance_name() == "master"

        # Same condition as `on_USER_IP` uses to decide whether there is any
        # work to do for a USER_IP command.
        if self._is_master or self._should_insert_client_ips:
            self.subscribe_to_channel("USER_IP")

        if hs.config.redis.redis_enabled:
            # `on_lock_released` sends a LockReleasedCommand for locks released
            # by this instance.
            self._notifier.add_lock_released_callback(self.on_lock_released)
  206. def subscribe_to_channel(self, channel_name: str) -> None:
  207. """
  208. Indicates that we wish to subscribe to a Redis channel by name.
  209. (The name will later be prefixed with the server name; i.e. subscribing
  210. to the 'ABC' channel actually subscribes to 'example.com/ABC' Redis-side.)
  211. Raises:
  212. - If replication has already started, then it's too late to subscribe
  213. to new channels.
  214. """
  215. if self._factory is not None:
  216. # We don't allow subscribing after the fact to avoid the chance
  217. # of missing an important message because we didn't subscribe in time.
  218. raise RuntimeError(
  219. "Cannot subscribe to more channels after replication started."
  220. )
  221. if channel_name not in self._channels_to_subscribe_to:
  222. self._channels_to_subscribe_to.append(channel_name)
  223. def _add_command_to_stream_queue(
  224. self, conn: IReplicationConnection, cmd: Union[RdataCommand, PositionCommand]
  225. ) -> None:
  226. """Queue the given received command for processing
  227. Adds the given command to the per-stream queue, and processes the queue if
  228. necessary
  229. """
  230. stream_name = cmd.stream_name
  231. queue = self._command_queues_by_stream.get(stream_name)
  232. if queue is None:
  233. logger.error("Got %s for unknown stream: %s", cmd.NAME, stream_name)
  234. return
  235. queue.append((cmd, conn))
  236. # if we're already processing this stream, there's nothing more to do:
  237. # the new entry on the queue will get picked up in due course
  238. if stream_name in self._processing_streams:
  239. return
  240. # fire off a background process to start processing the queue.
  241. run_as_background_process(
  242. "process-replication-data", self._unsafe_process_queue, stream_name
  243. )
    async def _unsafe_process_queue(self, stream_name: str) -> None:
        """Processes the command queue for the given stream, until it is empty

        Does not check if there is already a thread processing the queue, hence "unsafe"

        A command that fails to process is logged and skipped; the remaining
        commands in the queue are still processed.

        Args:
            stream_name: name of the stream whose queue should be drained.
        """
        # Callers are expected to have checked this (see
        # `_add_command_to_stream_queue`).
        assert stream_name not in self._processing_streams

        self._processing_streams.add(stream_name)
        try:
            # `.get` returns None for an unknown stream, in which case the loop
            # below simply does not run.
            queue = self._command_queues_by_stream.get(stream_name)
            # Re-checked after every command, so entries appended while we were
            # awaiting are handled by this same loop.
            while queue:
                cmd, conn = queue.popleft()
                try:
                    await self._process_command(cmd, conn, stream_name)
                except Exception:
                    logger.exception("Failed to handle command %s", cmd)
        finally:
            # Always mark the stream as idle again, so that a later command can
            # start a new processor.
            self._processing_streams.discard(stream_name)
  260. async def _process_command(
  261. self,
  262. cmd: Union[PositionCommand, RdataCommand],
  263. conn: IReplicationConnection,
  264. stream_name: str,
  265. ) -> None:
  266. if isinstance(cmd, PositionCommand):
  267. await self._process_position(stream_name, conn, cmd)
  268. elif isinstance(cmd, RdataCommand):
  269. await self._process_rdata(stream_name, conn, cmd)
  270. else:
  271. # This shouldn't be possible
  272. raise Exception("Unrecognised command %s in stream queue", cmd.NAME)
  273. def start_replication(self, hs: "HomeServer") -> None:
  274. """Helper method to start replication."""
  275. from synapse.replication.tcp.redis import RedisDirectTcpReplicationClientFactory
  276. # First let's ensure that we have a ReplicationStreamer started.
  277. hs.get_replication_streamer()
  278. # We need two connections to redis, one for the subscription stream and
  279. # one to send commands to (as you can't send further redis commands to a
  280. # connection after SUBSCRIBE is called).
  281. # First create the connection for sending commands.
  282. outbound_redis_connection = hs.get_outbound_redis_connection()
  283. # Now create the factory/connection for the subscription stream.
  284. self._factory = RedisDirectTcpReplicationClientFactory(
  285. hs,
  286. outbound_redis_connection,
  287. channel_names=self._channels_to_subscribe_to,
  288. )
  289. reactor = hs.get_reactor()
  290. redis_config = hs.config.redis
  291. if redis_config.redis_path is not None:
  292. reactor.connectUNIX(
  293. redis_config.redis_path,
  294. self._factory,
  295. timeout=30,
  296. checkPID=False,
  297. )
  298. elif hs.config.redis.redis_use_tls:
  299. ssl_context_factory = ClientContextFactory(hs.config.redis)
  300. reactor.connectSSL(
  301. redis_config.redis_host,
  302. redis_config.redis_port,
  303. self._factory,
  304. ssl_context_factory,
  305. timeout=30,
  306. bindAddress=None,
  307. )
  308. else:
  309. reactor.connectTCP(
  310. redis_config.redis_host,
  311. redis_config.redis_port,
  312. self._factory,
  313. timeout=30,
  314. bindAddress=None,
  315. )
  316. def get_streams(self) -> Dict[str, Stream]:
  317. """Get a map from stream name to all streams."""
  318. return self._streams
  319. def get_streams_to_replicate(self) -> List[Stream]:
  320. """Get a list of streams that this instances replicates."""
  321. return self._streams_to_replicate
  322. def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand) -> None:
  323. self.send_positions_to_connection(conn)
  324. def send_positions_to_connection(self, conn: IReplicationConnection) -> None:
  325. """Send current position of all streams this process is source of to
  326. the connection.
  327. """
  328. # We respond with current position of all streams this instance
  329. # replicates.
  330. for stream in self.get_streams_to_replicate():
  331. # Note that we use the current token as the prev token here (rather
  332. # than stream.last_token), as we can't be sure that there have been
  333. # no rows written between last token and the current token (since we
  334. # might be racing with the replication sending bg process).
  335. current_token = stream.current_token(self._instance_name)
  336. self.send_command(
  337. PositionCommand(
  338. stream.NAME,
  339. self._instance_name,
  340. current_token,
  341. current_token,
  342. )
  343. )
  344. def on_USER_SYNC(
  345. self, conn: IReplicationConnection, cmd: UserSyncCommand
  346. ) -> Optional[Awaitable[None]]:
  347. user_sync_counter.inc()
  348. if self._is_presence_writer:
  349. return self._presence_handler.update_external_syncs_row(
  350. cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
  351. )
  352. else:
  353. return None
  354. def on_CLEAR_USER_SYNC(
  355. self, conn: IReplicationConnection, cmd: ClearUserSyncsCommand
  356. ) -> Optional[Awaitable[None]]:
  357. if self._is_presence_writer:
  358. return self._presence_handler.update_external_syncs_clear(cmd.instance_id)
  359. else:
  360. return None
  361. def on_FEDERATION_ACK(
  362. self, conn: IReplicationConnection, cmd: FederationAckCommand
  363. ) -> None:
  364. federation_ack_counter.inc()
  365. if self._federation_sender:
  366. self._federation_sender.federation_ack(cmd.instance_name, cmd.token)
  367. def on_USER_IP(
  368. self, conn: IReplicationConnection, cmd: UserIpCommand
  369. ) -> Optional[Awaitable[None]]:
  370. user_ip_cache_counter.inc()
  371. if self._is_master or self._should_insert_client_ips:
  372. # We make a point of only returning an awaitable if there's actually
  373. # something to do; on_USER_IP is not an async function, but
  374. # _handle_user_ip is.
  375. # If on_USER_IP returns an awaitable, it gets scheduled as a
  376. # background process (see `BaseReplicationStreamProtocol.handle_command`).
  377. return self._handle_user_ip(cmd)
  378. else:
  379. # Returning None when this process definitely has nothing to do
  380. # reduces the overhead of handling the USER_IP command, which is
  381. # currently broadcast to all workers regardless of utility.
  382. return None
  383. async def _handle_user_ip(self, cmd: UserIpCommand) -> None:
  384. """
  385. Handles a User IP, branching depending on whether we are the main process
  386. and/or the background worker.
  387. """
  388. if self._is_master:
  389. assert self._server_notices_sender is not None
  390. await self._server_notices_sender.on_user_ip(cmd.user_id)
  391. if self._should_insert_client_ips:
  392. await self._store.insert_client_ip(
  393. cmd.user_id,
  394. cmd.access_token,
  395. cmd.ip,
  396. cmd.user_agent,
  397. cmd.device_id,
  398. cmd.last_seen,
  399. )
  400. def on_RDATA(self, conn: IReplicationConnection, cmd: RdataCommand) -> None:
  401. if cmd.instance_name == self._instance_name:
  402. # Ignore RDATA that are just our own echoes
  403. return
  404. stream_name = cmd.stream_name
  405. inbound_rdata_count.labels(stream_name).inc()
  406. # We put the received command into a queue here for two reasons:
  407. # 1. so we don't try and concurrently handle multiple rows for the
  408. # same stream, and
  409. # 2. so we don't race with getting a POSITION command and fetching
  410. # missing RDATA.
  411. self._add_command_to_stream_queue(conn, cmd)
    async def _process_rdata(
        self, stream_name: str, conn: IReplicationConnection, cmd: RdataCommand
    ) -> None:
        """Process an RDATA command

        Called after the command has been popped off the queue of inbound commands

        Args:
            stream_name: name of the stream the command belongs to.
            conn: the connection the command arrived on.
            cmd: the RDATA command to process.

        Raises:
            Exception: if the row cannot be parsed (chained from the original
                error).
        """
        try:
            row = STREAMS_MAP[stream_name].parse_row(cmd.row)
        except Exception as e:
            raise Exception(
                "Failed to parse RDATA: %r %r" % (stream_name, cmd.row)
            ) from e

        # make sure that we've processed a POSITION for this stream *on this
        # connection*. (A POSITION on another connection is no good, as there
        # is no guarantee that we have seen all the intermediate updates.)
        sbc = self._streams_by_connection.get(conn)
        if not sbc or stream_name not in sbc:
            # Let's drop the row for now, on the assumption we'll receive a
            # `POSITION` soon and we'll catch up correctly then.
            logger.debug(
                "Discarding RDATA for unconnected stream %s -> %s",
                stream_name,
                cmd.token,
            )
            return

        if cmd.token is None:
            # I.e. this is part of a batch of updates for this stream (in
            # which case batch until we get an update for the stream with a non
            # None token).
            self._pending_batches.setdefault(stream_name, []).append(row)
            return

        # Check if this is the last of a batch of updates
        # (any rows buffered above are taken out and this row is appended, so
        # the whole batch is handled under this command's token)
        rows = self._pending_batches.pop(stream_name, [])
        rows.append(row)

        stream = self._streams[stream_name]

        # Find where we previously streamed up to.
        current_token = stream.current_token(cmd.instance_name)

        # Discard this data if this token is earlier than the current
        # position. Note that streams can be reset (in which case you
        # expect an earlier token), but that must be preceded by a
        # POSITION command.
        # (A token equal to the current position is discarded too.)
        if cmd.token <= current_token:
            logger.debug(
                "Discarding RDATA from stream %s at position %s before previous position %s",
                stream_name,
                cmd.token,
                current_token,
            )
        else:
            await self.on_rdata(stream_name, cmd.instance_name, cmd.token, rows)
  462. async def on_rdata(
  463. self, stream_name: str, instance_name: str, token: int, rows: list
  464. ) -> None:
  465. """Called to handle a batch of replication data with a given stream token.
  466. Args:
  467. stream_name: name of the replication stream for this batch of rows
  468. instance_name: the instance that wrote the rows.
  469. token: stream token for this batch of rows
  470. rows: a list of Stream.ROW_TYPE objects as returned by
  471. Stream.parse_row.
  472. """
  473. logger.debug("Received rdata %s (%s) -> %s", stream_name, instance_name, token)
  474. await self._replication_data_handler.on_rdata(
  475. stream_name, instance_name, token, rows
  476. )
  477. def on_POSITION(self, conn: IReplicationConnection, cmd: PositionCommand) -> None:
  478. if cmd.instance_name == self._instance_name:
  479. # Ignore POSITION that are just our own echoes
  480. return
  481. logger.debug("Handling '%s %s'", cmd.NAME, cmd.to_line())
  482. self._add_command_to_stream_queue(conn, cmd)
    async def _process_position(
        self, stream_name: str, conn: IReplicationConnection, cmd: PositionCommand
    ) -> None:
        """Process a POSITION command

        Called after the command has been popped off the queue of inbound commands

        Fetches any updates we are missing up to the position in the command,
        hands them to `on_rdata`, notifies the replication data handler of the
        new position, and finally records the stream as connected on `conn`.

        Args:
            stream_name: name of the stream the command belongs to.
            conn: the connection the command arrived on.
            cmd: the POSITION command to process.
        """
        stream = self._streams[stream_name]

        # We're about to go and catch up with the stream, so remove from set
        # of connected streams.
        # (This is done for every connection, not just `conn`; RDATA for this
        # stream is discarded by `_process_rdata` until it is re-added below.)
        for streams in self._streams_by_connection.values():
            streams.discard(stream_name)

        # We clear the pending batches for the stream as the fetching of the
        # missing updates below will fetch all rows in the batch.
        self._pending_batches.pop(stream_name, [])

        # Find where we previously streamed up to.
        current_token = stream.current_token(cmd.instance_name)

        # If the position token matches our current token then we're up to
        # date and there's nothing to do. Otherwise, fetch all updates
        # between then and now.
        missing_updates = cmd.prev_token != current_token
        while missing_updates:
            # Note: There may very well not be any new updates, but we check to
            # make sure. This can particularly happen for the event stream where
            # event persisters continuously send `POSITION`. See `resource.py`
            # for why this can happen.

            logger.info(
                "Fetching replication rows for '%s' between %i and %i",
                stream_name,
                current_token,
                cmd.new_token,
            )
            # `current_token` and `missing_updates` are both replaced by the
            # call's results, so the loop repeats from the returned token for
            # as long as the stream reports that more updates are missing.
            (updates, current_token, missing_updates) = await stream.get_updates_since(
                cmd.instance_name, current_token, cmd.new_token
            )

            # TODO: add some tests for this

            # Some streams return multiple rows with the same stream IDs,
            # which need to be processed in batches.

            for token, rows in _batch_updates(updates):
                await self.on_rdata(
                    stream_name,
                    cmd.instance_name,
                    token,
                    [stream.parse_row(row) for row in rows],
                )

        logger.info("Caught up with stream '%s' to %i", stream_name, cmd.new_token)

        # We've now caught up to position sent to us, notify handler.
        await self._replication_data_handler.on_position(
            cmd.stream_name, cmd.instance_name, cmd.new_token
        )

        # Mark the stream as connected on this connection, so that subsequent
        # RDATA for it arriving on `conn` is no longer discarded.
        self._streams_by_connection.setdefault(conn, set()).add(stream_name)
  533. def on_REMOTE_SERVER_UP(
  534. self, conn: IReplicationConnection, cmd: RemoteServerUpCommand
  535. ) -> None:
  536. """Called when get a new REMOTE_SERVER_UP command."""
  537. self._replication_data_handler.on_remote_server_up(cmd.data)
  538. self._notifier.notify_remote_server_up(cmd.data)
  539. def on_LOCK_RELEASED(
  540. self, conn: IReplicationConnection, cmd: LockReleasedCommand
  541. ) -> None:
  542. """Called when we get a new LOCK_RELEASED command."""
  543. if cmd.instance_name == self._instance_name:
  544. return
  545. self._notifier.notify_lock_released(
  546. cmd.instance_name, cmd.lock_name, cmd.lock_key
  547. )
  548. def new_connection(self, connection: IReplicationConnection) -> None:
  549. """Called when we have a new connection."""
  550. self._connections.append(connection)
  551. # If we are connected to replication as a client (rather than a server)
  552. # we need to reset the reconnection delay on the client factory (which
  553. # is used to do exponential back off when the connection drops).
  554. #
  555. # Ideally we would reset the delay when we've "fully established" the
  556. # connection (for some definition thereof) to stop us from tightlooping
  557. # on reconnection if something fails after this point and we drop the
  558. # connection. Unfortunately, we don't really have a better definition of
  559. # "fully established" than the connection being established.
  560. if self._factory:
  561. self._factory.resetDelay()
  562. # Tell the other end if we have any users currently syncing.
  563. currently_syncing = (
  564. self._presence_handler.get_currently_syncing_users_for_replication()
  565. )
  566. now = self._clock.time_msec()
  567. for user_id in currently_syncing:
  568. connection.send_command(
  569. UserSyncCommand(self._instance_id, user_id, True, now)
  570. )
  571. def lost_connection(self, connection: IReplicationConnection) -> None:
  572. """Called when a connection is closed/lost."""
  573. # we no longer need _streams_by_connection for this connection.
  574. streams = self._streams_by_connection.pop(connection, None)
  575. if streams:
  576. logger.info(
  577. "Lost replication connection; streams now disconnected: %s", streams
  578. )
  579. try:
  580. self._connections.remove(connection)
  581. except ValueError:
  582. pass
  583. def connected(self) -> bool:
  584. """Do we have any replication connections open?
  585. Is used by e.g. `ReplicationStreamer` to no-op if nothing is connected.
  586. """
  587. return bool(self._connections)
  588. def send_command(self, cmd: Command) -> None:
  589. """Send a command to all connected connections.
  590. Args:
  591. cmd
  592. """
  593. if self._connections:
  594. for connection in self._connections:
  595. try:
  596. connection.send_command(cmd)
  597. except Exception:
  598. # We probably want to catch some types of exceptions here
  599. # and log them as warnings (e.g. connection gone), but I
  600. # can't find what those exception types they would be.
  601. logger.exception(
  602. "Failed to write command %s to connection %s",
  603. cmd.NAME,
  604. connection,
  605. )
  606. else:
  607. logger.warning("Dropping command as not connected: %r", cmd.NAME)
  608. def send_federation_ack(self, token: int) -> None:
  609. """Ack data for the federation stream. This allows the master to drop
  610. data stored purely in memory.
  611. """
  612. self.send_command(FederationAckCommand(self._instance_name, token))
  613. def send_user_sync(
  614. self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int
  615. ) -> None:
  616. """Poke the master that a user has started/stopped syncing."""
  617. self.send_command(
  618. UserSyncCommand(instance_id, user_id, is_syncing, last_sync_ms)
  619. )
  620. def send_user_ip(
  621. self,
  622. user_id: str,
  623. access_token: str,
  624. ip: str,
  625. user_agent: str,
  626. device_id: Optional[str],
  627. last_seen: int,
  628. ) -> None:
  629. """Tell the master that the user made a request."""
  630. cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen)
  631. self.send_command(cmd)
  632. def send_remote_server_up(self, server: str) -> None:
  633. self.send_command(RemoteServerUpCommand(server))
  634. def stream_update(self, stream_name: str, token: Optional[int], data: Any) -> None:
  635. """Called when a new update is available to stream to Redis subscribers.
  636. We need to check if the client is interested in the stream or not
  637. """
  638. self.send_command(RdataCommand(stream_name, self._instance_name, token, data))
  639. def on_lock_released(
  640. self, instance_name: str, lock_name: str, lock_key: str
  641. ) -> None:
  642. """Called when we released a lock and should notify other instances."""
  643. if instance_name == self._instance_name:
  644. self.send_command(LockReleasedCommand(instance_name, lock_name, lock_key))
  645. UpdateToken = TypeVar("UpdateToken")
  646. UpdateRow = TypeVar("UpdateRow")
  647. def _batch_updates(
  648. updates: Iterable[Tuple[UpdateToken, UpdateRow]]
  649. ) -> Iterator[Tuple[UpdateToken, List[UpdateRow]]]:
  650. """Collect stream updates with the same token together
  651. Given a series of updates returned by Stream.get_updates_since(), collects
  652. the updates which share the same stream_id together.
  653. For example:
  654. [(1, a), (1, b), (2, c), (3, d), (3, e)]
  655. becomes:
  656. [
  657. (1, [a, b]),
  658. (2, [c]),
  659. (3, [d, e]),
  660. ]
  661. """
  662. update_iter = iter(updates)
  663. first_update = next(update_iter, None)
  664. if first_update is None:
  665. # empty input
  666. return
  667. current_batch_token = first_update[0]
  668. current_batch = [first_update[1]]
  669. for token, row in update_iter:
  670. if token != current_batch_token:
  671. # different token to the previous row: flush the previous
  672. # batch and start anew
  673. yield current_batch_token, current_batch
  674. current_batch_token = token
  675. current_batch = []
  676. current_batch.append(row)
  677. # flush the final batch
  678. yield current_batch_token, current_batch