common.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490
  1. import time
  2. import uuid
  3. from typing import Dict, List, Optional, Set
  4. from ray.core.generated.instance_manager_pb2 import Instance, InstanceUpdateEvent
  5. class InstanceUtil:
  6. """
  7. A helper class to group updates and operations on an Instance object defined
  8. in instance_manager.proto
  9. """
  10. # Memoized reachable from sets, where the key is the instance status, and
  11. # the value is the set of instance status that is reachable from the key
  12. # instance status.
  13. _reachable_from: Optional[
  14. Dict["Instance.InstanceStatus", Set["Instance.InstanceStatus"]]
  15. ] = None
  @staticmethod
  def new_instance(
      instance_id: str,
      instance_type: str,
      status: Instance.InstanceStatus,
      details: str = "",
  ) -> Instance:
      """
      Builds a fresh Instance in the given status and records that status as
      the first entry of its status history.

      Args:
          instance_id: The instance id.
          instance_type: The instance type.
          status: The status of the new instance.
          details: The details of the status transition.

      Returns:
          The newly created instance.
      """
      created = Instance()
      created.instance_id = instance_id
      created.instance_type = instance_type
      created.status = status
      # Placeholder only: the version will be populated by the underlying
      # storage.
      created.version = 0
      InstanceUtil._record_status_transition(created, status, details)
      return created
  38. @staticmethod
  39. def random_instance_id() -> str:
  40. """
  41. Returns a random instance id.
  42. """
  43. return str(uuid.uuid4())
  @staticmethod
  def is_cloud_instance_allocated(instance_status: Instance.InstanceStatus) -> bool:
      """
      Tells whether a cloud instance allocated by the cloud provider could
      exist for an instance in the given status.

      Args:
          instance_status: The instance status to check. Must not be UNKNOWN
              (enforced with an assertion).

      Returns:
          True if the status is one of the statuses listed below.
      """
      assert instance_status != Instance.UNKNOWN
      statuses_with_cloud_instance = {
          Instance.ALLOCATED,
          Instance.RAY_INSTALLING,
          Instance.RAY_RUNNING,
          Instance.RAY_STOPPING,
          Instance.RAY_STOP_REQUESTED,
          Instance.RAY_STOPPED,
          Instance.TERMINATING,
          Instance.RAY_INSTALL_FAILED,
          Instance.TERMINATION_FAILED,
          Instance.ALLOCATION_TIMEOUT,
      }
      return instance_status in statuses_with_cloud_instance
  @staticmethod
  def is_ray_running(instance_status: Instance.InstanceStatus) -> bool:
      """
      Tells whether the ray process is running on the cloud instance for an
      instance in the given status,
      i.e. RAY_RUNNING, RAY_STOP_REQUESTED, RAY_STOPPING

      Args:
          instance_status: The instance status to check. Must not be UNKNOWN
              (enforced with an assertion).
      """
      assert instance_status != Instance.UNKNOWN
      reachable = InstanceUtil.get_reachable_statuses

      # Anything that can only be entered after RAY_STOPPING no longer has a
      # running ray process.
      if instance_status in reachable(Instance.RAY_STOPPING):
          return False

      # Otherwise ray is running iff the status is reachable from RAY_RUNNING.
      return instance_status in reachable(Instance.RAY_RUNNING)
  @staticmethod
  def is_ray_pending(instance_status: Instance.InstanceStatus) -> bool:
      """
      Tells whether the ray process is still pending to be started on the
      cloud instance for an instance in the given status.

      Args:
          instance_status: The instance status to check. Must not be UNKNOWN
              (enforced with an assertion).
      """
      assert instance_status != Instance.UNKNOWN

      # A status that can never lead to RAY_RUNNING is not pending.
      can_reach_running = (
          Instance.RAY_RUNNING
          in InstanceUtil.get_reachable_statuses(instance_status)
      )
      if not can_reach_running:
          return False

      # A status reachable from RAY_RUNNING means ray has already run.
      already_ran = instance_status in InstanceUtil.get_reachable_statuses(
          Instance.RAY_RUNNING
      )
      return not already_ran
  # Declared static like the other status helpers of this class: the function
  # takes no `self`, so without the decorator a call made through an instance
  # would bind the instance itself as `instance_status`.
  @staticmethod
  def is_ray_running_reachable(instance_status: Instance.InstanceStatus) -> bool:
      """
      Returns True if the instance is in a status where it may transition
      to RAY_RUNNING status.

      Args:
          instance_status: The instance status to start from.

      Returns:
          True if RAY_RUNNING is in the set of statuses reachable from
          `instance_status`, False otherwise.
      """
      reachable = InstanceUtil.get_reachable_statuses(instance_status)
      return Instance.RAY_RUNNING in reachable
  @staticmethod
  def set_status(
      instance: Instance,
      new_instance_status: Instance.InstanceStatus,
      details: str = "",
  ) -> bool:
      """Moves the instance to a new status if that transition is valid.

      On success the instance's status is updated and the transition is
      appended to its status history; otherwise the instance is left untouched.

      Args:
          instance: The instance to update.
          new_instance_status: The new status to transition to.
          details: The details of the transition.

      Returns:
          True if the status transition is successful, False otherwise.
      """
      allowed_next = InstanceUtil.get_valid_transitions()[instance.status]
      if new_instance_status in allowed_next:
          instance.status = new_instance_status
          InstanceUtil._record_status_transition(
              instance, new_instance_status, details
          )
          return True
      return False
  @staticmethod
  def _record_status_transition(
      instance: Instance, status: Instance.InstanceStatus, details: str
  ):
      """Appends a status-history entry, stamped with the current time.

      Args:
          instance: The instance to update.
          status: The new status to transition to.
          details: The details stored with the history entry.
      """
      entry = Instance.StatusHistory(
          instance_status=status,
          timestamp_ns=time.time_ns(),
          details=details,
      )
      instance.status_history.append(entry)
  141. @staticmethod
  142. def has_timeout(instance: Instance, timeout_s: int) -> bool:
  143. """
  144. Returns True if the instance has been in the current status for more
  145. than the timeout_seconds.
  146. Args:
  147. instance: The instance to check.
  148. timeout_seconds: The timeout in seconds.
  149. Returns:
  150. True if the instance has been in the current status for more than
  151. the timeout_s seconds.
  152. """
  153. cur_status = instance.status
  154. status_times_ns = InstanceUtil.get_status_transition_times_ns(
  155. instance, select_instance_status=cur_status
  156. )
  157. assert len(status_times_ns) >= 1, (
  158. f"instance {instance.instance_id} has {len(status_times_ns)} "
  159. f"{Instance.InstanceStatus.Name(cur_status)} status"
  160. )
  161. status_time_ns = sorted(status_times_ns)[-1]
  162. if time.time_ns() - status_time_ns <= (timeout_s * 1e9):
  163. return False
  164. return True
  @staticmethod
  def get_valid_transitions() -> Dict[
      "Instance.InstanceStatus", Set["Instance.InstanceStatus"]
  ]:
      """
      Returns the instance status transition table.

      Each key is an instance status and its value is the set of statuses the
      instance may move to directly from that key. A new dict is built on
      every call.
      """
      transitions = {
          # This is the initial status of a new instance.
          Instance.QUEUED: {
              # Cloud provider requested to launch a node for the instance.
              # This happens when a launch request is made to the node
              # provider.
              Instance.REQUESTED,
              # Allocation request canceled before being requested.
              # This happens when max_workers config is reduced or other
              # termination triggers occur while the instance is still queued.
              Instance.TERMINATED,
          },
          # When in this status, a launch request to the node provider is made.
          Instance.REQUESTED: {
              # Cloud provider allocated a cloud instance for the instance.
              # This happens when the cloud instance first appears in the list
              # of running cloud instances from the cloud instance provider.
              Instance.ALLOCATED,
              # Retry the allocation, become queueing again.
              Instance.QUEUED,
              # Cloud provider fails to allocate one. Either as a timeout or
              # the launch request fails immediately.
              Instance.ALLOCATION_FAILED,
          },
          # When in this status, the cloud instance is allocated and running.
          # This happens when the cloud instance is present in node provider's
          # list of running cloud instances.
          Instance.ALLOCATED: {
              # Ray needs to be installed and launched on the provisioned cloud
              # instance. This happens when the cloud instance is allocated,
              # and the autoscaler is responsible for installing and launching
              # ray on the cloud instance. For node provider that manages the
              # ray installation and launching, this state is skipped.
              Instance.RAY_INSTALLING,
              # Ray is already installed on the provisioned cloud
              # instance. It could be any valid ray status.
              Instance.RAY_RUNNING,
              # The cloud provider timed out for allocating running cloud
              # instance. The CloudResourceMonitor subscriber will lower this
              # node-type's priority in future schedules.
              Instance.ALLOCATION_TIMEOUT,
              Instance.RAY_STOPPING,
              Instance.RAY_STOPPED,
              # Instance is requested to be stopped, e.g. instance leaked: no
              # matching Instance with the same type is found in the
              # autoscaler's state.
              Instance.TERMINATING,
              # cloud instance somehow failed.
              Instance.TERMINATED,
          },
          # Ray process is being installed and started on the cloud instance.
          # This status is skipped for node provider that manages the ray
          # installation and launching. (e.g. Ray-on-Spark)
          Instance.RAY_INSTALLING: {
              # Ray installed and launched successfully, reported by the ray
              # cluster. Similar to the Instance.ALLOCATED ->
              # Instance.RAY_RUNNING transition, where the ray process is
              # managed by the node provider.
              Instance.RAY_RUNNING,
              # Ray installation failed. This happens when the ray process
              # failed to be installed and started on the cloud instance.
              Instance.RAY_INSTALL_FAILED,
              # When the ray node is reported as stopped by the ray cluster.
              # This could happen that the ray process was stopped quickly
              # after start such that a ray running node wasn't discovered and
              # the RAY_RUNNING transition was skipped.
              Instance.RAY_STOPPED,
              # A cloud instance is being terminated (when the instance itself
              # is no longer needed, e.g. instance is outdated, autoscaler is
              # scaling down)
              Instance.TERMINATING,
              # cloud instance somehow failed during the installation process.
              Instance.TERMINATED,
          },
          # Ray process is installed and running on the cloud instance. When in
          # this status, a ray node must be present in the ray cluster.
          Instance.RAY_RUNNING: {
              # Ray is requested to be stopped.
              Instance.RAY_STOP_REQUESTED,
              # Ray is stopping (currently draining),
              # e.g. idle termination.
              Instance.RAY_STOPPING,
              # Ray is already stopped, as reported by the ray cluster.
              Instance.RAY_STOPPED,
              # A cloud instance is being terminated (when the instance itself
              # is no longer needed, e.g. instance is outdated, autoscaler is
              # scaling down)
              Instance.TERMINATING,
              # cloud instance somehow failed.
              Instance.TERMINATED,
          },
          # Ray process should be stopped on the cloud instance. The RayStopper
          # subscriber will listen to this status and stop the ray process.
          Instance.RAY_STOP_REQUESTED: {
              # Ray is stopping on the cloud instance.
              Instance.RAY_STOPPING,
              # Ray stopped already.
              Instance.RAY_STOPPED,
              # Ray stop request failed (e.g. idle node no longer idle),
              # ray is still running.
              Instance.RAY_RUNNING,
              # cloud instance somehow failed.
              Instance.TERMINATED,
          },
          # An instance has been allocated to a cloud instance, but the cloud
          # provider timed out for allocating running cloud instance, e.g. a
          # kubernetes pod remains pending due to insufficient resources.
          Instance.ALLOCATION_TIMEOUT: {
              # Instance is requested to be stopped
              Instance.TERMINATING
          },
          # When in this status, the ray process is requested to be stopped to
          # the ray cluster, but not yet present in the dead ray node list
          # reported by the ray cluster.
          Instance.RAY_STOPPING: {
              # Ray is stopped, and the ray node is present in the dead ray
              # node list reported by the ray cluster.
              Instance.RAY_STOPPED,
              # A cloud instance is being terminated (when the instance itself
              # is no longer needed, e.g. instance is outdated, autoscaler is
              # scaling down)
              Instance.TERMINATING,
              # cloud instance somehow failed.
              Instance.TERMINATED,
          },
          # When in this status, the ray process is stopped, and the ray node
          # is present in the dead ray node list reported by the ray cluster.
          Instance.RAY_STOPPED: {
              # A cloud instance is being terminated (when the instance itself
              # is no longer needed, e.g. instance is outdated, autoscaler is
              # scaling down)
              Instance.TERMINATING,
              # cloud instance somehow failed.
              Instance.TERMINATED,
          },
          # When in this status, the cloud instance is requested to be stopped
          # to the node provider.
          Instance.TERMINATING: {
              # When a cloud instance no longer appears in the list of running
              # cloud instances from the node provider.
              Instance.TERMINATED,
              # When the cloud instance failed to be terminated.
              Instance.TERMINATION_FAILED,
          },
          # When in this status, the cloud instance failed to be terminated by
          # the node provider. We will keep retrying.
          Instance.TERMINATION_FAILED: {
              # Retry the termination, become terminating again.
              Instance.TERMINATING,
          },
          # An instance is marked as terminated when:
          # 1. A cloud instance disappears from the list of running cloud
          #    instances from the node provider (follows from TERMINATING or
          #    other running states).
          # 2. An allocation request is canceled before cloud resources are
          #    allocated (follows from QUEUED).
          # This is a terminal state.
          Instance.TERMINATED: set(),  # Terminal state.
          # When in this status, the cloud instance failed to be allocated by
          # the node provider.
          Instance.ALLOCATION_FAILED: set(),  # Terminal state.
          Instance.RAY_INSTALL_FAILED: {
              # Autoscaler requests to shutdown the instance when ray install
              # failed.
              Instance.TERMINATING,
              # cloud instance somehow failed.
              Instance.TERMINATED,
          },
          # Initial state before the instance is created. Should never be used.
          Instance.UNKNOWN: set(),
      }
      return transitions
  331. @staticmethod
  332. def get_status_transitions(
  333. instance: Instance,
  334. select_instance_status: Optional["Instance.InstanceStatus"] = None,
  335. ) -> List["Instance.StatusHistory"]:
  336. """
  337. Returns the status history of the instance.
  338. Args:
  339. instance: The instance.
  340. select_instance_status: The go-to status to search for, i.e. select
  341. only status history when the instance transitions into the status.
  342. If None, returns all status updates.
  343. """
  344. history = []
  345. for status_update in instance.status_history:
  346. if (
  347. select_instance_status
  348. and status_update.instance_status != select_instance_status
  349. ):
  350. continue
  351. history.append(status_update)
  352. return history
  353. @staticmethod
  354. def get_last_status_transition(
  355. instance: Instance,
  356. select_instance_status: Optional["Instance.InstanceStatus"] = None,
  357. ) -> Optional["Instance.StatusHistory"]:
  358. """
  359. Returns the last status transition of the instance.
  360. Args:
  361. instance: The instance.
  362. instance_status: The status to search for. If None, returns the last
  363. status update.
  364. """
  365. history = InstanceUtil.get_status_transitions(instance, select_instance_status)
  366. history.sort(key=lambda x: x.timestamp_ns)
  367. if history:
  368. return history[-1]
  369. return None
  370. @staticmethod
  371. def get_status_transition_times_ns(
  372. instance: Instance,
  373. select_instance_status: Optional["Instance.InstanceStatus"] = None,
  374. ) -> List[int]:
  375. """
  376. Returns a list of timestamps of the instance status update.
  377. Args:
  378. instance: The instance.
  379. instance_status: The status to search for. If None, returns all
  380. status updates timestamps.
  381. Returns:
  382. The list of timestamps of the instance status updates.
  383. """
  384. return [
  385. e.timestamp_ns
  386. for e in InstanceUtil.get_status_transitions(
  387. instance, select_instance_status
  388. )
  389. ]
  @classmethod
  def get_reachable_statuses(
      cls,
      instance_status: Instance.InstanceStatus,
  ) -> Set["Instance.InstanceStatus"]:
      """
      Looks up the set of instance statuses reachable from the given status
      by following the status transitions.

      The reachability table is computed on first use and then kept on the
      class, so later calls are plain lookups.

      Args:
          instance_status: The instance status to start from.

      Returns:
          The set of instance status that is reachable from the given instance
          status.
      """
      table = cls._reachable_from
      if table is None:
          # First call: build the memoized table, then read it back.
          cls._compute_reachable()
          table = cls._reachable_from
      return table[instance_status]
  @staticmethod
  def get_log_str_for_update(instance: Instance, update: InstanceUpdateEvent) -> str:
      """Formats a log line describing the given update to the instance.

      An upsert is reported as a new instance in the update's target status;
      any other update is reported as a "current->target" status change. Both
      forms end with the instance's ids, type and the update details.
      """
      status_name = Instance.InstanceStatus.Name
      if update.upsert:
          headline = f"New instance {status_name(update.new_instance_status)}"
      else:
          headline = (
              f"Update instance "
              f"{status_name(instance.status)}->"
              f"{status_name(update.new_instance_status)}"
          )
      return (
          f"{headline} (id="
          f"{instance.instance_id}, type={instance.instance_type}, "
          f"cloud_instance_id={instance.cloud_instance_id}, "
          f"ray_id={instance.node_id}): {update.details}"
      )
  @classmethod
  def _compute_reachable(cls):
      """
      Computes and memoizes, for every instance status, the set of statuses
      reachable from it by following one or more valid transitions.

      The result is stored in `cls._reachable_from`. A status appears in its
      own reachable set only if some chain of transitions leads back to it.
      """
      transitions = cls.get_valid_transitions()

      def reachable_from(start):
          """
          Iterative DFS collecting every node reachable from `start`.
          """
          # `start` is deliberately not pre-marked as seen, so it is only
          # included when a transition path loops back to it.
          seen = set()
          pending = [start]
          while pending:
              current = pending.pop()
              for successor in transitions[current]:
                  if successor not in seen:
                      seen.add(successor)
                      pending.append(successor)
          return seen

      # Build the whole table locally and publish it in one assignment, so
      # the memo is never left non-None but partially filled (e.g. if a status
      # is missing from the transition table and the search raises).
      reachable = {
          status: reachable_from(status)
          for status in Instance.InstanceStatus.values()
      }
      cls._reachable_from = reachable