Nevar pievienot vairāk kā 25 tēmas. Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.
 
 
 
 
 
 

335 rindas
12 KiB

  1. # Copyright 2019 Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import argparse
  15. import json
  16. import logging
  17. import os
  18. import sys
  19. import tempfile
  20. from typing import List, Mapping, Optional, Sequence
  21. from twisted.internet import defer, task
  22. import synapse
  23. from synapse.app import _base
  24. from synapse.config._base import ConfigError
  25. from synapse.config.homeserver import HomeServerConfig
  26. from synapse.config.logger import setup_logging
  27. from synapse.events import EventBase
  28. from synapse.handlers.admin import ExfiltrationWriter
  29. from synapse.server import HomeServer
  30. from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
  31. from synapse.storage.databases.main.account_data import AccountDataWorkerStore
  32. from synapse.storage.databases.main.appservice import (
  33. ApplicationServiceTransactionWorkerStore,
  34. ApplicationServiceWorkerStore,
  35. )
  36. from synapse.storage.databases.main.client_ips import ClientIpWorkerStore
  37. from synapse.storage.databases.main.deviceinbox import DeviceInboxWorkerStore
  38. from synapse.storage.databases.main.devices import DeviceWorkerStore
  39. from synapse.storage.databases.main.event_federation import EventFederationWorkerStore
  40. from synapse.storage.databases.main.event_push_actions import (
  41. EventPushActionsWorkerStore,
  42. )
  43. from synapse.storage.databases.main.events_worker import EventsWorkerStore
  44. from synapse.storage.databases.main.filtering import FilteringWorkerStore
  45. from synapse.storage.databases.main.media_repository import MediaRepositoryStore
  46. from synapse.storage.databases.main.profile import ProfileWorkerStore
  47. from synapse.storage.databases.main.push_rule import PushRulesWorkerStore
  48. from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
  49. from synapse.storage.databases.main.registration import RegistrationWorkerStore
  50. from synapse.storage.databases.main.relations import RelationsWorkerStore
  51. from synapse.storage.databases.main.room import RoomWorkerStore
  52. from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
  53. from synapse.storage.databases.main.signatures import SignatureWorkerStore
  54. from synapse.storage.databases.main.state import StateGroupWorkerStore
  55. from synapse.storage.databases.main.stream import StreamWorkerStore
  56. from synapse.storage.databases.main.tags import TagsWorkerStore
  57. from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore
  58. from synapse.types import JsonMapping, StateMap
  59. from synapse.util import SYNAPSE_VERSION
  60. from synapse.util.logcontext import LoggingContext
# Module-level logger, registered under the "synapse.app.admin_cmd" name.
logger = logging.getLogger("synapse.app.admin_cmd")
class AdminCmdStore(
    FilteringWorkerStore,
    ClientIpWorkerStore,
    DeviceWorkerStore,
    TagsWorkerStore,
    DeviceInboxWorkerStore,
    AccountDataWorkerStore,
    PushRulesWorkerStore,
    ApplicationServiceTransactionWorkerStore,
    ApplicationServiceWorkerStore,
    RoomMemberWorkerStore,
    RelationsWorkerStore,
    EventFederationWorkerStore,
    EventPushActionsWorkerStore,
    StateGroupWorkerStore,
    SignatureWorkerStore,
    UserErasureWorkerStore,
    ReceiptsWorkerStore,
    StreamWorkerStore,
    EventsWorkerStore,
    RegistrationWorkerStore,
    RoomWorkerStore,
    ProfileWorkerStore,
    MediaRepositoryStore,
):
    """Datastore class for the admin command app.

    Combines the worker stores listed as base classes into a single store.
    The order of the base classes determines the method resolution order, so
    it should not be rearranged casually.

    Args:
        database: Passed through unchanged to the parent stores.
        db_conn: Passed through unchanged to the parent stores.
        hs: The homeserver; also used here to obtain the clock.
    """

    def __init__(
        self,
        database: DatabasePool,
        db_conn: LoggingDatabaseConnection,
        hs: "HomeServer",
    ):
        super().__init__(database, db_conn, hs)

        # Annoyingly `filter_events_for_client` assumes that this exists. We
        # should refactor it to take a `Clock` directly.
        self.clock = hs.get_clock()
