| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830 |
- import atexit
- import collections
- import datetime
- import errno
- import json
- import logging
- import os
- import random
- import signal
- import socket
- import subprocess
- import sys
- import tempfile
- import threading
- import time
- import traceback
- from typing import IO, AnyStr, Optional, Tuple
- import ray
- import ray._private.ray_constants as ray_constants
- import ray._private.services
- from ray._common.network_utils import (
- build_address,
- get_localhost_ip,
- is_ipv6,
- parse_address,
- )
- from ray._common.ray_constants import LOGGING_ROTATE_BACKUP_COUNT, LOGGING_ROTATE_BYTES
- from ray._common.utils import try_to_create_directory
- from ray._private.resource_and_label_spec import ResourceAndLabelSpec
- from ray._private.resource_isolation_config import ResourceIsolationConfig
- from ray._private.services import get_address, serialize_config
- from ray._private.utils import (
- is_in_test,
- open_log,
- try_to_symlink,
- validate_socket_filepath,
- )
- from ray._raylet import (
- GCS_SERVER_PORT_NAME,
- GcsClient,
- get_port_filename,
- get_session_key_from_storage,
- wait_for_persisted_port,
- )
- import psutil
# Module-level logger, named after this module via ``__name__``.
# Handlers and levels should be configured at the program's entry point,
# not here; Ray's own entry/init points configure logging by default
# (via logging.basicConfig).
logger = logging.getLogger(__name__)
- class Node:
- """An encapsulation of the Ray processes on a single node.
- This class is responsible for starting Ray processes and killing them,
- and it also controls the temp file policy.
- Attributes:
- all_processes: A mapping from process type (str) to a list of
- ProcessInfo objects. All lists have length one except for the Redis
- server list, which has multiple.
- """
def __init__(
    self,
    ray_params,
    head: bool = False,
    shutdown_at_exit: bool = True,
    spawn_reaper: bool = True,
    connect_only: bool = False,
    default_worker: bool = False,
    ray_init_cluster: bool = False,
):
    """Start a node.

    Args:
        ray_params: The RayParams to use to configure the node.
        head: True if this is the head node, which means it will
            start additional processes like the Redis servers, monitor
            processes, and web UI.
        shutdown_at_exit: If true, spawned processes will be cleaned
            up if this process exits normally.
        spawn_reaper: If true, spawns a process that will clean up
            other spawned processes if this process dies unexpectedly.
        connect_only: If true, connect to the node without starting
            new processes.
        default_worker: Whether it's running from a ray worker or not
        ray_init_cluster: Whether it's a cluster created by ray.init()

    Raises:
        ValueError: If both ``shutdown_at_exit`` and ``connect_only`` are
            true, or if system config is given to a non-head node that is
            starting its own processes.
        Exception: If the locally started node does not appear in GCS
            within the startup timeout.
    """
    if shutdown_at_exit:
        if connect_only:
            raise ValueError(
                "'shutdown_at_exit' and 'connect_only' cannot both be true."
            )
        self._register_shutdown_hooks()
    self._default_worker = default_worker
    self.head = head
    self.kernel_fate_share = bool(
        spawn_reaper and ray._private.utils.detect_fate_sharing_support()
    )
    self.resource_isolation_config: ResourceIsolationConfig = (
        ray_params.resource_isolation_config
    )
    self.all_processes: dict = {}
    self.removal_lock = threading.Lock()

    self.ray_init_cluster = ray_init_cluster
    if ray_init_cluster:
        assert head, "ray.init() created cluster only has the head node"

    # Set up external Redis when `RAY_REDIS_ADDRESS` is specified.
    redis_address_env = os.environ.get("RAY_REDIS_ADDRESS")
    if ray_params.external_addresses is None and redis_address_env is not None:
        external_redis = redis_address_env.split(",")

        # Reuse primary Redis as Redis shard when there's only one
        # instance provided.
        if len(external_redis) == 1:
            external_redis.append(external_redis[0])
        ray_params.external_addresses = external_redis
        ray_params.num_redis_shards = len(external_redis) - 1

    if (
        ray_params._system_config
        and len(ray_params._system_config) > 0
        and (not head and not connect_only)
    ):
        raise ValueError(
            "System config parameters can only be set on the head node."
        )

    ray_params.update_if_absent(
        include_log_monitor=True,
        resources={},
        worker_path=os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            "workers",
            "default_worker.py",
        ),
        setup_worker_path=os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            "workers",
            ray_constants.SETUP_WORKER_FILENAME,
        ),
    )

    self._resource_and_label_spec = None
    self._localhost = get_localhost_ip()
    self._ray_params = ray_params
    self._config = ray_params._system_config or {}

    # Configure log rotation parameters.
    self.max_bytes = int(os.getenv("RAY_ROTATION_MAX_BYTES", LOGGING_ROTATE_BYTES))
    self.backup_count = int(
        os.getenv("RAY_ROTATION_BACKUP_COUNT", LOGGING_ROTATE_BACKUP_COUNT)
    )

    assert self.max_bytes >= 0
    assert self.backup_count >= 0

    self._redis_address = ray_params.redis_address
    if head:
        ray_params.update_if_absent(num_redis_shards=1)
    self._gcs_address = ray_params.gcs_address
    self._gcs_client = None

    if not self.head:
        self.validate_ip_port(self.address)
        self._init_gcs_client()

    # Register the temp dir.
    self._session_name = ray_params.session_name
    if self._session_name is None:
        if head:
            # We expect this the first time we initialize a cluster, but not during
            # subsequent restarts of the head node.
            maybe_key = self.check_persisted_session_name()
            if maybe_key is None:
                # date including microsecond
                date_str = datetime.datetime.today().strftime(
                    "%Y-%m-%d_%H-%M-%S_%f"
                )
                self._session_name = f"session_{date_str}_{os.getpid()}"
            else:
                self._session_name = ray._common.utils.decode(maybe_key)
        else:
            assert not self._default_worker
            session_name = ray._private.utils.internal_kv_get_with_retry(
                self.get_gcs_client(),
                "session_name",
                ray_constants.KV_NAMESPACE_SESSION,
                num_retries=ray_constants.NUM_REDIS_GET_RETRIES,
            )
            self._session_name = ray._common.utils.decode(session_name)

    # Initialize webui url
    if head:
        self._webui_url = None
    else:
        if ray_params.webui is None:
            assert not self._default_worker
            self._webui_url = ray._private.services.get_webui_url_from_internal_kv()
        else:
            self._webui_url = build_address(
                ray_params.dashboard_host, ray_params.dashboard_port
            )

    # Resolve node to connect to
    node_to_connect_info = None
    if connect_only and not self._default_worker:
        node_to_connect_info = ray._private.services.get_node_to_connect_for_driver(
            self.get_gcs_client(),
            node_ip_address=ray_params.node_ip_address,
            node_name=ray_params.node_name,
            temp_dir=ray_params.temp_dir,
        )

    # Resolve node ID
    if connect_only:
        self._node_id = ray_params.node_id
        if self._node_id is None:
            # Default workers skip the lookup above, so without an explicit
            # node_id there is nothing to take the ID from.
            assert node_to_connect_info is not None
            self._node_id = node_to_connect_info.node_id.hex()
    else:
        if (
            self._ray_params.env_vars is not None
            and "RAY_OVERRIDE_NODE_ID_FOR_TESTING" in self._ray_params.env_vars
        ):
            node_id = self._ray_params.env_vars["RAY_OVERRIDE_NODE_ID_FOR_TESTING"]
            logger.debug(
                f"Setting node ID to {node_id} "
                "based on ray_params.env_vars override"
            )
            self._node_id = node_id
        elif os.environ.get("RAY_OVERRIDE_NODE_ID_FOR_TESTING"):
            node_id = os.environ["RAY_OVERRIDE_NODE_ID_FOR_TESTING"]
            logger.debug(f"Setting node ID to {node_id} based on env override")
            self._node_id = node_id
        else:
            node_id = ray.NodeID.from_random().hex()
            logger.debug(f"Setting node ID to {node_id}")
            self._node_id = node_id

    # Resolve node ip address
    node_ip_address = ray_params.node_ip_address
    if node_ip_address is None:
        if connect_only:
            assert node_to_connect_info is not None
            node_ip_address = getattr(
                node_to_connect_info, "node_manager_address", None
            )
        else:
            node_ip_address = ray.util.get_node_ip_address()

    assert node_ip_address is not None
    ray_params.update_if_absent(node_ip_address=node_ip_address)
    self._node_ip_address = node_ip_address

    # It creates a session_dir.
    self._init_temp()

    # Resolve socket and port names
    if connect_only:
        # Get socket names from the configuration.
        self._plasma_store_socket_name = ray_params.plasma_store_socket_name
        self._raylet_socket_name = ray_params.raylet_socket_name

        # If user does not provide the socket name, get it from GCS.
        if (
            self._plasma_store_socket_name is None
            or self._raylet_socket_name is None
            or self._ray_params.node_manager_port is None
        ):
            # Get the address info of the processes to connect to
            # from Redis or GCS.
            assert node_to_connect_info is not None
            self._plasma_store_socket_name = (
                node_to_connect_info.object_store_socket_name
            )
            self._raylet_socket_name = node_to_connect_info.raylet_socket_name
            self._ray_params.node_manager_port = (
                node_to_connect_info.node_manager_port
            )
    else:
        # If the user specified a socket name, use it.
        self._plasma_store_socket_name = self._prepare_socket_file(
            self._ray_params.plasma_store_socket_name, default_prefix="plasma_store"
        )
        self._raylet_socket_name = self._prepare_socket_file(
            self._ray_params.raylet_socket_name, default_prefix="raylet"
        )

    self._object_spilling_config = self._get_object_spilling_config()
    logger.debug(
        f"Starting node with object spilling config: {self._object_spilling_config}"
    )

    # Obtain the fallback directory from the object spilling config.
    # Currently, we set the fallback directory to be the same as the object spilling
    # path when the object spills to file system
    self._fallback_directory = None
    if self._object_spilling_config:
        config = json.loads(self._object_spilling_config)
        if config.get("type") == "filesystem":
            directory_path = config.get("params", {}).get("directory_path")
            if isinstance(directory_path, list):
                self._fallback_directory = directory_path[0]
            elif isinstance(directory_path, str):
                self._fallback_directory = directory_path

    # If it is a head node, try validating if external storage is configurable.
    if head:
        self.validate_external_storage()

    ray_params.update_if_absent(
        metrics_agent_port=ray_params.metrics_agent_port or 0,
        metrics_export_port=ray_params.metrics_export_port or 0,
        dashboard_agent_listen_port=ray_params.dashboard_agent_listen_port or 0,
        runtime_env_agent_port=ray_params.runtime_env_agent_port or 0,
    )

    # Pick a GCS server port.
    if head:
        # For GCS fault tolerance: if the port file already exists in the
        # current session directory, this indicates a GCS restart scenario.
        # We reuse the existing port so that other components can reconnect
        # to GCS after it restarts.
        gcs_port_filename = get_port_filename(self._node_id, GCS_SERVER_PORT_NAME)
        gcs_port_file = os.path.join(self._session_dir, gcs_port_filename)
        if os.path.exists(gcs_port_file):
            gcs_port = wait_for_persisted_port(
                self._session_dir,
                self._node_id,
                GCS_SERVER_PORT_NAME,
                timeout_ms=0,
            )
            ray_params.update_if_absent(gcs_server_port=gcs_port)
        else:
            gcs_server_port = os.getenv(ray_constants.GCS_PORT_ENVIRONMENT_VARIABLE)
            ray_params.update_if_absent(
                gcs_server_port=int(gcs_server_port) if gcs_server_port else 0
            )

    if not connect_only and spawn_reaper and not self.kernel_fate_share:
        self.start_reaper_process()
    if not connect_only:
        self._ray_params.update_pre_selected_port()

    # Start processes.
    if head:
        self.start_head_processes()

    node_info = None
    if not connect_only:
        self.start_ray_processes()
        # Wait for the node info to be available in the GCS so that
        # we know it's started up.

        # Grace period to let the Raylet register with the GCS.
        # We retry in a loop in case it takes longer than expected.
        time.sleep(0.1)
        start_time = time.monotonic()
        raylet_start_wait_time_s = 30
        # Pause between lookups so that a slow raylet registration does not
        # turn into a tight loop of GCS queries and INFO log lines.
        raylet_start_poll_interval_s = 0.1
        while True:
            try:
                # Will raise a RuntimeError if the node info is not available.
                node_info = ray._private.services.get_node(
                    self.gcs_address,
                    self._node_id,
                )
                break
            except RuntimeError as e:
                logger.info(f"Failed to get node info {e}")
                if time.monotonic() - start_time > raylet_start_wait_time_s:
                    raise Exception(
                        "The current node timed out during startup. This "
                        "could happen because some of the raylet failed to "
                        "startup or the GCS has become overloaded."
                    )
                time.sleep(raylet_start_poll_interval_s)

    if connect_only:
        # Fetch node info to get labels.
        node_info = ray._private.services.get_node(
            self.gcs_address,
            self._node_id,
        )

    # Set node labels from GCS if provided at node init.
    self._node_labels = node_info.get("labels", {})

    # port can be 0 or None for two cases:
    # 1. user is starting a new ray cluster and does not specify the port, components self-bind.
    # 2. user is connecting to an existing ray cluster, no port info is provided.
    # We always update port info from GCS to ensure consistency.
    self._ray_params.node_manager_port = node_info["node_manager_port"]
    self._ray_params.runtime_env_agent_port = node_info["runtime_env_agent_port"]
    self._ray_params.metrics_agent_port = node_info["metrics_agent_port"]
    self._ray_params.metrics_export_port = node_info["metrics_export_port"]
    self._ray_params.dashboard_agent_listen_port = node_info[
        "dashboard_agent_listen_port"
    ]

    # Makes sure the Node object has valid addresses after setup.
    self.validate_ip_port(self.address)
    self.validate_ip_port(self.gcs_address)

    if not connect_only:
        self._record_stats()
