node.py 71 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830
  1. import atexit
  2. import collections
  3. import datetime
  4. import errno
  5. import json
  6. import logging
  7. import os
  8. import random
  9. import signal
  10. import socket
  11. import subprocess
  12. import sys
  13. import tempfile
  14. import threading
  15. import time
  16. import traceback
  17. from typing import IO, AnyStr, Optional, Tuple
  18. import ray
  19. import ray._private.ray_constants as ray_constants
  20. import ray._private.services
  21. from ray._common.network_utils import (
  22. build_address,
  23. get_localhost_ip,
  24. is_ipv6,
  25. parse_address,
  26. )
  27. from ray._common.ray_constants import LOGGING_ROTATE_BACKUP_COUNT, LOGGING_ROTATE_BYTES
  28. from ray._common.utils import try_to_create_directory
  29. from ray._private.resource_and_label_spec import ResourceAndLabelSpec
  30. from ray._private.resource_isolation_config import ResourceIsolationConfig
  31. from ray._private.services import get_address, serialize_config
  32. from ray._private.utils import (
  33. is_in_test,
  34. open_log,
  35. try_to_symlink,
  36. validate_socket_filepath,
  37. )
  38. from ray._raylet import (
  39. GCS_SERVER_PORT_NAME,
  40. GcsClient,
  41. get_port_filename,
  42. get_session_key_from_storage,
  43. wait_for_persisted_port,
  44. )
  45. import psutil
  46. # Logger for this module. It should be configured at the entry point
  47. # into the program using Ray. Ray configures it by default automatically
  48. # using logging.basicConfig in its entry/init points.
  49. logger = logging.getLogger(__name__)
  50. class Node:
  51. """An encapsulation of the Ray processes on a single node.
  52. This class is responsible for starting Ray processes and killing them,
  53. and it also controls the temp file policy.
  54. Attributes:
  55. all_processes: A mapping from process type (str) to a list of
  56. ProcessInfo objects. All lists have length one except for the Redis
  57. server list, which has multiple.
  58. """
  def __init__(
      self,
      ray_params,
      head: bool = False,
      shutdown_at_exit: bool = True,
      spawn_reaper: bool = True,
      connect_only: bool = False,
      default_worker: bool = False,
      ray_init_cluster: bool = False,
  ):
      """Start a node.

      Construction resolves the session name, node ID, node IP address,
      session directories and socket names, optionally starts the Ray
      processes, and finally copies the port information reported by the
      GCS for this node back into ``ray_params``.

      Args:
          ray_params: The RayParams to use to configure the node. It is
              mutated in place (defaults are filled in and resolved ports
              are written back).
          head: True if this is the head node, which means it will
              start additional processes like the Redis servers, monitor
              processes, and web UI.
          shutdown_at_exit: If true, spawned processes will be cleaned
              up if this process exits normally.
          spawn_reaper: If true, spawns a process that will clean up
              other spawned processes if this process dies unexpectedly.
          connect_only: If true, connect to the node without starting
              new processes.
          default_worker: Whether it's running from a ray worker or not
          ray_init_cluster: Whether it's a cluster created by ray.init()

      Raises:
          ValueError: If both ``shutdown_at_exit`` and ``connect_only`` are
              true, or if system config parameters are supplied for a node
              that is neither the head node nor connect-only.
          Exception: If the node info does not become available in the GCS
              within the start-up wait time after the processes are started.
      """
      if shutdown_at_exit:
          if connect_only:
              raise ValueError(
                  "'shutdown_at_exit' and 'connect_only' cannot both be true."
              )
          self._register_shutdown_hooks()

      self._default_worker = default_worker
      self.head = head
      # Only true when a reaper was requested AND fate sharing support is
      # detected; a separate reaper process is started further below only
      # when this ends up false.
      self.kernel_fate_share = bool(
          spawn_reaper and ray._private.utils.detect_fate_sharing_support()
      )
      self.resource_isolation_config: ResourceIsolationConfig = (
          ray_params.resource_isolation_config
      )
      self.all_processes: dict = {}
      self.removal_lock = threading.Lock()

      self.ray_init_cluster = ray_init_cluster
      if ray_init_cluster:
          assert head, "ray.init() created cluster only has the head node"

      # Set up external Redis when `RAY_REDIS_ADDRESS` is specified.
      redis_address_env = os.environ.get("RAY_REDIS_ADDRESS")
      if ray_params.external_addresses is None and redis_address_env is not None:
          external_redis = redis_address_env.split(",")

          # Reuse primary Redis as Redis shard when there's only one
          # instance provided.
          if len(external_redis) == 1:
              external_redis.append(external_redis[0])

          ray_params.external_addresses = external_redis
          ray_params.num_redis_shards = len(external_redis) - 1

      # A non-empty system config is rejected unless this is the head node
      # or a connect-only node.
      if (
          ray_params._system_config
          and len(ray_params._system_config) > 0
          and (not head and not connect_only)
      ):
          raise ValueError(
              "System config parameters can only be set on the head node."
          )

      # Fill in defaults; the worker scripts live in the "workers" directory
      # next to this file.
      ray_params.update_if_absent(
          include_log_monitor=True,
          resources={},
          worker_path=os.path.join(
              os.path.dirname(os.path.abspath(__file__)),
              "workers",
              "default_worker.py",
          ),
          setup_worker_path=os.path.join(
              os.path.dirname(os.path.abspath(__file__)),
              "workers",
              ray_constants.SETUP_WORKER_FILENAME,
          ),
      )

      self._resource_and_label_spec = None
      self._localhost = get_localhost_ip()
      self._ray_params = ray_params
      self._config = ray_params._system_config or {}

      # Configure log rotation parameters.
      self.max_bytes = int(os.getenv("RAY_ROTATION_MAX_BYTES", LOGGING_ROTATE_BYTES))
      self.backup_count = int(
          os.getenv("RAY_ROTATION_BACKUP_COUNT", LOGGING_ROTATE_BACKUP_COUNT)
      )

      assert self.max_bytes >= 0
      assert self.backup_count >= 0

      self._redis_address = ray_params.redis_address
      if head:
          ray_params.update_if_absent(num_redis_shards=1)
      self._gcs_address = ray_params.gcs_address
      self._gcs_client = None

      # Non-head nodes validate the GCS address and connect to it right away;
      # the session name and temp dir lookups below go through that client.
      if not self.head:
          self.validate_ip_port(self.address)
          self._init_gcs_client()

      # Register the temp dir.
      self._session_name = ray_params.session_name
      if self._session_name is None:
          if head:
              # We expect this the first time we initialize a cluster, but not during
              # subsequent restarts of the head node.
              maybe_key = self.check_persisted_session_name()
              if maybe_key is None:
                  # date including microsecond
                  date_str = datetime.datetime.today().strftime(
                      "%Y-%m-%d_%H-%M-%S_%f"
                  )
                  self._session_name = f"session_{date_str}_{os.getpid()}"
              else:
                  self._session_name = ray._common.utils.decode(maybe_key)
          else:
              assert not self._default_worker
              session_name = ray._private.utils.internal_kv_get_with_retry(
                  self.get_gcs_client(),
                  "session_name",
                  ray_constants.KV_NAMESPACE_SESSION,
                  num_retries=ray_constants.NUM_REDIS_GET_RETRIES,
              )
              self._session_name = ray._common.utils.decode(session_name)

      # Initialize webui url
      if head:
          self._webui_url = None
      else:
          if ray_params.webui is None:
              assert not self._default_worker
              self._webui_url = ray._private.services.get_webui_url_from_internal_kv()
          else:
              self._webui_url = build_address(
                  ray_params.dashboard_host, ray_params.dashboard_port
              )

      # Resolve node to connect to
      node_to_connect_info = None
      if connect_only and not self._default_worker:
          node_to_connect_info = ray._private.services.get_node_to_connect_for_driver(
              self.get_gcs_client(),
              node_ip_address=ray_params.node_ip_address,
              node_name=ray_params.node_name,
              temp_dir=ray_params.temp_dir,
          )

      # Resolve node ID
      if connect_only:
          self._node_id = ray_params.node_id
          if self._node_id is None:
              self._node_id = node_to_connect_info.node_id.hex()
      else:
          # Precedence: override in ray_params.env_vars, then override in the
          # process environment, then a freshly generated random ID.
          if (
              self._ray_params.env_vars is not None
              and "RAY_OVERRIDE_NODE_ID_FOR_TESTING" in self._ray_params.env_vars
          ):
              node_id = self._ray_params.env_vars["RAY_OVERRIDE_NODE_ID_FOR_TESTING"]
              logger.debug(
                  f"Setting node ID to {node_id} "
                  "based on ray_params.env_vars override"
              )
              self._node_id = node_id
          elif os.environ.get("RAY_OVERRIDE_NODE_ID_FOR_TESTING"):
              node_id = os.environ["RAY_OVERRIDE_NODE_ID_FOR_TESTING"]
              logger.debug(f"Setting node ID to {node_id} based on env override")
              self._node_id = node_id
          else:
              node_id = ray.NodeID.from_random().hex()
              logger.debug(f"Setting node ID to {node_id}")
              self._node_id = node_id

      # Resolve node ip address
      node_ip_address = ray_params.node_ip_address
      if node_ip_address is None:
          if connect_only:
              assert node_to_connect_info is not None
              node_ip_address = getattr(
                  node_to_connect_info, "node_manager_address", None
              )
          else:
              node_ip_address = ray.util.get_node_ip_address()

      assert node_ip_address is not None
      ray_params.update_if_absent(node_ip_address=node_ip_address)
      self._node_ip_address = node_ip_address

      # It creates a session_dir.
      self._init_temp()

      # Resolve socket and port names
      if connect_only:
          # Get socket names from the configuration.
          self._plasma_store_socket_name = ray_params.plasma_store_socket_name
          self._raylet_socket_name = ray_params.raylet_socket_name

          # If user does not provide the socket name, get it from GCS.
          if (
              self._plasma_store_socket_name is None
              or self._raylet_socket_name is None
              or self._ray_params.node_manager_port is None
          ):
              # Get the address info of the processes to connect to
              # from Redis or GCS.
              assert node_to_connect_info is not None
              self._plasma_store_socket_name = (
                  node_to_connect_info.object_store_socket_name
              )
              self._raylet_socket_name = node_to_connect_info.raylet_socket_name
              self._ray_params.node_manager_port = (
                  node_to_connect_info.node_manager_port
              )
      else:
          # If the user specified a socket name, use it.
          self._plasma_store_socket_name = self._prepare_socket_file(
              self._ray_params.plasma_store_socket_name, default_prefix="plasma_store"
          )
          self._raylet_socket_name = self._prepare_socket_file(
              self._ray_params.raylet_socket_name, default_prefix="raylet"
          )

      self._object_spilling_config = self._get_object_spilling_config()
      logger.debug(
          f"Starting node with object spilling config: {self._object_spilling_config}"
      )

      # Obtain the fallback directory from the object spilling config.
      # Currently, we set the fallback directory to be the same as the object spilling
      # path when the object spills to file system. When "directory_path" is a
      # list, its first entry is used.
      self._fallback_directory = None
      if self._object_spilling_config:
          config = json.loads(self._object_spilling_config)
          if config.get("type") == "filesystem":
              directory_path = config.get("params", {}).get("directory_path")
              if isinstance(directory_path, list):
                  self._fallback_directory = directory_path[0]
              elif isinstance(directory_path, str):
                  self._fallback_directory = directory_path

      # If it is a head node, try validating if external storage is configurable.
      if head:
          self.validate_external_storage()

      # Unset (None) ports are normalized to 0.
      ray_params.update_if_absent(
          metrics_agent_port=ray_params.metrics_agent_port or 0,
          metrics_export_port=ray_params.metrics_export_port or 0,
          dashboard_agent_listen_port=ray_params.dashboard_agent_listen_port or 0,
          runtime_env_agent_port=ray_params.runtime_env_agent_port or 0,
      )

      # Pick a GCS server port.
      if head:
          # For GCS fault tolerance: if the port file already exists in the
          # current session directory, this indicates a GCS restart scenario.
          # We reuse the existing port so that other components can reconnect
          # to GCS after it restarts.
          gcs_port_filename = get_port_filename(self._node_id, GCS_SERVER_PORT_NAME)
          gcs_port_file = os.path.join(self._session_dir, gcs_port_filename)
          if os.path.exists(gcs_port_file):
              gcs_port = wait_for_persisted_port(
                  self._session_dir,
                  self._node_id,
                  GCS_SERVER_PORT_NAME,
                  timeout_ms=0,
              )
              ray_params.update_if_absent(gcs_server_port=gcs_port)
          else:
              # Otherwise take the port from the environment variable, or 0
              # when it is unset or empty.
              gcs_server_port = os.getenv(ray_constants.GCS_PORT_ENVIRONMENT_VARIABLE)
              ray_params.update_if_absent(
                  gcs_server_port=int(gcs_server_port) if gcs_server_port else 0
              )

      if not connect_only and spawn_reaper and not self.kernel_fate_share:
          self.start_reaper_process()
      if not connect_only:
          self._ray_params.update_pre_selected_port()

      # Start processes.
      if head:
          self.start_head_processes()

      node_info = None
      if not connect_only:
          self.start_ray_processes()
          # Wait for the node info to be available in the GCS so that
          # we know it's started up.

          # Grace period to let the Raylet register with the GCS.
          # We retry in a loop in case it takes longer than expected.
          time.sleep(0.1)
          start_time = time.monotonic()
          raylet_start_wait_time_s = 30
          while True:
              try:
                  # Will raise a RuntimeError if the node info is not available.
                  node_info = ray._private.services.get_node(
                      self.gcs_address,
                      self._node_id,
                  )
                  break
              except RuntimeError as e:
                  logger.info(f"Failed to get node info {e}")
              # Only reached after a failed attempt: give up once the wait
              # budget is exhausted, otherwise retry.
              if time.monotonic() - start_time > raylet_start_wait_time_s:
                  raise Exception(
                      "The current node timed out during startup. This "
                      "could happen because some of the raylet failed to "
                      "startup or the GCS has become overloaded."
                  )

      if connect_only:
          # Fetch node info to get labels.
          node_info = ray._private.services.get_node(
              self.gcs_address,
              self._node_id,
          )

      # Set node labels from GCS if provided at node init.
      self._node_labels = node_info.get("labels", {})

      # port can be 0 or None for two cases:
      # 1. user is starting a new ray cluster and does not specify the port, components self-bind.
      # 2. user is connecting to an existing ray cluster, no port info is provided.
      # We always update port info from GCS to ensure consistency.
      self._ray_params.node_manager_port = node_info["node_manager_port"]
      self._ray_params.runtime_env_agent_port = node_info["runtime_env_agent_port"]
      self._ray_params.metrics_agent_port = node_info["metrics_agent_port"]
      self._ray_params.metrics_export_port = node_info["metrics_export_port"]
      self._ray_params.dashboard_agent_listen_port = node_info[
          "dashboard_agent_listen_port"
      ]

      # Makes sure the Node object has valid addresses after setup.
      self.validate_ip_port(self.address)
      self.validate_ip_port(self.gcs_address)

      if not connect_only:
          self._record_stats()
  369. def check_persisted_session_name(self):
  370. if self._ray_params.external_addresses is None:
  371. return None
  372. self._redis_address = self._ray_params.external_addresses[0]
  373. redis_ip_address, redis_port, enable_redis_ssl = get_address(
  374. self._redis_address,
  375. )
  376. # Address is ip:port or redis://ip:port
  377. if int(redis_port) < 0:
  378. raise ValueError(
  379. f"Invalid Redis port provided: {redis_port}."
  380. "The port must be a non-negative integer."
  381. )
  382. return get_session_key_from_storage(
  383. redis_ip_address,
  384. int(redis_port),
  385. self._ray_params.redis_username,
  386. self._ray_params.redis_password,
  387. enable_redis_ssl,
  388. serialize_config(self._config),
  389. b"session_name",
  390. )
  391. @staticmethod
  def validate_ip_port(ip_port):
      """Check that ``ip_port`` splits into an address and an integer port.

      Args:
          ip_port: The address string to check.

      Raises:
          ValueError: If ``parse_address`` returns ``None`` for the input, or
              if the port component cannot be converted to an integer.
      """
      host_and_port = parse_address(ip_port)
      if host_and_port is None:
          raise ValueError(f"Port is not specified for address {ip_port}")

      port_text = host_and_port[1]
      try:
          int(port_text)
      except ValueError:
          raise ValueError(
              f"Unable to parse port number from {port_text} (full address = {ip_port})"
          )
  def check_version_info(self):
      """Compare this process's version information with the cluster's.

      The cluster metadata is read through the GCS client; if the first read
      returns ``None`` it is read one more time. When no metadata is found the
      check is skipped. Otherwise the comparison is delegated to
      ``ray._private.utils.check_version_info``, labelled with this node's IP
      address (per the original documentation, a mismatch raises there).
      """
      import ray._common.usage.usage_lib as ray_usage_lib

      # At most two reads: the second happens only if the first gave None.
      for _attempt in range(2):
          cluster_metadata = ray_usage_lib.get_cluster_metadata(self.get_gcs_client())
          if cluster_metadata is not None:
              break

      if not cluster_metadata:
          return

      this_node_ip = ray._private.services.get_node_ip_address()
      ray._private.utils.check_version_info(cluster_metadata, f"node {this_node_ip}")
  def _register_shutdown_hooks(self):
      """Install the atexit and SIGTERM handlers that clean up child processes.

      Both handlers ask ``kill_all_processes`` for a graceful shutdown without
      checking liveness. The SIGTERM handler additionally exits the process
      with status 1; the atexit handler does not, because the interpreter is
      already on its way out.
      """

      def terminate_children():
          self.kill_all_processes(check_alive=False, allow_graceful=True)

      def atexit_handler(*args):
          # Already inside the exit procedure, so no sys.exit here.
          terminate_children()

      def sigterm_handler(signum, frame):
          # Clean up first, then report the termination as an error (1).
          terminate_children()
          sys.exit(1)

      atexit.register(atexit_handler)
      ray._private.utils.set_sigterm_handler(sigterm_handler)
  def _init_temp(self):
      """Resolve and create the temp, session, socket, log and runtime-env dirs.

      The head node takes the temp dir from its RayParams (defaulting to the
      Ray temp dir). Other nodes use the RayParams value when given and
      otherwise read ``temp_dir`` (and, if needed, ``session_dir``) from the
      session namespace of the GCS internal KV store.
      """
      params = self._ray_params

      # Counter per name, used to index temp files.
      self._incremental_dict = collections.defaultdict(lambda: 0)

      # --- temp dir ---
      if self.head:
          params.update_if_absent(temp_dir=ray._common.utils.get_ray_temp_dir())
          self._temp_dir = params.temp_dir
      elif params.temp_dir is not None:
          self._temp_dir = params.temp_dir
      else:
          assert not self._default_worker
          raw_temp_dir = ray._private.utils.internal_kv_get_with_retry(
              self.get_gcs_client(),
              "temp_dir",
              ray_constants.KV_NAMESPACE_SESSION,
              num_retries=ray_constants.NUM_REDIS_GET_RETRIES,
          )
          self._temp_dir = ray._common.utils.decode(raw_temp_dir)

      try_to_create_directory(self._temp_dir)

      # --- session dir ---
      # Only a non-head node missing either piece has to ask the GCS.
      needs_kv_lookup = not self.head and (
          self._temp_dir is None or self._session_name is None
      )
      if needs_kv_lookup:
          assert not self._default_worker
          raw_session_dir = ray._private.utils.internal_kv_get_with_retry(
              self.get_gcs_client(),
              "session_dir",
              ray_constants.KV_NAMESPACE_SESSION,
              num_retries=ray_constants.NUM_REDIS_GET_RETRIES,
          )
          self._session_dir = ray._common.utils.decode(raw_session_dir)
      else:
          self._session_dir = os.path.join(self._temp_dir, self._session_name)

      latest_link = os.path.join(self._temp_dir, ray_constants.SESSION_LATEST)
      try_to_create_directory(self._session_dir)
      try_to_symlink(latest_link, self._session_dir)

      # --- per-session sub-directories ---
      # Socket files.
      self._sockets_dir = os.path.join(self._session_dir, "sockets")
      try_to_create_directory(self._sockets_dir)

      # Process log files, plus an "old" folder inside it.
      self._logs_dir = os.path.join(self._session_dir, "logs")
      try_to_create_directory(self._logs_dir)
      try_to_create_directory(os.path.join(self._logs_dir, "old"))

      # Runtime environments.
      self._runtime_env_dir = os.path.join(
          self._session_dir, params.runtime_env_dir_name
      )
      try_to_create_directory(self._runtime_env_dir)

      # Expose the libtpu tpu_logs directory under the logs dir when present.
      user_temp_dir = ray._common.utils.get_user_temp_dir()
      tpu_log_dir = f"{user_temp_dir}/tpu_logs"
      if os.path.isdir(tpu_log_dir):
          try_to_symlink(os.path.join(self._logs_dir, "tpu_logs"), tpu_log_dir)
  491. def get_resource_and_label_spec(self):
  492. """Resolve and return the current ResourceAndLabelSpec for the node."""
  493. if not self._resource_and_label_spec:
  494. self._resource_and_label_spec = ResourceAndLabelSpec(
  495. self._ray_params.num_cpus,
  496. self._ray_params.num_gpus,
  497. self._ray_params.memory,
  498. self._ray_params.available_memory_bytes,
  499. self._ray_params.object_store_memory,
  500. self._ray_params.resources,
  501. self._ray_params.labels,
  502. ).resolve(is_head=self.head, node_ip_address=self.node_ip_address)
  503. return self._resource_and_label_spec
  504. @property
  505. def node_id(self):
  506. """Get the node ID."""
  507. return self._node_id
  508. @property
  509. def session_name(self):
  510. """Get the current Ray session name."""
  511. return self._session_name
  512. @property
  513. def node_ip_address(self):
  514. """Get the IP address of this node."""
  515. return self._node_ip_address
  516. @property
  517. def address(self):
  518. """Get the address for bootstrapping, e.g. the address to pass to
  519. `ray start` or `ray.init()` to start worker nodes, that has been
  520. converted to ip:port format.
  521. """
  522. return self._gcs_address
  523. @property
  524. def gcs_address(self):
  525. """Get the gcs address."""
  526. assert self._gcs_address is not None, "Gcs address is not set"
  527. return self._gcs_address
  528. @property
  529. def redis_address(self):
  530. """Get the cluster Redis address."""
  531. return self._redis_address
  532. @property
  533. def redis_username(self):
  534. """Get the cluster Redis username."""
  535. return self._ray_params.redis_username
  536. @property
  537. def redis_password(self):
  538. """Get the cluster Redis password."""
  539. return self._ray_params.redis_password
  540. @property
  541. def plasma_store_socket_name(self):
  542. """Get the node's plasma store socket name."""
  543. return self._plasma_store_socket_name
  544. @property
  545. def unique_id(self):
  546. """Get a unique identifier for this node."""
  547. return f"{self.node_ip_address}:{self._plasma_store_socket_name}"
  548. @property
  549. def webui_url(self):
  550. """Get the cluster's web UI url."""
  551. return self._webui_url
  552. @property
  553. def raylet_socket_name(self):
  554. """Get the node's raylet socket name."""
  555. return self._raylet_socket_name
  556. @property
  557. def node_manager_port(self):
  558. """Get the node manager's port."""
  559. return self._ray_params.node_manager_port
  560. @property
  561. def metrics_export_port(self):
  562. """Get the port that exposes metrics"""
  563. return self._ray_params.metrics_export_port
  564. @property
  565. def metrics_agent_port(self):
  566. """Get the metrics agent gRPC port"""
  567. return self._ray_params.metrics_agent_port
  568. @property
  569. def runtime_env_agent_port(self):
  570. """Get the port that exposes runtime env agent as http"""
  571. return self._ray_params.runtime_env_agent_port
  572. @property
  def runtime_env_agent_address(self):
      """Return the HTTP address of the runtime env agent.

      The host:port part is produced by ``build_address`` from this node's IP
      address and the runtime env agent port, then prefixed with ``http://``.
      """
      host_and_port = build_address(
          self._node_ip_address, self._ray_params.runtime_env_agent_port
      )
      return f"http://{host_and_port}"
  576. @property
  577. def dashboard_agent_listen_port(self):
  578. """Get the dashboard agent's listen port"""
  579. return self._ray_params.dashboard_agent_listen_port
  580. @property
  581. def logging_config(self):
  582. """Get the logging config of the current node."""
  583. return {
  584. "log_rotation_max_bytes": self.max_bytes,
  585. "log_rotation_backup_count": self.backup_count,
  586. }
  587. @property
  588. def address_info(self):
  589. """Get a dictionary of addresses."""
  590. return {
  591. "node_ip_address": self._node_ip_address,
  592. "redis_address": self.redis_address,
  593. "object_store_address": self._plasma_store_socket_name,
  594. "raylet_socket_name": self._raylet_socket_name,
  595. "webui_url": self._webui_url,
  596. "session_dir": self._session_dir,
  597. "metrics_export_port": self._ray_params.metrics_export_port,
  598. "gcs_address": self.gcs_address,
  599. "address": self.address,
  600. "dashboard_agent_listen_port": self._ray_params.dashboard_agent_listen_port,
  601. }
  602. @property
  603. def node_labels(self):
  604. """Get the node labels."""
  605. return self._node_labels
  606. def is_head(self):
  607. return self.head
  608. def get_gcs_client(self):
  609. if self._gcs_client is None:
  610. self._init_gcs_client()
  611. return self._gcs_client
  def _init_gcs_client(self):
      """Connect to the GCS, cache the client, and initialize the internal KV.

      Up to ``ray_constants.NUM_REDIS_GET_RETRIES`` attempts are made, one
      second apart. On the head node each attempt also issues a dummy KV read
      to confirm the GCS is alive, and retrying stops early once the GCS
      server process has exited.

      On success ``self._gcs_client`` and ``self.cluster_id`` are set.

      Raises:
          RuntimeError: If no client could be created. The message carries the
              traceback of the last failed attempt and, when the logs directory
              is known and readable, up to the last 10 error lines of
              ``gcs_server.err``.
      """
      if self.head:
          gcs_process = self.all_processes[ray_constants.PROCESS_TYPE_GCS_SERVER][
              0
          ].process
      else:
          gcs_process = None

      # Kept outside the loop so the most recent failure is still available
      # after an early exit, and the name is bound even with zero retries.
      last_ex = None

      # TODO(ryw) instead of create a new GcsClient, wrap the one from
      # CoreWorkerProcess to save a grpc channel.
      for _ in range(ray_constants.NUM_REDIS_GET_RETRIES):
          try:
              client = GcsClient(
                  address=self.gcs_address,
                  cluster_id=self._ray_params.cluster_id,  # Hex string
              )
              self.cluster_id = client.cluster_id
              if self.head:
                  # Send a simple request to make sure GCS is alive
                  # if it's a head node.
                  client.internal_kv_get(b"dummy", None)
              self._gcs_client = client
              break
          except Exception:
              # Record the failure before deciding whether to stop, so the
              # final error message never reports "None" for a real error.
              last_ex = traceback.format_exc()
              logger.debug(f"Connecting to GCS: {last_ex}")
              if gcs_process is not None and gcs_process.poll() is not None:
                  # GCS has exited.
                  break
              time.sleep(1)

      if self._gcs_client is None:
          if hasattr(self, "_logs_dir"):
              errors = []
              err_path = os.path.join(self._logs_dir, "gcs_server.err")
              try:
                  with open(err_path) as err:
                      # Use " C " or " E " to exclude the stacktrace.
                      # This should work for most cases, especially
                      # when GCS is starting. Only display last 10 lines of logs.
                      errors = [e for e in err if " C " in e or " E " in e][-10:]
              except OSError as read_error:
                  # A missing or unreadable log file must not hide the real
                  # connection failure reported below.
                  logger.debug(f"Could not read {err_path}: {read_error}")
              error_msg = "\n" + "".join(errors) + "\n"
              raise RuntimeError(
                  f"Failed to {'start' if self.head else 'connect to'} GCS. "
                  f" Last {len(errors)} lines of error files:"
                  f"{error_msg}."
                  f"Please check {os.path.join(self._logs_dir, 'gcs_server.out')}"
                  f" for details. Last connection error: {last_ex}"
              )
          else:
              raise RuntimeError(
                  f"Failed to {'start' if self.head else 'connect to'} GCS. Last "
                  f"connection error: {last_ex}"
              )

      ray.experimental.internal_kv._initialize_internal_kv(self._gcs_client)
  667. def get_temp_dir_path(self):
  668. """Get the path of the temporary directory."""
  669. return self._temp_dir
  670. def get_runtime_env_dir_path(self):
  671. """Get the path of the runtime env."""
  672. return self._runtime_env_dir
  673. def get_session_dir_path(self):
  674. """Get the path of the session directory."""
  675. return self._session_dir
  676. def get_logs_dir_path(self):
  677. """Get the path of the log files directory."""
  678. return self._logs_dir
  679. def get_sockets_dir_path(self):
  680. """Get the path of the sockets directory."""
  681. return self._sockets_dir
  682. def _make_inc_temp(
  683. self, suffix: str = "", prefix: str = "", directory_name: Optional[str] = None
  684. ):
  685. """Return an incremental temporary file name. The file is not created.
  686. Args:
  687. suffix: The suffix of the temp file.
  688. prefix: The prefix of the temp file.
  689. directory_name (str) : The base directory of the temp file.
  690. Returns:
  691. A string of file name. If there existing a file having
  692. the same name, the returned name will look like
  693. "{directory_name}/{prefix}.{unique_index}{suffix}"
  694. """
  695. if directory_name is None:
  696. directory_name = ray._common.utils.get_ray_temp_dir()
  697. directory_name = os.path.expanduser(directory_name)
  698. index = self._incremental_dict[suffix, prefix, directory_name]
  699. # `tempfile.TMP_MAX` could be extremely large,
  700. # so using `range` in Python2.x should be avoided.
  701. while index < tempfile.TMP_MAX:
  702. if index == 0:
  703. filename = os.path.join(directory_name, prefix + suffix)
  704. else:
  705. filename = os.path.join(
  706. directory_name, prefix + "." + str(index) + suffix
  707. )
  708. index += 1
  709. if not os.path.exists(filename):
  710. # Save the index.
  711. self._incremental_dict[suffix, prefix, directory_name] = index
  712. return filename
  713. raise FileExistsError(errno.EEXIST, "No usable temporary filename found")
  714. def should_redirect_logs(self):
  715. # Preferred: thread the setting explicitly via RayParams.log_to_stderr.
  716. # This avoids relying on process-global environment variables.
  717. if getattr(self._ray_params, "log_to_stderr", None) is not None:
  718. return not self._ray_params.log_to_stderr
  719. # Deprecated (kept for backward compatibility): RayParams.redirect_output.
  720. redirect_output = self._ray_params.redirect_output
  721. if redirect_output is not None:
  722. return redirect_output
  723. # Fall back to stderr redirect environment variable.
  724. return (
  725. os.environ.get(ray_constants.LOGGING_REDIRECT_STDERR_ENVIRONMENT_VARIABLE)
  726. != "1"
  727. )
  728. # TODO(hjiang): Re-implement the logic in C++, and expose via cython.
  729. def get_log_file_names(
  730. self,
  731. name: str,
  732. unique: bool = False,
  733. create_out: bool = True,
  734. create_err: bool = True,
  735. ) -> Tuple[Optional[str], Optional[str]]:
  736. """Get filename to dump logs for stdout and stderr, with no files opened.
  737. If output redirection has been disabled, no files will
  738. be opened and `(None, None)` will be returned.
  739. Args:
  740. name: descriptive string for this log file.
  741. unique: if true, a counter will be attached to `name` to
  742. ensure the returned filename is not already used.
  743. create_out: if True, create a .out file.
  744. create_err: if True, create a .err file.
  745. Returns:
  746. A tuple of two file handles for redirecting optional (stdout, stderr),
  747. or `(None, None)` if output redirection is disabled.
  748. """
  749. if not self.should_redirect_logs():
  750. return None, None
  751. log_stdout = None
  752. log_stderr = None
  753. if create_out:
  754. log_stdout = self._get_log_file_name(name, "out", unique=unique)
  755. if create_err:
  756. log_stderr = self._get_log_file_name(name, "err", unique=unique)
  757. return log_stdout, log_stderr
  758. def get_log_file_handles(
  759. self,
  760. name: str,
  761. unique: bool = False,
  762. create_out: bool = True,
  763. create_err: bool = True,
  764. ) -> Tuple[Optional[IO[AnyStr]], Optional[IO[AnyStr]]]:
  765. """Open log files with partially randomized filenames, returning the
  766. file handles. If output redirection has been disabled, no files will
  767. be opened and `(None, None)` will be returned.
  768. Args:
  769. name: descriptive string for this log file.
  770. unique: if true, a counter will be attached to `name` to
  771. ensure the returned filename is not already used.
  772. create_out: if True, create a .out file.
  773. create_err: if True, create a .err file.
  774. Returns:
  775. A tuple of two file handles for redirecting optional (stdout, stderr),
  776. or `(None, None)` if output redirection is disabled.
  777. """
  778. log_stdout_fname, log_stderr_fname = self.get_log_file_names(
  779. name, unique=unique, create_out=create_out, create_err=create_err
  780. )
  781. log_stdout = None if log_stdout_fname is None else open_log(log_stdout_fname)
  782. log_stderr = None if log_stderr_fname is None else open_log(log_stderr_fname)
  783. return log_stdout, log_stderr
  784. def _get_log_file_name(
  785. self,
  786. name: str,
  787. suffix: str,
  788. unique: bool = False,
  789. ) -> str:
  790. """Generate partially randomized filenames for log files.
  791. Args:
  792. name: descriptive string for this log file.
  793. suffix: suffix of the file. Usually it is .out of .err.
  794. unique: if true, a counter will be attached to `name` to
  795. ensure the returned filename is not already used.
  796. Returns:
  797. A tuple of two file names for redirecting (stdout, stderr).
  798. """
  799. # strip if the suffix is something like .out.
  800. suffix = suffix.strip(".")
  801. if unique:
  802. filename = self._make_inc_temp(
  803. suffix=f".{suffix}", prefix=name, directory_name=self._logs_dir
  804. )
  805. else:
  806. filename = os.path.join(self._logs_dir, f"{name}.{suffix}")
  807. return filename
  808. def _get_unused_port(self, allocated_ports=None):
  809. if allocated_ports is None:
  810. allocated_ports = set()
  811. s = socket.socket(
  812. socket.AF_INET6 if is_ipv6(self._node_ip_address) else socket.AF_INET,
  813. socket.SOCK_STREAM,
  814. )
  815. s.bind(("", 0))
  816. port = s.getsockname()[1]
  817. # Try to generate a port that is far above the 'next available' one.
  818. # This solves issue #8254 where GRPC fails because the port assigned
  819. # from this method has been used by a different process.
  820. for _ in range(ray_constants.NUM_PORT_RETRIES):
  821. new_port = random.randint(port, 65535)
  822. if new_port in allocated_ports:
  823. # This port is allocated for other usage already,
  824. # so we shouldn't use it even if it's not in use right now.
  825. continue
  826. new_s = socket.socket(
  827. socket.AF_INET6 if is_ipv6(self._node_ip_address) else socket.AF_INET,
  828. socket.SOCK_STREAM,
  829. )
  830. try:
  831. new_s.bind(("", new_port))
  832. except OSError:
  833. new_s.close()
  834. continue
  835. s.close()
  836. new_s.close()
  837. return new_port
  838. logger.error("Unable to succeed in selecting a random port.")
  839. s.close()
  840. return port
  841. def _prepare_socket_file(self, socket_path: str, default_prefix: str):
  842. """Prepare the socket file for raylet and plasma.
  843. This method helps to prepare a socket file.
  844. 1. Make the directory if the directory does not exist.
  845. 2. If the socket file exists, do nothing (this just means we aren't the
  846. first worker on the node).
  847. Args:
  848. socket_path: the socket file to prepare.
  849. """
  850. result = socket_path
  851. if sys.platform == "win32":
  852. if socket_path is None:
  853. result = (
  854. f"tcp://{build_address(self._localhost, self._get_unused_port())}"
  855. )
  856. else:
  857. if socket_path is None:
  858. result = self._make_inc_temp(
  859. prefix=default_prefix, directory_name=self._sockets_dir
  860. )
  861. else:
  862. try_to_create_directory(os.path.dirname(socket_path))
  863. validate_socket_filepath(result.split("://", 1)[-1])
  864. return result
  865. def start_reaper_process(self):
  866. """
  867. Start the reaper process.
  868. This must be the first process spawned and should only be called when
  869. ray processes should be cleaned up if this process dies.
  870. """
  871. assert (
  872. not self.kernel_fate_share
  873. ), "a reaper should not be used with kernel fate-sharing"
  874. process_info = ray._private.services.start_reaper(fate_share=False)
  875. assert ray_constants.PROCESS_TYPE_REAPER not in self.all_processes
  876. if process_info is not None:
  877. self.all_processes[ray_constants.PROCESS_TYPE_REAPER] = [
  878. process_info,
  879. ]
  880. def start_log_monitor(self):
  881. """Start the log monitor."""
  882. stdout_log_fname, stderr_log_fname = self.get_log_file_names(
  883. "log_monitor", unique=True, create_out=True, create_err=True
  884. )
  885. process_info = ray._private.services.start_log_monitor(
  886. self.get_session_dir_path(),
  887. self._logs_dir,
  888. self.gcs_address,
  889. self._node_ip_address,
  890. fate_share=self.kernel_fate_share,
  891. max_bytes=self.max_bytes,
  892. backup_count=self.backup_count,
  893. stdout_filepath=stdout_log_fname,
  894. stderr_filepath=stderr_log_fname,
  895. )
  896. assert ray_constants.PROCESS_TYPE_LOG_MONITOR not in self.all_processes
  897. self.all_processes[ray_constants.PROCESS_TYPE_LOG_MONITOR] = [
  898. process_info,
  899. ]
  900. def start_api_server(
  901. self, *, include_dashboard: Optional[bool], raise_on_failure: bool
  902. ):
  903. """Start the dashboard.
  904. Args:
  905. include_dashboard: If true, this will load all dashboard-related modules
  906. when starting the API server. Otherwise, it will only
  907. start the modules that are not relevant to the dashboard.
  908. raise_on_failure: If true, this will raise an exception
  909. if we fail to start the API server. Otherwise it will print
  910. a warning if we fail to start the API server.
  911. """
  912. stdout_log_fname, stderr_log_fname = self.get_log_file_names(
  913. "dashboard", unique=True, create_out=True, create_err=True
  914. )
  915. self._webui_url, process_info = ray._private.services.start_api_server(
  916. include_dashboard,
  917. raise_on_failure,
  918. self._ray_params.dashboard_host,
  919. self.gcs_address,
  920. self.cluster_id.hex(),
  921. self._node_ip_address,
  922. self._temp_dir,
  923. self._logs_dir,
  924. self._session_dir,
  925. port=self._ray_params.dashboard_port,
  926. fate_share=self.kernel_fate_share,
  927. max_bytes=self.max_bytes,
  928. backup_count=self.backup_count,
  929. stdout_filepath=stdout_log_fname,
  930. stderr_filepath=stderr_log_fname,
  931. )
  932. assert ray_constants.PROCESS_TYPE_DASHBOARD not in self.all_processes
  933. if process_info is not None:
  934. self.all_processes[ray_constants.PROCESS_TYPE_DASHBOARD] = [
  935. process_info,
  936. ]
  937. self.get_gcs_client().internal_kv_put(
  938. b"webui:url",
  939. self._webui_url.encode(),
  940. True,
  941. ray_constants.KV_NAMESPACE_DASHBOARD,
  942. )
  943. def start_gcs_server(self):
  944. """Start the gcs server."""
  945. assert self._ray_params.gcs_server_port >= 0
  946. assert self._gcs_address is None, "GCS server is already running."
  947. assert self._gcs_client is None, "GCS client is already connected."
  948. stdout_log_fname, stderr_log_fname = self.get_log_file_names(
  949. "gcs_server", unique=True, create_out=True, create_err=True
  950. )
  951. process_info = ray._private.services.start_gcs_server(
  952. self.redis_address,
  953. log_dir=self._logs_dir,
  954. stdout_filepath=stdout_log_fname,
  955. stderr_filepath=stderr_log_fname,
  956. session_name=self.session_name,
  957. redis_username=self._ray_params.redis_username,
  958. redis_password=self._ray_params.redis_password,
  959. config=self._config,
  960. fate_share=self.kernel_fate_share,
  961. gcs_server_port=self._ray_params.gcs_server_port,
  962. metrics_agent_port=self._ray_params.metrics_agent_port,
  963. node_ip_address=self._node_ip_address,
  964. session_dir=self._session_dir,
  965. node_id=self._node_id,
  966. )
  967. assert ray_constants.PROCESS_TYPE_GCS_SERVER not in self.all_processes
  968. self.all_processes[ray_constants.PROCESS_TYPE_GCS_SERVER] = [
  969. process_info,
  970. ]
  971. if self._ray_params.gcs_server_port == 0:
  972. self._ray_params.gcs_server_port = wait_for_persisted_port(
  973. self._session_dir,
  974. self._node_id,
  975. GCS_SERVER_PORT_NAME,
  976. )
  977. # Connecting via non-localhost address may be blocked by firewall rule,
  978. # e.g. https://github.com/ray-project/ray/issues/15780
  979. # TODO(mwtian): figure out a way to use 127.0.0.1 for local connection
  980. # when possible.
  981. self._gcs_address = build_address(
  982. self._node_ip_address, self._ray_params.gcs_server_port
  983. )
