@@ -0,0 +1 @@ | |||
Rename pagination and purge locks and add comments to explain why they exist and how they work. |
@@ -63,7 +63,7 @@ from synapse.federation.federation_base import ( | |||
) | |||
from synapse.federation.persistence import TransactionActions | |||
from synapse.federation.units import Edu, Transaction | |||
from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME | |||
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME | |||
from synapse.http.servlet import assert_params_in_dict | |||
from synapse.logging.context import ( | |||
make_deferred_yieldable, | |||
@@ -1245,7 +1245,7 @@ class FederationServer(FederationBase): | |||
# while holding the `_INBOUND_EVENT_HANDLING_LOCK_NAME` | |||
# lock. | |||
async with self._worker_lock_handler.acquire_read_write_lock( | |||
DELETE_ROOM_LOCK_NAME, room_id, write=False | |||
NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False | |||
): | |||
await self._federation_event_handler.on_receive_pdu( | |||
origin, event | |||
@@ -53,7 +53,7 @@ from synapse.events.snapshot import EventContext, UnpersistedEventContextBase | |||
from synapse.events.utils import SerializeEventConfig, maybe_upsert_event_field | |||
from synapse.events.validator import EventValidator | |||
from synapse.handlers.directory import DirectoryHandler | |||
from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME | |||
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME | |||
from synapse.logging import opentracing | |||
from synapse.logging.context import make_deferred_yieldable, run_in_background | |||
from synapse.metrics.background_process_metrics import run_as_background_process | |||
@@ -1034,7 +1034,7 @@ class EventCreationHandler: | |||
) | |||
async with self._worker_lock_handler.acquire_read_write_lock( | |||
DELETE_ROOM_LOCK_NAME, room_id, write=False | |||
NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False | |||
): | |||
return await self._create_and_send_nonmember_event_locked( | |||
requester=requester, | |||
@@ -1978,7 +1978,7 @@ class EventCreationHandler: | |||
for room_id in room_ids: | |||
async with self._worker_lock_handler.acquire_read_write_lock( | |||
DELETE_ROOM_LOCK_NAME, room_id, write=False | |||
NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False | |||
): | |||
dummy_event_sent = await self._send_dummy_event_for_room(room_id) | |||
@@ -24,6 +24,7 @@ from synapse.api.errors import SynapseError | |||
from synapse.api.filtering import Filter | |||
from synapse.events.utils import SerializeEventConfig | |||
from synapse.handlers.room import ShutdownRoomResponse | |||
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME | |||
from synapse.logging.opentracing import trace | |||
from synapse.metrics.background_process_metrics import run_as_background_process | |||
from synapse.rest.admin._base import assert_user_is_admin | |||
@@ -46,9 +47,10 @@ logger = logging.getLogger(__name__) | |||
BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3 | |||
PURGE_HISTORY_LOCK_NAME = "purge_history_lock" | |||
DELETE_ROOM_LOCK_NAME = "delete_room_lock" | |||
# This is used to avoid purging a room several time at the same moment, | |||
# and also paginating during a purge. Pagination can trigger backfill, | |||
# which would create old events locally, and would potentially clash with the room delete. | |||
PURGE_PAGINATION_LOCK_NAME = "purge_pagination_lock" | |||
@attr.s(slots=True, auto_attribs=True) | |||
@@ -363,7 +365,7 @@ class PaginationHandler: | |||
self._purges_in_progress_by_room.add(room_id) | |||
try: | |||
async with self._worker_locks.acquire_read_write_lock( | |||
PURGE_HISTORY_LOCK_NAME, room_id, write=True | |||
PURGE_PAGINATION_LOCK_NAME, room_id, write=True | |||
): | |||
await self._storage_controllers.purge_events.purge_history( | |||
room_id, token, delete_local_events | |||
@@ -421,7 +423,10 @@ class PaginationHandler: | |||
force: set true to skip checking for joined users. | |||
""" | |||
async with self._worker_locks.acquire_multi_read_write_lock( | |||
[(PURGE_HISTORY_LOCK_NAME, room_id), (DELETE_ROOM_LOCK_NAME, room_id)], | |||
[ | |||
(PURGE_PAGINATION_LOCK_NAME, room_id), | |||
(NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id), | |||
], | |||
write=True, | |||
): | |||
# first check that we have no users in this room | |||
@@ -483,7 +488,7 @@ class PaginationHandler: | |||
room_token = from_token.room_key | |||
async with self._worker_locks.acquire_read_write_lock( | |||
PURGE_HISTORY_LOCK_NAME, room_id, write=False | |||
PURGE_PAGINATION_LOCK_NAME, room_id, write=False | |||
): | |||
(membership, member_event_id) = (None, None) | |||
if not use_admin_priviledge: | |||
@@ -761,7 +766,7 @@ class PaginationHandler: | |||
self._purges_in_progress_by_room.add(room_id) | |||
try: | |||
async with self._worker_locks.acquire_read_write_lock( | |||
PURGE_HISTORY_LOCK_NAME, room_id, write=True | |||
PURGE_PAGINATION_LOCK_NAME, room_id, write=True | |||
): | |||
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN | |||
self._delete_by_id[ | |||
@@ -39,7 +39,7 @@ from synapse.events import EventBase | |||
from synapse.events.snapshot import EventContext | |||
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN | |||
from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler | |||
from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME | |||
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME | |||
from synapse.logging import opentracing | |||
from synapse.metrics import event_processing_positions | |||
from synapse.metrics.background_process_metrics import run_as_background_process | |||
@@ -621,7 +621,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): | |||
async with self.member_as_limiter.queue(as_id): | |||
async with self.member_linearizer.queue(key): | |||
async with self._worker_lock_handler.acquire_read_write_lock( | |||
DELETE_ROOM_LOCK_NAME, room_id, write=False | |||
NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False | |||
): | |||
with opentracing.start_active_span("update_membership_locked"): | |||
result = await self.update_membership_locked( | |||
@@ -42,7 +42,11 @@ if TYPE_CHECKING: | |||
from synapse.server import HomeServer | |||
DELETE_ROOM_LOCK_NAME = "delete_room_lock" | |||
# This lock is used to avoid creating an event while we are purging the room. | |||
# We take a read lock when creating an event, and a write one when purging a room. | |||
# This is because it is fine to create several events concurrently, since referenced events | |||
# will not disappear under our feet as long as we don't delete the room. | |||
NEW_EVENT_DURING_PURGE_LOCK_NAME = "new_event_during_purge_lock" | |||
class WorkerLocksHandler: | |||
@@ -17,7 +17,7 @@ from typing import TYPE_CHECKING, Tuple | |||
from synapse.api.errors import Codes, ShadowBanError, SynapseError | |||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS | |||
from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME | |||
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME | |||
from synapse.http.server import HttpServer | |||
from synapse.http.servlet import ( | |||
RestServlet, | |||
@@ -81,7 +81,7 @@ class RoomUpgradeRestServlet(RestServlet): | |||
try: | |||
async with self._worker_lock_handler.acquire_read_write_lock( | |||
DELETE_ROOM_LOCK_NAME, room_id, write=False | |||
NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False | |||
): | |||
new_room_id = await self._room_creation_handler.upgrade_room( | |||
requester, room_id, new_version | |||
@@ -45,7 +45,7 @@ from twisted.internet import defer | |||
from synapse.api.constants import EventTypes, Membership | |||
from synapse.events import EventBase | |||
from synapse.events.snapshot import EventContext | |||
from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME | |||
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME | |||
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable | |||
from synapse.logging.opentracing import ( | |||
SynapseTags, | |||
@@ -357,7 +357,7 @@ class EventsPersistenceStorageController: | |||
# it. We might already have taken out the lock, but since this is just a | |||
# "read" lock its inherently reentrant. | |||
async with self.hs.get_worker_locks_handler().acquire_read_write_lock( | |||
DELETE_ROOM_LOCK_NAME, room_id, write=False | |||
NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False | |||
): | |||
if isinstance(task, _PersistEventsTask): | |||
return await self._persist_event_batch(room_id, task) | |||