| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
777177817791780178117821783178417851786178717881789 |
- import inspect
- import json
- import logging
- import os
- import time
- import traceback
- from copy import deepcopy
- from dataclasses import asdict, dataclass, field
- from enum import Enum
- from typing import Dict, List, Optional, Tuple
- import ray
- from ray import cloudpickle
- from ray._common.utils import import_attr, import_module_and_attr
- from ray.exceptions import RuntimeEnvSetupError
- from ray.serve._private.autoscaling_state import AutoscalingStateManager
- from ray.serve._private.build_app import BuiltApplication, build_app
- from ray.serve._private.common import (
- DeploymentID,
- DeploymentStatus,
- DeploymentStatusInfo,
- DeploymentStatusTrigger,
- EndpointInfo,
- TargetCapacityDirection,
- )
- from ray.serve._private.config import DeploymentConfig
- from ray.serve._private.constants import (
- DEFAULT_AUTOSCALING_POLICY_NAME,
- DEFAULT_REQUEST_ROUTER_PATH,
- RAY_SERVE_ENABLE_TASK_EVENTS,
- SERVE_LOGGER_NAME,
- )
- from ray.serve._private.deploy_utils import (
- deploy_args_to_deployment_info,
- get_app_code_version,
- get_deploy_args,
- )
- from ray.serve._private.deployment_info import DeploymentInfo
- from ray.serve._private.deployment_state import DeploymentStateManager
- from ray.serve._private.endpoint_state import EndpointState
- from ray.serve._private.logging_utils import configure_component_logger
- from ray.serve._private.storage.kv_store import KVStoreBase
- from ray.serve._private.usage import ServeUsageTag
- from ray.serve._private.utils import (
- DEFAULT,
- check_obj_ref_ready_nowait,
- override_runtime_envs_except_env_vars,
- validate_route_prefix,
- )
- from ray.serve.api import ASGIAppReplicaWrapper
- from ray.serve.config import AutoscalingConfig, AutoscalingPolicy, RequestRouterConfig
- from ray.serve.exceptions import RayServeException
- from ray.serve.generated.serve_pb2 import (
- ApplicationArgs as ApplicationArgsProto,
- ApplicationStatus as ApplicationStatusProto,
- ApplicationStatusInfo as ApplicationStatusInfoProto,
- DeploymentLanguage,
- DeploymentStatusInfoList as DeploymentStatusInfoListProto,
- StatusOverview as StatusOverviewProto,
- )
- from ray.serve.schema import (
- APIType,
- ApplicationStatus,
- DeploymentDetails,
- DeploymentNode,
- DeploymentTopology,
- LoggingConfig,
- ServeApplicationSchema,
- )
- from ray.types import ObjectRef
- from ray.util import metrics as ray_metrics
# Key under which the application-state checkpoint is saved
# (presumably in the controller's KV store; the writer is outside this view).
CHECKPOINT_KEY = "serve-application-state-checkpoint"
# Module-wide logger, named after the shared Serve logger.
logger = logging.getLogger(SERVE_LOGGER_NAME)
class BuildAppStatus(Enum):
    """Lifecycle of the background task that imports and builds an app."""

    # There is no build task, or the last one was already reconciled.
    NO_TASK_IN_PROGRESS = 1
    # A build task has been launched and has not finished yet.
    IN_PROGRESS = 2
    # The build task finished successfully.
    SUCCEEDED = 3
    # An error occurred while running the build task.
    FAILED = 4
@dataclass
class BuildAppTaskInfo:
    """Bookkeeping for the currently launched build-app task.

    Only keeping the task's object ref is not enough: when a new config is
    deployed, an older build task may still be running. Storing the code
    version (and the config/capacity it was launched with) alongside the
    ref lets stale build tasks be told apart from the current one.
    """

    # Object ref of the remote build task.
    obj_ref: ObjectRef
    # Code version of the config this task was launched for.
    code_version: str
    # The config the task was launched for.
    config: ServeApplicationSchema
    # Target capacity that accompanied the config.
    target_capacity: Optional[float]
    # Scale direction that accompanied the target capacity.
    target_capacity_direction: Optional[TargetCapacityDirection]
    # Set once the task's result has been retrieved/reconciled.
    finished: bool
@dataclass(eq=True)
class ApplicationStatusInfo:
    """Status of one application together with a message and a timestamp."""

    status: ApplicationStatus
    message: str = ""
    deployment_timestamp: float = 0

    def debug_string(self):
        """Return this object rendered as indented JSON."""
        as_dict = asdict(self)
        return json.dumps(as_dict, indent=4)

    def to_proto(self):
        """Convert to the protobuf form.

        The proto enum value is passed by name, i.e. the status name
        prefixed with ``APPLICATION_STATUS_``.
        """
        proto_status_name = f"APPLICATION_STATUS_{self.status.name}"
        return ApplicationStatusInfoProto(
            status=proto_status_name,
            message=self.message,
            deployment_timestamp=self.deployment_timestamp,
        )

    @classmethod
    def from_proto(cls, proto: ApplicationStatusInfoProto):
        """Build an instance from its protobuf form.

        The ``APPLICATION_STATUS_`` prefix of the proto enum name is
        stripped and the remainder is looked up as an ``ApplicationStatus``
        value.
        """
        prefix = "APPLICATION_STATUS_"
        full_name = ApplicationStatusProto.Name(proto.status)
        return cls(
            status=ApplicationStatus(full_name[len(prefix) :]),
            message=proto.message,
            deployment_timestamp=proto.deployment_timestamp,
        )
@dataclass(eq=True)
class StatusOverview:
    """Application status plus the statuses of all its deployments."""

    app_status: ApplicationStatusInfo
    name: str = ""
    deployment_statuses: List[DeploymentStatusInfo] = field(default_factory=list)

    def debug_string(self):
        """Return this object rendered as indented JSON."""
        as_dict = asdict(self)
        return json.dumps(as_dict, indent=4)

    def get_deployment_status(self, name: str) -> Optional[DeploymentStatusInfo]:
        """Look up a deployment's status by deployment name.

        Args:
            name: Deployment's name.

        Returns:
            Optional[DeploymentStatusInfo]: The first status whose name
            matches, or None when there is no such deployment.
        """
        matches = (
            status_info
            for status_info in self.deployment_statuses
            if name == status_info.name
        )
        return next(matches, None)

    def to_proto(self):
        """Convert to a ``StatusOverviewProto``."""
        app_status_proto = self.app_status.to_proto()

        # Wrap every deployment status proto in the list container message.
        status_list_proto = DeploymentStatusInfoListProto()
        status_list_proto.deployment_status_infos.extend(
            status_info.to_proto() for status_info in self.deployment_statuses
        )

        return StatusOverviewProto(
            name=self.name,
            app_status=app_status_proto,
            deployment_statuses=status_list_proto,
        )

    @classmethod
    def from_proto(cls, proto: StatusOverviewProto) -> "StatusOverview":
        """Rebuild a ``StatusOverview`` from its protobuf form."""
        app_status = ApplicationStatusInfo.from_proto(proto.app_status)
        deployment_statuses = [
            DeploymentStatusInfo.from_proto(info_proto)
            for info_proto in proto.deployment_statuses.deployment_status_infos
        ]
        return cls(
            app_status=app_status,
            deployment_statuses=deployment_statuses,
            name=proto.name,
        )
@dataclass
class ApplicationTargetState:
    """Target state of a single application.

    Target state can become inconsistent if the code version doesn't match
    that of the config. When that happens, a new build app task should be
    kicked off to reconcile the inconsistency.

    Attributes:
        deployment_infos: Map of deployment name to deployment info. This is
            - None if a config was deployed but the app hasn't finished
              building yet,
            - an empty dict if the app is deleting.
        code_version: Code version of all deployments in the target state.
            None if the application was deployed through serve.run.
        config: Application config deployed by the user. None if the
            application was deployed through serve.run.
        target_capacity: The target_capacity to use when adjusting
            num_replicas.
        target_capacity_direction: The scale direction to use when running
            the Serve autoscaler.
        deleting: Whether the application is being deleted.
        api_type: Which API deployed the application: IMPERATIVE (e.g.
            serve.run), DECLARATIVE (config-based), or UNKNOWN before
            anything has been deployed.
        serialized_application_autoscaling_policy_def: Serialized definition
            of the application-level autoscaling policy. It is used to
            re-register that policy when recovering from a checkpoint. None
            when no serialized definition is available.
        external_scaler_enabled: Whether external autoscaling is enabled for
            this application.
    """

    # NOTE: field order matters: the first six fields are passed positionally
    # by some constructors.
    deployment_infos: Optional[Dict[str, DeploymentInfo]]
    code_version: Optional[str]
    config: Optional[ServeApplicationSchema]
    target_capacity: Optional[float]
    target_capacity_direction: Optional[TargetCapacityDirection]
    deleting: bool
    api_type: APIType
    serialized_application_autoscaling_policy_def: Optional[bytes]
    external_scaler_enabled: bool
