Du kannst nicht mehr als 25 Themen auswählen. Themen müssen entweder mit einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.
 
 
 
 
 
 

411 Zeilen
15 KiB

  1. # Copyright 2014-2016 OpenMarket Ltd
  2. # Copyright 2020-2021 The Matrix.org Foundation C.I.C.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import functools
  16. import os
  17. import re
  18. import string
  19. from typing import Any, Callable, List, TypeVar, Union, cast
  # Matches media IDs that start with a `YYYY-MM-DD` date, i.e. IDs of the form
  # <DATE><RANDOM_STRING>, e.g. 2017-09-28-fsdRDt24DS234dsf
  NEW_FORMAT_ID_RE = re.compile(r"^\d\d\d\d-\d\d-\d\d")

  # Any callable that returns a single path string.
  F = TypeVar("F", bound=Callable[..., str])


  def _wrap_in_base_path(func: F) -> F:
      """Convert a relative-path method into one rooted at the primary media store.

      Args:
          func: A `MediaFilePaths` method that returns a path relative to the
              media store directory.

      Returns:
          A method with the same signature as `func` whose result is `func`'s
          result joined onto `self.base_path`.
      """

      @functools.wraps(func)
      def _rooted(self: "MediaFilePaths", *args: Any, **kwargs: Any) -> str:
          relative_path = func(self, *args, **kwargs)
          return os.path.join(self.base_path, relative_path)

      return cast(F, _rooted)
  # Any method that returns either a single path or a list of paths.
  GetPathMethod = TypeVar(
      "GetPathMethod", bound=Union[Callable[..., str], Callable[..., List[str]]]
  )


  def _wrap_with_jail_check(relative: bool) -> Callable[[GetPathMethod], GetPathMethod]:
      """Build a decorator which checks that returned path(s) stay in the media store.

      The decorated method may return either one path or a list of paths; its
      return value is passed through untouched when the check succeeds.

      This is a secondary safety net: it should only ever fire if the decorated
      method forgot to call `_validate_path_component`, or if
      `_validate_path_component` itself is buggy.

      Args:
          relative: Whether the decorated method returns paths relative to the
              media store directory (`True`), or paths that already include
              `self.base_path` (`False`).

      Returns:
          A decorator for path-returning methods. The decorated method raises
          `ValueError` if any returned path resolves outside the media store
          directory.
      """

      def _decorator(func: GetPathMethod) -> GetPathMethod:
          @functools.wraps(func)
          def _checked(
              self: "MediaFilePaths", *args: Any, **kwargs: Any
          ) -> Union[str, List[str]]:
              result = func(self, *args, **kwargs)
              candidates = result if isinstance(result, list) else [result]

              for candidate in candidates:
                  # Build the path that will ultimately be used. Whether
                  # `candidate` is relative to the media store cannot be guessed,
                  # because the media store directory may itself be a relative
                  # path, so the caller tells us via `relative`.
                  full_path = (
                      os.path.join(self.base_path, candidate) if relative else candidate
                  )

                  # `normpath` strips `../` and `./` segments, after which
                  # `commonpath` can tell whether the result still lies within
                  # the media store directory.
                  resolved = os.path.normpath(full_path)
                  jail = self.normalized_base_path
                  if os.path.commonpath([resolved, jail]) != jail:
                      # Either the path escapes the media store directory, or
                      # `self.base_path` is `.`, which is an unlikely
                      # configuration.
                      raise ValueError(f"Invalid media store path: {full_path!r}")

                  # Caveat of `os.path.normpath`/`abspath`: `a/b/c/../c`
                  # normalizes to `a/b/c`, yet refers to a different path when
                  # `a/b/c` is a symlink. The check is therefore imperfect and
                  # may let a restricted subset of untrustworthy paths through.
                  # That is acceptable because the primary defence is
                  # `_validate_path_component`.
                  #
                  # `os.path.realpath` would resolve symlinks, but breaks when
                  # there are symlinks inside the media store: e.g. if
                  # `url_store/` is symlinked elsewhere, its canonical path no
                  # longer matches that of the main media store directory.

              return result

          return cast(GetPathMethod, _checked)

      return _decorator
  # Every character that may appear in a single path component.
  ALLOWED_CHARACTERS = (
      set(string.ascii_letters)
      | set(string.digits)
      | set("_-")
      | set(".[]:")  # Domain names, IPv6 addresses and ports in server names
  )

  # Names built only from allowed characters that must still be rejected.
  FORBIDDEN_NAMES = {
      "",
      os.path.curdir,  # "." for the current platform
      os.path.pardir,  # ".." for the current platform
  }


  def _validate_path_component(name: str) -> str:
      """Check that `name` is safe to use as a single path component.

      Args:
          name: The path component to check.

      Returns:
          `name`, unchanged, if it is valid.

      Raises:
          ValueError: If `name` contains a character outside
              `ALLOWED_CHARACTERS`, or is one of `FORBIDDEN_NAMES`.
      """
      only_allowed_characters = ALLOWED_CHARACTERS.issuperset(name)
      if only_allowed_characters and name not in FORBIDDEN_NAMES:
          return name

      raise ValueError(f"Invalid path component: {name!r}")
  def _thumbnail_file_name(
      width: int, height: int, content_type: str, method: str
  ) -> str:
      """Build the (not yet validated) file name used for a thumbnail.

      Args:
          width: Thumbnail width.
          height: Thumbnail height.
          content_type: A `<top level type>/<sub type>` string.
          method: The thumbnailing method.

      Returns:
          `<width>-<height>-<top level type>-<sub type>-<method>`.

      Raises:
          ValueError: If `content_type` does not contain exactly one `/`.
      """
      top_level_type, sub_type = content_type.split("/")
      return "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method)


  def _split_media_id(media_id: str) -> List[str]:
      """Split an ID into its validated `xx`, `yy`, `rest` path components.

      Raises:
          ValueError: If any of the three components is not a safe path component.
      """
      return [
          _validate_path_component(media_id[0:2]),
          _validate_path_component(media_id[2:4]),
          _validate_path_component(media_id[4:]),
      ]


  def _split_url_cache_id(media_id: str) -> List[str]:
      """Split a URL cache media ID into validated path components.

      IDs that start with a date are of the form <DATE><RANDOM_STRING>, e.g.
      2017-09-28-fsdRDt24DS234dsf, and split into `<date>`, `<random string>`. Any
      other ID is split like a regular media ID.

      Raises:
          ValueError: If any component is not a safe path component.
      """
      if NEW_FORMAT_ID_RE.match(media_id):
          return [
              _validate_path_component(media_id[:10]),
              _validate_path_component(media_id[11:]),
          ]
      return _split_media_id(media_id)


  class MediaFilePaths:
      """Describes where files are stored on disk.

      Most of the functions have a `*_rel` variant which returns a file path that
      is relative to the base media store path. This is mainly used when we want
      to write to the backup media store (when one is configured)

      Every path-returning method raises `ValueError` if one of the supplied IDs,
      server names or thumbnail parameters cannot be safely used as a path
      component.
      """

      def __init__(self, primary_base_path: str):
          """
          Args:
              primary_base_path: Location of the primary media store.

          Raises:
              AssertionError: If paths cannot be validated correctly on the
                  current platform.
          """
          self.base_path = primary_base_path
          self.normalized_base_path = os.path.normpath(self.base_path)

          # Refuse to initialize if paths cannot be validated correctly for the
          # current platform. These are raised explicitly rather than written as
          # `assert` statements so that the checks are not stripped when Python
          # runs with `-O`.
          if (
              os.path.sep in ALLOWED_CHARACTERS
              or os.path.altsep in ALLOWED_CHARACTERS
          ):
              raise AssertionError(
                  "Path separators must not be allowed in media path components"
              )

          # On Windows, paths have all sorts of weirdness which
          # `_validate_path_component` does not consider. In any case, the remote
          # media store can't work correctly for certain homeservers there, since
          # ":"s aren't allowed in paths.
          if os.name != "posix":
              raise AssertionError("The media store requires a POSIX platform")

      @_wrap_with_jail_check(relative=True)
      def local_media_filepath_rel(self, media_id: str) -> str:
          """Path of a local media file, relative to the media store."""
          return os.path.join("local_content", *_split_media_id(media_id))

      local_media_filepath = _wrap_in_base_path(local_media_filepath_rel)

      @_wrap_with_jail_check(relative=True)
      def local_media_thumbnail_rel(
          self, media_id: str, width: int, height: int, content_type: str, method: str
      ) -> str:
          """Path of a local media thumbnail, relative to the media store."""
          file_name = _thumbnail_file_name(width, height, content_type, method)
          return os.path.join(
              "local_thumbnails",
              *_split_media_id(media_id),
              _validate_path_component(file_name),
          )

      local_media_thumbnail = _wrap_in_base_path(local_media_thumbnail_rel)

      @_wrap_with_jail_check(relative=False)
      def local_media_thumbnail_dir(self, media_id: str) -> str:
          """
          Retrieve the local store path of thumbnails of a given media_id

          Args:
              media_id: The media ID to query.
          Returns:
              Path of local_thumbnails from media_id
          """
          return os.path.join(
              self.base_path, "local_thumbnails", *_split_media_id(media_id)
          )

      @_wrap_with_jail_check(relative=True)
      def remote_media_filepath_rel(self, server_name: str, file_id: str) -> str:
          """Path of a remote media file, relative to the media store."""
          return os.path.join(
              "remote_content",
              _validate_path_component(server_name),
              *_split_media_id(file_id),
          )

      remote_media_filepath = _wrap_in_base_path(remote_media_filepath_rel)

      @_wrap_with_jail_check(relative=True)
      def remote_media_thumbnail_rel(
          self,
          server_name: str,
          file_id: str,
          width: int,
          height: int,
          content_type: str,
          method: str,
      ) -> str:
          """Path of a remote media thumbnail, relative to the media store."""
          file_name = _thumbnail_file_name(width, height, content_type, method)
          return os.path.join(
              "remote_thumbnail",
              _validate_path_component(server_name),
              *_split_media_id(file_id),
              _validate_path_component(file_name),
          )

      remote_media_thumbnail = _wrap_in_base_path(remote_media_thumbnail_rel)

      # Legacy path that was used to store thumbnails previously.
      # Should be removed after some time, when most of the thumbnails are stored
      # using the new path.
      @_wrap_with_jail_check(relative=True)
      def remote_media_thumbnail_rel_legacy(
          self, server_name: str, file_id: str, width: int, height: int, content_type: str
      ) -> str:
          """Legacy relative path of a remote thumbnail (no method in the name)."""
          top_level_type, sub_type = content_type.split("/")
          file_name = "%i-%i-%s-%s" % (width, height, top_level_type, sub_type)
          return os.path.join(
              "remote_thumbnail",
              _validate_path_component(server_name),
              *_split_media_id(file_id),
              _validate_path_component(file_name),
          )

      @_wrap_with_jail_check(relative=False)
      def remote_media_thumbnail_dir(self, server_name: str, file_id: str) -> str:
          """Directory, under the media store, holding a remote file's thumbnails."""
          return os.path.join(
              self.base_path,
              "remote_thumbnail",
              _validate_path_component(server_name),
              *_split_media_id(file_id),
          )

      @_wrap_with_jail_check(relative=True)
      def url_cache_filepath_rel(self, media_id: str) -> str:
          """Path of a URL cache file, relative to the media store."""
          return os.path.join("url_cache", *_split_url_cache_id(media_id))

      url_cache_filepath = _wrap_in_base_path(url_cache_filepath_rel)

      @_wrap_with_jail_check(relative=False)
      def url_cache_filepath_dirs_to_delete(self, media_id: str) -> List[str]:
          """The dirs to try and remove if we delete the media_id file"""
          is_new_format = NEW_FORMAT_ID_RE.match(media_id)
          cache_root = os.path.join(self.base_path, "url_cache")

          # Only the directory components are validated here; the trailing
          # file-name part of the ID is not used in any of the returned paths.
          if is_new_format:
              return [os.path.join(cache_root, _validate_path_component(media_id[:10]))]

          first = _validate_path_component(media_id[0:2])
          second = _validate_path_component(media_id[2:4])
          return [
              os.path.join(cache_root, first, second),
              os.path.join(cache_root, first),
          ]

      @_wrap_with_jail_check(relative=True)
      def url_cache_thumbnail_rel(
          self, media_id: str, width: int, height: int, content_type: str, method: str
      ) -> str:
          """Path of a URL cache thumbnail, relative to the media store."""
          file_name = _thumbnail_file_name(width, height, content_type, method)
          return os.path.join(
              "url_cache_thumbnails",
              *_split_url_cache_id(media_id),
              _validate_path_component(file_name),
          )

      url_cache_thumbnail = _wrap_in_base_path(url_cache_thumbnail_rel)

      @_wrap_with_jail_check(relative=True)
      def url_cache_thumbnail_directory_rel(self, media_id: str) -> str:
          """Relative path of the directory holding a URL cache file's thumbnails."""
          return os.path.join("url_cache_thumbnails", *_split_url_cache_id(media_id))

      url_cache_thumbnail_directory = _wrap_in_base_path(
          url_cache_thumbnail_directory_rel
      )

      @_wrap_with_jail_check(relative=False)
      def url_cache_thumbnail_dirs_to_delete(self, media_id: str) -> List[str]:
          """The dirs to try and remove if we delete the media_id thumbnails"""
          components = _split_url_cache_id(media_id)
          thumbnails_root = os.path.join(self.base_path, "url_cache_thumbnails")

          # Deepest directory first, then each of its parents up to (but not
          # including) `url_cache_thumbnails` itself.
          return [
              os.path.join(thumbnails_root, *components[:depth])
              for depth in range(len(components), 0, -1)
          ]