| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345 |
- """Util class to install packages via uv.
- """
- import asyncio
- import hashlib
- import json
- import logging
- import os
- import shutil
- import sys
- from asyncio import create_task, get_running_loop
- from typing import Dict, List, Optional
- from ray._common.utils import try_to_create_directory
- from ray._private.runtime_env import dependency_utils, virtualenv_utils
- from ray._private.runtime_env.packaging import Protocol, parse_uri
- from ray._private.runtime_env.plugin import RuntimeEnvPlugin
- from ray._private.runtime_env.utils import check_output_cmd
- from ray._private.utils import get_directory_size_bytes
- default_logger = logging.getLogger(__name__)
def _get_uv_hash(uv_dict: Dict) -> str:
    """Compute a stable SHA-1 hex digest for a `uv` runtime env spec.

    The spec is serialized to JSON with sorted keys before hashing, so two
    dicts with equal contents yield the same digest regardless of the order
    in which their keys were inserted.
    """
    canonical_json = json.dumps(uv_dict, sort_keys=True)
    digest = hashlib.sha1(canonical_json.encode("utf-8"))
    return digest.hexdigest()
def get_uri(runtime_env: Dict) -> Optional[str]:
    """Build the `"uv://<hashed_dependencies>"` URI for a runtime env.

    Returns None when the runtime env has no `uv` field (no GC required).
    A dict-valued `uv` field is hashed as-is; a list-valued one is hashed as
    `{"packages": <list>}`. Any other type raises TypeError.
    """
    uv_spec = runtime_env.get("uv")
    if uv_spec is None:
        return None
    if isinstance(uv_spec, dict):
        return "uv://" + _get_uv_hash(uv_dict=uv_spec)
    if isinstance(uv_spec, list):
        return "uv://" + _get_uv_hash(uv_dict=dict(packages=uv_spec))
    raise TypeError(
        "uv field received by RuntimeEnvAgent must be "
        f"list or dict, not {type(uv_spec).__name__}."
    )
