| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030 |
- import json
- from dataclasses import asdict, dataclass, field
- from enum import Enum
- from typing import Any, Awaitable, Callable, Dict, List, Optional
- from starlette.types import Scope
- import ray
- from ray._common.pydantic_compat import BaseModel
- from ray.actor import ActorHandle
- from ray.serve._private.constants import SERVE_DEFAULT_APP_NAME, SERVE_NAMESPACE
- from ray.serve._private.thirdparty.get_asgi_route_name import RoutePattern
- from ray.serve.generated.serve_pb2 import (
- DeploymentStatus as DeploymentStatusProto,
- DeploymentStatusInfo as DeploymentStatusInfoProto,
- DeploymentStatusTrigger as DeploymentStatusTriggerProto,
- )
- from ray.serve.grpc_util import RayServegRPCContext
- from ray.util.annotations import PublicAPI
# Prefix that marks a string as a full replica ID string; produced by
# ReplicaID.to_full_id_str() and checked by ReplicaID.is_full_id_str().
REPLICA_ID_FULL_ID_STR_PREFIX = "SERVE_REPLICA::"
@dataclass(frozen=True)
class DeploymentID:
    """Identifies a deployment by its name and the name of its application.

    Attributes:
        name: Name of the deployment.
        app_name: Name of the application; defaults to SERVE_DEFAULT_APP_NAME.
    """

    name: str
    app_name: str = SERVE_DEFAULT_APP_NAME

    def __hash__(self):
        # The hash is memoized on the instance the first time it is requested.
        # Because __getstate__ leaves `_hash` out of the pickled state, an
        # unpickled instance recomputes it with that process's hash seed.
        cached = getattr(self, "_hash", None)
        if cached is None:
            cached = hash((self.name, self.app_name))
            # The dataclass is frozen, so bypass its __setattr__.
            object.__setattr__(self, "_hash", cached)
        return cached

    def __getstate__(self):
        # Only the declared fields are pickled; the cached hash is dropped.
        return dict(name=self.name, app_name=self.app_name)

    def __setstate__(self, state):
        # Restore the declared fields on the frozen instance.
        for attr in ("name", "app_name"):
            object.__setattr__(self, attr, state[attr])

    def to_replica_actor_class_name(self):
        """Return the "ServeReplica:<app>:<deployment>" string for this ID."""
        return f"ServeReplica:{self.app_name}:{self.name}"

    def __repr__(self):
        # repr() and str() intentionally produce the same text.
        return str(self)

    def __str__(self):
        return f"Deployment(name='{self.name}', app='{self.app_name}')"
@PublicAPI(stability="alpha")
@dataclass(frozen=True)
class ReplicaID:
    """A unique identifier for a replica."""

    unique_id: str
    """A unique identifier for the replica within the deployment."""

    deployment_id: DeploymentID
    """The deployment this replica belongs to."""

    def __hash__(self):
        # The hash is memoized on the instance the first time it is requested.
        # Because __getstate__ leaves `_hash` out of the pickled state, an
        # unpickled instance recomputes it with that process's hash seed.
        cached = getattr(self, "_hash", None)
        if cached is None:
            cached = hash((self.unique_id, self.deployment_id))
            # The dataclass is frozen, so bypass its __setattr__.
            object.__setattr__(self, "_hash", cached)
        return cached

    def __getstate__(self):
        # Only the declared fields are pickled; the cached hash is dropped.
        return dict(unique_id=self.unique_id, deployment_id=self.deployment_id)

    def __setstate__(self, state):
        # Restore the declared fields on the frozen instance.
        for attr in ("unique_id", "deployment_id"):
            object.__setattr__(self, attr, state[attr])

    def to_full_id_str(self) -> str:
        """Serialize to "<prefix>[<app>#]<deployment>#<unique_id>".

        The application component is omitted when the app name is empty.
        """
        deployment = self.deployment_id
        body = f"{deployment.name}#{self.unique_id}"
        if deployment.app_name:
            body = f"{deployment.app_name}#{body}"
        return f"{REPLICA_ID_FULL_ID_STR_PREFIX}{body}"

    @staticmethod
    def is_full_id_str(s: str) -> bool:
        """Return True if `s` starts with the full replica ID prefix."""
        return s.startswith(REPLICA_ID_FULL_ID_STR_PREFIX)

    @classmethod
    def from_full_id_str(cls, s: str):
        """Parse a string produced by `to_full_id_str` back into a ReplicaID.

        Raises:
            ValueError: If the part after the prefix does not split into two
                or three '#'-delimited fields.
        """
        assert cls.is_full_id_str(s)

        fields = s[len(REPLICA_ID_FULL_ID_STR_PREFIX) :].split("#")
        if len(fields) not in (2, 3):
            raise ValueError(
                f"Given replica ID string {s} didn't match expected pattern, "
                "ensure it has either two or three fields with delimiter '#'."
            )
        if len(fields) == 2:
            # No application component present: use an empty app name.
            fields.insert(0, "")

        app_name, deployment_name, unique_id = fields
        return cls(
            unique_id,
            deployment_id=DeploymentID(name=deployment_name, app_name=app_name),
        )

    def __str__(self) -> str:
        """Returns a human-readable string.

        This is used in user-facing log messages, so take care when updating it.
        """
        return (
            f"Replica("
            f"id='{self.unique_id}', "
            f"deployment='{self.deployment_id.name}', "
            f"app='{self.deployment_id.app_name}'"
            ")"
        )

    def __repr__(self) -> str:
        # repr() and str() intentionally produce the same text.
        return str(self)
