updater.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570
  1. import logging
  2. import os
  3. import subprocess
  4. import time
  5. import traceback
  6. from threading import Thread
  7. import click
  8. from ray._common.usage import usage_constants, usage_lib
  9. from ray.autoscaler._private import subprocess_output_util as cmd_output_util
  10. from ray.autoscaler._private.cli_logger import cf, cli_logger
  11. from ray.autoscaler._private.command_runner import (
  12. AUTOSCALER_NODE_START_WAIT_S,
  13. ProcessRunnerError,
  14. )
  15. from ray.autoscaler._private.constants import (
  16. LABELS_ENVIRONMENT_VARIABLE,
  17. RESOURCES_ENVIRONMENT_VARIABLE,
  18. )
  19. from ray.autoscaler._private.event_system import CreateClusterEvent, global_event_system
  20. from ray.autoscaler._private.log_timer import LogTimer
  21. from ray.autoscaler.tags import (
  22. STATUS_SETTING_UP,
  23. STATUS_SYNCING_FILES,
  24. STATUS_UP_TO_DATE,
  25. STATUS_UPDATE_FAILED,
  26. STATUS_WAITING_FOR_SSH,
  27. TAG_RAY_FILE_MOUNTS_CONTENTS,
  28. TAG_RAY_NODE_STATUS,
  29. TAG_RAY_RUNTIME_CONFIG,
  30. )
# Module-level logger; used below for debug output (node tags, unexpected
# `e.cmd` types) that does not go through `cli_logger`.
logger = logging.getLogger(__name__)

# Total step count passed as the denominator of the `_numbered=("[]", n, total)`
# progress markers emitted while updating a node.
NUM_SETUP_STEPS = 7

