| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433 |
- import base64
- import collections
- import errno
- import io
- import json
- import logging
- import mmap
- import multiprocessing
- import os
- import shutil
- import signal
- import socket
- import subprocess
- import sys
- import time
- from pathlib import Path
- from typing import IO, AnyStr, List, Optional
- # Ray modules
- import ray
- import ray._private.ray_constants as ray_constants
- from ray._common.network_utils import (
- build_address,
- get_localhost_ip,
- is_ipv6,
- node_ip_address_from_perspective,
- parse_address,
- )
- from ray._private.resource_isolation_config import ResourceIsolationConfig
- from ray._raylet import GcsClient, GcsClientOptions, NodeID
- from ray.core.generated.common_pb2 import Language
- from ray.core.generated.gcs_pb2 import GcsNodeInfo
- from ray.core.generated.gcs_service_pb2 import GetAllNodeInfoRequest
- # Import psutil after ray so the packaged version is used.
- import psutil
# NOTE(review): module-level placeholder that is always None in the visible
# code; presumably kept so `services.resource` stays importable — confirm
# before removing.
resource = None

# Default timeout in seconds (used as wait_for_node's default); Windows gets
# a longer budget.
_timeout = 60 if sys.platform == "win32" else 30

# Suffix appended to native executable names on Windows.
EXE_SUFFIX = ".exe" if sys.platform == "win32" else ""

# True if processes are run in the valgrind profiler.
RUN_RAYLET_PROFILER = False

# RAY_PATH is the absolute path of the parent of this module's directory.
# RAY_HOME is two levels above that (not normalized to an absolute path).
RAY_HOME = os.path.join(
    os.path.dirname(os.path.dirname(__file__)), os.pardir, os.pardir
)
RAY_PATH = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
RAY_PRIVATE_DIR = "_private"
AUTOSCALER_PRIVATE_DIR = os.path.join("autoscaler", "_private")
AUTOSCALER_V2_DIR = os.path.join("autoscaler", "v2")

# Location of the raylet and GCS server executables.
RAYLET_EXECUTABLE = os.path.join(
    RAY_PATH, "core", "src", "ray", "raylet", f"raylet{EXE_SUFFIX}"
)
GCS_SERVER_EXECUTABLE = os.path.join(
    RAY_PATH, "core", "src", "ray", "gcs", f"gcs_server{EXE_SUFFIX}"
)

# Bundled jemalloc shared library, or None when it is not shipped.
JEMALLOC_SO = os.path.join(RAY_PATH, "core", "libjemalloc.so")
if not os.path.exists(JEMALLOC_SO):
    JEMALLOC_SO = None

# Location of the cpp default worker executables.
DEFAULT_WORKER_EXECUTABLE = os.path.join(
    RAY_PATH, "cpp", f"default_worker{EXE_SUFFIX}"
)

# Location of the native libraries.
DEFAULT_NATIVE_LIBRARY_PATH = os.path.join(RAY_PATH, "cpp", "lib")

DASHBOARD_DEPENDENCY_ERROR_MESSAGE = (
    "Not all Ray Dashboard dependencies were found. "
    "To use the dashboard please install Ray using "
    "`pip install ray[default]`."
)

# Names of the env vars that configure jemalloc.
RAY_JEMALLOC_LIB_PATH = "RAY_JEMALLOC_LIB_PATH"
RAY_JEMALLOC_CONF = "RAY_JEMALLOC_CONF"
RAY_JEMALLOC_PROFILE = "RAY_JEMALLOC_PROFILE"

# Comma separated name of components that will run memory profiler.
# Ray uses `memray` to memory profile internal components.
# The name of the component must be one of ray_constants.PROCESS_TYPE*.
RAY_MEMRAY_PROFILE_COMPONENT_ENV = "RAY_INTERNAL_MEM_PROFILE_COMPONENTS"

# Options to specify for `memray run` command. See
# `memray run --help` for more details.
# Example:
# RAY_INTERNAL_MEM_PROFILE_OPTIONS="--live,--live-port,3456,-q,"
# -> `memray run --live --live-port 3456 -q`
RAY_MEMRAY_PROFILE_OPTIONS_ENV = "RAY_INTERNAL_MEM_PROFILE_OPTIONS"

# Logger for this module. It should be configured at the entry point
# into the program using Ray. Ray provides a default configuration at
# entry/init points.
logger = logging.getLogger(__name__)

# Bookkeeping record for a process started by this module.
ProcessInfo = collections.namedtuple(
    "ProcessInfo",
    "process stdout_file stderr_file use_valgrind use_gdb "
    "use_valgrind_profiler use_perftools_profiler use_tmux",
)
def _site_flags() -> List[str]:
    """Return the site-package related flags the current interpreter runs with.

    If this interpreter was started with ``-S`` (skip importing ``site``)
    and/or ``-s`` (skip the user site-packages directory), the same flags are
    returned so they can be passed down to Python workers. This helps Ray run
    in hermetic build environments.

    Returns:
        A list holding ``"-S"`` and/or ``"-s"``, in that order; possibly empty.
    """
    # The sys.flags reads live behind small helpers so unit tests can patch them.
    flag_checks = (("-S", _no_site), ("-s", _no_user_site))
    return [flag for flag, is_enabled in flag_checks if is_enabled()]
# sys.flags hidden behind helper methods for unit testing.
def _no_site():
    """Return ``sys.flags.no_site`` (non-zero when Python was started with ``-S``)."""
    interpreter_flags = sys.flags
    return interpreter_flags.no_site
# sys.flags hidden behind helper methods for unit testing.
def _no_user_site():
    """Return ``sys.flags.no_user_site`` (non-zero when started with ``-s``)."""
    interpreter_flags = sys.flags
    return interpreter_flags.no_user_site
def _build_python_executable_command_memory_profileable(
    component: str, session_dir: str, unbuffered: bool = True
):
    """Build the Python executable command.

    It runs a memory profiler if env var is configured.

    The command always starts with ``sys.executable`` (plus ``-u`` when
    ``unbuffered``). If the env var named by RAY_MEMRAY_PROFILE_COMPONENT_ENV
    lists ``component``, ``-m memray run <options>`` is appended so the
    component runs under memray. Unless the options already contain
    ``--live``, ``-o`` or ``--output``, the profile is written to
    ``<session_dir>/profile/<session_name>_memory_<component>.bin``.

    Args:
        component: Name of the component. It must be one of
            ray_constants.PROCESS_TYPE*.
        session_dir: The directory name of the Ray session.
        unbuffered: If true, Python executable is started with unbuffered option.
            e.g., `-u`.
            It means the logs are flushed immediately (good when there's a failure),
            but writing to a log file can be slower.

    Returns:
        The command as a list of strings.

    Raises:
        ImportError: If any component is configured for memory profiling but
            memray cannot be imported. This is checked even when ``component``
            itself is not being profiled.
    """
    command = [sys.executable]
    if unbuffered:
        command.append("-u")

    raw_components = os.getenv(RAY_MEMRAY_PROFILE_COMPONENT_ENV, "")
    # Tolerate surrounding whitespace and stray commas ("raylet, gcs_server,").
    # A value such as "," must not switch profiling on.
    components_to_memory_profile = {
        name.strip() for name in raw_components.split(",") if name.strip()
    }
    if not components_to_memory_profile:
        return command

    try:
        import memray  # noqa: F401
    except ImportError as e:
        raise ImportError(
            "Memray is required to memory profiler on components "
            f"{components_to_memory_profile}. Run `pip install memray`."
        ) from e

    if component not in components_to_memory_profile:
        return command

    session_path = Path(session_dir)
    profile_dir = session_path / "profile"
    profile_dir.mkdir(exist_ok=True)
    output_file_path = profile_dir / f"{session_path.name}_memory_{component}.bin"

    raw_options = os.getenv(RAY_MEMRAY_PROFILE_OPTIONS_ENV, "")
    # Drop empty entries, e.g. from the trailing comma in "--live,-q,".
    # Otherwise memray would receive a literal "" argument.
    options = [opt for opt in raw_options.split(",") if opt.strip()]

    # If neither --live nor any output option (-o/--output) is specified,
    # add the default output path.
    if not any(opt in options for opt in ("--live", "-o", "--output")):
        options = ["-o", str(output_file_path), *options]

    command.extend(["-m", "memray", "run", *options])
    return command
def _get_gcs_client_options(gcs_server_address):
    """Create ``GcsClientOptions`` for the GCS at ``gcs_server_address``.

    The options accept a nil cluster ID and do not fetch the cluster ID when
    it is nil. The second positional argument is passed as None
    (NOTE(review): presumably the cluster ID — confirm against
    ``GcsClientOptions.create``).
    """
    return GcsClientOptions.create(
        gcs_server_address,
        None,
        fetch_cluster_id_if_nil=False,
        allow_cluster_id_nil=True,
    )
def serialize_config(config):
    """Serialize ``config`` to JSON, base64-encode it, and return it as ``str``."""
    json_bytes = json.dumps(config).encode("utf-8")
    encoded_bytes = base64.b64encode(json_bytes)
    return encoded_bytes.decode("utf-8")
def propagate_jemalloc_env_var(
    *,
    jemalloc_path: str,
    jemalloc_conf: str,
    jemalloc_comps: List[str],
    process_type: str,
):
    """Build the env vars that make a child process run with jemalloc.

    For example, a jemalloc library path (such as one configured through
    ``RAY_JEMALLOC_LIB_PATH``) is exported as ``LD_PRELOAD``, which is needed
    to run jemalloc as a shared library.

    Params:
        jemalloc_path: The path to the jemalloc shared library. If empty,
            nothing is returned.
        jemalloc_conf: `,` separated string of jemalloc config. It is exported
            as ``MALLOC_CONF`` only for the components in ``jemalloc_comps``.
        jemalloc_comps: The list of Ray components that we will profile.
            Matching against ``process_type`` is case-insensitive.
        process_type: The process type the env vars are being built for.

    Returns:
        A dict of {env_var: value}; the caller can ``dict.update()`` its env
        with it.
        - ``{}`` if ``jemalloc_path`` is empty.
        - Otherwise, for every process type, ``LD_PRELOAD`` is set to
          ``jemalloc_path``, and ``RAY_LD_PRELOAD_ON_WORKERS`` is copied from
          the current environment (default ``"0"``).
        - ``MALLOC_CONF`` is added only when ``process_type`` is in
          ``jemalloc_comps`` and ``jemalloc_conf`` is non-empty.
    """
    assert isinstance(jemalloc_comps, list)
    assert process_type is not None
    process_type = process_type.lower()
    if not jemalloc_path:
        return {}

    env_vars = {
        "LD_PRELOAD": jemalloc_path,
        "RAY_LD_PRELOAD_ON_WORKERS": os.environ.get("RAY_LD_PRELOAD_ON_WORKERS", "0"),
    }
    # Normalize both sides of the comparison. Previously only process_type was
    # lowercased, so a component listed as e.g. "Raylet" never matched.
    profiled_components = {comp.strip().lower() for comp in jemalloc_comps}
    if jemalloc_conf and process_type in profiled_components:
        env_vars["MALLOC_CONF"] = jemalloc_conf
    return env_vars
class ConsolePopen(subprocess.Popen):
    """``subprocess.Popen`` that can interrupt its child gracefully on Windows.

    On non-Windows platforms this class adds nothing to ``subprocess.Popen``.
    On Windows, when fate sharing is supported, the child is started in a new
    process group so that ``terminate()`` can send ``CTRL_BREAK_EVENT``
    instead of hard-killing it.
    """

    if sys.platform == "win32":

        def terminate(self):
            """Close stdin, then signal or terminate the child.

            Sends ``CTRL_BREAK_EVENT`` when the child runs in its own process
            group; otherwise falls back to ``Popen.terminate``.
            """
            if isinstance(self.stdin, io.IOBase):
                self.stdin.close()
            if self._use_signals:
                self.send_signal(signal.CTRL_BREAK_EVENT)
            else:
                super(ConsolePopen, self).terminate()

        def __init__(self, *args, **kwargs):
            # CREATE_NEW_PROCESS_GROUP is used to send Ctrl+C on Windows:
            # https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal
            new_pgroup = subprocess.CREATE_NEW_PROCESS_GROUP
            flags_to_add = 0
            if ray._private.utils.detect_fate_sharing_support():
                # If we don't have kernel-mode fate-sharing, then don't do this
                # because our children need to be in our process group for
                # the process reaper to properly terminate them.
                flags_to_add = new_pgroup
            flags_key = "creationflags"
            if flags_to_add:
                kwargs[flags_key] = (kwargs.get(flags_key) or 0) | flags_to_add
            # Read with .get(): when no flag was added and the caller passed
            # no creationflags, indexing raised KeyError. A None value raised
            # TypeError.
            self._use_signals = bool((kwargs.get(flags_key) or 0) & new_pgroup)
            super(ConsolePopen, self).__init__(*args, **kwargs)
