dramatiq.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. import json
  2. import sentry_sdk
  3. from sentry_sdk.consts import OP, SPANSTATUS
  4. from sentry_sdk.api import continue_trace, get_baggage, get_traceparent
  5. from sentry_sdk.integrations import Integration, DidNotEnable
  6. from sentry_sdk.integrations._wsgi_common import request_body_within_bounds
  7. from sentry_sdk.tracing import (
  8. BAGGAGE_HEADER_NAME,
  9. SENTRY_TRACE_HEADER_NAME,
  10. TransactionSource,
  11. )
  12. from sentry_sdk.utils import (
  13. AnnotatedValue,
  14. capture_internal_exceptions,
  15. event_from_exception,
  16. )
  17. from typing import TypeVar
  18. R = TypeVar("R")
  19. try:
  20. from dramatiq.broker import Broker
  21. from dramatiq.middleware import Middleware, default_middleware
  22. from dramatiq.errors import Retry
  23. from dramatiq.message import Message
  24. except ImportError:
  25. raise DidNotEnable("Dramatiq is not installed")
  26. from typing import TYPE_CHECKING
  27. if TYPE_CHECKING:
  28. from typing import Any, Callable, Dict, Optional, Union
  29. from sentry_sdk._types import Event, Hint
  30. class DramatiqIntegration(Integration):
  31. """
  32. Dramatiq integration for Sentry
  33. Please make sure that you call `sentry_sdk.init` *before* initializing
  34. your broker, as it monkey patches `Broker.__init__`.
  35. This integration was originally developed and maintained
  36. by https://github.com/jacobsvante and later donated to the Sentry
  37. project.
  38. """
  39. identifier = "dramatiq"
  40. origin = f"auto.queue.{identifier}"
  41. @staticmethod
  42. def setup_once():
  43. # type: () -> None
  44. _patch_dramatiq_broker()
  45. def _patch_dramatiq_broker():
  46. # type: () -> None
  47. original_broker__init__ = Broker.__init__
  48. def sentry_patched_broker__init__(self, *args, **kw):
  49. # type: (Broker, *Any, **Any) -> None
  50. integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
  51. try:
  52. middleware = kw.pop("middleware")
  53. except KeyError:
  54. # Unfortunately Broker and StubBroker allows middleware to be
  55. # passed in as positional arguments, whilst RabbitmqBroker and
  56. # RedisBroker does not.
  57. if len(args) == 1:
  58. middleware = args[0]
  59. args = [] # type: ignore
  60. else:
  61. middleware = None
  62. if middleware is None:
  63. middleware = list(m() for m in default_middleware)
  64. else:
  65. middleware = list(middleware)
  66. if integration is not None:
  67. middleware = [m for m in middleware if not isinstance(m, SentryMiddleware)]
  68. middleware.insert(0, SentryMiddleware())
  69. kw["middleware"] = middleware
  70. original_broker__init__(self, *args, **kw)
  71. Broker.__init__ = sentry_patched_broker__init__
  72. class SentryMiddleware(Middleware): # type: ignore[misc]
  73. """
  74. A Dramatiq middleware that automatically captures and sends
  75. exceptions to Sentry.
  76. This is automatically added to every instantiated broker via the
  77. DramatiqIntegration.
  78. """
  79. SENTRY_HEADERS_NAME = "_sentry_headers"
  80. def before_enqueue(self, broker, message, delay):
  81. # type: (Broker, Message[R], int) -> None
  82. integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
  83. if integration is None:
  84. return
  85. message.options[self.SENTRY_HEADERS_NAME] = {
  86. BAGGAGE_HEADER_NAME: get_baggage(),
  87. SENTRY_TRACE_HEADER_NAME: get_traceparent(),
  88. }
  89. def before_process_message(self, broker, message):
  90. # type: (Broker, Message[R]) -> None
  91. integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
  92. if integration is None:
  93. return
  94. message._scope_manager = sentry_sdk.isolation_scope()
  95. scope = message._scope_manager.__enter__()
  96. scope.clear_breadcrumbs()
  97. scope.set_extra("dramatiq_message_id", message.message_id)
  98. scope.add_event_processor(_make_message_event_processor(message, integration))
  99. sentry_headers = message.options.get(self.SENTRY_HEADERS_NAME) or {}
  100. if "retries" in message.options:
  101. # start new trace in case of retrying
  102. sentry_headers = {}
  103. transaction = continue_trace(
  104. sentry_headers,
  105. name=message.actor_name,
  106. op=OP.QUEUE_TASK_DRAMATIQ,
  107. source=TransactionSource.TASK,
  108. origin=DramatiqIntegration.origin,
  109. )
  110. transaction.set_status(SPANSTATUS.OK)
  111. sentry_sdk.start_transaction(
  112. transaction,
  113. name=message.actor_name,
  114. op=OP.QUEUE_TASK_DRAMATIQ,
  115. source=TransactionSource.TASK,
  116. )
  117. transaction.__enter__()
  118. def after_process_message(self, broker, message, *, result=None, exception=None):
  119. # type: (Broker, Message[R], Optional[Any], Optional[Exception]) -> None
  120. integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
  121. if integration is None:
  122. return
  123. actor = broker.get_actor(message.actor_name)
  124. throws = message.options.get("throws") or actor.options.get("throws")
  125. scope_manager = message._scope_manager
  126. transaction = sentry_sdk.get_current_scope().transaction
  127. if not transaction:
  128. return None
  129. is_event_capture_required = (
  130. exception is not None
  131. and not (throws and isinstance(exception, throws))
  132. and not isinstance(exception, Retry)
  133. )
  134. if not is_event_capture_required:
  135. # normal transaction finish
  136. transaction.__exit__(None, None, None)
  137. scope_manager.__exit__(None, None, None)
  138. return
  139. event, hint = event_from_exception(
  140. exception, # type: ignore[arg-type]
  141. client_options=sentry_sdk.get_client().options,
  142. mechanism={
  143. "type": DramatiqIntegration.identifier,
  144. "handled": False,
  145. },
  146. )
  147. sentry_sdk.capture_event(event, hint=hint)
  148. # transaction error
  149. transaction.__exit__(type(exception), exception, None)
  150. scope_manager.__exit__(type(exception), exception, None)
  151. def _make_message_event_processor(message, integration):
  152. # type: (Message[R], DramatiqIntegration) -> Callable[[Event, Hint], Optional[Event]]
  153. def inner(event, hint):
  154. # type: (Event, Hint) -> Optional[Event]
  155. with capture_internal_exceptions():
  156. DramatiqMessageExtractor(message).extract_into_event(event)
  157. return event
  158. return inner
  159. class DramatiqMessageExtractor:
  160. def __init__(self, message):
  161. # type: (Message[R]) -> None
  162. self.message_data = dict(message.asdict())
  163. def content_length(self):
  164. # type: () -> int
  165. return len(json.dumps(self.message_data))
  166. def extract_into_event(self, event):
  167. # type: (Event) -> None
  168. client = sentry_sdk.get_client()
  169. if not client.is_active():
  170. return
  171. contexts = event.setdefault("contexts", {})
  172. request_info = contexts.setdefault("dramatiq", {})
  173. request_info["type"] = "dramatiq"
  174. data = None # type: Optional[Union[AnnotatedValue, Dict[str, Any]]]
  175. if not request_body_within_bounds(client, self.content_length()):
  176. data = AnnotatedValue.removed_because_over_size_limit()
  177. else:
  178. data = self.message_data
  179. request_info["data"] = data