| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
7771778177917801781178217831784178517861787178817891790 |
- import copy
- import logging
- import time
- import uuid
- from abc import ABC, abstractmethod
- from collections import defaultdict
- from dataclasses import dataclass, field
- from enum import Enum
- from typing import Dict, List, Optional, Tuple
- from ray._private.protobuf_compat import message_to_dict
- from ray.autoscaler._private.constants import AUTOSCALER_CONSERVE_GPU_NODES
- from ray.autoscaler._private.resource_demand_scheduler import (
- UtilizationScore,
- _fits,
- _inplace_subtract,
- )
- from ray.autoscaler.v2.event_logger import AutoscalerEventLogger
- from ray.autoscaler.v2.instance_manager.common import InstanceUtil
- from ray.autoscaler.v2.instance_manager.config import NodeTypeConfig
- from ray.autoscaler.v2.schema import AutoscalerInstance, NodeType
- from ray.autoscaler.v2.utils import ProtobufUtil, ResourceRequestUtil
- from ray.core.generated.autoscaler_pb2 import (
- ClusterResourceConstraint,
- GangResourceRequest,
- ResourceRequest,
- ResourceRequestByCount,
- )
- from ray.core.generated.common_pb2 import LabelSelectorOperator
- from ray.core.generated.instance_manager_pb2 import (
- Instance,
- LaunchRequest,
- NodeKind,
- TerminationRequest,
- )
- # ============= Resource Scheduling Service API =======================
- #
- # ResourceSchedulerService is a service that schedules resource bundles
- # to nodes. It's used by the autoscaler to schedule resource bundles
- # to determine the desired cluster size to satisfy the current resource
- # demands.
- #
- logger = logging.getLogger(__name__)
@dataclass
class SchedulingRequest:
    """
    Input handed to `IResourceScheduler.schedule`.

    Bundles the autoscaling configuration, the outstanding resource demands
    and the currently known instances for one scheduling call.
    """

    # If outdated node check through launch config is disabled.
    disable_launch_config_check: bool
    # Available node type configs, keyed by node type name.
    node_type_configs: Dict[NodeType, NodeTypeConfig] = field(default_factory=dict)
    # Max number of worker nodes. None when not specified.
    max_num_nodes: Optional[int] = None
    # Idle timeout in seconds. None when not specified.
    idle_timeout_s: Optional[float] = None
    # TODO: This prob could be refactored into the ClusterStatus data class later.
    # The current ray resource requests.
    resource_requests: List[ResourceRequestByCount] = field(default_factory=list)
    # The gang resource requests.
    gang_resource_requests: List[GangResourceRequest] = field(default_factory=list)
    # The cluster resource constraints.
    cluster_resource_constraints: List[ClusterResourceConstraint] = field(
        default_factory=list
    )
    # The current instances.
    current_instances: List[AutoscalerInstance] = field(default_factory=list)
    # The cloud resource availability score. A low score indicates that resource
    # allocation for this node type has recently failed.
    cloud_resource_availabilities: Dict[NodeType, float] = field(default_factory=dict)
@dataclass
class SchedulingReply:
    """
    Output returned by `IResourceScheduler.schedule`.

    Lists the launch/termination requests produced by the scheduler together
    with the requests it reports as infeasible.
    """

    # Instances to launch.
    to_launch: List[LaunchRequest] = field(default_factory=list)
    # Instances to terminate.
    to_terminate: List[TerminationRequest] = field(default_factory=list)
    # The infeasible resource bundles.
    infeasible_resource_requests: List[ResourceRequest] = field(default_factory=list)
    # The infeasible gang resource bundles.
    infeasible_gang_resource_requests: List[GangResourceRequest] = field(
        default_factory=list
    )
    # The infeasible cluster resource constraints.
    infeasible_cluster_resource_constraints: List[ClusterResourceConstraint] = field(
        default_factory=list
    )
class IResourceScheduler(ABC):
    """
    Abstract contract that every resource scheduler must fulfil.

    Implements the `instance_manager.proto ResourceSchedulerService` interface.
    """

    @abstractmethod
    def schedule(self, request: SchedulingRequest) -> SchedulingReply:
        """
        Compute the target cluster shape for the given request.

        Implementations take the resource requests and the current cluster
        state carried by `request` and try to place the requests on nodes.

        Args:
            request: The scheduling request describing demands and instances.

        Returns:
            The scheduling reply produced by the concrete scheduler.
        """
class SchedulingNodeStatus(Enum):
    """
    The status of a scheduling node (`SchedulingNode`)
    """

    # The node is added by the ResourceDemandScheduler.
    TO_LAUNCH = "TO_LAUNCH"
    # The node is schedulable. It could be running ray or pending to run ray
    # (i.e. there's already an autoscaler instance being launched). Either
    # way, it should be able to accept new resource requests/resource
    # constraints.
    SCHEDULABLE = "SCHEDULABLE"
    # The node is to be terminated by the ResourceDemandScheduler
    TO_TERMINATE = "TO_TERMINATE"
class ResourceRequestSource(Enum):
    """
    The source of the resource request.

    Used as the key for the per-source bookkeeping that `SchedulingNode`
    keeps (available resources and scheduled requests).
    """

    # The resource request is from demand, e.g. ray tasks/actors,
    # placement groups, etc.
    PENDING_DEMAND = "PENDING_DEMAND"
    # The resource request is from the cluster resource constraints, i.e.
    # from ray.autoscaler.sdk.request_resources().
    CLUSTER_RESOURCE_CONSTRAINT = "CLUSTER_RESOURCE_CONSTRAINT"
