|
|
@@ -40,6 +40,7 @@ from synapse.util.caches.descriptors import cached |
|
|
|
from synapse.api.constants import EventTypes |
|
|
|
from synapse.types import RoomStreamToken |
|
|
|
from synapse.util.logcontext import preserve_fn |
|
|
|
from synapse.storage.engines import PostgresEngine |
|
|
|
|
|
|
|
import logging |
|
|
|
|
|
|
@@ -54,25 +55,41 @@ _STREAM_TOKEN = "stream" |
|
|
|
_TOPOLOGICAL_TOKEN = "topological" |
|
|
|
|
|
|
|
|
|
|
|
def lower_bound(token): |
|
|
|
def lower_bound(token, engine, inclusive=""): |
|
|
|
if token.topological is None: |
|
|
|
return "(%d < %s)" % (token.stream, "stream_ordering") |
|
|
|
return "(%d <%s %s)" % (token.stream, inclusive, "stream_ordering") |
|
|
|
else: |
|
|
|
return "(%d < %s OR (%d = %s AND %d < %s))" % ( |
|
|
|
if isinstance(engine, PostgresEngine): |
|
|
|
# Postgres doesn't optimise ``(x < a) OR (x=a AND y<b)`` as well |
|
|
|
# as it optimises ``(x,y) < (a,b)`` on multicolumn indexes. So we |
|
|
|
# use the later form when running against postgres. |
|
|
|
return "((%d,%d) <%s (%s,%s))" % ( |
|
|
|
token.topological, token.stream, inclusive, |
|
|
|
"topological_ordering", "stream_ordering", |
|
|
|
) |
|
|
|
return "(%d < %s OR (%d = %s AND %d <%s %s))" % ( |
|
|
|
token.topological, "topological_ordering", |
|
|
|
token.topological, "topological_ordering", |
|
|
|
token.stream, "stream_ordering", |
|
|
|
token.stream, inclusive, "stream_ordering", |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
def upper_bound(token): |
|
|
|
def upper_bound(token, engine, inclusive="="): |
|
|
|
if token.topological is None: |
|
|
|
return "(%d >= %s)" % (token.stream, "stream_ordering") |
|
|
|
return "(%d >%s %s)" % (token.stream, inclusive, "stream_ordering") |
|
|
|
else: |
|
|
|
return "(%d > %s OR (%d = %s AND %d >= %s))" % ( |
|
|
|
if isinstance(engine, PostgresEngine): |
|
|
|
# Postgres doesn't optimise ``(x > a) OR (x=a AND y>b)`` as well |
|
|
|
# as it optimises ``(x,y) > (a,b)`` on multicolumn indexes. So we |
|
|
|
# use the later form when running against postgres. |
|
|
|
return "((%d,%d) >%s (%s,%s))" % ( |
|
|
|
token.topological, token.stream, inclusive, |
|
|
|
"topological_ordering", "stream_ordering", |
|
|
|
) |
|
|
|
return "(%d > %s OR (%d = %s AND %d >%s %s))" % ( |
|
|
|
token.topological, "topological_ordering", |
|
|
|
token.topological, "topological_ordering", |
|
|
|
token.stream, "stream_ordering", |
|
|
|
token.stream, inclusive, "stream_ordering", |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
@@ -308,18 +325,22 @@ class StreamStore(SQLBaseStore): |
|
|
|
args = [False, room_id] |
|
|
|
if direction == 'b': |
|
|
|
order = "DESC" |
|
|
|
bounds = upper_bound(RoomStreamToken.parse(from_key)) |
|
|
|
bounds = upper_bound( |
|
|
|
RoomStreamToken.parse(from_key), self.database_engine |
|
|
|
) |
|
|
|
if to_key: |
|
|
|
bounds = "%s AND %s" % ( |
|
|
|
bounds, lower_bound(RoomStreamToken.parse(to_key)) |
|
|
|
) |
|
|
|
bounds = "%s AND %s" % (bounds, lower_bound( |
|
|
|
RoomStreamToken.parse(to_key), self.database_engine |
|
|
|
)) |
|
|
|
else: |
|
|
|
order = "ASC" |
|
|
|
bounds = lower_bound(RoomStreamToken.parse(from_key)) |
|
|
|
bounds = lower_bound( |
|
|
|
RoomStreamToken.parse(from_key), self.database_engine |
|
|
|
) |
|
|
|
if to_key: |
|
|
|
bounds = "%s AND %s" % ( |
|
|
|
bounds, upper_bound(RoomStreamToken.parse(to_key)) |
|
|
|
) |
|
|
|
bounds = "%s AND %s" % (bounds, upper_bound( |
|
|
|
RoomStreamToken.parse(to_key), self.database_engine |
|
|
|
)) |
|
|
|
|
|
|
|
if int(limit) > 0: |
|
|
|
args.append(int(limit)) |
|
|
@@ -586,35 +607,24 @@ class StreamStore(SQLBaseStore): |
|
|
|
retcols=["stream_ordering", "topological_ordering"], |
|
|
|
) |
|
|
|
|
|
|
|
stream_ordering = results["stream_ordering"] |
|
|
|
topological_ordering = results["topological_ordering"] |
|
|
|
token = RoomStreamToken( |
|
|
|
results["topological_ordering"], |
|
|
|
results["stream_ordering"], |
|
|
|
) |
|
|
|
|
|
|
|
query_before = ( |
|
|
|
"SELECT topological_ordering, stream_ordering, event_id FROM events" |
|
|
|
" WHERE room_id = ? AND topological_ordering < ?" |
|
|
|
" UNION ALL " |
|
|
|
" SELECT topological_ordering, stream_ordering, event_id FROM events" |
|
|
|
" WHERE room_id = ? AND topological_ordering = ? AND stream_ordering < ?" |
|
|
|
" WHERE room_id = ? AND %s" |
|
|
|
" ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?" |
|
|
|
) |
|
|
|
) % (upper_bound(token, self.database_engine, inclusive=""),) |
|
|
|
|
|
|
|
query_after = ( |
|
|
|
"SELECT topological_ordering, stream_ordering, event_id FROM events" |
|
|
|
" WHERE room_id = ? AND topological_ordering > ?" |
|
|
|
" UNION ALL" |
|
|
|
" SELECT topological_ordering, stream_ordering, event_id FROM events" |
|
|
|
" WHERE room_id = ? AND topological_ordering = ? AND stream_ordering > ?" |
|
|
|
" WHERE room_id = ? AND %s" |
|
|
|
" ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?" |
|
|
|
) |
|
|
|
) % (lower_bound(token, self.database_engine, inclusive=""),) |
|
|
|
|
|
|
|
txn.execute( |
|
|
|
query_before, |
|
|
|
( |
|
|
|
room_id, topological_ordering, |
|
|
|
room_id, topological_ordering, stream_ordering, |
|
|
|
before_limit, |
|
|
|
) |
|
|
|
) |
|
|
|
txn.execute(query_before, (room_id, before_limit)) |
|
|
|
|
|
|
|
rows = self.cursor_to_dict(txn) |
|
|
|
events_before = [r["event_id"] for r in rows] |
|
|
@@ -626,18 +636,11 @@ class StreamStore(SQLBaseStore): |
|
|
|
)) |
|
|
|
else: |
|
|
|
start_token = str(RoomStreamToken( |
|
|
|
topological_ordering, |
|
|
|
stream_ordering - 1, |
|
|
|
token.topological, |
|
|
|
token.stream - 1, |
|
|
|
)) |
|
|
|
|
|
|
|
txn.execute( |
|
|
|
query_after, |
|
|
|
( |
|
|
|
room_id, topological_ordering, |
|
|
|
room_id, topological_ordering, stream_ordering, |
|
|
|
after_limit, |
|
|
|
) |
|
|
|
) |
|
|
|
txn.execute(query_after, (room_id, after_limit)) |
|
|
|
|
|
|
|
rows = self.cursor_to_dict(txn) |
|
|
|
events_after = [r["event_id"] for r in rows] |
|
|
@@ -648,10 +651,7 @@ class StreamStore(SQLBaseStore): |
|
|
|
rows[-1]["stream_ordering"], |
|
|
|
)) |
|
|
|
else: |
|
|
|
end_token = str(RoomStreamToken( |
|
|
|
topological_ordering, |
|
|
|
stream_ordering, |
|
|
|
)) |
|
|
|
end_token = str(token) |
|
|
|
|
|
|
|
return { |
|
|
|
"before": { |
|
|
|