ray.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. import inspect
  2. import sys
  3. import sentry_sdk
  4. from sentry_sdk.consts import OP, SPANSTATUS
  5. from sentry_sdk.integrations import _check_minimum_version, DidNotEnable, Integration
  6. from sentry_sdk.tracing import TransactionSource
  7. from sentry_sdk.utils import (
  8. event_from_exception,
  9. logger,
  10. package_version,
  11. qualname_from_function,
  12. reraise,
  13. )
  14. try:
  15. import ray # type: ignore[import-not-found]
  16. except ImportError:
  17. raise DidNotEnable("Ray not installed.")
  18. import functools
  19. from typing import TYPE_CHECKING
  20. if TYPE_CHECKING:
  21. from collections.abc import Callable
  22. from typing import Any, Optional
  23. from sentry_sdk.utils import ExcInfo
  24. def _check_sentry_initialized():
  25. # type: () -> None
  26. if sentry_sdk.get_client().is_active():
  27. return
  28. logger.debug(
  29. "[Tracing] Sentry not initialized in ray cluster worker, performance data will be discarded."
  30. )
  31. def _patch_ray_remote():
  32. # type: () -> None
  33. old_remote = ray.remote
  34. @functools.wraps(old_remote)
  35. def new_remote(f=None, *args, **kwargs):
  36. # type: (Optional[Callable[..., Any]], *Any, **Any) -> Callable[..., Any]
  37. if inspect.isclass(f):
  38. # Ray Actors
  39. # (https://docs.ray.io/en/latest/ray-core/actors.html)
  40. # are not supported
  41. # (Only Ray Tasks are supported)
  42. return old_remote(f, *args, **kwargs)
  43. def wrapper(user_f):
  44. # type: (Callable[..., Any]) -> Any
  45. def new_func(*f_args, _tracing=None, **f_kwargs):
  46. # type: (Any, Optional[dict[str, Any]], Any) -> Any
  47. _check_sentry_initialized()
  48. transaction = sentry_sdk.continue_trace(
  49. _tracing or {},
  50. op=OP.QUEUE_TASK_RAY,
  51. name=qualname_from_function(user_f),
  52. origin=RayIntegration.origin,
  53. source=TransactionSource.TASK,
  54. )
  55. with sentry_sdk.start_transaction(transaction) as transaction:
  56. try:
  57. result = user_f(*f_args, **f_kwargs)
  58. transaction.set_status(SPANSTATUS.OK)
  59. except Exception:
  60. transaction.set_status(SPANSTATUS.INTERNAL_ERROR)
  61. exc_info = sys.exc_info()
  62. _capture_exception(exc_info)
  63. reraise(*exc_info)
  64. return result
  65. if f:
  66. rv = old_remote(new_func)
  67. else:
  68. rv = old_remote(*args, **kwargs)(new_func)
  69. old_remote_method = rv.remote
  70. def _remote_method_with_header_propagation(*args, **kwargs):
  71. # type: (*Any, **Any) -> Any
  72. """
  73. Ray Client
  74. """
  75. with sentry_sdk.start_span(
  76. op=OP.QUEUE_SUBMIT_RAY,
  77. name=qualname_from_function(user_f),
  78. origin=RayIntegration.origin,
  79. ) as span:
  80. tracing = {
  81. k: v
  82. for k, v in sentry_sdk.get_current_scope().iter_trace_propagation_headers()
  83. }
  84. try:
  85. result = old_remote_method(*args, **kwargs, _tracing=tracing)
  86. span.set_status(SPANSTATUS.OK)
  87. except Exception:
  88. span.set_status(SPANSTATUS.INTERNAL_ERROR)
  89. exc_info = sys.exc_info()
  90. _capture_exception(exc_info)
  91. reraise(*exc_info)
  92. return result
  93. rv.remote = _remote_method_with_header_propagation
  94. return rv
  95. if f is not None:
  96. return wrapper(f)
  97. else:
  98. return wrapper
  99. ray.remote = new_remote
  100. def _capture_exception(exc_info, **kwargs):
  101. # type: (ExcInfo, **Any) -> None
  102. client = sentry_sdk.get_client()
  103. event, hint = event_from_exception(
  104. exc_info,
  105. client_options=client.options,
  106. mechanism={
  107. "handled": False,
  108. "type": RayIntegration.identifier,
  109. },
  110. )
  111. sentry_sdk.capture_event(event, hint=hint)
  112. class RayIntegration(Integration):
  113. identifier = "ray"
  114. origin = f"auto.queue.{identifier}"
  115. @staticmethod
  116. def setup_once():
  117. # type: () -> None
  118. version = package_version("ray")
  119. _check_minimum_version(RayIntegration, version)
  120. _patch_ray_remote()