- class ApplicationState:
- """Manage single application states with all operations"""
def __init__(
    self,
    name: str,
    deployment_state_manager: DeploymentStateManager,
    autoscaling_state_manager: AutoscalingStateManager,
    endpoint_state: EndpointState,
    logging_config: LoggingConfig,
    external_scaler_enabled: bool,
):
    """Create the state holder for one application.

    Args:
        name: Application name.
        deployment_state_manager: Manages the state of all deployments in the cluster.
        autoscaling_state_manager: Manages autoscaling decisions in the cluster.
        endpoint_state: Manages endpoints in the system.
        logging_config: Logging configuration schema.
        external_scaler_enabled: Whether external autoscaling is enabled for
            this application.
    """
    # Identity and collaborators.
    self._name = name
    self._deployment_state_manager = deployment_state_manager
    self._autoscaling_state_manager = autoscaling_state_manager
    self._endpoint_state = endpoint_state
    self._logging_config = logging_config

    # A fresh application starts out DEPLOYING with no message.
    self._status: ApplicationStatus = ApplicationStatus.DEPLOYING
    self._status_msg = ""
    self._deployment_timestamp = time.time()

    # Filled in once the target deployments are known.
    self._route_prefix: Optional[str] = None
    self._ingress_deployment_name: Optional[str] = None

    # No build task has been launched yet.
    self._build_app_task_info: Optional[BuildAppTaskInfo] = None

    # The target deployments are unknown until a build finishes, hence
    # deployment_infos=None.
    initial_target = ApplicationTargetState(
        deployment_infos=None,
        code_version=None,
        config=None,
        target_capacity=None,
        target_capacity_direction=None,
        deleting=False,
        api_type=APIType.UNKNOWN,
        external_scaler_enabled=external_scaler_enabled,
        serialized_application_autoscaling_policy_def=None,
    )
    self._target_state: ApplicationTargetState = initial_target
@property
def route_prefix(self) -> Optional[str]:
    """Route prefix of the application, or None if not yet determined."""
    prefix = self._route_prefix
    return prefix
@property
def external_scaler_enabled(self) -> bool:
    """Whether external autoscaling is enabled, as recorded in the target state."""
    target = self._target_state
    return target.external_scaler_enabled
@property
def docs_path(self) -> Optional[str]:
    """Docs path of the application, taken from its ingress deployment.

    Assumes the docs path can only be set on the ingress deployment (with
    FastAPI), so only that deployment is queried.
    """
    ingress_id = DeploymentID(self._ingress_deployment_name, self._name)
    manager = self._deployment_state_manager
    return manager.get_deployment_docs_path(ingress_id)
@property
def status(self) -> ApplicationStatus:
    """Current status of the application.

    DEPLOYING: the build task is still running, or deployments have started
        deploying but aren't healthy yet.
    RUNNING: all deployments are healthy.
    DEPLOY_FAILED: the build task failed, or one or more deployments became
        unhealthy while deploying.
    UNHEALTHY: one or more deployments went from healthy to unhealthy while
        the application was running.
    DELETING: the application and its deployments are being deleted.
    """
    current = self._status
    return current
@property
def deployment_timestamp(self) -> float:
    """Time at which the application was created or its config last applied."""
    stamp = self._deployment_timestamp
    return stamp
@property
def target_deployments(self) -> List[str]:
    """Names of the target deployments (empty while no target is known)."""
    infos = self._target_state.deployment_infos
    return [] if infos is None else list(infos)
@property
def ingress_deployment(self) -> Optional[str]:
    """Name of the ingress deployment, or None if not known."""
    ingress_name = self._ingress_deployment_name
    return ingress_name
@property
def api_type(self) -> APIType:
    """API type recorded in the current target state."""
    target = self._target_state
    return target.api_type
def recover_target_state_from_checkpoint(
    self, checkpoint_data: ApplicationTargetState
):
    """Restore this application's target state from a checkpoint.

    Besides restoring the target state itself (including the serialized
    application-level autoscaling policy definition), this:
      * restores the route prefix when the checkpoint has deployment infos
        (e.g. an imperatively deployed app restarting with the controller);
      * re-registers the application-level autoscaling policy with the
        autoscaling state manager when the checkpointed config defines one.

    Args:
        checkpoint_data: The checkpointed target state to restore.
    """
    logger.info(
        f"Recovering target state for application '{self._name}' from checkpoint."
    )
    serialized_policy_def = (
        checkpoint_data.serialized_application_autoscaling_policy_def
    )
    self._set_target_state(
        checkpoint_data.deployment_infos,
        api_type=checkpoint_data.api_type,
        code_version=checkpoint_data.code_version,
        target_config=checkpoint_data.config,
        target_capacity=checkpoint_data.target_capacity,
        target_capacity_direction=checkpoint_data.target_capacity_direction,
        deleting=checkpoint_data.deleting,
        external_scaler_enabled=checkpoint_data.external_scaler_enabled,
        # Carry the serialized app-level policy into the recovered target
        # state. Previously this was omitted, so `_set_target_state` reset it
        # to None. Anything that later read or re-checkpointed
        # `self._target_state` then lost the policy definition.
        serialized_application_autoscaling_policy_def=serialized_policy_def,
    )

    # Restore the route prefix from the checkpointed deployments.
    if checkpoint_data.deployment_infos is not None:
        self._route_prefix = self._check_routes(checkpoint_data.deployment_infos)

    # Restore the app-level autoscaling policy from the checkpoint.
    if (
        checkpoint_data.config
        and checkpoint_data.config.autoscaling_policy is not None
    ):
        self._autoscaling_state_manager.register_application(
            self._name,
            AutoscalingPolicy(
                _serialized_policy_def=serialized_policy_def,
                **checkpoint_data.config.autoscaling_policy,
            ),
        )
def _set_target_state(
    self,
    deployment_infos: Optional[Dict[str, DeploymentInfo]],
    *,
    api_type: APIType,
    code_version: Optional[str],
    target_config: Optional[ServeApplicationSchema],
    target_capacity: Optional[float] = None,
    target_capacity_direction: Optional[TargetCapacityDirection] = None,
    deleting: bool = False,
    external_scaler_enabled: bool = False,
    serialized_application_autoscaling_policy_def: Optional[bytes] = None,
):
    """Replace the application's target state.

    Expected (deployment_infos, deleting) combinations:
      * (None, False): the build task has not finished yet.
      * (target deployments, False): build finished / normal operation.
      * ({}, True): a request to delete the application was received.

    The application status is moved to DELETING or DEPLOYING, and the
    ingress deployment name is refreshed from `deployment_infos`.
    """
    next_status = (
        ApplicationStatus.DELETING if deleting else ApplicationStatus.DEPLOYING
    )
    self._update_status(next_status)

    if deployment_infos is None:
        self._ingress_deployment_name = None
    else:
        ingress_names = [
            deployment_name
            for deployment_name, info in deployment_infos.items()
            if info.ingress
        ]
        # The last deployment flagged as ingress wins.
        # NOTE(review): if none is flagged (e.g. the `{}` used while
        # deleting), the previous ingress name is kept.
        if ingress_names:
            self._ingress_deployment_name = ingress_names[-1]

    self._target_state = ApplicationTargetState(
        deployment_infos=deployment_infos,
        code_version=code_version,
        config=target_config,
        target_capacity=target_capacity,
        target_capacity_direction=target_capacity_direction,
        deleting=deleting,
        api_type=api_type,
        external_scaler_enabled=external_scaler_enabled,
        serialized_application_autoscaling_policy_def=serialized_application_autoscaling_policy_def,
    )
