services.py 93 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433
  1. import base64
  2. import collections
  3. import errno
  4. import io
  5. import json
  6. import logging
  7. import mmap
  8. import multiprocessing
  9. import os
  10. import shutil
  11. import signal
  12. import socket
  13. import subprocess
  14. import sys
  15. import time
  16. from pathlib import Path
  17. from typing import IO, AnyStr, List, Optional
  18. # Ray modules
  19. import ray
  20. import ray._private.ray_constants as ray_constants
  21. from ray._common.network_utils import (
  22. build_address,
  23. get_localhost_ip,
  24. is_ipv6,
  25. node_ip_address_from_perspective,
  26. parse_address,
  27. )
  28. from ray._private.resource_isolation_config import ResourceIsolationConfig
  29. from ray._raylet import GcsClient, GcsClientOptions, NodeID
  30. from ray.core.generated.common_pb2 import Language
  31. from ray.core.generated.gcs_pb2 import GcsNodeInfo
  32. from ray.core.generated.gcs_service_pb2 import GetAllNodeInfoRequest
  33. # Import psutil after ray so the packaged version is used.
  34. import psutil
  35. resource = None
  36. if sys.platform != "win32":
  37. _timeout = 30
  38. else:
  39. _timeout = 60
  40. EXE_SUFFIX = ".exe" if sys.platform == "win32" else ""
  41. # True if processes are run in the valgrind profiler.
  42. RUN_RAYLET_PROFILER = False
  43. # Location of the redis server.
  44. RAY_HOME = os.path.join(os.path.dirname(os.path.dirname(__file__)), "..", "..")
  45. RAY_PATH = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
  46. RAY_PRIVATE_DIR = "_private"
  47. AUTOSCALER_PRIVATE_DIR = os.path.join("autoscaler", "_private")
  48. AUTOSCALER_V2_DIR = os.path.join("autoscaler", "v2")
  49. # Location of the raylet executables.
  50. RAYLET_EXECUTABLE = os.path.join(
  51. RAY_PATH, "core", "src", "ray", "raylet", "raylet" + EXE_SUFFIX
  52. )
  53. GCS_SERVER_EXECUTABLE = os.path.join(
  54. RAY_PATH, "core", "src", "ray", "gcs", "gcs_server" + EXE_SUFFIX
  55. )
  56. JEMALLOC_SO = os.path.join(RAY_PATH, "core", "libjemalloc.so")
  57. JEMALLOC_SO = JEMALLOC_SO if os.path.exists(JEMALLOC_SO) else None
  58. # Location of the cpp default worker executables.
  59. DEFAULT_WORKER_EXECUTABLE = os.path.join(RAY_PATH, "cpp", "default_worker" + EXE_SUFFIX)
  60. # Location of the native libraries.
  61. DEFAULT_NATIVE_LIBRARY_PATH = os.path.join(RAY_PATH, "cpp", "lib")
  62. DASHBOARD_DEPENDENCY_ERROR_MESSAGE = (
  63. "Not all Ray Dashboard dependencies were "
  64. "found. To use the dashboard please "
  65. "install Ray using `pip install "
  66. "ray[default]`."
  67. )
  68. RAY_JEMALLOC_LIB_PATH = "RAY_JEMALLOC_LIB_PATH"
  69. RAY_JEMALLOC_CONF = "RAY_JEMALLOC_CONF"
  70. RAY_JEMALLOC_PROFILE = "RAY_JEMALLOC_PROFILE"
  71. # Comma separated name of components that will run memory profiler.
  72. # Ray uses `memray` to memory profile internal components.
  73. # The name of the component must be one of ray_constants.PROCESS_TYPE*.
  74. RAY_MEMRAY_PROFILE_COMPONENT_ENV = "RAY_INTERNAL_MEM_PROFILE_COMPONENTS"
  75. # Options to specify for `memray run` command. See
  76. # `memray run --help` for more details.
  77. # Example:
  78. # RAY_INTERNAL_MEM_PROFILE_OPTIONS="--live,--live-port,3456,-q,"
  79. # -> `memray run --live --live-port 3456 -q`
  80. RAY_MEMRAY_PROFILE_OPTIONS_ENV = "RAY_INTERNAL_MEM_PROFILE_OPTIONS"
  81. # Logger for this module. It should be configured at the entry point
  82. # into the program using Ray. Ray provides a default configuration at
  83. # entry/init points.
  84. logger = logging.getLogger(__name__)
  # Record describing a launched process, its log files, and the debugging /
  # profiling wrappers it was started under.
  ProcessInfo = collections.namedtuple(
      "ProcessInfo",
      "process stdout_file stderr_file "
      "use_valgrind use_gdb use_valgrind_profiler use_perftools_profiler use_tmux",
  )
  def _site_flags() -> List[str]:
      """Return the site-package related flags active for this interpreter.

      The result contains ``-S`` when importing ``site`` is disabled and
      ``-s`` when the user site directory is disabled, in that order.
      Forwarding these to Python workers helps Ray run in hermetic build
      environments.
      """
      # The sys.flags reads go through module-level helpers so unit tests can
      # patch them; they are looked up at call time.
      checks = ((_no_site, "-S"), (_no_user_site, "-s"))
      return [flag for is_enabled, flag in checks if is_enabled()]
  110. # sys.flags hidden behind helper methods for unit testing.
  111. def _no_site():
  112. return sys.flags.no_site
  113. # sys.flags hidden behind helper methods for unit testing.
  114. def _no_user_site():
  115. return sys.flags.no_user_site
  def _build_python_executable_command_memory_profileable(
      component: str, session_dir: str, unbuffered: bool = True
  ):
      """Build the Python executable command, optionally wrapped in memray.

      Memory profiling is enabled for ``component`` when it is listed in the
      comma-separated env var named by ``RAY_MEMRAY_PROFILE_COMPONENT_ENV``.
      Extra ``memray run`` arguments are read from the comma-separated env var
      named by ``RAY_MEMRAY_PROFILE_OPTIONS_ENV``. Empty entries in either list
      are ignored, and component names are stripped of surrounding whitespace.

      Args:
          component: Name of the component. It must be one of
              ray_constants.PROCESS_TYPE*.
          session_dir: The directory of the Ray session. When profiling,
              ``<session_dir>/profile`` is created (its parent must exist)
              and holds the default output file.
          unbuffered: If true, Python is started with the unbuffered option
              (``-u``), so logs are flushed immediately (good when there's a
              failure), at the cost of slower writes to log files.

      Returns:
          A list starting with ``sys.executable`` (plus ``-u`` if
          ``unbuffered``), followed by ``-m memray run <options>`` when
          ``component`` is profiled.

      Raises:
          ImportError: If any component is configured for profiling but
              memray is not installed.
      """
      command = [sys.executable]
      if unbuffered:
          command.append("-u")

      # Tolerate spaces and stray commas, e.g. "raylet, gcs_server,".
      components_to_memory_profile = {
          name.strip()
          for name in os.getenv(RAY_MEMRAY_PROFILE_COMPONENT_ENV, "").split(",")
          if name.strip()
      }
      if not components_to_memory_profile:
          return command

      try:
          import memray  # noqa: F401
      except ImportError as e:
          raise ImportError(
              "Memray is required to memory profiler on components "
              f"{components_to_memory_profile}. Run `pip install memray`."
          ) from e

      if component not in components_to_memory_profile:
          return command

      session_path = Path(session_dir)
      profile_dir = session_path / "profile"
      profile_dir.mkdir(exist_ok=True)
      output_file_path = profile_dir / f"{session_path.name}_memory_{component}.bin"

      raw_options = os.getenv(RAY_MEMRAY_PROFILE_OPTIONS_ENV, None)
      # Drop empty entries so a trailing comma does not become an empty-string
      # argument to `memray run`.
      options = [opt for opt in raw_options.split(",") if opt] if raw_options else []
      # memray records either to an output file or to a live session
      # (`--live` / `--live-remote`); these choices cannot be combined, so only
      # prepend the default output path when the user picked none of them.
      has_output_choice = any(
          opt in ("--live", "--live-remote", "-o", "--output")
          or opt.startswith("--output=")
          for opt in options
      )
      if not has_output_choice:
          options[0:0] = ["-o", str(output_file_path)]
      command.extend(["-m", "memray", "run", *options])
      return command
  def _get_gcs_client_options(gcs_server_address):
      """Create ``GcsClientOptions`` for ``gcs_server_address``.

      No cluster ID is passed (``None``); a nil cluster ID is allowed and is
      not fetched from the GCS.
      """
      cluster_id_policy = dict(allow_cluster_id_nil=True, fetch_cluster_id_if_nil=False)
      return GcsClientOptions.create(gcs_server_address, None, **cluster_id_policy)
  def serialize_config(config):
      """Return ``config`` JSON-encoded, then base64-encoded, as a text string."""
      json_bytes = json.dumps(config).encode("utf-8")
      encoded = base64.b64encode(json_bytes)
      return encoded.decode("utf-8")
  def propagate_jemalloc_env_var(
      *,
      jemalloc_path: str,
      jemalloc_conf: str,
      jemalloc_comps: List[str],
      process_type: str,
  ):
      """Translate jemalloc settings into env vars for a child process.

      For example, a jemalloc library path (users specify it through
      ``RAY_JEMALLOC_LIB_PATH``) becomes ``LD_PRELOAD``, which is needed to
      run jemalloc as a shared library.

      Args:
          jemalloc_path: Path to the jemalloc shared library. If empty, no
              env vars are produced.
          jemalloc_conf: ``,``-separated jemalloc config. It is exported as
              ``MALLOC_CONF`` only for profiled components.
          jemalloc_comps: Ray components to profile. They are matched against
              ``process_type`` case-insensitively, ignoring surrounding
              whitespace.
          process_type: The process type the env vars are being built for.

      Returns:
          ``{}`` if ``jemalloc_path`` is empty. Otherwise ``LD_PRELOAD`` and
          ``RAY_LD_PRELOAD_ON_WORKERS`` (inherited from the current
          environment, default ``"0"``) are always present, whether or not
          ``process_type`` is profiled. ``MALLOC_CONF`` is added only when
          ``process_type`` is in ``jemalloc_comps`` and ``jemalloc_conf`` is
          non-empty. Callers can merge the result via ``dict.update``.
      """
      assert isinstance(jemalloc_comps, list)
      assert process_type is not None
      process_type = process_type.lower()
      if not jemalloc_path:
          return {}
      env_vars = {
          "LD_PRELOAD": jemalloc_path,
          "RAY_LD_PRELOAD_ON_WORKERS": os.environ.get("RAY_LD_PRELOAD_ON_WORKERS", "0"),
      }
      # Normalize component names the same way as process_type, so that
      # e.g. "RAYLET" or " raylet" still selects the raylet.
      profiled_comps = {comp.strip().lower() for comp in jemalloc_comps}
      if process_type in profiled_comps and jemalloc_conf:
          env_vars["MALLOC_CONF"] = jemalloc_conf
      return env_vars
  class ConsolePopen(subprocess.Popen):
      """``subprocess.Popen`` with Windows console-signal handling.

      On Windows, when ``ray._private.utils.detect_fate_sharing_support()``
      is true, the child is started in a new process group, and
      ``terminate()`` sends it ``CTRL_BREAK_EVENT`` instead of calling
      ``Popen.terminate()``. On other platforms this class adds nothing to
      ``subprocess.Popen``.
      """

      if sys.platform == "win32":

          def terminate(self):
              """Close stdin, then signal the child or fall back to Popen.terminate()."""
              if isinstance(self.stdin, io.IOBase):
                  self.stdin.close()
              if self._use_signals:
                  self.send_signal(signal.CTRL_BREAK_EVENT)
              else:
                  super(ConsolePopen, self).terminate()

          def __init__(self, *args, **kwargs):
              # CREATE_NEW_PROCESS_GROUP is used to send Ctrl+C on Windows:
              # https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal
              new_pgroup = subprocess.CREATE_NEW_PROCESS_GROUP
              flags_to_add = 0
              if ray._private.utils.detect_fate_sharing_support():
                  # If we don't have kernel-mode fate-sharing, then don't do this
                  # because our children need to be in our process group for
                  # the process reaper to properly terminate them.
                  flags_to_add = new_pgroup
              flags_key = "creationflags"
              if flags_to_add:
                  kwargs[flags_key] = (kwargs.get(flags_key) or 0) | flags_to_add
              # Use .get(): the key is absent when nothing was added and the
              # caller passed no creationflags, so indexing would raise KeyError.
              self._use_signals = (kwargs.get(flags_key) or 0) & new_pgroup
              super(ConsolePopen, self).__init__(*args, **kwargs)
  def _find_address_from_flag(flag: str):
      """Find every value passed with ``flag`` to raylet processes on this node.

      Args:
          flag: The command-line flag to look for, e.g. ``--redis-address``,
              ``--gcs-address`` or ``--node_id``. Only arguments written as
              ``<flag>=<value>`` are recognized; the flag must match exactly.

      Returns:
          Set of detected values. Empty values and the literal string
          ``"None"`` are skipped.
      """
      # Using Redis address `--redis-address` as an example:
      # Currently, this extracts the deprecated --redis-address from the command
      # that launched the raylet running on this node, if any. Anyone looking to
      # edit this function should be warned that these commands look like, for
      # example:
      # /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet
      # --redis_address=123.456.78.910 --node_ip_address=123.456.78.910
      # --raylet_socket_name=... --store_socket_name=... --object_manager_port=0
      # --min_worker_port=10000 --max_worker_port=19999
      # --node_manager_port=58578 --redis_port=6379
      # --maximum_startup_concurrency=8
      # --static_resource_list=node:123.456.78.910,1.0,object_store_memory,66
      # --config_list=plasma_store_as_thread,True
      # --python_worker_command=/usr/bin/python
      # /usr/local/lib/python3.8/dist-packages/ray/workers/default_worker.py
      # --redis-address=123.456.78.910:6379
      # --node-ip-address=123.456.78.910 --node-manager-port=58578
      # --object-store-name=... --raylet-name=...
      # --temp-dir=/tmp/ray
      # --metrics-agent-port=41856 --redis-password=[MASKED]
      # --java_worker_command= --cpp_worker_command=
      # --redis_password=[MASKED] --temp_dir=/tmp/ray --session_dir=...
      # --metrics-agent-port=41856 --metrics_export_port=64229
      # --dashboard_agent_command=/usr/bin/python
      # -u /usr/local/lib/python3.8/dist-packages/ray/dashboard/agent.py
      # --redis-address=123.456.78.910:6379 --metrics-export-port=64229
      # --dashboard-agent-port=41856 --node-manager-port=58578
      # --object-store-name=... --raylet-name=... --temp-dir=/tmp/ray
      # --log-dir=/tmp/ray/session_2020-11-08_14-29-07_199128_278000/logs
      # --redis-password=[MASKED] --object_store_memory=5037192806
      # --plasma_directory=/tmp
      # Longer arguments are elided with ... but all arguments from this instance
      # are included, to provide a sense of what is in these.
      # Indeed, we had to pull --redis-address to the front of each call to make
      # this readable.
      # As you can see, this is very long and complex, which is why we can't
      # simply extract all the arguments using regular expressions and
      # present a dict as if we never lost track of these arguments, for
      # example. Picking out --redis-address below looks like it might grab the
      # wrong thing, but double-checking that we're finding the correct process
      # by checking that the contents look like we expect would probably be prone
      # to choking in unexpected ways.
      # Notice that --redis-address appears twice. This is not a copy-paste
      # error; this is the reason why the loop below collects every
      # appearance of the flag.
      # The --redis-address here is what is now called the --address, but it
      # appears in the default_worker.py and agent.py calls as --redis-address.
      addresses = set()
      for proc in psutil.process_iter(["cmdline"]):
          try:
              # HACK: Workaround for UNIX idiosyncrasy
              # Normally, cmdline() is supposed to return the argument list.
              # But in some cases (such as when setproctitle is called),
              # an arbitrary string resembling a command-line is stored in
              # the first argument.
              # Explanation: https://unix.stackexchange.com/a/432681
              # More info: https://github.com/giampaolo/psutil/issues/1179
              cmdline = proc.info["cmdline"]
              # Guard against a missing or empty command line.
              # NOTE(kfstorm): To support Windows, we can't use
              # `os.path.basename(cmdline[0]) == "raylet"` here.
              if not cmdline or not _is_raylet_process(cmdline):
                  continue
              for arglist in cmdline:
                  # Given we're merely seeking one flag, we just split every
                  # argument on spaces for now.
                  for arg in arglist.split(" "):
                      # TODO(ekl): Find a robust solution for locating Redis.
                      # Exact key match: a prefix test would also hit longer
                      # flags, and a bare flag without "=" would raise IndexError.
                      key, sep, proc_addr = arg.partition("=")
                      if key != flag or not sep:
                          continue
                      # TODO(mwtian): remove this workaround after Ray
                      # no longer sets --redis-address to None.
                      if proc_addr not in ("", "None"):
                          addresses.add(proc_addr)
          except (psutil.AccessDenied, psutil.NoSuchProcess):
              # Processes may be inaccessible or exit mid-scan; skip them.
              pass
      return addresses
  def find_node_ids():
      """Find the node IDs of raylet processes running on this host.

      Returns:
          Set of values passed as ``--node_id=<id>`` to local raylet
          processes; empty if none were found.
      """
      return _find_address_from_flag("--node_id")
  def find_gcs_addresses():
      """Return the set of GCS addresses that local raylet processes were given.

      The addresses come from the ``--gcs-address=<addr>`` arguments on the
      command lines of raylet processes found on this host.
      """
      gcs_flag = "--gcs-address"
      addresses = _find_address_from_flag(gcs_flag)
      return addresses
  def find_bootstrap_address(temp_dir: Optional[str]):
      """Return the latest Ray cluster address to connect to, if any.

      This is the GCS address recorded by the last successful `ray start`,
      as read by ``ray._private.utils.read_ray_address(temp_dir)``.
      """
      private_utils = ray._private.utils
      return private_utils.read_ray_address(temp_dir)
  def get_ray_address_from_environment(addr: str, temp_dir: Optional[str]):
      """Resolve the address of the Ray cluster to use.

      Resolution order:
      1. If the RAY_ADDRESS environment variable is set and nonempty, it
         replaces ``addr``.
      2. If ``addr`` is then neither None nor "auto", it is returned as is.
      3. Otherwise a local cluster is looked up. The address recorded by the
         last successful `ray start` (read via ``find_bootstrap_address``) is
         used; a warning is logged if more than one local GCS is running.
      4. For "auto" only (legacy behavior), a running local GCS is used when
         exactly one is found (even over the recorded address), or when
         several are found and no address was recorded.

      Args:
          addr: The user-supplied address, "auto", or None.
          temp_dir: Temp directory used to locate the recorded address.

      Returns:
          A string to pass into `ray.init(address=...)`, e.g. ip:port, or None
          when ``addr`` is None and no recorded address exists (the caller
          should then start a new instance).

      Raises:
          ConnectionError: If ``addr`` is "auto" and no running Ray instance
              can be found.
      """
      env_addr = os.environ.get(ray_constants.RAY_ADDRESS_ENVIRONMENT_VARIABLE)
      if env_addr:
          addr = env_addr
      if addr is not None and addr != "auto":
          return addr

      # We should try to automatically find an active local instance.
      gcs_addrs = find_gcs_addresses()
      bootstrap_addr = find_bootstrap_address(temp_dir)

      if len(gcs_addrs) > 1 and bootstrap_addr is not None:
          logger.warning(
              f"Found multiple active Ray instances: {gcs_addrs}. "
              f"Connecting to latest cluster at {bootstrap_addr}. "
              "You can override this by setting the `--address` flag "
              "or `RAY_ADDRESS` environment variable."
          )
      elif len(gcs_addrs) > 0 and addr == "auto":
          # Preserve legacy "auto" behavior of connecting to any local cluster,
          # even one not started with `ray start`. When addr is None this
          # fallback is skipped: only the recorded address is used, and None
          # is returned if there is none.
          bootstrap_addr = list(gcs_addrs).pop()

      if bootstrap_addr is None:
          if addr is None:
              # Caller should start a new instance.
              return None
          raise ConnectionError(
              "Could not find any running Ray instance. "
              "Please specify the one to connect to by setting `--address` flag "
              "or `RAY_ADDRESS` environment variable."
          )
      return bootstrap_addr
  def wait_for_node(
      gcs_address: str,
      node_plasma_store_socket_name: str,
      timeout: int = _timeout,
  ):
      """Wait until this node has appeared in the client table.

      NOTE: Makes an RPC to the GCS up to every 0.1 seconds to
      get all node info. Use only for testing.

      Args:
          gcs_address: The gcs address
          node_plasma_store_socket_name: The
              plasma_store_socket_name for the given node which we wait for.
          timeout: The amount of time in seconds to wait before raising an
              exception.

      Raises:
          TimeoutError: An exception is raised if the timeout expires before
              the node appears in the client table.
      """
      gcs_options = _get_gcs_client_options(gcs_address)
      global_state = ray._private.state.GlobalState()
      global_state._initialize_global_state(gcs_options)
      # Monotonic clock: the wait is unaffected by wall-clock adjustments.
      deadline = time.monotonic() + timeout
      while time.monotonic() < deadline:
          if any(
              client["ObjectStoreSocketName"] == node_plasma_store_socket_name
              for client in global_state.node_table()
          ):
              return
          time.sleep(0.1)
      raise TimeoutError(
          f"Timed out after {timeout} seconds while waiting for node to startup. "
          f"Did not find socket name {node_plasma_store_socket_name} in the list "
          "of object store socket names."
      )
  def get_node_to_connect_for_driver(
      gcs_client: GcsClient,
      node_ip_address: str = None,
      node_name: str = None,
      temp_dir: str = None,
  ) -> GcsNodeInfo:
      """Get the node the driver should connect to.

      Candidates are the ALIVE nodes whose IDs belong to raylet processes
      found on this host (see ``find_node_ids``). ``node_ip_address`` and
      ``node_name``, when given, narrow the candidates. If several remain,
      the head node is preferred; otherwise the first remaining candidate is
      returned.

      NOTE(review): when no local raylet is found, the selector list sent to
      the GCS is empty; confirm whether the GCS then returns every node.

      Args:
          gcs_client: The GCS client.
          node_ip_address: If given, only nodes whose node manager address
              equals it are considered.
          node_name: If given, only nodes with this node name are considered.
          temp_dir: Only reported in the error message when no node matches;
              it is not compared against node info here.
              NOTE(review): earlier docs described it as a filter — confirm
              whether filtering by temp_dir is intended.

      Returns:
          The node info of the node to connect to.

      Raises:
          RuntimeError: If the node info query fails, returns no nodes, or no
              node matches the given attributes.
      """
      possible_node_ids = find_node_ids()
      node_selectors = [
          GetAllNodeInfoRequest.NodeSelector(node_id=NodeID.from_hex(hex_id).binary())
          for hex_id in possible_node_ids
      ]
      try:
          node_to_connect_infos = gcs_client.get_all_node_info(
              timeout=ray_constants.GCS_SERVER_REQUEST_TIMEOUT_SECONDS,
              node_selectors=node_selectors,
              state_filter=GcsNodeInfo.GcsNodeState.ALIVE,
          ).values()
      except Exception as e:
          raise RuntimeError(
              f"Failed to get node info for possible node ids: {possible_node_ids}"
              f" when trying to resolve node to connect to. Error: {repr(e)}"
          ) from e
      if not node_to_connect_infos:
          raise RuntimeError(
              f"No node info found matching node ids: {possible_node_ids}"
              " when trying to resolve node to connect to."
          )

      filtered_node_to_connect_infos = [
          node_info
          for node_info in node_to_connect_infos
          if (node_ip_address is None or node_info.node_manager_address == node_ip_address)
          and (node_name is None or node_info.node_name == node_name)
      ]
      if not filtered_node_to_connect_infos:
          attrs = [node_ip_address, node_name, temp_dir]
          attrs_str = ", ".join(f"{attr}" for attr in attrs if attr is not None)
          raise RuntimeError(
              f"No node info found matching attributes: '{attrs_str}' when trying to resolve node to connect to."
          )

      # Prefer the head node; otherwise fall back to the first matching node.
      return next(
          (info for info in filtered_node_to_connect_infos if info.is_head_node),
          filtered_node_to_connect_infos[0],
      )
  def get_node(gcs_address, node_id):
      """Return the node info for ``node_id``.

      A fresh ``GlobalState`` is initialized against ``gcs_address`` for each
      call and queried through its ``get_node`` method.
      """
      gcs_options = _get_gcs_client_options(gcs_address)
      state = ray._private.state.GlobalState()
      state._initialize_global_state(gcs_options)
      return state.get_node(node_id)
  def get_node_with_retry(
      gcs_address: str,
      node_id: str,
      timeout_s: float = 30,
      retry_interval_s: float = 1,
  ) -> dict:
      """Get node info from GCS with retry logic.

      Keeps retrying until the node is found or timeout is reached.
      Some Ray processes (e.g., ray_client_server) start in parallel
      with the raylet. When they query GCS for node info, the raylet may not have
      registered yet. This function retries until the node info is available.

      Args:
          gcs_address: The address of the GCS server (e.g., "ip:port").
          node_id: The hex string ID of the node to find.
          timeout_s: Total timeout in seconds. Default 30s.
          retry_interval_s: Interval between retries in seconds. Default 1s.

      Returns:
          A dictionary containing node info.

      Raises:
          RuntimeError: If the node is not found within the timeout. The most
              recent lookup error, if any, is attached as the cause.
      """
      # Monotonic clock: the deadline is unaffected by wall-clock adjustments.
      deadline = time.monotonic() + timeout_s
      last_error = None
      while True:
          try:
              node_info = get_node(gcs_address, node_id)
          except RuntimeError as e:
              # This is expected if the node hasn't registered with GCS yet.
              last_error = e
          else:
              if node_info is not None:
                  return node_info
          if time.monotonic() >= deadline:
              raise RuntimeError(
                  f"Timed out waiting for node info for node_id={node_id}."
              ) from last_error
          time.sleep(retry_interval_s)
  516. def get_webui_url_from_internal_kv():
  517. assert ray.experimental.internal_kv._internal_kv_initialized()
  518. webui_url = ray.experimental.internal_kv._internal_kv_get(
  519. "webui:url", namespace=ray_constants.KV_NAMESPACE_DASHBOARD
  520. )
  521. return ray._common.utils.decode(webui_url) if webui_url is not None else None
  522. def remaining_processes_alive():
  523. """See if the remaining processes are alive or not.
  524. Note that this ignores processes that have been explicitly killed,
  525. e.g., via a command like node.kill_raylet().
  526. Returns:
  527. True if the remaining processes started by ray.init() are alive and
  528. False otherwise.
  529. Raises:
  530. Exception: An exception is raised if the processes were not started by
  531. ray.init().
  532. """
  533. if ray._private.worker._global_node is None:
  534. raise RuntimeError(
  535. "This process is not in a position to determine "
  536. "whether all processes are alive or not."
  537. )
  538. return ray._private.worker._global_node.remaining_processes_alive()
  539. def canonicalize_bootstrap_address(
  540. addr: str, temp_dir: Optional[str] = None
  541. ) -> Optional[str]:
  542. """Canonicalizes Ray cluster bootstrap address to host:port.
  543. Reads address from the environment if needed.
  544. This function should be used to process user supplied Ray cluster address,
  545. via ray.init() or `--address` flags, before using the address to connect.
  546. Returns:
  547. Ray cluster address string in <host:port> format or None if the caller
  548. should start a local Ray instance.
  549. """
  550. if addr is None or addr == "auto":
  551. addr = get_ray_address_from_environment(addr, temp_dir)
  552. if addr is None or addr == "local":
  553. return None
  554. parsed = parse_address(addr)
  555. if parsed is None:
  556. raise ValueError(f"Invalid address format: {addr}")
  557. host, port = parsed
  558. try:
  559. bootstrap_host = resolve_ip_for_localhost(host)
  560. except Exception:
  561. logger.exception(f"Failed to convert {addr} to host:port")
  562. raise
  563. return build_address(bootstrap_host, port)
  564. def canonicalize_bootstrap_address_or_die(
  565. addr: str, temp_dir: Optional[str] = None
  566. ) -> str:
  567. """Canonicalizes Ray cluster bootstrap address to host:port.
  568. This function should be used when the caller expects there to be an active
  569. and local Ray instance. If no address is provided or address="auto", this
  570. will autodetect the latest Ray instance created with `ray start`.
  571. For convenience, if no address can be autodetected, this function will also
  572. look for any running local GCS processes, based on pgrep output. This is to
  573. allow easier use of Ray CLIs when debugging a local Ray instance (whose GCS
  574. addresses are not recorded).
  575. Returns:
  576. Ray cluster address string in <host:port> format. Throws a
  577. ConnectionError if zero or multiple active Ray instances are
  578. autodetected.
  579. """
  580. bootstrap_addr = canonicalize_bootstrap_address(addr, temp_dir=temp_dir)
  581. if bootstrap_addr is not None:
  582. return bootstrap_addr
  583. running_gcs_addresses = find_gcs_addresses()
  584. if len(running_gcs_addresses) == 0:
  585. raise ConnectionError(
  586. "Could not find any running Ray instance. "
  587. "Please specify the one to connect to by setting the `--address` "
  588. "flag or `RAY_ADDRESS` environment variable."
  589. )
  590. if len(running_gcs_addresses) > 1:
  591. raise ConnectionError(
  592. f"Found multiple active Ray instances: {running_gcs_addresses}. "
  593. "Please specify the one to connect to by setting the `--address` "
  594. "flag or `RAY_ADDRESS` environment variable."
  595. )
  596. return running_gcs_addresses.pop()
  597. def extract_ip_port(bootstrap_address: str):
  598. ip_port = parse_address(bootstrap_address)
  599. if ip_port is None:
  600. raise ValueError(
  601. f"Malformed address {bootstrap_address}. " f"Expected '<host>:<port>'."
  602. )
  603. ip, port = ip_port
  604. try:
  605. port = int(port)
  606. except ValueError:
  607. raise ValueError(f"Malformed address port {port}. Must be an integer.")
  608. if port < 1024 or port > 65535:
  609. raise ValueError(
  610. f"Invalid address port {port}. Must be between 1024 "
  611. "and 65535 (inclusive)."
  612. )
  613. return ip, port
  614. def resolve_ip_for_localhost(host: str):
  615. """Convert to a remotely reachable IP if the host is "localhost",
  616. "127.0.0.1", or "::1". Otherwise do nothing.
  617. Args:
  618. host: The hostname or IP address.
  619. Returns:
  620. The same host but with the local host replaced by remotely
  621. reachable IP.
  622. """
  623. if not host:
  624. raise ValueError(f"Malformed host: {host}")
  625. if host == "127.0.0.1" or host == "::1" or host == "localhost":
  626. # Make sure localhost isn't resolved to the loopback ip
  627. return get_node_ip_address()
  628. else:
  629. return host
  630. # NOTE: This API should not be used when you obtain the
  631. # IP address when ray.init is not called because
  632. # it cannot find the IP address if it is specified by
  633. # ray start --node-ip-address. You should instead use
  634. # get_node_to_connect_ip_address.
  635. def get_node_ip_address(address=None):
  636. if ray._private.worker._global_node is not None:
  637. return ray._private.worker._global_node.node_ip_address
  638. if not ray_constants.ENABLE_RAY_CLUSTER:
  639. # Use loopback IP as the local IP address to prevent bothersome
  640. # firewall popups on OSX and Windows.
  641. # https://github.com/ray-project/ray/issues/18730.
  642. return get_localhost_ip()
  643. return node_ip_address_from_perspective(address)
  644. def get_node_instance_id():
  645. """Get the specified node instance id of the current node.
  646. Returns:
  647. The node instance id of the current node.
  648. """
  649. return os.getenv("RAY_CLOUD_INSTANCE_ID", "")
  650. def create_redis_client(redis_address, password=None, username=None):
  651. """Create a Redis client.
  652. Args:
  653. redis_address: The IP address and port of the Redis server.
  654. password: The password for Redis authentication.
  655. username: The username for Redis authentication.
  656. Returns:
  657. A Redis client.
  658. """
  659. import redis
  660. if not hasattr(create_redis_client, "instances"):
  661. create_redis_client.instances = {}
  662. num_retries = ray_constants.START_REDIS_WAIT_RETRIES
  663. delay = 0.001
  664. for i in range(num_retries):
  665. cli = create_redis_client.instances.get(redis_address)
  666. if cli is None:
  667. redis_ip_address, redis_port = extract_ip_port(
  668. canonicalize_bootstrap_address_or_die(redis_address)
  669. )
  670. cli = redis.StrictRedis(
  671. host=redis_ip_address,
  672. port=int(redis_port),
  673. username=username,
  674. password=password,
  675. )
  676. create_redis_client.instances[redis_address] = cli
  677. try:
  678. cli.ping()
  679. return cli
  680. except Exception as e:
  681. create_redis_client.instances.pop(redis_address)
  682. if i >= num_retries - 1:
  683. raise RuntimeError(
  684. f"Unable to connect to Redis at {redis_address}: {e}"
  685. )
  686. # Wait a little bit.
  687. time.sleep(delay)
  688. # Make sure the retry interval doesn't increase too large.
  689. delay = min(1, delay * 2)
