worker.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
import os
import threading
from time import monotonic, sleep, time
from typing import TYPE_CHECKING

from sentry_sdk._queue import Queue, FullError
from sentry_sdk.utils import logger
from sentry_sdk.consts import DEFAULT_QUEUE_SIZE

if TYPE_CHECKING:
    from typing import Any
    from typing import Optional
    from typing import Callable
  12. _TERMINATOR = object()
  13. class BackgroundWorker:
  14. def __init__(self, queue_size=DEFAULT_QUEUE_SIZE):
  15. # type: (int) -> None
  16. self._queue = Queue(queue_size) # type: Queue
  17. self._lock = threading.Lock()
  18. self._thread = None # type: Optional[threading.Thread]
  19. self._thread_for_pid = None # type: Optional[int]
  20. @property
  21. def is_alive(self):
  22. # type: () -> bool
  23. if self._thread_for_pid != os.getpid():
  24. return False
  25. if not self._thread:
  26. return False
  27. return self._thread.is_alive()
  28. def _ensure_thread(self):
  29. # type: () -> None
  30. if not self.is_alive:
  31. self.start()
  32. def _timed_queue_join(self, timeout):
  33. # type: (float) -> bool
  34. deadline = time() + timeout
  35. queue = self._queue
  36. queue.all_tasks_done.acquire()
  37. try:
  38. while queue.unfinished_tasks:
  39. delay = deadline - time()
  40. if delay <= 0:
  41. return False
  42. queue.all_tasks_done.wait(timeout=delay)
  43. return True
  44. finally:
  45. queue.all_tasks_done.release()
  46. def start(self):
  47. # type: () -> None
  48. with self._lock:
  49. if not self.is_alive:
  50. self._thread = threading.Thread(
  51. target=self._target, name="sentry-sdk.BackgroundWorker"
  52. )
  53. self._thread.daemon = True
  54. try:
  55. self._thread.start()
  56. self._thread_for_pid = os.getpid()
  57. except RuntimeError:
  58. # At this point we can no longer start because the interpreter
  59. # is already shutting down. Sadly at this point we can no longer
  60. # send out events.
  61. self._thread = None
  62. def kill(self):
  63. # type: () -> None
  64. """
  65. Kill worker thread. Returns immediately. Not useful for
  66. waiting on shutdown for events, use `flush` for that.
  67. """
  68. logger.debug("background worker got kill request")
  69. with self._lock:
  70. if self._thread:
  71. try:
  72. self._queue.put_nowait(_TERMINATOR)
  73. except FullError:
  74. logger.debug("background worker queue full, kill failed")
  75. self._thread = None
  76. self._thread_for_pid = None
  77. def flush(self, timeout, callback=None):
  78. # type: (float, Optional[Any]) -> None
  79. logger.debug("background worker got flush request")
  80. with self._lock:
  81. if self.is_alive and timeout > 0.0:
  82. self._wait_flush(timeout, callback)
  83. logger.debug("background worker flushed")
  84. def full(self):
  85. # type: () -> bool
  86. return self._queue.full()
  87. def _wait_flush(self, timeout, callback):
  88. # type: (float, Optional[Any]) -> None
  89. initial_timeout = min(0.1, timeout)
  90. if not self._timed_queue_join(initial_timeout):
  91. pending = self._queue.qsize() + 1
  92. logger.debug("%d event(s) pending on flush", pending)
  93. if callback is not None:
  94. callback(pending, timeout)
  95. if not self._timed_queue_join(timeout - initial_timeout):
  96. pending = self._queue.qsize() + 1
  97. logger.error("flush timed out, dropped %s events", pending)
  98. def submit(self, callback):
  99. # type: (Callable[[], None]) -> bool
  100. self._ensure_thread()
  101. try:
  102. self._queue.put_nowait(callback)
  103. return True
  104. except FullError:
  105. return False
  106. def _target(self):
  107. # type: () -> None
  108. while True:
  109. callback = self._queue.get()
  110. try:
  111. if callback is _TERMINATOR:
  112. break
  113. try:
  114. callback()
  115. except Exception:
  116. logger.error("Failed processing job", exc_info=True)
  117. finally:
  118. self._queue.task_done()
  119. sleep(0)