# Plain aliases used to make annotations more descriptive; each is just the
# underlying builtin type.
NodeId = str
Duration = float
ApplicationName = str
@dataclass
class EndpointInfo:
    """Metadata about a deployment's HTTP/gRPC endpoint.

    This represents the public routing interface for a deployment. It's
    created when a deployment is registered with a route prefix and broadcast
    to all proxies via the long poll mechanism (ROUTE_TABLE namespace).

    Flow:
        1. Created in ApplicationState when deployment is applied
        2. Stored in EndpointState (controller's source of truth)
        3. Broadcast to all ProxyActors via long poll (ROUTE_TABLE)
        4. Cached in ProxyRouter for request routing
        5. Used to route incoming HTTP/gRPC requests to correct deployments
        6. Used to determine route patterns for accurate metrics tagging

    Key Difference from DeploymentInfo:
        - EndpointInfo: Just HTTP/gRPC routing metadata (shared with proxies)
        - DeploymentInfo: Complete deployment config (replicas, resources, etc.)

    Attributes:
        route: The route prefix for this deployment (e.g., "/api").
        app_is_cross_language: Whether the deployment uses a different
            language than the proxy (e.g., Java deployment with Python proxy).
            This affects how the proxy serializes/deserializes requests.
        route_patterns: List of RoutePattern objects for ASGI route patterns.
            Each RoutePattern has methods (list of HTTP methods or None) and
            path.
            Examples: [RoutePattern(methods=["GET", "POST"], path="/"),
                       RoutePattern(methods=["PUT"], path="/users/{id}"),
                       RoutePattern(methods=None, path="/websocket")]
            Used by proxies to match incoming requests to specific route
            patterns for accurate metrics tagging. This avoids high
            cardinality by using parameterized patterns instead of individual
            request paths. Only populated for deployments with ASGI apps
            (FastAPI/Starlette).
    """

    route: str
    app_is_cross_language: bool = False
    route_patterns: Optional[List["RoutePattern"]] = None
# Keep in sync with ServeReplicaState in dashboard/client/src/type/serve.ts
class ReplicaState(str, Enum):
    """String-valued enumeration of replica states."""

    STARTING = "STARTING"
    UPDATING = "UPDATING"
    RECOVERING = "RECOVERING"
    RUNNING = "RUNNING"
    STOPPING = "STOPPING"
    PENDING_MIGRATION = "PENDING_MIGRATION"
class DeploymentStatus(str, Enum):
    """String-valued enumeration of deployment statuses."""

    UPDATING = "UPDATING"
    HEALTHY = "HEALTHY"
    UNHEALTHY = "UNHEALTHY"
    DEPLOY_FAILED = "DEPLOY_FAILED"
    UPSCALING = "UPSCALING"
    DOWNSCALING = "DOWNSCALING"

    def to_numeric(self) -> int:
        """Convert the status to a numeric value for metrics.

        The numbers follow the severity/state-progression order shown on the
        dashboard. 0 is reserved for UNKNOWN:

            0=UNKNOWN, 1=DEPLOY_FAILED, 2=UNHEALTHY, 3=UPDATING,
            4=UPSCALING, 5=DOWNSCALING, 6=HEALTHY
        """
        # Position in this tuple (1-based) is the numeric value.
        progression = (
            DeploymentStatus.DEPLOY_FAILED,
            DeploymentStatus.UNHEALTHY,
            DeploymentStatus.UPDATING,
            DeploymentStatus.UPSCALING,
            DeploymentStatus.DOWNSCALING,
            DeploymentStatus.HEALTHY,
        )
        numeric_by_status = {
            status: number for number, status in enumerate(progression, start=1)
        }
        return numeric_by_status.get(self, 0)
class DeploymentStatusTrigger(str, Enum):
    """Explains how a deployment reached its current DeploymentStatus."""

    UNSPECIFIED = "UNSPECIFIED"
    CONFIG_UPDATE_STARTED = "CONFIG_UPDATE_STARTED"
    CONFIG_UPDATE_COMPLETED = "CONFIG_UPDATE_COMPLETED"
    UPSCALE_COMPLETED = "UPSCALE_COMPLETED"
    DOWNSCALE_COMPLETED = "DOWNSCALE_COMPLETED"
    AUTOSCALING = "AUTOSCALING"
    REPLICA_STARTUP_FAILED = "REPLICA_STARTUP_FAILED"
    HEALTH_CHECK_FAILED = "HEALTH_CHECK_FAILED"
    INTERNAL_ERROR = "INTERNAL_ERROR"
    DELETING = "DELETING"
# Internal Enum used to manage deployment state machine
class DeploymentStatusInternalTrigger(str, Enum):
    """Incoming triggers consumed by the deployment status state machine."""

    HEALTHY = "HEALTHY"
    CONFIG_UPDATE = "CONFIG_UPDATE"
    AUTOSCALE_UP = "AUTOSCALE_UP"
    AUTOSCALE_DOWN = "AUTOSCALE_DOWN"
    # The two MANUALLY_*_NUM_REPLICAS triggers replace CONFIG_UPDATE when a
    # config update does nothing except scale the number of replicas.
    MANUALLY_INCREASE_NUM_REPLICAS = "MANUALLY_INCREASE_NUM_REPLICAS"
    MANUALLY_DECREASE_NUM_REPLICAS = "MANUALLY_DECREASE_NUM_REPLICAS"
    REPLICA_STARTUP_FAILED = "REPLICA_STARTUP_FAILED"
    HEALTH_CHECK_FAILED = "HEALTH_CHECK_FAILED"
    INTERNAL_ERROR = "INTERNAL_ERROR"
    DELETE = "DELETE"
