ray.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. import inspect
  2. import functools
  3. import sys
  4. import sentry_sdk
  5. from sentry_sdk.consts import OP, SPANSTATUS
  6. from sentry_sdk.integrations import _check_minimum_version, DidNotEnable, Integration
  7. from sentry_sdk.tracing import TransactionSource
  8. from sentry_sdk.utils import (
  9. event_from_exception,
  10. logger,
  11. package_version,
  12. qualname_from_function,
  13. reraise,
  14. )
  15. try:
  16. import ray # type: ignore[import-not-found]
  17. from ray import remote
  18. except ImportError:
  19. raise DidNotEnable("Ray not installed.")
  20. from typing import TYPE_CHECKING
  21. if TYPE_CHECKING:
  22. from collections.abc import Callable
  23. from typing import Any, Optional
  24. from sentry_sdk.utils import ExcInfo
  25. def _check_sentry_initialized() -> None:
  26. if sentry_sdk.get_client().is_active():
  27. return
  28. logger.debug(
  29. "[Tracing] Sentry not initialized in ray cluster worker, performance data will be discarded."
  30. )
  31. def _insert_sentry_tracing_in_signature(func: "Callable[..., Any]") -> None:
  32. # Patching new_func signature to add the _sentry_tracing parameter to it
  33. # Ray later inspects the signature and finds the unexpected parameter otherwise
  34. signature = inspect.signature(func)
  35. params = list(signature.parameters.values())
  36. sentry_tracing_param = inspect.Parameter(
  37. "_sentry_tracing",
  38. kind=inspect.Parameter.KEYWORD_ONLY,
  39. default=None,
  40. )
  41. # Keyword only arguments are penultimate if function has variadic keyword arguments
  42. if params and params[-1].kind is inspect.Parameter.VAR_KEYWORD:
  43. params.insert(-1, sentry_tracing_param)
  44. else:
  45. params.append(sentry_tracing_param)
  46. func.__signature__ = signature.replace(parameters=params) # type: ignore[attr-defined]
  47. def _patch_ray_remote() -> None:
  48. old_remote = remote
  49. @functools.wraps(old_remote)
  50. def new_remote(
  51. f: "Optional[Callable[..., Any]]" = None, *args: "Any", **kwargs: "Any"
  52. ) -> "Callable[..., Any]":
  53. if inspect.isclass(f):
  54. # Ray Actors
  55. # (https://docs.ray.io/en/latest/ray-core/actors.html)
  56. # are not supported
  57. # (Only Ray Tasks are supported)
  58. return old_remote(f, *args, **kwargs)
  59. def wrapper(user_f: "Callable[..., Any]") -> "Any":
  60. if inspect.isclass(user_f):
  61. # Ray Actors
  62. # (https://docs.ray.io/en/latest/ray-core/actors.html)
  63. # are not supported
  64. # (Only Ray Tasks are supported)
  65. return old_remote(*args, **kwargs)(user_f)
  66. @functools.wraps(user_f)
  67. def new_func(
  68. *f_args: "Any",
  69. _sentry_tracing: "Optional[dict[str, Any]]" = None,
  70. **f_kwargs: "Any",
  71. ) -> "Any":
  72. _check_sentry_initialized()
  73. transaction = sentry_sdk.continue_trace(
  74. _sentry_tracing or {},
  75. op=OP.QUEUE_TASK_RAY,
  76. name=qualname_from_function(user_f),
  77. origin=RayIntegration.origin,
  78. source=TransactionSource.TASK,
  79. )
  80. with sentry_sdk.start_transaction(transaction) as transaction:
  81. try:
  82. result = user_f(*f_args, **f_kwargs)
  83. transaction.set_status(SPANSTATUS.OK)
  84. except Exception:
  85. transaction.set_status(SPANSTATUS.INTERNAL_ERROR)
  86. exc_info = sys.exc_info()
  87. _capture_exception(exc_info)
  88. reraise(*exc_info)
  89. return result
  90. _insert_sentry_tracing_in_signature(new_func)
  91. if f:
  92. rv = old_remote(new_func)
  93. else:
  94. rv = old_remote(*args, **kwargs)(new_func)
  95. old_remote_method = rv.remote
  96. def _remote_method_with_header_propagation(
  97. *args: "Any", **kwargs: "Any"
  98. ) -> "Any":
  99. """
  100. Ray Client
  101. """
  102. with sentry_sdk.start_span(
  103. op=OP.QUEUE_SUBMIT_RAY,
  104. name=qualname_from_function(user_f),
  105. origin=RayIntegration.origin,
  106. ) as span:
  107. tracing = {
  108. k: v
  109. for k, v in sentry_sdk.get_current_scope().iter_trace_propagation_headers()
  110. }
  111. try:
  112. result = old_remote_method(
  113. *args, **kwargs, _sentry_tracing=tracing
  114. )
  115. span.set_status(SPANSTATUS.OK)
  116. except Exception:
  117. span.set_status(SPANSTATUS.INTERNAL_ERROR)
  118. exc_info = sys.exc_info()
  119. _capture_exception(exc_info)
  120. reraise(*exc_info)
  121. return result
  122. rv.remote = _remote_method_with_header_propagation
  123. return rv
  124. if f is not None:
  125. return wrapper(f)
  126. else:
  127. return wrapper
  128. ray.remote = new_remote
  129. def _capture_exception(exc_info: "ExcInfo", **kwargs: "Any") -> None:
  130. client = sentry_sdk.get_client()
  131. event, hint = event_from_exception(
  132. exc_info,
  133. client_options=client.options,
  134. mechanism={
  135. "handled": False,
  136. "type": RayIntegration.identifier,
  137. },
  138. )
  139. sentry_sdk.capture_event(event, hint=hint)
  140. class RayIntegration(Integration):
  141. identifier = "ray"
  142. origin = f"auto.queue.{identifier}"
  143. @staticmethod
  144. def setup_once() -> None:
  145. version = package_version("ray")
  146. _check_minimum_version(RayIntegration, version)
  147. _patch_ray_remote()