| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979 |
- import inspect
- import json
- from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
- from google.protobuf.descriptor import FieldDescriptor
- from google.protobuf.message import Message
- from ray import cloudpickle
- from ray._common import ray_option_utils
- from ray._common.pydantic_compat import (
- BaseModel,
- Field,
- NonNegativeFloat,
- NonNegativeInt,
- PositiveFloat,
- PositiveInt,
- validator,
- )
- from ray._common.serialization import pickle_dumps
- from ray._common.utils import resources_from_ray_options
- from ray.serve._private.constants import (
- DEFAULT_CONSTRUCTOR_RETRY_COUNT,
- DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_S,
- DEFAULT_GRACEFUL_SHUTDOWN_WAIT_LOOP_S,
- DEFAULT_HEALTH_CHECK_PERIOD_S,
- DEFAULT_HEALTH_CHECK_TIMEOUT_S,
- DEFAULT_MAX_ONGOING_REQUESTS,
- MAX_REPLICAS_PER_NODE_MAX_VALUE,
- )
- from ray.serve._private.utils import DEFAULT, DeploymentOptionUpdateType
- from ray.serve.config import (
- AggregationFunction,
- AutoscalingConfig,
- DeploymentMode,
- HTTPOptions,
- ProxyLocation,
- RequestRouterConfig,
- )
- from ray.serve.generated.serve_pb2 import (
- AutoscalingConfig as AutoscalingConfigProto,
- DeploymentConfig as DeploymentConfigProto,
- DeploymentLanguage,
- EncodingType as EncodingTypeProto,
- LoggingConfig as LoggingConfigProto,
- ReplicaConfig as ReplicaConfigProto,
- RequestRouterConfig as RequestRouterConfigProto,
- )
- from ray.util.placement_group import validate_placement_group
def _needs_pickle(deployment_language: DeploymentLanguage, is_cross_language: bool):
    """Tell the Serve client API whether payloads have to be pickled.

    Pickling is needed in exactly two situations:
      * a Python client deploying Python replicas (not cross-language), and
      * a Python client deploying Java replicas (cross-language), which relies
        on xlang serialization via cloudpickle.

    Every other combination returns False.
    """
    # Each value of the cross-language flag has exactly one language for which
    # pickling applies, so select that language and compare against it.
    if is_cross_language:
        language_requiring_pickle = DeploymentLanguage.JAVA
    else:
        language_requiring_pickle = DeploymentLanguage.PYTHON
    return bool(deployment_language == language_requiring_pickle)
def _proto_to_dict(proto: Message) -> Dict:
    """Turn a protobuf message into a plain Python dict, recursing as needed.

    This stands in for protobuf's `MessageToDict`. The difference is that
    bytes values are kept as-is here, whereas `MessageToDict` adds an extra
    base64 encoding to bytes when it builds a json response.
    """
    result = {}

    # Pass 1: copy every field that is populated on the message.
    for descriptor, value in proto.ListFields():
        holds_messages = descriptor.type == FieldDescriptor.TYPE_MESSAGE
        if descriptor.label == FieldDescriptor.LABEL_REPEATED:
            # Repeated fields come back as protobuf container objects (e.g.
            # `RepeatedScalarFieldContainer`), so materialize a real list.
            if holds_messages:
                converted = [_proto_to_dict(item) for item in value]
            else:
                converted = list(value)
        elif holds_messages:
            # A nested message is converted recursively.
            converted = _proto_to_dict(value)
        else:
            converted = value
        result[descriptor.name] = converted

    # Pass 2: backfill defaults for the fields that were not populated.
    for descriptor in proto.DESCRIPTOR.fields:
        if descriptor.name in result:
            # Already filled in by pass 1.
            continue
        if descriptor.type == FieldDescriptor.TYPE_MESSAGE:
            # Unset nested messages are left out entirely.
            continue
        if descriptor.containing_oneof:
            # Optional fields are left out when unset.
            continue
        result[descriptor.name] = descriptor.default_value

    return result