# Mapping from a state to its rank; a lower number is a higher rank.
#
# Each key is a tuple with either 1 or 2 items.
# If 1 item: contains a single DeploymentStatus, representing states with
# that DeploymentStatus and any DeploymentStatusTrigger.
# If 2 items: tuple contains a DeploymentStatus and a DeploymentStatusTrigger,
# representing a state with that status and status trigger.
DEPLOYMENT_STATUS_RANKING_ORDER = {
    # Status ranking order is defined in the following fashion:
    # 0. (Highest) State signaling a deploy failure.
    (DeploymentStatus.DEPLOY_FAILED,): 0,
    # 1. State signaling any non-deploy failures in the system.
    (DeploymentStatus.UNHEALTHY,): 1,
    # 2. States signaling the user updated the configuration.
    (DeploymentStatus.UPDATING,): 2,
    (DeploymentStatus.UPSCALING, DeploymentStatusTrigger.CONFIG_UPDATE_STARTED): 2,
    (
        DeploymentStatus.DOWNSCALING,
        DeploymentStatusTrigger.CONFIG_UPDATE_STARTED,
    ): 2,
    # 3. Steady state or autoscaling.
    (DeploymentStatus.UPSCALING, DeploymentStatusTrigger.AUTOSCALING): 3,
    (DeploymentStatus.DOWNSCALING, DeploymentStatusTrigger.AUTOSCALING): 3,
    (DeploymentStatus.HEALTHY,): 3,
}
@dataclass(eq=True)
class DeploymentStatusInfo:
    """Status of one deployment, with the trigger and message explaining it.

    The transition helpers never mutate the instance: they return either
    ``self`` unchanged or a newly constructed ``DeploymentStatusInfo``.

    Attributes:
        name: Name carried through unchanged by every transition.
        status: Current DeploymentStatus.
        status_trigger: DeploymentStatusTrigger recorded for the status.
        message: Free-form message attached to the status.
    """

    name: str
    status: DeploymentStatus
    status_trigger: DeploymentStatusTrigger
    message: str = ""

    @property
    def rank(self) -> int:
        """Get priority of state based on DEPLOYMENT_STATUS_RANKING_ORDER.

        The ranked order indicates what the status should be of a
        hierarchically "higher" resource when derived from a group of
        `DeploymentStatusInfo` sub-resources.

        A status-only key is looked up first, then the
        (status, status_trigger) key.
        """
        # NOTE(review): if neither key is present (e.g. UPSCALING with a
        # trigger other than CONFIG_UPDATE_STARTED/AUTOSCALING) this falls
        # through and implicitly returns None despite the `int` annotation.
        if (self.status,) in DEPLOYMENT_STATUS_RANKING_ORDER:
            return DEPLOYMENT_STATUS_RANKING_ORDER[(self.status,)]
        elif (self.status, self.status_trigger) in DEPLOYMENT_STATUS_RANKING_ORDER:
            return DEPLOYMENT_STATUS_RANKING_ORDER[(self.status, self.status_trigger)]

    def debug_string(self) -> str:
        """Return the fields as an indented JSON string."""
        return json.dumps(asdict(self), indent=4)

    def _updated_copy(
        self,
        status: Optional[DeploymentStatus] = None,
        status_trigger: Optional[DeploymentStatusTrigger] = None,
        message: str = "",
    ) -> "DeploymentStatusInfo":
        """Returns a copy of the current object with the passed in kwargs updated.

        `status` and `status_trigger` fall back to the current values when
        omitted. `message` does not: the copy always gets the passed message,
        which defaults to the empty string.
        """
        return DeploymentStatusInfo(
            name=self.name,
            status=status if status else self.status,
            status_trigger=status_trigger if status_trigger else self.status_trigger,
            message=message,
        )

    def update_message(self, message: str) -> "DeploymentStatusInfo":
        """Return a copy with the same status and trigger but a new message."""
        return self._updated_copy(message=message)

    def handle_transition(
        self,
        trigger: DeploymentStatusInternalTrigger,
        message: str = "",
    ) -> "DeploymentStatusInfo":
        """Handles a transition from the current state to the next state.

        Args:
            trigger: An internal trigger that determines the state
                transition. This is the new incoming trigger causing the
                transition.
            message: The message to set in status info.

        Returns:
            New instance of DeploymentStatusInfo representing the
            next state to transition to. If the (state, trigger) pair has no
            transition defined, `self` is returned unchanged.
        """

        # If there was an unexpected internal error during reconciliation, set
        # status to unhealthy immediately and return
        if trigger == DeploymentStatusInternalTrigger.INTERNAL_ERROR:
            return self._updated_copy(
                status=DeploymentStatus.UNHEALTHY,
                status_trigger=DeploymentStatusTrigger.INTERNAL_ERROR,
                message=message,
            )

        # If deployment is being deleted, set status immediately and return
        elif trigger == DeploymentStatusInternalTrigger.DELETE:
            return self._updated_copy(
                status=DeploymentStatus.UPDATING,
                status_trigger=DeploymentStatusTrigger.DELETING,
                message=message,
            )

        # Otherwise, go through normal state machine transitions
        elif self.status == DeploymentStatus.UPDATING:
            # Finished updating configuration and transition to healthy
            if trigger == DeploymentStatusInternalTrigger.HEALTHY:
                return self._updated_copy(
                    status=DeploymentStatus.HEALTHY,
                    status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_COMPLETED,
                    message=message,
                )

            # A new configuration has been deployed before deployment
            # has finished updating
            elif trigger == DeploymentStatusInternalTrigger.CONFIG_UPDATE:
                return self._updated_copy(
                    status=DeploymentStatus.UPDATING,
                    status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_STARTED,
                    message=message,
                )

            # Autoscaling.
            elif trigger == DeploymentStatusInternalTrigger.AUTOSCALE_UP:
                return self._updated_copy(
                    status=DeploymentStatus.UPSCALING,
                    status_trigger=DeploymentStatusTrigger.AUTOSCALING,
                    message=message,
                )
            elif trigger == DeploymentStatusInternalTrigger.AUTOSCALE_DOWN:
                return self._updated_copy(
                    status=DeploymentStatus.DOWNSCALING,
                    status_trigger=DeploymentStatusTrigger.AUTOSCALING,
                    message=message,
                )

            # Manually increasing or decreasing num replicas does not
            # change the status while deployment is still updating.
            elif trigger in {
                DeploymentStatusInternalTrigger.MANUALLY_INCREASE_NUM_REPLICAS,
                DeploymentStatusInternalTrigger.MANUALLY_DECREASE_NUM_REPLICAS,
            }:
                return self

            # Failures occurred while a deployment was being updated
            elif trigger == DeploymentStatusInternalTrigger.HEALTH_CHECK_FAILED:
                return self._updated_copy(
                    status=DeploymentStatus.DEPLOY_FAILED,
                    status_trigger=DeploymentStatusTrigger.HEALTH_CHECK_FAILED,
                    message=message,
                )
            elif trigger == DeploymentStatusInternalTrigger.REPLICA_STARTUP_FAILED:
                return self._updated_copy(
                    status=DeploymentStatus.DEPLOY_FAILED,
                    status_trigger=DeploymentStatusTrigger.REPLICA_STARTUP_FAILED,
                    message=message,
                )

        elif self.status in {DeploymentStatus.UPSCALING, DeploymentStatus.DOWNSCALING}:
            # Failures occurred while upscaling/downscaling
            if trigger == DeploymentStatusInternalTrigger.HEALTH_CHECK_FAILED:
                return self._updated_copy(
                    status=DeploymentStatus.UNHEALTHY,
                    status_trigger=DeploymentStatusTrigger.HEALTH_CHECK_FAILED,
                    message=message,
                )
            elif trigger == DeploymentStatusInternalTrigger.REPLICA_STARTUP_FAILED:
                return self._updated_copy(
                    status=DeploymentStatus.UNHEALTHY,
                    status_trigger=DeploymentStatusTrigger.REPLICA_STARTUP_FAILED,
                    message=message,
                )

            # Deployment transitions to healthy
            elif trigger == DeploymentStatusInternalTrigger.HEALTHY:
                return self._updated_copy(
                    status=DeploymentStatus.HEALTHY,
                    status_trigger=DeploymentStatusTrigger.UPSCALE_COMPLETED
                    if self.status == DeploymentStatus.UPSCALING
                    else DeploymentStatusTrigger.DOWNSCALE_COMPLETED,
                    message=message,
                )

            # Configuration is updated before scaling is finished
            elif trigger == DeploymentStatusInternalTrigger.CONFIG_UPDATE:
                return self._updated_copy(
                    status=DeploymentStatus.UPDATING,
                    status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_STARTED,
                    message=message,
                )

            # The remaining triggers depend on why the deployment is scaling.
            elif self.status_trigger == DeploymentStatusTrigger.AUTOSCALING:
                # Upscale replicas before previous autoscaling has finished
                if trigger == DeploymentStatusInternalTrigger.AUTOSCALE_UP:
                    return self._updated_copy(
                        status=DeploymentStatus.UPSCALING,
                        message=message,
                    )
                # Downscale replicas before previous autoscaling has finished
                elif trigger == DeploymentStatusInternalTrigger.AUTOSCALE_DOWN:
                    return self._updated_copy(
                        status=DeploymentStatus.DOWNSCALING,
                        message=message,
                    )
                # Manually upscale replicas with config update before previous autoscaling has finished
                elif (
                    trigger
                    == DeploymentStatusInternalTrigger.MANUALLY_INCREASE_NUM_REPLICAS
                ):
                    return self._updated_copy(
                        status=DeploymentStatus.UPSCALING,
                        status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_STARTED,
                        message=message,
                    )
                # Manually downscale replicas with config update before previous autoscaling has finished
                elif (
                    trigger
                    == DeploymentStatusInternalTrigger.MANUALLY_DECREASE_NUM_REPLICAS
                ):
                    return self._updated_copy(
                        status=DeploymentStatus.DOWNSCALING,
                        status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_STARTED,
                        message=message,
                    )

            elif self.status_trigger == DeploymentStatusTrigger.CONFIG_UPDATE_STARTED:
                # Upscale replicas before previous config update has finished
                if (
                    trigger
                    == DeploymentStatusInternalTrigger.MANUALLY_INCREASE_NUM_REPLICAS
                ):
                    return self._updated_copy(
                        status=DeploymentStatus.UPSCALING, message=message
                    )

                # Downscale replicas before previous config update has finished
                elif (
                    trigger
                    == DeploymentStatusInternalTrigger.MANUALLY_DECREASE_NUM_REPLICAS
                ):
                    return self._updated_copy(
                        status=DeploymentStatus.DOWNSCALING, message=message
                    )

        elif self.status == DeploymentStatus.HEALTHY:
            # Deployment remains healthy
            if trigger == DeploymentStatusInternalTrigger.HEALTHY:
                return self

            # New configuration is deployed
            elif trigger == DeploymentStatusInternalTrigger.CONFIG_UPDATE:
                return self._updated_copy(
                    status=DeploymentStatus.UPDATING,
                    status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_STARTED,
                    message=message,
                )

            # Manually scaling / autoscaling num replicas
            elif (
                trigger
                == DeploymentStatusInternalTrigger.MANUALLY_INCREASE_NUM_REPLICAS
            ):
                return self._updated_copy(
                    status=DeploymentStatus.UPSCALING,
                    status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_STARTED,
                    message=message,
                )
            elif (
                trigger
                == DeploymentStatusInternalTrigger.MANUALLY_DECREASE_NUM_REPLICAS
            ):
                return self._updated_copy(
                    status=DeploymentStatus.DOWNSCALING,
                    status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_STARTED,
                    message=message,
                )
            elif trigger == DeploymentStatusInternalTrigger.AUTOSCALE_UP:
                return self._updated_copy(
                    status=DeploymentStatus.UPSCALING,
                    status_trigger=DeploymentStatusTrigger.AUTOSCALING,
                    message=message,
                )
            elif trigger == DeploymentStatusInternalTrigger.AUTOSCALE_DOWN:
                return self._updated_copy(
                    status=DeploymentStatus.DOWNSCALING,
                    status_trigger=DeploymentStatusTrigger.AUTOSCALING,
                    message=message,
                )

            # Health check for one or more replicas has failed
            elif trigger == DeploymentStatusInternalTrigger.HEALTH_CHECK_FAILED:
                return self._updated_copy(
                    status=DeploymentStatus.UNHEALTHY,
                    status_trigger=DeploymentStatusTrigger.HEALTH_CHECK_FAILED,
                    message=message,
                )

        elif self.status == DeploymentStatus.UNHEALTHY:
            # The deployment recovered
            if trigger == DeploymentStatusInternalTrigger.HEALTHY:
                return self._updated_copy(
                    status=DeploymentStatus.HEALTHY,
                    status_trigger=DeploymentStatusTrigger.UNSPECIFIED,
                    message=message,
                )

            # A new configuration is being deployed.
            elif trigger == DeploymentStatusInternalTrigger.CONFIG_UPDATE:
                return self._updated_copy(
                    status=DeploymentStatus.UPDATING,
                    status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_STARTED,
                    message=message,
                )

            # Old failures keep getting triggered, or new failures occurred.
            elif trigger == DeploymentStatusInternalTrigger.HEALTH_CHECK_FAILED:
                return self._updated_copy(
                    status=DeploymentStatus.UNHEALTHY,
                    status_trigger=DeploymentStatusTrigger.HEALTH_CHECK_FAILED,
                    message=message,
                )
            elif trigger == DeploymentStatusInternalTrigger.REPLICA_STARTUP_FAILED:
                return self._updated_copy(
                    status=DeploymentStatus.UNHEALTHY,
                    status_trigger=DeploymentStatusTrigger.REPLICA_STARTUP_FAILED,
                    message=message,
                )

        elif self.status == DeploymentStatus.DEPLOY_FAILED:
            # The deployment recovered
            if trigger == DeploymentStatusInternalTrigger.HEALTHY:
                return self._updated_copy(
                    status=DeploymentStatus.HEALTHY,
                    status_trigger=DeploymentStatusTrigger.UNSPECIFIED,
                    message=message,
                )

            # A new configuration is being deployed.
            elif trigger == DeploymentStatusInternalTrigger.CONFIG_UPDATE:
                return self._updated_copy(
                    status=DeploymentStatus.UPDATING,
                    status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_STARTED,
                    message=message,
                )

            # Old failures keep getting triggered, or new failures occurred.
            elif trigger == DeploymentStatusInternalTrigger.HEALTH_CHECK_FAILED:
                return self._updated_copy(
                    status=DeploymentStatus.DEPLOY_FAILED,
                    status_trigger=DeploymentStatusTrigger.HEALTH_CHECK_FAILED,
                    message=message,
                )
            elif trigger == DeploymentStatusInternalTrigger.REPLICA_STARTUP_FAILED:
                return self._updated_copy(
                    status=DeploymentStatus.DEPLOY_FAILED,
                    status_trigger=DeploymentStatusTrigger.REPLICA_STARTUP_FAILED,
                    message=message,
                )

        # If it's any other transition, ignore it.
        return self

    def to_proto(self) -> DeploymentStatusInfoProto:
        """Build the protobuf message for this status info.

        Status and trigger are passed as the member names prefixed with
        "DEPLOYMENT_STATUS_" and "DEPLOYMENT_STATUS_TRIGGER_" respectively.
        """
        return DeploymentStatusInfoProto(
            name=self.name,
            status=f"DEPLOYMENT_STATUS_{self.status.name}",
            status_trigger=f"DEPLOYMENT_STATUS_TRIGGER_{self.status_trigger.name}",
            message=self.message,
        )

    @classmethod
    def from_proto(cls, proto: DeploymentStatusInfoProto) -> "DeploymentStatusInfo":
        """Build an instance from the protobuf message (inverse of to_proto).

        The proto enum names have their "DEPLOYMENT_STATUS_" /
        "DEPLOYMENT_STATUS_TRIGGER_" prefix stripped before being converted
        to the Python enums.
        """
        status = DeploymentStatusProto.Name(proto.status)[len("DEPLOYMENT_STATUS_") :]
        status_trigger = DeploymentStatusTriggerProto.Name(proto.status_trigger)[
            len("DEPLOYMENT_STATUS_TRIGGER_") :
        ]
        return cls(
            name=proto.name,
            status=DeploymentStatus(status),
            status_trigger=DeploymentStatusTrigger(status_trigger),
            message=proto.message,
        )
