commands.py 61 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752
  1. import copy
  2. import datetime
  3. import hashlib
  4. import json
  5. import logging
  6. import os
  7. import random
  8. import shutil
  9. import subprocess
  10. import sys
  11. import tempfile
  12. import time
  13. from concurrent.futures import ThreadPoolExecutor
  14. from types import ModuleType
  15. from typing import Any, Dict, List, Optional, Tuple, Union
  16. import click
  17. import yaml
  18. import ray
  19. from ray._common.usage import usage_lib
  20. from ray.autoscaler._private import subprocess_output_util as cmd_output_util
  21. from ray.autoscaler._private.autoscaler import AutoscalerSummary
  22. from ray.autoscaler._private.cli_logger import cf, cli_logger
  23. from ray.autoscaler._private.cluster_dump import (
  24. Archive,
  25. GetParameters,
  26. Node,
  27. _info_from_params,
  28. create_archive_for_local_and_remote_nodes,
  29. create_archive_for_remote_nodes,
  30. get_all_local_data,
  31. )
  32. from ray.autoscaler._private.command_runner import (
  33. set_rsync_silent,
  34. set_using_login_shells,
  35. )
  36. from ray.autoscaler._private.constants import (
  37. AUTOSCALER_RESOURCE_REQUEST_CHANNEL,
  38. MAX_PARALLEL_SHUTDOWN_WORKERS,
  39. )
  40. from ray.autoscaler._private.event_system import CreateClusterEvent, global_event_system
  41. from ray.autoscaler._private.log_timer import LogTimer
  42. from ray.autoscaler._private.node_provider_availability_tracker import (
  43. NodeAvailabilitySummary,
  44. )
  45. from ray.autoscaler._private.providers import (
  46. _NODE_PROVIDERS,
  47. _PROVIDER_PRETTY_NAMES,
  48. _get_node_provider,
  49. )
  50. from ray.autoscaler._private.updater import NodeUpdaterThread
  51. from ray.autoscaler._private.util import (
  52. LoadMetricsSummary,
  53. format_info_string,
  54. hash_launch_conf,
  55. hash_runtime_conf,
  56. prepare_config,
  57. validate_config,
  58. with_envs,
  59. )
  60. from ray.autoscaler.node_provider import NodeProvider
  61. from ray.autoscaler.tags import (
  62. NODE_KIND_HEAD,
  63. NODE_KIND_WORKER,
  64. STATUS_UNINITIALIZED,
  65. STATUS_UP_TO_DATE,
  66. TAG_RAY_LAUNCH_CONFIG,
  67. TAG_RAY_NODE_KIND,
  68. TAG_RAY_NODE_NAME,
  69. TAG_RAY_NODE_STATUS,
  70. TAG_RAY_USER_NODE_TYPE,
  71. )
  72. from ray.experimental.internal_kv import _internal_kv_put, internal_kv_get_gcs_client
  73. from ray.util.debug import log_once
  74. try: # py3
  75. from shlex import quote
  76. except ImportError: # py2
  77. from pipes import quote
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Run-environment names. "auto" and "host" are the values passed as `run_env`
# by callers in this module; presumably this list enumerates every accepted
# value -- TODO confirm against the CLI option definitions.
RUN_ENV_TYPES = ["auto", "host", "docker"]
# Number of seconds passed to time.sleep() between provider polls in
# teardown_cluster() and after stopping a node in kill_node().
POLL_INTERVAL = 5
# Type alias: either a single (int, int) tuple or a list of such tuples.
# NOTE(review): presumably port pairs for forwarding -- confirm at use site.
Port_forward = Union[Tuple[int, int], List[Tuple[int, int]]]
  82. def try_logging_config(config: Dict[str, Any]) -> None:
  83. if config["provider"]["type"] == "aws":
  84. from ray.autoscaler._private.aws.config import log_to_cli
  85. log_to_cli(config)
  86. def try_get_log_state(provider_config: Dict[str, Any]) -> Optional[dict]:
  87. if provider_config["type"] == "aws":
  88. from ray.autoscaler._private.aws.config import get_log_state
  89. return get_log_state()
  90. return None
  91. def try_reload_log_state(provider_config: Dict[str, Any], log_state: dict) -> None:
  92. if not log_state:
  93. return
  94. if provider_config["type"] == "aws":
  95. from ray.autoscaler._private.aws.config import reload_log_state
  96. return reload_log_state(log_state)
  97. def debug_status(
  98. status, error, verbose: bool = False, address: Optional[str] = None
  99. ) -> str:
  100. """
  101. Return a debug string for the autoscaler.
  102. Args:
  103. status: The autoscaler status string for v1
  104. error: The autoscaler error string for v1
  105. verbose: Whether to print verbose information.
  106. address: The address of the cluster (gcs address).
  107. Returns:
  108. str: A debug string for the cluster's status.
  109. """
  110. from ray.autoscaler.v2.utils import is_autoscaler_v2
  111. if is_autoscaler_v2():
  112. from ray.autoscaler.v2.sdk import get_cluster_status
  113. from ray.autoscaler.v2.utils import ClusterStatusFormatter
  114. cluster_status = get_cluster_status(address)
  115. status = ClusterStatusFormatter.format(cluster_status, verbose=verbose)
  116. elif status:
  117. status = status.decode("utf-8")
  118. status_dict = json.loads(status)
  119. lm_summary_dict = status_dict.get("load_metrics_report")
  120. autoscaler_summary_dict = status_dict.get("autoscaler_report")
  121. timestamp = status_dict.get("time")
  122. gcs_request_time = status_dict.get("gcs_request_time")
  123. non_terminated_nodes_time = status_dict.get("non_terminated_nodes_time")
  124. if lm_summary_dict and autoscaler_summary_dict and timestamp:
  125. lm_summary = LoadMetricsSummary(**lm_summary_dict)
  126. node_availability_summary_dict = autoscaler_summary_dict.pop(
  127. "node_availability_summary", {}
  128. )
  129. node_availability_summary = NodeAvailabilitySummary.from_fields(
  130. **node_availability_summary_dict
  131. )
  132. autoscaler_summary = AutoscalerSummary(
  133. node_availability_summary=node_availability_summary,
  134. **autoscaler_summary_dict,
  135. )
  136. report_time = datetime.datetime.fromtimestamp(timestamp)
  137. status = format_info_string(
  138. lm_summary,
  139. autoscaler_summary,
  140. time=report_time,
  141. gcs_request_time=gcs_request_time,
  142. non_terminated_nodes_time=non_terminated_nodes_time,
  143. verbose=verbose,
  144. )
  145. else:
  146. status = (
  147. "No cluster status. It may take a few seconds "
  148. "for the Ray internal services to start up."
  149. )
  150. else:
  151. status = (
  152. "No cluster status. It may take a few seconds "
  153. "for the Ray internal services to start up."
  154. )
  155. if error:
  156. status += "\n"
  157. status += error.decode("utf-8")
  158. return status
  159. def request_resources(
  160. num_cpus: Optional[int] = None,
  161. bundles: Optional[List[dict]] = None,
  162. bundle_label_selectors: Optional[List[dict]] = None,
  163. ) -> None:
  164. """Remotely request some CPU or GPU resources from the autoscaler. Optionally
  165. specify label selectors for nodes with the requested resources.
  166. If `bundle_label_selectors` is provided, `bundles` must also be provided.
  167. Both must be lists of the same length, and `bundle_label_selectors` expects a list
  168. of string dictionaries.
  169. This function is to be called e.g. on a node before submitting a bunch of
  170. ray.remote calls to ensure that resources rapidly become available.
  171. Args:
  172. num_cpus: Scale the cluster to ensure this number of CPUs are
  173. available. This request is persistent until another call to
  174. request_resources() is made.
  175. bundles (List[ResourceDict]): Scale the cluster to ensure this set of
  176. resource shapes can fit. This request is persistent until another
  177. call to request_resources() is made.
  178. bundle_label_selectors (List[Dict[str,str]]): Optional label selectors
  179. that new nodes must satisfy. (e.g. [{"accelerator-type": "A100"}])
  180. The elements in the bundle_label_selectors should be one-to-one mapping
  181. to the elements in bundles.
  182. """
  183. if not ray.is_initialized():
  184. raise RuntimeError("Ray is not initialized yet")
  185. to_request = []
  186. for _ in range(num_cpus or 0):
  187. to_request.append({"resources": {"CPU": 1}, "label_selector": {}})
  188. assert not bundle_label_selectors or (
  189. bundles and len(bundles) == len(bundle_label_selectors)
  190. ), "If bundle_label_selectors is provided, bundles must also be provided and have the same length."
  191. if bundles:
  192. for i, bundle in enumerate(bundles):
  193. selector = bundle_label_selectors[i] if bundle_label_selectors else {}
  194. to_request.append({"resources": bundle, "label_selector": selector})
  195. from ray.autoscaler.v2.utils import is_autoscaler_v2
  196. if is_autoscaler_v2():
  197. # For v2 autoscaler: use new format with label_selectors via GCS RPC
  198. from ray.autoscaler.v2.sdk import request_cluster_resources
  199. gcs_address = internal_kv_get_gcs_client().address
  200. request_cluster_resources(gcs_address, to_request)
  201. else:
  202. # For v1 autoscaler: write old format (ResourceDict) to KV
  203. # Extract resources field for backward compatibility
  204. to_request_v1 = [req["resources"] for req in to_request]
  205. _internal_kv_put(
  206. AUTOSCALER_RESOURCE_REQUEST_CHANNEL,
  207. json.dumps(to_request_v1),
  208. overwrite=True,
  209. )
  210. def create_or_update_cluster(
  211. config_file: str,
  212. override_min_workers: Optional[int],
  213. override_max_workers: Optional[int],
  214. no_restart: bool,
  215. restart_only: bool,
  216. yes: bool,
  217. override_cluster_name: Optional[str] = None,
  218. no_config_cache: bool = False,
  219. redirect_command_output: Optional[bool] = False,
  220. use_login_shells: bool = True,
  221. no_monitor_on_head: bool = False,
  222. ) -> Dict[str, Any]:
  223. """Creates or updates an autoscaling Ray cluster from a config json."""
  224. # no_monitor_on_head is an internal flag used by the Ray K8s operator.
  225. # If True, prevents autoscaling config sync to the Ray head during cluster
  226. # creation. See https://github.com/ray-project/ray/pull/13720.
  227. set_using_login_shells(use_login_shells)
  228. if not use_login_shells:
  229. cmd_output_util.set_allow_interactive(False)
  230. if redirect_command_output is None:
  231. # Do not redirect by default.
  232. cmd_output_util.set_output_redirected(False)
  233. else:
  234. cmd_output_util.set_output_redirected(redirect_command_output)
  235. def handle_yaml_error(e):
  236. cli_logger.error("Cluster config invalid")
  237. cli_logger.newline()
  238. cli_logger.error("Failed to load YAML file " + cf.bold("{}"), config_file)
  239. cli_logger.newline()
  240. with cli_logger.verbatim_error_ctx("PyYAML error:"):
  241. cli_logger.error(e)
  242. cli_logger.abort()
  243. try:
  244. config = yaml.safe_load(open(config_file).read())
  245. except FileNotFoundError:
  246. cli_logger.abort(
  247. "Provided cluster configuration file ({}) does not exist",
  248. cf.bold(config_file),
  249. )
  250. except yaml.parser.ParserError as e:
  251. handle_yaml_error(e)
  252. raise
  253. except yaml.scanner.ScannerError as e:
  254. handle_yaml_error(e)
  255. raise
  256. global_event_system.execute_callback(
  257. CreateClusterEvent.up_started, {"cluster_config": config}
  258. )
  259. # todo: validate file_mounts, ssh keys, etc.
  260. importer = _NODE_PROVIDERS.get(config["provider"]["type"])
  261. if not importer:
  262. cli_logger.abort(
  263. "Unknown provider type " + cf.bold("{}") + "\n"
  264. "Available providers are: {}",
  265. config["provider"]["type"],
  266. cli_logger.render_list(
  267. [k for k in _NODE_PROVIDERS.keys() if _NODE_PROVIDERS[k] is not None]
  268. ),
  269. )
  270. printed_overrides = False
  271. def handle_cli_override(key, override):
  272. if override is not None:
  273. if key in config:
  274. nonlocal printed_overrides
  275. printed_overrides = True
  276. cli_logger.warning(
  277. "`{}` override provided on the command line.\n"
  278. " Using "
  279. + cf.bold("{}")
  280. + cf.dimmed(" [configuration file has " + cf.bold("{}") + "]"),
  281. key,
  282. override,
  283. config[key],
  284. )
  285. config[key] = override
  286. handle_cli_override("min_workers", override_min_workers)
  287. handle_cli_override("max_workers", override_max_workers)
  288. handle_cli_override("cluster_name", override_cluster_name)
  289. if printed_overrides:
  290. cli_logger.newline()
  291. cli_logger.labeled_value("Cluster", config["cluster_name"])
  292. cli_logger.newline()
  293. config = _bootstrap_config(config, no_config_cache=no_config_cache)
  294. try_logging_config(config)
  295. get_or_create_head_node(
  296. config,
  297. config_file,
  298. no_restart,
  299. restart_only,
  300. yes,
  301. override_cluster_name,
  302. no_monitor_on_head,
  303. )
  304. return config