def _find_address_from_flag(flag: str):
    """
    Attempts to find all valid Ray addresses on this node, specified by the
    flag.

    Every argument of every local raylet process (as judged by
    ``_is_raylet_process``) is split on spaces and scanned for
    ``<flag>=<value>`` tokens.

    Params:
        flag: The flag to look for, e.g. `--redis-address`, `--gcs-address`
            or `--node_id`.

    Returns:
        Set of detected values. Empty values and the literal "None" are
        skipped.
    """
    # Using Redis address `--redis-address` as an example:
    # Currently, this extracts the deprecated --redis-address from the command
    # that launched the raylet running on this node, if any. Anyone looking to
    # edit this function should be warned that these commands look like, for
    # example:
    # /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet
    # --redis_address=123.456.78.910 --node_ip_address=123.456.78.910
    # --raylet_socket_name=... --store_socket_name=... --object_manager_port=0
    # --min_worker_port=10000 --max_worker_port=19999
    # --node_manager_port=58578 --redis_port=6379
    # --maximum_startup_concurrency=8
    # --static_resource_list=node:123.456.78.910,1.0,object_store_memory,66
    # --config_list=plasma_store_as_thread,True
    # --python_worker_command=/usr/bin/python
    #     /usr/local/lib/python3.8/dist-packages/ray/workers/default_worker.py
    #     --redis-address=123.456.78.910:6379
    #     --node-ip-address=123.456.78.910 --node-manager-port=58578
    #     --object-store-name=... --raylet-name=...
    #     --temp-dir=/tmp/ray
    #     --metrics-agent-port=41856 --redis-password=[MASKED]
    #     --java_worker_command= --cpp_worker_command=
    #     --redis_password=[MASKED] --temp_dir=/tmp/ray --session_dir=...
    #     --metrics-agent-port=41856 --metrics_export_port=64229
    #     --dashboard_agent_command=/usr/bin/python
    #     -u /usr/local/lib/python3.8/dist-packages/ray/dashboard/agent.py
    #         --redis-address=123.456.78.910:6379 --metrics-export-port=64229
    #         --dashboard-agent-port=41856 --node-manager-port=58578
    #         --object-store-name=... --raylet-name=... --temp-dir=/tmp/ray
    #         --log-dir=/tmp/ray/session_2020-11-08_14-29-07_199128_278000/logs
    #         --redis-password=[MASKED] --object_store_memory=5037192806
    #         --plasma_directory=/tmp
    # Longer arguments are elided with ... but all arguments from this instance
    # are included, to provide a sense of what is in these.
    # Indeed, we had to pull --redis-address to the front of each call to make
    # this readable.
    # As you can see, this is very long and complex, which is why we can't
    # simply extract all the arguments using regular expressions and
    # present a dict as if we never lost track of these arguments, for
    # example. Picking out --redis-address below looks like it might grab the
    # wrong thing, but double-checking that we're finding the correct process
    # by checking that the contents look like we expect would probably be prone
    # to choking in unexpected ways.
    # Notice that --redis-address appears twice. This is not a copy-paste
    # error; this is the reason why the for loop below attempts to pick out
    # every appearance of --redis-address.
    # The --redis-address here is what is now called the --address, but it
    # appears in the default_worker.py and agent.py calls as --redis-address.
    addresses = set()
    for proc in psutil.process_iter(["cmdline"]):
        try:
            # HACK: Workaround for UNIX idiosyncrasy
            # Normally, cmdline() is supposed to return the argument list.
            # But it in some cases (such as when setproctitle is called),
            # an arbitrary string resembling a command-line is stored in
            # the first argument.
            # Explanation: https://unix.stackexchange.com/a/432681
            # More info: https://github.com/giampaolo/psutil/issues/1179
            cmdline = proc.info["cmdline"]
            # process_iter() stores None (its default ad_value) when the
            # command line could not be read; there is nothing to scan.
            if not cmdline:
                continue
            # NOTE(kfstorm): To support Windows, we can't use
            # `os.path.basename(cmdline[0]) == "raylet"` here.
            if not _is_raylet_process(cmdline):
                continue
            for arglist in cmdline:
                # Given we're merely seeking --redis-address, we just split
                # every argument on spaces for now.
                for arg in arglist.split(" "):
                    # TODO(ekl): Find a robust solution for locating Redis.
                    name, sep, proc_addr = arg.partition("=")
                    # Require an exact "<flag>=" match. A bare flag token
                    # used to crash with IndexError. A longer flag sharing the
                    # prefix used to match. A value containing "=" used to be
                    # truncated.
                    if name != flag or not sep:
                        continue
                    # TODO(mwtian): remove this workaround after Ray
                    # no longer sets --redis-address to None.
                    if proc_addr not in ("", "None"):
                        addresses.add(proc_addr)
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            # The process exited or cannot be inspected; skip it.
            pass
    return addresses
def find_node_ids():
    """Return the set of node IDs passed via ``--node_id`` to local raylets.

    Each ID is the value given on a raylet's command line on this host.
    """
    node_id_flag = "--node_id"
    return _find_address_from_flag(node_id_flag)
def find_gcs_addresses():
    """Return the set of GCS addresses passed via ``--gcs-address`` to local raylets.

    Local raylet processes are inspected through psutil rather than by
    grepping ``ps`` output.
    """
    gcs_address_flag = "--gcs-address"
    return _find_address_from_flag(gcs_address_flag)
def find_bootstrap_address(temp_dir: Optional[str]):
    """Return the most recently recorded Ray cluster address, if any.

    This is the GCS address connected to by the last successful `ray start`,
    as read by ``ray._private.utils.read_ray_address(temp_dir)``.
    """
    read_recorded_address = ray._private.utils.read_ray_address
    return read_recorded_address(temp_dir)
def get_ray_address_from_environment(addr: Optional[str], temp_dir: Optional[str]):
    """Attempts to find the address of Ray cluster to use, in this order:

    1. Use RAY_ADDRESS if defined and nonempty.
    2. If the resulting address is neither None nor "auto", use it as-is.
    3. Otherwise detect a local cluster from two sources:
       - the address recorded by the last successful `ray start` (see
         ``find_bootstrap_address``), and
       - the GCS addresses on local raylet command lines (see
         ``find_gcs_addresses``).
       When several clusters are running and one is recorded, the recorded
       one is used (with a warning). Otherwise, for "auto" only, any running
       cluster is used if one exists (legacy behavior); for None, only the
       recorded address is considered.

    Args:
        addr: The user-supplied address, "auto", or None.
        temp_dir: Forwarded to ``find_bootstrap_address``.

    Returns:
        A string to pass into `ray.init(address=...)`, e.g. ip:port. Returns
        None when ``addr`` is None and no recorded cluster is found; the
        caller should then start a new instance.

    Raises:
        ConnectionError: If ``addr`` is "auto" and no cluster can be found.
    """
    env_addr = os.environ.get(ray_constants.RAY_ADDRESS_ENVIRONMENT_VARIABLE)
    if env_addr:
        addr = env_addr
    if addr is not None and addr != "auto":
        return addr

    # We should try to automatically find an active local instance.
    gcs_addrs = find_gcs_addresses()
    bootstrap_addr = find_bootstrap_address(temp_dir)

    if len(gcs_addrs) > 1 and bootstrap_addr is not None:
        logger.warning(
            f"Found multiple active Ray instances: {gcs_addrs}. "
            f"Connecting to latest cluster at {bootstrap_addr}. "
            "You can override this by setting the `--address` flag "
            "or `RAY_ADDRESS` environment variable."
        )
    elif gcs_addrs and addr == "auto":
        # Preserve legacy "auto" behavior of connecting to any cluster, even if
        # not started with ray start. With addr=None only the recorded address
        # is considered, and None is returned below if there is none.
        bootstrap_addr = list(gcs_addrs).pop()
        if len(gcs_addrs) > 1:
            # The choice among several clusters is arbitrary; say so instead
            # of silently connecting to one of them.
            logger.warning(
                f"Found multiple active Ray instances: {gcs_addrs}. "
                f"Connecting to {bootstrap_addr}. "
                "You can override this by setting the `--address` flag "
                "or `RAY_ADDRESS` environment variable."
            )

    if bootstrap_addr is None:
        if addr is None:
            # Caller should start a new instance.
            return None
        raise ConnectionError(
            "Could not find any running Ray instance. "
            "Please specify the one to connect to by setting `--address` flag "
            "or `RAY_ADDRESS` environment variable."
        )

    return bootstrap_addr
def wait_for_node(
    gcs_address: str,
    node_plasma_store_socket_name: str,
    timeout: int = _timeout,
):
    """Wait until this node has appeared in the client table.

    NOTE: Makes an RPC to the GCS up to every 0.1 seconds to
    get all node info. Use only for testing.

    Args:
        gcs_address: The gcs address
        node_plasma_store_socket_name: The
            plasma_store_socket_name for the given node which we wait for.
        timeout: The amount of time in seconds to wait before raising an
            exception.

    Raises:
        TimeoutError: An exception is raised if the timeout expires before
            the node appears in the client table.
    """
    # Same options as every other GCS lookup in this module.
    gcs_options = _get_gcs_client_options(gcs_address)
    global_state = ray._private.state.GlobalState()
    global_state._initialize_global_state(gcs_options)

    # Monotonic clock: the wait is unaffected by wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        clients = global_state.node_table()
        if any(
            client["ObjectStoreSocketName"] == node_plasma_store_socket_name
            for client in clients
        ):
            return
        time.sleep(0.1)
    raise TimeoutError(
        f"Timed out after {timeout} seconds while waiting for node to startup. "
        f"Did not find socket name {node_plasma_store_socket_name} in the list "
        "of object store socket names."
    )
def get_node_to_connect_for_driver(
    gcs_client: GcsClient,
    node_ip_address: str = None,
    node_name: str = None,
    temp_dir: str = None,
) -> GcsNodeInfo:
    """
    Get the node to connect to for the driver.

    Candidates are the ALIVE nodes whose IDs appear on local raylet command
    lines (see ``find_node_ids``). They are filtered by ``node_ip_address``
    and ``node_name`` when given. If several nodes remain, the head node is
    preferred; otherwise the first remaining node is returned.

    Args:
        gcs_client: The GCS client.
        node_ip_address: The IP address of the node to connect to. If not
            provided, it will be resolved to a ray node on the same host.
        node_name: The name of the node to connect to. If not provided, it
            will be resolved to a ray node on the same host.
        temp_dir: The temp directory of the node to connect to. Currently only
            reported in the "no match" error message; it is not used to filter
            nodes.

    Returns:
        The node info of the node to connect to.

    Raises:
        RuntimeError: If node info cannot be fetched, or no node matches.
    """
    possible_node_ids = find_node_ids()
    node_selectors = [
        GetAllNodeInfoRequest.NodeSelector(node_id=NodeID.from_hex(node_id).binary())
        for node_id in possible_node_ids
    ]
    # NOTE(review): if no local raylet was found, node_selectors is empty.
    # Confirm whether the GCS then returns every alive node in the cluster.
    try:
        node_to_connect_infos = gcs_client.get_all_node_info(
            timeout=ray_constants.GCS_SERVER_REQUEST_TIMEOUT_SECONDS,
            node_selectors=node_selectors,
            state_filter=GcsNodeInfo.GcsNodeState.ALIVE,
        ).values()
    except Exception as e:
        raise RuntimeError(
            f"Failed to get node info for possible node ids: {possible_node_ids}"
            f" when trying to resolve node to connect to. Error: {repr(e)}"
        ) from e

    if not node_to_connect_infos:
        raise RuntimeError(
            f"No node info found matching node ids: {possible_node_ids}"
            " when trying to resolve node to connect to."
        )

    # NOTE(review): temp_dir is not applied here; confirm whether GcsNodeInfo
    # exposes a temp dir field that should be matched.
    filtered_node_to_connect_infos = [
        node_info
        for node_info in node_to_connect_infos
        if (node_ip_address is None or node_info.node_manager_address == node_ip_address)
        and (node_name is None or node_info.node_name == node_name)
    ]
    if not filtered_node_to_connect_infos:
        attrs = [node_ip_address, node_name, temp_dir]
        attrs_str = ", ".join(f"{attr}" for attr in attrs if attr is not None)
        raise RuntimeError(
            f"No node info found matching attributes: '{attrs_str}' when trying to resolve node to connect to."
        )

    # Prioritize head node if available; otherwise take the first match.
    return next(
        (node_info for node_info in filtered_node_to_connect_infos if node_info.is_head_node),
        filtered_node_to_connect_infos[0],
    )
def get_node(gcs_address, node_id):
    """Return ``GlobalState.get_node(node_id)`` for the GCS at ``gcs_address``.

    A fresh ``GlobalState`` is created and initialized for each call.
    """
    state = ray._private.state.GlobalState()
    state._initialize_global_state(_get_gcs_client_options(gcs_address))
    return state.get_node(node_id)
