pure_eval.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. import ast
  2. import sentry_sdk
  3. from sentry_sdk import serializer
  4. from sentry_sdk.integrations import Integration, DidNotEnable
  5. from sentry_sdk.scope import add_global_event_processor
  6. from sentry_sdk.utils import walk_exception_chain, iter_stacks
  7. from typing import TYPE_CHECKING
  8. if TYPE_CHECKING:
  9. from typing import Optional, Dict, Any, Tuple, List
  10. from types import FrameType
  11. from sentry_sdk._types import Event, Hint
  12. try:
  13. from executing import Source
  14. except ImportError:
  15. raise DidNotEnable("executing is not installed")
  16. try:
  17. from pure_eval import Evaluator
  18. except ImportError:
  19. raise DidNotEnable("pure_eval is not installed")
  20. try:
  21. # Used implicitly, just testing it's available
  22. import asttokens # noqa
  23. except ImportError:
  24. raise DidNotEnable("asttokens is not installed")
  25. class PureEvalIntegration(Integration):
  26. identifier = "pure_eval"
  27. @staticmethod
  28. def setup_once() -> None:
  29. @add_global_event_processor
  30. def add_executing_info(
  31. event: "Event", hint: "Optional[Hint]"
  32. ) -> "Optional[Event]":
  33. if sentry_sdk.get_client().get_integration(PureEvalIntegration) is None:
  34. return event
  35. if hint is None:
  36. return event
  37. exc_info = hint.get("exc_info", None)
  38. if exc_info is None:
  39. return event
  40. exception = event.get("exception", None)
  41. if exception is None:
  42. return event
  43. values = exception.get("values", None)
  44. if values is None:
  45. return event
  46. for exception, (_exc_type, _exc_value, exc_tb) in zip(
  47. reversed(values), walk_exception_chain(exc_info)
  48. ):
  49. sentry_frames = [
  50. frame
  51. for frame in exception.get("stacktrace", {}).get("frames", [])
  52. if frame.get("function")
  53. ]
  54. tbs = list(iter_stacks(exc_tb))
  55. if len(sentry_frames) != len(tbs):
  56. continue
  57. for sentry_frame, tb in zip(sentry_frames, tbs):
  58. sentry_frame["vars"] = (
  59. pure_eval_frame(tb.tb_frame) or sentry_frame["vars"]
  60. )
  61. return event
  62. def pure_eval_frame(frame: "FrameType") -> "Dict[str, Any]":
  63. source = Source.for_frame(frame)
  64. if not source.tree:
  65. return {}
  66. statements = source.statements_at_line(frame.f_lineno)
  67. if not statements:
  68. return {}
  69. scope = stmt = list(statements)[0]
  70. while True:
  71. # Get the parent first in case the original statement is already
  72. # a function definition, e.g. if we're calling a decorator
  73. # In that case we still want the surrounding scope, not that function
  74. scope = scope.parent
  75. if isinstance(scope, (ast.FunctionDef, ast.ClassDef, ast.Module)):
  76. break
  77. evaluator = Evaluator.from_frame(frame)
  78. expressions = evaluator.interesting_expressions_grouped(scope)
  79. def closeness(expression: "Tuple[List[Any], Any]") -> "Tuple[int, int]":
  80. # Prioritise expressions with a node closer to the statement executed
  81. # without being after that statement
  82. # A higher return value is better - the expression will appear
  83. # earlier in the list of values and is less likely to be trimmed
  84. nodes, _value = expression
  85. def start(n: "ast.expr") -> "Tuple[int, int]":
  86. return (n.lineno, n.col_offset)
  87. nodes_before_stmt = [
  88. node
  89. for node in nodes
  90. if start(node) < stmt.last_token.end # type: ignore
  91. ]
  92. if nodes_before_stmt:
  93. # The position of the last node before or in the statement
  94. return max(start(node) for node in nodes_before_stmt)
  95. else:
  96. # The position of the first node after the statement
  97. # Negative means it's always lower priority than nodes that come before
  98. # Less negative means closer to the statement and higher priority
  99. lineno, col_offset = min(start(node) for node in nodes)
  100. return (-lineno, -col_offset)
  101. # This adds the first_token and last_token attributes to nodes
  102. atok = source.asttokens()
  103. expressions.sort(key=closeness, reverse=True)
  104. vars = {
  105. atok.get_text(nodes[0]): value
  106. for nodes, value in expressions[: serializer.MAX_DATABAG_BREADTH]
  107. }
  108. return serializer.serialize(vars, is_vars=True)