|
|
@@ -67,7 +67,7 @@ from synapse.storage.database import ( |
|
|
|
make_in_list_sql_clause, |
|
|
|
) |
|
|
|
from synapse.storage.databases.main.events_worker import EventsWorkerStore |
|
|
|
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine |
|
|
|
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine |
|
|
|
from synapse.storage.util.id_generators import MultiWriterIdGenerator |
|
|
|
from synapse.types import PersistedEventPosition, RoomStreamToken |
|
|
|
from synapse.util.caches.descriptors import cached |
|
|
@@ -944,12 +944,40 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): |
|
|
|
room_id |
|
|
|
stream_key |
|
|
|
""" |
|
|
|
sql = ( |
|
|
|
"SELECT coalesce(MIN(topological_ordering), 0) FROM events" |
|
|
|
" WHERE room_id = ? AND stream_ordering >= ?" |
|
|
|
) |
|
|
|
if isinstance(self.database_engine, PostgresEngine): |
|
|
|
min_function = "LEAST" |
|
|
|
elif isinstance(self.database_engine, Sqlite3Engine): |
|
|
|
min_function = "MIN" |
|
|
|
else: |
|
|
|
raise RuntimeError(f"Unknown database engine {self.database_engine}") |
|
|
|
|
|
|
|
# This query used to be |
|
|
|
# SELECT COALESCE(MIN(topological_ordering), 0) FROM events |
|
|
|
# WHERE room_id = ? and events.stream_ordering >= {stream_key} |
|
|
|
# which returns 0 if the stream_key is newer than any event in |
|
|
|
# the room. That's not wrong, but it seems to interact oddly with backfill, |
|
|
|
# requiring a second call to /messages to actually backfill from a remote |
|
|
|
# homeserver. |
|
|
|
# |
|
|
|
# Instead, rollback the stream ordering to that after the most recent event in |
|
|
|
# this room. |
|
|
|
sql = f""" |
|
|
|
WITH fallback(max_stream_ordering) AS ( |
|
|
|
SELECT MAX(stream_ordering) |
|
|
|
FROM events |
|
|
|
WHERE room_id = ? |
|
|
|
) |
|
|
|
SELECT COALESCE(MIN(topological_ordering), 0) FROM events |
|
|
|
WHERE |
|
|
|
room_id = ? |
|
|
|
AND events.stream_ordering >= {min_function}( |
|
|
|
?, |
|
|
|
(SELECT max_stream_ordering FROM fallback) |
|
|
|
) |
|
|
|
""" |
|
|
|
|
|
|
|
row = await self.db_pool.execute( |
|
|
|
"get_current_topological_token", None, sql, room_id, stream_key |
|
|
|
"get_current_topological_token", None, sql, room_id, room_id, stream_key |
|
|
|
) |
|
|
|
return row[0][0] if row else 0 |
|
|
|
|
|
|
|