sessions.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. import os
  2. import warnings
  3. from threading import Thread, Lock, Event
  4. from contextlib import contextmanager
  5. import sentry_sdk
  6. from sentry_sdk.envelope import Envelope
  7. from sentry_sdk.session import Session
  8. from sentry_sdk.utils import format_timestamp
  9. from typing import TYPE_CHECKING
  10. if TYPE_CHECKING:
  11. from typing import Any
  12. from typing import Callable
  13. from typing import Dict
  14. from typing import Generator
  15. from typing import List
  16. from typing import Optional
  17. from typing import Union
  18. def is_auto_session_tracking_enabled(
  19. hub: "Optional[sentry_sdk.Hub]" = None,
  20. ) -> "Union[Any, bool, None]":
  21. """DEPRECATED: Utility function to find out if session tracking is enabled."""
  22. # Internal callers should use private _is_auto_session_tracking_enabled, instead.
  23. warnings.warn(
  24. "This function is deprecated and will be removed in the next major release. "
  25. "There is no public API replacement.",
  26. DeprecationWarning,
  27. stacklevel=2,
  28. )
  29. if hub is None:
  30. hub = sentry_sdk.Hub.current
  31. should_track = hub.scope._force_auto_session_tracking
  32. if should_track is None:
  33. client_options = hub.client.options if hub.client else {}
  34. should_track = client_options.get("auto_session_tracking", False)
  35. return should_track
  36. @contextmanager
  37. def auto_session_tracking(
  38. hub: "Optional[sentry_sdk.Hub]" = None, session_mode: str = "application"
  39. ) -> "Generator[None, None, None]":
  40. """DEPRECATED: Use track_session instead
  41. Starts and stops a session automatically around a block.
  42. """
  43. warnings.warn(
  44. "This function is deprecated and will be removed in the next major release. "
  45. "Use track_session instead.",
  46. DeprecationWarning,
  47. stacklevel=2,
  48. )
  49. if hub is None:
  50. hub = sentry_sdk.Hub.current
  51. with warnings.catch_warnings():
  52. warnings.simplefilter("ignore", DeprecationWarning)
  53. should_track = is_auto_session_tracking_enabled(hub)
  54. if should_track:
  55. hub.start_session(session_mode=session_mode)
  56. try:
  57. yield
  58. finally:
  59. if should_track:
  60. hub.end_session()
  61. def is_auto_session_tracking_enabled_scope(scope: "sentry_sdk.Scope") -> bool:
  62. """
  63. DEPRECATED: Utility function to find out if session tracking is enabled.
  64. """
  65. warnings.warn(
  66. "This function is deprecated and will be removed in the next major release. "
  67. "There is no public API replacement.",
  68. DeprecationWarning,
  69. stacklevel=2,
  70. )
  71. # Internal callers should use private _is_auto_session_tracking_enabled, instead.
  72. return _is_auto_session_tracking_enabled(scope)
  73. def _is_auto_session_tracking_enabled(scope: "sentry_sdk.Scope") -> bool:
  74. """
  75. Utility function to find out if session tracking is enabled.
  76. """
  77. should_track = scope._force_auto_session_tracking
  78. if should_track is None:
  79. client_options = sentry_sdk.get_client().options
  80. should_track = client_options.get("auto_session_tracking", False)
  81. return should_track
  82. @contextmanager
  83. def auto_session_tracking_scope(
  84. scope: "sentry_sdk.Scope", session_mode: str = "application"
  85. ) -> "Generator[None, None, None]":
  86. """DEPRECATED: This function is a deprecated alias for track_session.
  87. Starts and stops a session automatically around a block.
  88. """
  89. warnings.warn(
  90. "This function is a deprecated alias for track_session and will be removed in the next major release.",
  91. DeprecationWarning,
  92. stacklevel=2,
  93. )
  94. with track_session(scope, session_mode=session_mode):
  95. yield
  96. @contextmanager
  97. def track_session(
  98. scope: "sentry_sdk.Scope", session_mode: str = "application"
  99. ) -> "Generator[None, None, None]":
  100. """
  101. Start a new session in the provided scope, assuming session tracking is enabled.
  102. This is a no-op context manager if session tracking is not enabled.
  103. """
  104. should_track = _is_auto_session_tracking_enabled(scope)
  105. if should_track:
  106. scope.start_session(session_mode=session_mode)
  107. try:
  108. yield
  109. finally:
  110. if should_track:
  111. scope.end_session()
