@@ -38,7 +38,7 @@ from twisted.protocols.basic import FileSender
from synapse.api.errors import NotFoundError
from synapse.logging.context import defer_to_thread, make_deferred_yieldable
from synapse.logging.opentracing import trace
from synapse.logging.opentracing import start_active_span, trace, trace_with_opname
from synapse.util import Clock
from synapse.util.file_consumer import BackgroundFileConsumer
@@ -77,7 +77,7 @@ class MediaStorage:
self._spam_checker_module_callbacks = hs.get_module_api_callbacks().spam_checker
self.clock = hs.get_clock()
@trace
@trace_with_opname("MediaStorage.store_file")
async def store_file(self, source: IO, file_info: FileInfo) -> str:
"""Write `source` to the on disk media store, and also any other
configured storage providers
@@ -91,18 +91,19 @@ class MediaStorage:
"""
with self.store_into_file(file_info) as (f, fname, finish_cb):
# Write to the main repository
# Write to the main media repository
await self.write_to_file(source, f)
# Write to the other storage providers
await finish_cb()
return fname
@trace
@trace_with_opname("MediaStorage.write_to_file")
async def write_to_file(self, source: IO, output: IO) -> None:
"""Asynchronously write the `source` to `output`."""
await defer_to_thread(self.reactor, _write_file_synchronously, source, output)
@trace
@trace_with_opname("MediaStorage.store_into_file")
@contextlib.contextmanager
def store_into_file(
self, file_info: FileInfo
@@ -117,9 +118,9 @@ class MediaStorage:
fname can be used to read the contents from after upload, e.g. to
generate thumbnails.
finish_cb must be called and waited on after the file has been
successfully been written to. Should not be called if there was an
error.
finish_cb must be called and waited on after the file has been successfully been
written to. Should not be called if there was an error. Checks for spam and
stor es the file into the configu red storage p rovide rs .
Args:
file_info: Info about the file to store
@@ -139,35 +140,48 @@ class MediaStorage:
finished_called = [False]
main_media_repo_write_trace_scope = start_active_span(
"writing to main media repo"
)
main_media_repo_write_trace_scope.__enter__()
try:
with open(fname, "wb") as f:
async def finish() -> None:
# Ensure that all writes have been flushed and close the
# file.
f.flush()
f.close()
spam_check = await self._spam_checker_module_callbacks.check_media_file_for_spam(
ReadableFileWrapper(self.clock, fname), file_info
)
if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
logger.info("Blocking media due to spam checker")
# Note that we'll delete the stored media, due to the
# try/except below. The media also won't be stored in
# the DB.
# We currently ignore any additional field returned by
# the spam-check API.
raise SpamMediaException(errcode=spam_check[0])
for provider in self.storage_providers:
await provider.store_file(path, file_info)
finished_called[0] = True
# When someone calls finish, we assume they are done writing to the main media repo
main_media_repo_write_trace_scope.__exit__(None, None, None)
with start_active_span("writing to other storage providers"):
# Ensure that all writes have been flushed and close the
# file.
f.flush()
f.close()
spam_check = await self._spam_checker_module_callbacks.check_media_file_for_spam(
ReadableFileWrapper(self.clock, fname), file_info
)
if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
logger.info("Blocking media due to spam checker")
# Note that we'll delete the stored media, due to the
# try/except below. The media also won't be stored in
# the DB.
# We currently ignore any additional field returned by
# the spam-check API.
raise SpamMediaException(errcode=spam_check[0])
for provider in self.storage_providers:
with start_active_span(str(provider)):
await provider.store_file(path, file_info)
finished_called[0] = True
yield f, fname, finish
except Exception as e:
try:
main_media_repo_write_trace_scope.__exit__(
type(e), None, e.__traceback__
)
os.remove(fname)
except Exception:
pass
@@ -175,7 +189,11 @@ class MediaStorage:
raise e from None
if not finished_called:
raise Exception("Finished callback not called")
exc = Exception("Finished callback not called")
main_media_repo_write_trace_scope.__exit__(
type(exc), None, exc.__traceback__
)
raise exc
async def fetch_media(self, file_info: FileInfo) -> Optional[Responder]:
"""Attempts to fetch media described by file_info from the local cache