| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311 |
- from abc import ABC, abstractmethod
- import asyncio
- import os
- import threading
- from time import sleep, time
- from sentry_sdk._queue import Queue, FullError
- from sentry_sdk.utils import logger, mark_sentry_task_internal
- from sentry_sdk.consts import DEFAULT_QUEUE_SIZE
- from typing import TYPE_CHECKING
- if TYPE_CHECKING:
- from typing import Any
- from typing import Optional
- from typing import Callable
- _TERMINATOR = object()
- class Worker(ABC):
- """Base class for all workers."""
- @property
- @abstractmethod
- def is_alive(self) -> bool:
- """Whether the worker is alive and running."""
- pass
- @abstractmethod
- def kill(self) -> None:
- """Kill the worker. It will not process any more events."""
- pass
- def flush(
- self, timeout: float, callback: "Optional[Callable[[int, float], Any]]" = None
- ) -> None:
- """Flush the worker, blocking until done or timeout is reached."""
- return None
- @abstractmethod
- def full(self) -> bool:
- """Whether the worker's queue is full."""
- pass
- @abstractmethod
- def submit(self, callback: "Callable[[], Any]") -> bool:
- """Schedule a callback. Returns True if queued, False if full."""
- pass
- class BackgroundWorker(Worker):
- def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None:
- self._queue: "Queue" = Queue(queue_size)
- self._lock = threading.Lock()
- self._thread: "Optional[threading.Thread]" = None
- self._thread_for_pid: "Optional[int]" = None
- @property
- def is_alive(self) -> bool:
- if self._thread_for_pid != os.getpid():
- return False
- if not self._thread:
- return False
- return self._thread.is_alive()
- def _ensure_thread(self) -> None:
- if not self.is_alive:
- self.start()
- def _timed_queue_join(self, timeout: float) -> bool:
- deadline = time() + timeout
- queue = self._queue
- queue.all_tasks_done.acquire()
- try:
- while queue.unfinished_tasks:
- delay = deadline - time()
- if delay <= 0:
- return False
- queue.all_tasks_done.wait(timeout=delay)
- return True
- finally:
- queue.all_tasks_done.release()
- def start(self) -> None:
- with self._lock:
- if not self.is_alive:
- self._thread = threading.Thread(
- target=self._target, name="sentry-sdk.BackgroundWorker"
- )
- self._thread.daemon = True
- try:
- self._thread.start()
- self._thread_for_pid = os.getpid()
- except RuntimeError:
- # At this point we can no longer start because the interpreter
- # is already shutting down. Sadly at this point we can no longer
- # send out events.
- self._thread = None
- def kill(self) -> None:
- """
- Kill worker thread. Returns immediately. Not useful for
- waiting on shutdown for events, use `flush` for that.
- """
- logger.debug("background worker got kill request")
- with self._lock:
- if self._thread:
- try:
- self._queue.put_nowait(_TERMINATOR)
- except FullError:
- logger.debug("background worker queue full, kill failed")
- self._thread = None
- self._thread_for_pid = None
- def flush(self, timeout: float, callback: "Optional[Any]" = None) -> None:
- logger.debug("background worker got flush request")
- with self._lock:
- if self.is_alive and timeout > 0.0:
- self._wait_flush(timeout, callback)
- logger.debug("background worker flushed")
- def full(self) -> bool:
- return self._queue.full()
- def _wait_flush(self, timeout: float, callback: "Optional[Any]") -> None:
- initial_timeout = min(0.1, timeout)
- if not self._timed_queue_join(initial_timeout):
- pending = self._queue.qsize() + 1
- logger.debug("%d event(s) pending on flush", pending)
- if callback is not None:
- callback(pending, timeout)
- if not self._timed_queue_join(timeout - initial_timeout):
- pending = self._queue.qsize() + 1
- logger.error("flush timed out, dropped %s events", pending)
- def submit(self, callback: "Callable[[], Any]") -> bool:
- self._ensure_thread()
- try:
- self._queue.put_nowait(callback)
- return True
- except FullError:
- return False
- def _target(self) -> None:
- while True:
- callback = self._queue.get()
- try:
- if callback is _TERMINATOR:
- break
- try:
- callback()
- except Exception:
- logger.error("Failed processing job", exc_info=True)
- finally:
- self._queue.task_done()
- sleep(0)
- class AsyncWorker(Worker):
- def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None:
- self._queue: "Optional[asyncio.Queue[Any]]" = None
- self._queue_size = queue_size
- self._task: "Optional[asyncio.Task[None]]" = None
- # Event loop needs to remain in the same process
- self._task_for_pid: "Optional[int]" = None
- self._loop: "Optional[asyncio.AbstractEventLoop]" = None
- # Track active callback tasks so they have a strong reference and can be cancelled on kill
- self._active_tasks: "set[asyncio.Task[None]]" = set()
- @property
- def is_alive(self) -> bool:
- if self._task_for_pid != os.getpid():
- return False
- if not self._task or not self._loop:
- return False
- return self._loop.is_running() and not self._task.done()
- def kill(self) -> None:
- if self._task:
- # Cancel the main consumer task to prevent duplicate consumers
- self._task.cancel()
- # Also cancel any active callback tasks
- # Avoid modifying the set while cancelling tasks
- tasks_to_cancel = set(self._active_tasks)
- for task in tasks_to_cancel:
- task.cancel()
- self._active_tasks.clear()
- self._loop = None
- self._task = None
- self._task_for_pid = None
- def start(self) -> None:
- if not self.is_alive:
- try:
- self._loop = asyncio.get_running_loop()
- # Always create a fresh queue on start to avoid stale items
- self._queue = asyncio.Queue(maxsize=self._queue_size)
- with mark_sentry_task_internal():
- self._task = self._loop.create_task(self._target())
- self._task_for_pid = os.getpid()
- except RuntimeError:
- # There is no event loop running
- logger.warning("No event loop running, async worker not started")
- self._loop = None
- self._task = None
- self._task_for_pid = None
- def full(self) -> bool:
- if self._queue is None:
- return True
- return self._queue.full()
- def _ensure_task(self) -> None:
- if not self.is_alive:
- self.start()
- async def _wait_flush(
- self, timeout: float, callback: "Optional[Any]" = None
- ) -> None:
- if not self._loop or not self._loop.is_running() or self._queue is None:
- return
- initial_timeout = min(0.1, timeout)
- # Timeout on the join
- try:
- await asyncio.wait_for(self._queue.join(), timeout=initial_timeout)
- except asyncio.TimeoutError:
- pending = self._queue.qsize() + len(self._active_tasks)
- logger.debug("%d event(s) pending on flush", pending)
- if callback is not None:
- callback(pending, timeout)
- try:
- remaining_timeout = timeout - initial_timeout
- await asyncio.wait_for(self._queue.join(), timeout=remaining_timeout)
- except asyncio.TimeoutError:
- pending = self._queue.qsize() + len(self._active_tasks)
- logger.error("flush timed out, dropped %s events", pending)
- def flush( # type: ignore[override]
- self, timeout: float, callback: "Optional[Any]" = None
- ) -> "Optional[asyncio.Task[None]]":
- if self.is_alive and timeout > 0.0 and self._loop and self._loop.is_running():
- with mark_sentry_task_internal():
- return self._loop.create_task(self._wait_flush(timeout, callback))
- return None
- def submit(self, callback: "Callable[[], Any]") -> bool:
- self._ensure_task()
- if self._queue is None:
- return False
- try:
- self._queue.put_nowait(callback)
- return True
- except asyncio.QueueFull:
- return False
- async def _target(self) -> None:
- if self._queue is None:
- return
- try:
- while True:
- callback = await self._queue.get()
- if callback is _TERMINATOR:
- self._queue.task_done()
- break
- # Firing tasks instead of awaiting them allows for concurrent requests
- with mark_sentry_task_internal():
- task = asyncio.create_task(self._process_callback(callback))
- # Create a strong reference to the task so it can be cancelled on kill
- # and does not get garbage collected while running
- self._active_tasks.add(task)
- # Capture queue ref at dispatch time so done callbacks use the
- # correct queue even if kill()/start() replace self._queue.
- queue_ref = self._queue
- task.add_done_callback(lambda t: self._on_task_complete(t, queue_ref))
- # Yield to let the event loop run other tasks
- await asyncio.sleep(0)
- except asyncio.CancelledError:
- pass # Expected during kill()
- async def _process_callback(self, callback: "Callable[[], Any]") -> None:
- # Callback is an async coroutine, need to await it
- await callback()
- def _on_task_complete(
- self,
- task: "asyncio.Task[None]",
- queue: "Optional[asyncio.Queue[Any]]" = None,
- ) -> None:
- try:
- task.result()
- except asyncio.CancelledError:
- pass # Task was cancelled, expected during shutdown
- except Exception:
- logger.error("Failed processing job", exc_info=True)
- finally:
- # Mark the task as done and remove it from the active tasks set
- # Use the queue reference captured at dispatch time, not self._queue,
- # to avoid calling task_done() on a different queue after kill()/start().
- if queue is not None:
- queue.task_done()
- self._active_tasks.discard(task)
|