client.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. from typing import Callable, Union, AsyncIterable, Any
  2. import sentry_sdk
  3. from sentry_sdk.consts import OP
  4. from sentry_sdk.integrations import DidNotEnable
  5. from sentry_sdk.integrations.grpc.consts import SPAN_ORIGIN
  6. try:
  7. from grpc.aio import (
  8. UnaryUnaryClientInterceptor,
  9. UnaryStreamClientInterceptor,
  10. ClientCallDetails,
  11. UnaryUnaryCall,
  12. UnaryStreamCall,
  13. Metadata,
  14. )
  15. from google.protobuf.message import Message
  16. except ImportError:
  17. raise DidNotEnable("grpcio is not installed")
  class ClientInterceptor:
      """Common base for the async gRPC client interceptors defined in this module."""

      @staticmethod
      def _update_client_call_details_metadata_from_scope(
          client_call_details: "ClientCallDetails",
      ) -> "ClientCallDetails":
          """Attach the current scope's trace propagation headers to the call metadata.

          If the call details carry no metadata, an empty ``Metadata`` is installed
          first. If they carry metadata that is not a ``Metadata`` instance, it is
          converted with ``Metadata.from_tuple``. Metadata that already is a
          ``Metadata`` instance is used as-is, and the headers are added to it
          directly.

          Returns the (possibly replaced) call details.
          """
          current = client_call_details.metadata
          if current is None:
              client_call_details = client_call_details._replace(metadata=Metadata())
          elif not isinstance(current, Metadata):
              # Workaround for a gRPC bug that was fixed in grpcio v1.60.0.
              # See https://github.com/grpc/grpc/issues/34298.
              client_call_details = client_call_details._replace(
                  metadata=Metadata.from_tuple(current)
              )

          headers = sentry_sdk.get_current_scope().iter_trace_propagation_headers()
          for header_name, header_value in headers:
              client_call_details.metadata.add(header_name, header_value)

          return client_call_details
  class SentryUnaryUnaryClientInterceptor(ClientInterceptor, UnaryUnaryClientInterceptor):  # type: ignore
      """Client interceptor that wraps unary-unary calls in a Sentry span."""

      async def intercept_unary_unary(
          self,
          continuation: "Callable[[ClientCallDetails, Message], UnaryUnaryCall]",
          client_call_details: "ClientCallDetails",
          request: "Message",
      ) -> "Union[UnaryUnaryCall, Message]":
          """Run the call inside a ``grpc.client`` span and record its outcome.

          The span stores the call type, the method taken from the call details
          and the name of the status code obtained from the awaited call. Trace
          propagation headers are added to the call metadata before the call is
          handed to ``continuation``.

          Returns whatever awaiting ``continuation`` produced.
          """
          rpc_method = client_call_details.method
          span_name = "unary unary call to %s" % rpc_method.decode()

          with sentry_sdk.start_span(
              op=OP.GRPC_CLIENT, name=span_name, origin=SPAN_ORIGIN
          ) as span:
              span.set_data("type", "unary unary")
              # The method is stored exactly as found on the call details (not decoded).
              span.set_data("method", rpc_method)

              details_with_trace = self._update_client_call_details_metadata_from_scope(
                  client_call_details
              )
              call = await continuation(details_with_trace, request)

              code = await call.code()
              span.set_data("code", code.name)

              return call
  class SentryUnaryStreamClientInterceptor(
      ClientInterceptor,
      UnaryStreamClientInterceptor,  # type: ignore
  ):
      """Client interceptor that wraps unary-stream calls in a Sentry span."""

      async def intercept_unary_stream(
          self,
          continuation: "Callable[[ClientCallDetails, Message], UnaryStreamCall]",
          client_call_details: "ClientCallDetails",
          request: "Message",
      ) -> "Union[AsyncIterable[Any], UnaryStreamCall]":
          """Start the streaming call inside a ``grpc.client`` span.

          The span stores the call type and the method taken from the call
          details. Trace propagation headers are added to the call metadata
          before the call is handed to ``continuation``.

          Returns whatever awaiting ``continuation`` produced.
          """
          rpc_method = client_call_details.method
          span_name = "unary stream call to %s" % rpc_method.decode()

          with sentry_sdk.start_span(
              op=OP.GRPC_CLIENT, name=span_name, origin=SPAN_ORIGIN
          ) as span:
              span.set_data("type", "unary stream")
              # The method is stored exactly as found on the call details (not decoded).
              span.set_data("method", rpc_method)

              details_with_trace = self._update_client_call_details_metadata_from_scope(
                  client_call_details
              )
              stream_call = await continuation(details_with_trace, request)

              # NOTE(review): unlike the unary-unary interceptor, no "code" entry is
              # recorded here; the `await response.code()` lookup was left disabled.
              # Presumably the status is only known once the stream has been
              # consumed -- confirm before re-enabling.

              return stream_call