job_manager.py 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713
  1. import asyncio
  2. import copy
  3. import logging
  4. import os
  5. import random
  6. import string
  7. import time
  8. import traceback
  9. from typing import Any, AsyncIterator, Dict, Optional, Union
  10. import ray
  11. import ray._private.ray_constants as ray_constants
  12. from ray._common.utils import Timer, run_background_task
  13. from ray._private.accelerators.npu import NOSET_ASCEND_RT_VISIBLE_DEVICES_ENV_VAR
  14. from ray._private.accelerators.nvidia_gpu import NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR
  15. from ray._private.event.event_logger import get_event_logger
  16. from ray._private.label_utils import validate_label_selector
  17. from ray._raylet import GcsClient
  18. from ray.actor import ActorHandle
  19. from ray.core.generated.event_pb2 import Event
  20. from ray.dashboard.consts import (
  21. DEFAULT_JOB_START_TIMEOUT_SECONDS,
  22. RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES_ENV_VAR,
  23. RAY_JOB_START_TIMEOUT_SECONDS_ENV_VAR,
  24. RAY_STREAM_RUNTIME_ENV_LOG_TO_JOB_DRIVER_LOG_ENV_VAR,
  25. )
  26. from ray.dashboard.modules.job.common import (
  27. JOB_ACTOR_NAME_TEMPLATE,
  28. SUPERVISOR_ACTOR_RAY_NAMESPACE,
  29. JobInfo,
  30. JobInfoStorageClient,
  31. )
  32. from ray.dashboard.modules.job.job_log_storage_client import JobLogStorageClient
  33. from ray.dashboard.modules.job.job_supervisor import JobSupervisor
  34. from ray.dashboard.modules.job.utils import get_head_node_id
  35. from ray.dashboard.utils import close_logger_file_descriptor
  36. from ray.exceptions import ActorDiedError, ActorUnschedulableError, RuntimeEnvSetupError
  37. from ray.job_submission import JobErrorType, JobStatus
  38. from ray.runtime_env import RuntimeEnvConfig
  39. from ray.util.scheduling_strategies import (
  40. NodeAffinitySchedulingStrategy,
  41. SchedulingStrategyT,
  42. )
# Module-level logger used by the job manager for its own diagnostics.
logger = logging.getLogger(__name__)
  44. def generate_job_id() -> str:
  45. """Returns a job_id of the form 'raysubmit_XYZ'.
  46. Prefixed with 'raysubmit' to avoid confusion with Ray JobID (driver ID).
  47. """
  48. rand = random.SystemRandom()
  49. possible_characters = list(
  50. set(string.ascii_letters + string.digits)
  51. - {"I", "l", "o", "O", "0"} # No confusing characters
  52. )
  53. id_part = "".join(rand.choices(possible_characters, k=16))
  54. return f"raysubmit_{id_part}"
