Non puoi selezionare più di 25 argomenti. Gli argomenti devono iniziare con una lettera o un numero, possono includere trattini ('-') e possono essere lunghi fino a 35 caratteri.
 
 
 
 
 
 

407 righe
15 KiB

  1. # Copyright 2014-2016 OpenMarket Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import abc
  15. import logging
  16. from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Sequence, Set
  17. import attr
  18. from synapse.api.constants import Direction, Membership
  19. from synapse.events import EventBase
  20. from synapse.types import JsonMapping, RoomStreamToken, StateMap, UserID, UserInfo
  21. from synapse.visibility import filter_events_for_client
  22. if TYPE_CHECKING:
  23. from synapse.server import HomeServer
  24. logger = logging.getLogger(__name__)
  25. class AdminHandler:
  26. def __init__(self, hs: "HomeServer"):
  27. self._store = hs.get_datastores().main
  28. self._device_handler = hs.get_device_handler()
  29. self._storage_controllers = hs.get_storage_controllers()
  30. self._state_storage_controller = self._storage_controllers.state
  31. self._msc3866_enabled = hs.config.experimental.msc3866.enabled
  32. async def get_whois(self, user: UserID) -> JsonMapping:
  33. connections = []
  34. sessions = await self._store.get_user_ip_and_agents(user)
  35. for session in sessions:
  36. connections.append(
  37. {
  38. "ip": session["ip"],
  39. "last_seen": session["last_seen"],
  40. "user_agent": session["user_agent"],
  41. }
  42. )
  43. ret = {
  44. "user_id": user.to_string(),
  45. "devices": {"": {"sessions": [{"connections": connections}]}},
  46. }
  47. return ret
  48. async def get_user(self, user: UserID) -> Optional[JsonMapping]:
  49. """Function to get user details"""
  50. user_info: Optional[UserInfo] = await self._store.get_user_by_id(
  51. user.to_string()
  52. )
  53. if user_info is None:
  54. return None
  55. user_info_dict = {
  56. "name": user.to_string(),
  57. "admin": user_info.is_admin,
  58. "deactivated": user_info.is_deactivated,
  59. "locked": user_info.locked,
  60. "shadow_banned": user_info.is_shadow_banned,
  61. "creation_ts": user_info.creation_ts,
  62. "appservice_id": user_info.appservice_id,
  63. "consent_server_notice_sent": user_info.consent_server_notice_sent,
  64. "consent_version": user_info.consent_version,
  65. "consent_ts": user_info.consent_ts,
  66. "user_type": user_info.user_type,
  67. "is_guest": user_info.is_guest,
  68. }
  69. if self._msc3866_enabled:
  70. # Only include the approved flag if support for MSC3866 is enabled.
  71. user_info_dict["approved"] = user_info.approved
  72. # Add additional user metadata
  73. profile = await self._store.get_profileinfo(user)
  74. threepids = await self._store.user_get_threepids(user.to_string())
  75. external_ids = [
  76. ({"auth_provider": auth_provider, "external_id": external_id})
  77. for auth_provider, external_id in await self._store.get_external_ids_by_user(
  78. user.to_string()
  79. )
  80. ]
  81. user_info_dict["displayname"] = profile.display_name
  82. user_info_dict["avatar_url"] = profile.avatar_url
  83. user_info_dict["threepids"] = [attr.asdict(t) for t in threepids]
  84. user_info_dict["external_ids"] = external_ids
  85. user_info_dict["erased"] = await self._store.is_user_erased(user.to_string())
  86. last_seen_ts = await self._store.get_last_seen_for_user_id(user.to_string())
  87. user_info_dict["last_seen_ts"] = last_seen_ts
  88. return user_info_dict
  89. async def export_user_data(self, user_id: str, writer: "ExfiltrationWriter") -> Any:
  90. """Write all data we have on the user to the given writer.
  91. Args:
  92. user_id: The user ID to fetch data of.
  93. writer: The writer to write to.
  94. Returns:
  95. Resolves when all data for a user has been written.
  96. The returned value is that returned by `writer.finished()`.
  97. """
  98. # Get all rooms the user is in or has been in
  99. rooms = await self._store.get_rooms_for_local_user_where_membership_is(
  100. user_id,
  101. membership_list=(
  102. Membership.JOIN,
  103. Membership.LEAVE,
  104. Membership.BAN,
  105. Membership.INVITE,
  106. Membership.KNOCK,
  107. ),
  108. )
  109. # We only try and fetch events for rooms the user has been in. If
  110. # they've been e.g. invited to a room without joining then we handle
  111. # those separately.
  112. rooms_user_has_been_in = await self._store.get_rooms_user_has_been_in(user_id)
  113. for index, room in enumerate(rooms):
  114. room_id = room.room_id
  115. logger.info(
  116. "[%s] Handling room %s, %d/%d", user_id, room_id, index + 1, len(rooms)
  117. )
  118. forgotten = await self._store.did_forget(user_id, room_id)
  119. if forgotten:
  120. logger.info("[%s] User forgot room %d, ignoring", user_id, room_id)
  121. continue
  122. if room_id not in rooms_user_has_been_in:
  123. # If we haven't been in the rooms then the filtering code below
  124. # won't return anything, so we need to handle these cases
  125. # explicitly.
  126. if room.membership == Membership.INVITE:
  127. event_id = room.event_id
  128. invite = await self._store.get_event(event_id, allow_none=True)
  129. if invite:
  130. invited_state = invite.unsigned["invite_room_state"]
  131. writer.write_invite(room_id, invite, invited_state)
  132. if room.membership == Membership.KNOCK:
  133. event_id = room.event_id
  134. knock = await self._store.get_event(event_id, allow_none=True)
  135. if knock:
  136. knock_state = knock.unsigned["knock_room_state"]
  137. writer.write_knock(room_id, knock, knock_state)
  138. continue
  139. # We only want to bother fetching events up to the last time they
  140. # were joined. We estimate that point by looking at the
  141. # stream_ordering of the last membership if it wasn't a join.
  142. if room.membership == Membership.JOIN:
  143. stream_ordering = self._store.get_room_max_stream_ordering()
  144. else:
  145. stream_ordering = room.stream_ordering
  146. from_key = RoomStreamToken(topological=0, stream=0)
  147. to_key = RoomStreamToken(stream=stream_ordering)
  148. # Events that we've processed in this room
  149. written_events: Set[str] = set()
  150. # We need to track gaps in the events stream so that we can then
  151. # write out the state at those events. We do this by keeping track
  152. # of events whose prev events we haven't seen.
  153. # Map from event ID to prev events that haven't been processed,
  154. # dict[str, set[str]].
  155. event_to_unseen_prevs = {}
  156. # The reverse mapping to above, i.e. map from unseen event to events
  157. # that have the unseen event in their prev_events, i.e. the unseen
  158. # events "children".
  159. unseen_to_child_events: Dict[str, Set[str]] = {}
  160. # We fetch events in the room the user could see by fetching *all*
  161. # events that we have and then filtering, this isn't the most
  162. # efficient method perhaps but it does guarantee we get everything.
  163. while True:
  164. events, _ = await self._store.paginate_room_events(
  165. room_id, from_key, to_key, limit=100, direction=Direction.FORWARDS
  166. )
  167. if not events:
  168. break
  169. from_key = events[-1].internal_metadata.after
  170. events = await filter_events_for_client(
  171. self._storage_controllers, user_id, events
  172. )
  173. writer.write_events(room_id, events)
  174. # Update the extremity tracking dicts
  175. for event in events:
  176. # Check if we have any prev events that haven't been
  177. # processed yet, and add those to the appropriate dicts.
  178. unseen_events = set(event.prev_event_ids()) - written_events
  179. if unseen_events:
  180. event_to_unseen_prevs[event.event_id] = unseen_events
  181. for unseen in unseen_events:
  182. unseen_to_child_events.setdefault(unseen, set()).add(
  183. event.event_id
  184. )
  185. # Now check if this event is an unseen prev event, if so
  186. # then we remove this event from the appropriate dicts.
  187. for child_id in unseen_to_child_events.pop(event.event_id, []):
  188. event_to_unseen_prevs[child_id].discard(event.event_id)
  189. written_events.add(event.event_id)
  190. logger.info(
  191. "Written %d events in room %s", len(written_events), room_id
  192. )
  193. # Extremities are the events who have at least one unseen prev event.
  194. extremities = (
  195. event_id
  196. for event_id, unseen_prevs in event_to_unseen_prevs.items()
  197. if unseen_prevs
  198. )
  199. for event_id in extremities:
  200. if not event_to_unseen_prevs[event_id]:
  201. continue
  202. state = await self._state_storage_controller.get_state_for_event(
  203. event_id
  204. )
  205. writer.write_state(room_id, event_id, state)
  206. # Get the user profile
  207. profile = await self.get_user(UserID.from_string(user_id))
  208. if profile is not None:
  209. writer.write_profile(profile)
  210. logger.info("[%s] Written profile", user_id)
  211. # Get all devices the user has
  212. devices = await self._device_handler.get_devices_by_user(user_id)
  213. writer.write_devices(devices)
  214. logger.info("[%s] Written %s devices", user_id, len(devices))
  215. # Get all connections the user has
  216. connections = await self.get_whois(UserID.from_string(user_id))
  217. writer.write_connections(
  218. connections["devices"][""]["sessions"][0]["connections"]
  219. )
  220. logger.info("[%s] Written %s connections", user_id, len(connections))
  221. # Get all account data the user has global and in rooms
  222. global_data = await self._store.get_global_account_data_for_user(user_id)
  223. by_room_data = await self._store.get_room_account_data_for_user(user_id)
  224. writer.write_account_data("global", global_data)
  225. for room_id in by_room_data:
  226. writer.write_account_data(room_id, by_room_data[room_id])
  227. logger.info(
  228. "[%s] Written account data for %s rooms", user_id, len(by_room_data)
  229. )
  230. # Get all media ids the user has
  231. limit = 100
  232. start = 0
  233. while True:
  234. media_ids, total = await self._store.get_local_media_by_user_paginate(
  235. start, limit, user_id
  236. )
  237. for media in media_ids:
  238. writer.write_media_id(media.media_id, attr.asdict(media))
  239. logger.info(
  240. "[%s] Written %d media_ids of %s",
  241. user_id,
  242. (start + len(media_ids)),
  243. total,
  244. )
  245. if (start + limit) >= total:
  246. break
  247. start += limit
  248. return writer.finished()
  249. class ExfiltrationWriter(metaclass=abc.ABCMeta):
  250. """Interface used to specify how to write exported data."""
  251. @abc.abstractmethod
  252. def write_events(self, room_id: str, events: List[EventBase]) -> None:
  253. """Write a batch of events for a room."""
  254. raise NotImplementedError()
  255. @abc.abstractmethod
  256. def write_state(
  257. self, room_id: str, event_id: str, state: StateMap[EventBase]
  258. ) -> None:
  259. """Write the state at the given event in the room.
  260. This only gets called for backward extremities rather than for each
  261. event.
  262. """
  263. raise NotImplementedError()
  264. @abc.abstractmethod
  265. def write_invite(
  266. self, room_id: str, event: EventBase, state: StateMap[EventBase]
  267. ) -> None:
  268. """Write an invite for the room, with associated invite state.
  269. Args:
  270. room_id: The room ID the invite is for.
  271. event: The invite event.
  272. state: A subset of the state at the invite, with a subset of the
  273. event keys (type, state_key content and sender).
  274. """
  275. raise NotImplementedError()
  276. @abc.abstractmethod
  277. def write_knock(
  278. self, room_id: str, event: EventBase, state: StateMap[EventBase]
  279. ) -> None:
  280. """Write a knock for the room, with associated knock state.
  281. Args:
  282. room_id: The room ID the knock is for.
  283. event: The knock event.
  284. state: A subset of the state at the knock, with a subset of the
  285. event keys (type, state_key content and sender).
  286. """
  287. raise NotImplementedError()
  288. @abc.abstractmethod
  289. def write_profile(self, profile: JsonMapping) -> None:
  290. """Write the profile of a user.
  291. Args:
  292. profile: The user profile.
  293. """
  294. raise NotImplementedError()
  295. @abc.abstractmethod
  296. def write_devices(self, devices: Sequence[JsonMapping]) -> None:
  297. """Write the devices of a user.
  298. Args:
  299. devices: The list of devices.
  300. """
  301. raise NotImplementedError()
  302. @abc.abstractmethod
  303. def write_connections(self, connections: Sequence[JsonMapping]) -> None:
  304. """Write the connections of a user.
  305. Args:
  306. connections: The list of connections / sessions.
  307. """
  308. raise NotImplementedError()
  309. @abc.abstractmethod
  310. def write_account_data(
  311. self, file_name: str, account_data: Mapping[str, JsonMapping]
  312. ) -> None:
  313. """Write the account data of a user.
  314. Args:
  315. file_name: file name to write data
  316. account_data: mapping of global or room account_data
  317. """
  318. raise NotImplementedError()
  319. @abc.abstractmethod
  320. def write_media_id(self, media_id: str, media_metadata: JsonMapping) -> None:
  321. """Write the media's metadata of a user.
  322. Exports only the metadata, as this can be fetched from the database via
  323. read only. In order to access the files, a connection to the correct
  324. media repository would be required.
  325. Args:
  326. media_id: ID of the media.
  327. media_metadata: Metadata of one media file.
  328. """
  329. @abc.abstractmethod
  330. def finished(self) -> Any:
  331. """Called when all data has successfully been exported and written.
  332. This functions return value is passed to the caller of
  333. `export_user_data`.
  334. """
  335. raise NotImplementedError()