# Version stamp stored under "_version" in the on-disk config cache written by
# _bootstrap_config(). A cache file whose "_version" differs from this value is
# not used and the config is re-resolved instead.
CONFIG_CACHE_VERSION = 1
  306. def _bootstrap_config(
  307. config: Dict[str, Any], no_config_cache: bool = False
  308. ) -> Dict[str, Any]:
  309. config = prepare_config(config)
  310. # NOTE: multi-node-type autoscaler is guaranteed to be in use after this.
  311. hasher = hashlib.sha256()
  312. hasher.update(json.dumps([config], sort_keys=True).encode("utf-8"))
  313. cache_key = os.path.join(
  314. tempfile.gettempdir(), "ray-config-{}".format(hasher.hexdigest())
  315. )
  316. if os.path.exists(cache_key) and not no_config_cache:
  317. config_cache = json.loads(open(cache_key).read())
  318. if config_cache.get("_version", -1) == CONFIG_CACHE_VERSION:
  319. # todo: is it fine to re-resolve? afaik it should be.
  320. # we can have migrations otherwise or something
  321. # but this seems overcomplicated given that resolving is
  322. # relatively cheap
  323. try_reload_log_state(
  324. config_cache["config"]["provider"],
  325. config_cache.get("provider_log_info"),
  326. )
  327. if log_once("_printed_cached_config_warning"):
  328. cli_logger.verbose_warning(
  329. "Loaded cached provider configuration from " + cf.bold("{}"),
  330. cache_key,
  331. )
  332. if cli_logger.verbosity == 0:
  333. cli_logger.warning("Loaded cached provider configuration")
  334. cli_logger.warning(
  335. "If you experience issues with "
  336. "the cloud provider, try re-running "
  337. "the command with {}.",
  338. cf.bold("--no-config-cache"),
  339. )
  340. cached_config = config_cache["config"]
  341. if "provider" in cached_config:
  342. cached_config["provider"]["_config_cache_path"] = cache_key
  343. return cached_config
  344. else:
  345. cli_logger.warning(
  346. "Found cached cluster config "
  347. "but the version " + cf.bold("{}") + " "
  348. "(expected " + cf.bold("{}") + ") does not match.\n"
  349. "This is normal if cluster launcher was updated.\n"
  350. "Config will be re-resolved.",
  351. config_cache.get("_version", "none"),
  352. CONFIG_CACHE_VERSION,
  353. )
  354. importer = _NODE_PROVIDERS.get(config["provider"]["type"])
  355. if not importer:
  356. raise NotImplementedError("Unsupported provider {}".format(config["provider"]))
  357. provider_cls = importer(config["provider"])
  358. cli_logger.print(
  359. "Checking {} environment settings",
  360. _PROVIDER_PRETTY_NAMES.get(config["provider"]["type"]),
  361. )
  362. try:
  363. config = provider_cls.fillout_available_node_types_resources(config)
  364. except Exception as exc:
  365. if cli_logger.verbosity > 2:
  366. logger.exception("Failed to autodetect node resources.")
  367. else:
  368. cli_logger.warning(
  369. f"Failed to autodetect node resources: {str(exc)}. "
  370. "You can see full stack trace with higher verbosity."
  371. )
  372. try:
  373. # NOTE: if `resources` field is missing, validate_config for providers
  374. # other than AWS and Kubernetes will fail (the schema error will ask
  375. # the user to manually fill the resources) as we currently support
  376. # autofilling resources for AWS and Kubernetes only.
  377. validate_config(config)
  378. except (ModuleNotFoundError, ImportError):
  379. cli_logger.abort(
  380. "Not all Ray autoscaler dependencies were found. "
  381. "In Ray 1.4+, the Ray CLI, autoscaler, and dashboard will "
  382. 'only be usable via `pip install "ray[default]"`. Please '
  383. "update your install command."
  384. )
  385. resolved_config = provider_cls.bootstrap_config(config)
  386. resolved_config["provider"]["_config_cache_path"] = cache_key
  387. if not no_config_cache:
  388. with open(cache_key, "w") as f:
  389. config_cache = {
  390. "_version": CONFIG_CACHE_VERSION,
  391. "provider_log_info": try_get_log_state(resolved_config["provider"]),
  392. "config": resolved_config,
  393. }
  394. f.write(json.dumps(config_cache))
  395. return resolved_config
  396. def teardown_cluster(
  397. config_file: str,
  398. yes: bool,
  399. workers_only: bool,
  400. override_cluster_name: Optional[str],
  401. keep_min_workers: bool,
  402. ) -> None:
  403. """Destroys all nodes of a Ray cluster described by a config json."""
  404. config = yaml.safe_load(open(config_file).read())
  405. if override_cluster_name is not None:
  406. config["cluster_name"] = override_cluster_name
  407. config = _bootstrap_config(config)
  408. cli_logger.confirm(yes, "Destroying cluster.", _abort=True)
  409. if not workers_only:
  410. try:
  411. exec_cluster(
  412. config_file,
  413. cmd="ray stop",
  414. run_env="auto",
  415. screen=False,
  416. tmux=False,
  417. stop=False,
  418. start=False,
  419. override_cluster_name=override_cluster_name,
  420. port_forward=None,
  421. with_output=False,
  422. )
  423. except Exception as e:
  424. # todo: add better exception info
  425. cli_logger.verbose_error("{}", str(e))
  426. cli_logger.warning(
  427. "Exception occurred when stopping the cluster Ray runtime "
  428. "(use -v to dump teardown exceptions)."
  429. )
  430. cli_logger.warning(
  431. "Ignoring the exception and "
  432. "attempting to shut down the cluster nodes anyway."
  433. )
  434. provider = _get_node_provider(config["provider"], config["cluster_name"])
  435. def remaining_nodes():
  436. workers = provider.non_terminated_nodes({TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
  437. if keep_min_workers:
  438. min_workers = config.get("min_workers", 0)
  439. cli_logger.print(
  440. "{} random worker nodes will not be shut down. "
  441. + cf.dimmed("(due to {})"),
  442. cf.bold(min_workers),
  443. cf.bold("--keep-min-workers"),
  444. )
  445. workers = random.sample(workers, len(workers) - min_workers)
  446. # todo: it's weird to kill the head node but not all workers
  447. if workers_only:
  448. cli_logger.print(
  449. "The head node will not be shut down. " + cf.dimmed("(due to {})"),
  450. cf.bold("--workers-only"),
  451. )
  452. return workers
  453. head = provider.non_terminated_nodes({TAG_RAY_NODE_KIND: NODE_KIND_HEAD})
  454. return head + workers
  455. def run_docker_stop(node, container_name):
  456. try:
  457. updater = NodeUpdaterThread(
  458. node_id=node,
  459. provider_config=config["provider"],
  460. provider=provider,
  461. auth_config=config["auth"],
  462. cluster_name=config["cluster_name"],
  463. file_mounts=config["file_mounts"],
  464. initialization_commands=[],
  465. setup_commands=[],
  466. ray_start_commands=[],
  467. runtime_hash="",
  468. file_mounts_contents_hash="",
  469. is_head_node=False,
  470. docker_config=config.get("docker"),
  471. )
  472. _exec(
  473. updater,
  474. f"docker stop {container_name}",
  475. with_output=False,
  476. run_env="host",
  477. )
  478. except Exception:
  479. cli_logger.warning(f"Docker stop failed on {node}")
  480. # Loop here to check that both the head and worker nodes are actually
  481. # really gone
  482. A = remaining_nodes()
  483. container_name = config.get("docker", {}).get("container_name")
  484. if container_name:
  485. # This is to ensure that the parallel SSH calls below do not mess with
  486. # the users terminal.
  487. output_redir = cmd_output_util.is_output_redirected()
  488. cmd_output_util.set_output_redirected(True)
  489. allow_interactive = cmd_output_util.does_allow_interactive()
  490. cmd_output_util.set_allow_interactive(False)
  491. with ThreadPoolExecutor(max_workers=MAX_PARALLEL_SHUTDOWN_WORKERS) as executor:
  492. for node in A:
  493. executor.submit(
  494. run_docker_stop, node=node, container_name=container_name
  495. )
  496. cmd_output_util.set_output_redirected(output_redir)
  497. cmd_output_util.set_allow_interactive(allow_interactive)
  498. with LogTimer("teardown_cluster: done."):
  499. while A:
  500. provider.terminate_nodes(A)
  501. cli_logger.print(
  502. "Requested {} nodes to shut down.",
  503. cf.bold(len(A)),
  504. _tags=dict(interval="1s"),
  505. )
  506. time.sleep(POLL_INTERVAL) # todo: interval should be a variable
  507. A = remaining_nodes()
  508. cli_logger.print(
  509. "{} nodes remaining after {} second(s).", cf.bold(len(A)), POLL_INTERVAL
  510. )
  511. cli_logger.success("No nodes remaining.")
  512. # Cleanup shared cluster resources if provider supports it
  513. if hasattr(provider, "cleanup_cluster_resources") and not workers_only:
  514. try:
  515. cli_logger.print("Cleaning up shared cluster resources...")
  516. provider.cleanup_cluster_resources()
  517. cli_logger.success("Shared cluster resources cleaned up.")
  518. except Exception as e:
  519. cli_logger.verbose_error("{}", str(e))
  520. cli_logger.warning(
  521. "Failed to cleanup shared cluster resources "
  522. "(use -v to see details). "
  523. "You may need to manually delete MSI, NSG, and Subnet resources."
  524. )
  525. def kill_node(
  526. config_file: str, yes: bool, hard: bool, override_cluster_name: Optional[str]
  527. ) -> Optional[str]:
  528. """Kills a random Raylet worker."""
  529. config = yaml.safe_load(open(config_file).read())
  530. if override_cluster_name is not None:
  531. config["cluster_name"] = override_cluster_name
  532. config = _bootstrap_config(config)
  533. cli_logger.confirm(yes, "A random node will be killed.")
  534. provider = _get_node_provider(config["provider"], config["cluster_name"])
  535. nodes = provider.non_terminated_nodes({TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
  536. if not nodes:
  537. cli_logger.print("No worker nodes detected.")
  538. return None
  539. node = random.choice(nodes)
  540. cli_logger.print("Shutdown " + cf.bold("{}"), node)
  541. if hard:
  542. provider.terminate_node(node)
  543. else:
  544. updater = NodeUpdaterThread(
  545. node_id=node,
  546. provider_config=config["provider"],
  547. provider=provider,
  548. auth_config=config["auth"],
  549. cluster_name=config["cluster_name"],
  550. file_mounts=config["file_mounts"],
  551. initialization_commands=[],
  552. setup_commands=[],
  553. ray_start_commands=[],
  554. runtime_hash="",
  555. file_mounts_contents_hash="",
  556. is_head_node=False,
  557. docker_config=config.get("docker"),
  558. )
  559. _exec(updater, "ray stop", False, False)
  560. time.sleep(POLL_INTERVAL)
  561. if config.get("provider", {}).get("use_internal_ips", False):
  562. node_ip = provider.internal_ip(node)
  563. else:
  564. node_ip = provider.external_ip(node)
  565. return node_ip
  566. def monitor_cluster(
  567. cluster_config_file: str, num_lines: int, override_cluster_name: Optional[str]
  568. ) -> None:
  569. """Tails the autoscaler logs of a Ray cluster."""
  570. cmd = f"tail -n {num_lines} -f /tmp/ray/session_latest/logs/monitor*"
  571. exec_cluster(
  572. cluster_config_file,
  573. cmd=cmd,
  574. run_env="auto",
  575. screen=False,
  576. tmux=False,
  577. stop=False,
  578. start=False,
  579. override_cluster_name=override_cluster_name,
  580. port_forward=None,
  581. )
  582. def warn_about_bad_start_command(
  583. start_commands: List[str], no_monitor_on_head: bool = False
  584. ) -> None:
  585. ray_start_cmd = list(filter(lambda x: "ray start" in x, start_commands))
  586. if len(ray_start_cmd) == 0:
  587. cli_logger.warning(
  588. "Ray runtime will not be started because `{}` is not in `{}`.",
  589. cf.bold("ray start"),
  590. cf.bold("head_start_ray_commands"),
  591. )
  592. autoscaling_config_in_ray_start_cmd = any(
  593. "autoscaling-config" in x for x in ray_start_cmd
  594. )
  595. if not (autoscaling_config_in_ray_start_cmd or no_monitor_on_head):
  596. cli_logger.warning(
  597. "The head node will not launch any workers because "
  598. "`{}` does not have `{}` set.\n"
  599. "Potential fix: add `{}` to the `{}` command under `{}`.",
  600. cf.bold("ray start"),
  601. cf.bold("--autoscaling-config"),
  602. cf.bold("--autoscaling-config=~/ray_bootstrap_config.yaml"),
  603. cf.bold("ray start"),
  604. cf.bold("head_start_ray_commands"),
  605. )
  606. def get_or_create_head_node(
  607. config: Dict[str, Any],
  608. printable_config_file: str,
  609. no_restart: bool,
  610. restart_only: bool,
  611. yes: bool,
  612. override_cluster_name: Optional[str],
  613. no_monitor_on_head: bool = False,
  614. _provider: Optional[NodeProvider] = None,
  615. _runner: ModuleType = subprocess,
  616. ) -> None:
  617. """Create the cluster head node, which in turn creates the workers."""
  618. global_event_system.execute_callback(CreateClusterEvent.cluster_booting_started)
  619. provider = _provider or _get_node_provider(
  620. config["provider"], config["cluster_name"]
  621. )
  622. config = copy.deepcopy(config)
  623. head_node_tags = {
  624. TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
  625. }
  626. nodes = provider.non_terminated_nodes(head_node_tags)
  627. if len(nodes) > 0:
  628. head_node = nodes[0]
  629. else:
  630. head_node = None
  631. if not head_node:
  632. cli_logger.confirm(
  633. yes, "No head node found. Launching a new cluster.", _abort=True
  634. )
  635. cli_logger.newline()
  636. usage_lib.show_usage_stats_prompt(cli=True)
  637. if head_node:
  638. if restart_only:
  639. cli_logger.confirm(
  640. yes,
  641. "Updating cluster configuration and "
  642. "restarting the cluster Ray runtime. "
  643. "Setup commands will not be run due to `{}`.\n",
  644. cf.bold("--restart-only"),
  645. _abort=True,
  646. )
  647. cli_logger.newline()
  648. usage_lib.show_usage_stats_prompt(cli=True)
  649. elif no_restart:
  650. cli_logger.print(
  651. "Cluster Ray runtime will not be restarted due to `{}`.",
  652. cf.bold("--no-restart"),
  653. )
  654. cli_logger.confirm(
  655. yes,
  656. "Updating cluster configuration and running setup commands.",
  657. _abort=True,
  658. )
  659. else:
  660. cli_logger.print("Updating cluster configuration and running full setup.")
  661. cli_logger.confirm(
  662. yes, cf.bold("Cluster Ray runtime will be restarted."), _abort=True
  663. )
  664. cli_logger.newline()
  665. usage_lib.show_usage_stats_prompt(cli=True)
  666. cli_logger.newline()
  667. # TODO(ekl) this logic is duplicated in node_launcher.py (keep in sync)
  668. head_node_config = copy.deepcopy(config.get("head_node", {}))
  669. # The above `head_node` field is deprecated in favor of per-node-type
  670. # node_configs. We allow it for backwards-compatibility.
  671. head_node_resources = None
  672. head_node_labels = None
  673. head_node_type = config.get("head_node_type")
  674. if head_node_type:
  675. head_node_tags[TAG_RAY_USER_NODE_TYPE] = head_node_type
  676. head_config = config["available_node_types"][head_node_type]
  677. head_node_config.update(head_config["node_config"])
  678. # Not necessary to keep in sync with node_launcher.py
  679. # Keep in sync with autoscaler.py _node_resources
  680. head_node_resources = head_config.get("resources")
  681. head_node_labels = head_config.get("labels")
  682. launch_hash = hash_launch_conf(head_node_config, config["auth"])
  683. creating_new_head = _should_create_new_head(
  684. head_node, launch_hash, head_node_type, provider
  685. )
  686. if creating_new_head:
  687. with cli_logger.group("Acquiring an up-to-date head node"):
  688. global_event_system.execute_callback(
  689. CreateClusterEvent.acquiring_new_head_node
  690. )
  691. if head_node is not None:
  692. cli_logger.confirm(yes, "Relaunching the head node.", _abort=True)
  693. provider.terminate_node(head_node)
  694. cli_logger.print("Terminated head node {}", head_node)
  695. head_node_tags[TAG_RAY_LAUNCH_CONFIG] = launch_hash
  696. head_node_tags[TAG_RAY_NODE_NAME] = "ray-{}-head".format(
  697. config["cluster_name"]
  698. )
  699. head_node_tags[TAG_RAY_NODE_STATUS] = STATUS_UNINITIALIZED
  700. provider.create_node(head_node_config, head_node_tags, 1)
  701. cli_logger.print("Launched a new head node")
  702. start = time.time()
  703. head_node = None
  704. with cli_logger.group("Fetching the new head node"):
  705. while True:
  706. if time.time() - start > 50:
  707. cli_logger.abort(
  708. "Head node fetch timed out. Failed to create head node."
  709. )
  710. nodes = provider.non_terminated_nodes(head_node_tags)
  711. if len(nodes) == 1:
  712. head_node = nodes[0]
  713. break
  714. time.sleep(POLL_INTERVAL)
  715. cli_logger.newline()
  716. global_event_system.execute_callback(CreateClusterEvent.head_node_acquired)
  717. with cli_logger.group(
  718. "Setting up head node",
  719. _numbered=("<>", 1, 1),
  720. # cf.bold(provider.node_tags(head_node)[TAG_RAY_NODE_NAME]),
  721. _tags=dict(),
  722. ): # add id, ARN to tags?
  723. # TODO(ekl) right now we always update the head node even if the
  724. # hash matches.
  725. # We could prompt the user for what they want to do here.
  726. # No need to pass in cluster_sync_files because we use this
  727. # hash to set up the head node
  728. (runtime_hash, file_mounts_contents_hash) = hash_runtime_conf(
  729. config["file_mounts"], None, config
  730. )
  731. if not no_monitor_on_head:
  732. # Return remote_config_file to avoid prematurely closing it.
  733. config, remote_config_file = _set_up_config_for_head_node(
  734. config, provider, no_restart
  735. )
  736. cli_logger.print("Prepared bootstrap config")
  737. if restart_only:
  738. # Docker may re-launch nodes, requiring setup
  739. # commands to be rerun.
  740. if config.get("docker", {}).get("container_name"):
  741. setup_commands = config["head_setup_commands"]
  742. else:
  743. setup_commands = []
  744. ray_start_commands = config["head_start_ray_commands"]
  745. # If user passed in --no-restart and we're not creating a new head,
  746. # omit start commands.
  747. elif no_restart and not creating_new_head:
  748. setup_commands = config["head_setup_commands"]
  749. ray_start_commands = []
  750. else:
  751. setup_commands = config["head_setup_commands"]
  752. ray_start_commands = config["head_start_ray_commands"]
  753. if not no_restart:
  754. warn_about_bad_start_command(ray_start_commands, no_monitor_on_head)
  755. # Use RAY_UP_enable_autoscaler_v2 instead of RAY_enable_autoscaler_v2
  756. # to avoid accidentally enabling autoscaler v2 for ray up
  757. # due to env inheritance. The default value is 1 since Ray 2.50.0.
  758. if os.getenv("RAY_UP_enable_autoscaler_v2", "1") == "1":
  759. if "RAY_UP_enable_autoscaler_v2" not in os.environ:
  760. # TODO (rueian): Remove this notice after Ray 2.52.0.
  761. cli_logger.print(
  762. "Autoscaler v2 is now enabled by default (since Ray 2.50.0). "
  763. "To switch back to v1, set {}=0. This message can be suppressed by setting {} explicitly.",
  764. cf.bold("RAY_UP_enable_autoscaler_v2"),
  765. cf.bold("RAY_UP_enable_autoscaler_v2"),
  766. )
  767. ray_start_commands = with_envs(
  768. ray_start_commands,
  769. {
  770. "RAY_enable_autoscaler_v2": "1",
  771. "RAY_CLOUD_INSTANCE_ID": head_node,
  772. "RAY_NODE_TYPE_NAME": head_node_type,
  773. },
  774. )
  775. updater = NodeUpdaterThread(
  776. node_id=head_node,
  777. provider_config=config["provider"],
  778. provider=provider,
  779. auth_config=config["auth"],
  780. cluster_name=config["cluster_name"],
  781. file_mounts=config["file_mounts"],
  782. initialization_commands=config["initialization_commands"],
  783. setup_commands=setup_commands,
  784. ray_start_commands=ray_start_commands,
  785. process_runner=_runner,
  786. runtime_hash=runtime_hash,
  787. file_mounts_contents_hash=file_mounts_contents_hash,
  788. is_head_node=True,
  789. node_resources=head_node_resources,
  790. node_labels=head_node_labels,
  791. rsync_options={
  792. "rsync_exclude": config.get("rsync_exclude"),
  793. "rsync_filter": config.get("rsync_filter"),
  794. },
  795. docker_config=config.get("docker"),
  796. restart_only=restart_only,
  797. )
  798. updater.start()
  799. updater.join()
  800. # Refresh the node cache so we see the external ip if available
  801. provider.non_terminated_nodes(head_node_tags)
  802. if updater.exitcode != 0:
  803. # todo: this does not follow the mockup and is not good enough
  804. cli_logger.abort("Failed to setup head node.")
  805. sys.exit(1)
  806. global_event_system.execute_callback(
  807. CreateClusterEvent.cluster_booting_completed,
  808. {
  809. "head_node_id": head_node,
  810. },
  811. )
  812. monitor_str = "tail -n 100 -f /tmp/ray/session_latest/logs/monitor*"
  813. if override_cluster_name:
  814. modifiers = " --cluster-name={}".format(quote(override_cluster_name))
  815. else:
  816. modifiers = ""
  817. cli_logger.newline()
  818. with cli_logger.group("Useful commands:"):
  819. printable_config_file = os.path.abspath(printable_config_file)
  820. cli_logger.print("To terminate the cluster:")
  821. cli_logger.print(cf.bold(f" ray down {printable_config_file}{modifiers}"))
  822. cli_logger.newline()
  823. cli_logger.print("To retrieve the IP address of the cluster head:")
  824. cli_logger.print(
  825. cf.bold(f" ray get-head-ip {printable_config_file}{modifiers}")
  826. )
  827. cli_logger.newline()
  828. cli_logger.print(
  829. "To port-forward the cluster's Ray Dashboard to the local machine:"
  830. )
  831. cli_logger.print(cf.bold(f" ray dashboard {printable_config_file}{modifiers}"))
  832. cli_logger.newline()
  833. cli_logger.print(
  834. "To submit a job to the cluster, port-forward the "
  835. "Ray Dashboard in another terminal and run:"
  836. )
  837. cli_logger.print(
  838. cf.bold(
  839. " ray job submit --address http://localhost:<dashboard-port> "
  840. "--working-dir . -- python my_script.py"
  841. )
  842. )
  843. cli_logger.newline()
  844. cli_logger.print("To connect to a terminal on the cluster head for debugging:")
  845. cli_logger.print(cf.bold(f" ray attach {printable_config_file}{modifiers}"))
  846. cli_logger.newline()
  847. cli_logger.print("To monitor autoscaling:")
  848. cli_logger.print(
  849. cf.bold(
  850. f" ray exec {printable_config_file}{modifiers} {quote(monitor_str)}"
  851. )
  852. )
  853. cli_logger.newline()
  854. # Clean up temporary config file if it was created
  855. # Clean up temporary config file if it was created on Windows
  856. if (
  857. sys.platform == "win32"
  858. and not no_monitor_on_head
  859. and "remote_config_file" in locals()
  860. ):
  861. try:
  862. os.remove(remote_config_file.name)
  863. except OSError:
  864. pass # Ignore cleanup errors
  865. def _should_create_new_head(
  866. head_node_id: Optional[str],
  867. new_launch_hash: str,
  868. new_head_node_type: str,
  869. provider: NodeProvider,
  870. ) -> bool:
  871. """Decides whether a new head node needs to be created.
  872. We need a new head if at least one of the following holds:
  873. (a) There isn't an existing head node
  874. (b) The user-submitted head node_config differs from the existing head
  875. node's node_config.
  876. (c) The user-submitted head node_type key differs from the existing head
  877. node's node_type.
  878. Args:
  879. head_node_id (Optional[str]): head node id if a head exists, else None
  880. new_launch_hash: hash of current user-submitted head config
  881. new_head_node_type: current user-submitted head node-type key
  882. Returns:
  883. bool: True if a new Ray head node should be launched, False otherwise
  884. """
  885. if not head_node_id:
  886. # No head node exists, need to create it.
  887. return True
  888. # Pull existing head's data.
  889. head_tags = provider.node_tags(head_node_id)
  890. current_launch_hash = head_tags.get(TAG_RAY_LAUNCH_CONFIG)
  891. current_head_type = head_tags.get(TAG_RAY_USER_NODE_TYPE)
  892. # Compare to current head
  893. hashes_mismatch = new_launch_hash != current_launch_hash
  894. types_mismatch = new_head_node_type != current_head_type
  895. new_head_required = hashes_mismatch or types_mismatch
  896. # Warn user
  897. if new_head_required:
  898. with cli_logger.group(
  899. "Currently running head node is out-of-date with cluster configuration"
  900. ):
  901. if hashes_mismatch:
  902. cli_logger.print(
  903. "Current hash is {}, expected {}",
  904. cf.bold(current_launch_hash),
  905. cf.bold(new_launch_hash),
  906. )
  907. if types_mismatch:
  908. cli_logger.print(
  909. "Current head node type is {}, expected {}",
  910. cf.bold(current_head_type),
  911. cf.bold(new_head_node_type),
  912. )
  913. return new_head_required
  914. def _set_up_config_for_head_node(
  915. config: Dict[str, Any], provider: NodeProvider, no_restart: bool
  916. ) -> Tuple[Dict[str, Any], Any]:
  917. """Prepares autoscaling config and, if needed, ssh key, to be mounted onto
  918. the Ray head node for use by the autoscaler.
  919. Returns the modified config and the temporary config file that will be
  920. mounted onto the head node.
  921. """
  922. # Rewrite the auth config so that the head
  923. # node can update the workers
  924. remote_config = copy.deepcopy(config)
  925. # drop proxy options if they exist, otherwise
  926. # head node won't be able to connect to workers
  927. remote_config["auth"].pop("ssh_proxy_command", None)
  928. # Drop the head_node field if it was introduced. It is technically not a
  929. # valid field in the config, but it may have been introduced after
  930. # validation (see _bootstrap_config() call to
  931. # provider_cls.bootstrap_config(config)). The head node will never try to
  932. # launch a head node so it doesn't need these defaults.
  933. remote_config.pop("head_node", None)
  934. if "ssh_private_key" in config["auth"]:
  935. remote_key_path = "~/ray_bootstrap_key.pem"
  936. remote_config["auth"]["ssh_private_key"] = remote_key_path
  937. # Adjust for new file locations
  938. new_mounts = {}
  939. for remote_path in config["file_mounts"]:
  940. new_mounts[remote_path] = remote_path
  941. remote_config["file_mounts"] = new_mounts
  942. remote_config["no_restart"] = no_restart
  943. remote_config = provider.prepare_for_head_node(remote_config)
  944. # Now inject the rewritten config and SSH key into the head node
  945. is_windows = sys.platform == "win32"
  946. remote_config_file = tempfile.NamedTemporaryFile(
  947. "w", prefix="ray-bootstrap-", delete=not is_windows
  948. )
  949. remote_config_file.write(json.dumps(remote_config))
  950. remote_config_file.flush()
  951. if is_windows:
  952. remote_config_file.close() # Close the file handle to ensure it's accessible
  953. config["file_mounts"].update(
  954. {"~/ray_bootstrap_config.yaml": remote_config_file.name}
  955. )
  956. if "ssh_private_key" in config["auth"]:
  957. config["file_mounts"].update(
  958. {
  959. remote_key_path: config["auth"]["ssh_private_key"],
  960. }
  961. )
  962. return config, remote_config_file
  963. def attach_cluster(
  964. config_file: str,
  965. start: bool,
  966. use_screen: bool,
  967. use_tmux: bool,
  968. override_cluster_name: Optional[str],
  969. no_config_cache: bool = False,
  970. new: bool = False,
  971. port_forward: Optional[Port_forward] = None,
  972. node_ip: Optional[str] = None,
  973. ) -> None:
  974. """Attaches to a screen for the specified cluster.
  975. Arguments:
  976. config_file: path to the cluster yaml
  977. start: whether to start the cluster if it isn't up
  978. use_screen: whether to use screen as multiplexer
  979. use_tmux: whether to use tmux as multiplexer
  980. override_cluster_name: set the name of the cluster
  981. no_config_cache: whether to skip the config cache
  982. new: whether to force a new screen
  983. port_forward ( (int,int) or list[(int,int)] ): port(s) to forward
  984. node_ip: IP address of the node to attach to
  985. """
  986. if use_tmux:
  987. if new:
  988. cmd = "tmux new"
  989. else:
  990. cmd = "tmux attach || tmux new"
  991. elif use_screen:
  992. if new:
  993. cmd = "screen -L"
  994. else:
  995. cmd = "screen -L -xRR"
  996. else:
  997. if new:
  998. raise ValueError("--new only makes sense if passing --screen or --tmux")
  999. cmd = "$SHELL"
  1000. exec_cluster(
  1001. config_file,
  1002. cmd=cmd,
  1003. run_env="auto",
  1004. screen=False,
  1005. tmux=False,
  1006. stop=False,
  1007. start=start,
  1008. override_cluster_name=override_cluster_name,
  1009. no_config_cache=no_config_cache,
  1010. port_forward=port_forward,
  1011. _allow_uninitialized_state=True,
  1012. node_ip=node_ip,
  1013. )
  1014. def exec_cluster(
  1015. config_file: str,
  1016. *,
  1017. cmd: Optional[str] = None,
  1018. run_env: str = "auto",
  1019. screen: bool = False,
  1020. tmux: bool = False,
  1021. stop: bool = False,
  1022. start: bool = False,
  1023. override_cluster_name: Optional[str] = None,
  1024. no_config_cache: bool = False,
  1025. port_forward: Optional[Port_forward] = None,
  1026. with_output: bool = False,
  1027. _allow_uninitialized_state: bool = False,
  1028. extra_screen_args: Optional[str] = None,
  1029. node_ip: Optional[str] = None,
  1030. ) -> str:
  1031. """Runs a command on the specified cluster.
  1032. Arguments:
  1033. config_file: path to the cluster yaml
  1034. cmd: command to run
  1035. run_env: whether to run the command on the host or in a container.
  1036. Select between "auto", "host" and "docker"
  1037. screen: whether to run in a screen
  1038. tmux: whether to run in a tmux session
  1039. stop: whether to stop the cluster after command run
  1040. start: whether to start the cluster if it isn't up
  1041. override_cluster_name: set the name of the cluster
  1042. no_config_cache: whether to skip the config cache
  1043. port_forward: port(s) to forward
  1044. with_output: whether to return the command output
  1045. _allow_uninitialized_state: whether to execute on an uninitialized head
  1046. node.
  1047. extra_screen_args: optional custom additional args to screen command
  1048. node_ip: IP address of the node to execute on
  1049. """
  1050. assert not (screen and tmux), "Can specify only one of `screen` or `tmux`."
  1051. assert run_env in RUN_ENV_TYPES, "--run_env must be in {}".format(RUN_ENV_TYPES)
  1052. # TODO(rliaw): We default this to True to maintain backwards-compat.
  1053. # In the future we would want to support disabling login-shells
  1054. # and interactivity.
  1055. cmd_output_util.set_allow_interactive(True)
  1056. config = yaml.safe_load(open(config_file).read())
  1057. if override_cluster_name is not None:
  1058. config["cluster_name"] = override_cluster_name
  1059. config = _bootstrap_config(config, no_config_cache=no_config_cache)
  1060. provider = _get_node_provider(config["provider"], config["cluster_name"])
  1061. if node_ip:
  1062. # IP specified by user, find the node with the IP
  1063. if start:
  1064. cli_logger.warning(
  1065. "The {} flag is ignored when {} is specified, "
  1066. "as the node IP can be either head or worker node. "
  1067. "If you need to start the cluster, run {} first, "
  1068. "or use {} without {}.",
  1069. cf.bold("--start"),
  1070. cf.bold("--node-ip"),
  1071. cf.bold(f"ray up {config_file}"),
  1072. cf.bold("ray attach"),
  1073. cf.bold("--node-ip"),
  1074. )
  1075. use_internal_ip = config.get("provider", {}).get("use_internal_ips", False)
  1076. try:
  1077. target_node = provider.get_node_id(node_ip, use_internal_ip=use_internal_ip)
  1078. cli_logger.print("Attaching to node with IP: {}", cf.bold(node_ip))
  1079. except ValueError as e:
  1080. cli_logger.abort(
  1081. "Could not find node with IP {}. {}", cf.bold(node_ip), str(e)
  1082. )
  1083. is_head_node = (
  1084. provider.node_tags(target_node)[TAG_RAY_NODE_KIND] == NODE_KIND_HEAD
  1085. )
  1086. else:
  1087. # Default attaching to head node
  1088. target_node = _get_running_head_node(
  1089. config,
  1090. config_file,
  1091. override_cluster_name,
  1092. create_if_needed=start,
  1093. _provider=provider,
  1094. _allow_uninitialized_state=_allow_uninitialized_state,
  1095. )
  1096. is_head_node = True
  1097. updater = NodeUpdaterThread(
  1098. node_id=target_node,
  1099. provider_config=config["provider"],
  1100. provider=provider,
  1101. auth_config=config["auth"],
  1102. cluster_name=config["cluster_name"],
  1103. file_mounts=config["file_mounts"],
  1104. initialization_commands=[],
  1105. setup_commands=[],
  1106. ray_start_commands=[],
  1107. runtime_hash="",
  1108. file_mounts_contents_hash="",
  1109. is_head_node=is_head_node,
  1110. rsync_options={
  1111. "rsync_exclude": config.get("rsync_exclude"),
  1112. "rsync_filter": config.get("rsync_filter"),
  1113. },
  1114. docker_config=config.get("docker"),
  1115. )
  1116. if cmd and stop:
  1117. cmd = "; ".join(
  1118. [
  1119. cmd,
  1120. "ray stop",
  1121. "ray teardown ~/ray_bootstrap_config.yaml --yes --workers-only",
  1122. "sudo shutdown -h now",
  1123. ]
  1124. )
  1125. result = _exec(
  1126. updater,
  1127. cmd,
  1128. screen,
  1129. tmux,
  1130. port_forward=port_forward,
  1131. with_output=with_output,
  1132. run_env=run_env,
  1133. shutdown_after_run=False,
  1134. extra_screen_args=extra_screen_args,
  1135. )
  1136. if tmux or screen:
  1137. attach_command_parts = ["ray attach", config_file]
  1138. if override_cluster_name is not None:
  1139. attach_command_parts.append(
  1140. "--cluster-name={}".format(override_cluster_name)
  1141. )
  1142. if node_ip is not None:
  1143. attach_command_parts.append("--node-ip={}".format(node_ip))
  1144. if tmux:
  1145. attach_command_parts.append("--tmux")
  1146. elif screen:
  1147. attach_command_parts.append("--screen")
  1148. attach_command = " ".join(attach_command_parts)
  1149. cli_logger.print("Run `{}` to check command status.", cf.bold(attach_command))
  1150. return result
  1151. def _exec(
  1152. updater: NodeUpdaterThread,
  1153. cmd: Optional[str] = None,
  1154. screen: bool = False,
  1155. tmux: bool = False,
  1156. port_forward: Optional[Port_forward] = None,
  1157. with_output: bool = False,
  1158. run_env: str = "auto",
  1159. shutdown_after_run: bool = False,
  1160. extra_screen_args: Optional[str] = None,
  1161. ) -> str:
  1162. if cmd:
  1163. if screen:
  1164. wrapped_cmd = [
  1165. "screen",
  1166. "-L",
  1167. "-dm",
  1168. ]
  1169. if extra_screen_args is not None and len(extra_screen_args) > 0:
  1170. wrapped_cmd += [extra_screen_args]
  1171. wrapped_cmd += [
  1172. "bash",
  1173. "-c",
  1174. quote(cmd + "; exec bash"),
  1175. ]
  1176. cmd = " ".join(wrapped_cmd)
  1177. elif tmux:
  1178. # TODO: Consider providing named session functionality
  1179. wrapped_cmd = [
  1180. "tmux",
  1181. "new",
  1182. "-d",
  1183. "bash",
  1184. "-c",
  1185. quote(cmd + "; exec bash"),
  1186. ]
  1187. cmd = " ".join(wrapped_cmd)
  1188. return updater.cmd_runner.run(
  1189. cmd,
  1190. exit_on_fail=True,
  1191. port_forward=port_forward,
  1192. with_output=with_output,
  1193. run_env=run_env,
  1194. shutdown_after_run=shutdown_after_run,
  1195. )
  1196. def rsync(
  1197. config_file: str,
  1198. source: Optional[str],
  1199. target: Optional[str],
  1200. override_cluster_name: Optional[str],
  1201. down: bool,
  1202. ip_address: Optional[str] = None,
  1203. use_internal_ip: bool = False,
  1204. no_config_cache: bool = False,
  1205. all_nodes: bool = False,
  1206. should_bootstrap: bool = True,
  1207. _runner: ModuleType = subprocess,
  1208. ) -> None:
  1209. """Rsyncs files.
  1210. Arguments:
  1211. config_file: path to the cluster yaml
  1212. source: source dir
  1213. target: target dir
  1214. override_cluster_name: set the name of the cluster
  1215. down: whether we're syncing remote -> local
  1216. ip_address: Address of node. Raise Exception
  1217. if both ip_address and 'all_nodes' are provided.
  1218. use_internal_ip: Whether the provided ip_address is
  1219. public or private.
  1220. all_nodes: whether to sync worker nodes in addition to the head node
  1221. should_bootstrap: whether to bootstrap cluster config before syncing
  1222. """
  1223. if bool(source) != bool(target):
  1224. cli_logger.abort("Expected either both a source and a target, or neither.")
  1225. assert bool(source) == bool(
  1226. target
  1227. ), "Must either provide both or neither source and target."
  1228. if ip_address and all_nodes:
  1229. cli_logger.abort("Cannot provide both ip_address and 'all_nodes'.")
  1230. config = yaml.safe_load(open(config_file).read())
  1231. if override_cluster_name is not None:
  1232. config["cluster_name"] = override_cluster_name
  1233. if should_bootstrap:
  1234. config = _bootstrap_config(config, no_config_cache=no_config_cache)
  1235. is_file_mount = False
  1236. if source and target:
  1237. for remote_mount in config.get("file_mounts", {}).keys():
  1238. if (source if down else target).startswith(remote_mount):
  1239. is_file_mount = True
  1240. break
  1241. provider = _get_node_provider(config["provider"], config["cluster_name"])
  1242. def rsync_to_node(node_id, is_head_node):
  1243. updater = NodeUpdaterThread(
  1244. node_id=node_id,
  1245. provider_config=config["provider"],
  1246. provider=provider,
  1247. auth_config=config["auth"],
  1248. cluster_name=config["cluster_name"],
  1249. file_mounts=config["file_mounts"],
  1250. initialization_commands=[],
  1251. setup_commands=[],
  1252. ray_start_commands=[],
  1253. runtime_hash="",
  1254. use_internal_ip=use_internal_ip,
  1255. process_runner=_runner,
  1256. file_mounts_contents_hash="",
  1257. is_head_node=is_head_node,
  1258. rsync_options={
  1259. "rsync_exclude": config.get("rsync_exclude"),
  1260. "rsync_filter": config.get("rsync_filter"),
  1261. },
  1262. docker_config=config.get("docker"),
  1263. )
  1264. if down:
  1265. rsync = updater.rsync_down
  1266. else:
  1267. rsync = updater.rsync_up
  1268. if source and target:
  1269. # print rsync progress for single file rsync
  1270. if cli_logger.verbosity > 0:
  1271. cmd_output_util.set_output_redirected(False)
  1272. set_rsync_silent(False)
  1273. rsync(source, target, is_file_mount)
  1274. else:
  1275. updater.sync_file_mounts(rsync)
  1276. nodes = []
  1277. head_node = _get_running_head_node(
  1278. config, config_file, override_cluster_name, create_if_needed=False
  1279. )
  1280. if ip_address:
  1281. nodes = [provider.get_node_id(ip_address, use_internal_ip=use_internal_ip)]
  1282. else:
  1283. nodes = [head_node]
  1284. if all_nodes:
  1285. nodes.extend(_get_worker_nodes(config, override_cluster_name))
  1286. for node_id in nodes:
  1287. rsync_to_node(node_id, is_head_node=(node_id == head_node))
  1288. def get_head_node_ip(
  1289. config_file: str, override_cluster_name: Optional[str] = None
  1290. ) -> str:
  1291. """Returns head node IP for given configuration file if exists."""
  1292. config = yaml.safe_load(open(config_file).read())
  1293. if override_cluster_name is not None:
  1294. config["cluster_name"] = override_cluster_name
  1295. provider = _get_node_provider(config["provider"], config["cluster_name"])
  1296. head_node = _get_running_head_node(config, config_file, override_cluster_name)
  1297. provider_cfg = config.get("provider", {})
  1298. # Get internal IP if using internal IPs and
  1299. # use_external_head_ip is not specified
  1300. if provider_cfg.get("use_internal_ips", False) and not provider_cfg.get(
  1301. "use_external_head_ip", False
  1302. ):
  1303. head_node_ip = provider.internal_ip(head_node)
  1304. else:
  1305. head_node_ip = provider.external_ip(head_node)
  1306. return head_node_ip
  1307. def get_worker_node_ips(
  1308. config_file: str, override_cluster_name: Optional[str] = None
  1309. ) -> List[str]:
  1310. """Returns worker node IPs for given configuration file."""
  1311. config = yaml.safe_load(open(config_file).read())
  1312. if override_cluster_name is not None:
  1313. config["cluster_name"] = override_cluster_name
  1314. provider = _get_node_provider(config["provider"], config["cluster_name"])
  1315. nodes = provider.non_terminated_nodes({TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
  1316. if config.get("provider", {}).get("use_internal_ips", False):
  1317. return [provider.internal_ip(node) for node in nodes]
  1318. else:
  1319. return [provider.external_ip(node) for node in nodes]
  1320. def _get_worker_nodes(
  1321. config: Dict[str, Any], override_cluster_name: Optional[str]
  1322. ) -> List[str]:
  1323. """Returns worker node ids for given configuration."""
  1324. # todo: technically could be reused in get_worker_node_ips
  1325. if override_cluster_name is not None:
  1326. config["cluster_name"] = override_cluster_name
  1327. provider = _get_node_provider(config["provider"], config["cluster_name"])
  1328. return provider.non_terminated_nodes({TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
  1329. def _get_running_head_node(
  1330. config: Dict[str, Any],
  1331. printable_config_file: str,
  1332. override_cluster_name: Optional[str],
  1333. create_if_needed: bool = False,
  1334. _provider: Optional[NodeProvider] = None,
  1335. _allow_uninitialized_state: bool = False,
  1336. ) -> str:
  1337. """Get a valid, running head node.
  1338. Args:
  1339. config (Dict[str, Any]): Cluster Config dictionary
  1340. printable_config_file: Used for printing formatted CLI commands.
  1341. override_cluster_name: Passed to `get_or_create_head_node` to
  1342. override the cluster name present in `config`.
  1343. create_if_needed: Create a head node if one is not present.
  1344. _provider: [For testing], a Node Provider to use.
  1345. _allow_uninitialized_state: Whether to return a head node that
  1346. is not 'UP TO DATE'. This is used to allow `ray attach` and
  1347. `ray exec` to debug a cluster in a bad state.
  1348. """
  1349. provider = _provider or _get_node_provider(
  1350. config["provider"], config["cluster_name"]
  1351. )
  1352. head_node_tags = {
  1353. TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
  1354. }
  1355. nodes = provider.non_terminated_nodes(head_node_tags)
  1356. head_node = None
  1357. _backup_head_node = None
  1358. for node in nodes:
  1359. node_state = provider.node_tags(node).get(TAG_RAY_NODE_STATUS)
  1360. if node_state == STATUS_UP_TO_DATE:
  1361. head_node = node
  1362. else:
  1363. _backup_head_node = node
  1364. cli_logger.warning(f"Head node ({node}) is in state {node_state}.")
  1365. if head_node is not None:
  1366. return head_node
  1367. elif create_if_needed:
  1368. get_or_create_head_node(
  1369. config,
  1370. printable_config_file=printable_config_file,
  1371. restart_only=False,
  1372. no_restart=False,
  1373. yes=True,
  1374. override_cluster_name=override_cluster_name,
  1375. )
  1376. # NOTE: `_allow_uninitialized_state` is forced to False if
  1377. # `create_if_needed` is set to True. This is to ensure that the
  1378. # commands executed after creation occur on an actually running
  1379. # cluster.
  1380. return _get_running_head_node(
  1381. config,
  1382. printable_config_file,
  1383. override_cluster_name,
  1384. create_if_needed=False,
  1385. _allow_uninitialized_state=False,
  1386. )
  1387. else:
  1388. if _allow_uninitialized_state and _backup_head_node is not None:
  1389. cli_logger.warning(
  1390. f"The head node being returned: {_backup_head_node} is not "
  1391. "`up-to-date`. If you are not debugging a startup issue "
  1392. "it is recommended to restart this head node with: {}",
  1393. cf.bold(f" ray down {printable_config_file}"),
  1394. )
  1395. return _backup_head_node
  1396. raise RuntimeError(
  1397. "Head node of cluster ({}) not found!".format(config["cluster_name"])
  1398. )
  1399. def get_local_dump_archive(
  1400. stream: bool = False,
  1401. output: Optional[str] = None,
  1402. logs: bool = True,
  1403. debug_state: bool = True,
  1404. pip: bool = True,
  1405. processes: bool = True,
  1406. processes_verbose: bool = False,
  1407. tempfile: Optional[str] = None,
  1408. ) -> Optional[str]:
  1409. if stream and output:
  1410. raise ValueError(
  1411. "You can only use either `--output` or `--stream`, but not both."
  1412. )
  1413. parameters = GetParameters(
  1414. logs=logs,
  1415. debug_state=debug_state,
  1416. pip=pip,
  1417. processes=processes,
  1418. processes_verbose=processes_verbose,
  1419. )
  1420. with Archive(file=tempfile) as archive:
  1421. get_all_local_data(archive, parameters)
  1422. tmp = archive.file
  1423. if stream:
  1424. with open(tmp, "rb") as fp:
  1425. os.write(1, fp.read())
  1426. os.remove(tmp)
  1427. return None
  1428. target = output or os.path.join(os.getcwd(), os.path.basename(tmp))
  1429. shutil.move(tmp, target)
  1430. cli_logger.print(f"Created local data archive at {target}")
  1431. return target
  1432. def get_cluster_dump_archive(
  1433. cluster_config_file: Optional[str] = None,
  1434. host: Optional[str] = None,
  1435. ssh_user: Optional[str] = None,
  1436. ssh_key: Optional[str] = None,
  1437. docker: Optional[str] = None,
  1438. local: Optional[bool] = None,
  1439. output: Optional[str] = None,
  1440. logs: bool = True,
  1441. debug_state: bool = True,
  1442. pip: bool = True,
  1443. processes: bool = True,
  1444. processes_verbose: bool = False,
  1445. tempfile: Optional[str] = None,
  1446. ) -> Optional[str]:
  1447. # Inform the user what kind of logs are collected (before actually
  1448. # collecting, so they can abort)
  1449. content_str = ""
  1450. if logs:
  1451. content_str += (
  1452. " - The logfiles of your Ray session\n"
  1453. " This usually includes Python outputs (stdout/stderr)\n"
  1454. )
  1455. if debug_state:
  1456. content_str += (
  1457. " - Debug state information on your Ray cluster \n"
  1458. " e.g. number of workers, drivers, objects, etc.\n"
  1459. )
  1460. if pip:
  1461. content_str += " - Your installed Python packages (`pip freeze`)\n"
  1462. if processes:
  1463. content_str += (
  1464. " - Information on your running Ray processes\n"
  1465. " This includes command line arguments\n"
  1466. )
  1467. cli_logger.warning(
  1468. "You are about to create a cluster dump. This will collect data from "
  1469. "cluster nodes.\n\n"
  1470. "The dump will contain this information:\n\n"
  1471. f"{content_str}\n"
  1472. f"If you are concerned about leaking private information, extract "
  1473. f"the archive and inspect its contents before sharing it with "
  1474. f"anyone."
  1475. )
  1476. # Parse arguments (e.g. fetch info from cluster config)
  1477. (
  1478. cluster_config_file,
  1479. hosts,
  1480. ssh_user,
  1481. ssh_key,
  1482. docker,
  1483. cluster_name,
  1484. ) = _info_from_params(cluster_config_file, host, ssh_user, ssh_key, docker)
  1485. nodes = [
  1486. Node(host=h, ssh_user=ssh_user, ssh_key=ssh_key, docker_container=docker)
  1487. for h in hosts
  1488. ]
  1489. if not nodes:
  1490. cli_logger.error(
  1491. "No nodes found. Specify with `--host` or by passing a ray "
  1492. "cluster config to `--cluster`."
  1493. )
  1494. return None
  1495. if cluster_config_file:
  1496. nodes[0].is_head = True
  1497. if local is None:
  1498. # If called with a cluster config, this was probably started
  1499. # from a laptop
  1500. local = not bool(cluster_config_file)
  1501. parameters = GetParameters(
  1502. logs=logs,
  1503. debug_state=debug_state,
  1504. pip=pip,
  1505. processes=processes,
  1506. processes_verbose=processes_verbose,
  1507. )
  1508. with Archive(file=tempfile) as archive:
  1509. if local:
  1510. create_archive_for_local_and_remote_nodes(
  1511. archive, remote_nodes=nodes, parameters=parameters
  1512. )
  1513. else:
  1514. create_archive_for_remote_nodes(
  1515. archive, remote_nodes=nodes, parameters=parameters
  1516. )
  1517. if not output:
  1518. if cluster_name:
  1519. filename = (
  1520. f"{cluster_name}_" f"{datetime.datetime.now():%Y-%m-%d_%H-%M-%S}.tar.gz"
  1521. )
  1522. else:
  1523. filename = (
  1524. f"collected_logs_" f"{datetime.datetime.now():%Y-%m-%d_%H-%M-%S}.tar.gz"
  1525. )
  1526. output = os.path.join(os.getcwd(), filename)
  1527. else:
  1528. output = os.path.expanduser(output)
  1529. shutil.move(archive.file, output)
  1530. return output
  1531. def confirm(msg: str, yes: bool) -> Optional[bool]:
  1532. return None if yes else click.confirm(msg, abort=True)