| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164 |
- import base64
- import contextvars
- import json
- import linecache
- import logging
- import math
- import os
- import copy
- import random
- import re
- import subprocess
- import sys
- import threading
- import time
- from collections import namedtuple
- from contextlib import contextmanager
- from datetime import datetime, timezone
- from decimal import Decimal
- from functools import partial, partialmethod, wraps
- from numbers import Real
- from urllib.parse import parse_qs, unquote, urlencode, urlsplit, urlunsplit
- try:
- # Python 3.11
- from builtins import BaseExceptionGroup
- except ImportError:
- # Python 3.10 and below
- BaseExceptionGroup = None # type: ignore
- from typing import TYPE_CHECKING
- import sentry_sdk
- from sentry_sdk._compat import PY37
- from sentry_sdk._types import SENSITIVE_DATA_SUBSTITUTE, Annotated, AnnotatedValue
- from sentry_sdk.consts import (
- DEFAULT_ADD_FULL_STACK,
- DEFAULT_MAX_STACK_FRAMES,
- DEFAULT_MAX_VALUE_LENGTH,
- EndpointType,
- )
- if TYPE_CHECKING:
- from types import FrameType, TracebackType
- from typing import (
- Any,
- Callable,
- ContextManager,
- Dict,
- Generator,
- Iterator,
- List,
- NoReturn,
- Optional,
- ParamSpec,
- Set,
- Tuple,
- Type,
- TypeVar,
- Union,
- cast,
- overload,
- )
- from gevent.hub import Hub
- from sentry_sdk._types import (
- AttributeValue,
- SerializedAttributeValue,
- Event,
- ExcInfo,
- Hint,
- Log,
- Metric,
- )
- P = ParamSpec("P")
- R = TypeVar("R")
- epoch = datetime(1970, 1, 1)
- # The logger is created here but initialized in the debug support module
- logger = logging.getLogger("sentry_sdk.errors")
- _installed_modules = None
- _is_sentry_internal_task = contextvars.ContextVar(
- "is_sentry_internal_task", default=False
- )
- def is_internal_task() -> bool:
- return _is_sentry_internal_task.get()
- @contextmanager
- def mark_sentry_task_internal() -> "Generator[None, None, None]":
- """Context manager to mark a task as Sentry internal."""
- token = _is_sentry_internal_task.set(True)
- try:
- yield
- finally:
- _is_sentry_internal_task.reset(token)
- BASE64_ALPHABET = re.compile(r"^[a-zA-Z0-9/+=]*$")
- FALSY_ENV_VALUES = frozenset(("false", "f", "n", "no", "off", "0"))
- TRUTHY_ENV_VALUES = frozenset(("true", "t", "y", "yes", "on", "1"))
- MAX_STACK_FRAMES = 2000
- """Maximum number of stack frames to send to Sentry.
- If we have more than this number of stack frames, we will stop processing
- the stacktrace to avoid getting stuck in a long-lasting loop. This value
- exceeds the default sys.getrecursionlimit() of 1000, so users will only
- be affected by this limit if they have a custom recursion limit.
- """
- def env_to_bool(value: "Any", *, strict: "Optional[bool]" = False) -> "bool | None":
- """Casts an ENV variable value to boolean using the constants defined above.
- In strict mode, it may return None if the value doesn't match any of the predefined values.
- """
- normalized = str(value).lower() if value is not None else None
- if normalized in FALSY_ENV_VALUES:
- return False
- if normalized in TRUTHY_ENV_VALUES:
- return True
- return None if strict else bool(value)
- def json_dumps(data: "Any") -> bytes:
- """Serialize data into a compact JSON representation encoded as UTF-8."""
- return json.dumps(data, allow_nan=False, separators=(",", ":")).encode("utf-8")
- def get_git_revision() -> "Optional[str]":
- try:
- with open(os.path.devnull, "w+") as null:
- # prevent command prompt windows from popping up on windows
- startupinfo = None
- if sys.platform == "win32" or sys.platform == "cygwin":
- startupinfo = subprocess.STARTUPINFO()
- startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
- revision = (
- subprocess.Popen(
- ["git", "rev-parse", "HEAD"],
- startupinfo=startupinfo,
- stdout=subprocess.PIPE,
- stderr=null,
- stdin=null,
- )
- .communicate()[0]
- .strip()
- .decode("utf-8")
- )
- except (OSError, IOError, FileNotFoundError):
- return None
- return revision
- def get_default_release() -> "Optional[str]":
- """Try to guess a default release."""
- release = os.environ.get("SENTRY_RELEASE")
- if release:
- return release
- release = get_git_revision()
- if release:
- return release
- for var in (
- "HEROKU_BUILD_COMMIT",
- "HEROKU_SLUG_COMMIT", # deprecated by Heroku, kept for backward compatibility
- "SOURCE_VERSION",
- "CODEBUILD_RESOLVED_SOURCE_VERSION",
- "CIRCLE_SHA1",
- "GAE_DEPLOYMENT_ID",
- "K_REVISION",
- ):
- release = os.environ.get(var)
- if release:
- return release
- return None
- def get_sdk_name(installed_integrations: "List[str]") -> str:
- """Return the SDK name including the name of the used web framework."""
- # Note: I can not use for example sentry_sdk.integrations.django.DjangoIntegration.identifier
- # here because if django is not installed the integration is not accessible.
- framework_integrations = [
- "django",
- "flask",
- "fastapi",
- "bottle",
- "falcon",
- "quart",
- "sanic",
- "starlette",
- "litestar",
- "starlite",
- "chalice",
- "serverless",
- "pyramid",
- "tornado",
- "aiohttp",
- "aws_lambda",
- "gcp",
- "beam",
- "asgi",
- "wsgi",
- ]
- for integration in framework_integrations:
- if integration in installed_integrations:
- return "sentry.python.{}".format(integration)
- return "sentry.python"
- class CaptureInternalException:
- __slots__ = ()
- def __enter__(self) -> "ContextManager[Any]":
- return self
- def __exit__(
- self,
- ty: "Optional[Type[BaseException]]",
- value: "Optional[BaseException]",
- tb: "Optional[TracebackType]",
- ) -> bool:
- if ty is not None and value is not None:
- capture_internal_exception((ty, value, tb))
- return True
- _CAPTURE_INTERNAL_EXCEPTION = CaptureInternalException()
- def capture_internal_exceptions() -> "ContextManager[Any]":
- return _CAPTURE_INTERNAL_EXCEPTION
- def capture_internal_exception(exc_info: "ExcInfo") -> None:
- """
- Capture an exception that is likely caused by a bug in the SDK
- itself.
- These exceptions do not end up in Sentry and are just logged instead.
- """
- if sentry_sdk.get_client().is_active():
- logger.error("Internal error in sentry_sdk", exc_info=exc_info)
- def to_timestamp(value: "datetime") -> float:
- return (value - epoch).total_seconds()
- def format_timestamp(value: "datetime") -> str:
- """Formats a timestamp in RFC 3339 format.
- Any datetime objects with a non-UTC timezone are converted to UTC, so that all timestamps are formatted in UTC.
- """
- utctime = value.astimezone(timezone.utc)
- # We use this custom formatting rather than isoformat for backwards compatibility (we have used this format for
- # several years now), and isoformat is slightly different.
- return utctime.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
- ISO_TZ_SEPARATORS = frozenset(("+", "-"))
- def datetime_from_isoformat(value: str) -> "datetime":
- try:
- result = datetime.fromisoformat(value)
- except (AttributeError, ValueError):
- # py 3.6
- timestamp_format = (
- "%Y-%m-%dT%H:%M:%S.%f" if "." in value else "%Y-%m-%dT%H:%M:%S"
- )
- if value.endswith("Z"):
- value = value[:-1] + "+0000"
- if value[-6] in ISO_TZ_SEPARATORS:
- timestamp_format += "%z"
- value = value[:-3] + value[-2:]
- elif value[-5] in ISO_TZ_SEPARATORS:
- timestamp_format += "%z"
- result = datetime.strptime(value, timestamp_format)
- return result.astimezone(timezone.utc)
- def event_hint_with_exc_info(
- exc_info: "Optional[ExcInfo]" = None,
- ) -> "Dict[str, Optional[ExcInfo]]":
- """Creates a hint with the exc info filled in."""
- if exc_info is None:
- exc_info = sys.exc_info()
- else:
- exc_info = exc_info_from_error(exc_info)
- if exc_info[0] is None:
- exc_info = None
- return {"exc_info": exc_info}
- class BadDsn(ValueError):
- """Raised on invalid DSNs."""
- class Dsn:
- """Represents a DSN."""
- ORG_ID_REGEX = re.compile(r"^o(\d+)\.")
- def __init__(
- self, value: "Union[Dsn, str]", org_id: "Optional[str]" = None
- ) -> None:
- if isinstance(value, Dsn):
- self.__dict__ = dict(value.__dict__)
- return
- parts = urlsplit(str(value))
- if parts.scheme not in ("http", "https"):
- raise BadDsn("Unsupported scheme %r" % parts.scheme)
- self.scheme = parts.scheme
- if parts.hostname is None:
- raise BadDsn("Missing hostname")
- self.host = parts.hostname
- if org_id is not None:
- self.org_id: "Optional[str]" = org_id
- else:
- org_id_match = Dsn.ORG_ID_REGEX.match(self.host)
- self.org_id = org_id_match.group(1) if org_id_match else None
- if parts.port is None:
- self.port: int = self.scheme == "https" and 443 or 80
- else:
- self.port = parts.port
- if not parts.username:
- raise BadDsn("Missing public key")
- self.public_key = parts.username
- self.secret_key = parts.password
- path = parts.path.rsplit("/", 1)
- try:
- self.project_id = str(int(path.pop()))
- except (ValueError, TypeError):
- raise BadDsn("Invalid project in DSN (%r)" % (parts.path or "")[1:])
- self.path = "/".join(path) + "/"
- @property
- def netloc(self) -> str:
- """The netloc part of a DSN."""
- rv = self.host
- if (self.scheme, self.port) not in (("http", 80), ("https", 443)):
- rv = "%s:%s" % (rv, self.port)
- return rv
- def to_auth(self, client: "Optional[Any]" = None) -> "Auth":
- """Returns the auth info object for this dsn."""
- return Auth(
- scheme=self.scheme,
- host=self.netloc,
- path=self.path,
- project_id=self.project_id,
- public_key=self.public_key,
- secret_key=self.secret_key,
- client=client,
- )
- def __str__(self) -> str:
- return "%s://%s%s@%s%s%s" % (
- self.scheme,
- self.public_key,
- self.secret_key and "@" + self.secret_key or "",
- self.netloc,
- self.path,
- self.project_id,
- )
- class Auth:
- """Helper object that represents the auth info."""
- def __init__(
- self,
- scheme: str,
- host: str,
- project_id: str,
- public_key: str,
- secret_key: "Optional[str]" = None,
- version: int = 7,
- client: "Optional[Any]" = None,
- path: str = "/",
- ) -> None:
- self.scheme = scheme
- self.host = host
- self.path = path
- self.project_id = project_id
- self.public_key = public_key
- self.secret_key = secret_key
- self.version = version
- self.client = client
- def get_api_url(
- self,
- type: "EndpointType" = EndpointType.ENVELOPE,
- ) -> str:
- """Returns the API url for storing events."""
- return "%s://%s%sapi/%s/%s/" % (
- self.scheme,
- self.host,
- self.path,
- self.project_id,
- type.value,
- )
- def to_header(self) -> str:
- """Returns the auth header a string."""
- rv = [("sentry_key", self.public_key), ("sentry_version", self.version)]
- if self.client is not None:
- rv.append(("sentry_client", self.client))
- if self.secret_key is not None:
- rv.append(("sentry_secret", self.secret_key))
- return "Sentry " + ", ".join("%s=%s" % (key, value) for key, value in rv)
- def get_type_name(cls: "Optional[type]") -> "Optional[str]":
- return getattr(cls, "__qualname__", None) or getattr(cls, "__name__", None)
- def get_type_module(cls: "Optional[type]") -> "Optional[str]":
- mod = getattr(cls, "__module__", None)
- if mod not in (None, "builtins", "__builtins__"):
- return mod
- return None
- def should_hide_frame(frame: "FrameType") -> bool:
- try:
- mod = frame.f_globals["__name__"]
- if mod.startswith("sentry_sdk."):
- return True
- except (AttributeError, KeyError):
- pass
- for flag_name in "__traceback_hide__", "__tracebackhide__":
- try:
- if frame.f_locals[flag_name]:
- return True
- except Exception:
- pass
- return False
- def iter_stacks(tb: "Optional[TracebackType]") -> "Iterator[TracebackType]":
- tb_: "Optional[TracebackType]" = tb
- while tb_ is not None:
- if not should_hide_frame(tb_.tb_frame):
- yield tb_
- tb_ = tb_.tb_next
- def get_lines_from_file(
- filename: str,
- lineno: int,
- max_length: "Optional[int]" = None,
- loader: "Optional[Any]" = None,
- module: "Optional[str]" = None,
- ) -> "Tuple[List[Annotated[str]], Optional[Annotated[str]], List[Annotated[str]]]":
- context_lines = 5
- source = None
- if loader is not None and hasattr(loader, "get_source"):
- try:
- source_str: "Optional[str]" = loader.get_source(module)
- except (ImportError, IOError):
- source_str = None
- if source_str is not None:
- source = source_str.splitlines()
- if source is None:
- try:
- source = linecache.getlines(filename)
- except (OSError, IOError):
- return [], None, []
- if not source:
- return [], None, []
- lower_bound = max(0, lineno - context_lines)
- upper_bound = min(lineno + 1 + context_lines, len(source))
- try:
- pre_context = [
- strip_string(line.strip("\r\n"), max_length=max_length)
- for line in source[lower_bound:lineno]
- ]
- context_line = strip_string(source[lineno].strip("\r\n"), max_length=max_length)
- post_context = [
- strip_string(line.strip("\r\n"), max_length=max_length)
- for line in source[(lineno + 1) : upper_bound]
- ]
- return pre_context, context_line, post_context
- except IndexError:
- # the file may have changed since it was loaded into memory
- return [], None, []
- def get_source_context(
- frame: "FrameType",
- tb_lineno: "Optional[int]",
- max_value_length: "Optional[int]" = None,
- ) -> "Tuple[List[Annotated[str]], Optional[Annotated[str]], List[Annotated[str]]]":
- try:
- abs_path: "Optional[str]" = frame.f_code.co_filename
- except Exception:
- abs_path = None
- try:
- module = frame.f_globals["__name__"]
- except Exception:
- return [], None, []
- try:
- loader = frame.f_globals["__loader__"]
- except Exception:
- loader = None
- if tb_lineno is not None and abs_path:
- lineno = tb_lineno - 1
- return get_lines_from_file(
- abs_path, lineno, max_value_length, loader=loader, module=module
- )
- return [], None, []
- def safe_str(value: "Any") -> str:
- try:
- return str(value)
- except Exception:
- return safe_repr(value)
- def safe_repr(value: "Any") -> str:
- try:
- return repr(value)
- except Exception:
- return "<broken repr>"
- def filename_for_module(
- module: "Optional[str]", abs_path: "Optional[str]"
- ) -> "Optional[str]":
- if not abs_path or not module:
- return abs_path
- try:
- if abs_path.endswith(".pyc"):
- abs_path = abs_path[:-1]
- base_module = module.split(".", 1)[0]
- if base_module == module:
- return os.path.basename(abs_path)
- base_module_path = sys.modules[base_module].__file__
- if not base_module_path:
- return abs_path
- return abs_path.split(base_module_path.rsplit(os.sep, 2)[0], 1)[-1].lstrip(
- os.sep
- )
- except Exception:
- return abs_path
- def serialize_frame(
- frame: "FrameType",
- tb_lineno: "Optional[int]" = None,
- include_local_variables: bool = True,
- include_source_context: bool = True,
- max_value_length: "Optional[int]" = None,
- custom_repr: "Optional[Callable[..., Optional[str]]]" = None,
- ) -> "Dict[str, Any]":
- f_code = getattr(frame, "f_code", None)
- if not f_code:
- abs_path = None
- function = None
- else:
- abs_path = frame.f_code.co_filename
- function = frame.f_code.co_name
- try:
- module = frame.f_globals["__name__"]
- except Exception:
- module = None
- if tb_lineno is None:
- tb_lineno = frame.f_lineno
- try:
- os_abs_path = os.path.abspath(abs_path) if abs_path else None
- except Exception:
- os_abs_path = None
- rv: "Dict[str, Any]" = {
- "filename": filename_for_module(module, abs_path) or None,
- "abs_path": os_abs_path,
- "function": function or "<unknown>",
- "module": module,
- "lineno": tb_lineno,
- }
- if include_source_context:
- rv["pre_context"], rv["context_line"], rv["post_context"] = get_source_context(
- frame, tb_lineno, max_value_length
- )
- if include_local_variables:
- from sentry_sdk.serializer import serialize
- rv["vars"] = serialize(
- dict(frame.f_locals), is_vars=True, custom_repr=custom_repr
- )
- return rv
- def current_stacktrace(
- include_local_variables: bool = True,
- include_source_context: bool = True,
- max_value_length: "Optional[int]" = None,
- ) -> "Dict[str, Any]":
- __tracebackhide__ = True
- frames = []
- f: "Optional[FrameType]" = sys._getframe()
- while f is not None:
- if not should_hide_frame(f):
- frames.append(
- serialize_frame(
- f,
- include_local_variables=include_local_variables,
- include_source_context=include_source_context,
- max_value_length=max_value_length,
- )
- )
- f = f.f_back
- frames.reverse()
- return {"frames": frames}
- def get_errno(exc_value: BaseException) -> "Optional[Any]":
- return getattr(exc_value, "errno", None)
- def get_error_message(exc_value: "Optional[BaseException]") -> str:
- message: str = safe_str(
- getattr(exc_value, "message", "")
- or getattr(exc_value, "detail", "")
- or safe_str(exc_value)
- )
- # __notes__ should be a list of strings when notes are added
- # via add_note, but can be anything else if __notes__ is set
- # directly. We only support strings in __notes__, since that
- # is the correct use.
- notes: object = getattr(exc_value, "__notes__", None)
- if isinstance(notes, list) and len(notes) > 0:
- message += "\n" + "\n".join(note for note in notes if isinstance(note, str))
- return message
- def single_exception_from_error_tuple(
- exc_type: "Optional[type]",
- exc_value: "Optional[BaseException]",
- tb: "Optional[TracebackType]",
- client_options: "Optional[Dict[str, Any]]" = None,
- mechanism: "Optional[Dict[str, Any]]" = None,
- exception_id: "Optional[int]" = None,
- parent_id: "Optional[int]" = None,
- source: "Optional[str]" = None,
- full_stack: "Optional[list[dict[str, Any]]]" = None,
- ) -> "Dict[str, Any]":
- """
- Creates a dict that goes into the events `exception.values` list and is ingestible by Sentry.
- See the Exception Interface documentation for more details:
- https://develop.sentry.dev/sdk/event-payloads/exception/
- """
- exception_value: "Dict[str, Any]" = {}
- exception_value["mechanism"] = (
- mechanism.copy() if mechanism else {"type": "generic", "handled": True}
- )
- if exception_id is not None:
- exception_value["mechanism"]["exception_id"] = exception_id
- if exc_value is not None:
- errno = get_errno(exc_value)
- else:
- errno = None
- if errno is not None:
- exception_value["mechanism"].setdefault("meta", {}).setdefault(
- "errno", {}
- ).setdefault("number", errno)
- if source is not None:
- exception_value["mechanism"]["source"] = source
- is_root_exception = exception_id == 0
- if not is_root_exception and parent_id is not None:
- exception_value["mechanism"]["parent_id"] = parent_id
- exception_value["mechanism"]["type"] = "chained"
- if is_root_exception and "type" not in exception_value["mechanism"]:
- exception_value["mechanism"]["type"] = "generic"
- is_exception_group = BaseExceptionGroup is not None and isinstance(
- exc_value, BaseExceptionGroup
- )
- if is_exception_group:
- exception_value["mechanism"]["is_exception_group"] = True
- exception_value["module"] = get_type_module(exc_type)
- exception_value["type"] = get_type_name(exc_type)
- exception_value["value"] = get_error_message(exc_value)
- if client_options is None:
- include_local_variables = True
- include_source_context = True
- max_value_length = DEFAULT_MAX_VALUE_LENGTH # fallback
- custom_repr = None
- else:
- include_local_variables = client_options["include_local_variables"]
- include_source_context = client_options["include_source_context"]
- max_value_length = client_options["max_value_length"]
- custom_repr = client_options.get("custom_repr")
- frames: "List[Dict[str, Any]]" = [
- serialize_frame(
- tb.tb_frame,
- tb_lineno=tb.tb_lineno,
- include_local_variables=include_local_variables,
- include_source_context=include_source_context,
- max_value_length=max_value_length,
- custom_repr=custom_repr,
- )
- # Process at most MAX_STACK_FRAMES + 1 frames, to avoid hanging on
- # processing a super-long stacktrace.
- for tb, _ in zip(iter_stacks(tb), range(MAX_STACK_FRAMES + 1))
- ]
- if len(frames) > MAX_STACK_FRAMES:
- # If we have more frames than the limit, we remove the stacktrace completely.
- # We don't trim the stacktrace here because we have not processed the whole
- # thing (see above, we stop at MAX_STACK_FRAMES + 1). Normally, Relay would
- # intelligently trim by removing frames in the middle of the stacktrace, but
- # since we don't have the whole stacktrace, we can't do that. Instead, we
- # drop the entire stacktrace.
- exception_value["stacktrace"] = AnnotatedValue.removed_because_over_size_limit(
- value=None
- )
- elif frames:
- if not full_stack:
- new_frames = frames
- else:
- new_frames = merge_stack_frames(frames, full_stack, client_options)
- exception_value["stacktrace"] = {"frames": new_frames}
- return exception_value
- HAS_CHAINED_EXCEPTIONS = hasattr(Exception, "__suppress_context__")
- if HAS_CHAINED_EXCEPTIONS:
- def walk_exception_chain(exc_info: "ExcInfo") -> "Iterator[ExcInfo]":
- exc_type, exc_value, tb = exc_info
- seen_exceptions = []
- seen_exception_ids: "Set[int]" = set()
- while (
- exc_type is not None
- and exc_value is not None
- and id(exc_value) not in seen_exception_ids
- ):
- yield exc_type, exc_value, tb
- # Avoid hashing random types we don't know anything
- # about. Use the list to keep a ref so that the `id` is
- # not used for another object.
- seen_exceptions.append(exc_value)
- seen_exception_ids.add(id(exc_value))
- if exc_value.__suppress_context__:
- cause = exc_value.__cause__
- else:
- cause = exc_value.__context__
- if cause is None:
- break
- exc_type = type(cause)
- exc_value = cause
- tb = getattr(cause, "__traceback__", None)
- else:
- def walk_exception_chain(exc_info: "ExcInfo") -> "Iterator[ExcInfo]":
- yield exc_info
- def exceptions_from_error(
- exc_type: "Optional[type]",
- exc_value: "Optional[BaseException]",
- tb: "Optional[TracebackType]",
- client_options: "Optional[Dict[str, Any]]" = None,
- mechanism: "Optional[Dict[str, Any]]" = None,
- exception_id: int = 0,
- parent_id: int = 0,
- source: "Optional[str]" = None,
- full_stack: "Optional[list[dict[str, Any]]]" = None,
- seen_exceptions: "Optional[list[BaseException]]" = None,
- seen_exception_ids: "Optional[Set[int]]" = None,
- ) -> "Tuple[int, List[Dict[str, Any]]]":
- """
- Creates the list of exceptions.
- This can include chained exceptions and exceptions from an ExceptionGroup.
- See the Exception Interface documentation for more details:
- https://develop.sentry.dev/sdk/event-payloads/exception/
- Args:
- exception_id (int):
- Sequential counter for assigning ``mechanism.exception_id``
- to each processed exception. Is NOT the result of calling `id()` on the exception itself.
- parent_id (int):
- The ``mechanism.exception_id`` of the parent exception.
- Written into ``mechanism.parent_id`` in the event payload so Sentry can
- reconstruct the exception tree.
- Not to be confused with ``seen_exception_ids``, which tracks Python ``id()``
- values for cycle detection.
- """
- if seen_exception_ids is None:
- seen_exception_ids = set()
- if seen_exceptions is None:
- seen_exceptions = []
- if exc_value is not None and id(exc_value) in seen_exception_ids:
- return (exception_id, [])
- if exc_value is not None:
- seen_exceptions.append(exc_value)
- seen_exception_ids.add(id(exc_value))
- parent = single_exception_from_error_tuple(
- exc_type=exc_type,
- exc_value=exc_value,
- tb=tb,
- client_options=client_options,
- mechanism=mechanism,
- exception_id=exception_id,
- parent_id=parent_id,
- source=source,
- full_stack=full_stack,
- )
- exceptions = [parent]
- parent_id = exception_id
- exception_id += 1
- should_supress_context = (
- hasattr(exc_value, "__suppress_context__") and exc_value.__suppress_context__ # type: ignore
- )
- if should_supress_context:
- # Add direct cause.
- # The field `__cause__` is set when raised with the exception (using the `from` keyword).
- exception_has_cause = (
- exc_value
- and hasattr(exc_value, "__cause__")
- and exc_value.__cause__ is not None
- )
- if exception_has_cause:
- cause = exc_value.__cause__ # type: ignore
- (exception_id, child_exceptions) = exceptions_from_error(
- exc_type=type(cause),
- exc_value=cause,
- tb=getattr(cause, "__traceback__", None),
- client_options=client_options,
- mechanism=mechanism,
- exception_id=exception_id,
- source="__cause__",
- full_stack=full_stack,
- seen_exceptions=seen_exceptions,
- seen_exception_ids=seen_exception_ids,
- )
- exceptions.extend(child_exceptions)
- else:
- # Add indirect cause.
- # The field `__context__` is assigned if another exception occurs while handling the exception.
- exception_has_content = (
- exc_value
- and hasattr(exc_value, "__context__")
- and exc_value.__context__ is not None
- )
- if exception_has_content:
- context = exc_value.__context__ # type: ignore
- (exception_id, child_exceptions) = exceptions_from_error(
- exc_type=type(context),
- exc_value=context,
- tb=getattr(context, "__traceback__", None),
- client_options=client_options,
- mechanism=mechanism,
- exception_id=exception_id,
- source="__context__",
- full_stack=full_stack,
- seen_exceptions=seen_exceptions,
- seen_exception_ids=seen_exception_ids,
- )
- exceptions.extend(child_exceptions)
- # Add exceptions from an ExceptionGroup.
- is_exception_group = exc_value and hasattr(exc_value, "exceptions")
- if is_exception_group:
- for idx, e in enumerate(exc_value.exceptions): # type: ignore
- (exception_id, child_exceptions) = exceptions_from_error(
- exc_type=type(e),
- exc_value=e,
- tb=getattr(e, "__traceback__", None),
- client_options=client_options,
- mechanism=mechanism,
- exception_id=exception_id,
- parent_id=parent_id,
- source="exceptions[%s]" % idx,
- full_stack=full_stack,
- seen_exceptions=seen_exceptions,
- seen_exception_ids=seen_exception_ids,
- )
- exceptions.extend(child_exceptions)
- return (exception_id, exceptions)
- def exceptions_from_error_tuple(
- exc_info: "ExcInfo",
- client_options: "Optional[Dict[str, Any]]" = None,
- mechanism: "Optional[Dict[str, Any]]" = None,
- full_stack: "Optional[list[dict[str, Any]]]" = None,
- ) -> "List[Dict[str, Any]]":
- exc_type, exc_value, tb = exc_info
- is_exception_group = BaseExceptionGroup is not None and isinstance(
- exc_value, BaseExceptionGroup
- )
- if is_exception_group:
- (_, exceptions) = exceptions_from_error(
- exc_type=exc_type,
- exc_value=exc_value,
- tb=tb,
- client_options=client_options,
- mechanism=mechanism,
- exception_id=0,
- parent_id=0,
- full_stack=full_stack,
- )
- else:
- exceptions = []
- for exc_type, exc_value, tb in walk_exception_chain(exc_info):
- exceptions.append(
- single_exception_from_error_tuple(
- exc_type=exc_type,
- exc_value=exc_value,
- tb=tb,
- client_options=client_options,
- mechanism=mechanism,
- full_stack=full_stack,
- )
- )
- exceptions.reverse()
- return exceptions
- def to_string(value: str) -> str:
- try:
- return str(value)
- except UnicodeDecodeError:
- return repr(value)[1:-1]
- def iter_event_stacktraces(event: "Event") -> "Iterator[Annotated[Dict[str, Any]]]":
- if "stacktrace" in event:
- yield event["stacktrace"]
- if "threads" in event:
- for thread in event["threads"].get("values") or ():
- if "stacktrace" in thread:
- yield thread["stacktrace"]
- if "exception" in event:
- for exception in event["exception"].get("values") or ():
- if isinstance(exception, dict) and "stacktrace" in exception:
- yield exception["stacktrace"]
- def iter_event_frames(event: "Event") -> "Iterator[Dict[str, Any]]":
- for stacktrace in iter_event_stacktraces(event):
- if isinstance(stacktrace, AnnotatedValue):
- stacktrace = stacktrace.value or {}
- for frame in stacktrace.get("frames") or ():
- yield frame
- def handle_in_app(
- event: "Event",
- in_app_exclude: "Optional[List[str]]" = None,
- in_app_include: "Optional[List[str]]" = None,
- project_root: "Optional[str]" = None,
- ) -> "Event":
- for stacktrace in iter_event_stacktraces(event):
- if isinstance(stacktrace, AnnotatedValue):
- stacktrace = stacktrace.value or {}
- set_in_app_in_frames(
- stacktrace.get("frames"),
- in_app_exclude=in_app_exclude,
- in_app_include=in_app_include,
- project_root=project_root,
- )
- return event
- def set_in_app_in_frames(
- frames: "Any",
- in_app_exclude: "Optional[List[str]]",
- in_app_include: "Optional[List[str]]",
- project_root: "Optional[str]" = None,
- ) -> "Optional[Any]":
- if not frames:
- return None
- for frame in frames:
- # if frame has already been marked as in_app, skip it
- current_in_app = frame.get("in_app")
- if current_in_app is not None:
- continue
- module = frame.get("module")
- # check if module in frame is in the list of modules to include
- if _module_in_list(module, in_app_include):
- frame["in_app"] = True
- continue
- # check if module in frame is in the list of modules to exclude
- if _module_in_list(module, in_app_exclude):
- frame["in_app"] = False
- continue
- # if frame has no abs_path, skip further checks
- abs_path = frame.get("abs_path")
- if abs_path is None:
- continue
- if _is_external_source(abs_path):
- frame["in_app"] = False
- continue
- if _is_in_project_root(abs_path, project_root):
- frame["in_app"] = True
- continue
- return frames
- def exc_info_from_error(error: "Union[BaseException, ExcInfo]") -> "ExcInfo":
- if isinstance(error, tuple) and len(error) == 3:
- exc_type, exc_value, tb = error
- elif isinstance(error, BaseException):
- tb = getattr(error, "__traceback__", None)
- if tb is not None:
- exc_type = type(error)
- exc_value = error
- else:
- exc_type, exc_value, tb = sys.exc_info()
- if exc_value is not error:
- tb = None
- exc_value = error
- exc_type = type(error)
- else:
- raise ValueError("Expected Exception object to report, got %s!" % type(error))
- exc_info = (exc_type, exc_value, tb)
- if TYPE_CHECKING:
- # This cast is safe because exc_type and exc_value are either both
- # None or both not None.
- exc_info = cast(ExcInfo, exc_info)
- return exc_info
- def merge_stack_frames(
- frames: "List[Dict[str, Any]]",
- full_stack: "List[Dict[str, Any]]",
- client_options: "Optional[Dict[str, Any]]",
- ) -> "List[Dict[str, Any]]":
- """
- Add the missing frames from full_stack to frames and return the merged list.
- """
- frame_ids = {
- (
- frame["abs_path"],
- frame["context_line"],
- frame["lineno"],
- frame["function"],
- )
- for frame in frames
- }
- new_frames = [
- stackframe
- for stackframe in full_stack
- if (
- stackframe["abs_path"],
- stackframe["context_line"],
- stackframe["lineno"],
- stackframe["function"],
- )
- not in frame_ids
- ]
- new_frames.extend(frames)
- # Limit the number of frames
- max_stack_frames = (
- client_options.get("max_stack_frames", DEFAULT_MAX_STACK_FRAMES)
- if client_options
- else None
- )
- if max_stack_frames is not None:
- new_frames = new_frames[len(new_frames) - max_stack_frames :]
- return new_frames
- def event_from_exception(
- exc_info: "Union[BaseException, ExcInfo]",
- client_options: "Optional[Dict[str, Any]]" = None,
- mechanism: "Optional[Dict[str, Any]]" = None,
- ) -> "Tuple[Event, Dict[str, Any]]":
- exc_info = exc_info_from_error(exc_info)
- hint = event_hint_with_exc_info(exc_info)
- if client_options and client_options.get("add_full_stack", DEFAULT_ADD_FULL_STACK):
- full_stack = current_stacktrace(
- include_local_variables=client_options["include_local_variables"],
- max_value_length=client_options["max_value_length"],
- )["frames"]
- else:
- full_stack = None
- return (
- {
- "level": "error",
- "exception": {
- "values": exceptions_from_error_tuple(
- exc_info, client_options, mechanism, full_stack
- )
- },
- },
- hint,
- )
- def _module_in_list(name: "Optional[str]", items: "Optional[List[str]]") -> bool:
- if name is None:
- return False
- if not items:
- return False
- for item in items:
- if item == name or name.startswith(item + "."):
- return True
- return False
- def _is_external_source(abs_path: "Optional[str]") -> bool:
- # check if frame is in 'site-packages' or 'dist-packages'
- if abs_path is None:
- return False
- external_source = (
- re.search(r"[\\/](?:dist|site)-packages[\\/]", abs_path) is not None
- )
- return external_source
- def _is_in_project_root(
- abs_path: "Optional[str]", project_root: "Optional[str]"
- ) -> bool:
- if abs_path is None or project_root is None:
- return False
- # check if path is in the project root
- if abs_path.startswith(project_root):
- return True
- return False
- def _truncate_by_bytes(string: str, max_bytes: int) -> str:
- """
- Truncate a UTF-8-encodable string to the last full codepoint so that it fits in max_bytes.
- """
- truncated = string.encode("utf-8")[: max_bytes - 3].decode("utf-8", errors="ignore")
- return truncated + "..."
- def _get_size_in_bytes(value: str) -> "Optional[int]":
- try:
- return len(value.encode("utf-8"))
- except (UnicodeEncodeError, UnicodeDecodeError):
- return None
- def strip_string(
- value: str, max_length: "Optional[int]" = None
- ) -> "Union[AnnotatedValue, str]":
- if not value:
- return value
- if max_length is None:
- max_length = DEFAULT_MAX_VALUE_LENGTH
- byte_size = _get_size_in_bytes(value)
- text_size = len(value)
- if byte_size is not None and byte_size > max_length:
- # truncate to max_length bytes, preserving code points
- truncated_value = _truncate_by_bytes(value, max_length)
- elif text_size is not None and text_size > max_length:
- # fallback to truncating by string length
- truncated_value = value[: max_length - 3] + "..."
- else:
- return value
- return AnnotatedValue(
- value=truncated_value,
- metadata={
- "len": byte_size or text_size,
- "rem": [["!limit", "x", max_length - 3, max_length]],
- },
- )
- def parse_version(version: str) -> "Optional[Tuple[int, ...]]":
- """
- Parses a version string into a tuple of integers.
- This uses the parsing loging from PEP 440:
- https://peps.python.org/pep-0440/#appendix-b-parsing-version-strings-with-regular-expressions
- """
- VERSION_PATTERN = r""" # noqa: N806
- v?
- (?:
- (?:(?P<epoch>[0-9]+)!)? # epoch
- (?P<release>[0-9]+(?:\.[0-9]+)*) # release segment
- (?P<pre> # pre-release
- [-_\.]?
- (?P<pre_l>(a|b|c|rc|alpha|beta|pre|preview))
- [-_\.]?
- (?P<pre_n>[0-9]+)?
- )?
- (?P<post> # post release
- (?:-(?P<post_n1>[0-9]+))
- |
- (?:
- [-_\.]?
- (?P<post_l>post|rev|r)
- [-_\.]?
- (?P<post_n2>[0-9]+)?
- )
- )?
- (?P<dev> # dev release
- [-_\.]?
- (?P<dev_l>dev)
- [-_\.]?
- (?P<dev_n>[0-9]+)?
- )?
- )
- (?:\+(?P<local>[a-z0-9]+(?:[-_\.][a-z0-9]+)*))? # local version
- """
- pattern = re.compile(
- r"^\s*" + VERSION_PATTERN + r"\s*$",
- re.VERBOSE | re.IGNORECASE,
- )
- try:
- release = pattern.match(version).groupdict()["release"] # type: ignore
- release_tuple: "Tuple[int, ...]" = tuple(map(int, release.split(".")[:3]))
- except (TypeError, ValueError, AttributeError):
- return None
- return release_tuple
- def _is_contextvars_broken() -> bool:
- """
- Returns whether gevent/eventlet have patched the stdlib in a way where thread locals are now more "correct" than contextvars.
- """
- try:
- import gevent
- from gevent.monkey import is_object_patched
- # Get the MAJOR and MINOR version numbers of Gevent
- version_tuple = tuple(
- [int(part) for part in re.split(r"a|b|rc|\.", gevent.__version__)[:2]]
- )
- if is_object_patched("threading", "local"):
- # Gevent 20.9.0 depends on Greenlet 0.4.17 which natively handles switching
- # context vars when greenlets are switched, so, Gevent 20.9.0+ is all fine.
- # Ref: https://github.com/gevent/gevent/blob/83c9e2ae5b0834b8f84233760aabe82c3ba065b4/src/gevent/monkey.py#L604-L609
- # Gevent 20.5, that doesn't depend on Greenlet 0.4.17 with native support
- # for contextvars, is able to patch both thread locals and contextvars, in
- # that case, check if contextvars are effectively patched.
- if (
- # Gevent 20.9.0+
- (sys.version_info >= (3, 7) and version_tuple >= (20, 9))
- # Gevent 20.5.0+ or Python < 3.7
- or (is_object_patched("contextvars", "ContextVar"))
- ):
- return False
- return True
- except ImportError:
- pass
- try:
- import greenlet
- from eventlet.patcher import is_monkey_patched # type: ignore
- greenlet_version = parse_version(greenlet.__version__)
- if greenlet_version is None:
- logger.error(
- "Internal error in Sentry SDK: Could not parse Greenlet version from greenlet.__version__."
- )
- return False
- if is_monkey_patched("thread") and greenlet_version < (0, 5):
- return True
- except ImportError:
- pass
- return False
- def _make_threadlocal_contextvars(local: type) -> type:
- class ContextVar:
- # Super-limited impl of ContextVar
- def __init__(self, name: str, default: "Any" = None) -> None:
- self._name = name
- self._default = default
- self._local = local()
- self._original_local = local()
- def get(self, default: "Any" = None) -> "Any":
- return getattr(self._local, "value", default or self._default)
- def set(self, value: "Any") -> "Any":
- token = str(random.getrandbits(64))
- original_value = self.get()
- setattr(self._original_local, token, original_value)
- self._local.value = value
- return token
- def reset(self, token: "Any") -> None:
- self._local.value = getattr(self._original_local, token)
- # delete the original value (this way it works in Python 3.6+)
- del self._original_local.__dict__[token]
- return ContextVar
- def _get_contextvars() -> "Tuple[bool, type]":
- """
- Figure out the "right" contextvars installation to use. Returns a
- `contextvars.ContextVar`-like class with a limited API.
- See https://docs.sentry.io/platforms/python/contextvars/ for more information.
- """
- if not _is_contextvars_broken():
- # aiocontextvars is a PyPI package that ensures that the contextvars
- # backport (also a PyPI package) works with asyncio under Python 3.6
- #
- # Import it if available.
- if sys.version_info < (3, 7):
- # `aiocontextvars` is absolutely required for functional
- # contextvars on Python 3.6.
- try:
- from aiocontextvars import ContextVar
- return True, ContextVar
- except ImportError:
- pass
- else:
- # On Python 3.7 contextvars are functional.
- try:
- from contextvars import ContextVar
- return True, ContextVar
- except ImportError:
- pass
- # Fall back to basic thread-local usage.
- from threading import local
- return False, _make_threadlocal_contextvars(local)
- HAS_REAL_CONTEXTVARS, ContextVar = _get_contextvars()
- CONTEXTVARS_ERROR_MESSAGE = """
- With asyncio/ASGI applications, the Sentry SDK requires a functional
- installation of `contextvars` to avoid leaking scope/context data across
- requests.
- Please refer to https://docs.sentry.io/platforms/python/contextvars/ for more information.
- """
- def qualname_from_function(func: "Callable[..., Any]") -> "Optional[str]":
- """Return the qualified name of func. Works with regular function, lambda, partial and partialmethod."""
- func_qualname: "Optional[str]" = None
- # Python 2
- try:
- return "%s.%s.%s" % (
- func.im_class.__module__, # type: ignore
- func.im_class.__name__, # type: ignore
- func.__name__,
- )
- except Exception:
- pass
- prefix, suffix = "", ""
- if isinstance(func, partial) and hasattr(func.func, "__name__"):
- prefix, suffix = "partial(<function ", ">)"
- func = func.func
- else:
- # The _partialmethod attribute of methods wrapped with partialmethod() was renamed to __partialmethod__ in CPython 3.13:
- # https://github.com/python/cpython/pull/16600
- partial_method = getattr(func, "_partialmethod", None) or getattr(
- func, "__partialmethod__", None
- )
- if isinstance(partial_method, partialmethod):
- prefix, suffix = "partialmethod(<function ", ">)"
- func = partial_method.func
- if hasattr(func, "__qualname__"):
- func_qualname = func.__qualname__
- elif hasattr(func, "__name__"): # Python 2.7 has no __qualname__
- func_qualname = func.__name__
- # Python 3: methods, functions, classes
- if func_qualname is not None:
- if hasattr(func, "__module__") and isinstance(func.__module__, str):
- func_qualname = func.__module__ + "." + func_qualname
- func_qualname = prefix + func_qualname + suffix
- return func_qualname
- def transaction_from_function(func: "Callable[..., Any]") -> "Optional[str]":
- return qualname_from_function(func)
- disable_capture_event = ContextVar("disable_capture_event")
- class ServerlessTimeoutWarning(Exception): # noqa: N818
- """Raised when a serverless method is about to reach its timeout."""
- pass
- class TimeoutThread(threading.Thread):
- """Creates a Thread which runs (sleeps) for a time duration equal to
- waiting_time and raises a custom ServerlessTimeout exception.
- """
- def __init__(
- self,
- waiting_time: float,
- configured_timeout: int,
- isolation_scope: "Optional[sentry_sdk.Scope]" = None,
- current_scope: "Optional[sentry_sdk.Scope]" = None,
- ) -> None:
- threading.Thread.__init__(self)
- self.waiting_time = waiting_time
- self.configured_timeout = configured_timeout
- self.isolation_scope = isolation_scope
- self.current_scope = current_scope
- self._stop_event = threading.Event()
- def stop(self) -> None:
- self._stop_event.set()
- def _capture_exception(self) -> "ExcInfo":
- exc_info = sys.exc_info()
- client = sentry_sdk.get_client()
- event, hint = event_from_exception(
- exc_info,
- client_options=client.options,
- mechanism={"type": "threading", "handled": False},
- )
- sentry_sdk.capture_event(event, hint=hint)
- return exc_info
- def run(self) -> None:
- self._stop_event.wait(self.waiting_time)
- if self._stop_event.is_set():
- return
- integer_configured_timeout = int(self.configured_timeout)
- # Setting up the exact integer value of configured time(in seconds)
- if integer_configured_timeout < self.configured_timeout:
- integer_configured_timeout = integer_configured_timeout + 1
- # Raising Exception after timeout duration is reached
- if self.isolation_scope is not None and self.current_scope is not None:
- with sentry_sdk.scope.use_isolation_scope(self.isolation_scope):
- with sentry_sdk.scope.use_scope(self.current_scope):
- try:
- raise ServerlessTimeoutWarning(
- "WARNING : Function is expected to get timed out. Configured timeout duration = {} seconds.".format(
- integer_configured_timeout
- )
- )
- except Exception:
- reraise(*self._capture_exception())
- raise ServerlessTimeoutWarning(
- "WARNING : Function is expected to get timed out. Configured timeout duration = {} seconds.".format(
- integer_configured_timeout
- )
- )
- def to_base64(original: str) -> "Optional[str]":
- """
- Convert a string to base64, via UTF-8. Returns None on invalid input.
- """
- base64_string = None
- try:
- utf8_bytes = original.encode("UTF-8")
- base64_bytes = base64.b64encode(utf8_bytes)
- base64_string = base64_bytes.decode("UTF-8")
- except Exception as err:
- logger.warning("Unable to encode {orig} to base64:".format(orig=original), err)
- return base64_string
- def from_base64(base64_string: str) -> "Optional[str]":
- """
- Convert a string from base64, via UTF-8. Returns None on invalid input.
- """
- utf8_string = None
- try:
- only_valid_chars = BASE64_ALPHABET.match(base64_string)
- assert only_valid_chars
- base64_bytes = base64_string.encode("UTF-8")
- utf8_bytes = base64.b64decode(base64_bytes)
- utf8_string = utf8_bytes.decode("UTF-8")
- except Exception as err:
- logger.warning(
- "Unable to decode {b64} from base64:".format(b64=base64_string), err
- )
- return utf8_string
- Components = namedtuple("Components", ["scheme", "netloc", "path", "query", "fragment"])
- def sanitize_url(
- url: str,
- remove_authority: bool = True,
- remove_query_values: bool = True,
- split: bool = False,
- ) -> "Union[str, Components]":
- """
- Removes the authority and query parameter values from a given URL.
- """
- parsed_url = urlsplit(url)
- query_params = parse_qs(parsed_url.query, keep_blank_values=True)
- # strip username:password (netloc can be usr:pwd@example.com)
- if remove_authority:
- netloc_parts = parsed_url.netloc.split("@")
- if len(netloc_parts) > 1:
- netloc = "%s:%s@%s" % (
- SENSITIVE_DATA_SUBSTITUTE,
- SENSITIVE_DATA_SUBSTITUTE,
- netloc_parts[-1],
- )
- else:
- netloc = parsed_url.netloc
- else:
- netloc = parsed_url.netloc
- # strip values from query string
- if remove_query_values:
- query_string = unquote(
- urlencode({key: SENSITIVE_DATA_SUBSTITUTE for key in query_params})
- )
- else:
- query_string = parsed_url.query
- components = Components(
- scheme=parsed_url.scheme,
- netloc=netloc,
- query=query_string,
- path=parsed_url.path,
- fragment=parsed_url.fragment,
- )
- if split:
- return components
- else:
- return urlunsplit(components)
- ParsedUrl = namedtuple("ParsedUrl", ["url", "query", "fragment"])
- def parse_url(url: str, sanitize: bool = True) -> "ParsedUrl":
- """
- Splits a URL into a url (including path), query and fragment. If sanitize is True, the query
- parameters will be sanitized to remove sensitive data. The autority (username and password)
- in the URL will always be removed.
- """
- parsed_url = sanitize_url(
- url, remove_authority=True, remove_query_values=sanitize, split=True
- )
- base_url = urlunsplit(
- Components(
- scheme=parsed_url.scheme, # type: ignore
- netloc=parsed_url.netloc, # type: ignore
- query="",
- path=parsed_url.path, # type: ignore
- fragment="",
- )
- )
- return ParsedUrl(
- url=base_url,
- query=parsed_url.query, # type: ignore
- fragment=parsed_url.fragment, # type: ignore
- )
- def is_valid_sample_rate(rate: "Any", source: str) -> bool:
- """
- Checks the given sample rate to make sure it is valid type and value (a
- boolean or a number between 0 and 1, inclusive).
- """
- # both booleans and NaN are instances of Real, so a) checking for Real
- # checks for the possibility of a boolean also, and b) we have to check
- # separately for NaN and Decimal does not derive from Real so need to check that too
- if not isinstance(rate, (Real, Decimal)) or math.isnan(rate):
- logger.warning(
- "{source} Given sample rate is invalid. Sample rate must be a boolean or a number between 0 and 1. Got {rate} of type {type}.".format(
- source=source, rate=rate, type=type(rate)
- )
- )
- return False
- # in case rate is a boolean, it will get cast to 1 if it's True and 0 if it's False
- rate = float(rate)
- if rate < 0 or rate > 1:
- logger.warning(
- "{source} Given sample rate is invalid. Sample rate must be between 0 and 1. Got {rate}.".format(
- source=source, rate=rate
- )
- )
- return False
- return True
- def match_regex_list(
- item: str,
- regex_list: "Optional[List[str]]" = None,
- substring_matching: bool = False,
- ) -> bool:
- if regex_list is None:
- return False
- for item_matcher in regex_list:
- if not substring_matching and item_matcher[-1] != "$":
- item_matcher += "$"
- matched = re.search(item_matcher, item)
- if matched:
- return True
- return False
- def is_sentry_url(client: "sentry_sdk.client.BaseClient", url: str) -> bool:
- """
- Determines whether the given URL matches the Sentry DSN.
- """
- return (
- client is not None
- and client.transport is not None
- and client.transport.parsed_dsn is not None
- and client.transport.parsed_dsn.netloc in url
- )
- def _generate_installed_modules() -> "Iterator[Tuple[str, str]]":
- try:
- from importlib import metadata
- yielded = set()
- for dist in metadata.distributions():
- name = dist.metadata.get("Name", None) # type: ignore[attr-defined]
- # `metadata` values may be `None`, see:
- # https://github.com/python/cpython/issues/91216
- # and
- # https://github.com/python/importlib_metadata/issues/371
- if name is not None:
- normalized_name = _normalize_module_name(name)
- if dist.version is not None and normalized_name not in yielded:
- yield normalized_name, dist.version
- yielded.add(normalized_name)
- except ImportError:
- # < py3.8
- try:
- import pkg_resources
- except ImportError:
- return
- for info in pkg_resources.working_set:
- yield _normalize_module_name(info.key), info.version
- def _normalize_module_name(name: str) -> str:
- return name.lower()
- def _replace_hyphens_dots_and_underscores_with_dashes(name: str) -> str:
- # https://peps.python.org/pep-0503/#normalized-names
- return re.sub(r"[-_.]+", "-", name)
- def _get_installed_modules() -> "Dict[str, str]":
- global _installed_modules
- if _installed_modules is None:
- _installed_modules = dict(_generate_installed_modules())
- return _installed_modules
- def package_version(package: str) -> "Optional[Tuple[int, ...]]":
- normalized_package = _normalize_module_name(
- _replace_hyphens_dots_and_underscores_with_dashes(package)
- )
- installed_packages = {
- _replace_hyphens_dots_and_underscores_with_dashes(module): v
- for module, v in _get_installed_modules().items()
- }
- version = installed_packages.get(normalized_package)
- if version is None:
- return None
- return parse_version(version)
- def reraise(
- tp: "Optional[Type[BaseException]]",
- value: "Optional[BaseException]",
- tb: "Optional[Any]" = None,
- ) -> "NoReturn":
- assert value is not None
- if value.__traceback__ is not tb:
- raise value.with_traceback(tb)
- raise value
- def _no_op(*_a: "Any", **_k: "Any") -> None:
- """No-op function for ensure_integration_enabled."""
- pass
- if TYPE_CHECKING:
- @overload
- def ensure_integration_enabled(
- integration: "type[sentry_sdk.integrations.Integration]",
- original_function: "Callable[P, R]",
- ) -> "Callable[[Callable[P, R]], Callable[P, R]]": ...
- @overload
- def ensure_integration_enabled(
- integration: "type[sentry_sdk.integrations.Integration]",
- ) -> "Callable[[Callable[P, None]], Callable[P, None]]": ...
- def ensure_integration_enabled(
- integration: "type[sentry_sdk.integrations.Integration]",
- original_function: "Union[Callable[P, R], Callable[P, None]]" = _no_op,
- ) -> "Callable[[Callable[P, R]], Callable[P, R]]":
- """
- Ensures a given integration is enabled prior to calling a Sentry-patched function.
- The function takes as its parameters the integration that must be enabled and the original
- function that the SDK is patching. The function returns a function that takes the
- decorated (Sentry-patched) function as its parameter, and returns a function that, when
- called, checks whether the given integration is enabled. If the integration is enabled, the
- function calls the decorated, Sentry-patched function. If the integration is not enabled,
- the original function is called.
- The function also takes care of preserving the original function's signature and docstring.
- Example usage:
- ```python
- @ensure_integration_enabled(MyIntegration, my_function)
- def patch_my_function():
- with sentry_sdk.start_transaction(...):
- return my_function()
- ```
- """
- if TYPE_CHECKING:
- # Type hint to ensure the default function has the right typing. The overloads
- # ensure the default _no_op function is only used when R is None.
- original_function = cast(Callable[P, R], original_function)
- def patcher(sentry_patched_function: "Callable[P, R]") -> "Callable[P, R]":
- def runner(*args: "P.args", **kwargs: "P.kwargs") -> "R":
- if sentry_sdk.get_client().get_integration(integration) is None:
- return original_function(*args, **kwargs)
- return sentry_patched_function(*args, **kwargs)
- if original_function is _no_op:
- return wraps(sentry_patched_function)(runner)
- return wraps(original_function)(runner)
- return patcher
- if PY37:
- def nanosecond_time() -> int:
- return time.perf_counter_ns()
- else:
- def nanosecond_time() -> int:
- return int(time.perf_counter() * 1e9)
- def now() -> float:
- return time.perf_counter()
- try:
- from gevent import get_hub as get_gevent_hub
- from gevent.monkey import is_module_patched
- except ImportError:
- # it's not great that the signatures are different, get_hub can't return None
- # consider adding an if TYPE_CHECKING to change the signature to Optional[Hub]
- def get_gevent_hub() -> "Optional[Hub]": # type: ignore[misc]
- return None
- def is_module_patched(mod_name: str) -> bool:
- # unable to import from gevent means no modules have been patched
- return False
- def is_gevent() -> bool:
- return is_module_patched("threading") or is_module_patched("_thread")
- def get_current_thread_meta(
- thread: "Optional[threading.Thread]" = None,
- ) -> "Tuple[Optional[int], Optional[str]]":
- """
- Try to get the id of the current thread, with various fall backs.
- """
- # if a thread is specified, that takes priority
- if thread is not None:
- try:
- thread_id = thread.ident
- thread_name = thread.name
- if thread_id is not None:
- return thread_id, thread_name
- except AttributeError:
- pass
- # if the app is using gevent, we should look at the gevent hub first
- # as the id there differs from what the threading module reports
- if is_gevent():
- gevent_hub = get_gevent_hub()
- if gevent_hub is not None:
- try:
- # this is undocumented, so wrap it in try except to be safe
- return gevent_hub.thread_ident, None
- except AttributeError:
- pass
- # use the current thread's id if possible
- try:
- thread = threading.current_thread()
- thread_id = thread.ident
- thread_name = thread.name
- if thread_id is not None:
- return thread_id, thread_name
- except AttributeError:
- pass
- # if we can't get the current thread id, fall back to the main thread id
- try:
- thread = threading.main_thread()
- thread_id = thread.ident
- thread_name = thread.name
- if thread_id is not None:
- return thread_id, thread_name
- except AttributeError:
- pass
- # we've tried everything, time to give up
- return None, None
- def should_be_treated_as_error(ty: "Any", value: "Any") -> bool:
- if ty == SystemExit and hasattr(value, "code") and value.code in (0, None):
- # https://docs.python.org/3/library/exceptions.html#SystemExit
- return False
- return True
- if TYPE_CHECKING:
- T = TypeVar("T")
- def try_convert(convert_func: "Callable[[Any], T]", value: "Any") -> "Optional[T]":
- """
- Attempt to convert from an unknown type to a specific type, using the
- given function. Return None if the conversion fails, i.e. if the function
- raises an exception.
- """
- try:
- if isinstance(value, convert_func): # type: ignore
- return value
- except TypeError:
- pass
- try:
- return convert_func(value)
- except Exception:
- return None
- def safe_serialize(data: "Any") -> str:
- """Safely serialize to a readable string."""
- def serialize_item(
- item: "Any",
- ) -> "Union[str, dict[Any, Any], list[Any], tuple[Any, ...]]":
- if callable(item):
- try:
- module = getattr(item, "__module__", None)
- qualname = getattr(item, "__qualname__", None)
- name = getattr(item, "__name__", "anonymous")
- if module and qualname:
- full_path = f"{module}.{qualname}"
- elif module and name:
- full_path = f"{module}.{name}"
- else:
- full_path = name
- return f"<function {full_path}>"
- except Exception:
- return f"<callable {type(item).__name__}>"
- elif isinstance(item, dict):
- return {k: serialize_item(v) for k, v in item.items()}
- elif isinstance(item, (list, tuple)):
- return [serialize_item(x) for x in item]
- elif hasattr(item, "__dict__"):
- try:
- attrs = {
- k: serialize_item(v)
- for k, v in vars(item).items()
- if not k.startswith("_")
- }
- return f"<{type(item).__name__} {attrs}>"
- except Exception:
- return repr(item)
- else:
- return item
- try:
- serialized = serialize_item(data)
- return (
- json.dumps(serialized, default=str)
- if not isinstance(serialized, str)
- else serialized
- )
- except Exception:
- return str(data)
- def has_logs_enabled(options: "Optional[dict[str, Any]]") -> bool:
- if options is None:
- return False
- return bool(
- options.get("enable_logs", False)
- or options["_experiments"].get("enable_logs", False)
- )
- def get_before_send_log(
- options: "Optional[dict[str, Any]]",
- ) -> "Optional[Callable[[Log, Hint], Optional[Log]]]":
- if options is None:
- return None
- return options.get("before_send_log") or options["_experiments"].get(
- "before_send_log"
- )
- def has_metrics_enabled(options: "Optional[dict[str, Any]]") -> bool:
- if options is None:
- return False
- return bool(options.get("enable_metrics", True))
- def get_before_send_metric(
- options: "Optional[dict[str, Any]]",
- ) -> "Optional[Callable[[Metric, Hint], Optional[Metric]]]":
- if options is None:
- return None
- return options.get("before_send_metric") or options["_experiments"].get(
- "before_send_metric"
- )
- def format_attribute(val: "Any") -> "AttributeValue":
- """
- Turn unsupported attribute value types into an AttributeValue.
- We do this as soon as a user-provided attribute is set, to prevent spans,
- logs, metrics and similar from having live references to various objects.
- Note: This is not the final attribute value format. Before they're sent,
- they're serialized further into the actual format the protocol expects:
- https://develop.sentry.dev/sdk/telemetry/attributes/
- """
- if isinstance(val, (bool, int, float, str)):
- return val
- if isinstance(val, (list, tuple)) and not val:
- return []
- elif isinstance(val, list):
- ty = type(val[0])
- if ty in (str, int, float, bool) and all(type(v) is ty for v in val):
- return copy.deepcopy(val)
- elif isinstance(val, tuple):
- ty = type(val[0])
- if ty in (str, int, float, bool) and all(type(v) is ty for v in val):
- return list(val)
- return safe_repr(val)
- def serialize_attribute(val: "AttributeValue") -> "SerializedAttributeValue":
- """Serialize attribute value to the transport format."""
- if isinstance(val, bool):
- return {"value": val, "type": "boolean"}
- if isinstance(val, int):
- return {"value": val, "type": "integer"}
- if isinstance(val, float):
- return {"value": val, "type": "double"}
- if isinstance(val, str):
- return {"value": val, "type": "string"}
- if isinstance(val, list):
- if not val:
- return {"value": [], "type": "array"}
- # Only lists of elements of a single type are supported
- ty = type(val[0])
- if ty in (int, str, bool, float) and all(type(v) is ty for v in val):
- return {"value": val, "type": "array"}
- # Coerce to string if we don't know what to do with the value. This should
- # never happen as we pre-format early in format_attribute, but let's be safe.
- return {"value": safe_repr(val), "type": "string"}
|