state_aggregator.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671
  1. import asyncio
  2. import logging
  3. from concurrent.futures import ThreadPoolExecutor
  4. from itertools import islice
  5. from typing import List, Optional
  6. import ray.dashboard.memory_utils as memory_utils
  7. from ray import NodeID
  8. from ray._common.utils import get_or_create_event_loop
  9. from ray._private.profiling import chrome_tracing_dump
  10. from ray._private.ray_constants import env_integer
  11. from ray.dashboard.state_api_utils import do_filter
  12. from ray.dashboard.utils import compose_state_message
  13. from ray.runtime_env import RuntimeEnv
  14. from ray.util.state.common import (
  15. RAY_MAX_LIMIT_FROM_API_SERVER,
  16. ActorState,
  17. ActorSummaries,
  18. JobState,
  19. ListApiOptions,
  20. ListApiResponse,
  21. NodeState,
  22. ObjectState,
  23. ObjectSummaries,
  24. PlacementGroupState,
  25. RuntimeEnvState,
  26. StateSummary,
  27. SummaryApiOptions,
  28. SummaryApiResponse,
  29. TaskState,
  30. TaskSummaries,
  31. WorkerState,
  32. protobuf_message_to_dict,
  33. protobuf_to_task_state_dict,
  34. )
  35. from ray.util.state.state_manager import DataSourceUnavailable, StateDataSourceClient
  36. logger = logging.getLogger(__name__)
  37. GCS_QUERY_FAILURE_WARNING = (
  38. "Failed to query data from GCS. It is due to "
  39. "(1) GCS is unexpectedly failed. "
  40. "(2) GCS is overloaded. "
  41. "(3) There's an unexpected network issue. "
  42. "Please check the gcs_server.out log to find the root cause."
  43. )
  44. NODE_QUERY_FAILURE_WARNING = (
  45. "Failed to query data from {type}. "
  46. "Queried {total} {type} "
  47. "and {network_failures} {type} failed to reply. It is due to "
  48. "(1) {type} is unexpectedly failed. "
  49. "(2) {type} is overloaded. "
  50. "(3) There's an unexpected network issue. Please check the "
  51. "{log_command} to find the root cause."
  52. )
