# deployment_scheduler.py (37 KB)
  1. import copy
  2. import logging
  3. import sys
  4. import warnings
  5. from abc import ABC, abstractmethod
  6. from collections import defaultdict
  7. from dataclasses import dataclass
  8. from enum import Enum
  9. from functools import total_ordering
  10. from typing import Any, Callable, Dict, List, Optional, Set, Tuple
  11. import ray
  12. from ray._raylet import node_labels_match_selector
  13. from ray.serve._private.cluster_node_info_cache import ClusterNodeInfoCache
  14. from ray.serve._private.common import (
  15. CreatePlacementGroupRequest,
  16. DeploymentID,
  17. ReplicaID,
  18. )
  19. from ray.serve._private.config import ReplicaConfig
  20. from ray.serve._private.constants import (
  21. RAY_SERVE_HIGH_PRIORITY_CUSTOM_RESOURCES,
  22. RAY_SERVE_USE_COMPACT_SCHEDULING_STRATEGY,
  23. RAY_SERVE_USE_PACK_SCHEDULING_STRATEGY,
  24. SERVE_LOGGER_NAME,
  25. )
  26. from ray.util.scheduling_strategies import (
  27. LabelMatchExpressionsT,
  28. NodeAffinitySchedulingStrategy,
  29. NodeLabelSchedulingStrategy,
  30. PlacementGroupSchedulingStrategy,
  31. )
  32. logger = logging.getLogger(SERVE_LOGGER_NAME)
  33. class SpreadDeploymentSchedulingPolicy:
  34. """A scheduling policy that spreads replicas with best effort."""
  35. pass
  36. @total_ordering
  37. class Resources(dict):
  38. # Custom resource priority from environment variable
  39. CUSTOM_PRIORITY: List[str] = RAY_SERVE_HIGH_PRIORITY_CUSTOM_RESOURCES
  40. EPSILON = 1e-9
  41. def get(self, key: str):
  42. val = super().get(key)
  43. if val is not None:
  44. return val
  45. # Implicit resources by default have 1 total
  46. if key.startswith(ray._raylet.IMPLICIT_RESOURCE_PREFIX):
  47. return 1
  48. # Otherwise by default there is 0 of this resource
  49. return 0
  50. def can_fit(self, other):
  51. keys = set(self.keys()) | set(other.keys())
  52. # We add a small epsilon to avoid floating point precision issues.
  53. return all(self.get(k) + self.EPSILON >= other.get(k) for k in keys)
  54. def __eq__(self, other):
  55. keys = set(self.keys()) | set(other.keys())
  56. return all([self.get(k) == other.get(k) for k in keys])
  57. def __add__(self, other):
  58. keys = set(self.keys()) | set(other.keys())
  59. kwargs = dict()
  60. for key in keys:
  61. if key.startswith(ray._raylet.IMPLICIT_RESOURCE_PREFIX):
  62. kwargs[key] = min(1.0, self.get(key) + other.get(key))
  63. else:
  64. kwargs[key] = self.get(key) + other.get(key)
  65. return Resources(kwargs)
  66. def __sub__(self, other):
  67. keys = set(self.keys()) | set(other.keys())
  68. kwargs = {key: self.get(key) - other.get(key) for key in keys}
  69. return Resources(kwargs)
  70. def __lt__(self, other):
  71. """Determines priority when sorting a list of SoftResources.
  72. 1. Custom resources defined in RAY_SERVE_HIGH_PRIORITY_CUSTOM_RESOURCES (sorted by priority)
  73. 2. GPU
  74. 3. CPU
  75. 4. memory
  76. 5. Other custom resources
  77. This means a resource with a larger number of high-priority resources is always
  78. sorted higher than one with fewer, regardless of other types.
  79. """
  80. keys = set(self.keys()) | set(other.keys())
  81. custom_keys = keys - {"GPU", "CPU", "memory"}
  82. for key in self.CUSTOM_PRIORITY:
  83. if self.get(key) < other.get(key):
  84. return True
  85. elif self.get(key) > other.get(key):
  86. return False
  87. if self.get("GPU") < other.get("GPU"):
  88. return True
  89. elif self.get("GPU") > other.get("GPU"):
  90. return False
  91. if self.get("CPU") < other.get("CPU"):
  92. return True
  93. elif self.get("CPU") > other.get("CPU"):
  94. return False
  95. if self.get("memory") < other.get("memory"):
  96. return True
  97. elif self.get("memory") > other.get("memory"):
  98. return False
  99. for key in custom_keys - set(self.CUSTOM_PRIORITY):
  100. if self.get(key) < other.get(key):
  101. return True
  102. elif self.get(key) > other.get(key):
  103. return False
  104. return False
  105. class ReplicaSchedulingRequestStatus(str, Enum):
  106. """The status of a replica scheduling request."""
  107. IN_PROGRESS = "IN_PROGRESS"
  108. SUCCEEDED = "SUCCEEDED"
  109. ACTOR_CREATION_FAILED = "ACTOR_CREATION_FAILED"
  110. PLACEMENT_GROUP_CREATION_FAILED = "PLACEMENT_GROUP_CREATION_FAILED"
  111. @dataclass
  112. class ReplicaSchedulingRequest:
  113. """Request to schedule a single replica.
  114. The scheduler is responsible for scheduling
  115. based on the deployment scheduling policy.
  116. """
  117. replica_id: ReplicaID
  118. actor_def: ray.actor.ActorClass
  119. actor_resources: Dict
  120. actor_options: Dict
  121. actor_init_args: Tuple
  122. on_scheduled: Callable
  123. status: ReplicaSchedulingRequestStatus = ReplicaSchedulingRequestStatus.IN_PROGRESS
  124. # Placement group bundles and strategy *for this replica*.
  125. # These are optional: by default replicas do not have a placement group.
  126. placement_group_bundles: Optional[List[Dict[str, float]]] = None
  127. placement_group_strategy: Optional[str] = None
  128. placement_group_bundle_label_selector: Optional[List[Dict[str, str]]] = None
  129. placement_group_fallback_strategy: Optional[List[Dict[str, Any]]] = None
  130. max_replicas_per_node: Optional[int] = None
  131. @property
  132. def required_resources(self) -> Resources:
  133. """The resources required to schedule this replica on a node.
  134. If this replica uses a strict pack placement group, the
  135. required resources is the sum of the placement group bundles.
  136. Otherwise, required resources is simply the actor resources.
  137. """
  138. if (
  139. self.placement_group_bundles is not None
  140. and self.placement_group_strategy == "STRICT_PACK"
  141. ):
  142. return sum(
  143. [Resources(bundle) for bundle in self.placement_group_bundles],
  144. Resources(),
  145. )
  146. else:
  147. required = Resources(self.actor_resources)
  148. # Using implicit resource (resources that every node
  149. # implicitly has and total is 1)
  150. # to limit the number of replicas on a single node.
  151. if self.max_replicas_per_node is not None:
  152. deployment_id = self.replica_id.deployment_id
  153. implicit_resource = (
  154. f"{ray._raylet.IMPLICIT_RESOURCE_PREFIX}"
  155. f"{deployment_id.app_name}:{deployment_id.name}"
  156. )
  157. required[implicit_resource] = 1.0 / self.max_replicas_per_node
  158. return required
  159. @dataclass
  160. class DeploymentDownscaleRequest:
  161. """Request to stop a certain number of replicas.
  162. The scheduler is responsible for
  163. choosing the replicas to stop.
  164. """
  165. deployment_id: DeploymentID
  166. num_to_stop: int
  167. @dataclass
  168. class DeploymentSchedulingInfo:
  169. deployment_id: DeploymentID
  170. scheduling_policy: Any
  171. actor_resources: Optional[Resources] = None
  172. placement_group_bundles: Optional[List[Resources]] = None
  173. placement_group_strategy: Optional[str] = None
  174. max_replicas_per_node: Optional[int] = None
  175. @property
  176. def required_resources(self) -> Resources:
  177. """The resources required to schedule a replica of this deployment on a node.
  178. If this replicas uses a strict pack placement group, the
  179. required resources is the sum of the placement group bundles.
  180. Otherwise, required resources is simply the actor resources.
  181. """
  182. if (
  183. self.placement_group_bundles is not None
  184. and self.placement_group_strategy == "STRICT_PACK"
  185. ):
  186. return sum(self.placement_group_bundles, Resources())
  187. else:
  188. required = self.actor_resources
  189. # Using implicit resource (resources that every node
  190. # implicitly has and total is 1)
  191. # to limit the number of replicas on a single node.
  192. if self.max_replicas_per_node:
  193. implicit_resource = (
  194. f"{ray._raylet.IMPLICIT_RESOURCE_PREFIX}"
  195. f"{self.deployment_id.app_name}:{self.deployment_id.name}"
  196. )
  197. required[implicit_resource] = 1.0 / self.max_replicas_per_node
  198. return required
  199. def is_non_strict_pack_pg(self) -> bool:
  200. return (
  201. self.placement_group_bundles is not None
  202. and self.placement_group_strategy != "STRICT_PACK"
  203. )
  204. @dataclass
  205. class LaunchingReplicaInfo:
  206. """Describes a replica for which a schedule request has been sent to
  207. core but has not been scheduled (placed on a node) yet.
  208. Args:
  209. target_node_id: The exact node that's been requested for this
  210. replica. This is best effort and may not be fulfilled.
  211. target_labels: The node labels that have been requested for this
  212. replica. This is best effort and may not be fulfilled.
  213. """
  214. target_node_id: Optional[str] = None
  215. target_labels: Optional[Dict[str, Any]] = None
  216. def _flatten(
  217. deployment_to_replicas: Dict[DeploymentID, Dict[ReplicaID, Any]]
  218. ) -> Dict[ReplicaID, Any]:
  219. """Flattens a dict of {deployment_id: {replica_id: val}} to {replica_id: val}."""
  220. return {
  221. replica_id: val
  222. for replicas in deployment_to_replicas.values()
  223. for replica_id, val in replicas.items()
  224. }
  225. class DeploymentScheduler(ABC):
  226. """A centralized scheduler for all Serve deployments.
  227. It makes a batch of scheduling decisions in each update cycle.
  228. """
  229. def __init__(
  230. self,
  231. cluster_node_info_cache: ClusterNodeInfoCache,
  232. head_node_id: str,
  233. create_placement_group_fn: Callable,
  234. ):
  235. # {deployment_id: scheduling_policy}
  236. self._deployments: Dict[DeploymentID, DeploymentSchedulingInfo] = {}
  237. # Replicas that are waiting to be scheduled.
  238. # {deployment_id: {replica_id: deployment_upscale_request}}
  239. self._pending_replicas: Dict[
  240. DeploymentID, Dict[str, ReplicaSchedulingRequest]
  241. ] = defaultdict(dict)
  242. # Replicas that are being scheduled.
  243. # The underlying actors have been submitted.
  244. # {deployment_id: {replica_id: target_node_id}}
  245. self._launching_replicas: Dict[
  246. DeploymentID, Dict[str, LaunchingReplicaInfo]
  247. ] = defaultdict(dict)
  248. # Replicas that are recovering.
  249. # We don't know where those replicas are running.
  250. # {deployment_id: {replica_id}}
  251. self._recovering_replicas = defaultdict(set)
  252. # Replicas that are running.
  253. # We know where those replicas are running.
  254. # {deployment_id: {replica_id: running_node_id}}
  255. self._running_replicas = defaultdict(dict)
  256. self._cluster_node_info_cache = cluster_node_info_cache
  257. self._head_node_id = head_node_id
  258. self._create_placement_group_fn = create_placement_group_fn
  259. def on_deployment_created(
  260. self,
  261. deployment_id: DeploymentID,
  262. scheduling_policy: SpreadDeploymentSchedulingPolicy,
  263. ) -> None:
  264. """Called whenever a new deployment is created."""
  265. assert deployment_id not in self._pending_replicas
  266. assert deployment_id not in self._launching_replicas
  267. assert deployment_id not in self._recovering_replicas
  268. assert deployment_id not in self._running_replicas
  269. self._deployments[deployment_id] = DeploymentSchedulingInfo(
  270. deployment_id=deployment_id, scheduling_policy=scheduling_policy
  271. )
  272. def on_deployment_deployed(
  273. self,
  274. deployment_id: DeploymentID,
  275. replica_config: ReplicaConfig,
  276. ) -> None:
  277. assert deployment_id in self._deployments
  278. info = self._deployments[deployment_id]
  279. info.actor_resources = Resources(replica_config.resource_dict)
  280. info.max_replicas_per_node = replica_config.max_replicas_per_node
  281. if replica_config.placement_group_bundles:
  282. info.placement_group_bundles = [
  283. Resources(bundle) for bundle in replica_config.placement_group_bundles
  284. ]
  285. if replica_config.placement_group_strategy:
  286. info.placement_group_strategy = replica_config.placement_group_strategy
  287. def on_deployment_deleted(self, deployment_id: DeploymentID) -> None:
  288. """Called whenever a deployment is deleted."""
  289. assert not self._pending_replicas[deployment_id]
  290. self._pending_replicas.pop(deployment_id, None)
  291. assert not self._launching_replicas[deployment_id]
  292. self._launching_replicas.pop(deployment_id, None)
  293. assert not self._recovering_replicas[deployment_id]
  294. self._recovering_replicas.pop(deployment_id, None)
  295. assert not self._running_replicas[deployment_id]
  296. self._running_replicas.pop(deployment_id, None)
  297. del self._deployments[deployment_id]
  298. def on_replica_stopping(self, replica_id: ReplicaID) -> None:
  299. """Called whenever a deployment replica is being stopped."""
  300. deployment_id = replica_id.deployment_id
  301. self._pending_replicas[deployment_id].pop(replica_id, None)
  302. self._launching_replicas[deployment_id].pop(replica_id, None)
  303. self._recovering_replicas[deployment_id].discard(replica_id)
  304. self._running_replicas[deployment_id].pop(replica_id, None)
  305. def on_replica_running(self, replica_id: ReplicaID, node_id: str) -> None:
  306. """Called whenever a deployment replica is running with a known node id."""
  307. deployment_id = replica_id.deployment_id
  308. assert replica_id not in self._pending_replicas[deployment_id]
  309. self._launching_replicas[deployment_id].pop(replica_id, None)
  310. self._recovering_replicas[deployment_id].discard(replica_id)
  311. self._running_replicas[deployment_id][replica_id] = node_id
  312. def on_replica_recovering(self, replica_id: ReplicaID) -> None:
  313. """Called whenever a deployment replica is recovering."""
  314. deployment_id = replica_id.deployment_id
  315. assert replica_id not in self._pending_replicas[deployment_id]
  316. assert replica_id not in self._launching_replicas[deployment_id]
  317. assert replica_id not in self._running_replicas[deployment_id]
  318. assert replica_id not in self._recovering_replicas[deployment_id]
  319. self._recovering_replicas[deployment_id].add(replica_id)
  320. def _on_replica_launching(
  321. self,
  322. replica_id: ReplicaID,
  323. target_node_id: Optional[str] = None,
  324. target_labels: Optional[Dict[str, Any]] = None,
  325. ):
  326. deployment_id = replica_id.deployment_id
  327. self._launching_replicas[deployment_id][replica_id] = LaunchingReplicaInfo(
  328. target_node_id=target_node_id, target_labels=target_labels
  329. )
  330. def _get_node_to_running_replicas(
  331. self, deployment_id: Optional[DeploymentID] = None
  332. ) -> Dict[str, Set[ReplicaID]]:
  333. res = defaultdict(set)
  334. if deployment_id:
  335. for replica_id, node_id in self._running_replicas[deployment_id].items():
  336. res[node_id].add(replica_id)
  337. else:
  338. for _, replicas in self._running_replicas.items():
  339. for replica_id, node_id in replicas.items():
  340. res[node_id].add(replica_id)
  341. return res
  342. def _get_available_resources_per_node(self) -> Dict[str, Resources]:
  343. """Gets current available resources per node.
  344. This returns a conservative view of the available resources
  345. currently in the cluster. It returns the minimum of:
  346. 1. The available resources per node fetched and cached from the
  347. GCS every control loop.
  348. 2. The remaining resources left over on each node after
  349. subtracting the resources taken up by running (already
  350. scheduled by core) and launching (to-be-scheduled and soft
  351. targeting that node) replicas.
  352. Note that (1) may not be accurate because it uses cached info
  353. and there is a delay from fetching data from GCS, and (2) may
  354. not be accurate because there can be other actors (not replicas)
  355. running in the cluster, and launching replicas may not end up on
  356. the node we're targeting. So the information returned from this
  357. method is only best effort.
  358. """
  359. available_resources = (
  360. self._cluster_node_info_cache.get_available_resources_per_node()
  361. )
  362. total_resources = self._cluster_node_info_cache.get_total_resources_per_node()
  363. gcs_info = {node_id: Resources(r) for node_id, r in available_resources.items()}
  364. # Manually calculate available resources per node by subtracting
  365. # launching and running replicas from total resources
  366. total_minus_replicas = {
  367. node_id: Resources(resources)
  368. for node_id, resources in total_resources.items()
  369. }
  370. for deployment_id, replicas in self._launching_replicas.items():
  371. deployment = self._deployments[deployment_id]
  372. for info in replicas.values():
  373. target_node_id = info.target_node_id
  374. if not target_node_id or target_node_id not in total_minus_replicas:
  375. continue
  376. total_minus_replicas[target_node_id] -= deployment.required_resources
  377. for deployment_id, replicas in self._running_replicas.items():
  378. deployment = self._deployments[deployment_id]
  379. for node_id in replicas.values():
  380. if node_id not in total_minus_replicas:
  381. continue
  382. total_minus_replicas[node_id] -= deployment.required_resources
  383. def custom_min(a: Resources, b: Resources):
  384. keys = set(a.keys()) | set(b.keys())
  385. res = Resources()
  386. for key in keys:
  387. res[key] = min(a.get(key), b.get(key))
  388. return res
  389. # Filter by active node ids (alive but not draining)
  390. return {
  391. node_id: custom_min(
  392. gcs_info.get(node_id, Resources()),
  393. total_minus_replicas.get(node_id, Resources()),
  394. )
  395. for node_id in self._cluster_node_info_cache.get_active_node_ids()
  396. }
  397. def _best_fit_node(
  398. self, required_resources: Resources, available_resources: Dict[str, Resources]
  399. ) -> Optional[str]:
  400. """Chooses a node using best fit strategy.
  401. This strategy picks the node where, if the required resources
  402. were to be scheduled on that node, it will leave the smallest
  403. remaining space. This minimizes fragmentation of resources.
  404. """
  405. min_remaining_space = None
  406. chosen_node = None
  407. for node_id in available_resources:
  408. if not available_resources[node_id].can_fit(required_resources):
  409. continue
  410. # TODO(zcin): We can make this better by only considering
  411. # custom resources that required_resources has.
  412. remaining_space = available_resources[node_id] - required_resources
  413. if min_remaining_space is None or remaining_space < min_remaining_space:
  414. min_remaining_space = remaining_space
  415. chosen_node = node_id
  416. return chosen_node
  417. @abstractmethod
  418. def schedule(
  419. self,
  420. upscales: Dict[DeploymentID, List[ReplicaSchedulingRequest]],
  421. downscales: Dict[DeploymentID, DeploymentDownscaleRequest],
  422. ) -> Dict[DeploymentID, Set[ReplicaID]]:
  423. """Called for each update cycle to do batch scheduling.
  424. Args:
  425. upscales: a dict of deployment name to a list of replicas to schedule.
  426. downscales: a dict of deployment name to a downscale request.
  427. Returns:
  428. The name of replicas to stop for each deployment.
  429. """
  430. raise NotImplementedError
  431. def _schedule_replica(
  432. self,
  433. scheduling_request: ReplicaSchedulingRequest,
  434. default_scheduling_strategy: str,
  435. target_node_id: Optional[str] = None,
  436. target_labels: Optional[LabelMatchExpressionsT] = None,
  437. ):
  438. """Schedule a replica from a scheduling request.
  439. The following special scheduling strategies will be used, in
  440. order of highest to lowest priority.
  441. 1. If a replica requires placement groups, we will choose to use
  442. a `PlacementGroupSchedulingStrategy`. This can also take a
  443. target node into consideration (soft target), if provided.
  444. However it cannot take into account target labels.
  445. 2. If a `target_node_id` is provided, we will choose to use a
  446. `NodeAffinitySchedulingStrategy`.
  447. 3. If `target_labels` is provided, we will choose to use a
  448. `NodeLabelSchedulingStrategy`.
  449. Args:
  450. scheduling_request: A request to schedule a replica.
  451. default_scheduling_strategy: The scheduling strategy to fall
  452. back to if no special scheduling strategy is necessary.
  453. target_node_id: Attempt to schedule this replica onto this
  454. target node.
  455. target_labels: Attempt to schedule this replica onto nodes
  456. with these target labels.
  457. """
  458. replica_id = scheduling_request.replica_id
  459. deployment_id = replica_id.deployment_id
  460. placement_group = None
  461. scheduling_strategy = default_scheduling_strategy
  462. if scheduling_request.placement_group_bundles is not None:
  463. placement_group_strategy = (
  464. scheduling_request.placement_group_strategy
  465. if scheduling_request.placement_group_strategy
  466. else "PACK"
  467. )
  468. try:
  469. pg = self._create_placement_group_fn(
  470. CreatePlacementGroupRequest(
  471. bundles=scheduling_request.placement_group_bundles,
  472. strategy=placement_group_strategy,
  473. target_node_id=target_node_id,
  474. name=scheduling_request.actor_options["name"],
  475. bundle_label_selector=scheduling_request.placement_group_bundle_label_selector,
  476. )
  477. )
  478. except Exception:
  479. # We add a defensive exception here, so the controller can
  480. # make progress even if the placement group isn't created.
  481. # See https://github.com/ray-project/ray/issues/43888.
  482. logger.exception(
  483. f"Failed to create a placement group for {replica_id}."
  484. )
  485. scheduling_request.status = (
  486. ReplicaSchedulingRequestStatus.PLACEMENT_GROUP_CREATION_FAILED
  487. )
  488. return
  489. scheduling_strategy = PlacementGroupSchedulingStrategy(
  490. placement_group=pg,
  491. placement_group_capture_child_tasks=True,
  492. )
  493. target_labels = None
  494. elif target_node_id is not None:
  495. scheduling_strategy = NodeAffinitySchedulingStrategy(
  496. node_id=target_node_id, soft=True, _spill_on_unavailable=True
  497. )
  498. target_labels = None
  499. elif target_labels is not None:
  500. scheduling_strategy = NodeLabelSchedulingStrategy(
  501. hard={}, soft=target_labels
  502. )
  503. target_node_id = None
  504. actor_options = copy.deepcopy(scheduling_request.actor_options)
  505. if scheduling_request.max_replicas_per_node is not None:
  506. if "resources" not in actor_options:
  507. actor_options["resources"] = {}
  508. # Using implicit resource (resources that every node
  509. # implicitly has and total is 1)
  510. # to limit the number of replicas on a single node.
  511. actor_options["resources"][
  512. f"{ray._raylet.IMPLICIT_RESOURCE_PREFIX}"
  513. f"{deployment_id.app_name}:{deployment_id.name}"
  514. ] = (1.0 / scheduling_request.max_replicas_per_node)
  515. try:
  516. actor_handle = scheduling_request.actor_def.options(
  517. scheduling_strategy=scheduling_strategy,
  518. **actor_options,
  519. ).remote(*scheduling_request.actor_init_args)
  520. except Exception:
  521. # We add a defensive exception here, so the controller can
  522. # make progress even if the actor options are misconfigured.
  523. logger.exception(f"Failed to create an actor for {replica_id}.")
  524. scheduling_request.status = (
  525. ReplicaSchedulingRequestStatus.ACTOR_CREATION_FAILED
  526. )
  527. return
  528. del self._pending_replicas[deployment_id][replica_id]
  529. self._on_replica_launching(
  530. replica_id, target_node_id=target_node_id, target_labels=target_labels
  531. )
  532. if isinstance(scheduling_strategy, PlacementGroupSchedulingStrategy):
  533. placement_group = scheduling_strategy.placement_group
  534. scheduling_request.status = ReplicaSchedulingRequestStatus.SUCCEEDED
  535. scheduling_request.on_scheduled(actor_handle, placement_group=placement_group)
  536. @abstractmethod
  537. def get_node_to_compact(
  538. self, allow_new_compaction: bool
  539. ) -> Optional[Tuple[str, float]]:
  540. """Returns a node ID to be compacted and a compaction deadlne."""
  541. raise NotImplementedError
  542. class DefaultDeploymentScheduler(DeploymentScheduler):
  543. def schedule(
  544. self,
  545. upscales: Dict[DeploymentID, List[ReplicaSchedulingRequest]],
  546. downscales: Dict[DeploymentID, DeploymentDownscaleRequest],
  547. ) -> Dict[DeploymentID, Set[ReplicaID]]:
  548. """Called for each update cycle to do batch scheduling.
  549. Args:
  550. upscales: a dict of deployment name to a list of replicas to schedule.
  551. downscales: a dict of deployment name to a downscale request.
  552. Returns:
  553. The IDs of replicas to stop for each deployment.
  554. """
  555. # Update pending replicas from upscales.
  556. for upscale in upscales.values():
  557. for scheduling_request in upscale:
  558. replica_id = scheduling_request.replica_id
  559. deployment_id = replica_id.deployment_id
  560. self._pending_replicas[deployment_id][replica_id] = scheduling_request
  561. # Check for deprecated environment variable usage
  562. if RAY_SERVE_USE_COMPACT_SCHEDULING_STRATEGY:
  563. warnings.warn(
  564. "The environment variable 'RAY_SERVE_USE_COMPACT_SCHEDULING_STRATEGY' "
  565. "is deprecated and will be removed in a v2.55.0 release. "
  566. "Please use 'RAY_SERVE_USE_PACK_SCHEDULING_STRATEGY' instead.",
  567. DeprecationWarning,
  568. stacklevel=2,
  569. )
  570. # Determine scheduling strategy
  571. non_strict_pack_pgs_exist = any(
  572. d.is_non_strict_pack_pg() for d in self._deployments.values()
  573. )
  574. use_pack_strategy = (
  575. RAY_SERVE_USE_PACK_SCHEDULING_STRATEGY and not non_strict_pack_pgs_exist
  576. )
  577. if use_pack_strategy:
  578. # This branch is only reached if each deployment either:
  579. # 1. Use STRICT_PACK placement group strategy, or
  580. # 2. Do not use placement groups at all.
  581. # This ensures Serve's best-fit node selection is respected by Ray Core
  582. # (since _soft_target_node_id only works with STRICT_PACK).
  583. self._schedule_with_pack_strategy()
  584. else:
  585. self._schedule_with_spread_strategy()
  586. # Handle downscales
  587. deployment_to_replicas_to_stop = {}
  588. for downscale in downscales.values():
  589. deployment_to_replicas_to_stop[
  590. downscale.deployment_id
  591. ] = self._get_replicas_to_stop(
  592. downscale.deployment_id, downscale.num_to_stop
  593. )
  594. return deployment_to_replicas_to_stop
  595. def _schedule_with_pack_strategy(self):
  596. """Tries to schedule pending replicas using PACK strategy."""
  597. # Flatten dict of deployment replicas into all replicas,
  598. # then sort by decreasing resource size
  599. all_scheduling_requests = sorted(
  600. _flatten(self._pending_replicas).values(),
  601. key=lambda r: r.required_resources,
  602. reverse=True,
  603. )
  604. # Fetch node labels for active nodes.
  605. active_nodes = self._cluster_node_info_cache.get_active_node_ids()
  606. all_node_labels = {
  607. node_id: self._cluster_node_info_cache.get_node_labels(node_id)
  608. for node_id in active_nodes
  609. }
  610. for scheduling_request in all_scheduling_requests:
  611. self._pack_schedule_replica(scheduling_request, all_node_labels)
  612. def _schedule_with_spread_strategy(self):
  613. """Tries to schedule pending replicas using the SPREAD strategy."""
  614. for pending_replicas in self._pending_replicas.values():
  615. if not pending_replicas:
  616. continue
  617. for scheduling_request in list(pending_replicas.values()):
  618. self._schedule_replica(
  619. scheduling_request=scheduling_request,
  620. default_scheduling_strategy="SPREAD",
  621. )
  622. def _pack_schedule_replica(
  623. self,
  624. scheduling_request: ReplicaSchedulingRequest,
  625. all_node_labels: Dict[str, Dict[str, str]],
  626. ):
  627. """Attempts to schedule a single request on the best available node."""
  628. placement_candidates = self._build_pack_placement_candidates(scheduling_request)
  629. target_node = None
  630. for required_resources, required_labels in placement_candidates:
  631. target_node = self._find_best_fit_node_for_pack(
  632. required_resources,
  633. self._get_available_resources_per_node(),
  634. required_labels_list=required_labels,
  635. node_labels=all_node_labels,
  636. )
  637. if target_node:
  638. break
  639. self._schedule_replica(
  640. scheduling_request,
  641. default_scheduling_strategy="DEFAULT",
  642. target_node_id=target_node,
  643. )
  644. def _build_pack_placement_candidates(
  645. self, scheduling_request: ReplicaSchedulingRequest
  646. ) -> List[Tuple[Resources, List[Dict[str, str]]]]:
  647. """Returns a list of (resources, labels) tuples to attempt for scheduling."""
  648. # Collect a list of required resources and labels to try to schedule to
  649. # support replica compaction when fallback strategies are provided.
  650. placement_candidates = []
  651. primary_labels = []
  652. primary_bundles = scheduling_request.placement_group_bundles
  653. if primary_bundles:
  654. # PG: Use PG bundle_label_selector
  655. if scheduling_request.placement_group_bundle_label_selector:
  656. pg_strategy = scheduling_request.placement_group_strategy or None
  657. if pg_strategy == "STRICT_PACK":
  658. # All bundle_label_selectors must be satisfied on same node.
  659. primary_labels = (
  660. scheduling_request.placement_group_bundle_label_selector
  661. )
  662. else:
  663. # TODO(ryanaoleary@): Support PACK strategy with bundle label selectors.
  664. raise NotImplementedError(
  665. "Placement Group strategy 'PACK' with bundle_label_selector "
  666. "is not yet supported in the Serve scheduler."
  667. )
  668. else:
  669. # Actor: Use Actor label selector
  670. if "label_selector" in scheduling_request.actor_options:
  671. primary_labels = [
  672. scheduling_request.actor_options["label_selector"] or {}
  673. ]
  674. # If PG is defined on scheduling request, then `required_resources` represents the sum across all bundles.
  675. placement_candidates.append(
  676. (scheduling_request.required_resources, primary_labels)
  677. )
  678. if scheduling_request.placement_group_fallback_strategy:
  679. # TODO(ryanaoleary@): Add support for placement group fallback_strategy when it's added to options.
  680. raise NotImplementedError(
  681. "Placement Group fallback strategies are not yet supported in the Serve scheduler."
  682. )
  683. elif scheduling_request.actor_options.get("fallback_strategy"):
  684. # Fallback strategy provided for Ray Actor.
  685. for fallback in scheduling_request.actor_options["fallback_strategy"]:
  686. fallback_labels = [fallback.get("label_selector", {}) or {}]
  687. placement_candidates.append(
  688. (scheduling_request.required_resources, fallback_labels)
  689. )
  690. return placement_candidates
  691. def _get_replicas_to_stop(
  692. self, deployment_id: DeploymentID, max_num_to_stop: int
  693. ) -> Set[ReplicaID]:
  694. """Prioritize replicas running on a node with fewest replicas of
  695. all deployments.
  696. This algorithm helps to scale down more intelligently because it can
  697. relinquish nodes faster. Note that this algorithm doesn't consider
  698. other non-serve actors on the same node. See more at
  699. https://github.com/ray-project/ray/issues/20599.
  700. """
  701. replicas_to_stop = set()
  702. # Replicas not in running state don't have node id.
  703. # We will prioritize those first.
  704. pending_launching_recovering_replicas = set().union(
  705. self._pending_replicas[deployment_id].keys(),
  706. self._launching_replicas[deployment_id].keys(),
  707. self._recovering_replicas[deployment_id],
  708. )
  709. for (
  710. pending_launching_recovering_replica
  711. ) in pending_launching_recovering_replicas:
  712. replicas_to_stop.add(pending_launching_recovering_replica)
  713. if len(replicas_to_stop) == max_num_to_stop:
  714. return replicas_to_stop
  715. node_to_running_replicas_of_all_deployments = (
  716. self._get_node_to_running_replicas()
  717. )
  718. # _running_replicas preserves insertion order (oldest → newest).
  719. # Reverse once so we have newest → oldest, then bucket by node.
  720. ordered_running_replicas = list(self._running_replicas[deployment_id].items())
  721. ordered_running_replicas.reverse()
  722. ordered_running_replicas_of_target_deployment: Dict[
  723. str, List[ReplicaID]
  724. ] = defaultdict(list)
  725. for replica_id, replica_node_id in ordered_running_replicas:
  726. ordered_running_replicas_of_target_deployment[replica_node_id].append(
  727. replica_id
  728. )
  729. # Replicas on the head node has the lowest priority for downscaling
  730. # since we cannot relinquish the head node.
  731. def key(node_and_num_running_replicas_of_all_deployments):
  732. return (
  733. len(node_and_num_running_replicas_of_all_deployments[1])
  734. if node_and_num_running_replicas_of_all_deployments[0]
  735. != self._head_node_id
  736. else sys.maxsize
  737. )
  738. for node_id, _ in sorted(
  739. node_to_running_replicas_of_all_deployments.items(), key=key
  740. ):
  741. if node_id not in ordered_running_replicas_of_target_deployment:
  742. continue
  743. # Newest-first list for this node.
  744. for replica_id in ordered_running_replicas_of_target_deployment[node_id]:
  745. replicas_to_stop.add(replica_id)
  746. if len(replicas_to_stop) == max_num_to_stop:
  747. return replicas_to_stop
  748. return replicas_to_stop
  def _filter_nodes_by_label_selector(
      self,
      available_nodes: Dict[str, Resources],
      required_labels: Dict[str, str],
      node_labels: Dict[str, Dict[str, str]],
  ) -> Dict[str, Resources]:
      """Keeps only the nodes whose labels satisfy ``required_labels``.

      Args:
          available_nodes: Candidate nodes, keyed by node ID.
          required_labels: Label selector that each kept node must match.
          node_labels: Labels per node ID. A node absent from this mapping
              is evaluated as if it had no labels.

      Returns:
          A new dict holding the matching subset of ``available_nodes``
          (same values, same iteration order).
      """
      matching_nodes: Dict[str, Resources] = {}
      for candidate_id, candidate_resources in available_nodes.items():
          labels_of_candidate = node_labels.get(candidate_id, {})
          if node_labels_match_selector(labels_of_candidate, required_labels):
              matching_nodes[candidate_id] = candidate_resources
      return matching_nodes
  761. def _find_best_fit_node_for_pack(
  762. self,
  763. required_resources: Resources,
  764. available_resources_per_node: Dict[str, Resources],
  765. required_labels_list: Optional[List[Dict[str, str]]] = None,
  766. node_labels: Optional[Dict[str, Dict[str, str]]] = None,
  767. ) -> Optional[str]:
  768. """Chooses best available node to schedule the required resources.
  769. If there are available nodes, returns the node ID of the best
  770. available node, minimizing fragmentation. Prefers non-idle nodes
  771. over idle nodes.
  772. """
  773. # Filter feasible nodes by provided label selectors if provided.
  774. if required_labels_list and node_labels:
  775. for required_labels in required_labels_list:
  776. available_resources_per_node = self._filter_nodes_by_label_selector(
  777. available_resources_per_node, required_labels, node_labels
  778. )
  779. if not available_resources_per_node:
  780. return None
  781. node_to_running_replicas = self._get_node_to_running_replicas()
  782. non_idle_nodes = {
  783. node_id: res
  784. for node_id, res in available_resources_per_node.items()
  785. if len(node_to_running_replicas.get(node_id, set())) > 0
  786. }
  787. idle_nodes = {
  788. node_id: res
  789. for node_id, res in available_resources_per_node.items()
  790. if len(node_to_running_replicas.get(node_id, set())) == 0
  791. }
  792. # 1. Prefer non-idle nodes
  793. chosen_node = self._best_fit_node(required_resources, non_idle_nodes)
  794. if chosen_node:
  795. return chosen_node
  796. # 2. Consider idle nodes last
  797. chosen_node = self._best_fit_node(required_resources, idle_nodes)
  798. if chosen_node:
  799. return chosen_node
  def get_node_to_compact(
      self, allow_new_compaction: bool
  ) -> Optional[Tuple[str, float]]:
      """Returns an optional ``(str, float)`` compaction candidate.

      This implementation never provides one: ``allow_new_compaction`` is
      not consulted and the result is always ``None``.
      """
      no_candidate: Optional[Tuple[str, float]] = None
      return no_candidate