asyncio.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. import sys
  2. import functools
  3. import sentry_sdk
  4. from sentry_sdk.consts import OP
  5. from sentry_sdk.integrations import Integration, DidNotEnable
  6. from sentry_sdk.integrations._wsgi_common import nullcontext
  7. from sentry_sdk.utils import (
  8. event_from_exception,
  9. logger,
  10. reraise,
  11. is_internal_task,
  12. )
  13. from sentry_sdk.transport import AsyncHttpTransport
  14. try:
  15. import asyncio
  16. from asyncio.tasks import Task
  17. except ImportError:
  18. raise DidNotEnable("asyncio not available")
  19. from typing import TYPE_CHECKING
  20. if TYPE_CHECKING:
  21. from typing import Any, Callable, TypeVar
  22. from collections.abc import Coroutine
  23. from sentry_sdk._types import ExcInfo
  24. T = TypeVar("T", bound=Callable[..., Any])
  25. def get_name(coro: "Any") -> str:
  26. return (
  27. getattr(coro, "__qualname__", None)
  28. or getattr(coro, "__name__", None)
  29. or "coroutine without __name__"
  30. )
  31. def _wrap_coroutine(wrapped: "Coroutine[Any, Any, Any]") -> "Callable[[T], T]":
  32. # Only __name__ and __qualname__ are copied from function to coroutine in CPython
  33. return functools.partial(
  34. functools.update_wrapper,
  35. wrapped=wrapped, # type: ignore
  36. assigned=("__name__", "__qualname__"),
  37. updated=(),
  38. )
  39. def patch_loop_close() -> None:
  40. """Patch loop.close to flush pending events before shutdown."""
  41. # Atexit shutdown hook happens after the event loop is closed.
  42. # Therefore, it is necessary to patch the loop.close method to ensure
  43. # that pending events are flushed before the interpreter shuts down.
  44. try:
  45. loop = asyncio.get_running_loop()
  46. except RuntimeError:
  47. # No running loop → cannot patch now
  48. return
  49. if getattr(loop, "_sentry_flush_patched", False):
  50. return
  51. async def _flush() -> None:
  52. client = sentry_sdk.get_client()
  53. if not client.is_active():
  54. return
  55. try:
  56. if not isinstance(client.transport, AsyncHttpTransport):
  57. return
  58. await client.close_async()
  59. except Exception:
  60. logger.warning("Sentry flush failed during loop shutdown", exc_info=True)
  61. orig_close = loop.close
  62. def _patched_close() -> None:
  63. try:
  64. loop.run_until_complete(_flush())
  65. except Exception:
  66. logger.debug(
  67. "Could not flush Sentry events during loop close", exc_info=True
  68. )
  69. finally:
  70. orig_close()
  71. loop.close = _patched_close # type: ignore
  72. loop._sentry_flush_patched = True # type: ignore
  73. def _create_task_with_factory(
  74. orig_task_factory: "Any",
  75. loop: "asyncio.AbstractEventLoop",
  76. coro: "Coroutine[Any, Any, Any]",
  77. **kwargs: "Any",
  78. ) -> "asyncio.Task[Any]":
  79. task = None
  80. # Trying to use user set task factory (if there is one)
  81. if orig_task_factory:
  82. task = orig_task_factory(loop, coro, **kwargs)
  83. if task is None:
  84. # The default task factory in `asyncio` does not have its own function
  85. # but is just a couple of lines in `asyncio.base_events.create_task()`
  86. # Those lines are copied here.
  87. # WARNING:
  88. # If the default behavior of the task creation in asyncio changes,
  89. # this will break!
  90. task = Task(coro, loop=loop, **kwargs)
  91. if task._source_traceback: # type: ignore
  92. del task._source_traceback[-1] # type: ignore
  93. return task
  94. def patch_asyncio() -> None:
  95. orig_task_factory = None
  96. try:
  97. loop = asyncio.get_running_loop()
  98. orig_task_factory = loop.get_task_factory()
  99. # Check if already patched
  100. if getattr(orig_task_factory, "_is_sentry_task_factory", False):
  101. return
  102. def _sentry_task_factory(
  103. loop: "asyncio.AbstractEventLoop",
  104. coro: "Coroutine[Any, Any, Any]",
  105. **kwargs: "Any",
  106. ) -> "asyncio.Future[Any]":
  107. # Check if this is an internal Sentry task
  108. if is_internal_task():
  109. return _create_task_with_factory(
  110. orig_task_factory, loop, coro, **kwargs
  111. )
  112. @_wrap_coroutine(coro)
  113. async def _task_with_sentry_span_creation() -> "Any":
  114. result = None
  115. integration = sentry_sdk.get_client().get_integration(
  116. AsyncioIntegration
  117. )
  118. task_spans = integration.task_spans if integration else False
  119. with sentry_sdk.isolation_scope():
  120. with (
  121. sentry_sdk.start_span(
  122. op=OP.FUNCTION,
  123. name=get_name(coro),
  124. origin=AsyncioIntegration.origin,
  125. )
  126. if task_spans
  127. else nullcontext()
  128. ):
  129. try:
  130. result = await coro
  131. except StopAsyncIteration as e:
  132. raise e from None
  133. except Exception:
  134. reraise(*_capture_exception())
  135. return result
  136. task = _create_task_with_factory(
  137. orig_task_factory, loop, _task_with_sentry_span_creation(), **kwargs
  138. )
  139. # Set the task name to include the original coroutine's name
  140. try:
  141. task.set_name(f"{get_name(coro)} (Sentry-wrapped)")
  142. except AttributeError:
  143. # set_name might not be available in all Python versions
  144. pass
  145. return task
  146. _sentry_task_factory._is_sentry_task_factory = True # type: ignore
  147. loop.set_task_factory(_sentry_task_factory) # type: ignore
  148. except RuntimeError:
  149. # When there is no running loop, we have nothing to patch.
  150. logger.warning(
  151. "There is no running asyncio loop so there is nothing Sentry can patch. "
  152. "Please make sure you call sentry_sdk.init() within a running "
  153. "asyncio loop for the AsyncioIntegration to work. "
  154. "See https://docs.sentry.io/platforms/python/integrations/asyncio/"
  155. )
  156. def _capture_exception() -> "ExcInfo":
  157. exc_info = sys.exc_info()
  158. client = sentry_sdk.get_client()
  159. integration = client.get_integration(AsyncioIntegration)
  160. if integration is not None:
  161. event, hint = event_from_exception(
  162. exc_info,
  163. client_options=client.options,
  164. mechanism={"type": "asyncio", "handled": False},
  165. )
  166. sentry_sdk.capture_event(event, hint=hint)
  167. return exc_info
  168. class AsyncioIntegration(Integration):
  169. identifier = "asyncio"
  170. origin = f"auto.function.{identifier}"
  171. def __init__(self, task_spans: bool = True) -> None:
  172. self.task_spans = task_spans
  173. @staticmethod
  174. def setup_once() -> None:
  175. patch_asyncio()
  176. patch_loop_close()
  177. def enable_asyncio_integration(*args: "Any", **kwargs: "Any") -> None:
  178. """
  179. Enable AsyncioIntegration with the provided options.
  180. This is useful in scenarios where Sentry needs to be initialized before
  181. an event loop is set up, but you still want to instrument asyncio once there
  182. is an event loop. In that case, you can sentry_sdk.init() early on without
  183. the AsyncioIntegration and then, once the event loop has been set up,
  184. execute:
  185. ```python
  186. from sentry_sdk.integrations.asyncio import enable_asyncio_integration
  187. async def async_entrypoint():
  188. enable_asyncio_integration()
  189. ```
  190. Any arguments provided will be passed to AsyncioIntegration() as is.
  191. If AsyncioIntegration has already patched the current event loop, this
  192. function won't have any effect.
  193. If AsyncioIntegration was provided in
  194. sentry_sdk.init(disabled_integrations=[...]), this function will ignore that
  195. and the integration will be enabled.
  196. """
  197. client = sentry_sdk.get_client()
  198. if not client.is_active():
  199. return
  200. # This function purposefully bypasses the integration machinery in
  201. # integrations/__init__.py. _installed_integrations/_processed_integrations
  202. # is used to prevent double patching the same module, but in the case of
  203. # the AsyncioIntegration, we don't monkeypatch the standard library directly,
  204. # we patch the currently running event loop, and we keep the record of doing
  205. # that on the loop itself.
  206. logger.debug("Setting up integration asyncio")
  207. integration = AsyncioIntegration(*args, **kwargs)
  208. integration.setup_once()
  209. if "asyncio" not in client.integrations:
  210. client.integrations["asyncio"] = integration