command_runner.py 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962
  1. import hashlib
  2. import json
  3. import logging
  4. import os
  5. import subprocess
  6. import sys
  7. import time
  8. from getpass import getuser
  9. from shlex import quote
  10. from typing import Dict, List, Optional, Tuple
  11. import click
  12. from ray._private.ray_constants import DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES
  13. from ray.autoscaler._private.cli_logger import cf, cli_logger
  14. from ray.autoscaler._private.constants import (
  15. AUTOSCALER_NODE_SSH_INTERVAL_S,
  16. AUTOSCALER_NODE_START_WAIT_S,
  17. DEFAULT_OBJECT_STORE_MEMORY_PROPORTION,
  18. )
  19. from ray.autoscaler._private.docker import (
  20. check_bind_mounts_cmd,
  21. check_docker_image,
  22. check_docker_running_cmd,
  23. docker_start_cmds,
  24. with_docker_exec,
  25. )
  26. from ray.autoscaler._private.log_timer import LogTimer
  27. from ray.autoscaler._private.subprocess_output_util import (
  28. ProcessRunnerError,
  29. is_output_redirected,
  30. run_cmd_redirected,
  31. )
  32. from ray.autoscaler.command_runner import CommandRunnerInterface
  # Module-level logger for this file.
  logger = logging.getLogger(__name__)

  # Number of leading hex characters kept from each SHA-256 digest when the
  # SSH control path is built.
  HASH_MAX_LENGTH = 10
  # Path to the kubectl-rsync.sh script, resolved relative to this file.
  KUBECTL_RSYNC = os.path.join(
      os.path.dirname(os.path.abspath(__file__)), "_kubernetes/kubectl-rsync.sh"
  )
  # NOTE(review): the two constants below are not referenced in the visible
  # part of this file -- confirm where they are used before changing them.
  MAX_HOME_RETRIES = 3
  HOME_RETRY_DELAY_S = 5
  # Process-wide settings read and written by the accessor functions below.
  _config = {"use_login_shells": True, "silent_rsync": True}
  def is_rsync_silent():
      """Report whether rsync output is currently being silenced."""
      silenced = _config["silent_rsync"]
      return silenced
  def set_rsync_silent(val):
      """Turn silencing of rsync output on or off.

      Most commands list the synced files themselves, so the default rsync
      output is usually not wanted.
      """
      _config.update(silent_rsync=val)
  def is_using_login_shells():
      """Report whether commands are currently run through login shells."""
      login_shells = _config["use_login_shells"]
      return login_shells
  def set_using_login_shells(val: bool):
      """Select login shells or non-interactive shells for running commands.

      Non-interactive shells receive less output from subcommands because
      progress bars and TTY control codes are not printed. The difference can
      be large: `pip install`, for example, prints hundreds of progress bar
      lines while downloading.

      Login shells behave very much like a real bash session with respect to
      how scripts execute and how the environment is set up, and this is how
      all commands used to be run. The only reason to prefer them is a weird,
      non-robust tool that does not work otherwise.

      Args:
          val: If true, login shells will be used to run all commands.
      """
      _config.update(use_login_shells=val)
  67. def _with_environment_variables(cmd: str, environment_variables: Dict[str, object]):
  68. """Prepend environment variables to a shell command.
  69. Args:
  70. cmd: The base command.
  71. environment_variables (Dict[str, object]): The set of environment
  72. variables. If an environment variable value is a dict, it will
  73. automatically be converted to a one line yaml string.
  74. """
  75. as_strings = []
  76. for key, val in environment_variables.items():
  77. val = json.dumps(val, separators=(",", ":"))
  78. s = "export {}={};".format(key, quote(val))
  79. as_strings.append(s)
  80. all_vars = "".join(as_strings)
  81. return all_vars + cmd
  82. def _with_interactive(cmd):
  83. force_interactive = (
  84. f"source ~/.bashrc; "
  85. f"export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && ({cmd})"
  86. )
  87. return ["bash", "--login", "-c", "-i", quote(force_interactive)]
  class SSHOptions:
      """Holds ``ssh -o`` options and renders them as command-line arguments.

      Args:
          ssh_key: Path of the private key passed with ``-i``. A falsy value
              omits the ``-i`` flag.
          control_path: Directory for SSH control sockets. When given (and not
              running on Windows) the ControlMaster/ControlPath/ControlPersist
              options are added.
          **kwargs: Additional ``-o`` options; they override the defaults.
              Options whose value is ``None`` are skipped when rendering.
      """

      def __init__(self, ssh_key, control_path=None, **kwargs):
          self.ssh_key = ssh_key
          self.arg_dict = {
              # Suppresses initial fingerprint verification.
              "StrictHostKeyChecking": "no",
              # SSH IP and fingerprint pairs no longer added to known_hosts.
              # This is to remove a "REMOTE HOST IDENTIFICATION HAS CHANGED"
              # warning if a new node has the same IP as a previously
              # deleted node, because the fingerprints will not match in
              # that case.
              "UserKnownHostsFile": os.devnull,
              # Try fewer extraneous key pairs.
              "IdentitiesOnly": "yes",
              # Abort if port forwarding fails (instead of just printing to
              # stderr).
              "ExitOnForwardFailure": "yes",
              # Quickly kill the connection if network connection breaks (as
              # opposed to hanging/blocking).
              "ServerAliveInterval": 5,
              "ServerAliveCountMax": 3,
          }
          # Don't set any control path options on Windows.
          if control_path and sys.platform != "win32":
              self.arg_dict.update(
                  {
                      "ControlMaster": "auto",
                      "ControlPath": "{}/%C".format(control_path),
                      "ControlPersist": "10s",
                  }
              )
          self.arg_dict.update(kwargs)

      def to_ssh_options_list(self, *, timeout=60):
          """Render the options as a flat ``ssh`` argument list.

          Args:
              timeout: Value in seconds for the ``ConnectTimeout`` option. It
                  takes precedence over any ``ConnectTimeout`` stored in
                  ``arg_dict``.

          Returns:
              ``["-i", key]`` (only when a key is set) followed by one
              ``"-o", "Name=value"`` pair per option that is not ``None``.
          """
          # Work on a copy so the per-call timeout does not leak into the
          # instance, which is shared between calls.
          options = dict(self.arg_dict)
          options["ConnectTimeout"] = "{}s".format(timeout)
          args = ["-i", self.ssh_key] if self.ssh_key else []
          for name, value in options.items():
              if value is not None:
                  args += ["-o", "{}={}".format(name, value)]
          return args
  135. class SSHCommandRunner(CommandRunnerInterface):
  def __init__(
      self,
      log_prefix,
      node_id,
      provider,
      auth_config,
      cluster_name,
      process_runner,
      use_internal_ip,
  ):
      """Store the node/connection settings and prepare the SSH options.

      The SSH control path is ``/tmp/ray_ssh_<user hash>/<cluster hash>``,
      each hash being the first ``HASH_MAX_LENGTH`` hex characters of a
      SHA-256 digest. On Windows no control path is used.
      """
      cluster_digest = hashlib.sha256(cluster_name.encode()).hexdigest()
      user_digest = hashlib.sha256(getuser().encode()).hexdigest()

      # Disable SSH control paths on Windows - using them currently causes
      # socket errors.
      control_path = (
          None
          if sys.platform == "win32"
          else "/tmp/ray_ssh_{}/{}".format(
              user_digest[:HASH_MAX_LENGTH], cluster_digest[:HASH_MAX_LENGTH]
          )
      )

      # Identity of the node and how to talk about it.
      self.cluster_name = cluster_name
      self.log_prefix = log_prefix
      self.process_runner = process_runner
      self.node_id = node_id
      self.use_internal_ip = use_internal_ip
      self.provider = provider

      # SSH connection settings.
      self.ssh_private_key = auth_config.get("ssh_private_key")
      self.ssh_user = auth_config["ssh_user"]
      self.ssh_control_path = control_path
      self.ssh_ip = None
      self.ssh_proxy_command = auth_config.get("ssh_proxy_command", None)
      self.ssh_options = SSHOptions(
          self.ssh_private_key,
          self.ssh_control_path,
          ProxyCommand=self.ssh_proxy_command,
      )
  171. def _get_node_ip(self):
  172. if self.use_internal_ip:
  173. return self.provider.internal_ip(self.node_id)
  174. else:
  175. return self.provider.external_ip(self.node_id)
  def _wait_for_ip(self, deadline):
      """Poll for the node IP until it is known, the deadline passes, or the
      provider reports the node as terminated.

      Returns:
          The IP, or ``None`` when polling stopped without one.
      """
      # If the IP is already known, skip the "waiting" output entirely.
      known_ip = self._get_node_ip()
      if known_ip is not None:
          cli_logger.labeled_value("Fetched IP", known_ip)
          return known_ip

      retry_delay = AUTOSCALER_NODE_SSH_INTERVAL_S
      with cli_logger.group("Waiting for IP"):
          while True:
              if not time.time() < deadline:
                  break
              if self.provider.is_terminated(self.node_id):
                  break
              polled_ip = self._get_node_ip()
              if polled_ip is not None:
                  cli_logger.labeled_value("Received", polled_ip)
                  return polled_ip
              cli_logger.print(
                  "Not yet available, retrying in {} seconds",
                  cf.bold(str(retry_delay)),
              )
              time.sleep(retry_delay)
      return None
  def _set_ssh_ip_if_required(self):
      """Resolve and cache the node IP on first use.

      Also creates the SSH control path directory (when one is configured) so
      that later SSH commands can keep persistent sessions. Does nothing when
      the IP is already cached.
      """
      if self.ssh_ip is not None:
          return

      # The IP is assumed never to change once it has been obtained.
      wait_until = time.time() + AUTOSCALER_NODE_START_WAIT_S
      with LogTimer(self.log_prefix + "Got IP"):
          node_ip = self._wait_for_ip(wait_until)
          cli_logger.doassert(
              node_ip is not None, "Could not get node IP."
          )  # todo: msg
          assert node_ip is not None, "Unable to find IP of node"
      self.ssh_ip = node_ip

      # This runs before any SSH command, which ensures the ControlPath
      # directory exists by the time SSH needs it.
      control_dir = self.ssh_control_path
      if control_dir is None:
          return
      try:
          os.makedirs(control_dir, mode=0o700, exist_ok=True)
      except OSError as err:
          cli_logger.warning("{}", str(err))  # todo: msg
  def _run_helper(
      self,
      final_cmd: List[str],
      with_output: bool = False,
      exit_on_fail: bool = False,
      silent: bool = False,
  ):
      """Execute a fully assembled command (SSH and `bash` settings included).

      Args:
          final_cmd (List[str]):
              Full command to run, including SSH options and any other
              processing already applied.
          with_output (bool):
              If `True`, the command's stdout is captured and returned.
          exit_on_fail (bool):
              If `True`, a failing command (non-zero exit code) is reported
              with the full command line in the error message.
          silent: If true, the command output will be silenced.

      Raises:
          ProcessRunnerError: If the command fails while login shells are
              disabled.
          click.ClickException: If the command fails while login shells are
              in use.
      """
      try:
          if with_output:
              # Captured output bypasses the redirected-output logic, which
              # does not support capturing.
              return self.process_runner.check_output(final_cmd)
          return run_cmd_redirected(
              final_cmd,
              process_runner=self.process_runner,
              silent=silent,
              use_login_shells=is_using_login_shells(),
          )
      except subprocess.CalledProcessError as err:
          command_text = " ".join(final_cmd)
          if not is_using_login_shells():
              raise ProcessRunnerError(
                  "Command failed",
                  "ssh_command_failed",
                  code=err.returncode,
                  command=command_text,
              )
          if exit_on_fail:
              message = "Command failed:\n\n {}\n".format(command_text)
          else:
              message = "SSH command failed."
              if is_output_redirected():
                  message += " See above for the output from the failure."
          raise click.ClickException(message) from None
      finally:
          # Do our best to flush output to terminal.
          # See https://github.com/ray-project/ray/pull/19473.
          sys.stdout.flush()
          sys.stderr.flush()
  def run(
      self,
      cmd: Optional[str] = None,
      timeout: int = 120,
      exit_on_fail: bool = False,
      port_forward: Optional[List[Tuple[int, int]]] = None,
      with_output: bool = False,
      environment_variables: Optional[Dict[str, object]] = None,
      run_env: str = "auto",  # Unused argument.
      ssh_options_override_ssh_key: str = "",
      shutdown_after_run: bool = False,
      silent: bool = False,
  ) -> str:
      """Run a command on the node over SSH.

      Args:
          cmd: Shell command to run. When empty, a sleep loop is run instead
              so the connection (e.g. for port forwarding) stays open.
          timeout: SSH ``ConnectTimeout`` in seconds.
          exit_on_fail: Passed to ``_run_helper``.
          port_forward: ``(local, remote)`` port pairs to forward with
              ``-L``. A single pair that is not a list is also accepted.
          with_output: If true, the command's stdout is captured and
              returned.
          environment_variables: Variables exported before ``cmd`` runs.
          run_env: Unused by this runner.
          ssh_options_override_ssh_key: If set, connect with this key instead
              of the configured SSH options. The proxy command is kept; the
              control path is not.
          shutdown_after_run: If true, ``sudo shutdown -h now`` is run after
              ``cmd`` (or on its own when ``cmd`` is empty).
          silent: If true, the command output will be silenced.

      Returns:
          Whatever ``_run_helper`` returns for the assembled command.
      """
      if shutdown_after_run:
          # `cmd` defaults to None, so it cannot simply be appended to.
          shutdown_cmd = "sudo shutdown -h now"
          cmd = "{}; {}".format(cmd, shutdown_cmd) if cmd else shutdown_cmd

      if ssh_options_override_ssh_key:
          # A None-valued option is skipped when the options are rendered,
          # so an unset proxy command adds nothing to the command line.
          ssh_options = SSHOptions(
              ssh_options_override_ssh_key,
              ProxyCommand=self.ssh_proxy_command or None,
          )
      else:
          ssh_options = self.ssh_options

      assert isinstance(
          ssh_options, SSHOptions
      ), "ssh_options must be of type SSHOptions, got {}".format(type(ssh_options))

      self._set_ssh_ip_if_required()

      # Login shells get a forced TTY allocation.
      ssh = ["ssh", "-tt"] if is_using_login_shells() else ["ssh"]

      if port_forward:
          with cli_logger.group("Forwarding ports"):
              if not isinstance(port_forward, list):
                  port_forward = [port_forward]
              for local, remote in port_forward:
                  cli_logger.verbose(
                      "Forwarding port {} to port {} on localhost.",
                      cf.bold(local),
                      cf.bold(remote),
                  )  # todo: msg
                  ssh += ["-L", "{}:localhost:{}".format(local, remote)]

      final_cmd = (
          ssh
          + ssh_options.to_ssh_options_list(timeout=timeout)
          + ["{}@{}".format(self.ssh_user, self.ssh_ip)]
      )
      if cmd:
          if environment_variables:
              cmd = _with_environment_variables(cmd, environment_variables)
          if is_using_login_shells():
              final_cmd += _with_interactive(cmd)
          else:
              final_cmd += [cmd]
      else:
          # We do this because `-o ControlMaster` causes the `-N` flag to
          # still create an interactive shell in some ssh versions.
          final_cmd.append("while true; do sleep 86400; done")

      cli_logger.verbose("Running `{}`", cf.bold(cmd))
      with cli_logger.indented():
          cli_logger.very_verbose(
              "Full command is `{}`", cf.bold(" ".join(final_cmd))
          )

      if cli_logger.verbosity > 0:
          with cli_logger.indented():
              return self._run_helper(
                  final_cmd, with_output, exit_on_fail, silent=silent
              )
      else:
          return self._run_helper(final_cmd, with_output, exit_on_fail, silent=silent)
  346. def _create_rsync_filter_args(self, options):
  347. rsync_excludes = options.get("rsync_exclude") or []
  348. rsync_filters = options.get("rsync_filter") or []
  349. exclude_args = [
  350. ["--exclude", rsync_exclude] for rsync_exclude in rsync_excludes
  351. ]
  352. filter_args = [
  353. ["--filter", "dir-merge,- {}".format(rsync_filter)]
  354. for rsync_filter in rsync_filters
  355. ]
  356. # Combine and flatten the two lists
  357. return [arg for args_list in exclude_args + filter_args for arg in args_list]
  def run_rsync_up(self, source, target, options=None):
      """Copy the local path ``source`` to ``target`` on the node.

      Uses ``rsync`` over SSH, or ``scp -r`` when running on Windows (the
      rsync exclude/filter options are not applied in that case).
      """
      self._set_ssh_ip_if_required()
      options = options or {}

      ssh_opts = self.ssh_options.to_ssh_options_list(timeout=120)
      remote_target = "{}@{}:{}".format(self.ssh_user, self.ssh_ip, target)

      if sys.platform == "win32":
          # Use scp as fallback for Windows
          command = ["scp", "-r", *ssh_opts, source, remote_target]
      else:
          command = [
              "rsync",
              "--rsh",
              subprocess.list2cmdline(["ssh"] + ssh_opts),
              "-avz",
              *self._create_rsync_filter_args(options=options),
              source,
              remote_target,
          ]

      cli_logger.verbose("Running `{}`", cf.bold(" ".join(command)))
      self._run_helper(command, silent=is_rsync_silent())
  def run_rsync_down(self, source, target, options=None):
      """Copy ``source`` on the node to the local path ``target``.

      Uses ``rsync`` over SSH, or ``scp -r`` when running on Windows (the
      rsync exclude/filter options are not applied in that case).

      Args:
          source: Path on the node.
          target: Local destination path.
          options: Optional dict of sync options (``rsync_exclude``,
              ``rsync_filter``). ``None`` means no options.
      """
      self._set_ssh_ip_if_required()
      # `options` defaults to None; normalize it before it is queried.
      options = options or {}
      remote_source = "{}@{}:{}".format(self.ssh_user, self.ssh_ip, source)
      # on Windows use scp -r instead of rsync
      if sys.platform == "win32":
          # Use scp as fallback for Windows
          command = ["scp", "-r"]
          command += self.ssh_options.to_ssh_options_list(timeout=120)
          command += [remote_source, target]
      else:
          command = ["rsync"]
          command += [
              "--rsh",
              subprocess.list2cmdline(
                  ["ssh"] + self.ssh_options.to_ssh_options_list(timeout=120)
              ),
          ]
          command += ["-avz"]
          command += self._create_rsync_filter_args(options=options)
          command += [remote_source, target]
      cli_logger.verbose("Running `{}`", cf.bold(" ".join(command)))
      self._run_helper(command, silent=is_rsync_silent())
  def remote_shell_command_str(self):
      """Return the ``ssh`` command line (newline-terminated) that opens a
      shell on the node, including ``-i <key>`` when a private key is set."""
      parts = ["ssh", "-o", "IdentitiesOnly=yes"]
      if self.ssh_private_key:
          parts += ["-i", "{}".format(self.ssh_private_key)]
      parts.append("{}@{}".format(self.ssh_user, self.ssh_ip))
      return " ".join(parts) + "\n"
  410. class DockerCommandRunner(CommandRunnerInterface):
  def __init__(self, docker_config, **common_args):
      """Wrap an ``SSHCommandRunner`` (built from ``common_args``) so commands
      can be run inside the container named in ``docker_config``."""
      self.ssh_command_runner = SSHCommandRunner(**common_args)
      self.container_name = docker_config["container_name"]
      self.docker_config = docker_config
      # Filled in lazily / updated by other methods of this class.
      self.home_dir = None
      self.initialized = False
      # The CLI is 'docker' unless the config opts into 'podman'.
      self.docker_cmd = (
          "podman" if docker_config.get("use_podman", False) else "docker"
      )
  def run(
      self,
      cmd: Optional[str] = None,
      timeout: int = 120,
      exit_on_fail: bool = False,
      port_forward: Optional[List[Tuple[int, int]]] = None,
      with_output: bool = False,
      environment_variables: Optional[Dict[str, object]] = None,
      run_env: str = "auto",
      ssh_options_override_ssh_key: str = "",
      shutdown_after_run: bool = False,
      silent: bool = False,
  ) -> str:
      """Run a command on the node, inside the container or on the host.

      Args:
          cmd: Shell command to run.
          timeout: Passed to the SSH runner.
          exit_on_fail: Passed to the SSH runner.
          port_forward: Passed to the SSH runner.
          with_output: Passed to the SSH runner.
          environment_variables: Variables exported before ``cmd`` runs.
              Ignored when ``cmd`` is empty.
          run_env: ``"host"``, the container CLI name (``self.docker_cmd``)
              or ``"auto"``. With ``"auto"``, an empty command or one that
              already starts with the container CLI runs on the host;
              everything else runs inside the container.
          ssh_options_override_ssh_key: Passed to the SSH runner.
          shutdown_after_run: If true, ``sudo shutdown -h now`` is run on the
              host after the (possibly container-wrapped) command.
          silent: Passed to the SSH runner; if true, the command output will
              be silenced.

      Returns:
          The result of the underlying SSH runner's ``run``.
      """
      if run_env == "auto":
          runs_on_host = not cmd or cmd.startswith(self.docker_cmd)
          run_env = "host" if runs_on_host else self.docker_cmd

      # There is nothing to prefix when no command was given.
      if environment_variables and cmd:
          cmd = _with_environment_variables(cmd, environment_variables)

      if run_env == self.docker_cmd:
          cmd = self._docker_expand_user(cmd, any_char=True)
          if is_using_login_shells():
              cmd = " ".join(_with_interactive(cmd))
          cmd = with_docker_exec(
              [cmd],
              container_name=self.container_name,
              with_interactive=is_using_login_shells(),
              docker_cmd=self.docker_cmd,
          )[0]

      if shutdown_after_run:
          # sudo shutdown should run after `with_docker_exec` command above.
          # `cmd` may still be None here, so it cannot simply be appended to.
          shutdown_cmd = "sudo shutdown -h now"
          cmd = "{}; {}".format(cmd, shutdown_cmd) if cmd else shutdown_cmd

      # Do not pass shutdown_after_run argument to ssh_command_runner.run()
      # since it is handled above.
      return self.ssh_command_runner.run(
          cmd,
          timeout=timeout,
          exit_on_fail=exit_on_fail,
          port_forward=port_forward,
          with_output=with_output,
          ssh_options_override_ssh_key=ssh_options_override_ssh_key,
          silent=silent,
      )
  def run_rsync_up(self, source, target, options=None):
      """Copy the local path ``source`` to ``target`` inside the container.

      The files are first synced to the host-side mount location on the node.
      Unless the container is not running or ``docker_mount_if_possible`` is
      set, they are then copied from there into the container with rsync.
      """
      options = options or {}

      mount_root = self._get_docker_host_mount_location(
          self.ssh_command_runner.cluster_name
      )
      host_destination = os.path.join(mount_root, target.lstrip("/"))
      host_mount_location = os.path.dirname(host_destination.rstrip("/"))

      if sys.platform == "win32":
          # fix paths if running on Windows
          source = source.replace("\\", "/")
          host_mount_location = host_mount_location.replace("\\", "/")
          host_destination = host_destination.replace("\\", "/")

      # Make sure the parent directory exists on the host and belongs to
      # the SSH user before syncing into it.
      self.ssh_command_runner.run(
          "mkdir -p {0} && chown -R {1} {0}".format(
              host_mount_location, self.ssh_command_runner.ssh_user
          ),
          silent=is_rsync_silent(),
      )
      self.ssh_command_runner.run_rsync_up(source, host_destination, options=options)

      if not self._check_container_status() or options.get(
          "docker_mount_if_possible", False
      ):
          return

      if os.path.isdir(source):
          # Adding a "." means that docker copies the *contents*
          # Without it, docker copies the source *into* the target
          host_destination += "/."

      container_target = self._docker_expand_user(target)
      # This path may not exist inside the container. This ensures
      # that the path is created!
      mkdir_in_container = with_docker_exec(
          ["mkdir -p {}".format(os.path.dirname(container_target))],
          container_name=self.container_name,
          with_interactive=is_using_login_shells(),
          docker_cmd=self.docker_cmd,
      )[0]

      copy_cmd = "{} && rsync -e '{} exec -i' -avz {} {}:{}".format(
          mkdir_in_container,
          self.docker_cmd,
          host_destination,
          self.container_name,
          container_target,
      )
      self.ssh_command_runner.run(copy_cmd, silent=is_rsync_silent())
  def run_rsync_down(self, source, target, options=None):
      """Copy ``source`` from inside the container to the local path ``target``.

      Unless ``docker_mount_if_possible`` is set, the files are first copied
      out of the container into the host-side mount location on the node;
      they are then synced from there to the local machine.

      Args:
          source: Path inside the container.
          target: Local destination path.
          options: Optional dict of sync options. ``None`` means no options.
      """
      options = options or {}
      host_source = os.path.join(
          self._get_docker_host_mount_location(self.ssh_command_runner.cluster_name),
          source.lstrip("/"),
      )
      host_mount_location = os.path.dirname(host_source.rstrip("/"))
      # Convert Windows paths to Unix-style for remote commands
      host_mount_location_unix = host_mount_location.replace("\\", "/")
      self.ssh_command_runner.run(
          f"mkdir -p {host_mount_location_unix} && chown -R "
          f"{self.ssh_command_runner.ssh_user} {host_mount_location_unix}",
          silent=is_rsync_silent(),
      )
      # Adding a "." means that docker copies the *contents*
      # Without it, docker copies the source *into* the target
      # (`endswith` also copes with an empty `source`, unlike `source[-1]`.)
      if source.endswith("/"):
          source += "."
      if not options.get("docker_mount_if_possible", False):
          # NOTE: `--delete` is okay here because the container is the source
          # of truth.
          self.ssh_command_runner.run(
              "rsync -e '{} exec -i' -avz --delete {}:{} {}".format(
                  self.docker_cmd,
                  self.container_name,
                  self._docker_expand_user(source),
                  host_source.replace(
                      "\\", "/"
                  ),  # Convert Windows paths to Unix-style for rsync
              ),
              silent=is_rsync_silent(),
          )
      self.ssh_command_runner.run_rsync_down(host_source, target, options=options)
  def remote_shell_command_str(self):
      """Return the command line (newline-terminated) that opens a bash shell
      inside the container: the SSH runner's command with ``-tt`` added,
      followed by ``<docker_cmd> exec -it <container> /bin/bash``."""
      ssh_part = self.ssh_command_runner.remote_shell_command_str()
      ssh_part = ssh_part.replace("ssh", "ssh -tt", 1).strip("\n")
      return "{} {} exec -it {} /bin/bash\n".format(
          ssh_part, self.docker_cmd, self.container_name
      )
  552. def _check_docker_installed(self):
  553. no_exist = "NoExist"
  554. output = self.ssh_command_runner.run(
  555. f"command -v {self.docker_cmd} || echo '{no_exist}'", with_output=True
  556. )
  557. cleaned_output = output.decode().strip()
  558. if no_exist in cleaned_output or "docker" not in cleaned_output:
  559. if self.docker_cmd == "docker":
  560. install_commands = [
  561. "curl -fsSL https://get.docker.com -o get-docker.sh",
  562. "sudo sh get-docker.sh",
  563. "sudo usermod -aG docker $USER",
  564. "sudo systemctl restart docker -f",
  565. ]
  566. else:
  567. install_commands = [
  568. "sudo apt-get update",
  569. "sudo apt-get -y install podman",
  570. ]
  571. logger.error(
  572. f"{self.docker_cmd.capitalize()} not installed. You can "
  573. f"install {self.docker_cmd.capitalize()} by adding the "
  574. "following commands to 'initialization_commands':\n"
  575. + "\n".join(install_commands)
  576. )
  577. def _check_container_status(self):
  578. if self.initialized:
  579. return True
  580. output = (
  581. self.ssh_command_runner.run(
  582. check_docker_running_cmd(self.container_name, self.docker_cmd),
  583. with_output=True,
  584. )
  585. .decode("utf-8")
  586. .strip()
  587. )
  588. # Checks for the false positive where "true" is in the container name
  589. return "true" in output.lower() and "no such object" not in output.lower()
  590. def _docker_expand_user(self, string, any_char=False):
  591. user_pos = string.find("~")
  592. if user_pos > -1:
  593. if self.home_dir is None:
  594. self.home_dir = (
  595. self.ssh_command_runner.run(
  596. f"{self.docker_cmd} exec {self.container_name} "
  597. "printenv HOME",
  598. with_output=True,
  599. )
  600. .decode("utf-8")
  601. .strip()
  602. )
  603. if any_char:
  604. return string.replace("~/", self.home_dir + "/")
  605. elif not any_char and user_pos == 0:
  606. return string.replace("~", self.home_dir, 1)
  607. return string
  def _check_if_container_restart_is_needed(
      self, image: str, cleaned_bind_mounts: Dict[str, str]
  ) -> bool:
      """Decide whether the running container has to be restarted.

      A restart is required when a requested bind mount is not among the
      container's active mounts. A different running image is only logged as
      an error. If the mount list cannot be parsed as JSON, no restart is
      requested.

      Args:
          image: Image named in the YAML for this node.
          cleaned_bind_mounts: Requested mounts keyed by the path inside the
              container.
      """

      def _host_output(command):
          # Run on the host and return the decoded, stripped stdout.
          return (
              self.run(command, with_output=True, run_env="host")
              .decode("utf-8")
              .strip()
          )

      restart_needed = False

      running_image = _host_output(
          check_docker_image(self.container_name, self.docker_cmd)
      )
      if running_image != image:
          cli_logger.error(
              "A container with name {} is running image {} instead "
              "of {} (which was provided in the YAML)",
              self.container_name,
              running_image,
              image,
          )

      mounts = _host_output(
          check_bind_mounts_cmd(self.container_name, self.docker_cmd)
      )
      try:
          active = {
              entry["Destination"].strip("/") for entry in json.loads(mounts)
          }
          # Ignore ray bootstrap files.
          requested = {
              self._docker_expand_user(remote).strip("/")
              for remote in cleaned_bind_mounts.keys()
          }
          missing = requested - active
          if missing:
              restart_needed = True
              cli_logger.warning(
                  "This Docker Container is already running. "
                  "Restarting the Docker container on "
                  "this node to pick up the following file_mounts {}",
                  missing,
              )
      except json.JSONDecodeError:
          cli_logger.verbose(
              "Unable to check if file_mounts specified in the YAML "
              "differ from those on the running container."
          )
      return restart_needed
  def run_init(
      self, *, as_head: bool, file_mounts: Dict[str, str], sync_run_yet: bool
  ):
      """Prepare the container on this node and copy in the bootstrap files.

      Steps, in order:
        1. Pick the image: ``head_image``/``worker_image`` from the docker
           config, falling back to ``image``.
        2. Check that the container CLI is installed, then pull the image
           (or, with ``pull_before_run`` disabled, pull it only if
           ``image inspect`` fails).
        3. If the container is already running, check whether it has to be
           restarted and stop it if so.
        4. If the container is not running (or was just stopped), start it.
           This is skipped with an early return when ``sync_run_yet`` is
           false.
        5. Copy the bootstrap files into the container and try to make them
           readable for the current user.

      Args:
          as_head: Selects the ``head_*`` or ``worker_*`` config entries.
          file_mounts: Mapping of remote path to local path; only read here
              (a copy is modified).
          sync_run_yet: Whether the file sync has already run.

      Returns:
          ``True`` when returning early in step 4 (the container has to be
          started but the file sync has not run yet); otherwise whether the
          container start command was executed by this call.

      Raises:
          json.JSONDecodeError: If the image environment reported by
              ``inspect`` cannot be parsed.
      """
      BOOTSTRAP_MOUNTS = ["~/ray_bootstrap_config.yaml", "~/ray_bootstrap_key.pem"]
      specific_image = self.docker_config.get(
          f"{'head' if as_head else 'worker'}_image", self.docker_config.get("image")
      )
      self._check_docker_installed()
      if self.docker_config.get("pull_before_run", True):
          assert specific_image, (
              "Image must be included in config if " + "pull_before_run is specified"
          )
          self.run(
              "{} pull {}".format(self.docker_cmd, specific_image), run_env="host"
          )
      else:
          # Pull only when the image is not available locally. The command
          # starts with the CLI name, so the "auto" run_env runs it on the
          # host.
          self.run(
              f"{self.docker_cmd} image inspect {specific_image} "
              "1> /dev/null  2>&1 || "
              f"{self.docker_cmd} pull {specific_image}"
          )
      # Bootstrap files cannot be bind mounted because docker opens the
      # underlying inode. When the file is switched, docker becomes outdated.
      cleaned_bind_mounts = file_mounts.copy()
      for mnt in BOOTSTRAP_MOUNTS:
          cleaned_bind_mounts.pop(mnt, None)
      docker_run_executed = False
      container_running = self._check_container_status()
      requires_re_init = False
      if container_running:
          requires_re_init = self._check_if_container_restart_is_needed(
              specific_image, cleaned_bind_mounts
          )
          if requires_re_init:
              docker_stop_cmd = f"{self.docker_cmd} stop {self.container_name}"
              logger.info("Executing Docker command: %s", docker_stop_cmd)
              self.run(docker_stop_cmd, run_env="host")
      if (not container_running) or requires_re_init:
          if not sync_run_yet:
              # Do not start the actual image as we need to run file_sync
              # first to ensure that all folders are created with the
              # correct ownership. Docker will create the folders with
              # `root` as the owner.
              return True
          # Get home directory
          image_env = (
              self.ssh_command_runner.run(
                  f"{self.docker_cmd} "
                  + "inspect -f '{{json .Config.Env}}' "
                  + specific_image,
                  with_output=True,
              )
              .decode()
              .strip()
          )
          # Used when the image environment has no HOME entry.
          home_directory = "/root"
          try:
              for env_var in json.loads(image_env):
                  if env_var.startswith("HOME="):
                      home_directory = env_var.split("HOME=")[1]
                      break
          except json.JSONDecodeError as e:
              cli_logger.error(
                  "Unable to deserialize `image_env` to Python object. "
                  f"The `image_env` is:\n{image_env}"
              )
              raise e
          # Common run options first, then the head/worker specific ones.
          user_docker_run_options = self.docker_config.get(
              "run_options", []
          ) + self.docker_config.get(
              f"{'head' if as_head else 'worker'}_run_options", []
          )
          start_command = docker_start_cmds(
              self.ssh_command_runner.ssh_user,
              specific_image,
              cleaned_bind_mounts,
              self.container_name,
              self._configure_runtime(
                  self._auto_configure_shm(user_docker_run_options)
              ),
              self.ssh_command_runner.cluster_name,
              home_directory,
              self.docker_cmd,
          )
          self.run(start_command, run_env="host")
          docker_run_executed = True
      # Explicitly copy in ray bootstrap files.
      for mount in BOOTSTRAP_MOUNTS:
          if mount in file_mounts:
              if not sync_run_yet:
                  # NOTE(ilr) This rsync is needed because when starting from
                  #  a stopped instance,  /tmp may be deleted and `run_init`
                  # is called before the first `file_sync` happens
                  self.run_rsync_up(file_mounts[mount], mount)
              # Copy the file from the host mount location into the
              # container.
              self.ssh_command_runner.run(
                  "rsync -e '{cmd} exec -i' -avz {src} {container}:{dst}".format(
                      cmd=self.docker_cmd,
                      src=os.path.join(
                          self._get_docker_host_mount_location(
                              self.ssh_command_runner.cluster_name
                          ),
                          mount,
                      ).replace(
                          "\\", "/"
                      ),  # Convert Windows paths to Unix-style for rsync
                      container=self.container_name,
                      dst=self._docker_expand_user(mount),
                  )
              )
              try:
                  # Check if the current user has read permission.
                  # If they do not, try to change ownership!
                  self.run(
                      f"cat {mount} >/dev/null 2>&1 || "
                      f"sudo chown $(id -u):$(id -g) {mount}"
                  )
              except Exception:
                  # Ownership could not be fixed; gather details for the
                  # warning below instead of failing.
                  lsl_string = (
                      self.run(f"ls -l {mount}", with_output=True)
                      .decode("utf-8")
                      .strip()
                  )
                  # The string is of format <Permission> <Links>
                  # <Owner> <Group> <Size> <Date> <Name>
                  permissions = lsl_string.split(" ")[0]
                  owner = lsl_string.split(" ")[2]
                  group = lsl_string.split(" ")[3]
                  current_user = (
                      self.run("whoami", with_output=True).decode("utf-8").strip()
                  )
                  cli_logger.warning(
                      f"File ({mount}) is owned by user:{owner} and group:"
                      f"{group} with permissions ({permissions}). The "
                      f"current user ({current_user}) does not have "
                      "permission to read these files, and Ray may not be "
                      "able to autoscale. This can be resolved by "
                      "installing `sudo` in your container, or adding a "
                      f"command like 'chown {current_user} {mount}' to "
                      "your `setup_commands`."
                  )
      self.initialized = True
      return docker_run_executed
  805. def _configure_runtime(self, run_options: List[str]) -> List[str]:
  806. if self.docker_config.get("disable_automatic_runtime_detection"):
  807. return run_options
  808. runtime_output = (
  809. self.ssh_command_runner.run(
  810. f"{self.docker_cmd} " + "info -f '{{.Runtimes}}' ", with_output=True
  811. )
  812. .decode()
  813. .strip()
  814. )
  815. if "nvidia-container-runtime" in runtime_output:
  816. try:
  817. self.ssh_command_runner.run("nvidia-smi", with_output=False)
  818. return run_options + ["--runtime=nvidia"]
  819. except Exception as e:
  820. logger.warning(
  821. "NVIDIA Container Runtime is present, but no GPUs found."
  822. )
  823. logger.debug(f"nvidia-smi error: {e}")
  824. return run_options
  825. return run_options
  826. def _auto_configure_shm(self, run_options: List[str]) -> List[str]:
  827. if self.docker_config.get("disable_shm_size_detection"):
  828. return run_options
  829. for run_opt in run_options:
  830. if "--shm-size" in run_opt:
  831. logger.info(
  832. "Bypassing automatic SHM-Detection because of "
  833. f"`run_option`: {run_opt}"
  834. )
  835. return run_options
  836. try:
  837. shm_output = (
  838. self.ssh_command_runner.run(
  839. "cat /proc/meminfo || true", with_output=True
  840. )
  841. .decode()
  842. .strip()
  843. )
  844. available_memory = int(
  845. [ln for ln in shm_output.split("\n") if "MemAvailable" in ln][
  846. 0
  847. ].split()[1]
  848. )
  849. available_memory_bytes = available_memory * 1024
  850. # Overestimate SHM size by 10%
  851. shm_size = min(
  852. (available_memory_bytes * DEFAULT_OBJECT_STORE_MEMORY_PROPORTION * 1.1),
  853. DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES,
  854. )
  855. return run_options + [f"--shm-size='{shm_size}b'"]
  856. except Exception as e:
  857. logger.warning(f"Received error while trying to auto-compute SHM size {e}")
  858. return run_options
  def _get_docker_host_mount_location(self, cluster_name: str) -> str:
      """Look up the host-side docker mount directory for ``cluster_name``."""
      # Imported here due to circular dependency in imports.
      from ray.autoscaler.sdk import get_docker_host_mount_location as _locate

      mount_location = _locate(cluster_name)
      return mount_location