client.py 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. import sentry_sdk
  2. from sentry_sdk.consts import OP
  3. from sentry_sdk.integrations import DidNotEnable
  4. from sentry_sdk.integrations.grpc.consts import SPAN_ORIGIN
  5. from typing import TYPE_CHECKING
  6. if TYPE_CHECKING:
  7. from typing import Any, Callable, Iterator, Iterable, Union
  8. try:
  9. import grpc
  10. from grpc import ClientCallDetails, Call
  11. from grpc._interceptor import _UnaryOutcome
  12. from grpc.aio._interceptor import UnaryStreamCall
  13. from google.protobuf.message import Message
  14. except ImportError:
  15. raise DidNotEnable("grpcio is not installed")
  16. class ClientInterceptor(
  17. grpc.UnaryUnaryClientInterceptor, # type: ignore
  18. grpc.UnaryStreamClientInterceptor, # type: ignore
  19. ):
  20. def intercept_unary_unary(
  21. self: "ClientInterceptor",
  22. continuation: "Callable[[ClientCallDetails, Message], _UnaryOutcome]",
  23. client_call_details: "ClientCallDetails",
  24. request: "Message",
  25. ) -> "_UnaryOutcome":
  26. method = client_call_details.method
  27. with sentry_sdk.start_span(
  28. op=OP.GRPC_CLIENT,
  29. name="unary unary call to %s" % method,
  30. origin=SPAN_ORIGIN,
  31. ) as span:
  32. span.set_data("type", "unary unary")
  33. span.set_data("method", method)
  34. client_call_details = self._update_client_call_details_metadata_from_scope(
  35. client_call_details
  36. )
  37. response = continuation(client_call_details, request)
  38. span.set_data("code", response.code().name)
  39. return response
  40. def intercept_unary_stream(
  41. self: "ClientInterceptor",
  42. continuation: "Callable[[ClientCallDetails, Message], Union[Iterable[Any], UnaryStreamCall]]",
  43. client_call_details: "ClientCallDetails",
  44. request: "Message",
  45. ) -> "Union[Iterator[Message], Call]":
  46. method = client_call_details.method
  47. with sentry_sdk.start_span(
  48. op=OP.GRPC_CLIENT,
  49. name="unary stream call to %s" % method,
  50. origin=SPAN_ORIGIN,
  51. ) as span:
  52. span.set_data("type", "unary stream")
  53. span.set_data("method", method)
  54. client_call_details = self._update_client_call_details_metadata_from_scope(
  55. client_call_details
  56. )
  57. response: "UnaryStreamCall" = continuation(client_call_details, request)
  58. # Setting code on unary-stream leads to execution getting stuck
  59. # span.set_data("code", response.code().name)
  60. return response
  61. @staticmethod
  62. def _update_client_call_details_metadata_from_scope(
  63. client_call_details: "ClientCallDetails",
  64. ) -> "ClientCallDetails":
  65. metadata = (
  66. list(client_call_details.metadata) if client_call_details.metadata else []
  67. )
  68. for (
  69. key,
  70. value,
  71. ) in sentry_sdk.get_current_scope().iter_trace_propagation_headers():
  72. metadata.append((key, value))
  73. client_call_details = grpc._interceptor._ClientCallDetails(
  74. method=client_call_details.method,
  75. timeout=client_call_details.timeout,
  76. metadata=metadata,
  77. credentials=client_call_details.credentials,
  78. wait_for_ready=client_call_details.wait_for_ready,
  79. compression=client_call_details.compression,
  80. )
  81. return client_call_details