@@ -0,0 +1 @@ | |||
Add benchmarks for structured logging and improve output performance. |
@@ -256,6 +256,7 @@ class TerseJSONToTCPLogObserver(object): | |||
# transport is the same, just trigger a resumeProducing. | |||
if self._producer and r.transport is self._producer.transport: | |||
self._producer.resumeProducing() | |||
self._connection_waiter = None | |||
return | |||
# If the producer is still producing, stop it. | |||
@@ -0,0 +1,72 @@ | |||
# -*- coding: utf-8 -*- | |||
# Copyright 2019 The Matrix.org Foundation C.I.C. | |||
# | |||
# Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | |||
# You may obtain a copy of the License at | |||
# | |||
# http://www.apache.org/licenses/LICENSE-2.0 | |||
# | |||
# Unless required by applicable law or agreed to in writing, software | |||
# distributed under the License is distributed on an "AS IS" BASIS, | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
import sys | |||
from twisted.internet import epollreactor | |||
from twisted.internet.main import installReactor | |||
from synapse.config.homeserver import HomeServerConfig | |||
from synapse.util import Clock | |||
from tests.utils import default_config, setup_test_homeserver | |||
async def make_homeserver(reactor, config=None): | |||
""" | |||
Make a Homeserver suitable for running benchmarks against. | |||
Args: | |||
reactor: A Twisted reactor to run under. | |||
config: A HomeServerConfig to use, or None. | |||
""" | |||
cleanup_tasks = [] | |||
clock = Clock(reactor) | |||
if not config: | |||
config = default_config("test") | |||
config_obj = HomeServerConfig() | |||
config_obj.parse_config_dict(config, "", "") | |||
hs = await setup_test_homeserver( | |||
cleanup_tasks.append, config=config_obj, reactor=reactor, clock=clock | |||
) | |||
stor = hs.get_datastore() | |||
# Run the database background updates. | |||
if hasattr(stor, "do_next_background_update"): | |||
while not await stor.has_completed_background_updates(): | |||
await stor.do_next_background_update(1) | |||
def cleanup(): | |||
for i in cleanup_tasks: | |||
i() | |||
return hs, clock.sleep, cleanup | |||
def make_reactor(): | |||
""" | |||
Instantiate and install a Twisted reactor suitable for testing (i.e. not the | |||
default global one). | |||
""" | |||
reactor = epollreactor.EPollReactor() | |||
if "twisted.internet.reactor" in sys.modules: | |||
del sys.modules["twisted.internet.reactor"] | |||
installReactor(reactor) | |||
return reactor |
@@ -0,0 +1,90 @@ | |||
# -*- coding: utf-8 -*- | |||
# Copyright 2019 The Matrix.org Foundation C.I.C. | |||
# | |||
# Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | |||
# You may obtain a copy of the License at | |||
# | |||
# http://www.apache.org/licenses/LICENSE-2.0 | |||
# | |||
# Unless required by applicable law or agreed to in writing, software | |||
# distributed under the License is distributed on an "AS IS" BASIS, | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
import sys | |||
from contextlib import redirect_stderr | |||
from io import StringIO | |||
import pyperf | |||
from synmark import make_reactor | |||
from synmark.suites import SUITES | |||
from twisted.internet.defer import ensureDeferred | |||
from twisted.logger import globalLogBeginner, textFileLogObserver | |||
from twisted.python.failure import Failure | |||
from tests.utils import setupdb | |||
def make_test(main): | |||
""" | |||
Take a benchmark function and wrap it in a reactor start and stop. | |||
""" | |||
def _main(loops): | |||
reactor = make_reactor() | |||
file_out = StringIO() | |||
with redirect_stderr(file_out): | |||
d = ensureDeferred(main(reactor, loops)) | |||
def on_done(_): | |||
if isinstance(_, Failure): | |||
_.printTraceback() | |||
print(file_out.getvalue()) | |||
reactor.stop() | |||
return _ | |||
d.addBoth(on_done) | |||
reactor.run() | |||
return d.result | |||
return _main | |||
if __name__ == "__main__": | |||
def add_cmdline_args(cmd, args): | |||
if args.log: | |||
cmd.extend(["--log"]) | |||
runner = pyperf.Runner( | |||
processes=3, min_time=2, show_name=True, add_cmdline_args=add_cmdline_args | |||
) | |||
runner.argparser.add_argument("--log", action="store_true") | |||
runner.parse_args() | |||
orig_loops = runner.args.loops | |||
runner.args.inherit_environ = ["SYNAPSE_POSTGRES"] | |||
if runner.args.worker: | |||
if runner.args.log: | |||
globalLogBeginner.beginLoggingTo( | |||
[textFileLogObserver(sys.__stdout__)], redirectStandardIO=False | |||
) | |||
setupdb() | |||
for suite, loops in SUITES: | |||
if loops: | |||
runner.args.loops = loops | |||
else: | |||
runner.args.loops = orig_loops | |||
loops = "auto" | |||
runner.bench_time_func( | |||
suite.__name__ + "_" + str(loops), make_test(suite.main), | |||
) |
@@ -0,0 +1,3 @@ | |||
from . import logging | |||
SUITES = [(logging, 1000), (logging, 10000), (logging, None)] |
@@ -0,0 +1,118 @@ | |||
# -*- coding: utf-8 -*- | |||
# Copyright 2019 The Matrix.org Foundation C.I.C. | |||
# | |||
# Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | |||
# You may obtain a copy of the License at | |||
# | |||
# http://www.apache.org/licenses/LICENSE-2.0 | |||
# | |||
# Unless required by applicable law or agreed to in writing, software | |||
# distributed under the License is distributed on an "AS IS" BASIS, | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
import warnings | |||
from io import StringIO | |||
from mock import Mock | |||
from pyperf import perf_counter | |||
from synmark import make_homeserver | |||
from twisted.internet.defer import Deferred | |||
from twisted.internet.protocol import ServerFactory | |||
from twisted.logger import LogBeginner, Logger, LogPublisher | |||
from twisted.protocols.basic import LineOnlyReceiver | |||
from synapse.logging._structured import setup_structured_logging | |||
class LineCounter(LineOnlyReceiver): | |||
delimiter = b"\n" | |||
def __init__(self, *args, **kwargs): | |||
self.count = 0 | |||
super().__init__(*args, **kwargs) | |||
def lineReceived(self, line): | |||
self.count += 1 | |||
if self.count >= self.factory.wait_for and self.factory.on_done: | |||
on_done = self.factory.on_done | |||
self.factory.on_done = None | |||
on_done.callback(True) | |||
async def main(reactor, loops): | |||
""" | |||
Benchmark how long it takes to send `loops` messages. | |||
""" | |||
servers = [] | |||
def protocol(): | |||
p = LineCounter() | |||
servers.append(p) | |||
return p | |||
logger_factory = ServerFactory.forProtocol(protocol) | |||
logger_factory.wait_for = loops | |||
logger_factory.on_done = Deferred() | |||
port = reactor.listenTCP(0, logger_factory, interface="127.0.0.1") | |||
hs, wait, cleanup = await make_homeserver(reactor) | |||
errors = StringIO() | |||
publisher = LogPublisher() | |||
mock_sys = Mock() | |||
beginner = LogBeginner( | |||
publisher, errors, mock_sys, warnings, initialBufferSize=loops | |||
) | |||
log_config = { | |||
"loggers": {"synapse": {"level": "DEBUG"}}, | |||
"drains": { | |||
"tersejson": { | |||
"type": "network_json_terse", | |||
"host": "127.0.0.1", | |||
"port": port.getHost().port, | |||
"maximum_buffer": 100, | |||
} | |||
}, | |||
} | |||
logger = Logger(namespace="synapse.logging.test_terse_json", observer=publisher) | |||
logging_system = setup_structured_logging( | |||
hs, hs.config, log_config, logBeginner=beginner, redirect_stdlib_logging=False | |||
) | |||
# Wait for it to connect... | |||
await logging_system._observers[0]._service.whenConnected() | |||
start = perf_counter() | |||
# Send a bunch of useful messages | |||
for i in range(0, loops): | |||
logger.info("test message %s" % (i,)) | |||
if ( | |||
len(logging_system._observers[0]._buffer) | |||
== logging_system._observers[0].maximum_buffer | |||
): | |||
while ( | |||
len(logging_system._observers[0]._buffer) | |||
> logging_system._observers[0].maximum_buffer / 2 | |||
): | |||
await wait(0.01) | |||
await logger_factory.on_done | |||
end = perf_counter() - start | |||
logging_system.stop() | |||
port.stopListening() | |||
cleanup() | |||
return end |
@@ -102,6 +102,15 @@ commands = | |||
{envbindir}/coverage run "{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:} | |||
[testenv:benchmark] | |||
deps = | |||
{[base]deps} | |||
pyperf | |||
setenv = | |||
SYNAPSE_POSTGRES = 1 | |||
commands = | |||
python -m synmark {posargs:} | |||
[testenv:packaging] | |||
skip_install=True | |||
deps = | |||