| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839 |
- """
- This file is originally based on code from https://github.com/nylas/nylas-perftools,
- which is published under the following license:
- The MIT License (MIT)
- Copyright (c) 2014 Nylas
- Permission is hereby granted, free of charge, to any person obtaining a copy
- of this software and associated documentation files (the "Software"), to deal
- in the Software without restriction, including without limitation the rights
- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- copies of the Software, and to permit persons to whom the Software is
- furnished to do so, subject to the following conditions:
- The above copyright notice and this permission notice shall be included in all
- copies or substantial portions of the Software.
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- SOFTWARE.
- """
- import atexit
- import os
- import platform
- import random
- import sys
- import threading
- import time
- import uuid
- import warnings
- from abc import ABC, abstractmethod
- from collections import deque
- import sentry_sdk
- from sentry_sdk._lru_cache import LRUCache
- from sentry_sdk.profiler.utils import (
- DEFAULT_SAMPLING_FREQUENCY,
- extract_stack,
- )
- from sentry_sdk.utils import (
- capture_internal_exception,
- capture_internal_exceptions,
- get_current_thread_meta,
- is_gevent,
- is_valid_sample_rate,
- logger,
- nanosecond_time,
- set_in_app_in_frames,
- )
- from typing import TYPE_CHECKING
- if TYPE_CHECKING:
- from typing import Any
- from typing import Callable
- from typing import Deque
- from typing import Dict
- from typing import List
- from typing import Optional
- from typing import Set
- from typing import Type
- from typing_extensions import TypedDict
- from sentry_sdk.profiler.utils import (
- ProcessedStack,
- ProcessedFrame,
- ProcessedThreadMetadata,
- FrameId,
- StackId,
- ThreadId,
- ExtractedSample,
- )
- from sentry_sdk._types import Event, SamplingContext, ProfilerMode
- ProcessedSample = TypedDict(
- "ProcessedSample",
- {
- "elapsed_since_start_ns": str,
- "thread_id": ThreadId,
- "stack_id": int,
- },
- )
- ProcessedProfile = TypedDict(
- "ProcessedProfile",
- {
- "frames": List[ProcessedFrame],
- "stacks": List[ProcessedStack],
- "samples": List[ProcessedSample],
- "thread_metadata": Dict[ThreadId, ProcessedThreadMetadata],
- },
- )
- try:
- from gevent.monkey import get_original
- from gevent.threadpool import ThreadPool as _ThreadPool
- ThreadPool = _ThreadPool # type: Optional[Type[_ThreadPool]]
- thread_sleep = get_original("time", "sleep")
- except ImportError:
- thread_sleep = time.sleep
- ThreadPool = None
- _scheduler = None # type: Optional[Scheduler]
- # The minimum number of unique samples that must exist in a profile to be
- # considered valid.
- PROFILE_MINIMUM_SAMPLES = 2
- def has_profiling_enabled(options):
- # type: (Dict[str, Any]) -> bool
- profiles_sampler = options["profiles_sampler"]
- if profiles_sampler is not None:
- return True
- profiles_sample_rate = options["profiles_sample_rate"]
- if profiles_sample_rate is not None and profiles_sample_rate > 0:
- return True
- profiles_sample_rate = options["_experiments"].get("profiles_sample_rate")
- if profiles_sample_rate is not None:
- logger.warning(
- "_experiments['profiles_sample_rate'] is deprecated. "
- "Please use the non-experimental profiles_sample_rate option "
- "directly."
- )
- if profiles_sample_rate > 0:
- return True
- return False
- def setup_profiler(options):
- # type: (Dict[str, Any]) -> bool
- global _scheduler
- if _scheduler is not None:
- logger.debug("[Profiling] Profiler is already setup")
- return False
- frequency = DEFAULT_SAMPLING_FREQUENCY
- if is_gevent():
- # If gevent has patched the threading modules then we cannot rely on
- # them to spawn a native thread for sampling.
- # Instead we default to the GeventScheduler which is capable of
- # spawning native threads within gevent.
- default_profiler_mode = GeventScheduler.mode
- else:
- default_profiler_mode = ThreadScheduler.mode
- if options.get("profiler_mode") is not None:
- profiler_mode = options["profiler_mode"]
- else:
- profiler_mode = options.get("_experiments", {}).get("profiler_mode")
- if profiler_mode is not None:
- logger.warning(
- "_experiments['profiler_mode'] is deprecated. Please use the "
- "non-experimental profiler_mode option directly."
- )
- profiler_mode = profiler_mode or default_profiler_mode
- if (
- profiler_mode == ThreadScheduler.mode
- # for legacy reasons, we'll keep supporting sleep mode for this scheduler
- or profiler_mode == "sleep"
- ):
- _scheduler = ThreadScheduler(frequency=frequency)
- elif profiler_mode == GeventScheduler.mode:
- _scheduler = GeventScheduler(frequency=frequency)
- else:
- raise ValueError("Unknown profiler mode: {}".format(profiler_mode))
- logger.debug(
- "[Profiling] Setting up profiler in {mode} mode".format(mode=_scheduler.mode)
- )
- _scheduler.setup()
- atexit.register(teardown_profiler)
- return True
- def teardown_profiler():
- # type: () -> None
- global _scheduler
- if _scheduler is not None:
- _scheduler.teardown()
- _scheduler = None
- MAX_PROFILE_DURATION_NS = int(3e10) # 30 seconds
- class Profile:
- def __init__(
- self,
- sampled, # type: Optional[bool]
- start_ns, # type: int
- hub=None, # type: Optional[sentry_sdk.Hub]
- scheduler=None, # type: Optional[Scheduler]
- ):
- # type: (...) -> None
- self.scheduler = _scheduler if scheduler is None else scheduler
- self.event_id = uuid.uuid4().hex # type: str
- self.sampled = sampled # type: Optional[bool]
- # Various framework integrations are capable of overwriting the active thread id.
- # If it is set to `None` at the end of the profile, we fall back to the default.
- self._default_active_thread_id = get_current_thread_meta()[0] or 0 # type: int
- self.active_thread_id = None # type: Optional[int]
- try:
- self.start_ns = start_ns # type: int
- except AttributeError:
- self.start_ns = 0
- self.stop_ns = 0 # type: int
- self.active = False # type: bool
- self.indexed_frames = {} # type: Dict[FrameId, int]
- self.indexed_stacks = {} # type: Dict[StackId, int]
- self.frames = [] # type: List[ProcessedFrame]
- self.stacks = [] # type: List[ProcessedStack]
- self.samples = [] # type: List[ProcessedSample]
- self.unique_samples = 0
- # Backwards compatibility with the old hub property
- self._hub = None # type: Optional[sentry_sdk.Hub]
- if hub is not None:
- self._hub = hub
- warnings.warn(
- "The `hub` parameter is deprecated. Please do not use it.",
- DeprecationWarning,
- stacklevel=2,
- )
- def update_active_thread_id(self):
- # type: () -> None
- self.active_thread_id = get_current_thread_meta()[0]
- logger.debug(
- "[Profiling] updating active thread id to {tid}".format(
- tid=self.active_thread_id
- )
- )
- def _set_initial_sampling_decision(self, sampling_context):
- # type: (SamplingContext) -> None
- """
- Sets the profile's sampling decision according to the following
- precedence rules:
- 1. If the transaction to be profiled is not sampled, that decision
- will be used, regardless of anything else.
- 2. Use `profiles_sample_rate` to decide.
- """
- # The corresponding transaction was not sampled,
- # so don't generate a profile for it.
- if not self.sampled:
- logger.debug(
- "[Profiling] Discarding profile because transaction is discarded."
- )
- self.sampled = False
- return
- # The profiler hasn't been properly initialized.
- if self.scheduler is None:
- logger.debug(
- "[Profiling] Discarding profile because profiler was not started."
- )
- self.sampled = False
- return
- client = sentry_sdk.get_client()
- if not client.is_active():
- self.sampled = False
- return
- options = client.options
- if callable(options.get("profiles_sampler")):
- sample_rate = options["profiles_sampler"](sampling_context)
- elif options["profiles_sample_rate"] is not None:
- sample_rate = options["profiles_sample_rate"]
- else:
- sample_rate = options["_experiments"].get("profiles_sample_rate")
- # The profiles_sample_rate option was not set, so profiling
- # was never enabled.
- if sample_rate is None:
- logger.debug(
- "[Profiling] Discarding profile because profiling was not enabled."
- )
- self.sampled = False
- return
- if not is_valid_sample_rate(sample_rate, source="Profiling"):
- logger.warning(
- "[Profiling] Discarding profile because of invalid sample rate."
- )
- self.sampled = False
- return
- # Now we roll the dice. random.random is inclusive of 0, but not of 1,
- # so strict < is safe here. In case sample_rate is a boolean, cast it
- # to a float (True becomes 1.0 and False becomes 0.0)
- self.sampled = random.random() < float(sample_rate)
- if self.sampled:
- logger.debug("[Profiling] Initializing profile")
- else:
- logger.debug(
- "[Profiling] Discarding profile because it's not included in the random sample (sample rate = {sample_rate})".format(
- sample_rate=float(sample_rate)
- )
- )
- def start(self):
- # type: () -> None
- if not self.sampled or self.active:
- return
- assert self.scheduler, "No scheduler specified"
- logger.debug("[Profiling] Starting profile")
- self.active = True
- if not self.start_ns:
- self.start_ns = nanosecond_time()
- self.scheduler.start_profiling(self)
- def stop(self):
- # type: () -> None
- if not self.sampled or not self.active:
- return
- assert self.scheduler, "No scheduler specified"
- logger.debug("[Profiling] Stopping profile")
- self.active = False
- self.stop_ns = nanosecond_time()
- def __enter__(self):
- # type: () -> Profile
- scope = sentry_sdk.get_isolation_scope()
- old_profile = scope.profile
- scope.profile = self
- self._context_manager_state = (scope, old_profile)
- self.start()
- return self
- def __exit__(self, ty, value, tb):
- # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
- with capture_internal_exceptions():
- self.stop()
- scope, old_profile = self._context_manager_state
- del self._context_manager_state
- scope.profile = old_profile
- def write(self, ts, sample):
- # type: (int, ExtractedSample) -> None
- if not self.active:
- return
- if ts < self.start_ns:
- return
- offset = ts - self.start_ns
- if offset > MAX_PROFILE_DURATION_NS:
- self.stop()
- return
- self.unique_samples += 1
- elapsed_since_start_ns = str(offset)
- for tid, (stack_id, frame_ids, frames) in sample:
- try:
- # Check if the stack is indexed first, this lets us skip
- # indexing frames if it's not necessary
- if stack_id not in self.indexed_stacks:
- for i, frame_id in enumerate(frame_ids):
- if frame_id not in self.indexed_frames:
- self.indexed_frames[frame_id] = len(self.indexed_frames)
- self.frames.append(frames[i])
- self.indexed_stacks[stack_id] = len(self.indexed_stacks)
- self.stacks.append(
- [self.indexed_frames[frame_id] for frame_id in frame_ids]
- )
- self.samples.append(
- {
- "elapsed_since_start_ns": elapsed_since_start_ns,
- "thread_id": tid,
- "stack_id": self.indexed_stacks[stack_id],
- }
- )
- except AttributeError:
- # For some reason, the frame we get doesn't have certain attributes.
- # When this happens, we abandon the current sample as it's bad.
- capture_internal_exception(sys.exc_info())
- def process(self):
- # type: () -> ProcessedProfile
- # This collects the thread metadata at the end of a profile. Doing it
- # this way means that any threads that terminate before the profile ends
- # will not have any metadata associated with it.
- thread_metadata = {
- str(thread.ident): {
- "name": str(thread.name),
- }
- for thread in threading.enumerate()
- } # type: Dict[str, ProcessedThreadMetadata]
- return {
- "frames": self.frames,
- "stacks": self.stacks,
- "samples": self.samples,
- "thread_metadata": thread_metadata,
- }
- def to_json(self, event_opt, options):
- # type: (Event, Dict[str, Any]) -> Dict[str, Any]
- profile = self.process()
- set_in_app_in_frames(
- profile["frames"],
- options["in_app_exclude"],
- options["in_app_include"],
- options["project_root"],
- )
- return {
- "environment": event_opt.get("environment"),
- "event_id": self.event_id,
- "platform": "python",
- "profile": profile,
- "release": event_opt.get("release", ""),
- "timestamp": event_opt["start_timestamp"],
- "version": "1",
- "device": {
- "architecture": platform.machine(),
- },
- "os": {
- "name": platform.system(),
- "version": platform.release(),
- },
- "runtime": {
- "name": platform.python_implementation(),
- "version": platform.python_version(),
- },
- "transactions": [
- {
- "id": event_opt["event_id"],
- "name": event_opt["transaction"],
- # we start the transaction before the profile and this is
- # the transaction start time relative to the profile, so we
- # hardcode it to 0 until we can start the profile before
- "relative_start_ns": "0",
- # use the duration of the profile instead of the transaction
- # because we end the transaction after the profile
- "relative_end_ns": str(self.stop_ns - self.start_ns),
- "trace_id": event_opt["contexts"]["trace"]["trace_id"],
- "active_thread_id": str(
- self._default_active_thread_id
- if self.active_thread_id is None
- else self.active_thread_id
- ),
- }
- ],
- }
- def valid(self):
- # type: () -> bool
- client = sentry_sdk.get_client()
- if not client.is_active():
- return False
- if not has_profiling_enabled(client.options):
- return False
- if self.sampled is None or not self.sampled:
- if client.transport:
- client.transport.record_lost_event(
- "sample_rate", data_category="profile"
- )
- return False
- if self.unique_samples < PROFILE_MINIMUM_SAMPLES:
- if client.transport:
- client.transport.record_lost_event(
- "insufficient_data", data_category="profile"
- )
- logger.debug("[Profiling] Discarding profile because insufficient samples.")
- return False
- return True
- @property
- def hub(self):
- # type: () -> Optional[sentry_sdk.Hub]
- warnings.warn(
- "The `hub` attribute is deprecated. Please do not access it.",
- DeprecationWarning,
- stacklevel=2,
- )
- return self._hub
- @hub.setter
- def hub(self, value):
- # type: (Optional[sentry_sdk.Hub]) -> None
- warnings.warn(
- "The `hub` attribute is deprecated. Please do not set it.",
- DeprecationWarning,
- stacklevel=2,
- )
- self._hub = value
- class Scheduler(ABC):
- mode = "unknown" # type: ProfilerMode
- def __init__(self, frequency):
- # type: (int) -> None
- self.interval = 1.0 / frequency
- self.sampler = self.make_sampler()
- # cap the number of new profiles at any time so it does not grow infinitely
- self.new_profiles = deque(maxlen=128) # type: Deque[Profile]
- self.active_profiles = set() # type: Set[Profile]
- def __enter__(self):
- # type: () -> Scheduler
- self.setup()
- return self
- def __exit__(self, ty, value, tb):
- # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
- self.teardown()
- @abstractmethod
- def setup(self):
- # type: () -> None
- pass
- @abstractmethod
- def teardown(self):
- # type: () -> None
- pass
- def ensure_running(self):
- # type: () -> None
- """
- Ensure the scheduler is running. By default, this method is a no-op.
- The method should be overridden by any implementation for which it is
- relevant.
- """
- return None
- def start_profiling(self, profile):
- # type: (Profile) -> None
- self.ensure_running()
- self.new_profiles.append(profile)
- def make_sampler(self):
- # type: () -> Callable[..., None]
- cwd = os.getcwd()
- cache = LRUCache(max_size=256)
- def _sample_stack(*args, **kwargs):
- # type: (*Any, **Any) -> None
- """
- Take a sample of the stack on all the threads in the process.
- This should be called at a regular interval to collect samples.
- """
- # no profiles taking place, so we can stop early
- if not self.new_profiles and not self.active_profiles:
- # make sure to clear the cache if we're not profiling so we dont
- # keep a reference to the last stack of frames around
- return
- # This is the number of profiles we want to pop off.
- # It's possible another thread adds a new profile to
- # the list and we spend longer than we want inside
- # the loop below.
- #
- # Also make sure to set this value before extracting
- # frames so we do not write to any new profiles that
- # were started after this point.
- new_profiles = len(self.new_profiles)
- now = nanosecond_time()
- try:
- sample = [
- (str(tid), extract_stack(frame, cache, cwd))
- for tid, frame in sys._current_frames().items()
- ]
- except AttributeError:
- # For some reason, the frame we get doesn't have certain attributes.
- # When this happens, we abandon the current sample as it's bad.
- capture_internal_exception(sys.exc_info())
- return
- # Move the new profiles into the active_profiles set.
- #
- # We cannot directly add the to active_profiles set
- # in `start_profiling` because it is called from other
- # threads which can cause a RuntimeError when it the
- # set sizes changes during iteration without a lock.
- #
- # We also want to avoid using a lock here so threads
- # that are starting profiles are not blocked until it
- # can acquire the lock.
- for _ in range(new_profiles):
- self.active_profiles.add(self.new_profiles.popleft())
- inactive_profiles = []
- for profile in self.active_profiles:
- if profile.active:
- profile.write(now, sample)
- else:
- # If a profile is marked inactive, we buffer it
- # to `inactive_profiles` so it can be removed.
- # We cannot remove it here as it would result
- # in a RuntimeError.
- inactive_profiles.append(profile)
- for profile in inactive_profiles:
- self.active_profiles.remove(profile)
- return _sample_stack
- class ThreadScheduler(Scheduler):
- """
- This scheduler is based on running a daemon thread that will call
- the sampler at a regular interval.
- """
- mode = "thread" # type: ProfilerMode
- name = "sentry.profiler.ThreadScheduler"
- def __init__(self, frequency):
- # type: (int) -> None
- super().__init__(frequency=frequency)
- # used to signal to the thread that it should stop
- self.running = False
- self.thread = None # type: Optional[threading.Thread]
- self.pid = None # type: Optional[int]
- self.lock = threading.Lock()
- def setup(self):
- # type: () -> None
- pass
- def teardown(self):
- # type: () -> None
- if self.running:
- self.running = False
- if self.thread is not None:
- self.thread.join()
- def ensure_running(self):
- # type: () -> None
- """
- Check that the profiler has an active thread to run in, and start one if
- that's not the case.
- Note that this might fail (e.g. in Python 3.12 it's not possible to
- spawn new threads at interpreter shutdown). In that case self.running
- will be False after running this function.
- """
- pid = os.getpid()
- # is running on the right process
- if self.running and self.pid == pid:
- return
- with self.lock:
- # another thread may have tried to acquire the lock
- # at the same time so it may start another thread
- # make sure to check again before proceeding
- if self.running and self.pid == pid:
- return
- self.pid = pid
- self.running = True
- # make sure the thread is a daemon here otherwise this
- # can keep the application running after other threads
- # have exited
- self.thread = threading.Thread(name=self.name, target=self.run, daemon=True)
- try:
- self.thread.start()
- except RuntimeError:
- # Unfortunately at this point the interpreter is in a state that no
- # longer allows us to spawn a thread and we have to bail.
- self.running = False
- self.thread = None
- return
- def run(self):
- # type: () -> None
- last = time.perf_counter()
- while self.running:
- self.sampler()
- # some time may have elapsed since the last time
- # we sampled, so we need to account for that and
- # not sleep for too long
- elapsed = time.perf_counter() - last
- if elapsed < self.interval:
- thread_sleep(self.interval - elapsed)
- # after sleeping, make sure to take the current
- # timestamp so we can use it next iteration
- last = time.perf_counter()
- class GeventScheduler(Scheduler):
- """
- This scheduler is based on the thread scheduler but adapted to work with
- gevent. When using gevent, it may monkey patch the threading modules
- (`threading` and `_thread`). This results in the use of greenlets instead
- of native threads.
- This is an issue because the sampler CANNOT run in a greenlet because
- 1. Other greenlets doing sync work will prevent the sampler from running
- 2. The greenlet runs in the same thread as other greenlets so when taking
- a sample, other greenlets will have been evicted from the thread. This
- results in a sample containing only the sampler's code.
- """
- mode = "gevent" # type: ProfilerMode
- name = "sentry.profiler.GeventScheduler"
- def __init__(self, frequency):
- # type: (int) -> None
- if ThreadPool is None:
- raise ValueError("Profiler mode: {} is not available".format(self.mode))
- super().__init__(frequency=frequency)
- # used to signal to the thread that it should stop
- self.running = False
- self.thread = None # type: Optional[_ThreadPool]
- self.pid = None # type: Optional[int]
- # This intentionally uses the gevent patched threading.Lock.
- # The lock will be required when first trying to start profiles
- # as we need to spawn the profiler thread from the greenlets.
- self.lock = threading.Lock()
- def setup(self):
- # type: () -> None
- pass
- def teardown(self):
- # type: () -> None
- if self.running:
- self.running = False
- if self.thread is not None:
- self.thread.join()
- def ensure_running(self):
- # type: () -> None
- pid = os.getpid()
- # is running on the right process
- if self.running and self.pid == pid:
- return
- with self.lock:
- # another thread may have tried to acquire the lock
- # at the same time so it may start another thread
- # make sure to check again before proceeding
- if self.running and self.pid == pid:
- return
- self.pid = pid
- self.running = True
- self.thread = ThreadPool(1) # type: ignore[misc]
- try:
- self.thread.spawn(self.run)
- except RuntimeError:
- # Unfortunately at this point the interpreter is in a state that no
- # longer allows us to spawn a thread and we have to bail.
- self.running = False
- self.thread = None
- return
- def run(self):
- # type: () -> None
- last = time.perf_counter()
- while self.running:
- self.sampler()
- # some time may have elapsed since the last time
- # we sampled, so we need to account for that and
- # not sleep for too long
- elapsed = time.perf_counter() - last
- if elapsed < self.interval:
- thread_sleep(self.interval - elapsed)
- # after sleeping, make sure to take the current
- # timestamp so we can use it next iteration
- last = time.perf_counter()
|