scheduler.py 71 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790
  1. import copy
  2. import logging
  3. import time
  4. import uuid
  5. from abc import ABC, abstractmethod
  6. from collections import defaultdict
  7. from dataclasses import dataclass, field
  8. from enum import Enum
  9. from typing import Dict, List, Optional, Tuple
  10. from ray._private.protobuf_compat import message_to_dict
  11. from ray.autoscaler._private.constants import AUTOSCALER_CONSERVE_GPU_NODES
  12. from ray.autoscaler._private.resource_demand_scheduler import (
  13. UtilizationScore,
  14. _fits,
  15. _inplace_subtract,
  16. )
  17. from ray.autoscaler.v2.event_logger import AutoscalerEventLogger
  18. from ray.autoscaler.v2.instance_manager.common import InstanceUtil
  19. from ray.autoscaler.v2.instance_manager.config import NodeTypeConfig
  20. from ray.autoscaler.v2.schema import AutoscalerInstance, NodeType
  21. from ray.autoscaler.v2.utils import ProtobufUtil, ResourceRequestUtil
  22. from ray.core.generated.autoscaler_pb2 import (
  23. ClusterResourceConstraint,
  24. GangResourceRequest,
  25. ResourceRequest,
  26. ResourceRequestByCount,
  27. )
  28. from ray.core.generated.common_pb2 import LabelSelectorOperator
  29. from ray.core.generated.instance_manager_pb2 import (
  30. Instance,
  31. LaunchRequest,
  32. NodeKind,
  33. TerminationRequest,
  34. )
  35. # ============= Resource Scheduling Service API =======================
  36. #
  37. # ResourceSchedulerService is a service that schedules resource bundles
  38. # to nodes. It's used by the autoscaler to schedule resource bundles
  39. # to determine the desired cluster size to satisfy the current resource
  40. # demands.
  41. #
  42. logger = logging.getLogger(__name__)
  43. @dataclass
  44. class SchedulingRequest:
  45. # If outdated node check through launch config is disabled.
  46. disable_launch_config_check: bool
  47. # Available node type configs
  48. node_type_configs: Dict[NodeType, NodeTypeConfig] = field(default_factory=dict)
  49. # Max number of worker nodes.
  50. max_num_nodes: Optional[int] = None
  51. # Idle timeout in seconds.
  52. idle_timeout_s: Optional[float] = None
  53. # TODO: This prob could be refactored into the ClusterStatus data class later.
  54. # The current ray resource requests.
  55. resource_requests: List[ResourceRequestByCount] = field(default_factory=list)
  56. # The Gang resource requests.
  57. gang_resource_requests: List[GangResourceRequest] = field(default_factory=list)
  58. # cluster resource constraints.
  59. cluster_resource_constraints: List[ClusterResourceConstraint] = field(
  60. default_factory=list
  61. )
  62. # The current instances.
  63. current_instances: List[AutoscalerInstance] = field(default_factory=list)
  64. # The cloud resource availability score. A low score indicates that resource
  65. # allocation for this node type has recently failed.
  66. cloud_resource_availabilities: Dict[NodeType, float] = field(default_factory=dict)
  67. @dataclass
  68. class SchedulingReply:
  69. # Instances to launch.
  70. to_launch: List[LaunchRequest] = field(default_factory=list)
  71. # To terminate.
  72. to_terminate: List[TerminationRequest] = field(default_factory=list)
  73. # The infeasible resource bundles.
  74. infeasible_resource_requests: List[ResourceRequest] = field(default_factory=list)
  75. # The infeasible gang resource bundles.
  76. infeasible_gang_resource_requests: List[GangResourceRequest] = field(
  77. default_factory=list
  78. )
  79. # The infeasible cluster resource constraints.
  80. infeasible_cluster_resource_constraints: List[ClusterResourceConstraint] = field(
  81. default_factory=list
  82. )
  83. class IResourceScheduler(ABC):
  84. """
  85. Interface for a resource scheduler.
  86. Implements the `instance_manager.proto ResourceSchedulerService` interface.
  87. """
  88. @abstractmethod
  89. def schedule(self, request: SchedulingRequest) -> SchedulingReply:
  90. """
  91. Given the resource requests and the current cluster state, calculate the
  92. target cluster shape by trying to schedule the resource requests on the
  93. nodes.
  94. """
  95. pass
  96. class SchedulingNodeStatus(Enum):
  97. """
  98. The status of a scheduling node (`SchedulingNode`)
  99. """
  100. # The node is added by the ResourceDemandScheduler.
  101. TO_LAUNCH = "TO_LAUNCH"
  102. # The node is pending, i.e. there's already an autoscaler instance being launched
  103. # The node is schedulable. It could be running ray or pending to run ray. Either
  104. # Way, it should be able to accept new resource requests/resource constraints.
  105. SCHEDULABLE = "SCHEDULABLE"
  106. # The node is to be terminated by the ResourceDemandScheduler
  107. TO_TERMINATE = "TO_TERMINATE"
  108. class ResourceRequestSource(Enum):
  109. """
  110. The source of the resource request.
  111. """
  112. # The resource request is from demand, e.g. ray tasks/actors,
  113. # placement groups, etc.
  114. PENDING_DEMAND = "PENDING_DEMAND"
  115. # The resource request is from the cluster resource constraints, i.e.
  116. # from ray.autoscaler.sdk.request_resources().
  117. CLUSTER_RESOURCE_CONSTRAINT = "CLUSTER_RESOURCE_CONSTRAINT"
  118. @dataclass
  119. class SchedulingNode:
  120. """
  121. A abstraction of a node that can be scheduled on by the resource scheduler.
  122. A scheduling node is expected to be used as:
  123. node = SchedulingNode.new(instance, node_configs)
  124. remaining, score = node.try_schedule(requests)
  125. .... do something with the score ....
  126. NOTE:
  127. One could also extend the scheduling behavior by overriding `try_schedule`
  128. """
  129. # Node type name.
  130. node_type: NodeType
  131. # Status
  132. status: SchedulingNodeStatus
  133. # Resource requests scheduled on this nodes for different sources.
  134. sched_requests: Dict[ResourceRequestSource, List[ResourceRequest]] = field(
  135. default_factory=lambda: defaultdict(list)
  136. )
  137. # Available resources for different sources of requests.
  138. available_resources_for_sched: Dict[
  139. ResourceRequestSource, Dict[str, float]
  140. ] = field(default_factory=dict)
  141. # The node's current resource capacity.
  142. total_resources: Dict[str, float] = field(default_factory=dict)
  143. # Node's labels, including static or dynamic labels.
  144. # Note that dynamic labels are a deprecated feature. And it is only used for the
  145. # autoscaler’s strict-spread placement group scheduling (antiaffinity)
  146. labels: Dict[str, str] = field(default_factory=dict)
  147. # Observability descriptive message for why the node was launched in the
  148. # first place.
  149. launch_reason: Optional[str] = None
  150. # Termination request, none when the node is not being terminated.
  151. termination_request: Optional[TerminationRequest] = None
  152. # The instance id of the IM(Instance Manager) instance. None if the node
  153. # is not yet in IM.
  154. im_instance_id: Optional[str] = None
  155. # The instance status of the IM(Instance Manager) instance. None if the in-flight node
  156. # has not yet been assigned to an IM instance.
  157. im_instance_status: Optional[Instance.InstanceStatus.ValueType] = None
  158. # The ray node id of the ray node. None if the node is not included in
  159. # ray cluster's GCS report yet (not running ray yet).
  160. ray_node_id: Optional[str] = None
  161. # Idle duration in ms. Default not idle.
  162. idle_duration_ms: int = 0
  163. # Launch config hash.
  164. launch_config_hash: Optional[str] = None
  165. # node kind.
  166. node_kind: NodeKind = NodeKind.WORKER
  167. def __init__(
  168. self,
  169. node_type: NodeType,
  170. total_resources: Dict[str, float],
  171. available_resources: Dict[str, float],
  172. labels: Dict[str, str],
  173. status: SchedulingNodeStatus,
  174. im_instance_id: str = "",
  175. im_instance_status: Optional[Instance.InstanceStatus.ValueType] = None,
  176. ray_node_id: str = "",
  177. idle_duration_ms: int = 0,
  178. launch_config_hash: str = "",
  179. node_kind: NodeKind = NodeKind.WORKER,
  180. termination_request: Optional[TerminationRequest] = None,
  181. ):
  182. self.node_type = node_type
  183. self.total_resources = total_resources
  184. self.available_resources_for_sched = {
  185. ResourceRequestSource.PENDING_DEMAND: dict(available_resources),
  186. ResourceRequestSource.CLUSTER_RESOURCE_CONSTRAINT: dict(total_resources),
  187. }
  188. self.sched_requests = {
  189. ResourceRequestSource.PENDING_DEMAND: [],
  190. ResourceRequestSource.CLUSTER_RESOURCE_CONSTRAINT: [],
  191. }
  192. self.labels = labels
  193. self.status = status
  194. self.im_instance_id = im_instance_id
  195. self.im_instance_status = im_instance_status
  196. self.ray_node_id = ray_node_id
  197. self.idle_duration_ms = idle_duration_ms
  198. self.launch_config_hash = launch_config_hash
  199. self.node_kind = node_kind
  200. self.termination_request = termination_request
  201. def get_available_resources(self, resource_request_source: ResourceRequestSource):
  202. """Get the available resources for the given resource request source."""
  203. return self.available_resources_for_sched[resource_request_source]
  204. def get_sched_requests(self, resource_request_source: ResourceRequestSource):
  205. """Get the resource requests for the given resource request source."""
  206. return self.sched_requests[resource_request_source]
  207. def add_sched_request(
  208. self,
  209. request: ResourceRequest,
  210. resource_request_source: ResourceRequestSource,
  211. ):
  212. """
  213. Add the resource requests to the node.
  214. Args:
  215. request: The resource request to be added.
  216. resource_request_source: The source of the resource request.
  217. """
  218. self.sched_requests[resource_request_source].append(request)
  219. @staticmethod
  220. def new(
  221. instance: AutoscalerInstance,
  222. node_type_configs: Dict[NodeType, NodeTypeConfig],
  223. disable_launch_config_check: bool,
  224. ) -> Optional["SchedulingNode"]:
  225. """
  226. Create a new scheduling node from an autoscaler instance.
  227. It creates:
  228. - None if the instance is not schedulable by IM.
  229. - A schedulable node if the instance is running ray or pending to run ray,
  230. so it should be considered in the scheduling process.
  231. Args:
  232. instance: The instance.
  233. node_type_configs: The node type configs.
  234. disable_launch_config_check: If outdated node check through launch config is
  235. disabled.
  236. """
  237. if not SchedulingNode.is_schedulable(instance):
  238. return None
  239. if instance.im_instance.status == Instance.RAY_RUNNING:
  240. assert instance.ray_node is not None, (
  241. "ray node should not be None "
  242. f"when the instance is running ray: instance={instance}"
  243. )
  244. # An running ray node
  245. return SchedulingNode(
  246. node_type=instance.im_instance.instance_type,
  247. total_resources=dict(instance.ray_node.total_resources),
  248. # Available resources for scheduling requests of different
  249. # sources.
  250. available_resources=dict(instance.ray_node.available_resources),
  251. labels={
  252. **(instance.ray_node.labels or {}),
  253. # DEPRECATED: Dynamic labels are a deprecated feature. This field
  254. # is used here only for the autoscaler’s strict-spread placement
  255. # group scheduling (antiaffinity).
  256. **(instance.ray_node.dynamic_labels or {}),
  257. },
  258. status=SchedulingNodeStatus.SCHEDULABLE,
  259. im_instance_id=instance.im_instance.instance_id,
  260. im_instance_status=instance.im_instance.status,
  261. ray_node_id=instance.im_instance.node_id,
  262. idle_duration_ms=instance.ray_node.idle_duration_ms,
  263. launch_config_hash=instance.im_instance.launch_config_hash,
  264. node_kind=instance.im_instance.node_kind,
  265. )
  266. # This is an instance pending to run ray. Initialize a schedulable node
  267. # from the node type config.
  268. node_config = node_type_configs.get(instance.im_instance.instance_type, None)
  269. if node_config is None:
  270. if disable_launch_config_check:
  271. # We are not terminating outdated nodes.
  272. logger.info(
  273. f"Node config for {instance.im_instance.instance_type} is missing, "
  274. "but we are not terminating the outdated node because "
  275. "`disable_launch_config_check` is True in "
  276. "the autoscaler's provider config."
  277. )
  278. return None
  279. # Configs might have been updated, and no more
  280. # node_type_configs for this node type. We should terminate it.
  281. return SchedulingNode(
  282. node_type=instance.im_instance.instance_type,
  283. total_resources={},
  284. available_resources={},
  285. labels={},
  286. status=SchedulingNodeStatus.TO_TERMINATE,
  287. im_instance_id=instance.im_instance.instance_id,
  288. im_instance_status=instance.im_instance.status,
  289. termination_request=TerminationRequest(
  290. id=str(uuid.uuid4()),
  291. instance_id=instance.im_instance.instance_id,
  292. instance_status=instance.im_instance.status,
  293. cause=TerminationRequest.Cause.OUTDATED,
  294. instance_type=instance.im_instance.instance_type,
  295. ),
  296. node_kind=NodeKind.WORKER,
  297. )
  298. return SchedulingNode.from_node_config(
  299. node_config,
  300. SchedulingNodeStatus.SCHEDULABLE,
  301. node_kind=instance.im_instance.node_kind,
  302. im_instance_id=instance.im_instance.instance_id,
  303. im_instance_status=instance.im_instance.status,
  304. )
  305. @staticmethod
  306. def is_schedulable(instance: AutoscalerInstance) -> bool:
  307. """
  308. Check if the instance is schedulable by IM.
  309. Args:
  310. instance: The instance.
  311. Returns:
  312. True if the instance is schedulable by IM.
  313. """
  314. if instance.im_instance is None:
  315. # We will skip any instances that are not yet in IM which
  316. # could be
  317. # 1. an out-of-band ray node
  318. # 2. an cloud instance running ray not yet discovered
  319. # by the IM's Reconciler
  320. # 3. an cloud instance already terminated but ray state
  321. # still lagging behind.
  322. #
  323. # In all of these cases, the instance is not schedulable or
  324. # shouldn't be managed by IM, so we don't consider them.
  325. return False
  326. # These are the statuses where there's a running ray node or
  327. # could eventually run ray.
  328. if InstanceUtil.is_ray_running_reachable(instance.im_instance.status):
  329. return True
  330. return False
  331. @staticmethod
  332. def from_node_config(
  333. node_config: NodeTypeConfig,
  334. status: SchedulingNodeStatus,
  335. node_kind: NodeKind,
  336. im_instance_id: Optional[str] = None,
  337. im_instance_status: Optional[str] = None,
  338. ) -> "SchedulingNode":
  339. """
  340. Create a scheduling node from a node config.
  341. Args:
  342. node_config: The node config.
  343. status: The status of the node.
  344. node_kind: The node kind.
  345. im_instance_id: The instance id of the im instance.
  346. im_instance_status: The instance status of the im instance.
  347. node_kind: The node kind.
  348. """
  349. return SchedulingNode(
  350. node_type=node_config.name,
  351. total_resources=dict(node_config.resources),
  352. available_resources=dict(node_config.resources),
  353. labels=dict(node_config.labels),
  354. status=status,
  355. im_instance_id=im_instance_id,
  356. im_instance_status=im_instance_status,
  357. node_kind=node_kind,
  358. )
  359. def __post_init__(self):
  360. assert self.node_type, "node_type should be set"
  361. def try_schedule(
  362. self,
  363. requests: List[ResourceRequest],
  364. resource_request_source: ResourceRequestSource,
  365. ) -> Tuple[List[ResourceRequest], UtilizationScore]:
  366. """
  367. Try to schedule the resource requests on this node.
  368. This modifies the node's available resources if the requests are schedulable.
  369. The requests are scheduled one by one in the sorted order, and no
  370. backtracking is done.
  371. Args:
  372. requests: The resource requests to be scheduled.
  373. resource_request_source: The source of the resource request, i.e.
  374. pending demands from ray actors/tasks or cluster resource constraints.
  375. Returns:
  376. A tuple of:
  377. - list of remaining requests that cannot be scheduled on this node.
  378. - the utilization score for this node with respect to the current
  379. resource requests being scheduled.
  380. """
  381. # Track the resource requests that cannot be scheduled on this node.
  382. unschedulable_requests = []
  383. # Sort the requests and try schedule them one by one.
  384. for r in requests:
  385. if not self._try_schedule_one(r, resource_request_source):
  386. unschedulable_requests.append(r)
  387. score = self._compute_score(resource_request_source)
  388. return unschedulable_requests, score
  389. def _compute_score(
  390. self, resource_request_source: ResourceRequestSource
  391. ) -> UtilizationScore:
  392. """
  393. Compute the utilization score for this node with respect to the current resource
  394. request being scheduled.
  395. A "higher" score means that this node is more suitable for scheduling the
  396. current scheduled resource requests.
  397. The score is a tuple of 5 values:
  398. 1. Whether this node has labels matching the current resource request's
  399. label_selector requirements:
  400. 0: if this node does not satisfy any label selector requirements or
  401. no label selectors are provided.
  402. len(label_selectors)-i: a score based on the priority of the label
  403. selector in the resource request that this node satisfies.
  404. 2. Whether this node is a GPU node and the current resource request has
  405. GPU requirements:
  406. 0: if this node is a GPU node and the current resource request
  407. placed onto the node has no GPU requirements.
  408. 1: if this node is not a GPU node or the current resource request
  409. placed onto the node has GPU requirements.
  410. 3. The number of resource types being scheduled.
  411. 4. The minimum utilization rate across all resource types.
  412. 5. The average utilization rate across all resource types.
  413. NOTE:
  414. This function is adapted from _resource_based_utilization_scorer from
  415. autoscaler v1.
  416. TODO(rickyx,jjyao): We should also consider node labels for
  417. scoring. For example, if a node has a label that matches the affinity
  418. label of the resource request, we should give it a higher score.
  419. TODO(rickyx): add pluggable scoring functions here.
  420. Returns:
  421. A utilization score for this node.
  422. """
  423. sched_requests = self.get_sched_requests(resource_request_source)
  424. available_resources = self.get_available_resources(resource_request_source)
  425. # Compute the number of resource types being scheduled.
  426. num_matching_resource_types = 0
  427. sched_resource_types = set()
  428. for req in sched_requests:
  429. for resource_name, v in req.resources_bundle.items():
  430. if v > 0:
  431. sched_resource_types.add(resource_name)
  432. for sched_resource_type in sched_resource_types:
  433. if sched_resource_type in self.total_resources:
  434. num_matching_resource_types += 1
  435. # Compute the utilization rate for each resource type
  436. util_by_resources = []
  437. for k, v in self.total_resources.items():
  438. if v == 0:
  439. # Skip any zero values.
  440. continue
  441. if k in available_resources:
  442. util = (v - available_resources.get(k, 0)) / v
  443. assert util >= 0 and util <= 1, f"Invalid utilization: {util}"
  444. util_by_resources.append(v * (util**3))
  445. # Prefer not to launch a GPU node if there aren't any GPU requirements in the
  446. # resource bundle.
  447. gpu_ok = True
  448. if AUTOSCALER_CONSERVE_GPU_NODES:
  449. # TODO: we should also generalize this optimization for accelerators.
  450. # https://github.com/ray-project/ray/issues/43079
  451. is_gpu_node = self.total_resources.get("GPU", 0) > 0
  452. any_gpu_requests = any("GPU" in r.resources_bundle for r in sched_requests)
  453. if is_gpu_node and not any_gpu_requests:
  454. gpu_ok = False
  455. # Check if node satisfies label requirements.
  456. matches_labels = self._satisfies_label_constraints(sched_requests)
  457. # Prioritize avoiding gpu nodes for non-gpu workloads first,
  458. # then prioritize matching multiple resource types,
  459. # then prioritize using all resources,
  460. # then prioritize overall balance of multiple resources.
  461. return (
  462. matches_labels,
  463. gpu_ok,
  464. num_matching_resource_types,
  465. min(util_by_resources) if util_by_resources else 0,
  466. float(sum(util_by_resources)) / len(util_by_resources)
  467. if util_by_resources
  468. else 0,
  469. )
  470. def _satisfies_label_constraints(
  471. self, sched_requests: List[ResourceRequest]
  472. ) -> int:
  473. """Returns a higher value based on the priority of the label selector this node
  474. satisfies (first returns highest score, decreasing sequentially for fallback), 0 otherwise."""
  475. for req in sched_requests:
  476. num_selectors = len(req.label_selectors)
  477. for i, selector in enumerate(req.label_selectors):
  478. all_constraints_pass = True
  479. for constraint in selector.label_constraints:
  480. key = constraint.label_key
  481. values = set(constraint.label_values)
  482. op = constraint.operator
  483. node_val = self.labels.get(key)
  484. if op == LabelSelectorOperator.LABEL_OPERATOR_IN:
  485. if node_val not in values:
  486. all_constraints_pass = False
  487. break
  488. elif op == LabelSelectorOperator.LABEL_OPERATOR_NOT_IN:
  489. if node_val in values:
  490. all_constraints_pass = False
  491. break
  492. else:
  493. all_constraints_pass = False
  494. break
  495. if all_constraints_pass:
  496. return num_selectors - i
  497. return 0
  498. def _try_schedule_one(
  499. self, request: ResourceRequest, resource_request_source: ResourceRequestSource
  500. ) -> bool:
  501. """
  502. Try to schedule one resource request on this node. The request could be from
  503. various sources, specified by `resource_request_source`.
  504. Args:
  505. request: The resource request to be scheduled.
  506. resource_request_source: The source of the resource request, i.e.
  507. pending demands from ray actors/tasks or cluster resource constraints.
  508. Returns:
  509. True if the resource request is scheduled on this node.
  510. """
  511. # Enforce label selector constraints
  512. if request.label_selectors:
  513. if self._satisfies_label_constraints([request]) == 0:
  514. return False # Node doesn't satisfy any label selector in request.
  515. # Check if there's placement constraints that are not satisfied.
  516. for constraint in request.placement_constraints:
  517. if constraint.HasField("anti_affinity"):
  518. anti_affinity = constraint.anti_affinity
  519. if (
  520. anti_affinity.label_name in self.labels
  521. and anti_affinity.label_value
  522. == self.labels[anti_affinity.label_name]
  523. ):
  524. # The node already has a label that matches the anti-affinity
  525. return False
  526. # We don't need to check for affinity constraints here since
  527. # we have already combined resource requests with the affinity
  528. # constraints into the same request at `combine_requests_with_affinity`.
  529. pass
  530. available_resources_dict = self.get_available_resources(resource_request_source)
  531. # Check if there's enough resources to schedule the request.
  532. if not _fits(available_resources_dict, dict(request.resources_bundle)):
  533. return False
  534. # Schedule the request, update resources
  535. _inplace_subtract(available_resources_dict, dict(request.resources_bundle))
  536. # Add the request to the node.
  537. self.add_sched_request(request, resource_request_source)
  538. # Update the placement group in labels if there's any
  539. for constraint in request.placement_constraints:
  540. # We don't need to check for affinity constraints here since
  541. # we have already combined resource requests with the affinity
  542. # constraints into the same request at `combine_requests_with_affinity`.
  543. # We don't need node labels for enforcing affinity.
  544. if constraint.HasField("anti_affinity"):
  545. anti_affinity = constraint.anti_affinity
  546. self._add_label(anti_affinity.label_name, anti_affinity.label_value)
  547. return True
  548. def _add_label(self, label_name: str, label_value: str):
  549. """
  550. Add a label to the node.
  551. This assumes a label key can only have one value.
  552. """
  553. assert (
  554. self.labels.get(label_name) is None
  555. or self.labels[label_name] == label_value
  556. ), (
  557. f"Label {label_name} already exists with value "
  558. f"{self.labels[label_name]}, cannot set to "
  559. f"{label_value}"
  560. )
  561. self.labels[label_name] = label_value
  562. def __repr__(self) -> str:
  563. return (
  564. "SchedulingNode(node_type={node_type}, "
  565. "node_kind={node_kind}, "
  566. "instance_id={instance_id},"
  567. "instance_status={instance_status},"
  568. "ray_node_id={ray_node_id},"
  569. "idle_duration_ms={idle_duration_ms},"
  570. "termination_request={termination_request},"
  571. "status={status}, "
  572. "total_resources={total_resources}, "
  573. "available_resources_for_demand={available_resources_for_demand}, "
  574. "available_resources_for_cluster_resource_constraints="
  575. "{available_resources_for_cluster_resource_constraints},"
  576. "labels={labels}, launch_reason={launch_reason}), "
  577. "sched_requests_for_demand={sched_requests_for_demand}), "
  578. "sched_requests_for_cluster_resource_constraints="
  579. "{sched_requests_for_cluster_resources_constraint})"
  580. ).format(
  581. node_type=self.node_type,
  582. node_kind=self.node_kind,
  583. instance_id=self.im_instance_id,
  584. instance_status=self.im_instance_status,
  585. ray_node_id=self.ray_node_id,
  586. idle_duration_ms=self.idle_duration_ms,
  587. termination_request=str(message_to_dict(self.termination_request))
  588. if self.termination_request
  589. else None,
  590. status=self.status,
  591. total_resources=self.total_resources,
  592. available_resources_for_demand=self.available_resources_for_sched[
  593. ResourceRequestSource.PENDING_DEMAND
  594. ],
  595. available_resources_for_cluster_resource_constraints=self.available_resources_for_sched[ # noqa
  596. ResourceRequestSource.CLUSTER_RESOURCE_CONSTRAINT
  597. ],
  598. labels=self.labels,
  599. launch_reason=self.launch_reason,
  600. sched_requests_for_demand="|".join(
  601. str(message_to_dict(r))
  602. for r in self.sched_requests[ResourceRequestSource.PENDING_DEMAND]
  603. ),
  604. sched_requests_for_cluster_resources_constraint="|".join(
  605. str(message_to_dict(r))
  606. for r in self.sched_requests[
  607. ResourceRequestSource.CLUSTER_RESOURCE_CONSTRAINT
  608. ]
  609. ),
  610. )
  611. class ResourceDemandScheduler(IResourceScheduler):
  612. """
  613. A resource demand scheduler that schedules resource requests based on the
  614. following rules:
  615. 1. Enforce the minimal count of nodes for each worker node type.
  616. 2. Enforce the cluster resource constraints.
  617. 3. Schedule the gang resource requests.
  618. 4. Schedule the tasks/actor resource requests
  619. """
  620. def __init__(self, event_logger: Optional[AutoscalerEventLogger] = None):
  621. self._event_logger = event_logger
  622. @dataclass
  623. class ScheduleContext:
  624. """
  625. Encapsulates the context for processing one scheduling request.
  626. This exposes functions to read and write the scheduling nodes, to prevent
  627. accidental modification of the internal state.
  628. """
  629. # The node type configs for this scheduling request.
  630. _node_type_configs: Dict[NodeType, NodeTypeConfig]
  631. # If outdated node check through launch config is disabled.
  632. _disable_launch_config_check: bool
  633. # The max number of nodes for the entire cluster.
  634. _max_num_nodes: Optional[int] = None
  635. # The idle timeout in seconds.
  636. _idle_timeout_s: Optional[float] = None
  637. # The current schedulable nodes (including pending nodes and pending requests).
  638. _nodes: List[SchedulingNode] = field(default_factory=list)
  639. # The number of nodes by node types available for launching based on the max
  640. # number of workers in the config. This takes into account any pending/running
  641. # nodes.
  642. _node_type_available: Dict[NodeType, int] = field(default_factory=dict)
  643. # The availability scores of cloud resource. A low score suggests that
  644. # this type of resource has historically experienced allocation failures,
  645. # and the weight of this type should be reduced during scheduling.
  646. _cloud_resource_availabilities: Dict[NodeType, float] = field(
  647. default_factory=dict
  648. )
  649. def __init__(
  650. self,
  651. nodes: List[SchedulingNode],
  652. node_type_configs: Dict[NodeType, NodeTypeConfig],
  653. cloud_resource_availabilities: Dict[NodeType, float],
  654. disable_launch_config_check: bool,
  655. max_num_nodes: Optional[int] = None,
  656. idle_timeout_s: Optional[float] = None,
  657. ):
  658. self._nodes = nodes
  659. self._node_type_configs = node_type_configs
  660. self._node_type_available = self._compute_available_node_types(
  661. nodes, node_type_configs
  662. )
  663. self._max_num_nodes = max_num_nodes
  664. self._idle_timeout_s = idle_timeout_s
  665. self._disable_launch_config_check = disable_launch_config_check
  666. self._cloud_resource_availabilities = cloud_resource_availabilities
  667. @classmethod
  668. def from_schedule_request(
  669. cls, req: SchedulingRequest
  670. ) -> "ResourceDemandScheduler.ScheduleContext":
  671. """
  672. Create a schedule context from a schedule request.
  673. It will populate the context with the existing nodes and the available node
  674. types from the config.
  675. Args:
  676. req: The scheduling request. The caller should make sure the
  677. request is valid.
  678. """
  679. nodes = []
  680. node_type_configs = req.node_type_configs
  681. # Initialize the scheduling nodes.
  682. for instance in req.current_instances:
  683. node = SchedulingNode.new(
  684. instance, node_type_configs, req.disable_launch_config_check
  685. )
  686. if node:
  687. nodes.append(node)
  688. return cls(
  689. nodes=nodes,
  690. node_type_configs=node_type_configs,
  691. cloud_resource_availabilities=req.cloud_resource_availabilities,
  692. disable_launch_config_check=req.disable_launch_config_check,
  693. max_num_nodes=req.max_num_nodes,
  694. idle_timeout_s=req.idle_timeout_s,
  695. )
  696. @staticmethod
  697. def _compute_available_node_types(
  698. nodes: List[SchedulingNode],
  699. node_type_configs: Dict[NodeType, NodeTypeConfig],
  700. ) -> Dict[NodeType, int]:
  701. """
  702. Compute the number of nodes by node types available for launching based on
  703. the max number of workers in the config.
  704. Args:
  705. nodes: The current existing nodes.
  706. node_type_configs: The node type configs.
  707. Returns:
  708. A dict of node types and the number of nodes available for launching.
  709. """
  710. node_type_available: Dict[NodeType, int] = defaultdict(int)
  711. node_type_existing: Dict[NodeType, int] = defaultdict(int)
  712. for node in nodes:
  713. node_type_existing[node.node_type] += 1
  714. for (
  715. node_type,
  716. node_type_config,
  717. ) in node_type_configs.items():
  718. node_type_available[
  719. node_type
  720. ] = node_type_config.max_worker_nodes - node_type_existing.get(
  721. node_type, 0
  722. )
  723. return node_type_available
  724. def get_nodes(self) -> List[SchedulingNode]:
  725. """
  726. Get the current nodes with filter.
  727. Returns:
  728. A list of nodes.
  729. """
  730. nodes = copy.deepcopy(self._nodes)
  731. return nodes
  732. def get_node_type_available(self) -> Dict[NodeType, int]:
  733. return copy.deepcopy(self._node_type_available)
  734. def get_cluster_shape(self) -> Dict[NodeType, int]:
  735. cluster_shape = defaultdict(int)
  736. for node in self._nodes:
  737. if node.status == SchedulingNodeStatus.TO_TERMINATE:
  738. # Skip the nodes that are to be terminated.
  739. continue
  740. cluster_shape[node.node_type] += 1
  741. return cluster_shape
  742. def get_cluster_resources(self) -> Dict[str, float]:
  743. """
  744. Aggregate total cluster resources.
  745. Sums each node's `total_resources` across the current context,
  746. excluding nodes marked `TO_TERMINATE`.
  747. Returns:
  748. A dict mapping resource names to their summed resources.
  749. """
  750. cluster_resources = defaultdict(float)
  751. for node in self._nodes:
  752. if node.status == SchedulingNodeStatus.TO_TERMINATE:
  753. # Skip the nodes that are to be terminated.
  754. continue
  755. for key, value in node.total_resources.items():
  756. cluster_resources[key] += value
  757. return cluster_resources
  758. def get_idle_timeout_s(self) -> Optional[float]:
  759. return self._idle_timeout_s
  760. def get_cloud_resource_availabilities(self) -> Dict[NodeType, float]:
  761. return copy.deepcopy(self._cloud_resource_availabilities)
  762. def update(self, new_nodes: List[SchedulingNode]) -> None:
  763. """
  764. Update the context with the new nodes.
  765. """
  766. self._nodes = new_nodes
  767. # Update the available node types.
  768. self._node_type_available = self._compute_available_node_types(
  769. self._nodes, self._node_type_configs
  770. )
  771. def get_max_num_nodes(self) -> Optional[int]:
  772. """
  773. Get the max number of nodes for the entire cluster.
  774. """
  775. return self._max_num_nodes
  776. def get_node_type_configs(self) -> Dict[NodeType, NodeTypeConfig]:
  777. return self._node_type_configs
  778. def __str__(self) -> str:
  779. return "ScheduleContext({} nodes, node_type_available={})".format(
  780. len(self._nodes), dict(self._node_type_available)
  781. )
  782. def get_launch_requests(self) -> List[LaunchRequest]:
  783. """
  784. Get the launch requests for the nodes that are to be launched.
  785. """
  786. launch_by_type = defaultdict(int)
  787. for node in self._nodes:
  788. if node.status == SchedulingNodeStatus.TO_LAUNCH:
  789. launch_by_type[node.node_type] += 1
  790. launch_requests = []
  791. for instance_type, count in launch_by_type.items():
  792. launch_requests.append(
  793. LaunchRequest(
  794. instance_type=instance_type,
  795. count=count,
  796. id=str(uuid.uuid4()),
  797. request_ts_ms=time.time_ns() // 1000,
  798. )
  799. )
  800. return launch_requests
  801. def get_terminate_requests(
  802. self,
  803. ) -> List[TerminationRequest]:
  804. """
  805. Get the terminate requests for the nodes that are to be terminated.
  806. """
  807. return [
  808. node.termination_request
  809. for node in self._nodes
  810. if node.termination_request is not None
  811. ]
  812. def schedule(self, request: SchedulingRequest) -> SchedulingReply:
  813. logger.debug(
  814. "Scheduling for request: resource_request={}, gang_resource_request={}, "
  815. "cluster_constraint={}".format(
  816. ResourceRequestUtil.to_dict_list(request.resource_requests),
  817. ProtobufUtil.to_dict_list(request.gang_resource_requests),
  818. ProtobufUtil.to_dict_list(request.cluster_resource_constraints),
  819. )
  820. )
  821. ctx = ResourceDemandScheduler.ScheduleContext.from_schedule_request(request)
  822. # Enforce outdate nodes.
  823. ResourceDemandScheduler._terminate_outdated_nodes(ctx)
  824. # Enforce the minimal count of nodes for each worker node type.
  825. ResourceDemandScheduler._enforce_min_workers_per_type(ctx)
  826. # Enforce the max worker nodes count.
  827. ResourceDemandScheduler._enforce_max_workers_per_type(ctx)
  828. # Enforce the max worker nodes count globally.
  829. ResourceDemandScheduler._enforce_max_workers_global(ctx)
  830. # Enforce the cluster resource constraints.
  831. infeasible_constraints = ResourceDemandScheduler._enforce_resource_constraints(
  832. ctx, request.cluster_resource_constraints
  833. )
  834. # Schedule the gang resource requests.
  835. infeasible_gang_requests = (
  836. ResourceDemandScheduler._sched_gang_resource_requests(
  837. ctx, request.gang_resource_requests
  838. )
  839. )
  840. # Schedule the tasks/actor resource requests
  841. infeasible_requests = ResourceDemandScheduler._sched_resource_requests(
  842. ctx,
  843. ResourceRequestUtil.ungroup_by_count(request.resource_requests),
  844. )
  845. # Shutdown any idle nodes that's not needed (e.g. no resource constraints.
  846. # not needed by min_worker count, etc.)
  847. ResourceDemandScheduler._enforce_idle_termination(ctx)
  848. # Compute the number of nodes to launch.
  849. reply = SchedulingReply(
  850. infeasible_resource_requests=infeasible_requests,
  851. infeasible_gang_resource_requests=infeasible_gang_requests,
  852. infeasible_cluster_resource_constraints=infeasible_constraints,
  853. to_launch=ctx.get_launch_requests(),
  854. to_terminate=ctx.get_terminate_requests(),
  855. )
  856. if self._event_logger is not None:
  857. try:
  858. self._event_logger.log_cluster_scheduling_update(
  859. launch_requests=reply.to_launch,
  860. terminate_requests=reply.to_terminate,
  861. infeasible_requests=infeasible_requests,
  862. infeasible_gang_requests=infeasible_gang_requests,
  863. infeasible_cluster_resource_constraints=infeasible_constraints,
  864. cluster_resources=ctx.get_cluster_resources(),
  865. )
  866. except Exception:
  867. logger.exception("Failed to emit event logs.")
  868. return reply
  869. @staticmethod
  870. def _enforce_max_workers_per_type(
  871. ctx: "ResourceDemandScheduler.ScheduleContext",
  872. ) -> None:
  873. """
  874. Enforce the max number of workers for each node type.
  875. """
  876. # Get all the nodes by type
  877. all_nodes = ctx.get_nodes()
  878. non_terminating_nodes_by_type = defaultdict(list)
  879. terminating_nodes = []
  880. for node in all_nodes:
  881. if node.status == SchedulingNodeStatus.TO_TERMINATE:
  882. terminating_nodes.append(node)
  883. else:
  884. non_terminating_nodes_by_type[node.node_type].append(node)
  885. # Step 1. Enforce the max number of workers for each node type.
  886. for node_type in non_terminating_nodes_by_type.keys():
  887. non_terminate_nodes_of_type = non_terminating_nodes_by_type[node_type]
  888. node_config = ctx.get_node_type_configs()[node_type]
  889. num_max_nodes_per_type = node_config.max_worker_nodes
  890. num_extra_nodes = len(non_terminate_nodes_of_type) - num_max_nodes_per_type
  891. if num_extra_nodes <= 0:
  892. # No extra nodes for this type, continue.
  893. continue
  894. # Terminate the nodes
  895. (
  896. to_terminate,
  897. remained_nodes,
  898. ) = ResourceDemandScheduler._select_nodes_to_terminate(
  899. non_terminate_nodes_of_type,
  900. num_extra_nodes,
  901. TerminationRequest.Cause.MAX_NUM_NODE_PER_TYPE,
  902. max_num_nodes_per_type=num_max_nodes_per_type,
  903. )
  904. non_terminating_nodes_by_type[node_type] = remained_nodes
  905. terminating_nodes.extend(to_terminate)
  906. non_terminating_nodes = []
  907. for nodes in non_terminating_nodes_by_type.values():
  908. non_terminating_nodes.extend(nodes)
  909. # Update the context
  910. assert len(all_nodes) == len(
  911. terminating_nodes + non_terminating_nodes
  912. ), "The number of nodes should be the same after enforcing max nodes per type."
  913. ctx.update(terminating_nodes + non_terminating_nodes)
  914. if terminating_nodes:
  915. logger.debug(
  916. f"Terminating {len(terminating_nodes)} "
  917. "nodes for per node type max num node's constraints."
  918. )
  919. @staticmethod
  920. def _enforce_max_workers_global(
  921. ctx: "ResourceDemandScheduler.ScheduleContext",
  922. ) -> None:
  923. """
  924. Enforce the max number of workers for the entire cluster.
  925. """
  926. all_nodes = ctx.get_nodes()
  927. terminating_nodes = []
  928. non_terminating_nodes = []
  929. for node in all_nodes:
  930. if node.status == SchedulingNodeStatus.TO_TERMINATE:
  931. terminating_nodes.append(node)
  932. else:
  933. non_terminating_nodes.append(node)
  934. num_max_nodes = ctx.get_max_num_nodes()
  935. num_to_terminate = (
  936. max(len(non_terminating_nodes) - num_max_nodes, 0) if num_max_nodes else 0
  937. )
  938. if num_to_terminate <= 0:
  939. # No extra nodes needed to terminate.
  940. return
  941. # Terminate the nodes
  942. (
  943. to_terminate_nodes,
  944. non_terminating_nodes,
  945. ) = ResourceDemandScheduler._select_nodes_to_terminate(
  946. non_terminating_nodes,
  947. num_to_terminate,
  948. TerminationRequest.Cause.MAX_NUM_NODES,
  949. max_num_nodes=num_max_nodes,
  950. )
  951. assert len(to_terminate_nodes) == num_to_terminate, (
  952. "Terminating {} nodes, failed to terminate {} nodes to "
  953. "satisfy max_num_nodes={}".format(
  954. len(to_terminate_nodes),
  955. num_to_terminate - len(to_terminate_nodes),
  956. num_max_nodes,
  957. )
  958. )
  959. # Update the context
  960. terminating_nodes.extend(to_terminate_nodes)
  961. assert len(all_nodes) == len(
  962. terminating_nodes + non_terminating_nodes
  963. ), "The number of nodes should be the same after enforcing max nodes."
  964. all_nodes = terminating_nodes + non_terminating_nodes
  965. ctx.update(all_nodes)
  966. @staticmethod
  967. def _select_nodes_to_terminate(
  968. nodes: List[SchedulingNode],
  969. num_to_terminate: int,
  970. cause: TerminationRequest.Cause,
  971. max_num_nodes: Optional[int] = None,
  972. max_num_nodes_per_type: Optional[int] = None,
  973. ) -> Tuple[List[SchedulingNode], List[SchedulingNode]]:
  974. """
  975. Select 'num_to_terminate' of nodes to be terminated
  976. from the 'nodes' list. It should never select a head node.
  977. Args:
  978. nodes: The nodes to be terminated.
  979. num_to_terminate: The number of nodes to be terminated.
  980. cause: The cause of the termination. Should be one of
  981. TerminationRequest.Cause.MAX_NUM_NODES or
  982. TerminationRequest.Cause.MAX_NUM_NODE_PER_TYPE.
  983. max_num_nodes: The max number of nodes for the entire cluster only
  984. used when the cause is TerminationRequest.Cause.MAX_NUM_NODES.
  985. max_num_nodes_per_type: The max number of nodes for each node type.
  986. Only used when the cause is
  987. TerminationRequest.Cause.MAX_NUM_NODE_PER_TYPE.
  988. Returns:
  989. A tuple of:
  990. - The terminated nodes.
  991. - The remained nodes.
  992. """
  993. # Sort the nodes for termination.
  994. nodes.sort(key=ResourceDemandScheduler._sort_nodes_for_termination)
  995. # Remove the head node from the list.
  996. head_node = None
  997. for i, node in enumerate(nodes):
  998. if node.node_kind == NodeKind.HEAD:
  999. # Remove the head node from the list.
  1000. head_node = nodes.pop(i)
  1001. break
  1002. terminated_nodes, remained_nodes = (
  1003. nodes[:num_to_terminate],
  1004. # The head could be None if there's no head node being reported yet
  1005. # from the ray cluster.
  1006. nodes[num_to_terminate:] + ([head_node] if head_node else []),
  1007. )
  1008. assert cause in [
  1009. TerminationRequest.Cause.MAX_NUM_NODES,
  1010. TerminationRequest.Cause.MAX_NUM_NODE_PER_TYPE,
  1011. ], "Other termination causes don't have to select nodes for termination."
  1012. for node in terminated_nodes:
  1013. node.status = SchedulingNodeStatus.TO_TERMINATE
  1014. node.termination_request = TerminationRequest(
  1015. id=str(uuid.uuid4()),
  1016. instance_id=node.im_instance_id,
  1017. ray_node_id=node.ray_node_id,
  1018. cause=cause,
  1019. instance_type=node.node_type,
  1020. instance_status=node.im_instance_status,
  1021. details=(
  1022. f"Terminating node due to {TerminationRequest.Cause.Name(cause)}: "
  1023. f"max_num_nodes={max_num_nodes}, "
  1024. f"max_num_nodes_per_type={max_num_nodes_per_type}"
  1025. ),
  1026. )
  1027. if cause == TerminationRequest.Cause.MAX_NUM_NODES:
  1028. node.termination_request.max_num_nodes = max_num_nodes
  1029. elif cause == TerminationRequest.Cause.MAX_NUM_NODE_PER_TYPE:
  1030. node.termination_request.max_num_nodes_per_type = max_num_nodes_per_type
  1031. else:
  1032. raise ValueError("Unknown termination cause: {}".format(cause))
  1033. return terminated_nodes, remained_nodes
  1034. @staticmethod
  1035. def _sort_nodes_for_termination(node: SchedulingNode) -> Tuple:
  1036. """
  1037. Sort the nodes for termination increasingly by:
  1038. 1. First if ray hasn't been started yet
  1039. 2. Then if the nodes are idle
  1040. 3. Then with lower resources util nodes first.
  1041. Such that nodes sorted earlier will be terminated first.
  1042. """
  1043. running_ray = len(node.ray_node_id) > 0
  1044. # Reverse the idle duration such that the nodes with the largest idle duration
  1045. # will be terminated first.
  1046. idle_dur = -1 * node.idle_duration_ms
  1047. available_resources = node.get_available_resources(
  1048. ResourceRequestSource.PENDING_DEMAND
  1049. )
  1050. utils_per_resources = {}
  1051. for resource, total in node.total_resources.items():
  1052. if total <= 0:
  1053. continue
  1054. utils_per_resources[resource] = (
  1055. total - available_resources.get(resource, 0)
  1056. ) / total
  1057. avg_util = (
  1058. sum(utils_per_resources.values()) / len(utils_per_resources)
  1059. if utils_per_resources
  1060. else 0
  1061. )
  1062. return (running_ray, idle_dur, avg_util)
  1063. @staticmethod
  1064. def _enforce_min_workers_per_type(
  1065. ctx: "ResourceDemandScheduler.ScheduleContext",
  1066. ) -> None:
  1067. """
  1068. Enforce the minimal count of nodes for each worker node type.
  1069. """
  1070. # Count the existing nodes by type
  1071. count_by_node_type = ctx.get_cluster_shape()
  1072. new_nodes = []
  1073. # Launch new nodes to satisfy min count for each node type.
  1074. for (
  1075. node_type,
  1076. node_type_config,
  1077. ) in ctx.get_node_type_configs().items():
  1078. cur_count = count_by_node_type.get(node_type, 0)
  1079. min_count = node_type_config.min_worker_nodes
  1080. if cur_count < min_count:
  1081. logger.info(
  1082. f"Adding {min_count - cur_count} nodes to satisfy min count for "
  1083. f"node type: {node_type}."
  1084. )
  1085. new_nodes.extend(
  1086. [
  1087. SchedulingNode.from_node_config(
  1088. copy.deepcopy(node_type_config),
  1089. status=SchedulingNodeStatus.TO_LAUNCH,
  1090. node_kind=NodeKind.WORKER,
  1091. )
  1092. ]
  1093. * (min_count - cur_count)
  1094. )
  1095. # NOTE: we assume the aggregated number of min workers across all node types
  1096. # should not exceed any globally enforced max_num_nodes
  1097. # Add the new nodes to the existing nodes and update the context.
  1098. ctx.update(new_nodes + ctx.get_nodes())
  1099. @staticmethod
  1100. def _enforce_resource_constraints(
  1101. ctx: "ResourceDemandScheduler.ScheduleContext",
  1102. constraints: List[ClusterResourceConstraint],
  1103. ) -> List[ClusterResourceConstraint]:
  1104. """
  1105. Enforce the cluster resource constraints.
  1106. Args:
  1107. ctx: The schedule context.
  1108. constraints: The cluster resource constraints.
  1109. Returns:
  1110. A list of infeasible constraints.
  1111. Notes:
  1112. It's different from the other scheduling functions since it doesn't actually
  1113. schedule any resource requests. Instead, it asks if the cluster could be
  1114. upscale to a certain shape to fulfill the constraints.
  1115. """
  1116. # NOTE: we currently only have 1 constraint from a cluster, but
  1117. # we may have multiple in the future.
  1118. assert len(constraints) <= 1, "Max 1 cluster resource constraint is supported."
  1119. if len(constraints) == 0:
  1120. # No cluster resource constraints - nothing needs to be done.
  1121. return []
  1122. constraint = constraints[0]
  1123. # Flatten the requests for iterating through.
  1124. requests = ResourceRequestUtil.ungroup_by_count(constraint.resource_requests)
  1125. # Pass the empty nodes to schedule.
  1126. scheduled_nodes, infeasible = ResourceDemandScheduler._try_schedule(
  1127. ctx,
  1128. requests,
  1129. resource_request_source=ResourceRequestSource.CLUSTER_RESOURCE_CONSTRAINT,
  1130. )
  1131. if infeasible:
  1132. # Unable to satisfy the constraint.
  1133. return [constraint]
  1134. ctx.update(scheduled_nodes)
  1135. return []
  1136. @staticmethod
  1137. def _sched_resource_requests(
  1138. ctx: "ResourceDemandScheduler.ScheduleContext",
  1139. requests: List[ResourceRequest],
  1140. ) -> List[ResourceRequest]:
  1141. """
  1142. Schedule the resource requests.
  1143. Args:
  1144. ctx: The schedule context.
  1145. requests_by_count: The resource requests.
  1146. Returns:
  1147. A list of infeasible resource requests.
  1148. """
  1149. nodes, infeasible = ResourceDemandScheduler._try_schedule(
  1150. ctx, requests, resource_request_source=ResourceRequestSource.PENDING_DEMAND
  1151. )
  1152. # Regardless if there's feasible, we will update the context for schedule nodes.
  1153. ctx.update(nodes)
  1154. return infeasible
  1155. @staticmethod
  1156. def _sched_gang_resource_requests(
  1157. ctx: "ResourceDemandScheduler.ScheduleContext",
  1158. gang_requests: List[GangResourceRequest],
  1159. ) -> List[GangResourceRequest]:
  1160. """
  1161. Schedule the gang resource requests.
  1162. These requests should be scheduled atomically, i.e. either all of the resources
  1163. requests in a gang request are scheduled or none of them are scheduled.
  1164. For now, the gang resource requests represent Ray's placement groups, while it
  1165. could be more general in the future:
  1166. - For STRICT_PACK placement group requests, we combine them into a single
  1167. request and try to schedule them together.
  1168. - For STRICT_SPREAD placement groups requests, they should be scheduled on
  1169. different nodes by leveraging on the node labels that are associated with
  1170. the placement group.
  1171. If there are requests from rescheduling placement groups due to node
  1172. failures, these requests should not be scheduled on nodes with requests
  1173. from the same placement group.
  1174. Args:
  1175. ctx: The schedule context.
  1176. gang_requests: The gang resource requests.
  1177. Returns:
  1178. A list of infeasible gang resource requests.
  1179. """
  1180. def _sort_gang_resource_requests(req: GangResourceRequest) -> Tuple:
  1181. """
  1182. Key function for sorting the gang resource request by:
  1183. 1. the number of placement constraints in the gang request.
  1184. 2. the number of resource requests in the gang request.
  1185. """
  1186. total_placement_constraints = 0
  1187. for resource_request in req.requests:
  1188. total_placement_constraints += len(
  1189. resource_request.placement_constraints
  1190. )
  1191. return (total_placement_constraints, len(req.requests))
  1192. infeasible_gang_requests = []
  1193. # Try fulfilling the gang requests one by one.
  1194. for gang_req in sorted(
  1195. gang_requests, key=_sort_gang_resource_requests, reverse=True
  1196. ):
  1197. if gang_req.bundle_selectors:
  1198. # TODO: @ryanaoleary multiple `bundle_selectors` will be supported
  1199. # for `fallback_strategy`.
  1200. requests = gang_req.bundle_selectors[0].resource_requests
  1201. else:
  1202. # Use legacy field if `bundle_selectors` not provided.
  1203. requests = gang_req.requests
  1204. # Try to combine requests with affinity constraints into the same request.
  1205. requests = ResourceRequestUtil.combine_requests_with_affinity(requests)
  1206. nodes, infeasible = ResourceDemandScheduler._try_schedule(
  1207. ctx, requests, ResourceRequestSource.PENDING_DEMAND
  1208. )
  1209. if infeasible:
  1210. # Unable to satisfy the constraint. We will skip the gang request.
  1211. # Don't update the context.
  1212. infeasible_gang_requests.append(gang_req)
  1213. continue
  1214. # We are able to satisfy the constraint and thus update the context.
  1215. ctx.update(nodes)
  1216. return infeasible_gang_requests
  1217. @staticmethod
  1218. def _try_schedule(
  1219. ctx: "ResourceDemandScheduler.ScheduleContext",
  1220. requests_to_sched: List[ResourceRequest],
  1221. resource_request_source: ResourceRequestSource,
  1222. ) -> Tuple[List[SchedulingNode], List[ResourceRequest]]:
  1223. """
  1224. Try to schedule the resource requests on the current context.
  1225. It tries to schedule the requests on the existing nodes first, and
  1226. then try to schedule the requests on new nodes if possible.
  1227. Args:
  1228. requests_to_sched: The resource requests to be scheduled.
  1229. ctx: The current scheduling context.
  1230. resource_request_source: The source of the resource request, i.e.
  1231. pending demands from ray actors/tasks or cluster resource
  1232. constraints.
  1233. Returns:
  1234. - List of scheduled nodes to that have part or all of the requests
  1235. scheduled.
  1236. - List of infeasible requests remained that cannot be scheduled.
  1237. """
  1238. # First sort the requests.
  1239. def _sort_resource_request(req: ResourceRequest) -> Tuple:
  1240. """
  1241. Sort the resource requests by:
  1242. 1. The length of its placement constraints.
  1243. 2. The length of its first label selector constraints (if any).
  1244. 3. The number of resources it requests.
  1245. 4. The values of resources it requests.
  1246. 5. lexicographically for each resource (for stable ordering)
  1247. This is a legacy sorting function for the autoscaler's binpacking
  1248. algo - we do this so that we could have a deterministic scheduling
  1249. results with reasonable fragmentation.
  1250. """
  1251. label_constraint_len = (
  1252. len(req.label_selectors[0].label_constraints)
  1253. if req.label_selectors
  1254. else 0
  1255. )
  1256. return (
  1257. len(req.placement_constraints),
  1258. label_constraint_len,
  1259. len(req.resources_bundle.values()),
  1260. sum(req.resources_bundle.values()),
  1261. sorted(req.resources_bundle.items()),
  1262. )
  1263. requests_to_sched = sorted(
  1264. requests_to_sched, key=_sort_resource_request, reverse=True
  1265. )
  1266. existing_nodes = ctx.get_nodes()
  1267. node_type_available = ctx.get_node_type_available()
  1268. # A list of nodes that are either:
  1269. # 1. existing nodes in the cluster. or
  1270. # 2. new nodes that are launched to satisfy the resource requests.
  1271. target_nodes = []
  1272. # Try scheduling resource requests with existing nodes first.
  1273. while len(requests_to_sched) > 0 and len(existing_nodes) > 0:
  1274. (
  1275. best_node,
  1276. requests_to_sched,
  1277. existing_nodes,
  1278. ) = ResourceDemandScheduler._sched_best_node(
  1279. requests_to_sched,
  1280. existing_nodes,
  1281. resource_request_source,
  1282. ctx.get_cloud_resource_availabilities(),
  1283. )
  1284. if best_node is None:
  1285. # No existing nodes can schedule any more requests.
  1286. break
  1287. target_nodes.append(best_node)
  1288. # If there's any existing nodes left, we will add to the target nodes
  1289. target_nodes.extend(existing_nodes)
  1290. # Try scheduling resource requests with new nodes.
  1291. node_pools = [
  1292. SchedulingNode.from_node_config(
  1293. ctx.get_node_type_configs()[node_type],
  1294. status=SchedulingNodeStatus.TO_LAUNCH,
  1295. node_kind=NodeKind.WORKER,
  1296. )
  1297. for node_type, num_available in node_type_available.items()
  1298. if num_available > 0
  1299. ]
  1300. while len(requests_to_sched) > 0 and len(node_pools) > 0:
  1301. # Max number of nodes reached.
  1302. max_num_nodes = ctx.get_max_num_nodes()
  1303. if max_num_nodes is not None and len(target_nodes) >= max_num_nodes:
  1304. logger.debug(
  1305. "Max number of nodes reached: {}, "
  1306. "cannot launch more nodes.".format(max_num_nodes)
  1307. )
  1308. break
  1309. (
  1310. best_node,
  1311. requests_to_sched,
  1312. node_pools,
  1313. ) = ResourceDemandScheduler._sched_best_node(
  1314. requests_to_sched,
  1315. node_pools,
  1316. resource_request_source,
  1317. ctx.get_cloud_resource_availabilities(),
  1318. )
  1319. if best_node is None:
  1320. break
  1321. target_nodes.append(best_node)
  1322. # Update the node pool if a node with the same node type of the
  1323. # added node can be launched.
  1324. node_type_available[best_node.node_type] -= 1
  1325. if node_type_available[best_node.node_type] > 0:
  1326. node_pools.append(
  1327. SchedulingNode.from_node_config(
  1328. ctx.get_node_type_configs()[best_node.node_type],
  1329. status=SchedulingNodeStatus.TO_LAUNCH,
  1330. node_kind=NodeKind.WORKER,
  1331. )
  1332. )
  1333. return target_nodes, requests_to_sched
  1334. @staticmethod
  1335. def _sched_best_node(
  1336. requests: List[ResourceRequest],
  1337. nodes: List[SchedulingNode],
  1338. resource_request_source: ResourceRequestSource,
  1339. cloud_resource_availabilities: Dict[NodeType, float],
  1340. ) -> Tuple[SchedulingNode, List[ResourceRequest], List[SchedulingNode]]:
  1341. """
  1342. Schedule the requests on the best node.
  1343. A simple greedy algorithm is used to schedule the requests:
  1344. 1. Try to schedule the requests on each node.
  1345. 2. Sort the nodes by a score. The sorting includes:
  1346. 2.1. UtilizationScore: to maximize resource utilization.
  1347. 2.2. Cloud resource availabilities: prioritize node types with
  1348. the most available cloud resources, in order to minimize allocation
  1349. failures.
  1350. 3. Return the node with the highest score.
  1351. The highest score node is updated with the scheduled requests, and the node is
  1352. removed from the node list.
  1353. Args:
  1354. requests: The resource requests to be scheduled.
  1355. nodes: The node candidates to be scheduled on. The nodes will be updated
  1356. after the scheduling attempt, i.e. the node that is scheduled will be
  1357. removed from the list.
  1358. resource_request_source: The source of the resource request, i.e.
  1359. pending demands from ray actors/tasks or cluster resource constraints.
  1360. cloud_resource_availabilities: The cloud resource availability score. A low
  1361. score indicates that allocation for this node type has recently failed.
  1362. Returns:
  1363. best_node: The best node to schedule the requests.
  1364. infeasible: The infeasible requests that cannot be scheduled on the best
  1365. node.
  1366. nodes: Remaining nodes after the best node is removed.
  1367. """
  1368. results = []
  1369. # A temporary data class to store the scheduling result.
  1370. @dataclass
  1371. class ScheduleResult:
  1372. # The node candidate after a scheduling attempt.
  1373. node: SchedulingNode
  1374. # The infeasible resource requests that are not scheduled.
  1375. infeasible_requests: List[ResourceRequest]
  1376. # The index of the node in the original node list.
  1377. idx: int
  1378. # the score of the scheduling node to compare with others.
  1379. score: UtilizationScore
  1380. nodes_copy = copy.deepcopy(nodes)
  1381. # Iterate through each node and modify the node's available resources
  1382. # if the requests are schedulable.
  1383. for idx, node in enumerate(nodes_copy):
  1384. remaining, score = node.try_schedule(requests, resource_request_source)
  1385. if len(remaining) == len(requests):
  1386. # The node cannot schedule any of the requests.
  1387. continue
  1388. results.append(ScheduleResult(node, remaining, idx, score))
  1389. # No nodes can schedule any of the requests.
  1390. if len(results) == 0:
  1391. logger.debug(
  1392. "No nodes can schedule the requests: {}, for nodes: {}".format(
  1393. ResourceRequestUtil.to_dict_list(requests), nodes
  1394. )
  1395. )
  1396. return None, requests, nodes
  1397. # Sort the results by score.
  1398. results = sorted(
  1399. results,
  1400. key=lambda r: (
  1401. r.score,
  1402. cloud_resource_availabilities.get(r.node.node_type, 1),
  1403. ),
  1404. reverse=True,
  1405. )
  1406. best_result = results[0]
  1407. # Remove the best node from the nodes.
  1408. nodes.pop(best_result.idx)
  1409. logger.debug(
  1410. "Best node: {}, score: {}, remaining requests: {}".format(
  1411. best_result.node,
  1412. best_result.score,
  1413. ResourceRequestUtil.to_dict_list(best_result.infeasible_requests),
  1414. )
  1415. )
  1416. return best_result.node, best_result.infeasible_requests, nodes
  1417. @staticmethod
  1418. def _terminate_outdated_nodes(
  1419. ctx: "ResourceDemandScheduler.ScheduleContext",
  1420. ) -> None:
  1421. """
  1422. Terminate the nodes that are outdated, i.e. the node type config has been
  1423. updated or the node's launch config hash is outdated.
  1424. Args:
  1425. ctx: The schedule context.
  1426. """
  1427. nodes = ctx.get_nodes()
  1428. if ctx._disable_launch_config_check:
  1429. # Outdated nodes check through launch config check is disabled.
  1430. return
  1431. for node in nodes:
  1432. if node.status != SchedulingNodeStatus.SCHEDULABLE:
  1433. # We don't need to care about the non-running nodes.
  1434. continue
  1435. if node.node_kind == NodeKind.HEAD:
  1436. # We should not be terminating the head node even if it's outdated.
  1437. logger.warning(
  1438. f"Head node {node.im_instance_id}(ray={node.ray_node_id}) is "
  1439. "outdated with node config changes. "
  1440. "Please check the node's config or restart the cluster or restart "
  1441. "the head node. Autoscaler is not able to shutdown the outdated "
  1442. "head node"
  1443. )
  1444. continue
  1445. node_type = node.node_type
  1446. node_type_config = ctx.get_node_type_configs().get(node_type)
  1447. if node_type_config is None or (
  1448. node_type_config.launch_config_hash
  1449. and node_type_config.launch_config_hash != node.launch_config_hash
  1450. ):
  1451. # The node type config has been updated, and the node's launch config
  1452. # hash is outdated.
  1453. node.status = SchedulingNodeStatus.TO_TERMINATE
  1454. node.termination_request = TerminationRequest(
  1455. id=str(time.time_ns()),
  1456. instance_id=node.im_instance_id,
  1457. ray_node_id=node.ray_node_id,
  1458. instance_type=node.node_type,
  1459. instance_status=node.im_instance_status,
  1460. cause=TerminationRequest.Cause.OUTDATED,
  1461. details=f"node from {node.node_type} has outdated config",
  1462. )
  1463. ctx.update(nodes)
  1464. @staticmethod
  1465. def _enforce_idle_termination(
  1466. ctx: "ResourceDemandScheduler.ScheduleContext",
  1467. ) -> None:
  1468. """
  1469. Enforce the idle termination for the nodes that are not needed by the cluster
  1470. resource constraints and idle for too long.
  1471. Args:
  1472. ctx: The schedule context.
  1473. """
  1474. count_by_node_type = ctx.get_cluster_shape()
  1475. node_type_configs = ctx.get_node_type_configs()
  1476. terminate_nodes_by_type: Dict[NodeType, int] = defaultdict(int)
  1477. nodes = ctx.get_nodes()
  1478. s_to_ms = 1000
  1479. for node in nodes:
  1480. if node.status != SchedulingNodeStatus.SCHEDULABLE:
  1481. # We don't need to care about the non-running nodes.
  1482. continue
  1483. if node.node_kind == NodeKind.HEAD:
  1484. # The head node is not subject to idle termination.
  1485. continue
  1486. idle_timeout_s = ctx.get_idle_timeout_s()
  1487. # Override the scheduler idle_timeout_s if set for this node_type.
  1488. node_type = node.node_type
  1489. if node_type in node_type_configs:
  1490. if node_type_configs[node_type].idle_timeout_s is not None:
  1491. idle_timeout_s = node_type_configs[node_type].idle_timeout_s
  1492. if idle_timeout_s is None:
  1493. # No idle timeout is set, skip the idle termination.
  1494. continue
  1495. if node.idle_duration_ms <= idle_timeout_s * s_to_ms:
  1496. # The node is not idle for too long, skip it.
  1497. continue
  1498. if node.sched_requests[ResourceRequestSource.PENDING_DEMAND]:
  1499. # The node is needed by the pending requests.
  1500. # Skip it.
  1501. logger.debug(
  1502. "Node {} (idle for {} secs) is needed by the pending requests, "
  1503. "skip idle termination.".format(
  1504. node.ray_node_id, node.idle_duration_ms / s_to_ms
  1505. )
  1506. )
  1507. continue
  1508. if node.sched_requests[ResourceRequestSource.CLUSTER_RESOURCE_CONSTRAINT]:
  1509. # The node is needed by the resource constraints.
  1510. # Skip it.
  1511. logger.debug(
  1512. "Node {} (idle for {} secs) is needed by the cluster resource "
  1513. "constraints, skip idle termination.".format(
  1514. node.ray_node_id, node.idle_duration_ms / s_to_ms
  1515. )
  1516. )
  1517. continue
  1518. # Honor the min_worker_nodes setting for the node type.
  1519. min_count = 0
  1520. if node_type in node_type_configs:
  1521. min_count = node_type_configs[node_type].min_worker_nodes
  1522. if (
  1523. count_by_node_type.get(node_type, 0)
  1524. - terminate_nodes_by_type[node_type]
  1525. <= min_count
  1526. ):
  1527. logger.info(
  1528. "Node {} (idle for {} secs) belongs to node_type {} and is "
  1529. "required by min_worker_nodes, skipping idle termination.".format(
  1530. node.ray_node_id, node.idle_duration_ms / s_to_ms, node_type
  1531. )
  1532. )
  1533. continue
  1534. terminate_nodes_by_type[node.node_type] += 1
  1535. # The node is idle for too long, terminate it.
  1536. node.status = SchedulingNodeStatus.TO_TERMINATE
  1537. node.termination_request = TerminationRequest(
  1538. id=str(uuid.uuid4()),
  1539. instance_id=node.im_instance_id,
  1540. ray_node_id=node.ray_node_id,
  1541. cause=TerminationRequest.Cause.IDLE,
  1542. instance_type=node.node_type,
  1543. instance_status=node.im_instance_status,
  1544. idle_duration_ms=node.idle_duration_ms,
  1545. details=f"idle for {node.idle_duration_ms/s_to_ms} secs > "
  1546. f"timeout={idle_timeout_s} secs",
  1547. )
  1548. ctx.update(nodes)