| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386 |
- """IMPORTANT: this is an experimental interface and not currently stable."""
- import json
- import os
- import tempfile
- from contextlib import contextmanager
- from typing import Any, Callable, Dict, Iterator, List, Optional, Union
- from ray._private.label_utils import validate_label_selector
- from ray.autoscaler._private import commands
- from ray.autoscaler._private.cli_logger import cli_logger
- from ray.autoscaler._private.event_system import (
- CreateClusterEvent, # noqa: F401
- global_event_system, # noqa: F401
- )
- from ray.util.annotations import DeveloperAPI
@DeveloperAPI
def create_or_update_cluster(
    cluster_config: Union[dict, str],
    *,
    no_restart: bool = False,
    restart_only: bool = False,
    no_config_cache: bool = False,
) -> Dict[str, Any]:
    """Creates or updates an autoscaling Ray cluster from a config.

    This forwards to ``commands.create_or_update_cluster`` with
    ``yes=True`` (presumably auto-confirming any prompts), no worker-count or
    cluster-name overrides, no output redirection, and login shells enabled.

    Args:
        cluster_config (Union[str, dict]): Either the config dict of the
            cluster, or a path pointing to a file containing the config.
        no_restart: Whether to skip restarting Ray services during the
            update. This avoids interrupting running jobs and can be used to
            dynamically adjust autoscaler configuration.
        restart_only: Whether to skip running setup commands and only
            restart Ray. This cannot be used with ``no_restart``.
        no_config_cache: Whether to disable the config cache and fully
            resolve all environment settings from the Cloud provider again.

    Returns:
        Whatever ``commands.create_or_update_cluster`` returns (annotated
        here as a config dictionary).
    """
    with _as_config_file(cluster_config) as config_path:
        result = commands.create_or_update_cluster(
            config_file=config_path,
            no_restart=no_restart,
            restart_only=restart_only,
            no_config_cache=no_config_cache,
            # The config is used as given: no CLI-style overrides.
            override_min_workers=None,
            override_max_workers=None,
            override_cluster_name=None,
            yes=True,
            redirect_command_output=None,
            use_login_shells=True,
        )
    return result
@DeveloperAPI
def teardown_cluster(
    cluster_config: Union[dict, str],
    workers_only: bool = False,
    keep_min_workers: bool = False,
) -> None:
    """Destroys the nodes of the Ray cluster described by a config.

    Forwards to ``commands.teardown_cluster`` with ``yes=True`` (presumably
    skipping the confirmation prompt) and no cluster-name override.

    Args:
        cluster_config (Union[str, dict]): Either the config dict of the
            cluster, or a path pointing to a file containing the config.
        workers_only: Whether to keep the head node running and only
            tear down worker nodes.
        keep_min_workers: Whether to keep min_workers (as specified
            in the YAML) still running.
    """
    with _as_config_file(cluster_config) as config_path:
        outcome = commands.teardown_cluster(
            config_file=config_path,
            workers_only=workers_only,
            keep_min_workers=keep_min_workers,
            override_cluster_name=None,
            yes=True,
        )
    return outcome
@DeveloperAPI
def run_on_cluster(
    cluster_config: Union[dict, str],
    *,
    cmd: Optional[str] = None,
    run_env: str = "auto",
    tmux: bool = False,
    stop: bool = False,
    no_config_cache: bool = False,
    port_forward: Optional[commands.Port_forward] = None,
    with_output: bool = False,
) -> Optional[str]:
    """Runs a command on the cluster's head node via ``commands.exec_cluster``.

    ``screen`` and ``start`` are always passed as ``False`` and no
    cluster-name override is applied.

    Args:
        cluster_config (Union[str, dict]): Either the config dict of the
            cluster, or a path pointing to a file containing the config.
        cmd: The command to run, or None for a no-op command.
        run_env: Whether to run the command on the host or in a
            container. Select between "auto", "host" and "docker".
        tmux: Whether to run in a tmux session.
        stop: Whether to stop the cluster after the command runs.
        no_config_cache: Whether to disable the config cache and fully
            resolve all environment settings from the Cloud provider again.
        port_forward ( (int,int) or list[(int,int)]): Port(s) to forward.
        with_output: Whether to capture command output.

    Returns:
        The command output as a string when ``with_output`` is True;
        presumably ``None`` otherwise (the return type is ``Optional[str]``).
    """
    with _as_config_file(cluster_config) as config_path:
        output = commands.exec_cluster(
            config_path,
            cmd=cmd,
            run_env=run_env,
            tmux=tmux,
            stop=stop,
            no_config_cache=no_config_cache,
            port_forward=port_forward,
            with_output=with_output,
            screen=False,
            start=False,
            override_cluster_name=None,
        )
    return output