def start_raylet(
    self,
    plasma_directory: str,
    fallback_directory: str,
    object_store_memory: int,
    use_valgrind: bool = False,
    use_profiler: bool = False,
):
    """Start the raylet and register it in ``self.all_processes``.

    Log file names are prepared for the raylet itself as well as for the
    dashboard agent and the runtime env agent, and are handed to
    ``ray._private.services.start_raylet`` together with the node settings.

    Args:
        plasma_directory: Forwarded unchanged to
            ``ray._private.services.start_raylet``.
        fallback_directory: Forwarded unchanged to
            ``ray._private.services.start_raylet``.
        object_store_memory: Forwarded unchanged to
            ``ray._private.services.start_raylet``.
        use_valgrind: True if we should start the process in
            valgrind.
        use_profiler: True if we should start the process in the
            valgrind profiler.
    """
    raylet_stdout_filepath, raylet_stderr_filepath = self.get_log_file_names(
        ray_constants.PROCESS_TYPE_RAYLET,
        unique=True,
        create_out=True,
        create_err=True,
    )
    (
        dashboard_agent_stdout_filepath,
        dashboard_agent_stderr_filepath,
    ) = self.get_log_file_names(
        ray_constants.PROCESS_TYPE_DASHBOARD_AGENT,
        unique=True,
        create_out=True,
        create_err=True,
    )
    (
        runtime_env_agent_stdout_filepath,
        runtime_env_agent_stderr_filepath,
    ) = self.get_log_file_names(
        ray_constants.PROCESS_TYPE_RUNTIME_ENV_AGENT,
        unique=True,
        create_out=True,
        create_err=True,
    )

    # Record the PIDs of the processes registered so far on the resource
    # isolation config before it is handed to the raylet below.
    self.resource_isolation_config.add_system_pids(
        self._get_system_processes_for_resource_isolation()
    )

    # NOTE: the leading arguments are positional; their order must match the
    # signature of `ray._private.services.start_raylet`.
    process_info = ray._private.services.start_raylet(
        self.redis_address,
        self.gcs_address,
        self._node_id,
        self._node_ip_address,
        self._ray_params.node_manager_port,
        self._raylet_socket_name,
        self._plasma_store_socket_name,
        self.cluster_id.hex(),
        self._ray_params.worker_path,
        self._ray_params.setup_worker_path,
        self._temp_dir,
        self._session_dir,
        self._runtime_env_dir,
        self._logs_dir,
        self.get_resource_and_label_spec(),
        plasma_directory,
        fallback_directory,
        object_store_memory,
        self.session_name,
        is_head_node=self.is_head(),
        min_worker_port=self._ray_params.min_worker_port,
        max_worker_port=self._ray_params.max_worker_port,
        worker_port_list=self._ray_params.worker_port_list,
        object_manager_port=self._ray_params.object_manager_port,
        redis_username=self._ray_params.redis_username,
        redis_password=self._ray_params.redis_password,
        metrics_agent_port=self._ray_params.metrics_agent_port,
        runtime_env_agent_port=self._ray_params.runtime_env_agent_port,
        metrics_export_port=self._ray_params.metrics_export_port,
        dashboard_agent_listen_port=self._ray_params.dashboard_agent_listen_port,
        use_valgrind=use_valgrind,
        use_profiler=use_profiler,
        raylet_stdout_filepath=raylet_stdout_filepath,
        raylet_stderr_filepath=raylet_stderr_filepath,
        dashboard_agent_stdout_filepath=dashboard_agent_stdout_filepath,
        dashboard_agent_stderr_filepath=dashboard_agent_stderr_filepath,
        runtime_env_agent_stdout_filepath=runtime_env_agent_stdout_filepath,
        runtime_env_agent_stderr_filepath=runtime_env_agent_stderr_filepath,
        huge_pages=self._ray_params.huge_pages,
        fate_share=self.kernel_fate_share,
        socket_to_use=None,
        max_bytes=self.max_bytes,
        backup_count=self.backup_count,
        ray_debugger_external=self._ray_params.ray_debugger_external,
        env_updates=self._ray_params.env_vars,
        node_name=self._ray_params.node_name,
        webui=self._webui_url,
        resource_isolation_config=self.resource_isolation_config,
    )
    # Only one raylet may be registered per node.
    assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes
    self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info]
  1078. def start_monitor(self):
  1079. """Start the monitor.
  1080. Autoscaling output goes to these monitor.err/out files, and
  1081. any modification to these files may break existing
  1082. cluster launching commands.
  1083. """
  1084. from ray.autoscaler.v2.utils import is_autoscaler_v2
  1085. stdout_log_fname, stderr_log_fname = self.get_log_file_names(
  1086. "monitor", unique=True, create_out=True, create_err=True
  1087. )
  1088. process_info = ray._private.services.start_monitor(
  1089. self.gcs_address,
  1090. self._logs_dir,
  1091. stdout_filepath=stdout_log_fname,
  1092. stderr_filepath=stderr_log_fname,
  1093. autoscaling_config=self._ray_params.autoscaling_config,
  1094. fate_share=self.kernel_fate_share,
  1095. max_bytes=self.max_bytes,
  1096. backup_count=self.backup_count,
  1097. monitor_ip=self._node_ip_address,
  1098. autoscaler_v2=is_autoscaler_v2(fetch_from_server=True),
  1099. )
  1100. assert ray_constants.PROCESS_TYPE_MONITOR not in self.all_processes
  1101. self.all_processes[ray_constants.PROCESS_TYPE_MONITOR] = [process_info]
  1102. def start_ray_client_server(self):
  1103. """Start the ray client server process."""
  1104. stdout_file, stderr_file = self.get_log_file_handles(
  1105. "ray_client_server", unique=True
  1106. )
  1107. process_info = ray._private.services.start_ray_client_server(
  1108. self.address,
  1109. self._node_ip_address,
  1110. self._ray_params.ray_client_server_port,
  1111. stdout_file=stdout_file,
  1112. stderr_file=stderr_file,
  1113. redis_username=self._ray_params.redis_username,
  1114. redis_password=self._ray_params.redis_password,
  1115. fate_share=self.kernel_fate_share,
  1116. runtime_env_agent_address=self.runtime_env_agent_address,
  1117. node_id=self._node_id,
  1118. )
  1119. assert ray_constants.PROCESS_TYPE_RAY_CLIENT_SERVER not in self.all_processes
  1120. self.all_processes[ray_constants.PROCESS_TYPE_RAY_CLIENT_SERVER] = [
  1121. process_info
  1122. ]
  1123. def _write_cluster_info_to_kv(self):
  1124. """Write the cluster metadata to GCS.
  1125. Cluster metadata is always recorded, but they are
  1126. not reported unless usage report is enabled.
  1127. Check `usage_stats_head.py` for more details.
  1128. """
  1129. # Make sure the cluster metadata wasn't reported before.
  1130. import ray._common.usage.usage_lib as ray_usage_lib
  1131. ray_usage_lib.put_cluster_metadata(
  1132. self.get_gcs_client(), ray_init_cluster=self.ray_init_cluster
  1133. )
  1134. # Make sure GCS is up.
  1135. added = self.get_gcs_client().internal_kv_put(
  1136. b"session_name",
  1137. self._session_name.encode(),
  1138. False,
  1139. ray_constants.KV_NAMESPACE_SESSION,
  1140. )
  1141. if not added:
  1142. curr_val = self.get_gcs_client().internal_kv_get(
  1143. b"session_name", ray_constants.KV_NAMESPACE_SESSION
  1144. )
  1145. assert curr_val == self._session_name.encode("utf-8"), (
  1146. f"Session name {self._session_name} does not match "
  1147. f"persisted value {curr_val}. Perhaps there was an "
  1148. f"error connecting to Redis."
  1149. )
  1150. self.get_gcs_client().internal_kv_put(
  1151. b"session_dir",
  1152. self._session_dir.encode(),
  1153. True,
  1154. ray_constants.KV_NAMESPACE_SESSION,
  1155. )
  1156. self.get_gcs_client().internal_kv_put(
  1157. b"temp_dir",
  1158. self._temp_dir.encode(),
  1159. True,
  1160. ray_constants.KV_NAMESPACE_SESSION,
  1161. )
  1162. # Add tracing_startup_hook to redis / internal kv manually
  1163. # since internal kv is not yet initialized.
  1164. if self._ray_params.tracing_startup_hook:
  1165. self.get_gcs_client().internal_kv_put(
  1166. b"tracing_startup_hook",
  1167. self._ray_params.tracing_startup_hook.encode(),
  1168. True,
  1169. ray_constants.KV_NAMESPACE_TRACING,
  1170. )
  1171. def start_head_processes(self):
  1172. """Start head processes on the node."""
  1173. logger.debug(
  1174. f"Process STDOUT and STDERR is being " f"redirected to {self._logs_dir}."
  1175. )
  1176. assert self._gcs_address is None
  1177. assert self._gcs_client is None
  1178. self.start_gcs_server()
  1179. assert self.get_gcs_client() is not None
  1180. self._write_cluster_info_to_kv()
  1181. if not self._ray_params.no_monitor:
  1182. self.start_monitor()
  1183. if self._ray_params.ray_client_server_port:
  1184. self.start_ray_client_server()
  1185. if self._ray_params.include_dashboard is None:
  1186. # Default
  1187. raise_on_api_server_failure = False
  1188. else:
  1189. raise_on_api_server_failure = self._ray_params.include_dashboard
  1190. self.start_api_server(
  1191. include_dashboard=self._ray_params.include_dashboard,
  1192. raise_on_failure=raise_on_api_server_failure,
  1193. )
  1194. def start_ray_processes(self):
  1195. """Start all of the processes on the node."""
  1196. logger.debug(
  1197. f"Process STDOUT and STDERR is being " f"redirected to {self._logs_dir}."
  1198. )
  1199. if not self.head:
  1200. # Get the system config from GCS first if this is a non-head node.
  1201. gcs_options = ray._raylet.GcsClientOptions.create(
  1202. self.gcs_address,
  1203. self.cluster_id.hex(),
  1204. allow_cluster_id_nil=False,
  1205. fetch_cluster_id_if_nil=False,
  1206. )
  1207. global_state = ray._private.state.GlobalState()
  1208. global_state._initialize_global_state(gcs_options)
  1209. new_config = global_state.get_system_config()
  1210. assert self._config.items() <= new_config.items(), (
  1211. "The system config from GCS is not a superset of the local"
  1212. " system config. There might be a configuration inconsistency"
  1213. " issue between the head node and non-head nodes."
  1214. f" Local system config: {self._config},"
  1215. f" GCS system config: {new_config}"
  1216. )
  1217. self._config = new_config
  1218. # Make sure we don't call `determine_plasma_store_config` multiple
  1219. # times to avoid printing multiple warnings.
  1220. resource_and_label_spec = self.get_resource_and_label_spec()
  1221. if resource_and_label_spec.labels.get(
  1222. ray._raylet.RAY_NODE_ACCELERATOR_TYPE_KEY
  1223. ):
  1224. from ray._common.usage import usage_lib
  1225. usage_lib.record_hardware_usage(
  1226. resource_and_label_spec.labels.get(
  1227. ray._raylet.RAY_NODE_ACCELERATOR_TYPE_KEY
  1228. )
  1229. )
  1230. (
  1231. plasma_directory,
  1232. fallback_directory,
  1233. object_store_memory,
  1234. ) = ray._private.services.determine_plasma_store_config(
  1235. resource_and_label_spec.object_store_memory,
  1236. self._temp_dir,
  1237. plasma_directory=self._ray_params.plasma_directory,
  1238. fallback_directory=self._fallback_directory,
  1239. huge_pages=self._ray_params.huge_pages,
  1240. )
  1241. if self._ray_params.include_log_monitor:
  1242. self.start_log_monitor()
  1243. self.start_raylet(plasma_directory, fallback_directory, object_store_memory)
  1244. def _get_system_processes_for_resource_isolation(self) -> str:
  1245. """Returns a list of system processes that will be isolated by raylet.
  1246. NOTE: If a new system process is started before the raylet starts up, it needs to be
  1247. added to self.all_processes so it can be moved into the raylet's managed cgroup
  1248. hierarchy.
  1249. """
  1250. system_process_pids = [
  1251. str(p[0].process.pid) for p in self.all_processes.values()
  1252. ]
  1253. # If the dashboard api server was started on the head node, then include all of the api server's
  1254. # child processes.
  1255. if ray_constants.PROCESS_TYPE_DASHBOARD in self.all_processes:
  1256. dashboard_pid = self.all_processes[ray_constants.PROCESS_TYPE_DASHBOARD][
  1257. 0
  1258. ].process.pid
  1259. dashboard_process = psutil.Process(dashboard_pid)
  1260. system_process_pids += [str(p.pid) for p in dashboard_process.children()]
  1261. return ",".join(system_process_pids)
  1262. def _kill_process_type(
  1263. self,
  1264. process_type,
  1265. allow_graceful: bool = False,
  1266. check_alive: bool = True,
  1267. wait: bool = False,
  1268. ):
  1269. """Kill a process of a given type.
  1270. If the process type is PROCESS_TYPE_REDIS_SERVER, then we will kill all
  1271. of the Redis servers.
  1272. If the process was started in valgrind, then we will raise an exception
  1273. if the process has a non-zero exit code.
  1274. Args:
  1275. process_type: The type of the process to kill.
  1276. allow_graceful: Send a SIGTERM first and give the process
  1277. time to exit gracefully. If that doesn't work, then use
  1278. SIGKILL. We usually want to do this outside of tests.
  1279. check_alive: If true, then we expect the process to be alive
  1280. and will raise an exception if the process is already dead.
  1281. wait: If true, then this method will not return until the
  1282. process in question has exited.
  1283. Raises:
  1284. This process raises an exception in the following cases:
  1285. 1. The process had already died and check_alive is true.
  1286. 2. The process had been started in valgrind and had a non-zero
  1287. exit code.
  1288. """
  1289. # Ensure thread safety
  1290. with self.removal_lock:
  1291. self._kill_process_impl(
  1292. process_type,
  1293. allow_graceful=allow_graceful,
  1294. check_alive=check_alive,
  1295. wait=wait,
  1296. )