def check_persisted_session_name(self):
    """Look up a session name persisted in external Redis storage.

    Called by the head node when no session name was supplied. As a side
    effect, the first external address is recorded as this node's Redis
    address.

    Returns:
        None when no external addresses are configured. Otherwise, the value
        returned by ``get_session_key_from_storage`` for the
        ``b"session_name"`` key (the caller decodes it, and treats None as
        "no persisted session").

    Raises:
        ValueError: If the configured Redis port is negative.
    """
    if self._ray_params.external_addresses is None:
        return None
    self._redis_address = self._ray_params.external_addresses[0]
    redis_ip_address, redis_port, enable_redis_ssl = get_address(
        self._redis_address,
    )
    # Address is ip:port or redis://ip:port
    port = int(redis_port)
    if port < 0:
        raise ValueError(
            f"Invalid Redis port provided: {redis_port}. "
            "The port must be a non-negative integer."
        )
    return get_session_key_from_storage(
        redis_ip_address,
        port,
        self._ray_params.redis_username,
        self._ray_params.redis_password,
        enable_redis_ssl,
        serialize_config(self._config),
        b"session_name",
    )
@staticmethod
def validate_ip_port(ip_port):
    """Ensure ``ip_port`` is an address carrying an integer-parseable port.

    Raises:
        ValueError: If the address has no port, or the port is not an integer.
    """
    parsed = parse_address(ip_port)
    if parsed is None:
        raise ValueError(f"Port is not specified for address {ip_port}")
    port_text = parsed[1]
    try:
        int(port_text)
    except ValueError:
        raise ValueError(
            f"Unable to parse port number from {port_text} (full address = {ip_port})"
        )
def check_version_info(self):
    """Verify this process's Python and Ray versions match those in GCS.

    Detects workers or drivers launched with a different Python or Ray
    version. Cluster metadata is read from GCS; if the first read returns
    None it is attempted once more. When no metadata is available, the
    check is skipped.

    Raises:
        Exception: An exception is raised if there is a version mismatch.
    """
    import ray._common.usage.usage_lib as ray_usage_lib

    metadata = None
    for _attempt in range(2):
        metadata = ray_usage_lib.get_cluster_metadata(self.get_gcs_client())
        if metadata is not None:
            break

    if not metadata:
        return

    local_ip = ray._private.services.get_node_ip_address()
    ray._private.utils.check_version_info(metadata, f"node {local_ip}")
def _register_shutdown_hooks(self):
    """Install exit-time and SIGTERM hooks that tear down spawned processes."""

    def _teardown():
        self.kill_all_processes(check_alive=False, allow_graceful=True)

    # Normal interpreter exit: only clean up. sys.exit must not be called
    # here because the interpreter is already exiting.
    def atexit_handler(*args):
        _teardown()

    # SIGTERM: clean up child processes, then exit with error code 1.
    def sigterm_handler(signum, frame):
        _teardown()
        sys.exit(1)

    atexit.register(atexit_handler)
    ray._private.utils.set_sigterm_handler(sigterm_handler)
