| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752 |
- import copy
- import datetime
- import hashlib
- import json
- import logging
- import os
- import random
- import shutil
- import subprocess
- import sys
- import tempfile
- import time
- from concurrent.futures import ThreadPoolExecutor
- from types import ModuleType
- from typing import Any, Dict, List, Optional, Tuple, Union
- import click
- import yaml
- import ray
- from ray._common.usage import usage_lib
- from ray.autoscaler._private import subprocess_output_util as cmd_output_util
- from ray.autoscaler._private.autoscaler import AutoscalerSummary
- from ray.autoscaler._private.cli_logger import cf, cli_logger
- from ray.autoscaler._private.cluster_dump import (
- Archive,
- GetParameters,
- Node,
- _info_from_params,
- create_archive_for_local_and_remote_nodes,
- create_archive_for_remote_nodes,
- get_all_local_data,
- )
- from ray.autoscaler._private.command_runner import (
- set_rsync_silent,
- set_using_login_shells,
- )
- from ray.autoscaler._private.constants import (
- AUTOSCALER_RESOURCE_REQUEST_CHANNEL,
- MAX_PARALLEL_SHUTDOWN_WORKERS,
- )
- from ray.autoscaler._private.event_system import CreateClusterEvent, global_event_system
- from ray.autoscaler._private.log_timer import LogTimer
- from ray.autoscaler._private.node_provider_availability_tracker import (
- NodeAvailabilitySummary,
- )
- from ray.autoscaler._private.providers import (
- _NODE_PROVIDERS,
- _PROVIDER_PRETTY_NAMES,
- _get_node_provider,
- )
- from ray.autoscaler._private.updater import NodeUpdaterThread
- from ray.autoscaler._private.util import (
- LoadMetricsSummary,
- format_info_string,
- hash_launch_conf,
- hash_runtime_conf,
- prepare_config,
- validate_config,
- with_envs,
- )
- from ray.autoscaler.node_provider import NodeProvider
- from ray.autoscaler.tags import (
- NODE_KIND_HEAD,
- NODE_KIND_WORKER,
- STATUS_UNINITIALIZED,
- STATUS_UP_TO_DATE,
- TAG_RAY_LAUNCH_CONFIG,
- TAG_RAY_NODE_KIND,
- TAG_RAY_NODE_NAME,
- TAG_RAY_NODE_STATUS,
- TAG_RAY_USER_NODE_TYPE,
- )
- from ray.experimental.internal_kv import _internal_kv_put, internal_kv_get_gcs_client
- from ray.util.debug import log_once
- try: # py3
- from shlex import quote
- except ImportError: # py2
- from pipes import quote
# Module-level logger, named after this module.
- logger = logging.getLogger(__name__)
# Run-environment names. NOTE(review): presumably the accepted values for the
# `run_env` argument used below ("auto" and "host" are passed in this file) —
# confirm against the CLI definitions.
- RUN_ENV_TYPES = ["auto", "host", "docker"]
# Delay, in seconds, handed to time.sleep() between polls of the node provider.
- POLL_INTERVAL = 5
# Type alias: either one (int, int) pair or a list of such pairs.
# NOTE(review): presumably (local_port, remote_port) — confirm with callers.
- Port_forward = Union[Tuple[int, int], List[Tuple[int, int]]]
def try_logging_config(config: Dict[str, Any]) -> None:
    """Log the cluster config to the CLI for providers that support it.

    Only the "aws" provider type is handled; for any other provider type
    this function does nothing.

    Args:
        config: Cluster config; ``config["provider"]["type"]`` is read.
    """
    provider_type = config["provider"]["type"]
    if provider_type != "aws":
        return

    # Imported lazily so the AWS module is only loaded for AWS clusters.
    from ray.autoscaler._private.aws.config import log_to_cli

    log_to_cli(config)
def try_get_log_state(provider_config: Dict[str, Any]) -> Optional[dict]:
    """Return the provider's CLI log state, if the provider has one.

    Args:
        provider_config: The ``provider`` section of the cluster config;
            its ``"type"`` key is read.

    Returns:
        Whatever the AWS ``get_log_state()`` helper returns when the provider
        type is "aws", otherwise ``None``.
    """
    if provider_config["type"] != "aws":
        return None

    # Imported lazily so the AWS module is only loaded for AWS clusters.
    from ray.autoscaler._private.aws.config import get_log_state

    return get_log_state()
def try_reload_log_state(provider_config: Dict[str, Any], log_state: dict) -> None:
    """Restore a previously saved provider log state, if applicable.

    Nothing happens when ``log_state`` is empty/None (the provider type is
    not even inspected in that case) or when the provider type is not "aws".

    Args:
        provider_config: The ``provider`` section of the cluster config.
        log_state: State object to hand to the AWS ``reload_log_state`` helper.
    """
    # Short-circuit keeps the original order: a falsy log_state returns
    # before provider_config["type"] is ever looked up.
    if log_state and provider_config["type"] == "aws":
        from ray.autoscaler._private.aws.config import reload_log_state

        return reload_log_state(log_state)
    return None
def debug_status(
    status, error, verbose: bool = False, address: Optional[str] = None
) -> str:
    """Build a human-readable status report for the autoscaler.

    With autoscaler v2 the report is fetched from the cluster and formatted
    by ``ClusterStatusFormatter``; the ``status`` argument is ignored. With
    v1, ``status`` is decoded as UTF-8 JSON and rendered via
    ``format_info_string``. If no usable status is available a placeholder
    message is returned instead.

    Args:
        status: The autoscaler status string for v1 (UTF-8 encoded JSON).
        error: The autoscaler error string for v1 (UTF-8 encoded); appended
            on its own line when truthy.
        verbose: Whether to print verbose information.
        address: The address of the cluster (gcs address).

    Returns:
        str: A debug string for the cluster's status.
    """
    from ray.autoscaler.v2.utils import is_autoscaler_v2

    no_status_message = (
        "No cluster status. It may take a few seconds "
        "for the Ray internal services to start up."
    )

    if is_autoscaler_v2():
        from ray.autoscaler.v2.sdk import get_cluster_status
        from ray.autoscaler.v2.utils import ClusterStatusFormatter

        report = ClusterStatusFormatter.format(
            get_cluster_status(address), verbose=verbose
        )
    elif not status:
        report = no_status_message
    else:
        payload = json.loads(status.decode("utf-8"))
        load_metrics_fields = payload.get("load_metrics_report")
        autoscaler_fields = payload.get("autoscaler_report")
        reported_at = payload.get("time")
        gcs_request_time = payload.get("gcs_request_time")
        non_terminated_nodes_time = payload.get("non_terminated_nodes_time")

        if not (load_metrics_fields and autoscaler_fields and reported_at):
            # The report is incomplete; treat it like a missing status.
            report = no_status_message
        else:
            load_metrics = LoadMetricsSummary(**load_metrics_fields)
            # The availability summary is passed to AutoscalerSummary as a
            # separate object, so remove it from the keyword fields first.
            availability_fields = autoscaler_fields.pop(
                "node_availability_summary", {}
            )
            availability = NodeAvailabilitySummary.from_fields(**availability_fields)
            autoscaler = AutoscalerSummary(
                node_availability_summary=availability,
                **autoscaler_fields,
            )
            report = format_info_string(
                load_metrics,
                autoscaler,
                time=datetime.datetime.fromtimestamp(reported_at),
                gcs_request_time=gcs_request_time,
                non_terminated_nodes_time=non_terminated_nodes_time,
                verbose=verbose,
            )

    if error:
        report = report + "\n" + error.decode("utf-8")
    return report
