| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210 |
- import dataclasses
- import json
- import logging
- import traceback
- import aiohttp
- from aiohttp.web import Request, Response
- import ray
- import ray.dashboard.optional_utils as optional_utils
- import ray.dashboard.utils as dashboard_utils
- from ray.dashboard.modules.job.common import (
- JobDeleteResponse,
- JobLogsResponse,
- JobStopResponse,
- JobSubmitRequest,
- JobSubmitResponse,
- )
- from ray.dashboard.modules.job.job_manager import JobManager
- from ray.dashboard.modules.job.pydantic_models import JobType
- from ray.dashboard.modules.job.utils import find_job_by_ids, parse_and_validate_request
- routes = optional_utils.DashboardAgentRouteTable  # Short alias; used below as the @routes.post/.get/.delete handler decorators.
- logger = logging.getLogger(__name__)  # Module-level logger named after this module.
class JobAgent(dashboard_utils.DashboardAgentModule):
    """Dashboard agent module serving the ``/api/job_agent/jobs`` HTTP routes.

    Every handler delegates the actual work to a ``JobManager`` that is
    created lazily on first use (see ``get_job_manager``). Handlers reply
    with a JSON-encoded response dataclass on success and with a plain-text
    body (an error message or a formatted traceback) on failure.
    """

    def __init__(self, dashboard_agent):
        """Store the owning agent; the job manager is created on demand."""
        super().__init__(dashboard_agent)
        # Populated by get_job_manager() the first time it is needed.
        self._job_manager = None

    @staticmethod
    def _json_response(resp) -> Response:
        """Serialize the dataclass instance ``resp`` into a 200 JSON response."""
        return Response(
            text=json.dumps(dataclasses.asdict(resp)),
            content_type="application/json",
            status=aiohttp.web.HTTPOk.status_code,
        )

    @staticmethod
    def _traceback_response(status: int) -> Response:
        """Build a response carrying the current exception's traceback.

        Must be called from inside an ``except`` block so that
        ``traceback.format_exc()`` has an active exception to format.
        """
        return Response(text=traceback.format_exc(), status=status)

    async def _find_submission_job(self, req: Request, not_submission_text: str):
        """Resolve the job named in the URL and check it is a submission job.

        Args:
            req: Request whose ``match_info`` holds ``job_or_submission_id``.
            not_submission_text: Body of the 400 response sent when the job
                exists but its type is not ``JobType.SUBMISSION``.

        Returns:
            A ``(job, error_response)`` pair. Exactly one element is
            ``None``: on success ``error_response`` is ``None``; otherwise
            ``job`` is ``None`` and ``error_response`` is the 404/400
            response the handler should return unchanged.
        """
        job_or_submission_id = req.match_info["job_or_submission_id"]
        job = await find_job_by_ids(
            self._dashboard_agent.gcs_client,
            self.get_job_manager().job_info_client(),
            job_or_submission_id,
        )
        if not job:
            return None, Response(
                text=f"Job {job_or_submission_id} does not exist",
                status=aiohttp.web.HTTPNotFound.status_code,
            )
        if job.type is not JobType.SUBMISSION:
            return None, Response(
                text=not_submission_text,
                status=aiohttp.web.HTTPBadRequest.status_code,
            )
        return job, None

    @routes.post("/api/job_agent/jobs/")
    @optional_utils.init_ray_and_catch_exceptions()
    async def submit_job(self, req: Request) -> Response:
        """Submit a new job described by a ``JobSubmitRequest`` body.

        Returns:
            200 with a JSON ``JobSubmitResponse`` on success; the response
            produced by ``parse_and_validate_request`` if parsing failed;
            400 with a traceback on ``TypeError``/``ValueError``; 500 with a
            traceback on any other exception.
        """
        result = await parse_and_validate_request(req, JobSubmitRequest)
        # Request parsing failed, returned with Response object.
        if isinstance(result, Response):
            return result
        submit_request = result

        # Prefer the submission_id field and fall back to job_id.
        request_submission_id = submit_request.submission_id or submit_request.job_id

        try:
            ray._common.usage.usage_lib.record_library_usage("job_submission")
            submission_id = await self.get_job_manager().submit_job(
                entrypoint=submit_request.entrypoint,
                submission_id=request_submission_id,
                runtime_env=submit_request.runtime_env,
                metadata=submit_request.metadata,
                entrypoint_num_cpus=submit_request.entrypoint_num_cpus,
                entrypoint_num_gpus=submit_request.entrypoint_num_gpus,
                entrypoint_memory=submit_request.entrypoint_memory,
                entrypoint_resources=submit_request.entrypoint_resources,
                entrypoint_label_selector=submit_request.entrypoint_label_selector,
            )
            # Both response fields carry the same submission id.
            resp = JobSubmitResponse(job_id=submission_id, submission_id=submission_id)
        except (TypeError, ValueError):
            return self._traceback_response(aiohttp.web.HTTPBadRequest.status_code)
        except Exception:
            return self._traceback_response(
                aiohttp.web.HTTPInternalServerError.status_code
            )

        return self._json_response(resp)

    @routes.post("/api/job_agent/jobs/{job_or_submission_id}/stop")
    @optional_utils.init_ray_and_catch_exceptions()
    async def stop_job(self, req: Request) -> Response:
        """Stop the submission job identified in the URL.

        Returns:
            200 with a JSON ``JobStopResponse``; 404 if the job is unknown;
            400 if it is not a submission job; 500 with a traceback if the
            job manager raises.
        """
        job, error = await self._find_submission_job(
            req, "Can only stop submission type jobs"
        )
        # Explicit None check: do not rely on the truthiness of a Response.
        if error is not None:
            return error

        try:
            stopped = self.get_job_manager().stop_job(job.submission_id)
            resp = JobStopResponse(stopped=stopped)
        except Exception:
            return self._traceback_response(
                aiohttp.web.HTTPInternalServerError.status_code
            )

        return self._json_response(resp)

    @routes.delete("/api/job_agent/jobs/{job_or_submission_id}")
    @optional_utils.init_ray_and_catch_exceptions()
    async def delete_job(self, req: Request) -> Response:
        """Delete the submission job identified in the URL.

        Returns:
            200 with a JSON ``JobDeleteResponse``; 404 if the job is
            unknown; 400 if it is not a submission job; 500 with a
            traceback if the job manager raises.
        """
        job, error = await self._find_submission_job(
            req, "Can only delete submission type jobs"
        )
        # Explicit None check: do not rely on the truthiness of a Response.
        if error is not None:
            return error

        try:
            deleted = await self.get_job_manager().delete_job(job.submission_id)
            resp = JobDeleteResponse(deleted=deleted)
        except Exception:
            return self._traceback_response(
                aiohttp.web.HTTPInternalServerError.status_code
            )

        return self._json_response(resp)

    @routes.get("/api/job_agent/jobs/{job_or_submission_id}/logs")
    @optional_utils.init_ray_and_catch_exceptions()
    async def get_job_logs(self, req: Request) -> Response:
        """Return the logs of the submission job identified in the URL.

        Returns:
            200 with a JSON ``JobLogsResponse``; 404 if the job is unknown;
            400 if it is not a submission job.
        """
        job, error = await self._find_submission_job(
            req, "Can only get logs of submission type jobs"
        )
        # Explicit None check: do not rely on the truthiness of a Response.
        if error is not None:
            return error

        resp = JobLogsResponse(
            logs=self.get_job_manager().get_job_logs(job.submission_id)
        )
        return self._json_response(resp)

    @routes.get("/api/job_agent/jobs/{job_or_submission_id}/logs/tail")
    @optional_utils.init_ray_and_catch_exceptions()
    async def tail_job_logs(self, req: Request) -> Response:
        """Stream the logs of a submission job over a WebSocket.

        Returns:
            The prepared ``WebSocketResponse`` once the log iterator is
            exhausted; 404 if the job is unknown; 400 if it is not a
            submission job.
        """
        job, error = await self._find_submission_job(
            req, "Can only get logs of submission type jobs"
        )
        # Explicit None check: do not rely on the truthiness of a Response.
        if error is not None:
            return error

        ws = aiohttp.web.WebSocketResponse()
        await ws.prepare(req)

        # Go through the accessor (like every other handler) instead of
        # touching self._job_manager directly, so this never depends on an
        # earlier call having initialised it.
        async for lines in self.get_job_manager().tail_job_logs(job.submission_id):
            await ws.send_str(lines)

        return ws

    def get_job_manager(self):
        """Return the shared ``JobManager``, creating it on first call."""
        # Test identity against None so initialisation does not depend on
        # how a JobManager instance evaluates in a boolean context.
        if self._job_manager is None:
            self._job_manager = JobManager(
                self._dashboard_agent.gcs_client, self._dashboard_agent.log_dir
            )
        return self._job_manager

    async def run(self, server):
        """Do nothing; this module has no startup work for ``server``."""
        pass

    @staticmethod
    def is_minimal_module():
        """Return ``False`` for this module."""
        return False
|