def _init_temp(self):
    """Resolve the temp and session directories and create the session layout.

    On the head node, the temp dir comes from RayParams: Ray's default temp
    dir is filled in via ``update_if_absent`` if it was not set. The session
    dir is ``<temp_dir>/<session_name>``.

    On other nodes, each value comes from RayParams / the session name when
    available. Otherwise it is read from the GCS internal KV store
    (session namespace).

    Then the method:
      - creates the session dir and points the "latest session" symlink at it;
      - creates the sockets, logs, logs/old and runtime-env dirs;
      - links the libtpu ``tpu_logs`` directory into the logs dir, if present.
    """
    # Per-(suffix, prefix, directory) counters consumed by `_make_inc_temp`.
    self._incremental_dict = collections.defaultdict(lambda: 0)

    def _read_session_kv(key):
        # KV fallbacks are not expected for default workers.
        assert not self._default_worker
        raw_value = ray._private.utils.internal_kv_get_with_retry(
            self.get_gcs_client(),
            key,
            ray_constants.KV_NAMESPACE_SESSION,
            num_retries=ray_constants.NUM_REDIS_GET_RETRIES,
        )
        return ray._common.utils.decode(raw_value)

    if self.head:
        self._ray_params.update_if_absent(
            temp_dir=ray._common.utils.get_ray_temp_dir()
        )
        self._temp_dir = self._ray_params.temp_dir
    elif self._ray_params.temp_dir is None:
        self._temp_dir = _read_session_kv("temp_dir")
    else:
        self._temp_dir = self._ray_params.temp_dir

    try_to_create_directory(self._temp_dir)

    session_dir_from_kv = not self.head and (
        self._temp_dir is None or self._session_name is None
    )
    if session_dir_from_kv:
        self._session_dir = _read_session_kv("session_dir")
    else:
        self._session_dir = os.path.join(self._temp_dir, self._session_name)

    latest_session_link = os.path.join(self._temp_dir, ray_constants.SESSION_LATEST)
    try_to_create_directory(self._session_dir)
    try_to_symlink(latest_session_link, self._session_dir)

    # Directory for socket files.
    self._sockets_dir = os.path.join(self._session_dir, "sockets")
    try_to_create_directory(self._sockets_dir)

    # Directory for process log files, plus an "old" subdirectory.
    self._logs_dir = os.path.join(self._session_dir, "logs")
    try_to_create_directory(self._logs_dir)
    try_to_create_directory(os.path.join(self._logs_dir, "old"))

    # Directory for runtime environments.
    self._runtime_env_dir = os.path.join(
        self._session_dir, self._ray_params.runtime_env_dir_name
    )
    try_to_create_directory(self._runtime_env_dir)

    # Surface libtpu's log directory (when it exists) inside the logs dir.
    tpu_log_dir = f"{ray._common.utils.get_user_temp_dir()}/tpu_logs"
    if os.path.isdir(tpu_log_dir):
        try_to_symlink(os.path.join(self._logs_dir, "tpu_logs"), tpu_log_dir)
def get_resource_and_label_spec(self):
    """Return this node's resolved ResourceAndLabelSpec, building it lazily.

    On first use, the spec is constructed from the RayParams resource and
    label settings and resolved for this node. The result is cached for
    subsequent calls.
    """
    if self._resource_and_label_spec:
        return self._resource_and_label_spec
    params = self._ray_params
    unresolved_spec = ResourceAndLabelSpec(
        params.num_cpus,
        params.num_gpus,
        params.memory,
        params.available_memory_bytes,
        params.object_store_memory,
        params.resources,
        params.labels,
    )
    self._resource_and_label_spec = unresolved_spec.resolve(
        is_head=self.head, node_ip_address=self.node_ip_address
    )
    return self._resource_and_label_spec
@property
def node_id(self):
    """The ID assigned to this node during initialization."""
    return self._node_id

@property
def session_name(self):
    """Name of the Ray session this node belongs to."""
    return self._session_name

@property
def node_ip_address(self):
    """IP address resolved for this node."""
    return self._node_ip_address

@property
def address(self):
    """Bootstrap address of the cluster, in ip:port form.

    This is the address to pass to `ray start` or `ray.init()` when starting
    worker nodes. Unlike `gcs_address`, no check is made that it is set.
    """
    return self._gcs_address

@property
def gcs_address(self):
    """Address of the GCS server; asserts that it has been set."""
    current = self._gcs_address
    assert current is not None, "Gcs address is not set"
    return current
@property
def redis_address(self):
    """The cluster's Redis address (may be None)."""
    return self._redis_address

@property
def redis_username(self):
    """Redis username taken from the node's RayParams."""
    return self._ray_params.redis_username

@property
def redis_password(self):
    """Redis password taken from the node's RayParams."""
    return self._ray_params.redis_password

@property
def plasma_store_socket_name(self):
    """Socket path of this node's plasma (object) store."""
    return self._plasma_store_socket_name

@property
def unique_id(self):
    """Identifier for this node: ``<node ip>:<plasma store socket>``."""
    ip = self.node_ip_address
    store_socket = self._plasma_store_socket_name
    return f"{ip}:{store_socket}"

@property
def webui_url(self):
    """URL of the cluster's web UI (initialized to None on head nodes)."""
    return self._webui_url

@property
def raylet_socket_name(self):
    """Socket path of this node's raylet."""
    return self._raylet_socket_name

@property
def node_manager_port(self):
    """Node manager port, as stored in the node's RayParams."""
    return self._ray_params.node_manager_port
@property
def metrics_export_port(self):
    """Port on which metrics are exposed."""
    return self._ray_params.metrics_export_port

@property
def metrics_agent_port(self):
    """gRPC port of the metrics agent."""
    return self._ray_params.metrics_agent_port

@property
def runtime_env_agent_port(self):
    """HTTP port of the runtime env agent."""
    return self._ray_params.runtime_env_agent_port

@property
def runtime_env_agent_address(self):
    """HTTP URL (``http://host:port``) of the runtime env agent."""
    host_and_port = build_address(
        self._node_ip_address, self._ray_params.runtime_env_agent_port
    )
    return f"http://{host_and_port}"

@property
def dashboard_agent_listen_port(self):
    """Port the dashboard agent listens on."""
    return self._ray_params.dashboard_agent_listen_port
@property
def logging_config(self):
    """Log-rotation settings for this node (max bytes and backup count)."""
    return dict(
        log_rotation_max_bytes=self.max_bytes,
        log_rotation_backup_count=self.backup_count,
    )

@property
def address_info(self):
    """Dictionary summarizing this node's addresses, sockets and ports."""
    params = self._ray_params
    return dict(
        node_ip_address=self._node_ip_address,
        redis_address=self.redis_address,
        object_store_address=self._plasma_store_socket_name,
        raylet_socket_name=self._raylet_socket_name,
        webui_url=self._webui_url,
        session_dir=self._session_dir,
        metrics_export_port=params.metrics_export_port,
        gcs_address=self.gcs_address,
        address=self.address,
        dashboard_agent_listen_port=params.dashboard_agent_listen_port,
    )
@property
def node_labels(self):
    """Labels for this node, as reported by GCS during initialization."""
    return self._node_labels

def is_head(self):
    """Return whether this node is the head node."""
    return self.head

def get_gcs_client(self):
    """Return the GCS client, initializing it on first access."""
    if self._gcs_client is not None:
        return self._gcs_client
    self._init_gcs_client()
    return self._gcs_client
