When processing incoming transactions, it can be hard to see what's going on, because we process a bunch of stuff in parallel, and because we may end up recursively working our way through a chain of three or four events. This commit creates a way to use logcontexts to add the relevant event IDs to the log lines.

tags/v0.33.6rc1
@@ -0,0 +1 @@ | |||
Include event ID in log lines when processing incoming federation transactions
@@ -46,6 +46,7 @@ from synapse.replication.http.federation import ( | |||
from synapse.types import get_domain_from_id | |||
from synapse.util.async_helpers import Linearizer, concurrently_execute | |||
from synapse.util.caches.response_cache import ResponseCache | |||
from synapse.util.logcontext import nested_logging_context | |||
from synapse.util.logutils import log_function | |||
# when processing incoming transactions, we try to handle multiple rooms in | |||
@@ -187,21 +188,22 @@ class FederationServer(FederationBase): | |||
for pdu in pdus_by_room[room_id]: | |||
event_id = pdu.event_id | |||
try: | |||
yield self._handle_received_pdu( | |||
origin, pdu | |||
) | |||
pdu_results[event_id] = {} | |||
except FederationError as e: | |||
logger.warn("Error handling PDU %s: %s", event_id, e) | |||
pdu_results[event_id] = {"error": str(e)} | |||
except Exception as e: | |||
f = failure.Failure() | |||
pdu_results[event_id] = {"error": str(e)} | |||
logger.error( | |||
"Failed to handle PDU %s: %s", | |||
event_id, f.getTraceback().rstrip(), | |||
) | |||
with nested_logging_context(event_id): | |||
try: | |||
yield self._handle_received_pdu( | |||
origin, pdu | |||
) | |||
pdu_results[event_id] = {} | |||
except FederationError as e: | |||
logger.warn("Error handling PDU %s: %s", event_id, e) | |||
pdu_results[event_id] = {"error": str(e)} | |||
except Exception as e: | |||
f = failure.Failure() | |||
pdu_results[event_id] = {"error": str(e)} | |||
logger.error( | |||
"Failed to handle PDU %s: %s", | |||
event_id, f.getTraceback().rstrip(), | |||
) | |||
yield concurrently_execute( | |||
process_pdus_for_room, pdus_by_room.keys(), | |||
@@ -339,14 +339,18 @@ class FederationHandler(BaseHandler): | |||
"[%s %s] Requesting state at missing prev_event %s", | |||
room_id, event_id, p, | |||
) | |||
state, got_auth_chain = ( | |||
yield self.federation_client.get_state_for_room( | |||
origin, room_id, p, | |||
with logcontext.nested_logging_context(p): | |||
state, got_auth_chain = ( | |||
yield self.federation_client.get_state_for_room( | |||
origin, room_id, p, | |||
) | |||
) | |||
) | |||
auth_chains.update(got_auth_chain) | |||
state_group = {(x.type, x.state_key): x.event_id for x in state} | |||
state_groups.append(state_group) | |||
auth_chains.update(got_auth_chain) | |||
state_group = { | |||
(x.type, x.state_key): x.event_id for x in state | |||
} | |||
state_groups.append(state_group) | |||
# Resolve any conflicting state | |||
def fetch(ev_ids): | |||
@@ -483,20 +487,21 @@ class FederationHandler(BaseHandler): | |||
"[%s %s] Handling received prev_event %s", | |||
room_id, event_id, ev.event_id, | |||
) | |||
try: | |||
yield self.on_receive_pdu( | |||
origin, | |||
ev, | |||
sent_to_us_directly=False, | |||
) | |||
except FederationError as e: | |||
if e.code == 403: | |||
logger.warn( | |||
"[%s %s] Received prev_event %s failed history check.", | |||
room_id, event_id, ev.event_id, | |||
with logcontext.nested_logging_context(ev.event_id): | |||
try: | |||
yield self.on_receive_pdu( | |||
origin, | |||
ev, | |||
sent_to_us_directly=False, | |||
) | |||
else: | |||
raise | |||
except FederationError as e: | |||
if e.code == 403: | |||
logger.warn( | |||
"[%s %s] Received prev_event %s failed history check.", | |||
room_id, event_id, ev.event_id, | |||
) | |||
else: | |||
raise | |||
@defer.inlineCallbacks | |||
def _process_received_pdu(self, origin, event, state, auth_chain): | |||
@@ -1135,7 +1140,8 @@ class FederationHandler(BaseHandler): | |||
try: | |||
logger.info("Processing queued PDU %s which was received " | |||
"while we were joining %s", p.event_id, p.room_id) | |||
yield self.on_receive_pdu(origin, p, sent_to_us_directly=True) | |||
with logcontext.nested_logging_context(p.event_id): | |||
yield self.on_receive_pdu(origin, p, sent_to_us_directly=True) | |||
except Exception as e: | |||
logger.warn( | |||
"Error handling queued PDU %s from %s: %s", | |||
@@ -1581,15 +1587,22 @@ class FederationHandler(BaseHandler): | |||
Notifies about the events where appropriate. | |||
""" | |||
contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( | |||
[ | |||
logcontext.run_in_background( | |||
self._prep_event, | |||
@defer.inlineCallbacks | |||
def prep(ev_info): | |||
event = ev_info["event"] | |||
with logcontext.nested_logging_context(suffix=event.event_id): | |||
res = yield self._prep_event( | |||
origin, | |||
ev_info["event"], | |||
event, | |||
state=ev_info.get("state"), | |||
auth_events=ev_info.get("auth_events"), | |||
) | |||
defer.returnValue(res) | |||
contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( | |||
[ | |||
logcontext.run_in_background(prep, ev_info) | |||
for ev_info in event_infos | |||
], consumeErrors=True, | |||
)) | |||
@@ -200,7 +200,7 @@ class LoggingContext(object): | |||
sentinel = Sentinel() | |||
def __init__(self, name=None, parent_context=None): | |||
def __init__(self, name=None, parent_context=None, request=None): | |||
self.previous_context = LoggingContext.current_context() | |||
self.name = name | |||
@@ -218,6 +218,13 @@ class LoggingContext(object): | |||
self.parent_context = parent_context | |||
if self.parent_context is not None: | |||
self.parent_context.copy_to(self) | |||
if request is not None: | |||
# the request param overrides the request from the parent context | |||
self.request = request | |||
def __str__(self): | |||
return "%s@%x" % (self.name, id(self)) | |||
@@ -256,9 +263,6 @@ class LoggingContext(object): | |||
) | |||
self.alive = True | |||
if self.parent_context is not None: | |||
self.parent_context.copy_to(self) | |||
return self | |||
def __exit__(self, type, value, traceback): | |||
@@ -439,6 +443,35 @@ class PreserveLoggingContext(object): | |||
) | |||
def nested_logging_context(suffix, parent_context=None): | |||
"""Creates a new logging context as a child of another. | |||
The nested logging context will have a 'request' made up of the parent context's | |||
request, plus the given suffix. | |||
CPU/db usage stats will be added to the parent context's on exit. | |||
Normal usage looks like: | |||
with nested_logging_context(suffix): | |||
# ... do stuff | |||
Args: | |||
suffix (str): suffix to add to the parent context's 'request'. | |||
parent_context (LoggingContext|None): parent context. Will use the current context | |||
if None. | |||
Returns: | |||
LoggingContext: new logging context. | |||
""" | |||
if parent_context is None: | |||
parent_context = LoggingContext.current_context() | |||
return LoggingContext( | |||
parent_context=parent_context, | |||
request=parent_context.request + "-" + suffix, | |||
) | |||
def preserve_fn(f): | |||
"""Function decorator which wraps the function with run_in_background""" | |||
def g(*args, **kwargs): | |||
@@ -6,6 +6,7 @@ from twisted.internet.defer import maybeDeferred, succeed | |||
from synapse.events import FrozenEvent | |||
from synapse.types import Requester, UserID | |||
from synapse.util import Clock | |||
from synapse.util.logcontext import LoggingContext | |||
from tests import unittest | |||
from tests.server import ThreadedMemoryReactorClock, setup_test_homeserver | |||
@@ -117,9 +118,10 @@ class MessageAcceptTests(unittest.TestCase): | |||
} | |||
) | |||
d = self.handler.on_receive_pdu( | |||
"test.serv", lying_event, sent_to_us_directly=True | |||
) | |||
with LoggingContext(request="lying_event"): | |||
d = self.handler.on_receive_pdu( | |||
"test.serv", lying_event, sent_to_us_directly=True | |||
) | |||
# Step the reactor, so the database fetches come back | |||
self.reactor.advance(1) | |||
@@ -209,11 +211,12 @@ class MessageAcceptTests(unittest.TestCase): | |||
} | |||
) | |||
d = self.handler.on_receive_pdu( | |||
"test.serv", good_event, sent_to_us_directly=True | |||
) | |||
self.reactor.advance(1) | |||
self.assertEqual(self.successResultOf(d), None) | |||
with LoggingContext(request="good_event"): | |||
d = self.handler.on_receive_pdu( | |||
"test.serv", good_event, sent_to_us_directly=True | |||
) | |||
self.reactor.advance(1) | |||
self.assertEqual(self.successResultOf(d), None) | |||
bad_event = FrozenEvent( | |||
{ | |||
@@ -230,10 +233,11 @@ class MessageAcceptTests(unittest.TestCase): | |||
} | |||
) | |||
d = self.handler.on_receive_pdu( | |||
"test.serv", bad_event, sent_to_us_directly=True | |||
) | |||
self.reactor.advance(1) | |||
with LoggingContext(request="bad_event"): | |||
d = self.handler.on_receive_pdu( | |||
"test.serv", bad_event, sent_to_us_directly=True | |||
) | |||
self.reactor.advance(1) | |||
extrem = maybeDeferred( | |||
self.homeserver.datastore.get_latest_event_ids_in_room, self.room_id | |||
@@ -159,6 +159,11 @@ class LoggingContextTestCase(unittest.TestCase): | |||
self.assertEqual(r, "bum") | |||
self._check_test_key("one") | |||
def test_nested_logging_context(self): | |||
with LoggingContext(request="foo"): | |||
nested_context = logcontext.nested_logging_context(suffix="bar") | |||
self.assertEqual(nested_context.request, "foo-bar") | |||
# a function which returns a deferred which has been "called", but | |||
# which had a function which returned another incomplete deferred on | |||