@@ -0,0 +1 @@ | |||
Synapse's presence functionality can now be disabled with the "use_presence" configuration option. |
@@ -241,6 +241,14 @@ regular expressions:: | |||
^/_matrix/client/(api/v1|r0|unstable)/keys/upload | |||
If ``use_presence`` is False in the homeserver config, it can also handle REST | |||
endpoints matching the following regular expressions:: | |||
^/_matrix/client/(api/v1|r0|unstable)/presence/[^/]+/status | |||
This "stub" presence handler will pass through ``GET`` request but make the | |||
``PUT`` effectively a no-op. | |||
It will proxy any requests it cannot handle to the main synapse instance. It | |||
must therefore be configured with the location of the main instance, via | |||
the ``worker_main_http_uri`` setting in the frontend_proxy worker configuration | |||
@@ -140,7 +140,7 @@ def listen_metrics(bind_addresses, port): | |||
logger.info("Metrics now reporting on %s:%d", host, port) | |||
def listen_tcp(bind_addresses, port, factory, backlog=50): | |||
def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50): | |||
""" | |||
Create a TCP socket for a port and several addresses | |||
""" | |||
@@ -156,7 +156,9 @@ def listen_tcp(bind_addresses, port, factory, backlog=50): | |||
check_bind_error(e, address, bind_addresses) | |||
def listen_ssl(bind_addresses, port, factory, context_factory, backlog=50): | |||
def listen_ssl( | |||
bind_addresses, port, factory, context_factory, reactor=reactor, backlog=50 | |||
): | |||
""" | |||
Create an SSL socket for a port and several addresses | |||
""" | |||
@@ -38,6 +38,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore | |||
from synapse.replication.slave.storage.devices import SlavedDeviceStore | |||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore | |||
from synapse.replication.tcp.client import ReplicationClientHandler | |||
from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns | |||
from synapse.rest.client.v2_alpha._base import client_v2_patterns | |||
from synapse.server import HomeServer | |||
from synapse.storage.engines import create_engine | |||
@@ -49,6 +50,35 @@ from synapse.util.versionstring import get_version_string | |||
logger = logging.getLogger("synapse.app.frontend_proxy") | |||
class PresenceStatusStubServlet(ClientV1RestServlet): | |||
PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status") | |||
def __init__(self, hs): | |||
super(PresenceStatusStubServlet, self).__init__(hs) | |||
self.http_client = hs.get_simple_http_client() | |||
self.auth = hs.get_auth() | |||
self.main_uri = hs.config.worker_main_http_uri | |||
@defer.inlineCallbacks | |||
def on_GET(self, request, user_id): | |||
# Pass through the auth headers, if any, in case the access token | |||
# is there. | |||
auth_headers = request.requestHeaders.getRawHeaders("Authorization", []) | |||
headers = { | |||
"Authorization": auth_headers, | |||
} | |||
result = yield self.http_client.get_json( | |||
self.main_uri + request.uri, | |||
headers=headers, | |||
) | |||
defer.returnValue((200, result)) | |||
@defer.inlineCallbacks | |||
def on_PUT(self, request, user_id): | |||
yield self.auth.get_user_by_req(request) | |||
defer.returnValue((200, {})) | |||
class KeyUploadServlet(RestServlet): | |||
PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$") | |||
@@ -135,6 +165,12 @@ class FrontendProxyServer(HomeServer): | |||
elif name == "client": | |||
resource = JsonResource(self, canonical_json=False) | |||
KeyUploadServlet(self).register(resource) | |||
# If presence is disabled, use the stub servlet that does | |||
# not allow sending presence | |||
if not self.config.use_presence: | |||
PresenceStatusStubServlet(self).register(resource) | |||
resources.update({ | |||
"/_matrix/client/r0": resource, | |||
"/_matrix/client/unstable": resource, | |||
@@ -153,7 +189,8 @@ class FrontendProxyServer(HomeServer): | |||
listener_config, | |||
root_resource, | |||
self.version_string, | |||
) | |||
), | |||
reactor=self.get_reactor() | |||
) | |||
logger.info("Synapse client reader now listening on port %d", port) | |||
@@ -114,7 +114,10 @@ class SynchrotronPresence(object): | |||
logger.info("Presence process_id is %r", self.process_id) | |||
def send_user_sync(self, user_id, is_syncing, last_sync_ms): | |||
self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms) | |||
if self.hs.config.use_presence: | |||
self.hs.get_tcp_replication().send_user_sync( | |||
user_id, is_syncing, last_sync_ms | |||
) | |||
def mark_as_coming_online(self, user_id): | |||
"""A user has started syncing. Send a UserSync to the master, unless they | |||
@@ -211,10 +214,13 @@ class SynchrotronPresence(object): | |||
yield self.notify_from_replication(states, stream_id) | |||
def get_currently_syncing_users(self): | |||
return [ | |||
user_id for user_id, count in iteritems(self.user_to_num_current_syncs) | |||
if count > 0 | |||
] | |||
if self.hs.config.use_presence: | |||
return [ | |||
user_id for user_id, count in iteritems(self.user_to_num_current_syncs) | |||
if count > 0 | |||
] | |||
else: | |||
return set() | |||
class SynchrotronTyping(object): | |||
@@ -49,6 +49,9 @@ class ServerConfig(Config): | |||
# "disable" federation | |||
self.send_federation = config.get("send_federation", True) | |||
# Whether to enable user presence. | |||
self.use_presence = config.get("use_presence", True) | |||
# Whether to update the user directory or not. This should be set to | |||
# false only if we are updating the user directory in a worker | |||
self.update_user_directory = config.get("update_user_directory", True) | |||
@@ -250,6 +253,9 @@ class ServerConfig(Config): | |||
# hard limit. | |||
soft_file_limit: 0 | |||
# Set to false to disable presence tracking on this homeserver. | |||
use_presence: true | |||
# The GC threshold parameters to pass to `gc.set_threshold`, if defined | |||
# gc_thresholds: [700, 10, 10] | |||
@@ -58,6 +58,7 @@ class TransactionQueue(object): | |||
""" | |||
def __init__(self, hs): | |||
self.hs = hs | |||
self.server_name = hs.hostname | |||
self.store = hs.get_datastore() | |||
@@ -308,6 +309,9 @@ class TransactionQueue(object): | |||
Args: | |||
states (list(UserPresenceState)) | |||
""" | |||
if not self.hs.config.use_presence: | |||
# No-op if presence is disabled. | |||
return | |||
# First we queue up the new presence by user ID, so multiple presence | |||
# updates in quick successtion are correctly handled | |||
@@ -372,6 +372,10 @@ class InitialSyncHandler(BaseHandler): | |||
@defer.inlineCallbacks | |||
def get_presence(): | |||
# If presence is disabled, return an empty list | |||
if not self.hs.config.use_presence: | |||
defer.returnValue([]) | |||
states = yield presence_handler.get_states( | |||
[m.user_id for m in room_members], | |||
as_event=True, | |||
@@ -395,6 +395,10 @@ class PresenceHandler(object): | |||
"""We've seen the user do something that indicates they're interacting | |||
with the app. | |||
""" | |||
# If presence is disabled, no-op | |||
if not self.hs.config.use_presence: | |||
return | |||
user_id = user.to_string() | |||
bump_active_time_counter.inc() | |||
@@ -424,6 +428,11 @@ class PresenceHandler(object): | |||
Useful for streams that are not associated with an actual | |||
client that is being used by a user. | |||
""" | |||
# Override if it should affect the user's presence, if presence is | |||
# disabled. | |||
if not self.hs.config.use_presence: | |||
affect_presence = False | |||
if affect_presence: | |||
curr_sync = self.user_to_num_current_syncs.get(user_id, 0) | |||
self.user_to_num_current_syncs[user_id] = curr_sync + 1 | |||
@@ -469,13 +478,16 @@ class PresenceHandler(object): | |||
Returns: | |||
set(str): A set of user_id strings. | |||
""" | |||
syncing_user_ids = { | |||
user_id for user_id, count in self.user_to_num_current_syncs.items() | |||
if count | |||
} | |||
for user_ids in self.external_process_to_current_syncs.values(): | |||
syncing_user_ids.update(user_ids) | |||
return syncing_user_ids | |||
if self.hs.config.use_presence: | |||
syncing_user_ids = { | |||
user_id for user_id, count in self.user_to_num_current_syncs.items() | |||
if count | |||
} | |||
for user_ids in self.external_process_to_current_syncs.values(): | |||
syncing_user_ids.update(user_ids) | |||
return syncing_user_ids | |||
else: | |||
return set() | |||
@defer.inlineCallbacks | |||
def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec): | |||
@@ -185,6 +185,7 @@ class SyncResult(collections.namedtuple("SyncResult", [ | |||
class SyncHandler(object): | |||
def __init__(self, hs): | |||
self.hs_config = hs.config | |||
self.store = hs.get_datastore() | |||
self.notifier = hs.get_notifier() | |||
self.presence_handler = hs.get_presence_handler() | |||
@@ -860,7 +861,7 @@ class SyncHandler(object): | |||
since_token is None and | |||
sync_config.filter_collection.blocks_all_presence() | |||
) | |||
if not block_all_presence_data: | |||
if self.hs_config.use_presence and not block_all_presence_data: | |||
yield self._generate_sync_entry_for_presence( | |||
sync_result_builder, newly_joined_rooms, newly_joined_users | |||
) | |||
@@ -84,7 +84,8 @@ class PresenceStatusRestServlet(ClientV1RestServlet): | |||
except Exception: | |||
raise SynapseError(400, "Unable to parse state") | |||
yield self.presence_handler.set_state(user, state) | |||
if self.hs.config.use_presence: | |||
yield self.presence_handler.set_state(user, state) | |||
defer.returnValue((200, {})) | |||
@@ -0,0 +1,88 @@ | |||
# -*- coding: utf-8 -*- | |||
# Copyright 2018 New Vector Ltd | |||
# | |||
# Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | |||
# You may obtain a copy of the License at | |||
# | |||
# http://www.apache.org/licenses/LICENSE-2.0 | |||
# | |||
# Unless required by applicable law or agreed to in writing, software | |||
# distributed under the License is distributed on an "AS IS" BASIS, | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
from synapse.app.frontend_proxy import FrontendProxyServer | |||
from tests.unittest import HomeserverTestCase | |||
class FrontendProxyTests(HomeserverTestCase): | |||
def make_homeserver(self, reactor, clock): | |||
hs = self.setup_test_homeserver( | |||
http_client=None, homeserverToUse=FrontendProxyServer | |||
) | |||
return hs | |||
def test_listen_http_with_presence_enabled(self): | |||
""" | |||
When presence is on, the stub servlet will not register. | |||
""" | |||
# Presence is on | |||
self.hs.config.use_presence = True | |||
config = { | |||
"port": 8080, | |||
"bind_addresses": ["0.0.0.0"], | |||
"resources": [{"names": ["client"]}], | |||
} | |||
# Listen with the config | |||
self.hs._listen_http(config) | |||
# Grab the resource from the site that was told to listen | |||
self.assertEqual(len(self.reactor.tcpServers), 1) | |||
site = self.reactor.tcpServers[0][1] | |||
self.resource = ( | |||
site.resource.children["_matrix"].children["client"].children["r0"] | |||
) | |||
request, channel = self.make_request("PUT", "presence/a/status") | |||
self.render(request) | |||
# 400 + unrecognised, because nothing is registered | |||
self.assertEqual(channel.code, 400) | |||
self.assertEqual(channel.json_body["errcode"], "M_UNRECOGNIZED") | |||
def test_listen_http_with_presence_disabled(self): | |||
""" | |||
When presence is on, the stub servlet will register. | |||
""" | |||
# Presence is off | |||
self.hs.config.use_presence = False | |||
config = { | |||
"port": 8080, | |||
"bind_addresses": ["0.0.0.0"], | |||
"resources": [{"names": ["client"]}], | |||
} | |||
# Listen with the config | |||
self.hs._listen_http(config) | |||
# Grab the resource from the site that was told to listen | |||
self.assertEqual(len(self.reactor.tcpServers), 1) | |||
site = self.reactor.tcpServers[0][1] | |||
self.resource = ( | |||
site.resource.children["_matrix"].children["client"].children["r0"] | |||
) | |||
request, channel = self.make_request("PUT", "presence/a/status") | |||
self.render(request) | |||
# 401, because the stub servlet still checks authentication | |||
self.assertEqual(channel.code, 401) | |||
self.assertEqual(channel.json_body["errcode"], "M_MISSING_TOKEN") |
@@ -0,0 +1,72 @@ | |||
# -*- coding: utf-8 -*- | |||
# Copyright 2018 New Vector Ltd | |||
# | |||
# Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | |||
# You may obtain a copy of the License at | |||
# | |||
# http://www.apache.org/licenses/LICENSE-2.0 | |||
# | |||
# Unless required by applicable law or agreed to in writing, software | |||
# distributed under the License is distributed on an "AS IS" BASIS, | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
from mock import Mock | |||
from synapse.rest.client.v1 import presence | |||
from synapse.types import UserID | |||
from tests import unittest | |||
class PresenceTestCase(unittest.HomeserverTestCase): | |||
""" Tests presence REST API. """ | |||
user_id = "@sid:red" | |||
user = UserID.from_string(user_id) | |||
servlets = [presence.register_servlets] | |||
def make_homeserver(self, reactor, clock): | |||
hs = self.setup_test_homeserver( | |||
"red", http_client=None, federation_client=Mock() | |||
) | |||
hs.presence_handler = Mock() | |||
return hs | |||
def test_put_presence(self): | |||
""" | |||
PUT to the status endpoint with use_presence enabled will call | |||
set_state on the presence handler. | |||
""" | |||
self.hs.config.use_presence = True | |||
body = {"presence": "here", "status_msg": "beep boop"} | |||
request, channel = self.make_request( | |||
"PUT", "/presence/%s/status" % (self.user_id,), body | |||
) | |||
self.render(request) | |||
self.assertEqual(channel.code, 200) | |||
self.assertEqual(self.hs.presence_handler.set_state.call_count, 1) | |||
def test_put_presence_disabled(self): | |||
""" | |||
PUT to the status endpoint with use_presence disbled will NOT call | |||
set_state on the presence handler. | |||
""" | |||
self.hs.config.use_presence = False | |||
body = {"presence": "here", "status_msg": "beep boop"} | |||
request, channel = self.make_request( | |||
"PUT", "/presence/%s/status" % (self.user_id,), body | |||
) | |||
self.render(request) | |||
self.assertEqual(channel.code, 200) | |||
self.assertEqual(self.hs.presence_handler.set_state.call_count, 0) |
@@ -13,71 +13,58 @@ | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
import synapse.types | |||
from synapse.http.server import JsonResource | |||
from mock import Mock | |||
from synapse.rest.client.v2_alpha import sync | |||
from synapse.types import UserID | |||
from synapse.util import Clock | |||
from tests import unittest | |||
from tests.server import ( | |||
ThreadedMemoryReactorClock as MemoryReactorClock, | |||
make_request, | |||
render, | |||
setup_test_homeserver, | |||
) | |||
PATH_PREFIX = "/_matrix/client/v2_alpha" | |||
class FilterTestCase(unittest.TestCase): | |||
class FilterTestCase(unittest.HomeserverTestCase): | |||
USER_ID = "@apple:test" | |||
TO_REGISTER = [sync] | |||
user_id = "@apple:test" | |||
servlets = [sync.register_servlets] | |||
def setUp(self): | |||
self.clock = MemoryReactorClock() | |||
self.hs_clock = Clock(self.clock) | |||
def make_homeserver(self, reactor, clock): | |||
self.hs = setup_test_homeserver( | |||
self.addCleanup, http_client=None, clock=self.hs_clock, reactor=self.clock | |||
hs = self.setup_test_homeserver( | |||
"red", http_client=None, federation_client=Mock() | |||
) | |||
return hs | |||
self.auth = self.hs.get_auth() | |||
def get_user_by_access_token(token=None, allow_guest=False): | |||
return { | |||
"user": UserID.from_string(self.USER_ID), | |||
"token_id": 1, | |||
"is_guest": False, | |||
} | |||
def get_user_by_req(request, allow_guest=False, rights="access"): | |||
return synapse.types.create_requester( | |||
UserID.from_string(self.USER_ID), 1, False, None | |||
) | |||
self.auth.get_user_by_access_token = get_user_by_access_token | |||
self.auth.get_user_by_req = get_user_by_req | |||
def test_sync_argless(self): | |||
request, channel = self.make_request("GET", "/sync") | |||
self.render(request) | |||
self.store = self.hs.get_datastore() | |||
self.filtering = self.hs.get_filtering() | |||
self.resource = JsonResource(self.hs) | |||
self.assertEqual(channel.code, 200) | |||
self.assertTrue( | |||
set( | |||
[ | |||
"next_batch", | |||
"rooms", | |||
"presence", | |||
"account_data", | |||
"to_device", | |||
"device_lists", | |||
] | |||
).issubset(set(channel.json_body.keys())) | |||
) | |||
for r in self.TO_REGISTER: | |||
r.register_servlets(self.hs, self.resource) | |||
def test_sync_presence_disabled(self): | |||
""" | |||
When presence is disabled, the key does not appear in /sync. | |||
""" | |||
self.hs.config.use_presence = False | |||
def test_sync_argless(self): | |||
request, channel = make_request("GET", "/_matrix/client/r0/sync") | |||
render(request, self.resource, self.clock) | |||
request, channel = self.make_request("GET", "/sync") | |||
self.render(request) | |||
self.assertEqual(channel.result["code"], b"200") | |||
self.assertEqual(channel.code, 200) | |||
self.assertTrue( | |||
set( | |||
[ | |||
"next_batch", | |||
"rooms", | |||
"presence", | |||
"account_data", | |||
"to_device", | |||
"device_lists", | |||
@@ -18,6 +18,8 @@ import logging | |||
from mock import Mock | |||
from canonicaljson import json | |||
import twisted | |||
import twisted.logger | |||
from twisted.trial import unittest | |||
@@ -241,11 +243,15 @@ class HomeserverTestCase(TestCase): | |||
method (bytes/unicode): The HTTP request method ("verb"). | |||
path (bytes/unicode): The HTTP path, suitably URL encoded (e.g. | |||
escaped UTF-8 & spaces and such). | |||
content (bytes): The body of the request. | |||
content (bytes or dict): The body of the request. JSON-encoded, if | |||
a dict. | |||
Returns: | |||
A synapse.http.site.SynapseRequest. | |||
""" | |||
if isinstance(content, dict): | |||
content = json.dumps(content).encode('utf8') | |||
return make_request(method, path, content) | |||
def render(self, request): | |||
@@ -93,7 +93,8 @@ def setupdb(): | |||
@defer.inlineCallbacks | |||
def setup_test_homeserver( | |||
cleanup_func, name="test", datastore=None, config=None, reactor=None, **kargs | |||
cleanup_func, name="test", datastore=None, config=None, reactor=None, | |||
homeserverToUse=HomeServer, **kargs | |||
): | |||
""" | |||
Setup a homeserver suitable for running tests against. Keyword arguments | |||
@@ -192,7 +193,7 @@ def setup_test_homeserver( | |||
config.database_config["args"]["cp_openfun"] = db_engine.on_new_connection | |||
if datastore is None: | |||
hs = HomeServer( | |||
hs = homeserverToUse( | |||
name, | |||
config=config, | |||
db_config=config.database_config, | |||
@@ -235,7 +236,7 @@ def setup_test_homeserver( | |||
hs.setup() | |||
else: | |||
hs = HomeServer( | |||
hs = homeserverToUse( | |||
name, | |||
db_pool=None, | |||
datastore=datastore, | |||