def _init_gcs_client(self):
    """Create ``self._gcs_client`` and register it for internal KV access.

    Tries up to ``ray_constants.NUM_REDIS_GET_RETRIES`` times, sleeping one
    second between attempts. On the head node it also sends a probe request
    to confirm GCS is serving. It stops retrying early if the locally
    started GCS process has already exited.

    Raises:
        RuntimeError: If no client could be created. The message carries the
            last connection error and, when the GCS ``.err`` log is readable,
            its most recent error lines.
    """
    if self.head:
        gcs_process = self.all_processes[ray_constants.PROCESS_TYPE_GCS_SERVER][
            0
        ].process
    else:
        gcs_process = None

    # TODO(ryw) instead of create a new GcsClient, wrap the one from
    # CoreWorkerProcess to save a grpc channel.
    # Initialized once, before the loop, so the final error message always
    # reports the most recent failure (it used to be reset every iteration
    # and lost when breaking out because GCS had exited).
    last_ex = None
    num_retries = ray_constants.NUM_REDIS_GET_RETRIES
    for attempt in range(num_retries):
        try:
            client = GcsClient(
                address=self.gcs_address,
                cluster_id=self._ray_params.cluster_id,  # Hex string
            )
            self.cluster_id = client.cluster_id
            if self.head:
                # Send a simple request to make sure GCS is alive
                # if it's a head node.
                client.internal_kv_get(b"dummy", None)
            self._gcs_client = client
            break
        except Exception:
            # Record the failure first so it is reported even if we stop early.
            last_ex = traceback.format_exc()
            logger.debug(f"Connecting to GCS: {last_ex}")
            if gcs_process is not None and gcs_process.poll() is not None:
                # GCS has exited; further retries cannot succeed.
                break
            if attempt + 1 < num_retries:
                time.sleep(1)

    if self._gcs_client is None:
        action = "start" if self.head else "connect to"
        if hasattr(self, "_logs_dir"):
            errors = None
            try:
                with open(
                    os.path.join(self._logs_dir, "gcs_server.err"),
                    encoding="utf-8",
                    errors="replace",
                ) as err:
                    # Use " C " or " E " to exclude the stacktrace.
                    # This should work for most cases, especially
                    # it's when GCS is starting. Only display last 10 lines of logs.
                    errors = [line for line in err if " C " in line or " E " in line][
                        -10:
                    ]
            except OSError:
                # The .err file may be absent (e.g. when output is not
                # redirected to files). Fall back to the generic message
                # rather than masking the GCS failure with a file error.
                errors = None
            if errors is not None:
                error_msg = "\n" + "".join(errors) + "\n"
                raise RuntimeError(
                    f"Failed to {action} GCS. "
                    f" Last {len(errors)} lines of error files:"
                    f"{error_msg}."
                    f"Please check {os.path.join(self._logs_dir, 'gcs_server.out')}"
                    f" for details. Last connection error: {last_ex}"
                )
        raise RuntimeError(
            f"Failed to {action} GCS. Last "
            f"connection error: {last_ex}"
        )

    ray.experimental.internal_kv._initialize_internal_kv(self._gcs_client)
def get_temp_dir_path(self):
    """Return the Ray temp directory used by this node."""
    return self._temp_dir

def get_runtime_env_dir_path(self):
    """Return the directory holding runtime environments for this session."""
    return self._runtime_env_dir

def get_session_dir_path(self):
    """Return this node's session directory."""
    return self._session_dir

def get_logs_dir_path(self):
    """Return the directory that process log files are written to."""
    return self._logs_dir

def get_sockets_dir_path(self):
    """Return the directory that socket files are placed in."""
    return self._sockets_dir
- def _make_inc_temp(
- self, suffix: str = "", prefix: str = "", directory_name: Optional[str] = None
- ):
- """Return an incremental temporary file name. The file is not created.
- Args:
- suffix: The suffix of the temp file.
- prefix: The prefix of the temp file.
- directory_name (str) : The base directory of the temp file.
- Returns:
- A string of file name. If there existing a file having
- the same name, the returned name will look like
- "{directory_name}/{prefix}.{unique_index}{suffix}"
- """
- if directory_name is None:
- directory_name = ray._common.utils.get_ray_temp_dir()
- directory_name = os.path.expanduser(directory_name)
- index = self._incremental_dict[suffix, prefix, directory_name]
- # `tempfile.TMP_MAX` could be extremely large,
- # so using `range` in Python2.x should be avoided.
- while index < tempfile.TMP_MAX:
- if index == 0:
- filename = os.path.join(directory_name, prefix + suffix)
- else:
- filename = os.path.join(
- directory_name, prefix + "." + str(index) + suffix
- )
- index += 1
- if not os.path.exists(filename):
- # Save the index.
- self._incremental_dict[suffix, prefix, directory_name] = index
- return filename
- raise FileExistsError(errno.EEXIST, "No usable temporary filename found")
def should_redirect_logs(self):
    """Decide whether process output should be redirected into log files.

    Precedence, highest first:
      1. ``RayParams.log_to_stderr`` (preferred): redirect iff it is false.
         Threading it explicitly avoids relying on process-global env vars.
      2. ``RayParams.redirect_output`` (deprecated, kept for compatibility).
      3. The stderr-redirect environment variable: redirect unless it is "1".
    """
    log_to_stderr = getattr(self._ray_params, "log_to_stderr", None)
    if log_to_stderr is not None:
        return not log_to_stderr
    legacy_redirect = self._ray_params.redirect_output
    if legacy_redirect is not None:
        return legacy_redirect
    env_flag = os.environ.get(
        ray_constants.LOGGING_REDIRECT_STDERR_ENVIRONMENT_VARIABLE
    )
    return env_flag != "1"
# TODO(hjiang): Re-implement the logic in C++, and expose via cython.
def get_log_file_names(
    self,
    name: str,
    unique: bool = False,
    create_out: bool = True,
    create_err: bool = True,
) -> Tuple[Optional[str], Optional[str]]:
    """Get the file names to use for a process's stdout and stderr logs.

    Only paths are computed; no files are opened or created here.

    Args:
        name: Descriptive string for this log file.
        unique: If True, a counter is attached to ``name`` so that the
            returned file name is not already used.
        create_out: If True, compute a ``.out`` file name for stdout.
        create_err: If True, compute a ``.err`` file name for stderr.

    Returns:
        A tuple ``(stdout_path, stderr_path)`` of file names (not file
        handles). An entry is None when its ``create_*`` flag is False.
        Both entries are None when output redirection is disabled.
    """
    if not self.should_redirect_logs():
        return None, None
    log_stdout = (
        self._get_log_file_name(name, "out", unique=unique) if create_out else None
    )
    log_stderr = (
        self._get_log_file_name(name, "err", unique=unique) if create_err else None
    )
    return log_stdout, log_stderr
def get_log_file_handles(
    self,
    name: str,
    unique: bool = False,
    create_out: bool = True,
    create_err: bool = True,
) -> Tuple[Optional[IO[AnyStr]], Optional[IO[AnyStr]]]:
    """Open the stdout/stderr log files for ``name`` and return their handles.

    File names come from ``get_log_file_names``, and each non-None name is
    opened with ``open_log``. If output redirection has been disabled, no
    files are opened and ``(None, None)`` is returned.

    Args:
        name: Descriptive string for this log file.
        unique: If True, a counter is attached to ``name`` so that the
            file name is not already used.
        create_out: If True, open a ``.out`` file.
        create_err: If True, open a ``.err`` file.

    Returns:
        A tuple ``(stdout_handle, stderr_handle)``. Each entry is None when
        the corresponding file was not requested or redirection is off.
    """
    stdout_path, stderr_path = self.get_log_file_names(
        name, unique=unique, create_out=create_out, create_err=create_err
    )
    handles = tuple(
        None if path is None else open_log(path)
        for path in (stdout_path, stderr_path)
    )
    return handles
- def _get_log_file_name(
- self,
- name: str,
- suffix: str,
- unique: bool = False,
- ) -> str:
- """Generate partially randomized filenames for log files.
- Args:
- name: descriptive string for this log file.
- suffix: suffix of the file. Usually it is .out of .err.
- unique: if true, a counter will be attached to `name` to
- ensure the returned filename is not already used.
- Returns:
- A tuple of two file names for redirecting (stdout, stderr).
- """
- # strip if the suffix is something like .out.
- suffix = suffix.strip(".")
- if unique:
- filename = self._make_inc_temp(
- suffix=f".{suffix}", prefix=name, directory_name=self._logs_dir
- )
- else:
- filename = os.path.join(self._logs_dir, f"{name}.{suffix}")
- return filename
def _get_unused_port(self, allocated_ports=None):
    """Pick a TCP port that is currently free on this host.

    Steps:
      1. Bind a placeholder socket to port 0 so the OS assigns a free port
         ``port``.
      2. While the placeholder stays bound, probe up to
         ``ray_constants.NUM_PORT_RETRIES`` random candidates in
         ``[port, 65535]`` by binding them.
      3. Return the first candidate that binds and is not in
         ``allocated_ports``.
      4. If no candidate works, log an error and return ``port``.

    Choosing a port well above the OS's "next available" one works around
    issue #8254, where gRPC failed because the returned port had already
    been taken by a different process.

    Args:
        allocated_ports: Ports already reserved for other uses. The random
            search never returns them, even if they are free right now.

    Returns:
        A port number. All probing sockets are closed before returning,
        so the port is not held reserved afterwards.
    """
    if allocated_ports is None:
        allocated_ports = set()
    # Match the node IP's address family so IPv6-only hosts work.
    family = socket.AF_INET6 if is_ipv6(self._node_ip_address) else socket.AF_INET
    # `with` guarantees each socket is closed on every path, including a
    # bind() failure that is not the OSError handled below.
    with socket.socket(family, socket.SOCK_STREAM) as placeholder:
        placeholder.bind(("", 0))
        port = placeholder.getsockname()[1]
        for _ in range(ray_constants.NUM_PORT_RETRIES):
            new_port = random.randint(port, 65535)
            if new_port in allocated_ports:
                # This port is allocated for other usage already, so we
                # shouldn't use it even if it's not in use right now.
                continue
            with socket.socket(family, socket.SOCK_STREAM) as probe:
                try:
                    probe.bind(("", new_port))
                except OSError:
                    continue
            return new_port
        logger.error("Unable to succeed in selecting a random port.")
    return port
