| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652 |
- import os
- import re
- import subprocess
- import sys
- import tarfile
- import tempfile
- import threading
- from concurrent.futures import ThreadPoolExecutor
- from contextlib import contextmanager
- from typing import List, Optional, Sequence, Tuple
- import yaml
- import ray # noqa: F401
- from ray.autoscaler._private.cli_logger import cli_logger
- from ray.autoscaler._private.providers import _get_node_provider
- from ray.autoscaler.tags import NODE_KIND_HEAD, NODE_KIND_WORKER, TAG_RAY_NODE_KIND
- # Import psutil after ray so the packaged version is used.
- import psutil
- MAX_PARALLEL_SSH_WORKERS = 8
- DEFAULT_SSH_USER = "ubuntu"
- DEFAULT_SSH_KEYS = ["~/ray_bootstrap_key.pem", "~/.ssh/ray-autoscaler_2_us-west-2.pem"]
- class CommandFailed(RuntimeError):
- pass
- class LocalCommandFailed(CommandFailed):
- pass
- class RemoteCommandFailed(CommandFailed):
- pass
- class GetParameters:
- def __init__(
- self,
- logs: bool = True,
- debug_state: bool = True,
- pip: bool = True,
- processes: bool = True,
- processes_verbose: bool = True,
- processes_list: Optional[List[Tuple[str, bool]]] = None,
- ):
- self.logs = logs
- self.debug_state = debug_state
- self.pip = pip
- self.processes = processes
- self.processes_verbose = processes_verbose
- self.processes_list = processes_list
- class Node:
- """Node (as in "machine")"""
- def __init__(
- self,
- host: str,
- ssh_user: str = "ubuntu",
- ssh_key: str = "~/ray_bootstrap_key.pem",
- docker_container: Optional[str] = None,
- is_head: bool = False,
- ):
- self.host = host
- self.ssh_user = ssh_user
- self.ssh_key = ssh_key
- self.docker_container = docker_container
- self.is_head = is_head
- class Archive:
- """Archive object to collect and compress files into a single file.
- Objects of this class can be passed around to different data collection
- functions. These functions can use the :meth:`subdir` method to add
- files to a sub directory of the archive.
- """
- def __init__(self, file: Optional[str] = None):
- self.file = file or tempfile.mkstemp(prefix="ray_logs_", suffix=".tar.gz")[1]
- self.tar = None
- self._lock = threading.Lock()
- @property
- def is_open(self):
- return bool(self.tar)
- def open(self):
- self.tar = tarfile.open(self.file, "w:gz")
- def close(self):
- self.tar.close()
- self.tar = None
- def __enter__(self):
- self.open()
- return self
- def __exit__(self, exc_type, exc_val, exc_tb):
- self.close()
- @contextmanager
- def subdir(self, subdir: str, root: Optional[str] = "/"):
- """Open a context to add files to the archive.
- Example:
- .. code-block:: python
- with Archive("file.tar.gz") as archive:
- with archive.subdir("logfiles", root="/tmp/logs") as sd:
- # Will be added as `logfiles/nested/file.txt`
- sd.add("/tmp/logs/nested/file.txt")
- Args:
- subdir: Subdir to which to add files to. Calling the
- ``add(path)`` command will place files into the ``subdir``
- directory of the archive.
- root: Root path. Files without an explicit ``arcname``
- will be named relatively to this path.
- Yields:
- A context object that can be used to add files to the archive.
- """
- root = os.path.abspath(root)
- class _Context:
- @staticmethod
- def add(path: str, arcname: Optional[str] = None):
- path = os.path.abspath(path)
- arcname = arcname or os.path.join(subdir, os.path.relpath(path, root))
- self._lock.acquire()
- self.tar.add(path, arcname=arcname)
- self._lock.release()
- yield _Context()
- ###
- # Functions to gather logs and information on the local node
- ###
- def get_local_ray_logs(
- archive: Archive,
- exclude: Optional[Sequence[str]] = None,
- session_log_dir: str = "/tmp/ray/session_latest",
- ) -> Archive:
- """Copy local log files into an archive.
- Args:
- archive: Archive object to add log files to.
- exclude (Sequence[str]): Sequence of regex patterns. Files that match
- any of these patterns will not be included in the archive.
- session_dir: Path to the Ray session files. Defaults to
- ``/tmp/ray/session_latest``
- Returns:
- Open archive object.
- """
- if not archive.is_open:
- archive.open()
- exclude = exclude or []
- session_log_dir = os.path.join(os.path.expanduser(session_log_dir), "logs")
- with archive.subdir("logs", root=session_log_dir) as sd:
- for root, dirs, files in os.walk(session_log_dir):
- for file in files:
- file_path = os.path.join(root, file)
- rel_path = os.path.relpath(file_path, start=session_log_dir)
- # Skip file if it matches any pattern in `exclude`
- if any(re.match(pattern, rel_path) for pattern in exclude):
- continue
- sd.add(file_path)
- return archive
- def get_local_debug_state(
- archive: Archive, session_dir: str = "/tmp/ray/session_latest"
- ) -> Archive:
- """Copy local log files into an archive.
- Args:
- archive: Archive object to add log files to.
- session_dir: Path to the Ray session files. Defaults to
- ``/tmp/ray/session_latest``
- Returns:
- Open archive object.
- """
- if not archive.is_open:
- archive.open()
- session_dir = os.path.expanduser(session_dir)
- debug_state_file = os.path.join(session_dir, "logs/debug_state.txt")
- if not os.path.exists(debug_state_file):
- raise LocalCommandFailed("No `debug_state.txt` file found.")
- with archive.subdir("", root=session_dir) as sd:
- sd.add(debug_state_file)
- return archive
- def get_local_pip_packages(archive: Archive):
- """Get currently installed pip packages and write into an archive.
- Args:
- archive: Archive object to add meta files to.
- Returns:
- Open archive object.
- """
- if not archive.is_open:
- archive.open()
- try:
- from pip._internal.operations import freeze
- except ImportError: # pip < 10.0
- from pip.operations import freeze
- with tempfile.NamedTemporaryFile("wt") as fp:
- for line in freeze.freeze():
- fp.writelines([line, "\n"])
- fp.flush()
- with archive.subdir("") as sd:
- sd.add(fp.name, "pip_packages.txt")
- return archive
- def get_local_ray_processes(
- archive: Archive,
- processes: Optional[List[Tuple[str, bool]]] = None,
- verbose: bool = False,
- ):
- """Get the status of all the relevant ray processes.
- Args:
- archive: Archive object to add process info files to.
- processes: List of processes to get information on. The first
- element of the tuple is a string to filter by, and the second
- element is a boolean indicating if we should filter by command
- name (True) or command line including parameters (False)
- verbose: If True, show entire executable command line.
- If False, show just the first term.
- Returns:
- Open archive object.
- """
- if not processes:
- # local import to avoid circular dependencies
- from ray.autoscaler._private.constants import RAY_PROCESSES
- processes = RAY_PROCESSES
- process_infos = []
- for process in psutil.process_iter(["pid", "name", "cmdline", "status"]):
- try:
- with process.oneshot():
- cmdline = " ".join(process.cmdline())
- process_infos.append(
- (
- {
- "executable": cmdline
- if verbose
- else cmdline.split("--", 1)[0][:-1],
- "name": process.name(),
- "pid": process.pid,
- "status": process.status(),
- },
- process.cmdline(),
- )
- )
- except Exception as exc:
- raise LocalCommandFailed(exc) from exc
- relevant_processes = {}
- for process_dict, cmdline in process_infos:
- for keyword, filter_by_cmd in processes:
- if filter_by_cmd:
- corpus = process_dict["name"]
- else:
- corpus = subprocess.list2cmdline(cmdline)
- if keyword in corpus and process_dict["pid"] not in relevant_processes:
- relevant_processes[process_dict["pid"]] = process_dict
- with tempfile.NamedTemporaryFile("wt") as fp:
- for line in relevant_processes.values():
- fp.writelines([yaml.dump(line), "\n"])
- fp.flush()
- with archive.subdir("meta") as sd:
- sd.add(fp.name, "process_info.txt")
- return archive
- def get_all_local_data(archive: Archive, parameters: GetParameters):
- """Get all local data.
- Gets:
- - The Ray logs of the latest session
- - The currently installed pip packages
- Args:
- archive: Archive object to add meta files to.
- parameters: Parameters (settings) for getting data.
- Returns:
- Open archive object.
- """
- if not archive.is_open:
- archive.open()
- if parameters.logs:
- try:
- get_local_ray_logs(archive=archive)
- except LocalCommandFailed as exc:
- cli_logger.error(exc)
- if parameters.debug_state:
- try:
- get_local_debug_state(archive=archive)
- except LocalCommandFailed as exc:
- cli_logger.error(exc)
- if parameters.pip:
- try:
- get_local_pip_packages(archive=archive)
- except LocalCommandFailed as exc:
- cli_logger.error(exc)
- if parameters.processes:
- try:
- get_local_ray_processes(
- archive=archive,
- processes=parameters.processes_list,
- verbose=parameters.processes_verbose,
- )
- except LocalCommandFailed as exc:
- cli_logger.error(exc)
- return archive
- ###
- # Functions to invoke remote scripts and gather data from remote nodes
- ###
- def _wrap(items: List[str], quotes="'"):
- return f"{quotes}{' '.join(items)}{quotes}"
- def create_and_get_archive_from_remote_node(
- remote_node: Node, parameters: GetParameters, script_path: str = "ray"
- ) -> Optional[str]:
- """Create an archive containing logs on a remote node and transfer.
- This will call ``ray local-dump --stream`` on the remote
- node. The resulting file will be saved locally in a temporary file and
- returned.
- Args:
- remote_node: Remote node to gather archive from.
- script_path: Path to this script on the remote node.
- parameters: Parameters (settings) for getting data.
- Returns:
- Path to a temporary file containing the node's collected data.
- """
- cmd = [
- "ssh",
- "-o StrictHostKeyChecking=no",
- "-o UserKnownHostsFile=/dev/null",
- "-o LogLevel=ERROR",
- "-i",
- remote_node.ssh_key,
- f"{remote_node.ssh_user}@{remote_node.host}",
- ]
- if remote_node.docker_container:
- cmd += [
- "docker",
- "exec",
- remote_node.docker_container,
- ]
- collect_cmd = [script_path, "local-dump", "--stream"]
- collect_cmd += ["--logs"] if parameters.logs else ["--no-logs"]
- collect_cmd += ["--debug-state"] if parameters.debug_state else ["--no-debug-state"]
- collect_cmd += ["--pip"] if parameters.pip else ["--no-pip"]
- collect_cmd += ["--processes"] if parameters.processes else ["--no-processes"]
- if parameters.processes:
- collect_cmd += (
- ["--processes-verbose"]
- if parameters.processes_verbose
- else ["--no-proccesses-verbose"]
- )
- cmd += ["/bin/bash", "-c", _wrap(collect_cmd, quotes='"')]
- cat = "node" if not remote_node.is_head else "head"
- cli_logger.print(f"Collecting data from remote node: {remote_node.host}")
- tmp = tempfile.mkstemp(prefix=f"ray_{cat}_{remote_node.host}_", suffix=".tar.gz")[1]
- with open(tmp, "wb") as fp:
- try:
- subprocess.check_call(cmd, stdout=fp, stderr=sys.stderr)
- except subprocess.CalledProcessError as exc:
- raise RemoteCommandFailed(
- f"Gathering logs from remote node failed: {' '.join(cmd)}"
- ) from exc
- return tmp
- def create_and_add_remote_data_to_local_archive(
- archive: Archive, remote_node: Node, parameters: GetParameters
- ):
- """Create and get data from remote node and add to local archive.
- Args:
- archive: Archive object to add remote data to.
- remote_node: Remote node to gather archive from.
- parameters: Parameters (settings) for getting data.
- Returns:
- Open archive object.
- """
- tmp = create_and_get_archive_from_remote_node(remote_node, parameters)
- if not archive.is_open:
- archive.open()
- cat = "node" if not remote_node.is_head else "head"
- with archive.subdir("", root=os.path.dirname(tmp)) as sd:
- sd.add(tmp, arcname=f"ray_{cat}_{remote_node.host}.tar.gz")
- return archive
- def create_and_add_local_data_to_local_archive(
- archive: Archive, parameters: GetParameters
- ):
- """Create and get data from this node and add to archive.
- Args:
- archive: Archive object to add remote data to.
- parameters: Parameters (settings) for getting data.
- Returns:
- Open archive object.
- """
- with Archive() as local_data_archive:
- get_all_local_data(local_data_archive, parameters)
- if not archive.is_open:
- archive.open()
- with archive.subdir("", root=os.path.dirname(local_data_archive.file)) as sd:
- sd.add(local_data_archive.file, arcname="local_node.tar.gz")
- os.remove(local_data_archive.file)
- return archive
- def create_archive_for_remote_nodes(
- archive: Archive, remote_nodes: Sequence[Node], parameters: GetParameters
- ):
- """Create an archive combining data from the remote nodes.
- This will parallelize calls to get data from remote nodes.
- Args:
- archive: Archive object to add remote data to.
- remote_nodes (Sequence[Node]): Sequence of remote nodes.
- parameters: Parameters (settings) for getting data.
- Returns:
- Open archive object.
- """
- if not archive.is_open:
- archive.open()
- with ThreadPoolExecutor(max_workers=MAX_PARALLEL_SSH_WORKERS) as executor:
- for remote_node in remote_nodes:
- executor.submit(
- create_and_add_remote_data_to_local_archive,
- archive=archive,
- remote_node=remote_node,
- parameters=parameters,
- )
- return archive
- def create_archive_for_local_and_remote_nodes(
- archive: Archive, remote_nodes: Sequence[Node], parameters: GetParameters
- ):
- """Create an archive combining data from the local and remote nodes.
- This will parallelize calls to get data from remote nodes.
- Args:
- archive: Archive object to add data to.
- remote_nodes (Sequence[Node]): Sequence of remote nodes.
- parameters: Parameters (settings) for getting data.
- Returns:
- Open archive object.
- """
- if not archive.is_open:
- archive.open()
- try:
- create_and_add_local_data_to_local_archive(archive, parameters)
- except CommandFailed as exc:
- cli_logger.error(exc)
- create_archive_for_remote_nodes(archive, remote_nodes, parameters)
- cli_logger.print(
- f"Collected data from local node and {len(remote_nodes)} " f"remote nodes."
- )
- return archive
- ###
- # Ray cluster info
- ###
- def get_info_from_ray_cluster_config(
- cluster_config: str,
- ) -> Tuple[List[str], str, str, Optional[str], Optional[str]]:
- """Get information from Ray cluster config.
- Return list of host IPs, ssh user, ssh key file, and optional docker
- container.
- Args:
- cluster_config: Path to ray cluster config.
- Returns:
- Tuple of list of host IPs, ssh user name, ssh key file path,
- optional docker container name, optional cluster name.
- """
- from ray.autoscaler._private.commands import _bootstrap_config
- cli_logger.print(
- f"Retrieving cluster information from ray cluster file: " f"{cluster_config}"
- )
- cluster_config = os.path.expanduser(cluster_config)
- config = yaml.safe_load(open(cluster_config).read())
- config = _bootstrap_config(config, no_config_cache=True)
- provider = _get_node_provider(config["provider"], config["cluster_name"])
- head_nodes = provider.non_terminated_nodes({TAG_RAY_NODE_KIND: NODE_KIND_HEAD})
- worker_nodes = provider.non_terminated_nodes({TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
- hosts = [provider.external_ip(node) for node in head_nodes + worker_nodes]
- ssh_user = config["auth"]["ssh_user"]
- ssh_key = config["auth"]["ssh_private_key"]
- docker = None
- docker_config = config.get("docker", None)
- if docker_config:
- docker = docker_config.get("container_name", None)
- cluster_name = config.get("cluster_name", None)
- return hosts, ssh_user, ssh_key, docker, cluster_name
- def _info_from_params(
- cluster: Optional[str] = None,
- host: Optional[str] = None,
- ssh_user: Optional[str] = None,
- ssh_key: Optional[str] = None,
- docker: Optional[str] = None,
- ):
- """Parse command line arguments.
- Note: This returns a list of hosts, not a comma separated string!
- """
- if not host and not cluster:
- bootstrap_config = os.path.expanduser("~/ray_bootstrap_config.yaml")
- if os.path.exists(bootstrap_config):
- cluster = bootstrap_config
- cli_logger.warning(
- f"Detected cluster config file at {cluster}. "
- f"If this is incorrect, specify with "
- f"`ray cluster-dump <config>`"
- )
- elif cluster:
- cluster = os.path.expanduser(cluster)
- cluster_name = None
- if cluster:
- h, u, k, d, cluster_name = get_info_from_ray_cluster_config(cluster)
- ssh_user = ssh_user or u
- ssh_key = ssh_key or k
- docker = docker or d
- hosts = host.split(",") if host else h
- if not hosts:
- raise LocalCommandFailed(
- f"Invalid cluster file or cluster has no running nodes: " f"{cluster}"
- )
- elif host:
- hosts = host.split(",")
- else:
- raise LocalCommandFailed(
- "You need to either specify a `<cluster_config>` or `--host`."
- )
- if not ssh_user:
- ssh_user = DEFAULT_SSH_USER
- cli_logger.warning(
- f"Using default SSH user `{ssh_user}`. "
- f"If this is incorrect, specify with `--ssh-user <user>`"
- )
- if not ssh_key:
- for cand_key in DEFAULT_SSH_KEYS:
- cand_key_file = os.path.expanduser(cand_key)
- if os.path.exists(cand_key_file):
- ssh_key = cand_key_file
- cli_logger.warning(
- f"Auto detected SSH key file: {ssh_key}. "
- f"If this is incorrect, specify with `--ssh-key <key>`"
- )
- break
- return cluster, hosts, ssh_user, ssh_key, docker, cluster_name
|