"""Acquire Python information via subprocess interrogation with multi-level caching.""" from __future__ import annotations import hashlib import json import logging import os import pkgutil import secrets import subprocess # noqa: S404 import sys import tempfile from collections import OrderedDict from contextlib import contextmanager from pathlib import Path from shlex import quote from subprocess import Popen, TimeoutExpired # noqa: S404 from typing import TYPE_CHECKING, Final from ._cache import NoOpCache from ._py_info import PythonInfo if TYPE_CHECKING: from collections.abc import Generator, Mapping from ._cache import ContentStore, PyInfoCache _CACHE: OrderedDict[Path, PythonInfo | Exception] = OrderedDict() _CACHE[Path(sys.executable)] = PythonInfo() _LOGGER: Final[logging.Logger] = logging.getLogger(__name__) def from_exe( # noqa: PLR0913 cls: type[PythonInfo], cache: PyInfoCache | None, exe: str, env: Mapping[str, str] | None = None, *, raise_on_error: bool = True, ignore_cache: bool = False, ) -> PythonInfo | None: env = os.environ if env is None else env result = _get_from_cache(cls, cache, exe, env, ignore_cache=ignore_cache) if isinstance(result, Exception): if raise_on_error: raise result _LOGGER.info("%s", result) result = None return result def _get_from_cache( cls: type[PythonInfo], cache: PyInfoCache | None, exe: str, env: Mapping[str, str], *, ignore_cache: bool = True, ) -> PythonInfo | Exception: exe_path = Path(exe) if not ignore_cache and exe_path in _CACHE: result = _CACHE[exe_path] else: py_info = _get_via_file_cache(cls, cache, exe_path, exe, env) result = _CACHE[exe_path] = py_info if isinstance(result, PythonInfo): result.executable = exe return result def _get_via_file_cache( cls: type[PythonInfo], cache: PyInfoCache | None, path: Path, exe: str, env: Mapping[str, str], ) -> PythonInfo | Exception: path_text = str(path) try: path_modified = path.stat().st_mtime except OSError: path_modified = -1 py_info_script = Path(Path(__file__).resolve()).parent / "_py_info.py" try: py_info_hash: str | None = hashlib.sha256(py_info_script.read_bytes()).hexdigest() except OSError: py_info_hash = None resolved_cache = cache if cache is not None else NoOpCache() py_info: PythonInfo | None = None py_info_store = resolved_cache.py_info(path) with py_info_store.locked(): if py_info_store.exists() and (data := py_info_store.read()) is not None: of_path, of_st_mtime = data.get("path"), data.get("st_mtime") of_content, of_hash = data.get("content"), data.get("hash") if ( of_path == path_text and of_st_mtime == path_modified and of_hash == py_info_hash and isinstance(of_content, dict) ): py_info = _load_cached_py_info(cls, py_info_store, of_content) else: py_info_store.remove() if py_info is None: failure, py_info = _run_subprocess(cls, exe, env) if failure is not None: _LOGGER.debug("first subprocess attempt failed for %s (%s), retrying", exe, failure) failure, py_info = _run_subprocess(cls, exe, env) if failure is not None: return failure if py_info is not None: py_info_store.write({ "st_mtime": path_modified, "path": path_text, "content": py_info.to_dict(), "hash": py_info_hash, }) if py_info is None: msg = f"{exe} failed to produce interpreter info" return RuntimeError(msg) return py_info def _load_cached_py_info( cls: type[PythonInfo], py_info_store: ContentStore, content: dict, ) -> PythonInfo | None: try: py_info = cls.from_dict(content.copy()) except (KeyError, TypeError): py_info_store.remove() return None if (sys_exe := py_info.system_executable) is not None and not Path(sys_exe).exists(): py_info_store.remove() return None return py_info COOKIE_LENGTH: Final[int] = 32 def gen_cookie() -> str: return secrets.token_hex(COOKIE_LENGTH // 2) @contextmanager def _resolve_py_info_script() -> Generator[Path]: py_info_script = Path(Path(__file__).resolve()).parent / "_py_info.py" if py_info_script.is_file(): yield py_info_script else: data = pkgutil.get_data(__package__ or __name__, "_py_info.py") if data is None: msg = "cannot locate _py_info.py for subprocess interrogation" raise FileNotFoundError(msg) fd, tmp = tempfile.mkstemp(suffix=".py") try: os.write(fd, data) os.close(fd) yield Path(tmp) finally: Path(tmp).unlink() def _extract_between_cookies(out: str, start_cookie: str, end_cookie: str) -> tuple[str, str, int, int]: """Extract payload between reversed cookie markers, forwarding any surrounding output to stdout.""" raw_out = out out_starts = out.find(start_cookie[::-1]) if out_starts > -1: if pre_cookie := out[:out_starts]: sys.stdout.write(pre_cookie) out = out[out_starts + COOKIE_LENGTH :] out_ends = out.find(end_cookie[::-1]) if out_ends > -1: if post_cookie := out[out_ends + COOKIE_LENGTH :]: sys.stdout.write(post_cookie) out = out[:out_ends] return out, raw_out, out_starts, out_ends def _run_subprocess( cls: type[PythonInfo], exe: str, env: Mapping[str, str], ) -> tuple[Exception | None, PythonInfo | None]: start_cookie = gen_cookie() end_cookie = gen_cookie() timeout = float(env.get("PY_DISCOVERY_TIMEOUT", "15")) with _resolve_py_info_script() as py_info_script: cmd = [exe, str(py_info_script), start_cookie, end_cookie] env = dict(env) env.pop("__PYVENV_LAUNCHER__", None) env["PYTHONUTF8"] = "1" _LOGGER.debug("get interpreter info via cmd: %s", LogCmd(cmd)) try: process = Popen( # noqa: S603 cmd, universal_newlines=True, stdin=subprocess.PIPE, stderr=subprocess.PIPE, stdout=subprocess.PIPE, env=env, encoding="utf-8", errors="backslashreplace", ) out, err = process.communicate(timeout=timeout) code = process.returncode except TimeoutExpired: process.kill() process.communicate() out, err, code = "", "timed out", -1 except OSError as os_error: out, err, code = "", os_error.strerror, os_error.errno if code != 0: msg = f"{exe} with code {code}{f' out: {out!r}' if out else ''}{f' err: {err!r}' if err else ''}" return RuntimeError(f"failed to query {msg}"), None out, raw_out, out_starts, out_ends = _extract_between_cookies(out, start_cookie, end_cookie) try: result = cls.from_json(out) result.executable = exe except json.JSONDecodeError as exc: _LOGGER.warning( "subprocess %s returned invalid JSON; raw stdout %d chars, start cookie %s, end cookie %s, " "parsed output %d chars: %r", exe, len(raw_out), "found" if out_starts > -1 else "missing", "found" if out_ends > -1 else "missing", len(out), out[:200] if out else "", ) msg = f"{exe} returned invalid JSON (exit code {code}){f', stderr: {err!r}' if err else ''}" failure = RuntimeError(msg) failure.__cause__ = exc return failure, None return None, result class LogCmd: def __init__(self, cmd: list[str], env: Mapping[str, str] | None = None) -> None: self.cmd = cmd self.env = env def __repr__(self) -> str: cmd_repr = " ".join(quote(str(c)) for c in self.cmd) if self.env is not None: cmd_repr = f"{cmd_repr} env of {self.env!r}" return cmd_repr def clear(cache: PyInfoCache) -> None: cache.py_info_clear() _CACHE.clear() __all__ = [ "LogCmd", "clear", "from_exe", ]