| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458 |
- """Periodically update bundled versions."""
- from __future__ import annotations
- import json
- import logging
- import os
- import ssl
- import sys
- from datetime import datetime, timedelta, timezone
- from itertools import groupby
- from pathlib import Path
- from shutil import copy2
- from subprocess import DEVNULL, Popen
- from textwrap import dedent
- from threading import Thread
- from typing import TYPE_CHECKING
- from urllib.error import URLError
- from urllib.request import urlopen
- from virtualenv.app_data import AppDataDiskFolder
- from virtualenv.seed.wheels.embed import BUNDLE_SUPPORT
- from virtualenv.seed.wheels.util import Wheel
- from virtualenv.util.subprocess import CREATE_NO_WINDOW
- if TYPE_CHECKING:
- from collections.abc import Generator
- from virtualenv.app_data.base import AppData
- LOGGER = logging.getLogger(__name__)
- GRACE_PERIOD_CI = timedelta(hours=1) # prevent version switch in the middle of a CI run
- GRACE_PERIOD_MINOR = timedelta(days=28)
- UPDATE_PERIOD = timedelta(days=14)
- UPDATE_ABORTED_DELAY = timedelta(hours=1)
- def periodic_update( # noqa: PLR0913
- distribution: str,
- of_version: str | None,
- for_py_version: str,
- wheel: Wheel | None,
- search_dirs: list[Path],
- app_data: AppData,
- do_periodic_update: bool,
- env: dict[str, str],
- ) -> Wheel | None:
- if do_periodic_update:
- handle_auto_update(distribution, for_py_version, wheel, search_dirs, app_data, env)
- now = datetime.now(tz=timezone.utc)
- def _update_wheel(ver: NewVersion) -> Wheel:
- updated_wheel = Wheel(app_data.house / ver.filename)
- LOGGER.debug("using %supdated wheel %s", "periodically " if updated_wheel else "", updated_wheel)
- return updated_wheel
- u_log = UpdateLog.from_app_data(app_data, distribution, for_py_version)
- if of_version is None:
- for _, group in groupby(u_log.versions, key=lambda v: v.wheel.version_tuple[0:2]):
- # use only latest patch version per minor, earlier assumed to be buggy
- all_patches = list(group)
- ignore_grace_period_minor = any(version for version in all_patches if version.use(now))
- for version in all_patches:
- if wheel is not None and Path(version.filename).name == wheel.name:
- return wheel
- if version.use(now, ignore_grace_period_minor):
- return _update_wheel(version)
- else:
- for version in u_log.versions:
- if version.wheel.version == of_version:
- return _update_wheel(version)
- return wheel
- def handle_auto_update( # noqa: PLR0913
- distribution: str,
- for_py_version: str,
- wheel: Wheel | None,
- search_dirs: list[Path],
- app_data: AppData,
- env: dict[str, str],
- ) -> None:
- embed_update_log = app_data.embed_update_log(distribution, for_py_version)
- u_log = UpdateLog.from_dict(embed_update_log.read())
- if u_log.needs_update:
- u_log.periodic = True
- u_log.started = datetime.now(tz=timezone.utc)
- embed_update_log.write(u_log.to_dict())
- trigger_update(distribution, for_py_version, wheel, search_dirs, app_data, periodic=True, env=env)
- def add_wheel_to_update_log(wheel: Wheel, for_py_version: str, app_data: AppData) -> None:
- embed_update_log = app_data.embed_update_log(wheel.distribution, for_py_version)
- LOGGER.debug("adding %s information to %s", wheel.name, embed_update_log.file) # ty: ignore[unresolved-attribute]
- u_log = UpdateLog.from_dict(embed_update_log.read())
- if any(version.filename == wheel.name for version in u_log.versions):
- LOGGER.warning("%s already present in %s", wheel.name, embed_update_log.file) # ty: ignore[unresolved-attribute]
- return
- # we don't need a release date for sources other than "periodic"
- version = NewVersion(wheel.name, datetime.now(tz=timezone.utc), None, "download")
- u_log.versions.append(version) # always write at the end for proper updates
- embed_update_log.write(u_log.to_dict())
- DATETIME_FMT = "%Y-%m-%dT%H:%M:%S.%fZ"
- def dump_datetime(value: datetime | None) -> str | None:
- return None if value is None else value.strftime(DATETIME_FMT)
- def load_datetime(value: str | None) -> datetime | None:
- return None if value is None else datetime.strptime(value, DATETIME_FMT).replace(tzinfo=timezone.utc)
- class NewVersion: # noqa: PLW1641
- def __init__(self, filename: str, found_date: datetime, release_date: datetime | None, source: str) -> None:
- self.filename = filename
- self.found_date = found_date
- self.release_date = release_date
- self.source = source
- @classmethod
- def from_dict(cls, dictionary: dict[str, str | None]) -> NewVersion:
- return cls(
- filename=dictionary["filename"], # ty: ignore[invalid-argument-type]
- found_date=load_datetime(dictionary["found_date"]), # ty: ignore[invalid-argument-type]
- release_date=load_datetime(dictionary["release_date"]),
- source=dictionary["source"], # ty: ignore[invalid-argument-type]
- )
- def to_dict(self) -> dict[str, str | None]:
- return {
- "filename": self.filename,
- "release_date": dump_datetime(self.release_date),
- "found_date": dump_datetime(self.found_date),
- "source": self.source,
- }
- def use(self, now: datetime, ignore_grace_period_minor: bool = False, ignore_grace_period_ci: bool = False) -> bool: # noqa: FBT002
- if self.source == "manual":
- return True
- if self.source == "periodic" and (self.found_date < now - GRACE_PERIOD_CI or ignore_grace_period_ci):
- if not ignore_grace_period_minor:
- compare_from = self.release_date or self.found_date
- return now - compare_from >= GRACE_PERIOD_MINOR
- return True
- return False
- def __repr__(self) -> str:
- return (
- f"{self.__class__.__name__}(filename={self.filename}), found_date={self.found_date}, "
- f"release_date={self.release_date}, source={self.source})"
- )
- def __eq__(self, other: object) -> bool:
- return type(self) == type(other) and all( # noqa: E721
- getattr(self, k) == getattr(other, k) for k in ["filename", "release_date", "found_date", "source"]
- )
- def __ne__(self, other: object) -> bool:
- return not (self == other)
- @property
- def wheel(self) -> Wheel:
- return Wheel(Path(self.filename))
- class UpdateLog:
- def __init__(
- self, started: datetime | None, completed: datetime | None, versions: list[NewVersion], periodic: bool | None
- ) -> None:
- self.started = started
- self.completed = completed
- self.versions = versions
- self.periodic = periodic
- @classmethod
- def from_dict(cls, dictionary: dict[str, object] | None) -> UpdateLog:
- if dictionary is None:
- dictionary = {}
- return cls(
- load_datetime(dictionary.get("started")), # ty: ignore[invalid-argument-type]
- load_datetime(dictionary.get("completed")), # ty: ignore[invalid-argument-type]
- [NewVersion.from_dict(v) for v in dictionary.get("versions", [])], # ty: ignore[not-iterable]
- dictionary.get("periodic"), # ty: ignore[invalid-argument-type]
- )
- @classmethod
- def from_app_data(cls, app_data: AppData, distribution: str, for_py_version: str) -> UpdateLog:
- raw_json = app_data.embed_update_log(distribution, for_py_version).read()
- return cls.from_dict(raw_json)
- def to_dict(self) -> dict[str, object]:
- return {
- "started": dump_datetime(self.started),
- "completed": dump_datetime(self.completed),
- "periodic": self.periodic,
- "versions": [r.to_dict() for r in self.versions],
- }
- @property
- def needs_update(self) -> bool:
- now = datetime.now(tz=timezone.utc)
- if self.completed is None: # never completed
- return self._check_start(now)
- if now - self.completed <= UPDATE_PERIOD:
- return False
- return self._check_start(now)
- def _check_start(self, now: datetime) -> bool:
- return self.started is None or now - self.started > UPDATE_ABORTED_DELAY
- def trigger_update( # noqa: PLR0913
- distribution: str,
- for_py_version: str,
- wheel: Wheel | None,
- search_dirs: list[Path],
- app_data: AppData,
- env: dict[str, str],
- periodic: bool,
- ) -> None:
- wheel_path = None if wheel is None else str(wheel.path)
- cmd = [
- sys.executable,
- "-c",
- dedent(
- """
- from virtualenv.report import setup_report, MAX_LEVEL
- from virtualenv.seed.wheels.periodic_update import do_update
- setup_report(MAX_LEVEL, show_pid=True)
- do_update({!r}, {!r}, {!r}, {!r}, {!r}, {!r})
- """,
- )
- .strip()
- .format(distribution, for_py_version, wheel_path, str(app_data), [str(p) for p in search_dirs], periodic),
- ]
- debug = env.get("_VIRTUALENV_PERIODIC_UPDATE_INLINE") == "1"
- pipe = None if debug else DEVNULL
- kwargs = {"stdout": pipe, "stderr": pipe}
- if not debug and sys.platform == "win32":
- kwargs["creationflags"] = CREATE_NO_WINDOW
- process = Popen(cmd, **kwargs) # ty: ignore[no-matching-overload]
- LOGGER.info(
- "triggered periodic upgrade of %s%s (for python %s) via background process having PID %d",
- distribution,
- "" if wheel is None else f"=={wheel.version}",
- for_py_version,
- process.pid,
- )
- if debug:
- process.communicate() # on purpose not called to make it a background process
- else:
- # set the returncode here -> no ResourceWarning on main process exit if the subprocess still runs
- process.returncode = 0
- def do_update( # noqa: PLR0913
- distribution: str,
- for_py_version: str,
- embed_filename: str | None,
- app_data: str | AppData,
- search_dirs: list[str] | list[Path],
- periodic: bool,
- ) -> list[NewVersion] | None:
- versions = None
- try:
- versions = _run_do_update(app_data, distribution, embed_filename, for_py_version, periodic, search_dirs)
- finally:
- LOGGER.debug("done %s %s with %s", distribution, for_py_version, versions)
- return versions
- def _run_do_update( # noqa: C901, PLR0913
- app_data: str | AppData,
- distribution: str,
- embed_filename: str | None,
- for_py_version: str,
- periodic: bool,
- search_dirs: list[str] | list[Path],
- ) -> list[NewVersion]:
- from virtualenv.seed.wheels import acquire # noqa: PLC0415
- wheel_filename = None if embed_filename is None else Path(embed_filename)
- embed_version = None if wheel_filename is None else Wheel(wheel_filename).version_tuple
- app_data = AppDataDiskFolder(app_data) if isinstance(app_data, str) else app_data
- search_dirs = [Path(p) if isinstance(p, str) else p for p in search_dirs]
- wheelhouse = app_data.house
- embed_update_log = app_data.embed_update_log(distribution, for_py_version)
- u_log = UpdateLog.from_dict(embed_update_log.read())
- now = datetime.now(tz=timezone.utc)
- update_versions, other_versions = [], []
- for version in u_log.versions:
- if version.source in {"periodic", "manual"}:
- update_versions.append(version)
- else:
- other_versions.append(version)
- if periodic:
- source = "periodic"
- else:
- source = "manual"
- # mark the most recent one as source "manual"
- if update_versions:
- update_versions[0].source = source
- if wheel_filename is not None:
- dest = wheelhouse / wheel_filename.name
- if not dest.exists():
- copy2(str(wheel_filename), str(wheelhouse))
- last, last_version, versions, filenames = None, None, [], set()
- while last is None or not last.use(now, ignore_grace_period_ci=True):
- download_time = datetime.now(tz=timezone.utc)
- dest = acquire.download_wheel(
- distribution=distribution,
- version_spec=None if last_version is None else f"<{last_version}",
- for_py_version=for_py_version,
- search_dirs=search_dirs,
- app_data=app_data,
- to_folder=wheelhouse,
- env=os.environ, # ty: ignore[invalid-argument-type]
- )
- if dest is None or (update_versions and update_versions[0].filename == dest.name):
- break
- release_date = release_date_for_wheel_path(dest.path)
- last = NewVersion(filename=dest.path.name, release_date=release_date, found_date=download_time, source=source)
- LOGGER.info("detected %s in %s", last, datetime.now(tz=timezone.utc) - download_time)
- versions.append(last)
- filenames.add(last.filename)
- last_wheel = last.wheel
- last_version = last_wheel.version
- if embed_version is not None and embed_version >= last_wheel.version_tuple:
- break # stop download if we reach the embed version
- u_log.periodic = periodic
- if not u_log.periodic:
- u_log.started = now
- # update other_versions by removing version we just found
- other_versions = [version for version in other_versions if version.filename not in filenames]
- u_log.versions = versions + update_versions + other_versions
- u_log.completed = datetime.now(tz=timezone.utc)
- embed_update_log.write(u_log.to_dict())
- return versions
- def release_date_for_wheel_path(dest: Path) -> datetime | None:
- wheel = Wheel(dest)
- # the most accurate is to ask PyPi - e.g. https://pypi.org/pypi/pip/json,
- # see https://warehouse.pypa.io/api-reference/json/ for more details
- content = _pypi_get_distribution_info_cached(wheel.distribution)
- if content is not None:
- try:
- upload_time = content["releases"][wheel.version][0]["upload_time"] # ty: ignore[not-subscriptable]
- return datetime.strptime(upload_time, "%Y-%m-%dT%H:%M:%S").replace(tzinfo=timezone.utc)
- except Exception as exception: # noqa: BLE001
- LOGGER.error("could not load release date %s because %r", content, exception) # noqa: TRY400
- return None
- def _request_context() -> Generator[ssl.SSLContext | None, None, None]:
- yield None
- # fallback to non verified HTTPS (the information we request is not sensitive, so fallback)
- yield ssl._create_unverified_context() # noqa: S323, SLF001
- _PYPI_CACHE = {}
- def _pypi_get_distribution_info_cached(distribution: str) -> dict[str, object] | None:
- if distribution not in _PYPI_CACHE:
- _PYPI_CACHE[distribution] = _pypi_get_distribution_info(distribution)
- return _PYPI_CACHE[distribution]
- def _pypi_get_distribution_info(distribution: str) -> dict[str, object] | None:
- content, url = None, f"https://pypi.org/pypi/{distribution}/json"
- try:
- for context in _request_context():
- try:
- with urlopen(url, context=context) as file_handler:
- content = json.load(file_handler)
- break
- except URLError as exception:
- LOGGER.error("failed to access %s because %r", url, exception) # noqa: TRY400
- except Exception as exception: # noqa: BLE001
- LOGGER.error("failed to access %s because %r", url, exception) # noqa: TRY400
- return content
- def manual_upgrade(app_data: AppData, env: dict[str, str]) -> None:
- threads = []
- for for_py_version, distribution_to_package in BUNDLE_SUPPORT.items():
- # load extra search dir for the given for_py
- for distribution in distribution_to_package:
- thread = Thread(target=_run_manual_upgrade, args=(app_data, distribution, for_py_version, env))
- thread.start()
- threads.append(thread)
- for thread in threads:
- thread.join()
- def _run_manual_upgrade(app_data: AppData, distribution: str, for_py_version: str, env: dict[str, str]) -> None:
- start = datetime.now(tz=timezone.utc)
- from .bundle import from_bundle # noqa: PLC0415
- current = from_bundle(
- distribution=distribution,
- version=None,
- for_py_version=for_py_version,
- search_dirs=[],
- app_data=app_data,
- do_periodic_update=False,
- env=env,
- )
- LOGGER.warning(
- "upgrade %s for python %s with current %s",
- distribution,
- for_py_version,
- "" if current is None else current.name,
- )
- versions = do_update(
- distribution=distribution,
- for_py_version=for_py_version,
- embed_filename=current.path, # ty: ignore[invalid-argument-type, unresolved-attribute]
- app_data=app_data,
- search_dirs=[],
- periodic=False,
- )
- args = [
- distribution,
- for_py_version,
- datetime.now(tz=timezone.utc) - start,
- ]
- if versions:
- args.append("\n".join(f"\t{v}" for v in versions))
- ver_update = "new entries found:\n%s" if versions else "no new versions found"
- msg = f"upgraded %s for python %s in %s {ver_update}"
- LOGGER.warning(msg, *args)
- __all__ = [
- "NewVersion",
- "UpdateLog",
- "add_wheel_to_update_log",
- "do_update",
- "dump_datetime",
- "load_datetime",
- "manual_upgrade",
- "periodic_update",
- "release_date_for_wheel_path",
- "trigger_update",
- ]
|