reconciler.py 66 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637
  1. import logging
  2. import math
  3. import time
  4. import uuid
  5. from collections import defaultdict
  6. from typing import Dict, List, Optional, Set, Tuple
  7. from ray._common.utils import binary_to_hex
  8. from ray.autoscaler.v2.instance_manager.common import InstanceUtil
  9. from ray.autoscaler.v2.instance_manager.config import (
  10. AutoscalingConfig,
  11. InstanceReconcileConfig,
  12. Provider,
  13. )
  14. from ray.autoscaler.v2.instance_manager.instance_manager import InstanceManager
  15. from ray.autoscaler.v2.instance_manager.node_provider import (
  16. CloudInstance,
  17. CloudInstanceId,
  18. CloudInstanceProviderError,
  19. ICloudInstanceProvider,
  20. LaunchNodeError,
  21. TerminateNodeError,
  22. )
  23. from ray.autoscaler.v2.instance_manager.subscribers.cloud_resource_monitor import (
  24. CloudResourceMonitor,
  25. )
  26. from ray.autoscaler.v2.instance_manager.subscribers.ray_stopper import RayStopError
  27. from ray.autoscaler.v2.instance_manager.subscribers.threaded_ray_installer import (
  28. RayInstallError,
  29. )
  30. from ray.autoscaler.v2.metrics_reporter import AutoscalerMetricsReporter
  31. from ray.autoscaler.v2.scheduler import IResourceScheduler, SchedulingRequest
  32. from ray.autoscaler.v2.schema import AutoscalerInstance, NodeType
  33. from ray.autoscaler.v2.utils import is_head_node
  34. from ray.core.generated.autoscaler_pb2 import (
  35. AutoscalingState,
  36. ClusterResourceState,
  37. FailedInstanceRequest,
  38. NodeState,
  39. NodeStatus,
  40. PendingInstance,
  41. PendingInstanceRequest,
  42. )
  43. from ray.core.generated.instance_manager_pb2 import (
  44. GetInstanceManagerStateRequest,
  45. Instance as IMInstance,
  46. InstanceUpdateEvent as IMInstanceUpdateEvent,
  47. NodeKind,
  48. StatusCode,
  49. UpdateInstanceManagerStateRequest,
  50. )
  # Module-level logger. Used for the warning about ray nodes without an
  # instance id, and as the fallback logger handed to _handle_stuck_instances.
  51. logger = logging.getLogger(__name__)
  52. class Reconciler:
  53. """
  54. A singleton class that reconciles the instance states of the instance manager
  55. for autoscaler.
  56. """
  @staticmethod
  def reconcile(
      instance_manager: InstanceManager,
      scheduler: IResourceScheduler,
      cloud_provider: ICloudInstanceProvider,
      cloud_resource_monitor: CloudResourceMonitor,
      ray_cluster_resource_state: ClusterResourceState,
      non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance],
      autoscaling_config: AutoscalingConfig,
      cloud_provider_errors: Optional[List[CloudInstanceProviderError]] = None,
      ray_install_errors: Optional[List[RayInstallError]] = None,
      ray_stop_errors: Optional[List[RayStopError]] = None,
      metrics_reporter: Optional[AutoscalerMetricsReporter] = None,
      _logger: Optional[logging.Logger] = None,
  ) -> AutoscalingState:
      """
      Run one full reconciliation round and return the resulting state.

      The round consists of three phases, always executed in this order:

      1. Sync (passive, see ``_sync_from``). Fold the external observations
         into the instance manager's instance statuses. These are the cloud
         provider's non-terminated instances and errors, the ray cluster's
         node states, and the ray installer / ray stopper errors. These
         transitions only mirror what already happened outside and should
         not act on the cloud provider or the ray cluster.
      2. Step (active, see ``_step_next``). Compute and apply the status
         transitions that drive instances forward. These are the
         transitions with side effects on the cloud provider and the ray
         cluster, carried out through InstanceStatusSubscriber.
      3. Report the autoscaler metrics through ``metrics_reporter``
         (see ``_report_metrics``).

      Args:
          instance_manager: The instance manager whose state is reconciled.
          scheduler: The resource scheduler used for scaling decisions.
          cloud_provider: The cloud instance provider.
          cloud_resource_monitor: Monitors the cloud resource availability of
              all node types.
          ray_cluster_resource_state: The ray cluster's resource state.
          non_terminated_cloud_instances: The cloud provider's non-terminated
              cloud instances, keyed by cloud instance id.
          autoscaling_config: The autoscaling config.
          cloud_provider_errors: Errors reported by the cloud provider.
              None is treated as "no errors".
          ray_install_errors: Errors reported by the RayInstaller. None is
              treated as "no errors".
          ray_stop_errors: Errors reported by the RayStopper. None is treated
              as "no errors".
          metrics_reporter: The reporter for the autoscaler metrics.
          _logger: A logger override (for testing).

      Returns:
          The AutoscalingState of this round. Its
          ``last_seen_cluster_resource_state_version`` is copied from
          ``ray_cluster_resource_state``. The remaining fields are filled in
          during the step phase.
      """
      # Treat missing error lists as "no errors" so the helpers can iterate.
      cloud_provider_errors = cloud_provider_errors or []
      ray_install_errors = ray_install_errors or []
      ray_stop_errors = ray_stop_errors or []

      # Record which cluster resource state version this round is based on.
      state = AutoscalingState(
          last_seen_cluster_resource_state_version=(
              ray_cluster_resource_state.cluster_resource_state_version
          )
      )

      # Phase 1: passive sync from external observations.
      Reconciler._sync_from(
          instance_manager=instance_manager,
          ray_nodes=ray_cluster_resource_state.node_states,
          non_terminated_cloud_instances=non_terminated_cloud_instances,
          cloud_provider_errors=cloud_provider_errors,
          ray_install_errors=ray_install_errors,
          ray_stop_errors=ray_stop_errors,
          autoscaling_config=autoscaling_config,
      )

      # Phase 2: active transitions; also populates `state`.
      Reconciler._step_next(
          autoscaling_state=state,
          instance_manager=instance_manager,
          scheduler=scheduler,
          cloud_provider=cloud_provider,
          cloud_resource_monitor=cloud_resource_monitor,
          ray_cluster_resource_state=ray_cluster_resource_state,
          non_terminated_cloud_instances=non_terminated_cloud_instances,
          autoscaling_config=autoscaling_config,
          _logger=_logger,
      )

      # Phase 3: metrics.
      Reconciler._report_metrics(
          instance_manager=instance_manager,
          autoscaling_config=autoscaling_config,
          metrics_reporter=metrics_reporter,
      )
      return state
  @staticmethod
  def _sync_from(
      instance_manager: InstanceManager,
      ray_nodes: List[NodeState],
      non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance],
      cloud_provider_errors: List[CloudInstanceProviderError],
      ray_install_errors: List[RayInstallError],
      ray_stop_errors: List[RayStopError],
      autoscaling_config: AutoscalingConfig,
  ):
      """
      Passively reconcile IM instance statuses with external observations.

      The external observations are the cloud provider's instances and
      errors, the ray cluster's node states, and the ray installer / ray
      stopper results. Transitions made here only *reflect* those states.
      They must not change the cloud provider or the ray cluster.

      The handlers below run in this order:

      - ``_handle_cloud_instance_allocation``: REQUESTED -> ALLOCATED when a
        free cloud instance of the same type exists, or REQUESTED ->
        ALLOCATION_FAILED when the cloud provider reported a launch error
        for the instance's launch request.
      - ``_handle_cloud_instance_terminated``: * -> TERMINATED when the
        instance's cloud instance is no longer among the non-terminated ones.
      - ``_handle_cloud_instance_termination_errors``: TERMINATING ->
        TERMINATION_FAILED on a termination error from the cloud provider.
      - ``_handle_extra_cloud_instances``: cloud instances / ray nodes not
        tracked by the instance manager (defined elsewhere in this file).
      - ``_handle_ray_status_transition``: status changes driven by ray node
        status (e.g. * -> RAY_RUNNING, * -> RAY_STOPPED).
      - ``_handle_ray_install_failed``: RAY_INSTALLING -> RAY_INSTALL_FAILED
        on a RayInstaller error.
      - ``_handle_ray_stop_failed``: RAY_STOP_REQUESTED -> RAY_RUNNING when
        ray could not be stopped/drained (e.g. an idle-termination drain was
        rejected by the node).

      NOTE(review): the design notes also mention ALLOCATED ->
      ALLOCATION_TIMEOUT for allocations that stay stuck (e.g. a kubernetes
      pod pending on insufficient resources). None of the handlers called
      here is about it; presumably it is covered by the stuck-instance
      handling in ``_step_next`` (TODO confirm).

      Args:
          instance_manager: The instance manager to reconcile.
          ray_nodes: The ray cluster's states of ray nodes.
          non_terminated_cloud_instances: The non-terminated cloud instances
              from the cloud provider, keyed by cloud instance id.
          cloud_provider_errors: The errors from the cloud provider.
          ray_install_errors: The errors from RayInstaller.
          ray_stop_errors: The errors from RayStopper.
          autoscaling_config: The autoscaling config (needed to map ray
              nodes to instances).
      """
      # The visible handlers each re-read the instance manager state, so a
      # later handler sees the transitions applied by an earlier one. Keep
      # this order.

      # Cloud-provider driven transitions.
      Reconciler._handle_cloud_instance_allocation(
          instance_manager=instance_manager,
          non_terminated_cloud_instances=non_terminated_cloud_instances,
          cloud_provider_errors=cloud_provider_errors,
      )
      Reconciler._handle_cloud_instance_terminated(
          instance_manager=instance_manager,
          non_terminated_cloud_instances=non_terminated_cloud_instances,
      )
      Reconciler._handle_cloud_instance_termination_errors(
          instance_manager=instance_manager,
          cloud_provider_errors=cloud_provider_errors,
      )
      Reconciler._handle_extra_cloud_instances(
          instance_manager, non_terminated_cloud_instances, ray_nodes
      )

      # Ray-cluster driven transitions.
      Reconciler._handle_ray_status_transition(
          instance_manager=instance_manager,
          ray_nodes=ray_nodes,
          autoscaling_config=autoscaling_config,
      )
      Reconciler._handle_ray_install_failed(
          instance_manager=instance_manager,
          ray_install_errors=ray_install_errors,
      )
      Reconciler._handle_ray_stop_failed(
          instance_manager=instance_manager,
          ray_stop_errors=ray_stop_errors,
          ray_nodes=ray_nodes,
      )
  @staticmethod
  def _step_next(
      autoscaling_state: AutoscalingState,
      instance_manager: InstanceManager,
      scheduler: IResourceScheduler,
      cloud_provider: ICloudInstanceProvider,
      cloud_resource_monitor: CloudResourceMonitor,
      ray_cluster_resource_state: ClusterResourceState,
      non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance],
      autoscaling_config: AutoscalingConfig,
      _logger: Optional[logging.Logger] = None,
  ):
      """
      Actively move instances toward their next states.

      The helpers run in this fixed order:

      1. ``_handle_stuck_instances``: handle instances stuck in a status,
         driven by the instance reconcile config (timeouts).
      2. ``_scale_cluster``: scaling decisions from the scheduler. Per the
         design notes this covers scale-down (* -> RAY_STOP_REQUESTED /
         TERMINATING, e.g. for max-nodes limits or outdated configs) and
         scale-up (new QUEUED instances).
      3. ``_handle_instances_launch``: ask the cloud provider to launch
         queued instances (QUEUED -> REQUESTED).
      4. ``_terminate_instances``: terminate instances that should go away
         (per the design notes, e.g. ray stopped or ray install failed).
      5. ``_install_ray``: install ray on allocated instances
         (ALLOCATED -> RAY_INSTALLING). This step is skipped entirely when
         ``autoscaling_config.disable_node_updaters()`` is true.
      6. ``_fill_autoscaling_state``: populate ``autoscaling_state`` from the
         instance manager.

      Args:
          autoscaling_state: The state object to populate for this round.
          instance_manager: The instance manager to reconcile.
          scheduler: The resource scheduler to make scaling decisions.
          cloud_provider: The cloud instance provider. Currently not used by
              this method.
          cloud_resource_monitor: Monitors the cloud resource availability of
              all node types.
          ray_cluster_resource_state: The ray cluster's resource state.
          non_terminated_cloud_instances: The non-terminated cloud instances
              from the cloud provider (only used for installing ray).
          autoscaling_config: The autoscaling config.
          _logger: A logger override (for testing); falls back to the module
              logger.
      """
      # 1. Stuck instances.
      reconcile_config = autoscaling_config.get_instance_reconcile_config()
      Reconciler._handle_stuck_instances(
          instance_manager=instance_manager,
          reconcile_config=reconcile_config,
          _logger=_logger or logger,
      )

      # 2. Scaling decisions.
      Reconciler._scale_cluster(
          autoscaling_state=autoscaling_state,
          instance_manager=instance_manager,
          cloud_resource_monitor=cloud_resource_monitor,
          ray_state=ray_cluster_resource_state,
          scheduler=scheduler,
          autoscaling_config=autoscaling_config,
      )

      # 3. Launches and 4. terminations.
      Reconciler._handle_instances_launch(
          instance_manager=instance_manager,
          autoscaling_config=autoscaling_config,
      )
      Reconciler._terminate_instances(instance_manager=instance_manager)

      # 5. Ray installation, unless node updaters are disabled.
      node_updaters_disabled = autoscaling_config.disable_node_updaters()
      if not node_updaters_disabled:
          Reconciler._install_ray(
              instance_manager=instance_manager,
              non_terminated_cloud_instances=non_terminated_cloud_instances,
          )

      # 6. Report the resulting state back to the caller.
      Reconciler._fill_autoscaling_state(
          instance_manager=instance_manager,
          autoscaling_state=autoscaling_state,
      )
  #######################################################
  # Utility methods for reconciling instance states.
  #######################################################
  @staticmethod
  def _handle_cloud_instance_allocation(
      instance_manager: InstanceManager,
      non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance],
      cloud_provider_errors: List[CloudInstanceProviderError],
  ):
      """
      Resolve the pending allocation of every REQUESTED instance.

      REQUESTED instances are visited in increasing order of their REQUESTED
      transition time. For each one, ``_try_resolve_pending_allocation`` does
      one of three things:

      - binds it to a free cloud instance of the same type (-> ALLOCATED);
      - fails it when the cloud provider reported a launch error for its
        launch request (-> ALLOCATION_FAILED);
      - leaves it untouched.

      A cloud instance counts as free when it is non-terminated and no IM
      instance outside TERMINATED / ALLOCATION_FAILED already references it.

      Args:
          instance_manager: The instance manager to reconcile.
          non_terminated_cloud_instances: The non-terminated cloud instances
              from the cloud provider, keyed by cloud instance id.
          cloud_provider_errors: The errors from the cloud provider. Only
              LaunchNodeError entries are considered here.

      Raises:
          AssertionError: if a REQUESTED instance has no launch_request_id.
      """
      im_instances, version = Reconciler._get_im_instances(instance_manager)

      pending = [i for i in im_instances if i.status == IMInstance.REQUESTED]
      assert all(
          i.launch_request_id for i in pending
      ), "Instance in REQUESTED status should have launch_request_id set."

      # Cloud instances still claimed by a live IM instance cannot be reused.
      released_statuses = (IMInstance.TERMINATED, IMInstance.ALLOCATION_FAILED)
      claimed_ids: Set[CloudInstanceId] = set()
      for inst in im_instances:
          if inst.cloud_instance_id and inst.status not in released_statuses:
              claimed_ids.add(inst.cloud_instance_id)

      # Launch errors indexed by the launch request they belong to; a later
      # error for the same request id replaces an earlier one.
      launch_errors_by_request: Dict[str, LaunchNodeError] = {}
      for err in cloud_provider_errors:
          if isinstance(err, LaunchNodeError):
              launch_errors_by_request[err.request_id] = err

      # Free cloud instances grouped by node type.
      free_by_type: Dict[str, List[CloudInstance]] = defaultdict(list)
      for cloud_id, cloud_inst in non_terminated_cloud_instances.items():
          if cloud_id not in claimed_ids:
              free_by_type[cloud_inst.node_type].append(cloud_inst)

      def _requested_at(inst):
          return InstanceUtil.get_status_transition_times_ns(
              inst, IMInstance.REQUESTED
          )

      # Oldest requests get the first pick of the free cloud instances.
      updates = {}
      for inst in sorted(pending, key=_requested_at):
          event = Reconciler._try_resolve_pending_allocation(
              inst, free_by_type, launch_errors_by_request
          )
          if event:
              updates[inst.instance_id] = event

      Reconciler._update_instance_manager(instance_manager, version, updates)
  @staticmethod
  def _try_resolve_pending_allocation(
      im_instance: IMInstance,
      unassigned_cloud_instances_by_type: Dict[str, List[CloudInstance]],
      launch_errors: Dict[str, LaunchNodeError],
  ) -> Optional[IMInstanceUpdateEvent]:
      """
      Decide the allocation outcome for a single REQUESTED instance.

      Matching is by instance type only. When a candidate cloud instance is
      found, it is popped from ``unassigned_cloud_instances_by_type``, so it
      cannot be handed to another instance in the same pass.

      Args:
          im_instance: The instance to allocate or fail.
          unassigned_cloud_instances_by_type: Free cloud instances grouped by
              node type. Mutated: the chosen candidate is removed.
          launch_errors: Launch errors from the cloud provider, keyed by
              launch request id.

      Returns:
          An ALLOCATED update if a free cloud instance of the same type exists.
          Otherwise, an ALLOCATION_FAILED update if ``launch_errors`` holds an
          error for the instance's launch_request_id with the same node type.
          Otherwise None (nothing to do yet).
      """
      # TODO(rickyx): We could also look at the launch request id
      # on the cloud node and the im instance later once all node providers
      # support request id. For now, we only look at the instance type.
      candidates = unassigned_cloud_instances_by_type.get(
          im_instance.instance_type, []
      )
      chosen = candidates.pop() if candidates else None

      if chosen:
          return IMInstanceUpdateEvent(
              instance_id=im_instance.instance_id,
              new_instance_status=IMInstance.ALLOCATED,
              cloud_instance_id=chosen.cloud_instance_id,
              node_kind=chosen.node_kind,
              instance_type=chosen.node_type,
              details=(
                  "allocated unassigned cloud instance "
                  f"{chosen.cloud_instance_id}"
              ),
          )

      launch_error = launch_errors.get(im_instance.launch_request_id)
      if not launch_error or launch_error.node_type != im_instance.instance_type:
          # Neither allocated nor failed: keep waiting.
          return None

      return IMInstanceUpdateEvent(
          instance_id=im_instance.instance_id,
          new_instance_status=IMInstance.ALLOCATION_FAILED,
          details=f"launch failed with {str(launch_error)}",
      )
  @staticmethod
  def _handle_ray_stop_failed(
      instance_manager: InstanceManager,
      ray_stop_errors: List[RayStopError],
      ray_nodes: List[NodeState],
  ):
      """
      Revert RAY_STOP_REQUESTED instances whose ray stop/drain failed.

      A stop can fail, for example, on connection errors or when the node
      rejects an idle-termination drain. For every RAY_STOP_REQUESTED
      instance with a matching RayStopError, the instance is moved back to
      RAY_RUNNING.

      Args:
          instance_manager: The instance manager to reconcile.
          ray_stop_errors: The errors from RayStopper, matched to instances by
              ``im_instance_id``.
          ray_nodes: The ray cluster's states of ray nodes. Used to check
              that the instance's ray node is still RUNNING or IDLE.

      Raises:
          AssertionError: if a failed-stop instance has no node_id, or its
              ray node is missing or not RUNNING/IDLE.
      """
      instances, version = Reconciler._get_im_instances(instance_manager)

      stop_error_by_instance = {err.im_instance_id: err for err in ray_stop_errors}
      ray_node_by_hex_id = {binary_to_hex(node.node_id): node for node in ray_nodes}

      updates = {}
      for instance in instances:
          if instance.status != IMInstance.RAY_STOP_REQUESTED:
              continue
          if not stop_error_by_instance.get(instance.instance_id):
              continue

          assert instance.node_id
          ray_node = ray_node_by_hex_id.get(instance.node_id)
          assert ray_node is not None and ray_node.status in [
              NodeStatus.RUNNING,
              NodeStatus.IDLE,
          ], (
              "There should be a running ray node for instance with ray stop "
              "requested failed."
          )

          updates[instance.instance_id] = IMInstanceUpdateEvent(
              instance_id=instance.instance_id,
              new_instance_status=IMInstance.RAY_RUNNING,
              details="failed to stop/drain ray",
              ray_node_id=instance.node_id,
          )

      Reconciler._update_instance_manager(instance_manager, version, updates)
  @staticmethod
  def _handle_ray_install_failed(
      instance_manager: InstanceManager, ray_install_errors: List[RayInstallError]
  ):
      """
      Mark RAY_INSTALLING instances whose ray installation failed.

      Every RAY_INSTALLING instance with a RayInstallError (matched by
      ``im_instance_id``) transitions to RAY_INSTALL_FAILED. The error
      details are included in the update.

      Args:
          instance_manager: The instance manager to reconcile.
          ray_install_errors: The errors from RayInstaller.
      """
      instances, version = Reconciler._get_im_instances(instance_manager)
      error_by_instance = {err.im_instance_id: err for err in ray_install_errors}

      updates = {}
      for instance in instances:
          if instance.status != IMInstance.RAY_INSTALLING:
              continue
          install_error = error_by_instance.get(instance.instance_id)
          if not install_error:
              continue
          updates[instance.instance_id] = IMInstanceUpdateEvent(
              instance_id=instance.instance_id,
              new_instance_status=IMInstance.RAY_INSTALL_FAILED,
              details=(
                  f"failed to install ray with errors: {install_error.details}"
              ),
          )

      Reconciler._update_instance_manager(instance_manager, version, updates)
  @staticmethod
  def _handle_cloud_instance_terminated(
      instance_manager: InstanceManager,
      non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance],
  ):
      """
      Mark instances whose cloud instance has disappeared as TERMINATED.

      Applies to any IM (instance manager) instance that is bound to a cloud
      instance and is not already TERMINATED. If that cloud instance is no
      longer among the cloud provider's non-terminated instances, the IM
      instance transitions to TERMINATED.

      Every such IM instance is checked individually. An earlier
      implementation indexed instances by cloud instance id, which silently
      kept only one IM instance per cloud id.

      Args:
          instance_manager: The instance manager to reconcile.
          non_terminated_cloud_instances: The non-terminated cloud instances
              from the cloud provider, keyed by cloud instance id.
      """
      instances, version = Reconciler._get_im_instances(instance_manager)

      updates = {}
      for instance in instances:
          cloud_instance_id = instance.cloud_instance_id
          if not cloud_instance_id or instance.status == IMInstance.TERMINATED:
              # Not bound to a cloud instance, or already terminated.
              continue
          if cloud_instance_id in non_terminated_cloud_instances:
              # The cloud instance is still running.
              continue
          # The cloud instance is terminated.
          updates[instance.instance_id] = IMInstanceUpdateEvent(
              instance_id=instance.instance_id,
              new_instance_status=IMInstance.TERMINATED,
              details=f"cloud instance {cloud_instance_id} no longer found",
          )

      Reconciler._update_instance_manager(instance_manager, version, updates)
  @staticmethod
  def _handle_cloud_instance_termination_errors(
      instance_manager: InstanceManager,
      cloud_provider_errors: List[CloudInstanceProviderError],
  ):
      """
      Move TERMINATING instances with a termination error to TERMINATION_FAILED.

      Only TerminateNodeError entries are considered. They are matched to
      TERMINATING instances by cloud instance id. Errors whose instance is no
      longer TERMINATING are ignored. Per the design, TERMINATION_FAILED
      instances get their termination retried in a later reconciler step.

      Args:
          instance_manager: The instance manager to reconcile.
          cloud_provider_errors: The errors from the cloud provider.
      """
      instances, version = Reconciler._get_im_instances(instance_manager)

      terminating_by_cloud_id = {
          inst.cloud_instance_id: inst
          for inst in instances
          if inst.status == IMInstance.TERMINATING
      }

      # A later error for the same cloud instance replaces an earlier one.
      failure_by_cloud_id = {}
      for err in cloud_provider_errors:
          if isinstance(err, TerminateNodeError):
              failure_by_cloud_id[err.cloud_instance_id] = err

      updates = {}
      for cloud_instance_id, failure in failure_by_cloud_id.items():
          instance = terminating_by_cloud_id.get(cloud_instance_id)
          if not instance:
              # The instance is no longer in TERMINATING status.
              continue
          updates[instance.instance_id] = IMInstanceUpdateEvent(
              instance_id=instance.instance_id,
              new_instance_status=IMInstance.TERMINATION_FAILED,
              details=f"termination failed: {str(failure)}",
          )

      Reconciler._update_instance_manager(instance_manager, version, updates)
  @staticmethod
  def _get_im_instances(
      instance_manager: InstanceManager,
  ) -> Tuple[List[IMInstance], int]:
      """
      Fetch a snapshot of the instance manager's instances.

      Returns:
          An ``(instances, version)`` tuple. Callers in this class pass
          ``version`` back to ``_update_instance_manager`` as the expected
          version.

      Raises:
          AssertionError: if the instance manager does not reply with OK.
      """
      request = GetInstanceManagerStateRequest()
      reply = instance_manager.get_instance_manager_state(request=request)
      assert reply.status.code == StatusCode.OK
      snapshot = reply.state
      return snapshot.instances, snapshot.version
  @staticmethod
  def _update_instance_manager(
      instance_manager: InstanceManager,
      version: int,
      updates: Dict[str, IMInstanceUpdateEvent],
  ) -> None:
      """
      Apply a batch of instance update events to the instance manager.

      Does nothing when ``updates`` is empty.

      Args:
          instance_manager: The instance manager to update.
          version: The instance manager state version the updates were
              computed against. It is sent as ``expected_version``.
          updates: The update events to apply. Only the values are sent, in
              the dict's iteration order; the keys just de-duplicate events.

      Raises:
          AssertionError: if the instance manager does not reply with OK.
      """
      if not updates:
          return
      reply = instance_manager.update_instance_manager_state(
          request=UpdateInstanceManagerStateRequest(
              expected_version=version,
              updates=list(updates.values()),
          )
      )
      # TODO: While it's possible that a version mismatch
      # happens, or some other failures could happen. But given
      # the current implementation:
      # 1. There's only 1 writer (the reconciler) for updating the instance
      # manager states, so there shouldn't be version mismatch.
      # 2. Any failures in one reconciler step should be caught at a higher
      # level and be retried in the next reconciler step. If the IM
      # fails to be updated, we don't have sufficient info to handle it
      # here.
      assert (
          reply.status.code == StatusCode.OK
      ), f"Failed to update instance manager: {reply}"
  @staticmethod
  def _handle_ray_status_transition(
      instance_manager: InstanceManager,
      ray_nodes: List[NodeState],
      autoscaling_config: AutoscalingConfig,
  ):
      """
      Reconcile IM instance statuses with the statuses of the ray nodes.

      Each ray node is first mapped to an IM instance:

      - by ray node id, if some IM instance already records that node id;
      - otherwise, for the READ_ONLY provider, by using the ray node id as
        the cloud instance id;
      - otherwise, by the cloud instance id the ray node reports.

      Ray nodes without an instance id (not managed by the autoscaler) are
      skipped with a warning. The target IM status is then computed with
      ``_reconciled_im_status_from_ray_status``, and an update is emitted if
      it differs from the current one. Per the design: a new ray node
      running -> RAY_RUNNING, a stopped ray node -> RAY_STOPPED, a draining
      ray node -> RAY_STOPPING.

      Args:
          instance_manager: The instance manager to reconcile.
          ray_nodes: The ray cluster's states of ray nodes.
          autoscaling_config: The autoscaling config. Used to detect the
              READ_ONLY provider.

      Raises:
          AssertionError: if a ray node that should be managed has no
              matching live IM instance.
      """
      instances, version = Reconciler._get_im_instances(instance_manager)
      updates = {}

      # Live IM instances (not TERMINATED / ALLOCATION_FAILED), keyed by the
      # cloud instance they are bound to.
      im_instances_by_cloud_instance_id = {
          instance.cloud_instance_id: instance
          for instance in instances
          if instance.cloud_instance_id
          and instance.status
          not in [IMInstance.TERMINATED, IMInstance.ALLOCATION_FAILED]
      }
      # IM instances (in any status) that already record a ray node id.
      im_instances_by_ray_node_id = {
          instance.node_id: instance for instance in instances if instance.node_id
      }

      for ray_node in ray_nodes:
          ray_node_id = binary_to_hex(ray_node.node_id)
          im_instance = im_instances_by_ray_node_id.get(ray_node_id)
          if im_instance is None:
              if autoscaling_config.provider == Provider.READ_ONLY:
                  # We will use the node id as the cloud instance id for
                  # read-only provider.
                  cloud_instance_id = ray_node_id
              elif ray_node.instance_id:
                  cloud_instance_id = ray_node.instance_id
              else:
                  # This should only happen to a ray node that's not managed
                  # by us.
                  logger.warning(
                      f"Ray node {ray_node_id} has no instance id. "
                      "This only happens to a ray node not managed by autoscaler. "
                      "If not, please file a bug at "
                      "https://github.com/ray-project/ray"
                  )
                  continue
              # Use .get() so a missing mapping reaches the descriptive
              # assertion below instead of escaping as a bare KeyError.
              im_instance = im_instances_by_cloud_instance_id.get(cloud_instance_id)
              assert im_instance is not None, (
                  f"Ray node {ray_node_id} has no matching "
                  f"instance with cloud instance id={cloud_instance_id}. We should "
                  "not see a ray node with cloud instance id not found in IM since "
                  "we have reconciled all cloud instances, and ray nodes by now."
              )

          reconciled_im_status = Reconciler._reconciled_im_status_from_ray_status(
              ray_node.status, im_instance.status
          )
          if reconciled_im_status == im_instance.status:
              continue
          # NOTE(review): unlike the other handlers, updates here are keyed by
          # ray node id, not instance id. Two ray nodes that map to the same
          # IM instance would therefore yield two updates in one request.
          # Confirm this is intended.
          updates[ray_node_id] = IMInstanceUpdateEvent(
              instance_id=im_instance.instance_id,
              new_instance_status=reconciled_im_status,
              details=(
                  f"ray node {ray_node_id} is "
                  f"{NodeStatus.Name(ray_node.status)}"
              ),
              ray_node_id=ray_node_id,
              instance_type=im_instance.instance_type,
          )

      Reconciler._update_instance_manager(instance_manager, version, updates)
  634. @staticmethod
  635. def _reconciled_im_status_from_ray_status(
  636. ray_status: NodeStatus, cur_im_status: IMInstance.InstanceStatus
  637. ) -> "IMInstance.InstanceStatus":
  638. """
  639. Reconcile the instance status from the ray node status.
  640. Args:
  641. ray_status: the current ray node status.
  642. cur_im_status: the current IM instance status.
  643. Returns:
  644. The reconciled IM instance status
  645. Raises:
  646. ValueError: If the ray status is unknown.
  647. """
  648. reconciled_im_status = None
  649. if ray_status in [NodeStatus.RUNNING, NodeStatus.IDLE]:
  650. reconciled_im_status = IMInstance.RAY_RUNNING
  651. elif ray_status == NodeStatus.DEAD:
  652. reconciled_im_status = IMInstance.RAY_STOPPED
  653. elif ray_status == NodeStatus.DRAINING:
  654. reconciled_im_status = IMInstance.RAY_STOPPING
  655. else:
  656. raise ValueError(f"Unknown ray status: {ray_status}")
  657. if (
  658. cur_im_status == reconciled_im_status
  659. or cur_im_status
  660. in InstanceUtil.get_reachable_statuses(reconciled_im_status)
  661. ):
  662. # No need to reconcile if the instance is already in the reconciled status
  663. # or has already transitioned beyond it.
  664. return cur_im_status
  665. return reconciled_im_status
  666. @staticmethod
  667. def _handle_instances_launch(
  668. instance_manager: InstanceManager, autoscaling_config: AutoscalingConfig
  669. ):
  670. instances, version = Reconciler._get_im_instances(instance_manager)
  671. queued_instances = []
  672. requested_instances = []
  673. running_instances = []
  674. for instance in instances:
  675. if instance.status == IMInstance.QUEUED:
  676. queued_instances.append(instance)
  677. elif instance.status == IMInstance.REQUESTED:
  678. requested_instances.append(instance)
  679. elif instance.status == IMInstance.RAY_RUNNING:
  680. running_instances.append(instance)
  681. if not queued_instances:
  682. # No QUEUED instances
  683. return
  684. to_launch = Reconciler._compute_to_launch(
  685. queued_instances,
  686. requested_instances,
  687. running_instances,
  688. autoscaling_config.get_upscaling_speed(),
  689. autoscaling_config.get_max_concurrent_launches(),
  690. )
  691. # Transition the instances to REQUESTED for instance launcher to
  692. # launch them.
  693. updates = {}
  694. new_launch_request_id = str(uuid.uuid4())
  695. for instance_type, instances in to_launch.items():
  696. for instance in instances:
  697. # Reuse launch request id for any QUEUED instances that have been
  698. # requested before due to retry.
  699. launch_request_id = (
  700. new_launch_request_id
  701. if len(instance.launch_request_id) == 0
  702. else instance.launch_request_id
  703. )
  704. updates[instance.instance_id] = IMInstanceUpdateEvent(
  705. instance_id=instance.instance_id,
  706. new_instance_status=IMInstance.REQUESTED,
  707. launch_request_id=launch_request_id,
  708. instance_type=instance_type,
  709. details=(
  710. f"requested to launch {instance_type} with request id "
  711. f"{launch_request_id}"
  712. ),
  713. )
  714. Reconciler._update_instance_manager(instance_manager, version, updates)
  715. @staticmethod
  716. def _compute_to_launch(
  717. queued_instances: List[IMInstance],
  718. requested_instances: List[IMInstance],
  719. running_instances: List[IMInstance],
  720. upscaling_speed: float,
  721. max_concurrent_launches: int,
  722. ) -> Dict[NodeType, List[IMInstance]]:
  723. def _group_by_type(instances):
  724. instances_by_type = defaultdict(list)
  725. for instance in instances:
  726. instances_by_type[instance.instance_type].append(instance)
  727. return instances_by_type
  728. # Sort the instances by the time they were queued.
  729. def _sort_by_earliest_queued(instance: IMInstance) -> List[int]:
  730. queue_times = InstanceUtil.get_status_transition_times_ns(
  731. instance, IMInstance.QUEUED
  732. )
  733. return sorted(queue_times)
  734. queued_instances_by_type = _group_by_type(queued_instances)
  735. running_instances_by_type = _group_by_type(running_instances)
  736. total_num_requested_to_launch = len(requested_instances)
  737. all_to_launch: Dict[NodeType : List[IMInstance]] = defaultdict(list)
  738. for (
  739. instance_type,
  740. queued_instances_for_type,
  741. ) in queued_instances_by_type.items():
  742. running_instances_for_type = running_instances_by_type.get(
  743. instance_type, []
  744. )
  745. # Enforce the max allowed pending nodes based on current running nodes
  746. num_desired_to_upscale = max(
  747. 1,
  748. math.ceil(upscaling_speed * max(len(running_instances_for_type), 1)),
  749. )
  750. # Enforce global limit, at most we can launch `max_concurrent_launches`
  751. num_to_launch = min(
  752. max_concurrent_launches - total_num_requested_to_launch,
  753. num_desired_to_upscale,
  754. )
  755. # Cap both ends 0 <= num_to_launch <= num_queued
  756. num_to_launch = max(0, num_to_launch)
  757. num_to_launch = min(len(queued_instances_for_type), num_to_launch)
  758. to_launch = sorted(queued_instances_for_type, key=_sort_by_earliest_queued)[
  759. :num_to_launch
  760. ]
  761. all_to_launch[instance_type].extend(to_launch)
  762. total_num_requested_to_launch += num_to_launch
  763. return all_to_launch
  764. @staticmethod
  765. def _handle_stuck_instances(
  766. instance_manager: InstanceManager,
  767. reconcile_config: InstanceReconcileConfig,
  768. _logger: logging.Logger,
  769. ):
  770. """
  771. Handle stuck instances with timeouts.
  772. Instances could be stuck in the following status and needs to be updated:
  773. - REQUESTED: cloud provider is slow/fails to launch instances.
  774. - ALLOCATED: ray fails to be started on the instance.
  775. - RAY_INSTALLING: ray fails to be installed on the instance.
  776. - TERMINATING: cloud provider is slow/fails to terminate instances.
  777. Instances could be in the following status which could be unbounded or
  778. transient, and we don't have a timeout mechanism to handle them. We would
  779. warn if they are stuck for too long:
  780. - RAY_STOPPING: ray taking time to drain.
  781. - QUEUED: cloud provider is slow to launch instances, resulting in long
  782. queue.
  783. Reconciler should handle below statuses, if not, could be slow
  784. reconcilation loop or a bug:
  785. - RAY_INSTALL_FAILED
  786. - RAY_STOPPED
  787. - TERMINATION_FAILED
  788. Args:
  789. instance_manager: The instance manager to reconcile.
  790. reconcile_config: The instance reconcile config.
  791. _logger: The logger to log the warning messages. It's used for testing.
  792. """
  793. instances, version = Reconciler._get_im_instances(instance_manager)
  794. instances_by_status = defaultdict(list)
  795. for instance in instances:
  796. instances_by_status[instance.status].append(instance)
  797. im_updates = {}
  798. # Fail or retry the cloud instance allocation if it's stuck
  799. # in the REQUESTED state.
  800. for instance in instances_by_status[IMInstance.REQUESTED]:
  801. update = Reconciler._handle_stuck_requested_instance(
  802. instance,
  803. reconcile_config.request_status_timeout_s,
  804. reconcile_config.max_num_retry_request_to_allocate,
  805. )
  806. if update:
  807. im_updates[instance.instance_id] = update
  808. # Leaked ALLOCATED instances should be terminated.
  809. # This usually happens when ray fails to be started on the instance, so
  810. # it's unable to be RAY_RUNNING after a long time.
  811. for instance in instances_by_status[IMInstance.ALLOCATED]:
  812. assert (
  813. instance.cloud_instance_id
  814. ), "cloud instance id should be set on ALLOCATED instance"
  815. update = Reconciler._handle_stuck_instance(
  816. instance,
  817. reconcile_config.allocate_status_timeout_s,
  818. new_status=IMInstance.ALLOCATION_TIMEOUT,
  819. cloud_instance_id=instance.cloud_instance_id,
  820. instance_type=instance.instance_type,
  821. )
  822. if update:
  823. im_updates[instance.instance_id] = update
  824. # Fail the installation if it's stuck in RAY_INSTALLING for too long.
  825. # If RAY_INSTALLING is stuck for too long, it's likely that the instance
  826. # is not able to install ray, so we should also fail the installation.
  827. for instance in instances_by_status[IMInstance.RAY_INSTALLING]:
  828. update = Reconciler._handle_stuck_instance(
  829. instance,
  830. reconcile_config.ray_install_status_timeout_s,
  831. new_status=IMInstance.RAY_INSTALL_FAILED,
  832. )
  833. if update:
  834. im_updates[instance.instance_id] = update
  835. # If we tried to terminate the instance, but it doesn't terminate (disappear
  836. # from the cloud provider) after a long time, we fail the termination.
  837. # This will trigger another attempt to terminate the instance.
  838. for instance in instances_by_status[IMInstance.TERMINATING]:
  839. update = Reconciler._handle_stuck_instance(
  840. instance,
  841. reconcile_config.terminating_status_timeout_s,
  842. new_status=IMInstance.TERMINATION_FAILED,
  843. )
  844. if update:
  845. im_updates[instance.instance_id] = update
  846. # If we tried to stop ray on the instance, but it doesn't stop after a long
  847. # time, we will transition it back to RAY_RUNNING as the stop/drain somehow
  848. # failed. If it had succeed, we should have transitioned it to RAY_STOPPING
  849. # or RAY_STOPPED.
  850. for instance in instances_by_status[IMInstance.RAY_STOP_REQUESTED]:
  851. update = Reconciler._handle_stuck_instance(
  852. instance,
  853. reconcile_config.ray_stop_requested_status_timeout_s,
  854. new_status=IMInstance.RAY_RUNNING,
  855. ray_node_id=instance.node_id,
  856. )
  857. if update:
  858. im_updates[instance.instance_id] = update
  859. # These statues could be unbounded or transient, and we don't have a timeout
  860. # mechanism to handle them. We only warn if they are stuck for too long.
  861. for status in [
  862. # Ray taking time to drain. We could also have a timeout when Drain protocol
  863. # supports timeout.
  864. IMInstance.RAY_STOPPING,
  865. # These should just be transient, we will terminate instances with this
  866. # status in the next reconciler step.
  867. IMInstance.RAY_INSTALL_FAILED,
  868. IMInstance.RAY_STOPPED,
  869. IMInstance.TERMINATION_FAILED,
  870. # Instances could be in the QUEUED status for a long time if the cloud
  871. # provider is slow to launch instances.
  872. IMInstance.QUEUED,
  873. ]:
  874. Reconciler._warn_stuck_instances(
  875. instances_by_status[status],
  876. status=status,
  877. warn_interval_s=reconcile_config.transient_status_warn_interval_s,
  878. logger=_logger,
  879. )
  880. Reconciler._update_instance_manager(instance_manager, version, im_updates)
  881. @staticmethod
  882. def _warn_stuck_instances(
  883. instances: List[IMInstance],
  884. status: IMInstance.InstanceStatus,
  885. warn_interval_s: int,
  886. logger: logging.Logger,
  887. ):
  888. """Warn if any instance is stuck in a transient/unbounded status for too
  889. long.
  890. """
  891. for instance in instances:
  892. status_times_ns = InstanceUtil.get_status_transition_times_ns(
  893. instance, select_instance_status=status
  894. )
  895. assert len(status_times_ns) >= 1
  896. status_time_ns = sorted(status_times_ns)[-1]
  897. if time.time_ns() - status_time_ns > warn_interval_s * 1e9:
  898. logger.warning(
  899. "Instance {}({}) is stuck in {} for {} seconds.".format(
  900. instance.instance_id,
  901. IMInstance.InstanceStatus.Name(instance.status),
  902. IMInstance.InstanceStatus.Name(status),
  903. (time.time_ns() - status_time_ns) // 1e9,
  904. )
  905. )
  906. @staticmethod
  907. def _is_head_node_running(instance_manager: InstanceManager) -> bool:
  908. """
  909. Check if the head node is running and ready.
  910. If we scale up the cluster before head node is running,
  911. it would cause issues when launching the worker nodes.
  912. There are corner cases when the GCS is up (so the ray cluster resource
  913. state is retrievable from the GCS), but the head node's raylet is not
  914. running so the head node is missing from the reported nodes. This happens
  915. when the head node is still starting up, or the raylet is not running
  916. due to some issues, and this would yield false.
  917. Args:
  918. instance_manager: The instance manager to reconcile.
  919. Returns:
  920. True if the head node is running and ready, False otherwise.
  921. """
  922. im_instances, _ = Reconciler._get_im_instances(instance_manager)
  923. for instance in im_instances:
  924. if instance.node_kind == NodeKind.HEAD:
  925. if instance.status == IMInstance.RAY_RUNNING:
  926. return True
  927. return False
  928. @staticmethod
  929. def _scale_cluster(
  930. autoscaling_state: AutoscalingState,
  931. instance_manager: InstanceManager,
  932. cloud_resource_monitor: CloudResourceMonitor,
  933. ray_state: ClusterResourceState,
  934. scheduler: IResourceScheduler,
  935. autoscaling_config: AutoscalingConfig,
  936. ) -> None:
  937. """
  938. Scale the cluster based on the resource state and the resource scheduler's
  939. decision:
  940. - It launches new instances if needed.
  941. - It terminates extra ray nodes if they should be shut down (preemption
  942. or idle termination)
  943. Args:
  944. autoscaling_state: The autoscaling state to reconcile.
  945. instance_manager: The instance manager to reconcile.
  946. cloud_resource_monitor: The cloud resource monitor for monitoring resource
  947. availability of all node types.
  948. ray_state: The ray cluster's resource state.
  949. scheduler: The resource scheduler to make scaling decisions.
  950. autoscaling_config: The autoscaling config.
  951. """
  952. # Get the current instance states.
  953. im_instances, version = Reconciler._get_im_instances(instance_manager)
  954. im_instances_by_instance_id = {
  955. i.instance_id: i for i in im_instances if i.instance_id
  956. }
  957. autoscaler_instances = []
  958. ray_nodes_by_id = {
  959. binary_to_hex(node.node_id): node for node in ray_state.node_states
  960. }
  961. for im_instance in im_instances:
  962. ray_node = ray_nodes_by_id.get(im_instance.node_id)
  963. autoscaler_instances.append(
  964. AutoscalerInstance(
  965. ray_node=ray_node,
  966. im_instance=im_instance,
  967. cloud_instance_id=(
  968. im_instance.cloud_instance_id
  969. if im_instance.cloud_instance_id
  970. else None
  971. ),
  972. )
  973. )
  974. # TODO(rickyx): We should probably name it as "Planner" or "Scaler"
  975. # or "ClusterScaler"
  976. sched_request = SchedulingRequest(
  977. node_type_configs=autoscaling_config.get_node_type_configs(),
  978. max_num_nodes=autoscaling_config.get_max_num_nodes(),
  979. resource_requests=ray_state.pending_resource_requests,
  980. gang_resource_requests=ray_state.pending_gang_resource_requests,
  981. cluster_resource_constraints=ray_state.cluster_resource_constraints,
  982. current_instances=autoscaler_instances,
  983. idle_timeout_s=autoscaling_config.get_idle_timeout_s(),
  984. disable_launch_config_check=(
  985. autoscaling_config.disable_launch_config_check()
  986. ),
  987. cloud_resource_availabilities=(
  988. cloud_resource_monitor.get_resource_availabilities()
  989. ),
  990. )
  991. # Ask scheduler for updates to the cluster shape.
  992. reply = scheduler.schedule(sched_request)
  993. # Populate the autoscaling state.
  994. autoscaling_state.infeasible_resource_requests.extend(
  995. reply.infeasible_resource_requests
  996. )
  997. autoscaling_state.infeasible_gang_resource_requests.extend(
  998. reply.infeasible_gang_resource_requests
  999. )
  1000. autoscaling_state.infeasible_cluster_resource_constraints.extend(
  1001. reply.infeasible_cluster_resource_constraints
  1002. )
  1003. if not Reconciler._is_head_node_running(instance_manager):
  1004. # We shouldn't be scaling the cluster until the head node is ready.
  1005. # This could happen when the head node (i.e. the raylet) is still
  1006. # pending registration even though GCS is up.
  1007. # We will wait until the head node is running and ready to avoid
  1008. # scaling the cluster from min worker nodes constraint.
  1009. return
  1010. if autoscaling_config.provider == Provider.READ_ONLY:
  1011. # We shouldn't be scaling the cluster if the provider is read-only.
  1012. return
  1013. # Scale the clusters if needed.
  1014. to_launch = reply.to_launch
  1015. to_terminate = reply.to_terminate
  1016. updates = {}
  1017. # Add terminating instances.
  1018. for terminate_request in to_terminate:
  1019. instance_id = terminate_request.instance_id
  1020. if terminate_request.instance_status == IMInstance.QUEUED:
  1021. # QUEUED instances have no cloud resources allocated yet.
  1022. # Cancel the allocation request by transitioning directly to TERMINATED.
  1023. updates[terminate_request.instance_id] = IMInstanceUpdateEvent(
  1024. instance_id=instance_id,
  1025. new_instance_status=IMInstance.TERMINATED,
  1026. termination_request=terminate_request,
  1027. details=f"allocation canceled: {terminate_request.details}",
  1028. )
  1029. elif terminate_request.instance_status in (
  1030. IMInstance.ALLOCATED,
  1031. IMInstance.RAY_INSTALLING,
  1032. ):
  1033. # The instance is not yet running, so we can't request to stop/drain Ray.
  1034. # Therefore, we can skip the RAY_STOP_REQUESTED state and directly terminate the node.
  1035. im_instance_to_terminate = im_instances_by_instance_id[instance_id]
  1036. updates[terminate_request.instance_id] = IMInstanceUpdateEvent(
  1037. instance_id=instance_id,
  1038. new_instance_status=IMInstance.TERMINATING,
  1039. cloud_instance_id=im_instance_to_terminate.cloud_instance_id,
  1040. termination_request=terminate_request,
  1041. details=f"terminating ray: {terminate_request.details}",
  1042. )
  1043. else:
  1044. updates[terminate_request.instance_id] = IMInstanceUpdateEvent(
  1045. instance_id=instance_id,
  1046. new_instance_status=IMInstance.RAY_STOP_REQUESTED,
  1047. termination_request=terminate_request,
  1048. details=f"draining ray: {terminate_request.details}",
  1049. )
  1050. # Add new instances.
  1051. for launch_request in to_launch:
  1052. for _ in range(launch_request.count):
  1053. instance_id = InstanceUtil.random_instance_id()
  1054. updates[instance_id] = IMInstanceUpdateEvent(
  1055. instance_id=instance_id,
  1056. new_instance_status=IMInstance.QUEUED,
  1057. instance_type=launch_request.instance_type,
  1058. upsert=True,
  1059. details=(
  1060. f"queuing new instance of {launch_request.instance_type} "
  1061. "from scheduler"
  1062. ),
  1063. )
  1064. Reconciler._update_instance_manager(instance_manager, version, updates)
  1065. @staticmethod
  1066. def _terminate_instances(instance_manager: InstanceManager):
  1067. """
  1068. Terminate instances with the below statuses:
  1069. - RAY_STOPPED: ray was stopped on the cloud instance.
  1070. - ALLOCATION_TIMEOUT: cloud provider timed out to allocate a running cloud instance.
  1071. - RAY_INSTALL_FAILED: ray installation failed on the cloud instance,
  1072. we will not retry.
  1073. - TERMINATION_FAILED: cloud provider failed to terminate the instance
  1074. or timeout for termination happened, we will retry again.
  1075. Args:
  1076. instance_manager: The instance manager to reconcile.
  1077. """
  1078. im_instances, version = Reconciler._get_im_instances(instance_manager)
  1079. updates = {}
  1080. for instance in im_instances:
  1081. if instance.status not in [
  1082. IMInstance.RAY_STOPPED,
  1083. IMInstance.ALLOCATION_TIMEOUT,
  1084. IMInstance.RAY_INSTALL_FAILED,
  1085. IMInstance.TERMINATION_FAILED,
  1086. ]:
  1087. continue
  1088. # Terminate the instance.
  1089. updates[instance.instance_id] = IMInstanceUpdateEvent(
  1090. instance_id=instance.instance_id,
  1091. new_instance_status=IMInstance.TERMINATING,
  1092. cloud_instance_id=instance.cloud_instance_id,
  1093. details="terminating instance from "
  1094. f"{IMInstance.InstanceStatus.Name(instance.status)}",
  1095. )
  1096. Reconciler._update_instance_manager(instance_manager, version, updates)
  1097. @staticmethod
  1098. def _install_ray(
  1099. instance_manager: InstanceManager,
  1100. non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance],
  1101. ) -> None:
  1102. """
  1103. Install ray on the allocated instances when it's ready (cloud instance
  1104. should be running)
  1105. This is needed if ray installation needs to be performed by
  1106. the instance manager.
  1107. Args:
  1108. instance_manager: The instance manager to reconcile.
  1109. """
  1110. im_instances, version = Reconciler._get_im_instances(instance_manager)
  1111. updates = {}
  1112. for instance in im_instances:
  1113. if instance.status != IMInstance.ALLOCATED:
  1114. continue
  1115. if instance.node_kind == NodeKind.HEAD:
  1116. # Skip head node.
  1117. continue
  1118. cloud_instance = non_terminated_cloud_instances.get(
  1119. instance.cloud_instance_id
  1120. )
  1121. assert cloud_instance, (
  1122. f"Cloud instance {instance.cloud_instance_id} is not found "
  1123. "in non_terminated_cloud_instances."
  1124. )
  1125. if not cloud_instance.is_running:
  1126. # It might still be pending (e.g. setting up ssh)
  1127. continue
  1128. # Install ray on the running cloud instance
  1129. updates[instance.instance_id] = IMInstanceUpdateEvent(
  1130. instance_id=instance.instance_id,
  1131. new_instance_status=IMInstance.RAY_INSTALLING,
  1132. details="installing ray",
  1133. )
  1134. Reconciler._update_instance_manager(instance_manager, version, updates)
  1135. @staticmethod
  1136. def _fill_autoscaling_state(
  1137. instance_manager: InstanceManager,
  1138. autoscaling_state: AutoscalingState,
  1139. ) -> None:
  1140. # Use the IM instance version for the autoscaler_state_version
  1141. instances, version = Reconciler._get_im_instances(instance_manager)
  1142. autoscaling_state.autoscaler_state_version = version
  1143. # Group instances by status
  1144. instances_by_status = defaultdict(list)
  1145. for instance in instances:
  1146. instances_by_status[instance.status].append(instance)
  1147. # Pending instance requests
  1148. instances_by_launch_request = defaultdict(list)
  1149. queued_instances = []
  1150. for instance in (
  1151. instances_by_status[IMInstance.REQUESTED]
  1152. + instances_by_status[IMInstance.QUEUED]
  1153. ):
  1154. if instance.launch_request_id:
  1155. instances_by_launch_request[instance.launch_request_id].append(instance)
  1156. else:
  1157. queued_instances.append(instance)
  1158. for _, instances in instances_by_launch_request.items():
  1159. num_instances_by_type = defaultdict(int)
  1160. for instance in instances:
  1161. num_instances_by_type[instance.instance_type] += 1
  1162. # All instances with same request id should have the same
  1163. # request time.
  1164. request_update = InstanceUtil.get_last_status_transition(
  1165. instances[0], IMInstance.REQUESTED
  1166. )
  1167. request_time_ns = request_update.timestamp_ns if request_update else 0
  1168. for instance_type, count in num_instances_by_type.items():
  1169. autoscaling_state.pending_instance_requests.append(
  1170. PendingInstanceRequest(
  1171. ray_node_type_name=instance_type,
  1172. count=int(count),
  1173. request_ts=int(request_time_ns // 1e9),
  1174. )
  1175. )
  1176. # Pending instances
  1177. for instance in (
  1178. instances_by_status[IMInstance.ALLOCATED]
  1179. + instances_by_status[IMInstance.RAY_INSTALLING]
  1180. ):
  1181. status_history = sorted(
  1182. instance.status_history, key=lambda x: x.timestamp_ns, reverse=True
  1183. )
  1184. autoscaling_state.pending_instances.append(
  1185. PendingInstance(
  1186. instance_id=instance.instance_id,
  1187. ray_node_type_name=instance.instance_type,
  1188. details=status_history[0].details,
  1189. )
  1190. )
  1191. # Failed instance requests
  1192. for instance in instances_by_status[IMInstance.ALLOCATION_FAILED]:
  1193. request_status_update = InstanceUtil.get_last_status_transition(
  1194. instance, IMInstance.REQUESTED
  1195. )
  1196. failed_status_update = InstanceUtil.get_last_status_transition(
  1197. instance, IMInstance.ALLOCATION_FAILED
  1198. )
  1199. failed_time = (
  1200. failed_status_update.timestamp_ns if failed_status_update else 0
  1201. )
  1202. request_time = (
  1203. request_status_update.timestamp_ns if request_status_update else 0
  1204. )
  1205. autoscaling_state.failed_instance_requests.append(
  1206. FailedInstanceRequest(
  1207. ray_node_type_name=instance.instance_type,
  1208. start_ts=int(request_time // 1e9),
  1209. failed_ts=int(
  1210. failed_time // 1e9,
  1211. ),
  1212. reason=failed_status_update.details,
  1213. count=1,
  1214. )
  1215. )
  1216. @staticmethod
  1217. def _handle_stuck_requested_instance(
  1218. instance: IMInstance, timeout_s: int, max_num_retry_request_to_allocate: int
  1219. ) -> Optional[IMInstanceUpdateEvent]:
  1220. """
  1221. Fail the cloud instance allocation if it's stuck in the REQUESTED state.
  1222. Args:
  1223. instance: The instance to handle.
  1224. timeout_s: The timeout in seconds.
  1225. max_num_retry_request_to_allocate: The maximum number of times an instance
  1226. could be requested to allocate.
  1227. Returns:
  1228. Instance update to ALLOCATION_FAILED: if the instance allocation failed
  1229. with errors.
  1230. None: if there's no update.
  1231. """
  1232. if not InstanceUtil.has_timeout(instance, timeout_s):
  1233. # Not timeout yet, be patient.
  1234. return None
  1235. all_request_times_ns = sorted(
  1236. InstanceUtil.get_status_transition_times_ns(
  1237. instance, select_instance_status=IMInstance.REQUESTED
  1238. )
  1239. )
  1240. # Fail the allocation if we have tried too many times.
  1241. if len(all_request_times_ns) > max_num_retry_request_to_allocate:
  1242. return IMInstanceUpdateEvent(
  1243. instance_id=instance.instance_id,
  1244. new_instance_status=IMInstance.ALLOCATION_FAILED,
  1245. details=(
  1246. "failed to allocate cloud instance after "
  1247. f"{len(all_request_times_ns)} attempts > "
  1248. f"max_num_retry_request_to_allocate={max_num_retry_request_to_allocate}" # noqa
  1249. ),
  1250. )
  1251. # Retry the allocation if we could by transitioning to QUEUED again.
  1252. return IMInstanceUpdateEvent(
  1253. instance_id=instance.instance_id,
  1254. new_instance_status=IMInstance.QUEUED,
  1255. details=f"queue again to launch after timeout={timeout_s}s",
  1256. )
  1257. @staticmethod
  1258. def _handle_stuck_instance(
  1259. instance: IMInstance,
  1260. timeout_s: int,
  1261. new_status: IMInstance.InstanceStatus,
  1262. **update_kwargs: Dict,
  1263. ) -> Optional[IMInstanceUpdateEvent]:
  1264. """
  1265. Fail the instance if it's stuck in the status for too long.
  1266. Args:
  1267. instance: The instance to handle.
  1268. timeout_s: The timeout in seconds.
  1269. new_status: The new status to transition to.
  1270. update_kwargs: The update kwargs for InstanceUpdateEvent
  1271. Returns:
  1272. Instance update to the new status: if the instance is stuck in the status
  1273. for too long.
  1274. None: if there's no update.
  1275. """
  1276. if not InstanceUtil.has_timeout(instance, timeout_s):
  1277. # Not timeout yet, be patient.
  1278. return None
  1279. return IMInstanceUpdateEvent(
  1280. instance_id=instance.instance_id,
  1281. new_instance_status=new_status,
  1282. details=f"timeout={timeout_s}s at status "
  1283. f"{IMInstance.InstanceStatus.Name(instance.status)}",
  1284. **update_kwargs,
  1285. )
  1286. @staticmethod
  1287. def _handle_extra_cloud_instances(
  1288. instance_manager: InstanceManager,
  1289. non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance],
  1290. ray_nodes: List[NodeState],
  1291. ):
  1292. """
  1293. For extra cloud instances (i.e. cloud instances that are non terminated as
  1294. returned by cloud provider, but not managed by the instance manager), we
  1295. will create new IM instances with ALLOCATED status.
  1296. Such instances could either be:
  1297. 1. Leaked instances that are incorrectly started by the cloud instance
  1298. provider, and they would be terminated eventually if they fail to
  1299. transition to RAY_RUNNING by stuck instances reconciliation, or they
  1300. would join the ray cluster and be terminated when the cluster scales down.
  1301. 2. Instances that are started by the cloud instance provider intentionally
  1302. but not yet discovered by the instance manager. This could happen for
  1303. a. Head node that's started before the autoscaler.
  1304. b. Worker nodes that's started by the cloud provider upon users'
  1305. actions: i.e. KubeRay scaling up the cluster with ray cluster config
  1306. change.
  1307. 3. Ray nodes with cloud instance id not in the cloud provider. This could
  1308. happen if there's delay in the Ray's state (i.e. cloud instance already
  1309. terminated, but the ray node is still not dead yet).
  1310. Args:
  1311. instance_manager: The instance manager to reconcile.
  1312. non_terminated_cloud_instances: The non-terminated cloud instances from
  1313. the cloud provider.
  1314. ray_nodes: The ray cluster's states of ray nodes.
  1315. """
  1316. Reconciler._handle_extra_cloud_instances_from_ray_nodes(
  1317. instance_manager, ray_nodes
  1318. )
  1319. Reconciler._handle_extra_cloud_instances_from_cloud_provider(
  1320. instance_manager, non_terminated_cloud_instances
  1321. )
  @staticmethod
  def _handle_extra_cloud_instances_from_cloud_provider(
      instance_manager: InstanceManager,
      non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance],
  ):
      """
      Adopt cloud instances that are running in the cloud provider but are not
      owned by any live instance in the instance manager.

      Each such cloud instance gets a brand-new IM instance (with a randomly
      generated instance id) that is upserted directly in ALLOCATED status,
      carrying the node kind and node type reported by the cloud provider.

      Args:
          instance_manager: The instance manager to reconcile.
          non_terminated_cloud_instances: The non-terminated cloud instances from
              the cloud provider, keyed by cloud instance id.
      """
      instances, version = Reconciler._get_im_instances(instance_manager)

      # Cloud instance ids already owned by an IM instance. IM instances in a
      # terminal state (TERMINATED / ALLOCATION_FAILED) are deliberately not
      # counted, so a cloud instance that is still running while its IM record
      # is terminal gets adopted again under a new IM instance.
      terminal_statuses = (IMInstance.TERMINATED, IMInstance.ALLOCATION_FAILED)
      cloud_instance_ids_managed_by_im = {
          instance.cloud_instance_id
          for instance in instances
          if instance.cloud_instance_id and instance.status not in terminal_statuses
      }

      updates = {}
      for cloud_instance_id, cloud_instance in non_terminated_cloud_instances.items():
          if cloud_instance_id in cloud_instance_ids_managed_by_im:
              continue
          updates[cloud_instance_id] = IMInstanceUpdateEvent(
              instance_id=InstanceUtil.random_instance_id(),  # Assign a new id.
              cloud_instance_id=cloud_instance_id,
              new_instance_status=IMInstance.ALLOCATED,
              node_kind=cloud_instance.node_kind,
              instance_type=cloud_instance.node_type,
              details=(
                  "allocated unmanaged cloud instance: "
                  f"{cloud_instance.cloud_instance_id} "
                  f"({NodeKind.Name(cloud_instance.node_kind)}) from cloud provider"
              ),
              upsert=True,
          )

      # `version` is the one returned alongside `instances` above.
      # NOTE(review): presumably used so the update is rejected if the IM state
      # changed in between — confirm against _update_instance_manager.
      Reconciler._update_instance_manager(instance_manager, version, updates)
  @staticmethod
  def _handle_extra_cloud_instances_from_ray_nodes(
      instance_manager: InstanceManager, ray_nodes: List[NodeState]
  ):
      """
      Adopt cloud instances that Ray reports through its nodes but that the
      instance manager does not track.

      A ray node is adopted when it carries a cloud instance id, its ray node id
      is not already bound to any IM instance, and its cloud instance id is not
      held by a live IM instance that has no ray node yet. For each adopted node
      a new IM instance (randomly generated instance id) is upserted in
      ALLOCATED status with the ray node id, node kind (HEAD or WORKER) and ray
      node type filled in.

      Args:
          instance_manager: The instance manager to reconcile.
          ray_nodes: The ray cluster's states of ray nodes.
      """
      instances, version = Reconciler._get_im_instances(instance_manager)
      terminal_statuses = (IMInstance.TERMINATED, IMInstance.ALLOCATION_FAILED)

      # Cloud instance ids held by live (non-terminal) IM instances that are not
      # yet bound to a ray node. IM instances that already have a node_id are
      # excluded, so a ray node with a *different* node id on the same cloud
      # instance still gets its own IM instance.
      # NOTE(review): presumably meant for Ray being restarted on the same
      # machine — confirm intent.
      cloud_instance_ids_managed_by_im = {
          instance.cloud_instance_id
          for instance in instances
          if instance.cloud_instance_id
          and not instance.node_id
          and instance.status not in terminal_statuses
      }
      # Ray node ids bound to any IM instance, regardless of its status.
      # NOTE(review): compared with hex-encoded node ids below, so this assumes
      # the IM stores node_id in hex form — confirm.
      ray_node_ids_managed_by_im = {
          instance.node_id for instance in instances if instance.node_id
      }

      updates = {}
      for ray_node in ray_nodes:
          # ray_node.instance_id is the cloud instance id reported by Ray; nodes
          # without one are skipped.
          cloud_instance_id = ray_node.instance_id
          if not cloud_instance_id:
              continue
          ray_node_id = binary_to_hex(ray_node.node_id)
          if ray_node_id in ray_node_ids_managed_by_im:
              continue
          if cloud_instance_id in cloud_instance_ids_managed_by_im:
              continue
          is_head = is_head_node(ray_node)
          # Previously the details always said "worker", even for head nodes.
          kind_label = "head" if is_head else "worker"
          updates[ray_node_id] = IMInstanceUpdateEvent(
              instance_id=InstanceUtil.random_instance_id(),  # Assign a new id.
              cloud_instance_id=cloud_instance_id,
              new_instance_status=IMInstance.ALLOCATED,
              node_kind=NodeKind.HEAD if is_head else NodeKind.WORKER,
              ray_node_id=ray_node_id,
              instance_type=ray_node.ray_node_type_name,
              details=(
                  f"allocated unmanaged {kind_label} cloud instance from ray node: "
                  f"{ray_node_id}"
              ),
              upsert=True,
          )

      Reconciler._update_instance_manager(instance_manager, version, updates)
  @staticmethod
  def _report_metrics(
      instance_manager: InstanceManager,
      autoscaling_config: AutoscalingConfig,
      metrics_reporter: Optional[AutoscalerMetricsReporter] = None,
  ):
      """
      Push instance and resource metrics to the metrics reporter, if one is set.

      Does nothing when `metrics_reporter` is falsy. Otherwise it reads the
      current IM instances and the node type configs from the autoscaling
      config. It then calls `report_instances` followed by `report_resources`
      with both.

      Args:
          instance_manager: The instance manager to read instances from.
          autoscaling_config: Source of the node type configs.
          metrics_reporter: Optional reporter; reporting is skipped without one.
      """
      if metrics_reporter:
          im_instances, _version = Reconciler._get_im_instances(instance_manager)
          configs_by_node_type = autoscaling_config.get_node_type_configs()
          metrics_reporter.report_instances(im_instances, configs_by_node_type)
          metrics_reporter.report_resources(im_instances, configs_by_node_type)