@DeveloperAPI
def rsync(
    cluster_config: Union[dict, str],
    *,
    source: Optional[str],
    target: Optional[str],
    down: bool,
    ip_address: Optional[str] = None,
    use_internal_ip: bool = False,
    no_config_cache: bool = False,
    should_bootstrap: bool = True,
):
    """Rsyncs files to or from a single cluster node.

    Forwards to ``commands.rsync`` with ``all_nodes=False`` and no
    cluster-name override.

    Args:
        cluster_config (Union[str, dict]): Either the config dict of the
            cluster, or a path pointing to a file containing the config.
        source: rsync source argument.
        target: rsync target argument.
        down: Whether we're syncing remote -> local.
        ip_address: Address of the node.
        use_internal_ip: Whether the provided ip_address is
            public or private.
        no_config_cache: Whether to disable the config cache and fully
            resolve all environment settings from the Cloud provider again.
        should_bootstrap: Whether to bootstrap the cluster config before
            syncing.

    Raises:
        RuntimeError: If the cluster head node is not found.
    """
    with _as_config_file(cluster_config) as config_path:
        synced = commands.rsync(
            config_file=config_path,
            source=source,
            target=target,
            down=down,
            ip_address=ip_address,
            use_internal_ip=use_internal_ip,
            no_config_cache=no_config_cache,
            should_bootstrap=should_bootstrap,
            override_cluster_name=None,
            all_nodes=False,
        )
    return synced
@DeveloperAPI
def get_head_node_ip(cluster_config: Union[dict, str]) -> str:
    """Looks up the head node IP of the cluster described by a config.

    Args:
        cluster_config (Union[str, dict]): Either the config dict of the
            cluster, or a path pointing to a file containing the config.

    Returns:
        The IP address of the cluster head node.

    Raises:
        RuntimeError: If the cluster is not found.
    """
    with _as_config_file(cluster_config) as config_path:
        head_ip = commands.get_head_node_ip(config_path)
    return head_ip
@DeveloperAPI
def get_worker_node_ips(cluster_config: Union[dict, str]) -> List[str]:
    """Looks up the worker node IPs of the cluster described by a config.

    Args:
        cluster_config (Union[str, dict]): Either the config dict of the
            cluster, or a path pointing to a file containing the config.

    Returns:
        List of worker node IP addresses.

    Raises:
        RuntimeError: If the cluster is not found.
    """
    with _as_config_file(cluster_config) as config_path:
        worker_ips = commands.get_worker_node_ips(config_path)
    return worker_ips