def _kill_process_impl(
    self, process_type, allow_graceful=False, check_alive=True, wait=False
):
    """Kill every registered process of `process_type` and unregister it.

    This is the implementation behind `_kill_process_type`, which calls it
    while holding ``self.removal_lock``; see that method for the meaning of
    the arguments. Unknown process types are ignored.

    Raises:
        RuntimeError: If a process is already dead and `check_alive` is
            true, or if a process started in valgrind exits with a non-zero
            code.
    """
    if process_type not in self.all_processes:
        return
    process_infos = self.all_processes[process_type]
    # Only Redis may have several processes registered under one type.
    if process_type != ray_constants.PROCESS_TYPE_REDIS_SERVER:
        assert len(process_infos) == 1
    for process_info in process_infos:
        process = process_info.process
        # Handle the case where the process has already exited.
        if process.poll() is not None:
            if check_alive:
                raise RuntimeError(
                    "Attempting to kill a process of type "
                    f"'{process_type}', but this process is already dead."
                )
            else:
                continue

        if process_info.use_valgrind:
            # Valgrind processes are terminated and always waited for so
            # that their exit code can be inspected.
            process.terminate()
            process.wait()
            if process.returncode != 0:
                message = (
                    "Valgrind detected some errors in process of "
                    f"type {process_type}. Error code {process.returncode}."
                )
                # Attach the captured output, when available, to the error.
                if process_info.stdout_file is not None:
                    with open(process_info.stdout_file, "r") as f:
                        message += "\nPROCESS STDOUT:\n" + f.read()
                if process_info.stderr_file is not None:
                    with open(process_info.stderr_file, "r") as f:
                        message += "\nPROCESS STDERR:\n" + f.read()
                raise RuntimeError(message)
            continue

        if process_info.use_valgrind_profiler:
            # Give process signal to write profiler data.
            os.kill(process.pid, signal.SIGINT)
            # Wait for profiling data to be written.
            time.sleep(0.1)

        if allow_graceful:
            process.terminate()
            # Allow the process one second to exit gracefully.
            timeout_seconds = 1
            try:
                process.wait(timeout_seconds)
            except subprocess.TimeoutExpired:
                # Still running; it is force killed below.
                pass

        # If the process did not exit, force kill it.
        if process.poll() is None:
            process.kill()
            # The reason we usually don't call process.wait() here is that
            # there's some chance we'd end up waiting a really long time.
            if wait:
                process.wait()

    # Unregister the type once all of its processes have been handled.
    del self.all_processes[process_type]
  1354. def kill_redis(self, check_alive: bool = True):
  1355. """Kill the Redis servers.
  1356. Args:
  1357. check_alive: Raise an exception if any of the processes
  1358. were already dead.
  1359. """
  1360. self._kill_process_type(
  1361. ray_constants.PROCESS_TYPE_REDIS_SERVER, check_alive=check_alive
  1362. )
  1363. def kill_raylet(self, check_alive: bool = True):
  1364. """Kill the raylet.
  1365. Args:
  1366. check_alive: Raise an exception if the process was already
  1367. dead.
  1368. """
  1369. self._kill_process_type(
  1370. ray_constants.PROCESS_TYPE_RAYLET, check_alive=check_alive
  1371. )
  1372. def kill_log_monitor(self, check_alive: bool = True):
  1373. """Kill the log monitor.
  1374. Args:
  1375. check_alive: Raise an exception if the process was already
  1376. dead.
  1377. """
  1378. self._kill_process_type(
  1379. ray_constants.PROCESS_TYPE_LOG_MONITOR, check_alive=check_alive
  1380. )
  1381. def kill_dashboard(self, check_alive: bool = True):
  1382. """Kill the dashboard.
  1383. Args:
  1384. check_alive: Raise an exception if the process was already
  1385. dead.
  1386. """
  1387. self._kill_process_type(
  1388. ray_constants.PROCESS_TYPE_DASHBOARD, check_alive=check_alive
  1389. )
  1390. def kill_monitor(self, check_alive: bool = True):
  1391. """Kill the monitor.
  1392. Args:
  1393. check_alive: Raise an exception if the process was already
  1394. dead.
  1395. """
  1396. self._kill_process_type(
  1397. ray_constants.PROCESS_TYPE_MONITOR, check_alive=check_alive
  1398. )
  1399. def kill_gcs_server(self, check_alive: bool = True):
  1400. """Kill the gcs server.
  1401. Args:
  1402. check_alive: Raise an exception if the process was already
  1403. dead.
  1404. """
  1405. self._kill_process_type(
  1406. ray_constants.PROCESS_TYPE_GCS_SERVER, check_alive=check_alive, wait=True
  1407. )
  1408. # Clear GCS client and address to indicate no GCS server is running.
  1409. self._gcs_address = None
  1410. self._gcs_client = None
  1411. def kill_reaper(self, check_alive: bool = True):
  1412. """Kill the reaper process.
  1413. Args:
  1414. check_alive: Raise an exception if the process was already
  1415. dead.
  1416. """
  1417. self._kill_process_type(
  1418. ray_constants.PROCESS_TYPE_REAPER, check_alive=check_alive
  1419. )
  def kill_all_processes(self, check_alive=True, allow_graceful=False, wait=False):
      """Kill every process tracked in ``self.all_processes``.

      Processes are killed in this order: the raylet, the GCS server, every
      other process type except the reaper, and finally the reaper.

      Note that this is slower than necessary because it calls kill, wait,
      kill, wait, ... instead of kill, kill, ..., wait, wait, ...

      Args:
          check_alive: Raise an exception if any of the processes were
              already dead.
          allow_graceful: Passed through unchanged to each
              ``_kill_process_type`` call.
          wait: If true, then this method will not return until the
              process in question has exited.
      """

      def kill_one(process_type):
          # Every kill in this method uses the same caller-supplied options.
          self._kill_process_type(
              process_type,
              check_alive=check_alive,
              allow_graceful=allow_graceful,
              wait=wait,
          )

      # The raylet goes first. This is important for suppressing errors at
      # shutdown because the raylet gets a chance to exit gracefully and
      # clean up its child worker processes. Killing the plasma store (or
      # Redis) first could make the raylet exit ungracefully, leading to
      # more verbose output from the workers. The GCS server is next.
      for early_type in (
          ray_constants.PROCESS_TYPE_RAYLET,
          ray_constants.PROCESS_TYPE_GCS_SERVER,
      ):
          if early_type in self.all_processes:
              kill_one(early_type)

      reaper_type = ray_constants.PROCESS_TYPE_REAPER

      # Iterate over a snapshot of the keys because the dictionary is
      # modified while the processes are being killed.
      for process_type in list(self.all_processes):
          # The reaper process must be killed last in case we die
          # unexpectedly while cleaning up.
          if process_type == reaper_type:
              continue
          kill_one(process_type)

      if reaper_type in self.all_processes:
          kill_one(reaper_type)
  def live_processes(self):
      """Collect the tracked processes that are still running.

      A process counts as live when its ``poll()`` returns ``None``.

      Returns:
          A list of ``(process_type, process)`` tuples, one per live
          process, in the iteration order of ``self.all_processes``.
      """
      return [
          (process_type, info.process)
          for process_type, infos in self.all_processes.items()
          for info in infos
          if info.process.poll() is None
      ]
  def dead_processes(self):
      """Collect the tracked processes that have exited.

      A process counts as dead when its ``poll()`` returns something other
      than ``None``. Note that this ignores processes that have been
      explicitly killed, e.g., via a command like node.kill_raylet().

      Returns:
          A list of ``(process_type, process)`` tuples for the dead
          processes, ignoring the ones that have been explicitly killed.
      """
      return [
          (process_type, info.process)
          for process_type, infos in self.all_processes.items()
          for info in infos
          if info.process.poll() is not None
      ]
  def any_processes_alive(self):
      """Report whether at least one tracked process is still running.

      Returns:
          True if ``live_processes()`` yields at least one entry.
      """
      still_running = self.live_processes()
      return any(still_running)
  def remaining_processes_alive(self):
      """Report whether all remaining processes are still alive.

      Note that this ignores processes that have been explicitly killed,
      e.g., via a command like node.kill_raylet().

      Returns:
          True if ``dead_processes()`` reports nothing, i.e. every process
          that wasn't explicitly killed is still alive.
      """
      exited = self.dead_processes()
      return not any(exited)
  def destroy_external_storage(self):
      """Destroy the external storage described by ``object_spilling_config``.

      Does nothing when the system config has no (or an empty)
      ``object_spilling_config`` entry. Otherwise the JSON config is
      decoded, the storage is set up for this node ID and session name, and
      the storage is then destroyed.
      """
      serialized_config = self._config.get("object_spilling_config", {})
      if not serialized_config:
          return

      spilling_config = json.loads(serialized_config)
      from ray._private import external_storage

      storage = external_storage.setup_external_storage(
          spilling_config, self._node_id, self._session_name
      )
      storage.destroy_external_storage()
  def validate_external_storage(self):
      """Make sure we can setup the object spilling external storage.

      Returns immediately when ``automatic_object_spilling_enabled`` is
      false in the system config. Otherwise the object spilling config is
      written into both the ray params' system config and ``self._config``,
      and a throwaway storage is set up and destroyed to validate it.
      """
      if not self._config.get("automatic_object_spilling_enabled", True):
          return

      serialized_config = self._object_spilling_config
      parsed_config = json.loads(serialized_config)

      # Configure the proper system config. Both the ray params' system
      # config and self._config are updated because they could have
      # diverged by this point.
      config_targets = (self._ray_params._system_config, self._config)
      for target in config_targets:
          target["object_spilling_config"] = serialized_config

      uses_filesystem = parsed_config["type"] == "filesystem"
      for target in config_targets:
          target["is_external_storage_type_fs"] = uses_filesystem

      # Validate external storage usage.
      from ray._private import external_storage

      # Node ID is available only after GCS is connected. However,
      # validate_external_storage() needs to be called before that so the
      # configs can be validated early. A dummy node ID is therefore used
      # to check that external storage can be set up from the provided
      # config. This storage is destroyed right after the validation.
      placeholder_node_id = ray.NodeID.from_random().hex()
      storage = external_storage.setup_external_storage(
          parsed_config, placeholder_node_id, self._session_name
      )
      storage.destroy_external_storage()
      external_storage.reset_external_storage()
  def _get_object_spilling_config(self):
      """Consolidate the object spilling config into one JSON string.

      A spilling directory is looked up first, in this order: ray params,
      the ``object_spilling_directory`` system config entry, then the
      ``RAY_object_spilling_directory`` environment variable. If one is
      found, a filesystem config pointing at it is returned.

      Otherwise a full config is taken from the ``object_spilling_config``
      system config entry, then from the ``RAY_object_spilling_config``
      environment variable. If neither is set, a filesystem config pointing
      at the session directory is returned.

      Returns:
          The object spilling config. The directory-based and default
          results are JSON-encoded strings; a config taken from the system
          config or the environment variable is returned as found.
      """
      spilling_directory = (
          self._ray_params.object_spilling_directory
          or self._config.get("object_spilling_directory", "")
          or os.environ.get("RAY_object_spilling_directory", "")
      )
      if spilling_directory:
          return json.dumps(
              {
                  "type": "filesystem",
                  "params": {"directory_path": spilling_directory},
              }
          )

      spilling_config = self._config.get(
          "object_spilling_config", {}
      ) or os.environ.get("RAY_object_spilling_config", "")

      # If the config is not specified in ray params, system config or
      # environment variable, fall back to the default.
      if not spilling_config:
          return json.dumps(
              {"type": "filesystem", "params": {"directory_path": self._session_dir}}
          )

      if not is_in_test():
          logger.warning(
              "The object spilling config is specified from an unstable "
              "API - system config or environment variable. This is "
              "subject to change in the future. You can use the stable "
              "API - --object-spilling-directory in ray start or "
              "object_spilling_directory in ray.init() to specify the "
              "object spilling directory instead. If you need more "
              "advanced settings, please open a github issue with the "
              "Ray team."
          )
      return spilling_config
  def _record_stats(self):
      """Record usage tags for this node.

      Initializes the internal KV with this node's GCS client if it is not
      initialized yet, records the GCS storage type when this node is the
      head, and records the CPU model name if one is reported.
      """
      # This is only called when a new node is started.
      from ray._common.usage.usage_lib import (
          TagKey,
          record_extra_usage_tag,
          record_hardware_usage,
      )

      # Initialize the internal kv so that the metrics can be put.
      internal_kv = ray.experimental.internal_kv
      if not internal_kv._internal_kv_initialized():
          internal_kv._initialize_internal_kv(self.get_gcs_client())
      assert internal_kv._internal_kv_initialized()

      if self.head:
          # Record head node stats.
          if os.environ.get("RAY_REDIS_ADDRESS") is not None:
              gcs_storage_type = "redis"
          else:
              gcs_storage_type = "memory"
          record_extra_usage_tag(TagKey.GCS_STORAGE, gcs_storage_type)

      # NOTE(review): the CPU model step is treated as applying to every
      # node, not only the head -- confirm against the original indentation.
      cpu_model_name = ray._private.utils.get_current_node_cpu_model_name()
      if cpu_model_name:
          # The CPU model name can be an arbitrarily long string, so it is
          # truncated to the first 50 characters to avoid any issues.
          record_hardware_usage(cpu_model_name[:50])