| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- import os
- import time
- from threading import Thread, Lock
- import sentry_sdk
- from sentry_sdk.utils import logger
- from typing import TYPE_CHECKING
- if TYPE_CHECKING:
- from typing import Optional
# Upper bound for Monitor._downsample_factor: the factor stops growing once it
# reaches this value, however many consecutive health checks fail.
MAX_DOWNSAMPLE_FACTOR = 10


class Monitor:
    """
    Performs health checks in a separate thread once every interval seconds
    and updates the internal state. Other parts of the SDK only read this state
    and act accordingly.

    The worker thread is created lazily, the first time the state is read via
    is_healthy() or downsample_factor, and is re-created when the current
    process id differs from the one the thread was started in.
    """

    name = "sentry.monitor"

    def __init__(self, transport, interval=10):
        # type: (sentry_sdk.transport.Transport, float) -> None
        """
        Set up the initial (healthy, not downsampled) state.

        No thread is started here; see _ensure_running().

        :param transport: object whose is_healthy() result drives the state.
        :param interval: seconds the worker sleeps between two health checks.
        """
        self.transport = transport  # type: sentry_sdk.transport.Transport
        self.interval = interval  # type: float

        self._healthy = True
        self._downsample_factor = 0  # type: int

        self._thread = None  # type: Optional[Thread]
        self._thread_lock = Lock()
        # pid the worker was started in; compared against os.getpid() to
        # notice that we are running in a different process.
        self._thread_for_pid = None  # type: Optional[int]
        # Cleared by kill() and by a failed thread start; never set again.
        self._running = True

    def _ensure_running(self):
        # type: () -> None
        """
        Check that the monitor has an active thread to run in, or create one if not.

        Note that this might fail (e.g. in Python 3.12 it's not possible to
        spawn new threads at interpreter shutdown). In that case self._running
        will be False after running this function.
        """
        if not self._running:
            # Killed, or an earlier spawn attempt failed. The worker loop is
            # guarded by self._running, so a new thread would exit right away;
            # skip the lock and the pointless thread creation.
            return None

        if self._thread_for_pid == os.getpid() and self._thread is not None:
            return None

        with self._thread_lock:
            # Re-check under the lock: another caller may have started the
            # worker while we were waiting.
            if self._thread_for_pid == os.getpid() and self._thread is not None:
                return None

            def _worker():
                # type: (...) -> None
                while self._running:
                    time.sleep(self.interval)
                    # kill() may have been called while we were sleeping.
                    if self._running:
                        self.run()

            thread = Thread(name=self.name, target=_worker)
            thread.daemon = True
            try:
                thread.start()
            except RuntimeError:
                # Unfortunately at this point the interpreter is in a state that no
                # longer allows us to spawn a thread and we have to bail.
                self._running = False
                return None

            self._thread = thread
            self._thread_for_pid = os.getpid()

        return None

    def run(self):
        # type: () -> None
        """Run one monitoring cycle: refresh health, then adjust downsampling."""
        self.check_health()
        self.set_downsample_factor()

    def set_downsample_factor(self):
        # type: () -> None
        """
        Update the downsample factor from the last health check result.

        A healthy result resets the factor to 0; an unhealthy one raises it by
        one, up to MAX_DOWNSAMPLE_FACTOR.
        """
        if self._healthy:
            if self._downsample_factor > 0:
                logger.debug(
                    "[Monitor] health check positive, reverting to normal sampling"
                )
            self._downsample_factor = 0
        else:
            # Read the private attribute, not the public property: the property
            # calls _ensure_running(), and updating internal state must not
            # start the worker thread as a side effect.
            if self._downsample_factor < MAX_DOWNSAMPLE_FACTOR:
                self._downsample_factor += 1
            logger.debug(
                "[Monitor] health check negative, downsampling with a factor of %d",
                self._downsample_factor,
            )

    def check_health(self):
        # type: () -> None
        """
        Perform the actual health checks,
        currently only stores the result of the transport's is_healthy().
        TODO: augment in the future with more checks.
        """
        self._healthy = self.transport.is_healthy()

    def is_healthy(self):
        # type: () -> bool
        """Return the last recorded health state, starting the worker if needed."""
        self._ensure_running()
        return self._healthy

    @property
    def downsample_factor(self):
        # type: () -> int
        """Return the current downsample factor, starting the worker if needed."""
        self._ensure_running()
        return self._downsample_factor

    def kill(self):
        # type: () -> None
        """
        Ask the worker to stop.

        Only clears the running flag; the worker notices it after its current
        sleep and the thread is not joined here.
        """
        self._running = False
|