job_head.py 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761
  1. import asyncio
  2. import dataclasses
  3. import enum
  4. import json
  5. import logging
  6. import os
  7. import time
  8. import traceback
  9. from datetime import datetime
  10. from typing import AsyncIterator, Dict, Optional, Tuple
  11. import aiohttp.web
  12. from aiohttp.client import ClientResponse
  13. from aiohttp.web import Request, Response, StreamResponse
  14. import ray
  15. from ray import NodeID
  16. from ray._common.network_utils import build_address
  17. from ray._common.pydantic_compat import BaseModel, Extra, Field, validator
  18. from ray._common.utils import get_or_create_event_loop, load_class
  19. from ray._private.authentication.http_token_authentication import (
  20. get_auth_headers_if_auth_enabled,
  21. )
  22. from ray._private.ray_constants import KV_NAMESPACE_DASHBOARD
  23. from ray._private.runtime_env.packaging import (
  24. package_exists,
  25. pin_runtime_env_uri,
  26. upload_package_to_gcs,
  27. )
  28. from ray.dashboard.consts import (
  29. DASHBOARD_AGENT_ADDR_NODE_ID_PREFIX,
  30. GCS_RPC_TIMEOUT_SECONDS,
  31. RAY_CLUSTER_ACTIVITY_HOOK,
  32. TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS,
  33. WAIT_AVAILABLE_AGENT_TIMEOUT,
  34. )
  35. from ray.dashboard.modules.job.common import (
  36. JobDeleteResponse,
  37. JobInfoStorageClient,
  38. JobLogsResponse,
  39. JobStopResponse,
  40. JobSubmitRequest,
  41. JobSubmitResponse,
  42. http_uri_components_to_uri,
  43. )
  44. from ray.dashboard.modules.job.pydantic_models import JobDetails, JobType
  45. from ray.dashboard.modules.job.utils import (
  46. find_job_by_ids,
  47. get_driver_jobs,
  48. get_head_node_id,
  49. parse_and_validate_request,
  50. )
  51. from ray.dashboard.modules.version import CURRENT_VERSION, VersionResponse
  52. from ray.dashboard.subprocesses.module import SubprocessModule
  53. from ray.dashboard.subprocesses.routes import SubprocessRouteTable as routes
  54. from ray.dashboard.subprocesses.utils import ResponseType
