# deployment_info.py (6.6 KB)

from typing import Any, Dict, Optional

import ray
from ray.serve._private.common import TargetCapacityDirection
from ray.serve._private.config import DeploymentConfig, ReplicaConfig
from ray.serve.generated.serve_pb2 import (
    DeploymentInfo as DeploymentInfoProto,
    TargetCapacityDirection as TargetCapacityDirectionProto,
)
  9. class DeploymentInfo:
  10. def __init__(
  11. self,
  12. deployment_config: DeploymentConfig,
  13. replica_config: ReplicaConfig,
  14. start_time_ms: int,
  15. deployer_job_id: str,
  16. actor_name: Optional[str] = None,
  17. version: Optional[str] = None,
  18. end_time_ms: Optional[int] = None,
  19. route_prefix: str = None,
  20. ingress: bool = False,
  21. target_capacity: Optional[float] = None,
  22. target_capacity_direction: Optional[TargetCapacityDirection] = None,
  23. ):
  24. self.deployment_config = deployment_config
  25. self.replica_config = replica_config
  26. # The time when .deploy() was first called for this deployment.
  27. self.start_time_ms = start_time_ms
  28. self.actor_name = actor_name
  29. self.version = version
  30. self.deployer_job_id = deployer_job_id
  31. # The time when this deployment was deleted.
  32. self.end_time_ms = end_time_ms
  33. # ephermal state
  34. self._cached_actor_def = None
  35. self.route_prefix = route_prefix
  36. self.ingress = ingress
  37. self.target_capacity = target_capacity
  38. self.target_capacity_direction = target_capacity_direction
  39. def __getstate__(self) -> Dict[Any, Any]:
  40. clean_dict = self.__dict__.copy()
  41. del clean_dict["_cached_actor_def"]
  42. return clean_dict
  43. def __setstate__(self, d: Dict[Any, Any]) -> None:
  44. self.__dict__ = d
  45. self._cached_actor_def = None
  46. def update(
  47. self,
  48. deployment_config: DeploymentConfig = None,
  49. replica_config: ReplicaConfig = None,
  50. version: str = None,
  51. route_prefix: str = None,
  52. ) -> "DeploymentInfo":
  53. return DeploymentInfo(
  54. deployment_config=deployment_config or self.deployment_config,
  55. replica_config=replica_config or self.replica_config,
  56. start_time_ms=self.start_time_ms,
  57. deployer_job_id=self.deployer_job_id,
  58. actor_name=self.actor_name,
  59. version=version or self.version,
  60. end_time_ms=self.end_time_ms,
  61. route_prefix=route_prefix or self.route_prefix,
  62. ingress=self.ingress,
  63. target_capacity=self.target_capacity,
  64. target_capacity_direction=self.target_capacity_direction,
  65. )
  66. def set_target_capacity(
  67. self,
  68. new_target_capacity: Optional[float],
  69. new_target_capacity_direction: Optional[TargetCapacityDirection],
  70. ):
  71. self.target_capacity = new_target_capacity
  72. self.target_capacity_direction = new_target_capacity_direction
  73. def config_changed(self, other) -> bool:
  74. return (
  75. self.deployment_config != other.deployment_config
  76. or self.replica_config.ray_actor_options
  77. != other.replica_config.ray_actor_options
  78. or other.version is None
  79. or self.version != other.version
  80. )
  81. @property
  82. def actor_def(self):
  83. if self._cached_actor_def is None:
  84. assert self.actor_name is not None
  85. # Break circular import :(.
  86. from ray.serve._private.replica import ReplicaActor
  87. # Dynamically create a new class with custom name here so Ray picks it up
  88. # correctly in actor metadata table and observability stack.
  89. self._cached_actor_def = ray.remote(
  90. type(
  91. self.actor_name,
  92. (ReplicaActor,),
  93. dict(ReplicaActor.__dict__),
  94. )
  95. )
  96. return self._cached_actor_def
  97. @classmethod
  98. def from_proto(cls, proto: DeploymentInfoProto):
  99. deployment_config = (
  100. DeploymentConfig.from_proto(proto.deployment_config)
  101. if proto.deployment_config
  102. else None
  103. )
  104. target_capacity = proto.target_capacity if proto.target_capacity != -1 else None
  105. target_capacity_direction = TargetCapacityDirectionProto.Name(
  106. proto.target_capacity_direction
  107. )
  108. if target_capacity_direction == "UNSET":
  109. target_capacity_direction = None
  110. else:
  111. target_capacity_direction = TargetCapacityDirection(
  112. target_capacity_direction
  113. )
  114. data = {
  115. "deployment_config": deployment_config,
  116. "replica_config": ReplicaConfig.from_proto(
  117. proto.replica_config,
  118. deployment_config.needs_pickle() if deployment_config else True,
  119. ),
  120. "start_time_ms": proto.start_time_ms,
  121. "actor_name": proto.actor_name if proto.actor_name != "" else None,
  122. "version": proto.version if proto.version != "" else None,
  123. "end_time_ms": proto.end_time_ms if proto.end_time_ms != 0 else None,
  124. "deployer_job_id": ray.get_runtime_context().get_job_id(),
  125. "target_capacity": target_capacity,
  126. "target_capacity_direction": target_capacity_direction,
  127. }
  128. return cls(**data)
  129. def to_proto(self):
  130. data = {
  131. "start_time_ms": self.start_time_ms,
  132. "actor_name": self.actor_name,
  133. "version": self.version,
  134. "end_time_ms": self.end_time_ms,
  135. }
  136. if self.deployment_config:
  137. data["deployment_config"] = self.deployment_config.to_proto()
  138. if self.replica_config:
  139. data["replica_config"] = self.replica_config.to_proto()
  140. if self.target_capacity is None:
  141. data["target_capacity"] = -1
  142. else:
  143. data["target_capacity"] = self.target_capacity
  144. if self.target_capacity_direction is None:
  145. data["target_capacity_direction"] = TargetCapacityDirectionProto.UNSET
  146. else:
  147. data["target_capacity_direction"] = self.target_capacity_direction.name
  148. return DeploymentInfoProto(**data)
  149. def to_dict(self):
  150. # only use for logging purposes
  151. return {
  152. "deployment_config": (
  153. self.deployment_config.to_dict() if self.deployment_config else None
  154. ),
  155. "replica_config": (
  156. self.replica_config.to_dict() if self.replica_config else None
  157. ),
  158. "start_time_ms": self.start_time_ms,
  159. "actor_name": self.actor_name,
  160. "version": self.version,
  161. "end_time_ms": self.end_time_ms,
  162. }