continuous_profiler.py 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730
  1. import atexit
  2. import os
  3. import random
  4. import sys
  5. import threading
  6. import time
  7. import uuid
  8. import warnings
  9. from collections import deque
  10. from datetime import datetime, timezone
  11. from sentry_sdk.consts import VERSION
  12. from sentry_sdk.envelope import Envelope
  13. from sentry_sdk._lru_cache import LRUCache
  14. from sentry_sdk.profiler.utils import (
  15. DEFAULT_SAMPLING_FREQUENCY,
  16. extract_stack,
  17. )
  18. from sentry_sdk.utils import (
  19. capture_internal_exception,
  20. is_gevent,
  21. logger,
  22. now,
  23. set_in_app_in_frames,
  24. )
  25. from typing import TYPE_CHECKING
  26. if TYPE_CHECKING:
  27. from typing import Any
  28. from typing import Callable
  29. from typing import Deque
  30. from typing import Dict
  31. from typing import List
  32. from typing import Optional
  33. from typing import Set
  34. from typing import Type
  35. from typing import Union
  36. from typing_extensions import TypedDict
  37. from sentry_sdk._types import ContinuousProfilerMode, SDKInfo
  38. from sentry_sdk.profiler.utils import (
  39. ExtractedSample,
  40. FrameId,
  41. StackId,
  42. ThreadId,
  43. ProcessedFrame,
  44. ProcessedStack,
  45. )
  46. ProcessedSample = TypedDict(
  47. "ProcessedSample",
  48. {
  49. "timestamp": float,
  50. "thread_id": ThreadId,
  51. "stack_id": int,
  52. },
  53. )
  54. try:
  55. from gevent.monkey import get_original
  56. from gevent.threadpool import ThreadPool as _ThreadPool
  57. ThreadPool = _ThreadPool # type: Optional[Type[_ThreadPool]]
  58. thread_sleep = get_original("time", "sleep")
  59. except ImportError:
  60. thread_sleep = time.sleep
  61. ThreadPool = None
  62. _scheduler = None # type: Optional[ContinuousScheduler]
  63. def setup_continuous_profiler(options, sdk_info, capture_func):
  64. # type: (Dict[str, Any], SDKInfo, Callable[[Envelope], None]) -> bool
  65. global _scheduler
  66. already_initialized = _scheduler is not None
  67. if already_initialized:
  68. logger.debug("[Profiling] Continuous Profiler is already setup")
  69. teardown_continuous_profiler()
  70. if is_gevent():
  71. # If gevent has patched the threading modules then we cannot rely on
  72. # them to spawn a native thread for sampling.
  73. # Instead we default to the GeventContinuousScheduler which is capable of
  74. # spawning native threads within gevent.
  75. default_profiler_mode = GeventContinuousScheduler.mode
  76. else:
  77. default_profiler_mode = ThreadContinuousScheduler.mode
  78. if options.get("profiler_mode") is not None:
  79. profiler_mode = options["profiler_mode"]
  80. else:
  81. # TODO: deprecate this and just use the existing `profiler_mode`
  82. experiments = options.get("_experiments", {})
  83. profiler_mode = (
  84. experiments.get("continuous_profiling_mode") or default_profiler_mode
  85. )
  86. frequency = DEFAULT_SAMPLING_FREQUENCY
  87. if profiler_mode == ThreadContinuousScheduler.mode:
  88. _scheduler = ThreadContinuousScheduler(
  89. frequency, options, sdk_info, capture_func
  90. )
  91. elif profiler_mode == GeventContinuousScheduler.mode:
  92. _scheduler = GeventContinuousScheduler(
  93. frequency, options, sdk_info, capture_func
  94. )
  95. else:
  96. raise ValueError("Unknown continuous profiler mode: {}".format(profiler_mode))
  97. logger.debug(
  98. "[Profiling] Setting up continuous profiler in {mode} mode".format(
  99. mode=_scheduler.mode
  100. )
  101. )
  102. if not already_initialized:
  103. atexit.register(teardown_continuous_profiler)
  104. return True
  105. def is_profile_session_sampled():
  106. # type: () -> bool
  107. if _scheduler is None:
  108. return False
  109. return _scheduler.sampled
  110. def try_autostart_continuous_profiler():
  111. # type: () -> None
  112. # TODO: deprecate this as it'll be replaced by the auto lifecycle option
  113. if _scheduler is None:
  114. return
  115. if not _scheduler.is_auto_start_enabled():
  116. return
  117. _scheduler.manual_start()
  118. def try_profile_lifecycle_trace_start():
  119. # type: () -> Union[ContinuousProfile, None]
  120. if _scheduler is None:
  121. return None
  122. return _scheduler.auto_start()
  123. def start_profiler():
  124. # type: () -> None
  125. if _scheduler is None:
  126. return
  127. _scheduler.manual_start()
  128. def start_profile_session():
  129. # type: () -> None
  130. warnings.warn(
  131. "The `start_profile_session` function is deprecated. Please use `start_profile` instead.",
  132. DeprecationWarning,
  133. stacklevel=2,
  134. )
  135. start_profiler()
  136. def stop_profiler():
  137. # type: () -> None
  138. if _scheduler is None:
  139. return
  140. _scheduler.manual_stop()
  141. def stop_profile_session():
  142. # type: () -> None
  143. warnings.warn(
  144. "The `stop_profile_session` function is deprecated. Please use `stop_profile` instead.",
  145. DeprecationWarning,
  146. stacklevel=2,
  147. )
  148. stop_profiler()
  149. def teardown_continuous_profiler():
  150. # type: () -> None
  151. stop_profiler()
  152. global _scheduler
  153. _scheduler = None
  154. def get_profiler_id():
  155. # type: () -> Union[str, None]
  156. if _scheduler is None:
  157. return None
  158. return _scheduler.profiler_id
  159. def determine_profile_session_sampling_decision(sample_rate):
  160. # type: (Union[float, None]) -> bool
  161. # `None` is treated as `0.0`
  162. if not sample_rate:
  163. return False
  164. return random.random() < float(sample_rate)
  165. class ContinuousProfile:
  166. active: bool = True
  167. def stop(self):
  168. # type: () -> None
  169. self.active = False
  170. class ContinuousScheduler:
  171. mode = "unknown" # type: ContinuousProfilerMode
  172. def __init__(self, frequency, options, sdk_info, capture_func):
  173. # type: (int, Dict[str, Any], SDKInfo, Callable[[Envelope], None]) -> None
  174. self.interval = 1.0 / frequency
  175. self.options = options
  176. self.sdk_info = sdk_info
  177. self.capture_func = capture_func
  178. self.lifecycle = self.options.get("profile_lifecycle")
  179. profile_session_sample_rate = self.options.get("profile_session_sample_rate")
  180. self.sampled = determine_profile_session_sampling_decision(
  181. profile_session_sample_rate
  182. )
  183. self.sampler = self.make_sampler()
  184. self.buffer = None # type: Optional[ProfileBuffer]
  185. self.pid = None # type: Optional[int]
  186. self.running = False
  187. self.soft_shutdown = False
  188. self.new_profiles = deque(maxlen=128) # type: Deque[ContinuousProfile]
  189. self.active_profiles = set() # type: Set[ContinuousProfile]
  190. def is_auto_start_enabled(self):
  191. # type: () -> bool
  192. # Ensure that the scheduler only autostarts once per process.
  193. # This is necessary because many web servers use forks to spawn
  194. # additional processes. And the profiler is only spawned on the
  195. # master process, then it often only profiles the main process
  196. # and not the ones where the requests are being handled.
  197. if self.pid == os.getpid():
  198. return False
  199. experiments = self.options.get("_experiments")
  200. if not experiments:
  201. return False
  202. return experiments.get("continuous_profiling_auto_start")
  203. def auto_start(self):
  204. # type: () -> Union[ContinuousProfile, None]
  205. if not self.sampled:
  206. return None
  207. if self.lifecycle != "trace":
  208. return None
  209. logger.debug("[Profiling] Auto starting profiler")
  210. profile = ContinuousProfile()
  211. self.new_profiles.append(profile)
  212. self.ensure_running()
  213. return profile
  214. def manual_start(self):
  215. # type: () -> None
  216. if not self.sampled:
  217. return
  218. if self.lifecycle != "manual":
  219. return
  220. self.ensure_running()
  221. def manual_stop(self):
  222. # type: () -> None
  223. if self.lifecycle != "manual":
  224. return
  225. self.teardown()
  226. def ensure_running(self):
  227. # type: () -> None
  228. raise NotImplementedError
  229. def teardown(self):
  230. # type: () -> None
  231. raise NotImplementedError
  232. def pause(self):
  233. # type: () -> None
  234. raise NotImplementedError
  235. def reset_buffer(self):
  236. # type: () -> None
  237. self.buffer = ProfileBuffer(
  238. self.options, self.sdk_info, PROFILE_BUFFER_SECONDS, self.capture_func
  239. )
  240. @property
  241. def profiler_id(self):
  242. # type: () -> Union[str, None]
  243. if self.buffer is None:
  244. return None
  245. return self.buffer.profiler_id
  246. def make_sampler(self):
  247. # type: () -> Callable[..., bool]
  248. cwd = os.getcwd()
  249. cache = LRUCache(max_size=256)
  250. if self.lifecycle == "trace":
  251. def _sample_stack(*args, **kwargs):
  252. # type: (*Any, **Any) -> bool
  253. """
  254. Take a sample of the stack on all the threads in the process.
  255. This should be called at a regular interval to collect samples.
  256. """
  257. # no profiles taking place, so we can stop early
  258. if not self.new_profiles and not self.active_profiles:
  259. return True
  260. # This is the number of profiles we want to pop off.
  261. # It's possible another thread adds a new profile to
  262. # the list and we spend longer than we want inside
  263. # the loop below.
  264. #
  265. # Also make sure to set this value before extracting
  266. # frames so we do not write to any new profiles that
  267. # were started after this point.
  268. new_profiles = len(self.new_profiles)
  269. ts = now()
  270. try:
  271. sample = [
  272. (str(tid), extract_stack(frame, cache, cwd))
  273. for tid, frame in sys._current_frames().items()
  274. ]
  275. except AttributeError:
  276. # For some reason, the frame we get doesn't have certain attributes.
  277. # When this happens, we abandon the current sample as it's bad.
  278. capture_internal_exception(sys.exc_info())
  279. return False
  280. # Move the new profiles into the active_profiles set.
  281. #
  282. # We cannot directly add the to active_profiles set
  283. # in `start_profiling` because it is called from other
  284. # threads which can cause a RuntimeError when it the
  285. # set sizes changes during iteration without a lock.
  286. #
  287. # We also want to avoid using a lock here so threads
  288. # that are starting profiles are not blocked until it
  289. # can acquire the lock.
  290. for _ in range(new_profiles):
  291. self.active_profiles.add(self.new_profiles.popleft())
  292. inactive_profiles = []
  293. for profile in self.active_profiles:
  294. if not profile.active:
  295. # If a profile is marked inactive, we buffer it
  296. # to `inactive_profiles` so it can be removed.
  297. # We cannot remove it here as it would result
  298. # in a RuntimeError.
  299. inactive_profiles.append(profile)
  300. for profile in inactive_profiles:
  301. self.active_profiles.remove(profile)
  302. if self.buffer is not None:
  303. self.buffer.write(ts, sample)
  304. return False
  305. else:
  306. def _sample_stack(*args, **kwargs):
  307. # type: (*Any, **Any) -> bool
  308. """
  309. Take a sample of the stack on all the threads in the process.
  310. This should be called at a regular interval to collect samples.
  311. """
  312. ts = now()
  313. try:
  314. sample = [
  315. (str(tid), extract_stack(frame, cache, cwd))
  316. for tid, frame in sys._current_frames().items()
  317. ]
  318. except AttributeError:
  319. # For some reason, the frame we get doesn't have certain attributes.
  320. # When this happens, we abandon the current sample as it's bad.
  321. capture_internal_exception(sys.exc_info())
  322. return False
  323. if self.buffer is not None:
  324. self.buffer.write(ts, sample)
  325. return False
  326. return _sample_stack
  327. def run(self):
  328. # type: () -> None
  329. last = time.perf_counter()
  330. while self.running:
  331. self.soft_shutdown = self.sampler()
  332. # some time may have elapsed since the last time
  333. # we sampled, so we need to account for that and
  334. # not sleep for too long
  335. elapsed = time.perf_counter() - last
  336. if elapsed < self.interval:
  337. thread_sleep(self.interval - elapsed)
  338. # the soft shutdown happens here to give it a chance
  339. # for the profiler to be reused
  340. if self.soft_shutdown:
  341. self.running = False
  342. # make sure to explicitly exit the profiler here or there might
  343. # be multiple profilers at once
  344. break
  345. # after sleeping, make sure to take the current
  346. # timestamp so we can use it next iteration
  347. last = time.perf_counter()
  348. if self.buffer is not None:
  349. self.buffer.flush()
  350. self.buffer = None
