| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637 |
- import logging
- import math
- import time
- import uuid
- from collections import defaultdict
- from typing import Dict, List, Optional, Set, Tuple
- from ray._common.utils import binary_to_hex
- from ray.autoscaler.v2.instance_manager.common import InstanceUtil
- from ray.autoscaler.v2.instance_manager.config import (
- AutoscalingConfig,
- InstanceReconcileConfig,
- Provider,
- )
- from ray.autoscaler.v2.instance_manager.instance_manager import InstanceManager
- from ray.autoscaler.v2.instance_manager.node_provider import (
- CloudInstance,
- CloudInstanceId,
- CloudInstanceProviderError,
- ICloudInstanceProvider,
- LaunchNodeError,
- TerminateNodeError,
- )
- from ray.autoscaler.v2.instance_manager.subscribers.cloud_resource_monitor import (
- CloudResourceMonitor,
- )
- from ray.autoscaler.v2.instance_manager.subscribers.ray_stopper import RayStopError
- from ray.autoscaler.v2.instance_manager.subscribers.threaded_ray_installer import (
- RayInstallError,
- )
- from ray.autoscaler.v2.metrics_reporter import AutoscalerMetricsReporter
- from ray.autoscaler.v2.scheduler import IResourceScheduler, SchedulingRequest
- from ray.autoscaler.v2.schema import AutoscalerInstance, NodeType
- from ray.autoscaler.v2.utils import is_head_node
- from ray.core.generated.autoscaler_pb2 import (
- AutoscalingState,
- ClusterResourceState,
- FailedInstanceRequest,
- NodeState,
- NodeStatus,
- PendingInstance,
- PendingInstanceRequest,
- )
- from ray.core.generated.instance_manager_pb2 import (
- GetInstanceManagerStateRequest,
- Instance as IMInstance,
- InstanceUpdateEvent as IMInstanceUpdateEvent,
- NodeKind,
- StatusCode,
- UpdateInstanceManagerStateRequest,
- )
- logger = logging.getLogger(__name__)
- class Reconciler:
- """
- A singleton class that reconciles the instance states of the instance manager
- for autoscaler.
- """
@staticmethod
def reconcile(
    instance_manager: InstanceManager,
    scheduler: IResourceScheduler,
    cloud_provider: ICloudInstanceProvider,
    cloud_resource_monitor: CloudResourceMonitor,
    ray_cluster_resource_state: ClusterResourceState,
    non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance],
    autoscaling_config: AutoscalingConfig,
    cloud_provider_errors: Optional[List[CloudInstanceProviderError]] = None,
    ray_install_errors: Optional[List[RayInstallError]] = None,
    ray_stop_errors: Optional[List[RayStopError]] = None,
    metrics_reporter: Optional[AutoscalerMetricsReporter] = None,
    _logger: Optional[logging.Logger] = None,
) -> AutoscalingState:
    """
    Run one full reconciliation pass over the instance manager.

    The pass consists of three phases, executed in this order:

    1. ``_sync_from``: "passive" status transitions. The instance manager's
       view is brought in line with what the cloud provider, the ray
       cluster, the ray installer and the ray stopper report. Nothing is
       actively changed on the cloud provider or the ray cluster.
    2. ``_step_next``: "active" status transitions. Instances are stepped
       forward by updating the instance manager's state; these updates have
       side effects on the cloud provider and the ray cluster (through
       InstanceStatusSubscriber).
    3. ``_report_metrics``: metrics are reported through ``metrics_reporter``.

    Args:
        instance_manager: The instance manager to reconcile.
        scheduler: The resource scheduler, forwarded to ``_step_next``.
        cloud_provider: The cloud instance provider, forwarded to
            ``_step_next``.
        cloud_resource_monitor: The cloud resource monitor for monitoring
            resource availability of all node types.
        ray_cluster_resource_state: The ray cluster's resource state.
        non_terminated_cloud_instances: The non-terminated cloud instances
            from the cloud provider.
        autoscaling_config: The autoscaling config.
        cloud_provider_errors: The errors from the cloud provider. ``None``
            (or an empty list) is treated as "no errors".
        ray_install_errors: The errors from RayInstaller. ``None`` (or an
            empty list) is treated as "no errors".
        ray_stop_errors: The errors from RayStopper. ``None`` (or an empty
            list) is treated as "no errors".
        metrics_reporter: The metric reporter to report the autoscaler
            metrics.
        _logger: The logger (for testing).

    Returns:
        The AutoscalingState created for this pass. Its
        ``last_seen_cluster_resource_state_version`` is copied from
        ``ray_cluster_resource_state`` and the object is handed to
        ``_step_next`` before being returned.
    """
    # Normalize the optional error lists so the helpers below can iterate
    # over them unconditionally.
    if not cloud_provider_errors:
        cloud_provider_errors = []
    if not ray_install_errors:
        ray_install_errors = []
    if not ray_stop_errors:
        ray_stop_errors = []

    seen_version = ray_cluster_resource_state.cluster_resource_state_version
    state = AutoscalingState()
    state.last_seen_cluster_resource_state_version = seen_version

    # Phase 1: passive sync from the external sources of truth.
    Reconciler._sync_from(
        instance_manager=instance_manager,
        ray_nodes=ray_cluster_resource_state.node_states,
        non_terminated_cloud_instances=non_terminated_cloud_instances,
        cloud_provider_errors=cloud_provider_errors,
        ray_install_errors=ray_install_errors,
        ray_stop_errors=ray_stop_errors,
        autoscaling_config=autoscaling_config,
    )

    # Phase 2: active transitions.
    Reconciler._step_next(
        autoscaling_state=state,
        instance_manager=instance_manager,
        scheduler=scheduler,
        cloud_provider=cloud_provider,
        cloud_resource_monitor=cloud_resource_monitor,
        ray_cluster_resource_state=ray_cluster_resource_state,
        non_terminated_cloud_instances=non_terminated_cloud_instances,
        autoscaling_config=autoscaling_config,
        _logger=_logger,
    )

    # Phase 3: metrics.
    Reconciler._report_metrics(
        instance_manager=instance_manager,
        autoscaling_config=autoscaling_config,
        metrics_reporter=metrics_reporter,
    )

    return state
@staticmethod
def _sync_from(
    instance_manager: InstanceManager,
    ray_nodes: List[NodeState],
    non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance],
    cloud_provider_errors: List[CloudInstanceProviderError],
    ray_install_errors: List[RayInstallError],
    ray_stop_errors: List[RayStopError],
    autoscaling_config: AutoscalingConfig,
):
    """
    Bring the instance manager's instance states in line with external state.

    The external sources are the cloud provider, the ray cluster, the ray
    installer's results and the ray stopper's results. All transitions made
    here are meant to be purely "passive": they only reflect what the
    external systems report and must not actively change the cloud provider
    or the ray cluster.

    The transitions reconciled here are:
        1. REQUESTED -> ALLOCATED:
            An instance with a launch request id can be assigned to an
            unassigned cloud instance of the same instance type.
        2. REQUESTED -> ALLOCATION_FAILED:
            The cloud provider reported a launch error for the instance.
        3. ALLOCATED -> ALLOCATION_TIMEOUT:
            The instance was allocated to a cloud instance but is stuck in
            this state, e.g. a kubernetes pod that remains pending due to
            insufficient resources.
        4. * -> RAY_RUNNING:
            A ray node on the cloud instance joined the ray cluster.
        5. * -> TERMINATED:
            The cloud instance is already terminated.
        6. TERMINATING -> TERMINATION_FAILED:
            The cloud provider reported a termination error.
        7. * -> RAY_STOPPED:
            Ray was stopped on the cloud instance.
        8. * -> RAY_INSTALL_FAILED:
            RayInstaller reported an error.
        9. RAY_STOP_REQUESTED -> RAY_RUNNING:
            A stop was requested but stopping/draining the ray node failed
            (e.g. idle termination drain rejected by the node).

    The helpers are invoked one after another in a fixed order; each one
    reads the instance manager state afresh.

    Args:
        instance_manager: The instance manager to reconcile.
        ray_nodes: The ray cluster's states of ray nodes.
        non_terminated_cloud_instances: The non-terminated cloud instances
            from the cloud provider.
        cloud_provider_errors: The errors from the cloud provider.
        ray_install_errors: The errors from RayInstaller.
        ray_stop_errors: The errors from RayStopper.
        autoscaling_config: The autoscaling config, forwarded to the ray
            status reconciliation.
    """
    # Cloud instance allocation: REQUESTED -> ALLOCATED / ALLOCATION_FAILED.
    Reconciler._handle_cloud_instance_allocation(
        instance_manager=instance_manager,
        non_terminated_cloud_instances=non_terminated_cloud_instances,
        cloud_provider_errors=cloud_provider_errors,
    )

    # Cloud instances that disappeared: * -> TERMINATED.
    Reconciler._handle_cloud_instance_terminated(
        instance_manager=instance_manager,
        non_terminated_cloud_instances=non_terminated_cloud_instances,
    )

    # Failed terminations: TERMINATING -> TERMINATION_FAILED.
    Reconciler._handle_cloud_instance_termination_errors(
        instance_manager=instance_manager,
        cloud_provider_errors=cloud_provider_errors,
    )

    Reconciler._handle_extra_cloud_instances(
        instance_manager, non_terminated_cloud_instances, ray_nodes
    )

    # Ray node status as reported by the ray cluster.
    Reconciler._handle_ray_status_transition(
        instance_manager=instance_manager,
        ray_nodes=ray_nodes,
        autoscaling_config=autoscaling_config,
    )

    # Ray install failures: RAY_INSTALLING -> RAY_INSTALL_FAILED.
    Reconciler._handle_ray_install_failed(
        instance_manager=instance_manager,
        ray_install_errors=ray_install_errors,
    )

    # Ray stop failures: RAY_STOP_REQUESTED -> RAY_RUNNING.
    Reconciler._handle_ray_stop_failed(
        instance_manager=instance_manager,
        ray_stop_errors=ray_stop_errors,
        ray_nodes=ray_nodes,
    )
