acquire.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. """Bootstrap."""
  2. from __future__ import annotations
  3. import logging
  4. import sys
  5. from operator import eq, lt
  6. from pathlib import Path
  7. from subprocess import PIPE, CalledProcessError, Popen
  8. from typing import TYPE_CHECKING
  9. from .bundle import from_bundle
  10. from .periodic_update import add_wheel_to_update_log
  11. from .util import Version, Wheel, discover_wheels
  12. if TYPE_CHECKING:
  13. from virtualenv.app_data.base import AppData
  14. LOGGER = logging.getLogger(__name__)
  15. def get_wheel( # noqa: PLR0913
  16. distribution: str,
  17. version: str | None,
  18. for_py_version: str,
  19. search_dirs: list[Path],
  20. download: bool,
  21. app_data: AppData,
  22. do_periodic_update: bool,
  23. env: dict[str, str],
  24. ) -> Wheel | None:
  25. """Get a wheel with the given distribution-version-for_py_version trio, by using the extra search dir + download."""
  26. # not all wheels are compatible with all python versions, so we need to py version qualify it
  27. wheel = None
  28. if not download or version != Version.bundle:
  29. # 1. acquire from bundle
  30. wheel = from_bundle(distribution, version, for_py_version, search_dirs, app_data, do_periodic_update, env)
  31. if download and wheel is None and version != Version.embed:
  32. # 2. download from the internet
  33. wheel = download_wheel(
  34. distribution=distribution,
  35. version_spec=Version.as_version_spec(version),
  36. for_py_version=for_py_version,
  37. search_dirs=search_dirs,
  38. app_data=app_data,
  39. to_folder=app_data.house,
  40. env=env,
  41. )
  42. if wheel is not None and app_data.can_update:
  43. add_wheel_to_update_log(wheel, for_py_version, app_data)
  44. return wheel
  45. def download_wheel( # noqa: PLR0913
  46. distribution: str,
  47. version_spec: str | None,
  48. for_py_version: str,
  49. search_dirs: list[Path],
  50. app_data: AppData,
  51. to_folder: Path,
  52. env: dict[str, str],
  53. ) -> Wheel:
  54. to_download = f"{distribution}{version_spec or ''}"
  55. LOGGER.debug("download wheel %s %s to %s", to_download, for_py_version, to_folder)
  56. cmd = [
  57. sys.executable,
  58. "-m",
  59. "pip",
  60. "download",
  61. "--progress-bar",
  62. "off",
  63. "--disable-pip-version-check",
  64. "--only-binary=:all:",
  65. "--no-deps",
  66. "--python-version",
  67. for_py_version,
  68. "-d",
  69. str(to_folder),
  70. to_download,
  71. ]
  72. # pip has no interface in python - must be a new sub-process
  73. env = pip_wheel_env_run(search_dirs, app_data, env)
  74. process = Popen(cmd, env=env, stdout=PIPE, stderr=PIPE, universal_newlines=True, encoding="utf-8")
  75. out, err = process.communicate()
  76. if process.returncode != 0:
  77. kwargs = {"output": out, "stderr": err}
  78. raise CalledProcessError(process.returncode, cmd, **kwargs)
  79. result = _find_downloaded_wheel(distribution, version_spec, for_py_version, to_folder, out)
  80. LOGGER.debug("downloaded wheel %s", result.name) # ty: ignore[unresolved-attribute]
  81. return result # ty: ignore[invalid-return-type]
  82. def _find_downloaded_wheel(
  83. distribution: str, version_spec: str | None, for_py_version: str, to_folder: Path, out: str
  84. ) -> Wheel | None:
  85. for line in out.splitlines():
  86. stripped_line = line.lstrip()
  87. for marker in ("Saved ", "File was already downloaded "):
  88. if stripped_line.startswith(marker):
  89. return Wheel(Path(stripped_line[len(marker) :]).absolute())
  90. # if for some reason the output does not match fallback to the latest version with that spec
  91. return find_compatible_in_house(distribution, version_spec, for_py_version, to_folder)
  92. def find_compatible_in_house(
  93. distribution: str, version_spec: str | None, for_py_version: str, in_folder: Path
  94. ) -> Wheel | None:
  95. wheels = discover_wheels(in_folder, distribution, None, for_py_version)
  96. start, end = 0, len(wheels)
  97. if version_spec is not None and version_spec:
  98. if version_spec.startswith("<"):
  99. from_pos, op = 1, lt
  100. elif version_spec.startswith("=="):
  101. from_pos, op = 2, eq
  102. else:
  103. raise ValueError(version_spec)
  104. version = Wheel.as_version_tuple(version_spec[from_pos:])
  105. start = next((at for at, w in enumerate(wheels) if op(w.version_tuple, version)), len(wheels))
  106. return None if start == end else wheels[start]
  107. def pip_wheel_env_run(search_dirs: list[Path], app_data: AppData, env: dict[str, str]) -> dict[str, str]:
  108. env = env.copy()
  109. env.update({"PIP_USE_WHEEL": "1", "PIP_USER": "0", "PIP_NO_INPUT": "1", "PYTHONIOENCODING": "utf-8"})
  110. wheel = get_wheel(
  111. distribution="pip",
  112. version=None,
  113. for_py_version=f"{sys.version_info.major}.{sys.version_info.minor}",
  114. search_dirs=search_dirs,
  115. download=False,
  116. app_data=app_data,
  117. do_periodic_update=False,
  118. env=env,
  119. )
  120. if wheel is None:
  121. msg = "could not find the embedded pip"
  122. raise RuntimeError(msg)
  123. env["PYTHONPATH"] = str(wheel.path)
  124. return env
  125. __all__ = [
  126. "download_wheel",
  127. "get_wheel",
  128. "pip_wheel_env_run",
  129. ]