class DeploymentConfig(BaseModel):
    """Internal datastructure wrapping config options for a deployment.

    Args:
        num_replicas: The number of processes to start up that
            handles requests to this deployment. Defaults to 1.
        max_ongoing_requests: The maximum number of queries
            that is sent to a replica of this deployment without receiving
            a response. Defaults to DEFAULT_MAX_ONGOING_REQUESTS.
        max_queued_requests: Maximum number of requests to this deployment that will be
            queued at each *caller* (proxy or DeploymentHandle). Once this limit is
            reached, subsequent requests will raise a BackPressureError (for handles) or
            return an HTTP 503 status code (for HTTP requests). Defaults to -1 (no
            limit).
        user_config: Arguments to pass to the reconfigure
            method of the deployment. The reconfigure method is called if
            user_config is not None. Must be JSON-serializable.
        graceful_shutdown_wait_loop_s: Duration
            that deployment replicas wait until there is no more work to
            be done before shutting down.
        graceful_shutdown_timeout_s: Controller waits for this duration
            to forcefully kill the replica for shutdown.
        health_check_period_s: Frequency at which the controller health
            checks replicas.
        health_check_timeout_s: Timeout that the controller waits for a
            response from the replica's health check before marking it
            unhealthy.
        autoscaling_config: Autoscaling configuration.
        logging_config: Configuration for deployment logs.
        user_configured_option_names: The names of options manually
            configured by the user.
        request_router_config: Configuration for deployment request router.
        max_constructor_retry_count: Maximum number of times to retry the
            deployment constructor. Defaults to
            DEFAULT_CONSTRUCTOR_RETRY_COUNT.
    """

    num_replicas: Optional[NonNegativeInt] = Field(
        default=1, update_type=DeploymentOptionUpdateType.LightWeight
    )
    max_ongoing_requests: PositiveInt = Field(
        default=DEFAULT_MAX_ONGOING_REQUESTS,
        update_type=DeploymentOptionUpdateType.NeedsActorReconfigure,
    )
    max_queued_requests: int = Field(
        default=-1,
        update_type=DeploymentOptionUpdateType.LightWeight,
    )
    user_config: Any = Field(
        default=None, update_type=DeploymentOptionUpdateType.NeedsActorReconfigure
    )

    graceful_shutdown_timeout_s: NonNegativeFloat = Field(
        default=DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_S,
        update_type=DeploymentOptionUpdateType.NeedsReconfigure,
    )
    graceful_shutdown_wait_loop_s: NonNegativeFloat = Field(
        default=DEFAULT_GRACEFUL_SHUTDOWN_WAIT_LOOP_S,
        update_type=DeploymentOptionUpdateType.NeedsActorReconfigure,
    )

    health_check_period_s: PositiveFloat = Field(
        default=DEFAULT_HEALTH_CHECK_PERIOD_S,
        update_type=DeploymentOptionUpdateType.NeedsReconfigure,
    )
    health_check_timeout_s: PositiveFloat = Field(
        default=DEFAULT_HEALTH_CHECK_TIMEOUT_S,
        update_type=DeploymentOptionUpdateType.NeedsReconfigure,
    )

    autoscaling_config: Optional[AutoscalingConfig] = Field(
        default=None, update_type=DeploymentOptionUpdateType.NeedsActorReconfigure
    )

    request_router_config: RequestRouterConfig = Field(
        default_factory=RequestRouterConfig,
        update_type=DeploymentOptionUpdateType.NeedsActorReconfigure,
    )

    # This flag is used to let replica know they are deployed from
    # a different language.
    is_cross_language: bool = False

    # This flag is used to let controller know which language does
    # the deployment use.
    deployment_language: Any = DeploymentLanguage.PYTHON

    version: Optional[str] = Field(
        default=None,
        update_type=DeploymentOptionUpdateType.HeavyWeight,
    )

    logging_config: Optional[dict] = Field(
        default=None,
        update_type=DeploymentOptionUpdateType.NeedsActorReconfigure,
    )

    max_constructor_retry_count: PositiveInt = Field(
        default=DEFAULT_CONSTRUCTOR_RETRY_COUNT,
        update_type=DeploymentOptionUpdateType.NeedsReconfigure,
    )

    # Contains the names of deployment options manually set by the user
    user_configured_option_names: Set[str] = set()

    class Config:
        validate_assignment = True
        arbitrary_types_allowed = True

    @validator("user_config", always=True)
    def user_config_json_serializable(cls, v):
        """Accept bytes and None as-is; anything else must survive json.dumps."""
        if isinstance(v, bytes):
            return v
        if v is not None:
            try:
                json.dumps(v)
            except TypeError as e:
                # Chain the original error so the root cause stays visible.
                raise ValueError(
                    f"user_config is not JSON-serializable: {str(e)}."
                ) from e

        return v

    @validator("logging_config", always=True)
    def logging_config_valid(cls, v):
        """Validate a logging config dict by round-tripping it via LoggingConfig."""
        if v is None:
            return v

        if not isinstance(v, dict):
            raise TypeError(
                f"Got invalid type '{type(v)}' for logging_config. "
                "Expected a dictionary."
            )

        # Handle default value
        # Imported at call time rather than at module import.
        from ray.serve.schema import LoggingConfig

        v = LoggingConfig(**v).dict()

        return v

    @validator("max_queued_requests", always=True)
    def validate_max_queued_requests(cls, v):
        """Allow only -1 (no limit) or a positive integer."""
        if not isinstance(v, int):
            raise TypeError("max_queued_requests must be an integer.")

        if v < 1 and v != -1:
            raise ValueError(
                "max_queued_requests must be -1 (no limit) or a positive integer."
            )

        return v

    def needs_pickle(self):
        """Return whether this deployment's payloads must be pickled."""
        return _needs_pickle(self.deployment_language, self.is_cross_language)

    def to_proto(self):
        """Convert this config into a DeploymentConfigProto.

        Raises:
            ValueError: if non-empty request_router_kwargs are set on a
                deployment whose payloads are not pickled.
        """
        data = self.dict()
        if data.get("user_config") is not None:
            if self.needs_pickle():
                data["user_config"] = cloudpickle.dumps(data["user_config"])
        if data.get("autoscaling_config"):
            # By setting the serialized policy def, on the protobuf level, AutoscalingConfig constructor will not
            # try to import the policy from the string import path when the protobuf is deserialized on the controller side
            data["autoscaling_config"]["policy"][
                "_serialized_policy_def"
            ] = self.autoscaling_config.policy._serialized_policy_def
            data["autoscaling_config"] = AutoscalingConfigProto(
                **data["autoscaling_config"]
            )
        if data.get("request_router_config"):
            router_kwargs = data["request_router_config"].get("request_router_kwargs")
            if router_kwargs is not None:
                if not router_kwargs:
                    data["request_router_config"]["request_router_kwargs"] = b""
                elif self.needs_pickle():
                    # Protobuf requires bytes, so we need to pickle
                    data["request_router_config"][
                        "request_router_kwargs"
                    ] = cloudpickle.dumps(router_kwargs)
                else:
                    # The trailing space keeps the two adjacent string
                    # literals from fusing into "supportedfor".
                    raise ValueError(
                        "Non-empty request_router_kwargs not supported "
                        f"for cross-language deployments. Got: {router_kwargs}"
                    )
            # By setting the serialized request router cls, on the protobuf level, RequestRouterConfig constructor will not
            # try to import the request router cls from the string import path when the protobuf is deserialized on the controller side
            data["request_router_config"][
                "_serialized_request_router_cls"
            ] = self.request_router_config._serialized_request_router_cls
            data["request_router_config"] = RequestRouterConfigProto(
                **data["request_router_config"]
            )
        if data.get("logging_config"):
            if "encoding" in data["logging_config"]:
                data["logging_config"]["encoding"] = EncodingTypeProto.Value(
                    data["logging_config"]["encoding"]
                )

            data["logging_config"] = LoggingConfigProto(**data["logging_config"])
        # The proto is given a list rather than the model's set.
        data["user_configured_option_names"] = list(
            data["user_configured_option_names"]
        )
        return DeploymentConfigProto(**data)

    def to_proto_bytes(self):
        """Serialize this config to DeploymentConfigProto wire bytes."""
        return self.to_proto().SerializeToString()

    def to_dict(self):
        # only use for logging purposes
        return self.dict()

    @classmethod
    def from_proto(cls, proto: DeploymentConfigProto):
        """Build a DeploymentConfig from a DeploymentConfigProto.

        Empty proto values are mapped back to their Python equivalents: empty
        user_config bytes become None, empty request_router_kwargs become {},
        and an empty version string becomes None.
        """
        data = _proto_to_dict(proto)
        deployment_language = data.get(
            "deployment_language", DeploymentLanguage.PYTHON
        )
        is_cross_language = data.get("is_cross_language", False)
        needs_pickle = _needs_pickle(deployment_language, is_cross_language)
        if "user_config" in data:
            if data["user_config"] != b"":
                if needs_pickle:
                    data["user_config"] = cloudpickle.loads(proto.user_config)
                else:
                    data["user_config"] = proto.user_config
            else:
                data["user_config"] = None
        if "request_router_config" in data:
            if "request_router_kwargs" in data["request_router_config"]:
                request_router_kwargs = data["request_router_config"][
                    "request_router_kwargs"
                ]
                if request_router_kwargs != b"":
                    if needs_pickle:
                        data["request_router_config"][
                            "request_router_kwargs"
                        ] = cloudpickle.loads(
                            proto.request_router_config.request_router_kwargs
                        )
                    else:
                        data["request_router_config"][
                            "request_router_kwargs"
                        ] = proto.request_router_config.request_router_kwargs
                else:
                    data["request_router_config"]["request_router_kwargs"] = {}

            data["request_router_config"] = RequestRouterConfig(
                **data["request_router_config"]
            )
        if "autoscaling_config" in data:
            # Falsy values for these optional knobs are turned back into None
            # before the AutoscalingConfig is rebuilt.
            if not data["autoscaling_config"].get("upscale_smoothing_factor"):
                data["autoscaling_config"]["upscale_smoothing_factor"] = None
            if not data["autoscaling_config"].get("downscale_smoothing_factor"):
                data["autoscaling_config"]["downscale_smoothing_factor"] = None
            if not data["autoscaling_config"].get("upscaling_factor"):
                data["autoscaling_config"]["upscaling_factor"] = None
            if not data["autoscaling_config"].get("downscaling_factor"):
                data["autoscaling_config"]["downscaling_factor"] = None
            if not data["autoscaling_config"].get("target_ongoing_requests"):
                data["autoscaling_config"]["target_ongoing_requests"] = None
            if not data["autoscaling_config"].get("aggregation_function"):
                data["autoscaling_config"][
                    "aggregation_function"
                ] = AggregationFunction.MEAN

            data["autoscaling_config"] = AutoscalingConfig(**data["autoscaling_config"])
        if "version" in data:
            if data["version"] == "":
                data["version"] = None
        if "user_configured_option_names" in data:
            data["user_configured_option_names"] = set(
                data["user_configured_option_names"]
            )
        if "logging_config" in data:
            if "encoding" in data["logging_config"]:
                data["logging_config"]["encoding"] = EncodingTypeProto.Name(
                    data["logging_config"]["encoding"]
                )

        return cls(**data)

    @classmethod
    def from_proto_bytes(cls, proto_bytes: bytes):
        """Parse DeploymentConfigProto wire bytes into a DeploymentConfig."""
        proto = DeploymentConfigProto.FromString(proto_bytes)
        return cls.from_proto(proto)

    @classmethod
    def from_default(cls, **kwargs):
        """Creates a default DeploymentConfig and overrides it with kwargs.

        Ignores any kwargs set to DEFAULT.VALUE.

        Raises:
            TypeError: when a keyword that's not an argument to the class is
                passed in.
        """

        config = cls()
        valid_config_options = set(config.dict().keys())

        # Friendly error if a non-DeploymentConfig kwarg was passed in
        for key, val in kwargs.items():
            if key not in valid_config_options:
                raise TypeError(
                    f'Got invalid Deployment config option "{key}" '
                    f"(with value {val}) as keyword argument. All Deployment "
                    "config options must come from this list: "
                    f"{list(valid_config_options)}."
                )

        kwargs = {key: val for key, val in kwargs.items() if val != DEFAULT.VALUE}

        # Assign one attribute at a time so validate_assignment checks each.
        for key, val in kwargs.items():
            config.__setattr__(key, val)

        return config
