@@ -64,7 +64,13 @@ from synapse.replication.tcp.commands import ClearUserSyncsCommand
from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream
from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream
from synapse.storage.databases.main import DataStore
from synapse.storage.databases.main import DataStore
from synapse.streams import EventSource
from synapse.streams import EventSource
from synapse.types import JsonDict, StreamKeyType, UserID, get_domain_from_id
from synapse.types import (
JsonDict,
StrCollection,
StreamKeyType,
UserID,
get_domain_from_id,
)
from synapse.util.async_helpers import Linearizer
from synapse.util.async_helpers import Linearizer
from synapse.util.metrics import Measure
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
from synapse.util.wheel_timer import WheelTimer
@@ -320,7 +326,7 @@ class BasePresenceHandler(abc.ABC):
for destination, host_states in hosts_to_states.items():
for destination, host_states in hosts_to_states.items():
self._federation.send_presence_to_destinations(host_states, [destination])
self._federation.send_presence_to_destinations(host_states, [destination])
async def send_full_presence_to_users(self, user_ids: Collection[str] ) -> None:
async def send_full_presence_to_users(self, user_ids: Str Collection) -> None:
"""
"""
Adds to the list of users who should receive a full snapshot of presence
Adds to the list of users who should receive a full snapshot of presence
upon their next sync. Note that this only works for local users.
upon their next sync. Note that this only works for local users.
@@ -1601,7 +1607,7 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
# Having a default limit doesn't match the EventSource API, but some
# Having a default limit doesn't match the EventSource API, but some
# callers do not provide it. It is unused in this class.
# callers do not provide it. It is unused in this class.
limit: int = 0,
limit: int = 0,
room_ids: Optional[Collection[str] ] = None,
room_ids: Optional[Str Collection] = None,
is_guest: bool = False,
is_guest: bool = False,
explicit_room_id: Optional[str] = None,
explicit_room_id: Optional[str] = None,
include_offline: bool = True,
include_offline: bool = True,
@@ -1688,7 +1694,7 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
# The set of users that we're interested in and that have had a presence update.
# The set of users that we're interested in and that have had a presence update.
# We'll actually pull the presence updates for these users at the end.
# We'll actually pull the presence updates for these users at the end.
interested_and_updated_users: Collection[str]
interested_and_updated_users: Str Collection
if from_key is not None:
if from_key is not None:
# First get all users that have had a presence update
# First get all users that have had a presence update
@@ -2120,7 +2126,7 @@ class PresenceFederationQueue:
# stream_id, destinations, user_ids)`. We don't store the full states
# stream_id, destinations, user_ids)`. We don't store the full states
# for efficiency, and remote workers will already have the full states
# for efficiency, and remote workers will already have the full states
# cached.
# cached.
self._queue: List[Tuple[int, int, Collection[str] , Set[str]]] = []
self._queue: List[Tuple[int, int, Str Collection, Set[str]]] = []
self._next_id = 1
self._next_id = 1
@@ -2142,7 +2148,7 @@ class PresenceFederationQueue:
self._queue = self._queue[index:]
self._queue = self._queue[index:]
def send_presence_to_destinations(
def send_presence_to_destinations(
self, states: Collection[UserPresenceState], destinations: Collection[str]
self, states: Collection[UserPresenceState], destinations: Str Collection
) -> None:
) -> None:
"""Send the presence states to the given destinations.
"""Send the presence states to the given destinations.