You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

1112 lines
35 KiB

  1. # Copyright 2014-2022 The Matrix.org Foundation C.I.C.
  2. # Copyright 2020 Sorunome
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. import urllib
  17. from typing import (
  18. TYPE_CHECKING,
  19. Any,
  20. BinaryIO,
  21. Callable,
  22. Collection,
  23. Dict,
  24. Generator,
  25. Iterable,
  26. List,
  27. Mapping,
  28. Optional,
  29. Tuple,
  30. Union,
  31. )
  32. import attr
  33. import ijson
  34. from synapse.api.constants import Direction, Membership
  35. from synapse.api.errors import Codes, HttpResponseException, SynapseError
  36. from synapse.api.room_versions import RoomVersion
  37. from synapse.api.urls import (
  38. FEDERATION_UNSTABLE_PREFIX,
  39. FEDERATION_V1_PREFIX,
  40. FEDERATION_V2_PREFIX,
  41. )
  42. from synapse.events import EventBase, make_event_from_dict
  43. from synapse.federation.units import Transaction
  44. from synapse.http.matrixfederationclient import ByteParser, LegacyJsonSendParser
  45. from synapse.http.types import QueryParams
  46. from synapse.types import JsonDict, UserID
  47. from synapse.util import ExceptionBundle
  48. if TYPE_CHECKING:
  49. from synapse.app.homeserver import HomeServer