@staticmethod
def _step_next(
    autoscaling_state: AutoscalingState,
    instance_manager: InstanceManager,
    scheduler: IResourceScheduler,
    cloud_provider: ICloudInstanceProvider,
    cloud_resource_monitor: CloudResourceMonitor,
    ray_cluster_resource_state: ClusterResourceState,
    non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance],
    autoscaling_config: AutoscalingConfig,
    _logger: Optional[logging.Logger] = None,
):
    """
    Step the instances forward by computing the needed status transitions
    and updating the instance manager's state.

    The steps run in this order:
        1. Handle stuck instances, using the instance reconcile config
           taken from ``autoscaling_config``.
        2. Scale the cluster based on the scheduler and the ray cluster
           resource state.
        3. Request launches for queued instances (QUEUED -> REQUESTED).
        4. Terminate instances.
        5. Install ray, unless ``autoscaling_config.disable_node_updaters()``
           is true.
        6. Fill ``autoscaling_state`` from the instance manager.

    Args:
        autoscaling_state: The autoscaling state to be populated.
        instance_manager: The instance manager to reconcile.
        scheduler: The resource scheduler to make scaling decisions.
        cloud_provider: The cloud instance provider. Not referenced by the
            body of this method.
        cloud_resource_monitor: The cloud resource monitor for monitoring
            resource availability of all node types.
        ray_cluster_resource_state: The ray cluster's resource state.
        non_terminated_cloud_instances: The non-terminated cloud instances
            from the cloud provider.
        autoscaling_config: The autoscaling config.
        _logger: The logger (for testing). Falls back to the module logger
            when not given.
    """
    reconcile_config = autoscaling_config.get_instance_reconcile_config()
    active_logger = _logger or logger

    Reconciler._handle_stuck_instances(
        instance_manager=instance_manager,
        reconcile_config=reconcile_config,
        _logger=active_logger,
    )

    Reconciler._scale_cluster(
        autoscaling_state=autoscaling_state,
        instance_manager=instance_manager,
        cloud_resource_monitor=cloud_resource_monitor,
        ray_state=ray_cluster_resource_state,
        scheduler=scheduler,
        autoscaling_config=autoscaling_config,
    )

    Reconciler._handle_instances_launch(
        instance_manager=instance_manager,
        autoscaling_config=autoscaling_config,
    )

    Reconciler._terminate_instances(instance_manager=instance_manager)

    # Ray is only installed by the autoscaler when node updaters are enabled.
    node_updaters_disabled = autoscaling_config.disable_node_updaters()
    if not node_updaters_disabled:
        Reconciler._install_ray(
            instance_manager=instance_manager,
            non_terminated_cloud_instances=non_terminated_cloud_instances,
        )

    Reconciler._fill_autoscaling_state(
        instance_manager=instance_manager,
        autoscaling_state=autoscaling_state,
    )
- #######################################################
- # Utility methods for reconciling instance states.
- #######################################################
@staticmethod
def _handle_cloud_instance_allocation(
    instance_manager: InstanceManager,
    non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance],
    cloud_provider_errors: List[CloudInstanceProviderError],
):
    """
    Resolve REQUESTED instances to ALLOCATED or ALLOCATION_FAILED.

    Every REQUESTED instance is matched, oldest request first, against the
    cloud instances that are not yet assigned to any live IM instance. If no
    cloud instance is available, a launch error for the instance's launch
    request (if any) fails the allocation.

    Args:
        instance_manager: The instance manager to reconcile.
        non_terminated_cloud_instances: The non-terminated cloud instances
            from the cloud provider.
        cloud_provider_errors: The errors from the cloud provider; only
            LaunchNodeError entries are considered here.
    """
    im_instances, version = Reconciler._get_im_instances(instance_manager)

    # Instances waiting for a cloud instance.
    pending: List[IMInstance] = [
        candidate
        for candidate in im_instances
        if candidate.status == IMInstance.REQUESTED
    ]
    for candidate in pending:
        assert (
            candidate.launch_request_id
        ), "Instance in REQUESTED status should have launch_request_id set."

    # Cloud instance ids already owned by an IM instance. TERMINATED and
    # ALLOCATION_FAILED instances do not count as owners.
    released_statuses = (IMInstance.TERMINATED, IMInstance.ALLOCATION_FAILED)
    taken_cloud_instance_ids: Set[CloudInstanceId] = set()
    for candidate in im_instances:
        if not candidate.cloud_instance_id:
            continue
        if candidate.status in released_statuses:
            continue
        taken_cloud_instance_ids.add(candidate.cloud_instance_id)

    # Launch errors indexed by the launch request id they belong to.
    launch_errors: Dict[str, LaunchNodeError] = {}
    for provider_error in cloud_provider_errors:
        if isinstance(provider_error, LaunchNodeError):
            launch_errors[provider_error.request_id] = provider_error

    # Cloud instances that are free to be claimed, grouped by node type.
    free_by_type: Dict[str, List[CloudInstance]] = defaultdict(list)
    for cloud_id, cloud_instance in non_terminated_cloud_instances.items():
        if cloud_id in taken_cloud_instance_ids:
            continue
        free_by_type[cloud_instance.node_type].append(cloud_instance)

    # Serve the requests in the order of increasing request time.
    ordered_pending = sorted(
        pending,
        key=lambda requested: InstanceUtil.get_status_transition_times_ns(
            requested, IMInstance.REQUESTED
        ),
    )

    # Try to allocate (or fail) each pending instance. Claiming a cloud
    # instance removes it from `free_by_type`, so the order matters.
    updates = {}
    for requested in ordered_pending:
        event = Reconciler._try_resolve_pending_allocation(
            requested, free_by_type, launch_errors
        )
        if event:
            updates[requested.instance_id] = event

    Reconciler._update_instance_manager(instance_manager, version, updates)
