beam.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. import sys
  2. import types
  3. from functools import wraps
  4. import sentry_sdk
  5. from sentry_sdk.integrations import Integration
  6. from sentry_sdk.integrations.logging import ignore_logger
  7. from sentry_sdk.utils import (
  8. capture_internal_exceptions,
  9. ensure_integration_enabled,
  10. event_from_exception,
  11. reraise,
  12. )
  13. from typing import TYPE_CHECKING
  14. if TYPE_CHECKING:
  15. from typing import Any
  16. from typing import Iterator
  17. from typing import TypeVar
  18. from typing import Callable
  19. from sentry_sdk._types import ExcInfo
  20. T = TypeVar("T")
  21. F = TypeVar("F", bound=Callable[..., Any])
  22. WRAPPED_FUNC = "_wrapped_{}_"
  23. INSPECT_FUNC = "_inspect_{}" # Required format per apache_beam/transforms/core.py
  24. USED_FUNC = "_sentry_used_"
  25. class BeamIntegration(Integration):
  26. identifier = "beam"
  27. @staticmethod
  28. def setup_once() -> None:
  29. from apache_beam.transforms.core import DoFn, ParDo # type: ignore
  30. ignore_logger("root")
  31. ignore_logger("bundle_processor.create")
  32. function_patches = ["process", "start_bundle", "finish_bundle", "setup"]
  33. for func_name in function_patches:
  34. setattr(
  35. DoFn,
  36. INSPECT_FUNC.format(func_name),
  37. _wrap_inspect_call(DoFn, func_name),
  38. )
  39. old_init = ParDo.__init__
  40. def sentry_init_pardo(
  41. self: "ParDo", fn: "Any", *args: "Any", **kwargs: "Any"
  42. ) -> "Any":
  43. # Do not monkey patch init twice
  44. if not getattr(self, "_sentry_is_patched", False):
  45. for func_name in function_patches:
  46. if not hasattr(fn, func_name):
  47. continue
  48. wrapped_func = WRAPPED_FUNC.format(func_name)
  49. # Check to see if inspect is set and process is not
  50. # to avoid monkey patching process twice.
  51. # Check to see if function is part of object for
  52. # backwards compatibility.
  53. process_func = getattr(fn, func_name)
  54. inspect_func = getattr(fn, INSPECT_FUNC.format(func_name))
  55. if not getattr(inspect_func, USED_FUNC, False) and not getattr(
  56. process_func, USED_FUNC, False
  57. ):
  58. setattr(fn, wrapped_func, process_func)
  59. setattr(fn, func_name, _wrap_task_call(process_func))
  60. self._sentry_is_patched = True
  61. old_init(self, fn, *args, **kwargs)
  62. ParDo.__init__ = sentry_init_pardo
  63. def _wrap_inspect_call(cls: "Any", func_name: "Any") -> "Any":
  64. if not hasattr(cls, func_name):
  65. return None
  66. def _inspect(self: "Any") -> "Any":
  67. """
  68. Inspect function overrides the way Beam gets argspec.
  69. """
  70. wrapped_func = WRAPPED_FUNC.format(func_name)
  71. if hasattr(self, wrapped_func):
  72. process_func = getattr(self, wrapped_func)
  73. else:
  74. process_func = getattr(self, func_name)
  75. setattr(self, func_name, _wrap_task_call(process_func))
  76. setattr(self, wrapped_func, process_func)
  77. # getfullargspec is deprecated in more recent beam versions and get_function_args_defaults
  78. # (which uses Signatures internally) should be used instead.
  79. try:
  80. from apache_beam.transforms.core import get_function_args_defaults
  81. return get_function_args_defaults(process_func)
  82. except ImportError:
  83. from apache_beam.typehints.decorators import getfullargspec # type: ignore
  84. return getfullargspec(process_func)
  85. setattr(_inspect, USED_FUNC, True)
  86. return _inspect
  87. def _wrap_task_call(func: "F") -> "F":
  88. """
  89. Wrap task call with a try catch to get exceptions.
  90. """
  91. @wraps(func)
  92. def _inner(*args: "Any", **kwargs: "Any") -> "Any":
  93. try:
  94. gen = func(*args, **kwargs)
  95. except Exception:
  96. raise_exception()
  97. if not isinstance(gen, types.GeneratorType):
  98. return gen
  99. return _wrap_generator_call(gen)
  100. setattr(_inner, USED_FUNC, True)
  101. return _inner # type: ignore
  102. @ensure_integration_enabled(BeamIntegration)
  103. def _capture_exception(exc_info: "ExcInfo") -> None:
  104. """
  105. Send Beam exception to Sentry.
  106. """
  107. client = sentry_sdk.get_client()
  108. event, hint = event_from_exception(
  109. exc_info,
  110. client_options=client.options,
  111. mechanism={"type": "beam", "handled": False},
  112. )
  113. sentry_sdk.capture_event(event, hint=hint)
  114. def raise_exception() -> None:
  115. """
  116. Raise an exception.
  117. """
  118. exc_info = sys.exc_info()
  119. with capture_internal_exceptions():
  120. _capture_exception(exc_info)
  121. reraise(*exc_info)
  122. def _wrap_generator_call(gen: "Iterator[T]") -> "Iterator[T]":
  123. """
  124. Wrap the generator to handle any failures.
  125. """
  126. while True:
  127. try:
  128. yield next(gen)
  129. except StopIteration:
  130. break
  131. except Exception:
  132. raise_exception()