| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264 |
- """Acquire Python information via subprocess interrogation with multi-level caching."""
- from __future__ import annotations
- import hashlib
- import json
- import logging
- import os
- import pkgutil
- import secrets
- import subprocess # noqa: S404
- import sys
- import tempfile
- from collections import OrderedDict
- from contextlib import contextmanager
- from pathlib import Path
- from shlex import quote
- from subprocess import Popen, TimeoutExpired # noqa: S404
- from typing import TYPE_CHECKING, Final
- from ._cache import NoOpCache
- from ._py_info import PythonInfo
- if TYPE_CHECKING:
- from collections.abc import Generator, Mapping
- from ._cache import ContentStore, PyInfoCache
- _CACHE: OrderedDict[Path, PythonInfo | Exception] = OrderedDict()
- _CACHE[Path(sys.executable)] = PythonInfo()
- _LOGGER: Final[logging.Logger] = logging.getLogger(__name__)
- def from_exe( # noqa: PLR0913
- cls: type[PythonInfo],
- cache: PyInfoCache | None,
- exe: str,
- env: Mapping[str, str] | None = None,
- *,
- raise_on_error: bool = True,
- ignore_cache: bool = False,
- ) -> PythonInfo | None:
- env = os.environ if env is None else env
- result = _get_from_cache(cls, cache, exe, env, ignore_cache=ignore_cache)
- if isinstance(result, Exception):
- if raise_on_error:
- raise result
- _LOGGER.info("%s", result)
- result = None
- return result
- def _get_from_cache(
- cls: type[PythonInfo],
- cache: PyInfoCache | None,
- exe: str,
- env: Mapping[str, str],
- *,
- ignore_cache: bool = True,
- ) -> PythonInfo | Exception:
- exe_path = Path(exe)
- if not ignore_cache and exe_path in _CACHE:
- result = _CACHE[exe_path]
- else:
- py_info = _get_via_file_cache(cls, cache, exe_path, exe, env)
- result = _CACHE[exe_path] = py_info
- if isinstance(result, PythonInfo):
- result.executable = exe
- return result
- def _get_via_file_cache(
- cls: type[PythonInfo],
- cache: PyInfoCache | None,
- path: Path,
- exe: str,
- env: Mapping[str, str],
- ) -> PythonInfo | Exception:
- path_text = str(path)
- try:
- path_modified = path.stat().st_mtime
- except OSError:
- path_modified = -1
- py_info_script = Path(Path(__file__).resolve()).parent / "_py_info.py"
- try:
- py_info_hash: str | None = hashlib.sha256(py_info_script.read_bytes()).hexdigest()
- except OSError:
- py_info_hash = None
- resolved_cache = cache if cache is not None else NoOpCache()
- py_info: PythonInfo | None = None
- py_info_store = resolved_cache.py_info(path)
- with py_info_store.locked():
- if py_info_store.exists() and (data := py_info_store.read()) is not None:
- of_path, of_st_mtime = data.get("path"), data.get("st_mtime")
- of_content, of_hash = data.get("content"), data.get("hash")
- if (
- of_path == path_text
- and of_st_mtime == path_modified
- and of_hash == py_info_hash
- and isinstance(of_content, dict)
- ):
- py_info = _load_cached_py_info(cls, py_info_store, of_content)
- else:
- py_info_store.remove()
- if py_info is None:
- failure, py_info = _run_subprocess(cls, exe, env)
- if failure is not None:
- _LOGGER.debug("first subprocess attempt failed for %s (%s), retrying", exe, failure)
- failure, py_info = _run_subprocess(cls, exe, env)
- if failure is not None:
- return failure
- if py_info is not None:
- py_info_store.write({
- "st_mtime": path_modified,
- "path": path_text,
- "content": py_info.to_dict(),
- "hash": py_info_hash,
- })
- if py_info is None:
- msg = f"{exe} failed to produce interpreter info"
- return RuntimeError(msg)
- return py_info
- def _load_cached_py_info(
- cls: type[PythonInfo],
- py_info_store: ContentStore,
- content: dict,
- ) -> PythonInfo | None:
- try:
- py_info = cls.from_dict(content.copy())
- except (KeyError, TypeError):
- py_info_store.remove()
- return None
- if (sys_exe := py_info.system_executable) is not None and not Path(sys_exe).exists():
- py_info_store.remove()
- return None
- return py_info
- COOKIE_LENGTH: Final[int] = 32
- def gen_cookie() -> str:
- return secrets.token_hex(COOKIE_LENGTH // 2)
- @contextmanager
- def _resolve_py_info_script() -> Generator[Path]:
- py_info_script = Path(Path(__file__).resolve()).parent / "_py_info.py"
- if py_info_script.is_file():
- yield py_info_script
- else:
- data = pkgutil.get_data(__package__ or __name__, "_py_info.py")
- if data is None:
- msg = "cannot locate _py_info.py for subprocess interrogation"
- raise FileNotFoundError(msg)
- fd, tmp = tempfile.mkstemp(suffix=".py")
- try:
- os.write(fd, data)
- os.close(fd)
- yield Path(tmp)
- finally:
- Path(tmp).unlink()
- def _extract_between_cookies(out: str, start_cookie: str, end_cookie: str) -> tuple[str, str, int, int]:
- """Extract payload between reversed cookie markers, forwarding any surrounding output to stdout."""
- raw_out = out
- out_starts = out.find(start_cookie[::-1])
- if out_starts > -1:
- if pre_cookie := out[:out_starts]:
- sys.stdout.write(pre_cookie)
- out = out[out_starts + COOKIE_LENGTH :]
- out_ends = out.find(end_cookie[::-1])
- if out_ends > -1:
- if post_cookie := out[out_ends + COOKIE_LENGTH :]:
- sys.stdout.write(post_cookie)
- out = out[:out_ends]
- return out, raw_out, out_starts, out_ends
- def _run_subprocess(
- cls: type[PythonInfo],
- exe: str,
- env: Mapping[str, str],
- ) -> tuple[Exception | None, PythonInfo | None]:
- start_cookie = gen_cookie()
- end_cookie = gen_cookie()
- timeout = float(env.get("PY_DISCOVERY_TIMEOUT", "15"))
- with _resolve_py_info_script() as py_info_script:
- cmd = [exe, str(py_info_script), start_cookie, end_cookie]
- env = dict(env)
- env.pop("__PYVENV_LAUNCHER__", None)
- env["PYTHONUTF8"] = "1"
- _LOGGER.debug("get interpreter info via cmd: %s", LogCmd(cmd))
- try:
- process = Popen( # noqa: S603
- cmd,
- universal_newlines=True,
- stdin=subprocess.PIPE,
- stderr=subprocess.PIPE,
- stdout=subprocess.PIPE,
- env=env,
- encoding="utf-8",
- errors="backslashreplace",
- )
- out, err = process.communicate(timeout=timeout)
- code = process.returncode
- except TimeoutExpired:
- process.kill()
- process.communicate()
- out, err, code = "", "timed out", -1
- except OSError as os_error:
- out, err, code = "", os_error.strerror, os_error.errno
- if code != 0:
- msg = f"{exe} with code {code}{f' out: {out!r}' if out else ''}{f' err: {err!r}' if err else ''}"
- return RuntimeError(f"failed to query {msg}"), None
- out, raw_out, out_starts, out_ends = _extract_between_cookies(out, start_cookie, end_cookie)
- try:
- result = cls.from_json(out)
- result.executable = exe
- except json.JSONDecodeError as exc:
- _LOGGER.warning(
- "subprocess %s returned invalid JSON; raw stdout %d chars, start cookie %s, end cookie %s, "
- "parsed output %d chars: %r",
- exe,
- len(raw_out),
- "found" if out_starts > -1 else "missing",
- "found" if out_ends > -1 else "missing",
- len(out),
- out[:200] if out else "<empty>",
- )
- msg = f"{exe} returned invalid JSON (exit code {code}){f', stderr: {err!r}' if err else ''}"
- failure = RuntimeError(msg)
- failure.__cause__ = exc
- return failure, None
- return None, result
- class LogCmd:
- def __init__(self, cmd: list[str], env: Mapping[str, str] | None = None) -> None:
- self.cmd = cmd
- self.env = env
- def __repr__(self) -> str:
- cmd_repr = " ".join(quote(str(c)) for c in self.cmd)
- if self.env is not None:
- cmd_repr = f"{cmd_repr} env of {self.env!r}"
- return cmd_repr
- def clear(cache: PyInfoCache) -> None:
- cache.py_info_clear()
- _CACHE.clear()
- __all__ = [
- "LogCmd",
- "clear",
- "from_exe",
- ]
|