Não pode escolher mais do que 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.
 
 
 
 
 
 

272 linhas
10 KiB

  1. # Copyright 2021 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import html
  15. import logging
  16. import urllib.parse
  17. from typing import TYPE_CHECKING, List, Optional, cast
  18. import attr
  19. from synapse.media.preview_html import parse_html_description
  20. from synapse.types import JsonDict
  21. from synapse.util import json_decoder
  22. if TYPE_CHECKING:
  23. from lxml import etree
  24. from synapse.server import HomeServer
  25. logger = logging.getLogger(__name__)
  26. @attr.s(slots=True, frozen=True, auto_attribs=True)
  27. class OEmbedResult:
  28. # The Open Graph result (converted from the oEmbed result).
  29. open_graph_result: JsonDict
  30. # The author_name of the oEmbed result
  31. author_name: Optional[str]
  32. # Number of milliseconds to cache the content, according to the oEmbed response.
  33. #
  34. # This will be None if no cache-age is provided in the oEmbed response (or
  35. # if the oEmbed response cannot be turned into an Open Graph response).
  36. cache_age: Optional[int]
  37. class OEmbedProvider:
  38. """
  39. A helper for accessing oEmbed content.
  40. It can be used to check if a URL should be accessed via oEmbed and for
  41. requesting/parsing oEmbed content.
  42. """
  43. def __init__(self, hs: "HomeServer"):
  44. self._oembed_patterns = {}
  45. for oembed_endpoint in hs.config.oembed.oembed_patterns:
  46. api_endpoint = oembed_endpoint.api_endpoint
  47. # Only JSON is supported at the moment. This could be declared in
  48. # the formats field. Otherwise, if the endpoint ends in .xml assume
  49. # it doesn't support JSON.
  50. if (
  51. oembed_endpoint.formats is not None
  52. and "json" not in oembed_endpoint.formats
  53. ) or api_endpoint.endswith(".xml"):
  54. logger.info(
  55. "Ignoring oEmbed endpoint due to not supporting JSON: %s",
  56. api_endpoint,
  57. )
  58. continue
  59. # Iterate through each URL pattern and point it to the endpoint.
  60. for pattern in oembed_endpoint.url_patterns:
  61. self._oembed_patterns[pattern] = api_endpoint
  62. def get_oembed_url(self, url: str) -> Optional[str]:
  63. """
  64. Check whether the URL should be downloaded as oEmbed content instead.
  65. Args:
  66. url: The URL to check.
  67. Returns:
  68. A URL to use instead or None if the original URL should be used.
  69. """
  70. for url_pattern, endpoint in self._oembed_patterns.items():
  71. if url_pattern.fullmatch(url):
  72. # TODO Specify max height / width.
  73. # Note that only the JSON format is supported, some endpoints want
  74. # this in the URL, others want it as an argument.
  75. endpoint = endpoint.replace("{format}", "json")
  76. args = {"url": url, "format": "json"}
  77. query_str = urllib.parse.urlencode(args, True)
  78. return f"{endpoint}?{query_str}"
  79. # No match.
  80. return None
  81. def autodiscover_from_html(self, tree: "etree._Element") -> Optional[str]:
  82. """
  83. Search an HTML document for oEmbed autodiscovery information.
  84. Args:
  85. tree: The parsed HTML body.
  86. Returns:
  87. The URL to use for oEmbed information, or None if no URL was found.
  88. """
  89. # Search for link elements with the proper rel and type attributes.
  90. # Cast: the type returned by xpath depends on the xpath expression: mypy can't deduce this.
  91. for tag in cast(
  92. List["etree._Element"],
  93. tree.xpath("//link[@rel='alternate'][@type='application/json+oembed']"),
  94. ):
  95. if "href" in tag.attrib:
  96. return cast(str, tag.attrib["href"])
  97. # Some providers (e.g. Flickr) use alternative instead of alternate.
  98. # Cast: the type returned by xpath depends on the xpath expression: mypy can't deduce this.
  99. for tag in cast(
  100. List["etree._Element"],
  101. tree.xpath("//link[@rel='alternative'][@type='application/json+oembed']"),
  102. ):
  103. if "href" in tag.attrib:
  104. return cast(str, tag.attrib["href"])
  105. return None
  106. def parse_oembed_response(self, url: str, raw_body: bytes) -> OEmbedResult:
  107. """
  108. Parse the oEmbed response into an Open Graph response.
  109. Args:
  110. url: The URL which is being previewed (not the one which was
  111. requested).
  112. raw_body: The oEmbed response as JSON encoded as bytes.
  113. Returns:
  114. json-encoded Open Graph data
  115. """
  116. try:
  117. # oEmbed responses *must* be UTF-8 according to the spec.
  118. oembed = json_decoder.decode(raw_body.decode("utf-8"))
  119. except ValueError:
  120. return OEmbedResult({}, None, None)
  121. # The version is a required string field, but not always provided,
  122. # or sometimes provided as a float. Be lenient.
  123. oembed_version = oembed.get("version", "1.0")
  124. if oembed_version != "1.0" and oembed_version != 1:
  125. return OEmbedResult({}, None, None)
  126. # Attempt to parse the cache age, if possible.
  127. try:
  128. cache_age = int(oembed.get("cache_age")) * 1000
  129. except (TypeError, ValueError):
  130. # If the cache age cannot be parsed (e.g. wrong type or invalid
  131. # string), ignore it.
  132. cache_age = None
  133. # The oEmbed response converted to Open Graph.
  134. open_graph_response: JsonDict = {"og:url": url}
  135. title = oembed.get("title")
  136. if title and isinstance(title, str):
  137. # A common WordPress plug-in seems to incorrectly escape entities
  138. # in the oEmbed response.
  139. open_graph_response["og:title"] = html.unescape(title)
  140. author_name = oembed.get("author_name")
  141. if not isinstance(author_name, str):
  142. author_name = None
  143. # Use the provider name and as the site.
  144. provider_name = oembed.get("provider_name")
  145. if provider_name and isinstance(provider_name, str):
  146. open_graph_response["og:site_name"] = provider_name
  147. # If a thumbnail exists, use it. Note that dimensions will be calculated later.
  148. thumbnail_url = oembed.get("thumbnail_url")
  149. if thumbnail_url and isinstance(thumbnail_url, str):
  150. open_graph_response["og:image"] = thumbnail_url
  151. # Process each type separately.
  152. oembed_type = oembed.get("type")
  153. if oembed_type == "rich":
  154. html_str = oembed.get("html")
  155. if isinstance(html_str, str):
  156. calc_description_and_urls(open_graph_response, html_str)
  157. elif oembed_type == "photo":
  158. # If this is a photo, use the full image, not the thumbnail.
  159. url = oembed.get("url")
  160. if url and isinstance(url, str):
  161. open_graph_response["og:image"] = url
  162. elif oembed_type == "video":
  163. open_graph_response["og:type"] = "video.other"
  164. html_str = oembed.get("html")
  165. if html_str and isinstance(html_str, str):
  166. calc_description_and_urls(open_graph_response, oembed["html"])
  167. for size in ("width", "height"):
  168. val = oembed.get(size)
  169. if type(val) is int: # noqa: E721
  170. open_graph_response[f"og:video:{size}"] = val
  171. elif oembed_type == "link":
  172. open_graph_response["og:type"] = "website"
  173. else:
  174. logger.warning("Unknown oEmbed type: %s", oembed_type)
  175. return OEmbedResult(open_graph_response, author_name, cache_age)
  176. def _fetch_urls(tree: "etree._Element", tag_name: str) -> List[str]:
  177. results = []
  178. # Cast: the type returned by xpath depends on the xpath expression: mypy can't deduce this.
  179. for tag in cast(List["etree._Element"], tree.xpath("//*/" + tag_name)):
  180. if "src" in tag.attrib:
  181. results.append(cast(str, tag.attrib["src"]))
  182. return results
  183. def calc_description_and_urls(open_graph_response: JsonDict, html_body: str) -> None:
  184. """
  185. Calculate description for an HTML document.
  186. This uses lxml to convert the HTML document into plaintext. If errors
  187. occur during processing of the document, an empty response is returned.
  188. Args:
  189. open_graph_response: The current Open Graph summary. This is updated with additional fields.
  190. html_body: The HTML document, as bytes.
  191. Returns:
  192. The summary
  193. """
  194. # If there's no body, nothing useful is going to be found.
  195. if not html_body:
  196. return
  197. from lxml import etree
  198. # Create an HTML parser. If this fails, log and return no metadata.
  199. parser = etree.HTMLParser(recover=True, encoding="utf-8")
  200. # Attempt to parse the body. If this fails, log and return no metadata.
  201. # TODO Develop of lxml-stubs has this correct.
  202. tree = etree.fromstring(html_body, parser) # type: ignore[arg-type]
  203. # The data was successfully parsed, but no tree was found.
  204. if tree is None:
  205. return # type: ignore[unreachable]
  206. # Attempt to find interesting URLs (images, videos, embeds).
  207. if "og:image" not in open_graph_response:
  208. image_urls = _fetch_urls(tree, "img")
  209. if image_urls:
  210. open_graph_response["og:image"] = image_urls[0]
  211. video_urls = _fetch_urls(tree, "video") + _fetch_urls(tree, "embed")
  212. if video_urls:
  213. open_graph_response["og:video"] = video_urls[0]
  214. description = parse_html_description(tree)
  215. if description:
  216. open_graph_response["og:description"] = description