@@ -6,6 +6,7 @@ services:
    image: postgres:9.5
    environment:
      POSTGRES_PASSWORD: postgres
    command: -c fsync=off

  testenv:
    image: python:3.5
@@ -6,6 +6,7 @@ services:
    image: postgres:11
    environment:
      POSTGRES_PASSWORD: postgres
    command: -c fsync=off

  testenv:
    image: python:3.7
@@ -6,6 +6,7 @@ services:
    image: postgres:9.5
    environment:
      POSTGRES_PASSWORD: postgres
    command: -c fsync=off

  testenv:
    image: python:3.7

@@ -45,8 +45,15 @@ steps:
      - docker#v3.0.1:
          image: "python:3.6"

  - wait

  - command:
      - "python -m pip install tox"
      - "tox -e mypy"
    label: ":mypy: mypy"
    plugins:
      - docker#v3.0.1:
          image: "python:3.5"

  - wait

  - command:
      - "apt-get update && apt-get install -y python3.5 python3.5-dev python3-pip libxml2-dev libxslt-dev zlib1g-dev"
@@ -55,6 +62,7 @@ steps:
    label: ":python: 3.5 / SQLite / Old Deps"
    env:
      TRIAL_FLAGS: "-j 2"
      LANG: "C.UTF-8"
    plugins:
      - docker#v3.0.1:
image: "ubuntu:xenial" # We use xenail to get an old sqlite and python | |||
@@ -20,6 +20,7 @@ _trial_temp*/
/*.signing.key
/env/
/homeserver*.yaml
/logs
/media_store/
/uploads
@@ -29,8 +30,9 @@ _trial_temp*/
/.vscode/

# build products
/.coverage*
!/.coveragerc
/.coverage*
/.mypy_cache/
/.tox
/build/
/coverage.*
@@ -38,4 +40,3 @@ _trial_temp*/
/docs/build/
/htmlcov
/pip-wheel-metadata/

@@ -0,0 +1 @@
Lay the groundwork for structured logging output.
@@ -0,0 +1,83 @@
# Structured Logging

A structured logging system can be useful when your logs are destined for a machine to parse and process. Because the output keeps its machine-readable characteristics, it enables more efficient searching and aggregation when consumed by software such as the "ELK stack".

Synapse's structured logging system is configured via the file that Synapse's `log_config` config option points to. The file must be YAML and contain `structured: true`. It must contain a list of "drains" (places where logs go to).

A structured logging configuration looks similar to the following:

```yaml
structured: true

loggers:
    synapse:
        level: INFO
    synapse.storage.SQL:
        level: WARNING

drains:
    console:
        type: console
        location: stdout
    file:
        type: file_json
        location: homeserver.log
```

The above logging config will set Synapse to the 'INFO' logging level by default, with the SQL layer at 'WARNING', and will have two logging drains (to the console and to a file, stored as JSON).

## Drain Types

Drain types can be specified by the `type` key.

### `console`

Outputs human-readable logs to the console.

Arguments:

- `location`: Either `stdout` or `stderr`.

### `console_json`

Outputs machine-readable JSON logs to the console.

Arguments:

- `location`: Either `stdout` or `stderr`.

### `console_json_terse`

Outputs machine-readable JSON logs to the console, separated by newlines. This
format is not designed to be read and re-formatted into human-readable text, but
is optimal for a logging aggregation system.

Arguments:

- `location`: Either `stdout` or `stderr`.

### `file`

Outputs human-readable logs to a file.

Arguments:

- `location`: An absolute path to the file to log to.

### `file_json`

Outputs machine-readable logs to a file.

Arguments:

- `location`: An absolute path to the file to log to.

### `network_json_terse`

Delivers machine-readable JSON logs to a log aggregator over TCP. This is
compatible with LogStash's TCP input with the codec set to `json_lines`.

Arguments:

- `host`: Hostname or IP address of the log aggregator.
- `port`: Numerical port to contact on the host.
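
For example, a config that keeps human-readable console output while also forwarding terse JSON to a log aggregator might look like the following (the aggregator host and port here are illustrative):

```yaml
structured: true

drains:
    console:
        type: console
        location: stdout
    logstash:
        type: network_json_terse
        host: 127.0.0.1
        port: 9020
```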
@@ -36,18 +36,20 @@ from synapse.util.versionstring import get_version_string

logger = logging.getLogger(__name__)

# list of tuples of function, args list, kwargs dict
_sighup_callbacks = []


def register_sighup(func):
def register_sighup(func, *args, **kwargs):
    """
    Register a function to be called when a SIGHUP occurs.

    Args:
        func (function): Function to be called when sent a SIGHUP signal.
        Will be called with a single argument, the homeserver.
        Will be called with a single default argument, the homeserver.
        *args, **kwargs: args and kwargs to be passed to the target function.
    """
    _sighup_callbacks.append(func)
    _sighup_callbacks.append((func, args, kwargs))
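
As a minimal sketch of the new signature (the `rotate_logs` callback and its arguments here are hypothetical), a callback registered with extra args receives them after the homeserver:

```python
def rotate_logs(hs, log_dir, compress=False):
    # Called on each SIGHUP as rotate_logs(hs, "/var/log/synapse", compress=True).
    ...

register_sighup(rotate_logs, "/var/log/synapse", compress=True)
```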
def start_worker_reactor(appname, config, run_command=reactor.run):
@@ -248,8 +250,8 @@ def start(hs, listeners=None):
            # we're not using systemd.
            sdnotify(b"RELOADING=1")

            for i in _sighup_callbacks:
                i(hs)
            for i, args, kwargs in _sighup_callbacks:
                i(hs, *args, **kwargs)

            sdnotify(b"READY=1")

@@ -227,8 +227,6 @@ def start(config_options):
    config.start_pushers = False
    config.send_federation = False

    setup_logging(config, use_worker_options=True)

    synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts

    database_engine = create_engine(config.database_config)
@@ -241,6 +239,8 @@ def start(config_options):
        database_engine=database_engine,
    )

    setup_logging(ss, config, use_worker_options=True)

    ss.setup()

    # We use task.react as the basic run command as it correctly handles tearing

@@ -141,8 +141,6 @@ def start(config_options):
    assert config.worker_app == "synapse.app.appservice"

    setup_logging(config, use_worker_options=True)

    events.USE_FROZEN_DICTS = config.use_frozen_dicts

    database_engine = create_engine(config.database_config)
@@ -167,6 +165,8 @@ def start(config_options):
        database_engine=database_engine,
    )

    setup_logging(ps, config, use_worker_options=True)

    ps.setup()
    reactor.addSystemEventTrigger(
        "before", "startup", _base.start, ps, config.worker_listeners

@@ -179,8 +179,6 @@ def start(config_options):
    assert config.worker_app == "synapse.app.client_reader"

    setup_logging(config, use_worker_options=True)

    events.USE_FROZEN_DICTS = config.use_frozen_dicts

    database_engine = create_engine(config.database_config)
@@ -193,6 +191,8 @@ def start(config_options):
        database_engine=database_engine,
    )

    setup_logging(ss, config, use_worker_options=True)

    ss.setup()
    reactor.addSystemEventTrigger(
        "before", "startup", _base.start, ss, config.worker_listeners

@@ -175,8 +175,6 @@ def start(config_options):
    assert config.worker_replication_http_port is not None

    setup_logging(config, use_worker_options=True)

    # This should only be done on the user directory worker or the master
    config.update_user_directory = False
@@ -192,6 +190,8 @@ def start(config_options):
        database_engine=database_engine,
    )

    setup_logging(ss, config, use_worker_options=True)

    ss.setup()
    reactor.addSystemEventTrigger(
        "before", "startup", _base.start, ss, config.worker_listeners

@@ -160,8 +160,6 @@ def start(config_options):
    assert config.worker_app == "synapse.app.federation_reader"

    setup_logging(config, use_worker_options=True)

    events.USE_FROZEN_DICTS = config.use_frozen_dicts

    database_engine = create_engine(config.database_config)
@@ -174,6 +172,8 @@ def start(config_options):
        database_engine=database_engine,
    )

    setup_logging(ss, config, use_worker_options=True)

    ss.setup()
    reactor.addSystemEventTrigger(
        "before", "startup", _base.start, ss, config.worker_listeners

@@ -171,8 +171,6 @@ def start(config_options):
    assert config.worker_app == "synapse.app.federation_sender"

    setup_logging(config, use_worker_options=True)

    events.USE_FROZEN_DICTS = config.use_frozen_dicts

    database_engine = create_engine(config.database_config)
@@ -197,6 +195,8 @@ def start(config_options):
        database_engine=database_engine,
    )

    setup_logging(ss, config, use_worker_options=True)

    ss.setup()
    reactor.addSystemEventTrigger(
        "before", "startup", _base.start, ss, config.worker_listeners

@@ -232,8 +232,6 @@ def start(config_options):
    assert config.worker_main_http_uri is not None

    setup_logging(config, use_worker_options=True)

    events.USE_FROZEN_DICTS = config.use_frozen_dicts

    database_engine = create_engine(config.database_config)
@@ -246,6 +244,8 @@ def start(config_options):
        database_engine=database_engine,
    )

    setup_logging(ss, config, use_worker_options=True)

    ss.setup()
    reactor.addSystemEventTrigger(
        "before", "startup", _base.start, ss, config.worker_listeners

@@ -341,8 +341,6 @@ def setup(config_options):
        # generating config files and shouldn't try to continue.
        sys.exit(0)

    synapse.config.logger.setup_logging(config, use_worker_options=False)

    events.USE_FROZEN_DICTS = config.use_frozen_dicts

    database_engine = create_engine(config.database_config)
@@ -356,6 +354,8 @@ def setup(config_options):
        database_engine=database_engine,
    )

    synapse.config.logger.setup_logging(hs, config, use_worker_options=False)

    logger.info("Preparing database: %s...", config.database_config["name"])

    try:

@@ -155,8 +155,6 @@ def start(config_options):
            "Please add ``enable_media_repo: false`` to the main config\n"
        )

    setup_logging(config, use_worker_options=True)

    events.USE_FROZEN_DICTS = config.use_frozen_dicts

    database_engine = create_engine(config.database_config)
@@ -169,6 +167,8 @@ def start(config_options):
        database_engine=database_engine,
    )

    setup_logging(ss, config, use_worker_options=True)

    ss.setup()
    reactor.addSystemEventTrigger(
        "before", "startup", _base.start, ss, config.worker_listeners

@@ -184,8 +184,6 @@ def start(config_options):
    assert config.worker_app == "synapse.app.pusher"

    setup_logging(config, use_worker_options=True)

    events.USE_FROZEN_DICTS = config.use_frozen_dicts

    if config.start_pushers:
@@ -210,6 +208,8 @@ def start(config_options):
        database_engine=database_engine,
    )

    setup_logging(ps, config, use_worker_options=True)

    ps.setup()

    def start():

@@ -435,8 +435,6 @@ def start(config_options):
    assert config.worker_app == "synapse.app.synchrotron"

    setup_logging(config, use_worker_options=True)

    synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts

    database_engine = create_engine(config.database_config)
@@ -450,6 +448,8 @@ def start(config_options):
        application_service_handler=SynchrotronApplicationService(),
    )

    setup_logging(ss, config, use_worker_options=True)

    ss.setup()
    reactor.addSystemEventTrigger(
        "before", "startup", _base.start, ss, config.worker_listeners

@@ -197,8 +197,6 @@ def start(config_options):
    assert config.worker_app == "synapse.app.user_dir"

    setup_logging(config, use_worker_options=True)

    events.USE_FROZEN_DICTS = config.use_frozen_dicts

    database_engine = create_engine(config.database_config)
@@ -223,6 +221,8 @@ def start(config_options):
        database_engine=database_engine,
    )

    setup_logging(ss, config, use_worker_options=True)

    ss.setup()
    reactor.addSystemEventTrigger(
        "before", "startup", _base.start, ss, config.worker_listeners

@@ -25,6 +25,10 @@ from twisted.logger import STDLibLogObserver, globalLogBeginner

import synapse
from synapse.app import _base as appbase
from synapse.logging._structured import (
    reload_structured_logging,
    setup_structured_logging,
)
from synapse.logging.context import LoggingContextFilter
from synapse.util.versionstring import get_version_string
@@ -119,21 +123,10 @@ class LoggingConfig(Config):
        log_config_file.write(DEFAULT_LOG_CONFIG.substitute(log_file=log_file))


def setup_logging(config, use_worker_options=False):
    """ Set up python logging

    Args:
        config (LoggingConfig | synapse.config.workers.WorkerConfig):
            configuration data

        use_worker_options (bool): True to use the 'worker_log_config' option
            instead of 'log_config'.

        register_sighup (func | None): Function to call to register a
            sighup handler.
def _setup_stdlib_logging(config, log_config):
    """
    Set up Python stdlib logging.
    """
    log_config = config.worker_log_config if use_worker_options else config.log_config

    if log_config is None:
        log_format = (
            "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s"
@@ -151,35 +144,10 @@ def setup_logging(config, use_worker_options=False):
        handler.addFilter(LoggingContextFilter(request=""))
        logger.addHandler(handler)
    else:
        logging.config.dictConfig(log_config)

        def load_log_config():
            with open(log_config, "r") as f:
                logging.config.dictConfig(yaml.safe_load(f))

        def sighup(*args):
            # it might be better to use a file watcher or something for this.
            load_log_config()
            logging.info("Reloaded log config from %s due to SIGHUP", log_config)

        load_log_config()
        appbase.register_sighup(sighup)

    # make sure that the first thing we log is a thing we can grep backwards
    # for
    logging.warn("***** STARTING SERVER *****")
    logging.warn("Server %s version %s", sys.argv[0], get_version_string(synapse))
    logging.info("Server hostname: %s", config.server_name)

    # It's critical to point twisted's internal logging somewhere, otherwise it
    # stacks up and leaks up to 64K objects;
    # see: https://twistedmatrix.com/trac/ticket/8164
    #
    # Routing to the python logging framework could be a performance problem if
    # the handlers blocked for a long time as python.logging is a blocking API
    # see https://twistedmatrix.com/documents/current/core/howto/logger.html
    # filed as https://github.com/matrix-org/synapse/issues/1727
    #
    # However this may not be too much of a problem if we are just writing to a file.
    # Route Twisted's native logging through to the standard library logging
    # system.
    observer = STDLibLogObserver()

    def _log(event):
@@ -201,3 +169,54 @@ def setup_logging(config, use_worker_options=False):
        )
        if not config.no_redirect_stdio:
            print("Redirected stdout/stderr to logs")


def _reload_stdlib_logging(*args, log_config=None):
    logger = logging.getLogger("")

    if not log_config:
        logger.warn("Reloaded a blank config?")

    logging.config.dictConfig(log_config)


def setup_logging(hs, config, use_worker_options=False):
    """
    Set up the logging subsystem.

    Args:
        config (LoggingConfig | synapse.config.workers.WorkerConfig):
            configuration data

        use_worker_options (bool): True to use the 'worker_log_config' option
            instead of 'log_config'.
    """
    log_config = config.worker_log_config if use_worker_options else config.log_config

    def read_config(*args, callback=None):
        if log_config is None:
            return None

        with open(log_config, "rb") as f:
            log_config_body = yaml.safe_load(f.read())

        if callback:
            callback(log_config=log_config_body)
            logging.info("Reloaded log config from %s due to SIGHUP", log_config)

        return log_config_body

    log_config_body = read_config()

    if log_config_body and log_config_body.get("structured") is True:
        setup_structured_logging(hs, config, log_config_body)
        appbase.register_sighup(read_config, callback=reload_structured_logging)
    else:
        _setup_stdlib_logging(config, log_config_body)
        appbase.register_sighup(read_config, callback=_reload_stdlib_logging)

    # make sure that the first thing we log is a thing we can grep backwards
    # for
    logging.warn("***** STARTING SERVER *****")
    logging.warn("Server %s version %s", sys.argv[0], get_version_string(synapse))
    logging.info("Server hostname: %s", config.server_name)
@@ -326,8 +326,9 @@ class FederationHandler(BaseHandler):
        ours = yield self.store.get_state_groups_ids(room_id, seen)

        # state_maps is a list of mappings from (type, state_key) to event_id
        # type: list[dict[tuple[str, str], str]]
        state_maps = list(ours.values())
        state_maps = list(
            ours.values()
        )  # type: list[dict[tuple[str, str], str]]

        # we don't need this any more, let's delete it.
        del ours

@@ -0,0 +1,374 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os.path
import sys
import typing
import warnings

import attr
from constantly import NamedConstant, Names, ValueConstant, Values
from zope.interface import implementer

from twisted.logger import (
    FileLogObserver,
    FilteringLogObserver,
    ILogObserver,
    LogBeginner,
    Logger,
    LogLevel,
    LogLevelFilterPredicate,
    LogPublisher,
    eventAsText,
    globalLogBeginner,
    jsonFileLogObserver,
)

from synapse.config._base import ConfigError
from synapse.logging._terse_json import (
    TerseJSONToConsoleLogObserver,
    TerseJSONToTCPLogObserver,
)
from synapse.logging.context import LoggingContext


def stdlib_log_level_to_twisted(level: str) -> LogLevel:
    """
    Convert a stdlib log level to Twisted's log level.
    """
    lvl = level.lower().replace("warning", "warn")
    return LogLevel.levelWithName(lvl)
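
Illustrative assertions for the mapping (stdlib names are upper-case, Twisted's `LogLevel.levelWithName` expects the lower-cased name, with "warning" renamed to "warn"):

```python
assert stdlib_log_level_to_twisted("WARNING") == LogLevel.warn
assert stdlib_log_level_to_twisted("INFO") == LogLevel.info
```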
@attr.s
@implementer(ILogObserver)
class LogContextObserver(object):
    """
    An ILogObserver which adds Synapse-specific log context information.

    Attributes:
        observer (ILogObserver): The target parent observer.
    """

    observer = attr.ib()

    def __call__(self, event: dict) -> None:
        """
        Consume a log event and emit it to the parent observer after filtering
        and adding log context information.

        Args:
            event (dict)
        """
        # Filter out some useless events that Twisted outputs
        if "log_text" in event:
            if event["log_text"].startswith("DNSDatagramProtocol starting on "):
                return

            if event["log_text"].startswith("(UDP Port "):
                return

            if event["log_text"].startswith("Timing out client") or event[
                "log_format"
            ].startswith("Timing out client"):
                return

        context = LoggingContext.current_context()

        # Copy the context information to the log event.
        if context is not None:
            context.copy_to_twisted_log_entry(event)
        else:
            # If there's no logging context, not even the root one, we might be
            # starting up or it might be from non-Synapse code. Log it as if it
            # came from the root logger.
            event["request"] = None
            event["scope"] = None

        self.observer(event)


class PythonStdlibToTwistedLogger(logging.Handler):
    """
    Transform a Python stdlib log message into a Twisted one.
    """

    def __init__(self, observer, *args, **kwargs):
        """
        Args:
            observer (ILogObserver): A Twisted logging observer.
            *args, **kwargs: Args/kwargs to be passed to logging.Handler.
        """
        self.observer = observer
        super().__init__(*args, **kwargs)

    def emit(self, record: logging.LogRecord) -> None:
        """
        Emit a record to Twisted's observer.

        Args:
            record (logging.LogRecord)
        """
        self.observer(
            {
                "log_time": record.created,
                "log_text": record.getMessage(),
                "log_format": "{log_text}",
                "log_namespace": record.name,
                "log_level": stdlib_log_level_to_twisted(record.levelname),
            }
        )


def SynapseFileLogObserver(outFile: typing.io.TextIO) -> FileLogObserver:
    """
    A log observer that formats events like the traditional log formatter and
    sends them to `outFile`.

    Args:
        outFile (file object): The file object to write to.
    """

    def formatEvent(_event: dict) -> str:
        event = dict(_event)
        event["log_level"] = event["log_level"].name.upper()
        event["log_format"] = "- {log_namespace} - {log_level} - {request} - " + (
            event.get("log_format", "{log_text}") or "{log_text}"
        )
        return eventAsText(event, includeSystem=False) + "\n"

    return FileLogObserver(outFile, formatEvent)
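
The rendered line is `eventAsText` output with a leading timestamp; an illustrative example (the timestamp and message here are made up, the format matches what the tests below assert):

```
2019-08-20T10:00:00+0000 - synapse.storage - INFO - None - connected to database
```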
class DrainType(Names):
    CONSOLE = NamedConstant()
    CONSOLE_JSON = NamedConstant()
    CONSOLE_JSON_TERSE = NamedConstant()
    FILE = NamedConstant()
    FILE_JSON = NamedConstant()
    NETWORK_JSON_TERSE = NamedConstant()


class OutputPipeType(Values):
    stdout = ValueConstant(sys.__stdout__)
    stderr = ValueConstant(sys.__stderr__)


@attr.s
class DrainConfiguration(object):
    name = attr.ib()
    type = attr.ib()
    location = attr.ib()
    options = attr.ib(default=None)


@attr.s
class NetworkJSONTerseOptions(object):
    maximum_buffer = attr.ib(type=int)


DEFAULT_LOGGERS = {"synapse": {"level": "INFO"}}


def parse_drain_configs(
    drains: dict
) -> typing.Generator[DrainConfiguration, None, None]:
    """
    Parse the drain configurations.

    Args:
        drains (dict): The drain configurations, keyed by name.

    Yields:
        DrainConfiguration instances.

    Raises:
        ConfigError: If any of the drain configuration items are invalid.
    """
    for name, config in drains.items():
        if "type" not in config:
            raise ConfigError("Logging drains require a 'type' key.")

        try:
            logging_type = DrainType.lookupByName(config["type"].upper())
        except ValueError:
            raise ConfigError(
                "%s is not a known logging drain type." % (config["type"],)
            )

        if logging_type in [
            DrainType.CONSOLE,
            DrainType.CONSOLE_JSON,
            DrainType.CONSOLE_JSON_TERSE,
        ]:
            location = config.get("location")
            if location is None or location not in ["stdout", "stderr"]:
                raise ConfigError(
                    (
                        "The %s drain needs the 'location' key set to "
                        "either 'stdout' or 'stderr'."
                    )
                    % (logging_type,)
                )

            pipe = OutputPipeType.lookupByName(location).value

            yield DrainConfiguration(name=name, type=logging_type, location=pipe)

        elif logging_type in [DrainType.FILE, DrainType.FILE_JSON]:
            if "location" not in config:
                raise ConfigError(
                    "The %s drain needs the 'location' key set." % (logging_type,)
                )

            location = config.get("location")
            if os.path.abspath(location) != location:
                raise ConfigError(
                    "File paths need to be absolute, '%s' is a relative path"
                    % (location,)
                )
            yield DrainConfiguration(name=name, type=logging_type, location=location)

        elif logging_type in [DrainType.NETWORK_JSON_TERSE]:
            host = config.get("host")
            port = config.get("port")
            maximum_buffer = config.get("maximum_buffer", 1000)
            yield DrainConfiguration(
                name=name,
                type=logging_type,
                location=(host, port),
                options=NetworkJSONTerseOptions(maximum_buffer=maximum_buffer),
            )

        else:
            raise ConfigError(
                "The %s drain type is currently not implemented."
                % (config["type"].upper(),)
            )
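
A sketch of what the parser yields for a small config (the drain names and address are illustrative):

```python
drains = {
    "console": {"type": "console", "location": "stdout"},
    "remote": {"type": "network_json_terse", "host": "10.0.0.1", "port": 9020},
}

for drain in parse_drain_configs(drains):
    # "console" has its location resolved to sys.__stdout__; "remote" gets a
    # ("10.0.0.1", 9020) location tuple plus NetworkJSONTerseOptions(1000).
    print(drain.name, drain.type)
```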
def setup_structured_logging(
    hs,
    config,
    log_config: dict,
    logBeginner: LogBeginner = globalLogBeginner,
    redirect_stdlib_logging: bool = True,
) -> LogPublisher:
    """
    Set up Twisted's structured logging system.

    Args:
        hs: The homeserver to use.
        config (HomeserverConfig): The configuration of the Synapse homeserver.
        log_config (dict): The log configuration to use.
    """
    if config.no_redirect_stdio:
        raise ConfigError(
            "no_redirect_stdio cannot be defined using structured logging."
        )

    logger = Logger()

    if "drains" not in log_config:
        raise ConfigError("The logging configuration requires a list of drains.")

    observers = []

    for observer in parse_drain_configs(log_config["drains"]):
        # Pipe drains
        if observer.type == DrainType.CONSOLE:
            logger.debug(
                "Starting up the {name} console logger drain", name=observer.name
            )
            observers.append(SynapseFileLogObserver(observer.location))
        elif observer.type == DrainType.CONSOLE_JSON:
            logger.debug(
                "Starting up the {name} JSON console logger drain", name=observer.name
            )
            observers.append(jsonFileLogObserver(observer.location))
        elif observer.type == DrainType.CONSOLE_JSON_TERSE:
            logger.debug(
                "Starting up the {name} terse JSON console logger drain",
                name=observer.name,
            )
            observers.append(
                TerseJSONToConsoleLogObserver(observer.location, metadata={})
            )

        # File drains
        elif observer.type == DrainType.FILE:
            logger.debug("Starting up the {name} file logger drain", name=observer.name)
            log_file = open(observer.location, "at", buffering=1, encoding="utf8")
            observers.append(SynapseFileLogObserver(log_file))
        elif observer.type == DrainType.FILE_JSON:
            logger.debug(
                "Starting up the {name} JSON file logger drain", name=observer.name
            )
            log_file = open(observer.location, "at", buffering=1, encoding="utf8")
            observers.append(jsonFileLogObserver(log_file))

        elif observer.type == DrainType.NETWORK_JSON_TERSE:
            metadata = {"server_name": hs.config.server_name}
            log_observer = TerseJSONToTCPLogObserver(
                hs=hs,
                host=observer.location[0],
                port=observer.location[1],
                metadata=metadata,
                maximum_buffer=observer.options.maximum_buffer,
            )
            log_observer.start()
            observers.append(log_observer)
        else:
            # We should never get here, but, just in case, throw an error.
            raise ConfigError("%s drain type cannot be configured" % (observer.type,))

    publisher = LogPublisher(*observers)
    log_filter = LogLevelFilterPredicate()

    for namespace, namespace_config in log_config.get(
        "loggers", DEFAULT_LOGGERS
    ).items():
        # Set the log level for twisted.logger.Logger namespaces
        log_filter.setLogLevelForNamespace(
            namespace,
            stdlib_log_level_to_twisted(namespace_config.get("level", "INFO")),
        )

        # Also set the log levels for the stdlib logger namespaces, to prevent
        # them getting to PythonStdlibToTwistedLogger and having to be formatted
        if "level" in namespace_config:
            logging.getLogger(namespace).setLevel(namespace_config.get("level"))

    f = FilteringLogObserver(publisher, [log_filter])
    lco = LogContextObserver(f)

    if redirect_stdlib_logging:
        stuff_into_twisted = PythonStdlibToTwistedLogger(lco)
        stdliblogger = logging.getLogger()
        stdliblogger.addHandler(stuff_into_twisted)

    # Always redirect standard I/O, otherwise other logging outputs might miss
    # it.
    logBeginner.beginLoggingTo([lco], redirectStandardIO=True)

    return publisher


def reload_structured_logging(*args, log_config=None) -> None:
    warnings.warn(
        "Currently the structured logging system can not be reloaded, doing nothing"
    )
@@ -0,0 +1,278 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Log formatters that output terse JSON.
"""

import sys
from collections import deque
from ipaddress import IPv4Address, IPv6Address, ip_address
from math import floor
from typing.io import TextIO

import attr
from simplejson import dumps

from twisted.application.internet import ClientService
from twisted.internet.endpoints import (
    HostnameEndpoint,
    TCP4ClientEndpoint,
    TCP6ClientEndpoint,
)
from twisted.internet.protocol import Factory, Protocol
from twisted.logger import FileLogObserver, Logger
from twisted.python.failure import Failure


def flatten_event(event: dict, metadata: dict, include_time: bool = False):
    """
    Flatten a Twisted logging event to a dictionary capable of being sent
    as a log event to a logging aggregation system.

    The format is vastly simplified and is not designed to be a "human readable
    string" in the sense that traditional logs are. Instead, the structure is
    optimised for searchability and filtering, with human-understandable log
    keys.

    Args:
        event (dict): The Twisted logging event we are flattening.
        metadata (dict): Additional data to include with each log message. This
            can be information like the server name. Since the target log
            consumer does not know who we are other than by host IP, this
            allows us to forward through static information.
        include_time (bool): Should we include the `time` key? If False, the
            event time is stripped from the event.
    """
    new_event = {}

    # If it's a failure, make the new event's log_failure be the traceback text.
    if "log_failure" in event:
        new_event["log_failure"] = event["log_failure"].getTraceback()

    # If it's a warning, copy over a string representation of the warning.
    if "warning" in event:
        new_event["warning"] = str(event["warning"])

    # Stdlib logging events have "log_text" as their human-readable portion,
    # Twisted ones have "log_format". For now, include the log_format, so that
    # context only given in the log format (e.g. what is being logged) is
    # available.
    if "log_text" in event:
        new_event["log"] = event["log_text"]
    else:
        new_event["log"] = event["log_format"]

    # We want to include the timestamp when forwarding over the network, but
    # exclude it when we are writing to stdout. This is because the log ingester
    # (e.g. logstash, fluentd) can add its own timestamp.
    if include_time:
        new_event["time"] = round(event["log_time"], 2)

    # Convert the log level to a textual representation.
    new_event["level"] = event["log_level"].name.upper()

    # Ignore these keys, and do not transfer them over to the new log object.
    # They are either useless (isError), transferred manually above (log_time,
    # log_level, etc), or contain Python objects which are not useful for output
    # (log_logger, log_source).
    keys_to_delete = [
        "isError",
        "log_failure",
        "log_format",
        "log_level",
        "log_logger",
        "log_source",
        "log_system",
        "log_time",
        "log_text",
        "observer",
        "warning",
    ]

    # If it's from the Twisted legacy logger (twisted.python.log), it adds some
    # more keys we want to purge.
    if event.get("log_namespace") == "log_legacy":
        keys_to_delete.extend(["message", "system", "time"])

    # Rather than modify the dictionary in place, construct a new one with only
    # the content we want. The original event should be considered 'frozen'.
    for key in event.keys():
        if key in keys_to_delete:
            continue

        if isinstance(event[key], (str, int, bool, float)) or event[key] is None:
            # If it's a plain type, include it as is.
            new_event[key] = event[key]
        else:
            # If it's not one of those basic types, write out a string
            # representation. This should probably be a warning in development,
            # so that we are sure we are only outputting useful data.
            new_event[key] = str(event[key])

    # Add the metadata information to the event (e.g. the server_name).
    new_event.update(metadata)

    return new_event
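
For illustration, a Twisted event flattens roughly like this (the field values here are made up):

```python
from twisted.logger import LogLevel

event = {
    "log_format": "Hello there, {name}!",
    "name": "wally",
    "log_level": LogLevel.info,
    "log_time": 1566332400.123,
    "log_namespace": "tests",
}

flatten_event(event, {"server_name": "example.com"})
# => {"log": "Hello there, {name}!", "level": "INFO", "name": "wally",
#     "log_namespace": "tests", "server_name": "example.com"}
```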
def TerseJSONToConsoleLogObserver(outFile: TextIO, metadata: dict) -> FileLogObserver:
    """
    A log observer that formats events to a flattened JSON representation.

    Args:
        outFile: The file object to write to.
        metadata: Metadata to be added to each log object.
    """

    def formatEvent(_event: dict) -> str:
        flattened = flatten_event(_event, metadata)
        return dumps(flattened, ensure_ascii=False, separators=(",", ":")) + "\n"

    return FileLogObserver(outFile, formatEvent)


@attr.s
class TerseJSONToTCPLogObserver(object):
    """
    An IObserver that writes JSON logs to a TCP target.

    Args:
        hs (HomeServer): The Homeserver that is being logged for.
        host: The host of the logging target.
        port: The logging target's port.
        metadata: Metadata to be added to each log entry.
    """

    hs = attr.ib()
    host = attr.ib(type=str)
    port = attr.ib(type=int)
    metadata = attr.ib(type=dict)
    maximum_buffer = attr.ib(type=int)
    _buffer = attr.ib(default=attr.Factory(deque), type=deque)
    _writer = attr.ib(default=None)
    _logger = attr.ib(default=attr.Factory(Logger))

    def start(self) -> None:

        # Connect without DNS lookups if it's a direct IP.
        try:
            ip = ip_address(self.host)
            if isinstance(ip, IPv4Address):
                endpoint = TCP4ClientEndpoint(
                    self.hs.get_reactor(), self.host, self.port
                )
            elif isinstance(ip, IPv6Address):
                endpoint = TCP6ClientEndpoint(
                    self.hs.get_reactor(), self.host, self.port
                )
        except ValueError:
            endpoint = HostnameEndpoint(self.hs.get_reactor(), self.host, self.port)

        factory = Factory.forProtocol(Protocol)
        self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor())
        self._service.startService()

    def _write_loop(self) -> None:
        """
        Implement the write loop.
        """
        if self._writer:
            return

        self._writer = self._service.whenConnected()

        @self._writer.addBoth
        def writer(r):
            if isinstance(r, Failure):
                r.printTraceback(file=sys.__stderr__)
                self._writer = None
                self.hs.get_reactor().callLater(1, self._write_loop)
                return

            try:
                for event in self._buffer:
                    r.transport.write(
                        dumps(event, ensure_ascii=False, separators=(",", ":")).encode(
                            "utf8"
                        )
                    )
                    r.transport.write(b"\n")
                self._buffer.clear()
            except Exception as e:
                sys.__stderr__.write("Failed writing out logs with %s\n" % (str(e),))

            self._writer = False
            self.hs.get_reactor().callLater(1, self._write_loop)

    def _handle_pressure(self) -> None:
        """
        Handle backpressure by shedding events.

        Events will be shed from the buffer, in this order, until the buffer is
        below the maximum:
        - Shed DEBUG events
        - Shed INFO events
        - Shed the middle 50% of the events.
        """
        if len(self._buffer) <= self.maximum_buffer:
            return

        # Strip out DEBUGs
        self._buffer = deque(
            filter(lambda event: event["level"] != "DEBUG", self._buffer)
        )

        if len(self._buffer) <= self.maximum_buffer:
            return

        # Strip out INFOs
        self._buffer = deque(
            filter(lambda event: event["level"] != "INFO", self._buffer)
        )

        if len(self._buffer) <= self.maximum_buffer:
            return

        # Cut the middle entries out
        buffer_split = floor(self.maximum_buffer / 2)

        old_buffer = self._buffer
        self._buffer = deque()

        for i in range(buffer_split):
            self._buffer.append(old_buffer.popleft())

        end_buffer = []
        for i in range(buffer_split):
            end_buffer.append(old_buffer.pop())

        self._buffer.extend(reversed(end_buffer))
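
A self-contained sketch of that shedding order, mirroring the logic above under an assumed maximum of 10 (this is an illustrative standalone function, not the class itself):

```python
from collections import deque
from math import floor

def shed(buffer, maximum):
    # Drop DEBUGs, then INFOs, then the middle, until under the maximum.
    if len(buffer) <= maximum:
        return buffer
    buffer = deque(e for e in buffer if e["level"] != "DEBUG")
    if len(buffer) <= maximum:
        return buffer
    buffer = deque(e for e in buffer if e["level"] != "INFO")
    if len(buffer) <= maximum:
        return buffer
    half = floor(maximum / 2)
    return deque(list(buffer)[:half] + list(buffer)[-half:])

events = deque({"level": "DEBUG" if i % 3 == 0 else "WARN", "num": i} for i in range(15))
assert [e["level"] for e in shed(events, 10)] == ["WARN"] * 10
```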
    def __call__(self, event: dict) -> None:
        flattened = flatten_event(event, self.metadata, include_time=True)
        self._buffer.append(flattened)

        # Handle backpressure, if it exists.
        try:
            self._handle_pressure()
        except Exception:
            # If handling backpressure fails, clear the buffer and log the
            # exception.
            self._buffer.clear()
            self._logger.failure("Failed clearing backpressure")

        # Try and write immediately.
        self._write_loop()
@@ -25,6 +25,7 @@ See doc/log_contexts.rst for details on how this works.
import logging
import threading
import types
from typing import Any, List

from twisted.internet import defer, threads
@@ -194,7 +195,7 @@ class LoggingContext(object):
    class Sentinel(object):
        """Sentinel to represent the root context"""

        __slots__ = []
        __slots__ = []  # type: List[Any]

        def __str__(self):
            return "sentinel"
@@ -202,6 +203,10 @@ class LoggingContext(object):
        def copy_to(self, record):
            pass

        def copy_to_twisted_log_entry(self, record):
            record["request"] = None
            record["scope"] = None

        def start(self):
            pass
@@ -330,6 +335,13 @@ class LoggingContext(object):
        # we also track the current scope:
        record.scope = self.scope

    def copy_to_twisted_log_entry(self, record):
        """
        Copy logging fields from this context to a Twisted log record.
        """
        record["request"] = self.request
        record["scope"] = self.scope

    def start(self):
        if get_thread_id() != self.main_thread:
            logger.warning("Started logcontext %s on different thread", self)

@@ -47,9 +47,9 @@ REQUIREMENTS = [
    "idna>=2.5",
    # validating SSL certs for IP addresses requires service_identity 18.1.
    "service_identity>=18.1.0",
    # our logcontext handling relies on the ability to cancel inlineCallbacks
    # (https://twistedmatrix.com/trac/ticket/4632) which landed in Twisted 18.7.
    "Twisted>=18.7.0",
    # Twisted 18.9 introduces some logger improvements that the structured
    # logger utilises
    "Twisted>=18.9.0",
    "treq>=15.1",
    # Twisted has required pyopenssl 16.0 since about Twisted 16.6.
    "pyopenssl>=16.0.0",

@@ -0,0 +1,197 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import os.path
import shutil
import sys
import textwrap

from twisted.logger import Logger, eventAsText, eventsFromJSONLogFile

from synapse.config.logger import setup_logging
from synapse.logging._structured import setup_structured_logging
from synapse.logging.context import LoggingContext

from tests.unittest import DEBUG, HomeserverTestCase


class FakeBeginner(object):
    def beginLoggingTo(self, observers, **kwargs):
        self.observers = observers


class StructuredLoggingTestCase(HomeserverTestCase):
    """
    Tests for Synapse's structured logging support.
    """

    def test_output_to_json_round_trip(self):
        """
        Synapse logs can be outputted to JSON and then read back again.
        """
        temp_dir = self.mktemp()
        os.mkdir(temp_dir)
        self.addCleanup(shutil.rmtree, temp_dir)

        json_log_file = os.path.abspath(os.path.join(temp_dir, "out.json"))

        log_config = {
            "drains": {"jsonfile": {"type": "file_json", "location": json_log_file}}
        }

        # Begin the logger with our config
        beginner = FakeBeginner()
        setup_structured_logging(
            self.hs, self.hs.config, log_config, logBeginner=beginner
        )

        # Make a logger and send an event
        logger = Logger(
            namespace="tests.logging.test_structured", observer=beginner.observers[0]
        )
        logger.info("Hello there, {name}!", name="wally")

        # Read the log file and check it has the event we sent
        with open(json_log_file, "r") as f:
            logged_events = list(eventsFromJSONLogFile(f))
        self.assertEqual(len(logged_events), 1)

        # The event pulled from the file should render fine
        self.assertEqual(
            eventAsText(logged_events[0], includeTimestamp=False),
            "[tests.logging.test_structured#info] Hello there, wally!",
        )

    def test_output_to_text(self):
        """
        Synapse logs can be outputted to text.
        """
        temp_dir = self.mktemp()
        os.mkdir(temp_dir)
        self.addCleanup(shutil.rmtree, temp_dir)

        log_file = os.path.abspath(os.path.join(temp_dir, "out.log"))

        log_config = {"drains": {"file": {"type": "file", "location": log_file}}}

        # Begin the logger with our config
        beginner = FakeBeginner()
        setup_structured_logging(
            self.hs, self.hs.config, log_config, logBeginner=beginner
        )

        # Make a logger and send an event
        logger = Logger(
            namespace="tests.logging.test_structured", observer=beginner.observers[0]
        )
        logger.info("Hello there, {name}!", name="wally")

        # Read the log file and check it has the event we sent
        with open(log_file, "r") as f:
            logged_events = f.read().strip().split("\n")
        self.assertEqual(len(logged_events), 1)

        # The event pulled from the file should render fine
        self.assertTrue(
            logged_events[0].endswith(
                " - tests.logging.test_structured - INFO - None - Hello there, wally!"
            )
        )

    def test_collects_logcontext(self):
        """
        Test that log outputs have the attached logging context.
        """
        log_config = {"drains": {}}

        # Begin the logger with our config
        beginner = FakeBeginner()
        publisher = setup_structured_logging(
            self.hs, self.hs.config, log_config, logBeginner=beginner
        )

        logs = []

        publisher.addObserver(logs.append)

        # Make a logger and send an event
        logger = Logger(
            namespace="tests.logging.test_structured", observer=beginner.observers[0]
        )

        with LoggingContext("testcontext", request="somereq"):
            logger.info("Hello there, {name}!", name="steve")

        self.assertEqual(len(logs), 1)
        self.assertEqual(logs[0]["request"], "somereq")


class StructuredLoggingConfigurationFileTestCase(HomeserverTestCase):
    def make_homeserver(self, reactor, clock):

        tempdir = self.mktemp()
        os.mkdir(tempdir)
        log_config_file = os.path.abspath(os.path.join(tempdir, "log.config.yaml"))
        self.homeserver_log = os.path.abspath(os.path.join(tempdir, "homeserver.log"))

        config = self.default_config()
        config["log_config"] = log_config_file

        with open(log_config_file, "w") as f:
            f.write(
                textwrap.dedent(
                    """\
                    structured: true

                    drains:
                        file:
                            type: file_json
                            location: %s
                    """
                    % (self.homeserver_log,)
                )
            )

        self.addCleanup(self._sys_cleanup)

        return self.setup_test_homeserver(config=config)

    def _sys_cleanup(self):
        sys.stdout = sys.__stdout__
        sys.stderr = sys.__stderr__
    # Do not remove! We need the logging system to be set to something other
    # than WARNING.
    @DEBUG
    def test_log_output(self):
        """
        When a structured logging config is given, Synapse will use it.
        """
        setup_logging(self.hs, self.hs.config)

        # Make a logger and send an event
        logger = Logger(namespace="tests.logging.test_structured")

        with LoggingContext("testcontext", request="somereq"):
            logger.info("Hello there, {name}!", name="steve")

        with open(self.homeserver_log, "r") as f:
            logged_events = [
                eventAsText(x, includeTimestamp=False) for x in eventsFromJSONLogFile(f)
            ]

        logs = "\n".join(logged_events)
        self.assertTrue("***** STARTING SERVER *****" in logs)
        self.assertTrue("Hello there, steve!" in logs)
@@ -0,0 +1,234 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
from collections import Counter

from twisted.logger import Logger

from synapse.logging._structured import setup_structured_logging

from tests.server import connect_client
from tests.unittest import HomeserverTestCase

from .test_structured import FakeBeginner


class TerseJSONTCPTestCase(HomeserverTestCase):
    def test_log_output(self):
        """
        The Terse JSON outputter delivers simplified structured logs over TCP.
        """
        log_config = {
            "drains": {
                "tersejson": {
                    "type": "network_json_terse",
                    "host": "127.0.0.1",
                    "port": 8000,
                }
            }
        }

        # Begin the logger with our config
        beginner = FakeBeginner()
        setup_structured_logging(
            self.hs, self.hs.config, log_config, logBeginner=beginner
        )

        logger = Logger(
            namespace="tests.logging.test_terse_json", observer=beginner.observers[0]
        )
        logger.info("Hello there, {name}!", name="wally")

        # Trigger the connection
        self.pump()

        _, server = connect_client(self.reactor, 0)

        # Trigger data being sent
        self.pump()

        # One log message, with a single trailing newline
        logs = server.data.decode("utf8").splitlines()
        self.assertEqual(len(logs), 1)
        self.assertEqual(server.data.count(b"\n"), 1)

        log = json.loads(logs[0])

        # The terse logger should give us these keys.
        expected_log_keys = [
            "log",
            "time",
            "level",
            "log_namespace",
            "request",
            "scope",
            "server_name",
            "name",
        ]
        self.assertEqual(set(log.keys()), set(expected_log_keys))

        # It contains the data we expect.
        self.assertEqual(log["name"], "wally")

    def test_log_backpressure_debug(self):
        """
        When backpressure is hit, DEBUG logs will be shed.
        """
        log_config = {
            "loggers": {"synapse": {"level": "DEBUG"}},
            "drains": {
                "tersejson": {
                    "type": "network_json_terse",
                    "host": "127.0.0.1",
                    "port": 8000,
                    "maximum_buffer": 10,
                }
            },
        }

        # Begin the logger with our config
        beginner = FakeBeginner()
        setup_structured_logging(
            self.hs,
            self.hs.config,
            log_config,
            logBeginner=beginner,
            redirect_stdlib_logging=False,
        )

        logger = Logger(
            namespace="synapse.logging.test_terse_json", observer=beginner.observers[0]
        )

        # Send some debug messages
        for i in range(0, 3):
            logger.debug("debug %s" % (i,))

        # Send a bunch of useful messages
        for i in range(0, 7):
            logger.info("test message %s" % (i,))

        # The last debug message pushes it past the maximum buffer
        logger.debug("too much debug")

        # Allow the reconnection
        _, server = connect_client(self.reactor, 0)
        self.pump()

        # Only the 7 infos made it through, the debugs were elided
        logs = server.data.splitlines()
        self.assertEqual(len(logs), 7)

    def test_log_backpressure_info(self):
        """
        When backpressure is hit, DEBUG and INFO logs will be shed.
        """
        log_config = {
            "loggers": {"synapse": {"level": "DEBUG"}},
            "drains": {
                "tersejson": {
                    "type": "network_json_terse",
                    "host": "127.0.0.1",
                    "port": 8000,
                    "maximum_buffer": 10,
                }
            },
        }

        # Begin the logger with our config
        beginner = FakeBeginner()
        setup_structured_logging(
            self.hs,
            self.hs.config,
            log_config,
            logBeginner=beginner,
            redirect_stdlib_logging=False,
        )

        logger = Logger(
            namespace="synapse.logging.test_terse_json", observer=beginner.observers[0]
        )

        # Send some debug messages
        for i in range(0, 3):
            logger.debug("debug %s" % (i,))

        # Send a bunch of useful messages
        for i in range(0, 10):
            logger.warn("test warn %s" % (i,))

        # Send a bunch of info messages
        for i in range(0, 3):
            logger.info("test message %s" % (i,))

        # The last debug message pushes it past the maximum buffer
        logger.debug("too much debug")

        # Allow the reconnection
        client, server = connect_client(self.reactor, 0)
        self.pump()

        # The 10 warnings made it through, the debugs and infos were elided
        logs = list(map(json.loads, server.data.decode("utf8").splitlines()))
        self.assertEqual(len(logs), 10)

        self.assertEqual(Counter([x["level"] for x in logs]), {"WARN": 10})

    def test_log_backpressure_cut_middle(self):
        """
        When backpressure is hit and no more DEBUGs or INFOs can be culled,
        the middle messages will be cut out.
""" | |||
log_config = { | |||
"loggers": {"synapse": {"level": "DEBUG"}}, | |||
"drains": { | |||
"tersejson": { | |||
"type": "network_json_terse", | |||
"host": "127.0.0.1", | |||
"port": 8000, | |||
"maximum_buffer": 10, | |||
} | |||
}, | |||
} | |||
# Begin the logger with our config | |||
beginner = FakeBeginner() | |||
setup_structured_logging( | |||
self.hs, | |||
self.hs.config, | |||
log_config, | |||
logBeginner=beginner, | |||
redirect_stdlib_logging=False, | |||
) | |||
logger = Logger( | |||
namespace="synapse.logging.test_terse_json", observer=beginner.observers[0] | |||
) | |||
# Send a bunch of useful messages | |||
for i in range(0, 20): | |||
logger.warn("test warn", num=i) | |||
# Allow the reconnection | |||
client, server = connect_client(self.reactor, 0) | |||
self.pump() | |||
        # The first five and last five warnings made it through; the middle
        # ten were cut out.
        logs = list(map(json.loads, server.data.decode("utf8").splitlines()))
        self.assertEqual(len(logs), 10)

        self.assertEqual(Counter([x["level"] for x in logs]), {"WARN": 10})
        self.assertEqual([0, 1, 2, 3, 4, 15, 16, 17, 18, 19], [x["num"] for x in logs])
@@ -11,9 +11,13 @@ from twisted.internet import address, threads, udp
from twisted.internet._resolver import SimpleResolverComplexifier
from twisted.internet.defer import Deferred, fail, succeed
from twisted.internet.error import DNSLookupError
from twisted.internet.interfaces import IReactorPluggableNameResolver, IResolverSimple
from twisted.internet.interfaces import (
    IReactorPluggableNameResolver,
    IReactorTCP,
    IResolverSimple,
)
from twisted.python.failure import Failure
from twisted.test.proto_helpers import MemoryReactorClock
from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactorClock
from twisted.web.http import unquote
from twisted.web.http_headers import Headers
@@ -465,3 +469,22 @@ class FakeTransport(object):
            self.buffer = self.buffer[len(to_write) :]

        if self.buffer and self.autoflush:
            self._reactor.callLater(0.0, self.flush)


def connect_client(reactor: IReactorTCP, client_id: int) -> AccumulatingProtocol:
    """
    Connect a client to a fake TCP transport.

    Args:
        reactor
        client_id: The index into reactor.tcpClients of the TCP client to
            connect.

    Returns:
        A tuple of the client protocol and the AccumulatingProtocol server.
    """
    factory = reactor.tcpClients[client_id][2]
    client = factory.buildProtocol(None)
    server = AccumulatingProtocol()
    server.makeConnection(FakeTransport(client, reactor))
    client.makeConnection(FakeTransport(server, reactor))
    reactor.tcpClients.pop(client_id)

    return client, server
@@ -146,3 +146,13 @@ commands =
    coverage combine
    coverage xml
    codecov -X gcov

[testenv:mypy]
basepython = python3.5
deps =
    {[base]deps}
    mypy
extras = all
commands = mypy --ignore-missing-imports \
            synapse/logging/_structured.py \
            synapse/logging/_terse_json.py