arq.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. import sys
  2. import sentry_sdk
  3. from sentry_sdk.consts import OP, SPANSTATUS
  4. from sentry_sdk.integrations import _check_minimum_version, DidNotEnable, Integration
  5. from sentry_sdk.integrations.logging import ignore_logger
  6. from sentry_sdk.scope import should_send_default_pii
  7. from sentry_sdk.tracing import Transaction, TransactionSource
  8. from sentry_sdk.utils import (
  9. capture_internal_exceptions,
  10. ensure_integration_enabled,
  11. event_from_exception,
  12. SENSITIVE_DATA_SUBSTITUTE,
  13. parse_version,
  14. reraise,
  15. )
  16. try:
  17. import arq.worker
  18. from arq.version import VERSION as ARQ_VERSION
  19. from arq.connections import ArqRedis
  20. from arq.worker import JobExecutionFailed, Retry, RetryJob, Worker
  21. except ImportError:
  22. raise DidNotEnable("Arq is not installed")
  23. from typing import TYPE_CHECKING
  24. if TYPE_CHECKING:
  25. from typing import Any, Dict, Optional, Union
  26. from sentry_sdk._types import EventProcessor, Event, ExcInfo, Hint
  27. from arq.cron import CronJob
  28. from arq.jobs import Job
  29. from arq.typing import WorkerCoroutine
  30. from arq.worker import Function
  31. ARQ_CONTROL_FLOW_EXCEPTIONS = (JobExecutionFailed, Retry, RetryJob)
  32. class ArqIntegration(Integration):
  33. identifier = "arq"
  34. origin = f"auto.queue.{identifier}"
  35. @staticmethod
  36. def setup_once():
  37. # type: () -> None
  38. try:
  39. if isinstance(ARQ_VERSION, str):
  40. version = parse_version(ARQ_VERSION)
  41. else:
  42. version = ARQ_VERSION.version[:2]
  43. except (TypeError, ValueError):
  44. version = None
  45. _check_minimum_version(ArqIntegration, version)
  46. patch_enqueue_job()
  47. patch_run_job()
  48. patch_create_worker()
  49. ignore_logger("arq.worker")
  50. def patch_enqueue_job():
  51. # type: () -> None
  52. old_enqueue_job = ArqRedis.enqueue_job
  53. original_kwdefaults = old_enqueue_job.__kwdefaults__
  54. async def _sentry_enqueue_job(self, function, *args, **kwargs):
  55. # type: (ArqRedis, str, *Any, **Any) -> Optional[Job]
  56. integration = sentry_sdk.get_client().get_integration(ArqIntegration)
  57. if integration is None:
  58. return await old_enqueue_job(self, function, *args, **kwargs)
  59. with sentry_sdk.start_span(
  60. op=OP.QUEUE_SUBMIT_ARQ, name=function, origin=ArqIntegration.origin
  61. ):
  62. return await old_enqueue_job(self, function, *args, **kwargs)
  63. _sentry_enqueue_job.__kwdefaults__ = original_kwdefaults
  64. ArqRedis.enqueue_job = _sentry_enqueue_job
  65. def patch_run_job():
  66. # type: () -> None
  67. old_run_job = Worker.run_job
  68. async def _sentry_run_job(self, job_id, score):
  69. # type: (Worker, str, int) -> None
  70. integration = sentry_sdk.get_client().get_integration(ArqIntegration)
  71. if integration is None:
  72. return await old_run_job(self, job_id, score)
  73. with sentry_sdk.isolation_scope() as scope:
  74. scope._name = "arq"
  75. scope.clear_breadcrumbs()
  76. transaction = Transaction(
  77. name="unknown arq task",
  78. status="ok",
  79. op=OP.QUEUE_TASK_ARQ,
  80. source=TransactionSource.TASK,
  81. origin=ArqIntegration.origin,
  82. )
  83. with sentry_sdk.start_transaction(transaction):
  84. return await old_run_job(self, job_id, score)
  85. Worker.run_job = _sentry_run_job
  86. def _capture_exception(exc_info):
  87. # type: (ExcInfo) -> None
  88. scope = sentry_sdk.get_current_scope()
  89. if scope.transaction is not None:
  90. if exc_info[0] in ARQ_CONTROL_FLOW_EXCEPTIONS:
  91. scope.transaction.set_status(SPANSTATUS.ABORTED)
  92. return
  93. scope.transaction.set_status(SPANSTATUS.INTERNAL_ERROR)
  94. event, hint = event_from_exception(
  95. exc_info,
  96. client_options=sentry_sdk.get_client().options,
  97. mechanism={"type": ArqIntegration.identifier, "handled": False},
  98. )
  99. sentry_sdk.capture_event(event, hint=hint)
  100. def _make_event_processor(ctx, *args, **kwargs):
  101. # type: (Dict[Any, Any], *Any, **Any) -> EventProcessor
  102. def event_processor(event, hint):
  103. # type: (Event, Hint) -> Optional[Event]
  104. with capture_internal_exceptions():
  105. scope = sentry_sdk.get_current_scope()
  106. if scope.transaction is not None:
  107. scope.transaction.name = ctx["job_name"]
  108. event["transaction"] = ctx["job_name"]
  109. tags = event.setdefault("tags", {})
  110. tags["arq_task_id"] = ctx["job_id"]
  111. tags["arq_task_retry"] = ctx["job_try"] > 1
  112. extra = event.setdefault("extra", {})
  113. extra["arq-job"] = {
  114. "task": ctx["job_name"],
  115. "args": (
  116. args if should_send_default_pii() else SENSITIVE_DATA_SUBSTITUTE
  117. ),
  118. "kwargs": (
  119. kwargs if should_send_default_pii() else SENSITIVE_DATA_SUBSTITUTE
  120. ),
  121. "retry": ctx["job_try"],
  122. }
  123. return event
  124. return event_processor
  125. def _wrap_coroutine(name, coroutine):
  126. # type: (str, WorkerCoroutine) -> WorkerCoroutine
  127. async def _sentry_coroutine(ctx, *args, **kwargs):
  128. # type: (Dict[Any, Any], *Any, **Any) -> Any
  129. integration = sentry_sdk.get_client().get_integration(ArqIntegration)
  130. if integration is None:
  131. return await coroutine(ctx, *args, **kwargs)
  132. sentry_sdk.get_isolation_scope().add_event_processor(
  133. _make_event_processor({**ctx, "job_name": name}, *args, **kwargs)
  134. )
  135. try:
  136. result = await coroutine(ctx, *args, **kwargs)
  137. except Exception:
  138. exc_info = sys.exc_info()
  139. _capture_exception(exc_info)
  140. reraise(*exc_info)
  141. return result
  142. return _sentry_coroutine
  143. def patch_create_worker():
  144. # type: () -> None
  145. old_create_worker = arq.worker.create_worker
  146. @ensure_integration_enabled(ArqIntegration, old_create_worker)
  147. def _sentry_create_worker(*args, **kwargs):
  148. # type: (*Any, **Any) -> Worker
  149. settings_cls = args[0]
  150. if isinstance(settings_cls, dict):
  151. if "functions" in settings_cls:
  152. settings_cls["functions"] = [
  153. _get_arq_function(func)
  154. for func in settings_cls.get("functions", [])
  155. ]
  156. if "cron_jobs" in settings_cls:
  157. settings_cls["cron_jobs"] = [
  158. _get_arq_cron_job(cron_job)
  159. for cron_job in settings_cls.get("cron_jobs", [])
  160. ]
  161. if hasattr(settings_cls, "functions"):
  162. settings_cls.functions = [
  163. _get_arq_function(func) for func in settings_cls.functions
  164. ]
  165. if hasattr(settings_cls, "cron_jobs"):
  166. settings_cls.cron_jobs = [
  167. _get_arq_cron_job(cron_job)
  168. for cron_job in (settings_cls.cron_jobs or [])
  169. ]
  170. if "functions" in kwargs:
  171. kwargs["functions"] = [
  172. _get_arq_function(func) for func in kwargs.get("functions", [])
  173. ]
  174. if "cron_jobs" in kwargs:
  175. kwargs["cron_jobs"] = [
  176. _get_arq_cron_job(cron_job) for cron_job in kwargs.get("cron_jobs", [])
  177. ]
  178. return old_create_worker(*args, **kwargs)
  179. arq.worker.create_worker = _sentry_create_worker
  180. def _get_arq_function(func):
  181. # type: (Union[str, Function, WorkerCoroutine]) -> Function
  182. arq_func = arq.worker.func(func)
  183. arq_func.coroutine = _wrap_coroutine(arq_func.name, arq_func.coroutine)
  184. return arq_func
  185. def _get_arq_cron_job(cron_job):
  186. # type: (CronJob) -> CronJob
  187. cron_job.coroutine = _wrap_coroutine(cron_job.name, cron_job.coroutine)
  188. return cron_job