@DeveloperAPI
def request_resources(
    num_cpus: Optional[int] = None,
    bundles: Optional[List[dict]] = None,
    bundle_label_selectors: Optional[List[dict]] = None,
) -> None:
    """Command the autoscaler to scale to accommodate the specified requests.

    The cluster will immediately attempt to scale to accommodate the requested
    resources, bypassing normal upscaling speed constraints. This takes into
    account existing resource usage.

    For example, suppose you call ``request_resources(num_cpus=100)`` and
    there are 45 currently running tasks, each requiring 1 CPU. Then, enough
    nodes will be added so up to 100 tasks can run concurrently. It does
    **not** add enough nodes so that 145 tasks can run.

    This call is only a hint to the autoscaler. The actual resulting cluster
    size may be slightly larger or smaller than expected depending on the
    internal bin packing algorithm and max worker count restrictions.

    Args:
        num_cpus: Scale the cluster to ensure this number of CPUs are
            available. This request is persistent until another call to
            request_resources() is made to override.
        bundles (List[ResourceDict]): Scale the cluster to ensure this set of
            resource shapes can fit. This request is persistent until another
            call to request_resources() is made to override.
        bundle_label_selectors: A list of label selectors, one per bundle,
            applied to the bundle at the same index in ``bundles``. For
            bundles without a label requirement, the corresponding item is an
            empty dictionary. Each label selector consists of zero or more
            key-value pairs where the key is a label and the value is an
            operator (in, !in, etc.) and label value.

    Raises:
        TypeError: If ``num_cpus`` is not an int, ``bundles`` is not a list,
            a bundle is not a dict, or a bundle entry is not a
            ``str -> int`` pair.
        ValueError: If ``bundle_label_selectors`` is given without
            ``bundles`` or with a different length, or if a selector is not
            a ``str -> str`` dict or is rejected by
            ``validate_label_selector``.

    Examples:
        >>> from ray.autoscaler.sdk import request_resources
        >>> # Request 1000 CPUs.
        >>> request_resources(num_cpus=1000) # doctest: +SKIP
        >>> # Request 64 CPUs and also fit a 1-GPU/4-CPU task.
        >>> request_resources( # doctest: +SKIP
        ...     num_cpus=64, bundles=[{"GPU": 1, "CPU": 4}])
        >>> # Same as requesting num_cpus=3.
        >>> request_resources( # doctest: +SKIP
        ...     bundles=[{"CPU": 1}, {"CPU": 1}, {"CPU": 1}])
        >>> # Requests 2 num_cpus=1 bundles, the first with
        >>> # label_selector={"accelerator-type": "in(A100)"} and second with
        >>> # label_selector={"market-type": "spot"}.
        >>> request_resources( # doctest: +SKIP
        ...     bundles=[{"CPU": 1}, {"CPU": 1}],
        ...     bundle_label_selectors=[{"accelerator-type": "in(A100)"},
        ...                             {"market-type": "spot"}])
    """
    if num_cpus is not None and not isinstance(num_cpus, int):
        raise TypeError("num_cpus should be of type int.")

    if bundles is not None:
        if not isinstance(bundles, list):
            raise TypeError("bundles should be of type List")
        for bundle in bundles:
            if not isinstance(bundle, dict):
                raise TypeError("each bundle should be a Dict.")
            for key, value in bundle.items():
                if not (isinstance(key, str) and isinstance(value, int)):
                    raise TypeError(
                        "each bundle key should be str and value as int."
                    )

    if bundle_label_selectors is not None:
        # Selectors are matched to bundles by index, so both lists are needed
        # and must line up one-to-one.
        if bundles is None:
            raise ValueError(
                "`bundles` must be provided when `bundle_label_selectors` "
                "is specified."
            )
        if len(bundle_label_selectors) != len(bundles):
            raise ValueError(
                "`bundle_label_selectors` must be a list with length equal to "
                "the number of bundles."
            )
        for label_selector in bundle_label_selectors:
            if (
                not isinstance(label_selector, dict)
                or not all(isinstance(k, str) for k in label_selector.keys())
                or not all(isinstance(v, str) for v in label_selector.values())
            ):
                raise ValueError(
                    "Bundle label selector must be a list of string dictionary"
                    " label selectors. For example: "
                    '`[{"ray.io/market_type": "spot"}, '
                    '{"ray.io/accelerator-type": "A100"}]`.'
                )
            # Delegate syntax validation of operators/values to the shared
            # label utility; it returns an error string (falsy when valid).
            error_message = validate_label_selector(label_selector)
            if error_message:
                raise ValueError(
                    f"Invalid label selector provided in bundle_label_selectors list."
                    f" Detailed error: '{error_message}'"
                )

    return commands.request_resources(num_cpus, bundles, bundle_label_selectors)
