common.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593
  1. import asyncio
  2. import json
  3. import logging
  4. import time
  5. from dataclasses import asdict, dataclass, replace
  6. from enum import Enum
  7. from pathlib import Path
  8. from typing import Any, Dict, Optional, Tuple, Union
  9. from ray._private import ray_constants
  10. from ray._private.event.export_event_logger import (
  11. EventLogType,
  12. check_export_api_enabled,
  13. get_export_event_logger,
  14. )
  15. from ray._private.runtime_env.packaging import parse_uri
  16. from ray._raylet import RAY_INTERNAL_NAMESPACE_PREFIX, GcsClient
  17. from ray.core.generated.export_event_pb2 import ExportEvent
  18. from ray.core.generated.export_submission_job_event_pb2 import (
  19. ExportSubmissionJobEventData,
  20. )
  21. from ray.util.annotations import PublicAPI
# NOTE(edoakes): these constants should be considered a public API because
# they're exposed in the snapshot API.
# Key names used for job metadata entries (presumably keys of the job's
# ``metadata`` dict -- confirm against the callers that read/write them).
JOB_ID_METADATA_KEY = "job_submission_id"
JOB_NAME_METADATA_KEY = "job_name"
# Name template carrying a literal ``{job_id}`` placeholder, to be filled in
# with ``str.format(job_id=...)``. The name starts with the Ray-internal
# namespace prefix.
JOB_ACTOR_NAME_TEMPLATE = f"{RAY_INTERNAL_NAMESPACE_PREFIX}job_actor_" + "{job_id}"
# In order to get information about SupervisorActors launched by different jobs,
# they must be set to the same namespace.
SUPERVISOR_ACTOR_RAY_NAMESPACE = "SUPERVISOR_ACTOR_RAY_NAMESPACE"
# File-name template carrying a literal ``{submission_id}`` placeholder, to be
# filled in with ``str.format(submission_id=...)``.
JOB_LOGS_PATH_TEMPLATE = "job-driver-{submission_id}.log"
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
  32. @PublicAPI(stability="stable")
  33. class JobStatus(str, Enum):
  34. """An enumeration for describing the status of a job."""
  35. #: The job has not started yet, likely waiting for the runtime_env to be set up.
  36. PENDING = "PENDING"
  37. #: The job is currently running.
  38. RUNNING = "RUNNING"
  39. #: The job was intentionally stopped by the user.
  40. STOPPED = "STOPPED"
  41. #: The job finished successfully.
  42. SUCCEEDED = "SUCCEEDED"
  43. #: The job failed.
  44. FAILED = "FAILED"
  45. def __str__(self) -> str:
  46. return f"{self.value}"
  47. def is_terminal(self) -> bool:
  48. """Return whether or not this status is terminal.
  49. A terminal status is one that cannot transition to any other status.
  50. The terminal statuses are "STOPPED", "SUCCEEDED", and "FAILED".
  51. Returns:
  52. True if this status is terminal, otherwise False.
  53. """
  54. return self.value in {"STOPPED", "SUCCEEDED", "FAILED"}
