_span_batcher.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. import threading
  2. from collections import defaultdict
  3. from datetime import datetime, timezone
  4. from typing import TYPE_CHECKING
  5. from sentry_sdk._batcher import Batcher
  6. from sentry_sdk.envelope import Envelope, Item, PayloadRef
  7. from sentry_sdk.utils import format_timestamp, serialize_attribute
  8. if TYPE_CHECKING:
  9. from typing import Any, Callable, Optional
  10. from sentry_sdk.traces import StreamedSpan
  11. class SpanBatcher(Batcher["StreamedSpan"]):
  12. # MAX_BEFORE_FLUSH should be lower than MAX_BEFORE_DROP, so that there is
  13. # a bit of a buffer for spans that appear between setting the flush event
  14. # and actually flushing the buffer.
  15. #
  16. # The max limits are all per trace.
  17. MAX_ENVELOPE_SIZE = 1000 # spans
  18. MAX_BEFORE_FLUSH = 1000
  19. MAX_BEFORE_DROP = 2000
  20. MAX_BYTES_BEFORE_FLUSH = 5 * 1024 * 1024 # 5 MB
  21. FLUSH_WAIT_TIME = 5.0
  22. TYPE = "span"
  23. CONTENT_TYPE = "application/vnd.sentry.items.span.v2+json"
  24. def __init__(
  25. self,
  26. capture_func: "Callable[[Envelope], None]",
  27. record_lost_func: "Callable[..., None]",
  28. ) -> None:
  29. # Spans from different traces cannot be emitted in the same envelope
  30. # since the envelope contains a shared trace header. That's why we bucket
  31. # by trace_id, so that we can then send the buckets each in its own
  32. # envelope.
  33. # trace_id -> span buffer
  34. self._span_buffer: dict[str, list["StreamedSpan"]] = defaultdict(list)
  35. self._running_size: dict[str, int] = defaultdict(lambda: 0)
  36. self._capture_func = capture_func
  37. self._record_lost_func = record_lost_func
  38. self._running = True
  39. self._lock = threading.Lock()
  40. self._active: "threading.local" = threading.local()
  41. self._flush_event: "threading.Event" = threading.Event()
  42. self._flusher: "Optional[threading.Thread]" = None
  43. self._flusher_pid: "Optional[int]" = None
  44. def add(self, span: "StreamedSpan") -> None:
  45. # Bail out if the current thread is already executing batcher code.
  46. # This prevents deadlocks when code running inside the batcher (e.g.
  47. # _add_to_envelope during flush, or _flush_event.wait/set) triggers
  48. # a GC-emitted warning that routes back through the logging
  49. # integration into add().
  50. if getattr(self._active, "flag", False):
  51. return None
  52. self._active.flag = True
  53. try:
  54. if not self._ensure_thread() or self._flusher is None:
  55. return None
  56. with self._lock:
  57. size = len(self._span_buffer[span.trace_id])
  58. if size >= self.MAX_BEFORE_DROP:
  59. self._record_lost_func(
  60. reason="queue_overflow",
  61. data_category="span",
  62. quantity=1,
  63. )
  64. return None
  65. self._span_buffer[span.trace_id].append(span)
  66. self._running_size[span.trace_id] += self._estimate_size(span)
  67. if size + 1 >= self.MAX_BEFORE_FLUSH:
  68. self._flush_event.set()
  69. return
  70. if self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH:
  71. self._flush_event.set()
  72. return
  73. finally:
  74. self._active.flag = False
  75. @staticmethod
  76. def _estimate_size(item: "StreamedSpan") -> int:
  77. # Rough estimate of serialized span size that's quick to compute.
  78. # 210 is the rough size of the payload without attributes, and then we
  79. # estimate the attributes separately.
  80. estimate = 210
  81. for value in item._attributes.values():
  82. estimate += 50
  83. if isinstance(value, str):
  84. estimate += len(value)
  85. else:
  86. estimate += len(str(value))
  87. return estimate
  88. @staticmethod
  89. def _to_transport_format(item: "StreamedSpan") -> "Any":
  90. res: "dict[str, Any]" = {
  91. "trace_id": item.trace_id,
  92. "span_id": item.span_id,
  93. "name": item._name,
  94. "status": item._status,
  95. "is_segment": item._is_segment(),
  96. "start_timestamp": item._start_timestamp.timestamp(),
  97. }
  98. if item._timestamp:
  99. res["end_timestamp"] = item._timestamp.timestamp()
  100. if item._parent_span_id:
  101. res["parent_span_id"] = item._parent_span_id
  102. if item._attributes:
  103. res["attributes"] = {
  104. k: serialize_attribute(v) for (k, v) in item._attributes.items()
  105. }
  106. return res
  107. def _flush(self) -> None:
  108. with self._lock:
  109. if len(self._span_buffer) == 0:
  110. return
  111. envelopes = []
  112. for spans in self._span_buffer.values():
  113. if spans:
  114. dsc = spans[0]._dynamic_sampling_context()
  115. # Max per envelope is 1000, so if we happen to have more than
  116. # 1000 spans in one bucket, we'll need to separate them.
  117. for start in range(0, len(spans), self.MAX_ENVELOPE_SIZE):
  118. end = min(start + self.MAX_ENVELOPE_SIZE, len(spans))
  119. envelope = Envelope(
  120. headers={
  121. "sent_at": format_timestamp(datetime.now(timezone.utc)),
  122. "trace": dsc,
  123. }
  124. )
  125. envelope.add_item(
  126. Item(
  127. type=self.TYPE,
  128. content_type=self.CONTENT_TYPE,
  129. headers={
  130. "item_count": end - start,
  131. },
  132. payload=PayloadRef(
  133. json={
  134. "items": [
  135. self._to_transport_format(spans[j])
  136. for j in range(start, end)
  137. ]
  138. }
  139. ),
  140. )
  141. )
  142. envelopes.append(envelope)
  143. self._span_buffer.clear()
  144. self._running_size.clear()
  145. for envelope in envelopes:
  146. self._capture_func(envelope)