@DeveloperAPI
def configure_logging(
    log_style: Optional[str] = None,
    color_mode: Optional[str] = None,
    verbosity: Optional[int] = None,
):
    """Configures logging for cluster command calls.

    All arguments are forwarded unchanged to ``cli_logger.configure``. Any
    argument left as ``None`` is passed through as ``None``; presumably
    ``cli_logger`` then keeps or derives its own default (TODO confirm).

    Args:
        log_style: If 'pretty', outputs with formatting and color.
            If 'record', outputs record-style without formatting.
            'auto' defaults to 'pretty', and disables pretty logging
            if stdin is *not* a TTY.
        color_mode (str): Can be "true", "false", or "auto".
            Enables or disables `colorful`. With "auto", the choice is
            made based on whether stdout is a TTY.
        verbosity (int): Output verbosity (0, 1, 2, 3).
            Low verbosity will disable `verbose` and `very_verbose` messages.
    """
    cli_logger.configure(
        log_style=log_style, color_mode=color_mode, verbosity=verbosity
    )
@contextmanager
@DeveloperAPI
def _as_config_file(cluster_config: Union[dict, str]) -> Iterator[str]:
    """Yields a filesystem path for a cluster config given as a dict or path.

    If ``cluster_config`` is a dict, it is serialized as JSON into a
    ``NamedTemporaryFile`` whose name is yielded. The file is closed when the
    context exits, including when the ``with`` body raises, and closing
    deletes it. Previously this relied on garbage collection, which could be
    delayed while a traceback kept this generator's frame alive. A string is
    treated as an existing path and yielded unchanged.

    Raises:
        ValueError: If the resulting path does not exist.
    """
    tmp = None
    try:
        if isinstance(cluster_config, dict):
            tmp = tempfile.NamedTemporaryFile("w", prefix="autoscaler-sdk-tmp-")
            tmp.write(json.dumps(cluster_config))
            # Flush so readers opening the file by name see the full content.
            tmp.flush()
            cluster_config = tmp.name
        if not os.path.exists(cluster_config):
            raise ValueError("Cluster config not found {}".format(cluster_config))
        yield cluster_config
    finally:
        if tmp is not None:
            tmp.close()
@DeveloperAPI
def bootstrap_config(
    cluster_config: Dict[str, Any], no_config_cache: bool = False
) -> Dict[str, Any]:
    """Validates the config and adds provider-specific fields to it.

    Thin wrapper over ``commands._bootstrap_config``. Per the original
    description, this is where fields such as IAM/authentication settings
    may be added.

    Args:
        cluster_config: The cluster config dictionary.
        no_config_cache: Whether to bypass the config cache.

    Returns:
        The bootstrapped config dictionary.
    """
    bootstrapped = commands._bootstrap_config(cluster_config, no_config_cache)
    return bootstrapped
@DeveloperAPI
def fillout_defaults(config: Dict[str, Any]) -> Dict[str, Any]:
    """Fills in default values for a cluster config based on its provider.

    The private implementation is imported lazily and under an alias, so it
    does not shadow this public function's name inside its own body.
    """
    from ray.autoscaler._private.util import fillout_defaults as _fill_defaults

    return _fill_defaults(config)
@DeveloperAPI
def register_callback_handler(
    event_name: str,
    callback: Union[Callable[[Dict], None], List[Callable[[Dict], None]]],
) -> None:
    """Registers one or more callbacks for an autoscaler event.

    Args:
        event_name: Event that the callback should be called on. See
            CreateClusterEvent for the events available to register against.
        callback: A callable, or a list of callables, each taking a dict.
            It is invoked when the specified event occurs.
    """
    add_handler = global_event_system.add_callback_handler
    add_handler(event_name, callback)
@DeveloperAPI
def get_docker_host_mount_location(cluster_name: str) -> str:
    """Returns the host directory that Docker mounts attach to.

    Args:
        cluster_name: Name of the cluster; it becomes the last path component.

    Returns:
        ``/tmp/ray_tmp_mount/<cluster_name>``.
    """
    return f"/tmp/ray_tmp_mount/{cluster_name}"
|