@@ -0,0 +1 @@ | |||
Improve type hints. |
@@ -32,6 +32,7 @@ files = | |||
docker/, | |||
scripts-dev/, | |||
synapse/, | |||
synmark/, | |||
tests/, | |||
build_rust.py | |||
@@ -80,6 +81,9 @@ ignore_missing_imports = True | |||
[mypy-pympler.*] | |||
ignore_missing_imports = True | |||
[mypy-pyperf.*] | |||
ignore_missing_imports = True | |||
[mypy-rust_python_jaeger_reporter.*] | |||
ignore_missing_imports = True | |||
@@ -13,15 +13,18 @@ | |||
# limitations under the License. | |||
import sys | |||
from typing import cast | |||
from synapse.types import ISynapseReactor | |||
try: | |||
from twisted.internet.epollreactor import EPollReactor as Reactor | |||
except ImportError: | |||
from twisted.internet.pollreactor import PollReactor as Reactor | |||
from twisted.internet.pollreactor import PollReactor as Reactor # type: ignore[assignment] | |||
from twisted.internet.main import installReactor | |||
def make_reactor(): | |||
def make_reactor() -> ISynapseReactor: | |||
""" | |||
Instantiate and install a Twisted reactor suitable for testing (i.e. not the | |||
default global one). | |||
@@ -32,4 +35,4 @@ def make_reactor(): | |||
del sys.modules["twisted.internet.reactor"] | |||
installReactor(reactor) | |||
return reactor | |||
return cast(ISynapseReactor, reactor) |
@@ -12,9 +12,10 @@ | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
import sys | |||
from argparse import REMAINDER | |||
from argparse import REMAINDER, Namespace | |||
from contextlib import redirect_stderr | |||
from io import StringIO | |||
from typing import Any, Callable, Coroutine, List, TypeVar | |||
import pyperf | |||
@@ -22,44 +23,50 @@ from twisted.internet.defer import Deferred, ensureDeferred | |||
from twisted.logger import globalLogBeginner, textFileLogObserver | |||
from twisted.python.failure import Failure | |||
from synapse.types import ISynapseReactor | |||
from synmark import make_reactor | |||
from synmark.suites import SUITES | |||
from tests.utils import setupdb | |||
T = TypeVar("T") | |||
def make_test(main): | |||
def make_test( | |||
main: Callable[[ISynapseReactor, int], Coroutine[Any, Any, float]] | |||
) -> Callable[[int], float]: | |||
""" | |||
Take a benchmark function and wrap it in a reactor start and stop. | |||
""" | |||
def _main(loops): | |||
def _main(loops: int) -> float: | |||
reactor = make_reactor() | |||
file_out = StringIO() | |||
with redirect_stderr(file_out): | |||
d = Deferred() | |||
d: "Deferred[float]" = Deferred() | |||
d.addCallback(lambda _: ensureDeferred(main(reactor, loops))) | |||
def on_done(_): | |||
if isinstance(_, Failure): | |||
_.printTraceback() | |||
def on_done(res: T) -> T: | |||
if isinstance(res, Failure): | |||
res.printTraceback() | |||
print(file_out.getvalue()) | |||
reactor.stop() | |||
return _ | |||
return res | |||
d.addBoth(on_done) | |||
reactor.callWhenRunning(lambda: d.callback(True)) | |||
reactor.run() | |||
return d.result | |||
# mypy thinks this is an object for some reason. | |||
return d.result # type: ignore[return-value] | |||
return _main | |||
if __name__ == "__main__": | |||
def add_cmdline_args(cmd, args): | |||
def add_cmdline_args(cmd: List[str], args: Namespace) -> None: | |||
if args.log: | |||
cmd.extend(["--log"]) | |||
cmd.extend(args.tests) | |||
@@ -82,17 +89,26 @@ if __name__ == "__main__": | |||
setupdb() | |||
if runner.args.tests: | |||
SUITES = list( | |||
filter(lambda x: x[0].__name__.split(".")[-1] in runner.args.tests, SUITES) | |||
existing_suites = {s.__name__.split(".")[-1] for s, _ in SUITES} | |||
for test in runner.args.tests: | |||
if test not in existing_suites: | |||
print(f"Test suite {test} does not exist.") | |||
exit(-1) | |||
suites = list( | |||
filter(lambda t: t[0].__name__.split(".")[-1] in runner.args.tests, SUITES) | |||
) | |||
else: | |||
suites = SUITES | |||
for suite, loops in SUITES: | |||
for suite, loops in suites: | |||
if loops: | |||
runner.args.loops = loops | |||
loops_desc = str(loops) | |||
else: | |||
runner.args.loops = orig_loops | |||
loops = "auto" | |||
loops_desc = "auto" | |||
runner.bench_time_func( | |||
suite.__name__ + "_" + str(loops), | |||
suite.__name__ + "_" + loops_desc, | |||
make_test(suite.main), | |||
) |
@@ -11,14 +11,16 @@ | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
import logging | |||
import logging.config | |||
import warnings | |||
from io import StringIO | |||
from typing import Optional | |||
from unittest.mock import Mock | |||
from pyperf import perf_counter | |||
from twisted.internet.address import IPv4Address, IPv6Address | |||
from twisted.internet.defer import Deferred | |||
from twisted.internet.protocol import ServerFactory | |||
from twisted.logger import LogBeginner, LogPublisher | |||
@@ -26,45 +28,53 @@ from twisted.protocols.basic import LineOnlyReceiver | |||
from synapse.config.logger import _setup_stdlib_logging | |||
from synapse.logging import RemoteHandler | |||
from synapse.synapse_rust import reset_logging_config | |||
from synapse.types import ISynapseReactor | |||
from synapse.util import Clock | |||
class LineCounter(LineOnlyReceiver): | |||
delimiter = b"\n" | |||
count = 0 | |||
def __init__(self, *args, **kwargs): | |||
self.count = 0 | |||
super().__init__(*args, **kwargs) | |||
def lineReceived(self, line): | |||
def lineReceived(self, line: bytes) -> None: | |||
self.count += 1 | |||
assert isinstance(self.factory, Factory) | |||
if self.count >= self.factory.wait_for and self.factory.on_done: | |||
on_done = self.factory.on_done | |||
self.factory.on_done = None | |||
on_done.callback(True) | |||
async def main(reactor, loops): | |||
class Factory(ServerFactory): | |||
protocol = LineCounter | |||
wait_for: int | |||
on_done: Optional[Deferred] | |||
async def main(reactor: ISynapseReactor, loops: int) -> float: | |||
""" | |||
Benchmark how long it takes to send `loops` messages. | |||
""" | |||
servers = [] | |||
def protocol(): | |||
p = LineCounter() | |||
servers.append(p) | |||
return p | |||
logger_factory = ServerFactory.forProtocol(protocol) | |||
logger_factory = Factory() | |||
logger_factory.wait_for = loops | |||
logger_factory.on_done = Deferred() | |||
port = reactor.listenTCP(0, logger_factory, interface="127.0.0.1") | |||
port = reactor.listenTCP(0, logger_factory, backlog=50, interface="127.0.0.1") | |||
# A fake homeserver config. | |||
class Config: | |||
server_name = "synmark-" + str(loops) | |||
no_redirect_stdio = True | |||
class server: | |||
server_name = "synmark-" + str(loops) | |||
# This odd construct is to avoid mypy thinking that logging escapes the | |||
# scope of Config. | |||
class _logging: | |||
no_redirect_stdio = True | |||
logging = _logging | |||
hs_config = Config() | |||
@@ -78,28 +88,34 @@ async def main(reactor, loops): | |||
publisher, errors, mock_sys, warnings, initialBufferSize=loops | |||
) | |||
address = port.getHost() | |||
assert isinstance(address, (IPv4Address, IPv6Address)) | |||
log_config = { | |||
"version": 1, | |||
"loggers": {"synapse": {"level": "DEBUG", "handlers": ["tersejson"]}}, | |||
"loggers": {"synapse": {"level": "DEBUG", "handlers": ["remote"]}}, | |||
"formatters": {"tersejson": {"class": "synapse.logging.TerseJsonFormatter"}}, | |||
"handlers": { | |||
"tersejson": { | |||
"remote": { | |||
"class": "synapse.logging.RemoteHandler", | |||
"host": "127.0.0.1", | |||
"port": port.getHost().port, | |||
"formatter": "tersejson", | |||
"host": address.host, | |||
"port": address.port, | |||
"maximum_buffer": 100, | |||
"_reactor": reactor, | |||
} | |||
}, | |||
} | |||
logger = logging.getLogger("synapse.logging.test_terse_json") | |||
logger = logging.getLogger("synapse") | |||
_setup_stdlib_logging( | |||
hs_config, | |||
log_config, | |||
hs_config, # type: ignore[arg-type] | |||
None, | |||
logBeginner=beginner, | |||
) | |||
# Force a new logging config without having to load it from a file. | |||
logging.config.dictConfig(log_config) | |||
reset_logging_config() | |||
# Wait for it to connect... | |||
for handler in logging.getLogger("synapse").handlers: | |||
if isinstance(handler, RemoteHandler): | |||
@@ -107,7 +123,7 @@ async def main(reactor, loops): | |||
else: | |||
raise RuntimeError("Improperly configured: no RemoteHandler found.") | |||
await handler._service.whenConnected() | |||
await handler._service.whenConnected(failAfterFailures=10) | |||
start = perf_counter() | |||
@@ -14,14 +14,15 @@ | |||
from pyperf import perf_counter | |||
from synapse.types import ISynapseReactor | |||
from synapse.util.caches.lrucache import LruCache | |||
async def main(reactor, loops): | |||
async def main(reactor: ISynapseReactor, loops: int) -> float: | |||
""" | |||
Benchmark `loops` number of insertions into LruCache without eviction. | |||
""" | |||
cache = LruCache(loops) | |||
cache: LruCache[int, bool] = LruCache(loops) | |||
start = perf_counter() | |||
@@ -14,15 +14,16 @@ | |||
from pyperf import perf_counter | |||
from synapse.types import ISynapseReactor | |||
from synapse.util.caches.lrucache import LruCache | |||
async def main(reactor, loops): | |||
async def main(reactor: ISynapseReactor, loops: int) -> float: | |||
""" | |||
Benchmark `loops` number of insertions into LruCache where half of them are | |||
evicted. | |||
""" | |||
cache = LruCache(loops // 2) | |||
cache: LruCache[int, bool] = LruCache(loops // 2) | |||
start = perf_counter() | |||