application_state.py 71 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789
  1. import inspect
  2. import json
  3. import logging
  4. import os
  5. import time
  6. import traceback
  7. from copy import deepcopy
  8. from dataclasses import asdict, dataclass, field
  9. from enum import Enum
  10. from typing import Dict, List, Optional, Tuple
  11. import ray
  12. from ray import cloudpickle
  13. from ray._common.utils import import_attr, import_module_and_attr
  14. from ray.exceptions import RuntimeEnvSetupError
  15. from ray.serve._private.autoscaling_state import AutoscalingStateManager
  16. from ray.serve._private.build_app import BuiltApplication, build_app
  17. from ray.serve._private.common import (
  18. DeploymentID,
  19. DeploymentStatus,
  20. DeploymentStatusInfo,
  21. DeploymentStatusTrigger,
  22. EndpointInfo,
  23. TargetCapacityDirection,
  24. )
  25. from ray.serve._private.config import DeploymentConfig
  26. from ray.serve._private.constants import (
  27. DEFAULT_AUTOSCALING_POLICY_NAME,
  28. DEFAULT_REQUEST_ROUTER_PATH,
  29. RAY_SERVE_ENABLE_TASK_EVENTS,
  30. SERVE_LOGGER_NAME,
  31. )
  32. from ray.serve._private.deploy_utils import (
  33. deploy_args_to_deployment_info,
  34. get_app_code_version,
  35. get_deploy_args,
  36. )
  37. from ray.serve._private.deployment_info import DeploymentInfo
  38. from ray.serve._private.deployment_state import DeploymentStateManager
  39. from ray.serve._private.endpoint_state import EndpointState
  40. from ray.serve._private.logging_utils import configure_component_logger
  41. from ray.serve._private.storage.kv_store import KVStoreBase
  42. from ray.serve._private.usage import ServeUsageTag
  43. from ray.serve._private.utils import (
  44. DEFAULT,
  45. check_obj_ref_ready_nowait,
  46. override_runtime_envs_except_env_vars,
  47. validate_route_prefix,
  48. )
  49. from ray.serve.api import ASGIAppReplicaWrapper
  50. from ray.serve.config import AutoscalingConfig, AutoscalingPolicy, RequestRouterConfig
  51. from ray.serve.exceptions import RayServeException
  52. from ray.serve.generated.serve_pb2 import (
  53. ApplicationArgs as ApplicationArgsProto,
  54. ApplicationStatus as ApplicationStatusProto,
  55. ApplicationStatusInfo as ApplicationStatusInfoProto,
  56. DeploymentLanguage,
  57. DeploymentStatusInfoList as DeploymentStatusInfoListProto,
  58. StatusOverview as StatusOverviewProto,
  59. )
  60. from ray.serve.schema import (
  61. APIType,
  62. ApplicationStatus,
  63. DeploymentDetails,
  64. DeploymentNode,
  65. DeploymentTopology,
  66. LoggingConfig,
  67. ServeApplicationSchema,
  68. )
  69. from ray.types import ObjectRef
  70. from ray.util import metrics as ray_metrics
  71. logger = logging.getLogger(SERVE_LOGGER_NAME)
  72. CHECKPOINT_KEY = "serve-application-state-checkpoint"
  73. class BuildAppStatus(Enum):
  74. """Status of the build application task."""
  75. NO_TASK_IN_PROGRESS = 1
  76. IN_PROGRESS = 2
  77. SUCCEEDED = 3
  78. FAILED = 4
  79. @dataclass
  80. class BuildAppTaskInfo:
  81. """Stores info on the current in-progress build app task.
  82. We use a class instead of only storing the task object ref because
  83. when a new config is deployed, there can be an outdated in-progress
  84. build app task. We attach the code version to the task info to
  85. distinguish outdated build app tasks.
  86. """
  87. obj_ref: ObjectRef
  88. code_version: str
  89. config: ServeApplicationSchema
  90. target_capacity: Optional[float]
  91. target_capacity_direction: Optional[TargetCapacityDirection]
  92. finished: bool
  93. @dataclass(eq=True)
  94. class ApplicationStatusInfo:
  95. status: ApplicationStatus
  96. message: str = ""
  97. deployment_timestamp: float = 0
  98. def debug_string(self):
  99. return json.dumps(asdict(self), indent=4)
  100. def to_proto(self):
  101. return ApplicationStatusInfoProto(
  102. status=f"APPLICATION_STATUS_{self.status.name}",
  103. message=self.message,
  104. deployment_timestamp=self.deployment_timestamp,
  105. )
  106. @classmethod
  107. def from_proto(cls, proto: ApplicationStatusInfoProto):
  108. status = ApplicationStatusProto.Name(proto.status)[len("APPLICATION_STATUS_") :]
  109. return cls(
  110. status=ApplicationStatus(status),
  111. message=proto.message,
  112. deployment_timestamp=proto.deployment_timestamp,
  113. )
  114. @dataclass(eq=True)
  115. class StatusOverview:
  116. app_status: ApplicationStatusInfo
  117. name: str = ""
  118. deployment_statuses: List[DeploymentStatusInfo] = field(default_factory=list)
  119. def debug_string(self):
  120. return json.dumps(asdict(self), indent=4)
  121. def get_deployment_status(self, name: str) -> Optional[DeploymentStatusInfo]:
  122. """Get a deployment's status by name.
  123. Args:
  124. name: Deployment's name.
  125. Returns:
  126. Optional[DeploymentStatusInfo]: The status of the deployment if it exists,
  127. otherwise None.
  128. """
  129. for deployment_status in self.deployment_statuses:
  130. if name == deployment_status.name:
  131. return deployment_status
  132. return None
  133. def to_proto(self):
  134. # Create a protobuf for the Serve Application info
  135. app_status_proto = self.app_status.to_proto()
  136. # Create protobufs for all individual deployment statuses
  137. deployment_status_protos = map(
  138. lambda status: status.to_proto(), self.deployment_statuses
  139. )
  140. # Create a protobuf list containing all the deployment status protobufs
  141. deployment_status_proto_list = DeploymentStatusInfoListProto()
  142. deployment_status_proto_list.deployment_status_infos.extend(
  143. deployment_status_protos
  144. )
  145. # Return protobuf encapsulating application and deployment protos
  146. return StatusOverviewProto(
  147. name=self.name,
  148. app_status=app_status_proto,
  149. deployment_statuses=deployment_status_proto_list,
  150. )
  151. @classmethod
  152. def from_proto(cls, proto: StatusOverviewProto) -> "StatusOverview":
  153. # Recreate Serve Application info
  154. app_status = ApplicationStatusInfo.from_proto(proto.app_status)
  155. # Recreate deployment statuses
  156. deployment_statuses = []
  157. for info_proto in proto.deployment_statuses.deployment_status_infos:
  158. deployment_statuses.append(DeploymentStatusInfo.from_proto(info_proto))
  159. # Recreate StatusInfo
  160. return cls(
  161. app_status=app_status,
  162. deployment_statuses=deployment_statuses,
  163. name=proto.name,
  164. )
  165. @dataclass
  166. class ApplicationTargetState:
  167. """Defines target state of application.
  168. Target state can become inconsistent if the code version doesn't
  169. match that of the config. When that happens, a new build app task
  170. should be kicked off to reconcile the inconsistency.
  171. deployment_infos: map of deployment name to deployment info. This is
  172. - None if a config was deployed but the app hasn't finished
  173. building yet,
  174. - An empty dict if the app is deleting.
  175. code_version: Code version of all deployments in target state. None
  176. if application was deployed through serve.run.
  177. config: application config deployed by user. None if application was
  178. deployed through serve.run.
  179. target_capacity: the target_capacity to use when adjusting num_replicas.
  180. target_capacity_direction: the scale direction to use when
  181. running the Serve autoscaler.
  182. deleting: whether the application is being deleted.
  183. external_scaler_enabled: whether external autoscaling is enabled for
  184. this application.
  185. serialized_application_autoscaling_policy_def: Optional[bytes]
  186. """
  187. deployment_infos: Optional[Dict[str, DeploymentInfo]]
  188. code_version: Optional[str]
  189. config: Optional[ServeApplicationSchema]
  190. target_capacity: Optional[float]
  191. target_capacity_direction: Optional[TargetCapacityDirection]
  192. deleting: bool
  193. api_type: APIType
  194. serialized_application_autoscaling_policy_def: Optional[bytes]
  195. external_scaler_enabled: bool
  196. class ApplicationState:
  197. """Manage single application states with all operations"""
  198. def __init__(
  199. self,
  200. name: str,
  201. deployment_state_manager: DeploymentStateManager,
  202. autoscaling_state_manager: AutoscalingStateManager,
  203. endpoint_state: EndpointState,
  204. logging_config: LoggingConfig,
  205. external_scaler_enabled: bool,
  206. ):
  207. """
  208. Initialize an ApplicationState instance.
  209. Args:
  210. name: Application name.
  211. deployment_state_manager: Manages the state of all deployments in the cluster.
  212. autoscaling_state_manager: Manages autoscaling decisions in the cluster.
  213. endpoint_state: Manages endpoints in the system.
  214. logging_config: Logging configuration schema.
  215. external_scaler_enabled: Whether external autoscaling is enabled for
  216. this application.
  217. """
  218. self._name = name
  219. self._status_msg = ""
  220. self._deployment_state_manager = deployment_state_manager
  221. self._autoscaling_state_manager = autoscaling_state_manager
  222. self._endpoint_state = endpoint_state
  223. self._route_prefix: Optional[str] = None
  224. self._ingress_deployment_name: Optional[str] = None
  225. self._status: ApplicationStatus = ApplicationStatus.DEPLOYING
  226. self._deployment_timestamp = time.time()
  227. self._build_app_task_info: Optional[BuildAppTaskInfo] = None
  228. # Before a deploy app task finishes, we don't know what the
  229. # target deployments are, so set deployment_infos=None
  230. self._target_state: ApplicationTargetState = ApplicationTargetState(
  231. deployment_infos=None,
  232. code_version=None,
  233. config=None,
  234. target_capacity=None,
  235. target_capacity_direction=None,
  236. deleting=False,
  237. api_type=APIType.UNKNOWN,
  238. external_scaler_enabled=external_scaler_enabled,
  239. serialized_application_autoscaling_policy_def=None,
  240. )
  241. self._logging_config = logging_config
  242. @property
  243. def route_prefix(self) -> Optional[str]:
  244. return self._route_prefix
  245. @property
  246. def external_scaler_enabled(self) -> bool:
  247. return self._target_state.external_scaler_enabled
  248. @property
  249. def docs_path(self) -> Optional[str]:
  250. # get the docs path from the running deployments
  251. # we are making an assumption that the docs path can only be set
  252. # on ingress deployments with fastapi.
  253. ingress_deployment = DeploymentID(self._ingress_deployment_name, self._name)
  254. return self._deployment_state_manager.get_deployment_docs_path(
  255. ingress_deployment
  256. )
  257. @property
  258. def status(self) -> ApplicationStatus:
  259. """Status of the application.
  260. DEPLOYING: The build task is still running, or the deployments
  261. have started deploying but aren't healthy yet.
  262. RUNNING: All deployments are healthy.
  263. DEPLOY_FAILED: The build task failed or one or more deployments
  264. became unhealthy in the process of deploying
  265. UNHEALTHY: While the application was running, one or more
  266. deployments transition from healthy to unhealthy.
  267. DELETING: Application and its deployments are being deleted.
  268. """
  269. return self._status
  270. @property
  271. def deployment_timestamp(self) -> float:
  272. return self._deployment_timestamp
  273. @property
  274. def target_deployments(self) -> List[str]:
  275. """List of target deployment names in application."""
  276. if self._target_state.deployment_infos is None:
  277. return []
  278. return list(self._target_state.deployment_infos.keys())
  279. @property
  280. def ingress_deployment(self) -> Optional[str]:
  281. return self._ingress_deployment_name
  282. @property
  283. def api_type(self) -> APIType:
  284. return self._target_state.api_type
  285. def recover_target_state_from_checkpoint(
  286. self, checkpoint_data: ApplicationTargetState
  287. ):
  288. logger.info(
  289. f"Recovering target state for application '{self._name}' from checkpoint."
  290. )
  291. self._set_target_state(
  292. checkpoint_data.deployment_infos,
  293. api_type=checkpoint_data.api_type,
  294. code_version=checkpoint_data.code_version,
  295. target_config=checkpoint_data.config,
  296. target_capacity=checkpoint_data.target_capacity,
  297. target_capacity_direction=checkpoint_data.target_capacity_direction,
  298. deleting=checkpoint_data.deleting,
  299. external_scaler_enabled=checkpoint_data.external_scaler_enabled,
  300. )
  301. # Restore route prefix and docs path from checkpointed deployments when
  302. # the imperatively started application is restarting with controller.
  303. if checkpoint_data.deployment_infos is not None:
  304. self._route_prefix = self._check_routes(checkpoint_data.deployment_infos)
  305. # Restore app-level autoscaling policy from checkpoint
  306. if (
  307. checkpoint_data.config
  308. and checkpoint_data.config.autoscaling_policy is not None
  309. ):
  310. self._autoscaling_state_manager.register_application(
  311. self._name,
  312. AutoscalingPolicy(
  313. _serialized_policy_def=checkpoint_data.serialized_application_autoscaling_policy_def,
  314. **checkpoint_data.config.autoscaling_policy,
  315. ),
  316. )
  317. def _set_target_state(
  318. self,
  319. deployment_infos: Optional[Dict[str, DeploymentInfo]],
  320. *,
  321. api_type: APIType,
  322. code_version: Optional[str],
  323. target_config: Optional[ServeApplicationSchema],
  324. target_capacity: Optional[float] = None,
  325. target_capacity_direction: Optional[TargetCapacityDirection] = None,
  326. deleting: bool = False,
  327. external_scaler_enabled: bool = False,
  328. serialized_application_autoscaling_policy_def: Optional[bytes] = None,
  329. ):
  330. """Set application target state.
  331. While waiting for build task to finish, this should be
  332. (None, False)
  333. When build task has finished and during normal operation, this should be
  334. (target_deployments, False)
  335. When a request to delete the application has been received, this should be
  336. ({}, True)
  337. """
  338. if deleting:
  339. self._update_status(ApplicationStatus.DELETING)
  340. else:
  341. self._update_status(ApplicationStatus.DEPLOYING)
  342. if deployment_infos is None:
  343. self._ingress_deployment_name = None
  344. else:
  345. for name, info in deployment_infos.items():
  346. if info.ingress:
  347. self._ingress_deployment_name = name
  348. target_state = ApplicationTargetState(
  349. deployment_infos,
  350. code_version,
  351. target_config,
  352. target_capacity,
  353. target_capacity_direction,
  354. deleting,
  355. api_type=api_type,
  356. external_scaler_enabled=external_scaler_enabled,
  357. serialized_application_autoscaling_policy_def=serialized_application_autoscaling_policy_def,
  358. )
  359. self._target_state = target_state
  360. def _set_target_state_deleting(self):
  361. """Set target state to deleting.
  362. Wipes the target deployment infos, code version, and config.
  363. """
  364. self._set_target_state(
  365. deployment_infos={},
  366. api_type=self._target_state.api_type,
  367. code_version=None,
  368. target_config=None,
  369. deleting=True,
  370. external_scaler_enabled=self.external_scaler_enabled,
  371. )
  372. def _clear_target_state_and_store_config(
  373. self,
  374. target_config: Optional[ServeApplicationSchema],
  375. ):
  376. """Clears the target state and stores the config.
  377. NOTE: this currently assumes that this method is *only* called when managing
  378. apps deployed with the declarative API.
  379. """
  380. self._set_target_state(
  381. deployment_infos=None,
  382. api_type=APIType.DECLARATIVE,
  383. code_version=None,
  384. target_config=target_config,
  385. deleting=False,
  386. external_scaler_enabled=target_config.external_scaler_enabled
  387. if target_config
  388. else False,
  389. )
  390. def _delete_deployment(self, name: str) -> bool:
  391. """Delete a deployment in the application.
  392. Args:
  393. name: The name of the deployment to delete.
  394. Returns:
  395. Whether the target state has changed.
  396. """
  397. id = DeploymentID(name=name, app_name=self._name)
  398. self._endpoint_state.delete_endpoint(id)
  399. return self._deployment_state_manager.delete_deployment(id)
  400. def delete(self):
  401. """Delete the application"""
  402. if self._status != ApplicationStatus.DELETING:
  403. logger.info(
  404. f"Deleting app '{self._name}'.",
  405. extra={"log_to_stderr": False},
  406. )
  407. self._set_target_state_deleting()
  408. def is_deleted(self) -> bool:
  409. """Check whether the application is already deleted.
  410. For an application to be considered deleted, the target state has to be set to
  411. deleting and all deployments have to be deleted.
  412. """
  413. return self._target_state.deleting and len(self._get_live_deployments()) == 0
  414. def should_autoscale(self) -> bool:
  415. """Determine if autoscaling is enabled for the application.
  416. Returns:
  417. Autoscaling is enabled for the application if any of the deployments have autoscaling enabled.
  418. """
  419. return self._autoscaling_state_manager.should_autoscale_application(self._name)
  420. def autoscale(self) -> bool:
  421. """
  422. Apply the autoscaling decisions for the application.
  423. If the application has deployment-level autoscaling, it will apply the autoscaling decisions for each deployment.
  424. Returns:
  425. True if there is a change to number of replicas for any deployment. False otherwise.
  426. """
  427. target_deployments = self.target_deployments
  428. if len(target_deployments) == 0:
  429. return False
  430. deployment_to_target_num_replicas: Dict[DeploymentID, int] = {}
  431. for deployment_name in target_deployments:
  432. deployment_id = DeploymentID(name=deployment_name, app_name=self._name)
  433. target_num_replicas = (
  434. self._deployment_state_manager.get_deployment_target_num_replicas(
  435. deployment_id
  436. )
  437. )
  438. if target_num_replicas is None:
  439. continue
  440. deployment_to_target_num_replicas[deployment_id] = target_num_replicas
  441. if len(deployment_to_target_num_replicas) == 0:
  442. return False
  443. decisions: Dict[
  444. DeploymentID, int
  445. ] = self._autoscaling_state_manager.get_decision_num_replicas(
  446. self._name, deployment_to_target_num_replicas
  447. )
  448. target_state_changed = False
  449. for deployment_id, decision_num_replicas in decisions.items():
  450. target_state_changed = (
  451. self._deployment_state_manager.autoscale(
  452. deployment_id, decision_num_replicas
  453. )
  454. or target_state_changed
  455. )
  456. return target_state_changed
  457. def apply_deployment_info(
  458. self,
  459. deployment_name: str,
  460. deployment_info: DeploymentInfo,
  461. ) -> bool:
  462. """Deploys a deployment in the application.
  463. Args:
  464. deployment_name: The name of the deployment to apply.
  465. deployment_info: The deployment info to apply.
  466. Returns:
  467. Whether the target state has changed.
  468. """
  469. route_prefix = deployment_info.route_prefix
  470. if route_prefix is not None and not route_prefix.startswith("/"):
  471. raise RayServeException(
  472. f'Invalid route prefix "{route_prefix}", it must start with "/"'
  473. )
  474. deployment_id = DeploymentID(name=deployment_name, app_name=self._name)
  475. target_state_changed = self._deployment_state_manager.deploy(
  476. deployment_id, deployment_info
  477. )
  478. if deployment_info.route_prefix is not None:
  479. config = deployment_info.deployment_config
  480. # Try to get route_patterns from deployment state first (most up-to-date),
  481. # otherwise fall back to existing endpoint patterns
  482. route_patterns = (
  483. self._deployment_state_manager.get_deployment_route_patterns(
  484. deployment_id
  485. )
  486. )
  487. self._endpoint_state.update_endpoint(
  488. deployment_id,
  489. # The current meaning of the "is_cross_language" field is ambiguous.
  490. # We will work on optimizing and removing this field in the future.
  491. # Instead of using the "is_cross_language" field, we will directly
  492. # compare if the replica is Python, which will assist the Python
  493. # router in determining if the replica invocation is a cross-language
  494. # operation.
  495. EndpointInfo(
  496. route=deployment_info.route_prefix,
  497. app_is_cross_language=config.deployment_language
  498. != DeploymentLanguage.PYTHON,
  499. route_patterns=route_patterns,
  500. ),
  501. )
  502. else:
  503. self._endpoint_state.delete_endpoint(deployment_id)
  504. return target_state_changed
  505. def deploy_app(
  506. self,
  507. deployment_infos: Dict[str, DeploymentInfo],
  508. external_scaler_enabled: bool,
  509. ):
  510. """(Re-)deploy the application from list of deployment infos.
  511. This function should only be called to deploy an app from an
  512. imperative API (i.e., `serve.run` or Java API).
  513. Raises: RayServeException if there is more than one route prefix
  514. or docs path.
  515. """
  516. # Check routes are unique in deployment infos
  517. self._route_prefix = self._check_routes(deployment_infos)
  518. self._set_target_state(
  519. deployment_infos=deployment_infos,
  520. api_type=APIType.IMPERATIVE,
  521. code_version=None,
  522. target_config=None,
  523. target_capacity=None,
  524. target_capacity_direction=None,
  525. external_scaler_enabled=external_scaler_enabled,
  526. )
  527. def apply_app_config(
  528. self,
  529. config: ServeApplicationSchema,
  530. target_capacity: Optional[float],
  531. target_capacity_direction: Optional[TargetCapacityDirection],
  532. deployment_time: float,
  533. ) -> None:
  534. """Apply the config to the application.
  535. If the code version matches that of the current live deployments
  536. then it only applies the updated config to the deployment state
  537. manager. If the code version doesn't match, this will re-build
  538. the application.
  539. This function should only be called to (re-)deploy an app from
  540. the declarative API (i.e., through the REST API).
  541. """
  542. self._deployment_timestamp = deployment_time
  543. config_version = get_app_code_version(config)
  544. if config_version == self._target_state.code_version:
  545. try:
  546. overrided_infos = override_deployment_info(
  547. self._target_state.deployment_infos,
  548. config,
  549. )
  550. self._route_prefix = self._check_routes(overrided_infos)
  551. self._set_target_state(
  552. # Code version doesn't change.
  553. code_version=self._target_state.code_version,
  554. api_type=APIType.DECLARATIVE,
  555. # Everything else must reflect the new config.
  556. deployment_infos=overrided_infos,
  557. target_config=config,
  558. target_capacity=target_capacity,
  559. target_capacity_direction=target_capacity_direction,
  560. external_scaler_enabled=config.external_scaler_enabled,
  561. )
  562. except (TypeError, ValueError, RayServeException):
  563. self._clear_target_state_and_store_config(config)
  564. self._update_status(
  565. ApplicationStatus.DEPLOY_FAILED, traceback.format_exc()
  566. )
  567. except Exception:
  568. self._clear_target_state_and_store_config(config)
  569. self._update_status(
  570. ApplicationStatus.DEPLOY_FAILED,
  571. (
  572. f"Unexpected error occurred while applying config for "
  573. f"application '{self._name}': \n{traceback.format_exc()}"
  574. ),
  575. )
  576. else:
  577. # If there is an in progress build task, cancel it.
  578. if self._build_app_task_info and not self._build_app_task_info.finished:
  579. logger.info(
  580. f"Received new config for application '{self._name}'. "
  581. "Cancelling previous request."
  582. )
  583. ray.cancel(self._build_app_task_info.obj_ref)
  584. # Halt reconciliation of target deployments. A new target state
  585. # will be set once the new app has finished building.
  586. self._clear_target_state_and_store_config(config)
  587. # Record telemetry for container runtime env feature
  588. if self._target_state.config.runtime_env.get(
  589. "container"
  590. ) or self._target_state.config.runtime_env.get("image_uri"):
  591. ServeUsageTag.APP_CONTAINER_RUNTIME_ENV_USED.record("1")
  592. if isinstance(config.autoscaling_policy, dict):
  593. application_autoscaling_policy_function = config.autoscaling_policy.get(
  594. "policy_function"
  595. )
  596. else:
  597. application_autoscaling_policy_function = None
  598. deployment_to_autoscaling_policy_function = {
  599. deployment.name: deployment.autoscaling_config.get("policy", {}).get(
  600. "policy_function", DEFAULT_AUTOSCALING_POLICY_NAME
  601. )
  602. for deployment in config.deployments
  603. if isinstance(deployment.autoscaling_config, dict)
  604. }
  605. deployment_to_request_router_cls = {
  606. deployment.name: deployment.request_router_config.get(
  607. "request_router_class", DEFAULT_REQUEST_ROUTER_PATH
  608. )
  609. for deployment in config.deployments
  610. if isinstance(deployment.request_router_config, dict)
  611. }
  612. # Kick off new build app task
  613. logger.info(f"Importing and building app '{self._name}'.")
  614. build_app_obj_ref = build_serve_application.options(
  615. runtime_env=config.runtime_env,
  616. enable_task_events=RAY_SERVE_ENABLE_TASK_EVENTS,
  617. ).remote(
  618. config.import_path,
  619. config_version,
  620. config.name,
  621. config.args,
  622. self._logging_config,
  623. application_autoscaling_policy_function,
  624. deployment_to_autoscaling_policy_function,
  625. deployment_to_request_router_cls,
  626. )
  627. self._build_app_task_info = BuildAppTaskInfo(
  628. obj_ref=build_app_obj_ref,
  629. code_version=config_version,
  630. config=config,
  631. target_capacity=target_capacity,
  632. target_capacity_direction=target_capacity_direction,
  633. finished=False,
  634. )
  def _get_live_deployments(self) -> List[str]:
      """Return the deployment names that the deployment state manager
      currently reports for this application."""
      manager = self._deployment_state_manager
      return manager.get_deployments_in_application(self._name)
  def _determine_app_status(self) -> Tuple[ApplicationStatus, str]:
      """Derive the application status from the target state and the
      statuses of the application's deployments.

      Returns:
          Status (ApplicationStatus):
              RUNNING: all deployments are healthy or autoscaling.
              DEPLOYING: there is one or more updating deployments,
                  and there are no unhealthy deployments.
              DEPLOY_FAILED: one or more deployments became unhealthy
                  while the application was deploying.
              UNHEALTHY: one or more deployments became unhealthy
                  while the application was running.
              DELETING: the application is being deleted.
          Error message (str):
              Non-empty string if status is DEPLOY_FAILED or UNHEALTHY
      """
      if self._target_state.deleting:
          return ApplicationStatus.DELETING, ""

      statuses = self.get_deployments_statuses()

      # The status info with the lowest rank has the highest priority and
      # alone decides which application status is reported.
      dominant = min(statuses, key=lambda info: info.rank)

      def names_with(wanted_status):
          # Names of all deployments currently in the given status.
          return [s.name for s in statuses if s.status == wanted_status]

      if dominant.status == DeploymentStatus.DEPLOY_FAILED:
          failed = names_with(DeploymentStatus.DEPLOY_FAILED)
          return (
              ApplicationStatus.DEPLOY_FAILED,
              f"Failed to update the deployments {failed}.",
          )

      if dominant.status == DeploymentStatus.UNHEALTHY:
          unhealthy = names_with(DeploymentStatus.UNHEALTHY)
          return (
              ApplicationStatus.UNHEALTHY,
              f"The deployments {unhealthy} are UNHEALTHY.",
          )

      if dominant.status == DeploymentStatus.UPDATING:
          return ApplicationStatus.DEPLOYING, ""

      # Scaling that was kicked off by a config update still counts as
      # deploying; any other scaling is treated as running.
      scaling_from_config_update = (
          dominant.status
          in [DeploymentStatus.UPSCALING, DeploymentStatus.DOWNSCALING]
          and dominant.status_trigger
          == DeploymentStatusTrigger.CONFIG_UPDATE_STARTED
      )
      if scaling_from_config_update:
          return ApplicationStatus.DEPLOYING, ""

      return ApplicationStatus.RUNNING, ""
  def _reconcile_build_app_task(
      self,
  ) -> Tuple[Optional[bytes], Optional[Dict], BuildAppStatus, str]:
      """Poll the in-progress build task and, once it is done, digest its
      result.

      Returns:
          Serialized application autoscaling policy def (bytes):
              The serialized application autoscaling policy def returned
              from the build app task if it was built successfully,
              otherwise None.
          Deploy arguments (Dict[str, DeploymentInfo]):
              The deployment infos built from the deploy arguments returned
              by the build app task, with config overrides applied.
              None unless the status is SUCCEEDED.
          Status (BuildAppStatus):
              NO_TASK_IN_PROGRESS: There is no build task to reconcile.
              SUCCEEDED: Task finished successfully.
              FAILED: An error occurred during execution of build app task
              IN_PROGRESS: Task hasn't finished yet.
          Error message (str):
              Non-empty string if status is FAILED.
      """
      task_info = self._build_app_task_info
      if task_info is None or task_info.finished:
          return None, None, BuildAppStatus.NO_TASK_IN_PROGRESS, ""

      if not check_obj_ref_ready_nowait(task_info.obj_ref):
          return None, None, BuildAppStatus.IN_PROGRESS, ""

      def failed(message):
          # Every failure path returns the same shape; only the message varies.
          return None, None, BuildAppStatus.FAILED, message

      # The result is ready. Mark the task finished before fetching it so
      # that it is reconciled at most once, whatever the outcome.
      task_info.finished = True
      try:
          policy_def, args, err = ray.get(task_info.obj_ref)
          if err is not None:
              return failed(
                  f"Deploying app '{self._name}' failed with exception:\n{err}"
              )
          logger.info(f"Imported and built app '{self._name}' successfully.")
      except RuntimeEnvSetupError:
          return failed(
              f"Runtime env setup for app '{self._name}' failed:\n"
              + traceback.format_exc()
          )
      except Exception:
          return failed(
              f"Unexpected error occurred while deploying application "
              f"'{self._name}': \n{traceback.format_exc()}"
          )

      # Turn the serialized deploy args returned by the build task into
      # deployment infos, then layer the config's option overrides on top.
      try:
          infos_by_name = {
              params["deployment_name"]: deploy_args_to_deployment_info(
                  **params, app_name=self._name
              )
              for params in args
          }
          policy_defs_by_name = {
              params["deployment_name"]: params["serialized_autoscaling_policy_def"]
              for params in args
              if params["serialized_autoscaling_policy_def"] is not None
          }
          router_classes_by_name = {
              params["deployment_name"]: params["serialized_request_router_cls"]
              for params in args
              if params["serialized_request_router_cls"] is not None
          }
          final_infos = override_deployment_info(
              infos_by_name,
              task_info.config,
              policy_defs_by_name,
              router_classes_by_name,
          )
          self._route_prefix = self._check_routes(final_infos)
          return policy_def, final_infos, BuildAppStatus.SUCCEEDED, ""
      except (TypeError, ValueError, RayServeException):
          return failed(traceback.format_exc())
      except Exception:
          return failed(
              f"Unexpected error occurred while applying config for application "
              f"'{self._name}': \n{traceback.format_exc()}"
          )
  def _check_routes(
      self, deployment_infos: Dict[str, DeploymentInfo]
  ) -> Optional[str]:
      """Determine the application's route prefix from its deployments.

      At most one deployment in the application may define a non-null
      route prefix; that prefix becomes the application's route prefix.

      Args:
          deployment_infos: deployment infos keyed by deployment name.

      Returns:
          The single non-null route prefix found among the deployments, or
          None if no deployment defines one.

      Raises:
          RayServeException: if more than one deployment defines a route
              prefix.
      """
      route_prefixes = [
          info.route_prefix
          for info in deployment_infos.values()
          if info.route_prefix is not None
      ]

      if len(route_prefixes) > 1:
          raise RayServeException(
              f'Found multiple route prefixes from application "{self._name}",'
              " Please specify only one route prefix for the application "
              "to avoid this issue."
          )

      return route_prefixes[0] if route_prefixes else None
  def _reconcile_target_deployments(self) -> bool:
      """Reconcile target deployments in application target state.

      Ensure each deployment is running on up-to-date info, and
      remove outdated deployments from the application.

      Returns:
          True if applying any deployment info or deleting any outdated
          deployment reported a change, False otherwise.
      """
      target_state = self._target_state
      target_state_changed = False

      # Set target state for each deployment
      for deployment_name, info in target_state.deployment_infos.items():
          # Work on a copy so the stored target info is never mutated.
          deploy_info = deepcopy(info)

          # Apply the target capacity information to the deployment info.
          deploy_info.set_target_capacity(
              new_target_capacity=target_state.target_capacity,
              new_target_capacity_direction=(
                  target_state.target_capacity_direction
              ),
          )

          # Apply the application logging config to the deployment logging
          # config if it is not set.
          if (
              target_state.config
              and target_state.config.logging_config
              and deploy_info.deployment_config.logging_config is None
          ):
              deploy_info.deployment_config.logging_config = (
                  target_state.config.logging_config
              )

          target_state_changed = (
              self.apply_deployment_info(deployment_name, deploy_info)
              or target_state_changed
          )

      # Delete outdated deployments
      for deployment_name in self._get_live_deployments():
          if deployment_name not in self.target_deployments:
              target_state_changed = (
                  self._delete_deployment(deployment_name) or target_state_changed
              )

      return target_state_changed
  def get_deployment_topology(self) -> Optional[DeploymentTopology]:
      """Build the deployment topology for this application.

      Returns:
          The deployment topology, or None if the application has no
          target deployments yet.
      """
      deployment_names = self.target_deployments
      if not deployment_names:
          return None

      def build_node(deployment_name):
          # Outbound deployments come from the deployment state; a missing
          # answer is treated as "no outbound deployments".
          outbound = (
              self._deployment_state_manager.get_deployment_outbound_deployments(
                  DeploymentID(name=deployment_name, app_name=self._name)
              )
          ) or []
          return DeploymentNode(
              name=deployment_name,
              app_name=self._name,
              outbound_deployments=[
                  {"name": dep.name, "app_name": dep.app_name} for dep in outbound
              ],
              is_ingress=(deployment_name == self._ingress_deployment_name),
          )

      # Target deployments are used so that a best-effort topology can be
      # built from the current state.
      nodes = {}
      for deployment_name in deployment_names:
          nodes[deployment_name] = build_node(deployment_name)

      return DeploymentTopology(
          app_name=self._name,
          ingress_deployment=self._ingress_deployment_name,
          nodes=nodes,
      )
  def update(self) -> Tuple[bool, bool]:
      """Attempts to reconcile this application to match its target state.

      Updates the application status and status message based on the
      current state of the system.

      Returns:
          A pair ``(ready_to_be_deleted, target_state_changed)``. The first
          element is the result of ``is_deleted()`` when the application is
          being deleted and False otherwise; the second is whether the
          target state has changed.
      """
      target_state_changed = False

      # While the application is being deleted, build task results are
      # ignored so the state is not flipped back to DEPLOYING/RUNNING.
      if not self._target_state.deleting:
          (
              policy_def,
              built_infos,
              build_status,
              build_msg,
          ) = self._reconcile_build_app_task()

          if build_status == BuildAppStatus.SUCCEEDED:
              target_state_changed = True
              finished_task = self._build_app_task_info
              self._set_target_state(
                  deployment_infos=built_infos,
                  code_version=finished_task.code_version,
                  api_type=self._target_state.api_type,
                  target_config=finished_task.config,
                  target_capacity=finished_task.target_capacity,
                  target_capacity_direction=finished_task.target_capacity_direction,
                  external_scaler_enabled=self._target_state.external_scaler_enabled,
                  serialized_application_autoscaling_policy_def=policy_def,
              )

              # The user may turn the app-level autoscaling policy on or off
              # between deployments, so (de)register on every successful build.
              app_config = self._target_state.config
              if app_config is None or app_config.autoscaling_policy is None:
                  self._autoscaling_state_manager.deregister_application(self._name)
              else:
                  self._autoscaling_state_manager.register_application(
                      self._name,
                      AutoscalingPolicy(
                          _serialized_policy_def=policy_def,
                          **app_config.autoscaling_policy,
                      ),
                  )
          elif build_status == BuildAppStatus.FAILED:
              self._update_status(ApplicationStatus.DEPLOY_FAILED, build_msg)

      # Only reconcile deployments when the build app task is finished. If
      # it's not finished, we don't know what the target list of deployments
      # is, so we don't perform any reconciliation.
      if self._target_state.deployment_infos is not None:
          deployments_changed = self._reconcile_target_deployments()
          target_state_changed = deployments_changed or target_state_changed
          new_status, new_status_msg = self._determine_app_status()
          self._update_status(new_status, new_status_msg)

      # Check if app is ready to be deleted
      if not self._target_state.deleting:
          return False, target_state_changed
      return self.is_deleted(), target_state_changed
  def get_checkpoint_data(self) -> ApplicationTargetState:
      """Return the data to checkpoint for this application, which is its
      current target state."""
      checkpoint_data = self._target_state
      return checkpoint_data
  def get_deployments_statuses(self) -> List[DeploymentStatusInfo]:
      """Return the status information of every target deployment."""
      deployment_ids = []
      for deployment_name in self.target_deployments:
          deployment_ids.append(
              DeploymentID(name=deployment_name, app_name=self._name)
          )
      return self._deployment_state_manager.get_deployment_statuses(deployment_ids)
  def get_application_status_info(self) -> ApplicationStatusInfo:
      """Bundle the current status, status message and deployment timestamp
      into an ``ApplicationStatusInfo``."""
      status_info = ApplicationStatusInfo(
          self._status,
          message=self._status_msg,
          deployment_timestamp=self._deployment_timestamp,
      )
      return status_info
  def list_deployment_details(self) -> Dict[str, DeploymentDetails]:
      """Gets detailed info on all live deployments in this application.
      (Does not include deleted deployments.)

      Returns:
          A dictionary of deployment infos keyed by deployment name. The set
          of deployment info returned may not be the full list of deployments
          that are part of the application. This can happen when the
          application is still deploying and bringing up deployments, or when
          the application is deleting and some deployments have been deleted.
      """
      details_by_name = {}
      for deployment_name in self.target_deployments:
          details = self._deployment_state_manager.get_deployment_details(
              DeploymentID(name=deployment_name, app_name=self._name)
          )
          # Deployments without details are left out of the result.
          if details is not None:
              details_by_name[deployment_name] = details
      return details_by_name
  def _update_status(self, status: ApplicationStatus, status_msg: str = "") -> None:
      """Store the new status and message.

      A non-empty message for a DEPLOY_FAILED or UNHEALTHY status is logged
      as an error, but only when it differs from the message already stored.
      """
      if status_msg and status_msg != self._status_msg:
          if status in [
              ApplicationStatus.DEPLOY_FAILED,
              ApplicationStatus.UNHEALTHY,
          ]:
              logger.error(status_msg)

      self._status, self._status_msg = status, status_msg
  class ApplicationStateManager:
      """Owns one ``ApplicationState`` per application name and exposes
      deploy, delete, query, update and checkpoint operations over them."""

      def __init__(
          self,
          deployment_state_manager: DeploymentStateManager,
          autoscaling_state_manager: AutoscalingStateManager,
          endpoint_state: EndpointState,
          kv_store: KVStoreBase,
          logging_config: LoggingConfig,
      ):
          self._deployment_state_manager = deployment_state_manager
          self._autoscaling_state_manager = autoscaling_state_manager
          self._endpoint_state = endpoint_state
          self._kv_store = kv_store
          self._logging_config = logging_config

          self._shutting_down = False
          self._application_states: Dict[str, ApplicationState] = {}

          # Metric for tracking application status
          self._application_status_gauge = ray_metrics.Gauge(
              "serve_application_status",
              description=(
                  "Numeric status of application. "
                  "0=UNKNOWN, 1=DEPLOY_FAILED, 2=UNHEALTHY, 3=NOT_STARTED, "
                  "4=DELETING, 5=DEPLOYING, 6=RUNNING."
              ),
              tag_keys=("application",),
          )

          self._recover_from_checkpoint()

      def _create_application_state(
          self, name: str, external_scaler_enabled
      ) -> ApplicationState:
          """Construct an ``ApplicationState`` wired to this manager's
          collaborators."""
          return ApplicationState(
              name,
              self._deployment_state_manager,
              self._autoscaling_state_manager,
              endpoint_state=self._endpoint_state,
              logging_config=self._logging_config,
              external_scaler_enabled=external_scaler_enabled,
          )

      def _recover_from_checkpoint(self):
          """Rebuild the application states from the checkpoint in the KV
          store, if one exists."""
          checkpoint = self._kv_store.get(CHECKPOINT_KEY)
          if checkpoint is None:
              return

          recovered = cloudpickle.loads(checkpoint)
          for app_name, checkpoint_data in recovered.items():
              app_state = self._create_application_state(
                  app_name, checkpoint_data.external_scaler_enabled
              )
              app_state.recover_target_state_from_checkpoint(checkpoint_data)
              self._application_states[app_name] = app_state

      def delete_app(self, name: str) -> None:
          """Delete application by name. Unknown names are ignored."""
          if name in self._application_states:
              self._application_states[name].delete()

      def deploy_apps(
          self,
          name_to_deployment_args: Dict[str, List[Dict]],
          name_to_application_args: Dict[str, ApplicationArgsProto],
      ) -> None:
          """Deploy a batch of applications from their deployment arguments.

          Args:
              name_to_deployment_args: for each application name, the list of
                  deployment arguments of its deployments.
              name_to_application_args: for each application name, its
                  application arguments.

          Raises:
              RayServeException: if a requested route prefix is already used
                  by a different application (live, or earlier in this batch).
          """
          # Route prefixes currently claimed by applications that are not
          # being deleted, mapped to the owning application's name.
          live_route_prefixes: Dict[str, str] = {
              app_state.route_prefix: app_name
              for app_name, app_state in self._application_states.items()
              if app_state.route_prefix is not None
              and app_state.status != ApplicationStatus.DELETING
          }

          for name, deployment_args in name_to_deployment_args.items():
              for deploy_param in deployment_args:
                  # Make sure route_prefix is not being used by other application.
                  prefix = deploy_param.get("route_prefix")
                  if prefix is None:
                      continue

                  # It's ok to redeploy an app with the same prefix
                  # if it has the same name as the app already using that prefix.
                  owner = live_route_prefixes.get(prefix)
                  if owner is not None and owner != name:
                      raise RayServeException(
                          f"Prefix {prefix} is being used by application "
                          f'"{owner}". Failed to deploy application "{name}".'
                      )

                  # More than one app may be deployed in this batch, so this
                  # app's prefix joins the set that later apps are checked
                  # against.
                  live_route_prefixes[prefix] = name

              application_args = name_to_application_args.get(name)
              external_scaler_enabled = application_args.external_scaler_enabled

              if name not in self._application_states:
                  self._application_states[name] = self._create_application_state(
                      name, external_scaler_enabled
                  )
              ServeUsageTag.NUM_APPS.record(str(len(self._application_states)))

              deployment_infos = {
                  params["deployment_name"]: deploy_args_to_deployment_info(
                      **params, app_name=name
                  )
                  for params in deployment_args
              }
              self._application_states[name].deploy_app(
                  deployment_infos, external_scaler_enabled
              )

      def deploy_app(
          self,
          name: str,
          deployment_args: List[Dict],
          application_args: ApplicationArgsProto,
      ) -> None:
          """Deploy the specified app to the list of deployment arguments.

          This function should only be called if the app is being deployed
          through serve.run instead of from a config.

          Args:
              name: application name
              deployment_args: arguments for deploying a list of deployments.
              application_args: application arguments.
          """
          self.deploy_apps({name: deployment_args}, {name: application_args})

      def apply_app_configs(
          self,
          app_configs: List[ServeApplicationSchema],
          *,
          deployment_time: float = 0,
          target_capacity: Optional[float] = None,
          target_capacity_direction: Optional[TargetCapacityDirection] = None,
      ):
          """Declaratively apply the list of application configs.

          The applications will be reconciled to match the target state of the
          config. Any applications previously deployed declaratively that are
          *not* present in the list will be deleted.
          """
          for app_config in app_configs:
              app_name = app_config.name
              if app_name not in self._application_states:
                  logger.info(f"Deploying new app '{app_name}'.")
                  self._application_states[app_name] = self._create_application_state(
                      app_name, app_config.external_scaler_enabled
                  )

              self._application_states[app_config.name].apply_app_config(
                  app_config,
                  target_capacity,
                  target_capacity_direction,
                  deployment_time=deployment_time,
              )

          # Delete all apps that were previously deployed via the declarative
          # API but are not in the config being applied.
          declarative_apps = {
              name
              for name, app_state in self._application_states.items()
              if app_state.api_type == APIType.DECLARATIVE
          }
          configured_apps = {app_config.name for app_config in app_configs}
          for stale_app in declarative_apps - configured_apps:
              self.delete_app(stale_app)

          ServeUsageTag.NUM_APPS.record(str(len(self._application_states)))

      def get_deployments(self, app_name: str) -> List[str]:
          """Return all deployment names by app name"""
          if app_name in self._application_states:
              return self._application_states[app_name].target_deployments
          return []

      def get_deployments_statuses(self, app_name: str) -> List[DeploymentStatusInfo]:
          """Return all deployment statuses by app name"""
          if app_name in self._application_states:
              return self._application_states[app_name].get_deployments_statuses()
          return []

      def get_app_status(self, name: str) -> ApplicationStatus:
          """Return the app's status, or NOT_STARTED for an unknown app."""
          if name in self._application_states:
              return self._application_states[name].status
          return ApplicationStatus.NOT_STARTED

      def does_app_exist(self, name: str) -> bool:
          """Return whether an application state exists under this name."""
          return name in self._application_states

      def get_app_status_info(self, name: str) -> ApplicationStatusInfo:
          """Return the app's status info; unknown apps get a NOT_STARTED
          placeholder."""
          if name in self._application_states:
              return self._application_states[name].get_application_status_info()
          return ApplicationStatusInfo(
              ApplicationStatus.NOT_STARTED,
              message=f"Application {name} doesn't exist",
              deployment_timestamp=0,
          )

      def get_docs_path(self, app_name: str) -> Optional[str]:
          """Return the app's docs path. Raises KeyError for an unknown app."""
          app_state = self._application_states[app_name]
          return app_state.docs_path

      def get_route_prefix(self, name: str) -> Optional[str]:
          """Return the app's route prefix. Raises KeyError for an unknown app."""
          app_state = self._application_states[name]
          return app_state.route_prefix

      def get_ingress_deployment_name(self, name: str) -> Optional[str]:
          """Return the app's ingress deployment, or None for an unknown app."""
          if name in self._application_states:
              return self._application_states[name].ingress_deployment
          return None

      def get_app_source(self, name: str) -> APIType:
          """Return the API type the app was deployed with. Raises KeyError
          for an unknown app."""
          app_state = self._application_states[name]
          return app_state.api_type

      def get_external_scaler_enabled(self, app_name: str) -> bool:
          """Check if external scaler is enabled for the application.

          Args:
              app_name: Name of the application.

          Returns:
              True if external_scaler_enabled is set for the application,
              False otherwise.
          """
          if not self.does_app_exist(app_name):
              return False
          return self._application_states[app_name].external_scaler_enabled

      def list_app_statuses(
          self, source: Optional[APIType] = None
      ) -> Dict[str, ApplicationStatusInfo]:
          """Return a dictionary with {app name: application info}

          Args:
              source: Optional API type filter. If provided, only returns apps
                  deployed via the specified API type.

          Returns:
              Dict[str, ApplicationStatusInfo]: A dictionary mapping
                  application names to their corresponding status information.
          """
          return {
              name: self._application_states[name].get_application_status_info()
              for name in self._application_states
              if source is None or self.get_app_source(name) is source
          }

      def list_app_names(self) -> List[str]:
          """Return app names without instantiating status objects."""
          return list(self._application_states)

      def list_deployment_details(self, name: str) -> Dict[str, DeploymentDetails]:
          """Gets detailed info on all deployments in specified application."""
          if name in self._application_states:
              return self._application_states[name].list_deployment_details()
          return {}

      def get_deployment_topology(self, app_name: str) -> Optional[DeploymentTopology]:
          """Get the deployment topology for an application.

          Args:
              app_name: Name of the application.

          Returns:
              The deployment topology for the application, or None if the
              application doesn't exist or the topology hasn't been built yet.
          """
          if app_name in self._application_states:
              return self._application_states[app_name].get_deployment_topology()
          return None

      def update(self) -> bool:
          """Update each application state.

          Returns:
              bool: True if any application's target state changed during this
                  update.
          """
          finished_deleting = []
          any_target_state_changed = False

          for name, app in self._application_states.items():
              if app.should_autoscale():
                  any_target_state_changed = app.autoscale() or any_target_state_changed

              ready_to_be_deleted, app_changed = app.update()
              any_target_state_changed = any_target_state_changed or app_changed

              if ready_to_be_deleted:
                  finished_deleting.append(name)
                  logger.debug(f"Application '{name}' deleted successfully.")

          # Record application status metrics
          for name, app in self._application_states.items():
              self._application_status_gauge.set(
                  app.status.to_numeric(),
                  tags={"application": name},
              )

          # Removal happens after the loops above so the dict is never
          # resized while it is being iterated.
          if finished_deleting:
              for app_name in finished_deleting:
                  self._autoscaling_state_manager.deregister_application(app_name)
                  del self._application_states[app_name]
              ServeUsageTag.NUM_APPS.record(str(len(self._application_states)))

          if any_target_state_changed:
              self.save_checkpoint()
              self._deployment_state_manager.save_checkpoint()

          return any_target_state_changed

      def shutdown(self) -> None:
          """Start deleting every application and drop the stored checkpoint."""
          self._shutting_down = True

          for app_state in self._application_states.values():
              app_state.delete()

          self._kv_store.delete(CHECKPOINT_KEY)

      def is_ready_for_shutdown(self) -> bool:
          """Return whether all applications have shut down.

          Iterate through all application states and check if all their
          applications are deleted.
          """
          if not self._shutting_down:
              return False
          return all(
              app_state.is_deleted() for app_state in self._application_states.values()
          )

      def save_checkpoint(self) -> None:
          """Write a checkpoint of all application states."""
          if self._shutting_down:
              # Once we're told to shut down, stop writing checkpoints.
              # Calling .shutdown() deletes any existing checkpoint.
              return

          checkpoint_by_app = {}
          for app_name, app_state in self._application_states.items():
              checkpoint_by_app[app_name] = app_state.get_checkpoint_data()

          self._kv_store.put(CHECKPOINT_KEY, cloudpickle.dumps(checkpoint_by_app))
  @ray.remote(num_cpus=0, max_calls=1)
  def build_serve_application(
      import_path: str,
      code_version: str,
      name: str,
      args: Dict,
      logging_config: LoggingConfig,
      application_autoscaling_policy_function: Optional[str],
      deployment_to_autoscaling_policy_function: Dict[str, str],
      deployment_to_request_router_cls: Dict[str, str],
  ) -> Tuple[Optional[bytes], Optional[List[Dict]], Optional[str]]:
      """Import and build a Serve application.

      Args:
          import_path: import path to top-level bound deployment.
          code_version: code version inferred from app config. All
              deployment versions are set to this code version.
          name: application name.
          args: Arguments to be passed to the application builder.
          logging_config: the logging config for the build app task.
          application_autoscaling_policy_function: the application autoscaling
              policy function name
          deployment_to_autoscaling_policy_function: a dictionary mapping
              deployment names to autoscaling policy function names
          deployment_to_request_router_cls: a dictionary mapping deployment
              names to request router class names

      Returns:
          Serialized application autoscaling policy def: a serialized
              autoscaling policy def for the application if it was built
              successfully and a policy function was given, otherwise None.
          Deploy arguments: a list of deployment arguments if application
              was built successfully, otherwise None.
          Error message: a string if an error was raised, otherwise None.
      """
      configure_component_logger(
          component_name="controller",
          component_id=f"build_{name}_{os.getpid()}",
          logging_config=logging_config,
      )

      try:
          from ray.serve._private.api import call_user_app_builder_with_args_if_necessary

          # Import and build the application.
          args_info_str = f" with arguments {args}" if args else ""
          logger.info(f"Importing application '{name}'{args_info_str}.")

          app = call_user_app_builder_with_args_if_necessary(
              import_attr(import_path), args
          )

          deploy_args_list = []
          built_app: BuiltApplication = build_app(
              app,
              name=name,
              default_runtime_env=ray.get_runtime_context().runtime_env,
          )
          num_ingress_deployments = 0

          def _get_serialized_def(attr_path: str) -> bytes:
              """Import ``attr_path`` and pickle the attribute with its module
              registered for pickling by value."""
              module, attr = import_module_and_attr(attr_path)
              cloudpickle.register_pickle_by_value(module)
              try:
                  return cloudpickle.dumps(attr)
              finally:
                  # Always undo the registration, even if serialization fails.
                  cloudpickle.unregister_pickle_by_value(module)

          application_serialized_autoscaling_policy_def = None
          if application_autoscaling_policy_function is not None:
              application_serialized_autoscaling_policy_def = _get_serialized_def(
                  application_autoscaling_policy_function
              )

          for deployment in built_app.deployments:
              # Count deployments wrapped as ASGI apps; more than one is
              # rejected after the loop.
              if inspect.isclass(deployment.func_or_class) and issubclass(
                  deployment.func_or_class, ASGIAppReplicaWrapper
              ):
                  num_ingress_deployments += 1

              is_ingress = deployment.name == built_app.ingress_deployment_name

              # Serialize the per-deployment autoscaling policy and request
              # router class only for deployments that have one configured.
              deployment_to_serialized_autoscaling_policy_def = None
              deployment_to_serialized_request_router_cls = None
              if deployment.name in deployment_to_autoscaling_policy_function:
                  deployment_to_serialized_autoscaling_policy_def = _get_serialized_def(
                      deployment_to_autoscaling_policy_function[deployment.name]
                  )
              if deployment.name in deployment_to_request_router_cls:
                  deployment_to_serialized_request_router_cls = _get_serialized_def(
                      deployment_to_request_router_cls[deployment.name]
                  )

              deploy_args_list.append(
                  get_deploy_args(
                      name=deployment._name,
                      replica_config=deployment._replica_config,
                      ingress=is_ingress,
                      deployment_config=deployment._deployment_config,
                      version=code_version,
                      # Only the ingress deployment is given a route prefix.
                      route_prefix="/" if is_ingress else None,
                      serialized_autoscaling_policy_def=deployment_to_serialized_autoscaling_policy_def,
                      serialized_request_router_cls=deployment_to_serialized_request_router_cls,
                  )
              )

          if num_ingress_deployments > 1:
              return (
                  None,
                  None,
                  (
                      f'Found multiple FastAPI deployments in application "{built_app.name}". '
                      "Please only include one deployment with @serve.ingress "
                      "in your application to avoid this issue."
                  ),
              )

          return application_serialized_autoscaling_policy_def, deploy_args_list, None
      except KeyboardInterrupt:
          # Error is raised when this task is canceled with ray.cancel().
          logger.info(
              "Existing config deployment request terminated because of keyboard "
              "interrupt."
          )
          return None, None, None
      except Exception:
          # Format the traceback once so the log line and the returned error
          # message are guaranteed to match.
          error_trace = traceback.format_exc()
          logger.error(f"Exception importing application '{name}'.\n{error_trace}")
          return None, None, error_trace
  1397. def override_deployment_info(
  1398. deployment_infos: Dict[str, DeploymentInfo],
  1399. override_config: Optional[ServeApplicationSchema],
  1400. deployment_to_serialized_autoscaling_policy_def: Optional[Dict[str, bytes]] = None,
  1401. deployment_to_serialized_request_router_cls: Optional[Dict[str, bytes]] = None,
  1402. ) -> Dict[str, DeploymentInfo]:
  1403. """Override deployment infos with options from app config.
  1404. Args:
  1405. app_name: application name
  1406. deployment_infos: deployment info loaded from code
  1407. override_config: application config deployed by user with
  1408. options to override those loaded from code.
  1409. deployment_to_serialized_autoscaling_policy_def: serialized autoscaling policy def for each deployment
  1410. deployment_to_serialized_request_router_cls: serialized request router cls for each deployment
  1411. Returns: the updated deployment infos.
  1412. Raises:
  1413. ValueError: If config options have invalid values.
  1414. TypeError: If config options have invalid types.
  1415. """
  1416. deployment_infos = deepcopy(deployment_infos)
  1417. if override_config is None:
  1418. return deployment_infos
  1419. config_dict = override_config.dict(exclude_unset=True)
  1420. deployment_override_options = config_dict.get("deployments", [])
  1421. # Override options for each deployment listed in the config.
  1422. for options in deployment_override_options:
  1423. if "max_ongoing_requests" in options:
  1424. options["max_ongoing_requests"] = options.get("max_ongoing_requests")
  1425. deployment_name = options["name"]
  1426. if deployment_name not in deployment_infos:
  1427. raise ValueError(
  1428. f"Deployment '{deployment_name}' does not exist. "
  1429. f"Available: {list(deployment_infos.keys())}"
  1430. )
  1431. info = deployment_infos[deployment_name]
  1432. original_options = info.deployment_config.dict()
  1433. original_options["user_configured_option_names"].update(set(options))
  1434. # Override `max_ongoing_requests` and `autoscaling_config` if
  1435. # `num_replicas="auto"`
  1436. if options.get("num_replicas") == "auto":
  1437. options["num_replicas"] = None
  1438. new_config = AutoscalingConfig.default().dict()
  1439. # If `autoscaling_config` is specified, its values override
  1440. # the default `num_replicas="auto"` configuration
  1441. autoscaling_config = (
  1442. options.get("autoscaling_config")
  1443. or info.deployment_config.autoscaling_config
  1444. )
  1445. if autoscaling_config:
  1446. new_config.update(autoscaling_config)
  1447. if (
  1448. deployment_to_serialized_autoscaling_policy_def
  1449. and deployment_name in deployment_to_serialized_autoscaling_policy_def
  1450. ):
  1451. # By setting the serialized policy def, AutoscalingConfig constructor will not
  1452. # try to import the policy from the string import path
  1453. policy_obj = AutoscalingPolicy.from_serialized_policy_def(
  1454. new_config["policy"],
  1455. deployment_to_serialized_autoscaling_policy_def[deployment_name],
  1456. )
  1457. new_config["policy"] = policy_obj
  1458. options["autoscaling_config"] = AutoscalingConfig(**new_config)
  1459. ServeUsageTag.AUTO_NUM_REPLICAS_USED.record("1")
  1460. # What to pass to info.update
  1461. override_options = {}
  1462. # Merge app-level and deployment-level runtime_envs.
  1463. replica_config = info.replica_config
  1464. app_runtime_env = override_config.runtime_env
  1465. if "ray_actor_options" in options:
  1466. # If specified, get ray_actor_options from config
  1467. override_actor_options = options.pop("ray_actor_options", {})
  1468. else:
  1469. # Otherwise, get options from application code (and default to {}
  1470. # if the code sets options to None).
  1471. override_actor_options = replica_config.ray_actor_options or {}
  1472. override_placement_group_bundles = options.pop(
  1473. "placement_group_bundles", replica_config.placement_group_bundles
  1474. )
  1475. override_placement_group_strategy = options.pop(
  1476. "placement_group_strategy", replica_config.placement_group_strategy
  1477. )
  1478. override_max_replicas_per_node = options.pop(
  1479. "max_replicas_per_node", replica_config.max_replicas_per_node
  1480. )
  1481. override_bundle_label_selector = options.pop(
  1482. "placement_group_bundle_label_selector",
  1483. replica_config.placement_group_bundle_label_selector,
  1484. )
  1485. override_fallback_strategy = options.pop(
  1486. "placement_group_fallback_strategy",
  1487. replica_config.placement_group_fallback_strategy,
  1488. )
  1489. # Record telemetry for container runtime env feature at deployment level
  1490. if override_actor_options.get("runtime_env") and (
  1491. override_actor_options["runtime_env"].get("container")
  1492. or override_actor_options["runtime_env"].get("image_uri")
  1493. ):
  1494. ServeUsageTag.DEPLOYMENT_CONTAINER_RUNTIME_ENV_USED.record("1")
  1495. merged_env = override_runtime_envs_except_env_vars(
  1496. app_runtime_env, override_actor_options.get("runtime_env", {})
  1497. )
  1498. override_actor_options.update({"runtime_env": merged_env})
  1499. replica_config.update(
  1500. ray_actor_options=override_actor_options,
  1501. placement_group_bundles=override_placement_group_bundles,
  1502. placement_group_strategy=override_placement_group_strategy,
  1503. max_replicas_per_node=override_max_replicas_per_node,
  1504. placement_group_bundle_label_selector=override_bundle_label_selector,
  1505. placement_group_fallback_strategy=override_fallback_strategy,
  1506. )
  1507. override_options["replica_config"] = replica_config
  1508. if "request_router_config" in options:
  1509. request_router_config = options.get("request_router_config")
  1510. if request_router_config:
  1511. if (
  1512. deployment_to_serialized_request_router_cls
  1513. and deployment_name in deployment_to_serialized_request_router_cls
  1514. ):
  1515. # By setting the serialized request router cls, RequestRouterConfig constructor will not
  1516. # try to import the request router cls from the string import path
  1517. options[
  1518. "request_router_config"
  1519. ] = RequestRouterConfig.from_serialized_request_router_cls(
  1520. request_router_config,
  1521. deployment_to_serialized_request_router_cls[deployment_name],
  1522. )
  1523. else:
  1524. options["request_router_config"] = RequestRouterConfig(
  1525. **request_router_config
  1526. )
  1527. # Override deployment config options
  1528. options.pop("name", None)
  1529. original_options.update(options)
  1530. override_options["deployment_config"] = DeploymentConfig(**original_options)
  1531. deployment_infos[deployment_name] = info.update(**override_options)
  1532. deployment_config = deployment_infos[deployment_name].deployment_config
  1533. if (
  1534. deployment_config.autoscaling_config is not None
  1535. and deployment_config.max_ongoing_requests
  1536. < deployment_config.autoscaling_config.get_target_ongoing_requests()
  1537. ):
  1538. logger.warning(
  1539. "Autoscaling will never happen, "
  1540. "because 'max_ongoing_requests' is less than "
  1541. "'target_ongoing_requests' now."
  1542. )
  1543. # Overwrite ingress route prefix
  1544. app_route_prefix = config_dict.get("route_prefix", DEFAULT.VALUE)
  1545. validate_route_prefix(app_route_prefix)
  1546. for deployment in list(deployment_infos.values()):
  1547. if (
  1548. app_route_prefix is not DEFAULT.VALUE
  1549. and deployment.route_prefix is not None
  1550. ):
  1551. deployment.route_prefix = app_route_prefix
  1552. return deployment_infos