| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168 |
- from abc import ABC, abstractmethod
- import asyncio
- import io
- import os
- import gzip
- import socket
- import ssl
- import time
- import warnings
- from datetime import datetime, timedelta, timezone
- from collections import defaultdict
- from urllib.request import getproxies
- try:
- import brotli # type: ignore
- except ImportError:
- brotli = None
- try:
- import httpcore
- except ImportError:
- httpcore = None # type: ignore[assignment,unused-ignore]
- try:
- import h2 # noqa: F401
- HTTP2_ENABLED = httpcore is not None
- except ImportError:
- HTTP2_ENABLED = False
- try:
- import anyio # noqa: F401
- ASYNC_TRANSPORT_AVAILABLE = httpcore is not None
- except ImportError:
- ASYNC_TRANSPORT_AVAILABLE = False
- import urllib3
- import certifi
- import sentry_sdk
- from sentry_sdk.consts import EndpointType
- from sentry_sdk.utils import (
- Dsn,
- logger,
- capture_internal_exceptions,
- mark_sentry_task_internal,
- )
- from sentry_sdk.worker import BackgroundWorker, Worker, AsyncWorker
- from sentry_sdk.envelope import Envelope, Item, PayloadRef
- from typing import TYPE_CHECKING, cast, List, Dict
- if TYPE_CHECKING:
- from typing import Any
- from typing import Callable
- from typing import DefaultDict
- from typing import Iterable
- from typing import Mapping
- from typing import Optional
- from typing import Self
- from typing import Tuple
- from typing import Type
- from typing import Union
- from urllib3.poolmanager import PoolManager
- from urllib3.poolmanager import ProxyManager
- from sentry_sdk._types import Event, EventDataCategory
- KEEP_ALIVE_SOCKET_OPTIONS = []
- for option in [
- (socket.SOL_SOCKET, lambda: getattr(socket, "SO_KEEPALIVE"), 1), # noqa: B009
- (socket.SOL_TCP, lambda: getattr(socket, "TCP_KEEPIDLE"), 45), # noqa: B009
- (socket.SOL_TCP, lambda: getattr(socket, "TCP_KEEPINTVL"), 10), # noqa: B009
- (socket.SOL_TCP, lambda: getattr(socket, "TCP_KEEPCNT"), 6), # noqa: B009
- ]:
- try:
- KEEP_ALIVE_SOCKET_OPTIONS.append((option[0], option[1](), option[2]))
- except AttributeError:
- # a specific option might not be available on specific systems,
- # e.g. TCP_KEEPIDLE doesn't exist on macOS
- pass
- def _get_httpcore_header_value(response: "Any", header: str) -> "Optional[str]":
- """Case-insensitive header lookup for httpcore-style responses."""
- header_lower = header.lower()
- return next(
- (
- val.decode("ascii")
- for key, val in response.headers
- if key.decode("ascii").lower() == header_lower
- ),
- None,
- )
- class Transport(ABC):
- """Baseclass for all transports.
- A transport is used to send an event to sentry.
- """
- parsed_dsn: "Optional[Dsn]" = None
- def __init__(self: "Self", options: "Optional[Dict[str, Any]]" = None) -> None:
- self.options = options
- if options and options["dsn"] is not None and options["dsn"]:
- self.parsed_dsn = Dsn(options["dsn"], options.get("org_id"))
- else:
- self.parsed_dsn = None
- def capture_event(self: "Self", event: "Event") -> None:
- """
- DEPRECATED: Please use capture_envelope instead.
- This gets invoked with the event dictionary when an event should
- be sent to sentry.
- """
- warnings.warn(
- "capture_event is deprecated, please use capture_envelope instead!",
- DeprecationWarning,
- stacklevel=2,
- )
- envelope = Envelope()
- envelope.add_event(event)
- self.capture_envelope(envelope)
- @abstractmethod
- def capture_envelope(self: "Self", envelope: "Envelope") -> None:
- """
- Send an envelope to Sentry.
- Envelopes are a data container format that can hold any type of data
- submitted to Sentry. We use it to send all event data (including errors,
- transactions, crons check-ins, etc.) to Sentry.
- """
- pass
- def flush(
- self: "Self",
- timeout: float,
- callback: "Optional[Any]" = None,
- ) -> None:
- """
- Wait `timeout` seconds for the current events to be sent out.
- The default implementation is a no-op, since this method may only be relevant to some transports.
- Subclasses should override this method if necessary.
- """
- return None
- def kill(self: "Self") -> None:
- """
- Forcefully kills the transport.
- The default implementation is a no-op, since this method may only be relevant to some transports.
- Subclasses should override this method if necessary.
- """
- return None
- def record_lost_event(
- self,
- reason: str,
- data_category: "Optional[EventDataCategory]" = None,
- item: "Optional[Item]" = None,
- *,
- quantity: int = 1,
- ) -> None:
- """This increments a counter for event loss by reason and
- data category by the given positive-int quantity (default 1).
- If an item is provided, the data category and quantity are
- extracted from the item, and the values passed for
- data_category and quantity are ignored.
- When recording a lost transaction via data_category="transaction",
- the calling code should also record the lost spans via this method.
- When recording lost spans, `quantity` should be set to the number
- of contained spans, plus one for the transaction itself. When
- passing an Item containing a transaction via the `item` parameter,
- this method automatically records the lost spans.
- """
- return None
- def is_healthy(self: "Self") -> bool:
- return True
- def _parse_rate_limits(
- header: str, now: "Optional[datetime]" = None
- ) -> "Iterable[Tuple[Optional[EventDataCategory], datetime]]":
- if now is None:
- now = datetime.now(timezone.utc)
- for limit in header.split(","):
- try:
- parameters = limit.strip().split(":")
- retry_after_val, categories = parameters[:2]
- retry_after = now + timedelta(seconds=int(retry_after_val))
- for category in categories and categories.split(";") or (None,):
- yield category, retry_after # type: ignore
- except (LookupError, ValueError):
- continue
- class HttpTransportCore(Transport):
- """Shared base class for sync and async transports."""
- TIMEOUT = 30 # seconds
- def __init__(self: "Self", options: "Dict[str, Any]") -> None:
- from sentry_sdk.consts import VERSION
- Transport.__init__(self, options)
- assert self.parsed_dsn is not None
- self.options: "Dict[str, Any]" = options
- self._worker = self._create_worker(options)
- self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION)
- self._disabled_until: "Dict[Optional[EventDataCategory], datetime]" = {}
- # We only use this Retry() class for the `get_retry_after` method it exposes
- self._retry = urllib3.util.Retry()
- self._discarded_events: "DefaultDict[Tuple[EventDataCategory, str], int]" = (
- defaultdict(int)
- )
- self._last_client_report_sent = time.time()
- self._pool = self._make_pool()
- # Backwards compatibility for deprecated `self.hub_class` attribute
- self._hub_cls = sentry_sdk.Hub
- experiments = options.get("_experiments", {})
- compression_level = experiments.get(
- "transport_compression_level",
- experiments.get("transport_zlib_compression_level"),
- )
- compression_algo = experiments.get(
- "transport_compression_algo",
- (
- "gzip"
- # if only compression level is set, assume gzip for backwards compatibility
- # if we don't have brotli available, fallback to gzip
- if compression_level is not None or brotli is None
- else "br"
- ),
- )
- if compression_algo == "br" and brotli is None:
- logger.warning(
- "You asked for brotli compression without the Brotli module, falling back to gzip -9"
- )
- compression_algo = "gzip"
- compression_level = None
- if compression_algo not in ("br", "gzip"):
- logger.warning(
- "Unknown compression algo %s, disabling compression", compression_algo
- )
- self._compression_level = 0
- self._compression_algo = None
- else:
- self._compression_algo = compression_algo
- if compression_level is not None:
- self._compression_level = compression_level
- elif self._compression_algo == "gzip":
- self._compression_level = 9
- elif self._compression_algo == "br":
- self._compression_level = 4
- def _create_worker(self: "Self", options: "Dict[str, Any]") -> "Worker":
- raise NotImplementedError()
- def record_lost_event(
- self,
- reason: str,
- data_category: "Optional[EventDataCategory]" = None,
- item: "Optional[Item]" = None,
- *,
- quantity: int = 1,
- ) -> None:
- if not self.options["send_client_reports"]:
- return
- if item is not None:
- data_category = item.data_category
- quantity = 1 # If an item is provided, we always count it as 1 (except for attachments, handled below).
- if data_category == "transaction":
- # Also record the lost spans
- event = item.get_transaction_event() or {}
- # +1 for the transaction itself
- span_count = (
- len(cast(List[Dict[str, object]], event.get("spans") or [])) + 1
- )
- self.record_lost_event(reason, "span", quantity=span_count)
- elif data_category == "log_item" and item:
- # Also record size of lost logs in bytes
- bytes_size = len(item.get_bytes())
- self.record_lost_event(reason, "log_byte", quantity=bytes_size)
- elif data_category == "attachment":
- # quantity of 0 is actually 1 as we do not want to count
- # empty attachments as actually empty.
- quantity = len(item.get_bytes()) or 1
- elif data_category is None:
- raise TypeError("data category not provided")
- self._discarded_events[data_category, reason] += quantity
- def _get_header_value(
- self: "Self", response: "Any", header: str
- ) -> "Optional[str]":
- return response.headers.get(header)
- def _update_rate_limits(
- self: "Self", response: "Union[urllib3.BaseHTTPResponse, httpcore.Response]"
- ) -> None:
- # new sentries with more rate limit insights. We honor this header
- # no matter of the status code to update our internal rate limits.
- header = self._get_header_value(response, "x-sentry-rate-limits")
- if header:
- logger.warning("Rate-limited via x-sentry-rate-limits")
- self._disabled_until.update(_parse_rate_limits(header))
- # old sentries only communicate global rate limit hits via the
- # retry-after header on 429. This header can also be emitted on new
- # sentries if a proxy in front wants to globally slow things down.
- elif response.status == 429:
- logger.warning("Rate-limited via 429")
- retry_after_value = self._get_header_value(response, "Retry-After")
- retry_after = (
- self._retry.parse_retry_after(retry_after_value)
- if retry_after_value is not None
- else None
- ) or 60
- self._disabled_until[None] = datetime.now(timezone.utc) + timedelta(
- seconds=retry_after
- )
- def _handle_request_error(
- self: "Self",
- envelope: "Optional[Envelope]",
- loss_reason: str = "network",
- record_reason: str = "network_error",
- ) -> None:
- def record_loss(reason: str) -> None:
- if envelope is None:
- self.record_lost_event(reason, data_category="error")
- else:
- for item in envelope.items:
- self.record_lost_event(reason, item=item)
- self.on_dropped_event(loss_reason)
- record_loss(record_reason)
- def _handle_response(
- self: "Self",
- response: "Union[urllib3.BaseHTTPResponse, httpcore.Response]",
- envelope: "Optional[Envelope]",
- ) -> None:
- self._update_rate_limits(response)
- if response.status == 413:
- size_exceeded_message = (
- "HTTP 413: Event dropped due to exceeded envelope size limit"
- )
- response_message = getattr(
- response, "data", getattr(response, "content", None)
- )
- if response_message is not None:
- size_exceeded_message += f" (body: {response_message})"
- logger.error(size_exceeded_message)
- self._handle_request_error(
- envelope=envelope, loss_reason="status_413", record_reason="send_error"
- )
- elif response.status == 429:
- # if we hit a 429. Something was rate limited but we already
- # acted on this in `self._update_rate_limits`. Note that we
- # do not want to record event loss here as we will have recorded
- # an outcome in relay already.
- self.on_dropped_event("status_429")
- pass
- elif response.status >= 300 or response.status < 200:
- logger.error(
- "Unexpected status code: %s (body: %s)",
- response.status,
- getattr(response, "data", getattr(response, "content", None)),
- )
- self._handle_request_error(
- envelope=envelope, loss_reason="status_{}".format(response.status)
- )
- def _update_headers(
- self: "Self",
- headers: "Dict[str, str]",
- ) -> None:
- headers.update(
- {
- "User-Agent": str(self._auth.client),
- "X-Sentry-Auth": str(self._auth.to_header()),
- }
- )
- def on_dropped_event(self: "Self", _reason: str) -> None:
- return None
- def _fetch_pending_client_report(
- self: "Self", force: bool = False, interval: int = 60
- ) -> "Optional[Item]":
- if not self.options["send_client_reports"]:
- return None
- if not (force or self._last_client_report_sent < time.time() - interval):
- return None
- discarded_events = self._discarded_events
- self._discarded_events = defaultdict(int)
- self._last_client_report_sent = time.time()
- if not discarded_events:
- return None
- return Item(
- PayloadRef(
- json={
- "timestamp": time.time(),
- "discarded_events": [
- {"reason": reason, "category": category, "quantity": quantity}
- for (
- (category, reason),
- quantity,
- ) in discarded_events.items()
- ],
- }
- ),
- type="client_report",
- )
- def _check_disabled(self, category: str) -> bool:
- def _disabled(bucket: "Any") -> bool:
- ts = self._disabled_until.get(bucket)
- return ts is not None and ts > datetime.now(timezone.utc)
- return _disabled(category) or _disabled(None)
- def _is_rate_limited(self: "Self") -> bool:
- return any(
- ts > datetime.now(timezone.utc) for ts in self._disabled_until.values()
- )
- def _is_worker_full(self: "Self") -> bool:
- return self._worker.full()
- def is_healthy(self: "Self") -> bool:
- return not (self._is_worker_full() or self._is_rate_limited())
- def _prepare_envelope(
- self: "Self", envelope: "Envelope"
- ) -> "Optional[Tuple[Envelope, io.BytesIO, Dict[str, str]]]":
- # remove all items from the envelope which are over quota
- new_items = []
- for item in envelope.items:
- if self._check_disabled(item.data_category):
- if item.data_category in ("transaction", "error", "default", "statsd"):
- self.on_dropped_event("self_rate_limits")
- self.record_lost_event("ratelimit_backoff", item=item)
- else:
- new_items.append(item)
- # Since we're modifying the envelope here make a copy so that others
- # that hold references do not see their envelope modified.
- envelope = Envelope(headers=envelope.headers, items=new_items)
- if not envelope.items:
- return None
- # since we're already in the business of sending out an envelope here
- # check if we have one pending for the stats session envelopes so we
- # can attach it to this enveloped scheduled for sending. This will
- # currently typically attach the client report to the most recent
- # session update.
- client_report_item = self._fetch_pending_client_report(interval=30)
- if client_report_item is not None:
- envelope.items.append(client_report_item)
- content_encoding, body = self._serialize_envelope(envelope)
- assert self.parsed_dsn is not None
- logger.debug(
- "Sending envelope [%s] project:%s host:%s",
- envelope.description,
- self.parsed_dsn.project_id,
- self.parsed_dsn.host,
- )
- headers: "Dict[str, str]" = {
- "Content-Type": "application/x-sentry-envelope",
- }
- if content_encoding:
- headers["Content-Encoding"] = content_encoding
- return envelope, body, headers
- def _serialize_envelope(
- self: "Self", envelope: "Envelope"
- ) -> "tuple[Optional[str], io.BytesIO]":
- content_encoding = None
- body = io.BytesIO()
- if self._compression_level == 0 or self._compression_algo is None:
- envelope.serialize_into(body)
- else:
- content_encoding = self._compression_algo
- if self._compression_algo == "br" and brotli is not None:
- body.write(
- brotli.compress(
- envelope.serialize(), quality=self._compression_level
- )
- )
- else: # assume gzip as we sanitize the algo value in init
- with gzip.GzipFile(
- fileobj=body, mode="w", compresslevel=self._compression_level
- ) as f:
- envelope.serialize_into(f)
- return content_encoding, body
- def _get_httpcore_pool_options(
- self: "Self", http2: bool = False
- ) -> "Dict[str, Any]":
- """Shared pool options for httpcore-based transports (Http2 and Async)."""
- options: "Dict[str, Any]" = {
- "http2": http2,
- "retries": 3,
- }
- socket_options: "Optional[List[Tuple[int, int, int | bytes]]]" = None
- if self.options["socket_options"] is not None:
- socket_options = self.options["socket_options"]
- if socket_options is None:
- socket_options = []
- used_options = {(o[0], o[1]) for o in socket_options}
- for default_option in KEEP_ALIVE_SOCKET_OPTIONS:
- if (default_option[0], default_option[1]) not in used_options:
- socket_options.append(default_option)
- if socket_options is not None:
- options["socket_options"] = socket_options
- ssl_context = ssl.create_default_context()
- ssl_context.load_verify_locations(
- self.options["ca_certs"]
- or os.environ.get("SSL_CERT_FILE")
- or os.environ.get("REQUESTS_CA_BUNDLE")
- or certifi.where()
- )
- cert_file = self.options["cert_file"] or os.environ.get("CLIENT_CERT_FILE")
- key_file = self.options["key_file"] or os.environ.get("CLIENT_KEY_FILE")
- if cert_file is not None:
- ssl_context.load_cert_chain(cert_file, key_file)
- options["ssl_context"] = ssl_context
- return options
- def _resolve_proxy(self: "Self") -> "Optional[str]":
- """Resolve proxy URL from options and environment. Returns proxy URL or None."""
- if self.parsed_dsn is None:
- return None
- no_proxy = self._in_no_proxy(self.parsed_dsn)
- proxy = None
- # try HTTPS first
- https_proxy = self.options["https_proxy"]
- if self.parsed_dsn.scheme == "https" and (https_proxy != ""):
- proxy = https_proxy or (not no_proxy and getproxies().get("https"))
- # maybe fallback to HTTP proxy
- http_proxy = self.options["http_proxy"]
- if not proxy and (http_proxy != ""):
- proxy = http_proxy or (not no_proxy and getproxies().get("http"))
- return proxy or None
- @property
- def _timeout_extensions(self: "Self") -> "Dict[str, Any]":
- return {
- "timeout": {
- "pool": self.TIMEOUT,
- "connect": self.TIMEOUT,
- "write": self.TIMEOUT,
- "read": self.TIMEOUT,
- }
- }
- def _get_pool_options(self: "Self") -> "Dict[str, Any]":
- raise NotImplementedError()
- def _in_no_proxy(self: "Self", parsed_dsn: "Dsn") -> bool:
- no_proxy = getproxies().get("no")
- if not no_proxy:
- return False
- for host in no_proxy.split(","):
- host = host.strip()
- if parsed_dsn.host.endswith(host) or parsed_dsn.netloc.endswith(host):
- return True
- return False
- def _make_pool(
- self: "Self",
- ) -> "Union[PoolManager, ProxyManager, httpcore.SOCKSProxy, httpcore.HTTPProxy, httpcore.ConnectionPool, httpcore.AsyncSOCKSProxy, httpcore.AsyncHTTPProxy, httpcore.AsyncConnectionPool]":
- raise NotImplementedError()
- def _request(
- self: "Self",
- method: str,
- endpoint_type: "EndpointType",
- body: "Any",
- headers: "Mapping[str, str]",
- ) -> "Union[urllib3.BaseHTTPResponse, httpcore.Response]":
- raise NotImplementedError()
- def kill(self: "Self") -> None:
- logger.debug("Killing HTTP transport")
- self._worker.kill()
- # Keep BaseHttpTransport as an alias for backwards compatibility
- # and for the sync transport implementation
- class BaseHttpTransport(HttpTransportCore):
- """The base HTTP transport (synchronous)."""
- def _send_envelope(self: "Self", envelope: "Envelope") -> None:
- _prepared_envelope = self._prepare_envelope(envelope)
- if _prepared_envelope is not None:
- envelope, body, headers = _prepared_envelope
- self._send_request(
- body.getvalue(),
- headers=headers,
- endpoint_type=EndpointType.ENVELOPE,
- envelope=envelope,
- )
- return None
- def _send_request(
- self: "Self",
- body: bytes,
- headers: "Dict[str, str]",
- endpoint_type: "EndpointType",
- envelope: "Optional[Envelope]" = None,
- ) -> None:
- self._update_headers(headers)
- try:
- response = self._request(
- "POST",
- endpoint_type,
- body,
- headers,
- )
- except Exception:
- self._handle_request_error(envelope=envelope, loss_reason="network")
- raise
- try:
- self._handle_response(response=response, envelope=envelope)
- finally:
- response.close()
- def _create_worker(self: "Self", options: "Dict[str, Any]") -> "Worker":
- return BackgroundWorker(queue_size=options["transport_queue_size"])
- def _flush_client_reports(self: "Self", force: bool = False) -> None:
- client_report = self._fetch_pending_client_report(force=force, interval=60)
- if client_report is not None:
- self.capture_envelope(Envelope(items=[client_report]))
- def capture_envelope(
- self,
- envelope: "Envelope",
- ) -> None:
- def send_envelope_wrapper() -> None:
- with capture_internal_exceptions():
- self._send_envelope(envelope)
- self._flush_client_reports()
- if not self._worker.submit(send_envelope_wrapper):
- self.on_dropped_event("full_queue")
- for item in envelope.items:
- self.record_lost_event("queue_overflow", item=item)
- def flush(
- self: "Self",
- timeout: float,
- callback: "Optional[Callable[[int, float], None]]" = None,
- ) -> None:
- logger.debug("Flushing HTTP transport")
- if timeout > 0:
- self._worker.submit(lambda: self._flush_client_reports(force=True))
- self._worker.flush(timeout, callback)
- @staticmethod
- def _warn_hub_cls() -> None:
- """Convenience method to warn users about the deprecation of the `hub_cls` attribute."""
- warnings.warn(
- "The `hub_cls` attribute is deprecated and will be removed in a future release.",
- DeprecationWarning,
- stacklevel=3,
- )
- @property
- def hub_cls(self: "Self") -> "type[sentry_sdk.Hub]":
- """DEPRECATED: This attribute is deprecated and will be removed in a future release."""
- HttpTransport._warn_hub_cls()
- return self._hub_cls
- @hub_cls.setter
- def hub_cls(self: "Self", value: "type[sentry_sdk.Hub]") -> None:
- """DEPRECATED: This attribute is deprecated and will be removed in a future release."""
- HttpTransport._warn_hub_cls()
- self._hub_cls = value
- class HttpTransport(BaseHttpTransport):
- if TYPE_CHECKING:
- _pool: "Union[PoolManager, ProxyManager]"
- def _get_pool_options(self: "Self") -> "Dict[str, Any]":
- num_pools = self.options.get("_experiments", {}).get("transport_num_pools")
- options = {
- "num_pools": 2 if num_pools is None else int(num_pools),
- "cert_reqs": "CERT_REQUIRED",
- "timeout": urllib3.Timeout(total=self.TIMEOUT),
- }
- socket_options: "Optional[List[Tuple[int, int, int | bytes]]]" = None
- if self.options["socket_options"] is not None:
- socket_options = self.options["socket_options"]
- if self.options["keep_alive"]:
- if socket_options is None:
- socket_options = []
- used_options = {(o[0], o[1]) for o in socket_options}
- for default_option in KEEP_ALIVE_SOCKET_OPTIONS:
- if (default_option[0], default_option[1]) not in used_options:
- socket_options.append(default_option)
- if socket_options is not None:
- options["socket_options"] = socket_options
- options["ca_certs"] = (
- self.options["ca_certs"] # User-provided bundle from the SDK init
- or os.environ.get("SSL_CERT_FILE")
- or os.environ.get("REQUESTS_CA_BUNDLE")
- or certifi.where()
- )
- options["cert_file"] = self.options["cert_file"] or os.environ.get(
- "CLIENT_CERT_FILE"
- )
- options["key_file"] = self.options["key_file"] or os.environ.get(
- "CLIENT_KEY_FILE"
- )
- return options
- def _make_pool(self: "Self") -> "Union[PoolManager, ProxyManager]":
- if self.parsed_dsn is None:
- raise ValueError("Cannot create HTTP-based transport without valid DSN")
- proxy = None
- no_proxy = self._in_no_proxy(self.parsed_dsn)
- # try HTTPS first
- https_proxy = self.options["https_proxy"]
- if self.parsed_dsn.scheme == "https" and (https_proxy != ""):
- proxy = https_proxy or (not no_proxy and getproxies().get("https"))
- # maybe fallback to HTTP proxy
- http_proxy = self.options["http_proxy"]
- if not proxy and (http_proxy != ""):
- proxy = http_proxy or (not no_proxy and getproxies().get("http"))
- opts = self._get_pool_options()
- if proxy:
- proxy_headers = self.options["proxy_headers"]
- if proxy_headers:
- opts["proxy_headers"] = proxy_headers
- if proxy.startswith("socks"):
- use_socks_proxy = True
- try:
- # Check if PySocks dependency is available
- from urllib3.contrib.socks import SOCKSProxyManager
- except ImportError:
- use_socks_proxy = False
- logger.warning(
- "You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support. Please add `PySocks` (or `urllib3` with the `[socks]` extra) to your dependencies.",
- proxy,
- )
- if use_socks_proxy:
- return SOCKSProxyManager(proxy, **opts)
- else:
- return urllib3.PoolManager(**opts)
- else:
- return urllib3.ProxyManager(proxy, **opts)
- else:
- return urllib3.PoolManager(**opts)
- def _request(
- self: "Self",
- method: str,
- endpoint_type: "EndpointType",
- body: "Any",
- headers: "Mapping[str, str]",
- ) -> "urllib3.BaseHTTPResponse":
- return self._pool.request(
- method,
- self._auth.get_api_url(endpoint_type),
- body=body,
- headers=headers,
- )
- class AsyncHttpTransport(HttpTransportCore):
- def __init__(self: "Self", options: "Dict[str, Any]") -> None:
- if not ASYNC_TRANSPORT_AVAILABLE:
- raise RuntimeError(
- "AsyncHttpTransport requires httpcore[asyncio]. "
- "Install it with: pip install sentry-sdk[asyncio]"
- )
- super().__init__(options)
- # Requires event loop at init time
- self.loop = asyncio.get_running_loop()
- def _create_worker(self: "Self", options: "Dict[str, Any]") -> "Worker":
- return AsyncWorker(queue_size=options["transport_queue_size"])
- def _get_header_value(
- self: "Self", response: "Any", header: str
- ) -> "Optional[str]":
- return _get_httpcore_header_value(response, header)
- async def _send_envelope(self: "Self", envelope: "Envelope") -> None:
- _prepared_envelope = self._prepare_envelope(envelope)
- if _prepared_envelope is not None:
- envelope, body, headers = _prepared_envelope
- await self._send_request(
- body.getvalue(),
- headers=headers,
- endpoint_type=EndpointType.ENVELOPE,
- envelope=envelope,
- )
- return None
- async def _send_request(
- self: "Self",
- body: bytes,
- headers: "Dict[str, str]",
- endpoint_type: "EndpointType",
- envelope: "Optional[Envelope]" = None,
- ) -> None:
- self._update_headers(headers)
- try:
- response = await self._request(
- "POST",
- endpoint_type,
- body,
- headers,
- )
- except Exception:
- self._handle_request_error(envelope=envelope, loss_reason="network")
- raise
- try:
- self._handle_response(response=response, envelope=envelope)
- finally:
- await response.aclose()
- async def _request( # type: ignore[override]
- self: "Self",
- method: str,
- endpoint_type: "EndpointType",
- body: "Any",
- headers: "Mapping[str, str]",
- ) -> "httpcore.Response":
- return await self._pool.request( # type: ignore[misc,unused-ignore]
- method,
- self._auth.get_api_url(endpoint_type),
- content=body,
- headers=headers, # type: ignore[arg-type,unused-ignore]
- extensions=self._timeout_extensions,
- )
- async def _flush_client_reports(self: "Self", force: bool = False) -> None:
- client_report = self._fetch_pending_client_report(force=force, interval=60)
- if client_report is not None:
- self.capture_envelope(Envelope(items=[client_report]))
- def _capture_envelope(self: "Self", envelope: "Envelope") -> None:
- async def send_envelope_wrapper() -> None:
- with capture_internal_exceptions():
- await self._send_envelope(envelope)
- await self._flush_client_reports()
- if not self._worker.submit(send_envelope_wrapper):
- self.on_dropped_event("full_queue")
- for item in envelope.items:
- self.record_lost_event("queue_overflow", item=item)
- def capture_envelope(self: "Self", envelope: "Envelope") -> None:
- # Synchronous entry point
- if self.loop and self.loop.is_running():
- self.loop.call_soon_threadsafe(self._capture_envelope, envelope)
- else:
- # The event loop is no longer running
- logger.warning("Async Transport is not running in an event loop.")
- self.on_dropped_event("internal_sdk_error")
- for item in envelope.items:
- self.record_lost_event("internal_sdk_error", item=item)
- def flush( # type: ignore[override]
- self: "Self",
- timeout: float,
- callback: "Optional[Callable[[int, float], None]]" = None,
- ) -> "Optional[asyncio.Task[None]]":
- logger.debug("Flushing HTTP transport")
- if timeout > 0:
- self._worker.submit(lambda: self._flush_client_reports(force=True))
- return self._worker.flush(timeout, callback) # type: ignore[func-returns-value]
- return None
- def _get_pool_options(self: "Self") -> "Dict[str, Any]":
- return self._get_httpcore_pool_options(
- http2=HTTP2_ENABLED
- and self.parsed_dsn is not None
- and self.parsed_dsn.scheme == "https"
- )
- def _make_pool(
- self: "Self",
- ) -> "Union[httpcore.AsyncSOCKSProxy, httpcore.AsyncHTTPProxy, httpcore.AsyncConnectionPool]":
- if self.parsed_dsn is None:
- raise ValueError("Cannot create HTTP-based transport without valid DSN")
- proxy = self._resolve_proxy()
- opts = self._get_pool_options()
- if proxy:
- proxy_headers = self.options["proxy_headers"]
- if proxy_headers:
- opts["proxy_headers"] = proxy_headers
- if proxy.startswith("socks"):
- try:
- socks_opts = opts.copy()
- if "socket_options" in socks_opts:
- socket_options = socks_opts.pop("socket_options")
- if socket_options:
- logger.warning(
- "You have defined socket_options but using a SOCKS proxy which doesn't support these. We'll ignore socket_options."
- )
- return httpcore.AsyncSOCKSProxy(proxy_url=proxy, **socks_opts)
- except RuntimeError:
- logger.warning(
- "You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support.",
- proxy,
- )
- else:
- return httpcore.AsyncHTTPProxy(proxy_url=proxy, **opts)
- return httpcore.AsyncConnectionPool(**opts)
- def kill(self: "Self") -> "Optional[asyncio.Task[None]]": # type: ignore[override]
- logger.debug("Killing HTTP transport")
- self._worker.kill()
- try:
- # Return the pool cleanup task so caller can await it if needed
- with mark_sentry_task_internal():
- return self.loop.create_task(self._pool.aclose()) # type: ignore[union-attr,unused-ignore]
- except RuntimeError:
- logger.warning("Event loop not running, aborting kill.")
- return None
- if not HTTP2_ENABLED:
- # Sorry, no Http2Transport for you
- class Http2Transport(HttpTransport):
- def __init__(self: "Self", options: "Dict[str, Any]") -> None:
- super().__init__(options)
- logger.warning(
- "You tried to use HTTP2Transport but don't have httpcore[http2] installed. Falling back to HTTPTransport."
- )
- else:
- class Http2Transport(BaseHttpTransport): # type: ignore
- """The HTTP2 transport based on httpcore."""
- TIMEOUT = 15
- if TYPE_CHECKING:
- _pool: """Union[
- httpcore.SOCKSProxy, httpcore.HTTPProxy, httpcore.ConnectionPool
- ]"""
- def _get_header_value(
- self: "Self", response: "httpcore.Response", header: str
- ) -> "Optional[str]":
- return _get_httpcore_header_value(response, header)
- def _request(
- self: "Self",
- method: str,
- endpoint_type: "EndpointType",
- body: "Any",
- headers: "Mapping[str, str]",
- ) -> "httpcore.Response":
- response = self._pool.request(
- method,
- self._auth.get_api_url(endpoint_type),
- content=body,
- headers=headers, # type: ignore[arg-type,unused-ignore]
- extensions=self._timeout_extensions,
- )
- return response
- def _get_pool_options(self: "Self") -> "Dict[str, Any]":
- return self._get_httpcore_pool_options(
- http2=self.parsed_dsn is not None and self.parsed_dsn.scheme == "https"
- )
- def _make_pool(
- self: "Self",
- ) -> "Union[httpcore.SOCKSProxy, httpcore.HTTPProxy, httpcore.ConnectionPool]":
- if self.parsed_dsn is None:
- raise ValueError("Cannot create HTTP-based transport without valid DSN")
- proxy = self._resolve_proxy()
- opts = self._get_pool_options()
- if proxy:
- proxy_headers = self.options["proxy_headers"]
- if proxy_headers:
- opts["proxy_headers"] = proxy_headers
- if proxy.startswith("socks"):
- try:
- if "socket_options" in opts:
- socket_options = opts.pop("socket_options")
- if socket_options:
- logger.warning(
- "You have defined socket_options but using a SOCKS proxy which doesn't support these. We'll ignore socket_options."
- )
- return httpcore.SOCKSProxy(proxy_url=proxy, **opts)
- except RuntimeError:
- logger.warning(
- "You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support.",
- proxy,
- )
- else:
- return httpcore.HTTPProxy(proxy_url=proxy, **opts)
- return httpcore.ConnectionPool(**opts)
- class _FunctionTransport(Transport):
- """
- DEPRECATED: Users wishing to provide a custom transport should subclass
- the Transport class, rather than providing a function.
- """
- def __init__(
- self,
- func: "Callable[[Event], None]",
- ) -> None:
- Transport.__init__(self)
- self._func = func
- def capture_event(
- self,
- event: "Event",
- ) -> None:
- self._func(event)
- return None
- def capture_envelope(self, envelope: "Envelope") -> None:
- # Since function transports expect to be called with an event, we need
- # to iterate over the envelope and call the function for each event, via
- # the deprecated capture_event method.
- event = envelope.get_event()
- if event is not None:
- self.capture_event(event)
- def make_transport(options: "Dict[str, Any]") -> "Optional[Transport]":
- ref_transport = options["transport"]
- use_http2_transport = options.get("_experiments", {}).get("transport_http2", False)
- use_async_transport = options.get("_experiments", {}).get("transport_async", False)
- async_integration = any(
- integration.__class__.__name__ == "AsyncioIntegration"
- for integration in options.get("integrations") or []
- )
- # By default, we use the http transport class
- transport_cls: "Type[Transport]" = (
- Http2Transport if use_http2_transport else HttpTransport
- )
- if use_async_transport and ASYNC_TRANSPORT_AVAILABLE:
- try:
- asyncio.get_running_loop()
- if async_integration:
- if use_http2_transport:
- logger.warning(
- "HTTP/2 transport is not supported with async transport. "
- "Ignoring transport_http2 experiment."
- )
- transport_cls = AsyncHttpTransport
- else:
- logger.warning(
- "You tried to use AsyncHttpTransport but the AsyncioIntegration is not enabled. Falling back to sync transport."
- )
- except RuntimeError:
- # No event loop running, fall back to sync transport
- logger.warning("No event loop running, falling back to sync transport.")
- elif use_async_transport:
- logger.warning(
- "You tried to use AsyncHttpTransport but don't have httpcore[asyncio] installed. Falling back to sync transport."
- )
- if isinstance(ref_transport, Transport):
- return ref_transport
- elif isinstance(ref_transport, type) and issubclass(ref_transport, Transport):
- transport_cls = ref_transport
- elif callable(ref_transport):
- warnings.warn(
- "Function transports are deprecated and will be removed in a future release."
- "Please provide a Transport instance or subclass, instead.",
- DeprecationWarning,
- stacklevel=2,
- )
- return _FunctionTransport(ref_transport)
- # if a transport class is given only instantiate it if the dsn is not
- # empty or None
- if options["dsn"]:
- return transport_cls(options)
- return None
|