| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761 |
- import asyncio
- import dataclasses
- import enum
- import json
- import logging
- import os
- import time
- import traceback
- from datetime import datetime
- from typing import AsyncIterator, Dict, Optional, Tuple
- import aiohttp.web
- from aiohttp.client import ClientResponse
- from aiohttp.web import Request, Response, StreamResponse
- import ray
- from ray import NodeID
- from ray._common.network_utils import build_address
- from ray._common.pydantic_compat import BaseModel, Extra, Field, validator
- from ray._common.utils import get_or_create_event_loop, load_class
- from ray._private.authentication.http_token_authentication import (
- get_auth_headers_if_auth_enabled,
- )
- from ray._private.ray_constants import KV_NAMESPACE_DASHBOARD
- from ray._private.runtime_env.packaging import (
- package_exists,
- pin_runtime_env_uri,
- upload_package_to_gcs,
- )
- from ray.dashboard.consts import (
- DASHBOARD_AGENT_ADDR_NODE_ID_PREFIX,
- GCS_RPC_TIMEOUT_SECONDS,
- RAY_CLUSTER_ACTIVITY_HOOK,
- TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS,
- WAIT_AVAILABLE_AGENT_TIMEOUT,
- )
- from ray.dashboard.modules.job.common import (
- JobDeleteResponse,
- JobInfoStorageClient,
- JobLogsResponse,
- JobStopResponse,
- JobSubmitRequest,
- JobSubmitResponse,
- http_uri_components_to_uri,
- )
- from ray.dashboard.modules.job.pydantic_models import JobDetails, JobType
- from ray.dashboard.modules.job.utils import (
- find_job_by_ids,
- get_driver_jobs,
- get_head_node_id,
- parse_and_validate_request,
- )
- from ray.dashboard.modules.version import CURRENT_VERSION, VersionResponse
- from ray.dashboard.subprocesses.module import SubprocessModule
- from ray.dashboard.subprocesses.routes import SubprocessRouteTable as routes
- from ray.dashboard.subprocesses.utils import ResponseType
- logger = logging.getLogger(__name__)
- logger.setLevel(logging.INFO)
class RayActivityStatus(str, enum.Enum):
    """Activity state reported for a Ray component.

    The class mixes in ``str``, so each member is also a plain string equal
    to its own name.
    """

    ACTIVE = "ACTIVE"
    INACTIVE = "INACTIVE"
    ERROR = "ERROR"
class RayActivityResponse(BaseModel, extra=Extra.allow):
    """
    Pydantic model used to inform if a particular Ray component can be considered
    active, and metadata about observation.

    The model is declared with ``extra=Extra.allow``, so fields beyond the ones
    listed here are accepted.
    """

    is_active: RayActivityStatus = Field(
        ...,
        description=(
            "Whether the corresponding Ray component is considered active or inactive, "
            "or if there was an error while collecting this observation."
        ),
    )
    reason: Optional[str] = Field(
        None, description="Reason if Ray component is considered active or errored."
    )
    timestamp: float = Field(
        ...,
        description=(
            "Timestamp of when this observation about the Ray component was made. "
            "This is in the format of seconds since unix epoch."
        ),
    )
    last_activity_at: Optional[float] = Field(
        None,
        description=(
            "Timestamp when last activity of this Ray component finished in format of "
            "seconds since unix epoch. This field does not need to be populated "
            "for Ray components where it is not meaningful."
        ),
    )

    @validator("reason", always=True)
    def reason_required(cls, v, values, **kwargs):
        """Require ``reason`` whenever ``is_active`` is anything but INACTIVE.

        Raises:
            ValueError: If ``reason`` is None while ``is_active`` is present
                in ``values`` and is not ``RayActivityStatus.INACTIVE``.
        """
        # The check is skipped when `is_active` is missing from `values`
        # (presumably because that field failed its own validation -- TODO confirm).
        needs_reason = (
            "is_active" in values
            and values["is_active"] != RayActivityStatus.INACTIVE
        )
        if needs_reason and v is None:
            raise ValueError('Reason is required if is_active is "active" or "error"')
        return v