def _set_target_state_deleting(self):
    """Switch the target state to "deleting".

    The target deployment infos become an empty dict and the code version
    and config are wiped. The API type and the external-scaler flag are
    carried over from the current state.
    """
    current_api_type = self._target_state.api_type
    self._set_target_state(
        deleting=True,
        deployment_infos={},
        code_version=None,
        target_config=None,
        api_type=current_api_type,
        external_scaler_enabled=self.external_scaler_enabled,
    )
def _clear_target_state_and_store_config(
    self,
    target_config: Optional[ServeApplicationSchema],
):
    """Drop the target deployments but remember the given config.

    The deployment infos and code version are reset to None, the API type is
    set to DECLARATIVE, and the external-scaler flag is taken from the config
    (False when there is no config).

    NOTE: this currently assumes it is *only* called for apps deployed with
    the declarative API.
    """
    if target_config:
        scaler_enabled = target_config.external_scaler_enabled
    else:
        scaler_enabled = False

    self._set_target_state(
        deployment_infos=None,
        api_type=APIType.DECLARATIVE,
        code_version=None,
        target_config=target_config,
        deleting=False,
        external_scaler_enabled=scaler_enabled,
    )
def _delete_deployment(self, name: str) -> bool:
    """Remove one deployment (and its endpoint) from this application.

    Args:
        name: The name of the deployment to delete.

    Returns:
        Whether the target state has changed, as reported by the deployment
        state manager.
    """
    deployment_id = DeploymentID(name=name, app_name=self._name)
    # The endpoint is removed before the deployment itself.
    self._endpoint_state.delete_endpoint(deployment_id)
    return self._deployment_state_manager.delete_deployment(deployment_id)
def delete(self):
    """Request deletion of the application.

    The log message is only emitted when the application is not already
    DELETING; the target state is set to deleting in every case.
    """
    already_deleting = self._status == ApplicationStatus.DELETING
    if not already_deleting:
        logger.info(
            f"Deleting app '{self._name}'.",
            extra={"log_to_stderr": False},
        )
    self._set_target_state_deleting()
def is_deleted(self) -> bool:
    """Return True once the application is fully deleted.

    That requires the target state to be "deleting" AND no live
    deployments to remain for the application.
    """
    if not self._target_state.deleting:
        return False
    return not self._get_live_deployments()
def should_autoscale(self) -> bool:
    """Whether autoscaling applies to this application.

    Delegates to the autoscaling state manager, which reports whether any
    deployment of the application has autoscaling enabled.
    """
    manager = self._autoscaling_state_manager
    return manager.should_autoscale_application(self._name)
def autoscale(self) -> bool:
    """Apply the autoscaling decisions for the application's deployments.

    Deployments whose current target replica count is unknown (None) are
    left out. Every decision returned by the autoscaling state manager is
    applied, even after one of them has already changed something.

    Returns:
        True if applying any decision changed the number of replicas of a
        deployment, False otherwise.
    """
    deployment_names = self.target_deployments
    if not deployment_names:
        return False

    current_targets: Dict[DeploymentID, int] = {}
    for deployment_name in deployment_names:
        dep_id = DeploymentID(name=deployment_name, app_name=self._name)
        num_replicas = self._deployment_state_manager.get_deployment_target_num_replicas(
            dep_id
        )
        if num_replicas is not None:
            current_targets[dep_id] = num_replicas

    if not current_targets:
        return False

    decisions: Dict[
        DeploymentID, int
    ] = self._autoscaling_state_manager.get_decision_num_replicas(
        self._name, current_targets
    )

    changed = False
    for dep_id, decided_replicas in decisions.items():
        if self._deployment_state_manager.autoscale(dep_id, decided_replicas):
            changed = True
    return changed
def apply_deployment_info(
    self,
    deployment_name: str,
    deployment_info: DeploymentInfo,
) -> bool:
    """Deploy (or update) one deployment of the application.

    The deployment info is handed to the deployment state manager, and the
    endpoint registry is kept in sync with it. A deployment with a route
    prefix gets its endpoint updated; one without gets its endpoint deleted.

    Args:
        deployment_name: The name of the deployment to apply.
        deployment_info: The deployment info to apply.

    Returns:
        Whether the target state has changed.

    Raises:
        RayServeException: If the route prefix is set but does not start
            with "/".
    """
    route_prefix = deployment_info.route_prefix
    if route_prefix is not None and not route_prefix.startswith("/"):
        raise RayServeException(
            f'Invalid route prefix "{route_prefix}", it must start with "/"'
        )

    deployment_id = DeploymentID(name=deployment_name, app_name=self._name)
    target_state_changed = self._deployment_state_manager.deploy(
        deployment_id, deployment_info
    )

    if deployment_info.route_prefix is not None:
        config = deployment_info.deployment_config
        # Route patterns come from the deployment state manager only. There
        # is no fallback to the patterns of an already-registered endpoint.
        # (An earlier comment here claimed such a fallback existed.)
        route_patterns = (
            self._deployment_state_manager.get_deployment_route_patterns(
                deployment_id
            )
        )
        self._endpoint_state.update_endpoint(
            deployment_id,
            # The current meaning of the "is_cross_language" field is ambiguous.
            # We will work on optimizing and removing this field in the future.
            # Instead of using the "is_cross_language" field, we will directly
            # compare if the replica is Python, which will assist the Python
            # router in determining if the replica invocation is a cross-language
            # operation.
            EndpointInfo(
                route=deployment_info.route_prefix,
                app_is_cross_language=config.deployment_language
                != DeploymentLanguage.PYTHON,
                route_patterns=route_patterns,
            ),
        )
    else:
        self._endpoint_state.delete_endpoint(deployment_id)

    return target_state_changed
def deploy_app(
    self,
    deployment_infos: Dict[str, DeploymentInfo],
    external_scaler_enabled: bool,
):
    """(Re-)deploy the application from a set of deployment infos.

    Only meant for the imperative API (i.e. `serve.run` or the Java API).
    The route prefix is recomputed from the deployment infos, and the target
    state is replaced with an IMPERATIVE one that has no code version,
    config, or target capacity.

    Raises:
        RayServeException: If there is more than one route prefix or docs path.
    """
    # Validates route uniqueness across the deployments before anything changes.
    new_route_prefix = self._check_routes(deployment_infos)
    self._route_prefix = new_route_prefix

    self._set_target_state(
        deployment_infos=deployment_infos,
        api_type=APIType.IMPERATIVE,
        code_version=None,
        target_config=None,
        target_capacity=None,
        target_capacity_direction=None,
        external_scaler_enabled=external_scaler_enabled,
    )