def handle_num_replicas_auto(
    max_ongoing_requests: Union[int, DEFAULT],
    autoscaling_config: Optional[Union[Dict, AutoscalingConfig, DEFAULT]],
):
    """Resolve the settings to use when num_replicas="auto".

    Returns a `(max_ongoing_requests, autoscaling_config)` tuple.
    `max_ongoing_requests` is handed back as received.

    When `autoscaling_config` is unspecified (DEFAULT.VALUE or None), the
    returned config is AutoscalingConfig.default(). Otherwise the fields the
    caller specified are layered on top of AutoscalingConfig.default().
    """
    if autoscaling_config in [DEFAULT.VALUE, None]:
        # Nothing was specified: the default configuration is used as-is.
        return max_ongoing_requests, AutoscalingConfig.default()

    # Start from the default's explicitly-set fields and let the caller's
    # values win wherever both define the same key.
    merged_options = AutoscalingConfig.default().dict(exclude_unset=True)
    if isinstance(autoscaling_config, dict):
        overrides = autoscaling_config
    else:
        overrides = autoscaling_config.dict(exclude_unset=True)
    merged_options.update(overrides)

    return max_ongoing_requests, AutoscalingConfig(**merged_options)
- class ReplicaConfig:
- """Internal datastructure wrapping config options for a deployment's replicas.
- Provides five main properties (see property docstrings for more info):
- deployment_def: the code, or a reference to the code, that this
- replica should run.
- init_args: the deployment_def's init_args.
- init_kwargs: the deployment_def's init_kwargs.
- ray_actor_options: the Ray actor options to pass into the replica's
- actor.
- resource_dict: contains info on this replica's actor's resource needs.
- Offers a serialized equivalent (e.g. serialized_deployment_def) for
- deployment_def, init_args, and init_kwargs. Deserializes these properties
- when they're first accessed, if they were not passed in directly through
- create().
- Use the classmethod create() to make a ReplicaConfig with the deserialized
- properties.
- Note: overwriting or setting any property after the ReplicaConfig has been
- constructed is currently undefined behavior. The config's fields should not
- be modified externally after it is created.
- """
def __init__(
    self,
    deployment_def_name: str,
    serialized_deployment_def: bytes,
    serialized_init_args: bytes,
    serialized_init_kwargs: bytes,
    ray_actor_options: Dict,
    placement_group_bundles: Optional[List[Dict[str, float]]] = None,
    placement_group_strategy: Optional[str] = None,
    placement_group_bundle_label_selector: Optional[List[Dict[str, str]]] = None,
    placement_group_fallback_strategy: Optional[List[Dict[str, Any]]] = None,
    max_replicas_per_node: Optional[int] = None,
    needs_pickle: bool = True,
):
    """Build a ReplicaConfig out of already-serialized code properties.

    The deployment definition, init args and init kwargs are stored in
    serialized form; the deserialized values are produced when the matching
    property is first accessed. See classmethod create() for a constructor
    that takes the deserialized values instead.

    A single bundle label selector is broadcast across bundles and all
    options are validated before `resource_dict` is computed.
    """
    self.deployment_def_name = deployment_def_name

    # Serialized forms of the code properties.
    (
        self.serialized_deployment_def,
        self.serialized_init_args,
        self.serialized_init_kwargs,
    ) = (
        serialized_deployment_def,
        serialized_init_args,
        serialized_init_kwargs,
    )

    # Caches for the deserialized forms; filled in by the @property methods.
    self._deployment_def = self._init_args = self._init_kwargs = None

    # The Ray options that end up being passed to the replica's actor when
    # it is created.
    self.ray_actor_options = ray_actor_options

    # Placement settings for the replica.
    self.placement_group_bundles = placement_group_bundles
    self.placement_group_strategy = placement_group_strategy
    self.placement_group_bundle_label_selector = (
        placement_group_bundle_label_selector
    )
    self.placement_group_fallback_strategy = placement_group_fallback_strategy
    self.max_replicas_per_node = max_replicas_per_node

    self._normalize_bundle_label_selector()
    self._validate()

    # resource_dict describes the replica's resource needs. It does NOT set
    # the replica's resource usage; ray_actor_options is what does that.
    self.resource_dict = resources_from_ray_options(self.ray_actor_options)

    self.needs_pickle = needs_pickle