@dataclass
class SchedulingNode:
    """
    An abstraction of a node that can be scheduled on by the resource scheduler.

    A scheduling node is expected to be used as:

        node = SchedulingNode.new(
            instance, node_type_configs, disable_launch_config_check
        )
        remaining, score = node.try_schedule(requests, resource_request_source)
        .... do something with the score ....

    NOTE:
        One could also extend the scheduling behavior by overriding `try_schedule`
    """

    # Node type name.
    node_type: NodeType
    # Status
    status: SchedulingNodeStatus
    # Resource requests scheduled on this nodes for different sources.
    sched_requests: Dict[ResourceRequestSource, List[ResourceRequest]] = field(
        default_factory=lambda: defaultdict(list)
    )
    # Available resources for different sources of requests.
    available_resources_for_sched: Dict[
        ResourceRequestSource, Dict[str, float]
    ] = field(default_factory=dict)
    # The node's current resource capacity.
    total_resources: Dict[str, float] = field(default_factory=dict)
    # Node's labels, including static or dynamic labels.
    # Note that dynamic labels are a deprecated feature. And it is only used for the
    # autoscaler's strict-spread placement group scheduling (antiaffinity)
    labels: Dict[str, str] = field(default_factory=dict)
    # Observability descriptive message for why the node was launched in the
    # first place.
    launch_reason: Optional[str] = None
    # Termination request, none when the node is not being terminated.
    termination_request: Optional[TerminationRequest] = None
    # The instance id of the IM(Instance Manager) instance. None if the node
    # is not yet in IM.
    im_instance_id: Optional[str] = None
    # The instance status of the IM(Instance Manager) instance. None if the in-flight
    # node has not yet been assigned to an IM instance.
    im_instance_status: Optional[Instance.InstanceStatus.ValueType] = None
    # The ray node id of the ray node. None if the node is not included in
    # ray cluster's GCS report yet (not running ray yet).
    ray_node_id: Optional[str] = None
    # Idle duration in ms. Default not idle.
    idle_duration_ms: int = 0
    # Launch config hash.
    launch_config_hash: Optional[str] = None
    # node kind.
    node_kind: NodeKind = NodeKind.WORKER

    def __init__(
        self,
        node_type: NodeType,
        total_resources: Dict[str, float],
        available_resources: Dict[str, float],
        labels: Dict[str, str],
        status: SchedulingNodeStatus,
        im_instance_id: str = "",
        im_instance_status: Optional[Instance.InstanceStatus.ValueType] = None,
        ray_node_id: str = "",
        idle_duration_ms: int = 0,
        launch_config_hash: str = "",
        node_kind: NodeKind = NodeKind.WORKER,
        termination_request: Optional[TerminationRequest] = None,
    ):
        """
        Build a scheduling node.

        This hand-written constructor takes the place of the one `@dataclass`
        would otherwise generate.

        Args:
            node_type: The node type name.
            total_resources: The node's resource capacity. Stored as-is and
                also copied as the budget for cluster resource constraints.
            available_resources: The currently available resources. Copied as
                the budget for pending demand.
            labels: The node's labels. Stored as-is; scheduling requests with
                anti-affinity constraints may add entries to it.
            status: The scheduling status of the node.
            im_instance_id: The instance id of the IM instance.
            im_instance_status: The instance status of the IM instance.
            ray_node_id: The ray node id.
            idle_duration_ms: Idle duration in ms.
            launch_config_hash: The launch config hash.
            node_kind: The node kind.
            termination_request: The termination request, if the node is being
                terminated.
        """
        self.node_type = node_type
        self.total_resources = total_resources
        # Each request source gets its own independent copy of the resource
        # budget: pending demand is limited by what is currently available,
        # while cluster resource constraints are checked against the total.
        self.available_resources_for_sched = {
            ResourceRequestSource.PENDING_DEMAND: dict(available_resources),
            ResourceRequestSource.CLUSTER_RESOURCE_CONSTRAINT: dict(total_resources),
        }
        self.sched_requests = {
            ResourceRequestSource.PENDING_DEMAND: [],
            ResourceRequestSource.CLUSTER_RESOURCE_CONSTRAINT: [],
        }
        self.labels = labels
        self.status = status
        # Not a constructor argument; set explicitly so that every declared
        # field is initialized on the instance.
        self.launch_reason = None
        self.im_instance_id = im_instance_id
        self.im_instance_status = im_instance_status
        self.ray_node_id = ray_node_id
        self.idle_duration_ms = idle_duration_ms
        self.launch_config_hash = launch_config_hash
        self.node_kind = node_kind
        self.termination_request = termination_request

    def get_available_resources(self, resource_request_source: ResourceRequestSource):
        """Get the available resources for the given resource request source."""
        return self.available_resources_for_sched[resource_request_source]

    def get_sched_requests(self, resource_request_source: ResourceRequestSource):
        """Get the resource requests for the given resource request source."""
        return self.sched_requests[resource_request_source]

    def add_sched_request(
        self,
        request: ResourceRequest,
        resource_request_source: ResourceRequestSource,
    ):
        """
        Add the resource requests to the node.

        Args:
            request: The resource request to be added.
            resource_request_source: The source of the resource request.
        """
        self.sched_requests[resource_request_source].append(request)

    @staticmethod
    def new(
        instance: AutoscalerInstance,
        node_type_configs: Dict[NodeType, NodeTypeConfig],
        disable_launch_config_check: bool,
    ) -> Optional["SchedulingNode"]:
        """
        Create a new scheduling node from an autoscaler instance.

        It creates:
            - None if the instance is not schedulable by IM, or if its node
              type config is missing while the launch config check is
              disabled.
            - A schedulable node if the instance is running ray or pending to
              run ray, so it should be considered in the scheduling process.
            - A to-be-terminated node if the instance is pending to run ray
              but its node type no longer has a config.

        Args:
            instance: The instance.
            node_type_configs: The node type configs.
            disable_launch_config_check: If outdated node check through launch
                config is disabled.

        Returns:
            The scheduling node, or None as described above.
        """
        if not SchedulingNode.is_schedulable(instance):
            return None

        im_instance = instance.im_instance

        if im_instance.status == Instance.RAY_RUNNING:
            assert instance.ray_node is not None, (
                "ray node should not be None "
                f"when the instance is running ray: instance={instance}"
            )
            ray_node = instance.ray_node
            # A running ray node: take resources and labels from the ray state.
            return SchedulingNode(
                node_type=im_instance.instance_type,
                total_resources=dict(ray_node.total_resources),
                # Available resources for scheduling requests of different
                # sources.
                available_resources=dict(ray_node.available_resources),
                labels={
                    **(ray_node.labels or {}),
                    # DEPRECATED: Dynamic labels are a deprecated feature. This
                    # field is used here only for the autoscaler's strict-spread
                    # placement group scheduling (antiaffinity).
                    **(ray_node.dynamic_labels or {}),
                },
                status=SchedulingNodeStatus.SCHEDULABLE,
                im_instance_id=im_instance.instance_id,
                im_instance_status=im_instance.status,
                ray_node_id=im_instance.node_id,
                idle_duration_ms=ray_node.idle_duration_ms,
                launch_config_hash=im_instance.launch_config_hash,
                node_kind=im_instance.node_kind,
            )

        # This is an instance pending to run ray. Initialize a schedulable node
        # from the node type config.
        node_config = node_type_configs.get(im_instance.instance_type, None)
        if node_config is None:
            if disable_launch_config_check:
                # We are not terminating outdated nodes.
                logger.info(
                    f"Node config for {im_instance.instance_type} is missing, "
                    "but we are not terminating the outdated node because "
                    "`disable_launch_config_check` is True in "
                    "the autoscaler's provider config."
                )
                return None

            # Configs might have been updated, and no more
            # node_type_configs for this node type. We should terminate it.
            return SchedulingNode(
                node_type=im_instance.instance_type,
                total_resources={},
                available_resources={},
                labels={},
                status=SchedulingNodeStatus.TO_TERMINATE,
                im_instance_id=im_instance.instance_id,
                im_instance_status=im_instance.status,
                termination_request=TerminationRequest(
                    id=str(uuid.uuid4()),
                    instance_id=im_instance.instance_id,
                    instance_status=im_instance.status,
                    cause=TerminationRequest.Cause.OUTDATED,
                    instance_type=im_instance.instance_type,
                ),
                node_kind=NodeKind.WORKER,
            )

        return SchedulingNode.from_node_config(
            node_config,
            SchedulingNodeStatus.SCHEDULABLE,
            node_kind=im_instance.node_kind,
            im_instance_id=im_instance.instance_id,
            im_instance_status=im_instance.status,
        )

    @staticmethod
    def is_schedulable(instance: AutoscalerInstance) -> bool:
        """
        Check if the instance is schedulable by IM.

        Args:
            instance: The instance.

        Returns:
            True if the instance is schedulable by IM.
        """
        if instance.im_instance is None:
            # We will skip any instances that are not yet in IM which
            # could be
            #   1. an out-of-band ray node
            #   2. a cloud instance running ray not yet discovered
            #      by the IM's Reconciler
            #   3. a cloud instance already terminated but ray state
            #      still lagging behind.
            #
            # In all of these cases, the instance is not schedulable or
            # shouldn't be managed by IM, so we don't consider them.
            return False

        # These are the statuses where there's a running ray node or
        # could eventually run ray.
        return bool(InstanceUtil.is_ray_running_reachable(instance.im_instance.status))

    @staticmethod
    def from_node_config(
        node_config: NodeTypeConfig,
        status: SchedulingNodeStatus,
        node_kind: NodeKind,
        im_instance_id: Optional[str] = None,
        im_instance_status: Optional[Instance.InstanceStatus.ValueType] = None,
    ) -> "SchedulingNode":
        """
        Create a scheduling node from a node config.

        The node starts with all of the configured resources available.

        Args:
            node_config: The node config.
            status: The status of the node.
            node_kind: The node kind.
            im_instance_id: The instance id of the im instance.
            im_instance_status: The instance status of the im instance.

        Returns:
            A scheduling node with resources and labels copied from the config.
        """
        return SchedulingNode(
            node_type=node_config.name,
            total_resources=dict(node_config.resources),
            available_resources=dict(node_config.resources),
            labels=dict(node_config.labels),
            status=status,
            im_instance_id=im_instance_id,
            im_instance_status=im_instance_status,
            node_kind=node_kind,
        )

    def __post_init__(self):
        # NOTE(review): `__post_init__` is only called by the `__init__` that
        # `@dataclass` generates. Since this class defines its own `__init__`,
        # this check does not run automatically; confirm whether it should be
        # invoked from `__init__`.
        assert self.node_type, "node_type should be set"

    def try_schedule(
        self,
        requests: List[ResourceRequest],
        resource_request_source: ResourceRequestSource,
    ) -> Tuple[List[ResourceRequest], UtilizationScore]:
        """
        Try to schedule the resource requests on this node.

        This modifies the node's available resources if the requests are
        schedulable. The requests are tried one by one in the order given
        (callers are expected to pass them already ordered), and no
        backtracking is done.

        Args:
            requests: The resource requests to be scheduled.
            resource_request_source: The source of the resource request, i.e.
                pending demands from ray actors/tasks or cluster resource
                constraints.

        Returns:
            A tuple of:
                - list of remaining requests that cannot be scheduled on this
                  node.
                - the utilization score for this node with respect to the
                  current resource requests being scheduled.
        """
        # Requests that cannot be placed on this node, in their input order.
        unschedulable_requests = [
            r
            for r in requests
            if not self._try_schedule_one(r, resource_request_source)
        ]
        score = self._compute_score(resource_request_source)
        return unschedulable_requests, score

    def _compute_score(
        self, resource_request_source: ResourceRequestSource
    ) -> UtilizationScore:
        """
        Compute the utilization score for this node with respect to the current
        resource request being scheduled.

        A "higher" score means that this node is more suitable for scheduling
        the current scheduled resource requests.

        The score is a tuple of 5 values:
            1. Whether this node has labels matching the current resource
               request's label_selector requirements:
                0: if this node does not satisfy any label selector
                    requirements or no label selectors are provided.
                len(label_selectors)-i: a score based on the priority of the
                    label selector in the resource request that this node
                    satisfies.
            2. Whether this node is a GPU node and the current resource request
               has GPU requirements:
                False: if this node is a GPU node and the current resource
                    request placed onto the node has no GPU requirements.
                True: if this node is not a GPU node or the current resource
                    request placed onto the node has GPU requirements.
            3. The number of resource types being scheduled that this node
               provides.
            4. The minimum of the per-resource weighted utilization terms
               (`total * utilization**3`).
            5. The average of the per-resource weighted utilization terms.

        NOTE:
            This function is adapted from _resource_based_utilization_scorer
            from autoscaler v1.

        TODO(rickyx,jjyao): We should also consider node labels for
            scoring. For example, if a node has a label that matches the
            affinity label of the resource request, we should give it a higher
            score.

        TODO(rickyx): add pluggable scoring functions here.

        Args:
            resource_request_source: The source whose scheduled requests and
                remaining resources are scored.

        Returns:
            A utilization score for this node.
        """
        sched_requests = self.get_sched_requests(resource_request_source)
        available_resources = self.get_available_resources(resource_request_source)

        # Resource types requested with a positive amount by at least one
        # scheduled request, and how many of them this node provides.
        sched_resource_types = {
            resource_name
            for req in sched_requests
            for resource_name, amount in req.resources_bundle.items()
            if amount > 0
        }
        num_matching_resource_types = sum(
            1 for name in sched_resource_types if name in self.total_resources
        )

        # Compute the weighted utilization term for each resource type.
        util_by_resources = []
        for name, total in self.total_resources.items():
            if total == 0:
                # Skip any zero values.
                continue
            if name in available_resources:
                util = (total - available_resources[name]) / total
                assert 0 <= util <= 1, f"Invalid utilization: {util}"
                util_by_resources.append(total * (util**3))

        # Prefer not to launch a GPU node if there aren't any GPU requirements
        # in the resource bundle.
        gpu_ok = True
        if AUTOSCALER_CONSERVE_GPU_NODES:
            # TODO: we should also generalize this optimization for accelerators.
            # https://github.com/ray-project/ray/issues/43079
            is_gpu_node = self.total_resources.get("GPU", 0) > 0
            any_gpu_requests = any("GPU" in r.resources_bundle for r in sched_requests)
            if is_gpu_node and not any_gpu_requests:
                gpu_ok = False

        # Check if node satisfies label requirements.
        matches_labels = self._satisfies_label_constraints(sched_requests)

        if util_by_resources:
            min_util = min(util_by_resources)
            avg_util = float(sum(util_by_resources)) / len(util_by_resources)
        else:
            min_util = 0
            avg_util = 0

        # Prioritize matching label selectors first,
        # then prioritize avoiding gpu nodes for non-gpu workloads,
        # then prioritize matching multiple resource types,
        # then prioritize using all resources,
        # then prioritize overall balance of multiple resources.
        return (
            matches_labels,
            gpu_ok,
            num_matching_resource_types,
            min_util,
            avg_util,
        )

    def _matches_label_selector(self, selector) -> bool:
        """Return True if this node's labels pass every constraint of `selector`."""
        for constraint in selector.label_constraints:
            node_val = self.labels.get(constraint.label_key)
            values = set(constraint.label_values)
            op = constraint.operator
            if op == LabelSelectorOperator.LABEL_OPERATOR_IN:
                if node_val not in values:
                    return False
            elif op == LabelSelectorOperator.LABEL_OPERATOR_NOT_IN:
                if node_val in values:
                    return False
            else:
                # Unknown operators are treated as not satisfied.
                return False
        return True

    def _satisfies_label_constraints(
        self, sched_requests: List[ResourceRequest]
    ) -> int:
        """
        Score how well this node's labels satisfy the requests' label selectors.

        Requests are examined in order; for each one its label selectors are
        tried in order and the first selector whose constraints all pass
        decides the result.

        Args:
            sched_requests: The resource requests to check.

        Returns:
            `len(label_selectors) - i` for the first matching selector at index
            `i` (so the first selector yields the highest score, decreasing
            sequentially for fallbacks), or 0 if no selector matches.
        """
        for req in sched_requests:
            num_selectors = len(req.label_selectors)
            for i, selector in enumerate(req.label_selectors):
                if self._matches_label_selector(selector):
                    return num_selectors - i
        return 0

    def _try_schedule_one(
        self, request: ResourceRequest, resource_request_source: ResourceRequestSource
    ) -> bool:
        """
        Try to schedule one resource request on this node. The request could be
        from various sources, specified by `resource_request_source`.

        On success the request's resources are subtracted from the budget of
        that source, the request is recorded on the node, and any anti-affinity
        labels of the request are added to the node's labels.

        Args:
            request: The resource request to be scheduled.
            resource_request_source: The source of the resource request, i.e.
                pending demands from ray actors/tasks or cluster resource
                constraints.

        Returns:
            True if the resource request is scheduled on this node.
        """
        # Enforce label selector constraints
        if request.label_selectors:
            if self._satisfies_label_constraints([request]) == 0:
                return False  # Node doesn't satisfy any label selector in request.

        # Check if there's placement constraints that are not satisfied.
        # We don't need to check for affinity constraints here since
        # we have already combined resource requests with the affinity
        # constraints into the same request at `combine_requests_with_affinity`.
        for constraint in request.placement_constraints:
            if constraint.HasField("anti_affinity"):
                anti_affinity = constraint.anti_affinity
                if (
                    anti_affinity.label_name in self.labels
                    and anti_affinity.label_value
                    == self.labels[anti_affinity.label_name]
                ):
                    # The node already has a label that matches the anti-affinity
                    return False

        available_resources_dict = self.get_available_resources(resource_request_source)

        # Check if there's enough resources to schedule the request.
        if not _fits(available_resources_dict, dict(request.resources_bundle)):
            return False

        # Schedule the request, update resources
        _inplace_subtract(available_resources_dict, dict(request.resources_bundle))

        # Add the request to the node.
        self.add_sched_request(request, resource_request_source)

        # Update the placement group in labels if there's any.
        # We don't need node labels for enforcing affinity (see above).
        for constraint in request.placement_constraints:
            if constraint.HasField("anti_affinity"):
                anti_affinity = constraint.anti_affinity
                self._add_label(anti_affinity.label_name, anti_affinity.label_value)

        return True

    def _add_label(self, label_name: str, label_value: str):
        """
        Add a label to the node.

        This assumes a label key can only have one value: setting an existing
        key to a different value trips the assertion.

        Args:
            label_name: The label key.
            label_value: The label value.
        """
        assert (
            self.labels.get(label_name) is None
            or self.labels[label_name] == label_value
        ), (
            f"Label {label_name} already exists with value "
            f"{self.labels[label_name]}, cannot set to "
            f"{label_value}"
        )
        self.labels[label_name] = label_value

    def __repr__(self) -> str:
        """Return a single-line, human-readable description of the node."""
        demand = ResourceRequestSource.PENDING_DEMAND
        constraints = ResourceRequestSource.CLUSTER_RESOURCE_CONSTRAINT

        termination_request = (
            str(message_to_dict(self.termination_request))
            if self.termination_request
            else None
        )
        sched_requests_for_demand = "|".join(
            str(message_to_dict(r)) for r in self.sched_requests[demand]
        )
        sched_requests_for_constraints = "|".join(
            str(message_to_dict(r)) for r in self.sched_requests[constraints]
        )

        return (
            f"SchedulingNode(node_type={self.node_type}, "
            f"node_kind={self.node_kind}, "
            f"instance_id={self.im_instance_id}, "
            f"instance_status={self.im_instance_status}, "
            f"ray_node_id={self.ray_node_id}, "
            f"idle_duration_ms={self.idle_duration_ms}, "
            f"termination_request={termination_request}, "
            f"status={self.status}, "
            f"total_resources={self.total_resources}, "
            "available_resources_for_demand="
            f"{self.available_resources_for_sched[demand]}, "
            "available_resources_for_cluster_resource_constraints="
            f"{self.available_resources_for_sched[constraints]}, "
            f"labels={self.labels}, launch_reason={self.launch_reason}, "
            f"sched_requests_for_demand={sched_requests_for_demand}, "
            "sched_requests_for_cluster_resource_constraints="
            f"{sched_requests_for_constraints})"
        )