def apply_app_config(
    self,
    config: ServeApplicationSchema,
    target_capacity: Optional[float],
    target_capacity_direction: Optional[TargetCapacityDirection],
    deployment_time: float,
) -> None:
    """Apply a declarative config to the application.

    The path taken depends on the config's code version:

    * Different from the current target state's code version: any
      unfinished build task is cancelled, the target deployments are
      cleared, and a new build task is launched.
    * Equal to it: the config is layered onto the existing target
      deployment infos without rebuilding. Errors while doing so clear the
      target state and mark the application DEPLOY_FAILED.

    Should only be called to (re-)deploy an app from the declarative API
    (i.e. through the REST API).
    """
    self._deployment_timestamp = deployment_time
    new_code_version = get_app_code_version(config)

    if new_code_version != self._target_state.code_version:
        # ---- Rebuild path ----
        pending = self._build_app_task_info
        if pending and not pending.finished:
            logger.info(
                f"Received new config for application '{self._name}'. "
                "Cancelling previous request."
            )
            ray.cancel(pending.obj_ref)

        # Stop reconciling the current target deployments. A new target
        # state is set once the new app has finished building.
        self._clear_target_state_and_store_config(config)

        # Telemetry for the container runtime env feature.
        stored_runtime_env = self._target_state.config.runtime_env
        if stored_runtime_env.get("container") or stored_runtime_env.get(
            "image_uri"
        ):
            ServeUsageTag.APP_CONTAINER_RUNTIME_ENV_USED.record("1")

        app_policy = config.autoscaling_policy
        app_policy_function = (
            app_policy.get("policy_function")
            if isinstance(app_policy, dict)
            else None
        )

        policy_function_by_deployment = {
            dep.name: dep.autoscaling_config.get("policy", {}).get(
                "policy_function", DEFAULT_AUTOSCALING_POLICY_NAME
            )
            for dep in config.deployments
            if isinstance(dep.autoscaling_config, dict)
        }
        router_cls_by_deployment = {
            dep.name: dep.request_router_config.get(
                "request_router_class", DEFAULT_REQUEST_ROUTER_PATH
            )
            for dep in config.deployments
            if isinstance(dep.request_router_config, dict)
        }

        logger.info(f"Importing and building app '{self._name}'.")
        task_ref = build_serve_application.options(
            runtime_env=config.runtime_env,
            enable_task_events=RAY_SERVE_ENABLE_TASK_EVENTS,
        ).remote(
            config.import_path,
            new_code_version,
            config.name,
            config.args,
            self._logging_config,
            app_policy_function,
            policy_function_by_deployment,
            router_cls_by_deployment,
        )
        self._build_app_task_info = BuildAppTaskInfo(
            obj_ref=task_ref,
            code_version=new_code_version,
            config=config,
            target_capacity=target_capacity,
            target_capacity_direction=target_capacity_direction,
            finished=False,
        )
        return

    # ---- Lightweight path: code unchanged, only options are updated ----
    try:
        updated_infos = override_deployment_info(
            self._target_state.deployment_infos,
            config,
        )
        self._route_prefix = self._check_routes(updated_infos)
        self._set_target_state(
            # The code version is unchanged; everything else reflects the
            # new config.
            code_version=self._target_state.code_version,
            api_type=APIType.DECLARATIVE,
            deployment_infos=updated_infos,
            target_config=config,
            target_capacity=target_capacity,
            target_capacity_direction=target_capacity_direction,
            external_scaler_enabled=config.external_scaler_enabled,
        )
    except (TypeError, ValueError, RayServeException):
        self._clear_target_state_and_store_config(config)
        self._update_status(
            ApplicationStatus.DEPLOY_FAILED, traceback.format_exc()
        )
    except Exception:
        self._clear_target_state_and_store_config(config)
        self._update_status(
            ApplicationStatus.DEPLOY_FAILED,
            (
                f"Unexpected error occurred while applying config for "
                f"application '{self._name}': \n{traceback.format_exc()}"
            ),
        )
def _get_live_deployments(self) -> List[str]:
    """Deployment names the deployment state manager holds for this app."""
    manager = self._deployment_state_manager
    return manager.get_deployments_in_application(self._name)
def _determine_app_status(self) -> Tuple[ApplicationStatus, str]:
    """Derive the application status from the target state and deployments.

    Returns:
        Status (ApplicationStatus):
            DELETING: the application is being deleted.
            DEPLOY_FAILED: one or more deployments failed to deploy.
            UNHEALTHY: one or more deployments are unhealthy.
            DEPLOYING: a deployment is updating, or is up/downscaling
                because a config update started.
            RUNNING: otherwise.
        Error message (str):
            Non-empty only for DEPLOY_FAILED or UNHEALTHY.
    """
    if self._target_state.deleting:
        return ApplicationStatus.DELETING, ""

    statuses = self.get_deployments_statuses()
    # The status info with the lowest rank has the highest priority and
    # decides the application status.
    top = min(statuses, key=lambda info: info.rank)

    def _names_with(wanted_status):
        # Names of every deployment currently in `wanted_status`.
        return [s.name for s in statuses if s.status == wanted_status]

    if top.status == DeploymentStatus.DEPLOY_FAILED:
        failed_deployments = _names_with(DeploymentStatus.DEPLOY_FAILED)
        return (
            ApplicationStatus.DEPLOY_FAILED,
            f"Failed to update the deployments {failed_deployments}.",
        )
    if top.status == DeploymentStatus.UNHEALTHY:
        unhealthy_deployment_names = _names_with(DeploymentStatus.UNHEALTHY)
        return (
            ApplicationStatus.UNHEALTHY,
            f"The deployments {unhealthy_deployment_names} are UNHEALTHY.",
        )
    if top.status == DeploymentStatus.UPDATING:
        return ApplicationStatus.DEPLOYING, ""

    is_scaling = top.status in [
        DeploymentStatus.UPSCALING,
        DeploymentStatus.DOWNSCALING,
    ]
    if (
        is_scaling
        and top.status_trigger == DeploymentStatusTrigger.CONFIG_UPDATE_STARTED
    ):
        return ApplicationStatus.DEPLOYING, ""

    return ApplicationStatus.RUNNING, ""
def _reconcile_build_app_task(
    self,
) -> Tuple[Optional[bytes], Optional[Dict], BuildAppStatus, str]:
    """If necessary, reconcile the in-progress build task.

    Once the task's result is ready, the task is marked finished *before* the
    result is retrieved. Later calls therefore return NO_TASK_IN_PROGRESS
    whatever the outcome of this call was.

    Returns:
        A 4-tuple of:
        Serialized application autoscaling policy def (Optional[bytes]):
            The serialized application autoscaling policy def returned from
            the build app task if the app was built successfully, otherwise
            None.
        Deployment infos (Optional[Dict[str, DeploymentInfo]]):
            The deployment infos built from the deploy arguments returned by
            the build app task, with option overrides from the app config
            applied. None unless the status is SUCCEEDED.
        Status (BuildAppStatus):
            NO_TASK_IN_PROGRESS: There is no build task to reconcile.
            SUCCEEDED: Task finished successfully.
            FAILED: An error occurred during execution of the build app task,
                or while converting its result / applying the app config.
            IN_PROGRESS: Task hasn't finished yet.
        Error message (str):
            Non-empty string if the status is FAILED, otherwise "".
    """
    if self._build_app_task_info is None or self._build_app_task_info.finished:
        return None, None, BuildAppStatus.NO_TASK_IN_PROGRESS, ""

    if not check_obj_ref_ready_nowait(self._build_app_task_info.obj_ref):
        return None, None, BuildAppStatus.IN_PROGRESS, ""

    # Retrieve build app task result. Mark the task finished first so its
    # result (or failure) is consumed only once.
    self._build_app_task_info.finished = True
    try:
        serialized_application_autoscaling_policy_def, args, err = ray.get(
            self._build_app_task_info.obj_ref
        )
    except RuntimeEnvSetupError:
        error_msg = (
            f"Runtime env setup for app '{self._name}' failed:\n"
            + traceback.format_exc()
        )
        return None, None, BuildAppStatus.FAILED, error_msg
    except Exception:
        error_msg = (
            f"Unexpected error occurred while deploying application "
            f"'{self._name}': \n{traceback.format_exc()}"
        )
        return None, None, BuildAppStatus.FAILED, error_msg

    # The build task reports errors from importing/building the app as a
    # returned message instead of raising.
    if err is not None:
        return (
            None,
            None,
            BuildAppStatus.FAILED,
            f"Deploying app '{self._name}' failed with exception:\n{err}",
        )
    logger.info(f"Imported and built app '{self._name}' successfully.")

    # Convert serialized deployment args (returned by build app task)
    # to deployment infos and apply option overrides from config.
    try:
        deployment_infos = {
            params["deployment_name"]: deploy_args_to_deployment_info(
                **params, app_name=self._name
            )
            for params in args
        }
        deployment_to_serialized_autoscaling_policy_def = {
            params["deployment_name"]: params["serialized_autoscaling_policy_def"]
            for params in args
            if params["serialized_autoscaling_policy_def"] is not None
        }
        deployment_to_serialized_request_router_cls = {
            params["deployment_name"]: params["serialized_request_router_cls"]
            for params in args
            if params["serialized_request_router_cls"] is not None
        }
        overrided_infos = override_deployment_info(
            deployment_infos,
            self._build_app_task_info.config,
            deployment_to_serialized_autoscaling_policy_def,
            deployment_to_serialized_request_router_cls,
        )
        self._route_prefix = self._check_routes(overrided_infos)
        return (
            serialized_application_autoscaling_policy_def,
            overrided_infos,
            BuildAppStatus.SUCCEEDED,
            "",
        )
    except (TypeError, ValueError, RayServeException):
        return None, None, BuildAppStatus.FAILED, traceback.format_exc()
    except Exception:
        error_msg = (
            f"Unexpected error occurred while applying config for application "
            f"'{self._name}': \n{traceback.format_exc()}"
        )
        return None, None, BuildAppStatus.FAILED, error_msg
