utils.py 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770
  1. import abc
  2. import asyncio
  3. import datetime
  4. import functools
  5. import importlib
  6. import json
  7. import logging
  8. import os
  9. import pkgutil
  10. from abc import ABCMeta, abstractmethod
  11. from base64 import b64decode
  12. from collections.abc import Mapping, Sequence
  13. from dataclasses import dataclass
  14. from enum import IntEnum
  15. from typing import TYPE_CHECKING, Any, Dict, List, Optional
  16. from ray._common.utils import binary_to_hex
  17. if TYPE_CHECKING:
  18. from ray.core.generated.node_manager_pb2 import GetNodeStatsReply
  19. from packaging.version import Version
  20. import ray
  21. import ray._private.protobuf_compat
  22. import ray._private.ray_constants as ray_constants
  23. import ray._private.services as services
  24. import ray.experimental.internal_kv as internal_kv
  25. from ray._common.network_utils import parse_address
  26. from ray._common.utils import get_or_create_event_loop
  27. from ray._private.gcs_utils import GcsChannel
  28. from ray._private.utils import (
  29. get_dashboard_dependency_error,
  30. split_address,
  31. )
  32. from ray._raylet import GcsClient
  33. try:
  34. create_task = asyncio.create_task
  35. except AttributeError:
  36. create_task = asyncio.ensure_future
  37. logger = logging.getLogger(__name__)
  38. class HTTPStatusCode(IntEnum):
  39. # 2xx Success
  40. OK = 200
  41. # 4xx Client Errors
  42. BAD_REQUEST = 400
  43. NOT_FOUND = 404
  44. TOO_MANY_REQUESTS = 429
  45. # 5xx Server Errors
  46. INTERNAL_ERROR = 500
  47. class FrontendNotFoundError(OSError):
  48. pass
  49. class DashboardAgentModule(abc.ABC):
  50. def __init__(self, dashboard_agent):
  51. """
  52. Initialize current module when DashboardAgent loading modules.
  53. :param dashboard_agent: The DashboardAgent instance.
  54. """
  55. self._dashboard_agent = dashboard_agent
  56. self.session_name = dashboard_agent.session_name
  57. @abc.abstractmethod
  58. async def run(self, server):
  59. """
  60. Run the module in an asyncio loop. An agent module can provide
  61. servicers to the server.
  62. :param server: Asyncio GRPC server, or None if ray is minimal.
  63. """
  64. @staticmethod
  65. @abc.abstractclassmethod
  66. def is_minimal_module():
  67. """
  68. Return True if the module is minimal, meaning it
  69. should work with `pip install ray` that doesn't requires additional
  70. dependencies.
  71. """
  72. @property
  73. def gcs_address(self):
  74. return self._dashboard_agent.gcs_address
  75. @dataclass
  76. class DashboardHeadModuleConfig:
  77. minimal: bool
  78. cluster_id_hex: str
  79. session_name: str
  80. gcs_address: str
  81. log_dir: str
  82. temp_dir: str
  83. session_dir: str
  84. ip: str
  85. http_host: str
  86. http_port: int
  87. class DashboardHeadModule(abc.ABC):
  88. def __init__(self, config: DashboardHeadModuleConfig):
  89. """
  90. Initialize current module when DashboardHead loading modules.
  91. :param config: The DashboardHeadModuleConfig instance.
  92. """
  93. self._config = config
  94. self._gcs_client = None
  95. self._aiogrpc_gcs_channel = None # lazy init
  96. self._http_session = None # lazy init
  97. @property
  98. def minimal(self):
  99. return self._config.minimal
  100. @property
  101. def session_name(self):
  102. return self._config.session_name
  103. @property
  104. def gcs_address(self):
  105. return self._config.gcs_address
  106. @property
  107. def log_dir(self):
  108. return self._config.log_dir
  109. @property
  110. def temp_dir(self):
  111. return self._config.temp_dir
  112. @property
  113. def session_dir(self):
  114. return self._config.session_dir
  115. @property
  116. def ip(self):
  117. return self._config.ip
  118. @property
  119. def http_host(self):
  120. return self._config.http_host
  121. @property
  122. def http_port(self):
  123. return self._config.http_port
  124. @property
  125. def http_session(self):
  126. assert not self._config.minimal, "http_session accessed in minimal Ray."
  127. import aiohttp
  128. if self._http_session is not None:
  129. return self._http_session
  130. # Create a http session for all modules.
  131. # aiohttp<4.0.0 uses a 'loop' variable, aiohttp>=4.0.0 doesn't anymore
  132. if Version(aiohttp.__version__) < Version("4.0.0"):
  133. self._http_session = aiohttp.ClientSession(loop=get_or_create_event_loop())
  134. else:
  135. self._http_session = aiohttp.ClientSession()
  136. return self._http_session
  137. @property
  138. def gcs_client(self):
  139. if self._gcs_client is None:
  140. self._gcs_client = GcsClient(
  141. address=self._config.gcs_address,
  142. cluster_id=self._config.cluster_id_hex,
  143. )
  144. if not internal_kv._internal_kv_initialized():
  145. internal_kv._initialize_internal_kv(self._gcs_client)
  146. return self._gcs_client
  147. @property
  148. def aiogrpc_gcs_channel(self):
  149. # TODO(ryw): once we removed the old gcs client, also remove this.
  150. if self._config.minimal:
  151. return None
  152. if self._aiogrpc_gcs_channel is None:
  153. gcs_channel = GcsChannel(gcs_address=self._config.gcs_address, aio=True)
  154. gcs_channel.connect()
  155. self._aiogrpc_gcs_channel = gcs_channel.channel()
  156. return self._aiogrpc_gcs_channel
  157. @abc.abstractmethod
  158. async def run(self):
  159. """
  160. Run the module in an asyncio loop. A head module can provide
  161. servicers to the server.
  162. """
  163. @staticmethod
  164. @abc.abstractclassmethod
  165. def is_minimal_module():
  166. """
  167. Return True if the module is minimal, meaning it
  168. should work with `pip install ray` that doesn't requires additional
  169. dependencies.
  170. """
  171. class RateLimitedModule(abc.ABC):
  172. """Simple rate limiter
  173. Inheriting from this class and decorate any class methods will
  174. apply simple rate limit.
  175. It will limit the maximal number of concurrent invocations of **all** the
  176. methods decorated.
  177. The below Example class will only allow 10 concurrent calls to A() and B()
  178. E.g.:
  179. class Example(RateLimitedModule):
  180. def __init__(self):
  181. super().__init__(max_num_call=10)
  182. @RateLimitedModule.enforce_max_concurrent_calls
  183. async def A():
  184. ...
  185. @RateLimitedModule.enforce_max_concurrent_calls
  186. async def B():
  187. ...
  188. async def limit_handler_(self):
  189. raise RuntimeError("rate limited reached!")
  190. """
  191. def __init__(self, max_num_call: int, logger: Optional[logging.Logger] = None):
  192. """
  193. Args:
  194. max_num_call: Maximal number of concurrent invocations of all decorated
  195. functions in the instance.
  196. Setting to -1 will disable rate limiting.
  197. logger: Logger
  198. """
  199. self.max_num_call_ = max_num_call
  200. self.num_call_ = 0
  201. self.logger_ = logger
  202. @staticmethod
  203. def enforce_max_concurrent_calls(func):
  204. """Decorator to enforce max number of invocations of the decorated func
  205. NOTE: This should be used as the innermost decorator if there are multiple
  206. ones.
  207. E.g., when decorating functions already with @routes.get(...), this must be
  208. added below then the routes decorators:
  209. ```
  210. @routes.get('/')
  211. @RateLimitedModule.enforce_max_concurrent_calls
  212. async def fn(self):
  213. ...
  214. ```
  215. """
  216. @functools.wraps(func)
  217. async def async_wrapper(self, *args, **kwargs):
  218. if self.max_num_call_ >= 0 and self.num_call_ >= self.max_num_call_:
  219. if self.logger_:
  220. self.logger_.warning(
  221. f"Max concurrent requests reached={self.max_num_call_}"
  222. )
  223. return await self.limit_handler_()
  224. self.num_call_ += 1
  225. try:
  226. ret = await func(self, *args, **kwargs)
  227. finally:
  228. self.num_call_ -= 1
  229. return ret
  230. # Returning closure here to avoid passing 'self' to the
  231. # 'enforce_max_concurrent_calls' decorator.
  232. return async_wrapper
  233. @abstractmethod
  234. async def limit_handler_(self):
  235. """Handler that is invoked when max number of concurrent calls reached"""
  236. def dashboard_module(enable):
  237. """A decorator for dashboard module."""
  238. def _cls_wrapper(cls):
  239. cls.__ray_dashboard_module_enable__ = enable
  240. return cls
  241. return _cls_wrapper
  242. def get_all_modules(module_type):
  243. """
  244. Get all importable modules that are subclass of a given module type.
  245. """
  246. logger.info(f"Get all modules by type: {module_type.__name__}")
  247. import ray.dashboard.modules
  248. should_only_load_minimal_modules = get_dashboard_dependency_error() is not None
  249. for module_loader, name, ispkg in pkgutil.walk_packages(
  250. ray.dashboard.modules.__path__, ray.dashboard.modules.__name__ + "."
  251. ):
  252. try:
  253. importlib.import_module(name)
  254. except ModuleNotFoundError as e:
  255. logger.info(
  256. f"Module {name} cannot be loaded because "
  257. "we cannot import all dependencies. Install this module using "
  258. "`pip install 'ray[default]'` for the full "
  259. f"dashboard functionality. Error: {e}"
  260. )
  261. if not should_only_load_minimal_modules:
  262. logger.info(
  263. "Although `pip install 'ray[default]'` is downloaded, "
  264. "module couldn't be imported`"
  265. )
  266. raise e
  267. imported_modules = []
  268. # module_type.__subclasses__() should contain modules that
  269. # we could successfully import.
  270. for m in module_type.__subclasses__():
  271. if not getattr(m, "__ray_dashboard_module_enable__", True):
  272. continue
  273. if should_only_load_minimal_modules and not m.is_minimal_module():
  274. continue
  275. imported_modules.append(m)
  276. logger.info(f"Available modules: {imported_modules}")
  277. return imported_modules
  278. def to_posix_time(dt):
  279. return (dt - datetime.datetime(1970, 1, 1)).total_seconds()
  280. def address_tuple(address):
  281. if isinstance(address, tuple):
  282. return address
  283. ip, port = parse_address(address)
  284. return ip, int(port)
  285. def node_stats_to_dict(
  286. message: "GetNodeStatsReply",
  287. ) -> Optional[Dict[str, List[Dict[str, Any]]]]:
  288. decode_keys = {
  289. "actorId",
  290. "jobId",
  291. "taskId",
  292. "parentTaskId",
  293. "sourceActorId",
  294. "callerId",
  295. "nodeId",
  296. "workerId",
  297. "placementGroupId",
  298. }
  299. core_workers_stats = message.core_workers_stats
  300. result = message_to_dict(message, decode_keys)
  301. result["coreWorkersStats"] = [
  302. message_to_dict(m, decode_keys, always_print_fields_with_no_presence=True)
  303. for m in core_workers_stats
  304. ]
  305. return result
  306. class CustomEncoder(json.JSONEncoder):
  307. def default(self, obj):
  308. if isinstance(obj, bytes):
  309. return binary_to_hex(obj)
  310. if isinstance(obj, Immutable):
  311. return obj.mutable()
  312. # Let the base class default method raise the TypeError
  313. return json.JSONEncoder.default(self, obj)
  314. def to_camel_case(snake_str):
  315. """Convert a snake str to camel case."""
  316. components = snake_str.split("_")
  317. # We capitalize the first letter of each component except the first one
  318. # with the 'title' method and join them together.
  319. return components[0] + "".join(x.title() for x in components[1:])
  320. def to_google_style(d):
  321. """Recursive convert all keys in dict to google style."""
  322. new_dict = {}
  323. for k, v in d.items():
  324. if isinstance(v, dict):
  325. new_dict[to_camel_case(k)] = to_google_style(v)
  326. elif isinstance(v, list):
  327. new_list = []
  328. for i in v:
  329. if isinstance(i, dict):
  330. new_list.append(to_google_style(i))
  331. else:
  332. new_list.append(i)
  333. new_dict[to_camel_case(k)] = new_list
  334. else:
  335. new_dict[to_camel_case(k)] = v
  336. return new_dict
  337. def message_to_dict(message, decode_keys=None, **kwargs):
  338. """Convert protobuf message to Python dict."""
  339. def _decode_keys(d):
  340. for k, v in d.items():
  341. if isinstance(v, dict):
  342. d[k] = _decode_keys(v)
  343. if isinstance(v, list):
  344. new_list = []
  345. for i in v:
  346. if isinstance(i, dict):
  347. new_list.append(_decode_keys(i))
  348. else:
  349. new_list.append(i)
  350. d[k] = new_list
  351. else:
  352. if k in decode_keys:
  353. d[k] = binary_to_hex(b64decode(v))
  354. else:
  355. d[k] = v
  356. return d
  357. d = ray._private.protobuf_compat.message_to_dict(
  358. message, use_integers_for_enums=False, **kwargs
  359. )
  360. if decode_keys:
  361. return _decode_keys(d)
  362. else:
  363. return d
  364. class Bunch(dict):
  365. """A dict with attribute-access."""
  366. def __getattr__(self, key):
  367. try:
  368. return self.__getitem__(key)
  369. except KeyError:
  370. raise AttributeError(key)
  371. def __setattr__(self, key, value):
  372. self.__setitem__(key, value)
