Co-authored-by: Patrick Cloke <clokep@users.noreply.github.com>tags/v1.61.0rc1
@@ -0,0 +1 @@ | |||||
Add a configurable background job to delete stale devices. |
@@ -575,6 +575,18 @@ Example configuration: | |||||
dummy_events_threshold: 5 | dummy_events_threshold: 5 | ||||
``` | ``` | ||||
--- | --- | ||||
Config option `delete_stale_devices_after` | |||||
An optional duration. If set, Synapse will run a daily background task to log out and | |||||
delete any device that hasn't been accessed for more than the specified amount of time. | |||||
Defaults to no duration, which means devices are never pruned. | |||||
Example configuration: | |||||
```yaml | |||||
delete_stale_devices_after: 1y | |||||
``` | |||||
## Homeserver blocking ## | ## Homeserver blocking ## | ||||
Useful options for Synapse admins. | Useful options for Synapse admins. | ||||
@@ -679,6 +679,17 @@ class ServerConfig(Config): | |||||
config.get("exclude_rooms_from_sync") or [] | config.get("exclude_rooms_from_sync") or [] | ||||
) | ) | ||||
delete_stale_devices_after: Optional[str] = ( | |||||
config.get("delete_stale_devices_after") or None | |||||
) | |||||
if delete_stale_devices_after is not None: | |||||
self.delete_stale_devices_after: Optional[int] = self.parse_duration( | |||||
delete_stale_devices_after | |||||
) | |||||
else: | |||||
self.delete_stale_devices_after = None | |||||
def has_tls_listener(self) -> bool: | def has_tls_listener(self) -> bool: | ||||
return any(listener.tls for listener in self.listeners) | return any(listener.tls for listener in self.listeners) | ||||
@@ -61,6 +61,7 @@ if TYPE_CHECKING: | |||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
MAX_DEVICE_DISPLAY_NAME_LEN = 100 | MAX_DEVICE_DISPLAY_NAME_LEN = 100 | ||||
DELETE_STALE_DEVICES_INTERVAL_MS = 24 * 60 * 60 * 1000 | |||||
class DeviceWorkerHandler: | class DeviceWorkerHandler: | ||||
@@ -295,6 +296,19 @@ class DeviceHandler(DeviceWorkerHandler): | |||||
# On start up check if there are any updates pending. | # On start up check if there are any updates pending. | ||||
hs.get_reactor().callWhenRunning(self._handle_new_device_update_async) | hs.get_reactor().callWhenRunning(self._handle_new_device_update_async) | ||||
self._delete_stale_devices_after = hs.config.server.delete_stale_devices_after | |||||
# Ideally we would run this on a worker and condition this on the | |||||
# "run_background_tasks_on" setting, but this would mean making the notification | |||||
# of device list changes over federation work on workers, which is nontrivial. | |||||
if self._delete_stale_devices_after is not None: | |||||
self.clock.looping_call( | |||||
run_as_background_process, | |||||
DELETE_STALE_DEVICES_INTERVAL_MS, | |||||
"delete_stale_devices", | |||||
self._delete_stale_devices, | |||||
) | |||||
def _check_device_name_length(self, name: Optional[str]) -> None: | def _check_device_name_length(self, name: Optional[str]) -> None: | ||||
""" | """ | ||||
Checks whether a device name is longer than the maximum allowed length. | Checks whether a device name is longer than the maximum allowed length. | ||||
@@ -370,6 +384,19 @@ class DeviceHandler(DeviceWorkerHandler): | |||||
raise errors.StoreError(500, "Couldn't generate a device ID.") | raise errors.StoreError(500, "Couldn't generate a device ID.") | ||||
async def _delete_stale_devices(self) -> None: | |||||
"""Background task that deletes devices which haven't been accessed for more than | |||||
a configured time period. | |||||
""" | |||||
# We should only be running this job if the config option is defined. | |||||
assert self._delete_stale_devices_after is not None | |||||
now_ms = self.clock.time_msec() | |||||
since_ms = now_ms - self._delete_stale_devices_after | |||||
devices = await self.store.get_local_devices_not_accessed_since(since_ms) | |||||
for user_id, user_devices in devices.items(): | |||||
await self.delete_devices(user_id, user_devices) | |||||
@trace | @trace | ||||
async def delete_device(self, user_id: str, device_id: str) -> None: | async def delete_device(self, user_id: str, device_id: str) -> None: | ||||
"""Delete the given device | """Delete the given device | ||||
@@ -692,7 +719,8 @@ class DeviceHandler(DeviceWorkerHandler): | |||||
) | ) | ||||
# TODO: when called, this isn't in a logging context. | # TODO: when called, this isn't in a logging context. | ||||
# This leads to log spam, sentry event spam, and massive | # This leads to log spam, sentry event spam, and massive | ||||
# memory usage. See #12552. | |||||
# memory usage. | |||||
# See https://github.com/matrix-org/synapse/issues/12552. | |||||
# log_kv( | # log_kv( | ||||
# {"message": "sent device update to host", "host": host} | # {"message": "sent device update to host", "host": host} | ||||
# ) | # ) | ||||
@@ -1154,6 +1154,45 @@ class DeviceWorkerStore(SQLBaseStore): | |||||
_prune_txn, | _prune_txn, | ||||
) | ) | ||||
async def get_local_devices_not_accessed_since( | |||||
self, since_ms: int | |||||
) -> Dict[str, List[str]]: | |||||
"""Retrieves local devices that haven't been accessed since a given date. | |||||
Args: | |||||
since_ms: the timestamp to select on; every device with a last access date | |||||
before that time is returned. | |||||
Returns: | |||||
A dictionary with an entry for each user with at least one device matching | |||||
the request, whose value is a list of the device ID(s) for the corresponding | |||||
device(s). | |||||
""" | |||||
def get_devices_not_accessed_since_txn( | |||||
txn: LoggingTransaction, | |||||
) -> List[Dict[str, str]]: | |||||
sql = """ | |||||
SELECT user_id, device_id | |||||
FROM devices WHERE last_seen < ? AND hidden = FALSE | |||||
""" | |||||
txn.execute(sql, (since_ms,)) | |||||
return self.db_pool.cursor_to_dict(txn) | |||||
rows = await self.db_pool.runInteraction( | |||||
"get_devices_not_accessed_since", | |||||
get_devices_not_accessed_since_txn, | |||||
) | |||||
devices: Dict[str, List[str]] = {} | |||||
for row in rows: | |||||
# Remote devices are never stale from our point of view. | |||||
if self.hs.is_mine_id(row["user_id"]): | |||||
user_devices = devices.setdefault(row["user_id"], []) | |||||
user_devices.append(row["device_id"]) | |||||
return devices | |||||
class DeviceBackgroundUpdateStore(SQLBaseStore): | class DeviceBackgroundUpdateStore(SQLBaseStore): | ||||
def __init__( | def __init__( | ||||
@@ -13,8 +13,13 @@ | |||||
# limitations under the License. | # limitations under the License. | ||||
from http import HTTPStatus | from http import HTTPStatus | ||||
from twisted.test.proto_helpers import MemoryReactor | |||||
from synapse.api.errors import NotFoundError | |||||
from synapse.rest import admin, devices, room, sync | from synapse.rest import admin, devices, room, sync | ||||
from synapse.rest.client import account, login, register | from synapse.rest.client import account, login, register | ||||
from synapse.server import HomeServer | |||||
from synapse.util import Clock | |||||
from tests import unittest | from tests import unittest | ||||
@@ -157,3 +162,41 @@ class DeviceListsTestCase(unittest.HomeserverTestCase): | |||||
self.assertNotIn( | self.assertNotIn( | ||||
alice_user_id, changed_device_lists, bob_sync_channel.json_body | alice_user_id, changed_device_lists, bob_sync_channel.json_body | ||||
) | ) | ||||
class DevicesTestCase(unittest.HomeserverTestCase): | |||||
servlets = [ | |||||
admin.register_servlets, | |||||
login.register_servlets, | |||||
sync.register_servlets, | |||||
] | |||||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: | |||||
self.handler = hs.get_device_handler() | |||||
@unittest.override_config({"delete_stale_devices_after": 72000000}) | |||||
def test_delete_stale_devices(self) -> None: | |||||
"""Tests that stale devices are automatically removed after a set time of | |||||
inactivity. | |||||
The configuration is set to delete devices that haven't been used in the past 20h. | |||||
""" | |||||
# Register a user and create 2 devices for them. | |||||
user_id = self.register_user("user", "password") | |||||
tok1 = self.login("user", "password", device_id="abc") | |||||
tok2 = self.login("user", "password", device_id="def") | |||||
# Sync them so they have a last_seen value. | |||||
self.make_request("GET", "/sync", access_token=tok1) | |||||
self.make_request("GET", "/sync", access_token=tok2) | |||||
# Advance half a day and sync again with one of the devices, so that the next | |||||
# time the background job runs we don't delete this device (since it will look | |||||
# for devices that haven't been used for over 20 hours). | |||||
self.reactor.advance(43200) | |||||
self.make_request("GET", "/sync", access_token=tok1) | |||||
# Advance another half a day, and check that the device that has synced still | |||||
# exists but the one that hasn't has been removed. | |||||
self.reactor.advance(43200) | |||||
self.get_success(self.handler.get_device(user_id, "abc")) | |||||
self.get_failure(self.handler.get_device(user_id, "def"), NotFoundError) |