You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

408 regels
17 KiB

  1. # Copyright 2014-2016 OpenMarket Ltd
  2. # Copyright 2020 The Matrix.org Foundation C.I.C.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from collections import OrderedDict
  16. from typing import Hashable, Optional, Tuple
  17. from synapse.api.errors import LimitExceededError
  18. from synapse.config.ratelimiting import RatelimitSettings
  19. from synapse.storage.databases.main import DataStore
  20. from synapse.types import Requester
  21. from synapse.util import Clock
  22. class Ratelimiter:
  23. """
  24. Ratelimit actions marked by arbitrary keys.
  25. (Note that the source code speaks of "actions" and "burst_count" rather than
  26. "tokens" and a "bucket_size".)
  27. This is a "leaky bucket as a meter". For each key to be tracked there is a bucket
  28. containing some number 0 <= T <= `burst_count` of tokens corresponding to previously
  29. permitted requests for that key. Each bucket starts empty, and gradually leaks
  30. tokens at a rate of `rate_hz`.
  31. Upon an incoming request, we must determine:
  32. - the key that this request falls under (which bucket to inspect), and
  33. - the cost C of this request in tokens.
  34. Then, if there is room in the bucket for C tokens (T + C <= `burst_count`),
  35. the request is permitted and `cost` tokens are added to the bucket.
  36. Otherwise, the request is denied, and the bucket continues to hold T tokens.
  37. This means that the limiter enforces an average request frequency of `rate_hz`,
  38. while accumulating a buffer of up to `burst_count` requests which can be consumed
  39. instantaneously.
  40. The tricky bit is the leaking. We do not want to have a periodic process which
  41. leaks every bucket! Instead, we track
  42. - the time point when the bucket was last completely empty, and
  43. - how many tokens have added to the bucket permitted since then.
  44. Then for each incoming request, we can calculate how many tokens have leaked
  45. since this time point, and use that to decide if we should accept or reject the
  46. request.
  47. Args:
  48. store: The datastore providing get_ratelimit_for_user.
  49. clock: A homeserver clock, for retrieving the current time
  50. cfg: The ratelimit configuration for this rate limiter including the
  51. allowed rate and burst count.
  52. """
  53. def __init__(
  54. self,
  55. store: DataStore,
  56. clock: Clock,
  57. cfg: RatelimitSettings,
  58. ):
  59. self.clock = clock
  60. self.rate_hz = cfg.per_second
  61. self.burst_count = cfg.burst_count
  62. self.store = store
  63. self._limiter_name = cfg.key
  64. # An ordered dictionary representing the token buckets tracked by this rate
  65. # limiter. Each entry maps a key of arbitrary type to a tuple representing:
  66. # * The number of tokens currently in the bucket,
  67. # * The time point when the bucket was last completely empty, and
  68. # * The rate_hz (leak rate) of this particular bucket.
  69. self.actions: OrderedDict[Hashable, Tuple[float, float, float]] = OrderedDict()
  70. def _get_key(
  71. self, requester: Optional[Requester], key: Optional[Hashable]
  72. ) -> Hashable:
  73. """Use the requester's MXID as a fallback key if no key is provided."""
  74. if key is None:
  75. if not requester:
  76. raise ValueError("Must supply at least one of `requester` or `key`")
  77. key = requester.user.to_string()
  78. return key
  79. def _get_action_counts(
  80. self, key: Hashable, time_now_s: float
  81. ) -> Tuple[float, float, float]:
  82. """Retrieve the action counts, with a fallback representing an empty bucket."""
  83. return self.actions.get(key, (0.0, time_now_s, 0.0))
  84. async def can_do_action(
  85. self,
  86. requester: Optional[Requester],
  87. key: Optional[Hashable] = None,
  88. rate_hz: Optional[float] = None,
  89. burst_count: Optional[int] = None,
  90. update: bool = True,
  91. n_actions: int = 1,
  92. _time_now_s: Optional[float] = None,
  93. ) -> Tuple[bool, float]:
  94. """Can the entity (e.g. user or IP address) perform the action?
  95. Checks if the user has ratelimiting disabled in the database by looking
  96. for null/zero values in the `ratelimit_override` table. (Non-zero
  97. values aren't honoured, as they're specific to the event sending
  98. ratelimiter, rather than all ratelimiters)
  99. Args:
  100. requester: The requester that is doing the action, if any. Used to check
  101. if the user has ratelimits disabled in the database.
  102. key: An arbitrary key used to classify an action. Defaults to the
  103. requester's user ID.
  104. rate_hz: The long term number of actions that can be performed in a second.
  105. Overrides the value set during instantiation if set.
  106. burst_count: How many actions that can be performed before being limited.
  107. Overrides the value set during instantiation if set.
  108. update: Whether to count this check as performing the action
  109. n_actions: The number of times the user wants to do this action. If the user
  110. cannot do all of the actions, the user's action count is not incremented
  111. at all.
  112. _time_now_s: The current time. Optional, defaults to the current time according
  113. to self.clock. Only used by tests.
  114. Returns:
  115. A tuple containing:
  116. * A bool indicating if they can perform the action now
  117. * The reactor timestamp for when the action can be performed next.
  118. -1 if rate_hz is less than or equal to zero
  119. """
  120. key = self._get_key(requester, key)
  121. if requester:
  122. # Disable rate limiting of users belonging to any AS that is configured
  123. # not to be rate limited in its registration file (rate_limited: true|false).
  124. if requester.app_service and not requester.app_service.is_rate_limited():
  125. return True, -1.0
  126. # Check if ratelimiting has been disabled for the user.
  127. #
  128. # Note that we don't use the returned rate/burst count, as the table
  129. # is specifically for the event sending ratelimiter. Instead, we
  130. # only use it to (somewhat cheekily) infer whether the user should
  131. # be subject to any rate limiting or not.
  132. override = await self.store.get_ratelimit_for_user(
  133. requester.authenticated_entity
  134. )
  135. if override and not override.messages_per_second:
  136. return True, -1.0
  137. # Override default values if set
  138. time_now_s = _time_now_s if _time_now_s is not None else self.clock.time()
  139. rate_hz = rate_hz if rate_hz is not None else self.rate_hz
  140. burst_count = burst_count if burst_count is not None else self.burst_count
  141. # Remove any expired entries
  142. self._prune_message_counts(time_now_s)
  143. # Check if there is an existing count entry for this key
  144. action_count, time_start, _ = self._get_action_counts(key, time_now_s)
  145. # Check whether performing another action is allowed
  146. time_delta = time_now_s - time_start
  147. performed_count = action_count - time_delta * rate_hz
  148. if performed_count < 0:
  149. performed_count = 0
  150. # Reset the start time and forgive all actions
  151. action_count = 0
  152. time_start = time_now_s
  153. # This check would be easier read as performed_count + n_actions > burst_count,
  154. # but performed_count might be a very precise float (with lots of numbers
  155. # following the point) in which case Python might round it up when adding it to
  156. # n_actions. Writing it this way ensures it doesn't happen.
  157. if performed_count > burst_count - n_actions:
  158. # Deny, we have exceeded our burst count
  159. allowed = False
  160. else:
  161. # We haven't reached our limit yet
  162. allowed = True
  163. action_count = action_count + n_actions
  164. if update:
  165. self.actions[key] = (action_count, time_start, rate_hz)
  166. if rate_hz > 0:
  167. # Find out when the count of existing actions expires
  168. time_allowed = time_start + (action_count - burst_count + 1) / rate_hz
  169. # Don't give back a time in the past
  170. if time_allowed < time_now_s:
  171. time_allowed = time_now_s
  172. else:
  173. # XXX: Why is this -1? This seems to only be used in
  174. # self.ratelimit. I guess so that clients get a time in the past and don't
  175. # feel afraid to try again immediately
  176. time_allowed = -1
  177. return allowed, time_allowed
  178. def record_action(
  179. self,
  180. requester: Optional[Requester],
  181. key: Optional[Hashable] = None,
  182. n_actions: int = 1,
  183. _time_now_s: Optional[float] = None,
  184. ) -> None:
  185. """Record that an action(s) took place, even if they violate the rate limit.
  186. This is useful for tracking the frequency of events that happen across
  187. federation which we still want to impose local rate limits on. For instance, if
  188. we are alice.com monitoring a particular room, we cannot prevent bob.com
  189. from joining users to that room. However, we can track the number of recent
  190. joins in the room and refuse to serve new joins ourselves if there have been too
  191. many in the room across both homeservers.
  192. Args:
  193. requester: The requester that is doing the action, if any.
  194. key: An arbitrary key used to classify an action. Defaults to the
  195. requester's user ID.
  196. n_actions: The number of times the user wants to do this action. If the user
  197. cannot do all of the actions, the user's action count is not incremented
  198. at all.
  199. _time_now_s: The current time. Optional, defaults to the current time according
  200. to self.clock. Only used by tests.
  201. """
  202. key = self._get_key(requester, key)
  203. time_now_s = _time_now_s if _time_now_s is not None else self.clock.time()
  204. action_count, time_start, rate_hz = self._get_action_counts(key, time_now_s)
  205. self.actions[key] = (action_count + n_actions, time_start, rate_hz)
  206. def _prune_message_counts(self, time_now_s: float) -> None:
  207. """Remove message count entries that have not exceeded their defined
  208. rate_hz limit
  209. Args:
  210. time_now_s: The current time
  211. """
  212. # We create a copy of the key list here as the dictionary is modified during
  213. # the loop
  214. for key in list(self.actions.keys()):
  215. action_count, time_start, rate_hz = self.actions[key]
  216. # Rate limit = "seconds since we started limiting this action" * rate_hz
  217. # If this limit has not been exceeded, wipe our record of this action
  218. time_delta = time_now_s - time_start
  219. if action_count - time_delta * rate_hz > 0:
  220. continue
  221. else:
  222. del self.actions[key]
  223. async def ratelimit(
  224. self,
  225. requester: Optional[Requester],
  226. key: Optional[Hashable] = None,
  227. rate_hz: Optional[float] = None,
  228. burst_count: Optional[int] = None,
  229. update: bool = True,
  230. n_actions: int = 1,
  231. _time_now_s: Optional[float] = None,
  232. ) -> None:
  233. """Checks if an action can be performed. If not, raises a LimitExceededError
  234. Checks if the user has ratelimiting disabled in the database by looking
  235. for null/zero values in the `ratelimit_override` table. (Non-zero
  236. values aren't honoured, as they're specific to the event sending
  237. ratelimiter, rather than all ratelimiters)
  238. Args:
  239. requester: The requester that is doing the action, if any. Used to check for
  240. if the user has ratelimits disabled.
  241. key: An arbitrary key used to classify an action. Defaults to the
  242. requester's user ID.
  243. rate_hz: The long term number of actions that can be performed in a second.
  244. Overrides the value set during instantiation if set.
  245. burst_count: How many actions that can be performed before being limited.
  246. Overrides the value set during instantiation if set.
  247. update: Whether to count this check as performing the action
  248. n_actions: The number of times the user wants to do this action. If the user
  249. cannot do all of the actions, the user's action count is not incremented
  250. at all.
  251. _time_now_s: The current time. Optional, defaults to the current time according
  252. to self.clock. Only used by tests.
  253. Raises:
  254. LimitExceededError: If an action could not be performed, along with the time in
  255. milliseconds until the action can be performed again
  256. """
  257. time_now_s = _time_now_s if _time_now_s is not None else self.clock.time()
  258. allowed, time_allowed = await self.can_do_action(
  259. requester,
  260. key,
  261. rate_hz=rate_hz,
  262. burst_count=burst_count,
  263. update=update,
  264. n_actions=n_actions,
  265. _time_now_s=time_now_s,
  266. )
  267. if not allowed:
  268. raise LimitExceededError(
  269. limiter_name=self._limiter_name,
  270. retry_after_ms=int(1000 * (time_allowed - time_now_s)),
  271. )
  272. class RequestRatelimiter:
  273. def __init__(
  274. self,
  275. store: DataStore,
  276. clock: Clock,
  277. rc_message: RatelimitSettings,
  278. rc_admin_redaction: Optional[RatelimitSettings],
  279. ):
  280. self.store = store
  281. self.clock = clock
  282. # The rate_hz and burst_count are overridden on a per-user basis
  283. self.request_ratelimiter = Ratelimiter(
  284. store=self.store,
  285. clock=self.clock,
  286. cfg=RatelimitSettings(key=rc_message.key, per_second=0, burst_count=0),
  287. )
  288. self._rc_message = rc_message
  289. # Check whether ratelimiting room admin message redaction is enabled
  290. # by the presence of rate limits in the config
  291. if rc_admin_redaction:
  292. self.admin_redaction_ratelimiter: Optional[Ratelimiter] = Ratelimiter(
  293. store=self.store,
  294. clock=self.clock,
  295. cfg=rc_admin_redaction,
  296. )
  297. else:
  298. self.admin_redaction_ratelimiter = None
  299. async def ratelimit(
  300. self,
  301. requester: Requester,
  302. update: bool = True,
  303. is_admin_redaction: bool = False,
  304. n_actions: int = 1,
  305. ) -> None:
  306. """Ratelimits requests.
  307. Args:
  308. requester
  309. update: Whether to record that a request is being processed.
  310. Set to False when doing multiple checks for one request (e.g.
  311. to check up front if we would reject the request), and set to
  312. True for the last call for a given request.
  313. is_admin_redaction: Whether this is a room admin/moderator
  314. redacting an event. If so then we may apply different
  315. ratelimits depending on config.
  316. n_actions: Multiplier for the number of actions to apply to the
  317. rate limiter at once.
  318. Raises:
  319. LimitExceededError if the request should be ratelimited
  320. """
  321. user_id = requester.user.to_string()
  322. # The AS user itself is never rate limited.
  323. app_service = self.store.get_app_service_by_user_id(user_id)
  324. if app_service is not None:
  325. return # do not ratelimit app service senders
  326. messages_per_second = self._rc_message.per_second
  327. burst_count = self._rc_message.burst_count
  328. # Check if there is a per user override in the DB.
  329. override = await self.store.get_ratelimit_for_user(user_id)
  330. if override:
  331. # If overridden with a null Hz then ratelimiting has been entirely
  332. # disabled for the user
  333. if not override.messages_per_second:
  334. return
  335. messages_per_second = override.messages_per_second
  336. burst_count = override.burst_count
  337. if is_admin_redaction and self.admin_redaction_ratelimiter:
  338. # If we have separate config for admin redactions, use a separate
  339. # ratelimiter as to not have user_ids clash
  340. await self.admin_redaction_ratelimiter.ratelimit(
  341. requester, update=update, n_actions=n_actions
  342. )
  343. else:
  344. # Override rate and burst count per-user
  345. await self.request_ratelimiter.ratelimit(
  346. requester,
  347. rate_hz=messages_per_second,
  348. burst_count=burst_count,
  349. update=update,
  350. n_actions=n_actions,
  351. )