| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293 |
- import sentry_sdk
- from sentry_sdk.crons import capture_checkin, MonitorStatus
- from sentry_sdk.integrations import DidNotEnable
- from sentry_sdk.integrations.celery.utils import (
- _get_humanized_interval,
- _now_seconds_since_epoch,
- )
- from sentry_sdk.utils import (
- logger,
- match_regex_list,
- )
- from typing import TYPE_CHECKING
- if TYPE_CHECKING:
- from collections.abc import Callable
- from typing import Any, Optional, TypeVar, Union
- from sentry_sdk._types import (
- MonitorConfig,
- MonitorConfigScheduleType,
- MonitorConfigScheduleUnit,
- )
- F = TypeVar("F", bound=Callable[..., Any])
- try:
- from celery import Task, Celery # type: ignore
- from celery.beat import Scheduler # type: ignore
- from celery.schedules import crontab, schedule # type: ignore
- from celery.signals import ( # type: ignore
- task_failure,
- task_success,
- task_retry,
- )
- except ImportError:
- raise DidNotEnable("Celery not installed")
- try:
- from redbeat.schedulers import RedBeatScheduler # type: ignore
- except ImportError:
- RedBeatScheduler = None
def _get_headers(task):
    # type: (Task) -> dict[str, Any]
    """
    Return a flat dict of the headers and properties of the task's request.

    The result is built from ``task.request.get("headers")``; if that mapping
    itself contains a nested ``"headers"`` mapping, the nested entries are
    merged into the top level and the ``"headers"`` key is dropped. Entries
    from ``task.request.get("properties")`` are merged last and therefore win
    on key collisions.

    The returned dict is a new object: the mappings stored on the request are
    not modified.
    """
    # Work on a shallow copy so that flattening and merging the properties
    # does not rewrite the headers stored on the request itself.
    headers = dict(task.request.get("headers") or {})

    # flatten nested headers (tolerating an empty/None nested value)
    nested_headers = headers.get("headers")
    if nested_headers:
        headers.update(nested_headers)
    # Drop the key after merging, so a "headers" key coming from the nested
    # mapping is removed as well.
    headers.pop("headers", None)

    headers.update(task.request.get("properties") or {})

    return headers
def _get_monitor_config(celery_schedule, app, monitor_name):
    # type: (Any, Celery, str) -> MonitorConfig
    """
    Build the Sentry Crons monitor config for a Celery beat schedule.

    :param celery_schedule: The schedule of the beat entry. ``crontab`` and
        ``schedule`` (interval) instances are supported.
    :param app: The Celery app; its ``timezone`` is used when the schedule
        does not carry a timezone of its own.
    :param monitor_name: Name of the monitor, only used in log messages.
    :returns: A dict with a ``"schedule"`` entry (``type``, ``value`` and,
        for intervals, ``unit``) and a ``"timezone"`` string. An empty dict
        is returned, after logging a warning, when the schedule type is not
        supported or the interval is expressed in seconds.
    """
    monitor_config = {}  # type: MonitorConfig
    schedule_type = None  # type: Optional[MonitorConfigScheduleType]
    schedule_value = None  # type: Optional[Union[str, int]]
    schedule_unit = None  # type: Optional[MonitorConfigScheduleUnit]

    if isinstance(celery_schedule, crontab):
        schedule_type = "crontab"
        # Rebuild the five-field crontab expression from the values the
        # crontab object was originally created with.
        schedule_value = (
            "{0._orig_minute} "
            "{0._orig_hour} "
            "{0._orig_day_of_month} "
            "{0._orig_month_of_year} "
            "{0._orig_day_of_week}".format(celery_schedule)
        )
    elif isinstance(celery_schedule, schedule):
        schedule_type = "interval"
        (schedule_value, schedule_unit) = _get_humanized_interval(
            celery_schedule.seconds
        )

        if schedule_unit == "second":
            logger.warning(
                "Intervals shorter than one minute are not supported by Sentry Crons. Monitor '%s' has an interval of %s seconds. Use the `exclude_beat_tasks` option in the celery integration to exclude it.",
                monitor_name,
                schedule_value,
            )
            return {}
    else:
        logger.warning(
            "Celery schedule type '%s' not supported by Sentry Crons.",
            type(celery_schedule),
        )
        return {}

    monitor_config["schedule"] = {}
    monitor_config["schedule"]["type"] = schedule_type
    monitor_config["schedule"]["value"] = schedule_value

    if schedule_unit is not None:
        monitor_config["schedule"]["unit"] = schedule_unit

    # Timezone precedence: the schedule's own tz, then the app's timezone,
    # then "UTC". The value is always stored as a string; ``app.timezone`` is
    # stringified as well because it may be a tzinfo object rather than a
    # timezone name (NOTE(review): confirm against the Celery version in use).
    schedule_tz = getattr(celery_schedule, "tz", None)
    timezone_name = str(schedule_tz) if schedule_tz is not None else ""
    if not timezone_name:
        timezone_name = str(app.timezone or "UTC")
    monitor_config["timezone"] = timezone_name

    return monitor_config