def get_node_with_retry(
    gcs_address: str,
    node_id: str,
    timeout_s: float = 30,
    retry_interval_s: float = 1,
) -> dict:
    """Get node info from GCS with retry logic.

    Keeps retrying until the node is found or timeout is reached.

    Some Ray processes (e.g., ray_client_server) start in parallel
    with the raylet. When they query GCS for node info, the raylet may not have
    registered yet. This function retries until the node info is available.

    Args:
        gcs_address: The address of the GCS server (e.g., "ip:port").
        node_id: The hex string ID of the node to find.
        timeout_s: Total timeout in seconds. Default 30s.
        retry_interval_s: Interval between retries in seconds. Default 1s.

    Returns:
        A dictionary containing node info.

    Raises:
        RuntimeError: If the node is not found within the timeout. The last
            RuntimeError raised by the lookup, if any, is chained as the cause.
    """
    # Monotonic clock: the timeout is unaffected by wall-clock adjustments.
    deadline = time.monotonic() + timeout_s
    last_error = None
    while True:
        try:
            node_info = get_node(gcs_address, node_id)
        except RuntimeError as e:
            # This is expected if the node hasn't registered with GCS yet.
            last_error = e
        else:
            if node_info is not None:
                return node_info
        remaining_s = deadline - time.monotonic()
        if remaining_s <= 0:
            raise RuntimeError(
                f"Timed out waiting for node info for node_id={node_id}."
            ) from last_error
        # Never sleep past the deadline; one more attempt happens at its edge.
        time.sleep(min(retry_interval_s, remaining_s))
def get_webui_url_from_internal_kv():
    """Return the dashboard URL stored in internal KV, or None if unset.

    The value is read from key ``webui:url`` in the dashboard KV namespace and
    decoded. The internal KV must already be initialized (asserted).
    """
    assert ray.experimental.internal_kv._internal_kv_initialized()
    raw_url = ray.experimental.internal_kv._internal_kv_get(
        "webui:url", namespace=ray_constants.KV_NAMESPACE_DASHBOARD
    )
    if raw_url is None:
        return None
    return ray._common.utils.decode(raw_url)
def remaining_processes_alive():
    """Report whether the processes started by ray.init() are still alive.

    Processes that were explicitly killed (e.g. via node.kill_raylet()) are
    ignored. The check is delegated to the global node's
    ``remaining_processes_alive()``.

    Returns:
        True if the remaining processes started by ray.init() are alive and
        False otherwise.

    Raises:
        RuntimeError: If this process has no global node, i.e. the processes
            were not started by ray.init() here.
    """
    global_node = ray._private.worker._global_node
    if global_node is None:
        raise RuntimeError(
            "This process is not in a position to determine "
            "whether all processes are alive or not."
        )
    return global_node.remaining_processes_alive()
def canonicalize_bootstrap_address(
    addr: str, temp_dir: Optional[str] = None
) -> Optional[str]:
    """Canonicalizes Ray cluster bootstrap address to host:port.

    If ``addr`` is None or "auto", it is first resolved with
    ``get_ray_address_from_environment``. A loopback host is replaced by the
    node's reachable IP (see ``resolve_ip_for_localhost``).

    This function should be used to process user supplied Ray cluster address,
    via ray.init() or `--address` flags, before using the address to connect.

    Returns:
        Ray cluster address string in <host:port> format, or None if the caller
        should start a local Ray instance (no address found, or "local").

    Raises:
        ValueError: If the address cannot be parsed.
    """
    if addr in (None, "auto"):
        addr = get_ray_address_from_environment(addr, temp_dir)
    if addr in (None, "local"):
        return None

    host_and_port = parse_address(addr)
    if host_and_port is None:
        raise ValueError(f"Invalid address format: {addr}")
    host, port = host_and_port

    try:
        reachable_host = resolve_ip_for_localhost(host)
    except Exception:
        logger.exception(f"Failed to convert {addr} to host:port")
        raise
    return build_address(reachable_host, port)
def canonicalize_bootstrap_address_or_die(
    addr: str, temp_dir: Optional[str] = None
) -> str:
    """Canonicalizes Ray cluster bootstrap address to host:port.

    This function should be used when the caller expects there to be an active
    and local Ray instance. If no address is provided or address="auto", this
    will autodetect the latest Ray instance created with `ray start`.

    For convenience, if no address can be autodetected, this function also
    looks for running local GCS processes, found by inspecting local raylet
    command lines with psutil. This makes it easier to use Ray CLIs when
    debugging a local Ray instance (whose GCS addresses are not recorded).

    Returns:
        Ray cluster address string in <host:port> format.

    Raises:
        ConnectionError: If the fallback detection finds zero or multiple
            active Ray instances.
    """
    bootstrap_addr = canonicalize_bootstrap_address(addr, temp_dir=temp_dir)
    if bootstrap_addr is not None:
        return bootstrap_addr

    running_gcs_addresses = find_gcs_addresses()
    if not running_gcs_addresses:
        raise ConnectionError(
            "Could not find any running Ray instance. "
            "Please specify the one to connect to by setting the `--address` "
            "flag or `RAY_ADDRESS` environment variable."
        )
    if len(running_gcs_addresses) > 1:
        raise ConnectionError(
            f"Found multiple active Ray instances: {running_gcs_addresses}. "
            "Please specify the one to connect to by setting the `--address` "
            "flag or `RAY_ADDRESS` environment variable."
        )
    (only_address,) = running_gcs_addresses
    return only_address
def extract_ip_port(bootstrap_address: str):
    """Split a ``<host>:<port>`` address and validate the port.

    Args:
        bootstrap_address: Address in ``<host>:<port>`` form.

    Returns:
        A ``(host, port)`` tuple where ``port`` has been converted to ``int``.

    Raises:
        ValueError: If the address cannot be parsed, the port is not an
            integer, or the port is outside 1024-65535 (inclusive).
    """
    ip_port = parse_address(bootstrap_address)
    if ip_port is None:
        raise ValueError(
            f"Malformed address {bootstrap_address}. Expected '<host>:<port>'."
        )
    ip, port = ip_port
    try:
        port = int(port)
    except ValueError as e:
        # Chain the conversion error so the original cause stays visible.
        raise ValueError(
            f"Malformed address port {port}. Must be an integer."
        ) from e
    if port < 1024 or port > 65535:
        raise ValueError(
            f"Invalid address port {port}. Must be between 1024 "
            "and 65535 (inclusive)."
        )
    return ip, port
def resolve_ip_for_localhost(host: str):
    """Swap a loopback host for an address that other nodes can reach.

    Args:
        host: The hostname or IP address.

    Returns:
        ``get_node_ip_address()`` when ``host`` is ``"127.0.0.1"``, ``"::1"``
        or ``"localhost"``; otherwise ``host`` itself.

    Raises:
        ValueError: If ``host`` is empty (or otherwise falsy).
    """
    if not host:
        raise ValueError(f"Malformed host: {host}")
    is_loopback = host in ("127.0.0.1", "::1", "localhost")
    # Make sure localhost isn't resolved to the loopback ip.
    return get_node_ip_address() if is_loopback else host
# NOTE: Do not use this API to obtain the IP address before ray.init has been
# called: it cannot find an IP address that was specified with
# `ray start --node-ip-address`. Use get_node_to_connect_ip_address instead.
def get_node_ip_address(address=None):
    """Return the IP address of the current node.

    Args:
        address: Forwarded to ``node_ip_address_from_perspective`` when the
            IP has to be detected.

    Returns:
        The global node's ``node_ip_address`` if this process has one;
        otherwise the localhost IP when ``ENABLE_RAY_CLUSTER`` is falsy, or
        the IP detected from the perspective of ``address``.
    """
    global_node = ray._private.worker._global_node
    if global_node is not None:
        return global_node.node_ip_address

    if ray_constants.ENABLE_RAY_CLUSTER:
        return node_ip_address_from_perspective(address)

    # Use loopback IP as the local IP address to prevent bothersome
    # firewall popups on OSX and Windows.
    # https://github.com/ray-project/ray/issues/18730.
    return get_localhost_ip()
def get_node_instance_id():
    """Look up the cloud instance id configured for the current node.

    The value comes from the ``RAY_CLOUD_INSTANCE_ID`` environment variable.

    Returns:
        The configured instance id, or ``""`` when the variable is not set.
    """
    instance_id = os.environ.get("RAY_CLOUD_INSTANCE_ID", "")
    return instance_id
def create_redis_client(redis_address, password=None, username=None):
    """Create (or reuse) a Redis client and verify it answers a ping.

    Clients are cached per ``redis_address`` in an ``instances`` dict stored
    on this function object. A client whose ping fails is evicted, so the next
    attempt builds a fresh one. Attempts back off exponentially, starting at
    1 ms and capped at 1 s between attempts.

    Args:
        redis_address: The IP address and port of the Redis server.
        password: The password for Redis authentication.
        username: The username for Redis authentication.

    Returns:
        A Redis client that has successfully answered a ping.

    Raises:
        RuntimeError: If Redis still cannot be reached after
            ``ray_constants.START_REDIS_WAIT_RETRIES`` attempts (at least one).
    """
    import redis

    if not hasattr(create_redis_client, "instances"):
        create_redis_client.instances = {}

    # NOTE(review): the cache key is only the address, so a later call with
    # different username/password for the same address reuses the first
    # client. Confirm this is intended.

    # Always make at least one attempt: with a non-positive retry setting the
    # loop below would otherwise never run and the function would silently
    # return None.
    num_retries = max(1, ray_constants.START_REDIS_WAIT_RETRIES)
    delay = 0.001
    for i in range(num_retries):
        cli = create_redis_client.instances.get(redis_address)
        if cli is None:
            redis_ip_address, redis_port = extract_ip_port(
                canonicalize_bootstrap_address_or_die(redis_address)
            )
            cli = redis.StrictRedis(
                host=redis_ip_address,
                port=int(redis_port),
                username=username,
                password=password,
            )
            create_redis_client.instances[redis_address] = cli
        try:
            cli.ping()
            return cli
        except Exception as e:
            # Drop the broken client so the next attempt rebuilds it.
            create_redis_client.instances.pop(redis_address, None)
            if i >= num_retries - 1:
                raise RuntimeError(
                    f"Unable to connect to Redis at {redis_address}: {e}"
                ) from e
        # Wait a little bit.
        time.sleep(delay)
        # Make sure the retry interval doesn't increase too large.
        delay = min(1, delay * 2)