# Reference table (kept as a bare string literal): the Python -> JSON type
# mapping documented for json.JSONEncoder.
"""
https://docs.python.org/3/library/json.html?highlight=json#json.JSONEncoder
+-------------------+---------------+
| Python | JSON |
+===================+===============+
| dict | object |
+-------------------+---------------+
| list, tuple | array |
+-------------------+---------------+
| str | string |
+-------------------+---------------+
| int, float | number |
+-------------------+---------------+
| True | true |
+-------------------+---------------+
| False | false |
+-------------------+---------------+
| None | null |
+-------------------+---------------+
"""
# Exact types that make_immutable(strict=True) accepts. Besides the types in
# the table above, bytes is also included.
_json_compatible_types = {dict, list, tuple, str, int, float, bool, type(None), bytes}
  394. def is_immutable(self):
  395. raise TypeError("%r objects are immutable" % self.__class__.__name__)
  396. def make_immutable(value, strict=True):
  397. value_type = type(value)
  398. if value_type is dict:
  399. return ImmutableDict(value)
  400. if value_type is list:
  401. return ImmutableList(value)
  402. if strict:
  403. if value_type not in _json_compatible_types:
  404. raise TypeError("Type {} can't be immutable.".format(value_type))
  405. return value
  406. class Immutable(metaclass=ABCMeta):
  407. @abstractmethod
  408. def mutable(self):
  409. pass
  410. class ImmutableList(Immutable, Sequence):
  411. """Makes a :class:`list` immutable."""
  412. __slots__ = ("_list", "_proxy")
  413. def __init__(self, list_value):
  414. if type(list_value) not in (list, ImmutableList):
  415. raise TypeError(f"{type(list_value)} object is not a list.")
  416. if isinstance(list_value, ImmutableList):
  417. list_value = list_value.mutable()
  418. self._list = list_value
  419. self._proxy = [None] * len(list_value)
  420. def __reduce_ex__(self, protocol):
  421. return type(self), (self._list,)
  422. def mutable(self):
  423. return self._list
  424. def __eq__(self, other):
  425. if isinstance(other, ImmutableList):
  426. other = other.mutable()
  427. return list.__eq__(self._list, other)
  428. def __ne__(self, other):
  429. if isinstance(other, ImmutableList):
  430. other = other.mutable()
  431. return list.__ne__(self._list, other)
  432. def __contains__(self, item):
  433. if isinstance(item, Immutable):
  434. item = item.mutable()
  435. return list.__contains__(self._list, item)
  436. def __getitem__(self, item):
  437. proxy = self._proxy[item]
  438. if proxy is None:
  439. proxy = self._proxy[item] = make_immutable(self._list[item])
  440. return proxy
  441. def __len__(self):
  442. return len(self._list)
  443. def __repr__(self):
  444. return "%s(%s)" % (self.__class__.__name__, list.__repr__(self._list))
  445. class ImmutableDict(Immutable, Mapping):
  446. """Makes a :class:`dict` immutable."""
  447. __slots__ = ("_dict", "_proxy")
  448. def __init__(self, dict_value):
  449. if type(dict_value) not in (dict, ImmutableDict):
  450. raise TypeError(f"{type(dict_value)} object is not a dict.")
  451. if isinstance(dict_value, ImmutableDict):
  452. dict_value = dict_value.mutable()
  453. self._dict = dict_value
  454. self._proxy = {}
  455. def __reduce_ex__(self, protocol):
  456. return type(self), (self._dict,)
  457. def mutable(self):
  458. return self._dict
  459. def get(self, key, default=None):
  460. try:
  461. return self[key]
  462. except KeyError:
  463. return make_immutable(default)
  464. def __eq__(self, other):
  465. if isinstance(other, ImmutableDict):
  466. other = other.mutable()
  467. return dict.__eq__(self._dict, other)
  468. def __ne__(self, other):
  469. if isinstance(other, ImmutableDict):
  470. other = other.mutable()
  471. return dict.__ne__(self._dict, other)
  472. def __contains__(self, item):
  473. if isinstance(item, Immutable):
  474. item = item.mutable()
  475. return dict.__contains__(self._dict, item)
  476. def __getitem__(self, item):
  477. proxy = self._proxy.get(item, None)
  478. if proxy is None:
  479. proxy = self._proxy[item] = make_immutable(self._dict[item])
  480. return proxy
  481. def __len__(self) -> int:
  482. return len(self._dict)
  483. def __iter__(self):
  484. if len(self._proxy) != len(self._dict):
  485. for key in self._dict.keys() - self._proxy.keys():
  486. self._proxy[key] = make_immutable(self._dict[key])
  487. return iter(self._proxy)
  488. def __repr__(self):
  489. return "%s(%s)" % (self.__class__.__name__, dict.__repr__(self._dict))