def _check_routes(
    self, deployment_infos: Dict[str, DeploymentInfo]
) -> Optional[str]:
    """Check route prefixes of deployments in app.

    There should be at most one non-null route prefix among the app's
    deployments. The caller stores the result as the application route
    prefix after a build task's result has been converted.

    Args:
        deployment_infos: the app's deployment infos, keyed by deployment name.

    Returns:
        The single non-null route prefix, or None if no deployment sets one.

    Raises:
        RayServeException: if more than one deployment sets a route prefix.
    """
    route_prefixes = [
        info.route_prefix
        for info in deployment_infos.values()
        if info.route_prefix is not None
    ]
    if len(route_prefixes) > 1:
        raise RayServeException(
            f'Found multiple route prefixes from application "{self._name}",'
            " Please specify only one route prefix for the application "
            "to avoid this issue."
        )
    return route_prefixes[0] if route_prefixes else None
def _reconcile_target_deployments(self) -> bool:
    """Reconcile target deployments in application target state.

    Ensure each deployment is running on up-to-date info, and
    remove outdated deployments from the application.

    Returns:
        Whether applying any target deployment info, or deleting any live
        deployment that is no longer a target, reported a target state change.
    """
    target_state_changed = False

    # Set target state for each deployment.
    for deployment_name, info in self._target_state.deployment_infos.items():
        # Work on a copy so the app-level adjustments below do not mutate the
        # info stored in the application target state.
        deploy_info = deepcopy(info)

        # Apply the target capacity information to the deployment info.
        deploy_info.set_target_capacity(
            new_target_capacity=self._target_state.target_capacity,
            new_target_capacity_direction=(
                self._target_state.target_capacity_direction
            ),
        )

        # Apply the application logging config to the deployment logging config
        # if it is not set.
        if (
            self._target_state.config
            and self._target_state.config.logging_config
            and deploy_info.deployment_config.logging_config is None
        ):
            deploy_info.deployment_config.logging_config = (
                self._target_state.config.logging_config
            )

        # apply_deployment_info is always called; its result is OR-ed in.
        target_state_changed = (
            self.apply_deployment_info(deployment_name, deploy_info)
            or target_state_changed
        )

    # Delete live deployments that are no longer part of the target.
    for deployment_name in self._get_live_deployments():
        if deployment_name not in self.target_deployments:
            target_state_changed = (
                self._delete_deployment(deployment_name) or target_state_changed
            )

    return target_state_changed
def get_deployment_topology(self) -> Optional[DeploymentTopology]:
    """Build a best-effort deployment topology for this application.

    Returns:
        A DeploymentTopology with one node per target deployment, or None if
        the application has no target deployments yet.
    """
    target_names = self.target_deployments
    if not target_names:
        return None

    app_name = self._name
    state_manager = self._deployment_state_manager

    # Target deployments (not only live ones) are used so a best-effort
    # topology can be built from the current state.
    nodes = {}
    for deployment_name in target_names:
        outbound_ids = (
            state_manager.get_deployment_outbound_deployments(
                DeploymentID(name=deployment_name, app_name=app_name)
            )
            or []
        )
        outbound_entries = [
            {"name": outbound.name, "app_name": outbound.app_name}
            for outbound in outbound_ids
        ]
        nodes[deployment_name] = DeploymentNode(
            name=deployment_name,
            app_name=app_name,
            outbound_deployments=outbound_entries,
            is_ingress=deployment_name == self._ingress_deployment_name,
        )

    return DeploymentTopology(
        app_name=app_name,
        ingress_deployment=self._ingress_deployment_name,
        nodes=nodes,
    )
def update(self) -> Tuple[bool, bool]:
    """Attempts to reconcile this application to match its target state.

    Updates the application status and status message based on the
    current state of the system.

    Returns:
        A tuple ``(ready_to_be_deleted, target_state_changed)``:
            ready_to_be_deleted: True only when the application is being
                deleted and ``is_deleted()`` reports that it is gone.
            target_state_changed: whether the target state changed during
                this update (a build task succeeded, or deployments were
                applied or deleted).
    """
    target_state_changed = False

    # If the application is being deleted, ignore any build task results to
    # avoid flipping the state back to DEPLOYING/RUNNING.
    if not self._target_state.deleting:
        (
            serialized_application_autoscaling_policy_def,
            infos,
            task_status,
            msg,
        ) = self._reconcile_build_app_task()
        if task_status == BuildAppStatus.SUCCEEDED:
            target_state_changed = True
            self._set_target_state(
                deployment_infos=infos,
                code_version=self._build_app_task_info.code_version,
                api_type=self._target_state.api_type,
                target_config=self._build_app_task_info.config,
                target_capacity=self._build_app_task_info.target_capacity,
                target_capacity_direction=(
                    self._build_app_task_info.target_capacity_direction
                ),
                external_scaler_enabled=self._target_state.external_scaler_enabled,
                serialized_application_autoscaling_policy_def=serialized_application_autoscaling_policy_def,
            )
            # Handle the user turning the app-level autoscaling policy on or
            # off between deployments of the app.
            if (
                self._target_state.config is not None
                and self._target_state.config.autoscaling_policy is not None
            ):
                self._autoscaling_state_manager.register_application(
                    self._name,
                    AutoscalingPolicy(
                        _serialized_policy_def=serialized_application_autoscaling_policy_def,
                        **self._target_state.config.autoscaling_policy,
                    ),
                )
            else:
                self._autoscaling_state_manager.deregister_application(self._name)
        elif task_status == BuildAppStatus.FAILED:
            self._update_status(ApplicationStatus.DEPLOY_FAILED, msg)

    # Only reconcile deployments when the build app task is finished. If
    # it's not finished, we don't know what the target list of deployments
    # is, so we don't perform any reconciliation.
    if self._target_state.deployment_infos is not None:
        target_state_changed = (
            self._reconcile_target_deployments() or target_state_changed
        )
        status, status_msg = self._determine_app_status()
        self._update_status(status, status_msg)

    # Check if app is ready to be deleted
    if self._target_state.deleting:
        return self.is_deleted(), target_state_changed
    return False, target_state_changed
def get_checkpoint_data(self) -> ApplicationTargetState:
    """Return the target state object to persist in checkpoints.

    The stored object itself is returned, not a copy.
    """
    checkpoint_data = self._target_state
    return checkpoint_data
def get_deployments_statuses(self) -> List[DeploymentStatusInfo]:
    """Return status info for every target deployment of this application."""
    app_name = self._name
    deployment_ids = []
    for deployment_name in self.target_deployments:
        deployment_ids.append(DeploymentID(name=deployment_name, app_name=app_name))
    return self._deployment_state_manager.get_deployment_statuses(deployment_ids)
def get_application_status_info(self) -> ApplicationStatusInfo:
    """Bundle the current status, status message and deployment timestamp."""
    status_info = ApplicationStatusInfo(
        self._status,
        deployment_timestamp=self._deployment_timestamp,
        message=self._status_msg,
    )
    return status_info
def list_deployment_details(self) -> Dict[str, DeploymentDetails]:
    """Gets detailed info on all live deployments in this application.

    Target deployments for which the deployment state manager has no details
    (None) are left out, so deleted deployments are not included.

    Returns:
        A dict from deployment name to its details. It may not cover every
        deployment of the application: for example, while the application is
        still bringing deployments up, or while it is deleting and some
        deployments are already gone.
    """
    result = {}
    for deployment_name in self.target_deployments:
        deployment_id = DeploymentID(name=deployment_name, app_name=self._name)
        found = self._deployment_state_manager.get_deployment_details(deployment_id)
        if found is None:
            continue
        result[deployment_name] = found
    return result
def _update_status(self, status: ApplicationStatus, status_msg: str = "") -> None:
    """Set the application status and status message.

    A non-empty message accompanying an error status (DEPLOY_FAILED or
    UNHEALTHY) is logged at error level. It is logged only when it differs
    from the current message, so an unchanged error is not logged again.
    """
    is_error = status in (
        ApplicationStatus.DEPLOY_FAILED,
        ApplicationStatus.UNHEALTHY,
    )
    message_changed = bool(status_msg) and status_msg != self._status_msg
    if is_error and message_changed:
        logger.error(status_msg)

    self._status = status
    self._status_msg = status_msg
