import inspect
import json
import logging
import os
import time
import traceback
from copy import deepcopy
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Dict, List, Optional, Tuple

import ray
from ray import cloudpickle
from ray._common.utils import import_attr, import_module_and_attr
from ray.exceptions import RuntimeEnvSetupError
from ray.serve._private.autoscaling_state import AutoscalingStateManager
from ray.serve._private.build_app import BuiltApplication, build_app
from ray.serve._private.common import (
    DeploymentID,
    DeploymentStatus,
    DeploymentStatusInfo,
    DeploymentStatusTrigger,
    EndpointInfo,
    TargetCapacityDirection,
)
from ray.serve._private.config import DeploymentConfig
from ray.serve._private.constants import (
    DEFAULT_AUTOSCALING_POLICY_NAME,
    DEFAULT_REQUEST_ROUTER_PATH,
    RAY_SERVE_ENABLE_TASK_EVENTS,
    SERVE_LOGGER_NAME,
)
from ray.serve._private.deploy_utils import (
    deploy_args_to_deployment_info,
    get_app_code_version,
    get_deploy_args,
)
from ray.serve._private.deployment_info import DeploymentInfo
from ray.serve._private.deployment_state import DeploymentStateManager
from ray.serve._private.endpoint_state import EndpointState
from ray.serve._private.logging_utils import configure_component_logger
from ray.serve._private.storage.kv_store import KVStoreBase
from ray.serve._private.usage import ServeUsageTag
from ray.serve._private.utils import (
    DEFAULT,
    check_obj_ref_ready_nowait,
    override_runtime_envs_except_env_vars,
    validate_route_prefix,
)
from ray.serve.api import ASGIAppReplicaWrapper
from ray.serve.config import AutoscalingConfig, AutoscalingPolicy, RequestRouterConfig
from ray.serve.exceptions import RayServeException
from ray.serve.generated.serve_pb2 import (
    ApplicationArgs as ApplicationArgsProto,
    ApplicationStatus as ApplicationStatusProto,
    ApplicationStatusInfo as ApplicationStatusInfoProto,
    DeploymentLanguage,
    DeploymentStatusInfoList as DeploymentStatusInfoListProto,
    StatusOverview as StatusOverviewProto,
)
from ray.serve.schema import (
    APIType,
    ApplicationStatus,
    DeploymentDetails,
    DeploymentNode,
    DeploymentTopology,
    LoggingConfig,
    ServeApplicationSchema,
)
from ray.types import ObjectRef
from ray.util import metrics as ray_metrics

logger = logging.getLogger(SERVE_LOGGER_NAME)

CHECKPOINT_KEY = "serve-application-state-checkpoint"


class BuildAppStatus(Enum):
    """Status of the build application task."""

    NO_TASK_IN_PROGRESS = 1
    IN_PROGRESS = 2
    SUCCEEDED = 3
    FAILED = 4


@dataclass
class BuildAppTaskInfo:
    """Stores info on the current in-progress build app task.

    We use a class instead of only storing the task object ref because
    when a new config is deployed, there can be an outdated in-progress
    build app task. We attach the code version to the task info to
    distinguish outdated build app tasks.
    """

    obj_ref: ObjectRef
    code_version: str
    config: ServeApplicationSchema
    target_capacity: Optional[float]
    target_capacity_direction: Optional[TargetCapacityDirection]
    finished: bool


@dataclass(eq=True)
class ApplicationStatusInfo:
    """Application status, status message and deployment timestamp."""

    status: ApplicationStatus
    message: str = ""
    deployment_timestamp: float = 0

    def debug_string(self):
        """Return this status info as an indented JSON string."""
        return json.dumps(asdict(self), indent=4)

    def to_proto(self):
        """Convert this status info to its protobuf representation."""
        return ApplicationStatusInfoProto(
            status=f"APPLICATION_STATUS_{self.status.name}",
            message=self.message,
            deployment_timestamp=self.deployment_timestamp,
        )

    @classmethod
    def from_proto(cls, proto: ApplicationStatusInfoProto):
        """Build a status info from its protobuf representation."""
        # The proto enum names carry an "APPLICATION_STATUS_" prefix that the
        # schema enum values do not have, so strip it before converting.
        status = ApplicationStatusProto.Name(proto.status)[len("APPLICATION_STATUS_") :]
        return cls(
            status=ApplicationStatus(status),
            message=proto.message,
            deployment_timestamp=proto.deployment_timestamp,
        )


@dataclass(eq=True)
class StatusOverview:
    """Status of an application together with the statuses of its deployments."""

    app_status: ApplicationStatusInfo
    name: str = ""
    deployment_statuses: List[DeploymentStatusInfo] = field(default_factory=list)

    def debug_string(self):
        """Return this overview as an indented JSON string."""
        return json.dumps(asdict(self), indent=4)

    def get_deployment_status(self, name: str) -> Optional[DeploymentStatusInfo]:
        """Get a deployment's status by name.

        Args:
            name: Deployment's name.

        Returns:
            Optional[DeploymentStatusInfo]: The status of the deployment if it
                exists, otherwise None.
        """
        for deployment_status in self.deployment_statuses:
            if name == deployment_status.name:
                return deployment_status

        return None

    def to_proto(self):
        """Convert this overview to its protobuf representation."""
        # Create a protobuf for the Serve Application info
        app_status_proto = self.app_status.to_proto()

        # Create a protobuf list containing all the deployment status protobufs
        deployment_status_proto_list = DeploymentStatusInfoListProto()
        deployment_status_proto_list.deployment_status_infos.extend(
            status.to_proto() for status in self.deployment_statuses
        )

        # Return protobuf encapsulating application and deployment protos
        return StatusOverviewProto(
            name=self.name,
            app_status=app_status_proto,
            deployment_statuses=deployment_status_proto_list,
        )

    @classmethod
    def from_proto(cls, proto: StatusOverviewProto) -> "StatusOverview":
        """Build an overview from its protobuf representation."""
        # Recreate Serve Application info
        app_status = ApplicationStatusInfo.from_proto(proto.app_status)

        # Recreate deployment statuses
        deployment_statuses = [
            DeploymentStatusInfo.from_proto(info_proto)
            for info_proto in proto.deployment_statuses.deployment_status_infos
        ]

        # Recreate StatusInfo
        return cls(
            app_status=app_status,
            deployment_statuses=deployment_statuses,
            name=proto.name,
        )


@dataclass
class ApplicationTargetState:
    """Defines target state of application.

    Target state can become inconsistent if the code version doesn't
    match that of the config. When that happens, a new build app task
    should be kicked off to reconcile the inconsistency.

    deployment_infos: map of deployment name to deployment info. This is
      - None if a config was deployed but the app hasn't finished
        building yet,
      - An empty dict if the app is deleting.
    code_version: Code version of all deployments in target state. None
        if application was deployed through serve.run.
    config: application config deployed by user. None if application was
        deployed through serve.run.
    target_capacity: the target_capacity to use when adjusting num_replicas.
    target_capacity_direction: the scale direction to use when
        running the Serve autoscaler.
    deleting: whether the application is being deleted.
    api_type: the API type (see APIType) the application was deployed with.
    serialized_application_autoscaling_policy_def: the serialized
        application-level autoscaling policy definition returned by the build
        app task, or None if there is none.
    external_scaler_enabled: whether external autoscaling is enabled for
        this application.
    """

    deployment_infos: Optional[Dict[str, DeploymentInfo]]
    code_version: Optional[str]
    config: Optional[ServeApplicationSchema]
    target_capacity: Optional[float]
    target_capacity_direction: Optional[TargetCapacityDirection]
    deleting: bool
    api_type: APIType
    serialized_application_autoscaling_policy_def: Optional[bytes]
    external_scaler_enabled: bool


