from __future__ import annotations import logging import os import sys from contextlib import suppress from pathlib import Path from typing import TYPE_CHECKING, Final from platformdirs import user_data_path from ._compat import fs_path_id from ._py_info import PythonInfo from ._py_spec import PythonSpec if TYPE_CHECKING: from collections.abc import Callable, Generator, Iterable, Mapping, Sequence from ._cache import PyInfoCache _LOGGER: Final[logging.Logger] = logging.getLogger(__name__) IS_WIN: Final[bool] = sys.platform == "win32" def get_interpreter( key: str | Sequence[str], try_first_with: Iterable[str] | None = None, cache: PyInfoCache | None = None, env: Mapping[str, str] | None = None, predicate: Callable[[PythonInfo], bool] | None = None, ) -> PythonInfo | None: """ Find a Python interpreter matching *key*. Iterates over one or more specification strings and returns the first interpreter that satisfies the spec and passes the optional *predicate*. :param key: interpreter specification string(s) — an absolute path, a version (``3.12``), an implementation prefix (``cpython3.12``), or a PEP 440 specifier (``>=3.10``). When a sequence is given each entry is tried in order. :param try_first_with: executables to probe before the normal discovery search. :param cache: interpreter metadata cache; when ``None`` results are not cached. :param env: environment mapping for ``PATH`` lookup; defaults to :data:`os.environ`. :param predicate: optional callback applied after an interpreter matches the spec. Return ``True`` to accept the interpreter, ``False`` to skip it and continue searching. :return: the first matching interpreter, or ``None`` if no match is found. """ specs = [key] if isinstance(key, str) else key for spec_str in specs: if result := _find_interpreter(spec_str, try_first_with or (), cache, env, predicate): return result return None def _find_interpreter( key: str, try_first_with: Iterable[str], cache: PyInfoCache | None = None, env: Mapping[str, str] | None = None, predicate: Callable[[PythonInfo], bool] | None = None, ) -> PythonInfo | None: spec = PythonSpec.from_string_spec(key) _LOGGER.info("find interpreter for spec %r", spec) proposed_paths: set[tuple[str | None, bool]] = set() env = os.environ if env is None else env for interpreter, impl_must_match in propose_interpreters(spec, try_first_with, cache, env): if interpreter is None: # pragma: no cover continue proposed_key = interpreter.system_executable, impl_must_match if proposed_key in proposed_paths: continue _LOGGER.info("proposed %s", interpreter) if interpreter.satisfies(spec, impl_must_match=impl_must_match) and ( predicate is None or predicate(interpreter) ): _LOGGER.debug("accepted %s", interpreter) return interpreter proposed_paths.add(proposed_key) return None def _check_exe(path: str, tested_exes: set[str]) -> str | None: """Resolve *path* to an absolute path and return it if not yet tested, otherwise ``None``.""" try: os.lstat(path) except OSError: return None resolved = str(Path(path).resolve()) exe_id = fs_path_id(resolved) if exe_id in tested_exes: return None tested_exes.add(exe_id) return str(Path(path).absolute()) def _is_new_exe(exe_raw: str, tested_exes: set[str]) -> bool: """Return ``True`` and register *exe_raw* if it hasn't been tested yet.""" exe_id = fs_path_id(exe_raw) if exe_id in tested_exes: return False tested_exes.add(exe_id) return True def propose_interpreters( spec: PythonSpec, try_first_with: Iterable[str], cache: PyInfoCache | None = None, env: Mapping[str, str] | None = None, ) -> Generator[tuple[PythonInfo | None, bool], None, None]: """ Yield ``(interpreter, impl_must_match)`` candidates for *spec*. :param spec: the parsed interpreter specification to match against. :param try_first_with: executable paths to probe before the standard search. :param cache: interpreter metadata cache; when ``None`` results are not cached. :param env: environment mapping for ``PATH`` lookup; defaults to :data:`os.environ`. """ env = os.environ if env is None else env tested_exes: set[str] = set() if spec.is_abs and spec.path is not None: if exe_raw := _check_exe(spec.path, tested_exes): # pragma: no branch # first exe always new yield PythonInfo.from_exe(exe_raw, cache, env=env), True return yield from _propose_explicit(spec, try_first_with, cache, env, tested_exes) if spec.path is not None and spec.is_abs: # pragma: no cover # relative spec.path is never abs return yield from _propose_from_path(spec, cache, env, tested_exes) yield from _propose_from_uv(cache, env) def _propose_explicit( spec: PythonSpec, try_first_with: Iterable[str], cache: PyInfoCache | None, env: Mapping[str, str], tested_exes: set[str], ) -> Generator[tuple[PythonInfo | None, bool], None, None]: for py_exe in try_first_with: if exe_raw := _check_exe(str(Path(py_exe).resolve()), tested_exes): yield PythonInfo.from_exe(exe_raw, cache, env=env), True if spec.path is not None: if exe_raw := _check_exe(spec.path, tested_exes): # pragma: no branch yield PythonInfo.from_exe(exe_raw, cache, env=env), True else: yield from _propose_current_and_windows(spec, cache, env, tested_exes) def _propose_current_and_windows( spec: PythonSpec, cache: PyInfoCache | None, env: Mapping[str, str], tested_exes: set[str], ) -> Generator[tuple[PythonInfo | None, bool], None, None]: current_python = PythonInfo.current_system(cache) if _is_new_exe(str(current_python.executable), tested_exes): yield current_python, True if IS_WIN: # pragma: win32 cover from ._windows import propose_interpreters as win_propose # noqa: PLC0415 for interpreter in win_propose(spec, cache, env): if _is_new_exe(str(interpreter.executable), tested_exes): yield interpreter, True def _propose_from_path( spec: PythonSpec, cache: PyInfoCache | None, env: Mapping[str, str], tested_exes: set[str], ) -> Generator[tuple[PythonInfo | None, bool], None, None]: find_candidates = path_exe_finder(spec) for pos, path in enumerate(get_paths(env)): _LOGGER.debug(LazyPathDump(pos, path, env)) for exe, impl_must_match in find_candidates(path): exe_raw = str(exe) if resolved := _resolve_shim(exe_raw, env): _LOGGER.debug("resolved shim %s to %s", exe_raw, resolved) exe_raw = resolved if not _is_new_exe(exe_raw, tested_exes): continue interpreter = PathPythonInfo.from_exe(exe_raw, cache, raise_on_error=False, env=env) if interpreter is not None: yield interpreter, impl_must_match def _propose_from_uv( cache: PyInfoCache | None, env: Mapping[str, str], ) -> Generator[tuple[PythonInfo | None, bool], None, None]: if uv_python_dir := os.getenv("UV_PYTHON_INSTALL_DIR"): uv_python_path = Path(uv_python_dir).expanduser() elif xdg_data_home := os.getenv("XDG_DATA_HOME"): uv_python_path = Path(xdg_data_home).expanduser() / "uv" / "python" else: uv_python_path = user_data_path("uv") / "python" for exe_path in uv_python_path.glob("*/bin/python"): # pragma: no branch interpreter = PathPythonInfo.from_exe(str(exe_path), cache, raise_on_error=False, env=env) if interpreter is not None: # pragma: no branch yield interpreter, True def get_paths(env: Mapping[str, str]) -> Generator[Path, None, None]: path = env.get("PATH", None) if path is None: try: path = os.confstr("CS_PATH") except (AttributeError, ValueError): # pragma: no cover # Windows only (no confstr) path = os.defpath if path: for entry in map(Path, path.split(os.pathsep)): with suppress(OSError): if entry.is_dir() and next(entry.iterdir(), None): yield entry class LazyPathDump: def __init__(self, pos: int, path: Path, env: Mapping[str, str]) -> None: self.pos = pos self.path = path self.env = env def __repr__(self) -> str: content = f"discover PATH[{self.pos}]={self.path}" if self.env.get("_VIRTUALENV_DEBUG"): content += " with =>" for file_path in self.path.iterdir(): try: if file_path.is_dir(): continue if IS_WIN: # pragma: win32 cover pathext = self.env.get("PATHEXT", ".COM;.EXE;.BAT;.CMD").split(";") if not any(file_path.name.upper().endswith(ext) for ext in pathext): continue elif not (file_path.stat().st_mode & os.X_OK): continue except OSError: pass content += " " content += file_path.name return content def path_exe_finder(spec: PythonSpec) -> Callable[[Path], Generator[tuple[Path, bool], None, None]]: """Given a spec, return a function that can be called on a path to find all matching files in it.""" pat = spec.generate_re(windows=sys.platform == "win32") direct = spec.str_spec if sys.platform == "win32": # pragma: win32 cover direct = f"{direct}.exe" def path_exes(path: Path) -> Generator[tuple[Path, bool], None, None]: direct_path = path / direct if direct_path.exists(): yield direct_path, False for exe in path.iterdir(): match = pat.fullmatch(exe.name) if match: yield exe.absolute(), match["impl"] == "python" return path_exes def _resolve_shim(exe_path: str, env: Mapping[str, str]) -> str | None: """Resolve a version-manager shim to the actual Python binary.""" for shims_dir_env, versions_path in _VERSION_MANAGER_LAYOUTS: if root := env.get(shims_dir_env): shims_dir = os.path.join(root, "shims") if os.path.dirname(exe_path) == shims_dir: exe_name = os.path.basename(exe_path) versions_dir = os.path.join(root, *versions_path) return _resolve_shim_to_binary(exe_name, versions_dir, env) return None _VERSION_MANAGER_LAYOUTS: list[tuple[str, tuple[str, ...]]] = [ ("PYENV_ROOT", ("versions",)), ("MISE_DATA_DIR", ("installs", "python")), ("ASDF_DATA_DIR", ("installs", "python")), ] def _resolve_shim_to_binary(exe_name: str, versions_dir: str, env: Mapping[str, str]) -> str | None: for version in _active_versions(env): resolved = os.path.join(versions_dir, version, "bin", exe_name) if Path(resolved).is_file() and os.access(resolved, os.X_OK): return resolved return None def _active_versions(env: Mapping[str, str]) -> Generator[str, None, None]: """Yield active Python version strings by reading version-manager configuration.""" if pyenv_version := env.get("PYENV_VERSION"): yield from pyenv_version.split(":") return if versions := _read_python_version_file(Path.cwd()): yield from versions return if (pyenv_root := env.get("PYENV_ROOT")) and ( versions := _read_python_version_file(os.path.join(pyenv_root, "version"), search_parents=False) ): yield from versions def _read_python_version_file(start: str | Path, *, search_parents: bool = True) -> list[str] | None: """Read a ``.python-version`` file, optionally searching parent directories.""" current = start while True: candidate = os.path.join(current, ".python-version") if Path(current).is_dir() else current if Path(candidate).is_file(): with Path(candidate).open(encoding="utf-8") as fh: if versions := [v for line in fh if (v := line.strip()) and not v.startswith("#")]: return versions if not search_parents: return None parent = Path(current).parent if parent == current: return None current = parent class PathPythonInfo(PythonInfo): """python info from path.""" __all__ = [ "LazyPathDump", "PathPythonInfo", "get_interpreter", "get_paths", "propose_interpreters", ]