beat.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. import sentry_sdk
  2. from sentry_sdk.crons import capture_checkin, MonitorStatus
  3. from sentry_sdk.integrations import DidNotEnable
  4. from sentry_sdk.integrations.celery.utils import (
  5. _get_humanized_interval,
  6. _now_seconds_since_epoch,
  7. )
  8. from sentry_sdk.utils import (
  9. logger,
  10. match_regex_list,
  11. )
  12. from typing import TYPE_CHECKING
  13. if TYPE_CHECKING:
  14. from collections.abc import Callable
  15. from typing import Any, Optional, TypeVar, Union
  16. from sentry_sdk._types import (
  17. MonitorConfig,
  18. MonitorConfigScheduleType,
  19. MonitorConfigScheduleUnit,
  20. )
  21. F = TypeVar("F", bound=Callable[..., Any])
  22. try:
  23. from celery import Task, Celery # type: ignore
  24. from celery.beat import Scheduler # type: ignore
  25. from celery.schedules import crontab, schedule # type: ignore
  26. from celery.signals import ( # type: ignore
  27. task_failure,
  28. task_success,
  29. task_retry,
  30. )
  31. except ImportError:
  32. raise DidNotEnable("Celery not installed")
  33. try:
  34. from redbeat.schedulers import RedBeatScheduler # type: ignore
  35. except ImportError:
  36. RedBeatScheduler = None
  37. def _get_headers(task):
  38. # type: (Task) -> dict[str, Any]
  39. headers = task.request.get("headers") or {}
  40. # flatten nested headers
  41. if "headers" in headers:
  42. headers.update(headers["headers"])
  43. del headers["headers"]
  44. headers.update(task.request.get("properties") or {})
  45. return headers
  46. def _get_monitor_config(celery_schedule, app, monitor_name):
  47. # type: (Any, Celery, str) -> MonitorConfig
  48. monitor_config = {} # type: MonitorConfig
  49. schedule_type = None # type: Optional[MonitorConfigScheduleType]
  50. schedule_value = None # type: Optional[Union[str, int]]
  51. schedule_unit = None # type: Optional[MonitorConfigScheduleUnit]
  52. if isinstance(celery_schedule, crontab):
  53. schedule_type = "crontab"
  54. schedule_value = (
  55. "{0._orig_minute} "
  56. "{0._orig_hour} "
  57. "{0._orig_day_of_month} "
  58. "{0._orig_month_of_year} "
  59. "{0._orig_day_of_week}".format(celery_schedule)
  60. )
  61. elif isinstance(celery_schedule, schedule):
  62. schedule_type = "interval"
  63. (schedule_value, schedule_unit) = _get_humanized_interval(
  64. celery_schedule.seconds
  65. )
  66. if schedule_unit == "second":
  67. logger.warning(
  68. "Intervals shorter than one minute are not supported by Sentry Crons. Monitor '%s' has an interval of %s seconds. Use the `exclude_beat_tasks` option in the celery integration to exclude it.",
  69. monitor_name,
  70. schedule_value,
  71. )
  72. return {}
  73. else:
  74. logger.warning(
  75. "Celery schedule type '%s' not supported by Sentry Crons.",
  76. type(celery_schedule),
  77. )
  78. return {}
  79. monitor_config["schedule"] = {}
  80. monitor_config["schedule"]["type"] = schedule_type
  81. monitor_config["schedule"]["value"] = schedule_value
  82. if schedule_unit is not None:
  83. monitor_config["schedule"]["unit"] = schedule_unit
  84. monitor_config["timezone"] = (
  85. (
  86. hasattr(celery_schedule, "tz")
  87. and celery_schedule.tz is not None
  88. and str(celery_schedule.tz)
  89. )
  90. or app.timezone
  91. or "UTC"
  92. )
  93. return monitor_config
  94. def _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration):
  95. # type: (Any, Any, sentry_sdk.integrations.celery.CeleryIntegration) -> None
  96. """
  97. Add Sentry Crons information to the schedule_entry headers.
  98. """
  99. if not integration.monitor_beat_tasks:
  100. return
  101. monitor_name = schedule_entry.name
  102. task_should_be_excluded = match_regex_list(
  103. monitor_name, integration.exclude_beat_tasks
  104. )
  105. if task_should_be_excluded:
  106. return
  107. celery_schedule = schedule_entry.schedule
  108. app = scheduler.app
  109. monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)
  110. is_supported_schedule = bool(monitor_config)
  111. if not is_supported_schedule:
  112. return
  113. headers = schedule_entry.options.pop("headers", {})
  114. headers.update(
  115. {
  116. "sentry-monitor-slug": monitor_name,
  117. "sentry-monitor-config": monitor_config,
  118. }
  119. )
  120. check_in_id = capture_checkin(
  121. monitor_slug=monitor_name,
  122. monitor_config=monitor_config,
  123. status=MonitorStatus.IN_PROGRESS,
  124. )
  125. headers.update({"sentry-monitor-check-in-id": check_in_id})
  126. # Set the Sentry configuration in the options of the ScheduleEntry.
  127. # Those will be picked up in `apply_async` and added to the headers.
  128. schedule_entry.options["headers"] = headers
  129. def _wrap_beat_scheduler(original_function):
  130. # type: (Callable[..., Any]) -> Callable[..., Any]
  131. """
  132. Makes sure that:
  133. - a new Sentry trace is started for each task started by Celery Beat and
  134. it is propagated to the task.
  135. - the Sentry Crons information is set in the Celery Beat task's
  136. headers so that is is monitored with Sentry Crons.
  137. After the patched function is called,
  138. Celery Beat will call apply_async to put the task in the queue.
  139. """
  140. # Patch only once
  141. # Can't use __name__ here, because some of our tests mock original_apply_entry
  142. already_patched = "sentry_patched_scheduler" in str(original_function)
  143. if already_patched:
  144. return original_function
  145. from sentry_sdk.integrations.celery import CeleryIntegration
  146. def sentry_patched_scheduler(*args, **kwargs):
  147. # type: (*Any, **Any) -> None
  148. integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
  149. if integration is None:
  150. return original_function(*args, **kwargs)
  151. # Tasks started by Celery Beat start a new Trace
  152. scope = sentry_sdk.get_isolation_scope()
  153. scope.set_new_propagation_context()
  154. scope._name = "celery-beat"
  155. scheduler, schedule_entry = args
  156. _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration)
  157. return original_function(*args, **kwargs)
  158. return sentry_patched_scheduler
  159. def _patch_beat_apply_entry():
  160. # type: () -> None
  161. Scheduler.apply_entry = _wrap_beat_scheduler(Scheduler.apply_entry)
  162. def _patch_redbeat_apply_async():
  163. # type: () -> None
  164. if RedBeatScheduler is None:
  165. return
  166. RedBeatScheduler.apply_async = _wrap_beat_scheduler(RedBeatScheduler.apply_async)
  167. def _setup_celery_beat_signals(monitor_beat_tasks):
  168. # type: (bool) -> None
  169. if monitor_beat_tasks:
  170. task_success.connect(crons_task_success)
  171. task_failure.connect(crons_task_failure)
  172. task_retry.connect(crons_task_retry)
  173. def crons_task_success(sender, **kwargs):
  174. # type: (Task, dict[Any, Any]) -> None
  175. logger.debug("celery_task_success %s", sender)
  176. headers = _get_headers(sender)
  177. if "sentry-monitor-slug" not in headers:
  178. return
  179. monitor_config = headers.get("sentry-monitor-config", {})
  180. start_timestamp_s = headers.get("sentry-monitor-start-timestamp-s")
  181. capture_checkin(
  182. monitor_slug=headers["sentry-monitor-slug"],
  183. monitor_config=monitor_config,
  184. check_in_id=headers["sentry-monitor-check-in-id"],
  185. duration=(
  186. _now_seconds_since_epoch() - float(start_timestamp_s)
  187. if start_timestamp_s
  188. else None
  189. ),
  190. status=MonitorStatus.OK,
  191. )
  192. def crons_task_failure(sender, **kwargs):
  193. # type: (Task, dict[Any, Any]) -> None
  194. logger.debug("celery_task_failure %s", sender)
  195. headers = _get_headers(sender)
  196. if "sentry-monitor-slug" not in headers:
  197. return
  198. monitor_config = headers.get("sentry-monitor-config", {})
  199. start_timestamp_s = headers.get("sentry-monitor-start-timestamp-s")
  200. capture_checkin(
  201. monitor_slug=headers["sentry-monitor-slug"],
  202. monitor_config=monitor_config,
  203. check_in_id=headers["sentry-monitor-check-in-id"],
  204. duration=(
  205. _now_seconds_since_epoch() - float(start_timestamp_s)
  206. if start_timestamp_s
  207. else None
  208. ),
  209. status=MonitorStatus.ERROR,
  210. )
  211. def crons_task_retry(sender, **kwargs):
  212. # type: (Task, dict[Any, Any]) -> None
  213. logger.debug("celery_task_retry %s", sender)
  214. headers = _get_headers(sender)
  215. if "sentry-monitor-slug" not in headers:
  216. return
  217. monitor_config = headers.get("sentry-monitor-config", {})
  218. start_timestamp_s = headers.get("sentry-monitor-start-timestamp-s")
  219. capture_checkin(
  220. monitor_slug=headers["sentry-monitor-slug"],
  221. monitor_config=monitor_config,
  222. check_in_id=headers["sentry-monitor-check-in-id"],
  223. duration=(
  224. _now_seconds_since_epoch() - float(start_timestamp_s)
  225. if start_timestamp_s
  226. else None
  227. ),
  228. status=MonitorStatus.ERROR,
  229. )