@@ -0,0 +1 @@ | |||
Reduce memory allocations. |
@@ -80,10 +80,6 @@ class UserPresenceState: | |||
def as_dict(self) -> JsonDict: | |||
return attr.asdict(self) | |||
@staticmethod | |||
def from_dict(d: JsonDict) -> "UserPresenceState": | |||
return UserPresenceState(**d) | |||
def copy_and_replace(self, **kwargs: Any) -> "UserPresenceState": | |||
return attr.evolve(self, **kwargs) | |||
@@ -395,7 +395,7 @@ class PresenceDestinationsRow(BaseFederationRow): | |||
@staticmethod | |||
def from_data(data: JsonDict) -> "PresenceDestinationsRow": | |||
return PresenceDestinationsRow( | |||
state=UserPresenceState.from_dict(data["state"]), destinations=data["dests"] | |||
state=UserPresenceState(**data["state"]), destinations=data["dests"] | |||
) | |||
def to_data(self) -> JsonDict: | |||
@@ -198,7 +198,13 @@ class DestinationMembershipRestServlet(RestServlet): | |||
rooms, total = await self._store.get_destination_rooms_paginate( | |||
destination, start, limit, direction | |||
) | |||
response = {"rooms": rooms, "total": total} | |||
response = { | |||
"rooms": [ | |||
{"room_id": room_id, "stream_ordering": stream_ordering} | |||
for room_id, stream_ordering in rooms | |||
], | |||
"total": total, | |||
} | |||
if (start + limit) < total: | |||
response["next_token"] = str(start + len(rooms)) | |||
@@ -2418,7 +2418,7 @@ class DatabasePool: | |||
keyvalues: Optional[Dict[str, Any]] = None, | |||
exclude_keyvalues: Optional[Dict[str, Any]] = None, | |||
order_direction: str = "ASC", | |||
) -> List[Dict[str, Any]]: | |||
) -> List[Tuple[Any, ...]]: | |||
""" | |||
Executes a SELECT query on the named table with start and limit, | |||
of row numbers, which may return zero or number of rows from start to limit, | |||
@@ -2447,7 +2447,7 @@ class DatabasePool: | |||
order_direction: Whether the results should be ordered "ASC" or "DESC". | |||
Returns: | |||
The result as a list of dictionaries. | |||
The result as a list of tuples. | |||
""" | |||
if order_direction not in ["ASC", "DESC"]: | |||
raise ValueError("order_direction must be one of 'ASC' or 'DESC'.") | |||
@@ -2474,7 +2474,7 @@ class DatabasePool: | |||
) | |||
txn.execute(sql, arg_list + [limit, start]) | |||
return cls.cursor_to_dict(txn) | |||
return txn.fetchall() | |||
async def simple_search_list( | |||
self, | |||
@@ -20,6 +20,7 @@ from typing import ( | |||
Mapping, | |||
Optional, | |||
Tuple, | |||
Union, | |||
cast, | |||
) | |||
@@ -385,28 +386,47 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore) | |||
limit = 100 | |||
offset = 0 | |||
while True: | |||
rows = await self.db_pool.runInteraction( | |||
"get_presence_for_all_users", | |||
self.db_pool.simple_select_list_paginate_txn, | |||
"presence_stream", | |||
orderby="stream_id", | |||
start=offset, | |||
limit=limit, | |||
exclude_keyvalues=exclude_keyvalues, | |||
retcols=( | |||
"user_id", | |||
"state", | |||
"last_active_ts", | |||
"last_federation_update_ts", | |||
"last_user_sync_ts", | |||
"status_msg", | |||
"currently_active", | |||
rows = cast( | |||
List[Tuple[str, str, int, int, int, Optional[str], Union[int, bool]]], | |||
await self.db_pool.runInteraction( | |||
"get_presence_for_all_users", | |||
self.db_pool.simple_select_list_paginate_txn, | |||
"presence_stream", | |||
orderby="stream_id", | |||
start=offset, | |||
limit=limit, | |||
exclude_keyvalues=exclude_keyvalues, | |||
retcols=( | |||
"user_id", | |||
"state", | |||
"last_active_ts", | |||
"last_federation_update_ts", | |||
"last_user_sync_ts", | |||
"status_msg", | |||
"currently_active", | |||
), | |||
order_direction="ASC", | |||
), | |||
order_direction="ASC", | |||
) | |||
for row in rows: | |||
users_to_state[row["user_id"]] = UserPresenceState(**row) | |||
for ( | |||
user_id, | |||
state, | |||
last_active_ts, | |||
last_federation_update_ts, | |||
last_user_sync_ts, | |||
status_msg, | |||
currently_active, | |||
) in rows: | |||
users_to_state[user_id] = UserPresenceState( | |||
user_id=user_id, | |||
state=state, | |||
last_active_ts=last_active_ts, | |||
last_federation_update_ts=last_federation_update_ts, | |||
last_user_sync_ts=last_user_sync_ts, | |||
status_msg=status_msg, | |||
currently_active=bool(currently_active), | |||
) | |||
# We've run out of updates to query | |||
if len(rows) < limit: | |||
@@ -526,7 +526,7 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): | |||
start: int, | |||
limit: int, | |||
direction: Direction = Direction.FORWARDS, | |||
) -> Tuple[List[JsonDict], int]: | |||
) -> Tuple[List[Tuple[str, int]], int]: | |||
"""Function to retrieve a paginated list of destination's rooms. | |||
This will return a json list of rooms and the | |||
total number of rooms. | |||
@@ -537,12 +537,14 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): | |||
limit: number of rows to retrieve | |||
direction: sort ascending or descending by room_id | |||
Returns: | |||
A tuple of a dict of rooms and a count of total rooms. | |||
A tuple of a list of room tuples and a count of total rooms. | |||
Each room tuple is room_id, stream_ordering. | |||
""" | |||
def get_destination_rooms_paginate_txn( | |||
txn: LoggingTransaction, | |||
) -> Tuple[List[JsonDict], int]: | |||
) -> Tuple[List[Tuple[str, int]], int]: | |||
if direction == Direction.BACKWARDS: | |||
order = "DESC" | |||
else: | |||
@@ -556,14 +558,17 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): | |||
txn.execute(sql, [destination]) | |||
count = cast(Tuple[int], txn.fetchone())[0] | |||
rooms = self.db_pool.simple_select_list_paginate_txn( | |||
txn=txn, | |||
table="destination_rooms", | |||
orderby="room_id", | |||
start=start, | |||
limit=limit, | |||
retcols=("room_id", "stream_ordering"), | |||
order_direction=order, | |||
rooms = cast( | |||
List[Tuple[str, int]], | |||
self.db_pool.simple_select_list_paginate_txn( | |||
txn=txn, | |||
table="destination_rooms", | |||
orderby="room_id", | |||
start=start, | |||
limit=limit, | |||
retcols=("room_id", "stream_ordering"), | |||
order_direction=order, | |||
), | |||
) | |||
return rooms, count | |||