class JobAgentSubmissionClient:
    """A local client for submitting and interacting with jobs on a specific node
    in the remote cluster.

    Submits requests over HTTP to the job agent on the specific node using the REST API.
    """

    def __init__(
        self,
        dashboard_agent_address: str,
    ):
        """Create a client bound to one agent.

        Args:
            dashboard_agent_address: Base URL of the agent; request paths such
                as ``/api/job_agent/jobs/`` are appended to it verbatim.
        """
        self._agent_address = dashboard_agent_address
        # A single session is shared by every request; release it via `close()`.
        self._session = aiohttp.ClientSession()

    def _get_headers(self):
        """Get auth headers if token authentication is enabled."""
        return get_auth_headers_if_auth_enabled({})

    async def _raise_error(self, resp: ClientResponse):
        """Raise a ``RuntimeError`` carrying the response status and body text.

        This never returns normally; callers use it as the non-200 branch.
        """
        status = resp.status
        error_text = await resp.text()
        raise RuntimeError(f"Request failed with status code {status}: {error_text}.")

    async def submit_job_internal(self, req: JobSubmitRequest) -> JobSubmitResponse:
        """POST the submit request to the agent and return its parsed reply.

        Raises:
            RuntimeError: If the agent answers with a non-200 status.
        """
        logger.debug(f"Submitting job with submission_id={req.submission_id}.")

        async with self._session.post(
            f"{self._agent_address}/api/job_agent/jobs/",
            json=dataclasses.asdict(req),
            headers=self._get_headers(),
        ) as resp:
            if resp.status == 200:
                result_json = await resp.json()
                return JobSubmitResponse(**result_json)
            else:
                await self._raise_error(resp)

    async def stop_job_internal(self, job_id: str) -> JobStopResponse:
        """Ask the agent to stop ``job_id`` and return its parsed reply.

        Raises:
            RuntimeError: If the agent answers with a non-200 status.
        """
        logger.debug(f"Stopping job with job_id={job_id}.")

        async with self._session.post(
            f"{self._agent_address}/api/job_agent/jobs/{job_id}/stop",
            headers=self._get_headers(),
        ) as resp:
            if resp.status == 200:
                result_json = await resp.json()
                return JobStopResponse(**result_json)
            else:
                await self._raise_error(resp)

    async def delete_job_internal(self, job_id: str) -> JobDeleteResponse:
        """Ask the agent to delete ``job_id`` and return its parsed reply.

        Raises:
            RuntimeError: If the agent answers with a non-200 status.
        """
        logger.debug(f"Deleting job with job_id={job_id}.")

        async with self._session.delete(
            f"{self._agent_address}/api/job_agent/jobs/{job_id}",
            headers=self._get_headers(),
        ) as resp:
            if resp.status == 200:
                result_json = await resp.json()
                return JobDeleteResponse(**result_json)
            else:
                await self._raise_error(resp)

    async def get_job_logs_internal(self, job_id: str) -> JobLogsResponse:
        """Fetch the logs of ``job_id`` from the agent.

        Raises:
            RuntimeError: If the agent answers with a non-200 status.
        """
        async with self._session.get(
            f"{self._agent_address}/api/job_agent/jobs/{job_id}/logs",
            headers=self._get_headers(),
        ) as resp:
            if resp.status == 200:
                result_json = await resp.json()
                return JobLogsResponse(**result_json)
            else:
                await self._raise_error(resp)

    async def tail_job_logs(self, job_id: str) -> AsyncIterator[str]:
        """Get an iterator that follows the logs of a job.

        Yields the payload of every TEXT websocket message until a CLOSED
        message is received. The websocket is closed when the iterator
        finishes, is closed early by the consumer, or fails.
        """
        ws = await self._session.ws_connect(
            f"{self._agent_address}/api/job_agent/jobs/{job_id}/logs/tail",
            headers=self._get_headers(),
        )
        try:
            while True:
                msg = await ws.receive()

                if msg.type == aiohttp.WSMsgType.TEXT:
                    yield msg.data
                elif msg.type == aiohttp.WSMsgType.CLOSED:
                    break
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    # Best effort: keep reading, as before, but leave a trace
                    # instead of dropping the error silently.
                    logger.debug(
                        f"Ignoring websocket error while tailing logs of job {job_id}."
                    )
        finally:
            # Always release the connection, including when the consumer stops
            # iterating early or an exception propagates out of the loop.
            await ws.close()

    async def close(self, ignore_error=True):
        """Close the underlying HTTP session.

        Args:
            ignore_error: When True (the default), exceptions raised while
                closing are swallowed; otherwise they are re-raised.
        """
        try:
            await self._session.close()
        except Exception:
            if not ignore_error:
                raise