- class ResourceDemandScheduler(IResourceScheduler):
- """
- A resource demand scheduler that schedules resource requests based on the
- following rules:
- 1. Enforce the minimal count of nodes for each worker node type.
- 2. Enforce the cluster resource constraints.
- 3. Schedule the gang resource requests.
- 4. Schedule the tasks/actor resource requests
- """
def __init__(self, event_logger: Optional[AutoscalerEventLogger] = None):
    """
    Create the scheduler.

    Args:
        event_logger: Optional autoscaler event logger; stored on the
            instance as `_event_logger`.
    """
    self._event_logger = event_logger
@dataclass
class ScheduleContext:
    """
    Encapsulates the context for processing one scheduling request.

    This exposes functions to read and write the scheduling nodes, to prevent
    accidental modification of the internal state.
    """

    # The node type configs for this scheduling request.
    _node_type_configs: Dict[NodeType, NodeTypeConfig]
    # Whether the outdated-node check based on the launch config is disabled.
    _disable_launch_config_check: bool
    # The max number of nodes for the entire cluster. Defaults to None.
    _max_num_nodes: Optional[int] = None
    # The idle timeout in seconds. Defaults to None; a node type config may
    # carry its own `idle_timeout_s`, which the idle termination step prefers.
    _idle_timeout_s: Optional[float] = None
    # The current schedulable nodes (including pending nodes and pending requests).
    _nodes: List[SchedulingNode] = field(default_factory=list)
    # The number of nodes by node types available for launching based on the max
    # number of workers in the config. This takes into account any pending/running
    # nodes.
    _node_type_available: Dict[NodeType, int] = field(default_factory=dict)
    # The availability scores of cloud resource. A low score suggests that
    # this type of resource has historically experienced allocation failures,
    # and the weight of this type should be reduced during scheduling.
    _cloud_resource_availabilities: Dict[NodeType, float] = field(
        default_factory=dict
    )