class ApplicationState:
    """Manage single application states with all operations"""

    def __init__(
        self,
        name: str,
        deployment_state_manager: DeploymentStateManager,
        autoscaling_state_manager: AutoscalingStateManager,
        endpoint_state: EndpointState,
        logging_config: LoggingConfig,
        external_scaler_enabled: bool,
    ):
        """
        Initialize an ApplicationState instance.

        Args:
            name: Application name.
            deployment_state_manager: Manages the state of all deployments in the cluster.
            autoscaling_state_manager: Manages autoscaling decisions in the cluster.
            endpoint_state: Manages endpoints in the system.
            logging_config: Logging configuration schema.
            external_scaler_enabled: Whether external autoscaling is enabled for
                this application.
        """

        self._name = name
        self._status_msg = ""
        self._deployment_state_manager = deployment_state_manager
        self._autoscaling_state_manager = autoscaling_state_manager
        self._endpoint_state = endpoint_state
        self._route_prefix: Optional[str] = None
        self._ingress_deployment_name: Optional[str] = None

        self._status: ApplicationStatus = ApplicationStatus.DEPLOYING
        self._deployment_timestamp = time.time()

        self._build_app_task_info: Optional[BuildAppTaskInfo] = None
        # Before a deploy app task finishes, we don't know what the
        # target deployments are, so set deployment_infos=None
        self._target_state: ApplicationTargetState = ApplicationTargetState(
            deployment_infos=None,
            code_version=None,
            config=None,
            target_capacity=None,
            target_capacity_direction=None,
            deleting=False,
            api_type=APIType.UNKNOWN,
            external_scaler_enabled=external_scaler_enabled,
            serialized_application_autoscaling_policy_def=None,
        )
        self._logging_config = logging_config

    @property
    def route_prefix(self) -> Optional[str]:
        return self._route_prefix

    @property
    def external_scaler_enabled(self) -> bool:
        return self._target_state.external_scaler_enabled

    @property
    def docs_path(self) -> Optional[str]:
        # get the docs path from the running deployments
        # we are making an assumption that the docs path can only be set
        # on ingress deployments with fastapi.
        ingress_deployment = DeploymentID(self._ingress_deployment_name, self._name)
        return self._deployment_state_manager.get_deployment_docs_path(
            ingress_deployment
        )

    @property
    def status(self) -> ApplicationStatus:
        """Status of the application.

        DEPLOYING: The build task is still running, or the deployments
            have started deploying but aren't healthy yet.
        RUNNING: All deployments are healthy.
        DEPLOY_FAILED: The build task failed or one or more deployments
            became unhealthy in the process of deploying
        UNHEALTHY: While the application was running, one or more
            deployments transition from healthy to unhealthy.
        DELETING: Application and its deployments are being deleted.
        """
        return self._status

    @property
    def deployment_timestamp(self) -> float:
        return self._deployment_timestamp

    @property
    def target_deployments(self) -> List[str]:
        """List of target deployment names in application."""
        if self._target_state.deployment_infos is None:
            return []
        return list(self._target_state.deployment_infos.keys())

    @property
    def ingress_deployment(self) -> Optional[str]:
        return self._ingress_deployment_name

    @property
    def api_type(self) -> APIType:
        return self._target_state.api_type

    def recover_target_state_from_checkpoint(
        self, checkpoint_data: ApplicationTargetState
    ):
        """Restore the target state of this application from a checkpoint.

        Args:
            checkpoint_data: The checkpointed target state to restore.
        """
        logger.info(
            f"Recovering target state for application '{self._name}' from checkpoint."
        )
        # Read defensively so a checkpoint object lacking this attribute still
        # recovers (treated as "no serialized policy").
        serialized_policy_def = getattr(
            checkpoint_data, "serialized_application_autoscaling_policy_def", None
        )
        self._set_target_state(
            checkpoint_data.deployment_infos,
            api_type=checkpoint_data.api_type,
            code_version=checkpoint_data.code_version,
            target_config=checkpoint_data.config,
            target_capacity=checkpoint_data.target_capacity,
            target_capacity_direction=checkpoint_data.target_capacity_direction,
            deleting=checkpoint_data.deleting,
            external_scaler_enabled=checkpoint_data.external_scaler_enabled,
            # Carry the serialized policy def over so that the next checkpoint
            # (which stores the target state) does not lose it.
            serialized_application_autoscaling_policy_def=serialized_policy_def,
        )

        # Restore route prefix from checkpointed deployments when
        # the imperatively started application is restarting with controller.
        if checkpoint_data.deployment_infos is not None:
            self._route_prefix = self._check_routes(checkpoint_data.deployment_infos)

        # Restore app-level autoscaling policy from checkpoint
        if (
            checkpoint_data.config
            and checkpoint_data.config.autoscaling_policy is not None
        ):
            self._autoscaling_state_manager.register_application(
                self._name,
                AutoscalingPolicy(
                    _serialized_policy_def=serialized_policy_def,
                    **checkpoint_data.config.autoscaling_policy,
                ),
            )

    def _set_target_state(
        self,
        deployment_infos: Optional[Dict[str, DeploymentInfo]],
        *,
        api_type: APIType,
        code_version: Optional[str],
        target_config: Optional[ServeApplicationSchema],
        target_capacity: Optional[float] = None,
        target_capacity_direction: Optional[TargetCapacityDirection] = None,
        deleting: bool = False,
        external_scaler_enabled: bool = False,
        serialized_application_autoscaling_policy_def: Optional[bytes] = None,
    ):
        """Set application target state.

        While waiting for build task to finish, this should be
            (None, False)
        When build task has finished and during normal operation, this should be
            (target_deployments, False)
        When a request to delete the application has been received, this should be
            ({}, True)

        Also updates the application status (DELETING if `deleting`, otherwise
        DEPLOYING) and the tracked ingress deployment name.
        """
        if deleting:
            self._update_status(ApplicationStatus.DELETING)
        else:
            self._update_status(ApplicationStatus.DEPLOYING)

        if deployment_infos is None:
            self._ingress_deployment_name = None
        else:
            for name, info in deployment_infos.items():
                if info.ingress:
                    self._ingress_deployment_name = name

        target_state = ApplicationTargetState(
            deployment_infos,
            code_version,
            target_config,
            target_capacity,
            target_capacity_direction,
            deleting,
            api_type=api_type,
            external_scaler_enabled=external_scaler_enabled,
            serialized_application_autoscaling_policy_def=serialized_application_autoscaling_policy_def,
        )

        self._target_state = target_state

    def _set_target_state_deleting(self):
        """Set target state to deleting.

        Wipes the target deployment infos, code version, and config.
        """
        self._set_target_state(
            deployment_infos={},
            api_type=self._target_state.api_type,
            code_version=None,
            target_config=None,
            deleting=True,
            external_scaler_enabled=self.external_scaler_enabled,
        )

    def _clear_target_state_and_store_config(
        self,
        target_config: Optional[ServeApplicationSchema],
    ):
        """Clears the target state and stores the config.

        NOTE: this currently assumes that this method is *only* called when managing
        apps deployed with the declarative API.
        """
        self._set_target_state(
            deployment_infos=None,
            api_type=APIType.DECLARATIVE,
            code_version=None,
            target_config=target_config,
            deleting=False,
            external_scaler_enabled=target_config.external_scaler_enabled
            if target_config
            else False,
        )

    def _delete_deployment(self, name: str) -> bool:
        """Delete a deployment in the application.

        Args:
            name: The name of the deployment to delete.

        Returns:
            Whether the target state has changed.
        """
        deployment_id = DeploymentID(name=name, app_name=self._name)
        self._endpoint_state.delete_endpoint(deployment_id)
        return self._deployment_state_manager.delete_deployment(deployment_id)

    def delete(self):
        """Delete the application"""
        if self._status != ApplicationStatus.DELETING:
            logger.info(
                f"Deleting app '{self._name}'.",
                extra={"log_to_stderr": False},
            )
        self._set_target_state_deleting()

    def is_deleted(self) -> bool:
        """Check whether the application is already deleted.

        For an application to be considered deleted, the target state has to be set to
        deleting and all deployments have to be deleted.
        """
        return self._target_state.deleting and len(self._get_live_deployments()) == 0

    def should_autoscale(self) -> bool:
        """Determine if autoscaling is enabled for the application.

        Returns:
            Autoscaling is enabled for the application if any of the deployments have autoscaling enabled.
        """

        return self._autoscaling_state_manager.should_autoscale_application(self._name)

    def autoscale(self) -> bool:
        """
        Apply the autoscaling decisions for the application.
        If the application has deployment-level autoscaling, it will apply the autoscaling decisions for each deployment.

        Returns:
            True if there is a change to number of replicas for any deployment. False otherwise.
        """
        target_deployments = self.target_deployments
        if len(target_deployments) == 0:
            return False

        deployment_to_target_num_replicas: Dict[DeploymentID, int] = {}
        for deployment_name in target_deployments:
            deployment_id = DeploymentID(name=deployment_name, app_name=self._name)
            target_num_replicas = (
                self._deployment_state_manager.get_deployment_target_num_replicas(
                    deployment_id
                )
            )
            # Deployments without a known target replica count are left out of
            # the autoscaling decision.
            if target_num_replicas is None:
                continue
            deployment_to_target_num_replicas[deployment_id] = target_num_replicas

        if len(deployment_to_target_num_replicas) == 0:
            return False
        decisions: Dict[
            DeploymentID, int
        ] = self._autoscaling_state_manager.get_decision_num_replicas(
            self._name, deployment_to_target_num_replicas
        )

        target_state_changed = False
        for deployment_id, decision_num_replicas in decisions.items():
            # Call autoscale() first so it runs for every deployment even once
            # target_state_changed is already True.
            target_state_changed = (
                self._deployment_state_manager.autoscale(
                    deployment_id, decision_num_replicas
                )
                or target_state_changed
            )
        return target_state_changed

    def apply_deployment_info(
        self,
        deployment_name: str,
        deployment_info: DeploymentInfo,
    ) -> bool:
        """Deploys a deployment in the application.

        Args:
            deployment_name: The name of the deployment to apply.
            deployment_info: The deployment info to apply.

        Returns:
            Whether the target state has changed.

        Raises:
            RayServeException: If the route prefix is set but does not start
                with "/".
        """
        route_prefix = deployment_info.route_prefix
        if route_prefix is not None and not route_prefix.startswith("/"):
            raise RayServeException(
                f'Invalid route prefix "{route_prefix}", it must start with "/"'
            )

        deployment_id = DeploymentID(name=deployment_name, app_name=self._name)

        target_state_changed = self._deployment_state_manager.deploy(
            deployment_id, deployment_info
        )

        if deployment_info.route_prefix is not None:
            config = deployment_info.deployment_config
            # Route patterns are taken from the deployment state manager.
            route_patterns = (
                self._deployment_state_manager.get_deployment_route_patterns(
                    deployment_id
                )
            )

            self._endpoint_state.update_endpoint(
                deployment_id,
                # The current meaning of the "is_cross_language" field is ambiguous.
                # We will work on optimizing and removing this field in the future.
                # Instead of using the "is_cross_language" field, we will directly
                # compare if the replica is Python, which will assist the Python
                # router in determining if the replica invocation is a cross-language
                # operation.
                EndpointInfo(
                    route=deployment_info.route_prefix,
                    app_is_cross_language=config.deployment_language
                    != DeploymentLanguage.PYTHON,
                    route_patterns=route_patterns,
                ),
            )
        else:
            self._endpoint_state.delete_endpoint(deployment_id)

        return target_state_changed

    def deploy_app(
        self,
        deployment_infos: Dict[str, DeploymentInfo],
        external_scaler_enabled: bool,
    ):
        """(Re-)deploy the application from list of deployment infos.

        This function should only be called to deploy an app from an
        imperative API (i.e., `serve.run` or Java API).

        Raises: RayServeException if there is more than one route prefix.
        """

        # Check routes are unique in deployment infos
        self._route_prefix = self._check_routes(deployment_infos)

        self._set_target_state(
            deployment_infos=deployment_infos,
            api_type=APIType.IMPERATIVE,
            code_version=None,
            target_config=None,
            target_capacity=None,
            target_capacity_direction=None,
            external_scaler_enabled=external_scaler_enabled,
        )

    def apply_app_config(
        self,
        config: ServeApplicationSchema,
        target_capacity: Optional[float],
        target_capacity_direction: Optional[TargetCapacityDirection],
        deployment_time: float,
    ) -> None:
        """Apply the config to the application.

        If the code version matches that of the current live deployments
        then it only applies the updated config to the deployment state
        manager. If the code version doesn't match, this will re-build
        the application.

        This function should only be called to (re-)deploy an app from
        the declarative API (i.e., through the REST API).
        """

        self._deployment_timestamp = deployment_time

        config_version = get_app_code_version(config)
        if config_version == self._target_state.code_version:
            try:
                overrided_infos = override_deployment_info(
                    self._target_state.deployment_infos,
                    config,
                )
                self._route_prefix = self._check_routes(overrided_infos)
                self._set_target_state(
                    # Code version doesn't change.
                    code_version=self._target_state.code_version,
                    api_type=APIType.DECLARATIVE,
                    # No rebuild happens on this path, so keep the serialized
                    # app-level autoscaling policy from the last build instead
                    # of resetting it to None in the (checkpointed) target state.
                    # NOTE(review): assumes an unchanged code version implies an
                    # unchanged app-level policy; confirm against
                    # get_app_code_version.
                    serialized_application_autoscaling_policy_def=(
                        self._target_state.serialized_application_autoscaling_policy_def
                    ),
                    # Everything else must reflect the new config.
                    deployment_infos=overrided_infos,
                    target_config=config,
                    target_capacity=target_capacity,
                    target_capacity_direction=target_capacity_direction,
                    external_scaler_enabled=config.external_scaler_enabled,
                )
            except (TypeError, ValueError, RayServeException):
                self._clear_target_state_and_store_config(config)
                self._update_status(
                    ApplicationStatus.DEPLOY_FAILED, traceback.format_exc()
                )
            except Exception:
                self._clear_target_state_and_store_config(config)
                self._update_status(
                    ApplicationStatus.DEPLOY_FAILED,
                    (
                        f"Unexpected error occurred while applying config for "
                        f"application '{self._name}': \n{traceback.format_exc()}"
                    ),
                )
        else:
            # If there is an in progress build task, cancel it.
            if self._build_app_task_info and not self._build_app_task_info.finished:
                logger.info(
                    f"Received new config for application '{self._name}'. "
                    "Cancelling previous request."
                )
                ray.cancel(self._build_app_task_info.obj_ref)

            # Halt reconciliation of target deployments. A new target state
            # will be set once the new app has finished building.
            self._clear_target_state_and_store_config(config)

            # Record telemetry for container runtime env feature
            if self._target_state.config.runtime_env.get(
                "container"
            ) or self._target_state.config.runtime_env.get("image_uri"):
                ServeUsageTag.APP_CONTAINER_RUNTIME_ENV_USED.record("1")

            if isinstance(config.autoscaling_policy, dict):
                application_autoscaling_policy_function = config.autoscaling_policy.get(
                    "policy_function"
                )
            else:
                application_autoscaling_policy_function = None

            deployment_to_autoscaling_policy_function = {
                deployment.name: deployment.autoscaling_config.get("policy", {}).get(
                    "policy_function", DEFAULT_AUTOSCALING_POLICY_NAME
                )
                for deployment in config.deployments
                if isinstance(deployment.autoscaling_config, dict)
            }
            deployment_to_request_router_cls = {
                deployment.name: deployment.request_router_config.get(
                    "request_router_class", DEFAULT_REQUEST_ROUTER_PATH
                )
                for deployment in config.deployments
                if isinstance(deployment.request_router_config, dict)
            }

            # Kick off new build app task
            logger.info(f"Importing and building app '{self._name}'.")
            build_app_obj_ref = build_serve_application.options(
                runtime_env=config.runtime_env,
                enable_task_events=RAY_SERVE_ENABLE_TASK_EVENTS,
            ).remote(
                config.import_path,
                config_version,
                config.name,
                config.args,
                self._logging_config,
                application_autoscaling_policy_function,
                deployment_to_autoscaling_policy_function,
                deployment_to_request_router_cls,
            )
            self._build_app_task_info = BuildAppTaskInfo(
                obj_ref=build_app_obj_ref,
                code_version=config_version,
                config=config,
                target_capacity=target_capacity,
                target_capacity_direction=target_capacity_direction,
                finished=False,
            )

    def _get_live_deployments(self) -> List[str]:
        return self._deployment_state_manager.get_deployments_in_application(self._name)

    def _determine_app_status(self) -> Tuple[ApplicationStatus, str]:
        """Check deployment statuses and target state, and determine the
        corresponding application status.

        Returns:
            Status (ApplicationStatus):
                RUNNING: all deployments are healthy or autoscaling.
                DEPLOYING: there is one or more updating deployments,
                    and there are no unhealthy deployments.
                DEPLOY_FAILED: one or more deployments became unhealthy
                    while the application was deploying.
                UNHEALTHY: one or more deployments became unhealthy
                    while the application was running.
                DELETING: the application is being deleted.
            Error message (str):
                Non-empty string if status is DEPLOY_FAILED or UNHEALTHY
        """

        if self._target_state.deleting:
            return ApplicationStatus.DELETING, ""

        # Get the lowest rank, i.e. highest priority, deployment status info object
        # The deployment status info with highest priority determines the corresponding
        # application status to set.
        deployment_statuses = self.get_deployments_statuses()
        lowest_rank_status = min(deployment_statuses, key=lambda info: info.rank)
        if lowest_rank_status.status == DeploymentStatus.DEPLOY_FAILED:
            failed_deployments = [
                s.name
                for s in deployment_statuses
                if s.status == DeploymentStatus.DEPLOY_FAILED
            ]
            return (
                ApplicationStatus.DEPLOY_FAILED,
                f"Failed to update the deployments {failed_deployments}.",
            )
        elif lowest_rank_status.status == DeploymentStatus.UNHEALTHY:
            unhealthy_deployment_names = [
                s.name
                for s in deployment_statuses
                if s.status == DeploymentStatus.UNHEALTHY
            ]
            return (
                ApplicationStatus.UNHEALTHY,
                f"The deployments {unhealthy_deployment_names} are UNHEALTHY.",
            )
        elif lowest_rank_status.status == DeploymentStatus.UPDATING:
            return ApplicationStatus.DEPLOYING, ""
        elif (
            lowest_rank_status.status
            in [DeploymentStatus.UPSCALING, DeploymentStatus.DOWNSCALING]
            and lowest_rank_status.status_trigger
            == DeploymentStatusTrigger.CONFIG_UPDATE_STARTED
        ):
            return ApplicationStatus.DEPLOYING, ""
        else:
            return ApplicationStatus.RUNNING, ""

    def _reconcile_build_app_task(
        self,
    ) -> Tuple[
        Optional[bytes], Optional[Dict[str, DeploymentInfo]], BuildAppStatus, str
    ]:
        """If necessary, reconcile the in-progress build task.

        Returns:
            Serialized application autoscaling policy def (bytes):
                The serialized application autoscaling policy def returned from the build app task
                if it was built successfully, otherwise None.
            Deployment infos (Dict[str, DeploymentInfo]):
                Map of deployment name to deployment info, built from the deploy
                arguments returned by the build app task with the config
                overrides applied. None unless the status is SUCCEEDED.
            Status (BuildAppStatus):
                NO_TASK_IN_PROGRESS: There is no build task to reconcile.
                SUCCEEDED: Task finished successfully.
                FAILED: An error occurred during execution of build app task
                IN_PROGRESS: Task hasn't finished yet.
            Error message (str):
                Non-empty string if status is FAILED.
        """
        if self._build_app_task_info is None or self._build_app_task_info.finished:
            return None, None, BuildAppStatus.NO_TASK_IN_PROGRESS, ""

        if not check_obj_ref_ready_nowait(self._build_app_task_info.obj_ref):
            return None, None, BuildAppStatus.IN_PROGRESS, ""

        # Retrieve build app task result
        self._build_app_task_info.finished = True
        try:
            serialized_application_autoscaling_policy_def, args, err = ray.get(
                self._build_app_task_info.obj_ref
            )
            if err is None:
                logger.info(f"Imported and built app '{self._name}' successfully.")
            else:
                return (
                    None,
                    None,
                    BuildAppStatus.FAILED,
                    f"Deploying app '{self._name}' failed with exception:\n{err}",
                )
        except RuntimeEnvSetupError:
            error_msg = (
                f"Runtime env setup for app '{self._name}' failed:\n"
                + traceback.format_exc()
            )
            return None, None, BuildAppStatus.FAILED, error_msg
        except Exception:
            error_msg = (
                f"Unexpected error occurred while deploying application "
                f"'{self._name}': \n{traceback.format_exc()}"
            )
            return None, None, BuildAppStatus.FAILED, error_msg

        # Convert serialized deployment args (returned by build app task)
        # to deployment infos and apply option overrides from config
        try:
            deployment_infos = {
                params["deployment_name"]: deploy_args_to_deployment_info(
                    **params, app_name=self._name
                )
                for params in args
            }
            deployment_to_serialized_autoscaling_policy_def = {
                params["deployment_name"]: params["serialized_autoscaling_policy_def"]
                for params in args
                if params["serialized_autoscaling_policy_def"] is not None
            }
            deployment_to_serialized_request_router_cls = {
                params["deployment_name"]: params["serialized_request_router_cls"]
                for params in args
                if params["serialized_request_router_cls"] is not None
            }
            overrided_infos = override_deployment_info(
                deployment_infos,
                self._build_app_task_info.config,
                deployment_to_serialized_autoscaling_policy_def,
                deployment_to_serialized_request_router_cls,
            )
            self._route_prefix = self._check_routes(overrided_infos)
            return (
                serialized_application_autoscaling_policy_def,
                overrided_infos,
                BuildAppStatus.SUCCEEDED,
                "",
            )
        except (TypeError, ValueError, RayServeException):
            return None, None, BuildAppStatus.FAILED, traceback.format_exc()
        except Exception:
            error_msg = (
                f"Unexpected error occurred while applying config for application "
                f"'{self._name}': \n{traceback.format_exc()}"
            )
            return None, None, BuildAppStatus.FAILED, error_msg

    def _check_routes(
        self, deployment_infos: Dict[str, DeploymentInfo]
    ) -> Optional[str]:
        """Check route prefixes of deployments in app.

        There should only be one non-null route prefix. If there is one,
        it is returned so the caller can set it as the application route
        prefix. This function must be run every control loop iteration
        because the target config could be updated without kicking off a
        new task.

        Returns: the single non-null route prefix, or None if no deployment
            sets one.
        Raises: RayServeException if more than one route prefix is found among deployments.
        """
        num_route_prefixes = 0
        route_prefix = None
        for info in deployment_infos.values():
            # Update route prefix of application, which may be updated
            # through a redeployed config.
            if info.route_prefix is not None:
                route_prefix = info.route_prefix
                num_route_prefixes += 1

        if num_route_prefixes > 1:
            raise RayServeException(
                f'Found multiple route prefixes from application "{self._name}",'
                " Please specify only one route prefix for the application "
                "to avoid this issue."
            )

        return route_prefix

    def _reconcile_target_deployments(self) -> bool:
        """Reconcile target deployments in application target state.

        Ensure each deployment is running on up-to-date info, and
        remove outdated deployments from the application.

        Returns:
            Whether the target state of any deployment has changed.
        """
        target_state_changed = False

        # Set target state for each deployment
        for deployment_name, info in self._target_state.deployment_infos.items():
            # Work on a copy so the stored target deployment info is not mutated.
            deploy_info = deepcopy(info)

            # Apply the target capacity information to the deployment info.
            deploy_info.set_target_capacity(
                new_target_capacity=self._target_state.target_capacity,
                new_target_capacity_direction=(
                    self._target_state.target_capacity_direction
                ),
            )

            # Apply the application logging config to the deployment logging config
            # if it is not set.
            if (
                self._target_state.config
                and self._target_state.config.logging_config
                and deploy_info.deployment_config.logging_config is None
            ):
                deploy_info.deployment_config.logging_config = (
                    self._target_state.config.logging_config
                )
            target_state_changed = (
                self.apply_deployment_info(deployment_name, deploy_info)
                or target_state_changed
            )

        # Delete outdated deployments
        for deployment_name in self._get_live_deployments():
            if deployment_name not in self.target_deployments:
                target_state_changed = (
                    self._delete_deployment(deployment_name) or target_state_changed
                )

        return target_state_changed

    def get_deployment_topology(self) -> Optional[DeploymentTopology]:
        """Get the deployment topology for this application.

        Returns:
            The deployment topology, or None if not yet built.
        """
        if not self.target_deployments:
            return None

        nodes = {}

        # Using target deployments because we wish to build best effort topology based on current state.
        for deployment_name in self.target_deployments:
            deployment_id = DeploymentID(name=deployment_name, app_name=self._name)

            # Get outbound deployment names from deployment state
            outbound_deployment = (
                self._deployment_state_manager.get_deployment_outbound_deployments(
                    deployment_id
                )
            ) or []

            # Create node for this deployment
            node = DeploymentNode(
                name=deployment_name,
                app_name=self._name,
                outbound_deployments=[
                    {"name": dep.name, "app_name": dep.app_name}
                    for dep in outbound_deployment
                ],
                is_ingress=(deployment_name == self._ingress_deployment_name),
            )
            nodes[deployment_name] = node

        return DeploymentTopology(
            app_name=self._name,
            ingress_deployment=self._ingress_deployment_name,
            nodes=nodes,
        )

    def update(self) -> Tuple[bool, bool]:
        """Attempts to reconcile this application to match its target state.

        Updates the application status and status message based on the
        current state of the system.

        Returns:
            A tuple `(ready_to_be_deleted, target_state_changed)`:
                ready_to_be_deleted: True if the application is being deleted
                    and all of its deployments are gone, False otherwise.
                target_state_changed: Whether the target state has changed.
        """
        target_state_changed = False
        # If the application is being deleted, ignore any build task results to
        # avoid flipping the state back to DEPLOYING/RUNNING.
        if not self._target_state.deleting:
            (
                serialized_application_autoscaling_policy_def,
                infos,
                task_status,
                msg,
            ) = self._reconcile_build_app_task()
            if task_status == BuildAppStatus.SUCCEEDED:
                target_state_changed = True
                self._set_target_state(
                    deployment_infos=infos,
                    code_version=self._build_app_task_info.code_version,
                    api_type=self._target_state.api_type,
                    target_config=self._build_app_task_info.config,
                    target_capacity=self._build_app_task_info.target_capacity,
                    target_capacity_direction=(
                        self._build_app_task_info.target_capacity_direction
                    ),
                    external_scaler_enabled=self._target_state.external_scaler_enabled,
                    serialized_application_autoscaling_policy_def=serialized_application_autoscaling_policy_def,
                )
                # Handling the case where the user turns off/turns on app-level autoscaling policy,
                # between app deployment.
                if (
                    self._target_state.config is not None
                    and self._target_state.config.autoscaling_policy is not None
                ):
                    self._autoscaling_state_manager.register_application(
                        self._name,
                        AutoscalingPolicy(
                            _serialized_policy_def=serialized_application_autoscaling_policy_def,
                            **self._target_state.config.autoscaling_policy,
                        ),
                    )
                else:
                    self._autoscaling_state_manager.deregister_application(self._name)
            elif task_status == BuildAppStatus.FAILED:
                self._update_status(ApplicationStatus.DEPLOY_FAILED, msg)

        # Only reconcile deployments when the build app task is finished. If
        # it's not finished, we don't know what the target list of deployments
        # is, so we don't perform any reconciliation.
        if self._target_state.deployment_infos is not None:
            target_state_changed = (
                self._reconcile_target_deployments() or target_state_changed
            )
            status, status_msg = self._determine_app_status()
            self._update_status(status, status_msg)

        # Check if app is ready to be deleted
        if self._target_state.deleting:
            return self.is_deleted(), target_state_changed
        return False, target_state_changed

    def get_checkpoint_data(self) -> ApplicationTargetState:
        return self._target_state

    def get_deployments_statuses(self) -> List[DeploymentStatusInfo]:
        """Return all deployment status information"""
        deployments = [
            DeploymentID(name=deployment, app_name=self._name)
            for deployment in self.target_deployments
        ]
        return self._deployment_state_manager.get_deployment_statuses(deployments)

    def get_application_status_info(self) -> ApplicationStatusInfo:
        """Return the application status information"""
        return ApplicationStatusInfo(
            self._status,
            message=self._status_msg,
            deployment_timestamp=self._deployment_timestamp,
        )

    def list_deployment_details(self) -> Dict[str, DeploymentDetails]:
        """Gets detailed info on all live deployments in this application.
        (Does not include deleted deployments.)

        Returns:
            A dictionary of deployment infos. The set of deployment info returned
            may not be the full list of deployments that are part of the application.
            This can happen when the application is still deploying and bringing up
            deployments, or when the application is deleting and some deployments have
            been deleted.
        """
        details = {
            deployment_name: self._deployment_state_manager.get_deployment_details(
                DeploymentID(name=deployment_name, app_name=self._name)
            )
            for deployment_name in self.target_deployments
        }
        return {k: v for k, v in details.items() if v is not None}

    def _update_status(self, status: ApplicationStatus, status_msg: str = "") -> None:
        """Set the application status and message.

        Logs the message as an error when the status is DEPLOY_FAILED or
        UNHEALTHY and the message differs from the currently stored one.
        """
        if (
            status_msg
            and status
            in [
                ApplicationStatus.DEPLOY_FAILED,
                ApplicationStatus.UNHEALTHY,
            ]
            and status_msg != self._status_msg
        ):
            logger.error(status_msg)

        self._status = status
        self._status_msg = status_msg


