This adds a module API which allows a module to update a user's presence state/status message. This is useful for controlling presence from an external system. To fully control presence from the module the presence.enabled config parameter gains a new state of "untracked" which disables internal tracking of presence changes via user actions, etc. Only updates from the module will be persisted and sent down sync properly).tags/v1.96.0rc1
@@ -0,0 +1 @@ | |||
Add a new module API for controller presence. |
@@ -230,6 +230,13 @@ Example configuration: | |||
presence: | |||
enabled: false | |||
``` | |||
`enabled` can also be set to a special value of "untracked" which ignores updates | |||
received via clients and federation, while still accepting updates from the | |||
[module API](../../modules/index.md). | |||
*The "untracked" option was added in Synapse 1.96.0.* | |||
--- | |||
### `require_auth_for_profile_requests` | |||
@@ -368,9 +368,14 @@ class ServerConfig(Config): | |||
# Whether to enable user presence. | |||
presence_config = config.get("presence") or {} | |||
self.use_presence = presence_config.get("enabled") | |||
if self.use_presence is None: | |||
self.use_presence = config.get("use_presence", True) | |||
presence_enabled = presence_config.get("enabled") | |||
if presence_enabled is None: | |||
presence_enabled = config.get("use_presence", True) | |||
# Whether presence is enabled *at all*. | |||
self.presence_enabled = bool(presence_enabled) | |||
# Whether to internally track presence, requires that presence is enabled, | |||
self.track_presence = self.presence_enabled and presence_enabled != "untracked" | |||
# Custom presence router module | |||
# This is the legacy way of configuring it (the config should now be put in the modules section) | |||
@@ -1395,7 +1395,7 @@ class FederationHandlerRegistry: | |||
self._edu_type_to_instance[edu_type] = instance_names | |||
async def on_edu(self, edu_type: str, origin: str, content: dict) -> None: | |||
if not self.config.server.use_presence and edu_type == EduTypes.PRESENCE: | |||
if not self.config.server.track_presence and edu_type == EduTypes.PRESENCE: | |||
return | |||
# Check if we have a handler on this instance | |||
@@ -844,7 +844,7 @@ class FederationSender(AbstractFederationSender): | |||
destinations (list[str]) | |||
""" | |||
if not states or not self.hs.config.server.use_presence: | |||
if not states or not self.hs.config.server.track_presence: | |||
# No-op if presence is disabled. | |||
return | |||
@@ -439,7 +439,7 @@ class InitialSyncHandler: | |||
async def get_presence() -> List[JsonDict]: | |||
# If presence is disabled, return an empty list | |||
if not self.hs.config.server.use_presence: | |||
if not self.hs.config.server.presence_enabled: | |||
return [] | |||
states = await presence_handler.get_states( | |||
@@ -192,7 +192,8 @@ class BasePresenceHandler(abc.ABC): | |||
self.state = hs.get_state_handler() | |||
self.is_mine_id = hs.is_mine_id | |||
self._presence_enabled = hs.config.server.use_presence | |||
self._presence_enabled = hs.config.server.presence_enabled | |||
self._track_presence = hs.config.server.track_presence | |||
self._federation = None | |||
if hs.should_send_federation(): | |||
@@ -512,7 +513,7 @@ class WorkerPresenceHandler(BasePresenceHandler): | |||
) | |||
async def _on_shutdown(self) -> None: | |||
if self._presence_enabled: | |||
if self._track_presence: | |||
self.hs.get_replication_command_handler().send_command( | |||
ClearUserSyncsCommand(self.instance_id) | |||
) | |||
@@ -524,7 +525,7 @@ class WorkerPresenceHandler(BasePresenceHandler): | |||
is_syncing: bool, | |||
last_sync_ms: int, | |||
) -> None: | |||
if self._presence_enabled: | |||
if self._track_presence: | |||
self.hs.get_replication_command_handler().send_user_sync( | |||
self.instance_id, user_id, device_id, is_syncing, last_sync_ms | |||
) | |||
@@ -571,7 +572,7 @@ class WorkerPresenceHandler(BasePresenceHandler): | |||
Called by the sync and events servlets to record that a user has connected to | |||
this worker and is waiting for some events. | |||
""" | |||
if not affect_presence or not self._presence_enabled: | |||
if not affect_presence or not self._track_presence: | |||
return _NullContextManager() | |||
# Note that this causes last_active_ts to be incremented which is not | |||
@@ -702,8 +703,8 @@ class WorkerPresenceHandler(BasePresenceHandler): | |||
user_id = target_user.to_string() | |||
# If presence is disabled, no-op | |||
if not self._presence_enabled: | |||
# If tracking of presence is disabled, no-op | |||
if not self._track_presence: | |||
return | |||
# Proxy request to instance that writes presence | |||
@@ -723,7 +724,7 @@ class WorkerPresenceHandler(BasePresenceHandler): | |||
with the app. | |||
""" | |||
# If presence is disabled, no-op | |||
if not self._presence_enabled: | |||
if not self._track_presence: | |||
return | |||
# Proxy request to instance that writes presence | |||
@@ -760,7 +761,7 @@ class PresenceHandler(BasePresenceHandler): | |||
] = {} | |||
now = self.clock.time_msec() | |||
if self._presence_enabled: | |||
if self._track_presence: | |||
for state in self.user_to_current_state.values(): | |||
# Create a psuedo-device to properly handle time outs. This will | |||
# be overridden by any "real" devices within SYNC_ONLINE_TIMEOUT. | |||
@@ -831,7 +832,7 @@ class PresenceHandler(BasePresenceHandler): | |||
self.external_sync_linearizer = Linearizer(name="external_sync_linearizer") | |||
if self._presence_enabled: | |||
if self._track_presence: | |||
# Start a LoopingCall in 30s that fires every 5s. | |||
# The initial delay is to allow disconnected clients a chance to | |||
# reconnect before we treat them as offline. | |||
@@ -839,6 +840,9 @@ class PresenceHandler(BasePresenceHandler): | |||
30, self.clock.looping_call, self._handle_timeouts, 5000 | |||
) | |||
# Presence information is persisted, whether or not it is being tracked | |||
# internally. | |||
if self._presence_enabled: | |||
self.clock.call_later( | |||
60, | |||
self.clock.looping_call, | |||
@@ -854,7 +858,7 @@ class PresenceHandler(BasePresenceHandler): | |||
) | |||
# Used to handle sending of presence to newly joined users/servers | |||
if self._presence_enabled: | |||
if self._track_presence: | |||
self.notifier.add_replication_callback(self.notify_new_event) | |||
# Presence is best effort and quickly heals itself, so lets just always | |||
@@ -905,7 +909,9 @@ class PresenceHandler(BasePresenceHandler): | |||
) | |||
async def _update_states( | |||
self, new_states: Iterable[UserPresenceState], force_notify: bool = False | |||
self, | |||
new_states: Iterable[UserPresenceState], | |||
force_notify: bool = False, | |||
) -> None: | |||
"""Updates presence of users. Sets the appropriate timeouts. Pokes | |||
the notifier and federation if and only if the changed presence state | |||
@@ -943,7 +949,7 @@ class PresenceHandler(BasePresenceHandler): | |||
for new_state in new_states: | |||
user_id = new_state.user_id | |||
# Its fine to not hit the database here, as the only thing not in | |||
# It's fine to not hit the database here, as the only thing not in | |||
# the current state cache are OFFLINE states, where the only field | |||
# of interest is last_active which is safe enough to assume is 0 | |||
# here. | |||
@@ -957,6 +963,9 @@ class PresenceHandler(BasePresenceHandler): | |||
is_mine=self.is_mine_id(user_id), | |||
wheel_timer=self.wheel_timer, | |||
now=now, | |||
# When overriding disabled presence, don't kick off all the | |||
# wheel timers. | |||
persist=not self._track_presence, | |||
) | |||
if force_notify: | |||
@@ -1072,7 +1081,7 @@ class PresenceHandler(BasePresenceHandler): | |||
with the app. | |||
""" | |||
# If presence is disabled, no-op | |||
if not self._presence_enabled: | |||
if not self._track_presence: | |||
return | |||
user_id = user.to_string() | |||
@@ -1124,7 +1133,7 @@ class PresenceHandler(BasePresenceHandler): | |||
client that is being used by a user. | |||
presence_state: The presence state indicated in the sync request | |||
""" | |||
if not affect_presence or not self._presence_enabled: | |||
if not affect_presence or not self._track_presence: | |||
return _NullContextManager() | |||
curr_sync = self._user_device_to_num_current_syncs.get((user_id, device_id), 0) | |||
@@ -1284,7 +1293,7 @@ class PresenceHandler(BasePresenceHandler): | |||
async def incoming_presence(self, origin: str, content: JsonDict) -> None: | |||
"""Called when we receive a `m.presence` EDU from a remote server.""" | |||
if not self._presence_enabled: | |||
if not self._track_presence: | |||
return | |||
now = self.clock.time_msec() | |||
@@ -1359,7 +1368,7 @@ class PresenceHandler(BasePresenceHandler): | |||
raise SynapseError(400, "Invalid presence state") | |||
# If presence is disabled, no-op | |||
if not self._presence_enabled: | |||
if not self._track_presence: | |||
return | |||
user_id = target_user.to_string() | |||
@@ -2118,6 +2127,7 @@ def handle_update( | |||
is_mine: bool, | |||
wheel_timer: WheelTimer, | |||
now: int, | |||
persist: bool, | |||
) -> Tuple[UserPresenceState, bool, bool]: | |||
"""Given a presence update: | |||
1. Add any appropriate timers. | |||
@@ -2129,6 +2139,8 @@ def handle_update( | |||
is_mine: Whether the user is ours | |||
wheel_timer | |||
now: Time now in ms | |||
persist: True if this state should persist until another update occurs. | |||
Skips insertion into wheel timers. | |||
Returns: | |||
3-tuple: `(new_state, persist_and_notify, federation_ping)` where: | |||
@@ -2146,14 +2158,15 @@ def handle_update( | |||
if is_mine: | |||
if new_state.state == PresenceState.ONLINE: | |||
# Idle timer | |||
wheel_timer.insert( | |||
now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER | |||
) | |||
if not persist: | |||
wheel_timer.insert( | |||
now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER | |||
) | |||
active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY | |||
new_state = new_state.copy_and_replace(currently_active=active) | |||
if active: | |||
if active and not persist: | |||
wheel_timer.insert( | |||
now=now, | |||
obj=user_id, | |||
@@ -2162,11 +2175,12 @@ def handle_update( | |||
if new_state.state != PresenceState.OFFLINE: | |||
# User has stopped syncing | |||
wheel_timer.insert( | |||
now=now, | |||
obj=user_id, | |||
then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT, | |||
) | |||
if not persist: | |||
wheel_timer.insert( | |||
now=now, | |||
obj=user_id, | |||
then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT, | |||
) | |||
last_federate = new_state.last_federation_update_ts | |||
if now - last_federate > FEDERATION_PING_INTERVAL: | |||
@@ -2174,7 +2188,7 @@ def handle_update( | |||
new_state = new_state.copy_and_replace(last_federation_update_ts=now) | |||
federation_ping = True | |||
if new_state.state == PresenceState.BUSY: | |||
if new_state.state == PresenceState.BUSY and not persist: | |||
wheel_timer.insert( | |||
now=now, | |||
obj=user_id, | |||
@@ -2182,11 +2196,13 @@ def handle_update( | |||
) | |||
else: | |||
wheel_timer.insert( | |||
now=now, | |||
obj=user_id, | |||
then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT, | |||
) | |||
# An update for a remote user was received. | |||
if not persist: | |||
wheel_timer.insert( | |||
now=now, | |||
obj=user_id, | |||
then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT, | |||
) | |||
# Check whether the change was something worth notifying about | |||
if should_notify(prev_state, new_state, is_mine): | |||
@@ -1517,7 +1517,7 @@ class SyncHandler: | |||
# Presence data is included if the server has it enabled and not filtered out. | |||
include_presence_data = bool( | |||
self.hs_config.server.use_presence | |||
self.hs_config.server.presence_enabled | |||
and not sync_config.filter_collection.blocks_all_presence() | |||
) | |||
# Device list updates are sent if a since token is provided. | |||
@@ -23,6 +23,7 @@ from typing import ( | |||
Generator, | |||
Iterable, | |||
List, | |||
Mapping, | |||
Optional, | |||
Tuple, | |||
TypeVar, | |||
@@ -39,6 +40,7 @@ from twisted.web.resource import Resource | |||
from synapse.api import errors | |||
from synapse.api.errors import SynapseError | |||
from synapse.api.presence import UserPresenceState | |||
from synapse.config import ConfigError | |||
from synapse.events import EventBase | |||
from synapse.events.presence_router import ( | |||
@@ -1184,6 +1186,37 @@ class ModuleApi: | |||
presence_events, [destination] | |||
) | |||
async def set_presence_for_users( | |||
self, users: Mapping[str, Tuple[str, Optional[str]]] | |||
) -> None: | |||
""" | |||
Update the internal presence state of users. | |||
This can be used for either local or remote users. | |||
Note that this method can only be run on the process that is configured to write to the | |||
presence stream. By default, this is the main process. | |||
Added in Synapse v1.96.0. | |||
""" | |||
# We pull out the presence handler here to break a cyclic | |||
# dependency between the presence router and module API. | |||
presence_handler = self._hs.get_presence_handler() | |||
from synapse.handlers.presence import PresenceHandler | |||
assert isinstance(presence_handler, PresenceHandler) | |||
states = await presence_handler.current_state_for_users(users.keys()) | |||
for user_id, (state, status_msg) in users.items(): | |||
prev_state = states.setdefault(user_id, UserPresenceState.default(user_id)) | |||
states[user_id] = prev_state.copy_and_replace( | |||
state=state, status_msg=status_msg | |||
) | |||
await presence_handler._update_states(states.values(), force_notify=True) | |||
def looping_background_call( | |||
self, | |||
f: Callable, | |||
@@ -42,15 +42,13 @@ class PresenceStatusRestServlet(RestServlet): | |||
self.clock = hs.get_clock() | |||
self.auth = hs.get_auth() | |||
self._use_presence = hs.config.server.use_presence | |||
async def on_GET( | |||
self, request: SynapseRequest, user_id: str | |||
) -> Tuple[int, JsonDict]: | |||
requester = await self.auth.get_user_by_req(request) | |||
user = UserID.from_string(user_id) | |||
if not self._use_presence: | |||
if not self.hs.config.server.presence_enabled: | |||
return 200, {"presence": "offline"} | |||
if requester.user != user: | |||
@@ -96,7 +94,7 @@ class PresenceStatusRestServlet(RestServlet): | |||
except Exception: | |||
raise SynapseError(400, "Unable to parse state") | |||
if self._use_presence: | |||
if self.hs.config.server.track_presence: | |||
await self.presence_handler.set_state(user, requester.device_id, state) | |||
return 200, {} | |||
@@ -11,7 +11,7 @@ | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
import itertools | |||
from typing import Optional, cast | |||
from unittest.mock import Mock, call | |||
@@ -33,6 +33,7 @@ from synapse.handlers.presence import ( | |||
IDLE_TIMER, | |||
LAST_ACTIVE_GRANULARITY, | |||
SYNC_ONLINE_TIMEOUT, | |||
PresenceHandler, | |||
handle_timeout, | |||
handle_update, | |||
) | |||
@@ -66,7 +67,12 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase): | |||
) | |||
state, persist_and_notify, federation_ping = handle_update( | |||
prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now | |||
prev_state, | |||
new_state, | |||
is_mine=True, | |||
wheel_timer=wheel_timer, | |||
now=now, | |||
persist=False, | |||
) | |||
self.assertTrue(persist_and_notify) | |||
@@ -108,7 +114,12 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase): | |||
) | |||
state, persist_and_notify, federation_ping = handle_update( | |||
prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now | |||
prev_state, | |||
new_state, | |||
is_mine=True, | |||
wheel_timer=wheel_timer, | |||
now=now, | |||
persist=False, | |||
) | |||
self.assertFalse(persist_and_notify) | |||
@@ -153,7 +164,12 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase): | |||
) | |||
state, persist_and_notify, federation_ping = handle_update( | |||
prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now | |||
prev_state, | |||
new_state, | |||
is_mine=True, | |||
wheel_timer=wheel_timer, | |||
now=now, | |||
persist=False, | |||
) | |||
self.assertFalse(persist_and_notify) | |||
@@ -196,7 +212,12 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase): | |||
new_state = prev_state.copy_and_replace(state=PresenceState.ONLINE) | |||
state, persist_and_notify, federation_ping = handle_update( | |||
prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now | |||
prev_state, | |||
new_state, | |||
is_mine=True, | |||
wheel_timer=wheel_timer, | |||
now=now, | |||
persist=False, | |||
) | |||
self.assertTrue(persist_and_notify) | |||
@@ -231,7 +252,12 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase): | |||
new_state = prev_state.copy_and_replace(state=PresenceState.ONLINE) | |||
state, persist_and_notify, federation_ping = handle_update( | |||
prev_state, new_state, is_mine=False, wheel_timer=wheel_timer, now=now | |||
prev_state, | |||
new_state, | |||
is_mine=False, | |||
wheel_timer=wheel_timer, | |||
now=now, | |||
persist=False, | |||
) | |||
self.assertFalse(persist_and_notify) | |||
@@ -265,7 +291,12 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase): | |||
new_state = prev_state.copy_and_replace(state=PresenceState.OFFLINE) | |||
state, persist_and_notify, federation_ping = handle_update( | |||
prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now | |||
prev_state, | |||
new_state, | |||
is_mine=True, | |||
wheel_timer=wheel_timer, | |||
now=now, | |||
persist=False, | |||
) | |||
self.assertTrue(persist_and_notify) | |||
@@ -287,7 +318,12 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase): | |||
new_state = prev_state.copy_and_replace(state=PresenceState.UNAVAILABLE) | |||
state, persist_and_notify, federation_ping = handle_update( | |||
prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now | |||
prev_state, | |||
new_state, | |||
is_mine=True, | |||
wheel_timer=wheel_timer, | |||
now=now, | |||
persist=False, | |||
) | |||
self.assertTrue(persist_and_notify) | |||
@@ -347,6 +383,41 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase): | |||
# They should be identical. | |||
self.assertEqual(presence_states_compare, db_presence_states) | |||
@parameterized.expand( | |||
itertools.permutations( | |||
( | |||
PresenceState.BUSY, | |||
PresenceState.ONLINE, | |||
PresenceState.UNAVAILABLE, | |||
PresenceState.OFFLINE, | |||
), | |||
2, | |||
) | |||
) | |||
def test_override(self, initial_state: str, final_state: str) -> None: | |||
"""Overridden statuses should not go into the wheel timer.""" | |||
wheel_timer = Mock() | |||
user_id = "@foo:bar" | |||
now = 5000000 | |||
prev_state = UserPresenceState.default(user_id) | |||
prev_state = prev_state.copy_and_replace( | |||
state=initial_state, last_active_ts=now, currently_active=True | |||
) | |||
new_state = prev_state.copy_and_replace(state=final_state, last_active_ts=now) | |||
handle_update( | |||
prev_state, | |||
new_state, | |||
is_mine=True, | |||
wheel_timer=wheel_timer, | |||
now=now, | |||
persist=True, | |||
) | |||
wheel_timer.insert.assert_not_called() | |||
class PresenceTimeoutTestCase(unittest.TestCase): | |||
"""Tests different timers and that the timer does not change `status_msg` of user.""" | |||
@@ -738,7 +809,6 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): | |||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: | |||
self.presence_handler = hs.get_presence_handler() | |||
self.clock = hs.get_clock() | |||
def test_external_process_timeout(self) -> None: | |||
"""Test that if an external process doesn't update the records for a while | |||
@@ -1471,6 +1541,29 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): | |||
self.assertEqual(new_state.state, state) | |||
self.assertEqual(new_state.status_msg, status_msg) | |||
@unittest.override_config({"presence": {"enabled": "untracked"}}) | |||
def test_untracked_does_not_idle(self) -> None: | |||
"""Untracked presence should not idle.""" | |||
# Mark user as online, this needs to reach into internals in order to | |||
# bypass checks. | |||
state = self.get_success(self.presence_handler.get_state(self.user_id_obj)) | |||
assert isinstance(self.presence_handler, PresenceHandler) | |||
self.get_success( | |||
self.presence_handler._update_states( | |||
[state.copy_and_replace(state=PresenceState.ONLINE)] | |||
) | |||
) | |||
# Ensure the update took. | |||
state = self.get_success(self.presence_handler.get_state(self.user_id_obj)) | |||
self.assertEqual(state.state, PresenceState.ONLINE) | |||
# The timeout should not fire and the state should be the same. | |||
self.reactor.advance(SYNC_ONLINE_TIMEOUT) | |||
state = self.get_success(self.presence_handler.get_state(self.user_id_obj)) | |||
self.assertEqual(state.state, PresenceState.ONLINE) | |||
class PresenceFederationQueueTestCase(unittest.HomeserverTestCase): | |||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: | |||
@@ -50,7 +50,7 @@ class PresenceTestCase(unittest.HomeserverTestCase): | |||
PUT to the status endpoint with use_presence enabled will call | |||
set_state on the presence handler. | |||
""" | |||
self.hs.config.server.use_presence = True | |||
self.hs.config.server.presence_enabled = True | |||
body = {"presence": "here", "status_msg": "beep boop"} | |||
channel = self.make_request( | |||
@@ -63,7 +63,22 @@ class PresenceTestCase(unittest.HomeserverTestCase): | |||
@unittest.override_config({"use_presence": False}) | |||
def test_put_presence_disabled(self) -> None: | |||
""" | |||
PUT to the status endpoint with use_presence disabled will NOT call | |||
PUT to the status endpoint with presence disabled will NOT call | |||
set_state on the presence handler. | |||
""" | |||
body = {"presence": "here", "status_msg": "beep boop"} | |||
channel = self.make_request( | |||
"PUT", "/presence/%s/status" % (self.user_id,), body | |||
) | |||
self.assertEqual(channel.code, HTTPStatus.OK) | |||
self.assertEqual(self.presence_handler.set_state.call_count, 0) | |||
@unittest.override_config({"presence": {"enabled": "untracked"}}) | |||
def test_put_presence_untracked(self) -> None: | |||
""" | |||
PUT to the status endpoint with presence untracked will NOT call | |||
set_state on the presence handler. | |||
""" | |||