- class JobHead(SubprocessModule):
- """Runs on the head node of a Ray cluster and handles Ray Jobs APIs.
- NOTE(architkulkarni): Please keep this class in sync with the OpenAPI spec at
- `doc/source/cluster/running-applications/job-submission/openapi.yml`.
- We currently do not automatically check that the OpenAPI
- spec is in sync with the implementation. If any changes are made to the
- paths in the @route decorators or in the Responses returned by the
- methods (or any nested fields in the Responses), you will need to find the
- corresponding field of the OpenAPI yaml file and update it manually. Also,
- bump the version number in the yaml file and in this class's `get_version`.
- """
- # Time that we sleep while tailing logs while waiting for
- # the supervisor actor to start. We don't know which node
- # to read the logs from until then.
- WAIT_FOR_SUPERVISOR_ACTOR_INTERVAL_S = 1
def __init__(self, *args, **kwargs):
    """Initialise the module and its (initially empty) agent-client cache."""
    super().__init__(*args, **kwargs)

    # Stays None until `run()` creates the storage client.
    self._job_info_client = None

    # Touching the lazy `gcs_client` property is what makes sure the
    # internal KV is initialized; the second assert verifies that it is.
    assert self.gcs_client is not None
    assert ray.experimental.internal_kv._internal_kv_initialized()

    # Every `JobAgentSubmissionClient` that `JobHead` has ever used is kept
    # here. An entry is only meant to go away once its client is no longer
    # available (the corresponding agent process is dead).
    # {node_id: JobAgentSubmissionClient}
    self._agents: Dict[NodeID, JobAgentSubmissionClient] = {}
async def get_target_agent(
    self, timeout_s: float = WAIT_AVAILABLE_AGENT_TIMEOUT
) -> JobAgentSubmissionClient:
    """Return the client used to talk to a job agent.

    The work is delegated to `_get_head_node_agent`, so the returned client
    always targets the head node's agent.

    Args:
        timeout_s: The timeout for the operation.

    Returns:
        A `JobAgentSubmissionClient` for interacting with jobs via an agent process.

    Raises:
        TimeoutError: If the operation times out.
    """
    agent_client = await self._get_head_node_agent(timeout_s)
    return agent_client
async def _get_head_node_agent_once(self) -> JobAgentSubmissionClient:
    """Make a single attempt to obtain the head node's agent client.

    A client already cached in `self._agents` is reused; otherwise the agent
    address is looked up, a new client is created and cached.

    Raises:
        Exception: If no head node id is available from GCS yet, or if
            fetching the agent info fails.
    """
    node_id_hex = await get_head_node_id(self.gcs_client)
    if not node_id_hex:
        raise Exception("Head node id has not yet been persisted in GCS")

    node_id = NodeID.from_hex(node_id_hex)
    if node_id in self._agents:
        return self._agents[node_id]

    # The third element of the agent info is not needed for the HTTP client.
    ip, http_port, _ = await self._fetch_agent_info(node_id)
    client = JobAgentSubmissionClient(f"http://{build_address(ip, http_port)}")
    self._agents[node_id] = client
    return client
