| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344 |
- import asyncio
- import hashlib
- import json
- import logging
- import os
- import shutil
- import sys
- from asyncio import create_task, get_running_loop
- from typing import Dict, List, Optional
- from ray._common.utils import try_to_create_directory
- from ray._private.runtime_env import dependency_utils, virtualenv_utils
- from ray._private.runtime_env.packaging import Protocol, parse_uri
- from ray._private.runtime_env.plugin import RuntimeEnvPlugin
- from ray._private.runtime_env.utils import check_output_cmd
- from ray._private.utils import get_directory_size_bytes
# Module-wide fallback logger: the functions and methods below take an
# optional `logger` argument and use this one when the caller passes none.
default_logger = logging.getLogger(name=__name__)
- def _get_pip_hash(pip_dict: Dict) -> str:
- serialized_pip_spec = json.dumps(pip_dict, sort_keys=True)
- hash_val = hashlib.sha1(serialized_pip_spec.encode("utf-8")).hexdigest()
- return hash_val
def get_uri(runtime_env: Dict) -> Optional[str]:
    """Return `"pip://<hashed_dependencies>"`, or None if no GC required.

    A dict-valued ``pip`` field is hashed as-is; a list-valued one is hashed
    as ``{"packages": <list>}``. A missing or ``None`` field yields ``None``.

    Raises:
        TypeError: If the ``pip`` field is neither a list nor a dict.
    """
    pip = runtime_env.get("pip")
    if pip is None:
        return None

    if isinstance(pip, dict):
        spec = pip
    elif isinstance(pip, list):
        spec = dict(packages=pip)
    else:
        raise TypeError(
            "pip field received by RuntimeEnvAgent must be "
            f"list or dict, not {type(pip).__name__}."
        )
    return "pip://" + _get_pip_hash(pip_dict=spec)
