@@ -722,6 +722,84 @@ class PresenceHandler(BaseHandler): | |||
) | |||
class PresenceEventSource(object): | |||
def __init__(self, hs): | |||
self.hs = hs | |||
self.clock = hs.get_clock() | |||
def get_new_events_for_user(self, user, from_token, limit): | |||
from_key = int(from_token.presence_key) | |||
presence = self.hs.get_handlers().presence_handler | |||
cachemap = presence._user_cachemap | |||
# TODO(paul): limit, and filter by visibility | |||
updates = [(k, cachemap[k]) for k in cachemap | |||
if from_key < cachemap[k].serial] | |||
if updates: | |||
clock = self.clock | |||
latest_serial = max([x[1].serial for x in updates]) | |||
data = [x[1].make_event(user=x[0], clock=clock) for x in updates] | |||
end_token = from_token.copy_and_replace( | |||
"presence_key", latest_serial | |||
) | |||
return ((data, end_token)) | |||
else: | |||
end_token = from_token.copy_and_replace( | |||
"presence_key", presence._user_cachemap_latest_serial | |||
) | |||
return (([], end_token)) | |||
def get_current_token_part(self): | |||
presence = self.hs.get_handlers().presence_handler | |||
return presence._user_cachemap_latest_serial | |||
def get_pagination_rows(self, user, pagination_config, key): | |||
# TODO (erikj): Does this make sense? Ordering? | |||
from_token = pagination_config.from_token | |||
to_token = pagination_config.to_token | |||
from_key = int(from_token.presence_key) | |||
if to_token: | |||
to_key = int(to_token.presence_key) | |||
else: | |||
to_key = -1 | |||
presence = self.hs.get_handlers().presence_handler | |||
cachemap = presence._user_cachemap | |||
# TODO(paul): limit, and filter by visibility | |||
updates = [(k, cachemap[k]) for k in cachemap | |||
if to_key < cachemap[k].serial < from_key] | |||
if updates: | |||
clock = self.clock | |||
earliest_serial = max([x[1].serial for x in updates]) | |||
data = [x[1].make_event(user=x[0], clock=clock) for x in updates] | |||
if to_token: | |||
next_token = to_token | |||
else: | |||
next_token = from_token | |||
next_token = next_token.copy_and_replace( | |||
"presence_key", earliest_serial | |||
) | |||
return ((data, next_token)) | |||
else: | |||
if not to_token: | |||
to_token = from_token.copy_and_replace( | |||
"presence_key", 0 | |||
) | |||
return (([], to_token)) | |||
class UserPresenceCache(object): | |||
"""Store an observed user's state and status message. | |||
@@ -462,3 +462,51 @@ class RoomListHandler(BaseRoomHandler): | |||
chunk = yield self.store.get_rooms(is_public=True) | |||
# FIXME (erikj): START is no longer a valid value | |||
defer.returnValue({"start": "START", "end": "END", "chunk": chunk}) | |||
class RoomEventSource(object): | |||
def __init__(self, hs): | |||
self.store = hs.get_datastore() | |||
@defer.inlineCallbacks | |||
def get_new_events_for_user(self, user, from_token, limit): | |||
# We just ignore the key for now. | |||
to_key = yield self.get_current_token_part() | |||
events, end_key = yield self.store.get_room_events_stream( | |||
user_id=user.to_string(), | |||
from_key=from_token.events_key, | |||
to_key=to_key, | |||
room_id=None, | |||
limit=limit, | |||
) | |||
end_token = from_token.copy_and_replace("events_key", end_key) | |||
defer.returnValue((events, end_token)) | |||
def get_current_token_part(self): | |||
return self.store.get_room_events_max_id() | |||
@defer.inlineCallbacks | |||
def get_pagination_rows(self, user, pagination_config, key): | |||
from_token = pagination_config.from_token | |||
to_token = pagination_config.to_token | |||
limit = pagination_config.limit | |||
direction = pagination_config.direction | |||
to_key = to_token.events_key if to_token else None | |||
events, next_key = yield self.store.paginate_room_events( | |||
room_id=key, | |||
from_key=from_token.events_key, | |||
to_key=to_key, | |||
direction=direction, | |||
limit=limit, | |||
with_feedback=True | |||
) | |||
next_token = from_token.copy_and_replace("events_key", next_key) | |||
defer.returnValue((events, next_token)) |
@@ -17,6 +17,9 @@ from twisted.internet import defer | |||
from synapse.types import StreamToken | |||
from synapse.handlers.presence import PresenceEventSource | |||
from synapse.handlers.room import RoomEventSource | |||
class NullSource(object): | |||
"""This event source never yields any events and its token remains at | |||
@@ -34,136 +37,10 @@ class NullSource(object): | |||
return defer.succeed(([], pagination_config.from_token)) | |||
class RoomEventSource(object): | |||
def __init__(self, hs): | |||
self.store = hs.get_datastore() | |||
@defer.inlineCallbacks | |||
def get_new_events_for_user(self, user, from_token, limit): | |||
# We just ignore the key for now. | |||
to_key = yield self.get_current_token_part() | |||
events, end_key = yield self.store.get_room_events_stream( | |||
user_id=user.to_string(), | |||
from_key=from_token.events_key, | |||
to_key=to_key, | |||
room_id=None, | |||
limit=limit, | |||
) | |||
end_token = from_token.copy_and_replace("events_key", end_key) | |||
defer.returnValue((events, end_token)) | |||
def get_current_token_part(self): | |||
return self.store.get_room_events_max_id() | |||
@defer.inlineCallbacks | |||
def get_pagination_rows(self, user, pagination_config, key): | |||
from_token = pagination_config.from_token | |||
to_token = pagination_config.to_token | |||
limit = pagination_config.limit | |||
direction = pagination_config.direction | |||
to_key = to_token.events_key if to_token else None | |||
events, next_key = yield self.store.paginate_room_events( | |||
room_id=key, | |||
from_key=from_token.events_key, | |||
to_key=to_key, | |||
direction=direction, | |||
limit=limit, | |||
with_feedback=True | |||
) | |||
next_token = from_token.copy_and_replace("events_key", next_key) | |||
defer.returnValue((events, next_token)) | |||
class PresenceSource(object): | |||
def __init__(self, hs): | |||
self.hs = hs | |||
self.clock = hs.get_clock() | |||
def get_new_events_for_user(self, user, from_token, limit): | |||
from_key = int(from_token.presence_key) | |||
presence = self.hs.get_handlers().presence_handler | |||
cachemap = presence._user_cachemap | |||
# TODO(paul): limit, and filter by visibility | |||
updates = [(k, cachemap[k]) for k in cachemap | |||
if from_key < cachemap[k].serial] | |||
if updates: | |||
clock = self.clock | |||
latest_serial = max([x[1].serial for x in updates]) | |||
data = [x[1].make_event(user=x[0], clock=clock) for x in updates] | |||
end_token = from_token.copy_and_replace( | |||
"presence_key", latest_serial | |||
) | |||
return ((data, end_token)) | |||
else: | |||
end_token = from_token.copy_and_replace( | |||
"presence_key", presence._user_cachemap_latest_serial | |||
) | |||
return (([], end_token)) | |||
def get_current_token_part(self): | |||
presence = self.hs.get_handlers().presence_handler | |||
return presence._user_cachemap_latest_serial | |||
def get_pagination_rows(self, user, pagination_config, key): | |||
# TODO (erikj): Does this make sense? Ordering? | |||
from_token = pagination_config.from_token | |||
to_token = pagination_config.to_token | |||
from_key = int(from_token.presence_key) | |||
if to_token: | |||
to_key = int(to_token.presence_key) | |||
else: | |||
to_key = -1 | |||
presence = self.hs.get_handlers().presence_handler | |||
cachemap = presence._user_cachemap | |||
# TODO(paul): limit, and filter by visibility | |||
updates = [(k, cachemap[k]) for k in cachemap | |||
if to_key < cachemap[k].serial < from_key] | |||
if updates: | |||
clock = self.clock | |||
earliest_serial = max([x[1].serial for x in updates]) | |||
data = [x[1].make_event(user=x[0], clock=clock) for x in updates] | |||
if to_token: | |||
next_token = to_token | |||
else: | |||
next_token = from_token | |||
next_token = next_token.copy_and_replace( | |||
"presence_key", earliest_serial | |||
) | |||
return ((data, next_token)) | |||
else: | |||
if not to_token: | |||
to_token = from_token.copy_and_replace( | |||
"presence_key", 0 | |||
) | |||
return (([], to_token)) | |||
class EventSources(object): | |||
SOURCE_TYPES = { | |||
"room": RoomEventSource, | |||
"presence": PresenceSource, | |||
"presence": PresenceEventSource, | |||
} | |||
def __init__(self, hs): | |||
@@ -229,7 +229,7 @@ class PresenceEventStreamTestCase(unittest.TestCase): | |||
# HIDEOUS HACKERY | |||
# TODO(paul): This should be injected in via the HomeServer DI system | |||
from synapse.streams.events import ( | |||
PresenceSource, NullSource, EventSources | |||
PresenceEventSource, NullSource, EventSources | |||
) | |||
old_SOURCE_TYPES = EventSources.SOURCE_TYPES | |||
@@ -240,7 +240,7 @@ class PresenceEventStreamTestCase(unittest.TestCase): | |||
EventSources.SOURCE_TYPES = { | |||
k: NullSource for k in old_SOURCE_TYPES.keys() | |||
} | |||
EventSources.SOURCE_TYPES["presence"] = PresenceSource | |||
EventSources.SOURCE_TYPES["presence"] = PresenceEventSource | |||
hs = HomeServer("test", | |||
db_pool=None, | |||