@PublicAPI(stability="stable")
class JobErrorType(str, Enum):
    """An enumeration for describing the error type of a job.

    Members are also ``str`` instances; each member's value is identical to
    its name.
    """

    #: Runtime environment failed to be set up
    RUNTIME_ENV_SETUP_FAILURE = "RUNTIME_ENV_SETUP_FAILURE"
    #: Job supervisor actor launched, but job failed to start within timeout
    JOB_SUPERVISOR_ACTOR_START_TIMEOUT = "JOB_SUPERVISOR_ACTOR_START_TIMEOUT"
    #: Job supervisor actor failed to start
    JOB_SUPERVISOR_ACTOR_START_FAILURE = "JOB_SUPERVISOR_ACTOR_START_FAILURE"
    #: Job supervisor actor failed to be scheduled
    JOB_SUPERVISOR_ACTOR_UNSCHEDULABLE = "JOB_SUPERVISOR_ACTOR_UNSCHEDULABLE"
    #: Job supervisor actor failed for unknown exception
    JOB_SUPERVISOR_ACTOR_UNKNOWN_FAILURE = "JOB_SUPERVISOR_ACTOR_UNKNOWN_FAILURE"
    #: Job supervisor actor died
    JOB_SUPERVISOR_ACTOR_DIED = "JOB_SUPERVISOR_ACTOR_DIED"
    #: Job driver script failed to start due to exception
    JOB_ENTRYPOINT_COMMAND_START_ERROR = "JOB_ENTRYPOINT_COMMAND_START_ERROR"
    #: Job driver script failed due to non-zero exit code
    JOB_ENTRYPOINT_COMMAND_ERROR = "JOB_ENTRYPOINT_COMMAND_ERROR"
  74. # TODO(aguo): Convert to pydantic model
  75. @PublicAPI(stability="stable")
  76. @dataclass
  77. class JobInfo:
  78. """A class for recording information associated with a job and its execution.
  79. Please keep this in sync with the JobsAPIInfo proto in src/ray/protobuf/gcs.proto.
  80. """
  81. #: The status of the job.
  82. status: JobStatus
  83. #: The entrypoint command for this job.
  84. entrypoint: str
  85. #: A message describing the status in more detail.
  86. message: Optional[str] = None
  87. #: Internal error, user script error
  88. error_type: Optional[JobErrorType] = None
  89. #: The time when the job was started. A Unix timestamp in ms.
  90. start_time: Optional[int] = None
  91. #: The time when the job moved into a terminal state. A Unix timestamp in ms.
  92. end_time: Optional[int] = None
  93. #: Arbitrary user-provided metadata for the job.
  94. metadata: Optional[Dict[str, str]] = None
  95. #: The runtime environment for the job.
  96. runtime_env: Optional[Dict[str, Any]] = None
  97. #: The quantity of CPU cores to reserve for the entrypoint command.
  98. entrypoint_num_cpus: Optional[Union[int, float]] = None
  99. #: The number of GPUs to reserve for the entrypoint command.
  100. entrypoint_num_gpus: Optional[Union[int, float]] = None
  101. #: The amount of memory for workers requesting memory for the entrypoint command.
  102. entrypoint_memory: Optional[int] = None
  103. #: The quantity of various custom resources to reserve for the entrypoint command.
  104. entrypoint_resources: Optional[Dict[str, float]] = None
  105. #: Driver agent http address
  106. driver_agent_http_address: Optional[str] = None
  107. #: The node id that driver running on. It will be None only when the job status
  108. # is PENDING, and this field will not be deleted or modified even if the driver dies
  109. driver_node_id: Optional[str] = None
  110. #: The driver process exit code after the driver executed. Return None if driver
  111. #: doesn't finish executing
  112. driver_exit_code: Optional[int] = None
  113. def __post_init__(self):
  114. if isinstance(self.status, str):
  115. self.status = JobStatus(self.status)
  116. if self.message is None:
  117. if self.status == JobStatus.PENDING:
  118. self.message = "Job has not started yet."
  119. if any(
  120. [
  121. self.entrypoint_num_cpus is not None
  122. and self.entrypoint_num_cpus > 0,
  123. self.entrypoint_num_gpus is not None
  124. and self.entrypoint_num_gpus > 0,
  125. self.entrypoint_memory is not None
  126. and self.entrypoint_memory > 0,
  127. self.entrypoint_resources not in [None, {}],
  128. ]
  129. ):
  130. self.message += (
  131. " It may be waiting for resources "
  132. "(CPUs, GPUs, memory, custom resources) to become available."
  133. )
  134. if self.runtime_env not in [None, {}]:
  135. self.message += (
  136. " It may be waiting for the runtime environment to be set up."
  137. )
  138. elif self.status == JobStatus.RUNNING:
  139. self.message = "Job is currently running."
  140. elif self.status == JobStatus.STOPPED:
  141. self.message = "Job was intentionally stopped."
  142. elif self.status == JobStatus.SUCCEEDED:
  143. self.message = "Job finished successfully."
  144. elif self.status == JobStatus.FAILED:
  145. self.message = "Job failed."
  146. def to_json(self) -> Dict[str, Any]:
  147. """Convert this object to a JSON-serializable dictionary.
  148. Note that the runtime_env field is converted to a JSON-serialized string
  149. and the field is renamed to runtime_env_json.
  150. Returns:
  151. A JSON-serializable dictionary representing the JobInfo object.
  152. """
  153. json_dict = asdict(self)
  154. # Convert enum values to strings.
  155. json_dict["status"] = str(json_dict["status"])
  156. json_dict["error_type"] = (
  157. json_dict["error_type"].value if json_dict.get("error_type") else None
  158. )
  159. # Convert runtime_env to a JSON-serialized string.
  160. if "runtime_env" in json_dict:
  161. if json_dict["runtime_env"] is not None:
  162. json_dict["runtime_env_json"] = json.dumps(json_dict["runtime_env"])
  163. del json_dict["runtime_env"]
  164. # Assert that the dictionary is JSON-serializable.
  165. json.dumps(json_dict)
  166. return json_dict
  167. @classmethod
  168. def from_json(cls, json_dict: Dict[str, Any]) -> None:
  169. """Initialize this object from a JSON dictionary.
  170. Note that the runtime_env_json field is converted to a dictionary and
  171. the field is renamed to runtime_env.
  172. Args:
  173. json_dict: A JSON dictionary to use to initialize the JobInfo object.
  174. """
  175. # Convert enum values to enum objects.
  176. json_dict["status"] = JobStatus(json_dict["status"])
  177. json_dict["error_type"] = (
  178. JobErrorType(json_dict["error_type"])
  179. if json_dict.get("error_type")
  180. else None
  181. )
  182. # Convert runtime_env from a JSON-serialized string to a dictionary.
  183. if "runtime_env_json" in json_dict:
  184. if json_dict["runtime_env_json"] is not None:
  185. json_dict["runtime_env"] = json.loads(json_dict["runtime_env_json"])
  186. del json_dict["runtime_env_json"]
  187. return cls(**json_dict)
  188. class JobInfoStorageClient:
  189. """
  190. Interface to put and get job data from the Internal KV store.
  191. """
  192. # Please keep this format in sync with JobDataKey()
  193. # in src/ray/gcs/gcs_server/gcs_job_manager.h.
  194. JOB_DATA_KEY_PREFIX = f"{RAY_INTERNAL_NAMESPACE_PREFIX}job_info_"
  195. JOB_DATA_KEY = f"{JOB_DATA_KEY_PREFIX}{{job_id}}"
  196. def __init__(
  197. self,
  198. gcs_client: GcsClient,
  199. export_event_log_dir_root: Optional[str] = None,
  200. ):
  201. """
  202. Initialize the JobInfoStorageClient which manages data in the internal KV store.
  203. Export Submission Job events are written when the KV store is updated if
  204. the feature flag is on and a export_event_log_dir_root is passed.
  205. export_event_log_dir_root doesn't need to be passed if the caller
  206. is not modifying data in the KV store.
  207. """
  208. self._gcs_client = gcs_client
  209. self._export_submission_job_event_logger: logging.Logger = None
  210. try:
  211. if (
  212. check_export_api_enabled(ExportEvent.SourceType.EXPORT_SUBMISSION_JOB)
  213. and export_event_log_dir_root is not None
  214. ):
  215. self._export_submission_job_event_logger = get_export_event_logger(
  216. EventLogType.SUBMISSION_JOB,
  217. export_event_log_dir_root,
  218. )
  219. except Exception:
  220. logger.exception(
  221. "Unable to initialize export event logger so no export "
  222. "events will be written."
  223. )
  224. async def put_info(
  225. self,
  226. job_id: str,
  227. job_info: JobInfo,
  228. overwrite: bool = True,
  229. timeout: Optional[int] = 30,
  230. ) -> bool:
  231. """Put job info to the internal kv store.
  232. Args:
  233. job_id: The job id.
  234. job_info: The job info.
  235. overwrite: Whether to overwrite the existing job info.
  236. timeout: The timeout in seconds for the GCS operation.
  237. Returns:
  238. True if a new key is added.
  239. """
  240. added_num = await self._gcs_client.async_internal_kv_put(
  241. self.JOB_DATA_KEY.format(job_id=job_id).encode(),
  242. json.dumps(job_info.to_json()).encode(),
  243. overwrite,
  244. namespace=ray_constants.KV_NAMESPACE_JOB,
  245. timeout=timeout,
  246. )
  247. if added_num == 1 or overwrite:
  248. # Write export event if data was updated in the KV store
  249. try:
  250. self._write_submission_job_export_event(job_id, job_info)
  251. except Exception:
  252. logger.exception("Error while writing job submission export event.")
  253. return added_num == 1
  254. def _write_submission_job_export_event(
  255. self, job_id: str, job_info: JobInfo
  256. ) -> None:
  257. """
  258. Write Submission Job export event if _export_submission_job_event_logger
  259. exists. The logger will exist if the export API feature flag is enabled
  260. and a log directory was passed to JobInfoStorageClient.
  261. """
  262. if not self._export_submission_job_event_logger:
  263. return
  264. status_value_descriptor = (
  265. ExportSubmissionJobEventData.JobStatus.DESCRIPTOR.values_by_name.get(
  266. job_info.status.name
  267. )
  268. )
  269. if status_value_descriptor is None:
  270. logger.error(
  271. f"{job_info.status.name} is not a valid "
  272. "ExportSubmissionJobEventData.JobStatus enum value. This event "
  273. "will not be written."
  274. )
  275. return
  276. job_status = status_value_descriptor.number
  277. submission_event_data = ExportSubmissionJobEventData(
  278. submission_job_id=job_id,
  279. status=job_status,
  280. entrypoint=job_info.entrypoint,
  281. message=job_info.message,
  282. metadata=job_info.metadata,
  283. error_type=job_info.error_type,
  284. start_time=job_info.start_time,
  285. end_time=job_info.end_time,
  286. runtime_env_json=json.dumps(job_info.runtime_env),
  287. driver_agent_http_address=job_info.driver_agent_http_address,
  288. driver_node_id=job_info.driver_node_id,
  289. driver_exit_code=job_info.driver_exit_code,
  290. )
  291. self._export_submission_job_event_logger.send_event(submission_event_data)
  292. async def get_info(self, job_id: str, timeout: int = 30) -> Optional[JobInfo]:
  293. serialized_info = await self._gcs_client.async_internal_kv_get(
  294. self.JOB_DATA_KEY.format(job_id=job_id).encode(),
  295. namespace=ray_constants.KV_NAMESPACE_JOB,
  296. timeout=timeout,
  297. )
  298. if serialized_info is None:
  299. return None
  300. else:
  301. return JobInfo.from_json(json.loads(serialized_info))
  302. async def delete_info(self, job_id: str, timeout: int = 30):
  303. await self._gcs_client.async_internal_kv_del(
  304. self.JOB_DATA_KEY.format(job_id=job_id).encode(),
  305. False,
  306. namespace=ray_constants.KV_NAMESPACE_JOB,
  307. timeout=timeout,
  308. )
  309. async def put_status(
  310. self,
  311. job_id: str,
  312. status: JobStatus,
  313. message: Optional[str] = None,
  314. driver_exit_code: Optional[int] = None,
  315. error_type: Optional[JobErrorType] = None,
  316. jobinfo_replace_kwargs: Optional[Dict[str, Any]] = None,
  317. timeout: Optional[int] = 30,
  318. ):
  319. """Puts or updates job status. Sets end_time if status is terminal."""
  320. old_info = await self.get_info(job_id, timeout=timeout)
  321. if jobinfo_replace_kwargs is None:
  322. jobinfo_replace_kwargs = dict()
  323. jobinfo_replace_kwargs.update(
  324. status=status,
  325. message=message,
  326. driver_exit_code=driver_exit_code,
  327. error_type=error_type,
  328. )
  329. if old_info is not None:
  330. if status != old_info.status and old_info.status.is_terminal():
  331. assert False, "Attempted to change job status from a terminal state."
  332. new_info = replace(old_info, **jobinfo_replace_kwargs)
  333. else:
  334. new_info = JobInfo(
  335. entrypoint="Entrypoint not found.", **jobinfo_replace_kwargs
  336. )
  337. if status.is_terminal():
  338. new_info.end_time = int(time.time() * 1000)
  339. await self.put_info(job_id, new_info, timeout=timeout)
  340. async def get_status(self, job_id: str, timeout: int = 30) -> Optional[JobStatus]:
  341. job_info = await self.get_info(job_id, timeout)
  342. if job_info is None:
  343. return None
  344. else:
  345. return job_info.status
  346. async def get_all_jobs(self, timeout: int = 30) -> Dict[str, JobInfo]:
  347. raw_job_ids_with_prefixes = await self._gcs_client.async_internal_kv_keys(
  348. self.JOB_DATA_KEY_PREFIX.encode(),
  349. namespace=ray_constants.KV_NAMESPACE_JOB,
  350. timeout=timeout,
  351. )
  352. job_ids_with_prefixes = [
  353. job_id.decode() for job_id in raw_job_ids_with_prefixes
  354. ]
  355. job_ids = []
  356. for job_id_with_prefix in job_ids_with_prefixes:
  357. assert job_id_with_prefix.startswith(
  358. self.JOB_DATA_KEY_PREFIX
  359. ), "Unexpected format for internal_kv key for Job submission"
  360. job_ids.append(job_id_with_prefix[len(self.JOB_DATA_KEY_PREFIX) :])
  361. async def get_job_info(job_id: str):
  362. job_info = await self.get_info(job_id, timeout)
  363. return job_id, job_info
  364. return dict(await asyncio.gather(*[get_job_info(job_id) for job_id in job_ids]))
  365. def uri_to_http_components(package_uri: str) -> Tuple[str, str]:
  366. suffix = Path(package_uri).suffix
  367. if suffix not in {".zip", ".whl"}:
  368. raise ValueError(f"package_uri ({package_uri}) does not end in .zip or .whl")
  369. # We need to strip the <protocol>:// prefix to make it possible to pass
  370. # the package_uri over HTTP.
  371. protocol, package_name = parse_uri(package_uri)
  372. return protocol.value, package_name
  373. def http_uri_components_to_uri(protocol: str, package_name: str) -> str:
  374. return f"{protocol}://{package_name}"
  375. def validate_request_type(json_data: Dict[str, Any], request_type: dataclass) -> Any:
  376. return request_type(**json_data)
  377. @dataclass
  378. class JobSubmitRequest:
  379. # Command to start execution, ex: "python script.py"
  380. entrypoint: str
  381. # Optional submission_id to specify for the job. If the submission_id
  382. # is not specified, one will be generated. If a job with the same
  383. # submission_id already exists, it will be rejected.
  384. submission_id: Optional[str] = None
  385. # DEPRECATED. Use submission_id instead
  386. job_id: Optional[str] = None
  387. # Dict to setup execution environment.
  388. runtime_env: Optional[Dict[str, Any]] = None
  389. # Metadata to pass in to the JobConfig.
  390. metadata: Optional[Dict[str, str]] = None
  391. # The quantity of CPU cores to reserve for the execution
  392. # of the entrypoint command, separately from any Ray tasks or actors
  393. # that are created by it.
  394. entrypoint_num_cpus: Optional[Union[int, float]] = None
  395. # The quantity of GPUs to reserve for the execution
  396. # of the entrypoint command, separately from any Ray tasks or actors
  397. # that are created by it.
  398. entrypoint_num_gpus: Optional[Union[int, float]] = None
  399. # The amount of total available memory for workers requesting memory
  400. # for the execution of the entrypoint command, separately from any Ray
  401. # tasks or actors that are created by it.
  402. entrypoint_memory: Optional[int] = None
  403. # The quantity of various custom resources
  404. # to reserve for the entrypoint command, separately from any Ray tasks
  405. # or actors that are created by it.
  406. entrypoint_resources: Optional[Dict[str, float]] = None
  407. # Label selector for the entrypoint command.
  408. entrypoint_label_selector: Optional[Dict[str, str]] = None
  409. def __post_init__(self):
  410. if not isinstance(self.entrypoint, str):
  411. raise TypeError(f"entrypoint must be a string, got {type(self.entrypoint)}")
  412. if self.submission_id is not None and not isinstance(self.submission_id, str):
  413. raise TypeError(
  414. "submission_id must be a string if provided, "
  415. f"got {type(self.submission_id)}"
  416. )
  417. if self.job_id is not None and not isinstance(self.job_id, str):
  418. raise TypeError(
  419. "job_id must be a string if provided, " f"got {type(self.job_id)}"
  420. )
  421. if self.runtime_env is not None:
  422. if not isinstance(self.runtime_env, dict):
  423. raise TypeError(
  424. f"runtime_env must be a dict, got {type(self.runtime_env)}"
  425. )
  426. else:
  427. for k in self.runtime_env.keys():
  428. if not isinstance(k, str):
  429. raise TypeError(
  430. f"runtime_env keys must be strings, got {type(k)}"
  431. )
  432. if self.metadata is not None:
  433. if not isinstance(self.metadata, dict):
  434. raise TypeError(f"metadata must be a dict, got {type(self.metadata)}")
  435. else:
  436. for k in self.metadata.keys():
  437. if not isinstance(k, str):
  438. raise TypeError(f"metadata keys must be strings, got {type(k)}")
  439. for v in self.metadata.values():
  440. if not isinstance(v, str):
  441. raise TypeError(
  442. f"metadata values must be strings, got {type(v)}"
  443. )
  444. if self.entrypoint_num_cpus is not None and not isinstance(
  445. self.entrypoint_num_cpus, (int, float)
  446. ):
  447. raise TypeError(
  448. "entrypoint_num_cpus must be a number, "
  449. f"got {type(self.entrypoint_num_cpus)}"
  450. )
  451. if self.entrypoint_num_gpus is not None and not isinstance(
  452. self.entrypoint_num_gpus, (int, float)
  453. ):
  454. raise TypeError(
  455. "entrypoint_num_gpus must be a number, "
  456. f"got {type(self.entrypoint_num_gpus)}"
  457. )
  458. if self.entrypoint_memory is not None and not isinstance(
  459. self.entrypoint_memory, int
  460. ):
  461. raise TypeError(
  462. "entrypoint_memory must be an integer, "
  463. f"got {type(self.entrypoint_memory)}"
  464. )
  465. if self.entrypoint_resources is not None:
  466. if not isinstance(self.entrypoint_resources, dict):
  467. raise TypeError(
  468. "entrypoint_resources must be a dict, "
  469. f"got {type(self.entrypoint_resources)}"
  470. )
  471. else:
  472. for k in self.entrypoint_resources.keys():
  473. if not isinstance(k, str):
  474. raise TypeError(
  475. "entrypoint_resources keys must be strings, "
  476. f"got {type(k)}"
  477. )
  478. for v in self.entrypoint_resources.values():
  479. if not isinstance(v, (int, float)):
  480. raise TypeError(
  481. "entrypoint_resources values must be numbers, "
  482. f"got {type(v)}"
  483. )
  484. if self.entrypoint_label_selector is not None:
  485. if not isinstance(self.entrypoint_label_selector, dict):
  486. raise TypeError(
  487. "entrypoint_label_selector must be a dict, "
  488. f"got {type(self.entrypoint_label_selector)}"
  489. )
  490. else:
  491. for k, v in self.entrypoint_label_selector.items():
  492. if not isinstance(k, str):
  493. raise TypeError(
  494. "entrypoint_label_selector keys must be strings, "
  495. f"got {type(k)}"
  496. )
  497. if not isinstance(v, str):
  498. raise TypeError(
  499. "entrypoint_label_selector values must be strings, "
  500. f"got {type(v)}"
  501. )
@dataclass
class JobSubmitResponse:
    """Identifiers carried in the response to a job submission."""

    # DEPRECATED: Use submission_id instead.
    job_id: str
    # The submission id of the job (replaces the deprecated ``job_id``).
    submission_id: str
@dataclass
class JobStopResponse:
    """Result carried in the response to a job stop request."""

    # Presumably True when the stop request took effect -- confirm against
    # the code that builds this response.
    stopped: bool
@dataclass
class JobDeleteResponse:
    """Result carried in the response to a job delete request."""

    # Presumably True when the job's data was deleted -- confirm against
    # the code that builds this response.
    deleted: bool
# TODO(jiaodong): Support log streaming #19415
@dataclass
class JobLogsResponse:
    """Log text carried in the response to a job logs request."""

    # The logs as a single string (no streaming yet, see the TODO above).
    logs: str