def __init__(
    self,
    nodes: List[SchedulingNode],
    node_type_configs: Dict[NodeType, NodeTypeConfig],
    cloud_resource_availabilities: Dict[NodeType, float],
    disable_launch_config_check: bool,
    max_num_nodes: Optional[int] = None,
    idle_timeout_s: Optional[float] = None,
):
    """
    Build a context from already-constructed scheduling nodes.

    Args:
        nodes: The scheduling nodes the context starts with.
        node_type_configs: The node type configs of this request.
        cloud_resource_availabilities: Availability score per node type.
        disable_launch_config_check: Whether the launch-config based
            outdated-node check is disabled.
        max_num_nodes: The max number of nodes for the entire cluster.
        idle_timeout_s: The idle timeout in seconds.
    """
    # Settings that are stored as given.
    self._disable_launch_config_check = disable_launch_config_check
    self._cloud_resource_availabilities = cloud_resource_availabilities
    self._idle_timeout_s = idle_timeout_s
    self._max_num_nodes = max_num_nodes

    # Node state, plus the launch headroom derived from it.
    self._node_type_configs = node_type_configs
    self._nodes = nodes
    headroom = self._compute_available_node_types(nodes, node_type_configs)
    self._node_type_available = headroom
@classmethod
def from_schedule_request(
    cls, req: SchedulingRequest
) -> "ResourceDemandScheduler.ScheduleContext":
    """
    Build a schedule context out of a scheduling request.

    Every current instance of the request is converted with
    `SchedulingNode.new`; instances for which that returns a falsy value are
    left out of the context.

    Args:
        req: The scheduling request. The caller should make sure the
            request is valid.

    Returns:
        A context holding the converted nodes and the request's settings.
    """
    configs = req.node_type_configs
    converted = (
        SchedulingNode.new(instance, configs, req.disable_launch_config_check)
        for instance in req.current_instances
    )
    # Keep only the instances that produced a scheduling node.
    usable_nodes = [candidate for candidate in converted if candidate]

    return cls(
        nodes=usable_nodes,
        node_type_configs=configs,
        cloud_resource_availabilities=req.cloud_resource_availabilities,
        disable_launch_config_check=req.disable_launch_config_check,
        max_num_nodes=req.max_num_nodes,
        idle_timeout_s=req.idle_timeout_s,
    )
@staticmethod
def _compute_available_node_types(
    nodes: List[SchedulingNode],
    node_type_configs: Dict[NodeType, NodeTypeConfig],
) -> Dict[NodeType, int]:
    """
    Work out how many more nodes of each configured type may be launched.

    For every configured node type the result is `max_worker_nodes` minus
    the number of given nodes of that type.

    Args:
        nodes: The current existing nodes.
        node_type_configs: The node type configs.

    Returns:
        A dict of node types and the number of nodes available for launching.
    """
    # Tally the nodes that already exist, per node type.
    existing_per_type: Dict[NodeType, int] = defaultdict(int)
    for existing in nodes:
        existing_per_type[existing.node_type] += 1

    headroom: Dict[NodeType, int] = defaultdict(int)
    headroom.update(
        {
            node_type: config.max_worker_nodes - existing_per_type[node_type]
            for node_type, config in node_type_configs.items()
        }
    )
    return headroom
def get_nodes(self) -> List[SchedulingNode]:
    """
    Return a deep copy of the nodes currently held by the context.

    Returns:
        A list of copied nodes; changing them does not affect the context.
    """
    return copy.deepcopy(self._nodes)
def get_node_type_available(self) -> Dict[NodeType, int]:
    """Return a deep copy of the launch headroom per node type."""
    headroom_copy = copy.deepcopy(self._node_type_available)
    return headroom_copy
def get_cluster_shape(self) -> Dict[NodeType, int]:
    """
    Count the nodes of the context per node type.

    Nodes marked `TO_TERMINATE` are not counted.

    Returns:
        A dict mapping node types to the number of counted nodes.
    """
    shape = defaultdict(int)
    counted_types = (
        member.node_type
        for member in self._nodes
        if member.status != SchedulingNodeStatus.TO_TERMINATE
    )
    for node_type in counted_types:
        shape[node_type] += 1
    return shape
def get_cluster_resources(self) -> Dict[str, float]:
    """
    Sum up the total resources of the cluster.

    Each node's `total_resources` is added per resource name; nodes marked
    `TO_TERMINATE` do not contribute.

    Returns:
        A dict mapping resource names to their summed resources.
    """
    totals = defaultdict(float)
    contributing = [
        member
        for member in self._nodes
        if member.status != SchedulingNodeStatus.TO_TERMINATE
    ]
    for member in contributing:
        for resource_name, amount in member.total_resources.items():
            totals[resource_name] += amount
    return totals
def get_idle_timeout_s(self) -> Optional[float]:
    """Return the idle timeout in seconds stored in the context (may be None)."""
    timeout_s = self._idle_timeout_s
    return timeout_s
def get_cloud_resource_availabilities(self) -> Dict[NodeType, float]:
    """Return a deep copy of the cloud resource availability scores."""
    scores_copy = copy.deepcopy(self._cloud_resource_availabilities)
    return scores_copy