@dataclass(frozen=True)
class RunningReplicaInfo:
    """Immutable description of a running replica.

    Equality and hashing are both driven by a hash precomputed in
    ``__post_init__`` from: the replica's full ID string, ``node_id``,
    ``actor_name``, ``max_ongoing_requests``, ``is_cross_language``,
    ``multiplexed_model_ids`` and ``routing_stats``. ``node_ip``,
    ``availability_zone`` and ``port`` do not participate.
    """

    replica_id: ReplicaID
    node_id: Optional[str]
    node_ip: Optional[str]
    availability_zone: Optional[str]
    actor_name: str
    max_ongoing_requests: int
    is_cross_language: bool = False
    multiplexed_model_ids: List[str] = field(default_factory=list)
    routing_stats: Dict[str, Any] = field(default_factory=dict)
    port: Optional[int] = None

    def __post_init__(self):
        # Compute the hash once, at construction time, from string renderings
        # of the identifying fields. The actor is represented by its name
        # rather than by an ActorHandle object.
        # NOTE(review): the value is built with hash() on a str, so it is
        # presumably process-specific; confirm whether instances are compared
        # across processes after pickling.
        hash_val = hash(
            " ".join(
                [
                    self.replica_id.to_full_id_str(),
                    self.node_id if self.node_id else "",
                    self.actor_name,
                    str(self.max_ongoing_requests),
                    str(self.is_cross_language),
                    str(self.multiplexed_model_ids),
                    str(self.routing_stats),
                ]
            )
        )

        # RunningReplicaInfo class set frozen=True, this is the hacky way to set
        # new attribute for the class.
        object.__setattr__(self, "_hash", hash_val)

    def __hash__(self):
        return self._hash

    def __eq__(self, other):
        # Check the type first: previously both conditions were evaluated
        # eagerly inside a list, so comparing against an object without a
        # `_hash` attribute raised AttributeError instead of returning False.
        if not isinstance(other, RunningReplicaInfo):
            return False
        return self._hash == other._hash

    def get_actor_handle(self) -> ActorHandle:
        """Look up the replica's actor by name in the Serve namespace."""
        return ray.get_actor(self.actor_name, namespace=SERVE_NAMESPACE)
