| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713 |
- import asyncio
- import copy
- import logging
- import os
- import random
- import string
- import time
- import traceback
- from typing import Any, AsyncIterator, Dict, Optional, Union
- import ray
- import ray._private.ray_constants as ray_constants
- from ray._common.utils import Timer, run_background_task
- from ray._private.accelerators.npu import NOSET_ASCEND_RT_VISIBLE_DEVICES_ENV_VAR
- from ray._private.accelerators.nvidia_gpu import NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR
- from ray._private.event.event_logger import get_event_logger
- from ray._private.label_utils import validate_label_selector
- from ray._raylet import GcsClient
- from ray.actor import ActorHandle
- from ray.core.generated.event_pb2 import Event
- from ray.dashboard.consts import (
- DEFAULT_JOB_START_TIMEOUT_SECONDS,
- RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES_ENV_VAR,
- RAY_JOB_START_TIMEOUT_SECONDS_ENV_VAR,
- RAY_STREAM_RUNTIME_ENV_LOG_TO_JOB_DRIVER_LOG_ENV_VAR,
- )
- from ray.dashboard.modules.job.common import (
- JOB_ACTOR_NAME_TEMPLATE,
- SUPERVISOR_ACTOR_RAY_NAMESPACE,
- JobInfo,
- JobInfoStorageClient,
- )
- from ray.dashboard.modules.job.job_log_storage_client import JobLogStorageClient
- from ray.dashboard.modules.job.job_supervisor import JobSupervisor
- from ray.dashboard.modules.job.utils import get_head_node_id
- from ray.dashboard.utils import close_logger_file_descriptor
- from ray.exceptions import ActorDiedError, ActorUnschedulableError, RuntimeEnvSetupError
- from ray.job_submission import JobErrorType, JobStatus
- from ray.runtime_env import RuntimeEnvConfig
- from ray.util.scheduling_strategies import (
- NodeAffinitySchedulingStrategy,
- SchedulingStrategyT,
- )
- logger = logging.getLogger(__name__)  # Module-level logger used by the job manager below.
def generate_job_id() -> str:
    """Build a fresh submission ID shaped like ``raysubmit_<16 chars>``.

    The ``raysubmit`` prefix keeps these IDs visually distinct from Ray's own
    JobID (driver ID).
    """
    # Characters that are easy to misread are left out of the alphabet.
    confusing = {"I", "l", "o", "O", "0"}
    alphabet = [
        ch for ch in string.ascii_letters + string.digits if ch not in confusing
    ]
    suffix = "".join(random.SystemRandom().choices(alphabet, k=16))
    return "raysubmit_" + suffix