# Module-level logger, named after this module via `__name__`; the debug
# traces emitted by the request methods in this file go through it.
  50. logger = logging.getLogger(__name__)
  51. class TransportLayerClient:
  52. """Sends federation HTTP requests to other servers"""
  53. def __init__(self, hs: "HomeServer"):
  54. self.client = hs.get_federation_http_client()
  55. self._is_mine_server_name = hs.is_mine_server_name
  56. async def get_room_state_ids(
  57. self, destination: str, room_id: str, event_id: str
  58. ) -> JsonDict:
  59. """Requests the IDs of all state for a given room at the given event.
  60. Args:
  61. destination: The host name of the remote homeserver we want
  62. to get the state from.
  63. room_id: the room we want the state of
  64. event_id: The event we want the context at.
  65. Returns:
  66. Results in a dict received from the remote homeserver.
  67. """
  68. logger.debug("get_room_state_ids dest=%s, room=%s", destination, room_id)
  69. path = _create_v1_path("/state_ids/%s", room_id)
  70. return await self.client.get_json(
  71. destination,
  72. path=path,
  73. args={"event_id": event_id},
  74. try_trailing_slash_on_400=True,
  75. )
  76. async def get_room_state(
  77. self, room_version: RoomVersion, destination: str, room_id: str, event_id: str
  78. ) -> "StateRequestResponse":
  79. """Requests the full state for a given room at the given event.
  80. Args:
  81. room_version: the version of the room (required to build the event objects)
  82. destination: The host name of the remote homeserver we want
  83. to get the state from.
  84. room_id: the room we want the state of
  85. event_id: The event we want the context at.
  86. Returns:
  87. Results in a dict received from the remote homeserver.
  88. """
  89. path = _create_v1_path("/state/%s", room_id)
  90. return await self.client.get_json(
  91. destination,
  92. path=path,
  93. args={"event_id": event_id},
  94. # This can take a looooooong time for large rooms. Give this a generous
  95. # timeout of 10 minutes to avoid the partial state resync timing out early
  96. # and trying a bunch of servers who haven't seen our join yet.
  97. timeout=600_000,
  98. parser=_StateParser(room_version),
  99. )
  100. async def get_event(
  101. self, destination: str, event_id: str, timeout: Optional[int] = None
  102. ) -> JsonDict:
  103. """Requests the pdu with give id and origin from the given server.
  104. Args:
  105. destination: The host name of the remote homeserver we want
  106. to get the state from.
  107. event_id: The id of the event being requested.
  108. timeout: How long to try (in ms) the destination for before
  109. giving up. None indicates no timeout.
  110. Returns:
  111. Results in a dict received from the remote homeserver.
  112. """
  113. logger.debug("get_pdu dest=%s, event_id=%s", destination, event_id)
  114. path = _create_v1_path("/event/%s", event_id)
  115. return await self.client.get_json(
  116. destination, path=path, timeout=timeout, try_trailing_slash_on_400=True
  117. )
  118. async def backfill(
  119. self, destination: str, room_id: str, event_tuples: Collection[str], limit: int
  120. ) -> Optional[Union[JsonDict, list]]:
  121. """Requests `limit` previous PDUs in a given context before list of
  122. PDUs.
  123. Args:
  124. destination
  125. room_id
  126. event_tuples:
  127. Must be a Collection that is falsy when empty.
  128. (Iterable is not enough here!)
  129. limit
  130. Returns:
  131. Results in a dict received from the remote homeserver.
  132. """
  133. logger.debug(
  134. "backfill dest=%s, room_id=%s, event_tuples=%r, limit=%s",
  135. destination,
  136. room_id,
  137. event_tuples,
  138. str(limit),
  139. )
  140. if not event_tuples:
  141. # TODO: raise?
  142. return None
  143. path = _create_v1_path("/backfill/%s", room_id)
  144. args = {"v": event_tuples, "limit": [str(limit)]}
  145. return await self.client.get_json(
  146. destination, path=path, args=args, try_trailing_slash_on_400=True
  147. )
  148. async def timestamp_to_event(
  149. self, destination: str, room_id: str, timestamp: int, direction: Direction
  150. ) -> Union[JsonDict, List]:
  151. """
  152. Calls a remote federating server at `destination` asking for their
  153. closest event to the given timestamp in the given direction.
  154. Args:
  155. destination: Domain name of the remote homeserver
  156. room_id: Room to fetch the event from
  157. timestamp: The point in time (inclusive) we should navigate from in
  158. the given direction to find the closest event.
  159. direction: indicates whether we should navigate forward
  160. or backward from the given timestamp to find the closest event.
  161. Returns:
  162. Response dict received from the remote homeserver.
  163. Raises:
  164. Various exceptions when the request fails
  165. """
  166. path = _create_v1_path(
  167. "/timestamp_to_event/%s",
  168. room_id,
  169. )
  170. args = {"ts": [str(timestamp)], "dir": [direction.value]}
  171. remote_response = await self.client.get_json(
  172. destination, path=path, args=args, try_trailing_slash_on_400=True
  173. )
  174. return remote_response
  175. async def send_transaction(
  176. self,
  177. transaction: Transaction,
  178. json_data_callback: Optional[Callable[[], JsonDict]] = None,
  179. ) -> JsonDict:
  180. """Sends the given Transaction to its destination
  181. Args:
  182. transaction
  183. Returns:
  184. Succeeds when we get a 2xx HTTP response. The result
  185. will be the decoded JSON body.
  186. Fails with ``HTTPRequestException`` if we get an HTTP response
  187. code >= 300.
  188. Fails with ``NotRetryingDestination`` if we are not yet ready
  189. to retry this server.
  190. Fails with ``FederationDeniedError`` if this destination
  191. is not on our federation whitelist
  192. """
  193. logger.debug(
  194. "send_data dest=%s, txid=%s",
  195. transaction.destination,
  196. transaction.transaction_id,
  197. )
  198. if self._is_mine_server_name(transaction.destination):
  199. raise RuntimeError("Transport layer cannot send to itself!")
  200. # FIXME: This is only used by the tests. The actual json sent is
  201. # generated by the json_data_callback.
  202. json_data = transaction.get_dict()
  203. path = _create_v1_path("/send/%s", transaction.transaction_id)
  204. return await self.client.put_json(
  205. transaction.destination,
  206. path=path,
  207. data=json_data,
  208. json_data_callback=json_data_callback,
  209. long_retries=True,
  210. try_trailing_slash_on_400=True,
  211. # Sending a transaction should always succeed, if it doesn't
  212. # then something is wrong and we should backoff.
  213. backoff_on_all_error_codes=True,
  214. )
  215. async def make_query(
  216. self,
  217. destination: str,
  218. query_type: str,
  219. args: QueryParams,
  220. retry_on_dns_fail: bool,
  221. ignore_backoff: bool = False,
  222. prefix: str = FEDERATION_V1_PREFIX,
  223. ) -> JsonDict:
  224. path = _create_path(prefix, "/query/%s", query_type)
  225. return await self.client.get_json(
  226. destination=destination,
  227. path=path,
  228. args=args,
  229. retry_on_dns_fail=retry_on_dns_fail,
  230. timeout=10000,
  231. ignore_backoff=ignore_backoff,
  232. )
  233. async def make_membership_event(
  234. self,
  235. destination: str,
  236. room_id: str,
  237. user_id: str,
  238. membership: str,
  239. params: Optional[Mapping[str, Union[str, Iterable[str]]]],
  240. ) -> JsonDict:
  241. """Asks a remote server to build and sign us a membership event
  242. Note that this does not append any events to any graphs.
  243. Args:
  244. destination: address of remote homeserver
  245. room_id: room to join/leave
  246. user_id: user to be joined/left
  247. membership: one of join/leave
  248. params: Query parameters to include in the request.
  249. Returns:
  250. Succeeds when we get a 2xx HTTP response. The result
  251. will be the decoded JSON body (ie, the new event).
  252. Fails with ``HTTPRequestException`` if we get an HTTP response
  253. code >= 300.
  254. Fails with ``NotRetryingDestination`` if we are not yet ready
  255. to retry this server.
  256. Fails with ``FederationDeniedError`` if the remote destination
  257. is not in our federation whitelist
  258. """
  259. valid_memberships = {Membership.JOIN, Membership.LEAVE, Membership.KNOCK}
  260. if membership not in valid_memberships:
  261. raise RuntimeError(
  262. "make_membership_event called with membership='%s', must be one of %s"
  263. % (membership, ",".join(valid_memberships))
  264. )
  265. path = _create_v1_path("/make_%s/%s/%s", membership, room_id, user_id)
  266. ignore_backoff = False
  267. retry_on_dns_fail = False
  268. if membership == Membership.LEAVE:
  269. # we particularly want to do our best to send leave events. The
  270. # problem is that if it fails, we won't retry it later, so if the
  271. # remote server was just having a momentary blip, the room will be
  272. # out of sync.
  273. ignore_backoff = True
  274. retry_on_dns_fail = True
  275. return await self.client.get_json(
  276. destination=destination,
  277. path=path,
  278. args=params,
  279. retry_on_dns_fail=retry_on_dns_fail,
  280. timeout=20000,
  281. ignore_backoff=ignore_backoff,
  282. )
  283. async def send_join_v1(
  284. self,
  285. room_version: RoomVersion,
  286. destination: str,
  287. room_id: str,
  288. event_id: str,
  289. content: JsonDict,
  290. ) -> "SendJoinResponse":
  291. path = _create_v1_path("/send_join/%s/%s", room_id, event_id)
  292. return await self.client.put_json(
  293. destination=destination,
  294. path=path,
  295. data=content,
  296. parser=SendJoinParser(room_version, v1_api=True),
  297. )
  298. async def send_join_v2(
  299. self,
  300. room_version: RoomVersion,
  301. destination: str,
  302. room_id: str,
  303. event_id: str,
  304. content: JsonDict,
  305. omit_members: bool,
  306. ) -> "SendJoinResponse":
  307. path = _create_v2_path("/send_join/%s/%s", room_id, event_id)
  308. query_params: Dict[str, str] = {}
  309. # lazy-load state on join
  310. query_params["omit_members"] = "true" if omit_members else "false"
  311. return await self.client.put_json(
  312. destination=destination,
  313. path=path,
  314. args=query_params,
  315. data=content,
  316. parser=SendJoinParser(room_version, v1_api=False),
  317. )
  318. async def send_leave_v1(
  319. self, destination: str, room_id: str, event_id: str, content: JsonDict
  320. ) -> Tuple[int, JsonDict]:
  321. path = _create_v1_path("/send_leave/%s/%s", room_id, event_id)
  322. return await self.client.put_json(
  323. destination=destination,
  324. path=path,
  325. data=content,
  326. # we want to do our best to send this through. The problem is
  327. # that if it fails, we won't retry it later, so if the remote
  328. # server was just having a momentary blip, the room will be out of
  329. # sync.
  330. ignore_backoff=True,
  331. parser=LegacyJsonSendParser(),
  332. )
  333. async def send_leave_v2(
  334. self, destination: str, room_id: str, event_id: str, content: JsonDict
  335. ) -> JsonDict:
  336. path = _create_v2_path("/send_leave/%s/%s", room_id, event_id)
  337. return await self.client.put_json(
  338. destination=destination,
  339. path=path,
  340. data=content,
  341. # we want to do our best to send this through. The problem is
  342. # that if it fails, we won't retry it later, so if the remote
  343. # server was just having a momentary blip, the room will be out of
  344. # sync.
  345. ignore_backoff=True,
  346. )
  347. async def send_knock_v1(
  348. self,
  349. destination: str,
  350. room_id: str,
  351. event_id: str,
  352. content: JsonDict,
  353. ) -> JsonDict:
  354. """
  355. Sends a signed knock membership event to a remote server. This is the second
  356. step for knocking after make_knock.
  357. Args:
  358. destination: The remote homeserver.
  359. room_id: The ID of the room to knock on.
  360. event_id: The ID of the knock membership event that we're sending.
  361. content: The knock membership event that we're sending. Note that this is not the
  362. `content` field of the membership event, but the entire signed membership event
  363. itself represented as a JSON dict.
  364. Returns:
  365. The remote homeserver can optionally return some state from the room. The response
  366. dictionary is in the form:
  367. {"knock_room_state": [<state event dict>, ...]}
  368. The list of state events may be empty.
  369. """
  370. path = _create_v1_path("/send_knock/%s/%s", room_id, event_id)
  371. return await self.client.put_json(
  372. destination=destination, path=path, data=content
  373. )
  374. async def send_invite_v1(
  375. self, destination: str, room_id: str, event_id: str, content: JsonDict
  376. ) -> Tuple[int, JsonDict]:
  377. path = _create_v1_path("/invite/%s/%s", room_id, event_id)
  378. return await self.client.put_json(
  379. destination=destination,
  380. path=path,
  381. data=content,
  382. ignore_backoff=True,
  383. parser=LegacyJsonSendParser(),
  384. )
  385. async def send_invite_v2(
  386. self, destination: str, room_id: str, event_id: str, content: JsonDict
  387. ) -> JsonDict:
  388. path = _create_v2_path("/invite/%s/%s", room_id, event_id)
  389. return await self.client.put_json(
  390. destination=destination, path=path, data=content, ignore_backoff=True
  391. )
  392. async def get_public_rooms(
  393. self,
  394. remote_server: str,
  395. limit: Optional[int] = None,
  396. since_token: Optional[str] = None,
  397. search_filter: Optional[Dict] = None,
  398. include_all_networks: bool = False,
  399. third_party_instance_id: Optional[str] = None,
  400. ) -> JsonDict:
  401. """Get the list of public rooms from a remote homeserver
  402. See synapse.federation.federation_client.FederationClient.get_public_rooms for
  403. more information.
  404. """
  405. path = _create_v1_path("/publicRooms")
  406. if search_filter:
  407. # this uses MSC2197 (Search Filtering over Federation)
  408. data: Dict[str, Any] = {"include_all_networks": include_all_networks}
  409. if third_party_instance_id:
  410. data["third_party_instance_id"] = third_party_instance_id
  411. if limit:
  412. data["limit"] = limit
  413. if since_token:
  414. data["since"] = since_token
  415. data["filter"] = search_filter
  416. try:
  417. response = await self.client.post_json(
  418. destination=remote_server, path=path, data=data, ignore_backoff=True
  419. )
  420. except HttpResponseException as e:
  421. if e.code == 403:
  422. raise SynapseError(
  423. 403,
  424. "You are not allowed to view the public rooms list of %s"
  425. % (remote_server,),
  426. errcode=Codes.FORBIDDEN,
  427. )
  428. raise
  429. else:
  430. args: Dict[str, Union[str, Iterable[str]]] = {
  431. "include_all_networks": "true" if include_all_networks else "false"
  432. }
  433. if third_party_instance_id:
  434. args["third_party_instance_id"] = third_party_instance_id
  435. if limit:
  436. args["limit"] = str(limit)
  437. if since_token:
  438. args["since"] = since_token
  439. try:
  440. response = await self.client.get_json(
  441. destination=remote_server, path=path, args=args, ignore_backoff=True
  442. )
  443. except HttpResponseException as e:
  444. if e.code == 403:
  445. raise SynapseError(
  446. 403,
  447. "You are not allowed to view the public rooms list of %s"
  448. % (remote_server,),
  449. errcode=Codes.FORBIDDEN,
  450. )
  451. raise
  452. return response
  453. async def exchange_third_party_invite(
  454. self, destination: str, room_id: str, event_dict: JsonDict
  455. ) -> JsonDict:
  456. path = _create_v1_path("/exchange_third_party_invite/%s", room_id)
  457. return await self.client.put_json(
  458. destination=destination, path=path, data=event_dict
  459. )
  460. async def get_event_auth(
  461. self, destination: str, room_id: str, event_id: str
  462. ) -> JsonDict:
  463. path = _create_v1_path("/event_auth/%s/%s", room_id, event_id)
  464. return await self.client.get_json(destination=destination, path=path)
  465. async def query_client_keys(
  466. self, destination: str, query_content: JsonDict, timeout: int
  467. ) -> JsonDict:
  468. """Query the device keys for a list of user ids hosted on a remote
  469. server.
  470. Request:
  471. {
  472. "device_keys": {
  473. "<user_id>": ["<device_id>"]
  474. }
  475. }
  476. Response:
  477. {
  478. "device_keys": {
  479. "<user_id>": {
  480. "<device_id>": {...}
  481. }
  482. },
  483. "master_key": {
  484. "<user_id>": {...}
  485. }
  486. },
  487. "self_signing_key": {
  488. "<user_id>": {...}
  489. }
  490. }
  491. Args:
  492. destination: The server to query.
  493. query_content: The user ids to query.
  494. Returns:
  495. A dict containing device and cross-signing keys.
  496. """
  497. path = _create_v1_path("/user/keys/query")
  498. return await self.client.post_json(
  499. destination=destination, path=path, data=query_content, timeout=timeout
  500. )
  501. async def query_user_devices(
  502. self, destination: str, user_id: str, timeout: int
  503. ) -> JsonDict:
  504. """Query the devices for a user id hosted on a remote server.
  505. Response:
  506. {
  507. "stream_id": "...",
  508. "devices": [ { ... } ],
  509. "master_key": {
  510. "user_id": "<user_id>",
  511. "usage": [...],
  512. "keys": {...},
  513. "signatures": {
  514. "<user_id>": {...}
  515. }
  516. },
  517. "self_signing_key": {
  518. "user_id": "<user_id>",
  519. "usage": [...],
  520. "keys": {...},
  521. "signatures": {
  522. "<user_id>": {...}
  523. }
  524. }
  525. }
  526. Args:
  527. destination: The server to query.
  528. query_content: The user ids to query.
  529. Returns:
  530. A dict containing device and cross-signing keys.
  531. """
  532. path = _create_v1_path("/user/devices/%s", user_id)
  533. return await self.client.get_json(
  534. destination=destination, path=path, timeout=timeout
  535. )
  536. async def claim_client_keys(
  537. self,
  538. user: UserID,
  539. destination: str,
  540. query_content: JsonDict,
  541. timeout: Optional[int],
  542. ) -> JsonDict:
  543. """Claim one-time keys for a list of devices hosted on a remote server.
  544. Request:
  545. {
  546. "one_time_keys": {
  547. "<user_id>": {
  548. "<device_id>": "<algorithm>"
  549. }
  550. }
  551. }
  552. Response:
  553. {
  554. "one_time_keys": {
  555. "<user_id>": {
  556. "<device_id>": {
  557. "<algorithm>:<key_id>": <OTK JSON>
  558. }
  559. }
  560. }
  561. }
  562. Args:
  563. user: the user_id of the requesting user
  564. destination: The server to query.
  565. query_content: The user ids to query.
  566. Returns:
  567. A dict containing the one-time keys.
  568. """
  569. path = _create_v1_path("/user/keys/claim")
  570. return await self.client.post_json(
  571. destination=destination,
  572. path=path,
  573. data={"one_time_keys": query_content},
  574. timeout=timeout,
  575. )
  576. async def claim_client_keys_unstable(
  577. self,
  578. user: UserID,
  579. destination: str,
  580. query_content: JsonDict,
  581. timeout: Optional[int],
  582. ) -> JsonDict:
  583. """Claim one-time keys for a list of devices hosted on a remote server.
  584. Request:
  585. {
  586. "one_time_keys": {
  587. "<user_id>": {
  588. "<device_id>": {"<algorithm>": <count>}
  589. }
  590. }
  591. }
  592. Response:
  593. {
  594. "one_time_keys": {
  595. "<user_id>": {
  596. "<device_id>": {
  597. "<algorithm>:<key_id>": <OTK JSON>
  598. }
  599. }
  600. }
  601. }
  602. Args:
  603. user: the user_id of the requesting user
  604. destination: The server to query.
  605. query_content: The user ids to query.
  606. Returns:
  607. A dict containing the one-time keys.
  608. """
  609. path = _create_path(FEDERATION_UNSTABLE_PREFIX, "/user/keys/claim")
  610. return await self.client.post_json(
  611. destination=destination,
  612. path=path,
  613. data={"one_time_keys": query_content},
  614. timeout=timeout,
  615. )
  616. async def get_missing_events(
  617. self,
  618. destination: str,
  619. room_id: str,
  620. earliest_events: Iterable[str],
  621. latest_events: Iterable[str],
  622. limit: int,
  623. min_depth: int,
  624. timeout: int,
  625. ) -> JsonDict:
  626. path = _create_v1_path("/get_missing_events/%s", room_id)
  627. return await self.client.post_json(
  628. destination=destination,
  629. path=path,
  630. data={
  631. "limit": int(limit),
  632. "min_depth": int(min_depth),
  633. "earliest_events": earliest_events,
  634. "latest_events": latest_events,
  635. },
  636. timeout=timeout,
  637. )
  638. async def get_room_complexity(self, destination: str, room_id: str) -> JsonDict:
  639. """
  640. Args:
  641. destination: The remote server
  642. room_id: The room ID to ask about.
  643. """
  644. path = _create_path(FEDERATION_UNSTABLE_PREFIX, "/rooms/%s/complexity", room_id)
  645. return await self.client.get_json(destination=destination, path=path)
  646. async def get_room_hierarchy(
  647. self, destination: str, room_id: str, suggested_only: bool
  648. ) -> JsonDict:
  649. """
  650. Args:
  651. destination: The remote server
  652. room_id: The room ID to ask about.
  653. suggested_only: if True, only suggested rooms will be returned
  654. """
  655. path = _create_v1_path("/hierarchy/%s", room_id)
  656. return await self.client.get_json(
  657. destination=destination,
  658. path=path,
  659. args={"suggested_only": "true" if suggested_only else "false"},
  660. )
  661. async def get_room_hierarchy_unstable(
  662. self, destination: str, room_id: str, suggested_only: bool
  663. ) -> JsonDict:
  664. """
  665. Args:
  666. destination: The remote server
  667. room_id: The room ID to ask about.
  668. suggested_only: if True, only suggested rooms will be returned
  669. """
  670. path = _create_path(
  671. FEDERATION_UNSTABLE_PREFIX, "/org.matrix.msc2946/hierarchy/%s", room_id
  672. )
  673. return await self.client.get_json(
  674. destination=destination,
  675. path=path,
  676. args={"suggested_only": "true" if suggested_only else "false"},
  677. )
  678. async def get_account_status(
  679. self, destination: str, user_ids: List[str]
  680. ) -> JsonDict:
  681. """
  682. Args:
  683. destination: The remote server.
  684. user_ids: The user ID(s) for which to request account status(es).
  685. """
  686. path = _create_path(
  687. FEDERATION_UNSTABLE_PREFIX, "/org.matrix.msc3720/account_status"
  688. )
  689. return await self.client.post_json(
  690. destination=destination, path=path, data={"user_ids": user_ids}
  691. )
  692. async def download_media_r0(
  693. self,
  694. destination: str,
  695. media_id: str,
  696. output_stream: BinaryIO,
  697. max_size: int,
  698. max_timeout_ms: int,
  699. ) -> Tuple[int, Dict[bytes, List[bytes]]]:
  700. path = f"/_matrix/media/r0/download/{destination}/{media_id}"
  701. return await self.client.get_file(
  702. destination,
  703. path,
  704. output_stream=output_stream,
  705. max_size=max_size,
  706. args={
  707. # tell the remote server to 404 if it doesn't
  708. # recognise the server_name, to make sure we don't
  709. # end up with a routing loop.
  710. "allow_remote": "false",
  711. "timeout_ms": str(max_timeout_ms),
  712. },
  713. )
  714. async def download_media_v3(
  715. self,
  716. destination: str,
  717. media_id: str,
  718. output_stream: BinaryIO,
  719. max_size: int,
  720. max_timeout_ms: int,
  721. ) -> Tuple[int, Dict[bytes, List[bytes]]]:
  722. path = f"/_matrix/media/v3/download/{destination}/{media_id}"
  723. return await self.client.get_file(
  724. destination,
  725. path,
  726. output_stream=output_stream,
  727. max_size=max_size,
  728. args={
  729. # tell the remote server to 404 if it doesn't
  730. # recognise the server_name, to make sure we don't
  731. # end up with a routing loop.
  732. "allow_remote": "false",
  733. "timeout_ms": str(max_timeout_ms),
  734. # Matrix 1.7 allows for this to redirect to another URL, this should
  735. # just be ignored for an old homeserver, so always provide it.
  736. "allow_redirect": "true",
  737. },
  738. follow_redirects=True,
  739. )
  740. def _create_path(federation_prefix: str, path: str, *args: str) -> str:
  741. """
  742. Ensures that all args are url encoded.
  743. """
  744. return federation_prefix + path % tuple(urllib.parse.quote(arg, "") for arg in args)
  745. def _create_v1_path(path: str, *args: str) -> str:
  746. """Creates a path against V1 federation API from the path template and
  747. args. Ensures that all args are url encoded.
  748. Example:
  749. _create_v1_path("/event/%s", event_id)
  750. Args:
  751. path: String template for the path
  752. args: Args to insert into path. Each arg will be url encoded
  753. """
  754. return _create_path(FEDERATION_V1_PREFIX, path, *args)
  755. def _create_v2_path(path: str, *args: str) -> str:
  756. """Creates a path against V2 federation API from the path template and
  757. args. Ensures that all args are url encoded.
  758. Example:
  759. _create_v2_path("/event/%s", event_id)
  760. Args:
  761. path: String template for the path
  762. args: Args to insert into path. Each arg will be url encoded
  763. """
  764. return _create_path(FEDERATION_V2_PREFIX, path, *args)
  765. @attr.s(slots=True, auto_attribs=True)
  766. class SendJoinResponse:
  767. """The parsed response of a `/send_join` request."""
  768. # The list of auth events from the /send_join response.
  769. auth_events: List[EventBase]
  770. # The list of state from the /send_join response.
  771. state: List[EventBase]
  772. # The raw join event from the /send_join response.
  773. event_dict: JsonDict
  774. # The parsed join event from the /send_join response. This will be None if
  775. # "event" is not included in the response.
  776. event: Optional[EventBase] = None
  777. # The room state is incomplete
  778. members_omitted: bool = False
  779. # List of servers in the room
  780. servers_in_room: Optional[List[str]] = None
  781. @attr.s(slots=True, auto_attribs=True)
  782. class StateRequestResponse:
  783. """The parsed response of a `/state` request."""
  784. auth_events: List[EventBase]
  785. state: List[EventBase]
  786. @ijson.coroutine
  787. def _event_parser(event_dict: JsonDict) -> Generator[None, Tuple[str, Any], None]:
  788. """Helper function for use with `ijson.kvitems_coro` to parse key-value pairs
  789. to add them to a given dictionary.
  790. """
  791. while True:
  792. key, value = yield
  793. event_dict[key] = value
  794. @ijson.coroutine
  795. def _event_list_parser(
  796. room_version: RoomVersion, events: List[EventBase]
  797. ) -> Generator[None, JsonDict, None]:
  798. """Helper function for use with `ijson.items_coro` to parse an array of
  799. events and add them to the given list.
  800. """
  801. while True:
  802. obj = yield
  803. event = make_event_from_dict(obj, room_version)
  804. events.append(event)
  805. @ijson.coroutine
  806. def _members_omitted_parser(response: SendJoinResponse) -> Generator[None, Any, None]:
  807. """Helper function for use with `ijson.items_coro`
  808. Parses the members_omitted field in send_join responses
  809. """
  810. while True:
  811. val = yield
  812. if not isinstance(val, bool):
  813. raise TypeError("members_omitted must be a boolean")
  814. response.members_omitted = val
  815. @ijson.coroutine
  816. def _servers_in_room_parser(response: SendJoinResponse) -> Generator[None, Any, None]:
  817. """Helper function for use with `ijson.items_coro`
  818. Parses the servers_in_room field in send_join responses
  819. """
  820. while True:
  821. val = yield
  822. if not isinstance(val, list) or any(not isinstance(x, str) for x in val):
  823. raise TypeError("servers_in_room must be a list of strings")
  824. response.servers_in_room = val
  825. class SendJoinParser(ByteParser[SendJoinResponse]):
  826. """A parser for the response to `/send_join` requests.
  827. Args:
  828. room_version: The version of the room.
  829. v1_api: Whether the response is in the v1 format.
  830. """
  831. CONTENT_TYPE = "application/json"
  832. # /send_join responses can be huge, so we override the size limit here. The response
  833. # is parsed in a streaming manner, which helps alleviate the issue of memory
  834. # usage a bit.
  835. MAX_RESPONSE_SIZE = 500 * 1024 * 1024
  836. def __init__(self, room_version: RoomVersion, v1_api: bool):
  837. self._response = SendJoinResponse([], [], event_dict={})
  838. self._room_version = room_version
  839. self._coros: List[Generator[None, bytes, None]] = []
  840. # The V1 API has the shape of `[200, {...}]`, which we handle by
  841. # prefixing with `item.*`.
  842. prefix = "item." if v1_api else ""
  843. self._coros = [
  844. ijson.items_coro(
  845. _event_list_parser(room_version, self._response.state),
  846. prefix + "state.item",
  847. use_float=True,
  848. ),
  849. ijson.items_coro(
  850. _event_list_parser(room_version, self._response.auth_events),
  851. prefix + "auth_chain.item",
  852. use_float=True,
  853. ),
  854. ijson.kvitems_coro(
  855. _event_parser(self._response.event_dict),
  856. prefix + "event",
  857. use_float=True,
  858. ),
  859. ]
  860. if not v1_api:
  861. self._coros.append(
  862. ijson.items_coro(
  863. _members_omitted_parser(self._response),
  864. "members_omitted",
  865. use_float="True",
  866. )
  867. )
  868. # Again, stable field name comes last
  869. self._coros.append(
  870. ijson.items_coro(
  871. _servers_in_room_parser(self._response),
  872. "servers_in_room",
  873. use_float="True",
  874. )
  875. )
  876. def write(self, data: bytes) -> int:
  877. for c in self._coros:
  878. c.send(data)
  879. return len(data)
  880. def finish(self) -> SendJoinResponse:
  881. _close_coros(self._coros)
  882. if self._response.event_dict:
  883. self._response.event = make_event_from_dict(
  884. self._response.event_dict, self._room_version
  885. )
  886. return self._response
  887. class _StateParser(ByteParser[StateRequestResponse]):
  888. """A parser for the response to `/state` requests.
  889. Args:
  890. room_version: The version of the room.
  891. """
  892. CONTENT_TYPE = "application/json"
  893. # As with /send_join, /state responses can be huge.
  894. MAX_RESPONSE_SIZE = 500 * 1024 * 1024
  895. def __init__(self, room_version: RoomVersion):
  896. self._response = StateRequestResponse([], [])
  897. self._room_version = room_version
  898. self._coros: List[Generator[None, bytes, None]] = [
  899. ijson.items_coro(
  900. _event_list_parser(room_version, self._response.state),
  901. "pdus.item",
  902. use_float=True,
  903. ),
  904. ijson.items_coro(
  905. _event_list_parser(room_version, self._response.auth_events),
  906. "auth_chain.item",
  907. use_float=True,
  908. ),
  909. ]
  910. def write(self, data: bytes) -> int:
  911. for c in self._coros:
  912. c.send(data)
  913. return len(data)
  914. def finish(self) -> StateRequestResponse:
  915. _close_coros(self._coros)
  916. return self._response
  917. def _close_coros(coros: Iterable[Generator[None, bytes, None]]) -> None:
  918. """Close each of the given coroutines.
  919. Always calls .close() on each coroutine, even if doing so raises an exception.
  920. Any exceptions raised are aggregated into an ExceptionBundle.
  921. :raises ExceptionBundle: if at least one coroutine fails to close.
  922. """
  923. exceptions = []
  924. for c in coros:
  925. try:
  926. c.close()
  927. except Exception as e:
  928. exceptions.append(e)
  929. if exceptions:
  930. # raise from the first exception so that the traceback has slightly more context
  931. raise ExceptionBundle(
  932. f"There were {len(exceptions)} errors closing coroutines", exceptions
  933. ) from exceptions[0]