def start_ray_process(
    command: List[str],
    process_type: str,
    fate_share: bool,
    env_updates: Optional[dict] = None,
    cwd: Optional[str] = None,
    use_valgrind: bool = False,
    use_gdb: bool = False,
    use_valgrind_profiler: bool = False,
    use_perftools_profiler: bool = False,
    use_tmux: bool = False,
    stdout_file: Optional[IO[AnyStr]] = None,
    stderr_file: Optional[IO[AnyStr]] = None,
    pipe_stdin: bool = False,
):
    """Start one of the Ray processes.

    TODO(rkn): We need to figure out how these commands interact. For example,
    it may only make sense to start a process in gdb if we also start it in
    tmux. Similarly, certain combinations probably don't make sense, like
    simultaneously running the process in valgrind and the profiler.

    Each debugging flag can also be forced on through an environment variable
    ``RAY_<PROCESS_TYPE>_<FLAG>=1``, where ``<PROCESS_TYPE>`` is
    ``process_type.upper()`` and ``<FLAG>`` is one of ``VALGRIND``,
    ``VALGRIND_PROFILER``, ``PERFTOOLS_PROFILER``, ``TMUX`` or ``GDB``
    (e.g. ``RAY_RAYLET_VALGRIND=1``). These variables can only enable a flag,
    never disable one passed as an argument.

    Args:
        command: The command to use to start the Ray process.
        process_type: The type of the process that is being started
            (e.g., "raylet").
        fate_share: If true, the child will be killed if its parent (us) dies.
            True must only be passed after detection of this functionality.
        env_updates: A dictionary of additional environment variables to
            run the command with (in addition to the caller's environment
            variables).
        cwd: The directory to run the process in.
        use_valgrind: True if we should start the process in valgrind.
        use_gdb: True if we should start the process in gdb.
        use_valgrind_profiler: True if we should start the process in
            the valgrind profiler.
        use_perftools_profiler: True if we should profile the process
            using perftools.
        use_tmux: True if we should start the process in tmux.
        stdout_file: A file handle opened for writing to redirect stdout to. If
            no redirection should happen, then this should be None.
        stderr_file: A file handle opened for writing to redirect stderr to. If
            no redirection should happen, then this should be None.
        pipe_stdin: If true, subprocess.PIPE will be passed to the process as
            stdin.

    Returns:
        A ``ProcessInfo`` holding the process handle, the names of the
        redirected streams (if any) and the debugging flags that were actually
        applied (including those enabled through environment variables).

    Raises:
        ValueError: If more than one of gdb, valgrind, the valgrind profiler,
            the perftools profiler and the jemalloc memory profiler is
            enabled; if ``env_updates`` is not a dict; if gdb is requested
            without tmux; or, on Windows, if the command is too long.
        AssertionError: If ``fate_share`` is true but
            ``detect_fate_sharing_support()`` returns False.
        psutil.Error, OSError: On Windows with fate sharing, if the child
            cannot be attached to the fate-sharing job or resumed; the child
            is killed before the error propagates.
    """
    # Detect which flags are set through environment variables.
    valgrind_env_var = f"RAY_{process_type.upper()}_VALGRIND"
    if os.environ.get(valgrind_env_var) == "1":
        logger.info("Detected environment variable '%s'.", valgrind_env_var)
        use_valgrind = True
    valgrind_profiler_env_var = f"RAY_{process_type.upper()}_VALGRIND_PROFILER"
    if os.environ.get(valgrind_profiler_env_var) == "1":
        logger.info("Detected environment variable '%s'.", valgrind_profiler_env_var)
        use_valgrind_profiler = True
    perftools_profiler_env_var = f"RAY_{process_type.upper()}_PERFTOOLS_PROFILER"
    if os.environ.get(perftools_profiler_env_var) == "1":
        logger.info("Detected environment variable '%s'.", perftools_profiler_env_var)
        use_perftools_profiler = True
    tmux_env_var = f"RAY_{process_type.upper()}_TMUX"
    if os.environ.get(tmux_env_var) == "1":
        logger.info("Detected environment variable '%s'.", tmux_env_var)
        use_tmux = True
    gdb_env_var = f"RAY_{process_type.upper()}_GDB"
    if os.environ.get(gdb_env_var) == "1":
        logger.info("Detected environment variable '%s'.", gdb_env_var)
        use_gdb = True

    # Jemalloc memory profiling.
    # Only considered when the caller has not set LD_PRELOAD already.
    if os.environ.get("LD_PRELOAD") is None:
        jemalloc_lib_path = os.environ.get(RAY_JEMALLOC_LIB_PATH, JEMALLOC_SO)
        jemalloc_conf = os.environ.get(RAY_JEMALLOC_CONF, "")
        jemalloc_comps = os.environ.get(RAY_JEMALLOC_PROFILE)
        # Comma-separated list of components to profile; unset/empty -> none.
        jemalloc_comps = [] if not jemalloc_comps else jemalloc_comps.split(",")
        jemalloc_env_vars = propagate_jemalloc_env_var(
            jemalloc_path=jemalloc_lib_path,
            jemalloc_conf=jemalloc_conf,
            jemalloc_comps=jemalloc_comps,
            process_type=process_type,
        )
    else:
        jemalloc_env_vars = {}

    # Jemalloc profiling counts as enabled for this process exactly when the
    # helper produced a MALLOC_CONF setting for it.
    use_jemalloc_mem_profiler = "MALLOC_CONF" in jemalloc_env_vars

    # The debuggers/profilers wrap the command or preload libraries, so at
    # most one of them may be active.
    if (
        sum(
            [
                use_gdb,
                use_valgrind,
                use_valgrind_profiler,
                use_perftools_profiler,
                use_jemalloc_mem_profiler,
            ]
        )
        > 1
    ):
        raise ValueError(
            "At most one of the 'use_gdb', 'use_valgrind', "
            "'use_valgrind_profiler', 'use_perftools_profiler', "
            "and 'use_jemalloc_mem_profiler' flags can "
            "be used at a time."
        )
    if env_updates is None:
        env_updates = {}
    if not isinstance(env_updates, dict):
        raise ValueError("The 'env_updates' argument must be a dictionary.")

    # The child gets the caller's environment plus the requested overrides.
    modified_env = os.environ.copy()
    modified_env.update(env_updates)

    if use_gdb:
        if not use_tmux:
            raise ValueError(
                "If 'use_gdb' is true, then 'use_tmux' must be true as well."
            )

        # TODO(suquark): Any better temp file creation here?
        gdb_init_path = os.path.join(
            ray._common.utils.get_ray_temp_dir(),
            f"gdb_init_{process_type}_{time.time()}",
        )
        ray_process_path = command[0]
        ray_process_args = command[1:]
        # Each argument is wrapped in single quotes for gdb's `run` command.
        # NOTE(review): single quotes inside an argument are not escaped.
        run_args = " ".join(["'{}'".format(arg) for arg in ray_process_args])
        with open(gdb_init_path, "w") as gdb_init_file:
            gdb_init_file.write(f"run {run_args}")
        # gdb loads the binary and executes the init script, which runs it.
        command = ["gdb", ray_process_path, "-x", gdb_init_path]

    if use_valgrind:
        command = [
            "valgrind",
            "--track-origins=yes",
            "--leak-check=full",
            "--show-leak-kinds=all",
            "--leak-check-heuristics=stdstring",
            "--error-exitcode=1",
        ] + command

    if use_valgrind_profiler:
        command = ["valgrind", "--tool=callgrind"] + command

    if use_perftools_profiler:
        # Both variables must be present in the caller's environment;
        # a missing one raises KeyError here.
        modified_env["LD_PRELOAD"] = os.environ["PERFTOOLS_PATH"]
        modified_env["CPUPROFILE"] = os.environ["PERFTOOLS_LOGFILE"]

    modified_env.update(jemalloc_env_vars)

    if use_tmux:
        # The command has to be created exactly as below to ensure that it
        # works on all versions of tmux. (Tested with tmux 1.8-5, travis'
        # version, and tmux 2.1)
        # NOTE(review): arguments are joined with plain spaces, without
        # quoting, so arguments containing spaces are split by the shell.
        command = ["tmux", "new-session", "-d", f"{' '.join(command)}"]

    if fate_share:
        assert ray._private.utils.detect_fate_sharing_support(), (
            "kernel-level fate-sharing must only be specified if "
            "detect_fate_sharing_support() has returned True"
        )

    def preexec_fn():
        # Passed to the process constructor as `preexec_fn` (non-Windows only,
        # see below), so this is expected to run in the child before exec.
        import signal

        # Block SIGINT in the child (presumably so Ctrl-C aimed at the parent
        # does not kill Ray processes directly — confirm).
        signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT})
        if fate_share and sys.platform.startswith("linux"):
            ray._private.utils.set_kill_on_parent_death_linux()

    win32_fate_sharing = fate_share and sys.platform == "win32"
    # With Windows fate-sharing, we need special care:
    # The process must be added to the job before it is allowed to execute.
    # Otherwise, there's a race condition: the process might spawn children
    # before the process itself is assigned to the job.
    # After that point, its children will not be added to the job anymore.
    CREATE_SUSPENDED = 0x00000004  # from Windows headers
    if sys.platform == "win32":
        # CreateProcess, which underlies Popen, is limited to
        # 32,767 characters, including the Unicode terminating null
        # character
        # NOTE(review): the threshold below (31766) is ~1000 below that limit
        # and the sum ignores separators/quoting added when the command line
        # is assembled; confirm the margin is intentional.
        total_chrs = sum([len(x) for x in command])
        if total_chrs > 31766:
            raise ValueError(
                f"command is limited to a total of 31767 characters, "
                f"got {total_chrs}"
            )

    process = ConsolePopen(
        command,
        env=modified_env,
        cwd=cwd,
        stdout=stdout_file,
        stderr=stderr_file,
        stdin=subprocess.PIPE if pipe_stdin else None,
        preexec_fn=preexec_fn if sys.platform != "win32" else None,
        creationflags=CREATE_SUSPENDED if win32_fate_sharing else 0,
    )

    if win32_fate_sharing:
        # The child was created suspended: attach it to the fate-sharing job,
        # then let it run. On failure, kill it rather than leave it suspended.
        try:
            ray._private.utils.set_kill_child_on_death_win32(process)
            psutil.Process(process.pid).resume()
        except (psutil.Error, OSError):
            process.kill()
            raise

    def _get_stream_name(stream):
        # Prefer the file's name; fall back to str() for streams without one.
        if stream is not None:
            try:
                return stream.name
            except AttributeError:
                return str(stream)
        return None

    return ProcessInfo(
        process=process,
        stdout_file=_get_stream_name(stdout_file),
        stderr_file=_get_stream_name(stderr_file),
        use_valgrind=use_valgrind,
        use_gdb=use_gdb,
        use_valgrind_profiler=use_valgrind_profiler,
        use_perftools_profiler=use_perftools_profiler,
        use_tmux=use_tmux,
    )
def start_reaper(fate_share=None):
    """Start the reaper process.

    The reaper is a lightweight process that waits for its parent process to
    die and then terminates its own process group. This keeps Ray processes
    from being left behind, as long as the reaper itself isn't SIGKILLed.

    Args:
        fate_share: Forwarded unchanged to ``start_ray_process``.

    Returns:
        ProcessInfo for the process that was started, or None when this
        process could not be made a process group leader.
    """
    # Become a process group leader, so the reaper can clean up other Ray
    # processes without killing the process group of whoever started us.
    if sys.platform != "win32":
        try:
            os.setpgrp()
        except OSError as e:
            # EPERM while we already lead our group is harmless.
            already_leader = e.errno == errno.EPERM and os.getpgrp() == os.getpid()
            if not already_leader:
                logger.warning(
                    f"setpgrp failed, processes may not be cleaned up properly: {e}."
                )
                # Starting the reaper now could end up killing unrelated
                # user processes, so skip it.
                return None

    reaper_script = os.path.join(RAY_PATH, RAY_PRIVATE_DIR, "ray_process_reaper.py")
    return start_ray_process(
        [sys.executable, "-u", reaper_script],
        ray_constants.PROCESS_TYPE_REAPER,
        pipe_stdin=True,
        fate_share=fate_share,
    )
def start_log_monitor(
    session_dir: str,
    logs_dir: str,
    gcs_address: str,
    node_ip_address: str,
    fate_share: Optional[bool] = None,
    max_bytes: int = 0,
    backup_count: int = 0,
    stdout_filepath: Optional[str] = None,
    stderr_filepath: Optional[str] = None,
):
    """Start a log monitor process.

    Args:
        session_dir: The session directory.
        logs_dir: The directory of logging files.
        gcs_address: GCS address for pubsub.
        node_ip_address: The IP address of the node we are connected to.
        fate_share: Whether to share fate between log_monitor
            and this process.
        max_bytes: Log rotation parameter. Corresponding to
            RotatingFileHandler's maxBytes.
        backup_count: Log rotation parameter. Corresponding to
            RotatingFileHandler's backupCount.
        stdout_filepath: The file path to dump log monitor stdout.
            If None, stdout is not redirected.
        stderr_filepath: The file path to dump log monitor stderr.
            If None, stderr is not redirected.

    Returns:
        ProcessInfo for the process that was started.
    """
    log_monitor_filepath = os.path.join(RAY_PATH, RAY_PRIVATE_DIR, "log_monitor.py")
    command = [
        sys.executable,
        "-u",
        log_monitor_filepath,
        f"--session-dir={session_dir}",
        f"--logs-dir={logs_dir}",
        f"--gcs-address={gcs_address}",
        f"--node-ip-address={node_ip_address}",
        f"--logging-rotate-bytes={max_bytes}",
        f"--logging-rotate-backup-count={backup_count}",
    ]
    if stdout_filepath:
        command.append(f"--stdout-filepath={stdout_filepath}")
    if stderr_filepath:
        command.append(f"--stderr-filepath={stderr_filepath}")
    if stdout_filepath is None and stderr_filepath is None:
        # If not redirecting logging to files, unset log filename.
        # This will cause log records to go to stderr.
        command.append("--logging-filename=")
        # Use stderr log format with the component name as a message prefix.
        logging_format = ray_constants.LOGGER_FORMAT_STDERR.format(
            component=ray_constants.PROCESS_TYPE_LOG_MONITOR
        )
        command.append(f"--logging-format={logging_format}")

    # When the monitor is told to write its own output files, anything that
    # still reaches the inherited stdout/stderr is discarded via os.devnull.
    # The spawned child holds its own duplicate of each descriptor (standard
    # subprocess behavior), so the parent's handles are closed right after
    # spawning instead of being leaked.
    stdout_file = None
    stderr_file = None
    try:
        if stdout_filepath:
            stdout_file = open(os.devnull, "w")
        if stderr_filepath:
            stderr_file = open(os.devnull, "w")
        process_info = start_ray_process(
            command,
            ray_constants.PROCESS_TYPE_LOG_MONITOR,
            stdout_file=stdout_file,
            stderr_file=stderr_file,
            fate_share=fate_share,
        )
    finally:
        for stream in (stdout_file, stderr_file):
            if stream is not None:
                stream.close()
    return process_info
