check.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. """Validation of dependencies of packages"""
  2. from __future__ import annotations
  3. import logging
  4. from collections.abc import Generator, Iterable
  5. from contextlib import suppress
  6. from email.parser import Parser
  7. from functools import reduce
  8. from typing import (
  9. Callable,
  10. NamedTuple,
  11. )
  12. from pip._vendor.packaging.requirements import Requirement
  13. from pip._vendor.packaging.tags import Tag, parse_tag
  14. from pip._vendor.packaging.utils import NormalizedName, canonicalize_name
  15. from pip._vendor.packaging.version import Version
  16. from pip._internal.distributions import make_distribution_for_install_requirement
  17. from pip._internal.metadata import get_default_environment
  18. from pip._internal.metadata.base import BaseDistribution
  19. from pip._internal.req.req_install import InstallRequirement
  20. logger = logging.getLogger(__name__)
  21. class PackageDetails(NamedTuple):
  22. version: Version
  23. dependencies: list[Requirement]
  24. # Shorthands
  25. PackageSet = dict[NormalizedName, PackageDetails]
  26. Missing = tuple[NormalizedName, Requirement]
  27. Conflicting = tuple[NormalizedName, Version, Requirement]
  28. MissingDict = dict[NormalizedName, list[Missing]]
  29. ConflictingDict = dict[NormalizedName, list[Conflicting]]
  30. CheckResult = tuple[MissingDict, ConflictingDict]
  31. ConflictDetails = tuple[PackageSet, CheckResult]
  32. def create_package_set_from_installed() -> tuple[PackageSet, bool]:
  33. """Converts a list of distributions into a PackageSet."""
  34. package_set = {}
  35. problems = False
  36. env = get_default_environment()
  37. for dist in env.iter_installed_distributions(local_only=False, skip=()):
  38. name = dist.canonical_name
  39. try:
  40. dependencies = list(dist.iter_dependencies())
  41. package_set[name] = PackageDetails(dist.version, dependencies)
  42. except (OSError, ValueError) as e:
  43. # Don't crash on unreadable or broken metadata.
  44. logger.warning("Error parsing dependencies of %s: %s", name, e)
  45. problems = True
  46. return package_set, problems
  47. def check_package_set(
  48. package_set: PackageSet, should_ignore: Callable[[str], bool] | None = None
  49. ) -> CheckResult:
  50. """Check if a package set is consistent
  51. If should_ignore is passed, it should be a callable that takes a
  52. package name and returns a boolean.
  53. """
  54. missing = {}
  55. conflicting = {}
  56. for package_name, package_detail in package_set.items():
  57. # Info about dependencies of package_name
  58. missing_deps: set[Missing] = set()
  59. conflicting_deps: set[Conflicting] = set()
  60. if should_ignore and should_ignore(package_name):
  61. continue
  62. for req in package_detail.dependencies:
  63. name = canonicalize_name(req.name)
  64. # Check if it's missing
  65. if name not in package_set:
  66. missed = True
  67. if req.marker is not None:
  68. missed = req.marker.evaluate({"extra": ""})
  69. if missed:
  70. missing_deps.add((name, req))
  71. continue
  72. # Check if there's a conflict
  73. version = package_set[name].version
  74. if not req.specifier.contains(version, prereleases=True):
  75. conflicting_deps.add((name, version, req))
  76. if missing_deps:
  77. missing[package_name] = sorted(missing_deps, key=str)
  78. if conflicting_deps:
  79. conflicting[package_name] = sorted(conflicting_deps, key=str)
  80. return missing, conflicting
  81. def check_install_conflicts(to_install: list[InstallRequirement]) -> ConflictDetails:
  82. """For checking if the dependency graph would be consistent after \
  83. installing given requirements
  84. """
  85. # Start from the current state
  86. package_set, _ = create_package_set_from_installed()
  87. # Install packages
  88. would_be_installed = _simulate_installation_of(to_install, package_set)
  89. # Only warn about directly-dependent packages; create a whitelist of them
  90. whitelist = _create_whitelist(would_be_installed, package_set)
  91. return (
  92. package_set,
  93. check_package_set(
  94. package_set, should_ignore=lambda name: name not in whitelist
  95. ),
  96. )
  97. def check_unsupported(
  98. packages: Iterable[BaseDistribution],
  99. supported_tags: Iterable[Tag],
  100. ) -> Generator[BaseDistribution, None, None]:
  101. for p in packages:
  102. with suppress(FileNotFoundError):
  103. wheel_file = p.read_text("WHEEL")
  104. wheel_tags: frozenset[Tag] = reduce(
  105. frozenset.union,
  106. map(parse_tag, Parser().parsestr(wheel_file).get_all("Tag", [])),
  107. frozenset(),
  108. )
  109. if wheel_tags.isdisjoint(supported_tags):
  110. yield p
  111. def _simulate_installation_of(
  112. to_install: list[InstallRequirement], package_set: PackageSet
  113. ) -> set[NormalizedName]:
  114. """Computes the version of packages after installing to_install."""
  115. # Keep track of packages that were installed
  116. installed = set()
  117. # Modify it as installing requirement_set would (assuming no errors)
  118. for inst_req in to_install:
  119. abstract_dist = make_distribution_for_install_requirement(inst_req)
  120. dist = abstract_dist.get_metadata_distribution()
  121. name = dist.canonical_name
  122. package_set[name] = PackageDetails(dist.version, list(dist.iter_dependencies()))
  123. installed.add(name)
  124. return installed
  125. def _create_whitelist(
  126. would_be_installed: set[NormalizedName], package_set: PackageSet
  127. ) -> set[NormalizedName]:
  128. packages_affected = set(would_be_installed)
  129. for package_name in package_set:
  130. if package_name in packages_affected:
  131. continue
  132. for req in package_set[package_name].dependencies:
  133. if canonicalize_name(req.name) in packages_affected:
  134. packages_affected.add(package_name)
  135. break
  136. return packages_affected