- def _normalize_bundle_label_selector(self):
- """If a single selector is provided for multiple bundles, it is broadcasted
- uniformly to all bundles.
- """
- if (
- self.placement_group_bundles
- and self.placement_group_bundle_label_selector
- and len(self.placement_group_bundle_label_selector) == 1
- and len(self.placement_group_bundles) > 1
- ):
- single_selector = self.placement_group_bundle_label_selector[0]
- self.placement_group_bundle_label_selector = [
- single_selector.copy() for _ in range(len(self.placement_group_bundles))
- ]
def _validate(self):
    """Run every option validator, then the cross-option check.

    Raises:
        ValueError: if max_replicas_per_node and placement_group_bundles are
            both set (in addition to whatever the individual validators
            raise).
    """
    self._validate_ray_actor_options()
    self._validate_placement_group_options()
    self._validate_max_replicas_per_node()

    # These two options cannot be used together.
    has_node_cap = self.max_replicas_per_node is not None
    has_bundles = self.placement_group_bundles is not None
    if has_node_cap and has_bundles:
        raise ValueError(
            "Setting max_replicas_per_node is not allowed when "
            "placement_group_bundles is provided."
        )
def update(
    self,
    ray_actor_options: dict,
    placement_group_bundles: Optional[List[Dict[str, float]]] = None,
    placement_group_strategy: Optional[str] = None,
    placement_group_bundle_label_selector: Optional[List[Dict[str, str]]] = None,
    placement_group_fallback_strategy: Optional[List[Dict[str, Any]]] = None,
    max_replicas_per_node: Optional[int] = None,
):
    """Replace the actor and placement options, then re-validate.

    Every option is overwritten with the value passed in (placement options
    that are omitted are reset to None). Afterwards a lone bundle label
    selector is broadcast, validation is re-run and `resource_dict` is
    recomputed from the new ray_actor_options.
    """
    replacements = {
        "ray_actor_options": ray_actor_options,
        "placement_group_bundles": placement_group_bundles,
        "placement_group_strategy": placement_group_strategy,
        "placement_group_bundle_label_selector": (
            placement_group_bundle_label_selector
        ),
        "placement_group_fallback_strategy": placement_group_fallback_strategy,
        "max_replicas_per_node": max_replicas_per_node,
    }
    for attribute, new_value in replacements.items():
        setattr(self, attribute, new_value)

    self._normalize_bundle_label_selector()
    self._validate()

    self.resource_dict = resources_from_ray_options(self.ray_actor_options)
