| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226 |
- import json
- import sentry_sdk
- from sentry_sdk.consts import OP, SPANSTATUS
- from sentry_sdk.api import continue_trace, get_baggage, get_traceparent
- from sentry_sdk.integrations import Integration, DidNotEnable
- from sentry_sdk.integrations._wsgi_common import request_body_within_bounds
- from sentry_sdk.tracing import (
- BAGGAGE_HEADER_NAME,
- SENTRY_TRACE_HEADER_NAME,
- TransactionSource,
- )
- from sentry_sdk.utils import (
- AnnotatedValue,
- capture_internal_exceptions,
- event_from_exception,
- )
- from typing import TypeVar
- R = TypeVar("R")
- try:
- from dramatiq.broker import Broker
- from dramatiq.middleware import Middleware, default_middleware
- from dramatiq.errors import Retry
- from dramatiq.message import Message
- except ImportError:
- raise DidNotEnable("Dramatiq is not installed")
- from typing import TYPE_CHECKING
- if TYPE_CHECKING:
- from typing import Any, Callable, Dict, Optional, Union
- from sentry_sdk._types import Event, Hint
- class DramatiqIntegration(Integration):
- """
- Dramatiq integration for Sentry
- Please make sure that you call `sentry_sdk.init` *before* initializing
- your broker, as it monkey patches `Broker.__init__`.
- This integration was originally developed and maintained
- by https://github.com/jacobsvante and later donated to the Sentry
- project.
- """
- identifier = "dramatiq"
- origin = f"auto.queue.{identifier}"
- @staticmethod
- def setup_once():
- # type: () -> None
- _patch_dramatiq_broker()
- def _patch_dramatiq_broker():
- # type: () -> None
- original_broker__init__ = Broker.__init__
- def sentry_patched_broker__init__(self, *args, **kw):
- # type: (Broker, *Any, **Any) -> None
- integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
- try:
- middleware = kw.pop("middleware")
- except KeyError:
- # Unfortunately Broker and StubBroker allows middleware to be
- # passed in as positional arguments, whilst RabbitmqBroker and
- # RedisBroker does not.
- if len(args) == 1:
- middleware = args[0]
- args = [] # type: ignore
- else:
- middleware = None
- if middleware is None:
- middleware = list(m() for m in default_middleware)
- else:
- middleware = list(middleware)
- if integration is not None:
- middleware = [m for m in middleware if not isinstance(m, SentryMiddleware)]
- middleware.insert(0, SentryMiddleware())
- kw["middleware"] = middleware
- original_broker__init__(self, *args, **kw)
- Broker.__init__ = sentry_patched_broker__init__
- class SentryMiddleware(Middleware): # type: ignore[misc]
- """
- A Dramatiq middleware that automatically captures and sends
- exceptions to Sentry.
- This is automatically added to every instantiated broker via the
- DramatiqIntegration.
- """
- SENTRY_HEADERS_NAME = "_sentry_headers"
- def before_enqueue(self, broker, message, delay):
- # type: (Broker, Message[R], int) -> None
- integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
- if integration is None:
- return
- message.options[self.SENTRY_HEADERS_NAME] = {
- BAGGAGE_HEADER_NAME: get_baggage(),
- SENTRY_TRACE_HEADER_NAME: get_traceparent(),
- }
- def before_process_message(self, broker, message):
- # type: (Broker, Message[R]) -> None
- integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
- if integration is None:
- return
- message._scope_manager = sentry_sdk.isolation_scope()
- scope = message._scope_manager.__enter__()
- scope.clear_breadcrumbs()
- scope.set_extra("dramatiq_message_id", message.message_id)
- scope.add_event_processor(_make_message_event_processor(message, integration))
- sentry_headers = message.options.get(self.SENTRY_HEADERS_NAME) or {}
- if "retries" in message.options:
- # start new trace in case of retrying
- sentry_headers = {}
- transaction = continue_trace(
- sentry_headers,
- name=message.actor_name,
- op=OP.QUEUE_TASK_DRAMATIQ,
- source=TransactionSource.TASK,
- origin=DramatiqIntegration.origin,
- )
- transaction.set_status(SPANSTATUS.OK)
- sentry_sdk.start_transaction(
- transaction,
- name=message.actor_name,
- op=OP.QUEUE_TASK_DRAMATIQ,
- source=TransactionSource.TASK,
- )
- transaction.__enter__()
- def after_process_message(self, broker, message, *, result=None, exception=None):
- # type: (Broker, Message[R], Optional[Any], Optional[Exception]) -> None
- integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
- if integration is None:
- return
- actor = broker.get_actor(message.actor_name)
- throws = message.options.get("throws") or actor.options.get("throws")
- scope_manager = message._scope_manager
- transaction = sentry_sdk.get_current_scope().transaction
- if not transaction:
- return None
- is_event_capture_required = (
- exception is not None
- and not (throws and isinstance(exception, throws))
- and not isinstance(exception, Retry)
- )
- if not is_event_capture_required:
- # normal transaction finish
- transaction.__exit__(None, None, None)
- scope_manager.__exit__(None, None, None)
- return
- event, hint = event_from_exception(
- exception, # type: ignore[arg-type]
- client_options=sentry_sdk.get_client().options,
- mechanism={
- "type": DramatiqIntegration.identifier,
- "handled": False,
- },
- )
- sentry_sdk.capture_event(event, hint=hint)
- # transaction error
- transaction.__exit__(type(exception), exception, None)
- scope_manager.__exit__(type(exception), exception, None)
- def _make_message_event_processor(message, integration):
- # type: (Message[R], DramatiqIntegration) -> Callable[[Event, Hint], Optional[Event]]
- def inner(event, hint):
- # type: (Event, Hint) -> Optional[Event]
- with capture_internal_exceptions():
- DramatiqMessageExtractor(message).extract_into_event(event)
- return event
- return inner
- class DramatiqMessageExtractor:
- def __init__(self, message):
- # type: (Message[R]) -> None
- self.message_data = dict(message.asdict())
- def content_length(self):
- # type: () -> int
- return len(json.dumps(self.message_data))
- def extract_into_event(self, event):
- # type: (Event) -> None
- client = sentry_sdk.get_client()
- if not client.is_active():
- return
- contexts = event.setdefault("contexts", {})
- request_info = contexts.setdefault("dramatiq", {})
- request_info["type"] = "dramatiq"
- data = None # type: Optional[Union[AnnotatedValue, Dict[str, Any]]]
- if not request_body_within_bounds(client, self.content_length()):
- data = AnnotatedValue.removed_because_over_size_limit()
- else:
- data = self.message_data
- request_info["data"] = data
|