You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

188 lines
6.6 KiB

  1. # Copyright 2023 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. from typing import Dict, Optional
  16. from zope.interface import implementer
  17. from twisted.internet import defer
  18. from twisted.internet.endpoints import (
  19. HostnameEndpoint,
  20. UNIXClientEndpoint,
  21. wrapClientTLS,
  22. )
  23. from twisted.internet.interfaces import IStreamClientEndpoint
  24. from twisted.python.failure import Failure
  25. from twisted.web.client import URI, HTTPConnectionPool, _AgentBase
  26. from twisted.web.error import SchemeNotSupported
  27. from twisted.web.http_headers import Headers
  28. from twisted.web.iweb import (
  29. IAgent,
  30. IAgentEndpointFactory,
  31. IBodyProducer,
  32. IPolicyForHTTPS,
  33. IResponse,
  34. )
  35. from synapse.config.workers import (
  36. InstanceLocationConfig,
  37. InstanceTcpLocationConfig,
  38. InstanceUnixLocationConfig,
  39. )
  40. from synapse.types import ISynapseReactor
  41. logger = logging.getLogger(__name__)
  @implementer(IAgentEndpointFactory)
  class ReplicationEndpointFactory:
      """Build client endpoints for replication traffic over TCP or a UNIX socket."""

      def __init__(
          self,
          reactor: ISynapseReactor,
          instance_map: Dict[str, InstanceLocationConfig],
          context_factory: IPolicyForHTTPS,
      ) -> None:
          """
          Args:
              reactor: Handed to every endpoint this factory creates.
              instance_map: Maps a worker's name to its location config.
              context_factory: Provides the TLS client creator used when a
                  worker's configured scheme is "https".
          """
          self.reactor = reactor
          self.instance_map = instance_map
          self.context_factory = context_factory

      def endpointForURI(self, uri: URI) -> IStreamClientEndpoint:
          """
          Choose the client endpoint for the worker named in `uri`.

          The URI uses a special scheme whose netloc is the worker's name rather
          than a real address; the actual connection details are read from the
          instance map using that name as the key.

          Args:
              uri: The pre-parsed URI object. Only its netloc is read here.

          Returns:
              For a TCP location, a `HostnameEndpoint` (wrapped with
              `wrapClientTLS` when the configured scheme is "https"); for a
              UNIX location, a `UNIXClientEndpoint` on the configured path.

          Raises:
              KeyError: The worker name has no entry in the instance map.
              SchemeNotSupported: The location config is neither a TCP nor a
                  UNIX location.
          """
          target = self.instance_map[uri.netloc.decode("utf-8")]
          scheme = target.scheme()

          if isinstance(target, InstanceTcpLocationConfig):
              tcp_endpoint = HostnameEndpoint(self.reactor, target.host, target.port)
              if scheme != "https":
                  return tcp_endpoint

              # The 'port' argument isn't actually used by creatorForNetloc, but
              # the signature requires it.
              tls_creator = self.context_factory.creatorForNetloc(
                  target.host.encode("utf-8"),
                  target.port,
              )
              return wrapClientTLS(tls_creator, tcp_endpoint)

          if isinstance(target, InstanceUnixLocationConfig):
              return UNIXClientEndpoint(self.reactor, target.path)

          raise SchemeNotSupported(f"Unsupported scheme: {scheme}")
  @implementer(IAgent)
  class ReplicationAgent(_AgentBase):
      """
      HTTP and HTTPS client agent for talking to replication endpoints.

      A large part of this code is copied from Twisted's
      twisted.web.client.Agent.
      """

      def __init__(
          self,
          reactor: ISynapseReactor,
          instance_map: Dict[str, InstanceLocationConfig],
          contextFactory: IPolicyForHTTPS,
          connectTimeout: Optional[float] = None,
          bindAddress: Optional[bytes] = None,
          pool: Optional[HTTPConnectionPool] = None,
      ):
          """
          Create a ReplicationAgent.

          Args:
              reactor: A reactor for this Agent to place outgoing connections.
              instance_map: Maps a worker's name to its location config; passed
                  to the endpoint factory, which uses it to resolve request URIs.
              contextFactory: A factory for TLS contexts, to control the
                  verification parameters of OpenSSL. Required; passed to the
                  endpoint factory.
              connectTimeout: The amount of time that this Agent will wait
                  for the peer to accept a connection.
                  NOTE(review): not referenced anywhere in this constructor --
                  confirm whether it is meant to be forwarded to the endpoints.
              bindAddress: The local address for client sockets to bind to.
                  NOTE(review): not referenced anywhere in this constructor --
                  confirm whether it is meant to be forwarded to the endpoints.
              pool: An HTTPConnectionPool instance, or None, in which
                  case a non-persistent HTTPConnectionPool instance will be
                  created.
          """
          super().__init__(reactor, pool)
          self._endpointFactory = ReplicationEndpointFactory(
              reactor, instance_map, contextFactory
          )

      def request(
          self,
          method: bytes,
          uri: bytes,
          headers: Optional[Headers] = None,
          bodyProducer: Optional[IBodyProducer] = None,
      ) -> "defer.Deferred[IResponse]":
          """
          Issue a request to the server indicated by the given uri.

          A pooled connection may be reused, or a new one may be opened.
          Currently, HTTP, HTTPS and UNIX schemes are supported in uri.

          This is copied from twisted.web.client.Agent, except:

          * It uses a different pool key (combining the scheme with either
            host & port or socket path).
          * It does not call _ensureValidURI(...) as the strictness of IDNA2008
            is not required when using a worker's name as a 'hostname' for
            Synapse HTTP Replication machinery. Specifically, this allows a
            range of ascii characters such as '+' and '_' in hostnames/worker's
            names.

          See: twisted.web.iweb.IAgent.request

          Args:
              method: The HTTP method, as bytes.
              uri: The request URI, as bytes; its netloc is the worker's name.
              headers: Optional request headers.
              bodyProducer: Optional producer for the request body.

          Returns:
              The Deferred returned by `_requestWithEndpoint`, or an already
              failed Deferred if the endpoint factory raised
              SchemeNotSupported.
          """
          parsed = URI.fromBytes(uri)

          try:
              endpoint = self._endpointFactory.endpointForURI(parsed)
          except SchemeNotSupported:
              # Failure() with no arguments wraps the exception being handled,
              # so it must be built inside this except block.
              return defer.fail(Failure())

          location = self._endpointFactory.instance_map[parsed.netloc.decode("utf-8")]

          # The pool key is:
          #  (http(s), <host:port>) or (unix, <socket_path>)
          pool_key = (location.scheme(), location.netloc())

          # _requestWithEndpoint comes from _AgentBase class
          return self._requestWithEndpoint(
              pool_key,
              endpoint,
              method,
              parsed,
              headers,
              bodyProducer,
              parsed.originForm,
          )