- # TCP Replication
-
- ## Motivation
-
- Previously the workers used an HTTP long poll mechanism to get updates
- from the master, which had the problem of causing a lot of duplicate
- work on the server. This TCP protocol replaces those APIs with the aim
- of increased efficiency.
-
- ## Overview
-
- The protocol is based on fire-and-forget, line-based commands. An
- example flow would be (where '>' indicates master to worker and
- '<' worker to master flows):
-
- > SERVER example.com
- < REPLICATE
- > POSITION events master 53 53
- > RDATA events master 54 ["$foo1:bar.com", ...]
- > RDATA events master 55 ["$foo4:bar.com", ...]
-
- The example shows the server accepting a new connection and sending its identity
- with the `SERVER` command, followed by the client sending `REPLICATE` and the
- server responding with the position of all streams. The server then periodically sends `RDATA` commands
- which have the format `RDATA <stream_name> <instance_name> <token> <row>`, where
- the format of `<row>` is defined by the individual streams. The
- `<instance_name>` is the name of the Synapse process that generated the data
- (usually "master"). We expect an RDATA for every row in the DB.
-
- Error reporting happens by either the client or server sending an ERROR
- command, and usually the connection will be closed.
-
- Since the protocol is a simple line-based one, it's possible to manually
- connect to the server using a tool like netcat. A few things should be
- noted when manually using the protocol:
-
- - The federation stream is only available if federation sending has
- been disabled on the main process.
- The server will only time out connections that have sent a `PING`
- command. If a ping is sent then the connection will be closed if no
- further commands are received within 15s. Both the client and
- server protocol implementations will send an initial PING on
- connection and ensure at least one command every 5s is sent (not
- necessarily `PING`).
- `RDATA` commands *usually* include a numeric token; however, if the
- stream has multiple rows to replicate per token the server will send
- multiple `RDATA` commands, with all but the last having a token of
- `batch`. See the documentation on `commands.RdataCommand` for
- further details.
-
- ## Architecture
-
- The basic structure of the protocol is line based, where the initial
- word of each line specifies the command. The rest of the line is parsed
- based on the command. For example, the RDATA command is defined as:
-
- RDATA <stream_name> <instance_name> <token> <row_json>
-
- (Note that <row_json> may contain spaces, but cannot contain
- newlines.)
-
- Blank lines are ignored.
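
The parsing rules above can be sketched in a few lines of Python. This is only an illustration, not the code in `synapse/replication/tcp/commands.py`: the names `parse_line`, `parse_rdata` and `Rdata` are hypothetical, and mapping the `batch` token to `None` is an assumption made for convenience.

```python
import json
from typing import Any, NamedTuple, Optional, Tuple


class Rdata(NamedTuple):
    """Hypothetical container for one parsed RDATA command."""
    stream_name: str
    instance_name: str
    token: Optional[int]  # None when the wire token is "batch"
    row: Any


def parse_line(line: str) -> Optional[Tuple[str, str]]:
    """Split a raw line into (command, rest); return None for blank lines."""
    line = line.strip()
    if not line:
        return None  # blank lines are ignored
    command, _, rest = line.partition(" ")
    return command, rest


def parse_rdata(rest: str) -> Rdata:
    """Parse the part of an RDATA line that follows the command word."""
    # Split on the first three spaces only: the JSON row may contain spaces.
    stream_name, instance_name, token, row_json = rest.split(" ", 3)
    return Rdata(
        stream_name,
        instance_name,
        None if token == "batch" else int(token),
        json.loads(row_json),
    )
```

Splitting with a maximum of three splits keeps any spaces inside the JSON row intact, which is why the row has to come last on the line.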
-
- ### Keep alives
-
- Both sides are expected to send at least one command every 5s or so, and
- should send a `PING` command if necessary. If either side does not receive
- a command within e.g. 15s then the connection should be closed.
-
- Because the server may be connected to manually using e.g. netcat, the
- timeouts aren't enabled until an initial `PING` command is seen. Both
- the client and server implementations below send a `PING` command
- immediately on connection to ensure the timeouts are enabled.
-
- This ensures that both sides can quickly realize if the TCP connection
- has gone and handle the situation appropriately.
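
As a rough sketch of the keep-alive bookkeeping described above, the class below tracks when a `PING` is due and when the connection should be considered dead. The class name, method names and the use of plain float timestamps are assumptions for illustration; the real implementations are driven by the networking framework's timers.

```python
PING_INTERVAL_S = 5.0  # send at least one command this often
TIMEOUT_S = 15.0       # close if nothing was received for this long


class KeepAlive:
    """Hypothetical per-connection keep-alive state."""

    def __init__(self, now: float) -> None:
        self.last_sent = now
        self.last_received = now
        # Timeouts stay disabled until the peer has sent a PING, so that
        # manual sessions (e.g. via netcat) are not cut off.
        self.received_ping = False

    def on_command_sent(self, now: float) -> None:
        self.last_sent = now

    def on_command_received(self, command: str, now: float) -> None:
        self.last_received = now
        if command == "PING":
            self.received_ping = True

    def should_send_ping(self, now: float) -> bool:
        # Any command counts as traffic, so only ping when we have been idle.
        return now - self.last_sent >= PING_INTERVAL_S

    def should_close(self, now: float) -> bool:
        return self.received_ping and now - self.last_received > TIMEOUT_S
```

A periodic timer would call `should_send_ping` and `should_close` with the current time and act on the results.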
-
- ### Start up
-
- When a new connection is made, the server:
-
- - Sends a `SERVER` command, which includes the identity of the server,
- allowing the client to detect if it's connected to the expected
- server.
- - Sends a `PING` command as above, to enable the client to time out
- connections promptly.
-
- The client:
-
- Sends a `NAME` command, allowing the server to associate a
- human-friendly name with the connection. This is optional.
- Sends a `PING` as above.
- - Sends a `REPLICATE` to get the current position of all streams.
- - On receipt of a `SERVER` command, checks that the server name
- matches the expected server name.
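
The client side of this start-up sequence might look roughly like the following. Both helper names are hypothetical, and the sketch assumes the `PING` argument is a millisecond timestamp, as in the example interaction in this document.

```python
from typing import List


def client_startup_lines(name: str, now_ms: int) -> List[str]:
    """Commands a client sends right after connecting, in order."""
    return [f"NAME {name}", f"PING {now_ms}", "REPLICATE"]


def server_matches(line: str, expected_server_name: str) -> bool:
    """Check a received SERVER line against the server name we expect."""
    command, _, server_name = line.strip().partition(" ")
    return command == "SERVER" and server_name == expected_server_name
```

If `server_matches` returns `False`, a client would typically send an `ERROR` command and close the connection.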
-
- ### Error handling
-
- If either side detects an error it can send an `ERROR` command and close
- the connection.
-
- If the client side loses the connection to the server it should
- reconnect, following the steps above.
-
- ### Congestion
-
- If the server sends messages faster than the client can consume them, the
- server will first buffer a (fairly large) number of commands and then
- disconnect the client. This ensures that we don't queue up an unbounded
- number of commands in memory and gives us a potential opportunity to
- squawk loudly. When/if the client recovers it can reconnect to the
- server and ask for missed messages.
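
One way to picture this policy is a bounded send buffer that gives up on the connection once it is full. The sketch below is an assumption-laden illustration: the class name and the `max_pending` limit are hypothetical, and the actual buffer size and disconnect mechanics are not specified here.

```python
from collections import deque
from typing import Deque


class SendBuffer:
    """Hypothetical bounded queue of commands waiting to be written."""

    def __init__(self, max_pending: int) -> None:
        self.max_pending = max_pending
        self.pending: Deque[str] = deque()
        self.closed = False

    def queue(self, line: str) -> bool:
        """Queue a line; return False if the connection is (now) closed."""
        if self.closed:
            return False
        if len(self.pending) >= self.max_pending:
            # The client is not keeping up: drop everything and disconnect
            # rather than letting memory use grow without bound.
            self.pending.clear()
            self.closed = True
            return False
        self.pending.append(line)
        return True
```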
-
- ### Reliability
-
- In general the replication stream should be considered an unreliable
- transport since e.g. commands are not resent if the connection
- disappears.
-
- The exception to that is the replication streams, i.e. RDATA commands,
- since these include tokens which can be used to restart the stream on
- connection errors.
-
- The client should keep track of the token in the last RDATA command
- received for each stream so that on reconnection it can start streaming
- from the correct place. Note: not all RDATA have valid tokens due to
- batching. See `RdataCommand` for more details.
-
- ### Example
-
- An example interaction is shown below. Each line is prefixed with '>'
- or '<' to indicate which side is sending; these are *not* included on
- the wire:
-
- * connection established *
- > SERVER localhost:8823
- > PING 1490197665618
- < NAME synapse.app.appservice
- < PING 1490197665618
- < REPLICATE
- > POSITION events master 1 1
- > POSITION backfill master 1 1
- > POSITION caches master 1 1
- > RDATA caches master 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
- > RDATA events master 14 ["$149019767112vOHxz:localhost:8823","!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
- < PING 1490197675618
- > ERROR server stopping
- * connection closed by server *
-
- The `POSITION` command sent by the server is used to set the client's
- position without needing to send data with the `RDATA` command.
-
- An example of a batched set of `RDATA` is:
-
- > RDATA caches master batch ["get_user_by_id",["@test:localhost:8823"],1490197670513]
- > RDATA caches master batch ["get_user_by_id",["@test2:localhost:8823"],1490197670513]
- > RDATA caches master batch ["get_user_by_id",["@test3:localhost:8823"],1490197670513]
- > RDATA caches master 54 ["get_user_by_id",["@test4:localhost:8823"],1490197670513]
-
- In this case the client shouldn't advance its caches token until it
- sees the last `RDATA`.
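
A client could handle batched `RDATA` by holding rows back until a command with a real token arrives. The following is a minimal sketch under stated assumptions: `StreamPositions` is a hypothetical name, a `batch` token is represented as `None`, and positions are keyed by stream name only.

```python
from typing import Any, Dict, List, Optional, Tuple


class StreamPositions:
    """Hypothetical tracker for the last token seen on each stream."""

    def __init__(self) -> None:
        self.tokens: Dict[str, int] = {}
        self._pending: Dict[str, List[Any]] = {}

    def on_rdata(
        self, stream_name: str, token: Optional[int], row: Any
    ) -> Optional[Tuple[int, List[Any]]]:
        """Record a row; return (token, rows) once a batch is complete."""
        rows = self._pending.setdefault(stream_name, [])
        rows.append(row)
        if token is None:
            # Part of a batch: do not advance the stream position yet.
            return None
        # A real token closes the batch and advances the position.
        del self._pending[stream_name]
        self.tokens[stream_name] = token
        return token, rows
```

On reconnection, the values in `tokens` are the positions the client would resume from.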
-
- ### List of commands
-
- The list of valid commands, with which side can send it: server (S) or
- client (C):
-
- #### SERVER (S)
-
- Sent at the start to identify which server the client is talking to.
-
- #### RDATA (S)
-
- A single update in a stream.
-
- #### POSITION (S)
-
- On receipt of a POSITION command clients should check if they have missed any
- updates, and if so then fetch them out of band. Sent in response to a
- REPLICATE command (but can happen at any time).
-
- The POSITION command includes the source of the stream. Currently all streams
- are written by a single process (usually "master"). If fetching missing
- updates via HTTP API, rather than via the DB, then processes should make the
- request to the appropriate process.
-
- Two positions are included, the "new" position and the last position sent respectively.
- This allows servers to tell instances that the positions have advanced but no
- data has been written, without clients needlessly checking to see if they
- have missed any updates. Instances will only fetch updates if there is a gap between
- their current position and the given last position.
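
The gap check can be expressed as a small function. This sketch assumes tokens are monotonically increasing integers and takes the two positions as named arguments rather than parsing them off the wire; `missing_range` is a hypothetical helper, not part of Synapse.

```python
from typing import Optional, Tuple


def missing_range(
    current: int, last_sent: int, new: int
) -> Optional[Tuple[int, int]]:
    """Return the (from, to) range to fetch out of band, or None.

    `current` is the client's own position; `last_sent` and `new` are the
    two positions carried by the POSITION command.
    """
    if current < last_sent:
        # We never saw some updates up to `last_sent`, so catch up to `new`.
        return (current, new)
    # No gap: nothing was written that we have not already seen, so the
    # client can move to `new` without fetching anything.
    return None
```

For example, a client at position 53 that receives last position 53 and new position 60 has nothing to fetch, whereas a client at 50 would need to fetch updates after 50.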
-
- #### ERROR (S, C)
-
- There was an error.
-
- #### PING (S, C)
-
- Sent periodically to ensure the connection is still alive.
-
- #### NAME (C)
-
- Sent at the start by the client to inform the server who it is.
-
- #### REPLICATE (C)
-
- Asks the server for the current position of all streams.
-
- #### USER_SYNC (C)
-
- A user has started or stopped syncing on this process.
-
- #### CLEAR_USER_SYNC (C)
-
- The server should clear all associated user sync data from the worker.
-
- This is used when a worker is shutting down.
-
- #### FEDERATION_ACK (C)
-
- Acknowledge receipt of some federation data.
-
- #### REMOTE_SERVER_UP (S, C)
-
- Inform other processes that a remote server may have come back online.
-
- See `synapse/replication/tcp/commands.py` for a detailed description and
- the format of each command.
-
- ### Cache Invalidation Stream
-
- The cache invalidation stream is used to inform workers when they need
- to invalidate any of their caches in the data store. This is done by
- streaming all cache invalidations done on master down to the workers,
- assuming that any caches on the workers also exist on the master.
-
- Each individual cache invalidation results in a row being sent down
- replication, which includes the cache name (the name of the function)
- and the key to invalidate. For example:
-
- > RDATA caches master 550953771 ["get_user_by_id", ["@bob:example.com"], 1550574873251]
-
- Alternatively, an entire cache can be invalidated by sending down a `null`
- instead of the key. For example:
-
- > RDATA caches master 550953772 ["get_user_by_id", null, 1550574873252]
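
A worker-side handler for the two row shapes above might look like this. It is a sketch only: caches are modelled as plain dictionaries keyed by tuples, `apply_invalidation` is a hypothetical name, and the third row element (which looks like a millisecond timestamp in the examples) is simply ignored.

```python
from typing import Any, Dict, List, Tuple

Cache = Dict[Tuple[Any, ...], Any]


def apply_invalidation(caches: Dict[str, Cache], row: List[Any]) -> None:
    """Apply one row from the caches stream to the local caches."""
    cache_name, keys, _third_element = row
    cache = caches.get(cache_name)
    if cache is None:
        return  # this worker does not have that cache
    if keys is None:
        cache.clear()  # a null key invalidates the entire cache
    else:
        cache.pop(tuple(keys), None)  # invalidate just this key, if present
```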
-
- However, there are times when a number of caches need to be invalidated
- at the same time with the same key. To reduce traffic we batch those
- invalidations into a single poke by defining a special cache name that
- workers know to expand into invalidations of the correct caches.
-
- Currently the special cache names are declared in
- `synapse/storage/_base.py` and are:
-
- 1. `cs_cache_fake`: invalidates caches that depend on the current
- state