search.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. from __future__ import annotations
  2. import logging
  3. import shutil
  4. import sys
  5. import textwrap
  6. import xmlrpc.client
  7. from collections import OrderedDict
  8. from optparse import Values
  9. from typing import TypedDict
  10. from pip._vendor.packaging.version import parse as parse_version
  11. from pip._internal.cli.base_command import Command
  12. from pip._internal.cli.req_command import SessionCommandMixin
  13. from pip._internal.cli.status_codes import NO_MATCHES_FOUND, SUCCESS
  14. from pip._internal.exceptions import CommandError
  15. from pip._internal.metadata import get_default_environment
  16. from pip._internal.metadata.base import BaseDistribution
  17. from pip._internal.models.index import PyPI
  18. from pip._internal.network.xmlrpc import PipXmlrpcTransport
  19. from pip._internal.utils.logging import indent_log
  20. from pip._internal.utils.misc import write_output
  21. class TransformedHit(TypedDict):
  22. name: str
  23. summary: str
  24. versions: list[str]
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
  26. class SearchCommand(Command, SessionCommandMixin):
  27. """Search for PyPI packages whose name or summary contains <query>."""
  28. usage = """
  29. %prog [options] <query>"""
  30. ignore_require_venv = True
  31. def add_options(self) -> None:
  32. self.cmd_opts.add_option(
  33. "-i",
  34. "--index",
  35. dest="index",
  36. metavar="URL",
  37. default=PyPI.pypi_url,
  38. help="Base URL of Python Package Index (default %default)",
  39. )
  40. self.parser.insert_option_group(0, self.cmd_opts)
  41. def run(self, options: Values, args: list[str]) -> int:
  42. if not args:
  43. raise CommandError("Missing required argument (search query).")
  44. query = args
  45. pypi_hits = self.search(query, options)
  46. hits = transform_hits(pypi_hits)
  47. terminal_width = None
  48. if sys.stdout.isatty():
  49. terminal_width = shutil.get_terminal_size()[0]
  50. print_results(hits, terminal_width=terminal_width)
  51. if pypi_hits:
  52. return SUCCESS
  53. return NO_MATCHES_FOUND
  54. def search(self, query: list[str], options: Values) -> list[dict[str, str]]:
  55. index_url = options.index
  56. session = self.get_default_session(options)
  57. transport = PipXmlrpcTransport(index_url, session)
  58. pypi = xmlrpc.client.ServerProxy(index_url, transport)
  59. try:
  60. hits = pypi.search({"name": query, "summary": query}, "or")
  61. except xmlrpc.client.Fault as fault:
  62. message = (
  63. f"XMLRPC request failed [code: {fault.faultCode}]\n{fault.faultString}"
  64. )
  65. raise CommandError(message)
  66. assert isinstance(hits, list)
  67. return hits
  68. def transform_hits(hits: list[dict[str, str]]) -> list[TransformedHit]:
  69. """
  70. The list from pypi is really a list of versions. We want a list of
  71. packages with the list of versions stored inline. This converts the
  72. list from pypi into one we can use.
  73. """
  74. packages: dict[str, TransformedHit] = OrderedDict()
  75. for hit in hits:
  76. name = hit["name"]
  77. summary = hit["summary"]
  78. version = hit["version"]
  79. if name not in packages.keys():
  80. packages[name] = {
  81. "name": name,
  82. "summary": summary,
  83. "versions": [version],
  84. }
  85. else:
  86. packages[name]["versions"].append(version)
  87. # if this is the highest version, replace summary and score
  88. if version == highest_version(packages[name]["versions"]):
  89. packages[name]["summary"] = summary
  90. return list(packages.values())
  91. def print_dist_installation_info(latest: str, dist: BaseDistribution | None) -> None:
  92. if dist is not None:
  93. with indent_log():
  94. if dist.version == latest:
  95. write_output("INSTALLED: %s (latest)", dist.version)
  96. else:
  97. write_output("INSTALLED: %s", dist.version)
  98. if parse_version(latest).pre:
  99. write_output(
  100. "LATEST: %s (pre-release; install"
  101. " with `pip install --pre`)",
  102. latest,
  103. )
  104. else:
  105. write_output("LATEST: %s", latest)
  106. def get_installed_distribution(name: str) -> BaseDistribution | None:
  107. env = get_default_environment()
  108. return env.get_distribution(name)
  109. def print_results(
  110. hits: list[TransformedHit],
  111. name_column_width: int | None = None,
  112. terminal_width: int | None = None,
  113. ) -> None:
  114. if not hits:
  115. return
  116. if name_column_width is None:
  117. name_column_width = (
  118. max(
  119. [
  120. len(hit["name"]) + len(highest_version(hit.get("versions", ["-"])))
  121. for hit in hits
  122. ]
  123. )
  124. + 4
  125. )
  126. for hit in hits:
  127. name = hit["name"]
  128. summary = hit["summary"] or ""
  129. latest = highest_version(hit.get("versions", ["-"]))
  130. if terminal_width is not None:
  131. target_width = terminal_width - name_column_width - 5
  132. if target_width > 10:
  133. # wrap and indent summary to fit terminal
  134. summary_lines = textwrap.wrap(summary, target_width)
  135. summary = ("\n" + " " * (name_column_width + 3)).join(summary_lines)
  136. name_latest = f"{name} ({latest})"
  137. line = f"{name_latest:{name_column_width}} - {summary}"
  138. try:
  139. write_output(line)
  140. dist = get_installed_distribution(name)
  141. print_dist_installation_info(latest, dist)
  142. except UnicodeEncodeError:
  143. pass
  144. def highest_version(versions: list[str]) -> str:
  145. return max(versions, key=parse_version)