| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
7771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132 |
- import os
- import sys
- import warnings
- from copy import copy, deepcopy
- from collections import deque
- from contextlib import contextmanager
- from enum import Enum
- from datetime import datetime, timezone
- from functools import wraps
- from itertools import chain
- import sentry_sdk
- from sentry_sdk._types import AnnotatedValue
- from sentry_sdk.attachments import Attachment
- from sentry_sdk.consts import (
- DEFAULT_MAX_BREADCRUMBS,
- FALSE_VALUES,
- INSTRUMENTER,
- SPANDATA,
- )
- from sentry_sdk.feature_flags import FlagBuffer, DEFAULT_FLAG_CAPACITY
- from sentry_sdk.profiler.continuous_profiler import (
- get_profiler_id,
- try_autostart_continuous_profiler,
- try_profile_lifecycle_trace_start,
- )
- from sentry_sdk.profiler.transaction_profiler import Profile
- from sentry_sdk.session import Session
- from sentry_sdk.tracing_utils import (
- Baggage,
- has_tracing_enabled,
- has_span_streaming_enabled,
- is_ignored_span,
- _make_sampling_decision,
- PropagationContext,
- )
- from sentry_sdk.traces import _DEFAULT_PARENT_SPAN, StreamedSpan, NoOpStreamedSpan
- from sentry_sdk.tracing import (
- BAGGAGE_HEADER_NAME,
- SENTRY_TRACE_HEADER_NAME,
- NoOpSpan,
- Span,
- Transaction,
- )
- from sentry_sdk.utils import (
- capture_internal_exception,
- capture_internal_exceptions,
- ContextVar,
- datetime_from_isoformat,
- disable_capture_event,
- event_from_exception,
- exc_info_from_error,
- format_attribute,
- logger,
- has_logs_enabled,
- has_metrics_enabled,
- )
- from typing import TYPE_CHECKING, cast
- if TYPE_CHECKING:
- from collections.abc import Mapping
- from typing import Any
- from typing import Callable
- from typing import Deque
- from typing import Dict
- from typing import Generator
- from typing import Iterator
- from typing import List
- from typing import Optional
- from typing import ParamSpec
- from typing import Tuple
- from typing import TypeVar
- from typing import Union
- from typing_extensions import Unpack
- from sentry_sdk._types import (
- Attributes,
- AttributeValue,
- Breadcrumb,
- BreadcrumbHint,
- ErrorProcessor,
- Event,
- EventProcessor,
- ExcInfo,
- Hint,
- Log,
- LogLevelStr,
- Metric,
- SamplingContext,
- Type,
- )
- from sentry_sdk.tracing import TransactionKwargs
- import sentry_sdk
- P = ParamSpec("P")
- R = TypeVar("R")
- F = TypeVar("F", bound=Callable[..., Any])
- T = TypeVar("T")
# Holds data that will be added to **all** events sent by this process.
# If this is an HTTP server (think web framework) serving multiple users,
# the data will be added to the events of all users.
# Typically this is used for process-wide data such as the release.
# Starts out as None; Scope.get_global_scope() creates it on first use.
_global_scope: "Optional[Scope]" = None

# Holds data for the active request.
# This is used to isolate data for different requests or users.
# The isolation scope is usually created by integrations, but may also
# be created manually.
_isolation_scope = ContextVar("isolation_scope", default=None)

# Holds data for the active span.
# This can be used to manually add additional data to a span.
_current_scope = ContextVar("current_scope", default=None)

# Event processors registered through add_global_event_processor().
global_event_processors: "List[EventProcessor]" = []

# A function returning a (trace_id, span_id) tuple
# from an external tracing source (such as otel).
# None means that no external source has been registered.
_external_propagation_context_fn: "Optional[Callable[[], Optional[Tuple[str, str]]]]" = None
class ScopeType(Enum):
    """Labels the kind of scope a :py:class:`Scope` instance represents."""

    # The scope holding data for the active span.
    CURRENT = "current"

    # The scope holding data for the active request.
    ISOLATION = "isolation"

    # The process-wide scope.
    GLOBAL = "global"

    # A scope produced by merging the other scopes together.
    MERGED = "merged"
class _ScopeManager:
    """
    Context manager that swaps in a fork of the isolation scope.

    Entering forks the isolation scope and installs the fork as the new
    isolation scope; exiting re-installs the scope that was active on entry.
    The replaced scopes are kept on a stack, so the same manager can be
    entered more than once.
    """

    def __init__(self, hub: "Optional[Any]" = None) -> None:
        # `hub` is accepted but not read by this class.
        self._old_scopes: "List[Scope]" = []

    def __enter__(self) -> "Scope":
        previous = Scope.get_isolation_scope()
        # Remember what to restore in __exit__.
        self._old_scopes.append(previous)

        replacement = previous.fork()
        _isolation_scope.set(replacement)
        return replacement

    def __exit__(self, exc_type: "Any", exc_value: "Any", tb: "Any") -> None:
        # Restore the most recently replaced isolation scope.
        _isolation_scope.set(self._old_scopes.pop())
def add_global_event_processor(processor: "EventProcessor") -> None:
    """
    Register an event processor in the module-level ``global_event_processors`` list.

    :param processor: The event processor to append to the list.
    """
    global_event_processors.append(processor)
def register_external_propagation_context(
    fn: "Callable[[], Optional[Tuple[str, str]]]",
) -> None:
    """
    Install `fn` as the external propagation context provider.

    Any previously registered provider is replaced.

    :param fn: Callable returning a ``(trace_id, span_id)`` tuple, or `None`.
    """
    global _external_propagation_context_fn

    _external_propagation_context_fn = fn
def remove_external_propagation_context() -> None:
    """Unregister the external propagation context provider, if any."""
    global _external_propagation_context_fn

    _external_propagation_context_fn = None
def get_external_propagation_context() -> "Optional[Tuple[str, str]]":
    """
    Ask the registered external provider for its propagation context.

    :returns: Whatever the registered provider returns (a
        ``(trace_id, span_id)`` tuple or `None`), or `None` when no
        provider is registered.
    """
    provider = _external_propagation_context_fn
    if not provider:
        return None
    return provider()
def has_external_propagation_context() -> bool:
    """Return whether an external propagation context provider is registered."""
    registered = _external_propagation_context_fn
    return registered is not None
def _attr_setter(fn: "Any") -> "Any":
    """
    Turn `fn` into a write-only property.

    The resulting property has no getter or deleter; `fn` is used as the
    setter and its docstring becomes the property's docstring.
    """
    return property(None, fn, None, fn.__doc__)