def update(self, new_nodes: List[SchedulingNode]) -> None:
    """
    Replace the nodes of the context and refresh the launch headroom.

    Args:
        new_nodes: The nodes that become the context's current nodes.
    """
    self._nodes = new_nodes
    # The headroom per node type depends on the nodes, so recompute it.
    refreshed = self._compute_available_node_types(new_nodes, self._node_type_configs)
    self._node_type_available = refreshed
def get_max_num_nodes(self) -> Optional[int]:
    """
    Return the max number of nodes for the entire cluster (may be None).
    """
    cluster_cap = self._max_num_nodes
    return cluster_cap
def get_node_type_configs(self) -> Dict[NodeType, NodeTypeConfig]:
    """Return the node type configs of the context (the stored dict, not a copy)."""
    configs = self._node_type_configs
    return configs
def __str__(self) -> str:
    """Summarize the context by node count and launch headroom per node type."""
    num_nodes = len(self._nodes)
    headroom = dict(self._node_type_available)
    template = "ScheduleContext({} nodes, node_type_available={})"
    return template.format(num_nodes, headroom)
def get_launch_requests(self) -> List[LaunchRequest]:
    """
    Get the launch requests for the nodes that are to be launched.

    Nodes with status `TO_LAUNCH` are grouped by node type, and one launch
    request is produced per node type with the number of nodes to launch.

    Returns:
        One `LaunchRequest` per node type that has nodes to launch. Each
        request gets a fresh UUID as its id, and all requests of one call
        share the same `request_ts_ms`.
    """
    launch_by_type: Dict[NodeType, int] = defaultdict(int)
    for node in self._nodes:
        if node.status == SchedulingNodeStatus.TO_LAUNCH:
            launch_by_type[node.node_type] += 1

    # `time.time_ns()` is in nanoseconds; 1 ms == 1_000_000 ns. (Dividing by
    # 1000 would yield microseconds, which does not match the field's unit.)
    request_ts_ms = time.time_ns() // 1_000_000

    return [
        LaunchRequest(
            instance_type=instance_type,
            count=count,
            id=str(uuid.uuid4()),
            request_ts_ms=request_ts_ms,
        )
        for instance_type, count in launch_by_type.items()
    ]
def get_terminate_requests(
    self,
) -> List[TerminationRequest]:
    """
    Collect the termination requests attached to the nodes of the context.

    Returns:
        The `termination_request` of every node that has one, in node order.
    """
    collected = []
    for member in self._nodes:
        pending_request = member.termination_request
        if pending_request is None:
            continue
        collected.append(pending_request)
    return collected
def schedule(self, request: SchedulingRequest) -> SchedulingReply:
    """
    Run one scheduling pass for the given request.

    The pass first enforces the node-count rules (outdated nodes, min/max
    workers per type, global max), then tries to fit the cluster resource
    constraints, the gang requests and the task/actor requests, and finally
    applies idle termination.

    Args:
        request: The scheduling request to process.

    Returns:
        A reply with the infeasible requests of each kind, the launch
        requests and the termination requests.
    """
    logger.debug(
        "Scheduling for request: resource_request={}, gang_resource_request={}, "
        "cluster_constraint={}".format(
            ResourceRequestUtil.to_dict_list(request.resource_requests),
            ProtobufUtil.to_dict_list(request.gang_resource_requests),
            ProtobufUtil.to_dict_list(request.cluster_resource_constraints),
        )
    )

    scheduler = ResourceDemandScheduler
    ctx = scheduler.ScheduleContext.from_schedule_request(request)

    # Node-count rules, applied strictly in this order: outdated nodes, min
    # workers per type, max workers per type, global max.
    node_count_rules = (
        scheduler._terminate_outdated_nodes,
        scheduler._enforce_min_workers_per_type,
        scheduler._enforce_max_workers_per_type,
        scheduler._enforce_max_workers_global,
    )
    for apply_rule in node_count_rules:
        apply_rule(ctx)

    # Cluster resource constraints come before any actual demand.
    unmet_constraints = scheduler._enforce_resource_constraints(
        ctx, request.cluster_resource_constraints
    )

    # Gang requests are scheduled before the plain task/actor requests.
    unmet_gang_requests = scheduler._sched_gang_resource_requests(
        ctx, request.gang_resource_requests
    )
    flat_requests = ResourceRequestUtil.ungroup_by_count(request.resource_requests)
    unmet_requests = scheduler._sched_resource_requests(ctx, flat_requests)

    # Shut down idle nodes that nothing above needs (no resource constraints,
    # not needed by the min worker count, etc.).
    scheduler._enforce_idle_termination(ctx)

    reply = SchedulingReply(
        infeasible_resource_requests=unmet_requests,
        infeasible_gang_resource_requests=unmet_gang_requests,
        infeasible_cluster_resource_constraints=unmet_constraints,
        to_launch=ctx.get_launch_requests(),
        to_terminate=ctx.get_terminate_requests(),
    )

    if self._event_logger is None:
        return reply

    # Event logging is best effort: a failure here must not fail scheduling.
    try:
        self._event_logger.log_cluster_scheduling_update(
            launch_requests=reply.to_launch,
            terminate_requests=reply.to_terminate,
            infeasible_requests=unmet_requests,
            infeasible_gang_requests=unmet_gang_requests,
            infeasible_cluster_resource_constraints=unmet_constraints,
            cluster_resources=ctx.get_cluster_resources(),
        )
    except Exception:
        logger.exception("Failed to emit event logs.")
    return reply
@staticmethod
def _enforce_max_workers_per_type(
    ctx: "ResourceDemandScheduler.ScheduleContext",
) -> None:
    """
    Enforce the max number of workers for each node type.

    For every node type that has more non-terminating nodes than its
    `max_worker_nodes`, the surplus nodes are marked for termination. The
    context is then updated with the terminating nodes followed by the
    remaining ones.

    Args:
        ctx: The schedule context.
    """
    snapshot = ctx.get_nodes()

    # Split the snapshot into nodes already terminating and live nodes by type.
    doomed = []
    live_by_type = defaultdict(list)
    for member in snapshot:
        if member.status == SchedulingNodeStatus.TO_TERMINATE:
            doomed.append(member)
        else:
            live_by_type[member.node_type].append(member)

    # Trim every node type that is over its own limit.
    for node_type, live_nodes in list(live_by_type.items()):
        per_type_cap = ctx.get_node_type_configs()[node_type].max_worker_nodes
        surplus = len(live_nodes) - per_type_cap
        if surplus <= 0:
            # This type is within its limit.
            continue

        selection = ResourceDemandScheduler._select_nodes_to_terminate(
            live_nodes,
            surplus,
            TerminationRequest.Cause.MAX_NUM_NODE_PER_TYPE,
            max_num_nodes_per_type=per_type_cap,
        )
        newly_doomed, survivors = selection
        live_by_type[node_type] = survivors
        doomed.extend(newly_doomed)

    still_live = [member for group in live_by_type.values() for member in group]
    merged = doomed + still_live

    assert len(snapshot) == len(
        merged
    ), "The number of nodes should be the same after enforcing max nodes per type."
    ctx.update(merged)

    if doomed:
        logger.debug(
            f"Terminating {len(doomed)} "
            "nodes for per node type max num node's constraints."
        )
@staticmethod
def _enforce_max_workers_global(
    ctx: "ResourceDemandScheduler.ScheduleContext",
) -> None:
    """
    Enforce the max number of workers for the entire cluster.

    Nothing happens when the context has no (or a zero) max number of nodes,
    or when the non-terminating nodes fit within it. Otherwise the surplus
    nodes are marked for termination and the context is updated.

    Args:
        ctx: The schedule context.
    """
    snapshot = ctx.get_nodes()
    doomed = []
    live = []
    for member in snapshot:
        bucket = doomed if member.status == SchedulingNodeStatus.TO_TERMINATE else live
        bucket.append(member)

    cluster_cap = ctx.get_max_num_nodes()
    if not cluster_cap:
        # No global limit to enforce.
        return
    surplus = len(live) - cluster_cap
    if surplus <= 0:
        # The cluster is within the limit.
        return

    newly_doomed, live = ResourceDemandScheduler._select_nodes_to_terminate(
        live,
        surplus,
        TerminationRequest.Cause.MAX_NUM_NODES,
        max_num_nodes=cluster_cap,
    )

    assert len(newly_doomed) == surplus, (
        "Terminating {} nodes, failed to terminate {} nodes to "
        "satisfy max_num_nodes={}".format(
            len(newly_doomed),
            surplus - len(newly_doomed),
            cluster_cap,
        )
    )

    # Write the new partition back, terminating nodes first.
    doomed.extend(newly_doomed)
    merged = doomed + live
    assert len(snapshot) == len(
        merged
    ), "The number of nodes should be the same after enforcing max nodes."
    ctx.update(merged)