# Seconds to sleep between consecutive SSH readiness probes in
# `NodeUpdater.wait_ready` (also printed in the "retrying in {} seconds" message).
READY_CHECK_INTERVAL = 5
  34. class NodeUpdater:
  35. """A process for syncing files and running init commands on a node.
  36. Arguments:
  37. node_id: the Node ID
  38. provider_config: Provider section of autoscaler yaml
  39. provider: NodeProvider Class
  40. auth_config: Auth section of autoscaler yaml
  41. cluster_name: the name of the cluster.
  42. file_mounts: Map of remote to local paths
  43. initialization_commands: Commands run before container launch
  44. setup_commands: Commands run before ray starts
  45. ray_start_commands: Commands to start ray
  46. runtime_hash: Used to check for config changes
  47. file_mounts_contents_hash: Used to check for changes to file mounts
  48. is_head_node: Whether to use head start/setup commands
  49. rsync_options: Extra options related to the rsync command.
  50. process_runner: the module to use to run the commands
  51. in the CommandRunner. E.g., subprocess.
  52. use_internal_ip: Wwhether the node_id belongs to an internal ip
  53. or external ip.
  54. docker_config: Docker section of autoscaler yaml
  55. restart_only: Whether to skip setup commands & just restart ray
  56. for_recovery: True if updater is for a recovering node. Only used for
  57. metric tracking.
  58. """
  59. def __init__(
  60. self,
  61. node_id,
  62. provider_config,
  63. provider,
  64. auth_config,
  65. cluster_name,
  66. file_mounts,
  67. initialization_commands,
  68. setup_commands,
  69. ray_start_commands,
  70. runtime_hash,
  71. file_mounts_contents_hash,
  72. is_head_node,
  73. node_resources=None,
  74. node_labels=None,
  75. cluster_synced_files=None,
  76. rsync_options=None,
  77. process_runner=subprocess,
  78. use_internal_ip=False,
  79. docker_config=None,
  80. restart_only=False,
  81. for_recovery=False,
  82. ):
  83. self.log_prefix = "NodeUpdater: {}: ".format(node_id)
  84. # Three cases:
  85. # 1) use_internal_ip arg is True -> use_internal_ip is True
  86. # 2) worker node -> use value of provider_config["use_internal_ips"]
  87. # 3) head node -> use value of provider_config["use_internal_ips"] unless
  88. # overriden by provider_config["use_external_head_ip"]
  89. use_internal_ip = use_internal_ip or (
  90. provider_config.get("use_internal_ips", False)
  91. and not (
  92. is_head_node and provider_config.get("use_external_head_ip", False)
  93. )
  94. )
  95. self.cmd_runner = provider.get_command_runner(
  96. self.log_prefix,
  97. node_id,
  98. auth_config,
  99. cluster_name,
  100. process_runner,
  101. use_internal_ip,
  102. docker_config,
  103. )
  104. self.daemon = True
  105. self.node_id = node_id
  106. self.provider_type = provider_config.get("type")
  107. self.provider = provider
  108. # Some node providers don't specify empty structures as
  109. # defaults. Better to be defensive.
  110. file_mounts = file_mounts or {}
  111. self.file_mounts = {
  112. remote: os.path.expanduser(local) for remote, local in file_mounts.items()
  113. }
  114. self.initialization_commands = initialization_commands
  115. self.setup_commands = setup_commands
  116. self.ray_start_commands = ray_start_commands
  117. self.node_resources = node_resources
  118. self.node_labels = node_labels
  119. self.runtime_hash = runtime_hash
  120. self.file_mounts_contents_hash = file_mounts_contents_hash
  121. # TODO (Alex): This makes the assumption that $HOME on the head and
  122. # worker nodes is the same. Also note that `cluster_synced_files` is
  123. # set on the head -> worker updaters only (so `expanduser` is only run
  124. # on the head node).
  125. cluster_synced_files = cluster_synced_files or []
  126. self.cluster_synced_files = [
  127. os.path.expanduser(path) for path in cluster_synced_files
  128. ]
  129. self.rsync_options = rsync_options or {}
  130. self.auth_config = auth_config
  131. self.is_head_node = is_head_node
  132. self.docker_config = docker_config
  133. self.restart_only = restart_only
  134. self.update_time = None
  135. self.for_recovery = for_recovery
  136. def run(self):
  137. update_start_time = time.time()
  138. if (
  139. cmd_output_util.does_allow_interactive()
  140. and cmd_output_util.is_output_redirected()
  141. ):
  142. # this is most probably a bug since the user has no control
  143. # over these settings
  144. msg = (
  145. "Output was redirected for an interactive command. "
  146. "Either do not pass `--redirect-command-output` "
  147. "or also pass in `--use-normal-shells`."
  148. )
  149. cli_logger.abort(msg)
  150. try:
  151. with LogTimer(
  152. self.log_prefix + "Applied config {}".format(self.runtime_hash)
  153. ):
  154. self.do_update()
  155. except Exception as e:
  156. self.provider.set_node_tags(
  157. self.node_id, {TAG_RAY_NODE_STATUS: STATUS_UPDATE_FAILED}
  158. )
  159. cli_logger.error("New status: {}", cf.bold(STATUS_UPDATE_FAILED))
  160. cli_logger.error("!!!")
  161. if hasattr(e, "cmd"):
  162. stderr_output = getattr(e, "stderr", "No stderr available")
  163. cli_logger.error(
  164. "Setup command `{}` failed with exit code {}. stderr: {}",
  165. cf.bold(e.cmd),
  166. e.returncode,
  167. stderr_output,
  168. )
  169. else:
  170. cli_logger.verbose_error("Exception details: {}", str(vars(e)))
  171. full_traceback = traceback.format_exc()
  172. cli_logger.error("Full traceback: {}", full_traceback)
  173. # todo: handle this better somehow?
  174. cli_logger.error("Error message: {}", str(e))
  175. cli_logger.error("!!!")
  176. cli_logger.newline()
  177. if isinstance(e, click.ClickException):
  178. # todo: why do we ignore this here
  179. return
  180. raise
  181. tags_to_set = {
  182. TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
  183. TAG_RAY_RUNTIME_CONFIG: self.runtime_hash,
  184. }
  185. if self.file_mounts_contents_hash is not None:
  186. tags_to_set[TAG_RAY_FILE_MOUNTS_CONTENTS] = self.file_mounts_contents_hash
  187. self.provider.set_node_tags(self.node_id, tags_to_set)
  188. cli_logger.labeled_value("New status", STATUS_UP_TO_DATE)
  189. self.update_time = time.time() - update_start_time
  190. self.exitcode = 0
  191. def sync_file_mounts(self, sync_cmd, step_numbers=(0, 2)):
  192. # step_numbers is (# of previous steps, total steps)
  193. previous_steps, total_steps = step_numbers
  194. nolog_paths = []
  195. if cli_logger.verbosity == 0:
  196. nolog_paths = ["~/ray_bootstrap_key.pem", "~/ray_bootstrap_config.yaml"]
  197. def do_sync(remote_path, local_path, allow_non_existing_paths=False):
  198. if allow_non_existing_paths and not os.path.exists(local_path):
  199. cli_logger.print("sync: {} does not exist. Skipping.", local_path)
  200. # Ignore missing source files. In the future we should support
  201. # the --delete-missing-args command to delete files that have
  202. # been removed
  203. return
  204. assert os.path.exists(local_path), local_path
  205. if os.path.isdir(local_path):
  206. if not local_path.endswith("/"):
  207. local_path += "/"
  208. if not remote_path.endswith("/"):
  209. remote_path += "/"
  210. with LogTimer(
  211. self.log_prefix + "Synced {} to {}".format(local_path, remote_path)
  212. ):
  213. is_docker = (
  214. self.docker_config and self.docker_config["container_name"] != ""
  215. )
  216. if not is_docker:
  217. # The DockerCommandRunner handles this internally.
  218. self.cmd_runner.run(
  219. "mkdir -p {}".format(os.path.dirname(remote_path)),
  220. run_env="host",
  221. )
  222. sync_cmd(local_path, remote_path, docker_mount_if_possible=True)
  223. if remote_path not in nolog_paths:
  224. # todo: timed here?
  225. cli_logger.print(
  226. "{} from {}", cf.bold(remote_path), cf.bold(local_path)
  227. )
  228. # Rsync file mounts
  229. with cli_logger.group(
  230. "Processing file mounts", _numbered=("[]", previous_steps + 1, total_steps)
  231. ):
  232. for remote_path, local_path in self.file_mounts.items():
  233. do_sync(remote_path, local_path)
  234. previous_steps += 1
  235. if self.cluster_synced_files:
  236. with cli_logger.group(
  237. "Processing worker file mounts",
  238. _numbered=("[]", previous_steps + 1, total_steps),
  239. ):
  240. cli_logger.print("synced files: {}", str(self.cluster_synced_files))
  241. for path in self.cluster_synced_files:
  242. do_sync(path, path, allow_non_existing_paths=True)
  243. previous_steps += 1
  244. else:
  245. cli_logger.print(
  246. "No worker file mounts to sync",
  247. _numbered=("[]", previous_steps + 1, total_steps),
  248. )
  249. def wait_ready(self, deadline):
  250. with cli_logger.group(
  251. "Waiting for SSH to become available", _numbered=("[]", 1, NUM_SETUP_STEPS)
  252. ):
  253. with LogTimer(self.log_prefix + "Got remote shell"):
  254. cli_logger.print("Running `{}` as a test.", cf.bold("uptime"))
  255. first_conn_refused_time = None
  256. while True:
  257. if time.time() > deadline:
  258. raise Exception("wait_ready timeout exceeded.")
  259. if self.provider.is_terminated(self.node_id):
  260. raise Exception(
  261. "wait_ready aborting because node "
  262. "detected as terminated."
  263. )
  264. try:
  265. # Run outside of the container
  266. self.cmd_runner.run("uptime", timeout=10, run_env="host")
  267. cli_logger.success("Success.")
  268. return True
  269. except ProcessRunnerError as e:
  270. first_conn_refused_time = cmd_output_util.handle_ssh_fails(
  271. e,
  272. first_conn_refused_time,
  273. retry_interval=READY_CHECK_INTERVAL,
  274. )
  275. time.sleep(READY_CHECK_INTERVAL)
  276. except Exception as e:
  277. # TODO(maximsmol): we should not be ignoring
  278. # exceptions if they get filtered properly
  279. # (new style log + non-interactive shells)
  280. #
  281. # however threading this configuration state
  282. # is a pain and I'm leaving it for later
  283. retry_str = "(" + str(e) + ")"
  284. if hasattr(e, "cmd"):
  285. if isinstance(e.cmd, str):
  286. cmd_ = e.cmd
  287. elif isinstance(e.cmd, list):
  288. cmd_ = " ".join(e.cmd)
  289. else:
  290. logger.debug(
  291. f"e.cmd type ({type(e.cmd)}) not list or str."
  292. )
  293. cmd_ = str(e.cmd)
  294. retry_str = "(Exit Status {}): {}".format(
  295. e.returncode, cmd_
  296. )
  297. cli_logger.print(
  298. "SSH still not available {}, retrying in {} seconds.",
  299. cf.dimmed(retry_str),
  300. cf.bold(str(READY_CHECK_INTERVAL)),
  301. )
  302. time.sleep(READY_CHECK_INTERVAL)
  303. def do_update(self):
  304. self.provider.set_node_tags(
  305. self.node_id, {TAG_RAY_NODE_STATUS: STATUS_WAITING_FOR_SSH}
  306. )
  307. cli_logger.labeled_value("New status", STATUS_WAITING_FOR_SSH)
  308. deadline = time.time() + AUTOSCALER_NODE_START_WAIT_S
  309. self.wait_ready(deadline)
  310. global_event_system.execute_callback(CreateClusterEvent.ssh_control_acquired)
  311. node_tags = self.provider.node_tags(self.node_id)
  312. logger.debug("Node tags: {}".format(str(node_tags)))
  313. if self.provider_type == "aws" and self.provider.provider_config:
  314. from ray.autoscaler._private.aws.cloudwatch.cloudwatch_helper import (
  315. CloudwatchHelper,
  316. )
  317. CloudwatchHelper(
  318. self.provider.provider_config, self.node_id, self.provider.cluster_name
  319. ).update_from_config(self.is_head_node)
  320. if node_tags.get(TAG_RAY_RUNTIME_CONFIG) == self.runtime_hash:
  321. # When resuming from a stopped instance the runtime_hash may be the
  322. # same, but the container will not be started.
  323. init_required = self.cmd_runner.run_init(
  324. as_head=self.is_head_node,
  325. file_mounts=self.file_mounts,
  326. sync_run_yet=False,
  327. )
  328. if init_required:
  329. node_tags[TAG_RAY_RUNTIME_CONFIG] += "-invalidate"
  330. # This ensures that `setup_commands` are not removed
  331. self.restart_only = False
  332. if self.restart_only:
  333. self.setup_commands = []
  334. # runtime_hash will only change whenever the user restarts
  335. # or updates their cluster with `get_or_create_head_node`
  336. if node_tags.get(TAG_RAY_RUNTIME_CONFIG) == self.runtime_hash and (
  337. not self.file_mounts_contents_hash
  338. or node_tags.get(TAG_RAY_FILE_MOUNTS_CONTENTS)
  339. == self.file_mounts_contents_hash
  340. ):
  341. # todo: we lie in the confirmation message since
  342. # full setup might be cancelled here
  343. cli_logger.print(
  344. "Configuration already up to date, "
  345. "skipping file mounts, initalization and setup commands.",
  346. _numbered=("[]", "2-6", NUM_SETUP_STEPS),
  347. )
  348. else:
  349. cli_logger.print(
  350. "Updating cluster configuration.", _tags=dict(hash=self.runtime_hash)
  351. )
  352. self.provider.set_node_tags(
  353. self.node_id, {TAG_RAY_NODE_STATUS: STATUS_SYNCING_FILES}
  354. )
  355. cli_logger.labeled_value("New status", STATUS_SYNCING_FILES)
  356. self.sync_file_mounts(self.rsync_up, step_numbers=(1, NUM_SETUP_STEPS))
  357. # Only run setup commands if runtime_hash has changed because
  358. # we don't want to run setup_commands every time the head node
  359. # file_mounts folders have changed.
  360. if node_tags.get(TAG_RAY_RUNTIME_CONFIG) != self.runtime_hash:
  361. # Run init commands
  362. self.provider.set_node_tags(
  363. self.node_id, {TAG_RAY_NODE_STATUS: STATUS_SETTING_UP}
  364. )
  365. cli_logger.labeled_value("New status", STATUS_SETTING_UP)
  366. if self.initialization_commands:
  367. with cli_logger.group(
  368. "Running initialization commands",
  369. _numbered=("[]", 4, NUM_SETUP_STEPS),
  370. ):
  371. global_event_system.execute_callback(
  372. CreateClusterEvent.run_initialization_cmd
  373. )
  374. with LogTimer(
  375. self.log_prefix + "Initialization commands",
  376. show_status=True,
  377. ):
  378. for cmd in self.initialization_commands:
  379. global_event_system.execute_callback(
  380. CreateClusterEvent.run_initialization_cmd,
  381. {"command": cmd},
  382. )
  383. try:
  384. # Overriding the existing SSHOptions class
  385. # with a new SSHOptions class that uses
  386. # this ssh_private_key as its only __init__
  387. # argument.
  388. # Run outside docker.
  389. self.cmd_runner.run(
  390. cmd,
  391. ssh_options_override_ssh_key=self.auth_config.get( # noqa: E501
  392. "ssh_private_key"
  393. ),
  394. run_env="host",
  395. )
  396. except ProcessRunnerError as e:
  397. if e.msg_type == "ssh_command_failed":
  398. cli_logger.error("Failed.")
  399. cli_logger.error("See above for stderr.")
  400. raise click.ClickException(
  401. "Initialization command failed."
  402. ) from None
  403. else:
  404. cli_logger.print(
  405. "No initialization commands to run.",
  406. _numbered=("[]", 4, NUM_SETUP_STEPS),
  407. )
  408. with cli_logger.group(
  409. "Initializing command runner",
  410. # todo: fix command numbering
  411. _numbered=("[]", 5, NUM_SETUP_STEPS),
  412. ):
  413. self.cmd_runner.run_init(
  414. as_head=self.is_head_node,
  415. file_mounts=self.file_mounts,
  416. sync_run_yet=True,
  417. )
  418. if self.setup_commands:
  419. with cli_logger.group(
  420. "Running setup commands",
  421. # todo: fix command numbering
  422. _numbered=("[]", 6, NUM_SETUP_STEPS),
  423. ):
  424. global_event_system.execute_callback(
  425. CreateClusterEvent.run_setup_cmd
  426. )
  427. with LogTimer(
  428. self.log_prefix + "Setup commands", show_status=True
  429. ):
  430. total = len(self.setup_commands)
  431. for i, cmd in enumerate(self.setup_commands):
  432. global_event_system.execute_callback(
  433. CreateClusterEvent.run_setup_cmd, {"command": cmd}
  434. )
  435. if cli_logger.verbosity == 0 and len(cmd) > 30:
  436. cmd_to_print = cf.bold(cmd[:30]) + "..."
  437. else:
  438. cmd_to_print = cf.bold(cmd)
  439. cli_logger.print(
  440. "{}", cmd_to_print, _numbered=("()", i, total)
  441. )
  442. try:
  443. # Runs in the container if docker is in use
  444. self.cmd_runner.run(cmd, run_env="auto")
  445. except ProcessRunnerError as e:
  446. if e.msg_type == "ssh_command_failed":
  447. cli_logger.error("Failed.")
  448. cli_logger.error("See above for stderr.")
  449. raise click.ClickException("Setup command failed.")
  450. else:
  451. cli_logger.print(
  452. "No setup commands to run.",
  453. _numbered=("[]", 6, NUM_SETUP_STEPS),
  454. )
  455. with cli_logger.group(
  456. "Starting the Ray runtime", _numbered=("[]", 7, NUM_SETUP_STEPS)
  457. ):
  458. global_event_system.execute_callback(CreateClusterEvent.start_ray_runtime)
  459. with LogTimer(self.log_prefix + "Ray start commands", show_status=True):
  460. for cmd in self.ray_start_commands:
  461. env_vars = {}
  462. if self.is_head_node:
  463. if usage_lib.usage_stats_enabled():
  464. env_vars[usage_constants.USAGE_STATS_ENABLED_ENV_VAR] = 1
  465. else:
  466. # Disable usage stats collection in the cluster.
  467. env_vars[usage_constants.USAGE_STATS_ENABLED_ENV_VAR] = 0
  468. # Add a resource override env variable if needed.
  469. # Local NodeProvider doesn't need resource and label override.
  470. if self.provider_type != "local":
  471. if self.node_resources:
  472. env_vars[
  473. RESOURCES_ENVIRONMENT_VARIABLE
  474. ] = self.node_resources
  475. if self.node_labels:
  476. env_vars[LABELS_ENVIRONMENT_VARIABLE] = self.node_labels
  477. try:
  478. old_redirected = cmd_output_util.is_output_redirected()
  479. cmd_output_util.set_output_redirected(False)
  480. # Runs in the container if docker is in use
  481. self.cmd_runner.run(
  482. cmd, environment_variables=env_vars, run_env="auto"
  483. )
  484. cmd_output_util.set_output_redirected(old_redirected)
  485. except ProcessRunnerError as e:
  486. if e.msg_type == "ssh_command_failed":
  487. cli_logger.error("Failed.")
  488. cli_logger.error("See above for stderr.")
  489. raise click.ClickException("Start command failed.")
  490. global_event_system.execute_callback(
  491. CreateClusterEvent.start_ray_runtime_completed
  492. )
  493. def rsync_up(self, source, target, docker_mount_if_possible=False):
  494. options = {}
  495. options["docker_mount_if_possible"] = docker_mount_if_possible
  496. options["rsync_exclude"] = self.rsync_options.get("rsync_exclude")
  497. options["rsync_filter"] = self.rsync_options.get("rsync_filter")
  498. self.cmd_runner.run_rsync_up(source, target, options=options)
  499. cli_logger.verbose(
  500. "`rsync`ed {} (local) to {} (remote)", cf.bold(source), cf.bold(target)
  501. )
  502. def rsync_down(self, source, target, docker_mount_if_possible=False):
  503. options = {}
  504. options["docker_mount_if_possible"] = docker_mount_if_possible
  505. options["rsync_exclude"] = self.rsync_options.get("rsync_exclude")
  506. options["rsync_filter"] = self.rsync_options.get("rsync_filter")
  507. self.cmd_runner.run_rsync_down(source, target, options=options)
  508. cli_logger.verbose(
  509. "`rsync`ed {} (remote) to {} (local)", cf.bold(source), cf.bold(target)
  510. )
class NodeUpdaterThread(NodeUpdater, Thread):
    """A NodeUpdater that can be started as a thread.

    `Thread.start()` invokes `run()`, which resolves to `NodeUpdater.run`
    because NodeUpdater comes first in the base-class list.
    """

    def __init__(self, *args, **kwargs):
        # Thread.__init__ must run first: NodeUpdater.__init__ assigns
        # `self.daemon`, which Thread only accepts once it is initialised.
        Thread.__init__(self)
        NodeUpdater.__init__(self, *args, **kwargs)
        # Stays -1 until NodeUpdater.run completes successfully and sets 0.
        self.exitcode = -1