class ApplicationStateManager:
    """Owns the ApplicationState of every application.

    Creates, updates and deletes application states, and drives their
    reconciliation via ``update()``. It also checkpoints their target states
    to the KV store; a newly constructed manager recovers from that
    checkpoint.
    """

    def __init__(
        self,
        deployment_state_manager: DeploymentStateManager,
        autoscaling_state_manager: AutoscalingStateManager,
        endpoint_state: EndpointState,
        kv_store: KVStoreBase,
        logging_config: LoggingConfig,
    ):
        self._deployment_state_manager = deployment_state_manager
        self._autoscaling_state_manager = autoscaling_state_manager
        self._endpoint_state = endpoint_state
        self._kv_store = kv_store
        self._logging_config = logging_config

        # Set by shutdown(); once True, save_checkpoint() stops writing.
        self._shutting_down = False

        self._application_states: Dict[str, ApplicationState] = {}

        # Metric for tracking application status
        self._application_status_gauge = ray_metrics.Gauge(
            "serve_application_status",
            description=(
                "Numeric status of application. "
                "0=UNKNOWN, 1=DEPLOY_FAILED, 2=UNHEALTHY, 3=NOT_STARTED, "
                "4=DELETING, 5=DEPLOYING, 6=RUNNING."
            ),
            tag_keys=("application",),
        )

        self._recover_from_checkpoint()

    def _recover_from_checkpoint(self):
        """Recreate application states from the KV store checkpoint, if any.

        The checkpoint is the cloudpickled dict of app name -> target state
        written by ``save_checkpoint()``.
        """
        checkpoint = self._kv_store.get(CHECKPOINT_KEY)
        if checkpoint is not None:
            application_state_info = cloudpickle.loads(checkpoint)

            for app_name, checkpoint_data in application_state_info.items():
                app_state = ApplicationState(
                    app_name,
                    self._deployment_state_manager,
                    self._autoscaling_state_manager,
                    self._endpoint_state,
                    self._logging_config,
                    checkpoint_data.external_scaler_enabled,
                )
                app_state.recover_target_state_from_checkpoint(checkpoint_data)
                self._application_states[app_name] = app_state

    def delete_app(self, name: str) -> None:
        """Delete application by name. Unknown names are ignored."""
        if name not in self._application_states:
            return
        self._application_states[name].delete()

    def _check_route_prefix_conflicts(
        self, name_to_deployment_args: Dict[str, List[Dict]]
    ) -> None:
        """Raise if any app in the batch would reuse another app's route prefix.

        A prefix may be reused only by the app that already owns it. Prefixes
        of apps that are being deleted are free. Prefixes claimed earlier in
        the same batch also count as taken.

        Raises:
            RayServeException: on the first conflicting prefix found.
        """
        live_route_prefixes: Dict[str, str] = {
            app_state.route_prefix: app_name
            for app_name, app_state in self._application_states.items()
            if app_state.route_prefix is not None
            and app_state.status != ApplicationStatus.DELETING
        }

        for name, deployment_args in name_to_deployment_args.items():
            for deploy_param in deployment_args:
                # Make sure route_prefix is not being used by other application.
                deploy_app_prefix = deploy_param.get("route_prefix")
                if deploy_app_prefix is None:
                    continue

                existing_app_name = live_route_prefixes.get(deploy_app_prefix)
                # It's ok to redeploy an app with the same prefix
                # if it has the same name as the app already using that prefix.
                if existing_app_name is not None and existing_app_name != name:
                    raise RayServeException(
                        f"Prefix {deploy_app_prefix} is being used by application "
                        f'"{existing_app_name}". Failed to deploy application "{name}".'
                    )

                # We might be deploying more than one app,
                # so we need to add this app's prefix to the
                # set of live route prefixes that we're checking
                # against during this batch operation.
                live_route_prefixes[deploy_app_prefix] = name

    def deploy_apps(
        self,
        name_to_deployment_args: Dict[str, List[Dict]],
        name_to_application_args: Dict[str, ApplicationArgsProto],
    ) -> None:
        """Deploy a batch of applications from their deployment arguments.

        Route prefixes of the whole batch are validated before any application
        state is created or updated. A conflict therefore leaves every
        application untouched, instead of deploying only part of the batch.

        Args:
            name_to_deployment_args: app name -> list of deploy-argument dicts,
                one per deployment.
            name_to_application_args: app name -> application arguments.

        Raises:
            RayServeException: if a route prefix is already used by a different
                (non-deleting) application or by another app in the batch.
        """
        self._check_route_prefix_conflicts(name_to_deployment_args)

        for name, deployment_args in name_to_deployment_args.items():
            application_args = name_to_application_args.get(name)
            external_scaler_enabled = application_args.external_scaler_enabled

            if name not in self._application_states:
                self._application_states[name] = ApplicationState(
                    name,
                    self._deployment_state_manager,
                    self._autoscaling_state_manager,
                    self._endpoint_state,
                    self._logging_config,
                    external_scaler_enabled,
                )
            ServeUsageTag.NUM_APPS.record(str(len(self._application_states)))

            deployment_infos = {
                params["deployment_name"]: deploy_args_to_deployment_info(
                    **params, app_name=name
                )
                for params in deployment_args
            }
            self._application_states[name].deploy_app(
                deployment_infos, external_scaler_enabled
            )

    def deploy_app(
        self,
        name: str,
        deployment_args: List[Dict],
        application_args: ApplicationArgsProto,
    ) -> None:
        """Deploy the specified app to the list of deployment arguments.

        This function should only be called if the app is being deployed
        through serve.run instead of from a config.

        Args:
            name: application name
            deployment_args: arguments for deploying a list of deployments.
            application_args: application arguments.
        """
        self.deploy_apps({name: deployment_args}, {name: application_args})

    def apply_app_configs(
        self,
        app_configs: List[ServeApplicationSchema],
        *,
        deployment_time: float = 0,
        target_capacity: Optional[float] = None,
        target_capacity_direction: Optional[TargetCapacityDirection] = None,
    ):
        """Declaratively apply the list of application configs.

        The applications will be reconciled to match the target state of the config.

        Any applications previously deployed declaratively that are *not* present in
        the list will be deleted.
        """
        for app_config in app_configs:
            if app_config.name not in self._application_states:
                logger.info(f"Deploying new app '{app_config.name}'.")
                self._application_states[app_config.name] = ApplicationState(
                    app_config.name,
                    self._deployment_state_manager,
                    self._autoscaling_state_manager,
                    endpoint_state=self._endpoint_state,
                    logging_config=self._logging_config,
                    external_scaler_enabled=app_config.external_scaler_enabled,
                )

            self._application_states[app_config.name].apply_app_config(
                app_config,
                target_capacity,
                target_capacity_direction,
                deployment_time=deployment_time,
            )

        # Delete all apps that were previously deployed via the declarative API
        # but are not in the config being applied.
        existing_apps = {
            name
            for name, app_state in self._application_states.items()
            if app_state.api_type == APIType.DECLARATIVE
        }
        apps_in_config = {app_config.name for app_config in app_configs}
        for app_to_delete in existing_apps - apps_in_config:
            self.delete_app(app_to_delete)

        ServeUsageTag.NUM_APPS.record(str(len(self._application_states)))

    def get_deployments(self, app_name: str) -> List[str]:
        """Return all deployment names by app name"""
        if app_name not in self._application_states:
            return []
        return self._application_states[app_name].target_deployments

    def get_deployments_statuses(self, app_name: str) -> List[DeploymentStatusInfo]:
        """Return all deployment statuses by app name"""
        if app_name not in self._application_states:
            return []
        return self._application_states[app_name].get_deployments_statuses()

    def get_app_status(self, name: str) -> ApplicationStatus:
        """Return the app's status, or NOT_STARTED if the app is unknown."""
        if name not in self._application_states:
            return ApplicationStatus.NOT_STARTED

        return self._application_states[name].status

    def does_app_exist(self, name: str) -> bool:
        """Return whether an application state exists for ``name``."""
        return name in self._application_states

    def get_app_status_info(self, name: str) -> ApplicationStatusInfo:
        """Return the app's status info; NOT_STARTED info if the app is unknown."""
        if name not in self._application_states:
            return ApplicationStatusInfo(
                ApplicationStatus.NOT_STARTED,
                message=f"Application {name} doesn't exist",
                deployment_timestamp=0,
            )
        return self._application_states[name].get_application_status_info()

    def get_docs_path(self, app_name: str) -> Optional[str]:
        """Return the app's docs path. Raises KeyError for unknown apps."""
        return self._application_states[app_name].docs_path

    def get_route_prefix(self, name: str) -> Optional[str]:
        """Return the app's route prefix. Raises KeyError for unknown apps."""
        return self._application_states[name].route_prefix

    def get_ingress_deployment_name(self, name: str) -> Optional[str]:
        """Return the app's ingress deployment name, or None if the app is unknown."""
        if name not in self._application_states:
            return None

        return self._application_states[name].ingress_deployment

    def get_app_source(self, name: str) -> APIType:
        """Return the API type the app was deployed with. Raises KeyError if unknown."""
        return self._application_states[name].api_type

    def get_external_scaler_enabled(self, app_name: str) -> bool:
        """Check if external scaler is enabled for the application.

        Args:
            app_name: Name of the application.

        Returns:
            True if external_scaler_enabled is set for the application, False otherwise.
        """
        return (
            self.does_app_exist(app_name)
            and self._application_states[app_name].external_scaler_enabled
        )

    def list_app_statuses(
        self, source: Optional[APIType] = None
    ) -> Dict[str, ApplicationStatusInfo]:
        """Return a dictionary with {app name: application info}

        Args:
            source: Optional API type filter. If provided, only returns apps
                deployed via the specified API type.

        Returns:
            Dict[str, ApplicationStatusInfo]: A dictionary mapping application names
                to their corresponding status information.
        """
        return {
            name: app_state.get_application_status_info()
            for name, app_state in self._application_states.items()
            if source is None or app_state.api_type is source
        }

    def list_app_names(self) -> List[str]:
        """Return app names without instantiating status objects."""
        return list(self._application_states.keys())

    def list_deployment_details(self, name: str) -> Dict[str, DeploymentDetails]:
        """Gets detailed info on all deployments in specified application."""
        if name not in self._application_states:
            return {}
        return self._application_states[name].list_deployment_details()

    def get_deployment_topology(self, app_name: str) -> Optional[DeploymentTopology]:
        """Get the deployment topology for an application.

        Args:
            app_name: Name of the application.

        Returns:
            The deployment topology for the application, or None if the application
            doesn't exist or the topology hasn't been built yet.
        """
        if app_name not in self._application_states:
            return None

        return self._application_states[app_name].get_deployment_topology()

    def update(self) -> bool:
        """
        Update each application state.

        Applications that report they are ready to be deleted are deregistered
        from autoscaling and dropped. If any target state changed, both this
        manager's and the deployment state manager's checkpoints are saved.

        Returns:
            bool: True if any application's target state changed during this update.
        """
        apps_to_be_deleted = []
        any_target_state_changed = False
        for name, app in self._application_states.items():
            if app.should_autoscale():
                any_target_state_changed = app.autoscale() or any_target_state_changed
            ready_to_be_deleted, app_target_state_changed = app.update()
            any_target_state_changed = (
                any_target_state_changed or app_target_state_changed
            )
            if ready_to_be_deleted:
                apps_to_be_deleted.append(name)
                logger.debug(f"Application '{name}' deleted successfully.")

        # Record application status metrics
        for name, app in self._application_states.items():
            self._application_status_gauge.set(
                app.status.to_numeric(),
                tags={"application": name},
            )

        if apps_to_be_deleted:
            for app_name in apps_to_be_deleted:
                self._autoscaling_state_manager.deregister_application(app_name)
                del self._application_states[app_name]
            ServeUsageTag.NUM_APPS.record(str(len(self._application_states)))

        if any_target_state_changed:
            self.save_checkpoint()
            self._deployment_state_manager.save_checkpoint()

        return any_target_state_changed

    def shutdown(self) -> None:
        """Start deleting every application and remove the stored checkpoint."""
        self._shutting_down = True

        for app_state in self._application_states.values():
            app_state.delete()

        self._kv_store.delete(CHECKPOINT_KEY)

    def is_ready_for_shutdown(self) -> bool:
        """Return whether all applications have shut down.

        Iterate through all application states and check if all their applications
        are deleted.
        """
        return self._shutting_down and all(
            app_state.is_deleted() for app_state in self._application_states.values()
        )

    def save_checkpoint(self) -> None:
        """Write a checkpoint of all application states."""
        if self._shutting_down:
            # Once we're told to shut down, stop writing checkpoints.
            # Calling .shutdown() deletes any existing checkpoint.
            return

        application_state_info = {
            app_name: app_state.get_checkpoint_data()
            for app_name, app_state in self._application_states.items()
        }

        self._kv_store.put(
            CHECKPOINT_KEY,
            cloudpickle.dumps(application_state_info),
        )
