| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955 |
- import copy
- import logging
- import sys
- import warnings
- from abc import ABC, abstractmethod
- from collections import defaultdict
- from dataclasses import dataclass
- from enum import Enum
- from functools import total_ordering
- from typing import Any, Callable, Dict, List, Optional, Set, Tuple
- import ray
- from ray._raylet import node_labels_match_selector
- from ray.serve._private.cluster_node_info_cache import ClusterNodeInfoCache
- from ray.serve._private.common import (
- CreatePlacementGroupRequest,
- DeploymentID,
- ReplicaID,
- )
- from ray.serve._private.config import ReplicaConfig
- from ray.serve._private.constants import (
- RAY_SERVE_HIGH_PRIORITY_CUSTOM_RESOURCES,
- RAY_SERVE_USE_COMPACT_SCHEDULING_STRATEGY,
- RAY_SERVE_USE_PACK_SCHEDULING_STRATEGY,
- SERVE_LOGGER_NAME,
- )
- from ray.util.scheduling_strategies import (
- LabelMatchExpressionsT,
- NodeAffinitySchedulingStrategy,
- NodeLabelSchedulingStrategy,
- PlacementGroupSchedulingStrategy,
- )
- logger = logging.getLogger(SERVE_LOGGER_NAME)
class SpreadDeploymentSchedulingPolicy:
    """Policy marker: spread a deployment's replicas on a best-effort basis.

    The class defines no attributes or methods of its own; instances only
    serve as a tag for the scheduling policy a deployment was created with.
    """
@total_ordering
class Resources(dict):
    """A dict of resource name -> amount with resource-aware semantics.

    Reads through `get` never miss: an absent implicit resource (a name
    starting with `ray._raylet.IMPLICIT_RESOURCE_PREFIX`) counts as 1 and
    any other absent resource counts as 0. Equality, ordering, addition
    and subtraction are all expressed in terms of `get`, so a missing key
    and an explicit default value are interchangeable.

    NOTE: `dict` itself already provides `__ne__`, `__le__`, `__gt__` and
    `__ge__`. `functools.total_ordering` therefore considers them
    user-defined and does not generate replacements, and the inherited
    `dict.__ne__` would compare raw dict contents instead of using
    `__eq__`. All rich comparisons are implemented explicitly below so
    that they agree with each other.
    """

    # Custom resource priority from environment variable
    CUSTOM_PRIORITY: List[str] = RAY_SERVE_HIGH_PRIORITY_CUSTOM_RESOURCES
    # Tolerance applied by `can_fit` to absorb floating point error.
    EPSILON = 1e-9

    def get(self, key: str):
        """Return the amount stored for `key`, or its implicit default."""
        val = super().get(key)
        if val is not None:
            return val

        # Implicit resources by default have 1 total
        if key.startswith(ray._raylet.IMPLICIT_RESOURCE_PREFIX):
            return 1

        # Otherwise by default there is 0 of this resource
        return 0

    def can_fit(self, other):
        """Return True if every resource in `other` is available in `self`."""
        keys = set(self.keys()) | set(other.keys())
        # We add a small epsilon to avoid floating point precision issues.
        return all(self.get(k) + self.EPSILON >= other.get(k) for k in keys)

    def _compare(self, other) -> int:
        """Three-way comparison used by all ordering operators.

        Keys are inspected from highest to lowest priority and the first
        key whose amounts differ decides the result. Returns -1 if `self`
        sorts lower, 1 if it sorts higher and 0 if no key differs.
        """
        keys = set(self.keys()) | set(other.keys())
        prioritized = list(self.CUSTOM_PRIORITY) + ["GPU", "CPU", "memory"]
        # Sort the leftover custom keys so the tie-break order does not
        # depend on set iteration order (which varies with string hashing).
        remaining = sorted(keys - set(prioritized))

        for key in prioritized + remaining:
            mine, theirs = self.get(key), other.get(key)
            if mine < theirs:
                return -1
            elif mine > theirs:
                return 1

        return 0

    def __eq__(self, other):
        if not isinstance(other, dict):
            return NotImplemented
        keys = set(self.keys()) | set(other.keys())
        return all(self.get(k) == other.get(k) for k in keys)

    def __ne__(self, other):
        # Must be defined explicitly: otherwise `dict.__ne__` is inherited
        # and disagrees with `__eq__` (e.g. {"CPU": 0} vs {}).
        result = self.__eq__(other)
        return result if result is NotImplemented else not result

    def __add__(self, other):
        """Return the key-wise sum; implicit resources are capped at 1.0."""
        keys = set(self.keys()) | set(other.keys())
        kwargs = dict()
        for key in keys:
            if key.startswith(ray._raylet.IMPLICIT_RESOURCE_PREFIX):
                kwargs[key] = min(1.0, self.get(key) + other.get(key))
            else:
                kwargs[key] = self.get(key) + other.get(key)

        return Resources(kwargs)

    def __sub__(self, other):
        """Return the key-wise difference `self - other`."""
        keys = set(self.keys()) | set(other.keys())
        kwargs = {key: self.get(key) - other.get(key) for key in keys}
        return Resources(kwargs)

    def __lt__(self, other):
        """Determines priority when sorting a list of Resources.

        1. Custom resources defined in RAY_SERVE_HIGH_PRIORITY_CUSTOM_RESOURCES
           (sorted by priority)
        2. GPU
        3. CPU
        4. memory
        5. Other custom resources (in sorted key order)

        This means a resource with a larger number of high-priority resources
        is always sorted higher than one with fewer, regardless of other types.
        """
        if not isinstance(other, dict):
            return NotImplemented
        return self._compare(other) < 0

    def __le__(self, other):
        if not isinstance(other, dict):
            return NotImplemented
        return self._compare(other) <= 0

    def __gt__(self, other):
        if not isinstance(other, dict):
            return NotImplemented
        return self._compare(other) > 0

    def __ge__(self, other):
        if not isinstance(other, dict):
            return NotImplemented
        return self._compare(other) >= 0