@staticmethod
def _select_nodes_to_terminate(
    nodes: List[SchedulingNode],
    num_to_terminate: int,
    cause: TerminationRequest.Cause,
    max_num_nodes: Optional[int] = None,
    max_num_nodes_per_type: Optional[int] = None,
) -> Tuple[List[SchedulingNode], List[SchedulingNode]]:
    """
    Pick up to 'num_to_terminate' nodes out of 'nodes' and mark them for
    termination. A head node is never picked.

    Note that 'nodes' is sorted in place, and a head node found in it is
    removed from the list.

    Args:
        nodes: The candidate nodes to pick from.
        num_to_terminate: The number of nodes to be terminated.
        cause: The cause of the termination. Should be one of
            TerminationRequest.Cause.MAX_NUM_NODES or
            TerminationRequest.Cause.MAX_NUM_NODE_PER_TYPE.
        max_num_nodes: The max number of nodes for the entire cluster, only
            used when the cause is TerminationRequest.Cause.MAX_NUM_NODES.
        max_num_nodes_per_type: The max number of nodes for each node type.
            Only used when the cause is
            TerminationRequest.Cause.MAX_NUM_NODE_PER_TYPE.

    Returns:
        A tuple of:
            - The nodes marked for termination.
            - The remaining nodes (with the head node, if any, at the end).
    """
    # Order the candidates so the preferred ones to terminate come first.
    nodes.sort(key=ResourceDemandScheduler._sort_nodes_for_termination)

    # Take the first head node out of the candidates.
    head_position = next(
        (
            position
            for position, candidate in enumerate(nodes)
            if candidate.node_kind == NodeKind.HEAD
        ),
        None,
    )
    head_node = None if head_position is None else nodes.pop(head_position)

    victims = nodes[:num_to_terminate]
    survivors = nodes[num_to_terminate:]
    # There may be no head node if none has been reported yet by the ray
    # cluster.
    if head_node:
        survivors.append(head_node)

    assert cause in [
        TerminationRequest.Cause.MAX_NUM_NODES,
        TerminationRequest.Cause.MAX_NUM_NODE_PER_TYPE,
    ], "Other termination causes don't have to select nodes for termination."

    for victim in victims:
        victim.status = SchedulingNodeStatus.TO_TERMINATE
        victim.termination_request = TerminationRequest(
            id=str(uuid.uuid4()),
            instance_id=victim.im_instance_id,
            ray_node_id=victim.ray_node_id,
            cause=cause,
            instance_type=victim.node_type,
            instance_status=victim.im_instance_status,
            details=(
                f"Terminating node due to {TerminationRequest.Cause.Name(cause)}: "
                f"max_num_nodes={max_num_nodes}, "
                f"max_num_nodes_per_type={max_num_nodes_per_type}"
            ),
        )
        # Record the limit that triggered the termination on the request.
        if cause == TerminationRequest.Cause.MAX_NUM_NODES:
            victim.termination_request.max_num_nodes = max_num_nodes
        elif cause == TerminationRequest.Cause.MAX_NUM_NODE_PER_TYPE:
            victim.termination_request.max_num_nodes_per_type = max_num_nodes_per_type
        else:
            raise ValueError("Unknown termination cause: {}".format(cause))

    return victims, survivors
@staticmethod
def _sort_nodes_for_termination(node: SchedulingNode) -> Tuple:
    """
    Sort key that orders nodes for termination, ascending by:
        1. Whether ray is running on the node (nodes without ray first).
        2. The negated idle duration (longest idle first).
        3. The average resource utilization (least utilized first).

    Nodes that sort earlier are terminated first.
    """
    has_ray = len(node.ray_node_id) > 0

    free = node.get_available_resources(ResourceRequestSource.PENDING_DEMAND)
    # Utilization ratio of every resource with a positive total.
    ratios = [
        (capacity - free.get(name, 0)) / capacity
        for name, capacity in node.total_resources.items()
        if capacity > 0
    ]
    mean_ratio = sum(ratios) / len(ratios) if ratios else 0

    # Negate the idle duration so the longest-idle node sorts first.
    return (has_ray, -1 * node.idle_duration_ms, mean_ratio)
@staticmethod
def _enforce_min_workers_per_type(
    ctx: "ResourceDemandScheduler.ScheduleContext",
) -> None:
    """
    Enforce the minimal count of nodes for each worker node type.

    For every configured node type with fewer (non-terminating) nodes than
    its `min_worker_nodes`, new `TO_LAUNCH` worker nodes are added to the
    context to make up the difference.

    Args:
        ctx: The schedule context.
    """
    # Count the existing nodes by type.
    count_by_node_type = ctx.get_cluster_shape()

    new_nodes = []
    # Launch new nodes to satisfy min count for each node type.
    for node_type, node_type_config in ctx.get_node_type_configs().items():
        num_missing = node_type_config.min_worker_nodes - count_by_node_type.get(
            node_type, 0
        )
        if num_missing <= 0:
            continue

        logger.info(
            f"Adding {num_missing} nodes to satisfy min count for "
            f"node type: {node_type}."
        )
        # Build one distinct SchedulingNode per missing node. Repeating a
        # one-element list (`[node] * n`) would put the *same* object into
        # the list n times, and `copy.deepcopy` keeps that aliasing, so
        # scheduling onto one of the nodes would change all of them.
        new_nodes.extend(
            SchedulingNode.from_node_config(
                copy.deepcopy(node_type_config),
                status=SchedulingNodeStatus.TO_LAUNCH,
                node_kind=NodeKind.WORKER,
            )
            for _ in range(num_missing)
        )

    # NOTE: we assume the aggregated number of min workers across all node types
    # should not exceed any globally enforced max_num_nodes.

    # Add the new nodes to the existing nodes and update the context.
    ctx.update(new_nodes + ctx.get_nodes())
@staticmethod
def _enforce_resource_constraints(
    ctx: "ResourceDemandScheduler.ScheduleContext",
    constraints: List[ClusterResourceConstraint],
) -> List[ClusterResourceConstraint]:
    """
    Enforce the cluster resource constraints.

    Args:
        ctx: The schedule context.
        constraints: The cluster resource constraints (at most one).

    Returns:
        A list of infeasible constraints.

    Notes:
        Unlike the other scheduling functions, this does not schedule actual
        resource requests. It checks whether the cluster could be scaled up
        to a shape that fulfills the constraint, and only then updates the
        context.
    """
    # NOTE: a cluster currently has a single constraint at most, though
    # there may be multiple in the future.
    assert len(constraints) <= 1, "Max 1 cluster resource constraint is supported."
    if len(constraints) < 1:
        # Without a constraint there is nothing to enforce.
        return []

    the_constraint = constraints[0]
    # One entry per individual request, so they can be placed one by one.
    flat_requests = ResourceRequestUtil.ungroup_by_count(
        the_constraint.resource_requests
    )

    candidate_nodes, unplaced = ResourceDemandScheduler._try_schedule(
        ctx,
        flat_requests,
        resource_request_source=ResourceRequestSource.CLUSTER_RESOURCE_CONSTRAINT,
    )

    if not unplaced:
        # The constraint fits: keep the resulting nodes.
        ctx.update(candidate_nodes)
        return []

    # The constraint cannot be satisfied; leave the context untouched.
    return [the_constraint]
@staticmethod
def _sched_resource_requests(
    ctx: "ResourceDemandScheduler.ScheduleContext",
    requests: List[ResourceRequest],
) -> List[ResourceRequest]:
    """
    Schedule the resource requests.

    Args:
        ctx: The schedule context.
        requests: The resource requests to schedule.

    Returns:
        A list of infeasible resource requests.
    """
    outcome = ResourceDemandScheduler._try_schedule(
        ctx, requests, resource_request_source=ResourceRequestSource.PENDING_DEMAND
    )
    scheduled_nodes, unplaced = outcome
    # The context takes the resulting nodes even when some requests were left
    # unplaced.
    ctx.update(scheduled_nodes)
    return unplaced
