_log_batcher.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. import os
  2. import random
  3. import threading
  4. from datetime import datetime, timezone
  5. from typing import Optional, List, Callable, TYPE_CHECKING, Any
  6. from sentry_sdk.utils import format_timestamp, safe_repr
  7. from sentry_sdk.envelope import Envelope, Item, PayloadRef
  8. if TYPE_CHECKING:
  9. from sentry_sdk._types import Log
  # Buffers log records and hands them to a capture function in batches.
class LogBatcher:
    """Collect ``Log`` records and pass them to ``capture_func`` in batches.

    Records given to :meth:`add` are kept in an in-memory buffer. A daemon
    flusher thread drains the buffer every ``FLUSH_WAIT_TIME`` seconds (plus
    up to one second of random jitter), or sooner once the buffer holds
    ``MAX_LOGS_BEFORE_FLUSH`` records. Each drain packs all buffered records
    into a single ``log`` envelope item and passes the envelope to
    ``capture_func``.
    """

    # Buffer size at which add() wakes the flusher early instead of waiting
    # for the next periodic flush.
    MAX_LOGS_BEFORE_FLUSH = 100
    # Base interval, in seconds, between periodic flushes.
    FLUSH_WAIT_TIME = 5.0

    def __init__(
        self,
        capture_func,  # type: Callable[[Envelope], None]
    ):
        # type: (...) -> None
        """
        :param capture_func: Called with each assembled envelope, from the
            flusher thread or from whichever thread calls :meth:`flush`.
            It is always invoked outside the internal lock.
        """
        self._log_buffer = []  # type: List[Log]
        self._capture_func = capture_func
        self._running = True
        # Guards _log_buffer and flusher start-up in _ensure_thread().
        self._lock = threading.Lock()

        # Set to wake the flusher early (buffer full, or shutting down).
        self._flush_event = threading.Event()  # type: threading.Event

        self._flusher = None  # type: Optional[threading.Thread]
        # PID of the process that started _flusher; a mismatch means we are
        # in a forked child that has no flusher of its own yet.
        self._flusher_pid = None  # type: Optional[int]

    def _ensure_thread(self):
        # type: (...) -> bool
        """Make sure a flusher thread has been started in this process.

        For forking processes we might need to restart this thread, so a new
        flusher is started whenever the current PID differs from the PID that
        started the previous one.

        :returns: ``True`` once a flusher thread has been started for the
            current process; ``False`` if the batcher has been stopped or a
            thread could not be spawned.
        """
        if not self._running:
            return False

        pid = os.getpid()
        if self._flusher_pid == pid:
            return True

        with self._lock:
            # Recheck to make sure another thread didn't get here and start
            # the flusher in the meantime.
            if self._flusher_pid == pid:
                return True

            self._flusher_pid = pid
            self._flusher = threading.Thread(target=self._flush_loop)
            self._flusher.daemon = True

            try:
                self._flusher.start()
            except RuntimeError:
                # Unfortunately at this point the interpreter is in a state
                # that no longer allows us to spawn a thread and we have to
                # bail.
                self._running = False
                return False

        return True

    def _flush_loop(self):
        # type: (...) -> None
        """Body of the flusher thread: flush periodically until killed.

        Each iteration waits up to ``FLUSH_WAIT_TIME`` seconds plus up to one
        second of random jitter, returning early if ``_flush_event`` is set.
        A :meth:`kill` that arrives while the thread is waiting wakes it for
        one last flush before the loop exits.
        """
        # Imported here because only the flusher thread needs it.
        from sentry_sdk.utils import capture_internal_exceptions

        while self._running:
            self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random())
            self._flush_event.clear()
            # If an exception escaped here, it would end this thread while
            # _ensure_thread() kept reporting it as started. add() would
            # then buffer logs forever without sending them. Contain the
            # error and keep looping.
            with capture_internal_exceptions():
                self._flush()

    def add(
        self,
        log,  # type: Log
    ):
        # type: (...) -> None
        """Buffer ``log`` for the next flush.

        Ignored if the batcher has been killed or no flusher thread could be
        started. Wakes the flusher early once the buffer reaches
        ``MAX_LOGS_BEFORE_FLUSH`` records.
        """
        if not self._ensure_thread() or self._flusher is None:
            return None

        with self._lock:
            self._log_buffer.append(log)
            if len(self._log_buffer) >= self.MAX_LOGS_BEFORE_FLUSH:
                self._flush_event.set()

    def kill(self):
        # type: (...) -> None
        """Stop the batcher; subsequent :meth:`add` calls are ignored.

        If a flusher thread is running it is woken so it can do a final
        flush and exit. Nothing is flushed synchronously here; call
        :meth:`flush` first if buffered logs must be sent right away.
        """
        # Mark the batcher stopped even when no flusher was ever started.
        # Otherwise a later add() would start a fresh thread on a batcher
        # that has already been shut down.
        self._running = False
        if self._flusher is None:
            return
        self._flush_event.set()
        self._flusher = None

    def flush(self):
        # type: (...) -> None
        """Send all currently buffered logs from the calling thread."""
        self._flush()

    @staticmethod
    def _log_to_transport_format(log):
        # type: (Log) -> Any
        """Convert a ``Log`` record into the dict sent in the envelope payload.

        ``sentry.severity_number`` and ``sentry.severity_text`` attributes are
        filled in from the record's severity fields unless already present.
        The caller's ``log["attributes"]`` dict is not modified.
        """

        def format_attribute(val):
            # type: (int | float | str | bool) -> Any
            # bool must be tested before int: bool is a subclass of int.
            if isinstance(val, bool):
                return {"value": val, "type": "boolean"}
            if isinstance(val, int):
                return {"value": val, "type": "integer"}
            if isinstance(val, float):
                return {"value": val, "type": "double"}
            if isinstance(val, str):
                return {"value": val, "type": "string"}
            # Any other type is sent as its safe_repr() string.
            return {"value": safe_repr(val), "type": "string"}

        # Work on a copy so serializing does not mutate the caller's record.
        attributes = dict(log["attributes"])
        attributes.setdefault("sentry.severity_number", log["severity_number"])
        attributes.setdefault("sentry.severity_text", log["severity_text"])

        return {
            "timestamp": int(log["time_unix_nano"]) / 1.0e9,
            "trace_id": log.get(
                "trace_id", "00000000-0000-0000-0000-000000000000"
            ),
            "level": str(log["severity_text"]),
            "body": str(log["body"]),
            "attributes": {
                k: format_attribute(v) for (k, v) in attributes.items()
            },
        }

    def _flush(self):
        # type: (...) -> Optional[Envelope]
        """Send every buffered log in a single envelope via ``capture_func``.

        :returns: The envelope handed to ``capture_func``, or ``None`` if the
            buffer was empty.
        """
        # Take ownership of the current batch and release the lock right
        # away. add() is then not blocked while the batch is serialized, and
        # a record that fails to serialize cannot leave the buffer stuck for
        # every later flush.
        with self._lock:
            if not self._log_buffer:
                return None
            logs, self._log_buffer = self._log_buffer, []

        envelope = Envelope(
            headers={"sent_at": format_timestamp(datetime.now(timezone.utc))}
        )
        envelope.add_item(
            Item(
                type="log",
                content_type="application/vnd.sentry.items.log+json",
                headers={
                    "item_count": len(logs),
                },
                payload=PayloadRef(
                    json={
                        "items": [
                            self._log_to_transport_format(log) for log in logs
                        ]
                    }
                ),
            )
        )

        self._capture_func(envelope)
        return envelope