common.py 66 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743
  1. import datetime
  2. import json
  3. import logging
  4. import sys
  5. import warnings
  6. from abc import ABC
  7. from dataclasses import asdict, field, fields
  8. from enum import Enum, unique
  9. from typing import Any, Dict, List, Optional, Set, Tuple, Union
  10. import ray.dashboard.utils as dashboard_utils
  11. # TODO(aguo): Instead of a version check, modify the below models
  12. # to use pydantic BaseModel instead of dataclass.
  13. # In pydantic 2, dataclass no longer needs the `init=True` kwarg to
  14. # generate an __init__ method. Additionally, it will raise an error if
  15. # it detects `init=True` to be set.
  16. from ray._common.pydantic_compat import IS_PYDANTIC_2
  17. from ray._private.custom_types import (
  18. TypeActorStatus,
  19. TypeNodeStatus,
  20. TypePlacementGroupStatus,
  21. TypeReferenceType,
  22. TypeTaskStatus,
  23. TypeTaskType,
  24. TypeWorkerExitType,
  25. TypeWorkerType,
  26. )
  27. from ray._private.ray_constants import env_integer
  28. from ray.core.generated.common_pb2 import TaskStatus, TaskType
  29. from ray.core.generated.gcs_pb2 import TaskEvents
  30. from ray.dashboard.modules.job.pydantic_models import JobDetails
  31. from ray.util.state.exception import RayStateApiException
  32. try:
  33. from pydantic.dataclasses import dataclass
  34. except ImportError:
  35. # pydantic is not available in the dashboard.
  36. # We will use the dataclass from the standard library.
  37. from dataclasses import dataclass
# Module-level logger.
logger = logging.getLogger(__name__)

# Default timeout used by the API option classes in this module
# (presumably seconds -- their error message mentions a "0 second timeout").
DEFAULT_RPC_TIMEOUT = 30
# Default maximum number of entries returned by a list call.
DEFAULT_LIMIT = 100
DEFAULT_LOG_LIMIT = 1000
# Default client-side filename for a downloaded log.
DEFAULT_DOWNLOAD_FILENAME = "file.txt"

# Max number of entries from API server to the client
RAY_MAX_LIMIT_FROM_API_SERVER = env_integer(
    "RAY_MAX_LIMIT_FROM_API_SERVER", 10 * 1000
)  # 10k

# Max number of entries from data sources (rest will be truncated at the
# data source, e.g. raylet)
RAY_MAX_LIMIT_FROM_DATA_SOURCE = env_integer(
    "RAY_MAX_LIMIT_FROM_DATA_SOURCE", 10 * 1000
)  # 10k
# Names of the resources known to this module. `@unique` rejects two members
# sharing the same string value.
@unique
class StateResource(Enum):
    ACTORS = "actors"
    JOBS = "jobs"
    PLACEMENT_GROUPS = "placement_groups"
    NODES = "nodes"
    WORKERS = "workers"
    TASKS = "tasks"
    OBJECTS = "objects"
    RUNTIME_ENVS = "runtime_envs"
    CLUSTER_EVENTS = "cluster_events"
# Names of the resources that have a summary view (presumably the ones the
# summary API accepts -- confirm against callers). `@unique` rejects
# duplicated values.
@unique
class SummaryResource(Enum):
    ACTORS = "actors"
    TASKS = "tasks"
    OBJECTS = "objects"
# Python types accepted as the value part of a (key, predicate, value) filter.
SupportedFilterType = Union[str, bool, int, float]
# Predicate part of a filter. Typed as a plain `str`; ListApiOptions rejects
# anything other than "=" and "!=" at construction time.
PredicateType = str  # Literal["=", "!="]
  70. class Humanify:
  71. """A class containing default methods to
  72. convert units into a human readable string."""
  73. def timestamp(x: float):
  74. """Converts milliseconds to a datetime object."""
  75. return str(datetime.datetime.fromtimestamp(x / 1000))
  76. def memory(x: int):
  77. """Converts raw bytes to a human readable memory size."""
  78. if x >= 2**30:
  79. return str(format(x / (2**30), ".3f")) + " GiB"
  80. elif x >= 2**20:
  81. return str(format(x / (2**20), ".3f")) + " MiB"
  82. elif x >= 2**10:
  83. return str(format(x / (2**10), ".3f")) + " KiB"
  84. return str(format(x, ".3f")) + " B"
  85. def duration(x: int):
  86. """Converts milliseconds to a human readable duration."""
  87. return str(datetime.timedelta(milliseconds=x))
  88. def events(events: List[dict]):
  89. """Converts a list of task events into a human readable format."""
  90. for event in events:
  91. if "created_ms" in event:
  92. event["created_ms"] = Humanify.timestamp(event["created_ms"])
  93. return events
  94. def node_resources(resources: dict):
  95. """Converts a node's resources into a human readable format."""
  96. for resource in resources:
  97. if "memory" in resource:
  98. resources[resource] = Humanify.memory(resources[resource])
  99. return resources
  100. @dataclass(init=not IS_PYDANTIC_2)
  101. class ListApiOptions:
  102. # Maximum number of entries to return
  103. limit: int = DEFAULT_LIMIT
  104. # The timeout for the API call.
  105. timeout: int = DEFAULT_RPC_TIMEOUT
  106. # If True, more detailed output will be printed.
  107. # The API could query more sources than detail == False
  108. # to get more data in detail.
  109. detail: bool = False
  110. # Filters. Each tuple pair (key, predicate, value) means key predicate value.
  111. # If there's more than 1 filter, it means AND.
  112. # E.g., [(key, "=", val), (key2, "!=" val2)] means (key=val) AND (key2!=val2)
  113. filters: Optional[List[Tuple[str, PredicateType, SupportedFilterType]]] = field(
  114. default_factory=list
  115. )
  116. # [only tasks] If driver tasks should be excluded.
  117. exclude_driver: bool = True
  118. # When the request is processed on the server side,
  119. # we should apply multiplier so that server side can finish
  120. # processing a request within timeout. Otherwise,
  121. # timeout will always lead Http timeout.
  122. server_timeout_multiplier: float = 0.8
  123. def __post_init__(self):
  124. # To return the data to users, when there's a partial failure
  125. # we need to have a timeout that's smaller than the users' timeout.
  126. # 80% is configured arbitrarily.
  127. self.timeout = max(1, int(self.timeout * self.server_timeout_multiplier))
  128. assert self.timeout != 0, "0 second timeout is not supported."
  129. if self.filters is None:
  130. self.filters = []
  131. for filter in self.filters:
  132. _, filter_predicate, _ = filter
  133. if filter_predicate != "=" and filter_predicate != "!=":
  134. raise ValueError(
  135. f"Unsupported filter predicate {filter_predicate} is given. "
  136. "Available predicates: =, !=."
  137. )
  138. def has_conflicting_filters(self) -> bool:
  139. # Check the filters in the ListApiOptions conflicts. Specifically for:
  140. # - multiple '=' filters with the same key but different values.
  141. # TODO(myan): More conflicts situation can be added for further optimization.
  142. # For example, 2 filters with same key and same value but one with '=' predicate
  143. # and ther other with '!=' predicate
  144. equal_filters = {}
  145. for filter in self.filters:
  146. filter_key, filter_predicate, filter_value = filter
  147. if filter_predicate == "=":
  148. if (
  149. filter_key in equal_filters
  150. and equal_filters[filter_key] != filter_value
  151. ):
  152. warnings.warn(
  153. "There are multiple '=' filters with the same "
  154. f"key '{filter_key}' but different values"
  155. f"'{equal_filters[filter_key]}' & '{filter_value}'. "
  156. "Empty result set will be returned",
  157. UserWarning,
  158. )
  159. return True
  160. elif filter_key not in equal_filters:
  161. equal_filters[filter_key] = filter_value
  162. return False
  163. @dataclass(init=not IS_PYDANTIC_2)
  164. class GetApiOptions:
  165. # Timeout for the HTTP request
  166. timeout: int = DEFAULT_RPC_TIMEOUT
  167. # When the request is processed on the server side,
  168. # we should apply multiplier so that server side can finish
  169. # processing a request within timeout. Otherwise,
  170. # timeout will always lead Http timeout.
  171. server_timeout_multiplier: float = 0.8
  172. def __post_init__(self):
  173. # To return the data to users, when there's a partial failure
  174. # we need to have a timeout that's smaller than the users' timeout.
  175. # 80% is configured arbitrarily.
  176. self.timeout = max(1, int(self.timeout * self.server_timeout_multiplier))
  177. assert self.timeout != 0, "0 second timeout is not supported."
