You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

284 lines
9.8 KiB

  1. # Copyright 2023 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. #
  15. import json
  16. import logging
  17. import urllib.parse
  18. from typing import TYPE_CHECKING, Any, Optional, Set, Tuple, cast
  19. from twisted.internet import protocol
  20. from twisted.internet.interfaces import ITCPTransport
  21. from twisted.internet.protocol import connectionDone
  22. from twisted.python import failure
  23. from twisted.python.failure import Failure
  24. from twisted.web.client import ResponseDone
  25. from twisted.web.http_headers import Headers
  26. from twisted.web.iweb import IResponse
  27. from twisted.web.resource import IResource
  28. from twisted.web.server import Request, Site
  29. from synapse.api.errors import Codes, InvalidProxyCredentialsError
  30. from synapse.http import QuieterFileBodyProducer
  31. from synapse.http.server import _AsyncResource
  32. from synapse.logging.context import make_deferred_yieldable, run_in_background
  33. from synapse.types import ISynapseReactor
  34. from synapse.util.async_helpers import timeout_deferred
if TYPE_CHECKING:
    # Imported only for type annotations, to avoid import cycles at runtime.
    from synapse.http.site import SynapseRequest
    from synapse.server import HomeServer

logger = logging.getLogger(__name__)

# "Hop-by-hop" headers (as opposed to "end-to-end" headers) as defined by RFC2616
# section 13.5.1 and referenced in RFC9110 section 7.6.1. These are meant to only be
# consumed by the immediate recipient and not be forwarded on.
HOP_BY_HOP_HEADERS = {
    "Connection",
    "Keep-Alive",
    "Proxy-Authenticate",
    "Proxy-Authorization",
    "TE",
    "Trailers",
    "Transfer-Encoding",
    "Upgrade",
}
  52. def parse_connection_header_value(
  53. connection_header_value: Optional[bytes],
  54. ) -> Set[str]:
  55. """
  56. Parse the `Connection` header to determine which headers we should not be copied
  57. over from the remote response.
  58. As defined by RFC2616 section 14.10 and RFC9110 section 7.6.1
  59. Example: `Connection: close, X-Foo, X-Bar` will return `{"Close", "X-Foo", "X-Bar"}`
  60. Even though "close" is a special directive, let's just treat it as just another
  61. header for simplicity. If people want to check for this directive, they can simply
  62. check for `"Close" in headers`.
  63. Args:
  64. connection_header_value: The value of the `Connection` header.
  65. Returns:
  66. The set of header names that should not be copied over from the remote response.
  67. The keys are capitalized in canonical capitalization.
  68. """
  69. headers = Headers()
  70. extra_headers_to_remove: Set[str] = set()
  71. if connection_header_value:
  72. extra_headers_to_remove = {
  73. headers._canonicalNameCaps(connection_option.strip()).decode("ascii")
  74. for connection_option in connection_header_value.split(b",")
  75. }
  76. return extra_headers_to_remove
