| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490 |
- import time
- import uuid
- from typing import Dict, List, Optional, Set
- from ray.core.generated.instance_manager_pb2 import Instance, InstanceUpdateEvent
class InstanceUtil:
    """
    A helper class to group updates and operations on an Instance object defined
    in instance_manager.proto.

    All helpers are stateless static/class methods; the only class-level state
    is the lazily computed reachability cache `_reachable_from`.
    """

    # Memoized reachable-from sets, where the key is an instance status, and
    # the value is the set of instance statuses reachable from the key status
    # by following one or more valid transitions. Populated lazily by
    # `_compute_reachable()` on first use of `get_reachable_statuses()`.
    _reachable_from: Optional[
        Dict["Instance.InstanceStatus", Set["Instance.InstanceStatus"]]
    ] = None

    @staticmethod
    def new_instance(
        instance_id: str,
        instance_type: str,
        status: Instance.InstanceStatus,
        details: str = "",
    ) -> Instance:
        """
        Returns a new instance with the given status.

        The initial status is also appended to the instance's status history.

        Args:
            instance_id: The instance id.
            instance_type: The instance type.
            status: The status of the new instance.
            details: The details of the status transition.

        Returns:
            The newly created Instance.
        """
        instance = Instance()
        instance.version = 0  # it will be populated by the underlying storage.
        instance.instance_id = instance_id
        instance.instance_type = instance_type
        instance.status = status
        InstanceUtil._record_status_transition(instance, status, details)
        return instance

    @staticmethod
    def random_instance_id() -> str:
        """
        Returns a random instance id (a UUID4 rendered as a string).
        """
        return str(uuid.uuid4())

    @staticmethod
    def is_cloud_instance_allocated(instance_status: Instance.InstanceStatus) -> bool:
        """
        Returns True if the instance is in a status where there could exist
        a cloud instance allocated by the cloud provider.

        Args:
            instance_status: The status to check. Must not be UNKNOWN.
        """
        assert instance_status != Instance.UNKNOWN
        return instance_status in {
            Instance.ALLOCATED,
            Instance.RAY_INSTALLING,
            Instance.RAY_RUNNING,
            Instance.RAY_STOPPING,
            Instance.RAY_STOP_REQUESTED,
            Instance.RAY_STOPPED,
            Instance.TERMINATING,
            Instance.RAY_INSTALL_FAILED,
            Instance.TERMINATION_FAILED,
            Instance.ALLOCATION_TIMEOUT,
        }

    @staticmethod
    def is_ray_running(instance_status: Instance.InstanceStatus) -> bool:
        """
        Returns True if the instance is in a status where the ray process is
        running on the cloud instance.
            i.e. RAY_RUNNING, RAY_STOP_REQUESTED, RAY_STOPPING

        Args:
            instance_status: The status to check. Must not be UNKNOWN.
        """
        assert instance_status != Instance.UNKNOWN

        # Anything strictly downstream of RAY_STOPPING means ray is no longer
        # running (RAY_STOPPING itself is not in its own reachable set).
        if instance_status in InstanceUtil.get_reachable_statuses(
            Instance.RAY_STOPPING
        ):
            return False

        # Otherwise, ray is running iff the status is reachable from
        # RAY_RUNNING (which includes RAY_RUNNING itself, via the
        # RAY_STOP_REQUESTED -> RAY_RUNNING cycle).
        if instance_status in InstanceUtil.get_reachable_statuses(Instance.RAY_RUNNING):
            return True

        return False

    @staticmethod
    def is_ray_pending(instance_status: Instance.InstanceStatus) -> bool:
        """
        Returns True if the instance is in a status where the ray process is
        pending to be started on the cloud instance.

        Args:
            instance_status: The status to check. Must not be UNKNOWN.
        """
        assert instance_status != Instance.UNKNOWN
        # Not gonna be in a RAY_RUNNING status.
        if Instance.RAY_RUNNING not in InstanceUtil.get_reachable_statuses(
            instance_status
        ):
            return False

        # Already running ray.
        if instance_status in InstanceUtil.get_reachable_statuses(Instance.RAY_RUNNING):
            return False

        return True

    @staticmethod
    def is_ray_running_reachable(instance_status: Instance.InstanceStatus) -> bool:
        """
        Returns True if the instance is in a status where it may transition
        to RAY_RUNNING status.

        Args:
            instance_status: The status to check.
        """
        return Instance.RAY_RUNNING in InstanceUtil.get_reachable_statuses(
            instance_status
        )

    @staticmethod
    def set_status(
        instance: Instance,
        new_instance_status: Instance.InstanceStatus,
        details: str = "",
    ) -> bool:
        """Transitions the instance to the new state.

        The instance is only mutated (status updated and the transition
        appended to its status history) when the transition is valid.

        Args:
            instance: The instance to update.
            new_instance_status: The new status to transition to.
            details: The details of the transition.

        Returns:
            True if the status transition is successful, False otherwise.
        """
        if (
            new_instance_status
            not in InstanceUtil.get_valid_transitions()[instance.status]
        ):
            return False
        instance.status = new_instance_status
        InstanceUtil._record_status_transition(instance, new_instance_status, details)
        return True

    @staticmethod
    def _record_status_transition(
        instance: Instance, status: Instance.InstanceStatus, details: str
    ):
        """Records the status transition.

        Appends a StatusHistory entry stamped with the current wall-clock time
        (in nanoseconds) to the instance's status history.

        Args:
            instance: The instance to update.
            status: The new status to transition to.
            details: The details of the transition.
        """
        now_ns = time.time_ns()
        instance.status_history.append(
            Instance.StatusHistory(
                instance_status=status,
                timestamp_ns=now_ns,
                details=details,
            )
        )

    @staticmethod
    def has_timeout(instance: Instance, timeout_s: int) -> bool:
        """
        Returns True if the instance has been in the current status for more
        than timeout_s seconds.

        Args:
            instance: The instance to check.
            timeout_s: The timeout in seconds.

        Returns:
            True if the instance has been in the current status for more than
            timeout_s seconds, measured from the most recent transition into
            the current status.
        """
        cur_status = instance.status

        status_times_ns = InstanceUtil.get_status_transition_times_ns(
            instance, select_instance_status=cur_status
        )
        assert len(status_times_ns) >= 1, (
            f"instance {instance.instance_id} has {len(status_times_ns)} "
            f"{Instance.InstanceStatus.Name(cur_status)} status"
        )
        # Only the latest transition into the current status matters.
        status_time_ns = max(status_times_ns)
        return time.time_ns() - status_time_ns > (timeout_s * 1e9)

    @staticmethod
    def get_valid_transitions() -> Dict[
        "Instance.InstanceStatus", Set["Instance.InstanceStatus"]
    ]:
        """
        Returns the instance status state machine.

        Returns:
            A freshly built dict mapping each instance status to the set of
            statuses it may directly transition to.
        """
        return {
            # This is the initial status of a new instance.
            Instance.QUEUED: {
                # Cloud provider requested to launch a node for the instance.
                # This happens when a launch request is made to the node provider.
                Instance.REQUESTED,
                # Allocation request canceled before being requested.
                # This happens when max_workers config is reduced or other termination
                # triggers occur while the instance is still queued.
                Instance.TERMINATED,
            },
            # When in this status, a launch request to the node provider is made.
            Instance.REQUESTED: {
                # Cloud provider allocated a cloud instance for the instance.
                # This happens when the cloud instance first appears in the list of
                # running cloud instances from the cloud instance provider.
                Instance.ALLOCATED,
                # Retry the allocation, become queueing again.
                Instance.QUEUED,
                # Cloud provider fails to allocate one. Either as a timeout or
                # the launch request fails immediately.
                Instance.ALLOCATION_FAILED,
            },
            # When in this status, the cloud instance is allocated and running. This
            # happens when the cloud instance is present in node provider's list of
            # running cloud instances.
            Instance.ALLOCATED: {
                # Ray needs to be installed and launched on the provisioned cloud
                # instance. This happens when the cloud instance is allocated, and
                # the autoscaler is responsible for installing and launching ray on
                # the cloud instance. For node provider that manages the ray
                # installation and launching, this state is skipped.
                Instance.RAY_INSTALLING,
                # Ray is already installed on the provisioned cloud
                # instance. It could be any valid ray status.
                Instance.RAY_RUNNING,
                # The cloud provider timed out for allocating running cloud instance.
                # The CloudResourceMonitor subscriber will lower this node-type's
                # priority in future schedules.
                Instance.ALLOCATION_TIMEOUT,
                Instance.RAY_STOPPING,
                Instance.RAY_STOPPED,
                # Instance is requested to be stopped, e.g. instance leaked: no matching
                # Instance with the same type is found in the autoscaler's state.
                Instance.TERMINATING,
                # cloud instance somehow failed.
                Instance.TERMINATED,
            },
            # Ray process is being installed and started on the cloud instance.
            # This status is skipped for node provider that manages the ray
            # installation and launching. (e.g. Ray-on-Spark)
            Instance.RAY_INSTALLING: {
                # Ray installed and launched successfully, reported by the ray cluster.
                # Similar to the Instance.ALLOCATED -> Instance.RAY_RUNNING transition,
                # where the ray process is managed by the node provider.
                Instance.RAY_RUNNING,
                # Ray installation failed. This happens when the ray process failed to
                # be installed and started on the cloud instance.
                Instance.RAY_INSTALL_FAILED,
                # When the ray node is reported as stopped by the ray cluster.
                # This could happen that the ray process was stopped quickly after start
                # such that a ray running node wasn't discovered and the RAY_RUNNING
                # transition was skipped.
                Instance.RAY_STOPPED,
                # A cloud instance is being terminated (when the instance itself is no
                # longer needed, e.g. instance is outdated, autoscaler is scaling down)
                Instance.TERMINATING,
                # cloud instance somehow failed during the installation process.
                Instance.TERMINATED,
            },
            # Ray process is installed and running on the cloud instance. When in this
            # status, a ray node must be present in the ray cluster.
            Instance.RAY_RUNNING: {
                # Ray is requested to be stopped.
                Instance.RAY_STOP_REQUESTED,
                # Ray is stopping (currently draining),
                # e.g. idle termination.
                Instance.RAY_STOPPING,
                # Ray is already stopped, as reported by the ray cluster.
                Instance.RAY_STOPPED,
                # A cloud instance is being terminated (when the instance itself is no
                # longer needed, e.g. instance is outdated, autoscaler is scaling down)
                Instance.TERMINATING,
                # cloud instance somehow failed.
                Instance.TERMINATED,
            },
            # Ray process should be stopped on the cloud instance. The RayStopper
            # subscriber will listen to this status and stop the ray process.
            Instance.RAY_STOP_REQUESTED: {
                # Ray is stopping on the cloud instance.
                Instance.RAY_STOPPING,
                # Ray stopped already.
                Instance.RAY_STOPPED,
                # Ray stop request failed (e.g. idle node no longer idle),
                # ray is still running.
                Instance.RAY_RUNNING,
                # cloud instance somehow failed.
                Instance.TERMINATED,
            },
            # An instance has been allocated to a cloud instance, but the cloud
            # provider timed out for allocating running cloud instance, e.g.
            # a kubernetes pod remains pending due to insufficient resources.
            Instance.ALLOCATION_TIMEOUT: {
                # Instance is requested to be stopped
                Instance.TERMINATING
            },
            # When in this status, the ray process is requested to be stopped to the
            # ray cluster, but not yet present in the dead ray node list reported by
            # the ray cluster.
            Instance.RAY_STOPPING: {
                # Ray is stopped, and the ray node is present in the dead ray node list
                # reported by the ray cluster.
                Instance.RAY_STOPPED,
                # A cloud instance is being terminated (when the instance itself is no
                # longer needed, e.g. instance is outdated, autoscaler is scaling down)
                Instance.TERMINATING,
                # cloud instance somehow failed.
                Instance.TERMINATED,
            },
            # When in this status, the ray process is stopped, and the ray node is
            # present in the dead ray node list reported by the ray cluster.
            Instance.RAY_STOPPED: {
                # A cloud instance is being terminated (when the instance itself is no
                # longer needed, e.g. instance is outdated, autoscaler is scaling down)
                Instance.TERMINATING,
                # cloud instance somehow failed.
                Instance.TERMINATED,
            },
            # When in this status, the cloud instance is requested to be stopped to
            # the node provider.
            Instance.TERMINATING: {
                # When a cloud instance no longer appears in the list of running cloud
                # instances from the node provider.
                Instance.TERMINATED,
                # When the cloud instance failed to be terminated.
                Instance.TERMINATION_FAILED,
            },
            # When in this status, the cloud instance failed to be terminated by the
            # node provider. We will keep retrying.
            Instance.TERMINATION_FAILED: {
                # Retry the termination, become terminating again.
                Instance.TERMINATING,
            },
            # An instance is marked as terminated when:
            # 1. A cloud instance disappears from the list of running cloud instances
            #    from the node provider (follows from TERMINATING or other running
            #    states).
            # 2. An allocation request is canceled before cloud resources are allocated
            #    (follows from QUEUED).
            # This is a terminal state.
            Instance.TERMINATED: set(),  # Terminal state.
            # When in this status, the cloud instance failed to be allocated by the
            # node provider.
            Instance.ALLOCATION_FAILED: set(),  # Terminal state.
            Instance.RAY_INSTALL_FAILED: {
                # Autoscaler requests to shutdown the instance when ray install failed.
                Instance.TERMINATING,
                # cloud instance somehow failed.
                Instance.TERMINATED,
            },
            # Initial state before the instance is created. Should never be used.
            Instance.UNKNOWN: set(),
        }

    @staticmethod
    def get_status_transitions(
        instance: Instance,
        select_instance_status: Optional["Instance.InstanceStatus"] = None,
    ) -> List["Instance.StatusHistory"]:
        """
        Returns the status history of the instance.

        Args:
            instance: The instance.
            select_instance_status: The go-to status to search for, i.e. select
                only status history when the instance transitions into the status.
                If None, returns all status updates.

        Returns:
            A new list of matching status history entries, in the order they
            appear in the instance's status history.
        """
        # Compare against None explicitly: the zero enum value (presumably
        # Instance.UNKNOWN) is falsy, and a truthiness test would silently
        # disable the filter when it is passed.
        if select_instance_status is None:
            return list(instance.status_history)

        return [
            status_update
            for status_update in instance.status_history
            if status_update.instance_status == select_instance_status
        ]

    @staticmethod
    def get_last_status_transition(
        instance: Instance,
        select_instance_status: Optional["Instance.InstanceStatus"] = None,
    ) -> Optional["Instance.StatusHistory"]:
        """
        Returns the last status transition of the instance.

        Args:
            instance: The instance.
            select_instance_status: The status to search for. If None, returns
                the last status update of any status.

        Returns:
            The matching status history entry with the largest timestamp, or
            None if there is no matching entry.
        """
        history = InstanceUtil.get_status_transitions(instance, select_instance_status)
        if not history:
            return None
        # list.sort() is stable, so among entries with equal timestamps the one
        # recorded last wins.
        history.sort(key=lambda x: x.timestamp_ns)
        return history[-1]

    @staticmethod
    def get_status_transition_times_ns(
        instance: Instance,
        select_instance_status: Optional["Instance.InstanceStatus"] = None,
    ) -> List[int]:
        """
        Returns a list of timestamps of the instance status update.

        Args:
            instance: The instance.
            select_instance_status: The status to search for. If None, returns
                all status updates timestamps.

        Returns:
            The list of timestamps (in nanoseconds) of the instance status
            updates.
        """
        return [
            e.timestamp_ns
            for e in InstanceUtil.get_status_transitions(
                instance, select_instance_status
            )
        ]

    @classmethod
    def get_reachable_statuses(
        cls,
        instance_status: Instance.InstanceStatus,
    ) -> Set["Instance.InstanceStatus"]:
        """
        Returns the set of instance status that is reachable from the given
        instance status following the status transitions.

        This method is memoized. The returned set is the cached object itself;
        callers should treat it as read-only.

        Args:
            instance_status: The instance status to start from.

        Returns:
            The set of instance status that is reachable from the given instance
            status.
        """
        if cls._reachable_from is None:
            cls._compute_reachable()
        return cls._reachable_from[instance_status]

    @staticmethod
    def get_log_str_for_update(instance: Instance, update: InstanceUpdateEvent) -> str:
        """Returns a log string for the given instance update.

        Args:
            instance: The instance the update applies to.
            update: The update event. If `update.upsert` is set, the message
                is worded as a new instance rather than a status change.
        """
        if update.upsert:
            return (
                f"New instance "
                f"{Instance.InstanceStatus.Name(update.new_instance_status)} (id="
                f"{instance.instance_id}, type={instance.instance_type}, "
                f"cloud_instance_id={instance.cloud_instance_id}, "
                f"ray_id={instance.node_id}): {update.details}"
            )
        return (
            f"Update instance "
            f"{Instance.InstanceStatus.Name(instance.status)}->"
            f"{Instance.InstanceStatus.Name(update.new_instance_status)} (id="
            f"{instance.instance_id}, type={instance.instance_type}, "
            f"cloud_instance_id={instance.cloud_instance_id}, "
            f"ray_id={instance.node_id}): {update.details}"
        )

    @classmethod
    def _compute_reachable(cls):
        """
        Computes and memoizes the set of reachable statuses for every instance
        status, using a DFS over the valid-transition graph.
        """
        valid_transitions = cls.get_valid_transitions()

        def dfs(graph, start, visited):
            """
            Regular DFS algorithm to find all reachable nodes from a given node.
            """
            for next_node in graph[start]:
                if next_node not in visited:
                    # `start` is deliberately not pre-seeded into `visited`:
                    # a status only ends up in its own reachable set if some
                    # cycle of transitions actually leads back to it.
                    visited.add(next_node)
                    dfs(graph, next_node, visited)
            return visited

        # Build the full map locally and publish it in one assignment, so that
        # a failure midway never leaves a partially filled (non-None) cache
        # behind for get_reachable_statuses() to trip over.
        reachable_from = {}
        for status in Instance.InstanceStatus.values():
            # All nodes reachable from 'status'
            reachable_from[status] = dfs(valid_transitions, status, set())
        cls._reachable_from = reachable_from
|