def start_api_server(
    include_dashboard: Optional[bool],
    raise_on_failure: bool,
    host: str,
    gcs_address: str,
    cluster_id_hex: str,
    node_ip_address: str,
    temp_dir: str,
    logdir: str,
    session_dir: str,
    port: Optional[int] = None,
    fate_share: Optional[bool] = None,
    max_bytes: int = 0,
    backup_count: int = 0,
    stdout_filepath: Optional[str] = None,
    stderr_filepath: Optional[str] = None,
):
    """Start a API server process.

    Args:
        include_dashboard: If true, this will load all dashboard-related modules
            when starting the API server, or fail. If None, it will load all
            dashboard-related modules conditioned on dependencies being present.
            Otherwise, it will only start the modules that are not relevant to
            the dashboard.
        raise_on_failure: If true, this will raise an exception
            if we fail to start the API server. Otherwise it will print
            a warning if we fail to start the API server.
        host: The host to bind the dashboard web server to.
        gcs_address: The gcs address the dashboard should connect to
        cluster_id_hex: Cluster ID in hex.
        node_ip_address: The IP address where this is running.
        temp_dir: The temporary directory used for log files and
            information for this Ray session.
        session_dir: The session directory under temp_dir.
            It is used as a identifier of individual cluster.
        logdir: The log directory used to generate dashboard log. On failure,
            the tail of ``dashboard.log`` (or ``dashboard.err``) in this
            directory is included in the raised error.
        port: The port to bind the dashboard web server to.
            Defaults to 8265. When not given, the dashboard is launched with
            ``--port-retries=50``; an explicitly given port is probed here
            first and launched with ``--port-retries=0``.
        fate_share: Whether to share fate between the dashboard and this
            process.
        max_bytes: Log rotation parameter. Corresponding to
            RotatingFileHandler's maxBytes.
        backup_count: Log rotation parameter. Corresponding to
            RotatingFileHandler's backupCount.
        stdout_filepath: The file path to dump dashboard stdout.
            If None, stdout is not redirected.
        stderr_filepath: The file path to dump dashboard stderr.
            If None, stderr is not redirected.

    Returns:
        A tuple of :
        - Dashboard URL if dashboard enabled and started; an empty string when
          only the minimal / frontend-less server was started.
        - ProcessInfo for the process that was started.
        ``(None, None)`` if startup failed and ``raise_on_failure`` is False.

    Raises:
        Exception: Any startup failure, when ``raise_on_failure`` is True.
    """
    try:
        # Make sure port is available.
        if port is None:
            port_retries = 50
            port = ray_constants.DEFAULT_DASHBOARD_PORT
        else:
            port_retries = 0
            # Probe the explicitly requested port to fail fast with a clear
            # message. The `with` block closes the probe socket on both the
            # success and the failure path.
            with socket.socket(
                socket.AF_INET6 if is_ipv6(host) else socket.AF_INET,
                socket.SOCK_STREAM,
            ) as port_test_socket:
                port_test_socket.setsockopt(
                    socket.SOL_SOCKET,
                    socket.SO_REUSEADDR,
                    1,
                )
                try:
                    port_test_socket.bind((host, port))
                except socket.error as e:
                    # 10013 on windows is a bit more broad than just
                    # "address in use": it can also indicate "permission denied".
                    # TODO: improve the error message?
                    if e.errno in {48, 98, 10013}:  # address already in use.
                        raise ValueError(
                            f"Failed to bind to {host}:{port} because it's "
                            "already occupied. You can use `ray start "
                            "--dashboard-port ...` or `ray.init(dashboard_port=..."
                            ")` to select a different port."
                        ) from e
                    raise

        # Make sure the process can start.
        dashboard_dependency_error = ray._private.utils.get_dashboard_dependency_error()

        # Explicitly check here that when the user explicitly specifies
        # dashboard inclusion, the install is not minimal.
        if include_dashboard and dashboard_dependency_error:
            logger.error(
                f"Ray dashboard dependencies failed to install properly: {dashboard_dependency_error}.\n"
                "Potential causes include:\n"
                "1. --include-dashboard is not supported when minimal ray is used. "
                "Download ray[default] to use the dashboard.\n"
                "2. Dashboard dependencies are conflicting with your python environment. "
                "Investigate your python environment and try reinstalling ray[default].\n"
            )
            raise Exception("Cannot include dashboard with missing packages.")

        include_dash: bool = True if include_dashboard is None else include_dashboard

        # Start the dashboard process.
        dashboard_dir = "dashboard"
        dashboard_filepath = os.path.join(RAY_PATH, dashboard_dir, "dashboard.py")

        command = [
            *_build_python_executable_command_memory_profileable(
                ray_constants.PROCESS_TYPE_DASHBOARD,
                session_dir,
                unbuffered=False,
            ),
            dashboard_filepath,
            f"--host={host}",
            f"--port={port}",
            f"--port-retries={port_retries}",
            f"--temp-dir={temp_dir}",
            f"--log-dir={logdir}",
            f"--session-dir={session_dir}",
            f"--logging-rotate-bytes={max_bytes}",
            f"--logging-rotate-backup-count={backup_count}",
            f"--gcs-address={gcs_address}",
            f"--cluster-id-hex={cluster_id_hex}",
            f"--node-ip-address={node_ip_address}",
        ]
        if stdout_filepath:
            command.append(f"--stdout-filepath={stdout_filepath}")
        if stderr_filepath:
            command.append(f"--stderr-filepath={stderr_filepath}")
        if stdout_filepath is None and stderr_filepath is None:
            # If not redirecting logging to files, unset log filename.
            # This will cause log records to go to stderr.
            command.append("--logging-filename=")
            # Use stderr log format with the component name as a message prefix.
            logging_format = ray_constants.LOGGER_FORMAT_STDERR.format(
                component=ray_constants.PROCESS_TYPE_DASHBOARD
            )
            command.append(f"--logging-format={logging_format}")
        if dashboard_dependency_error is not None:
            command.append("--minimal")

        if not include_dash:
            # If dashboard is not included, load modules
            # that are irrelevant to the dashboard.
            # TODO(sang): Modules like job or state APIs should be
            # loaded although dashboard is disabled. Fix it.
            command.append("--modules-to-load=UsageStatsHead")
            command.append("--disable-frontend")

        # Discard inherited stdout/stderr when the dashboard writes its own
        # files. The child keeps its own duplicate of each descriptor, so the
        # parent's handles are closed right after spawning.
        stdout_file = None
        stderr_file = None
        try:
            if stdout_filepath:
                stdout_file = open(os.devnull, "w")
            if stderr_filepath:
                stderr_file = open(os.devnull, "w")
            process_info = start_ray_process(
                command,
                ray_constants.PROCESS_TYPE_DASHBOARD,
                stdout_file=stdout_file,
                stderr_file=stderr_file,
                fate_share=fate_share,
            )
        finally:
            for stream in (stdout_file, stderr_file):
                if stream is not None:
                    stream.close()

        # Retrieve the dashboard url: the dashboard publishes it to the GCS KV
        # store once it is up. Stop early if the process has already exited.
        gcs_client = GcsClient(address=gcs_address, cluster_id=cluster_id_hex)
        ray.experimental.internal_kv._initialize_internal_kv(gcs_client)
        dashboard_url = None
        dashboard_returncode = None
        start_time_s = time.time()
        while (
            time.time() - start_time_s < ray_constants.RAY_DASHBOARD_STARTUP_TIMEOUT_S
        ):
            dashboard_url = ray.experimental.internal_kv._internal_kv_get(
                ray_constants.DASHBOARD_ADDRESS,
                namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
            )
            if dashboard_url is not None:
                dashboard_url = dashboard_url.decode("utf-8")
                break
            dashboard_returncode = process_info.process.poll()
            if dashboard_returncode is not None:
                break
            # This is often on the critical path of ray.init() and ray start,
            # so we need to poll often.
            time.sleep(0.1)

        # Dashboard couldn't be started.
        if dashboard_url is None:
            returncode_str = (
                f", return code {dashboard_returncode}"
                if dashboard_returncode is not None
                else ""
            )
            logger.error(f"Failed to start the dashboard{returncode_str}")

            def read_log(filename, lines_to_read):
                """Return up to the last `lines_to_read` lines of a log file.

                The lines come newest-first and are followed by one header
                line naming the file, so reversing the list yields the header
                and then the lines in file order.
                """
                dashboard_log = os.path.join(logdir, filename)
                lines = []
                with open(dashboard_log, "rb") as f:
                    # mmap cannot map an empty file; it simply has no lines.
                    if os.fstat(f.fileno()).st_size > 0:
                        # The log file may be large, so scan it backwards
                        # through a memory map instead of reading it whole.
                        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
                            end = mm.size()
                            # A trailing newline ends the last line; it does
                            # not start a new, empty one.
                            if mm[end - 1 : end] == b"\n":
                                end -= 1
                            while end > 0 and len(lines) < lines_to_read:
                                sep = mm.rfind(b"\n", 0, end)
                                # sep == -1 means this is the file's first
                                # line, which has no preceding newline.
                                lines.append(
                                    mm[sep + 1 : end].decode("utf-8", errors="replace")
                                )
                                end = sep
                lines.append(
                    f"The last {lines_to_read} lines of {dashboard_log} "
                    "(it contains the error message from the dashboard): "
                )
                return lines

            if logdir:
                lines_to_read = 20
                logger.error(
                    "Error should be written to 'dashboard.log' or "
                    "'dashboard.err'. We are printing the last "
                    f"{lines_to_read} lines for you. See "
                    "'https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#logging-directory-structure' "  # noqa
                    "to find where the log file is."
                )
                try:
                    lines = read_log("dashboard.log", lines_to_read=lines_to_read)
                except Exception as log_err:
                    logger.error(
                        f"Couldn't read dashboard.log file. Error: {log_err}. "
                        "It means the dashboard is broken even before it "
                        "initializes the logger (mostly dependency issues). "
                        "Reading the dashboard.err file which contains stdout/stderr."
                    )
                    # If we cannot read the .log file, we fallback to .err file.
                    # This is the case where dashboard couldn't be started at all
                    # and couldn't even initialize the logger to write logs to .log
                    # file.
                    try:
                        lines = read_log("dashboard.err", lines_to_read=lines_to_read)
                    except Exception as err_err:
                        raise Exception(
                            f"Failed to read dashboard.err file: {err_err}. "
                            "It is unexpected. Please report an issue to "
                            "Ray github. "
                            "https://github.com/ray-project/ray/issues"
                        ) from err_err

                # read_log already bounds the number of lines; slicing here
                # would drop the newest log line (usually the error itself).
                last_log_str = "\n" + "\n".join(reversed(lines))
                raise Exception(last_log_str)
            else:
                # Is it reachable?
                raise Exception("Failed to start a dashboard.")

        if dashboard_dependency_error is not None or not include_dash:
            # If it is the minimal installation, the web url (dashboard url)
            # shouldn't be configured because it doesn't start a server.
            dashboard_url = ""
        return dashboard_url, process_info
    except Exception as e:
        if raise_on_failure:
            raise
        logger.error(e)
        return None, None
def get_address(redis_address):
    """Split a Redis address into host, port and an SSL flag.

    Accepted forms are ``ip:port``, ``redis://ip:port`` and
    ``rediss://ip:port``; the ``rediss`` scheme enables SSL.

    Args:
        redis_address: The Redis address string.

    Returns:
        A ``(host, port, enable_redis_ssl)`` tuple, with host and port as
        returned by ``parse_address``.

    Raises:
        ValueError: If the scheme is not ``redis``/``rediss`` or the
            ``host:port`` part cannot be parsed.
    """
    error_message = (
        f"Invalid redis address {redis_address}. "
        "Expected format is ip:port or redis://ip:port, "
        "or rediss://ip:port for SSL."
    )
    # split with maxsplit=1 yields either [host_port] or [scheme, host_port].
    parts = redis_address.split("://", 1)
    if len(parts) == 1:
        scheme, host_port = None, parts[0]
    else:
        scheme, host_port = parts
        # rediss for SSL
        if scheme not in ("redis", "rediss"):
            raise ValueError(error_message)

    parsed = parse_address(host_port)
    if parsed is None:
        # Without this check, unpacking None raises an opaque TypeError.
        raise ValueError(error_message)
    redis_ip_address, redis_port = parsed
    enable_redis_ssl = scheme == "rediss"
    return redis_ip_address, redis_port, enable_redis_ssl