def _prepare_socket_file(self, socket_path: str, default_prefix: str):
    """Resolve and prepare the socket address for the raylet and plasma store.

    Behaviour by case:
      - ``socket_path`` given, not on Windows: the parent directory of
        ``socket_path`` is created if needed. An already existing socket
        file is left alone; that just means we aren't the first worker
        on the node.
      - ``socket_path`` given, on Windows: it is used as-is.
      - ``socket_path`` is None, on Windows: a ``tcp://`` address on
        ``self._localhost`` with an unused port is generated.
      - ``socket_path`` is None, elsewhere: a fresh incremental file name
        under the sockets directory is generated from ``default_prefix``.

    In every case, the result (with any ``scheme://`` prefix removed) is
    passed to ``validate_socket_filepath``.

    Args:
        socket_path: The requested socket path, or None to generate one.
        default_prefix: File-name prefix used when a path is generated.

    Returns:
        The socket path or address to use.
    """
    on_windows = sys.platform == "win32"
    if socket_path is not None:
        result = socket_path
        if not on_windows:
            try_to_create_directory(os.path.dirname(socket_path))
    elif on_windows:
        result = f"tcp://{build_address(self._localhost, self._get_unused_port())}"
    else:
        result = self._make_inc_temp(
            prefix=default_prefix, directory_name=self._sockets_dir
        )
    validate_socket_filepath(result.split("://", 1)[-1])
    return result
def start_reaper_process(self):
    """Start the reaper process and track it in ``self.all_processes``.

    This must be the first process spawned. It should only be called when
    Ray processes should be cleaned up if this process dies, and it cannot
    be combined with kernel fate-sharing (asserted).
    """
    assert (
        not self.kernel_fate_share
    ), "a reaper should not be used with kernel fate-sharing"
    reaper_info = ray._private.services.start_reaper(fate_share=False)
    assert ray_constants.PROCESS_TYPE_REAPER not in self.all_processes
    if reaper_info is None:
        # start_reaper may return None; nothing is tracked in that case.
        return
    self.all_processes[ray_constants.PROCESS_TYPE_REAPER] = [reaper_info]
def start_log_monitor(self):
    """Start the log monitor process and track it under PROCESS_TYPE_LOG_MONITOR.

    Its own stdout/stderr go to unique ``log_monitor`` .out/.err files in
    the logs directory.
    """
    log_paths = self.get_log_file_names(
        "log_monitor", unique=True, create_out=True, create_err=True
    )
    stdout_path, stderr_path = log_paths
    monitor_info = ray._private.services.start_log_monitor(
        self.get_session_dir_path(),
        self._logs_dir,
        self.gcs_address,
        self._node_ip_address,
        fate_share=self.kernel_fate_share,
        max_bytes=self.max_bytes,
        backup_count=self.backup_count,
        stdout_filepath=stdout_path,
        stderr_filepath=stderr_path,
    )
    assert ray_constants.PROCESS_TYPE_LOG_MONITOR not in self.all_processes
    self.all_processes[ray_constants.PROCESS_TYPE_LOG_MONITOR] = [monitor_info]
def start_api_server(
    self, *, include_dashboard: Optional[bool], raise_on_failure: bool
):
    """Start the dashboard API server process.

    ``self._webui_url`` is always set to the URL returned by
    ``services.start_api_server``. When a process was actually started, it
    is tracked under PROCESS_TYPE_DASHBOARD, and the URL is published to
    the GCS internal KV as ``b"webui:url"`` in the dashboard namespace.

    Args:
        include_dashboard: If true, load all dashboard-related modules when
            starting the API server. Otherwise only start the modules that
            are not relevant to the dashboard.
        raise_on_failure: If true, raise an exception if the API server
            fails to start. Otherwise print a warning on failure.
    """
    stdout_path, stderr_path = self.get_log_file_names(
        "dashboard", unique=True, create_out=True, create_err=True
    )
    self._webui_url, dashboard_info = ray._private.services.start_api_server(
        include_dashboard,
        raise_on_failure,
        self._ray_params.dashboard_host,
        self.gcs_address,
        self.cluster_id.hex(),
        self._node_ip_address,
        self._temp_dir,
        self._logs_dir,
        self._session_dir,
        port=self._ray_params.dashboard_port,
        fate_share=self.kernel_fate_share,
        max_bytes=self.max_bytes,
        backup_count=self.backup_count,
        stdout_filepath=stdout_path,
        stderr_filepath=stderr_path,
    )
    assert ray_constants.PROCESS_TYPE_DASHBOARD not in self.all_processes
    if dashboard_info is None:
        # Nothing was started, so there is no process or URL to record.
        return
    self.all_processes[ray_constants.PROCESS_TYPE_DASHBOARD] = [dashboard_info]
    self.get_gcs_client().internal_kv_put(
        b"webui:url",
        self._webui_url.encode(),
        True,
        ray_constants.KV_NAMESPACE_DASHBOARD,
    )
def start_gcs_server(self):
    """Start the GCS server process and record its address on this node.

    Preconditions (asserted):
      - ``gcs_server_port`` is non-negative.
      - No GCS address or client is associated with this node yet.

    Side effects:
      - Tracks the process under PROCESS_TYPE_GCS_SERVER.
      - Replaces a requested port of 0 in ``self._ray_params`` with the
        port the server persisted.
      - Sets ``self._gcs_address``.
    """
    params = self._ray_params
    assert params.gcs_server_port >= 0
    assert self._gcs_address is None, "GCS server is already running."
    assert self._gcs_client is None, "GCS client is already connected."
    stdout_path, stderr_path = self.get_log_file_names(
        "gcs_server", unique=True, create_out=True, create_err=True
    )
    gcs_info = ray._private.services.start_gcs_server(
        self.redis_address,
        log_dir=self._logs_dir,
        stdout_filepath=stdout_path,
        stderr_filepath=stderr_path,
        session_name=self.session_name,
        redis_username=params.redis_username,
        redis_password=params.redis_password,
        config=self._config,
        fate_share=self.kernel_fate_share,
        gcs_server_port=params.gcs_server_port,
        metrics_agent_port=params.metrics_agent_port,
        node_ip_address=self._node_ip_address,
        session_dir=self._session_dir,
        node_id=self._node_id,
    )
    assert ray_constants.PROCESS_TYPE_GCS_SERVER not in self.all_processes
    self.all_processes[ray_constants.PROCESS_TYPE_GCS_SERVER] = [gcs_info]
    if params.gcs_server_port == 0:
        # Port 0 was requested: read back the port the server persisted
        # under GCS_SERVER_PORT_NAME and remember it.
        params.gcs_server_port = wait_for_persisted_port(
            self._session_dir,
            self._node_id,
            GCS_SERVER_PORT_NAME,
        )
    # Connecting via non-localhost address may be blocked by firewall rule,
    # e.g. https://github.com/ray-project/ray/issues/15780
    # TODO(mwtian): figure out a way to use 127.0.0.1 for local connection
    # when possible.
    self._gcs_address = build_address(self._node_ip_address, params.gcs_server_port)