@dataclass(init=not IS_PYDANTIC_2)
class SummaryApiOptions:
    """Options of a summary API call: timeout, filters and grouping key."""

    # Timeout for the HTTP request
    timeout: int = DEFAULT_RPC_TIMEOUT
    # Filters. Each tuple pair (key, predicate, value) means key predicate value.
    # If there's more than 1 filter, it means AND.
    # E.g., [(key, "=", val), (key2, "!=" val2)] means (key=val) AND (key2!=val2)
    # For summary endpoints that call list under the hood, we'll pass
    # these filters directly into the list call.
    filters: Optional[List[Tuple[str, PredicateType, SupportedFilterType]]] = field(
        default_factory=list
    )
    # Change out to summarize the output. There is a summary_by value for each entity.
    # Tasks: by func_name
    # Actors: by class
    # Objects: by callsite
    summary_by: Optional[str] = None
  195. def state_column(*, filterable: bool, detail: bool = False, format_fn=None, **kwargs):
  196. """A wrapper around dataclass.field to add additional metadata.
  197. The metadata is used to define detail / filterable option of
  198. each column.
  199. Args:
  200. detail: If True, the column is used when detail == True
  201. filterable: If True, the column can be used for filtering.
  202. kwargs: The same kwargs for the `dataclasses.field` function.
  203. """
  204. m = {"detail": detail, "filterable": filterable, "format_fn": format_fn}
  205. # Default for detail field is None since it could be missing.
  206. if detail and "default" not in kwargs:
  207. kwargs["default"] = None
  208. if "metadata" in kwargs:
  209. # Metadata explicitly specified, so add detail and filterable if missing.
  210. kwargs["metadata"].update(m)
  211. else:
  212. # Metadata not explicitly specified, so add it.
  213. kwargs["metadata"] = m
  214. return field(**kwargs)
  215. class StateSchema(ABC):
  216. """Schema class for Ray resource abstraction.
  217. The child class must be dataclass. All child classes
  218. - perform runtime type checking upon initialization.
  219. - are supposed to use `state_column` instead of `field`.
  220. It will allow the class to return filterable/detail columns.
  221. If `state_column` is not specified, that column is not filterable
  222. and for non-detail output.
  223. For example,
  224. ```
  225. @dataclass
  226. class State(StateSchema):
  227. column_a: str
  228. column_b: int = state_column(detail=True, filterable=True)
  229. s = State(column_a="abc", b=1)
  230. # Returns {"column_b"}
  231. s.filterable_columns()
  232. # Returns {"column_a"}
  233. s.base_columns()
  234. # Returns {"column_a", "column_b"}
  235. s.columns()
  236. ```
  237. In addition, the schema also provides a humanify abstract method to
  238. convert the state object into something human readable, ready for printing.
  239. Subclasses should override this method, providing logic to convert its own fields
  240. to something human readable, packaged and returned in a dict.
  241. Each field that wants to be humanified should include a 'format_fn' key in its
  242. metadata dictionary.
  243. """
  244. @classmethod
  245. def humanify(cls, state: dict) -> dict:
  246. """Convert the given state object into something human readable."""
  247. for f in fields(cls):
  248. if (
  249. f.metadata.get("format_fn") is not None
  250. and f.name in state
  251. and state[f.name] is not None
  252. ):
  253. try:
  254. state[f.name] = f.metadata["format_fn"](state[f.name])
  255. except Exception as e:
  256. logger.error(f"Failed to format {f.name}:{state[f.name]} with {e}")
  257. return state
  258. @classmethod
  259. def list_columns(cls, detail: bool = True) -> List[str]:
  260. """Return a list of columns."""
  261. cols = []
  262. for f in fields(cls):
  263. if detail:
  264. cols.append(f.name)
  265. elif not f.metadata.get("detail", False):
  266. cols.append(f.name)
  267. return cols
  268. @classmethod
  269. def columns(cls) -> Set[str]:
  270. """Return a set of all columns."""
  271. return set(cls.list_columns(detail=True))
  272. @classmethod
  273. def filterable_columns(cls) -> Set[str]:
  274. """Return a list of filterable columns"""
  275. filterable = set()
  276. for f in fields(cls):
  277. if f.metadata.get("filterable", False):
  278. filterable.add(f.name)
  279. return filterable
  280. @classmethod
  281. def base_columns(cls) -> Set[str]:
  282. """Return a list of base columns.
  283. Base columns mean columns to return when detail == False.
  284. """
  285. return set(cls.list_columns(detail=False))
  286. @classmethod
  287. def detail_columns(cls) -> Set[str]:
  288. """Return a list of detail columns.
  289. Detail columns mean columns to return when detail == True.
  290. """
  291. return set(cls.list_columns(detail=True))
  292. def asdict(self):
  293. return asdict(self)
  294. # Allow dict like access on the class directly for backward compatibility.
  295. def __getitem__(self, key):
  296. return getattr(self, key)
  297. def __setitem__(self, key, value):
  298. setattr(self, key, value)
  299. def get(self, key, default=None):
  300. return getattr(self, key, default)
  301. def filter_fields(data: dict, state_dataclass: StateSchema, detail: bool) -> dict:
  302. """Filter the given data's columns based on the given schema.
  303. Args:
  304. data: A single data entry to filter columns.
  305. state_dataclass: The schema to filter data.
  306. detail: Whether or not it should include columns for detail output.
  307. """
  308. filtered_data = {}
  309. columns = state_dataclass.columns() if detail else state_dataclass.base_columns()
  310. for col in columns:
  311. if col in data:
  312. filtered_data[col] = data[col]
  313. else:
  314. filtered_data[col] = None
  315. return filtered_data
  316. @dataclass(init=not IS_PYDANTIC_2)
  317. class GetLogOptions:
  318. timeout: int
  319. node_id: Optional[str] = None
  320. node_ip: Optional[str] = None
  321. # One of {file, stream}. File means it will return the whole log.
  322. # stream means it will keep the connection and streaming the log.
  323. media_type: str = "file"
  324. # The filename to match when finding the log to download from the Ray log directory.
  325. # NOTE: This can be a nested path relative to the Ray log directory.
  326. filename: Optional[str] = None
  327. # The filename to download the log as on the client side.
  328. # If not provided, the filename will be "file.txt".
  329. download_filename: str = DEFAULT_DOWNLOAD_FILENAME
  330. # The actor id of the log. It is used only for worker logs.
  331. actor_id: Optional[str] = None
  332. # The task id of the log.
  333. task_id: Optional[str] = None
  334. # The attempt number of the task.
  335. attempt_number: int = 0
  336. # The pid of the log. It is used only for worker logs.
  337. pid: Optional[int] = None
  338. # Total log lines to return.
  339. lines: int = 1000
  340. # The interval where new logs are streamed to.
  341. # Should be used only when media_type == stream.
  342. interval: Optional[float] = None
  343. # The suffix of the log file if file resolution not through filename directly.
  344. # Default to "out".
  345. suffix: str = "out"
  346. # The job submission id for submission job. This doesn't work for driver job
  347. # since Ray doesn't log driver logs to file in the ray logs directory.
  348. submission_id: Optional[str] = None
  349. def __post_init__(self):
  350. if self.pid:
  351. self.pid = int(self.pid)
  352. if self.interval:
  353. self.interval = float(self.interval)
  354. self.lines = int(self.lines)
  355. if self.media_type == "file":
  356. assert self.interval is None
  357. if self.media_type not in ["file", "stream"]:
  358. raise ValueError(f"Invalid media type: {self.media_type}")
  359. if not (self.node_id or self.node_ip) and not (self.actor_id or self.task_id):
  360. raise ValueError(
  361. "node_id or node_ip must be provided as constructor arguments when no "
  362. "actor or task_id is supplied as arguments."
  363. )
  364. if self.node_id and self.node_ip:
  365. raise ValueError(
  366. "Both node_id and node_ip are given. Only one of them can be provided. "
  367. f"Given node id: {self.node_id}, given node ip: {self.node_ip}"
  368. )
  369. if not (
  370. self.actor_id
  371. or self.task_id
  372. or self.pid
  373. or self.filename
  374. or self.submission_id
  375. ):
  376. raise ValueError(
  377. "None of actor_id, task_id, pid, submission_id or filename "
  378. "is provided. At least one of them is required to fetch logs."
  379. )
  380. if self.suffix not in ["out", "err"]:
  381. raise ValueError(
  382. f"Invalid suffix: {self.suffix}. Must be one of 'out' or 'err'."
  383. )