class UvProcessor:
    """Awaitable that installs `uv` packages into a virtualenv.

    Awaiting an instance runs `_run()`, which obtains the virtualenv under
    `target_dir` via `virtualenv_utils.create_or_get_virtualenv`, makes sure
    `uv` is importable there, installs the packages listed in the runtime
    env's uv config and, when `uv_check` is enabled, runs `uv pip check`.
    """

    def __init__(
        self,
        target_dir: str,
        runtime_env: "RuntimeEnv",  # noqa: F821
        logger: Optional[logging.Logger] = default_logger,
    ):
        """Capture the install target and the environment used for commands.

        Args:
            target_dir: Directory that holds the virtualenv.
            runtime_env: Runtime env providing `uv_config()` and `env_vars()`.
            logger: Logger for progress output; `None` falls back to the
                module-level `default_logger`.

        Raises:
            RuntimeError: If the `virtualenv` package cannot be imported.
        """
        try:
            import virtualenv  # noqa: F401 ensure virtualenv exists.
        except ImportError as e:
            raise RuntimeError(
                "Please install virtualenv "
                f"`{sys.executable} -m pip install virtualenv` "
                "to enable uv runtime env."
            ) from e

        # The annotation allows None, so honor it instead of crashing on the
        # first logging call.
        if logger is None:
            logger = default_logger

        logger.debug("Setting up uv for runtime_env: %s", runtime_env)
        self._target_dir = target_dir
        # An empty directory is created to execute cmd.
        self._exec_cwd = os.path.join(self._target_dir, "exec_cwd")
        self._runtime_env = runtime_env
        self._logger = logger

        self._uv_config = self._runtime_env.uv_config()
        # Commands run with the current process environment overlaid with the
        # runtime env's own variables.
        self._uv_env = os.environ.copy()
        self._uv_env.update(self._runtime_env.env_vars())

    async def _install_uv(
        self, path: str, cwd: str, pip_env: dict, logger: logging.Logger
    ):
        """Install `uv` (pinned to `uv_version` if specified) with pip.

        Args:
            path: Directory that holds the virtualenv.
            cwd: Working directory for the pip command.
            pip_env: Environment variables for the pip command.
            logger: Logger for progress and command output.
        """
        virtualenv_path = virtualenv_utils.get_virtualenv_path(path)
        python = virtualenv_utils.get_virtualenv_python(path)

        def _get_uv_exec_to_install() -> str:
            """Get `uv` executable with version to install."""
            uv_version = self._uv_config.get("uv_version", None)
            if uv_version:
                # The version string is appended verbatim to the package name.
                return f"uv{uv_version}"
            # Use default version.
            return "uv"

        uv_install_cmd = [
            python,
            "-m",
            "pip",
            "install",
            "--disable-pip-version-check",
            "--no-cache-dir",
            _get_uv_exec_to_install(),
        ]
        logger.info("Installing package uv to %s", virtualenv_path)
        await check_output_cmd(uv_install_cmd, logger=logger, cwd=cwd, env=pip_env)

    async def _check_uv_existence(
        self, path: str, cwd: str, env: dict, logger: logging.Logger
    ) -> bool:
        """Check and return the existence of `uv` in virtual env.

        Runs `python -m uv version` with the virtualenv's interpreter and
        reports whether the command completed without raising.
        """
        python = virtualenv_utils.get_virtualenv_python(path)

        check_existence_cmd = [
            python,
            "-m",
            "uv",
            "version",
        ]

        try:
            # If `uv` doesn't exist, exception will be thrown.
            await check_output_cmd(check_existence_cmd, logger=logger, cwd=cwd, env=env)
            return True
        except Exception:
            # Deliberate best-effort probe: any failure means "not usable".
            return False

    async def _uv_check(self, python: str, cwd: str, logger: logging.Logger) -> None:
        """Check virtual env dependency compatibility.

        If any incompatibility detected, exception will be thrown.

        Args:
            python: The path for python executable within virtual environment.
            cwd: Working directory for the check command.
            logger: Logger for command output.
        """
        cmd = [python, "-m", "uv", "pip", "check"]
        await check_output_cmd(
            cmd,
            logger=logger,
            cwd=cwd,
        )

    async def _install_uv_packages(
        self,
        path: str,
        uv_packages: List[str],
        cwd: str,
        pip_env: Dict,
        logger: logging.Logger,
    ):
        """Install required python packages via `uv`.

        Args:
            path: Directory that holds the virtualenv.
            uv_packages: Requirement strings to install.
            cwd: Working directory for the executed commands.
            pip_env: Environment variables for the executed commands.
            logger: Logger for progress and command output.
        """
        virtualenv_path = virtualenv_utils.get_virtualenv_path(path)
        python = virtualenv_utils.get_virtualenv_python(path)
        # TODO(fyrestone): Support -i, --no-deps, --no-cache-dir, ...
        requirements_file = dependency_utils.get_requirements_file(path, uv_packages)

        # Check existence for `uv` and see if we could skip `uv` installation.
        uv_exists = await self._check_uv_existence(path, cwd, pip_env, logger)

        # Install uv, which acts as the default package manager. An explicit
        # `uv_version` always triggers an install, even if some uv exists.
        if (not uv_exists) or (self._uv_config.get("uv_version", None) is not None):
            await self._install_uv(path, cwd, pip_env, logger)

        # Avoid blocking the event loop.
        loop = get_running_loop()
        await loop.run_in_executor(
            None, dependency_utils.gen_requirements_txt, requirements_file, uv_packages
        )

        # Install all dependencies.
        #
        # Difference with pip:
        # 1. `--disable-pip-version-check` has no effect for uv.
        uv_install_cmd = [
            python,
            "-m",
            "uv",
            "pip",
            "install",
            "-r",
            requirements_file,
        ]

        # `--no-cache` is only the default; user-supplied options replace it.
        uv_opt_list = self._uv_config.get("uv_pip_install_options", ["--no-cache"])
        if uv_opt_list:
            uv_install_cmd += uv_opt_list

        logger.info("Installing python requirements to %s", virtualenv_path)
        await check_output_cmd(uv_install_cmd, logger=logger, cwd=cwd, env=pip_env)

        # Check python environment for conflicts.
        if self._uv_config.get("uv_check", False):
            await self._uv_check(python, cwd, logger)

    async def _run(self):
        """Create the virtualenv and install the configured packages.

        On any `Exception` the target directory is removed before the error
        is re-raised, so no incomplete virtualenv is left behind.
        """
        path = self._target_dir
        logger = self._logger
        uv_packages = self._uv_config["packages"]
        # We create an empty directory for exec cmd so that the cmd will
        # run more stable. e.g. if cwd has ray, then checking ray will
        # look up ray in cwd instead of site packages.
        os.makedirs(self._exec_cwd, exist_ok=True)
        try:
            await virtualenv_utils.create_or_get_virtualenv(
                path, self._exec_cwd, logger
            )
            python = virtualenv_utils.get_virtualenv_python(path)
            async with dependency_utils.check_ray(python, self._exec_cwd, logger):
                # Install packages with uv.
                await self._install_uv_packages(
                    path,
                    uv_packages,
                    self._exec_cwd,
                    self._uv_env,
                    logger,
                )
        except Exception:
            logger.info("Delete incomplete virtualenv: %s", path)
            shutil.rmtree(path, ignore_errors=True)
            logger.exception("Failed to install uv packages.")
            raise

    def __await__(self):
        """Make the processor awaitable by delegating to `_run()`."""
        return self._run().__await__()
