Browse Source

Fix type hints in typing edu unit tests (#14886)

tags/v1.77.0rc1
Andrew Morgan 1 year ago
committed by GitHub
parent
commit
871ff05add
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 66 additions and 43 deletions
  1. +1
    -0
      changelog.d/14886.misc
  2. +0
    -1
      mypy.ini
  3. +59
    -40
      tests/handlers/test_typing.py
  4. +4
    -1
      tests/storage/test_user_directory.py
  5. +2
    -1
      tests/unittest.py

+ 1
- 0
changelog.d/14886.misc View File

@@ -0,0 +1 @@
Add missing type hints.

+ 0
- 1
mypy.ini View File

@@ -37,7 +37,6 @@ exclude = (?x)
|tests/appservice/test_scheduler.py
|tests/federation/test_federation_catch_up.py
|tests/federation/test_federation_sender.py
|tests/handlers/test_typing.py
|tests/http/federation/test_matrix_federation_agent.py
|tests/http/federation/test_srv_resolver.py
|tests/http/test_proxyagent.py


+ 59
- 40
tests/handlers/test_typing.py View File

@@ -14,21 +14,22 @@


import json
from typing import Dict
from typing import Dict, List, Set
from unittest.mock import ANY, Mock, call

from twisted.internet import defer
from twisted.test.proto_helpers import MemoryReactor
from twisted.web.resource import Resource

from synapse.api.constants import EduTypes
from synapse.api.errors import AuthError
from synapse.federation.transport.server import TransportLayerServer
from synapse.handlers.typing import TypingWriterHandler
from synapse.server import HomeServer
from synapse.types import JsonDict, Requester, UserID, create_requester
from synapse.util import Clock

from tests import unittest
from tests.server import ThreadedMemoryReactorClock
from tests.test_utils import make_awaitable
from tests.unittest import override_config

@@ -62,7 +63,11 @@ def _make_edu_transaction_json(edu_type: str, content: JsonDict) -> bytes:


class TypingNotificationsTestCase(unittest.HomeserverTestCase):
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
def make_homeserver(
self,
reactor: ThreadedMemoryReactorClock,
clock: Clock,
) -> HomeServer:
# we mock out the keyring so as to skip the authentication check on the
# federation API call.
mock_keyring = Mock(spec=["verify_json_for_server"])
@@ -75,8 +80,9 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
# the tests assume that we are starting at unix time 1000
reactor.pump((1000,))

self.mock_hs_notifier = Mock()
hs = self.setup_test_homeserver(
notifier=Mock(),
notifier=self.mock_hs_notifier,
federation_http_client=mock_federation_client,
keyring=mock_keyring,
replication_streams={},
@@ -90,32 +96,38 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
return d

def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
mock_notifier = hs.get_notifier()
self.on_new_event = mock_notifier.on_new_event
self.on_new_event = self.mock_hs_notifier.on_new_event

self.handler = hs.get_typing_handler()
# hs.get_typing_handler will return a TypingWriterHandler when calling it
# from the main process, and a FollowerTypingHandler on workers.
# We rely on methods only available on the former, so assert we have the
# correct type here. We have to assign self.handler after the assert,
# otherwise mypy will treat it as a FollowerTypingHandler
handler = hs.get_typing_handler()
assert isinstance(handler, TypingWriterHandler)
self.handler = handler

self.event_source = hs.get_event_sources().sources.typing

self.datastore = hs.get_datastores().main

self.datastore.get_destination_retry_timings = Mock(
return_value=make_awaitable(None)
)

self.datastore.get_device_updates_by_remote = Mock(
self.datastore.get_device_updates_by_remote = Mock( # type: ignore[assignment]
return_value=make_awaitable((0, []))
)

self.datastore.get_destination_last_successful_stream_ordering = Mock(
self.datastore.get_destination_last_successful_stream_ordering = Mock( # type: ignore[assignment]
return_value=make_awaitable(None)
)

def get_received_txn_response(*args):
return defer.succeed(None)

self.datastore.get_received_txn_response = get_received_txn_response
self.datastore.get_received_txn_response = Mock( # type: ignore[assignment]
return_value=make_awaitable(None)
)

self.room_members = []
self.room_members: List[UserID] = []

async def check_user_in_room(room_id: str, requester: Requester) -> None:
if requester.user.to_string() not in [
@@ -124,47 +136,54 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
raise AuthError(401, "User is not in the room")
return None

hs.get_auth().check_user_in_room = check_user_in_room
hs.get_auth().check_user_in_room = Mock( # type: ignore[assignment]
side_effect=check_user_in_room
)

async def check_host_in_room(room_id: str, server_name: str) -> bool:
return room_id == ROOM_ID

hs.get_event_auth_handler().is_host_in_room = check_host_in_room
hs.get_event_auth_handler().is_host_in_room = Mock( # type: ignore[assignment]
side_effect=check_host_in_room
)

async def get_current_hosts_in_room(room_id: str):
async def get_current_hosts_in_room(room_id: str) -> Set[str]:
return {member.domain for member in self.room_members}

hs.get_storage_controllers().state.get_current_hosts_in_room = (
get_current_hosts_in_room
hs.get_storage_controllers().state.get_current_hosts_in_room = Mock( # type: ignore[assignment]
side_effect=get_current_hosts_in_room
)

hs.get_storage_controllers().state.get_current_hosts_in_room_or_partial_state_approximation = (
get_current_hosts_in_room
hs.get_storage_controllers().state.get_current_hosts_in_room_or_partial_state_approximation = Mock( # type: ignore[assignment]
side_effect=get_current_hosts_in_room
)

async def get_users_in_room(room_id: str):
async def get_users_in_room(room_id: str) -> Set[str]:
return {str(u) for u in self.room_members}

self.datastore.get_users_in_room = get_users_in_room
self.datastore.get_users_in_room = Mock(side_effect=get_users_in_room)

self.datastore.get_user_directory_stream_pos = Mock(
self.datastore.get_user_directory_stream_pos = Mock( # type: ignore[assignment]
side_effect=(
# we deliberately return a non-None stream pos to avoid doing an initial_spam
# we deliberately return a non-None stream pos to avoid
# doing an initial_sync
lambda: make_awaitable(1)
)
)

self.datastore.get_partial_current_state_deltas = Mock(return_value=(0, None))
self.datastore.get_partial_current_state_deltas = Mock(return_value=(0, None)) # type: ignore[assignment]

self.datastore.get_to_device_stream_token = lambda: 0
self.datastore.get_new_device_msgs_for_remote = (
lambda *args, **kargs: make_awaitable(([], 0))
self.datastore.get_to_device_stream_token = Mock( # type: ignore[assignment]
side_effect=lambda: 0
)
self.datastore.get_new_device_msgs_for_remote = Mock( # type: ignore[assignment]
side_effect=lambda *args, **kargs: make_awaitable(([], 0))
)
self.datastore.delete_device_msgs_for_remote = (
lambda *args, **kargs: make_awaitable(None)
self.datastore.delete_device_msgs_for_remote = Mock( # type: ignore[assignment]
side_effect=lambda *args, **kargs: make_awaitable(None)
)
self.datastore.set_received_txn_response = (
lambda *args, **kwargs: make_awaitable(None)
self.datastore.set_received_txn_response = Mock( # type: ignore[assignment]
side_effect=lambda *args, **kwargs: make_awaitable(None)
)

def test_started_typing_local(self) -> None:
@@ -186,7 +205,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
self.assertEqual(self.event_source.get_current_key(), 1)
events = self.get_success(
self.event_source.get_new_events(
user=U_APPLE, from_key=0, limit=None, room_ids=[ROOM_ID], is_guest=False
user=U_APPLE, from_key=0, limit=0, room_ids=[ROOM_ID], is_guest=False
)
)
self.assertEqual(
@@ -257,7 +276,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
self.assertEqual(self.event_source.get_current_key(), 1)
events = self.get_success(
self.event_source.get_new_events(
user=U_APPLE, from_key=0, limit=None, room_ids=[ROOM_ID], is_guest=False
user=U_APPLE, from_key=0, limit=0, room_ids=[ROOM_ID], is_guest=False
)
)
self.assertEqual(
@@ -298,7 +317,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
self.event_source.get_new_events(
user=U_APPLE,
from_key=0,
limit=None,
limit=0,
room_ids=[OTHER_ROOM_ID],
is_guest=False,
)
@@ -351,7 +370,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
self.assertEqual(self.event_source.get_current_key(), 1)
events = self.get_success(
self.event_source.get_new_events(
user=U_APPLE, from_key=0, limit=None, room_ids=[ROOM_ID], is_guest=False
user=U_APPLE, from_key=0, limit=0, room_ids=[ROOM_ID], is_guest=False
)
)
self.assertEqual(
@@ -387,7 +406,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
self.event_source.get_new_events(
user=U_APPLE,
from_key=0,
limit=None,
limit=0,
room_ids=[ROOM_ID],
is_guest=False,
)
@@ -412,7 +431,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
self.event_source.get_new_events(
user=U_APPLE,
from_key=1,
limit=None,
limit=0,
room_ids=[ROOM_ID],
is_guest=False,
)
@@ -447,7 +466,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
self.event_source.get_new_events(
user=U_APPLE,
from_key=0,
limit=None,
limit=0,
room_ids=[ROOM_ID],
is_guest=False,
)


+ 4
- 1
tests/storage/test_user_directory.py View File

@@ -28,6 +28,7 @@ from synapse.storage.background_updates import _BackgroundUpdateHandler
from synapse.storage.roommember import ProfileInfo
from synapse.util import Clock

from tests.server import ThreadedMemoryReactorClock
from tests.test_utils.event_injection import inject_member_event
from tests.unittest import HomeserverTestCase, override_config

@@ -138,7 +139,9 @@ class UserDirectoryInitialPopulationTestcase(HomeserverTestCase):
register.register_servlets,
]

def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
def make_homeserver(
self, reactor: ThreadedMemoryReactorClock, clock: Clock
) -> HomeServer:
self.appservice = ApplicationService(
token="i_am_an_app_service",
id="1234",


+ 2
- 1
tests/unittest.py View File

@@ -75,6 +75,7 @@ from synapse.util.httpresourcetree import create_resource_tree
from tests.server import (
CustomHeaderType,
FakeChannel,
ThreadedMemoryReactorClock,
get_clock,
make_request,
setup_test_homeserver,
@@ -360,7 +361,7 @@ class HomeserverTestCase(TestCase):
store.db_pool.updates.do_next_background_update(False), by=0.1
)

def make_homeserver(self, reactor: MemoryReactor, clock: Clock):
def make_homeserver(self, reactor: ThreadedMemoryReactorClock, clock: Clock):
"""
Make and return a homeserver.



Loading…
Cancel
Save