# See the ActorTableData message in gcs.proto for all potential options that
# can be included in this class.
# Columns declared with detail=True get a default of None from `state_column`
# and are only part of the detail output.
@dataclass(init=not IS_PYDANTIC_2)
class ActorState(StateSchema):
    """Actor State"""

    #: The id of the actor.
    actor_id: str = state_column(filterable=True)
    #: The class name of the actor.
    class_name: str = state_column(filterable=True)
    #: The state of the actor.
    #:
    #: - DEPENDENCIES_UNREADY: Actor is waiting for dependency to be ready.
    #:   E.g., a new actor is waiting for object ref that's created from
    #:   other remote task.
    #: - PENDING_CREATION: Actor's dependency is ready, but it is not created yet.
    #:   It could be because there are not enough resources, too many actor
    #:   entries in the scheduler queue, or the actor creation is slow
    #:   (e.g., slow runtime environment creation,
    #:   slow worker startup, or etc.).
    #: - ALIVE: The actor is created, and it is alive.
    #: - RESTARTING: The actor is dead, and it is restarting.
    #:   It is equivalent to `PENDING_CREATION`,
    #:   but means the actor was dead more than once.
    #: - DEAD: The actor is permanently dead.
    state: TypeActorStatus = state_column(filterable=True)
    #: The job id of this actor.
    job_id: str = state_column(filterable=True)
    #: The name of the actor given by the `name` argument.
    name: Optional[str] = state_column(filterable=True)
    #: The node id of this actor.
    #: If the actor is restarting, it could be the node id
    #: of the dead actor (and it will be re-updated when
    #: the actor is successfully restarted).
    node_id: Optional[str] = state_column(filterable=True)
    #: The pid of the actor. 0 if it is not created yet.
    pid: Optional[int] = state_column(filterable=True)
    #: The namespace of the actor.
    ray_namespace: Optional[str] = state_column(filterable=True)
    #: The runtime environment information of the actor.
    serialized_runtime_env: Optional[str] = state_column(filterable=False, detail=True)
    #: The resource requirement of the actor.
    required_resources: Optional[dict] = state_column(filterable=False, detail=True)
    #: Actor's death information in detail. None if the actor is not dead yet.
    death_cause: Optional[dict] = state_column(filterable=False, detail=True)
    #: True if the actor is detached. False otherwise.
    is_detached: Optional[bool] = state_column(filterable=False, detail=True)
    #: The placement group id that's associated with this actor.
    placement_group_id: Optional[str] = state_column(detail=True, filterable=True)
    #: Actor's repr name if a customized __repr__ method exists, else empty string.
    repr_name: Optional[str] = state_column(detail=True, filterable=True)
    #: Number of restarts that has been tried on this actor.
    num_restarts: int = state_column(filterable=False, detail=True)
    #: Number of times this actor is restarted due to lineage reconstructions.
    num_restarts_due_to_lineage_reconstruction: int = state_column(
        filterable=False, detail=True
    )
    #: Number of times this actor is restarted due to node preemption.
    num_restarts_due_to_node_preemption: int = state_column(
        filterable=False, detail=True
    )
    #: The call site of the actor creation.
    call_site: Optional[str] = state_column(detail=True, filterable=False)
    #: The label selector for the actor.
    label_selector: Optional[dict] = state_column(detail=True, filterable=False)
# Columns declared with detail=True get a default of None from `state_column`
# and are only part of the detail output.
@dataclass(init=not IS_PYDANTIC_2)
class PlacementGroupState(StateSchema):
    """PlacementGroup State"""

    #: The id of the placement group.
    placement_group_id: str = state_column(filterable=True)
    #: The name of the placement group if it is given by the name argument.
    name: str = state_column(filterable=True)
    #: The job id of the placement group.
    creator_job_id: str = state_column(filterable=True)
    #: The state of the placement group.
    #:
    #: - PENDING: The placement group creation is pending scheduling.
    #:   It could be because there's not enough resources, some of creation
    #:   stage has failed (e.g., failed to commit placement groups because
    #:   the node is dead).
    #: - CREATED: The placement group is created.
    #: - REMOVED: The placement group is removed.
    #: - RESCHEDULING: The placement group is rescheduling because some of
    #:   bundles are dead because they were on dead nodes.
    state: TypePlacementGroupStatus = state_column(filterable=True)
    #: The bundle specification of the placement group.
    bundles: Optional[List[dict]] = state_column(filterable=False, detail=True)
    #: True if the placement group is detached. False otherwise.
    is_detached: Optional[bool] = state_column(filterable=True, detail=True)
    #: The scheduling stats of the placement group.
    stats: Optional[dict] = state_column(filterable=False, detail=True)
# Columns with a `format_fn` are rewritten by `StateSchema.humanify`:
# resources through Humanify.node_resources, timestamps through
# Humanify.timestamp.
@dataclass(init=not IS_PYDANTIC_2)
class NodeState(StateSchema):
    """Node State"""

    #: The id of the node.
    node_id: str = state_column(filterable=True)
    #: The ip address of the node.
    node_ip: str = state_column(filterable=True)
    #: If this is a head node.
    is_head_node: bool = state_column(filterable=True)
    #: The state of the node.
    #:
    #: ALIVE: The node is alive.
    #: DEAD: The node is dead.
    state: TypeNodeStatus = state_column(filterable=True)
    #: The state message of the node.
    #: This provides more detailed information about the node's state.
    state_message: Optional[str] = state_column(filterable=False)
    #: The name of the node if it is given by the name argument.
    node_name: str = state_column(filterable=True)
    #: The total resources of the node.
    resources_total: dict = state_column(
        filterable=False, format_fn=Humanify.node_resources
    )
    #: The labels of the node.
    labels: dict = state_column(filterable=False)
    #: The time when the node (raylet) starts.
    start_time_ms: Optional[int] = state_column(
        filterable=False, detail=True, format_fn=Humanify.timestamp
    )
    #: The time when the node exits. The timestamp could be delayed
    #: if the node is dead unexpectedly (could be delayed
    #: up to 30 seconds).
    end_time_ms: Optional[int] = state_column(
        filterable=False, detail=True, format_fn=Humanify.timestamp
    )
  509. # NOTE: Declaring this as dataclass would make __init__ not being called properly.
  510. # NOTE: `JobDetails` will be `None` in the minimal install because Pydantic is not
  511. # installed. Inheriting from `None` raises an exception.
  512. class JobState(StateSchema, JobDetails if JobDetails is not None else object):
  513. """The state of the job that's submitted by Ray's Job APIs or driver jobs"""
  514. def __init__(self, **kwargs):
  515. JobDetails.__init__(self, **kwargs)
  516. @classmethod
  517. def filterable_columns(cls) -> Set[str]:
  518. # We are not doing any filtering since filtering is currently done
  519. # at the backend.
  520. return {"job_id", "type", "status", "submission_id"}
  521. @classmethod
  522. def humanify(cls, state: dict) -> dict:
  523. return state
  524. @classmethod
  525. def list_columns(cls, detail: bool = True) -> List[str]:
  526. if not detail:
  527. return [
  528. "job_id",
  529. "submission_id",
  530. "entrypoint",
  531. "type",
  532. "status",
  533. "message",
  534. "error_type",
  535. "driver_info",
  536. ]
  537. if JobDetails is None:
  538. # We don't have pydantic in the dashboard. This is because
  539. # we call this method at module import time, so we need to
  540. # check if the class is a pydantic model.
  541. return []
  542. # TODO(aguo): Once we only support pydantic 2, we can remove this if check.
  543. # In pydantic 2.0, `__fields__` has been renamed to `model_fields`.
  544. return (
  545. list(JobDetails.model_fields.keys())
  546. if hasattr(JobDetails, "model_fields")
  547. else list(JobDetails.__fields__.keys())
  548. )
  549. def asdict(self):
  550. return JobDetails.dict(self)
  551. @classmethod
  552. def schema_dict(cls) -> Dict[str, Any]:
  553. schema_types = cls.schema()["properties"]
  554. # Get type name to actual type mapping.
  555. return {
  556. k: v["type"] for k, v in schema_types.items() if v.get("type") is not None
  557. }