@dataclass(frozen=True)
class DeploymentTargetInfo:
    """Immutable pairing of an availability flag with running replicas.

    Attributes:
        is_available: Availability flag for the deployment target.
        running_replicas: The RunningReplicaInfo entries for the target.
    """

    is_available: bool
    running_replicas: List[RunningReplicaInfo]
class ServeDeployMode(str, Enum):
    """String-valued enumeration of Serve deploy modes."""

    MULTI_APP = "MULTI_APP"
class ServeComponentType(str, Enum):
    """String-valued enumeration of Serve component types."""

    REPLICA = "replica"
@dataclass
class RequestRoutingInfo:
    """Information about the request routing.

    It includes deployment name (from ReplicaID), replica tag (from ReplicaID),
    multiplex model ids, and routing stats.

    Attributes:
        replica_id: The replica this routing information is about.
        multiplexed_model_ids: Multiplexed model IDs, or None.
        routing_stats: Routing stats, or None.
    """

    replica_id: ReplicaID
    multiplexed_model_ids: Optional[List[str]] = None
    routing_stats: Optional[Dict[str, Any]] = None
@dataclass
class gRPCRequest:
    """Sent from the GRPC proxy to replicas on both unary and streaming codepaths.

    Attributes:
        user_request_proto: The user's request payload (untyped here).
    """

    user_request_proto: Any
