| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465 |
- import logging
- import sys
- from collections import OrderedDict
- from pathlib import Path
- from typing import Dict, List, Optional, Union
- import yaml
- from ray._private.path_utils import is_path
- from ray._private.runtime_env.packaging import parse_path
- logger = logging.getLogger(__name__)
- def validate_path(path: str) -> None:
- """Parse the path to ensure it is well-formed and exists."""
- parse_path(path)
- def validate_uri(uri: str):
- try:
- from ray._private.runtime_env.packaging import Protocol, parse_uri
- protocol, path = parse_uri(uri)
- except ValueError:
- raise ValueError(
- f"{uri} is not a valid URI. Passing directories or modules to "
- "be dynamically uploaded is only supported at the job level "
- "(i.e., passed to `ray.init`)."
- )
- if (
- protocol in Protocol.remote_protocols()
- and not path.endswith(".zip")
- and not path.endswith(".whl")
- ):
- raise ValueError("Only .zip or .whl files supported for remote URIs.")
- def _handle_local_deps_requirement_file(requirements_file: str):
- """Read the given [requirements_file], and return all required dependencies."""
- requirements_path = Path(requirements_file)
- if not requirements_path.is_file():
- raise ValueError(f"{requirements_path} is not a valid file")
- return requirements_path.read_text().strip().split("\n")
- def validate_py_modules_uris(py_modules_uris: List[str]) -> List[str]:
- """Parses and validates a 'py_modules' option.
- Expects py_modules to be a list of URIs.
- """
- if not isinstance(py_modules_uris, list):
- raise TypeError(
- "`py_modules` must be a list of strings, got " f"{type(py_modules_uris)}."
- )
- for module in py_modules_uris:
- if not isinstance(module, str):
- raise TypeError("`py_module` must be a string, got " f"{type(module)}.")
- validate_uri(module)
- def parse_and_validate_py_modules(py_modules: List[str]) -> List[str]:
- """Parses and validates a 'py_modules' option.
- Expects py_modules to be a list of local paths or URIs.
- """
- if not isinstance(py_modules, list):
- raise TypeError(
- "`py_modules` must be a list of strings, got " f"{type(py_modules)}."
- )
- for module in py_modules:
- if not isinstance(module, str):
- raise TypeError("`py_module` must be a string, got " f"{type(module)}.")
- if is_path(module):
- validate_path(module)
- else:
- validate_uri(module)
- return py_modules
- def validate_working_dir_uri(working_dir_uri: str) -> str:
- """Parses and validates a 'working_dir' option."""
- if not isinstance(working_dir_uri, str):
- raise TypeError(
- "`working_dir` must be a string, got " f"{type(working_dir_uri)}."
- )
- validate_uri(working_dir_uri)
- def parse_and_validate_working_dir(working_dir: str) -> str:
- """Parses and validates a 'working_dir' option.
- This can be a URI or a path.
- """
- assert working_dir is not None
- if not isinstance(working_dir, str):
- raise TypeError("`working_dir` must be a string, got " f"{type(working_dir)}.")
- if is_path(working_dir):
- validate_path(working_dir)
- else:
- validate_uri(working_dir)
- return working_dir
- def parse_and_validate_conda(conda: Union[str, dict]) -> Union[str, dict]:
- """Parses and validates a user-provided 'conda' option.
- Conda can be one of three cases:
- 1) A dictionary describing the env. This is passed through directly.
- 2) A string referring to the name of a preinstalled conda env.
- 3) A string pointing to a local conda YAML file. This is detected
- by looking for a '.yaml' or '.yml' suffix. In this case, the file
- will be read as YAML and passed through as a dictionary.
- """
- assert conda is not None
- if sys.platform == "win32":
- logger.warning(
- "runtime environment support is experimental on Windows. "
- "If you run into issues please file a report at "
- "https://github.com/ray-project/ray/issues."
- )
- result = conda
- if isinstance(conda, str):
- file_path = Path(conda)
- if file_path.suffix in (".yaml", ".yml"):
- if not file_path.is_file():
- raise ValueError(f"Can't find conda YAML file {file_path}.")
- try:
- result = yaml.safe_load(file_path.read_text())
- except Exception as e:
- raise ValueError(f"Failed to read conda file {file_path}: {e}.")
- elif file_path.is_absolute():
- if not file_path.is_dir():
- raise ValueError(f"Can't find conda env directory {file_path}.")
- result = str(file_path)
- elif isinstance(conda, dict):
- result = conda
- else:
- raise TypeError(
- "runtime_env['conda'] must be of type str or " f"dict, got {type(conda)}."
- )
- return result
- def parse_and_validate_uv(uv: Union[str, List[str], Dict]) -> Optional[Dict]:
- """Parses and validates a user-provided 'uv' option.
- The value of the input 'uv' field can be one of two cases:
- 1) A List[str] describing the requirements. This is passed through.
- Example usage: ["tensorflow", "requests"]
- 2) a string containing the path to a local pip “requirements.txt” file.
- 3) A python dictionary that has one field:
- a) packages (required, List[str]): a list of uv packages, it same as 1).
- b) uv_check (optional, bool): whether to enable pip check at the end of uv
- install, default to False.
- c) uv_version (optional, str): user provides a specific uv to use; if
- unspecified, default version of uv will be used.
- d) uv_pip_install_options (optional, List[str]): user-provided options for
- `uv pip install` command, default to ["--no-cache"].
- The returned parsed value will be a list of packages. If a Ray library
- (e.g. "ray[serve]") is specified, it will be deleted and replaced by its
- dependencies (e.g. "uvicorn", "requests").
- """
- assert uv is not None
- if sys.platform == "win32":
- logger.warning(
- "runtime environment support is experimental on Windows. "
- "If you run into issues please file a report at "
- "https://github.com/ray-project/ray/issues."
- )
- result: str = ""
- if isinstance(uv, str):
- uv_list = _handle_local_deps_requirement_file(uv)
- result = dict(packages=uv_list, uv_check=False)
- elif isinstance(uv, list) and all(isinstance(dep, str) for dep in uv):
- result = dict(packages=uv, uv_check=False)
- elif isinstance(uv, dict):
- if set(uv.keys()) - {
- "packages",
- "uv_check",
- "uv_version",
- "uv_pip_install_options",
- }:
- raise ValueError(
- "runtime_env['uv'] can only have these fields: "
- "packages, uv_check, uv_version and uv_pip_install_options, but got: "
- f"{list(uv.keys())}"
- )
- if "packages" not in uv:
- raise ValueError(
- f"runtime_env['uv'] must include field 'packages', but got {uv}"
- )
- if "uv_check" in uv and not isinstance(uv["uv_check"], bool):
- raise TypeError(
- "runtime_env['uv']['uv_check'] must be of type bool, "
- f"got {type(uv['uv_check'])}"
- )
- if "uv_version" in uv and not isinstance(uv["uv_version"], str):
- raise TypeError(
- "runtime_env['uv']['uv_version'] must be of type str, "
- f"got {type(uv['uv_version'])}"
- )
- if "uv_pip_install_options" in uv:
- if not isinstance(uv["uv_pip_install_options"], list):
- raise TypeError(
- "runtime_env['uv']['uv_pip_install_options'] must be of type "
- f"list[str] got {type(uv['uv_pip_install_options'])}"
- )
- # Check each item in installation option.
- for idx, cur_opt in enumerate(uv["uv_pip_install_options"]):
- if not isinstance(cur_opt, str):
- raise TypeError(
- "runtime_env['uv']['uv_pip_install_options'] must be of type "
- f"list[str] got {type(cur_opt)} for {idx}-th item."
- )
- result = uv.copy()
- result["uv_check"] = uv.get("uv_check", False)
- result["uv_pip_install_options"] = uv.get(
- "uv_pip_install_options", ["--no-cache"]
- )
- if not isinstance(uv["packages"], list):
- raise ValueError(
- "runtime_env['uv']['packages'] must be of type list, "
- f"got: {type(uv['packages'])}"
- )
- else:
- raise TypeError(
- "runtime_env['uv'] must be of type " f"List[str], or dict, got {type(uv)}"
- )
- # Deduplicate packages for package lists.
- result["packages"] = list(OrderedDict.fromkeys(result["packages"]))
- if len(result["packages"]) == 0:
- result = None
- logger.debug(f"Rewrote runtime_env `uv` field from {uv} to {result}.")
- return result
- def parse_and_validate_pip(pip: Union[str, List[str], Dict]) -> Optional[Dict]:
- """Parses and validates a user-provided 'pip' option.
- The value of the input 'pip' field can be one of two cases:
- 1) A List[str] describing the requirements. This is passed through.
- 2) A string pointing to a local requirements file. In this case, the
- file contents will be read split into a list.
- 3) A python dictionary that has three fields:
- a) packages (required, List[str]): a list of pip packages, it same as 1).
- b) pip_check (optional, bool): whether to enable pip check at the end of pip
- install, default to False.
- c) pip_version (optional, str): the version of pip, ray will spell
- the package name 'pip' in front of the `pip_version` to form the final
- requirement string, the syntax of a requirement specifier is defined in
- full in PEP 508.
- d) pip_install_options (optional, List[str]): user-provided options for
- `pip install` command, defaults to ["--disable-pip-version-check", "--no-cache-dir"].
- The returned parsed value will be a list of pip packages. If a Ray library
- (e.g. "ray[serve]") is specified, it will be deleted and replaced by its
- dependencies (e.g. "uvicorn", "requests").
- """
- assert pip is not None
- result = None
- if sys.platform == "win32":
- logger.warning(
- "runtime environment support is experimental on Windows. "
- "If you run into issues please file a report at "
- "https://github.com/ray-project/ray/issues."
- )
- if isinstance(pip, str):
- # We have been given a path to a requirements.txt file.
- pip_list = _handle_local_deps_requirement_file(pip)
- result = dict(
- packages=pip_list,
- pip_check=False,
- )
- elif isinstance(pip, list) and all(isinstance(dep, str) for dep in pip):
- result = dict(packages=pip, pip_check=False)
- elif isinstance(pip, dict):
- if set(pip.keys()) - {
- "packages",
- "pip_check",
- "pip_install_options",
- "pip_version",
- }:
- raise ValueError(
- "runtime_env['pip'] can only have these fields: "
- "packages, pip_check, pip_install_options and pip_version, but got: "
- f"{list(pip.keys())}"
- )
- if "pip_check" in pip and not isinstance(pip["pip_check"], bool):
- raise TypeError(
- "runtime_env['pip']['pip_check'] must be of type bool, "
- f"got {type(pip['pip_check'])}"
- )
- if "pip_version" in pip:
- if not isinstance(pip["pip_version"], str):
- raise TypeError(
- "runtime_env['pip']['pip_version'] must be of type str, "
- f"got {type(pip['pip_version'])}"
- )
- if "pip_install_options" in pip:
- if not isinstance(pip["pip_install_options"], list):
- raise TypeError(
- "runtime_env['pip']['pip_install_options'] must be of type "
- f"list[str] got {type(pip['pip_install_options'])}"
- )
- # Check each item in installation option.
- for idx, cur_opt in enumerate(pip["pip_install_options"]):
- if not isinstance(cur_opt, str):
- raise TypeError(
- "runtime_env['pip']['pip_install_options'] must be of type "
- f"list[str] got {type(cur_opt)} for {idx}-th item."
- )
- result = pip.copy()
- # Contrary to pip_check, we do not insert the default value of pip_install_options.
- # This is to maintain backwards compatibility with ray==2.0.1
- result["pip_check"] = pip.get("pip_check", False)
- if "packages" not in pip:
- raise ValueError(
- f"runtime_env['pip'] must include field 'packages', but got {pip}"
- )
- elif isinstance(pip["packages"], str):
- result["packages"] = _handle_local_deps_requirement_file(pip["packages"])
- elif not isinstance(pip["packages"], list):
- raise ValueError(
- "runtime_env['pip']['packages'] must be of type str of list, "
- f"got: {type(pip['packages'])}"
- )
- else:
- raise TypeError(
- "runtime_env['pip'] must be of type str or " f"List[str], got {type(pip)}"
- )
- # Eliminate duplicates to prevent `pip install` from erroring. Use
- # OrderedDict to preserve the order of the list. This makes the output
- # deterministic and easier to debug, because pip install can have
- # different behavior depending on the order of the input.
- result["packages"] = list(OrderedDict.fromkeys(result["packages"]))
- if len(result["packages"]) == 0:
- result = None
- logger.debug(f"Rewrote runtime_env `pip` field from {pip} to {result}.")
- return result
- def parse_and_validate_container(container: List[str]) -> List[str]:
- """Parses and validates a user-provided 'container' option.
- This is passed through without validation (for now).
- """
- assert container is not None
- return container
- def parse_and_validate_excludes(excludes: List[str]) -> List[str]:
- """Parses and validates a user-provided 'excludes' option.
- This is validated to verify that it is of type List[str].
- If an empty list is passed, we return `None` for consistency.
- """
- assert excludes is not None
- if isinstance(excludes, list) and len(excludes) == 0:
- return None
- if isinstance(excludes, list) and all(isinstance(path, str) for path in excludes):
- return excludes
- else:
- raise TypeError(
- "runtime_env['excludes'] must be of type "
- f"List[str], got {type(excludes)}"
- )
- def parse_and_validate_env_vars(env_vars: Dict[str, str]) -> Optional[Dict[str, str]]:
- """Parses and validates a user-provided 'env_vars' option.
- This is validated to verify that all keys and vals are strings.
- If an empty dictionary is passed, we return `None` for consistency.
- Args:
- env_vars: A dictionary of environment variables to set in the
- runtime environment.
- Returns:
- The validated env_vars dictionary, or None if it was empty.
- Raises:
- TypeError: If the env_vars is not a dictionary of strings. The error message
- will include the type of the invalid value.
- """
- assert env_vars is not None
- if len(env_vars) == 0:
- return None
- if not isinstance(env_vars, dict):
- raise TypeError(
- "runtime_env['env_vars'] must be of type "
- f"Dict[str, str], got {type(env_vars)}"
- )
- for key, val in env_vars.items():
- if not isinstance(key, str):
- raise TypeError(
- "runtime_env['env_vars'] must be of type "
- f"Dict[str, str], but the key {key} is of type {type(key)}"
- )
- if not isinstance(val, str):
- raise TypeError(
- "runtime_env['env_vars'] must be of type "
- f"Dict[str, str], but the value {val} is of type {type(val)}"
- )
- return env_vars
- # Dictionary mapping runtime_env options with the function to parse and
- # validate them.
- OPTION_TO_VALIDATION_FN = {
- "py_modules": parse_and_validate_py_modules,
- "working_dir": parse_and_validate_working_dir,
- "excludes": parse_and_validate_excludes,
- "conda": parse_and_validate_conda,
- "pip": parse_and_validate_pip,
- "uv": parse_and_validate_uv,
- "env_vars": parse_and_validate_env_vars,
- "container": parse_and_validate_container,
- }
- # RuntimeEnv can be created with local paths
- # for these options. However, after the packages
- # for these options have been uploaded to GCS,
- # they must be URIs. These functions provide the ability
- # to validate that these options only contain well-formed URIs.
- OPTION_TO_NO_PATH_VALIDATION_FN = {
- "working_dir": validate_working_dir_uri,
- "py_modules": validate_py_modules_uris,
- }
|