# Register immutable types.
# Every direct subclass of Immutable that exists at this point is added to
# _json_compatible_types, so make_immutable(strict=True) returns an
# already-wrapped value instead of raising TypeError.
for immutable_type in Immutable.__subclasses__():
    _json_compatible_types.add(immutable_type)
def async_loop_forever(interval_seconds, cancellable=False):
    """Decorator factory that reruns a coroutine function in an endless loop.

    The wrapped coroutine is awaited, then the loop sleeps for
    ``interval_seconds`` and starts over.

    Args:
        interval_seconds: Delay passed to ``asyncio.sleep`` between runs.
        cancellable: If True, ``asyncio.CancelledError`` is logged and
            re-raised, which ends the loop. If False, it is logged and the
            loop carries on.
    """

    def _wrapper(coro):
        @functools.wraps(coro)
        async def _looper(*args, **kwargs):
            while True:
                try:
                    await coro(*args, **kwargs)
                except asyncio.CancelledError as ex:
                    if cancellable:
                        logger.info(
                            f"An async loop forever coroutine " f"is cancelled {coro}."
                        )
                        raise ex
                    else:
                        # Cancellation is swallowed here; execution falls
                        # through to the sleep and the next iteration.
                        logger.exception(
                            f"Can not cancel the async loop "
                            f"forever coroutine {coro}."
                        )
                except Exception:
                    # Any other error is logged and the loop keeps going.
                    logger.exception(f"Error looping coroutine {coro}.")
                await asyncio.sleep(interval_seconds)

        return _looper

    return _wrapper
  516. def ray_client_address_to_api_server_url(address: str):
  517. """Convert a Ray Client address of a running Ray cluster to its API server URL.
  518. Args:
  519. address: The Ray Client address, e.g. "ray://my-cluster".
  520. Returns:
  521. str: The API server URL of the cluster, e.g. "http://<head-node-ip>:8265".
  522. """
  523. with ray.init(address=address) as client_context:
  524. dashboard_url = client_context.dashboard_url
  525. return f"http://{dashboard_url}"
  526. def ray_address_to_api_server_url(address: Optional[str]) -> str:
  527. """Parse a Ray cluster address into API server URL.
  528. When an address is provided, it will be used to query GCS for
  529. API server address from GCS, so a Ray cluster must be running.
  530. When an address is not provided, it will first try to auto-detect
  531. a running Ray instance, or look for local GCS process.
  532. Args:
  533. address: Ray cluster bootstrap address or Ray Client address.
  534. Could also be `auto`.
  535. Returns:
  536. API server HTTP URL.
  537. """
  538. address = services.canonicalize_bootstrap_address_or_die(address)
  539. gcs_client = GcsClient(address=address)
  540. ray.experimental.internal_kv._initialize_internal_kv(gcs_client)
  541. api_server_url = ray._private.utils.internal_kv_get_with_retry(
  542. gcs_client,
  543. ray_constants.DASHBOARD_ADDRESS,
  544. namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
  545. num_retries=20,
  546. )
  547. if api_server_url is None:
  548. raise ValueError(
  549. (
  550. "Couldn't obtain the API server address from GCS. It is likely that "
  551. "the GCS server is down. Check gcs_server.[out | err] to see if it is "
  552. "still alive."
  553. )
  554. )
  555. api_server_url = f"http://{api_server_url.decode()}"
  556. return api_server_url