def start_ray_process(
    command: List[str],
    process_type: str,
    fate_share: bool,
    env_updates: Optional[dict] = None,
    cwd: Optional[str] = None,
    use_valgrind: bool = False,
    use_gdb: bool = False,
    use_valgrind_profiler: bool = False,
    use_perftools_profiler: bool = False,
    use_tmux: bool = False,
    stdout_file: Optional[IO[AnyStr]] = None,
    stderr_file: Optional[IO[AnyStr]] = None,
    pipe_stdin: bool = False,
):
    """Start one of the Ray processes.

    TODO(rkn): We need to figure out how these commands interact. For example,
    it may only make sense to start a process in gdb if we also start it in
    tmux. Similarly, certain combinations probably don't make sense, like
    simultaneously running the process in valgrind and the profiler.

    Each debugging/profiling flag can also be forced on with an environment
    variable ``RAY_<PROCESS_TYPE>_<FLAG>=1``. Here ``<PROCESS_TYPE>`` is
    ``process_type.upper()`` and ``<FLAG>`` is one of ``VALGRIND``,
    ``VALGRIND_PROFILER``, ``PERFTOOLS_PROFILER``, ``TMUX`` or ``GDB``.
    Environment variables can only turn a flag on, never off.

    Args:
        command: The command to use to start the Ray process.
        process_type: The type of the process that is being started
            (e.g., "raylet").
        fate_share: If true, the child will be killed if its parent (us) dies.
            True must only be passed after detection of this functionality.
        env_updates: A dictionary of additional environment variables to
            run the command with (in addition to the caller's environment
            variables).
        cwd: The directory to run the process in.
        use_valgrind: True if we should start the process in valgrind.
        use_gdb: True if we should start the process in gdb.
        use_valgrind_profiler: True if we should start the process in
            the valgrind profiler.
        use_perftools_profiler: True if we should profile the process
            using perftools.
        use_tmux: True if we should start the process in tmux.
        stdout_file: A file handle opened for writing to redirect stdout to. If
            no redirection should happen, then this should be None.
        stderr_file: A file handle opened for writing to redirect stderr to. If
            no redirection should happen, then this should be None.
        pipe_stdin: If true, subprocess.PIPE will be passed to the process as
            stdin.

    Returns:
        A ``ProcessInfo`` with the process handle, the names of the redirected
        stdout/stderr streams (or None), and the effective debugging/profiling
        flags (including any enabled through environment variables).

    Raises:
        ValueError: If more than one of gdb / valgrind / valgrind profiler /
            perftools profiler / jemalloc memory profiler is enabled. Also if
            ``use_gdb`` is set without ``use_tmux``, if ``env_updates`` is not
            a dict, or (on Windows) if the command is too long.
        KeyError: If ``use_perftools_profiler`` is set but ``PERFTOOLS_PATH``
            or ``PERFTOOLS_LOGFILE`` is missing from the environment.
    """
    # Detect which flags are set through environment variables.
    valgrind_env_var = f"RAY_{process_type.upper()}_VALGRIND"
    if os.environ.get(valgrind_env_var) == "1":
        logger.info("Detected environment variable '%s'.", valgrind_env_var)
        use_valgrind = True
    valgrind_profiler_env_var = f"RAY_{process_type.upper()}_VALGRIND_PROFILER"
    if os.environ.get(valgrind_profiler_env_var) == "1":
        logger.info("Detected environment variable '%s'.", valgrind_profiler_env_var)
        use_valgrind_profiler = True
    perftools_profiler_env_var = f"RAY_{process_type.upper()}_PERFTOOLS_PROFILER"
    if os.environ.get(perftools_profiler_env_var) == "1":
        logger.info("Detected environment variable '%s'.", perftools_profiler_env_var)
        use_perftools_profiler = True
    tmux_env_var = f"RAY_{process_type.upper()}_TMUX"
    if os.environ.get(tmux_env_var) == "1":
        logger.info("Detected environment variable '%s'.", tmux_env_var)
        use_tmux = True
    gdb_env_var = f"RAY_{process_type.upper()}_GDB"
    if os.environ.get(gdb_env_var) == "1":
        logger.info("Detected environment variable '%s'.", gdb_env_var)
        use_gdb = True
    # Jemalloc memory profiling.
    # Only considered when the caller has not already set LD_PRELOAD.
    if os.environ.get("LD_PRELOAD") is None:
        jemalloc_lib_path = os.environ.get(RAY_JEMALLOC_LIB_PATH, JEMALLOC_SO)
        jemalloc_conf = os.environ.get(RAY_JEMALLOC_CONF, "")
        jemalloc_comps = os.environ.get(RAY_JEMALLOC_PROFILE)
        # Comma-separated list of components to profile; empty/unset -> none.
        jemalloc_comps = [] if not jemalloc_comps else jemalloc_comps.split(",")
        jemalloc_env_vars = propagate_jemalloc_env_var(
            jemalloc_path=jemalloc_lib_path,
            jemalloc_conf=jemalloc_conf,
            jemalloc_comps=jemalloc_comps,
            process_type=process_type,
        )
    else:
        jemalloc_env_vars = {}
    # The jemalloc profiler counts as enabled iff the helper produced a
    # MALLOC_CONF entry for this process.
    use_jemalloc_mem_profiler = "MALLOC_CONF" in jemalloc_env_vars
    # The debuggers/profilers below are mutually exclusive.
    if (
        sum(
            [
                use_gdb,
                use_valgrind,
                use_valgrind_profiler,
                use_perftools_profiler,
                use_jemalloc_mem_profiler,
            ]
        )
        > 1
    ):
        raise ValueError(
            "At most one of the 'use_gdb', 'use_valgrind', "
            "'use_valgrind_profiler', 'use_perftools_profiler', "
            "and 'use_jemalloc_mem_profiler' flags can "
            "be used at a time."
        )
    if env_updates is None:
        env_updates = {}
    if not isinstance(env_updates, dict):
        raise ValueError("The 'env_updates' argument must be a dictionary.")
    # The child's environment: our environment overlaid with env_updates.
    modified_env = os.environ.copy()
    modified_env.update(env_updates)
    if use_gdb:
        if not use_tmux:
            raise ValueError(
                "If 'use_gdb' is true, then 'use_tmux' must be true as well."
            )
        # Write a gdb init script that runs the program with its original
        # arguments, then launch gdb on the executable with that script.
        # NOTE(review): arguments are wrapped in single quotes without
        # escaping, so an argument containing ' breaks the gdb `run` line.
        # TODO(suquark): Any better temp file creation here?
        gdb_init_path = os.path.join(
            ray._common.utils.get_ray_temp_dir(),
            f"gdb_init_{process_type}_{time.time()}",
        )
        ray_process_path = command[0]
        ray_process_args = command[1:]
        run_args = " ".join(["'{}'".format(arg) for arg in ray_process_args])
        with open(gdb_init_path, "w") as gdb_init_file:
            gdb_init_file.write(f"run {run_args}")
        command = ["gdb", ray_process_path, "-x", gdb_init_path]
    if use_valgrind:
        command = [
            "valgrind",
            "--track-origins=yes",
            "--leak-check=full",
            "--show-leak-kinds=all",
            "--leak-check-heuristics=stdstring",
            "--error-exitcode=1",
        ] + command
    if use_valgrind_profiler:
        command = ["valgrind", "--tool=callgrind"] + command
    if use_perftools_profiler:
        # Raises KeyError if PERFTOOLS_PATH / PERFTOOLS_LOGFILE are unset.
        modified_env["LD_PRELOAD"] = os.environ["PERFTOOLS_PATH"]
        modified_env["CPUPROFILE"] = os.environ["PERFTOOLS_LOGFILE"]
    # Applied last so jemalloc settings win over env_updates.
    modified_env.update(jemalloc_env_vars)
    if use_tmux:
        # The command has to be created exactly as below to ensure that it
        # works on all versions of tmux. (Tested with tmux 1.8-5, travis'
        # version, and tmux 2.1)
        command = ["tmux", "new-session", "-d", f"{' '.join(command)}"]
    if fate_share:
        # NOTE: this is an `assert`, so it is not enforced under `python -O`.
        assert ray._private.utils.detect_fate_sharing_support(), (
            "kernel-level fate-sharing must only be specified if "
            "detect_fate_sharing_support() has returned True"
        )

    # Runs in the child between fork and exec (only passed to Popen on
    # non-Windows platforms below). SIGINT is blocked in the child; the
    # signal mask survives exec. Presumably this keeps a Ctrl-C sent to the
    # terminal's process group from killing Ray processes directly.
    def preexec_fn():
        import signal

        signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT})
        if fate_share and sys.platform.startswith("linux"):
            ray._private.utils.set_kill_on_parent_death_linux()

    win32_fate_sharing = fate_share and sys.platform == "win32"
    # With Windows fate-sharing, we need special care:
    # The process must be added to the job before it is allowed to execute.
    # Otherwise, there's a race condition: the process might spawn children
    # before the process itself is assigned to the job.
    # After that point, its children will not be added to the job anymore.
    CREATE_SUSPENDED = 0x00000004  # from Windows headers
    if sys.platform == "win32":
        # CreateProcess, which underlies Popen, is limited to
        # 32,767 characters, including the Unicode terminating null
        # character
        # NOTE(review): the threshold below (31766) is lower than that limit,
        # and the sum ignores the separators/quoting added when the list is
        # joined into one command line. Confirm the intended safety margin.
        total_chrs = sum([len(x) for x in command])
        if total_chrs > 31766:
            raise ValueError(
                f"command is limited to a total of 31767 characters, "
                f"got {total_chrs}"
            )
    process = ConsolePopen(
        command,
        env=modified_env,
        cwd=cwd,
        stdout=stdout_file,
        stderr=stderr_file,
        stdin=subprocess.PIPE if pipe_stdin else None,
        preexec_fn=preexec_fn if sys.platform != "win32" else None,
        creationflags=CREATE_SUSPENDED if win32_fate_sharing else 0,
    )
    if win32_fate_sharing:
        # The process was created suspended. Attach it to the kill-on-death
        # job first, then resume it. On failure, kill it rather than leave a
        # suspended orphan.
        try:
            ray._private.utils.set_kill_child_on_death_win32(process)
            psutil.Process(process.pid).resume()
        except (psutil.Error, OSError):
            process.kill()
            raise

    # Display name for a redirect target: its `.name` if it has one,
    # otherwise str(stream); None when there was no redirect.
    def _get_stream_name(stream):
        if stream is not None:
            try:
                return stream.name
            except AttributeError:
                return str(stream)
        return None

    return ProcessInfo(
        process=process,
        stdout_file=_get_stream_name(stdout_file),
        stderr_file=_get_stream_name(stderr_file),
        use_valgrind=use_valgrind,
        use_gdb=use_gdb,
        use_valgrind_profiler=use_valgrind_profiler,
        use_perftools_profiler=use_perftools_profiler,
        use_tmux=use_tmux,
    )
  894. def start_reaper(fate_share=None):
  895. """Start the reaper process.
  896. This is a lightweight process that simply
  897. waits for its parent process to die and then terminates its own
  898. process group. This allows us to ensure that ray processes are always
  899. terminated properly so long as that process itself isn't SIGKILLed.
  900. Returns:
  901. ProcessInfo for the process that was started.
  902. """
  903. # Make ourselves a process group leader so that the reaper can clean
  904. # up other ray processes without killing the process group of the
  905. # process that started us.
  906. try:
  907. if sys.platform != "win32":
  908. os.setpgrp()
  909. except OSError as e:
  910. errcode = e.errno
  911. if errcode == errno.EPERM and os.getpgrp() == os.getpid():
  912. # Nothing to do; we're already a session leader.
  913. pass
  914. else:
  915. logger.warning(
  916. f"setpgrp failed, processes may not be cleaned up properly: {e}."
  917. )
  918. # Don't start the reaper in this case as it could result in killing
  919. # other user processes.
  920. return None
  921. reaper_filepath = os.path.join(RAY_PATH, RAY_PRIVATE_DIR, "ray_process_reaper.py")
  922. command = [sys.executable, "-u", reaper_filepath]
  923. process_info = start_ray_process(
  924. command,
  925. ray_constants.PROCESS_TYPE_REAPER,
  926. pipe_stdin=True,
  927. fate_share=fate_share,
  928. )
  929. return process_info
  930. def start_log_monitor(
  931. session_dir: str,
  932. logs_dir: str,
  933. gcs_address: str,
  934. node_ip_address: str,
  935. fate_share: Optional[bool] = None,
  936. max_bytes: int = 0,
  937. backup_count: int = 0,
  938. stdout_filepath: Optional[str] = None,
  939. stderr_filepath: Optional[str] = None,
  940. ):
  941. """Start a log monitor process.
  942. Args:
  943. session_dir: The session directory.
  944. logs_dir: The directory of logging files.
  945. gcs_address: GCS address for pubsub.
  946. node_ip_address: The IP address of the node we are connected to.
  947. fate_share: Whether to share fate between log_monitor
  948. and this process.
  949. max_bytes: Log rotation parameter. Corresponding to
  950. RotatingFileHandler's maxBytes.
  951. backup_count: Log rotation parameter. Corresponding to
  952. RotatingFileHandler's backupCount.
  953. stdout_filepath: The file path to dump log monitor stdout.
  954. If None, stdout is not redirected.
  955. stderr_filepath: The file path to dump log monitor stderr.
  956. If None, stderr is not redirected.
  957. Returns:
  958. ProcessInfo for the process that was started.
  959. """
  960. log_monitor_filepath = os.path.join(RAY_PATH, RAY_PRIVATE_DIR, "log_monitor.py")
  961. command = [
  962. sys.executable,
  963. "-u",
  964. log_monitor_filepath,
  965. f"--session-dir={session_dir}",
  966. f"--logs-dir={logs_dir}",
  967. f"--gcs-address={gcs_address}",
  968. f"--node-ip-address={node_ip_address}",
  969. f"--logging-rotate-bytes={max_bytes}",
  970. f"--logging-rotate-backup-count={backup_count}",
  971. ]
  972. if stdout_filepath:
  973. command.append(f"--stdout-filepath={stdout_filepath}")
  974. if stderr_filepath:
  975. command.append(f"--stderr-filepath={stderr_filepath}")
  976. if stdout_filepath is None and stderr_filepath is None:
  977. # If not redirecting logging to files, unset log filename.
  978. # This will cause log records to go to stderr.
  979. command.append("--logging-filename=")
  980. # Use stderr log format with the component name as a message prefix.
  981. logging_format = ray_constants.LOGGER_FORMAT_STDERR.format(
  982. component=ray_constants.PROCESS_TYPE_LOG_MONITOR
  983. )
  984. command.append(f"--logging-format={logging_format}")
  985. stdout_file = None
  986. if stdout_filepath:
  987. stdout_file = open(os.devnull, "w")
  988. stderr_file = None
  989. if stderr_filepath:
  990. stderr_file = open(os.devnull, "w")
  991. process_info = start_ray_process(
  992. command,
  993. ray_constants.PROCESS_TYPE_LOG_MONITOR,
  994. stdout_file=stdout_file,
  995. stderr_file=stderr_file,
  996. fate_share=fate_share,
  997. )
  998. return process_info
  999. def start_api_server(
  1000. include_dashboard: Optional[bool],
  1001. raise_on_failure: bool,
  1002. host: str,
  1003. gcs_address: str,
  1004. cluster_id_hex: str,
  1005. node_ip_address: str,
  1006. temp_dir: str,
  1007. logdir: str,
  1008. session_dir: str,
  1009. port: Optional[int] = None,
  1010. fate_share: Optional[bool] = None,
  1011. max_bytes: int = 0,
  1012. backup_count: int = 0,
  1013. stdout_filepath: Optional[str] = None,
  1014. stderr_filepath: Optional[str] = None,
  1015. ):
  1016. """Start a API server process.
  1017. Args:
  1018. include_dashboard: If true, this will load all dashboard-related modules
  1019. when starting the API server, or fail. If None, it will load all
  1020. dashboard-related modules conditioned on dependencies being present.
  1021. Otherwise, it will only start the modules that are not relevant to
  1022. the dashboard.
  1023. raise_on_failure: If true, this will raise an exception
  1024. if we fail to start the API server. Otherwise it will print
  1025. a warning if we fail to start the API server.
  1026. host: The host to bind the dashboard web server to.
  1027. gcs_address: The gcs address the dashboard should connect to
  1028. cluster_id_hex: Cluster ID in hex.
  1029. node_ip_address: The IP address where this is running.
  1030. temp_dir: The temporary directory used for log files and
  1031. information for this Ray session.
  1032. session_dir: The session directory under temp_dir.
  1033. It is used as a identifier of individual cluster.
  1034. logdir: The log directory used to generate dashboard log.
  1035. port: The port to bind the dashboard web server to.
  1036. Defaults to 8265.
  1037. max_bytes: Log rotation parameter. Corresponding to
  1038. RotatingFileHandler's maxBytes.
  1039. backup_count: Log rotation parameter. Corresponding to
  1040. RotatingFileHandler's backupCount.
  1041. stdout_filepath: The file path to dump dashboard stdout.
  1042. If None, stdout is not redirected.
  1043. stderr_filepath: The file path to dump dashboard stderr.
  1044. If None, stderr is not redirected.
  1045. Returns:
  1046. A tuple of :
  1047. - Dashboard URL if dashboard enabled and started.
  1048. - ProcessInfo for the process that was started.
  1049. """
  1050. try:
  1051. # Make sure port is available.
  1052. if port is None:
  1053. port_retries = 50
  1054. port = ray_constants.DEFAULT_DASHBOARD_PORT
  1055. else:
  1056. port_retries = 0
  1057. port_test_socket = socket.socket(
  1058. socket.AF_INET6 if is_ipv6(host) else socket.AF_INET,
  1059. socket.SOCK_STREAM,
  1060. )
  1061. port_test_socket.setsockopt(
  1062. socket.SOL_SOCKET,
  1063. socket.SO_REUSEADDR,
  1064. 1,
  1065. )
  1066. try:
  1067. port_test_socket.bind((host, port))
  1068. port_test_socket.close()
  1069. except socket.error as e:
  1070. # 10013 on windows is a bit more broad than just
  1071. # "address in use": it can also indicate "permission denied".
  1072. # TODO: improve the error message?
  1073. if e.errno in {48, 98, 10013}: # address already in use.
  1074. raise ValueError(
  1075. f"Failed to bind to {host}:{port} because it's "
  1076. "already occupied. You can use `ray start "
  1077. "--dashboard-port ...` or `ray.init(dashboard_port=..."
  1078. ")` to select a different port."
  1079. )
  1080. else:
  1081. raise e
  1082. # Make sure the process can start.
  1083. dashboard_dependency_error = ray._private.utils.get_dashboard_dependency_error()
  1084. # Explicitly check here that when the user explicitly specifies
  1085. # dashboard inclusion, the install is not minimal.
  1086. if include_dashboard and dashboard_dependency_error:
  1087. logger.error(
  1088. f"Ray dashboard dependencies failed to install properly: {dashboard_dependency_error}.\n"
  1089. "Potential causes include:\n"
  1090. "1. --include-dashboard is not supported when minimal ray is used. "
  1091. "Download ray[default] to use the dashboard.\n"
  1092. "2. Dashboard dependencies are conflicting with your python environment. "
  1093. "Investigate your python environment and try reinstalling ray[default].\n"
  1094. )
  1095. raise Exception("Cannot include dashboard with missing packages.")
  1096. include_dash: bool = True if include_dashboard is None else include_dashboard
  1097. # Start the dashboard process.
  1098. dashboard_dir = "dashboard"
  1099. dashboard_filepath = os.path.join(RAY_PATH, dashboard_dir, "dashboard.py")
  1100. command = [
  1101. *_build_python_executable_command_memory_profileable(
  1102. ray_constants.PROCESS_TYPE_DASHBOARD,
  1103. session_dir,
  1104. unbuffered=False,
  1105. ),
  1106. dashboard_filepath,
  1107. f"--host={host}",
  1108. f"--port={port}",
  1109. f"--port-retries={port_retries}",
  1110. f"--temp-dir={temp_dir}",
  1111. f"--log-dir={logdir}",
  1112. f"--session-dir={session_dir}",
  1113. f"--logging-rotate-bytes={max_bytes}",
  1114. f"--logging-rotate-backup-count={backup_count}",
  1115. f"--gcs-address={gcs_address}",
  1116. f"--cluster-id-hex={cluster_id_hex}",
  1117. f"--node-ip-address={node_ip_address}",
  1118. ]
  1119. if stdout_filepath:
  1120. command.append(f"--stdout-filepath={stdout_filepath}")
  1121. if stderr_filepath:
  1122. command.append(f"--stderr-filepath={stderr_filepath}")
  1123. if stdout_filepath is None and stderr_filepath is None:
  1124. # If not redirecting logging to files, unset log filename.
  1125. # This will cause log records to go to stderr.
  1126. command.append("--logging-filename=")
  1127. # Use stderr log format with the component name as a message prefix.
  1128. logging_format = ray_constants.LOGGER_FORMAT_STDERR.format(
  1129. component=ray_constants.PROCESS_TYPE_DASHBOARD
  1130. )
  1131. command.append(f"--logging-format={logging_format}")
  1132. if dashboard_dependency_error is not None:
  1133. command.append("--minimal")
  1134. if not include_dash:
  1135. # If dashboard is not included, load modules
  1136. # that are irrelevant to the dashboard.
  1137. # TODO(sang): Modules like job or state APIs should be
  1138. # loaded although dashboard is disabled. Fix it.
  1139. command.append("--modules-to-load=UsageStatsHead")
  1140. command.append("--disable-frontend")
  1141. stdout_file = None
  1142. if stdout_filepath:
  1143. stdout_file = open(os.devnull, "w")
  1144. stderr_file = None
  1145. if stderr_filepath:
  1146. stderr_file = open(os.devnull, "w")
  1147. process_info = start_ray_process(
  1148. command,
  1149. ray_constants.PROCESS_TYPE_DASHBOARD,
  1150. stdout_file=stdout_file,
  1151. stderr_file=stderr_file,
  1152. fate_share=fate_share,
  1153. )
  1154. # Retrieve the dashboard url
  1155. gcs_client = GcsClient(address=gcs_address, cluster_id=cluster_id_hex)
  1156. ray.experimental.internal_kv._initialize_internal_kv(gcs_client)
  1157. dashboard_url = None
  1158. dashboard_returncode = None
  1159. start_time_s = time.time()
  1160. while (
  1161. time.time() - start_time_s < ray_constants.RAY_DASHBOARD_STARTUP_TIMEOUT_S
  1162. ):
  1163. dashboard_url = ray.experimental.internal_kv._internal_kv_get(
  1164. ray_constants.DASHBOARD_ADDRESS,
  1165. namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
  1166. )
  1167. if dashboard_url is not None:
  1168. dashboard_url = dashboard_url.decode("utf-8")
  1169. break
  1170. dashboard_returncode = process_info.process.poll()
  1171. if dashboard_returncode is not None:
  1172. break
  1173. # This is often on the critical path of ray.init() and ray start,
  1174. # so we need to poll often.
  1175. time.sleep(0.1)
  1176. # Dashboard couldn't be started.
  1177. if dashboard_url is None:
  1178. returncode_str = (
  1179. f", return code {dashboard_returncode}"
  1180. if dashboard_returncode is not None
  1181. else ""
  1182. )
  1183. logger.error(f"Failed to start the dashboard {returncode_str}")
  1184. def read_log(filename, lines_to_read):
  1185. """Read a log file and return the last 20 lines."""
  1186. dashboard_log = os.path.join(logdir, filename)
  1187. # Read last n lines of dashboard log. The log file may be large.
  1188. lines_to_read = 20
  1189. lines = []
  1190. with open(dashboard_log, "rb") as f:
  1191. with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
  1192. end = mm.size()
  1193. for _ in range(lines_to_read):
  1194. sep = mm.rfind(b"\n", 0, end - 1)
  1195. if sep == -1:
  1196. break
  1197. lines.append(mm[sep + 1 : end].decode("utf-8"))
  1198. end = sep
  1199. lines.append(
  1200. f"The last {lines_to_read} lines of {dashboard_log} "
  1201. "(it contains the error message from the dashboard): "
  1202. )
  1203. return lines
  1204. if logdir:
  1205. lines_to_read = 20
  1206. logger.error(
  1207. "Error should be written to 'dashboard.log' or "
  1208. "'dashboard.err'. We are printing the last "
  1209. f"{lines_to_read} lines for you. See "
  1210. "'https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#logging-directory-structure' " # noqa
  1211. "to find where the log file is."
  1212. )
  1213. try:
  1214. lines = read_log("dashboard.log", lines_to_read=lines_to_read)
  1215. except Exception as e:
  1216. logger.error(
  1217. f"Couldn't read dashboard.log file. Error: {e}. "
  1218. "It means the dashboard is broken even before it "
  1219. "initializes the logger (mostly dependency issues). "
  1220. "Reading the dashboard.err file which contains stdout/stderr."
  1221. )
  1222. # If we cannot read the .log file, we fallback to .err file.
  1223. # This is the case where dashboard couldn't be started at all
  1224. # and couldn't even initialize the logger to write logs to .log
  1225. # file.
  1226. try:
  1227. lines = read_log("dashboard.err", lines_to_read=lines_to_read)
  1228. except Exception as e:
  1229. raise Exception(
  1230. f"Failed to read dashboard.err file: {e}. "
  1231. "It is unexpected. Please report an issue to "
  1232. "Ray github. "
  1233. "https://github.com/ray-project/ray/issues"
  1234. )
  1235. last_log_str = "\n" + "\n".join(reversed(lines[-lines_to_read:]))
  1236. raise Exception(last_log_str)
  1237. else:
  1238. # Is it reachable?
  1239. raise Exception("Failed to start a dashboard.")
  1240. if dashboard_dependency_error is not None or not include_dash:
  1241. # If it is the minimal installation, the web url (dashboard url)
  1242. # shouldn't be configured because it doesn't start a server.
  1243. dashboard_url = ""
  1244. return dashboard_url, process_info
  1245. except Exception as e:
  1246. if raise_on_failure:
  1247. raise e from e
  1248. else:
  1249. logger.error(e)
  1250. return None, None
  1251. def get_address(redis_address):
  1252. parts = redis_address.split("://", 1)
  1253. enable_redis_ssl = False
  1254. if len(parts) == 1:
  1255. redis_ip_address, redis_port = parse_address(parts[0])
  1256. else:
  1257. # rediss for SSL
  1258. if len(parts) != 2 or parts[0] not in ("redis", "rediss"):
  1259. raise ValueError(
  1260. f"Invalid redis address {redis_address}."
  1261. "Expected format is ip:port or redis://ip:port, "
  1262. "or rediss://ip:port for SSL."
  1263. )
  1264. redis_ip_address, redis_port = parse_address(parts[1])
  1265. if parts[0] == "rediss":
  1266. enable_redis_ssl = True
  1267. return redis_ip_address, redis_port, enable_redis_ssl
  def start_gcs_server(
      redis_address: str,
      log_dir: str,
      stdout_filepath: Optional[str],
      stderr_filepath: Optional[str],
      session_name: str,
      redis_username: Optional[str] = None,
      redis_password: Optional[str] = None,
      config: Optional[dict] = None,
      fate_share: Optional[bool] = None,
      gcs_server_port: Optional[int] = None,
      metrics_agent_port: Optional[int] = None,
      node_ip_address: Optional[str] = None,
      session_dir: Optional[str] = None,
      node_id: Optional[str] = None,
  ):
      """Start a gcs server.

      Args:
          redis_address: The address that the Redis server is listening on.
              If empty, no Redis flags are passed to the gcs server.
          log_dir: The path of the dir where gcs log files are created.
          stdout_filepath: The file path to dump gcs server stdout.
              If None, stdout is not redirected.
          stderr_filepath: The file path to dump gcs server stderr.
              If None, stderr is not redirected.
          session_name: The current Ray session name.
          redis_username: The username of the Redis server.
          redis_password: The password of the Redis server.
          config: Optional configuration that will
              override defaults in RayConfig.
          fate_share: Forwarded to ``start_ray_process``.
          gcs_server_port: Port number of the gcs server. Must be a
              non-negative integer.
          metrics_agent_port: The port where metrics agent is bound to.
          node_ip_address: IP Address of a node where gcs server starts.
          session_dir: Session directory path. Used to write the bound GCS port to a file.
          node_id: The unique ID of this node.

      Returns:
          ProcessInfo for the process that was started.

      Raises:
          ValueError: If gcs_server_port is None or negative.
      """
      # Validate explicitly rather than with ``assert``. Asserts vanish under
      # ``python -O``. The default ``None`` would otherwise fail with an opaque
      # TypeError from the comparison.
      if gcs_server_port is None or gcs_server_port < 0:
          raise ValueError(
              f"gcs_server_port must be a non-negative integer, got {gcs_server_port!r}."
          )
      command = [
          GCS_SERVER_EXECUTABLE,
          f"--log_dir={log_dir}",
          f"--config_list={serialize_config(config)}",
          f"--gcs_server_port={gcs_server_port}",
          f"--metrics-agent-port={metrics_agent_port}",
          f"--node-ip-address={node_ip_address}",
          f"--session-name={session_name}",
          f"--ray-commit={ray.__commit__}",
          f"--session-dir={session_dir}",
          f"--node-id={node_id}",
      ]
      if stdout_filepath:
          command += [f"--stdout_filepath={stdout_filepath}"]
      if stderr_filepath:
          command += [f"--stderr_filepath={stderr_filepath}"]
      if redis_address:
          redis_ip_address, redis_port, enable_redis_ssl = get_address(redis_address)
          command += [
              f"--redis_address={redis_ip_address}",
              f"--redis_port={redis_port}",
              f"--redis_enable_ssl={'true' if enable_redis_ssl else 'false'}",
          ]
      if redis_username:
          command += [f"--redis_username={redis_username}"]
      if redis_password:
          command += [f"--redis_password={redis_password}"]
      # When a file path is handed to the server via --stdout_filepath /
      # --stderr_filepath, the process's own stream is sent to os.devnull.
      stdout_file = None
      if stdout_filepath:
          stdout_file = open(os.devnull, "w")
      stderr_file = None
      if stderr_filepath:
          stderr_file = open(os.devnull, "w")
      process_info = start_ray_process(
          command,
          ray_constants.PROCESS_TYPE_GCS_SERVER,
          stdout_file=stdout_file,
          stderr_file=stderr_file,
          fate_share=fate_share,
      )
      return process_info
  def start_raylet(
      redis_address: str,
      gcs_address: str,
      node_id: str,
      node_ip_address: str,
      node_manager_port: int,
      raylet_name: str,
      plasma_store_name: str,
      cluster_id: str,
      worker_path: str,
      setup_worker_path: str,
      temp_dir: str,
      session_dir: str,
      resource_dir: str,
      log_dir: str,
      resource_and_label_spec,
      plasma_directory: str,
      fallback_directory: str,
      object_store_memory: int,
      session_name: str,
      is_head_node: bool,
      resource_isolation_config: ResourceIsolationConfig,
      min_worker_port: Optional[int] = None,
      max_worker_port: Optional[int] = None,
      worker_port_list: Optional[List[int]] = None,
      object_manager_port: Optional[int] = None,
      redis_username: Optional[str] = None,
      redis_password: Optional[str] = None,
      metrics_agent_port: Optional[int] = None,
      metrics_export_port: Optional[int] = None,
      dashboard_agent_listen_port: Optional[int] = None,
      runtime_env_agent_port: Optional[int] = None,
      use_valgrind: bool = False,
      use_profiler: bool = False,
      raylet_stdout_filepath: Optional[str] = None,
      raylet_stderr_filepath: Optional[str] = None,
      dashboard_agent_stdout_filepath: Optional[str] = None,
      dashboard_agent_stderr_filepath: Optional[str] = None,
      runtime_env_agent_stdout_filepath: Optional[str] = None,
      runtime_env_agent_stderr_filepath: Optional[str] = None,
      huge_pages: bool = False,
      fate_share: Optional[bool] = None,
      socket_to_use: Optional[int] = None,
      max_bytes: int = 0,
      backup_count: int = 0,
      ray_debugger_external: bool = False,
      env_updates: Optional[dict] = None,
      node_name: Optional[str] = None,
      webui: Optional[str] = None,
  ):
      """Start a raylet, which is a combined local scheduler and object manager.

      Besides the raylet's own command line, this assembles the command lines
      used to launch Python/Java/C++ workers, the dashboard agent and the
      runtime env agent. It hands them to the raylet as flags.

      Args:
          redis_address: The address of the primary Redis server.
          gcs_address: The address of GCS server.
          node_id: The hex ID of this node.
          node_ip_address: The IP address of this node.
          node_manager_port: The port to use for the node manager. If it's
              0, a random port will be used.
          raylet_name: The name of the raylet socket to create.
          plasma_store_name: The name of the plasma store socket to connect
              to.
          cluster_id: The cluster ID. Forwarded to workers, both agents and
              the raylet.
          worker_path: The path of the Python file that new worker
              processes will execute.
          setup_worker_path: The path of the Python file that will set up
              the environment for the worker process.
          temp_dir: The path of the temporary directory Ray will use.
          session_dir: The path of this session.
          resource_dir: The path of resource of this session.
          log_dir: The path of the dir where log files are created.
          resource_and_label_spec: Resources and key-value labels for this raylet.
          plasma_directory: A directory where the Plasma memory mapped files will
              be created.
          fallback_directory: A directory where the Object store fallback files will be created.
          object_store_memory: The amount of memory (in bytes) to start the
              object store with.
          session_name: The current Ray session name.
          is_head_node: whether this node is the head node.
          resource_isolation_config: Resource isolation configuration for reserving
              memory and cpu resources for ray system processes through cgroupv2
          min_worker_port: The lowest port number that workers will bind
              on. If not set, random ports will be chosen.
          max_worker_port: The highest port number that workers will bind
              on. If set, min_worker_port must also be set.
          worker_port_list: An explicit list of ports to be used for
              workers (comma-separated). Overrides min_worker_port and
              max_worker_port. NOTE(review): annotated ``List[int]`` but
              interpolated verbatim into the flag, so a Python list would
              render as "[a, b]"; callers presumably pass a comma-separated
              string. Confirm.
          object_manager_port: The port to use for the object manager. If this is
              None, then the object manager will choose its own port.
          redis_username: The username to use when connecting to Redis.
          redis_password: The password to use when connecting to Redis.
          metrics_agent_port: The port where metrics agent is bound to.
          metrics_export_port: The port at which metrics are exposed to.
          dashboard_agent_listen_port: The port at which the dashboard agent
              listens to for HTTP.
          runtime_env_agent_port: The port at which the runtime env agent
              listens to for HTTP.
          use_valgrind: True if the raylet should be started inside
              of valgrind. If this is True, use_profiler must be False.
          use_profiler: True if the raylet should be started inside
              a profiler. If this is True, use_valgrind must be False.
          raylet_stdout_filepath: The file path to dump raylet stdout.
              If None, stdout is not redirected.
          raylet_stderr_filepath: The file path to dump raylet stderr.
              If None, stderr is not redirected.
          dashboard_agent_stdout_filepath: The file path to dump
              dashboard agent stdout. If None, stdout is not redirected.
          dashboard_agent_stderr_filepath: The file path to dump
              dashboard agent stderr. If None, stderr is not redirected.
          runtime_env_agent_stdout_filepath: The file path to dump
              runtime env agent stdout. If None, stdout is not redirected.
          runtime_env_agent_stderr_filepath: The file path to dump
              runtime env agent stderr. If None, stderr is not redirected.
          huge_pages: Boolean flag indicating whether to start the Object
              Store with hugetlbfs support. Requires plasma_directory.
          fate_share: Whether to share fate between raylet and this process.
          socket_to_use: If truthy, it is closed just before the raylet is
              started. NOTE(review): annotated ``Optional[int]``, but
              ``.close()`` is called on it, so callers presumably pass a
              socket object. Confirm.
          max_bytes: Log rotation parameter. Corresponding to
              RotatingFileHandler's maxBytes.
          backup_count: Log rotation parameter. Corresponding to
              RotatingFileHandler's backupCount.
          ray_debugger_external: True if the Ray debugger should be made
              available externally to this node.
          env_updates: Environment variable overrides.
          node_name: The name of the node.
          webui: The url of the UI.

      Returns:
          ProcessInfo for the process that was started.

      Raises:
          ValueError: If both use_valgrind and use_profiler are set.
      """
      assert node_manager_port is not None and type(node_manager_port) is int

      if use_valgrind and use_profiler:
          raise ValueError("Cannot use valgrind and profiler at the same time.")

      # Get the static resources and labels from the resolved ResourceAndLabelSpec
      static_resources = resource_and_label_spec.to_resource_dict()
      labels = resource_and_label_spec.labels

      # Limit the number of workers that can be started in parallel by the
      # raylet. However, make sure it is at least 1.
      num_cpus_static = static_resources.get("CPU", 0)
      maximum_startup_concurrency = max(
          1, min(multiprocessing.cpu_count(), num_cpus_static)
      )

      # Format the resource argument in a form like 'CPU,1.0,GPU,0,Custom,3'.
      resource_argument = ",".join(
          ["{},{}".format(*kv) for kv in static_resources.items()]
      )

      # Java workers are offered only when a `java` binary is on PATH and the
      # Ray jars are packaged. Jar detection is deliberately best-effort: any
      # failure while locating them simply disables Java workers.
      has_java_command = shutil.which("java") is not None
      ray_java_installed = False
      try:
          ray_java_installed = os.path.exists(get_ray_jars_dir())
      except Exception:
          pass
      include_java = has_java_command and ray_java_installed
      if include_java:
          java_worker_command = build_java_worker_command(
              gcs_address,
              plasma_store_name,
              raylet_name,
              redis_username,
              redis_password,
              session_dir,
              node_ip_address,
              setup_worker_path,
          )
      else:
          java_worker_command = []

      if os.path.exists(DEFAULT_WORKER_EXECUTABLE):
          cpp_worker_command = build_cpp_worker_command(
              gcs_address,
              plasma_store_name,
              raylet_name,
              redis_username,
              redis_password,
              session_dir,
              log_dir,
              node_ip_address,
              setup_worker_path,
          )
      else:
          cpp_worker_command = []

      # Create the command that the Raylet will use to start workers.
      # TODO(architkulkarni): Pipe in setup worker args separately instead of
      # inserting them into start_worker_command and later erasing them if
      # needed.
      start_worker_command = (
          [
              sys.executable,
              setup_worker_path,
          ]
          + _site_flags()  # Inherit "-S" and "-s" flags from current Python interpreter.
          + [
              worker_path,
              f"--node-ip-address={node_ip_address}",
              "--node-manager-port=RAY_NODE_MANAGER_PORT_PLACEHOLDER",
              f"--object-store-name={plasma_store_name}",
              f"--raylet-name={raylet_name}",
              f"--redis-address={redis_address}",
              f"--metrics-agent-port={metrics_agent_port}",
              f"--logging-rotate-bytes={max_bytes}",
              f"--logging-rotate-backup-count={backup_count}",
              f"--runtime-env-agent-port={runtime_env_agent_port}",
              f"--gcs-address={gcs_address}",
              f"--session-name={session_name}",
              f"--temp-dir={temp_dir}",
              f"--webui={webui}",
              f"--cluster-id={cluster_id}",
          ]
      )
      start_worker_command.append("RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER")
      if redis_username:
          start_worker_command += [f"--redis-username={redis_username}"]
      if redis_password:
          start_worker_command += [f"--redis-password={redis_password}"]

      # If the object manager port is None, then use 0 to cause the object
      # manager to choose its own port.
      if object_manager_port is None:
          object_manager_port = 0
      if min_worker_port is None:
          min_worker_port = 0
      if max_worker_port is None:
          max_worker_port = 0

      labels_json_str = ""
      if labels:
          labels_json_str = json.dumps(labels)

      dashboard_agent_command = [
          *_build_python_executable_command_memory_profileable(
              ray_constants.PROCESS_TYPE_DASHBOARD_AGENT, session_dir
          ),
          os.path.join(RAY_PATH, "dashboard", "agent.py"),
          f"--node-id={node_id}",
          f"--node-ip-address={node_ip_address}",
          f"--metrics-export-port={metrics_export_port}",
          f"--grpc-port={metrics_agent_port}",
          f"--listen-port={dashboard_agent_listen_port}",
          "--node-manager-port=RAY_NODE_MANAGER_PORT_PLACEHOLDER",
          f"--object-store-name={plasma_store_name}",
          f"--raylet-name={raylet_name}",
          f"--temp-dir={temp_dir}",
          f"--session-dir={session_dir}",
          f"--log-dir={log_dir}",
          f"--logging-rotate-bytes={max_bytes}",
          f"--logging-rotate-backup-count={backup_count}",
          f"--session-name={session_name}",
          f"--gcs-address={gcs_address}",
          f"--cluster-id-hex={cluster_id}",
      ]
      if dashboard_agent_stdout_filepath:
          dashboard_agent_command.append(
              f"--stdout-filepath={dashboard_agent_stdout_filepath}"
          )
      if dashboard_agent_stderr_filepath:
          dashboard_agent_command.append(
              f"--stderr-filepath={dashboard_agent_stderr_filepath}"
          )
      if (
          dashboard_agent_stdout_filepath is None
          and dashboard_agent_stderr_filepath is None
      ):
          # If not redirecting logging to files, unset log filename.
          # This will cause log records to go to stderr.
          dashboard_agent_command.append("--logging-filename=")
          # Use stderr log format with the component name as a message prefix.
          logging_format = ray_constants.LOGGER_FORMAT_STDERR.format(
              component=ray_constants.PROCESS_TYPE_DASHBOARD_AGENT
          )
          dashboard_agent_command.append(f"--logging-format={logging_format}")

      if ray._private.utils.get_dashboard_dependency_error() is not None:
          # If dependencies are not installed, it is the minimally packaged
          # ray. We should restrict the features within dashboard agent
          # that requires additional dependencies to be downloaded.
          dashboard_agent_command.append("--minimal")
      if is_head_node:
          dashboard_agent_command.append("--head")

      runtime_env_agent_command = [
          *_build_python_executable_command_memory_profileable(
              ray_constants.PROCESS_TYPE_RUNTIME_ENV_AGENT, session_dir
          ),
          os.path.join(RAY_PATH, "_private", "runtime_env", "agent", "main.py"),
          f"--node-id={node_id}",
          f"--node-ip-address={node_ip_address}",
          f"--runtime-env-agent-port={runtime_env_agent_port}",
          f"--session-dir={session_dir}",
          f"--gcs-address={gcs_address}",
          f"--cluster-id-hex={cluster_id}",
          f"--runtime-env-dir={resource_dir}",
          f"--logging-rotate-bytes={max_bytes}",
          f"--logging-rotate-backup-count={backup_count}",
          f"--log-dir={log_dir}",
          f"--temp-dir={temp_dir}",
      ]
      if runtime_env_agent_stdout_filepath:
          runtime_env_agent_command.append(
              f"--stdout-filepath={runtime_env_agent_stdout_filepath}"
          )
      if runtime_env_agent_stderr_filepath:
          runtime_env_agent_command.append(
              f"--stderr-filepath={runtime_env_agent_stderr_filepath}"
          )
      if (
          runtime_env_agent_stdout_filepath is None
          and runtime_env_agent_stderr_filepath is None
      ):
          # If not redirecting logging to files, unset log filename.
          # This will cause log records to go to stderr.
          runtime_env_agent_command.append("--logging-filename=")
          # Use stderr log format with the component name as a message prefix.
          logging_format = ray_constants.LOGGER_FORMAT_STDERR.format(
              component=ray_constants.PROCESS_TYPE_RUNTIME_ENV_AGENT
          )
          runtime_env_agent_command.append(f"--logging-format={logging_format}")

      command = [
          RAYLET_EXECUTABLE,
          f"--raylet_socket_name={raylet_name}",
          f"--store_socket_name={plasma_store_name}",
          f"--object_manager_port={object_manager_port}",
          f"--min_worker_port={min_worker_port}",
          f"--max_worker_port={max_worker_port}",
          f"--node_manager_port={node_manager_port}",
          f"--node_id={node_id}",
          f"--node_ip_address={node_ip_address}",
          f"--maximum_startup_concurrency={maximum_startup_concurrency}",
          f"--static_resource_list={resource_argument}",
          f"--python_worker_command={subprocess.list2cmdline(start_worker_command)}",  # noqa
          f"--java_worker_command={subprocess.list2cmdline(java_worker_command)}",  # noqa
          f"--cpp_worker_command={subprocess.list2cmdline(cpp_worker_command)}",  # noqa
          f"--native_library_path={DEFAULT_NATIVE_LIBRARY_PATH}",
          f"--temp_dir={temp_dir}",
          f"--session_dir={session_dir}",
          f"--log_dir={log_dir}",
          f"--resource_dir={resource_dir}",
          f"--metrics-agent-port={metrics_agent_port}",
          f"--metrics_export_port={metrics_export_port}",
          f"--runtime_env_agent_port={runtime_env_agent_port}",
          f"--object_store_memory={object_store_memory}",
          f"--plasma_directory={plasma_directory}",
          f"--fallback_directory={fallback_directory}",
          f"--ray-debugger-external={1 if ray_debugger_external else 0}",
          f"--gcs-address={gcs_address}",
          f"--session-name={session_name}",
          f"--labels={labels_json_str}",
          f"--cluster-id={cluster_id}",
      ]

      if resource_isolation_config.is_enabled():
          # Log through the module logger. The root-level ``logging.info``
          # helper calls ``logging.basicConfig()``, which attaches a handler to
          # the root logger when it has none.
          logger.info(
              "Resource isolation enabled with cgroup_path=%s, "
              "system_reserved_cpu=%s "
              "system_reserved_memory=%s.",
              resource_isolation_config.cgroup_path,
              resource_isolation_config.system_reserved_cpu_weight,
              resource_isolation_config.system_reserved_memory,
          )
          command.append("--enable-resource-isolation")
          command.append(f"--cgroup-path={resource_isolation_config.cgroup_path}")
          command.append(
              f"--system-reserved-cpu-weight={resource_isolation_config.system_reserved_cpu_weight}"
          )
          command.append(
              f"--system-reserved-memory-bytes={resource_isolation_config.system_reserved_memory}"
          )
          command.append(f"--system-pids={resource_isolation_config.system_pids}")

      if raylet_stdout_filepath:
          command.append(f"--stdout_filepath={raylet_stdout_filepath}")
      if raylet_stderr_filepath:
          command.append(f"--stderr_filepath={raylet_stderr_filepath}")

      if is_head_node:
          command.append("--head")

      if worker_port_list is not None:
          command.append(f"--worker_port_list={worker_port_list}")
      command.append(
          "--num_prestart_python_workers={}".format(int(resource_and_label_spec.num_cpus))
      )
      command.append(
          "--dashboard_agent_command={}".format(
              subprocess.list2cmdline(dashboard_agent_command)
          )
      )
      command.append(
          "--runtime_env_agent_command={}".format(
              subprocess.list2cmdline(runtime_env_agent_command)
          )
      )
      if huge_pages:
          command.append("--huge_pages")
      # NOTE(review): presumably a socket reserved by the caller and released
      # here so the raylet can bind it. Confirm against the caller.
      if socket_to_use:
          socket_to_use.close()
      if node_name is not None:
          command.append(f"--node-name={node_name}")

      # When the raylet is given file paths via --stdout_filepath /
      # --stderr_filepath, its own streams are sent to os.devnull.
      stdout_file = None
      if raylet_stdout_filepath:
          stdout_file = open(os.devnull, "w")
      stderr_file = None
      if raylet_stderr_filepath:
          stderr_file = open(os.devnull, "w")

      process_info = start_ray_process(
          command,
          ray_constants.PROCESS_TYPE_RAYLET,
          use_valgrind=use_valgrind,
          use_gdb=False,
          use_valgrind_profiler=use_profiler,
          use_perftools_profiler=("RAYLET_PERFTOOLS_PATH" in os.environ),
          stdout_file=stdout_file,
          stderr_file=stderr_file,
          fate_share=fate_share,
          env_updates=env_updates,
      )
      return process_info
  def get_ray_jars_dir():
      """Locate the directory that holds Ray's jars and their dependencies.

      Returns:
          The absolute path of the ``jars`` directory under ``RAY_PATH``.

      Raises:
          RuntimeError: If that directory does not exist, i.e. Ray was not
              built with Java enabled.
      """
      jars_path = os.path.abspath(os.path.join(RAY_PATH, "jars"))
      if os.path.exists(jars_path):
          return jars_path
      raise RuntimeError(
          "Ray jars is not packaged into ray. "
          "Please build ray with java enabled "
          "(set env var RAY_INSTALL_JAVA=1)"
      )
  def build_java_worker_command(
      bootstrap_address: str,
      plasma_store_name: str,
      raylet_name: str,
      redis_username: str,
      redis_password: str,
      session_dir: str,
      node_ip_address: str,
      setup_worker_path: str,
  ):
      """Assemble the argv used to launch a Java worker.

      Each setting becomes a ``-Dkey=value`` system property. Optional
      settings whose value is None are left out. The node-manager port is
      emitted as a placeholder token for later substitution.

      Args:
          bootstrap_address: Bootstrap address of ray cluster.
          plasma_store_name: The name of the plasma store socket to connect
              to.
          raylet_name: The name of the raylet socket to create.
          redis_username: The username to connect to Redis.
          redis_password: The password to connect to Redis.
          session_dir: The path of this session.
          node_ip_address: The IP address for this node.
          setup_worker_path: The path of the Python file that will set up
              the environment for the worker process.

      Returns:
          A list of command-line tokens for starting the Java worker.
      """

      def _if_set(key, value):
          # Optional properties are skipped entirely when unset.
          return [] if value is None else [(key, value)]

      properties = [
          *_if_set("ray.address", bootstrap_address),
          ("ray.raylet.node-manager-port", "RAY_NODE_MANAGER_PORT_PLACEHOLDER"),
          *_if_set("ray.object-store.socket-name", plasma_store_name),
          *_if_set("ray.raylet.socket-name", raylet_name),
          *_if_set("ray.redis.username", redis_username),
          *_if_set("ray.redis.password", redis_password),
          *_if_set("ray.node-ip", node_ip_address),
          ("ray.home", RAY_HOME),
          ("ray.logging.dir", os.path.join(session_dir, "logs")),
          ("ray.session-dir", session_dir),
      ]
      return [
          sys.executable,
          setup_worker_path,
          *(f"-D{key}={value}" for key, value in properties),
          "RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER",
          "io.ray.runtime.runner.worker.DefaultWorker",
      ]
  def build_cpp_worker_command(
      bootstrap_address: str,
      plasma_store_name: str,
      raylet_name: str,
      redis_username: str,
      redis_password: str,
      session_dir: str,
      log_dir: str,
      node_ip_address: str,
      setup_worker_path: str,
  ):
      """This method assembles the command used to start a CPP worker.

      Args:
          bootstrap_address: The bootstrap address of the cluster.
          plasma_store_name: The name of the plasma store socket to connect
              to.
          raylet_name: The name of the raylet socket to create.
          redis_username: The username to connect to Redis. None is passed
              to the worker as an empty value.
          redis_password: The password to connect to Redis. None is passed
              to the worker as an empty value.
          session_dir: The path of this session.
          log_dir: The path of logs.
          node_ip_address: The ip address for this node.
          setup_worker_path: The path of the Python file that will set up
              the environment for the worker process.

      Returns:
          A list of command-line tokens for starting the CPP worker.
      """
      # An f-string renders None as the literal "None", which the worker would
      # receive as a real credential. Pass an empty value instead (the Java
      # builder omits None credentials entirely).
      # NOTE(review): assumes the C++ worker treats an empty value as "not
      # set". Confirm against its flag definitions.
      redis_username_arg = "" if redis_username is None else redis_username
      redis_password_arg = "" if redis_password is None else redis_password
      command = [
          sys.executable,
          setup_worker_path,
          DEFAULT_WORKER_EXECUTABLE,
          f"--ray_plasma_store_socket_name={plasma_store_name}",
          f"--ray_raylet_socket_name={raylet_name}",
          "--ray_node_manager_port=RAY_NODE_MANAGER_PORT_PLACEHOLDER",
          f"--ray_address={bootstrap_address}",
          f"--ray_redis_username={redis_username_arg}",
          f"--ray_redis_password={redis_password_arg}",
          f"--ray_session_dir={session_dir}",
          f"--ray_logs_dir={log_dir}",
          f"--ray_node_ip_address={node_ip_address}",
          "RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER",
      ]
      return command
  def determine_plasma_store_config(
      object_store_memory: int,
      temp_dir: str,
      plasma_directory: Optional[str] = None,
      fallback_directory: Optional[str] = None,
      huge_pages: bool = False,
  ):
      """Figure out how to configure the plasma object store.

      This will determine:
      1. which directory to use for the plasma store. On Linux,
         we will try to use /dev/shm unless the shared memory file system is too
         small, in which case we will fall back to the user temp dir. If any of
         the object store memory or plasma directory parameters are specified by
         the user, then those values will be preserved.
      2. which directory to use for the fallback files. It will default to the temp_dir
         if it is not extracted from the object_spilling_config.

      Args:
          object_store_memory: The object store memory to use. Non-int values
              are converted with ``int()``.
          temp_dir: The fallback directory used when fallback_directory is None.
          plasma_directory: The user-specified plasma directory parameter.
          fallback_directory: The path extracted from the object_spilling_config when the
              object spilling config is set and the spilling type is to
              filesystem.
          huge_pages: The user-specified huge pages parameter. Only supported
              on Linux, and requires plasma_directory to be provided.

      Returns:
          A tuple of plasma directory to use, the fallback directory to use, and the
          object store memory to use. If it is specified by the user, then that value will
          be preserved.

      Raises:
          ValueError: If huge_pages is requested off Linux or without a
              plasma_directory, if a given directory does not exist, or if
              object_store_memory is outside the allowed range.
      """
      if not isinstance(object_store_memory, int):
          object_store_memory = int(object_store_memory)

      if huge_pages and not (sys.platform == "linux" or sys.platform == "linux2"):
          raise ValueError("The huge_pages argument is only supported on Linux.")

      # Check the caller-supplied value. Further down plasma_directory always
      # receives a default, so a check placed after that could never fire.
      if huge_pages and plasma_directory is None:
          raise ValueError(
              "If huge_pages is True, then the "
              "plasma_directory argument must be provided."
          )

      system_memory = ray._common.utils.get_system_memory()

      # Determine which directory to use. By default, use /tmp on MacOS and
      # /dev/shm on Linux, unless the shared-memory file system is too small,
      # in which case we default to /tmp on Linux.
      if plasma_directory is None:
          if sys.platform == "linux" or sys.platform == "linux2":
              shm_avail = ray._private.utils.get_shared_memory_bytes()
              # Compare the requested memory size to the memory available in
              # /dev/shm.
              if shm_avail >= object_store_memory:
                  plasma_directory = "/dev/shm"
              elif (
                  not os.environ.get("RAY_OBJECT_STORE_ALLOW_SLOW_STORAGE")
                  and object_store_memory > ray_constants.REQUIRE_SHM_SIZE_THRESHOLD
              ):
                  raise ValueError(
                      "The configured object store size ({} GB) exceeds "
                      "/dev/shm size ({} GB). This will harm performance. "
                      "Consider deleting files in /dev/shm or increasing its "
                      "size with "
                      "--shm-size in Docker. To ignore this warning, "
                      "set RAY_OBJECT_STORE_ALLOW_SLOW_STORAGE=1.".format(
                          object_store_memory / 1e9, shm_avail / 1e9
                      )
                  )
              else:
                  plasma_directory = ray._common.utils.get_user_temp_dir()
                  logger.warning(
                      "WARNING: The object store is using {} instead of "
                      "/dev/shm because /dev/shm has only {} bytes available. "
                      "This will harm performance! You may be able to free up "
                      "space by deleting files in /dev/shm. If you are inside a "
                      "Docker container, you can increase /dev/shm size by "
                      "passing '--shm-size={:.2f}gb' to 'docker run' (or add it "
                      "to the run_options list in a Ray cluster config). Make "
                      "sure to set this to more than 30% of available RAM.".format(
                          ray._common.utils.get_user_temp_dir(),
                          shm_avail,
                          object_store_memory * (1.1) / (2**30),
                      )
                  )
          else:
              plasma_directory = ray._common.utils.get_user_temp_dir()

          # Do some sanity checks.
          if object_store_memory > system_memory:
              raise ValueError(
                  "The requested object store memory size is greater "
                  "than the total available memory."
              )
      else:
          plasma_directory = os.path.abspath(plasma_directory)
          logger.info("object_store_memory is not verified when plasma_directory is set.")

      if not os.path.isdir(plasma_directory):
          raise ValueError(
              f"The plasma directory file {plasma_directory} does not exist or is not a directory."
          )

      if object_store_memory < ray_constants.OBJECT_STORE_MINIMUM_MEMORY_BYTES:
          raise ValueError(
              "Attempting to cap object store memory usage at {} "
              "bytes, but the minimum allowed is {} bytes.".format(
                  object_store_memory, ray_constants.OBJECT_STORE_MINIMUM_MEMORY_BYTES
              )
          )

      if (
          sys.platform == "darwin"
          and object_store_memory > ray_constants.MAC_DEGRADED_PERF_MMAP_SIZE_LIMIT
          and os.environ.get("RAY_ENABLE_MAC_LARGE_OBJECT_STORE") != "1"
      ):
          raise ValueError(
              "The configured object store size ({:.4}GiB) exceeds "
              "the optimal size on Mac ({:.4}GiB). "
              "This will harm performance! There is a known issue where "
              "Ray's performance degrades with object store size greater"
              " than {:.4}GB on a Mac. "
              "To reduce the object store capacity, specify "
              "`object_store_memory` when calling ray.init() or ray start. "
              "To ignore this warning, "
              "set RAY_ENABLE_MAC_LARGE_OBJECT_STORE=1.".format(
                  object_store_memory / 2**30,
                  ray_constants.MAC_DEGRADED_PERF_MMAP_SIZE_LIMIT / 2**30,
                  ray_constants.MAC_DEGRADED_PERF_MMAP_SIZE_LIMIT / 2**30,
              )
          )

      if fallback_directory is None:
          fallback_directory = temp_dir
      else:
          fallback_directory = os.path.abspath(fallback_directory)

      if not os.path.isdir(fallback_directory):
          raise ValueError(
              f"The fallback directory file {fallback_directory} does not exist or is not a directory."
          )

      # Print the object store memory using two decimal places.
      logger.debug(
          "Determine to start the Plasma object store with {} GB memory "
          "using {} and fallback to {}".format(
              round(object_store_memory / 10**9, 2),
              plasma_directory,
              fallback_directory,
          )
      )

      return plasma_directory, fallback_directory, object_store_memory
  1996. def start_monitor(
  1997. gcs_address: str,
  1998. logs_dir: str,
  1999. stdout_filepath: Optional[str] = None,
  2000. stderr_filepath: Optional[str] = None,
  2001. autoscaling_config: Optional[str] = None,
  2002. fate_share: Optional[bool] = None,
  2003. max_bytes: int = 0,
  2004. backup_count: int = 0,
  2005. monitor_ip: Optional[str] = None,
  2006. autoscaler_v2: bool = False,
  2007. ):
  2008. """Run a process to monitor the other processes.
  2009. Args:
  2010. gcs_address: The address of GCS server.
  2011. logs_dir: The path to the log directory.
  2012. stdout_filepath: The file path to dump monitor stdout.
  2013. If None, stdout is not redirected.
  2014. stderr_filepath: The file path to dump monitor stderr.
  2015. If None, stderr is not redirected.
  2016. autoscaling_config: path to autoscaling config file.
  2017. max_bytes: Log rotation parameter. Corresponding to
  2018. RotatingFileHandler's maxBytes.
  2019. backup_count: Log rotation parameter. Corresponding to
  2020. RotatingFileHandler's backupCount.
  2021. monitor_ip: IP address of the machine that the monitor will be
  2022. run on. Can be excluded, but required for autoscaler metrics.
  2023. Returns:
  2024. ProcessInfo for the process that was started.
  2025. """
  2026. if autoscaler_v2:
  2027. entrypoint = os.path.join(RAY_PATH, AUTOSCALER_V2_DIR, "monitor.py")
  2028. else:
  2029. entrypoint = os.path.join(RAY_PATH, AUTOSCALER_PRIVATE_DIR, "monitor.py")
  2030. command = [
  2031. sys.executable,
  2032. "-u",
  2033. entrypoint,
  2034. f"--logs-dir={logs_dir}",
  2035. f"--logging-rotate-bytes={max_bytes}",
  2036. f"--logging-rotate-backup-count={backup_count}",
  2037. ]
  2038. assert gcs_address is not None
  2039. command.append(f"--gcs-address={gcs_address}")
  2040. if stdout_filepath:
  2041. command.append(f"--stdout-filepath={stdout_filepath}")
  2042. if stderr_filepath:
  2043. command.append(f"--stderr-filepath={stderr_filepath}")
  2044. if stdout_filepath is None and stderr_filepath is None:
  2045. # If not redirecting logging to files, unset log filename.
  2046. # This will cause log records to go to stderr.
  2047. command.append("--logging-filename=")
  2048. # Use stderr log format with the component name as a message prefix.
  2049. logging_format = ray_constants.LOGGER_FORMAT_STDERR.format(
  2050. component=ray_constants.PROCESS_TYPE_MONITOR
  2051. )
  2052. command.append(f"--logging-format={logging_format}")
  2053. if autoscaling_config:
  2054. command.append("--autoscaling-config=" + str(autoscaling_config))
  2055. if monitor_ip:
  2056. command.append("--monitor-ip=" + monitor_ip)
  2057. stdout_file = None
  2058. if stdout_filepath:
  2059. stdout_file = open(os.devnull, "w")
  2060. stderr_file = None
  2061. if stderr_filepath:
  2062. stderr_file = open(os.devnull, "w")
  2063. process_info = start_ray_process(
  2064. command,
  2065. ray_constants.PROCESS_TYPE_MONITOR,
  2066. stdout_file=stdout_file,
  2067. stderr_file=stderr_file,
  2068. fate_share=fate_share,
  2069. )
  2070. return process_info
  2071. def start_ray_client_server(
  2072. address: str,
  2073. ray_client_server_ip: str,
  2074. ray_client_server_port: int,
  2075. stdout_file: Optional[int] = None,
  2076. stderr_file: Optional[int] = None,
  2077. redis_username: Optional[str] = None,
  2078. redis_password: Optional[str] = None,
  2079. fate_share: Optional[bool] = None,
  2080. runtime_env_agent_address: Optional[str] = None,
  2081. node_id: Optional[str] = None,
  2082. server_type: str = "proxy",
  2083. serialized_runtime_env_context: Optional[str] = None,
  2084. ):
  2085. """Run the server process of the Ray client.
  2086. Args:
  2087. address: The address of the cluster.
  2088. ray_client_server_ip: Host IP the Ray client server listens on.
  2089. ray_client_server_port: Port the Ray client server listens on.
  2090. stdout_file: A file handle opened for writing to redirect stdout to. If
  2091. no redirection should happen, then this should be None.
  2092. stderr_file: A file handle opened for writing to redirect stderr to. If
  2093. no redirection should happen, then this should be None.
  2094. redis_username: The username of the Redis server.
  2095. redis_password: The password of the Redis server.
  2096. runtime_env_agent_address: Address to the Runtime Env Agent listens on via HTTP.
  2097. Only needed when server_type == "proxy".
  2098. node_id: The hex ID of this node.
  2099. server_type: Whether to start the proxy version of Ray Client.
  2100. serialized_runtime_env_context (str|None): If specified, the serialized
  2101. runtime_env_context to start the client server in.
  2102. Returns:
  2103. ProcessInfo for the process that was started.
  2104. """
  2105. root_ray_dir = Path(__file__).resolve().parents[1]
  2106. setup_worker_path = os.path.join(
  2107. root_ray_dir, "_private", "workers", ray_constants.SETUP_WORKER_FILENAME
  2108. )
  2109. ray_client_server_host = ray_client_server_ip
  2110. command = [
  2111. sys.executable,
  2112. setup_worker_path,
  2113. "-m",
  2114. "ray.util.client.server",
  2115. f"--address={address}",
  2116. f"--host={ray_client_server_host}",
  2117. f"--port={ray_client_server_port}",
  2118. f"--mode={server_type}",
  2119. f"--language={Language.Name(Language.PYTHON)}",
  2120. ]
  2121. if redis_username:
  2122. command.append(f"--redis-username={redis_username}")
  2123. if redis_password:
  2124. command.append(f"--redis-password={redis_password}")
  2125. if serialized_runtime_env_context:
  2126. command.append(
  2127. f"--serialized-runtime-env-context={serialized_runtime_env_context}" # noqa: E501
  2128. )
  2129. if server_type == "proxy":
  2130. assert len(runtime_env_agent_address) > 0
  2131. if runtime_env_agent_address:
  2132. command.append(f"--runtime-env-agent-address={runtime_env_agent_address}")
  2133. if node_id:
  2134. command.append(f"--node-id={node_id}")
  2135. process_info = start_ray_process(
  2136. command,
  2137. ray_constants.PROCESS_TYPE_RAY_CLIENT_SERVER,
  2138. stdout_file=stdout_file,
  2139. stderr_file=stderr_file,
  2140. fate_share=fate_share,
  2141. )
  2142. return process_info
  2143. def _is_raylet_process(cmdline: Optional[List[str]]) -> bool:
  2144. """Check if the command line belongs to a raylet process.
  2145. Args:
  2146. cmdline: List of command line arguments or None
  2147. Returns:
  2148. bool: True if this is a raylet process, False otherwise
  2149. """
  2150. if cmdline is None or len(cmdline) == 0:
  2151. return False
  2152. executable = os.path.basename(cmdline[0])
  2153. return "raylet" in executable