@staticmethod
def _sched_gang_resource_requests(
    ctx: "ResourceDemandScheduler.ScheduleContext",
    gang_requests: List[GangResourceRequest],
) -> List[GangResourceRequest]:
    """
    Schedule the gang resource requests.

    A gang request is scheduled atomically: either all of its resource
    requests are scheduled, or none of them are.

    For now, the gang resource requests represent Ray's placement groups,
    while they could be more general in the future:
        - For STRICT_PACK placement group requests, we combine them into a
          single request and try to schedule them together.
        - For STRICT_SPREAD placement groups requests, they should be
          scheduled on different nodes by leveraging on the node labels that
          are associated with the placement group.
          If there are requests from rescheduling placement groups due to
          node failures, these requests should not be scheduled on nodes with
          requests from the same placement group.

    Args:
        ctx: The schedule context.
        gang_requests: The gang resource requests.

    Returns:
        A list of infeasible gang resource requests.
    """

    def _gang_priority(req: GangResourceRequest) -> Tuple:
        """
        Sort key for gang requests:
            1. the number of placement constraints in the gang request.
            2. the number of resource requests in the gang request.
        """
        num_constraints = sum(
            len(member.placement_constraints) for member in req.requests
        )
        return (num_constraints, len(req.requests))

    # Highest priority (largest key) first.
    ordered = sorted(gang_requests, key=_gang_priority, reverse=True)

    rejected = []
    for gang_req in ordered:
        # TODO: @ryanaoleary multiple `bundle_selectors` will be supported
        # for `fallback_strategy`. Until then only the first one is used, and
        # the legacy `requests` field is the fallback when there are none.
        bundle = (
            gang_req.bundle_selectors[0].resource_requests
            if gang_req.bundle_selectors
            else gang_req.requests
        )

        # Requests with affinity constraints are merged into the same request.
        bundle = ResourceRequestUtil.combine_requests_with_affinity(bundle)

        scheduled_nodes, unplaced = ResourceDemandScheduler._try_schedule(
            ctx, bundle, ResourceRequestSource.PENDING_DEMAND
        )

        if unplaced:
            # The gang does not fit as a whole: skip it and keep the context
            # as it was.
            rejected.append(gang_req)
        else:
            # The whole gang fits: commit the resulting nodes.
            ctx.update(scheduled_nodes)

    return rejected
@staticmethod
def _try_schedule(
    ctx: "ResourceDemandScheduler.ScheduleContext",
    requests_to_sched: List[ResourceRequest],
    resource_request_source: ResourceRequestSource,
) -> Tuple[List[SchedulingNode], List[ResourceRequest]]:
    """
    Try to schedule the resource requests on the current context.

    It tries to schedule the requests on the existing nodes first, and
    then tries to schedule the remaining requests on new nodes if possible.

    The context itself is not modified: the function works on the copies
    returned by `ctx.get_nodes()` / `ctx.get_node_type_available()`, and it
    is up to the caller to pass the returned nodes to `ctx.update()`.

    Args:
        requests_to_sched: The resource requests to be scheduled.
        ctx: The current scheduling context.
        resource_request_source: The source of the resource request, i.e.
            pending demands from ray actors/tasks or cluster resource
            constraints.

    Returns:
        - List of nodes after the attempt: every node from the context
          (with or without requests scheduled on it), followed by the new
          nodes that were added for the requests.
        - List of infeasible requests remained that cannot be scheduled.
    """

    # First sort the requests.
    def _sort_resource_request(req: ResourceRequest) -> Tuple:
        """
        Sort the resource requests by:
            1. The length of its placement constraints.
            2. The length of its first label selector constraints (if any).
            3. The number of resources it requests.
            4. The values of resources it requests.
            5. lexicographically for each resource (for stable ordering)

        This is a legacy sorting function for the autoscaler's binpacking
        algo - we do this so that we could have a deterministic scheduling
        results with reasonable fragmentation.
        """
        label_constraint_len = (
            len(req.label_selectors[0].label_constraints)
            if req.label_selectors
            else 0
        )
        return (
            len(req.placement_constraints),
            label_constraint_len,
            len(req.resources_bundle.values()),
            sum(req.resources_bundle.values()),
            sorted(req.resources_bundle.items()),
        )

    # Largest sort key first (reverse=True).
    requests_to_sched = sorted(
        requests_to_sched, key=_sort_resource_request, reverse=True
    )

    # Both are copies, so the bookkeeping below does not touch the context.
    existing_nodes = ctx.get_nodes()
    node_type_available = ctx.get_node_type_available()

    # A list of nodes that are either:
    #   1. existing nodes in the cluster. or
    #   2. new nodes that are launched to satisfy the resource requests.
    target_nodes = []

    # Try scheduling resource requests with existing nodes first.
    # Each round moves the best node from `existing_nodes` to `target_nodes`
    # and shrinks `requests_to_sched` to what is still unscheduled.
    while len(requests_to_sched) > 0 and len(existing_nodes) > 0:
        (
            best_node,
            requests_to_sched,
            existing_nodes,
        ) = ResourceDemandScheduler._sched_best_node(
            requests_to_sched,
            existing_nodes,
            resource_request_source,
            ctx.get_cloud_resource_availabilities(),
        )
        if best_node is None:
            # No existing nodes can schedule any more requests.
            break

        target_nodes.append(best_node)

    # If there's any existing nodes left, we will add to the target nodes
    target_nodes.extend(existing_nodes)

    # Try scheduling resource requests with new nodes.
    # The pool holds one candidate node per node type that can still be
    # launched.
    node_pools = [
        SchedulingNode.from_node_config(
            ctx.get_node_type_configs()[node_type],
            status=SchedulingNodeStatus.TO_LAUNCH,
            node_kind=NodeKind.WORKER,
        )
        for node_type, num_available in node_type_available.items()
        if num_available > 0
    ]
    while len(requests_to_sched) > 0 and len(node_pools) > 0:
        # Max number of nodes reached.
        # NOTE: `target_nodes` holds every node taken from the context plus
        # the new nodes added so far, and all of them count here.
        max_num_nodes = ctx.get_max_num_nodes()
        if max_num_nodes is not None and len(target_nodes) >= max_num_nodes:
            logger.debug(
                "Max number of nodes reached: {}, "
                "cannot launch more nodes.".format(max_num_nodes)
            )
            break

        (
            best_node,
            requests_to_sched,
            node_pools,
        ) = ResourceDemandScheduler._sched_best_node(
            requests_to_sched,
            node_pools,
            resource_request_source,
            ctx.get_cloud_resource_availabilities(),
        )
        if best_node is None:
            # No node type in the pool can take any of the remaining requests.
            break

        target_nodes.append(best_node)
        # Update the node pool if a node with the same node type of the
        # added node can be launched.
        node_type_available[best_node.node_type] -= 1
        if node_type_available[best_node.node_type] > 0:
            node_pools.append(
                SchedulingNode.from_node_config(
                    ctx.get_node_type_configs()[best_node.node_type],
                    status=SchedulingNodeStatus.TO_LAUNCH,
                    node_kind=NodeKind.WORKER,
                )
            )

    return target_nodes, requests_to_sched