class ReplicaSchedulingRequestStatus(str, Enum):
    """Outcome states a replica scheduling request can be in.

    Members are also `str` instances whose value equals their name.
    """

    # The request has not reached a final outcome yet.
    IN_PROGRESS = "IN_PROGRESS"
    # The replica actor was submitted successfully.
    SUCCEEDED = "SUCCEEDED"
    # Creating the replica actor raised an exception.
    ACTOR_CREATION_FAILED = "ACTOR_CREATION_FAILED"
    # Creating the replica's placement group raised an exception.
    PLACEMENT_GROUP_CREATION_FAILED = "PLACEMENT_GROUP_CREATION_FAILED"
@dataclass
class ReplicaSchedulingRequest:
    """Everything the scheduler needs to place one replica.

    How the replica is actually placed is decided by the scheduler
    according to the deployment's scheduling policy.
    """

    replica_id: ReplicaID
    actor_def: ray.actor.ActorClass
    actor_resources: Dict
    actor_options: Dict
    actor_init_args: Tuple
    on_scheduled: Callable
    status: ReplicaSchedulingRequestStatus = ReplicaSchedulingRequestStatus.IN_PROGRESS
    # Per-replica placement group settings. All of them are optional;
    # a replica has no placement group unless bundles are given.
    placement_group_bundles: Optional[List[Dict[str, float]]] = None
    placement_group_strategy: Optional[str] = None
    placement_group_bundle_label_selector: Optional[List[Dict[str, str]]] = None
    placement_group_fallback_strategy: Optional[List[Dict[str, Any]]] = None

    max_replicas_per_node: Optional[int] = None

    @property
    def required_resources(self) -> Resources:
        """Resources a node must provide to host this replica.

        With a STRICT_PACK placement group this is the sum over all
        bundles. In every other case it is the actor resources, plus a
        per-deployment implicit resource when `max_replicas_per_node`
        is set.
        """
        uses_strict_pack_pg = (
            self.placement_group_bundles is not None
            and self.placement_group_strategy == "STRICT_PACK"
        )
        if uses_strict_pack_pg:
            bundle_resources = [
                Resources(bundle) for bundle in self.placement_group_bundles
            ]
            return sum(bundle_resources, Resources())

        needed = Resources(self.actor_resources)
        if self.max_replicas_per_node is None:
            return needed

        # Every node implicitly owns exactly 1 of an implicit resource,
        # so requesting 1/N of it caps this deployment at N replicas
        # per node.
        owner = self.replica_id.deployment_id
        implicit_key = (
            f"{ray._raylet.IMPLICIT_RESOURCE_PREFIX}"
            f"{owner.app_name}:{owner.name}"
        )
        needed[implicit_key] = 1.0 / self.max_replicas_per_node
        return needed
@dataclass
class DeploymentDownscaleRequest:
    """Asks the scheduler to stop some replicas of one deployment.

    Only the count is given; which replicas are stopped is left to
    the scheduler.
    """

    # Deployment whose replicas should be stopped.
    deployment_id: DeploymentID
    # How many replicas to stop.
    num_to_stop: int