@staticmethod
def _try_resolve_pending_allocation(
    im_instance: IMInstance,
    unassigned_cloud_instances_by_type: Dict[str, List[CloudInstance]],
    launch_errors: Dict[str, LaunchNodeError],
) -> Optional[IMInstanceUpdateEvent]:
    """
    Decide whether a pending instance gets allocated, fails, or stays as is.

    Args:
        im_instance: The instance to allocate or fail.
        unassigned_cloud_instances_by_type: The unassigned cloud instances
            by type. A claimed cloud instance is removed from its list, i.e.
            this argument is mutated.
        launch_errors: The launch errors from the cloud provider, keyed by
            launch request id.

    Returns:
        Instance update to ALLOCATED: if there's a matching unassigned cloud
            instance with the same type.
        Instance update to ALLOCATION_FAILED: if there is a launch error for
            the instance's launch request id with the same node type.
        None: if there's no update.
    """
    # TODO(rickyx): We could also look at the launch request id
    # on the cloud node and the im instance later once all node providers
    # support request id. For now, we only look at the instance type.
    wanted_type = im_instance.instance_type
    candidates = unassigned_cloud_instances_by_type.get(wanted_type, [])
    # Claim the last unassigned cloud instance of the wanted type, if any.
    claimed = candidates.pop() if len(candidates) else None

    if claimed:
        return IMInstanceUpdateEvent(
            instance_id=im_instance.instance_id,
            new_instance_status=IMInstance.ALLOCATED,
            cloud_instance_id=claimed.cloud_instance_id,
            node_kind=claimed.node_kind,
            instance_type=claimed.node_type,
            details=(
                "allocated unassigned cloud instance "
                f"{claimed.cloud_instance_id}"
            ),
        )

    # Nothing to claim: only a matching launch error produces an update.
    failure = launch_errors.get(im_instance.launch_request_id)
    if not failure or failure.node_type != im_instance.instance_type:
        return None

    return IMInstanceUpdateEvent(
        instance_id=im_instance.instance_id,
        new_instance_status=IMInstance.ALLOCATION_FAILED,
        details=f"launch failed with {str(failure)}",
    )
@staticmethod
def _handle_ray_stop_failed(
    instance_manager: InstanceManager,
    ray_stop_errors: List[RayStopError],
    ray_nodes: List[NodeState],
):
    """
    Move instances whose ray stop request failed back to RAY_RUNNING.

    An instance in RAY_STOP_REQUESTED for which the ray stopper reported an
    error (e.g. connection errors, idle termination drain rejected by the
    node) is transitioned back to RAY_RUNNING.

    Args:
        instance_manager: The instance manager to reconcile.
        ray_stop_errors: The errors from RayStopper.
        ray_nodes: The ray cluster's states of ray nodes. Used to check that
            the affected ray node is still RUNNING or IDLE.
    """
    instances, version = Reconciler._get_im_instances(instance_manager)

    errors_by_instance_id = {}
    for stop_failure in ray_stop_errors:
        errors_by_instance_id[stop_failure.im_instance_id] = stop_failure

    nodes_by_hex_id = {}
    for node in ray_nodes:
        nodes_by_hex_id[binary_to_hex(node.node_id)] = node

    stop_requested = {
        candidate.instance_id: candidate
        for candidate in instances
        if candidate.status == IMInstance.RAY_STOP_REQUESTED
    }

    alive_statuses = [NodeStatus.RUNNING, NodeStatus.IDLE]
    updates = {}
    for instance_id, instance in stop_requested.items():
        if not errors_by_instance_id.get(instance_id):
            # No stop failure reported for this instance.
            continue

        assert instance.node_id
        ray_node = nodes_by_hex_id.get(instance.node_id)
        # A failed stop implies that the ray node is still up.
        assert ray_node is not None and ray_node.status in alive_statuses, (
            "There should be a running ray node for instance with ray stop "
            "requested failed."
        )

        updates[instance_id] = IMInstanceUpdateEvent(
            instance_id=instance_id,
            new_instance_status=IMInstance.RAY_RUNNING,
            details="failed to stop/drain ray",
            ray_node_id=instance.node_id,
        )

    Reconciler._update_instance_manager(instance_manager, version, updates)
@staticmethod
def _handle_ray_install_failed(
    instance_manager: InstanceManager, ray_install_errors: List[RayInstallError]
):
    """
    Transition RAY_INSTALLING instances with an install error to
    RAY_INSTALL_FAILED.

    Args:
        instance_manager: The instance manager to reconcile.
        ray_install_errors: The errors from RayInstaller, matched to
            instances by ``im_instance_id``.
    """
    instances, version = Reconciler._get_im_instances(instance_manager)

    errors_by_instance_id = {}
    for failure in ray_install_errors:
        errors_by_instance_id[failure.im_instance_id] = failure

    # Only instances that are currently installing ray can fail the install.
    installing = {
        candidate.instance_id: candidate
        for candidate in instances
        if candidate.status == IMInstance.RAY_INSTALLING
    }

    updates = {}
    for instance_id in installing:
        failure = errors_by_instance_id.get(instance_id)
        if not failure:
            continue
        updates[instance_id] = IMInstanceUpdateEvent(
            instance_id=instance_id,
            new_instance_status=IMInstance.RAY_INSTALL_FAILED,
            details=f"failed to install ray with errors: {failure.details}",
        )

    Reconciler._update_instance_manager(instance_manager, version, updates)
@staticmethod
def _handle_cloud_instance_terminated(
    instance_manager: InstanceManager,
    non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance],
):
    """
    Mark IM (instance manager) instances as TERMINATED when their cloud
    instance is gone.

    An IM instance is affected if it has a cloud instance id, is not already
    TERMINATED, and its cloud instance id is absent from
    ``non_terminated_cloud_instances``.

    Args:
        instance_manager: The instance manager to reconcile.
        non_terminated_cloud_instances: The non-terminated cloud instances
            from the cloud provider.
    """
    instances, version = Reconciler._get_im_instances(instance_manager)

    live_by_cloud_instance_id = {}
    for candidate in instances:
        if not candidate.cloud_instance_id:
            continue
        if candidate.status == IMInstance.TERMINATED:
            continue
        live_by_cloud_instance_id[candidate.cloud_instance_id] = candidate

    updates = {}
    for cloud_instance_id, instance in live_by_cloud_instance_id.items():
        if cloud_instance_id in non_terminated_cloud_instances:
            # The cloud instance is still running.
            continue
        # The cloud provider no longer reports this cloud instance.
        updates[instance.instance_id] = IMInstanceUpdateEvent(
            instance_id=instance.instance_id,
            new_instance_status=IMInstance.TERMINATED,
            details=f"cloud instance {cloud_instance_id} no longer found",
        )

    Reconciler._update_instance_manager(instance_manager, version, updates)
@staticmethod
def _handle_cloud_instance_termination_errors(
    instance_manager: InstanceManager,
    cloud_provider_errors: List[CloudInstanceProviderError],
):
    """
    Transition TERMINATING instances with a termination error to
    TERMINATION_FAILED.

    We will retry the termination for the TERMINATION_FAILED instances in
    the next reconciler step.

    Args:
        instance_manager: The instance manager to reconcile.
        cloud_provider_errors: The errors from the cloud provider; only
            TerminateNodeError entries are considered here.
    """
    instances, version = Reconciler._get_im_instances(instance_manager)

    failures_by_cloud_instance_id = {}
    for provider_error in cloud_provider_errors:
        if isinstance(provider_error, TerminateNodeError):
            failures_by_cloud_instance_id[
                provider_error.cloud_instance_id
            ] = provider_error

    terminating_by_cloud_instance_id = {}
    for candidate in instances:
        if candidate.status == IMInstance.TERMINATING:
            terminating_by_cloud_instance_id[
                candidate.cloud_instance_id
            ] = candidate

    updates = {}
    for cloud_instance_id, failure in failures_by_cloud_instance_id.items():
        instance = terminating_by_cloud_instance_id.get(cloud_instance_id)
        if instance:
            updates[instance.instance_id] = IMInstanceUpdateEvent(
                instance_id=instance.instance_id,
                new_instance_status=IMInstance.TERMINATION_FAILED,
                details=f"termination failed: {str(failure)}",
            )
        # Otherwise the instance is no longer in TERMINATING status.

    Reconciler._update_instance_manager(instance_manager, version, updates)
@staticmethod
def _get_im_instances(
    instance_manager: InstanceManager,
) -> Tuple[List[IMInstance], int]:
    """
    Fetch the current instances and state version from the instance manager.

    Args:
        instance_manager: The instance manager to query.

    Returns:
        A tuple of (instances, version) taken from the instance manager's
        state.

    Raises:
        AssertionError: If the reply's status code is not OK.
    """
    request = GetInstanceManagerStateRequest()
    reply = instance_manager.get_instance_manager_state(request=request)
    assert reply.status.code == StatusCode.OK
    return reply.state.instances, reply.state.version
