threading.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. import sys
  2. import warnings
  3. from functools import wraps
  4. from threading import Thread, current_thread
  5. from concurrent.futures import ThreadPoolExecutor, Future
  6. import sentry_sdk
  7. from sentry_sdk.integrations import Integration
  8. from sentry_sdk.scope import use_isolation_scope, use_scope
  9. from sentry_sdk.utils import (
  10. event_from_exception,
  11. capture_internal_exceptions,
  12. logger,
  13. reraise,
  14. )
  15. from typing import TYPE_CHECKING
  16. if TYPE_CHECKING:
  17. from typing import Any
  18. from typing import TypeVar
  19. from typing import Callable
  20. from typing import Optional
  21. from sentry_sdk._types import ExcInfo
  22. F = TypeVar("F", bound=Callable[..., Any])
  23. T = TypeVar("T", bound=Any)
  24. class ThreadingIntegration(Integration):
  25. identifier = "threading"
  26. def __init__(
  27. self, propagate_hub: "Optional[bool]" = None, propagate_scope: bool = True
  28. ) -> None:
  29. if propagate_hub is not None:
  30. logger.warning(
  31. "Deprecated: propagate_hub is deprecated. This will be removed in the future."
  32. )
  33. # Note: propagate_hub did not have any effect on propagation of scope data
  34. # scope data was always propagated no matter what the value of propagate_hub was
  35. # This is why the default for propagate_scope is True
  36. self.propagate_scope = propagate_scope
  37. if propagate_hub is not None:
  38. self.propagate_scope = propagate_hub
  39. @staticmethod
  40. def setup_once() -> None:
  41. old_start = Thread.start
  42. try:
  43. from django import VERSION as django_version # noqa: N811
  44. except ImportError:
  45. django_version = None
  46. try:
  47. import channels # type: ignore[import-untyped]
  48. channels_version = channels.__version__
  49. except (ImportError, AttributeError):
  50. channels_version = None
  51. is_async_emulated_with_threads = (
  52. sys.version_info < (3, 9)
  53. and channels_version is not None
  54. and channels_version < "4.0.0"
  55. and django_version is not None
  56. and django_version >= (3, 0)
  57. and django_version < (4, 0)
  58. )
  59. @wraps(old_start)
  60. def sentry_start(self: "Thread", *a: "Any", **kw: "Any") -> "Any":
  61. integration = sentry_sdk.get_client().get_integration(ThreadingIntegration)
  62. if integration is None:
  63. return old_start(self, *a, **kw)
  64. if integration.propagate_scope:
  65. if is_async_emulated_with_threads:
  66. warnings.warn(
  67. "There is a known issue with Django channels 2.x and 3.x when using Python 3.8 or older. "
  68. "(Async support is emulated using threads and some Sentry data may be leaked between those threads.) "
  69. "Please either upgrade to Django channels 4.0+, use Django's async features "
  70. "available in Django 3.1+ instead of Django channels, or upgrade to Python 3.9+.",
  71. stacklevel=2,
  72. )
  73. isolation_scope = sentry_sdk.get_isolation_scope()
  74. current_scope = sentry_sdk.get_current_scope()
  75. else:
  76. isolation_scope = sentry_sdk.get_isolation_scope().fork()
  77. current_scope = sentry_sdk.get_current_scope().fork()
  78. else:
  79. isolation_scope = None
  80. current_scope = None
  81. # Patching instance methods in `start()` creates a reference cycle if
  82. # done in a naive way. See
  83. # https://github.com/getsentry/sentry-python/pull/434
  84. #
  85. # In threading module, using current_thread API will access current thread instance
  86. # without holding it to avoid a reference cycle in an easier way.
  87. with capture_internal_exceptions():
  88. new_run = _wrap_run(
  89. isolation_scope,
  90. current_scope,
  91. getattr(self.run, "__func__", self.run),
  92. )
  93. self.run = new_run # type: ignore
  94. return old_start(self, *a, **kw)
  95. Thread.start = sentry_start # type: ignore
  96. ThreadPoolExecutor.submit = _wrap_threadpool_executor_submit( # type: ignore
  97. ThreadPoolExecutor.submit, is_async_emulated_with_threads
  98. )
  99. def _wrap_run(
  100. isolation_scope_to_use: "Optional[sentry_sdk.Scope]",
  101. current_scope_to_use: "Optional[sentry_sdk.Scope]",
  102. old_run_func: "F",
  103. ) -> "F":
  104. @wraps(old_run_func)
  105. def run(*a: "Any", **kw: "Any") -> "Any":
  106. def _run_old_run_func() -> "Any":
  107. try:
  108. self = current_thread()
  109. return old_run_func(self, *a[1:], **kw)
  110. except Exception:
  111. reraise(*_capture_exception())
  112. if isolation_scope_to_use is not None and current_scope_to_use is not None:
  113. with use_isolation_scope(isolation_scope_to_use):
  114. with use_scope(current_scope_to_use):
  115. return _run_old_run_func()
  116. else:
  117. return _run_old_run_func()
  118. return run # type: ignore
  119. def _wrap_threadpool_executor_submit(
  120. func: "Callable[..., Future[T]]", is_async_emulated_with_threads: bool
  121. ) -> "Callable[..., Future[T]]":
  122. """
  123. Wrap submit call to propagate scopes on task submission.
  124. """
  125. @wraps(func)
  126. def sentry_submit(
  127. self: "ThreadPoolExecutor",
  128. fn: "Callable[..., T]",
  129. *args: "Any",
  130. **kwargs: "Any",
  131. ) -> "Future[T]":
  132. integration = sentry_sdk.get_client().get_integration(ThreadingIntegration)
  133. if integration is None:
  134. return func(self, fn, *args, **kwargs)
  135. if integration.propagate_scope and is_async_emulated_with_threads:
  136. isolation_scope = sentry_sdk.get_isolation_scope()
  137. current_scope = sentry_sdk.get_current_scope()
  138. elif integration.propagate_scope:
  139. isolation_scope = sentry_sdk.get_isolation_scope().fork()
  140. current_scope = sentry_sdk.get_current_scope().fork()
  141. else:
  142. isolation_scope = None
  143. current_scope = None
  144. def wrapped_fn(*args: "Any", **kwargs: "Any") -> "Any":
  145. if isolation_scope is not None and current_scope is not None:
  146. with use_isolation_scope(isolation_scope):
  147. with use_scope(current_scope):
  148. return fn(*args, **kwargs)
  149. return fn(*args, **kwargs)
  150. return func(self, wrapped_fn, *args, **kwargs)
  151. return sentry_submit
  152. def _capture_exception() -> "ExcInfo":
  153. exc_info = sys.exc_info()
  154. client = sentry_sdk.get_client()
  155. if client.get_integration(ThreadingIntegration) is not None:
  156. event, hint = event_from_exception(
  157. exc_info,
  158. client_options=client.options,
  159. mechanism={"type": "threading", "handled": False},
  160. )
  161. sentry_sdk.capture_event(event, hint=hint)
  162. return exc_info