class ApplicationStateManager:
    def __init__(
        self,
        deployment_state_manager: DeploymentStateManager,
        autoscaling_state_manager: AutoscalingStateManager,
        endpoint_state: EndpointState,
        kv_store: KVStoreBase,
        logging_config: LoggingConfig,
    ):
        self._deployment_state_manager = deployment_state_manager
        self._autoscaling_state_manager = autoscaling_state_manager
        self._endpoint_state = endpoint_state
        self._kv_store = kv_store
        self._logging_config = logging_config

        self._shutting_down = False

        self._application_states: Dict[str, ApplicationState] = {}

        # Metric for tracking application status
        self._application_status_gauge = ray_metrics.Gauge(
            "serve_application_status",
            description=(
                "Numeric status of application. "
                "0=UNKNOWN, 1=DEPLOY_FAILED, 2=UNHEALTHY, 3=NOT_STARTED, "
                "4=DELETING, 5=DEPLOYING, 6=RUNNING."
            ),
            tag_keys=("application",),
        )

        self._recover_from_checkpoint()

    def _recover_from_checkpoint(self):
        """Rebuild application states from the checkpoint in the KV store, if any."""
        checkpoint = self._kv_store.get(CHECKPOINT_KEY)
        if checkpoint is not None:
            application_state_info = cloudpickle.loads(checkpoint)

            for app_name, checkpoint_data in application_state_info.items():
                app_state = ApplicationState(
                    app_name,
                    self._deployment_state_manager,
                    self._autoscaling_state_manager,
                    self._endpoint_state,
                    self._logging_config,
                    checkpoint_data.external_scaler_enabled,
                )
                app_state.recover_target_state_from_checkpoint(checkpoint_data)
                self._application_states[app_name] = app_state

    def delete_app(self, name: str) -> None:
        """Delete application by name"""
        if name not in self._application_states:
            return
        self._application_states[name].delete()

    def deploy_apps(
        self,
        name_to_deployment_args: Dict[str, List[Dict]],
        name_to_application_args: Dict[str, ApplicationArgsProto],
    ) -> None:
        live_route_prefixes: Dict[str, str] = {
            app_state.route_prefix: app_name
            for app_name, app_state in self._application_states.items()
            if app_state.route_prefix is not None
            and not app_state.status == ApplicationStatus.DELETING
        }

        for name, deployment_args in name_to_deployment_args.items():
            for deploy_param in deployment_args:
                # Make sure route_prefix is not being used by other application.
                deploy_app_prefix = deploy_param.get("route_prefix")
                if deploy_app_prefix is None:
                    continue

                existing_app_name = live_route_prefixes.get(deploy_app_prefix)
                # It's ok to redeploy an app with the same prefix
                # if it has the same name as the app already using that prefix.
                if existing_app_name is not None and existing_app_name != name:
                    raise RayServeException(
                        f"Prefix {deploy_app_prefix} is being used by application "
                        f'"{existing_app_name}". Failed to deploy application "{name}".'
                    )

                # We might be deploying more than one app,
                # so we need to add this app's prefix to the
                # set of live route prefixes that we're checking
                # against during this batch operation.
