autoscaling_policy.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. import functools
  2. import logging
  3. import math
  4. from typing import Any, Callable, Dict, Optional, Tuple, Union
  5. from ray.serve._private.common import DeploymentID
  6. from ray.serve._private.constants import (
  7. CONTROL_LOOP_INTERVAL_S,
  8. SERVE_AUTOSCALING_DECISION_COUNTERS_KEY,
  9. SERVE_LOGGER_NAME,
  10. )
  11. from ray.serve.config import AutoscalingConfig, AutoscalingContext
  12. from ray.util.annotations import PublicAPI
# Module-level logger, looked up by the shared SERVE_LOGGER_NAME constant.
logger = logging.getLogger(SERVE_LOGGER_NAME)
  14. def _apply_scaling_factors(
  15. desired_num_replicas: Union[int, float],
  16. current_num_replicas: int,
  17. autoscaling_config: AutoscalingConfig,
  18. ) -> int:
  19. """Apply scaling factors to the desired number of replicas.
  20. Returns the scaled number of replicas depending on the scaling factor.
  21. The computation uses the difference between desired and current to scale.
  22. """
  23. replicas_delta = desired_num_replicas - current_num_replicas
  24. scaling_factor = (
  25. autoscaling_config.get_upscaling_factor()
  26. if replicas_delta > 0
  27. else autoscaling_config.get_downscaling_factor()
  28. )
  29. scaled_num_replicas = math.ceil(
  30. current_num_replicas + scaling_factor * replicas_delta
  31. )
  32. # If the scaled_replicas are stuck during downscaling because of scaling factor, decrement by 1.
  33. if (
  34. math.ceil(float(desired_num_replicas)) < current_num_replicas
  35. and scaled_num_replicas == current_num_replicas
  36. ):
  37. scaled_num_replicas -= 1
  38. return scaled_num_replicas
  39. def _apply_delay_logic(
  40. desired_num_replicas: int,
  41. curr_target_num_replicas: int,
  42. config: AutoscalingConfig,
  43. policy_state: Dict[str, Any],
  44. ) -> Tuple[int, Dict[str, Any]]:
  45. """Apply delay logic to the desired number of replicas."""
  46. decision_num_replicas = curr_target_num_replicas
  47. decision_counter = policy_state.get(SERVE_AUTOSCALING_DECISION_COUNTERS_KEY, 0)
  48. # Scale up.
  49. if desired_num_replicas > curr_target_num_replicas:
  50. # If the previous decision was to scale down (the counter was
  51. # negative), we reset it and then increment it (set to 1).
  52. # Otherwise, just increment.
  53. if decision_counter < 0:
  54. decision_counter = 0
  55. decision_counter += 1
  56. # Only actually scale the replicas if we've made this decision for
  57. # 'scale_up_consecutive_periods' in a row.
  58. if decision_counter > int(config.upscale_delay_s / CONTROL_LOOP_INTERVAL_S):
  59. decision_counter = 0
  60. decision_num_replicas = desired_num_replicas
  61. # Scale down.
  62. elif desired_num_replicas < curr_target_num_replicas:
  63. # If the previous decision was to scale up (the counter was
  64. # positive), reset it to zero before decrementing.
  65. if decision_counter > 0:
  66. decision_counter = 0
  67. decision_counter -= 1
  68. # Downscaling to zero is only allowed from 1 -> 0
  69. is_scaling_to_zero = curr_target_num_replicas == 1
  70. # Determine the delay to use
  71. if is_scaling_to_zero:
  72. # Check if the downscale_to_zero_delay_s is set
  73. if config.downscale_to_zero_delay_s is not None:
  74. delay_s = config.downscale_to_zero_delay_s
  75. else:
  76. delay_s = config.downscale_delay_s
  77. else:
  78. delay_s = config.downscale_delay_s
  79. # The desired_num_replicas>0 for downscaling cases other than 1->0
  80. desired_num_replicas = max(1, desired_num_replicas)
  81. # Only actually scale the replicas if we've made this decision for
  82. # 'scale_down_consecutive_periods' in a row.
  83. if decision_counter < -int(delay_s / CONTROL_LOOP_INTERVAL_S):
  84. decision_counter = 0
  85. decision_num_replicas = desired_num_replicas
  86. # Do nothing.
  87. else:
  88. decision_counter = 0
  89. policy_state[SERVE_AUTOSCALING_DECISION_COUNTERS_KEY] = decision_counter
  90. return decision_num_replicas, policy_state
  91. def _apply_bounds(
  92. num_replicas: int,
  93. capacity_adjusted_min_replicas: int,
  94. capacity_adjusted_max_replicas: int,
  95. ) -> int:
  96. """Clip replica count to be within capacity-adjusted min/max bounds."""
  97. return max(
  98. capacity_adjusted_min_replicas,
  99. min(capacity_adjusted_max_replicas, num_replicas),
  100. )
  101. def _apply_default_params(
  102. desired_num_replicas: Union[int, float],
  103. ctx: AutoscalingContext,
  104. policy_state: Dict[str, Any],
  105. ) -> Tuple[int, Dict[str, Any]]:
  106. """Apply the default parameters to the desired number of replicas."""
  107. desired_num_replicas = _apply_scaling_factors(
  108. desired_num_replicas, ctx.current_num_replicas, ctx.config
  109. )
  110. # Apply bounds
  111. bounded_num_replicas = _apply_bounds(
  112. desired_num_replicas,
  113. ctx.capacity_adjusted_min_replicas,
  114. ctx.capacity_adjusted_max_replicas,
  115. )
  116. # Apply delay logic
  117. # Only send the internal state here to avoid overwriting the custom policy state.
  118. final_num_replicas, updated_state = _apply_delay_logic(
  119. bounded_num_replicas, ctx.target_num_replicas, ctx.config, policy_state
  120. )
  121. return final_num_replicas, updated_state
  122. def _apply_default_params_and_merge_state(
  123. policy_state: Dict[str, Any],
  124. user_policy_state: Dict[str, Any],
  125. desired_num_replicas: Union[int, float],
  126. ctx: AutoscalingContext,
  127. ) -> Tuple[int, Dict[str, Any]]:
  128. # Extract internal polciy state from policy_state
  129. internal_policy_state = {
  130. SERVE_AUTOSCALING_DECISION_COUNTERS_KEY: policy_state.get(
  131. SERVE_AUTOSCALING_DECISION_COUNTERS_KEY, 0
  132. )
  133. }
  134. # Only pass the internal state used for delay counters so we don't
  135. # overwrite any custom user state.
  136. final_num_replicas, updated_state = _apply_default_params(
  137. desired_num_replicas, ctx, internal_policy_state
  138. )
  139. # Merge internal updated_state with the user's custom policy state.
  140. if updated_state:
  141. user_policy_state.update(updated_state)
  142. return final_num_replicas, user_policy_state
  143. def _merge_user_state_with_internal_state(
  144. policy_state: Dict[str, Any],
  145. user_policy_state: Dict[str, Any],
  146. ) -> Dict[str, Any]:
  147. """Merge user state with previous policy state, preserving internal keys.
  148. This mutates and returns `user_policy_state`.
  149. """
  150. # Extract internal polciy state from policy_state
  151. internal_policy_state = {
  152. SERVE_AUTOSCALING_DECISION_COUNTERS_KEY: policy_state.get(
  153. SERVE_AUTOSCALING_DECISION_COUNTERS_KEY, 0
  154. )
  155. }
  156. user_policy_state.update(internal_policy_state)
  157. return user_policy_state
  158. def _get_cold_start_scale_up_replicas(ctx: AutoscalingContext) -> Optional[int]:
  159. """
  160. Returns the desired number of replicas if the cold start fast path applies, otherwise returns None.
  161. """
  162. if ctx.current_num_replicas == 0:
  163. if ctx.total_num_requests > 0:
  164. return max(
  165. math.ceil(1 * ctx.config.get_upscaling_factor()),
  166. ctx.target_num_replicas,
  167. )
  168. return ctx.target_num_replicas
  169. return None
  170. def _apply_autoscaling_config(
  171. policy_func: Callable[
  172. [AutoscalingContext], Tuple[Union[int, float], Dict[str, Any]]
  173. ]
  174. ) -> Callable[[AutoscalingContext], Tuple[int, Dict[str, Any]]]:
  175. """
  176. Wraps a custom policy function to automatically apply:
  177. - upscaling_factor / downscaling_factor
  178. - min_replicas / max_replicas bounds
  179. - upscale_delay_s / downscale_delay_s / downscale_to_zero_delay_s
  180. """
  181. @functools.wraps(policy_func)
  182. def wrapped_policy(ctx: AutoscalingContext) -> Tuple[int, Dict[str, Any]]:
  183. # Cold start fast path: 0 replicas bypasses delay logic for immediate scale-up
  184. cold_start_replicas = _get_cold_start_scale_up_replicas(ctx)
  185. if cold_start_replicas is not None:
  186. return cold_start_replicas, ctx.policy_state
  187. policy_state = ctx.policy_state.copy()
  188. desired_num_replicas, updated_custom_policy_state = policy_func(ctx)
  189. final_num_replicas, final_state = _apply_default_params_and_merge_state(
  190. policy_state, updated_custom_policy_state, desired_num_replicas, ctx
  191. )
  192. return final_num_replicas, final_state
  193. return wrapped_policy
  194. def _apply_app_level_autoscaling_config(
  195. policy_func: Callable[
  196. [Dict[DeploymentID, AutoscalingContext]],
  197. Tuple[
  198. Dict[DeploymentID, Union[int, float]],
  199. Optional[Dict[DeploymentID, Dict]],
  200. ],
  201. ]
  202. ) -> Callable[
  203. [Dict[DeploymentID, AutoscalingContext]],
  204. Tuple[Dict[DeploymentID, int], Dict[DeploymentID, Dict]],
  205. ]:
  206. """
  207. Wraps an application-level custom policy function to automatically apply per-deployment:
  208. - upscaling_factor / downscaling_factor
  209. - min_replicas / max_replicas bounds
  210. - upscale_delay_s / downscale_delay_s / downscale_to_zero_delay_s
  211. """
  212. @functools.wraps(policy_func)
  213. def wrapped_policy(
  214. contexts: Dict[DeploymentID, AutoscalingContext]
  215. ) -> Tuple[Dict[DeploymentID, int], Dict[DeploymentID, Dict]]:
  216. # Store the policy state per deployment
  217. state_per_deployment = {}
  218. for dep_id, ctx in contexts.items():
  219. state_per_deployment[dep_id] = ctx.policy_state.copy()
  220. # Send to the actual policy
  221. desired_num_replicas_dict, updated_custom_policy_state = policy_func(contexts)
  222. updated_custom_policy_state = updated_custom_policy_state or {}
  223. # Build per-deployment replicas count and state dictionary.
  224. final_decisions: Dict[DeploymentID, int] = {}
  225. final_state: Dict[DeploymentID, Dict] = {}
  226. for dep_id, ctx in contexts.items():
  227. if dep_id not in desired_num_replicas_dict:
  228. final_state[dep_id] = state_per_deployment[dep_id]
  229. continue
  230. custom_policy_state_per_deployment = updated_custom_policy_state.get(
  231. dep_id, {}
  232. )
  233. # Cold start fast path: 0 replicas bypasses delay logic for immediate scale-up
  234. cold_start_replicas = _get_cold_start_scale_up_replicas(ctx)
  235. if cold_start_replicas is not None:
  236. final_decisions[dep_id] = cold_start_replicas
  237. # Merge user policy state with internal policy state
  238. final_state[dep_id] = _merge_user_state_with_internal_state(
  239. state_per_deployment[dep_id],
  240. custom_policy_state_per_deployment,
  241. )
  242. continue
  243. final_num_replicas, final_dep_state = _apply_default_params_and_merge_state(
  244. state_per_deployment[dep_id],
  245. custom_policy_state_per_deployment,
  246. desired_num_replicas_dict[dep_id],
  247. ctx,
  248. )
  249. final_decisions[dep_id] = final_num_replicas
  250. final_state[dep_id] = final_dep_state
  251. return final_decisions, final_state
  252. return wrapped_policy
  253. def _core_replica_queue_length_policy(
  254. ctx: AutoscalingContext,
  255. ) -> Tuple[float, Dict[str, Any]]:
  256. num_running_replicas = ctx.current_num_replicas
  257. config = ctx.config
  258. if num_running_replicas == 0:
  259. raise ValueError("Number of replicas cannot be zero")
  260. target_num_requests = config.get_target_ongoing_requests() * num_running_replicas
  261. error_ratio = ctx.total_num_requests / target_num_requests
  262. desired_num_replicas = num_running_replicas * error_ratio
  263. return desired_num_replicas, {}
  264. @PublicAPI(stability="alpha")
  265. def replica_queue_length_autoscaling_policy(
  266. ctx: AutoscalingContext,
  267. ) -> Tuple[Union[int, float], Dict[str, Any]]:
  268. """The default autoscaling policy based on basic thresholds for scaling.
  269. There is a minimum threshold for the average queue length in the cluster
  270. to scale up and a maximum threshold to scale down. Each period, a 'scale
  271. up' or 'scale down' decision is made. This decision must be made for a
  272. specified number of periods in a row before the number of replicas is
  273. actually scaled. See config options for more details. Assumes
  274. `get_decision_num_replicas` is called once every CONTROL_LOOP_PERIOD_S
  275. seconds.
  276. """
  277. # Adding this guard makes the public policy safe to call directly.
  278. cold_start_replicas = _get_cold_start_scale_up_replicas(ctx)
  279. if cold_start_replicas is not None:
  280. return cold_start_replicas, ctx.policy_state
  281. return _core_replica_queue_length_policy(ctx)
# Alias exposing the queue-length policy under the name
# `default_autoscaling_policy`.
default_autoscaling_policy = replica_queue_length_autoscaling_policy