class JobManager:
    """Provide python APIs for job submission and management.

    Job state is read and written through ``JobInfoStorageClient``. It does
    not provide persistence, all info will be lost if the cluster goes down.
    """

    # Time that we will sleep while tailing logs if no new log line is
    # available.
    LOG_TAIL_SLEEP_S = 1
    # Seconds the background job monitor sleeps between checks of a job.
    JOB_MONITOR_LOOP_PERIOD_S = 1
    # NOTE(review): not referenced in the code visible here; presumably a
    # timeout (in seconds) used elsewhere when waiting for an actor to die.
    WAIT_FOR_ACTOR_DEATH_TIMEOUT_S = 0.1
  65. def __init__(
  66. self, gcs_client: GcsClient, logs_dir: str, timeout_check_timer: Timer = None
  67. ):
  68. self._gcs_client = gcs_client
  69. self._logs_dir = logs_dir
  70. self._job_info_client = JobInfoStorageClient(gcs_client, logs_dir)
  71. self._gcs_address = gcs_client.address
  72. self._cluster_id_hex = gcs_client.cluster_id.hex()
  73. self._log_client = JobLogStorageClient()
  74. self._supervisor_actor_cls = ray.remote(JobSupervisor)
  75. self._timeout_check_timer = timeout_check_timer or Timer()
  76. self.monitored_jobs = set()
  77. try:
  78. self.event_logger = get_event_logger(Event.SourceType.JOBS, logs_dir)
  79. except Exception:
  80. self.event_logger = None
  81. self._recover_running_jobs_event = asyncio.Event()
  82. run_background_task(self._recover_running_jobs())
  83. def _get_job_driver_logger(self, job_id: str) -> logging.Logger:
  84. """Return job driver logger to log messages to the job driver log file.
  85. If this function is called for the first time, configure the logger.
  86. """
  87. job_driver_logger = logging.getLogger(f"{__name__}.driver-{job_id}")
  88. # Configure the logger if it's not already configured.
  89. if not job_driver_logger.handlers:
  90. job_driver_log_path = self._log_client.get_log_file_path(job_id)
  91. job_driver_handler = logging.FileHandler(job_driver_log_path)
  92. job_driver_formatter = logging.Formatter(ray_constants.LOGGER_FORMAT)
  93. job_driver_handler.setFormatter(job_driver_formatter)
  94. job_driver_logger.addHandler(job_driver_handler)
  95. return job_driver_logger
  96. async def _recover_running_jobs(self):
  97. """Recovers all running jobs from the status client.
  98. For each job, we will spawn a coroutine to monitor it.
  99. Each will be added to self._running_jobs and reconciled.
  100. """
  101. try:
  102. all_jobs = await self._job_info_client.get_all_jobs()
  103. for job_id, job_info in all_jobs.items():
  104. if not job_info.status.is_terminal():
  105. run_background_task(self._monitor_job(job_id))
  106. finally:
  107. # This event is awaited in `submit_job` to avoid race conditions between
  108. # recovery and new job submission, so it must always get set even if there
  109. # are exceptions.
  110. self._recover_running_jobs_event.set()
  111. def _get_actor_for_job(self, job_id: str) -> Optional[ActorHandle]:
  112. try:
  113. return ray.get_actor(
  114. JOB_ACTOR_NAME_TEMPLATE.format(job_id=job_id),
  115. namespace=SUPERVISOR_ACTOR_RAY_NAMESPACE,
  116. )
  117. except ValueError: # Ray returns ValueError for nonexistent actor.
  118. return None
  119. async def _monitor_job(
  120. self, job_id: str, job_supervisor: Optional[ActorHandle] = None
  121. ):
  122. """Monitors the specified job until it enters a terminal state.
  123. This is necessary because we need to handle the case where the
  124. JobSupervisor dies unexpectedly.
  125. """
  126. if job_id in self.monitored_jobs:
  127. logger.debug(f"Job {job_id} is already being monitored.")
  128. return
  129. self.monitored_jobs.add(job_id)
  130. try:
  131. await self._monitor_job_internal(job_id, job_supervisor)
  132. except Exception as e:
  133. logger.error("Unhandled exception in job monitoring!", exc_info=e)
  134. raise e
  135. finally:
  136. self.monitored_jobs.remove(job_id)
  137. async def _monitor_job_internal(
  138. self, job_id: str, job_supervisor: Optional[ActorHandle] = None
  139. ):
  140. timeout = float(
  141. os.environ.get(
  142. RAY_JOB_START_TIMEOUT_SECONDS_ENV_VAR,
  143. DEFAULT_JOB_START_TIMEOUT_SECONDS,
  144. )
  145. )
  146. job_status = None
  147. job_info = None
  148. ping_obj_ref = None
  149. while True:
  150. try:
  151. # NOTE: Job monitoring loop sleeps before proceeding with monitoring
  152. # sequence to consolidate the control-flow of the pacing
  153. # in a single place, rather than having it spread across
  154. # many branches
  155. await asyncio.sleep(self.JOB_MONITOR_LOOP_PERIOD_S)
  156. job_status = await self._job_info_client.get_status(
  157. job_id, timeout=None
  158. )
  159. if job_status == JobStatus.PENDING:
  160. # Compare the current time with the job start time.
  161. # If the job is still pending, we will set the status
  162. # to FAILED.
  163. if job_info is None:
  164. job_info = await self._job_info_client.get_info(
  165. job_id, timeout=None
  166. )
  167. if (
  168. self._timeout_check_timer.time() - job_info.start_time / 1000
  169. > timeout
  170. ):
  171. err_msg = (
  172. "Job supervisor actor failed to start within "
  173. f"{timeout} seconds. This timeout can be "
  174. f"configured by setting the environment "
  175. f"variable {RAY_JOB_START_TIMEOUT_SECONDS_ENV_VAR}."
  176. )
  177. resources_specified = (
  178. (
  179. job_info.entrypoint_num_cpus is not None
  180. and job_info.entrypoint_num_cpus > 0
  181. )
  182. or (
  183. job_info.entrypoint_num_gpus is not None
  184. and job_info.entrypoint_num_gpus > 0
  185. )
  186. or (
  187. job_info.entrypoint_memory is not None
  188. and job_info.entrypoint_memory > 0
  189. )
  190. or (
  191. job_info.entrypoint_resources is not None
  192. and len(job_info.entrypoint_resources) > 0
  193. )
  194. )
  195. if resources_specified:
  196. err_msg += (
  197. " This may be because the job entrypoint's specified "
  198. "resources (entrypoint_num_cpus, entrypoint_num_gpus, "
  199. "entrypoint_resources, entrypoint_memory)"
  200. "aren't available on the cluster."
  201. " Try checking the cluster's available resources with "
  202. "`ray status` and specifying fewer resources for the "
  203. "job entrypoint."
  204. )
  205. await self._job_info_client.put_status(
  206. job_id,
  207. JobStatus.FAILED,
  208. message=err_msg,
  209. error_type=JobErrorType.JOB_SUPERVISOR_ACTOR_START_TIMEOUT,
  210. timeout=None,
  211. )
  212. logger.error(err_msg)
  213. break
  214. if job_supervisor is None:
  215. job_supervisor = self._get_actor_for_job(job_id)
  216. if job_supervisor is None:
  217. if job_status == JobStatus.PENDING:
  218. # Maybe the job supervisor actor is not created yet.
  219. # We will wait for the next loop.
  220. continue
  221. else:
  222. # The job supervisor actor is not created, but the job
  223. # status is not PENDING. This means the job supervisor
  224. # actor is not created due to some unexpected errors.
  225. # We will set the job status to FAILED.
  226. logger.error(f"Failed to get job supervisor for job {job_id}.")
  227. await self._job_info_client.put_status(
  228. job_id,
  229. JobStatus.FAILED,
  230. message=(
  231. "Unexpected error occurred: "
  232. "failed to get job supervisor."
  233. ),
  234. error_type=JobErrorType.JOB_SUPERVISOR_ACTOR_START_FAILURE,
  235. timeout=None,
  236. )
  237. break
  238. # Check to see if `JobSupervisor` is alive and reachable
  239. if ping_obj_ref is None:
  240. ping_obj_ref = job_supervisor.ping.options(
  241. max_task_retries=-1
  242. ).remote()
  243. ready, _ = ray.wait([ping_obj_ref], timeout=0)
  244. if ready:
  245. ray.get(ping_obj_ref)
  246. ping_obj_ref = None
  247. else:
  248. continue
  249. except Exception as e:
  250. job_status = await self._job_info_client.get_status(
  251. job_id, timeout=None
  252. )
  253. target_job_error_message = ""
  254. target_job_error_type: Optional[JobErrorType] = None
  255. if job_status is not None and job_status.is_terminal():
  256. # If the job is already in a terminal state, then the actor
  257. # exiting is expected.
  258. pass
  259. else:
  260. if isinstance(e, RuntimeEnvSetupError):
  261. logger.error(f"Failed to set up runtime_env for job {job_id}.")
  262. target_job_error_message = f"runtime_env setup failed: {e}"
  263. target_job_error_type = JobErrorType.RUNTIME_ENV_SETUP_FAILURE
  264. elif isinstance(e, ActorUnschedulableError):
  265. logger.error(
  266. f"Failed to schedule job {job_id} because the supervisor "
  267. f"actor could not be scheduled: {e}"
  268. )
  269. target_job_error_message = (
  270. f"Job supervisor actor could not be scheduled: {e}"
  271. )
  272. target_job_error_type = (
  273. JobErrorType.JOB_SUPERVISOR_ACTOR_UNSCHEDULABLE
  274. )
  275. elif isinstance(e, ActorDiedError):
  276. logger.error(f"Job supervisor actor for {job_id} died: {e}")
  277. target_job_error_message = f"Job supervisor actor died: {e}"
  278. target_job_error_type = JobErrorType.JOB_SUPERVISOR_ACTOR_DIED
  279. else:
  280. logger.error(
  281. f"Job monitoring for job {job_id} failed "
  282. f"unexpectedly: {e}.",
  283. exc_info=e,
  284. )
  285. target_job_error_message = f"Unexpected error occurred: {e}"
  286. target_job_error_type = (
  287. JobErrorType.JOB_SUPERVISOR_ACTOR_UNKNOWN_FAILURE
  288. )
  289. job_status = JobStatus.FAILED
  290. await self._job_info_client.put_status(
  291. job_id,
  292. job_status,
  293. message=target_job_error_message,
  294. error_type=target_job_error_type
  295. or JobErrorType.JOB_SUPERVISOR_ACTOR_UNKNOWN_FAILURE,
  296. timeout=None,
  297. )
  298. # Log error message to the job driver file for easy access.
  299. if target_job_error_message:
  300. log_path = self._log_client.get_log_file_path(job_id)
  301. os.makedirs(os.path.dirname(log_path), exist_ok=True)
  302. with open(log_path, "a") as log_file:
  303. log_file.write(target_job_error_message)
  304. # Log events
  305. if self.event_logger:
  306. event_log = (
  307. f"Completed a ray job {job_id} with a status {job_status}."
  308. )
  309. if target_job_error_message:
  310. event_log += f" {target_job_error_message}"
  311. self.event_logger.error(event_log, submission_id=job_id)
  312. else:
  313. self.event_logger.info(event_log, submission_id=job_id)
  314. break
  315. # Kill the actor defensively to avoid leaking actors in unexpected error cases.
  316. if job_supervisor is None:
  317. job_supervisor = self._get_actor_for_job(job_id)
  318. if job_supervisor is not None:
  319. ray.kill(job_supervisor, no_restart=True)
  320. def _handle_supervisor_startup(self, job_id: str, result: Optional[Exception]):
  321. """Handle the result of starting a job supervisor actor.
  322. If started successfully, result should be None. Otherwise it should be
  323. an Exception.
  324. On failure, the job will be marked failed with a relevant error
  325. message.
  326. """
  327. if result is None:
  328. return
  329. def _get_supervisor_runtime_env(
  330. self,
  331. user_runtime_env: Dict[str, Any],
  332. submission_id: str,
  333. resources_specified: bool = False,
  334. ) -> Dict[str, Any]:
  335. """Configure and return the runtime_env for the supervisor actor.
  336. Args:
  337. user_runtime_env: The runtime_env specified by the user.
  338. resources_specified: Whether the user specified resources in the
  339. submit_job() call. If so, we will skip the workaround introduced
  340. in #24546 for GPU detection and just use the user's resource
  341. requests, so that the behavior matches that of the user specifying
  342. resources for any other actor.
  343. Returns:
  344. The runtime_env for the supervisor actor.
  345. """
  346. # Make a copy to avoid mutating passed runtime_env.
  347. runtime_env = (
  348. copy.deepcopy(user_runtime_env) if user_runtime_env is not None else {}
  349. )
  350. # NOTE(edoakes): Can't use .get(, {}) here because we need to handle the case
  351. # where env_vars is explicitly set to `None`.
  352. env_vars = runtime_env.get("env_vars")
  353. if env_vars is None:
  354. env_vars = {}
  355. env_vars[ray_constants.RAY_WORKER_NICENESS] = "0"
  356. if not resources_specified:
  357. # Don't set CUDA_VISIBLE_DEVICES for the supervisor actor so the
  358. # driver can use GPUs if it wants to. This will be removed from
  359. # the driver's runtime_env so it isn't inherited by tasks & actors.
  360. env_vars[NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR] = "1"
  361. env_vars[NOSET_ASCEND_RT_VISIBLE_DEVICES_ENV_VAR] = "1"
  362. runtime_env["env_vars"] = env_vars
  363. if os.getenv(RAY_STREAM_RUNTIME_ENV_LOG_TO_JOB_DRIVER_LOG_ENV_VAR, "0") == "1":
  364. config = runtime_env.get("config")
  365. # Empty fields may be set to None, so we need to check for None explicitly.
  366. if config is None:
  367. config = RuntimeEnvConfig()
  368. config["log_files"] = [self._log_client.get_log_file_path(submission_id)]
  369. runtime_env["config"] = config
  370. return runtime_env
  371. async def _get_scheduling_strategy(
  372. self, resources_specified: bool
  373. ) -> SchedulingStrategyT:
  374. """Get the scheduling strategy for the job.
  375. If resources_specified is true, or if the environment variable is set to
  376. allow the job to run on worker nodes, we will use Ray's default actor
  377. placement strategy. Otherwise, we will force the job to use the head node.
  378. Args:
  379. resources_specified: Whether the job specified any resources
  380. (CPUs, GPUs, or custom resources).
  381. Returns:
  382. The scheduling strategy to use for the job.
  383. """
  384. if resources_specified:
  385. return "DEFAULT"
  386. if os.environ.get(RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES_ENV_VAR, "0") == "1":
  387. logger.info(
  388. f"{RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES_ENV_VAR} was set to 1. "
  389. "Using Ray's default actor scheduling strategy for the job "
  390. "driver instead of running it on the head node."
  391. )
  392. return "DEFAULT"
  393. # If the user did not specify any resources or set the driver on worker nodes
  394. # env var, we will run the driver on the head node.
  395. head_node_id = await get_head_node_id(self._gcs_client)
  396. if head_node_id is None:
  397. logger.info(
  398. "Head node ID not found in GCS. Using Ray's default actor "
  399. "scheduling strategy for the job driver instead of running "
  400. "it on the head node."
  401. )
  402. scheduling_strategy = "DEFAULT"
  403. else:
  404. logger.info(
  405. "Head node ID found in GCS; scheduling job driver on "
  406. f"head node {head_node_id}"
  407. )
  408. scheduling_strategy = NodeAffinitySchedulingStrategy(
  409. node_id=head_node_id, soft=False
  410. )
  411. return scheduling_strategy
  412. async def submit_job(
  413. self,
  414. *,
  415. entrypoint: str,
  416. submission_id: Optional[str] = None,
  417. runtime_env: Optional[Dict[str, Any]] = None,
  418. metadata: Optional[Dict[str, str]] = None,
  419. entrypoint_num_cpus: Optional[Union[int, float]] = None,
  420. entrypoint_num_gpus: Optional[Union[int, float]] = None,
  421. entrypoint_memory: Optional[int] = None,
  422. entrypoint_resources: Optional[Dict[str, float]] = None,
  423. entrypoint_label_selector: Optional[Dict[str, str]] = None,
  424. _start_signal_actor: Optional[ActorHandle] = None,
  425. ) -> str:
  426. """
  427. Job execution happens asynchronously.
  428. 1) Generate a new unique id for this job submission, each call of this
  429. method assumes they're independent submission with its own new
  430. ID, job supervisor actor, and child process.
  431. 2) Create new detached actor with same runtime_env as job spec
  432. Actual setting up runtime_env, subprocess group, driver command
  433. execution, subprocess cleaning up and running status update to GCS
  434. is all handled by job supervisor actor.
  435. Args:
  436. entrypoint: Driver command to execute in subprocess shell.
  437. Represents the entrypoint to start user application.
  438. runtime_env: Runtime environment used to execute driver command,
  439. which could contain its own ray.init() to configure runtime
  440. env at ray cluster, task and actor level.
  441. metadata: Support passing arbitrary data to driver command in
  442. case needed.
  443. entrypoint_num_cpus: The quantity of CPU cores to reserve for the execution
  444. of the entrypoint command, separately from any tasks or actors launched
  445. by it. Defaults to 0.
  446. entrypoint_num_gpus: The quantity of GPUs to reserve for
  447. the entrypoint command, separately from any tasks or actors launched
  448. by it. Defaults to 0.
  449. entrypoint_memory: The amount of total available memory for workers
  450. requesting memory the entrypoint command, separately from any tasks
  451. or actors launched by it. Defaults to 0.
  452. entrypoint_resources: The quantity of various custom resources
  453. to reserve for the entrypoint command, separately from any tasks or
  454. actors launched by it.
  455. entrypoint_label_selector: Label selector for the entrypoint command.
  456. _start_signal_actor: Used in testing only to capture state
  457. transitions between PENDING -> RUNNING. Regular user shouldn't
  458. need this.
  459. Returns:
  460. job_id: Generated uuid for further job management. Only valid
  461. within the same ray cluster.
  462. """
  463. if entrypoint_num_cpus is None:
  464. entrypoint_num_cpus = 0
  465. if entrypoint_num_gpus is None:
  466. entrypoint_num_gpus = 0
  467. if entrypoint_memory is None:
  468. entrypoint_memory = 0
  469. if submission_id is None:
  470. submission_id = generate_job_id()
  471. # Wait for `_recover_running_jobs` to run before accepting submissions to
  472. # avoid duplicate monitoring of the same job.
  473. await self._recover_running_jobs_event.wait()
  474. logger.info(f"Starting job with submission_id: {submission_id}")
  475. if entrypoint_label_selector:
  476. error_message = validate_label_selector(entrypoint_label_selector)
  477. if error_message:
  478. raise ValueError(error_message)
  479. job_info = JobInfo(
  480. entrypoint=entrypoint,
  481. status=JobStatus.PENDING,
  482. start_time=int(time.time() * 1000),
  483. metadata=metadata,
  484. runtime_env=runtime_env,
  485. entrypoint_num_cpus=entrypoint_num_cpus,
  486. entrypoint_num_gpus=entrypoint_num_gpus,
  487. entrypoint_memory=entrypoint_memory,
  488. entrypoint_resources=entrypoint_resources,
  489. )
  490. new_key_added = await self._job_info_client.put_info(
  491. submission_id, job_info, overwrite=False
  492. )
  493. if not new_key_added:
  494. raise ValueError(
  495. f"Job with submission_id {submission_id} already exists. "
  496. "Please use a different submission_id."
  497. )
  498. driver_logger = self._get_job_driver_logger(submission_id)
  499. # Wait for the actor to start up asynchronously so this call always
  500. # returns immediately and we can catch errors with the actor starting
  501. # up.
  502. try:
  503. resources_specified = any(
  504. [
  505. entrypoint_num_cpus is not None and entrypoint_num_cpus > 0,
  506. entrypoint_num_gpus is not None and entrypoint_num_gpus > 0,
  507. entrypoint_memory is not None and entrypoint_memory > 0,
  508. entrypoint_resources not in [None, {}],
  509. entrypoint_label_selector not in [None, {}],
  510. ]
  511. )
  512. scheduling_strategy = await self._get_scheduling_strategy(
  513. resources_specified
  514. )
  515. if self.event_logger:
  516. self.event_logger.info(
  517. f"Started a ray job {submission_id}.", submission_id=submission_id
  518. )
  519. driver_logger.info("Runtime env is setting up.")
  520. supervisor_options = dict(
  521. lifetime="detached",
  522. name=JOB_ACTOR_NAME_TEMPLATE.format(job_id=submission_id),
  523. num_cpus=entrypoint_num_cpus,
  524. num_gpus=entrypoint_num_gpus,
  525. memory=entrypoint_memory,
  526. resources=entrypoint_resources,
  527. scheduling_strategy=scheduling_strategy,
  528. runtime_env=self._get_supervisor_runtime_env(
  529. runtime_env, submission_id, resources_specified
  530. ),
  531. namespace=SUPERVISOR_ACTOR_RAY_NAMESPACE,
  532. # Don't pollute task events with system actor tasks that users don't
  533. # know about.
  534. enable_task_events=False,
  535. )
  536. if entrypoint_label_selector:
  537. supervisor_options["label_selector"] = entrypoint_label_selector
  538. supervisor = self._supervisor_actor_cls.options(
  539. **supervisor_options
  540. ).remote(
  541. submission_id,
  542. entrypoint,
  543. metadata or {},
  544. self._gcs_address,
  545. self._cluster_id_hex,
  546. self._logs_dir,
  547. )
  548. supervisor.run.remote(
  549. _start_signal_actor=_start_signal_actor,
  550. resources_specified=resources_specified,
  551. )
  552. # Monitor the job in the background so we can detect errors without
  553. # requiring a client to poll.
  554. run_background_task(
  555. self._monitor_job(submission_id, job_supervisor=supervisor)
  556. )
  557. except Exception as e:
  558. tb_str = traceback.format_exc()
  559. driver_logger.warning(
  560. f"Failed to start supervisor actor for job {submission_id}: '{e}'"
  561. f". Full traceback:\n{tb_str}"
  562. )
  563. await self._job_info_client.put_status(
  564. submission_id,
  565. JobStatus.FAILED,
  566. message=(
  567. f"Failed to start supervisor actor {submission_id}: '{e}'"
  568. f". Full traceback:\n{tb_str}"
  569. ),
  570. error_type=JobErrorType.JOB_SUPERVISOR_ACTOR_START_FAILURE,
  571. )
  572. finally:
  573. close_logger_file_descriptor(driver_logger)
  574. return submission_id
  575. def stop_job(self, job_id) -> bool:
  576. """Request a job to exit, fire and forget.
  577. Returns whether or not the job was running.
  578. """
  579. job_supervisor_actor = self._get_actor_for_job(job_id)
  580. if job_supervisor_actor is not None:
  581. # Actor is still alive, signal it to stop the driver, fire and
  582. # forget
  583. job_supervisor_actor.stop.remote()
  584. return True
  585. else:
  586. return False
  587. async def delete_job(self, job_id):
  588. """Delete a job's info and metadata from the cluster."""
  589. job_status = await self._job_info_client.get_status(job_id)
  590. if job_status is None or not job_status.is_terminal():
  591. raise RuntimeError(
  592. f"Attempted to delete job '{job_id}', "
  593. f"but it is in a non-terminal state {job_status}."
  594. )
  595. await self._job_info_client.delete_info(job_id)
  596. return True
  597. def job_info_client(self) -> JobInfoStorageClient:
  598. return self._job_info_client
  599. async def get_job_status(self, job_id: str) -> Optional[JobStatus]:
  600. """Get latest status of a job."""
  601. return await self._job_info_client.get_status(job_id)
  602. async def get_job_info(self, job_id: str) -> Optional[JobInfo]:
  603. """Get latest info of a job."""
  604. return await self._job_info_client.get_info(job_id)
  605. async def list_jobs(self) -> Dict[str, JobInfo]:
  606. """Get info for all jobs."""
  607. return await self._job_info_client.get_all_jobs()
  608. def get_job_logs(self, job_id: str) -> str:
  609. """Get all logs produced by a job."""
  610. return self._log_client.get_logs(job_id)
  611. async def tail_job_logs(self, job_id: str) -> AsyncIterator[str]:
  612. """Return an iterator following the logs of a job."""
  613. if await self.get_job_status(job_id) is None:
  614. raise RuntimeError(f"Job '{job_id}' does not exist.")
  615. job_finished = False
  616. async for lines in self._log_client.tail_logs(job_id):
  617. if lines is None:
  618. if job_finished:
  619. # Job has already finished and we have read EOF afterwards,
  620. # it's guaranteed that we won't get any more logs.
  621. return
  622. else:
  623. status = await self.get_job_status(job_id)
  624. if status.is_terminal():
  625. job_finished = True
  626. # Continue tailing logs generated between the
  627. # last EOF read and the finish of the job.
  628. await asyncio.sleep(self.LOG_TAIL_SLEEP_S)
  629. else:
  630. yield "".join(lines)