@@ -0,0 +1 @@ | |||
Remove the `worker_main_http_uri` configuration setting. This is now handled via internal replication. |
@@ -213,10 +213,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { | |||
"listener_resources": ["client", "replication"], | |||
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"], | |||
"shared_extra_conf": {}, | |||
"worker_extra_conf": ( | |||
"worker_main_http_uri: http://127.0.0.1:%d" | |||
% (MAIN_PROCESS_HTTP_LISTENER_PORT,) | |||
), | |||
"worker_extra_conf": "", | |||
}, | |||
"account_data": { | |||
"app": "synapse.app.generic_worker", | |||
@@ -135,8 +135,8 @@ In the config file for each worker, you must specify: | |||
[`worker_replication_http_port`](usage/configuration/config_documentation.md#worker_replication_http_port)). | |||
* If handling HTTP requests, a [`worker_listeners`](usage/configuration/config_documentation.md#worker_listeners) option | |||
with an `http` listener. | |||
* If handling the `^/_matrix/client/v3/keys/upload` endpoint, the HTTP URI for | |||
the main process (`worker_main_http_uri`). | |||
* **Synapse 1.71 and older:** if handling the `^/_matrix/client/v3/keys/upload` endpoint, the HTTP URI for | |||
the main process (`worker_main_http_uri`). This config option is no longer required and is ignored when running Synapse 1.72 and newer. | |||
For example: | |||
@@ -221,7 +221,6 @@ information. | |||
^/_matrix/client/(api/v1|r0|v3|unstable)/search$ | |||
# Encryption requests | |||
# Note that ^/_matrix/client/(r0|v3|unstable)/keys/upload/ requires `worker_main_http_uri` | |||
^/_matrix/client/(r0|v3|unstable)/keys/query$ | |||
^/_matrix/client/(r0|v3|unstable)/keys/changes$ | |||
^/_matrix/client/(r0|v3|unstable)/keys/claim$ | |||
@@ -376,7 +375,7 @@ responsible for | |||
- persisting them to the DB, and finally | |||
- updating the events stream. | |||
Because load is sharded in this way, you *must* restart all worker instances when | |||
Because load is sharded in this way, you *must* restart all worker instances when | |||
adding or removing event persisters. | |||
An `event_persister` should not be mistaken for an `event_creator`. | |||
@@ -14,14 +14,12 @@ | |||
# limitations under the License. | |||
import logging | |||
import sys | |||
from typing import Dict, List, Optional, Tuple | |||
from typing import Dict, List | |||
from twisted.internet import address | |||
from twisted.web.resource import Resource | |||
import synapse | |||
import synapse.events | |||
from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError | |||
from synapse.api.urls import ( | |||
CLIENT_API_PREFIX, | |||
FEDERATION_PREFIX, | |||
@@ -43,8 +41,6 @@ from synapse.config.logger import setup_logging | |||
from synapse.config.server import ListenerConfig | |||
from synapse.federation.transport.server import TransportLayerServer | |||
from synapse.http.server import JsonResource, OptionsResource | |||
from synapse.http.servlet import RestServlet, parse_json_object_from_request | |||
from synapse.http.site import SynapseRequest | |||
from synapse.logging.context import LoggingContext | |||
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy | |||
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource | |||
@@ -70,12 +66,12 @@ from synapse.rest.client import ( | |||
versions, | |||
voip, | |||
) | |||
from synapse.rest.client._base import client_patterns | |||
from synapse.rest.client.account import ThreepidRestServlet, WhoamiRestServlet | |||
from synapse.rest.client.devices import DevicesRestServlet | |||
from synapse.rest.client.keys import ( | |||
KeyChangesServlet, | |||
KeyQueryServlet, | |||
KeyUploadServlet, | |||
OneTimeKeyServlet, | |||
) | |||
from synapse.rest.client.register import ( | |||
@@ -132,107 +128,12 @@ from synapse.storage.databases.main.transactions import TransactionWorkerStore | |||
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore | |||
from synapse.storage.databases.main.user_directory import UserDirectoryStore | |||
from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore | |||
from synapse.types import JsonDict | |||
from synapse.util import SYNAPSE_VERSION | |||
from synapse.util.httpresourcetree import create_resource_tree | |||
logger = logging.getLogger("synapse.app.generic_worker") | |||
class KeyUploadServlet(RestServlet): | |||
"""An implementation of the `KeyUploadServlet` that responds to read only | |||
requests, but otherwise proxies through to the master instance. | |||
""" | |||
PATTERNS = client_patterns("/keys/upload(/(?P<device_id>[^/]+))?$") | |||
def __init__(self, hs: HomeServer): | |||
""" | |||
Args: | |||
hs: server | |||
""" | |||
super().__init__() | |||
self.auth = hs.get_auth() | |||
self.store = hs.get_datastores().main | |||
self.http_client = hs.get_simple_http_client() | |||
self.main_uri = hs.config.worker.worker_main_http_uri | |||
async def on_POST( | |||
self, request: SynapseRequest, device_id: Optional[str] | |||
) -> Tuple[int, JsonDict]: | |||
requester = await self.auth.get_user_by_req(request, allow_guest=True) | |||
user_id = requester.user.to_string() | |||
body = parse_json_object_from_request(request) | |||
if device_id is not None: | |||
# passing the device_id here is deprecated; however, we allow it | |||
# for now for compatibility with older clients. | |||
if requester.device_id is not None and device_id != requester.device_id: | |||
logger.warning( | |||
"Client uploading keys for a different device " | |||
"(logged in as %s, uploading for %s)", | |||
requester.device_id, | |||
device_id, | |||
) | |||
else: | |||
device_id = requester.device_id | |||
if device_id is None: | |||
raise SynapseError( | |||
400, "To upload keys, you must pass device_id when authenticating" | |||
) | |||
if body: | |||
# They're actually trying to upload something, proxy to main synapse. | |||
# Proxy headers from the original request, such as the auth headers | |||
# (in case the access token is there) and the original IP / | |||
# User-Agent of the request. | |||
headers: Dict[bytes, List[bytes]] = { | |||
header: list(request.requestHeaders.getRawHeaders(header, [])) | |||
for header in (b"Authorization", b"User-Agent") | |||
} | |||
# Add the previous hop to the X-Forwarded-For header. | |||
x_forwarded_for = list( | |||
request.requestHeaders.getRawHeaders(b"X-Forwarded-For", []) | |||
) | |||
# we use request.client here, since we want the previous hop, not the | |||
# original client (as returned by request.getClientAddress()). | |||
if isinstance(request.client, (address.IPv4Address, address.IPv6Address)): | |||
previous_host = request.client.host.encode("ascii") | |||
# If the header exists, add to the comma-separated list of the first | |||
# instance of the header. Otherwise, generate a new header. | |||
if x_forwarded_for: | |||
x_forwarded_for = [x_forwarded_for[0] + b", " + previous_host] | |||
x_forwarded_for.extend(x_forwarded_for[1:]) | |||
else: | |||
x_forwarded_for = [previous_host] | |||
headers[b"X-Forwarded-For"] = x_forwarded_for | |||
# Replicate the original X-Forwarded-Proto header. Note that | |||
# XForwardedForRequest overrides isSecure() to give us the original protocol | |||
# used by the client, as opposed to the protocol used by our upstream proxy | |||
# - which is what we want here. | |||
headers[b"X-Forwarded-Proto"] = [ | |||
b"https" if request.isSecure() else b"http" | |||
] | |||
try: | |||
result = await self.http_client.post_json_get_json( | |||
self.main_uri + request.uri.decode("ascii"), body, headers=headers | |||
) | |||
except HttpResponseException as e: | |||
raise e.to_synapse_error() from e | |||
except RequestSendFailed as e: | |||
raise SynapseError(502, "Failed to talk to master") from e | |||
return 200, result | |||
else: | |||
# Just interested in counts. | |||
result = await self.store.count_e2e_one_time_keys(user_id, device_id) | |||
return 200, {"one_time_key_counts": result} | |||
class GenericWorkerSlavedStore( | |||
# FIXME(#3714): We need to add UserDirectoryStore as we write directly | |||
# rather than going via the correct worker. | |||
@@ -162,7 +162,13 @@ class WorkerConfig(Config): | |||
self.worker_name = config.get("worker_name", self.worker_app) | |||
self.instance_name = self.worker_name or "master" | |||
# FIXME: Remove this check after a suitable amount of time. | |||
self.worker_main_http_uri = config.get("worker_main_http_uri", None) | |||
if self.worker_main_http_uri is not None: | |||
logger.warning( | |||
"The config option worker_main_http_uri is unused since Synapse 1.72. " | |||
"It can be safely removed from your configuration." | |||
) | |||
# This option is really only here to support `--manhole` command line | |||
# argument. | |||
@@ -18,6 +18,7 @@ from typing import TYPE_CHECKING, Tuple | |||
from twisted.web.server import Request | |||
from synapse.http.server import HttpServer | |||
from synapse.http.servlet import parse_json_object_from_request | |||
from synapse.replication.http._base import ReplicationEndpoint | |||
from synapse.types import JsonDict | |||
@@ -78,5 +79,71 @@ class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint): | |||
return 200, user_devices | |||
class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint): | |||
"""Ask master to upload keys for the user and send them out over federation to | |||
update other servers. | |||
For now, only the master is permitted to handle key upload requests; | |||
any worker can handle key query requests (since they're read-only). | |||
Calls to e2e_keys_handler.upload_keys_for_user(user_id, device_id, keys) on | |||
the main process to accomplish this. | |||
Defined in https://spec.matrix.org/v1.4/client-server-api/#post_matrixclientv3keysupload | |||
Request format(borrowed and expanded from KeyUploadServlet): | |||
POST /_synapse/replication/upload_keys_for_user | |||
{ | |||
"user_id": "<user_id>", | |||
"device_id": "<device_id>", | |||
"keys": { | |||
....this part can be found in KeyUploadServlet in rest/client/keys.py.... | |||
} | |||
} | |||
Response is equivalent to ` /_matrix/client/v3/keys/upload` found in KeyUploadServlet | |||
""" | |||
NAME = "upload_keys_for_user" | |||
PATH_ARGS = () | |||
CACHE = False | |||
def __init__(self, hs: "HomeServer"): | |||
super().__init__(hs) | |||
self.e2e_keys_handler = hs.get_e2e_keys_handler() | |||
self.store = hs.get_datastores().main | |||
self.clock = hs.get_clock() | |||
@staticmethod | |||
async def _serialize_payload( # type: ignore[override] | |||
user_id: str, device_id: str, keys: JsonDict | |||
) -> JsonDict: | |||
return { | |||
"user_id": user_id, | |||
"device_id": device_id, | |||
"keys": keys, | |||
} | |||
async def _handle_request( # type: ignore[override] | |||
self, request: Request | |||
) -> Tuple[int, JsonDict]: | |||
content = parse_json_object_from_request(request) | |||
user_id = content["user_id"] | |||
device_id = content["device_id"] | |||
keys = content["keys"] | |||
results = await self.e2e_keys_handler.upload_keys_for_user( | |||
user_id, device_id, keys | |||
) | |||
return 200, results | |||
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: | |||
ReplicationUserDevicesResyncRestServlet(hs).register(http_server) | |||
ReplicationUploadKeysForUserRestServlet(hs).register(http_server) |
@@ -27,6 +27,7 @@ from synapse.http.servlet import ( | |||
) | |||
from synapse.http.site import SynapseRequest | |||
from synapse.logging.opentracing import log_kv, set_tag | |||
from synapse.replication.http.devices import ReplicationUploadKeysForUserRestServlet | |||
from synapse.rest.client._base import client_patterns, interactive_auth_handler | |||
from synapse.types import JsonDict, StreamToken | |||
from synapse.util.cancellation import cancellable | |||
@@ -43,24 +44,48 @@ class KeyUploadServlet(RestServlet): | |||
Content-Type: application/json | |||
{ | |||
"device_keys": { | |||
"user_id": "<user_id>", | |||
"device_id": "<device_id>", | |||
"valid_until_ts": <millisecond_timestamp>, | |||
"algorithms": [ | |||
"m.olm.curve25519-aes-sha2", | |||
] | |||
"keys": { | |||
"<algorithm>:<device_id>": "<key_base64>", | |||
"device_keys": { | |||
"user_id": "<user_id>", | |||
"device_id": "<device_id>", | |||
"valid_until_ts": <millisecond_timestamp>, | |||
"algorithms": [ | |||
"m.olm.curve25519-aes-sha2", | |||
] | |||
"keys": { | |||
"<algorithm>:<device_id>": "<key_base64>", | |||
}, | |||
"signatures:" { | |||
"<user_id>" { | |||
"<algorithm>:<device_id>": "<signature_base64>" | |||
} | |||
} | |||
}, | |||
"fallback_keys": { | |||
"<algorithm>:<device_id>": "<key_base64>", | |||
"signed_<algorithm>:<device_id>": { | |||
"fallback": true, | |||
"key": "<key_base64>", | |||
"signatures": { | |||
"<user_id>": { | |||
"<algorithm>:<device_id>": "<key_base64>" | |||
} | |||
} | |||
} | |||
} | |||
"one_time_keys": { | |||
"<algorithm>:<key_id>": "<key_base64>" | |||
}, | |||
"signatures:" { | |||
"<user_id>" { | |||
"<algorithm>:<device_id>": "<signature_base64>" | |||
} } }, | |||
"one_time_keys": { | |||
"<algorithm>:<key_id>": "<key_base64>" | |||
}, | |||
} | |||
response, e.g.: | |||
{ | |||
"one_time_key_counts": { | |||
"curve25519": 10, | |||
"signed_curve25519": 20 | |||
} | |||
} | |||
""" | |||
PATTERNS = client_patterns("/keys/upload(/(?P<device_id>[^/]+))?$") | |||
@@ -71,6 +96,13 @@ class KeyUploadServlet(RestServlet): | |||
self.e2e_keys_handler = hs.get_e2e_keys_handler() | |||
self.device_handler = hs.get_device_handler() | |||
if hs.config.worker.worker_app is None: | |||
# if main process | |||
self.key_uploader = self.e2e_keys_handler.upload_keys_for_user | |||
else: | |||
# then a worker | |||
self.key_uploader = ReplicationUploadKeysForUserRestServlet.make_client(hs) | |||
async def on_POST( | |||
self, request: SynapseRequest, device_id: Optional[str] | |||
) -> Tuple[int, JsonDict]: | |||
@@ -109,8 +141,8 @@ class KeyUploadServlet(RestServlet): | |||
400, "To upload keys, you must pass device_id when authenticating" | |||
) | |||
result = await self.e2e_keys_handler.upload_keys_for_user( | |||
user_id, device_id, body | |||
result = await self.key_uploader( | |||
user_id=user_id, device_id=device_id, keys=body | |||
) | |||
return 200, result | |||