class JobManager:
    """Provide python APIs for job submission and management.

    It does not provide persistence, all info will be lost if the cluster
    goes down.
    """

    # Time that we will sleep while tailing logs if no new log line is
    # available.
    LOG_TAIL_SLEEP_S = 1
    # Pause at the start of every iteration of the per-job monitoring loop.
    JOB_MONITOR_LOOP_PERIOD_S = 1
    # NOTE: not read anywhere inside this class.
    WAIT_FOR_ACTOR_DEATH_TIMEOUT_S = 0.1

    def __init__(
        self,
        gcs_client: GcsClient,
        logs_dir: str,
        timeout_check_timer: Optional[Timer] = None,
    ):
        """Create the manager and kick off recovery of non-terminal jobs.

        Args:
            gcs_client: Client used for job info storage; its address and
                cluster ID are forwarded to every supervisor actor.
            logs_dir: Directory handed to the job info client, the event
                logger and the supervisor actors.
            timeout_check_timer: Clock used to evaluate the supervisor start
                timeout. A fresh ``Timer`` is used when omitted.
        """
        self._gcs_client = gcs_client
        self._logs_dir = logs_dir
        self._job_info_client = JobInfoStorageClient(gcs_client, logs_dir)
        self._gcs_address = gcs_client.address
        self._cluster_id_hex = gcs_client.cluster_id.hex()
        self._log_client = JobLogStorageClient()
        self._supervisor_actor_cls = ray.remote(JobSupervisor)
        self._timeout_check_timer = timeout_check_timer or Timer()
        # IDs of the jobs that currently have a monitoring coroutine running.
        self.monitored_jobs = set()
        try:
            self.event_logger = get_event_logger(Event.SourceType.JOBS, logs_dir)
        except Exception:
            # Event logging is best-effort; every user of `event_logger`
            # checks it for truthiness first.
            self.event_logger = None

        self._recover_running_jobs_event = asyncio.Event()
        run_background_task(self._recover_running_jobs())

    def _get_job_driver_logger(self, job_id: str) -> logging.Logger:
        """Return job driver logger to log messages to the job driver log file.

        If this function is called for the first time, configure the logger.
        """
        job_driver_logger = logging.getLogger(f"{__name__}.driver-{job_id}")

        # Configure the logger if it's not already configured.
        if not job_driver_logger.handlers:
            job_driver_log_path = self._log_client.get_log_file_path(job_id)
            job_driver_handler = logging.FileHandler(job_driver_log_path)
            job_driver_formatter = logging.Formatter(ray_constants.LOGGER_FORMAT)
            job_driver_handler.setFormatter(job_driver_formatter)
            job_driver_logger.addHandler(job_driver_handler)

        return job_driver_logger

    async def _recover_running_jobs(self):
        """Recovers all running jobs from the status client.

        For each job that is not in a terminal state, a background coroutine
        is spawned to monitor it. Each monitored job is tracked in
        ``self.monitored_jobs`` for as long as its coroutine runs.
        """
        try:
            all_jobs = await self._job_info_client.get_all_jobs()
            for job_id, job_info in all_jobs.items():
                if not job_info.status.is_terminal():
                    run_background_task(self._monitor_job(job_id))
        finally:
            # This event is awaited in `submit_job` to avoid race conditions between
            # recovery and new job submission, so it must always get set even if there
            # are exceptions.
            self._recover_running_jobs_event.set()

    def _get_actor_for_job(self, job_id: str) -> Optional[ActorHandle]:
        """Look up the supervisor actor of ``job_id``, or None if it is gone."""
        try:
            return ray.get_actor(
                JOB_ACTOR_NAME_TEMPLATE.format(job_id=job_id),
                namespace=SUPERVISOR_ACTOR_RAY_NAMESPACE,
            )
        except ValueError:  # Ray returns ValueError for nonexistent actor.
            return None

    async def _monitor_job(
        self, job_id: str, job_supervisor: Optional[ActorHandle] = None
    ):
        """Monitors the specified job until it enters a terminal state.

        This is necessary because we need to handle the case where the
        JobSupervisor dies unexpectedly.

        Args:
            job_id: Submission ID of the job to monitor.
            job_supervisor: Handle of the supervisor actor when the caller
                already has one; otherwise it is looked up by name.
        """
        if job_id in self.monitored_jobs:
            logger.debug(f"Job {job_id} is already being monitored.")
            return

        self.monitored_jobs.add(job_id)
        try:
            await self._monitor_job_internal(job_id, job_supervisor)
        except Exception as e:
            logger.error("Unhandled exception in job monitoring!", exc_info=e)
            raise
        finally:
            self.monitored_jobs.remove(job_id)

    async def _monitor_job_internal(
        self, job_id: str, job_supervisor: Optional[ActorHandle] = None
    ):
        """Poll the job status and supervisor liveness until the job is done.

        The loop ends when the supervisor fails to start within the
        configured timeout, when the supervisor cannot be found for a
        non-pending job, or when pinging the supervisor raises. In each of
        those cases a non-terminal job is marked FAILED with a matching
        error type. The supervisor actor is killed on the way out.

        Args:
            job_id: Submission ID of the job to monitor.
            job_supervisor: Optional handle of the job's supervisor actor.
        """
        timeout = float(
            os.environ.get(
                RAY_JOB_START_TIMEOUT_SECONDS_ENV_VAR,
                DEFAULT_JOB_START_TIMEOUT_SECONDS,
            )
        )

        job_status = None
        job_info = None
        # Outstanding ping to the supervisor; reused across iterations until
        # it resolves so that at most one ping is in flight.
        ping_obj_ref = None

        while True:
            try:
                # NOTE: Job monitoring loop sleeps before proceeding with monitoring
                #       sequence to consolidate the control-flow of the pacing
                #       in a single place, rather than having it spread across
                #       many branches
                await asyncio.sleep(self.JOB_MONITOR_LOOP_PERIOD_S)

                job_status = await self._job_info_client.get_status(
                    job_id, timeout=None
                )
                if job_status == JobStatus.PENDING:
                    # Compare the current time with the job start time.
                    # If the job is still pending, we will set the status
                    # to FAILED.
                    if job_info is None:
                        job_info = await self._job_info_client.get_info(
                            job_id, timeout=None
                        )

                    # `start_time` is stored in milliseconds.
                    if (
                        self._timeout_check_timer.time() - job_info.start_time / 1000
                        > timeout
                    ):
                        err_msg = (
                            "Job supervisor actor failed to start within "
                            f"{timeout} seconds. This timeout can be "
                            f"configured by setting the environment "
                            f"variable {RAY_JOB_START_TIMEOUT_SECONDS_ENV_VAR}."
                        )
                        resources_specified = (
                            (
                                job_info.entrypoint_num_cpus is not None
                                and job_info.entrypoint_num_cpus > 0
                            )
                            or (
                                job_info.entrypoint_num_gpus is not None
                                and job_info.entrypoint_num_gpus > 0
                            )
                            or (
                                job_info.entrypoint_memory is not None
                                and job_info.entrypoint_memory > 0
                            )
                            or (
                                job_info.entrypoint_resources is not None
                                and len(job_info.entrypoint_resources) > 0
                            )
                        )
                        if resources_specified:
                            err_msg += (
                                " This may be because the job entrypoint's specified "
                                "resources (entrypoint_num_cpus, entrypoint_num_gpus, "
                                "entrypoint_resources, entrypoint_memory) "
                                "aren't available on the cluster."
                                " Try checking the cluster's available resources with "
                                "`ray status` and specifying fewer resources for the "
                                "job entrypoint."
                            )
                        await self._job_info_client.put_status(
                            job_id,
                            JobStatus.FAILED,
                            message=err_msg,
                            error_type=JobErrorType.JOB_SUPERVISOR_ACTOR_START_TIMEOUT,
                            timeout=None,
                        )
                        logger.error(err_msg)
                        break

                if job_supervisor is None:
                    job_supervisor = self._get_actor_for_job(job_id)

                if job_supervisor is None:
                    if job_status == JobStatus.PENDING:
                        # Maybe the job supervisor actor is not created yet.
                        # We will wait for the next loop.
                        continue
                    else:
                        # The job supervisor actor is not created, but the job
                        # status is not PENDING. This means the job supervisor
                        # actor is not created due to some unexpected errors.
                        # We will set the job status to FAILED.
                        logger.error(f"Failed to get job supervisor for job {job_id}.")
                        await self._job_info_client.put_status(
                            job_id,
                            JobStatus.FAILED,
                            message=(
                                "Unexpected error occurred: "
                                "failed to get job supervisor."
                            ),
                            error_type=JobErrorType.JOB_SUPERVISOR_ACTOR_START_FAILURE,
                            timeout=None,
                        )
                        break

                # Check to see if `JobSupervisor` is alive and reachable
                if ping_obj_ref is None:
                    ping_obj_ref = job_supervisor.ping.options(
                        max_task_retries=-1
                    ).remote()
                # A zero timeout only checks whether the ping has resolved.
                ready, _ = ray.wait([ping_obj_ref], timeout=0)
                if ready:
                    # Fetching the result re-raises any error the ping hit,
                    # which is handled by the `except` branch below.
                    ray.get(ping_obj_ref)
                    ping_obj_ref = None
                else:
                    continue

            except Exception as e:
                job_status = await self._job_info_client.get_status(
                    job_id, timeout=None
                )
                target_job_error_message = ""
                target_job_error_type: Optional[JobErrorType] = None
                if job_status is not None and job_status.is_terminal():
                    # If the job is already in a terminal state, then the actor
                    # exiting is expected.
                    pass
                else:
                    if isinstance(e, RuntimeEnvSetupError):
                        logger.error(f"Failed to set up runtime_env for job {job_id}.")
                        target_job_error_message = f"runtime_env setup failed: {e}"
                        target_job_error_type = JobErrorType.RUNTIME_ENV_SETUP_FAILURE
                    elif isinstance(e, ActorUnschedulableError):
                        logger.error(
                            f"Failed to schedule job {job_id} because the supervisor "
                            f"actor could not be scheduled: {e}"
                        )
                        target_job_error_message = (
                            f"Job supervisor actor could not be scheduled: {e}"
                        )
                        target_job_error_type = (
                            JobErrorType.JOB_SUPERVISOR_ACTOR_UNSCHEDULABLE
                        )
                    elif isinstance(e, ActorDiedError):
                        logger.error(f"Job supervisor actor for {job_id} died: {e}")
                        target_job_error_message = f"Job supervisor actor died: {e}"
                        target_job_error_type = JobErrorType.JOB_SUPERVISOR_ACTOR_DIED
                    else:
                        logger.error(
                            f"Job monitoring for job {job_id} failed "
                            f"unexpectedly: {e}.",
                            exc_info=e,
                        )
                        target_job_error_message = f"Unexpected error occurred: {e}"
                        target_job_error_type = (
                            JobErrorType.JOB_SUPERVISOR_ACTOR_UNKNOWN_FAILURE
                        )

                    job_status = JobStatus.FAILED
                    await self._job_info_client.put_status(
                        job_id,
                        job_status,
                        message=target_job_error_message,
                        error_type=target_job_error_type
                        or JobErrorType.JOB_SUPERVISOR_ACTOR_UNKNOWN_FAILURE,
                        timeout=None,
                    )

                # Log error message to the job driver file for easy access.
                if target_job_error_message:
                    log_path = self._log_client.get_log_file_path(job_id)
                    os.makedirs(os.path.dirname(log_path), exist_ok=True)
                    with open(log_path, "a") as log_file:
                        log_file.write(target_job_error_message)

                # Log events
                if self.event_logger:
                    event_log = (
                        f"Completed a ray job {job_id} with a status {job_status}."
                    )
                    if target_job_error_message:
                        event_log += f" {target_job_error_message}"
                        self.event_logger.error(event_log, submission_id=job_id)
                    else:
                        self.event_logger.info(event_log, submission_id=job_id)

                break

        # Kill the actor defensively to avoid leaking actors in unexpected error cases.
        if job_supervisor is None:
            job_supervisor = self._get_actor_for_job(job_id)
        if job_supervisor is not None:
            ray.kill(job_supervisor, no_restart=True)

    def _handle_supervisor_startup(self, job_id: str, result: Optional[Exception]):
        """Handle the result of starting a job supervisor actor.

        If started successfully, result should be None. Otherwise it should be
        an Exception.

        This method does not update the stored job status. A failure is only
        written to the log so that it is not dropped silently.

        Args:
            job_id: Submission ID of the job whose supervisor was started.
            result: None on success, otherwise the startup exception.
        """
        if result is None:
            return

        logger.error(
            f"Failed to start supervisor actor for job {job_id}: {result}",
            exc_info=result,
        )

    def _get_supervisor_runtime_env(
        self,
        user_runtime_env: Optional[Dict[str, Any]],
        submission_id: str,
        resources_specified: bool = False,
    ) -> Dict[str, Any]:
        """Configure and return the runtime_env for the supervisor actor.

        Args:
            user_runtime_env: The runtime_env specified by the user. May be
                None, in which case an empty runtime_env is used as the base.
            submission_id: Submission ID of the job; used to locate the job
                driver log file when runtime_env log streaming is enabled.
            resources_specified: Whether the user specified resources in the
                submit_job() call. If so, we will skip the workaround introduced
                in #24546 for GPU detection and just use the user's resource
                requests, so that the behavior matches that of the user specifying
                resources for any other actor.

        Returns:
            The runtime_env for the supervisor actor.
        """
        # Make a copy to avoid mutating passed runtime_env.
        runtime_env = (
            copy.deepcopy(user_runtime_env) if user_runtime_env is not None else {}
        )

        # NOTE(edoakes): Can't use .get(, {}) here because we need to handle the case
        # where env_vars is explicitly set to `None`.
        env_vars = runtime_env.get("env_vars")
        if env_vars is None:
            env_vars = {}

        env_vars[ray_constants.RAY_WORKER_NICENESS] = "0"

        if not resources_specified:
            # Don't set CUDA_VISIBLE_DEVICES for the supervisor actor so the
            # driver can use GPUs if it wants to. This will be removed from
            # the driver's runtime_env so it isn't inherited by tasks & actors.
            env_vars[NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR] = "1"
            env_vars[NOSET_ASCEND_RT_VISIBLE_DEVICES_ENV_VAR] = "1"
        runtime_env["env_vars"] = env_vars

        if os.getenv(RAY_STREAM_RUNTIME_ENV_LOG_TO_JOB_DRIVER_LOG_ENV_VAR, "0") == "1":
            config = runtime_env.get("config")
            # Empty fields may be set to None, so we need to check for None explicitly.
            if config is None:
                config = RuntimeEnvConfig()
            config["log_files"] = [self._log_client.get_log_file_path(submission_id)]
            runtime_env["config"] = config
        return runtime_env

    async def _get_scheduling_strategy(
        self, resources_specified: bool
    ) -> SchedulingStrategyT:
        """Get the scheduling strategy for the job.

        If resources_specified is true, or if the environment variable is set to
        allow the job to run on worker nodes, we will use Ray's default actor
        placement strategy. Otherwise, we will force the job to use the head node.

        Args:
            resources_specified: Whether the job specified any resources
                (CPUs, GPUs, or custom resources).

        Returns:
            The scheduling strategy to use for the job.
        """
        if resources_specified:
            return "DEFAULT"

        if os.environ.get(RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES_ENV_VAR, "0") == "1":
            logger.info(
                f"{RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES_ENV_VAR} was set to 1. "
                "Using Ray's default actor scheduling strategy for the job "
                "driver instead of running it on the head node."
            )
            return "DEFAULT"

        # If the user did not specify any resources or set the driver on worker nodes
        # env var, we will run the driver on the head node.
        head_node_id = await get_head_node_id(self._gcs_client)
        if head_node_id is None:
            logger.info(
                "Head node ID not found in GCS. Using Ray's default actor "
                "scheduling strategy for the job driver instead of running "
                "it on the head node."
            )
            scheduling_strategy = "DEFAULT"
        else:
            logger.info(
                "Head node ID found in GCS; scheduling job driver on "
                f"head node {head_node_id}"
            )
            scheduling_strategy = NodeAffinitySchedulingStrategy(
                node_id=head_node_id, soft=False
            )
        return scheduling_strategy

    async def submit_job(
        self,
        *,
        entrypoint: str,
        submission_id: Optional[str] = None,
        runtime_env: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, str]] = None,
        entrypoint_num_cpus: Optional[Union[int, float]] = None,
        entrypoint_num_gpus: Optional[Union[int, float]] = None,
        entrypoint_memory: Optional[int] = None,
        entrypoint_resources: Optional[Dict[str, float]] = None,
        entrypoint_label_selector: Optional[Dict[str, str]] = None,
        _start_signal_actor: Optional[ActorHandle] = None,
    ) -> str:
        """
        Job execution happens asynchronously.

        1) Generate a new unique id for this job submission, each call of this
            method assumes they're independent submission with its own new
            ID, job supervisor actor, and child process.
        2) Create new detached actor with same runtime_env as job spec

        Actual setting up runtime_env, subprocess group, driver command
        execution, subprocess cleaning up and running status update to GCS
        is all handled by job supervisor actor.

        Args:
            entrypoint: Driver command to execute in subprocess shell.
                Represents the entrypoint to start user application.
            submission_id: ID to register the job under. A new one is
                generated when omitted.
            runtime_env: Runtime environment used to execute driver command,
                which could contain its own ray.init() to configure runtime
                env at ray cluster, task and actor level.
            metadata: Support passing arbitrary data to driver command in
                case needed.
            entrypoint_num_cpus: The quantity of CPU cores to reserve for the execution
                of the entrypoint command, separately from any tasks or actors launched
                by it. Defaults to 0.
            entrypoint_num_gpus: The quantity of GPUs to reserve for
                the entrypoint command, separately from any tasks or actors launched
                by it. Defaults to 0.
            entrypoint_memory: The amount of memory to reserve for the
                entrypoint command, separately from any tasks or actors
                launched by it. Defaults to 0.
            entrypoint_resources: The quantity of various custom resources
                to reserve for the entrypoint command, separately from any tasks or
                actors launched by it.
            entrypoint_label_selector: Label selector for the entrypoint command.
            _start_signal_actor: Used in testing only to capture state
                transitions between PENDING -> RUNNING. Regular user shouldn't
                need this.

        Returns:
            job_id: Generated uuid for further job management. Only valid
                within the same ray cluster.

        Raises:
            ValueError: If the label selector fails validation, or if a job
                with the same ``submission_id`` already exists.
        """
        if entrypoint_num_cpus is None:
            entrypoint_num_cpus = 0
        if entrypoint_num_gpus is None:
            entrypoint_num_gpus = 0
        if entrypoint_memory is None:
            entrypoint_memory = 0
        if submission_id is None:
            submission_id = generate_job_id()

        # Wait for `_recover_running_jobs` to run before accepting submissions to
        # avoid duplicate monitoring of the same job.
        await self._recover_running_jobs_event.wait()

        logger.info(f"Starting job with submission_id: {submission_id}")

        # Validate before anything is persisted so that a bad selector does
        # not leave a job record behind.
        if entrypoint_label_selector:
            error_message = validate_label_selector(entrypoint_label_selector)
            if error_message:
                raise ValueError(error_message)

        job_info = JobInfo(
            entrypoint=entrypoint,
            status=JobStatus.PENDING,
            start_time=int(time.time() * 1000),
            metadata=metadata,
            runtime_env=runtime_env,
            entrypoint_num_cpus=entrypoint_num_cpus,
            entrypoint_num_gpus=entrypoint_num_gpus,
            entrypoint_memory=entrypoint_memory,
            entrypoint_resources=entrypoint_resources,
        )
        new_key_added = await self._job_info_client.put_info(
            submission_id, job_info, overwrite=False
        )
        if not new_key_added:
            raise ValueError(
                f"Job with submission_id {submission_id} already exists. "
                "Please use a different submission_id."
            )

        driver_logger = self._get_job_driver_logger(submission_id)
        # Wait for the actor to start up asynchronously so this call always
        # returns immediately and we can catch errors with the actor starting
        # up.
        try:
            resources_specified = any(
                [
                    entrypoint_num_cpus is not None and entrypoint_num_cpus > 0,
                    entrypoint_num_gpus is not None and entrypoint_num_gpus > 0,
                    entrypoint_memory is not None and entrypoint_memory > 0,
                    entrypoint_resources not in [None, {}],
                    entrypoint_label_selector not in [None, {}],
                ]
            )
            scheduling_strategy = await self._get_scheduling_strategy(
                resources_specified
            )
            if self.event_logger:
                self.event_logger.info(
                    f"Started a ray job {submission_id}.", submission_id=submission_id
                )

            driver_logger.info("Runtime env is setting up.")
            supervisor_options = dict(
                lifetime="detached",
                name=JOB_ACTOR_NAME_TEMPLATE.format(job_id=submission_id),
                num_cpus=entrypoint_num_cpus,
                num_gpus=entrypoint_num_gpus,
                memory=entrypoint_memory,
                resources=entrypoint_resources,
                scheduling_strategy=scheduling_strategy,
                runtime_env=self._get_supervisor_runtime_env(
                    runtime_env, submission_id, resources_specified
                ),
                namespace=SUPERVISOR_ACTOR_RAY_NAMESPACE,
                # Don't pollute task events with system actor tasks that users don't
                # know about.
                enable_task_events=False,
            )
            if entrypoint_label_selector:
                supervisor_options["label_selector"] = entrypoint_label_selector

            supervisor = self._supervisor_actor_cls.options(
                **supervisor_options
            ).remote(
                submission_id,
                entrypoint,
                metadata or {},
                self._gcs_address,
                self._cluster_id_hex,
                self._logs_dir,
            )
            supervisor.run.remote(
                _start_signal_actor=_start_signal_actor,
                resources_specified=resources_specified,
            )

            # Monitor the job in the background so we can detect errors without
            # requiring a client to poll.
            run_background_task(
                self._monitor_job(submission_id, job_supervisor=supervisor)
            )
        except Exception as e:
            tb_str = traceback.format_exc()
            driver_logger.warning(
                f"Failed to start supervisor actor for job {submission_id}: '{e}'"
                f". Full traceback:\n{tb_str}"
            )
            await self._job_info_client.put_status(
                submission_id,
                JobStatus.FAILED,
                message=(
                    f"Failed to start supervisor actor {submission_id}: '{e}'"
                    f". Full traceback:\n{tb_str}"
                ),
                error_type=JobErrorType.JOB_SUPERVISOR_ACTOR_START_FAILURE,
            )
        finally:
            close_logger_file_descriptor(driver_logger)

        return submission_id

    def stop_job(self, job_id: str) -> bool:
        """Request a job to exit, fire and forget.

        Returns whether or not the job was running.
        """
        job_supervisor_actor = self._get_actor_for_job(job_id)
        if job_supervisor_actor is not None:
            # Actor is still alive, signal it to stop the driver, fire and
            # forget
            job_supervisor_actor.stop.remote()
            return True
        else:
            return False

    async def delete_job(self, job_id: str) -> bool:
        """Delete a job's info and metadata from the cluster.

        Returns:
            True once the job info has been deleted.

        Raises:
            RuntimeError: If the job has no stored status or is not in a
                terminal state.
        """
        job_status = await self._job_info_client.get_status(job_id)

        if job_status is None or not job_status.is_terminal():
            raise RuntimeError(
                f"Attempted to delete job '{job_id}', "
                f"but it is in a non-terminal state {job_status}."
            )

        await self._job_info_client.delete_info(job_id)
        return True

    def job_info_client(self) -> JobInfoStorageClient:
        """Return the storage client this manager uses for job info."""
        return self._job_info_client

    async def get_job_status(self, job_id: str) -> Optional[JobStatus]:
        """Get latest status of a job."""
        return await self._job_info_client.get_status(job_id)

    async def get_job_info(self, job_id: str) -> Optional[JobInfo]:
        """Get latest info of a job."""
        return await self._job_info_client.get_info(job_id)

    async def list_jobs(self) -> Dict[str, JobInfo]:
        """Get info for all jobs."""
        return await self._job_info_client.get_all_jobs()

    def get_job_logs(self, job_id: str) -> str:
        """Get all logs produced by a job."""
        return self._log_client.get_logs(job_id)

    async def tail_job_logs(self, job_id: str) -> AsyncIterator[str]:
        """Return an iterator following the logs of a job.

        Yields chunks of log text until the job has reached a terminal state
        and one more end-of-file has been read after that.

        Raises:
            RuntimeError: If the job does not exist when tailing starts.
        """
        if await self.get_job_status(job_id) is None:
            raise RuntimeError(f"Job '{job_id}' does not exist.")

        job_finished = False
        async for lines in self._log_client.tail_logs(job_id):
            if lines is None:
                if job_finished:
                    # Job has already finished and we have read EOF afterwards,
                    # it's guaranteed that we won't get any more logs.
                    return
                else:
                    status = await self.get_job_status(job_id)
                    # A missing status means the job info was removed while
                    # we were tailing; treat that like a finished job rather
                    # than failing on `None.is_terminal()`.
                    if status is None or status.is_terminal():
                        job_finished = True
                        # Continue tailing logs generated between the
                        # last EOF read and the finish of the job.

                await asyncio.sleep(self.LOG_TAIL_SLEEP_S)
            else:
                yield "".join(lines)
|