- async def _get_head_node_agent(self, timeout_s: float) -> JobAgentSubmissionClient:
- """Retrieves HTTP client for `JobAgent` running on the Head node. If the head
- node does not have an agent, it will retry every
- `TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS` seconds indefinitely.
- Args:
- timeout_s: The timeout for the operation.
- Returns:
- A `JobAgentSubmissionClient` for interacting with jobs via the head node's agent process.
- Raises:
- TimeoutError: If the operation times out.
- """
- timeout_point = time.time() + timeout_s
- exception = None
- while time.time() < timeout_point:
- try:
- return await self._get_head_node_agent_once()
- except Exception as e:
- exception = e
- logger.exception(
- f"Failed to get head node agent, retrying in {TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS} seconds..."
- )
- await asyncio.sleep(TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS)
- raise TimeoutError(
- f"Failed to get head node agent within {timeout_s} seconds. The last exception is {exception}"
- )
async def _fetch_agent_info(self, target_node_id: NodeID) -> Tuple[str, int, int]:
    """
    Look up the dashboard agent's address record for a node in the internal KV.

    May raise an exception if there's a network error or the agent info is
    not found.

    Returns: (ip, http_port, grpc_port), as decoded from the stored JSON value.

    Raises:
        KeyError: If no value is stored for the node.
    """
    kv_key = f"{DASHBOARD_AGENT_ADDR_NODE_ID_PREFIX}{target_node_id.hex()}"
    raw = await self.gcs_client.async_internal_kv_get(
        kv_key,
        namespace=KV_NAMESPACE_DASHBOARD,
        timeout=GCS_RPC_TIMEOUT_SECONDS,
    )
    if raw:
        return json.loads(raw.decode())

    raise KeyError(
        f"Agent info not found in internal KV for node {target_node_id}. "
        "It's possible that the agent didn't launch successfully due to "
        "port conflicts or other issues. Please check `dashboard_agent.log` "
        "for more details."
    )
@routes.get("/api/version")
async def get_version(self, req: Request) -> Response:
    """Report the API version, Ray version/commit and session name as JSON."""
    # NOTE(edoakes): CURRENT_VERSION should be bumped and checked on the
    # client when we have backwards-incompatible changes.
    version_info = VersionResponse(
        version=CURRENT_VERSION,
        ray_version=ray.__version__,
        ray_commit=ray.__commit__,
        session_name=self.session_name,
    )
    body = json.dumps(dataclasses.asdict(version_info))
    return Response(
        text=body,
        content_type="application/json",
        status=aiohttp.web.HTTPOk.status_code,
    )
@routes.get("/api/packages/{protocol}/{package_name}")
async def get_package(self, req: Request) -> Response:
    """Pin a package URI and report whether the package exists.

    Responds 500 with a traceback if pinning fails, 404 if the package does
    not exist, and an empty default response otherwise.
    """
    route_params = req.match_info
    package_uri = http_uri_components_to_uri(
        protocol=route_params["protocol"],
        package_name=route_params["package_name"],
    )

    logger.debug(f"Adding temporary reference to package {package_uri}.")
    try:
        pin_runtime_env_uri(package_uri)
    except Exception:
        return Response(
            text=traceback.format_exc(),
            status=aiohttp.web.HTTPInternalServerError.status_code,
        )

    if package_exists(package_uri):
        return Response()

    return Response(
        text=f"Package {package_uri} does not exist",
        status=aiohttp.web.HTTPNotFound.status_code,
    )
