您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
 
 
 
 
 
 

173 行
5.8 KiB

  1. # -*- coding: utf-8 -*-
  2. # Copyright 2019 New Vector Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. from typing import TYPE_CHECKING, List, Tuple
  17. from canonicaljson import json
  18. from synapse.api.errors import HttpResponseException
  19. from synapse.events import EventBase
  20. from synapse.federation.persistence import TransactionActions
  21. from synapse.federation.units import Edu, Transaction
  22. from synapse.logging.opentracing import (
  23. extract_text_map,
  24. set_tag,
  25. start_active_span_follows_from,
  26. tags,
  27. whitelisted_homeserver,
  28. )
  29. from synapse.util.metrics import measure_func
  30. if TYPE_CHECKING:
  31. import synapse.server
  32. logger = logging.getLogger(__name__)
  33. class TransactionManager(object):
  34. """Helper class which handles building and sending transactions
  35. shared between PerDestinationQueue objects
  36. """
  37. def __init__(self, hs: "synapse.server.HomeServer"):
  38. self._server_name = hs.hostname
  39. self.clock = hs.get_clock() # nb must be called this for @measure_func
  40. self._store = hs.get_datastore()
  41. self._transaction_actions = TransactionActions(self._store)
  42. self._transport_layer = hs.get_federation_transport_client()
  43. # HACK to get unique tx id
  44. self._next_txn_id = int(self.clock.time_msec())
  45. @measure_func("_send_new_transaction")
  46. async def send_new_transaction(
  47. self,
  48. destination: str,
  49. pending_pdus: List[Tuple[EventBase, int]],
  50. pending_edus: List[Edu],
  51. ):
  52. # Make a transaction-sending opentracing span. This span follows on from
  53. # all the edus in that transaction. This needs to be done since there is
  54. # no active span here, so if the edus were not received by the remote the
  55. # span would have no causality and it would be forgotten.
  56. span_contexts = []
  57. keep_destination = whitelisted_homeserver(destination)
  58. for edu in pending_edus:
  59. context = edu.get_context()
  60. if context:
  61. span_contexts.append(extract_text_map(json.loads(context)))
  62. if keep_destination:
  63. edu.strip_context()
  64. with start_active_span_follows_from("send_transaction", span_contexts):
  65. # Sort based on the order field
  66. pending_pdus.sort(key=lambda t: t[1])
  67. pdus = [x[0] for x in pending_pdus]
  68. edus = pending_edus
  69. success = True
  70. logger.debug("TX [%s] _attempt_new_transaction", destination)
  71. txn_id = str(self._next_txn_id)
  72. logger.debug(
  73. "TX [%s] {%s} Attempting new transaction (pdus: %d, edus: %d)",
  74. destination,
  75. txn_id,
  76. len(pdus),
  77. len(edus),
  78. )
  79. transaction = Transaction.create_new(
  80. origin_server_ts=int(self.clock.time_msec()),
  81. transaction_id=txn_id,
  82. origin=self._server_name,
  83. destination=destination,
  84. pdus=pdus,
  85. edus=edus,
  86. )
  87. self._next_txn_id += 1
  88. logger.info(
  89. "TX [%s] {%s} Sending transaction [%s], (PDUs: %d, EDUs: %d)",
  90. destination,
  91. txn_id,
  92. transaction.transaction_id,
  93. len(pdus),
  94. len(edus),
  95. )
  96. # Actually send the transaction
  97. # FIXME (erikj): This is a bit of a hack to make the Pdu age
  98. # keys work
  99. def json_data_cb():
  100. data = transaction.get_dict()
  101. now = int(self.clock.time_msec())
  102. if "pdus" in data:
  103. for p in data["pdus"]:
  104. if "age_ts" in p:
  105. unsigned = p.setdefault("unsigned", {})
  106. unsigned["age"] = now - int(p["age_ts"])
  107. del p["age_ts"]
  108. return data
  109. try:
  110. response = await self._transport_layer.send_transaction(
  111. transaction, json_data_cb
  112. )
  113. code = 200
  114. except HttpResponseException as e:
  115. code = e.code
  116. response = e.response
  117. if e.code in (401, 404, 429) or 500 <= e.code:
  118. logger.info(
  119. "TX [%s] {%s} got %d response", destination, txn_id, code
  120. )
  121. raise e
  122. logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
  123. if code == 200:
  124. for e_id, r in response.get("pdus", {}).items():
  125. if "error" in r:
  126. logger.warning(
  127. "TX [%s] {%s} Remote returned error for %s: %s",
  128. destination,
  129. txn_id,
  130. e_id,
  131. r,
  132. )
  133. else:
  134. for p in pdus:
  135. logger.warning(
  136. "TX [%s] {%s} Failed to send event %s",
  137. destination,
  138. txn_id,
  139. p.event_id,
  140. )
  141. success = False
  142. set_tag(tags.ERROR, not success)
  143. return success