@dataclass(init=not IS_PYDANTIC_2)
class WorkerState(StateSchema):
    """Worker State"""

    #: The id of the worker.
    worker_id: str = state_column(filterable=True)
    #: Whether or not the worker is alive.
    is_alive: bool = state_column(filterable=True)
    #: The type of the worker.
    #:
    #: - WORKER: The regular Ray worker process that executes tasks or
    #:   instantiates an actor.
    #: - DRIVER: The driver (Python script that calls `ray.init`).
    #: - SPILL_WORKER: The worker that spills objects.
    #: - RESTORE_WORKER: The worker that restores objects.
    worker_type: TypeWorkerType = state_column(filterable=True)
    #: The exit type of the worker if the worker is dead.
    #:
    #: - SYSTEM_ERROR: Worker exit due to system level failures (i.e. worker crash).
    #: - INTENDED_SYSTEM_EXIT: System-level exit that is intended. E.g.,
    #:   Workers are killed because they are idle for a long time.
    #: - USER_ERROR: Worker exits because of user error.
    #:   E.g., exceptions from the actor initialization.
    #: - INTENDED_USER_EXIT: Intended exit from users (e.g., users exit
    #:   workers with exit code 0 or exit initiated by Ray API such as ray.kill).
    exit_type: Optional[TypeWorkerExitType] = state_column(filterable=True)
    #: The node id of the worker.
    node_id: str = state_column(filterable=True)
    #: The ip address of the worker.
    ip: str = state_column(filterable=True)
    #: The pid of the worker.
    pid: int = state_column(filterable=True)
    #: The exit detail of the worker if the worker is dead.
    exit_detail: Optional[str] = state_column(detail=True, filterable=False)
    #: The time the worker is first launched.
    #: -1 if the value doesn't exist (formatted as an empty string).
    #: The lifecycle of a worker is as follows.
    #: worker_launch_time_ms (process startup requested).
    #: -> worker_launched_time_ms (process started).
    #: -> start_time_ms (worker is ready to be used).
    #: -> end_time_ms (worker is destroyed).
    worker_launch_time_ms: Optional[int] = state_column(
        filterable=False,
        detail=True,
        format_fn=lambda x: "" if x == -1 else Humanify.timestamp(x),
    )
    #: The time the worker is successfully launched.
    #: -1 if the value doesn't exist (formatted as an empty string).
    worker_launched_time_ms: Optional[int] = state_column(
        filterable=False,
        detail=True,
        format_fn=lambda x: "" if x == -1 else Humanify.timestamp(x),
    )
    #: The time when the worker is started and initialized.
    #: 0 if the value doesn't exist.
    start_time_ms: Optional[int] = state_column(
        filterable=False, detail=True, format_fn=Humanify.timestamp
    )
    #: The time when the worker exits. The timestamp could be delayed
    #: if the worker is dead unexpectedly.
    #: 0 if the value doesn't exist.
    end_time_ms: Optional[int] = state_column(
        filterable=False, detail=True, format_fn=Humanify.timestamp
    )
    #: The debugger port of the worker.
    debugger_port: Optional[int] = state_column(filterable=True, detail=True)
    #: The number of threads paused in this worker.
    num_paused_threads: Optional[int] = state_column(filterable=True, detail=True)
@dataclass(init=not IS_PYDANTIC_2)
class ClusterEventState(StateSchema):
    """Cluster Event State"""

    # NOTE(review): field meanings below are inferred from their names only;
    # confirm against the event producer before relying on them.
    severity: str = state_column(filterable=True)
    time: str = state_column(filterable=False)
    source_type: str = state_column(filterable=True)
    message: str = state_column(filterable=False)
    event_id: str = state_column(filterable=True)
    # Only exposed as a detail column.
    custom_fields: Optional[dict] = state_column(filterable=False, detail=True)
@dataclass(init=not IS_PYDANTIC_2)
class TaskState(StateSchema):
    """Task State"""

    #: The id of the task.
    task_id: str = state_column(filterable=True)
    #: The attempt (retry) number of the task.
    attempt_number: int = state_column(filterable=True)
    #: The name of the task if it is given by the name argument.
    name: str = state_column(filterable=True)
    #: The state of the task.
    #:
    #: Refer to src/ray/protobuf/common.proto for a detailed explanation of the state
    #: breakdowns and typical state transition flow.
    #:
    state: TypeTaskStatus = state_column(filterable=True)
    #: The job id of this task.
    job_id: str = state_column(filterable=True)
    #: The actor id that's associated with this task.
    #: It is empty if there are no relevant actors.
    actor_id: Optional[str] = state_column(filterable=True)
    #: The type of the task.
    #:
    #: - NORMAL_TASK: Tasks created by `func.remote()`
    #: - ACTOR_CREATION_TASK: Actors created by `class.remote()`
    #: - ACTOR_TASK: Actor tasks submitted by `actor.method.remote()`
    #: - DRIVER_TASK: Driver (A script that calls `ray.init`).
    type: TypeTaskType = state_column(filterable=True)
    #: The name of the task. It is the name of the function
    #: if the type is a task or an actor task.
    #: It is the name of the class if it is an actor scheduling task.
    func_or_class_name: str = state_column(filterable=True)
    #: The parent task id. If the parent is a normal task, it will be the task's id.
    #: If the parent runs in a concurrent actor (async actor or threaded actor),
    #: it will be the actor's creation task id.
    parent_task_id: str = state_column(filterable=True)
    #: Id of the node that runs the task. If the task is retried, it could
    #: contain the node id of the previous executed task.
    #: If empty, it means the task hasn't been scheduled yet.
    node_id: Optional[str] = state_column(filterable=True)
    #: The worker id that's associated with this task.
    worker_id: Optional[str] = state_column(filterable=True)
    #: The worker's pid that's associated with this task.
    worker_pid: Optional[int] = state_column(filterable=True)
    #: Task error type.
    error_type: Optional[str] = state_column(filterable=True)
    #: The language of the task. E.g., Python, Java, or Cpp.
    language: Optional[str] = state_column(detail=True, filterable=True)
    #: The required resources to execute the task.
    required_resources: Optional[dict] = state_column(detail=True, filterable=False)
    #: The runtime environment information for the task.
    runtime_env_info: Optional[dict] = state_column(detail=True, filterable=False)
    #: The placement group id that's associated with this task.
    placement_group_id: Optional[str] = state_column(detail=True, filterable=True)
    #: The list of events of the given task.
    #: Refer to src/ray/protobuf/common.proto for a detailed explanation of the state
    #: breakdowns and typical state transition flow.
    events: Optional[List[dict]] = state_column(
        detail=True, filterable=False, format_fn=Humanify.events
    )
    #: The list of profile events of the given task.
    profiling_data: Optional[dict] = state_column(detail=True, filterable=False)
    #: The time when the task is created. A Unix timestamp in ms.
    creation_time_ms: Optional[int] = state_column(
        detail=True,
        filterable=False,
        format_fn=Humanify.timestamp,
    )
    #: The time when the task starts to run. A Unix timestamp in ms.
    start_time_ms: Optional[int] = state_column(
        detail=True,
        filterable=False,
        format_fn=Humanify.timestamp,
    )
    #: The time when the task is finished or failed. A Unix timestamp in ms.
    end_time_ms: Optional[int] = state_column(
        detail=True, filterable=False, format_fn=Humanify.timestamp
    )
    #: The task logs info, e.g. offset into the worker log file when the task
    #: starts/finishes.
    #: None if the task is from a concurrent actor (e.g. async actor or threaded actor)
    task_log_info: Optional[dict] = state_column(detail=True, filterable=False)
    #: Task error detail info.
    error_message: Optional[str] = state_column(detail=True, filterable=False)
    #: Whether the task is paused by the debugger.
    is_debugger_paused: Optional[bool] = state_column(detail=True, filterable=True)
    #: The call site of the task.
    call_site: Optional[str] = state_column(detail=True, filterable=False)
    #: The label selector for the task.
    label_selector: Optional[dict] = state_column(detail=True, filterable=False)