@routes.put("/api/packages/{protocol}/{package_name}")
async def upload_package(self, req: Request):
    """Store the request body as a package in the GCS.

    Responds 200 on success and 500 with a traceback on any failure.
    """
    route_params = req.match_info
    package_uri = http_uri_components_to_uri(
        protocol=route_params["protocol"],
        package_name=route_params["package_name"],
    )
    logger.info(f"Uploading package {package_uri} to the GCS.")

    try:
        payload = await req.read()
        # The upload helper is a plain callable, so it is run in the loop's
        # default executor rather than directly in this coroutine.
        loop = get_or_create_event_loop()
        await loop.run_in_executor(None, upload_package_to_gcs, package_uri, payload)
    except Exception:
        return Response(
            text=traceback.format_exc(),
            status=aiohttp.web.HTTPInternalServerError.status_code,
        )

    return Response(status=aiohttp.web.HTTPOk.status_code)
@routes.post("/api/jobs/")
async def submit_job(self, req: Request) -> Response:
    """Validate a job submission request and forward it to the job agent.

    Responses:
        * the parser's own response when request parsing fails,
        * 500 with a short message when no agent became available in time,
        * 400 with a traceback on ``TypeError`` / ``ValueError``,
        * 500 with a traceback on any other error,
        * 200 with the JSON-encoded submit response on success.
    """
    result = await parse_and_validate_request(req, JobSubmitRequest)
    # Request parsing failed, returned with Response object.
    if isinstance(result, Response):
        return result
    submit_request: JobSubmitRequest = result

    try:
        job_agent_client = await self.get_target_agent()
        resp = await job_agent_client.submit_job_internal(submit_request)
    except (asyncio.TimeoutError, TimeoutError):
        # The agent lookup raises the builtin TimeoutError, which is a
        # different class from asyncio.TimeoutError before Python 3.11,
        # so both are listed explicitly.
        return Response(
            text="No available agent to submit job, please try again later.",
            status=aiohttp.web.HTTPInternalServerError.status_code,
        )
    except (TypeError, ValueError):
        return Response(
            text=traceback.format_exc(),
            status=aiohttp.web.HTTPBadRequest.status_code,
        )
    except Exception:
        return Response(
            text=traceback.format_exc(),
            status=aiohttp.web.HTTPInternalServerError.status_code,
        )

    return Response(
        text=json.dumps(dataclasses.asdict(resp)),
        content_type="application/json",
        status=aiohttp.web.HTTPOk.status_code,
    )
@routes.post("/api/jobs/{job_or_submission_id}/stop")
async def stop_job(self, req: Request) -> Response:
    """Stop a submission-type job via the job agent.

    Responds 404 for an unknown job, 400 for a job that is not of submission
    type, 500 with a traceback if the agent call fails, and otherwise the
    JSON-encoded stop response.
    """
    requested_id = req.match_info["job_or_submission_id"]
    job = await find_job_by_ids(self.gcs_client, self._job_info_client, requested_id)

    if not job:
        return Response(
            text=f"Job {requested_id} does not exist",
            status=aiohttp.web.HTTPNotFound.status_code,
        )
    if job.type is not JobType.SUBMISSION:
        return Response(
            text="Can only stop submission type jobs",
            status=aiohttp.web.HTTPBadRequest.status_code,
        )

    try:
        agent = await self.get_target_agent()
        stop_result = await agent.stop_job_internal(job.submission_id)
    except Exception:
        return Response(
            text=traceback.format_exc(),
            status=aiohttp.web.HTTPInternalServerError.status_code,
        )

    body = json.dumps(dataclasses.asdict(stop_result))
    return Response(text=body, content_type="application/json")
