@@ -29,7 +29,7 @@ from synapse.handlers.pagination import (
PURGE_ROOM_ACTION_NAME,
SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME,
)
from synapse.rest.client import directory, events, login, room
from synapse.rest.client import directory, events, knock, login, room, sync
from synapse.server import HomeServer
from synapse.types import UserID
from synapse.util import Clock
@@ -49,6 +49,8 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
login.register_servlets,
events.register_servlets,
room.register_servlets,
knock.register_servlets,
sync.register_servlets,
room.register_deprecated_servlets,
]
@@ -254,6 +256,55 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
self._is_blocked(self.room_id, expect=False)
self._has_no_members(self.room_id)
def test_purge_room_unjoined(self) -> None:
"""Test to purge a room when there are invited or knocked users."""
# Test that room is not purged
with self.assertRaises(AssertionError):
self._is_purged(self.room_id)
# Test that room is not blocked
self._is_blocked(self.room_id, expect=False)
# Assert one user in room
self._is_member(room_id=self.room_id, user_id=self.other_user)
self.helper.send_state(
self.room_id,
EventTypes.JoinRules,
{"join_rule": "knock"},
tok=self.other_user_tok,
)
# Invite a user.
invited_user = self.register_user("invited", "pass")
self.helper.invite(
self.room_id, self.other_user, invited_user, tok=self.other_user_tok
)
# Have a user knock.
knocked_user = self.register_user("knocked", "pass")
knocked_user_tok = self.login("knocked", "pass")
self.helper.knock(self.room_id, knocked_user, tok=knocked_user_tok)
channel = self.make_request(
"DELETE",
self.url.encode("ascii"),
content={"block": False, "purge": True},
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(None, channel.json_body["new_room_id"])
self.assertCountEqual(
[self.other_user, invited_user, knocked_user],
channel.json_body["kicked_users"],
)
self.assertIn("failed_to_kick_users", channel.json_body)
self.assertIn("local_aliases", channel.json_body)
self._is_purged(self.room_id)
self._is_blocked(self.room_id, expect=False)
self._has_no_members(self.room_id)
def test_block_room_and_not_purge(self) -> None:
"""Test to block a room without purging it.
Members will not be moved to a new room and will not receive a message.