Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.
 
 
 
 
 
 

1548 строки
56 KiB

  1. # Copyright 2014-2021 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import abc
  15. import cgi
  16. import codecs
  17. import logging
  18. import random
  19. import sys
  20. import urllib.parse
  21. from http import HTTPStatus
  22. from io import BytesIO, StringIO
  23. from typing import (
  24. TYPE_CHECKING,
  25. Any,
  26. BinaryIO,
  27. Callable,
  28. Dict,
  29. Generic,
  30. List,
  31. Optional,
  32. TextIO,
  33. Tuple,
  34. TypeVar,
  35. Union,
  36. cast,
  37. overload,
  38. )
  39. import attr
  40. import treq
  41. from canonicaljson import encode_canonical_json
  42. from prometheus_client import Counter
  43. from signedjson.sign import sign_json
  44. from typing_extensions import Literal
  45. from twisted.internet import defer
  46. from twisted.internet.error import DNSLookupError
  47. from twisted.internet.interfaces import IReactorTime
  48. from twisted.internet.task import Cooperator
  49. from twisted.web.client import ResponseFailed
  50. from twisted.web.http_headers import Headers
  51. from twisted.web.iweb import IAgent, IBodyProducer, IResponse
  52. import synapse.metrics
  53. import synapse.util.retryutils
  54. from synapse.api.errors import (
  55. Codes,
  56. FederationDeniedError,
  57. HttpResponseException,
  58. RequestSendFailed,
  59. SynapseError,
  60. )
  61. from synapse.crypto.context_factory import FederationPolicyForHTTPS
  62. from synapse.http import QuieterFileBodyProducer
  63. from synapse.http.client import (
  64. BlocklistingAgentWrapper,
  65. BodyExceededMaxSize,
  66. ByteWriteable,
  67. _make_scheduler,
  68. encode_query_args,
  69. read_body_with_max_size,
  70. )
  71. from synapse.http.connectproxyclient import BearerProxyCredentials
  72. from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
  73. from synapse.http.proxyagent import ProxyAgent
  74. from synapse.http.types import QueryParams
  75. from synapse.logging import opentracing
  76. from synapse.logging.context import make_deferred_yieldable, run_in_background
  77. from synapse.logging.opentracing import set_tag, start_active_span, tags
  78. from synapse.types import JsonDict
  79. from synapse.util import json_decoder
  80. from synapse.util.async_helpers import AwakenableSleeper, timeout_deferred
  81. from synapse.util.metrics import Measure
  82. from synapse.util.stringutils import parse_and_validate_server_name
  83. if TYPE_CHECKING:
  84. from synapse.server import HomeServer
  85. logger = logging.getLogger(__name__)
  86. outgoing_requests_counter = Counter(
  87. "synapse_http_matrixfederationclient_requests", "", ["method"]
  88. )
  89. incoming_responses_counter = Counter(
  90. "synapse_http_matrixfederationclient_responses", "", ["method", "code"]
  91. )
  92. MAXINT = sys.maxsize
  93. _next_id = 1
  94. T = TypeVar("T")
  95. class ByteParser(ByteWriteable, Generic[T], abc.ABC):
  96. """A `ByteWriteable` that has an additional `finish` function that returns
  97. the parsed data.
  98. """
  99. CONTENT_TYPE: str = abc.abstractproperty() # type: ignore
  100. """The expected content type of the response, e.g. `application/json`. If
  101. the content type doesn't match we fail the request.
  102. """
  103. # a federation response can be rather large (eg a big state_ids is 50M or so), so we
  104. # need a generous limit here.
  105. MAX_RESPONSE_SIZE: int = 100 * 1024 * 1024
  106. """The largest response this parser will accept."""
  107. @abc.abstractmethod
  108. def finish(self) -> T:
  109. """Called when response has finished streaming and the parser should
  110. return the final result (or error).
  111. """
  112. @attr.s(slots=True, frozen=True, auto_attribs=True)
  113. class MatrixFederationRequest:
  114. method: str
  115. """HTTP method
  116. """
  117. path: str
  118. """HTTP path
  119. """
  120. destination: str
  121. """The remote server to send the HTTP request to.
  122. """
  123. json: Optional[JsonDict] = None
  124. """JSON to send in the body.
  125. """
  126. json_callback: Optional[Callable[[], JsonDict]] = None
  127. """A callback to generate the JSON.
  128. """
  129. query: Optional[QueryParams] = None
  130. """Query arguments.
  131. """
  132. txn_id: str = attr.ib(init=False)
  133. """Unique ID for this request (for logging), this is autogenerated.
  134. """
  135. uri: bytes = b""
  136. """The URI of this request, usually generated from the above information.
  137. """
  138. _generate_uri: bool = True
  139. """True to automatically generate the uri field based on the above information.
  140. Set to False if manually configuring the URI.
  141. """
  142. def __attrs_post_init__(self) -> None:
  143. global _next_id
  144. txn_id = "%s-O-%s" % (self.method, _next_id)
  145. _next_id = (_next_id + 1) % (MAXINT - 1)
  146. object.__setattr__(self, "txn_id", txn_id)
  147. if self._generate_uri:
  148. destination_bytes = self.destination.encode("ascii")
  149. path_bytes = self.path.encode("ascii")
  150. query_bytes = encode_query_args(self.query)
  151. # The object is frozen so we can pre-compute this.
  152. uri = urllib.parse.urlunparse(
  153. (
  154. b"matrix-federation",
  155. destination_bytes,
  156. path_bytes,
  157. None,
  158. query_bytes,
  159. b"",
  160. )
  161. )
  162. object.__setattr__(self, "uri", uri)
  163. def get_json(self) -> Optional[JsonDict]:
  164. if self.json_callback:
  165. return self.json_callback()
  166. return self.json
  167. class _BaseJsonParser(ByteParser[T]):
  168. """A parser that buffers the response and tries to parse it as JSON."""
  169. CONTENT_TYPE = "application/json"
  170. def __init__(
  171. self, validator: Optional[Callable[[Optional[object]], bool]] = None
  172. ) -> None:
  173. """
  174. Args:
  175. validator: A callable which takes the parsed JSON value and returns
  176. true if the value is valid.
  177. """
  178. self._buffer = StringIO()
  179. self._binary_wrapper = BinaryIOWrapper(self._buffer)
  180. self._validator = validator
  181. def write(self, data: bytes) -> int:
  182. return self._binary_wrapper.write(data)
  183. def finish(self) -> T:
  184. result = json_decoder.decode(self._buffer.getvalue())
  185. if self._validator is not None and not self._validator(result):
  186. raise ValueError(
  187. f"Received incorrect JSON value: {result.__class__.__name__}"
  188. )
  189. return result
  190. class JsonParser(_BaseJsonParser[JsonDict]):
  191. """A parser that buffers the response and tries to parse it as a JSON object."""
  192. def __init__(self) -> None:
  193. super().__init__(self._validate)
  194. @staticmethod
  195. def _validate(v: Any) -> bool:
  196. return isinstance(v, dict)
  197. class LegacyJsonSendParser(_BaseJsonParser[Tuple[int, JsonDict]]):
  198. """Ensure the legacy responses of /send_join & /send_leave are correct."""
  199. def __init__(self) -> None:
  200. super().__init__(self._validate)
  201. @staticmethod
  202. def _validate(v: Any) -> bool:
  203. # Match [integer, JSON dict]
  204. return (
  205. isinstance(v, list)
  206. and len(v) == 2
  207. and type(v[0]) == int # noqa: E721
  208. and isinstance(v[1], dict)
  209. )
  210. async def _handle_response(
  211. reactor: IReactorTime,
  212. timeout_sec: float,
  213. request: MatrixFederationRequest,
  214. response: IResponse,
  215. start_ms: int,
  216. parser: ByteParser[T],
  217. ) -> T:
  218. """
  219. Reads the body of a response with a timeout and sends it to a parser
  220. Args:
  221. reactor: twisted reactor, for the timeout
  222. timeout_sec: number of seconds to wait for response to complete
  223. request: the request that triggered the response
  224. response: response to the request
  225. start_ms: Timestamp when request was made
  226. parser: The parser for the response
  227. Returns:
  228. The parsed response
  229. """
  230. max_response_size = parser.MAX_RESPONSE_SIZE
  231. finished = False
  232. try:
  233. check_content_type_is(response.headers, parser.CONTENT_TYPE)
  234. d = read_body_with_max_size(response, parser, max_response_size)
  235. d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
  236. length = await make_deferred_yieldable(d)
  237. finished = True
  238. value = parser.finish()
  239. except BodyExceededMaxSize as e:
  240. # The response was too big.
  241. logger.warning(
  242. "{%s} [%s] JSON response exceeded max size %i - %s %s",
  243. request.txn_id,
  244. request.destination,
  245. max_response_size,
  246. request.method,
  247. request.uri.decode("ascii"),
  248. )
  249. raise RequestSendFailed(e, can_retry=False) from e
  250. except ValueError as e:
  251. # The content was invalid.
  252. logger.warning(
  253. "{%s} [%s] Failed to parse response - %s %s",
  254. request.txn_id,
  255. request.destination,
  256. request.method,
  257. request.uri.decode("ascii"),
  258. )
  259. raise RequestSendFailed(e, can_retry=False) from e
  260. except defer.TimeoutError as e:
  261. logger.warning(
  262. "{%s} [%s] Timed out reading response - %s %s",
  263. request.txn_id,
  264. request.destination,
  265. request.method,
  266. request.uri.decode("ascii"),
  267. )
  268. raise RequestSendFailed(e, can_retry=True) from e
  269. except ResponseFailed as e:
  270. logger.warning(
  271. "{%s} [%s] Failed to read response - %s %s",
  272. request.txn_id,
  273. request.destination,
  274. request.method,
  275. request.uri.decode("ascii"),
  276. )
  277. raise RequestSendFailed(e, can_retry=True) from e
  278. except Exception as e:
  279. logger.warning(
  280. "{%s} [%s] Error reading response %s %s: %s",
  281. request.txn_id,
  282. request.destination,
  283. request.method,
  284. request.uri.decode("ascii"),
  285. e,
  286. )
  287. raise
  288. finally:
  289. if not finished:
  290. # There was an exception and we didn't `finish()` the parse.
  291. # Let the parser know that it can free up any resources.
  292. try:
  293. parser.finish()
  294. except Exception:
  295. # Ignore any additional exceptions.
  296. pass
  297. time_taken_secs = reactor.seconds() - start_ms / 1000
  298. logger.info(
  299. "{%s} [%s] Completed request: %d %s in %.2f secs, got %d bytes - %s %s",
  300. request.txn_id,
  301. request.destination,
  302. response.code,
  303. response.phrase.decode("ascii", errors="replace"),
  304. time_taken_secs,
  305. length,
  306. request.method,
  307. request.uri.decode("ascii"),
  308. )
  309. return value
  310. class BinaryIOWrapper:
  311. """A wrapper for a TextIO which converts from bytes on the fly."""
  312. def __init__(self, file: TextIO, encoding: str = "utf-8", errors: str = "strict"):
  313. self.decoder = codecs.getincrementaldecoder(encoding)(errors)
  314. self.file = file
  315. def write(self, b: Union[bytes, bytearray]) -> int:
  316. self.file.write(self.decoder.decode(b))
  317. return len(b)
  318. class MatrixFederationHttpClient:
  319. """HTTP client used to talk to other homeservers over the federation
  320. protocol. Send client certificates and signs requests.
  321. Attributes:
  322. agent (twisted.web.client.Agent): The twisted Agent used to send the
  323. requests.
  324. """
  325. def __init__(
  326. self,
  327. hs: "HomeServer",
  328. tls_client_options_factory: Optional[FederationPolicyForHTTPS],
  329. ):
  330. self.hs = hs
  331. self.signing_key = hs.signing_key
  332. self.server_name = hs.hostname
  333. self.reactor = hs.get_reactor()
  334. user_agent = hs.version_string
  335. if hs.config.server.user_agent_suffix:
  336. user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix)
  337. outbound_federation_restricted_to = (
  338. hs.config.worker.outbound_federation_restricted_to
  339. )
  340. if hs.get_instance_name() in outbound_federation_restricted_to:
  341. # Talk to federation directly
  342. federation_agent: IAgent = MatrixFederationAgent(
  343. self.reactor,
  344. tls_client_options_factory,
  345. user_agent.encode("ascii"),
  346. hs.config.server.federation_ip_range_allowlist,
  347. hs.config.server.federation_ip_range_blocklist,
  348. )
  349. else:
  350. proxy_authorization_secret = hs.config.worker.worker_replication_secret
  351. assert (
  352. proxy_authorization_secret is not None
  353. ), "`worker_replication_secret` must be set when using `outbound_federation_restricted_to` (used to authenticate requests across workers)"
  354. federation_proxy_credentials = BearerProxyCredentials(
  355. proxy_authorization_secret.encode("ascii")
  356. )
  357. # We need to talk to federation via the proxy via one of the configured
  358. # locations
  359. federation_proxy_locations = outbound_federation_restricted_to.locations
  360. federation_agent = ProxyAgent(
  361. self.reactor,
  362. self.reactor,
  363. tls_client_options_factory,
  364. federation_proxy_locations=federation_proxy_locations,
  365. federation_proxy_credentials=federation_proxy_credentials,
  366. )
  367. # Use a BlocklistingAgentWrapper to prevent circumventing the IP
  368. # blocking via IP literals in server names
  369. self.agent: IAgent = BlocklistingAgentWrapper(
  370. federation_agent,
  371. ip_blocklist=hs.config.server.federation_ip_range_blocklist,
  372. )
  373. self.clock = hs.get_clock()
  374. self._store = hs.get_datastores().main
  375. self.version_string_bytes = hs.version_string.encode("ascii")
  376. self.default_timeout_seconds = hs.config.federation.client_timeout_ms / 1000
  377. self.max_long_retry_delay_seconds = (
  378. hs.config.federation.max_long_retry_delay_ms / 1000
  379. )
  380. self.max_short_retry_delay_seconds = (
  381. hs.config.federation.max_short_retry_delay_ms / 1000
  382. )
  383. self.max_long_retries = hs.config.federation.max_long_retries
  384. self.max_short_retries = hs.config.federation.max_short_retries
  385. self._cooperator = Cooperator(scheduler=_make_scheduler(self.reactor))
  386. self._sleeper = AwakenableSleeper(self.reactor)
  387. def wake_destination(self, destination: str) -> None:
  388. """Called when the remote server may have come back online."""
  389. self._sleeper.wake(destination)
  390. async def _send_request_with_optional_trailing_slash(
  391. self,
  392. request: MatrixFederationRequest,
  393. try_trailing_slash_on_400: bool = False,
  394. **send_request_args: Any,
  395. ) -> IResponse:
  396. """Wrapper for _send_request which can optionally retry the request
  397. upon receiving a combination of a 400 HTTP response code and a
  398. 'M_UNRECOGNIZED' errcode. This is a workaround for Synapse <= v0.99.3
  399. due to https://github.com/matrix-org/synapse/issues/3622.
  400. Args:
  401. request: details of request to be sent
  402. try_trailing_slash_on_400: Whether on receiving a 400
  403. 'M_UNRECOGNIZED' from the server to retry the request with a
  404. trailing slash appended to the request path.
  405. send_request_args: A dictionary of arguments to pass to `_send_request()`.
  406. Raises:
  407. HttpResponseException: If we get an HTTP response code >= 300
  408. (except 429).
  409. Returns:
  410. Parsed JSON response body.
  411. """
  412. try:
  413. response = await self._send_request(request, **send_request_args)
  414. except HttpResponseException as e:
  415. # Received an HTTP error > 300. Check if it meets the requirements
  416. # to retry with a trailing slash
  417. if not try_trailing_slash_on_400:
  418. raise
  419. if e.code != 400 or e.to_synapse_error().errcode != "M_UNRECOGNIZED":
  420. raise
  421. # Retry with a trailing slash if we received a 400 with
  422. # 'M_UNRECOGNIZED' which some endpoints can return when omitting a
  423. # trailing slash on Synapse <= v0.99.3.
  424. logger.info("Retrying request with trailing slash")
  425. # Request is frozen so we create a new instance
  426. request = attr.evolve(request, path=request.path + "/")
  427. response = await self._send_request(request, **send_request_args)
  428. return response
  429. async def _send_request(
  430. self,
  431. request: MatrixFederationRequest,
  432. retry_on_dns_fail: bool = True,
  433. timeout: Optional[int] = None,
  434. long_retries: bool = False,
  435. ignore_backoff: bool = False,
  436. backoff_on_404: bool = False,
  437. backoff_on_all_error_codes: bool = False,
  438. follow_redirects: bool = False,
  439. ) -> IResponse:
  440. """
  441. Sends a request to the given server.
  442. Args:
  443. request: details of request to be sent
  444. retry_on_dns_fail: true if the request should be retried on DNS failures
  445. timeout: number of milliseconds to wait for the response headers
  446. (including connecting to the server), *for each attempt*.
  447. 60s by default.
  448. long_retries: whether to use the long retry algorithm.
  449. The regular retry algorithm makes 4 attempts, with intervals
  450. [0.5s, 1s, 2s].
  451. The long retry algorithm makes 11 attempts, with intervals
  452. [4s, 16s, 60s, 60s, ...]
  453. Both algorithms add -20%/+40% jitter to the retry intervals.
  454. Note that the above intervals are *in addition* to the time spent
  455. waiting for the request to complete (up to `timeout` ms).
  456. NB: the long retry algorithm takes over 20 minutes to complete, with a
  457. default timeout of 60s! It's best not to use the `long_retries` option
  458. for something that is blocking a client so we don't make them wait for
  459. aaaaages, whereas some things like sending transactions (server to
  460. server) we can be a lot more lenient but its very fuzzy / hand-wavey.
  461. In the future, we could be more intelligent about doing this sort of
  462. thing by looking at things with the bigger picture in mind,
  463. https://github.com/matrix-org/synapse/issues/8917
  464. ignore_backoff: true to ignore the historical backoff data
  465. and try the request anyway.
  466. backoff_on_404: Back off if we get a 404
  467. backoff_on_all_error_codes: Back off if we get any error response
  468. follow_redirects: True to follow the Location header of 307/308 redirect
  469. responses. This does not recurse.
  470. Returns:
  471. Resolves with the HTTP response object on success.
  472. Raises:
  473. HttpResponseException: If we get an HTTP response code >= 300
  474. (except 429).
  475. NotRetryingDestination: If we are not yet ready to retry this
  476. server.
  477. FederationDeniedError: If this destination is not on our
  478. federation whitelist
  479. RequestSendFailed: If there were problems connecting to the
  480. remote, due to e.g. DNS failures, connection timeouts etc.
  481. """
  482. # Validate server name and log if it is an invalid destination, this is
  483. # partially to help track down code paths where we haven't validated before here
  484. try:
  485. parse_and_validate_server_name(request.destination)
  486. except ValueError:
  487. logger.exception(f"Invalid destination: {request.destination}.")
  488. raise FederationDeniedError(request.destination)
  489. if timeout is not None:
  490. _sec_timeout = timeout / 1000
  491. else:
  492. _sec_timeout = self.default_timeout_seconds
  493. if (
  494. self.hs.config.federation.federation_domain_whitelist is not None
  495. and request.destination
  496. not in self.hs.config.federation.federation_domain_whitelist
  497. ):
  498. raise FederationDeniedError(request.destination)
  499. limiter = await synapse.util.retryutils.get_retry_limiter(
  500. request.destination,
  501. self.clock,
  502. self._store,
  503. backoff_on_404=backoff_on_404,
  504. ignore_backoff=ignore_backoff,
  505. notifier=self.hs.get_notifier(),
  506. replication_client=self.hs.get_replication_command_handler(),
  507. backoff_on_all_error_codes=backoff_on_all_error_codes,
  508. )
  509. method_bytes = request.method.encode("ascii")
  510. destination_bytes = request.destination.encode("ascii")
  511. path_bytes = request.path.encode("ascii")
  512. query_bytes = encode_query_args(request.query)
  513. scope = start_active_span(
  514. "outgoing-federation-request",
  515. tags={
  516. tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
  517. tags.PEER_ADDRESS: request.destination,
  518. tags.HTTP_METHOD: request.method,
  519. tags.HTTP_URL: request.path,
  520. },
  521. finish_on_close=True,
  522. )
  523. # Inject the span into the headers
  524. headers_dict: Dict[bytes, List[bytes]] = {}
  525. opentracing.inject_header_dict(headers_dict, request.destination)
  526. headers_dict[b"User-Agent"] = [self.version_string_bytes]
  527. with limiter, scope:
  528. # XXX: Would be much nicer to retry only at the transaction-layer
  529. # (once we have reliable transactions in place)
  530. if long_retries:
  531. retries_left = self.max_long_retries
  532. else:
  533. retries_left = self.max_short_retries
  534. url_bytes = request.uri
  535. url_str = url_bytes.decode("ascii")
  536. url_to_sign_bytes = urllib.parse.urlunparse(
  537. (b"", b"", path_bytes, None, query_bytes, b"")
  538. )
  539. while True:
  540. try:
  541. json = request.get_json()
  542. if json:
  543. headers_dict[b"Content-Type"] = [b"application/json"]
  544. auth_headers = self.build_auth_headers(
  545. destination_bytes, method_bytes, url_to_sign_bytes, json
  546. )
  547. data = encode_canonical_json(json)
  548. producer: Optional[IBodyProducer] = QuieterFileBodyProducer(
  549. BytesIO(data), cooperator=self._cooperator
  550. )
  551. else:
  552. producer = None
  553. auth_headers = self.build_auth_headers(
  554. destination_bytes, method_bytes, url_to_sign_bytes
  555. )
  556. headers_dict[b"Authorization"] = auth_headers
  557. logger.debug(
  558. "{%s} [%s] Sending request: %s %s; timeout %fs",
  559. request.txn_id,
  560. request.destination,
  561. request.method,
  562. url_str,
  563. _sec_timeout,
  564. )
  565. outgoing_requests_counter.labels(request.method).inc()
  566. try:
  567. with Measure(self.clock, "outbound_request"):
  568. # we don't want all the fancy cookie and redirect handling
  569. # that treq.request gives: just use the raw Agent.
  570. # To preserve the logging context, the timeout is treated
  571. # in a similar way to `defer.gatherResults`:
  572. # * Each logging context-preserving fork is wrapped in
  573. # `run_in_background`. In this case there is only one,
  574. # since the timeout fork is not logging-context aware.
  575. # * The `Deferred` that joins the forks back together is
  576. # wrapped in `make_deferred_yieldable` to restore the
  577. # logging context regardless of the path taken.
  578. request_deferred = run_in_background(
  579. self.agent.request,
  580. method_bytes,
  581. url_bytes,
  582. headers=Headers(headers_dict),
  583. bodyProducer=producer,
  584. )
  585. request_deferred = timeout_deferred(
  586. request_deferred,
  587. timeout=_sec_timeout,
  588. reactor=self.reactor,
  589. )
  590. response = await make_deferred_yieldable(request_deferred)
  591. except DNSLookupError as e:
  592. raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e
  593. except Exception as e:
  594. raise RequestSendFailed(e, can_retry=True) from e
  595. incoming_responses_counter.labels(
  596. request.method, response.code
  597. ).inc()
  598. set_tag(tags.HTTP_STATUS_CODE, response.code)
  599. response_phrase = response.phrase.decode("ascii", errors="replace")
  600. if 200 <= response.code < 300:
  601. logger.debug(
  602. "{%s} [%s] Got response headers: %d %s",
  603. request.txn_id,
  604. request.destination,
  605. response.code,
  606. response_phrase,
  607. )
  608. elif (
  609. response.code in (307, 308)
  610. and follow_redirects
  611. and response.headers.hasHeader("Location")
  612. ):
  613. # The Location header *might* be relative so resolve it.
  614. location = response.headers.getRawHeaders(b"Location")[0]
  615. new_uri = urllib.parse.urljoin(request.uri, location)
  616. return await self._send_request(
  617. attr.evolve(request, uri=new_uri, generate_uri=False),
  618. retry_on_dns_fail,
  619. timeout,
  620. long_retries,
  621. ignore_backoff,
  622. backoff_on_404,
  623. backoff_on_all_error_codes,
  624. # Do not continue following redirects.
  625. follow_redirects=False,
  626. )
  627. else:
  628. logger.info(
  629. "{%s} [%s] Got response headers: %d %s",
  630. request.txn_id,
  631. request.destination,
  632. response.code,
  633. response_phrase,
  634. )
  635. # :'(
  636. # Update transactions table?
  637. d = treq.content(response)
  638. d = timeout_deferred(
  639. d, timeout=_sec_timeout, reactor=self.reactor
  640. )
  641. try:
  642. body = await make_deferred_yieldable(d)
  643. except Exception as e:
  644. # Eh, we're already going to raise an exception so lets
  645. # ignore if this fails.
  646. logger.warning(
  647. "{%s} [%s] Failed to get error response: %s %s: %s",
  648. request.txn_id,
  649. request.destination,
  650. request.method,
  651. url_str,
  652. _flatten_response_never_received(e),
  653. )
  654. body = None
  655. exc = HttpResponseException(
  656. response.code, response_phrase, body
  657. )
  658. # Retry if the error is a 5xx or a 429 (Too Many
  659. # Requests), otherwise just raise a standard
  660. # `HttpResponseException`
  661. if 500 <= response.code < 600 or response.code == 429:
  662. raise RequestSendFailed(exc, can_retry=True) from exc
  663. else:
  664. raise exc
  665. break
  666. except RequestSendFailed as e:
  667. logger.info(
  668. "{%s} [%s] Request failed: %s %s: %s",
  669. request.txn_id,
  670. request.destination,
  671. request.method,
  672. url_str,
  673. _flatten_response_never_received(e.inner_exception),
  674. )
  675. if not e.can_retry:
  676. raise
  677. if retries_left and not timeout:
  678. if long_retries:
  679. delay_seconds = 4 ** (
  680. self.max_long_retries + 1 - retries_left
  681. )
  682. delay_seconds = min(
  683. delay_seconds, self.max_long_retry_delay_seconds
  684. )
  685. delay_seconds *= random.uniform(0.8, 1.4)
  686. else:
  687. delay_seconds = 0.5 * 2 ** (
  688. self.max_short_retries - retries_left
  689. )
  690. delay_seconds = min(
  691. delay_seconds, self.max_short_retry_delay_seconds
  692. )
  693. delay_seconds *= random.uniform(0.8, 1.4)
  694. logger.debug(
  695. "{%s} [%s] Waiting %ss before re-sending...",
  696. request.txn_id,
  697. request.destination,
  698. delay_seconds,
  699. )
  700. # Sleep for the calculated delay, or wake up immediately
  701. # if we get notified that the server is back up.
  702. await self._sleeper.sleep(
  703. request.destination, delay_seconds * 1000
  704. )
  705. retries_left -= 1
  706. else:
  707. raise
  708. except Exception as e:
  709. logger.warning(
  710. "{%s} [%s] Request failed: %s %s: %s",
  711. request.txn_id,
  712. request.destination,
  713. request.method,
  714. url_str,
  715. _flatten_response_never_received(e),
  716. )
  717. raise
  718. return response
  719. def build_auth_headers(
  720. self,
  721. destination: Optional[bytes],
  722. method: bytes,
  723. url_bytes: bytes,
  724. content: Optional[JsonDict] = None,
  725. destination_is: Optional[bytes] = None,
  726. ) -> List[bytes]:
  727. """
  728. Builds the Authorization headers for a federation request
  729. Args:
  730. destination: The destination homeserver of the request.
  731. May be None if the destination is an identity server, in which case
  732. destination_is must be non-None.
  733. method: The HTTP method of the request
  734. url_bytes: The URI path of the request
  735. content: The body of the request
  736. destination_is: As 'destination', but if the destination is an
  737. identity server
  738. Returns:
  739. A list of headers to be added as "Authorization:" headers
  740. """
  741. if not destination and not destination_is:
  742. raise ValueError(
  743. "At least one of the arguments destination and destination_is "
  744. "must be a nonempty bytestring."
  745. )
  746. request: JsonDict = {
  747. "method": method.decode("ascii"),
  748. "uri": url_bytes.decode("ascii"),
  749. "origin": self.server_name,
  750. }
  751. if destination is not None:
  752. request["destination"] = destination.decode("ascii")
  753. if destination_is is not None:
  754. request["destination_is"] = destination_is.decode("ascii")
  755. if content is not None:
  756. request["content"] = content
  757. request = sign_json(request, self.server_name, self.signing_key)
  758. auth_headers = []
  759. for key, sig in request["signatures"][self.server_name].items():
  760. auth_headers.append(
  761. (
  762. 'X-Matrix origin="%s",key="%s",sig="%s",destination="%s"'
  763. % (
  764. self.server_name,
  765. key,
  766. sig,
  767. request.get("destination") or request["destination_is"],
  768. )
  769. ).encode("ascii")
  770. )
  771. return auth_headers
  772. @overload
  773. async def put_json(
  774. self,
  775. destination: str,
  776. path: str,
  777. args: Optional[QueryParams] = None,
  778. data: Optional[JsonDict] = None,
  779. json_data_callback: Optional[Callable[[], JsonDict]] = None,
  780. long_retries: bool = False,
  781. timeout: Optional[int] = None,
  782. ignore_backoff: bool = False,
  783. backoff_on_404: bool = False,
  784. try_trailing_slash_on_400: bool = False,
  785. parser: Literal[None] = None,
  786. backoff_on_all_error_codes: bool = False,
  787. ) -> JsonDict:
  788. ...
  789. @overload
  790. async def put_json(
  791. self,
  792. destination: str,
  793. path: str,
  794. args: Optional[QueryParams] = None,
  795. data: Optional[JsonDict] = None,
  796. json_data_callback: Optional[Callable[[], JsonDict]] = None,
  797. long_retries: bool = False,
  798. timeout: Optional[int] = None,
  799. ignore_backoff: bool = False,
  800. backoff_on_404: bool = False,
  801. try_trailing_slash_on_400: bool = False,
  802. parser: Optional[ByteParser[T]] = None,
  803. backoff_on_all_error_codes: bool = False,
  804. ) -> T:
  805. ...
  806. async def put_json(
  807. self,
  808. destination: str,
  809. path: str,
  810. args: Optional[QueryParams] = None,
  811. data: Optional[JsonDict] = None,
  812. json_data_callback: Optional[Callable[[], JsonDict]] = None,
  813. long_retries: bool = False,
  814. timeout: Optional[int] = None,
  815. ignore_backoff: bool = False,
  816. backoff_on_404: bool = False,
  817. try_trailing_slash_on_400: bool = False,
  818. parser: Optional[ByteParser[T]] = None,
  819. backoff_on_all_error_codes: bool = False,
  820. ) -> Union[JsonDict, T]:
  821. """Sends the specified json data using PUT
  822. Args:
  823. destination: The remote server to send the HTTP request to.
  824. path: The HTTP path.
  825. args: query params
  826. data: A dict containing the data that will be used as
  827. the request body. This will be encoded as JSON.
  828. json_data_callback: A callable returning the dict to
  829. use as the request body.
  830. long_retries: whether to use the long retry algorithm. See
  831. docs on _send_request for details.
  832. timeout: number of milliseconds to wait for the response.
  833. self._default_timeout (60s) by default.
  834. Note that we may make several attempts to send the request; this
  835. timeout applies to the time spent waiting for response headers for
  836. *each* attempt (including connection time) as well as the time spent
  837. reading the response body after a 200 response.
  838. ignore_backoff: true to ignore the historical backoff data
  839. and try the request anyway.
  840. backoff_on_404: True if we should count a 404 response as
  841. a failure of the server (and should therefore back off future
  842. requests).
  843. try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED
  844. response we should try appending a trailing slash to the end
  845. of the request. Workaround for https://github.com/matrix-org/synapse/issues/3622
  846. in Synapse <= v0.99.3. This will be attempted before backing off if
  847. backing off has been enabled.
  848. parser: The parser to use to decode the response. Defaults to
  849. parsing as JSON.
  850. backoff_on_all_error_codes: Back off if we get any error response
  851. Returns:
  852. Succeeds when we get a 2xx HTTP response. The
  853. result will be the decoded JSON body.
  854. Raises:
  855. HttpResponseException: If we get an HTTP response code >= 300
  856. (except 429).
  857. NotRetryingDestination: If we are not yet ready to retry this
  858. server.
  859. FederationDeniedError: If this destination is not on our
  860. federation whitelist
  861. RequestSendFailed: If there were problems connecting to the
  862. remote, due to e.g. DNS failures, connection timeouts etc.
  863. """
  864. request = MatrixFederationRequest(
  865. method="PUT",
  866. destination=destination,
  867. path=path,
  868. query=args,
  869. json_callback=json_data_callback,
  870. json=data,
  871. )
  872. start_ms = self.clock.time_msec()
  873. response = await self._send_request_with_optional_trailing_slash(
  874. request,
  875. try_trailing_slash_on_400,
  876. backoff_on_404=backoff_on_404,
  877. ignore_backoff=ignore_backoff,
  878. long_retries=long_retries,
  879. timeout=timeout,
  880. backoff_on_all_error_codes=backoff_on_all_error_codes,
  881. )
  882. if timeout is not None:
  883. _sec_timeout = timeout / 1000
  884. else:
  885. _sec_timeout = self.default_timeout_seconds
  886. if parser is None:
  887. parser = cast(ByteParser[T], JsonParser())
  888. body = await _handle_response(
  889. self.reactor,
  890. _sec_timeout,
  891. request,
  892. response,
  893. start_ms,
  894. parser=parser,
  895. )
  896. return body
  897. async def post_json(
  898. self,
  899. destination: str,
  900. path: str,
  901. data: Optional[JsonDict] = None,
  902. long_retries: bool = False,
  903. timeout: Optional[int] = None,
  904. ignore_backoff: bool = False,
  905. args: Optional[QueryParams] = None,
  906. ) -> JsonDict:
  907. """Sends the specified json data using POST
  908. Args:
  909. destination: The remote server to send the HTTP request to.
  910. path: The HTTP path.
  911. data: A dict containing the data that will be used as
  912. the request body. This will be encoded as JSON.
  913. long_retries: whether to use the long retry algorithm. See
  914. docs on _send_request for details.
  915. timeout: number of milliseconds to wait for the response.
  916. self._default_timeout (60s) by default.
  917. Note that we may make several attempts to send the request; this
  918. timeout applies to the time spent waiting for response headers for
  919. *each* attempt (including connection time) as well as the time spent
  920. reading the response body after a 200 response.
  921. ignore_backoff: true to ignore the historical backoff data and
  922. try the request anyway.
  923. args: query params
  924. Returns:
  925. Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body.
  926. Raises:
  927. HttpResponseException: If we get an HTTP response code >= 300
  928. (except 429).
  929. NotRetryingDestination: If we are not yet ready to retry this
  930. server.
  931. FederationDeniedError: If this destination is not on our
  932. federation whitelist
  933. RequestSendFailed: If there were problems connecting to the
  934. remote, due to e.g. DNS failures, connection timeouts etc.
  935. """
  936. request = MatrixFederationRequest(
  937. method="POST", destination=destination, path=path, query=args, json=data
  938. )
  939. start_ms = self.clock.time_msec()
  940. response = await self._send_request(
  941. request,
  942. long_retries=long_retries,
  943. timeout=timeout,
  944. ignore_backoff=ignore_backoff,
  945. )
  946. if timeout is not None:
  947. _sec_timeout = timeout / 1000
  948. else:
  949. _sec_timeout = self.default_timeout_seconds
  950. body = await _handle_response(
  951. self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser()
  952. )
  953. return body
  954. @overload
  955. async def get_json(
  956. self,
  957. destination: str,
  958. path: str,
  959. args: Optional[QueryParams] = None,
  960. retry_on_dns_fail: bool = True,
  961. timeout: Optional[int] = None,
  962. ignore_backoff: bool = False,
  963. try_trailing_slash_on_400: bool = False,
  964. parser: Literal[None] = None,
  965. ) -> JsonDict:
  966. ...
  967. @overload
  968. async def get_json(
  969. self,
  970. destination: str,
  971. path: str,
  972. args: Optional[QueryParams] = ...,
  973. retry_on_dns_fail: bool = ...,
  974. timeout: Optional[int] = ...,
  975. ignore_backoff: bool = ...,
  976. try_trailing_slash_on_400: bool = ...,
  977. parser: ByteParser[T] = ...,
  978. ) -> T:
  979. ...
  980. async def get_json(
  981. self,
  982. destination: str,
  983. path: str,
  984. args: Optional[QueryParams] = None,
  985. retry_on_dns_fail: bool = True,
  986. timeout: Optional[int] = None,
  987. ignore_backoff: bool = False,
  988. try_trailing_slash_on_400: bool = False,
  989. parser: Optional[ByteParser[T]] = None,
  990. ) -> Union[JsonDict, T]:
  991. """GETs some json from the given host homeserver and path
  992. Args:
  993. destination: The remote server to send the HTTP request to.
  994. path: The HTTP path.
  995. args: A dictionary used to create query strings, defaults to
  996. None.
  997. retry_on_dns_fail: true if the request should be retried on DNS failures
  998. timeout: number of milliseconds to wait for the response.
  999. self._default_timeout (60s) by default.
  1000. Note that we may make several attempts to send the request; this
  1001. timeout applies to the time spent waiting for response headers for
  1002. *each* attempt (including connection time) as well as the time spent
  1003. reading the response body after a 200 response.
  1004. ignore_backoff: true to ignore the historical backoff data
  1005. and try the request anyway.
  1006. try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED
  1007. response we should try appending a trailing slash to the end of
  1008. the request. Workaround for https://github.com/matrix-org/synapse/issues/3622
  1009. in Synapse <= v0.99.3.
  1010. parser: The parser to use to decode the response. Defaults to
  1011. parsing as JSON.
  1012. Returns:
  1013. Succeeds when we get a 2xx HTTP response. The
  1014. result will be the decoded JSON body.
  1015. Raises:
  1016. HttpResponseException: If we get an HTTP response code >= 300
  1017. (except 429).
  1018. NotRetryingDestination: If we are not yet ready to retry this
  1019. server.
  1020. FederationDeniedError: If this destination is not on our
  1021. federation whitelist
  1022. RequestSendFailed: If there were problems connecting to the
  1023. remote, due to e.g. DNS failures, connection timeouts etc.
  1024. """
  1025. json_dict, _ = await self.get_json_with_headers(
  1026. destination=destination,
  1027. path=path,
  1028. args=args,
  1029. retry_on_dns_fail=retry_on_dns_fail,
  1030. timeout=timeout,
  1031. ignore_backoff=ignore_backoff,
  1032. try_trailing_slash_on_400=try_trailing_slash_on_400,
  1033. parser=parser,
  1034. )
  1035. return json_dict
  1036. @overload
  1037. async def get_json_with_headers(
  1038. self,
  1039. destination: str,
  1040. path: str,
  1041. args: Optional[QueryParams] = None,
  1042. retry_on_dns_fail: bool = True,
  1043. timeout: Optional[int] = None,
  1044. ignore_backoff: bool = False,
  1045. try_trailing_slash_on_400: bool = False,
  1046. parser: Literal[None] = None,
  1047. ) -> Tuple[JsonDict, Dict[bytes, List[bytes]]]:
  1048. ...
  1049. @overload
  1050. async def get_json_with_headers(
  1051. self,
  1052. destination: str,
  1053. path: str,
  1054. args: Optional[QueryParams] = ...,
  1055. retry_on_dns_fail: bool = ...,
  1056. timeout: Optional[int] = ...,
  1057. ignore_backoff: bool = ...,
  1058. try_trailing_slash_on_400: bool = ...,
  1059. parser: ByteParser[T] = ...,
  1060. ) -> Tuple[T, Dict[bytes, List[bytes]]]:
  1061. ...
  1062. async def get_json_with_headers(
  1063. self,
  1064. destination: str,
  1065. path: str,
  1066. args: Optional[QueryParams] = None,
  1067. retry_on_dns_fail: bool = True,
  1068. timeout: Optional[int] = None,
  1069. ignore_backoff: bool = False,
  1070. try_trailing_slash_on_400: bool = False,
  1071. parser: Optional[ByteParser[T]] = None,
  1072. ) -> Tuple[Union[JsonDict, T], Dict[bytes, List[bytes]]]:
  1073. """GETs some json from the given host homeserver and path
  1074. Args:
  1075. destination: The remote server to send the HTTP request to.
  1076. path: The HTTP path.
  1077. args: A dictionary used to create query strings, defaults to
  1078. None.
  1079. retry_on_dns_fail: true if the request should be retried on DNS failures
  1080. timeout: number of milliseconds to wait for the response.
  1081. self._default_timeout (60s) by default.
  1082. Note that we may make several attempts to send the request; this
  1083. timeout applies to the time spent waiting for response headers for
  1084. *each* attempt (including connection time) as well as the time spent
  1085. reading the response body after a 200 response.
  1086. ignore_backoff: true to ignore the historical backoff data
  1087. and try the request anyway.
  1088. try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED
  1089. response we should try appending a trailing slash to the end of
  1090. the request. Workaround for https://github.com/matrix-org/synapse/issues/3622
  1091. in Synapse <= v0.99.3.
  1092. parser: The parser to use to decode the response. Defaults to
  1093. parsing as JSON.
  1094. Returns:
  1095. Succeeds when we get a 2xx HTTP response. The result will be a tuple of the
  1096. decoded JSON body and a dict of the response headers.
  1097. Raises:
  1098. HttpResponseException: If we get an HTTP response code >= 300
  1099. (except 429).
  1100. NotRetryingDestination: If we are not yet ready to retry this
  1101. server.
  1102. FederationDeniedError: If this destination is not on our
  1103. federation whitelist
  1104. RequestSendFailed: If there were problems connecting to the
  1105. remote, due to e.g. DNS failures, connection timeouts etc.
  1106. """
  1107. request = MatrixFederationRequest(
  1108. method="GET", destination=destination, path=path, query=args
  1109. )
  1110. start_ms = self.clock.time_msec()
  1111. response = await self._send_request_with_optional_trailing_slash(
  1112. request,
  1113. try_trailing_slash_on_400,
  1114. backoff_on_404=False,
  1115. ignore_backoff=ignore_backoff,
  1116. retry_on_dns_fail=retry_on_dns_fail,
  1117. timeout=timeout,
  1118. )
  1119. headers = dict(response.headers.getAllRawHeaders())
  1120. if timeout is not None:
  1121. _sec_timeout = timeout / 1000
  1122. else:
  1123. _sec_timeout = self.default_timeout_seconds
  1124. if parser is None:
  1125. parser = cast(ByteParser[T], JsonParser())
  1126. body = await _handle_response(
  1127. self.reactor,
  1128. _sec_timeout,
  1129. request,
  1130. response,
  1131. start_ms,
  1132. parser=parser,
  1133. )
  1134. return body, headers
  1135. async def delete_json(
  1136. self,
  1137. destination: str,
  1138. path: str,
  1139. long_retries: bool = False,
  1140. timeout: Optional[int] = None,
  1141. ignore_backoff: bool = False,
  1142. args: Optional[QueryParams] = None,
  1143. ) -> JsonDict:
  1144. """Send a DELETE request to the remote expecting some json response
  1145. Args:
  1146. destination: The remote server to send the HTTP request to.
  1147. path: The HTTP path.
  1148. long_retries: whether to use the long retry algorithm. See
  1149. docs on _send_request for details.
  1150. timeout: number of milliseconds to wait for the response.
  1151. self._default_timeout (60s) by default.
  1152. Note that we may make several attempts to send the request; this
  1153. timeout applies to the time spent waiting for response headers for
  1154. *each* attempt (including connection time) as well as the time spent
  1155. reading the response body after a 200 response.
  1156. ignore_backoff: true to ignore the historical backoff data and
  1157. try the request anyway.
  1158. args: query params
  1159. Returns:
  1160. Succeeds when we get a 2xx HTTP response. The
  1161. result will be the decoded JSON body.
  1162. Raises:
  1163. HttpResponseException: If we get an HTTP response code >= 300
  1164. (except 429).
  1165. NotRetryingDestination: If we are not yet ready to retry this
  1166. server.
  1167. FederationDeniedError: If this destination is not on our
  1168. federation whitelist
  1169. RequestSendFailed: If there were problems connecting to the
  1170. remote, due to e.g. DNS failures, connection timeouts etc.
  1171. """
  1172. request = MatrixFederationRequest(
  1173. method="DELETE", destination=destination, path=path, query=args
  1174. )
  1175. start_ms = self.clock.time_msec()
  1176. response = await self._send_request(
  1177. request,
  1178. long_retries=long_retries,
  1179. timeout=timeout,
  1180. ignore_backoff=ignore_backoff,
  1181. )
  1182. if timeout is not None:
  1183. _sec_timeout = timeout / 1000
  1184. else:
  1185. _sec_timeout = self.default_timeout_seconds
  1186. body = await _handle_response(
  1187. self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser()
  1188. )
  1189. return body
  1190. async def get_file(
  1191. self,
  1192. destination: str,
  1193. path: str,
  1194. output_stream: BinaryIO,
  1195. args: Optional[QueryParams] = None,
  1196. retry_on_dns_fail: bool = True,
  1197. max_size: Optional[int] = None,
  1198. ignore_backoff: bool = False,
  1199. follow_redirects: bool = False,
  1200. ) -> Tuple[int, Dict[bytes, List[bytes]]]:
  1201. """GETs a file from a given homeserver
  1202. Args:
  1203. destination: The remote server to send the HTTP request to.
  1204. path: The HTTP path to GET.
  1205. output_stream: File to write the response body to.
  1206. args: Optional dictionary used to create the query string.
  1207. ignore_backoff: true to ignore the historical backoff data
  1208. and try the request anyway.
  1209. follow_redirects: True to follow the Location header of 307/308 redirect
  1210. responses. This does not recurse.
  1211. Returns:
  1212. Resolves with an (int,dict) tuple of
  1213. the file length and a dict of the response headers.
  1214. Raises:
  1215. HttpResponseException: If we get an HTTP response code >= 300
  1216. (except 429).
  1217. NotRetryingDestination: If we are not yet ready to retry this
  1218. server.
  1219. FederationDeniedError: If this destination is not on our
  1220. federation whitelist
  1221. RequestSendFailed: If there were problems connecting to the
  1222. remote, due to e.g. DNS failures, connection timeouts etc.
  1223. """
  1224. request = MatrixFederationRequest(
  1225. method="GET", destination=destination, path=path, query=args
  1226. )
  1227. response = await self._send_request(
  1228. request,
  1229. retry_on_dns_fail=retry_on_dns_fail,
  1230. ignore_backoff=ignore_backoff,
  1231. follow_redirects=follow_redirects,
  1232. )
  1233. headers = dict(response.headers.getAllRawHeaders())
  1234. try:
  1235. d = read_body_with_max_size(response, output_stream, max_size)
  1236. d.addTimeout(self.default_timeout_seconds, self.reactor)
  1237. length = await make_deferred_yieldable(d)
  1238. except BodyExceededMaxSize:
  1239. msg = "Requested file is too large > %r bytes" % (max_size,)
  1240. logger.warning(
  1241. "{%s} [%s] %s",
  1242. request.txn_id,
  1243. request.destination,
  1244. msg,
  1245. )
  1246. raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE)
  1247. except defer.TimeoutError as e:
  1248. logger.warning(
  1249. "{%s} [%s] Timed out reading response - %s %s",
  1250. request.txn_id,
  1251. request.destination,
  1252. request.method,
  1253. request.uri.decode("ascii"),
  1254. )
  1255. raise RequestSendFailed(e, can_retry=True) from e
  1256. except ResponseFailed as e:
  1257. logger.warning(
  1258. "{%s} [%s] Failed to read response - %s %s",
  1259. request.txn_id,
  1260. request.destination,
  1261. request.method,
  1262. request.uri.decode("ascii"),
  1263. )
  1264. raise RequestSendFailed(e, can_retry=True) from e
  1265. except Exception as e:
  1266. logger.warning(
  1267. "{%s} [%s] Error reading response: %s",
  1268. request.txn_id,
  1269. request.destination,
  1270. e,
  1271. )
  1272. raise
  1273. logger.info(
  1274. "{%s} [%s] Completed: %d %s [%d bytes] %s %s",
  1275. request.txn_id,
  1276. request.destination,
  1277. response.code,
  1278. response.phrase.decode("ascii", errors="replace"),
  1279. length,
  1280. request.method,
  1281. request.uri.decode("ascii"),
  1282. )
  1283. return length, headers
  1284. def _flatten_response_never_received(e: BaseException) -> str:
  1285. if hasattr(e, "reasons"):
  1286. reasons = ", ".join(
  1287. _flatten_response_never_received(f.value) for f in e.reasons
  1288. )
  1289. return "%s:[%s]" % (type(e).__name__, reasons)
  1290. else:
  1291. return repr(e)
  1292. def check_content_type_is(headers: Headers, expected_content_type: str) -> None:
  1293. """
  1294. Check that a set of HTTP headers have a Content-Type header, and that it
  1295. is the expected value..
  1296. Args:
  1297. headers: headers to check
  1298. Raises:
  1299. RequestSendFailed: if the Content-Type header is missing or doesn't match
  1300. """
  1301. content_type_headers = headers.getRawHeaders(b"Content-Type")
  1302. if content_type_headers is None:
  1303. raise RequestSendFailed(
  1304. RuntimeError("No Content-Type header received from remote server"),
  1305. can_retry=False,
  1306. )
  1307. c_type = content_type_headers[0].decode("ascii") # only the first header
  1308. val, options = cgi.parse_header(c_type)
  1309. if val != expected_content_type:
  1310. raise RequestSendFailed(
  1311. RuntimeError(
  1312. f"Remote server sent Content-Type header of '{c_type}', not '{expected_content_type}'",
  1313. ),
  1314. can_retry=False,
  1315. )