| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273 |
- """Symmetric Run for Ray."""
- import socket
- import subprocess
- import sys
- import time
- from typing import List
- import click
- import ray
- from ray._private.ray_constants import env_integer
- from ray._raylet import GcsClient
- from ray.exceptions import RpcError
- import psutil
- CLUSTER_WAIT_TIMEOUT = env_integer("RAY_SYMMETRIC_RUN_CLUSTER_WAIT_TIMEOUT", 30)
- def check_ray_already_started() -> bool:
- import ray._private.services as services
- # Try auto-detecting the Ray instance.
- running_gcs_addresses = services.find_gcs_addresses()
- return len(running_gcs_addresses) > 0
- def check_cluster_ready(nnodes, timeout=CLUSTER_WAIT_TIMEOUT):
- """Wait for all nodes to start.
- Raises an exception if the nodes don't start in time.
- """
- start_time = time.time()
- current_nodes = 1
- ray.init(ignore_reinit_error=True)
- while time.time() - start_time < timeout:
- time.sleep(5)
- current_nodes = len(ray.nodes())
- if current_nodes == nnodes:
- return True
- else:
- click.echo(
- f"Waiting for nodes to start... {current_nodes}/{nnodes} nodes started"
- )
- return False
- def check_head_node_ready(address: str, timeout=CLUSTER_WAIT_TIMEOUT):
- start_time = time.time()
- gcs_client = GcsClient(address=address)
- while time.time() - start_time < timeout:
- try:
- gcs_client.check_alive([], timeout=1)
- click.echo("Ray cluster is ready!")
- return True
- except RpcError:
- pass
- time.sleep(5)
- return False
- def curate_and_validate_ray_start_args(run_and_start_args: List[str]) -> List[str]:
- # Reparse the arguments to remove symmetric_run arguments.
- ctx = symmetric_run.make_context("_", run_and_start_args, resilient_parsing=True)
- cleaned_args = list(ctx.params["ray_args_and_entrypoint"])
- for arg in cleaned_args:
- if arg == "--head":
- raise click.ClickException("Cannot use --head option in symmetric_run.")
- if arg == "--node-ip-address":
- raise click.ClickException(
- "Cannot use --node-ip-address option in symmetric_run."
- )
- if arg == "--port":
- raise click.ClickException("Cannot use --port option in symmetric_run.")
- if arg == "--block":
- raise click.ClickException("Cannot use --block option in symmetric_run.")
- return cleaned_args
- @click.command(
- name="symmetric_run",
- context_settings={"ignore_unknown_options": True, "allow_extra_args": True},
- help="""Command to start Ray across all nodes and execute an entrypoint command.
- USAGE:
- ray symmetric-run --address ADDRESS
- [--min-nodes NUM_NODES] [RAY_START_OPTIONS] -- [ENTRYPOINT_COMMAND]
- DESCRIPTION:
- This command (1) starts a Ray cluster across all nodes,
- (2) runs a command on the head node, and (3) stops the Ray cluster.
- The '--' separator is required to distinguish between Ray start arguments
- and the entrypoint command. The --min-nodes option is optional and
- can be used to wait for a specific number of nodes to start.
- EXAMPLES:
- # Start Ray with default settings and run a Python script
- ray symmetric-run --address 127.0.0.1:6379 -- python my_script.py
- # Start Ray with specific head node and run a command
- ray symmetric-run --address 127.0.0.1:6379 --min-nodes 4 -- python train_model.py --epochs=100
- # Start Ray and run a multi-word command
- ray symmetric-run --address 127.0.0.1:6379 --min-nodes 4 --num-cpus=4 -- python -m my_module --config=prod
- RAY START OPTIONS:
- Most ray start command options are supported. Arguments that are not
- supported are: --head, --node-ip-address, --port, --block.
- SEPARATOR REQUIREMENT:
- The '--' separator is mandatory and must appear between Ray start
- arguments and the entrypoint command. This ensures clear separation
- between the two sets of arguments.
- """,
- )
- @click.option(
- "--address", required=True, type=str, help="The address of the Ray cluster."
- )
- @click.option(
- "--min-nodes",
- type=int,
- help="If provided, wait for this number of nodes to start.",
- )
- @click.argument("ray_args_and_entrypoint", nargs=-1, type=click.UNPROCESSED)
- def symmetric_run(address, min_nodes, ray_args_and_entrypoint):
- all_args = sys.argv[1:]
- if all_args and all_args[0] == "symmetric-run":
- all_args = all_args[1:]
- try:
- separator = all_args.index("--")
- except ValueError:
- raise click.ClickException(
- "No separator '--' found in arguments. Please use '--' to "
- "separate Ray start arguments and the entrypoint command."
- )
- run_and_start_args, entrypoint_on_head = (
- all_args[:separator],
- all_args[separator + 1 :],
- )
- ray_start_args = curate_and_validate_ray_start_args(run_and_start_args)
- min_nodes = 1 if min_nodes is None else min_nodes
- if not entrypoint_on_head:
- raise click.ClickException("No entrypoint command provided.")
- if check_ray_already_started():
- raise click.ClickException("Ray is already started on this node.")
- # 1. Parse address and check if we are on the head node.
- gcs_host_port = ray._common.network_utils.parse_address(address)
- if gcs_host_port is None:
- raise click.ClickException(
- f"Invalid address format: {address}, should be `host:port`"
- )
- gcs_host, gcs_port = gcs_host_port
- try:
- # AF_UNSPEC allows resolving both IPv4 and IPv6
- addrinfo = socket.getaddrinfo(
- gcs_host, gcs_port, socket.AF_UNSPEC, socket.SOCK_STREAM
- )
- resolved_gcs_host = addrinfo[0][4][0]
- except socket.gaierror:
- raise click.ClickException(f"Could not resolve hostname: {gcs_host}")
- my_ips = []
- for iface, addrs in psutil.net_if_addrs().items():
- for addr in addrs:
- # Look for AF_INET (IPv4) or AF_INET6 (IPv6)
- if addr.family in [
- socket.AddressFamily.AF_INET,
- socket.AddressFamily.AF_INET6,
- ]:
- my_ips.append(addr.address)
- if min_nodes > 1:
- # Ban localhost ips if we are not running on a single node
- # to avoid starting N head nodes
- my_ips = [ip for ip in my_ips if ip != "127.0.0.1" and ip != "::1"]
- is_head = resolved_gcs_host in my_ips
- result = None
- # 2. Start Ray and run commands.
- try:
- if is_head:
- # On the head node, start Ray, run the command, then stop Ray.
- click.echo("On head node. Starting Ray cluster head...")
- # Build the ray start command with all parameters
- ray_start_cmd = [
- "ray",
- "start",
- "--head",
- f"--node-ip-address={resolved_gcs_host}",
- f"--port={gcs_port}",
- *ray_start_args,
- ]
- # Start Ray head. This runs in the background and hides output.
- subprocess.run(ray_start_cmd, check=True, capture_output=True)
- click.echo("Head node started.")
- click.echo("=======================")
- if min_nodes > 1 and not check_cluster_ready(min_nodes):
- raise click.ClickException(
- "Timed out waiting for other nodes to start."
- )
- click.echo(
- f"Running command on head node: {entrypoint_on_head}",
- )
- click.echo("=======================")
- result = subprocess.run(entrypoint_on_head)
- click.echo("=======================")
- else:
- # On a worker node, start Ray and connect to the head.
- click.echo(f"On worker node. Connecting to Ray cluster at {address}...")
- if not check_head_node_ready(address):
- raise click.ClickException("Timed out waiting for head node to start.")
- # Build the ray start command for worker nodes with all parameters
- ray_start_cmd = [
- "ray",
- "start",
- "--address",
- address,
- "--block",
- *ray_start_args,
- ]
- # This command will block until the Ray cluster is stopped.
- subprocess.run(ray_start_cmd, check=True)
- except subprocess.CalledProcessError as e:
- click.echo(f"Failed to start Ray: {e}", err=True)
- if e.stdout:
- click.echo(f"stdout:\n{e.stdout.decode()}", err=True)
- if e.stderr:
- click.echo(f"stderr:\n{e.stderr.decode()}", err=True)
- except KeyboardInterrupt:
- # This can be triggered by ctrl-c on the user's side.
- click.echo("Interrupted by user.", err=True)
- finally:
- # Stop Ray cluster.
- subprocess.run(["ray", "stop"])
- # Propagate the exit code of the user script.
- if result is not None and result.returncode != 0:
- click.echo(f"Command failed with return code {result.returncode}", err=True)
- sys.exit(result.returncode)
- if __name__ == "__main__":
- symmetric_run()
|