class RequestProtocol(str, Enum):
    """String-valued enumeration of request protocols."""

    UNDEFINED = "UNDEFINED"
    HTTP = "HTTP"
    GRPC = "gRPC"
class DeploymentHandleSource(str, Enum):
    """String-valued enumeration of entities that may hold a deployment handle."""

    UNKNOWN = "UNKNOWN"
    PROXY = "PROXY"
    REPLICA = "REPLICA"
@dataclass
class RequestMetadata:
    """Per-request metadata carried alongside a request."""

    # May be supplied by the client; the proxy generates one only when the
    # client did not pass it in the headers. Used for logging across systems.
    # Its uniqueness cannot be guaranteed.
    request_id: str

    # Always generated by the proxy and used for tracking request objects.
    # This can be assumed to be unique between requests.
    internal_request_id: str

    # Name of the user callable's method to execute.
    call_method: str = "__call__"

    # HTTP route path of the request.
    route: str = ""

    # Name of the application.
    app_name: str = ""

    # ID of the multiplexed model.
    multiplexed_model_id: str = ""

    # Whether the request expects a streaming response.
    is_streaming: bool = False

    _http_method: str = ""

    # Protocol used to serve this request.
    _request_protocol: RequestProtocol = RequestProtocol.UNDEFINED

    # Serve's gRPC context for this request, used to get and set metadata.
    grpc_context: Optional[RayServegRPCContext] = None

    # Tracing context.
    tracing_context: Optional[Dict[str, str]] = None

    # Whether this is a direct ingress request.
    is_direct_ingress: bool = False

    # By reference or by value.
    _by_reference: bool = True

    _on_separate_loop: bool = True

    # gRPC serialization options.
    request_serialization: str = "cloudpickle"
    response_serialization: str = "cloudpickle"

    @property
    def is_http_request(self) -> bool:
        """True when the request protocol is HTTP."""
        protocol = self._request_protocol
        return protocol == RequestProtocol.HTTP

    @property
    def is_grpc_request(self) -> bool:
        """True when the request protocol is gRPC."""
        protocol = self._request_protocol
        return protocol == RequestProtocol.GRPC