class ThreadContinuousScheduler(ContinuousScheduler):
    """
    This scheduler is based on running a daemon thread that will call
    the sampler at a regular interval.
    """

    mode = "thread"  # type: ContinuousProfilerMode
    # Name given to the sampler thread.
    name = "sentry.profiler.ThreadContinuousScheduler"

    def __init__(self, frequency, options, sdk_info, capture_func):
        # type: (int, Dict[str, Any], SDKInfo, Callable[[Envelope], None]) -> None
        super().__init__(frequency, options, sdk_info, capture_func)

        # The sampler thread; None until ensure_running starts one, and again
        # after teardown or a failed start.
        self.thread = None  # type: Optional[threading.Thread]
        # Serializes thread creation in ensure_running.
        self.lock = threading.Lock()

    def ensure_running(self):
        # type: () -> None
        """
        Start the sampler thread unless one is already running in this process.

        After a fork (different pid), a new thread and a new buffer are created.
        """
        # Cancel any pending soft shutdown: a running sampler thread re-checks
        # this flag after sleeping (see ContinuousScheduler.run) and keeps going.
        self.soft_shutdown = False

        pid = os.getpid()

        # is running on the right process
        if self.running and self.pid == pid:
            return

        with self.lock:
            # another thread may have tried to acquire the lock
            # at the same time so it may start another thread
            # make sure to check again before proceeding
            if self.running and self.pid == pid:
                return

            self.pid = pid
            self.running = True

            # if the profiler thread is changing,
            # we should create a new buffer along with it
            self.reset_buffer()

            # make sure the thread is a daemon here otherwise this
            # can keep the application running after other threads
            # have exited
            self.thread = threading.Thread(name=self.name, target=self.run, daemon=True)

            try:
                self.thread.start()
            except RuntimeError:
                # Unfortunately at this point the interpreter is in a state that no
                # longer allows us to spawn a thread and we have to bail.
                self.running = False
                self.thread = None

    def teardown(self):
        # type: () -> None
        """
        Signal the sampler thread to stop, wait for it, and drop the buffer.

        Clearing `running` ends the loop in `run`, which flushes the buffer
        before the thread exits; join() waits for that to finish.
        """
        if self.running:
            self.running = False

        if self.thread is not None:
            self.thread.join()
            self.thread = None

        self.buffer = None
