server.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. import sentry_sdk
  2. from sentry_sdk.consts import OP
  3. from sentry_sdk.integrations import DidNotEnable
  4. from sentry_sdk.integrations.grpc.consts import SPAN_ORIGIN
  5. from sentry_sdk.tracing import TransactionSource
  6. from sentry_sdk.utils import event_from_exception
  7. from typing import TYPE_CHECKING
  8. if TYPE_CHECKING:
  9. from collections.abc import Awaitable, Callable
  10. from typing import Any, Optional
  11. try:
  12. import grpc
  13. from grpc import HandlerCallDetails, RpcMethodHandler
  14. from grpc.aio import AbortError, ServicerContext
  15. except ImportError:
  16. raise DidNotEnable("grpcio is not installed")
  17. class ServerInterceptor(grpc.aio.ServerInterceptor): # type: ignore
  18. def __init__(
  19. self: "ServerInterceptor",
  20. find_name: "Callable[[ServicerContext], str] | None" = None,
  21. ) -> None:
  22. self._find_method_name = find_name or self._find_name
  23. super().__init__()
  24. async def intercept_service(
  25. self: "ServerInterceptor",
  26. continuation: "Callable[[HandlerCallDetails], Awaitable[RpcMethodHandler]]",
  27. handler_call_details: "HandlerCallDetails",
  28. ) -> "Optional[Awaitable[RpcMethodHandler]]":
  29. self._handler_call_details = handler_call_details
  30. handler = await continuation(handler_call_details)
  31. if handler is None:
  32. return None
  33. if not handler.request_streaming and not handler.response_streaming:
  34. handler_factory = grpc.unary_unary_rpc_method_handler
  35. async def wrapped(request: "Any", context: "ServicerContext") -> "Any":
  36. name = self._find_method_name(context)
  37. if not name:
  38. return await handler(request, context)
  39. # What if the headers are empty?
  40. transaction = sentry_sdk.continue_trace(
  41. dict(context.invocation_metadata()),
  42. op=OP.GRPC_SERVER,
  43. name=name,
  44. source=TransactionSource.CUSTOM,
  45. origin=SPAN_ORIGIN,
  46. )
  47. with sentry_sdk.start_transaction(transaction=transaction):
  48. try:
  49. return await handler.unary_unary(request, context)
  50. except AbortError:
  51. raise
  52. except Exception as exc:
  53. event, hint = event_from_exception(
  54. exc,
  55. mechanism={"type": "grpc", "handled": False},
  56. )
  57. sentry_sdk.capture_event(event, hint=hint)
  58. raise
  59. elif not handler.request_streaming and handler.response_streaming:
  60. handler_factory = grpc.unary_stream_rpc_method_handler
  61. async def wrapped(request: "Any", context: "ServicerContext") -> "Any": # type: ignore
  62. async for r in handler.unary_stream(request, context):
  63. yield r
  64. elif handler.request_streaming and not handler.response_streaming:
  65. handler_factory = grpc.stream_unary_rpc_method_handler
  66. async def wrapped(request: "Any", context: "ServicerContext") -> "Any":
  67. response = handler.stream_unary(request, context)
  68. return await response
  69. elif handler.request_streaming and handler.response_streaming:
  70. handler_factory = grpc.stream_stream_rpc_method_handler
  71. async def wrapped(request: "Any", context: "ServicerContext") -> "Any": # type: ignore
  72. async for r in handler.stream_stream(request, context):
  73. yield r
  74. return handler_factory(
  75. wrapped,
  76. request_deserializer=handler.request_deserializer,
  77. response_serializer=handler.response_serializer,
  78. )
  79. def _find_name(self, context: "ServicerContext") -> str:
  80. return self._handler_call_details.method