class PipProcessor:
    """Build a virtualenv for the ``pip`` field of a runtime_env.

    Instances are awaitable: ``await PipProcessor(target_dir, runtime_env)``
    runs ``_run``. That method sets up a virtualenv under ``target_dir``
    through ``virtualenv_utils.create_or_get_virtualenv``. It then optionally
    reinstalls pip at a pinned version, installs the requested packages, and
    optionally runs ``pip check``. If any step raises an ``Exception``,
    ``target_dir`` is removed and the exception is re-raised.
    """

    def __init__(
        self,
        target_dir: str,
        runtime_env: "RuntimeEnv",  # noqa: F821
        logger: Optional[logging.Logger] = default_logger,
    ):
        """Capture the install configuration; no pip command is run here.

        Args:
            target_dir: Directory in which the virtualenv is built.
            runtime_env: Source of the pip configuration (``pip_config()``)
                and of extra environment variables (``env_vars()``) for the
                pip subprocesses.
            logger: Logger for progress and error messages.

        Raises:
            RuntimeError: If the ``virtualenv`` package cannot be imported.
        """
        try:
            import virtualenv  # noqa: F401 ensure virtualenv exists.
        except ImportError as e:
            raise RuntimeError(
                "Please install virtualenv "
                f"`{sys.executable} -m pip install virtualenv` "
                "to enable pip runtime env."
            ) from e

        logger.debug("Setting up pip for runtime_env: %s", runtime_env)
        self._target_dir = target_dir
        self._runtime_env = runtime_env
        self._logger = logger
        self._pip_config = self._runtime_env.pip_config()
        # pip subprocesses inherit this process's environment, overlaid with
        # the env_vars requested by the runtime_env.
        self._pip_env = os.environ.copy()
        self._pip_env.update(self._runtime_env.env_vars())

    @classmethod
    async def _ensure_pip_version(
        cls,
        path: str,
        pip_version: Optional[str],
        cwd: str,
        pip_env: Dict,
        logger: logging.Logger,
    ):
        """Run the pip command to reinstall pip to the specified version.

        ``pip_version`` is appended verbatim to ``"pip"`` to form the
        requirement, so it is expected to be a version specifier (for example
        ``"==23.0.1"``). Does nothing when ``pip_version`` is empty or None.
        """
        if not pip_version:
            return

        python = virtualenv_utils.get_virtualenv_python(path)
        pip_reinstall_cmd = [
            python,
            "-m",
            "pip",
            "install",
            "--disable-pip-version-check",
            f"pip{pip_version}",
        ]
        logger.info("Installing pip with version %s", pip_version)
        await check_output_cmd(pip_reinstall_cmd, logger=logger, cwd=cwd, env=pip_env)

    async def _pip_check(
        self,
        path: str,
        pip_check: bool,
        cwd: str,
        pip_env: Dict,
        logger: logging.Logger,
    ):
        """Run the pip check command to check python dependency conflicts.

        If conflicts exist, the exit code of pip check will be non-zero.
        Skipped entirely when ``pip_check`` is falsy.
        """
        if not pip_check:
            logger.info("Skip pip check.")
            return

        python = virtualenv_utils.get_virtualenv_python(path)
        await check_output_cmd(
            [python, "-m", "pip", "check", "--disable-pip-version-check"],
            logger=logger,
            cwd=cwd,
            env=pip_env,
        )
        logger.info("Pip check on %s successfully.", path)

    async def _install_pip_packages(
        self,
        path: str,
        pip_packages: List[str],
        cwd: str,
        pip_env: Dict,
        logger: logging.Logger,
    ):
        """Write ``pip_packages`` to a requirements file and ``pip install -r`` it.

        The install options come from the ``pip_install_options`` entry of
        the pip config. When that entry is absent, the defaults are
        ``--disable-pip-version-check`` and ``--no-cache-dir``.
        """
        virtualenv_path = virtualenv_utils.get_virtualenv_path(path)
        python = virtualenv_utils.get_virtualenv_python(path)
        # TODO(fyrestone): Support -i, --no-deps, --no-cache-dir, ...
        pip_requirements_file = dependency_utils.get_requirements_file(
            path, pip_packages
        )

        # Writing the requirements file is blocking file I/O; run it in the
        # default executor to avoid blocking the event loop.
        loop = get_running_loop()
        await loop.run_in_executor(
            None,
            dependency_utils.gen_requirements_txt,
            pip_requirements_file,
            pip_packages,
        )

        # Install all dependencies
        # The default options for pip install are
        #
        # --disable-pip-version-check
        #   Don't periodically check PyPI to determine whether a new version
        #   of pip is available for download.
        #
        # --no-cache-dir
        #   Disable the cache, the pip runtime env is a one-time installation,
        #   and we don't need to handle the pip cache broken.
        #
        # Allow users to specify their own options to install packages via `pip`.
        pip_install_cmd = [
            python,
            "-m",
            "pip",
            "install",
            "-r",
            pip_requirements_file,
        ]
        pip_opt_list = self._pip_config.get(
            "pip_install_options", ["--disable-pip-version-check", "--no-cache-dir"]
        )
        pip_install_cmd.extend(pip_opt_list)
        logger.info("Installing python requirements to %s", virtualenv_path)

        await check_output_cmd(pip_install_cmd, logger=logger, cwd=cwd, env=pip_env)

    async def _run(self):
        """Perform the full install into ``self._target_dir``.

        Raises whatever the underlying steps raise. Before re-raising an
        ``Exception``, the partially built target directory is deleted.
        """
        path = self._target_dir
        logger = self._logger
        pip_packages = self._pip_config["packages"]
        # We create an empty directory for exec cmd so that the cmd will
        # run more stable. e.g. if cwd has ray, then checking ray will
        # look up ray in cwd instead of site packages.
        exec_cwd = os.path.join(path, "exec_cwd")
        os.makedirs(exec_cwd, exist_ok=True)
        try:
            await virtualenv_utils.create_or_get_virtualenv(path, exec_cwd, logger)
            python = virtualenv_utils.get_virtualenv_python(path)
            # NOTE(review): check_ray presumably verifies that ray stays
            # usable in the virtualenv across the installs below; confirm in
            # dependency_utils.
            async with dependency_utils.check_ray(python, exec_cwd, logger):
                # Ensure pip version.
                await self._ensure_pip_version(
                    path,
                    self._pip_config.get("pip_version", None),
                    exec_cwd,
                    self._pip_env,
                    logger,
                )
                # Install pip packages.
                await self._install_pip_packages(
                    path,
                    pip_packages,
                    exec_cwd,
                    self._pip_env,
                    logger,
                )
                # Check python environment for conflicts.
                await self._pip_check(
                    path,
                    self._pip_config.get("pip_check", False),
                    exec_cwd,
                    self._pip_env,
                    logger,
                )
        except Exception:
            logger.info("Delete incomplete virtualenv: %s", path)
            shutil.rmtree(path, ignore_errors=True)
            logger.exception("Failed to install pip packages.")
            raise

    def __await__(self):
        """Make the processor awaitable by delegating to ``_run``."""
        return self._run().__await__()
