# continuous_profiler.py
  1. import atexit
  2. import os
  3. import random
  4. import sys
  5. import threading
  6. import time
  7. import uuid
  8. import warnings
  9. from collections import deque
  10. from datetime import datetime, timezone
  11. from sentry_sdk.consts import VERSION
  12. from sentry_sdk.envelope import Envelope
  13. from sentry_sdk._lru_cache import LRUCache
  14. from sentry_sdk.profiler.utils import (
  15. DEFAULT_SAMPLING_FREQUENCY,
  16. extract_stack,
  17. )
  18. from sentry_sdk.utils import (
  19. capture_internal_exception,
  20. is_gevent,
  21. logger,
  22. now,
  23. set_in_app_in_frames,
  24. )
from typing import TYPE_CHECKING

# The imports and type definitions below are only needed by static type
# checkers and are skipped at runtime, so annotations in this module that
# use these names are written as strings.
if TYPE_CHECKING:
    from typing import Any
    from typing import Callable
    from typing import Deque
    from typing import Dict
    from typing import List
    from typing import Optional
    from typing import Set
    from typing import Type
    from typing import Union
    from typing_extensions import TypedDict

    from sentry_sdk._types import ContinuousProfilerMode, SDKInfo
    from sentry_sdk.profiler.utils import (
        ExtractedSample,
        FrameId,
        StackId,
        ThreadId,
        ProcessedFrame,
        ProcessedStack,
    )

    # Shape of one entry in ProfileChunk.samples: the time the sample was
    # taken, the thread it belongs to, and the index of its stack in
    # ProfileChunk.stacks.
    ProcessedSample = TypedDict(
        "ProcessedSample",
        {
            "timestamp": float,
            "thread_id": ThreadId,
            "stack_id": int,
        },
    )
# When gevent is installed, keep a handle to its ThreadPool and to the
# original `time.sleep` as returned by gevent's `get_original`, so that `run`
# sleeps with the unpatched function even if gevent has patched `time`.
# Without gevent, fall back to the stdlib `time.sleep` and mark the gevent
# scheduler as unavailable.
try:
    from gevent.monkey import get_original
    from gevent.threadpool import ThreadPool as _ThreadPool

    ThreadPool: "Optional[Type[_ThreadPool]]" = _ThreadPool
    thread_sleep = get_original("time", "sleep")
except ImportError:
    thread_sleep = time.sleep
    # GeventContinuousScheduler raises ValueError on construction when this
    # is None.
    ThreadPool = None