@staticmethod
def _update_instance_manager(
    instance_manager: InstanceManager,
    version: int,
    updates: Dict[str, IMInstanceUpdateEvent],
) -> None:
    """
    Send the given update events to the instance manager.

    Nothing is sent when ``updates`` is empty.

    Args:
        instance_manager: The instance manager to update.
        version: The instance manager state version the updates were
            computed against; sent as the expected version.
        updates: The update events. Only the values are sent; the keys are
            ignored.

    Raises:
        AssertionError: If the instance manager does not reply with OK.
    """
    if not updates:
        return

    request = UpdateInstanceManagerStateRequest(
        expected_version=version,
        updates=list(updates.values()),
    )
    reply = instance_manager.update_instance_manager_state(request=request)

    # TODO: While it's possible that a version mismatch
    # happens, or some other failures could happen. But given
    # the current implementation:
    #   1. There's only 1 writer (the reconciler) for updating the instance
    #       manager states, so there shouldn't be version mismatch.
    #   2. Any failures in one reconciler step should be caught at a higher
    #       level and be retried in the next reconciler step. If the IM
    #       fails to be updated, we don't have sufficient info to handle it
    #       here.
    update_succeeded = reply.status.code == StatusCode.OK
    assert update_succeeded, f"Failed to update instance manager: {reply}"
@staticmethod
def _handle_ray_status_transition(
    instance_manager: InstanceManager,
    ray_nodes: List[NodeState],
    autoscaling_config: AutoscalingConfig,
):
    """
    Handle the ray status transition for the instance manager.

    If a new ray node running on the instance, transition it to RAY_RUNNING.
    If a ray node stopped, transition it to RAY_STOPPED.
    If a ray node is draining, transition it to RAY_STOPPING.

    Each ray node is matched to an IM instance by its ray node id first.
    If no instance carries that ray node id yet, the match falls back to
    the cloud instance id: the ray node id itself for the read-only
    provider, otherwise the ray node's ``instance_id``. Ray nodes without
    any instance id are skipped with a warning.

    Args:
        instance_manager: The instance manager to reconcile.
        ray_nodes: The ray cluster's states of ray nodes.
        autoscaling_config: The autoscaling config; its ``provider`` decides
            how ray nodes are matched to cloud instance ids.

    Raises:
        AssertionError: If a ray node cannot be matched to any IM instance.
    """
    instances, version = Reconciler._get_im_instances(instance_manager)
    updates = {}

    im_instances_by_cloud_instance_id = {
        instance.cloud_instance_id: instance
        for instance in instances
        if instance.cloud_instance_id
        and instance.status
        not in [IMInstance.TERMINATED, IMInstance.ALLOCATION_FAILED]
    }
    im_instances_by_ray_node_id = {
        instance.node_id: instance for instance in instances if instance.node_id
    }

    for ray_node in ray_nodes:
        im_instance = None
        ray_node_id = binary_to_hex(ray_node.node_id)

        # Use `.get()` for the cloud-instance-id fallbacks so that a missing
        # entry reaches the assertion below (with its diagnostic message)
        # instead of surfacing as a bare KeyError.
        if ray_node_id in im_instances_by_ray_node_id:
            im_instance = im_instances_by_ray_node_id[ray_node_id]
        elif autoscaling_config.provider == Provider.READ_ONLY:
            # We will use the node id as the cloud instance id for read-only
            # provider.
            im_instance = im_instances_by_cloud_instance_id.get(ray_node_id)
        elif ray_node.instance_id:
            im_instance = im_instances_by_cloud_instance_id.get(
                ray_node.instance_id
            )
        else:
            # This should only happen to a ray node that's not managed by us.
            logger.warning(
                f"Ray node {ray_node_id} has no instance id. "
                "This only happens to a ray node not managed by autoscaler. "
                "If not, please file a bug at "
                "https://github.com/ray-project/ray"
            )
            continue

        assert im_instance is not None, (
            f"Ray node {ray_node_id} has no matching "
            f"instance with cloud instance id={ray_node.instance_id}. We should "
            "not see a ray node with cloud instance id not found in IM since "
            "we have reconciled all cloud instances, and ray nodes by now."
        )

        reconciled_im_status = Reconciler._reconciled_im_status_from_ray_status(
            ray_node.status, im_instance.status
        )
        if reconciled_im_status != im_instance.status:
            # Keyed by ray node id; only the values are sent to the IM.
            updates[ray_node_id] = IMInstanceUpdateEvent(
                instance_id=im_instance.instance_id,
                new_instance_status=reconciled_im_status,
                details=(
                    f"ray node {ray_node_id} is "
                    f"{NodeStatus.Name(ray_node.status)}"
                ),
                ray_node_id=ray_node_id,
                instance_type=im_instance.instance_type,
            )

    Reconciler._update_instance_manager(instance_manager, version, updates)
@staticmethod
def _reconciled_im_status_from_ray_status(
    ray_status: NodeStatus, cur_im_status: IMInstance.InstanceStatus
) -> "IMInstance.InstanceStatus":
    """
    Derive the IM instance status implied by a ray node status.

    Args:
        ray_status: the current ray node status.
        cur_im_status: the current IM instance status.

    Returns:
        The reconciled IM instance status. This is ``cur_im_status`` itself
        when the instance is already in the status implied by ``ray_status``
        or in a status listed by ``InstanceUtil.get_reachable_statuses`` for
        it.

    Raises:
        ValueError: If the ray status is unknown.
    """
    implied_status_by_ray_status = {
        NodeStatus.RUNNING: IMInstance.RAY_RUNNING,
        NodeStatus.IDLE: IMInstance.RAY_RUNNING,
        NodeStatus.DEAD: IMInstance.RAY_STOPPED,
        NodeStatus.DRAINING: IMInstance.RAY_STOPPING,
    }
    implied_status = implied_status_by_ray_status.get(ray_status)
    if implied_status is None:
        raise ValueError(f"Unknown ray status: {ray_status}")

    # No need to reconcile if the instance is already in the implied status
    # or has already transitioned beyond it.
    if cur_im_status == implied_status:
        return cur_im_status
    if cur_im_status in InstanceUtil.get_reachable_statuses(implied_status):
        return cur_im_status

    return implied_status
@staticmethod
def _handle_instances_launch(
    instance_manager: InstanceManager, autoscaling_config: AutoscalingConfig
):
    """
    Transition the QUEUED instances selected for launch to REQUESTED.

    Which queued instances are launched is decided by ``_compute_to_launch``
    from the queued, requested and running instances together with the
    upscaling speed and the max concurrent launches from the config.

    Args:
        instance_manager: The instance manager to reconcile.
        autoscaling_config: The autoscaling config.
    """
    instances, version = Reconciler._get_im_instances(instance_manager)

    # Bucket the instances by the statuses relevant for launching.
    by_status = {
        IMInstance.QUEUED: [],
        IMInstance.REQUESTED: [],
        IMInstance.RAY_RUNNING: [],
    }
    for candidate in instances:
        bucket = by_status.get(candidate.status)
        if bucket is not None:
            bucket.append(candidate)

    queued = by_status[IMInstance.QUEUED]
    if not queued:
        # No QUEUED instances
        return

    to_launch = Reconciler._compute_to_launch(
        queued,
        by_status[IMInstance.REQUESTED],
        by_status[IMInstance.RAY_RUNNING],
        autoscaling_config.get_upscaling_speed(),
        autoscaling_config.get_max_concurrent_launches(),
    )

    # Transition the instances to REQUESTED for instance launcher to
    # launch them. All first-time launches of this pass share one request id.
    fresh_request_id = str(uuid.uuid4())
    updates = {}
    for instance_type, selected in to_launch.items():
        for queued_instance in selected:
            # Reuse launch request id for any QUEUED instances that have been
            # requested before due to retry.
            request_id = queued_instance.launch_request_id or fresh_request_id
            updates[queued_instance.instance_id] = IMInstanceUpdateEvent(
                instance_id=queued_instance.instance_id,
                new_instance_status=IMInstance.REQUESTED,
                launch_request_id=request_id,
                instance_type=instance_type,
                details=(
                    f"requested to launch {instance_type} with request id "
                    f"{request_id}"
                ),
            )

    Reconciler._update_instance_manager(instance_manager, version, updates)
