* Removes the `v1` directory from `test.rest.media.v1`. * Moves the non-REST code from `synapse.rest.media.v1` to `synapse.media`. * Flattens the `v1` directory from `synapse.rest.media`, but leaves compatibility with third-party media repositories and spam checkers. (tags/v1.79.0rc1)
@@ -0,0 +1 @@ | |||
Refactor the media modules. |
@@ -37,7 +37,7 @@ import os | |||
import shutil | |||
import sys | |||
from synapse.rest.media.v1.filepath import MediaFilePaths | |||
from synapse.media.filepath import MediaFilePaths | |||
logger = logging.getLogger() | |||
@@ -178,11 +178,13 @@ class ContentRepositoryConfig(Config): | |||
for i, provider_config in enumerate(storage_providers): | |||
# We special case the module "file_system" so as not to need to | |||
# expose FileStorageProviderBackend | |||
if provider_config["module"] == "file_system": | |||
provider_config["module"] = ( | |||
"synapse.rest.media.v1.storage_provider" | |||
".FileStorageProviderBackend" | |||
) | |||
if ( | |||
provider_config["module"] == "file_system" | |||
or provider_config["module"] == "synapse.rest.media.v1.storage_provider" | |||
): | |||
provider_config[ | |||
"module" | |||
] = "synapse.media.storage_provider.FileStorageProviderBackend" | |||
provider_class, parsed_config = load_module( | |||
provider_config, ("media_storage_providers", "<item %i>" % i) | |||
@@ -33,8 +33,8 @@ from typing_extensions import Literal | |||
import synapse | |||
from synapse.api.errors import Codes | |||
from synapse.logging.opentracing import trace | |||
from synapse.rest.media.v1._base import FileInfo | |||
from synapse.rest.media.v1.media_storage import ReadableFileWrapper | |||
from synapse.media._base import FileInfo | |||
from synapse.media.media_storage import ReadableFileWrapper | |||
from synapse.spam_checker_api import RegistrationBehaviour | |||
from synapse.types import JsonDict, RoomAlias, UserProfile | |||
from synapse.util.async_helpers import delay_cancellation, maybe_awaitable | |||
@@ -0,0 +1,479 @@ | |||
# Copyright 2014-2016 OpenMarket Ltd | |||
# Copyright 2019-2021 The Matrix.org Foundation C.I.C. | |||
# | |||
# Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | |||
# You may obtain a copy of the License at | |||
# | |||
# http://www.apache.org/licenses/LICENSE-2.0 | |||
# | |||
# Unless required by applicable law or agreed to in writing, software | |||
# distributed under the License is distributed on an "AS IS" BASIS, | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
import logging | |||
import os | |||
import urllib | |||
from abc import ABC, abstractmethod | |||
from types import TracebackType | |||
from typing import Awaitable, Dict, Generator, List, Optional, Tuple, Type | |||
import attr | |||
from twisted.internet.interfaces import IConsumer | |||
from twisted.protocols.basic import FileSender | |||
from twisted.web.server import Request | |||
from synapse.api.errors import Codes, SynapseError, cs_error | |||
from synapse.http.server import finish_request, respond_with_json | |||
from synapse.http.site import SynapseRequest | |||
from synapse.logging.context import make_deferred_yieldable | |||
from synapse.util.stringutils import is_ascii, parse_and_validate_server_name | |||
logger = logging.getLogger(__name__) | |||
# List of all text content types whose responses will have the charset
# defaulted to UTF-8 when none is given; e.g. "text/css" is served as
# "text/css; charset=UTF-8", but a type that already carries an explicit
# charset is left untouched (see add_file_headers).
TEXT_CONTENT_TYPES = [
    "text/css",
    "text/csv",
    "text/html",
    "text/calendar",
    "text/plain",
    "text/javascript",
    "application/json",
    "application/ld+json",
    "application/rtf",
    "image/svg+xml",
    "text/xml",
]
def parse_media_id(request: Request) -> Tuple[str, str, Optional[str]]:
    """Parse the server name, media ID and optional file name from a request URI.

    Also performs rough validation of the server name.

    Args:
        request: The `Request`.

    Returns:
        A tuple of (server name, media ID, optional file name).

    Raises:
        SynapseError(404): if parsing or validation fail for any reason
    """
    try:
        # The type on postpath seems incorrect in Twisted 21.2.0.
        segments: List[bytes] = request.postpath  # type: ignore
        assert segments

        # Clients may append e.g. /test.png to the URL so that URL-parsing
        # clients can infer a content type; only the first two path segments
        # carry the server name and media ID.
        server_name = segments[0].decode("utf-8")
        media_id = segments[1].decode("utf8")

        # Raises if the server name is invalid.
        parse_and_validate_server_name(server_name)

        file_name = None
        if len(segments) > 2:
            try:
                file_name = urllib.parse.unquote(segments[-1].decode("utf-8"))
            except UnicodeDecodeError:
                pass
        return server_name, media_id, file_name
    except Exception:
        raise SynapseError(
            404, "Invalid media id token %r" % (request.postpath,), Codes.UNKNOWN
        )
def respond_404(request: SynapseRequest) -> None:
    """Send a JSON 404 (M_NOT_FOUND) response for the given request."""
    error_body = cs_error("Not found %r" % (request.postpath,), code=Codes.NOT_FOUND)
    respond_with_json(request, 404, error_body, send_cors=True)
async def respond_with_file(
    request: SynapseRequest,
    media_type: str,
    file_path: str,
    file_size: Optional[int] = None,
    upload_name: Optional[str] = None,
) -> None:
    """Stream a file from disk to the client, or respond with a 404 if it
    does not exist.

    Args:
        request: The request to respond to.
        media_type: The media/content type of the file.
        file_path: Path to the file on disk.
        file_size: Size of the file in bytes; stat'd from disk if not given.
        upload_name: The name of the file, if any (used for Content-Disposition).
    """
    logger.debug("Responding with %r", file_path)

    if not os.path.isfile(file_path):
        respond_404(request)
        return

    if file_size is None:
        file_size = os.stat(file_path).st_size

    add_file_headers(request, media_type, file_size, upload_name)
    with open(file_path, "rb") as f:
        await make_deferred_yieldable(FileSender().beginFileTransfer(f, request))

    finish_request(request)
def add_file_headers(
    request: Request,
    media_type: str,
    file_size: Optional[int],
    upload_name: Optional[str],
) -> None:
    """Adds the correct response headers in preparation for responding with the
    media.

    Args:
        request
        media_type: The media/content type.
        file_size: Size in bytes of the media, if known.
        upload_name: The name of the requested file, if any.
    """

    def _quote(x: str) -> str:
        return urllib.parse.quote(x.encode("utf-8"))

    # Default to a UTF-8 charset for text content types.
    # ex, uses UTF-8 for 'text/css' but not 'text/css; charset=UTF-16'
    if media_type.lower() in TEXT_CONTENT_TYPES:
        content_type = media_type + "; charset=UTF-8"
    else:
        content_type = media_type

    request.setHeader(b"Content-Type", content_type.encode("UTF-8"))

    if upload_name:
        # RFC6266 section 4.1 [1] defines both `filename` and `filename*`.
        #
        # `filename` is defined to be a `value`, which is defined by RFC2616
        # section 3.6 [2] to be a `token` or a `quoted-string`, where a `token`
        # is (essentially) a single US-ASCII word, and a `quoted-string` is a
        # US-ASCII string surrounded by double-quotes, using backslash as an
        # escape character. Note that %-encoding is *not* permitted.
        #
        # `filename*` is defined to be an `ext-value`, which is defined in
        # RFC5987 section 3.2.1 [3] to be `charset "'" [ language ] "'" value-chars`,
        # where `value-chars` is essentially a %-encoded string in the given charset.
        #
        # [1]: https://tools.ietf.org/html/rfc6266#section-4.1
        # [2]: https://tools.ietf.org/html/rfc2616#section-3.6
        # [3]: https://tools.ietf.org/html/rfc5987#section-3.2.1

        # We avoid the quoted-string version of `filename`, because (a) synapse didn't
        # correctly interpret those as of 0.99.2 and (b) they are a bit of a pain and we
        # may as well just do the filename* version.
        if _can_encode_filename_as_token(upload_name):
            disposition = "inline; filename=%s" % (upload_name,)
        else:
            disposition = "inline; filename*=utf-8''%s" % (_quote(upload_name),)

        request.setHeader(b"Content-Disposition", disposition.encode("ascii"))

    # cache for at least a day.
    # XXX: we might want to turn this off for data we don't want to
    # recommend caching as it's sensitive or private - or at least
    # select private. don't bother setting Expires as all our
    # clients are smart enough to be happy with Cache-Control
    request.setHeader(b"Cache-Control", b"public,max-age=86400,s-maxage=86400")

    if file_size is not None:
        request.setHeader(b"Content-Length", b"%d" % (file_size,))

    # Tell web crawlers to not index, archive, or follow links in media. This
    # should help to prevent things in the media repo from showing up in web
    # search results.
    # (Fixed to pass a bytes value, consistent with every other setHeader call
    # here; the value is pure ASCII so the bytes on the wire are unchanged.)
    request.setHeader(b"X-Robots-Tag", b"noindex, nofollow, noarchive, noimageindex")
# separators as defined in RFC2616. SP and HT are handled separately. | |||
# see _can_encode_filename_as_token. | |||
_FILENAME_SEPARATOR_CHARS = { | |||
"(", | |||
")", | |||
"<", | |||
">", | |||
"@", | |||
",", | |||
";", | |||
":", | |||
"\\", | |||
'"', | |||
"/", | |||
"[", | |||
"]", | |||
"?", | |||
"=", | |||
"{", | |||
"}", | |||
} | |||
def _can_encode_filename_as_token(x: str) -> bool: | |||
for c in x: | |||
# from RFC2616: | |||
# | |||
# token = 1*<any CHAR except CTLs or separators> | |||
# | |||
# separators = "(" | ")" | "<" | ">" | "@" | |||
# | "," | ";" | ":" | "\" | <"> | |||
# | "/" | "[" | "]" | "?" | "=" | |||
# | "{" | "}" | SP | HT | |||
# | |||
# CHAR = <any US-ASCII character (octets 0 - 127)> | |||
# | |||
# CTL = <any US-ASCII control character | |||
# (octets 0 - 31) and DEL (127)> | |||
# | |||
if ord(c) >= 127 or ord(c) <= 32 or c in _FILENAME_SEPARATOR_CHARS: | |||
return False | |||
return True | |||
async def respond_with_responder(
    request: SynapseRequest,
    responder: "Optional[Responder]",
    media_type: str,
    file_size: Optional[int],
    upload_name: Optional[str] = None,
) -> None:
    """Responds to the request with given responder. If responder is None then
    returns 404.

    Args:
        request
        responder
        media_type: The media/content type.
        file_size: Size in bytes of the media. If not known it should be None
        upload_name: The name of the requested file, if any.
    """
    if not responder:
        respond_404(request)
        return

    # If we have a responder we *must* use it as a context manager.
    with responder:
        # Bail out early if the client has already gone away; there is nobody
        # left to write a response to.
        if request._disconnected:
            logger.warning(
                "Not sending response to request %s, already disconnected.", request
            )
            return

        logger.debug("Responding to media request with responder %s", responder)
        add_file_headers(request, media_type, file_size, upload_name)
        try:
            await responder.write_to_consumer(request)
        except Exception as e:
            # The majority of the time this will be due to the client having gone
            # away. Unfortunately, Twisted simply throws a generic exception at us
            # in that case.
            logger.warning("Failed to write to consumer: %s %s", type(e), e)

            # Unregister the producer, if it has one, so Twisted doesn't complain
            if request.producer:
                request.unregisterProducer()

    # The responder's resources are released (context manager exited) before
    # the request is finished.
    finish_request(request)
class Responder(ABC):
    """Represents a response that can be streamed to the requester.

    Responder is a context manager which *must* be used, so that any resources
    held can be cleaned up.
    """

    @abstractmethod
    def write_to_consumer(self, consumer: IConsumer) -> Awaitable:
        """Stream response into consumer

        Args:
            consumer: The consumer to stream into.

        Returns:
            Resolves once the response has finished being written
        """
        raise NotImplementedError()

    def __enter__(self) -> None:  # noqa: B027
        # No resources are acquired by default; subclasses may override.
        pass

    def __exit__(  # noqa: B027
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        # No cleanup by default; subclasses override to release resources
        # (e.g. FileResponder closes its file here).
        pass
@attr.s(slots=True, frozen=True, auto_attribs=True)
class ThumbnailInfo:
    """Details about a generated thumbnail."""

    # Width of the thumbnail, in pixels.
    width: int
    # Height of the thumbnail, in pixels.
    height: int
    # The thumbnailing method, e.g. "crop" or "scale".
    method: str
    # Content type of thumbnail, e.g. image/png
    type: str
    # The size of the media file, in bytes.
    length: Optional[int] = None
@attr.s(slots=True, frozen=True, auto_attribs=True)
class FileInfo:
    """Details about a requested/uploaded file."""

    # The server name where the media originated from, or None if local.
    server_name: Optional[str]
    # The local ID of the file. For local files this is the same as the media_id
    file_id: str
    # If the file is for the url preview cache
    url_cache: bool = False
    # Whether the file is a thumbnail or not.
    thumbnail: Optional[ThumbnailInfo] = None

    # The below properties exist to maintain compatibility with third-party modules.
    @property
    def thumbnail_width(self) -> Optional[int]:
        """The thumbnail's width, or None if this is not a thumbnail."""
        if not self.thumbnail:
            return None
        return self.thumbnail.width

    @property
    def thumbnail_height(self) -> Optional[int]:
        """The thumbnail's height, or None if this is not a thumbnail."""
        if not self.thumbnail:
            return None
        return self.thumbnail.height

    @property
    def thumbnail_method(self) -> Optional[str]:
        """The thumbnailing method, or None if this is not a thumbnail."""
        if not self.thumbnail:
            return None
        return self.thumbnail.method

    @property
    def thumbnail_type(self) -> Optional[str]:
        """The thumbnail's content type, or None if this is not a thumbnail."""
        if not self.thumbnail:
            return None
        return self.thumbnail.type

    @property
    def thumbnail_length(self) -> Optional[int]:
        """The thumbnail's size in bytes, or None if this is not a thumbnail."""
        if not self.thumbnail:
            return None
        return self.thumbnail.length
def get_filename_from_headers(headers: Dict[bytes, List[bytes]]) -> Optional[str]:
    """
    Get the filename of the downloaded file by inspecting the
    Content-Disposition HTTP header.

    Args:
        headers: The HTTP request headers.

    Returns:
        The filename, or None.
    """
    content_disposition = headers.get(b"Content-Disposition", [b""])

    # No header, bail out.
    if not content_disposition[0]:
        return None

    _, params = _parse_header(content_disposition[0])

    upload_name = None

    # Prefer the RFC5987 `filename*` parameter, which carries %-quoted UTF-8.
    encoded_name = params.get(b"filename*", None)
    if encoded_name and encoded_name.lower().startswith(b"utf-8''"):
        stripped = encoded_name[7:]
        # The raw value MUST itself be ASCII; any UTF-8 bytes are %-quoted.
        try:
            # Once decoded, strictly unquote the %-encoded parts into a
            # unicode string.
            upload_name = urllib.parse.unquote(
                stripped.decode("ascii"), errors="strict"
            )
        except UnicodeDecodeError:
            # Incorrect UTF-8.
            pass

    # Otherwise fall back to the plain `filename` parameter, if it is ASCII.
    if not upload_name:
        plain_name = params.get(b"filename", None)
        if plain_name and is_ascii(plain_name):
            upload_name = plain_name.decode("ascii")

    # This may be None here, indicating we did not find a matching name.
    return upload_name
def _parse_header(line: bytes) -> Tuple[bytes, Dict[bytes, bytes]]:
    """Parse a Content-type like header.

    Cargo-culted from `cgi`, but works on bytes rather than strings.

    Args:
        line: header to be parsed

    Returns:
        The main content-type, followed by the parameter dictionary
    """
    parts = _parseparam(b";" + line)
    key = next(parts)
    pdict = {}
    for part in parts:
        eq = part.find(b"=")
        if eq < 0:
            # Parameters without an '=' are ignored, matching `cgi`.
            continue

        name = part[:eq].strip().lower()
        value = part[eq + 1 :].strip()

        # Strip surrounding double-quotes and unescape \\ and \" sequences.
        if len(value) >= 2 and value[0:1] == value[-1:] == b'"':
            value = value[1:-1]
            value = value.replace(b"\\\\", b"\\").replace(b'\\"', b'"')

        pdict[name] = value

    return key, pdict
def _parseparam(s: bytes) -> Generator[bytes, None, None]: | |||
"""Generator which splits the input on ;, respecting double-quoted sequences | |||
Cargo-culted from `cgi`, but works on bytes rather than strings. | |||
Args: | |||
s: header to be parsed | |||
Returns: | |||
The split input | |||
""" | |||
while s[:1] == b";": | |||
s = s[1:] | |||
# look for the next ; | |||
end = s.find(b";") | |||
# if there is an odd number of " marks between here and the next ;, skip to the | |||
# next ; instead | |||
while end > 0 and (s.count(b'"', 0, end) - s.count(b'\\"', 0, end)) % 2: | |||
end = s.find(b";", end + 1) | |||
if end < 0: | |||
end = len(s) | |||
f = s[:end] | |||
yield f.strip() | |||
s = s[end:] |
@@ -32,18 +32,10 @@ from synapse.api.errors import ( | |||
RequestSendFailed, | |||
SynapseError, | |||
) | |||
from synapse.config._base import ConfigError | |||
from synapse.config.repository import ThumbnailRequirement | |||
from synapse.http.server import UnrecognizedRequestResource | |||
from synapse.http.site import SynapseRequest | |||
from synapse.logging.context import defer_to_thread | |||
from synapse.metrics.background_process_metrics import run_as_background_process | |||
from synapse.types import UserID | |||
from synapse.util.async_helpers import Linearizer | |||
from synapse.util.retryutils import NotRetryingDestination | |||
from synapse.util.stringutils import random_string | |||
from ._base import ( | |||
from synapse.media._base import ( | |||
FileInfo, | |||
Responder, | |||
ThumbnailInfo, | |||
@@ -51,15 +43,15 @@ from ._base import ( | |||
respond_404, | |||
respond_with_responder, | |||
) | |||
from .config_resource import MediaConfigResource | |||
from .download_resource import DownloadResource | |||
from .filepath import MediaFilePaths | |||
from .media_storage import MediaStorage | |||
from .preview_url_resource import PreviewUrlResource | |||
from .storage_provider import StorageProviderWrapper | |||
from .thumbnail_resource import ThumbnailResource | |||
from .thumbnailer import Thumbnailer, ThumbnailError | |||
from .upload_resource import UploadResource | |||
from synapse.media.filepath import MediaFilePaths | |||
from synapse.media.media_storage import MediaStorage | |||
from synapse.media.storage_provider import StorageProviderWrapper | |||
from synapse.media.thumbnailer import Thumbnailer, ThumbnailError | |||
from synapse.metrics.background_process_metrics import run_as_background_process | |||
from synapse.types import UserID | |||
from synapse.util.async_helpers import Linearizer | |||
from synapse.util.retryutils import NotRetryingDestination | |||
from synapse.util.stringutils import random_string | |||
if TYPE_CHECKING: | |||
from synapse.server import HomeServer | |||
@@ -1044,69 +1036,3 @@ class MediaRepository: | |||
removed_media.append(media_id) | |||
return removed_media, len(removed_media) | |||
class MediaRepositoryResource(UnrecognizedRequestResource):
    """File uploading and downloading.

    Uploads are POSTed to a resource which returns a token which is used to GET
    the download::

        => POST /_matrix/media/r0/upload HTTP/1.1
           Content-Type: <media-type>
           Content-Length: <content-length>

           <media>

        <= HTTP/1.1 200 OK
           Content-Type: application/json

           { "content_uri": "mxc://<server-name>/<media-id>" }

        => GET /_matrix/media/r0/download/<server-name>/<media-id> HTTP/1.1

        <= HTTP/1.1 200 OK
           Content-Type: <media-type>
           Content-Disposition: attachment;filename=<upload-filename>

           <media>

    Clients can get thumbnails by supplying a desired width and height and
    thumbnailing method::

        => GET /_matrix/media/r0/thumbnail/<server_name>
                /<media-id>?width=<w>&height=<h>&method=<m> HTTP/1.1

        <= HTTP/1.1 200 OK
           Content-Type: image/jpeg or image/png

           <thumbnail>

    The thumbnail methods are "crop" and "scale". "scale" tries to return an
    image where either the width or the height is smaller than the requested
    size. The client should then scale and letterbox the image if it needs to
    fit within a given rectangle. "crop" tries to return an image where the
    width and height are close to the requested size and the aspect matches
    the requested size. The client should scale the image if it needs to fit
    within a given rectangle.
    """

    def __init__(self, hs: "HomeServer"):
        # If we're not configured to use it, raise if we somehow got here.
        if not hs.config.media.can_load_media_repo:
            raise ConfigError("Synapse is not configured to use a media repo.")

        super().__init__()
        media_repo = hs.get_media_repository()

        # Wire up the child resources for each media endpoint.
        self.putChild(b"upload", UploadResource(hs, media_repo))
        self.putChild(b"download", DownloadResource(hs, media_repo))
        self.putChild(
            b"thumbnail", ThumbnailResource(hs, media_repo, media_repo.media_storage)
        )
        # URL previews are optional and only registered when enabled in config.
        if hs.config.media.url_preview_enabled:
            self.putChild(
                b"preview_url",
                PreviewUrlResource(hs, media_repo, media_repo.media_storage),
            )
        self.putChild(b"config", MediaConfigResource(hs))
@@ -0,0 +1,374 @@ | |||
# Copyright 2018-2021 The Matrix.org Foundation C.I.C. | |||
# | |||
# Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | |||
# You may obtain a copy of the License at | |||
# | |||
# http://www.apache.org/licenses/LICENSE-2.0 | |||
# | |||
# Unless required by applicable law or agreed to in writing, software | |||
# distributed under the License is distributed on an "AS IS" BASIS, | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
import contextlib | |||
import logging | |||
import os | |||
import shutil | |||
from types import TracebackType | |||
from typing import ( | |||
IO, | |||
TYPE_CHECKING, | |||
Any, | |||
Awaitable, | |||
BinaryIO, | |||
Callable, | |||
Generator, | |||
Optional, | |||
Sequence, | |||
Tuple, | |||
Type, | |||
) | |||
import attr | |||
from twisted.internet.defer import Deferred | |||
from twisted.internet.interfaces import IConsumer | |||
from twisted.protocols.basic import FileSender | |||
import synapse | |||
from synapse.api.errors import NotFoundError | |||
from synapse.logging.context import defer_to_thread, make_deferred_yieldable | |||
from synapse.util import Clock | |||
from synapse.util.file_consumer import BackgroundFileConsumer | |||
from ._base import FileInfo, Responder | |||
from .filepath import MediaFilePaths | |||
if TYPE_CHECKING: | |||
from synapse.media.storage_provider import StorageProvider | |||
from synapse.server import HomeServer | |||
logger = logging.getLogger(__name__) | |||
class MediaStorage:
    """Responsible for storing/fetching files from local sources.

    Args:
        hs
        local_media_directory: Base path where we store media on disk
        filepaths
        storage_providers: List of StorageProvider that are used to fetch and store files.
    """

    def __init__(
        self,
        hs: "HomeServer",
        local_media_directory: str,
        filepaths: MediaFilePaths,
        storage_providers: Sequence["StorageProvider"],
    ):
        self.hs = hs
        self.reactor = hs.get_reactor()
        self.local_media_directory = local_media_directory
        self.filepaths = filepaths
        self.storage_providers = storage_providers
        self.spam_checker = hs.get_spam_checker()
        self.clock = hs.get_clock()

    async def store_file(self, source: IO, file_info: FileInfo) -> str:
        """Write `source` to the on disk media store, and also any other
        configured storage providers

        Args:
            source: A file like object that should be written
            file_info: Info about the file to store

        Returns:
            the file path written to in the primary media store
        """

        with self.store_into_file(file_info) as (f, fname, finish_cb):
            # Write to the main repository
            await self.write_to_file(source, f)
            await finish_cb()

        return fname

    async def write_to_file(self, source: IO, output: IO) -> None:
        """Asynchronously write the `source` to `output`."""
        await defer_to_thread(self.reactor, _write_file_synchronously, source, output)

    @contextlib.contextmanager
    def store_into_file(
        self, file_info: FileInfo
    ) -> Generator[Tuple[BinaryIO, str, Callable[[], Awaitable[None]]], None, None]:
        """Context manager used to get a file like object to write into, as
        described by file_info.

        Actually yields a 3-tuple (file, fname, finish_cb), where file is a file
        like object that can be written to, fname is the absolute path of file
        on disk, and finish_cb is a function that returns an awaitable.

        fname can be used to read the contents from after upload, e.g. to
        generate thumbnails.

        finish_cb must be called and waited on after the file has been
        successfully been written to. Should not be called if there was an
        error.

        Args:
            file_info: Info about the file to store

        Example:

            with media_storage.store_into_file(info) as (f, fname, finish_cb):
                # .. write into f ...
                await finish_cb()

        Raises:
            Exception: if the caller exited the context without awaiting
                finish_cb (and without raising an error of its own).
        """

        path = self._file_info_to_path(file_info)
        fname = os.path.join(self.local_media_directory, path)

        dirname = os.path.dirname(fname)
        os.makedirs(dirname, exist_ok=True)

        # Single-element list so the nested `finish` coroutine can mutate it.
        finished_called = [False]

        try:
            with open(fname, "wb") as f:

                async def finish() -> None:
                    # Ensure that all writes have been flushed and close the
                    # file.
                    f.flush()
                    f.close()

                    spam_check = await self.spam_checker.check_media_file_for_spam(
                        ReadableFileWrapper(self.clock, fname), file_info
                    )
                    if spam_check != synapse.module_api.NOT_SPAM:
                        logger.info("Blocking media due to spam checker")
                        # Note that we'll delete the stored media, due to the
                        # try/except below. The media also won't be stored in
                        # the DB.
                        # We currently ignore any additional field returned by
                        # the spam-check API.
                        raise SpamMediaException(errcode=spam_check[0])

                    for provider in self.storage_providers:
                        await provider.store_file(path, file_info)

                    finished_called[0] = True

                yield f, fname, finish
        except Exception as e:
            # Clean up the partially-written file; best effort only.
            try:
                os.remove(fname)
            except Exception:
                pass

            raise e from None

        # BUGFIX: this previously read `if not finished_called:`, which tests
        # the truthiness of the (always non-empty) list rather than the flag
        # inside it, so the sanity check could never fire.
        if not finished_called[0]:
            raise Exception("Finished callback not called")

    async def fetch_media(self, file_info: FileInfo) -> Optional[Responder]:
        """Attempts to fetch media described by file_info from the local cache
        and configured storage providers.

        Args:
            file_info

        Returns:
            Returns a Responder if the file was found, otherwise None.
        """
        paths = [self._file_info_to_path(file_info)]

        # fallback for remote thumbnails with no method in the filename
        if file_info.thumbnail and file_info.server_name:
            paths.append(
                self.filepaths.remote_media_thumbnail_rel_legacy(
                    server_name=file_info.server_name,
                    file_id=file_info.file_id,
                    width=file_info.thumbnail.width,
                    height=file_info.thumbnail.height,
                    content_type=file_info.thumbnail.type,
                )
            )

        # Check the local cache first.
        for path in paths:
            local_path = os.path.join(self.local_media_directory, path)
            if os.path.exists(local_path):
                logger.debug("responding with local file %s", local_path)
                return FileResponder(open(local_path, "rb"))
            logger.debug("local file %s did not exist", local_path)

        # Fall back to the storage providers, in order.
        for provider in self.storage_providers:
            for path in paths:
                res: Any = await provider.fetch(path, file_info)
                if res:
                    logger.debug("Streaming %s from %s", path, provider)
                    return res
                logger.debug("%s not found on %s", path, provider)

        return None

    async def ensure_media_is_in_local_cache(self, file_info: FileInfo) -> str:
        """Ensures that the given file is in the local cache. Attempts to
        download it from storage providers if it isn't.

        Args:
            file_info

        Returns:
            Full path to local file

        Raises:
            NotFoundError: if the file is in neither the cache nor any provider.
        """
        path = self._file_info_to_path(file_info)
        local_path = os.path.join(self.local_media_directory, path)
        if os.path.exists(local_path):
            return local_path

        # Fallback for paths without method names
        # Should be removed in the future
        if file_info.thumbnail and file_info.server_name:
            legacy_path = self.filepaths.remote_media_thumbnail_rel_legacy(
                server_name=file_info.server_name,
                file_id=file_info.file_id,
                width=file_info.thumbnail.width,
                height=file_info.thumbnail.height,
                content_type=file_info.thumbnail.type,
            )
            legacy_local_path = os.path.join(self.local_media_directory, legacy_path)
            if os.path.exists(legacy_local_path):
                return legacy_local_path

        dirname = os.path.dirname(local_path)
        os.makedirs(dirname, exist_ok=True)

        for provider in self.storage_providers:
            res: Any = await provider.fetch(path, file_info)
            if res:
                with res:
                    consumer = BackgroundFileConsumer(
                        open(local_path, "wb"), self.reactor
                    )
                    await res.write_to_consumer(consumer)
                    await consumer.wait()
                return local_path

        raise NotFoundError()

    def _file_info_to_path(self, file_info: FileInfo) -> str:
        """Converts file_info into a relative path.

        The path is suitable for storing files under a directory, e.g. used to
        store files on local FS under the base media repository directory.
        """
        if file_info.url_cache:
            if file_info.thumbnail:
                return self.filepaths.url_cache_thumbnail_rel(
                    media_id=file_info.file_id,
                    width=file_info.thumbnail.width,
                    height=file_info.thumbnail.height,
                    content_type=file_info.thumbnail.type,
                    method=file_info.thumbnail.method,
                )
            return self.filepaths.url_cache_filepath_rel(file_info.file_id)

        if file_info.server_name:
            if file_info.thumbnail:
                return self.filepaths.remote_media_thumbnail_rel(
                    server_name=file_info.server_name,
                    file_id=file_info.file_id,
                    width=file_info.thumbnail.width,
                    height=file_info.thumbnail.height,
                    content_type=file_info.thumbnail.type,
                    method=file_info.thumbnail.method,
                )
            return self.filepaths.remote_media_filepath_rel(
                file_info.server_name, file_info.file_id
            )

        if file_info.thumbnail:
            return self.filepaths.local_media_thumbnail_rel(
                media_id=file_info.file_id,
                width=file_info.thumbnail.width,
                height=file_info.thumbnail.height,
                content_type=file_info.thumbnail.type,
                method=file_info.thumbnail.method,
            )
        return self.filepaths.local_media_filepath_rel(file_info.file_id)
def _write_file_synchronously(source: IO, dest: IO) -> None: | |||
"""Write `source` to the file like `dest` synchronously. Should be called | |||
from a thread. | |||
Args: | |||
source: A file like object that's to be written | |||
dest: A file like object to be written to | |||
""" | |||
source.seek(0) # Ensure we read from the start of the file | |||
shutil.copyfileobj(source, dest) | |||
class FileResponder(Responder):
    """Wraps an open file that can be sent to a request.

    Args:
        open_file: A file like object to be streamed ot the client,
            is closed when finished streaming.
    """

    def __init__(self, open_file: IO):
        self.open_file = open_file

    def write_to_consumer(self, consumer: IConsumer) -> Deferred:
        # Stream the file to the consumer via Twisted's FileSender, keeping
        # the logcontext across the deferred.
        return make_deferred_yieldable(
            FileSender().beginFileTransfer(self.open_file, consumer)
        )

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        # Responder is a context manager; close the file when the response is
        # done (or abandoned).
        self.open_file.close()
class SpamMediaException(NotFoundError):
    """The media was blocked by a spam checker, so we simply 404 the request (in
    the same way as if it was quarantined), rather than revealing that it was
    deliberately rejected.
    """
@attr.s(slots=True, auto_attribs=True)
class ReadableFileWrapper:
    """Allows reading a file in chunks, yielding to the reactor between
    chunks, and feeding each chunk to a callback.

    A simplified `FileSender` that writes to an arbitrary callback rather
    than an `IConsumer`.
    """

    CHUNK_SIZE = 2**14

    # Used to yield control back to the reactor between chunks.
    clock: Clock
    # Path of the file to read.
    path: str

    async def write_chunks_to(self, callback: Callable[[bytes], object]) -> None:
        """Read the file in `CHUNK_SIZE` pieces, passing each one to `callback`."""
        with open(self.path, "rb") as file:
            # `iter` with a b"" sentinel stops as soon as a read hits EOF.
            for chunk in iter(lambda: file.read(self.CHUNK_SIZE), b""):
                callback(chunk)

                # We yield to the reactor by sleeping for 0 seconds, so a
                # large file doesn't monopolise the reactor thread.
                await self.clock.sleep(0)
@@ -18,7 +18,7 @@ from typing import TYPE_CHECKING, List, Optional | |||
import attr | |||
from synapse.rest.media.v1.preview_html import parse_html_description | |||
from synapse.media.preview_html import parse_html_description | |||
from synapse.types import JsonDict | |||
from synapse.util import json_decoder | |||
@@ -0,0 +1,181 @@ | |||
# Copyright 2018-2021 The Matrix.org Foundation C.I.C. | |||
# | |||
# Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | |||
# You may obtain a copy of the License at | |||
# | |||
# http://www.apache.org/licenses/LICENSE-2.0 | |||
# | |||
# Unless required by applicable law or agreed to in writing, software | |||
# distributed under the License is distributed on an "AS IS" BASIS, | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
import abc | |||
import logging | |||
import os | |||
import shutil | |||
from typing import TYPE_CHECKING, Callable, Optional | |||
from synapse.config._base import Config | |||
from synapse.logging.context import defer_to_thread, run_in_background | |||
from synapse.util.async_helpers import maybe_awaitable | |||
from ._base import FileInfo, Responder | |||
from .media_storage import FileResponder | |||
logger = logging.getLogger(__name__) | |||
if TYPE_CHECKING: | |||
from synapse.server import HomeServer | |||
class StorageProvider(metaclass=abc.ABCMeta):
    """A storage provider is a service that can store uploaded media and
    retrieve them.
    """

    @abc.abstractmethod
    async def store_file(self, path: str, file_info: FileInfo) -> None:
        """Store the file described by file_info. The actual contents can be
        retrieved by reading the file in the primary media store at `path`.

        Args:
            path: Relative path of file in local cache
            file_info: The metadata of the file.
        """

    @abc.abstractmethod
    async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]:
        """Attempt to fetch the file described by file_info and stream it
        into writer.

        Args:
            path: Relative path of file in local cache
            file_info: The metadata of the file.

        Returns:
            Returns a Responder if the provider has the file, otherwise returns None.
        """
class StorageProviderWrapper(StorageProvider):
    """Applies per-homeserver config options on top of another storage provider.

    Args:
        backend: The storage provider to wrap.
        store_local: Whether new local media should be stored.
        store_synchronous: Whether to wait for the upload to complete, or to
            perform it in the background.
        store_remote: Whether remote media should be uploaded.
    """

    def __init__(
        self,
        backend: StorageProvider,
        store_local: bool,
        store_synchronous: bool,
        store_remote: bool,
    ):
        self.backend = backend
        self.store_local = store_local
        self.store_synchronous = store_synchronous
        self.store_remote = store_remote

    def __str__(self) -> str:
        return "StorageProviderWrapper[%s]" % (self.backend,)

    async def store_file(self, path: str, file_info: FileInfo) -> None:
        if file_info.url_cache:
            # The URL preview cache is short lived and not worth offloading or
            # backing up.
            return None

        # Respect the local/remote storage policy from the config.
        is_remote = bool(file_info.server_name)
        if is_remote and not self.store_remote:
            return None
        if not is_remote and not self.store_local:
            return None

        if self.store_synchronous:
            # store_file is supposed to return an Awaitable, but guard
            # against improper implementations.
            await maybe_awaitable(self.backend.store_file(path, file_info))  # type: ignore
        else:
            # TODO: Handle errors.
            async def store() -> None:
                try:
                    return await maybe_awaitable(
                        self.backend.store_file(path, file_info)
                    )
                except Exception:
                    logger.exception("Error storing file")

            run_in_background(store)

    async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]:
        if file_info.url_cache:
            # Files in the URL preview cache definitely aren't stored here,
            # so avoid any potentially slow I/O or network access.
            return None

        # fetch is supposed to return an Awaitable, but guard against
        # improper implementations.
        return await maybe_awaitable(self.backend.fetch(path, file_info))
class FileStorageProviderBackend(StorageProvider):
    """A storage provider that mirrors media into a directory on the local
    filesystem.

    Args:
        hs
        config: The config returned by `parse_config`.
    """

    def __init__(self, hs: "HomeServer", config: str):
        self.hs = hs
        # Where the primary media store keeps its files.
        self.cache_directory = hs.config.media.media_store_path
        # The directory this provider copies files into / serves them from.
        self.base_directory = config

    def __str__(self) -> str:
        return "FileStorageProviderBackend[%s]" % (self.base_directory,)

    async def store_file(self, path: str, file_info: FileInfo) -> None:
        """See StorageProvider.store_file"""

        source_path = os.path.join(self.cache_directory, path)
        target_path = os.path.join(self.base_directory, path)
        os.makedirs(os.path.dirname(target_path), exist_ok=True)

        # mypy needs help inferring the type of the second parameter, which is generic
        shutil_copyfile: Callable[[str, str], str] = shutil.copyfile
        # Copy on a worker thread so the reactor isn't blocked by disk I/O.
        await defer_to_thread(
            self.hs.get_reactor(),
            shutil_copyfile,
            source_path,
            target_path,
        )

    async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]:
        """See StorageProvider.fetch"""

        target_path = os.path.join(self.base_directory, path)
        if not os.path.isfile(target_path):
            return None
        return FileResponder(open(target_path, "rb"))

    @staticmethod
    def parse_config(config: dict) -> str:
        """Called on startup to parse config supplied. This should parse
        the config and raise if there is a problem.

        The returned value is passed into the constructor.

        In this case we only care about a single param, the directory, so let's
        just pull that out.
        """
        return Config.ensure_directory(config["directory"])
@@ -22,11 +22,10 @@ from synapse.http.server import ( | |||
) | |||
from synapse.http.servlet import parse_boolean | |||
from synapse.http.site import SynapseRequest | |||
from ._base import parse_media_id, respond_404 | |||
from synapse.media._base import parse_media_id, respond_404 | |||
if TYPE_CHECKING: | |||
from synapse.rest.media.v1.media_repository import MediaRepository | |||
from synapse.media.media_repository import MediaRepository | |||
from synapse.server import HomeServer | |||
logger = logging.getLogger(__name__) |
@@ -0,0 +1,93 @@ | |||
# Copyright 2014-2016 OpenMarket Ltd | |||
# Copyright 2018-2021 The Matrix.org Foundation C.I.C. | |||
# | |||
# Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | |||
# You may obtain a copy of the License at | |||
# | |||
# http://www.apache.org/licenses/LICENSE-2.0 | |||
# | |||
# Unless required by applicable law or agreed to in writing, software | |||
# distributed under the License is distributed on an "AS IS" BASIS, | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
from typing import TYPE_CHECKING | |||
from synapse.config._base import ConfigError | |||
from synapse.http.server import UnrecognizedRequestResource | |||
from .config_resource import MediaConfigResource | |||
from .download_resource import DownloadResource | |||
from .preview_url_resource import PreviewUrlResource | |||
from .thumbnail_resource import ThumbnailResource | |||
from .upload_resource import UploadResource | |||
if TYPE_CHECKING: | |||
from synapse.server import HomeServer | |||
class MediaRepositoryResource(UnrecognizedRequestResource):
    """File uploading and downloading.

    Uploads are POSTed to a resource which returns a token which is used to GET
    the download::

        => POST /_matrix/media/r0/upload HTTP/1.1
           Content-Type: <media-type>
           Content-Length: <content-length>

           <media>

        <= HTTP/1.1 200 OK
           Content-Type: application/json

           { "content_uri": "mxc://<server-name>/<media-id>" }

        => GET /_matrix/media/r0/download/<server-name>/<media-id> HTTP/1.1

        <= HTTP/1.1 200 OK
           Content-Type: <media-type>
           Content-Disposition: attachment;filename=<upload-filename>

           <media>

    Clients can get thumbnails by supplying a desired width and height and
    thumbnailing method::

        => GET /_matrix/media/r0/thumbnail/<server_name>
                /<media-id>?width=<w>&height=<h>&method=<m> HTTP/1.1

        <= HTTP/1.1 200 OK
           Content-Type: image/jpeg or image/png

           <thumbnail>

    The thumbnail methods are "crop" and "scale". "scale" tries to return an
    image where either the width or the height is smaller than the requested
    size. The client should then scale and letterbox the image if it needs to
    fit within a given rectangle. "crop" tries to return an image where the
    width and height are close to the requested size and the aspect matches
    the requested size. The client should scale the image if it needs to fit
    within a given rectangle.
    """

    def __init__(self, hs: "HomeServer"):
        # If we're not configured to use it, raise if we somehow got here.
        if not hs.config.media.can_load_media_repo:
            raise ConfigError("Synapse is not configured to use a media repo.")

        super().__init__()

        repo = hs.get_media_repository()
        storage = repo.media_storage

        self.putChild(b"upload", UploadResource(hs, repo))
        self.putChild(b"download", DownloadResource(hs, repo))
        self.putChild(b"thumbnail", ThumbnailResource(hs, repo, storage))
        if hs.config.media.url_preview_enabled:
            self.putChild(b"preview_url", PreviewUrlResource(hs, repo, storage))
        self.putChild(b"config", MediaConfigResource(hs))
@@ -40,21 +40,19 @@ from synapse.http.server import ( | |||
from synapse.http.servlet import parse_integer, parse_string | |||
from synapse.http.site import SynapseRequest | |||
from synapse.logging.context import make_deferred_yieldable, run_in_background | |||
from synapse.media._base import FileInfo, get_filename_from_headers | |||
from synapse.media.media_storage import MediaStorage | |||
from synapse.media.oembed import OEmbedProvider | |||
from synapse.media.preview_html import decode_body, parse_html_to_open_graph | |||
from synapse.metrics.background_process_metrics import run_as_background_process | |||
from synapse.rest.media.v1._base import get_filename_from_headers | |||
from synapse.rest.media.v1.media_storage import MediaStorage | |||
from synapse.rest.media.v1.oembed import OEmbedProvider | |||
from synapse.rest.media.v1.preview_html import decode_body, parse_html_to_open_graph | |||
from synapse.types import JsonDict, UserID | |||
from synapse.util import json_encoder | |||
from synapse.util.async_helpers import ObservableDeferred | |||
from synapse.util.caches.expiringcache import ExpiringCache | |||
from synapse.util.stringutils import random_string | |||
from ._base import FileInfo | |||
if TYPE_CHECKING: | |||
from synapse.rest.media.v1.media_repository import MediaRepository | |||
from synapse.media.media_repository import MediaRepository | |||
from synapse.server import HomeServer | |||
logger = logging.getLogger(__name__) |
@@ -27,9 +27,7 @@ from synapse.http.server import ( | |||
) | |||
from synapse.http.servlet import parse_integer, parse_string | |||
from synapse.http.site import SynapseRequest | |||
from synapse.rest.media.v1.media_storage import MediaStorage | |||
from ._base import ( | |||
from synapse.media._base import ( | |||
FileInfo, | |||
ThumbnailInfo, | |||
parse_media_id, | |||
@@ -37,9 +35,10 @@ from ._base import ( | |||
respond_with_file, | |||
respond_with_responder, | |||
) | |||
from synapse.media.media_storage import MediaStorage | |||
if TYPE_CHECKING: | |||
from synapse.rest.media.v1.media_repository import MediaRepository | |||
from synapse.media.media_repository import MediaRepository | |||
from synapse.server import HomeServer | |||
logger = logging.getLogger(__name__) |
@@ -20,10 +20,10 @@ from synapse.api.errors import Codes, SynapseError | |||
from synapse.http.server import DirectServeJsonResource, respond_with_json | |||
from synapse.http.servlet import parse_bytes_from_args | |||
from synapse.http.site import SynapseRequest | |||
from synapse.rest.media.v1.media_storage import SpamMediaException | |||
from synapse.media.media_storage import SpamMediaException | |||
if TYPE_CHECKING: | |||
from synapse.rest.media.v1.media_repository import MediaRepository | |||
from synapse.media.media_repository import MediaRepository | |||
from synapse.server import HomeServer | |||
logger = logging.getLogger(__name__) |
@@ -1,5 +1,4 @@ | |||
# Copyright 2014-2016 OpenMarket Ltd | |||
# Copyright 2019-2021 The Matrix.org Foundation C.I.C. | |||
# Copyright 2023 The Matrix.org Foundation C.I.C. | |||
# | |||
# Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | |||
@@ -12,468 +11,7 @@ | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
# | |||
import logging | |||
import os | |||
import urllib | |||
from abc import ABC, abstractmethod | |||
from types import TracebackType | |||
from typing import Awaitable, Dict, Generator, List, Optional, Tuple, Type | |||
import attr | |||
from twisted.internet.interfaces import IConsumer | |||
from twisted.protocols.basic import FileSender | |||
from twisted.web.server import Request | |||
from synapse.api.errors import Codes, SynapseError, cs_error | |||
from synapse.http.server import finish_request, respond_with_json | |||
from synapse.http.site import SynapseRequest | |||
from synapse.logging.context import make_deferred_yieldable | |||
from synapse.util.stringutils import is_ascii, parse_and_validate_server_name | |||
logger = logging.getLogger(__name__) | |||
# Text content types for which we default the charset to UTF-8 when the
# media's own Content-Type does not specify one (used by add_file_headers).
TEXT_CONTENT_TYPES = [
    "text/css",
    "text/csv",
    "text/html",
    "text/calendar",
    "text/plain",
    "text/javascript",
    "application/json",
    "application/ld+json",
    "application/rtf",
    "image/svg+xml",
    "text/xml",
]
def parse_media_id(request: Request) -> Tuple[str, str, Optional[str]]:
    """Parses the server name, media ID and optional file name from the request URI

    Also performs some rough validation on the server name.

    Args:
        request: The `Request`.

    Returns:
        A tuple containing the parsed server name, media ID and optional file name.

    Raises:
        SynapseError(404): if parsing or validation fail for any reason
    """
    try:
        # The type on postpath seems incorrect in Twisted 21.2.0.
        postpath: List[bytes] = request.postpath  # type: ignore
        assert postpath

        server_name = postpath[0].decode("utf-8")
        media_id = postpath[1].decode("utf8")

        # Validate the server name, raising if invalid
        parse_and_validate_server_name(server_name)

        # Users may append e.g. /test.png to the URL; some clients parse the
        # URL to work out the content type.
        file_name = None
        if len(postpath) > 2:
            try:
                file_name = urllib.parse.unquote(postpath[-1].decode("utf-8"))
            except UnicodeDecodeError:
                pass
        return server_name, media_id, file_name
    except Exception:
        raise SynapseError(
            404, "Invalid media id token %r" % (request.postpath,), Codes.UNKNOWN
        )
def respond_404(request: SynapseRequest) -> None:
    """Send a JSON 404 (M_NOT_FOUND) response for the given request."""
    body = cs_error("Not found %r" % (request.postpath,), code=Codes.NOT_FOUND)
    respond_with_json(request, 404, body, send_cors=True)
async def respond_with_file(
    request: SynapseRequest,
    media_type: str,
    file_path: str,
    file_size: Optional[int] = None,
    upload_name: Optional[str] = None,
) -> None:
    """Stream the file at `file_path` to the request, or 404 if it is missing.

    Args:
        request: The request to respond to.
        media_type: The media/content type of the file.
        file_path: Path of the file on disk.
        file_size: Size of the file in bytes; stat'd from disk if not given.
        upload_name: The name of the requested file, if any.
    """
    logger.debug("Responding with %r", file_path)

    if not os.path.isfile(file_path):
        respond_404(request)
        return

    if file_size is None:
        file_size = os.stat(file_path).st_size

    add_file_headers(request, media_type, file_size, upload_name)
    with open(file_path, "rb") as f:
        await make_deferred_yieldable(FileSender().beginFileTransfer(f, request))

    finish_request(request)
def add_file_headers(
    request: Request,
    media_type: str,
    file_size: Optional[int],
    upload_name: Optional[str],
) -> None:
    """Adds the correct response headers in preparation for responding with the
    media.

    Args:
        request
        media_type: The media/content type.
        file_size: Size in bytes of the media, if known.
        upload_name: The name of the requested file, if any.
    """

    def _urlquote(value: str) -> str:
        return urllib.parse.quote(value.encode("utf-8"))

    # Default to a UTF-8 charset for text content types, unless the media type
    # already carries a charset (e.g. 'text/css; charset=UTF-16' is left alone).
    content_type = (
        media_type + "; charset=UTF-8"
        if media_type.lower() in TEXT_CONTENT_TYPES
        else media_type
    )
    request.setHeader(b"Content-Type", content_type.encode("UTF-8"))

    if upload_name:
        # RFC6266 section 4.1 [1] defines both `filename` and `filename*`.
        #
        # `filename` must be an RFC2616 [2] `token` or `quoted-string` (a
        # plain US-ASCII word, or a double-quoted backslash-escaped US-ASCII
        # string; %-encoding is *not* permitted).
        #
        # `filename*` is an RFC5987 [3] ext-value:
        # `charset "'" [ language ] "'" value-chars`, where `value-chars` is
        # essentially a %-encoded string in the given charset.
        #
        # [1]: https://tools.ietf.org/html/rfc6266#section-4.1
        # [2]: https://tools.ietf.org/html/rfc2616#section-3.6
        # [3]: https://tools.ietf.org/html/rfc5987#section-3.2.1

        # We avoid the quoted-string form of `filename`, because (a) synapse
        # didn't correctly interpret those as of 0.99.2 and (b) they are a bit
        # of a pain and we may as well just do the filename* version.
        if _can_encode_filename_as_token(upload_name):
            disposition = "inline; filename=%s" % (upload_name,)
        else:
            disposition = "inline; filename*=utf-8''%s" % (_urlquote(upload_name),)

        request.setHeader(b"Content-Disposition", disposition.encode("ascii"))

    # cache for at least a day.
    # XXX: we might want to turn this off for data we don't want to
    # recommend caching as it's sensitive or private - or at least
    # select private. don't bother setting Expires as all our
    # clients are smart enough to be happy with Cache-Control
    request.setHeader(b"Cache-Control", b"public,max-age=86400,s-maxage=86400")

    if file_size is not None:
        request.setHeader(b"Content-Length", b"%d" % (file_size,))

    # Tell web crawlers to not index, archive, or follow links in media. This
    # should help to prevent things in the media repo from showing up in web
    # search results.
    request.setHeader(b"X-Robots-Tag", "noindex, nofollow, noarchive, noimageindex")
# separators as defined in RFC2616. SP and HT are handled separately. | |||
# see _can_encode_filename_as_token. | |||
_FILENAME_SEPARATOR_CHARS = { | |||
"(", | |||
")", | |||
"<", | |||
">", | |||
"@", | |||
",", | |||
";", | |||
":", | |||
"\\", | |||
'"', | |||
"/", | |||
"[", | |||
"]", | |||
"?", | |||
"=", | |||
"{", | |||
"}", | |||
} | |||
def _can_encode_filename_as_token(x: str) -> bool: | |||
for c in x: | |||
# from RFC2616: | |||
# | |||
# token = 1*<any CHAR except CTLs or separators> | |||
# | |||
# separators = "(" | ")" | "<" | ">" | "@" | |||
# | "," | ";" | ":" | "\" | <"> | |||
# | "/" | "[" | "]" | "?" | "=" | |||
# | "{" | "}" | SP | HT | |||
# | |||
# CHAR = <any US-ASCII character (octets 0 - 127)> | |||
# | |||
# CTL = <any US-ASCII control character | |||
# (octets 0 - 31) and DEL (127)> | |||
# | |||
if ord(c) >= 127 or ord(c) <= 32 or c in _FILENAME_SEPARATOR_CHARS: | |||
return False | |||
return True | |||
async def respond_with_responder(
    request: SynapseRequest,
    responder: "Optional[Responder]",
    media_type: str,
    file_size: Optional[int],
    upload_name: Optional[str] = None,
) -> None:
    """Responds to the request with given responder. If responder is None then
    returns 404.

    Args:
        request: The request being responded to.
        responder: The responder supplying the media, or None to 404.
        media_type: The media/content type.
        file_size: Size in bytes of the media. If not known it should be None
        upload_name: The name of the requested file, if any.
    """
    if not responder:
        respond_404(request)
        return

    # If we have a responder we *must* use it as a context manager.
    with responder:
        # Don't bother streaming anything if the client has already gone.
        if request._disconnected:
            logger.warning(
                "Not sending response to request %s, already disconnected.", request
            )
            return

        logger.debug("Responding to media request with responder %s", responder)
        # Headers must be set before any body bytes are written.
        add_file_headers(request, media_type, file_size, upload_name)
        try:
            await responder.write_to_consumer(request)
        except Exception as e:
            # The majority of the time this will be due to the client having gone
            # away. Unfortunately, Twisted simply throws a generic exception at us
            # in that case.
            logger.warning("Failed to write to consumer: %s %s", type(e), e)

            # Unregister the producer, if it has one, so Twisted doesn't complain
            if request.producer:
                request.unregisterProducer()

        # Finish the request whether or not the write succeeded.
        finish_request(request)
class Responder(ABC):
    """Represents a response that can be streamed to the requester.

    Responder is a context manager which *must* be used, so that any resources
    held can be cleaned up.
    """

    @abstractmethod
    def write_to_consumer(self, consumer: IConsumer) -> Awaitable:
        """Stream response into consumer

        Args:
            consumer: The consumer to stream into.

        Returns:
            Resolves once the response has finished being written
        """
        raise NotImplementedError()

    # The context-manager hooks are deliberate no-ops by default, so that
    # subclasses with nothing to clean up need not override them (hence the
    # B027 suppressions on these intentionally-empty methods).
    def __enter__(self) -> None:  # noqa: B027
        pass

    def __exit__(  # noqa: B027
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        pass
@attr.s(slots=True, frozen=True, auto_attribs=True)
class ThumbnailInfo:
    """Details about a generated thumbnail."""

    # Dimensions of the thumbnail, in pixels.
    width: int
    height: int
    # Thumbnailing method, e.g. "crop" or "scale".
    method: str
    # Content type of thumbnail, e.g. image/png
    type: str
    # The size of the media file, in bytes.
    length: Optional[int] = None
@attr.s(slots=True, frozen=True, auto_attribs=True)
class FileInfo:
    """Details about a requested/uploaded file."""

    # The server name where the media originated from, or None if local.
    server_name: Optional[str]
    # The local ID of the file. For local files this is the same as the media_id
    file_id: str
    # If the file is for the url preview cache
    url_cache: bool = False
    # Whether the file is a thumbnail or not.
    thumbnail: Optional[ThumbnailInfo] = None

    # The properties below exist to maintain compatibility with third-party
    # modules that predate ThumbnailInfo; each simply forwards to the
    # corresponding ThumbnailInfo field, or is None for non-thumbnails.
    @property
    def thumbnail_width(self) -> Optional[int]:
        return self.thumbnail.width if self.thumbnail else None

    @property
    def thumbnail_height(self) -> Optional[int]:
        return self.thumbnail.height if self.thumbnail else None

    @property
    def thumbnail_method(self) -> Optional[str]:
        return self.thumbnail.method if self.thumbnail else None

    @property
    def thumbnail_type(self) -> Optional[str]:
        return self.thumbnail.type if self.thumbnail else None

    @property
    def thumbnail_length(self) -> Optional[int]:
        return self.thumbnail.length if self.thumbnail else None
def get_filename_from_headers(headers: Dict[bytes, List[bytes]]) -> Optional[str]:
    """
    Get the filename of the downloaded file by inspecting the
    Content-Disposition HTTP header.

    Args:
        headers: The HTTP request headers.

    Returns:
        The filename, or None.
    """
    content_disposition = headers.get(b"Content-Disposition", [b""])

    # No header, bail out.
    if not content_disposition[0]:
        return None

    _, params = _parse_header(content_disposition[0])

    upload_name = None

    # Prefer the RFC5987 `filename*` parameter: it MUST be ASCII, with any
    # UTF-8 bytes %-quoted.
    upload_name_utf8 = params.get(b"filename*", None)
    if upload_name_utf8 and upload_name_utf8.lower().startswith(b"utf-8''"):
        try:
            # Once decoded, we can strictly unquote the %-encoded parts into
            # a unicode string.
            upload_name = urllib.parse.unquote(
                upload_name_utf8[7:].decode("ascii"), errors="strict"
            )
        except UnicodeDecodeError:
            # Incorrect UTF-8; fall through to the plain `filename` param.
            pass

    # Otherwise fall back to a plain-ASCII `filename` parameter.
    if not upload_name:
        upload_name_ascii = params.get(b"filename", None)
        if upload_name_ascii and is_ascii(upload_name_ascii):
            upload_name = upload_name_ascii.decode("ascii")

    # This may be None here, indicating we did not find a matching name.
    return upload_name
def _parse_header(line: bytes) -> Tuple[bytes, Dict[bytes, bytes]]: | |||
"""Parse a Content-type like header. | |||
Cargo-culted from `cgi`, but works on bytes rather than strings. | |||
Args: | |||
line: header to be parsed | |||
Returns: | |||
The main content-type, followed by the parameter dictionary | |||
""" | |||
parts = _parseparam(b";" + line) | |||
key = next(parts) | |||
pdict = {} | |||
for p in parts: | |||
i = p.find(b"=") | |||
if i >= 0: | |||
name = p[:i].strip().lower() | |||
value = p[i + 1 :].strip() | |||
# strip double-quotes | |||
if len(value) >= 2 and value[0:1] == value[-1:] == b'"': | |||
value = value[1:-1] | |||
value = value.replace(b"\\\\", b"\\").replace(b'\\"', b'"') | |||
pdict[name] = value | |||
return key, pdict | |||
def _parseparam(s: bytes) -> Generator[bytes, None, None]: | |||
"""Generator which splits the input on ;, respecting double-quoted sequences | |||
Cargo-culted from `cgi`, but works on bytes rather than strings. | |||
Args: | |||
s: header to be parsed | |||
Returns: | |||
The split input | |||
""" | |||
while s[:1] == b";": | |||
s = s[1:] | |||
# look for the next ; | |||
end = s.find(b";") | |||
# if there is an odd number of " marks between here and the next ;, skip to the | |||
# next ; instead | |||
while end > 0 and (s.count(b'"', 0, end) - s.count(b'\\"', 0, end)) % 2: | |||
end = s.find(b";", end + 1) | |||
if end < 0: | |||
end = len(s) | |||
f = s[:end] | |||
yield f.strip() | |||
s = s[end:] | |||
# This exists purely for backwards compatibility with media providers and spam checkers. | |||
from synapse.media._base import FileInfo, Responder # noqa: F401 |
@@ -1,4 +1,4 @@ | |||
# Copyright 2018-2021 The Matrix.org Foundation C.I.C. | |||
# Copyright 2023 The Matrix.org Foundation C.I.C. | |||
# | |||
# Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | |||
@@ -11,364 +11,7 @@ | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
import contextlib | |||
import logging | |||
import os | |||
import shutil | |||
from types import TracebackType | |||
from typing import ( | |||
IO, | |||
TYPE_CHECKING, | |||
Any, | |||
Awaitable, | |||
BinaryIO, | |||
Callable, | |||
Generator, | |||
Optional, | |||
Sequence, | |||
Tuple, | |||
Type, | |||
) | |||
import attr | |||
from twisted.internet.defer import Deferred | |||
from twisted.internet.interfaces import IConsumer | |||
from twisted.protocols.basic import FileSender | |||
import synapse | |||
from synapse.api.errors import NotFoundError | |||
from synapse.logging.context import defer_to_thread, make_deferred_yieldable | |||
from synapse.util import Clock | |||
from synapse.util.file_consumer import BackgroundFileConsumer | |||
from ._base import FileInfo, Responder | |||
from .filepath import MediaFilePaths | |||
if TYPE_CHECKING: | |||
from synapse.rest.media.v1.storage_provider import StorageProvider | |||
from synapse.server import HomeServer | |||
logger = logging.getLogger(__name__) | |||
class MediaStorage: | |||
"""Responsible for storing/fetching files from local sources. | |||
Args: | |||
hs | |||
local_media_directory: Base path where we store media on disk | |||
filepaths | |||
storage_providers: List of StorageProvider that are used to fetch and store files. | |||
""" | |||
    def __init__(
        self,
        hs: "HomeServer",
        local_media_directory: str,
        filepaths: MediaFilePaths,
        storage_providers: Sequence["StorageProvider"],
    ):
        """
        Args:
            hs: The homeserver instance.
            local_media_directory: Base path where we store media on disk.
            filepaths: Maps media metadata to relative paths on disk.
            storage_providers: List of StorageProvider that are used to fetch
                and store files.
        """
        self.hs = hs
        self.reactor = hs.get_reactor()
        self.local_media_directory = local_media_directory
        self.filepaths = filepaths
        self.storage_providers = storage_providers
        self.spam_checker = hs.get_spam_checker()
        self.clock = hs.get_clock()
async def store_file(self, source: IO, file_info: FileInfo) -> str: | |||
"""Write `source` to the on disk media store, and also any other | |||
configured storage providers | |||
Args: | |||
source: A file like object that should be written | |||
file_info: Info about the file to store | |||
Returns: | |||
the file path written to in the primary media store | |||
""" | |||
with self.store_into_file(file_info) as (f, fname, finish_cb): | |||
# Write to the main repository | |||
await self.write_to_file(source, f) | |||
await finish_cb() | |||
return fname | |||
async def write_to_file(self, source: IO, output: IO) -> None: | |||
"""Asynchronously write the `source` to `output`.""" | |||
await defer_to_thread(self.reactor, _write_file_synchronously, source, output) | |||
@contextlib.contextmanager | |||
def store_into_file( | |||
self, file_info: FileInfo | |||
) -> Generator[Tuple[BinaryIO, str, Callable[[], Awaitable[None]]], None, None]: | |||
"""Context manager used to get a file like object to write into, as | |||
described by file_info. | |||
Actually yields a 3-tuple (file, fname, finish_cb), where file is a file | |||
like object that can be written to, fname is the absolute path of file | |||
on disk, and finish_cb is a function that returns an awaitable. | |||
fname can be used to read the contents from after upload, e.g. to | |||
generate thumbnails. | |||
finish_cb must be called and waited on after the file has been | |||
successfully been written to. Should not be called if there was an | |||
error. | |||
Args: | |||
file_info: Info about the file to store | |||
Example: | |||
with media_storage.store_into_file(info) as (f, fname, finish_cb): | |||
# .. write into f ... | |||
await finish_cb() | |||
""" | |||
path = self._file_info_to_path(file_info) | |||
fname = os.path.join(self.local_media_directory, path) | |||
dirname = os.path.dirname(fname) | |||
os.makedirs(dirname, exist_ok=True) | |||
finished_called = [False] | |||
try: | |||
with open(fname, "wb") as f: | |||
async def finish() -> None: | |||
# Ensure that all writes have been flushed and close the | |||
# file. | |||
f.flush() | |||
f.close() | |||
spam_check = await self.spam_checker.check_media_file_for_spam( | |||
ReadableFileWrapper(self.clock, fname), file_info | |||
) | |||
if spam_check != synapse.module_api.NOT_SPAM: | |||
logger.info("Blocking media due to spam checker") | |||
# Note that we'll delete the stored media, due to the | |||
# try/except below. The media also won't be stored in | |||
# the DB. | |||
# We currently ignore any additional field returned by | |||
# the spam-check API. | |||
raise SpamMediaException(errcode=spam_check[0]) | |||
for provider in self.storage_providers: | |||
await provider.store_file(path, file_info) | |||
finished_called[0] = True | |||
yield f, fname, finish | |||
except Exception as e: | |||
try: | |||
os.remove(fname) | |||
except Exception: | |||
pass | |||
raise e from None | |||
if not finished_called: | |||
raise Exception("Finished callback not called") | |||
async def fetch_media(self, file_info: FileInfo) -> Optional[Responder]: | |||
"""Attempts to fetch media described by file_info from the local cache | |||
and configured storage providers. | |||
Args: | |||
file_info | |||
Returns: | |||
Returns a Responder if the file was found, otherwise None. | |||
""" | |||
paths = [self._file_info_to_path(file_info)] | |||
# fallback for remote thumbnails with no method in the filename | |||
if file_info.thumbnail and file_info.server_name: | |||
paths.append( | |||
self.filepaths.remote_media_thumbnail_rel_legacy( | |||
server_name=file_info.server_name, | |||
file_id=file_info.file_id, | |||
width=file_info.thumbnail.width, | |||
height=file_info.thumbnail.height, | |||
content_type=file_info.thumbnail.type, | |||
) | |||
) | |||
for path in paths: | |||
local_path = os.path.join(self.local_media_directory, path) | |||
if os.path.exists(local_path): | |||
logger.debug("responding with local file %s", local_path) | |||
return FileResponder(open(local_path, "rb")) | |||
logger.debug("local file %s did not exist", local_path) | |||
for provider in self.storage_providers: | |||
for path in paths: | |||
res: Any = await provider.fetch(path, file_info) | |||
if res: | |||
logger.debug("Streaming %s from %s", path, provider) | |||
return res | |||
logger.debug("%s not found on %s", path, provider) | |||
return None | |||
async def ensure_media_is_in_local_cache(self, file_info: FileInfo) -> str: | |||
"""Ensures that the given file is in the local cache. Attempts to | |||
download it from storage providers if it isn't. | |||
Args: | |||
file_info | |||
Returns: | |||
Full path to local file | |||
""" | |||
path = self._file_info_to_path(file_info) | |||
local_path = os.path.join(self.local_media_directory, path) | |||
if os.path.exists(local_path): | |||
return local_path | |||
# Fallback for paths without method names | |||
# Should be removed in the future | |||
if file_info.thumbnail and file_info.server_name: | |||
legacy_path = self.filepaths.remote_media_thumbnail_rel_legacy( | |||
server_name=file_info.server_name, | |||
file_id=file_info.file_id, | |||
width=file_info.thumbnail.width, | |||
height=file_info.thumbnail.height, | |||
content_type=file_info.thumbnail.type, | |||
) | |||
legacy_local_path = os.path.join(self.local_media_directory, legacy_path) | |||
if os.path.exists(legacy_local_path): | |||
return legacy_local_path | |||
dirname = os.path.dirname(local_path) | |||
os.makedirs(dirname, exist_ok=True) | |||
for provider in self.storage_providers: | |||
res: Any = await provider.fetch(path, file_info) | |||
if res: | |||
with res: | |||
consumer = BackgroundFileConsumer( | |||
open(local_path, "wb"), self.reactor | |||
) | |||
await res.write_to_consumer(consumer) | |||
await consumer.wait() | |||
return local_path | |||
raise NotFoundError() | |||
def _file_info_to_path(self, file_info: FileInfo) -> str: | |||
"""Converts file_info into a relative path. | |||
The path is suitable for storing files under a directory, e.g. used to | |||
store files on local FS under the base media repository directory. | |||
""" | |||
if file_info.url_cache: | |||
if file_info.thumbnail: | |||
return self.filepaths.url_cache_thumbnail_rel( | |||
media_id=file_info.file_id, | |||
width=file_info.thumbnail.width, | |||
height=file_info.thumbnail.height, | |||
content_type=file_info.thumbnail.type, | |||
method=file_info.thumbnail.method, | |||
) | |||
return self.filepaths.url_cache_filepath_rel(file_info.file_id) | |||
if file_info.server_name: | |||
if file_info.thumbnail: | |||
return self.filepaths.remote_media_thumbnail_rel( | |||
server_name=file_info.server_name, | |||
file_id=file_info.file_id, | |||
width=file_info.thumbnail.width, | |||
height=file_info.thumbnail.height, | |||
content_type=file_info.thumbnail.type, | |||
method=file_info.thumbnail.method, | |||
) | |||
return self.filepaths.remote_media_filepath_rel( | |||
file_info.server_name, file_info.file_id | |||
) | |||
if file_info.thumbnail: | |||
return self.filepaths.local_media_thumbnail_rel( | |||
media_id=file_info.file_id, | |||
width=file_info.thumbnail.width, | |||
height=file_info.thumbnail.height, | |||
content_type=file_info.thumbnail.type, | |||
method=file_info.thumbnail.method, | |||
) | |||
return self.filepaths.local_media_filepath_rel(file_info.file_id) | |||
def _write_file_synchronously(source: IO, dest: IO) -> None: | |||
"""Write `source` to the file like `dest` synchronously. Should be called | |||
from a thread. | |||
Args: | |||
source: A file like object that's to be written | |||
dest: A file like object to be written to | |||
""" | |||
source.seek(0) # Ensure we read from the start of the file | |||
shutil.copyfileobj(source, dest) | |||
class FileResponder(Responder):
    """A Responder that streams an already-open file to a request.

    Args:
        open_file: A file like object to be streamed ot the client,
            is closed when finished streaming.
    """

    def __init__(self, open_file: IO):
        self.open_file = open_file

    def write_to_consumer(self, consumer: IConsumer) -> Deferred:
        # Hand the transfer off to twisted's FileSender, keeping the logging
        # context across the resulting Deferred.
        transfer = FileSender().beginFileTransfer(self.open_file, consumer)
        return make_deferred_yieldable(transfer)

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        # Responder is used as a context manager; always release the file.
        self.open_file.close()
class SpamMediaException(NotFoundError):
    """Raised when a spam checker rejects a piece of media.

    Deliberately subclasses NotFoundError so the request is answered with a
    plain 404 — the same response given for quarantined media.
    """
@attr.s(slots=True, auto_attribs=True)
class ReadableFileWrapper:
    """Reads a file in fixed-size chunks, handing each chunk to a callback.

    This is simplified `FileSender` that takes an IO object rather than an
    `IConsumer`, periodically yielding control back to the reactor.
    """

    CHUNK_SIZE = 2**14

    # Used to yield to the reactor between chunks.
    clock: Clock
    # Path of the file to read.
    path: str

    async def write_chunks_to(self, callback: Callable[[bytes], object]) -> None:
        """Reads the file in chunks and calls the callback with each chunk."""
        with open(self.path, "rb") as file:
            chunk = file.read(self.CHUNK_SIZE)
            while chunk:
                callback(chunk)

                # Sleeping for 0 seconds yields control back to the reactor.
                await self.clock.sleep(0)

                chunk = file.read(self.CHUNK_SIZE)
# This exists purely for backwards compatibility with spam checkers. | |||
from synapse.media.media_storage import ReadableFileWrapper # noqa: F401 |
@@ -1,4 +1,4 @@ | |||
# Copyright 2018-2021 The Matrix.org Foundation C.I.C. | |||
# Copyright 2023 The Matrix.org Foundation C.I.C. | |||
# | |||
# Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | |||
@@ -11,171 +11,7 @@ | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
# | |||
import abc | |||
import logging | |||
import os | |||
import shutil | |||
from typing import TYPE_CHECKING, Callable, Optional | |||
from synapse.config._base import Config | |||
from synapse.logging.context import defer_to_thread, run_in_background | |||
from synapse.util.async_helpers import maybe_awaitable | |||
from ._base import FileInfo, Responder | |||
from .media_storage import FileResponder | |||
logger = logging.getLogger(__name__) | |||
if TYPE_CHECKING: | |||
from synapse.server import HomeServer | |||
class StorageProvider(metaclass=abc.ABCMeta):
    """Interface for services that can persist uploaded media and fetch it
    back later.
    """

    @abc.abstractmethod
    async def store_file(self, path: str, file_info: FileInfo) -> None:
        """Persist the file described by file_info. The actual contents can be
        retrieved by reading the file in file_info.upload_path.

        Args:
            path: Relative path of file in local cache
            file_info: The metadata of the file.
        """

    @abc.abstractmethod
    async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]:
        """Attempt to fetch the file described by file_info and stream it
        into writer.

        Args:
            path: Relative path of file in local cache
            file_info: The metadata of the file.

        Returns:
            Returns a Responder if the provider has the file, otherwise returns None.
        """
class StorageProviderWrapper(StorageProvider):
    """Decorates a storage provider with config-driven behaviour.

    Args:
        backend: The storage provider to wrap.
        store_local: Whether to store new local files or not.
        store_synchronous: Whether to wait for file to be successfully
            uploaded, or todo the upload in the background.
        store_remote: Whether remote media should be uploaded
    """

    def __init__(
        self,
        backend: StorageProvider,
        store_local: bool,
        store_synchronous: bool,
        store_remote: bool,
    ):
        self.backend = backend
        self.store_local = store_local
        self.store_synchronous = store_synchronous
        self.store_remote = store_remote

    def __str__(self) -> str:
        return "StorageProviderWrapper[%s]" % (self.backend,)

    async def store_file(self, path: str, file_info: FileInfo) -> None:
        # Guard clauses: skip anything this wrapper is configured not to store.
        if not file_info.server_name and not self.store_local:
            return None

        if file_info.server_name and not self.store_remote:
            return None

        if file_info.url_cache:
            # The URL preview cache is short lived and not worth offloading or
            # backing up.
            return None

        if self.store_synchronous:
            # store_file is supposed to return an Awaitable, but guard
            # against improper implementations.
            await maybe_awaitable(self.backend.store_file(path, file_info))  # type: ignore
            return None

        # TODO: Handle errors.
        async def store() -> None:
            try:
                return await maybe_awaitable(
                    self.backend.store_file(path, file_info)
                )
            except Exception:
                logger.exception("Error storing file")

        run_in_background(store)

    async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]:
        if file_info.url_cache:
            # Files in the URL preview cache definitely aren't stored here,
            # so avoid any potentially slow I/O or network access.
            return None

        # fetch is supposed to return an Awaitable, but guard against
        # improper implementations.
        return await maybe_awaitable(self.backend.fetch(path, file_info))
class FileStorageProviderBackend(StorageProvider):
    """A storage provider that keeps a backup copy of each file in a
    directory on the local filesystem.

    Args:
        hs: The homeserver.
        config: The config returned by `parse_config`.
    """

    def __init__(self, hs: "HomeServer", config: str):
        self.hs = hs
        self.cache_directory = hs.config.media.media_store_path
        self.base_directory = config

    def __str__(self) -> str:
        return "FileStorageProviderBackend[%s]" % (self.base_directory,)

    async def store_file(self, path: str, file_info: FileInfo) -> None:
        """See StorageProvider.store_file"""

        primary_fname = os.path.join(self.cache_directory, path)
        backup_fname = os.path.join(self.base_directory, path)

        os.makedirs(os.path.dirname(backup_fname), exist_ok=True)

        # mypy needs help inferring the type of the second parameter, which is generic
        shutil_copyfile: Callable[[str, str], str] = shutil.copyfile
        # Copy on a worker thread so the reactor isn't blocked by disk I/O.
        await defer_to_thread(
            self.hs.get_reactor(),
            shutil_copyfile,
            primary_fname,
            backup_fname,
        )

    async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]:
        """See StorageProvider.fetch"""

        backup_fname = os.path.join(self.base_directory, path)
        if not os.path.isfile(backup_fname):
            return None

        return FileResponder(open(backup_fname, "rb"))

    @staticmethod
    def parse_config(config: dict) -> str:
        """Called on startup to parse config supplied. This should parse
        the config and raise if there is a problem.

        The returned value is passed into the constructor.

        In this case we only care about a single param, the directory, so let's
        just pull that out.
        """
        return Config.ensure_directory(config["directory"])
# This exists purely for backwards compatibility with media providers. | |||
from synapse.media.storage_provider import StorageProvider # noqa: F401 |
@@ -105,6 +105,7 @@ from synapse.handlers.typing import FollowerTypingHandler, TypingWriterHandler | |||
from synapse.handlers.user_directory import UserDirectoryHandler | |||
from synapse.http.client import InsecureInterceptableContextFactory, SimpleHttpClient | |||
from synapse.http.matrixfederationclient import MatrixFederationHttpClient | |||
from synapse.media.media_repository import MediaRepository | |||
from synapse.metrics.common_usage_metrics import CommonUsageMetricsManager | |||
from synapse.module_api import ModuleApi | |||
from synapse.notifier import Notifier, ReplicationNotifier | |||
@@ -115,10 +116,7 @@ from synapse.replication.tcp.external_cache import ExternalCache | |||
from synapse.replication.tcp.handler import ReplicationCommandHandler | |||
from synapse.replication.tcp.resource import ReplicationStreamer | |||
from synapse.replication.tcp.streams import STREAMS_MAP, Stream | |||
from synapse.rest.media.v1.media_repository import ( | |||
MediaRepository, | |||
MediaRepositoryResource, | |||
) | |||
from synapse.rest.media.media_repository_resource import MediaRepositoryResource | |||
from synapse.server_notices.server_notices_manager import ServerNoticesManager | |||
from synapse.server_notices.server_notices_sender import ServerNoticesSender | |||
from synapse.server_notices.worker_server_notices_sender import ( | |||
@@ -1,4 +1,4 @@ | |||
# Copyright 2018 New Vector Ltd | |||
# Copyright 2023 The Matrix.org Foundation C.I.C. | |||
# | |||
# Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. |
@@ -12,7 +12,7 @@ | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
from synapse.rest.media.v1._base import get_filename_from_headers | |||
from synapse.media._base import get_filename_from_headers | |||
from tests import unittest | |||
@@ -15,7 +15,7 @@ import inspect | |||
import os | |||
from typing import Iterable | |||
from synapse.rest.media.v1.filepath import MediaFilePaths, _wrap_with_jail_check | |||
from synapse.media.filepath import MediaFilePaths, _wrap_with_jail_check | |||
from tests import unittest | |||
@@ -12,7 +12,7 @@ | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
from synapse.rest.media.v1.preview_html import ( | |||
from synapse.media.preview_html import ( | |||
_get_html_media_encodings, | |||
decode_body, | |||
parse_html_to_open_graph, |
@@ -34,13 +34,13 @@ from synapse.events import EventBase | |||
from synapse.events.spamcheck import load_legacy_spam_checkers | |||
from synapse.http.types import QueryParams | |||
from synapse.logging.context import make_deferred_yieldable | |||
from synapse.media._base import FileInfo | |||
from synapse.media.filepath import MediaFilePaths | |||
from synapse.media.media_storage import MediaStorage, ReadableFileWrapper | |||
from synapse.media.storage_provider import FileStorageProviderBackend | |||
from synapse.module_api import ModuleApi | |||
from synapse.rest import admin | |||
from synapse.rest.client import login | |||
from synapse.rest.media.v1._base import FileInfo | |||
from synapse.rest.media.v1.filepath import MediaFilePaths | |||
from synapse.rest.media.v1.media_storage import MediaStorage, ReadableFileWrapper | |||
from synapse.rest.media.v1.storage_provider import FileStorageProviderBackend | |||
from synapse.server import HomeServer | |||
from synapse.types import JsonDict, RoomAlias | |||
from synapse.util import Clock | |||
@@ -253,7 +253,7 @@ class MediaRepoTests(unittest.HomeserverTestCase): | |||
config["max_image_pixels"] = 2000000 | |||
provider_config = { | |||
"module": "synapse.rest.media.v1.storage_provider.FileStorageProviderBackend", | |||
"module": "synapse.media.storage_provider.FileStorageProviderBackend", | |||
"store_local": True, | |||
"store_synchronous": False, | |||
"store_remote": True, |
@@ -18,7 +18,7 @@ from parameterized import parameterized | |||
from twisted.test.proto_helpers import MemoryReactor | |||
from synapse.rest.media.v1.oembed import OEmbedProvider, OEmbedResult | |||
from synapse.media.oembed import OEmbedProvider, OEmbedResult | |||
from synapse.server import HomeServer | |||
from synapse.types import JsonDict | |||
from synapse.util import Clock |
@@ -20,8 +20,8 @@ from twisted.test.proto_helpers import MemoryReactor | |||
import synapse.rest.admin | |||
from synapse.api.errors import Codes | |||
from synapse.media.filepath import MediaFilePaths | |||
from synapse.rest.client import login, profile, room | |||
from synapse.rest.media.v1.filepath import MediaFilePaths | |||
from synapse.server import HomeServer | |||
from synapse.util import Clock | |||
@@ -28,8 +28,8 @@ import synapse.rest.admin | |||
from synapse.api.constants import ApprovalNoticeMedium, LoginType, UserTypes | |||
from synapse.api.errors import Codes, HttpResponseException, ResourceLimitError | |||
from synapse.api.room_versions import RoomVersions | |||
from synapse.media.filepath import MediaFilePaths | |||
from synapse.rest.client import devices, login, logout, profile, register, room, sync | |||
from synapse.rest.media.v1.filepath import MediaFilePaths | |||
from synapse.server import HomeServer | |||
from synapse.types import JsonDict, UserID, create_requester | |||
from synapse.util import Clock | |||
@@ -26,8 +26,8 @@ from twisted.internet.interfaces import IAddress, IResolutionReceiver | |||
from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactor | |||
from synapse.config.oembed import OEmbedEndpointConfig | |||
from synapse.rest.media.v1.media_repository import MediaRepositoryResource | |||
from synapse.rest.media.v1.preview_url_resource import IMAGE_CACHE_EXPIRY_MS | |||
from synapse.rest.media.media_repository_resource import MediaRepositoryResource | |||
from synapse.rest.media.preview_url_resource import IMAGE_CACHE_EXPIRY_MS | |||
from synapse.server import HomeServer | |||
from synapse.types import JsonDict | |||
from synapse.util import Clock | |||
@@ -82,7 +82,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): | |||
config["media_store_path"] = self.media_store_path | |||
provider_config = { | |||
"module": "synapse.rest.media.v1.storage_provider.FileStorageProviderBackend", | |||
"module": "synapse.media.storage_provider.FileStorageProviderBackend", | |||
"store_local": True, | |||
"store_synchronous": False, | |||
"store_remote": True, |