def start_raylet(
    self,
    plasma_directory: str,
    fallback_directory: str,
    object_store_memory: int,
    use_valgrind: bool = False,
    use_profiler: bool = False,
):
    """Start the raylet and track it under PROCESS_TYPE_RAYLET.

    Unique stdout/stderr log file names are computed for the raylet, the
    dashboard agent and the runtime env agent, and all six are handed to
    ``ray._private.services.start_raylet``. Before the launch, the PIDs
    returned by ``_get_system_processes_for_resource_isolation`` are added
    to ``self.resource_isolation_config``.

    Args:
        plasma_directory: Passed through to ``services.start_raylet``.
            ``start_ray_processes`` obtains it from
            ``determine_plasma_store_config``.
        fallback_directory: Passed through to ``services.start_raylet``,
            obtained the same way.
        object_store_memory: Passed through to ``services.start_raylet``,
            obtained the same way. NOTE(review): presumably a size in
            bytes; confirm.
        use_valgrind: True if we should start the process in
            valgrind.
        use_profiler: True if we should start the process in the
            valgrind profiler.
    """
    raylet_stdout_filepath, raylet_stderr_filepath = self.get_log_file_names(
        ray_constants.PROCESS_TYPE_RAYLET,
        unique=True,
        create_out=True,
        create_err=True,
    )
    # Log paths for the dashboard agent and the runtime env agent are also
    # computed here and passed to services.start_raylet below.
    (
        dashboard_agent_stdout_filepath,
        dashboard_agent_stderr_filepath,
    ) = self.get_log_file_names(
        ray_constants.PROCESS_TYPE_DASHBOARD_AGENT,
        unique=True,
        create_out=True,
        create_err=True,
    )
    (
        runtime_env_agent_stdout_filepath,
        runtime_env_agent_stderr_filepath,
    ) = self.get_log_file_names(
        ray_constants.PROCESS_TYPE_RUNTIME_ENV_AGENT,
        unique=True,
        create_out=True,
        create_err=True,
    )
    # Register the already-started system processes with the resource
    # isolation config before the raylet is launched.
    self.resource_isolation_config.add_system_pids(
        self._get_system_processes_for_resource_isolation()
    )
    # NOTE: the positional arguments below are order-sensitive.
    process_info = ray._private.services.start_raylet(
        self.redis_address,
        self.gcs_address,
        self._node_id,
        self._node_ip_address,
        self._ray_params.node_manager_port,
        self._raylet_socket_name,
        self._plasma_store_socket_name,
        self.cluster_id.hex(),
        self._ray_params.worker_path,
        self._ray_params.setup_worker_path,
        self._temp_dir,
        self._session_dir,
        self._runtime_env_dir,
        self._logs_dir,
        self.get_resource_and_label_spec(),
        plasma_directory,
        fallback_directory,
        object_store_memory,
        self.session_name,
        is_head_node=self.is_head(),
        min_worker_port=self._ray_params.min_worker_port,
        max_worker_port=self._ray_params.max_worker_port,
        worker_port_list=self._ray_params.worker_port_list,
        object_manager_port=self._ray_params.object_manager_port,
        redis_username=self._ray_params.redis_username,
        redis_password=self._ray_params.redis_password,
        metrics_agent_port=self._ray_params.metrics_agent_port,
        runtime_env_agent_port=self._ray_params.runtime_env_agent_port,
        metrics_export_port=self._ray_params.metrics_export_port,
        dashboard_agent_listen_port=self._ray_params.dashboard_agent_listen_port,
        use_valgrind=use_valgrind,
        use_profiler=use_profiler,
        raylet_stdout_filepath=raylet_stdout_filepath,
        raylet_stderr_filepath=raylet_stderr_filepath,
        dashboard_agent_stdout_filepath=dashboard_agent_stdout_filepath,
        dashboard_agent_stderr_filepath=dashboard_agent_stderr_filepath,
        runtime_env_agent_stdout_filepath=runtime_env_agent_stdout_filepath,
        runtime_env_agent_stderr_filepath=runtime_env_agent_stderr_filepath,
        huge_pages=self._ray_params.huge_pages,
        fate_share=self.kernel_fate_share,
        socket_to_use=None,
        max_bytes=self.max_bytes,
        backup_count=self.backup_count,
        ray_debugger_external=self._ray_params.ray_debugger_external,
        env_updates=self._ray_params.env_vars,
        node_name=self._ray_params.node_name,
        webui=self._webui_url,
        resource_isolation_config=self.resource_isolation_config,
    )
    assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes
    self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info]
def start_monitor(self):
    """Start the autoscaler monitor process and track it under PROCESS_TYPE_MONITOR.

    Autoscaling output goes to the ``monitor`` .out/.err files. Changing
    those files may break existing cluster launching commands.
    """
    from ray.autoscaler.v2.utils import is_autoscaler_v2

    stdout_path, stderr_path = self.get_log_file_names(
        "monitor", unique=True, create_out=True, create_err=True
    )
    monitor_info = ray._private.services.start_monitor(
        self.gcs_address,
        self._logs_dir,
        stdout_filepath=stdout_path,
        stderr_filepath=stderr_path,
        autoscaling_config=self._ray_params.autoscaling_config,
        fate_share=self.kernel_fate_share,
        max_bytes=self.max_bytes,
        backup_count=self.backup_count,
        monitor_ip=self._node_ip_address,
        # The autoscaler version is looked up with fetch_from_server=True.
        autoscaler_v2=is_autoscaler_v2(fetch_from_server=True),
    )
    assert ray_constants.PROCESS_TYPE_MONITOR not in self.all_processes
    self.all_processes[ray_constants.PROCESS_TYPE_MONITOR] = [monitor_info]
def start_ray_client_server(self):
    """Start the Ray client server and track it under PROCESS_TYPE_RAY_CLIENT_SERVER.

    Unlike the other launchers in this class, this one passes opened log
    file handles (from ``get_log_file_handles``) rather than file paths.
    """
    stdout_handle, stderr_handle = self.get_log_file_handles(
        "ray_client_server", unique=True
    )
    params = self._ray_params
    client_server_info = ray._private.services.start_ray_client_server(
        self.address,
        self._node_ip_address,
        params.ray_client_server_port,
        stdout_file=stdout_handle,
        stderr_file=stderr_handle,
        redis_username=params.redis_username,
        redis_password=params.redis_password,
        fate_share=self.kernel_fate_share,
        runtime_env_agent_address=self.runtime_env_agent_address,
        node_id=self._node_id,
    )
    assert ray_constants.PROCESS_TYPE_RAY_CLIENT_SERVER not in self.all_processes
    self.all_processes[ray_constants.PROCESS_TYPE_RAY_CLIENT_SERVER] = [
        client_server_info
    ]
def _write_cluster_info_to_kv(self):
    """Write cluster and session metadata to the GCS internal KV store.

    Cluster metadata is always recorded, but it is not reported unless
    usage reporting is enabled (see ``usage_stats_head.py``).

    The method also:
      - stores the session name without overwriting it, and asserts that
        any value already stored matches;
      - stores the session and temp directories;
      - stores the tracing startup hook, when one is configured.
    """
    import ray._common.usage.usage_lib as ray_usage_lib

    ray_usage_lib.put_cluster_metadata(
        self.get_gcs_client(), ray_init_cluster=self.ray_init_cluster
    )
    session_ns = ray_constants.KV_NAMESPACE_SESSION
    # A non-overwriting write of the session name; this is also the first
    # real round-trip that shows the GCS is up.
    added = self.get_gcs_client().internal_kv_put(
        b"session_name", self._session_name.encode(), False, session_ns
    )
    if not added:
        # A value was already stored; it must be this session's name.
        curr_val = self.get_gcs_client().internal_kv_get(b"session_name", session_ns)
        assert curr_val == self._session_name.encode("utf-8"), (
            f"Session name {self._session_name} does not match "
            f"persisted value {curr_val}. Perhaps there was an "
            f"error connecting to Redis."
        )
    for key, value in (
        (b"session_dir", self._session_dir),
        (b"temp_dir", self._temp_dir),
    ):
        self.get_gcs_client().internal_kv_put(key, value.encode(), True, session_ns)
    tracing_hook = self._ray_params.tracing_startup_hook
    if tracing_hook:
        # Written through the GCS client directly because the internal_kv
        # module is not initialized yet at this point.
        self.get_gcs_client().internal_kv_put(
            b"tracing_startup_hook",
            tracing_hook.encode(),
            True,
            ray_constants.KV_NAMESPACE_TRACING,
        )
def start_head_processes(self):
    """Start the processes that only run on the head node.

    Order:
      1. The GCS server, after which cluster metadata is written to the
         GCS KV store.
      2. The autoscaler monitor, unless ``no_monitor`` is set.
      3. The Ray client server, if a client server port is configured.
      4. The dashboard API server.
    """
    logger.debug(
        f"Process STDOUT and STDERR is being redirected to {self._logs_dir}."
    )
    assert self._gcs_address is None
    assert self._gcs_client is None
    self.start_gcs_server()
    assert self.get_gcs_client() is not None
    self._write_cluster_info_to_kv()
    params = self._ray_params
    if not params.no_monitor:
        self.start_monitor()
    if params.ray_client_server_port:
        self.start_ray_client_server()
    include_dashboard = params.include_dashboard
    # By default (None) an API-server startup failure only warns. When the
    # dashboard was requested explicitly, its value decides whether to raise.
    self.start_api_server(
        include_dashboard=include_dashboard,
        raise_on_failure=False if include_dashboard is None else include_dashboard,
    )