def get_address_for_submission_client(address: Optional[str]) -> str:
    """Get Ray API server address from Ray bootstrap or Client address.

    If None, it will try to auto-detect a running Ray instance, or look
    for local GCS process.

    `address` is overridden by the RAY_API_SERVER_ADDRESS environment
    variable when it is set, and otherwise by the RAY_ADDRESS environment
    variable when that is set, just like the `address` argument in
    `ray.init()`.

    Resolution after the override:
      * an address containing "://" whose scheme is "ray" is resolved through
        the Ray Client;
      * an address containing "://" with any other scheme is returned as is;
      * anything else (including None) is resolved through GCS.

    Args:
        address: Ray cluster bootstrap address or Ray Client address.
            Could also be "auto".

    Returns:
        API server HTTP URL, e.g. "http://<head-node-ip>:8265".
    """
    # Environment variables take precedence over the argument.
    if api_server_address := os.environ.get(
        ray_constants.RAY_API_SERVER_ADDRESS_ENVIRONMENT_VARIABLE
    ):
        address = api_server_address
        logger.debug(f"Using RAY_API_SERVER_ADDRESS={address}")
    # Fall back to RAY_ADDRESS if RAY_API_SERVER_ADDRESS not set
    elif ray_address := os.environ.get(ray_constants.RAY_ADDRESS_ENVIRONMENT_VARIABLE):
        address = ray_address
        logger.debug(f"Using RAY_ADDRESS={address}")

    if address and "://" in address:
        module_string, _ = split_address(address)
        if module_string == "ray":
            logger.debug(
                f"Retrieving API server address from Ray Client address {address}..."
            )
            address = ray_client_address_to_api_server_url(address)
    else:
        # User specified a non-Ray-Client Ray cluster address.
        address = ray_address_to_api_server_url(address)
    logger.debug(f"Using API server address {address}.")
    return address
  590. def compose_state_message(
  591. death_reason: Optional[str], death_reason_message: Optional[str]
  592. ) -> Optional[str]:
  593. """Compose node state message based on death information.
  594. Args:
  595. death_reason: The reason of node death.
  596. This is a string representation of `gcs_pb2.NodeDeathInfo.Reason`.
  597. death_reason_message: The message of node death.
  598. This corresponds to `gcs_pb2.NodeDeathInfo.ReasonMessage`.
  599. """
  600. if death_reason == "EXPECTED_TERMINATION":
  601. state_message = "Expected termination"
  602. elif death_reason == "UNEXPECTED_TERMINATION":
  603. state_message = "Unexpected termination"
  604. elif death_reason == "AUTOSCALER_DRAIN_PREEMPTED":
  605. state_message = "Terminated due to preemption"
  606. elif death_reason == "AUTOSCALER_DRAIN_IDLE":
  607. state_message = "Terminated due to idle (no Ray activity)"
  608. else:
  609. state_message = None
  610. if death_reason_message:
  611. if state_message:
  612. state_message += f": {death_reason_message}"
  613. else:
  614. state_message = death_reason_message
  615. return state_message
  616. def close_logger_file_descriptor(logger_instance):
  617. for handler in logger_instance.handlers:
  618. handler.close()