version.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. import json
  2. import logging
  3. from copy import deepcopy
  4. from typing import Any, Dict, List, Optional
  5. from zlib import crc32
  6. from ray._common.pydantic_compat import BaseModel
  7. from ray.serve._private.config import DeploymentConfig
  8. from ray.serve._private.utils import DeploymentOptionUpdateType, get_random_string
  9. from ray.serve.config import AutoscalingConfig
  10. from ray.serve.generated.serve_pb2 import DeploymentVersion as DeploymentVersionProto
  11. logger = logging.getLogger("ray.serve")
  12. class DeploymentVersion:
  13. def __init__(
  14. self,
  15. code_version: Optional[str],
  16. deployment_config: DeploymentConfig,
  17. ray_actor_options: Optional[Dict],
  18. placement_group_bundles: Optional[List[Dict[str, float]]] = None,
  19. placement_group_strategy: Optional[str] = None,
  20. placement_group_bundle_label_selector: Optional[List[Dict[str, str]]] = None,
  21. placement_group_fallback_strategy: Optional[List[Dict[str, Any]]] = None,
  22. max_replicas_per_node: Optional[int] = None,
  23. route_prefix: Optional[str] = None,
  24. ):
  25. if code_version is not None and not isinstance(code_version, str):
  26. raise TypeError(f"code_version must be str, got {type(code_version)}.")
  27. if code_version is None:
  28. self.code_version = get_random_string()
  29. else:
  30. self.code_version = code_version
  31. # Options for this field may be mutated over time, so any logic that uses this
  32. # should access this field directly.
  33. self.deployment_config = deployment_config
  34. self.ray_actor_options = ray_actor_options
  35. self.placement_group_bundles = placement_group_bundles
  36. self.placement_group_strategy = placement_group_strategy
  37. self.placement_group_bundle_label_selector = (
  38. placement_group_bundle_label_selector
  39. )
  40. self.placement_group_fallback_strategy = placement_group_fallback_strategy
  41. self.max_replicas_per_node = max_replicas_per_node
  42. self.route_prefix = route_prefix
  43. self.compute_hashes()
  44. @classmethod
  45. def from_deployment_version(
  46. cls, deployment_version, deployment_config, route_prefix: Optional[str] = None
  47. ):
  48. version_copy = deepcopy(deployment_version)
  49. version_copy.deployment_config = deployment_config
  50. version_copy.route_prefix = route_prefix
  51. version_copy.compute_hashes()
  52. return version_copy
  53. def __hash__(self) -> int:
  54. return self._hash
  55. def __eq__(self, other: Any) -> bool:
  56. if not isinstance(other, DeploymentVersion):
  57. return False
  58. return self._hash == other._hash
  59. def requires_actor_restart(self, new_version):
  60. """Determines whether the new version requires actors of the current version to
  61. be restarted.
  62. """
  63. return (
  64. self.code_version != new_version.code_version
  65. or self.ray_actor_options_hash != new_version.ray_actor_options_hash
  66. or self.placement_group_options_hash
  67. != new_version.placement_group_options_hash
  68. or self.max_replicas_per_node != new_version.max_replicas_per_node
  69. )
  70. def requires_actor_reconfigure(self, new_version):
  71. """Determines whether the new version requires calling reconfigure() on the
  72. replica actor.
  73. """
  74. return self.reconfigure_actor_hash != new_version.reconfigure_actor_hash
  75. def requires_long_poll_broadcast(self, new_version):
  76. """Determines whether lightweightly updating an existing replica to the new
  77. version requires broadcasting through long poll that the running replicas has
  78. changed.
  79. """
  80. return (
  81. self.deployment_config.max_ongoing_requests
  82. != new_version.deployment_config.max_ongoing_requests
  83. )
  84. def compute_hashes(self):
  85. # If these change, the controller will rolling upgrade existing replicas.
  86. serialized_ray_actor_options = _serialize(self.ray_actor_options or {})
  87. self.ray_actor_options_hash = crc32(serialized_ray_actor_options)
  88. combined_placement_group_options = {}
  89. if self.placement_group_bundles is not None:
  90. combined_placement_group_options["bundles"] = self.placement_group_bundles
  91. if self.placement_group_strategy is not None:
  92. combined_placement_group_options["strategy"] = self.placement_group_strategy
  93. if self.placement_group_bundle_label_selector is not None:
  94. combined_placement_group_options[
  95. "bundle_label_selector"
  96. ] = self.placement_group_bundle_label_selector
  97. if self.placement_group_fallback_strategy is not None:
  98. combined_placement_group_options[
  99. "fallback_strategy"
  100. ] = self.placement_group_fallback_strategy
  101. serialized_placement_group_options = _serialize(
  102. combined_placement_group_options
  103. )
  104. self.placement_group_options_hash = crc32(serialized_placement_group_options)
  105. # Include app-level route prefix in the version hashes so changing
  106. # it triggers an in-place reconfigure of running replicas.
  107. serialized_route_prefix = _serialize(self.route_prefix)
  108. # If this changes, DeploymentReplica.reconfigure() will call reconfigure on the
  109. # actual replica actor
  110. self.reconfigure_actor_hash = crc32(
  111. serialized_route_prefix
  112. + self._get_serialized_options(
  113. [DeploymentOptionUpdateType.NeedsActorReconfigure]
  114. )
  115. )
  116. # Used by __eq__ in deployment state to either reconfigure the replicas or
  117. # stop and restart them
  118. self._hash = crc32(
  119. self.code_version.encode("utf-8")
  120. + serialized_ray_actor_options
  121. + serialized_placement_group_options
  122. + str(self.max_replicas_per_node).encode("utf-8")
  123. + serialized_route_prefix
  124. + self._get_serialized_options(
  125. [
  126. DeploymentOptionUpdateType.NeedsReconfigure,
  127. DeploymentOptionUpdateType.NeedsActorReconfigure,
  128. ]
  129. )
  130. )
  131. def to_proto(self) -> bytes:
  132. # TODO(simon): enable cross language user config
  133. placement_group_bundles = (
  134. json.dumps(self.placement_group_bundles)
  135. if self.placement_group_bundles is not None
  136. else ""
  137. )
  138. placement_group_bundle_label_selector = (
  139. json.dumps(self.placement_group_bundle_label_selector)
  140. if self.placement_group_bundle_label_selector is not None
  141. else ""
  142. )
  143. placement_group_fallback_strategy = (
  144. json.dumps(self.placement_group_fallback_strategy)
  145. if self.placement_group_fallback_strategy is not None
  146. else ""
  147. )
  148. placement_group_strategy = (
  149. self.placement_group_strategy
  150. if self.placement_group_strategy is not None
  151. else ""
  152. )
  153. max_replicas_per_node = (
  154. self.max_replicas_per_node if self.max_replicas_per_node is not None else 0
  155. )
  156. return DeploymentVersionProto(
  157. code_version=self.code_version,
  158. deployment_config=self.deployment_config.to_proto(),
  159. ray_actor_options=json.dumps(self.ray_actor_options),
  160. placement_group_bundles=placement_group_bundles,
  161. placement_group_strategy=placement_group_strategy,
  162. placement_group_bundle_label_selector=placement_group_bundle_label_selector,
  163. placement_group_fallback_strategy=placement_group_fallback_strategy,
  164. max_replicas_per_node=max_replicas_per_node,
  165. )
  166. @classmethod
  167. def from_proto(cls, proto: DeploymentVersionProto):
  168. return DeploymentVersion(
  169. proto.code_version,
  170. DeploymentConfig.from_proto(proto.deployment_config),
  171. json.loads(proto.ray_actor_options),
  172. placement_group_bundles=(
  173. json.loads(proto.placement_group_bundles)
  174. if proto.placement_group_bundles
  175. else None
  176. ),
  177. placement_group_bundle_label_selector=(
  178. json.loads(proto.placement_group_bundle_label_selector)
  179. if proto.placement_group_bundle_label_selector
  180. else None
  181. ),
  182. placement_group_fallback_strategy=(
  183. json.loads(proto.placement_group_fallback_strategy)
  184. if proto.placement_group_fallback_strategy
  185. else None
  186. ),
  187. placement_group_strategy=(
  188. proto.placement_group_strategy
  189. if proto.placement_group_strategy
  190. else None
  191. ),
  192. max_replicas_per_node=(
  193. proto.max_replicas_per_node if proto.max_replicas_per_node else None
  194. ),
  195. )
  196. def _get_serialized_options(
  197. self, update_types: List[DeploymentOptionUpdateType]
  198. ) -> bytes:
  199. """Returns a serialized dictionary containing fields of a deployment config that
  200. should prompt a deployment version update.
  201. """
  202. reconfigure_dict = {}
  203. # TODO(aguo): Once we only support pydantic 2, we can remove this if check.
  204. # In pydantic 2.0, `__fields__` has been renamed to `model_fields`.
  205. fields = (
  206. self.deployment_config.model_fields
  207. if hasattr(self.deployment_config, "model_fields")
  208. else self.deployment_config.__fields__
  209. )
  210. for option_name, field in fields.items():
  211. option_weight = field.field_info.extra.get("update_type")
  212. if option_weight in update_types:
  213. reconfigure_dict[option_name] = getattr(
  214. self.deployment_config, option_name
  215. )
  216. # If autoscaling config was changed, only broadcast to
  217. # replicas if metrics_interval_s or look_back_period_s
  218. # was changed, because the rest of the fields are only
  219. # used in deployment state manager
  220. if isinstance(reconfigure_dict[option_name], AutoscalingConfig):
  221. reconfigure_dict[option_name] = reconfigure_dict[option_name].dict(
  222. include={"metrics_interval_s", "look_back_period_s"}
  223. )
  224. elif isinstance(reconfigure_dict[option_name], BaseModel):
  225. reconfigure_dict[option_name] = reconfigure_dict[option_name].dict()
  226. # Can't serialize bytes. The request router class is already
  227. # included in the serialized config as request_router_class.
  228. if "request_router_config" in reconfigure_dict:
  229. reconfigure_dict["request_router_config"].pop(
  230. "_serialized_request_router_cls", None
  231. )
  232. if (
  233. isinstance(self.deployment_config.user_config, bytes)
  234. and "user_config" in reconfigure_dict
  235. ):
  236. del reconfigure_dict["user_config"]
  237. return self.deployment_config.user_config + _serialize(reconfigure_dict)
  238. return _serialize(reconfigure_dict)
  239. def _serialize(json_object):
  240. return str.encode(json.dumps(json_object, sort_keys=True))