def start_gcs_server(
    redis_address: str,
    log_dir: str,
    stdout_filepath: Optional[str],
    stderr_filepath: Optional[str],
    session_name: str,
    redis_username: Optional[str] = None,
    redis_password: Optional[str] = None,
    config: Optional[dict] = None,
    fate_share: Optional[bool] = None,
    gcs_server_port: Optional[int] = None,
    metrics_agent_port: Optional[int] = None,
    node_ip_address: Optional[str] = None,
    session_dir: Optional[str] = None,
    node_id: Optional[str] = None,
):
    """Start a gcs server.

    Args:
        redis_address: The address that the Redis server is listening on.
            If empty, no Redis flags are passed to the GCS server.
        log_dir: The path of the dir where gcs log files are created.
        stdout_filepath: The file path to dump gcs server stdout.
            If None, stdout is not redirected.
        stderr_filepath: The file path to dump gcs server stderr.
            If None, stderr is not redirected.
        session_name: The current Ray session name.
        redis_username: The username of the Redis server.
        redis_password: The password of the Redis server.
        config: Optional configuration that will
            override defaults in RayConfig.
        fate_share: Whether to share fate between the gcs server and this
            process.
        gcs_server_port: Port number of the gcs server. Required despite the
            ``None`` default; must be non-negative.
        metrics_agent_port: The port where metrics agent is bound to.
        node_ip_address: IP Address of a node where gcs server starts.
        session_dir: Session directory path. Used to write the bound GCS port to a file.
        node_id: The unique ID of this node.

    Returns:
        ProcessInfo for the process that was started.

    Raises:
        AssertionError: If ``gcs_server_port`` is None or negative.
        ValueError: If ``redis_address`` is rejected by ``get_address``.
    """
    # Check None explicitly: `None >= 0` would raise a confusing TypeError.
    assert gcs_server_port is not None and gcs_server_port >= 0, (
        f"gcs_server_port must be a non-negative int, got {gcs_server_port!r}"
    )
    command = [
        GCS_SERVER_EXECUTABLE,
        f"--log_dir={log_dir}",
        f"--config_list={serialize_config(config)}",
        f"--gcs_server_port={gcs_server_port}",
        f"--metrics-agent-port={metrics_agent_port}",
        f"--node-ip-address={node_ip_address}",
        f"--session-name={session_name}",
        f"--ray-commit={ray.__commit__}",
        f"--session-dir={session_dir}",
        f"--node-id={node_id}",
    ]
    if stdout_filepath:
        command += [f"--stdout_filepath={stdout_filepath}"]
    if stderr_filepath:
        command += [f"--stderr_filepath={stderr_filepath}"]
    if redis_address:
        redis_ip_address, redis_port, enable_redis_ssl = get_address(redis_address)
        command += [
            f"--redis_address={redis_ip_address}",
            f"--redis_port={redis_port}",
            f"--redis_enable_ssl={'true' if enable_redis_ssl else 'false'}",
        ]
    if redis_username:
        command += [f"--redis_username={redis_username}"]
    if redis_password:
        # NOTE(review): the password is passed on the command line, where it
        # may be visible to other local users through the process list.
        command += [f"--redis_password={redis_password}"]

    # Discard inherited stdout/stderr when the server writes its own files.
    # The child keeps its own duplicate of each descriptor, so the parent's
    # handles are closed right after spawning instead of being leaked.
    stdout_file = None
    stderr_file = None
    try:
        if stdout_filepath:
            stdout_file = open(os.devnull, "w")
        if stderr_filepath:
            stderr_file = open(os.devnull, "w")
        process_info = start_ray_process(
            command,
            ray_constants.PROCESS_TYPE_GCS_SERVER,
            stdout_file=stdout_file,
            stderr_file=stderr_file,
            fate_share=fate_share,
        )
    finally:
        for stream in (stdout_file, stderr_file):
            if stream is not None:
                stream.close()
    return process_info
- def start_raylet(
- redis_address: str,
- gcs_address: str,
- node_id: str,
- node_ip_address: str,
- node_manager_port: int,
- raylet_name: str,
- plasma_store_name: str,
- cluster_id: str,
- worker_path: str,
- setup_worker_path: str,
- temp_dir: str,
- session_dir: str,
- resource_dir: str,
- log_dir: str,
- resource_and_label_spec,
- plasma_directory: str,
- fallback_directory: str,
- object_store_memory: int,
- session_name: str,
- is_head_node: bool,
- resource_isolation_config: ResourceIsolationConfig,
- min_worker_port: Optional[int] = None,
- max_worker_port: Optional[int] = None,
- worker_port_list: Optional[List[int]] = None,
- object_manager_port: Optional[int] = None,
- redis_username: Optional[str] = None,
- redis_password: Optional[str] = None,
- metrics_agent_port: Optional[int] = None,
- metrics_export_port: Optional[int] = None,
- dashboard_agent_listen_port: Optional[int] = None,
- runtime_env_agent_port: Optional[int] = None,
- use_valgrind: bool = False,
- use_profiler: bool = False,
- raylet_stdout_filepath: Optional[str] = None,
- raylet_stderr_filepath: Optional[str] = None,
- dashboard_agent_stdout_filepath: Optional[str] = None,
- dashboard_agent_stderr_filepath: Optional[str] = None,
- runtime_env_agent_stdout_filepath: Optional[str] = None,
- runtime_env_agent_stderr_filepath: Optional[str] = None,
- huge_pages: bool = False,
- fate_share: Optional[bool] = None,
- socket_to_use: Optional[int] = None,
- max_bytes: int = 0,
- backup_count: int = 0,
- ray_debugger_external: bool = False,
- env_updates: Optional[dict] = None,
- node_name: Optional[str] = None,
- webui: Optional[str] = None,
- ):
- """Start a raylet, which is a combined local scheduler and object manager.
- Args:
- redis_address: The address of the primary Redis server.
- gcs_address: The address of GCS server.
- node_id: The hex ID of this node.
- node_ip_address: The IP address of this node.
- node_manager_port: The port to use for the node manager. If it's
- 0, a random port will be used.
- raylet_name: The name of the raylet socket to create.
- plasma_store_name: The name of the plasma store socket to connect
- to.
- worker_path: The path of the Python file that new worker
- processes will execute.
- setup_worker_path: The path of the Python file that will set up
- the environment for the worker process.
- temp_dir: The path of the temporary directory Ray will use.
- session_dir: The path of this session.
- resource_dir: The path of resource of this session .
- log_dir: The path of the dir where log files are created.
- resource_and_label_spec: Resources and key-value labels for this raylet.
- plasma_directory: A directory where the Plasma memory mapped files will
- be created.
- fallback_directory: A directory where the Object store fallback files will be created.
- object_store_memory: The amount of memory (in bytes) to start the
- object store with.
- session_name: The current Ray session name.
- resource_isolation_config: Resource isolation configuration for reserving
- memory and cpu resources for ray system processes through cgroupv2
- is_head_node: whether this node is the head node.
- min_worker_port: The lowest port number that workers will bind
- on. If not set, random ports will be chosen.
- max_worker_port: The highest port number that workers will bind
- on. If set, min_worker_port must also be set.
- worker_port_list: An explicit list of ports to be used for
- workers (comma-separated). Overrides min_worker_port and
- max_worker_port.
- object_manager_port: The port to use for the object manager. If this is
- None, then the object manager will choose its own port.
- redis_username: The username to use when connecting to Redis.
- redis_password: The password to use when connecting to Redis.
- metrics_agent_port: The port where metrics agent is bound to.
- metrics_export_port: The port at which metrics are exposed to.
- dashboard_agent_listen_port: The port at which the dashboard agent
- listens to for HTTP.
- runtime_env_agent_port: The port at which the runtime env agent
- listens to for HTTP.
- use_valgrind: True if the raylet should be started inside
- of valgrind. If this is True, use_profiler must be False.
- use_profiler: True if the raylet should be started inside
- a profiler. If this is True, use_valgrind must be False.
- raylet_stdout_filepath: The file path to dump raylet stdout.
- If None, stdout is not redirected.
- raylet_stderr_filepath: The file path to dump raylet stderr.
- If None, stderr is not redirected.
- dashboard_agent_stdout_filepath: The file path to dump
- dashboard agent stdout. If None, stdout is not redirected.
- dashboard_agent_stderr_filepath: The file path to dump
- dashboard agent stderr. If None, stderr is not redirected.
- runtime_env_agent_stdout_filepath: The file path to dump
- runtime env agent stdout. If None, stdout is not redirected.
- runtime_env_agent_stderr_filepath: The file path to dump
- runtime env agent stderr. If None, stderr is not redirected.
- huge_pages: Boolean flag indicating whether to start the Object
- Store with hugetlbfs support. Requires plasma_directory.
- fate_share: Whether to share fate between raylet and this process.
- max_bytes: Log rotation parameter. Corresponding to
- RotatingFileHandler's maxBytes.
- backup_count: Log rotation parameter. Corresponding to
- RotatingFileHandler's backupCount.
- ray_debugger_external: True if the Ray debugger should be made
- available externally to this node.
- env_updates: Environment variable overrides.
- node_name: The name of the node.
- webui: The url of the UI.
- Returns:
- ProcessInfo for the process that was started.
- """
- assert node_manager_port is not None and type(node_manager_port) is int
- if use_valgrind and use_profiler:
- raise ValueError("Cannot use valgrind and profiler at the same time.")
- # Get the static resources and labels from the resolved ResourceAndLabelSpec
- static_resources = resource_and_label_spec.to_resource_dict()
- labels = resource_and_label_spec.labels
- # Limit the number of workers that can be started in parallel by the
- # raylet. However, make sure it is at least 1.
- num_cpus_static = static_resources.get("CPU", 0)
- maximum_startup_concurrency = max(
- 1, min(multiprocessing.cpu_count(), num_cpus_static)
- )
- # Format the resource argument in a form like 'CPU,1.0,GPU,0,Custom,3'.
- resource_argument = ",".join(
- ["{},{}".format(*kv) for kv in static_resources.items()]
- )
- has_java_command = False
- if shutil.which("java") is not None:
- has_java_command = True
- ray_java_installed = False
- try:
- jars_dir = get_ray_jars_dir()
- if os.path.exists(jars_dir):
- ray_java_installed = True
- except Exception:
- pass
- include_java = has_java_command and ray_java_installed
- if include_java is True:
- java_worker_command = build_java_worker_command(
- gcs_address,
- plasma_store_name,
- raylet_name,
- redis_username,
- redis_password,
- session_dir,
- node_ip_address,
- setup_worker_path,
- )
- else:
- java_worker_command = []
- if os.path.exists(DEFAULT_WORKER_EXECUTABLE):
- cpp_worker_command = build_cpp_worker_command(
- gcs_address,
- plasma_store_name,
- raylet_name,
- redis_username,
- redis_password,
- session_dir,
- log_dir,
- node_ip_address,
- setup_worker_path,
- )
- else:
- cpp_worker_command = []
- # Create the command that the Raylet will use to start workers.
- # TODO(architkulkarni): Pipe in setup worker args separately instead of
- # inserting them into start_worker_command and later erasing them if
- # needed.
- start_worker_command = (
- [
- sys.executable,
- setup_worker_path,
- ]
- + _site_flags() # Inherit "-S" and "-s" flags from current Python interpreter.
- + [
- worker_path,
- f"--node-ip-address={node_ip_address}",
- "--node-manager-port=RAY_NODE_MANAGER_PORT_PLACEHOLDER",
- f"--object-store-name={plasma_store_name}",
- f"--raylet-name={raylet_name}",
- f"--redis-address={redis_address}",
- f"--metrics-agent-port={metrics_agent_port}",
- f"--logging-rotate-bytes={max_bytes}",
- f"--logging-rotate-backup-count={backup_count}",
- f"--runtime-env-agent-port={runtime_env_agent_port}",
- f"--gcs-address={gcs_address}",
- f"--session-name={session_name}",
- f"--temp-dir={temp_dir}",
- f"--webui={webui}",
- f"--cluster-id={cluster_id}",
- ]
- )
- start_worker_command.append("RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER")
- if redis_username:
- start_worker_command += [f"--redis-username={redis_username}"]
- if redis_password:
- start_worker_command += [f"--redis-password={redis_password}"]
- # If the object manager port is None, then use 0 to cause the object
- # manager to choose its own port.
- if object_manager_port is None:
- object_manager_port = 0
- if min_worker_port is None:
- min_worker_port = 0
- if max_worker_port is None:
- max_worker_port = 0
- labels_json_str = ""
- if labels:
- labels_json_str = json.dumps(labels)
- dashboard_agent_command = [
- *_build_python_executable_command_memory_profileable(
- ray_constants.PROCESS_TYPE_DASHBOARD_AGENT, session_dir
- ),
- os.path.join(RAY_PATH, "dashboard", "agent.py"),
- f"--node-id={node_id}",
- f"--node-ip-address={node_ip_address}",
- f"--metrics-export-port={metrics_export_port}",
- f"--grpc-port={metrics_agent_port}",
- f"--listen-port={dashboard_agent_listen_port}",
- "--node-manager-port=RAY_NODE_MANAGER_PORT_PLACEHOLDER",
- f"--object-store-name={plasma_store_name}",
- f"--raylet-name={raylet_name}",
- f"--temp-dir={temp_dir}",
- f"--session-dir={session_dir}",
- f"--log-dir={log_dir}",
- f"--logging-rotate-bytes={max_bytes}",
- f"--logging-rotate-backup-count={backup_count}",
- f"--session-name={session_name}",
- f"--gcs-address={gcs_address}",
- f"--cluster-id-hex={cluster_id}",
- ]
- if dashboard_agent_stdout_filepath:
- dashboard_agent_command.append(
- f"--stdout-filepath={dashboard_agent_stdout_filepath}"
- )
- if dashboard_agent_stderr_filepath:
- dashboard_agent_command.append(
- f"--stderr-filepath={dashboard_agent_stderr_filepath}"
- )
- if (
- dashboard_agent_stdout_filepath is None
- and dashboard_agent_stderr_filepath is None
- ):
- # If not redirecting logging to files, unset log filename.
- # This will cause log records to go to stderr.
- dashboard_agent_command.append("--logging-filename=")
- # Use stderr log format with the component name as a message prefix.
- logging_format = ray_constants.LOGGER_FORMAT_STDERR.format(
- component=ray_constants.PROCESS_TYPE_DASHBOARD_AGENT
- )
- dashboard_agent_command.append(f"--logging-format={logging_format}")
- if ray._private.utils.get_dashboard_dependency_error() is not None:
- # If dependencies are not installed, it is the minimally packaged
- # ray. We should restrict the features within dashboard agent
- # that requires additional dependencies to be downloaded.
- dashboard_agent_command.append("--minimal")
- if is_head_node:
- dashboard_agent_command.append("--head")
- runtime_env_agent_command = [
- *_build_python_executable_command_memory_profileable(
- ray_constants.PROCESS_TYPE_RUNTIME_ENV_AGENT, session_dir
- ),
- os.path.join(RAY_PATH, "_private", "runtime_env", "agent", "main.py"),
- f"--node-id={node_id}",
- f"--node-ip-address={node_ip_address}",
- f"--runtime-env-agent-port={runtime_env_agent_port}",
- f"--session-dir={session_dir}",
- f"--gcs-address={gcs_address}",
- f"--cluster-id-hex={cluster_id}",
- f"--runtime-env-dir={resource_dir}",
- f"--logging-rotate-bytes={max_bytes}",
- f"--logging-rotate-backup-count={backup_count}",
- f"--log-dir={log_dir}",
- f"--temp-dir={temp_dir}",
- ]
- if runtime_env_agent_stdout_filepath:
- runtime_env_agent_command.append(
- f"--stdout-filepath={runtime_env_agent_stdout_filepath}"
- )
- if runtime_env_agent_stderr_filepath:
- runtime_env_agent_command.append(
- f"--stderr-filepath={runtime_env_agent_stderr_filepath}"
- )
- if (
- runtime_env_agent_stdout_filepath is None
- and runtime_env_agent_stderr_filepath is None
- ):
- # If not redirecting logging to files, unset log filename.
- # This will cause log records to go to stderr.
- runtime_env_agent_command.append("--logging-filename=")
- # Use stderr log format with the component name as a message prefix.
- logging_format = ray_constants.LOGGER_FORMAT_STDERR.format(
- component=ray_constants.PROCESS_TYPE_RUNTIME_ENV_AGENT
- )
- runtime_env_agent_command.append(f"--logging-format={logging_format}")
- command = [
- RAYLET_EXECUTABLE,
- f"--raylet_socket_name={raylet_name}",
- f"--store_socket_name={plasma_store_name}",
- f"--object_manager_port={object_manager_port}",
- f"--min_worker_port={min_worker_port}",
- f"--max_worker_port={max_worker_port}",
- f"--node_manager_port={node_manager_port}",
- f"--node_id={node_id}",
- f"--node_ip_address={node_ip_address}",
- f"--maximum_startup_concurrency={maximum_startup_concurrency}",
- f"--static_resource_list={resource_argument}",
- f"--python_worker_command={subprocess.list2cmdline(start_worker_command)}", # noqa
- f"--java_worker_command={subprocess.list2cmdline(java_worker_command)}", # noqa
- f"--cpp_worker_command={subprocess.list2cmdline(cpp_worker_command)}", # noqa
- f"--native_library_path={DEFAULT_NATIVE_LIBRARY_PATH}",
- f"--temp_dir={temp_dir}",
- f"--session_dir={session_dir}",
- f"--log_dir={log_dir}",
- f"--resource_dir={resource_dir}",
- f"--metrics-agent-port={metrics_agent_port}",
- f"--metrics_export_port={metrics_export_port}",
- f"--runtime_env_agent_port={runtime_env_agent_port}",
- f"--object_store_memory={object_store_memory}",
- f"--plasma_directory={plasma_directory}",
- f"--fallback_directory={fallback_directory}",
- f"--ray-debugger-external={1 if ray_debugger_external else 0}",
- f"--gcs-address={gcs_address}",
- f"--session-name={session_name}",
- f"--labels={labels_json_str}",
- f"--cluster-id={cluster_id}",
- ]
- if resource_isolation_config.is_enabled():
- logging.info(
- f"Resource isolation enabled with cgroup_path={resource_isolation_config.cgroup_path}, "
- f"system_reserved_cpu={resource_isolation_config.system_reserved_cpu_weight} "
- f"system_reserved_memory={resource_isolation_config.system_reserved_memory}."
- )
- command.append("--enable-resource-isolation")
- command.append(f"--cgroup-path={resource_isolation_config.cgroup_path}")
- command.append(
- f"--system-reserved-cpu-weight={resource_isolation_config.system_reserved_cpu_weight}"
- )
- command.append(
- f"--system-reserved-memory-bytes={resource_isolation_config.system_reserved_memory}"
- )
- command.append(f"--system-pids={resource_isolation_config.system_pids}")
- if raylet_stdout_filepath:
- command.append(f"--stdout_filepath={raylet_stdout_filepath}")
- if raylet_stderr_filepath:
- command.append(f"--stderr_filepath={raylet_stderr_filepath}")
- if is_head_node:
- command.append("--head")
- if worker_port_list is not None:
- command.append(f"--worker_port_list={worker_port_list}")
- command.append(
- "--num_prestart_python_workers={}".format(int(resource_and_label_spec.num_cpus))
- )
- command.append(
- "--dashboard_agent_command={}".format(
- subprocess.list2cmdline(dashboard_agent_command)
- )
- )
- command.append(
- "--runtime_env_agent_command={}".format(
- subprocess.list2cmdline(runtime_env_agent_command)
- )
- )
- if huge_pages:
- command.append("--huge_pages")
- if socket_to_use:
- socket_to_use.close()
- if node_name is not None:
- command.append(
- f"--node-name={node_name}",
- )
- stdout_file = None
- if raylet_stdout_filepath:
- stdout_file = open(os.devnull, "w")
- stderr_file = None
- if raylet_stderr_filepath:
- stderr_file = open(os.devnull, "w")
- process_info = start_ray_process(
- command,
- ray_constants.PROCESS_TYPE_RAYLET,
- use_valgrind=use_valgrind,
- use_gdb=False,
- use_valgrind_profiler=use_profiler,
- use_perftools_profiler=("RAYLET_PERFTOOLS_PATH" in os.environ),
- stdout_file=stdout_file,
- stderr_file=stderr_file,
- fate_share=fate_share,
- env_updates=env_updates,
- )
- return process_info
def get_ray_jars_dir():
    """Return the absolute path of the directory containing Ray's jars.

    The directory is ``<RAY_PATH>/jars``, where Ray-related jars and their
    dependencies are expected to live.

    Returns:
        The absolute path of the jars directory.

    Raises:
        RuntimeError: If that directory does not exist, i.e. Ray was not
            packaged with Java support.
    """
    jars_path = os.path.abspath(os.path.join(RAY_PATH, "jars"))
    if os.path.exists(jars_path):
        return jars_path
    raise RuntimeError(
        "Ray jars is not packaged into ray. "
        "Please build ray with java enabled "
        "(set env var RAY_INSTALL_JAVA=1)"
    )
