You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

551 lines
21 KiB

  1. # Copyright 2015, 2016 OpenMarket Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import itertools
  15. import logging
  16. from collections import defaultdict
  17. from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
  18. from synapse.api.constants import AccountDataTypes, EduTypes, Membership, PresenceState
  19. from synapse.api.errors import Codes, StoreError, SynapseError
  20. from synapse.api.filtering import FilterCollection
  21. from synapse.api.presence import UserPresenceState
  22. from synapse.events.utils import (
  23. SerializeEventConfig,
  24. format_event_for_client_v2_without_room_id,
  25. format_event_raw,
  26. )
  27. from synapse.handlers.presence import format_user_presence_state
  28. from synapse.handlers.sync import (
  29. ArchivedSyncResult,
  30. InvitedSyncResult,
  31. JoinedSyncResult,
  32. KnockedSyncResult,
  33. SyncConfig,
  34. SyncResult,
  35. )
  36. from synapse.http.server import HttpServer
  37. from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
  38. from synapse.http.site import SynapseRequest
  39. from synapse.logging.opentracing import trace_with_opname
  40. from synapse.types import JsonDict, Requester, StreamToken
  41. from synapse.util import json_decoder
  42. from ._base import client_patterns, set_timeline_upper_limit
  43. if TYPE_CHECKING:
  44. from synapse.server import HomeServer
  45. logger = logging.getLogger(__name__)
  46. class SyncRestServlet(RestServlet):
  47. """
  48. GET parameters::
  49. timeout(int): How long to wait for new events in milliseconds.
  50. since(batch_token): Batch token when asking for incremental deltas.
  51. set_presence(str): What state the device presence should be set to.
  52. default is "online".
  53. filter(filter_id): A filter to apply to the events returned.
  54. Response JSON::
  55. {
  56. "next_batch": // batch token for the next /sync
  57. "presence": // presence data for the user.
  58. "rooms": {
  59. "join": { // Joined rooms being updated.
  60. "${room_id}": { // Id of the room being updated
  61. "event_map": // Map of EventID -> event JSON.
  62. "timeline": { // The recent events in the room if gap is "true"
  63. "limited": // Was the per-room event limit exceeded?
  64. // otherwise the next events in the room.
  65. "events": [] // list of EventIDs in the "event_map".
  66. "prev_batch": // back token for getting previous events.
  67. }
  68. "state": {"events": []} // list of EventIDs updating the
  69. // current state to be what it should
  70. // be at the end of the batch.
  71. "ephemeral": {"events": []} // list of event objects
  72. }
  73. },
  74. "invite": {}, // Invited rooms being updated.
  75. "leave": {} // Archived rooms being updated.
  76. }
  77. }
  78. """
  79. PATTERNS = client_patterns("/sync$")
  80. ALLOWED_PRESENCE = {"online", "offline", "unavailable"}
  81. CATEGORY = "Sync requests"
  82. def __init__(self, hs: "HomeServer"):
  83. super().__init__()
  84. self.hs = hs
  85. self.auth = hs.get_auth()
  86. self.store = hs.get_datastores().main
  87. self.sync_handler = hs.get_sync_handler()
  88. self.clock = hs.get_clock()
  89. self.filtering = hs.get_filtering()
  90. self.presence_handler = hs.get_presence_handler()
  91. self._server_notices_sender = hs.get_server_notices_sender()
  92. self._event_serializer = hs.get_event_client_serializer()
  93. self._msc2654_enabled = hs.config.experimental.msc2654_enabled
  94. self._msc3773_enabled = hs.config.experimental.msc3773_enabled
  95. async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
  96. # This will always be set by the time Twisted calls us.
  97. assert request.args is not None
  98. if b"from" in request.args:
  99. # /events used to use 'from', but /sync uses 'since'.
  100. # Lets be helpful and whine if we see a 'from'.
  101. raise SynapseError(
  102. 400, "'from' is not a valid query parameter. Did you mean 'since'?"
  103. )
  104. requester = await self.auth.get_user_by_req(request, allow_guest=True)
  105. user = requester.user
  106. device_id = requester.device_id
  107. timeout = parse_integer(request, "timeout", default=0)
  108. since = parse_string(request, "since")
  109. set_presence = parse_string(
  110. request,
  111. "set_presence",
  112. default="online",
  113. allowed_values=self.ALLOWED_PRESENCE,
  114. )
  115. filter_id = parse_string(request, "filter")
  116. full_state = parse_boolean(request, "full_state", default=False)
  117. logger.debug(
  118. "/sync: user=%r, timeout=%r, since=%r, "
  119. "set_presence=%r, filter_id=%r, device_id=%r",
  120. user,
  121. timeout,
  122. since,
  123. set_presence,
  124. filter_id,
  125. device_id,
  126. )
  127. # Stream position of the last ignored users account data event for this user,
  128. # if we're initial syncing.
  129. # We include this in the request key to invalidate an initial sync
  130. # in the response cache once the set of ignored users has changed.
  131. # (We filter out ignored users from timeline events, so our sync response
  132. # is invalid once the set of ignored users changes.)
  133. last_ignore_accdata_streampos: Optional[int] = None
  134. if not since:
  135. # No `since`, so this is an initial sync.
  136. last_ignore_accdata_streampos = await self.store.get_latest_stream_id_for_global_account_data_by_type_for_user(
  137. user.to_string(), AccountDataTypes.IGNORED_USER_LIST
  138. )
  139. request_key = (
  140. user,
  141. timeout,
  142. since,
  143. filter_id,
  144. full_state,
  145. device_id,
  146. last_ignore_accdata_streampos,
  147. )
  148. if filter_id is None:
  149. filter_collection = self.filtering.DEFAULT_FILTER_COLLECTION
  150. elif filter_id.startswith("{"):
  151. try:
  152. filter_object = json_decoder.decode(filter_id)
  153. except Exception:
  154. raise SynapseError(400, "Invalid filter JSON", errcode=Codes.NOT_JSON)
  155. self.filtering.check_valid_filter(filter_object)
  156. set_timeline_upper_limit(
  157. filter_object, self.hs.config.server.filter_timeline_limit
  158. )
  159. filter_collection = FilterCollection(self.hs, filter_object)
  160. else:
  161. try:
  162. filter_collection = await self.filtering.get_user_filter(
  163. user, filter_id
  164. )
  165. except StoreError as err:
  166. if err.code != 404:
  167. raise
  168. # fix up the description and errcode to be more useful
  169. raise SynapseError(400, "No such filter", errcode=Codes.INVALID_PARAM)
  170. sync_config = SyncConfig(
  171. user=user,
  172. filter_collection=filter_collection,
  173. is_guest=requester.is_guest,
  174. request_key=request_key,
  175. device_id=device_id,
  176. )
  177. since_token = None
  178. if since is not None:
  179. since_token = await StreamToken.from_string(self.store, since)
  180. # send any outstanding server notices to the user.
  181. await self._server_notices_sender.on_user_syncing(user.to_string())
  182. affect_presence = set_presence != PresenceState.OFFLINE
  183. context = await self.presence_handler.user_syncing(
  184. user.to_string(),
  185. requester.device_id,
  186. affect_presence=affect_presence,
  187. presence_state=set_presence,
  188. )
  189. with context:
  190. sync_result = await self.sync_handler.wait_for_sync_for_user(
  191. requester,
  192. sync_config,
  193. since_token=since_token,
  194. timeout=timeout,
  195. full_state=full_state,
  196. )
  197. # the client may have disconnected by now; don't bother to serialize the
  198. # response if so.
  199. if request._disconnected:
  200. logger.info("Client has disconnected; not serializing response.")
  201. return 200, {}
  202. time_now = self.clock.time_msec()
  203. # We know that the the requester has an access token since appservices
  204. # cannot use sync.
  205. response_content = await self.encode_response(
  206. time_now, sync_result, requester, filter_collection
  207. )
  208. logger.debug("Event formatting complete")
  209. return 200, response_content
  210. @trace_with_opname("sync.encode_response")
  211. async def encode_response(
  212. self,
  213. time_now: int,
  214. sync_result: SyncResult,
  215. requester: Requester,
  216. filter: FilterCollection,
  217. ) -> JsonDict:
  218. logger.debug("Formatting events in sync response")
  219. if filter.event_format == "client":
  220. event_formatter = format_event_for_client_v2_without_room_id
  221. elif filter.event_format == "federation":
  222. event_formatter = format_event_raw
  223. else:
  224. raise Exception("Unknown event format %s" % (filter.event_format,))
  225. serialize_options = SerializeEventConfig(
  226. event_format=event_formatter,
  227. requester=requester,
  228. only_event_fields=filter.event_fields,
  229. )
  230. stripped_serialize_options = SerializeEventConfig(
  231. event_format=event_formatter,
  232. requester=requester,
  233. include_stripped_room_state=True,
  234. )
  235. joined = await self.encode_joined(
  236. sync_result.joined, time_now, serialize_options
  237. )
  238. invited = await self.encode_invited(
  239. sync_result.invited, time_now, stripped_serialize_options
  240. )
  241. knocked = await self.encode_knocked(
  242. sync_result.knocked, time_now, stripped_serialize_options
  243. )
  244. archived = await self.encode_archived(
  245. sync_result.archived, time_now, serialize_options
  246. )
  247. logger.debug("building sync response dict")
  248. response: JsonDict = defaultdict(dict)
  249. response["next_batch"] = await sync_result.next_batch.to_string(self.store)
  250. if sync_result.account_data:
  251. response["account_data"] = {"events": sync_result.account_data}
  252. if sync_result.presence:
  253. response["presence"] = SyncRestServlet.encode_presence(
  254. sync_result.presence, time_now
  255. )
  256. if sync_result.to_device:
  257. response["to_device"] = {"events": sync_result.to_device}
  258. if sync_result.device_lists.changed:
  259. response["device_lists"]["changed"] = list(sync_result.device_lists.changed)
  260. if sync_result.device_lists.left:
  261. response["device_lists"]["left"] = list(sync_result.device_lists.left)
  262. # We always include this because https://github.com/vector-im/element-android/issues/3725
  263. # The spec isn't terribly clear on when this can be omitted and how a client would tell
  264. # the difference between "no keys present" and "nothing changed" in terms of whole field
  265. # absent / individual key type entry absent
  266. # Corresponding synapse issue: https://github.com/matrix-org/synapse/issues/10456
  267. response["device_one_time_keys_count"] = sync_result.device_one_time_keys_count
  268. # https://github.com/matrix-org/matrix-doc/blob/54255851f642f84a4f1aaf7bc063eebe3d76752b/proposals/2732-olm-fallback-keys.md
  269. # states that this field should always be included, as long as the server supports the feature.
  270. response[
  271. "org.matrix.msc2732.device_unused_fallback_key_types"
  272. ] = sync_result.device_unused_fallback_key_types
  273. response[
  274. "device_unused_fallback_key_types"
  275. ] = sync_result.device_unused_fallback_key_types
  276. if joined:
  277. response["rooms"][Membership.JOIN] = joined
  278. if invited:
  279. response["rooms"][Membership.INVITE] = invited
  280. if knocked:
  281. response["rooms"][Membership.KNOCK] = knocked
  282. if archived:
  283. response["rooms"][Membership.LEAVE] = archived
  284. return response
  285. @staticmethod
  286. def encode_presence(events: List[UserPresenceState], time_now: int) -> JsonDict:
  287. return {
  288. "events": [
  289. {
  290. "type": EduTypes.PRESENCE,
  291. "sender": event.user_id,
  292. "content": format_user_presence_state(
  293. event, time_now, include_user_id=False
  294. ),
  295. }
  296. for event in events
  297. ]
  298. }
  299. @trace_with_opname("sync.encode_joined")
  300. async def encode_joined(
  301. self,
  302. rooms: List[JoinedSyncResult],
  303. time_now: int,
  304. serialize_options: SerializeEventConfig,
  305. ) -> JsonDict:
  306. """
  307. Encode the joined rooms in a sync result
  308. Args:
  309. rooms: list of sync results for rooms this user is joined to
  310. time_now: current time - used as a baseline for age calculations
  311. serialize_options: Event serializer options
  312. Returns:
  313. The joined rooms list, in our response format
  314. """
  315. joined = {}
  316. for room in rooms:
  317. joined[room.room_id] = await self.encode_room(
  318. room, time_now, joined=True, serialize_options=serialize_options
  319. )
  320. return joined
  321. @trace_with_opname("sync.encode_invited")
  322. async def encode_invited(
  323. self,
  324. rooms: List[InvitedSyncResult],
  325. time_now: int,
  326. serialize_options: SerializeEventConfig,
  327. ) -> JsonDict:
  328. """
  329. Encode the invited rooms in a sync result
  330. Args:
  331. rooms: list of sync results for rooms this user is invited to
  332. time_now: current time - used as a baseline for age calculations
  333. serialize_options: Event serializer options
  334. Returns:
  335. The invited rooms list, in our response format
  336. """
  337. invited = {}
  338. for room in rooms:
  339. invite = await self._event_serializer.serialize_event(
  340. room.invite, time_now, config=serialize_options
  341. )
  342. unsigned = dict(invite.get("unsigned", {}))
  343. invite["unsigned"] = unsigned
  344. invited_state = list(unsigned.pop("invite_room_state", []))
  345. invited_state.append(invite)
  346. invited[room.room_id] = {"invite_state": {"events": invited_state}}
  347. return invited
  348. @trace_with_opname("sync.encode_knocked")
  349. async def encode_knocked(
  350. self,
  351. rooms: List[KnockedSyncResult],
  352. time_now: int,
  353. serialize_options: SerializeEventConfig,
  354. ) -> Dict[str, Dict[str, Any]]:
  355. """
  356. Encode the rooms we've knocked on in a sync result.
  357. Args:
  358. rooms: list of sync results for rooms this user is knocking on
  359. time_now: current time - used as a baseline for age calculations
  360. serialize_options: Event serializer options
  361. Returns:
  362. The list of rooms the user has knocked on, in our response format.
  363. """
  364. knocked = {}
  365. for room in rooms:
  366. knock = await self._event_serializer.serialize_event(
  367. room.knock, time_now, config=serialize_options
  368. )
  369. # Extract the `unsigned` key from the knock event.
  370. # This is where we (cheekily) store the knock state events
  371. unsigned = knock.setdefault("unsigned", {})
  372. # Duplicate the dictionary in order to avoid modifying the original
  373. unsigned = dict(unsigned)
  374. # Extract the stripped room state from the unsigned dict
  375. # This is for clients to get a little bit of information about
  376. # the room they've knocked on, without revealing any sensitive information
  377. knocked_state = list(unsigned.pop("knock_room_state", []))
  378. # Append the actual knock membership event itself as well. This provides
  379. # the client with:
  380. #
  381. # * A knock state event that they can use for easier internal tracking
  382. # * The rough timestamp of when the knock occurred contained within the event
  383. knocked_state.append(knock)
  384. # Build the `knock_state` dictionary, which will contain the state of the
  385. # room that the client has knocked on
  386. knocked[room.room_id] = {"knock_state": {"events": knocked_state}}
  387. return knocked
  388. @trace_with_opname("sync.encode_archived")
  389. async def encode_archived(
  390. self,
  391. rooms: List[ArchivedSyncResult],
  392. time_now: int,
  393. serialize_options: SerializeEventConfig,
  394. ) -> JsonDict:
  395. """
  396. Encode the archived rooms in a sync result
  397. Args:
  398. rooms: list of sync results for rooms this user is joined to
  399. time_now: current time - used as a baseline for age calculations
  400. serialize_options: Event serializer options
  401. Returns:
  402. The archived rooms list, in our response format
  403. """
  404. joined = {}
  405. for room in rooms:
  406. joined[room.room_id] = await self.encode_room(
  407. room, time_now, joined=False, serialize_options=serialize_options
  408. )
  409. return joined
  410. async def encode_room(
  411. self,
  412. room: Union[JoinedSyncResult, ArchivedSyncResult],
  413. time_now: int,
  414. joined: bool,
  415. serialize_options: SerializeEventConfig,
  416. ) -> JsonDict:
  417. """
  418. Args:
  419. room: sync result for a single room
  420. time_now: current time - used as a baseline for age calculations
  421. token_id: ID of the user's auth token - used for namespacing
  422. of transaction IDs
  423. joined: True if the user is joined to this room - will mean
  424. we handle ephemeral events
  425. only_fields: Optional. The list of event fields to include.
  426. event_formatter: function to convert from federation format
  427. to client format
  428. Returns:
  429. The room, encoded in our response format
  430. """
  431. state_dict = room.state
  432. timeline_events = room.timeline.events
  433. state_events = state_dict.values()
  434. for event in itertools.chain(state_events, timeline_events):
  435. # We've had bug reports that events were coming down under the
  436. # wrong room.
  437. if event.room_id != room.room_id:
  438. logger.warning(
  439. "Event %r is under room %r instead of %r",
  440. event.event_id,
  441. room.room_id,
  442. event.room_id,
  443. )
  444. serialized_state = await self._event_serializer.serialize_events(
  445. state_events, time_now, config=serialize_options
  446. )
  447. serialized_timeline = await self._event_serializer.serialize_events(
  448. timeline_events,
  449. time_now,
  450. config=serialize_options,
  451. bundle_aggregations=room.timeline.bundled_aggregations,
  452. )
  453. account_data = room.account_data
  454. result: JsonDict = {
  455. "timeline": {
  456. "events": serialized_timeline,
  457. "prev_batch": await room.timeline.prev_batch.to_string(self.store),
  458. "limited": room.timeline.limited,
  459. },
  460. "state": {"events": serialized_state},
  461. "account_data": {"events": account_data},
  462. }
  463. if joined:
  464. assert isinstance(room, JoinedSyncResult)
  465. ephemeral_events = room.ephemeral
  466. result["ephemeral"] = {"events": ephemeral_events}
  467. result["unread_notifications"] = room.unread_notifications
  468. if room.unread_thread_notifications:
  469. result["unread_thread_notifications"] = room.unread_thread_notifications
  470. if self._msc3773_enabled:
  471. result[
  472. "org.matrix.msc3773.unread_thread_notifications"
  473. ] = room.unread_thread_notifications
  474. result["summary"] = room.summary
  475. if self._msc2654_enabled:
  476. result["org.matrix.msc2654.unread_count"] = room.unread_count
  477. return result
  478. def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
  479. SyncRestServlet(hs).register(http_server)