You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

107 lines
3.6 KiB

  1. # Copyright 2022 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from http import HTTPStatus
  15. from typing import Tuple
  16. from twisted.web.server import Request
  17. from synapse.api.errors import Codes
  18. from synapse.http.server import JsonResource
  19. from synapse.replication.http import REPLICATION_PREFIX
  20. from synapse.replication.http._base import ReplicationEndpoint
  21. from synapse.server import HomeServer
  22. from synapse.types import JsonDict
  23. from synapse.util.cancellation import cancellable
  24. from tests import unittest
  25. from tests.http.server._base import test_disconnect
  26. class CancellableReplicationEndpoint(ReplicationEndpoint):
  27. NAME = "cancellable_sleep"
  28. PATH_ARGS = ()
  29. CACHE = False
  30. def __init__(self, hs: HomeServer):
  31. super().__init__(hs)
  32. self.clock = hs.get_clock()
  33. @staticmethod
  34. async def _serialize_payload() -> JsonDict:
  35. return {}
  36. @cancellable
  37. async def _handle_request( # type: ignore[override]
  38. self, request: Request, content: JsonDict
  39. ) -> Tuple[int, JsonDict]:
  40. await self.clock.sleep(1.0)
  41. return HTTPStatus.OK, {"result": True}
  42. class UncancellableReplicationEndpoint(ReplicationEndpoint):
  43. NAME = "uncancellable_sleep"
  44. PATH_ARGS = ()
  45. CACHE = False
  46. WAIT_FOR_STREAMS = False
  47. def __init__(self, hs: HomeServer):
  48. super().__init__(hs)
  49. self.clock = hs.get_clock()
  50. @staticmethod
  51. async def _serialize_payload() -> JsonDict:
  52. return {}
  53. async def _handle_request( # type: ignore[override]
  54. self, request: Request, content: JsonDict
  55. ) -> Tuple[int, JsonDict]:
  56. await self.clock.sleep(1.0)
  57. return HTTPStatus.OK, {"result": True}
  58. class ReplicationEndpointCancellationTestCase(unittest.HomeserverTestCase):
  59. """Tests for `ReplicationEndpoint` cancellation."""
  60. def create_test_resource(self) -> JsonResource:
  61. """Overrides `HomeserverTestCase.create_test_resource`."""
  62. resource = JsonResource(self.hs)
  63. CancellableReplicationEndpoint(self.hs).register(resource)
  64. UncancellableReplicationEndpoint(self.hs).register(resource)
  65. return resource
  66. def test_cancellable_disconnect(self) -> None:
  67. """Test that handlers with the `@cancellable` flag can be cancelled."""
  68. path = f"{REPLICATION_PREFIX}/{CancellableReplicationEndpoint.NAME}/"
  69. channel = self.make_request("POST", path, await_result=False, content={})
  70. test_disconnect(
  71. self.reactor,
  72. channel,
  73. expect_cancellation=True,
  74. expected_body={"error": "Request cancelled", "errcode": Codes.UNKNOWN},
  75. )
  76. def test_uncancellable_disconnect(self) -> None:
  77. """Test that handlers without the `@cancellable` flag cannot be cancelled."""
  78. path = f"{REPLICATION_PREFIX}/{UncancellableReplicationEndpoint.NAME}/"
  79. channel = self.make_request("POST", path, await_result=False, content={})
  80. test_disconnect(
  81. self.reactor,
  82. channel,
  83. expect_cancellation=False,
  84. expected_body={"result": True},
  85. )