def _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration):
    # type: (Any, Any, sentry_sdk.integrations.celery.CeleryIntegration) -> None
    """
    Attach Sentry Crons headers to a beat schedule entry and open a check-in.

    Nothing happens when beat monitoring is disabled on the integration, when
    the entry name matches ``integration.exclude_beat_tasks``, or when no
    monitor config can be derived from the entry's schedule. Otherwise an
    ``IN_PROGRESS`` check-in is captured and the monitor slug, monitor config
    and check-in id are stored in ``schedule_entry.options["headers"]``.
    """
    if not integration.monitor_beat_tasks:
        return

    slug = schedule_entry.name

    if match_regex_list(slug, integration.exclude_beat_tasks):
        return

    config = _get_monitor_config(schedule_entry.schedule, scheduler.app, slug)

    # An empty config means the schedule is not supported by Sentry Crons.
    if not config:
        return

    entry_headers = schedule_entry.options.pop("headers", {})
    entry_headers["sentry-monitor-slug"] = slug
    entry_headers["sentry-monitor-config"] = config

    entry_headers["sentry-monitor-check-in-id"] = capture_checkin(
        monitor_slug=slug,
        monitor_config=config,
        status=MonitorStatus.IN_PROGRESS,
    )

    # Store the Sentry configuration in the options of the ScheduleEntry.
    # It is picked up in `apply_async` and added to the headers.
    schedule_entry.options["headers"] = entry_headers
