| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177 |
- import threading
- from collections import defaultdict
- from datetime import datetime, timezone
- from typing import TYPE_CHECKING
- from sentry_sdk._batcher import Batcher
- from sentry_sdk.envelope import Envelope, Item, PayloadRef
- from sentry_sdk.utils import format_timestamp, serialize_attribute
- if TYPE_CHECKING:
- from typing import Any, Callable, Optional
- from sentry_sdk.traces import StreamedSpan
class SpanBatcher(Batcher["StreamedSpan"]):
    """Buffer streamed spans per trace and emit them as span envelopes.

    Spans are grouped by ``trace_id`` because every envelope carries a single
    shared trace header, so one envelope may only contain spans of one trace.

    All ``MAX_*`` limits below apply per trace. ``MAX_BEFORE_FLUSH`` is kept
    below ``MAX_BEFORE_DROP`` to leave headroom for spans that arrive between
    the flush event being set and the buffer actually being flushed.
    """

    # Maximum number of spans packed into a single envelope.
    MAX_ENVELOPE_SIZE = 1000  # spans
    # Buffered span count (per trace) at which a flush is requested.
    MAX_BEFORE_FLUSH = 1000
    # Buffered span count (per trace) at which new spans are dropped.
    MAX_BEFORE_DROP = 2000
    # Estimated buffered bytes (per trace) at which a flush is requested.
    MAX_BYTES_BEFORE_FLUSH = 5 * 1024 * 1024  # 5 MB
    # NOTE(review): not read in this class; presumably consumed by the
    # Batcher base class flush loop -- confirm.
    FLUSH_WAIT_TIME = 5.0

    TYPE = "span"
    CONTENT_TYPE = "application/vnd.sentry.items.span.v2+json"

    def __init__(
        self,
        capture_func: "Callable[[Envelope], None]",
        record_lost_func: "Callable[..., None]",
    ) -> None:
        """Set up empty per-trace buffers and the synchronisation primitives.

        ``capture_func`` is called with every envelope built by ``_flush``.
        ``record_lost_func`` is called (with ``reason``, ``data_category`` and
        ``quantity`` keyword arguments) for every span dropped on overflow.
        """
        # trace_id -> buffered spans of that trace.
        self._span_buffer: dict[str, list["StreamedSpan"]] = defaultdict(list)
        # trace_id -> running estimate of the buffered payload size in bytes.
        self._running_size: dict[str, int] = defaultdict(lambda: 0)

        self._capture_func = capture_func
        self._record_lost_func = record_lost_func

        self._running = True
        self._lock = threading.Lock()
        # Per-thread marker used by add() to detect re-entrant calls.
        self._active: "threading.local" = threading.local()

        self._flush_event: "threading.Event" = threading.Event()

        self._flusher: "Optional[threading.Thread]" = None
        self._flusher_pid: "Optional[int]" = None

    def add(self, span: "StreamedSpan") -> None:
        """Queue ``span`` in its trace bucket and request a flush if needed.

        The span is silently ignored when the calling thread is already
        inside batcher code or when no flusher thread is available. When the
        trace bucket already holds ``MAX_BEFORE_DROP`` spans the span is
        dropped and reported through ``record_lost_func``.
        """
        # Re-entrancy guard: code running inside the batcher (for example
        # while flushing, or in _flush_event.wait/set) can trigger a
        # GC-emitted warning that the logging integration routes back into
        # add(). Returning early here avoids deadlocking on our own lock.
        if getattr(self._active, "flag", False):
            return None

        self._active.flag = True
        try:
            # NOTE(review): _ensure_thread is defined outside this block,
            # presumably on the Batcher base class.
            flusher_ready = self._ensure_thread() and self._flusher is not None
            if not flusher_ready:
                return None

            with self._lock:
                trace_id = span.trace_id
                bucket = self._span_buffer[trace_id]
                buffered = len(bucket)

                if buffered >= self.MAX_BEFORE_DROP:
                    self._record_lost_func(
                        reason="queue_overflow",
                        data_category="span",
                        quantity=1,
                    )
                    return None

                bucket.append(span)
                self._running_size[trace_id] += self._estimate_size(span)

                # Wake the flusher once either the span count or the
                # estimated byte size of this trace crosses its threshold.
                count_reached = buffered + 1 >= self.MAX_BEFORE_FLUSH
                if (
                    count_reached
                    or self._running_size[trace_id] >= self.MAX_BYTES_BEFORE_FLUSH
                ):
                    self._flush_event.set()

            return None
        finally:
            self._active.flag = False

    @staticmethod
    def _estimate_size(item: "StreamedSpan") -> int:
        """Return a cheap, rough estimate of the serialized size of ``item``.

        The estimate is a flat 210 for the span payload without attributes,
        plus 50 per attribute, plus the length of each attribute value's
        string form.
        """
        attribute_cost = sum(
            50 + len(value if isinstance(value, str) else str(value))
            for value in item._attributes.values()
        )
        return 210 + attribute_cost

    @staticmethod
    def _to_transport_format(item: "StreamedSpan") -> "Any":
        """Convert ``item`` into the dict placed in the envelope payload.

        ``end_timestamp``, ``parent_span_id`` and ``attributes`` are only
        included when the span has a truthy value for them.
        """
        payload: "dict[str, Any]" = {
            "trace_id": item.trace_id,
            "span_id": item.span_id,
            "name": item._name,
            "status": item._status,
            "is_segment": item._is_segment(),
            "start_timestamp": item._start_timestamp.timestamp(),
        }

        if item._timestamp:
            payload["end_timestamp"] = item._timestamp.timestamp()

        if item._parent_span_id:
            payload["parent_span_id"] = item._parent_span_id

        if item._attributes:
            serialized = {}
            for key, value in item._attributes.items():
                serialized[key] = serialize_attribute(value)
            payload["attributes"] = serialized

        return payload

    def _flush(self) -> None:
        """Turn all buffered spans into envelopes and hand them to capture.

        One or more envelopes are built per trace bucket, the buffers are
        cleared, and each envelope is then passed to ``capture_func``.
        """
        with self._lock:
            if not self._span_buffer:
                return

            pending = []
            chunk_size = self.MAX_ENVELOPE_SIZE

            for bucket in self._span_buffer.values():
                if not bucket:
                    continue

                # Every span in a bucket belongs to the same trace, so the
                # first span's dynamic sampling context is used for all of
                # the bucket's envelopes.
                trace_header = bucket[0]._dynamic_sampling_context()

                # An envelope holds at most MAX_ENVELOPE_SIZE spans; larger
                # buckets are split over several envelopes.
                for offset in range(0, len(bucket), chunk_size):
                    chunk = bucket[offset : offset + chunk_size]

                    envelope = Envelope(
                        headers={
                            "sent_at": format_timestamp(datetime.now(timezone.utc)),
                            "trace": trace_header,
                        }
                    )
                    span_item = Item(
                        type=self.TYPE,
                        content_type=self.CONTENT_TYPE,
                        headers={
                            "item_count": len(chunk),
                        },
                        payload=PayloadRef(
                            json={
                                "items": [
                                    self._to_transport_format(entry)
                                    for entry in chunk
                                ]
                            }
                        ),
                    )
                    envelope.add_item(span_item)
                    pending.append(envelope)

            self._span_buffer.clear()
            self._running_size.clear()

        # Envelopes are captured after the lock has been released.
        for envelope in pending:
            self._capture_func(envelope)
|