@dataclass
class DeploymentSchedulingInfo:
    """Scheduling-relevant settings of a single deployment."""

    deployment_id: DeploymentID
    scheduling_policy: Any
    # Resources requested by one replica actor; None until configured.
    actor_resources: Optional[Resources] = None
    # Placement group settings; None when no placement group is used.
    placement_group_bundles: Optional[List[Resources]] = None
    placement_group_strategy: Optional[str] = None
    max_replicas_per_node: Optional[int] = None

    @property
    def required_resources(self) -> Resources:
        """The resources required to schedule a replica of this deployment on a node.

        If this replicas uses a strict pack placement group, the
        required resources is the sum of the placement group bundles.
        Otherwise, required resources is simply the actor resources
        (an empty `Resources` if none have been set yet).

        The returned object is always a fresh `Resources`; the stored
        `actor_resources` is never modified by reading this property.
        """
        if (
            self.placement_group_bundles is not None
            and self.placement_group_strategy == "STRICT_PACK"
        ):
            return sum(self.placement_group_bundles, Resources())

        # Copy instead of aliasing so the implicit resource added below
        # does not leak into `self.actor_resources`.
        required = Resources(self.actor_resources or {})

        # Using implicit resource (resources that every node
        # implicitly has and total is 1)
        # to limit the number of replicas on a single node.
        if self.max_replicas_per_node:
            implicit_resource = (
                f"{ray._raylet.IMPLICIT_RESOURCE_PREFIX}"
                f"{self.deployment_id.app_name}:{self.deployment_id.name}"
            )
            required[implicit_resource] = 1.0 / self.max_replicas_per_node

        return required

    def is_non_strict_pack_pg(self) -> bool:
        """Return True if a placement group is used with a strategy other than STRICT_PACK."""
        return (
            self.placement_group_bundles is not None
            and self.placement_group_strategy != "STRICT_PACK"
        )
@dataclass
class LaunchingReplicaInfo:
    """Placement hints recorded for a replica that is still launching.

    A launching replica is one whose schedule request has already been
    sent to core but which has not been placed on a node yet. Both hints
    are best effort and may not be honored.

    Args:
        target_node_id: The specific node requested for this replica,
            if any.
        target_labels: The node labels requested for this replica,
            if any.
    """

    target_node_id: Optional[str] = None
    target_labels: Optional[Dict[str, Any]] = None