@routes.delete("/api/jobs/{job_or_submission_id}")
async def delete_job(self, req: Request) -> Response:
    """Delete a submission-type job via the job agent.

    Responds 404 for an unknown job, 400 for a job that is not of submission
    type, 500 with a traceback if the agent call fails, and otherwise the
    JSON-encoded delete response.
    """
    requested_id = req.match_info["job_or_submission_id"]
    job = await find_job_by_ids(self.gcs_client, self._job_info_client, requested_id)

    if not job:
        return Response(
            text=f"Job {requested_id} does not exist",
            status=aiohttp.web.HTTPNotFound.status_code,
        )
    if job.type is not JobType.SUBMISSION:
        return Response(
            text="Can only delete submission type jobs",
            status=aiohttp.web.HTTPBadRequest.status_code,
        )

    try:
        agent = await self.get_target_agent()
        delete_result = await agent.delete_job_internal(job.submission_id)
    except Exception:
        return Response(
            text=traceback.format_exc(),
            status=aiohttp.web.HTTPInternalServerError.status_code,
        )

    body = json.dumps(dataclasses.asdict(delete_result))
    return Response(text=body, content_type="application/json")
@routes.get("/api/jobs/{job_or_submission_id}")
async def get_job_info(self, req: Request) -> Response:
    """Return one job's details as JSON, or 404 if the job is not found."""
    requested_id = req.match_info["job_or_submission_id"]
    job = await find_job_by_ids(self.gcs_client, self._job_info_client, requested_id)

    if job:
        return Response(
            text=json.dumps(job.dict()),
            content_type="application/json",
        )

    return Response(
        text=f"Job {requested_id} does not exist",
        status=aiohttp.web.HTTPNotFound.status_code,
    )
# TODO(rickyx): This endpoint's logic is also mirrored in state API's endpoint.
# We should eventually unify the backend logic (and keep the logic in sync before
# that).
@routes.get("/api/jobs/")
async def list_jobs(self, req: Request) -> Response:
    """List all jobs as a JSON array: submission jobs first, then driver jobs."""
    # Both lookups are awaited together.
    driver_result, submission_infos = await asyncio.gather(
        get_driver_jobs(self.gcs_client), self._job_info_client.get_all_jobs()
    )
    driver_jobs, submission_job_drivers = driver_result

    submission_details = []
    for submission_id, job_info in submission_infos.items():
        # None when no driver is recorded for this submission.
        driver = submission_job_drivers.get(submission_id)
        submission_details.append(
            JobDetails(
                **dataclasses.asdict(job_info),
                submission_id=submission_id,
                job_id=driver.id if submission_id in submission_job_drivers else None,
                driver_info=driver,
                type=JobType.SUBMISSION,
            )
        )

    payload = [details.dict() for details in submission_details]
    payload.extend(driver_job.dict() for driver_job in driver_jobs.values())

    return Response(
        text=json.dumps(payload),
        content_type="application/json",
    )
@routes.get("/api/jobs/{job_or_submission_id}/logs")
async def get_job_logs(self, req: Request) -> Response:
    """Return the logs of a submission-type job as JSON.

    Responds 404 for an unknown job and 400 for a job that is not of
    submission type. When the job has no driver agent client, an empty
    ``JobLogsResponse`` is returned. Any failure while fetching yields a 500
    with a traceback.
    """
    requested_id = req.match_info["job_or_submission_id"]
    job = await find_job_by_ids(self.gcs_client, self._job_info_client, requested_id)

    if not job:
        return Response(
            text=f"Job {requested_id} does not exist",
            status=aiohttp.web.HTTPNotFound.status_code,
        )
    if job.type is not JobType.SUBMISSION:
        return Response(
            text="Can only get logs of submission type jobs",
            status=aiohttp.web.HTTPBadRequest.status_code,
        )

    try:
        agent = self.get_job_driver_agent_client(job)
        if agent:
            logs = await agent.get_job_logs_internal(job.submission_id)
        else:
            logs = JobLogsResponse("")
        return Response(
            text=json.dumps(dataclasses.asdict(logs)),
            content_type="application/json",
        )
    except Exception:
        return Response(
            text=traceback.format_exc(),
            status=aiohttp.web.HTTPInternalServerError.status_code,
        )
