You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

533 regels
20 KiB

  1. # Copyright 2015, 2016 OpenMarket Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import itertools
  15. import logging
  16. from collections import defaultdict
  17. from typing import TYPE_CHECKING, Any, Callable, Dict, List, Tuple
  18. from synapse.api.constants import Membership, PresenceState
  19. from synapse.api.errors import Codes, StoreError, SynapseError
  20. from synapse.api.filtering import DEFAULT_FILTER_COLLECTION, FilterCollection
  21. from synapse.events.utils import (
  22. format_event_for_client_v2_without_room_id,
  23. format_event_raw,
  24. )
  25. from synapse.handlers.presence import format_user_presence_state
  26. from synapse.handlers.sync import KnockedSyncResult, SyncConfig
  27. from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
  28. from synapse.http.site import SynapseRequest
  29. from synapse.types import JsonDict, StreamToken
  30. from synapse.util import json_decoder
  31. from ._base import client_patterns, set_timeline_upper_limit
  32. if TYPE_CHECKING:
  33. from synapse.server import HomeServer
  34. logger = logging.getLogger(__name__)
  35. class SyncRestServlet(RestServlet):
  36. """
  37. GET parameters::
  38. timeout(int): How long to wait for new events in milliseconds.
  39. since(batch_token): Batch token when asking for incremental deltas.
  40. set_presence(str): What state the device presence should be set to.
  41. default is "online".
  42. filter(filter_id): A filter to apply to the events returned.
  43. Response JSON::
  44. {
  45. "next_batch": // batch token for the next /sync
  46. "presence": // presence data for the user.
  47. "rooms": {
  48. "join": { // Joined rooms being updated.
  49. "${room_id}": { // Id of the room being updated
  50. "event_map": // Map of EventID -> event JSON.
  51. "timeline": { // The recent events in the room if gap is "true"
  52. "limited": // Was the per-room event limit exceeded?
  53. // otherwise the next events in the room.
  54. "events": [] // list of EventIDs in the "event_map".
  55. "prev_batch": // back token for getting previous events.
  56. }
  57. "state": {"events": []} // list of EventIDs updating the
  58. // current state to be what it should
  59. // be at the end of the batch.
  60. "ephemeral": {"events": []} // list of event objects
  61. }
  62. },
  63. "invite": {}, // Invited rooms being updated.
  64. "leave": {} // Archived rooms being updated.
  65. }
  66. }
  67. """
  68. PATTERNS = client_patterns("/sync$")
  69. ALLOWED_PRESENCE = {"online", "offline", "unavailable"}
  70. def __init__(self, hs: "HomeServer"):
  71. super().__init__()
  72. self.hs = hs
  73. self.auth = hs.get_auth()
  74. self.store = hs.get_datastore()
  75. self.sync_handler = hs.get_sync_handler()
  76. self.clock = hs.get_clock()
  77. self.filtering = hs.get_filtering()
  78. self.presence_handler = hs.get_presence_handler()
  79. self._server_notices_sender = hs.get_server_notices_sender()
  80. self._event_serializer = hs.get_event_client_serializer()
  81. async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
  82. # This will always be set by the time Twisted calls us.
  83. assert request.args is not None
  84. if b"from" in request.args:
  85. # /events used to use 'from', but /sync uses 'since'.
  86. # Lets be helpful and whine if we see a 'from'.
  87. raise SynapseError(
  88. 400, "'from' is not a valid query parameter. Did you mean 'since'?"
  89. )
  90. requester = await self.auth.get_user_by_req(request, allow_guest=True)
  91. user = requester.user
  92. device_id = requester.device_id
  93. timeout = parse_integer(request, "timeout", default=0)
  94. since = parse_string(request, "since")
  95. set_presence = parse_string(
  96. request,
  97. "set_presence",
  98. default="online",
  99. allowed_values=self.ALLOWED_PRESENCE,
  100. )
  101. filter_id = parse_string(request, "filter")
  102. full_state = parse_boolean(request, "full_state", default=False)
  103. logger.debug(
  104. "/sync: user=%r, timeout=%r, since=%r, "
  105. "set_presence=%r, filter_id=%r, device_id=%r",
  106. user,
  107. timeout,
  108. since,
  109. set_presence,
  110. filter_id,
  111. device_id,
  112. )
  113. request_key = (user, timeout, since, filter_id, full_state, device_id)
  114. if filter_id is None:
  115. filter_collection = DEFAULT_FILTER_COLLECTION
  116. elif filter_id.startswith("{"):
  117. try:
  118. filter_object = json_decoder.decode(filter_id)
  119. set_timeline_upper_limit(
  120. filter_object, self.hs.config.filter_timeline_limit
  121. )
  122. except Exception:
  123. raise SynapseError(400, "Invalid filter JSON")
  124. self.filtering.check_valid_filter(filter_object)
  125. filter_collection = FilterCollection(filter_object)
  126. else:
  127. try:
  128. filter_collection = await self.filtering.get_user_filter(
  129. user.localpart, filter_id
  130. )
  131. except StoreError as err:
  132. if err.code != 404:
  133. raise
  134. # fix up the description and errcode to be more useful
  135. raise SynapseError(400, "No such filter", errcode=Codes.INVALID_PARAM)
  136. sync_config = SyncConfig(
  137. user=user,
  138. filter_collection=filter_collection,
  139. is_guest=requester.is_guest,
  140. request_key=request_key,
  141. device_id=device_id,
  142. )
  143. since_token = None
  144. if since is not None:
  145. since_token = await StreamToken.from_string(self.store, since)
  146. # send any outstanding server notices to the user.
  147. await self._server_notices_sender.on_user_syncing(user.to_string())
  148. affect_presence = set_presence != PresenceState.OFFLINE
  149. if affect_presence:
  150. await self.presence_handler.set_state(
  151. user, {"presence": set_presence}, True
  152. )
  153. context = await self.presence_handler.user_syncing(
  154. user.to_string(), affect_presence=affect_presence
  155. )
  156. with context:
  157. sync_result = await self.sync_handler.wait_for_sync_for_user(
  158. requester,
  159. sync_config,
  160. since_token=since_token,
  161. timeout=timeout,
  162. full_state=full_state,
  163. )
  164. # the client may have disconnected by now; don't bother to serialize the
  165. # response if so.
  166. if request._disconnected:
  167. logger.info("Client has disconnected; not serializing response.")
  168. return 200, {}
  169. time_now = self.clock.time_msec()
  170. response_content = await self.encode_response(
  171. time_now, sync_result, requester.access_token_id, filter_collection
  172. )
  173. logger.debug("Event formatting complete")
  174. return 200, response_content
  175. async def encode_response(self, time_now, sync_result, access_token_id, filter):
  176. logger.debug("Formatting events in sync response")
  177. if filter.event_format == "client":
  178. event_formatter = format_event_for_client_v2_without_room_id
  179. elif filter.event_format == "federation":
  180. event_formatter = format_event_raw
  181. else:
  182. raise Exception("Unknown event format %s" % (filter.event_format,))
  183. joined = await self.encode_joined(
  184. sync_result.joined,
  185. time_now,
  186. access_token_id,
  187. filter.event_fields,
  188. event_formatter,
  189. )
  190. invited = await self.encode_invited(
  191. sync_result.invited, time_now, access_token_id, event_formatter
  192. )
  193. knocked = await self.encode_knocked(
  194. sync_result.knocked, time_now, access_token_id, event_formatter
  195. )
  196. archived = await self.encode_archived(
  197. sync_result.archived,
  198. time_now,
  199. access_token_id,
  200. filter.event_fields,
  201. event_formatter,
  202. )
  203. logger.debug("building sync response dict")
  204. response: dict = defaultdict(dict)
  205. response["next_batch"] = await sync_result.next_batch.to_string(self.store)
  206. if sync_result.account_data:
  207. response["account_data"] = {"events": sync_result.account_data}
  208. if sync_result.presence:
  209. response["presence"] = SyncRestServlet.encode_presence(
  210. sync_result.presence, time_now
  211. )
  212. if sync_result.to_device:
  213. response["to_device"] = {"events": sync_result.to_device}
  214. if sync_result.device_lists.changed:
  215. response["device_lists"]["changed"] = list(sync_result.device_lists.changed)
  216. if sync_result.device_lists.left:
  217. response["device_lists"]["left"] = list(sync_result.device_lists.left)
  218. # We always include this because https://github.com/vector-im/element-android/issues/3725
  219. # The spec isn't terribly clear on when this can be omitted and how a client would tell
  220. # the difference between "no keys present" and "nothing changed" in terms of whole field
  221. # absent / individual key type entry absent
  222. # Corresponding synapse issue: https://github.com/matrix-org/synapse/issues/10456
  223. response["device_one_time_keys_count"] = sync_result.device_one_time_keys_count
  224. # https://github.com/matrix-org/matrix-doc/blob/54255851f642f84a4f1aaf7bc063eebe3d76752b/proposals/2732-olm-fallback-keys.md
  225. # states that this field should always be included, as long as the server supports the feature.
  226. response[
  227. "org.matrix.msc2732.device_unused_fallback_key_types"
  228. ] = sync_result.device_unused_fallback_key_types
  229. if joined:
  230. response["rooms"][Membership.JOIN] = joined
  231. if invited:
  232. response["rooms"][Membership.INVITE] = invited
  233. if knocked:
  234. response["rooms"][Membership.KNOCK] = knocked
  235. if archived:
  236. response["rooms"][Membership.LEAVE] = archived
  237. if sync_result.groups.join:
  238. response["groups"][Membership.JOIN] = sync_result.groups.join
  239. if sync_result.groups.invite:
  240. response["groups"][Membership.INVITE] = sync_result.groups.invite
  241. if sync_result.groups.leave:
  242. response["groups"][Membership.LEAVE] = sync_result.groups.leave
  243. return response
  244. @staticmethod
  245. def encode_presence(events, time_now):
  246. return {
  247. "events": [
  248. {
  249. "type": "m.presence",
  250. "sender": event.user_id,
  251. "content": format_user_presence_state(
  252. event, time_now, include_user_id=False
  253. ),
  254. }
  255. for event in events
  256. ]
  257. }
  258. async def encode_joined(
  259. self, rooms, time_now, token_id, event_fields, event_formatter
  260. ):
  261. """
  262. Encode the joined rooms in a sync result
  263. Args:
  264. rooms(list[synapse.handlers.sync.JoinedSyncResult]): list of sync
  265. results for rooms this user is joined to
  266. time_now(int): current time - used as a baseline for age
  267. calculations
  268. token_id(int): ID of the user's auth token - used for namespacing
  269. of transaction IDs
  270. event_fields(list<str>): List of event fields to include. If empty,
  271. all fields will be returned.
  272. event_formatter (func[dict]): function to convert from federation format
  273. to client format
  274. Returns:
  275. dict[str, dict[str, object]]: the joined rooms list, in our
  276. response format
  277. """
  278. joined = {}
  279. for room in rooms:
  280. joined[room.room_id] = await self.encode_room(
  281. room,
  282. time_now,
  283. token_id,
  284. joined=True,
  285. only_fields=event_fields,
  286. event_formatter=event_formatter,
  287. )
  288. return joined
  289. async def encode_invited(self, rooms, time_now, token_id, event_formatter):
  290. """
  291. Encode the invited rooms in a sync result
  292. Args:
  293. rooms(list[synapse.handlers.sync.InvitedSyncResult]): list of
  294. sync results for rooms this user is invited to
  295. time_now(int): current time - used as a baseline for age
  296. calculations
  297. token_id(int): ID of the user's auth token - used for namespacing
  298. of transaction IDs
  299. event_formatter (func[dict]): function to convert from federation format
  300. to client format
  301. Returns:
  302. dict[str, dict[str, object]]: the invited rooms list, in our
  303. response format
  304. """
  305. invited = {}
  306. for room in rooms:
  307. invite = await self._event_serializer.serialize_event(
  308. room.invite,
  309. time_now,
  310. token_id=token_id,
  311. event_format=event_formatter,
  312. include_stripped_room_state=True,
  313. )
  314. unsigned = dict(invite.get("unsigned", {}))
  315. invite["unsigned"] = unsigned
  316. invited_state = list(unsigned.pop("invite_room_state", []))
  317. invited_state.append(invite)
  318. invited[room.room_id] = {"invite_state": {"events": invited_state}}
  319. return invited
  320. async def encode_knocked(
  321. self,
  322. rooms: List[KnockedSyncResult],
  323. time_now: int,
  324. token_id: int,
  325. event_formatter: Callable[[Dict], Dict],
  326. ) -> Dict[str, Dict[str, Any]]:
  327. """
  328. Encode the rooms we've knocked on in a sync result.
  329. Args:
  330. rooms: list of sync results for rooms this user is knocking on
  331. time_now: current time - used as a baseline for age calculations
  332. token_id: ID of the user's auth token - used for namespacing of transaction IDs
  333. event_formatter: function to convert from federation format to client format
  334. Returns:
  335. The list of rooms the user has knocked on, in our response format.
  336. """
  337. knocked = {}
  338. for room in rooms:
  339. knock = await self._event_serializer.serialize_event(
  340. room.knock,
  341. time_now,
  342. token_id=token_id,
  343. event_format=event_formatter,
  344. include_stripped_room_state=True,
  345. )
  346. # Extract the `unsigned` key from the knock event.
  347. # This is where we (cheekily) store the knock state events
  348. unsigned = knock.setdefault("unsigned", {})
  349. # Duplicate the dictionary in order to avoid modifying the original
  350. unsigned = dict(unsigned)
  351. # Extract the stripped room state from the unsigned dict
  352. # This is for clients to get a little bit of information about
  353. # the room they've knocked on, without revealing any sensitive information
  354. knocked_state = list(unsigned.pop("knock_room_state", []))
  355. # Append the actual knock membership event itself as well. This provides
  356. # the client with:
  357. #
  358. # * A knock state event that they can use for easier internal tracking
  359. # * The rough timestamp of when the knock occurred contained within the event
  360. knocked_state.append(knock)
  361. # Build the `knock_state` dictionary, which will contain the state of the
  362. # room that the client has knocked on
  363. knocked[room.room_id] = {"knock_state": {"events": knocked_state}}
  364. return knocked
  365. async def encode_archived(
  366. self, rooms, time_now, token_id, event_fields, event_formatter
  367. ):
  368. """
  369. Encode the archived rooms in a sync result
  370. Args:
  371. rooms (list[synapse.handlers.sync.ArchivedSyncResult]): list of
  372. sync results for rooms this user is joined to
  373. time_now(int): current time - used as a baseline for age
  374. calculations
  375. token_id(int): ID of the user's auth token - used for namespacing
  376. of transaction IDs
  377. event_fields(list<str>): List of event fields to include. If empty,
  378. all fields will be returned.
  379. event_formatter (func[dict]): function to convert from federation format
  380. to client format
  381. Returns:
  382. dict[str, dict[str, object]]: The invited rooms list, in our
  383. response format
  384. """
  385. joined = {}
  386. for room in rooms:
  387. joined[room.room_id] = await self.encode_room(
  388. room,
  389. time_now,
  390. token_id,
  391. joined=False,
  392. only_fields=event_fields,
  393. event_formatter=event_formatter,
  394. )
  395. return joined
  396. async def encode_room(
  397. self, room, time_now, token_id, joined, only_fields, event_formatter
  398. ):
  399. """
  400. Args:
  401. room (JoinedSyncResult|ArchivedSyncResult): sync result for a
  402. single room
  403. time_now (int): current time - used as a baseline for age
  404. calculations
  405. token_id (int): ID of the user's auth token - used for namespacing
  406. of transaction IDs
  407. joined (bool): True if the user is joined to this room - will mean
  408. we handle ephemeral events
  409. only_fields(list<str>): Optional. The list of event fields to include.
  410. event_formatter (func[dict]): function to convert from federation format
  411. to client format
  412. Returns:
  413. dict[str, object]: the room, encoded in our response format
  414. """
  415. def serialize(events):
  416. return self._event_serializer.serialize_events(
  417. events,
  418. time_now=time_now,
  419. # We don't bundle "live" events, as otherwise clients
  420. # will end up double counting annotations.
  421. bundle_aggregations=False,
  422. token_id=token_id,
  423. event_format=event_formatter,
  424. only_event_fields=only_fields,
  425. )
  426. state_dict = room.state
  427. timeline_events = room.timeline.events
  428. state_events = state_dict.values()
  429. for event in itertools.chain(state_events, timeline_events):
  430. # We've had bug reports that events were coming down under the
  431. # wrong room.
  432. if event.room_id != room.room_id:
  433. logger.warning(
  434. "Event %r is under room %r instead of %r",
  435. event.event_id,
  436. room.room_id,
  437. event.room_id,
  438. )
  439. serialized_state = await serialize(state_events)
  440. serialized_timeline = await serialize(timeline_events)
  441. account_data = room.account_data
  442. result = {
  443. "timeline": {
  444. "events": serialized_timeline,
  445. "prev_batch": await room.timeline.prev_batch.to_string(self.store),
  446. "limited": room.timeline.limited,
  447. },
  448. "state": {"events": serialized_state},
  449. "account_data": {"events": account_data},
  450. }
  451. if joined:
  452. ephemeral_events = room.ephemeral
  453. result["ephemeral"] = {"events": ephemeral_events}
  454. result["unread_notifications"] = room.unread_notifications
  455. result["summary"] = room.summary
  456. result["org.matrix.msc2654.unread_count"] = room.unread_count
  457. return result
  458. def register_servlets(hs, http_server):
  459. SyncRestServlet(hs).register(http_server)