def build_java_worker_command(
    bootstrap_address: str,
    plasma_store_name: str,
    raylet_name: str,
    redis_username: str,
    redis_password: str,
    session_dir: str,
    node_ip_address: str,
    setup_worker_path: str,
):
    """Assemble the argument list used to start a Java worker.

    The worker is launched through the Python setup-worker script. Settings
    are passed as ``-Dkey=value`` arguments, followed by the dynamic-option
    placeholder and the Java main class.

    Args:
        bootstrap_address: Bootstrap address of the Ray cluster. Omitted if None.
        plasma_store_name: The name of the plasma store socket to connect
            to. Omitted if None.
        raylet_name: The name of the raylet socket. Omitted if None.
        redis_username: The username to connect to Redis. Omitted if None.
        redis_password: The password to connect to Redis. Omitted if None.
        session_dir: The path of this session. Logs go to ``<session_dir>/logs``.
        node_ip_address: The IP address for this node. Omitted if None.
        setup_worker_path: The path of the Python file that will set up
            the environment for the worker process.

    Returns:
        The command as a list of arguments (not a single string).
    """
    # Ordered candidates; any entry whose value is None is dropped. The
    # node-manager-port placeholder is a literal, so it is always kept.
    candidate_properties = [
        ("ray.address", bootstrap_address),
        ("ray.raylet.node-manager-port", "RAY_NODE_MANAGER_PORT_PLACEHOLDER"),
        ("ray.object-store.socket-name", plasma_store_name),
        ("ray.raylet.socket-name", raylet_name),
        ("ray.redis.username", redis_username),
        ("ray.redis.password", redis_password),
        ("ray.node-ip", node_ip_address),
    ]
    properties = [(key, value) for key, value in candidate_properties if value is not None]
    # These are always emitted, in this order, after the optional ones.
    properties += [
        ("ray.home", RAY_HOME),
        ("ray.logging.dir", os.path.join(session_dir, "logs")),
        ("ray.session-dir", session_dir),
    ]

    jvm_flags = [f"-D{key}={value}" for key, value in properties]
    return [
        sys.executable,
        setup_worker_path,
        *jvm_flags,
        "RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER",
        "io.ray.runtime.runner.worker.DefaultWorker",
    ]
def build_cpp_worker_command(
    bootstrap_address: str,
    plasma_store_name: str,
    raylet_name: str,
    redis_username: Optional[str],
    redis_password: Optional[str],
    session_dir: str,
    log_dir: str,
    node_ip_address: str,
    setup_worker_path: str,
):
    """Assemble the argument list used to start a C++ worker.

    The worker executable (DEFAULT_WORKER_EXECUTABLE) is launched through the
    Python setup-worker script and receives its settings as ``--ray_*`` flags.

    Args:
        bootstrap_address: The bootstrap address of the cluster.
        plasma_store_name: The name of the plasma store socket to connect
            to.
        raylet_name: The name of the raylet socket.
        redis_username: The username to connect to Redis. If None, the
            ``--ray_redis_username`` flag is omitted.
        redis_password: The password to connect to Redis. If None, the
            ``--ray_redis_password`` flag is omitted.
        session_dir: The path of this session.
        log_dir: The path of logs.
        node_ip_address: The ip address for this node.
        setup_worker_path: The path of the Python file that will set up
            the environment for the worker process.

    Returns:
        The command as a list of arguments (not a single string).
    """
    command = [
        sys.executable,
        setup_worker_path,
        DEFAULT_WORKER_EXECUTABLE,
        f"--ray_plasma_store_socket_name={plasma_store_name}",
        f"--ray_raylet_socket_name={raylet_name}",
        "--ray_node_manager_port=RAY_NODE_MANAGER_PORT_PLACEHOLDER",
        f"--ray_address={bootstrap_address}",
    ]
    # Skip credentials that were not provided (as build_java_worker_command
    # does). Formatting None would otherwise send the literal string "None"
    # as the Redis username/password.
    if redis_username is not None:
        command.append(f"--ray_redis_username={redis_username}")
    if redis_password is not None:
        command.append(f"--ray_redis_password={redis_password}")
    command += [
        f"--ray_session_dir={session_dir}",
        f"--ray_logs_dir={log_dir}",
        f"--ray_node_ip_address={node_ip_address}",
        "RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER",
    ]
    return command