class PipPlugin(RuntimeEnvPlugin):
    """Runtime env plugin that materializes the ``pip`` field as a virtualenv.

    Each pip spec is identified by the hash in its ``pip://`` URI. The spec is
    installed once under ``<resources_dir>/pip/<hash>``, and the resulting
    directory size is cached, so later ``create`` calls for the same hash
    return immediately.
    """

    name = "pip"

    def __init__(self, resources_dir: str):
        self._pip_resources_dir = os.path.join(resources_dir, "pip")
        # Key: hash of a pip spec. Value: its in-flight creation task.
        self._creating_task: Dict[str, asyncio.Task] = {}
        # Maps a URI to a lock that is used to prevent multiple concurrent
        # installs of the same virtualenv, see #24513
        self._create_locks: Dict[str, asyncio.Lock] = {}
        # Key: created hashes. Value: size of the pip dir.
        self._created_hash_bytes: Dict[str, int] = {}
        try_to_create_directory(self._pip_resources_dir)

    def _get_path_from_hash(self, hash_val: str) -> str:
        """Generate a path from the hash of a pip spec.

        Example output:
            /tmp/ray/session_2021-11-03_16-33-59_356303_41018/runtime_resources
                /pip/ray-9a7972c3a75f55e976e620484f58410c920db091
        """
        return os.path.join(self._pip_resources_dir, hash_val)

    def get_uris(self, runtime_env: "RuntimeEnv") -> List[str]:  # noqa: F821
        """Return the pip URI from the RuntimeEnv if it exists, else return []."""
        pip_uri = runtime_env.pip_uri()
        if pip_uri:
            return [pip_uri]
        return []

    def delete_uri(
        self, uri: str, logger: Optional[logging.Logger] = default_logger
    ) -> int:
        """Delete the virtualenv for ``uri`` and return the number of bytes deleted.

        Any in-flight creation for the same hash is cancelled first. The
        bookkeeping for the URI is dropped only if present, so deleting a URI
        whose creation has not completed is tolerated.

        Returns:
            The directory size measured before removal, or 0 if removing the
            directory raised ``OSError``.

        Raises:
            ValueError: If ``uri`` does not use the ``pip`` protocol.
        """
        logger.info("Got request to delete pip URI %s", uri)
        protocol, hash_val = parse_uri(uri)
        if protocol != Protocol.PIP:
            raise ValueError(
                "PipPlugin can only delete URIs with protocol "
                f"pip. Received protocol {protocol}, URI {uri}"
            )

        # Cancel running create task.
        task = self._creating_task.pop(hash_val, None)
        if task is not None:
            task.cancel()

        # pop() rather than del: if the task above was still running (or the
        # URI was never created), the size was never recorded.
        self._created_hash_bytes.pop(hash_val, None)

        pip_env_path = self._get_path_from_hash(hash_val)
        local_dir_size = get_directory_size_bytes(pip_env_path)
        self._create_locks.pop(uri, None)
        try:
            shutil.rmtree(pip_env_path)
        except OSError as e:
            logger.warning("Error when deleting pip env %s: %s", pip_env_path, e)
            return 0

        return local_dir_size

    async def create(
        self,
        uri: str,
        runtime_env: "RuntimeEnv",  # noqa: F821
        context: "RuntimeEnvContext",  # noqa: F821
        logger: Optional[logging.Logger] = default_logger,
    ) -> int:
        """Install the pip packages for ``uri`` and return the env size in bytes.

        Returns 0 without installing anything if ``runtime_env`` has no pip
        field. A per-URI lock serializes concurrent calls for the same URI.
        Once a hash has been installed, its cached size is returned.
        """
        if not runtime_env.has_pip():
            return 0

        _, hash_val = parse_uri(uri)
        target_dir = self._get_path_from_hash(hash_val)

        async def _create_for_hash():
            await PipProcessor(
                target_dir,
                runtime_env,
                logger,
            )

            # Measure the directory in the default executor so the
            # filesystem walk does not block the event loop.
            loop = get_running_loop()
            return await loop.run_in_executor(
                None, get_directory_size_bytes, target_dir
            )

        def _forget_task(finished: "asyncio.Task") -> None:
            # delete_uri() may pop and cancel this task. A newer create() can
            # then register its own task under the same hash before this
            # callback fires, so only drop the entry if it is still this task.
            if self._creating_task.get(hash_val) is finished:
                del self._creating_task[hash_val]

        lock = self._create_locks.get(uri)
        if lock is None:
            # async lock to prevent the same virtualenv being concurrently installed
            lock = self._create_locks[uri] = asyncio.Lock()

        async with lock:
            if hash_val in self._created_hash_bytes:
                return self._created_hash_bytes[hash_val]
            self._creating_task[hash_val] = task = create_task(_create_for_hash())
            task.add_done_callback(_forget_task)
            pip_dir_bytes = await task
            self._created_hash_bytes[hash_val] = pip_dir_bytes
            return pip_dir_bytes

    def modify_context(
        self,
        uris: List[str],
        runtime_env: "RuntimeEnv",  # noqa: F821
        context: "RuntimeEnvContext",  # noqa: F821
        logger: logging.Logger = default_logger,
    ):
        """Point ``context`` at the virtualenv built for ``uris[0]``.

        Sets ``context.py_executable`` to the virtualenv's python, and adds
        the virtualenv activation command to ``context.command_prefix``.
        Does nothing if ``runtime_env`` has no pip field.

        Raises:
            ValueError: If the virtualenv's python executable does not exist.
        """
        if not runtime_env.has_pip():
            return
        # PipPlugin only uses a single URI.
        uri = uris[0]
        # Update py_executable.
        _, hash_val = parse_uri(uri)
        target_dir = self._get_path_from_hash(hash_val)
        virtualenv_python = virtualenv_utils.get_virtualenv_python(target_dir)

        if not os.path.exists(virtualenv_python):
            raise ValueError(
                f"Local directory {target_dir} for URI {uri} does "
                "not exist on the cluster. Something may have gone wrong while "
                "installing the runtime_env `pip` packages."
            )
        context.py_executable = virtualenv_python
        context.command_prefix += virtualenv_utils.get_virtualenv_activate_command(
            target_dir
        )
|