- def _flatten(
- deployment_to_replicas: Dict[DeploymentID, Dict[ReplicaID, Any]]
- ) -> Dict[ReplicaID, Any]:
- """Flattens a dict of {deployment_id: {replica_id: val}} to {replica_id: val}."""
- return {
- replica_id: val
- for replicas in deployment_to_replicas.values()
- for replica_id, val in replicas.items()
- }
class DeploymentScheduler(ABC):
    """Abstract base of the single scheduler shared by all Serve deployments.

    Scheduling decisions are made in batches, once per update cycle, via
    `schedule`. The scheduler also keeps track of every replica it has
    been told about in one of four registries: pending, launching,
    recovering or running.
    """

    def __init__(
        self,
        cluster_node_info_cache: ClusterNodeInfoCache,
        head_node_id: str,
        create_placement_group_fn: Callable,
    ):
        self._cluster_node_info_cache = cluster_node_info_cache
        self._head_node_id = head_node_id
        self._create_placement_group_fn = create_placement_group_fn

        # {deployment_id: DeploymentSchedulingInfo}
        self._deployments: Dict[DeploymentID, DeploymentSchedulingInfo] = {}

        # Replicas waiting to be scheduled.
        # {deployment_id: {replica_id: scheduling_request}}
        self._pending_replicas: Dict[
            DeploymentID, Dict[str, ReplicaSchedulingRequest]
        ] = defaultdict(dict)

        # Replicas whose actors have been submitted but which are not
        # known to be running yet.
        # {deployment_id: {replica_id: LaunchingReplicaInfo}}
        self._launching_replicas: Dict[
            DeploymentID, Dict[str, LaunchingReplicaInfo]
        ] = defaultdict(dict)

        # Recovering replicas; their node is unknown.
        # {deployment_id: {replica_id}}
        self._recovering_replicas = defaultdict(set)

        # Running replicas; their node is known.
        # {deployment_id: {replica_id: running_node_id}}
        self._running_replicas = defaultdict(dict)

    def on_deployment_created(
        self,
        deployment_id: DeploymentID,
        scheduling_policy: SpreadDeploymentSchedulingPolicy,
    ) -> None:
        """Register a newly created deployment with its scheduling policy."""
        # A brand-new deployment must not have replicas in any registry.
        for registry in (
            self._pending_replicas,
            self._launching_replicas,
            self._recovering_replicas,
            self._running_replicas,
        ):
            assert deployment_id not in registry

        self._deployments[deployment_id] = DeploymentSchedulingInfo(
            deployment_id=deployment_id, scheduling_policy=scheduling_policy
        )

    def on_deployment_deployed(
        self,
        deployment_id: DeploymentID,
        replica_config: ReplicaConfig,
    ) -> None:
        """Copy the scheduling-relevant parts of `replica_config` into the
        record of an already registered deployment.

        Placement group bundles and strategy are only overwritten when
        the config provides a truthy value for them.
        """
        assert deployment_id in self._deployments

        record = self._deployments[deployment_id]
        record.actor_resources = Resources(replica_config.resource_dict)
        record.max_replicas_per_node = replica_config.max_replicas_per_node

        bundles = replica_config.placement_group_bundles
        if bundles:
            record.placement_group_bundles = [Resources(b) for b in bundles]

        strategy = replica_config.placement_group_strategy
        if strategy:
            record.placement_group_strategy = strategy

    def on_deployment_deleted(self, deployment_id: DeploymentID) -> None:
        """Forget a deleted deployment; it must have no replicas left."""
        for registry in (
            self._pending_replicas,
            self._launching_replicas,
            self._recovering_replicas,
            self._running_replicas,
        ):
            assert not registry[deployment_id]
            registry.pop(deployment_id, None)

        del self._deployments[deployment_id]

    def on_replica_stopping(self, replica_id: ReplicaID) -> None:
        """Remove a replica that is being stopped from every registry."""
        owner = replica_id.deployment_id
        self._pending_replicas[owner].pop(replica_id, None)
        self._launching_replicas[owner].pop(replica_id, None)
        self._recovering_replicas[owner].discard(replica_id)
        self._running_replicas[owner].pop(replica_id, None)

    def on_replica_running(self, replica_id: ReplicaID, node_id: str) -> None:
        """Record that a replica is running on the given node."""
        owner = replica_id.deployment_id
        assert replica_id not in self._pending_replicas[owner]

        # The replica leaves the launching/recovering state, if it was in one.
        self._launching_replicas[owner].pop(replica_id, None)
        self._recovering_replicas[owner].discard(replica_id)

        self._running_replicas[owner][replica_id] = node_id

    def on_replica_recovering(self, replica_id: ReplicaID) -> None:
        """Record that a replica, unknown so far, is recovering."""
        owner = replica_id.deployment_id
        assert replica_id not in self._pending_replicas[owner]
        assert replica_id not in self._launching_replicas[owner]
        assert replica_id not in self._running_replicas[owner]
        assert replica_id not in self._recovering_replicas[owner]

        self._recovering_replicas[owner].add(replica_id)

    def _on_replica_launching(
        self,
        replica_id: ReplicaID,
        target_node_id: Optional[str] = None,
        target_labels: Optional[Dict[str, Any]] = None,
    ):
        """Record a launching replica together with its placement hints."""
        launch_info = LaunchingReplicaInfo(
            target_node_id=target_node_id, target_labels=target_labels
        )
        self._launching_replicas[replica_id.deployment_id][replica_id] = launch_info

    def _get_node_to_running_replicas(
        self, deployment_id: Optional[DeploymentID] = None
    ) -> Dict[str, Set[ReplicaID]]:
        """Group running replicas by the node they run on.

        Only replicas of `deployment_id` are considered when it is given
        (and truthy); otherwise replicas of all deployments are included.
        """
        if deployment_id:
            replica_maps = [self._running_replicas[deployment_id]]
        else:
            replica_maps = self._running_replicas.values()

        node_to_replicas = defaultdict(set)
        for replica_to_node in replica_maps:
            for replica_id, node_id in replica_to_node.items():
                node_to_replicas[node_id].add(replica_id)

        return node_to_replicas

    def _get_available_resources_per_node(self) -> Dict[str, Resources]:
        """Return a conservative estimate of free resources on each active node.

        For every active node (alive and not draining) the result is the
        key-wise minimum of two views:

        1. The available resources per node as reported by the cluster
           node info cache (fetched from the GCS every control loop).
        2. The node's total resources minus the requirements of all
           running replicas on it and all launching replicas that
           target it.

        Neither view is exact: (1) is cached and therefore delayed, and
        (2) ignores non-replica actors and assumes launching replicas
        land on their (soft) target node. Treat the result as best effort.
        """
        node_cache = self._cluster_node_info_cache
        reported_available = node_cache.get_available_resources_per_node()
        reported_total = node_cache.get_total_resources_per_node()

        gcs_view = {
            node_id: Resources(res) for node_id, res in reported_available.items()
        }

        # Start from the totals and subtract what launching and running
        # replicas are expected to occupy.
        bookkeeping_view = {
            node_id: Resources(res) for node_id, res in reported_total.items()
        }

        for deployment_id, launching in self._launching_replicas.items():
            deployment = self._deployments[deployment_id]
            for launch_info in launching.values():
                node_id = launch_info.target_node_id
                if node_id and node_id in bookkeeping_view:
                    bookkeeping_view[node_id] -= deployment.required_resources

        for deployment_id, running in self._running_replicas.items():
            deployment = self._deployments[deployment_id]
            for node_id in running.values():
                if node_id in bookkeeping_view:
                    bookkeeping_view[node_id] -= deployment.required_resources

        def keywise_min(left: Resources, right: Resources):
            return Resources(
                {
                    key: min(left.get(key), right.get(key))
                    for key in set(left) | set(right)
                }
            )

        # Only active nodes (alive but not draining) are reported.
        return {
            node_id: keywise_min(
                gcs_view.get(node_id, Resources()),
                bookkeeping_view.get(node_id, Resources()),
            )
            for node_id in self._cluster_node_info_cache.get_active_node_ids()
        }

    def _best_fit_node(
        self, required_resources: Resources, available_resources: Dict[str, Resources]
    ) -> Optional[str]:
        """Pick the node that would have the least left over after placement.

        Nodes that cannot fit `required_resources` are ignored. Among the
        rest, the node whose remaining resources compare lowest is chosen,
        which keeps resource fragmentation small. Returns None when no
        node fits.
        """
        best_node = None
        smallest_leftover = None

        for node_id, node_available in available_resources.items():
            if not node_available.can_fit(required_resources):
                continue

            # TODO(zcin): We can make this better by only considering
            # custom resources that required_resources has.
            leftover = node_available - required_resources
            if smallest_leftover is None or leftover < smallest_leftover:
                smallest_leftover, best_node = leftover, node_id

        return best_node

    @abstractmethod
    def schedule(
        self,
        upscales: Dict[DeploymentID, List[ReplicaSchedulingRequest]],
        downscales: Dict[DeploymentID, DeploymentDownscaleRequest],
    ) -> Dict[DeploymentID, Set[ReplicaID]]:
        """Make one batch of scheduling decisions; called every update cycle.

        Args:
            upscales: for each deployment, the replicas to schedule.
            downscales: for each deployment, its downscale request.

        Returns:
            For each deployment, the replicas that should be stopped.
        """
        raise NotImplementedError

    def _schedule_replica(
        self,
        scheduling_request: ReplicaSchedulingRequest,
        default_scheduling_strategy: str,
        target_node_id: Optional[str] = None,
        target_labels: Optional[LabelMatchExpressionsT] = None,
    ):
        """Submit the actor for one scheduling request.

        The scheduling strategy is selected as follows, highest priority
        first:

        1. The request has placement group bundles: a placement group is
           created and a `PlacementGroupSchedulingStrategy` is used.
           `target_node_id`, if given, is forwarded to the placement
           group request; target labels are not taken into account.
        2. `target_node_id` is given: a soft
           `NodeAffinitySchedulingStrategy` is used.
        3. `target_labels` is given: a `NodeLabelSchedulingStrategy`
           with the labels as soft constraints is used.
        4. Otherwise `default_scheduling_strategy` is used.

        On failure to create the placement group or the actor, the
        exception is logged, the request's status is set to the matching
        failure status and the request stays pending. On success the
        request moves from pending to launching, its status becomes
        SUCCEEDED and its `on_scheduled` callback is invoked.

        Args:
            scheduling_request: A request to schedule a replica.
            default_scheduling_strategy: The scheduling strategy to fall
                back to if no special scheduling strategy is necessary.
            target_node_id: Attempt to schedule this replica onto this
                target node.
            target_labels: Attempt to schedule this replica onto nodes
                with these target labels.
        """
        replica_id = scheduling_request.replica_id
        deployment_id = replica_id.deployment_id

        strategy = default_scheduling_strategy
        if scheduling_request.placement_group_bundles is not None:
            try:
                pg = self._create_placement_group_fn(
                    CreatePlacementGroupRequest(
                        bundles=scheduling_request.placement_group_bundles,
                        strategy=scheduling_request.placement_group_strategy
                        or "PACK",
                        target_node_id=target_node_id,
                        name=scheduling_request.actor_options["name"],
                        bundle_label_selector=scheduling_request.placement_group_bundle_label_selector,
                    )
                )
            except Exception:
                # Defensive: keep the controller making progress even if
                # the placement group cannot be created.
                # See https://github.com/ray-project/ray/issues/43888.
                logger.exception(
                    f"Failed to create a placement group for {replica_id}."
                )
                scheduling_request.status = (
                    ReplicaSchedulingRequestStatus.PLACEMENT_GROUP_CREATION_FAILED
                )
                return

            strategy = PlacementGroupSchedulingStrategy(
                placement_group=pg,
                placement_group_capture_child_tasks=True,
            )
            target_labels = None
        elif target_node_id is not None:
            strategy = NodeAffinitySchedulingStrategy(
                node_id=target_node_id, soft=True, _spill_on_unavailable=True
            )
            target_labels = None
        elif target_labels is not None:
            strategy = NodeLabelSchedulingStrategy(hard={}, soft=target_labels)
            target_node_id = None

        # Work on a copy so the request's own options stay untouched.
        options = copy.deepcopy(scheduling_request.actor_options)
        if scheduling_request.max_replicas_per_node is not None:
            # Every node implicitly owns exactly 1 of an implicit
            # resource; requesting 1/N of it limits the deployment to
            # N replicas per node.
            implicit_key = (
                f"{ray._raylet.IMPLICIT_RESOURCE_PREFIX}"
                f"{deployment_id.app_name}:{deployment_id.name}"
            )
            options.setdefault("resources", {})[implicit_key] = (
                1.0 / scheduling_request.max_replicas_per_node
            )

        try:
            actor_handle = scheduling_request.actor_def.options(
                scheduling_strategy=strategy,
                **options,
            ).remote(*scheduling_request.actor_init_args)
        except Exception:
            # Defensive: keep the controller making progress even if the
            # actor options are misconfigured.
            logger.exception(f"Failed to create an actor for {replica_id}.")
            scheduling_request.status = (
                ReplicaSchedulingRequestStatus.ACTOR_CREATION_FAILED
            )
            return

        # The actor has been submitted: pending -> launching.
        del self._pending_replicas[deployment_id][replica_id]
        self._on_replica_launching(
            replica_id, target_node_id=target_node_id, target_labels=target_labels
        )

        if isinstance(strategy, PlacementGroupSchedulingStrategy):
            placement_group = strategy.placement_group
        else:
            placement_group = None

        scheduling_request.status = ReplicaSchedulingRequestStatus.SUCCEEDED
        scheduling_request.on_scheduled(actor_handle, placement_group=placement_group)

    @abstractmethod
    def get_node_to_compact(
        self, allow_new_compaction: bool
    ) -> Optional[Tuple[str, float]]:
        """Return the ID of a node to compact and a compaction deadline,
        or None."""
        raise NotImplementedError