def _wrap_beat_scheduler(original_function):
    # type: (Callable[..., Any]) -> Callable[..., Any]
    """
    Wrap a Celery Beat scheduler method. The wrapper makes sure that:
    - a new Sentry trace is started for each task started by Celery Beat and
      it is propagated to the task.
    - the Sentry Crons information is set in the Celery Beat task's
      headers so that it is monitored with Sentry Crons.

    After the patched function is called,
    Celery Beat will call apply_async to put the task in the queue.

    :param original_function: The scheduler method to wrap. It is expected to
        be called with the scheduler and the schedule entry as its first two
        positional arguments.
    :returns: The wrapper, or ``original_function`` itself when it has
        already been wrapped.
    """
    # Patch only once
    # Can't use __name__ here, because some of our tests mock original_apply_entry
    # NOTE: this check looks for the wrapper's name in the function's string
    # representation, so the wrapper below must not be decorated with
    # functools.wraps (that would replace the name with the original one).
    already_patched = "sentry_patched_scheduler" in str(original_function)
    if already_patched:
        return original_function

    # Imported lazily, only once wrapping is actually needed.
    from sentry_sdk.integrations.celery import CeleryIntegration

    def sentry_patched_scheduler(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
        if integration is None:
            return original_function(*args, **kwargs)

        # Tasks started by Celery Beat start a new Trace
        scope = sentry_sdk.get_isolation_scope()
        scope.set_new_propagation_context()
        scope._name = "celery-beat"

        # Only the first two positional arguments are needed here. Extra
        # positional arguments are passed through untouched instead of
        # breaking the unpacking; if the entry was not passed positionally,
        # the Crons data is skipped and the original function still runs.
        if len(args) >= 2:
            scheduler, schedule_entry = args[:2]
            _apply_crons_data_to_schedule_entry(
                scheduler, schedule_entry, integration
            )

        return original_function(*args, **kwargs)

    return sentry_patched_scheduler
def _patch_beat_apply_entry():
    # type: () -> None
    """
    Replace ``Scheduler.apply_entry`` with its Sentry-wrapped version.
    """
    original_apply_entry = Scheduler.apply_entry
    Scheduler.apply_entry = _wrap_beat_scheduler(original_apply_entry)
def _patch_redbeat_apply_async():
    # type: () -> None
    """
    Replace ``RedBeatScheduler.apply_async`` with its Sentry-wrapped version.

    Does nothing when ``RedBeatScheduler`` is ``None``.
    """
    if RedBeatScheduler is not None:
        original_apply_async = RedBeatScheduler.apply_async
        RedBeatScheduler.apply_async = _wrap_beat_scheduler(original_apply_async)
def _setup_celery_beat_signals(monitor_beat_tasks):
    # type: (bool) -> None
    """
    Connect the Crons handlers to Celery's task success, failure and retry
    signals. Nothing is connected when ``monitor_beat_tasks`` is falsy.
    """
    if not monitor_beat_tasks:
        return

    task_success.connect(crons_task_success)
    task_failure.connect(crons_task_failure)
    task_retry.connect(crons_task_retry)
def crons_task_success(sender, **kwargs):
    # type: (Task, dict[Any, Any]) -> None
    """
    Signal handler: capture an ``OK`` check-in for a monitored task.

    Tasks whose headers carry no ``sentry-monitor-slug`` are ignored. The
    duration is only reported when a start timestamp header is present.
    """
    logger.debug("celery_task_success %s", sender)

    task_headers = _get_headers(sender)

    if "sentry-monitor-slug" not in task_headers:
        return

    config = task_headers.get("sentry-monitor-config", {})
    started_at_s = task_headers.get("sentry-monitor-start-timestamp-s")

    slug = task_headers["sentry-monitor-slug"]
    check_in_id = task_headers["sentry-monitor-check-in-id"]

    if started_at_s:
        elapsed_s = _now_seconds_since_epoch() - float(started_at_s)
    else:
        elapsed_s = None

    capture_checkin(
        monitor_slug=slug,
        monitor_config=config,
        check_in_id=check_in_id,
        duration=elapsed_s,
        status=MonitorStatus.OK,
    )
def crons_task_failure(sender, **kwargs):
    # type: (Task, dict[Any, Any]) -> None
    """
    Signal handler: capture an ``ERROR`` check-in for a monitored task.

    Tasks whose headers carry no ``sentry-monitor-slug`` are ignored. The
    duration is only reported when a start timestamp header is present.
    """
    logger.debug("celery_task_failure %s", sender)

    task_headers = _get_headers(sender)

    if "sentry-monitor-slug" not in task_headers:
        return

    config = task_headers.get("sentry-monitor-config", {})
    started_at_s = task_headers.get("sentry-monitor-start-timestamp-s")

    slug = task_headers["sentry-monitor-slug"]
    check_in_id = task_headers["sentry-monitor-check-in-id"]

    if started_at_s:
        elapsed_s = _now_seconds_since_epoch() - float(started_at_s)
    else:
        elapsed_s = None

    capture_checkin(
        monitor_slug=slug,
        monitor_config=config,
        check_in_id=check_in_id,
        duration=elapsed_s,
        status=MonitorStatus.ERROR,
    )
def crons_task_retry(sender, **kwargs):
    # type: (Task, dict[Any, Any]) -> None
    """
    Signal handler: capture an ``ERROR`` check-in when a monitored task is
    retried.

    Tasks whose headers carry no ``sentry-monitor-slug`` are ignored. The
    duration is only reported when a start timestamp header is present.
    """
    logger.debug("celery_task_retry %s", sender)

    task_headers = _get_headers(sender)

    if "sentry-monitor-slug" not in task_headers:
        return

    config = task_headers.get("sentry-monitor-config", {})
    started_at_s = task_headers.get("sentry-monitor-start-timestamp-s")

    slug = task_headers["sentry-monitor-slug"]
    check_in_id = task_headers["sentry-monitor-check-in-id"]

    if started_at_s:
        elapsed_s = _now_seconds_since_epoch() - float(started_at_s)
    else:
        elapsed_s = None

    capture_checkin(
        monitor_slug=slug,
        monitor_config=config,
        check_in_id=check_in_id,
        duration=elapsed_s,
        status=MonitorStatus.ERROR,
    )
|