選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。
 
 
 
 
 
 

241 行
7.5 KiB

# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  14. import logging
  15. from functools import wraps
  16. from types import TracebackType
  17. from typing import Awaitable, Callable, Dict, Generator, Optional, Type, TypeVar
  18. from prometheus_client import CollectorRegistry, Counter, Metric
  19. from typing_extensions import Concatenate, ParamSpec, Protocol
  20. from synapse.logging.context import (
  21. ContextResourceUsage,
  22. LoggingContext,
  23. current_context,
  24. )
  25. from synapse.metrics import InFlightGauge
  26. from synapse.util import Clock
  27. logger = logging.getLogger(__name__)
  28. block_counter = Counter("synapse_util_metrics_block_count", "", ["block_name"])
  29. block_timer = Counter("synapse_util_metrics_block_time_seconds", "", ["block_name"])
  30. block_ru_utime = Counter(
  31. "synapse_util_metrics_block_ru_utime_seconds", "", ["block_name"]
  32. )
  33. block_ru_stime = Counter(
  34. "synapse_util_metrics_block_ru_stime_seconds", "", ["block_name"]
  35. )
  36. block_db_txn_count = Counter(
  37. "synapse_util_metrics_block_db_txn_count", "", ["block_name"]
  38. )
  39. # seconds spent waiting for db txns, excluding scheduling time, in this block
  40. block_db_txn_duration = Counter(
  41. "synapse_util_metrics_block_db_txn_duration_seconds", "", ["block_name"]
  42. )
  43. # seconds spent waiting for a db connection, in this block
  44. block_db_sched_duration = Counter(
  45. "synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"]
  46. )
  47. # This is dynamically created in InFlightGauge.__init__.
  48. class _InFlightMetric(Protocol):
  49. real_time_max: float
  50. real_time_sum: float
  51. # Tracks the number of blocks currently active
  52. in_flight: InFlightGauge[_InFlightMetric] = InFlightGauge(
  53. "synapse_util_metrics_block_in_flight",
  54. "",
  55. labels=["block_name"],
  56. sub_metrics=["real_time_max", "real_time_sum"],
  57. )
  58. P = ParamSpec("P")
  59. R = TypeVar("R")
  60. class HasClock(Protocol):
  61. clock: Clock
  62. def measure_func(
  63. name: Optional[str] = None,
  64. ) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
  65. """Decorate an async method with a `Measure` context manager.
  66. The Measure is created using `self.clock`; it should only be used to decorate
  67. methods in classes defining an instance-level `clock` attribute.
  68. Usage:
  69. @measure_func()
  70. async def foo(...):
  71. ...
  72. Which is analogous to:
  73. async def foo(...):
  74. with Measure(...):
  75. ...
  76. """
  77. def wrapper(
  78. func: Callable[Concatenate[HasClock, P], Awaitable[R]]
  79. ) -> Callable[P, Awaitable[R]]:
  80. block_name = func.__name__ if name is None else name
  81. @wraps(func)
  82. async def measured_func(self: HasClock, *args: P.args, **kwargs: P.kwargs) -> R:
  83. with Measure(self.clock, block_name):
  84. r = await func(self, *args, **kwargs)
  85. return r
  86. # There are some shenanigans here, because we're decorating a method but
  87. # explicitly making use of the `self` parameter. The key thing here is that the
  88. # return type within the return type for `measure_func` itself describes how the
  89. # decorated function will be called.
  90. return measured_func # type: ignore[return-value]
  91. return wrapper # type: ignore[return-value]
  92. class Measure:
  93. __slots__ = [
  94. "clock",
  95. "name",
  96. "_logging_context",
  97. "start",
  98. ]
  99. def __init__(self, clock: Clock, name: str) -> None:
  100. """
  101. Args:
  102. clock: An object with a "time()" method, which returns the current
  103. time in seconds.
  104. name: The name of the metric to report.
  105. """
  106. self.clock = clock
  107. self.name = name
  108. curr_context = current_context()
  109. if not curr_context:
  110. logger.warning(
  111. "Starting metrics collection %r from sentinel context: metrics will be lost",
  112. name,
  113. )
  114. parent_context = None
  115. else:
  116. assert isinstance(curr_context, LoggingContext)
  117. parent_context = curr_context
  118. self._logging_context = LoggingContext(str(curr_context), parent_context)
  119. self.start: Optional[float] = None
  120. def __enter__(self) -> "Measure":
  121. if self.start is not None:
  122. raise RuntimeError("Measure() objects cannot be re-used")
  123. self.start = self.clock.time()
  124. self._logging_context.__enter__()
  125. in_flight.register((self.name,), self._update_in_flight)
  126. logger.debug("Entering block %s", self.name)
  127. return self
  128. def __exit__(
  129. self,
  130. exc_type: Optional[Type[BaseException]],
  131. exc_val: Optional[BaseException],
  132. exc_tb: Optional[TracebackType],
  133. ) -> None:
  134. if self.start is None:
  135. raise RuntimeError("Measure() block exited without being entered")
  136. logger.debug("Exiting block %s", self.name)
  137. duration = self.clock.time() - self.start
  138. usage = self.get_resource_usage()
  139. in_flight.unregister((self.name,), self._update_in_flight)
  140. self._logging_context.__exit__(exc_type, exc_val, exc_tb)
  141. try:
  142. block_counter.labels(self.name).inc()
  143. block_timer.labels(self.name).inc(duration)
  144. block_ru_utime.labels(self.name).inc(usage.ru_utime)
  145. block_ru_stime.labels(self.name).inc(usage.ru_stime)
  146. block_db_txn_count.labels(self.name).inc(usage.db_txn_count)
  147. block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec)
  148. block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec)
  149. except ValueError:
  150. logger.warning("Failed to save metrics! Usage: %s", usage)
  151. def get_resource_usage(self) -> ContextResourceUsage:
  152. """Get the resources used within this Measure block
  153. If the Measure block is still active, returns the resource usage so far.
  154. """
  155. return self._logging_context.get_resource_usage()
  156. def _update_in_flight(self, metrics: _InFlightMetric) -> None:
  157. """Gets called when processing in flight metrics"""
  158. assert self.start is not None
  159. duration = self.clock.time() - self.start
  160. metrics.real_time_max = max(metrics.real_time_max, duration)
  161. metrics.real_time_sum += duration
  162. # TODO: Add other in flight metrics.
  163. class DynamicCollectorRegistry(CollectorRegistry):
  164. """
  165. Custom Prometheus Collector registry that calls a hook first, allowing you
  166. to update metrics on-demand.
  167. Don't forget to register this registry with the main registry!
  168. """
  169. def __init__(self) -> None:
  170. super().__init__()
  171. self._pre_update_hooks: Dict[str, Callable[[], None]] = {}
  172. def collect(self) -> Generator[Metric, None, None]:
  173. """
  174. Collects metrics, calling pre-update hooks first.
  175. """
  176. for pre_update_hook in self._pre_update_hooks.values():
  177. pre_update_hook()
  178. yield from super().collect()
  179. def register_hook(self, metric_name: str, hook: Callable[[], None]) -> None:
  180. """
  181. Registers a hook that is called before metric collection.
  182. """
  183. self._pre_update_hooks[metric_name] = hook