class AdminCmdServer(HomeServer):
    """HomeServer subclass that uses `AdminCmdStore` as its datastore class."""

    DATASTORE_CLASS = AdminCmdStore  # type: ignore
  99. async def export_data_command(hs: HomeServer, args: argparse.Namespace) -> None:
  100. """Export data for a user."""
  101. user_id = args.user_id
  102. directory = args.output_directory
  103. res = await hs.get_admin_handler().export_user_data(
  104. user_id, FileExfiltrationWriter(user_id, directory=directory)
  105. )
  106. print(res)
  107. class FileExfiltrationWriter(ExfiltrationWriter):
  108. """An ExfiltrationWriter that writes the users data to a directory.
  109. Returns the directory location on completion.
  110. Note: This writes to disk on the main reactor thread.
  111. Args:
  112. user_id: The user whose data is being exfiltrated.
  113. directory: The directory to write the data to, if None then will write
  114. to a temporary directory.
  115. """
  116. def __init__(self, user_id: str, directory: Optional[str] = None):
  117. self.user_id = user_id
  118. if directory:
  119. self.base_directory = directory
  120. else:
  121. self.base_directory = tempfile.mkdtemp(
  122. prefix="synapse-exfiltrate__%s__" % (user_id,)
  123. )
  124. os.makedirs(self.base_directory, exist_ok=True)
  125. if list(os.listdir(self.base_directory)):
  126. raise Exception("Directory must be empty")
  127. def write_events(self, room_id: str, events: List[EventBase]) -> None:
  128. room_directory = os.path.join(self.base_directory, "rooms", room_id)
  129. os.makedirs(room_directory, exist_ok=True)
  130. events_file = os.path.join(room_directory, "events")
  131. with open(events_file, "a") as f:
  132. for event in events:
  133. json.dump(event.get_pdu_json(), fp=f)
  134. def write_state(
  135. self, room_id: str, event_id: str, state: StateMap[EventBase]
  136. ) -> None:
  137. room_directory = os.path.join(self.base_directory, "rooms", room_id)
  138. state_directory = os.path.join(room_directory, "state")
  139. os.makedirs(state_directory, exist_ok=True)
  140. event_file = os.path.join(state_directory, event_id)
  141. with open(event_file, "a") as f:
  142. for event in state.values():
  143. json.dump(event.get_pdu_json(), fp=f)
  144. def write_invite(
  145. self, room_id: str, event: EventBase, state: StateMap[EventBase]
  146. ) -> None:
  147. self.write_events(room_id, [event])
  148. # We write the invite state somewhere else as they aren't full events
  149. # and are only a subset of the state at the event.
  150. room_directory = os.path.join(self.base_directory, "rooms", room_id)
  151. os.makedirs(room_directory, exist_ok=True)
  152. invite_state = os.path.join(room_directory, "invite_state")
  153. with open(invite_state, "a") as f:
  154. for event in state.values():
  155. json.dump(event, fp=f)
  156. def write_knock(
  157. self, room_id: str, event: EventBase, state: StateMap[EventBase]
  158. ) -> None:
  159. self.write_events(room_id, [event])
  160. # We write the knock state somewhere else as they aren't full events
  161. # and are only a subset of the state at the event.
  162. room_directory = os.path.join(self.base_directory, "rooms", room_id)
  163. os.makedirs(room_directory, exist_ok=True)
  164. knock_state = os.path.join(room_directory, "knock_state")
  165. with open(knock_state, "a") as f:
  166. for event in state.values():
  167. json.dump(event, fp=f)
  168. def write_profile(self, profile: JsonMapping) -> None:
  169. user_directory = os.path.join(self.base_directory, "user_data")
  170. os.makedirs(user_directory, exist_ok=True)
  171. profile_file = os.path.join(user_directory, "profile")
  172. with open(profile_file, "a") as f:
  173. json.dump(profile, fp=f)
  174. def write_devices(self, devices: Sequence[JsonMapping]) -> None:
  175. user_directory = os.path.join(self.base_directory, "user_data")
  176. os.makedirs(user_directory, exist_ok=True)
  177. device_file = os.path.join(user_directory, "devices")
  178. for device in devices:
  179. with open(device_file, "a") as f:
  180. json.dump(device, fp=f)
  181. def write_connections(self, connections: Sequence[JsonMapping]) -> None:
  182. user_directory = os.path.join(self.base_directory, "user_data")
  183. os.makedirs(user_directory, exist_ok=True)
  184. connection_file = os.path.join(user_directory, "connections")
  185. for connection in connections:
  186. with open(connection_file, "a") as f:
  187. json.dump(connection, fp=f)
  188. def write_account_data(
  189. self, file_name: str, account_data: Mapping[str, JsonMapping]
  190. ) -> None:
  191. account_data_directory = os.path.join(
  192. self.base_directory, "user_data", "account_data"
  193. )
  194. os.makedirs(account_data_directory, exist_ok=True)
  195. account_data_file = os.path.join(account_data_directory, file_name)
  196. with open(account_data_file, "a") as f:
  197. json.dump(account_data, fp=f)
  198. def write_media_id(self, media_id: str, media_metadata: JsonMapping) -> None:
  199. file_directory = os.path.join(self.base_directory, "media_ids")
  200. os.makedirs(file_directory, exist_ok=True)
  201. media_id_file = os.path.join(file_directory, media_id)
  202. with open(media_id_file, "w") as f:
  203. json.dump(media_metadata, fp=f)
  204. def finished(self) -> str:
  205. return self.base_directory
  206. def start(config_options: List[str]) -> None:
  207. parser = argparse.ArgumentParser(description="Synapse Admin Command")
  208. HomeServerConfig.add_arguments_to_parser(parser)
  209. subparser = parser.add_subparsers(
  210. title="Admin Commands",
  211. required=True,
  212. dest="command",
  213. metavar="<admin_command>",
  214. help="The admin command to perform.",
  215. )
  216. export_data_parser = subparser.add_parser(
  217. "export-data", help="Export all data for a user"
  218. )
  219. export_data_parser.add_argument("user_id", help="User to extra data from")
  220. export_data_parser.add_argument(
  221. "--output-directory",
  222. action="store",
  223. metavar="DIRECTORY",
  224. required=False,
  225. help="The directory to store the exported data in. Must be empty. Defaults"
  226. " to creating a temp directory.",
  227. )
  228. export_data_parser.set_defaults(func=export_data_command)
  229. try:
  230. config, args = HomeServerConfig.load_config_with_parser(parser, config_options)
  231. except ConfigError as e:
  232. sys.stderr.write("\n" + str(e) + "\n")
  233. sys.exit(1)
  234. if config.worker.worker_app is not None:
  235. assert config.worker.worker_app == "synapse.app.admin_cmd"
  236. # Update the config with some basic overrides so that don't have to specify
  237. # a full worker config.
  238. config.worker.worker_app = "synapse.app.admin_cmd"
  239. if not config.worker.worker_daemonize and not config.worker.worker_log_config:
  240. # Since we're meant to be run as a "command" let's not redirect stdio
  241. # unless we've actually set log config.
  242. config.logging.no_redirect_stdio = True
  243. # Explicitly disable background processes
  244. config.worker.should_update_user_directory = False
  245. config.worker.run_background_tasks = False
  246. config.worker.start_pushers = False
  247. config.worker.pusher_shard_config.instances = []
  248. config.worker.send_federation = False
  249. config.worker.federation_shard_config.instances = []
  250. synapse.events.USE_FROZEN_DICTS = config.server.use_frozen_dicts
  251. ss = AdminCmdServer(
  252. config.server.server_name,
  253. config=config,
  254. version_string=f"Synapse/{SYNAPSE_VERSION}",
  255. )
  256. setup_logging(ss, config, use_worker_options=True)
  257. ss.setup()
  258. # We use task.react as the basic run command as it correctly handles tearing
  259. # down the reactor when the deferreds resolve and setting the return value.
  260. # We also make sure that `_base.start` gets run before we actually run the
  261. # command.
  262. async def run() -> None:
  263. with LoggingContext("command"):
  264. await _base.start(ss)
  265. await args.func(ss, args)
  266. _base.start_worker_reactor(
  267. "synapse-admin-cmd",
  268. config,
  269. run_command=lambda: task.react(lambda _reactor: defer.ensureDeferred(run())),
  270. )
if __name__ == "__main__":
    # When executed as a script, run the admin command with the process's
    # command-line arguments (minus the program name) inside a "main"
    # logging context.
    with LoggingContext("main"):
        start(sys.argv[1:])