@classmethod
def create(
    cls,
    deployment_def: Union[Callable, str],
    init_args: Optional[Tuple[Any]] = None,
    init_kwargs: Optional[Dict[Any, Any]] = None,
    ray_actor_options: Optional[Dict] = None,
    placement_group_bundles: Optional[List[Dict[str, float]]] = None,
    placement_group_strategy: Optional[str] = None,
    placement_group_bundle_label_selector: Optional[List[Dict[str, str]]] = None,
    placement_group_fallback_strategy: Optional[List[Dict[str, Any]]] = None,
    max_replicas_per_node: Optional[int] = None,
    deployment_def_name: Optional[str] = None,
):
    """Create a ReplicaConfig from deserialized parameters.

    The definition, init args and init kwargs are pickled for storage, and
    the deserialized originals are cached on the returned config.

    Raises:
        TypeError: if deployment_def is neither callable nor a string, if
            init_args is not a tuple/list, or if init_kwargs is not a dict.
        ValueError: if init_args or init_kwargs are given for a function
            deployment.
    """
    def_is_import_path = isinstance(deployment_def, str)

    if not (callable(deployment_def) or def_is_import_path):
        raise TypeError("@serve.deployment must be called on a class or function.")

    if init_args is not None and not isinstance(init_args, (tuple, list)):
        raise TypeError("init_args must be a tuple.")

    if init_kwargs is not None and not isinstance(init_kwargs, dict):
        raise TypeError("init_kwargs must be a dict.")

    # A plain function has no constructor to receive arguments.
    if inspect.isfunction(deployment_def):
        if init_args:
            raise ValueError("init_args not supported for function deployments.")
        if init_kwargs:
            raise ValueError("init_kwargs not supported for function deployments.")

    if not isinstance(deployment_def, (Callable, str)):
        raise TypeError(
            f'Got invalid type "{type(deployment_def)}" for '
            "deployment_def. Expected deployment_def to be a "
            "class, function, or string."
        )

    # Fill in defaults for everything that was left out.
    init_args = () if init_args is None else init_args
    init_kwargs = {} if init_kwargs is None else init_kwargs
    ray_actor_options = {} if ray_actor_options is None else ray_actor_options

    if deployment_def_name is None:
        # An import-path string names itself; anything else uses __name__.
        deployment_def_name = (
            deployment_def if def_is_import_path else deployment_def.__name__
        )

    pickled_def = pickle_dumps(
        deployment_def,
        f"Could not serialize the deployment {repr(deployment_def)}",
    )
    pickled_args = pickle_dumps(
        init_args, "Could not serialize the deployment init args"
    )
    pickled_kwargs = pickle_dumps(
        init_kwargs, "Could not serialize the deployment init kwargs"
    )

    config = cls(
        deployment_def_name=deployment_def_name,
        serialized_deployment_def=pickled_def,
        serialized_init_args=pickled_args,
        serialized_init_kwargs=pickled_kwargs,
        ray_actor_options=ray_actor_options,
        placement_group_bundles=placement_group_bundles,
        placement_group_strategy=placement_group_strategy,
        placement_group_bundle_label_selector=placement_group_bundle_label_selector,
        placement_group_fallback_strategy=placement_group_fallback_strategy,
        max_replicas_per_node=max_replicas_per_node,
    )

    # Seed the lazy caches so the properties never need to unpickle.
    config._deployment_def = deployment_def
    config._init_args = init_args
    config._init_kwargs = init_kwargs

    return config