@dataclass(init=not IS_PYDANTIC_2)
class ObjectState(StateSchema):
    """Object State"""

    #: The id of the object.
    object_id: str = state_column(filterable=True)
    #: The size of the object in bytes (rendered through `Humanify.memory`).
    #: The object summary code in this file treats -1 as "size unknown".
    object_size: int = state_column(filterable=True, format_fn=Humanify.memory)
    #: The status of the task that creates the object.
    #:
    #: - NIL: We don't have a status for this task because we are not the owner or the
    #:   task metadata has already been deleted.
    #: - WAITING_FOR_DEPENDENCIES: The task is waiting for its dependencies
    #:   to be created.
    #: - SCHEDULED: All dependencies have been created and the task is
    #:   scheduled to execute.
    #:   It could be because the task is waiting for resources,
    #:   runtime environment creation, fetching dependencies to the
    #:   local node, etc.
    #: - FINISHED: The task finished successfully.
    #: - WAITING_FOR_EXECUTION: The task is scheduled properly and
    #:   waiting for execution. It includes time to deliver the task
    #:   to the remote worker + queueing time from the execution side.
    #: - RUNNING: The task that is running.
    task_status: TypeTaskStatus = state_column(filterable=True)
    #: The number of times the task has been executed (including the current execution)
    attempt_number: int = state_column(filterable=True)
    #: The reference type of the object.
    #: See :ref:`Debugging with Ray Memory <debug-with-ray-memory>` for more details.
    #:
    #: - ACTOR_HANDLE: The reference is an actor handle.
    #: - PINNED_IN_MEMORY: The object is pinned in memory, meaning there's
    #:   in-flight `ray.get` on this reference.
    #: - LOCAL_REFERENCE: There's a local reference (e.g., Python reference)
    #:   to this object reference. The object won't be GC'ed until all of them are gone.
    #: - USED_BY_PENDING_TASK: The object reference is passed to other tasks. E.g.,
    #:   `a = ray.put()` -> `task.remote(a)`. In this case, a is used by a
    #:   pending task `task`.
    #: - CAPTURED_IN_OBJECT: The object is serialized by other objects. E.g.,
    #:   `a = ray.put(1)` -> `b = ray.put([a])`. a is serialized within a list.
    #: - UNKNOWN_STATUS: The object ref status is unknown.
    reference_type: TypeReferenceType = state_column(filterable=True)
    #: The callsite of the object.
    call_site: str = state_column(filterable=True)
    #: The worker type that creates the object.
    #:
    #: - WORKER: The regular Ray worker process that executes tasks or
    #:   instantiates an actor.
    #: - DRIVER: The driver (Python script that calls `ray.init`).
    #: - SPILL_WORKER: The worker that spills objects.
    #: - RESTORE_WORKER: The worker that restores objects.
    type: TypeWorkerType = state_column(filterable=True)
    #: The pid of the owner.
    pid: int = state_column(filterable=True)
    #: The ip address of the owner.
    ip: str = state_column(filterable=True)
@dataclass(init=not IS_PYDANTIC_2)
class RuntimeEnvState(StateSchema):
    """Runtime Environment State"""

    #: The runtime environment spec.
    runtime_env: dict = state_column(filterable=True)
    #: Whether or not the runtime env creation has succeeded.
    success: bool = state_column(filterable=True)
    #: The latency of creating the runtime environment.
    #: Available if the runtime env is successfully created.
    # NOTE(review): described as a latency but formatted with
    # Humanify.timestamp -- confirm which of the two is intended.
    creation_time_ms: Optional[float] = state_column(
        filterable=False, format_fn=Humanify.timestamp
    )
    #: The node id of this runtime environment.
    node_id: str = state_column(filterable=True)
    #: The number of actors and tasks that use this runtime environment.
    ref_cnt: Optional[int] = state_column(detail=True, filterable=False)
    #: The error message if the runtime environment creation has failed.
    #: Available if the runtime env failed to be created.
    error: Optional[str] = state_column(detail=True, filterable=True)
# State schema classes whose docstrings are extended below with their
# column listings.
AVAILABLE_STATES = [
    ActorState,
    PlacementGroupState,
    NodeState,
    WorkerState,
    JobState,
    TaskState,
    ObjectState,
    RuntimeEnvState,
]
# Runs at import time: append the filterable and detail-only column names to
# each state class's docstring. The f-string bodies are part of the resulting
# docstring text, so their layout must not be re-indented.
for state in AVAILABLE_STATES:
    if len(state.filterable_columns()) > 0:
        filterable_cols = "\n\n ".join(state.filterable_columns())
        state.__doc__ += f"""
\nBelow columns can be used for the `--filter` option.
\n
{filterable_cols}
\n
"""
    if len(state.detail_columns()) > 0:
        detail_cols = "\n\n ".join(state.detail_columns())
        state.__doc__ += f"""
\nBelow columns are available only when `get` API is used,
\n`--detail` is specified through CLI, or `detail=True` is given to Python APIs.
\n
\n
{detail_cols}
\n
"""
@dataclass(init=not IS_PYDANTIC_2)
class ListApiResponse:
    # NOTE(rickyyx): We currently perform hard truncation when querying
    # resources which could have a large number (e.g. asking raylets for
    # the number of all objects).
    # The number of resources seen by the user goes through the
    # funnel below:
    # - total
    #   | With truncation at the data source if the number of returned
    #   | resource exceeds `RAY_MAX_LIMIT_FROM_DATA_SOURCE`
    #   v
    # - num_after_truncation
    #   | With filtering at the state API server
    #   v
    # - num_filtered
    #   | With limiting,
    #   | set by min(`RAY_MAX_LIMIT_FROM_API_SERVER`, <user-supplied limit>)
    #   v
    # - len(result)

    # Total number of the available resource from the cluster.
    total: int
    # Number of resources returned by data sources after truncation
    num_after_truncation: int
    # Number of resources after filtering
    num_filtered: int
    # Returned data. None if no data is returned.
    result: List[Dict]
    # List API can have a partial failure if queries to
    # all sources fail. For example, getting object states
    # requires pinging all raylets, and it is possible some of
    # them fail. Note that it is impossible to guarantee high
    # availability of data because ray's state information is
    # not replicated.
    partial_failure_warning: Optional[str] = ""
    # A list of warnings to print.
    warnings: Optional[List[str]] = None
  861. """
  862. Summary API schema
  863. """
# Prefix of parent task ids that mark a task as top-level: when summarizing by
# lineage, a task or actor whose parent_task_id starts with this prefix is
# placed at the root of the summary instead of under a parent group.
DRIVER_TASK_ID_PREFIX = "ffffffffffffffffffffffffffffffffffffffff"
@dataclass(init=not IS_PYDANTIC_2)
class TaskSummaryPerFuncOrClassName:
    """Per-state task counts for a single function or class name."""

    #: The function or class name of this task.
    func_or_class_name: str
    #: The type of the class. Equivalent to protobuf TaskType.
    type: str
    #: State name to the count dict. State name is equivalent to
    #: the protobuf TaskStatus.
    state_counts: Dict[TypeTaskStatus, int] = field(default_factory=dict)
@dataclass
class Link:
    """Reference to an entity, identified by its type and id.

    Within this file it is constructed with type "task" or "actor".
    """

    #: The type of entity to link to
    type: str
    #: The id of the entity to link to
    id: str