# Process-wide scheduler instance. It is None until setup_continuous_profiler
# creates one, and is reset to None by teardown_continuous_profiler.
_scheduler: "Optional[ContinuousScheduler]" = None
  63. def setup_continuous_profiler(
  64. options: "Dict[str, Any]",
  65. sdk_info: "SDKInfo",
  66. capture_func: "Callable[[Envelope], None]",
  67. ) -> bool:
  68. global _scheduler
  69. already_initialized = _scheduler is not None
  70. if already_initialized:
  71. logger.debug("[Profiling] Continuous Profiler is already setup")
  72. teardown_continuous_profiler()
  73. if is_gevent():
  74. # If gevent has patched the threading modules then we cannot rely on
  75. # them to spawn a native thread for sampling.
  76. # Instead we default to the GeventContinuousScheduler which is capable of
  77. # spawning native threads within gevent.
  78. default_profiler_mode = GeventContinuousScheduler.mode
  79. else:
  80. default_profiler_mode = ThreadContinuousScheduler.mode
  81. if options.get("profiler_mode") is not None:
  82. profiler_mode = options["profiler_mode"]
  83. else:
  84. # TODO: deprecate this and just use the existing `profiler_mode`
  85. experiments = options.get("_experiments", {})
  86. profiler_mode = (
  87. experiments.get("continuous_profiling_mode") or default_profiler_mode
  88. )
  89. frequency = DEFAULT_SAMPLING_FREQUENCY
  90. if profiler_mode == ThreadContinuousScheduler.mode:
  91. _scheduler = ThreadContinuousScheduler(
  92. frequency, options, sdk_info, capture_func
  93. )
  94. elif profiler_mode == GeventContinuousScheduler.mode:
  95. _scheduler = GeventContinuousScheduler(
  96. frequency, options, sdk_info, capture_func
  97. )
  98. else:
  99. raise ValueError("Unknown continuous profiler mode: {}".format(profiler_mode))
  100. logger.debug(
  101. "[Profiling] Setting up continuous profiler in {mode} mode".format(
  102. mode=_scheduler.mode
  103. )
  104. )
  105. if not already_initialized:
  106. atexit.register(teardown_continuous_profiler)
  107. return True
  108. def is_profile_session_sampled() -> bool:
  109. if _scheduler is None:
  110. return False
  111. return _scheduler.sampled
  112. def try_autostart_continuous_profiler() -> None:
  113. # TODO: deprecate this as it'll be replaced by the auto lifecycle option
  114. if _scheduler is None:
  115. return
  116. if not _scheduler.is_auto_start_enabled():
  117. return
  118. _scheduler.manual_start()
  119. def try_profile_lifecycle_trace_start() -> "Union[ContinuousProfile, None]":
  120. if _scheduler is None:
  121. return None
  122. return _scheduler.auto_start()
  123. def start_profiler() -> None:
  124. if _scheduler is None:
  125. return
  126. _scheduler.manual_start()
  127. def start_profile_session() -> None:
  128. warnings.warn(
  129. "The `start_profile_session` function is deprecated. Please use `start_profile` instead.",
  130. DeprecationWarning,
  131. stacklevel=2,
  132. )
  133. start_profiler()
  134. def stop_profiler() -> None:
  135. if _scheduler is None:
  136. return
  137. _scheduler.manual_stop()
  138. def stop_profile_session() -> None:
  139. warnings.warn(
  140. "The `stop_profile_session` function is deprecated. Please use `stop_profile` instead.",
  141. DeprecationWarning,
  142. stacklevel=2,
  143. )
  144. stop_profiler()
  145. def teardown_continuous_profiler() -> None:
  146. stop_profiler()
  147. global _scheduler
  148. _scheduler = None
  149. def get_profiler_id() -> "Union[str, None]":
  150. if _scheduler is None:
  151. return None
  152. return _scheduler.profiler_id
  153. def determine_profile_session_sampling_decision(
  154. sample_rate: "Union[float, None]",
  155. ) -> bool:
  156. # `None` is treated as `0.0`
  157. if not sample_rate:
  158. return False
  159. return random.random() < float(sample_rate)
  160. class ContinuousProfile:
  161. active: bool = True
  162. def stop(self) -> None:
  163. self.active = False
