threading.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. import sys
  2. import warnings
  3. from functools import wraps
  4. from threading import Thread, current_thread
  5. from concurrent.futures import ThreadPoolExecutor, Future
  6. import sentry_sdk
  7. from sentry_sdk.integrations import Integration
  8. from sentry_sdk.scope import use_isolation_scope, use_scope
  9. from sentry_sdk.utils import (
  10. event_from_exception,
  11. capture_internal_exceptions,
  12. logger,
  13. reraise,
  14. )
  15. from typing import TYPE_CHECKING
  16. if TYPE_CHECKING:
  17. from typing import Any
  18. from typing import TypeVar
  19. from typing import Callable
  20. from typing import Optional
  21. from sentry_sdk._types import ExcInfo
  22. F = TypeVar("F", bound=Callable[..., Any])
  23. T = TypeVar("T", bound=Any)
  24. class ThreadingIntegration(Integration):
  25. identifier = "threading"
  26. def __init__(self, propagate_hub=None, propagate_scope=True):
  27. # type: (Optional[bool], bool) -> None
  28. if propagate_hub is not None:
  29. logger.warning(
  30. "Deprecated: propagate_hub is deprecated. This will be removed in the future."
  31. )
  32. # Note: propagate_hub did not have any effect on propagation of scope data
  33. # scope data was always propagated no matter what the value of propagate_hub was
  34. # This is why the default for propagate_scope is True
  35. self.propagate_scope = propagate_scope
  36. if propagate_hub is not None:
  37. self.propagate_scope = propagate_hub
  38. @staticmethod
  39. def setup_once():
  40. # type: () -> None
  41. old_start = Thread.start
  42. try:
  43. from django import VERSION as django_version # noqa: N811
  44. import channels # type: ignore[import-untyped]
  45. channels_version = channels.__version__
  46. except ImportError:
  47. django_version = None
  48. channels_version = None
  49. is_async_emulated_with_threads = (
  50. sys.version_info < (3, 9)
  51. and channels_version is not None
  52. and channels_version < "4.0.0"
  53. and django_version is not None
  54. and django_version >= (3, 0)
  55. and django_version < (4, 0)
  56. )
  57. @wraps(old_start)
  58. def sentry_start(self, *a, **kw):
  59. # type: (Thread, *Any, **Any) -> Any
  60. integration = sentry_sdk.get_client().get_integration(ThreadingIntegration)
  61. if integration is None:
  62. return old_start(self, *a, **kw)
  63. if integration.propagate_scope:
  64. if is_async_emulated_with_threads:
  65. warnings.warn(
  66. "There is a known issue with Django channels 2.x and 3.x when using Python 3.8 or older. "
  67. "(Async support is emulated using threads and some Sentry data may be leaked between those threads.) "
  68. "Please either upgrade to Django channels 4.0+, use Django's async features "
  69. "available in Django 3.1+ instead of Django channels, or upgrade to Python 3.9+.",
  70. stacklevel=2,
  71. )
  72. isolation_scope = sentry_sdk.get_isolation_scope()
  73. current_scope = sentry_sdk.get_current_scope()
  74. else:
  75. isolation_scope = sentry_sdk.get_isolation_scope().fork()
  76. current_scope = sentry_sdk.get_current_scope().fork()
  77. else:
  78. isolation_scope = None
  79. current_scope = None
  80. # Patching instance methods in `start()` creates a reference cycle if
  81. # done in a naive way. See
  82. # https://github.com/getsentry/sentry-python/pull/434
  83. #
  84. # In threading module, using current_thread API will access current thread instance
  85. # without holding it to avoid a reference cycle in an easier way.
  86. with capture_internal_exceptions():
  87. new_run = _wrap_run(
  88. isolation_scope,
  89. current_scope,
  90. getattr(self.run, "__func__", self.run),
  91. )
  92. self.run = new_run # type: ignore
  93. return old_start(self, *a, **kw)
  94. Thread.start = sentry_start # type: ignore
  95. ThreadPoolExecutor.submit = _wrap_threadpool_executor_submit( # type: ignore
  96. ThreadPoolExecutor.submit, is_async_emulated_with_threads
  97. )
  98. def _wrap_run(isolation_scope_to_use, current_scope_to_use, old_run_func):
  99. # type: (Optional[sentry_sdk.Scope], Optional[sentry_sdk.Scope], F) -> F
  100. @wraps(old_run_func)
  101. def run(*a, **kw):
  102. # type: (*Any, **Any) -> Any
  103. def _run_old_run_func():
  104. # type: () -> Any
  105. try:
  106. self = current_thread()
  107. return old_run_func(self, *a[1:], **kw)
  108. except Exception:
  109. reraise(*_capture_exception())
  110. if isolation_scope_to_use is not None and current_scope_to_use is not None:
  111. with use_isolation_scope(isolation_scope_to_use):
  112. with use_scope(current_scope_to_use):
  113. return _run_old_run_func()
  114. else:
  115. return _run_old_run_func()
  116. return run # type: ignore
  117. def _wrap_threadpool_executor_submit(func, is_async_emulated_with_threads):
  118. # type: (Callable[..., Future[T]], bool) -> Callable[..., Future[T]]
  119. """
  120. Wrap submit call to propagate scopes on task submission.
  121. """
  122. @wraps(func)
  123. def sentry_submit(self, fn, *args, **kwargs):
  124. # type: (ThreadPoolExecutor, Callable[..., T], *Any, **Any) -> Future[T]
  125. integration = sentry_sdk.get_client().get_integration(ThreadingIntegration)
  126. if integration is None:
  127. return func(self, fn, *args, **kwargs)
  128. if integration.propagate_scope and is_async_emulated_with_threads:
  129. isolation_scope = sentry_sdk.get_isolation_scope()
  130. current_scope = sentry_sdk.get_current_scope()
  131. elif integration.propagate_scope:
  132. isolation_scope = sentry_sdk.get_isolation_scope().fork()
  133. current_scope = sentry_sdk.get_current_scope().fork()
  134. else:
  135. isolation_scope = None
  136. current_scope = None
  137. def wrapped_fn(*args, **kwargs):
  138. # type: (*Any, **Any) -> Any
  139. if isolation_scope is not None and current_scope is not None:
  140. with use_isolation_scope(isolation_scope):
  141. with use_scope(current_scope):
  142. return fn(*args, **kwargs)
  143. return fn(*args, **kwargs)
  144. return func(self, wrapped_fn, *args, **kwargs)
  145. return sentry_submit
  146. def _capture_exception():
  147. # type: () -> ExcInfo
  148. exc_info = sys.exc_info()
  149. client = sentry_sdk.get_client()
  150. if client.get_integration(ThreadingIntegration) is not None:
  151. event, hint = event_from_exception(
  152. exc_info,
  153. client_options=client.options,
  154. mechanism={"type": "threading", "handled": False},
  155. )
  156. sentry_sdk.capture_event(event, hint=hint)
  157. return exc_info