class UvPlugin(RuntimeEnvPlugin):
    """Runtime env plugin that manages `uv` virtualenvs keyed by URI hash.

    Each environment lives in `<resources_dir>/uv/<hash>`, where `<hash>` is
    the hash part of a `uv://<hash>` URI.
    """

    name = "uv"

    def __init__(self, resources_dir: str):
        """Set up bookkeeping and create the `uv` resource directory.

        Args:
            resources_dir: Parent directory; environments are stored in its
                `uv` subdirectory.
        """
        self._uv_resource_dir = os.path.join(resources_dir, "uv")
        # Key: hash. Value: the in-flight asyncio task creating that env.
        self._creating_task = {}
        # Maps a URI to a lock that is used to prevent multiple concurrent
        # installs of the same virtualenv, see #24513
        self._create_locks: Dict[str, asyncio.Lock] = {}
        # Key: created hashes. Value: size of the uv dir.
        self._created_hash_bytes: Dict[str, int] = {}
        try_to_create_directory(self._uv_resource_dir)

    def _get_path_from_hash(self, hash_val: str) -> str:
        """Generate a path from the hash of a uv spec.

        The result is `hash_val` joined onto the uv resource directory.

        Example output:
            /tmp/ray/session_2021-11-03_16-33-59_356303_41018/runtime_resources
            /uv/9a7972c3a75f55e976e620484f58410c920db091
        """
        return os.path.join(self._uv_resource_dir, hash_val)

    def get_uris(self, runtime_env: "RuntimeEnv") -> List[str]:  # noqa: F821
        """Return the uv URI from the RuntimeEnv if it exists, else return []."""
        uv_uri = runtime_env.uv_uri()
        if uv_uri:
            return [uv_uri]
        return []

    def delete_uri(
        self, uri: str, logger: Optional[logging.Logger] = default_logger
    ) -> int:
        """Delete URI and return the number of bytes deleted.

        Any in-flight creation task for the URI is cancelled first.

        Args:
            uri: A `uv://<hash>` URI.
            logger: Logger for progress and warnings.

        Returns:
            The directory size measured before removal, or 0 if removing the
            directory raised `OSError`.

        Raises:
            ValueError: If the URI's protocol is not `uv`.
        """
        logger.info("Got request to delete uv URI %s", uri)
        protocol, hash_val = parse_uri(uri)
        if protocol != Protocol.UV:
            raise ValueError(
                "UvPlugin can only delete URIs with protocol "
                f"uv. Received protocol {protocol}, URI {uri}"
            )

        # Cancel running create task.
        task = self._creating_task.pop(hash_val, None)
        if task is not None:
            task.cancel()

        # The size entry is only recorded once `create` has finished, so it
        # is absent when creation was just cancelled or failed. Pop it
        # tolerantly instead of raising KeyError.
        self._created_hash_bytes.pop(hash_val, None)

        uv_env_path = self._get_path_from_hash(hash_val)
        local_dir_size = get_directory_size_bytes(uv_env_path)
        # The lock only exists if `create` was ever called for this URI.
        self._create_locks.pop(uri, None)
        try:
            shutil.rmtree(uv_env_path)
        except OSError as e:
            logger.warning("Error when deleting uv env %s: %s", uv_env_path, e)
            return 0

        return local_dir_size

    async def create(
        self,
        uri: str,
        runtime_env: "RuntimeEnv",  # noqa: F821
        context: "RuntimeEnvContext",  # noqa: F821
        logger: Optional[logging.Logger] = default_logger,
    ) -> int:
        """Create the uv environment for `uri` and return its size in bytes.

        Returns 0 immediately when the runtime env has no `uv` field. A
        per-URI lock serializes concurrent calls, and a hash that was already
        created returns its recorded size without reinstalling.
        """
        if not runtime_env.has_uv():
            return 0

        _, hash_val = parse_uri(uri)
        target_dir = self._get_path_from_hash(hash_val)

        async def _create_for_hash():
            await UvProcessor(
                target_dir,
                runtime_env,
                logger,
            )

            # Measure the directory off the event loop thread.
            loop = get_running_loop()
            return await loop.run_in_executor(
                None, get_directory_size_bytes, target_dir
            )

        if uri not in self._create_locks:
            # async lock to prevent the same virtualenv being concurrently installed
            self._create_locks[uri] = asyncio.Lock()

        async with self._create_locks[uri]:
            if hash_val in self._created_hash_bytes:
                return self._created_hash_bytes[hash_val]
            # Track the task so `delete_uri` can cancel it while it runs.
            self._creating_task[hash_val] = task = create_task(_create_for_hash())
            task.add_done_callback(lambda _: self._creating_task.pop(hash_val, None))
            uv_dir_bytes = await task
            self._created_hash_bytes[hash_val] = uv_dir_bytes
            return uv_dir_bytes

    def modify_context(
        self,
        uris: List[str],
        runtime_env: "RuntimeEnv",  # noqa: F821
        context: "RuntimeEnvContext",  # noqa: F821
        logger: logging.Logger = default_logger,
    ):
        """Point `context` at the virtualenv created for the uv URI.

        Does nothing when the runtime env has no `uv` field. Otherwise sets
        `context.py_executable` to the virtualenv's python and appends the
        virtualenv activate command to `context.command_prefix`.

        Raises:
            ValueError: If the virtualenv's python executable does not exist.
        """
        if not runtime_env.has_uv():
            return
        # UvPlugin only uses a single URI.
        uri = uris[0]
        # Update py_executable.
        _, hash_val = parse_uri(uri)
        target_dir = self._get_path_from_hash(hash_val)
        virtualenv_python = virtualenv_utils.get_virtualenv_python(target_dir)

        if not os.path.exists(virtualenv_python):
            raise ValueError(
                f"Local directory {target_dir} for URI {uri} does "
                "not exist on the cluster. Something may have gone wrong while "
                "installing the runtime_env `uv` packages."
            )

        context.py_executable = virtualenv_python
        context.command_prefix += virtualenv_utils.get_virtualenv_activate_command(
            target_dir
        )
|