def request_resources(
    num_cpus: Optional[int] = None,
    bundles: Optional[List[dict]] = None,
    bundle_label_selectors: Optional[List[dict]] = None,
) -> None:
    """Remotely request some CPU or GPU resources from the autoscaler. Optionally
    specify label selectors for nodes with the requested resources.

    If `bundle_label_selectors` is provided, `bundles` must also be provided.
    Both must be lists of the same length, and `bundle_label_selectors` expects a list
    of string dictionaries.

    This function is to be called e.g. on a node before submitting a bunch of
    ray.remote calls to ensure that resources rapidly become available.

    Args:
        num_cpus: Scale the cluster to ensure this number of CPUs are
            available. This request is persistent until another call to
            request_resources() is made.
        bundles (List[ResourceDict]): Scale the cluster to ensure this set of
            resource shapes can fit. This request is persistent until another
            call to request_resources() is made.
        bundle_label_selectors (List[Dict[str,str]]): Optional label selectors
            that new nodes must satisfy. (e.g. [{"accelerator-type": "A100"}])
            The elements in the bundle_label_selectors should be one-to-one mapping
            to the elements in bundles.

    Raises:
        RuntimeError: If Ray has not been initialized.
        AssertionError: If `bundle_label_selectors` is given without a
            `bundles` list of the same length.
    """
    if not ray.is_initialized():
        raise RuntimeError("Ray is not initialized yet")

    # Validate with an explicit raise rather than an `assert` statement so the
    # check is not stripped when Python runs with -O. AssertionError is kept
    # as the exception type for backward compatibility with existing callers.
    if bundle_label_selectors and not (
        bundles and len(bundles) == len(bundle_label_selectors)
    ):
        raise AssertionError(
            "If bundle_label_selectors is provided, bundles must also be provided and have the same length."
        )

    # One single-CPU bundle (with no label constraints) per requested CPU.
    to_request = [
        {"resources": {"CPU": 1}, "label_selector": {}} for _ in range(num_cpus or 0)
    ]

    if bundles:
        for i, bundle in enumerate(bundles):
            selector = bundle_label_selectors[i] if bundle_label_selectors else {}
            to_request.append({"resources": bundle, "label_selector": selector})

    from ray.autoscaler.v2.utils import is_autoscaler_v2

    if is_autoscaler_v2():
        # For v2 autoscaler: use new format with label_selectors via GCS RPC
        from ray.autoscaler.v2.sdk import request_cluster_resources

        gcs_address = internal_kv_get_gcs_client().address
        request_cluster_resources(gcs_address, to_request)
    else:
        # For v1 autoscaler: write old format (ResourceDict) to KV
        # Extract resources field for backward compatibility
        to_request_v1 = [req["resources"] for req in to_request]
        _internal_kv_put(
            AUTOSCALER_RESOURCE_REQUEST_CHANNEL,
            json.dumps(to_request_v1),
            overwrite=True,
        )
def create_or_update_cluster(
    config_file: str,
    override_min_workers: Optional[int],
    override_max_workers: Optional[int],
    no_restart: bool,
    restart_only: bool,
    yes: bool,
    override_cluster_name: Optional[str] = None,
    no_config_cache: bool = False,
    redirect_command_output: Optional[bool] = False,
    use_login_shells: bool = True,
    no_monitor_on_head: bool = False,
) -> Dict[str, Any]:
    """Creates or updates an autoscaling Ray cluster from a config json.

    Loads the YAML config, applies command-line overrides, bootstraps the
    provider config and then creates or updates the head node.

    Args:
        config_file: Path to the cluster config YAML file.
        override_min_workers: If not None, replaces ``min_workers``.
        override_max_workers: If not None, replaces ``max_workers``.
        no_restart: Forwarded to ``get_or_create_head_node``.
        restart_only: Forwarded to ``get_or_create_head_node``.
        yes: Forwarded to ``get_or_create_head_node`` (confirmation prompts).
        override_cluster_name: If not None, replaces ``cluster_name``.
        no_config_cache: Forwarded to ``_bootstrap_config``.
        redirect_command_output: Whether command output is redirected;
            ``None`` means do not redirect.
        use_login_shells: Whether login shells are used; when False,
            interactive commands are also disallowed.
        no_monitor_on_head: Internal flag, see the comment below.

    Returns:
        The bootstrapped cluster config.
    """
    # no_monitor_on_head is an internal flag used by the Ray K8s operator.
    # If True, prevents autoscaling config sync to the Ray head during cluster
    # creation. See https://github.com/ray-project/ray/pull/13720.
    set_using_login_shells(use_login_shells)
    if not use_login_shells:
        cmd_output_util.set_allow_interactive(False)
    if redirect_command_output is None:
        # Do not redirect by default.
        cmd_output_util.set_output_redirected(False)
    else:
        cmd_output_util.set_output_redirected(redirect_command_output)

    def handle_yaml_error(e):
        cli_logger.error("Cluster config invalid")
        cli_logger.newline()
        cli_logger.error("Failed to load YAML file " + cf.bold("{}"), config_file)
        cli_logger.newline()
        with cli_logger.verbatim_error_ctx("PyYAML error:"):
            cli_logger.error(e)
        cli_logger.abort()

    try:
        # Use a context manager so the config file handle is closed promptly
        # instead of being left for the garbage collector.
        with open(config_file) as f:
            config = yaml.safe_load(f.read())
    except FileNotFoundError:
        cli_logger.abort(
            "Provided cluster configuration file ({}) does not exist",
            cf.bold(config_file),
        )
    except (yaml.parser.ParserError, yaml.scanner.ScannerError) as e:
        handle_yaml_error(e)
        raise

    global_event_system.execute_callback(
        CreateClusterEvent.up_started, {"cluster_config": config}
    )

    # todo: validate file_mounts, ssh keys, etc.

    importer = _NODE_PROVIDERS.get(config["provider"]["type"])
    if not importer:
        cli_logger.abort(
            "Unknown provider type " + cf.bold("{}") + "\n"
            "Available providers are: {}",
            config["provider"]["type"],
            cli_logger.render_list(
                [k for k in _NODE_PROVIDERS.keys() if _NODE_PROVIDERS[k] is not None]
            ),
        )

    printed_overrides = False

    def handle_cli_override(key, override):
        nonlocal printed_overrides
        if override is None:
            return
        # Only warn when the override actually shadows a value from the file;
        # the override itself is applied either way.
        if key in config:
            printed_overrides = True
            cli_logger.warning(
                "`{}` override provided on the command line.\n"
                " Using "
                + cf.bold("{}")
                + cf.dimmed(" [configuration file has " + cf.bold("{}") + "]"),
                key,
                override,
                config[key],
            )
        config[key] = override

    handle_cli_override("min_workers", override_min_workers)
    handle_cli_override("max_workers", override_max_workers)
    handle_cli_override("cluster_name", override_cluster_name)

    if printed_overrides:
        cli_logger.newline()

    cli_logger.labeled_value("Cluster", config["cluster_name"])

    cli_logger.newline()
    config = _bootstrap_config(config, no_config_cache=no_config_cache)

    try_logging_config(config)
    get_or_create_head_node(
        config,
        config_file,
        no_restart,
        restart_only,
        yes,
        override_cluster_name,
        no_monitor_on_head,
    )
    return config
# Version stamp stored in (and required of) on-disk config cache entries.
CONFIG_CACHE_VERSION = 1