class GeventContinuousScheduler(ContinuousScheduler):
    """
    This scheduler is based on the thread scheduler but adapted to work with
    gevent. When using gevent, it may monkey patch the threading modules
    (`threading` and `_thread`). This results in the use of greenlets instead
    of native threads.

    This is an issue because the sampler CANNOT run in a greenlet because
    1. Other greenlets doing sync work will prevent the sampler from running
    2. The greenlet runs in the same thread as other greenlets so when taking
       a sample, other greenlets will have been evicted from the thread. This
       results in a sample containing only the sampler's code.
    """

    mode = "gevent"  # type: ContinuousProfilerMode

    def __init__(self, frequency, options, sdk_info, capture_func):
        # type: (int, Dict[str, Any], SDKInfo, Callable[[Envelope], None]) -> None
        """
        :raises ValueError: If gevent's ThreadPool could not be imported.
        """

        # ThreadPool is None when the gevent import at module load failed.
        if ThreadPool is None:
            raise ValueError("Profiler mode: {} is not available".format(self.mode))

        super().__init__(frequency, options, sdk_info, capture_func)

        # Holds a one-worker gevent ThreadPool (not a Thread) that runs `run`.
        self.thread = None  # type: Optional[_ThreadPool]
        # Serializes pool creation in ensure_running.
        self.lock = threading.Lock()

    def ensure_running(self):
        # type: () -> None
        """
        Spawn `run` on a fresh one-worker ThreadPool unless the sampler is
        already running in this process.
        """
        # Cancel any pending soft shutdown so an already-running sampler keeps going.
        self.soft_shutdown = False

        pid = os.getpid()

        # is running on the right process
        if self.running and self.pid == pid:
            return

        with self.lock:
            # another thread may have tried to acquire the lock
            # at the same time so it may start another thread
            # make sure to check again before proceeding
            if self.running and self.pid == pid:
                return

            self.pid = pid
            self.running = True

            # if the profiler thread is changing,
            # we should create a new buffer along with it
            self.reset_buffer()

            self.thread = ThreadPool(1)  # type: ignore[misc]
            try:
                self.thread.spawn(self.run)
            except RuntimeError:
                # Unfortunately at this point the interpreter is in a state that no
                # longer allows us to spawn a thread and we have to bail.
                self.running = False
                self.thread = None

    def teardown(self):
        # type: () -> None
        """Signal the sampler to stop, join the pool, and drop the buffer."""
        if self.running:
            self.running = False

        if self.thread is not None:
            self.thread.join()
            self.thread = None

        self.buffer = None
  454. PROFILE_BUFFER_SECONDS = 60
  455. class ProfileBuffer:
  456. def __init__(self, options, sdk_info, buffer_size, capture_func):
  457. # type: (Dict[str, Any], SDKInfo, int, Callable[[Envelope], None]) -> None
  458. self.options = options
  459. self.sdk_info = sdk_info
  460. self.buffer_size = buffer_size
  461. self.capture_func = capture_func
  462. self.profiler_id = uuid.uuid4().hex
  463. self.chunk = ProfileChunk()
  464. # Make sure to use the same clock to compute a sample's monotonic timestamp
  465. # to ensure the timestamps are correctly aligned.
  466. self.start_monotonic_time = now()
  467. # Make sure the start timestamp is defined only once per profiler id.
  468. # This prevents issues with clock drift within a single profiler session.
  469. #
  470. # Subtracting the start_monotonic_time here to find a fixed starting position
  471. # for relative monotonic timestamps for each sample.
  472. self.start_timestamp = (
  473. datetime.now(timezone.utc).timestamp() - self.start_monotonic_time
  474. )
  475. def write(self, monotonic_time, sample):
  476. # type: (float, ExtractedSample) -> None
  477. if self.should_flush(monotonic_time):
  478. self.flush()
  479. self.chunk = ProfileChunk()
  480. self.start_monotonic_time = now()
  481. self.chunk.write(self.start_timestamp + monotonic_time, sample)
  482. def should_flush(self, monotonic_time):
  483. # type: (float) -> bool
  484. # If the delta between the new monotonic time and the start monotonic time
  485. # exceeds the buffer size, it means we should flush the chunk
  486. return monotonic_time - self.start_monotonic_time >= self.buffer_size
  487. def flush(self):
  488. # type: () -> None
  489. chunk = self.chunk.to_json(self.profiler_id, self.options, self.sdk_info)
  490. envelope = Envelope()
  491. envelope.add_profile_chunk(chunk)
  492. self.capture_func(envelope)
  493. class ProfileChunk:
  494. def __init__(self):
  495. # type: () -> None
  496. self.chunk_id = uuid.uuid4().hex
  497. self.indexed_frames = {} # type: Dict[FrameId, int]
  498. self.indexed_stacks = {} # type: Dict[StackId, int]
  499. self.frames = [] # type: List[ProcessedFrame]
  500. self.stacks = [] # type: List[ProcessedStack]
  501. self.samples = [] # type: List[ProcessedSample]
  502. def write(self, ts, sample):
  503. # type: (float, ExtractedSample) -> None
  504. for tid, (stack_id, frame_ids, frames) in sample:
  505. try:
  506. # Check if the stack is indexed first, this lets us skip
  507. # indexing frames if it's not necessary
  508. if stack_id not in self.indexed_stacks:
  509. for i, frame_id in enumerate(frame_ids):
  510. if frame_id not in self.indexed_frames:
  511. self.indexed_frames[frame_id] = len(self.indexed_frames)
  512. self.frames.append(frames[i])
  513. self.indexed_stacks[stack_id] = len(self.indexed_stacks)
  514. self.stacks.append(
  515. [self.indexed_frames[frame_id] for frame_id in frame_ids]
  516. )
  517. self.samples.append(
  518. {
  519. "timestamp": ts,
  520. "thread_id": tid,
  521. "stack_id": self.indexed_stacks[stack_id],
  522. }
  523. )
  524. except AttributeError:
  525. # For some reason, the frame we get doesn't have certain attributes.
  526. # When this happens, we abandon the current sample as it's bad.
  527. capture_internal_exception(sys.exc_info())
  528. def to_json(self, profiler_id, options, sdk_info):
  529. # type: (str, Dict[str, Any], SDKInfo) -> Dict[str, Any]
  530. profile = {
  531. "frames": self.frames,
  532. "stacks": self.stacks,
  533. "samples": self.samples,
  534. "thread_metadata": {
  535. str(thread.ident): {
  536. "name": str(thread.name),
  537. }
  538. for thread in threading.enumerate()
  539. },
  540. }
  541. set_in_app_in_frames(
  542. profile["frames"],
  543. options["in_app_exclude"],
  544. options["in_app_include"],
  545. options["project_root"],
  546. )
  547. payload = {
  548. "chunk_id": self.chunk_id,
  549. "client_sdk": {
  550. "name": sdk_info["name"],
  551. "version": VERSION,
  552. },
  553. "platform": "python",
  554. "profile": profile,
  555. "profiler_id": profiler_id,
  556. "version": "2",
  557. }
  558. for key in "release", "environment", "dist":
  559. if options[key] is not None:
  560. payload[key] = str(options[key]).strip()
  561. return payload