- def _validate_ray_actor_options(self):
- if not isinstance(self.ray_actor_options, dict):
- raise TypeError(
- f'Got invalid type "{type(self.ray_actor_options)}" for '
- "ray_actor_options. Expected a dictionary."
- )
- # Please keep this in sync with the docstring for the ray_actor_options
- # kwarg in api.py.
- allowed_ray_actor_options = {
- # Resource options
- "accelerator_type",
- "memory",
- "num_cpus",
- "num_gpus",
- "resources",
- # Other options
- "runtime_env",
- "label_selector",
- "fallback_strategy",
- }
- for option in self.ray_actor_options:
- if option not in allowed_ray_actor_options:
- raise ValueError(
- f"Specifying '{option}' in ray_actor_options is not allowed. "
- f"Allowed options: {allowed_ray_actor_options}"
- )
- ray_option_utils.validate_actor_options(self.ray_actor_options, in_options=True)
- # Set Serve replica defaults
- if self.ray_actor_options.get("num_cpus") is None:
- self.ray_actor_options["num_cpus"] = 1
- def _validate_max_replicas_per_node(self) -> None:
- if self.max_replicas_per_node is None:
- return
- if not isinstance(self.max_replicas_per_node, int):
- raise TypeError(
- f"Get invalid type '{type(self.max_replicas_per_node)}' for "
- "max_replicas_per_node. Expected None or an integer "
- f"in the range of [1, {MAX_REPLICAS_PER_NODE_MAX_VALUE}]."
- )
- if (
- self.max_replicas_per_node < 1
- or self.max_replicas_per_node > MAX_REPLICAS_PER_NODE_MAX_VALUE
- ):
- raise ValueError(
- f"Invalid max_replicas_per_node {self.max_replicas_per_node}. "
- "Valid values are None or an integer "
- f"in the range of [1, {MAX_REPLICAS_PER_NODE_MAX_VALUE}]."
- )
- def _validate_placement_group_options(self) -> None:
- if self.placement_group_strategy is not None:
- if self.placement_group_bundles is None:
- raise ValueError(
- "If `placement_group_strategy` is provided, "
- "`placement_group_bundles` must also be provided."
- )
- if self.placement_group_fallback_strategy is not None:
- if self.placement_group_bundles is None:
- raise ValueError(
- "If `placement_group_fallback_strategy` is provided, "
- "`placement_group_bundles` must also be provided."
- )
- if not isinstance(self.placement_group_fallback_strategy, list):
- raise TypeError(
- "placement_group_fallback_strategy must be a list of dictionaries. "
- f"Got: {type(self.placement_group_fallback_strategy)}."
- )
- for i, strategy in enumerate(self.placement_group_fallback_strategy):
- if not isinstance(strategy, dict):
- raise TypeError(
- f"placement_group_fallback_strategy entry at index {i} must be a dictionary. "
- f"Got: {type(strategy)}."
- )
- if self.placement_group_bundle_label_selector is not None:
- if self.placement_group_bundles is None:
- raise ValueError(
- "If `placement_group_bundle_label_selector` is provided, "
- "`placement_group_bundles` must also be provided."
- )
- if self.placement_group_bundles is not None:
- validate_placement_group(
- bundles=self.placement_group_bundles,
- strategy=self.placement_group_strategy or "PACK",
- lifetime="detached",
- bundle_label_selector=self.placement_group_bundle_label_selector,
- )
- resource_error_prefix = (
- "When using `placement_group_bundles`, the replica actor "
- "will be placed in the first bundle, so the resource "
- "requirements for the actor must be a subset of the first "
- "bundle."
- )
- first_bundle = self.placement_group_bundles[0]
- # Validate that the replica actor fits in the first bundle.
- bundle_cpu = first_bundle.get("CPU", 0)
- replica_actor_num_cpus = self.ray_actor_options.get("num_cpus", 0)
- if bundle_cpu < replica_actor_num_cpus:
- raise ValueError(
- f"{resource_error_prefix} `num_cpus` for the actor is "
- f"{replica_actor_num_cpus}, but the bundle only has "
- f"{bundle_cpu} `CPU` specified."
- )
- bundle_gpu = first_bundle.get("GPU", 0)
- replica_actor_num_gpus = self.ray_actor_options.get("num_gpus", 0)
- if bundle_gpu < replica_actor_num_gpus:
- raise ValueError(
- f"{resource_error_prefix} `num_gpus` for the actor is "
- f"{replica_actor_num_gpus}, but the bundle only has "
- f"{bundle_gpu} `GPU` specified."
- )
- replica_actor_resources = self.ray_actor_options.get("resources", {})
- for actor_resource, actor_value in replica_actor_resources.items():
- bundle_value = first_bundle.get(actor_resource, 0)
- if bundle_value < actor_value:
- raise ValueError(
- f"{resource_error_prefix} `{actor_resource}` requirement "
- f"for the actor is {actor_value}, but the bundle only "
- f"has {bundle_value} `{actor_resource}` specified."
- )
@property
def deployment_def(self) -> Union[Callable, str]:
    """The code, or a reference to the code, that this replica runs.

    For Python replicas, this can be one of the following:
        - Function (Callable)
        - Class (Callable)
        - Import path (str)

    For Java replicas, this can be one of the following:
        - Class path (str)

    The value is produced from `serialized_deployment_def` on first access
    and cached afterwards.
    """
    if self._deployment_def is not None:
        return self._deployment_def

    if self.needs_pickle:
        resolved = cloudpickle.loads(self.serialized_deployment_def)
    else:
        # Without pickling, the serialized bytes are UTF-8 text.
        resolved = self.serialized_deployment_def.decode(encoding="utf-8")

    self._deployment_def = resolved
    return self._deployment_def