class ContinuousScheduler:
    """
    Base class for the continuous profiler's schedulers.

    It owns the session sampling decision, the bookkeeping of profiles used
    by the "trace" lifecycle, the stack sampler, and the `run` loop that
    calls the sampler at a fixed interval. Subclasses implement
    `ensure_running` and `teardown`, which start and stop the worker that
    executes `run`.
    """

    # Overridden by each subclass; setup_continuous_profiler compares the
    # configured profiler mode against it.
    mode: "ContinuousProfilerMode" = "unknown"

    def __init__(
        self,
        frequency: int,
        options: "Dict[str, Any]",
        sdk_info: "SDKInfo",
        capture_func: "Callable[[Envelope], None]",
    ) -> None:
        """
        :param frequency: Samples per second; `run` samples roughly every
            ``1 / frequency`` seconds.
        :param options: SDK client options. `profile_lifecycle`,
            `profile_session_sample_rate` and `_experiments` are read by this
            class, and the dict is passed on to every ProfileBuffer.
        :param sdk_info: SDK metadata passed on to every ProfileBuffer.
        :param capture_func: Callback that receives each finished envelope.
        """
        self.interval = 1.0 / frequency
        self.options = options
        self.sdk_info = sdk_info
        self.capture_func = capture_func

        # `auto_start` only acts for "trace"; `manual_start`/`manual_stop`
        # only act for "manual".
        self.lifecycle = self.options.get("profile_lifecycle")
        profile_session_sample_rate = self.options.get("profile_session_sample_rate")
        # Decided once per scheduler; an unsampled session never starts
        # profiling.
        self.sampled = determine_profile_session_sampling_decision(
            profile_session_sample_rate
        )

        self.sampler = self.make_sampler()
        self.buffer: "Optional[ProfileBuffer]" = None
        # Process id the worker was last started in; subclasses compare it
        # against os.getpid() so a forked child starts its own worker.
        self.pid: "Optional[int]" = None

        self.running = False
        # Last value returned by the sampler; True makes `run` stop.
        self.soft_shutdown = False

        # Profiles queued by `auto_start` and moved into `active_profiles` by
        # the sampler. Bounded: beyond 128 pending entries the oldest ones
        # are discarded by the deque.
        self.new_profiles: "Deque[ContinuousProfile]" = deque(maxlen=128)
        self.active_profiles: "Set[ContinuousProfile]" = set()

    def is_auto_start_enabled(self) -> bool:
        """
        Return whether the experimental `continuous_profiling_auto_start`
        option should start the profiler now.

        NOTE: the option value is returned as-is from `_experiments`, so it
        may be a non-bool (e.g. None when the key is absent).
        """
        # Ensure that the scheduler only autostarts once per process.
        # This is necessary because many web servers fork to spawn
        # additional worker processes. If the profiler were only started in
        # the master process, it would often profile only that process and
        # not the workers that handle the requests.
        if self.pid == os.getpid():
            return False

        experiments = self.options.get("_experiments")
        if not experiments:
            return False

        return experiments.get("continuous_profiling_auto_start")

    def auto_start(self) -> "Union[ContinuousProfile, None]":
        """
        Register a new trace-lifecycle profile and make sure sampling runs.

        Returns the new profile handle, or None when the session is not
        sampled or the lifecycle is not "trace".
        """
        if not self.sampled:
            return None

        if self.lifecycle != "trace":
            return None

        logger.debug("[Profiling] Auto starting profiler")

        profile = ContinuousProfile()

        # Queued rather than added to `active_profiles` directly; the sampler
        # thread performs the move (see the comment in `make_sampler`).
        self.new_profiles.append(profile)
        self.ensure_running()

        return profile

    def manual_start(self) -> None:
        """Start sampling, if the session is sampled and the lifecycle is "manual"."""
        if not self.sampled:
            return

        if self.lifecycle != "manual":
            return

        self.ensure_running()

    def manual_stop(self) -> None:
        """Stop sampling via `teardown`; a no-op unless the lifecycle is "manual"."""
        if self.lifecycle != "manual":
            return

        self.teardown()

    def ensure_running(self) -> None:
        """Start the sampling worker if needed. Must be overridden."""
        raise NotImplementedError

    def teardown(self) -> None:
        """Stop the sampling worker. Must be overridden."""
        raise NotImplementedError

    def pause(self) -> None:
        """Pause sampling. Raises NotImplementedError unless overridden."""
        raise NotImplementedError

    def reset_buffer(self) -> None:
        """Replace the buffer with a fresh ProfileBuffer (and thus a new profiler id)."""
        self.buffer = ProfileBuffer(
            self.options, self.sdk_info, PROFILE_BUFFER_SECONDS, self.capture_func
        )

    @property
    def profiler_id(self) -> "Union[str, None]":
        """The current buffer's profiler id, or None when not running or no buffer exists."""
        if not self.running or self.buffer is None:
            return None
        return self.buffer.profiler_id

    def make_sampler(self) -> "Callable[..., bool]":
        """
        Build the callable that `run` invokes once per interval.

        The callable captures every thread's current stack and writes the
        result to the current buffer. It returns True to request a soft
        shutdown, which only the "trace" variant does (when it has no
        pending or active profiles), and False otherwise.
        """
        cwd = os.getcwd()

        # Shared by every sample this sampler takes; presumably lets
        # extract_stack reuse work for frames it has seen before — verify
        # against extract_stack.
        cache = LRUCache(max_size=256)

        if self.lifecycle == "trace":

            def _sample_stack(*args: "Any", **kwargs: "Any") -> bool:
                """
                Take a sample of the stack on all the threads in the process.
                This should be called at a regular interval to collect samples.

                Returns True (request a soft shutdown) when there are no
                pending or active profiles; False otherwise.
                """

                # no profiles taking place, so we can stop early
                if not self.new_profiles and not self.active_profiles:
                    return True

                # This is the number of profiles we want to pop off.
                # It's possible another thread adds a new profile to
                # the list and we spend longer than we want inside
                # the loop below.
                #
                # Also make sure to set this value before extracting
                # frames so we do not write to any new profiles that
                # were started after this point.
                new_profiles = len(self.new_profiles)

                ts = now()

                try:
                    sample = [
                        (str(tid), extract_stack(frame, cache, cwd))
                        for tid, frame in sys._current_frames().items()
                    ]
                except AttributeError:
                    # For some reason, the frame we get doesn't have certain attributes.
                    # When this happens, we abandon the current sample as it's bad.
                    capture_internal_exception(sys.exc_info())
                    return False

                # Move the new profiles into the active_profiles set.
                #
                # We cannot add them directly to the active_profiles set
                # in `auto_start` because it is called from other
                # threads, which can raise a RuntimeError if the set's
                # size changes during iteration without a lock.
                #
                # We also want to avoid using a lock here so threads
                # that are starting profiles are not blocked until they
                # can acquire the lock.
                for _ in range(new_profiles):
                    self.active_profiles.add(self.new_profiles.popleft())
                inactive_profiles = []

                for profile in self.active_profiles:
                    if not profile.active:
                        # If a profile is marked inactive, we buffer it
                        # to `inactive_profiles` so it can be removed.
                        # We cannot remove it here as it would result
                        # in a RuntimeError.
                        inactive_profiles.append(profile)

                for profile in inactive_profiles:
                    self.active_profiles.remove(profile)

                if self.buffer is not None:
                    self.buffer.write(ts, sample)

                return False

        else:

            def _sample_stack(*args: "Any", **kwargs: "Any") -> bool:
                """
                Take a sample of the stack on all the threads in the process.
                This should be called at a regular interval to collect samples.

                Always returns False: outside the "trace" lifecycle, the
                sampler never requests a soft shutdown.
                """

                ts = now()

                try:
                    sample = [
                        (str(tid), extract_stack(frame, cache, cwd))
                        for tid, frame in sys._current_frames().items()
                    ]
                except AttributeError:
                    # For some reason, the frame we get doesn't have certain attributes.
                    # When this happens, we abandon the current sample as it's bad.
                    capture_internal_exception(sys.exc_info())
                    return False

                if self.buffer is not None:
                    self.buffer.write(ts, sample)

                return False

        return _sample_stack

    def run(self) -> None:
        """
        Sampling loop, executed by the worker a subclass starts.

        Calls the sampler about every `self.interval` seconds until
        `self.running` is cleared or the sampler requests a soft shutdown,
        then flushes whatever the buffer holds.
        """
        last = time.perf_counter()

        while self.running:
            self.soft_shutdown = self.sampler()

            # some time may have elapsed since the last time
            # we sampled, so we need to account for that and
            # not sleep for too long
            elapsed = time.perf_counter() - last
            if elapsed < self.interval:
                thread_sleep(self.interval - elapsed)

            # The soft shutdown is checked after sleeping so the profiler can
            # be reused: an `ensure_running` call made during the sleep
            # resets `soft_shutdown` and keeps this worker alive.
            if self.soft_shutdown:
                self.running = False

                # make sure to explicitly exit the profiler here or there might
                # be multiple profilers at once
                break

            # after sleeping, make sure to take the current
            # timestamp so we can use it next iteration
            last = time.perf_counter()

        buffer = self.buffer
        if buffer is not None:
            buffer.flush()
  337. class ThreadContinuousScheduler(ContinuousScheduler):
  338. """
  339. This scheduler is based on running a daemon thread that will call
  340. the sampler at a regular interval.
  341. """
  342. mode: "ContinuousProfilerMode" = "thread"
  343. name = "sentry.profiler.ThreadContinuousScheduler"
  344. def __init__(
  345. self,
  346. frequency: int,
  347. options: "Dict[str, Any]",
  348. sdk_info: "SDKInfo",
  349. capture_func: "Callable[[Envelope], None]",
  350. ) -> None:
  351. super().__init__(frequency, options, sdk_info, capture_func)
  352. self.thread: "Optional[threading.Thread]" = None
  353. self.lock = threading.Lock()
  354. def ensure_running(self) -> None:
  355. self.soft_shutdown = False
  356. pid = os.getpid()
  357. # is running on the right process
  358. if self.running and self.pid == pid:
  359. return
  360. with self.lock:
  361. # another thread may have tried to acquire the lock
  362. # at the same time so it may start another thread
  363. # make sure to check again before proceeding
  364. if self.running and self.pid == pid:
  365. return
  366. self.pid = pid
  367. self.running = True
  368. # if the profiler thread is changing,
  369. # we should create a new buffer along with it
  370. self.reset_buffer()
  371. # make sure the thread is a daemon here otherwise this
  372. # can keep the application running after other threads
  373. # have exited
  374. self.thread = threading.Thread(name=self.name, target=self.run, daemon=True)
  375. try:
  376. self.thread.start()
  377. except RuntimeError:
  378. # Unfortunately at this point the interpreter is in a state that no
  379. # longer allows us to spawn a thread and we have to bail.
  380. self.running = False
  381. self.thread = None
  382. def teardown(self) -> None:
  383. if self.running:
  384. self.running = False
  385. if self.thread is not None:
  386. self.thread.join()
  387. self.thread = None
  388. self.buffer = None
  389. class GeventContinuousScheduler(ContinuousScheduler):
  390. """
  391. This scheduler is based on the thread scheduler but adapted to work with
  392. gevent. When using gevent, it may monkey patch the threading modules
  393. (`threading` and `_thread`). This results in the use of greenlets instead
  394. of native threads.
  395. This is an issue because the sampler CANNOT run in a greenlet because
  396. 1. Other greenlets doing sync work will prevent the sampler from running
  397. 2. The greenlet runs in the same thread as other greenlets so when taking
  398. a sample, other greenlets will have been evicted from the thread. This
  399. results in a sample containing only the sampler's code.
  400. """
  401. mode: "ContinuousProfilerMode" = "gevent"
  402. def __init__(
  403. self,
  404. frequency: int,
  405. options: "Dict[str, Any]",
  406. sdk_info: "SDKInfo",
  407. capture_func: "Callable[[Envelope], None]",
  408. ) -> None:
  409. if ThreadPool is None:
  410. raise ValueError("Profiler mode: {} is not available".format(self.mode))
  411. super().__init__(frequency, options, sdk_info, capture_func)
  412. self.thread: "Optional[_ThreadPool]" = None
  413. self.lock = threading.Lock()
  414. def ensure_running(self) -> None:
  415. self.soft_shutdown = False
  416. pid = os.getpid()
  417. # is running on the right process
  418. if self.running and self.pid == pid:
  419. return
  420. with self.lock:
  421. # another thread may have tried to acquire the lock
  422. # at the same time so it may start another thread
  423. # make sure to check again before proceeding
  424. if self.running and self.pid == pid:
  425. return
  426. self.pid = pid
  427. self.running = True
  428. # if the profiler thread is changing,
  429. # we should create a new buffer along with it
  430. self.reset_buffer()
  431. self.thread = ThreadPool(1) # type: ignore[misc]
  432. try:
  433. self.thread.spawn(self.run)
  434. except RuntimeError:
  435. # Unfortunately at this point the interpreter is in a state that no
  436. # longer allows us to spawn a thread and we have to bail.
  437. self.running = False
  438. self.thread = None
  439. def teardown(self) -> None:
  440. if self.running:
  441. self.running = False
  442. if self.thread is not None:
  443. self.thread.join()
  444. self.thread = None
  445. self.buffer = None
  446. PROFILE_BUFFER_SECONDS = 60
  447. class ProfileBuffer:
  448. def __init__(
  449. self,
  450. options: "Dict[str, Any]",
  451. sdk_info: "SDKInfo",
  452. buffer_size: int,
  453. capture_func: "Callable[[Envelope], None]",
  454. ) -> None:
  455. self.options = options
  456. self.sdk_info = sdk_info
  457. self.buffer_size = buffer_size
  458. self.capture_func = capture_func
  459. self.profiler_id = uuid.uuid4().hex
  460. self.chunk = ProfileChunk()
  461. # Make sure to use the same clock to compute a sample's monotonic timestamp
  462. # to ensure the timestamps are correctly aligned.
  463. self.start_monotonic_time = now()
  464. # Make sure the start timestamp is defined only once per profiler id.
  465. # This prevents issues with clock drift within a single profiler session.
  466. #
  467. # Subtracting the start_monotonic_time here to find a fixed starting position
  468. # for relative monotonic timestamps for each sample.
  469. self.start_timestamp = (
  470. datetime.now(timezone.utc).timestamp() - self.start_monotonic_time
  471. )
  472. def write(self, monotonic_time: float, sample: "ExtractedSample") -> None:
  473. if self.should_flush(monotonic_time):
  474. self.flush()
  475. self.chunk = ProfileChunk()
  476. self.start_monotonic_time = now()
  477. self.chunk.write(self.start_timestamp + monotonic_time, sample)
  478. def should_flush(self, monotonic_time: float) -> bool:
  479. # If the delta between the new monotonic time and the start monotonic time
  480. # exceeds the buffer size, it means we should flush the chunk
  481. return monotonic_time - self.start_monotonic_time >= self.buffer_size
  482. def flush(self) -> None:
  483. chunk = self.chunk.to_json(self.profiler_id, self.options, self.sdk_info)
  484. envelope = Envelope()
  485. envelope.add_profile_chunk(chunk)
  486. self.capture_func(envelope)
  487. class ProfileChunk:
  488. def __init__(self) -> None:
  489. self.chunk_id = uuid.uuid4().hex
  490. self.indexed_frames: "Dict[FrameId, int]" = {}
  491. self.indexed_stacks: "Dict[StackId, int]" = {}
  492. self.frames: "List[ProcessedFrame]" = []
  493. self.stacks: "List[ProcessedStack]" = []
  494. self.samples: "List[ProcessedSample]" = []
  495. def write(self, ts: float, sample: "ExtractedSample") -> None:
  496. for tid, (stack_id, frame_ids, frames) in sample:
  497. try:
  498. # Check if the stack is indexed first, this lets us skip
  499. # indexing frames if it's not necessary
  500. if stack_id not in self.indexed_stacks:
  501. for i, frame_id in enumerate(frame_ids):
  502. if frame_id not in self.indexed_frames:
  503. self.indexed_frames[frame_id] = len(self.indexed_frames)
  504. self.frames.append(frames[i])
  505. self.indexed_stacks[stack_id] = len(self.indexed_stacks)
  506. self.stacks.append(
  507. [self.indexed_frames[frame_id] for frame_id in frame_ids]
  508. )
  509. self.samples.append(
  510. {
  511. "timestamp": ts,
  512. "thread_id": tid,
  513. "stack_id": self.indexed_stacks[stack_id],
  514. }
  515. )
  516. except AttributeError:
  517. # For some reason, the frame we get doesn't have certain attributes.
  518. # When this happens, we abandon the current sample as it's bad.
  519. capture_internal_exception(sys.exc_info())
  520. def to_json(
  521. self, profiler_id: str, options: "Dict[str, Any]", sdk_info: "SDKInfo"
  522. ) -> "Dict[str, Any]":
  523. profile = {
  524. "frames": self.frames,
  525. "stacks": self.stacks,
  526. "samples": self.samples,
  527. "thread_metadata": {
  528. str(thread.ident): {
  529. "name": str(thread.name),
  530. }
  531. for thread in threading.enumerate()
  532. },
  533. }
  534. set_in_app_in_frames(
  535. profile["frames"],
  536. options["in_app_exclude"],
  537. options["in_app_include"],
  538. options["project_root"],
  539. )
  540. payload = {
  541. "chunk_id": self.chunk_id,
  542. "client_sdk": {
  543. "name": sdk_info["name"],
  544. "version": VERSION,
  545. },
  546. "platform": "python",
  547. "profile": profile,
  548. "profiler_id": profiler_id,
  549. "version": "2",
  550. }
  551. for key in "release", "environment", "dist":
  552. if options[key] is not None:
  553. payload[key] = str(options[key]).strip()
  554. return payload