@@ -0,0 +1 @@ | |||||
Stop using the `event_txn_id` table. |
@@ -908,19 +908,6 @@ class EventCreationHandler: | |||||
if existing_event_id: | if existing_event_id: | ||||
return existing_event_id | return existing_event_id | ||||
# Some requsters don't have device IDs (appservice, guests, and access | |||||
# tokens minted with the admin API), fallback to checking the access token | |||||
# ID, which should be close enough. | |||||
if requester.access_token_id: | |||||
existing_event_id = ( | |||||
await self.store.get_event_id_from_transaction_id_and_token_id( | |||||
room_id, | |||||
requester.user.to_string(), | |||||
requester.access_token_id, | |||||
txn_id, | |||||
) | |||||
) | |||||
return existing_event_id | return existing_event_id | ||||
async def get_event_from_transaction( | async def get_event_from_transaction( | ||||
@@ -978,26 +978,12 @@ class PersistEventsStore: | |||||
"""Persist the mapping from transaction IDs to event IDs (if defined).""" | """Persist the mapping from transaction IDs to event IDs (if defined).""" | ||||
inserted_ts = self._clock.time_msec() | inserted_ts = self._clock.time_msec() | ||||
to_insert_token_id: List[Tuple[str, str, str, int, str, int]] = [] | |||||
to_insert_device_id: List[Tuple[str, str, str, str, str, int]] = [] | to_insert_device_id: List[Tuple[str, str, str, str, str, int]] = [] | ||||
for event, _ in events_and_contexts: | for event, _ in events_and_contexts: | ||||
txn_id = getattr(event.internal_metadata, "txn_id", None) | txn_id = getattr(event.internal_metadata, "txn_id", None) | ||||
token_id = getattr(event.internal_metadata, "token_id", None) | |||||
device_id = getattr(event.internal_metadata, "device_id", None) | device_id = getattr(event.internal_metadata, "device_id", None) | ||||
if txn_id is not None: | if txn_id is not None: | ||||
if token_id is not None: | |||||
to_insert_token_id.append( | |||||
( | |||||
event.event_id, | |||||
event.room_id, | |||||
event.sender, | |||||
token_id, | |||||
txn_id, | |||||
inserted_ts, | |||||
) | |||||
) | |||||
if device_id is not None: | if device_id is not None: | ||||
to_insert_device_id.append( | to_insert_device_id.append( | ||||
( | ( | ||||
@@ -1010,26 +996,7 @@ class PersistEventsStore: | |||||
) | ) | ||||
) | ) | ||||
# Synapse usually relies on the device_id to scope transactions for events, | |||||
# except for users without device IDs (appservice, guests, and access | |||||
# tokens minted with the admin API) which use the access token ID instead. | |||||
# | |||||
# TODO https://github.com/matrix-org/synapse/issues/16042 | |||||
if to_insert_token_id: | |||||
self.db_pool.simple_insert_many_txn( | |||||
txn, | |||||
table="event_txn_id", | |||||
keys=( | |||||
"event_id", | |||||
"room_id", | |||||
"user_id", | |||||
"token_id", | |||||
"txn_id", | |||||
"inserted_ts", | |||||
), | |||||
values=to_insert_token_id, | |||||
) | |||||
# Synapse relies on the device_id to scope transactions for events.. | |||||
if to_insert_device_id: | if to_insert_device_id: | ||||
self.db_pool.simple_insert_many_txn( | self.db_pool.simple_insert_many_txn( | ||||
txn, | txn, | ||||
@@ -2022,25 +2022,6 @@ class EventsWorkerStore(SQLBaseStore): | |||||
desc="get_next_event_to_expire", func=get_next_event_to_expire_txn | desc="get_next_event_to_expire", func=get_next_event_to_expire_txn | ||||
) | ) | ||||
async def get_event_id_from_transaction_id_and_token_id( | |||||
self, room_id: str, user_id: str, token_id: int, txn_id: str | |||||
) -> Optional[str]: | |||||
"""Look up if we have already persisted an event for the transaction ID, | |||||
returning the event ID if so. | |||||
""" | |||||
return await self.db_pool.simple_select_one_onecol( | |||||
table="event_txn_id", | |||||
keyvalues={ | |||||
"room_id": room_id, | |||||
"user_id": user_id, | |||||
"token_id": token_id, | |||||
"txn_id": txn_id, | |||||
}, | |||||
retcol="event_id", | |||||
allow_none=True, | |||||
desc="get_event_id_from_transaction_id_and_token_id", | |||||
) | |||||
async def get_event_id_from_transaction_id_and_device_id( | async def get_event_id_from_transaction_id_and_device_id( | ||||
self, room_id: str, user_id: str, device_id: str, txn_id: str | self, room_id: str, user_id: str, device_id: str, txn_id: str | ||||
) -> Optional[str]: | ) -> Optional[str]: | ||||
@@ -2072,29 +2053,35 @@ class EventsWorkerStore(SQLBaseStore): | |||||
""" | """ | ||||
mapping = {} | mapping = {} | ||||
txn_id_to_event: Dict[Tuple[str, int, str], str] = {} | |||||
txn_id_to_event: Dict[Tuple[str, str, str, str], str] = {} | |||||
for event in events: | for event in events: | ||||
token_id = getattr(event.internal_metadata, "token_id", None) | |||||
device_id = getattr(event.internal_metadata, "device_id", None) | |||||
txn_id = getattr(event.internal_metadata, "txn_id", None) | txn_id = getattr(event.internal_metadata, "txn_id", None) | ||||
if token_id and txn_id: | |||||
if device_id and txn_id: | |||||
# Check if this is a duplicate of an event in the given events. | # Check if this is a duplicate of an event in the given events. | ||||
existing = txn_id_to_event.get((event.room_id, token_id, txn_id)) | |||||
existing = txn_id_to_event.get( | |||||
(event.room_id, event.sender, device_id, txn_id) | |||||
) | |||||
if existing: | if existing: | ||||
mapping[event.event_id] = existing | mapping[event.event_id] = existing | ||||
continue | continue | ||||
# Check if this is a duplicate of an event we've already | # Check if this is a duplicate of an event we've already | ||||
# persisted. | # persisted. | ||||
existing = await self.get_event_id_from_transaction_id_and_token_id( | |||||
event.room_id, event.sender, token_id, txn_id | |||||
existing = await self.get_event_id_from_transaction_id_and_device_id( | |||||
event.room_id, event.sender, device_id, txn_id | |||||
) | ) | ||||
if existing: | if existing: | ||||
mapping[event.event_id] = existing | mapping[event.event_id] = existing | ||||
txn_id_to_event[(event.room_id, token_id, txn_id)] = existing | |||||
txn_id_to_event[ | |||||
(event.room_id, event.sender, device_id, txn_id) | |||||
] = existing | |||||
else: | else: | ||||
txn_id_to_event[(event.room_id, token_id, txn_id)] = event.event_id | |||||
txn_id_to_event[ | |||||
(event.room_id, event.sender, device_id, txn_id) | |||||
] = event.event_id | |||||
return mapping | return mapping | ||||
@@ -12,7 +12,7 @@ | |||||
# See the License for the specific language governing permissions and | # See the License for the specific language governing permissions and | ||||
# limitations under the License. | # limitations under the License. | ||||
SCHEMA_VERSION = 80 # remember to update the list below when updating | |||||
SCHEMA_VERSION = 81 # remember to update the list below when updating | |||||
"""Represents the expectations made by the codebase about the database schema | """Represents the expectations made by the codebase about the database schema | ||||
This should be incremented whenever the codebase changes its requirements on the | This should be incremented whenever the codebase changes its requirements on the | ||||
@@ -114,19 +114,15 @@ Changes in SCHEMA_VERSION = 79 | |||||
Changes in SCHEMA_VERSION = 80 | Changes in SCHEMA_VERSION = 80 | ||||
- The event_txn_id_device_id is always written to for new events. | - The event_txn_id_device_id is always written to for new events. | ||||
- Add tables for the task scheduler. | - Add tables for the task scheduler. | ||||
Changes in SCHEMA_VERSION = 81 | |||||
- The event_txn_id is no longer written to for new events. | |||||
""" | """ | ||||
SCHEMA_COMPAT_VERSION = ( | SCHEMA_COMPAT_VERSION = ( | ||||
# Queries against `event_stream_ordering` columns in membership tables must | |||||
# be disambiguated. | |||||
# | |||||
# The threads_id column must written to with non-null values for the | |||||
# event_push_actions, event_push_actions_staging, and event_push_summary tables. | |||||
# | |||||
# insertions to the column `full_user_id` of tables profiles and user_filters can no | |||||
# longer be null | |||||
76 | |||||
# The `event_txn_id_device_id` must be written to for new events. | |||||
80 | |||||
) | ) | ||||
"""Limit on how far the synapse codebase can be rolled back without breaking db compat | """Limit on how far the synapse codebase can be rolled back without breaking db compat | ||||
@@ -46,18 +46,11 @@ class EventCreationTestCase(unittest.HomeserverTestCase): | |||||
self._persist_event_storage_controller = persistence | self._persist_event_storage_controller = persistence | ||||
self.user_id = self.register_user("tester", "foobar") | self.user_id = self.register_user("tester", "foobar") | ||||
self.access_token = self.login("tester", "foobar") | |||||
self.room_id = self.helper.create_room_as(self.user_id, tok=self.access_token) | |||||
info = self.get_success( | |||||
self.hs.get_datastores().main.get_user_by_access_token( | |||||
self.access_token, | |||||
) | |||||
) | |||||
assert info is not None | |||||
self.token_id = info.token_id | |||||
device_id = "dev-1" | |||||
access_token = self.login("tester", "foobar", device_id=device_id) | |||||
self.room_id = self.helper.create_room_as(self.user_id, tok=access_token) | |||||
self.requester = create_requester(self.user_id, access_token_id=self.token_id) | |||||
self.requester = create_requester(self.user_id, device_id=device_id) | |||||
def _create_and_persist_member_event(self) -> Tuple[EventBase, EventContext]: | def _create_and_persist_member_event(self) -> Tuple[EventBase, EventContext]: | ||||
# Create a member event we can use as an auth_event | # Create a member event we can use as an auth_event | ||||