@property
def init_args(self) -> Optional[Union[Tuple[Any], bytes]]:
    """The init_args for a Python class.

    This property is only meaningful if deployment_def is a Python class.
    Otherwise, it is None.

    The value is resolved on first access: unpickled from
    `serialized_init_args` when pickling is in use, otherwise the serialized
    value itself is returned unchanged.
    """
    if self._init_args is not None:
        return self._init_args

    raw = self.serialized_init_args
    self._init_args = cloudpickle.loads(raw) if self.needs_pickle else raw
    return self._init_args
@property
def init_kwargs(self) -> Optional[Dict[Any, Any]]:
    """The init_kwargs for a Python class.

    This property is only meaningful if deployment_def is a Python class.
    Otherwise, it is None.

    The value is unpickled from `serialized_init_kwargs` on first access and
    cached afterwards. The return annotation is a dict because create()
    rejects any init_kwargs that is not a dict.
    """
    if self._init_kwargs is None:
        # NOTE(review): unlike init_args, this always unpickles and does not
        # consult needs_pickle -- confirm that is intended for non-pickled
        # (cross-language) configs.
        self._init_kwargs = cloudpickle.loads(self.serialized_init_kwargs)

    return self._init_kwargs
@classmethod
def from_proto(cls, proto: ReplicaConfigProto, needs_pickle: bool = True):
    """Build a ReplicaConfig from a ReplicaConfigProto.

    Empty proto values are translated to None: empty bytes for the init
    args/kwargs, empty strings for the placement group fields, and a falsy
    max_replicas_per_node. The JSON-encoded fields are decoded.
    """

    def _decode_json_or_none(encoded):
        # An empty field means "not set".
        return json.loads(encoded) if encoded else None

    init_args_bytes = proto.init_args if proto.init_args != b"" else None
    init_kwargs_bytes = proto.init_kwargs if proto.init_kwargs != b"" else None
    strategy = (
        proto.placement_group_strategy
        if proto.placement_group_strategy != ""
        else None
    )
    node_cap = proto.max_replicas_per_node if proto.max_replicas_per_node else None

    return ReplicaConfig(
        deployment_def_name=proto.deployment_def_name,
        serialized_deployment_def=proto.deployment_def,
        serialized_init_args=init_args_bytes,
        serialized_init_kwargs=init_kwargs_bytes,
        ray_actor_options=json.loads(proto.ray_actor_options),
        placement_group_bundles=_decode_json_or_none(
            proto.placement_group_bundles
        ),
        placement_group_strategy=strategy,
        placement_group_bundle_label_selector=_decode_json_or_none(
            proto.placement_group_bundle_label_selector
        ),
        placement_group_fallback_strategy=_decode_json_or_none(
            proto.placement_group_fallback_strategy
        ),
        max_replicas_per_node=node_cap,
        needs_pickle=needs_pickle,
    )
