autoscaling_state.py 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264
  1. import logging
  2. import math
  3. import time
  4. from collections import defaultdict
  5. from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
  6. from ray.serve._private.common import (
  7. RUNNING_REQUESTS_KEY,
  8. ApplicationName,
  9. AutoscalingSnapshotError,
  10. AutoscalingStatus,
  11. DeploymentID,
  12. DeploymentSnapshot,
  13. HandleMetricReport,
  14. ReplicaID,
  15. ReplicaMetricReport,
  16. TargetCapacityDirection,
  17. TimeSeries,
  18. )
  19. from ray.serve._private.constants import (
  20. RAY_SERVE_AGGREGATE_METRICS_AT_CONTROLLER,
  21. RAY_SERVE_MIN_HANDLE_METRICS_TIMEOUT_S,
  22. SERVE_LOGGER_NAME,
  23. )
  24. from ray.serve._private.deployment_info import DeploymentInfo
  25. from ray.serve._private.metrics_utils import (
  26. aggregate_timeseries,
  27. merge_instantaneous_total,
  28. )
  29. from ray.serve._private.usage import ServeUsageTag
  30. from ray.serve._private.utils import get_capacity_adjusted_num_replicas
  31. from ray.serve.autoscaling_policy import (
  32. _apply_app_level_autoscaling_config,
  33. _apply_autoscaling_config,
  34. )
  35. from ray.serve.config import AutoscalingContext, AutoscalingPolicy
  36. from ray.util import metrics
  37. logger = logging.getLogger(SERVE_LOGGER_NAME)
  38. class DeploymentAutoscalingState:
  39. """Manages autoscaling for a single deployment."""
  def __init__(self, deployment_id: DeploymentID):
      """Create empty autoscaling state for a single deployment.

      No autoscaling config or policy is attached here; `register()` fills
      those in later.

      Args:
          deployment_id: ID of the deployment whose autoscaling is tracked.
      """
      self._deployment_id = deployment_id

      # Handle ID -> most recent metric report from that handle. Entries are
      # dropped when the actor hosting the handle dies or when the handle
      # stops sending updates (see `drop_stale_handle_metrics`).
      self._handle_requests: Dict[str, HandleMetricReport] = {}
      # Replica ID -> most recent metric report from that replica (this also
      # carries Prometheus + custom metrics). Entries are dropped when the
      # replica is stopped (see `on_replica_stopped`).
      self._replica_metrics: Dict[ReplicaID, ReplicaMetricReport] = {}

      # Deployment info / autoscaling config / policy callable, all unset
      # until `register()` is called.
      self._deployment_info = None
      self._config = None
      self._policy: Optional[
          Callable[
              [AutoscalingContext], Tuple[Union[int, float], Optional[Dict[str, Any]]]
          ]
      ] = None
      # State dictionary returned by a user-defined policy; it is handed back
      # to the policy on the next decision. Its content is policy-defined.
      self._policy_state: Optional[Dict[str, Any]] = None

      self._running_replicas: List[ReplicaID] = []
      self._target_capacity: Optional[float] = None
      self._target_capacity_direction: Optional[TargetCapacityDirection] = None
      self._cached_deployment_snapshot: Optional[DeploymentSnapshot] = None
      self._latest_metrics_timestamp: Optional[float] = None

      # Wall-clock timestamps of the most recent scale up / scale down events.
      self._last_scale_up_time: Optional[float] = None
      self._last_scale_down_time: Optional[float] = None

      common_tag_keys = ("deployment", "application")
      self.autoscaling_decision_gauge = metrics.Gauge(
          "serve_autoscaling_desired_replicas",
          description=(
              "The raw autoscaling decision (number of replicas) from the autoscaling "
              "policy before applying min/max bounds."
          ),
          tag_keys=common_tag_keys,
      )
      self.autoscaling_total_requests_gauge = metrics.Gauge(
          "serve_autoscaling_total_requests",
          description=(
              "Total number of requests as seen by the autoscaler. This is the input "
              "to the autoscaling decision."
          ),
          tag_keys=common_tag_keys,
      )
      self.autoscaling_policy_execution_time_gauge = metrics.Gauge(
          "serve_autoscaling_policy_execution_time_ms",
          description=(
              "Time taken to execute the autoscaling policy in milliseconds. "
              "High values may indicate a slow or complex policy."
          ),
          tag_keys=common_tag_keys + ("policy_scope",),
      )
  def register(self, info: DeploymentInfo, curr_target_num_replicas: int) -> int:
      """Attach a deployment's info, autoscaling config and policy to this state.

      Args:
          info: Deployment info carrying the autoscaling config, target
              capacity and target capacity direction.
          curr_target_num_replicas: The deployment's current target replica
              count; kept unless `initial_replicas` applies.

      Returns:
          The replica count the target should be set to, clipped to the
          current autoscaling bounds.

      Raises:
          ValueError: If `info` carries no autoscaling config.
      """
      autoscaling_config = info.deployment_config.autoscaling_config
      if autoscaling_config is None:
          raise ValueError(
              f"Autoscaling config is not set for deployment {self._deployment_id}"
          )

      # Must be evaluated before `self._deployment_info` is overwritten below.
      first_time_or_changed = (
          self._deployment_info is None or self._deployment_info.config_changed(info)
      )
      # `initial_replicas` only takes effect on first registration or when
      # the config changed; otherwise the current target is kept.
      if first_time_or_changed and autoscaling_config.initial_replicas is not None:
          target_num_replicas = autoscaling_config.initial_replicas
      else:
          target_num_replicas = curr_target_num_replicas

      self._deployment_info = info
      self._config = autoscaling_config
      # Wrap the configured policy with the default autoscaling config handling.
      self._policy = _apply_autoscaling_config(self._config.policy.get_policy())
      self._target_capacity = info.target_capacity
      self._target_capacity_direction = info.target_capacity_direction
      # Every (re-)registration starts from an empty policy state.
      self._policy_state = {}

      uses_custom_policy = not self._config.policy.is_default_policy_function()
      if uses_custom_policy:
          logger.info(
              f"Using custom autoscaling policy '{self._config.policy.policy_function}' "
              f"for deployment '{self._deployment_id}'."
          )
          # Telemetry: note that a custom autoscaling policy is in use.
          ServeUsageTag.CUSTOM_AUTOSCALING_POLICY_USED.record("1")

      return self.apply_bounds(target_num_replicas)
  def on_replica_stopped(self, replica_id: ReplicaID):
      """Forget the metric report of a replica that has been stopped.

      Unknown replica IDs are ignored.
      """
      self._replica_metrics.pop(replica_id, None)
  def get_num_replicas_lower_bound(self) -> int:
      """Return the lower replica bound, adjusted by the target capacity.

      The bound is derived from `initial_replicas` when that is configured
      and the target capacity direction is UP; otherwise from `min_replicas`.
      """
      scaling_up_from_initial = (
          self._config.initial_replicas is not None
          and self._target_capacity_direction == TargetCapacityDirection.UP
      )
      if scaling_up_from_initial:
          base_replicas = self._config.initial_replicas
      else:
          base_replicas = self._config.min_replicas
      return get_capacity_adjusted_num_replicas(base_replicas, self._target_capacity)
  def get_num_replicas_upper_bound(self) -> int:
      """Return the upper replica bound: `max_replicas` adjusted by the target capacity."""
      configured_max = self._config.max_replicas
      return get_capacity_adjusted_num_replicas(configured_max, self._target_capacity)
  def update_running_replica_ids(self, running_replicas: List[ReplicaID]):
      """Replace the cached list of running replica IDs for this deployment.

      The given list object is stored as-is (no copy is made).
      """
      self._running_replicas = running_replicas
  def record_scale_up(self):
      """Stamp the current wall-clock time as the latest scale-up event."""
      now = time.time()
      self._last_scale_up_time = now
  def record_scale_down(self):
      """Stamp the current wall-clock time as the latest scale-down event."""
      now = time.time()
      self._last_scale_down_time = now
  def is_within_bounds(self, num_replicas_running_at_target_version: int):
      """Tell whether a running-replica count already satisfies the autoscaling bounds.

      Args:
          num_replicas_running_at_target_version: Number of running replicas
              for the current deployment version.

      Returns:
          True if clipping the count with `apply_bounds` leaves it unchanged,
          False otherwise.
      """
      clipped = self.apply_bounds(num_replicas_running_at_target_version)
      return clipped == num_replicas_running_at_target_version
  def apply_bounds(self, num_replicas: int) -> int:
      """Clip a replica count to the current (target-capacity-aware) bounds.

      The upper bound is applied first and the lower bound last, so the lower
      bound wins if the two ever conflict.
      """
      lower_bound = self.get_num_replicas_lower_bound()
      upper_bound = self.get_num_replicas_upper_bound()
      capped = min(upper_bound, num_replicas)
      return max(lower_bound, capped)
  def record_request_metrics_for_replica(
      self, replica_metric_report: ReplicaMetricReport
  ) -> None:
      """Store a replica's metric report unless a newer one is already held.

      A report is accepted when the replica is unknown or when its timestamp
      is strictly greater than the stored report's. Accepted reports also
      advance `_latest_metrics_timestamp`.
      """
      replica_id = replica_metric_report.replica_id
      sent_at = replica_metric_report.timestamp

      if replica_id in self._replica_metrics:
          is_newer = sent_at > self._replica_metrics[replica_id].timestamp
          if not is_newer:
              # Out-of-order or duplicate report: keep what we have.
              return

      latest_seen = self._latest_metrics_timestamp
      self._latest_metrics_timestamp = (
          sent_at if latest_seen is None else max(latest_seen, sent_at)
      )
      self._replica_metrics[replica_id] = replica_metric_report
  def record_request_metrics_for_handle(
      self,
      handle_metric_report: HandleMetricReport,
  ) -> None:
      """Store a handle's metric report unless a newer one is already held.

      A report is accepted when the handle is unknown or when its timestamp
      is strictly greater than the stored report's. Accepted reports also
      advance `_latest_metrics_timestamp`.
      """
      handle_id = handle_metric_report.handle_id
      sent_at = handle_metric_report.timestamp

      if handle_id in self._handle_requests:
          is_newer = sent_at > self._handle_requests[handle_id].timestamp
          if not is_newer:
              # Out-of-order or duplicate report: keep what we have.
              return

      self._handle_requests[handle_id] = handle_metric_report
      latest_seen = self._latest_metrics_timestamp
      self._latest_metrics_timestamp = (
          sent_at if latest_seen is None else max(latest_seen, sent_at)
      )
  def drop_stale_handle_metrics(self, alive_serve_actor_ids: Set[str]) -> None:
      """Remove handle metric reports that should no longer be counted.

      A report is removed when either:
        * it came from a Serve component (proxy/replica) actor whose actor ID
          is not in `alive_serve_actor_ids`, or
        * no update has been received from the handle for at least
          `max(2 * metrics_interval_s, RAY_SERVE_MIN_HANDLE_METRICS_TIMEOUT_S)`
          seconds.

      Args:
          alive_serve_actor_ids: Actor IDs of the Serve actors currently alive.
      """
      timeout_s = max(
          2 * self._config.metrics_interval_s,
          RAY_SERVE_MIN_HANDLE_METRICS_TIMEOUT_S,
      )
      # Iterate over a snapshot of the keys because entries are deleted below.
      for handle_id in list(self._handle_requests):
          handle_metric = self._handle_requests[handle_id]

          hosted_on_dead_actor = (
              handle_metric.is_serve_component_source
              and handle_metric.actor_id is not None
              and handle_metric.actor_id not in alive_serve_actor_ids
          )
          if hosted_on_dead_actor:
              del self._handle_requests[handle_id]
              if handle_metric.total_requests > 0:
                  logger.debug(
                      f"Dropping metrics for handle '{handle_id}' because the Serve "
                      f"actor it was on ({handle_metric.actor_id}) is no longer "
                      f"alive. It had {handle_metric.total_requests} ongoing requests"
                  )
              continue

          # A handle that has gone quiet is expected when it lived on a replica
          # or proxy that has since been shut down.
          if time.time() - handle_metric.timestamp >= timeout_s:
              del self._handle_requests[handle_id]
              if handle_metric.total_requests > 0:
                  actor_id = handle_metric.actor_id
                  actor_info = f"on actor '{actor_id}' " if actor_id else ""
                  logger.info(
                      f"Dropping stale metrics for handle '{handle_id}' {actor_info}"
                      f"because no update was received for {timeout_s:.1f}s. "
                      f"Ongoing requests was: {handle_metric.total_requests}."
                  )
  def record_autoscaling_metrics(
      self,
      decision_num_replicas: int,
      total_num_requests: float,
      policy_execution_time_ms: float,
      policy_scope: str,
  ):
      """Publish the latest autoscaling decision to the three gauges.

      Args:
          decision_num_replicas: Replica count decided by the policy.
          total_num_requests: Request total that was fed to the policy.
          policy_execution_time_ms: How long the policy took, in milliseconds.
          policy_scope: Value for the `policy_scope` tag on the timing gauge.
      """
      deployment_tags = {
          "deployment": self._deployment_id.name,
          "application": self._deployment_id.app_name,
      }
      # Only the timing gauge carries the extra `policy_scope` tag.
      timing_tags = dict(deployment_tags, policy_scope=policy_scope)

      self.autoscaling_decision_gauge.set(decision_num_replicas, tags=deployment_tags)
      self.autoscaling_total_requests_gauge.set(
          total_num_requests, tags=deployment_tags
      )
      self.autoscaling_policy_execution_time_gauge.set(
          policy_execution_time_ms, tags=timing_tags
      )
  def get_decision_num_replicas(
      self, curr_target_num_replicas: int, _skip_bound_check: bool = False
  ) -> int:
      """Run the autoscaling policy and return the target number of replicas.

      The policy is called with a freshly built autoscaling context. Its
      decision is rounded up when it is a float and published via
      `record_autoscaling_metrics` with the policy execution time. Unless
      `_skip_bound_check` is True, the decision is then clipped with
      `apply_bounds` and a deployment snapshot of it is cached.

      Args:
          curr_target_num_replicas: The deployment's current target replica count.
          _skip_bound_check: If True, return the raw (rounded) policy decision
              without applying bounds and without refreshing the cached snapshot.

      Returns:
          The decided number of replicas.

      Raises:
          ValueError: If no policy has been set (i.e. `register()` was not called).
      """
      if self._policy is None:
          raise ValueError(f"Policy is not set for deployment {self._deployment_id}.")

      autoscaling_context = self.get_autoscaling_context(curr_target_num_replicas)

      # Measure the policy with a monotonic clock: wall-clock time can jump
      # (e.g. NTP adjustments) and would then report a negative or inflated
      # execution time.
      start_time = time.perf_counter()
      decision_num_replicas, self._policy_state = self._policy(autoscaling_context)
      # The policy can return a float value; always round up to a whole replica.
      if isinstance(decision_num_replicas, float):
          decision_num_replicas = math.ceil(decision_num_replicas)
      policy_execution_time_ms = (time.perf_counter() - start_time) * 1000

      # The gauge reports the raw decision, before min/max bounds are applied.
      self.record_autoscaling_metrics(
          decision_num_replicas,
          autoscaling_context.total_num_requests,
          policy_execution_time_ms,
          "deployment",
      )

      if _skip_bound_check:
          return decision_num_replicas

      decision_num_replicas = self.apply_bounds(decision_num_replicas)
      self._cached_deployment_snapshot = self._create_deployment_snapshot(
          ctx=autoscaling_context,
          target_replicas=decision_num_replicas,
      )
      return decision_num_replicas
  def get_autoscaling_context(
      self,
      curr_target_num_replicas,
      override_policy_state: Optional[Dict[str, Any]] = None,
  ) -> AutoscalingContext:
      """Build the `AutoscalingContext` handed to an autoscaling policy.

      Args:
          curr_target_num_replicas: The deployment's current target replica count.
          override_policy_state: If given, used in place of the stored policy
              state (needed for application-level autoscaling).

      Returns:
          A context populated from this object's current state. The policy
          state placed in it is a shallow copy. The request totals and custom
          metrics are passed as bound methods, not as computed values.
      """
      # Precedence: explicit override, then stored policy state, then empty.
      if override_policy_state is not None:
          state_source = override_policy_state
      else:
          state_source = self._policy_state
      policy_state_copy = {} if state_source is None else state_source.copy()

      min_replicas = self.get_num_replicas_lower_bound()
      max_replicas = self.get_num_replicas_upper_bound()

      return AutoscalingContext(
          deployment_id=self._deployment_id,
          deployment_name=self._deployment_id.name,
          app_name=self._deployment_id.app_name,
          current_num_replicas=len(self._running_replicas),
          target_num_replicas=curr_target_num_replicas,
          running_replicas=self._running_replicas,
          total_num_requests=self.get_total_num_requests,
          capacity_adjusted_min_replicas=min_replicas,
          capacity_adjusted_max_replicas=max_replicas,
          policy_state=policy_state_copy,
          current_time=time.time(),
          config=self._config,
          total_queued_requests=self._get_queued_requests,
          aggregated_metrics=self._get_aggregated_custom_metrics,
          raw_metrics=self._get_raw_custom_metrics,
          last_scale_up_time=self._last_scale_up_time,
          last_scale_down_time=self._last_scale_down_time,
      )
  def _collect_replica_running_requests(self) -> List[TimeSeries]:
      """Gather the running-requests timeseries reported by running replicas.

      Running replicas without a stored report, or whose report lacks the
      running-requests metric, are skipped.

      Returns:
          One timeseries per contributing replica, in running-replica order.
      """
      candidate_reports = (
          self._replica_metrics.get(replica_id, None)
          for replica_id in self._running_replicas
      )
      return [
          report.metrics[RUNNING_REQUESTS_KEY]
          for report in candidate_reports
          if report is not None and RUNNING_REQUESTS_KEY in report.metrics
      ]
  343. def _collect_handle_queued_requests(self) -> List[TimeSeries]:
  344. """Collect queued requests timeseries from all handles.
  345. Returns:
  346. List of timeseries data.
  347. """
  348. timeseries_list = []
  349. for handle_metric_report in self._handle_requests.values():
  350. timeseries_list.append(handle_metric_report.queued_requests)
  351. return timeseries_list
  def _collect_handle_running_requests(self) -> List[TimeSeries]:
      """Gather handle-side running-requests timeseries for the running replicas.

      Intended for the case where running requests are not reported by the
      replicas themselves. Each handle report may hold one running-requests
      timeseries per replica; only those for currently running replicas are
      collected.

      Returns:
          A flat list with one timeseries per (handle, running replica) pair
          that has data, grouped by handle and then in running-replica order.

      Example:
          With running replicas r1 and r2, and two handles that each hold a
          running-requests timeseries for both replicas, the result has four
          entries: [h1/r1, h1/r2, h2/r1, h2/r2].
      """
      collected = []
      for handle_report in self._handle_requests.values():
          if RUNNING_REQUESTS_KEY not in handle_report.metrics:
              continue
          per_replica_series = handle_report.metrics[RUNNING_REQUESTS_KEY]
          collected.extend(
              per_replica_series[replica_id]
              for replica_id in self._running_replicas
              if replica_id in per_replica_series
          )
      return collected
  381. def _merge_and_aggregate_timeseries(
  382. self,
  383. timeseries_list: List[TimeSeries],
  384. ) -> float:
  385. """Aggregate and average a metric from timeseries data using instantaneous merge.
  386. Args:
  387. timeseries_list: A list of TimeSeries (TimeSeries), where each
  388. TimeSeries represents measurements from a single source (replica, handle, etc.).
  389. Each list is sorted by timestamp ascending.
  390. Returns:
  391. The time-weighted average of the metric
  392. Example:
  393. If the timeseries_list is:
  394. [
  395. [
  396. TimeStampedValue(timestamp=0.1, value=5.0),
  397. TimeStampedValue(timestamp=0.2, value=7.0),
  398. ],
  399. [
  400. TimeStampedValue(timestamp=0.2, value=3.0),
  401. TimeStampedValue(timestamp=0.3, value=1.0),
  402. ]
  403. ]
  404. Then the returned value will be:
  405. (5.0*0.1 + 7.0*0.2 + 3.0*0.2 + 1.0*0.3) / (0.1 + 0.2 + 0.2 + 0.3) = 4.5 / 0.8 = 5.625
  406. """
  407. if not timeseries_list:
  408. return 0.0
  409. # Use instantaneous merge approach - no arbitrary windowing needed
  410. merged_timeseries = merge_instantaneous_total(timeseries_list)
  411. if merged_timeseries:
  412. # assume that the last recorded metric is valid for last_window_s seconds
  413. last_metric_time = merged_timeseries[-1].timestamp
  414. # we dont want to make any assumption about how long the last metric will be valid
  415. # only conclude that the last metric is valid for last_window_s seconds that is the
  416. # difference between the current time and the last metric recorded time
  417. last_window_s = time.time() - last_metric_time
  418. # adding a check to negative values caused by clock skew
  419. # between replicas and controller. Also add a small epsilon to avoid division by zero
  420. if last_window_s <= 0:
  421. last_window_s = 1e-3
  422. # Calculate the aggregated metric value
  423. value = aggregate_timeseries(
  424. merged_timeseries,
  425. aggregation_function=self._config.aggregation_function,
  426. last_window_s=last_window_s,
  427. )
  428. return value if value is not None else 0.0
  429. return 0.0
  430. def _calculate_total_requests_aggregate_mode(self) -> float:
  431. """Calculate total requests using aggregate metrics mode with timeseries data.
  432. This method works with raw timeseries metrics data and performs aggregation
  433. at the controller level, providing more accurate and stable metrics compared
  434. to simple mode.
  435. Processing Steps:
  436. 1. Collect raw timeseries data (eg: running request) from replicas (if available)
  437. 2. Collect queued requests from handles (always tracked at handle level)
  438. 3. Collect raw timeseries data (eg: running request) from handles (if not available from replicas)
  439. 4. Merge timeseries using instantaneous approach for mathematically correct totals
  440. 5. Calculate time-weighted average running requests from the merged timeseries
  441. Key Differences from Simple Mode:
  442. - Uses raw timeseries data instead of pre-aggregated metrics
  443. - Performs instantaneous merging for exact gauge semantics
  444. - Aggregates at the controller level rather than using pre-computed averages
  445. - Uses time-weighted averaging over the look_back_period_s interval for accurate calculations
  446. Metrics Collection:
  447. Running requests are collected with either replica-level or handle-level metrics.
  448. Queued requests are always collected from handles regardless of where
  449. running requests are collected.
  450. Timeseries Aggregation:
  451. Raw timeseries data from multiple sources is merged using an instantaneous
  452. approach that treats gauges as right-continuous step functions. This provides
  453. mathematically correct totals without arbitrary windowing bias.
  454. Example with Numbers:
  455. Assume metrics_interval_s = 0.5s, current time = 2.0s
  456. Step 1: Collect raw timeseries from 2 replicas (r1, r2)
  457. replica_metrics = [
  458. {"running_requests": [(t=0.2, val=5), (t=0.8, val=7), (t=1.5, val=6)]}, # r1
  459. {"running_requests": [(t=0.1, val=3), (t=0.9, val=4), (t=1.4, val=8)]} # r2
  460. ]
  461. Step 2: Collect queued requests from handles
  462. handle_queued = 2 + 3 = 5 # total from all handles
  463. Step 3: No handle metrics needed (replica metrics available)
  464. handle_metrics = []
  465. Step 4: Merge timeseries using instantaneous approach
  466. # Create delta events: r1 starts at 5 (t=0.2), changes to 7 (t=0.8), then 6 (t=1.5)
  467. # r2 starts at 3 (t=0.1), changes to 4 (t=0.9), then 8 (t=1.4)
  468. # Merged instantaneous total: [(t=0.1, val=3), (t=0.2, val=8), (t=0.8, val=10), (t=0.9, val=11), (t=1.4, val=15), (t=1.5, val=14)]
  469. merged_timeseries = {"running_requests": [(0.1, 3), (0.2, 8), (0.8, 10), (0.9, 11), (1.4, 15), (1.5, 14)]}
  470. Step 5: Calculate time-weighted average over full timeseries (t=0.1 to t=1.5+0.5=2.0)
  471. # Time-weighted calculation: (3*0.1 + 8*0.6 + 10*0.1 + 11*0.5 + 15*0.1 + 14*0.5) / 2.0 = 10.05
  472. avg_running = 10.05
  473. Final result: total_requests = avg_running + queued = 10.05 + 5 = 15.05
  474. Returns:
  475. Total number of requests (average running + queued) calculated from
  476. timeseries data aggregation.
  477. """
  478. # Collect replica-based running requests (returns List[TimeSeries])
  479. replica_timeseries = self._collect_replica_running_requests()
  480. metrics_collected_on_replicas = len(replica_timeseries) > 0
  481. # Collect queued requests from handles (returns List[TimeSeries])
  482. queued_timeseries = self._collect_handle_queued_requests()
  483. if not metrics_collected_on_replicas:
  484. # Collect handle-based running requests if not collected on replicas
  485. handle_timeseries = self._collect_handle_running_requests()
  486. else:
  487. handle_timeseries = []
  488. # Collect all timeseries for ongoing requests
  489. ongoing_requests_timeseries = []
  490. # Add replica timeseries
  491. ongoing_requests_timeseries.extend(replica_timeseries)
  492. # Add handle timeseries if replica metrics weren't collected
  493. if not metrics_collected_on_replicas:
  494. ongoing_requests_timeseries.extend(handle_timeseries)
  495. # Add queued timeseries
  496. ongoing_requests_timeseries.extend(queued_timeseries)
  497. # Aggregate and add running requests to total
  498. ongoing_requests = self._merge_and_aggregate_timeseries(
  499. ongoing_requests_timeseries
  500. )
  501. return ongoing_requests
  502. def _calculate_total_requests_simple_mode(self) -> float:
  503. """Calculate total requests using simple aggregated metrics mode.
  504. This method works with pre-aggregated metrics that are computed by averaging
  505. (or other functions) over the past look_back_period_s seconds.
  506. Metrics Collection:
  507. Metrics can be collected at two levels:
  508. 1. Replica level: Each replica reports one aggregated metric value
  509. 2. Handle level: Each handle reports metrics for multiple replicas
  510. Replica-Level Metrics Example:
  511. For 3 replicas (r1, r2, r3), metrics might look like:
  512. {
  513. "r1": 10,
  514. "r2": 20,
  515. "r3": 30
  516. }
  517. Total requests = 10 + 20 + 30 = 60
  518. Handle-Level Metrics Example:
  519. For 3 handles (h1, h2, h3), each managing 2 replicas:
  520. - h1 manages r1, r2
  521. - h2 manages r2, r3
  522. - h3 manages r3, r1
  523. Metrics structure:
  524. {
  525. "h1": {"r1": 10, "r2": 20},
  526. "h2": {"r2": 20, "r3": 30},
  527. "h3": {"r3": 30, "r1": 10}
  528. }
  529. Total requests = 10 + 20 + 20 + 30 + 30 + 10 = 120
  530. Note: We can safely sum all handle metrics because each unique request
  531. is counted only once across all handles (no double-counting).
  532. Queued Requests:
  533. Queued request metrics are always tracked at the handle level, regardless
  534. of whether running request metrics are collected at replicas or handles.
  535. Returns:
  536. Total number of requests (running + queued) across all replicas/handles.
  537. """
  538. total_requests = 0
  539. for id in self._running_replicas:
  540. if id in self._replica_metrics:
  541. total_requests += self._replica_metrics[id].aggregated_metrics.get(
  542. RUNNING_REQUESTS_KEY, 0
  543. )
  544. metrics_collected_on_replicas = total_requests > 0
  545. # Add handle metrics
  546. for handle_metric in self._handle_requests.values():
  547. total_requests += handle_metric.aggregated_queued_requests
  548. # Add running requests from handles if not collected on replicas
  549. if not metrics_collected_on_replicas:
  550. for replica_id in self._running_replicas:
  551. if replica_id in handle_metric.aggregated_metrics.get(
  552. RUNNING_REQUESTS_KEY, {}
  553. ):
  554. total_requests += handle_metric.aggregated_metrics.get(
  555. RUNNING_REQUESTS_KEY
  556. ).get(replica_id)
  557. return total_requests
  def get_total_num_requests(self) -> float:
      """Return the total number of requests for this deployment.

      Dispatches on `RAY_SERVE_AGGREGATE_METRICS_AT_CONTROLLER`: aggregate
      mode works on raw timeseries, simple mode on pre-aggregated values.
      With no running replicas, only requests queued at handles contribute.

      This assumes running-request metrics are emitted either on handles or
      on replicas, but not both. Keeping the two exclusive is the
      responsibility of whoever emits the metrics.
      """
      calculate = (
          self._calculate_total_requests_aggregate_mode
          if RAY_SERVE_AGGREGATE_METRICS_AT_CONTROLLER
          else self._calculate_total_requests_simple_mode
      )
      return calculate()
  def _create_deployment_snapshot(
      self,
      *,
      ctx: AutoscalingContext,
      target_replicas: int,
  ) -> DeploymentSnapshot:
      """Assemble a `DeploymentSnapshot` from this state and a policy context.

      Args:
          ctx: The autoscaling context that was given to the policy.
          target_replicas: The replica count that was decided on.

      Returns:
          A snapshot with replica counts, bounds, scaling status, policy name,
          request totals and metrics health. `errors` contains
          `METRICS_UNAVAILABLE` when no metrics have been received yet.
      """
      current_replicas = ctx.current_num_replicas
      min_replicas = ctx.capacity_adjusted_min_replicas
      max_replicas = ctx.capacity_adjusted_max_replicas
      queued_requests = ctx.total_queued_requests

      # Age of the newest metrics report; None means nothing was ever received.
      latest_ts = self._latest_metrics_timestamp
      time_since_last_collected_metrics_s = (
          None if latest_ts is None else time.time() - latest_ts
      )

      # Scaling direction is judged against the currently running replica count.
      if target_replicas > current_replicas:
          raw_status = AutoscalingStatus.UPSCALE
      elif target_replicas < current_replicas:
          raw_status = AutoscalingStatus.DOWNSCALE
      else:
          raw_status = AutoscalingStatus.STABLE
      scaling_status = AutoscalingStatus.format_scaling_status(raw_status)

      look_back_period_s = self._config.look_back_period_s
      metrics_health = DeploymentSnapshot.format_metrics_health_text(
          time_since_last_collected_metrics_s=time_since_last_collected_metrics_s,
          look_back_period_s=look_back_period_s,
      )

      errors: List[str] = (
          [AutoscalingSnapshotError.METRICS_UNAVAILABLE]
          if time_since_last_collected_metrics_s is None
          else []
      )

      # Fully qualified name of the policy function, for display.
      policy_fn = ctx.config.policy.get_policy()
      policy_name_str = f"{policy_fn.__module__}.{policy_fn.__name__}"

      return DeploymentSnapshot(
          timestamp_str=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
          app=self._deployment_id.app_name,
          deployment=self._deployment_id.name,
          current_replicas=current_replicas,
          target_replicas=target_replicas,
          min_replicas=min_replicas,
          max_replicas=max_replicas,
          scaling_status=scaling_status,
          policy_name=policy_name_str,
          look_back_period_s=look_back_period_s,
          queued_requests=float(queued_requests),
          ongoing_requests=float(ctx.total_num_requests),
          metrics_health=metrics_health,
          errors=errors,
      )
  623. def get_deployment_snapshot(self) -> Optional[DeploymentSnapshot]:
  624. """
  625. Return the cached deployment snapshot if available.
  626. """
  627. return self._cached_deployment_snapshot
  628. def get_replica_metrics(self) -> Dict[ReplicaID, List[TimeSeries]]:
  629. """Get the raw replica metrics dict."""
  630. metric_values = defaultdict(list)
  631. for id in self._running_replicas:
  632. if id in self._replica_metrics and self._replica_metrics[id].metrics:
  633. for k, v in self._replica_metrics[id].metrics.items():
  634. metric_values[k].append(v)
  635. return metric_values
  636. def _get_queued_requests(self) -> float:
  637. """Calculate the total number of queued requests across all handles.
  638. Returns:
  639. Sum of queued requests at all handles. Uses aggregated values in simple mode,
  640. or aggregates timeseries data in aggregate mode.
  641. """
  642. if RAY_SERVE_AGGREGATE_METRICS_AT_CONTROLLER:
  643. # Aggregate mode: collect and aggregate timeseries
  644. queued_timeseries = self._collect_handle_queued_requests()
  645. if not queued_timeseries:
  646. return 0.0
  647. return self._merge_and_aggregate_timeseries(queued_timeseries)
  648. else:
  649. # Simple mode: sum pre-aggregated values
  650. return sum(
  651. handle_metric.aggregated_queued_requests
  652. for handle_metric in self._handle_requests.values()
  653. )
  654. def _get_aggregated_custom_metrics(self) -> Dict[str, Dict[ReplicaID, float]]:
  655. """Aggregate custom metrics from replica metric reports.
  656. This method aggregates raw timeseries data from replicas on the controller,
  657. similar to how ongoing requests are aggregated.
  658. Returns:
  659. Dict mapping metric name to dict of replica ID to aggregated metric value.
  660. """
  661. aggregated_metrics = defaultdict(dict)
  662. for replica_id in self._running_replicas:
  663. replica_metric_report = self._replica_metrics.get(replica_id)
  664. if replica_metric_report is None:
  665. continue
  666. for metric_name, timeseries in replica_metric_report.metrics.items():
  667. # Aggregate the timeseries for this custom metric
  668. aggregated_value = self._merge_and_aggregate_timeseries([timeseries])
  669. aggregated_metrics[metric_name][replica_id] = aggregated_value
  670. return dict(aggregated_metrics)
  671. def _get_raw_custom_metrics(
  672. self,
  673. ) -> Dict[str, Dict[ReplicaID, TimeSeries]]:
  674. """Extract raw custom metric values from replica metric reports.
  675. Returns:
  676. Dict mapping metric name to dict of replica ID to raw metric timeseries.
  677. """
  678. raw_metrics = defaultdict(dict)
  679. for replica_id in self._running_replicas:
  680. replica_metric_report = self._replica_metrics.get(replica_id)
  681. if replica_metric_report is None:
  682. continue
  683. for metric_name, timeseries in replica_metric_report.metrics.items():
  684. # Extract values from TimeStampedValue list
  685. raw_metrics[metric_name][replica_id] = timeseries
  686. return dict(raw_metrics)
  687. class ApplicationAutoscalingState:
  688. """Manages autoscaling for a single application."""
  689. def __init__(
  690. self,
  691. app_name: ApplicationName,
  692. ):
  693. self._app_name = app_name
  694. self._deployment_autoscaling_states: Dict[
  695. DeploymentID, DeploymentAutoscalingState
  696. ] = {}
  697. self._policy: Optional[
  698. Callable[
  699. [Dict[DeploymentID, AutoscalingContext]],
  700. Tuple[
  701. Dict[DeploymentID, Union[int, float]],
  702. Optional[Dict[DeploymentID, Dict]],
  703. ],
  704. ]
  705. ] = None
  706. # user defined policy returns a dictionary of state that is persisted between autoscaling decisions
  707. # content of the dictionary is determined by the user defined policy but is keyed by deployment id
  708. self._policy_state: Optional[Dict[DeploymentID, Dict]] = None
  709. @property
  710. def deployments(self):
  711. return self._deployment_autoscaling_states.keys()
  712. def register(
  713. self,
  714. autoscaling_policy: AutoscalingPolicy,
  715. ):
  716. """Register or update application-level autoscaling config and deployments.
  717. This will overwrite the deployment-level policies with the application-level policy.
  718. Args:
  719. autoscaling_policy: The autoscaling policy to register.
  720. """
  721. # Apply default autoscaling config to the policy
  722. self._policy = _apply_app_level_autoscaling_config(
  723. autoscaling_policy.get_policy()
  724. )
  725. self._policy_state = {}
  726. # Log when custom autoscaling policy is used for application
  727. if not autoscaling_policy.is_default_policy_function():
  728. logger.info(
  729. f"Using custom autoscaling policy '{autoscaling_policy.policy_function}' "
  730. f"for application '{self._app_name}'."
  731. )
  732. # Record telemetry for custom autoscaling policy usage
  733. ServeUsageTag.CUSTOM_AUTOSCALING_POLICY_USED.record("1")
  734. def has_policy(self) -> bool:
  735. return self._policy is not None
  736. def register_deployment(
  737. self,
  738. deployment_id: DeploymentID,
  739. info: DeploymentInfo,
  740. curr_target_num_replicas: int,
  741. ) -> int:
  742. """Register a single deployment under this application."""
  743. if deployment_id not in self._deployment_autoscaling_states:
  744. self._deployment_autoscaling_states[
  745. deployment_id
  746. ] = DeploymentAutoscalingState(deployment_id)
  747. if info.deployment_config.autoscaling_config is None:
  748. raise ValueError(
  749. f"Autoscaling config is not set for deployment {deployment_id}"
  750. )
  751. # if the deployment-level policy is not the default policy, and the application has a policy,
  752. # warn the user that the application-level policy will take precedence
  753. if (
  754. not info.deployment_config.autoscaling_config.policy.is_default_policy_function()
  755. and self.has_policy()
  756. ):
  757. logger.warning(
  758. f"User provided both a deployment-level and an application-level policy for deployment {deployment_id}. "
  759. "The application-level policy will take precedence."
  760. )
  761. return self._deployment_autoscaling_states[deployment_id].register(
  762. info,
  763. curr_target_num_replicas,
  764. )
  765. def deregister_deployment(self, deployment_id: DeploymentID):
  766. if deployment_id not in self._deployment_autoscaling_states:
  767. logger.warning(
  768. f"Cannot deregister autoscaling state for deployment {deployment_id} because it is not registered"
  769. )
  770. return
  771. self._deployment_autoscaling_states.pop(deployment_id)
  772. def should_autoscale_deployment(self, deployment_id: DeploymentID):
  773. return deployment_id in self._deployment_autoscaling_states
  774. def _validate_policy_state(
  775. self, policy_state: Optional[Dict[DeploymentID, Dict[str, Any]]]
  776. ):
  777. """Validate that the returned policy_state from an application-level policy is correctly formatted."""
  778. if policy_state is None:
  779. return
  780. assert isinstance(
  781. policy_state, dict
  782. ), "Application-level autoscaling policy must return policy_state as Dict[DeploymentID, Dict[str, Any]]"
  783. # Check that all keys are valid deployment IDs
  784. for deployment_id in policy_state.keys():
  785. assert (
  786. deployment_id in self._deployment_autoscaling_states
  787. ), f"Policy state contains invalid deployment ID: {deployment_id}"
  788. assert isinstance(
  789. policy_state[deployment_id], dict
  790. ), f"Policy state for deployment {deployment_id} must be a dictionary, got {type(policy_state[deployment_id])}"
  791. def get_decision_num_replicas(
  792. self,
  793. deployment_to_target_num_replicas: Dict[DeploymentID, int],
  794. _skip_bound_check: bool = False,
  795. ) -> Dict[DeploymentID, int]:
  796. """
  797. Decide scaling for all deployments in this application by calling
  798. each deployment's autoscaling policy.
  799. """
  800. if self.has_policy():
  801. # Using app-level policy
  802. # TODO(nadongjun): App-level autoscaling bypasses per-deployment snapshot creation; add snapshot support here.
  803. autoscaling_contexts = {
  804. deployment_id: state.get_autoscaling_context(
  805. deployment_to_target_num_replicas[deployment_id],
  806. self._policy_state.get(deployment_id, {})
  807. if self._policy_state
  808. else {},
  809. )
  810. for deployment_id, state in self._deployment_autoscaling_states.items()
  811. }
  812. # Time the policy execution
  813. start_time = time.time()
  814. # Policy returns decisions: {deployment_id -> decision} and
  815. # policy state: {deployment_id -> Dict}
  816. decisions, returned_policy_state = self._policy(autoscaling_contexts)
  817. policy_execution_time_ms = (time.time() - start_time) * 1000
  818. # Validate returned policy_state
  819. self._validate_policy_state(returned_policy_state)
  820. self._policy_state = returned_policy_state
  821. # Validate returned decisions
  822. assert (
  823. type(decisions) is dict
  824. ), "Autoscaling policy must return a dictionary of deployment_name -> decision_num_replicas"
  825. # assert that deployment_id is in decisions is valid
  826. for deployment_id in decisions.keys():
  827. assert (
  828. deployment_id in self._deployment_autoscaling_states
  829. ), f"Deployment {deployment_id} is not registered"
  830. assert (
  831. deployment_id in deployment_to_target_num_replicas
  832. ), f"Deployment {deployment_id} is invalid"
  833. results = {}
  834. for deployment_id, num_replicas in decisions.items():
  835. deployment_autoscaling_state = self._deployment_autoscaling_states[
  836. deployment_id
  837. ]
  838. deployment_autoscaling_state.record_autoscaling_metrics(
  839. num_replicas,
  840. autoscaling_contexts[deployment_id].total_num_requests,
  841. policy_execution_time_ms,
  842. "application",
  843. )
  844. results[deployment_id] = (
  845. self._deployment_autoscaling_states[deployment_id].apply_bounds(
  846. math.ceil(num_replicas)
  847. )
  848. if not _skip_bound_check
  849. else math.ceil(num_replicas)
  850. )
  851. return results
  852. else:
  853. # Using deployment-level policy
  854. return {
  855. deployment_id: deployment_autoscaling_state.get_decision_num_replicas(
  856. curr_target_num_replicas=deployment_to_target_num_replicas[
  857. deployment_id
  858. ],
  859. _skip_bound_check=_skip_bound_check,
  860. )
  861. for deployment_id, deployment_autoscaling_state in self._deployment_autoscaling_states.items()
  862. }
  863. def update_running_replica_ids(
  864. self, deployment_id: DeploymentID, running_replicas: List[ReplicaID]
  865. ):
  866. self._deployment_autoscaling_states[deployment_id].update_running_replica_ids(
  867. running_replicas
  868. )
  869. def record_scale_up(self, deployment_id: DeploymentID):
  870. """Record a scale up event for a deployment."""
  871. if deployment_id in self._deployment_autoscaling_states:
  872. self._deployment_autoscaling_states[deployment_id].record_scale_up()
  873. def record_scale_down(self, deployment_id: DeploymentID):
  874. """Record a scale down event for a deployment."""
  875. if deployment_id in self._deployment_autoscaling_states:
  876. self._deployment_autoscaling_states[deployment_id].record_scale_down()
  877. def on_replica_stopped(self, replica_id: ReplicaID):
  878. dep_id = replica_id.deployment_id
  879. if dep_id in self._deployment_autoscaling_states:
  880. self._deployment_autoscaling_states[dep_id].on_replica_stopped(replica_id)
  881. def get_total_num_requests_for_deployment(
  882. self, deployment_id: DeploymentID
  883. ) -> float:
  884. return self._deployment_autoscaling_states[
  885. deployment_id
  886. ].get_total_num_requests()
  887. def get_replica_metrics_by_deployment_id(self, deployment_id: DeploymentID):
  888. return self._deployment_autoscaling_states[deployment_id].get_replica_metrics()
  889. def is_within_bounds(
  890. self, deployment_id: DeploymentID, num_replicas_running_at_target_version: int
  891. ) -> bool:
  892. return self._deployment_autoscaling_states[deployment_id].is_within_bounds(
  893. num_replicas_running_at_target_version
  894. )
  895. def record_request_metrics_for_replica(
  896. self, replica_metric_report: ReplicaMetricReport
  897. ):
  898. dep_id = replica_metric_report.replica_id.deployment_id
  899. # Defensively guard against delayed replica metrics arriving
  900. # after the deployment's been deleted
  901. if dep_id in self._deployment_autoscaling_states:
  902. self._deployment_autoscaling_states[
  903. dep_id
  904. ].record_request_metrics_for_replica(replica_metric_report)
  905. def record_request_metrics_for_handle(
  906. self, handle_metric_report: HandleMetricReport
  907. ):
  908. dep_id = handle_metric_report.deployment_id
  909. if dep_id in self._deployment_autoscaling_states:
  910. self._deployment_autoscaling_states[
  911. dep_id
  912. ].record_request_metrics_for_handle(handle_metric_report)
  913. def drop_stale_handle_metrics(self, alive_serve_actor_ids: Set[str]):
  914. """Drops handle metrics that are no longer valid.
  915. This includes handles that live on Serve Proxy or replica actors
  916. that have died AND handles from which the controller hasn't
  917. received an update for too long.
  918. """
  919. for dep_state in self._deployment_autoscaling_states.values():
  920. dep_state.drop_stale_handle_metrics(alive_serve_actor_ids)
  921. class AutoscalingStateManager:
  922. """Manages all things autoscaling related.
  923. Keeps track of request metrics for each application and its deployments,
  924. and decides on the target number of replicas to autoscale to.
  925. """
  926. def __init__(self):
  927. self._app_autoscaling_states: Dict[
  928. ApplicationName, ApplicationAutoscalingState
  929. ] = {}
  930. def register_deployment(
  931. self,
  932. deployment_id: DeploymentID,
  933. info: DeploymentInfo,
  934. curr_target_num_replicas: int,
  935. ) -> int:
  936. """Register autoscaling deployment info."""
  937. assert info.deployment_config.autoscaling_config
  938. app_name = deployment_id.app_name
  939. app_state = self._app_autoscaling_states.setdefault(
  940. app_name, ApplicationAutoscalingState(app_name)
  941. )
  942. logger.info(f"Registering autoscaling state for deployment {deployment_id}")
  943. return app_state.register_deployment(
  944. deployment_id, info, curr_target_num_replicas
  945. )
  946. def deregister_deployment(self, deployment_id: DeploymentID):
  947. """Remove deployment from tracking."""
  948. app_state = self._app_autoscaling_states.get(deployment_id.app_name)
  949. if app_state:
  950. logger.info(
  951. f"Deregistering autoscaling state for deployment {deployment_id}"
  952. )
  953. app_state.deregister_deployment(deployment_id)
  954. def register_application(
  955. self,
  956. app_name: ApplicationName,
  957. autoscaling_policy: AutoscalingPolicy,
  958. ):
  959. app_state = self._app_autoscaling_states.setdefault(
  960. app_name, ApplicationAutoscalingState(app_name)
  961. )
  962. logger.info(f"Registering autoscaling state for application {app_name}")
  963. app_state.register(autoscaling_policy)
  964. def deregister_application(self, app_name: ApplicationName):
  965. """Remove application from tracking."""
  966. if app_name in self._app_autoscaling_states:
  967. logger.info(f"Deregistering autoscaling state for application {app_name}")
  968. self._app_autoscaling_states.pop(app_name, None)
  969. def _application_has_policy(self, app_name: ApplicationName) -> bool:
  970. return (
  971. app_name in self._app_autoscaling_states
  972. and self._app_autoscaling_states[app_name].has_policy()
  973. )
  974. def get_decision_num_replicas(
  975. self,
  976. app_name: ApplicationName,
  977. deployment_to_target_num_replicas: Dict[DeploymentID, int],
  978. ) -> Dict[DeploymentID, int]:
  979. """
  980. Decide scaling for all deployments in the application.
  981. Args:
  982. app_name: The name of the application.
  983. deployment_to_target_num_replicas: A dictionary of deployment_id to target number of replicas.
  984. Returns:
  985. A dictionary of deployment_id to decision number of replicas.
  986. """
  987. return self._app_autoscaling_states[app_name].get_decision_num_replicas(
  988. deployment_to_target_num_replicas
  989. )
  990. def should_autoscale_application(self, app_name: ApplicationName):
  991. return app_name in self._app_autoscaling_states
  992. def should_autoscale_deployment(self, deployment_id: DeploymentID):
  993. return (
  994. deployment_id.app_name in self._app_autoscaling_states
  995. and self._app_autoscaling_states[
  996. deployment_id.app_name
  997. ].should_autoscale_deployment(deployment_id)
  998. )
  999. def update_running_replica_ids(
  1000. self, deployment_id: DeploymentID, running_replicas: List[ReplicaID]
  1001. ):
  1002. app_state = self._app_autoscaling_states.get(deployment_id.app_name)
  1003. if app_state:
  1004. app_state.update_running_replica_ids(deployment_id, running_replicas)
  1005. def record_scale_up(self, deployment_id: DeploymentID):
  1006. """Record a scale up event for a deployment.
  1007. Args:
  1008. deployment_id: The ID of the deployment being scaled up.
  1009. """
  1010. app_state = self._app_autoscaling_states.get(deployment_id.app_name)
  1011. if app_state:
  1012. app_state.record_scale_up(deployment_id)
  1013. def record_scale_down(self, deployment_id: DeploymentID):
  1014. """Record a scale down event for a deployment.
  1015. Args:
  1016. deployment_id: The ID of the deployment being scaled down.
  1017. """
  1018. app_state = self._app_autoscaling_states.get(deployment_id.app_name)
  1019. if app_state:
  1020. app_state.record_scale_down(deployment_id)
  1021. def on_replica_stopped(self, replica_id: ReplicaID):
  1022. app_state = self._app_autoscaling_states.get(replica_id.deployment_id.app_name)
  1023. if app_state:
  1024. app_state.on_replica_stopped(replica_id)
  1025. def get_metrics_for_deployment(
  1026. self, deployment_id: DeploymentID
  1027. ) -> Dict[ReplicaID, List[TimeSeries]]:
  1028. if deployment_id.app_name in self._app_autoscaling_states:
  1029. return self._app_autoscaling_states[
  1030. deployment_id.app_name
  1031. ].get_replica_metrics_by_deployment_id(deployment_id)
  1032. else:
  1033. return {}
  1034. def get_total_num_requests_for_deployment(
  1035. self, deployment_id: DeploymentID
  1036. ) -> float:
  1037. if deployment_id.app_name in self._app_autoscaling_states:
  1038. return self._app_autoscaling_states[
  1039. deployment_id.app_name
  1040. ].get_total_num_requests_for_deployment(deployment_id)
  1041. else:
  1042. return 0
  1043. def is_within_bounds(
  1044. self, deployment_id: DeploymentID, num_replicas_running_at_target_version: int
  1045. ) -> bool:
  1046. app_state = self._app_autoscaling_states[deployment_id.app_name]
  1047. return app_state.is_within_bounds(
  1048. deployment_id, num_replicas_running_at_target_version
  1049. )
  1050. def record_request_metrics_for_replica(
  1051. self, replica_metric_report: ReplicaMetricReport
  1052. ) -> None:
  1053. app_state = self._app_autoscaling_states.get(
  1054. replica_metric_report.replica_id.deployment_id.app_name
  1055. )
  1056. if app_state:
  1057. app_state.record_request_metrics_for_replica(replica_metric_report)
  1058. def record_request_metrics_for_handle(
  1059. self,
  1060. handle_metric_report: HandleMetricReport,
  1061. ) -> None:
  1062. """Update request metric for a specific handle."""
  1063. app_state = self._app_autoscaling_states.get(
  1064. handle_metric_report.deployment_id.app_name
  1065. )
  1066. if app_state:
  1067. app_state.record_request_metrics_for_handle(handle_metric_report)
  1068. def drop_stale_handle_metrics(self, alive_serve_actor_ids: Set[str]) -> None:
  1069. for app_state in self._app_autoscaling_states.values():
  1070. app_state.drop_stale_handle_metrics(alive_serve_actor_ids)
  1071. def get_deployment_snapshot(
  1072. self, deployment_id: DeploymentID
  1073. ) -> Optional[DeploymentSnapshot]:
  1074. app_state = self._app_autoscaling_states.get(deployment_id.app_name)
  1075. if not app_state:
  1076. return None
  1077. dep_state = app_state._deployment_autoscaling_states.get(deployment_id)
  1078. return dep_state.get_deployment_snapshot() if dep_state else None