@ray.remote(num_cpus=0, max_calls=1)
def build_serve_application(
    import_path: str,
    code_version: str,
    name: str,
    args: Dict,
    logging_config: LoggingConfig,
    application_autoscaling_policy_function: Optional[str],
    deployment_to_autoscaling_policy_function: Dict[str, str],
    deployment_to_request_router_cls: Dict[str, str],
) -> Tuple[Optional[bytes], Optional[List[Dict]], Optional[str]]:
    """Import and build a Serve application.

    Args:
        import_path: import path to top-level bound deployment.
        code_version: code version inferred from app config. All
            deployment versions are set to this code version.
        name: application name. If specified, application will be deployed
            without removing existing applications.
        args: Arguments to be passed to the application builder.
        logging_config: the logging config for the build app task.
        application_autoscaling_policy_function: import path of the
            application autoscaling policy function, if any.
        deployment_to_autoscaling_policy_function: deployment name -> import
            path of its autoscaling policy function.
        deployment_to_request_router_cls: deployment name -> import path of
            its request router class.

    Returns:
        A 3-tuple of:
        Serialized application autoscaling policy def: a serialized autoscaling
            policy def for the application if it was built successfully and one
            was requested, otherwise None.
        Deploy arguments: a list of deployment arguments if application
            was built successfully, otherwise None.
        Error message: a string if an error was raised, otherwise None.
        All three are None if the task was interrupted (KeyboardInterrupt).
    """
    configure_component_logger(
        component_name="controller",
        component_id=f"build_{name}_{os.getpid()}",
        logging_config=logging_config,
    )

    try:
        from ray.serve._private.api import call_user_app_builder_with_args_if_necessary

        # Import and build the application.
        args_info_str = f" with arguments {args}" if args else ""
        logger.info(f"Importing application '{name}'{args_info_str}.")

        app = call_user_app_builder_with_args_if_necessary(
            import_attr(import_path), args
        )

        deploy_args_list = []
        built_app: BuiltApplication = build_app(
            app,
            name=name,
            default_runtime_env=ray.get_runtime_context().runtime_env,
        )
        num_ingress_deployments = 0

        def _get_serialized_def(attr_path: str) -> bytes:
            """Import ``attr_path`` and cloudpickle the attribute by value.

            The attribute's module is registered for pickle-by-value only for
            the duration of the dump.
            """
            module, attr = import_module_and_attr(attr_path)
            cloudpickle.register_pickle_by_value(module)
            try:
                return cloudpickle.dumps(attr)
            finally:
                # Undo the registration even if pickling fails.
                cloudpickle.unregister_pickle_by_value(module)

        application_serialized_autoscaling_policy_def = None
        if application_autoscaling_policy_function is not None:
            application_serialized_autoscaling_policy_def = _get_serialized_def(
                application_autoscaling_policy_function
            )

        for deployment in built_app.deployments:
            # Count ASGI (FastAPI) deployments; at most one is allowed per app.
            if inspect.isclass(deployment.func_or_class) and issubclass(
                deployment.func_or_class, ASGIAppReplicaWrapper
            ):
                num_ingress_deployments += 1
            is_ingress = deployment.name == built_app.ingress_deployment_name

            serialized_autoscaling_policy_def = None
            if deployment.name in deployment_to_autoscaling_policy_function:
                serialized_autoscaling_policy_def = _get_serialized_def(
                    deployment_to_autoscaling_policy_function[deployment.name]
                )

            serialized_request_router_cls = None
            if deployment.name in deployment_to_request_router_cls:
                serialized_request_router_cls = _get_serialized_def(
                    deployment_to_request_router_cls[deployment.name]
                )

            deploy_args_list.append(
                get_deploy_args(
                    name=deployment._name,
                    replica_config=deployment._replica_config,
                    ingress=is_ingress,
                    deployment_config=deployment._deployment_config,
                    version=code_version,
                    route_prefix="/" if is_ingress else None,
                    serialized_autoscaling_policy_def=serialized_autoscaling_policy_def,
                    serialized_request_router_cls=serialized_request_router_cls,
                )
            )

        if num_ingress_deployments > 1:
            return (
                None,
                None,
                (
                    f'Found multiple FastAPI deployments in application "{built_app.name}". '
                    "Please only include one deployment with @serve.ingress "
                    "in your application to avoid this issue."
                ),
            )

        return application_serialized_autoscaling_policy_def, deploy_args_list, None
    except KeyboardInterrupt:
        # Error is raised when this task is canceled with ray.cancel(), which
        # happens when deploy_apps() is called.
        logger.info(
            "Existing config deployment request terminated because of keyboard "
            "interrupt."
        )
        return None, None, None
    except Exception:
        error_traceback = traceback.format_exc()
        logger.error(
            f"Exception importing application '{name}'.\n{error_traceback}"
        )
        return None, None, error_traceback