class StreamingHTTPRequest:
    """Sent from the HTTP proxy to replicas on the streaming codepath."""

    def __init__(
        self,
        asgi_scope: Scope,
        *,
        proxy_actor_name: Optional[str] = None,
        receive_asgi_messages: Optional[
            Callable[[RequestMetadata], Awaitable[bytes]]
        ] = None,
    ):
        """Store the ASGI scope and the means of receiving ASGI messages.

        Args:
            asgi_scope: The ASGI scope of the request.
            proxy_actor_name: Name of the proxy actor whose
                `receive_asgi_messages` method is called when no callable is
                given.
            receive_asgi_messages: Callable invoked directly to receive ASGI
                messages.

        Raises:
            ValueError: If neither `proxy_actor_name` nor
                `receive_asgi_messages` is provided.
        """
        self._asgi_scope: Scope = asgi_scope

        nothing_provided = proxy_actor_name is None and receive_asgi_messages is None
        if nothing_provided:
            raise ValueError(
                "Either proxy_actor_name or receive_asgi_messages must be provided."
            )

        # A passed callable is used as-is. Otherwise the actor named by
        # `proxy_actor_name` is fetched on first use and its
        # `receive_asgi_messages` method is called.
        self._proxy_actor_name: Optional[str] = proxy_actor_name
        # The handle is kept on the instance to avoid a
        # "lost reference to actor" error.
        self._cached_proxy_actor: Optional[ActorHandle] = None
        self._receive_asgi_messages: Optional[
            Callable[[RequestMetadata], Awaitable[bytes]]
        ] = receive_asgi_messages

    @property
    def asgi_scope(self) -> Scope:
        """The ASGI scope given at construction."""
        return self._asgi_scope

    @property
    def receive_asgi_messages(self) -> Callable[[RequestMetadata], Awaitable[bytes]]:
        """Callable used to receive ASGI messages.

        When no callable was given at construction, the proxy actor is looked
        up by name on first access and the result is cached.
        """
        if self._receive_asgi_messages is not None:
            return self._receive_asgi_messages

        self._cached_proxy_actor = ray.get_actor(
            self._proxy_actor_name, namespace=SERVE_NAMESPACE
        )
        self._receive_asgi_messages = (
            self._cached_proxy_actor.receive_asgi_messages.remote
        )
        return self._receive_asgi_messages
class TargetCapacityDirection(str, Enum):
    """Determines what direction the target capacity is scaling."""

    UP = "UP"
    DOWN = "DOWN"
@dataclass(frozen=True)
class ReplicaQueueLengthInfo:
    """Immutable pair of an acceptance flag and an ongoing-request count.

    Attributes:
        accepted: Acceptance flag.
        num_ongoing_requests: Number of ongoing requests.
    """

    accepted: bool
    num_ongoing_requests: int
@dataclass(frozen=True)
class CreatePlacementGroupRequest:
    """Immutable set of parameters describing a placement group to create.

    Attributes:
        bundles: Resource bundles, each a map of resource name to amount.
        strategy: Placement strategy name.
        target_node_id: ID of the target node.
        name: Name of the placement group.
        runtime_env: Optional runtime environment string.
        bundle_label_selector: Optional per-bundle label selectors.
        fallback_strategy: Optional list of fallback strategy dicts.
    """

    bundles: List[Dict[str, float]]
    strategy: str
    target_node_id: str
    name: str
    runtime_env: Optional[str] = None
    bundle_label_selector: Optional[List[Dict[str, str]]] = None
    fallback_strategy: Optional[List[Dict[str, Any]]] = None
# Shared exception instance, raised when a by-value DeploymentResponse is
# converted to an ObjectRef.
OBJ_REF_NOT_SUPPORTED_ERROR = RuntimeError(
    "Converting by-value DeploymentResponses to ObjectRefs is not supported. "
    "Use handle.options(_by_reference=True) to enable it."
)
class AutoscalingStatus(str, Enum):
    """String-valued enumeration of autoscaling statuses."""

    UPSCALE = "AUTOSCALING_UPSCALE"
    DOWNSCALE = "AUTOSCALING_DOWNSCALE"
    STABLE = "AUTOSCALING_STABLE"

    @staticmethod
    def format_scaling_status(trigger: "AutoscalingStatus") -> str:
        """Return a human-readable label for `trigger`.

        Known members map to "scaling up", "scaling down" or "stable"; any
        other value falls back to its lower-cased string form.
        """
        labels = {
            AutoscalingStatus.UPSCALE: "scaling up",
            AutoscalingStatus.DOWNSCALE: "scaling down",
            AutoscalingStatus.STABLE: "stable",
        }
        try:
            return labels[trigger]
        except KeyError:
            return str(trigger).lower()