class ProxyResource(_AsyncResource):
    """
    A stub resource that proxies any requests with a `matrix-federation://` scheme
    through the given `federation_agent` to the remote homeserver and ferries back the
    info.
    """

    isLeaf = True

    def __init__(self, reactor: ISynapseReactor, hs: "HomeServer"):
        # NOTE(review): `True` is presumably the "extract context from request"
        # flag on `_AsyncResource` — confirm against its signature.
        super().__init__(True)

        self.reactor = reactor
        # The federation agent does the actual outbound HTTP work.
        self.agent = hs.get_federation_http_client().agent
        # Shared secret used to authenticate callers of this proxy endpoint.
        self._proxy_authorization_secret = hs.config.worker.worker_replication_secret

    def _check_auth(self, request: Request) -> None:
        """
        Verify the request carries a valid `Proxy-Authorization` bearer token.

        Raises:
            InvalidProxyCredentialsError: if the header is missing, duplicated,
                malformed, or does not match the configured replication secret.
        """
        # The `matrix-federation://` proxy functionality can only be used with auth.
        # Protect homeserver admins forgetting to configure a secret.
        assert self._proxy_authorization_secret is not None

        # Get the authorization header.
        auth_headers = request.requestHeaders.getRawHeaders(b"Proxy-Authorization")

        if not auth_headers:
            raise InvalidProxyCredentialsError(
                "Missing Proxy-Authorization header.", Codes.MISSING_TOKEN
            )
        if len(auth_headers) > 1:
            raise InvalidProxyCredentialsError(
                "Too many Proxy-Authorization headers.", Codes.UNAUTHORIZED
            )
        # Expect exactly `Bearer <secret>`.
        parts = auth_headers[0].split(b" ")
        if parts[0] == b"Bearer" and len(parts) == 2:
            received_secret = parts[1].decode("ascii")
            if self._proxy_authorization_secret == received_secret:
                # Success!
                return

        raise InvalidProxyCredentialsError(
            "Invalid Proxy-Authorization header.", Codes.UNAUTHORIZED
        )

    async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]:
        """
        Forward the incoming request through the federation agent and return the
        remote status code together with the (not-yet-consumed) `IResponse`.
        """
        uri = urllib.parse.urlparse(request.uri)
        assert uri.scheme == b"matrix-federation"

        # Check the authorization headers before handling the request.
        self._check_auth(request)

        # Only a small allow-list of request headers is forwarded to the remote.
        headers = Headers()
        for header_name in (b"User-Agent", b"Authorization", b"Content-Type"):
            header_value = request.getHeader(header_name)
            if header_value:
                headers.addRawHeader(header_name, header_value)

        request_deferred = run_in_background(
            self.agent.request,
            request.method,
            request.uri,
            headers=headers,
            # Stream the incoming request body straight through to the remote.
            bodyProducer=QuieterFileBodyProducer(request.content),
        )
        request_deferred = timeout_deferred(
            request_deferred,
            # This should be set longer than the timeout in `MatrixFederationHttpClient`
            # so that it has enough time to complete and pass us the data before we give
            # up.
            timeout=90,
            reactor=self.reactor,
        )
        response = await make_deferred_yieldable(request_deferred)

        return response.code, response

    def _send_response(
        self,
        request: "SynapseRequest",
        code: int,
        response_object: Any,
    ) -> None:
        """
        Copy the remote response's status, end-to-end headers, and body back out
        to the local request.
        """
        response = cast(IResponse, response_object)
        response_headers = cast(Headers, response.headers)

        request.setResponseCode(code)

        # The `Connection` header also defines which headers should not be copied over.
        connection_header = response_headers.getRawHeaders(b"connection")
        extra_headers_to_remove = parse_connection_header_value(
            connection_header[0] if connection_header else None
        )

        # Copy headers.
        for k, v in response_headers.getAllRawHeaders():
            # Do not copy over any hop-by-hop headers. These are meant to only be
            # consumed by the immediate recipient and not be forwarded on.
            header_key = k.decode("ascii")
            if (
                header_key in HOP_BY_HOP_HEADERS
                or header_key in extra_headers_to_remove
            ):
                continue

            request.responseHeaders.setRawHeaders(k, v)

        # Stream the remote body back to the local request.
        response.deliverBody(_ProxyResponseBody(request))

    def _send_error_response(
        self,
        f: failure.Failure,
        request: "SynapseRequest",
    ) -> None:
        """
        Serialize a failure as a JSON error body: auth failures keep their own
        status code/errcode, anything else becomes a 502 Bad Gateway.
        """
        if isinstance(f.value, InvalidProxyCredentialsError):
            error_response_code = f.value.code
            error_response_json = {"errcode": f.value.errcode, "err": f.value.msg}
        else:
            error_response_code = 502
            error_response_json = {
                "errcode": Codes.UNKNOWN,
                "err": "ProxyResource: Error when proxying request: %s %s -> %s"
                % (
                    request.method.decode("ascii"),
                    request.uri.decode("ascii"),
                    f,
                ),
            }

        request.setResponseCode(error_response_code)
        request.setHeader(b"Content-Type", b"application/json")
        request.write((json.dumps(error_response_json)).encode())
        request.finish()
  188. class _ProxyResponseBody(protocol.Protocol):
  189. """
  190. A protocol that proxies the given remote response data back out to the given local
  191. request.
  192. """
  193. transport: Optional[ITCPTransport] = None
  194. def __init__(self, request: "SynapseRequest") -> None:
  195. self._request = request
  196. def dataReceived(self, data: bytes) -> None:
  197. # Avoid sending response data to the local request that already disconnected
  198. if self._request._disconnected and self.transport is not None:
  199. # Close the connection (forcefully) since all the data will get
  200. # discarded anyway.
  201. self.transport.abortConnection()
  202. return
  203. self._request.write(data)
  204. def connectionLost(self, reason: Failure = connectionDone) -> None:
  205. # If the local request is already finished (successfully or failed), don't
  206. # worry about sending anything back.
  207. if self._request.finished:
  208. return
  209. if reason.check(ResponseDone):
  210. self._request.finish()
  211. else:
  212. # Abort the underlying request since our remote request also failed.
  213. self._request.transport.abortConnection()
  214. class ProxySite(Site):
  215. """
  216. Proxies any requests with a `matrix-federation://` scheme through the given
  217. `federation_agent`. Otherwise, behaves like a normal `Site`.
  218. """
  219. def __init__(
  220. self,
  221. resource: IResource,
  222. reactor: ISynapseReactor,
  223. hs: "HomeServer",
  224. ):
  225. super().__init__(resource, reactor=reactor)
  226. self._proxy_resource = ProxyResource(reactor, hs=hs)
  227. def getResourceFor(self, request: "SynapseRequest") -> IResource:
  228. uri = urllib.parse.urlparse(request.uri)
  229. if uri.scheme == b"matrix-federation":
  230. return self._proxy_resource
  231. return super().getResourceFor(request)