# TODO(sang): Move the class to state/state_manager.py.
# TODO(sang): Remove the *State classes and replace them with Pydantic or
# protobuf (depending on API interface standardization).
class StateAPIManager:
    """Query cluster state through a data source client and post-process it.

    Each ``list_*`` method fetches raw replies through the client, then
    converts, filters, sorts and truncates the entries on the supplied thread
    pool executor. The ``summarize_*`` methods build summaries on top of the
    ``list_*`` results.
    """

    def __init__(
        self,
        state_data_source_client: StateDataSourceClient,
        thread_pool_executor: ThreadPoolExecutor,
    ) -> None:
        """Store the collaborators used by the query methods.

        Args:
            state_data_source_client: Client the ``list_*`` methods call to
                fetch raw state.
            thread_pool_executor: Executor passed to ``run_in_executor`` for
                the post-processing step of each ``list_*`` method.
        """
        self._client = state_data_source_client
        self._thread_pool_executor = thread_pool_executor

    @property
    def data_source_client(self) -> StateDataSourceClient:
        """Return the data source client given to the constructor."""
        return self._client
  70. async def list_actors(self, *, option: ListApiOptions) -> ListApiResponse:
  71. """List all actor information from the cluster.
  72. Returns:
  73. {actor_id -> actor_data_in_dict}
  74. actor_data_in_dict's schema is in ActorState
  75. """
  76. try:
  77. reply = await self._client.get_all_actor_info(
  78. timeout=option.timeout, filters=option.filters
  79. )
  80. except DataSourceUnavailable:
  81. raise DataSourceUnavailable(GCS_QUERY_FAILURE_WARNING)
  82. def transform(reply) -> ListApiResponse:
  83. result = []
  84. for message in reply.actor_table_data:
  85. # Note: this is different from actor_table_data_to_dict in actor_head.py
  86. # because we set preserving_proto_field_name=True so fields are
  87. # snake_case, while actor_table_data_to_dict in actor_head.py is
  88. # camelCase.
  89. # TODO(ryw): modify actor_table_data_to_dict to use snake_case, and
  90. # consolidate the code.
  91. data = protobuf_message_to_dict(
  92. message=message,
  93. fields_to_decode=[
  94. "actor_id",
  95. "owner_id",
  96. "job_id",
  97. "node_id",
  98. "placement_group_id",
  99. ],
  100. )
  101. result.append(data)
  102. num_after_truncation = len(result) + reply.num_filtered
  103. result = do_filter(result, option.filters, ActorState, option.detail)
  104. num_filtered = len(result)
  105. # Sort to make the output deterministic.
  106. result.sort(key=lambda entry: entry["actor_id"])
  107. result = list(islice(result, option.limit))
  108. return ListApiResponse(
  109. result=result,
  110. total=reply.total,
  111. num_after_truncation=num_after_truncation,
  112. num_filtered=num_filtered,
  113. )
  114. return await get_or_create_event_loop().run_in_executor(
  115. self._thread_pool_executor, transform, reply
  116. )
  117. async def list_placement_groups(self, *, option: ListApiOptions) -> ListApiResponse:
  118. """List all placement group information from the cluster.
  119. Returns:
  120. {pg_id -> pg_data_in_dict}
  121. pg_data_in_dict's schema is in PlacementGroupState
  122. """
  123. try:
  124. reply = await self._client.get_all_placement_group_info(
  125. timeout=option.timeout
  126. )
  127. except DataSourceUnavailable:
  128. raise DataSourceUnavailable(GCS_QUERY_FAILURE_WARNING)
  129. def transform(reply) -> ListApiResponse:
  130. result = []
  131. for message in reply.placement_group_table_data:
  132. data = protobuf_message_to_dict(
  133. message=message,
  134. fields_to_decode=[
  135. "placement_group_id",
  136. "creator_job_id",
  137. "node_id",
  138. ],
  139. )
  140. result.append(data)
  141. num_after_truncation = len(result)
  142. result = do_filter(
  143. result, option.filters, PlacementGroupState, option.detail
  144. )
  145. num_filtered = len(result)
  146. # Sort to make the output deterministic.
  147. result.sort(key=lambda entry: entry["placement_group_id"])
  148. return ListApiResponse(
  149. result=list(islice(result, option.limit)),
  150. total=reply.total,
  151. num_after_truncation=num_after_truncation,
  152. num_filtered=num_filtered,
  153. )
  154. return await get_or_create_event_loop().run_in_executor(
  155. self._thread_pool_executor, transform, reply
  156. )
  157. async def list_nodes(self, *, option: ListApiOptions) -> ListApiResponse:
  158. """List all node information from the cluster.
  159. Returns:
  160. {node_id -> node_data_in_dict}
  161. node_data_in_dict's schema is in NodeState
  162. """
  163. try:
  164. reply = await self._client.get_all_node_info(
  165. timeout=option.timeout, filters=option.filters
  166. )
  167. except DataSourceUnavailable:
  168. raise DataSourceUnavailable(GCS_QUERY_FAILURE_WARNING)
  169. def transform(reply) -> ListApiResponse:
  170. result = []
  171. for message in reply.node_info_list:
  172. data = protobuf_message_to_dict(
  173. message=message, fields_to_decode=["node_id"]
  174. )
  175. data["node_ip"] = data["node_manager_address"]
  176. data["start_time_ms"] = int(data["start_time_ms"])
  177. data["end_time_ms"] = int(data["end_time_ms"])
  178. death_info = data.get("death_info", {})
  179. data["state_message"] = compose_state_message(
  180. death_info.get("reason", None),
  181. death_info.get("reason_message", None),
  182. )
  183. result.append(data)
  184. num_after_truncation = len(result) + reply.num_filtered
  185. result = do_filter(result, option.filters, NodeState, option.detail)
  186. num_filtered = len(result)
  187. # Sort to make the output deterministic.
  188. result.sort(key=lambda entry: entry["node_id"])
  189. result = list(islice(result, option.limit))
  190. return ListApiResponse(
  191. result=result,
  192. total=reply.total,
  193. num_after_truncation=num_after_truncation,
  194. num_filtered=num_filtered,
  195. )
  196. return await get_or_create_event_loop().run_in_executor(
  197. self._thread_pool_executor, transform, reply
  198. )
  199. async def list_workers(self, *, option: ListApiOptions) -> ListApiResponse:
  200. """List all worker information from the cluster.
  201. Returns:
  202. {worker_id -> worker_data_in_dict}
  203. worker_data_in_dict's schema is in WorkerState
  204. """
  205. try:
  206. reply = await self._client.get_all_worker_info(
  207. timeout=option.timeout,
  208. filters=option.filters,
  209. )
  210. except DataSourceUnavailable:
  211. raise DataSourceUnavailable(GCS_QUERY_FAILURE_WARNING)
  212. def transform(reply) -> ListApiResponse:
  213. result = []
  214. for message in reply.worker_table_data:
  215. data = protobuf_message_to_dict(
  216. message=message, fields_to_decode=["worker_id", "node_id"]
  217. )
  218. data["worker_id"] = data["worker_address"]["worker_id"]
  219. data["node_id"] = data["worker_address"]["node_id"]
  220. data["ip"] = data["worker_address"]["ip_address"]
  221. data["start_time_ms"] = int(data["start_time_ms"])
  222. data["end_time_ms"] = int(data["end_time_ms"])
  223. data["worker_launch_time_ms"] = int(data["worker_launch_time_ms"])
  224. data["worker_launched_time_ms"] = int(data["worker_launched_time_ms"])
  225. result.append(data)
  226. num_after_truncation = len(result) + reply.num_filtered
  227. result = do_filter(result, option.filters, WorkerState, option.detail)
  228. num_filtered = len(result)
  229. # Sort to make the output deterministic.
  230. result.sort(key=lambda entry: entry["worker_id"])
  231. result = list(islice(result, option.limit))
  232. return ListApiResponse(
  233. result=result,
  234. total=reply.total,
  235. num_after_truncation=num_after_truncation,
  236. num_filtered=num_filtered,
  237. )
  238. return await get_or_create_event_loop().run_in_executor(
  239. self._thread_pool_executor, transform, reply
  240. )
  241. async def list_jobs(self, *, option: ListApiOptions) -> ListApiResponse:
  242. try:
  243. reply = await self._client.get_job_info(timeout=option.timeout)
  244. except DataSourceUnavailable:
  245. raise DataSourceUnavailable(GCS_QUERY_FAILURE_WARNING)
  246. def transform(reply) -> ListApiResponse:
  247. result = [job.dict() for job in reply]
  248. total = len(result)
  249. result = do_filter(result, option.filters, JobState, option.detail)
  250. num_filtered = len(result)
  251. result.sort(key=lambda entry: entry["job_id"] or "")
  252. result = list(islice(result, option.limit))
  253. return ListApiResponse(
  254. result=result,
  255. total=total,
  256. num_after_truncation=total,
  257. num_filtered=num_filtered,
  258. )
  259. return await get_or_create_event_loop().run_in_executor(
  260. self._thread_pool_executor, transform, reply
  261. )
  262. async def list_tasks(self, *, option: ListApiOptions) -> ListApiResponse:
  263. """List all task information from the cluster.
  264. Returns:
  265. {task_id -> task_data_in_dict}
  266. task_data_in_dict's schema is in TaskState
  267. """
  268. try:
  269. reply = await self._client.get_all_task_info(
  270. timeout=option.timeout,
  271. filters=option.filters,
  272. exclude_driver=option.exclude_driver,
  273. )
  274. except DataSourceUnavailable:
  275. raise DataSourceUnavailable(GCS_QUERY_FAILURE_WARNING)
  276. def transform(reply) -> ListApiResponse:
  277. """
  278. Transforms from proto to dict, applies filters, sorts, and truncates.
  279. This function is executed in a separate thread.
  280. """
  281. result = [
  282. protobuf_to_task_state_dict(message) for message in reply.events_by_task
  283. ]
  284. # Num pre-truncation is the number of tasks returned from
  285. # source + num filtered on source
  286. num_after_truncation = len(result)
  287. num_total = len(result) + reply.num_status_task_events_dropped
  288. # Only certain filters are done on GCS, so here the filter function is still
  289. # needed to apply all the filters
  290. result = do_filter(result, option.filters, TaskState, option.detail)
  291. num_filtered = len(result)
  292. result.sort(key=lambda entry: entry["task_id"])
  293. result = list(islice(result, option.limit))
  294. # TODO(rickyx): we could do better with the warning logic. It's messy now.
  295. return ListApiResponse(
  296. result=result,
  297. total=num_total,
  298. num_after_truncation=num_after_truncation,
  299. num_filtered=num_filtered,
  300. )
  301. # In the error case
  302. if reply.status.code != 0:
  303. return ListApiResponse(
  304. result=[],
  305. total=0,
  306. num_after_truncation=0,
  307. num_filtered=0,
  308. warnings=[reply.status.message],
  309. )
  310. return await get_or_create_event_loop().run_in_executor(
  311. self._thread_pool_executor, transform, reply
  312. )
  313. async def list_objects(self, *, option: ListApiOptions) -> ListApiResponse:
  314. """List all object information from the cluster.
  315. Returns:
  316. {object_id -> object_data_in_dict}
  317. object_data_in_dict's schema is in ObjectState
  318. """
  319. all_node_info_reply = await self._client.get_all_node_info(
  320. timeout=option.timeout,
  321. limit=None,
  322. filters=[("state", "=", "ALIVE")],
  323. )
  324. tasks = [
  325. self._client.get_object_info(
  326. node_info.node_manager_address,
  327. node_info.node_manager_port,
  328. timeout=option.timeout,
  329. )
  330. for node_info in all_node_info_reply.node_info_list
  331. ]
  332. replies = await asyncio.gather(
  333. *tasks,
  334. return_exceptions=True,
  335. )
  336. def transform(replies) -> ListApiResponse:
  337. unresponsive_nodes = 0
  338. worker_stats = []
  339. total_objects = 0
  340. for reply in replies:
  341. if isinstance(reply, DataSourceUnavailable):
  342. unresponsive_nodes += 1
  343. continue
  344. elif isinstance(reply, Exception):
  345. raise reply
  346. total_objects += reply.total
  347. for core_worker_stat in reply.core_workers_stats:
  348. # NOTE: Set preserving_proto_field_name=False here because
  349. # `construct_memory_table` requires a dictionary that has
  350. # modified protobuf name
  351. # (e.g., workerId instead of worker_id) as a key.
  352. worker_stats.append(
  353. protobuf_message_to_dict(
  354. message=core_worker_stat,
  355. fields_to_decode=["object_id"],
  356. preserving_proto_field_name=False,
  357. )
  358. )
  359. partial_failure_warning = None
  360. if len(tasks) > 0 and unresponsive_nodes > 0:
  361. warning_msg = NODE_QUERY_FAILURE_WARNING.format(
  362. type="raylet",
  363. total=len(tasks),
  364. network_failures=unresponsive_nodes,
  365. log_command="raylet.out",
  366. )
  367. if unresponsive_nodes == len(tasks):
  368. raise DataSourceUnavailable(warning_msg)
  369. partial_failure_warning = (
  370. f"The returned data may contain incomplete result. {warning_msg}"
  371. )
  372. result = []
  373. memory_table = memory_utils.construct_memory_table(worker_stats)
  374. for entry in memory_table.table:
  375. data = entry.as_dict()
  376. # `construct_memory_table` returns object_ref field which is indeed
  377. # object_id. We do transformation here.
  378. # TODO(sang): Refactor `construct_memory_table`.
  379. data["object_id"] = data["object_ref"]
  380. del data["object_ref"]
  381. data["ip"] = data["node_ip_address"]
  382. del data["node_ip_address"]
  383. data["type"] = data["type"].upper()
  384. data["task_status"] = (
  385. "NIL" if data["task_status"] == "-" else data["task_status"]
  386. )
  387. result.append(data)
  388. # Add callsite warnings if it is not configured.
  389. callsite_warning = []
  390. callsite_enabled = env_integer("RAY_record_ref_creation_sites", 0)
  391. if not callsite_enabled:
  392. callsite_warning.append(
  393. "Callsite is not being recorded. "
  394. "To record callsite information for each ObjectRef created, set "
  395. "env variable RAY_record_ref_creation_sites=1 during `ray start` "
  396. "and `ray.init`."
  397. )
  398. num_after_truncation = len(result)
  399. result = do_filter(result, option.filters, ObjectState, option.detail)
  400. num_filtered = len(result)
  401. # Sort to make the output deterministic.
  402. result.sort(key=lambda entry: entry["object_id"])
  403. result = list(islice(result, option.limit))
  404. return ListApiResponse(
  405. result=result,
  406. partial_failure_warning=partial_failure_warning,
  407. total=total_objects,
  408. num_after_truncation=num_after_truncation,
  409. num_filtered=num_filtered,
  410. warnings=callsite_warning,
  411. )
  412. return await get_or_create_event_loop().run_in_executor(
  413. self._thread_pool_executor, transform, replies
  414. )
  415. async def list_runtime_envs(self, *, option: ListApiOptions) -> ListApiResponse:
  416. """List all runtime env information from the cluster.
  417. Returns:
  418. A list of runtime env information in the cluster.
  419. The schema of returned "dict" is equivalent to the
  420. `RuntimeEnvState` protobuf message.
  421. We don't have id -> data mapping like other API because runtime env
  422. doesn't have unique ids.
  423. """
  424. live_node_info_reply = await self._client.get_all_node_info(
  425. timeout=option.timeout,
  426. limit=None,
  427. filters=[("state", "=", "ALIVE")],
  428. )
  429. node_infos = [
  430. node_info
  431. for node_info in live_node_info_reply.node_info_list
  432. if node_info.runtime_env_agent_port is not None
  433. ]
  434. tasks = [
  435. self._client.get_runtime_envs_info(
  436. node_info.node_manager_address,
  437. node_info.runtime_env_agent_port,
  438. timeout=option.timeout,
  439. )
  440. for node_info in node_infos
  441. ]
  442. replies = await asyncio.gather(
  443. *tasks,
  444. return_exceptions=True,
  445. )
  446. def transform(replies) -> ListApiResponse:
  447. result = []
  448. unresponsive_nodes = 0
  449. total_runtime_envs = 0
  450. for node_info, reply in zip(node_infos, replies):
  451. if isinstance(reply, DataSourceUnavailable):
  452. unresponsive_nodes += 1
  453. continue
  454. elif isinstance(reply, Exception):
  455. raise reply
  456. total_runtime_envs += reply.total
  457. states = reply.runtime_env_states
  458. for state in states:
  459. data = protobuf_message_to_dict(message=state, fields_to_decode=[])
  460. # Need to deserialize this field.
  461. data["runtime_env"] = RuntimeEnv.deserialize(
  462. data["runtime_env"]
  463. ).to_dict()
  464. data["node_id"] = NodeID(node_info.node_id).hex()
  465. result.append(data)
  466. partial_failure_warning = None
  467. if len(tasks) > 0 and unresponsive_nodes > 0:
  468. warning_msg = NODE_QUERY_FAILURE_WARNING.format(
  469. type="agent",
  470. total=len(tasks),
  471. network_failures=unresponsive_nodes,
  472. log_command="dashboard_agent.log",
  473. )
  474. if unresponsive_nodes == len(tasks):
  475. raise DataSourceUnavailable(warning_msg)
  476. partial_failure_warning = (
  477. f"The returned data may contain incomplete result. {warning_msg}"
  478. )
  479. num_after_truncation = len(result)
  480. result = do_filter(result, option.filters, RuntimeEnvState, option.detail)
  481. num_filtered = len(result)
  482. # Sort to make the output deterministic.
  483. def sort_func(entry):
  484. # If creation time is not there yet (runtime env is failed
  485. # to be created or not created yet, they are the highest priority.
  486. # Otherwise, "bigger" creation time is coming first.
  487. if "creation_time_ms" not in entry:
  488. return float("inf")
  489. elif entry["creation_time_ms"] is None:
  490. return float("inf")
  491. else:
  492. return float(entry["creation_time_ms"])
  493. result.sort(key=sort_func, reverse=True)
  494. result = list(islice(result, option.limit))
  495. return ListApiResponse(
  496. result=result,
  497. partial_failure_warning=partial_failure_warning,
  498. total=total_runtime_envs,
  499. num_after_truncation=num_after_truncation,
  500. num_filtered=num_filtered,
  501. )
  502. return await get_or_create_event_loop().run_in_executor(
  503. self._thread_pool_executor, transform, replies
  504. )
  505. async def summarize_tasks(self, option: SummaryApiOptions) -> SummaryApiResponse:
  506. summary_by = option.summary_by or "func_name"
  507. if summary_by not in ["func_name", "lineage"]:
  508. raise ValueError('summary_by must be one of "func_name" or "lineage".')
  509. # For summary, try getting as many entries as possible to minimze data loss.
  510. result = await self.list_tasks(
  511. option=ListApiOptions(
  512. timeout=option.timeout,
  513. limit=RAY_MAX_LIMIT_FROM_API_SERVER,
  514. filters=option.filters,
  515. detail=summary_by == "lineage",
  516. )
  517. )
  518. if summary_by == "func_name":
  519. summary_results = TaskSummaries.to_summary_by_func_name(tasks=result.result)
  520. else:
  521. # We will need the actors info for actor tasks.
  522. actors = await self.list_actors(
  523. option=ListApiOptions(
  524. timeout=option.timeout,
  525. limit=RAY_MAX_LIMIT_FROM_API_SERVER,
  526. detail=True,
  527. )
  528. )
  529. summary_results = TaskSummaries.to_summary_by_lineage(
  530. tasks=result.result, actors=actors.result
  531. )
  532. summary = StateSummary(node_id_to_summary={"cluster": summary_results})
  533. warnings = result.warnings
  534. if (
  535. summary_results.total_actor_scheduled
  536. + summary_results.total_actor_tasks
  537. + summary_results.total_tasks
  538. < result.num_filtered
  539. ):
  540. warnings = warnings or []
  541. warnings.append(
  542. "There is missing data in this aggregation. "
  543. "Possibly due to task data being evicted to preserve memory."
  544. )
  545. return SummaryApiResponse(
  546. total=result.total,
  547. result=summary,
  548. partial_failure_warning=result.partial_failure_warning,
  549. warnings=warnings,
  550. num_after_truncation=result.num_after_truncation,
  551. num_filtered=result.num_filtered,
  552. )
  553. async def summarize_actors(self, option: SummaryApiOptions) -> SummaryApiResponse:
  554. # For summary, try getting as many entries as possible to minimze data loss.
  555. result = await self.list_actors(
  556. option=ListApiOptions(
  557. timeout=option.timeout,
  558. limit=RAY_MAX_LIMIT_FROM_API_SERVER,
  559. filters=option.filters,
  560. )
  561. )
  562. summary = StateSummary(
  563. node_id_to_summary={
  564. "cluster": ActorSummaries.to_summary(actors=result.result)
  565. }
  566. )
  567. return SummaryApiResponse(
  568. total=result.total,
  569. result=summary,
  570. partial_failure_warning=result.partial_failure_warning,
  571. warnings=result.warnings,
  572. num_after_truncation=result.num_after_truncation,
  573. num_filtered=result.num_filtered,
  574. )
  575. async def summarize_objects(self, option: SummaryApiOptions) -> SummaryApiResponse:
  576. # For summary, try getting as many entries as possible to minimize data loss.
  577. result = await self.list_objects(
  578. option=ListApiOptions(
  579. timeout=option.timeout,
  580. limit=RAY_MAX_LIMIT_FROM_API_SERVER,
  581. filters=option.filters,
  582. )
  583. )
  584. summary = StateSummary(
  585. node_id_to_summary={
  586. "cluster": ObjectSummaries.to_summary(objects=result.result)
  587. }
  588. )
  589. return SummaryApiResponse(
  590. total=result.total,
  591. result=summary,
  592. partial_failure_warning=result.partial_failure_warning,
  593. warnings=result.warnings,
  594. num_after_truncation=result.num_after_truncation,
  595. num_filtered=result.num_filtered,
  596. )
  597. async def generate_task_timeline(self, job_id: Optional[str]) -> List[dict]:
  598. filters = [("job_id", "=", job_id)] if job_id else None
  599. result = await self.list_tasks(
  600. option=ListApiOptions(detail=True, filters=filters, limit=10000)
  601. )
  602. return chrome_tracing_dump(result.result)