live_route_prefixes[deploy_app_prefix] = name application_args = name_to_application_args.get(name) external_scaler_enabled = application_args.external_scaler_enabled if name not in self._application_states: self._application_states[name] = ApplicationState( name, self._deployment_state_manager, self._autoscaling_state_manager, self._endpoint_state, self._logging_config, external_scaler_enabled, ) ServeUsageTag.NUM_APPS.record(str(len(self._application_states))) deployment_infos = { params["deployment_name"]: deploy_args_to_deployment_info( **params, app_name=name ) for params in deployment_args } self._application_states[name].deploy_app( deployment_infos, external_scaler_enabled ) def deploy_app( self, name: str, deployment_args: List[Dict], application_args: ApplicationArgsProto, ) -> None: """Deploy the specified app to the list of deployment arguments. This function should only be called if the app is being deployed through serve.run instead of from a config. Args: name: application name deployment_args_list: arguments for deploying a list of deployments. application_args: application arguments. """ self.deploy_apps({name: deployment_args}, {name: application_args}) def apply_app_configs( self, app_configs: List[ServeApplicationSchema], *, deployment_time: float = 0, target_capacity: Optional[float] = None, target_capacity_direction: Optional[TargetCapacityDirection] = None, ): """Declaratively apply the list of application configs. The applications will be reconciled to match the target state of the config. Any applications previously deployed declaratively that are *not* present in the list will be deleted. 
""" for app_config in app_configs: if app_config.name not in self._application_states: logger.info(f"Deploying new app '{app_config.name}'.") self._application_states[app_config.name] = ApplicationState( app_config.name, self._deployment_state_manager, self._autoscaling_state_manager, endpoint_state=self._endpoint_state, logging_config=self._logging_config, external_scaler_enabled=app_config.external_scaler_enabled, ) self._application_states[app_config.name].apply_app_config( app_config, target_capacity, target_capacity_direction, deployment_time=deployment_time, ) # Delete all apps that were previously deployed via the declarative API # but are not in the config being applied. existing_apps = { name for name, app_state in self._application_states.items() if app_state.api_type == APIType.DECLARATIVE } apps_in_config = {app_config.name for app_config in app_configs} for app_to_delete in existing_apps - apps_in_config: self.delete_app(app_to_delete) ServeUsageTag.NUM_APPS.record(str(len(self._application_states))) def get_deployments(self, app_name: str) -> List[str]: """Return all deployment names by app name""" if app_name not in self._application_states: return [] return self._application_states[app_name].target_deployments def get_deployments_statuses(self, app_name: str) -> List[DeploymentStatusInfo]: """Return all deployment statuses by app name""" if app_name not in self._application_states: return [] return self._application_states[app_name].get_deployments_statuses() def get_app_status(self, name: str) -> ApplicationStatus: if name not in self._application_states: return ApplicationStatus.NOT_STARTED return self._application_states[name].status def does_app_exist(self, name: str) -> bool: return name in self._application_states def get_app_status_info(self, name: str) -> ApplicationStatusInfo: if name not in self._application_states: return ApplicationStatusInfo( ApplicationStatus.NOT_STARTED, message=f"Application {name} doesn't exist", 
deployment_timestamp=0, ) return self._application_states[name].get_application_status_info() def get_docs_path(self, app_name: str) -> Optional[str]: return self._application_states[app_name].docs_path def get_route_prefix(self, name: str) -> Optional[str]: return self._application_states[name].route_prefix def get_ingress_deployment_name(self, name: str) -> Optional[str]: if name not in self._application_states: return None return self._application_states[name].ingress_deployment def get_app_source(self, name: str) -> APIType: return self._application_states[name].api_type def get_external_scaler_enabled(self, app_name: str) -> bool: """Check if external scaler is enabled for the application. Args: app_name: Name of the application. Returns: True if external_scaler_enabled is set for the application, False otherwise. """ return ( self.does_app_exist(app_name) and self._application_states[app_name].external_scaler_enabled ) def list_app_statuses( self, source: Optional[APIType] = None ) -> Dict[str, ApplicationStatusInfo]: """Return a dictionary with {app name: application info} Args: source: Optional API type filter. If provided, only returns apps deployed via the specified API type. Returns: Dict[str, ApplicationStatusInfo]: A dictionary mapping application names to their corresponding status information. 
""" if source is None: return { name: self._application_states[name].get_application_status_info() for name in self._application_states } else: return { name: self._application_states[name].get_application_status_info() for name in self._application_states if self.get_app_source(name) is source } def list_app_names(self) -> List[str]: """Return app names without instantiating status objects.""" return list(self._application_states.keys()) def list_deployment_details(self, name: str) -> Dict[str, DeploymentDetails]: """Gets detailed info on all deployments in specified application.""" if name not in self._application_states: return {} return self._application_states[name].list_deployment_details() def get_deployment_topology(self, app_name: str) -> Optional[DeploymentTopology]: """Get the deployment topology for an application. Args: app_name: Name of the application. Returns: The deployment topology for the application, or None if the application doesn't exist or the topology hasn't been built yet. """ if app_name not in self._application_states: return None return self._application_states[app_name].get_deployment_topology() def update(self) -> bool: """ Update each application state. Returns: bool: True if any application's target state changed during this update. 
""" apps_to_be_deleted = [] any_target_state_changed = False for name, app in self._application_states.items(): if app.should_autoscale(): any_target_state_changed = app.autoscale() or any_target_state_changed ready_to_be_deleted, app_target_state_changed = app.update() any_target_state_changed = ( any_target_state_changed or app_target_state_changed ) if ready_to_be_deleted: apps_to_be_deleted.append(name) logger.debug(f"Application '{name}' deleted successfully.") # Record application status metrics for name, app in self._application_states.items(): self._application_status_gauge.set( app.status.to_numeric(), tags={"application": name}, ) if len(apps_to_be_deleted) > 0: for app_name in apps_to_be_deleted: self._autoscaling_state_manager.deregister_application(app_name) del self._application_states[app_name] ServeUsageTag.NUM_APPS.record(str(len(self._application_states))) if any_target_state_changed: self.save_checkpoint() self._deployment_state_manager.save_checkpoint() return any_target_state_changed def shutdown(self) -> None: self._shutting_down = True for app_state in self._application_states.values(): app_state.delete() self._kv_store.delete(CHECKPOINT_KEY) def is_ready_for_shutdown(self) -> bool: """Return whether all applications have shut down. Iterate through all application states and check if all their applications are deleted. """ return self._shutting_down and all( app_state.is_deleted() for app_state in self._application_states.values() ) def save_checkpoint(self) -> None: """Write a checkpoint of all application states.""" if self._shutting_down: # Once we're told to shut down, stop writing checkpoints. # Calling .shutdown() deletes any existing checkpoint. 
return application_state_info = { app_name: app_state.get_checkpoint_data() for app_name, app_state in self._application_states.items() } self._kv_store.put( CHECKPOINT_KEY, cloudpickle.dumps(application_state_info), ) @ray.remote(num_cpus=0, max_calls=1) def build_serve_application( import_path: str, code_version: str, name: str, args: Dict, logging_config: LoggingConfig, application_autoscaling_policy_function: Optional[str], deployment_to_autoscaling_policy_function: Dict[str, str], deployment_to_request_router_cls: Dict[str, str], ) -> Tuple[Optional[bytes], Optional[List[Dict]], Optional[str]]: """Import and build a Serve application. Args: import_path: import path to top-level bound deployment. code_version: code version inferred from app config. All deployment versions are set to this code version. name: application name. If specified, application will be deployed without removing existing applications. args: Arguments to be passed to the application builder. logging_config: the logging config for the build app task. application_autoscaling_policy_function: the application autoscaling policy function name deployment_to_autoscaling_policy_function: a dictionary mapping deployment names to autoscaling policy function names deployment_to_request_router_cls: a dictionary mapping deployment names to request router class names Returns: Serialized application autoscaling policy def: a serialized autoscaling policy def for the application if it was built successfully, otherwise None. Deploy arguments: a list of deployment arguments if application was built successfully, otherwise None. Error message: a string if an error was raised, otherwise None. """ configure_component_logger( component_name="controller", component_id=f"build_{name}_{os.getpid()}", logging_config=logging_config, ) try: from ray.serve._private.api import call_user_app_builder_with_args_if_necessary # Import and build the application. 
args_info_str = f" with arguments {args}" if args else "" logger.info(f"Importing application '{name}'{args_info_str}.") app = call_user_app_builder_with_args_if_necessary( import_attr(import_path), args ) deploy_args_list = [] built_app: BuiltApplication = build_app( app, name=name, default_runtime_env=ray.get_runtime_context().runtime_env, ) num_ingress_deployments = 0 def _get_serialized_def(attr_path: str) -> bytes: module, attr = import_module_and_attr(attr_path) cloudpickle.register_pickle_by_value(module) serialized = cloudpickle.dumps(attr) cloudpickle.unregister_pickle_by_value(module) return serialized application_serialized_autoscaling_policy_def = None if application_autoscaling_policy_function is not None: application_serialized_autoscaling_policy_def = _get_serialized_def( application_autoscaling_policy_function ) for deployment in built_app.deployments: if inspect.isclass(deployment.func_or_class) and issubclass( deployment.func_or_class, ASGIAppReplicaWrapper ): num_ingress_deployments += 1 is_ingress = deployment.name == built_app.ingress_deployment_name deployment_to_serialized_autoscaling_policy_def = None deployment_to_serialized_request_router_cls = None if deployment.name in deployment_to_autoscaling_policy_function: deployment_to_serialized_autoscaling_policy_def = _get_serialized_def( deployment_to_autoscaling_policy_function[deployment.name] ) if deployment.name in deployment_to_request_router_cls: deployment_to_serialized_request_router_cls = _get_serialized_def( deployment_to_request_router_cls[deployment.name] ) deploy_args_list.append( get_deploy_args( name=deployment._name, replica_config=deployment._replica_config, ingress=is_ingress, deployment_config=deployment._deployment_config, version=code_version, route_prefix="/" if is_ingress else None, serialized_autoscaling_policy_def=deployment_to_serialized_autoscaling_policy_def, serialized_request_router_cls=deployment_to_serialized_request_router_cls, ) ) if num_ingress_deployments > 
1: return ( None, None, ( f'Found multiple FastAPI deployments in application "{built_app.name}". ' "Please only include one deployment with @serve.ingress " "in your application to avoid this issue." ), ) return application_serialized_autoscaling_policy_def, deploy_args_list, None except KeyboardInterrupt: # Error is raised when this task is canceled with ray.cancel(), which # happens when deploy_apps() is called. logger.info( "Existing config deployment request terminated because of keyboard " "interrupt." ) return None, None, None except Exception: logger.error( f"Exception importing application '{name}'.\n{traceback.format_exc()}" ) return None, None, traceback.format_exc() def override_deployment_info( deployment_infos: Dict[str, DeploymentInfo], override_config: Optional[ServeApplicationSchema], deployment_to_serialized_autoscaling_policy_def: Optional[Dict[str, bytes]] = None, deployment_to_serialized_request_router_cls: Optional[Dict[str, bytes]] = None, ) -> Dict[str, DeploymentInfo]: """Override deployment infos with options from app config. Args: app_name: application name deployment_infos: deployment info loaded from code override_config: application config deployed by user with options to override those loaded from code. deployment_to_serialized_autoscaling_policy_def: serialized autoscaling policy def for each deployment deployment_to_serialized_request_router_cls: serialized request router cls for each deployment Returns: the updated deployment infos. Raises: ValueError: If config options have invalid values. TypeError: If config options have invalid types. """ deployment_infos = deepcopy(deployment_infos) if override_config is None: return deployment_infos config_dict = override_config.dict(exclude_unset=True) deployment_override_options = config_dict.get("deployments", []) # Override options for each deployment listed in the config. 
for options in deployment_override_options: if "max_ongoing_requests" in options: options["max_ongoing_requests"] = options.get("max_ongoing_requests") deployment_name = options["name"] if deployment_name not in deployment_infos: raise ValueError( f"Deployment '{deployment_name}' does not exist. " f"Available: {list(deployment_infos.keys())}" ) info = deployment_infos[deployment_name] original_options = info.deployment_config.dict() original_options["user_configured_option_names"].update(set(options)) # Override `max_ongoing_requests` and `autoscaling_config` if # `num_replicas="auto"` if options.get("num_replicas") == "auto": options["num_replicas"] = None new_config = AutoscalingConfig.default().dict() # If `autoscaling_config` is specified, its values override # the default `num_replicas="auto"` configuration autoscaling_config = ( options.get("autoscaling_config") or info.deployment_config.autoscaling_config ) if autoscaling_config: new_config.update(autoscaling_config) if ( deployment_to_serialized_autoscaling_policy_def and deployment_name in deployment_to_serialized_autoscaling_policy_def ): # By setting the serialized policy def, AutoscalingConfig constructor will not # try to import the policy from the string import path policy_obj = AutoscalingPolicy.from_serialized_policy_def( new_config["policy"], deployment_to_serialized_autoscaling_policy_def[deployment_name], ) new_config["policy"] = policy_obj options["autoscaling_config"] = AutoscalingConfig(**new_config) ServeUsageTag.AUTO_NUM_REPLICAS_USED.record("1") # What to pass to info.update override_options = {} # Merge app-level and deployment-level runtime_envs. replica_config = info.replica_config app_runtime_env = override_config.runtime_env if "ray_actor_options" in options: # If specified, get ray_actor_options from config override_actor_options = options.pop("ray_actor_options", {}) else: # Otherwise, get options from application code (and default to {} # if the code sets options to None). 
override_actor_options = replica_config.ray_actor_options or {} override_placement_group_bundles = options.pop( "placement_group_bundles", replica_config.placement_group_bundles ) override_placement_group_strategy = options.pop( "placement_group_strategy", replica_config.placement_group_strategy ) override_max_replicas_per_node = options.pop( "max_replicas_per_node", replica_config.max_replicas_per_node ) override_bundle_label_selector = options.pop( "placement_group_bundle_label_selector", replica_config.placement_group_bundle_label_selector, ) override_fallback_strategy = options.pop( "placement_group_fallback_strategy", replica_config.placement_group_fallback_strategy, ) # Record telemetry for container runtime env feature at deployment level if override_actor_options.get("runtime_env") and ( override_actor_options["runtime_env"].get("container") or override_actor_options["runtime_env"].get("image_uri") ): ServeUsageTag.DEPLOYMENT_CONTAINER_RUNTIME_ENV_USED.record("1") merged_env = override_runtime_envs_except_env_vars( app_runtime_env, override_actor_options.get("runtime_env", {}) ) override_actor_options.update({"runtime_env": merged_env}) replica_config.update( ray_actor_options=override_actor_options, placement_group_bundles=override_placement_group_bundles, placement_group_strategy=override_placement_group_strategy, max_replicas_per_node=override_max_replicas_per_node, placement_group_bundle_label_selector=override_bundle_label_selector, placement_group_fallback_strategy=override_fallback_strategy, ) override_options["replica_config"] = replica_config if "request_router_config" in options: request_router_config = options.get("request_router_config") if request_router_config: if ( deployment_to_serialized_request_router_cls and deployment_name in deployment_to_serialized_request_router_cls ): # By setting the serialized request router cls, RequestRouterConfig constructor will not # try to import the request router cls from the string import path options[ 
"request_router_config" ] = RequestRouterConfig.from_serialized_request_router_cls( request_router_config, deployment_to_serialized_request_router_cls[deployment_name], ) else: options["request_router_config"] = RequestRouterConfig( **request_router_config ) # Override deployment config options options.pop("name", None) original_options.update(options) override_options["deployment_config"] = DeploymentConfig(**original_options) deployment_infos[deployment_name] = info.update(**override_options) deployment_config = deployment_infos[deployment_name].deployment_config if ( deployment_config.autoscaling_config is not None and deployment_config.max_ongoing_requests < deployment_config.autoscaling_config.get_target_ongoing_requests() ): logger.warning( "Autoscaling will never happen, " "because 'max_ongoing_requests' is less than " "'target_ongoing_requests' now." ) # Overwrite ingress route prefix app_route_prefix = config_dict.get("route_prefix", DEFAULT.VALUE) validate_route_prefix(app_route_prefix) for deployment in list(deployment_infos.values()): if ( app_route_prefix is not DEFAULT.VALUE and deployment.route_prefix is not None ): deployment.route_prefix = app_route_prefix return deployment_infos