config.py 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979
  1. import inspect
  2. import json
  3. from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
  4. from google.protobuf.descriptor import FieldDescriptor
  5. from google.protobuf.message import Message
  6. from ray import cloudpickle
  7. from ray._common import ray_option_utils
  8. from ray._common.pydantic_compat import (
  9. BaseModel,
  10. Field,
  11. NonNegativeFloat,
  12. NonNegativeInt,
  13. PositiveFloat,
  14. PositiveInt,
  15. validator,
  16. )
  17. from ray._common.serialization import pickle_dumps
  18. from ray._common.utils import resources_from_ray_options
  19. from ray.serve._private.constants import (
  20. DEFAULT_CONSTRUCTOR_RETRY_COUNT,
  21. DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_S,
  22. DEFAULT_GRACEFUL_SHUTDOWN_WAIT_LOOP_S,
  23. DEFAULT_HEALTH_CHECK_PERIOD_S,
  24. DEFAULT_HEALTH_CHECK_TIMEOUT_S,
  25. DEFAULT_MAX_ONGOING_REQUESTS,
  26. MAX_REPLICAS_PER_NODE_MAX_VALUE,
  27. )
  28. from ray.serve._private.utils import DEFAULT, DeploymentOptionUpdateType
  29. from ray.serve.config import (
  30. AggregationFunction,
  31. AutoscalingConfig,
  32. DeploymentMode,
  33. HTTPOptions,
  34. ProxyLocation,
  35. RequestRouterConfig,
  36. )
  37. from ray.serve.generated.serve_pb2 import (
  38. AutoscalingConfig as AutoscalingConfigProto,
  39. DeploymentConfig as DeploymentConfigProto,
  40. DeploymentLanguage,
  41. EncodingType as EncodingTypeProto,
  42. LoggingConfig as LoggingConfigProto,
  43. ReplicaConfig as ReplicaConfigProto,
  44. RequestRouterConfig as RequestRouterConfigProto,
  45. )
  46. from ray.util.placement_group import validate_placement_group
  def _needs_pickle(deployment_language: DeploymentLanguage, is_cross_language: bool):
      """Tell the Serve client API whether payloads have to be pickled.

      Pickling is needed in exactly two situations:

      * a Python client deploying Python replicas (Python language, not
        cross-language), and
      * a Python client deploying Java replicas (Java language,
        cross-language), which uses xlang serialization via cloudpickle.

      Every other combination returns False.
      """
      if deployment_language == DeploymentLanguage.PYTHON and not is_cross_language:
          # Python client deploying Python replicas.
          return True
      # Python client deploying Java replicas is the only remaining case
      # that pickles; anything else does not.
      return bool(
          deployment_language == DeploymentLanguage.JAVA and is_cross_language
      )
  def _proto_to_dict(proto: Message) -> Dict:
      """Turn a protobuf message into a plain dictionary, recursing as needed.

      Used instead of protobuf's `MessageToDict` because that helper adds an
      extra base64 encoding to bytes when building a json response, which
      this function does not do.

      Fields reported by `ListFields()` are copied first (nested messages are
      converted recursively, repeated fields become real lists). Afterwards
      every remaining field that is neither a nested message nor part of a
      oneof is filled in with its descriptor default.
      """
      result = {}

      # Pass 1: the fields that actually carry a value.
      for descriptor, value in proto.ListFields():
          is_repeated = descriptor.label == FieldDescriptor.LABEL_REPEATED
          is_message = descriptor.type == FieldDescriptor.TYPE_MESSAGE
          if is_repeated and is_message:
              converted = [_proto_to_dict(item) for item in value]
          elif is_repeated:
              # Without this the value would stay a protobuf
              # RepeatedScalarFieldContainer rather than a list.
              converted = list(value)
          elif is_message:
              converted = _proto_to_dict(value)
          else:
              converted = value
          result[descriptor.name] = converted

      # Pass 2: defaults for everything that was not set.
      for descriptor in proto.DESCRIPTOR.fields:
          if descriptor.name in result:
              continue  # already populated above
          if descriptor.type == FieldDescriptor.TYPE_MESSAGE:
              continue  # nested messages get no default entry
          if descriptor.containing_oneof:
              continue  # optional (oneof) fields stay absent
          result[descriptor.name] = descriptor.default_value

      return result
  class DeploymentConfig(BaseModel):
      """Internal datastructure wrapping config options for a deployment.

      Args:
          num_replicas: The number of processes to start up that
              handles requests to this deployment. Defaults to 1.
          max_ongoing_requests: The maximum number of queries
              that is sent to a replica of this deployment without receiving
              a response. Defaults to DEFAULT_MAX_ONGOING_REQUESTS.
          max_queued_requests: Maximum number of requests to this deployment
              that will be queued at each *caller* (proxy or DeploymentHandle).
              Once this limit is reached, subsequent requests will raise a
              BackPressureError (for handles) or return an HTTP 503 status code
              (for HTTP requests). Defaults to -1 (no limit).
          user_config: Arguments to pass to the reconfigure
              method of the deployment. The reconfigure method is called if
              user_config is not None. Must be JSON-serializable (raw bytes
              are accepted as-is).
          graceful_shutdown_wait_loop_s: Duration
              that deployment replicas wait until there is no more work to
              be done before shutting down.
          graceful_shutdown_timeout_s: Controller waits for this duration
              to forcefully kill the replica for shutdown.
          health_check_period_s: Frequency at which the controller health
              checks replicas.
          health_check_timeout_s: Timeout that the controller waits for a
              response from the replica's health check before marking it
              unhealthy.
          autoscaling_config: Autoscaling configuration.
          logging_config: Configuration for deployment logs.
          user_configured_option_names: The names of options manually
              configured by the user.
          request_router_config: Configuration for deployment request router.
          max_constructor_retry_count: Maximum number of times to retry the
              deployment constructor. Defaults to
              DEFAULT_CONSTRUCTOR_RETRY_COUNT.
          is_cross_language: Lets the replica know it is deployed from a
              different language. Defaults to False.
          deployment_language: Lets the controller know which language the
              deployment uses. Defaults to DeploymentLanguage.PYTHON.
          version: Optional version string of the deployment.
      """

      num_replicas: Optional[NonNegativeInt] = Field(
          default=1, update_type=DeploymentOptionUpdateType.LightWeight
      )
      max_ongoing_requests: PositiveInt = Field(
          default=DEFAULT_MAX_ONGOING_REQUESTS,
          update_type=DeploymentOptionUpdateType.NeedsActorReconfigure,
      )
      max_queued_requests: int = Field(
          default=-1,
          update_type=DeploymentOptionUpdateType.LightWeight,
      )
      user_config: Any = Field(
          default=None, update_type=DeploymentOptionUpdateType.NeedsActorReconfigure
      )

      graceful_shutdown_timeout_s: NonNegativeFloat = Field(
          default=DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_S,
          update_type=DeploymentOptionUpdateType.NeedsReconfigure,
      )
      graceful_shutdown_wait_loop_s: NonNegativeFloat = Field(
          default=DEFAULT_GRACEFUL_SHUTDOWN_WAIT_LOOP_S,
          update_type=DeploymentOptionUpdateType.NeedsActorReconfigure,
      )

      health_check_period_s: PositiveFloat = Field(
          default=DEFAULT_HEALTH_CHECK_PERIOD_S,
          update_type=DeploymentOptionUpdateType.NeedsReconfigure,
      )
      health_check_timeout_s: PositiveFloat = Field(
          default=DEFAULT_HEALTH_CHECK_TIMEOUT_S,
          update_type=DeploymentOptionUpdateType.NeedsReconfigure,
      )

      autoscaling_config: Optional[AutoscalingConfig] = Field(
          default=None, update_type=DeploymentOptionUpdateType.NeedsActorReconfigure
      )

      request_router_config: RequestRouterConfig = Field(
          default_factory=RequestRouterConfig,
          update_type=DeploymentOptionUpdateType.NeedsActorReconfigure,
      )

      # This flag is used to let replica know they are deployed from
      # a different language.
      is_cross_language: bool = False

      # This flag is used to let controller know which language does
      # the deployment use.
      deployment_language: Any = DeploymentLanguage.PYTHON

      version: Optional[str] = Field(
          default=None,
          update_type=DeploymentOptionUpdateType.HeavyWeight,
      )

      logging_config: Optional[dict] = Field(
          default=None,
          update_type=DeploymentOptionUpdateType.NeedsActorReconfigure,
      )

      max_constructor_retry_count: PositiveInt = Field(
          default=DEFAULT_CONSTRUCTOR_RETRY_COUNT,
          update_type=DeploymentOptionUpdateType.NeedsReconfigure,
      )

      # Contains the names of deployment options manually set by the user
      user_configured_option_names: Set[str] = set()

      class Config:
          validate_assignment = True
          arbitrary_types_allowed = True

      @validator("user_config", always=True)
      def user_config_json_serializable(cls, v):
          """Reject a user_config that `json.dumps` cannot serialize.

          Bytes and None are returned untouched without a JSON check.

          Raises:
              ValueError: if `json.dumps(v)` raises TypeError.
          """
          if isinstance(v, bytes):
              return v
          if v is not None:
              try:
                  json.dumps(v)
              except TypeError as e:
                  # Chain the cause so the original traceback is preserved.
                  raise ValueError(
                      f"user_config is not JSON-serializable: {str(e)}."
                  ) from e

          return v

      @validator("logging_config", always=True)
      def logging_config_valid(cls, v):
          """Normalize logging_config through the LoggingConfig schema.

          None passes through. A dict is expanded with `LoggingConfig(**v)`
          and returned as a dict again.

          Raises:
              TypeError: if the value is neither None nor a dict.
          """
          if v is None:
              return v

          if not isinstance(v, dict):
              raise TypeError(
                  f"Got invalid type '{type(v)}' for logging_config. "
                  "Expected a dictionary."
              )
          # Handle default value
          # Imported here rather than at module level.
          # NOTE(review): presumably to avoid a circular import - confirm.
          from ray.serve.schema import LoggingConfig

          v = LoggingConfig(**v).dict()
          return v

      @validator("max_queued_requests", always=True)
      def validate_max_queued_requests(cls, v):
          """Allow only -1 (no limit) or a positive integer.

          Raises:
              TypeError: if the value is not an int.
              ValueError: if the value is below 1 and not -1.
          """
          if not isinstance(v, int):
              raise TypeError("max_queued_requests must be an integer.")

          if v < 1 and v != -1:
              raise ValueError(
                  "max_queued_requests must be -1 (no limit) or a positive integer."
              )

          return v

      def needs_pickle(self):
          """Return whether this config's payloads must be pickled."""
          return _needs_pickle(self.deployment_language, self.is_cross_language)

      def to_proto(self):
          """Convert this config into a DeploymentConfigProto.

          Raises:
              ValueError: if request_router_kwargs is non-empty and
                  `needs_pickle()` is False, because the kwargs cannot then
                  be turned into the bytes the proto needs.
          """
          data = self.dict()
          if data.get("user_config") is not None and self.needs_pickle():
              data["user_config"] = cloudpickle.dumps(data["user_config"])
          if data.get("autoscaling_config"):
              # By setting the serialized policy def, on the protobuf level,
              # AutoscalingConfig constructor will not try to import the policy
              # from the string import path when the protobuf is deserialized
              # on the controller side
              data["autoscaling_config"]["policy"][
                  "_serialized_policy_def"
              ] = self.autoscaling_config.policy._serialized_policy_def
              data["autoscaling_config"] = AutoscalingConfigProto(
                  **data["autoscaling_config"]
              )
          if data.get("request_router_config"):
              router_kwargs = data["request_router_config"].get("request_router_kwargs")
              if router_kwargs is not None:
                  if not router_kwargs:
                      data["request_router_config"]["request_router_kwargs"] = b""
                  elif self.needs_pickle():
                      # Protobuf requires bytes, so we need to pickle
                      data["request_router_config"][
                          "request_router_kwargs"
                      ] = cloudpickle.dumps(router_kwargs)
                  else:
                      # The trailing space keeps the two literals from being
                      # fused into "supportedfor".
                      raise ValueError(
                          "Non-empty request_router_kwargs not supported "
                          f"for cross-language deployments. Got: {router_kwargs}"
                      )
              # By setting the serialized request router cls, on the protobuf
              # level, RequestRouterConfig constructor will not try to import
              # the request router cls from the string import path when the
              # protobuf is deserialized on the controller side
              data["request_router_config"][
                  "_serialized_request_router_cls"
              ] = self.request_router_config._serialized_request_router_cls
              data["request_router_config"] = RequestRouterConfigProto(
                  **data["request_router_config"]
              )
          if data.get("logging_config"):
              if "encoding" in data["logging_config"]:
                  data["logging_config"]["encoding"] = EncodingTypeProto.Value(
                      data["logging_config"]["encoding"]
                  )

              data["logging_config"] = LoggingConfigProto(**data["logging_config"])
          # The model stores a set; it is handed to the proto as a list.
          data["user_configured_option_names"] = list(
              data["user_configured_option_names"]
          )
          return DeploymentConfigProto(**data)

      def to_proto_bytes(self):
          """Return the serialized bytes of `to_proto()`."""
          return self.to_proto().SerializeToString()

      def to_dict(self):
          """Return `self.dict()`; only use for logging purposes."""
          return self.dict()

      @classmethod
      def from_proto(cls, proto: DeploymentConfigProto):
          """Build a DeploymentConfig from a DeploymentConfigProto.

          Empty proto values are mapped back to their Python equivalents:
          empty user_config bytes become None, empty request_router_kwargs
          bytes become {}, an empty version string becomes None, and falsy
          autoscaling factors become None.
          """
          data = _proto_to_dict(proto)
          deployment_language = data.get(
              "deployment_language", DeploymentLanguage.PYTHON
          )
          is_cross_language = data.get("is_cross_language", False)
          needs_pickle = _needs_pickle(deployment_language, is_cross_language)
          # NOTE(review): the cloudpickle.loads calls below unpickle proto
          # payloads; this assumes the proto originates from a trusted Serve
          # component - confirm.
          if "user_config" in data:
              if data["user_config"] != b"":
                  if needs_pickle:
                      data["user_config"] = cloudpickle.loads(proto.user_config)
                  else:
                      data["user_config"] = proto.user_config
              else:
                  data["user_config"] = None
          if "request_router_config" in data:
              if "request_router_kwargs" in data["request_router_config"]:
                  request_router_kwargs = data["request_router_config"][
                      "request_router_kwargs"
                  ]
                  if request_router_kwargs != b"":
                      if needs_pickle:
                          data["request_router_config"][
                              "request_router_kwargs"
                          ] = cloudpickle.loads(
                              proto.request_router_config.request_router_kwargs
                          )
                      else:
                          data["request_router_config"][
                              "request_router_kwargs"
                          ] = proto.request_router_config.request_router_kwargs
                  else:
                      data["request_router_config"]["request_router_kwargs"] = {}

              data["request_router_config"] = RequestRouterConfig(
                  **data["request_router_config"]
              )
          if "autoscaling_config" in data:
              autoscaling = data["autoscaling_config"]
              # Missing or falsy values for these fields are turned into None.
              for nullable_field in (
                  "upscale_smoothing_factor",
                  "downscale_smoothing_factor",
                  "upscaling_factor",
                  "downscaling_factor",
                  "target_ongoing_requests",
              ):
                  if not autoscaling.get(nullable_field):
                      autoscaling[nullable_field] = None
              if not autoscaling.get("aggregation_function"):
                  autoscaling["aggregation_function"] = AggregationFunction.MEAN

              data["autoscaling_config"] = AutoscalingConfig(**autoscaling)
          if "version" in data:
              if data["version"] == "":
                  data["version"] = None
          if "user_configured_option_names" in data:
              data["user_configured_option_names"] = set(
                  data["user_configured_option_names"]
              )
          if "logging_config" in data:
              if "encoding" in data["logging_config"]:
                  data["logging_config"]["encoding"] = EncodingTypeProto.Name(
                      data["logging_config"]["encoding"]
                  )

          return cls(**data)

      @classmethod
      def from_proto_bytes(cls, proto_bytes: bytes):
          """Parse serialized DeploymentConfigProto bytes into a config."""
          proto = DeploymentConfigProto.FromString(proto_bytes)
          return cls.from_proto(proto)

      @classmethod
      def from_default(cls, **kwargs):
          """Creates a default DeploymentConfig and overrides it with kwargs.

          Ignores any kwargs set to DEFAULT.VALUE.

          Raises:
              TypeError: when a keyword that's not an argument to the class is
                  passed in.
          """

          config = cls()
          valid_config_options = set(config.dict().keys())

          # Friendly error if a non-DeploymentConfig kwarg was passed in.
          # All keys are checked before any value is applied.
          for key, val in kwargs.items():
              if key not in valid_config_options:
                  raise TypeError(
                      f'Got invalid Deployment config option "{key}" '
                      f"(with value {val}) as keyword argument. All Deployment "
                      "config options must come from this list: "
                      f"{list(valid_config_options)}."
                  )

          for key, val in kwargs.items():
              if val != DEFAULT.VALUE:
                  # Assignment is validated (Config.validate_assignment).
                  setattr(config, key, val)

          return config
  def handle_num_replicas_auto(
      max_ongoing_requests: Union[int, DEFAULT],
      autoscaling_config: Optional[Union[Dict, AutoscalingConfig, DEFAULT]],
  ):
      """Resolve `max_ongoing_requests` and `autoscaling_config` for
      num_replicas="auto".

      `max_ongoing_requests` is handed back exactly as it was passed in.

      When `autoscaling_config` is DEFAULT.VALUE or None the result is
      AutoscalingConfig.default(). Otherwise the explicitly set fields of the
      given config (or every key of a given dict) are layered on top of the
      explicitly set fields of AutoscalingConfig.default(), and a new
      AutoscalingConfig is built from the merged values.

      Returns:
          Tuple of (max_ongoing_requests, AutoscalingConfig).
      """
      if autoscaling_config in [DEFAULT.VALUE, None]:
          # Nothing specified: plain defaults.
          return max_ongoing_requests, AutoscalingConfig.default()

      # Start from the defaults, then let user-provided values win.
      merged_options = AutoscalingConfig.default().dict(exclude_unset=True)
      if isinstance(autoscaling_config, dict):
          user_options = autoscaling_config
      else:
          user_options = autoscaling_config.dict(exclude_unset=True)
      merged_options.update(user_options)

      return max_ongoing_requests, AutoscalingConfig(**merged_options)
  396. class ReplicaConfig:
  397. """Internal datastructure wrapping config options for a deployment's replicas.
  398. Provides five main properties (see property docstrings for more info):
  399. deployment_def: the code, or a reference to the code, that this
  400. replica should run.
  401. init_args: the deployment_def's init_args.
  402. init_kwargs: the deployment_def's init_kwargs.
  403. ray_actor_options: the Ray actor options to pass into the replica's
  404. actor.
  405. resource_dict: contains info on this replica's actor's resource needs.
  406. Offers a serialized equivalent (e.g. serialized_deployment_def) for
  407. deployment_def, init_args, and init_kwargs. Deserializes these properties
  408. when they're first accessed, if they were not passed in directly through
  409. create().
  410. Use the classmethod create() to make a ReplicaConfig with the deserialized
  411. properties.
  412. Note: overwriting or setting any property after the ReplicaConfig has been
  413. constructed is currently undefined behavior. The config's fields should not
  414. be modified externally after it is created.
  415. """
  def __init__(
      self,
      deployment_def_name: str,
      serialized_deployment_def: bytes,
      serialized_init_args: bytes,
      serialized_init_kwargs: bytes,
      ray_actor_options: Dict,
      placement_group_bundles: Optional[List[Dict[str, float]]] = None,
      placement_group_strategy: Optional[str] = None,
      placement_group_bundle_label_selector: Optional[List[Dict[str, str]]] = None,
      placement_group_fallback_strategy: Optional[List[Dict[str, Any]]] = None,
      max_replicas_per_node: Optional[int] = None,
      needs_pickle: bool = True,
  ):
      """Construct a ReplicaConfig with serialized properties.

      The name, the three serialized payloads and `ray_actor_options` must
      be supplied; the placement-group options and `max_replicas_per_node`
      default to None and `needs_pickle` defaults to True. See classmethod
      create() for building a config from deserialized values.

      Args:
          deployment_def_name: Name stored for the deployment definition.
          serialized_deployment_def: Serialized deployment definition.
          serialized_init_args: Serialized init args.
          serialized_init_kwargs: Serialized init kwargs.
          ray_actor_options: Ray options for the replica actor; validated
              and stored via update().
          placement_group_bundles: Optional placement group bundles.
          placement_group_strategy: Optional placement group strategy.
          placement_group_bundle_label_selector: Optional per-bundle label
              selectors; a single selector is broadcast to all bundles.
          placement_group_fallback_strategy: Optional fallback strategy list.
          max_replicas_per_node: Optional per-node replica cap.
          needs_pickle: Stored on the instance as `needs_pickle`.

      Raises:
          TypeError, ValueError: propagated from option validation.
      """
      self.deployment_def_name = deployment_def_name

      # Store serialized versions of code properties.
      self.serialized_deployment_def = serialized_deployment_def
      self.serialized_init_args = serialized_init_args
      self.serialized_init_kwargs = serialized_init_kwargs

      # Deserialize properties when first accessed. See @property methods.
      self._deployment_def = None
      self._init_args = None
      self._init_kwargs = None

      # update() stores the actor/placement options, broadcasts a single
      # bundle label selector, validates everything and then creates
      # resource_dict from the (validated) ray_actor_options. Delegating
      # keeps construction and later updates on one code path.
      self.update(
          ray_actor_options,
          placement_group_bundles=placement_group_bundles,
          placement_group_strategy=placement_group_strategy,
          placement_group_bundle_label_selector=(
              placement_group_bundle_label_selector
          ),
          placement_group_fallback_strategy=placement_group_fallback_strategy,
          max_replicas_per_node=max_replicas_per_node,
      )

      self.needs_pickle = needs_pickle
  459. def _normalize_bundle_label_selector(self):
  460. """If a single selector is provided for multiple bundles, it is broadcasted
  461. uniformly to all bundles.
  462. """
  463. if (
  464. self.placement_group_bundles
  465. and self.placement_group_bundle_label_selector
  466. and len(self.placement_group_bundle_label_selector) == 1
  467. and len(self.placement_group_bundles) > 1
  468. ):
  469. single_selector = self.placement_group_bundle_label_selector[0]
  470. self.placement_group_bundle_label_selector = [
  471. single_selector.copy() for _ in range(len(self.placement_group_bundles))
  472. ]
  473. def _validate(self):
  474. self._validate_ray_actor_options()
  475. self._validate_placement_group_options()
  476. self._validate_max_replicas_per_node()
  477. if (
  478. self.max_replicas_per_node is not None
  479. and self.placement_group_bundles is not None
  480. ):
  481. raise ValueError(
  482. "Setting max_replicas_per_node is not allowed when "
  483. "placement_group_bundles is provided."
  484. )
  def update(
      self,
      ray_actor_options: dict,
      placement_group_bundles: Optional[List[Dict[str, float]]] = None,
      placement_group_strategy: Optional[str] = None,
      placement_group_bundle_label_selector: Optional[List[Dict[str, str]]] = None,
      placement_group_fallback_strategy: Optional[List[Dict[str, Any]]] = None,
      max_replicas_per_node: Optional[int] = None,
  ):
      """Replace the actor and placement options and re-validate them.

      Every option is overwritten with the value passed in (omitted ones
      become None). Afterwards a single bundle label selector is broadcast
      to all bundles, the options are validated, and `resource_dict` is
      recomputed from `ray_actor_options`.

      Raises:
          TypeError, ValueError: propagated from `_validate()`.
      """
      # Placement-group related settings.
      self.placement_group_bundles = placement_group_bundles
      self.placement_group_strategy = placement_group_strategy
      self.placement_group_bundle_label_selector = (
          placement_group_bundle_label_selector
      )
      self.placement_group_fallback_strategy = placement_group_fallback_strategy

      # Actor-level settings.
      self.ray_actor_options = ray_actor_options
      self.max_replicas_per_node = max_replicas_per_node

      self._normalize_bundle_label_selector()
      self._validate()

      # Computed last: validation may fill in a default num_cpus.
      self.resource_dict = resources_from_ray_options(self.ray_actor_options)
  505. @classmethod
  506. def create(
  507. cls,
  508. deployment_def: Union[Callable, str],
  509. init_args: Optional[Tuple[Any]] = None,
  510. init_kwargs: Optional[Dict[Any, Any]] = None,
  511. ray_actor_options: Optional[Dict] = None,
  512. placement_group_bundles: Optional[List[Dict[str, float]]] = None,
  513. placement_group_strategy: Optional[str] = None,
  514. placement_group_bundle_label_selector: Optional[List[Dict[str, str]]] = None,
  515. placement_group_fallback_strategy: Optional[List[Dict[str, Any]]] = None,
  516. max_replicas_per_node: Optional[int] = None,
  517. deployment_def_name: Optional[str] = None,
  518. ):
  519. """Create a ReplicaConfig from deserialized parameters."""
  520. if not callable(deployment_def) and not isinstance(deployment_def, str):
  521. raise TypeError("@serve.deployment must be called on a class or function.")
  522. if not (init_args is None or isinstance(init_args, (tuple, list))):
  523. raise TypeError("init_args must be a tuple.")
  524. if not (init_kwargs is None or isinstance(init_kwargs, dict)):
  525. raise TypeError("init_kwargs must be a dict.")
  526. if inspect.isfunction(deployment_def):
  527. if init_args:
  528. raise ValueError("init_args not supported for function deployments.")
  529. elif init_kwargs:
  530. raise ValueError("init_kwargs not supported for function deployments.")
  531. if not isinstance(deployment_def, (Callable, str)):
  532. raise TypeError(
  533. f'Got invalid type "{type(deployment_def)}" for '
  534. "deployment_def. Expected deployment_def to be a "
  535. "class, function, or string."
  536. )
  537. # Set defaults
  538. if init_args is None:
  539. init_args = ()
  540. if init_kwargs is None:
  541. init_kwargs = {}
  542. if ray_actor_options is None:
  543. ray_actor_options = {}
  544. if deployment_def_name is None:
  545. if isinstance(deployment_def, str):
  546. deployment_def_name = deployment_def
  547. else:
  548. deployment_def_name = deployment_def.__name__
  549. config = cls(
  550. deployment_def_name=deployment_def_name,
  551. serialized_deployment_def=pickle_dumps(
  552. deployment_def,
  553. f"Could not serialize the deployment {repr(deployment_def)}",
  554. ),
  555. serialized_init_args=pickle_dumps(
  556. init_args, "Could not serialize the deployment init args"
  557. ),
  558. serialized_init_kwargs=pickle_dumps(
  559. init_kwargs, "Could not serialize the deployment init kwargs"
  560. ),
  561. ray_actor_options=ray_actor_options,
  562. placement_group_bundles=placement_group_bundles,
  563. placement_group_strategy=placement_group_strategy,
  564. placement_group_bundle_label_selector=placement_group_bundle_label_selector,
  565. placement_group_fallback_strategy=placement_group_fallback_strategy,
  566. max_replicas_per_node=max_replicas_per_node,
  567. )
  568. config._deployment_def = deployment_def
  569. config._init_args = init_args
  570. config._init_kwargs = init_kwargs
  571. return config
  572. def _validate_ray_actor_options(self):
  573. if not isinstance(self.ray_actor_options, dict):
  574. raise TypeError(
  575. f'Got invalid type "{type(self.ray_actor_options)}" for '
  576. "ray_actor_options. Expected a dictionary."
  577. )
  578. # Please keep this in sync with the docstring for the ray_actor_options
  579. # kwarg in api.py.
  580. allowed_ray_actor_options = {
  581. # Resource options
  582. "accelerator_type",
  583. "memory",
  584. "num_cpus",
  585. "num_gpus",
  586. "resources",
  587. # Other options
  588. "runtime_env",
  589. "label_selector",
  590. "fallback_strategy",
  591. }
  592. for option in self.ray_actor_options:
  593. if option not in allowed_ray_actor_options:
  594. raise ValueError(
  595. f"Specifying '{option}' in ray_actor_options is not allowed. "
  596. f"Allowed options: {allowed_ray_actor_options}"
  597. )
  598. ray_option_utils.validate_actor_options(self.ray_actor_options, in_options=True)
  599. # Set Serve replica defaults
  600. if self.ray_actor_options.get("num_cpus") is None:
  601. self.ray_actor_options["num_cpus"] = 1
  602. def _validate_max_replicas_per_node(self) -> None:
  603. if self.max_replicas_per_node is None:
  604. return
  605. if not isinstance(self.max_replicas_per_node, int):
  606. raise TypeError(
  607. f"Get invalid type '{type(self.max_replicas_per_node)}' for "
  608. "max_replicas_per_node. Expected None or an integer "
  609. f"in the range of [1, {MAX_REPLICAS_PER_NODE_MAX_VALUE}]."
  610. )
  611. if (
  612. self.max_replicas_per_node < 1
  613. or self.max_replicas_per_node > MAX_REPLICAS_PER_NODE_MAX_VALUE
  614. ):
  615. raise ValueError(
  616. f"Invalid max_replicas_per_node {self.max_replicas_per_node}. "
  617. "Valid values are None or an integer "
  618. f"in the range of [1, {MAX_REPLICAS_PER_NODE_MAX_VALUE}]."
  619. )
  620. def _validate_placement_group_options(self) -> None:
  621. if self.placement_group_strategy is not None:
  622. if self.placement_group_bundles is None:
  623. raise ValueError(
  624. "If `placement_group_strategy` is provided, "
  625. "`placement_group_bundles` must also be provided."
  626. )
  627. if self.placement_group_fallback_strategy is not None:
  628. if self.placement_group_bundles is None:
  629. raise ValueError(
  630. "If `placement_group_fallback_strategy` is provided, "
  631. "`placement_group_bundles` must also be provided."
  632. )
  633. if not isinstance(self.placement_group_fallback_strategy, list):
  634. raise TypeError(
  635. "placement_group_fallback_strategy must be a list of dictionaries. "
  636. f"Got: {type(self.placement_group_fallback_strategy)}."
  637. )
  638. for i, strategy in enumerate(self.placement_group_fallback_strategy):
  639. if not isinstance(strategy, dict):
  640. raise TypeError(
  641. f"placement_group_fallback_strategy entry at index {i} must be a dictionary. "
  642. f"Got: {type(strategy)}."
  643. )
  644. if self.placement_group_bundle_label_selector is not None:
  645. if self.placement_group_bundles is None:
  646. raise ValueError(
  647. "If `placement_group_bundle_label_selector` is provided, "
  648. "`placement_group_bundles` must also be provided."
  649. )
  650. if self.placement_group_bundles is not None:
  651. validate_placement_group(
  652. bundles=self.placement_group_bundles,
  653. strategy=self.placement_group_strategy or "PACK",
  654. lifetime="detached",
  655. bundle_label_selector=self.placement_group_bundle_label_selector,
  656. )
  657. resource_error_prefix = (
  658. "When using `placement_group_bundles`, the replica actor "
  659. "will be placed in the first bundle, so the resource "
  660. "requirements for the actor must be a subset of the first "
  661. "bundle."
  662. )
  663. first_bundle = self.placement_group_bundles[0]
  664. # Validate that the replica actor fits in the first bundle.
  665. bundle_cpu = first_bundle.get("CPU", 0)
  666. replica_actor_num_cpus = self.ray_actor_options.get("num_cpus", 0)
  667. if bundle_cpu < replica_actor_num_cpus:
  668. raise ValueError(
  669. f"{resource_error_prefix} `num_cpus` for the actor is "
  670. f"{replica_actor_num_cpus}, but the bundle only has "
  671. f"{bundle_cpu} `CPU` specified."
  672. )
  673. bundle_gpu = first_bundle.get("GPU", 0)
  674. replica_actor_num_gpus = self.ray_actor_options.get("num_gpus", 0)
  675. if bundle_gpu < replica_actor_num_gpus:
  676. raise ValueError(
  677. f"{resource_error_prefix} `num_gpus` for the actor is "
  678. f"{replica_actor_num_gpus}, but the bundle only has "
  679. f"{bundle_gpu} `GPU` specified."
  680. )
  681. replica_actor_resources = self.ray_actor_options.get("resources", {})
  682. for actor_resource, actor_value in replica_actor_resources.items():
  683. bundle_value = first_bundle.get(actor_resource, 0)
  684. if bundle_value < actor_value:
  685. raise ValueError(
  686. f"{resource_error_prefix} `{actor_resource}` requirement "
  687. f"for the actor is {actor_value}, but the bundle only "
  688. f"has {bundle_value} `{actor_resource}` specified."
  689. )
  @property
  def deployment_def(self) -> Union[Callable, str]:
      """The code, or a reference to the code, that this replica runs.

      For Python replicas, this can be one of the following:
          - Function (Callable)
          - Class (Callable)
          - Import path (str)
      For Java replicas, this can be one of the following:
          - Class path (str)

      The value is materialized from ``serialized_deployment_def`` on first
      access and cached in ``_deployment_def``: unpickled with cloudpickle
      when ``needs_pickle`` is set, otherwise decoded as UTF-8 text.
      """
      cached = self._deployment_def
      if cached is not None:
          return cached

      if self.needs_pickle:
          # NOTE: unpickling executes arbitrary code; the serialized bytes
          # must come from a trusted source.
          loaded = cloudpickle.loads(self.serialized_deployment_def)
      else:
          loaded = self.serialized_deployment_def.decode(encoding="utf-8")

      self._deployment_def = loaded
      return loaded
  @property
  def init_args(self) -> Optional[Union[Tuple[Any], bytes]]:
      """The init_args for a Python class.

      This property is only meaningful if deployment_def is a Python class.
      Otherwise, it is None.

      The value is materialized from ``serialized_init_args`` on first
      access and cached in ``_init_args``: unpickled with cloudpickle when
      ``needs_pickle`` is set, otherwise the serialized bytes are returned
      unchanged.

      Returns:
          The init args, the raw serialized bytes when ``needs_pickle`` is
          false, or None when no serialized init args are stored.
      """
      serialized = self.serialized_init_args
      # `from_proto` stores None when the proto field is empty. Handing None
      # to cloudpickle would raise, so keep the documented "it is None"
      # behavior instead.
      if self._init_args is None and serialized is not None:
          if self.needs_pickle:
              # NOTE: unpickling executes arbitrary code; the serialized
              # bytes must come from a trusted source.
              self._init_args = cloudpickle.loads(serialized)
          else:
              self._init_args = serialized
      return self._init_args
  @property
  def init_kwargs(self) -> Optional[Tuple[Any]]:
      """The init_kwargs for a Python class.

      This property is only meaningful if deployment_def is a Python class.
      Otherwise, it is None.

      The value is unpickled from ``serialized_init_kwargs`` with cloudpickle
      on first access and cached in ``_init_kwargs``.

      Returns:
          The unpickled init kwargs, or None when no serialized init kwargs
          are stored.
      """
      # NOTE(review): the return annotation says Tuple, but kwargs are
      # presumably a dict -- confirm and adjust the annotation.
      serialized = self.serialized_init_kwargs
      # `from_proto` stores None when the proto field is empty. Handing None
      # to cloudpickle would raise, so keep the documented "it is None"
      # behavior instead.
      if self._init_kwargs is None and serialized is not None:
          # NOTE: unpickling executes arbitrary code; the serialized bytes
          # must come from a trusted source.
          self._init_kwargs = cloudpickle.loads(serialized)
      return self._init_kwargs
  @classmethod
  def from_proto(cls, proto: ReplicaConfigProto, needs_pickle: bool = True):
      """Build a replica config from its protobuf representation.

      Empty proto fields are mapped back to None: empty bytes for the init
      args/kwargs, empty strings for the placement group strategy and the
      JSON-encoded placement group fields, and 0 for
      ``max_replicas_per_node``. This mirrors the placeholders written by
      ``to_proto``.

      Args:
          proto: The protobuf message to read from.
          needs_pickle: Forwarded unchanged to the constructor.

      Returns:
          A new instance of ``cls``, so subclasses get their own type back.
      """

      def _json_or_none(encoded):
          # An empty string marks "not set" and is not valid JSON.
          return json.loads(encoded) if encoded else None

      return cls(
          deployment_def_name=proto.deployment_def_name,
          serialized_deployment_def=proto.deployment_def,
          serialized_init_args=proto.init_args or None,
          serialized_init_kwargs=proto.init_kwargs or None,
          ray_actor_options=json.loads(proto.ray_actor_options),
          placement_group_bundles=_json_or_none(proto.placement_group_bundles),
          placement_group_strategy=proto.placement_group_strategy or None,
          placement_group_bundle_label_selector=_json_or_none(
              proto.placement_group_bundle_label_selector
          ),
          placement_group_fallback_strategy=_json_or_none(
              proto.placement_group_fallback_strategy
          ),
          max_replicas_per_node=proto.max_replicas_per_node or None,
          needs_pickle=needs_pickle,
      )
  @classmethod
  def from_proto_bytes(cls, proto_bytes: bytes, needs_pickle: bool = True):
      """Build a replica config from a serialized ``ReplicaConfigProto``.

      Args:
          proto_bytes: Bytes parsed with ``ReplicaConfigProto.FromString``.
          needs_pickle: Forwarded unchanged to ``from_proto``.

      Returns:
          Whatever ``cls.from_proto`` returns for the parsed message.
      """
      return cls.from_proto(ReplicaConfigProto.FromString(proto_bytes), needs_pickle)
  def to_proto(self):
      """Convert this replica config into a ``ReplicaConfigProto`` message.

      The placement group bundles, bundle label selector and fallback
      strategy are JSON-encoded, with None written as an empty string.
      ``max_replicas_per_node`` is written as 0 when it is None.
      ``ray_actor_options`` is always JSON-encoded; the remaining fields are
      passed through unchanged.
      """

      def _json_or_blank(value):
          # "" is the placeholder for "not set" in the JSON-encoded fields.
          return "" if value is None else json.dumps(value)

      bundles_json = _json_or_blank(self.placement_group_bundles)
      label_selector_json = _json_or_blank(self.placement_group_bundle_label_selector)
      fallback_json = _json_or_blank(self.placement_group_fallback_strategy)

      replicas_cap = self.max_replicas_per_node
      if replicas_cap is None:
          replicas_cap = 0

      return ReplicaConfigProto(
          deployment_def_name=self.deployment_def_name,
          deployment_def=self.serialized_deployment_def,
          init_args=self.serialized_init_args,
          init_kwargs=self.serialized_init_kwargs,
          ray_actor_options=json.dumps(self.ray_actor_options),
          placement_group_bundles=bundles_json,
          placement_group_strategy=self.placement_group_strategy,
          placement_group_bundle_label_selector=label_selector_json,
          placement_group_fallback_strategy=fallback_json,
          max_replicas_per_node=replicas_cap,
      )
  def to_proto_bytes(self):
      """Return the result of ``to_proto()`` serialized via ``SerializeToString``."""
      message = self.to_proto()
      return message.SerializeToString()
  def to_dict(self):
      """Return a dict of this config's plain option fields.

      Only use for logging purposes: the serialized deployment definition
      and init args/kwargs are not included.
      """
      # Key order is fixed by this tuple.
      logged_fields = (
          "deployment_def_name",
          "ray_actor_options",
          "placement_group_bundles",
          "placement_group_strategy",
          "placement_group_bundle_label_selector",
          "placement_group_fallback_strategy",
          "max_replicas_per_node",
      )
      return {field: getattr(self, field) for field in logged_fields}
  def prepare_imperative_http_options(
      proxy_location: Union[None, str, ProxyLocation],
      http_options: Union[None, dict, HTTPOptions],
  ) -> HTTPOptions:
      """Build an `HTTPOptions` whose `location` reflects both arguments.

      Precedence:
          - A given `proxy_location` always wins over any `location` in `http_options`.
          - Otherwise an explicit `location` in `http_options` is kept. A dict counts
            as explicit only if it has a 'location' key; any `HTTPOptions` instance
            counts as explicit, so a bare `HTTPOptions()` keeps its default (`HeadOnly`).
          - Otherwise (no `proxy_location`, no explicit `location`) the location is
            set to `DeploymentMode.EveryNode`.

      Args:
          proxy_location: Optional ProxyLocation (or its string representation).
          http_options: Optional HTTPOptions instance or dict. If None, a new HTTPOptions() is created.

      Returns:
          HTTPOptions: New instance with resolved location; the argument is not mutated.

      Note:
          1. Default ProxyLocation (when unspecified) resolves to DeploymentMode.EveryNode.
          2. Default HTTPOptions() location is DeploymentMode.HeadOnly.
          3. `HTTPOptions` is used in `imperative` mode (Python API) cluster set-up.
             `Declarative` mode (CLI / REST) uses `HTTPOptionsSchema`.

      Raises:
          ValueError: If http_options is not None, dict, or HTTPOptions.
      """
      if http_options is None:
          explicit_location = False
          resolved = HTTPOptions()
      elif isinstance(http_options, dict):
          explicit_location = "location" in http_options
          resolved = HTTPOptions(**http_options)
      elif isinstance(http_options, HTTPOptions):
          # Even an empty `HTTPOptions()` is treated as the user explicitly
          # choosing the default location `HeadOnly`.
          explicit_location = True
          # Copy only the fields the caller set so the input stays untouched.
          resolved = HTTPOptions(**http_options.dict(exclude_unset=True))
      else:
          raise ValueError(
              f"Unexpected type for http_options: `{type(http_options).__name__}`"
          )

      if proxy_location is not None:
          resolved.location = ProxyLocation._to_deployment_mode(proxy_location)
      elif not explicit_location:
          resolved.location = DeploymentMode.EveryNode

      return resolved