def determine_plasma_store_config(
    object_store_memory: int,
    temp_dir: str,
    plasma_directory: Optional[str] = None,
    fallback_directory: Optional[str] = None,
    huge_pages: bool = False,
):
    """Figure out how to configure the plasma object store.

    This will determine:

    1. Which directory to use for the plasma store. On Linux, we try to use
       /dev/shm unless the shared memory file system is too small, in which
       case we fall back to the user temp directory (or raise, see below).
       On other platforms the user temp directory is used. A user-specified
       plasma directory is preserved (made absolute).
    2. Which directory to use for the fallback files. It defaults to
       ``temp_dir`` if it is not extracted from the object_spilling_config.

    Args:
        object_store_memory: The object store memory to use, in bytes.
            Non-int values are converted with ``int()``.
        temp_dir: The fallback directory used when ``fallback_directory`` is
            None.
        plasma_directory: The user-specified plasma directory parameter.
        fallback_directory: The path extracted from the object_spilling_config
            when the object spilling config is set and the spilling type is
            filesystem.
        huge_pages: The user-specified huge pages parameter. Only supported on
            Linux, and requires ``plasma_directory`` to be provided.

    Returns:
        A tuple ``(plasma_directory, fallback_directory, object_store_memory)``.

    Raises:
        ValueError: If huge_pages is requested off Linux or without a
            plasma_directory; if /dev/shm is too small for the requested size
            (and RAY_OBJECT_STORE_ALLOW_SLOW_STORAGE is unset and the size
            exceeds REQUIRE_SHM_SIZE_THRESHOLD); if the requested size exceeds
            system memory (only checked when plasma_directory is defaulted);
            if the plasma or fallback directory is not a directory; if the size
            is below OBJECT_STORE_MINIMUM_MEMORY_BYTES; or if the size exceeds
            the macOS limit without RAY_ENABLE_MAC_LARGE_OBJECT_STORE=1.
    """
    if not isinstance(object_store_memory, int):
        object_store_memory = int(object_store_memory)

    is_linux = sys.platform == "linux" or sys.platform == "linux2"
    if huge_pages and not is_linux:
        raise ValueError("The huge_pages argument is only supported on Linux.")
    # Must be checked against the *caller-supplied* value: below,
    # plasma_directory is always replaced by a default, after which a None
    # check could never fire.
    if huge_pages and plasma_directory is None:
        raise ValueError(
            "If huge_pages is True, then the "
            "plasma_directory argument must be provided."
        )

    system_memory = ray._common.utils.get_system_memory()

    # Determine which directory to use. By default, use the user temp dir on
    # MacOS and /dev/shm on Linux, unless the shared-memory file system is too
    # small, in which case we default to the user temp dir on Linux.
    if plasma_directory is None:
        if is_linux:
            shm_avail = ray._private.utils.get_shared_memory_bytes()
            # Compare the requested memory size to the memory available in
            # /dev/shm.
            if shm_avail >= object_store_memory:
                plasma_directory = "/dev/shm"
            elif (
                not os.environ.get("RAY_OBJECT_STORE_ALLOW_SLOW_STORAGE")
                and object_store_memory > ray_constants.REQUIRE_SHM_SIZE_THRESHOLD
            ):
                raise ValueError(
                    "The configured object store size ({} GB) exceeds "
                    "/dev/shm size ({} GB). This will harm performance. "
                    "Consider deleting files in /dev/shm or increasing its "
                    "size with "
                    "--shm-size in Docker. To ignore this warning, "
                    "set RAY_OBJECT_STORE_ALLOW_SLOW_STORAGE=1.".format(
                        object_store_memory / 1e9, shm_avail / 1e9
                    )
                )
            else:
                plasma_directory = ray._common.utils.get_user_temp_dir()
                logger.warning(
                    "WARNING: The object store is using {} instead of "
                    "/dev/shm because /dev/shm has only {} bytes available. "
                    "This will harm performance! You may be able to free up "
                    "space by deleting files in /dev/shm. If you are inside a "
                    "Docker container, you can increase /dev/shm size by "
                    "passing '--shm-size={:.2f}gb' to 'docker run' (or add it "
                    "to the run_options list in a Ray cluster config). Make "
                    "sure to set this to more than 30% of available RAM.".format(
                        plasma_directory,
                        shm_avail,
                        object_store_memory * (1.1) / (2**30),
                    )
                )
        else:
            plasma_directory = ray._common.utils.get_user_temp_dir()

        # Do some sanity checks.
        if object_store_memory > system_memory:
            raise ValueError(
                "The requested object store memory size is greater "
                "than the total available memory."
            )
    else:
        plasma_directory = os.path.abspath(plasma_directory)
        logger.info("object_store_memory is not verified when plasma_directory is set.")

    if not os.path.isdir(plasma_directory):
        raise ValueError(
            f"The plasma directory file {plasma_directory} does not exist or is not a directory."
        )

    if object_store_memory < ray_constants.OBJECT_STORE_MINIMUM_MEMORY_BYTES:
        raise ValueError(
            "Attempting to cap object store memory usage at {} "
            "bytes, but the minimum allowed is {} bytes.".format(
                object_store_memory, ray_constants.OBJECT_STORE_MINIMUM_MEMORY_BYTES
            )
        )

    if (
        sys.platform == "darwin"
        and object_store_memory > ray_constants.MAC_DEGRADED_PERF_MMAP_SIZE_LIMIT
        and os.environ.get("RAY_ENABLE_MAC_LARGE_OBJECT_STORE") != "1"
    ):
        # All three sizes are divided by 2**30, so they are reported in GiB.
        raise ValueError(
            "The configured object store size ({:.4}GiB) exceeds "
            "the optimal size on Mac ({:.4}GiB). "
            "This will harm performance! There is a known issue where "
            "Ray's performance degrades with object store size greater"
            " than {:.4}GiB on a Mac. "
            "To reduce the object store capacity, specify "
            "`object_store_memory` when calling ray.init() or ray start. "
            "To ignore this warning, "
            "set RAY_ENABLE_MAC_LARGE_OBJECT_STORE=1.".format(
                object_store_memory / 2**30,
                ray_constants.MAC_DEGRADED_PERF_MMAP_SIZE_LIMIT / 2**30,
                ray_constants.MAC_DEGRADED_PERF_MMAP_SIZE_LIMIT / 2**30,
            )
        )

    if fallback_directory is None:
        fallback_directory = temp_dir
    else:
        fallback_directory = os.path.abspath(fallback_directory)
        if not os.path.isdir(fallback_directory):
            raise ValueError(
                f"The fallback directory file {fallback_directory} does not exist or is not a directory."
            )

    # Print the object store memory using two decimal places.
    logger.debug(
        "Determine to start the Plasma object store with {} GB memory "
        "using {} and fallback to {}".format(
            round(object_store_memory / 10**9, 2),
            plasma_directory,
            fallback_directory,
        )
    )

    return plasma_directory, fallback_directory, object_store_memory
def start_monitor(
    gcs_address: str,
    logs_dir: str,
    stdout_filepath: Optional[str] = None,
    stderr_filepath: Optional[str] = None,
    autoscaling_config: Optional[str] = None,
    fate_share: Optional[bool] = None,
    max_bytes: int = 0,
    backup_count: int = 0,
    monitor_ip: Optional[str] = None,
    autoscaler_v2: bool = False,
):
    """Run a process to monitor the other processes.

    Launches ``monitor.py`` from the v2 autoscaler directory when
    ``autoscaler_v2`` is set, otherwise from the private autoscaler directory.

    Args:
        gcs_address: The address of GCS server. Must not be None.
        logs_dir: The path to the log directory.
        stdout_filepath: The file path to dump monitor stdout.
            If None, stdout is not redirected.
        stderr_filepath: The file path to dump monitor stderr.
            If None, stderr is not redirected.
        autoscaling_config: path to autoscaling config file.
        fate_share: Forwarded unchanged to ``start_ray_process``.
        max_bytes: Log rotation parameter. Corresponding to
            RotatingFileHandler's maxBytes.
        backup_count: Log rotation parameter. Corresponding to
            RotatingFileHandler's backupCount.
        monitor_ip: IP address of the machine that the monitor will be
            run on. Can be excluded, but required for autoscaler metrics.
        autoscaler_v2: Whether to launch the v2 autoscaler monitor.

    Returns:
        ProcessInfo for the process that was started.
    """
    monitor_dir = AUTOSCALER_V2_DIR if autoscaler_v2 else AUTOSCALER_PRIVATE_DIR
    entrypoint = os.path.join(RAY_PATH, monitor_dir, "monitor.py")

    assert gcs_address is not None
    command = [
        sys.executable,
        "-u",
        entrypoint,
        f"--logs-dir={logs_dir}",
        f"--logging-rotate-bytes={max_bytes}",
        f"--logging-rotate-backup-count={backup_count}",
        f"--gcs-address={gcs_address}",
    ]

    if stdout_filepath:
        command.append(f"--stdout-filepath={stdout_filepath}")
    if stderr_filepath:
        command.append(f"--stderr-filepath={stderr_filepath}")

    if stdout_filepath is None and stderr_filepath is None:
        # Without file redirection, clear the log filename so records go to
        # stderr, and prefix each record with the component name.
        stderr_format = ray_constants.LOGGER_FORMAT_STDERR.format(
            component=ray_constants.PROCESS_TYPE_MONITOR
        )
        command += ["--logging-filename=", "--logging-format=" + stderr_format]

    if autoscaling_config:
        command.append("--autoscaling-config=" + str(autoscaling_config))
    if monitor_ip:
        command.append("--monitor-ip=" + monitor_ip)

    # When a file path is given, the child process is told about it through
    # the flags above; its inherited stream is pointed at os.devnull.
    stdout_file = open(os.devnull, "w") if stdout_filepath else None
    stderr_file = open(os.devnull, "w") if stderr_filepath else None

    return start_ray_process(
        command,
        ray_constants.PROCESS_TYPE_MONITOR,
        stdout_file=stdout_file,
        stderr_file=stderr_file,
        fate_share=fate_share,
    )
def start_ray_client_server(
    address: str,
    ray_client_server_ip: str,
    ray_client_server_port: int,
    stdout_file: Optional[int] = None,
    stderr_file: Optional[int] = None,
    redis_username: Optional[str] = None,
    redis_password: Optional[str] = None,
    fate_share: Optional[bool] = None,
    runtime_env_agent_address: Optional[str] = None,
    node_id: Optional[str] = None,
    server_type: str = "proxy",
    serialized_runtime_env_context: Optional[str] = None,
):
    """Run the server process of the Ray client.

    Runs ``python <setup_worker> -m ray.util.client.server`` with the given
    options.

    Args:
        address: The address of the cluster.
        ray_client_server_ip: Host IP the Ray client server listens on.
        ray_client_server_port: Port the Ray client server listens on.
        stdout_file: Where to redirect stdout, passed unchanged to
            ``start_ray_process``. If no redirection should happen, this
            should be None.
        stderr_file: Where to redirect stderr, passed unchanged to
            ``start_ray_process``. If no redirection should happen, this
            should be None.
        redis_username: The username of the Redis server. Omitted if falsy.
        redis_password: The password of the Redis server. Omitted if falsy.
        fate_share: Forwarded unchanged to ``start_ray_process``.
        runtime_env_agent_address: Address the Runtime Env Agent listens on
            via HTTP. Required (non-empty) when server_type == "proxy".
        node_id: The hex ID of this node.
        server_type: Whether to start the proxy version of Ray Client.
        serialized_runtime_env_context (str|None): If specified, the serialized
            runtime_env_context to start the client server in.

    Returns:
        ProcessInfo for the process that was started.

    Raises:
        AssertionError: If server_type == "proxy" and
            runtime_env_agent_address is None or empty (unless assertions are
            disabled with -O).
    """
    if server_type == "proxy":
        # A truthiness check covers both None and "". The previous
        # len(runtime_env_agent_address) raised an opaque TypeError for the
        # default value None.
        assert runtime_env_agent_address, (
            "runtime_env_agent_address must be a non-empty string when "
            "server_type is 'proxy'."
        )

    root_ray_dir = Path(__file__).resolve().parents[1]
    setup_worker_path = os.path.join(
        root_ray_dir, "_private", "workers", ray_constants.SETUP_WORKER_FILENAME
    )

    command = [
        sys.executable,
        setup_worker_path,
        "-m",
        "ray.util.client.server",
        f"--address={address}",
        f"--host={ray_client_server_ip}",
        f"--port={ray_client_server_port}",
        f"--mode={server_type}",
        f"--language={Language.Name(Language.PYTHON)}",
    ]
    if redis_username:
        command.append(f"--redis-username={redis_username}")
    if redis_password:
        command.append(f"--redis-password={redis_password}")
    if serialized_runtime_env_context:
        command.append(
            f"--serialized-runtime-env-context={serialized_runtime_env_context}"  # noqa: E501
        )
    if runtime_env_agent_address:
        command.append(f"--runtime-env-agent-address={runtime_env_agent_address}")
    if node_id:
        command.append(f"--node-id={node_id}")

    process_info = start_ray_process(
        command,
        ray_constants.PROCESS_TYPE_RAY_CLIENT_SERVER,
        stdout_file=stdout_file,
        stderr_file=stderr_file,
        fate_share=fate_share,
    )
    return process_info
def _is_raylet_process(cmdline: Optional[List[str]]) -> bool:
    """Return whether ``cmdline`` belongs to a raylet process.

    Only the executable is inspected: the basename of the first argument must
    contain the substring ``"raylet"``.

    Args:
        cmdline: The process's command-line arguments, or None.

    Returns:
        bool: False for None or an empty list; otherwise whether the
        executable's basename contains "raylet".
    """
    if not cmdline:
        return False
    program = os.path.basename(cmdline[0])
    return "raylet" in program
|