class DeploymentSnapshot(BaseModel):
    """Snapshot of a deployment's replica counts, scaling and metric fields."""

    snapshot_type: str = "deployment"
    timestamp_str: str
    app: str
    deployment: str
    current_replicas: int
    target_replicas: int
    min_replicas: Optional[int]
    max_replicas: Optional[int]
    scaling_status: str
    policy_name: str
    look_back_period_s: Optional[float]
    queued_requests: Optional[float]
    ongoing_requests: float
    metrics_health: str
    errors: List[str]

    @staticmethod
    def format_metrics_health_text(
        *,
        time_since_last_collected_metrics_s: Optional[float],
        look_back_period_s: Optional[float],
    ) -> str:
        """Format the time since metrics were last collected.

        - None -> "unknown"
        - < 1s -> integer milliseconds
        - >= 1s -> seconds with two decimals

        `look_back_period_s` is accepted but not used by the formatting.
        """
        elapsed_s = time_since_last_collected_metrics_s
        if elapsed_s is None:
            return "unknown"
        if elapsed_s >= 1.0:
            return f"{elapsed_s:.2f}s"
        return f"{elapsed_s * 1000:.0f}ms"

    def is_scaling_equivalent(self, other: "DeploymentSnapshot") -> bool:
        """Return True if scaling-related fields are equal.

        Used for autoscaling snapshot log deduplication. The snapshots must
        be for the same app and deployment and agree on: target_replicas,
        min_replicas, max_replicas, scaling_status.
        """
        if not isinstance(other, DeploymentSnapshot):
            return False
        compared_fields = (
            "app",
            "deployment",
            "target_replicas",
            "min_replicas",
            "max_replicas",
            "scaling_status",
        )
        return all(
            getattr(self, name) == getattr(other, name) for name in compared_fields
        )
# Metric-name keys. RUNNING_REQUESTS_KEY is the key
# HandleMetricReport.total_requests reads from `aggregated_metrics`.
RUNNING_REQUESTS_KEY = "running_requests"
ONGOING_REQUESTS_KEY = "ongoing_requests"
QUEUED_REQUESTS_KEY = "queued_requests"
@dataclass(order=True)
class TimeStampedValue:
    """A value paired with the timestamp it was recorded at.

    Comparison and ordering use `timestamp` only; `value` is excluded.
    """

    timestamp: float
    value: float = field(compare=False)
# Type alias for time series data: a list of TimeStampedValue entries.
TimeSeries = List[TimeStampedValue]
@dataclass
class HandleMetricReport:
    """Report from a deployment handle on queued and ongoing requests.

    Args:
        deployment_id: The deployment ID of the deployment handle.
        handle_id: The handle ID of the deployment handle.
        actor_id: If the deployment handle (from which this metric was
            sent) lives on an actor, the ID of that actor.
        handle_source: Describes what kind of entity holds this
            deployment handle: a Serve proxy, a Serve replica, or
            unknown.
        aggregated_queued_requests: average number of queued requests at the
            handle over the past look_back_period_s seconds.
        queued_requests: list of values of queued requests at the
            handle over the past look_back_period_s seconds. This is a list
            because we take multiple measurements over time.
        aggregated_metrics: A map of metric name to the aggregated value over
            the past look_back_period_s seconds at the handle for each
            replica.
        metrics: A map of metric name to the list of values running at that
            handle for each replica over the past look_back_period_s seconds.
            This is a list because we take multiple measurements over time.
        timestamp: The time at which this report was created.
    """

    deployment_id: DeploymentID
    handle_id: str
    actor_id: str
    handle_source: DeploymentHandleSource
    aggregated_queued_requests: float
    queued_requests: TimeSeries
    aggregated_metrics: Dict[str, Dict[ReplicaID, float]]
    metrics: Dict[str, Dict[ReplicaID, TimeSeries]]
    timestamp: float

    @property
    def total_requests(self) -> float:
        """Total number of queued and running requests."""
        # Per-replica aggregated running requests; empty when not reported.
        running_by_replica = self.aggregated_metrics.get(RUNNING_REQUESTS_KEY, {})
        return self.aggregated_queued_requests + sum(running_by_replica.values())

    @property
    def is_serve_component_source(self) -> bool:
        """Whether the handle source is a Serve actor.

        More specifically, this returns whether a Serve actor tracked
        by the controller holds the deployment handle that sent this
        report. If the deployment handle lives on a driver, a Ray task,
        or an actor that's not a Serve replica, then this returns False.
        """
        serve_sources = [
            DeploymentHandleSource.PROXY,
            DeploymentHandleSource.REPLICA,
        ]
        return self.handle_source in serve_sources
@dataclass
class ReplicaMetricReport:
    """Report from a replica on ongoing requests.

    Args:
        replica_id: The replica ID of the replica.
        aggregated_metrics: A map of metric name to the aggregated value over
            the past look_back_period_s seconds at the replica.
        metrics: A map of metric name to the list of values running at that
            replica over the past look_back_period_s seconds. This is a list
            because we take multiple measurements over time.
        timestamp: The time at which this report was created.
    """

    replica_id: ReplicaID
    aggregated_metrics: Dict[str, float]
    metrics: Dict[str, TimeSeries]
    timestamp: float
class AutoscalingSnapshotError(str, Enum):
    """String-valued enumeration of autoscaling snapshot error codes."""

    METRICS_UNAVAILABLE = "METRICS_UNAVAILABLE"
|