@staticmethod
def _compute_to_launch(
    queued_instances: List[IMInstance],
    requested_instances: List[IMInstance],
    running_instances: List[IMInstance],
    upscaling_speed: float,
    max_concurrent_launches: int,
) -> Dict[NodeType, List[IMInstance]]:
    """
    Pick which QUEUED instances should be launched now, per instance type.

    For each instance type with queued instances, the number launched is
    capped by:
      - the upscaling speed: at most
        `max(1, ceil(upscaling_speed * max(num_running_of_type, 1)))`,
      - the global limit: `max_concurrent_launches` minus the instances
        already requested (including those picked earlier in this call),
      - the number of queued instances of that type.

    Within a type, instances are ordered by their QUEUED transition times so
    that the earliest queued are launched first.

    Args:
        queued_instances: Instances currently QUEUED.
        requested_instances: Instances currently REQUESTED.
        running_instances: Instances currently RAY_RUNNING.
        upscaling_speed: Multiplier on running instances of a type that
            bounds how many of that type may be launched.
        max_concurrent_launches: Global cap on instances being requested.

    Returns:
        Mapping from instance type to the instances to launch. Every type
        that has queued instances appears as a key, possibly with an empty
        list.
    """

    def _group_by_type(instances):
        instances_by_type = defaultdict(list)
        for instance in instances:
            instances_by_type[instance.instance_type].append(instance)
        return instances_by_type

    # Sort key: the (sorted) times at which the instance entered QUEUED.
    def _sort_by_earliest_queued(instance: IMInstance) -> List[int]:
        queue_times = InstanceUtil.get_status_transition_times_ns(
            instance, IMInstance.QUEUED
        )
        return sorted(queue_times)

    queued_instances_by_type = _group_by_type(queued_instances)
    running_instances_by_type = _group_by_type(running_instances)
    total_num_requested_to_launch = len(requested_instances)

    # NOTE: the annotation previously read `Dict[NodeType : List[...]]`,
    # i.e. a slice rather than a (key, value) pair of type arguments.
    all_to_launch: Dict[NodeType, List[IMInstance]] = defaultdict(list)

    for (
        instance_type,
        queued_instances_for_type,
    ) in queued_instances_by_type.items():
        running_instances_for_type = running_instances_by_type.get(
            instance_type, []
        )

        # Enforce the max allowed pending nodes based on current running nodes
        num_desired_to_upscale = max(
            1,
            math.ceil(upscaling_speed * max(len(running_instances_for_type), 1)),
        )

        # Enforce global limit, at most we can launch `max_concurrent_launches`
        num_to_launch = min(
            max_concurrent_launches - total_num_requested_to_launch,
            num_desired_to_upscale,
        )

        # Cap both ends 0 <= num_to_launch <= num_queued
        num_to_launch = max(0, num_to_launch)
        num_to_launch = min(len(queued_instances_for_type), num_to_launch)

        to_launch = sorted(queued_instances_for_type, key=_sort_by_earliest_queued)[
            :num_to_launch
        ]

        all_to_launch[instance_type].extend(to_launch)
        # Instances picked here count against the global limit for the
        # instance types handled later in this loop.
        total_num_requested_to_launch += num_to_launch

    return all_to_launch
@staticmethod
def _handle_stuck_instances(
    instance_manager: InstanceManager,
    reconcile_config: InstanceReconcileConfig,
    _logger: logging.Logger,
):
    """
    Apply per-status timeouts to instances that stopped making progress.

    Statuses that are transitioned once their timeout elapses:
        - REQUESTED: cloud provider is slow/fails to launch instances; the
          request is retried or failed.
        - ALLOCATED: ray fails to be started on the instance.
        - RAY_INSTALLING: ray fails to be installed on the instance.
        - TERMINATING: cloud provider is slow/fails to terminate instances.
        - RAY_STOP_REQUESTED: the stop/drain request did not take effect, so
          the instance goes back to RAY_RUNNING.

    Statuses that may be unbounded or transient and have no timeout; a
    warning is logged when an instance stays in one for too long:
        - RAY_STOPPING: ray taking time to drain.
        - QUEUED: cloud provider is slow to launch instances, resulting in
          a long queue.

    Statuses the reconciler should move on by itself; lingering there means
    a slow reconciliation loop or a bug, so they are warned about as well:
        - RAY_INSTALL_FAILED
        - RAY_STOPPED
        - TERMINATION_FAILED

    Args:
        instance_manager: The instance manager to reconcile.
        reconcile_config: The instance reconcile config.
        _logger: The logger to log the warning messages. It's used for testing.
    """
    im_instances, version = Reconciler._get_im_instances(instance_manager)

    by_status = defaultdict(list)
    for im_instance in im_instances:
        by_status[im_instance.status].append(im_instance)

    im_updates = {}

    def _stage(im_instance, event):
        # The handlers return a falsy value when there is nothing to update.
        if event:
            im_updates[im_instance.instance_id] = event

    # Fail or retry the cloud instance allocation if it's stuck
    # in the REQUESTED state.
    for stuck in by_status[IMInstance.REQUESTED]:
        _stage(
            stuck,
            Reconciler._handle_stuck_requested_instance(
                stuck,
                reconcile_config.request_status_timeout_s,
                reconcile_config.max_num_retry_request_to_allocate,
            ),
        )

    # Leaked ALLOCATED instances should be terminated.
    # This usually happens when ray fails to be started on the instance, so
    # it's unable to be RAY_RUNNING after a long time.
    for stuck in by_status[IMInstance.ALLOCATED]:
        assert (
            stuck.cloud_instance_id
        ), "cloud instance id should be set on ALLOCATED instance"
        _stage(
            stuck,
            Reconciler._handle_stuck_instance(
                stuck,
                reconcile_config.allocate_status_timeout_s,
                new_status=IMInstance.ALLOCATION_TIMEOUT,
                cloud_instance_id=stuck.cloud_instance_id,
                instance_type=stuck.instance_type,
            ),
        )

    # Fail the installation if it's stuck in RAY_INSTALLING for too long:
    # the instance is likely unable to install ray.
    for stuck in by_status[IMInstance.RAY_INSTALLING]:
        _stage(
            stuck,
            Reconciler._handle_stuck_instance(
                stuck,
                reconcile_config.ray_install_status_timeout_s,
                new_status=IMInstance.RAY_INSTALL_FAILED,
            ),
        )

    # If we tried to terminate the instance, but it doesn't terminate (disappear
    # from the cloud provider) after a long time, we fail the termination.
    # This will trigger another attempt to terminate the instance.
    for stuck in by_status[IMInstance.TERMINATING]:
        _stage(
            stuck,
            Reconciler._handle_stuck_instance(
                stuck,
                reconcile_config.terminating_status_timeout_s,
                new_status=IMInstance.TERMINATION_FAILED,
            ),
        )

    # If we tried to stop ray on the instance, but it doesn't stop after a long
    # time, we will transition it back to RAY_RUNNING as the stop/drain somehow
    # failed. If it had succeeded, we should have transitioned it to
    # RAY_STOPPING or RAY_STOPPED.
    for stuck in by_status[IMInstance.RAY_STOP_REQUESTED]:
        _stage(
            stuck,
            Reconciler._handle_stuck_instance(
                stuck,
                reconcile_config.ray_stop_requested_status_timeout_s,
                new_status=IMInstance.RAY_RUNNING,
                ray_node_id=stuck.node_id,
            ),
        )

    # These statuses could be unbounded or transient, and we don't have a
    # timeout mechanism to handle them. We only warn if they are stuck for
    # too long.
    warn_only_statuses = (
        # Ray taking time to drain. We could also have a timeout when Drain
        # protocol supports timeout.
        IMInstance.RAY_STOPPING,
        # These should just be transient, we will terminate instances with this
        # status in the next reconciler step.
        IMInstance.RAY_INSTALL_FAILED,
        IMInstance.RAY_STOPPED,
        IMInstance.TERMINATION_FAILED,
        # Instances could be in the QUEUED status for a long time if the cloud
        # provider is slow to launch instances.
        IMInstance.QUEUED,
    )
    for warn_status in warn_only_statuses:
        Reconciler._warn_stuck_instances(
            by_status[warn_status],
            status=warn_status,
            warn_interval_s=reconcile_config.transient_status_warn_interval_s,
            logger=_logger,
        )

    Reconciler._update_instance_manager(instance_manager, version, im_updates)
