job_agent.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. import dataclasses
  2. import json
  3. import logging
  4. import traceback
  5. import aiohttp
  6. from aiohttp.web import Request, Response
  7. import ray
  8. import ray.dashboard.optional_utils as optional_utils
  9. import ray.dashboard.utils as dashboard_utils
  10. from ray.dashboard.modules.job.common import (
  11. JobDeleteResponse,
  12. JobLogsResponse,
  13. JobStopResponse,
  14. JobSubmitRequest,
  15. JobSubmitResponse,
  16. )
  17. from ray.dashboard.modules.job.job_manager import JobManager
  18. from ray.dashboard.modules.job.pydantic_models import JobType
  19. from ray.dashboard.modules.job.utils import find_job_by_ids, parse_and_validate_request
  20. routes = optional_utils.DashboardAgentRouteTable
  21. logger = logging.getLogger(__name__)
  22. class JobAgent(dashboard_utils.DashboardAgentModule):
  23. def __init__(self, dashboard_agent):
  24. super().__init__(dashboard_agent)
  25. self._job_manager = None
  26. @routes.post("/api/job_agent/jobs/")
  27. @optional_utils.init_ray_and_catch_exceptions()
  28. async def submit_job(self, req: Request) -> Response:
  29. result = await parse_and_validate_request(req, JobSubmitRequest)
  30. # Request parsing failed, returned with Response object.
  31. if isinstance(result, Response):
  32. return result
  33. else:
  34. submit_request = result
  35. request_submission_id = submit_request.submission_id or submit_request.job_id
  36. try:
  37. ray._common.usage.usage_lib.record_library_usage("job_submission")
  38. submission_id = await self.get_job_manager().submit_job(
  39. entrypoint=submit_request.entrypoint,
  40. submission_id=request_submission_id,
  41. runtime_env=submit_request.runtime_env,
  42. metadata=submit_request.metadata,
  43. entrypoint_num_cpus=submit_request.entrypoint_num_cpus,
  44. entrypoint_num_gpus=submit_request.entrypoint_num_gpus,
  45. entrypoint_memory=submit_request.entrypoint_memory,
  46. entrypoint_resources=submit_request.entrypoint_resources,
  47. entrypoint_label_selector=submit_request.entrypoint_label_selector,
  48. )
  49. resp = JobSubmitResponse(job_id=submission_id, submission_id=submission_id)
  50. except (TypeError, ValueError):
  51. return Response(
  52. text=traceback.format_exc(),
  53. status=aiohttp.web.HTTPBadRequest.status_code,
  54. )
  55. except Exception:
  56. return Response(
  57. text=traceback.format_exc(),
  58. status=aiohttp.web.HTTPInternalServerError.status_code,
  59. )
  60. return Response(
  61. text=json.dumps(dataclasses.asdict(resp)),
  62. content_type="application/json",
  63. status=aiohttp.web.HTTPOk.status_code,
  64. )
  65. @routes.post("/api/job_agent/jobs/{job_or_submission_id}/stop")
  66. @optional_utils.init_ray_and_catch_exceptions()
  67. async def stop_job(self, req: Request) -> Response:
  68. job_or_submission_id = req.match_info["job_or_submission_id"]
  69. job = await find_job_by_ids(
  70. self._dashboard_agent.gcs_client,
  71. self.get_job_manager().job_info_client(),
  72. job_or_submission_id,
  73. )
  74. if not job:
  75. return Response(
  76. text=f"Job {job_or_submission_id} does not exist",
  77. status=aiohttp.web.HTTPNotFound.status_code,
  78. )
  79. if job.type is not JobType.SUBMISSION:
  80. return Response(
  81. text="Can only stop submission type jobs",
  82. status=aiohttp.web.HTTPBadRequest.status_code,
  83. )
  84. try:
  85. stopped = self.get_job_manager().stop_job(job.submission_id)
  86. resp = JobStopResponse(stopped=stopped)
  87. except Exception:
  88. return Response(
  89. text=traceback.format_exc(),
  90. status=aiohttp.web.HTTPInternalServerError.status_code,
  91. )
  92. return Response(
  93. text=json.dumps(dataclasses.asdict(resp)), content_type="application/json"
  94. )
  95. @routes.delete("/api/job_agent/jobs/{job_or_submission_id}")
  96. @optional_utils.init_ray_and_catch_exceptions()
  97. async def delete_job(self, req: Request) -> Response:
  98. job_or_submission_id = req.match_info["job_or_submission_id"]
  99. job = await find_job_by_ids(
  100. self._dashboard_agent.gcs_client,
  101. self.get_job_manager().job_info_client(),
  102. job_or_submission_id,
  103. )
  104. if not job:
  105. return Response(
  106. text=f"Job {job_or_submission_id} does not exist",
  107. status=aiohttp.web.HTTPNotFound.status_code,
  108. )
  109. if job.type is not JobType.SUBMISSION:
  110. return Response(
  111. text="Can only delete submission type jobs",
  112. status=aiohttp.web.HTTPBadRequest.status_code,
  113. )
  114. try:
  115. deleted = await self.get_job_manager().delete_job(job.submission_id)
  116. resp = JobDeleteResponse(deleted=deleted)
  117. except Exception:
  118. return Response(
  119. text=traceback.format_exc(),
  120. status=aiohttp.web.HTTPInternalServerError.status_code,
  121. )
  122. return Response(
  123. text=json.dumps(dataclasses.asdict(resp)), content_type="application/json"
  124. )
  125. @routes.get("/api/job_agent/jobs/{job_or_submission_id}/logs")
  126. @optional_utils.init_ray_and_catch_exceptions()
  127. async def get_job_logs(self, req: Request) -> Response:
  128. job_or_submission_id = req.match_info["job_or_submission_id"]
  129. job = await find_job_by_ids(
  130. self._dashboard_agent.gcs_client,
  131. self.get_job_manager().job_info_client(),
  132. job_or_submission_id,
  133. )
  134. if not job:
  135. return Response(
  136. text=f"Job {job_or_submission_id} does not exist",
  137. status=aiohttp.web.HTTPNotFound.status_code,
  138. )
  139. if job.type is not JobType.SUBMISSION:
  140. return Response(
  141. text="Can only get logs of submission type jobs",
  142. status=aiohttp.web.HTTPBadRequest.status_code,
  143. )
  144. resp = JobLogsResponse(
  145. logs=self.get_job_manager().get_job_logs(job.submission_id)
  146. )
  147. return Response(
  148. text=json.dumps(dataclasses.asdict(resp)), content_type="application/json"
  149. )
  150. @routes.get("/api/job_agent/jobs/{job_or_submission_id}/logs/tail")
  151. @optional_utils.init_ray_and_catch_exceptions()
  152. async def tail_job_logs(self, req: Request) -> Response:
  153. job_or_submission_id = req.match_info["job_or_submission_id"]
  154. job = await find_job_by_ids(
  155. self._dashboard_agent.gcs_client,
  156. self.get_job_manager().job_info_client(),
  157. job_or_submission_id,
  158. )
  159. if not job:
  160. return Response(
  161. text=f"Job {job_or_submission_id} does not exist",
  162. status=aiohttp.web.HTTPNotFound.status_code,
  163. )
  164. if job.type is not JobType.SUBMISSION:
  165. return Response(
  166. text="Can only get logs of submission type jobs",
  167. status=aiohttp.web.HTTPBadRequest.status_code,
  168. )
  169. ws = aiohttp.web.WebSocketResponse()
  170. await ws.prepare(req)
  171. async for lines in self._job_manager.tail_job_logs(job.submission_id):
  172. await ws.send_str(lines)
  173. return ws
  174. def get_job_manager(self):
  175. if not self._job_manager:
  176. self._job_manager = JobManager(
  177. self._dashboard_agent.gcs_client, self._dashboard_agent.log_dir
  178. )
  179. return self._job_manager
  180. async def run(self, server):
  181. pass
  182. @staticmethod
  183. def is_minimal_module():
  184. return False