@dataclass(init=not IS_PYDANTIC_2)
class NestedTaskSummary:
    """A node in the tree of task groups produced when summarizing by lineage."""

    #: The name of this task group
    name: str
    #: A unique identifier for this group
    key: str
    #: The type of the class. Equivalent to protobuf TaskType,
    #: "ACTOR" if it represents an Actor, or "GROUP" if it's a grouping of tasks.
    type: str
    #: Unix timestamp to use to sort the task group.
    timestamp: Optional[int] = None
    #: State name to the count dict. State name is equivalent to
    #: the protobuf TaskStatus.
    state_counts: Dict[TypeTaskStatus, int] = field(default_factory=dict)
    #: The child task groups nested under this group.
    children: List["NestedTaskSummary"] = field(default_factory=list)
    #: A link to more details about this summary.
    link: Optional[Link] = None
  898. @dataclass
  899. class TaskSummaries:
  900. #: Group key -> summary.
  901. #: Right now, we only have func_class_name as a key.
  902. # TODO(sang): Support the task group abstraction.
  903. summary: Union[Dict[str, TaskSummaryPerFuncOrClassName], List[NestedTaskSummary]]
  904. #: Total Ray tasks.
  905. total_tasks: int
  906. #: Total actor tasks.
  907. total_actor_tasks: int
  908. #: Total scheduled actors.
  909. total_actor_scheduled: int
  910. summary_by: str = "func_name"
  911. @classmethod
  912. def to_summary_by_func_name(cls, *, tasks: List[Dict]) -> "TaskSummaries":
  913. # NOTE: The argument tasks contains a list of dictionary
  914. # that have the same k/v as TaskState.
  915. summary = {}
  916. total_tasks = 0
  917. total_actor_tasks = 0
  918. total_actor_scheduled = 0
  919. for task in tasks:
  920. key = task["func_or_class_name"]
  921. if key not in summary:
  922. summary[key] = TaskSummaryPerFuncOrClassName(
  923. func_or_class_name=task["func_or_class_name"],
  924. type=task["type"],
  925. )
  926. task_summary = summary[key]
  927. state = task["state"]
  928. if state not in task_summary.state_counts:
  929. task_summary.state_counts[state] = 0
  930. task_summary.state_counts[state] += 1
  931. type_enum = TaskType.DESCRIPTOR.values_by_name[task["type"]].number
  932. if type_enum == TaskType.NORMAL_TASK:
  933. total_tasks += 1
  934. elif type_enum == TaskType.ACTOR_CREATION_TASK:
  935. total_actor_scheduled += 1
  936. elif type_enum == TaskType.ACTOR_TASK:
  937. total_actor_tasks += 1
  938. return TaskSummaries(
  939. summary=summary,
  940. total_tasks=total_tasks,
  941. total_actor_tasks=total_actor_tasks,
  942. total_actor_scheduled=total_actor_scheduled,
  943. summary_by="func_name",
  944. )
  945. @classmethod
  946. def to_summary_by_lineage(
  947. cls, *, tasks: List[Dict], actors: List[Dict]
  948. ) -> "TaskSummaries":
  949. """
  950. This summarizes tasks by lineage.
  951. i.e. A task will be grouped with another task if they have the
  952. same parent.
  953. This does things in 4 steps.
  954. Step 1: Iterate through all tasks and keep track of them by id and ownership
  955. Step 2: Put the tasks in a tree structure based on ownership
  956. Step 3: Merge together siblings in the tree if there are more
  957. than one with the same name.
  958. Step 4: Sort by running and then errored and then successful tasks
  959. Step 5: Total the children
  960. This can probably be more efficient if we merge together some steps to
  961. reduce the amount of iterations but this algorithm produces very easy to
  962. understand code. We can optimize in the future.
  963. """
  964. # NOTE: The argument tasks contains a list of dictionary
  965. # that have the same k/v as TaskState.
  966. tasks_by_id = {}
  967. task_group_by_id = {}
  968. actor_creation_task_id_for_actor_id = {}
  969. summary = []
  970. total_tasks = 0
  971. total_actor_tasks = 0
  972. total_actor_scheduled = 0
  973. # Step 1
  974. # We cannot assume that a parent task always comes before the child task
  975. # So we need to keep track of all tasks by ids so we can quickly find the
  976. # parent.
  977. # We also track the actor creation tasks so we can quickly figure out the
  978. # ownership of actors.
  979. for task in tasks:
  980. tasks_by_id[task["task_id"]] = task
  981. type_enum = TaskType.DESCRIPTOR.values_by_name[task["type"]].number
  982. if type_enum == TaskType.ACTOR_CREATION_TASK:
  983. actor_creation_task_id_for_actor_id[task["actor_id"]] = task["task_id"]
  984. actor_dict = {actor["actor_id"]: actor for actor in actors}
  985. def get_or_create_task_group(task_id: str) -> Optional[NestedTaskSummary]:
  986. """
  987. Gets an already created task_group
  988. OR
  989. Creates a task group and puts it in the right place under its parent.
  990. For actor tasks, the parent is the Actor that owns it. For all other
  991. tasks, the owner is the driver or task that created it.
  992. Returns None if there is missing data about the task or one of its parents.
  993. For task groups that represents actors, the id is in the
  994. format actor:{actor_id}
  995. """
  996. if task_id in task_group_by_id:
  997. return task_group_by_id[task_id]
  998. task = tasks_by_id.get(task_id)
  999. if not task:
  1000. logger.debug(f"We're missing data about {task_id}")
  1001. # We're missing data about this parent. So we're dropping the whole
  1002. # tree at that node.
  1003. return None
  1004. # Use name first which allows users to customize the name of
  1005. # their remote function call using the name option.
  1006. func_name = task["name"] or task["func_or_class_name"]
  1007. task_id = task["task_id"]
  1008. type_enum = TaskType.DESCRIPTOR.values_by_name[task["type"]].number
  1009. task_group_by_id[task_id] = NestedTaskSummary(
  1010. name=func_name,
  1011. key=task_id,
  1012. type=task["type"],
  1013. timestamp=task["creation_time_ms"],
  1014. link=Link(type="task", id=task_id),
  1015. )
  1016. # Set summary in right place under parent
  1017. if (
  1018. type_enum == TaskType.ACTOR_TASK
  1019. or type_enum == TaskType.ACTOR_CREATION_TASK
  1020. ):
  1021. # For actor tasks, the parent is the actor and not the parent task.
  1022. parent_task_group = get_or_create_actor_task_group(task["actor_id"])
  1023. if parent_task_group:
  1024. parent_task_group.children.append(task_group_by_id[task_id])
  1025. else:
  1026. parent_task_id = task["parent_task_id"]
  1027. if not parent_task_id or parent_task_id.startswith(
  1028. DRIVER_TASK_ID_PREFIX
  1029. ):
  1030. summary.append(task_group_by_id[task_id])
  1031. else:
  1032. parent_task_group = get_or_create_task_group(parent_task_id)
  1033. if parent_task_group:
  1034. parent_task_group.children.append(task_group_by_id[task_id])
  1035. return task_group_by_id[task_id]
  1036. def get_or_create_actor_task_group(
  1037. actor_id: str,
  1038. ) -> Optional[NestedTaskSummary]:
  1039. """
  1040. Gets an existing task group that represents an actor.
  1041. OR
  1042. Creates a task group that represents an actor. The owner of the actor is
  1043. the parent of the creation_task that created that actor.
  1044. Returns None if there is missing data about the actor or one of its parents.
  1045. """
  1046. key = f"actor:{actor_id}"
  1047. actor = actor_dict.get(actor_id)
  1048. if key not in task_group_by_id:
  1049. creation_task_id = actor_creation_task_id_for_actor_id.get(actor_id)
  1050. creation_task = tasks_by_id.get(creation_task_id)
  1051. if not creation_task:
  1052. logger.debug(f"We're missing data about actor {actor_id}")
  1053. # We're missing data about the parent. So we're dropping the whole
  1054. # tree at that node.
  1055. return None
  1056. # TODO(rickyx)
  1057. # We are using repr name for grouping actors if exists,
  1058. # else use class name. We should be using some group_name in the future.
  1059. if actor is None:
  1060. logger.debug(
  1061. f"We are missing actor info for actor {actor_id}, "
  1062. f"even though creation task exists: {creation_task}"
  1063. )
  1064. [actor_name, *rest] = creation_task["func_or_class_name"].split(".")
  1065. else:
  1066. actor_name = (
  1067. actor["repr_name"]
  1068. if actor["repr_name"]
  1069. else actor["class_name"]
  1070. )
  1071. task_group_by_id[key] = NestedTaskSummary(
  1072. name=actor_name,
  1073. key=key,
  1074. type="ACTOR",
  1075. timestamp=task["creation_time_ms"],
  1076. link=Link(type="actor", id=actor_id),
  1077. )
  1078. parent_task_id = creation_task["parent_task_id"]
  1079. if not parent_task_id or parent_task_id.startswith(
  1080. DRIVER_TASK_ID_PREFIX
  1081. ):
  1082. summary.append(task_group_by_id[key])
  1083. else:
  1084. parent_task_group = get_or_create_task_group(parent_task_id)
  1085. if parent_task_group:
  1086. parent_task_group.children.append(task_group_by_id[key])
  1087. return task_group_by_id[key]
  1088. # Step 2: Create the tree structure based on ownership
  1089. for task in tasks:
  1090. task_id = task["task_id"]
  1091. task_group = get_or_create_task_group(task_id)
  1092. if not task_group:
  1093. # We are probably missing data about this task or one of its parents.
  1094. continue
  1095. state = task["state"]
  1096. if state not in task_group.state_counts:
  1097. task_group.state_counts[state] = 0
  1098. task_group.state_counts[state] += 1
  1099. type_enum = TaskType.DESCRIPTOR.values_by_name[task["type"]].number
  1100. if type_enum == TaskType.NORMAL_TASK:
  1101. total_tasks += 1
  1102. elif type_enum == TaskType.ACTOR_CREATION_TASK:
  1103. total_actor_scheduled += 1
  1104. elif type_enum == TaskType.ACTOR_TASK:
  1105. total_actor_tasks += 1
  1106. def merge_sibings_for_task_group(
  1107. siblings: List[NestedTaskSummary],
  1108. ) -> Tuple[List[NestedTaskSummary], Optional[int]]:
  1109. """
  1110. Merges task summaries with the same name into a group if there are more than
  1111. one child with that name.
  1112. Args:
  1113. siblings: A list of NestedTaskSummary's to merge together
  1114. Returns
  1115. Index 0: A list of NestedTaskSummary's which have been merged
  1116. Index 1: The smallest timestamp amongst the siblings
  1117. """
  1118. if not len(siblings):
  1119. return siblings, None
  1120. # Group by name
  1121. groups = {}
  1122. min_timestamp = None
  1123. for child in siblings:
  1124. child.children, child_min_timestamp = merge_sibings_for_task_group(
  1125. child.children
  1126. )
  1127. if child_min_timestamp and child_min_timestamp < (
  1128. child.timestamp or sys.maxsize
  1129. ):
  1130. child.timestamp = child_min_timestamp
  1131. if child.name not in groups:
  1132. groups[child.name] = NestedTaskSummary(
  1133. name=child.name,
  1134. key=child.name,
  1135. type="GROUP",
  1136. )
  1137. groups[child.name].children.append(child)
  1138. if child.timestamp and child.timestamp < (
  1139. groups[child.name].timestamp or sys.maxsize
  1140. ):
  1141. groups[child.name].timestamp = child.timestamp
  1142. if child.timestamp < (min_timestamp or sys.maxsize):
  1143. min_timestamp = child.timestamp
  1144. # Take the groups that have more than one children and return it.
  1145. # For groups with just one child, return the child itself instead of
  1146. # creating a group.
  1147. return [
  1148. group if len(group.children) > 1 else group.children[0]
  1149. for group in groups.values()
  1150. ], min_timestamp
  1151. # Step 3
  1152. summary, _ = merge_sibings_for_task_group(summary)
  1153. def get_running_tasks_count(task_group: NestedTaskSummary) -> int:
  1154. return (
  1155. task_group.state_counts.get("RUNNING", 0)
  1156. + task_group.state_counts.get("RUNNING_IN_RAY_GET", 0)
  1157. + task_group.state_counts.get("RUNNING_IN_RAY_WAIT", 0)
  1158. )
  1159. def get_pending_tasks_count(task_group: NestedTaskSummary) -> int:
  1160. return (
  1161. task_group.state_counts.get("PENDING_ARGS_AVAIL", 0)
  1162. + task_group.state_counts.get("PENDING_NODE_ASSIGNMENT", 0)
  1163. + task_group.state_counts.get("PENDING_OBJ_STORE_MEM_AVAIL", 0)
  1164. + task_group.state_counts.get("PENDING_ARGS_FETCH", 0)
  1165. )
  1166. def sort_task_groups(task_groups: List[NestedTaskSummary]) -> None:
  1167. # Sort by running tasks, pending tasks, failed tasks, timestamp,
  1168. # and actor_creation_task
  1169. # Put actor creation tasks above other tasks with the same timestamp
  1170. task_groups.sort(key=lambda x: 0 if x.type == "ACTOR_CREATION_TASK" else 1)
  1171. task_groups.sort(key=lambda x: x.timestamp or sys.maxsize)
  1172. task_groups.sort(
  1173. key=lambda x: x.state_counts.get("FAILED", 0), reverse=True
  1174. )
  1175. task_groups.sort(key=get_pending_tasks_count, reverse=True)
  1176. task_groups.sort(key=get_running_tasks_count, reverse=True)
  1177. def calc_total_for_task_group(
  1178. task_group: NestedTaskSummary,
  1179. ) -> NestedTaskSummary:
  1180. """
  1181. Calculates the total of a group as the sum of all children.
  1182. Sorts children by timestamp
  1183. """
  1184. if not len(task_group.children):
  1185. return task_group
  1186. for child in task_group.children:
  1187. totaled = calc_total_for_task_group(child)
  1188. for state, count in totaled.state_counts.items():
  1189. task_group.state_counts[state] = (
  1190. task_group.state_counts.get(state, 0) + count
  1191. )
  1192. sort_task_groups(task_group.children)
  1193. return task_group
  1194. # Step 4
  1195. summary = [calc_total_for_task_group(task_group) for task_group in summary]
  1196. sort_task_groups(summary)
  1197. return TaskSummaries(
  1198. summary=summary,
  1199. total_tasks=total_tasks,
  1200. total_actor_tasks=total_actor_tasks,
  1201. total_actor_scheduled=total_actor_scheduled,
  1202. summary_by="lineage",
  1203. )