@staticmethod
def _warn_stuck_instances(
    instances: List[IMInstance],
    status: IMInstance.InstanceStatus,
    warn_interval_s: int,
    logger: logging.Logger,
):
    """
    Log a warning for each instance that has stayed in `status` too long.

    Args:
        instances: The instances to check.
        status: The transient/unbounded status whose latest transition time
            is compared against the current time.
        warn_interval_s: How many seconds in `status` are tolerated before
            warning.
        logger: The logger the warnings are written to.
    """
    for instance in instances:
        transition_times_ns = InstanceUtil.get_status_transition_times_ns(
            instance, select_instance_status=status
        )
        assert len(transition_times_ns) >= 1
        # The most recent time the instance entered `status`.
        latest_transition_ns = sorted(transition_times_ns)[-1]

        within_interval = not (
            time.time_ns() - latest_transition_ns > warn_interval_s * 1e9
        )
        if within_interval:
            continue

        logger.warning(
            "Instance {}({}) is stuck in {} for {} seconds.".format(
                instance.instance_id,
                IMInstance.InstanceStatus.Name(instance.status),
                IMInstance.InstanceStatus.Name(status),
                (time.time_ns() - latest_transition_ns) // 1e9,
            )
        )
@staticmethod
def _is_head_node_running(instance_manager: InstanceManager) -> bool:
    """
    Tell whether a head-node instance is in RAY_RUNNING.

    Scaling up the cluster before the head node is running would cause
    issues when launching the worker nodes.

    There are corner cases where the GCS is up (so the ray cluster resource
    state is retrievable from the GCS) but the head node's raylet is not
    running, so the head node is missing from the reported nodes. This
    happens while the head node is still starting up, or when the raylet is
    not running due to some issue; in those cases this returns False.

    Args:
        instance_manager: The instance manager to reconcile.

    Returns:
        True if the head node is running and ready, False otherwise.
    """
    im_instances, _ = Reconciler._get_im_instances(instance_manager)
    return any(
        candidate.node_kind == NodeKind.HEAD
        and candidate.status == IMInstance.RAY_RUNNING
        for candidate in im_instances
    )
@staticmethod
def _scale_cluster(
    autoscaling_state: AutoscalingState,
    instance_manager: InstanceManager,
    cloud_resource_monitor: CloudResourceMonitor,
    ray_state: ClusterResourceState,
    scheduler: IResourceScheduler,
    autoscaling_config: AutoscalingConfig,
) -> None:
    """
    Ask the scheduler for a new cluster shape and stage the resulting changes.

    Based on the resource state and the scheduler's decision this:
        - queues new instances if needed.
        - stops/terminates extra instances the scheduler wants shut down
          (preemption or idle termination).

    The infeasible requests reported by the scheduler are always appended to
    `autoscaling_state`; the instance updates are skipped when the head node
    is not running or the provider is read-only.

    Args:
        autoscaling_state: The autoscaling state to reconcile.
        instance_manager: The instance manager to reconcile.
        cloud_resource_monitor: The cloud resource monitor for monitoring
            resource availability of all node types.
        ray_state: The ray cluster's resource state.
        scheduler: The resource scheduler to make scaling decisions.
        autoscaling_config: The autoscaling config.
    """
    # Get the current instance states.
    im_instances, version = Reconciler._get_im_instances(instance_manager)

    # Lookup table used below to find the cloud instance id of instances
    # that are terminated directly.
    im_instances_by_id = {}
    for im_instance in im_instances:
        if im_instance.instance_id:
            im_instances_by_id[im_instance.instance_id] = im_instance

    ray_nodes_by_hex_id = {}
    for node in ray_state.node_states:
        ray_nodes_by_hex_id[binary_to_hex(node.node_id)] = node

    # Pair every IM instance with the ray node it runs (if any).
    autoscaler_instances = [
        AutoscalerInstance(
            ray_node=ray_nodes_by_hex_id.get(im_instance.node_id),
            im_instance=im_instance,
            cloud_instance_id=im_instance.cloud_instance_id or None,
        )
        for im_instance in im_instances
    ]

    # TODO(rickyx): We should probably name it as "Planner" or "Scaler"
    # or "ClusterScaler"
    sched_request = SchedulingRequest(
        node_type_configs=autoscaling_config.get_node_type_configs(),
        max_num_nodes=autoscaling_config.get_max_num_nodes(),
        resource_requests=ray_state.pending_resource_requests,
        gang_resource_requests=ray_state.pending_gang_resource_requests,
        cluster_resource_constraints=ray_state.cluster_resource_constraints,
        current_instances=autoscaler_instances,
        idle_timeout_s=autoscaling_config.get_idle_timeout_s(),
        disable_launch_config_check=(
            autoscaling_config.disable_launch_config_check()
        ),
        cloud_resource_availabilities=(
            cloud_resource_monitor.get_resource_availabilities()
        ),
    )

    # Ask scheduler for updates to the cluster shape.
    reply = scheduler.schedule(sched_request)

    # Populate the autoscaling state. This happens before the guards below,
    # so infeasible requests are recorded even when no scaling takes place.
    autoscaling_state.infeasible_resource_requests.extend(
        reply.infeasible_resource_requests
    )
    autoscaling_state.infeasible_gang_resource_requests.extend(
        reply.infeasible_gang_resource_requests
    )
    autoscaling_state.infeasible_cluster_resource_constraints.extend(
        reply.infeasible_cluster_resource_constraints
    )

    if not Reconciler._is_head_node_running(instance_manager):
        # We shouldn't be scaling the cluster until the head node is ready.
        # This could happen when the head node (i.e. the raylet) is still
        # pending registration even though GCS is up.
        # We will wait until the head node is running and ready to avoid
        # scaling the cluster from min worker nodes constraint.
        return

    if autoscaling_config.provider == Provider.READ_ONLY:
        # We shouldn't be scaling the cluster if the provider is read-only.
        return

    # Scale the clusters if needed.
    launch_requests = reply.to_launch
    terminate_requests = reply.to_terminate
    updates = {}

    # Add terminating instances.
    for terminate_request in terminate_requests:
        target_id = terminate_request.instance_id
        target_status = terminate_request.instance_status

        if target_status == IMInstance.QUEUED:
            # QUEUED instances have no cloud resources allocated yet.
            # Cancel the allocation request by transitioning directly to
            # TERMINATED.
            event = IMInstanceUpdateEvent(
                instance_id=target_id,
                new_instance_status=IMInstance.TERMINATED,
                termination_request=terminate_request,
                details=f"allocation canceled: {terminate_request.details}",
            )
        elif target_status in (
            IMInstance.ALLOCATED,
            IMInstance.RAY_INSTALLING,
        ):
            # The instance is not yet running, so we can't request to
            # stop/drain Ray. Therefore, we can skip the RAY_STOP_REQUESTED
            # state and directly terminate the node.
            im_instance_to_terminate = im_instances_by_id[target_id]
            event = IMInstanceUpdateEvent(
                instance_id=target_id,
                new_instance_status=IMInstance.TERMINATING,
                cloud_instance_id=im_instance_to_terminate.cloud_instance_id,
                termination_request=terminate_request,
                details=f"terminating ray: {terminate_request.details}",
            )
        else:
            # Any other status: request ray to stop on the instance.
            event = IMInstanceUpdateEvent(
                instance_id=target_id,
                new_instance_status=IMInstance.RAY_STOP_REQUESTED,
                termination_request=terminate_request,
                details=f"draining ray: {terminate_request.details}",
            )

        updates[target_id] = event

    # Add new instances: one QUEUED instance per requested count.
    for launch_request in launch_requests:
        for _ in range(launch_request.count):
            new_instance_id = InstanceUtil.random_instance_id()
            updates[new_instance_id] = IMInstanceUpdateEvent(
                instance_id=new_instance_id,
                new_instance_status=IMInstance.QUEUED,
                instance_type=launch_request.instance_type,
                upsert=True,
                details=(
                    f"queuing new instance of {launch_request.instance_type} "
                    "from scheduler"
                ),
            )

    Reconciler._update_instance_manager(instance_manager, version, updates)
