|
|
@@ -13,7 +13,7 @@ |
|
|
|
# limitations under the License. |
|
|
|
|
|
|
|
import queue |
|
|
|
from typing import BinaryIO, Optional, Union, cast |
|
|
|
from typing import Any, BinaryIO, Optional, Union, cast |
|
|
|
|
|
|
|
from twisted.internet import threads |
|
|
|
from twisted.internet.defer import Deferred |
|
|
@@ -58,7 +58,9 @@ class BackgroundFileConsumer: |
|
|
|
self._bytes_queue: queue.Queue[Optional[bytes]] = queue.Queue() |
|
|
|
|
|
|
|
# Deferred that is resolved when finished writing |
|
|
|
self._finished_deferred: Optional[Deferred[None]] = None |
|
|
|
# |
|
|
|
# This is really Deferred[None], but mypy doesn't seem to like that. |
|
|
|
self._finished_deferred: Optional[Deferred[Any]] = None |
|
|
|
|
|
|
|
# If the _writer thread throws an exception it gets stored here. |
|
|
|
self._write_exception: Optional[Exception] = None |
|
|
@@ -80,9 +82,13 @@ class BackgroundFileConsumer: |
|
|
|
self.streaming = streaming |
|
|
|
self._finished_deferred = run_in_background( |
|
|
|
threads.deferToThreadPool, |
|
|
|
self._reactor, |
|
|
|
self._reactor.getThreadPool(), |
|
|
|
self._writer, |
|
|
|
# mypy seems to get confused with the chaining of ParamSpec from |
|
|
|
# run_in_background to deferToThreadPool. |
|
|
|
# |
|
|
|
# For Twisted trunk, ignore arg-type; for Twisted release ignore unused-ignore. |
|
|
|
self._reactor, # type: ignore[arg-type,unused-ignore] |
|
|
|
self._reactor.getThreadPool(), # type: ignore[arg-type,unused-ignore] |
|
|
|
self._writer, # type: ignore[arg-type,unused-ignore] |
|
|
|
) |
|
|
|
if not streaming: |
|
|
|
self._producer.resumeProducing() |
|
|
|