def _bootstrap_config(
    config: Dict[str, Any], no_config_cache: bool = False
) -> Dict[str, Any]:
    """Resolve a cluster config via its node provider, with on-disk caching.

    The prepared config is hashed (SHA-256 of its sorted-key JSON form) to
    derive a cache file path in the system temp directory. A cache entry with
    a matching version is returned directly; otherwise the provider's
    ``bootstrap_config`` is run and, unless caching is disabled, its result
    is written back to the cache file.

    Args:
        config: The cluster config as loaded from the config file.
        no_config_cache: If True, neither read nor write the cache file.

    Returns:
        The resolved config, with ``provider["_config_cache_path"]`` set to
        the cache file path.

    Raises:
        NotImplementedError: If the provider type has no registered importer.
    """
    config = prepare_config(config)
    # NOTE: multi-node-type autoscaler is guaranteed to be in use after this.

    hasher = hashlib.sha256()
    hasher.update(json.dumps([config], sort_keys=True).encode("utf-8"))
    cache_key = os.path.join(
        tempfile.gettempdir(), "ray-config-{}".format(hasher.hexdigest())
    )

    if os.path.exists(cache_key) and not no_config_cache:
        # Read through a context manager so the handle is closed promptly.
        with open(cache_key) as cache_file:
            config_cache = json.loads(cache_file.read())
        if config_cache.get("_version", -1) == CONFIG_CACHE_VERSION:
            # todo: is it fine to re-resolve? afaik it should be.
            # we can have migrations otherwise or something
            # but this seems overcomplicated given that resolving is
            # relatively cheap
            try_reload_log_state(
                config_cache["config"]["provider"],
                config_cache.get("provider_log_info"),
            )

            if log_once("_printed_cached_config_warning"):
                cli_logger.verbose_warning(
                    "Loaded cached provider configuration from " + cf.bold("{}"),
                    cache_key,
                )
                if cli_logger.verbosity == 0:
                    cli_logger.warning("Loaded cached provider configuration")
                cli_logger.warning(
                    "If you experience issues with "
                    "the cloud provider, try re-running "
                    "the command with {}.",
                    cf.bold("--no-config-cache"),
                )
            cached_config = config_cache["config"]
            if "provider" in cached_config:
                cached_config["provider"]["_config_cache_path"] = cache_key
            return cached_config
        else:
            cli_logger.warning(
                "Found cached cluster config "
                "but the version " + cf.bold("{}") + " "
                "(expected " + cf.bold("{}") + ") does not match.\n"
                "This is normal if cluster launcher was updated.\n"
                "Config will be re-resolved.",
                config_cache.get("_version", "none"),
                CONFIG_CACHE_VERSION,
            )

    importer = _NODE_PROVIDERS.get(config["provider"]["type"])
    if not importer:
        raise NotImplementedError("Unsupported provider {}".format(config["provider"]))

    provider_cls = importer(config["provider"])

    cli_logger.print(
        "Checking {} environment settings",
        _PROVIDER_PRETTY_NAMES.get(config["provider"]["type"]),
    )
    try:
        config = provider_cls.fillout_available_node_types_resources(config)
    except Exception as exc:
        # Best effort: resource autodetection failures are reported but do not
        # stop bootstrapping; validate_config below decides if that is fatal.
        if cli_logger.verbosity > 2:
            logger.exception("Failed to autodetect node resources.")
        else:
            cli_logger.warning(
                f"Failed to autodetect node resources: {str(exc)}. "
                "You can see full stack trace with higher verbosity."
            )

    try:
        # NOTE: if `resources` field is missing, validate_config for providers
        # other than AWS and Kubernetes will fail (the schema error will ask
        # the user to manually fill the resources) as we currently support
        # autofilling resources for AWS and Kubernetes only.
        validate_config(config)
    except (ModuleNotFoundError, ImportError):
        cli_logger.abort(
            "Not all Ray autoscaler dependencies were found. "
            "In Ray 1.4+, the Ray CLI, autoscaler, and dashboard will "
            'only be usable via `pip install "ray[default]"`. Please '
            "update your install command."
        )
    resolved_config = provider_cls.bootstrap_config(config)
    resolved_config["provider"]["_config_cache_path"] = cache_key

    if not no_config_cache:
        # Serialize before opening the file: opening with "w" truncates it, so
        # a serialization error would otherwise leave an empty cache file that
        # fails to parse on the next run.
        serialized_cache = json.dumps(
            {
                "_version": CONFIG_CACHE_VERSION,
                "provider_log_info": try_get_log_state(resolved_config["provider"]),
                "config": resolved_config,
            }
        )
        with open(cache_key, "w") as f:
            f.write(serialized_cache)
    return resolved_config
def teardown_cluster(
    config_file: str,
    yes: bool,
    workers_only: bool,
    override_cluster_name: Optional[str],
    keep_min_workers: bool,
) -> None:
    """Destroys all nodes of a Ray cluster described by a config json.

    Args:
        config_file: Path to the cluster config YAML file.
        yes: Passed to the confirmation prompt.
        workers_only: If True, leave the head node running, skip `ray stop`
            on the cluster and skip shared-resource cleanup.
        override_cluster_name: If not None, replaces ``cluster_name``.
        keep_min_workers: If True, leave ``min_workers`` worker nodes
            running (chosen at random).
    """
    # Context manager so the config file handle is closed promptly.
    with open(config_file) as f:
        config = yaml.safe_load(f.read())
    if override_cluster_name is not None:
        config["cluster_name"] = override_cluster_name

    config = _bootstrap_config(config)

    cli_logger.confirm(yes, "Destroying cluster.", _abort=True)

    if not workers_only:
        try:
            exec_cluster(
                config_file,
                cmd="ray stop",
                run_env="auto",
                screen=False,
                tmux=False,
                stop=False,
                start=False,
                override_cluster_name=override_cluster_name,
                port_forward=None,
                with_output=False,
            )
        except Exception as e:
            # todo: add better exception info
            cli_logger.verbose_error("{}", str(e))
            cli_logger.warning(
                "Exception occurred when stopping the cluster Ray runtime "
                "(use -v to dump teardown exceptions)."
            )
            cli_logger.warning(
                "Ignoring the exception and "
                "attempting to shut down the cluster nodes anyway."
            )

    provider = _get_node_provider(config["provider"], config["cluster_name"])

    def remaining_nodes():
        workers = provider.non_terminated_nodes({TAG_RAY_NODE_KIND: NODE_KIND_WORKER})

        if keep_min_workers:
            min_workers = config.get("min_workers", 0)
            cli_logger.print(
                "{} random worker nodes will not be shut down. "
                + cf.dimmed("(due to {})"),
                cf.bold(min_workers),
                cf.bold("--keep-min-workers"),
            )
            # Clamp at zero: when fewer workers are running than min_workers,
            # a negative sample size would make random.sample raise ValueError.
            num_to_stop = max(0, len(workers) - min_workers)
            workers = random.sample(workers, num_to_stop)

        # todo: it's weird to kill the head node but not all workers
        if workers_only:
            cli_logger.print(
                "The head node will not be shut down. " + cf.dimmed("(due to {})"),
                cf.bold("--workers-only"),
            )

            return workers

        head = provider.non_terminated_nodes({TAG_RAY_NODE_KIND: NODE_KIND_HEAD})

        return head + workers

    def run_docker_stop(node, container_name):
        try:
            updater = NodeUpdaterThread(
                node_id=node,
                provider_config=config["provider"],
                provider=provider,
                auth_config=config["auth"],
                cluster_name=config["cluster_name"],
                file_mounts=config["file_mounts"],
                initialization_commands=[],
                setup_commands=[],
                ray_start_commands=[],
                runtime_hash="",
                file_mounts_contents_hash="",
                is_head_node=False,
                docker_config=config.get("docker"),
            )

            _exec(
                updater,
                f"docker stop {container_name}",
                with_output=False,
                run_env="host",
            )
        except Exception:
            # Best effort: the node is terminated below regardless.
            cli_logger.warning(f"Docker stop failed on {node}")

    # Loop here to check that both the head and worker nodes are actually
    # really gone
    nodes_to_terminate = remaining_nodes()

    container_name = config.get("docker", {}).get("container_name")
    if container_name:
        # This is to ensure that the parallel SSH calls below do not mess with
        # the users terminal.
        output_redir = cmd_output_util.is_output_redirected()
        cmd_output_util.set_output_redirected(True)
        allow_interactive = cmd_output_util.does_allow_interactive()
        cmd_output_util.set_allow_interactive(False)

        try:
            with ThreadPoolExecutor(
                max_workers=MAX_PARALLEL_SHUTDOWN_WORKERS
            ) as executor:
                for node in nodes_to_terminate:
                    executor.submit(
                        run_docker_stop, node=node, container_name=container_name
                    )
        finally:
            # Always restore the previous output settings, even if the
            # parallel stop is interrupted.
            cmd_output_util.set_output_redirected(output_redir)
            cmd_output_util.set_allow_interactive(allow_interactive)

    with LogTimer("teardown_cluster: done."):
        while nodes_to_terminate:
            provider.terminate_nodes(nodes_to_terminate)

            cli_logger.print(
                "Requested {} nodes to shut down.",
                cf.bold(len(nodes_to_terminate)),
                _tags=dict(interval="1s"),
            )

            time.sleep(POLL_INTERVAL)  # todo: interval should be a variable
            nodes_to_terminate = remaining_nodes()
            cli_logger.print(
                "{} nodes remaining after {} second(s).",
                cf.bold(len(nodes_to_terminate)),
                POLL_INTERVAL,
            )
        cli_logger.success("No nodes remaining.")

    # Cleanup shared cluster resources if provider supports it
    if hasattr(provider, "cleanup_cluster_resources") and not workers_only:
        try:
            cli_logger.print("Cleaning up shared cluster resources...")
            provider.cleanup_cluster_resources()
            cli_logger.success("Shared cluster resources cleaned up.")
        except Exception as e:
            cli_logger.verbose_error("{}", str(e))
            cli_logger.warning(
                "Failed to cleanup shared cluster resources "
                "(use -v to see details). "
                "You may need to manually delete MSI, NSG, and Subnet resources."
            )