def override_deployment_info(
    deployment_infos: Dict[str, DeploymentInfo],
    override_config: Optional[ServeApplicationSchema],
    deployment_to_serialized_autoscaling_policy_def: Optional[Dict[str, bytes]] = None,
    deployment_to_serialized_request_router_cls: Optional[Dict[str, bytes]] = None,
) -> Dict[str, DeploymentInfo]:
    """Override deployment infos with options from app config.

    ``deployment_infos`` is deep-copied first; the caller's dict is not mutated.

    Args:
        deployment_infos: deployment info loaded from code, keyed by
            deployment name.
        override_config: application config deployed by user with
            options to override those loaded from code. If None, an unmodified
            copy of ``deployment_infos`` is returned.
        deployment_to_serialized_autoscaling_policy_def: serialized autoscaling
            policy def for each deployment, keyed by deployment name.
        deployment_to_serialized_request_router_cls: serialized request router
            cls for each deployment, keyed by deployment name.

    Returns:
        The updated deployment infos.

    Raises:
        ValueError: If config options have invalid values, including a config
            entry for a deployment that does not exist.
        TypeError: If config options have invalid types.
    """
    deployment_infos = deepcopy(deployment_infos)
    if override_config is None:
        return deployment_infos

    config_dict = override_config.dict(exclude_unset=True)
    deployment_override_options = config_dict.get("deployments", [])
    # The app-level runtime_env is merged into every deployment's runtime_env.
    app_runtime_env = override_config.runtime_env

    # Override options for each deployment listed in the config.
    for options in deployment_override_options:
        deployment_name = options["name"]
        if deployment_name not in deployment_infos:
            raise ValueError(
                f"Deployment '{deployment_name}' does not exist. "
                f"Available: {list(deployment_infos.keys())}"
            )

        info = deployment_infos[deployment_name]
        original_options = info.deployment_config.dict()
        # Every option key set in the config is recorded as user-configured.
        original_options["user_configured_option_names"].update(set(options))

        # With `num_replicas="auto"`, enable autoscaling: start from the default
        # autoscaling config and overlay any user-provided autoscaling_config.
        if options.get("num_replicas") == "auto":
            options["num_replicas"] = None

            new_config = AutoscalingConfig.default().dict()
            # If `autoscaling_config` is specified, its values override
            # the default `num_replicas="auto"` configuration
            autoscaling_config = (
                options.get("autoscaling_config")
                or info.deployment_config.autoscaling_config
            )
            if autoscaling_config:
                new_config.update(autoscaling_config)

            if (
                deployment_to_serialized_autoscaling_policy_def
                and deployment_name in deployment_to_serialized_autoscaling_policy_def
            ):
                # By setting the serialized policy def, AutoscalingConfig constructor will not
                # try to import the policy from the string import path
                policy_obj = AutoscalingPolicy.from_serialized_policy_def(
                    new_config["policy"],
                    deployment_to_serialized_autoscaling_policy_def[deployment_name],
                )
                new_config["policy"] = policy_obj

            options["autoscaling_config"] = AutoscalingConfig(**new_config)

            ServeUsageTag.AUTO_NUM_REPLICAS_USED.record("1")

        # What to pass to info.update
        override_options = {}

        # Merge app-level and deployment-level runtime_envs.
        replica_config = info.replica_config
        if "ray_actor_options" in options:
            # If specified, get ray_actor_options from config
            override_actor_options = options.pop("ray_actor_options")
        else:
            # Otherwise, get options from application code (and default to {}
            # if the code sets options to None).
            override_actor_options = replica_config.ray_actor_options or {}

        # Each placement option comes from the config if set there, otherwise
        # from the replica config built from code.
        override_placement_group_bundles = options.pop(
            "placement_group_bundles", replica_config.placement_group_bundles
        )
        override_placement_group_strategy = options.pop(
            "placement_group_strategy", replica_config.placement_group_strategy
        )
        override_max_replicas_per_node = options.pop(
            "max_replicas_per_node", replica_config.max_replicas_per_node
        )
        override_bundle_label_selector = options.pop(
            "placement_group_bundle_label_selector",
            replica_config.placement_group_bundle_label_selector,
        )
        override_fallback_strategy = options.pop(
            "placement_group_fallback_strategy",
            replica_config.placement_group_fallback_strategy,
        )

        # Record telemetry for container runtime env feature at deployment level
        if override_actor_options.get("runtime_env") and (
            override_actor_options["runtime_env"].get("container")
            or override_actor_options["runtime_env"].get("image_uri")
        ):
            ServeUsageTag.DEPLOYMENT_CONTAINER_RUNTIME_ENV_USED.record("1")

        merged_env = override_runtime_envs_except_env_vars(
            app_runtime_env, override_actor_options.get("runtime_env", {})
        )
        override_actor_options.update({"runtime_env": merged_env})

        replica_config.update(
            ray_actor_options=override_actor_options,
            placement_group_bundles=override_placement_group_bundles,
            placement_group_strategy=override_placement_group_strategy,
            max_replicas_per_node=override_max_replicas_per_node,
            placement_group_bundle_label_selector=override_bundle_label_selector,
            placement_group_fallback_strategy=override_fallback_strategy,
        )
        override_options["replica_config"] = replica_config

        if "request_router_config" in options:
            request_router_config = options.get("request_router_config")
            if request_router_config:
                if (
                    deployment_to_serialized_request_router_cls
                    and deployment_name in deployment_to_serialized_request_router_cls
                ):
                    # By setting the serialized request router cls, RequestRouterConfig constructor will not
                    # try to import the request router cls from the string import path
                    options[
                        "request_router_config"
                    ] = RequestRouterConfig.from_serialized_request_router_cls(
                        request_router_config,
                        deployment_to_serialized_request_router_cls[deployment_name],
                    )
                else:
                    options["request_router_config"] = RequestRouterConfig(
                        **request_router_config
                    )

        # Override deployment config options
        options.pop("name", None)
        original_options.update(options)
        override_options["deployment_config"] = DeploymentConfig(**original_options)
        deployment_infos[deployment_name] = info.update(**override_options)

        deployment_config = deployment_infos[deployment_name].deployment_config
        if (
            deployment_config.autoscaling_config is not None
            and deployment_config.max_ongoing_requests
            < deployment_config.autoscaling_config.get_target_ongoing_requests()
        ):
            logger.warning(
                "Autoscaling will never happen, "
                "because 'max_ongoing_requests' is less than "
                "'target_ongoing_requests' now."
            )

    # Overwrite ingress route prefix: deployments that already carry a route
    # prefix take the app-level one when the config sets it.
    app_route_prefix = config_dict.get("route_prefix", DEFAULT.VALUE)
    validate_route_prefix(app_route_prefix)
    if app_route_prefix is not DEFAULT.VALUE:
        for deployment in deployment_infos.values():
            if deployment.route_prefix is not None:
                deployment.route_prefix = app_route_prefix

    return deployment_infos
|