@staticmethod
def _terminate_instances(instance_manager: InstanceManager):
    """
    Move instances in a terminable status to TERMINATING.

    The terminable statuses are:
        - RAY_STOPPED: ray was stopped on the cloud instance.
        - ALLOCATION_TIMEOUT: cloud provider timed out to allocate a running
          cloud instance.
        - RAY_INSTALL_FAILED: ray installation failed on the cloud instance,
          we will not retry.
        - TERMINATION_FAILED: cloud provider failed to terminate the instance
          or timeout for termination happened, we will retry again.

    Args:
        instance_manager: The instance manager to reconcile.
    """
    terminable_statuses = (
        IMInstance.RAY_STOPPED,
        IMInstance.ALLOCATION_TIMEOUT,
        IMInstance.RAY_INSTALL_FAILED,
        IMInstance.TERMINATION_FAILED,
    )

    im_instances, version = Reconciler._get_im_instances(instance_manager)

    # One TERMINATING update per instance currently in a terminable status.
    updates = {
        doomed.instance_id: IMInstanceUpdateEvent(
            instance_id=doomed.instance_id,
            new_instance_status=IMInstance.TERMINATING,
            cloud_instance_id=doomed.cloud_instance_id,
            details="terminating instance from "
            f"{IMInstance.InstanceStatus.Name(doomed.status)}",
        )
        for doomed in im_instances
        if doomed.status in terminable_statuses
    }

    Reconciler._update_instance_manager(instance_manager, version, updates)
@staticmethod
def _install_ray(
    instance_manager: InstanceManager,
    non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance],
) -> None:
    """
    Move ALLOCATED worker instances to RAY_INSTALLING once their cloud
    instance is running.

    This is needed if ray installation needs to be performed by
    the instance manager. Head-node instances are skipped.

    Args:
        instance_manager: The instance manager to reconcile.
        non_terminated_cloud_instances: The non-terminated cloud instances,
            keyed by cloud instance id. Every ALLOCATED worker instance is
            expected to have an entry here.
    """
    im_instances, version = Reconciler._get_im_instances(instance_manager)

    updates = {}
    for candidate in im_instances:
        # Only ALLOCATED, non-head instances are considered for installation.
        is_allocated_worker = candidate.status == IMInstance.ALLOCATED and not (
            candidate.node_kind == NodeKind.HEAD
        )
        if not is_allocated_worker:
            continue

        cloud_instance = non_terminated_cloud_instances.get(
            candidate.cloud_instance_id
        )
        assert cloud_instance, (
            f"Cloud instance {candidate.cloud_instance_id} is not found "
            "in non_terminated_cloud_instances."
        )

        if cloud_instance.is_running:
            # Install ray on the running cloud instance
            updates[candidate.instance_id] = IMInstanceUpdateEvent(
                instance_id=candidate.instance_id,
                new_instance_status=IMInstance.RAY_INSTALLING,
                details="installing ray",
            )
        # Otherwise it might still be pending (e.g. setting up ssh); it is
        # left untouched.

    Reconciler._update_instance_manager(instance_manager, version, updates)
