@@ -0,0 +1 @@ | |||
Cleanup old worker datastore classes. Contributed by Nick @ Beeper (@fizzadar). |
@@ -28,10 +28,6 @@ from synapse.config.homeserver import HomeServerConfig | |||
from synapse.config.logger import setup_logging | |||
from synapse.events import EventBase | |||
from synapse.handlers.admin import ExfiltrationWriter | |||
from synapse.replication.slave.storage.devices import SlavedDeviceStore | |||
from synapse.replication.slave.storage.events import SlavedEventStore | |||
from synapse.replication.slave.storage.filtering import SlavedFilteringStore | |||
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore | |||
from synapse.server import HomeServer | |||
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection | |||
from synapse.storage.databases.main.account_data import AccountDataWorkerStore | |||
@@ -40,10 +36,24 @@ from synapse.storage.databases.main.appservice import ( | |||
ApplicationServiceWorkerStore, | |||
) | |||
from synapse.storage.databases.main.deviceinbox import DeviceInboxWorkerStore | |||
from synapse.storage.databases.main.devices import DeviceWorkerStore | |||
from synapse.storage.databases.main.event_federation import EventFederationWorkerStore | |||
from synapse.storage.databases.main.event_push_actions import ( | |||
EventPushActionsWorkerStore, | |||
) | |||
from synapse.storage.databases.main.events_worker import EventsWorkerStore | |||
from synapse.storage.databases.main.filtering import FilteringWorkerStore | |||
from synapse.storage.databases.main.push_rule import PushRulesWorkerStore | |||
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore | |||
from synapse.storage.databases.main.registration import RegistrationWorkerStore | |||
from synapse.storage.databases.main.relations import RelationsWorkerStore | |||
from synapse.storage.databases.main.room import RoomWorkerStore | |||
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore | |||
from synapse.storage.databases.main.signatures import SignatureWorkerStore | |||
from synapse.storage.databases.main.state import StateGroupWorkerStore | |||
from synapse.storage.databases.main.stream import StreamWorkerStore | |||
from synapse.storage.databases.main.tags import TagsWorkerStore | |||
from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore | |||
from synapse.types import StateMap | |||
from synapse.util import SYNAPSE_VERSION | |||
from synapse.util.logcontext import LoggingContext | |||
@@ -52,17 +62,25 @@ logger = logging.getLogger("synapse.app.admin_cmd") | |||
class AdminCmdSlavedStore( | |||
SlavedFilteringStore, | |||
SlavedPushRuleStore, | |||
SlavedEventStore, | |||
SlavedDeviceStore, | |||
FilteringWorkerStore, | |||
DeviceWorkerStore, | |||
TagsWorkerStore, | |||
DeviceInboxWorkerStore, | |||
AccountDataWorkerStore, | |||
PushRulesWorkerStore, | |||
ApplicationServiceTransactionWorkerStore, | |||
ApplicationServiceWorkerStore, | |||
RegistrationWorkerStore, | |||
RoomMemberWorkerStore, | |||
RelationsWorkerStore, | |||
EventFederationWorkerStore, | |||
EventPushActionsWorkerStore, | |||
StateGroupWorkerStore, | |||
SignatureWorkerStore, | |||
UserErasureWorkerStore, | |||
ReceiptsWorkerStore, | |||
StreamWorkerStore, | |||
EventsWorkerStore, | |||
RegistrationWorkerStore, | |||
RoomWorkerStore, | |||
): | |||
def __init__( | |||
@@ -48,12 +48,6 @@ from synapse.http.site import SynapseRequest, SynapseSite | |||
from synapse.logging.context import LoggingContext | |||
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy | |||
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource | |||
from synapse.replication.slave.storage.devices import SlavedDeviceStore | |||
from synapse.replication.slave.storage.events import SlavedEventStore | |||
from synapse.replication.slave.storage.filtering import SlavedFilteringStore | |||
from synapse.replication.slave.storage.keys import SlavedKeyStore | |||
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore | |||
from synapse.replication.slave.storage.pushers import SlavedPusherStore | |||
from synapse.rest.admin import register_servlets_for_media_repo | |||
from synapse.rest.client import ( | |||
account_data, | |||
@@ -101,8 +95,16 @@ from synapse.storage.databases.main.appservice import ( | |||
from synapse.storage.databases.main.censor_events import CensorEventsStore | |||
from synapse.storage.databases.main.client_ips import ClientIpWorkerStore | |||
from synapse.storage.databases.main.deviceinbox import DeviceInboxWorkerStore | |||
from synapse.storage.databases.main.devices import DeviceWorkerStore | |||
from synapse.storage.databases.main.directory import DirectoryWorkerStore | |||
from synapse.storage.databases.main.e2e_room_keys import EndToEndRoomKeyStore | |||
from synapse.storage.databases.main.event_federation import EventFederationWorkerStore | |||
from synapse.storage.databases.main.event_push_actions import ( | |||
EventPushActionsWorkerStore, | |||
) | |||
from synapse.storage.databases.main.events_worker import EventsWorkerStore | |||
from synapse.storage.databases.main.filtering import FilteringWorkerStore | |||
from synapse.storage.databases.main.keys import KeyStore | |||
from synapse.storage.databases.main.lock import LockStore | |||
from synapse.storage.databases.main.media_repository import MediaRepositoryStore | |||
from synapse.storage.databases.main.metrics import ServerMetricsStore | |||
@@ -111,17 +113,25 @@ from synapse.storage.databases.main.monthly_active_users import ( | |||
) | |||
from synapse.storage.databases.main.presence import PresenceStore | |||
from synapse.storage.databases.main.profile import ProfileWorkerStore | |||
from synapse.storage.databases.main.push_rule import PushRulesWorkerStore | |||
from synapse.storage.databases.main.pusher import PusherWorkerStore | |||
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore | |||
from synapse.storage.databases.main.registration import RegistrationWorkerStore | |||
from synapse.storage.databases.main.relations import RelationsWorkerStore | |||
from synapse.storage.databases.main.room import RoomWorkerStore | |||
from synapse.storage.databases.main.room_batch import RoomBatchStore | |||
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore | |||
from synapse.storage.databases.main.search import SearchStore | |||
from synapse.storage.databases.main.session import SessionStore | |||
from synapse.storage.databases.main.signatures import SignatureWorkerStore | |||
from synapse.storage.databases.main.state import StateGroupWorkerStore | |||
from synapse.storage.databases.main.stats import StatsStore | |||
from synapse.storage.databases.main.stream import StreamWorkerStore | |||
from synapse.storage.databases.main.tags import TagsWorkerStore | |||
from synapse.storage.databases.main.transactions import TransactionWorkerStore | |||
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore | |||
from synapse.storage.databases.main.user_directory import UserDirectoryStore | |||
from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore | |||
from synapse.types import JsonDict | |||
from synapse.util import SYNAPSE_VERSION | |||
from synapse.util.httpresourcetree import create_resource_tree | |||
@@ -232,26 +242,36 @@ class GenericWorkerSlavedStore( | |||
EndToEndRoomKeyStore, | |||
PresenceStore, | |||
DeviceInboxWorkerStore, | |||
SlavedDeviceStore, | |||
SlavedPushRuleStore, | |||
DeviceWorkerStore, | |||
TagsWorkerStore, | |||
AccountDataWorkerStore, | |||
SlavedPusherStore, | |||
CensorEventsStore, | |||
ClientIpWorkerStore, | |||
SlavedEventStore, | |||
SlavedKeyStore, | |||
# KeyStore isn't really safe to use from a worker, but for now we do so and hope that | |||
# the races it creates aren't too bad. | |||
KeyStore, | |||
RoomWorkerStore, | |||
RoomBatchStore, | |||
DirectoryWorkerStore, | |||
PushRulesWorkerStore, | |||
ApplicationServiceTransactionWorkerStore, | |||
ApplicationServiceWorkerStore, | |||
ProfileWorkerStore, | |||
SlavedFilteringStore, | |||
FilteringWorkerStore, | |||
MonthlyActiveUsersWorkerStore, | |||
MediaRepositoryStore, | |||
ServerMetricsStore, | |||
PusherWorkerStore, | |||
RoomMemberWorkerStore, | |||
RelationsWorkerStore, | |||
EventFederationWorkerStore, | |||
EventPushActionsWorkerStore, | |||
StateGroupWorkerStore, | |||
SignatureWorkerStore, | |||
UserErasureWorkerStore, | |||
ReceiptsWorkerStore, | |||
StreamWorkerStore, | |||
EventsWorkerStore, | |||
RegistrationWorkerStore, | |||
SearchStore, | |||
TransactionWorkerStore, | |||
@@ -1,79 +0,0 @@ | |||
# Copyright 2016 OpenMarket Ltd | |||
# | |||
# Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | |||
# You may obtain a copy of the License at | |||
# | |||
# http://www.apache.org/licenses/LICENSE-2.0 | |||
# | |||
# Unless required by applicable law or agreed to in writing, software | |||
# distributed under the License is distributed on an "AS IS" BASIS, | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
from typing import TYPE_CHECKING, Any, Iterable | |||
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker | |||
from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream | |||
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection | |||
from synapse.storage.databases.main.devices import DeviceWorkerStore | |||
if TYPE_CHECKING: | |||
from synapse.server import HomeServer | |||
class SlavedDeviceStore(DeviceWorkerStore): | |||
def __init__( | |||
self, | |||
database: DatabasePool, | |||
db_conn: LoggingDatabaseConnection, | |||
hs: "HomeServer", | |||
): | |||
self.hs = hs | |||
self._device_list_id_gen = SlavedIdTracker( | |||
db_conn, | |||
"device_lists_stream", | |||
"stream_id", | |||
extra_tables=[ | |||
("user_signature_stream", "stream_id"), | |||
("device_lists_outbound_pokes", "stream_id"), | |||
("device_lists_changes_in_room", "stream_id"), | |||
], | |||
) | |||
super().__init__(database, db_conn, hs) | |||
def get_device_stream_token(self) -> int: | |||
return self._device_list_id_gen.get_current_token() | |||
def process_replication_rows( | |||
self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] | |||
) -> None: | |||
if stream_name == DeviceListsStream.NAME: | |||
self._device_list_id_gen.advance(instance_name, token) | |||
self._invalidate_caches_for_devices(token, rows) | |||
elif stream_name == UserSignatureStream.NAME: | |||
self._device_list_id_gen.advance(instance_name, token) | |||
for row in rows: | |||
self._user_signature_stream_cache.entity_has_changed(row.user_id, token) | |||
return super().process_replication_rows(stream_name, instance_name, token, rows) | |||
def _invalidate_caches_for_devices( | |||
self, token: int, rows: Iterable[DeviceListsStream.DeviceListsStreamRow] | |||
) -> None: | |||
for row in rows: | |||
# The entities are either user IDs (starting with '@') whose devices | |||
# have changed, or remote servers that we need to tell about | |||
# changes. | |||
if row.entity.startswith("@"): | |||
self._device_list_stream_cache.entity_has_changed(row.entity, token) | |||
self.get_cached_devices_for_user.invalidate((row.entity,)) | |||
self._get_cached_user_device.invalidate((row.entity,)) | |||
self.get_device_list_last_stream_id_for_remote.invalidate((row.entity,)) | |||
else: | |||
self._device_list_federation_stream_cache.entity_has_changed( | |||
row.entity, token | |||
) |
@@ -1,79 +0,0 @@ | |||
# Copyright 2016 OpenMarket Ltd | |||
# Copyright 2018 New Vector Ltd | |||
# | |||
# Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | |||
# You may obtain a copy of the License at | |||
# | |||
# http://www.apache.org/licenses/LICENSE-2.0 | |||
# | |||
# Unless required by applicable law or agreed to in writing, software | |||
# distributed under the License is distributed on an "AS IS" BASIS, | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
import logging | |||
from typing import TYPE_CHECKING | |||
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection | |||
from synapse.storage.databases.main.event_federation import EventFederationWorkerStore | |||
from synapse.storage.databases.main.event_push_actions import ( | |||
EventPushActionsWorkerStore, | |||
) | |||
from synapse.storage.databases.main.events_worker import EventsWorkerStore | |||
from synapse.storage.databases.main.relations import RelationsWorkerStore | |||
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore | |||
from synapse.storage.databases.main.signatures import SignatureWorkerStore | |||
from synapse.storage.databases.main.state import StateGroupWorkerStore | |||
from synapse.storage.databases.main.stream import StreamWorkerStore | |||
from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore | |||
from synapse.util.caches.stream_change_cache import StreamChangeCache | |||
if TYPE_CHECKING: | |||
from synapse.server import HomeServer | |||
logger = logging.getLogger(__name__) | |||
# So, um, we want to borrow a load of functions intended for reading from | |||
# a DataStore, but we don't want to take functions that either write to the | |||
# DataStore or are cached and don't have cache invalidation logic. | |||
# | |||
# Rather than write duplicate versions of those functions, or lift them to | |||
# a common base class, we going to grab the underlying __func__ object from | |||
# the method descriptor on the DataStore and chuck them into our class. | |||
class SlavedEventStore( | |||
EventFederationWorkerStore, | |||
RoomMemberWorkerStore, | |||
EventPushActionsWorkerStore, | |||
StreamWorkerStore, | |||
StateGroupWorkerStore, | |||
SignatureWorkerStore, | |||
EventsWorkerStore, | |||
UserErasureWorkerStore, | |||
RelationsWorkerStore, | |||
): | |||
def __init__( | |||
self, | |||
database: DatabasePool, | |||
db_conn: LoggingDatabaseConnection, | |||
hs: "HomeServer", | |||
): | |||
super().__init__(database, db_conn, hs) | |||
events_max = self._stream_id_gen.get_current_token() | |||
curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict( | |||
db_conn, | |||
"current_state_delta_stream", | |||
entity_column="room_id", | |||
stream_column="stream_id", | |||
max_value=events_max, # As we share the stream id with events token | |||
limit=1000, | |||
) | |||
self._curr_state_delta_stream_cache = StreamChangeCache( | |||
"_curr_state_delta_stream_cache", | |||
min_curr_state_delta_id, | |||
prefilled_cache=curr_state_delta_prefill, | |||
) |
@@ -1,35 +0,0 @@ | |||
# Copyright 2015, 2016 OpenMarket Ltd | |||
# | |||
# Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | |||
# You may obtain a copy of the License at | |||
# | |||
# http://www.apache.org/licenses/LICENSE-2.0 | |||
# | |||
# Unless required by applicable law or agreed to in writing, software | |||
# distributed under the License is distributed on an "AS IS" BASIS, | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
from typing import TYPE_CHECKING | |||
from synapse.storage._base import SQLBaseStore | |||
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection | |||
from synapse.storage.databases.main.filtering import FilteringStore | |||
if TYPE_CHECKING: | |||
from synapse.server import HomeServer | |||
class SlavedFilteringStore(SQLBaseStore): | |||
def __init__( | |||
self, | |||
database: DatabasePool, | |||
db_conn: LoggingDatabaseConnection, | |||
hs: "HomeServer", | |||
): | |||
super().__init__(database, db_conn, hs) | |||
# Filters are immutable so this cache doesn't need to be expired | |||
get_user_filter = FilteringStore.__dict__["get_user_filter"] |
@@ -1,20 +0,0 @@ | |||
# Copyright 2015, 2016 OpenMarket Ltd | |||
# | |||
# Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | |||
# You may obtain a copy of the License at | |||
# | |||
# http://www.apache.org/licenses/LICENSE-2.0 | |||
# | |||
# Unless required by applicable law or agreed to in writing, software | |||
# distributed under the License is distributed on an "AS IS" BASIS, | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
from synapse.storage.databases.main.keys import KeyStore | |||
# KeyStore isn't really safe to use from a worker, but for now we do so and hope that | |||
# the races it creates aren't too bad. | |||
SlavedKeyStore = KeyStore |
@@ -1,35 +0,0 @@ | |||
# Copyright 2015, 2016 OpenMarket Ltd | |||
# Copyright 2018 New Vector Ltd | |||
# | |||
# Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | |||
# You may obtain a copy of the License at | |||
# | |||
# http://www.apache.org/licenses/LICENSE-2.0 | |||
# | |||
# Unless required by applicable law or agreed to in writing, software | |||
# distributed under the License is distributed on an "AS IS" BASIS, | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
from typing import Any, Iterable | |||
from synapse.replication.tcp.streams import PushRulesStream | |||
from synapse.storage.databases.main.push_rule import PushRulesWorkerStore | |||
from .events import SlavedEventStore | |||
class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore): | |||
def get_max_push_rules_stream_id(self) -> int: | |||
return self._push_rules_stream_id_gen.get_current_token() | |||
def process_replication_rows( | |||
self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] | |||
) -> None: | |||
if stream_name == PushRulesStream.NAME: | |||
self._push_rules_stream_id_gen.advance(instance_name, token) | |||
for row in rows: | |||
self.get_push_rules_for_user.invalidate((row.user_id,)) | |||
self.push_rules_stream_cache.entity_has_changed(row.user_id, token) | |||
return super().process_replication_rows(stream_name, instance_name, token, rows) |
@@ -1,47 +0,0 @@ | |||
# Copyright 2016 OpenMarket Ltd | |||
# Copyright 2018 New Vector Ltd | |||
# | |||
# Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | |||
# You may obtain a copy of the License at | |||
# | |||
# http://www.apache.org/licenses/LICENSE-2.0 | |||
# | |||
# Unless required by applicable law or agreed to in writing, software | |||
# distributed under the License is distributed on an "AS IS" BASIS, | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
from typing import TYPE_CHECKING, Any, Iterable | |||
from synapse.replication.tcp.streams import PushersStream | |||
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection | |||
from synapse.storage.databases.main.pusher import PusherWorkerStore | |||
from ._slaved_id_tracker import SlavedIdTracker | |||
if TYPE_CHECKING: | |||
from synapse.server import HomeServer | |||
class SlavedPusherStore(PusherWorkerStore): | |||
def __init__( | |||
self, | |||
database: DatabasePool, | |||
db_conn: LoggingDatabaseConnection, | |||
hs: "HomeServer", | |||
): | |||
super().__init__(database, db_conn, hs) | |||
self._pushers_id_gen = SlavedIdTracker( # type: ignore | |||
db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")] | |||
) | |||
def get_pushers_stream_token(self) -> int: | |||
return self._pushers_id_gen.get_current_token() | |||
def process_replication_rows( | |||
self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] | |||
) -> None: | |||
if stream_name == PushersStream.NAME: | |||
self._pushers_id_gen.advance(instance_name, token) | |||
return super().process_replication_rows(stream_name, instance_name, token, rows) |
@@ -26,9 +26,7 @@ from synapse.storage.database import ( | |||
from synapse.storage.databases.main.stats import UserSortOrder | |||
from synapse.storage.engines import BaseDatabaseEngine | |||
from synapse.storage.types import Cursor | |||
from synapse.storage.util.id_generators import StreamIdGenerator | |||
from synapse.types import JsonDict, get_domain_from_id | |||
from synapse.util.caches.stream_change_cache import StreamChangeCache | |||
from .account_data import AccountDataStore | |||
from .appservice import ApplicationServiceStore, ApplicationServiceTransactionStore | |||
@@ -138,41 +136,8 @@ class DataStore( | |||
self._clock = hs.get_clock() | |||
self.database_engine = database.engine | |||
self._device_list_id_gen = StreamIdGenerator( | |||
db_conn, | |||
"device_lists_stream", | |||
"stream_id", | |||
extra_tables=[ | |||
("user_signature_stream", "stream_id"), | |||
("device_lists_outbound_pokes", "stream_id"), | |||
("device_lists_changes_in_room", "stream_id"), | |||
], | |||
) | |||
super().__init__(database, db_conn, hs) | |||
events_max = self._stream_id_gen.get_current_token() | |||
curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict( | |||
db_conn, | |||
"current_state_delta_stream", | |||
entity_column="room_id", | |||
stream_column="stream_id", | |||
max_value=events_max, # As we share the stream id with events token | |||
limit=1000, | |||
) | |||
self._curr_state_delta_stream_cache = StreamChangeCache( | |||
"_curr_state_delta_stream_cache", | |||
min_curr_state_delta_id, | |||
prefilled_cache=curr_state_delta_prefill, | |||
) | |||
self._stream_order_on_start = self.get_room_max_stream_ordering() | |||
self._min_stream_order_on_start = self.get_room_min_stream_ordering() | |||
def get_device_stream_token(self) -> int: | |||
# TODO: shouldn't this be moved to `DeviceWorkerStore`? | |||
return self._device_list_id_gen.get_current_token() | |||
async def get_users(self) -> List[JsonDict]: | |||
"""Function to retrieve a list of users in users table. | |||
@@ -13,7 +13,6 @@ | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
import abc | |||
import logging | |||
from typing import ( | |||
TYPE_CHECKING, | |||
@@ -39,6 +38,8 @@ from synapse.logging.opentracing import ( | |||
whitelisted_homeserver, | |||
) | |||
from synapse.metrics.background_process_metrics import wrap_as_background_process | |||
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker | |||
from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream | |||
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause | |||
from synapse.storage.database import ( | |||
DatabasePool, | |||
@@ -49,6 +50,11 @@ from synapse.storage.database import ( | |||
from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyWorkerStore | |||
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore | |||
from synapse.storage.types import Cursor | |||
from synapse.storage.util.id_generators import ( | |||
AbstractStreamIdGenerator, | |||
AbstractStreamIdTracker, | |||
StreamIdGenerator, | |||
) | |||
from synapse.types import JsonDict, get_verify_key_from_cross_signing_key | |||
from synapse.util import json_decoder, json_encoder | |||
from synapse.util.caches.descriptors import cached, cachedList | |||
@@ -80,9 +86,32 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): | |||
): | |||
super().__init__(database, db_conn, hs) | |||
if hs.config.worker.worker_app is None: | |||
self._device_list_id_gen: AbstractStreamIdTracker = StreamIdGenerator( | |||
db_conn, | |||
"device_lists_stream", | |||
"stream_id", | |||
extra_tables=[ | |||
("user_signature_stream", "stream_id"), | |||
("device_lists_outbound_pokes", "stream_id"), | |||
("device_lists_changes_in_room", "stream_id"), | |||
], | |||
) | |||
else: | |||
self._device_list_id_gen = SlavedIdTracker( | |||
db_conn, | |||
"device_lists_stream", | |||
"stream_id", | |||
extra_tables=[ | |||
("user_signature_stream", "stream_id"), | |||
("device_lists_outbound_pokes", "stream_id"), | |||
("device_lists_changes_in_room", "stream_id"), | |||
], | |||
) | |||
# Type-ignore: _device_list_id_gen is mixed in from either DataStore (as a | |||
# StreamIdGenerator) or SlavedDataStore (as a SlavedIdTracker). | |||
device_list_max = self._device_list_id_gen.get_current_token() # type: ignore[attr-defined] | |||
device_list_max = self._device_list_id_gen.get_current_token() | |||
device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict( | |||
db_conn, | |||
"device_lists_stream", | |||
@@ -136,6 +165,39 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): | |||
self._prune_old_outbound_device_pokes, 60 * 60 * 1000 | |||
) | |||
def process_replication_rows( | |||
self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] | |||
) -> None: | |||
if stream_name == DeviceListsStream.NAME: | |||
self._device_list_id_gen.advance(instance_name, token) | |||
self._invalidate_caches_for_devices(token, rows) | |||
elif stream_name == UserSignatureStream.NAME: | |||
self._device_list_id_gen.advance(instance_name, token) | |||
for row in rows: | |||
self._user_signature_stream_cache.entity_has_changed(row.user_id, token) | |||
return super().process_replication_rows(stream_name, instance_name, token, rows) | |||
def _invalidate_caches_for_devices( | |||
self, token: int, rows: Iterable[DeviceListsStream.DeviceListsStreamRow] | |||
) -> None: | |||
for row in rows: | |||
# The entities are either user IDs (starting with '@') whose devices | |||
# have changed, or remote servers that we need to tell about | |||
# changes. | |||
if row.entity.startswith("@"): | |||
self._device_list_stream_cache.entity_has_changed(row.entity, token) | |||
self.get_cached_devices_for_user.invalidate((row.entity,)) | |||
self._get_cached_user_device.invalidate((row.entity,)) | |||
self.get_device_list_last_stream_id_for_remote.invalidate((row.entity,)) | |||
else: | |||
self._device_list_federation_stream_cache.entity_has_changed( | |||
row.entity, token | |||
) | |||
def get_device_stream_token(self) -> int: | |||
return self._device_list_id_gen.get_current_token() | |||
async def count_devices_by_users(self, user_ids: Optional[List[str]] = None) -> int: | |||
"""Retrieve number of all devices of given users. | |||
Only returns number of devices that are not marked as hidden. | |||
@@ -677,11 +739,6 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): | |||
}, | |||
) | |||
@abc.abstractmethod | |||
def get_device_stream_token(self) -> int: | |||
"""Get the current stream id from the _device_list_id_gen""" | |||
... | |||
@trace | |||
@cancellable | |||
async def get_user_devices_from_cache( | |||
@@ -1481,6 +1538,10 @@ class DeviceBackgroundUpdateStore(SQLBaseStore): | |||
class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): | |||
# Because we have write access, this will be a StreamIdGenerator | |||
# (see DeviceWorkerStore.__init__) | |||
_device_list_id_gen: AbstractStreamIdGenerator | |||
def __init__( | |||
self, | |||
database: DatabasePool, | |||
@@ -1805,7 +1866,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): | |||
context, | |||
) | |||
async with self._device_list_id_gen.get_next_mult( # type: ignore[attr-defined] | |||
async with self._device_list_id_gen.get_next_mult( | |||
len(device_ids) | |||
) as stream_ids: | |||
await self.db_pool.runInteraction( | |||
@@ -2044,7 +2105,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): | |||
[], | |||
) | |||
async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids: # type: ignore[attr-defined] | |||
async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids: | |||
return await self.db_pool.runInteraction( | |||
"add_device_list_outbound_pokes", | |||
add_device_list_outbound_pokes_txn, | |||
@@ -2058,7 +2119,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): | |||
updates during partial joins. | |||
""" | |||
async with self._device_list_id_gen.get_next() as stream_id: # type: ignore[attr-defined] | |||
async with self._device_list_id_gen.get_next() as stream_id: | |||
await self.db_pool.simple_upsert( | |||
table="device_lists_remote_pending", | |||
keyvalues={ | |||
@@ -81,6 +81,7 @@ from synapse.util import unwrapFirstError | |||
from synapse.util.async_helpers import ObservableDeferred, delay_cancellation | |||
from synapse.util.caches.descriptors import cached, cachedList | |||
from synapse.util.caches.lrucache import AsyncLruCache | |||
from synapse.util.caches.stream_change_cache import StreamChangeCache | |||
from synapse.util.cancellation import cancellable | |||
from synapse.util.iterutils import batch_iter | |||
from synapse.util.metrics import Measure | |||
@@ -233,6 +234,21 @@ class EventsWorkerStore(SQLBaseStore): | |||
db_conn, "events", "stream_ordering", step=-1 | |||
) | |||
events_max = self._stream_id_gen.get_current_token() | |||
curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict( | |||
db_conn, | |||
"current_state_delta_stream", | |||
entity_column="room_id", | |||
stream_column="stream_id", | |||
max_value=events_max, # As we share the stream id with events token | |||
limit=1000, | |||
) | |||
self._curr_state_delta_stream_cache: StreamChangeCache = StreamChangeCache( | |||
"_curr_state_delta_stream_cache", | |||
min_curr_state_delta_id, | |||
prefilled_cache=curr_state_delta_prefill, | |||
) | |||
if hs.config.worker.run_background_tasks: | |||
# We periodically clean out old transaction ID mappings | |||
self._clock.looping_call( | |||
@@ -24,7 +24,7 @@ from synapse.types import JsonDict | |||
from synapse.util.caches.descriptors import cached | |||
class FilteringStore(SQLBaseStore): | |||
class FilteringWorkerStore(SQLBaseStore): | |||
@cached(num_args=2) | |||
async def get_user_filter( | |||
self, user_localpart: str, filter_id: Union[int, str] | |||
@@ -46,6 +46,8 @@ class FilteringStore(SQLBaseStore): | |||
return db_to_json(def_json) | |||
class FilteringStore(FilteringWorkerStore): | |||
async def add_user_filter(self, user_localpart: str, user_filter: JsonDict) -> int: | |||
def_json = encode_canonical_json(user_filter) | |||
@@ -12,13 +12,13 @@ | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
import abc | |||
import logging | |||
from typing import ( | |||
TYPE_CHECKING, | |||
Any, | |||
Collection, | |||
Dict, | |||
Iterable, | |||
List, | |||
Mapping, | |||
Optional, | |||
@@ -31,6 +31,7 @@ from typing import ( | |||
from synapse.api.errors import StoreError | |||
from synapse.config.homeserver import ExperimentalConfig | |||
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker | |||
from synapse.replication.tcp.streams import PushRulesStream | |||
from synapse.storage._base import SQLBaseStore | |||
from synapse.storage.database import ( | |||
DatabasePool, | |||
@@ -90,8 +91,6 @@ def _load_rules( | |||
return filtered_rules | |||
# The ABCMeta metaclass ensures that it cannot be instantiated without | |||
# the abstract methods being implemented. | |||
class PushRulesWorkerStore( | |||
ApplicationServiceWorkerStore, | |||
PusherWorkerStore, | |||
@@ -99,7 +98,6 @@ class PushRulesWorkerStore( | |||
ReceiptsWorkerStore, | |||
EventsWorkerStore, | |||
SQLBaseStore, | |||
metaclass=abc.ABCMeta, | |||
): | |||
"""This is an abstract base class where subclasses must implement | |||
`get_max_push_rules_stream_id` which can be called in the initializer. | |||
@@ -136,14 +134,23 @@ class PushRulesWorkerStore( | |||
prefilled_cache=push_rules_prefill, | |||
) | |||
@abc.abstractmethod | |||
def get_max_push_rules_stream_id(self) -> int: | |||
"""Get the position of the push rules stream. | |||
Returns: | |||
int | |||
""" | |||
raise NotImplementedError() | |||
return self._push_rules_stream_id_gen.get_current_token() | |||
def process_replication_rows( | |||
self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] | |||
) -> None: | |||
if stream_name == PushRulesStream.NAME: | |||
self._push_rules_stream_id_gen.advance(instance_name, token) | |||
for row in rows: | |||
self.get_push_rules_for_user.invalidate((row.user_id,)) | |||
self.push_rules_stream_cache.entity_has_changed(row.user_id, token) | |||
return super().process_replication_rows(stream_name, instance_name, token, rows) | |||
@cached(max_entries=5000) | |||
async def get_push_rules_for_user(self, user_id: str) -> FilteredPushRules: | |||
@@ -27,13 +27,19 @@ from typing import ( | |||
) | |||
from synapse.push import PusherConfig, ThrottleParams | |||
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker | |||
from synapse.replication.tcp.streams import PushersStream | |||
from synapse.storage._base import SQLBaseStore, db_to_json | |||
from synapse.storage.database import ( | |||
DatabasePool, | |||
LoggingDatabaseConnection, | |||
LoggingTransaction, | |||
) | |||
from synapse.storage.util.id_generators import StreamIdGenerator | |||
from synapse.storage.util.id_generators import ( | |||
AbstractStreamIdGenerator, | |||
AbstractStreamIdTracker, | |||
StreamIdGenerator, | |||
) | |||
from synapse.types import JsonDict | |||
from synapse.util import json_encoder | |||
from synapse.util.caches.descriptors import cached | |||
@@ -52,9 +58,21 @@ class PusherWorkerStore(SQLBaseStore): | |||
hs: "HomeServer", | |||
): | |||
super().__init__(database, db_conn, hs) | |||
self._pushers_id_gen = StreamIdGenerator( | |||
db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")] | |||
) | |||
if hs.config.worker.worker_app is None: | |||
self._pushers_id_gen: AbstractStreamIdTracker = StreamIdGenerator( | |||
db_conn, | |||
"pushers", | |||
"id", | |||
extra_tables=[("deleted_pushers", "stream_id")], | |||
) | |||
else: | |||
self._pushers_id_gen = SlavedIdTracker( | |||
db_conn, | |||
"pushers", | |||
"id", | |||
extra_tables=[("deleted_pushers", "stream_id")], | |||
) | |||
self.db_pool.updates.register_background_update_handler( | |||
"remove_deactivated_pushers", | |||
@@ -96,6 +114,16 @@ class PusherWorkerStore(SQLBaseStore): | |||
yield PusherConfig(**r) | |||
def get_pushers_stream_token(self) -> int: | |||
return self._pushers_id_gen.get_current_token() | |||
def process_replication_rows( | |||
self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] | |||
) -> None: | |||
if stream_name == PushersStream.NAME: | |||
self._pushers_id_gen.advance(instance_name, token) | |||
return super().process_replication_rows(stream_name, instance_name, token, rows) | |||
async def get_pushers_by_app_id_and_pushkey( | |||
self, app_id: str, pushkey: str | |||
) -> Iterator[PusherConfig]: | |||
@@ -545,8 +573,9 @@ class PusherBackgroundUpdatesStore(SQLBaseStore): | |||
class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore): | |||
def get_pushers_stream_token(self) -> int: | |||
return self._pushers_id_gen.get_current_token() | |||
# Because we have write access, this will be a StreamIdGenerator | |||
# (see PusherWorkerStore.__init__) | |||
_pushers_id_gen: AbstractStreamIdGenerator | |||
async def add_pusher( | |||
self, | |||
@@ -415,6 +415,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): | |||
) | |||
self._stream_order_on_start = self.get_room_max_stream_ordering() | |||
self._min_stream_order_on_start = self.get_room_min_stream_ordering() | |||
def get_room_max_stream_ordering(self) -> int: | |||
"""Get the stream_ordering of regular events that we have committed up to | |||
@@ -21,11 +21,11 @@ from synapse.api.constants import ReceiptTypes | |||
from synapse.api.room_versions import RoomVersions | |||
from synapse.events import FrozenEvent, _EventInternalMetadata, make_event_from_dict | |||
from synapse.handlers.room import RoomEventSource | |||
from synapse.replication.slave.storage.events import SlavedEventStore | |||
from synapse.storage.databases.main.event_push_actions import ( | |||
NotifCounts, | |||
RoomNotifCounts, | |||
) | |||
from synapse.storage.databases.main.events_worker import EventsWorkerStore | |||
from synapse.storage.roommember import GetRoomsForUserWithStreamOrdering, RoomsForUser | |||
from synapse.types import PersistedEventPosition | |||
@@ -58,9 +58,9 @@ def patch__eq__(cls): | |||
return unpatch | |||
class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): | |||
class EventsWorkerStoreTestCase(BaseSlavedStoreTestCase): | |||
STORE_TYPE = SlavedEventStore | |||
STORE_TYPE = EventsWorkerStore | |||
def setUp(self): | |||
# Patch up the equality operator for events so that we can check | |||