def kill_node(
    config_file: str, yes: bool, hard: bool, override_cluster_name: Optional[str]
) -> Optional[str]:
    """Kills a random Raylet worker.

    Args:
        config_file: Path to the cluster config YAML file.
        yes: Passed to the confirmation prompt.
        hard: If True, terminate the node through the provider; otherwise
            run `ray stop` on it.
        override_cluster_name: If not None, replaces ``cluster_name``.

    Returns:
        The IP of the chosen node (internal if the provider config sets
        ``use_internal_ips``, external otherwise), or None if there are no
        worker nodes.
    """
    # Context manager so the config file handle is closed promptly.
    with open(config_file) as f:
        config = yaml.safe_load(f.read())
    if override_cluster_name is not None:
        config["cluster_name"] = override_cluster_name
    config = _bootstrap_config(config)

    cli_logger.confirm(yes, "A random node will be killed.")

    provider = _get_node_provider(config["provider"], config["cluster_name"])
    nodes = provider.non_terminated_nodes({TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
    if not nodes:
        cli_logger.print("No worker nodes detected.")
        return None
    node = random.choice(nodes)
    cli_logger.print("Shutdown " + cf.bold("{}"), node)
    if hard:
        provider.terminate_node(node)
    else:
        updater = NodeUpdaterThread(
            node_id=node,
            provider_config=config["provider"],
            provider=provider,
            auth_config=config["auth"],
            cluster_name=config["cluster_name"],
            file_mounts=config["file_mounts"],
            initialization_commands=[],
            setup_commands=[],
            ray_start_commands=[],
            runtime_hash="",
            file_mounts_contents_hash="",
            is_head_node=False,
            docker_config=config.get("docker"),
        )

        _exec(updater, "ray stop", False, False)

    time.sleep(POLL_INTERVAL)

    if config.get("provider", {}).get("use_internal_ips", False):
        node_ip = provider.internal_ip(node)
    else:
        node_ip = provider.external_ip(node)

    return node_ip
def monitor_cluster(
    cluster_config_file: str, num_lines: int, override_cluster_name: Optional[str]
) -> None:
    """Tails the autoscaler logs of a Ray cluster.

    Runs ``tail -n <num_lines> -f`` on the monitor log files through
    ``exec_cluster``.

    Args:
        cluster_config_file: Path to the cluster config file.
        num_lines: Number of trailing log lines to show initially.
        override_cluster_name: Passed through to ``exec_cluster``.
    """
    tail_command = "tail -n {} -f /tmp/ray/session_latest/logs/monitor*".format(
        num_lines
    )
    exec_cluster(
        cluster_config_file,
        cmd=tail_command,
        run_env="auto",
        screen=False,
        tmux=False,
        stop=False,
        start=False,
        override_cluster_name=override_cluster_name,
        port_forward=None,
    )
def warn_about_bad_start_command(
    start_commands: List[str], no_monitor_on_head: bool = False
) -> None:
    """Warn about head start commands that will not yield a working cluster.

    Two independent warnings may be emitted:
    - no command contains ``ray start``;
    - no ``ray start`` command mentions ``autoscaling-config`` (skipped
      when ``no_monitor_on_head`` is True).

    Args:
        start_commands: The head node's Ray start commands.
        no_monitor_on_head: If True, suppresses the autoscaling-config warning.
    """
    ray_start_commands = [
        command for command in start_commands if "ray start" in command
    ]
    if not ray_start_commands:
        cli_logger.warning(
            "Ray runtime will not be started because `{}` is not in `{}`.",
            cf.bold("ray start"),
            cf.bold("head_start_ray_commands"),
        )

    has_autoscaling_config = any(
        "autoscaling-config" in command for command in ray_start_commands
    )
    if not has_autoscaling_config and not no_monitor_on_head:
        cli_logger.warning(
            "The head node will not launch any workers because "
            "`{}` does not have `{}` set.\n"
            "Potential fix: add `{}` to the `{}` command under `{}`.",
            cf.bold("ray start"),
            cf.bold("--autoscaling-config"),
            cf.bold("--autoscaling-config=~/ray_bootstrap_config.yaml"),
            cf.bold("ray start"),
            cf.bold("head_start_ray_commands"),
        )
- def get_or_create_head_node(
- config: Dict[str, Any],
- printable_config_file: str,
- no_restart: bool,
- restart_only: bool,
- yes: bool,
- override_cluster_name: Optional[str],
- no_monitor_on_head: bool = False,
- _provider: Optional[NodeProvider] = None,
- _runner: ModuleType = subprocess,
- ) -> None:
- """Create the cluster head node, which in turn creates the workers."""
- global_event_system.execute_callback(CreateClusterEvent.cluster_booting_started)
- provider = _provider or _get_node_provider(
- config["provider"], config["cluster_name"]
- )
- config = copy.deepcopy(config)
- head_node_tags = {
- TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
- }
- nodes = provider.non_terminated_nodes(head_node_tags)
- if len(nodes) > 0:
- head_node = nodes[0]
- else:
- head_node = None
- if not head_node:
- cli_logger.confirm(
- yes, "No head node found. Launching a new cluster.", _abort=True
- )
- cli_logger.newline()
- usage_lib.show_usage_stats_prompt(cli=True)
- if head_node:
- if restart_only:
- cli_logger.confirm(
- yes,
- "Updating cluster configuration and "
- "restarting the cluster Ray runtime. "
- "Setup commands will not be run due to `{}`.\n",
- cf.bold("--restart-only"),
- _abort=True,
- )
- cli_logger.newline()
- usage_lib.show_usage_stats_prompt(cli=True)
- elif no_restart:
- cli_logger.print(
- "Cluster Ray runtime will not be restarted due to `{}`.",
- cf.bold("--no-restart"),
- )
- cli_logger.confirm(
- yes,
- "Updating cluster configuration and running setup commands.",
- _abort=True,
- )
- else:
- cli_logger.print("Updating cluster configuration and running full setup.")
- cli_logger.confirm(
- yes, cf.bold("Cluster Ray runtime will be restarted."), _abort=True
- )
- cli_logger.newline()
- usage_lib.show_usage_stats_prompt(cli=True)
- cli_logger.newline()
- # TODO(ekl) this logic is duplicated in node_launcher.py (keep in sync)
- head_node_config = copy.deepcopy(config.get("head_node", {}))
- # The above `head_node` field is deprecated in favor of per-node-type
- # node_configs. We allow it for backwards-compatibility.
- head_node_resources = None
- head_node_labels = None
- head_node_type = config.get("head_node_type")
- if head_node_type:
- head_node_tags[TAG_RAY_USER_NODE_TYPE] = head_node_type
- head_config = config["available_node_types"][head_node_type]
- head_node_config.update(head_config["node_config"])
- # Not necessary to keep in sync with node_launcher.py
- # Keep in sync with autoscaler.py _node_resources
- head_node_resources = head_config.get("resources")
- head_node_labels = head_config.get("labels")
- launch_hash = hash_launch_conf(head_node_config, config["auth"])
- creating_new_head = _should_create_new_head(
- head_node, launch_hash, head_node_type, provider
- )
- if creating_new_head:
- with cli_logger.group("Acquiring an up-to-date head node"):
- global_event_system.execute_callback(
- CreateClusterEvent.acquiring_new_head_node
- )
- if head_node is not None:
- cli_logger.confirm(yes, "Relaunching the head node.", _abort=True)
- provider.terminate_node(head_node)
- cli_logger.print("Terminated head node {}", head_node)
- head_node_tags[TAG_RAY_LAUNCH_CONFIG] = launch_hash
- head_node_tags[TAG_RAY_NODE_NAME] = "ray-{}-head".format(
- config["cluster_name"]
- )
- head_node_tags[TAG_RAY_NODE_STATUS] = STATUS_UNINITIALIZED
- provider.create_node(head_node_config, head_node_tags, 1)
- cli_logger.print("Launched a new head node")
- start = time.time()
- head_node = None
- with cli_logger.group("Fetching the new head node"):
- while True:
- if time.time() - start > 50:
- cli_logger.abort(
- "Head node fetch timed out. Failed to create head node."
- )
- nodes = provider.non_terminated_nodes(head_node_tags)
- if len(nodes) == 1:
- head_node = nodes[0]
- break
- time.sleep(POLL_INTERVAL)
- cli_logger.newline()
- global_event_system.execute_callback(CreateClusterEvent.head_node_acquired)
- with cli_logger.group(
- "Setting up head node",
- _numbered=("<>", 1, 1),
- # cf.bold(provider.node_tags(head_node)[TAG_RAY_NODE_NAME]),
- _tags=dict(),
- ): # add id, ARN to tags?
- # TODO(ekl) right now we always update the head node even if the
- # hash matches.
- # We could prompt the user for what they want to do here.
- # No need to pass in cluster_sync_files because we use this
- # hash to set up the head node
- (runtime_hash, file_mounts_contents_hash) = hash_runtime_conf(
- config["file_mounts"], None, config
- )
- if not no_monitor_on_head:
- # Return remote_config_file to avoid prematurely closing it.
- config, remote_config_file = _set_up_config_for_head_node(
- config, provider, no_restart
- )
- cli_logger.print("Prepared bootstrap config")
- if restart_only:
- # Docker may re-launch nodes, requiring setup
- # commands to be rerun.
- if config.get("docker", {}).get("container_name"):
- setup_commands = config["head_setup_commands"]
- else:
- setup_commands = []
- ray_start_commands = config["head_start_ray_commands"]
- # If user passed in --no-restart and we're not creating a new head,
- # omit start commands.
- elif no_restart and not creating_new_head:
- setup_commands = config["head_setup_commands"]
- ray_start_commands = []
- else:
- setup_commands = config["head_setup_commands"]
- ray_start_commands = config["head_start_ray_commands"]
- if not no_restart:
- warn_about_bad_start_command(ray_start_commands, no_monitor_on_head)
- # Use RAY_UP_enable_autoscaler_v2 instead of RAY_enable_autoscaler_v2
- # to avoid accidentally enabling autoscaler v2 for ray up
- # due to env inheritance. The default value is 1 since Ray 2.50.0.
- if os.getenv("RAY_UP_enable_autoscaler_v2", "1") == "1":
- if "RAY_UP_enable_autoscaler_v2" not in os.environ:
- # TODO (rueian): Remove this notice after Ray 2.52.0.
- cli_logger.print(
- "Autoscaler v2 is now enabled by default (since Ray 2.50.0). "
- "To switch back to v1, set {}=0. This message can be suppressed by setting {} explicitly.",
- cf.bold("RAY_UP_enable_autoscaler_v2"),
- cf.bold("RAY_UP_enable_autoscaler_v2"),
- )
- ray_start_commands = with_envs(
- ray_start_commands,
- {
- "RAY_enable_autoscaler_v2": "1",
- "RAY_CLOUD_INSTANCE_ID": head_node,
- "RAY_NODE_TYPE_NAME": head_node_type,
- },
- )
- updater = NodeUpdaterThread(
- node_id=head_node,
- provider_config=config["provider"],
- provider=provider,
- auth_config=config["auth"],
- cluster_name=config["cluster_name"],
- file_mounts=config["file_mounts"],
- initialization_commands=config["initialization_commands"],
- setup_commands=setup_commands,
- ray_start_commands=ray_start_commands,
- process_runner=_runner,
- runtime_hash=runtime_hash,
- file_mounts_contents_hash=file_mounts_contents_hash,
- is_head_node=True,
- node_resources=head_node_resources,
- node_labels=head_node_labels,
- rsync_options={
- "rsync_exclude": config.get("rsync_exclude"),
- "rsync_filter": config.get("rsync_filter"),
- },
- docker_config=config.get("docker"),
- restart_only=restart_only,
- )
- updater.start()
- updater.join()
- # Refresh the node cache so we see the external ip if available
- provider.non_terminated_nodes(head_node_tags)
- if updater.exitcode != 0:
- # todo: this does not follow the mockup and is not good enough
- cli_logger.abort("Failed to setup head node.")
- sys.exit(1)
- global_event_system.execute_callback(
- CreateClusterEvent.cluster_booting_completed,
- {
- "head_node_id": head_node,
- },
- )
- monitor_str = "tail -n 100 -f /tmp/ray/session_latest/logs/monitor*"
- if override_cluster_name:
- modifiers = " --cluster-name={}".format(quote(override_cluster_name))
- else:
- modifiers = ""
- cli_logger.newline()
- with cli_logger.group("Useful commands:"):
- printable_config_file = os.path.abspath(printable_config_file)
- cli_logger.print("To terminate the cluster:")
- cli_logger.print(cf.bold(f" ray down {printable_config_file}{modifiers}"))
- cli_logger.newline()
- cli_logger.print("To retrieve the IP address of the cluster head:")
- cli_logger.print(
- cf.bold(f" ray get-head-ip {printable_config_file}{modifiers}")
- )
- cli_logger.newline()
- cli_logger.print(
- "To port-forward the cluster's Ray Dashboard to the local machine:"
- )
- cli_logger.print(cf.bold(f" ray dashboard {printable_config_file}{modifiers}"))
- cli_logger.newline()
- cli_logger.print(
- "To submit a job to the cluster, port-forward the "
- "Ray Dashboard in another terminal and run:"
- )
- cli_logger.print(
- cf.bold(
- " ray job submit --address http://localhost:<dashboard-port> "
- "--working-dir . -- python my_script.py"
- )
- )
- cli_logger.newline()
- cli_logger.print("To connect to a terminal on the cluster head for debugging:")
- cli_logger.print(cf.bold(f" ray attach {printable_config_file}{modifiers}"))
- cli_logger.newline()
- cli_logger.print("To monitor autoscaling:")
- cli_logger.print(
- cf.bold(
- f" ray exec {printable_config_file}{modifiers} {quote(monitor_str)}"
- )
- )
- cli_logger.newline()
- # Clean up temporary config file if it was created
- # Clean up temporary config file if it was created on Windows
- if (
- sys.platform == "win32"
- and not no_monitor_on_head
- and "remote_config_file" in locals()
- ):
- try:
- os.remove(remote_config_file.name)
- except OSError:
- pass # Ignore cleanup errors
def _should_create_new_head(
    head_node_id: Optional[str],
    new_launch_hash: str,
    new_head_node_type: str,
    provider: NodeProvider,
) -> bool:
    """Decide whether a fresh head node has to be launched.

    A new head is required when any of these is true:
      (a) no head node currently exists,
      (b) the launch-config hash stored in the existing head's tags differs
          from ``new_launch_hash``,
      (c) the node type stored in the existing head's tags differs from
          ``new_head_node_type``.

    When (b) or (c) applies, the mismatching values are reported through
    ``cli_logger`` before returning.

    Args:
        head_node_id: id of the existing head node, or None if there is none.
        new_launch_hash: hash of the currently submitted head node config.
        new_head_node_type: currently submitted head node-type key.
        provider: node provider used to read the existing head's tags.

    Returns:
        bool: True if a new Ray head node should be launched, else False.
    """
    # Nothing to compare against: a head must be created.
    if not head_node_id:
        return True

    # Read what the running head was launched with.
    existing_tags = provider.node_tags(head_node_id)
    existing_hash = existing_tags.get(TAG_RAY_LAUNCH_CONFIG)
    existing_type = existing_tags.get(TAG_RAY_USER_NODE_TYPE)

    hash_changed = new_launch_hash != existing_hash
    type_changed = new_head_node_type != existing_type

    if not (hash_changed or type_changed):
        return False

    # Tell the user exactly which part of the head is stale.
    with cli_logger.group(
        "Currently running head node is out-of-date with cluster configuration"
    ):
        if hash_changed:
            cli_logger.print(
                "Current hash is {}, expected {}",
                cf.bold(existing_hash),
                cf.bold(new_launch_hash),
            )
        if type_changed:
            cli_logger.print(
                "Current head node type is {}, expected {}",
                cf.bold(existing_type),
                cf.bold(new_head_node_type),
            )
    return True
def _set_up_config_for_head_node(
    config: Dict[str, Any], provider: NodeProvider, no_restart: bool
) -> Tuple[Dict[str, Any], Any]:
    """Build the autoscaling config (and SSH key mount) for the head node.

    A rewritten copy of ``config`` is serialized as JSON into a temporary
    file, and ``config["file_mounts"]`` is extended in place so that file (and
    the SSH private key, when one is configured) is mounted onto the head.

    Args:
        config: cluster config; its ``file_mounts`` entry is mutated.
        provider: node provider; its ``prepare_for_head_node`` hook gets the
            last word on the rewritten config.
        no_restart: stored under ``"no_restart"`` in the rewritten config.

    Returns:
        The (mutated) ``config`` and the temporary file object holding the
        rewritten config. The file object is returned so the caller can keep
        it alive while it is needed.
    """
    head_config = copy.deepcopy(config)

    # Proxy options would prevent the head node from reaching the workers.
    head_config["auth"].pop("ssh_proxy_command", None)

    # "head_node" is not a valid config field, but it may have been introduced
    # after validation (see the _bootstrap_config() call to
    # provider_cls.bootstrap_config(config)). The head never launches a head
    # node, so it does not need those defaults.
    head_config.pop("head_node", None)

    if "ssh_private_key" in config["auth"]:
        remote_key_path = "~/ray_bootstrap_key.pem"
        head_config["auth"]["ssh_private_key"] = remote_key_path

    # On the head node every mount already sits at its remote location.
    head_config["file_mounts"] = {path: path for path in config["file_mounts"]}
    head_config["no_restart"] = no_restart
    head_config = provider.prepare_for_head_node(head_config)

    # Write the rewritten config to a temp file that will be mounted on the
    # head node. On Windows the file is not auto-deleted and is closed right
    # away so that it is accessible by path.
    on_windows = sys.platform == "win32"
    remote_config_file = tempfile.NamedTemporaryFile(
        "w", prefix="ray-bootstrap-", delete=not on_windows
    )
    remote_config_file.write(json.dumps(head_config))
    remote_config_file.flush()
    if on_windows:
        remote_config_file.close()

    # Inject the bootstrap config and, if configured, the SSH key.
    extra_mounts = {"~/ray_bootstrap_config.yaml": remote_config_file.name}
    if "ssh_private_key" in config["auth"]:
        extra_mounts[remote_key_path] = config["auth"]["ssh_private_key"]
    config["file_mounts"].update(extra_mounts)

    return config, remote_config_file
def attach_cluster(
    config_file: str,
    start: bool,
    use_screen: bool,
    use_tmux: bool,
    override_cluster_name: Optional[str],
    no_config_cache: bool = False,
    new: bool = False,
    port_forward: Optional[Port_forward] = None,
    node_ip: Optional[str] = None,
) -> None:
    """Open an interactive session on the specified cluster.

    Picks the shell command to run (tmux, screen, or ``$SHELL``) and hands it
    to ``exec_cluster``.

    Args:
        config_file: path to the cluster yaml
        start: whether to start the cluster if it isn't up
        use_screen: whether to use screen as multiplexer
        use_tmux: whether to use tmux as multiplexer (takes precedence over
            ``use_screen`` when both are set)
        override_cluster_name: set the name of the cluster
        no_config_cache: whether to skip the config cache
        new: whether to force a new screen
        port_forward ( (int,int) or list[(int,int)] ): port(s) to forward
        node_ip: IP address of the node to attach to

    Raises:
        ValueError: if ``new`` is set without ``use_screen`` or ``use_tmux``.
    """
    if use_tmux:
        cmd = "tmux new" if new else "tmux attach || tmux new"
    elif use_screen:
        cmd = "screen -L" if new else "screen -L -xRR"
    else:
        if new:
            raise ValueError("--new only makes sense if passing --screen or --tmux")
        cmd = "$SHELL"

    # The multiplexer is already part of `cmd`, so exec_cluster itself must
    # not wrap the command in screen/tmux again.
    exec_cluster(
        config_file,
        cmd=cmd,
        run_env="auto",
        screen=False,
        tmux=False,
        stop=False,
        start=start,
        override_cluster_name=override_cluster_name,
        no_config_cache=no_config_cache,
        port_forward=port_forward,
        _allow_uninitialized_state=True,
        node_ip=node_ip,
    )
def exec_cluster(
    config_file: str,
    *,
    cmd: Optional[str] = None,
    run_env: str = "auto",
    screen: bool = False,
    tmux: bool = False,
    stop: bool = False,
    start: bool = False,
    override_cluster_name: Optional[str] = None,
    no_config_cache: bool = False,
    port_forward: Optional[Port_forward] = None,
    with_output: bool = False,
    _allow_uninitialized_state: bool = False,
    extra_screen_args: Optional[str] = None,
    node_ip: Optional[str] = None,
) -> str:
    """Runs a command on the specified cluster.

    Args:
        config_file: path to the cluster yaml
        cmd: command to run
        run_env: whether to run the command on the host or in a container.
            Select between "auto", "host" and "docker"
        screen: whether to run in a screen
        tmux: whether to run in a tmux session
        stop: whether to stop the cluster after command run
        start: whether to start the cluster if it isn't up
        override_cluster_name: set the name of the cluster
        no_config_cache: whether to skip the config cache
        port_forward: port(s) to forward
        with_output: whether to return the command output
        _allow_uninitialized_state: whether to execute on an uninitialized head
            node.
        extra_screen_args: optional custom additional args to screen command
        node_ip: IP address of the node to execute on

    Returns:
        Whatever the node's command runner returns for the command.
    """
    assert not (screen and tmux), "Can specify only one of `screen` or `tmux`."
    assert run_env in RUN_ENV_TYPES, "--run_env must be in {}".format(RUN_ENV_TYPES)
    # TODO(rliaw): We default this to True to maintain backwards-compat.
    # In the future we would want to support disabling login-shells
    # and interactivity.
    cmd_output_util.set_allow_interactive(True)

    # Use a context manager so the config file handle is closed right away
    # instead of being left for the garbage collector.
    with open(config_file) as config_fp:
        config = yaml.safe_load(config_fp.read())
    if override_cluster_name is not None:
        config["cluster_name"] = override_cluster_name
    config = _bootstrap_config(config, no_config_cache=no_config_cache)

    provider = _get_node_provider(config["provider"], config["cluster_name"])

    if node_ip:
        # IP specified by user, find the node with the IP
        if start:
            cli_logger.warning(
                "The {} flag is ignored when {} is specified, "
                "as the node IP can be either head or worker node. "
                "If you need to start the cluster, run {} first, "
                "or use {} without {}.",
                cf.bold("--start"),
                cf.bold("--node-ip"),
                cf.bold(f"ray up {config_file}"),
                cf.bold("ray attach"),
                cf.bold("--node-ip"),
            )
        use_internal_ip = config.get("provider", {}).get("use_internal_ips", False)
        try:
            target_node = provider.get_node_id(node_ip, use_internal_ip=use_internal_ip)
            cli_logger.print("Attaching to node with IP: {}", cf.bold(node_ip))
        except ValueError as e:
            cli_logger.abort(
                "Could not find node with IP {}. {}", cf.bold(node_ip), str(e)
            )
        is_head_node = (
            provider.node_tags(target_node)[TAG_RAY_NODE_KIND] == NODE_KIND_HEAD
        )
    else:
        # Default attaching to head node
        target_node = _get_running_head_node(
            config,
            config_file,
            override_cluster_name,
            create_if_needed=start,
            _provider=provider,
            _allow_uninitialized_state=_allow_uninitialized_state,
        )
        is_head_node = True

    # The updater is only used for its command runner; no setup is performed.
    updater = NodeUpdaterThread(
        node_id=target_node,
        provider_config=config["provider"],
        provider=provider,
        auth_config=config["auth"],
        cluster_name=config["cluster_name"],
        file_mounts=config["file_mounts"],
        initialization_commands=[],
        setup_commands=[],
        ray_start_commands=[],
        runtime_hash="",
        file_mounts_contents_hash="",
        is_head_node=is_head_node,
        rsync_options={
            "rsync_exclude": config.get("rsync_exclude"),
            "rsync_filter": config.get("rsync_filter"),
        },
        docker_config=config.get("docker"),
    )

    if cmd and stop:
        # Chain the teardown steps after the user's command.
        cmd = "; ".join(
            [
                cmd,
                "ray stop",
                "ray teardown ~/ray_bootstrap_config.yaml --yes --workers-only",
                "sudo shutdown -h now",
            ]
        )

    result = _exec(
        updater,
        cmd,
        screen,
        tmux,
        port_forward=port_forward,
        with_output=with_output,
        run_env=run_env,
        shutdown_after_run=False,
        extra_screen_args=extra_screen_args,
    )

    if tmux or screen:
        # Print the `ray attach` invocation that re-opens the session.
        attach_command_parts = ["ray attach", config_file]
        if override_cluster_name is not None:
            # Shell-quote the name so the printed hint can be pasted as-is
            # even when the cluster name contains shell-special characters.
            attach_command_parts.append(
                "--cluster-name={}".format(quote(override_cluster_name))
            )
        if node_ip is not None:
            attach_command_parts.append("--node-ip={}".format(node_ip))
        if tmux:
            attach_command_parts.append("--tmux")
        elif screen:
            attach_command_parts.append("--screen")

        attach_command = " ".join(attach_command_parts)
        cli_logger.print("Run `{}` to check command status.", cf.bold(attach_command))
    return result
def _exec(
    updater: NodeUpdaterThread,
    cmd: Optional[str] = None,
    screen: bool = False,
    tmux: bool = False,
    port_forward: Optional[Port_forward] = None,
    with_output: bool = False,
    run_env: str = "auto",
    shutdown_after_run: bool = False,
    extra_screen_args: Optional[str] = None,
) -> str:
    """Run ``cmd`` through the updater's command runner.

    When ``cmd`` is non-empty and ``screen`` or ``tmux`` is requested, the
    command is first wrapped so that it runs detached inside that multiplexer
    and drops into a bash shell afterwards. ``screen`` wins if both are set.

    Args:
        updater: node updater whose ``cmd_runner`` executes the command.
        cmd: command to run.
        screen: whether to run the command in a detached screen session.
        tmux: whether to run the command in a detached tmux session.
        port_forward: forwarded as-is to the command runner.
        with_output: forwarded as-is to the command runner.
        run_env: forwarded as-is to the command runner.
        shutdown_after_run: forwarded as-is to the command runner.
        extra_screen_args: extra arguments inserted into the screen
            invocation; ignored when None or empty.

    Returns:
        The result of ``updater.cmd_runner.run``.
    """
    if cmd and (screen or tmux):
        if screen:
            launcher = ["screen", "-L", "-dm"]
            if extra_screen_args is not None and len(extra_screen_args) > 0:
                launcher.append(extra_screen_args)
        else:
            # TODO: Consider providing named session functionality
            launcher = ["tmux", "new", "-d"]
        # Keep an interactive shell alive once the command has finished.
        shell_tail = ["bash", "-c", quote(cmd + "; exec bash")]
        cmd = " ".join(launcher + shell_tail)

    return updater.cmd_runner.run(
        cmd,
        exit_on_fail=True,
        port_forward=port_forward,
        with_output=with_output,
        run_env=run_env,
        shutdown_after_run=shutdown_after_run,
    )
def rsync(
    config_file: str,
    source: Optional[str],
    target: Optional[str],
    override_cluster_name: Optional[str],
    down: bool,
    ip_address: Optional[str] = None,
    use_internal_ip: bool = False,
    no_config_cache: bool = False,
    all_nodes: bool = False,
    should_bootstrap: bool = True,
    _runner: ModuleType = subprocess,
) -> None:
    """Rsyncs files.

    With both ``source`` and ``target`` given, that single path is synced;
    with neither, the configured file mounts are synced instead.

    Args:
        config_file: path to the cluster yaml
        source: source dir
        target: target dir
        override_cluster_name: set the name of the cluster
        down: whether we're syncing remote -> local
        ip_address: Address of node. Aborts
            if both ip_address and 'all_nodes' are provided.
        use_internal_ip: Whether the provided ip_address is
            public or private.
        no_config_cache: whether to skip the config cache when bootstrapping
        all_nodes: whether to sync worker nodes in addition to the head node
        should_bootstrap: whether to bootstrap cluster config before syncing
        _runner: process runner handed to the node updater
    """
    if bool(source) != bool(target):
        cli_logger.abort("Expected either both a source and a target, or neither.")

    assert bool(source) == bool(
        target
    ), "Must either provide both or neither source and target."

    if ip_address and all_nodes:
        cli_logger.abort("Cannot provide both ip_address and 'all_nodes'.")

    # Use a context manager so the config file handle is closed right away
    # instead of being left for the garbage collector.
    with open(config_file) as config_fp:
        config = yaml.safe_load(config_fp.read())
    if override_cluster_name is not None:
        config["cluster_name"] = override_cluster_name
    if should_bootstrap:
        config = _bootstrap_config(config, no_config_cache=no_config_cache)

    # The remote side of the transfer is `source` when downloading and
    # `target` when uploading; check whether it lies under a file mount.
    is_file_mount = False
    if source and target:
        remote_path = source if down else target
        is_file_mount = any(
            remote_path.startswith(remote_mount)
            for remote_mount in config.get("file_mounts", {})
        )

    provider = _get_node_provider(config["provider"], config["cluster_name"])

    def rsync_to_node(node_id, is_head_node):
        """Sync the requested path (or all file mounts) with one node."""
        updater = NodeUpdaterThread(
            node_id=node_id,
            provider_config=config["provider"],
            provider=provider,
            auth_config=config["auth"],
            cluster_name=config["cluster_name"],
            file_mounts=config["file_mounts"],
            initialization_commands=[],
            setup_commands=[],
            ray_start_commands=[],
            runtime_hash="",
            use_internal_ip=use_internal_ip,
            process_runner=_runner,
            file_mounts_contents_hash="",
            is_head_node=is_head_node,
            rsync_options={
                "rsync_exclude": config.get("rsync_exclude"),
                "rsync_filter": config.get("rsync_filter"),
            },
            docker_config=config.get("docker"),
        )
        # Named `sync_fn` rather than `rsync` to avoid shadowing the
        # enclosing function.
        sync_fn = updater.rsync_down if down else updater.rsync_up

        if source and target:
            # print rsync progress for single file rsync
            if cli_logger.verbosity > 0:
                cmd_output_util.set_output_redirected(False)
                set_rsync_silent(False)
            sync_fn(source, target, is_file_mount)
        else:
            updater.sync_file_mounts(sync_fn)

    head_node = _get_running_head_node(
        config, config_file, override_cluster_name, create_if_needed=False
    )
    if ip_address:
        nodes = [provider.get_node_id(ip_address, use_internal_ip=use_internal_ip)]
    else:
        nodes = [head_node]
        if all_nodes:
            nodes.extend(_get_worker_nodes(config, override_cluster_name))

    for node_id in nodes:
        rsync_to_node(node_id, is_head_node=(node_id == head_node))
def get_head_node_ip(
    config_file: str, override_cluster_name: Optional[str] = None
) -> str:
    """Returns head node IP for given configuration file if exists.

    Args:
        config_file: path to the cluster yaml
        override_cluster_name: set the name of the cluster

    Returns:
        The head node's internal IP when the provider config sets
        ``use_internal_ips`` without ``use_external_head_ip``; otherwise its
        external IP.
    """
    # Use a context manager so the config file handle is closed right away
    # instead of being left for the garbage collector.
    with open(config_file) as config_fp:
        config = yaml.safe_load(config_fp.read())
    if override_cluster_name is not None:
        config["cluster_name"] = override_cluster_name

    provider = _get_node_provider(config["provider"], config["cluster_name"])
    head_node = _get_running_head_node(config, config_file, override_cluster_name)
    provider_cfg = config.get("provider", {})
    # Get internal IP if using internal IPs and
    # use_external_head_ip is not specified
    if provider_cfg.get("use_internal_ips", False) and not provider_cfg.get(
        "use_external_head_ip", False
    ):
        head_node_ip = provider.internal_ip(head_node)
    else:
        head_node_ip = provider.external_ip(head_node)

    return head_node_ip
def get_worker_node_ips(
    config_file: str, override_cluster_name: Optional[str] = None
) -> List[str]:
    """Returns worker node IPs for given configuration file.

    Args:
        config_file: path to the cluster yaml
        override_cluster_name: set the name of the cluster

    Returns:
        One IP per non-terminated worker node: internal IPs when the provider
        config sets ``use_internal_ips``, external IPs otherwise.
    """
    # Use a context manager so the config file handle is closed right away
    # instead of being left for the garbage collector.
    with open(config_file) as config_fp:
        config = yaml.safe_load(config_fp.read())
    if override_cluster_name is not None:
        config["cluster_name"] = override_cluster_name

    provider = _get_node_provider(config["provider"], config["cluster_name"])
    nodes = provider.non_terminated_nodes({TAG_RAY_NODE_KIND: NODE_KIND_WORKER})

    if config.get("provider", {}).get("use_internal_ips", False):
        return [provider.internal_ip(node) for node in nodes]
    else:
        return [provider.external_ip(node) for node in nodes]
def _get_worker_nodes(
    config: Dict[str, Any], override_cluster_name: Optional[str]
) -> List[str]:
    """Return the ids of the non-terminated worker nodes of the cluster.

    Args:
        config: cluster config; its ``cluster_name`` entry is overwritten in
            place when ``override_cluster_name`` is given.
        override_cluster_name: set the name of the cluster

    Returns:
        Node ids reported by the provider for nodes tagged as workers.
    """
    # todo: technically could be reused in get_worker_node_ips
    if override_cluster_name is not None:
        config["cluster_name"] = override_cluster_name

    node_provider = _get_node_provider(config["provider"], config["cluster_name"])
    worker_filter = {TAG_RAY_NODE_KIND: NODE_KIND_WORKER}
    return node_provider.non_terminated_nodes(worker_filter)
def _get_running_head_node(
    config: Dict[str, Any],
    printable_config_file: str,
    override_cluster_name: Optional[str],
    create_if_needed: bool = False,
    _provider: Optional[NodeProvider] = None,
    _allow_uninitialized_state: bool = False,
) -> str:
    """Look up a valid, running head node for the cluster.

    Args:
        config (Dict[str, Any]): Cluster Config dictionary
        printable_config_file: Used for printing formatted CLI commands.
        override_cluster_name: Passed to `get_or_create_head_node` to
            override the cluster name present in `config`.
        create_if_needed: Create a head node if one is not present.
        _provider: [For testing], a Node Provider to use.
        _allow_uninitialized_state: Whether to return a head node that
            is not 'UP TO DATE'. This is used to allow `ray attach` and
            `ray exec` to debug a cluster in a bad state.

    Returns:
        The id of an up-to-date head node if there is one; otherwise, when
        ``_allow_uninitialized_state`` is set and creation was not requested,
        the id of a head node in some other state.

    Raises:
        RuntimeError: if no suitable head node is found.
    """
    provider = _provider or _get_node_provider(
        config["provider"], config["cluster_name"]
    )
    candidates = provider.non_terminated_nodes({TAG_RAY_NODE_KIND: NODE_KIND_HEAD})

    ready_head = None
    fallback_head = None
    for node in candidates:
        node_state = provider.node_tags(node).get(TAG_RAY_NODE_STATUS)
        if node_state != STATUS_UP_TO_DATE:
            # Remember it as a last resort and let the user know.
            fallback_head = node
            cli_logger.warning(f"Head node ({node}) is in state {node_state}.")
            continue
        ready_head = node

    if ready_head is not None:
        return ready_head

    if create_if_needed:
        get_or_create_head_node(
            config,
            printable_config_file=printable_config_file,
            restart_only=False,
            no_restart=False,
            yes=True,
            override_cluster_name=override_cluster_name,
        )
        # NOTE: `_allow_uninitialized_state` is forced to False if
        # `create_if_needed` is set to True. This is to ensure that the
        # commands executed after creation occur on an actually running
        # cluster.
        return _get_running_head_node(
            config,
            printable_config_file,
            override_cluster_name,
            create_if_needed=False,
            _allow_uninitialized_state=False,
        )

    if _allow_uninitialized_state and fallback_head is not None:
        cli_logger.warning(
            f"The head node being returned: {fallback_head} is not "
            "`up-to-date`. If you are not debugging a startup issue "
            "it is recommended to restart this head node with: {}",
            cf.bold(f" ray down {printable_config_file}"),
        )
        return fallback_head

    raise RuntimeError(
        "Head node of cluster ({}) not found!".format(config["cluster_name"])
    )
def get_local_dump_archive(
    stream: bool = False,
    output: Optional[str] = None,
    logs: bool = True,
    debug_state: bool = True,
    pip: bool = True,
    processes: bool = True,
    processes_verbose: bool = False,
    tempfile: Optional[str] = None,
) -> Optional[str]:
    """Collect local data into an archive and either stream or store it.

    Args:
        stream: write the archive bytes to file descriptor 1 (stdout) and
            delete the archive file afterwards.
        output: path to move the archive to; defaults to the archive's
            basename inside the current working directory.
        logs: forwarded to ``GetParameters``.
        debug_state: forwarded to ``GetParameters``.
        pip: forwarded to ``GetParameters``.
        processes: forwarded to ``GetParameters``.
        processes_verbose: forwarded to ``GetParameters``.
        tempfile: passed to ``Archive`` as its ``file`` argument.

    Returns:
        The path of the stored archive, or None when streaming.

    Raises:
        ValueError: if both ``stream`` and ``output`` are given.
    """
    if stream and output:
        raise ValueError(
            "You can only use either `--output` or `--stream`, but not both."
        )

    parameters = GetParameters(
        logs=logs,
        debug_state=debug_state,
        pip=pip,
        processes=processes,
        processes_verbose=processes_verbose,
    )

    with Archive(file=tempfile) as archive:
        get_all_local_data(archive, parameters)

    tmp = archive.file

    if stream:
        # Copy in bounded chunks so the whole archive never has to fit in
        # memory, and keep writing until each chunk is fully flushed:
        # os.write() may write fewer bytes than requested.
        chunk_size = 1024 * 1024
        try:
            with open(tmp, "rb") as fp:
                for chunk in iter(lambda: fp.read(chunk_size), b""):
                    pending = memoryview(chunk)
                    while len(pending) > 0:
                        written = os.write(1, pending)
                        pending = pending[written:]
        finally:
            # Always drop the temporary archive, even if writing failed.
            os.remove(tmp)
        return None

    target = output or os.path.join(os.getcwd(), os.path.basename(tmp))
    shutil.move(tmp, target)
    cli_logger.print(f"Created local data archive at {target}")

    return target
def get_cluster_dump_archive(
    cluster_config_file: Optional[str] = None,
    host: Optional[str] = None,
    ssh_user: Optional[str] = None,
    ssh_key: Optional[str] = None,
    docker: Optional[str] = None,
    local: Optional[bool] = None,
    output: Optional[str] = None,
    logs: bool = True,
    debug_state: bool = True,
    pip: bool = True,
    processes: bool = True,
    processes_verbose: bool = False,
    tempfile: Optional[str] = None,
) -> Optional[str]:
    """Collect data from cluster nodes into a single archive file.

    A warning listing what will be collected is logged first. The node list
    and SSH/docker settings are resolved through ``_info_from_params``.

    Args:
        cluster_config_file: cluster config passed to ``_info_from_params``.
        host: passed to ``_info_from_params``.
        ssh_user: passed to ``_info_from_params``.
        ssh_key: passed to ``_info_from_params``.
        docker: passed to ``_info_from_params``.
        local: whether to include the local node as well; when None it
            defaults to True only if no cluster config file is in use.
        output: destination path (``~`` is expanded); defaults to a
            timestamped ``.tar.gz`` in the current working directory.
        logs: forwarded to ``GetParameters``.
        debug_state: forwarded to ``GetParameters``.
        pip: forwarded to ``GetParameters``.
        processes: forwarded to ``GetParameters``.
        processes_verbose: forwarded to ``GetParameters``.
        tempfile: passed to ``Archive`` as its ``file`` argument.

    Returns:
        The path the archive was moved to, or None if no nodes were found.
    """
    # Inform the user what kind of logs are collected (before actually
    # collecting, so they can abort)
    sections = []
    if logs:
        sections.append(
            " - The logfiles of your Ray session\n"
            " This usually includes Python outputs (stdout/stderr)\n"
        )
    if debug_state:
        sections.append(
            " - Debug state information on your Ray cluster \n"
            " e.g. number of workers, drivers, objects, etc.\n"
        )
    if pip:
        sections.append(" - Your installed Python packages (`pip freeze`)\n")
    if processes:
        sections.append(
            " - Information on your running Ray processes\n"
            " This includes command line arguments\n"
        )
    content_str = "".join(sections)

    cli_logger.warning(
        "You are about to create a cluster dump. This will collect data from "
        "cluster nodes.\n\n"
        "The dump will contain this information:\n\n"
        f"{content_str}\n"
        "If you are concerned about leaking private information, extract "
        "the archive and inspect its contents before sharing it with "
        "anyone."
    )

    # Parse arguments (e.g. fetch info from cluster config)
    resolved = _info_from_params(cluster_config_file, host, ssh_user, ssh_key, docker)
    cluster_config_file, hosts, ssh_user, ssh_key, docker, cluster_name = resolved

    nodes = [
        Node(host=h, ssh_user=ssh_user, ssh_key=ssh_key, docker_container=docker)
        for h in hosts
    ]
    if not nodes:
        cli_logger.error(
            "No nodes found. Specify with `--host` or by passing a ray "
            "cluster config to `--cluster`."
        )
        return None

    if cluster_config_file:
        nodes[0].is_head = True

    if local is None:
        # If called with a cluster config, this was probably started
        # from a laptop
        local = not bool(cluster_config_file)

    parameters = GetParameters(
        logs=logs,
        debug_state=debug_state,
        pip=pip,
        processes=processes,
        processes_verbose=processes_verbose,
    )

    with Archive(file=tempfile) as archive:
        if local:
            collect = create_archive_for_local_and_remote_nodes
        else:
            collect = create_archive_for_remote_nodes
        collect(archive, remote_nodes=nodes, parameters=parameters)

    if output:
        output = os.path.expanduser(output)
    else:
        # Default name: "<cluster name or collected_logs>_<timestamp>.tar.gz"
        prefix = cluster_name if cluster_name else "collected_logs"
        timestamp = f"{datetime.datetime.now():%Y-%m-%d_%H-%M-%S}"
        output = os.path.join(os.getcwd(), f"{prefix}_{timestamp}.tar.gz")

    shutil.move(archive.file, output)
    return output
def confirm(msg: str, yes: bool) -> Optional[bool]:
    """Ask the user to confirm ``msg`` unless ``yes`` is already set.

    Args:
        msg: prompt shown to the user.
        yes: when truthy, skip the prompt entirely.

    Returns:
        None when ``yes`` is truthy; otherwise the result of
        ``click.confirm(msg, abort=True)``.
    """
    if yes:
        return None
    return click.confirm(msg, abort=True)
|