clickhouse_driver.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. import sentry_sdk
  2. from sentry_sdk.consts import OP, SPANDATA
  3. from sentry_sdk.integrations import _check_minimum_version, Integration, DidNotEnable
  4. from sentry_sdk.tracing import Span
  5. from sentry_sdk.scope import should_send_default_pii
  6. from sentry_sdk.utils import capture_internal_exceptions, ensure_integration_enabled
  7. from typing import TYPE_CHECKING, TypeVar
  8. # Hack to get new Python features working in older versions
  9. # without introducing a hard dependency on `typing_extensions`
  10. # from: https://stackoverflow.com/a/71944042/300572
  11. if TYPE_CHECKING:
  12. from collections.abc import Iterator
  13. from typing import Any, ParamSpec, Callable
  14. else:
  15. # Fake ParamSpec
  16. class ParamSpec:
  17. def __init__(self, _):
  18. self.args = None
  19. self.kwargs = None
  20. # Callable[anything] will return None
  21. class _Callable:
  22. def __getitem__(self, _):
  23. return None
  24. # Make instances
  25. Callable = _Callable()
# clickhouse-driver may not be installed; in that case the ImportError is
# replaced by DidNotEnable, raised at import time of this module.
try:
    import clickhouse_driver  # type: ignore[import-not-found]

except ImportError:
    raise DidNotEnable("clickhouse-driver not installed.")
  30. class ClickhouseDriverIntegration(Integration):
  31. identifier = "clickhouse_driver"
  32. origin = f"auto.db.{identifier}"
  33. @staticmethod
  34. def setup_once() -> None:
  35. _check_minimum_version(ClickhouseDriverIntegration, clickhouse_driver.VERSION)
  36. # Every query is done using the Connection's `send_query` function
  37. clickhouse_driver.connection.Connection.send_query = _wrap_start(
  38. clickhouse_driver.connection.Connection.send_query
  39. )
  40. # If the query contains parameters then the send_data function is used to send those parameters to clickhouse
  41. _wrap_send_data()
  42. # Every query ends either with the Client's `receive_end_of_query` (no result expected)
  43. # or its `receive_result` (result expected)
  44. clickhouse_driver.client.Client.receive_end_of_query = _wrap_end(
  45. clickhouse_driver.client.Client.receive_end_of_query
  46. )
  47. if hasattr(clickhouse_driver.client.Client, "receive_end_of_insert_query"):
  48. # In 0.2.7, insert queries are handled separately via `receive_end_of_insert_query`
  49. clickhouse_driver.client.Client.receive_end_of_insert_query = _wrap_end(
  50. clickhouse_driver.client.Client.receive_end_of_insert_query
  51. )
  52. clickhouse_driver.client.Client.receive_result = _wrap_end(
  53. clickhouse_driver.client.Client.receive_result
  54. )
# Typing variables used in the `Callable[P, T]` annotations of the wrapper
# factories below: P stands for the wrapped callable's parameters, T for its
# return type.
P = ParamSpec("P")
T = TypeVar("T")
  57. def _wrap_start(f: Callable[P, T]) -> Callable[P, T]:
  58. @ensure_integration_enabled(ClickhouseDriverIntegration, f)
  59. def _inner(*args: P.args, **kwargs: P.kwargs) -> T:
  60. connection = args[0]
  61. query = args[1]
  62. query_id = args[2] if len(args) > 2 else kwargs.get("query_id")
  63. params = args[3] if len(args) > 3 else kwargs.get("params")
  64. span = sentry_sdk.start_span(
  65. op=OP.DB,
  66. name=query,
  67. origin=ClickhouseDriverIntegration.origin,
  68. )
  69. connection._sentry_span = span # type: ignore[attr-defined]
  70. _set_db_data(span, connection)
  71. span.set_data("query", query)
  72. if query_id:
  73. span.set_data("db.query_id", query_id)
  74. if params and should_send_default_pii():
  75. span.set_data("db.params", params)
  76. # run the original code
  77. ret = f(*args, **kwargs)
  78. return ret
  79. return _inner
  80. def _wrap_end(f: Callable[P, T]) -> Callable[P, T]:
  81. def _inner_end(*args: P.args, **kwargs: P.kwargs) -> T:
  82. res = f(*args, **kwargs)
  83. instance = args[0]
  84. span = getattr(instance.connection, "_sentry_span", None) # type: ignore[attr-defined]
  85. if span is not None:
  86. if res is not None and should_send_default_pii():
  87. span.set_data("db.result", res)
  88. with capture_internal_exceptions():
  89. span.scope.add_breadcrumb(
  90. message=span._data.pop("query"), category="query", data=span._data
  91. )
  92. span.finish()
  93. return res
  94. return _inner_end
  95. def _wrap_send_data() -> None:
  96. original_send_data = clickhouse_driver.client.Client.send_data
  97. def _inner_send_data( # type: ignore[no-untyped-def] # clickhouse-driver does not type send_data
  98. self, sample_block, data, types_check=False, columnar=False, *args, **kwargs
  99. ):
  100. span = getattr(self.connection, "_sentry_span", None)
  101. if span is not None:
  102. _set_db_data(span, self.connection)
  103. if should_send_default_pii():
  104. db_params = span._data.get("db.params", [])
  105. if isinstance(data, (list, tuple)):
  106. db_params.extend(data)
  107. else: # data is a generic iterator
  108. orig_data = data
  109. # Wrap the generator to add items to db.params as they are yielded.
  110. # This allows us to send the params to Sentry without needing to allocate
  111. # memory for the entire generator at once.
  112. def wrapped_generator() -> "Iterator[Any]":
  113. for item in orig_data:
  114. db_params.append(item)
  115. yield item
  116. # Replace the original iterator with the wrapped one.
  117. data = wrapped_generator()
  118. span.set_data("db.params", db_params)
  119. return original_send_data(
  120. self, sample_block, data, types_check, columnar, *args, **kwargs
  121. )
  122. clickhouse_driver.client.Client.send_data = _inner_send_data
  123. def _set_db_data(
  124. span: Span, connection: clickhouse_driver.connection.Connection
  125. ) -> None:
  126. span.set_data(SPANDATA.DB_SYSTEM, "clickhouse")
  127. span.set_data(SPANDATA.SERVER_ADDRESS, connection.host)
  128. span.set_data(SPANDATA.SERVER_PORT, connection.port)
  129. span.set_data(SPANDATA.DB_NAME, connection.database)
  130. span.set_data(SPANDATA.DB_USER, connection.user)