# TCP Replication

## Motivation

Previously the workers used an HTTP long poll mechanism to get updates
from the master, which had the problem of causing a lot of duplicate
work on the server. This TCP protocol replaces those APIs with the aim
of increased efficiency.

## Overview

The protocol is based on fire-and-forget, line-based commands. An
example flow would be (where '>' indicates master to worker and
'<' worker to master flows):

    > SERVER example.com
    < REPLICATE
    > POSITION events master 53 53
    > RDATA events master 54 ["$foo1:bar.com", ...]
    > RDATA events master 55 ["$foo4:bar.com", ...]

The example shows the server accepting a new connection and sending its identity
with the `SERVER` command. The client then sends `REPLICATE`, and the server
responds with the position of all streams. The server then periodically sends
`RDATA` commands, which have the format
`RDATA <stream_name> <instance_name> <token> <row>`, where
the format of `<row>` is defined by the individual streams. The
`<instance_name>` is the name of the Synapse process that generated the data
(usually "master"). We expect an `RDATA` for every row in the DB.

Error reporting happens by either the client or server sending an `ERROR`
command, after which the connection is usually closed.

Since the protocol is simple and line-based, it's possible to manually
connect to the server using a tool like netcat. A few things should be
noted when manually using the protocol:

- The federation stream is only available if federation sending has
  been disabled on the main process.
- The server only times out connections that have sent a `PING`
  command. Once a ping has been sent, the connection will be closed if no
  further commands are received within 15s. Both the client and
  server protocol implementations send an initial `PING` on
  connection and ensure at least one command is sent every 5s (not
  necessarily `PING`).
- `RDATA` commands *usually* include a numeric token. However, if the
  stream has multiple rows to replicate per token, the server will send
  multiple `RDATA` commands, with all but the last having a token of
  `batch`. See the documentation on `commands.RdataCommand` for
  further details.

## Architecture

The basic structure of the protocol is line based, where the initial
word of each line specifies the command. The rest of the line is parsed
based on the command. For example, the `RDATA` command is defined as:

    RDATA <stream_name> <instance_name> <token> <row_json>

(Note that `<row_json>` may contain spaces, but cannot contain
newlines.)

Blank lines are ignored.

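
The line format above can be sketched as a small parser. This is a minimal illustration, not the code in `synapse/replication/tcp/commands.py`: the names `split_command`, `parse_rdata` and `Rdata` are hypothetical, and representing the `batch` token as `None` is an assumption made for the example.

```python
import json
from typing import NamedTuple, Optional, Tuple


class Rdata(NamedTuple):
    stream_name: str
    instance_name: str
    token: Optional[int]  # None stands in for the special `batch` token
    row: object


def split_command(line: str) -> Optional[Tuple[str, str]]:
    """Split a wire line into (command, rest). Blank lines yield None."""
    line = line.strip()
    if not line:
        return None
    command, _, rest = line.partition(" ")
    return command, rest


def parse_rdata(rest: str) -> Rdata:
    """Parse everything after the leading `RDATA` word."""
    # maxsplit=3 leaves any spaces inside <row_json> untouched.
    stream_name, instance_name, token, row_json = rest.split(" ", 3)
    parsed_token = None if token == "batch" else int(token)
    return Rdata(stream_name, instance_name, parsed_token, json.loads(row_json))
```

Splitting at most three times is what allows `<row_json>` to contain spaces while the first three fields stay space-delimited.
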
### Keep alives

Both sides are expected to send at least one command every 5s or so, and
should send a `PING` command if necessary. If either side does not receive
a command within e.g. 15s then the connection should be closed.

Because the server may be connected to manually using e.g. netcat, the
timeouts aren't enabled until an initial `PING` command is seen. Both
the client and server implementations below send a `PING` command
immediately on connection to ensure the timeouts are enabled.

This ensures that both sides can quickly realize if the TCP connection
has gone away and handle the situation appropriately.

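
The keep-alive rules can be sketched as a small state tracker. This is an illustrative sketch only: the class `KeepAlive` and its method names are hypothetical, the 5s and 15s constants come from the approximate figures in the text, and the caller is assumed to supply a monotonic clock reading in seconds.

```python
PING_INTERVAL_S = 5.0     # send at least one command this often
RECEIVE_TIMEOUT_S = 15.0  # close if nothing arrives for this long


class KeepAlive:
    def __init__(self, now: float) -> None:
        self.last_sent = now
        self.last_received = now
        # Timeouts stay off until the peer's first PING, so that a manual
        # netcat session is not disconnected.
        self.timeouts_enabled = False

    def on_sent(self, now: float) -> None:
        self.last_sent = now

    def on_received(self, command: str, now: float) -> None:
        self.last_received = now
        if command == "PING":
            self.timeouts_enabled = True

    def should_send_ping(self, now: float) -> bool:
        # Any command counts as traffic; PING is only needed when idle.
        return now - self.last_sent >= PING_INTERVAL_S

    def should_close(self, now: float) -> bool:
        return self.timeouts_enabled and now - self.last_received > RECEIVE_TIMEOUT_S
```

A connection loop would call `should_send_ping` and `should_close` on a periodic timer and feed every sent or received command into the tracker.
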
### Start up

When a new connection is made, the server:

- Sends a `SERVER` command, which includes the identity of the server,
  allowing the client to detect if it's connected to the expected
  server.
- Sends a `PING` command as above, to enable the client to time out
  connections promptly.

The client:

- Sends a `NAME` command, allowing the server to associate a
  human-friendly name with the connection. This is optional.
- Sends a `PING` as above.
- Sends a `REPLICATE` to get the current position of all streams.
- On receipt of a `SERVER` command, checks that the server name
  matches the expected server name.

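
The client side of the start-up sequence can be sketched as follows. The function names are hypothetical, and treating the `PING` argument as a millisecond timestamp is an assumption based on how the values look in the example interaction in this document.

```python
from typing import List, Optional


def client_startup_commands(client_name: Optional[str], now_ms: int) -> List[str]:
    """Return the lines a client sends right after connecting, in order."""
    commands = []
    if client_name is not None:  # NAME is optional
        commands.append(f"NAME {client_name}")
    commands.append(f"PING {now_ms}")  # enables timeouts on the server side
    commands.append("REPLICATE")       # ask for the position of all streams
    return commands


def is_expected_server(line: str, expected_server_name: str) -> bool:
    """Check a received SERVER line against the name the client expects."""
    command, _, server_name = line.strip().partition(" ")
    return command == "SERVER" and server_name == expected_server_name
```

If `is_expected_server` returns `False`, a client would typically send an `ERROR` command and close the connection, as described under error handling.
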
### Error handling

If either side detects an error it can send an `ERROR` command and close
the connection.

If the client side loses the connection to the server it should
reconnect, following the steps above.

### Congestion

If the server sends messages faster than the client can consume them, the
server will first buffer a (fairly large) number of commands and then
disconnect the client. This ensures that we don't queue up an unbounded
number of commands in memory and gives us a potential opportunity to
squawk loudly. When/if the client recovers, it can reconnect to the
server and ask for missed messages.

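
The buffer-then-disconnect behaviour can be sketched with a bounded queue. This is a simplified model rather than the server's actual implementation: `SendBuffer` and `max_pending` are hypothetical names, and the real limit is not stated in this document.

```python
from collections import deque
from typing import Deque, List


class SendBuffer:
    def __init__(self, max_pending: int) -> None:
        self.max_pending = max_pending
        self.pending: Deque[str] = deque()
        self.closed = False

    def queue(self, line: str) -> bool:
        """Queue a command. Returns False once the client has been dropped."""
        if self.closed:
            return False
        if len(self.pending) >= self.max_pending:
            # The client is too far behind: drop it rather than grow forever.
            self.pending.clear()
            self.closed = True
            return False
        self.pending.append(line)
        return True

    def drain(self, n: int) -> List[str]:
        """Remove up to n commands that the transport is ready to write."""
        count = min(n, len(self.pending))
        return [self.pending.popleft() for _ in range(count)]
```

Memory use is bounded by `max_pending`, and the point at which `closed` flips is a natural place to log loudly.
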
### Reliability

In general the replication stream should be considered an unreliable
transport since e.g. commands are not resent if the connection
disappears.

The exception is the replication streams themselves, i.e. `RDATA` commands,
since these include tokens which can be used to restart the stream on
connection errors.

The client should keep track of the token in the last `RDATA` command
received for each stream so that on reconnection it can start streaming
from the correct place. Note: not all `RDATA` commands have valid tokens due to
batching. See `RdataCommand` for more details.

### Example

An example interaction is shown below. Each line is prefixed with '>'
or '<' to indicate which side is sending; these are *not* included on
the wire:

    * connection established *
    > SERVER localhost:8823
    > PING 1490197665618
    < NAME synapse.app.appservice
    < PING 1490197665618
    < REPLICATE
    > POSITION events master 1 1
    > POSITION backfill master 1 1
    > POSITION caches master 1 1
    > RDATA caches master 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
    > RDATA events master 14 ["$149019767112vOHxz:localhost:8823","!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
    < PING 1490197675618
    > ERROR server stopping
    * connection closed by server *

The `POSITION` command sent by the server is used to set the client's
position without needing to send data with the `RDATA` command.

An example of a batched set of `RDATA` is:

    > RDATA caches master batch ["get_user_by_id",["@test:localhost:8823"],1490197670513]
    > RDATA caches master batch ["get_user_by_id",["@test2:localhost:8823"],1490197670513]
    > RDATA caches master batch ["get_user_by_id",["@test3:localhost:8823"],1490197670513]
    > RDATA caches master 54 ["get_user_by_id",["@test4:localhost:8823"],1490197670513]

In this case the client shouldn't advance its `caches` token until it
sees the last `RDATA`.

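
One way a client might honour the batching rule is to hold `batch` rows back until a numeric token arrives. The sketch below is an assumption about client-side bookkeeping, not a description of `RdataCommand` handling in Synapse. `StreamPositions` and `on_rdata` are hypothetical names.

```python
from typing import Dict, List, Optional, Tuple


class StreamPositions:
    def __init__(self) -> None:
        self.tokens: Dict[str, int] = {}               # last confirmed token per stream
        self._pending: Dict[str, List[object]] = {}    # rows awaiting a numeric token

    def on_rdata(
        self, stream_name: str, token: str, row: object
    ) -> Optional[Tuple[int, List[object]]]:
        """Record one RDATA row.

        Returns None while a batch is still open, or (token, rows) once the
        closing row with a numeric token has been received.
        """
        rows = self._pending.setdefault(stream_name, [])
        rows.append(row)
        if token == "batch":
            return None  # do not advance the stream token yet
        self._pending[stream_name] = []
        self.tokens[stream_name] = int(token)
        return int(token), rows
```

With the batched example above, the first three calls return `None` and only the fourth advances the `caches` token to 54, handing back all four rows together.
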
### List of commands

The list of valid commands, with which side can send it: server (S) or
client (C):

#### SERVER (S)

Sent at the start to identify which server the client is talking to.

#### RDATA (S)

A single update in a stream.

#### POSITION (S)

On receipt of a `POSITION` command clients should check if they have missed any
updates, and if so then fetch them out of band. Sent in response to a
`REPLICATE` command (but can happen at any time).

The `POSITION` command includes the source of the stream. Currently all streams
are written by a single process (usually "master"). If fetching missing
updates via HTTP API, rather than via the DB, then processes should make the
request to the appropriate process.

Two positions are included, the "new" position and the last position sent respectively.
This allows servers to tell instances that the positions have advanced but no
data has been written, without clients needlessly checking to see if they
have missed any updates. Instances only fetch missing updates if there is a gap between
their current position and the given last position.

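
The gap check described for `POSITION` can be sketched as a pure function. The name `handle_position` is hypothetical, the arguments are passed by name so that the sketch makes no claim about their order on the wire, and using `max` to avoid moving backwards is an assumption.

```python
from typing import Tuple


def handle_position(
    current_position: int, last_position: int, new_position: int
) -> Tuple[bool, int]:
    """Decide how to react to a POSITION command for one stream.

    Returns (needs_fetch, updated_position). needs_fetch is True only when
    rows were sent that this instance never saw, i.e. there is a gap between
    its current position and the last position the server sent.
    """
    needs_fetch = current_position < last_position
    # Assumption: a position is never moved backwards.
    updated_position = max(current_position, new_position)
    return needs_fetch, updated_position
```

For example, an instance at position 53 that is told the last position sent was 53 and the new position is 60 can simply advance, whereas an instance at position 50 would first fetch the missing updates out of band.
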
#### ERROR (S, C)

There was an error.

#### PING (S, C)

Sent periodically to ensure the connection is still alive.

#### NAME (C)

Sent at the start by the client to inform the server who they are.

#### REPLICATE (C)

Asks the server for the current position of all streams.

#### USER_SYNC (C)

A user has started or stopped syncing on this process.

#### CLEAR_USER_SYNC (C)

The server should clear all associated user sync data from the worker.

This is used when a worker is shutting down.

#### FEDERATION_ACK (C)

Acknowledge receipt of some federation data.

#### REMOTE_SERVER_UP (S, C)

Inform other processes that a remote server may have come back online.

See `synapse/replication/tcp/commands.py` for a detailed description and
the format of each command.

### Cache Invalidation Stream

The cache invalidation stream is used to inform workers when they need
to invalidate any of their caches in the data store. This is done by
streaming all cache invalidations done on master down to the workers,
assuming that any caches on the workers also exist on the master.

Each individual cache invalidation results in a row being sent down
replication, which includes the cache name (the name of the function)
and the key to invalidate. For example:

    > RDATA caches master 550953771 ["get_user_by_id", ["@bob:example.com"], 1550574873251]

Alternatively, an entire cache can be invalidated by sending down a `null`
instead of the key. For example:

    > RDATA caches master 550953772 ["get_user_by_id", null, 1550574873252]

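
How a worker might apply such a row can be sketched as follows. This is a toy model in which each cache is a plain `dict` keyed by a tuple of arguments. `apply_invalidation` is a hypothetical name, and the third element of the row is ignored because its meaning is not described here.

```python
import json
from typing import Dict


def apply_invalidation(caches: Dict[str, dict], row_json: str) -> None:
    """Apply one `caches` stream row to a mapping of cache name -> cache."""
    cache_name, key, _unused = json.loads(row_json)
    cache = caches.get(cache_name)
    if cache is None:
        return  # this worker has no cache of that name
    if key is None:
        cache.clear()  # a null key invalidates the entire cache
    else:
        # JSON arrays decode to lists; use a tuple so the key is hashable.
        cache.pop(tuple(key), None)
```
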
However, there are times when a number of caches need to be invalidated
at the same time with the same key. To reduce traffic we batch those
invalidations into a single poke by defining a special cache name that
workers understand as shorthand for invalidating the correct set of caches.

Currently the special cache names are declared in
`synapse/storage/_base.py` and are:

1. `cs_cache_fake`: invalidates caches that depend on the current
   state