|
|
@@ -48,6 +48,11 @@ class PusherWorkerStore(SQLBaseStore): |
|
|
|
self._remove_stale_pushers, |
|
|
|
) |
|
|
|
|
|
|
|
self.db_pool.updates.register_background_update_handler( |
|
|
|
"remove_deleted_email_pushers", |
|
|
|
self._remove_deleted_email_pushers, |
|
|
|
) |
|
|
|
|
|
|
|
def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]: |
|
|
|
"""JSON-decode the data in the rows returned from the `pushers` table |
|
|
|
|
|
|
@@ -388,6 +393,73 @@ class PusherWorkerStore(SQLBaseStore): |
|
|
|
|
|
|
|
return number_deleted |
|
|
|
|
|
|
|
async def _remove_deleted_email_pushers( |
|
|
|
self, progress: dict, batch_size: int |
|
|
|
) -> int: |
|
|
|
"""A background update that deletes all pushers for deleted email addresses. |
|
|
|
|
|
|
|
In previous versions of synapse, when users deleted their email address, it didn't |
|
|
|
also delete all the pushers for that email address. This background update removes |
|
|
|
those to prevent unwanted emails. This should only need to be run once (when users |
|
|
|
upgrade to v1.42.0 |
|
|
|
|
|
|
|
Args: |
|
|
|
progress: dict used to store progress of this background update |
|
|
|
batch_size: the maximum number of rows to retrieve in a single select query |
|
|
|
|
|
|
|
Returns: |
|
|
|
The number of deleted rows |
|
|
|
""" |
|
|
|
|
|
|
|
last_pusher = progress.get("last_pusher", 0) |
|
|
|
|
|
|
|
def _delete_pushers(txn) -> int: |
|
|
|
|
|
|
|
sql = """ |
|
|
|
SELECT p.id, p.user_name, p.app_id, p.pushkey |
|
|
|
FROM pushers AS p |
|
|
|
LEFT JOIN user_threepids AS t |
|
|
|
ON t.user_id = p.user_name |
|
|
|
AND t.medium = 'email' |
|
|
|
AND t.address = p.pushkey |
|
|
|
WHERE t.user_id is NULL |
|
|
|
AND p.app_id = 'm.email' |
|
|
|
AND p.id > ? |
|
|
|
ORDER BY p.id ASC |
|
|
|
LIMIT ? |
|
|
|
""" |
|
|
|
|
|
|
|
txn.execute(sql, (last_pusher, batch_size)) |
|
|
|
|
|
|
|
last = None |
|
|
|
num_deleted = 0 |
|
|
|
for row in txn: |
|
|
|
last = row[0] |
|
|
|
num_deleted += 1 |
|
|
|
self.db_pool.simple_delete_txn( |
|
|
|
txn, |
|
|
|
"pushers", |
|
|
|
{"user_name": row[1], "app_id": row[2], "pushkey": row[3]}, |
|
|
|
) |
|
|
|
|
|
|
|
if last is not None: |
|
|
|
self.db_pool.updates._background_update_progress_txn( |
|
|
|
txn, "remove_deleted_email_pushers", {"last_pusher": last} |
|
|
|
) |
|
|
|
|
|
|
|
return num_deleted |
|
|
|
|
|
|
|
number_deleted = await self.db_pool.runInteraction( |
|
|
|
"_remove_deleted_email_pushers", _delete_pushers |
|
|
|
) |
|
|
|
|
|
|
|
if number_deleted < batch_size: |
|
|
|
await self.db_pool.updates._end_background_update( |
|
|
|
"remove_deleted_email_pushers" |
|
|
|
) |
|
|
|
|
|
|
|
return number_deleted |
|
|
|
|
|
|
|
|
|
|
|
class PusherStore(PusherWorkerStore): |
|
|
|
def get_pushers_stream_token(self) -> int: |
|
|
|