The existing implementation of the `python_twisted_reactor_tick_time` metric is pretty useless, because it *only* measures the time taken to execute timed calls and callbacks from threads. That neglects everything that happens off the back of I/O, which is obviously quite a lot for us. To improve this, I've hooked into a different place in the reactor - in particular, where it calls `epoll`. That call is the only place it should wait for something to happen - the rest of the loop *should* be quick. I've also removed `python_twisted_reactor_pending_calls`, because I don't believe anyone ever looks at it, and it's a nuisance to populate.tags/v1.51.0rc1
@@ -0,0 +1 @@ | |||
Improve accuracy of `python_twisted_reactor_tick_time` prometheus metric. |
@@ -0,0 +1 @@ | |||
Remove `python_twisted_reactor_pending_calls` prometheus metric. |
@@ -92,22 +92,6 @@ new PromConsole.Graph({ | |||
}) | |||
</script> | |||
<h3>Pending calls per tick</h3> | |||
<div id="reactor_pending_calls"></div> | |||
<script> | |||
new PromConsole.Graph({ | |||
node: document.querySelector("#reactor_pending_calls"), | |||
expr: "rate(python_twisted_reactor_pending_calls_sum[30s]) / rate(python_twisted_reactor_pending_calls_count[30s])", | |||
name: "[[job]]-[[index]]", | |||
min: 0, | |||
renderer: "line", | |||
height: 150, | |||
yAxisFormatter: PromConsole.NumberFormatter.humanize, | |||
yHoverFormatter: PromConsole.NumberFormatter.humanize, | |||
yTitle: "Pending Calls" | |||
}) | |||
</script> | |||
<h1>Storage</h1> | |||
<h3>Queries</h3> | |||
@@ -12,15 +12,12 @@ | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
import functools | |||
import itertools | |||
import logging | |||
import os | |||
import platform | |||
import threading | |||
import time | |||
from typing import ( | |||
Any, | |||
Callable, | |||
Dict, | |||
Generic, | |||
@@ -33,7 +30,6 @@ from typing import ( | |||
Type, | |||
TypeVar, | |||
Union, | |||
cast, | |||
) | |||
import attr | |||
@@ -44,11 +40,9 @@ from prometheus_client.core import ( | |||
GaugeMetricFamily, | |||
) | |||
from twisted.internet import reactor | |||
from twisted.internet.base import ReactorBase | |||
from twisted.python.threadpool import ThreadPool | |||
import synapse | |||
import synapse.metrics._reactor_metrics | |||
from synapse.metrics._exposition import ( | |||
MetricsResource, | |||
generate_latest, | |||
@@ -368,21 +362,6 @@ class CPUMetrics: | |||
REGISTRY.register(CPUMetrics()) | |||
# | |||
# Twisted reactor metrics | |||
# | |||
tick_time = Histogram( | |||
"python_twisted_reactor_tick_time", | |||
"Tick time of the Twisted reactor (sec)", | |||
buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5], | |||
) | |||
pending_calls_metric = Histogram( | |||
"python_twisted_reactor_pending_calls", | |||
"Pending calls", | |||
buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000], | |||
) | |||
# | |||
# Federation Metrics | |||
# | |||
@@ -434,8 +413,6 @@ build_info.labels( | |||
" ".join([platform.system(), platform.release()]), | |||
).set(1) | |||
last_ticked = time.time() | |||
# 3PID send info | |||
threepid_send_requests = Histogram( | |||
"synapse_threepid_send_requests_with_tries", | |||
@@ -483,75 +460,6 @@ def register_threadpool(name: str, threadpool: ThreadPool) -> None: | |||
) | |||
class ReactorLastSeenMetric: | |||
def collect(self) -> Iterable[Metric]: | |||
cm = GaugeMetricFamily( | |||
"python_twisted_reactor_last_seen", | |||
"Seconds since the Twisted reactor was last seen", | |||
) | |||
cm.add_metric([], time.time() - last_ticked) | |||
yield cm | |||
REGISTRY.register(ReactorLastSeenMetric()) | |||
F = TypeVar("F", bound=Callable[..., Any]) | |||
def runUntilCurrentTimer(reactor: ReactorBase, func: F) -> F: | |||
@functools.wraps(func) | |||
def f(*args: Any, **kwargs: Any) -> Any: | |||
now = reactor.seconds() | |||
num_pending = 0 | |||
# _newTimedCalls is one long list of *all* pending calls. Below loop | |||
# is based off of impl of reactor.runUntilCurrent | |||
for delayed_call in reactor._newTimedCalls: | |||
if delayed_call.time > now: | |||
break | |||
if delayed_call.delayed_time > 0: | |||
continue | |||
num_pending += 1 | |||
num_pending += len(reactor.threadCallQueue) | |||
start = time.time() | |||
ret = func(*args, **kwargs) | |||
end = time.time() | |||
# record the amount of wallclock time spent running pending calls. | |||
# This is a proxy for the actual amount of time between reactor polls, | |||
# since about 25% of time is actually spent running things triggered by | |||
# I/O events, but that is harder to capture without rewriting half the | |||
# reactor. | |||
tick_time.observe(end - start) | |||
pending_calls_metric.observe(num_pending) | |||
# Update the time we last ticked, for the metric to test whether | |||
# Synapse's reactor has frozen | |||
global last_ticked | |||
last_ticked = end | |||
return ret | |||
return cast(F, f) | |||
try: | |||
# Ensure the reactor has all the attributes we expect | |||
reactor.seconds # type: ignore | |||
reactor.runUntilCurrent # type: ignore | |||
reactor._newTimedCalls # type: ignore | |||
reactor.threadCallQueue # type: ignore | |||
# runUntilCurrent is called when we have pending calls. It is called once | |||
# per iteratation after fd polling. | |||
reactor.runUntilCurrent = runUntilCurrentTimer(reactor, reactor.runUntilCurrent) # type: ignore | |||
except AttributeError: | |||
pass | |||
__all__ = [ | |||
"MetricsResource", | |||
"generate_latest", | |||
@@ -0,0 +1,83 @@ | |||
# Copyright 2022 The Matrix.org Foundation C.I.C. | |||
# | |||
# Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | |||
# You may obtain a copy of the License at | |||
# | |||
# http://www.apache.org/licenses/LICENSE-2.0 | |||
# | |||
# Unless required by applicable law or agreed to in writing, software | |||
# distributed under the License is distributed on an "AS IS" BASIS, | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
import select | |||
import time | |||
from typing import Any, Iterable, List, Tuple | |||
from prometheus_client import Histogram, Metric | |||
from prometheus_client.core import REGISTRY, GaugeMetricFamily | |||
from twisted.internet import reactor | |||
# | |||
# Twisted reactor metrics | |||
# | |||
tick_time = Histogram( | |||
"python_twisted_reactor_tick_time", | |||
"Tick time of the Twisted reactor (sec)", | |||
buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5], | |||
) | |||
class EpollWrapper: | |||
"""a wrapper for an epoll object which records the time between polls""" | |||
def __init__(self, poller: "select.epoll"): | |||
self.last_polled = time.time() | |||
self._poller = poller | |||
def poll(self, *args, **kwargs) -> List[Tuple[int, int]]: # type: ignore[no-untyped-def] | |||
# record the time since poll() was last called. This gives a good proxy for | |||
# how long it takes to run everything in the reactor - ie, how long anything | |||
# waiting for the next tick will have to wait. | |||
tick_time.observe(time.time() - self.last_polled) | |||
ret = self._poller.poll(*args, **kwargs) | |||
self.last_polled = time.time() | |||
return ret | |||
def __getattr__(self, item: str) -> Any: | |||
return getattr(self._poller, item) | |||
class ReactorLastSeenMetric: | |||
def __init__(self, epoll_wrapper: EpollWrapper): | |||
self._epoll_wrapper = epoll_wrapper | |||
def collect(self) -> Iterable[Metric]: | |||
cm = GaugeMetricFamily( | |||
"python_twisted_reactor_last_seen", | |||
"Seconds since the Twisted reactor was last seen", | |||
) | |||
cm.add_metric([], time.time() - self._epoll_wrapper.last_polled) | |||
yield cm | |||
try: | |||
# if the reactor has a `_poller` attribute, which is an `epoll` object | |||
# (ie, it's an EPollReactor), we wrap the `epoll` with a thing that will | |||
# measure the time between ticks | |||
from select import epoll | |||
poller = reactor._poller # type: ignore[attr-defined] | |||
except (AttributeError, ImportError): | |||
pass | |||
else: | |||
if isinstance(poller, epoll): | |||
poller = EpollWrapper(poller) | |||
reactor._poller = poller # type: ignore[attr-defined] | |||
REGISTRY.register(ReactorLastSeenMetric(poller)) |