@routes.get(
    "/api/jobs/{job_or_submission_id}/logs/tail", resp_type=ResponseType.WEBSOCKET
)
async def tail_job_logs(self, req: Request) -> StreamResponse:
    """Stream a submission-type job's logs to the caller over a websocket.

    Responds 404 for an unknown job and 400 for a job that is not of
    submission type. Otherwise the websocket is prepared and the handler
    polls the job every `WAIT_FOR_SUPERVISOR_ACTOR_INTERVAL_S` seconds until
    its driver agent address is known, then relays every chunk produced by
    that agent's log tail.

    The websocket is returned without any log data if the job reaches a
    terminal status before a driver agent address appears, or if the job can
    no longer be found while polling.
    """
    job_or_submission_id = req.match_info["job_or_submission_id"]
    job = await find_job_by_ids(
        self.gcs_client,
        self._job_info_client,
        job_or_submission_id,
    )
    if not job:
        return Response(
            text=f"Job {job_or_submission_id} does not exist",
            status=aiohttp.web.HTTPNotFound.status_code,
        )

    if job.type is not JobType.SUBMISSION:
        return Response(
            text="Can only get logs of submission type jobs",
            status=aiohttp.web.HTTPBadRequest.status_code,
        )

    ws = aiohttp.web.WebSocketResponse()
    await ws.prepare(req)

    while True:
        job = await find_job_by_ids(
            self.gcs_client,
            self._job_info_client,
            job_or_submission_id,
        )
        if job is None:
            # The job can no longer be found (e.g. it was deleted while we
            # were waiting); there is nothing left to tail.
            return ws

        if job.driver_agent_http_address is not None:
            # Address known: start tailing right away instead of sleeping
            # one more interval.
            break

        if job.status.is_terminal():
            # Job exited before supervisor actor started.
            return ws

        await asyncio.sleep(self.WAIT_FOR_SUPERVISOR_ACTOR_INTERVAL_S)

    job_agent_client = self.get_job_driver_agent_client(job)

    async for lines in job_agent_client.tail_job_logs(job.submission_id):
        await ws.send_str(lines)

    return ws
def get_job_driver_agent_client(
    self, job: JobDetails
) -> Optional[JobAgentSubmissionClient]:
    """Return the agent client for the node running the job's driver.

    Clients are cached in `self._agents` under the job's `driver_node_id`; a
    new one is created from `driver_agent_http_address` on first use.

    Returns:
        None when the job has no driver agent address, else the cached or
        newly created client.
    """
    agent_address = job.driver_agent_http_address
    if agent_address is None:
        return None

    node_id = job.driver_node_id
    if node_id not in self._agents:
        self._agents[node_id] = JobAgentSubmissionClient(agent_address)
    return self._agents[node_id]
