# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import queue
from typing import Any, BinaryIO, Optional, Union, cast

from twisted.internet import threads
from twisted.internet.defer import Deferred
from twisted.internet.interfaces import IPullProducer, IPushProducer

from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.types import ISynapseReactor


class BackgroundFileConsumer:
    """A consumer that writes to a file like object. Supports both push
    and pull producers.

    Args:
        file_obj: The file like object to write to. Closed when finished.
        reactor: the Twisted reactor to use
    """

    # For PushProducers pause if we have this many unwritten slices
    _PAUSE_ON_QUEUE_SIZE = 5
    # And resume once the size of the queue is less than this
    _RESUME_ON_QUEUE_SIZE = 2

    def __init__(self, file_obj: BinaryIO, reactor: ISynapseReactor) -> None:
        self._file_obj: BinaryIO = file_obj
        self._reactor: ISynapseReactor = reactor

        # Producer we're registered with
        self._producer: Optional[Union[IPushProducer, IPullProducer]] = None

        # True if PushProducer, false if PullProducer
        self.streaming = False

        # For PushProducers, indicates whether we've paused the producer and
        # need to call resumeProducing before we get more data.
        self._paused_producer = False

        # Queue of slices of bytes to be written. When producer calls
        # unregister a final None is sent.
        self._bytes_queue: queue.Queue[Optional[bytes]] = queue.Queue()

        # Deferred that is resolved when finished writing
        #
        # This is really Deferred[None], but mypy doesn't seem to like that.
        self._finished_deferred: Optional[Deferred[Any]] = None

        # If the _writer thread throws an exception it gets stored here.
        self._write_exception: Optional[Exception] = None

    def registerProducer(
        self, producer: Union[IPushProducer, IPullProducer], streaming: bool
    ) -> None:
        """Part of IConsumer interface

        Args:
            producer
            streaming: True if push based producer, False if pull based.
        """
        if self._producer:
            raise Exception("registerProducer called twice")

        self._producer = producer
        self.streaming = streaming
        self._finished_deferred = run_in_background(
            threads.deferToThreadPool,
            # mypy seems to get confused with the chaining of ParamSpec from
            # run_in_background to deferToThreadPool.
            #
            # For Twisted trunk, ignore arg-type; for Twisted release ignore
            # unused-ignore.
            self._reactor,  # type: ignore[arg-type,unused-ignore]
            self._reactor.getThreadPool(),  # type: ignore[arg-type,unused-ignore]
            self._writer,  # type: ignore[arg-type,unused-ignore]
        )
        if not streaming:
            self._producer.resumeProducing()
    def unregisterProducer(self) -> None:
        """Part of IConsumer interface"""
        self._producer = None
        assert self._finished_deferred is not None
        if not self._finished_deferred.called:
            self._bytes_queue.put_nowait(None)

    def write(self, write_bytes: bytes) -> None:
        """Part of IConsumer interface"""
        if self._write_exception:
            raise self._write_exception

        assert self._finished_deferred is not None
        if self._finished_deferred.called:
            raise Exception("consumer has closed")

        self._bytes_queue.put_nowait(write_bytes)

        # If this is a PushProducer and the queue is getting behind
        # then we pause the producer.
        if self.streaming and self._bytes_queue.qsize() >= self._PAUSE_ON_QUEUE_SIZE:
            self._paused_producer = True
            assert self._producer is not None
            # cast safe because `streaming` means this is an IPushProducer
            cast(IPushProducer, self._producer).pauseProducing()
    def _writer(self) -> None:
        """This is run in a background thread to write to the file."""
        try:
            while self._producer or not self._bytes_queue.empty():
                # If we've paused the producer, check if we should resume it.
                if self._producer and self._paused_producer:
                    if self._bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE:
                        self._reactor.callFromThread(self._resume_paused_producer)

                bytes = self._bytes_queue.get()

                # A None (or empty bytes) is a signal to check whether we
                # should stop, rather than data to write.
                if bytes:
                    self._file_obj.write(bytes)

                # If it's a pull producer then we need to explicitly ask for
                # more data.
                if not self.streaming and self._producer:
                    self._reactor.callFromThread(self._producer.resumeProducing)
        except Exception as e:
            self._write_exception = e
            raise
        finally:
            self._file_obj.close()
    def wait(self) -> "Deferred[None]":
        """Returns a deferred that resolves when finished writing to file"""
        assert self._finished_deferred is not None
        return make_deferred_yieldable(self._finished_deferred)

    def _resume_paused_producer(self) -> None:
        """Gets called if we should resume producing after being paused"""
        if self._paused_producer and self._producer:
            self._paused_producer = False
            self._producer.resumeProducing()
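

# A minimal usage sketch, assuming synapse and Twisted are installed: drive the
# consumer from a pull producer and wait for the background writer thread to
# drain the queue and close the file. The DemoPullProducer class, chunk size
# and output filename below are illustrative assumptions for this sketch, not
# part of the module's API.
if __name__ == "__main__":
    from twisted.internet import defer, reactor as demo_reactor
    from zope.interface import implementer

    @implementer(IPullProducer)
    class DemoPullProducer:
        """Hands `data` to `consumer` one chunk per resumeProducing() call."""

        def __init__(self, data: bytes, consumer: BackgroundFileConsumer) -> None:
            self._data = data
            self._consumer = consumer

        def resumeProducing(self) -> None:
            # Called once by registerProducer and then from the reactor thread
            # each time the writer thread wants more data.
            if self._data:
                chunk, self._data = self._data[:1024], self._data[1024:]
                self._consumer.write(chunk)
            else:
                # Unregistering wakes the writer thread so it can drain the
                # queue, close the file and resolve wait().
                self._consumer.unregisterProducer()

        def stopProducing(self) -> None:
            self._data = b""

    @defer.inlineCallbacks
    def demo():
        consumer = BackgroundFileConsumer(
            open("file_consumer_demo.bin", "wb"),
            cast(ISynapseReactor, demo_reactor),
        )
        producer = DemoPullProducer(b"hello file consumer " * 500, consumer)
        consumer.registerProducer(producer, streaming=False)
        yield consumer.wait()
        demo_reactor.stop()

    demo_reactor.callWhenRunning(demo)
    demo_reactor.run()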