You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

133 lines
4.0 KiB

  1. # Copyright 2019-2021 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """
  15. Utilities for running the unit tests
  16. """
  17. import json
  18. import sys
  19. import warnings
  20. from binascii import unhexlify
  21. from typing import TYPE_CHECKING, Awaitable, Callable, Tuple, TypeVar
  22. import attr
  23. import zope.interface
  24. from twisted.internet.interfaces import IProtocol
  25. from twisted.python.failure import Failure
  26. from twisted.web.client import ResponseDone
  27. from twisted.web.http import RESPONSES
  28. from twisted.web.http_headers import Headers
  29. from twisted.web.iweb import IResponse
  30. from synapse.types import JsonSerializable
  31. if TYPE_CHECKING:
  32. from sys import UnraisableHookArgs
  33. TV = TypeVar("TV")
  34. def get_awaitable_result(awaitable: Awaitable[TV]) -> TV:
  35. """Get the result from an Awaitable which should have completed
  36. Asserts that the given awaitable has a result ready, and returns its value
  37. """
  38. i = awaitable.__await__()
  39. try:
  40. next(i)
  41. except StopIteration as e:
  42. # awaitable returned a result
  43. return e.value
  44. # if next didn't raise, the awaitable hasn't completed.
  45. raise Exception("awaitable has not yet completed")
  46. def setup_awaitable_errors() -> Callable[[], None]:
  47. """
  48. Convert warnings from a non-awaited coroutines into errors.
  49. """
  50. warnings.simplefilter("error", RuntimeWarning)
  51. # State shared between unraisablehook and check_for_unraisable_exceptions.
  52. unraisable_exceptions = []
  53. orig_unraisablehook = sys.unraisablehook
  54. def unraisablehook(unraisable: "UnraisableHookArgs") -> None:
  55. unraisable_exceptions.append(unraisable.exc_value)
  56. def cleanup() -> None:
  57. """
  58. A method to be used as a clean-up that fails a test-case if there are any new unraisable exceptions.
  59. """
  60. sys.unraisablehook = orig_unraisablehook
  61. if unraisable_exceptions:
  62. exc = unraisable_exceptions.pop()
  63. assert exc is not None
  64. raise exc
  65. sys.unraisablehook = unraisablehook
  66. return cleanup
  67. # Type ignore: it does not fully implement IResponse, but is good enough for tests
  68. @zope.interface.implementer(IResponse)
  69. @attr.s(slots=True, frozen=True, auto_attribs=True)
  70. class FakeResponse: # type: ignore[misc]
  71. """A fake twisted.web.IResponse object
  72. there is a similar class at treq.test.test_response, but it lacks a `phrase`
  73. attribute, and didn't support deliverBody until recently.
  74. """
  75. version: Tuple[bytes, int, int] = (b"HTTP", 1, 1)
  76. # HTTP response code
  77. code: int = 200
  78. # body of the response
  79. body: bytes = b""
  80. headers: Headers = attr.Factory(Headers)
  81. @property
  82. def phrase(self) -> bytes:
  83. return RESPONSES.get(self.code, b"Unknown Status")
  84. @property
  85. def length(self) -> int:
  86. return len(self.body)
  87. def deliverBody(self, protocol: IProtocol) -> None:
  88. protocol.dataReceived(self.body)
  89. protocol.connectionLost(Failure(ResponseDone()))
  90. @classmethod
  91. def json(cls, *, code: int = 200, payload: JsonSerializable) -> "FakeResponse":
  92. headers = Headers({"Content-Type": ["application/json"]})
  93. body = json.dumps(payload).encode("utf-8")
  94. return cls(code=code, body=body, headers=headers)
  95. # A small image used in some tests.
  96. #
  97. # Resolution: 1×1, MIME type: image/png, Extension: png, Size: 67 B
  98. SMALL_PNG = unhexlify(
  99. b"89504e470d0a1a0a0000000d4948445200000001000000010806"
  100. b"0000001f15c4890000000a49444154789c63000100000500010d"
  101. b"0a2db40000000049454e44ae426082"
  102. )