@routes.get("/api/component_activities")
async def get_component_activities(
    self, req: aiohttp.web.Request
) -> aiohttp.web.Response:
    """Report activity of drivers and, optionally, of hook-provided components.

    The ``timeout`` query parameter is used when it consists solely of
    digits; otherwise 30 is used. The response is a JSON object with a
    ``"driver"`` entry plus one entry per component returned by the callable
    named in the ``RAY_CLUSTER_ACTIVITY_HOOK`` environment variable. A failure
    for one component is reported under that component's key; a failure of
    the hook as a whole is reported under ``"external_component"``.
    """
    raw_timeout = req.query.get("timeout", None)
    timeout = int(raw_timeout) if raw_timeout and raw_timeout.isdigit() else 30

    # Get activity information for driver
    driver_activity_info = await self._get_job_activity_info(timeout=timeout)
    resp = {"driver": dict(driver_activity_info)}

    if RAY_CLUSTER_ACTIVITY_HOOK in os.environ:
        try:
            hook = load_class(os.environ[RAY_CLUSTER_ACTIVITY_HOOK])
            hook_output = hook()
            assert isinstance(hook_output, dict), (
                f"Output of hook {os.environ[RAY_CLUSTER_ACTIVITY_HOOK]} "
                "should be Dict[str, RayActivityResponse]. Got "
                f"output: {hook_output}"
            )
            for component_type, raw_activity in hook_output.items():
                try:
                    # Parse and validate output to type RayActivityResponse
                    validated = RayActivityResponse(**dict(raw_activity))
                    resp[component_type] = dict(validated)
                except Exception as e:
                    logger.exception(
                        f"Failed to get activity status of {component_type} "
                        f"from user hook {os.environ[RAY_CLUSTER_ACTIVITY_HOOK]}."
                    )
                    resp[component_type] = {
                        "is_active": RayActivityStatus.ERROR,
                        "reason": repr(e),
                        "timestamp": datetime.now().timestamp(),
                    }
        except Exception as e:
            logger.exception(
                "Failed to get activity status from user "
                f"hook {os.environ[RAY_CLUSTER_ACTIVITY_HOOK]}."
            )
            resp["external_component"] = {
                "is_active": RayActivityStatus.ERROR,
                "reason": repr(e),
                "timestamp": datetime.now().timestamp(),
            }

    return aiohttp.web.Response(
        text=json.dumps(resp),
        content_type="application/json",
        status=aiohttp.web.HTTPOk.status_code,
    )
async def _get_job_activity_info(self, timeout: int) -> RayActivityResponse:
    """Summarise driver (job) activity.

    A driver counts as active when its job entry is not dead and its
    namespace does not start with ``_ray_internal_`` (this excludes the
    ``_ray_internal_dashboard`` job that gets automatically created with
    every cluster).

    Args:
        timeout: Passed through as the timeout of the GCS job-info call.

    Returns:
        ACTIVE or INACTIVE with the latest job end time (if any), or an
        ERROR response carrying ``repr`` of the exception when anything in
        the computation fails.
    """
    try:
        reply = await self.gcs_client.async_get_all_job_info(
            skip_submission_job_info_field=True,
            skip_is_running_tasks_field=True,
            timeout=timeout,
        )

        active_driver_count = 0
        newest_end_time = 0
        for entry in reply.values():
            is_dead = bool(entry.is_dead)
            is_internal = entry.config.ray_namespace.startswith("_ray_internal_")
            if entry.end_time:
                newest_end_time = max(newest_end_time, entry.end_time)
            if not (is_dead or is_internal):
                active_driver_count += 1

        now = datetime.now().timestamp()

        # Latest job end time must be before or equal to the current timestamp.
        # Job end times may be provided in epoch milliseconds. Check if this
        # is true, and convert to seconds
        if newest_end_time > now:
            newest_end_time = newest_end_time / 1000
            assert now >= newest_end_time, (
                f"Most recent job end time {newest_end_time} must be "
                f"before or equal to the current timestamp {now}"
            )

        if active_driver_count > 0:
            status = RayActivityStatus.ACTIVE
        else:
            status = RayActivityStatus.INACTIVE

        if active_driver_count:
            reason = f"Number of active drivers: {active_driver_count}"
        else:
            reason = None

        # If newest_end_time == 0, no jobs have finished yet so don't
        # populate last_activity_at
        last_activity_at = newest_end_time if newest_end_time else None

        return RayActivityResponse(
            is_active=status,
            reason=reason,
            timestamp=now,
            last_activity_at=last_activity_at,
        )
    except Exception as e:
        logger.exception("Failed to get activity status of Ray drivers.")
        return RayActivityResponse(
            is_active=RayActivityStatus.ERROR,
            reason=repr(e),
            timestamp=datetime.now().timestamp(),
        )
async def run(self):
    """Run the parent module, then create the job info client if it is unset."""
    await super().run()
    if self._job_info_client:
        return
    self._job_info_client = JobInfoStorageClient(self.gcs_client)
|