Du kan inte välja fler än 25 ämnen. Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.
 
 
 
 
 
 

1501 rader
53 KiB

  1. # Copyright 2014-2021 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import abc
  15. import cgi
  16. import codecs
  17. import logging
  18. import random
  19. import sys
  20. import urllib.parse
  21. from http import HTTPStatus
  22. from io import BytesIO, StringIO
  23. from typing import (
  24. TYPE_CHECKING,
  25. Any,
  26. BinaryIO,
  27. Callable,
  28. Dict,
  29. Generic,
  30. List,
  31. Optional,
  32. TextIO,
  33. Tuple,
  34. TypeVar,
  35. Union,
  36. cast,
  37. overload,
  38. )
  39. import attr
  40. import treq
  41. from canonicaljson import encode_canonical_json
  42. from prometheus_client import Counter
  43. from signedjson.sign import sign_json
  44. from typing_extensions import Literal
  45. from twisted.internet import defer
  46. from twisted.internet.error import DNSLookupError
  47. from twisted.internet.interfaces import IReactorTime
  48. from twisted.internet.task import Cooperator
  49. from twisted.web.client import ResponseFailed
  50. from twisted.web.http_headers import Headers
  51. from twisted.web.iweb import IAgent, IBodyProducer, IResponse
  52. import synapse.metrics
  53. import synapse.util.retryutils
  54. from synapse.api.errors import (
  55. Codes,
  56. FederationDeniedError,
  57. HttpResponseException,
  58. RequestSendFailed,
  59. SynapseError,
  60. )
  61. from synapse.crypto.context_factory import FederationPolicyForHTTPS
  62. from synapse.http import QuieterFileBodyProducer
  63. from synapse.http.client import (
  64. BlocklistingAgentWrapper,
  65. BodyExceededMaxSize,
  66. ByteWriteable,
  67. _make_scheduler,
  68. encode_query_args,
  69. read_body_with_max_size,
  70. )
  71. from synapse.http.connectproxyclient import BearerProxyCredentials
  72. from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
  73. from synapse.http.proxyagent import ProxyAgent
  74. from synapse.http.types import QueryParams
  75. from synapse.logging import opentracing
  76. from synapse.logging.context import make_deferred_yieldable, run_in_background
  77. from synapse.logging.opentracing import set_tag, start_active_span, tags
  78. from synapse.types import JsonDict
  79. from synapse.util import json_decoder
  80. from synapse.util.async_helpers import AwakenableSleeper, timeout_deferred
  81. from synapse.util.metrics import Measure
  82. from synapse.util.stringutils import parse_and_validate_server_name
  83. if TYPE_CHECKING:
  84. from synapse.server import HomeServer
  # Logger for this module.
  logger = logging.getLogger(__name__)

  # Counts outgoing federation requests, labelled by HTTP method.
  outgoing_requests_counter = Counter(
      "synapse_http_matrixfederationclient_requests",
      "",
      labelnames=["method"],
  )

  # Counts responses received, labelled by HTTP method and response code.
  incoming_responses_counter = Counter(
      "synapse_http_matrixfederationclient_responses",
      "",
      labelnames=["method", "code"],
  )

  # Upper bound used when wrapping the request ID counter below.
  MAXINT = sys.maxsize

  # Monotonic counter used to build per-request transaction IDs (for logging).
  _next_id = 1

  # Type of the value produced by a response parser.
  T = TypeVar("T")
  class ByteParser(ByteWriteable, Generic[T], abc.ABC):
      """A `ByteWriteable` which additionally exposes `finish`, returning the
      parsed data of type `T`.
      """

      CONTENT_TYPE: str = abc.abstractproperty()  # type: ignore
      """Content type the response is expected to have, e.g. `application/json`.
      The request is failed if the response's content type does not match.
      """

      # Federation responses can be rather large (e.g. a big state_ids is
      # around 50M), hence the generous limit.
      MAX_RESPONSE_SIZE: int = 100 * 1024**2
      """Largest response size this parser will accept."""

      @abc.abstractmethod
      def finish(self) -> T:
          """Invoked once the response has finished streaming; implementations
          return the final parsed result (or raise an error).
          """
  @attr.s(slots=True, frozen=True, auto_attribs=True)
  class MatrixFederationRequest:
      """Frozen description of one outgoing federation request."""

      method: str
      """The HTTP method."""

      path: str
      """The HTTP path."""

      destination: str
      """Name of the remote server the request is sent to."""

      json: Optional[JsonDict] = None
      """JSON to use as the request body."""

      json_callback: Optional[Callable[[], JsonDict]] = None
      """Callable producing the JSON body; takes precedence over `json`."""

      query: Optional[QueryParams] = None
      """Query-string arguments."""

      txn_id: Optional[str] = None
      """Unique ID for this request, used in log lines. Always regenerated
      in `__attrs_post_init__`.
      """

      uri: bytes = attr.ib(init=False)
      """The full URI of this request, computed in `__attrs_post_init__`."""

      def __attrs_post_init__(self) -> None:
          """Assign a fresh `txn_id` and pre-compute `uri`."""
          global _next_id

          generated_txn_id = "%s-O-%s" % (self.method, _next_id)
          _next_id = (_next_id + 1) % (MAXINT - 1)

          # The class is frozen, so attributes are set via object.__setattr__.
          object.__setattr__(self, "txn_id", generated_txn_id)

          # Since the instance is frozen the URI can be computed once, up front.
          uri_components = (
              b"matrix-federation",
              self.destination.encode("ascii"),
              self.path.encode("ascii"),
              None,
              encode_query_args(self.query),
              b"",
          )
          object.__setattr__(self, "uri", urllib.parse.urlunparse(uri_components))

      def get_json(self) -> Optional[JsonDict]:
          """Return the JSON body: the callback's result if a callback is set,
          otherwise the static `json` value.
          """
          callback = self.json_callback
          return callback() if callback else self.json
  class _BaseJsonParser(ByteParser[T]):
      """Buffers the entire response, then attempts to decode it as JSON."""

      CONTENT_TYPE = "application/json"

      def __init__(
          self, validator: Optional[Callable[[Optional[object]], bool]] = None
      ) -> None:
          """
          Args:
              validator: Optional callable given the decoded JSON value; it
                  returns true when the value is valid.
          """
          text_buffer = StringIO()
          self._buffer = text_buffer
          # Incoming bytes are decoded to text on the way into the buffer.
          self._binary_wrapper = BinaryIOWrapper(text_buffer)
          self._validator = validator

      def write(self, data: bytes) -> int:
          """Append a chunk of response bytes to the buffer."""
          wrapper = self._binary_wrapper
          return wrapper.write(data)

      def finish(self) -> T:
          """Decode the buffered text as JSON and return the value.

          Raises:
              ValueError: if a validator was supplied and it rejects the value.
          """
          result = json_decoder.decode(self._buffer.getvalue())
          check = self._validator
          if check is None or check(result):
              return result
          raise ValueError(
              f"Received incorrect JSON value: {result.__class__.__name__}"
          )
  class JsonParser(_BaseJsonParser[JsonDict]):
      """Buffers the response and requires it to decode to a JSON object."""

      def __init__(self) -> None:
          super().__init__(validator=self._validate)

      @staticmethod
      def _validate(v: Any) -> bool:
          # A JSON object decodes to a dict; anything else is rejected.
          is_json_object = isinstance(v, dict)
          return is_json_object
  class LegacyJsonSendParser(_BaseJsonParser[Tuple[int, JsonDict]]):
      """Checks that legacy /send_join & /send_leave responses have the right shape."""

      def __init__(self) -> None:
          super().__init__(validator=self._validate)

      @staticmethod
      def _validate(v: Any) -> bool:
          # Expected shape: [integer, JSON dict]
          if not isinstance(v, list) or len(v) != 2:
              return False
          code, body = v
          # Exact type comparison: `bool` values do not count as integers here.
          return type(code) is int and isinstance(body, dict)
  async def _handle_response(
      reactor: IReactorTime,
      timeout_sec: float,
      request: MatrixFederationRequest,
      response: IResponse,
      start_ms: int,
      parser: ByteParser[T],
  ) -> T:
      """
      Streams a response body into a parser, subject to a timeout, and returns
      what the parser produces.

      Args:
          reactor: twisted reactor, used for the timeout and for timing
          timeout_sec: number of seconds to wait for the body to be read
          request: the request which produced the response
          response: the response whose body is to be read
          start_ms: timestamp (in milliseconds) at which the request was made
          parser: the parser the body is fed into

      Returns:
          The parsed response

      Raises:
          RequestSendFailed: non-retryable if the body exceeded the parser's
              maximum size or could not be parsed; retryable if reading the
              body timed out or failed with `ResponseFailed`.
      """
      size_limit = parser.MAX_RESPONSE_SIZE
      parse_completed = False

      try:
          check_content_type_is(response.headers, parser.CONTENT_TYPE)

          body_deferred = timeout_deferred(
              read_body_with_max_size(response, parser, size_limit),
              timeout=timeout_sec,
              reactor=reactor,
          )
          length = await make_deferred_yieldable(body_deferred)

          parse_completed = True
          value = parser.finish()
      except BodyExceededMaxSize as err:
          # The body was larger than the parser allows.
          logger.warning(
              "{%s} [%s] JSON response exceeded max size %i - %s %s",
              request.txn_id,
              request.destination,
              size_limit,
              request.method,
              request.uri.decode("ascii"),
          )
          raise RequestSendFailed(err, can_retry=False) from err
      except ValueError as err:
          # The body could not be parsed / validated.
          logger.warning(
              "{%s} [%s] Failed to parse response - %s %s",
              request.txn_id,
              request.destination,
              request.method,
              request.uri.decode("ascii"),
          )
          raise RequestSendFailed(err, can_retry=False) from err
      except defer.TimeoutError as err:
          logger.warning(
              "{%s} [%s] Timed out reading response - %s %s",
              request.txn_id,
              request.destination,
              request.method,
              request.uri.decode("ascii"),
          )
          raise RequestSendFailed(err, can_retry=True) from err
      except ResponseFailed as err:
          logger.warning(
              "{%s} [%s] Failed to read response - %s %s",
              request.txn_id,
              request.destination,
              request.method,
              request.uri.decode("ascii"),
          )
          raise RequestSendFailed(err, can_retry=True) from err
      except Exception as err:
          # Anything else is logged and propagated unchanged.
          logger.warning(
              "{%s} [%s] Error reading response %s %s: %s",
              request.txn_id,
              request.destination,
              request.method,
              request.uri.decode("ascii"),
              err,
          )
          raise
      finally:
          if not parse_completed:
              # We bailed out before calling `finish()`; call it now so the
              # parser gets a chance to release whatever it is holding.
              try:
                  parser.finish()
              except Exception:
                  # Deliberately ignored: an exception is already propagating.
                  pass

      # `start_ms` is in milliseconds, `reactor.seconds()` in seconds.
      elapsed_secs = reactor.seconds() - start_ms / 1000

      logger.info(
          "{%s} [%s] Completed request: %d %s in %.2f secs, got %d bytes - %s %s",
          request.txn_id,
          request.destination,
          response.code,
          response.phrase.decode("ascii", errors="replace"),
          elapsed_secs,
          length,
          request.method,
          request.uri.decode("ascii"),
      )
      return value
  class BinaryIOWrapper:
      """Adapter around a TextIO which accepts bytes and decodes them on the fly.

      Decoding is incremental, so a multi-byte character may be split across
      consecutive `write` calls.
      """

      def __init__(self, file: TextIO, encoding: str = "utf-8", errors: str = "strict"):
          make_decoder = codecs.getincrementaldecoder(encoding)
          self.decoder = make_decoder(errors)
          self.file = file

      def write(self, b: Union[bytes, bytearray]) -> int:
          """Decode `b`, write the resulting text to the wrapped file and
          return the number of bytes that were passed in.
          """
          decoded_text = self.decoder.decode(b)
          self.file.write(decoded_text)
          return len(b)
  313. class MatrixFederationHttpClient:
  314. """HTTP client used to talk to other homeservers over the federation
  315. protocol. Send client certificates and signs requests.
  316. Attributes:
  317. agent (twisted.web.client.Agent): The twisted Agent used to send the
  318. requests.
  319. """
  def __init__(
      self,
      hs: "HomeServer",
      tls_client_options_factory: Optional[FederationPolicyForHTTPS],
  ):
      """
      Args:
          hs: the homeserver; supplies configuration, the reactor, the clock,
              the signing key and the main datastore.
          tls_client_options_factory: handed to whichever agent is built to
              make the outgoing connections.
      """
      self.hs = hs
      self.signing_key = hs.signing_key
      self.server_name = hs.hostname

      self.reactor = hs.get_reactor()

      user_agent = hs.version_string
      suffix = hs.config.server.user_agent_suffix
      if suffix:
          user_agent = "%s %s" % (user_agent, suffix)

      restricted_to = hs.config.worker.outbound_federation_restricted_to

      federation_agent: IAgent
      if hs.get_instance_name() not in restricted_to:
          # This instance may not federate directly: go via one of the
          # configured proxy locations, authenticating with the replication
          # secret.
          secret = hs.config.worker.worker_replication_secret
          assert (
              secret is not None
          ), "`worker_replication_secret` must be set when using `outbound_federation_restricted_to` (used to authenticate requests across workers)"
          credentials = BearerProxyCredentials(secret.encode("ascii"))

          proxy_locations = restricted_to.locations
          federation_agent = ProxyAgent(
              self.reactor,
              self.reactor,
              tls_client_options_factory,
              federation_proxy_locations=proxy_locations,
              federation_proxy_credentials=credentials,
          )
      else:
          # This instance talks to federation directly.
          federation_agent = MatrixFederationAgent(
              self.reactor,
              tls_client_options_factory,
              user_agent.encode("ascii"),
              hs.config.server.federation_ip_range_allowlist,
              hs.config.server.federation_ip_range_blocklist,
          )

      # Wrap in a BlocklistingAgentWrapper so that the IP blocking cannot be
      # circumvented with IP literals in server names.
      self.agent: IAgent = BlocklistingAgentWrapper(
          federation_agent,
          ip_blocklist=hs.config.server.federation_ip_range_blocklist,
      )

      self.clock = hs.get_clock()
      self._store = hs.get_datastores().main
      self.version_string_bytes = hs.version_string.encode("ascii")

      # Configuration values are in milliseconds; keep them as seconds.
      federation_config = hs.config.federation
      self.default_timeout_seconds = federation_config.client_timeout_ms / 1000
      self.max_long_retry_delay_seconds = (
          federation_config.max_long_retry_delay_ms / 1000
      )
      self.max_short_retry_delay_seconds = (
          federation_config.max_short_retry_delay_ms / 1000
      )
      self.max_long_retries = federation_config.max_long_retries
      self.max_short_retries = federation_config.max_short_retries

      self._cooperator = Cooperator(scheduler=_make_scheduler(self.reactor))

      self._sleeper = AwakenableSleeper(self.reactor)
  def wake_destination(self, destination: str) -> None:
      """Notify the sleeper that `destination` may be reachable again, so any
      retry currently sleeping on it can wake up.
      """
      sleeper = self._sleeper
      sleeper.wake(destination)
  async def _send_request_with_optional_trailing_slash(
      self,
      request: MatrixFederationRequest,
      try_trailing_slash_on_400: bool = False,
      **send_request_args: Any,
  ) -> IResponse:
      """Calls `_send_request`, optionally retrying once with a trailing slash
      appended to the path when the remote answers with a 400 HTTP response
      code and an 'M_UNRECOGNIZED' errcode. This is a workaround for
      Synapse <= v0.99.3 due to #3622.

      Args:
          request: details of request to be sent
          try_trailing_slash_on_400: Whether to retry the request with a
              trailing slash appended to the request path on receiving a 400
              'M_UNRECOGNIZED' from the server.
          send_request_args: Keyword arguments forwarded to `_send_request()`.

      Raises:
          HttpResponseException: re-raised from `_send_request` when the
              retry conditions are not met, or raised by the retry itself.

      Returns:
          The response object returned by `_send_request`.
      """
      try:
          response = await self._send_request(request, **send_request_args)
      except HttpResponseException as e:
          # Only a 400 carrying 'M_UNRECOGNIZED' qualifies for the retry, and
          # only when the caller asked for it; everything else propagates.
          should_retry = (
              try_trailing_slash_on_400
              and e.code == 400
              and e.to_synapse_error().errcode == "M_UNRECOGNIZED"
          )
          if not should_retry:
              raise

          # Some endpoints on Synapse <= v0.99.3 respond this way when the
          # trailing slash is omitted.
          logger.info("Retrying request with trailing slash")

          # The request is frozen, so build a modified copy.
          request = attr.evolve(request, path=request.path + "/")

          response = await self._send_request(request, **send_request_args)

      return response
  async def _send_request(
      self,
      request: MatrixFederationRequest,
      retry_on_dns_fail: bool = True,
      timeout: Optional[int] = None,
      long_retries: bool = False,
      ignore_backoff: bool = False,
      backoff_on_404: bool = False,
  ) -> IResponse:
      """
      Sends a request to the given server, retrying on retryable failures.

      Args:
          request: details of request to be sent

          retry_on_dns_fail: true if the request should be retried on DNS failures

          timeout: number of milliseconds to wait for the response headers
              (including connecting to the server), *for each attempt*.
              Defaults to `self.default_timeout_seconds` when not given.
              Note that retries are only attempted when `timeout` is falsy;
              with an explicit timeout a retryable failure is raised at once.

          long_retries: whether to use the long retry algorithm.

              The regular retry algorithm makes up to `self.max_short_retries`
              retries, waiting 0.5s, 1s, 2s, ... (doubling each time, capped
              at `self.max_short_retry_delay_seconds`).

              The long retry algorithm makes up to `self.max_long_retries`
              retries, waiting 4s, 16s, ... (multiplying by 4 each time, capped
              at `self.max_long_retry_delay_seconds`).

              Both algorithms add -20%/+40% jitter to the retry intervals.

              Note that the above intervals are *in addition* to the time spent
              waiting for the request to complete (up to `timeout` ms).

              NB: the long retry algorithm can take a very long time to
              complete. It's best not to use the `long_retries` option
              for something that is blocking a client so we don't make them wait
              for ages, whereas for some things like sending transactions
              (server to server) we can be a lot more lenient, but it's very
              fuzzy / hand-wavey.

              In the future, we could be more intelligent about doing this sort of
              thing by looking at things with the bigger picture in mind,
              https://github.com/matrix-org/synapse/issues/8917

          ignore_backoff: true to ignore the historical backoff data
              and try the request anyway.

          backoff_on_404: Back off if we get a 404

      Returns:
          Resolves with the HTTP response object on success (a 2xx response).

      Raises:
          HttpResponseException: If we get a non-2xx HTTP response code other
              than 5xx or 429.
          NotRetryingDestination: If we are not yet ready to retry this
              server (presumably raised via the retry limiter).
          FederationDeniedError: If the destination is not a valid server
              name, or is not on our federation whitelist
          RequestSendFailed: If there were problems connecting to the
              remote, due to e.g. DNS failures, connection timeouts etc.
              Also raised (wrapping an HttpResponseException) for 5xx and 429
              responses.
      """
      # Validate server name and log if it is an invalid destination, this is
      # partially to help track down code paths where we haven't validated before here
      try:
          parse_and_validate_server_name(request.destination)
      except ValueError:
          logger.exception(f"Invalid destination: {request.destination}.")
          raise FederationDeniedError(request.destination)

      # `timeout` is given in milliseconds; everything below works in seconds.
      if timeout is not None:
          _sec_timeout = timeout / 1000
      else:
          _sec_timeout = self.default_timeout_seconds

      # Refuse destinations outside the configured whitelist, if there is one.
      if (
          self.hs.config.federation.federation_domain_whitelist is not None
          and request.destination
          not in self.hs.config.federation.federation_domain_whitelist
      ):
          raise FederationDeniedError(request.destination)

      # The limiter is used as a context manager around the whole send below.
      limiter = await synapse.util.retryutils.get_retry_limiter(
          request.destination,
          self.clock,
          self._store,
          backoff_on_404=backoff_on_404,
          ignore_backoff=ignore_backoff,
          notifier=self.hs.get_notifier(),
          replication_client=self.hs.get_replication_command_handler(),
      )

      method_bytes = request.method.encode("ascii")
      destination_bytes = request.destination.encode("ascii")
      path_bytes = request.path.encode("ascii")
      query_bytes = encode_query_args(request.query)

      scope = start_active_span(
          "outgoing-federation-request",
          tags={
              tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
              tags.PEER_ADDRESS: request.destination,
              tags.HTTP_METHOD: request.method,
              tags.HTTP_URL: request.path,
          },
          finish_on_close=True,
      )

      # Inject the span into the headers
      headers_dict: Dict[bytes, List[bytes]] = {}
      opentracing.inject_header_dict(headers_dict, request.destination)

      headers_dict[b"User-Agent"] = [self.version_string_bytes]

      with limiter, scope:
          # XXX: Would be much nicer to retry only at the transaction-layer
          # (once we have reliable transactions in place)
          if long_retries:
              retries_left = self.max_long_retries
          else:
              retries_left = self.max_short_retries

          url_bytes = request.uri
          url_str = url_bytes.decode("ascii")

          # Only the path and query are signed, not the scheme or destination.
          url_to_sign_bytes = urllib.parse.urlunparse(
              (b"", b"", path_bytes, None, query_bytes, b"")
          )

          # Each iteration is one attempt; the loop exits via `break` on a 2xx
          # response or via an exception.
          while True:
              try:
                  # The body is regenerated (and re-signed) on every attempt.
                  json = request.get_json()
                  # A falsy body (None or an empty dict) is sent as no body.
                  if json:
                      headers_dict[b"Content-Type"] = [b"application/json"]
                      auth_headers = self.build_auth_headers(
                          destination_bytes, method_bytes, url_to_sign_bytes, json
                      )
                      data = encode_canonical_json(json)
                      producer: Optional[IBodyProducer] = QuieterFileBodyProducer(
                          BytesIO(data), cooperator=self._cooperator
                      )
                  else:
                      producer = None
                      auth_headers = self.build_auth_headers(
                          destination_bytes, method_bytes, url_to_sign_bytes
                      )

                  headers_dict[b"Authorization"] = auth_headers

                  logger.debug(
                      "{%s} [%s] Sending request: %s %s; timeout %fs",
                      request.txn_id,
                      request.destination,
                      request.method,
                      url_str,
                      _sec_timeout,
                  )

                  outgoing_requests_counter.labels(request.method).inc()

                  try:
                      with Measure(self.clock, "outbound_request"):
                          # we don't want all the fancy cookie and redirect handling
                          # that treq.request gives: just use the raw Agent.

                          # To preserve the logging context, the timeout is treated
                          # in a similar way to `defer.gatherResults`:
                          # * Each logging context-preserving fork is wrapped in
                          #   `run_in_background`. In this case there is only one,
                          #   since the timeout fork is not logging-context aware.
                          # * The `Deferred` that joins the forks back together is
                          #   wrapped in `make_deferred_yieldable` to restore the
                          #   logging context regardless of the path taken.
                          request_deferred = run_in_background(
                              self.agent.request,
                              method_bytes,
                              url_bytes,
                              headers=Headers(headers_dict),
                              bodyProducer=producer,
                          )
                          request_deferred = timeout_deferred(
                              request_deferred,
                              timeout=_sec_timeout,
                              reactor=self.reactor,
                          )

                          response = await make_deferred_yieldable(request_deferred)
                  except DNSLookupError as e:
                      # DNS failures are retried only if the caller allows it.
                      raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e
                  except Exception as e:
                      # Any other failure to get response headers is retryable.
                      raise RequestSendFailed(e, can_retry=True) from e

                  incoming_responses_counter.labels(
                      request.method, response.code
                  ).inc()

                  set_tag(tags.HTTP_STATUS_CODE, response.code)
                  response_phrase = response.phrase.decode("ascii", errors="replace")

                  if 200 <= response.code < 300:
                      logger.debug(
                          "{%s} [%s] Got response headers: %d %s",
                          request.txn_id,
                          request.destination,
                          response.code,
                          response_phrase,
                      )
                  else:
                      logger.info(
                          "{%s} [%s] Got response headers: %d %s",
                          request.txn_id,
                          request.destination,
                          response.code,
                          response_phrase,
                      )
                      # :'(
                      # Update transactions table?
                      # Read the error body (with a timeout) so it can be
                      # attached to the exception raised below.
                      d = treq.content(response)
                      d = timeout_deferred(
                          d, timeout=_sec_timeout, reactor=self.reactor
                      )

                      try:
                          body = await make_deferred_yieldable(d)
                      except Exception as e:
                          # Eh, we're already going to raise an exception so lets
                          # ignore if this fails.
                          logger.warning(
                              "{%s} [%s] Failed to get error response: %s %s: %s",
                              request.txn_id,
                              request.destination,
                              request.method,
                              url_str,
                              _flatten_response_never_received(e),
                          )
                          body = None

                      exc = HttpResponseException(
                          response.code, response_phrase, body
                      )

                      # Retry if the error is a 5xx or a 429 (Too Many
                      # Requests), otherwise just raise a standard
                      # `HttpResponseException`
                      if 500 <= response.code < 600 or response.code == 429:
                          raise RequestSendFailed(exc, can_retry=True) from exc
                      else:
                          raise exc

                  # Only reached for a 2xx response: stop retrying.
                  break
              except RequestSendFailed as e:
                  logger.info(
                      "{%s} [%s] Request failed: %s %s: %s",
                      request.txn_id,
                      request.destination,
                      request.method,
                      url_str,
                      _flatten_response_never_received(e.inner_exception),
                  )

                  if not e.can_retry:
                      raise

                  # Retry only while attempts remain and the caller did not
                  # pass an explicit timeout.
                  if retries_left and not timeout:
                      if long_retries:
                          # 4s, 16s, 64s, ... capped at the configured maximum.
                          delay_seconds = 4 ** (
                              self.max_long_retries + 1 - retries_left
                          )
                          delay_seconds = min(
                              delay_seconds, self.max_long_retry_delay_seconds
                          )
                          delay_seconds *= random.uniform(0.8, 1.4)
                      else:
                          # 0.5s, 1s, 2s, ... capped at the configured maximum.
                          delay_seconds = 0.5 * 2 ** (
                              self.max_short_retries - retries_left
                          )
                          delay_seconds = min(
                              delay_seconds, self.max_short_retry_delay_seconds
                          )
                          delay_seconds *= random.uniform(0.8, 1.4)

                      logger.debug(
                          "{%s} [%s] Waiting %ss before re-sending...",
                          request.txn_id,
                          request.destination,
                          delay_seconds,
                      )

                      # Sleep for the calculated delay, or wake up immediately
                      # if we get notified that the server is back up.
                      await self._sleeper.sleep(
                          request.destination, delay_seconds * 1000
                      )
                      retries_left -= 1
                  else:
                      raise

              except Exception as e:
                  # Non-retryable failures (including HttpResponseException)
                  # are logged and propagated.
                  logger.warning(
                      "{%s} [%s] Request failed: %s %s: %s",
                      request.txn_id,
                      request.destination,
                      request.method,
                      url_str,
                      _flatten_response_never_received(e),
                  )
                  raise
      return response
  def build_auth_headers(
      self,
      destination: Optional[bytes],
      method: bytes,
      url_bytes: bytes,
      content: Optional[JsonDict] = None,
      destination_is: Optional[bytes] = None,
  ) -> List[bytes]:
      """
      Produces the Authorization header values for a federation request by
      signing a JSON description of the request with this server's key.

      Args:
          destination: The destination homeserver of the request.
              May be None if the destination is an identity server, in which case
              destination_is must be non-None.
          method: The HTTP method of the request
          url_bytes: The URI path of the request
          content: The body of the request
          destination_is: As 'destination', but if the destination is an
              identity server

      Returns:
          A list of headers to be added as "Authorization:" headers, one per
          signature.

      Raises:
          ValueError: if neither `destination` nor `destination_is` is a
              nonempty bytestring.
      """
      if not (destination or destination_is):
          raise ValueError(
              "At least one of the arguments destination and destination_is "
              "must be a nonempty bytestring."
          )

      origin = self.server_name

      # The object which gets signed; optional fields are added only if given.
      request: JsonDict = {
          "method": method.decode("ascii"),
          "uri": url_bytes.decode("ascii"),
          "origin": origin,
      }
      if destination is not None:
          request["destination"] = destination.decode("ascii")
      if destination_is is not None:
          request["destination_is"] = destination_is.decode("ascii")
      if content is not None:
          request["content"] = content

      signed = sign_json(request, origin, self.signing_key)

      # An empty `destination` falls back to `destination_is`.
      header_destination = signed.get("destination") or signed["destination_is"]

      return [
          (
              'X-Matrix origin="%s",key="%s",sig="%s",destination="%s"'
              % (
                  origin,
                  key,
                  sig,
                  header_destination,
              )
          ).encode("ascii")
          for key, sig in signed["signatures"][origin].items()
      ]
  @overload
  async def put_json(
      self,
      destination: str,
      path: str,
      args: Optional[QueryParams] = None,
      data: Optional[JsonDict] = None,
      json_data_callback: Optional[Callable[[], JsonDict]] = None,
      long_retries: bool = False,
      timeout: Optional[int] = None,
      ignore_backoff: bool = False,
      backoff_on_404: bool = False,
      try_trailing_slash_on_400: bool = False,
      parser: Literal[None] = None,
  ) -> JsonDict:
      ...

  @overload
  async def put_json(
      self,
      destination: str,
      path: str,
      args: Optional[QueryParams] = None,
      data: Optional[JsonDict] = None,
      json_data_callback: Optional[Callable[[], JsonDict]] = None,
      long_retries: bool = False,
      timeout: Optional[int] = None,
      ignore_backoff: bool = False,
      backoff_on_404: bool = False,
      try_trailing_slash_on_400: bool = False,
      parser: Optional[ByteParser[T]] = None,
  ) -> T:
      ...

  async def put_json(
      self,
      destination: str,
      path: str,
      args: Optional[QueryParams] = None,
      data: Optional[JsonDict] = None,
      json_data_callback: Optional[Callable[[], JsonDict]] = None,
      long_retries: bool = False,
      timeout: Optional[int] = None,
      ignore_backoff: bool = False,
      backoff_on_404: bool = False,
      try_trailing_slash_on_400: bool = False,
      parser: Optional[ByteParser[T]] = None,
  ) -> Union[JsonDict, T]:
      """PUTs the given JSON data to a remote server and parses the response.

      Args:
          destination: The remote server to send the HTTP request to.
          path: The HTTP path.
          args: query params
          data: A dict containing the data that will be used as
              the request body. This will be encoded as JSON.
          json_data_callback: A callable returning the dict to
              use as the request body.

          long_retries: whether to use the long retry algorithm. See
              docs on _send_request for details.

          timeout: number of milliseconds to wait for the response.
              `self.default_timeout_seconds` is used when not given.

              Note that we may make several attempts to send the request; this
              timeout applies to the time spent waiting for response headers for
              *each* attempt (including connection time) as well as the time spent
              reading the response body after a 200 response.

          ignore_backoff: true to ignore the historical backoff data
              and try the request anyway.
          backoff_on_404: True if we should count a 404 response as
              a failure of the server (and should therefore back off future
              requests).
          try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED
              response we should try appending a trailing slash to the end
              of the request. Workaround for #3622 in Synapse <= v0.99.3. This
              will be attempted before backing off if backing off has been
              enabled.
          parser: The parser to use to decode the response. Defaults to
              parsing as a JSON object.

      Returns:
          Succeeds when we get a 2xx HTTP response. The
          result will be whatever the parser produces (by default the decoded
          JSON body).

      Raises:
          HttpResponseException: If we get an HTTP response code >= 300
              (except 429).
          NotRetryingDestination: If we are not yet ready to retry this
              server.
          FederationDeniedError: If this destination is not on our
              federation whitelist
          RequestSendFailed: If there were problems connecting to the
              remote, due to e.g. DNS failures, connection timeouts etc.
      """
      request = MatrixFederationRequest(
          method="PUT",
          destination=destination,
          path=path,
          query=args,
          json=data,
          json_callback=json_data_callback,
      )

      # Taken before sending so the logged duration covers the whole exchange.
      start_ms = self.clock.time_msec()

      response = await self._send_request_with_optional_trailing_slash(
          request,
          try_trailing_slash_on_400,
          backoff_on_404=backoff_on_404,
          ignore_backoff=ignore_backoff,
          long_retries=long_retries,
          timeout=timeout,
      )

      # The same timeout (converted to seconds) bounds reading the body.
      _sec_timeout = (
          self.default_timeout_seconds if timeout is None else timeout / 1000
      )

      response_parser = (
          parser if parser is not None else cast(ByteParser[T], JsonParser())
      )

      return await _handle_response(
          self.reactor,
          _sec_timeout,
          request,
          response,
          start_ms,
          parser=response_parser,
      )
  async def post_json(
      self,
      destination: str,
      path: str,
      data: Optional[JsonDict] = None,
      long_retries: bool = False,
      timeout: Optional[int] = None,
      ignore_backoff: bool = False,
      args: Optional[QueryParams] = None,
  ) -> JsonDict:
      """POST a JSON body to a remote homeserver and decode the JSON reply.

      Args:
          destination: The remote server to send the HTTP request to.

          path: The HTTP path.

          data: A dict which is JSON-encoded and sent as the request body.

          long_retries: whether to use the long retry algorithm. See the
              docs on _send_request for details.

          timeout: number of milliseconds to wait for the response. When
              None, the client's default timeout is used.

              Several attempts may be made to send the request. The timeout
              covers, for *each* attempt, the wait for the response headers
              (including connection time), and also the time spent reading
              the response body after a 200 response.

          ignore_backoff: true to ignore the historical backoff data and
              try the request anyway.

          args: query params

      Returns:
          The decoded JSON body, once a 2xx HTTP response has been received.

      Raises:
          HttpResponseException: If we get an HTTP response code >= 300
              (except 429).
          NotRetryingDestination: If we are not yet ready to retry this
              server.
          FederationDeniedError: If this destination is not on our
              federation whitelist
          RequestSendFailed: If there were problems connecting to the
              remote, due to e.g. DNS failures, connection timeouts etc.
      """
      request = MatrixFederationRequest(
          method="POST",
          destination=destination,
          path=path,
          query=args,
          json=data,
      )

      # Sampled before sending; handed to _handle_response further down.
      start_ms = self.clock.time_msec()

      response = await self._send_request(
          request,
          long_retries=long_retries,
          timeout=timeout,
          ignore_backoff=ignore_backoff,
      )

      # Callers give milliseconds; the body-read timeout is expressed in seconds.
      read_timeout_sec = (
          self.default_timeout_seconds if timeout is None else timeout / 1000
      )

      return await _handle_response(
          self.reactor,
          read_timeout_sec,
          request,
          response,
          start_ms,
          parser=JsonParser(),
      )
  @overload
  async def get_json(
      self,
      destination: str,
      path: str,
      args: Optional[QueryParams] = None,
      retry_on_dns_fail: bool = True,
      timeout: Optional[int] = None,
      ignore_backoff: bool = False,
      try_trailing_slash_on_400: bool = False,
      parser: Literal[None] = None,
  ) -> JsonDict:
      ...

  @overload
  async def get_json(
      self,
      destination: str,
      path: str,
      args: Optional[QueryParams] = ...,
      retry_on_dns_fail: bool = ...,
      timeout: Optional[int] = ...,
      ignore_backoff: bool = ...,
      try_trailing_slash_on_400: bool = ...,
      parser: ByteParser[T] = ...,
  ) -> T:
      ...

  async def get_json(
      self,
      destination: str,
      path: str,
      args: Optional[QueryParams] = None,
      retry_on_dns_fail: bool = True,
      timeout: Optional[int] = None,
      ignore_backoff: bool = False,
      try_trailing_slash_on_400: bool = False,
      parser: Optional[ByteParser[T]] = None,
  ) -> Union[JsonDict, T]:
      """GET a resource from a remote homeserver and return the decoded body.

      This is get_json_with_headers with the response headers thrown away.

      Args:
          destination: The remote server to send the HTTP request to.

          path: The HTTP path.

          args: A dictionary used to create query strings, defaults to
              None.

          retry_on_dns_fail: true if the request should be retried on DNS
              failures

          timeout: number of milliseconds to wait for the response. When
              None, the client's default timeout is used.

              Several attempts may be made to send the request. The timeout
              covers, for *each* attempt, the wait for the response headers
              (including connection time), and also the time spent reading
              the response body after a 200 response.

          ignore_backoff: true to ignore the historical backoff data
              and try the request anyway.

          try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED
              response we should try appending a trailing slash to the end of
              the request. Workaround for #3622 in Synapse <= v0.99.3.

          parser: The parser to use to decode the response. Defaults to
              parsing as JSON.

      Returns:
          The decoded body, once a 2xx HTTP response has been received.

      Raises:
          HttpResponseException: If we get an HTTP response code >= 300
              (except 429).
          NotRetryingDestination: If we are not yet ready to retry this
              server.
          FederationDeniedError: If this destination is not on our
              federation whitelist
          RequestSendFailed: If there were problems connecting to the
              remote, due to e.g. DNS failures, connection timeouts etc.
      """
      # Every argument is forwarded untouched; only the body half of the
      # (body, headers) result is handed back to the caller.
      decoded_body, _response_headers = await self.get_json_with_headers(
          destination=destination,
          path=path,
          args=args,
          retry_on_dns_fail=retry_on_dns_fail,
          timeout=timeout,
          ignore_backoff=ignore_backoff,
          try_trailing_slash_on_400=try_trailing_slash_on_400,
          parser=parser,
      )
      return decoded_body
  @overload
  async def get_json_with_headers(
      self,
      destination: str,
      path: str,
      args: Optional[QueryParams] = None,
      retry_on_dns_fail: bool = True,
      timeout: Optional[int] = None,
      ignore_backoff: bool = False,
      try_trailing_slash_on_400: bool = False,
      parser: Literal[None] = None,
  ) -> Tuple[JsonDict, Dict[bytes, List[bytes]]]:
      ...

  @overload
  async def get_json_with_headers(
      self,
      destination: str,
      path: str,
      args: Optional[QueryParams] = ...,
      retry_on_dns_fail: bool = ...,
      timeout: Optional[int] = ...,
      ignore_backoff: bool = ...,
      try_trailing_slash_on_400: bool = ...,
      parser: ByteParser[T] = ...,
  ) -> Tuple[T, Dict[bytes, List[bytes]]]:
      ...

  async def get_json_with_headers(
      self,
      destination: str,
      path: str,
      args: Optional[QueryParams] = None,
      retry_on_dns_fail: bool = True,
      timeout: Optional[int] = None,
      ignore_backoff: bool = False,
      try_trailing_slash_on_400: bool = False,
      parser: Optional[ByteParser[T]] = None,
  ) -> Tuple[Union[JsonDict, T], Dict[bytes, List[bytes]]]:
      """GET a resource from a remote homeserver, returning body and headers.

      Args:
          destination: The remote server to send the HTTP request to.

          path: The HTTP path.

          args: A dictionary used to create query strings, defaults to
              None.

          retry_on_dns_fail: true if the request should be retried on DNS
              failures

          timeout: number of milliseconds to wait for the response. When
              None, the client's default timeout is used.

              Several attempts may be made to send the request. The timeout
              covers, for *each* attempt, the wait for the response headers
              (including connection time), and also the time spent reading
              the response body after a 200 response.

          ignore_backoff: true to ignore the historical backoff data
              and try the request anyway.

          try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED
              response we should try appending a trailing slash to the end of
              the request. Workaround for #3622 in Synapse <= v0.99.3.

          parser: The parser to use to decode the response. Defaults to
              parsing as JSON.

      Returns:
          A tuple of the decoded body and a dict of the response headers,
          once a 2xx HTTP response has been received.

      Raises:
          HttpResponseException: If we get an HTTP response code >= 300
              (except 429).
          NotRetryingDestination: If we are not yet ready to retry this
              server.
          FederationDeniedError: If this destination is not on our
              federation whitelist
          RequestSendFailed: If there were problems connecting to the
              remote, due to e.g. DNS failures, connection timeouts etc.
      """
      request = MatrixFederationRequest(
          method="GET",
          destination=destination,
          path=path,
          query=args,
      )

      # Sampled before sending; handed to _handle_response further down.
      start_ms = self.clock.time_msec()

      response = await self._send_request_with_optional_trailing_slash(
          request,
          try_trailing_slash_on_400,
          backoff_on_404=False,
          ignore_backoff=ignore_backoff,
          retry_on_dns_fail=retry_on_dns_fail,
          timeout=timeout,
      )

      # Snapshot the raw headers as a plain dict before the body is consumed.
      response_headers = dict(response.headers.getAllRawHeaders())

      # Callers give milliseconds; the body-read timeout is expressed in seconds.
      read_timeout_sec = (
          self.default_timeout_seconds if timeout is None else timeout / 1000
      )

      # With no parser supplied, the body is decoded as JSON.
      response_parser = (
          cast(ByteParser[T], JsonParser()) if parser is None else parser
      )

      decoded_body = await _handle_response(
          self.reactor,
          read_timeout_sec,
          request,
          response,
          start_ms,
          parser=response_parser,
      )
      return decoded_body, response_headers
  async def delete_json(
      self,
      destination: str,
      path: str,
      long_retries: bool = False,
      timeout: Optional[int] = None,
      ignore_backoff: bool = False,
      args: Optional[QueryParams] = None,
  ) -> JsonDict:
      """Issue a DELETE to a remote homeserver and decode the JSON reply.

      Args:
          destination: The remote server to send the HTTP request to.

          path: The HTTP path.

          long_retries: whether to use the long retry algorithm. See the
              docs on _send_request for details.

          timeout: number of milliseconds to wait for the response. When
              None, the client's default timeout is used.

              Several attempts may be made to send the request. The timeout
              covers, for *each* attempt, the wait for the response headers
              (including connection time), and also the time spent reading
              the response body after a 200 response.

          ignore_backoff: true to ignore the historical backoff data and
              try the request anyway.

          args: query params

      Returns:
          The decoded JSON body, once a 2xx HTTP response has been received.

      Raises:
          HttpResponseException: If we get an HTTP response code >= 300
              (except 429).
          NotRetryingDestination: If we are not yet ready to retry this
              server.
          FederationDeniedError: If this destination is not on our
              federation whitelist
          RequestSendFailed: If there were problems connecting to the
              remote, due to e.g. DNS failures, connection timeouts etc.
      """
      request = MatrixFederationRequest(
          method="DELETE",
          destination=destination,
          path=path,
          query=args,
      )

      # Sampled before sending; handed to _handle_response further down.
      start_ms = self.clock.time_msec()

      response = await self._send_request(
          request,
          long_retries=long_retries,
          timeout=timeout,
          ignore_backoff=ignore_backoff,
      )

      # Callers give milliseconds; the body-read timeout is expressed in seconds.
      read_timeout_sec = (
          self.default_timeout_seconds if timeout is None else timeout / 1000
      )

      return await _handle_response(
          self.reactor,
          read_timeout_sec,
          request,
          response,
          start_ms,
          parser=JsonParser(),
      )
  async def get_file(
      self,
      destination: str,
      path: str,
      output_stream: BinaryIO,
      args: Optional[QueryParams] = None,
      retry_on_dns_fail: bool = True,
      max_size: Optional[int] = None,
      ignore_backoff: bool = False,
  ) -> Tuple[int, Dict[bytes, List[bytes]]]:
      """Download a file from a remote homeserver into `output_stream`.

      Reading the response body is bounded by self.default_timeout_seconds.

      Args:
          destination: The remote server to send the HTTP request to.
          path: The HTTP path to GET.
          output_stream: File to write the response body to.
          args: Optional dictionary used to create the query string.
          retry_on_dns_fail: true if the request should be retried on DNS
              failures
          max_size: passed to read_body_with_max_size as the size limit for
              the response body.
          ignore_backoff: true to ignore the historical backoff data
              and try the request anyway.

      Returns:
          An (int, dict) tuple of the file length and a dict of the
          response headers.

      Raises:
          HttpResponseException: If we get an HTTP response code >= 300
              (except 429).
          NotRetryingDestination: If we are not yet ready to retry this
              server.
          FederationDeniedError: If this destination is not on our
              federation whitelist
          RequestSendFailed: If there were problems connecting to the
              remote, due to e.g. DNS failures, connection timeouts etc.
              Also raised if reading the response body times out or fails.
          SynapseError: (502, Codes.TOO_LARGE) if the response body exceeds
              `max_size`.
      """
      request = MatrixFederationRequest(
          method="GET",
          destination=destination,
          path=path,
          query=args,
      )

      response = await self._send_request(
          request,
          retry_on_dns_fail=retry_on_dns_fail,
          ignore_backoff=ignore_backoff,
      )

      # Snapshot the raw headers as a plain dict before the body is consumed.
      response_headers = dict(response.headers.getAllRawHeaders())

      try:
          body_deferred = read_body_with_max_size(response, output_stream, max_size)
          body_deferred.addTimeout(self.default_timeout_seconds, self.reactor)
          bytes_read = await make_deferred_yieldable(body_deferred)
      except BodyExceededMaxSize:
          too_large_msg = "Requested file is too large > %r bytes" % (max_size,)
          logger.warning(
              "{%s} [%s] %s",
              request.txn_id,
              request.destination,
              too_large_msg,
          )
          raise SynapseError(HTTPStatus.BAD_GATEWAY, too_large_msg, Codes.TOO_LARGE)
      except defer.TimeoutError as timeout_err:
          logger.warning(
              "{%s} [%s] Timed out reading response - %s %s",
              request.txn_id,
              request.destination,
              request.method,
              request.uri.decode("ascii"),
          )
          raise RequestSendFailed(timeout_err, can_retry=True) from timeout_err
      except ResponseFailed as read_err:
          logger.warning(
              "{%s} [%s] Failed to read response - %s %s",
              request.txn_id,
              request.destination,
              request.method,
              request.uri.decode("ascii"),
          )
          raise RequestSendFailed(read_err, can_retry=True) from read_err
      except Exception as unexpected_err:
          # Anything else is logged and re-raised unchanged.
          logger.warning(
              "{%s} [%s] Error reading response: %s",
              request.txn_id,
              request.destination,
              unexpected_err,
          )
          raise

      logger.info(
          "{%s} [%s] Completed: %d %s [%d bytes] %s %s",
          request.txn_id,
          request.destination,
          response.code,
          response.phrase.decode("ascii", errors="replace"),
          bytes_read,
          request.method,
          request.uri.decode("ascii"),
      )
      return bytes_read, response_headers
  def _flatten_response_never_received(e: BaseException) -> str:
      """Render an exception, expanding any nested failures, as one string.

      An exception carrying a `reasons` attribute is treated as a wrapper
      around several failures: each entry's `.value` is flattened recursively
      and the result is formatted as `TypeName:[reason1, reason2, ...]`.
      Any other exception is rendered with `repr`.

      Args:
          e: the exception to describe.

      Returns:
          The flattened description.
      """
      if not hasattr(e, "reasons"):
          return repr(e)

      flattened = [
          _flatten_response_never_received(failure.value) for failure in e.reasons
      ]
      return "%s:[%s]" % (type(e).__name__, ", ".join(flattened))
  def check_content_type_is(headers: Headers, expected_content_type: str) -> None:
      """
      Check that a set of HTTP headers have a Content-Type header, and that its
      media type is the expected value.

      Only the first Content-Type header is examined, and any parameters after
      the media type (e.g. `; charset=utf-8`) are ignored. The media type is
      compared case-insensitively, since HTTP media types are case-insensitive.

      Args:
          headers: headers to check
          expected_content_type: the media type the response must declare

      Raises:
          RequestSendFailed: if the Content-Type header is missing, is not valid
              ASCII, or doesn't match
      """
      content_type_headers = headers.getRawHeaders(b"Content-Type")
      # Treat an empty list of values the same as a missing header, rather than
      # failing with an IndexError below.
      if not content_type_headers:
          raise RequestSendFailed(
              RuntimeError("No Content-Type header received from remote server"),
              can_retry=False,
          )

      raw_c_type = content_type_headers[0]  # only the first header
      try:
          c_type = raw_c_type.decode("ascii")
      except UnicodeDecodeError as e:
          # The header comes from the remote server, so it may contain arbitrary
          # bytes. Report that as a failed request, as documented, instead of
          # leaking a UnicodeDecodeError to the caller.
          raise RequestSendFailed(
              RuntimeError(
                  f"Remote server sent a non-ASCII Content-Type header: {raw_c_type!r}",
              ),
              can_retry=False,
          ) from e

      # Only the media type matters here; the header parameters are discarded.
      val, _ = cgi.parse_header(c_type)
      if val.lower() != expected_content_type.lower():
          raise RequestSendFailed(
              RuntimeError(
                  f"Remote server sent Content-Type header of '{c_type}', not '{expected_content_type}'",
              ),
              can_retry=False,
          )