spinners.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. from __future__ import annotations
  2. import contextlib
  3. import itertools
  4. import logging
  5. import sys
  6. import time
  7. from collections.abc import Generator
  8. from typing import IO, Final
  9. from pip._vendor.rich.console import (
  10. Console,
  11. ConsoleOptions,
  12. RenderableType,
  13. RenderResult,
  14. )
  15. from pip._vendor.rich.live import Live
  16. from pip._vendor.rich.measure import Measurement
  17. from pip._vendor.rich.text import Text
  18. from pip._internal.utils.compat import WINDOWS
  19. from pip._internal.utils.logging import get_console, get_indentation
  20. logger = logging.getLogger(__name__)
  21. SPINNER_CHARS: Final = r"-\|/"
  22. SPINS_PER_SECOND: Final = 8
  23. class SpinnerInterface:
  24. def spin(self) -> None:
  25. raise NotImplementedError()
  26. def finish(self, final_status: str) -> None:
  27. raise NotImplementedError()
  28. class InteractiveSpinner(SpinnerInterface):
  29. def __init__(
  30. self,
  31. message: str,
  32. file: IO[str] | None = None,
  33. spin_chars: str = SPINNER_CHARS,
  34. # Empirically, 8 updates/second looks nice
  35. min_update_interval_seconds: float = 1 / SPINS_PER_SECOND,
  36. ):
  37. self._message = message
  38. if file is None:
  39. file = sys.stdout
  40. self._file = file
  41. self._rate_limiter = RateLimiter(min_update_interval_seconds)
  42. self._finished = False
  43. self._spin_cycle = itertools.cycle(spin_chars)
  44. self._file.write(" " * get_indentation() + self._message + " ... ")
  45. self._width = 0
  46. def _write(self, status: str) -> None:
  47. assert not self._finished
  48. # Erase what we wrote before by backspacing to the beginning, writing
  49. # spaces to overwrite the old text, and then backspacing again
  50. backup = "\b" * self._width
  51. self._file.write(backup + " " * self._width + backup)
  52. # Now we have a blank slate to add our status
  53. self._file.write(status)
  54. self._width = len(status)
  55. self._file.flush()
  56. self._rate_limiter.reset()
  57. def spin(self) -> None:
  58. if self._finished:
  59. return
  60. if not self._rate_limiter.ready():
  61. return
  62. self._write(next(self._spin_cycle))
  63. def finish(self, final_status: str) -> None:
  64. if self._finished:
  65. return
  66. self._write(final_status)
  67. self._file.write("\n")
  68. self._file.flush()
  69. self._finished = True
  70. # Used for dumb terminals, non-interactive installs (no tty), etc.
  71. # We still print updates occasionally (once every 60 seconds by default) to
  72. # act as a keep-alive for systems like Travis-CI that take lack-of-output as
  73. # an indication that a task has frozen.
  74. class NonInteractiveSpinner(SpinnerInterface):
  75. def __init__(self, message: str, min_update_interval_seconds: float = 60.0) -> None:
  76. self._message = message
  77. self._finished = False
  78. self._rate_limiter = RateLimiter(min_update_interval_seconds)
  79. self._update("started")
  80. def _update(self, status: str) -> None:
  81. assert not self._finished
  82. self._rate_limiter.reset()
  83. logger.info("%s: %s", self._message, status)
  84. def spin(self) -> None:
  85. if self._finished:
  86. return
  87. if not self._rate_limiter.ready():
  88. return
  89. self._update("still running...")
  90. def finish(self, final_status: str) -> None:
  91. if self._finished:
  92. return
  93. self._update(f"finished with status '{final_status}'")
  94. self._finished = True
  95. class RateLimiter:
  96. def __init__(self, min_update_interval_seconds: float) -> None:
  97. self._min_update_interval_seconds = min_update_interval_seconds
  98. self._last_update: float = 0
  99. def ready(self) -> bool:
  100. now = time.time()
  101. delta = now - self._last_update
  102. return delta >= self._min_update_interval_seconds
  103. def reset(self) -> None:
  104. self._last_update = time.time()
  105. @contextlib.contextmanager
  106. def open_spinner(message: str) -> Generator[SpinnerInterface, None, None]:
  107. # Interactive spinner goes directly to sys.stdout rather than being routed
  108. # through the logging system, but it acts like it has level INFO,
  109. # i.e. it's only displayed if we're at level INFO or better.
  110. # Non-interactive spinner goes through the logging system, so it is always
  111. # in sync with logging configuration.
  112. if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO:
  113. spinner: SpinnerInterface = InteractiveSpinner(message)
  114. else:
  115. spinner = NonInteractiveSpinner(message)
  116. try:
  117. with hidden_cursor(sys.stdout):
  118. yield spinner
  119. except KeyboardInterrupt:
  120. spinner.finish("canceled")
  121. raise
  122. except Exception:
  123. spinner.finish("error")
  124. raise
  125. else:
  126. spinner.finish("done")
  127. class _PipRichSpinner:
  128. """
  129. Custom rich spinner that matches the style of the legacy spinners.
  130. (*) Updates will be handled in a background thread by a rich live panel
  131. which will call render() automatically at the appropriate time.
  132. """
  133. def __init__(self, label: str) -> None:
  134. self.label = label
  135. self._spin_cycle = itertools.cycle(SPINNER_CHARS)
  136. self._spinner_text = ""
  137. self._finished = False
  138. self._indent = get_indentation() * " "
  139. def __rich_console__(
  140. self, console: Console, options: ConsoleOptions
  141. ) -> RenderResult:
  142. yield self.render()
  143. def __rich_measure__(
  144. self, console: Console, options: ConsoleOptions
  145. ) -> Measurement:
  146. text = self.render()
  147. return Measurement.get(console, options, text)
  148. def render(self) -> RenderableType:
  149. if not self._finished:
  150. self._spinner_text = next(self._spin_cycle)
  151. return Text.assemble(self._indent, self.label, " ... ", self._spinner_text)
  152. def finish(self, status: str) -> None:
  153. """Stop spinning and set a final status message."""
  154. self._spinner_text = status
  155. self._finished = True
  156. @contextlib.contextmanager
  157. def open_rich_spinner(label: str, console: Console | None = None) -> Generator[None]:
  158. if not logger.isEnabledFor(logging.INFO):
  159. # Don't show spinner if --quiet is given.
  160. yield
  161. return
  162. console = console or get_console()
  163. spinner = _PipRichSpinner(label)
  164. with Live(spinner, refresh_per_second=SPINS_PER_SECOND, console=console):
  165. try:
  166. yield
  167. except KeyboardInterrupt:
  168. spinner.finish("canceled")
  169. raise
  170. except Exception:
  171. spinner.finish("error")
  172. raise
  173. else:
  174. spinner.finish("done")
  175. HIDE_CURSOR = "\x1b[?25l"
  176. SHOW_CURSOR = "\x1b[?25h"
  177. @contextlib.contextmanager
  178. def hidden_cursor(file: IO[str]) -> Generator[None, None, None]:
  179. # The Windows terminal does not support the hide/show cursor ANSI codes,
  180. # even via colorama. So don't even try.
  181. if WINDOWS:
  182. yield
  183. # We don't want to clutter the output with control characters if we're
  184. # writing to a file, or if the user is running with --quiet.
  185. # See https://github.com/pypa/pip/issues/3418
  186. elif not file.isatty() or logger.getEffectiveLevel() > logging.INFO:
  187. yield
  188. else:
  189. file.write(HIDE_CURSOR)
  190. try:
  191. yield
  192. finally:
  193. file.write(SHOW_CURSOR)