@staticmethod
def _sched_best_node(
    requests: List[ResourceRequest],
    nodes: List[SchedulingNode],
    resource_request_source: ResourceRequestSource,
    cloud_resource_availabilities: Dict[NodeType, float],
) -> Tuple[Optional[SchedulingNode], List[ResourceRequest], List[SchedulingNode]]:
    """
    Schedule the requests on the best node.

    A simple greedy algorithm is used to schedule the requests:
        1. Try to schedule the requests on each node.
        2. Sort the nodes by a score. The sorting includes:
            2.1. UtilizationScore: to maximize resource utilization.
            2.2. Cloud resource availabilities: prioritize node types with
            the most available cloud resources, in order to minimize allocation
            failures.
        3. Return the node with the highest score.

    The highest score node is updated with the scheduled requests, and the node is
    removed from the node list.

    Args:
        requests: The resource requests to be scheduled.
        nodes: The node candidates to be scheduled on. The list is modified
            in place: the entry of the node that is picked is removed from it.
        resource_request_source: The source of the resource request, i.e.
            pending demands from ray actors/tasks or cluster resource constraints.
        cloud_resource_availabilities: The cloud resource availability score. A low
            score indicates that allocation for this node type has recently failed.

    Returns:
        best_node: The best node to schedule the requests, or None if no node
            can schedule any of the requests. This is a copy of the picked
            candidate, with the requests scheduled on it.
        infeasible: The infeasible requests that cannot be scheduled on the best
            node (all of `requests` when there is no best node).
        nodes: Remaining nodes after the best node is removed.
    """
    results = []

    # A temporary data class to store the scheduling result.
    @dataclass
    class ScheduleResult:
        # The node candidate after a scheduling attempt.
        node: SchedulingNode
        # The infeasible resource requests that are not scheduled.
        infeasible_requests: List[ResourceRequest]
        # The index of the node in the original node list.
        idx: int
        # the score of the scheduling node to compare with others.
        score: UtilizationScore

    # The attempts run on copies, so the candidates that are not picked stay
    # unchanged in `nodes`.
    nodes_copy = copy.deepcopy(nodes)

    # Iterate through each node and modify the node's available resources
    # if the requests are schedulable.
    for idx, node in enumerate(nodes_copy):
        remaining, score = node.try_schedule(requests, resource_request_source)

        if len(remaining) == len(requests):
            # The node cannot schedule any of the requests.
            continue

        results.append(ScheduleResult(node, remaining, idx, score))

    # No nodes can schedule any of the requests.
    if len(results) == 0:
        logger.debug(
            "No nodes can schedule the requests: {}, for nodes: {}".format(
                ResourceRequestUtil.to_dict_list(requests), nodes
            )
        )
        return None, requests, nodes

    # Sort the results by score.
    # Ties on the score are broken by the availability of the node type; a
    # node type without an availability entry is treated as score 1.
    results = sorted(
        results,
        key=lambda r: (
            r.score,
            cloud_resource_availabilities.get(r.node.node_type, 1),
        ),
        reverse=True,
    )
    best_result = results[0]

    # Remove the best node from the nodes.
    # `idx` is the position in `nodes_copy`, which matches the position in
    # `nodes` since the copy keeps the order.
    nodes.pop(best_result.idx)
    logger.debug(
        "Best node: {}, score: {}, remaining requests: {}".format(
            best_result.node,
            best_result.score,
            ResourceRequestUtil.to_dict_list(best_result.infeasible_requests),
        )
    )
    return best_result.node, best_result.infeasible_requests, nodes
@staticmethod
def _terminate_outdated_nodes(
    ctx: "ResourceDemandScheduler.ScheduleContext",
) -> None:
    """
    Terminate the nodes that are outdated, i.e. the node type config has been
    removed or the node's launch config hash no longer matches the config.

    A head node is never terminated; if it is outdated, a warning is logged
    instead. Nothing is done when the launch config check is disabled.

    Args:
        ctx: The schedule context.
    """
    if ctx._disable_launch_config_check:
        # Outdated nodes check through launch config check is disabled, so
        # there is no need to copy the nodes at all.
        return

    nodes = ctx.get_nodes()
    node_type_configs = ctx.get_node_type_configs()

    for node in nodes:
        if node.status != SchedulingNodeStatus.SCHEDULABLE:
            # We don't need to care about the non-running nodes.
            continue

        node_type_config = node_type_configs.get(node.node_type)
        # A node is outdated when its node type is gone from the configs, or
        # when the config carries a launch config hash that differs from the
        # one the node was launched with.
        is_outdated = node_type_config is None or (
            node_type_config.launch_config_hash
            and node_type_config.launch_config_hash != node.launch_config_hash
        )
        if not is_outdated:
            continue

        if node.node_kind == NodeKind.HEAD:
            # We should not be terminating the head node even if it's outdated.
            # Only warn when it really is outdated, to avoid a misleading
            # warning on every scheduling pass.
            logger.warning(
                f"Head node {node.im_instance_id}(ray={node.ray_node_id}) is "
                "outdated with node config changes. "
                "Please check the node's config or restart the cluster or restart "
                "the head node. Autoscaler is not able to shutdown the outdated "
                "head node"
            )
            continue

        node.status = SchedulingNodeStatus.TO_TERMINATE
        node.termination_request = TerminationRequest(
            # A UUID like the other termination requests of this scheduler;
            # a timestamp could repeat for nodes handled in the same pass.
            id=str(uuid.uuid4()),
            instance_id=node.im_instance_id,
            ray_node_id=node.ray_node_id,
            instance_type=node.node_type,
            instance_status=node.im_instance_status,
            cause=TerminationRequest.Cause.OUTDATED,
            details=f"node from {node.node_type} has outdated config",
        )

    ctx.update(nodes)
@staticmethod
def _enforce_idle_termination(
    ctx: "ResourceDemandScheduler.ScheduleContext",
) -> None:
    """
    Mark worker nodes for termination when they have been idle for longer
    than the idle timeout and nothing needs them.

    A node is kept when it is not schedulable, is the head node, has no
    applicable idle timeout, has requests scheduled on it (pending demand or
    cluster resource constraints), or is required to keep `min_worker_nodes`
    of its node type.

    Args:
        ctx: The schedule context.
    """
    ms_per_s = 1000
    shape = ctx.get_cluster_shape()
    type_configs = ctx.get_node_type_configs()
    num_terminated: Dict[NodeType, int] = defaultdict(int)
    nodes = ctx.get_nodes()
    default_timeout_s = ctx.get_idle_timeout_s()

    for node in nodes:
        # Only running worker nodes are subject to idle termination.
        if (
            node.status != SchedulingNodeStatus.SCHEDULABLE
            or node.node_kind == NodeKind.HEAD
        ):
            continue

        node_type = node.node_type
        type_config = type_configs[node_type] if node_type in type_configs else None

        # A timeout set on the node type wins over the scheduler-wide one.
        timeout_s = default_timeout_s
        if type_config is not None and type_config.idle_timeout_s is not None:
            timeout_s = type_config.idle_timeout_s
        if timeout_s is None:
            # No idle timeout applies to this node.
            continue

        if node.idle_duration_ms <= timeout_s * ms_per_s:
            # Not idle for long enough yet.
            continue
        idle_s = node.idle_duration_ms / ms_per_s

        # A node that has requests scheduled on it is still needed.
        keep_reason = next(
            (
                reason
                for source, reason in (
                    (
                        ResourceRequestSource.PENDING_DEMAND,
                        "Node {} (idle for {} secs) is needed by the pending requests, "
                        "skip idle termination.",
                    ),
                    (
                        ResourceRequestSource.CLUSTER_RESOURCE_CONSTRAINT,
                        "Node {} (idle for {} secs) is needed by the cluster resource "
                        "constraints, skip idle termination.",
                    ),
                )
                if node.sched_requests[source]
            ),
            None,
        )
        if keep_reason is not None:
            logger.debug(keep_reason.format(node.ray_node_id, idle_s))
            continue

        # Honor the min_worker_nodes setting for the node type.
        min_count = type_config.min_worker_nodes if type_config is not None else 0
        num_left = shape.get(node_type, 0) - num_terminated[node_type]
        if num_left <= min_count:
            logger.info(
                "Node {} (idle for {} secs) belongs to node_type {} and is "
                "required by min_worker_nodes, skipping idle termination.".format(
                    node.ray_node_id, idle_s, node_type
                )
            )
            continue

        # The node has been idle for too long and nothing needs it.
        num_terminated[node.node_type] += 1
        node.status = SchedulingNodeStatus.TO_TERMINATE
        node.termination_request = TerminationRequest(
            id=str(uuid.uuid4()),
            instance_id=node.im_instance_id,
            ray_node_id=node.ray_node_id,
            cause=TerminationRequest.Cause.IDLE,
            instance_type=node.node_type,
            instance_status=node.im_instance_status,
            idle_duration_ms=node.idle_duration_ms,
            details=f"idle for {idle_s} secs > "
            f"timeout={timeout_s} secs",
        )

    ctx.update(nodes)
|