_metrics_batcher.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. import os
  2. import random
  3. import threading
  4. from datetime import datetime, timezone
  5. from typing import Optional, List, Callable, TYPE_CHECKING, Any, Union
  6. from sentry_sdk.utils import format_timestamp, safe_repr
  7. from sentry_sdk.envelope import Envelope, Item, PayloadRef
  8. if TYPE_CHECKING:
  9. from sentry_sdk._types import Metric
  10. class MetricsBatcher:
  11. MAX_METRICS_BEFORE_FLUSH = 100
  12. FLUSH_WAIT_TIME = 5.0
  13. def __init__(
  14. self,
  15. capture_func, # type: Callable[[Envelope], None]
  16. ):
  17. # type: (...) -> None
  18. self._metric_buffer = [] # type: List[Metric]
  19. self._capture_func = capture_func
  20. self._running = True
  21. self._lock = threading.Lock()
  22. self._flush_event = threading.Event() # type: threading.Event
  23. self._flusher = None # type: Optional[threading.Thread]
  24. self._flusher_pid = None # type: Optional[int]
  25. def _ensure_thread(self):
  26. # type: (...) -> bool
  27. if not self._running:
  28. return False
  29. pid = os.getpid()
  30. if self._flusher_pid == pid:
  31. return True
  32. with self._lock:
  33. if self._flusher_pid == pid:
  34. return True
  35. self._flusher_pid = pid
  36. self._flusher = threading.Thread(target=self._flush_loop)
  37. self._flusher.daemon = True
  38. try:
  39. self._flusher.start()
  40. except RuntimeError:
  41. self._running = False
  42. return False
  43. return True
  44. def _flush_loop(self):
  45. # type: (...) -> None
  46. while self._running:
  47. self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random())
  48. self._flush_event.clear()
  49. self._flush()
  50. def add(
  51. self,
  52. metric, # type: Metric
  53. ):
  54. # type: (...) -> None
  55. if not self._ensure_thread() or self._flusher is None:
  56. return None
  57. with self._lock:
  58. self._metric_buffer.append(metric)
  59. if len(self._metric_buffer) >= self.MAX_METRICS_BEFORE_FLUSH:
  60. self._flush_event.set()
  61. def kill(self):
  62. # type: (...) -> None
  63. if self._flusher is None:
  64. return
  65. self._running = False
  66. self._flush_event.set()
  67. self._flusher = None
  68. def flush(self):
  69. # type: (...) -> None
  70. self._flush()
  71. @staticmethod
  72. def _metric_to_transport_format(metric):
  73. # type: (Metric) -> Any
  74. def format_attribute(val):
  75. # type: (Union[int, float, str, bool]) -> Any
  76. if isinstance(val, bool):
  77. return {"value": val, "type": "boolean"}
  78. if isinstance(val, int):
  79. return {"value": val, "type": "integer"}
  80. if isinstance(val, float):
  81. return {"value": val, "type": "double"}
  82. if isinstance(val, str):
  83. return {"value": val, "type": "string"}
  84. return {"value": safe_repr(val), "type": "string"}
  85. res = {
  86. "timestamp": metric["timestamp"],
  87. "trace_id": metric["trace_id"],
  88. "name": metric["name"],
  89. "type": metric["type"],
  90. "value": metric["value"],
  91. "attributes": {
  92. k: format_attribute(v) for (k, v) in metric["attributes"].items()
  93. },
  94. }
  95. if metric.get("span_id") is not None:
  96. res["span_id"] = metric["span_id"]
  97. if metric.get("unit") is not None:
  98. res["unit"] = metric["unit"]
  99. return res
  100. def _flush(self):
  101. # type: (...) -> Optional[Envelope]
  102. envelope = Envelope(
  103. headers={"sent_at": format_timestamp(datetime.now(timezone.utc))}
  104. )
  105. with self._lock:
  106. if len(self._metric_buffer) == 0:
  107. return None
  108. envelope.add_item(
  109. Item(
  110. type="trace_metric",
  111. content_type="application/vnd.sentry.items.trace-metric+json",
  112. headers={
  113. "item_count": len(self._metric_buffer),
  114. },
  115. payload=PayloadRef(
  116. json={
  117. "items": [
  118. self._metric_to_transport_format(metric)
  119. for metric in self._metric_buffer
  120. ]
  121. }
  122. ),
  123. )
  124. )
  125. self._metric_buffer.clear()
  126. self._capture_func(envelope)
  127. return envelope