| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184 |
- from typing import Any, Dict, Optional
- import ray
- from ray.serve._private.common import TargetCapacityDirection
- from ray.serve._private.config import DeploymentConfig, ReplicaConfig
- from ray.serve.generated.serve_pb2 import (
- DeploymentInfo as DeploymentInfoProto,
- TargetCapacityDirection as TargetCapacityDirectionProto,
- )
- class DeploymentInfo:
- def __init__(
- self,
- deployment_config: DeploymentConfig,
- replica_config: ReplicaConfig,
- start_time_ms: int,
- deployer_job_id: str,
- actor_name: Optional[str] = None,
- version: Optional[str] = None,
- end_time_ms: Optional[int] = None,
- route_prefix: str = None,
- ingress: bool = False,
- target_capacity: Optional[float] = None,
- target_capacity_direction: Optional[TargetCapacityDirection] = None,
- ):
- self.deployment_config = deployment_config
- self.replica_config = replica_config
- # The time when .deploy() was first called for this deployment.
- self.start_time_ms = start_time_ms
- self.actor_name = actor_name
- self.version = version
- self.deployer_job_id = deployer_job_id
- # The time when this deployment was deleted.
- self.end_time_ms = end_time_ms
- # ephermal state
- self._cached_actor_def = None
- self.route_prefix = route_prefix
- self.ingress = ingress
- self.target_capacity = target_capacity
- self.target_capacity_direction = target_capacity_direction
- def __getstate__(self) -> Dict[Any, Any]:
- clean_dict = self.__dict__.copy()
- del clean_dict["_cached_actor_def"]
- return clean_dict
- def __setstate__(self, d: Dict[Any, Any]) -> None:
- self.__dict__ = d
- self._cached_actor_def = None
- def update(
- self,
- deployment_config: DeploymentConfig = None,
- replica_config: ReplicaConfig = None,
- version: str = None,
- route_prefix: str = None,
- ) -> "DeploymentInfo":
- return DeploymentInfo(
- deployment_config=deployment_config or self.deployment_config,
- replica_config=replica_config or self.replica_config,
- start_time_ms=self.start_time_ms,
- deployer_job_id=self.deployer_job_id,
- actor_name=self.actor_name,
- version=version or self.version,
- end_time_ms=self.end_time_ms,
- route_prefix=route_prefix or self.route_prefix,
- ingress=self.ingress,
- target_capacity=self.target_capacity,
- target_capacity_direction=self.target_capacity_direction,
- )
- def set_target_capacity(
- self,
- new_target_capacity: Optional[float],
- new_target_capacity_direction: Optional[TargetCapacityDirection],
- ):
- self.target_capacity = new_target_capacity
- self.target_capacity_direction = new_target_capacity_direction
- def config_changed(self, other) -> bool:
- return (
- self.deployment_config != other.deployment_config
- or self.replica_config.ray_actor_options
- != other.replica_config.ray_actor_options
- or other.version is None
- or self.version != other.version
- )
- @property
- def actor_def(self):
- if self._cached_actor_def is None:
- assert self.actor_name is not None
- # Break circular import :(.
- from ray.serve._private.replica import ReplicaActor
- # Dynamically create a new class with custom name here so Ray picks it up
- # correctly in actor metadata table and observability stack.
- self._cached_actor_def = ray.remote(
- type(
- self.actor_name,
- (ReplicaActor,),
- dict(ReplicaActor.__dict__),
- )
- )
- return self._cached_actor_def
- @classmethod
- def from_proto(cls, proto: DeploymentInfoProto):
- deployment_config = (
- DeploymentConfig.from_proto(proto.deployment_config)
- if proto.deployment_config
- else None
- )
- target_capacity = proto.target_capacity if proto.target_capacity != -1 else None
- target_capacity_direction = TargetCapacityDirectionProto.Name(
- proto.target_capacity_direction
- )
- if target_capacity_direction == "UNSET":
- target_capacity_direction = None
- else:
- target_capacity_direction = TargetCapacityDirection(
- target_capacity_direction
- )
- data = {
- "deployment_config": deployment_config,
- "replica_config": ReplicaConfig.from_proto(
- proto.replica_config,
- deployment_config.needs_pickle() if deployment_config else True,
- ),
- "start_time_ms": proto.start_time_ms,
- "actor_name": proto.actor_name if proto.actor_name != "" else None,
- "version": proto.version if proto.version != "" else None,
- "end_time_ms": proto.end_time_ms if proto.end_time_ms != 0 else None,
- "deployer_job_id": ray.get_runtime_context().get_job_id(),
- "target_capacity": target_capacity,
- "target_capacity_direction": target_capacity_direction,
- }
- return cls(**data)
- def to_proto(self):
- data = {
- "start_time_ms": self.start_time_ms,
- "actor_name": self.actor_name,
- "version": self.version,
- "end_time_ms": self.end_time_ms,
- }
- if self.deployment_config:
- data["deployment_config"] = self.deployment_config.to_proto()
- if self.replica_config:
- data["replica_config"] = self.replica_config.to_proto()
- if self.target_capacity is None:
- data["target_capacity"] = -1
- else:
- data["target_capacity"] = self.target_capacity
- if self.target_capacity_direction is None:
- data["target_capacity_direction"] = TargetCapacityDirectionProto.UNSET
- else:
- data["target_capacity_direction"] = self.target_capacity_direction.name
- return DeploymentInfoProto(**data)
- def to_dict(self):
- # only use for logging purposes
- return {
- "deployment_config": (
- self.deployment_config.to_dict() if self.deployment_config else None
- ),
- "replica_config": (
- self.replica_config.to_dict() if self.replica_config else None
- ),
- "start_time_ms": self.start_time_ms,
- "actor_name": self.actor_name,
- "version": self.version,
- "end_time_ms": self.end_time_ms,
- }
|