_discovery.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. from __future__ import annotations
  2. import logging
  3. import os
  4. import sys
  5. from contextlib import suppress
  6. from pathlib import Path
  7. from typing import TYPE_CHECKING, Final
  8. from platformdirs import user_data_path
  9. from ._compat import fs_path_id
  10. from ._py_info import PythonInfo
  11. from ._py_spec import PythonSpec
  12. if TYPE_CHECKING:
  13. from collections.abc import Callable, Generator, Iterable, Mapping, Sequence
  14. from ._cache import PyInfoCache
  # Logger shared by every helper in this module, named after the module itself.
  _LOGGER: Final[logging.Logger] = logging.getLogger(__name__)
  # ``True`` when the running interpreter reports the ``win32`` platform.
  IS_WIN: Final[bool] = sys.platform == "win32"
  def get_interpreter(
      key: str | Sequence[str],
      try_first_with: Iterable[str] | None = None,
      cache: PyInfoCache | None = None,
      env: Mapping[str, str] | None = None,
      predicate: Callable[[PythonInfo], bool] | None = None,
  ) -> PythonInfo | None:
      """
      Locate a Python interpreter for one or several specifications.

      Each specification is handed to the single-spec search in turn; the first truthy result wins.

      :param key: interpreter specification string(s) — an absolute path, a version (``3.12``), an implementation prefix
          (``cpython3.12``), or a PEP 440 specifier (``>=3.10``). A plain string is treated as a single specification;
          any other sequence is tried entry by entry, in order.
      :param try_first_with: executables to probe before the normal discovery search; ``None`` means none.
      :param cache: interpreter metadata cache; when ``None`` results are not cached.
      :param env: environment mapping for ``PATH`` lookup; defaults to :data:`os.environ`.
      :param predicate: optional callback applied after an interpreter matches the spec. Return ``True`` to accept the
          interpreter, ``False`` to skip it and continue searching.
      :return: the first matching interpreter, or ``None`` if no specification produced a match.
      """
      candidates = (key,) if isinstance(key, str) else key
      probe_first = try_first_with or ()
      for spec_str in candidates:
          found = _find_interpreter(spec_str, probe_first, cache, env, predicate)
          if found:
              return found
      return None
  def _find_interpreter(
      key: str,
      try_first_with: Iterable[str],
      cache: PyInfoCache | None = None,
      env: Mapping[str, str] | None = None,
      predicate: Callable[[PythonInfo], bool] | None = None,
  ) -> PythonInfo | None:
      """
      Return the first proposed interpreter that satisfies the single specification *key*.

      :param key: one interpreter specification string, parsed with :meth:`PythonSpec.from_string_spec`.
      :param try_first_with: executables to probe before the normal discovery search.
      :param cache: interpreter metadata cache, forwarded to the proposal generator.
      :param env: environment mapping; :data:`os.environ` is used when ``None``.
      :param predicate: optional extra filter evaluated only for interpreters that already satisfy the spec.
      :return: the accepted interpreter, or ``None`` once every proposal has been rejected.
      """
      spec = PythonSpec.from_string_spec(key)
      _LOGGER.info("find interpreter for spec %r", spec)
      if env is None:
          env = os.environ
      # (system executable, impl_must_match) pairs that were already evaluated and turned down
      rejected: set[tuple[str | None, bool]] = set()
      for candidate, impl_must_match in propose_interpreters(spec, try_first_with, cache, env):
          if candidate is None:  # pragma: no cover
              continue
          fingerprint = (candidate.system_executable, impl_must_match)
          if fingerprint in rejected:
              continue
          _LOGGER.info("proposed %s", candidate)
          matches_spec = candidate.satisfies(spec, impl_must_match=impl_must_match)
          if matches_spec and (predicate is None or predicate(candidate)):
              _LOGGER.debug("accepted %s", candidate)
              return candidate
          rejected.add(fingerprint)
      return None
  67. def _check_exe(path: str, tested_exes: set[str]) -> str | None:
  68. """Resolve *path* to an absolute path and return it if not yet tested, otherwise ``None``."""
  69. try:
  70. os.lstat(path)
  71. except OSError:
  72. return None
  73. resolved = str(Path(path).resolve())
  74. exe_id = fs_path_id(resolved)
  75. if exe_id in tested_exes:
  76. return None
  77. tested_exes.add(exe_id)
  78. return str(Path(path).absolute())
  def _is_new_exe(exe_raw: str, tested_exes: set[str]) -> bool:
      """
      Report whether *exe_raw* is seen for the first time, recording it in *tested_exes* when it is.

      :param exe_raw: executable path, identified through ``fs_path_id``.
      :param tested_exes: identifiers of executables already handled; updated in place for a new executable.
      :return: ``True`` for a new executable, ``False`` when its identifier was already recorded.
      """
      exe_id = fs_path_id(exe_raw)
      already_seen = exe_id in tested_exes
      if not already_seen:
          tested_exes.add(exe_id)
      return not already_seen
  def propose_interpreters(
      spec: PythonSpec,
      try_first_with: Iterable[str],
      cache: PyInfoCache | None = None,
      env: Mapping[str, str] | None = None,
  ) -> Generator[tuple[PythonInfo | None, bool], None, None]:
      """
      Generate ``(interpreter, impl_must_match)`` candidates for *spec*.

      An absolute path in the spec short-circuits the search: only that executable is proposed. Otherwise candidates
      come, in order, from the explicit sources (*try_first_with*, the spec path or the current interpreter), from the
      directories on ``PATH`` and finally from uv's managed installations.

      :param spec: the parsed interpreter specification to match against.
      :param try_first_with: executable paths to probe before the standard search.
      :param cache: interpreter metadata cache; when ``None`` results are not cached.
      :param env: environment mapping for ``PATH`` lookup; defaults to :data:`os.environ`.
      """
      if env is None:
          env = os.environ
      tested_exes: set[str] = set()

      if spec.is_abs and spec.path is not None:
          exe_raw = _check_exe(spec.path, tested_exes)
          if exe_raw:  # pragma: no branch # first exe always new
              yield PythonInfo.from_exe(exe_raw, cache, env=env), True
          return

      yield from _propose_explicit(spec, try_first_with, cache, env, tested_exes)
      # Defensive guard kept from the original: an absolute spec has already returned above.
      if spec.path is not None and spec.is_abs:  # pragma: no cover # relative spec.path is never abs
          return
      yield from _propose_from_path(spec, cache, env, tested_exes)
      yield from _propose_from_uv(cache, env)
  def _propose_explicit(
      spec: PythonSpec,
      try_first_with: Iterable[str],
      cache: PyInfoCache | None,
      env: Mapping[str, str],
      tested_exes: set[str],
  ) -> Generator[tuple[PythonInfo | None, bool], None, None]:
      """
      Propose the explicitly requested executables before any directory scanning happens.

      Every entry of *try_first_with* is resolved and proposed first. After that either the path named by the spec is
      proposed or, when the spec carries no path, the current interpreter and the Windows-specific candidates are.
      All candidates from this stage are yielded with ``impl_must_match=True``.
      """
      for py_exe in try_first_with:
          resolved = str(Path(py_exe).resolve())
          exe_raw = _check_exe(resolved, tested_exes)
          if exe_raw:
              yield PythonInfo.from_exe(exe_raw, cache, env=env), True

      if spec.path is None:
          yield from _propose_current_and_windows(spec, cache, env, tested_exes)
          return

      exe_raw = _check_exe(spec.path, tested_exes)
      if exe_raw:  # pragma: no branch
          yield PythonInfo.from_exe(exe_raw, cache, env=env), True
  def _propose_current_and_windows(
      spec: PythonSpec,
      cache: PyInfoCache | None,
      env: Mapping[str, str],
      tested_exes: set[str],
  ) -> Generator[tuple[PythonInfo | None, bool], None, None]:
      """
      Propose the current system interpreter and, on Windows, the candidates supplied by the ``_windows`` module.

      Executables already recorded in *tested_exes* are skipped; everything yielded uses ``impl_must_match=True``.
      """
      running = PythonInfo.current_system(cache)
      running_exe = str(running.executable)
      if _is_new_exe(running_exe, tested_exes):
          yield running, True

      if IS_WIN:  # pragma: win32 cover
          # Imported lazily so the Windows-only module is loaded only where it is needed.
          from ._windows import propose_interpreters as win_propose  # noqa: PLC0415

          for candidate in win_propose(spec, cache, env):
              candidate_exe = str(candidate.executable)
              if _is_new_exe(candidate_exe, tested_exes):
                  yield candidate, True
  def _propose_from_path(
      spec: PythonSpec,
      cache: PyInfoCache | None,
      env: Mapping[str, str],
      tested_exes: set[str],
  ) -> Generator[tuple[PythonInfo | None, bool], None, None]:
      """
      Propose interpreters found in the directories returned by :func:`get_paths`.

      Files matching the spec are collected per directory; a version-manager shim is replaced by the binary it
      resolves to before the executable is checked against *tested_exes*. Executables that cannot be turned into an
      interpreter description (``from_exe`` returning ``None``) are dropped silently.
      """
      find_candidates = path_exe_finder(spec)
      for pos, directory in enumerate(get_paths(env)):
          _LOGGER.debug(LazyPathDump(pos, directory, env))
          for exe, impl_must_match in find_candidates(directory):
              found = str(exe)
              target = _resolve_shim(found, env)
              if target:
                  _LOGGER.debug("resolved shim %s to %s", found, target)
              exe_raw = target or found
              if _is_new_exe(exe_raw, tested_exes):
                  interpreter = PathPythonInfo.from_exe(exe_raw, cache, raise_on_error=False, env=env)
                  if interpreter is not None:
                      yield interpreter, impl_must_match
  158. def _propose_from_uv(
  159. cache: PyInfoCache | None,
  160. env: Mapping[str, str],
  161. ) -> Generator[tuple[PythonInfo | None, bool], None, None]:
  162. if uv_python_dir := os.getenv("UV_PYTHON_INSTALL_DIR"):
  163. uv_python_path = Path(uv_python_dir).expanduser()
  164. elif xdg_data_home := os.getenv("XDG_DATA_HOME"):
  165. uv_python_path = Path(xdg_data_home).expanduser() / "uv" / "python"
  166. else:
  167. uv_python_path = user_data_path("uv") / "python"
  168. for exe_path in uv_python_path.glob("*/bin/python"): # pragma: no branch
  169. interpreter = PathPythonInfo.from_exe(str(exe_path), cache, raise_on_error=False, env=env)
  170. if interpreter is not None: # pragma: no branch
  171. yield interpreter, True
  def get_paths(env: Mapping[str, str]) -> Generator[Path, None, None]:
      """
      Yield the non-empty directories named by ``PATH``, in order.

      When *env* has no ``PATH`` key the value of ``os.confstr("CS_PATH")`` is used, falling back to
      :data:`os.defpath` where ``confstr`` is unavailable or rejects the name. Entries that are not directories, that
      are empty directories, or that raise :class:`OSError` while being inspected are skipped.

      :param env: environment mapping to read ``PATH`` from.
      """
      search_path = env.get("PATH")
      if search_path is None:
          try:
              search_path = os.confstr("CS_PATH")
          except (AttributeError, ValueError):  # pragma: no cover # Windows only (no confstr)
              search_path = os.defpath
      if not search_path:
          return
      for chunk in search_path.split(os.pathsep):
          directory = Path(chunk)
          with suppress(OSError):
              # ``next(..., None)`` is falsy only for a directory without any entry
              if directory.is_dir() and next(directory.iterdir(), None):
                  yield directory
  class LazyPathDump:
      """
      Deferred, human-readable description of one ``PATH`` entry, intended as a logging argument.

      The text is only built by :meth:`__repr__`; constructing an instance merely stores its arguments. When
      ``_VIRTUALENV_DEBUG`` is set to a non-empty value in *env*, the rendering additionally lists the names of the
      files in the directory that look executable.
      """

      def __init__(self, pos: int, path: Path, env: Mapping[str, str]) -> None:
          """
          :param pos: index of the entry, rendered as ``PATH[pos]``.
          :param path: the directory being described.
          :param env: environment mapping consulted for ``_VIRTUALENV_DEBUG`` and, on Windows, ``PATHEXT``.
          """
          self.pos = pos
          self.path = path
          self.env = env

      def __repr__(self) -> str:
          """Return ``discover PATH[pos]=path``, followed by `` with => name ...`` in debug mode."""
          pieces = [f"discover PATH[{self.pos}]={self.path}"]
          if self.env.get("_VIRTUALENV_DEBUG"):
              pieces.append(" with =>")
              pieces.extend(f" {name}" for name in self._executable_names())
          return "".join(pieces)

      def _executable_names(self) -> list[str]:
          """
          Collect the names of the non-directory entries of :attr:`path` that look executable.

          On Windows a file qualifies when its name ends, case-insensitively, with one of the ``PATHEXT`` suffixes;
          elsewhere when any execute permission bit is set. A file that cannot be inspected is still listed, and a
          directory that cannot be listed ends the collection with whatever was gathered so far.
          """
          names: list[str] = []
          if IS_WIN:  # pragma: win32 cover
              # Parsed once per dump; an unset or empty PATHEXT falls back to the default list, and empty
              # entries are dropped because ``endswith("")`` would accept every file.
              raw_pathext = self.env.get("PATHEXT") or ".COM;.EXE;.BAT;.CMD"
              suffixes = tuple(ext.upper() for ext in raw_pathext.split(";") if ext)
          # A directory removed or made unreadable between discovery and logging must not make repr() raise.
          with suppress(OSError):
              for file_path in self.path.iterdir():
                  try:
                      if file_path.is_dir():
                          continue
                      if IS_WIN:  # pragma: win32 cover
                          if not file_path.name.upper().endswith(suffixes):
                              continue
                      # 0o111 == S_IXUSR | S_IXGRP | S_IXOTH. ``os.X_OK`` is an ``os.access`` flag and, used as a
                      # mode mask, only tested the "other" execute bit.
                      elif not (file_path.stat().st_mode & 0o111):
                          continue
                  except OSError:
                      # best effort: an entry that cannot be inspected is listed anyway
                      pass
                  names.append(file_path.name)
          return names
  def path_exe_finder(spec: PythonSpec) -> Callable[[Path], Generator[tuple[Path, bool], None, None]]:
      """
      Build a directory scanner for *spec*.

      The returned function takes a directory and yields ``(executable, impl_must_match)`` pairs: first the file
      named exactly like the spec string (with ``.exe`` appended on Windows) when it exists, paired with ``False``,
      then every directory entry whose name fully matches the spec's regular expression, paired with ``True`` only
      when the matched implementation is the generic ``python``.
      """
      on_windows = sys.platform == "win32"
      pat = spec.generate_re(windows=on_windows)
      direct = spec.str_spec
      if on_windows:  # pragma: win32 cover
          direct = f"{direct}.exe"

      def path_exes(path: Path) -> Generator[tuple[Path, bool], None, None]:
          exact = path / direct
          if exact.exists():
              yield exact, False
          for entry in path.iterdir():
              hit = pat.fullmatch(entry.name)
              if not hit:
                  continue
              yield entry.absolute(), hit["impl"] == "python"

      return path_exes
  def _resolve_shim(exe_path: str, env: Mapping[str, str]) -> str | None:
      """
      Map a version-manager shim to the Python binary it stands for.

      *exe_path* counts as a shim when its directory equals ``<root>/shims`` for the first version-manager root
      variable that is set in *env*; the binary is then looked up below that manager's versions directory.

      :return: the resolved binary, or ``None`` when *exe_path* is not a shim or no binary could be found.
      """
      containing_dir = os.path.dirname(exe_path)
      for root_var, versions_path in _VERSION_MANAGER_LAYOUTS:
          root = env.get(root_var)
          if not root or containing_dir != os.path.join(root, "shims"):
              continue
          versions_dir = os.path.join(root, *versions_path)
          return _resolve_shim_to_binary(os.path.basename(exe_path), versions_dir, env)
      return None
  233. _VERSION_MANAGER_LAYOUTS: list[tuple[str, tuple[str, ...]]] = [
  234. ("PYENV_ROOT", ("versions",)),
  235. ("MISE_DATA_DIR", ("installs", "python")),
  236. ("ASDF_DATA_DIR", ("installs", "python")),
  237. ]
  def _resolve_shim_to_binary(exe_name: str, versions_dir: str, env: Mapping[str, str]) -> str | None:
      """
      Return ``<versions_dir>/<version>/bin/<exe_name>`` for the first active version that provides it.

      A candidate is accepted when it is a regular file and ``os.access`` reports it as executable.

      :return: the path of the first acceptable binary, or ``None`` when no active version has one.
      """
      candidates = (os.path.join(versions_dir, version, "bin", exe_name) for version in _active_versions(env))
      usable = (binary for binary in candidates if Path(binary).is_file() and os.access(binary, os.X_OK))
      return next(usable, None)
  244. def _active_versions(env: Mapping[str, str]) -> Generator[str, None, None]:
  245. """Yield active Python version strings by reading version-manager configuration."""
  246. if pyenv_version := env.get("PYENV_VERSION"):
  247. yield from pyenv_version.split(":")
  248. return
  249. if versions := _read_python_version_file(Path.cwd()):
  250. yield from versions
  251. return
  252. if (pyenv_root := env.get("PYENV_ROOT")) and (
  253. versions := _read_python_version_file(os.path.join(pyenv_root, "version"), search_parents=False)
  254. ):
  255. yield from versions
  256. def _read_python_version_file(start: str | Path, *, search_parents: bool = True) -> list[str] | None:
  257. """Read a ``.python-version`` file, optionally searching parent directories."""
  258. current = start
  259. while True:
  260. candidate = os.path.join(current, ".python-version") if Path(current).is_dir() else current
  261. if Path(candidate).is_file():
  262. with Path(candidate).open(encoding="utf-8") as fh:
  263. if versions := [v for line in fh if (v := line.strip()) and not v.startswith("#")]:
  264. return versions
  265. if not search_parents:
  266. return None
  267. parent = Path(current).parent
  268. if parent == current:
  269. return None
  270. current = parent
  class PathPythonInfo(PythonInfo):
      """Interpreter information for an executable found by scanning directories; adds nothing to ``PythonInfo``."""
  # Names exported by ``from <module> import *``; everything else in this module is internal.
  __all__ = [
      "LazyPathDump",
      "PathPythonInfo",
      "get_interpreter",
      "get_paths",
      "propose_interpreters",
  ]