25개 이상의 토픽을 선택하실 수 없습니다. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

471 lines
17 KiB

  1. # Copyright 2019 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. import random
  16. import re
  17. from typing import Any, Collection, Dict, List, Optional, Sequence, Tuple
  18. from urllib.parse import urlparse
  19. from urllib.request import ( # type: ignore[attr-defined]
  20. getproxies_environment,
  21. proxy_bypass_environment,
  22. )
  23. from zope.interface import implementer
  24. from twisted.internet import defer
  25. from twisted.internet.endpoints import (
  26. HostnameEndpoint,
  27. UNIXClientEndpoint,
  28. wrapClientTLS,
  29. )
  30. from twisted.internet.interfaces import (
  31. IProtocol,
  32. IProtocolFactory,
  33. IReactorCore,
  34. IStreamClientEndpoint,
  35. )
  36. from twisted.python.failure import Failure
  37. from twisted.web.client import (
  38. URI,
  39. BrowserLikePolicyForHTTPS,
  40. HTTPConnectionPool,
  41. _AgentBase,
  42. )
  43. from twisted.web.error import SchemeNotSupported
  44. from twisted.web.http_headers import Headers
  45. from twisted.web.iweb import IAgent, IBodyProducer, IPolicyForHTTPS, IResponse
  46. from synapse.config.workers import (
  47. InstanceLocationConfig,
  48. InstanceTcpLocationConfig,
  49. InstanceUnixLocationConfig,
  50. )
  51. from synapse.http import redact_uri
  52. from synapse.http.connectproxyclient import (
  53. BasicProxyCredentials,
  54. HTTPConnectProxyEndpoint,
  55. ProxyCredentials,
  56. )
  57. from synapse.logging.context import run_in_background
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Matches a byte string made up entirely of one or more bytes in the range
# 0x21-0x7e (printable ASCII, excluding space), anchored at both ends.
# `ProxyAgent.request` rejects any URI that does not match this pattern.
_VALID_URI = re.compile(rb"\A[\x21-\x7e]+\Z")
  60. @implementer(IAgent)
  61. class ProxyAgent(_AgentBase):
  62. """An Agent implementation which will use an HTTP proxy if one was requested
  63. Args:
  64. reactor: twisted reactor to place outgoing
  65. connections.
  66. proxy_reactor: twisted reactor to use for connections to the proxy server
  67. reactor might have some blocking applied (i.e. for DNS queries),
  68. but we need unblocked access to the proxy.
  69. contextFactory: A factory for TLS contexts, to control the
  70. verification parameters of OpenSSL. The default is to use a
  71. `BrowserLikePolicyForHTTPS`, so unless you have special
  72. requirements you can leave this as-is.
  73. connectTimeout: The amount of time that this Agent will wait
  74. for the peer to accept a connection, in seconds. If 'None',
  75. HostnameEndpoint's default (30s) will be used.
  76. This is used for connections to both proxies and destination servers.
  77. bindAddress: The local address for client sockets to bind to.
  78. pool: connection pool to be used. If None, a
  79. non-persistent pool instance will be created.
  80. use_proxy: Whether proxy settings should be discovered and used
  81. from conventional environment variables.
  82. federation_proxy_locations: An optional list of locations to proxy outbound federation
  83. traffic through (only requests that use the `matrix-federation://` scheme
  84. will be proxied).
  85. federation_proxy_credentials: Required if `federation_proxy_locations` is set. The
  86. credentials to use when proxying outbound federation traffic through another
  87. worker.
  88. Raises:
  89. ValueError if use_proxy is set and the environment variables
  90. contain an invalid proxy specification.
  91. RuntimeError if no tls_options_factory is given for a https connection
  92. """
  93. def __init__(
  94. self,
  95. reactor: IReactorCore,
  96. proxy_reactor: Optional[IReactorCore] = None,
  97. contextFactory: Optional[IPolicyForHTTPS] = None,
  98. connectTimeout: Optional[float] = None,
  99. bindAddress: Optional[bytes] = None,
  100. pool: Optional[HTTPConnectionPool] = None,
  101. use_proxy: bool = False,
  102. federation_proxy_locations: Collection[InstanceLocationConfig] = (),
  103. federation_proxy_credentials: Optional[ProxyCredentials] = None,
  104. ):
  105. contextFactory = contextFactory or BrowserLikePolicyForHTTPS()
  106. _AgentBase.__init__(self, reactor, pool)
  107. if proxy_reactor is None:
  108. self.proxy_reactor = reactor
  109. else:
  110. self.proxy_reactor = proxy_reactor
  111. self._endpoint_kwargs: Dict[str, Any] = {}
  112. if connectTimeout is not None:
  113. self._endpoint_kwargs["timeout"] = connectTimeout
  114. if bindAddress is not None:
  115. self._endpoint_kwargs["bindAddress"] = bindAddress
  116. http_proxy = None
  117. https_proxy = None
  118. no_proxy = None
  119. if use_proxy:
  120. proxies = getproxies_environment()
  121. http_proxy = proxies["http"].encode() if "http" in proxies else None
  122. https_proxy = proxies["https"].encode() if "https" in proxies else None
  123. no_proxy = proxies["no"] if "no" in proxies else None
  124. self.http_proxy_endpoint, self.http_proxy_creds = http_proxy_endpoint(
  125. http_proxy, self.proxy_reactor, contextFactory, **self._endpoint_kwargs
  126. )
  127. self.https_proxy_endpoint, self.https_proxy_creds = http_proxy_endpoint(
  128. https_proxy, self.proxy_reactor, contextFactory, **self._endpoint_kwargs
  129. )
  130. self.no_proxy = no_proxy
  131. self._policy_for_https = contextFactory
  132. self._reactor = reactor
  133. self._federation_proxy_endpoint: Optional[IStreamClientEndpoint] = None
  134. self._federation_proxy_credentials: Optional[ProxyCredentials] = None
  135. if federation_proxy_locations:
  136. assert (
  137. federation_proxy_credentials is not None
  138. ), "`federation_proxy_credentials` are required when using `federation_proxy_locations`"
  139. endpoints: List[IStreamClientEndpoint] = []
  140. for federation_proxy_location in federation_proxy_locations:
  141. endpoint: IStreamClientEndpoint
  142. if isinstance(federation_proxy_location, InstanceTcpLocationConfig):
  143. endpoint = HostnameEndpoint(
  144. self.proxy_reactor,
  145. federation_proxy_location.host,
  146. federation_proxy_location.port,
  147. )
  148. if federation_proxy_location.tls:
  149. tls_connection_creator = (
  150. self._policy_for_https.creatorForNetloc(
  151. federation_proxy_location.host.encode("utf-8"),
  152. federation_proxy_location.port,
  153. )
  154. )
  155. endpoint = wrapClientTLS(tls_connection_creator, endpoint)
  156. elif isinstance(federation_proxy_location, InstanceUnixLocationConfig):
  157. endpoint = UNIXClientEndpoint(
  158. self.proxy_reactor, federation_proxy_location.path
  159. )
  160. else:
  161. # It is supremely unlikely we ever hit this
  162. raise SchemeNotSupported(
  163. f"Unknown type of Endpoint requested, check {federation_proxy_location}"
  164. )
  165. endpoints.append(endpoint)
  166. self._federation_proxy_endpoint = _RandomSampleEndpoints(endpoints)
  167. self._federation_proxy_credentials = federation_proxy_credentials
  168. def request(
  169. self,
  170. method: bytes,
  171. uri: bytes,
  172. headers: Optional[Headers] = None,
  173. bodyProducer: Optional[IBodyProducer] = None,
  174. ) -> "defer.Deferred[IResponse]":
  175. """
  176. Issue a request to the server indicated by the given uri.
  177. Supports `http` and `https` schemes.
  178. An existing connection from the connection pool may be used or a new one may be
  179. created.
  180. See also: twisted.web.iweb.IAgent.request
  181. Args:
  182. method: The request method to use, such as `GET`, `POST`, etc
  183. uri: The location of the resource to request.
  184. headers: Extra headers to send with the request
  185. bodyProducer: An object which can generate bytes to make up the body of
  186. this request (for example, the properly encoded contents of a file for
  187. a file upload). Or, None if the request is to have no body.
  188. Returns:
  189. A deferred which completes when the header of the response has
  190. been received (regardless of the response status code).
  191. Can fail with:
  192. SchemeNotSupported: if the uri is not http or https
  193. twisted.internet.error.TimeoutError if the server we are connecting
  194. to (proxy or destination) does not accept a connection before
  195. connectTimeout.
  196. ... other things too.
  197. """
  198. uri = uri.strip()
  199. if not _VALID_URI.match(uri):
  200. raise ValueError(f"Invalid URI {uri!r}")
  201. parsed_uri = URI.fromBytes(uri)
  202. pool_key = f"{parsed_uri.scheme!r}{parsed_uri.host!r}{parsed_uri.port}"
  203. request_path = parsed_uri.originForm
  204. should_skip_proxy = False
  205. if self.no_proxy is not None:
  206. should_skip_proxy = proxy_bypass_environment(
  207. parsed_uri.host.decode(),
  208. proxies={"no": self.no_proxy},
  209. )
  210. if (
  211. parsed_uri.scheme == b"http"
  212. and self.http_proxy_endpoint
  213. and not should_skip_proxy
  214. ):
  215. # Determine whether we need to set Proxy-Authorization headers
  216. if self.http_proxy_creds:
  217. # Set a Proxy-Authorization header
  218. if headers is None:
  219. headers = Headers()
  220. headers.addRawHeader(
  221. b"Proxy-Authorization",
  222. self.http_proxy_creds.as_proxy_authorization_value(),
  223. )
  224. # Cache *all* connections under the same key, since we are only
  225. # connecting to a single destination, the proxy:
  226. pool_key = "http-proxy"
  227. endpoint = self.http_proxy_endpoint
  228. request_path = uri
  229. elif (
  230. parsed_uri.scheme == b"https"
  231. and self.https_proxy_endpoint
  232. and not should_skip_proxy
  233. ):
  234. endpoint = HTTPConnectProxyEndpoint(
  235. self.proxy_reactor,
  236. self.https_proxy_endpoint,
  237. parsed_uri.host,
  238. parsed_uri.port,
  239. self.https_proxy_creds,
  240. )
  241. elif (
  242. parsed_uri.scheme == b"matrix-federation"
  243. and self._federation_proxy_endpoint
  244. ):
  245. assert (
  246. self._federation_proxy_credentials is not None
  247. ), "`federation_proxy_credentials` are required when using `federation_proxy_locations`"
  248. # Set a Proxy-Authorization header
  249. if headers is None:
  250. headers = Headers()
  251. # We always need authentication for the outbound federation proxy
  252. headers.addRawHeader(
  253. b"Proxy-Authorization",
  254. self._federation_proxy_credentials.as_proxy_authorization_value(),
  255. )
  256. endpoint = self._federation_proxy_endpoint
  257. request_path = uri
  258. else:
  259. # not using a proxy
  260. endpoint = HostnameEndpoint(
  261. self._reactor, parsed_uri.host, parsed_uri.port, **self._endpoint_kwargs
  262. )
  263. logger.debug(
  264. "Requesting %s via %s",
  265. redact_uri(uri.decode("ascii", errors="replace")),
  266. endpoint,
  267. )
  268. if parsed_uri.scheme == b"https":
  269. tls_connection_creator = self._policy_for_https.creatorForNetloc(
  270. parsed_uri.host, parsed_uri.port
  271. )
  272. endpoint = wrapClientTLS(tls_connection_creator, endpoint)
  273. elif parsed_uri.scheme == b"http":
  274. pass
  275. elif (
  276. parsed_uri.scheme == b"matrix-federation"
  277. and self._federation_proxy_endpoint
  278. ):
  279. pass
  280. else:
  281. return defer.fail(
  282. Failure(
  283. SchemeNotSupported("Unsupported scheme: %r" % (parsed_uri.scheme,))
  284. )
  285. )
  286. return self._requestWithEndpoint(
  287. pool_key, endpoint, method, parsed_uri, headers, bodyProducer, request_path
  288. )
  289. def http_proxy_endpoint(
  290. proxy: Optional[bytes],
  291. reactor: IReactorCore,
  292. tls_options_factory: Optional[IPolicyForHTTPS],
  293. **kwargs: object,
  294. ) -> Tuple[Optional[IStreamClientEndpoint], Optional[ProxyCredentials]]:
  295. """Parses an http proxy setting and returns an endpoint for the proxy
  296. Args:
  297. proxy: the proxy setting in the form: [scheme://][<username>:<password>@]<host>[:<port>]
  298. This currently supports http:// and https:// proxies.
  299. A hostname without scheme is assumed to be http.
  300. reactor: reactor to be used to connect to the proxy
  301. tls_options_factory: the TLS options to use when connecting through a https proxy
  302. kwargs: other args to be passed to HostnameEndpoint
  303. Returns:
  304. a tuple of
  305. endpoint to use to connect to the proxy, or None
  306. ProxyCredentials or if no credentials were found, or None
  307. Raise:
  308. ValueError if proxy has no hostname or unsupported scheme.
  309. RuntimeError if no tls_options_factory is given for a https connection
  310. """
  311. if proxy is None:
  312. return None, None
  313. # Note: urlsplit/urlparse cannot be used here as that does not work (for Python
  314. # 3.9+) on scheme-less proxies, e.g. host:port.
  315. scheme, host, port, credentials = parse_proxy(proxy)
  316. proxy_endpoint = HostnameEndpoint(reactor, host, port, **kwargs)
  317. if scheme == b"https":
  318. if tls_options_factory:
  319. tls_options = tls_options_factory.creatorForNetloc(host, port)
  320. proxy_endpoint = wrapClientTLS(tls_options, proxy_endpoint)
  321. else:
  322. raise RuntimeError(
  323. f"No TLS options for a https connection via proxy {proxy!s}"
  324. )
  325. return proxy_endpoint, credentials
  326. def parse_proxy(
  327. proxy: bytes, default_scheme: bytes = b"http", default_port: int = 1080
  328. ) -> Tuple[bytes, bytes, int, Optional[ProxyCredentials]]:
  329. """
  330. Parse a proxy connection string.
  331. Given a HTTP proxy URL, breaks it down into components and checks that it
  332. has a hostname (otherwise it is not useful to us when trying to find a
  333. proxy) and asserts that the URL has a scheme we support.
  334. Args:
  335. proxy: The proxy connection string. Must be in the form '[scheme://][<username>:<password>@]host[:port]'.
  336. default_scheme: The default scheme to return if one is not found in `proxy`. Defaults to http
  337. default_port: The default port to return if one is not found in `proxy`. Defaults to 1080
  338. Returns:
  339. A tuple containing the scheme, hostname, port and ProxyCredentials.
  340. If no credentials were found, the ProxyCredentials instance is replaced with None.
  341. Raise:
  342. ValueError if proxy has no hostname or unsupported scheme.
  343. """
  344. # First check if we have a scheme present
  345. # Note: urlsplit/urlparse cannot be used (for Python # 3.9+) on scheme-less proxies, e.g. host:port.
  346. if b"://" not in proxy:
  347. proxy = b"".join([default_scheme, b"://", proxy])
  348. url = urlparse(proxy)
  349. if not url.hostname:
  350. raise ValueError("Proxy URL did not contain a hostname! Please specify one.")
  351. if url.scheme not in (b"http", b"https"):
  352. raise ValueError(
  353. f"Unknown proxy scheme {url.scheme!s}; only 'http' and 'https' is supported."
  354. )
  355. credentials = None
  356. if url.username and url.password:
  357. credentials = BasicProxyCredentials(
  358. b"".join([url.username, b":", url.password])
  359. )
  360. return url.scheme, url.hostname, url.port or default_port, credentials
  361. @implementer(IStreamClientEndpoint)
  362. class _RandomSampleEndpoints:
  363. """An endpoint that randomly iterates through a given list of endpoints at
  364. each connection attempt.
  365. """
  366. def __init__(
  367. self,
  368. endpoints: Sequence[IStreamClientEndpoint],
  369. ) -> None:
  370. assert endpoints
  371. self._endpoints = endpoints
  372. def __repr__(self) -> str:
  373. return f"<_RandomSampleEndpoints endpoints={self._endpoints}>"
  374. def connect(
  375. self, protocol_factory: IProtocolFactory
  376. ) -> "defer.Deferred[IProtocol]":
  377. """Implements IStreamClientEndpoint interface"""
  378. return run_in_background(self._do_connect, protocol_factory)
  379. async def _do_connect(self, protocol_factory: IProtocolFactory) -> IProtocol:
  380. failures: List[Failure] = []
  381. for endpoint in random.sample(self._endpoints, k=len(self._endpoints)):
  382. try:
  383. return await endpoint.connect(protocol_factory)
  384. except Exception:
  385. failures.append(Failure())
  386. failures.pop().raiseException()