client.py 3.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. import sentry_sdk
  2. from sentry_sdk.consts import OP
  3. from sentry_sdk.integrations import DidNotEnable
  4. from sentry_sdk.integrations.grpc.consts import SPAN_ORIGIN
  5. from typing import TYPE_CHECKING
  6. if TYPE_CHECKING:
  7. from typing import Any, Callable, Iterator, Iterable, Union
  8. try:
  9. import grpc
  10. from grpc import ClientCallDetails, Call
  11. from grpc._interceptor import _UnaryOutcome
  12. from grpc.aio._interceptor import UnaryStreamCall
  13. from google.protobuf.message import Message
  14. except ImportError:
  15. raise DidNotEnable("grpcio is not installed")
  16. class ClientInterceptor(
  17. grpc.UnaryUnaryClientInterceptor, # type: ignore
  18. grpc.UnaryStreamClientInterceptor, # type: ignore
  19. ):
  20. _is_intercepted = False
  21. def intercept_unary_unary(self, continuation, client_call_details, request):
  22. # type: (ClientInterceptor, Callable[[ClientCallDetails, Message], _UnaryOutcome], ClientCallDetails, Message) -> _UnaryOutcome
  23. method = client_call_details.method
  24. with sentry_sdk.start_span(
  25. op=OP.GRPC_CLIENT,
  26. name="unary unary call to %s" % method,
  27. origin=SPAN_ORIGIN,
  28. ) as span:
  29. span.set_data("type", "unary unary")
  30. span.set_data("method", method)
  31. client_call_details = self._update_client_call_details_metadata_from_scope(
  32. client_call_details
  33. )
  34. response = continuation(client_call_details, request)
  35. span.set_data("code", response.code().name)
  36. return response
  37. def intercept_unary_stream(self, continuation, client_call_details, request):
  38. # type: (ClientInterceptor, Callable[[ClientCallDetails, Message], Union[Iterable[Any], UnaryStreamCall]], ClientCallDetails, Message) -> Union[Iterator[Message], Call]
  39. method = client_call_details.method
  40. with sentry_sdk.start_span(
  41. op=OP.GRPC_CLIENT,
  42. name="unary stream call to %s" % method,
  43. origin=SPAN_ORIGIN,
  44. ) as span:
  45. span.set_data("type", "unary stream")
  46. span.set_data("method", method)
  47. client_call_details = self._update_client_call_details_metadata_from_scope(
  48. client_call_details
  49. )
  50. response = continuation(client_call_details, request) # type: UnaryStreamCall
  51. # Setting code on unary-stream leads to execution getting stuck
  52. # span.set_data("code", response.code().name)
  53. return response
  54. @staticmethod
  55. def _update_client_call_details_metadata_from_scope(client_call_details):
  56. # type: (ClientCallDetails) -> ClientCallDetails
  57. metadata = (
  58. list(client_call_details.metadata) if client_call_details.metadata else []
  59. )
  60. for (
  61. key,
  62. value,
  63. ) in sentry_sdk.get_current_scope().iter_trace_propagation_headers():
  64. metadata.append((key, value))
  65. client_call_details = grpc._interceptor._ClientCallDetails(
  66. method=client_call_details.method,
  67. timeout=client_call_details.timeout,
  68. metadata=metadata,
  69. credentials=client_call_details.credentials,
  70. wait_for_ready=client_call_details.wait_for_ready,
  71. compression=client_call_details.compression,
  72. )
  73. return client_call_details