cluster_dump.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652
  1. import os
  2. import re
  3. import subprocess
  4. import sys
  5. import tarfile
  6. import tempfile
  7. import threading
  8. from concurrent.futures import ThreadPoolExecutor
  9. from contextlib import contextmanager
  10. from typing import List, Optional, Sequence, Tuple
  11. import yaml
  12. import ray # noqa: F401
  13. from ray.autoscaler._private.cli_logger import cli_logger
  14. from ray.autoscaler._private.providers import _get_node_provider
  15. from ray.autoscaler.tags import NODE_KIND_HEAD, NODE_KIND_WORKER, TAG_RAY_NODE_KIND
  16. # Import psutil after ray so the packaged version is used.
  17. import psutil
  18. MAX_PARALLEL_SSH_WORKERS = 8
  19. DEFAULT_SSH_USER = "ubuntu"
  20. DEFAULT_SSH_KEYS = ["~/ray_bootstrap_key.pem", "~/.ssh/ray-autoscaler_2_us-west-2.pem"]
  21. class CommandFailed(RuntimeError):
  22. pass
  23. class LocalCommandFailed(CommandFailed):
  24. pass
  25. class RemoteCommandFailed(CommandFailed):
  26. pass
  27. class GetParameters:
  28. def __init__(
  29. self,
  30. logs: bool = True,
  31. debug_state: bool = True,
  32. pip: bool = True,
  33. processes: bool = True,
  34. processes_verbose: bool = True,
  35. processes_list: Optional[List[Tuple[str, bool]]] = None,
  36. ):
  37. self.logs = logs
  38. self.debug_state = debug_state
  39. self.pip = pip
  40. self.processes = processes
  41. self.processes_verbose = processes_verbose
  42. self.processes_list = processes_list
  43. class Node:
  44. """Node (as in "machine")"""
  45. def __init__(
  46. self,
  47. host: str,
  48. ssh_user: str = "ubuntu",
  49. ssh_key: str = "~/ray_bootstrap_key.pem",
  50. docker_container: Optional[str] = None,
  51. is_head: bool = False,
  52. ):
  53. self.host = host
  54. self.ssh_user = ssh_user
  55. self.ssh_key = ssh_key
  56. self.docker_container = docker_container
  57. self.is_head = is_head
  58. class Archive:
  59. """Archive object to collect and compress files into a single file.
  60. Objects of this class can be passed around to different data collection
  61. functions. These functions can use the :meth:`subdir` method to add
  62. files to a sub directory of the archive.
  63. """
  64. def __init__(self, file: Optional[str] = None):
  65. self.file = file or tempfile.mkstemp(prefix="ray_logs_", suffix=".tar.gz")[1]
  66. self.tar = None
  67. self._lock = threading.Lock()
  68. @property
  69. def is_open(self):
  70. return bool(self.tar)
  71. def open(self):
  72. self.tar = tarfile.open(self.file, "w:gz")
  73. def close(self):
  74. self.tar.close()
  75. self.tar = None
  76. def __enter__(self):
  77. self.open()
  78. return self
  79. def __exit__(self, exc_type, exc_val, exc_tb):
  80. self.close()
  81. @contextmanager
  82. def subdir(self, subdir: str, root: Optional[str] = "/"):
  83. """Open a context to add files to the archive.
  84. Example:
  85. .. code-block:: python
  86. with Archive("file.tar.gz") as archive:
  87. with archive.subdir("logfiles", root="/tmp/logs") as sd:
  88. # Will be added as `logfiles/nested/file.txt`
  89. sd.add("/tmp/logs/nested/file.txt")
  90. Args:
  91. subdir: Subdir to which to add files to. Calling the
  92. ``add(path)`` command will place files into the ``subdir``
  93. directory of the archive.
  94. root: Root path. Files without an explicit ``arcname``
  95. will be named relatively to this path.
  96. Yields:
  97. A context object that can be used to add files to the archive.
  98. """
  99. root = os.path.abspath(root)
  100. class _Context:
  101. @staticmethod
  102. def add(path: str, arcname: Optional[str] = None):
  103. path = os.path.abspath(path)
  104. arcname = arcname or os.path.join(subdir, os.path.relpath(path, root))
  105. self._lock.acquire()
  106. self.tar.add(path, arcname=arcname)
  107. self._lock.release()
  108. yield _Context()
  109. ###
  110. # Functions to gather logs and information on the local node
  111. ###
  112. def get_local_ray_logs(
  113. archive: Archive,
  114. exclude: Optional[Sequence[str]] = None,
  115. session_log_dir: str = "/tmp/ray/session_latest",
  116. ) -> Archive:
  117. """Copy local log files into an archive.
  118. Args:
  119. archive: Archive object to add log files to.
  120. exclude (Sequence[str]): Sequence of regex patterns. Files that match
  121. any of these patterns will not be included in the archive.
  122. session_dir: Path to the Ray session files. Defaults to
  123. ``/tmp/ray/session_latest``
  124. Returns:
  125. Open archive object.
  126. """
  127. if not archive.is_open:
  128. archive.open()
  129. exclude = exclude or []
  130. session_log_dir = os.path.join(os.path.expanduser(session_log_dir), "logs")
  131. with archive.subdir("logs", root=session_log_dir) as sd:
  132. for root, dirs, files in os.walk(session_log_dir):
  133. for file in files:
  134. file_path = os.path.join(root, file)
  135. rel_path = os.path.relpath(file_path, start=session_log_dir)
  136. # Skip file if it matches any pattern in `exclude`
  137. if any(re.match(pattern, rel_path) for pattern in exclude):
  138. continue
  139. sd.add(file_path)
  140. return archive
  141. def get_local_debug_state(
  142. archive: Archive, session_dir: str = "/tmp/ray/session_latest"
  143. ) -> Archive:
  144. """Copy local log files into an archive.
  145. Args:
  146. archive: Archive object to add log files to.
  147. session_dir: Path to the Ray session files. Defaults to
  148. ``/tmp/ray/session_latest``
  149. Returns:
  150. Open archive object.
  151. """
  152. if not archive.is_open:
  153. archive.open()
  154. session_dir = os.path.expanduser(session_dir)
  155. debug_state_file = os.path.join(session_dir, "logs/debug_state.txt")
  156. if not os.path.exists(debug_state_file):
  157. raise LocalCommandFailed("No `debug_state.txt` file found.")
  158. with archive.subdir("", root=session_dir) as sd:
  159. sd.add(debug_state_file)
  160. return archive
  161. def get_local_pip_packages(archive: Archive):
  162. """Get currently installed pip packages and write into an archive.
  163. Args:
  164. archive: Archive object to add meta files to.
  165. Returns:
  166. Open archive object.
  167. """
  168. if not archive.is_open:
  169. archive.open()
  170. try:
  171. from pip._internal.operations import freeze
  172. except ImportError: # pip < 10.0
  173. from pip.operations import freeze
  174. with tempfile.NamedTemporaryFile("wt") as fp:
  175. for line in freeze.freeze():
  176. fp.writelines([line, "\n"])
  177. fp.flush()
  178. with archive.subdir("") as sd:
  179. sd.add(fp.name, "pip_packages.txt")
  180. return archive
  181. def get_local_ray_processes(
  182. archive: Archive,
  183. processes: Optional[List[Tuple[str, bool]]] = None,
  184. verbose: bool = False,
  185. ):
  186. """Get the status of all the relevant ray processes.
  187. Args:
  188. archive: Archive object to add process info files to.
  189. processes: List of processes to get information on. The first
  190. element of the tuple is a string to filter by, and the second
  191. element is a boolean indicating if we should filter by command
  192. name (True) or command line including parameters (False)
  193. verbose: If True, show entire executable command line.
  194. If False, show just the first term.
  195. Returns:
  196. Open archive object.
  197. """
  198. if not processes:
  199. # local import to avoid circular dependencies
  200. from ray.autoscaler._private.constants import RAY_PROCESSES
  201. processes = RAY_PROCESSES
  202. process_infos = []
  203. for process in psutil.process_iter(["pid", "name", "cmdline", "status"]):
  204. try:
  205. with process.oneshot():
  206. cmdline = " ".join(process.cmdline())
  207. process_infos.append(
  208. (
  209. {
  210. "executable": cmdline
  211. if verbose
  212. else cmdline.split("--", 1)[0][:-1],
  213. "name": process.name(),
  214. "pid": process.pid,
  215. "status": process.status(),
  216. },
  217. process.cmdline(),
  218. )
  219. )
  220. except Exception as exc:
  221. raise LocalCommandFailed(exc) from exc
  222. relevant_processes = {}
  223. for process_dict, cmdline in process_infos:
  224. for keyword, filter_by_cmd in processes:
  225. if filter_by_cmd:
  226. corpus = process_dict["name"]
  227. else:
  228. corpus = subprocess.list2cmdline(cmdline)
  229. if keyword in corpus and process_dict["pid"] not in relevant_processes:
  230. relevant_processes[process_dict["pid"]] = process_dict
  231. with tempfile.NamedTemporaryFile("wt") as fp:
  232. for line in relevant_processes.values():
  233. fp.writelines([yaml.dump(line), "\n"])
  234. fp.flush()
  235. with archive.subdir("meta") as sd:
  236. sd.add(fp.name, "process_info.txt")
  237. return archive
  238. def get_all_local_data(archive: Archive, parameters: GetParameters):
  239. """Get all local data.
  240. Gets:
  241. - The Ray logs of the latest session
  242. - The currently installed pip packages
  243. Args:
  244. archive: Archive object to add meta files to.
  245. parameters: Parameters (settings) for getting data.
  246. Returns:
  247. Open archive object.
  248. """
  249. if not archive.is_open:
  250. archive.open()
  251. if parameters.logs:
  252. try:
  253. get_local_ray_logs(archive=archive)
  254. except LocalCommandFailed as exc:
  255. cli_logger.error(exc)
  256. if parameters.debug_state:
  257. try:
  258. get_local_debug_state(archive=archive)
  259. except LocalCommandFailed as exc:
  260. cli_logger.error(exc)
  261. if parameters.pip:
  262. try:
  263. get_local_pip_packages(archive=archive)
  264. except LocalCommandFailed as exc:
  265. cli_logger.error(exc)
  266. if parameters.processes:
  267. try:
  268. get_local_ray_processes(
  269. archive=archive,
  270. processes=parameters.processes_list,
  271. verbose=parameters.processes_verbose,
  272. )
  273. except LocalCommandFailed as exc:
  274. cli_logger.error(exc)
  275. return archive
  276. ###
  277. # Functions to invoke remote scripts and gather data from remote nodes
  278. ###
  279. def _wrap(items: List[str], quotes="'"):
  280. return f"{quotes}{' '.join(items)}{quotes}"
  281. def create_and_get_archive_from_remote_node(
  282. remote_node: Node, parameters: GetParameters, script_path: str = "ray"
  283. ) -> Optional[str]:
  284. """Create an archive containing logs on a remote node and transfer.
  285. This will call ``ray local-dump --stream`` on the remote
  286. node. The resulting file will be saved locally in a temporary file and
  287. returned.
  288. Args:
  289. remote_node: Remote node to gather archive from.
  290. script_path: Path to this script on the remote node.
  291. parameters: Parameters (settings) for getting data.
  292. Returns:
  293. Path to a temporary file containing the node's collected data.
  294. """
  295. cmd = [
  296. "ssh",
  297. "-o StrictHostKeyChecking=no",
  298. "-o UserKnownHostsFile=/dev/null",
  299. "-o LogLevel=ERROR",
  300. "-i",
  301. remote_node.ssh_key,
  302. f"{remote_node.ssh_user}@{remote_node.host}",
  303. ]
  304. if remote_node.docker_container:
  305. cmd += [
  306. "docker",
  307. "exec",
  308. remote_node.docker_container,
  309. ]
  310. collect_cmd = [script_path, "local-dump", "--stream"]
  311. collect_cmd += ["--logs"] if parameters.logs else ["--no-logs"]
  312. collect_cmd += ["--debug-state"] if parameters.debug_state else ["--no-debug-state"]
  313. collect_cmd += ["--pip"] if parameters.pip else ["--no-pip"]
  314. collect_cmd += ["--processes"] if parameters.processes else ["--no-processes"]
  315. if parameters.processes:
  316. collect_cmd += (
  317. ["--processes-verbose"]
  318. if parameters.processes_verbose
  319. else ["--no-proccesses-verbose"]
  320. )
  321. cmd += ["/bin/bash", "-c", _wrap(collect_cmd, quotes='"')]
  322. cat = "node" if not remote_node.is_head else "head"
  323. cli_logger.print(f"Collecting data from remote node: {remote_node.host}")
  324. tmp = tempfile.mkstemp(prefix=f"ray_{cat}_{remote_node.host}_", suffix=".tar.gz")[1]
  325. with open(tmp, "wb") as fp:
  326. try:
  327. subprocess.check_call(cmd, stdout=fp, stderr=sys.stderr)
  328. except subprocess.CalledProcessError as exc:
  329. raise RemoteCommandFailed(
  330. f"Gathering logs from remote node failed: {' '.join(cmd)}"
  331. ) from exc
  332. return tmp
  333. def create_and_add_remote_data_to_local_archive(
  334. archive: Archive, remote_node: Node, parameters: GetParameters
  335. ):
  336. """Create and get data from remote node and add to local archive.
  337. Args:
  338. archive: Archive object to add remote data to.
  339. remote_node: Remote node to gather archive from.
  340. parameters: Parameters (settings) for getting data.
  341. Returns:
  342. Open archive object.
  343. """
  344. tmp = create_and_get_archive_from_remote_node(remote_node, parameters)
  345. if not archive.is_open:
  346. archive.open()
  347. cat = "node" if not remote_node.is_head else "head"
  348. with archive.subdir("", root=os.path.dirname(tmp)) as sd:
  349. sd.add(tmp, arcname=f"ray_{cat}_{remote_node.host}.tar.gz")
  350. return archive
  351. def create_and_add_local_data_to_local_archive(
  352. archive: Archive, parameters: GetParameters
  353. ):
  354. """Create and get data from this node and add to archive.
  355. Args:
  356. archive: Archive object to add remote data to.
  357. parameters: Parameters (settings) for getting data.
  358. Returns:
  359. Open archive object.
  360. """
  361. with Archive() as local_data_archive:
  362. get_all_local_data(local_data_archive, parameters)
  363. if not archive.is_open:
  364. archive.open()
  365. with archive.subdir("", root=os.path.dirname(local_data_archive.file)) as sd:
  366. sd.add(local_data_archive.file, arcname="local_node.tar.gz")
  367. os.remove(local_data_archive.file)
  368. return archive
  369. def create_archive_for_remote_nodes(
  370. archive: Archive, remote_nodes: Sequence[Node], parameters: GetParameters
  371. ):
  372. """Create an archive combining data from the remote nodes.
  373. This will parallelize calls to get data from remote nodes.
  374. Args:
  375. archive: Archive object to add remote data to.
  376. remote_nodes (Sequence[Node]): Sequence of remote nodes.
  377. parameters: Parameters (settings) for getting data.
  378. Returns:
  379. Open archive object.
  380. """
  381. if not archive.is_open:
  382. archive.open()
  383. with ThreadPoolExecutor(max_workers=MAX_PARALLEL_SSH_WORKERS) as executor:
  384. for remote_node in remote_nodes:
  385. executor.submit(
  386. create_and_add_remote_data_to_local_archive,
  387. archive=archive,
  388. remote_node=remote_node,
  389. parameters=parameters,
  390. )
  391. return archive
  392. def create_archive_for_local_and_remote_nodes(
  393. archive: Archive, remote_nodes: Sequence[Node], parameters: GetParameters
  394. ):
  395. """Create an archive combining data from the local and remote nodes.
  396. This will parallelize calls to get data from remote nodes.
  397. Args:
  398. archive: Archive object to add data to.
  399. remote_nodes (Sequence[Node]): Sequence of remote nodes.
  400. parameters: Parameters (settings) for getting data.
  401. Returns:
  402. Open archive object.
  403. """
  404. if not archive.is_open:
  405. archive.open()
  406. try:
  407. create_and_add_local_data_to_local_archive(archive, parameters)
  408. except CommandFailed as exc:
  409. cli_logger.error(exc)
  410. create_archive_for_remote_nodes(archive, remote_nodes, parameters)
  411. cli_logger.print(
  412. f"Collected data from local node and {len(remote_nodes)} " f"remote nodes."
  413. )
  414. return archive
  415. ###
  416. # Ray cluster info
  417. ###
  418. def get_info_from_ray_cluster_config(
  419. cluster_config: str,
  420. ) -> Tuple[List[str], str, str, Optional[str], Optional[str]]:
  421. """Get information from Ray cluster config.
  422. Return list of host IPs, ssh user, ssh key file, and optional docker
  423. container.
  424. Args:
  425. cluster_config: Path to ray cluster config.
  426. Returns:
  427. Tuple of list of host IPs, ssh user name, ssh key file path,
  428. optional docker container name, optional cluster name.
  429. """
  430. from ray.autoscaler._private.commands import _bootstrap_config
  431. cli_logger.print(
  432. f"Retrieving cluster information from ray cluster file: " f"{cluster_config}"
  433. )
  434. cluster_config = os.path.expanduser(cluster_config)
  435. config = yaml.safe_load(open(cluster_config).read())
  436. config = _bootstrap_config(config, no_config_cache=True)
  437. provider = _get_node_provider(config["provider"], config["cluster_name"])
  438. head_nodes = provider.non_terminated_nodes({TAG_RAY_NODE_KIND: NODE_KIND_HEAD})
  439. worker_nodes = provider.non_terminated_nodes({TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
  440. hosts = [provider.external_ip(node) for node in head_nodes + worker_nodes]
  441. ssh_user = config["auth"]["ssh_user"]
  442. ssh_key = config["auth"]["ssh_private_key"]
  443. docker = None
  444. docker_config = config.get("docker", None)
  445. if docker_config:
  446. docker = docker_config.get("container_name", None)
  447. cluster_name = config.get("cluster_name", None)
  448. return hosts, ssh_user, ssh_key, docker, cluster_name
  449. def _info_from_params(
  450. cluster: Optional[str] = None,
  451. host: Optional[str] = None,
  452. ssh_user: Optional[str] = None,
  453. ssh_key: Optional[str] = None,
  454. docker: Optional[str] = None,
  455. ):
  456. """Parse command line arguments.
  457. Note: This returns a list of hosts, not a comma separated string!
  458. """
  459. if not host and not cluster:
  460. bootstrap_config = os.path.expanduser("~/ray_bootstrap_config.yaml")
  461. if os.path.exists(bootstrap_config):
  462. cluster = bootstrap_config
  463. cli_logger.warning(
  464. f"Detected cluster config file at {cluster}. "
  465. f"If this is incorrect, specify with "
  466. f"`ray cluster-dump <config>`"
  467. )
  468. elif cluster:
  469. cluster = os.path.expanduser(cluster)
  470. cluster_name = None
  471. if cluster:
  472. h, u, k, d, cluster_name = get_info_from_ray_cluster_config(cluster)
  473. ssh_user = ssh_user or u
  474. ssh_key = ssh_key or k
  475. docker = docker or d
  476. hosts = host.split(",") if host else h
  477. if not hosts:
  478. raise LocalCommandFailed(
  479. f"Invalid cluster file or cluster has no running nodes: " f"{cluster}"
  480. )
  481. elif host:
  482. hosts = host.split(",")
  483. else:
  484. raise LocalCommandFailed(
  485. "You need to either specify a `<cluster_config>` or `--host`."
  486. )
  487. if not ssh_user:
  488. ssh_user = DEFAULT_SSH_USER
  489. cli_logger.warning(
  490. f"Using default SSH user `{ssh_user}`. "
  491. f"If this is incorrect, specify with `--ssh-user <user>`"
  492. )
  493. if not ssh_key:
  494. for cand_key in DEFAULT_SSH_KEYS:
  495. cand_key_file = os.path.expanduser(cand_key)
  496. if os.path.exists(cand_key_file):
  497. ssh_key = cand_key_file
  498. cli_logger.warning(
  499. f"Auto detected SSH key file: {ssh_key}. "
  500. f"If this is incorrect, specify with `--ssh-key <key>`"
  501. )
  502. break
  503. return cluster, hosts, ssh_user, ssh_key, docker, cluster_name