# Session status values regarded as terminal. Not referenced in this part of
# the file -- presumably consumed by other modules (TODO confirm).
TERMINAL_SESSION_STATES = ("exited", "abnormal", "crashed")
# Item count at which SessionFlusher.flush() sends the current envelope and
# continues with a fresh one.
MAX_ENVELOPE_ITEMS = 100
  114. def make_aggregate_envelope(aggregate_states: "Any", attrs: "Any") -> "Any":
  115. return {"attrs": dict(attrs), "aggregates": list(aggregate_states.values())}
  116. class SessionFlusher:
  117. def __init__(
  118. self,
  119. capture_func: "Callable[[Envelope], None]",
  120. flush_interval: int = 60,
  121. ) -> None:
  122. self.capture_func = capture_func
  123. self.flush_interval = flush_interval
  124. self.pending_sessions: "List[Any]" = []
  125. self.pending_aggregates: "Dict[Any, Any]" = {}
  126. self._thread: "Optional[Thread]" = None
  127. self._thread_lock = Lock()
  128. self._aggregate_lock = Lock()
  129. self._thread_for_pid: "Optional[int]" = None
  130. self.__shutdown_requested = Event()
  131. def flush(self) -> None:
  132. pending_sessions = self.pending_sessions
  133. self.pending_sessions = []
  134. with self._aggregate_lock:
  135. pending_aggregates = self.pending_aggregates
  136. self.pending_aggregates = {}
  137. envelope = Envelope()
  138. for session in pending_sessions:
  139. if len(envelope.items) == MAX_ENVELOPE_ITEMS:
  140. self.capture_func(envelope)
  141. envelope = Envelope()
  142. envelope.add_session(session)
  143. for attrs, states in pending_aggregates.items():
  144. if len(envelope.items) == MAX_ENVELOPE_ITEMS:
  145. self.capture_func(envelope)
  146. envelope = Envelope()
  147. envelope.add_sessions(make_aggregate_envelope(states, attrs))
  148. if len(envelope.items) > 0:
  149. self.capture_func(envelope)
  150. def _ensure_running(self) -> None:
  151. """
  152. Check that we have an active thread to run in, or create one if not.
  153. Note that this might fail (e.g. in Python 3.12 it's not possible to
  154. spawn new threads at interpreter shutdown). In that case self._running
  155. will be False after running this function.
  156. """
  157. if self._thread_for_pid == os.getpid() and self._thread is not None:
  158. return None
  159. with self._thread_lock:
  160. if self._thread_for_pid == os.getpid() and self._thread is not None:
  161. return None
  162. def _thread() -> None:
  163. running = True
  164. while running:
  165. running = not self.__shutdown_requested.wait(self.flush_interval)
  166. self.flush()
  167. thread = Thread(target=_thread)
  168. thread.daemon = True
  169. try:
  170. thread.start()
  171. except RuntimeError:
  172. # Unfortunately at this point the interpreter is in a state that no
  173. # longer allows us to spawn a thread and we have to bail.
  174. self.__shutdown_requested.set()
  175. return None
  176. self._thread = thread
  177. self._thread_for_pid = os.getpid()
  178. return None
  179. def add_aggregate_session(
  180. self,
  181. session: "Session",
  182. ) -> None:
  183. # NOTE on `session.did`:
  184. # the protocol can deal with buckets that have a distinct-id, however
  185. # in practice we expect the python SDK to have an extremely high cardinality
  186. # here, effectively making aggregation useless, therefore we do not
  187. # aggregate per-did.
  188. # For this part we can get away with using the global interpreter lock
  189. with self._aggregate_lock:
  190. attrs = session.get_json_attrs(with_user_info=False)
  191. primary_key = tuple(sorted(attrs.items()))
  192. secondary_key = session.truncated_started # (, session.did)
  193. states = self.pending_aggregates.setdefault(primary_key, {})
  194. state = states.setdefault(secondary_key, {})
  195. if "started" not in state:
  196. state["started"] = format_timestamp(session.truncated_started)
  197. # if session.did is not None:
  198. # state["did"] = session.did
  199. if session.status == "crashed":
  200. state["crashed"] = state.get("crashed", 0) + 1
  201. elif session.status == "abnormal":
  202. state["abnormal"] = state.get("abnormal", 0) + 1
  203. elif session.errors > 0:
  204. state["errored"] = state.get("errored", 0) + 1
  205. else:
  206. state["exited"] = state.get("exited", 0) + 1
  207. def add_session(
  208. self,
  209. session: "Session",
  210. ) -> None:
  211. if session.session_mode == "request":
  212. self.add_aggregate_session(session)
  213. else:
  214. self.pending_sessions.append(session.to_json())
  215. self._ensure_running()
  216. def kill(self) -> None:
  217. self.__shutdown_requested.set()