@staticmethod
def _fill_autoscaling_state(
    instance_manager: InstanceManager,
    autoscaling_state: AutoscalingState,
) -> None:
    """
    Populate `autoscaling_state` from the instance manager's instances.

    Fills in:
        - autoscaler_state_version: the IM instance version.
        - pending_instance_requests: REQUESTED/QUEUED instances that carry a
          launch request id, counted per (launch request, instance type).
        - pending_instances: ALLOCATED and RAY_INSTALLING instances, with the
          details of their most recent status update.
        - failed_instance_requests: one entry per ALLOCATION_FAILED instance.

    Args:
        instance_manager: The instance manager to read the instances from.
        autoscaling_state: The autoscaling state to fill in (mutated in place).
    """
    # Integer divisor so nanosecond timestamps are converted to seconds
    # without a round-trip through float.
    ns_per_s = 1_000_000_000

    # Use the IM instance version for the autoscaler_state_version
    instances, version = Reconciler._get_im_instances(instance_manager)
    autoscaling_state.autoscaler_state_version = version

    # Group instances by status
    instances_by_status = defaultdict(list)
    for instance in instances:
        instances_by_status[instance.status].append(instance)

    # Pending instance requests. Instances without a launch request id are
    # not reported here.
    instances_by_launch_request = defaultdict(list)
    for instance in (
        instances_by_status[IMInstance.REQUESTED]
        + instances_by_status[IMInstance.QUEUED]
    ):
        if instance.launch_request_id:
            instances_by_launch_request[instance.launch_request_id].append(instance)

    for requested_instances in instances_by_launch_request.values():
        num_instances_by_type = defaultdict(int)
        for instance in requested_instances:
            num_instances_by_type[instance.instance_type] += 1

        # All instances with same request id should have the same
        # request time.
        request_update = InstanceUtil.get_last_status_transition(
            requested_instances[0], IMInstance.REQUESTED
        )
        request_time_ns = request_update.timestamp_ns if request_update else 0

        for instance_type, count in num_instances_by_type.items():
            autoscaling_state.pending_instance_requests.append(
                PendingInstanceRequest(
                    ray_node_type_name=instance_type,
                    count=int(count),
                    request_ts=int(request_time_ns // ns_per_s),
                )
            )

    # Pending instances
    for instance in (
        instances_by_status[IMInstance.ALLOCATED]
        + instances_by_status[IMInstance.RAY_INSTALLING]
    ):
        # The status update with the latest timestamp describes the instance.
        latest_update = max(instance.status_history, key=lambda x: x.timestamp_ns)
        autoscaling_state.pending_instances.append(
            PendingInstance(
                instance_id=instance.instance_id,
                ray_node_type_name=instance.instance_type,
                details=latest_update.details,
            )
        )

    # Failed instance requests
    for instance in instances_by_status[IMInstance.ALLOCATION_FAILED]:
        request_status_update = InstanceUtil.get_last_status_transition(
            instance, IMInstance.REQUESTED
        )
        failed_status_update = InstanceUtil.get_last_status_transition(
            instance, IMInstance.ALLOCATION_FAILED
        )

        # Either lookup may yield nothing; fall back to neutral values rather
        # than dereferencing a missing update.
        request_time = (
            request_status_update.timestamp_ns if request_status_update else 0
        )
        failed_time = (
            failed_status_update.timestamp_ns if failed_status_update else 0
        )
        failed_reason = failed_status_update.details if failed_status_update else ""

        autoscaling_state.failed_instance_requests.append(
            FailedInstanceRequest(
                ray_node_type_name=instance.instance_type,
                start_ts=int(request_time // ns_per_s),
                failed_ts=int(failed_time // ns_per_s),
                reason=failed_reason,
                count=1,
            )
        )
@staticmethod
def _handle_stuck_requested_instance(
    instance: IMInstance, timeout_s: int, max_num_retry_request_to_allocate: int
) -> Optional[IMInstanceUpdateEvent]:
    """
    Retry or fail a cloud instance allocation that is stuck in REQUESTED.

    Args:
        instance: The instance to handle.
        timeout_s: The timeout in seconds.
        max_num_retry_request_to_allocate: The maximum number of times an
            instance could be requested to allocate.

    Returns:
        None: if the instance has not timed out yet.
        Instance update to ALLOCATION_FAILED: if the instance has timed out
            and has been requested more than
            `max_num_retry_request_to_allocate` times.
        Instance update to QUEUED: if the instance has timed out and may
            still be retried.
    """
    if not InstanceUtil.has_timeout(instance, timeout_s):
        # Not timeout yet, be patient.
        return None

    # Only the number of REQUESTED transitions matters here, so the times
    # are counted without sorting them.
    num_requests = len(
        InstanceUtil.get_status_transition_times_ns(
            instance, select_instance_status=IMInstance.REQUESTED
        )
    )

    # Fail the allocation if we have tried too many times.
    if num_requests > max_num_retry_request_to_allocate:
        return IMInstanceUpdateEvent(
            instance_id=instance.instance_id,
            new_instance_status=IMInstance.ALLOCATION_FAILED,
            details=(
                "failed to allocate cloud instance after "
                f"{num_requests} attempts > "
                f"max_num_retry_request_to_allocate={max_num_retry_request_to_allocate}"  # noqa
            ),
        )

    # Retry the allocation if we could by transitioning to QUEUED again.
    return IMInstanceUpdateEvent(
        instance_id=instance.instance_id,
        new_instance_status=IMInstance.QUEUED,
        details=f"queue again to launch after timeout={timeout_s}s",
    )
@staticmethod
def _handle_stuck_instance(
    instance: IMInstance,
    timeout_s: int,
    new_status: IMInstance.InstanceStatus,
    **update_kwargs: Dict,
) -> Optional[IMInstanceUpdateEvent]:
    """
    Build the update that moves a timed-out instance to `new_status`.

    Args:
        instance: The instance to handle.
        timeout_s: The timeout in seconds.
        new_status: The new status to transition to.
        update_kwargs: Extra keyword arguments forwarded to the
            IMInstanceUpdateEvent.

    Returns:
        Instance update to the new status: if the instance is stuck in the
            status for too long.
        None: if there's no update.
    """
    timed_out = InstanceUtil.has_timeout(instance, timeout_s)
    if timed_out:
        current_status_name = IMInstance.InstanceStatus.Name(instance.status)
        return IMInstanceUpdateEvent(
            instance_id=instance.instance_id,
            new_instance_status=new_status,
            details=f"timeout={timeout_s}s at status " f"{current_status_name}",
            **update_kwargs,
        )

    # Not timeout yet, be patient.
    return None
@staticmethod
def _handle_extra_cloud_instances(
    instance_manager: InstanceManager,
    non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance],
    ray_nodes: List[NodeState],
):
    """
    Adopt cloud instances that exist but are unknown to the instance manager.

    Extra cloud instances are cloud instances that the cloud provider returns
    as non-terminated but that are not managed by the instance manager. New
    IM instances with ALLOCATED status are created for them.

    Such instances could either be:
        1. Leaked instances that are incorrectly started by the cloud
           instance provider. They would be terminated eventually if they
           fail to transition to RAY_RUNNING (by the stuck instances
           reconciliation), or they would join the ray cluster and be
           terminated when the cluster scales down.
        2. Instances that are started by the cloud instance provider
           intentionally but not yet discovered by the instance manager.
           This could happen for
            a. A head node that's started before the autoscaler.
            b. Worker nodes that are started by the cloud provider upon
               users' actions, i.e. KubeRay scaling up the cluster with a
               ray cluster config change.
        3. Ray nodes with a cloud instance id not in the cloud provider. This
           could happen if there's delay in Ray's state (i.e. the cloud
           instance is already terminated, but the ray node is not dead yet).

    Args:
        instance_manager: The instance manager to reconcile.
        non_terminated_cloud_instances: The non-terminated cloud instances
            from the cloud provider.
        ray_nodes: The ray cluster's states of ray nodes.
    """
    # Ray-reported nodes are handled first, then the cloud provider's view.
    Reconciler._handle_extra_cloud_instances_from_ray_nodes(
        instance_manager=instance_manager,
        ray_nodes=ray_nodes,
    )
    Reconciler._handle_extra_cloud_instances_from_cloud_provider(
        instance_manager=instance_manager,
        non_terminated_cloud_instances=non_terminated_cloud_instances,
    )
@staticmethod
def _handle_extra_cloud_instances_from_cloud_provider(
    instance_manager: InstanceManager,
    non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance],
):
    """
    Create ALLOCATED IM instances for cloud instances the cloud provider
    reports as non-terminated but the instance manager does not manage.

    Args:
        instance_manager: The instance manager to reconcile.
        non_terminated_cloud_instances: The non-terminated cloud instances
            from the cloud provider.
    """
    im_instances, version = Reconciler._get_im_instances(instance_manager)

    # A cloud instance counts as managed when some IM instance references it
    # and that IM instance is neither TERMINATED nor ALLOCATION_FAILED.
    ignored_statuses = (IMInstance.TERMINATED, IMInstance.ALLOCATION_FAILED)
    managed_cloud_instance_ids = set()
    for im_instance in im_instances:
        if not im_instance.cloud_instance_id:
            continue
        if im_instance.status in ignored_statuses:
            continue
        managed_cloud_instance_ids.add(im_instance.cloud_instance_id)

    # Find the extra cloud instances that are not managed by the instance
    # manager.
    updates = {}
    for cloud_instance_id, cloud_instance in non_terminated_cloud_instances.items():
        if cloud_instance_id not in managed_cloud_instance_ids:
            updates[cloud_instance_id] = IMInstanceUpdateEvent(
                instance_id=InstanceUtil.random_instance_id(),  # Assign a new id.
                cloud_instance_id=cloud_instance_id,
                new_instance_status=IMInstance.ALLOCATED,
                node_kind=cloud_instance.node_kind,
                instance_type=cloud_instance.node_type,
                details=(
                    "allocated unmanaged cloud instance :"
                    f"{cloud_instance.cloud_instance_id} "
                    f"({NodeKind.Name(cloud_instance.node_kind)}) from cloud provider"
                ),
                upsert=True,
            )

    Reconciler._update_instance_manager(instance_manager, version, updates)
@staticmethod
def _handle_extra_cloud_instances_from_ray_nodes(
    instance_manager: InstanceManager, ray_nodes: List[NodeState]
):
    """
    Create ALLOCATED IM instances for cloud instances that Ray reports but
    the instance manager does not manage.

    Args:
        instance_manager: The instance manager to reconcile.
        ray_nodes: The ray cluster's states of ray nodes.
    """
    im_instances, version = Reconciler._get_im_instances(instance_manager)

    # Cloud instances referenced by an IM instance that has no ray node id
    # yet and is neither TERMINATED nor ALLOCATION_FAILED.
    ignored_statuses = (IMInstance.TERMINATED, IMInstance.ALLOCATION_FAILED)
    managed_cloud_instance_ids = set()
    # Ray nodes already attached to some IM instance.
    managed_ray_node_ids = set()
    for im_instance in im_instances:
        if im_instance.node_id:
            managed_ray_node_ids.add(im_instance.node_id)
        elif (
            im_instance.cloud_instance_id
            and im_instance.status not in ignored_statuses
        ):
            managed_cloud_instance_ids.add(im_instance.cloud_instance_id)

    updates = {}
    for ray_node in ray_nodes:
        # Ray nodes that do not report a cloud instance id are skipped.
        if not ray_node.instance_id:
            continue

        ray_node_id = binary_to_hex(ray_node.node_id)
        cloud_instance_id = ray_node.instance_id
        already_managed = (
            ray_node_id in managed_ray_node_ids
            or cloud_instance_id in managed_cloud_instance_ids
        )
        if already_managed:
            continue

        node_kind = NodeKind.HEAD if is_head_node(ray_node) else NodeKind.WORKER
        updates[ray_node_id] = IMInstanceUpdateEvent(
            instance_id=InstanceUtil.random_instance_id(),  # Assign a new id.
            cloud_instance_id=cloud_instance_id,
            new_instance_status=IMInstance.ALLOCATED,
            node_kind=node_kind,
            ray_node_id=ray_node_id,
            instance_type=ray_node.ray_node_type_name,
            details=(
                "allocated unmanaged worker cloud instance from ray node: "
                f"{ray_node_id}"
            ),
            upsert=True,
        )

    Reconciler._update_instance_manager(instance_manager, version, updates)
@staticmethod
def _report_metrics(
    instance_manager: InstanceManager,
    autoscaling_config: AutoscalingConfig,
    metrics_reporter: Optional[AutoscalerMetricsReporter] = None,
):
    """
    Report instance and resource metrics for the current IM instances.

    Args:
        instance_manager: The instance manager to read the instances from.
        autoscaling_config: The autoscaling config providing the node type
            configs.
        metrics_reporter: The reporter to send metrics to. Nothing is
            reported when it is not provided.
    """
    if metrics_reporter:
        im_instances, _ = Reconciler._get_im_instances(instance_manager)
        type_configs = autoscaling_config.get_node_type_configs()
        metrics_reporter.report_instances(im_instances, type_configs)
        metrics_reporter.report_resources(im_instances, type_configs)
|