def start_ray_processes(self):
    """Start the per-node processes: optionally the log monitor, then the raylet.

    On a non-head node, the system config is first fetched from the GCS.
    It is asserted to be a superset of the local config and then replaces
    it. Accelerator hardware usage is recorded when the node has an
    accelerator-type label. The plasma store settings are resolved via
    ``determine_plasma_store_config`` and handed to ``start_raylet``.
    """
    logger.debug(
        f"Process STDOUT and STDERR is being redirected to {self._logs_dir}."
    )
    if not self.head:
        # Non-head nodes adopt the system config stored in the GCS.
        gcs_options = ray._raylet.GcsClientOptions.create(
            self.gcs_address,
            self.cluster_id.hex(),
            allow_cluster_id_nil=False,
            fetch_cluster_id_if_nil=False,
        )
        global_state = ray._private.state.GlobalState()
        global_state._initialize_global_state(gcs_options)
        gcs_config = global_state.get_system_config()
        assert self._config.items() <= gcs_config.items(), (
            "The system config from GCS is not a superset of the local"
            " system config. There might be a configuration inconsistency"
            " issue between the head node and non-head nodes."
            f" Local system config: {self._config},"
            f" GCS system config: {gcs_config}"
        )
        self._config = gcs_config
    # Make sure we don't call `determine_plasma_store_config` multiple
    # times to avoid printing multiple warnings.
    spec = self.get_resource_and_label_spec()
    accelerator_type = spec.labels.get(ray._raylet.RAY_NODE_ACCELERATOR_TYPE_KEY)
    if accelerator_type:
        from ray._common.usage import usage_lib

        usage_lib.record_hardware_usage(accelerator_type)
    plasma_directory, fallback_directory, object_store_memory = (
        ray._private.services.determine_plasma_store_config(
            spec.object_store_memory,
            self._temp_dir,
            plasma_directory=self._ray_params.plasma_directory,
            fallback_directory=self._fallback_directory,
            huge_pages=self._ray_params.huge_pages,
        )
    )
    if self._ray_params.include_log_monitor:
        self.start_log_monitor()
    self.start_raylet(plasma_directory, fallback_directory, object_store_memory)
def _get_system_processes_for_resource_isolation(self) -> str:
    """Return the PIDs of system processes that the raylet should isolate.

    NOTE: If a new system process is started before the raylet starts up, it
    needs to be added to self.all_processes so it can be moved into the
    raylet's managed cgroup hierarchy.

    Returns:
        A comma-separated string of PIDs (a str, not a list). It contains
        the PID of the first recorded process of every type in
        ``self.all_processes``, followed by the PIDs of the dashboard API
        server's direct children when a dashboard process is tracked.
    """
    system_process_pids = [
        str(process_infos[0].process.pid)
        for process_infos in self.all_processes.values()
    ]
    # If the dashboard API server was started on this (head) node, its child
    # processes are system processes too. psutil's children() is
    # non-recursive by default, so only direct children are included.
    if ray_constants.PROCESS_TYPE_DASHBOARD in self.all_processes:
        dashboard_pid = self.all_processes[ray_constants.PROCESS_TYPE_DASHBOARD][
            0
        ].process.pid
        dashboard_process = psutil.Process(dashboard_pid)
        system_process_pids.extend(
            str(child.pid) for child in dashboard_process.children()
        )
    return ",".join(system_process_pids)
- def _kill_process_type(
- self,
- process_type,
- allow_graceful: bool = False,
- check_alive: bool = True,
- wait: bool = False,
- ):
- """Kill a process of a given type.
- If the process type is PROCESS_TYPE_REDIS_SERVER, then we will kill all
- of the Redis servers.
- If the process was started in valgrind, then we will raise an exception
- if the process has a non-zero exit code.
- Args:
- process_type: The type of the process to kill.
- allow_graceful: Send a SIGTERM first and give the process
- time to exit gracefully. If that doesn't work, then use
- SIGKILL. We usually want to do this outside of tests.
- check_alive: If true, then we expect the process to be alive
- and will raise an exception if the process is already dead.
- wait: If true, then this method will not return until the
- process in question has exited.
- Raises:
- This process raises an exception in the following cases:
- 1. The process had already died and check_alive is true.
- 2. The process had been started in valgrind and had a non-zero
- exit code.
- """
- # Ensure thread safety
- with self.removal_lock:
- self._kill_process_impl(
- process_type,
- allow_graceful=allow_graceful,
- check_alive=check_alive,
- wait=wait,
- )
def _kill_process_impl(
    self, process_type, allow_graceful: bool = False, check_alive: bool = True, wait: bool = False
):
    """Kill every process tracked under ``process_type``, without locking.

    See `_kill_process_type`, which calls this while holding
    ``self.removal_lock``. Does nothing if the type is not tracked. When
    every process has been handled, the type's entry is removed from
    ``self.all_processes``.

    Args:
        process_type: The type of the process(es) to kill.
        allow_graceful: Send SIGTERM and wait up to one second before
            falling back to SIGKILL.
        check_alive: Raise if a process is already dead.
        wait: After a forced kill, block until the process exits.

    Raises:
        RuntimeError: If a process is already dead and ``check_alive`` is
            true, or if a valgrind-run process exits with a non-zero code.
            In both cases the entry is NOT removed from
            ``self.all_processes``.
    """
    if process_type not in self.all_processes:
        return
    process_infos = self.all_processes[process_type]
    # Only Redis may have several processes under one type.
    if process_type != ray_constants.PROCESS_TYPE_REDIS_SERVER:
        assert len(process_infos) == 1
    for process_info in process_infos:
        process = process_info.process
        # Handle the case where the process has already exited.
        if process.poll() is not None:
            if check_alive:
                raise RuntimeError(
                    "Attempting to kill a process of type "
                    f"'{process_type}', but this process is already dead."
                )
            else:
                continue

        # Valgrind-run processes are terminated and waited on so their exit
        # code (and any captured output) can be reported.
        if process_info.use_valgrind:
            process.terminate()
            process.wait()
            if process.returncode != 0:
                message = (
                    "Valgrind detected some errors in process of "
                    f"type {process_type}. Error code {process.returncode}."
                )
                if process_info.stdout_file is not None:
                    with open(process_info.stdout_file, "r") as f:
                        message += "\nPROCESS STDOUT:\n" + f.read()
                if process_info.stderr_file is not None:
                    with open(process_info.stderr_file, "r") as f:
                        message += "\nPROCESS STDERR:\n" + f.read()
                raise RuntimeError(message)
            continue

        if process_info.use_valgrind_profiler:
            # Give process signal to write profiler data.
            os.kill(process.pid, signal.SIGINT)
            # Wait for profiling data to be written.
            time.sleep(0.1)

        if allow_graceful:
            process.terminate()
            # Allow the process one second to exit gracefully.
            timeout_seconds = 1
            try:
                process.wait(timeout_seconds)
            except subprocess.TimeoutExpired:
                pass

        # If the process did not exit, force kill it.
        if process.poll() is None:
            process.kill()
            # The reason we usually don't call process.wait() here is that
            # there's some chance we'd end up waiting a really long time.
            if wait:
                process.wait()

    del self.all_processes[process_type]
def kill_redis(self, check_alive: bool = True):
    """Kill every tracked Redis server process.

    Args:
        check_alive: If True, raise if any of the processes was already
            dead.
    """
    self._kill_process_type(
        process_type=ray_constants.PROCESS_TYPE_REDIS_SERVER,
        check_alive=check_alive,
    )
def kill_raylet(self, check_alive: bool = True):
    """Kill the raylet process.

    Args:
        check_alive: If True, raise if the process was already dead.
    """
    self._kill_process_type(
        process_type=ray_constants.PROCESS_TYPE_RAYLET,
        check_alive=check_alive,
    )
def kill_log_monitor(self, check_alive: bool = True):
    """Kill the log monitor process.

    Args:
        check_alive: If True, raise if the process was already dead.
    """
    self._kill_process_type(
        process_type=ray_constants.PROCESS_TYPE_LOG_MONITOR,
        check_alive=check_alive,
    )
def kill_dashboard(self, check_alive: bool = True):
    """Kill the dashboard (API server) process.

    Args:
        check_alive: If True, raise if the process was already dead.
    """
    self._kill_process_type(
        process_type=ray_constants.PROCESS_TYPE_DASHBOARD,
        check_alive=check_alive,
    )
def kill_monitor(self, check_alive: bool = True):
    """Kill the autoscaler monitor process.

    Args:
        check_alive: If True, raise if the process was already dead.
    """
    self._kill_process_type(
        process_type=ray_constants.PROCESS_TYPE_MONITOR,
        check_alive=check_alive,
    )
