_batcher.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. import os
  2. import random
  3. import threading
  4. from datetime import datetime, timezone
  5. from typing import TYPE_CHECKING, TypeVar, Generic
  6. from sentry_sdk.utils import format_timestamp
  7. from sentry_sdk.envelope import Envelope, Item, PayloadRef
  8. if TYPE_CHECKING:
  9. from typing import Optional, Callable, Any
  10. T = TypeVar("T")
  11. class Batcher(Generic[T]):
  12. MAX_BEFORE_FLUSH = 100
  13. MAX_BEFORE_DROP = 1_000
  14. FLUSH_WAIT_TIME = 5.0
  15. TYPE = ""
  16. CONTENT_TYPE = ""
  17. def __init__(
  18. self,
  19. capture_func: "Callable[[Envelope], None]",
  20. record_lost_func: "Callable[..., None]",
  21. ) -> None:
  22. self._buffer: "list[T]" = []
  23. self._capture_func = capture_func
  24. self._record_lost_func = record_lost_func
  25. self._running = True
  26. self._lock = threading.Lock()
  27. self._active: "threading.local" = threading.local()
  28. self._flush_event: "threading.Event" = threading.Event()
  29. self._flusher: "Optional[threading.Thread]" = None
  30. self._flusher_pid: "Optional[int]" = None
  31. def _ensure_thread(self) -> bool:
  32. """For forking processes we might need to restart this thread.
  33. This ensures that our process actually has that thread running.
  34. """
  35. if not self._running:
  36. return False
  37. pid = os.getpid()
  38. if self._flusher_pid == pid:
  39. return True
  40. with self._lock:
  41. # Recheck to make sure another thread didn't get here and start the
  42. # the flusher in the meantime
  43. if self._flusher_pid == pid:
  44. return True
  45. self._flusher_pid = pid
  46. self._flusher = threading.Thread(target=self._flush_loop)
  47. self._flusher.daemon = True
  48. try:
  49. self._flusher.start()
  50. except RuntimeError:
  51. # Unfortunately at this point the interpreter is in a state that no
  52. # longer allows us to spawn a thread and we have to bail.
  53. self._running = False
  54. return False
  55. return True
  56. def _flush_loop(self) -> None:
  57. # Mark the flush-loop thread as active for its entire lifetime so
  58. # that any re-entrant add() triggered by GC warnings during wait(),
  59. # flush(), or Event operations is silently dropped instead of
  60. # deadlocking on internal locks.
  61. self._active.flag = True
  62. while self._running:
  63. self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random())
  64. self._flush_event.clear()
  65. self._flush()
  66. def add(self, item: "T") -> None:
  67. # Bail out if the current thread is already executing batcher code.
  68. # This prevents deadlocks when code running inside the batcher (e.g.
  69. # _add_to_envelope during flush, or _flush_event.wait/set) triggers
  70. # a GC-emitted warning that routes back through the logging
  71. # integration into add().
  72. if getattr(self._active, "flag", False):
  73. return None
  74. self._active.flag = True
  75. try:
  76. if not self._ensure_thread() or self._flusher is None:
  77. return None
  78. with self._lock:
  79. if len(self._buffer) >= self.MAX_BEFORE_DROP:
  80. self._record_lost(item)
  81. return None
  82. self._buffer.append(item)
  83. if len(self._buffer) >= self.MAX_BEFORE_FLUSH:
  84. self._flush_event.set()
  85. finally:
  86. self._active.flag = False
  87. def kill(self) -> None:
  88. if self._flusher is None:
  89. return
  90. self._running = False
  91. self._flush_event.set()
  92. self._flusher = None
  93. def flush(self) -> None:
  94. was_active = getattr(self._active, "flag", False)
  95. self._active.flag = True
  96. try:
  97. self._flush()
  98. finally:
  99. self._active.flag = was_active
  100. def _add_to_envelope(self, envelope: "Envelope") -> None:
  101. envelope.add_item(
  102. Item(
  103. type=self.TYPE,
  104. content_type=self.CONTENT_TYPE,
  105. headers={
  106. "item_count": len(self._buffer),
  107. },
  108. payload=PayloadRef(
  109. json={
  110. "items": [
  111. self._to_transport_format(item) for item in self._buffer
  112. ]
  113. }
  114. ),
  115. )
  116. )
  117. def _flush(self) -> "Optional[Envelope]":
  118. envelope = Envelope(
  119. headers={"sent_at": format_timestamp(datetime.now(timezone.utc))}
  120. )
  121. with self._lock:
  122. if len(self._buffer) == 0:
  123. return None
  124. self._add_to_envelope(envelope)
  125. self._buffer.clear()
  126. self._capture_func(envelope)
  127. return envelope
  128. def _record_lost(self, item: "T") -> None:
  129. pass
  130. @staticmethod
  131. def _to_transport_format(item: "T") -> "Any":
  132. pass