class DefaultDeploymentScheduler(DeploymentScheduler):
    """Scheduler that places replicas with either a PACK or a SPREAD strategy.

    PACK is used when `RAY_SERVE_USE_PACK_SCHEDULING_STRATEGY` is set and
    no deployment uses a placement group with a strategy other than
    STRICT_PACK; otherwise replicas are scheduled with SPREAD.
    """

    def schedule(
        self,
        upscales: Dict[DeploymentID, List[ReplicaSchedulingRequest]],
        downscales: Dict[DeploymentID, DeploymentDownscaleRequest],
    ) -> Dict[DeploymentID, Set[ReplicaID]]:
        """Called for each update cycle to do batch scheduling.

        Args:
            upscales: a dict of deployment name to a list of replicas to schedule.
            downscales: a dict of deployment name to a downscale request.

        Returns:
            The IDs of replicas to stop for each deployment.
        """
        # Update pending replicas from upscales.
        for upscale in upscales.values():
            for scheduling_request in upscale:
                replica_id = scheduling_request.replica_id
                deployment_id = replica_id.deployment_id
                self._pending_replicas[deployment_id][replica_id] = scheduling_request

        # Check for deprecated environment variable usage
        if RAY_SERVE_USE_COMPACT_SCHEDULING_STRATEGY:
            warnings.warn(
                "The environment variable 'RAY_SERVE_USE_COMPACT_SCHEDULING_STRATEGY' "
                "is deprecated and will be removed in a v2.55.0 release. "
                "Please use 'RAY_SERVE_USE_PACK_SCHEDULING_STRATEGY' instead.",
                DeprecationWarning,
                stacklevel=2,
            )

        # Determine scheduling strategy
        non_strict_pack_pgs_exist = any(
            d.is_non_strict_pack_pg() for d in self._deployments.values()
        )
        use_pack_strategy = (
            RAY_SERVE_USE_PACK_SCHEDULING_STRATEGY and not non_strict_pack_pgs_exist
        )

        if use_pack_strategy:
            # This branch is only reached if each deployment either:
            # 1. Use STRICT_PACK placement group strategy, or
            # 2. Do not use placement groups at all.
            # This ensures Serve's best-fit node selection is respected by Ray Core
            # (since _soft_target_node_id only works with STRICT_PACK).
            self._schedule_with_pack_strategy()
        else:
            self._schedule_with_spread_strategy()

        # Handle downscales
        return {
            downscale.deployment_id: self._get_replicas_to_stop(
                downscale.deployment_id, downscale.num_to_stop
            )
            for downscale in downscales.values()
        }

    def _schedule_with_pack_strategy(self):
        """Tries to schedule pending replicas using PACK strategy."""
        # Flatten dict of deployment replicas into all replicas,
        # then sort by decreasing resource size
        all_scheduling_requests = sorted(
            _flatten(self._pending_replicas).values(),
            key=lambda r: r.required_resources,
            reverse=True,
        )

        # Fetch node labels for active nodes.
        active_nodes = self._cluster_node_info_cache.get_active_node_ids()
        all_node_labels = {
            node_id: self._cluster_node_info_cache.get_node_labels(node_id)
            for node_id in active_nodes
        }

        for scheduling_request in all_scheduling_requests:
            self._pack_schedule_replica(scheduling_request, all_node_labels)

    def _schedule_with_spread_strategy(self):
        """Tries to schedule pending replicas using the SPREAD strategy."""
        for pending_replicas in self._pending_replicas.values():
            if not pending_replicas:
                continue

            # Iterate over a snapshot: successfully scheduled requests are
            # removed from `pending_replicas` by `_schedule_replica`.
            for scheduling_request in list(pending_replicas.values()):
                self._schedule_replica(
                    scheduling_request=scheduling_request,
                    default_scheduling_strategy="SPREAD",
                )

    def _pack_schedule_replica(
        self,
        scheduling_request: ReplicaSchedulingRequest,
        all_node_labels: Dict[str, Dict[str, str]],
    ):
        """Attempts to schedule a single request on the best available node.

        Placement candidates are tried in order and the first one for
        which a node is found wins. If no candidate yields a node, the
        replica is scheduled without a target node.
        """
        placement_candidates = self._build_pack_placement_candidates(scheduling_request)

        # Nothing is scheduled while candidates are evaluated, so the
        # resource view is the same for every candidate; compute it once.
        available_resources_per_node = self._get_available_resources_per_node()

        target_node = None
        for required_resources, required_labels in placement_candidates:
            target_node = self._find_best_fit_node_for_pack(
                required_resources,
                available_resources_per_node,
                required_labels_list=required_labels,
                node_labels=all_node_labels,
            )
            if target_node:
                break

        self._schedule_replica(
            scheduling_request,
            default_scheduling_strategy="DEFAULT",
            target_node_id=target_node,
        )

    def _build_pack_placement_candidates(
        self, scheduling_request: ReplicaSchedulingRequest
    ) -> List[Tuple[Resources, List[Dict[str, str]]]]:
        """Returns a list of (resources, labels) tuples to attempt for scheduling.

        The first tuple is the primary placement; one more tuple follows
        for each entry of the actor's `fallback_strategy` option, if any.

        Raises:
            NotImplementedError: if a placement group uses a bundle label
                selector with a strategy other than STRICT_PACK, or if a
                placement group fallback strategy is provided.
        """
        # Collect a list of required resources and labels to try to schedule to
        # support replica compaction when fallback strategies are provided.
        actor_options = scheduling_request.actor_options
        primary_labels = []

        if scheduling_request.placement_group_bundles:
            # PG: Use PG bundle_label_selector
            bundle_label_selector = (
                scheduling_request.placement_group_bundle_label_selector
            )
            if bundle_label_selector:
                if scheduling_request.placement_group_strategy != "STRICT_PACK":
                    # TODO(ryanaoleary@): Support PACK strategy with bundle label selectors.
                    raise NotImplementedError(
                        "Placement Group strategy 'PACK' with bundle_label_selector "
                        "is not yet supported in the Serve scheduler."
                    )
                # All bundle_label_selectors must be satisfied on same node.
                primary_labels = bundle_label_selector
        elif "label_selector" in actor_options:
            # Actor: Use Actor label selector
            primary_labels = [actor_options["label_selector"] or {}]

        # If PG is defined on scheduling request, then `required_resources`
        # represents the sum across all bundles.
        required_resources = scheduling_request.required_resources
        placement_candidates = [(required_resources, primary_labels)]

        if scheduling_request.placement_group_fallback_strategy:
            # TODO(ryanaoleary@): Add support for placement group fallback_strategy when it's added to options.
            raise NotImplementedError(
                "Placement Group fallback strategies are not yet supported in the Serve scheduler."
            )
        elif actor_options.get("fallback_strategy"):
            # Fallback strategy provided for Ray Actor.
            for fallback in actor_options["fallback_strategy"]:
                fallback_labels = [fallback.get("label_selector", {}) or {}]
                placement_candidates.append((required_resources, fallback_labels))

        return placement_candidates

    def _get_replicas_to_stop(
        self, deployment_id: DeploymentID, max_num_to_stop: int
    ) -> Set[ReplicaID]:
        """Prioritize replicas running on a node with fewest replicas of
            all deployments.

        This algorithm helps to scale down more intelligently because it can
        relinquish nodes faster. Note that this algorithm doesn't consider
        other non-serve actors on the same node. See more at
        https://github.com/ray-project/ray/issues/20599.

        Replicas that are not running yet (pending, launching or
        recovering) are always chosen before running ones. At most
        `max_num_to_stop` replicas are returned; a non-positive value
        yields an empty set.
        """
        # Guard against a zero/negative request: without it the size
        # check below would never trigger and every replica would be
        # selected.
        if max_num_to_stop <= 0:
            return set()

        replicas_to_stop = set()

        # Replicas not in running state don't have node id.
        # We will prioritize those first.
        pending_launching_recovering_replicas = set().union(
            self._pending_replicas[deployment_id].keys(),
            self._launching_replicas[deployment_id].keys(),
            self._recovering_replicas[deployment_id],
        )
        for replica_id in pending_launching_recovering_replicas:
            replicas_to_stop.add(replica_id)
            if len(replicas_to_stop) >= max_num_to_stop:
                return replicas_to_stop

        node_to_running_replicas_of_all_deployments = (
            self._get_node_to_running_replicas()
        )

        # _running_replicas preserves insertion order (oldest → newest).
        # Walk it in reverse so each node's bucket is ordered newest → oldest.
        newest_first_by_node: Dict[str, List[ReplicaID]] = defaultdict(list)
        for replica_id, replica_node_id in reversed(
            list(self._running_replicas[deployment_id].items())
        ):
            newest_first_by_node[replica_node_id].append(replica_id)

        # Replicas on the head node has the lowest priority for downscaling
        # since we cannot relinquish the head node.
        def key(node_and_replicas):
            node_id, replicas = node_and_replicas
            return len(replicas) if node_id != self._head_node_id else sys.maxsize

        for node_id, _ in sorted(
            node_to_running_replicas_of_all_deployments.items(), key=key
        ):
            if node_id not in newest_first_by_node:
                continue

            # Newest-first list for this node.
            for replica_id in newest_first_by_node[node_id]:
                replicas_to_stop.add(replica_id)
                if len(replicas_to_stop) >= max_num_to_stop:
                    return replicas_to_stop

        return replicas_to_stop

    def _filter_nodes_by_label_selector(
        self,
        available_nodes: Dict[str, Resources],
        required_labels: Dict[str, str],
        node_labels: Dict[str, Dict[str, str]],
    ) -> Dict[str, Resources]:
        """Filters available nodes based on label selector constraints.

        A node without an entry in `node_labels` is treated as having no
        labels.
        """
        return {
            node_id: resources
            for node_id, resources in available_nodes.items()
            if node_labels_match_selector(node_labels.get(node_id, {}), required_labels)
        }

    def _find_best_fit_node_for_pack(
        self,
        required_resources: Resources,
        available_resources_per_node: Dict[str, Resources],
        required_labels_list: Optional[List[Dict[str, str]]] = None,
        node_labels: Optional[Dict[str, Dict[str, str]]] = None,
    ) -> Optional[str]:
        """Chooses best available node to schedule the required resources.

        If there are available nodes, returns the node ID of the best
        available node, minimizing fragmentation. Prefers non-idle nodes
        over idle nodes. Returns None if no node qualifies.

        When both `required_labels_list` and `node_labels` are provided,
        a node must match every selector in `required_labels_list` to be
        considered.
        """
        # Filter feasible nodes by provided label selectors if provided.
        if required_labels_list and node_labels:
            for required_labels in required_labels_list:
                available_resources_per_node = self._filter_nodes_by_label_selector(
                    available_resources_per_node, required_labels, node_labels
                )
                if not available_resources_per_node:
                    return None

        # A node is idle when no replica of any deployment runs on it.
        node_to_running_replicas = self._get_node_to_running_replicas()
        non_idle_nodes = {}
        idle_nodes = {}
        for node_id, res in available_resources_per_node.items():
            if node_to_running_replicas.get(node_id):
                non_idle_nodes[node_id] = res
            else:
                idle_nodes[node_id] = res

        # 1. Prefer non-idle nodes
        # 2. Consider idle nodes last
        return (
            self._best_fit_node(required_resources, non_idle_nodes)
            or self._best_fit_node(required_resources, idle_nodes)
            or None
        )

    def get_node_to_compact(
        self, allow_new_compaction: bool
    ) -> Optional[Tuple[str, float]]:
        """Always returns None: this scheduler never selects a node to compact."""
        return None
|