cluster_init.py 75 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922
  1. import copy
  2. import json
  3. import logging
  4. import os
  5. import signal
  6. import socket
  7. import sys
  8. import threading
  9. import time
  10. import uuid
  11. import warnings
  12. from threading import Event
  13. from typing import Dict, Optional, Tuple, Type
  14. import requests
  15. import yaml
  16. from packaging.version import Version
  17. import ray
  18. import ray._private.services
  19. from .databricks_hook import DefaultDatabricksRayOnSparkStartHook
  20. from .start_hook_base import RayOnSparkStartHook
  21. from .utils import (
  22. _get_cpu_cores,
  23. _get_local_ray_node_slots,
  24. _get_num_physical_gpus,
  25. _wait_service_up,
  26. calc_mem_ray_head_node,
  27. exec_cmd,
  28. gen_cmd_exec_failure_msg,
  29. get_avail_mem_per_ray_worker_node,
  30. get_configured_spark_executor_memory_bytes,
  31. get_max_num_concurrent_tasks,
  32. get_random_unused_port,
  33. get_spark_application_driver_host,
  34. get_spark_session,
  35. get_spark_task_assigned_physical_gpus,
  36. is_in_databricks_runtime,
  37. is_port_in_use,
  38. )
  39. from ray._common.network_utils import build_address, parse_address
  40. from ray._common.utils import load_class
  41. from ray.autoscaler._private.spark.node_provider import HEAD_NODE_ID
  42. from ray.util.annotations import DeveloperAPI, PublicAPI
  43. _logger = logging.getLogger("ray.util.spark")
  44. _logger.setLevel(logging.INFO)
  45. RAY_ON_SPARK_START_HOOK = "RAY_ON_SPARK_START_HOOK"
  46. MAX_NUM_WORKER_NODES = -1
  47. RAY_ON_SPARK_COLLECT_LOG_TO_PATH = "RAY_ON_SPARK_COLLECT_LOG_TO_PATH"
  48. RAY_ON_SPARK_START_RAY_PARENT_PID = "RAY_ON_SPARK_START_RAY_PARENT_PID"
  49. def _check_system_environment():
  50. if os.name != "posix":
  51. raise RuntimeError("Ray on spark only supports running on POSIX system.")
  52. spark_dependency_error = "ray.util.spark module requires pyspark >= 3.3"
  53. try:
  54. import pyspark
  55. if Version(pyspark.__version__).release < (3, 3, 0):
  56. raise RuntimeError(spark_dependency_error)
  57. except ImportError:
  58. raise RuntimeError(spark_dependency_error)
  59. class RayClusterOnSpark:
  60. """
  61. This class is the type of instance returned by the `_setup_ray_cluster` interface.
  62. Its main functionality is to:
  63. Connect to, disconnect from, and shutdown the Ray cluster running on Apache Spark.
  64. Serve as a Python context manager for the `RayClusterOnSpark` instance.
  65. Args
  66. address: The url for the ray head node (defined as the hostname and unused
  67. port on Spark driver node)
  68. head_proc: Ray head process
  69. spark_job_group_id: The Spark job id for a submitted ray job
  70. num_workers_node: The number of workers in the ray cluster.
  71. """
  72. def __init__(
  73. self,
  74. address,
  75. head_proc,
  76. min_worker_nodes,
  77. max_worker_nodes,
  78. temp_dir,
  79. cluster_unique_id,
  80. start_hook,
  81. ray_dashboard_port,
  82. spark_job_server,
  83. global_cluster_lock_fd,
  84. ray_client_server_port,
  85. ):
  86. self.address = address
  87. self.head_proc = head_proc
  88. self.min_worker_nodes = min_worker_nodes
  89. self.max_worker_nodes = max_worker_nodes
  90. self.temp_dir = temp_dir
  91. self.cluster_unique_id = cluster_unique_id
  92. self.start_hook = start_hook
  93. self.ray_dashboard_port = ray_dashboard_port
  94. self.spark_job_server = spark_job_server
  95. self.global_cluster_lock_fd = global_cluster_lock_fd
  96. self.ray_client_server_port = ray_client_server_port
  97. self.is_shutdown = False
  98. self.spark_job_is_canceled = False
  99. self.background_job_exception = None
  100. # Ray client context returns by `ray.init`
  101. self.ray_ctx = None
  102. def wait_until_ready(self):
  103. import ray
  104. if self.is_shutdown:
  105. raise RuntimeError(
  106. "The ray cluster has been shut down or it failed to start."
  107. )
  108. try:
  109. ray.init(address=self.address)
  110. if self.ray_dashboard_port is not None and _wait_service_up(
  111. parse_address(self.address)[0],
  112. self.ray_dashboard_port,
  113. _RAY_DASHBOARD_STARTUP_TIMEOUT,
  114. ):
  115. self.start_hook.on_ray_dashboard_created(self.ray_dashboard_port)
  116. else:
  117. try:
  118. __import__("ray.dashboard.optional_deps")
  119. except ModuleNotFoundError as e:
  120. _logger.warning(
  121. "Dependencies to launch the optional dashboard API "
  122. "server cannot be found. They can be installed with "
  123. f"pip install ray[default], root cause: ({repr(e)})"
  124. )
  125. last_alive_worker_count = 0
  126. last_progress_move_time = time.time()
  127. while True:
  128. time.sleep(_RAY_CLUSTER_STARTUP_PROGRESS_CHECKING_INTERVAL)
  129. # Inside the waiting ready loop,
  130. # checking `self.background_job_exception`, if it is not None,
  131. # it means the background spark job has failed,
  132. # in this case, raise error directly.
  133. if self.background_job_exception is not None:
  134. raise RuntimeError(
  135. "Ray workers failed to start."
  136. ) from self.background_job_exception
  137. cur_alive_worker_count = (
  138. len([node for node in ray.nodes() if node["Alive"]]) - 1
  139. ) # Minus 1 means excluding the head node.
  140. if cur_alive_worker_count >= self.min_worker_nodes:
  141. _logger.info(
  142. f"Started {cur_alive_worker_count} Ray worker nodes, "
  143. f"meet the minimum number of Ray worker nodes required."
  144. )
  145. return
  146. if cur_alive_worker_count > last_alive_worker_count:
  147. last_alive_worker_count = cur_alive_worker_count
  148. last_progress_move_time = time.time()
  149. _logger.info(
  150. "Ray worker nodes are starting. Progress: "
  151. f"({cur_alive_worker_count} / {self.max_worker_nodes})"
  152. )
  153. else:
  154. if (
  155. time.time() - last_progress_move_time
  156. > _RAY_CONNECT_CLUSTER_POLL_PROGRESS_TIMEOUT
  157. ):
  158. if cur_alive_worker_count == 0:
  159. (
  160. job_server_host,
  161. job_server_port,
  162. ) = self.spark_job_server.server_address[:2]
  163. response = requests.post(
  164. url=(
  165. f"http://{build_address(job_server_host, job_server_port)}"
  166. "/query_last_worker_err"
  167. ),
  168. json={"spark_job_group_id": None},
  169. )
  170. response.raise_for_status()
  171. decoded_resp = response.content.decode("utf-8")
  172. json_res = json.loads(decoded_resp)
  173. last_worker_err = json_res["last_worker_err"]
  174. if last_worker_err:
  175. raise RuntimeError(
  176. "Starting Ray worker node failed, error:\n"
  177. f"{last_worker_err}"
  178. )
  179. else:
  180. raise RuntimeError(
  181. "Current spark cluster has no resources to launch "
  182. "Ray worker nodes."
  183. )
  184. _logger.warning(
  185. "Timeout in waiting for minimal ray workers to start. "
  186. "Started / Total requested: "
  187. f"({cur_alive_worker_count} / {self.min_worker_nodes}). "
  188. "Current spark cluster does not have sufficient resources "
  189. "to launch requested minimal number of Ray worker nodes."
  190. )
  191. return
  192. finally:
  193. ray.shutdown()
  194. def connect(self):
  195. if ray.is_initialized():
  196. raise RuntimeError("Already connected to Ray cluster.")
  197. self.ray_ctx = ray.init(address=self.address)
  198. def disconnect(self):
  199. ray.shutdown()
  200. self.ray_ctx = None
  201. def shutdown(self):
  202. """
  203. Shutdown the ray cluster created by the `setup_ray_cluster` API.
  204. """
  205. import fcntl
  206. if not self.is_shutdown:
  207. try:
  208. self.disconnect()
  209. except Exception:
  210. pass
  211. os.environ.pop("RAY_ADDRESS", None)
  212. if self.global_cluster_lock_fd is not None:
  213. # release global mode cluster lock.
  214. fcntl.flock(self.global_cluster_lock_fd, fcntl.LOCK_UN)
  215. self.spark_job_server.shutdown()
  216. try:
  217. self.head_proc.terminate()
  218. except Exception as e:
  219. # swallow exception.
  220. _logger.warning(
  221. "An Error occurred during shutdown of ray head node: " f"{repr(e)}"
  222. )
  223. self.is_shutdown = True
  224. def __enter__(self):
  225. return self
  226. def __exit__(self, exc_type, exc_val, exc_tb):
  227. self.shutdown()
  228. def _convert_ray_node_option(key, value):
  229. converted_key = f"--{key.replace('_', '-')}"
  230. if key in ["system_config", "resources", "labels"]:
  231. return f"{converted_key}={json.dumps(value)}"
  232. if value is None:
  233. return converted_key
  234. return f"{converted_key}={str(value)}"
  235. def _convert_ray_node_options(options):
  236. return [_convert_ray_node_option(k, v) for k, v in options.items()]
  237. _RAY_HEAD_STARTUP_TIMEOUT = 20
  238. _RAY_DASHBOARD_STARTUP_TIMEOUT = 60
  239. _BACKGROUND_JOB_STARTUP_WAIT = int(
  240. os.environ.get("RAY_ON_SPARK_BACKGROUND_JOB_STARTUP_WAIT", "30")
  241. )
  242. _RAY_CLUSTER_STARTUP_PROGRESS_CHECKING_INTERVAL = 3
  243. _RAY_WORKER_NODE_STARTUP_INTERVAL = int(
  244. os.environ.get("RAY_ON_SPARK_RAY_WORKER_NODE_STARTUP_INTERVAL", "10")
  245. )
  246. _RAY_CONNECT_CLUSTER_POLL_PROGRESS_TIMEOUT = 120
  247. def _preallocate_ray_worker_port_range():
  248. """
  249. If we start multiple ray workers on a machine concurrently, some ray worker
  250. processes might fail due to ray port conflicts, this is because race condition
  251. on getting free port and opening the free port.
  252. To address the issue, this function use an exclusive file lock to delay the
  253. worker processes to ensure that port acquisition does not create a resource
  254. contention issue due to a race condition.
  255. After acquiring lock, it will allocate port range for worker ports
  256. (for ray node config --min-worker-port and --max-worker-port).
  257. Because on a spark cluster, multiple ray cluster might be created, so on one spark
  258. worker machine, there might be multiple ray worker nodes running, these worker
  259. nodes might belong to different ray cluster, and we must ensure these ray nodes on
  260. the same machine using non-overlapping worker port range, to achieve this, in this
  261. function, it creates a file `/tmp/ray_on_spark_worker_port_allocation.txt` file,
  262. the file format is composed of multiple lines, each line contains 2 number: `pid`
  263. and `port_range_slot_index`, each port range slot allocates 1000 ports, and
  264. corresponding port range is:
  265. - range_begin (inclusive): 20000 + port_range_slot_index * 1000
  266. - range_end (exclusive): range_begin + 1000
  267. In this function, it first scans `/tmp/ray_on_spark_worker_port_allocation.txt`
  268. file, removing lines that containing dead process pid, then find the first unused
  269. port_range_slot_index, then regenerate this file, and return the allocated port
  270. range.
  271. Returns: Allocated port range for current worker ports
  272. """
  273. import fcntl
  274. import psutil
  275. def acquire_lock(file_path):
  276. mode = os.O_RDWR | os.O_CREAT | os.O_TRUNC
  277. try:
  278. fd = os.open(file_path, mode)
  279. # The lock file must be readable / writable to all users.
  280. os.chmod(file_path, 0o0777)
  281. # Allow for retrying getting a file lock a maximum number of seconds
  282. max_lock_iter = 600
  283. for _ in range(max_lock_iter):
  284. try:
  285. fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
  286. except BlockingIOError:
  287. # Lock is used by other processes, continue loop to wait for lock
  288. # available
  289. pass
  290. else:
  291. # Acquire lock successfully.
  292. return fd
  293. time.sleep(10)
  294. raise TimeoutError(f"Acquiring lock on file {file_path} timeout.")
  295. except Exception:
  296. os.close(fd)
  297. lock_file_path = "/tmp/ray_on_spark_worker_startup_barrier_lock.lock"
  298. try:
  299. lock_fd = acquire_lock(lock_file_path)
  300. except TimeoutError:
  301. # If timeout happens, the file lock might be hold by another process and that
  302. # process does not release the lock in time by some unexpected reason.
  303. # In this case, remove the existing lock file and create the file again, and
  304. # then acquire file lock on the new file.
  305. try:
  306. os.remove(lock_file_path)
  307. except Exception:
  308. pass
  309. lock_fd = acquire_lock(lock_file_path)
  310. def release_lock():
  311. fcntl.flock(lock_fd, fcntl.LOCK_UN)
  312. os.close(lock_fd)
  313. try:
  314. port_alloc_file = "/tmp/ray_on_spark_worker_port_allocation.txt"
  315. # NB: reading / writing `port_alloc_file` is protected by exclusive lock
  316. # on file `lock_file_path`
  317. if os.path.exists(port_alloc_file):
  318. with open(port_alloc_file, mode="r") as fp:
  319. port_alloc_data = fp.read()
  320. port_alloc_table = [
  321. line.split(" ") for line in port_alloc_data.strip().split("\n")
  322. ]
  323. port_alloc_table = [
  324. (int(pid_str), int(slot_index_str))
  325. for pid_str, slot_index_str in port_alloc_table
  326. ]
  327. else:
  328. port_alloc_table = []
  329. with open(port_alloc_file, mode="w"):
  330. pass
  331. # The port range allocation file must be readable / writable to all users.
  332. os.chmod(port_alloc_file, 0o0777)
  333. port_alloc_map = {
  334. pid: slot_index
  335. for pid, slot_index in port_alloc_table
  336. if psutil.pid_exists(pid) # remove slot used by dead process
  337. }
  338. allocated_slot_set = set(port_alloc_map.values())
  339. if len(allocated_slot_set) == 0:
  340. new_slot_index = 0
  341. else:
  342. new_slot_index = max(allocated_slot_set) + 1
  343. for index in range(new_slot_index):
  344. if index not in allocated_slot_set:
  345. new_slot_index = index
  346. break
  347. port_alloc_map[os.getpid()] = new_slot_index
  348. with open(port_alloc_file, mode="w") as fp:
  349. for pid, slot_index in port_alloc_map.items():
  350. fp.write(f"{pid} {slot_index}\n")
  351. worker_port_range_begin = 20000 + new_slot_index * 1000
  352. worker_port_range_end = worker_port_range_begin + 1000
  353. if worker_port_range_end > 65536:
  354. raise RuntimeError(
  355. "Too many ray worker nodes are running on this machine, cannot "
  356. "allocate worker port range for new ray worker node."
  357. )
  358. except Exception:
  359. release_lock()
  360. raise
  361. def hold_lock():
  362. time.sleep(_RAY_WORKER_NODE_STARTUP_INTERVAL)
  363. release_lock()
  364. threading.Thread(target=hold_lock, args=()).start()
  365. return worker_port_range_begin, worker_port_range_end
  366. def _append_default_spilling_dir_config(head_node_options, object_spilling_dir):
  367. if "system_config" not in head_node_options:
  368. head_node_options["system_config"] = {}
  369. sys_conf = head_node_options["system_config"]
  370. if "object_spilling_config" not in sys_conf:
  371. sys_conf["object_spilling_config"] = json.dumps(
  372. {
  373. "type": "filesystem",
  374. "params": {
  375. "directory_path": object_spilling_dir,
  376. },
  377. }
  378. )
  379. return head_node_options
  380. def _append_resources_config(node_options, resources):
  381. if "resources" not in node_options:
  382. node_options["resources"] = {}
  383. node_options["resources"].update(resources)
  384. return node_options
  385. def _get_default_ray_tmp_dir():
  386. return os.path.join(os.environ.get("RAY_TMPDIR", "/tmp"), "ray")
  387. def _create_hook_entry(is_global):
  388. if RAY_ON_SPARK_START_HOOK in os.environ:
  389. return load_class(os.environ[RAY_ON_SPARK_START_HOOK])()
  390. elif is_in_databricks_runtime():
  391. return DefaultDatabricksRayOnSparkStartHook(is_global)
  392. else:
  393. return RayOnSparkStartHook(is_global)
  394. def _setup_ray_cluster(
  395. *,
  396. max_worker_nodes: int,
  397. min_worker_nodes: int,
  398. num_cpus_worker_node: int,
  399. num_cpus_head_node: int,
  400. num_gpus_worker_node: int,
  401. num_gpus_head_node: int,
  402. using_stage_scheduling: bool,
  403. heap_memory_worker_node: int,
  404. heap_memory_head_node: int,
  405. object_store_memory_worker_node: int,
  406. object_store_memory_head_node: int,
  407. head_node_options: Dict,
  408. worker_node_options: Dict,
  409. ray_temp_root_dir: str,
  410. collect_log_to_path: str,
  411. autoscale_upscaling_speed: float,
  412. autoscale_idle_timeout_minutes: float,
  413. is_global: bool,
  414. ) -> Type[RayClusterOnSpark]:
  415. """
  416. The public API `ray.util.spark.setup_ray_cluster` does some argument
  417. validation and then pass validated arguments to this interface.
  418. and it returns a `RayClusterOnSpark` instance.
  419. The returned instance can be used to connect to, disconnect from and shutdown the
  420. ray cluster. This instance can also be used as a context manager (used by
  421. encapsulating operations within `with _setup_ray_cluster(...):`). Upon entering the
  422. managed scope, the ray cluster is initiated and connected to. When exiting the
  423. scope, the ray cluster is disconnected and shut down.
  424. Note: This function interface is stable and can be used for
  425. instrumentation logging patching.
  426. """
  427. import fcntl
  428. start_hook = _create_hook_entry(is_global)
  429. spark = get_spark_session()
  430. ray_head_ip = socket.gethostbyname(get_spark_application_driver_host(spark))
  431. ray_head_port = get_random_unused_port(ray_head_ip, min_port=9000, max_port=10000)
  432. port_exclude_list = [ray_head_port]
  433. # Make a copy for head_node_options to avoid changing original dict in user code.
  434. head_node_options = head_node_options.copy()
  435. include_dashboard = head_node_options.pop("include_dashboard", None)
  436. ray_dashboard_port = head_node_options.pop("dashboard_port", None)
  437. if is_global:
  438. ray_client_server_port = 10001
  439. else:
  440. ray_client_server_port = get_random_unused_port(
  441. ray_head_ip,
  442. min_port=9000,
  443. max_port=10000,
  444. exclude_list=port_exclude_list,
  445. )
  446. port_exclude_list.append(ray_client_server_port)
  447. spark_job_server_port = get_random_unused_port(
  448. ray_head_ip,
  449. min_port=9000,
  450. max_port=10000,
  451. exclude_list=port_exclude_list,
  452. )
  453. port_exclude_list.append(spark_job_server_port)
  454. if include_dashboard is None or include_dashboard is True:
  455. if ray_dashboard_port is None:
  456. ray_dashboard_port = get_random_unused_port(
  457. ray_head_ip,
  458. min_port=9000,
  459. max_port=10000,
  460. exclude_list=port_exclude_list,
  461. )
  462. port_exclude_list.append(ray_dashboard_port)
  463. ray_dashboard_agent_port = get_random_unused_port(
  464. ray_head_ip,
  465. min_port=9000,
  466. max_port=10000,
  467. exclude_list=port_exclude_list,
  468. )
  469. port_exclude_list.append(ray_dashboard_agent_port)
  470. dashboard_options = [
  471. "--dashboard-host=0.0.0.0",
  472. f"--dashboard-port={ray_dashboard_port}",
  473. f"--dashboard-agent-listen-port={ray_dashboard_agent_port}",
  474. ]
  475. # If include_dashboard is None, we don't set `--include-dashboard` option,
  476. # in this case Ray will decide whether dashboard can be started
  477. # (e.g. checking any missing dependencies).
  478. if include_dashboard is True:
  479. dashboard_options += ["--include-dashboard=true"]
  480. else:
  481. dashboard_options = [
  482. "--include-dashboard=false",
  483. ]
  484. _logger.info(
  485. f"Ray head hostname: {ray_head_ip}, port: {ray_head_port}, "
  486. f"ray client server port: {ray_client_server_port}."
  487. )
  488. cluster_unique_id = uuid.uuid4().hex[:8]
  489. if is_global:
  490. # global mode enabled
  491. # for global mode, Ray always uses default temp dir
  492. # so that local Ray client can discover it without specifying
  493. # head node address.
  494. if ray_temp_root_dir is not None:
  495. raise ValueError(
  496. "Ray on spark global mode cluster does not allow you to set "
  497. "'ray_temp_root_dir' argument."
  498. )
  499. # We only allow user to launch one active Ray on spark global cluster
  500. # at a time. So acquiring a global file lock before setting up a new
  501. # Ray on spark global cluster.
  502. global_cluster_lock_fd = os.open(
  503. "/tmp/ray_on_spark_global_cluster.lock", os.O_RDWR | os.O_CREAT | os.O_TRUNC
  504. )
  505. try:
  506. # acquiring exclusive lock to ensure copy logs and removing dir safely.
  507. fcntl.flock(global_cluster_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
  508. except BlockingIOError:
  509. # acquiring global lock failed.
  510. raise ValueError(
  511. "Acquiring global lock failed for setting up new global mode Ray on "
  512. "spark cluster. If there is an active global mode Ray on spark "
  513. "cluster, please shut down it before you create a new one."
  514. )
  515. ray_temp_dir = None
  516. ray_default_tmp_dir = _get_default_ray_tmp_dir()
  517. os.makedirs(ray_default_tmp_dir, exist_ok=True)
  518. object_spilling_dir = os.path.join(ray_default_tmp_dir, "spill")
  519. else:
  520. global_cluster_lock_fd = None
  521. if ray_temp_root_dir is None:
  522. ray_temp_root_dir = start_hook.get_default_temp_root_dir()
  523. ray_temp_dir = os.path.join(
  524. ray_temp_root_dir, f"ray-{ray_head_port}-{cluster_unique_id}"
  525. )
  526. os.makedirs(ray_temp_dir, exist_ok=True)
  527. object_spilling_dir = os.path.join(ray_temp_dir, "spill")
  528. os.makedirs(object_spilling_dir, exist_ok=True)
  529. head_node_options = _append_default_spilling_dir_config(
  530. head_node_options, object_spilling_dir
  531. )
  532. from ray.autoscaler._private.spark.spark_job_server import (
  533. _start_spark_job_server,
  534. )
  535. ray_node_custom_env = start_hook.custom_environment_variables()
  536. spark_job_server = _start_spark_job_server(
  537. ray_head_ip, spark_job_server_port, spark, ray_node_custom_env
  538. )
  539. autoscaling_cluster = AutoscalingCluster(
  540. head_resources={
  541. "CPU": num_cpus_head_node,
  542. "GPU": num_gpus_head_node,
  543. "memory": heap_memory_head_node,
  544. "object_store_memory": object_store_memory_head_node,
  545. },
  546. worker_node_types={
  547. "ray.worker": {
  548. "resources": {
  549. "CPU": num_cpus_worker_node,
  550. "GPU": num_gpus_worker_node,
  551. "memory": heap_memory_worker_node,
  552. "object_store_memory": object_store_memory_worker_node,
  553. },
  554. "node_config": {},
  555. "min_workers": min_worker_nodes,
  556. "max_workers": max_worker_nodes,
  557. },
  558. },
  559. extra_provider_config={
  560. "ray_head_ip": ray_head_ip,
  561. "ray_head_port": ray_head_port,
  562. "cluster_unique_id": cluster_unique_id,
  563. "using_stage_scheduling": using_stage_scheduling,
  564. "ray_temp_dir": ray_temp_dir,
  565. "worker_node_options": worker_node_options,
  566. "collect_log_to_path": collect_log_to_path,
  567. "spark_job_server_port": spark_job_server_port,
  568. },
  569. upscaling_speed=autoscale_upscaling_speed,
  570. idle_timeout_minutes=autoscale_idle_timeout_minutes,
  571. )
  572. ray_head_proc, tail_output_deque = autoscaling_cluster.start(
  573. ray_head_ip,
  574. ray_head_port,
  575. ray_client_server_port,
  576. ray_temp_dir,
  577. dashboard_options,
  578. head_node_options,
  579. collect_log_to_path,
  580. ray_node_custom_env,
  581. )
  582. ray_head_node_cmd = autoscaling_cluster.ray_head_node_cmd
  583. # wait ray head node spin up.
  584. time.sleep(_RAY_HEAD_STARTUP_TIMEOUT)
  585. if not is_port_in_use(ray_head_ip, ray_head_port):
  586. if ray_head_proc.poll() is None:
  587. # Ray head GCS service is down. Kill ray head node.
  588. ray_head_proc.terminate()
  589. # wait killing complete.
  590. time.sleep(0.5)
  591. cmd_exec_failure_msg = gen_cmd_exec_failure_msg(
  592. ray_head_node_cmd, ray_head_proc.returncode, tail_output_deque
  593. )
  594. raise RuntimeError("Start Ray head node failed!\n" + cmd_exec_failure_msg)
  595. _logger.info("Ray head node started.")
  596. cluster_address = build_address(ray_head_ip, ray_head_port)
  597. # Set RAY_ADDRESS environment variable to the cluster address.
  598. os.environ["RAY_ADDRESS"] = cluster_address
  599. ray_cluster_handler = RayClusterOnSpark(
  600. address=cluster_address,
  601. head_proc=ray_head_proc,
  602. min_worker_nodes=min_worker_nodes,
  603. max_worker_nodes=max_worker_nodes,
  604. temp_dir=ray_temp_dir,
  605. cluster_unique_id=cluster_unique_id,
  606. start_hook=start_hook,
  607. ray_dashboard_port=ray_dashboard_port,
  608. spark_job_server=spark_job_server,
  609. global_cluster_lock_fd=global_cluster_lock_fd,
  610. ray_client_server_port=ray_client_server_port,
  611. )
  612. start_hook.on_cluster_created(ray_cluster_handler)
  613. return ray_cluster_handler
  614. _active_ray_cluster = None
  615. _active_ray_cluster_rwlock = threading.RLock()
  616. def _create_resource_profile(num_cpus_per_node, num_gpus_per_node):
  617. from pyspark.resource.profile import ResourceProfileBuilder
  618. from pyspark.resource.requests import TaskResourceRequests
  619. task_res_req = TaskResourceRequests().cpus(num_cpus_per_node)
  620. if num_gpus_per_node > 0:
  621. task_res_req = task_res_req.resource("gpu", num_gpus_per_node)
  622. return ResourceProfileBuilder().require(task_res_req).build
  # `ray start` options (written in 'foo_bar' form) that users may not pass via
  # 'head_node_options' / 'worker_node_options'. Each key maps to the name of the
  # Ray-on-Spark argument to set instead, or to None when Ray on Spark controls
  # the option itself.
  _head_node_option_block_keys = {
      "temp_dir": "ray_temp_root_dir",
      "block": None,
      "head": None,
      "node_ip_address": None,
      "port": None,
      # Head node CPU / GPU counts are exposed as dedicated setup arguments, so
      # point users at them instead of saying the option is not configurable.
      "num_cpus": "num_cpus_head_node",
      "num_gpus": "num_gpus_head_node",
      "dashboard_host": None,
      "dashboard_agent_listen_port": None,
  }

  _worker_node_option_block_keys = {
      "temp_dir": "ray_temp_root_dir",
      "block": None,
      "head": None,
      "address": None,
      "num_cpus": "num_cpus_worker_node",
      "num_gpus": "num_gpus_worker_node",
      # Worker heap memory is exposed as the 'memory_worker_node' setup argument.
      "memory": "memory_worker_node",
      "object_store_memory": "object_store_memory_worker_node",
      "dashboard_agent_listen_port": None,
      "min_worker_port": None,
      "max_worker_port": None,
  }
  def _verify_node_options(node_options, block_keys, node_type):
      """Validate the extra ``ray start`` options a user passed for one node type.

      Args:
          node_options: Dict of extra ``ray start`` options. Keys must use the
              ``foo_bar`` spelling, not the ``--foo-bar`` CLI spelling.
          block_keys: Dict mapping each option users may not set to the name of
              the Ray-on-Spark argument to use instead, or to ``None`` when Ray on
              Spark controls the option itself.
          node_type: Node description inserted into error messages, e.g.
              ``"Ray head node on spark"``.

      Raises:
          ValueError: If an option key contains ``-`` or appears in ``block_keys``.
      """
      for key in node_options:
          # Any '-' (which also covers the '--foo-bar' spelling) is rejected.
          if "-" in key:
              raise ValueError(
                  "For a ray node option like '--foo-bar', you should convert it to "
                  "following format 'foo_bar' in 'head_node_options' / "
                  "'worker_node_options' arguments. "
                  f"Invalid option key: '{key}'."
              )
          if key not in block_keys:
              continue
          common_err_msg = (
              f"Setting the option '{key}' for {node_type} nodes is not allowed."
          )
          replacement_arg = block_keys[key]
          if replacement_arg:
              raise ValueError(
                  f"{common_err_msg} You should set the '{replacement_arg}' option "
                  "instead."
              )
          raise ValueError(
              f"{common_err_msg} This option is controlled by Ray on Spark."
          )
  def _resolve_deprecated_worker_node_arg(kwargs, deprecated_name, new_name, new_value):
      """Fold a deprecated ``*_per_node`` keyword argument into its replacement.

      Returns ``new_value`` unchanged when ``deprecated_name`` is not in ``kwargs``;
      otherwise emits a ``DeprecationWarning`` and returns the deprecated value.

      Raises:
          ValueError: If both the deprecated and the replacement argument are set.
      """
      if deprecated_name not in kwargs:
          return new_value
      if new_value is not None:
          raise ValueError(
              f"'{deprecated_name}' and '{new_name}' arguments are equivalent. "
              f"Only set '{new_name}'."
          )
      warnings.warn(
          f"'{deprecated_name}' argument is deprecated, please use "
          f"'{new_name}' argument instead.",
          DeprecationWarning,
      )
      return kwargs[deprecated_name]


  def _display_setup_warning(warn_msg, html_msg):
      """Show a setup warning: render ``html_msg`` in Databricks, else log ``warn_msg``."""
      if is_in_databricks_runtime():
          from ray.util.spark.databricks_hook import (
              get_databricks_display_html_function,
          )

          get_databricks_display_html_function()(html_msg)
      else:
          _logger.warning(warn_msg)


  def _setup_ray_cluster_internal(
      max_worker_nodes: int,
      min_worker_nodes: Optional[int],
      num_cpus_worker_node: Optional[int],
      num_cpus_head_node: Optional[int],
      num_gpus_worker_node: Optional[int],
      num_gpus_head_node: Optional[int],
      heap_memory_worker_node: Optional[int],
      heap_memory_head_node: Optional[int],
      object_store_memory_worker_node: Optional[int],
      object_store_memory_head_node: Optional[int],
      head_node_options: Optional[Dict],
      worker_node_options: Optional[Dict],
      ray_temp_root_dir: Optional[str],
      strict_mode: bool,
      collect_log_to_path: Optional[str],
      autoscale_upscaling_speed: Optional[float],
      autoscale_idle_timeout_minutes: Optional[float],
      is_global: bool,
      **kwargs,
  ) -> Tuple[str, str]:
      """Validate arguments, size the Ray nodes and launch a Ray-on-Spark cluster.

      Shared implementation of ``setup_ray_cluster`` (``is_global=False``) and
      ``setup_global_ray_cluster`` (``is_global=True``). ``kwargs`` may carry the
      deprecated ``num_cpus_per_node`` / ``num_gpus_per_node`` /
      ``object_store_memory_per_node`` aliases; the removed ``num_worker_nodes``
      and ``autoscale`` arguments are rejected. On success the started cluster is
      stored in the module-level ``_active_ray_cluster``.

      Returns:
          ``(address, remote_connection_address)``, where the second element is a
          ``ray://`` URL pointing at the head node's Ray client server port.

      Raises:
          ValueError: On invalid or conflicting arguments, or (with
              ``strict_mode``) when per-node resources are below the recommended
              minimums.
          RuntimeError: If a Ray-on-Spark cluster is already active, Ray is
              already initialized in this process, the Spark master is not
              supported, or the cluster fails to become ready (in which case the
              cluster is shut down on a best-effort basis first).
      """
      global _active_ray_cluster

      _check_system_environment()
      _install_sigterm_signal()

      head_node_options = head_node_options or {}
      worker_node_options = worker_node_options or {}
      _verify_node_options(
          head_node_options,
          _head_node_option_block_keys,
          "Ray head node on spark",
      )
      _verify_node_options(
          worker_node_options,
          _worker_node_option_block_keys,
          "Ray worker node on spark",
      )

      if _active_ray_cluster is not None:
          raise RuntimeError(
              "Current active ray cluster on spark haven't shut down. Please call "
              "`ray.util.spark.shutdown_ray_cluster()` before initiating a new Ray "
              "cluster on spark."
          )

      if ray.is_initialized():
          raise RuntimeError(
              "Current python process already initialized Ray, Please shut down it "
              "by `ray.shutdown()` before initiating a Ray cluster on spark."
          )

      # Reject removed arguments up front, before any Spark job is launched.
      if "num_worker_nodes" in kwargs:
          raise ValueError(
              "'num_worker_nodes' argument is removed, please set "
              "'max_worker_nodes' and 'min_worker_nodes' argument instead."
          )
      if "autoscale" in kwargs:
          raise ValueError(
              "'autoscale' argument is removed. You can set 'min_worker_nodes' argument "
              "to be less than 'max_worker_nodes' to make autoscaling enabled."
          )

      spark = get_spark_session()

      spark_master = spark.sparkContext.master
      is_spark_local_mode = spark_master == "local" or spark_master.startswith("local[")

      if not (
          spark_master.startswith("spark://")
          or spark_master.startswith("local-cluster[")
          or spark_master == "yarn"
          or is_spark_local_mode
      ):
          raise RuntimeError(
              "Ray on Spark only supports spark cluster in standalone mode, "
              "local-cluster mode, spark on yarn mode or spark local mode."
          )

      if is_spark_local_mode:
          support_stage_scheduling = False
      elif (
          is_in_databricks_runtime()
          and Version(os.environ["DATABRICKS_RUNTIME_VERSION"]).major >= 12
      ):
          support_stage_scheduling = True
      else:
          import pyspark

          # Compare (major, minor) only, so a release tuple with fewer than three
          # components (e.g. from "3.4") is still recognized as >= 3.4.
          support_stage_scheduling = Version(pyspark.__version__).release[:2] >= (3, 4)

      # Map the deprecated '*_per_node' aliases onto the '*_worker_node' arguments.
      num_cpus_worker_node = _resolve_deprecated_worker_node_arg(
          kwargs, "num_cpus_per_node", "num_cpus_worker_node", num_cpus_worker_node
      )
      num_gpus_worker_node = _resolve_deprecated_worker_node_arg(
          kwargs, "num_gpus_per_node", "num_gpus_worker_node", num_gpus_worker_node
      )
      object_store_memory_worker_node = _resolve_deprecated_worker_node_arg(
          kwargs,
          "object_store_memory_per_node",
          "object_store_memory_worker_node",
          object_store_memory_worker_node,
      )

      # Environment configurations within the Spark Session that dictate how many
      # cpus and gpus to use for each submitted spark task.
      num_spark_task_cpus = int(spark.sparkContext.getConf().get("spark.task.cpus", "1"))

      if num_cpus_worker_node is not None and num_cpus_worker_node <= 0:
          raise ValueError("Argument `num_cpus_worker_node` value must be > 0.")

      # note: spark.task.resource.gpu.amount config might be fractional value like 0.5
      default_num_spark_task_gpus = float(
          spark.sparkContext.getConf().get("spark.task.resource.gpu.amount", "0")
      )
      rounded_num_spark_task_gpus = int(default_num_spark_task_gpus)
      if default_num_spark_task_gpus > 0:
          warn_msg = (
              "You configured 'spark.task.resource.gpu.amount' to "
              f"{default_num_spark_task_gpus}, "
              "we recommend setting this value to 0 so that Spark jobs do not "
              "reserve GPU resources, preventing Ray-on-Spark workloads from having the "
              "maximum number of GPUs available."
          )
          _display_setup_warning(warn_msg, f"<b style='color:red;'>{warn_msg}</b>")

      if num_gpus_worker_node is not None and num_gpus_worker_node < 0:
          raise ValueError("Argument `num_gpus_worker_node` value must be >= 0.")

      def _get_spark_worker_resources(_):
          from ray.util.spark.utils import (
              _get_cpu_cores,
              _get_num_physical_gpus,
              _get_spark_worker_total_physical_memory,
          )

          num_cpus_spark_worker = _get_cpu_cores()
          num_gpus_spark_worker = _get_num_physical_gpus()
          total_mem_bytes = _get_spark_worker_total_physical_memory()

          return (
              num_cpus_spark_worker,
              num_gpus_spark_worker,
              total_mem_bytes,
          )

      # Probe the resources of one spark worker with a single-task spark job.
      (num_cpus_spark_worker, num_gpus_spark_worker, spark_worker_mem_bytes,) = (
          spark.sparkContext.parallelize([1], 1)
          .map(_get_spark_worker_resources)
          .collect()[0]
      )

      if num_cpus_worker_node is not None and num_gpus_worker_node is not None:
          if not support_stage_scheduling:
              raise ValueError(
                  "Current spark version does not support stage scheduling, so that "
                  "you cannot set the argument `num_cpus_worker_node` and "
                  "`num_gpus_worker_node` values. Without setting the 2 arguments, "
                  "per-Ray worker node will be assigned with number of "
                  f"'spark.task.cpus' (equals to {num_spark_task_cpus}) cpu cores "
                  "and rounded down number of 'spark.task.resource.gpu.amount' "
                  f"(equals to {rounded_num_spark_task_gpus}) GPUs. To enable spark "
                  "stage scheduling, you need to upgrade spark to 3.4 version or use "
                  "Databricks Runtime 12.x, and you cannot use spark local mode."
              )
          using_stage_scheduling = True
          res_profile = _create_resource_profile(
              num_cpus_worker_node, num_gpus_worker_node
          )
      elif num_cpus_worker_node is None and num_gpus_worker_node is None:
          if support_stage_scheduling:
              # Make one Ray worker node using maximum CPU / GPU resources
              # of the whole spark worker node, this is the optimal
              # configuration.
              num_cpus_worker_node = num_cpus_spark_worker
              num_gpus_worker_node = num_gpus_spark_worker
              using_stage_scheduling = True
              res_profile = _create_resource_profile(
                  num_cpus_worker_node, num_gpus_worker_node
              )
          else:
              using_stage_scheduling = False
              res_profile = None

              num_cpus_worker_node = num_spark_task_cpus
              num_gpus_worker_node = rounded_num_spark_task_gpus
      else:
          raise ValueError(
              "'num_cpus_worker_node' and 'num_gpus_worker_node' arguments must be "
              "set together or unset together."
          )

      (
          ray_worker_node_heap_mem_bytes,
          ray_worker_node_object_store_mem_bytes,
      ) = get_avail_mem_per_ray_worker_node(
          spark,
          heap_memory_worker_node,
          object_store_memory_worker_node,
          num_cpus_worker_node,
          num_gpus_worker_node,
      )

      spark_worker_ray_node_slots = _get_local_ray_node_slots(
          num_cpus_spark_worker,
          num_gpus_spark_worker,
          num_cpus_worker_node,
          num_gpus_worker_node,
      )

      spark_executor_memory_bytes = get_configured_spark_executor_memory_bytes(spark)
      spark_worker_required_memory_bytes = (
          spark_executor_memory_bytes
          + spark_worker_ray_node_slots
          * (ray_worker_node_heap_mem_bytes + ray_worker_node_object_store_mem_bytes)
      )
      if spark_worker_required_memory_bytes > 0.8 * spark_worker_mem_bytes:
          warn_msg = (
              "In each spark worker node, we recommend making the sum of "
              "'spark_executor_memory + num_Ray_worker_nodes_per_spark_worker * "
              "(memory_worker_node + object_store_memory_worker_node)' to be less than "
              "'spark_worker_physical_memory * 0.8', otherwise it might lead to "
              "spark worker physical memory exhaustion and Ray task OOM errors."
          )
          _display_setup_warning(
              warn_msg, f"<b style='background-color:Cyan;'>{warn_msg}<br></b>"
          )

      if max_worker_nodes == MAX_NUM_WORKER_NODES:
          if min_worker_nodes is not None:
              raise ValueError(
                  "If you set 'max_worker_nodes' to 'MAX_NUM_WORKER_NODES', autoscaling "
                  "is not supported, so that you cannot set 'min_worker_nodes' argument "
                  "and 'min_worker_nodes' is automatically set to be equal to "
                  "'max_worker_nodes'."
              )

          # max_worker_nodes=MAX_NUM_WORKER_NODES represents using all available
          # spark task slots
          max_worker_nodes = get_max_num_concurrent_tasks(spark.sparkContext, res_profile)
          min_worker_nodes = max_worker_nodes
      elif max_worker_nodes <= 0:
          raise ValueError(
              "The value of 'max_worker_nodes' argument must be either a positive "
              "integer or 'ray.util.spark.MAX_NUM_WORKER_NODES'."
          )

      if min_worker_nodes is None:
          min_worker_nodes = max_worker_nodes
      elif not (0 <= min_worker_nodes <= max_worker_nodes):
          raise ValueError(
              "The value of 'min_worker_nodes' argument must be an integer >= 0 "
              "and <= 'max_worker_nodes'"
          )

      insufficient_resources = []

      if num_cpus_worker_node < 4:
          insufficient_resources.append(
              "The provided CPU resources for each ray worker are inadequate to start "
              "a ray cluster. Based on the total cpu resources available and the "
              "configured task sizing, each ray worker node would start with "
              f"{num_cpus_worker_node} CPU cores. This is less than the recommended "
              "value of `4` CPUs per worker. On spark version >= 3.4 or Databricks "
              "Runtime 12.x, you can set the argument `num_cpus_worker_node` to "
              "a value >= 4 to address it, otherwise you need to increase the spark "
              "application configuration 'spark.task.cpus' to a minimum of `4` to "
              "address it."
          )

      if ray_worker_node_heap_mem_bytes < 10 * 1024 * 1024 * 1024:
          insufficient_resources.append(
              "The provided memory resources for each ray worker node are inadequate. "
              "Based on the total memory available on the spark cluster and the "
              "configured task sizing, each ray worker would start with "
              f"{ray_worker_node_heap_mem_bytes} bytes heap memory. This is less than "
              "the recommended value of 10GB. The ray worker node heap memory size is "
              "calculated by "
              "(SPARK_WORKER_PHYSICAL_MEMORY / num_local_spark_task_slots * 0.8) - "
              "object_store_memory_worker_node. To increase the heap space available, "
              "increase the memory in the spark cluster by using instance types with "
              "larger memory, or increase number of CPU/GPU per Ray worker node "
              "(so it leads to less Ray worker node slots per spark worker node), "
              "or apply a lower `object_store_memory_worker_node`."
          )
      if insufficient_resources:
          if strict_mode:
              # NB: the header must be concatenated explicitly; with implicit
              # literal concatenation it would become the `join` separator.
              raise ValueError(
                  "You are creating ray cluster on spark with strict mode (it can be "
                  "disabled by setting argument 'strict_mode=False' when calling API "
                  "'setup_ray_cluster'), strict mode requires the spark cluster config "
                  "satisfying following criterion: \n"
                  + "\n".join(insufficient_resources)
              )
          else:
              _logger.warning("\n".join(insufficient_resources))

      if num_cpus_head_node is None:
          if is_global:
              num_cpus_head_node = _get_cpu_cores()
          else:
              num_cpus_head_node = 0
      else:
          if num_cpus_head_node < 0:
              raise ValueError(
                  "Argument `num_cpus_head_node` value must be >= 0. "
                  f"Current value is {num_cpus_head_node}."
              )

      if num_gpus_head_node is None:
          if is_global:
              try:
                  num_gpus_head_node = _get_num_physical_gpus()
              except Exception:
                  # Best effort: GPU detection failing means no GPUs for the head.
                  num_gpus_head_node = 0
          else:
              num_gpus_head_node = 0
      else:
          if num_gpus_head_node < 0:
              raise ValueError(
                  "Argument `num_gpus_head_node` value must be >= 0. "
                  f"Current value is {num_gpus_head_node}."
              )

      if (
          num_cpus_head_node == 0
          and num_gpus_head_node == 0
          and object_store_memory_head_node is None
      ):
          # Because tasks that require CPU or GPU resources are not scheduled to Ray
          # head node, and user does not set `object_store_memory_head_node` explicitly,
          # limit the heap memory and object store memory allocation to the
          # head node, in order to save spark driver memory. An explicitly set
          # `heap_memory_head_node` is honored rather than silently overwritten.
          if heap_memory_head_node is None:
              heap_memory_head_node = 1024 * 1024 * 1024
          object_store_memory_head_node = 1024 * 1024 * 1024
      else:
          heap_memory_head_node, object_store_memory_head_node = calc_mem_ray_head_node(
              heap_memory_head_node, object_store_memory_head_node
          )

      with _active_ray_cluster_rwlock:
          cluster = _setup_ray_cluster(
              max_worker_nodes=max_worker_nodes,
              min_worker_nodes=min_worker_nodes,
              num_cpus_worker_node=num_cpus_worker_node,
              num_cpus_head_node=num_cpus_head_node,
              num_gpus_worker_node=num_gpus_worker_node,
              num_gpus_head_node=num_gpus_head_node,
              using_stage_scheduling=using_stage_scheduling,
              heap_memory_worker_node=ray_worker_node_heap_mem_bytes,
              heap_memory_head_node=heap_memory_head_node,
              object_store_memory_worker_node=ray_worker_node_object_store_mem_bytes,
              object_store_memory_head_node=object_store_memory_head_node,
              head_node_options=head_node_options,
              worker_node_options=worker_node_options,
              ray_temp_root_dir=ray_temp_root_dir,
              collect_log_to_path=collect_log_to_path,
              autoscale_upscaling_speed=autoscale_upscaling_speed,
              autoscale_idle_timeout_minutes=autoscale_idle_timeout_minutes,
              is_global=is_global,
          )
          # set global _active_ray_cluster to be the
          # started cluster.
          _active_ray_cluster = cluster

      try:
          cluster.wait_until_ready()  # NB: this line might raise error.
      except Exception as e:
          try:
              shutdown_ray_cluster()
          except Exception:
              # Best-effort cleanup; the original failure is re-raised below.
              pass
          raise RuntimeError("Launch Ray-on-Spark cluster failed") from e

      head_ip = parse_address(cluster.address)[0]
      remote_connection_address = (
          f"ray://{build_address(head_ip, cluster.ray_client_server_port)}"
      )
      return cluster.address, remote_connection_address
  @PublicAPI
  def setup_ray_cluster(
      *,
      max_worker_nodes: int,
      min_worker_nodes: Optional[int] = None,
      num_cpus_worker_node: Optional[int] = None,
      num_cpus_head_node: Optional[int] = None,
      num_gpus_worker_node: Optional[int] = None,
      num_gpus_head_node: Optional[int] = None,
      memory_worker_node: Optional[int] = None,
      memory_head_node: Optional[int] = None,
      object_store_memory_worker_node: Optional[int] = None,
      object_store_memory_head_node: Optional[int] = None,
      head_node_options: Optional[Dict] = None,
      worker_node_options: Optional[Dict] = None,
      ray_temp_root_dir: Optional[str] = None,
      strict_mode: bool = False,
      collect_log_to_path: Optional[str] = None,
      autoscale_upscaling_speed: Optional[float] = 1.0,
      autoscale_idle_timeout_minutes: Optional[float] = 1.0,
      **kwargs,
  ) -> Tuple[str, str]:
      """
      Set up a ray cluster on the spark cluster by starting a ray head node in the
      spark application's driver side node.
      After creating the head node, a background spark job is created that
      generates an instance of `RayClusterOnSpark` that contains configuration for the
      ray cluster that will run on the Spark cluster's worker nodes.
      After a ray cluster is set up, "RAY_ADDRESS" environment variable is set to
      the cluster address, so you can call `ray.init()` without specifying ray cluster
      address to connect to the cluster. To shut down the cluster you can call
      `ray.util.spark.shutdown_ray_cluster()`.
      Note: If the active ray cluster hasn't been shut down, you cannot create a new
      ray cluster.

      Args:
          max_worker_nodes: The maximum number of ray worker nodes to start for the
              ray cluster. You can set it to `ray.util.spark.MAX_NUM_WORKER_NODES`,
              which represents a ray cluster configuration that uses all available
              resources configured for the spark application.
              To create a spark application that is intended to exclusively run a
              shared, non-scaling ray cluster, it is recommended to set this
              argument to `ray.util.spark.MAX_NUM_WORKER_NODES`.
          min_worker_nodes: Minimal number of worker nodes (default `None`).
              If it is equal to "max_worker_nodes", or it is None, autoscaling is
              disabled and the Ray cluster is launched with a fixed number
              ("max_worker_nodes") of Ray worker nodes; otherwise autoscaling is
              enabled.
          num_cpus_worker_node: Number of cpus available to each ray worker node.
              If not provided and spark stage scheduling is supported,
              'num_cpus_worker_node' equals the number of cpu cores per spark
              worker node; otherwise the spark application configuration
              'spark.task.cpus' is used instead.
              **Limitation** Only spark version >= 3.4 or Databricks Runtime 12.x
              supports setting this argument.
          num_cpus_head_node: Number of cpus available to the Ray head node. If not
              provided, a global mode Ray cluster uses the number of cpu cores of
              the spark driver node, otherwise 0 is used.
              Number 0 means tasks requiring CPU resources are not scheduled to the
              Ray head node.
          num_gpus_worker_node: Number of gpus available to each ray worker node.
              If not provided and spark stage scheduling is supported,
              'num_gpus_worker_node' equals the number of GPUs per spark worker
              node; otherwise the rounded down value of the spark application
              configuration 'spark.task.resource.gpu.amount' is used instead.
              This argument is only available on spark cluster that is configured
              with 'gpu' resources.
              **Limitation** Only spark version >= 3.4 or Databricks Runtime 12.x
              supports setting this argument.
          num_gpus_head_node: Number of gpus available to the Ray head node. If not
              provided, a global mode Ray cluster uses the number of GPUs of the
              spark driver node, otherwise 0 is used.
              This argument is only available on spark cluster whose spark driver
              node has GPUs.
          memory_worker_node: Optional[int]:
              Heap memory configured for each Ray worker node. This is basically
              setting the `--memory` option when starting a Ray node by the
              `ray start` command.
          memory_head_node: Optional[int]:
              Heap memory configured for the Ray head node. This is basically
              setting the `--memory` option when starting a Ray node by the
              `ray start` command.
          object_store_memory_worker_node: Object store memory available to each
              ray worker node, but it is capped by
              "dev_shm_available_size * 0.8 / num_tasks_per_spark_worker".
              The default value equals to
              "0.3 * spark_worker_physical_memory * 0.8 / num_tasks_per_spark_worker".
          object_store_memory_head_node: Object store memory available to the Ray
              head node, but it is capped by "dev_shm_available_size * 0.8".
              The default value equals to
              "0.3 * spark_driver_physical_memory * 0.8". If the head node has 0
              CPUs and 0 GPUs and this argument is not set, the head node object
              store memory is limited to 1 GiB instead.
          head_node_options: A dict representing Ray head node extra options, these
              options will be passed to `ray start` script. Note you need to convert
              `ray start` options key from `--foo-bar` format to `foo_bar` format.
              For flag options (e.g. '--disable-usage-stats'), you should set the value
              to None in the option dict, like `{"disable_usage_stats": None}`.
              Options that Ray on Spark controls itself are rejected with a
              `ValueError`.
              Note: Short name options (e.g. '-v') are not supported.
          worker_node_options: A dict representing Ray worker node extra options,
              these options will be passed to `ray start` script. Note you need to
              convert `ray start` options key from `--foo-bar` format to `foo_bar`
              format.
              For flag options (e.g. '--disable-usage-stats'), you should set the value
              to None in the option dict, like `{"disable_usage_stats": None}`.
              Options that Ray on Spark controls itself are rejected with a
              `ValueError`.
              Note: Short name options (e.g. '-v') are not supported.
          ray_temp_root_dir: A local disk path to store the ray temporary data. The
              created cluster will create a subdirectory
              "ray-{head_port}-{random_suffix}" beneath this path.
          strict_mode: Boolean flag to fast-fail initialization of the ray cluster if
              the available spark cluster does not have sufficient resources to fulfill
              the resource allocation for memory, cpu and gpu. When set to true, if the
              requested resources are not available for the recommended minimum
              functionality, an exception will be raised that details the inadequate
              spark cluster configuration settings. If overridden as `False`,
              a warning is raised.
          collect_log_to_path: If specified, after ray head / worker nodes terminated,
              collect their logs to the specified path. On Databricks Runtime, we
              recommend you to specify a local path starts with '/dbfs/', because the
              path mounts with a centralized storage device and stored data is persisted
              after Databricks spark cluster terminated.
          autoscale_upscaling_speed: If autoscale enabled, it represents the number of
              nodes allowed to be pending as a multiple of the current number of nodes.
              The higher the value, the more aggressive upscaling will be. For example,
              if this is set to 1.0, the cluster can grow in size by at most 100% at any
              time, so if the cluster currently has 20 nodes, at most 20 pending launches
              are allowed. The minimum number of pending launches is 5 regardless of
              this setting.
              Default value is 1.0, minimum value is 1.0
          autoscale_idle_timeout_minutes: If autoscale enabled, it represents the number
              of minutes that need to pass before an idle worker node is removed by the
              autoscaler. The smaller the value, the more aggressive downscaling will be.
              Worker nodes are considered idle when they hold no active tasks, actors,
              or referenced objects (either in-memory or spilled to disk). This parameter
              does not affect the head node.
              Default value is 1.0, minimum value is 0
          kwargs: Accepts the deprecated aliases 'num_cpus_per_node',
              'num_gpus_per_node' and 'object_store_memory_per_node' (each emits a
              `DeprecationWarning`). The removed 'num_worker_nodes' and 'autoscale'
              arguments raise a `ValueError`.

      Returns:
          returns a tuple of (address, remote_connection_address)
          "address" is in format of "<ray_head_node_ip>:<port>"
          "remote_connection_address" is in format of
          "ray://<ray_head_node_ip>:<ray-client-server-port>",
          if your client runs on a machine that also hosts a Ray cluster node locally,
          you can connect to the Ray cluster via ``ray.init(address)``,
          otherwise you can connect to the Ray cluster via
          ``ray.init(remote_connection_address)``.
      """
      return _setup_ray_cluster_internal(
          max_worker_nodes=max_worker_nodes,
          min_worker_nodes=min_worker_nodes,
          num_cpus_worker_node=num_cpus_worker_node,
          num_cpus_head_node=num_cpus_head_node,
          num_gpus_worker_node=num_gpus_worker_node,
          num_gpus_head_node=num_gpus_head_node,
          heap_memory_worker_node=memory_worker_node,
          heap_memory_head_node=memory_head_node,
          object_store_memory_worker_node=object_store_memory_worker_node,
          object_store_memory_head_node=object_store_memory_head_node,
          head_node_options=head_node_options,
          worker_node_options=worker_node_options,
          ray_temp_root_dir=ray_temp_root_dir,
          strict_mode=strict_mode,
          collect_log_to_path=collect_log_to_path,
          autoscale_upscaling_speed=autoscale_upscaling_speed,
          autoscale_idle_timeout_minutes=autoscale_idle_timeout_minutes,
          is_global=False,
          **kwargs,
      )
  @PublicAPI
  def setup_global_ray_cluster(
      *,
      max_worker_nodes: int,
      is_blocking: bool = True,
      min_worker_nodes: Optional[int] = None,
      num_cpus_worker_node: Optional[int] = None,
      num_cpus_head_node: Optional[int] = None,
      num_gpus_worker_node: Optional[int] = None,
      num_gpus_head_node: Optional[int] = None,
      memory_worker_node: Optional[int] = None,
      memory_head_node: Optional[int] = None,
      object_store_memory_worker_node: Optional[int] = None,
      object_store_memory_head_node: Optional[int] = None,
      head_node_options: Optional[Dict] = None,
      worker_node_options: Optional[Dict] = None,
      strict_mode: bool = False,
      collect_log_to_path: Optional[str] = None,
      autoscale_upscaling_speed: Optional[float] = 1.0,
      autoscale_idle_timeout_minutes: Optional[float] = 1.0,
  ) -> Optional[Tuple[str, str]]:
      """
      Set up a global mode Ray on spark cluster.

      A global Ray on spark cluster means:

      - You can only create one active global Ray on spark cluster at a time.
      - On a Databricks cluster, the global Ray cluster can be used by all users;
        in contrast, a non-global Ray cluster can only be used by the current
        notebook user.
      - It stays up persistently without automatic shutdown.
      - In a Databricks notebook, you can connect to the global cluster by calling
        ``ray.init()`` without specifying its address; it discovers the global
        cluster automatically if it is up.

      All arguments are the same as for the ``setup_ray_cluster`` API except that:

      - The ``ray_temp_root_dir`` argument is not supported. A global mode Ray
        cluster always uses the default Ray temporary directory path.
      - A new argument ``is_blocking`` (default ``True``) is added.
        If ``is_blocking`` is True, the call keeps blocking until it is
        interrupted; once the call is interrupted, the global Ray on spark cluster
        is shut down and the ``setup_global_ray_cluster`` call terminates.
        If ``is_blocking`` is False, the call returns immediately once the Ray
        cluster setup completes.

      Returns:
          If ``is_blocking`` is False, the ``(address, remote_connection_address)``
          tuple, in the same format as returned by ``setup_ray_cluster``.
          If ``is_blocking`` is True, the cluster is always shut down before the
          call ends; it then returns ``None``, or propagates the exception that
          interrupted the wait.
      """
      global _global_ray_cluster_cancel_event

      # NB: this is an (address, remote_connection_address) tuple.
      cluster_addresses = _setup_ray_cluster_internal(
          max_worker_nodes=max_worker_nodes,
          min_worker_nodes=min_worker_nodes,
          num_cpus_worker_node=num_cpus_worker_node,
          num_cpus_head_node=num_cpus_head_node,
          num_gpus_worker_node=num_gpus_worker_node,
          num_gpus_head_node=num_gpus_head_node,
          heap_memory_worker_node=memory_worker_node,
          heap_memory_head_node=memory_head_node,
          object_store_memory_worker_node=object_store_memory_worker_node,
          object_store_memory_head_node=object_store_memory_head_node,
          head_node_options=head_node_options,
          worker_node_options=worker_node_options,
          ray_temp_root_dir=None,
          strict_mode=strict_mode,
          collect_log_to_path=collect_log_to_path,
          autoscale_upscaling_speed=autoscale_upscaling_speed,
          autoscale_idle_timeout_minutes=autoscale_idle_timeout_minutes,
          is_global=True,
      )

      if not is_blocking:
          return cluster_addresses

      try:
          _global_ray_cluster_cancel_event = Event()
          # Serve forever until the user cancels the command.
          _global_ray_cluster_cancel_event.wait()
      finally:
          _global_ray_cluster_cancel_event = None
          # Once the program is interrupted, or the corresponding Databricks
          # notebook command is interrupted, shut down the Ray cluster.
          shutdown_ray_cluster()
  1299. def _start_ray_worker_nodes(
  1300. *,
  1301. spark_job_server,
  1302. spark_job_group_id,
  1303. spark_job_group_desc,
  1304. num_worker_nodes,
  1305. using_stage_scheduling,
  1306. ray_head_ip,
  1307. ray_head_port,
  1308. ray_temp_dir,
  1309. num_cpus_per_node,
  1310. num_gpus_per_node,
  1311. heap_memory_per_node,
  1312. object_store_memory_per_node,
  1313. worker_node_options,
  1314. collect_log_to_path,
  1315. node_id,
  1316. ):
  1317. # NB:
  1318. # In order to start ray worker nodes on spark cluster worker machines,
  1319. # We launch a background spark job:
  1320. # 1. Each spark task launches one ray worker node. This design ensures all ray
  1321. # worker nodes have the same shape (same cpus / gpus / memory configuration).
  1322. # If ray worker nodes have a non-uniform shape, the Ray cluster setup will
  1323. # be non-deterministic and could create issues with node sizing.
  1324. # 2. A ray worker node is started via the `ray start` CLI. In each spark task,
  1325. # a child process is started and will execute a `ray start ...` command in
  1326. # blocking mode.
  1327. # 3. Each task will acquire a file lock for 10s to ensure that the ray worker
  1328. # init will acquire a port connection to the ray head node that does not
  1329. # contend with other worker processes on the same Spark worker node.
  1330. # 4. When the ray cluster is shutdown, killing ray worker nodes is implemented by
  1331. # `sparkContext.cancelJobGroup` to cancel the background spark job, sending a
  1332. # SIGKILL signal to all spark tasks. Once the spark tasks are killed,
  1333. # `ray_start_node` process detects parent died event then it kills ray
  1334. # worker node.
  1335. spark = spark_job_server.spark
  1336. spark_job_server_port = spark_job_server.server_address[1]
  1337. ray_node_custom_env = spark_job_server.ray_node_custom_env
  1338. def ray_cluster_job_mapper(_):
  1339. from pyspark.taskcontext import TaskContext
  1340. _worker_logger = logging.getLogger("ray.util.spark.worker")
  1341. context = TaskContext.get()
  1342. (
  1343. worker_port_range_begin,
  1344. worker_port_range_end,
  1345. ) = _preallocate_ray_worker_port_range()
  1346. # 10001 is used as ray client server port of global mode ray cluster.
  1347. ray_worker_node_dashboard_agent_port = get_random_unused_port(
  1348. ray_head_ip, min_port=10002, max_port=20000
  1349. )
  1350. ray_worker_node_cmd = [
  1351. sys.executable,
  1352. "-m",
  1353. "ray.util.spark.start_ray_node",
  1354. f"--num-cpus={num_cpus_per_node}",
  1355. "--block",
  1356. f"--address={build_address(ray_head_ip, ray_head_port)}",
  1357. f"--memory={heap_memory_per_node}",
  1358. f"--object-store-memory={object_store_memory_per_node}",
  1359. f"--min-worker-port={worker_port_range_begin}",
  1360. f"--max-worker-port={worker_port_range_end - 1}",
  1361. f"--dashboard-agent-listen-port={ray_worker_node_dashboard_agent_port}",
  1362. *_convert_ray_node_options(worker_node_options),
  1363. ]
  1364. if ray_temp_dir is not None:
  1365. ray_worker_node_cmd.append(f"--temp-dir={ray_temp_dir}")
  1366. ray_worker_node_extra_envs = {
  1367. RAY_ON_SPARK_COLLECT_LOG_TO_PATH: collect_log_to_path or "",
  1368. RAY_ON_SPARK_START_RAY_PARENT_PID: str(os.getpid()),
  1369. "RAY_ENABLE_WINDOWS_OR_OSX_CLUSTER": "1",
  1370. **ray_node_custom_env,
  1371. }
  1372. if num_gpus_per_node > 0:
  1373. task_resources = context.resources()
  1374. if "gpu" not in task_resources:
  1375. raise RuntimeError(
  1376. "Couldn't get the gpu id, Please check the GPU resource "
  1377. "configuration"
  1378. )
  1379. gpu_addr_list = [
  1380. int(addr.strip()) for addr in task_resources["gpu"].addresses
  1381. ]
  1382. available_physical_gpus = get_spark_task_assigned_physical_gpus(
  1383. gpu_addr_list
  1384. )
  1385. ray_worker_node_cmd.append(
  1386. f"--num-gpus={len(available_physical_gpus)}",
  1387. )
  1388. ray_worker_node_extra_envs["CUDA_VISIBLE_DEVICES"] = ",".join(
  1389. [str(gpu_id) for gpu_id in available_physical_gpus]
  1390. )
  1391. _worker_logger.info(
  1392. f"Start Ray worker, command: {' '.join(ray_worker_node_cmd)}"
  1393. )
  1394. try:
  1395. is_task_reschedule_failure = False
  1396. # Check node id availability
  1397. response = requests.post(
  1398. url=(
  1399. f"http://{build_address(ray_head_ip, spark_job_server_port)}"
  1400. "/check_node_id_availability"
  1401. ),
  1402. json={
  1403. "node_id": node_id,
  1404. "spark_job_group_id": spark_job_group_id,
  1405. },
  1406. )
  1407. if not response.json()["available"]:
  1408. # The case happens when a Ray node is down unexpected
  1409. # caused by spark worker node down and spark tries to
  1410. # reschedule the spark task, so it triggers node
  1411. # creation with duplicated node id.
  1412. # in this case, finish the spark task immediately
  1413. # so spark won't try to reschedule this task
  1414. # and Ray autoscaler will trigger a new node creation
  1415. # with new node id, and a new spark job will be created
  1416. # for holding it.
  1417. is_task_reschedule_failure = True
  1418. raise RuntimeError(
  1419. "Starting Ray worker node twice with the same node id "
  1420. "is not allowed."
  1421. )
  1422. # Notify job server the task has been launched.
  1423. requests.post(
  1424. url=(
  1425. f"http://{build_address(ray_head_ip, spark_job_server_port)}"
  1426. "/notify_task_launched"
  1427. ),
  1428. json={
  1429. "spark_job_group_id": spark_job_group_id,
  1430. },
  1431. )
  1432. # Note:
  1433. # When a pyspark job cancelled, the UDF python worker process are killed by
  1434. # signal "SIGKILL", then `start_ray_node` process will detect the parent
  1435. # died event (see `ray.util.spark.start_ray_node.check_parent_alive`) and
  1436. # then kill ray worker node process and execute cleanup routine.
  1437. exec_cmd(
  1438. ray_worker_node_cmd,
  1439. synchronous=True,
  1440. extra_env=ray_worker_node_extra_envs,
  1441. )
  1442. except Exception as e:
  1443. # In the following 2 cases, exception is raised:
  1444. # (1)
  1445. # Starting Ray worker node fails, the `e` will contain detail
  1446. # subprocess stdout/stderr output.
  1447. # (2)
  1448. # In autoscaling mode, when Ray worker node is down, autoscaler will
  1449. # try to start new Ray worker node if necessary,
  1450. # and it creates a new spark job to launch Ray worker node process,
  1451. # note the old spark job will reschedule the failed spark task
  1452. # and raise error of "Starting Ray worker node twice with the same
  1453. # node id is not allowed".
  1454. #
  1455. # For either case (1) or case (2),
  1456. # to avoid Spark triggers more spark task retries, we swallow
  1457. # exception here to make spark the task exit normally.
  1458. err_msg = f"Ray worker node process exit, reason: {e}."
  1459. _logger.warning(err_msg)
  1460. yield err_msg, is_task_reschedule_failure
  1461. spark.sparkContext.setJobGroup(
  1462. spark_job_group_id,
  1463. spark_job_group_desc,
  1464. )
  1465. # Starting a normal spark job (not barrier spark job) to run ray worker
  1466. # nodes, the design purpose is:
  1467. # 1. Using normal spark job, spark tasks can automatically retry
  1468. # individually, we don't need to write additional retry logic, But, in
  1469. # barrier mode, if one spark task fails, it will cause all other spark
  1470. # tasks killed.
  1471. # 2. Using normal spark job, we can support failover when a spark worker
  1472. # physical machine crashes. (spark will try to re-schedule the spark task
  1473. # to other spark worker nodes)
  1474. # 3. Using barrier mode job, if the cluster resources does not satisfy
  1475. # "idle spark task slots >= argument num_spark_task", then the barrier
  1476. # job gets stuck and waits until enough idle task slots available, this
  1477. # behavior is not user-friendly, on a shared spark cluster, user is hard
  1478. # to estimate how many idle tasks available at a time, But, if using normal
  1479. # spark job, it can launch job with less spark tasks (i.e. user will see a
  1480. # ray cluster setup with less worker number initially), and when more task
  1481. # slots become available, it continues to launch tasks on new available
  1482. # slots, and user can see the ray cluster worker number increases when more
  1483. # slots available.
  1484. job_rdd = spark.sparkContext.parallelize(
  1485. list(range(num_worker_nodes)), num_worker_nodes
  1486. )
  1487. if using_stage_scheduling:
  1488. resource_profile = _create_resource_profile(
  1489. num_cpus_per_node,
  1490. num_gpus_per_node,
  1491. )
  1492. job_rdd = job_rdd.withResources(resource_profile)
  1493. hook_entry = _create_hook_entry(is_global=(ray_temp_dir is None))
  1494. hook_entry.on_spark_job_created(spark_job_group_id)
  1495. err_msg, is_task_reschedule_failure = job_rdd.mapPartitions(
  1496. ray_cluster_job_mapper
  1497. ).collect()[0]
  1498. if not is_task_reschedule_failure:
  1499. spark_job_server.last_worker_error = err_msg
  1500. return err_msg
  1501. return None
  1502. @PublicAPI
  1503. def shutdown_ray_cluster() -> None:
  1504. """
  1505. Shut down the active ray cluster.
  1506. """
  1507. global _active_ray_cluster
  1508. with _active_ray_cluster_rwlock:
  1509. if _active_ray_cluster is None:
  1510. raise RuntimeError("No active ray cluster to shut down.")
  1511. _active_ray_cluster.shutdown()
  1512. _active_ray_cluster = None
  1513. _global_ray_cluster_cancel_event = None
  1514. @DeveloperAPI
  1515. class AutoscalingCluster:
  1516. """Create a ray on spark autoscaling cluster."""
  1517. def __init__(
  1518. self,
  1519. head_resources: dict,
  1520. worker_node_types: dict,
  1521. extra_provider_config: dict,
  1522. upscaling_speed: float,
  1523. idle_timeout_minutes: float,
  1524. ):
  1525. """Create the cluster.
  1526. Args:
  1527. head_resources: resources of the head node, including CPU.
  1528. worker_node_types: autoscaler node types config for worker nodes.
  1529. """
  1530. self._head_resources = head_resources.copy()
  1531. self._head_resources["NODE_ID_AS_RESOURCE"] = HEAD_NODE_ID
  1532. self._config = self._generate_config(
  1533. head_resources,
  1534. worker_node_types,
  1535. extra_provider_config,
  1536. upscaling_speed,
  1537. idle_timeout_minutes,
  1538. )
  1539. def _generate_config(
  1540. self,
  1541. head_resources,
  1542. worker_node_types,
  1543. extra_provider_config,
  1544. upscaling_speed,
  1545. idle_timeout_minutes,
  1546. ):
  1547. base_config = yaml.safe_load(
  1548. open(
  1549. os.path.join(
  1550. os.path.dirname(ray.__file__),
  1551. "autoscaler/spark/defaults.yaml",
  1552. )
  1553. )
  1554. )
  1555. custom_config = copy.deepcopy(base_config)
  1556. custom_config["available_node_types"] = worker_node_types
  1557. custom_config["available_node_types"]["ray.head.default"] = {
  1558. "resources": head_resources,
  1559. "node_config": {},
  1560. "max_workers": 0,
  1561. }
  1562. custom_config["max_workers"] = sum(
  1563. v["max_workers"] for _, v in worker_node_types.items()
  1564. )
  1565. custom_config["provider"].update(extra_provider_config)
  1566. custom_config["upscaling_speed"] = upscaling_speed
  1567. custom_config["idle_timeout_minutes"] = idle_timeout_minutes
  1568. return custom_config
  1569. def start(
  1570. self,
  1571. ray_head_ip,
  1572. ray_head_port,
  1573. ray_client_server_port,
  1574. ray_temp_dir,
  1575. dashboard_options,
  1576. head_node_options,
  1577. collect_log_to_path,
  1578. ray_node_custom_env,
  1579. ):
  1580. """Start the cluster.
  1581. After this call returns, you can connect to the cluster with
  1582. ray.init("auto").
  1583. """
  1584. from ray.util.spark.cluster_init import (
  1585. RAY_ON_SPARK_COLLECT_LOG_TO_PATH,
  1586. _append_resources_config,
  1587. _convert_ray_node_options,
  1588. )
  1589. if ray_temp_dir is not None:
  1590. autoscale_config = os.path.join(ray_temp_dir, "autoscaling_config.json")
  1591. else:
  1592. autoscale_config = os.path.join(
  1593. _get_default_ray_tmp_dir(), "autoscaling_config.json"
  1594. )
  1595. with open(autoscale_config, "w") as f:
  1596. f.write(json.dumps(self._config))
  1597. (
  1598. worker_port_range_begin,
  1599. worker_port_range_end,
  1600. ) = _preallocate_ray_worker_port_range()
  1601. ray_head_node_cmd = [
  1602. sys.executable,
  1603. "-m",
  1604. "ray.util.spark.start_ray_node",
  1605. "--block",
  1606. "--head",
  1607. f"--node-ip-address={ray_head_ip}",
  1608. f"--port={ray_head_port}",
  1609. f"--ray-client-server-port={ray_client_server_port}",
  1610. f"--autoscaling-config={autoscale_config}",
  1611. f"--min-worker-port={worker_port_range_begin}",
  1612. f"--max-worker-port={worker_port_range_end - 1}",
  1613. *dashboard_options,
  1614. ]
  1615. if ray_temp_dir is not None:
  1616. ray_head_node_cmd.append(f"--temp-dir={ray_temp_dir}")
  1617. if "CPU" in self._head_resources:
  1618. ray_head_node_cmd.append(
  1619. "--num-cpus={}".format(self._head_resources.pop("CPU"))
  1620. )
  1621. if "GPU" in self._head_resources:
  1622. ray_head_node_cmd.append(
  1623. "--num-gpus={}".format(self._head_resources.pop("GPU"))
  1624. )
  1625. if "memory" in self._head_resources:
  1626. ray_head_node_cmd.append(
  1627. "--memory={}".format(self._head_resources.pop("memory"))
  1628. )
  1629. if "object_store_memory" in self._head_resources:
  1630. ray_head_node_cmd.append(
  1631. "--object-store-memory={}".format(
  1632. self._head_resources.pop("object_store_memory")
  1633. )
  1634. )
  1635. head_node_options = _append_resources_config(
  1636. head_node_options, self._head_resources
  1637. )
  1638. ray_head_node_cmd.extend(_convert_ray_node_options(head_node_options))
  1639. extra_env = {
  1640. "AUTOSCALER_UPDATE_INTERVAL_S": "1",
  1641. RAY_ON_SPARK_COLLECT_LOG_TO_PATH: collect_log_to_path or "",
  1642. RAY_ON_SPARK_START_RAY_PARENT_PID: str(os.getpid()),
  1643. **ray_node_custom_env,
  1644. }
  1645. self.ray_head_node_cmd = ray_head_node_cmd
  1646. return _start_ray_head_node(
  1647. ray_head_node_cmd, synchronous=False, extra_env=extra_env
  1648. )
  1649. def _start_ray_head_node(ray_head_node_cmd, synchronous, extra_env):
  1650. def preexec_function():
  1651. # Make `start_ray_node` script and Ray node process run
  1652. # in a separate group,
  1653. # otherwise Ray node will be in the same group of parent process,
  1654. # if parent process is a Jupyter notebook kernel, when user
  1655. # clicks interrupt cell button, SIGINT signal is sent, then Ray node will
  1656. # receive SIGINT signal, and it causes Ray node process dies.
  1657. # `start_ray_node` script should also run in a separate group
  1658. # because on Databricks Runtime, because if Databricks notebook
  1659. # is detached, if the children processes don't exit within 1s,
  1660. # they will receive SIGKILL, this behavior makes start_ray_node
  1661. # doesn't have enough time to complete cleanup work like removing
  1662. # temp directory and collecting logs.
  1663. os.setpgrp()
  1664. return exec_cmd(
  1665. ray_head_node_cmd,
  1666. synchronous=synchronous,
  1667. extra_env=extra_env,
  1668. preexec_fn=preexec_function,
  1669. )
  1670. _sigterm_signal_installed = False
  1671. def _install_sigterm_signal():
  1672. global _sigterm_signal_installed
  1673. if _sigterm_signal_installed:
  1674. return
  1675. try:
  1676. _origin_sigterm_handler = signal.getsignal(signal.SIGTERM)
  1677. def _sigterm_handler(signum, frame):
  1678. try:
  1679. shutdown_ray_cluster()
  1680. except Exception:
  1681. # swallow exception to continue executing the following code in the
  1682. # handler
  1683. pass
  1684. signal.signal(
  1685. signal.SIGTERM, _origin_sigterm_handler
  1686. ) # Reset to original signal
  1687. os.kill(
  1688. os.getpid(), signal.SIGTERM
  1689. ) # Re-raise the signal to trigger original behavior
  1690. signal.signal(signal.SIGTERM, _sigterm_handler)
  1691. _sigterm_signal_installed = True
  1692. except Exception:
  1693. _logger.warning("Install Ray-on-Spark SIGTERM handler failed.")