transport.py 42 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168
  1. from abc import ABC, abstractmethod
  2. import asyncio
  3. import io
  4. import os
  5. import gzip
  6. import socket
  7. import ssl
  8. import time
  9. import warnings
  10. from datetime import datetime, timedelta, timezone
  11. from collections import defaultdict
  12. from urllib.request import getproxies
  13. try:
  14. import brotli # type: ignore
  15. except ImportError:
  16. brotli = None
  17. try:
  18. import httpcore
  19. except ImportError:
  20. httpcore = None # type: ignore[assignment,unused-ignore]
  21. try:
  22. import h2 # noqa: F401
  23. HTTP2_ENABLED = httpcore is not None
  24. except ImportError:
  25. HTTP2_ENABLED = False
  26. try:
  27. import anyio # noqa: F401
  28. ASYNC_TRANSPORT_AVAILABLE = httpcore is not None
  29. except ImportError:
  30. ASYNC_TRANSPORT_AVAILABLE = False
  31. import urllib3
  32. import certifi
  33. import sentry_sdk
  34. from sentry_sdk.consts import EndpointType
  35. from sentry_sdk.utils import (
  36. Dsn,
  37. logger,
  38. capture_internal_exceptions,
  39. mark_sentry_task_internal,
  40. )
  41. from sentry_sdk.worker import BackgroundWorker, Worker, AsyncWorker
  42. from sentry_sdk.envelope import Envelope, Item, PayloadRef
  43. from typing import TYPE_CHECKING, cast, List, Dict
  44. if TYPE_CHECKING:
  45. from typing import Any
  46. from typing import Callable
  47. from typing import DefaultDict
  48. from typing import Iterable
  49. from typing import Mapping
  50. from typing import Optional
  51. from typing import Self
  52. from typing import Tuple
  53. from typing import Type
  54. from typing import Union
  55. from urllib3.poolmanager import PoolManager
  56. from urllib3.poolmanager import ProxyManager
  57. from sentry_sdk._types import Event, EventDataCategory
  58. KEEP_ALIVE_SOCKET_OPTIONS = []
  59. for option in [
  60. (socket.SOL_SOCKET, lambda: getattr(socket, "SO_KEEPALIVE"), 1), # noqa: B009
  61. (socket.SOL_TCP, lambda: getattr(socket, "TCP_KEEPIDLE"), 45), # noqa: B009
  62. (socket.SOL_TCP, lambda: getattr(socket, "TCP_KEEPINTVL"), 10), # noqa: B009
  63. (socket.SOL_TCP, lambda: getattr(socket, "TCP_KEEPCNT"), 6), # noqa: B009
  64. ]:
  65. try:
  66. KEEP_ALIVE_SOCKET_OPTIONS.append((option[0], option[1](), option[2]))
  67. except AttributeError:
  68. # a specific option might not be available on specific systems,
  69. # e.g. TCP_KEEPIDLE doesn't exist on macOS
  70. pass
  71. def _get_httpcore_header_value(response: "Any", header: str) -> "Optional[str]":
  72. """Case-insensitive header lookup for httpcore-style responses."""
  73. header_lower = header.lower()
  74. return next(
  75. (
  76. val.decode("ascii")
  77. for key, val in response.headers
  78. if key.decode("ascii").lower() == header_lower
  79. ),
  80. None,
  81. )
  82. class Transport(ABC):
  83. """Baseclass for all transports.
  84. A transport is used to send an event to sentry.
  85. """
  86. parsed_dsn: "Optional[Dsn]" = None
  87. def __init__(self: "Self", options: "Optional[Dict[str, Any]]" = None) -> None:
  88. self.options = options
  89. if options and options["dsn"] is not None and options["dsn"]:
  90. self.parsed_dsn = Dsn(options["dsn"], options.get("org_id"))
  91. else:
  92. self.parsed_dsn = None
  93. def capture_event(self: "Self", event: "Event") -> None:
  94. """
  95. DEPRECATED: Please use capture_envelope instead.
  96. This gets invoked with the event dictionary when an event should
  97. be sent to sentry.
  98. """
  99. warnings.warn(
  100. "capture_event is deprecated, please use capture_envelope instead!",
  101. DeprecationWarning,
  102. stacklevel=2,
  103. )
  104. envelope = Envelope()
  105. envelope.add_event(event)
  106. self.capture_envelope(envelope)
  107. @abstractmethod
  108. def capture_envelope(self: "Self", envelope: "Envelope") -> None:
  109. """
  110. Send an envelope to Sentry.
  111. Envelopes are a data container format that can hold any type of data
  112. submitted to Sentry. We use it to send all event data (including errors,
  113. transactions, crons check-ins, etc.) to Sentry.
  114. """
  115. pass
  116. def flush(
  117. self: "Self",
  118. timeout: float,
  119. callback: "Optional[Any]" = None,
  120. ) -> None:
  121. """
  122. Wait `timeout` seconds for the current events to be sent out.
  123. The default implementation is a no-op, since this method may only be relevant to some transports.
  124. Subclasses should override this method if necessary.
  125. """
  126. return None
  127. def kill(self: "Self") -> None:
  128. """
  129. Forcefully kills the transport.
  130. The default implementation is a no-op, since this method may only be relevant to some transports.
  131. Subclasses should override this method if necessary.
  132. """
  133. return None
  134. def record_lost_event(
  135. self,
  136. reason: str,
  137. data_category: "Optional[EventDataCategory]" = None,
  138. item: "Optional[Item]" = None,
  139. *,
  140. quantity: int = 1,
  141. ) -> None:
  142. """This increments a counter for event loss by reason and
  143. data category by the given positive-int quantity (default 1).
  144. If an item is provided, the data category and quantity are
  145. extracted from the item, and the values passed for
  146. data_category and quantity are ignored.
  147. When recording a lost transaction via data_category="transaction",
  148. the calling code should also record the lost spans via this method.
  149. When recording lost spans, `quantity` should be set to the number
  150. of contained spans, plus one for the transaction itself. When
  151. passing an Item containing a transaction via the `item` parameter,
  152. this method automatically records the lost spans.
  153. """
  154. return None
  155. def is_healthy(self: "Self") -> bool:
  156. return True
  157. def _parse_rate_limits(
  158. header: str, now: "Optional[datetime]" = None
  159. ) -> "Iterable[Tuple[Optional[EventDataCategory], datetime]]":
  160. if now is None:
  161. now = datetime.now(timezone.utc)
  162. for limit in header.split(","):
  163. try:
  164. parameters = limit.strip().split(":")
  165. retry_after_val, categories = parameters[:2]
  166. retry_after = now + timedelta(seconds=int(retry_after_val))
  167. for category in categories and categories.split(";") or (None,):
  168. yield category, retry_after # type: ignore
  169. except (LookupError, ValueError):
  170. continue
  171. class HttpTransportCore(Transport):
  172. """Shared base class for sync and async transports."""
  173. TIMEOUT = 30 # seconds
  174. def __init__(self: "Self", options: "Dict[str, Any]") -> None:
  175. from sentry_sdk.consts import VERSION
  176. Transport.__init__(self, options)
  177. assert self.parsed_dsn is not None
  178. self.options: "Dict[str, Any]" = options
  179. self._worker = self._create_worker(options)
  180. self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION)
  181. self._disabled_until: "Dict[Optional[EventDataCategory], datetime]" = {}
  182. # We only use this Retry() class for the `get_retry_after` method it exposes
  183. self._retry = urllib3.util.Retry()
  184. self._discarded_events: "DefaultDict[Tuple[EventDataCategory, str], int]" = (
  185. defaultdict(int)
  186. )
  187. self._last_client_report_sent = time.time()
  188. self._pool = self._make_pool()
  189. # Backwards compatibility for deprecated `self.hub_class` attribute
  190. self._hub_cls = sentry_sdk.Hub
  191. experiments = options.get("_experiments", {})
  192. compression_level = experiments.get(
  193. "transport_compression_level",
  194. experiments.get("transport_zlib_compression_level"),
  195. )
  196. compression_algo = experiments.get(
  197. "transport_compression_algo",
  198. (
  199. "gzip"
  200. # if only compression level is set, assume gzip for backwards compatibility
  201. # if we don't have brotli available, fallback to gzip
  202. if compression_level is not None or brotli is None
  203. else "br"
  204. ),
  205. )
  206. if compression_algo == "br" and brotli is None:
  207. logger.warning(
  208. "You asked for brotli compression without the Brotli module, falling back to gzip -9"
  209. )
  210. compression_algo = "gzip"
  211. compression_level = None
  212. if compression_algo not in ("br", "gzip"):
  213. logger.warning(
  214. "Unknown compression algo %s, disabling compression", compression_algo
  215. )
  216. self._compression_level = 0
  217. self._compression_algo = None
  218. else:
  219. self._compression_algo = compression_algo
  220. if compression_level is not None:
  221. self._compression_level = compression_level
  222. elif self._compression_algo == "gzip":
  223. self._compression_level = 9
  224. elif self._compression_algo == "br":
  225. self._compression_level = 4
  226. def _create_worker(self: "Self", options: "Dict[str, Any]") -> "Worker":
  227. raise NotImplementedError()
  228. def record_lost_event(
  229. self,
  230. reason: str,
  231. data_category: "Optional[EventDataCategory]" = None,
  232. item: "Optional[Item]" = None,
  233. *,
  234. quantity: int = 1,
  235. ) -> None:
  236. if not self.options["send_client_reports"]:
  237. return
  238. if item is not None:
  239. data_category = item.data_category
  240. quantity = 1 # If an item is provided, we always count it as 1 (except for attachments, handled below).
  241. if data_category == "transaction":
  242. # Also record the lost spans
  243. event = item.get_transaction_event() or {}
  244. # +1 for the transaction itself
  245. span_count = (
  246. len(cast(List[Dict[str, object]], event.get("spans") or [])) + 1
  247. )
  248. self.record_lost_event(reason, "span", quantity=span_count)
  249. elif data_category == "log_item" and item:
  250. # Also record size of lost logs in bytes
  251. bytes_size = len(item.get_bytes())
  252. self.record_lost_event(reason, "log_byte", quantity=bytes_size)
  253. elif data_category == "attachment":
  254. # quantity of 0 is actually 1 as we do not want to count
  255. # empty attachments as actually empty.
  256. quantity = len(item.get_bytes()) or 1
  257. elif data_category is None:
  258. raise TypeError("data category not provided")
  259. self._discarded_events[data_category, reason] += quantity
  260. def _get_header_value(
  261. self: "Self", response: "Any", header: str
  262. ) -> "Optional[str]":
  263. return response.headers.get(header)
  264. def _update_rate_limits(
  265. self: "Self", response: "Union[urllib3.BaseHTTPResponse, httpcore.Response]"
  266. ) -> None:
  267. # new sentries with more rate limit insights. We honor this header
  268. # no matter of the status code to update our internal rate limits.
  269. header = self._get_header_value(response, "x-sentry-rate-limits")
  270. if header:
  271. logger.warning("Rate-limited via x-sentry-rate-limits")
  272. self._disabled_until.update(_parse_rate_limits(header))
  273. # old sentries only communicate global rate limit hits via the
  274. # retry-after header on 429. This header can also be emitted on new
  275. # sentries if a proxy in front wants to globally slow things down.
  276. elif response.status == 429:
  277. logger.warning("Rate-limited via 429")
  278. retry_after_value = self._get_header_value(response, "Retry-After")
  279. retry_after = (
  280. self._retry.parse_retry_after(retry_after_value)
  281. if retry_after_value is not None
  282. else None
  283. ) or 60
  284. self._disabled_until[None] = datetime.now(timezone.utc) + timedelta(
  285. seconds=retry_after
  286. )
  287. def _handle_request_error(
  288. self: "Self",
  289. envelope: "Optional[Envelope]",
  290. loss_reason: str = "network",
  291. record_reason: str = "network_error",
  292. ) -> None:
  293. def record_loss(reason: str) -> None:
  294. if envelope is None:
  295. self.record_lost_event(reason, data_category="error")
  296. else:
  297. for item in envelope.items:
  298. self.record_lost_event(reason, item=item)
  299. self.on_dropped_event(loss_reason)
  300. record_loss(record_reason)
  301. def _handle_response(
  302. self: "Self",
  303. response: "Union[urllib3.BaseHTTPResponse, httpcore.Response]",
  304. envelope: "Optional[Envelope]",
  305. ) -> None:
  306. self._update_rate_limits(response)
  307. if response.status == 413:
  308. size_exceeded_message = (
  309. "HTTP 413: Event dropped due to exceeded envelope size limit"
  310. )
  311. response_message = getattr(
  312. response, "data", getattr(response, "content", None)
  313. )
  314. if response_message is not None:
  315. size_exceeded_message += f" (body: {response_message})"
  316. logger.error(size_exceeded_message)
  317. self._handle_request_error(
  318. envelope=envelope, loss_reason="status_413", record_reason="send_error"
  319. )
  320. elif response.status == 429:
  321. # if we hit a 429. Something was rate limited but we already
  322. # acted on this in `self._update_rate_limits`. Note that we
  323. # do not want to record event loss here as we will have recorded
  324. # an outcome in relay already.
  325. self.on_dropped_event("status_429")
  326. pass
  327. elif response.status >= 300 or response.status < 200:
  328. logger.error(
  329. "Unexpected status code: %s (body: %s)",
  330. response.status,
  331. getattr(response, "data", getattr(response, "content", None)),
  332. )
  333. self._handle_request_error(
  334. envelope=envelope, loss_reason="status_{}".format(response.status)
  335. )
  336. def _update_headers(
  337. self: "Self",
  338. headers: "Dict[str, str]",
  339. ) -> None:
  340. headers.update(
  341. {
  342. "User-Agent": str(self._auth.client),
  343. "X-Sentry-Auth": str(self._auth.to_header()),
  344. }
  345. )
  346. def on_dropped_event(self: "Self", _reason: str) -> None:
  347. return None
  348. def _fetch_pending_client_report(
  349. self: "Self", force: bool = False, interval: int = 60
  350. ) -> "Optional[Item]":
  351. if not self.options["send_client_reports"]:
  352. return None
  353. if not (force or self._last_client_report_sent < time.time() - interval):
  354. return None
  355. discarded_events = self._discarded_events
  356. self._discarded_events = defaultdict(int)
  357. self._last_client_report_sent = time.time()
  358. if not discarded_events:
  359. return None
  360. return Item(
  361. PayloadRef(
  362. json={
  363. "timestamp": time.time(),
  364. "discarded_events": [
  365. {"reason": reason, "category": category, "quantity": quantity}
  366. for (
  367. (category, reason),
  368. quantity,
  369. ) in discarded_events.items()
  370. ],
  371. }
  372. ),
  373. type="client_report",
  374. )
  375. def _check_disabled(self, category: str) -> bool:
  376. def _disabled(bucket: "Any") -> bool:
  377. ts = self._disabled_until.get(bucket)
  378. return ts is not None and ts > datetime.now(timezone.utc)
  379. return _disabled(category) or _disabled(None)
  380. def _is_rate_limited(self: "Self") -> bool:
  381. return any(
  382. ts > datetime.now(timezone.utc) for ts in self._disabled_until.values()
  383. )
  384. def _is_worker_full(self: "Self") -> bool:
  385. return self._worker.full()
  386. def is_healthy(self: "Self") -> bool:
  387. return not (self._is_worker_full() or self._is_rate_limited())
  388. def _prepare_envelope(
  389. self: "Self", envelope: "Envelope"
  390. ) -> "Optional[Tuple[Envelope, io.BytesIO, Dict[str, str]]]":
  391. # remove all items from the envelope which are over quota
  392. new_items = []
  393. for item in envelope.items:
  394. if self._check_disabled(item.data_category):
  395. if item.data_category in ("transaction", "error", "default", "statsd"):
  396. self.on_dropped_event("self_rate_limits")
  397. self.record_lost_event("ratelimit_backoff", item=item)
  398. else:
  399. new_items.append(item)
  400. # Since we're modifying the envelope here make a copy so that others
  401. # that hold references do not see their envelope modified.
  402. envelope = Envelope(headers=envelope.headers, items=new_items)
  403. if not envelope.items:
  404. return None
  405. # since we're already in the business of sending out an envelope here
  406. # check if we have one pending for the stats session envelopes so we
  407. # can attach it to this enveloped scheduled for sending. This will
  408. # currently typically attach the client report to the most recent
  409. # session update.
  410. client_report_item = self._fetch_pending_client_report(interval=30)
  411. if client_report_item is not None:
  412. envelope.items.append(client_report_item)
  413. content_encoding, body = self._serialize_envelope(envelope)
  414. assert self.parsed_dsn is not None
  415. logger.debug(
  416. "Sending envelope [%s] project:%s host:%s",
  417. envelope.description,
  418. self.parsed_dsn.project_id,
  419. self.parsed_dsn.host,
  420. )
  421. headers: "Dict[str, str]" = {
  422. "Content-Type": "application/x-sentry-envelope",
  423. }
  424. if content_encoding:
  425. headers["Content-Encoding"] = content_encoding
  426. return envelope, body, headers
  427. def _serialize_envelope(
  428. self: "Self", envelope: "Envelope"
  429. ) -> "tuple[Optional[str], io.BytesIO]":
  430. content_encoding = None
  431. body = io.BytesIO()
  432. if self._compression_level == 0 or self._compression_algo is None:
  433. envelope.serialize_into(body)
  434. else:
  435. content_encoding = self._compression_algo
  436. if self._compression_algo == "br" and brotli is not None:
  437. body.write(
  438. brotli.compress(
  439. envelope.serialize(), quality=self._compression_level
  440. )
  441. )
  442. else: # assume gzip as we sanitize the algo value in init
  443. with gzip.GzipFile(
  444. fileobj=body, mode="w", compresslevel=self._compression_level
  445. ) as f:
  446. envelope.serialize_into(f)
  447. return content_encoding, body
  448. def _get_httpcore_pool_options(
  449. self: "Self", http2: bool = False
  450. ) -> "Dict[str, Any]":
  451. """Shared pool options for httpcore-based transports (Http2 and Async)."""
  452. options: "Dict[str, Any]" = {
  453. "http2": http2,
  454. "retries": 3,
  455. }
  456. socket_options: "Optional[List[Tuple[int, int, int | bytes]]]" = None
  457. if self.options["socket_options"] is not None:
  458. socket_options = self.options["socket_options"]
  459. if socket_options is None:
  460. socket_options = []
  461. used_options = {(o[0], o[1]) for o in socket_options}
  462. for default_option in KEEP_ALIVE_SOCKET_OPTIONS:
  463. if (default_option[0], default_option[1]) not in used_options:
  464. socket_options.append(default_option)
  465. if socket_options is not None:
  466. options["socket_options"] = socket_options
  467. ssl_context = ssl.create_default_context()
  468. ssl_context.load_verify_locations(
  469. self.options["ca_certs"]
  470. or os.environ.get("SSL_CERT_FILE")
  471. or os.environ.get("REQUESTS_CA_BUNDLE")
  472. or certifi.where()
  473. )
  474. cert_file = self.options["cert_file"] or os.environ.get("CLIENT_CERT_FILE")
  475. key_file = self.options["key_file"] or os.environ.get("CLIENT_KEY_FILE")
  476. if cert_file is not None:
  477. ssl_context.load_cert_chain(cert_file, key_file)
  478. options["ssl_context"] = ssl_context
  479. return options
  480. def _resolve_proxy(self: "Self") -> "Optional[str]":
  481. """Resolve proxy URL from options and environment. Returns proxy URL or None."""
  482. if self.parsed_dsn is None:
  483. return None
  484. no_proxy = self._in_no_proxy(self.parsed_dsn)
  485. proxy = None
  486. # try HTTPS first
  487. https_proxy = self.options["https_proxy"]
  488. if self.parsed_dsn.scheme == "https" and (https_proxy != ""):
  489. proxy = https_proxy or (not no_proxy and getproxies().get("https"))
  490. # maybe fallback to HTTP proxy
  491. http_proxy = self.options["http_proxy"]
  492. if not proxy and (http_proxy != ""):
  493. proxy = http_proxy or (not no_proxy and getproxies().get("http"))
  494. return proxy or None
  495. @property
  496. def _timeout_extensions(self: "Self") -> "Dict[str, Any]":
  497. return {
  498. "timeout": {
  499. "pool": self.TIMEOUT,
  500. "connect": self.TIMEOUT,
  501. "write": self.TIMEOUT,
  502. "read": self.TIMEOUT,
  503. }
  504. }
  505. def _get_pool_options(self: "Self") -> "Dict[str, Any]":
  506. raise NotImplementedError()
  507. def _in_no_proxy(self: "Self", parsed_dsn: "Dsn") -> bool:
  508. no_proxy = getproxies().get("no")
  509. if not no_proxy:
  510. return False
  511. for host in no_proxy.split(","):
  512. host = host.strip()
  513. if parsed_dsn.host.endswith(host) or parsed_dsn.netloc.endswith(host):
  514. return True
  515. return False
  516. def _make_pool(
  517. self: "Self",
  518. ) -> "Union[PoolManager, ProxyManager, httpcore.SOCKSProxy, httpcore.HTTPProxy, httpcore.ConnectionPool, httpcore.AsyncSOCKSProxy, httpcore.AsyncHTTPProxy, httpcore.AsyncConnectionPool]":
  519. raise NotImplementedError()
  520. def _request(
  521. self: "Self",
  522. method: str,
  523. endpoint_type: "EndpointType",
  524. body: "Any",
  525. headers: "Mapping[str, str]",
  526. ) -> "Union[urllib3.BaseHTTPResponse, httpcore.Response]":
  527. raise NotImplementedError()
  528. def kill(self: "Self") -> None:
  529. logger.debug("Killing HTTP transport")
  530. self._worker.kill()
  531. # Keep BaseHttpTransport as an alias for backwards compatibility
  532. # and for the sync transport implementation
  533. class BaseHttpTransport(HttpTransportCore):
  534. """The base HTTP transport (synchronous)."""
  535. def _send_envelope(self: "Self", envelope: "Envelope") -> None:
  536. _prepared_envelope = self._prepare_envelope(envelope)
  537. if _prepared_envelope is not None:
  538. envelope, body, headers = _prepared_envelope
  539. self._send_request(
  540. body.getvalue(),
  541. headers=headers,
  542. endpoint_type=EndpointType.ENVELOPE,
  543. envelope=envelope,
  544. )
  545. return None
  546. def _send_request(
  547. self: "Self",
  548. body: bytes,
  549. headers: "Dict[str, str]",
  550. endpoint_type: "EndpointType",
  551. envelope: "Optional[Envelope]" = None,
  552. ) -> None:
  553. self._update_headers(headers)
  554. try:
  555. response = self._request(
  556. "POST",
  557. endpoint_type,
  558. body,
  559. headers,
  560. )
  561. except Exception:
  562. self._handle_request_error(envelope=envelope, loss_reason="network")
  563. raise
  564. try:
  565. self._handle_response(response=response, envelope=envelope)
  566. finally:
  567. response.close()
  568. def _create_worker(self: "Self", options: "Dict[str, Any]") -> "Worker":
  569. return BackgroundWorker(queue_size=options["transport_queue_size"])
  570. def _flush_client_reports(self: "Self", force: bool = False) -> None:
  571. client_report = self._fetch_pending_client_report(force=force, interval=60)
  572. if client_report is not None:
  573. self.capture_envelope(Envelope(items=[client_report]))
  574. def capture_envelope(
  575. self,
  576. envelope: "Envelope",
  577. ) -> None:
  578. def send_envelope_wrapper() -> None:
  579. with capture_internal_exceptions():
  580. self._send_envelope(envelope)
  581. self._flush_client_reports()
  582. if not self._worker.submit(send_envelope_wrapper):
  583. self.on_dropped_event("full_queue")
  584. for item in envelope.items:
  585. self.record_lost_event("queue_overflow", item=item)
  586. def flush(
  587. self: "Self",
  588. timeout: float,
  589. callback: "Optional[Callable[[int, float], None]]" = None,
  590. ) -> None:
  591. logger.debug("Flushing HTTP transport")
  592. if timeout > 0:
  593. self._worker.submit(lambda: self._flush_client_reports(force=True))
  594. self._worker.flush(timeout, callback)
  595. @staticmethod
  596. def _warn_hub_cls() -> None:
  597. """Convenience method to warn users about the deprecation of the `hub_cls` attribute."""
  598. warnings.warn(
  599. "The `hub_cls` attribute is deprecated and will be removed in a future release.",
  600. DeprecationWarning,
  601. stacklevel=3,
  602. )
  603. @property
  604. def hub_cls(self: "Self") -> "type[sentry_sdk.Hub]":
  605. """DEPRECATED: This attribute is deprecated and will be removed in a future release."""
  606. HttpTransport._warn_hub_cls()
  607. return self._hub_cls
  608. @hub_cls.setter
  609. def hub_cls(self: "Self", value: "type[sentry_sdk.Hub]") -> None:
  610. """DEPRECATED: This attribute is deprecated and will be removed in a future release."""
  611. HttpTransport._warn_hub_cls()
  612. self._hub_cls = value
  613. class HttpTransport(BaseHttpTransport):
  614. if TYPE_CHECKING:
  615. _pool: "Union[PoolManager, ProxyManager]"
  616. def _get_pool_options(self: "Self") -> "Dict[str, Any]":
  617. num_pools = self.options.get("_experiments", {}).get("transport_num_pools")
  618. options = {
  619. "num_pools": 2 if num_pools is None else int(num_pools),
  620. "cert_reqs": "CERT_REQUIRED",
  621. "timeout": urllib3.Timeout(total=self.TIMEOUT),
  622. }
  623. socket_options: "Optional[List[Tuple[int, int, int | bytes]]]" = None
  624. if self.options["socket_options"] is not None:
  625. socket_options = self.options["socket_options"]
  626. if self.options["keep_alive"]:
  627. if socket_options is None:
  628. socket_options = []
  629. used_options = {(o[0], o[1]) for o in socket_options}
  630. for default_option in KEEP_ALIVE_SOCKET_OPTIONS:
  631. if (default_option[0], default_option[1]) not in used_options:
  632. socket_options.append(default_option)
  633. if socket_options is not None:
  634. options["socket_options"] = socket_options
  635. options["ca_certs"] = (
  636. self.options["ca_certs"] # User-provided bundle from the SDK init
  637. or os.environ.get("SSL_CERT_FILE")
  638. or os.environ.get("REQUESTS_CA_BUNDLE")
  639. or certifi.where()
  640. )
  641. options["cert_file"] = self.options["cert_file"] or os.environ.get(
  642. "CLIENT_CERT_FILE"
  643. )
  644. options["key_file"] = self.options["key_file"] or os.environ.get(
  645. "CLIENT_KEY_FILE"
  646. )
  647. return options
  648. def _make_pool(self: "Self") -> "Union[PoolManager, ProxyManager]":
  649. if self.parsed_dsn is None:
  650. raise ValueError("Cannot create HTTP-based transport without valid DSN")
  651. proxy = None
  652. no_proxy = self._in_no_proxy(self.parsed_dsn)
  653. # try HTTPS first
  654. https_proxy = self.options["https_proxy"]
  655. if self.parsed_dsn.scheme == "https" and (https_proxy != ""):
  656. proxy = https_proxy or (not no_proxy and getproxies().get("https"))
  657. # maybe fallback to HTTP proxy
  658. http_proxy = self.options["http_proxy"]
  659. if not proxy and (http_proxy != ""):
  660. proxy = http_proxy or (not no_proxy and getproxies().get("http"))
  661. opts = self._get_pool_options()
  662. if proxy:
  663. proxy_headers = self.options["proxy_headers"]
  664. if proxy_headers:
  665. opts["proxy_headers"] = proxy_headers
  666. if proxy.startswith("socks"):
  667. use_socks_proxy = True
  668. try:
  669. # Check if PySocks dependency is available
  670. from urllib3.contrib.socks import SOCKSProxyManager
  671. except ImportError:
  672. use_socks_proxy = False
  673. logger.warning(
  674. "You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support. Please add `PySocks` (or `urllib3` with the `[socks]` extra) to your dependencies.",
  675. proxy,
  676. )
  677. if use_socks_proxy:
  678. return SOCKSProxyManager(proxy, **opts)
  679. else:
  680. return urllib3.PoolManager(**opts)
  681. else:
  682. return urllib3.ProxyManager(proxy, **opts)
  683. else:
  684. return urllib3.PoolManager(**opts)
  685. def _request(
  686. self: "Self",
  687. method: str,
  688. endpoint_type: "EndpointType",
  689. body: "Any",
  690. headers: "Mapping[str, str]",
  691. ) -> "urllib3.BaseHTTPResponse":
  692. return self._pool.request(
  693. method,
  694. self._auth.get_api_url(endpoint_type),
  695. body=body,
  696. headers=headers,
  697. )
  698. class AsyncHttpTransport(HttpTransportCore):
  699. def __init__(self: "Self", options: "Dict[str, Any]") -> None:
  700. if not ASYNC_TRANSPORT_AVAILABLE:
  701. raise RuntimeError(
  702. "AsyncHttpTransport requires httpcore[asyncio]. "
  703. "Install it with: pip install sentry-sdk[asyncio]"
  704. )
  705. super().__init__(options)
  706. # Requires event loop at init time
  707. self.loop = asyncio.get_running_loop()
  708. def _create_worker(self: "Self", options: "Dict[str, Any]") -> "Worker":
  709. return AsyncWorker(queue_size=options["transport_queue_size"])
  710. def _get_header_value(
  711. self: "Self", response: "Any", header: str
  712. ) -> "Optional[str]":
  713. return _get_httpcore_header_value(response, header)
  714. async def _send_envelope(self: "Self", envelope: "Envelope") -> None:
  715. _prepared_envelope = self._prepare_envelope(envelope)
  716. if _prepared_envelope is not None:
  717. envelope, body, headers = _prepared_envelope
  718. await self._send_request(
  719. body.getvalue(),
  720. headers=headers,
  721. endpoint_type=EndpointType.ENVELOPE,
  722. envelope=envelope,
  723. )
  724. return None
  725. async def _send_request(
  726. self: "Self",
  727. body: bytes,
  728. headers: "Dict[str, str]",
  729. endpoint_type: "EndpointType",
  730. envelope: "Optional[Envelope]" = None,
  731. ) -> None:
  732. self._update_headers(headers)
  733. try:
  734. response = await self._request(
  735. "POST",
  736. endpoint_type,
  737. body,
  738. headers,
  739. )
  740. except Exception:
  741. self._handle_request_error(envelope=envelope, loss_reason="network")
  742. raise
  743. try:
  744. self._handle_response(response=response, envelope=envelope)
  745. finally:
  746. await response.aclose()
  747. async def _request( # type: ignore[override]
  748. self: "Self",
  749. method: str,
  750. endpoint_type: "EndpointType",
  751. body: "Any",
  752. headers: "Mapping[str, str]",
  753. ) -> "httpcore.Response":
  754. return await self._pool.request( # type: ignore[misc,unused-ignore]
  755. method,
  756. self._auth.get_api_url(endpoint_type),
  757. content=body,
  758. headers=headers, # type: ignore[arg-type,unused-ignore]
  759. extensions=self._timeout_extensions,
  760. )
  761. async def _flush_client_reports(self: "Self", force: bool = False) -> None:
  762. client_report = self._fetch_pending_client_report(force=force, interval=60)
  763. if client_report is not None:
  764. self.capture_envelope(Envelope(items=[client_report]))
  765. def _capture_envelope(self: "Self", envelope: "Envelope") -> None:
  766. async def send_envelope_wrapper() -> None:
  767. with capture_internal_exceptions():
  768. await self._send_envelope(envelope)
  769. await self._flush_client_reports()
  770. if not self._worker.submit(send_envelope_wrapper):
  771. self.on_dropped_event("full_queue")
  772. for item in envelope.items:
  773. self.record_lost_event("queue_overflow", item=item)
  774. def capture_envelope(self: "Self", envelope: "Envelope") -> None:
  775. # Synchronous entry point
  776. if self.loop and self.loop.is_running():
  777. self.loop.call_soon_threadsafe(self._capture_envelope, envelope)
  778. else:
  779. # The event loop is no longer running
  780. logger.warning("Async Transport is not running in an event loop.")
  781. self.on_dropped_event("internal_sdk_error")
  782. for item in envelope.items:
  783. self.record_lost_event("internal_sdk_error", item=item)
  784. def flush( # type: ignore[override]
  785. self: "Self",
  786. timeout: float,
  787. callback: "Optional[Callable[[int, float], None]]" = None,
  788. ) -> "Optional[asyncio.Task[None]]":
  789. logger.debug("Flushing HTTP transport")
  790. if timeout > 0:
  791. self._worker.submit(lambda: self._flush_client_reports(force=True))
  792. return self._worker.flush(timeout, callback) # type: ignore[func-returns-value]
  793. return None
  794. def _get_pool_options(self: "Self") -> "Dict[str, Any]":
  795. return self._get_httpcore_pool_options(
  796. http2=HTTP2_ENABLED
  797. and self.parsed_dsn is not None
  798. and self.parsed_dsn.scheme == "https"
  799. )
  800. def _make_pool(
  801. self: "Self",
  802. ) -> "Union[httpcore.AsyncSOCKSProxy, httpcore.AsyncHTTPProxy, httpcore.AsyncConnectionPool]":
  803. if self.parsed_dsn is None:
  804. raise ValueError("Cannot create HTTP-based transport without valid DSN")
  805. proxy = self._resolve_proxy()
  806. opts = self._get_pool_options()
  807. if proxy:
  808. proxy_headers = self.options["proxy_headers"]
  809. if proxy_headers:
  810. opts["proxy_headers"] = proxy_headers
  811. if proxy.startswith("socks"):
  812. try:
  813. socks_opts = opts.copy()
  814. if "socket_options" in socks_opts:
  815. socket_options = socks_opts.pop("socket_options")
  816. if socket_options:
  817. logger.warning(
  818. "You have defined socket_options but using a SOCKS proxy which doesn't support these. We'll ignore socket_options."
  819. )
  820. return httpcore.AsyncSOCKSProxy(proxy_url=proxy, **socks_opts)
  821. except RuntimeError:
  822. logger.warning(
  823. "You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support.",
  824. proxy,
  825. )
  826. else:
  827. return httpcore.AsyncHTTPProxy(proxy_url=proxy, **opts)
  828. return httpcore.AsyncConnectionPool(**opts)
  829. def kill(self: "Self") -> "Optional[asyncio.Task[None]]": # type: ignore[override]
  830. logger.debug("Killing HTTP transport")
  831. self._worker.kill()
  832. try:
  833. # Return the pool cleanup task so caller can await it if needed
  834. with mark_sentry_task_internal():
  835. return self.loop.create_task(self._pool.aclose()) # type: ignore[union-attr,unused-ignore]
  836. except RuntimeError:
  837. logger.warning("Event loop not running, aborting kill.")
  838. return None
  839. if not HTTP2_ENABLED:
  840. # Sorry, no Http2Transport for you
  841. class Http2Transport(HttpTransport):
  842. def __init__(self: "Self", options: "Dict[str, Any]") -> None:
  843. super().__init__(options)
  844. logger.warning(
  845. "You tried to use HTTP2Transport but don't have httpcore[http2] installed. Falling back to HTTPTransport."
  846. )
  847. else:
  848. class Http2Transport(BaseHttpTransport): # type: ignore
  849. """The HTTP2 transport based on httpcore."""
  850. TIMEOUT = 15
  851. if TYPE_CHECKING:
  852. _pool: """Union[
  853. httpcore.SOCKSProxy, httpcore.HTTPProxy, httpcore.ConnectionPool
  854. ]"""
  855. def _get_header_value(
  856. self: "Self", response: "httpcore.Response", header: str
  857. ) -> "Optional[str]":
  858. return _get_httpcore_header_value(response, header)
  859. def _request(
  860. self: "Self",
  861. method: str,
  862. endpoint_type: "EndpointType",
  863. body: "Any",
  864. headers: "Mapping[str, str]",
  865. ) -> "httpcore.Response":
  866. response = self._pool.request(
  867. method,
  868. self._auth.get_api_url(endpoint_type),
  869. content=body,
  870. headers=headers, # type: ignore[arg-type,unused-ignore]
  871. extensions=self._timeout_extensions,
  872. )
  873. return response
  874. def _get_pool_options(self: "Self") -> "Dict[str, Any]":
  875. return self._get_httpcore_pool_options(
  876. http2=self.parsed_dsn is not None and self.parsed_dsn.scheme == "https"
  877. )
  878. def _make_pool(
  879. self: "Self",
  880. ) -> "Union[httpcore.SOCKSProxy, httpcore.HTTPProxy, httpcore.ConnectionPool]":
  881. if self.parsed_dsn is None:
  882. raise ValueError("Cannot create HTTP-based transport without valid DSN")
  883. proxy = self._resolve_proxy()
  884. opts = self._get_pool_options()
  885. if proxy:
  886. proxy_headers = self.options["proxy_headers"]
  887. if proxy_headers:
  888. opts["proxy_headers"] = proxy_headers
  889. if proxy.startswith("socks"):
  890. try:
  891. if "socket_options" in opts:
  892. socket_options = opts.pop("socket_options")
  893. if socket_options:
  894. logger.warning(
  895. "You have defined socket_options but using a SOCKS proxy which doesn't support these. We'll ignore socket_options."
  896. )
  897. return httpcore.SOCKSProxy(proxy_url=proxy, **opts)
  898. except RuntimeError:
  899. logger.warning(
  900. "You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support.",
  901. proxy,
  902. )
  903. else:
  904. return httpcore.HTTPProxy(proxy_url=proxy, **opts)
  905. return httpcore.ConnectionPool(**opts)
  906. class _FunctionTransport(Transport):
  907. """
  908. DEPRECATED: Users wishing to provide a custom transport should subclass
  909. the Transport class, rather than providing a function.
  910. """
  911. def __init__(
  912. self,
  913. func: "Callable[[Event], None]",
  914. ) -> None:
  915. Transport.__init__(self)
  916. self._func = func
  917. def capture_event(
  918. self,
  919. event: "Event",
  920. ) -> None:
  921. self._func(event)
  922. return None
  923. def capture_envelope(self, envelope: "Envelope") -> None:
  924. # Since function transports expect to be called with an event, we need
  925. # to iterate over the envelope and call the function for each event, via
  926. # the deprecated capture_event method.
  927. event = envelope.get_event()
  928. if event is not None:
  929. self.capture_event(event)
  930. def make_transport(options: "Dict[str, Any]") -> "Optional[Transport]":
  931. ref_transport = options["transport"]
  932. use_http2_transport = options.get("_experiments", {}).get("transport_http2", False)
  933. use_async_transport = options.get("_experiments", {}).get("transport_async", False)
  934. async_integration = any(
  935. integration.__class__.__name__ == "AsyncioIntegration"
  936. for integration in options.get("integrations") or []
  937. )
  938. # By default, we use the http transport class
  939. transport_cls: "Type[Transport]" = (
  940. Http2Transport if use_http2_transport else HttpTransport
  941. )
  942. if use_async_transport and ASYNC_TRANSPORT_AVAILABLE:
  943. try:
  944. asyncio.get_running_loop()
  945. if async_integration:
  946. if use_http2_transport:
  947. logger.warning(
  948. "HTTP/2 transport is not supported with async transport. "
  949. "Ignoring transport_http2 experiment."
  950. )
  951. transport_cls = AsyncHttpTransport
  952. else:
  953. logger.warning(
  954. "You tried to use AsyncHttpTransport but the AsyncioIntegration is not enabled. Falling back to sync transport."
  955. )
  956. except RuntimeError:
  957. # No event loop running, fall back to sync transport
  958. logger.warning("No event loop running, falling back to sync transport.")
  959. elif use_async_transport:
  960. logger.warning(
  961. "You tried to use AsyncHttpTransport but don't have httpcore[asyncio] installed. Falling back to sync transport."
  962. )
  963. if isinstance(ref_transport, Transport):
  964. return ref_transport
  965. elif isinstance(ref_transport, type) and issubclass(ref_transport, Transport):
  966. transport_cls = ref_transport
  967. elif callable(ref_transport):
  968. warnings.warn(
  969. "Function transports are deprecated and will be removed in a future release."
  970. "Please provide a Transport instance or subclass, instead.",
  971. DeprecationWarning,
  972. stacklevel=2,
  973. )
  974. return _FunctionTransport(ref_transport)
  975. # if a transport class is given only instantiate it if the dsn is not
  976. # empty or None
  977. if options["dsn"]:
  978. return transport_cls(options)
  979. return None