@dataclass(init=not IS_PYDANTIC_2)
class ActorSummaryPerClass:
    """Per-state actor counts for a single actor class."""

    #: The class name of the actor.
    class_name: str
    #: State name to the count dict. State name is equivalent to
    #: the protobuf ActorState.
    state_counts: Dict[TypeActorStatus, int] = field(default_factory=dict)
  1211. @dataclass
  1212. class ActorSummaries:
  1213. #: Group key (actor class name) -> summary
  1214. summary: Dict[str, ActorSummaryPerClass]
  1215. #: Total number of actors
  1216. total_actors: int
  1217. summary_by: str = "class"
  1218. @classmethod
  1219. def to_summary(cls, *, actors: List[Dict]):
  1220. # NOTE: The argument tasks contains a list of dictionary
  1221. # that have the same k/v as ActorState.
  1222. summary = {}
  1223. total_actors = 0
  1224. for actor in actors:
  1225. key = actor["class_name"]
  1226. if key not in summary:
  1227. summary[key] = ActorSummaryPerClass(
  1228. class_name=actor["class_name"],
  1229. )
  1230. actor_summary = summary[key]
  1231. state = actor["state"]
  1232. if state not in actor_summary.state_counts:
  1233. actor_summary.state_counts[state] = 0
  1234. actor_summary.state_counts[state] += 1
  1235. total_actors += 1
  1236. return ActorSummaries(
  1237. summary=summary,
  1238. total_actors=total_actors,
  1239. )
@dataclass(init=not IS_PYDANTIC_2)
class ObjectSummaryPerKey:
    """Aggregated object statistics for a single group key."""

    #: Total number of objects of the type.
    total_objects: int
    #: Total size in mb.
    total_size_mb: float
    #: Total number of workers that reference the type of objects.
    total_num_workers: int
    #: Total number of nodes that reference the type of objects.
    total_num_nodes: int
    #: State name to the count dict. State name is equivalent to
    #: ObjectState.
    task_state_counts: Dict[TypeTaskStatus, int] = field(default_factory=dict)
    #: Attempt number to the count dict. The attempt number includes the current
    #: execution.
    task_attempt_number_counts: Dict[str, int] = field(default_factory=dict)
    #: Ref count type to the count dict. State name is equivalent to
    #: ObjectState.
    ref_type_counts: Dict[TypeReferenceType, int] = field(default_factory=dict)
  1259. @dataclass
  1260. class ObjectSummaries:
  1261. #: Group key (actor class name) -> summary
  1262. summary: Dict[str, ObjectSummaryPerKey]
  1263. #: Total number of referenced objects in the cluster.
  1264. total_objects: int
  1265. #: Total size of referenced objects in the cluster in MB.
  1266. total_size_mb: float
  1267. #: Whether or not the callsite collection is enabled.
  1268. callsite_enabled: bool
  1269. summary_by: str = "callsite"
  1270. @classmethod
  1271. def to_summary(cls, *, objects: List[Dict]):
  1272. # NOTE: The argument tasks contains a list of dictionary
  1273. # that have the same k/v as ObjectState.
  1274. summary = {}
  1275. total_objects = 0
  1276. total_size_mb = 0
  1277. key_to_workers = {}
  1278. key_to_nodes = {}
  1279. callsite_enabled = True
  1280. for object in objects:
  1281. key = object["call_site"]
  1282. if key == "disabled":
  1283. callsite_enabled = False
  1284. if key not in summary:
  1285. summary[key] = ObjectSummaryPerKey(
  1286. total_objects=0,
  1287. total_size_mb=0,
  1288. total_num_workers=0,
  1289. total_num_nodes=0,
  1290. )
  1291. key_to_workers[key] = set()
  1292. key_to_nodes[key] = set()
  1293. object_summary = summary[key]
  1294. task_state = object["task_status"]
  1295. if task_state not in object_summary.task_state_counts:
  1296. object_summary.task_state_counts[task_state] = 0
  1297. object_summary.task_state_counts[task_state] += 1
  1298. attempt_number = str(object["attempt_number"])
  1299. if attempt_number not in object_summary.task_attempt_number_counts:
  1300. object_summary.task_attempt_number_counts[attempt_number] = 0
  1301. object_summary.task_attempt_number_counts[attempt_number] += 1
  1302. ref_type = object["reference_type"]
  1303. if ref_type not in object_summary.ref_type_counts:
  1304. object_summary.ref_type_counts[ref_type] = 0
  1305. object_summary.ref_type_counts[ref_type] += 1
  1306. object_summary.total_objects += 1
  1307. total_objects += 1
  1308. size_bytes = object["object_size"]
  1309. # object_size's unit is byte by default. It is -1, if the size is
  1310. # unknown.
  1311. if size_bytes != -1:
  1312. object_summary.total_size_mb += size_bytes / 1024**2
  1313. total_size_mb += size_bytes / 1024**2
  1314. key_to_workers[key].add(object["pid"])
  1315. key_to_nodes[key].add(object["ip"])
  1316. # Convert set of pid & node ips to length.
  1317. for key, workers in key_to_workers.items():
  1318. summary[key].total_num_workers = len(workers)
  1319. for key, nodes in key_to_nodes.items():
  1320. summary[key].total_num_nodes = len(nodes)
  1321. return ObjectSummaries(
  1322. summary=summary,
  1323. total_objects=total_objects,
  1324. total_size_mb=total_size_mb,
  1325. callsite_enabled=callsite_enabled,
  1326. )