@classmethod
def from_proto_bytes(cls, proto_bytes: bytes, needs_pickle: bool = True):
    """Parse serialized ``ReplicaConfigProto`` bytes and build a config.

    Args:
        proto_bytes: Serialized ``ReplicaConfigProto`` message.
        needs_pickle: Forwarded unchanged to ``from_proto``.

    Returns:
        Whatever ``from_proto`` returns for the parsed message.
    """
    parsed = ReplicaConfigProto.FromString(proto_bytes)
    return cls.from_proto(parsed, needs_pickle)
def to_proto(self):
    """Convert this config into a ``ReplicaConfigProto`` message.

    Optional JSON-encoded fields are written as ``""`` when their value is
    None, and ``max_replicas_per_node`` is written as ``0`` when it is None.

    Returns:
        A ``ReplicaConfigProto`` populated from this config.
    """

    def _json_or_blank(value):
        # None is represented in the proto as an empty string.
        return "" if value is None else json.dumps(value)

    bundles_json = _json_or_blank(self.placement_group_bundles)
    label_selector_json = _json_or_blank(
        self.placement_group_bundle_label_selector
    )
    fallback_json = _json_or_blank(self.placement_group_fallback_strategy)

    replicas_per_node = self.max_replicas_per_node
    if replicas_per_node is None:
        replicas_per_node = 0

    return ReplicaConfigProto(
        deployment_def_name=self.deployment_def_name,
        deployment_def=self.serialized_deployment_def,
        init_args=self.serialized_init_args,
        init_kwargs=self.serialized_init_kwargs,
        ray_actor_options=json.dumps(self.ray_actor_options),
        placement_group_bundles=bundles_json,
        placement_group_strategy=self.placement_group_strategy,
        placement_group_bundle_label_selector=label_selector_json,
        placement_group_fallback_strategy=fallback_json,
        max_replicas_per_node=replicas_per_node,
    )
def to_proto_bytes(self):
    """Return the bytes produced by serializing ``to_proto()``'s message."""
    message = self.to_proto()
    return message.SerializeToString()
def to_dict(self):
    """Return a plain-dict summary of this config.

    Only use for logging purposes: the serialized definition and init
    arguments are not included.
    """
    logged_fields = (
        "deployment_def_name",
        "ray_actor_options",
        "placement_group_bundles",
        "placement_group_strategy",
        "placement_group_bundle_label_selector",
        "placement_group_fallback_strategy",
        "max_replicas_per_node",
    )
    return {field: getattr(self, field) for field in logged_fields}
def prepare_imperative_http_options(
    proxy_location: Union[None, str, ProxyLocation],
    http_options: Union[None, dict, HTTPOptions],
) -> HTTPOptions:
    """Build an `HTTPOptions` whose `location` is resolved from both inputs.

    Precedence:
        1. A provided `proxy_location` always wins over any `location` in
           `http_options`.
        2. Otherwise an explicit `location` in `http_options` is kept. Any
           `HTTPOptions` instance counts as explicit (a bare `HTTPOptions()`
           means the default `HeadOnly`); a dict counts only if it contains
           the 'location' key.
        3. Otherwise (no `proxy_location`, no explicit `location`) the
           `location` is set to `DeploymentMode.EveryNode`.

    Args:
        proxy_location: Optional ProxyLocation (or its string representation).
        http_options: Optional HTTPOptions instance or dict. If None, a new
            HTTPOptions() is created.

    Returns:
        HTTPOptions: New instance with resolved location.

    Raises:
        ValueError: If http_options is not None, dict, or HTTPOptions.

    Note:
        - Default ProxyLocation (when unspecified) resolves to
          DeploymentMode.EveryNode.
        - Default HTTPOptions() location is DeploymentMode.HeadOnly.
        - `HTTPOptions` is used in `imperative` mode (Python API) cluster
          set-up. `Declarative` mode (CLI / REST) uses `HTTPOptionsSchema`.
    """
    if http_options is None:
        explicit_location = False
        resolved = HTTPOptions()
    elif isinstance(http_options, dict):
        explicit_location = "location" in http_options
        resolved = HTTPOptions(**http_options)
    elif isinstance(http_options, HTTPOptions):
        # Passing an HTTPOptions object, even an empty one, is treated as the
        # user explicitly choosing its location (default `HeadOnly`).
        explicit_location = True
        # Rebuild from the explicitly-set fields so the location assignment
        # below lands on a new instance rather than the caller's object.
        resolved = HTTPOptions(**http_options.dict(exclude_unset=True))
    else:
        raise ValueError(
            f"Unexpected type for http_options: `{type(http_options).__name__}`"
        )

    if proxy_location is not None:
        resolved.location = ProxyLocation._to_deployment_mode(proxy_location)
    elif not explicit_location:
        resolved.location = DeploymentMode.EveryNode

    return resolved
|