dramatiq.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. import json
  2. import sentry_sdk
  3. from sentry_sdk.consts import OP, SPANSTATUS
  4. from sentry_sdk.api import continue_trace, get_baggage, get_traceparent
  5. from sentry_sdk.integrations import Integration, DidNotEnable
  6. from sentry_sdk.integrations._wsgi_common import request_body_within_bounds
  7. from sentry_sdk.tracing import (
  8. BAGGAGE_HEADER_NAME,
  9. SENTRY_TRACE_HEADER_NAME,
  10. TransactionSource,
  11. )
  12. from sentry_sdk.utils import (
  13. AnnotatedValue,
  14. capture_internal_exceptions,
  15. event_from_exception,
  16. )
  17. from typing import TypeVar
  18. R = TypeVar("R")
  19. try:
  20. from dramatiq.broker import Broker
  21. from dramatiq.middleware import Middleware, default_middleware
  22. from dramatiq.errors import Retry
  23. from dramatiq.message import Message
  24. except ImportError:
  25. raise DidNotEnable("Dramatiq is not installed")
  26. from typing import TYPE_CHECKING
  27. if TYPE_CHECKING:
  28. from typing import Any, Callable, Dict, Optional, Union
  29. from sentry_sdk._types import Event, Hint
  30. class DramatiqIntegration(Integration):
  31. """
  32. Dramatiq integration for Sentry
  33. Please make sure that you call `sentry_sdk.init` *before* initializing
  34. your broker, as it monkey patches `Broker.__init__`.
  35. This integration was originally developed and maintained
  36. by https://github.com/jacobsvante and later donated to the Sentry
  37. project.
  38. """
  39. identifier = "dramatiq"
  40. origin = f"auto.queue.{identifier}"
  41. @staticmethod
  42. def setup_once() -> None:
  43. _patch_dramatiq_broker()
  44. def _patch_dramatiq_broker() -> None:
  45. original_broker__init__ = Broker.__init__
  46. def sentry_patched_broker__init__(
  47. self: "Broker", *args: "Any", **kw: "Any"
  48. ) -> None:
  49. integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
  50. try:
  51. middleware = kw.pop("middleware")
  52. except KeyError:
  53. # Unfortunately Broker and StubBroker allows middleware to be
  54. # passed in as positional arguments, whilst RabbitmqBroker and
  55. # RedisBroker does not.
  56. if len(args) == 1:
  57. middleware = args[0]
  58. args = [] # type: ignore
  59. else:
  60. middleware = None
  61. if middleware is None:
  62. middleware = list(m() for m in default_middleware)
  63. else:
  64. middleware = list(middleware)
  65. if integration is not None:
  66. middleware = [m for m in middleware if not isinstance(m, SentryMiddleware)]
  67. middleware.insert(0, SentryMiddleware())
  68. kw["middleware"] = middleware
  69. original_broker__init__(self, *args, **kw)
  70. Broker.__init__ = sentry_patched_broker__init__
  71. class SentryMiddleware(Middleware): # type: ignore[misc]
  72. """
  73. A Dramatiq middleware that automatically captures and sends
  74. exceptions to Sentry.
  75. This is automatically added to every instantiated broker via the
  76. DramatiqIntegration.
  77. """
  78. SENTRY_HEADERS_NAME = "_sentry_headers"
  79. def before_enqueue(
  80. self, broker: "Broker", message: "Message[R]", delay: int
  81. ) -> None:
  82. integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
  83. if integration is None:
  84. return
  85. message.options[self.SENTRY_HEADERS_NAME] = {
  86. BAGGAGE_HEADER_NAME: get_baggage(),
  87. SENTRY_TRACE_HEADER_NAME: get_traceparent(),
  88. }
  89. def before_process_message(self, broker: "Broker", message: "Message[R]") -> None:
  90. integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
  91. if integration is None:
  92. return
  93. message._scope_manager = sentry_sdk.isolation_scope()
  94. scope = message._scope_manager.__enter__()
  95. scope.clear_breadcrumbs()
  96. scope.set_extra("dramatiq_message_id", message.message_id)
  97. scope.add_event_processor(_make_message_event_processor(message, integration))
  98. sentry_headers = message.options.get(self.SENTRY_HEADERS_NAME) or {}
  99. if "retries" in message.options:
  100. # start new trace in case of retrying
  101. sentry_headers = {}
  102. transaction = continue_trace(
  103. sentry_headers,
  104. name=message.actor_name,
  105. op=OP.QUEUE_TASK_DRAMATIQ,
  106. source=TransactionSource.TASK,
  107. origin=DramatiqIntegration.origin,
  108. )
  109. transaction.set_status(SPANSTATUS.OK)
  110. sentry_sdk.start_transaction(
  111. transaction,
  112. name=message.actor_name,
  113. op=OP.QUEUE_TASK_DRAMATIQ,
  114. source=TransactionSource.TASK,
  115. )
  116. transaction.__enter__()
  117. def after_process_message(
  118. self,
  119. broker: "Broker",
  120. message: "Message[R]",
  121. *,
  122. result: "Optional[Any]" = None,
  123. exception: "Optional[Exception]" = None,
  124. ) -> None:
  125. integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
  126. if integration is None:
  127. return
  128. actor = broker.get_actor(message.actor_name)
  129. throws = message.options.get("throws") or actor.options.get("throws")
  130. scope_manager = message._scope_manager
  131. transaction = sentry_sdk.get_current_scope().transaction
  132. if not transaction:
  133. return None
  134. is_event_capture_required = (
  135. exception is not None
  136. and not (throws and isinstance(exception, throws))
  137. and not isinstance(exception, Retry)
  138. )
  139. if not is_event_capture_required:
  140. # normal transaction finish
  141. transaction.__exit__(None, None, None)
  142. scope_manager.__exit__(None, None, None)
  143. return
  144. event, hint = event_from_exception(
  145. exception, # type: ignore[arg-type]
  146. client_options=sentry_sdk.get_client().options,
  147. mechanism={
  148. "type": DramatiqIntegration.identifier,
  149. "handled": False,
  150. },
  151. )
  152. sentry_sdk.capture_event(event, hint=hint)
  153. # transaction error
  154. transaction.__exit__(type(exception), exception, None)
  155. scope_manager.__exit__(type(exception), exception, None)
  156. after_skip_message = after_process_message
  157. def _make_message_event_processor(
  158. message: "Message[R]", integration: "DramatiqIntegration"
  159. ) -> "Callable[[Event, Hint], Optional[Event]]":
  160. def inner(event: "Event", hint: "Hint") -> "Optional[Event]":
  161. with capture_internal_exceptions():
  162. DramatiqMessageExtractor(message).extract_into_event(event)
  163. return event
  164. return inner
  165. class DramatiqMessageExtractor:
  166. def __init__(self, message: "Message[R]") -> None:
  167. self.message_data = dict(message.asdict())
  168. def content_length(self) -> int:
  169. return len(json.dumps(self.message_data))
  170. def extract_into_event(self, event: "Event") -> None:
  171. client = sentry_sdk.get_client()
  172. if not client.is_active():
  173. return
  174. contexts = event.setdefault("contexts", {})
  175. request_info = contexts.setdefault("dramatiq", {})
  176. request_info["type"] = "dramatiq"
  177. data: "Optional[Union[AnnotatedValue, Dict[str, Any]]]" = None
  178. if not request_body_within_bounds(client, self.content_length()):
  179. data = AnnotatedValue.removed_because_over_size_limit()
  180. else:
  181. data = self.message_data
  182. request_info["data"] = data