transaction_profiler.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839
  1. """
  2. This file is originally based on code from https://github.com/nylas/nylas-perftools,
  3. which is published under the following license:
  4. The MIT License (MIT)
  5. Copyright (c) 2014 Nylas
  6. Permission is hereby granted, free of charge, to any person obtaining a copy
  7. of this software and associated documentation files (the "Software"), to deal
  8. in the Software without restriction, including without limitation the rights
  9. to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  10. copies of the Software, and to permit persons to whom the Software is
  11. furnished to do so, subject to the following conditions:
  12. The above copyright notice and this permission notice shall be included in all
  13. copies or substantial portions of the Software.
  14. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  15. IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  16. FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  17. AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  18. LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  19. OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  20. SOFTWARE.
  21. """
  22. import atexit
  23. import os
  24. import platform
  25. import random
  26. import sys
  27. import threading
  28. import time
  29. import uuid
  30. import warnings
  31. from abc import ABC, abstractmethod
  32. from collections import deque
  33. import sentry_sdk
  34. from sentry_sdk._lru_cache import LRUCache
  35. from sentry_sdk.profiler.utils import (
  36. DEFAULT_SAMPLING_FREQUENCY,
  37. extract_stack,
  38. )
  39. from sentry_sdk.utils import (
  40. capture_internal_exception,
  41. capture_internal_exceptions,
  42. get_current_thread_meta,
  43. is_gevent,
  44. is_valid_sample_rate,
  45. logger,
  46. nanosecond_time,
  47. set_in_app_in_frames,
  48. )
  49. from typing import TYPE_CHECKING
  50. if TYPE_CHECKING:
  51. from typing import Any
  52. from typing import Callable
  53. from typing import Deque
  54. from typing import Dict
  55. from typing import List
  56. from typing import Optional
  57. from typing import Set
  58. from typing import Type
  59. from typing_extensions import TypedDict
  60. from sentry_sdk.profiler.utils import (
  61. ProcessedStack,
  62. ProcessedFrame,
  63. ProcessedThreadMetadata,
  64. FrameId,
  65. StackId,
  66. ThreadId,
  67. ExtractedSample,
  68. )
  69. from sentry_sdk._types import Event, SamplingContext, ProfilerMode
  70. ProcessedSample = TypedDict(
  71. "ProcessedSample",
  72. {
  73. "elapsed_since_start_ns": str,
  74. "thread_id": ThreadId,
  75. "stack_id": int,
  76. },
  77. )
  78. ProcessedProfile = TypedDict(
  79. "ProcessedProfile",
  80. {
  81. "frames": List[ProcessedFrame],
  82. "stacks": List[ProcessedStack],
  83. "samples": List[ProcessedSample],
  84. "thread_metadata": Dict[ThreadId, ProcessedThreadMetadata],
  85. },
  86. )
  87. try:
  88. from gevent.monkey import get_original
  89. from gevent.threadpool import ThreadPool as _ThreadPool
  90. ThreadPool = _ThreadPool # type: Optional[Type[_ThreadPool]]
  91. thread_sleep = get_original("time", "sleep")
  92. except ImportError:
  93. thread_sleep = time.sleep
  94. ThreadPool = None
  95. _scheduler = None # type: Optional[Scheduler]
  96. # The minimum number of unique samples that must exist in a profile to be
  97. # considered valid.
  98. PROFILE_MINIMUM_SAMPLES = 2
  99. def has_profiling_enabled(options):
  100. # type: (Dict[str, Any]) -> bool
  101. profiles_sampler = options["profiles_sampler"]
  102. if profiles_sampler is not None:
  103. return True
  104. profiles_sample_rate = options["profiles_sample_rate"]
  105. if profiles_sample_rate is not None and profiles_sample_rate > 0:
  106. return True
  107. profiles_sample_rate = options["_experiments"].get("profiles_sample_rate")
  108. if profiles_sample_rate is not None:
  109. logger.warning(
  110. "_experiments['profiles_sample_rate'] is deprecated. "
  111. "Please use the non-experimental profiles_sample_rate option "
  112. "directly."
  113. )
  114. if profiles_sample_rate > 0:
  115. return True
  116. return False
  117. def setup_profiler(options):
  118. # type: (Dict[str, Any]) -> bool
  119. global _scheduler
  120. if _scheduler is not None:
  121. logger.debug("[Profiling] Profiler is already setup")
  122. return False
  123. frequency = DEFAULT_SAMPLING_FREQUENCY
  124. if is_gevent():
  125. # If gevent has patched the threading modules then we cannot rely on
  126. # them to spawn a native thread for sampling.
  127. # Instead we default to the GeventScheduler which is capable of
  128. # spawning native threads within gevent.
  129. default_profiler_mode = GeventScheduler.mode
  130. else:
  131. default_profiler_mode = ThreadScheduler.mode
  132. if options.get("profiler_mode") is not None:
  133. profiler_mode = options["profiler_mode"]
  134. else:
  135. profiler_mode = options.get("_experiments", {}).get("profiler_mode")
  136. if profiler_mode is not None:
  137. logger.warning(
  138. "_experiments['profiler_mode'] is deprecated. Please use the "
  139. "non-experimental profiler_mode option directly."
  140. )
  141. profiler_mode = profiler_mode or default_profiler_mode
  142. if (
  143. profiler_mode == ThreadScheduler.mode
  144. # for legacy reasons, we'll keep supporting sleep mode for this scheduler
  145. or profiler_mode == "sleep"
  146. ):
  147. _scheduler = ThreadScheduler(frequency=frequency)
  148. elif profiler_mode == GeventScheduler.mode:
  149. _scheduler = GeventScheduler(frequency=frequency)
  150. else:
  151. raise ValueError("Unknown profiler mode: {}".format(profiler_mode))
  152. logger.debug(
  153. "[Profiling] Setting up profiler in {mode} mode".format(mode=_scheduler.mode)
  154. )
  155. _scheduler.setup()
  156. atexit.register(teardown_profiler)
  157. return True
  158. def teardown_profiler():
  159. # type: () -> None
  160. global _scheduler
  161. if _scheduler is not None:
  162. _scheduler.teardown()
  163. _scheduler = None
  164. MAX_PROFILE_DURATION_NS = int(3e10) # 30 seconds
  165. class Profile:
  166. def __init__(
  167. self,
  168. sampled, # type: Optional[bool]
  169. start_ns, # type: int
  170. hub=None, # type: Optional[sentry_sdk.Hub]
  171. scheduler=None, # type: Optional[Scheduler]
  172. ):
  173. # type: (...) -> None
  174. self.scheduler = _scheduler if scheduler is None else scheduler
  175. self.event_id = uuid.uuid4().hex # type: str
  176. self.sampled = sampled # type: Optional[bool]
  177. # Various framework integrations are capable of overwriting the active thread id.
  178. # If it is set to `None` at the end of the profile, we fall back to the default.
  179. self._default_active_thread_id = get_current_thread_meta()[0] or 0 # type: int
  180. self.active_thread_id = None # type: Optional[int]
  181. try:
  182. self.start_ns = start_ns # type: int
  183. except AttributeError:
  184. self.start_ns = 0
  185. self.stop_ns = 0 # type: int
  186. self.active = False # type: bool
  187. self.indexed_frames = {} # type: Dict[FrameId, int]
  188. self.indexed_stacks = {} # type: Dict[StackId, int]
  189. self.frames = [] # type: List[ProcessedFrame]
  190. self.stacks = [] # type: List[ProcessedStack]
  191. self.samples = [] # type: List[ProcessedSample]
  192. self.unique_samples = 0
  193. # Backwards compatibility with the old hub property
  194. self._hub = None # type: Optional[sentry_sdk.Hub]
  195. if hub is not None:
  196. self._hub = hub
  197. warnings.warn(
  198. "The `hub` parameter is deprecated. Please do not use it.",
  199. DeprecationWarning,
  200. stacklevel=2,
  201. )
  202. def update_active_thread_id(self):
  203. # type: () -> None
  204. self.active_thread_id = get_current_thread_meta()[0]
  205. logger.debug(
  206. "[Profiling] updating active thread id to {tid}".format(
  207. tid=self.active_thread_id
  208. )
  209. )
  210. def _set_initial_sampling_decision(self, sampling_context):
  211. # type: (SamplingContext) -> None
  212. """
  213. Sets the profile's sampling decision according to the following
  214. precedence rules:
  215. 1. If the transaction to be profiled is not sampled, that decision
  216. will be used, regardless of anything else.
  217. 2. Use `profiles_sample_rate` to decide.
  218. """
  219. # The corresponding transaction was not sampled,
  220. # so don't generate a profile for it.
  221. if not self.sampled:
  222. logger.debug(
  223. "[Profiling] Discarding profile because transaction is discarded."
  224. )
  225. self.sampled = False
  226. return
  227. # The profiler hasn't been properly initialized.
  228. if self.scheduler is None:
  229. logger.debug(
  230. "[Profiling] Discarding profile because profiler was not started."
  231. )
  232. self.sampled = False
  233. return
  234. client = sentry_sdk.get_client()
  235. if not client.is_active():
  236. self.sampled = False
  237. return
  238. options = client.options
  239. if callable(options.get("profiles_sampler")):
  240. sample_rate = options["profiles_sampler"](sampling_context)
  241. elif options["profiles_sample_rate"] is not None:
  242. sample_rate = options["profiles_sample_rate"]
  243. else:
  244. sample_rate = options["_experiments"].get("profiles_sample_rate")
  245. # The profiles_sample_rate option was not set, so profiling
  246. # was never enabled.
  247. if sample_rate is None:
  248. logger.debug(
  249. "[Profiling] Discarding profile because profiling was not enabled."
  250. )
  251. self.sampled = False
  252. return
  253. if not is_valid_sample_rate(sample_rate, source="Profiling"):
  254. logger.warning(
  255. "[Profiling] Discarding profile because of invalid sample rate."
  256. )
  257. self.sampled = False
  258. return
  259. # Now we roll the dice. random.random is inclusive of 0, but not of 1,
  260. # so strict < is safe here. In case sample_rate is a boolean, cast it
  261. # to a float (True becomes 1.0 and False becomes 0.0)
  262. self.sampled = random.random() < float(sample_rate)
  263. if self.sampled:
  264. logger.debug("[Profiling] Initializing profile")
  265. else:
  266. logger.debug(
  267. "[Profiling] Discarding profile because it's not included in the random sample (sample rate = {sample_rate})".format(
  268. sample_rate=float(sample_rate)
  269. )
  270. )
  271. def start(self):
  272. # type: () -> None
  273. if not self.sampled or self.active:
  274. return
  275. assert self.scheduler, "No scheduler specified"
  276. logger.debug("[Profiling] Starting profile")
  277. self.active = True
  278. if not self.start_ns:
  279. self.start_ns = nanosecond_time()
  280. self.scheduler.start_profiling(self)
  281. def stop(self):
  282. # type: () -> None
  283. if not self.sampled or not self.active:
  284. return
  285. assert self.scheduler, "No scheduler specified"
  286. logger.debug("[Profiling] Stopping profile")
  287. self.active = False
  288. self.stop_ns = nanosecond_time()
  289. def __enter__(self):
  290. # type: () -> Profile
  291. scope = sentry_sdk.get_isolation_scope()
  292. old_profile = scope.profile
  293. scope.profile = self
  294. self._context_manager_state = (scope, old_profile)
  295. self.start()
  296. return self
  297. def __exit__(self, ty, value, tb):
  298. # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
  299. with capture_internal_exceptions():
  300. self.stop()
  301. scope, old_profile = self._context_manager_state
  302. del self._context_manager_state
  303. scope.profile = old_profile
  304. def write(self, ts, sample):
  305. # type: (int, ExtractedSample) -> None
  306. if not self.active:
  307. return
  308. if ts < self.start_ns:
  309. return
  310. offset = ts - self.start_ns
  311. if offset > MAX_PROFILE_DURATION_NS:
  312. self.stop()
  313. return
  314. self.unique_samples += 1
  315. elapsed_since_start_ns = str(offset)
  316. for tid, (stack_id, frame_ids, frames) in sample:
  317. try:
  318. # Check if the stack is indexed first, this lets us skip
  319. # indexing frames if it's not necessary
  320. if stack_id not in self.indexed_stacks:
  321. for i, frame_id in enumerate(frame_ids):
  322. if frame_id not in self.indexed_frames:
  323. self.indexed_frames[frame_id] = len(self.indexed_frames)
  324. self.frames.append(frames[i])
  325. self.indexed_stacks[stack_id] = len(self.indexed_stacks)
  326. self.stacks.append(
  327. [self.indexed_frames[frame_id] for frame_id in frame_ids]
  328. )
  329. self.samples.append(
  330. {
  331. "elapsed_since_start_ns": elapsed_since_start_ns,
  332. "thread_id": tid,
  333. "stack_id": self.indexed_stacks[stack_id],
  334. }
  335. )
  336. except AttributeError:
  337. # For some reason, the frame we get doesn't have certain attributes.
  338. # When this happens, we abandon the current sample as it's bad.
  339. capture_internal_exception(sys.exc_info())
  340. def process(self):
  341. # type: () -> ProcessedProfile
  342. # This collects the thread metadata at the end of a profile. Doing it
  343. # this way means that any threads that terminate before the profile ends
  344. # will not have any metadata associated with it.
  345. thread_metadata = {
  346. str(thread.ident): {
  347. "name": str(thread.name),
  348. }
  349. for thread in threading.enumerate()
  350. } # type: Dict[str, ProcessedThreadMetadata]
  351. return {
  352. "frames": self.frames,
  353. "stacks": self.stacks,
  354. "samples": self.samples,
  355. "thread_metadata": thread_metadata,
  356. }
  357. def to_json(self, event_opt, options):
  358. # type: (Event, Dict[str, Any]) -> Dict[str, Any]
  359. profile = self.process()
  360. set_in_app_in_frames(
  361. profile["frames"],
  362. options["in_app_exclude"],
  363. options["in_app_include"],
  364. options["project_root"],
  365. )
  366. return {
  367. "environment": event_opt.get("environment"),
  368. "event_id": self.event_id,
  369. "platform": "python",
  370. "profile": profile,
  371. "release": event_opt.get("release", ""),
  372. "timestamp": event_opt["start_timestamp"],
  373. "version": "1",
  374. "device": {
  375. "architecture": platform.machine(),
  376. },
  377. "os": {
  378. "name": platform.system(),
  379. "version": platform.release(),
  380. },
  381. "runtime": {
  382. "name": platform.python_implementation(),
  383. "version": platform.python_version(),
  384. },
  385. "transactions": [
  386. {
  387. "id": event_opt["event_id"],
  388. "name": event_opt["transaction"],
  389. # we start the transaction before the profile and this is
  390. # the transaction start time relative to the profile, so we
  391. # hardcode it to 0 until we can start the profile before
  392. "relative_start_ns": "0",
  393. # use the duration of the profile instead of the transaction
  394. # because we end the transaction after the profile
  395. "relative_end_ns": str(self.stop_ns - self.start_ns),
  396. "trace_id": event_opt["contexts"]["trace"]["trace_id"],
  397. "active_thread_id": str(
  398. self._default_active_thread_id
  399. if self.active_thread_id is None
  400. else self.active_thread_id
  401. ),
  402. }
  403. ],
  404. }
  405. def valid(self):
  406. # type: () -> bool
  407. client = sentry_sdk.get_client()
  408. if not client.is_active():
  409. return False
  410. if not has_profiling_enabled(client.options):
  411. return False
  412. if self.sampled is None or not self.sampled:
  413. if client.transport:
  414. client.transport.record_lost_event(
  415. "sample_rate", data_category="profile"
  416. )
  417. return False
  418. if self.unique_samples < PROFILE_MINIMUM_SAMPLES:
  419. if client.transport:
  420. client.transport.record_lost_event(
  421. "insufficient_data", data_category="profile"
  422. )
  423. logger.debug("[Profiling] Discarding profile because insufficient samples.")
  424. return False
  425. return True
  426. @property
  427. def hub(self):
  428. # type: () -> Optional[sentry_sdk.Hub]
  429. warnings.warn(
  430. "The `hub` attribute is deprecated. Please do not access it.",
  431. DeprecationWarning,
  432. stacklevel=2,
  433. )
  434. return self._hub
  435. @hub.setter
  436. def hub(self, value):
  437. # type: (Optional[sentry_sdk.Hub]) -> None
  438. warnings.warn(
  439. "The `hub` attribute is deprecated. Please do not set it.",
  440. DeprecationWarning,
  441. stacklevel=2,
  442. )
  443. self._hub = value
  444. class Scheduler(ABC):
  445. mode = "unknown" # type: ProfilerMode
  446. def __init__(self, frequency):
  447. # type: (int) -> None
  448. self.interval = 1.0 / frequency
  449. self.sampler = self.make_sampler()
  450. # cap the number of new profiles at any time so it does not grow infinitely
  451. self.new_profiles = deque(maxlen=128) # type: Deque[Profile]
  452. self.active_profiles = set() # type: Set[Profile]
  453. def __enter__(self):
  454. # type: () -> Scheduler
  455. self.setup()
  456. return self
  457. def __exit__(self, ty, value, tb):
  458. # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
  459. self.teardown()
  460. @abstractmethod
  461. def setup(self):
  462. # type: () -> None
  463. pass
  464. @abstractmethod
  465. def teardown(self):
  466. # type: () -> None
  467. pass
  468. def ensure_running(self):
  469. # type: () -> None
  470. """
  471. Ensure the scheduler is running. By default, this method is a no-op.
  472. The method should be overridden by any implementation for which it is
  473. relevant.
  474. """
  475. return None
  476. def start_profiling(self, profile):
  477. # type: (Profile) -> None
  478. self.ensure_running()
  479. self.new_profiles.append(profile)
  480. def make_sampler(self):
  481. # type: () -> Callable[..., None]
  482. cwd = os.getcwd()
  483. cache = LRUCache(max_size=256)
  484. def _sample_stack(*args, **kwargs):
  485. # type: (*Any, **Any) -> None
  486. """
  487. Take a sample of the stack on all the threads in the process.
  488. This should be called at a regular interval to collect samples.
  489. """
  490. # no profiles taking place, so we can stop early
  491. if not self.new_profiles and not self.active_profiles:
  492. # make sure to clear the cache if we're not profiling so we dont
  493. # keep a reference to the last stack of frames around
  494. return
  495. # This is the number of profiles we want to pop off.
  496. # It's possible another thread adds a new profile to
  497. # the list and we spend longer than we want inside
  498. # the loop below.
  499. #
  500. # Also make sure to set this value before extracting
  501. # frames so we do not write to any new profiles that
  502. # were started after this point.
  503. new_profiles = len(self.new_profiles)
  504. now = nanosecond_time()
  505. try:
  506. sample = [
  507. (str(tid), extract_stack(frame, cache, cwd))
  508. for tid, frame in sys._current_frames().items()
  509. ]
  510. except AttributeError:
  511. # For some reason, the frame we get doesn't have certain attributes.
  512. # When this happens, we abandon the current sample as it's bad.
  513. capture_internal_exception(sys.exc_info())
  514. return
  515. # Move the new profiles into the active_profiles set.
  516. #
  517. # We cannot directly add the to active_profiles set
  518. # in `start_profiling` because it is called from other
  519. # threads which can cause a RuntimeError when it the
  520. # set sizes changes during iteration without a lock.
  521. #
  522. # We also want to avoid using a lock here so threads
  523. # that are starting profiles are not blocked until it
  524. # can acquire the lock.
  525. for _ in range(new_profiles):
  526. self.active_profiles.add(self.new_profiles.popleft())
  527. inactive_profiles = []
  528. for profile in self.active_profiles:
  529. if profile.active:
  530. profile.write(now, sample)
  531. else:
  532. # If a profile is marked inactive, we buffer it
  533. # to `inactive_profiles` so it can be removed.
  534. # We cannot remove it here as it would result
  535. # in a RuntimeError.
  536. inactive_profiles.append(profile)
  537. for profile in inactive_profiles:
  538. self.active_profiles.remove(profile)
  539. return _sample_stack
  540. class ThreadScheduler(Scheduler):
  541. """
  542. This scheduler is based on running a daemon thread that will call
  543. the sampler at a regular interval.
  544. """
  545. mode = "thread" # type: ProfilerMode
  546. name = "sentry.profiler.ThreadScheduler"
  547. def __init__(self, frequency):
  548. # type: (int) -> None
  549. super().__init__(frequency=frequency)
  550. # used to signal to the thread that it should stop
  551. self.running = False
  552. self.thread = None # type: Optional[threading.Thread]
  553. self.pid = None # type: Optional[int]
  554. self.lock = threading.Lock()
  555. def setup(self):
  556. # type: () -> None
  557. pass
  558. def teardown(self):
  559. # type: () -> None
  560. if self.running:
  561. self.running = False
  562. if self.thread is not None:
  563. self.thread.join()
  564. def ensure_running(self):
  565. # type: () -> None
  566. """
  567. Check that the profiler has an active thread to run in, and start one if
  568. that's not the case.
  569. Note that this might fail (e.g. in Python 3.12 it's not possible to
  570. spawn new threads at interpreter shutdown). In that case self.running
  571. will be False after running this function.
  572. """
  573. pid = os.getpid()
  574. # is running on the right process
  575. if self.running and self.pid == pid:
  576. return
  577. with self.lock:
  578. # another thread may have tried to acquire the lock
  579. # at the same time so it may start another thread
  580. # make sure to check again before proceeding
  581. if self.running and self.pid == pid:
  582. return
  583. self.pid = pid
  584. self.running = True
  585. # make sure the thread is a daemon here otherwise this
  586. # can keep the application running after other threads
  587. # have exited
  588. self.thread = threading.Thread(name=self.name, target=self.run, daemon=True)
  589. try:
  590. self.thread.start()
  591. except RuntimeError:
  592. # Unfortunately at this point the interpreter is in a state that no
  593. # longer allows us to spawn a thread and we have to bail.
  594. self.running = False
  595. self.thread = None
  596. return
  597. def run(self):
  598. # type: () -> None
  599. last = time.perf_counter()
  600. while self.running:
  601. self.sampler()
  602. # some time may have elapsed since the last time
  603. # we sampled, so we need to account for that and
  604. # not sleep for too long
  605. elapsed = time.perf_counter() - last
  606. if elapsed < self.interval:
  607. thread_sleep(self.interval - elapsed)
  608. # after sleeping, make sure to take the current
  609. # timestamp so we can use it next iteration
  610. last = time.perf_counter()
  611. class GeventScheduler(Scheduler):
  612. """
  613. This scheduler is based on the thread scheduler but adapted to work with
  614. gevent. When using gevent, it may monkey patch the threading modules
  615. (`threading` and `_thread`). This results in the use of greenlets instead
  616. of native threads.
  617. This is an issue because the sampler CANNOT run in a greenlet because
  618. 1. Other greenlets doing sync work will prevent the sampler from running
  619. 2. The greenlet runs in the same thread as other greenlets so when taking
  620. a sample, other greenlets will have been evicted from the thread. This
  621. results in a sample containing only the sampler's code.
  622. """
  623. mode = "gevent" # type: ProfilerMode
  624. name = "sentry.profiler.GeventScheduler"
  625. def __init__(self, frequency):
  626. # type: (int) -> None
  627. if ThreadPool is None:
  628. raise ValueError("Profiler mode: {} is not available".format(self.mode))
  629. super().__init__(frequency=frequency)
  630. # used to signal to the thread that it should stop
  631. self.running = False
  632. self.thread = None # type: Optional[_ThreadPool]
  633. self.pid = None # type: Optional[int]
  634. # This intentionally uses the gevent patched threading.Lock.
  635. # The lock will be required when first trying to start profiles
  636. # as we need to spawn the profiler thread from the greenlets.
  637. self.lock = threading.Lock()
  638. def setup(self):
  639. # type: () -> None
  640. pass
  641. def teardown(self):
  642. # type: () -> None
  643. if self.running:
  644. self.running = False
  645. if self.thread is not None:
  646. self.thread.join()
  647. def ensure_running(self):
  648. # type: () -> None
  649. pid = os.getpid()
  650. # is running on the right process
  651. if self.running and self.pid == pid:
  652. return
  653. with self.lock:
  654. # another thread may have tried to acquire the lock
  655. # at the same time so it may start another thread
  656. # make sure to check again before proceeding
  657. if self.running and self.pid == pid:
  658. return
  659. self.pid = pid
  660. self.running = True
  661. self.thread = ThreadPool(1) # type: ignore[misc]
  662. try:
  663. self.thread.spawn(self.run)
  664. except RuntimeError:
  665. # Unfortunately at this point the interpreter is in a state that no
  666. # longer allows us to spawn a thread and we have to bail.
  667. self.running = False
  668. self.thread = None
  669. return
  670. def run(self):
  671. # type: () -> None
  672. last = time.perf_counter()
  673. while self.running:
  674. self.sampler()
  675. # some time may have elapsed since the last time
  676. # we sampled, so we need to account for that and
  677. # not sleep for too long
  678. elapsed = time.perf_counter() - last
  679. if elapsed < self.interval:
  680. thread_sleep(self.interval - elapsed)
  681. # after sleeping, make sure to take the current
  682. # timestamp so we can use it next iteration
  683. last = time.perf_counter()