controller.py 71 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766
  1. import asyncio
  2. import logging
  3. import os
  4. import pickle
  5. import time
  6. from collections import defaultdict
  7. from typing import (
  8. Any,
  9. Dict,
  10. Iterable,
  11. List,
  12. Optional,
  13. Set,
  14. Tuple,
  15. Union,
  16. )
  17. import ray
  18. from ray._common.network_utils import build_address
  19. from ray._common.utils import run_background_task
  20. from ray._raylet import GcsClient
  21. from ray.actor import ActorHandle
  22. from ray.serve._private.application_state import ApplicationStateManager, StatusOverview
  23. from ray.serve._private.autoscaling_state import AutoscalingStateManager
  24. from ray.serve._private.common import (
  25. DeploymentID,
  26. DeploymentSnapshot,
  27. HandleMetricReport,
  28. NodeId,
  29. ReplicaMetricReport,
  30. RequestProtocol,
  31. RequestRoutingInfo,
  32. RunningReplicaInfo,
  33. TargetCapacityDirection,
  34. )
  35. from ray.serve._private.config import DeploymentConfig
  36. from ray.serve._private.constants import (
  37. CONTROL_LOOP_INTERVAL_S,
  38. RAY_SERVE_CONTROLLER_CALLBACK_IMPORT_PATH,
  39. RAY_SERVE_ENABLE_DIRECT_INGRESS,
  40. RAY_SERVE_RPC_LATENCY_WARNING_THRESHOLD_MS,
  41. RECOVERING_LONG_POLL_BROADCAST_TIMEOUT_S,
  42. SERVE_CONTROLLER_NAME,
  43. SERVE_DEFAULT_APP_NAME,
  44. SERVE_LOGGER_NAME,
  45. SERVE_NAMESPACE,
  46. )
  47. from ray.serve._private.controller_health_metrics_tracker import (
  48. ControllerHealthMetricsTracker,
  49. )
  50. from ray.serve._private.default_impl import create_cluster_node_info_cache
  51. from ray.serve._private.deployment_info import DeploymentInfo
  52. from ray.serve._private.deployment_state import (
  53. DeploymentReplica,
  54. DeploymentStateManager,
  55. )
  56. from ray.serve._private.endpoint_state import EndpointState
  57. from ray.serve._private.exceptions import ExternalScalerDisabledError
  58. from ray.serve._private.grpc_util import set_proxy_default_grpc_options
  59. from ray.serve._private.http_util import (
  60. configure_http_options_with_defaults,
  61. )
  62. from ray.serve._private.logging_utils import (
  63. configure_autoscaling_snapshot_logger,
  64. configure_component_logger,
  65. configure_component_memory_profiler,
  66. get_component_logger_file_path,
  67. )
  68. from ray.serve._private.long_poll import LongPollHost, LongPollNamespace
  69. from ray.serve._private.node_port_manager import NodePortManager
  70. from ray.serve._private.proxy_state import ProxyStateManager
  71. from ray.serve._private.storage.kv_store import RayInternalKVStore
  72. from ray.serve._private.usage import ServeUsageTag
  73. from ray.serve._private.utils import (
  74. call_function_from_import_path,
  75. get_all_live_placement_group_names,
  76. get_head_node_id,
  77. is_grpc_enabled,
  78. )
  79. from ray.serve.config import DeploymentMode, HTTPOptions, ProxyLocation, gRPCOptions
  80. from ray.serve.generated.serve_pb2 import (
  81. ActorNameList,
  82. ApplicationArgs,
  83. DeploymentArgs,
  84. DeploymentRoute,
  85. EndpointInfo as EndpointInfoProto,
  86. EndpointSet,
  87. )
  88. from ray.serve.schema import (
  89. APIType,
  90. ApplicationDetails,
  91. DeploymentDetails,
  92. HTTPOptionsSchema,
  93. LoggingConfig,
  94. ProxyDetails,
  95. ReplicaDetails,
  96. ReplicaRank,
  97. ServeActorDetails,
  98. ServeApplicationSchema,
  99. ServeDeploySchema,
  100. ServeInstanceDetails,
  101. Target,
  102. TargetGroup,
  103. gRPCOptionsSchema,
  104. )
  105. from ray.util import metrics
  logger = logging.getLogger(SERVE_LOGGER_NAME)

  # Testing hook only: when non-zero, the controller crashes after writing
  # each checkpoint with this probability.
  _CRASH_AFTER_CHECKPOINT_PROBABILITY = 0

  # KV-store key names. LOGGING_CONFIG_CHECKPOINT_KEY holds the pickled global
  # logging config; CONFIG_CHECKPOINT_KEY presumably holds the app config
  # checkpoint (its readers are outside this view).
  CONFIG_CHECKPOINT_KEY = "serve-app-config-checkpoint"
  LOGGING_CONFIG_CHECKPOINT_KEY = "serve-logging-config-checkpoint"
  112. class ServeController:
  113. """Responsible for managing the state of the serving system.
  114. The controller implements fault tolerance by persisting its state in
  115. a new checkpoint each time a state change is made. If the actor crashes,
  116. the latest checkpoint is loaded and the state is recovered. Checkpoints
  117. are written/read using a provided KV-store interface.
  118. All hard state in the system is maintained by this actor and persisted via
  119. these checkpoints. Soft state required by other components is fetched by
  120. those actors from this actor on startup and updates are pushed out from
  121. this actor.
  122. All other actors started by the controller are named, detached actors
  123. so they will not fate share with the controller if it crashes.
  124. The following guarantees are provided for state-changing calls to the
  125. controller:
  126. - If the call succeeds, the change was made and will be reflected in
  127. the system even if the controller or other actors die unexpectedly.
  128. - If the call fails, the change may have been made but isn't guaranteed
  129. to have been. The client should retry in this case. Note that this
  130. requires all implementations here to be idempotent.
  131. """
  async def __init__(
      self,
      *,
      http_options: HTTPOptions,
      global_logging_config: LoggingConfig,
      grpc_options: Optional[gRPCOptions] = None,
  ):
      """Set up controller state, recover from checkpoints and start the loop.

      Args:
          http_options: HTTP options for the proxies. Defaults are filled in
              with ``configure_http_options_with_defaults``; when direct
              ingress is enabled, ``location`` is forced to
              ``DeploymentMode.HeadOnly``.
          global_logging_config: Logging config for the controller. A config
              previously checkpointed in the KV store takes precedence.
          grpc_options: Optional gRPC options for the proxies; defaults are
              filled in with ``set_proxy_default_grpc_options``.
      """
      self._controller_node_id = ray.get_runtime_context().get_node_id()
      assert (
          self._controller_node_id == get_head_node_id()
      ), "Controller must be on the head node."

      self.ray_worker_namespace = ray.get_runtime_context().namespace
      self.gcs_client = GcsClient(address=ray.get_runtime_context().gcs_address)
      # Used to read/write checkpoints.
      kv_store_namespace = f"ray-serve-{self.ray_worker_namespace}"
      self.kv_store = RayInternalKVStore(kv_store_namespace, self.gcs_client)
      self.long_poll_host = LongPollHost()
      self.done_recovering_event = asyncio.Event()

      # Autoscaling snapshot logger; assigned by
      # reconfigure_global_logging_config().
      self._autoscaling_logger: Optional[logging.Logger] = None

      # A logging config checkpointed in the KV store takes precedence over
      # the one passed to the constructor. The checkpoint is the pickled
      # config stored by reconfigure_global_logging_config().
      self.global_logging_config = None
      log_config_checkpoint = self.kv_store.get(LOGGING_CONFIG_CHECKPOINT_KEY)
      if log_config_checkpoint is not None:
          global_logging_config = pickle.loads(log_config_checkpoint)
      self.reconfigure_global_logging_config(global_logging_config)

      configure_component_memory_profiler(
          component_name="controller", component_id=str(os.getpid())
      )
      if RAY_SERVE_CONTROLLER_CALLBACK_IMPORT_PATH:
          logger.info(
              "Calling user-provided callback from import path "
              f"{RAY_SERVE_CONTROLLER_CALLBACK_IMPORT_PATH}."
          )
          call_function_from_import_path(RAY_SERVE_CONTROLLER_CALLBACK_IMPORT_PATH)

      self.cluster_node_info_cache = create_cluster_node_info_cache(self.gcs_client)
      self.cluster_node_info_cache.update()

      self._direct_ingress_enabled = RAY_SERVE_ENABLE_DIRECT_INGRESS
      if self._direct_ingress_enabled:
          logger.info(
              "Direct ingress is enabled in ServeController, enabling proxy "
              "on head node only."
          )
          http_options.location = DeploymentMode.HeadOnly

      # Configure proxy default HTTP and gRPC options.
      self.proxy_state_manager = ProxyStateManager(
          http_options=configure_http_options_with_defaults(http_options),
          head_node_id=self._controller_node_id,
          cluster_node_info_cache=self.cluster_node_info_cache,
          logging_config=self.global_logging_config,
          grpc_options=set_proxy_default_grpc_options(grpc_options),
      )
      # The proxy state manager now holds the defaulted HTTP/gRPC options;
      # delete the raw arguments so they are not accidentally used below.
      del http_options, grpc_options

      self.endpoint_state = EndpointState(self.kv_store, self.long_poll_host)

      # Fetch all running actors in the current cluster as the source of
      # current replica state for controller failure recovery.
      all_current_actors = ray.util.list_named_actors(all_namespaces=True)
      all_serve_actor_names = [
          actor["name"]
          for actor in all_current_actors
          if actor["namespace"] == SERVE_NAMESPACE
      ]

      self.autoscaling_state_manager = AutoscalingStateManager()
      self.deployment_state_manager = DeploymentStateManager(
          self.kv_store,
          self.long_poll_host,
          all_serve_actor_names,
          get_all_live_placement_group_names(),
          self.cluster_node_info_cache,
          self.autoscaling_state_manager,
      )

      # Manage all applications' state.
      self.application_state_manager = ApplicationStateManager(
          self.deployment_state_manager,
          self.autoscaling_state_manager,
          self.endpoint_state,
          self.kv_store,
          self.global_logging_config,
      )

      # Controller actor details.
      self._actor_details = ServeActorDetails(
          node_id=self._controller_node_id,
          node_ip=ray.util.get_node_ip_address(),
          node_instance_id=ray.util.get_node_instance_id(),
          actor_id=ray.get_runtime_context().get_actor_id(),
          actor_name=SERVE_CONTROLLER_NAME,
          worker_id=ray.get_runtime_context().get_worker_id(),
          log_file_path=get_component_logger_file_path(),
      )

      self._shutting_down = False
      self._shutdown_event = asyncio.Event()
      self._shutdown_start_time = None
      # Actors registered for cleanup on serve.shutdown(), keyed by actor ID.
      self._registered_cleanup_actors: Dict[str, ActorHandle] = {}

      self._health_metrics_tracker = ControllerHealthMetricsTracker(
          controller_start_time=time.time()
      )
      self._create_control_loop_metrics()

      # The target capacity percentage for all deployments across the cluster.
      self._target_capacity: Optional[float] = None
      self._target_capacity_direction: Optional[TargetCapacityDirection] = None
      self._recover_state_from_checkpoint()

      # Nodes where proxy actors should run.
      self._proxy_nodes = set()
      self._update_proxy_nodes()

      # Caches for autoscaling observability.
      self._last_autoscaling_snapshots: Dict[DeploymentID, DeploymentSnapshot] = {}
      self._autoscaling_enabled_deployments_cache: List[
          Tuple[str, str, DeploymentDetails, Any]
      ] = []
      self._refresh_autoscaling_deployments_cache()
      self._last_broadcasted_target_groups: List[TargetGroup] = []

      # Start the control loop last. By now every attribute it reads (proxy
      # nodes, target capacity, autoscaling caches, metrics gauges) has been
      # set, so correctness no longer depends on when the background task
      # first gets scheduled relative to the rest of this constructor.
      run_background_task(self.run_control_loop())
  def reconfigure_global_logging_config(self, global_logging_config: LoggingConfig):
      """Apply ``global_logging_config`` to the controller and broadcast it.

      Does nothing when a config is already set and equals the new one.
      Otherwise it does the following:

      * checkpoints the config to the KV store;
      * publishes it to long-poll listeners;
      * reconfigures the controller's component logger and autoscaling
        snapshot logger.
      """
      current_config = self.global_logging_config
      if current_config and current_config == global_logging_config:
          return

      # Persist first: __init__ reloads this checkpoint on controller restart.
      self.kv_store.put(
          LOGGING_CONFIG_CHECKPOINT_KEY, pickle.dumps(global_logging_config)
      )
      self.global_logging_config = global_logging_config
      self.long_poll_host.notify_changed(
          {LongPollNamespace.GLOBAL_LOGGING_CONFIG: global_logging_config}
      )

      pid_str = str(os.getpid())
      configure_component_logger(
          component_name="controller",
          component_id=pid_str,
          logging_config=global_logging_config,
      )
      self._autoscaling_logger = configure_autoscaling_snapshot_logger(
          component_id=pid_str,
          logging_config=global_logging_config,
      )

      logger.info(
          f"Controller starting (version='{ray.__version__}').",
          extra={"log_to_stderr": False},
      )
      logger.debug(
          "Configure the serve controller logger "
          f"with logging config: {self.global_logging_config}"
      )
  def check_alive(self) -> None:
      """Liveness probe: succeeds (returning None) whenever the actor responds."""
      return None
  def get_pid(self) -> int:
      """Return the OS process ID of this controller."""
      pid = os.getpid()
      return pid
  def record_autoscaling_metrics_from_replica(
      self, replica_metric_report: ReplicaMetricReport
  ):
      """Record an autoscaling metrics report pushed by a replica.

      The report's delay is measured as wall-clock now minus the report's
      timestamp, in ms. The delay is exported as a gauge and recorded in the
      health metrics tracker. A warning is logged if it exceeds
      RAY_SERVE_RPC_LATENCY_WARNING_THRESHOLD_MS.
      """
      replica_id = replica_metric_report.replica_id
      latency_ms = (time.time() - replica_metric_report.timestamp) * 1000
      # Record the metrics delay for observability.
      self.replica_metrics_delay_gauge.set(
          latency_ms,
          tags={
              "deployment": replica_id.deployment_id.name,
              "application": replica_id.deployment_id.app_name,
              "replica": replica_id.unique_id,
          },
      )
      self._health_metrics_tracker.record_replica_metrics_delay(latency_ms)
      self._warn_if_metrics_report_delayed(
          f"replica {replica_id}", replica_metric_report.timestamp, latency_ms
      )
      self.autoscaling_state_manager.record_request_metrics_for_replica(
          replica_metric_report
      )

  def record_autoscaling_metrics_from_handle(
      self, handle_metric_report: HandleMetricReport
  ):
      """Record an autoscaling metrics report pushed by a deployment handle.

      The report's delay is measured as wall-clock now minus the report's
      timestamp, in ms. The delay is exported as a gauge and recorded in the
      health metrics tracker. A warning is logged if it exceeds
      RAY_SERVE_RPC_LATENCY_WARNING_THRESHOLD_MS.
      """
      deployment_id = handle_metric_report.deployment_id
      latency_ms = (time.time() - handle_metric_report.timestamp) * 1000
      # Record the metrics delay for observability.
      self.handle_metrics_delay_gauge.set(
          latency_ms,
          tags={
              "deployment": deployment_id.name,
              "application": deployment_id.app_name,
              "handle": handle_metric_report.handle_id,
          },
      )
      self._health_metrics_tracker.record_handle_metrics_delay(latency_ms)
      self._warn_if_metrics_report_delayed(
          f"handle {handle_metric_report.handle_id} for deployment {deployment_id}",
          handle_metric_report.timestamp,
          latency_ms,
      )
      self.autoscaling_state_manager.record_request_metrics_for_handle(
          handle_metric_report
      )

  def _warn_if_metrics_report_delayed(
      self, source: str, timestamp: float, latency_ms: float
  ) -> None:
      """Log a warning when a metrics report arrived later than the threshold.

      Shared by the replica and handle metric-report entry points so both
      emit the same message.

      Args:
          source: Human-readable description of the sender, e.g.
              "replica <id>" or "handle <id> for deployment <id>".
          timestamp: Timestamp carried by the report.
          latency_ms: Milliseconds between ``timestamp`` and receipt.
      """
      if latency_ms <= RAY_SERVE_RPC_LATENCY_WARNING_THRESHOLD_MS:
          return
      logger.warning(
          f"Received autoscaling metrics from {source} with timestamp {timestamp} "
          f"which is {latency_ms:.2f}ms ago. "
          "This is greater than the warning threshold RPC latency of "
          f"{RAY_SERVE_RPC_LATENCY_WARNING_THRESHOLD_MS}ms. "
          "This may indicate a performance issue with the controller; if this "
          "delay is expected, try increasing the "
          "RAY_SERVE_RPC_LATENCY_WARNING_THRESHOLD_MS environment variable."
      )
  def _get_total_num_requests_for_deployment_for_testing(
      self, deployment_id: DeploymentID
  ):
      """Testing helper: total request count tracked for ``deployment_id``."""
      asm = self.autoscaling_state_manager
      return asm.get_total_num_requests_for_deployment(deployment_id)
  def _get_metrics_for_deployment_for_testing(self, deployment_id: DeploymentID):
      """Testing helper: autoscaling metrics tracked for ``deployment_id``."""
      asm = self.autoscaling_state_manager
      return asm.get_metrics_for_deployment(deployment_id)
  def _dump_replica_states_for_testing(self, deployment_id: DeploymentID):
      """Testing helper: the raw replica container of ``deployment_id``'s state."""
      deployment_state = self.deployment_state_manager._deployment_states[
          deployment_id
      ]
      return deployment_state._replicas
  346. def _stop_one_running_replica_for_testing(self, deployment_id):
  347. self.deployment_state_manager._deployment_states[
  348. deployment_id
  349. ]._stop_one_running_replica_for_testing()
  async def listen_for_change(self, keys_to_snapshot_ids: Dict[str, int]):
      """Proxy a long poll client's listen request to the long poll host.

      Blocks until the controller has finished recovering, then delegates.

      Args:
          keys_to_snapshot_ids: Snapshot IDs used to decide whether the host
              should return data immediately or wait for a value to change.
      """
      # asyncio.Event.wait() returns immediately if the event is already set.
      await self.done_recovering_event.wait()
      return await self.long_poll_host.listen_for_change(keys_to_snapshot_ids)
  async def listen_for_change_java(self, keys_to_snapshot_ids_bytes: bytes):
      """Proxy a Java long poll client's listen request to the long poll host.

      Blocks until the controller has finished recovering, then delegates.

      Args:
          keys_to_snapshot_ids_bytes: Protobuf-serialized bytes of the
              ``keys_to_snapshot_ids`` mapping (Dict[str, int]).
      """
      # asyncio.Event.wait() returns immediately if the event is already set.
      await self.done_recovering_event.wait()
      host = self.long_poll_host
      return await host.listen_for_change_java(keys_to_snapshot_ids_bytes)
  def get_all_endpoints(self) -> Dict[DeploymentID, Dict[str, Any]]:
      """Return every registered endpoint's info dict, keyed by deployment ID."""
      endpoint_state = self.endpoint_state
      return endpoint_state.get_endpoints()
  def get_all_endpoints_java(self) -> bytes:
      """Return all endpoints as a serialized ``EndpointSet`` protobuf.

      NOTE(zcin): Java only supports 1.x deployments, so the set is keyed by
      deployment name only (the app name is dropped).
      """
      endpoint_protos = {}
      for deployment_id, endpoint_info in self.get_all_endpoints().items():
          endpoint_protos[deployment_id.name] = EndpointInfoProto(
              route=endpoint_info["route"]
          )
      return EndpointSet(endpoints=endpoint_protos).SerializeToString()
  def get_proxies(self) -> Dict[NodeId, ActorHandle]:
      """Return node ID -> proxy actor handle (empty without a proxy state manager)."""
      manager = self.proxy_state_manager
      return {} if manager is None else manager.get_proxy_handles()
  def get_proxy_names(self) -> Optional[bytes]:
      """Return the proxy actor names serialized as an ``ActorNameList`` protobuf.

      Returns:
          The serialized ``ActorNameList``, or ``None`` when there is no proxy
          state manager (hence ``Optional[bytes]`` rather than ``bytes``).
      """
      if self.proxy_state_manager is None:
          return None
      # Materialize the dict view so the repeated field receives a plain list.
      proxy_names = list(self.proxy_state_manager.get_proxy_names().values())
      return ActorNameList(names=proxy_names).SerializeToString()
  397. def _update_proxy_nodes(self):
  398. """Update the nodes set where proxy actors should run.
  399. Controller decides where proxy actors should run
  400. (head node and nodes with deployment replicas).
  401. """
  402. new_proxy_nodes = self.deployment_state_manager.get_active_node_ids()
  403. new_proxy_nodes = new_proxy_nodes - set(
  404. self.cluster_node_info_cache.get_draining_nodes()
  405. )
  406. new_proxy_nodes.add(self._controller_node_id)
  407. self._proxy_nodes = new_proxy_nodes
  def _refresh_autoscaling_deployments_cache(self) -> None:
      """Rebuild the list of deployments that have an autoscaling config.

      Each cache entry is ``(app_name, deployment_name, details,
      autoscaling_config)``. Cached autoscaling snapshots for deployments that
      are no longer listed by the application state manager are dropped.
      """
      app_state_manager = self.application_state_manager
      live_ids = set()
      autoscaling_entries = []
      for app_name in app_state_manager.list_app_names():
          details_by_name = app_state_manager.list_deployment_details(app_name)
          for dep_name, details in details_by_name.items():
              live_ids.add(DeploymentID(name=dep_name, app_name=app_name))
              config = details.deployment_config.autoscaling_config
              if config:
                  autoscaling_entries.append((app_name, dep_name, details, config))
      self._autoscaling_enabled_deployments_cache = autoscaling_entries
      self._last_autoscaling_snapshots = {
          dep_id: snapshot
          for dep_id, snapshot in self._last_autoscaling_snapshots.items()
          if dep_id in live_ids
      }
  def _emit_deployment_autoscaling_snapshots(self) -> None:
      """Emit structured autoscaling snapshot logs in a single batch per loop.

      A deployment's snapshot is logged only when it differs, per
      ``is_scaling_equivalent``, from the last one logged for it. Nothing is
      logged when no autoscaling logger is configured.
      """
      if self._autoscaling_logger is None:
          return
      batch: List[Dict[str, Any]] = []
      for (
          app_name,
          dep_name,
          _details,
          _autoscaling_config,
      ) in self._autoscaling_enabled_deployments_cache:
          dep_id = DeploymentID(name=dep_name, app_name=app_name)
          snapshot = self.autoscaling_state_manager.get_deployment_snapshot(dep_id)
          if snapshot is None:
              continue
          previous = self._last_autoscaling_snapshots.get(dep_id)
          if previous is not None and previous.is_scaling_equivalent(snapshot):
              continue
          batch.append(snapshot.dict(exclude_none=True))
          self._last_autoscaling_snapshots[dep_id] = snapshot
      if not batch:
          return
      # Single write per control-loop iteration.
      self._autoscaling_logger.info({"snapshots": batch})
  async def run_control_loop(self) -> None:
      """Run ``run_control_loop_step`` forever, sleeping between iterations.

      NOTE(edoakes): every exception raised by a step is caught and logged
      here, because an unhandled exception would halt the main control loop,
      which should *never* happen.
      """
      recovering_timeout = RECOVERING_LONG_POLL_BROADCAST_TIMEOUT_S
      num_loops = 0
      start_time = time.time()
      while True:
          iteration_start = time.time()
          try:
              await self.run_control_loop_step(
                  start_time, recovering_timeout, num_loops
              )
          except Exception as e:
              # Not expected to happen; log and pause for an extra second.
              logger.exception(f"There was an exception in the control loop: {e}")
              await asyncio.sleep(1)

          loop_duration = time.time() - iteration_start
          if loop_duration > 10:
              logger.warning(
                  f"The last control loop was slow (took {loop_duration}s). "
                  "This is likely caused by running a large number of "
                  "replicas in a single Ray cluster. Consider using "
                  "multiple Ray clusters.",
                  extra={"log_to_stderr": False},
              )
          self.control_loop_duration_gauge_s.set(loop_duration)
          self._health_metrics_tracker.record_loop_duration(loop_duration)

          num_loops += 1
          self.num_control_loops_gauge.set(num_loops)
          self._health_metrics_tracker.num_control_loops = num_loops

          sleep_start = time.time()
          await asyncio.sleep(CONTROL_LOOP_INTERVAL_S)
          sleep_duration = time.time() - sleep_start
          self.sleep_duration_gauge_s.set(sleep_duration)
          self._health_metrics_tracker.last_sleep_duration_s = sleep_duration
  async def run_control_loop_step(
      self, start_time: float, recovering_timeout: float, num_loops: int
  ):
      """Run a single iteration of the controller's control loop.

      Every stage is wrapped in its own try/except, so a failure in one stage
      is logged and the remaining stages still run in this iteration.

      Args:
          start_time: Wall-clock time at which the control loop started.
          recovering_timeout: Seconds after ``start_time`` after which the
              done-recovering event is set even if replicas are still
              recovering.
          num_loops: Number of iterations completed before this one; the
              "finished recovering" message is only logged when it is > 0.
      """
      try:
          self.cluster_node_info_cache.update()
      except Exception:
          logger.exception("Exception updating cluster node info cache.")

      if self._shutting_down:
          try:
              self.shutdown()
          except Exception:
              logger.exception("Exception during shutdown.")

      if (
          not self.done_recovering_event.is_set()
          and time.time() - start_time > recovering_timeout
      ):
          logger.warning(
              f"Replicas still recovering after {recovering_timeout}s, "
              "setting done recovering event to broadcast long poll updates."
          )
          self.done_recovering_event.set()

      # None means "unknown": the deployment state update below did not
      # complete, so we can't tell whether anything is still recovering.
      any_recovering: Optional[bool] = None
      try:
          dsm_update_start_time = time.time()
          any_recovering = self.deployment_state_manager.update()
          dsm_duration = time.time() - dsm_update_start_time
          self.dsm_update_duration_gauge_s.set(dsm_duration)
          self._health_metrics_tracker.record_dsm_update_duration(dsm_duration)
          if not self.done_recovering_event.is_set() and not any_recovering:
              self.done_recovering_event.set()
              if num_loops > 0:
                  # Only log if we actually needed to recover anything.
                  logger.info(
                      "Finished recovering deployments after "
                      f"{(time.time() - start_time):.2f}s.",
                      extra={"log_to_stderr": False},
                  )
      except Exception:
          logger.exception("Exception updating deployment state.")

      try:
          asm_update_start_time = time.time()
          any_target_state_changed = self.application_state_manager.update()
          if any_recovering or any_target_state_changed:
              self._refresh_autoscaling_deployments_cache()
          asm_duration = time.time() - asm_update_start_time
          self.asm_update_duration_gauge_s.set(asm_duration)
          self._health_metrics_tracker.record_asm_update_duration(asm_duration)
      except Exception:
          logger.exception("Exception updating application state.")

      try:
          # Emit one autoscaling snapshot per deployment per loop using existing state.
          self._emit_deployment_autoscaling_snapshots()
      except Exception:
          logger.exception("Exception emitting deployment autoscaling snapshots.")

      # Update the proxy nodes set before updating the proxy states,
      # so they are more consistent. On failure the previous set is kept.
      try:
          node_update_start_time = time.time()
          self._update_proxy_nodes()
          node_update_duration = time.time() - node_update_start_time
          self.node_update_duration_gauge_s.set(node_update_duration)
          self._health_metrics_tracker.record_node_update_duration(
              node_update_duration
          )
      except Exception:
          logger.exception("Exception updating proxy nodes.")

      # Don't update proxy_state until after the done recovering event is set,
      # otherwise we may start a new proxy but not broadcast it any
      # info about available deployments & their replicas.
      if self.proxy_state_manager and self.done_recovering_event.is_set():
          try:
              proxy_update_start_time = time.time()
              self.proxy_state_manager.update(proxy_nodes=self._proxy_nodes)
              proxy_update_duration = time.time() - proxy_update_start_time
              self.proxy_update_duration_gauge_s.set(proxy_update_duration)
              self._health_metrics_tracker.record_proxy_update_duration(
                  proxy_update_duration
              )
          except Exception:
              logger.exception("Exception updating proxy state.")

      # When the controller is done recovering, drop invalid handle metrics
      # that may be stale for autoscaling.
      if any_recovering is False:
          try:
              alive_actor_ids = (
                  self.deployment_state_manager.get_alive_replica_actor_ids()
              )
              # Guard like the proxy-state update above: without a proxy
              # state manager only replica actors can be alive senders.
              if self.proxy_state_manager:
                  alive_actor_ids = (
                      alive_actor_ids
                      | self.proxy_state_manager.get_alive_proxy_actor_ids()
                  )
              self.autoscaling_state_manager.drop_stale_handle_metrics(
                  alive_actor_ids
              )
          except Exception:
              logger.exception("Exception dropping stale handle metrics.")

      # Direct ingress port management.
      if self._direct_ingress_enabled:
          try:
              # Update port values for ingress replicas.
              # Non-ingress replicas are not expected to have ports allocated.
              ingress_replicas_info_list: List[
                  Tuple[str, str, int, int]
              ] = self.deployment_state_manager.get_ingress_replicas_info()
              NodePortManager.update_ports(ingress_replicas_info_list)
              # Clean up stale ports using all alive replica ids and their
              # node ids.
              NodePortManager.prune(self._get_node_id_to_alive_replica_ids())
          except Exception:
              logger.exception("Exception updating direct ingress ports.")
  def _create_control_loop_metrics(self):
      """Create the gauges that report control-loop timing and autoscaling lag.

      Each gauge is stored as an attribute on the controller so other code can
      update it (for example, the control loop calls
      ``self.node_update_duration_gauge_s.set(...)``).
      """
      # Untagged gauges: (attribute name, metric name, description).
      untagged_gauge_specs = (
          (
              "node_update_duration_gauge_s",
              "serve_controller_node_update_duration_s",
              "The control loop time spent on collecting proxy node info.",
          ),
          (
              "proxy_update_duration_gauge_s",
              "serve_controller_proxy_state_update_duration_s",
              "The control loop time spent on updating proxy state.",
          ),
          (
              "dsm_update_duration_gauge_s",
              "serve_controller_deployment_state_update_duration_s",
              "The control loop time spent on updating deployment state.",
          ),
          (
              "asm_update_duration_gauge_s",
              "serve_controller_application_state_update_duration_s",
              "The control loop time spent on updating application state.",
          ),
          (
              "sleep_duration_gauge_s",
              "serve_controller_sleep_duration_s",
              "The duration of the last control loop's sleep.",
          ),
          (
              "control_loop_duration_gauge_s",
              "serve_controller_control_loop_duration_s",
              "The duration of the last control loop.",
          ),
      )
      for attr_name, metric_name, metric_description in untagged_gauge_specs:
          setattr(
              self,
              attr_name,
              metrics.Gauge(metric_name, description=metric_description),
          )

      # Loop counter whose default "actor_id" tag is this controller's actor ID.
      self.num_control_loops_gauge = metrics.Gauge(
          "serve_controller_num_control_loops",
          description=(
              "The number of control loops performed by the controller. "
              "Increases monotonically over the controller's lifetime."
          ),
          tag_keys=("actor_id",),
      )
      controller_actor_id = ray.get_runtime_context().get_actor_id()
      self.num_control_loops_gauge.set_default_tags({"actor_id": controller_actor_id})

      # Autoscaling metrics delay gauges.
      self.replica_metrics_delay_gauge = metrics.Gauge(
          "serve_autoscaling_replica_metrics_delay_ms",
          description=(
              "Time taken for the replica metrics to be reported to the controller. "
              "High values may indicate a busy controller."
          ),
          tag_keys=("deployment", "application", "replica"),
      )
      self.handle_metrics_delay_gauge = metrics.Gauge(
          "serve_autoscaling_handle_metrics_delay_ms",
          description=(
              "Time taken for the handle metrics to be reported to the controller. "
              "High values may indicate a busy controller."
          ),
          tag_keys=("deployment", "application", "handle"),
      )
  def _recover_state_from_checkpoint(self):
      """Restore state from the config checkpoint, if one exists.

      The target capacity direction is always taken from the checkpoint read.
      When a Serve config was recovered, it is re-applied with the deployment
      timestamp stored alongside it; otherwise nothing is redeployed.
      """
      checkpoint_time, recovered_config, capacity_direction = (
          self._read_config_checkpoint()
      )
      self._target_capacity_direction = capacity_direction

      if recovered_config is None:
          return

      logger.info(
          "Recovered config from checkpoint.", extra={"log_to_stderr": False}
      )
      self.apply_config(recovered_config, deployment_time=checkpoint_time)
  def _read_config_checkpoint(
      self,
  ) -> Tuple[float, Optional[ServeDeploySchema], Optional[TargetCapacityDirection]]:
      """Read the current Serve config checkpoint from the KV store.

      The Serve config checkpoint stores active application configs and
      other metadata.

      Returns:
          If the KV store contains a checkpoint, a tuple of:
              1. A deployment timestamp.
              2. A Serve config. This Serve config is reconstructed from the
                 active application states. It may not exactly match the
                 submitted config (e.g. the top-level http options may be
                 different).
              3. The target_capacity direction calculated after the Serve
                 config was submitted.
          If there is no checkpoint, returns ``(0.0, None, None)``.
      """
      raw_checkpoint = self.kv_store.get(CONFIG_CHECKPOINT_KEY)
      if raw_checkpoint is None:
          return (0.0, None, None)

      # Layout matches the tuple pickled by `apply_config`:
      # (deployment_time, target_capacity, direction, {app_name: app_config}).
      # NOTE(review): pickle.loads is only safe because the checkpoint is
      # presumably written solely by this controller — confirm nothing else
      # can write CONFIG_CHECKPOINT_KEY.
      (
          checkpoint_time,
          target_capacity,
          capacity_direction,
          app_configs_by_name,
      ) = pickle.loads(raw_checkpoint)
      recovered_config = ServeDeploySchema(
          applications=list(app_configs_by_name.values()),
          target_capacity=target_capacity,
      )
      return (checkpoint_time, recovered_config, capacity_direction)
  def _all_running_replicas(self) -> Dict[DeploymentID, List[RunningReplicaInfo]]:
      """Used for testing.

      Returns:
          A dictionary mapping each ``DeploymentID`` (not just the deployment
          name) to the ``RunningReplicaInfo`` objects of its running replicas,
          as reported by the deployment state manager.
      """
      return self.deployment_state_manager.get_running_replica_infos()
  def get_actor_details(self) -> ServeActorDetails:
      """Return the stored actor details of this controller.

      Currently used for test only.
      """
      controller_details = self._actor_details
      return controller_details
  def get_health_metrics(self) -> Dict[str, Any]:
      """Return comprehensive health metrics for the controller as a dict.

      This method provides detailed performance metrics to help diagnose
      controller health issues, especially as cluster size increases.

      Returns:
          Dictionary containing health metrics including:
              - Control loop performance (iteration speed, durations)
              - Event loop health (task count, scheduling delay)
              - Component update latencies
              - Autoscaling metrics latency (handle/replica)
              - Memory usage

      Raises:
          Whatever collecting or converting the metrics raised; the failure
          is logged before being re-raised.
      """
      try:
          metrics_snapshot = self._health_metrics_tracker.collect_metrics()
          return metrics_snapshot.dict()
      except Exception:
          logger.exception("Exception collecting controller health metrics.")
          raise
  def get_proxy_details(self, node_id: str) -> Optional[ProxyDetails]:
      """Return the details of the proxy running on ``node_id``.

      Currently used for test only. Returns None if there is no proxy state
      manager or no proxy exists on the given node.
      """
      manager = self.proxy_state_manager
      if manager is None:
          return None
      details_by_node = manager.get_proxy_details()
      return details_by_node.get(node_id)
  def get_deployment_timestamps(self, app_name: str) -> Optional[float]:
      """Return the deployment timestamp for the given app.

      Currently used for test only.

      Args:
          app_name: Name of the application to look up.

      Returns:
          The ``deployment_timestamp`` from the application's status info, or
          ``None`` if no application with that name is listed.
      """
      # `list_app_statuses()` is keyed by app name, so look the app up
      # directly instead of scanning every (name, status) pair.
      app_status_info = self.application_state_manager.list_app_statuses().get(
          app_name
      )
      if app_status_info is None:
          return None
      return app_status_info.deployment_timestamp
  def get_deployment_details(
      self, app_name: str, deployment_name: str
  ) -> DeploymentDetails:
      """Return the details of ``deployment_name`` inside ``app_name``.

      Currently used for test only. Raises whatever the underlying mapping
      raises for an unknown deployment name.
      """
      details_by_deployment = self.application_state_manager.list_deployment_details(
          app_name
      )
      return details_by_deployment[deployment_name]
  def get_http_config(self) -> HTTPOptions:
      """Return the HTTP proxy configuration.

      Falls back to a default-constructed ``HTTPOptions`` when this controller
      has no proxy state manager.
      """
      manager = self.proxy_state_manager
      return HTTPOptions() if manager is None else manager.get_config()
  def get_grpc_config(self) -> gRPCOptions:
      """Return the gRPC proxy configuration.

      Falls back to a default-constructed ``gRPCOptions`` when this controller
      has no proxy state manager.
      """
      manager = self.proxy_state_manager
      return gRPCOptions() if manager is None else manager.get_grpc_config()
  def get_root_url(self):
      """Return the root url for the serve instance.

      Returns:
          ``None`` if there is no proxy state manager; the configured
          ``root_url`` if it is non-empty; ``""`` if no HTTP host is
          configured; otherwise ``http://<host:port><root_path>``.
      """
      if self.proxy_state_manager is None:
          return None

      http_config = self.get_http_config()
      if http_config.root_url != "":
          # An explicitly configured root URL is returned as-is.
          return http_config.root_url

      if http_config.host is None:
          # HTTP is disabled.
          return ""

      return (
          f"http://{build_address(http_config.host, http_config.port)}"
          f"{http_config.root_path}"
      )
  def config_checkpoint_deleted(self) -> bool:
      """Return whether the config checkpoint is absent from the KV store.

      A missing (``None``) entry under the config checkpoint key means it
      has been deleted.
      """
      stored_checkpoint = self.kv_store.get(CONFIG_CHECKPOINT_KEY)
      return stored_checkpoint is None
  def _register_shutdown_cleanup_actor(self, actor_handle: ActorHandle) -> None:
      """Register an actor to be killed on serve.shutdown().

      This allows deployments to register auxiliary actors (like caches,
      coordinators, etc.) that should be cleaned up when Serve shuts down.
      The actors must use lifetime="detached" to survive replica restarts,
      but will be explicitly killed during serve.shutdown().

      Note: Registered actors are only held in this controller's memory and
      are NOT persisted across controller restarts. For full persistence, use
      controller-managed deployment-scoped actors
      (see https://github.com/ray-project/ray/issues/60359).

      Registrations are keyed by the actor's hex ID, so registering the same
      actor multiple times (e.g., from multiple router instances sharing a
      tree actor via get_if_exists=True) stores it only once; the latest
      handle replaces the previous entry.

      Args:
          actor_handle: The actor handle to register for cleanup.
      """
      registry_key = actor_handle._actor_id.hex()
      self._registered_cleanup_actors[registry_key] = actor_handle
  def _kill_registered_cleanup_actors(self) -> None:
      """Kill all actors registered for shutdown cleanup.

      Best-effort: a failure to kill one actor (for example because it is
      already dead) is logged at debug level and does not stop the remaining
      kills. The registry is cleared afterwards so a repeated call does not
      try to kill the same actors again.
      """
      for actor_id, actor in list(self._registered_cleanup_actors.items()):
          try:
              ray.kill(actor, no_restart=True)
          except Exception:
              # The actor may already be dead. Keep going, but leave a trace
              # instead of silently swallowing the error.
              logger.debug(
                  f"Failed to kill registered cleanup actor {actor_id}.",
                  exc_info=True,
              )
      self._registered_cleanup_actors.clear()
  def shutdown(self):
      """Drive the Serve instance toward a complete shutdown.

      Does nothing unless ``self._shutting_down`` is true. Each call:

      - deletes the config and logging-config checkpoints from the KV store;
      - asks the application, deployment, endpoint and (if present) proxy
        state managers to shut down;
      - checks whether all of them, plus the checkpoint deletion, are done.

      Once everything is released, it kills the registered cleanup actors and
      then the controller actor itself. If shutdown has been in progress for
      more than 10 seconds, it instead logs a warning for each component that
      is still pending.
      """
      if not self._shutting_down:
          return

      if self._shutdown_start_time is None:
          self._shutdown_start_time = time.time()
          logger.info("Controller shutdown started.", extra={"log_to_stderr": False})

      self.kv_store.delete(CONFIG_CHECKPOINT_KEY)
      self.kv_store.delete(LOGGING_CONFIG_CHECKPOINT_KEY)
      self.application_state_manager.shutdown()
      self.deployment_state_manager.shutdown()
      self.endpoint_state.shutdown()
      if self.proxy_state_manager:
          self.proxy_state_manager.shutdown()

      # (warning to emit while pending, whether the component is done),
      # evaluated in a fixed order.
      readiness_checks = [
          (
              f"{CONFIG_CHECKPOINT_KEY} not yet deleted",
              self.config_checkpoint_deleted(),
          ),
          (
              "application not yet shutdown",
              self.application_state_manager.is_ready_for_shutdown(),
          ),
          (
              "deployment not yet shutdown",
              self.deployment_state_manager.is_ready_for_shutdown(),
          ),
          (
              "endpoint not yet shutdown",
              self.endpoint_state.is_ready_for_shutdown(),
          ),
          (
              "proxy_state not yet shutdown",
              self.proxy_state_manager is None
              or self.proxy_state_manager.is_ready_for_shutdown(),
          ),
      ]

      if all(is_done for _, is_done in readiness_checks):
          self._kill_registered_cleanup_actors()
          logger.warning(
              "All resources have shut down, controller exiting.",
              extra={"log_to_stderr": False},
          )
          _controller_actor = ray.get_runtime_context().current_actor
          ray.kill(_controller_actor, no_restart=True)
      elif time.time() - self._shutdown_start_time > 10:
          for pending_message, is_done in readiness_checks:
              if not is_done:
                  logger.warning(
                      pending_message,
                      extra={"log_to_stderr": False},
                  )
  def deploy_applications(
      self,
      name_to_deployment_args_list: Dict[str, List[bytes]],
      name_to_application_args: Dict[str, bytes],
  ) -> None:
      """Deploy one or more applications from serialized protobuf arguments.

      If an application with the same name is already deployed, the old
      application is overwritten. After deploying, the application state
      manager's checkpoint is saved.

      Args:
          name_to_deployment_args_list: Dictionary mapping application names
              to lists of serialized deployment information, where each item
              is bytes representing a serialized protobuf `DeploymentArgs`
              object. `DeploymentArgs` contains all the information for a
              single deployment.
          name_to_application_args: Dictionary mapping application names to
              serialized application arguments, where each item is bytes
              representing a serialized protobuf `ApplicationArgs` object.
              `ApplicationArgs` contains the information for the application.
      """
      name_to_deployment_args = {}
      for app_name, deployment_args_list in name_to_deployment_args_list.items():
          deployment_args_deserialized = []
          for deployment_args_bytes in deployment_args_list:
              args = DeploymentArgs.FromString(deployment_args_bytes)
              deployment_args_deserialized.append(
                  {
                      "deployment_name": args.deployment_name,
                      "deployment_config_proto_bytes": args.deployment_config,
                      "replica_config_proto_bytes": args.replica_config,
                      "deployer_job_id": args.deployer_job_id,
                      "ingress": args.ingress,
                      # Optional proto field: None when not set.
                      "route_prefix": (
                          args.route_prefix if args.HasField("route_prefix") else None
                      ),
                  }
              )
          name_to_deployment_args[app_name] = deployment_args_deserialized

      name_to_application_args_deserialized = {
          app_name: ApplicationArgs.FromString(application_args_bytes)
          for app_name, application_args_bytes in name_to_application_args.items()
      }

      self.application_state_manager.deploy_apps(
          name_to_deployment_args, name_to_application_args_deserialized
      )
      self.application_state_manager.save_checkpoint()
  def deploy_application(
      self,
      name: str,
      deployment_args_list: List[bytes],
      application_args: bytes,
  ) -> None:
      """Deploy a single application via the bulk `deploy_applications` API.

      This primarily exists as a shim to avoid changing Java code in
      https://github.com/ray-project/ray/pull/49168, and could be removed if
      the Java code was refactored to use the new bulk deploy_applications API.
      """
      # Wrap the single application into the name-keyed dicts the bulk API expects.
      deployment_args_by_app = {name: deployment_args_list}
      application_args_by_app = {name: application_args}
      self.deploy_applications(deployment_args_by_app, application_args_by_app)
  def apply_config(
      self,
      config: ServeDeploySchema,
      deployment_time: float = 0.0,
  ) -> None:
      """Apply the config described in `ServeDeploySchema`.

      This will upgrade the applications to the goal state specified in the
      config.

      Steps, in order:
      1. Recompute the target capacity direction against the currently
         checkpointed config and log the target capacity change.
      2. Write a new config checkpoint to the KV store.
      3. Hand the application configs to the application state manager and
         save its checkpoint.

      Note: when an application has no ``logging_config`` and the top-level
      ``config.logging_config`` is set, the top-level value is assigned onto
      the passed-in application config object (the input is mutated).

      Args:
          config: The declarative Serve config to apply.
          deployment_time: Timestamp recorded for this deployment. If falsy
              (including the default ``0.0``), ``time.time()`` is used.
      """
      ServeUsageTag.API_VERSION.record("v2")
      if not deployment_time:
          deployment_time = time.time()

      new_config_checkpoint = {}

      # Read the previous config before it is overwritten below so the
      # capacity direction reflects the old -> new transition.
      _, curr_config, _ = self._read_config_checkpoint()

      self._target_capacity_direction = calculate_target_capacity_direction(
          curr_config=curr_config,
          new_config=config,
          curr_target_capacity_direction=self._target_capacity_direction,
      )
      log_target_capacity_change(
          self._target_capacity,
          config.target_capacity,
          self._target_capacity_direction,
      )
      self._target_capacity = config.target_capacity

      for app_config in config.applications:
          # If the application logging config is not set, use the global logging
          # config.
          if app_config.logging_config is None and config.logging_config:
              app_config.logging_config = config.logging_config

          # Only explicitly-set fields are checkpointed.
          app_config_dict = app_config.dict(exclude_unset=True)
          new_config_checkpoint[app_config.name] = app_config_dict

      # Tuple layout must stay in sync with `_read_config_checkpoint`, which
      # unpickles (deployment_time, target_capacity, direction, app configs).
      self.kv_store.put(
          CONFIG_CHECKPOINT_KEY,
          pickle.dumps(
              (
                  deployment_time,
                  self._target_capacity,
                  self._target_capacity_direction,
                  new_config_checkpoint,
              )
          ),
      )

      # Declaratively apply the new set of applications.
      # This will delete any applications no longer in the config that were
      # previously deployed via the REST API.
      self.application_state_manager.apply_app_configs(
          config.applications,
          deployment_time=deployment_time,
          target_capacity=self._target_capacity,
          target_capacity_direction=self._target_capacity_direction,
      )

      self.application_state_manager.save_checkpoint()
  def get_deployment_info(self, name: str, app_name: str = "") -> bytes:
      """Get the current information about a deployment.

      Args:
          name: the name of the deployment.
          app_name: the name of the application the deployment belongs to.
              Defaults to ``""``.

      Returns:
          DeploymentRoute's protobuf serialized bytes, combining the
          deployment info with its endpoint route.

      Raises:
          KeyError: If the deployment doesn't exist.
      """
      # Named `deployment_id` to avoid shadowing the builtin `id`.
      deployment_id = DeploymentID(name=name, app_name=app_name)
      deployment_info = self.deployment_state_manager.get_deployment(deployment_id)
      if deployment_info is None:
          app_msg = f" in application '{app_name}'" if app_name else ""
          raise KeyError(f"Deployment '{name}' does not exist{app_msg}.")

      route = self.endpoint_state.get_endpoint_route(deployment_id)

      deployment_route = DeploymentRoute(
          deployment_info=deployment_info.to_proto(), route=route
      )
      return deployment_route.SerializeToString()
  def list_deployments_internal(
      self,
  ) -> Dict[DeploymentID, Tuple[DeploymentInfo, str]]:
      """Gets the current information about all deployments.

      Returns:
          Dict mapping each deployment ID to a ``(DeploymentInfo, route)``
          tuple, where ``route`` is whatever the endpoint state reports for
          that deployment.
      """
      # Loop variable named `deployment_id` to avoid shadowing the builtin `id`.
      return {
          deployment_id: (info, self.endpoint_state.get_endpoint_route(deployment_id))
          for deployment_id, info in (
              self.deployment_state_manager.get_deployment_infos().items()
          )
      }
  def get_deployment_config(
      self, deployment_id: DeploymentID
  ) -> Optional[DeploymentConfig]:
      """Return the deployment config for ``deployment_id``.

      Args:
          deployment_id: The deployment id to get the config for.

      Returns:
          The deployment config if the deployment id exists, None otherwise.
      """
      all_deployment_infos = self.deployment_state_manager.get_deployment_infos()
      deployment_info = all_deployment_infos.get(deployment_id)
      if not deployment_info:
          return None
      return deployment_info.deployment_config
  def list_deployment_ids(self) -> List[DeploymentID]:
      """Gets the current list of all deployments' identifiers.

      Returns:
          A new list (a snapshot) of the deployment IDs currently tracked by
          the deployment state manager. Returning a list rather than a live
          ``dict_keys`` view matches the annotation and keeps callers from
          observing later changes to the manager's internal state.
      """
      return list(self.deployment_state_manager._deployment_states.keys())
  def update_deployment_replicas(
      self, deployment_id: DeploymentID, target_num_replicas: int
  ) -> None:
      """Update the target number of replicas for a deployment.

      Args:
          deployment_id: The deployment to update.
          target_num_replicas: The new target number of replicas.

      Raises:
          ValueError: If the deployment's application does not exist.
          ExternalScalerDisabledError: If external_scaler_enabled is set to
              False for the application.
      """
      # Check if external scaler is enabled for this application
      app_name = deployment_id.app_name
      if not self.application_state_manager.does_app_exist(app_name):
          raise ValueError(f"Application '{app_name}' not found")

      if not self.application_state_manager.get_external_scaler_enabled(app_name):
          raise ExternalScalerDisabledError(
              f"Cannot update replicas for deployment '{deployment_id.name}' in "
              f"application '{app_name}'. The external scaling API can only be used "
              "when 'external_scaler_enabled' is set to true in the application "
              "configuration. Current value: external_scaler_enabled=false. "
              "To use this API, redeploy your application with "
              "'external_scaler_enabled: true' in the config."
          )

      self.deployment_state_manager.set_target_num_replicas(
          deployment_id, target_num_replicas
      )
  def get_serve_instance_details(self, source: Optional[APIType] = None) -> Dict:
      """Gets details on all applications on the cluster and system-level info.

      The information includes application and deployment statuses, config options,
      error messages, etc.

      Args:
          source: If provided, returns application
              statuses for applications matching this API type.
              Defaults to None, which means all applications are returned.

      Returns:
          Dict that follows the format of the schema ServeInstanceDetails,
          produced by its user-facing JSON-serializable conversion with
          ``exclude_unset=True``.
      """
      http_config = self.get_http_config()
      grpc_config = self.get_grpc_config()
      applications = {}

      app_statuses = self.application_state_manager.list_app_statuses(source=source)

      # If there are no app statuses, there's no point getting the app configs.
      # Moreover, there might be no app statuses because the GCS is down,
      # in which case getting the app configs would fail anyway,
      # since they're stored in the checkpoint in the GCS.
      app_configs = self.get_app_configs() if app_statuses else {}

      # Build one ApplicationDetails entry per listed application.
      for (
          app_name,
          app_status_info,
      ) in app_statuses.items():
          applications[app_name] = ApplicationDetails(
              name=app_name,
              route_prefix=self.application_state_manager.get_route_prefix(app_name),
              docs_path=self.get_docs_path(app_name),
              status=app_status_info.status,
              message=app_status_info.message,
              last_deployed_time_s=app_status_info.deployment_timestamp,
              # This can be none if the app was deployed through
              # serve.run, the app is in deleting state,
              # or a checkpoint hasn't been set yet
              deployed_app_config=app_configs.get(app_name),
              source=self.application_state_manager.get_app_source(app_name),
              deployments=self.application_state_manager.list_deployment_details(
                  app_name
              ),
              external_scaler_enabled=self.application_state_manager.get_external_scaler_enabled(
                  app_name
              ),
              deployment_topology=self.application_state_manager.get_deployment_topology(
                  app_name
              ),
          )

      # NOTE(zcin): We use exclude_unset here because we explicitly and intentionally
      # fill in all info that should be shown to users.
      http_options = HTTPOptionsSchema.parse_obj(http_config.dict(exclude_unset=True))
      grpc_options = gRPCOptionsSchema.parse_obj(grpc_config.dict(exclude_unset=True))
      return ServeInstanceDetails(
          target_capacity=self._target_capacity,
          controller_info=self._actor_details,
          proxy_location=ProxyLocation._from_deployment_mode(http_config.location),
          http_options=http_options,
          grpc_options=grpc_options,
          # Proxy details are None when there is no proxy state manager.
          proxies=(
              self.proxy_state_manager.get_proxy_details()
              if self.proxy_state_manager
              else None
          ),
          applications=applications,
          target_groups=self.get_target_groups(),
      )._get_user_facing_json_serializable_dict(exclude_unset=True)
  def _get_proxy_target_groups(self) -> List[TargetGroup]:
      """Get target groups for proxy-based routing.

      Returns:
          An HTTP target group and, when gRPC is enabled, a gRPC target group,
          both with route prefix ``"/"`` and targets taken from the proxy
          state manager. Returns an empty list when there is no proxy state
          manager or it reports no proxies.
      """
      # `proxy_state_manager` may be None (the other accessors on this
      # controller guard for it). Without this check, `get_target_groups`,
      # and therefore `get_serve_instance_details`, would raise
      # AttributeError.
      if self.proxy_state_manager is None:
          return []
      if not self.proxy_state_manager.get_proxy_details():
          return []

      # Setting prefix route to "/" because in Ray Serve, the proxy accepts
      # requests from the client and routes them to the correct application.
      # This is true for both HTTP and gRPC proxies.
      target_groups: List[TargetGroup] = [
          TargetGroup(
              protocol=RequestProtocol.HTTP,
              route_prefix="/",
              targets=self.proxy_state_manager.get_targets(RequestProtocol.HTTP),
          )
      ]
      if is_grpc_enabled(self.get_grpc_config()):
          target_groups.append(
              TargetGroup(
                  protocol=RequestProtocol.GRPC,
                  route_prefix="/",
                  targets=self.proxy_state_manager.get_targets(RequestProtocol.GRPC),
              )
          )
      return target_groups
  def get_target_groups(
      self,
      app_name: Optional[str] = None,
      from_proxy_manager: bool = False,
  ) -> List[TargetGroup]:
      """Get target groups for direct ingress deployments.

      This returns target groups that point directly to replica ports
      rather than proxy ports when direct ingress is enabled.

      Following situations are possible:
      1. Direct ingress is not enabled. In this case, we just return the
         target groups from the proxy implementation.
      2. Direct ingress is enabled and there are no applications (or none
         with a route prefix). In this case, we return target groups for
         proxy. Serve controller is running but there are no applications to
         route traffic to.
      3. Direct ingress is enabled and there are applications. All
         applications have at least one running replica. In this case, we
         return target groups for all applications with targets pointing to
         the running replicas.
      4. Direct ingress is enabled and there are applications. Some
         applications have no running replicas. In this case, for
         applications that have no running replicas, we return target groups
         for proxy and for applications that have running replicas, we return
         target groups for direct ingress. If there are multiple applications
         with no running replicas, we return one target group per application
         with unique route prefix.

      Args:
          app_name: If provided, only this application is considered;
              otherwise every application returned by ``list_app_statuses()``
              is considered.
          from_proxy_manager: Not read by this method; kept so existing
              callers that pass it keep working.
      """
      if not self._direct_ingress_enabled:
          return self._get_proxy_target_groups()

      if app_name is None:
          candidate_apps = list(
              self.application_state_manager.list_app_statuses().keys()
          )
      else:
          candidate_apps = [app_name]

      # Resolve each app's route prefix once; apps without one are skipped.
      # TODO(landscapepainter): A better way to handle this is to write an API that can tell
      # if the ingress deployment is healthy regardless of the application status.
      apps_with_prefix = []
      for candidate in candidate_apps:
          route_prefix = self.application_state_manager.get_route_prefix(candidate)
          if route_prefix is not None:
              apps_with_prefix.append((candidate, route_prefix))

      if not apps_with_prefix:
          return self._get_proxy_target_groups()

      # Create target groups for each application.
      target_groups: List[TargetGroup] = []
      for current_app, route_prefix in apps_with_prefix:
          app_target_groups = self._get_target_groups_for_app(current_app, route_prefix)
          if not app_target_groups:
              # No running replicas with allocated ports yet: fall back to
              # routing this app through the proxies.
              app_target_groups = (
                  self._get_target_groups_for_app_with_no_running_replicas(
                      route_prefix, current_app
                  )
              )
          target_groups.extend(app_target_groups)
      return target_groups
  def _get_running_replica_details_for_ingress_deployment(
      self, app_name: str
  ) -> List[ReplicaDetails]:
      """Return details of the running replicas of ``app_name``'s ingress deployment.

      Returns an empty list if the deployment manager has no details for the
      ingress deployment.
      """
      deployment_manager = self.deployment_state_manager
      ingress_name = self.application_state_manager.get_ingress_deployment_name(
          app_name
      )
      ingress_id = DeploymentID(app_name=app_name, name=ingress_name)

      details = deployment_manager.get_deployment_details(ingress_id)
      if not details:
          return []
      all_replica_details = details.replicas

      # Unique IDs of the replicas that are currently RUNNING.
      running_infos = deployment_manager.get_running_replica_infos().get(ingress_id, [])
      running_ids = {info.replica_id.unique_id for info in running_infos}

      return [
          detail for detail in all_replica_details if detail.replica_id in running_ids
      ]
  def _get_target_groups_for_app(
      self, app_name: str, route_prefix: str
  ) -> List[TargetGroup]:
      """Create HTTP and (if enabled) gRPC target groups for one application.

      Targets point at the running replicas of the app's ingress deployment.
      The result is empty if there are no running replicas, or if their ports
      have not been allocated yet.
      """
      replica_details = self._get_running_replica_details_for_ingress_deployment(
          app_name
      )
      if not replica_details:
          return []

      groups: List[TargetGroup] = []

      def add_group_for(protocol):
          # Only add a group when at least one replica has a port for it.
          protocol_targets = self._get_targets_for_protocol(replica_details, protocol)
          if protocol_targets:
              groups.append(
                  TargetGroup(
                      protocol=protocol,
                      route_prefix=route_prefix,
                      targets=protocol_targets,
                      app_name=app_name,
                  )
              )

      add_group_for(RequestProtocol.HTTP)
      if is_grpc_enabled(self.get_grpc_config()):
          add_group_for(RequestProtocol.GRPC)
      return groups
  def _get_target_groups_for_app_with_no_running_replicas(
      self, route_prefix: str, app_name: str
  ) -> List[TargetGroup]:
      """Return proxy-backed target groups for an app with no running replicas.

      This keeps applications discoverable via the proxy in situations where
      their replicas have scaled down to 0. Returns an empty list if there is
      no proxy state manager.

      Args:
          route_prefix: Route prefix to attach to each target group.
          app_name: Application the target groups belong to.
      """
      # `proxy_state_manager` may be None (other accessors guard for it).
      if self.proxy_state_manager is None:
          return []

      target_groups: List[TargetGroup] = []
      http_targets = self.proxy_state_manager.get_targets(RequestProtocol.HTTP)
      if http_targets:
          target_groups.append(
              TargetGroup(
                  protocol=RequestProtocol.HTTP,
                  route_prefix=route_prefix,
                  targets=http_targets,
                  app_name=app_name,
              )
          )

      # Only advertise gRPC targets when gRPC is enabled. This matches
      # `_get_proxy_target_groups` and `_get_target_groups_for_app`.
      if is_grpc_enabled(self.get_grpc_config()):
          grpc_targets = self.proxy_state_manager.get_targets(RequestProtocol.GRPC)
          if grpc_targets:
              target_groups.append(
                  TargetGroup(
                      protocol=RequestProtocol.GRPC,
                      route_prefix=route_prefix,
                      targets=grpc_targets,
                      app_name=app_name,
                  )
              )
      return target_groups
  def _get_targets_for_protocol(
      self, replica_details: List[ReplicaDetails], protocol: RequestProtocol
  ) -> List[Target]:
      """Build one ``Target`` per replica that has a port allocated for ``protocol``.

      Replicas without an allocated port for the protocol are skipped.
      """
      targets: List[Target] = []
      for detail in replica_details:
          if not self._is_port_allocated(detail, protocol):
              continue
          targets.append(
              Target(
                  ip=detail.node_ip,
                  port=self._get_port(detail, protocol),
                  instance_id=detail.node_instance_id,
                  name=detail.actor_name,
              )
          )
      return targets
  def _get_node_id_to_alive_replica_ids(self) -> Dict[str, Set[str]]:
      """Map each node ID to the unique IDs of the replicas placed on it.

      All replicas tracked by every deployment state are included regardless
      of their state. Replicas whose ``actor_node_id`` is None are skipped.
      The result is a ``defaultdict(set)``.
      """
      replica_ids_by_node: Dict[str, Set[str]] = defaultdict(set)
      # TODO(abrar): Expose the right APIs in the DeploymentStateManager
      # to get the alive replicas for a deployment.
      for deployment_state in self.deployment_state_manager._deployment_states.values():
          # Unlike `_get_running_replica_details_for_ingress_deployment`, which
          # only considers running replicas, every replica is included here.
          # We don't want to aggressively clean up ports for replicas that are
          # still starting or being updated.
          all_replicas: List[DeploymentReplica] = deployment_state._replicas.get()
          for replica in all_replicas:
              node_id: Optional[str] = replica.actor_node_id
              if node_id is not None:
                  replica_ids_by_node[node_id].add(replica.replica_id.unique_id)
      return replica_ids_by_node
  def allocate_replica_port(
      self, node_id: str, replica_id: str, protocol: RequestProtocol
  ) -> int:
      """Allocate a port for a replica in direct ingress mode.

      The allocation is delegated to the ``NodePortManager`` of ``node_id``.

      Args:
          node_id: ID of the node the replica runs on.
          replica_id: ID of the replica that will own the port.
          protocol: Protocol the port is for (e.g. ``RequestProtocol.HTTP`` or
              ``RequestProtocol.GRPC``), not only HTTP.

      Returns:
          The port returned by the node manager's ``allocate_port``.
      """
      return NodePortManager.get_node_manager(node_id).allocate_port(
          replica_id, protocol
      )
  def release_replica_port(
      self,
      node_id: str,
      replica_id: str,
      port: int,
      protocol: RequestProtocol,
      block_port: bool = False,
  ):
      """Release a replica's port in direct ingress mode.

      The release is delegated to the ``NodePortManager`` of ``node_id``.

      Args:
          node_id: ID of the node the replica runs on.
          replica_id: ID of the replica that owns the port.
          port: The port being released.
          protocol: Protocol the port was allocated for (HTTP or gRPC).
          block_port: Forwarded unchanged to ``release_port``; presumably
              keeps the port from being handed out again — confirm in
              ``NodePortManager.release_port``.
      """
      NodePortManager.get_node_manager(node_id).release_port(
          replica_id, port, protocol, block_port
      )
  def _get_port(
      self, replica_detail: ReplicaDetails, protocol: RequestProtocol
  ) -> int:
      """Look up the port allocated to a replica for ``protocol``.

      The lookup goes through the ``NodePortManager`` of the replica's node.
      """
      manager_for_node = NodePortManager.get_node_manager(replica_detail.node_id)
      allocated_port = manager_for_node.get_port(replica_detail.replica_id, protocol)
      return allocated_port
  def _is_port_allocated(
      self, replica_detail: ReplicaDetails, protocol: RequestProtocol
  ) -> bool:
      """Report whether a port is allocated to a replica for ``protocol``.

      The check goes through the ``NodePortManager`` of the replica's node.
      """
      manager_for_node = NodePortManager.get_node_manager(replica_detail.node_id)
      allocated = manager_for_node.is_port_allocated(replica_detail.replica_id, protocol)
      return allocated
  def get_serve_status(self, name: str = SERVE_DEFAULT_APP_NAME) -> bytes:
      """Return the serialized status overview of one application.

      Args:
          name: application name. If application name doesn't exist, app_status
              is NOT_STARTED.

      Returns:
          A ``StatusOverview`` built from the app status and its deployment
          statuses, converted to proto and serialized to bytes.
      """
      manager = self.application_state_manager
      overview = StatusOverview(
          name=name,
          app_status=manager.get_app_status_info(name),
          deployment_statuses=manager.get_deployments_statuses(name),
      )
      return overview.to_proto().SerializeToString()
  def get_serve_statuses(self, names: List[str]) -> List[bytes]:
      """Return serialized status overviews for ``names``, in the same order."""
      return [self.get_serve_status(app_name) for app_name in names]
  def list_serve_statuses(self) -> List[bytes]:
      """Return a serialized status overview for every app the manager lists."""
      app_names = self.application_state_manager.list_app_statuses()
      return [self.get_serve_status(app_name) for app_name in app_names]
  def get_app_configs(self) -> Dict[str, ServeApplicationSchema]:
      """Return the application configs saved in the config checkpoint.

      Returns:
          A dict mapping each application name to its config parsed with
          ``ServeApplicationSchema.parse_obj``; empty when nothing is stored
          under ``CONFIG_CHECKPOINT_KEY``.
      """
      raw_checkpoint = self.kv_store.get(CONFIG_CHECKPOINT_KEY)
      if raw_checkpoint is None:
          return {}
      # The checkpoint unpickles to a 4-tuple; only its last element (the
      # per-app configs keyed by app name) is needed here.
      # NOTE(review): pickle.loads is only safe if this KV entry is written
      # solely by Serve itself — confirm there is no external writer.
      _, _, _, config_checkpoints_dict = pickle.loads(raw_checkpoint)
      app_configs = {}
      for app_name, raw_config in config_checkpoints_dict.items():
          app_configs[app_name] = ServeApplicationSchema.parse_obj(raw_config)
      return app_configs
  def get_external_scaler_enabled(self, app_name: str) -> bool:
      """Return the ``external_scaler_enabled`` flag of an application.

      This helper exists for Java tests, which need to check that the flag is
      set correctly but cannot deserialize Python Pydantic objects.

      Args:
          app_name: Name of the application.

      Returns:
          True if external_scaler_enabled is set for the application,
          False otherwise.
      """
      manager = self.application_state_manager
      return manager.get_external_scaler_enabled(app_name)
  def get_all_deployment_statuses(self) -> List[bytes]:
      """Return the serialized status of every deployment the manager reports."""
      serialized: List[bytes] = []
      for deployment_status in self.deployment_state_manager.get_deployment_statuses():
          serialized.append(deployment_status.to_proto().SerializeToString())
      return serialized
  def get_deployment_status(
      self, name: str, app_name: str = ""
  ) -> Union[None, bytes]:
      """Return the serialized status of one deployment, or None if unknown.

      Args:
          name: Deployment name.
          app_name: Application name. Default is "" because 1.x
              deployments go through this API.

      Returns:
          The first status reported for the deployment, serialized to bytes,
          or None when the manager reports no status for it.
      """
      deployment_id = DeploymentID(name=name, app_name=app_name)
      matches = self.deployment_state_manager.get_deployment_statuses([deployment_id])
      if matches:
          return matches[0].to_proto().SerializeToString()
      return None
  def get_docs_path(self, name: str):
      """Return the docs path of application ``name``.

      Currently this is the OpenAPI docs path of FastAPI-integrated
      applications, as reported by the application state manager.
      """
      manager = self.application_state_manager
      return manager.get_docs_path(name)
  def get_ingress_deployment_name(self, app_name: str) -> Optional[str]:
      """Return the name of an application's ingress deployment.

      Returns:
          The ingress deployment name if the application exists, otherwise
          None.
      """
      manager = self.application_state_manager
      return manager.get_ingress_deployment_name(app_name)
  def delete_apps(self, names: Iterable[str]):
      """Request deletion of every application in ``names``.

      During deletion, the application status is DELETING. One checkpoint is
      saved after all deletions have been requested, even if ``names`` is
      empty.
      """
      manager = self.application_state_manager
      for app_name in names:
          manager.delete_app(app_name)
      manager.save_checkpoint()
  def record_request_routing_info(self, info: RequestRoutingInfo):
      """Forward a replica's request routing information to the deployment state manager.

      Args:
          info: RequestRoutingInfo including deployment name, replica tag,
              multiplex model ids, and routing stats.
      """
      manager = self.deployment_state_manager
      manager.record_request_routing_info(info)
  def _get_replica_ranks_mapping(
      self, deployment_id: DeploymentID
  ) -> Dict[str, ReplicaRank]:
      """Return the current rank of every replica in a deployment.

      Args:
          deployment_id: The deployment whose replica ranks are requested.

      Returns:
          Dictionary mapping replica_id to its ReplicaRank (with rank,
          node_rank, local_rank), as reported by the deployment state manager.
      """
      manager = self.deployment_state_manager
      return manager._get_replica_ranks_mapping(deployment_id)
  async def graceful_shutdown(self, wait: bool = True):
      """Mark the controller as shutting down, then optionally wait.

      Setting ``_shutting_down`` signals run_control_loop() to proceed with
      the shutdown process, so the controller can shut down gracefully.

      Args:
          wait: If True, block on the shutdown event after setting the flag.

      Raises:
          RayActorError: if wait is True, the caller waits until the controller
              is killed, which raises a RayActorError.
      """
      self._shutting_down = True
      if wait:
          # This event never gets set. The caller waits on it indefinitely
          # until the controller is killed, which raises a RayActorError.
          await self._shutdown_event.wait()
  def _get_logging_config(self) -> Tuple:
      """Return ``(global_logging_config, log_file_path)`` for tests.

      ``log_file_path`` is the ``baseFilename`` of the target of the last
      ``logging.handlers.MemoryHandler`` attached to ``logger``, or None when
      no such handler is attached.
      """
      log_file_path = None
      for candidate in logger.handlers:
          if not isinstance(candidate, logging.handlers.MemoryHandler):
              continue
          # Keep scanning: a later MemoryHandler overrides an earlier one.
          log_file_path = candidate.target.baseFilename
      return self.global_logging_config, log_file_path
  def _get_target_capacity_direction(self) -> Optional[TargetCapacityDirection]:
      """Return the controller's current target-capacity scaling direction (for tests)."""
      direction = self._target_capacity_direction
      return direction
  def calculate_target_capacity_direction(
      curr_config: Optional[ServeDeploySchema],
      new_config: ServeDeploySchema,
      curr_target_capacity_direction: Optional[TargetCapacityDirection],
  ) -> Optional[TargetCapacityDirection]:
      """Compare two Serve configs to calculate the next scaling direction.

      Args:
          curr_config: The currently applied config, or None if there is none.
          new_config: The config being applied.
          curr_target_capacity_direction: The direction currently in effect.
              It is returned unchanged when both configs have the same apps
              and the same target_capacity.

      Returns:
          The next TargetCapacityDirection, or None. When the set of
          applications changes (or there is no current config), UP is
          returned if new_config sets a target_capacity and None otherwise.
          When the apps match, the direction follows how target_capacity
          moves between the two configs.
      """
      if curr_config is None or not applications_match(curr_config, new_config):
          # A config with different apps has been applied. If it contains a
          # target_capacity, Serve must start scaling this config up.
          if new_config.target_capacity is not None:
              return TargetCapacityDirection.UP
          return None

      curr_target_capacity = curr_config.target_capacity
      next_target_capacity = new_config.target_capacity
      if curr_target_capacity == next_target_capacity:
          return curr_target_capacity_direction
      if next_target_capacity is None:
          return None
      if curr_target_capacity is None:
          # target_capacity is scaling down from None to a number.
          return TargetCapacityDirection.DOWN
      if curr_target_capacity < next_target_capacity:
          return TargetCapacityDirection.UP
      return TargetCapacityDirection.DOWN
  def applications_match(config1: ServeDeploySchema, config2: ServeDeploySchema) -> bool:
      """Return whether config1 and config2 contain the same application names.

      Only the sets of ``app.name`` values are compared: order is ignored, and
      the rest of each application's config is not inspected.
      """
      first_names = set(app.name for app in config1.applications)
      second_names = set(app.name for app in config2.applications)
      return first_names == second_names
  def log_target_capacity_change(
      curr_target_capacity: Optional[float],
      next_target_capacity: Optional[float],
      next_target_capacity_direction: Optional[TargetCapacityDirection],
  ):
      """Log a message when target_capacity changes; do nothing otherwise.

      When the new direction is a TargetCapacityDirection, the message names
      the direction and both capacities. Otherwise a steady-state message is
      logged.
      """
      if curr_target_capacity == next_target_capacity:
          return
      if not isinstance(next_target_capacity_direction, TargetCapacityDirection):
          logger.info("Target capacity entering 100% at steady state.")
          return
      logger.info(
          "Target capacity scaling "
          f"{next_target_capacity_direction.value.lower()} "
          f"from {curr_target_capacity} to {next_target_capacity}."
      )