def _disable_capture(fn: "F") -> "F":
    """
    Decorator that prevents a method from being re-entered on the same object.

    While the wrapped method runs, ``self._should_capture`` is `False`.
    A call made while the flag is already `False` returns `None` without
    invoking `fn`. The flag is set back to `True` when the call finishes,
    whether it returned or raised.
    """

    @wraps(fn)
    def wrapper(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
        if self._should_capture:
            try:
                self._should_capture = False
                return fn(self, *args, **kwargs)
            finally:
                self._should_capture = True

        # A call is already in progress on this object: do nothing.
        return None

    return wrapper  # type: ignore
- class Scope:
- """The scope holds extra information that should be sent with all
- events that belong to it.
- """
- # NOTE: Even though it should not happen, the scope needs to not crash when
- # accessed by multiple threads. It's fine if it's full of races, but those
- # races should never make the user application crash.
- #
- # The same needs to hold for any accesses of the scope the SDK makes.
- __slots__ = (
- "_level",
- "_name",
- "_fingerprint",
- # note that for legacy reasons, _transaction is the transaction *name*,
- # not a Transaction object (the object is stored in _span)
- "_transaction",
- "_transaction_info",
- "_user",
- "_tags",
- "_contexts",
- "_extras",
- "_breadcrumbs",
- "_n_breadcrumbs_truncated",
- "_gen_ai_original_message_count",
- "_gen_ai_conversation_id",
- "_event_processors",
- "_error_processors",
- "_should_capture",
- "_span",
- "_session",
- "_attachments",
- "_force_auto_session_tracking",
- "_profile",
- "_propagation_context",
- "client",
- "_type",
- "_last_event_id",
- "_flags",
- "_attributes",
- )
def __init__(
    self,
    ty: "Optional[ScopeType]" = None,
    client: "Optional[sentry_sdk.Client]" = None,
) -> None:
    """
    Create a new scope.

    :param ty: The kind of scope being created; stored as ``_type``.
    :param client: The client to bind to this scope. When omitted, the
        scope keeps a :py:class:`sentry_sdk.client.NonRecordingClient`.
    """
    self._type = ty
    self._name: "Optional[str]" = None

    self._event_processors: "List[EventProcessor]" = []
    self._error_processors: "List[ErrorProcessor]" = []

    self._gen_ai_original_message_count: "Dict[str, int]" = {}
    self._n_breadcrumbs_truncated: int = 0
    self._propagation_context: "Optional[PropagationContext]" = None

    # Start out with a non-recording client and only swap it for a real
    # one when the caller supplied it.
    self.client: "sentry_sdk.client.BaseClient" = NonRecordingClient()
    if client is not None:
        self.set_client(client)

    # clear() assigns the remaining per-scope data attributes
    # (level, tags, span, ...).
    self.clear()

    # Pick up trace information passed in through the environment, if any.
    env_trace_data = self._load_trace_data_from_env()
    self.generate_propagation_context(incoming_data=env_trace_data)
def __copy__(self) -> "Scope":
    """
    Returns a copy of this scope.

    Container attributes (tags, contexts, extras, breadcrumbs, processors,
    attachments, ...) are copied one level deep, the flag buffer is
    deep-copied, and everything else is shared by reference with the
    original scope.
    """
    # Bypass __init__: every attribute is filled in from `self` below.
    clone: "Scope" = object.__new__(self.__class__)

    # Attributes carried over by reference.
    clone._type = self._type
    clone.client = self.client
    clone._name = self._name
    clone._level = self._level
    clone._fingerprint = self._fingerprint
    clone._transaction = self._transaction
    clone._user = self._user
    clone._propagation_context = self._propagation_context
    clone._should_capture = self._should_capture
    clone._span = self._span
    clone._session = self._session
    clone._force_auto_session_tracking = self._force_auto_session_tracking
    clone._profile = self._profile
    clone._last_event_id = self._last_event_id
    clone._n_breadcrumbs_truncated = self._n_breadcrumbs_truncated
    clone._gen_ai_conversation_id = self._gen_ai_conversation_id

    # Containers get their own (shallow) copy so that adding to or removing
    # from the clone's container does not affect the original's.
    clone._transaction_info = self._transaction_info.copy()
    clone._tags = self._tags.copy()
    clone._contexts = self._contexts.copy()
    clone._extras = self._extras.copy()
    clone._attributes = self._attributes.copy()
    clone._attachments = self._attachments.copy()
    clone._event_processors = self._event_processors.copy()
    clone._error_processors = self._error_processors.copy()
    clone._gen_ai_original_message_count = (
        self._gen_ai_original_message_count.copy()
    )
    clone._breadcrumbs = copy(self._breadcrumbs)

    # The flag buffer is the only attribute that is copied recursively.
    clone._flags = deepcopy(self._flags)

    return clone
@classmethod
def get_current_scope(cls) -> "Scope":
    """
    .. versionadded:: 2.0.0

    Returns the current scope, creating and registering a new one of type
    ``ScopeType.CURRENT`` if none is set yet.
    """
    active = _current_scope.get()
    if active is not None:
        return active

    # Nothing set in this context yet: create the scope lazily.
    created = Scope(ty=ScopeType.CURRENT)
    _current_scope.set(created)
    return created
@classmethod
def set_current_scope(cls, new_current_scope: "Scope") -> None:
    """
    .. versionadded:: 2.0.0

    Replaces the current scope with the given scope.

    :param new_current_scope: The scope to install as the current scope.
    """
    _current_scope.set(new_current_scope)
@classmethod
def get_isolation_scope(cls) -> "Scope":
    """
    .. versionadded:: 2.0.0

    Returns the isolation scope, creating and registering a new one of type
    ``ScopeType.ISOLATION`` if none is set yet.
    """
    active = _isolation_scope.get()
    if active is not None:
        return active

    # Nothing set in this context yet: create the scope lazily.
    created = Scope(ty=ScopeType.ISOLATION)
    _isolation_scope.set(created)
    return created
@classmethod
def set_isolation_scope(cls, new_isolation_scope: "Scope") -> None:
    """
    .. versionadded:: 2.0.0

    Replaces the isolation scope with the given scope.

    :param new_isolation_scope: The scope to install as the isolation scope.
    """
    _isolation_scope.set(new_isolation_scope)
@classmethod
def get_global_scope(cls) -> "Scope":
    """
    .. versionadded:: 2.0.0

    Returns the global scope, creating it with type ``ScopeType.GLOBAL``
    on first use.
    """
    global _global_scope

    if _global_scope is not None:
        return _global_scope

    created = Scope(ty=ScopeType.GLOBAL)
    _global_scope = created
    return created
def set_global_attributes(self) -> None:
    """
    Set SDK and client-derived attributes on this scope.

    The SDK name and version are always set. Server address, environment
    and release are read from the options of the client returned by
    ``sentry_sdk.get_client()`` and are only set when the option has a
    truthy value.
    """
    from sentry_sdk.client import SDK_INFO

    self.set_attribute("sentry.sdk.name", SDK_INFO["name"])
    self.set_attribute("sentry.sdk.version", SDK_INFO["version"])

    options = sentry_sdk.get_client().options

    # (attribute name, client option it is read from)
    option_backed_attributes = (
        (SPANDATA.SERVER_ADDRESS, "server_name"),
        ("sentry.environment", "environment"),
        ("sentry.release", "release"),
    )
    for attribute_name, option_name in option_backed_attributes:
        option_value = options.get(option_name)
        if option_value:
            self.set_attribute(attribute_name, option_value)
@classmethod
def last_event_id(cls) -> "Optional[str]":
    """
    .. versionadded:: 2.2.0

    Returns the event ID of the event most recently captured by the isolation scope, or None if
    no event has been captured. Events that are dropped, e.g. by a before_send hook, are not
    considered. Transactions are not considered events in this context either.

    The event corresponding to the returned event ID is NOT guaranteed to actually be sent to
    Sentry; whether the event is sent depends on the transport. The event could be sent later or
    not at all. Even a sent event could fail to arrive in Sentry due to network issues, exhausted
    quotas, or various other reasons.
    """
    isolation = cls.get_isolation_scope()
    return isolation._last_event_id
def _merge_scopes(
    self,
    additional_scope: "Optional[Scope]" = None,
    additional_scope_kwargs: "Optional[Dict[str, Any]]" = None,
) -> "Scope":
    """
    Merges global, isolation and current scope into a new scope and
    adds the given additional scope or additional scope kwargs to it.

    Data is applied in the order global, isolation, current, then this
    scope (when it is neither the current nor the isolation scope), and
    finally the additional scope or kwargs.

    :param additional_scope: A scope to merge on top, or a callable that
        is invoked with the merged scope.
    :param additional_scope_kwargs: Keyword arguments forwarded to
        ``update_from_kwargs`` on the merged scope.
    :raises TypeError: If both `additional_scope` and
        `additional_scope_kwargs` are provided.
    """
    if additional_scope and additional_scope_kwargs:
        raise TypeError("cannot provide scope and kwargs")

    # Start from a copy of the global scope, or from an empty scope when
    # no global scope has been created yet.
    if _global_scope is not None:
        merged = copy(_global_scope)
    else:
        merged = Scope()
    merged._type = ScopeType.MERGED

    isolation = _isolation_scope.get()
    if isolation is not None:
        merged.update_from_scope(isolation)

    current = _current_scope.get()
    if current is not None:
        merged.update_from_scope(current)

    # Avoid applying this scope twice when it was already merged above.
    if self != current and self != isolation:
        merged.update_from_scope(self)

    if additional_scope is not None:
        if callable(additional_scope):
            additional_scope(merged)
        else:
            merged.update_from_scope(additional_scope)
    elif additional_scope_kwargs:
        merged.update_from_kwargs(**additional_scope_kwargs)

    return merged
@classmethod
def get_client(cls) -> "sentry_sdk.client.BaseClient":
    """
    .. versionadded:: 2.0.0

    Returns the currently used :py:class:`sentry_sdk.Client`.
    The current scope, the isolation scope and the global scope are checked in that order,
    and the first client that reports itself as active is returned.
    If no active client is found, a new :py:class:`sentry_sdk.client.NonRecordingClient` is returned.
    """
    # Scopes are looked up lazily, in priority order.
    scope_lookups = (
        _current_scope.get,
        _isolation_scope.get,
        lambda: _global_scope,
    )

    for lookup in scope_lookups:
        # A scope may be missing (None); treat that the same as a scope
        # that has no client.
        candidate = getattr(lookup(), "client", None)
        if candidate is not None and candidate.is_active():
            return candidate

    return NonRecordingClient()
def set_client(
    self, client: "Optional[sentry_sdk.client.BaseClient]" = None
) -> None:
    """
    .. versionadded:: 2.0.0

    Sets the client for this scope.

    :param client: The client to use in this scope.
        If `None` the client of the scope will be replaced by a :py:class:`sentry_sdk.NonRecordingClient`.
    """
    if client is None:
        self.client = NonRecordingClient()
        return

    self.client = client

    # The initial global attributes mostly come from client options, so
    # (re)populate them on the global scope as soon as a client is set.
    sentry_sdk.get_global_scope().set_global_attributes()
def fork(self) -> "Scope":
    """
    .. versionadded:: 2.0.0

    Returns a fork of this scope, i.e. the result of ``copy.copy(self)``.
    """
    return copy(self)
def _load_trace_data_from_env(self) -> "Optional[Dict[str, str]]":
    """
    Load Sentry trace id and baggage from environment variables.

    Reads ``SENTRY_TRACE`` and ``SENTRY_BAGGAGE`` and returns them keyed by
    their header names. Returns `None` when neither variable holds a
    non-empty value.

    Can be disabled by setting SENTRY_USE_ENVIRONMENT to "false".
    """
    toggle = (os.environ.get("SENTRY_USE_ENVIRONMENT") or "").lower()
    if toggle in FALSE_VALUES:
        return None

    # (environment variable, header name it is stored under)
    env_to_header = (
        ("SENTRY_TRACE", SENTRY_TRACE_HEADER_NAME),
        ("SENTRY_BAGGAGE", BAGGAGE_HEADER_NAME),
    )

    trace_data: "Dict[str, str]" = {}
    for env_name, header_name in env_to_header:
        env_value = os.environ.get(env_name)
        if env_value:
            trace_data[header_name] = env_value

    # An empty dict is reported as "no incoming data".
    return trace_data or None
def set_new_propagation_context(self) -> None:
    """
    Replaces `_propagation_context` with a newly created propagation context,
    discarding any existing one.
    """
    fresh_context = PropagationContext()
    self._propagation_context = fresh_context
def generate_propagation_context(
    self, incoming_data: "Optional[Dict[str, str]]" = None
) -> None:
    """
    Makes sure the propagation context is set on the scope.

    If `incoming_data` is given, the existing propagation context is
    overwritten with one built from that data.
    Afterwards, if the scope still has no propagation context, a new one is
    created. This last step is skipped on scopes of type
    ``ScopeType.CURRENT``.

    :param incoming_data: Incoming trace information, or `None`.
    """
    if incoming_data is not None:
        self._propagation_context = PropagationContext.from_incoming_data(
            incoming_data
        )

    # TODO-neel this below is a BIG code smell but requires a bunch of other refactoring
    needs_default = (
        self._type != ScopeType.CURRENT and self._propagation_context is None
    )
    if needs_default:
        self.set_new_propagation_context()
def get_dynamic_sampling_context(self) -> "Optional[Dict[str, str]]":
    """
    Returns the Dynamic Sampling Context from this scope's Propagation Context.

    Returns `None` when the scope has no propagation context.

    Deprecated: Logic moved to PropagationContext, don't use directly.
    """
    context = self._propagation_context
    return None if context is None else context.dynamic_sampling_context
def get_traceparent(self, *args: "Any", **kwargs: "Any") -> "Optional[str]":
    """
    Returns the Sentry "sentry-trace" header (aka the traceparent) from the
    currently active span or the scopes Propagation Context.

    The span is used when tracing is enabled and the scope has a span that
    is not a no-op streamed span; otherwise the active propagation context
    is used.
    """
    if has_tracing_enabled(self.get_client().options):
        active_span = self.span
        if active_span is not None and not isinstance(
            active_span, NoOpStreamedSpan
        ):
            return active_span._to_traceparent()

    # No usable span: fall back to the propagation context.
    return self.get_active_propagation_context().to_traceparent()
def get_baggage(self, *args: "Any", **kwargs: "Any") -> "Optional[Baggage]":
    """
    Returns the Sentry "baggage" header containing trace information from the
    currently active span or the scopes Propagation Context.

    The span is used when tracing is enabled and the scope has a span that
    is not a no-op streamed span; otherwise the active propagation context
    is used.
    """
    if has_tracing_enabled(self.get_client().options):
        active_span = self.span
        if active_span is not None and not isinstance(
            active_span, NoOpStreamedSpan
        ):
            return active_span._to_baggage()

    # No usable span: fall back to the propagation context.
    return self.get_active_propagation_context().get_baggage()
def get_trace_context(self) -> "Dict[str, Any]":
    """
    Returns the Sentry "trace" context.

    Sources are tried in this order: the scope's span (when tracing is
    enabled and the span is not a no-op streamed span), an externally
    registered propagation context, and finally the active Propagation
    Context.
    """
    if has_tracing_enabled(self.get_client().options):
        active_span = self._span
        if active_span is not None and not isinstance(
            active_span, NoOpStreamedSpan
        ):
            return active_span._get_trace_context()

    # if we are tracing externally (otel), those values take precedence
    external = get_external_propagation_context()
    if external:
        external_trace_id, external_span_id = external
        return {"trace_id": external_trace_id, "span_id": external_span_id}

    context = self.get_active_propagation_context()
    trace_context = {
        "trace_id": context.trace_id,
        "span_id": context.span_id,
        "parent_span_id": context.parent_span_id,
        "dynamic_sampling_context": context.dynamic_sampling_context,
    }
    return trace_context
def trace_propagation_meta(self, *args: "Any", **kwargs: "Any") -> str:
    """
    Return meta tags which should be injected into HTML templates
    to allow propagation of trace information.

    One ``<meta name="..." content="...">`` tag is produced per header
    yielded by :py:meth:`iter_trace_propagation_headers`. Header names and
    values are HTML-escaped so that they cannot terminate the attribute or
    the tag they are placed in.

    :param span: Deprecated and ignored; passing it only logs a warning.
    :returns: The concatenated meta tags, or an empty string when there are
        no headers to propagate.
    """
    from html import escape

    span = kwargs.pop("span", None)
    if span is not None:
        logger.warning(
            "The parameter `span` in trace_propagation_meta() is deprecated and will be removed in the future."
        )

    return "".join(
        '<meta name="{}" content="{}">'.format(
            escape(str(name), quote=True), escape(str(content), quote=True)
        )
        for name, content in self.iter_trace_propagation_headers()
    )
def iter_headers(self) -> "Iterator[Tuple[str, str]]":
    """
    Creates a generator which yields the headers produced by this scope's
    Propagation Context (the `sentry-trace` and `baggage` headers).
    Nothing is yielded when the scope has no propagation context.

    Deprecated: use PropagationContext.iter_headers instead.
    """
    context = self._propagation_context
    if context is None:
        return

    yield from context.iter_headers()
def iter_trace_propagation_headers(
    self, *args: "Any", **kwargs: "Any"
) -> "Generator[Tuple[str, str], None, None]":
    """
    Return HTTP headers which allow propagation of trace data.

    If a span is given (via the `span` keyword argument), the trace data
    will be taken from the span. If no span is given, the span on the scope
    is used. When there is no usable span, the headers come from the active
    propagation context, unless an external propagation context is
    registered, in which case nothing is yielded.

    Nothing is yielded when the client option `propagate_traces` is falsy.
    """
    client = self.get_client()

    if not client.options.get("propagate_traces"):
        warnings.warn(
            "The `propagate_traces` parameter is deprecated. Please use `trace_propagation_targets` instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return

    # An explicitly passed span wins over the span stored on the scope.
    span = kwargs.pop("span", None) or self.span

    span_is_usable = (
        has_tracing_enabled(client.options)
        and span is not None
        and not isinstance(span, NoOpStreamedSpan)
    )
    if span_is_usable:
        yield from span._iter_headers()
        return

    if has_external_propagation_context():
        # when we have an external_propagation_context (otlp)
        # we leave outgoing propagation to the propagator
        return

    yield from self.get_active_propagation_context().iter_headers()
def get_active_propagation_context(self) -> "PropagationContext":
    """
    Returns the propagation context that applies to this scope.

    The first one found is returned, looking at this scope, then the
    current scope, then the isolation scope. If the isolation scope has
    none either, a new one is created and stored on the isolation scope.
    """
    own_context = self._propagation_context
    if own_context is not None:
        return own_context

    current_context = self.get_current_scope()._propagation_context
    if current_context is not None:
        return current_context

    isolation = self.get_isolation_scope()
    # should actually never happen, but just in case someone calls scope.clear
    if isolation._propagation_context is None:
        isolation._propagation_context = PropagationContext()

    return isolation._propagation_context
def set_custom_sampling_context(
    self, custom_sampling_context: "dict[str, Any]"
) -> None:
    """
    Hand the given custom sampling context to the active propagation context.

    :param custom_sampling_context: The sampling context to store.
    """
    active_context = self.get_active_propagation_context()
    active_context._set_custom_sampling_context(custom_sampling_context)
def clear(self) -> None:
    """
    Clears the entire scope.

    Every data attribute is reset to its empty value, breadcrumbs are
    cleared via :py:meth:`clear_breadcrumbs`, and the propagation context
    is dropped.
    """
    # Event-level overrides.
    self._level: "Optional[LogLevelStr]" = None
    self._fingerprint: "Optional[List[str]]" = None
    self._transaction: "Optional[str]" = None
    self._transaction_info: "dict[str, str]" = {}
    self._user: "Optional[Dict[str, Any]]" = None

    # Free-form data attached to events.
    self._tags: "Dict[str, Any]" = {}
    self._contexts: "Dict[str, Dict[str, Any]]" = {}
    self._extras: "dict[str, Any]" = {}
    self._attachments: "List[Attachment]" = []

    self.clear_breadcrumbs()

    self._should_capture: bool = True

    # Tracing, session and profiling state.
    self._span: "Optional[Union[Span, StreamedSpan]]" = None
    self._session: "Optional[Session]" = None
    self._force_auto_session_tracking: "Optional[bool]" = None
    self._profile: "Optional[Profile]" = None
    self._propagation_context = None

    # self._last_event_id is only applicable to isolation scopes
    self._last_event_id: "Optional[str]" = None

    self._flags: "Optional[FlagBuffer]" = None
    self._attributes: "Attributes" = {}
    self._gen_ai_conversation_id: "Optional[str]" = None
@_attr_setter
def level(self, value: "LogLevelStr") -> None:
    """
    Write-only attribute that overrides the level when assigned.

    Each assignment logs a deprecation warning.

    .. deprecated:: 1.0.0
        Use :func:`set_level` instead.

    :param value: The level to set.
    """
    logger.warning(
        "Deprecated: use .set_level() instead. This will be removed in the future."
    )

    self._level = value
def set_level(self, value: "LogLevelStr") -> None:
    """
    Stores the given level on the scope.

    :param value: The level to set.
    """
    self._level = value
@_attr_setter
def fingerprint(self, value: "Optional[List[str]]") -> None:
    """Write-only attribute; when set this overrides the default fingerprint."""
    self._fingerprint = value
@property
def transaction(self) -> "Any":
    # would be type: () -> Optional[Transaction], see https://github.com/python/mypy/issues/3004
    """
    Return the transaction (root span) in the scope, if any.

    Returns `None` when the scope has no span, when the span is a streamed
    span (a deprecation warning is emitted in that case), or when the span
    does not belong to a transaction.
    """
    active_span = self._span

    # there is no span/transaction on the scope
    if active_span is None:
        return None

    if isinstance(active_span, StreamedSpan):
        warnings.warn(
            "Scope.transaction is not available in streaming mode.",
            DeprecationWarning,
            stacklevel=2,
        )
        return None

    # This is None for an orphan span. Otherwise it is the transaction the
    # span belongs to (a transaction is its own containing transaction).
    return active_span.containing_transaction


@transaction.setter
def transaction(self, value: "Any") -> None:
    # would be type: (Optional[str]) -> None, see https://github.com/python/mypy/issues/3004
    """
    When set this forces a specific transaction name to be set.

    Deprecated: use set_transaction_name instead.
    """
    # XXX: the docstring above is misleading. The implementation of
    # apply_to_event prefers an existing value of event.transaction over
    # anything set in the scope.
    # XXX: the getter and the setter of Scope.transaction do not match,
    # neither semantically nor in type: the getter returns a Transaction,
    # the setter sets a transaction name. Without breaking version
    # compatibility, the setter could set either a transaction name or a
    # transaction (self._span) depending on the type of the value argument.

    logger.warning(
        "Assigning to scope.transaction directly is deprecated: use scope.set_transaction_name() instead."
    )
    self._transaction = value

    active_span = self._span
    if not active_span:
        return

    if isinstance(active_span, StreamedSpan):
        warnings.warn(
            "Scope.transaction is not available in streaming mode.",
            DeprecationWarning,
            stacklevel=2,
        )
        return None

    # Keep the name of the containing transaction in sync, if there is one.
    root = active_span.containing_transaction
    if root:
        root.name = value
def set_transaction_name(self, name: str, source: "Optional[str]" = None) -> None:
    """
    Set the transaction name and optionally the transaction source.

    The name is stored on the scope and, when the scope has a span, also
    applied to the span's segment (streamed spans) or to its containing
    transaction. Nothing beyond storing the name happens for no-op
    streamed spans.

    :param name: The transaction name.
    :param source: The transaction source, if any.
    """
    self._transaction = name

    active_span = self._span
    if active_span:
        if isinstance(active_span, NoOpStreamedSpan):
            return

        if isinstance(active_span, StreamedSpan):
            segment = active_span._segment
            segment.name = name
            if source:
                segment.set_attribute(
                    "sentry.span.source", getattr(source, "value", source)
                )
        else:
            root = active_span.containing_transaction
            if root:
                root.name = name
                if source:
                    root.source = source

    if source:
        self._transaction_info["source"] = source
@_attr_setter
def user(self, value: "Optional[Dict[str, Any]]") -> None:
    """Bind a user to the scope. Deprecated in favor of ``set_user``.

    Emits a ``DeprecationWarning`` and then delegates to ``set_user``.
    """
    deprecation_message = (
        "The `Scope.user` setter is deprecated in favor of `Scope.set_user()`."
    )
    warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)
    self.set_user(value)
def set_user(self, value: "Optional[Dict[str, Any]]") -> None:
    """Set the user for the scope.

    The value is stored on this scope and, if the isolation scope currently
    has a session, that session is updated with the same user.
    """
    self._user = value

    active_session = self.get_isolation_scope()._session
    if active_session is None:
        return
    active_session.update(user=value)
@property
def span(self) -> "Optional[Union[Span, StreamedSpan]]":
    """The tracing span or transaction currently stored on this scope (get/set)."""
    current = self._span
    return current
@span.setter
def span(self, span: "Optional[Union[Span, StreamedSpan]]") -> None:
    """Store ``span`` on the scope.

    When ``span`` is a ``Transaction`` with a truthy name, the scope's
    transaction name is updated from it, and its source (if truthy) is
    recorded in the scope's transaction info.
    """
    self._span = span

    # XXX: this differs from the implementation in JS, there Scope.setSpan
    # does not set Scope._transactionName.
    if not isinstance(span, Transaction):
        return
    if not span.name:
        # Without a name, neither the name nor the source is copied.
        return

    self._transaction = span.name
    if span.source:
        self._transaction_info["source"] = span.source
@property
def profile(self) -> "Optional[Profile]":
    """The profile currently stored on this scope, or ``None``."""
    current = self._profile
    return current
@profile.setter
def profile(self, profile: "Optional[Profile]") -> None:
    """Replace the profile stored on this scope."""
    self._profile = profile
def set_tag(self, key: str, value: "Any") -> None:
    """
    Set a single tag on the scope, overwriting any existing value.

    :param key: Key of the tag to set.

    :param value: Value of the tag to set.
    """
    scope_tags = self._tags
    scope_tags[key] = value
def set_tags(self, tags: "Mapping[str, object]") -> None:
    """Set several tags in one call.

    The tags are passed as a dictionary or other mapping type. The effect is
    the same as calling `set_tag` for every key-value pair: keys that already
    exist on the scope get the new value, keys that do not exist are added.

    Only keys present in `tags` are touched, so `scope.set_tags({})` is a
    no-op.

    :param tags: A mapping of tag keys to tag values to set.
    """
    scope_tags = self._tags
    scope_tags.update(tags)
def remove_tag(self, key: str) -> None:
    """
    Remove a tag from the scope; unknown keys are ignored.

    :param key: Key of the tag to remove.
    """
    scope_tags = self._tags
    scope_tags.pop(key, None)
def set_context(
    self,
    key: str,
    value: "Dict[str, Any]",
) -> None:
    """
    Bind ``value`` as the context stored under ``key``, replacing any
    context previously stored there.
    """
    scope_contexts = self._contexts
    scope_contexts[key] = value
def remove_context(
    self,
    key: str,
) -> None:
    """Remove the context stored under ``key``; unknown keys are ignored."""
    scope_contexts = self._contexts
    scope_contexts.pop(key, None)
def set_extra(
    self,
    key: str,
    value: "Any",
) -> None:
    """Store ``value`` under the extra key ``key``, overwriting any old value."""
    scope_extras = self._extras
    scope_extras[key] = value
def remove_extra(
    self,
    key: str,
) -> None:
    """Remove the extra stored under ``key``; unknown keys are ignored."""
    scope_extras = self._extras
    scope_extras.pop(key, None)
def set_conversation_id(self, conversation_id: str) -> None:
    """
    Store the conversation ID used for gen_ai spans on this scope.

    :param conversation_id: The conversation ID to set.
    """
    new_id = conversation_id
    self._gen_ai_conversation_id = new_id
def get_conversation_id(self) -> "Optional[str]":
    """
    Return the conversation ID used for gen_ai spans.

    :returns: The conversation ID, or None if not set.
    """
    stored_id = self._gen_ai_conversation_id
    return stored_id
def remove_conversation_id(self) -> None:
    """Reset the stored conversation ID to ``None``."""
    cleared = None
    self._gen_ai_conversation_id = cleared
def clear_breadcrumbs(self) -> None:
    """Replace the breadcrumb buffer with an empty one and reset the truncation counter."""
    # The two assignments are independent of each other.
    self._n_breadcrumbs_truncated = 0
    self._breadcrumbs: "Deque[Breadcrumb]" = deque()
def add_attachment(
    self,
    bytes: "Union[None, bytes, Callable[[], bytes]]" = None,
    filename: "Optional[str]" = None,
    path: "Optional[str]" = None,
    content_type: "Optional[str]" = None,
    add_to_transactions: bool = False,
) -> None:
    """Queue an attachment for future events sent from this scope.

    All parameters are forwarded unchanged to the
    :py:class:`sentry_sdk.attachments.Attachment` constructor.
    """
    attachment = Attachment(
        bytes=bytes,
        path=path,
        filename=filename,
        content_type=content_type,
        add_to_transactions=add_to_transactions,
    )
    self._attachments.append(attachment)
def add_breadcrumb(
    self,
    crumb: "Optional[Breadcrumb]" = None,
    hint: "Optional[BreadcrumbHint]" = None,
    **kwargs: "Any",
) -> None:
    """
    Record a breadcrumb on the scope.

    Nothing is recorded when the client is not active or when ``crumb`` and
    ``kwargs`` together are empty. Missing ``timestamp`` and ``type`` fields
    default to the current UTC time and ``"default"``.

    :param crumb: Dictionary with the data as the sentry v7/v8 protocol expects.

    :param hint: An optional value that can be used by `before_breadcrumb`
        to customize the breadcrumbs that are emitted.

    :param kwargs: Extra breadcrumb fields, merged over ``crumb``.
    """
    client = self.get_client()
    if not client.is_active():
        logger.info("Dropped breadcrumb because no client bound")
        return

    options = client.options
    before_breadcrumb = options.get("before_breadcrumb")
    max_breadcrumbs = options.get("max_breadcrumbs", DEFAULT_MAX_BREADCRUMBS)

    # Work on copies so the caller's dictionaries are never mutated.
    record = dict(crumb or ())
    record.update(kwargs)
    if not record:
        return

    callback_hint = dict(hint or ())

    if record.get("timestamp") is None:
        record["timestamp"] = datetime.now(timezone.utc)
    if record.get("type") is None:
        record["type"] = "default"

    if before_breadcrumb is None:
        accepted = record
    else:
        # The callback may rewrite the breadcrumb or veto it with None.
        accepted = before_breadcrumb(record, callback_hint)

    if accepted is None:
        logger.info("before breadcrumb dropped breadcrumb (%s)", record)
    else:
        self._breadcrumbs.append(accepted)

    # Evict the oldest entries until the buffer respects the limit.
    while len(self._breadcrumbs) > max_breadcrumbs:
        self._breadcrumbs.popleft()
        self._n_breadcrumbs_truncated += 1
def start_transaction(
    self,
    transaction: "Optional[Transaction]" = None,
    instrumenter: str = INSTRUMENTER.SENTRY,
    custom_sampling_context: "Optional[SamplingContext]" = None,
    **kwargs: "Unpack[TransactionKwargs]",
) -> "Union[Transaction, NoOpSpan]":
    """
    Start and return a transaction.

    An existing transaction is started if one is given; otherwise a new one
    is created from ``kwargs`` and started.

    This is the entry point to manual tracing instrumentation.

    Child spans can be added to the transaction, and to other spans, to
    build a tree. Call the respective `.start_child()` method to open a new
    child span within the transaction or any span.

    Every child span must be finished before the transaction is finished,
    otherwise the unfinished spans are discarded.

    Used as context managers, spans and transactions finish automatically at
    the end of the `with` block. Otherwise call `.finish()` yourself.

    A finished transaction is sent to Sentry together with all of its
    finished child spans.

    :param transaction: The transaction to start. If omitted, we create and
        start a new transaction.
    :param instrumenter: This parameter is meant for internal use only. It
        will be removed in the next major version.
    :param custom_sampling_context: The transaction's custom sampling context.
    :param kwargs: Optional keyword arguments to be passed to the Transaction
        constructor. See :py:class:`sentry_sdk.tracing.Transaction` for
        available arguments.
    :returns: The started transaction, or a ``NoOpSpan`` when ``instrumenter``
        does not match the client's configured instrumenter.
    """
    kwargs.setdefault("scope", self)

    client = self.get_client()
    if instrumenter != client.options["instrumenter"]:
        return NoOpSpan()

    try_autostart_continuous_profiler()

    custom_sampling_context = custom_sampling_context or {}

    # kwargs at this point has type TransactionKwargs, since we have removed
    # the client and custom_sampling_context from it.
    transaction_kwargs: "TransactionKwargs" = kwargs

    # Build a transaction when the caller did not hand one in.
    if transaction is None:
        transaction = Transaction(**transaction_kwargs)

    # The sampling decision is made from traces_sample_rate, traces_sampler
    # and/or inheritance.
    sampling_context = {
        "transaction_context": transaction.to_json(),
        "parent_sampled": transaction.parent_sampled,
    }
    sampling_context.update(custom_sampling_context)
    transaction._set_initial_sampling_decision(sampling_context=sampling_context)

    # Mirror the sample rate into the dsc.
    if transaction.sample_rate is not None:
        scope_baggage = self.get_active_propagation_context().baggage
        if scope_baggage is not None:
            scope_baggage.sentry_items["sample_rate"] = str(transaction.sample_rate)

        if transaction._baggage:
            transaction._baggage.sentry_items["sample_rate"] = str(
                transaction.sample_rate
            )

    if not transaction.sampled:
        # Unsampled transactions get no profile and no span recorder: we
        # don't bother to keep spans if we already know we're not going to
        # send the transaction.
        return transaction

    profile = Profile(transaction.sampled, transaction._start_timestamp_monotonic_ns)
    profile._set_initial_sampling_decision(sampling_context=sampling_context)
    transaction._profile = profile

    transaction._continuous_profile = try_profile_lifecycle_trace_start()

    # Typically, the profiler is set when the transaction is created. But when
    # using the auto lifecycle, the profiler isn't running when the first
    # transaction is started. So make sure we update the profiler id on it.
    if transaction._continuous_profile is not None:
        transaction.set_profiler_id(get_profiler_id())

    max_spans = (client.options["_experiments"].get("max_spans")) or 1000
    transaction.init_span_recorder(maxlen=max_spans)

    return transaction
def start_span(
    self, instrumenter: str = INSTRUMENTER.SENTRY, **kwargs: "Any"
) -> "Span":
    """
    Start a span whose parent is the currently active span or transaction, if any.

    The result is a :py:class:`sentry_sdk.tracing.Span` instance, usually
    used as a context manager so that a `with` block starts and stops the
    timing.

    Only spans contained in a transaction are sent to Sentry. Most
    integrations start a transaction at the appropriate time, for example
    for every incoming HTTP request. Use
    :py:meth:`sentry_sdk.start_transaction` to start a new transaction when
    one is not already in progress.

    For supported `**kwargs` see :py:class:`sentry_sdk.tracing.Span`.

    The instrumenter parameter is deprecated for user code, and it will
    be removed in the next major version. Going forward, it should only
    be used by the SDK itself.

    A ``NoOpSpan`` is returned when span streaming is enabled, when
    ``instrumenter`` does not match the configured instrumenter, or when the
    active span is a streamed span.
    """
    if has_span_streaming_enabled(sentry_sdk.get_client().options):
        warnings.warn(
            "Scope.start_span is not available in streaming mode.",
            DeprecationWarning,
            stacklevel=2,
        )
        return NoOpSpan()

    if kwargs.get("description") is not None:
        warnings.warn(
            "The `description` parameter is deprecated. Please use `name` instead.",
            DeprecationWarning,
            stacklevel=2,
        )

    with new_scope():
        kwargs.setdefault("scope", self)

        if instrumenter != self.get_client().options["instrumenter"]:
            return NoOpSpan()

        # Current span or transaction: this scope first, then the isolation scope.
        parent = self.span or self.get_isolation_scope().span
        if isinstance(parent, StreamedSpan):
            # make mypy happy
            return NoOpSpan()

        if parent is not None:
            # Children take `trace_id` from the parent span.
            return parent.start_child(**kwargs)

        # New spans get the `trace_id` from the scope
        if "trace_id" not in kwargs:
            kwargs["trace_id"] = self.get_active_propagation_context().trace_id

        return Span(**kwargs)
def start_streamed_span(
    self,
    name: str,
    attributes: "Optional[Attributes]",
    parent_span: "Optional[StreamedSpan]",
    active: bool,
) -> "StreamedSpan":
    """Create a streamed span, either as a new segment or as a child span.

    When ``parent_span`` is the default sentinel or a ``NoOpStreamedSpan``,
    the span currently held by this scope is used as the parent instead.
    With no parent at all the span becomes a segment built from the active
    propagation context; otherwise it becomes a child of the parent.

    :param name: Name of the span; also used for the ignore check and the
        sampling decision.
    :param attributes: Attributes passed on to the new span.
    :param parent_span: Explicit parent, the default sentinel, or ``None``.
    :param active: Passed on to the new span unchanged.
    :returns: The new span, or a ``NoOpStreamedSpan`` when the span is
        ignored, sampled out, or parented to a no-op span.
    """
    # TODO: rename to start_span once we drop the old API
    noop_parent_given = isinstance(parent_span, NoOpStreamedSpan)
    if noop_parent_given:
        # parent_span is only set if the user explicitly set it
        logger.debug(
            "Ignored parent span provided. Span will be parented to the "
            "currently active span instead."
        )

    if noop_parent_given or parent_span is _DEFAULT_PARENT_SPAN:
        parent_span = self.span  # type: ignore

    if parent_span is not None:
        # This is a child span; take propagation context from the parent span
        with new_scope():
            if is_ignored_span(name, attributes):
                return NoOpStreamedSpan(
                    unsampled_reason="ignored",
                )

            if isinstance(parent_span, NoOpStreamedSpan):
                # The scope's own current span may itself be a no-op.
                return NoOpStreamedSpan(unsampled_reason=parent_span._unsampled_reason)

            return StreamedSpan(
                name=name,
                attributes=attributes,
                active=active,
                scope=self,
                segment=parent_span._segment,
                trace_id=parent_span.trace_id,
                parent_span_id=parent_span.span_id,
                parent_sampled=parent_span.sampled,
            )

    # If no eligible parent_span was provided and there is no currently
    # active span, this is a segment
    propagation_context = self.get_active_propagation_context()

    if is_ignored_span(name, attributes):
        return NoOpStreamedSpan(
            scope=self,
            unsampled_reason="ignored",
        )

    sampled, sample_rate, sample_rand, outcome = _make_sampling_decision(
        name,
        attributes,
        self,
    )

    if sample_rate is not None:
        self._update_sample_rate(sample_rate)

    if sampled is False:
        return NoOpStreamedSpan(
            scope=self,
            unsampled_reason=outcome,
        )

    return StreamedSpan(
        name=name,
        attributes=attributes,
        active=active,
        scope=self,
        segment=None,
        trace_id=propagation_context.trace_id,
        parent_span_id=propagation_context.parent_span_id,
        parent_sampled=propagation_context.parent_sampled,
        baggage=propagation_context.baggage,
        sample_rand=sample_rand,
        sample_rate=sample_rate,
    )
def _update_sample_rate(self, sample_rate: float) -> None:
    """Write ``sample_rate`` into the active propagation context's baggage.

    A sample rate that was adjusted while making a span's sampling decision
    has to be reflected in the propagation context too. Nothing happens when
    the propagation context has no baggage.
    """
    baggage = self.get_active_propagation_context().baggage
    if baggage is None:
        return
    baggage.sentry_items["sample_rate"] = str(sample_rate)
def continue_trace(
    self,
    environ_or_headers: "Dict[str, Any]",
    op: "Optional[str]" = None,
    name: "Optional[str]" = None,
    source: "Optional[str]" = None,
    origin: str = "manual",
) -> "Transaction":
    """
    Set the propagation context from environment or headers and return a
    transaction that continues it.

    ``name`` and ``source`` are only passed to the transaction when truthy.
    """
    self.generate_propagation_context(environ_or_headers)

    # generate_propagation_context ensures that the propagation_context is not None.
    propagation_context = cast(PropagationContext, self._propagation_context)

    optional_kwargs = {
        key: given for key, given in (("name", name), ("source", source)) if given
    }

    return Transaction(
        op=op,
        origin=origin,
        baggage=propagation_context.baggage,
        parent_sampled=propagation_context.parent_sampled,
        trace_id=propagation_context.trace_id,
        parent_span_id=propagation_context.parent_span_id,
        same_process_as_parent=False,
        **optional_kwargs,
    )
def capture_event(
    self,
    event: "Event",
    hint: "Optional[Hint]" = None,
    scope: "Optional[Scope]" = None,
    **scope_kwargs: "Any",
) -> "Optional[str]":
    """
    Capture an event.

    The given scope data is merged and the event is handed to
    :py:meth:`sentry_sdk.client._Client.capture_event`. For events that are
    not transactions, a returned event id is remembered on the isolation
    scope as the last event id.

    :param event: A ready-made event that can be directly sent to Sentry.

    :param hint: Contains metadata about the event that can be read from `before_send`, such as the original exception object or a HTTP request object.

    :param scope: An optional :py:class:`sentry_sdk.Scope` to apply to events.
        The `scope` and `scope_kwargs` parameters are mutually exclusive.

    :param scope_kwargs: Optional data to apply to event.
        For supported `**scope_kwargs` see :py:meth:`sentry_sdk.Scope.update_from_kwargs`.
        The `scope` and `scope_kwargs` parameters are mutually exclusive.

    :returns: An `event_id` if the SDK decided to send the event (see :py:meth:`sentry_sdk.client._Client.capture_event`).
    """
    if disable_capture_event.get(False):
        return None

    merged_scope = self._merge_scopes(scope, scope_kwargs)
    event_id = self.get_client().capture_event(
        event=event, hint=hint, scope=merged_scope
    )

    if event_id is None or event.get("type") == "transaction":
        return event_id

    self.get_isolation_scope()._last_event_id = event_id
    return event_id
def _capture_log(self, log: "Optional[Log]") -> None:
    """Hand ``log`` to the client together with the merged scope.

    Does nothing when ``log`` is ``None`` or logs are not enabled in the
    client options. With the ``debug`` option set, the log is also echoed
    through ``logger.debug``.
    """
    if log is None:
        return

    client = self.get_client()
    options = client.options
    if not has_logs_enabled(options):
        return

    merged_scope = self._merge_scopes()

    if options.get("debug", False):
        logger.debug(
            f"[Sentry Logs] [{log.get('severity_text')}] {log.get('body')}"
        )

    client._capture_log(log, scope=merged_scope)
def _capture_metric(self, metric: "Optional[Metric]") -> None:
    """Hand ``metric`` to the client together with the merged scope.

    Does nothing when ``metric`` is ``None`` or metrics are not enabled in
    the client options. With the ``debug`` option set, the metric is also
    echoed through ``logger.debug``.
    """
    if metric is None:
        return

    client = self.get_client()
    options = client.options
    if not has_metrics_enabled(options):
        return

    merged_scope = self._merge_scopes()

    if options.get("debug", False):
        logger.debug(
            f"[Sentry Metrics] [{metric.get('type')}] {metric.get('name')}: {metric.get('value')}"
        )

    client._capture_metric(metric, scope=merged_scope)
def _capture_span(self, span: "Optional[StreamedSpan]") -> None:
    """Hand ``span`` to the client together with the merged scope.

    Does nothing when ``span`` is ``None`` or span streaming is not enabled
    in the client options.
    """
    if span is None:
        return

    client = self.get_client()
    streaming_enabled = has_span_streaming_enabled(client.options)
    if not streaming_enabled:
        return

    client._capture_span(span, scope=self._merge_scopes())
def capture_message(
    self,
    message: str,
    level: "Optional[LogLevelStr]" = None,
    scope: "Optional[Scope]" = None,
    **scope_kwargs: "Any",
) -> "Optional[str]":
    """
    Capture a message by wrapping it in an event and calling ``capture_event``.

    :param message: The string to send as the message.

    :param level: If no level is provided, the default level is `info`.

    :param scope: An optional :py:class:`sentry_sdk.Scope` to apply to events.
        The `scope` and `scope_kwargs` parameters are mutually exclusive.

    :param scope_kwargs: Optional data to apply to event.
        For supported `**scope_kwargs` see :py:meth:`sentry_sdk.Scope.update_from_kwargs`.
        The `scope` and `scope_kwargs` parameters are mutually exclusive.

    :returns: An `event_id` if the SDK decided to send the event (see :py:meth:`sentry_sdk.client._Client.capture_event`).
    """
    if disable_capture_event.get(False):
        return None

    event: "Event" = {
        "message": message,
        "level": "info" if level is None else level,
    }

    return self.capture_event(event, scope=scope, **scope_kwargs)
def capture_exception(
    self,
    error: "Optional[Union[BaseException, ExcInfo]]" = None,
    scope: "Optional[Scope]" = None,
    **scope_kwargs: "Any",
) -> "Optional[str]":
    """Capture an exception by turning it into an event and calling ``capture_event``.

    An ``Exception`` raised while capturing the event is reported as an
    internal exception and ``None`` is returned.

    :param error: An exception to capture. If `None`, `sys.exc_info()` will be used.

    :param scope: An optional :py:class:`sentry_sdk.Scope` to apply to events.
        The `scope` and `scope_kwargs` parameters are mutually exclusive.

    :param scope_kwargs: Optional data to apply to event.
        For supported `**scope_kwargs` see :py:meth:`sentry_sdk.Scope.update_from_kwargs`.
        The `scope` and `scope_kwargs` parameters are mutually exclusive.

    :returns: An `event_id` if the SDK decided to send the event (see :py:meth:`sentry_sdk.client._Client.capture_event`).
    """
    if disable_capture_event.get(False):
        return None

    exc_info = sys.exc_info() if error is None else exc_info_from_error(error)

    event, hint = event_from_exception(
        exc_info, client_options=self.get_client().options
    )

    try:
        return self.capture_event(event, hint=hint, scope=scope, **scope_kwargs)
    except Exception:
        capture_internal_exception(sys.exc_info())
        return None
def start_session(self, *args: "Any", **kwargs: "Any") -> None:
    """Start a new session, ending the current one first.

    The only keyword read is ``session_mode`` (default ``"application"``).
    Release and environment come from the client options, the user from
    this scope.
    """
    session_mode = kwargs.pop("session_mode", "application")
    self.end_session()

    options = self.get_client().options
    self._session = Session(
        release=options.get("release"),
        environment=options.get("environment"),
        user=self._user,
        session_mode=session_mode,
    )
def end_session(self, *args: "Any", **kwargs: "Any") -> None:
    """End the current session, if there is one.

    The session is detached from the scope first, then closed and handed to
    the client's ``capture_session``.
    """
    session, self._session = self._session, None
    if session is None:
        return

    session.close()
    self.get_client().capture_session(session)
def stop_auto_session_tracking(self, *args: "Any", **kwargs: "Any") -> None:
    """Stop automatic session tracking.

    This temporarily stops session tracking for the current scope when
    called: the current session is ended and the scope's override flag is
    set to ``False``. To resume session tracking call
    `resume_auto_session_tracking`.
    """
    self.end_session()
    self._force_auto_session_tracking = False
def resume_auto_session_tracking(self) -> None:
    """Resume automatic session tracking for the current scope if it was
    disabled earlier, by clearing the scope's override flag. This requires
    that automatic session tracking is enabled in general.
    """
    cleared = None
    self._force_auto_session_tracking = cleared
def add_event_processor(
    self,
    func: "EventProcessor",
) -> None:
    """Register a scope local event processor on the scope.

    If more than 20 processors are already registered, the list is emptied
    (with a logged warning) before the new one is added.

    :param func: This function behaves like `before_send.`
    """
    processors = self._event_processors

    if len(processors) > 20:
        logger.warning(
            "Too many event processors on scope! Clearing list to free up some memory: %r",
            processors,
        )
        # Empty the list in place so other references see the change too.
        del processors[:]

    processors.append(func)
def add_error_processor(
    self,
    func: "ErrorProcessor",
    cls: "Optional[Type[BaseException]]" = None,
) -> None:
    """Register a scope local error processor on the scope.

    :param func: A callback that works similar to an event processor but is invoked with the original exception info triple as second argument.

    :param cls: Optionally, only process exceptions of this type.
    """
    if cls is None:
        self._error_processors.append(func)
        return

    exception_type = cls  # For mypy.
    wrapped = func

    def func(event: "Event", exc_info: "ExcInfo") -> "Optional[Event]":
        # A failing isinstance check counts as "not an instance".
        try:
            matches = isinstance(exc_info[1], exception_type)
        except Exception:
            matches = False

        if not matches:
            return event
        return wrapped(event, exc_info)

    self._error_processors.append(func)
def _apply_level_to_event(
    self, event: "Event", hint: "Hint", options: "Optional[Dict[str, Any]]"
) -> None:
    """Overwrite the event's ``level`` with the scope's level, if one is set."""
    scope_level = self._level
    if scope_level is None:
        return
    event["level"] = scope_level
def _apply_breadcrumbs_to_event(
    self, event: "Event", hint: "Hint", options: "Optional[Dict[str, Any]]"
) -> None:
    """Append the scope's breadcrumbs to the event and try to sort them by timestamp.

    String timestamps are parsed before sorting. A failure during parsing
    or sorting is logged at debug level and otherwise ignored.
    """
    breadcrumbs = event.setdefault("breadcrumbs", {})

    # This check is just for mypy -
    if isinstance(breadcrumbs, AnnotatedValue):
        return

    breadcrumbs.setdefault("values", [])
    breadcrumbs["values"].extend(self._breadcrumbs)

    # Attempt to sort timestamps
    try:
        for crumb in breadcrumbs["values"]:
            if isinstance(crumb["timestamp"], str):
                crumb["timestamp"] = datetime_from_isoformat(crumb["timestamp"])

        breadcrumbs["values"].sort(key=lambda crumb: crumb["timestamp"])
    except Exception as err:
        logger.debug("Error when sorting breadcrumbs", exc_info=err)
def _apply_user_to_event(
    self, event: "Event", hint: "Hint", options: "Optional[Dict[str, Any]]"
) -> None:
    """Copy the scope's user onto the event unless the event already has one."""
    if event.get("user") is not None:
        return
    if self._user is not None:
        event["user"] = self._user
def _apply_transaction_name_to_event(
    self, event: "Event", hint: "Hint", options: "Optional[Dict[str, Any]]"
) -> None:
    """Copy the scope's transaction name onto the event unless the event already has one."""
    if event.get("transaction") is not None:
        return
    if self._transaction is not None:
        event["transaction"] = self._transaction
def _apply_transaction_info_to_event(
    self, event: "Event", hint: "Hint", options: "Optional[Dict[str, Any]]"
) -> None:
    """Copy the scope's transaction info onto the event unless the event already has it."""
    if event.get("transaction_info") is not None:
        return
    if self._transaction_info is not None:
        event["transaction_info"] = self._transaction_info
def _apply_fingerprint_to_event(
    self, event: "Event", hint: "Hint", options: "Optional[Dict[str, Any]]"
) -> None:
    """Copy the scope's fingerprint onto the event unless the event already has one."""
    if event.get("fingerprint") is not None:
        return
    if self._fingerprint is not None:
        event["fingerprint"] = self._fingerprint
def _apply_extra_to_event(
    self, event: "Event", hint: "Hint", options: "Optional[Dict[str, Any]]"
) -> None:
    """Merge the scope's extras into the event's ``extra``; scope values win on key clashes."""
    scope_extras = self._extras
    if not scope_extras:
        return
    event.setdefault("extra", {}).update(scope_extras)
def _apply_tags_to_event(
    self, event: "Event", hint: "Hint", options: "Optional[Dict[str, Any]]"
) -> None:
    """Merge the scope's tags into the event's ``tags``; scope values win on key clashes."""
    scope_tags = self._tags
    if not scope_tags:
        return
    event.setdefault("tags", {}).update(scope_tags)
def _apply_contexts_to_event(
    self, event: "Event", hint: "Hint", options: "Optional[Dict[str, Any]]"
) -> None:
    """Merge the scope's contexts into the event and make sure a ``trace`` context exists.

    The event always ends up with a ``contexts`` entry. The trace context
    is only filled in from the scope when the event has none.
    """
    contexts = event.setdefault("contexts", {})

    if self._contexts:
        contexts.update(self._contexts)

    # Add "trace" context
    if contexts.get("trace") is None:
        contexts["trace"] = self.get_trace_context()
def _apply_flags_to_event(
    self, event: "Event", hint: "Hint", options: "Optional[Dict[str, Any]]"
) -> None:
    """Store the scope's flags under ``contexts.flags.values`` of the event.

    The event is left untouched when there are no flags.
    """
    flags = self.flags.get()
    if len(flags) == 0:
        return

    contexts = event.setdefault("contexts", {})
    contexts.setdefault("flags", {}).update({"values": flags})
def _apply_scope_attributes_to_telemetry(
    self, telemetry: "Union[Log, Metric, StreamedSpan]"
) -> None:
    """Copy scope attributes onto the telemetry item without overwriting existing keys.

    Dict-based telemetry keeps its attributes under the ``"attributes"``
    key; anything else is expected to expose them as ``_attributes``.
    """
    # TODO: turn Logs, Metrics into actual classes
    attributes = (
        telemetry["attributes"]
        if isinstance(telemetry, dict)
        else telemetry._attributes
    )

    for attribute, value in self._attributes.items():
        # Values already present on the telemetry item take precedence.
        attributes.setdefault(attribute, value)
def _apply_user_attributes_to_telemetry(
    self, telemetry: "Union[Log, Metric, StreamedSpan]"
) -> None:
    """Copy ``id``, ``username`` and ``email`` of the scope's user onto the telemetry item.

    They are written as ``user.id``, ``user.name`` and ``user.email`` and
    never overwrite existing attributes. Nothing is copied when sending
    default PII is disabled or the scope has no user.
    """
    attributes = (
        telemetry["attributes"]
        if isinstance(telemetry, dict)
        else telemetry._attributes
    )

    if should_send_default_pii() and self._user is not None:
        user_fields = (
            ("user.id", "id"),
            ("user.name", "username"),
            ("user.email", "email"),
        )
        for attribute_name, user_attribute in user_fields:
            if user_attribute in self._user and attribute_name not in attributes:
                attributes[attribute_name] = self._user[user_attribute]
def _drop(self, cause: "Any", ty: str) -> "Optional[Any]":
    """Log that ``cause`` (a processor of kind ``ty``) dropped an event; always returns ``None``."""
    logger.info("%s (%s) dropped event", ty, cause)
    dropped = None
    return dropped
def run_error_processors(self, event: "Event", hint: "Hint") -> "Optional[Event]":
    """
    Run the error processors on the event and return the modified event.

    Processors only run when the hint carries ``exc_info``. They are taken
    from the global, isolation and current scope, in that order. The event
    is dropped (``None``) as soon as one processor returns ``None``.
    """
    exc_info = hint.get("exc_info")
    if exc_info is None:
        return event

    processors = chain(
        self.get_global_scope()._error_processors,
        self.get_isolation_scope()._error_processors,
        self.get_current_scope()._error_processors,
    )

    for processor in processors:
        processed = processor(event, exc_info)
        if processed is None:
            return self._drop(processor, "error processor")
        event = processed

    return event
def run_event_processors(self, event: "Event", hint: "Hint") -> "Optional[Event]":
    """
    Run the event processors on the event and return the modified event.

    Check-in events are returned untouched. For all other events the
    module-level processors run first, followed by those of the global,
    isolation and current scope. The event is dropped (``None``) as soon as
    one processor returns ``None``.
    """
    if event.get("type") == "check_in":
        return event

    # Get scopes without creating them to prevent infinite recursion
    isolation_scope = _isolation_scope.get()
    current_scope = _current_scope.get()

    event_processors = chain(
        global_event_processors,
        _global_scope and _global_scope._event_processors or [],
        isolation_scope and isolation_scope._event_processors or [],
        current_scope and current_scope._event_processors or [],
    )

    for event_processor in event_processors:
        # If the processor raises, the internal-exception guard swallows the
        # error and the event is carried forward unchanged.
        processed = event
        with capture_internal_exceptions():
            processed = event_processor(event, hint)

        if processed is None:
            return self._drop(event_processor, "event processor")
        event = processed

    return event
@_disable_capture
def apply_to_event(
    self,
    event: "Event",
    hint: "Hint",
    options: "Optional[Dict[str, Any]]" = None,
) -> "Optional[Event]":
    """Apply the information contained on the scope to the given event.

    :returns: The processed event, or ``None`` when an error processor or
        event processor dropped it.
    """
    event_type = event.get("type")
    is_transaction = event_type == "transaction"
    is_check_in = event_type == "check_in"

    # put all attachments into the hint. This lets callbacks play around
    # with attachments. We also later pull this out of the hint when we
    # create the envelope.
    attachments_to_send = hint.get("attachments") or []
    for attachment in self._attachments:
        if attachment.add_to_transactions if is_transaction else True:
            attachments_to_send.append(attachment)
    hint["attachments"] = attachments_to_send

    self._apply_contexts_to_event(event, hint, options)

    if is_check_in:
        # Check-ins only support the trace context, strip all others
        event["contexts"] = {
            "trace": event.setdefault("contexts", {}).get("trace", {})
        }
    else:
        self._apply_level_to_event(event, hint, options)
        self._apply_fingerprint_to_event(event, hint, options)
        self._apply_user_to_event(event, hint, options)
        self._apply_transaction_name_to_event(event, hint, options)
        self._apply_transaction_info_to_event(event, hint, options)
        self._apply_tags_to_event(event, hint, options)
        self._apply_extra_to_event(event, hint, options)

        # Breadcrumbs and flags are not applied to transactions.
        if not is_transaction:
            self._apply_breadcrumbs_to_event(event, hint, options)
            self._apply_flags_to_event(event, hint, options)

    event = self.run_error_processors(event, hint)
    if event is None:
        return None

    return self.run_event_processors(event, hint)
@_disable_capture
def apply_to_telemetry(self, telemetry: "Union[Log, Metric, StreamedSpan]") -> None:
    """Apply scope data to attribute-based telemetry.

    Items that are not a ``StreamedSpan`` first get a missing ``trace_id``
    (all zeros if the scope has none) and a missing ``span_id`` filled in
    from the scope's trace context. Scope attributes and user attributes are
    then applied to every item.
    """
    # Attributes-based events and telemetry go through here (logs, metrics,
    # spansV2)
    if not isinstance(telemetry, StreamedSpan):
        trace_context = self.get_trace_context()

        scope_trace_id = trace_context.get("trace_id")
        if telemetry.get("trace_id") is None:
            telemetry["trace_id"] = (
                scope_trace_id or "00000000-0000-0000-0000-000000000000"
            )

        scope_span_id = trace_context.get("span_id")
        if telemetry.get("span_id") is None and scope_span_id:
            telemetry["span_id"] = scope_span_id

    self._apply_scope_attributes_to_telemetry(telemetry)
    self._apply_user_attributes_to_telemetry(telemetry)
def update_from_scope(self, scope: "Scope") -> None:
    """Update this scope with the data of another scope.

    Unset or empty fields of ``scope`` leave this scope untouched. Single
    values are replaced, mappings are merged, and breadcrumbs and
    attachments are appended.
    """
    # Single values that are replaced whenever the other scope has them set.
    for field in ("_level", "_fingerprint", "_transaction", "_user"):
        incoming = getattr(scope, field)
        if incoming is not None:
            setattr(self, field, incoming)

    if scope._transaction_info is not None:
        self._transaction_info.update(scope._transaction_info)

    # Mappings that are merged key by key.
    for field in (
        "_tags",
        "_contexts",
        "_extras",
        "_gen_ai_original_message_count",
        "_attributes",
    ):
        incoming = getattr(scope, field)
        if incoming:
            getattr(self, field).update(incoming)

    # Sequences that are appended to.
    for field in ("_breadcrumbs", "_attachments"):
        incoming = getattr(scope, field)
        if incoming:
            getattr(self, field).extend(incoming)

    if scope._n_breadcrumbs_truncated:
        self._n_breadcrumbs_truncated += scope._n_breadcrumbs_truncated

    # Single values that are replaced whenever the other scope's is truthy.
    for field in (
        "_gen_ai_conversation_id",
        "_span",
        "_profile",
        "_propagation_context",
        "_session",
    ):
        incoming = getattr(scope, field)
        if incoming:
            setattr(self, field, incoming)

    if scope._flags:
        if self._flags:
            for flag in scope._flags.get():
                self._flags.set(flag["flag"], flag["result"])
        else:
            # Copy the buffer so both scopes do not share one.
            self._flags = deepcopy(scope._flags)
def update_from_kwargs(
    self,
    user: "Optional[Any]" = None,
    level: "Optional[LogLevelStr]" = None,
    extras: "Optional[Dict[str, Any]]" = None,
    contexts: "Optional[Dict[str, Dict[str, Any]]]" = None,
    tags: "Optional[Dict[str, str]]" = None,
    fingerprint: "Optional[List[str]]" = None,
    attributes: "Optional[Attributes]" = None,
) -> None:
    """Update the scope's data from keyword arguments.

    Arguments left at ``None`` are ignored. ``user``, ``level`` and
    ``fingerprint`` replace the stored value; ``extras``, ``contexts``,
    ``tags`` and ``attributes`` are merged into the stored mappings.
    """
    replacements = (
        ("_level", level),
        ("_user", user),
        ("_fingerprint", fingerprint),
    )
    for field, replacement in replacements:
        if replacement is not None:
            setattr(self, field, replacement)

    merges = (
        ("_extras", extras),
        ("_contexts", contexts),
        ("_tags", tags),
        ("_attributes", attributes),
    )
    for field, additions in merges:
        if additions is not None:
            getattr(self, field).update(additions)
def __repr__(self) -> str:
    """
    Return a short debugging representation of the scope: the class name,
    the object's identity (as a hex number), and the scope's name and type.
    """
    template = "<%s id=%s name=%s type=%s>"
    fields = (
        self.__class__.__name__,
        hex(id(self)),
        self._name,
        self._type,
    )
    return template % fields
@property
def flags(self) -> "FlagBuffer":
    """
    The scope's flag buffer, created on first access.

    When no buffer exists yet, one is created with the capacity given by the
    client's ``_experiments["max_flags"]`` option, falling back to
    ``DEFAULT_FLAG_CAPACITY`` if that option is missing or falsy.
    """
    # A buffer has already been created for this scope: reuse it.
    if self._flags is not None:
        return self._flags

    experiments = self.get_client().options["_experiments"]
    capacity = experiments.get("max_flags") or DEFAULT_FLAG_CAPACITY
    self._flags = FlagBuffer(capacity=capacity)
    return self._flags
def set_attribute(self, attribute: str, value: "AttributeValue") -> None:
    """
    Store an attribute on the scope under the name ``attribute``.

    The value is passed through ``format_attribute`` before it is stored.

    Any attributes-based telemetry (logs, metrics) captured while this scope
    is active will inherit attributes set on the scope.
    """
    formatted = format_attribute(value)
    self._attributes[attribute] = formatted
def remove_attribute(self, attribute: str) -> None:
    """
    Delete ``attribute`` from the scope's attributes.

    Removing an attribute that is not set is a silent no-op.
    """
    try:
        stored = self._attributes
        del stored[attribute]
    except KeyError:
        # Nothing is stored under this name; removal is deliberately silent.
        return
@contextmanager
def new_scope() -> "Generator[Scope, None, None]":
    """
    .. versionadded:: 2.0.0

    Context manager that forks the current scope, makes the fork the current
    scope for the duration of the wrapped code, and yields it. When the
    wrapped code finishes, the previous current scope is restored.

    Example Usage:

    .. code-block:: python

        import sentry_sdk

        with sentry_sdk.new_scope() as scope:
            scope.set_tag("color", "green")
            sentry_sdk.capture_message("hello") # will include `color` tag.

        sentry_sdk.capture_message("hello, again") # will NOT include `color` tag.

    """
    # Fork the scope that is current right now and activate the fork.
    forked = Scope.get_current_scope().fork()
    reset_token = _current_scope.set(forked)

    try:
        yield forked
    finally:
        # Put the previous current scope back. A failing reset is reported
        # through capture_internal_exception rather than raised to the caller.
        try:
            _current_scope.reset(reset_token)
        except (LookupError, ValueError):
            capture_internal_exception(sys.exc_info())
@contextmanager
def use_scope(scope: "Scope") -> "Generator[Scope, None, None]":
    """
    .. versionadded:: 2.0.0

    Context manager that makes the given `scope` the current scope for the
    duration of the wrapped code and yields it. When the wrapped code
    finishes, the previous current scope is restored.

    Example Usage:

    Suppose the variable `scope` contains a `Scope` object, which is not currently
    the active scope.

    .. code-block:: python

        import sentry_sdk

        with sentry_sdk.use_scope(scope):
            scope.set_tag("color", "green")
            sentry_sdk.capture_message("hello") # will include `color` tag.

        sentry_sdk.capture_message("hello, again") # will NOT include `color` tag.

    """
    # Activate the caller-supplied scope as-is (it is not forked).
    reset_token = _current_scope.set(scope)

    try:
        yield scope
    finally:
        # Put the previous current scope back. A failing reset is reported
        # through capture_internal_exception rather than raised to the caller.
        try:
            _current_scope.reset(reset_token)
        except (LookupError, ValueError):
            capture_internal_exception(sys.exc_info())
@contextmanager
def isolation_scope() -> "Generator[Scope, None, None]":
    """
    .. versionadded:: 2.0.0

    Context manager that forks the current isolation scope, activates the
    fork for the duration of the wrapped code, and yields it. The current
    scope is forked as well so that data does not bleed into the existing
    current scope. When the wrapped code finishes, both original scopes are
    restored.

    Example Usage:

    .. code-block:: python

        import sentry_sdk

        with sentry_sdk.isolation_scope() as scope:
            scope.set_tag("color", "green")
            sentry_sdk.capture_message("hello") # will include `color` tag.

        sentry_sdk.capture_message("hello, again") # will NOT include `color` tag.

    """
    # Step 1: fork and activate the current scope.
    current_fork = Scope.get_current_scope().fork()
    current_reset_token = _current_scope.set(current_fork)

    # Step 2: fork and activate the isolation scope.
    isolation_fork = Scope.get_isolation_scope().fork()
    isolation_reset_token = _isolation_scope.set(isolation_fork)

    try:
        yield isolation_fork
    finally:
        # Restore both scopes. Each reset is guarded separately so that a
        # failure restoring the current scope does not prevent the isolation
        # scope from being restored.
        try:
            _current_scope.reset(current_reset_token)
        except (LookupError, ValueError):
            capture_internal_exception(sys.exc_info())

        try:
            _isolation_scope.reset(isolation_reset_token)
        except (LookupError, ValueError):
            capture_internal_exception(sys.exc_info())
@contextmanager
def use_isolation_scope(isolation_scope: "Scope") -> "Generator[Scope, None, None]":
    """
    .. versionadded:: 2.0.0

    Context manager that uses the given `isolation_scope` and runs the wrapped code in it.
    The current scope is also forked to not bleed data into the existing current scope.
    After the wrapped code is executed, the original scopes are restored.

    Example Usage:

    Suppose the variable `isolation_scope` contains a `Scope` object, which is
    not currently the active isolation scope.

    .. code-block:: python

        import sentry_sdk

        with use_isolation_scope(isolation_scope):
            isolation_scope.set_tag("color", "green")
            sentry_sdk.capture_message("hello") # will include `color` tag.

        sentry_sdk.capture_message("hello, again") # will NOT include `color` tag.

    """
    # fork current scope
    current_scope = Scope.get_current_scope()
    forked_current_scope = current_scope.fork()
    current_token = _current_scope.set(forked_current_scope)

    # set given scope as isolation scope (used as-is, not forked)
    isolation_token = _isolation_scope.set(isolation_scope)

    try:
        yield isolation_scope
    finally:
        # restore original scopes; each reset is guarded separately so that a
        # failure restoring one scope does not prevent restoring the other
        try:
            _current_scope.reset(current_token)
        except (LookupError, ValueError):
            capture_internal_exception(sys.exc_info())

        try:
            _isolation_scope.reset(isolation_token)
        except (LookupError, ValueError):
            capture_internal_exception(sys.exc_info())
def should_send_default_pii() -> bool:
    """Shortcut for `Scope.get_client().should_send_default_pii()`."""
    client = Scope.get_client()
    return client.should_send_default_pii()
- # Circular imports
- from sentry_sdk.client import NonRecordingClient
- if TYPE_CHECKING:
- import sentry_sdk.client
|