api.py 54 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470
  1. import logging
  2. import threading
  3. import urllib
  4. import warnings
  5. from contextlib import contextmanager
  6. from dataclasses import fields
  7. from typing import Any, Dict, Generator, List, Optional, Tuple, Union
  8. import requests
  9. import ray
  10. from ray._private.authentication.http_token_authentication import (
  11. get_auth_headers_if_auth_enabled,
  12. )
  13. from ray.dashboard.modules.dashboard_sdk import SubmissionClient
  14. from ray.dashboard.utils import (
  15. get_address_for_submission_client,
  16. ray_address_to_api_server_url,
  17. )
  18. from ray.util.annotations import DeveloperAPI
  19. from ray.util.state.common import (
  20. DEFAULT_LIMIT,
  21. DEFAULT_RPC_TIMEOUT,
  22. ActorState,
  23. ClusterEventState,
  24. GetApiOptions,
  25. GetLogOptions,
  26. JobState,
  27. ListApiOptions,
  28. NodeState,
  29. ObjectState,
  30. PlacementGroupState,
  31. PredicateType,
  32. RuntimeEnvState,
  33. StateResource,
  34. SummaryApiOptions,
  35. SummaryResource,
  36. SupportedFilterType,
  37. TaskState,
  38. WorkerState,
  39. dict_to_state,
  40. )
  41. from ray.util.state.exception import RayStateApiException, ServerUnavailable
  42. logger = logging.getLogger(__name__)
  43. _MAX_HTTP_RESPONSE_EXCEPTION_TEXT = 500
  44. @contextmanager
  45. def warnings_on_slow_request(
  46. *, address: str, endpoint: str, timeout: float, explain: bool
  47. ):
  48. """A context manager to print warnings if the request is replied slowly.
  49. Warnings are printed 3 times
  50. Args:
  51. address: The address of the endpoint.
  52. endpoint: The name of the endpoint.
  53. timeout: Request timeout in seconds.
  54. explain: Whether ot not it will print the warning.
  55. """
  56. # Do nothing if explain is not specified.
  57. if not explain:
  58. yield
  59. return
  60. # Prepare timers to print warning.
  61. # Print 3 times with exponential backoff. timeout / 2, timeout / 4, timeout / 8
  62. def print_warning(elapsed: float):
  63. logger.info(
  64. f"({round(elapsed, 2)} / {timeout} seconds) "
  65. "Waiting for the response from the API server "
  66. f"address {address}{endpoint}.",
  67. )
  68. warning_timers = [
  69. threading.Timer(timeout / i, print_warning, args=[timeout / i])
  70. for i in [2, 4, 8]
  71. ]
  72. try:
  73. for timer in warning_timers:
  74. timer.start()
  75. yield
  76. finally:
  77. # Make sure all timers are cancelled once request is terminated.
  78. for timer in warning_timers:
  79. timer.cancel()
  80. """
  81. This file contains API client and methods for querying ray state.
  82. Usage:
  83. 1. [Recommended] With StateApiClient:
  84. ```
  85. client = StateApiClient(address="auto")
  86. data = client.list(StateResource.NODES)
  87. ...
  88. ```
  89. 2. With SDK APIs:
  90. The API creates a `StateApiClient` for each invocation. So if multiple
  91. invocations of listing are used, it is better to reuse the `StateApiClient`
  92. as suggested above.
  93. ```
  94. data = list_nodes(address="auto")
  95. ```
  96. """
  97. @DeveloperAPI
  98. class StateApiClient(SubmissionClient):
  99. """State API Client issues REST GET requests to the server for resource states."""
  100. def __init__(
  101. self,
  102. address: Optional[str] = None,
  103. cookies: Optional[Dict[str, Any]] = None,
  104. headers: Optional[Dict[str, Any]] = None,
  105. ):
  106. """Initialize a StateApiClient and check the connection to the cluster.
  107. Args:
  108. address: Ray bootstrap address (e.g. `127.0.0.0:6379`, `auto`), or Ray
  109. Client address (e.g. `ray://<head-node-ip>:10001`), or Ray dashboard
  110. address (e.g. `http://<head-node-ip>:8265`).
  111. If not provided, it will be detected automatically from any running
  112. local Ray cluster.
  113. cookies: Cookies to use when sending requests to the HTTP job server.
  114. headers: Headers to use when sending requests to the HTTP job server, used
  115. for cases like authentication to a remote cluster.
  116. """
  117. if requests is None:
  118. raise RuntimeError(
  119. "The Ray state CLI & SDK require the ray[default] "
  120. "installation: `pip install 'ray[default']``"
  121. )
  122. if not headers:
  123. headers = {"Content-Type": "application/json"}
  124. # Resolve API server URL
  125. api_server_url = get_address_for_submission_client(address)
  126. super().__init__(
  127. address=api_server_url,
  128. create_cluster_if_needed=False,
  129. headers=headers,
  130. cookies=cookies,
  131. )
  132. @classmethod
  133. def _make_param(cls, options: Union[ListApiOptions, GetApiOptions]) -> Dict:
  134. options_dict = {}
  135. for field in fields(options):
  136. # TODO(rickyyx): We will need to find a way to pass server side timeout
  137. # TODO(rickyyx): We will have to convert filter option
  138. # slightly differently for now. But could we do k,v pair rather than this?
  139. # I see we are also converting dict to XXXApiOptions later on, we could
  140. # probably organize the marshaling a bit better.
  141. if field.name == "filters":
  142. options_dict["filter_keys"] = []
  143. options_dict["filter_predicates"] = []
  144. options_dict["filter_values"] = []
  145. for filter in options.filters:
  146. if len(filter) != 3:
  147. raise ValueError(
  148. f"The given filter has incorrect input type, {filter}. "
  149. "Provide (key, predicate, value) tuples."
  150. )
  151. filter_k, filter_predicate, filter_val = filter
  152. options_dict["filter_keys"].append(filter_k)
  153. options_dict["filter_predicates"].append(filter_predicate)
  154. options_dict["filter_values"].append(filter_val)
  155. continue
  156. option_val = getattr(options, field.name)
  157. if option_val is not None:
  158. options_dict[field.name] = option_val
  159. return options_dict
  160. def _make_http_get_request(
  161. self,
  162. endpoint: str,
  163. params: Dict,
  164. timeout: float,
  165. _explain: bool = False,
  166. ) -> Dict:
  167. with warnings_on_slow_request(
  168. address=self._address, endpoint=endpoint, timeout=timeout, explain=_explain
  169. ):
  170. # Send a request.
  171. response = None
  172. try:
  173. response = self._do_request(
  174. "GET",
  175. endpoint,
  176. timeout=timeout,
  177. params=params,
  178. )
  179. # If we have a valid JSON error, don't raise a generic exception but
  180. # instead let the caller parse it to raise a more precise exception.
  181. if (
  182. response.status_code == 500
  183. and "application/json"
  184. not in response.headers.get("Content-Type", "")
  185. ):
  186. response.raise_for_status()
  187. except requests.exceptions.RequestException as e:
  188. err_str = f"Failed to make request to {self._address}{endpoint}. "
  189. # Best-effort to give hints to users on potential reasons of connection
  190. # failure.
  191. err_str += (
  192. "Failed to connect to API server. Please check the API server "
  193. "log for details. Make sure dependencies are installed with "
  194. "`pip install ray[default]`. Please also check dashboard is "
  195. "available, and included when starting ray cluster, "
  196. "i.e. `ray start --include-dashboard=True --head`. "
  197. )
  198. if response is None:
  199. raise ServerUnavailable(err_str)
  200. err_str += f"Response(url={response.url},status={response.status_code})"
  201. raise RayStateApiException(err_str) from e
  202. try:
  203. # Process the response.
  204. response = response.json()
  205. except requests.exceptions.JSONDecodeError as e:
  206. raise RayStateApiException(
  207. f"Failed to parse Response(url={response.url}, "
  208. f"status={response.status_code}, text='{response.text[:_MAX_HTTP_RESPONSE_EXCEPTION_TEXT]}')"
  209. ) from e
  210. if response["result"] is False:
  211. raise RayStateApiException(
  212. "API server internal error. See dashboard.log file for more details. "
  213. f"Error: {response['msg']}"
  214. )
  215. # Dictionary of `ListApiResponse` or `SummaryApiResponse`
  216. return response["data"]["result"]
  217. def get(
  218. self,
  219. resource: StateResource,
  220. id: str,
  221. options: Optional[GetApiOptions],
  222. _explain: bool = False,
  223. ) -> Optional[
  224. Union[
  225. ActorState,
  226. PlacementGroupState,
  227. NodeState,
  228. WorkerState,
  229. TaskState,
  230. List[ObjectState],
  231. JobState,
  232. ]
  233. ]:
  234. """Get resources states by id
  235. Args:
  236. resource_name: Resource names, i.e. 'workers', 'actors', 'nodes',
  237. 'placement_groups', 'tasks', 'objects'.
  238. 'jobs' and 'runtime-envs' are not supported yet.
  239. id: ID for the resource, i.e. 'node_id' for nodes.
  240. options: Get options. See `GetApiOptions` for details.
  241. _explain: Print the API information such as API
  242. latency or failed query information.
  243. Returns:
  244. None if not found, and if found:
  245. - ActorState for actors
  246. - PlacementGroupState for placement groups
  247. - NodeState for nodes
  248. - WorkerState for workers
  249. - TaskState for tasks
  250. - JobState for jobs
  251. Empty list for objects if not found, or list of ObjectState for objects
  252. Raises:
  253. Exception: This doesn't catch any exceptions raised when the underlying request
  254. call raises exceptions. For example, it could raise `requests.Timeout`
  255. when timeout occurs.
  256. ValueError:
  257. if the resource could not be GET by id, i.e. jobs and runtime-envs.
  258. """
  259. # TODO(rickyyx): Make GET not using filters on list operation
  260. params = self._make_param(options)
  261. RESOURCE_ID_KEY_NAME = {
  262. StateResource.NODES: "node_id",
  263. StateResource.ACTORS: "actor_id",
  264. StateResource.PLACEMENT_GROUPS: "placement_group_id",
  265. StateResource.WORKERS: "worker_id",
  266. StateResource.TASKS: "task_id",
  267. StateResource.OBJECTS: "object_id",
  268. StateResource.JOBS: "submission_id",
  269. }
  270. if resource not in RESOURCE_ID_KEY_NAME:
  271. raise ValueError(f"Can't get {resource.name} by id.")
  272. params["filter_keys"] = [RESOURCE_ID_KEY_NAME[resource]]
  273. params["filter_predicates"] = ["="]
  274. params["filter_values"] = [id]
  275. params["detail"] = True
  276. endpoint = f"/api/v0/{resource.value}"
  277. list_api_response = self._make_http_get_request(
  278. endpoint=endpoint,
  279. params=params,
  280. timeout=options.timeout,
  281. _explain=_explain,
  282. )
  283. result = list_api_response["result"]
  284. # Empty result
  285. if len(result) == 0:
  286. return None
  287. result = [dict_to_state(d, resource) for d in result]
  288. if resource == StateResource.OBJECTS:
  289. # NOTE(rickyyx):
  290. # There might be multiple object entries for a single object id
  291. # because a single object could be referenced at different places
  292. # e.g. pinned as local variable, used as parameter
  293. return result
  294. if resource == StateResource.TASKS:
  295. # There might be multiple task attempts given a task id due to
  296. # task retries.
  297. if len(result) == 1:
  298. return result[0]
  299. return result
  300. # For the rest of the resources, there should only be a single entry
  301. # for a particular id.
  302. assert len(result) == 1
  303. return result[0]
  304. def _print_api_warning(
  305. self,
  306. resource: StateResource,
  307. api_response: dict,
  308. warn_data_source_not_available: bool = True,
  309. warn_data_truncation: bool = True,
  310. warn_limit: bool = True,
  311. warn_server_side_warnings: bool = True,
  312. ):
  313. """Print the API warnings.
  314. Args:
  315. resource: Resource names, i.e. 'jobs', 'actors', 'nodes',
  316. see `StateResource` for details.
  317. api_response: The dictionarified `ListApiResponse` or `SummaryApiResponse`.
  318. warn_data_source_not_available: Warn when some data sources
  319. are not available.
  320. warn_data_truncation: Warn when results were truncated at
  321. the data source.
  322. warn_limit: Warn when results were limited.
  323. warn_server_side_warnings: Warn when the server side generates warnings
  324. (E.g., when callsites not enabled for listing objects)
  325. """
  326. # Print warnings if anything was given.
  327. if warn_data_source_not_available:
  328. warning_msgs = api_response.get("partial_failure_warning", None)
  329. if warning_msgs:
  330. warnings.warn(warning_msgs)
  331. if warn_data_truncation:
  332. # Print warnings if data is truncated at the data source.
  333. num_after_truncation = api_response["num_after_truncation"]
  334. total = api_response["total"]
  335. if total > num_after_truncation:
  336. # NOTE(rickyyx): For now, there's not much users
  337. # could do (neither can we), with hard truncation.
  338. # Unless we allow users to set a higher
  339. # `RAY_MAX_LIMIT_FROM_DATA_SOURCE`, the data will
  340. # always be truncated at the data source.
  341. warnings.warn(
  342. (
  343. "The returned data may contain incomplete result. "
  344. f"{num_after_truncation} ({total} total from the cluster) "
  345. f"{resource.value} are retrieved from the data source. "
  346. f"{total - num_after_truncation} entries have been truncated. "
  347. f"Max of {num_after_truncation} entries are retrieved "
  348. "from data source to prevent over-sized payloads."
  349. ),
  350. )
  351. if warn_limit:
  352. # Print warnings if return data is limited at the API server due to
  353. # limit enforced at the server side
  354. num_filtered = api_response["num_filtered"]
  355. data = api_response["result"]
  356. if num_filtered > len(data):
  357. warnings.warn(
  358. (
  359. f"Limit last {len(data)} entries "
  360. f"(Total {num_filtered}). Use `--filter` to reduce "
  361. "the amount of data to return or "
  362. "setting a higher limit with `--limit` to see all data. "
  363. ),
  364. )
  365. if warn_server_side_warnings:
  366. # Print the additional warnings.
  367. warnings_to_print = api_response.get("warnings", [])
  368. if warnings_to_print:
  369. for warning_to_print in warnings_to_print:
  370. warnings.warn(warning_to_print)
  371. def _raise_on_missing_output(self, resource: StateResource, api_response: dict):
  372. """Raise an exception when the API resopnse contains a missing output.
  373. Output can be missing if (1) Failures on some of data source queries (e.g.,
  374. `ray list tasks` queries all raylets, and if some of queries fail, it will
  375. contain missing output. If all queries fail, it will just fail). (2) Data
  376. is truncated because the output is too large.
  377. Args:
  378. resource: Resource names, i.e. 'jobs', 'actors', 'nodes',
  379. see `StateResource` for details.
  380. api_response: The dictionarified `ListApiResponse` or `SummaryApiResponse`.
  381. """
  382. # Raise an exception if there are partial failures that cause missing output.
  383. warning_msgs = api_response.get("partial_failure_warning", None)
  384. if warning_msgs:
  385. raise RayStateApiException(
  386. f"Failed to retrieve all {resource.value} from the cluster because"
  387. "they are not reachable due to query failures to the data sources. "
  388. "To avoid raising an exception and allow having missing output, "
  389. "set `raise_on_missing_output=False`. "
  390. )
  391. # Raise an exception is there is data truncation that cause missing output.
  392. total = api_response["total"]
  393. num_after_truncation = api_response["num_after_truncation"]
  394. if total != num_after_truncation:
  395. raise RayStateApiException(
  396. f"Failed to retrieve all {total} {resource.value} from the cluster "
  397. "because they are not reachable due to data truncation. It happens "
  398. "when the returned data is too large "
  399. # When the data is truncated, the truncation
  400. # threshold == num_after_truncation. We cannot set this to env
  401. # var because the CLI side might not have the correct env var.
  402. f"(> {num_after_truncation}) "
  403. "To avoid raising an exception and allow having missing output, "
  404. "set `raise_on_missing_output=False`. "
  405. )
  406. def list(
  407. self,
  408. resource: StateResource,
  409. options: ListApiOptions,
  410. raise_on_missing_output: bool,
  411. _explain: bool = False,
  412. ) -> List[
  413. Union[
  414. ActorState,
  415. JobState,
  416. NodeState,
  417. TaskState,
  418. ObjectState,
  419. PlacementGroupState,
  420. RuntimeEnvState,
  421. WorkerState,
  422. ClusterEventState,
  423. ]
  424. ]:
  425. """List resources states
  426. Args:
  427. resource: Resource names, i.e. 'jobs', 'actors', 'nodes',
  428. see `StateResource` for details.
  429. options: List options. See `ListApiOptions` for details.
  430. raise_on_missing_output: When True, raise an exception if the output
  431. is incomplete. Output can be incomplete if
  432. (1) there's a partial network failure when the source is distributed.
  433. (2) data is truncated because it is too large.
  434. Set it to False to avoid throwing an exception on missing data.
  435. _explain: Print the API information such as API
  436. latency or failed query information.
  437. Returns:
  438. A list of queried result from `ListApiResponse`,
  439. Raises:
  440. Exception: This doesn't catch any exceptions raised when the
  441. underlying request call raises exceptions. For example, it could
  442. raise `requests.Timeout` when timeout occurs.
  443. """
  444. if options.has_conflicting_filters():
  445. # return early with empty list when there are conflicting filters
  446. return []
  447. endpoint = f"/api/v0/{resource.value}"
  448. params = self._make_param(options)
  449. list_api_response = self._make_http_get_request(
  450. endpoint=endpoint,
  451. params=params,
  452. timeout=options.timeout,
  453. _explain=_explain,
  454. )
  455. if raise_on_missing_output:
  456. self._raise_on_missing_output(resource, list_api_response)
  457. if _explain:
  458. self._print_api_warning(resource, list_api_response)
  459. return [dict_to_state(d, resource) for d in list_api_response["result"]]
  460. def summary(
  461. self,
  462. resource: SummaryResource,
  463. *,
  464. options: SummaryApiOptions,
  465. raise_on_missing_output: bool,
  466. _explain: bool = False,
  467. ) -> Dict:
  468. """Summarize resources states
  469. Args:
  470. resource_name: Resource names,
  471. see `SummaryResource` for details.
  472. options: summary options. See `SummaryApiOptions` for details.
  473. raise_on_missing_output: Raise an exception if the output has missing data.
  474. Output can have missing data if (1) there's a partial network failure
  475. when the source is distributed. (2) data is truncated
  476. because it is too large.
  477. _explain: Print the API information such as API
  478. latency or failed query information.
  479. Returns:
  480. A dictionary of queried result from `SummaryApiResponse`.
  481. Raises:
  482. Exception: This doesn't catch any exceptions raised when the
  483. underlying request call raises exceptions. For example, it could
  484. raise `requests.Timeout` when timeout occurs.
  485. """
  486. params = {"timeout": options.timeout}
  487. endpoint = f"/api/v0/{resource.value}/summarize"
  488. summary_api_response = self._make_http_get_request(
  489. endpoint=endpoint,
  490. params=params,
  491. timeout=options.timeout,
  492. _explain=_explain,
  493. )
  494. if raise_on_missing_output:
  495. self._raise_on_missing_output(resource, summary_api_response)
  496. if _explain:
  497. # There's no limit applied to summary, so we shouldn't warn.
  498. self._print_api_warning(resource, summary_api_response, warn_limit=False)
  499. return summary_api_response["result"]["node_id_to_summary"]
  500. @DeveloperAPI
  501. def get_actor(
  502. id: str,
  503. address: Optional[str] = None,
  504. timeout: int = DEFAULT_RPC_TIMEOUT,
  505. _explain: bool = False,
  506. ) -> Optional[ActorState]:
  507. """Get an actor by id.
  508. Args:
  509. id: Id of the actor
  510. address: Ray bootstrap address, could be `auto`, `localhost:6379`.
  511. If None, it will be resolved automatically from an initialized ray.
  512. timeout: Max timeout value for the state API requests made.
  513. _explain: Print the API information such as API latency or
  514. failed query information.
  515. Returns:
  516. None if actor not found, or
  517. :class:`ActorState <ray.util.state.common.ActorState>`.
  518. Raises:
  519. RayStateApiException: if the CLI failed to query the data.
  520. """ # noqa: E501
  521. return StateApiClient(address=address).get(
  522. StateResource.ACTORS, id, GetApiOptions(timeout=timeout), _explain=_explain
  523. )
  524. @DeveloperAPI
  525. def get_job(
  526. id: str,
  527. address: Optional[str] = None,
  528. timeout: int = DEFAULT_RPC_TIMEOUT,
  529. _explain: bool = False,
  530. ) -> Optional[JobState]:
  531. """Get a submission job detail by id.
  532. Args:
  533. id: Submission ID obtained from job API.
  534. address: Ray bootstrap address, could be `auto`, `localhost:6379`.
  535. If None, it will be resolved automatically from an initialized ray.
  536. timeout: Max timeout value for the state API requests made.
  537. _explain: Print the API information such as API latency or
  538. failed query information.
  539. Returns:
  540. None if job not found, or
  541. :class:`JobState <ray.util.state.common.JobState>`.
  542. Raises:
  543. RayStateApiException: if the CLI failed to query the data.
  544. """ # noqa: E501
  545. return StateApiClient(address=address).get(
  546. StateResource.JOBS,
  547. id,
  548. GetApiOptions(timeout=timeout),
  549. _explain=_explain,
  550. )
  551. @DeveloperAPI
  552. def get_placement_group(
  553. id: str,
  554. address: Optional[str] = None,
  555. timeout: int = DEFAULT_RPC_TIMEOUT,
  556. _explain: bool = False,
  557. ) -> Optional[PlacementGroupState]:
  558. """Get a placement group by id.
  559. Args:
  560. id: Id of the placement group
  561. address: Ray bootstrap address, could be `auto`, `localhost:6379`.
  562. If None, it will be resolved automatically from an initialized ray.
  563. timeout: Max timeout value for the state APIs requests made.
  564. _explain: Print the API information such as API latency or
  565. failed query information.
  566. Returns:
  567. None if actor not found, or
  568. :class:`~ray.util.state.common.PlacementGroupState`.
  569. Raises:
  570. RayStateApiException: if the CLI failed to query the data.
  571. """ # noqa: E501
  572. return StateApiClient(address=address).get(
  573. StateResource.PLACEMENT_GROUPS,
  574. id,
  575. GetApiOptions(timeout=timeout),
  576. _explain=_explain,
  577. )
  578. @DeveloperAPI
  579. def get_node(
  580. id: str,
  581. address: Optional[str] = None,
  582. timeout: int = DEFAULT_RPC_TIMEOUT,
  583. _explain: bool = False,
  584. ) -> Optional[NodeState]:
  585. """Get a node by id.
  586. Args:
  587. id: Id of the node.
  588. address: Ray bootstrap address, could be `auto`, `localhost:6379`.
  589. If None, it will be resolved automatically from an initialized ray.
  590. timeout: Max timeout value for the state APIs requests made.
  591. _explain: Print the API information such as API latency or
  592. failed query information.
  593. Returns:
  594. None if actor not found, or
  595. :class:`NodeState <ray.util.state.common.NodeState>`.
  596. Raises:
  597. RayStateApiException: if the CLI is failed to query the data.
  598. """ # noqa: E501
  599. return StateApiClient(address=address).get(
  600. StateResource.NODES,
  601. id,
  602. GetApiOptions(timeout=timeout),
  603. _explain=_explain,
  604. )
  605. @DeveloperAPI
  606. def get_worker(
  607. id: str,
  608. address: Optional[str] = None,
  609. timeout: int = DEFAULT_RPC_TIMEOUT,
  610. _explain: bool = False,
  611. ) -> Optional[WorkerState]:
  612. """Get a worker by id.
  613. Args:
  614. id: Id of the worker
  615. address: Ray bootstrap address, could be `auto`, `localhost:6379`.
  616. If None, it will be resolved automatically from an initialized ray.
  617. timeout: Max timeout value for the state APIs requests made.
  618. _explain: Print the API information such as API latency or
  619. failed query information.
  620. Returns:
  621. None if actor not found, or
  622. :class:`WorkerState <ray.util.state.common.WorkerState>`.
  623. Raises:
  624. RayStateApiException: if the CLI failed to query the data.
  625. """ # noqa: E501
  626. return StateApiClient(address=address).get(
  627. StateResource.WORKERS,
  628. id,
  629. GetApiOptions(timeout=timeout),
  630. _explain=_explain,
  631. )
  632. @DeveloperAPI
  633. def get_task(
  634. id: Union[str, "ray.ObjectRef"],
  635. address: Optional[str] = None,
  636. timeout: int = DEFAULT_RPC_TIMEOUT,
  637. _explain: bool = False,
  638. ) -> Optional[TaskState]:
  639. """Get task attempts of a task by id.
  640. Args:
  641. id: String id of the task or ObjectRef that corresponds to task
  642. address: Ray bootstrap address, could be `auto`, `localhost:6379`.
  643. If None, it will be resolved automatically from an initialized ray.
  644. timeout: Max timeout value for the state APIs requests made.
  645. _explain: Print the API information such as API latency or
  646. failed query information.
  647. Returns:
  648. None if task not found, or a list of
  649. :class:`~ray.util.state.common.TaskState`
  650. from the task attempts.
  651. Raises:
  652. RayStateApiException: if the CLI failed to query the data.
  653. """ # noqa: E501
  654. str_id: str
  655. if isinstance(id, str):
  656. str_id = id
  657. else:
  658. str_id = id.task_id().hex()
  659. return StateApiClient(address=address).get(
  660. StateResource.TASKS,
  661. str_id,
  662. GetApiOptions(timeout=timeout),
  663. _explain=_explain,
  664. )
  665. @DeveloperAPI
  666. def get_objects(
  667. id: str,
  668. address: Optional[str] = None,
  669. timeout: int = DEFAULT_RPC_TIMEOUT,
  670. _explain: bool = False,
  671. ) -> List[ObjectState]:
  672. """Get objects by id.
  673. There could be more than 1 entry returned since an object could be
  674. referenced at different places.
  675. Args:
  676. id: Id of the object
  677. address: Ray bootstrap address, could be `auto`, `localhost:6379`.
  678. If None, it will be resolved automatically from an initialized ray.
  679. timeout: Max timeout value for the state APIs requests made.
  680. _explain: Print the API information such as API latency or
  681. failed query information.
  682. Returns:
  683. List of
  684. :class:`~ray.util.state.common.ObjectState`.
  685. Raises:
  686. RayStateApiException: if the CLI failed to query the data.
  687. """ # noqa: E501
  688. return StateApiClient(address=address).get(
  689. StateResource.OBJECTS,
  690. id,
  691. GetApiOptions(timeout=timeout),
  692. _explain=_explain,
  693. )
  694. @DeveloperAPI
  695. def list_actors(
  696. address: Optional[str] = None,
  697. filters: Optional[List[Tuple[str, PredicateType, SupportedFilterType]]] = None,
  698. limit: int = DEFAULT_LIMIT,
  699. timeout: int = DEFAULT_RPC_TIMEOUT,
  700. detail: bool = False,
  701. raise_on_missing_output: bool = True,
  702. _explain: bool = False,
  703. ) -> List[ActorState]:
  704. """List actors in the cluster.
  705. Args:
  706. address: Ray bootstrap address, could be `auto`, `localhost:6379`.
  707. If None, it will be resolved automatically from an initialized ray.
  708. filters: List of tuples of filter key, predicate (=, or !=), and
  709. the filter value. E.g., `("id", "=", "abcd")`
  710. String filter values are case-insensitive.
  711. limit: Max number of entries returned by the state backend.
  712. timeout: Max timeout value for the state APIs requests made.
  713. detail: When True, more details info (specified in `ActorState`)
  714. will be queried and returned. See
  715. :class:`ActorState <ray.util.state.common.ActorState>`.
  716. raise_on_missing_output: When True, exceptions will be raised if
  717. there is missing data due to truncation/data source unavailable.
  718. _explain: Print the API information such as API latency or
  719. failed query information.
  720. Returns:
  721. List of
  722. :class:`ActorState <ray.util.state.common.ActorState>`.
  723. Raises:
  724. RayStateApiException: if the CLI failed to query the data.
  725. """ # noqa: E501
  726. return StateApiClient(address=address).list(
  727. StateResource.ACTORS,
  728. options=ListApiOptions(
  729. limit=limit,
  730. timeout=timeout,
  731. filters=filters,
  732. detail=detail,
  733. ),
  734. raise_on_missing_output=raise_on_missing_output,
  735. _explain=_explain,
  736. )
  737. @DeveloperAPI
  738. def list_placement_groups(
  739. address: Optional[str] = None,
  740. filters: Optional[List[Tuple[str, PredicateType, SupportedFilterType]]] = None,
  741. limit: int = DEFAULT_LIMIT,
  742. timeout: int = DEFAULT_RPC_TIMEOUT,
  743. detail: bool = False,
  744. raise_on_missing_output: bool = True,
  745. _explain: bool = False,
  746. ) -> List[PlacementGroupState]:
  747. """List placement groups in the cluster.
  748. Args:
  749. address: Ray bootstrap address, could be `auto`, `localhost:6379`.
  750. If None, it will be resolved automatically from an initialized ray.
  751. filters: List of tuples of filter key, predicate (=, or !=), and
  752. the filter value. E.g., `("state", "=", "abcd")`
  753. String filter values are case-insensitive.
  754. limit: Max number of entries returned by the state backend.
  755. timeout: Max timeout value for the state APIs requests made.
  756. detail: When True, more details info (specified in `PlacementGroupState`)
  757. will be queried and returned. See
  758. :class:`~ray.util.state.common.PlacementGroupState`.
  759. raise_on_missing_output: When True, exceptions will be raised if
  760. there is missing data due to truncation/data source unavailable.
  761. _explain: Print the API information such as API latency or
  762. failed query information.
  763. Returns:
  764. List of :class:`~ray.util.state.common.PlacementGroupState`.
  765. Raises:
  766. RayStateApiException: if the CLI failed to query the data.
  767. """ # noqa: E501
  768. return StateApiClient(address=address).list(
  769. StateResource.PLACEMENT_GROUPS,
  770. options=ListApiOptions(
  771. limit=limit, timeout=timeout, filters=filters, detail=detail
  772. ),
  773. raise_on_missing_output=raise_on_missing_output,
  774. _explain=_explain,
  775. )
  776. @DeveloperAPI
  777. def list_nodes(
  778. address: Optional[str] = None,
  779. filters: Optional[List[Tuple[str, PredicateType, SupportedFilterType]]] = None,
  780. limit: int = DEFAULT_LIMIT,
  781. timeout: int = DEFAULT_RPC_TIMEOUT,
  782. detail: bool = False,
  783. raise_on_missing_output: bool = True,
  784. _explain: bool = False,
  785. ) -> List[NodeState]:
  786. """List nodes in the cluster.
  787. Args:
  788. address: Ray bootstrap address, could be `auto`, `localhost:6379`.
  789. If None, it will be resolved automatically from an initialized ray.
  790. filters: List of tuples of filter key, predicate (=, or !=), and
  791. the filter value. E.g., `("node_name", "=", "abcd")`
  792. String filter values are case-insensitive.
  793. limit: Max number of entries returned by the state backend.
  794. timeout: Max timeout value for the state APIs requests made.
  795. detail: When True, more details info (specified in `NodeState`)
  796. will be queried and returned. See
  797. :class:`NodeState <ray.util.state.common.NodeState>`.
  798. raise_on_missing_output: When True, exceptions will be raised if
  799. there is missing data due to truncation/data source unavailable.
  800. _explain: Print the API information such as API latency or
  801. failed query information.
  802. Returns:
  803. List of dictionarified
  804. :class:`NodeState <ray.util.state.common.NodeState>`.
  805. Raises:
  806. RayStateApiException: if the CLI failed to query the data.
  807. """ # noqa: E501
  808. return StateApiClient(address=address).list(
  809. StateResource.NODES,
  810. options=ListApiOptions(
  811. limit=limit, timeout=timeout, filters=filters, detail=detail
  812. ),
  813. raise_on_missing_output=raise_on_missing_output,
  814. _explain=_explain,
  815. )
  816. @DeveloperAPI
  817. def list_jobs(
  818. address: Optional[str] = None,
  819. filters: Optional[List[Tuple[str, PredicateType, SupportedFilterType]]] = None,
  820. limit: int = DEFAULT_LIMIT,
  821. timeout: int = DEFAULT_RPC_TIMEOUT,
  822. detail: bool = False,
  823. raise_on_missing_output: bool = True,
  824. _explain: bool = False,
  825. ) -> List[JobState]:
  826. """List jobs submitted to the cluster by :ref:`ray job submission <jobs-overview>`.
  827. Args:
  828. address: Ray bootstrap address, could be `auto`, `localhost:6379`.
  829. If None, it will be resolved automatically from an initialized ray.
  830. filters: List of tuples of filter key, predicate (=, or !=), and
  831. the filter value. E.g., `("status", "=", "abcd")`
  832. String filter values are case-insensitive.
  833. limit: Max number of entries returned by the state backend.
  834. timeout: Max timeout value for the state APIs requests made.
  835. detail: When True, more details info (specified in `JobState`)
  836. will be queried and returned. See
  837. :class:`JobState <ray.util.state.common.JobState>`.
  838. raise_on_missing_output: When True, exceptions will be raised if
  839. there is missing data due to truncation/data source unavailable.
  840. _explain: Print the API information such as API latency or
  841. failed query information.
  842. Returns:
  843. List of dictionarified
  844. :class:`JobState <ray.util.state.common.JobState>`.
  845. Raises:
  846. RayStateApiException: if the CLI failed to query the data.
  847. """ # noqa: E501
  848. return StateApiClient(address=address).list(
  849. StateResource.JOBS,
  850. options=ListApiOptions(
  851. limit=limit, timeout=timeout, filters=filters, detail=detail
  852. ),
  853. raise_on_missing_output=raise_on_missing_output,
  854. _explain=_explain,
  855. )
  856. @DeveloperAPI
  857. def list_workers(
  858. address: Optional[str] = None,
  859. filters: Optional[List[Tuple[str, PredicateType, SupportedFilterType]]] = None,
  860. limit: int = DEFAULT_LIMIT,
  861. timeout: int = DEFAULT_RPC_TIMEOUT,
  862. detail: bool = False,
  863. raise_on_missing_output: bool = True,
  864. _explain: bool = False,
  865. ) -> List[WorkerState]:
  866. """List workers in the cluster.
  867. Args:
  868. address: Ray bootstrap address, could be `auto`, `localhost:6379`.
  869. If None, it will be resolved automatically from an initialized ray.
  870. filters: List of tuples of filter key, predicate (=, or !=), and
  871. the filter value. E.g., `("is_alive", "=", "True")`
  872. String filter values are case-insensitive.
  873. limit: Max number of entries returned by the state backend.
  874. timeout: Max timeout value for the state APIs requests made.
  875. detail: When True, more details info (specified in `WorkerState`)
  876. will be queried and returned. See
  877. :class:`WorkerState <ray.util.state.common.WorkerState>`.
  878. raise_on_missing_output: When True, exceptions will be raised if
  879. there is missing data due to truncation/data source unavailable.
  880. _explain: Print the API information such as API latency or
  881. failed query information.
  882. Returns:
  883. List of
  884. :class:`WorkerState <ray.util.state.common.WorkerState>`.
  885. Raises:
  886. RayStateApiException: if the CLI failed to query the data.
  887. """ # noqa: E501
  888. return StateApiClient(address=address).list(
  889. StateResource.WORKERS,
  890. options=ListApiOptions(
  891. limit=limit, timeout=timeout, filters=filters, detail=detail
  892. ),
  893. raise_on_missing_output=raise_on_missing_output,
  894. _explain=_explain,
  895. )
  896. @DeveloperAPI
  897. def list_tasks(
  898. address: Optional[str] = None,
  899. filters: Optional[List[Tuple[str, PredicateType, SupportedFilterType]]] = None,
  900. limit: int = DEFAULT_LIMIT,
  901. timeout: int = DEFAULT_RPC_TIMEOUT,
  902. detail: bool = False,
  903. raise_on_missing_output: bool = True,
  904. _explain: bool = False,
  905. ) -> List[TaskState]:
  906. """List tasks in the cluster.
  907. Args:
  908. address: Ray bootstrap address, could be `auto`, `localhost:6379`.
  909. If None, it will be resolved automatically from an initialized ray.
  910. filters: List of tuples of filter key, predicate (=, or !=), and
  911. the filter value. E.g., `("is_alive", "=", "True")`
  912. String filter values are case-insensitive.
  913. limit: Max number of entries returned by the state backend.
  914. timeout: Max timeout value for the state APIs requests made.
  915. detail: When True, more details info (specified in `TaskState`)
  916. will be queried and returned. See
  917. :class:`TaskState <ray.util.state.common.TaskState>`.
  918. raise_on_missing_output: When True, exceptions will be raised if
  919. there is missing data due to truncation/data source unavailable.
  920. _explain: Print the API information such as API latency or
  921. failed query information.
  922. Returns:
  923. List of
  924. :class:`TaskState <ray.util.state.common.TaskState>`.
  925. Raises:
  926. RayStateApiException: if the CLI failed to query the data.
  927. """ # noqa: E501
  928. return StateApiClient(address=address).list(
  929. StateResource.TASKS,
  930. options=ListApiOptions(
  931. limit=limit, timeout=timeout, filters=filters, detail=detail
  932. ),
  933. raise_on_missing_output=raise_on_missing_output,
  934. _explain=_explain,
  935. )
  936. @DeveloperAPI
  937. def list_objects(
  938. address: Optional[str] = None,
  939. filters: Optional[List[Tuple[str, PredicateType, SupportedFilterType]]] = None,
  940. limit: int = DEFAULT_LIMIT,
  941. timeout: int = DEFAULT_RPC_TIMEOUT,
  942. detail: bool = False,
  943. raise_on_missing_output: bool = True,
  944. _explain: bool = False,
  945. ) -> List[ObjectState]:
  946. """List objects in the cluster.
  947. Args:
  948. address: Ray bootstrap address, could be `auto`, `localhost:6379`.
  949. If None, it will be resolved automatically from an initialized ray.
  950. filters: List of tuples of filter key, predicate (=, or !=), and
  951. the filter value. E.g., `("ip", "=", "0.0.0.0")`
  952. String filter values are case-insensitive.
  953. limit: Max number of entries returned by the state backend.
  954. timeout: Max timeout value for the state APIs requests made.
  955. detail: When True, more details info (specified in `ObjectState`)
  956. will be queried and returned. See
  957. :class:`ObjectState <ray.util.state.common.ObjectState>`.
  958. raise_on_missing_output: When True, exceptions will be raised if
  959. there is missing data due to truncation/data source unavailable.
  960. _explain: Print the API information such as API latency or
  961. failed query information.
  962. Returns:
  963. List of
  964. :class:`ObjectState <ray.util.state.common.ObjectState>`.
  965. Raises:
  966. RayStateApiException: if the CLI failed to query the data.
  967. """ # noqa: E501
  968. return StateApiClient(address=address).list(
  969. StateResource.OBJECTS,
  970. options=ListApiOptions(
  971. limit=limit, timeout=timeout, filters=filters, detail=detail
  972. ),
  973. raise_on_missing_output=raise_on_missing_output,
  974. _explain=_explain,
  975. )
  976. @DeveloperAPI
  977. def list_runtime_envs(
  978. address: Optional[str] = None,
  979. filters: Optional[List[Tuple[str, PredicateType, SupportedFilterType]]] = None,
  980. limit: int = DEFAULT_LIMIT,
  981. timeout: int = DEFAULT_RPC_TIMEOUT,
  982. detail: bool = False,
  983. raise_on_missing_output: bool = True,
  984. _explain: bool = False,
  985. ) -> List[RuntimeEnvState]:
  986. """List runtime environments in the cluster.
  987. Args:
  988. address: Ray bootstrap address, could be `auto`, `localhost:6379`.
  989. If None, it will be resolved automatically from an initialized ray.
  990. filters: List of tuples of filter key, predicate (=, or !=), and
  991. the filter value. E.g., `("node_id", "=", "abcdef")`
  992. String filter values are case-insensitive.
  993. limit: Max number of entries returned by the state backend.
  994. timeout: Max timeout value for the state APIs requests made.
  995. detail: When True, more details info (specified in `RuntimeEnvState`)
  996. will be queried and returned. See
  997. :class:`RuntimeEnvState <ray.util.state.common.RuntimeEnvState>`.
  998. raise_on_missing_output: When True, exceptions will be raised if
  999. there is missing data due to truncation/data source unavailable.
  1000. _explain: Print the API information such as API latency or
  1001. failed query information.
  1002. Returns:
  1003. List of
  1004. :class:`RuntimeEnvState <ray.util.state.common.RuntimeEnvState>`.
  1005. Raises:
  1006. RayStateApiException: if the CLI failed to query the data.
  1007. """ # noqa: E501
  1008. return StateApiClient(address=address).list(
  1009. StateResource.RUNTIME_ENVS,
  1010. options=ListApiOptions(
  1011. limit=limit, timeout=timeout, filters=filters, detail=detail
  1012. ),
  1013. raise_on_missing_output=raise_on_missing_output,
  1014. _explain=_explain,
  1015. )
  1016. @DeveloperAPI
  1017. def list_cluster_events(
  1018. address: Optional[str] = None,
  1019. filters: Optional[List[Tuple[str, PredicateType, SupportedFilterType]]] = None,
  1020. limit: int = DEFAULT_LIMIT,
  1021. timeout: int = DEFAULT_RPC_TIMEOUT,
  1022. detail: bool = False,
  1023. raise_on_missing_output: bool = True,
  1024. _explain: bool = False,
  1025. ) -> List[Dict]:
  1026. return StateApiClient(address=address).list(
  1027. StateResource.CLUSTER_EVENTS,
  1028. options=ListApiOptions(
  1029. limit=limit, timeout=timeout, filters=filters, detail=detail
  1030. ),
  1031. raise_on_missing_output=raise_on_missing_output,
  1032. _explain=_explain,
  1033. )
  1034. """
  1035. Log APIs
  1036. """
  1037. @DeveloperAPI
  1038. def get_log(
  1039. address: Optional[str] = None,
  1040. node_id: Optional[str] = None,
  1041. node_ip: Optional[str] = None,
  1042. filename: Optional[str] = None,
  1043. actor_id: Optional[str] = None,
  1044. task_id: Optional[str] = None,
  1045. pid: Optional[int] = None,
  1046. follow: bool = False,
  1047. tail: int = -1,
  1048. timeout: int = DEFAULT_RPC_TIMEOUT,
  1049. suffix: str = "out",
  1050. encoding: Optional[str] = "utf-8",
  1051. errors: Optional[str] = "strict",
  1052. submission_id: Optional[str] = None,
  1053. attempt_number: int = 0,
  1054. _interval: Optional[float] = None,
  1055. filter_ansi_code: bool = False,
  1056. ) -> Generator[str, None, None]:
  1057. """Retrieve log file based on file name or some entities ids (pid, actor id, task id).
  1058. Examples:
  1059. .. testcode::
  1060. :hide:
  1061. import ray
  1062. import time
  1063. ray.shutdown()
  1064. ray.init()
  1065. # Wait for the node to be registered to the dashboard
  1066. time.sleep(5)
  1067. .. testcode::
  1068. import ray
  1069. from ray.util.state import get_log
  1070. # Node id could be retrieved from list_nodes() or ray.nodes()
  1071. node_id = ray.nodes()[0]["NodeID"]
  1072. filename = "raylet.out"
  1073. for l in get_log(filename=filename, node_id=node_id):
  1074. print(l)
  1075. .. testoutput::
  1076. :options: +MOCK
  1077. [2023-05-19 12:35:18,347 I 4259 68399276] (raylet) io_service_pool.cc:35: IOServicePool is running with 1 io_service.
  1078. [2023-05-19 12:35:18,348 I 4259 68399276] (raylet) store_runner.cc:32: Allowing the Plasma store to use up to 2.14748GB of memory.
  1079. [2023-05-19 12:35:18,348 I 4259 68399276] (raylet) store_runner.cc:48: Starting object store with directory /tmp, fallback /tmp/ray, and huge page support disabled
  1080. Args:
  1081. address: Ray bootstrap address, could be `auto`, `localhost:6379`.
  1082. If not specified, it will be retrieved from the initialized ray cluster.
  1083. node_id: Id of the node containing the logs .
  1084. node_ip: Ip of the node containing the logs. (At least one of the node_id and
  1085. node_ip have to be supplied when identifying a node).
  1086. filename: Name of the file (relative to the ray log directory) to be retrieved.
  1087. actor_id: Id of the actor if getting logs from an actor.
  1088. task_id: Id of the task if getting logs from a non concurrent actor.
  1089. For concurrent actor, please query the log with actor_id.
  1090. pid: PID of the worker if getting logs generated by a worker. When querying
  1091. with pid, either node_id or node_ip must be supplied.
  1092. follow: When set to True, logs will be streamed and followed.
  1093. tail: Number of lines to get from the end of the log file. Set to -1 for getting
  1094. the entire log.
  1095. timeout: Max timeout for requests made when getting the logs.
  1096. suffix: The suffix of the log file if query by id of tasks/workers/actors. Default to "out".
  1097. encoding: The encoding used to decode the content of the log file. Default is
  1098. "utf-8". Use None to get binary data directly.
  1099. errors: The error handling scheme to use for decoding errors. Default is
  1100. "strict". See https://docs.python.org/3/library/codecs.html#error-handlers
  1101. submission_id: Job submission ID if getting log from a submission job.
  1102. attempt_number: The attempt number of the task if getting logs generated by a task.
  1103. _interval: The interval in secs to print new logs when `follow=True`.
  1104. filter_ansi_code: A boolean flag for determining whether to filter ANSI escape codes.
  1105. Setting to `True` removes ANSI escape codes from the output. The default value is `False`.
  1106. Return:
  1107. A Generator of log line, None for SendType and ReturnType.
  1108. Raises:
  1109. RayStateApiException: if the CLI failed to query the data.
  1110. """ # noqa: E501
  1111. api_server_url = ray_address_to_api_server_url(address)
  1112. media_type = "stream" if follow else "file"
  1113. options = GetLogOptions(
  1114. node_id=node_id,
  1115. node_ip=node_ip,
  1116. filename=filename,
  1117. actor_id=actor_id,
  1118. task_id=task_id,
  1119. pid=pid,
  1120. lines=tail,
  1121. interval=_interval,
  1122. media_type=media_type,
  1123. timeout=timeout,
  1124. suffix=suffix,
  1125. submission_id=submission_id,
  1126. attempt_number=attempt_number,
  1127. )
  1128. options_dict = {}
  1129. for field in fields(options):
  1130. option_val = getattr(options, field.name)
  1131. if option_val is not None:
  1132. options_dict[field.name] = option_val
  1133. if filter_ansi_code is not None:
  1134. options_dict["filter_ansi_code"] = filter_ansi_code
  1135. with requests.get(
  1136. f"{api_server_url}/api/v0/logs/{media_type}?"
  1137. f"{urllib.parse.urlencode(options_dict)}",
  1138. stream=True,
  1139. headers=get_auth_headers_if_auth_enabled({}),
  1140. ) as r:
  1141. if r.status_code != 200:
  1142. raise RayStateApiException(r.text)
  1143. for chunk in r.iter_content(chunk_size=None):
  1144. if encoding is not None:
  1145. chunk = chunk.decode(encoding=encoding, errors=errors)
  1146. yield chunk
  1147. @DeveloperAPI
  1148. def list_logs(
  1149. address: Optional[str] = None,
  1150. node_id: Optional[str] = None,
  1151. node_ip: Optional[str] = None,
  1152. glob_filter: Optional[str] = None,
  1153. timeout: int = DEFAULT_RPC_TIMEOUT,
  1154. ) -> Dict[str, List[str]]:
  1155. """Listing log files available.
  1156. Args:
  1157. address: Ray bootstrap address, could be `auto`, `localhost:6379`.
  1158. If not specified, it will be retrieved from the initialized ray cluster.
  1159. node_id: Id of the node containing the logs.
  1160. node_ip: Ip of the node containing the logs.
  1161. glob_filter: Name of the file (relative to the ray log directory) to be
  1162. retrieved. E.g. `glob_filter="*worker*"` for all worker logs.
  1163. actor_id: Id of the actor if getting logs from an actor.
  1164. timeout: Max timeout for requests made when getting the logs.
  1165. _interval: The interval in secs to print new logs when `follow=True`.
  1166. Return:
  1167. A dictionary where the keys are log groups (e.g. gcs, raylet, worker), and
  1168. values are list of log filenames.
  1169. Raises:
  1170. RayStateApiException: if the CLI failed to query the data, or ConnectionError if
  1171. failed to resolve the ray address.
  1172. """ # noqa: E501
  1173. assert (
  1174. node_ip is not None or node_id is not None
  1175. ), "At least one of node ip and node id is required"
  1176. api_server_url = ray_address_to_api_server_url(address)
  1177. if not glob_filter:
  1178. glob_filter = "*"
  1179. options_dict = {}
  1180. if node_ip:
  1181. options_dict["node_ip"] = node_ip
  1182. if node_id:
  1183. options_dict["node_id"] = node_id
  1184. if glob_filter:
  1185. options_dict["glob"] = glob_filter
  1186. options_dict["timeout"] = timeout
  1187. r = requests.get(
  1188. f"{api_server_url}/api/v0/logs?{urllib.parse.urlencode(options_dict)}",
  1189. headers=get_auth_headers_if_auth_enabled({}),
  1190. )
  1191. # TODO(rickyx): we could do better at error handling here.
  1192. r.raise_for_status()
  1193. response = r.json()
  1194. if response["result"] is False:
  1195. raise RayStateApiException(
  1196. "API server internal error. See dashboard.log file for more details. "
  1197. f"Error: {response['msg']}"
  1198. )
  1199. return response["data"]["result"]
  1200. """
  1201. Summary APIs
  1202. """
  1203. @DeveloperAPI
  1204. def summarize_tasks(
  1205. address: Optional[str] = None,
  1206. timeout: int = DEFAULT_RPC_TIMEOUT,
  1207. raise_on_missing_output: bool = True,
  1208. _explain: bool = False,
  1209. ) -> Dict:
  1210. """Summarize the tasks in cluster.
  1211. Args:
  1212. address: Ray bootstrap address, could be `auto`, `localhost:6379`.
  1213. If None, it will be resolved automatically from an initialized ray.
  1214. timeout: Max timeout for requests made when getting the states.
  1215. raise_on_missing_output: When True, exceptions will be raised if
  1216. there is missing data due to truncation/data source unavailable.
  1217. _explain: Print the API information such as API latency or
  1218. failed query information.
  1219. Return:
  1220. Dictionarified
  1221. :class:`~ray.util.state.common.TaskSummaries`
  1222. Raises:
  1223. RayStateApiException: if the CLI is failed to query the data.
  1224. """ # noqa: E501
  1225. return StateApiClient(address=address).summary(
  1226. SummaryResource.TASKS,
  1227. options=SummaryApiOptions(timeout=timeout),
  1228. raise_on_missing_output=raise_on_missing_output,
  1229. _explain=_explain,
  1230. )
  1231. @DeveloperAPI
  1232. def summarize_actors(
  1233. address: Optional[str] = None,
  1234. timeout: int = DEFAULT_RPC_TIMEOUT,
  1235. raise_on_missing_output: bool = True,
  1236. _explain: bool = False,
  1237. ) -> Dict:
  1238. """Summarize the actors in cluster.
  1239. Args:
  1240. address: Ray bootstrap address, could be `auto`, `localhost:6379`.
  1241. If None, it will be resolved automatically from an initialized ray.
  1242. timeout: Max timeout for requests made when getting the states.
  1243. raise_on_missing_output: When True, exceptions will be raised if
  1244. there is missing data due to truncation/data source unavailable.
  1245. _explain: Print the API information such as API latency or
  1246. failed query information.
  1247. Return:
  1248. Dictionarified
  1249. :class:`~ray.util.state.common.ActorSummaries`
  1250. Raises:
  1251. RayStateApiException: if the CLI failed to query the data.
  1252. """ # noqa: E501
  1253. return StateApiClient(address=address).summary(
  1254. SummaryResource.ACTORS,
  1255. options=SummaryApiOptions(timeout=timeout),
  1256. raise_on_missing_output=raise_on_missing_output,
  1257. _explain=_explain,
  1258. )
  1259. @DeveloperAPI
  1260. def summarize_objects(
  1261. address: Optional[str] = None,
  1262. timeout: int = DEFAULT_RPC_TIMEOUT,
  1263. raise_on_missing_output: bool = True,
  1264. _explain: bool = False,
  1265. ) -> Dict:
  1266. """Summarize the objects in cluster.
  1267. Args:
  1268. address: Ray bootstrap address, could be `auto`, `localhost:6379`.
  1269. If None, it will be resolved automatically from an initialized ray.
  1270. timeout: Max timeout for requests made when getting the states.
  1271. raise_on_missing_output: When True, exceptions will be raised if
  1272. there is missing data due to truncation/data source unavailable.
  1273. _explain: Print the API information such as API latency or
  1274. failed query information.
  1275. Return:
  1276. Dictionarified :class:`~ray.util.state.common.ObjectSummaries`
  1277. Raises:
  1278. RayStateApiException: if the CLI failed to query the data.
  1279. """ # noqa: E501
  1280. return StateApiClient(address=address).summary(
  1281. SummaryResource.OBJECTS,
  1282. options=SummaryApiOptions(timeout=timeout),
  1283. raise_on_missing_output=raise_on_missing_output,
  1284. _explain=_explain,
  1285. )