|
|
@@ -40,6 +40,7 @@ from twisted.enterprise import adbapi |
|
|
|
|
|
|
|
from synapse.api.errors import StoreError |
|
|
|
from synapse.config.database import DatabaseConnectionConfig |
|
|
|
from synapse.logging import opentracing |
|
|
|
from synapse.logging.context import ( |
|
|
|
LoggingContext, |
|
|
|
current_context, |
|
|
@@ -313,7 +314,14 @@ class LoggingTransaction: |
|
|
|
start = time.time() |
|
|
|
|
|
|
|
try: |
|
|
|
return func(sql, *args) |
|
|
|
with opentracing.start_active_span( |
|
|
|
"db.query", |
|
|
|
tags={ |
|
|
|
opentracing.tags.DATABASE_TYPE: "sql", |
|
|
|
opentracing.tags.DATABASE_STATEMENT: sql, |
|
|
|
}, |
|
|
|
): |
|
|
|
return func(sql, *args) |
|
|
|
except Exception as e: |
|
|
|
sql_logger.debug("[SQL FAIL] {%s} %s", self.name, e) |
|
|
|
raise |
|
|
@@ -525,9 +533,16 @@ class DatabasePool: |
|
|
|
exception_callbacks=exception_callbacks, |
|
|
|
) |
|
|
|
try: |
|
|
|
r = func(cursor, *args, **kwargs) |
|
|
|
conn.commit() |
|
|
|
return r |
|
|
|
with opentracing.start_active_span( |
|
|
|
"db.txn", |
|
|
|
tags={ |
|
|
|
opentracing.SynapseTags.DB_TXN_DESC: desc, |
|
|
|
opentracing.SynapseTags.DB_TXN_ID: name, |
|
|
|
}, |
|
|
|
): |
|
|
|
r = func(cursor, *args, **kwargs) |
|
|
|
conn.commit() |
|
|
|
return r |
|
|
|
except self.engine.module.OperationalError as e: |
|
|
|
# This can happen if the database disappears mid |
|
|
|
# transaction. |
|
|
@@ -653,16 +668,17 @@ class DatabasePool: |
|
|
|
logger.warning("Starting db txn '%s' from sentinel context", desc) |
|
|
|
|
|
|
|
try: |
|
|
|
result = await self.runWithConnection( |
|
|
|
self.new_transaction, |
|
|
|
desc, |
|
|
|
after_callbacks, |
|
|
|
exception_callbacks, |
|
|
|
func, |
|
|
|
*args, |
|
|
|
db_autocommit=db_autocommit, |
|
|
|
**kwargs, |
|
|
|
) |
|
|
|
with opentracing.start_active_span(f"db.{desc}"): |
|
|
|
result = await self.runWithConnection( |
|
|
|
self.new_transaction, |
|
|
|
desc, |
|
|
|
after_callbacks, |
|
|
|
exception_callbacks, |
|
|
|
func, |
|
|
|
*args, |
|
|
|
db_autocommit=db_autocommit, |
|
|
|
**kwargs, |
|
|
|
) |
|
|
|
|
|
|
|
for after_callback, after_args, after_kwargs in after_callbacks: |
|
|
|
after_callback(*after_args, **after_kwargs) |
|
|
@@ -718,25 +734,29 @@ class DatabasePool: |
|
|
|
with LoggingContext( |
|
|
|
str(curr_context), parent_context=parent_context |
|
|
|
) as context: |
|
|
|
sched_duration_sec = monotonic_time() - start_time |
|
|
|
sql_scheduling_timer.observe(sched_duration_sec) |
|
|
|
context.add_database_scheduled(sched_duration_sec) |
|
|
|
|
|
|
|
if self.engine.is_connection_closed(conn): |
|
|
|
logger.debug("Reconnecting closed database connection") |
|
|
|
conn.reconnect() |
|
|
|
|
|
|
|
try: |
|
|
|
if db_autocommit: |
|
|
|
self.engine.attempt_to_set_autocommit(conn, True) |
|
|
|
|
|
|
|
db_conn = LoggingDatabaseConnection( |
|
|
|
conn, self.engine, "runWithConnection" |
|
|
|
) |
|
|
|
return func(db_conn, *args, **kwargs) |
|
|
|
finally: |
|
|
|
if db_autocommit: |
|
|
|
self.engine.attempt_to_set_autocommit(conn, False) |
|
|
|
with opentracing.start_active_span( |
|
|
|
operation_name="db.connection", |
|
|
|
): |
|
|
|
sched_duration_sec = monotonic_time() - start_time |
|
|
|
sql_scheduling_timer.observe(sched_duration_sec) |
|
|
|
context.add_database_scheduled(sched_duration_sec) |
|
|
|
|
|
|
|
if self.engine.is_connection_closed(conn): |
|
|
|
logger.debug("Reconnecting closed database connection") |
|
|
|
conn.reconnect() |
|
|
|
opentracing.log_kv({"message": "reconnected"}) |
|
|
|
|
|
|
|
try: |
|
|
|
if db_autocommit: |
|
|
|
self.engine.attempt_to_set_autocommit(conn, True) |
|
|
|
|
|
|
|
db_conn = LoggingDatabaseConnection( |
|
|
|
conn, self.engine, "runWithConnection" |
|
|
|
) |
|
|
|
return func(db_conn, *args, **kwargs) |
|
|
|
finally: |
|
|
|
if db_autocommit: |
|
|
|
self.engine.attempt_to_set_autocommit(conn, False) |
|
|
|
|
|
|
|
return await make_deferred_yieldable( |
|
|
|
self._db_pool.runWithConnection(inner_func, *args, **kwargs) |
|
|
|