def kill_gcs_server(self, check_alive: bool = True):
    """Kill the GCS server, wait for it to exit, and forget its address and client.

    Args:
        check_alive: If True, raise if the process was already dead.
    """
    self._kill_process_type(
        process_type=ray_constants.PROCESS_TYPE_GCS_SERVER,
        check_alive=check_alive,
        wait=True,
    )
    # No GCS server is running any more; clear the stale address and client.
    self._gcs_address = self._gcs_client = None
def kill_reaper(self, check_alive: bool = True):
    """Kill the reaper process.

    Args:
        check_alive: If True, raise if the process was already dead.
    """
    self._kill_process_type(
        process_type=ray_constants.PROCESS_TYPE_REAPER,
        check_alive=check_alive,
    )
def kill_all_processes(self, check_alive=True, allow_graceful=False, wait=False):
    """Kill every process tracked by this node.

    Kill order:
      1. The raylet.
      2. The GCS server.
      3. Every other process type.
      4. The reaper.

    This is slower than necessary because it kills and waits one process
    type at a time (kill, wait, kill, wait, ...) instead of killing
    everything first and then waiting.

    Args:
        check_alive: Raise an exception if any of the processes were
            already dead.
        allow_graceful: Give each process a chance to exit on SIGTERM
            before it is force-killed.
        wait: If true, do not return until the killed processes have
            exited.
    """
    kill_options = dict(
        check_alive=check_alive, allow_graceful=allow_graceful, wait=wait
    )
    reaper_type = ray_constants.PROCESS_TYPE_REAPER
    # Kill the raylet first so it can exit gracefully and clean up its child
    # worker processes. Killing the plasma store (or Redis) first could make
    # the raylet exit ungracefully and produce noisier worker output.
    for leading_type in (
        ray_constants.PROCESS_TYPE_RAYLET,
        ray_constants.PROCESS_TYPE_GCS_SERVER,
    ):
        if leading_type in self.all_processes:
            self._kill_process_type(leading_type, **kill_options)
    # Snapshot the keys first, because each kill deletes its own entry.
    # The reaper is skipped here so it survives if we die unexpectedly
    # during cleanup.
    remaining_types = [
        process_type
        for process_type in list(self.all_processes.keys())
        if process_type != reaper_type
    ]
    for process_type in remaining_types:
        self._kill_process_type(process_type, **kill_options)
    if reaper_type in self.all_processes:
        self._kill_process_type(reaper_type, **kill_options)
def live_processes(self):
    """List the tracked processes that are still running.

    A process counts as running while ``process.poll()`` returns None.

    Returns:
        A list of ``(process_type, process)`` pairs.
    """
    return [
        (process_type, info.process)
        for process_type, infos in self.all_processes.items()
        for info in infos
        if info.process.poll() is None
    ]
def dead_processes(self):
    """List the tracked processes that have exited.

    Processes killed through this node's kill methods are removed from
    ``self.all_processes`` (e.g. via ``node.kill_raylet()``), so they are
    not reported here.

    Returns:
        A list of ``(process_type, process)`` pairs whose
        ``process.poll()`` is not None.
    """
    exited = []
    for process_type, infos in self.all_processes.items():
        exited.extend(
            (process_type, info.process)
            for info in infos
            if info.process.poll() is not None
        )
    return exited
def any_processes_alive(self):
    """Report whether at least one tracked process is still running.

    Returns:
        True if ``live_processes()`` yields any entry, otherwise False.
    """
    for entry in self.live_processes():
        if entry:
            return True
    return False
def remaining_processes_alive(self):
    """Report whether every remaining tracked process is still running.

    Processes explicitly killed (e.g. via ``node.kill_raylet()``) are no
    longer tracked, so they are ignored.

    Returns:
        True if ``dead_processes()`` yields nothing. This includes the case
        where no processes are tracked at all.
    """
    for entry in self.dead_processes():
        if entry:
            return False
    return True
def destroy_external_storage(self):
    """Tear down the object spilling storage described in ``self._config``.

    Does nothing when ``object_spilling_config`` is missing or empty.
    Otherwise the JSON config is parsed, a storage instance is set up for
    this node and session, and its ``destroy_external_storage`` is called.
    """
    raw_config = self._config.get("object_spilling_config", {})
    if not raw_config:
        return
    parsed_config = json.loads(raw_config)
    from ray._private import external_storage

    storage = external_storage.setup_external_storage(
        parsed_config, self._node_id, self._session_name
    )
    storage.destroy_external_storage()
def validate_external_storage(self):
    """Make sure the object spilling external storage can be set up.

    No-op when ``automatic_object_spilling_enabled`` is false in
    ``self._config``. Otherwise, the JSON string
    ``self._object_spilling_config`` and a derived
    ``is_external_storage_type_fs`` flag are written into both
    ``self._ray_params._system_config`` and ``self._config``. A throwaway
    storage instance is then created and destroyed so that bad configs
    fail early.
    """
    if not self._config.get("automatic_object_spilling_enabled", True):
        return
    spilling_config_json = self._object_spilling_config
    parsed_config = json.loads(spilling_config_json)
    # Both the ray params' system config and self._config must be updated,
    # because they could have diverged by this point.
    system_config = self._ray_params._system_config
    for config in (system_config, self._config):
        config["object_spilling_config"] = spilling_config_json
    is_fs = parsed_config["type"] == "filesystem"
    for config in (system_config, self._config):
        config["is_external_storage_type_fs"] = is_fs

    from ray._private import external_storage

    # The real node ID is only known once the GCS is connected, but this
    # validation has to run earlier. A random dummy ID is used instead, and
    # the storage is destroyed right after the check.
    dummy_node_id = ray.NodeID.from_random().hex()
    storage = external_storage.setup_external_storage(
        parsed_config, dummy_node_id, self._session_name
    )
    storage.destroy_external_storage()
    external_storage.reset_external_storage()
- def _get_object_spilling_config(self):
- """Consolidate the object spilling config from the ray params, environment
- variable, and system config. The object spilling directory specified through
- ray params will override the one specified through environment variable and
- system config."""
- object_spilling_directory = self._ray_params.object_spilling_directory
- if not object_spilling_directory:
- object_spilling_directory = self._config.get(
- "object_spilling_directory", ""
- )
- if not object_spilling_directory:
- object_spilling_directory = os.environ.get(
- "RAY_object_spilling_directory", ""
- )
- if object_spilling_directory:
- return json.dumps(
- {
- "type": "filesystem",
- "params": {"directory_path": object_spilling_directory},
- }
- )
- object_spilling_config = self._config.get("object_spilling_config", {})
- if not object_spilling_config:
- object_spilling_config = os.environ.get("RAY_object_spilling_config", "")
- # If the config is not specified in ray params, system config or environment
- # variable, we fill up the default.
- if not object_spilling_config:
- object_spilling_config = json.dumps(
- {"type": "filesystem", "params": {"directory_path": self._session_dir}}
- )
- else:
- if not is_in_test():
- logger.warning(
- "The object spilling config is specified from an unstable "
- "API - system config or environment variable. This is "
- "subject to change in the future. You can use the stable "
- "API - --object-spilling-directory in ray start or "
- "object_spilling_directory in ray.init() to specify the "
- "object spilling directory instead. If you need more "
- "advanced settings, please open a github issue with the "
- "Ray team."
- )
- return object_spilling_config
def _record_stats(self):
    """Record usage-stats tags for a newly started node.

    This is only called when a new node is started. It:
      - initializes the internal KV against this node's GCS client if
        needed, so that metrics can be stored;
      - on the head node, records whether the GCS storage is Redis or
        in-memory;
      - records the CPU model name (truncated) as hardware usage when it
        is available.
    """
    from ray._common.usage.usage_lib import (
        TagKey,
        record_extra_usage_tag,
        record_hardware_usage,
    )

    internal_kv = ray.experimental.internal_kv
    if not internal_kv._internal_kv_initialized():
        internal_kv._initialize_internal_kv(self.get_gcs_client())
    assert internal_kv._internal_kv_initialized()
    if self.head:
        # Head-node-only stat: which backend the GCS stores its data in.
        uses_redis = os.environ.get("RAY_REDIS_ADDRESS") is not None
        record_extra_usage_tag(TagKey.GCS_STORAGE, "redis" if uses_redis else "memory")
    cpu_model_name = ray._private.utils.get_current_node_cpu_model_name()
    if cpu_model_name:
        # The CPU model name can be an arbitrarily long string, so only the
        # first 50 characters are recorded.
        record_hardware_usage(cpu_model_name[:50])
|