# Module-level logger shared by everything in this file; its level is pinned
# to INFO here, so `logger.debug(...)` calls below are filtered at this logger.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
  57. class RayActivityStatus(str, enum.Enum):
  58. ACTIVE = "ACTIVE"
  59. INACTIVE = "INACTIVE"
  60. ERROR = "ERROR"
  61. class RayActivityResponse(BaseModel, extra=Extra.allow):
  62. """
  63. Pydantic model used to inform if a particular Ray component can be considered
  64. active, and metadata about observation.
  65. """
  66. is_active: RayActivityStatus = Field(
  67. ...,
  68. description=(
  69. "Whether the corresponding Ray component is considered active or inactive, "
  70. "or if there was an error while collecting this observation."
  71. ),
  72. )
  73. reason: Optional[str] = Field(
  74. None, description="Reason if Ray component is considered active or errored."
  75. )
  76. timestamp: float = Field(
  77. ...,
  78. description=(
  79. "Timestamp of when this observation about the Ray component was made. "
  80. "This is in the format of seconds since unix epoch."
  81. ),
  82. )
  83. last_activity_at: Optional[float] = Field(
  84. None,
  85. description=(
  86. "Timestamp when last actvity of this Ray component finished in format of "
  87. "seconds since unix epoch. This field does not need to be populated "
  88. "for Ray components where it is not meaningful."
  89. ),
  90. )
  91. @validator("reason", always=True)
  92. def reason_required(cls, v, values, **kwargs):
  93. if "is_active" in values and values["is_active"] != RayActivityStatus.INACTIVE:
  94. if v is None:
  95. raise ValueError(
  96. 'Reason is required if is_active is "active" or "error"'
  97. )
  98. return v
  99. class JobAgentSubmissionClient:
  100. """A local client for submitting and interacting with jobs on a specific node
  101. in the remote cluster.
  102. Submits requests over HTTP to the job agent on the specific node using the REST API.
  103. """
  104. def __init__(
  105. self,
  106. dashboard_agent_address: str,
  107. ):
  108. self._agent_address = dashboard_agent_address
  109. self._session = aiohttp.ClientSession()
  110. def _get_headers(self):
  111. """Get auth headers if token authentication is enabled."""
  112. return get_auth_headers_if_auth_enabled({})
  113. async def _raise_error(self, resp: ClientResponse):
  114. status = resp.status
  115. error_text = await resp.text()
  116. raise RuntimeError(f"Request failed with status code {status}: {error_text}.")
  117. async def submit_job_internal(self, req: JobSubmitRequest) -> JobSubmitResponse:
  118. logger.debug(f"Submitting job with submission_id={req.submission_id}.")
  119. async with self._session.post(
  120. f"{self._agent_address}/api/job_agent/jobs/",
  121. json=dataclasses.asdict(req),
  122. headers=self._get_headers(),
  123. ) as resp:
  124. if resp.status == 200:
  125. result_json = await resp.json()
  126. return JobSubmitResponse(**result_json)
  127. else:
  128. await self._raise_error(resp)
  129. async def stop_job_internal(self, job_id: str) -> JobStopResponse:
  130. logger.debug(f"Stopping job with job_id={job_id}.")
  131. async with self._session.post(
  132. f"{self._agent_address}/api/job_agent/jobs/{job_id}/stop",
  133. headers=self._get_headers(),
  134. ) as resp:
  135. if resp.status == 200:
  136. result_json = await resp.json()
  137. return JobStopResponse(**result_json)
  138. else:
  139. await self._raise_error(resp)
  140. async def delete_job_internal(self, job_id: str) -> JobDeleteResponse:
  141. logger.debug(f"Deleting job with job_id={job_id}.")
  142. async with self._session.delete(
  143. f"{self._agent_address}/api/job_agent/jobs/{job_id}",
  144. headers=self._get_headers(),
  145. ) as resp:
  146. if resp.status == 200:
  147. result_json = await resp.json()
  148. return JobDeleteResponse(**result_json)
  149. else:
  150. await self._raise_error(resp)
  151. async def get_job_logs_internal(self, job_id: str) -> JobLogsResponse:
  152. async with self._session.get(
  153. f"{self._agent_address}/api/job_agent/jobs/{job_id}/logs",
  154. headers=self._get_headers(),
  155. ) as resp:
  156. if resp.status == 200:
  157. result_json = await resp.json()
  158. return JobLogsResponse(**result_json)
  159. else:
  160. await self._raise_error(resp)
  161. async def tail_job_logs(self, job_id: str) -> AsyncIterator[str]:
  162. """Get an iterator that follows the logs of a job."""
  163. ws = await self._session.ws_connect(
  164. f"{self._agent_address}/api/job_agent/jobs/{job_id}/logs/tail",
  165. headers=self._get_headers(),
  166. )
  167. while True:
  168. msg = await ws.receive()
  169. if msg.type == aiohttp.WSMsgType.TEXT:
  170. yield msg.data
  171. elif msg.type == aiohttp.WSMsgType.CLOSED:
  172. break
  173. elif msg.type == aiohttp.WSMsgType.ERROR:
  174. pass
  175. async def close(self, ignore_error=True):
  176. try:
  177. await self._session.close()
  178. except Exception:
  179. if not ignore_error:
  180. raise
  181. class JobHead(SubprocessModule):
  182. """Runs on the head node of a Ray cluster and handles Ray Jobs APIs.
  183. NOTE(architkulkarni): Please keep this class in sync with the OpenAPI spec at
  184. `doc/source/cluster/running-applications/job-submission/openapi.yml`.
  185. We currently do not automatically check that the OpenAPI
  186. spec is in sync with the implementation. If any changes are made to the
  187. paths in the @route decorators or in the Responses returned by the
  188. methods (or any nested fields in the Responses), you will need to find the
  189. corresponding field of the OpenAPI yaml file and update it manually. Also,
  190. bump the version number in the yaml file and in this class's `get_version`.
  191. """
  192. # Time that we sleep while tailing logs while waiting for
  193. # the supervisor actor to start. We don't know which node
  194. # to read the logs from until then.
  195. WAIT_FOR_SUPERVISOR_ACTOR_INTERVAL_S = 1
  196. def __init__(self, *args, **kwargs):
  197. super().__init__(*args, **kwargs)
  198. self._job_info_client = None
  199. # To make sure that the internal KV is initialized by getting the lazy property
  200. assert self.gcs_client is not None
  201. assert ray.experimental.internal_kv._internal_kv_initialized()
  202. # It contains all `JobAgentSubmissionClient` that
  203. # `JobHead` has ever used, and will not be deleted
  204. # from it unless `JobAgentSubmissionClient` is no
  205. # longer available (the corresponding agent process is dead)
  206. # {node_id: JobAgentSubmissionClient}
  207. self._agents: Dict[NodeID, JobAgentSubmissionClient] = dict()
  208. async def get_target_agent(
  209. self, timeout_s: float = WAIT_AVAILABLE_AGENT_TIMEOUT
  210. ) -> JobAgentSubmissionClient:
  211. """
  212. Get a `JobAgentSubmissionClient`, which is a client for interacting with jobs
  213. via an agent process.
  214. Args:
  215. timeout_s: The timeout for the operation.
  216. Returns:
  217. A `JobAgentSubmissionClient` for interacting with jobs via an agent process.
  218. Raises:
  219. TimeoutError: If the operation times out.
  220. """
  221. return await self._get_head_node_agent(timeout_s)
  222. async def _get_head_node_agent_once(self) -> JobAgentSubmissionClient:
  223. head_node_id_hex = await get_head_node_id(self.gcs_client)
  224. if not head_node_id_hex:
  225. raise Exception("Head node id has not yet been persisted in GCS")
  226. head_node_id = NodeID.from_hex(head_node_id_hex)
  227. if head_node_id not in self._agents:
  228. ip, http_port, _ = await self._fetch_agent_info(head_node_id)
  229. agent_http_address = f"http://{build_address(ip, http_port)}"
  230. self._agents[head_node_id] = JobAgentSubmissionClient(agent_http_address)
  231. return self._agents[head_node_id]
  232. async def _get_head_node_agent(self, timeout_s: float) -> JobAgentSubmissionClient:
  233. """Retrieves HTTP client for `JobAgent` running on the Head node. If the head
  234. node does not have an agent, it will retry every
  235. `TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS` seconds indefinitely.
  236. Args:
  237. timeout_s: The timeout for the operation.
  238. Returns:
  239. A `JobAgentSubmissionClient` for interacting with jobs via the head node's agent process.
  240. Raises:
  241. TimeoutError: If the operation times out.
  242. """
  243. timeout_point = time.time() + timeout_s
  244. exception = None
  245. while time.time() < timeout_point:
  246. try:
  247. return await self._get_head_node_agent_once()
  248. except Exception as e:
  249. exception = e
  250. logger.exception(
  251. f"Failed to get head node agent, retrying in {TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS} seconds..."
  252. )
  253. await asyncio.sleep(TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS)
  254. raise TimeoutError(
  255. f"Failed to get head node agent within {timeout_s} seconds. The last exception is {exception}"
  256. )
  257. async def _fetch_agent_info(self, target_node_id: NodeID) -> Tuple[str, int, int]:
  258. """
  259. Fetches agent info by the Node ID. May raise exception if there's network error or the
  260. agent info is not found.
  261. Returns: (ip, http_port, grpc_port)
  262. """
  263. key = f"{DASHBOARD_AGENT_ADDR_NODE_ID_PREFIX}{target_node_id.hex()}"
  264. value = await self.gcs_client.async_internal_kv_get(
  265. key,
  266. namespace=KV_NAMESPACE_DASHBOARD,
  267. timeout=GCS_RPC_TIMEOUT_SECONDS,
  268. )
  269. if not value:
  270. raise KeyError(
  271. f"Agent info not found in internal KV for node {target_node_id}. "
  272. "It's possible that the agent didn't launch successfully due to "
  273. "port conflicts or other issues. Please check `dashboard_agent.log` "
  274. "for more details."
  275. )
  276. return json.loads(value.decode())
  277. @routes.get("/api/version")
  278. async def get_version(self, req: Request) -> Response:
  279. # NOTE(edoakes): CURRENT_VERSION should be bumped and checked on the
  280. # client when we have backwards-incompatible changes.
  281. resp = VersionResponse(
  282. version=CURRENT_VERSION,
  283. ray_version=ray.__version__,
  284. ray_commit=ray.__commit__,
  285. session_name=self.session_name,
  286. )
  287. return Response(
  288. text=json.dumps(dataclasses.asdict(resp)),
  289. content_type="application/json",
  290. status=aiohttp.web.HTTPOk.status_code,
  291. )
  292. @routes.get("/api/packages/{protocol}/{package_name}")
  293. async def get_package(self, req: Request) -> Response:
  294. package_uri = http_uri_components_to_uri(
  295. protocol=req.match_info["protocol"],
  296. package_name=req.match_info["package_name"],
  297. )
  298. logger.debug(f"Adding temporary reference to package {package_uri}.")
  299. try:
  300. pin_runtime_env_uri(package_uri)
  301. except Exception:
  302. return Response(
  303. text=traceback.format_exc(),
  304. status=aiohttp.web.HTTPInternalServerError.status_code,
  305. )
  306. if not package_exists(package_uri):
  307. return Response(
  308. text=f"Package {package_uri} does not exist",
  309. status=aiohttp.web.HTTPNotFound.status_code,
  310. )
  311. return Response()
  312. @routes.put("/api/packages/{protocol}/{package_name}")
  313. async def upload_package(self, req: Request):
  314. package_uri = http_uri_components_to_uri(
  315. protocol=req.match_info["protocol"],
  316. package_name=req.match_info["package_name"],
  317. )
  318. logger.info(f"Uploading package {package_uri} to the GCS.")
  319. try:
  320. data = await req.read()
  321. await get_or_create_event_loop().run_in_executor(
  322. None,
  323. upload_package_to_gcs,
  324. package_uri,
  325. data,
  326. )
  327. except Exception:
  328. return Response(
  329. text=traceback.format_exc(),
  330. status=aiohttp.web.HTTPInternalServerError.status_code,
  331. )
  332. return Response(status=aiohttp.web.HTTPOk.status_code)
  333. @routes.post("/api/jobs/")
  334. async def submit_job(self, req: Request) -> Response:
  335. result = await parse_and_validate_request(req, JobSubmitRequest)
  336. # Request parsing failed, returned with Response object.
  337. if isinstance(result, Response):
  338. return result
  339. else:
  340. submit_request: JobSubmitRequest = result
  341. try:
  342. job_agent_client = await self.get_target_agent()
  343. resp = await job_agent_client.submit_job_internal(submit_request)
  344. except asyncio.TimeoutError:
  345. return Response(
  346. text="No available agent to submit job, please try again later.",
  347. status=aiohttp.web.HTTPInternalServerError.status_code,
  348. )
  349. except (TypeError, ValueError):
  350. return Response(
  351. text=traceback.format_exc(),
  352. status=aiohttp.web.HTTPBadRequest.status_code,
  353. )
  354. except Exception:
  355. return Response(
  356. text=traceback.format_exc(),
  357. status=aiohttp.web.HTTPInternalServerError.status_code,
  358. )
  359. return Response(
  360. text=json.dumps(dataclasses.asdict(resp)),
  361. content_type="application/json",
  362. status=aiohttp.web.HTTPOk.status_code,
  363. )
  364. @routes.post("/api/jobs/{job_or_submission_id}/stop")
  365. async def stop_job(self, req: Request) -> Response:
  366. job_or_submission_id = req.match_info["job_or_submission_id"]
  367. job = await find_job_by_ids(
  368. self.gcs_client,
  369. self._job_info_client,
  370. job_or_submission_id,
  371. )
  372. if not job:
  373. return Response(
  374. text=f"Job {job_or_submission_id} does not exist",
  375. status=aiohttp.web.HTTPNotFound.status_code,
  376. )
  377. if job.type is not JobType.SUBMISSION:
  378. return Response(
  379. text="Can only stop submission type jobs",
  380. status=aiohttp.web.HTTPBadRequest.status_code,
  381. )
  382. try:
  383. job_agent_client = await self.get_target_agent()
  384. resp = await job_agent_client.stop_job_internal(job.submission_id)
  385. except Exception:
  386. return Response(
  387. text=traceback.format_exc(),
  388. status=aiohttp.web.HTTPInternalServerError.status_code,
  389. )
  390. return Response(
  391. text=json.dumps(dataclasses.asdict(resp)), content_type="application/json"
  392. )
  393. @routes.delete("/api/jobs/{job_or_submission_id}")
  394. async def delete_job(self, req: Request) -> Response:
  395. job_or_submission_id = req.match_info["job_or_submission_id"]
  396. job = await find_job_by_ids(
  397. self.gcs_client,
  398. self._job_info_client,
  399. job_or_submission_id,
  400. )
  401. if not job:
  402. return Response(
  403. text=f"Job {job_or_submission_id} does not exist",
  404. status=aiohttp.web.HTTPNotFound.status_code,
  405. )
  406. if job.type is not JobType.SUBMISSION:
  407. return Response(
  408. text="Can only delete submission type jobs",
  409. status=aiohttp.web.HTTPBadRequest.status_code,
  410. )
  411. try:
  412. job_agent_client = await self.get_target_agent()
  413. resp = await job_agent_client.delete_job_internal(job.submission_id)
  414. except Exception:
  415. return Response(
  416. text=traceback.format_exc(),
  417. status=aiohttp.web.HTTPInternalServerError.status_code,
  418. )
  419. return Response(
  420. text=json.dumps(dataclasses.asdict(resp)), content_type="application/json"
  421. )
  422. @routes.get("/api/jobs/{job_or_submission_id}")
  423. async def get_job_info(self, req: Request) -> Response:
  424. job_or_submission_id = req.match_info["job_or_submission_id"]
  425. job = await find_job_by_ids(
  426. self.gcs_client,
  427. self._job_info_client,
  428. job_or_submission_id,
  429. )
  430. if not job:
  431. return Response(
  432. text=f"Job {job_or_submission_id} does not exist",
  433. status=aiohttp.web.HTTPNotFound.status_code,
  434. )
  435. return Response(
  436. text=json.dumps(job.dict()),
  437. content_type="application/json",
  438. )
  439. # TODO(rickyx): This endpoint's logic is also mirrored in state API's endpoint.
  440. # We should eventually unify the backend logic (and keep the logic in sync before
  441. # that).
  442. @routes.get("/api/jobs/")
  443. async def list_jobs(self, req: Request) -> Response:
  444. (driver_jobs, submission_job_drivers), submission_jobs = await asyncio.gather(
  445. get_driver_jobs(self.gcs_client), self._job_info_client.get_all_jobs()
  446. )
  447. submission_jobs = [
  448. JobDetails(
  449. **dataclasses.asdict(job),
  450. submission_id=submission_id,
  451. job_id=submission_job_drivers.get(submission_id).id
  452. if submission_id in submission_job_drivers
  453. else None,
  454. driver_info=submission_job_drivers.get(submission_id),
  455. type=JobType.SUBMISSION,
  456. )
  457. for submission_id, job in submission_jobs.items()
  458. ]
  459. return Response(
  460. text=json.dumps(
  461. [
  462. *[submission_job.dict() for submission_job in submission_jobs],
  463. *[job_info.dict() for job_info in driver_jobs.values()],
  464. ]
  465. ),
  466. content_type="application/json",
  467. )
  468. @routes.get("/api/jobs/{job_or_submission_id}/logs")
  469. async def get_job_logs(self, req: Request) -> Response:
  470. job_or_submission_id = req.match_info["job_or_submission_id"]
  471. job = await find_job_by_ids(
  472. self.gcs_client,
  473. self._job_info_client,
  474. job_or_submission_id,
  475. )
  476. if not job:
  477. return Response(
  478. text=f"Job {job_or_submission_id} does not exist",
  479. status=aiohttp.web.HTTPNotFound.status_code,
  480. )
  481. if job.type is not JobType.SUBMISSION:
  482. return Response(
  483. text="Can only get logs of submission type jobs",
  484. status=aiohttp.web.HTTPBadRequest.status_code,
  485. )
  486. try:
  487. job_agent_client = self.get_job_driver_agent_client(job)
  488. payload = (
  489. await job_agent_client.get_job_logs_internal(job.submission_id)
  490. if job_agent_client
  491. else JobLogsResponse("")
  492. )
  493. return Response(
  494. text=json.dumps(dataclasses.asdict(payload)),
  495. content_type="application/json",
  496. )
  497. except Exception:
  498. return Response(
  499. text=traceback.format_exc(),
  500. status=aiohttp.web.HTTPInternalServerError.status_code,
  501. )
  502. @routes.get(
  503. "/api/jobs/{job_or_submission_id}/logs/tail", resp_type=ResponseType.WEBSOCKET
  504. )
  505. async def tail_job_logs(self, req: Request) -> StreamResponse:
  506. job_or_submission_id = req.match_info["job_or_submission_id"]
  507. job = await find_job_by_ids(
  508. self.gcs_client,
  509. self._job_info_client,
  510. job_or_submission_id,
  511. )
  512. if not job:
  513. return Response(
  514. text=f"Job {job_or_submission_id} does not exist",
  515. status=aiohttp.web.HTTPNotFound.status_code,
  516. )
  517. if job.type is not JobType.SUBMISSION:
  518. return Response(
  519. text="Can only get logs of submission type jobs",
  520. status=aiohttp.web.HTTPBadRequest.status_code,
  521. )
  522. ws = aiohttp.web.WebSocketResponse()
  523. await ws.prepare(req)
  524. driver_agent_http_address = None
  525. while driver_agent_http_address is None:
  526. job = await find_job_by_ids(
  527. self.gcs_client,
  528. self._job_info_client,
  529. job_or_submission_id,
  530. )
  531. driver_agent_http_address = job.driver_agent_http_address
  532. status = job.status
  533. if status.is_terminal() and driver_agent_http_address is None:
  534. # Job exited before supervisor actor started.
  535. return ws
  536. await asyncio.sleep(self.WAIT_FOR_SUPERVISOR_ACTOR_INTERVAL_S)
  537. job_agent_client = self.get_job_driver_agent_client(job)
  538. async for lines in job_agent_client.tail_job_logs(job.submission_id):
  539. await ws.send_str(lines)
  540. return ws
  541. def get_job_driver_agent_client(
  542. self, job: JobDetails
  543. ) -> Optional[JobAgentSubmissionClient]:
  544. if job.driver_agent_http_address is None:
  545. return None
  546. driver_node_id = job.driver_node_id
  547. if driver_node_id not in self._agents:
  548. self._agents[driver_node_id] = JobAgentSubmissionClient(
  549. job.driver_agent_http_address
  550. )
  551. return self._agents[driver_node_id]
  552. @routes.get("/api/component_activities")
  553. async def get_component_activities(
  554. self, req: aiohttp.web.Request
  555. ) -> aiohttp.web.Response:
  556. timeout = req.query.get("timeout", None)
  557. if timeout and timeout.isdigit():
  558. timeout = int(timeout)
  559. else:
  560. timeout = 30
  561. # Get activity information for driver
  562. driver_activity_info = await self._get_job_activity_info(timeout=timeout)
  563. resp = {"driver": dict(driver_activity_info)}
  564. if RAY_CLUSTER_ACTIVITY_HOOK in os.environ:
  565. try:
  566. cluster_activity_callable = load_class(
  567. os.environ[RAY_CLUSTER_ACTIVITY_HOOK]
  568. )
  569. external_activity_output = cluster_activity_callable()
  570. assert isinstance(external_activity_output, dict), (
  571. f"Output of hook {os.environ[RAY_CLUSTER_ACTIVITY_HOOK]} "
  572. "should be Dict[str, RayActivityResponse]. Got "
  573. f"output: {external_activity_output}"
  574. )
  575. for component_type in external_activity_output:
  576. try:
  577. component_activity_output = external_activity_output[
  578. component_type
  579. ]
  580. # Parse and validate output to type RayActivityResponse
  581. component_activity_output = RayActivityResponse(
  582. **dict(component_activity_output)
  583. )
  584. resp[component_type] = dict(component_activity_output)
  585. except Exception as e:
  586. logger.exception(
  587. f"Failed to get activity status of {component_type} "
  588. f"from user hook {os.environ[RAY_CLUSTER_ACTIVITY_HOOK]}."
  589. )
  590. resp[component_type] = {
  591. "is_active": RayActivityStatus.ERROR,
  592. "reason": repr(e),
  593. "timestamp": datetime.now().timestamp(),
  594. }
  595. except Exception as e:
  596. logger.exception(
  597. "Failed to get activity status from user "
  598. f"hook {os.environ[RAY_CLUSTER_ACTIVITY_HOOK]}."
  599. )
  600. resp["external_component"] = {
  601. "is_active": RayActivityStatus.ERROR,
  602. "reason": repr(e),
  603. "timestamp": datetime.now().timestamp(),
  604. }
  605. return aiohttp.web.Response(
  606. text=json.dumps(resp),
  607. content_type="application/json",
  608. status=aiohttp.web.HTTPOk.status_code,
  609. )
  610. async def _get_job_activity_info(self, timeout: int) -> RayActivityResponse:
  611. # Returns if there is Ray activity from drivers (job).
  612. # Drivers in namespaces that start with _ray_internal_ are not
  613. # considered activity.
  614. # This includes the _ray_internal_dashboard job that gets automatically
  615. # created with every cluster
  616. try:
  617. reply = await self.gcs_client.async_get_all_job_info(
  618. skip_submission_job_info_field=True,
  619. skip_is_running_tasks_field=True,
  620. timeout=timeout,
  621. )
  622. num_active_drivers = 0
  623. latest_job_end_time = 0
  624. for job_table_entry in reply.values():
  625. is_dead = bool(job_table_entry.is_dead)
  626. in_internal_namespace = job_table_entry.config.ray_namespace.startswith(
  627. "_ray_internal_"
  628. )
  629. latest_job_end_time = (
  630. max(latest_job_end_time, job_table_entry.end_time)
  631. if job_table_entry.end_time
  632. else latest_job_end_time
  633. )
  634. if not is_dead and not in_internal_namespace:
  635. num_active_drivers += 1
  636. current_timestamp = datetime.now().timestamp()
  637. # Latest job end time must be before or equal to the current timestamp.
  638. # Job end times may be provided in epoch milliseconds. Check if this
  639. # is true, and convert to seconds
  640. if latest_job_end_time > current_timestamp:
  641. latest_job_end_time = latest_job_end_time / 1000
  642. assert current_timestamp >= latest_job_end_time, (
  643. f"Most recent job end time {latest_job_end_time} must be "
  644. f"before or equal to the current timestamp {current_timestamp}"
  645. )
  646. is_active = (
  647. RayActivityStatus.ACTIVE
  648. if num_active_drivers > 0
  649. else RayActivityStatus.INACTIVE
  650. )
  651. return RayActivityResponse(
  652. is_active=is_active,
  653. reason=f"Number of active drivers: {num_active_drivers}"
  654. if num_active_drivers
  655. else None,
  656. timestamp=current_timestamp,
  657. # If latest_job_end_time == 0, no jobs have finished yet so don't
  658. # populate last_activity_at
  659. last_activity_at=latest_job_end_time if latest_job_end_time else None,
  660. )
  661. except Exception as e:
  662. logger.exception("Failed to get activity status of Ray drivers.")
  663. return RayActivityResponse(
  664. is_active=RayActivityStatus.ERROR,
  665. reason=repr(e),
  666. timestamp=datetime.now().timestamp(),
  667. )
  668. async def run(self):
  669. await super().run()
  670. if not self._job_info_client:
  671. self._job_info_client = JobInfoStorageClient(self.gcs_client)