@dataclass(init=not IS_PYDANTIC_2)
class StateSummary:
    """Holds task, actor or object summaries keyed by node ID."""

    #: Node ID -> summary per node
    #: If the data is not required to be organized per node, it will contain
    #: a single key, "cluster".
    node_id_to_summary: Dict[str, Union[TaskSummaries, ActorSummaries, ObjectSummaries]]
@dataclass(init=not IS_PYDANTIC_2)
class SummaryApiResponse:
    """Response for a summary request: list-API counters plus the summary."""

    # Carried over from ListApiResponse.
    # We currently use the list API for listing the resources.
    total: int
    # Carried over from ListApiResponse.
    # Number of resources returned by data sources after truncation.
    num_after_truncation: int
    # Number of resources after filtering.
    num_filtered: int
    # The summary itself. NOTE(review): annotated as StateSummary but the
    # default is None.
    result: StateSummary = None
    # NOTE(review): presumably a message describing partial failures of the
    # underlying queries; defaults to an empty string — confirm with callers.
    partial_failure_warning: Optional[str] = ""
    # A list of warnings to print.
    warnings: Optional[List[str]] = None
  1347. def resource_to_schema(resource: StateResource) -> StateSchema:
  1348. if resource == StateResource.ACTORS:
  1349. return ActorState
  1350. elif resource == StateResource.JOBS:
  1351. return JobState
  1352. elif resource == StateResource.NODES:
  1353. return NodeState
  1354. elif resource == StateResource.OBJECTS:
  1355. return ObjectState
  1356. elif resource == StateResource.PLACEMENT_GROUPS:
  1357. return PlacementGroupState
  1358. elif resource == StateResource.RUNTIME_ENVS:
  1359. return RuntimeEnvState
  1360. elif resource == StateResource.TASKS:
  1361. return TaskState
  1362. elif resource == StateResource.WORKERS:
  1363. return WorkerState
  1364. elif resource == StateResource.CLUSTER_EVENTS:
  1365. return ClusterEventState
  1366. else:
  1367. assert False, "Unreachable"
  1368. def protobuf_message_to_dict(
  1369. message,
  1370. fields_to_decode: List[str],
  1371. preserving_proto_field_name: bool = True,
  1372. ) -> dict:
  1373. """Convert a protobuf message to dict
  1374. Args:
  1375. fields_to_decode: field names which will be decoded from binary to hex.
  1376. preserving_proto_field_name: a pass-through option for protobuf message
  1377. method. See google.protobuf MessageToDict
  1378. Return:
  1379. Dictionary of the converted rpc protobuf.
  1380. """
  1381. return dashboard_utils.message_to_dict(
  1382. message,
  1383. fields_to_decode,
  1384. always_print_fields_with_no_presence=True,
  1385. preserving_proto_field_name=preserving_proto_field_name,
  1386. )
  1387. def protobuf_to_task_state_dict(message: TaskEvents) -> dict:
  1388. """
  1389. Convert a TaskEvents to a dic repr of `TaskState`
  1390. """
  1391. task_attempt = protobuf_message_to_dict(
  1392. message=message,
  1393. fields_to_decode=[
  1394. "task_id",
  1395. "job_id",
  1396. "node_id",
  1397. "actor_id",
  1398. "parent_task_id",
  1399. "worker_id",
  1400. "placement_group_id",
  1401. "component_id",
  1402. ],
  1403. )
  1404. task_state = {}
  1405. task_info = task_attempt.get("task_info", {})
  1406. state_updates = task_attempt.get("state_updates", {})
  1407. profiling_data = task_attempt.get("profile_events", {})
  1408. if profiling_data:
  1409. for event in profiling_data["events"]:
  1410. # End/start times are recorded in ns. We convert them to ms.
  1411. event["end_time"] = int(event["end_time"]) / 1e6
  1412. event["start_time"] = int(event["start_time"]) / 1e6
  1413. event["extra_data"] = json.loads(event["extra_data"])
  1414. task_state["profiling_data"] = profiling_data
  1415. # Convert those settable fields
  1416. mappings = [
  1417. (
  1418. task_info,
  1419. [
  1420. "task_id",
  1421. "name",
  1422. "actor_id",
  1423. "type",
  1424. "func_or_class_name",
  1425. "language",
  1426. "required_resources",
  1427. "runtime_env_info",
  1428. "parent_task_id",
  1429. "placement_group_id",
  1430. "call_site",
  1431. "label_selector",
  1432. ],
  1433. ),
  1434. (task_attempt, ["task_id", "attempt_number", "job_id"]),
  1435. (
  1436. state_updates,
  1437. [
  1438. "node_id",
  1439. "worker_id",
  1440. "task_log_info",
  1441. "actor_repr_name",
  1442. "worker_pid",
  1443. "is_debugger_paused",
  1444. ],
  1445. ),
  1446. ]
  1447. for src, keys in mappings:
  1448. for key in keys:
  1449. task_state[key] = src.get(key)
  1450. task_state["creation_time_ms"] = None
  1451. task_state["start_time_ms"] = None
  1452. task_state["end_time_ms"] = None
  1453. events = []
  1454. if "state_ts_ns" in state_updates:
  1455. state_ts_ns = state_updates["state_ts_ns"]
  1456. for state_name, state in TaskStatus.items():
  1457. # state_ts_ns is Map[str, str] after protobuf MessageToDict
  1458. key = str(state)
  1459. if key in state_ts_ns:
  1460. # timestamp is recorded as nanosecond from the backend.
  1461. # We need to convert it to the second.
  1462. ts_ms = int(state_ts_ns[key]) // 1e6
  1463. events.append(
  1464. {
  1465. "state": state_name,
  1466. "created_ms": ts_ms,
  1467. }
  1468. )
  1469. if state == TaskStatus.PENDING_ARGS_AVAIL:
  1470. task_state["creation_time_ms"] = ts_ms
  1471. if state == TaskStatus.RUNNING:
  1472. task_state["start_time_ms"] = ts_ms
  1473. if state == TaskStatus.FINISHED or state == TaskStatus.FAILED:
  1474. task_state["end_time_ms"] = ts_ms
  1475. task_state["events"] = events
  1476. if len(events) > 0:
  1477. latest_state = events[-1]["state"]
  1478. else:
  1479. latest_state = "NIL"
  1480. task_state["state"] = latest_state
  1481. # Parse error info
  1482. if latest_state == "FAILED":
  1483. error_info = state_updates.get("error_info", None)
  1484. if error_info:
  1485. # We captured colored error message printed to console, e.g.
  1486. # "\x1b[31mTraceback (most recent call last):\x1b[0m",
  1487. # this is to remove the ANSI escape codes.
  1488. task_state["error_message"] = remove_ansi_escape_codes(
  1489. error_info.get("error_message", "")
  1490. )
  1491. task_state["error_type"] = error_info.get("error_type", "")
  1492. # Parse actor task name for actor with repr name.
  1493. if (
  1494. state_updates.get("actor_repr_name")
  1495. and task_state["type"] == "ACTOR_TASK"
  1496. and task_state["name"]
  1497. == task_state["func_or_class_name"] # no name option provided.
  1498. ):
  1499. # If it's an actor task with no name override, and has repr name defined
  1500. # for the actor, we override the name.
  1501. method_name = task_state["name"].split(".")[-1]
  1502. actor_repr_task_name = f"{state_updates['actor_repr_name']}.{method_name}"
  1503. task_state["name"] = actor_repr_task_name
  1504. return task_state
  1505. def remove_ansi_escape_codes(text: str) -> str:
  1506. """Remove ANSI escape codes from a string."""
  1507. import re
  1508. return re.sub(r"\x1b[^m]*m", "", text)
  1509. def dict_to_state(d: Dict, state_resource: StateResource) -> StateSchema:
  1510. """Convert a dict to a state schema.
  1511. Args:
  1512. d: a dict to convert.
  1513. state_resource: the state resource to convert to.
  1514. Returns:
  1515. A state schema.
  1516. """
  1517. try:
  1518. return resource_to_schema(state_resource)(**d)
  1519. except Exception as e:
  1520. raise RayStateApiException(f"Failed to convert {d} to StateSchema: {e}") from e