| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- import sys
- import warnings
- from functools import wraps
- from threading import Thread, current_thread
- from concurrent.futures import ThreadPoolExecutor, Future
- import sentry_sdk
- from sentry_sdk.integrations import Integration
- from sentry_sdk.scope import use_isolation_scope, use_scope
- from sentry_sdk.utils import (
- event_from_exception,
- capture_internal_exceptions,
- logger,
- reraise,
- )
- from typing import TYPE_CHECKING
- if TYPE_CHECKING:
- from typing import Any
- from typing import TypeVar
- from typing import Callable
- from typing import Optional
- from sentry_sdk._types import ExcInfo
- F = TypeVar("F", bound=Callable[..., Any])
- T = TypeVar("T", bound=Any)
- class ThreadingIntegration(Integration):
- identifier = "threading"
- def __init__(self, propagate_hub=None, propagate_scope=True):
- # type: (Optional[bool], bool) -> None
- if propagate_hub is not None:
- logger.warning(
- "Deprecated: propagate_hub is deprecated. This will be removed in the future."
- )
- # Note: propagate_hub did not have any effect on propagation of scope data
- # scope data was always propagated no matter what the value of propagate_hub was
- # This is why the default for propagate_scope is True
- self.propagate_scope = propagate_scope
- if propagate_hub is not None:
- self.propagate_scope = propagate_hub
- @staticmethod
- def setup_once():
- # type: () -> None
- old_start = Thread.start
- try:
- from django import VERSION as django_version # noqa: N811
- import channels # type: ignore[import-untyped]
- channels_version = channels.__version__
- except ImportError:
- django_version = None
- channels_version = None
- is_async_emulated_with_threads = (
- sys.version_info < (3, 9)
- and channels_version is not None
- and channels_version < "4.0.0"
- and django_version is not None
- and django_version >= (3, 0)
- and django_version < (4, 0)
- )
- @wraps(old_start)
- def sentry_start(self, *a, **kw):
- # type: (Thread, *Any, **Any) -> Any
- integration = sentry_sdk.get_client().get_integration(ThreadingIntegration)
- if integration is None:
- return old_start(self, *a, **kw)
- if integration.propagate_scope:
- if is_async_emulated_with_threads:
- warnings.warn(
- "There is a known issue with Django channels 2.x and 3.x when using Python 3.8 or older. "
- "(Async support is emulated using threads and some Sentry data may be leaked between those threads.) "
- "Please either upgrade to Django channels 4.0+, use Django's async features "
- "available in Django 3.1+ instead of Django channels, or upgrade to Python 3.9+.",
- stacklevel=2,
- )
- isolation_scope = sentry_sdk.get_isolation_scope()
- current_scope = sentry_sdk.get_current_scope()
- else:
- isolation_scope = sentry_sdk.get_isolation_scope().fork()
- current_scope = sentry_sdk.get_current_scope().fork()
- else:
- isolation_scope = None
- current_scope = None
- # Patching instance methods in `start()` creates a reference cycle if
- # done in a naive way. See
- # https://github.com/getsentry/sentry-python/pull/434
- #
- # In threading module, using current_thread API will access current thread instance
- # without holding it to avoid a reference cycle in an easier way.
- with capture_internal_exceptions():
- new_run = _wrap_run(
- isolation_scope,
- current_scope,
- getattr(self.run, "__func__", self.run),
- )
- self.run = new_run # type: ignore
- return old_start(self, *a, **kw)
- Thread.start = sentry_start # type: ignore
- ThreadPoolExecutor.submit = _wrap_threadpool_executor_submit( # type: ignore
- ThreadPoolExecutor.submit, is_async_emulated_with_threads
- )
- def _wrap_run(isolation_scope_to_use, current_scope_to_use, old_run_func):
- # type: (Optional[sentry_sdk.Scope], Optional[sentry_sdk.Scope], F) -> F
- @wraps(old_run_func)
- def run(*a, **kw):
- # type: (*Any, **Any) -> Any
- def _run_old_run_func():
- # type: () -> Any
- try:
- self = current_thread()
- return old_run_func(self, *a[1:], **kw)
- except Exception:
- reraise(*_capture_exception())
- if isolation_scope_to_use is not None and current_scope_to_use is not None:
- with use_isolation_scope(isolation_scope_to_use):
- with use_scope(current_scope_to_use):
- return _run_old_run_func()
- else:
- return _run_old_run_func()
- return run # type: ignore
- def _wrap_threadpool_executor_submit(func, is_async_emulated_with_threads):
- # type: (Callable[..., Future[T]], bool) -> Callable[..., Future[T]]
- """
- Wrap submit call to propagate scopes on task submission.
- """
- @wraps(func)
- def sentry_submit(self, fn, *args, **kwargs):
- # type: (ThreadPoolExecutor, Callable[..., T], *Any, **Any) -> Future[T]
- integration = sentry_sdk.get_client().get_integration(ThreadingIntegration)
- if integration is None:
- return func(self, fn, *args, **kwargs)
- if integration.propagate_scope and is_async_emulated_with_threads:
- isolation_scope = sentry_sdk.get_isolation_scope()
- current_scope = sentry_sdk.get_current_scope()
- elif integration.propagate_scope:
- isolation_scope = sentry_sdk.get_isolation_scope().fork()
- current_scope = sentry_sdk.get_current_scope().fork()
- else:
- isolation_scope = None
- current_scope = None
- def wrapped_fn(*args, **kwargs):
- # type: (*Any, **Any) -> Any
- if isolation_scope is not None and current_scope is not None:
- with use_isolation_scope(isolation_scope):
- with use_scope(current_scope):
- return fn(*args, **kwargs)
- return fn(*args, **kwargs)
- return func(self, wrapped_fn, *args, **kwargs)
- return sentry_submit
- def _capture_exception():
- # type: () -> ExcInfo
- exc_info = sys.exc_info()
- client = sentry_sdk.get_client()
- if client.get_integration(ThreadingIntegration) is not None:
- event, hint = event_from_exception(
- exc_info,
- client_options=client.options,
- mechanism={"type": "threading", "handled": False},
- )
- sentry_sdk.capture_event(event, hint=hint)
- return exc_info
|