import inspect
import json
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union

from google.protobuf.descriptor import FieldDescriptor
from google.protobuf.message import Message

from ray import cloudpickle
from ray._common import ray_option_utils
from ray._common.pydantic_compat import (
    BaseModel,
    Field,
    NonNegativeFloat,
    NonNegativeInt,
    PositiveFloat,
    PositiveInt,
    validator,
)
from ray._common.serialization import pickle_dumps
from ray._common.utils import resources_from_ray_options
from ray.serve._private.constants import (
    DEFAULT_CONSTRUCTOR_RETRY_COUNT,
    DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_S,
    DEFAULT_GRACEFUL_SHUTDOWN_WAIT_LOOP_S,
    DEFAULT_HEALTH_CHECK_PERIOD_S,
    DEFAULT_HEALTH_CHECK_TIMEOUT_S,
    DEFAULT_MAX_ONGOING_REQUESTS,
    MAX_REPLICAS_PER_NODE_MAX_VALUE,
)
from ray.serve._private.utils import DEFAULT, DeploymentOptionUpdateType
from ray.serve.config import (
    AggregationFunction,
    AutoscalingConfig,
    DeploymentMode,
    HTTPOptions,
    ProxyLocation,
    RequestRouterConfig,
)
from ray.serve.generated.serve_pb2 import (
    AutoscalingConfig as AutoscalingConfigProto,
    DeploymentConfig as DeploymentConfigProto,
    DeploymentLanguage,
    EncodingType as EncodingTypeProto,
    LoggingConfig as LoggingConfigProto,
    ReplicaConfig as ReplicaConfigProto,
    RequestRouterConfig as RequestRouterConfigProto,
)
from ray.util.placement_group import validate_placement_group


def _needs_pickle(deployment_language: DeploymentLanguage, is_cross_language: bool):
    """From Serve client API's perspective, decide whether pickling is needed.

    Args:
        deployment_language: Language the deployment's replicas run in.
        is_cross_language: Whether the deployment was created from a client
            written in a different language than the replicas.

    Returns:
        True for a Python client deploying Python replicas, or a Python
        client deploying Java replicas (xlang via cloudpickle); False
        otherwise.
    """
    if deployment_language == DeploymentLanguage.PYTHON and not is_cross_language:
        # Python client deploying Python replicas.
        return True
    elif deployment_language == DeploymentLanguage.JAVA and is_cross_language:
        # Python client deploying Java replicas,
        # using xlang serialization via cloudpickle.
        return True
    else:
        return False


def _proto_to_dict(proto: Message) -> Dict:
    """Recursively convert a protobuf into a Python dictionary.

    This is an alternative to protobuf's `MessageToDict`. Unlike
    `MessageToDict`, this function doesn't add an extra base64
    encoding to bytes when constructing a json response.

    Fields that are set are copied (nested messages converted recursively,
    repeated fields converted to plain lists). Unset non-message fields that
    are not part of a oneof are then filled with their descriptor default.
    Unset nested messages and unset oneof members are left out entirely.
    """
    data = {}
    # Fill data with non-empty fields.
    for field, value in proto.ListFields():
        # Handle repeated fields
        if field.label == FieldDescriptor.LABEL_REPEATED:
            # If we don't do this, the repeated field will be a
            # `google.protobuf.internal.containers.RepeatedScalarFieldContainer`
            # (or the message equivalent); explicitly convert to a list.
            if field.type == FieldDescriptor.TYPE_MESSAGE:
                # Convert each item.
                data[field.name] = [_proto_to_dict(v) for v in value]
            else:
                # Convert to list directly.
                data[field.name] = list(value)
        # Recursively call if the field is another protobuf.
        elif field.type == FieldDescriptor.TYPE_MESSAGE:
            data[field.name] = _proto_to_dict(value)
        else:
            data[field.name] = value

    # Fill data default values.
    for field in proto.DESCRIPTOR.fields:
        if (
            field.name not in data  # skip the fields that are already set
            and field.type != FieldDescriptor.TYPE_MESSAGE  # skip nested messages
            and not field.containing_oneof  # skip optional fields
        ):
            data[field.name] = field.default_value

    return data


class DeploymentConfig(BaseModel):
    """Internal datastructure wrapping config options for a deployment.

    Args:
        num_replicas: The number of processes to start up that
            handles requests to this deployment. Defaults to 1.
        max_ongoing_requests: The maximum number of queries
            that is sent to a replica of this deployment without receiving
            a response. Defaults to DEFAULT_MAX_ONGOING_REQUESTS.
        max_queued_requests: Maximum number of requests to this deployment that will
            be queued at each *caller* (proxy or DeploymentHandle). Once this limit is
            reached, subsequent requests will raise a BackPressureError (for handles) or
            return an HTTP 503 status code (for HTTP requests). Defaults to -1 (no
            limit).
        user_config: Arguments to pass to the reconfigure method of the
            deployment. The reconfigure method is called if user_config
            is not None. Must be JSON-serializable; `bytes` values are
            accepted without the JSON check (this is what `from_proto`
            produces for deployments that don't use pickling).
        graceful_shutdown_wait_loop_s: Duration
            that deployment replicas wait until there is no more work to
            be done before shutting down.
        graceful_shutdown_timeout_s: Controller waits for this duration
            to forcefully kill the replica for shutdown.
        health_check_period_s: Frequency at which the controller health
            checks replicas.
        health_check_timeout_s: Timeout that the controller waits for a
            response from the replica's health check before marking it
            unhealthy.
        autoscaling_config: Autoscaling configuration.
        logging_config: Configuration for deployment logs.
        user_configured_option_names: The names of options manually
            configured by the user.
        request_router_config: Configuration for deployment request router.
        max_constructor_retry_count: Maximum number of times to retry the
            deployment constructor. Defaults to DEFAULT_CONSTRUCTOR_RETRY_COUNT.
        is_cross_language: Whether the deployment was created from a client
            in a different language than its replicas. Defaults to False.
        deployment_language: Language of the deployment's replicas. Defaults
            to DeploymentLanguage.PYTHON.
        version: Optional user-provided version string. Defaults to None.

    Each option's `update_type` records which kind of update a change to
    that option requires (see DeploymentOptionUpdateType).
    """

    num_replicas: Optional[NonNegativeInt] = Field(
        default=1, update_type=DeploymentOptionUpdateType.LightWeight
    )
    max_ongoing_requests: PositiveInt = Field(
        default=DEFAULT_MAX_ONGOING_REQUESTS,
        update_type=DeploymentOptionUpdateType.NeedsActorReconfigure,
    )
    max_queued_requests: int = Field(
        default=-1,
        update_type=DeploymentOptionUpdateType.LightWeight,
    )
    user_config: Any = Field(
        default=None, update_type=DeploymentOptionUpdateType.NeedsActorReconfigure
    )

    graceful_shutdown_timeout_s: NonNegativeFloat = Field(
        default=DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_S,
        update_type=DeploymentOptionUpdateType.NeedsReconfigure,
    )
    graceful_shutdown_wait_loop_s: NonNegativeFloat = Field(
        default=DEFAULT_GRACEFUL_SHUTDOWN_WAIT_LOOP_S,
        update_type=DeploymentOptionUpdateType.NeedsActorReconfigure,
    )

    health_check_period_s: PositiveFloat = Field(
        default=DEFAULT_HEALTH_CHECK_PERIOD_S,
        update_type=DeploymentOptionUpdateType.NeedsReconfigure,
    )
    health_check_timeout_s: PositiveFloat = Field(
        default=DEFAULT_HEALTH_CHECK_TIMEOUT_S,
        update_type=DeploymentOptionUpdateType.NeedsReconfigure,
    )

    autoscaling_config: Optional[AutoscalingConfig] = Field(
        default=None, update_type=DeploymentOptionUpdateType.NeedsActorReconfigure
    )

    request_router_config: RequestRouterConfig = Field(
        default_factory=RequestRouterConfig,
        update_type=DeploymentOptionUpdateType.NeedsActorReconfigure,
    )

    # This flag is used to let replica know they are deployed from
    # a different language.
    is_cross_language: bool = False

    # This flag is used to let controller know which language does
    # the deployment use.
    deployment_language: Any = DeploymentLanguage.PYTHON

    version: Optional[str] = Field(
        default=None,
        update_type=DeploymentOptionUpdateType.HeavyWeight,
    )

    logging_config: Optional[dict] = Field(
        default=None,
        update_type=DeploymentOptionUpdateType.NeedsActorReconfigure,
    )

    max_constructor_retry_count: PositiveInt = Field(
        default=DEFAULT_CONSTRUCTOR_RETRY_COUNT,
        update_type=DeploymentOptionUpdateType.NeedsReconfigure,
    )

    # Contains the names of deployment options manually set by the user
    user_configured_option_names: Set[str] = set()

    class Config:
        # Re-run validators when attributes are assigned (from_default
        # relies on this when overriding defaults via setattr).
        validate_assignment = True
        arbitrary_types_allowed = True

    @validator("user_config", always=True)
    def user_config_json_serializable(cls, v):
        """Ensure user_config is JSON-serializable (bytes and None pass through)."""
        if isinstance(v, bytes):
            return v
        if v is not None:
            try:
                json.dumps(v)
            except TypeError as e:
                raise ValueError(
                    f"user_config is not JSON-serializable: {str(e)}."
                ) from e

        return v

    @validator("logging_config", always=True)
    def logging_config_valid(cls, v):
        """Validate logging_config and normalize it through LoggingConfig.

        Raises:
            TypeError: if a non-dict, non-None value is given.
        """
        if v is None:
            return v
        if not isinstance(v, dict):
            raise TypeError(
                f"Got invalid type '{type(v)}' for logging_config. "
                "Expected a dictionary."
            )
        # Handle default value
        from ray.serve.schema import LoggingConfig

        v = LoggingConfig(**v).dict()
        return v

    @validator("max_queued_requests", always=True)
    def validate_max_queued_requests(cls, v):
        """Accept only -1 (no limit) or a positive integer."""
        if not isinstance(v, int):
            raise TypeError("max_queued_requests must be an integer.")

        if v < 1 and v != -1:
            raise ValueError(
                "max_queued_requests must be -1 (no limit) or a positive integer."
            )

        return v

    def needs_pickle(self):
        """Whether user-provided payloads must be (un)pickled with cloudpickle."""
        return _needs_pickle(self.deployment_language, self.is_cross_language)

    def to_proto(self):
        """Serialize this config to a DeploymentConfigProto.

        Python-only payloads (user_config, non-empty request_router_kwargs)
        are cloudpickled when `needs_pickle()` is True.

        Raises:
            ValueError: if non-empty request_router_kwargs are set on a
                deployment that does not use pickling.
        """
        data = self.dict()
        if data.get("user_config") is not None:
            if self.needs_pickle():
                data["user_config"] = cloudpickle.dumps(data["user_config"])
        if data.get("autoscaling_config"):
            # By setting the serialized policy def, on the protobuf level,
            # AutoscalingConfig constructor will not try to import the policy
            # from the string import path when the protobuf is deserialized on
            # the controller side.
            data["autoscaling_config"]["policy"][
                "_serialized_policy_def"
            ] = self.autoscaling_config.policy._serialized_policy_def
            data["autoscaling_config"] = AutoscalingConfigProto(
                **data["autoscaling_config"]
            )
        if data.get("request_router_config"):
            router_kwargs = data["request_router_config"].get("request_router_kwargs")
            if router_kwargs is not None:
                if not router_kwargs:
                    # Empty kwargs are encoded as empty bytes; from_proto maps
                    # b"" back to {}.
                    data["request_router_config"]["request_router_kwargs"] = b""
                elif self.needs_pickle():
                    # Protobuf requires bytes, so we need to pickle
                    data["request_router_config"][
                        "request_router_kwargs"
                    ] = cloudpickle.dumps(router_kwargs)
                else:
                    # Note the trailing space: the two literals are joined.
                    raise ValueError(
                        "Non-empty request_router_kwargs not supported "
                        f"for cross-language deployments. Got: {router_kwargs}"
                    )
            # By setting the serialized request router cls, on the protobuf
            # level, RequestRouterConfig constructor will not try to import the
            # request router cls from the string import path when the protobuf
            # is deserialized on the controller side.
            data["request_router_config"][
                "_serialized_request_router_cls"
            ] = self.request_router_config._serialized_request_router_cls
            data["request_router_config"] = RequestRouterConfigProto(
                **data["request_router_config"]
            )
        if data.get("logging_config"):
            if "encoding" in data["logging_config"]:
                # Enum name (str) -> proto enum value (int).
                data["logging_config"]["encoding"] = EncodingTypeProto.Value(
                    data["logging_config"]["encoding"]
                )

            data["logging_config"] = LoggingConfigProto(**data["logging_config"])
        # Protobuf repeated fields don't accept a set.
        data["user_configured_option_names"] = list(
            data["user_configured_option_names"]
        )
        return DeploymentConfigProto(**data)

    def to_proto_bytes(self):
        """Serialize this config to protobuf wire bytes."""
        return self.to_proto().SerializeToString()

    def to_dict(self):
        # only use for logging purposes
        return self.dict()

    @classmethod
    def from_proto(cls, proto: DeploymentConfigProto):
        """Build a DeploymentConfig from a DeploymentConfigProto.

        Reverses `to_proto`: unpickles payloads when pickling is in use and
        maps protobuf "empty" defaults (b"", "", 0) back to None / {} where
        the Python config expects them.
        """
        data = _proto_to_dict(proto)
        deployment_language = data.get(
            "deployment_language", DeploymentLanguage.PYTHON
        )
        is_cross_language = data.get("is_cross_language", False)
        needs_pickle = _needs_pickle(deployment_language, is_cross_language)
        if "user_config" in data:
            if data["user_config"] != b"":
                if needs_pickle:
                    data["user_config"] = cloudpickle.loads(proto.user_config)
                else:
                    data["user_config"] = proto.user_config
            else:
                data["user_config"] = None
        if "request_router_config" in data:
            if "request_router_kwargs" in data["request_router_config"]:
                request_router_kwargs = data["request_router_config"][
                    "request_router_kwargs"
                ]
                if request_router_kwargs != b"":
                    if needs_pickle:
                        data["request_router_config"][
                            "request_router_kwargs"
                        ] = cloudpickle.loads(
                            proto.request_router_config.request_router_kwargs
                        )
                    else:
                        data["request_router_config"][
                            "request_router_kwargs"
                        ] = proto.request_router_config.request_router_kwargs
                else:
                    data["request_router_config"]["request_router_kwargs"] = {}
            data["request_router_config"] = RequestRouterConfig(
                **data["request_router_config"]
            )
        if "autoscaling_config" in data:
            autoscaling = data["autoscaling_config"]
            # Unset numeric proto fields decode as 0; treat any falsy value
            # as "not configured" so AutoscalingConfig applies its own logic.
            for optional_field in (
                "upscale_smoothing_factor",
                "downscale_smoothing_factor",
                "upscaling_factor",
                "downscaling_factor",
                "target_ongoing_requests",
            ):
                if not autoscaling.get(optional_field):
                    autoscaling[optional_field] = None
            if not autoscaling.get("aggregation_function"):
                autoscaling["aggregation_function"] = AggregationFunction.MEAN
            data["autoscaling_config"] = AutoscalingConfig(**autoscaling)
        if "version" in data:
            if data["version"] == "":
                data["version"] = None
        if "user_configured_option_names" in data:
            data["user_configured_option_names"] = set(
                data["user_configured_option_names"]
            )
        if "logging_config" in data:
            if "encoding" in data["logging_config"]:
                # Proto enum value (int) -> enum name (str).
                data["logging_config"]["encoding"] = EncodingTypeProto.Name(
                    data["logging_config"]["encoding"]
                )

        return cls(**data)

    @classmethod
    def from_proto_bytes(cls, proto_bytes: bytes):
        """Build a DeploymentConfig from protobuf wire bytes."""
        proto = DeploymentConfigProto.FromString(proto_bytes)
        return cls.from_proto(proto)

    @classmethod
    def from_default(cls, **kwargs):
        """Creates a default DeploymentConfig and overrides it with kwargs.

        Ignores any kwargs set to DEFAULT.VALUE.

        Raises:
            TypeError: when a keyword that's not an argument to the class is
                passed in.
        """

        config = cls()
        valid_config_options = set(config.dict().keys())

        # Friendly error if a non-DeploymentConfig kwarg was passed in
        for key, val in kwargs.items():
            if key not in valid_config_options:
                raise TypeError(
                    f'Got invalid Deployment config option "{key}" '
                    f"(with value {val}) as keyword argument. All Deployment "
                    "config options must come from this list: "
                    f"{list(valid_config_options)}."
                )

        kwargs = {key: val for key, val in kwargs.items() if val != DEFAULT.VALUE}

        # Assign one by one so validate_assignment runs each field validator.
        for key, val in kwargs.items():
            setattr(config, key, val)

        return config


def handle_num_replicas_auto(
    max_ongoing_requests: Union[int, DEFAULT],
    autoscaling_config: Optional[Union[Dict, AutoscalingConfig, DEFAULT]],
):
    """Return modified `max_ongoing_requests` and `autoscaling_config`
    for when num_replicas="auto".

    If `autoscaling_config` is unspecified, returns the modified value
    AutoscalingConfig.default().
    If it is specified, the specified fields in `autoscaling_config`
    override that of AutoscalingConfig.default().

    `max_ongoing_requests` is currently returned unchanged.
    """

    if autoscaling_config in [DEFAULT.VALUE, None]:
        # If autoscaling config wasn't specified, use default
        # configuration
        autoscaling_config = AutoscalingConfig.default()
    else:
        # If autoscaling config was specified, values specified in
        # autoscaling config overrides the default configuration
        default_config = AutoscalingConfig.default().dict(exclude_unset=True)
        autoscaling_config = (
            autoscaling_config
            if isinstance(autoscaling_config, dict)
            else autoscaling_config.dict(exclude_unset=True)
        )
        default_config.update(autoscaling_config)
        autoscaling_config = AutoscalingConfig(**default_config)

    return max_ongoing_requests, autoscaling_config


class ReplicaConfig:
    """Internal datastructure wrapping config options for a deployment's replicas.

    Provides five main properties (see property docstrings for more info):
        deployment_def: the code, or a reference to the code, that this
            replica should run.
        init_args: the deployment_def's init_args.
        init_kwargs: the deployment_def's init_kwargs.
        ray_actor_options: the Ray actor options to pass into the replica's
            actor.
        resource_dict: contains info on this replica's actor's resource needs.

    Offers a serialized equivalent (e.g. serialized_deployment_def) for
    deployment_def, init_args, and init_kwargs. Deserializes these properties
    when they're first accessed, if they were not passed in directly through
    create().

    Use the classmethod create() to make a ReplicaConfig with the deserialized
    properties.

    Note: overwriting or setting any property after the ReplicaConfig has been
    constructed is currently undefined behavior. The config's fields should not
    be modified externally after it is created.
    """

    def __init__(
        self,
        deployment_def_name: str,
        serialized_deployment_def: bytes,
        serialized_init_args: Optional[bytes],
        serialized_init_kwargs: Optional[bytes],
        ray_actor_options: Dict,
        placement_group_bundles: Optional[List[Dict[str, float]]] = None,
        placement_group_strategy: Optional[str] = None,
        placement_group_bundle_label_selector: Optional[List[Dict[str, str]]] = None,
        placement_group_fallback_strategy: Optional[List[Dict[str, Any]]] = None,
        max_replicas_per_node: Optional[int] = None,
        needs_pickle: bool = True,
    ):
        """Construct a ReplicaConfig with serialized properties.

        `deployment_def_name`, the three serialized payloads and
        `ray_actor_options` must be given; the placement-group options and
        `max_replicas_per_node` default to None and `needs_pickle` to True.
        See classmethod create() for building one from unserialized values.

        Raises:
            TypeError / ValueError: if the actor or placement-group options
                fail validation (see the `_validate_*` methods).
        """

        self.deployment_def_name = deployment_def_name

        # Store serialized versions of code properties.
        self.serialized_deployment_def = serialized_deployment_def
        self.serialized_init_args = serialized_init_args
        self.serialized_init_kwargs = serialized_init_kwargs

        # Deserialize properties when first accessed. See @property methods.
        self._deployment_def = None
        self._init_args = None
        self._init_kwargs = None

        # Configure ray_actor_options. These are the Ray options ultimately
        # passed into the replica's actor when it's created.
        self.ray_actor_options = ray_actor_options

        self.placement_group_bundles = placement_group_bundles
        self.placement_group_strategy = placement_group_strategy
        self.placement_group_bundle_label_selector = (
            placement_group_bundle_label_selector
        )
        self.placement_group_fallback_strategy = placement_group_fallback_strategy

        self.max_replicas_per_node = max_replicas_per_node

        self._normalize_bundle_label_selector()
        self._validate()

        # Create resource_dict. This contains info about the replica's resource
        # needs. It does NOT set the replica's resource usage. That's done by
        # the ray_actor_options.
        self.resource_dict = resources_from_ray_options(self.ray_actor_options)
        self.needs_pickle = needs_pickle

    def _normalize_bundle_label_selector(self):
        """If a single selector is provided for multiple bundles, it is broadcasted
        uniformly to all bundles.
        """
        if (
            self.placement_group_bundles
            and self.placement_group_bundle_label_selector
            and len(self.placement_group_bundle_label_selector) == 1
            and len(self.placement_group_bundles) > 1
        ):
            single_selector = self.placement_group_bundle_label_selector[0]
            # Copy per bundle so the entries don't alias one dict.
            self.placement_group_bundle_label_selector = [
                single_selector.copy() for _ in range(len(self.placement_group_bundles))
            ]

    def _validate(self):
        """Run all option validators and cross-option consistency checks."""
        self._validate_ray_actor_options()
        self._validate_placement_group_options()
        self._validate_max_replicas_per_node()

        if (
            self.max_replicas_per_node is not None
            and self.placement_group_bundles is not None
        ):
            raise ValueError(
                "Setting max_replicas_per_node is not allowed when "
                "placement_group_bundles is provided."
            )

    def update(
        self,
        ray_actor_options: dict,
        placement_group_bundles: Optional[List[Dict[str, float]]] = None,
        placement_group_strategy: Optional[str] = None,
        placement_group_bundle_label_selector: Optional[List[Dict[str, str]]] = None,
        placement_group_fallback_strategy: Optional[List[Dict[str, Any]]] = None,
        max_replicas_per_node: Optional[int] = None,
    ):
        """Replace the actor/placement options, re-validate them and
        recompute resource_dict. Serialized code payloads are untouched.
        """
        self.ray_actor_options = ray_actor_options

        self.placement_group_bundles = placement_group_bundles
        self.placement_group_strategy = placement_group_strategy
        self.placement_group_bundle_label_selector = (
            placement_group_bundle_label_selector
        )
        self.placement_group_fallback_strategy = placement_group_fallback_strategy

        self.max_replicas_per_node = max_replicas_per_node

        self._normalize_bundle_label_selector()
        self._validate()

        self.resource_dict = resources_from_ray_options(self.ray_actor_options)

    @classmethod
    def create(
        cls,
        deployment_def: Union[Callable, str],
        init_args: Optional[Tuple[Any]] = None,
        init_kwargs: Optional[Dict[Any, Any]] = None,
        ray_actor_options: Optional[Dict] = None,
        placement_group_bundles: Optional[List[Dict[str, float]]] = None,
        placement_group_strategy: Optional[str] = None,
        placement_group_bundle_label_selector: Optional[List[Dict[str, str]]] = None,
        placement_group_fallback_strategy: Optional[List[Dict[str, Any]]] = None,
        max_replicas_per_node: Optional[int] = None,
        deployment_def_name: Optional[str] = None,
    ):
        """Create a ReplicaConfig from deserialized parameters.

        The code and init arguments are serialized with `pickle_dumps`, and
        the unserialized values are cached on the instance so the lazy
        properties don't need to deserialize them again.

        Defaults: init_args -> (), init_kwargs -> {}, ray_actor_options -> {},
        deployment_def_name -> the string itself or `deployment_def.__name__`.

        Raises:
            TypeError: if deployment_def is neither callable nor a string, or
                init_args / init_kwargs have the wrong type.
            ValueError: if init_args / init_kwargs are given for a function
                deployment.
        """

        if not callable(deployment_def) and not isinstance(deployment_def, str):
            raise TypeError("@serve.deployment must be called on a class or function.")

        if not (init_args is None or isinstance(init_args, (tuple, list))):
            raise TypeError("init_args must be a tuple.")

        if not (init_kwargs is None or isinstance(init_kwargs, dict)):
            raise TypeError("init_kwargs must be a dict.")

        if inspect.isfunction(deployment_def):
            if init_args:
                raise ValueError("init_args not supported for function deployments.")
            elif init_kwargs:
                raise ValueError("init_kwargs not supported for function deployments.")

        # Defensive re-check; effectively covered by the callable()/str check.
        if not isinstance(deployment_def, (Callable, str)):
            raise TypeError(
                f'Got invalid type "{type(deployment_def)}" for '
                "deployment_def. Expected deployment_def to be a "
                "class, function, or string."
            )
        # Set defaults
        if init_args is None:
            init_args = ()
        if init_kwargs is None:
            init_kwargs = {}
        if ray_actor_options is None:
            ray_actor_options = {}
        if deployment_def_name is None:
            if isinstance(deployment_def, str):
                deployment_def_name = deployment_def
            else:
                deployment_def_name = deployment_def.__name__

        config = cls(
            deployment_def_name=deployment_def_name,
            serialized_deployment_def=pickle_dumps(
                deployment_def,
                f"Could not serialize the deployment {repr(deployment_def)}",
            ),
            serialized_init_args=pickle_dumps(
                init_args, "Could not serialize the deployment init args"
            ),
            serialized_init_kwargs=pickle_dumps(
                init_kwargs, "Could not serialize the deployment init kwargs"
            ),
            ray_actor_options=ray_actor_options,
            placement_group_bundles=placement_group_bundles,
            placement_group_strategy=placement_group_strategy,
            placement_group_bundle_label_selector=placement_group_bundle_label_selector,
            placement_group_fallback_strategy=placement_group_fallback_strategy,
            max_replicas_per_node=max_replicas_per_node,
        )

        # Cache the unserialized values so the lazy properties skip
        # deserialization.
        config._deployment_def = deployment_def
        config._init_args = init_args
        config._init_kwargs = init_kwargs

        return config

    def _validate_ray_actor_options(self):
        """Check ray_actor_options keys/values and default num_cpus to 1.

        Note: the num_cpus default is written into `self.ray_actor_options`
        in place, so a dict passed in by the caller is mutated.
        """
        if not isinstance(self.ray_actor_options, dict):
            raise TypeError(
                f'Got invalid type "{type(self.ray_actor_options)}" for '
                "ray_actor_options. Expected a dictionary."
            )
        # Please keep this in sync with the docstring for the ray_actor_options
        # kwarg in api.py.
        allowed_ray_actor_options = {
            # Resource options
            "accelerator_type",
            "memory",
            "num_cpus",
            "num_gpus",
            "resources",
            # Other options
            "runtime_env",
            "label_selector",
            "fallback_strategy",
        }

        for option in self.ray_actor_options:
            if option not in allowed_ray_actor_options:
                raise ValueError(
                    f"Specifying '{option}' in ray_actor_options is not allowed. "
                    f"Allowed options: {allowed_ray_actor_options}"
                )
        ray_option_utils.validate_actor_options(self.ray_actor_options, in_options=True)

        # Set Serve replica defaults
        if self.ray_actor_options.get("num_cpus") is None:
            self.ray_actor_options["num_cpus"] = 1

    def _validate_max_replicas_per_node(self) -> None:
        """Allow None or an int in [1, MAX_REPLICAS_PER_NODE_MAX_VALUE]."""
        if self.max_replicas_per_node is None:
            return
        if not isinstance(self.max_replicas_per_node, int):
            raise TypeError(
                f"Got invalid type '{type(self.max_replicas_per_node)}' for "
                "max_replicas_per_node. Expected None or an integer "
                f"in the range of [1, {MAX_REPLICAS_PER_NODE_MAX_VALUE}]."
            )
        if (
            self.max_replicas_per_node < 1
            or self.max_replicas_per_node > MAX_REPLICAS_PER_NODE_MAX_VALUE
        ):
            raise ValueError(
                f"Invalid max_replicas_per_node {self.max_replicas_per_node}. "
                "Valid values are None or an integer "
                f"in the range of [1, {MAX_REPLICAS_PER_NODE_MAX_VALUE}]."
            )

    def _validate_placement_group_options(self) -> None:
        """Validate placement-group options and their fit with the actor.

        Strategy, fallback strategy and bundle label selectors all require
        `placement_group_bundles`. When bundles are given, the replica actor's
        CPU/GPU/custom resources must fit in the first bundle.
        """
        if self.placement_group_strategy is not None:
            if self.placement_group_bundles is None:
                raise ValueError(
                    "If `placement_group_strategy` is provided, "
                    "`placement_group_bundles` must also be provided."
                )

        if self.placement_group_fallback_strategy is not None:
            if self.placement_group_bundles is None:
                raise ValueError(
                    "If `placement_group_fallback_strategy` is provided, "
                    "`placement_group_bundles` must also be provided."
                )
            if not isinstance(self.placement_group_fallback_strategy, list):
                raise TypeError(
                    "placement_group_fallback_strategy must be a list of dictionaries. "
                    f"Got: {type(self.placement_group_fallback_strategy)}."
                )
            for i, strategy in enumerate(self.placement_group_fallback_strategy):
                if not isinstance(strategy, dict):
                    raise TypeError(
                        f"placement_group_fallback_strategy entry at index {i} must be a dictionary. "
                        f"Got: {type(strategy)}."
                    )

        if self.placement_group_bundle_label_selector is not None:
            if self.placement_group_bundles is None:
                raise ValueError(
                    "If `placement_group_bundle_label_selector` is provided, "
                    "`placement_group_bundles` must also be provided."
                )

        if self.placement_group_bundles is not None:
            validate_placement_group(
                bundles=self.placement_group_bundles,
                strategy=self.placement_group_strategy or "PACK",
                lifetime="detached",
                bundle_label_selector=self.placement_group_bundle_label_selector,
            )

            resource_error_prefix = (
                "When using `placement_group_bundles`, the replica actor "
                "will be placed in the first bundle, so the resource "
                "requirements for the actor must be a subset of the first "
                "bundle."
            )

            first_bundle = self.placement_group_bundles[0]

            # Validate that the replica actor fits in the first bundle.
            bundle_cpu = first_bundle.get("CPU", 0)
            replica_actor_num_cpus = self.ray_actor_options.get("num_cpus", 0)
            if bundle_cpu < replica_actor_num_cpus:
                raise ValueError(
                    f"{resource_error_prefix} `num_cpus` for the actor is "
                    f"{replica_actor_num_cpus}, but the bundle only has "
                    f"{bundle_cpu} `CPU` specified."
                )

            bundle_gpu = first_bundle.get("GPU", 0)
            replica_actor_num_gpus = self.ray_actor_options.get("num_gpus", 0)
            if bundle_gpu < replica_actor_num_gpus:
                raise ValueError(
                    f"{resource_error_prefix} `num_gpus` for the actor is "
                    f"{replica_actor_num_gpus}, but the bundle only has "
                    f"{bundle_gpu} `GPU` specified."
                )

            replica_actor_resources = self.ray_actor_options.get("resources", {})
            for actor_resource, actor_value in replica_actor_resources.items():
                bundle_value = first_bundle.get(actor_resource, 0)
                if bundle_value < actor_value:
                    raise ValueError(
                        f"{resource_error_prefix} `{actor_resource}` requirement "
                        f"for the actor is {actor_value}, but the bundle only "
                        f"has {bundle_value} `{actor_resource}` specified."
                    )

    @property
    def deployment_def(self) -> Union[Callable, str]:
        """The code, or a reference to the code, that this replica runs.

        For Python replicas, this can be one of the following:
            - Function (Callable)
            - Class (Callable)
            - Import path (str)

        For Java replicas, this can be one of the following:
            - Class path (str)
        """
        if self._deployment_def is None:
            if self.needs_pickle:
                self._deployment_def = cloudpickle.loads(self.serialized_deployment_def)
            else:
                self._deployment_def = self.serialized_deployment_def.decode(
                    encoding="utf-8"
                )

        return self._deployment_def

    @property
    def init_args(self) -> Optional[Union[Tuple[Any], bytes]]:
        """The init_args for a Python class.

        This property is only meaningful if deployment_def is a Python class.
        Otherwise, it is None. Without pickling, the raw serialized bytes are
        returned.
        """
        if self._init_args is None:
            if self.needs_pickle:
                self._init_args = cloudpickle.loads(self.serialized_init_args)
            else:
                self._init_args = self.serialized_init_args

        return self._init_args

    @property
    def init_kwargs(self) -> Optional[Union[Dict[Any, Any], bytes]]:
        """The init_kwargs for a Python class.

        This property is only meaningful if deployment_def is a Python class.
        Otherwise, it is None. Without pickling, the raw serialized bytes are
        returned (mirroring `init_args`).
        """
        if self._init_kwargs is None:
            if self.needs_pickle:
                self._init_kwargs = cloudpickle.loads(self.serialized_init_kwargs)
            else:
                # Non-pickled (e.g. Java) payloads are not cloudpickle data,
                # and may be None when from_proto saw an empty field.
                self._init_kwargs = self.serialized_init_kwargs

        return self._init_kwargs

    @classmethod
    def from_proto(cls, proto: ReplicaConfigProto, needs_pickle: bool = True):
        """Build a ReplicaConfig from a ReplicaConfigProto.

        Protobuf empty values (b"", "", 0) are mapped back to None; JSON
        string fields are decoded.
        """
        return cls(
            deployment_def_name=proto.deployment_def_name,
            serialized_deployment_def=proto.deployment_def,
            serialized_init_args=(proto.init_args if proto.init_args != b"" else None),
            serialized_init_kwargs=(
                proto.init_kwargs if proto.init_kwargs != b"" else None
            ),
            ray_actor_options=json.loads(proto.ray_actor_options),
            placement_group_bundles=(
                json.loads(proto.placement_group_bundles)
                if proto.placement_group_bundles
                else None
            ),
            placement_group_strategy=(
                proto.placement_group_strategy
                if proto.placement_group_strategy != ""
                else None
            ),
            placement_group_bundle_label_selector=(
                json.loads(proto.placement_group_bundle_label_selector)
                if proto.placement_group_bundle_label_selector
                else None
            ),
            placement_group_fallback_strategy=(
                json.loads(proto.placement_group_fallback_strategy)
                if proto.placement_group_fallback_strategy
                else None
            ),
            max_replicas_per_node=(
                proto.max_replicas_per_node if proto.max_replicas_per_node else None
            ),
            needs_pickle=needs_pickle,
        )

    @classmethod
    def from_proto_bytes(cls, proto_bytes: bytes, needs_pickle: bool = True):
        """Build a ReplicaConfig from protobuf wire bytes."""
        proto = ReplicaConfigProto.FromString(proto_bytes)
        return cls.from_proto(proto, needs_pickle)

    def to_proto(self):
        """Serialize to a ReplicaConfigProto.

        List/dict options are JSON-encoded; None is encoded as "" (strings)
        or 0 (max_replicas_per_node), which `from_proto` maps back to None.
        """
        placement_group_bundles = (
            json.dumps(self.placement_group_bundles)
            if self.placement_group_bundles is not None
            else ""
        )

        bundle_label_selector = (
            json.dumps(self.placement_group_bundle_label_selector)
            if self.placement_group_bundle_label_selector is not None
            else ""
        )

        fallback_strategy = (
            json.dumps(self.placement_group_fallback_strategy)
            if self.placement_group_fallback_strategy is not None
            else ""
        )

        max_replicas_per_node = (
            self.max_replicas_per_node if self.max_replicas_per_node is not None else 0
        )

        # NOTE(review): init_args/init_kwargs/placement_group_strategy may be
        # None here; this relies on the proto constructor treating None as
        # "unset" — confirm against the protobuf runtime in use.
        return ReplicaConfigProto(
            deployment_def_name=self.deployment_def_name,
            deployment_def=self.serialized_deployment_def,
            init_args=self.serialized_init_args,
            init_kwargs=self.serialized_init_kwargs,
            ray_actor_options=json.dumps(self.ray_actor_options),
            placement_group_bundles=placement_group_bundles,
            placement_group_strategy=self.placement_group_strategy,
            placement_group_bundle_label_selector=bundle_label_selector,
            placement_group_fallback_strategy=fallback_strategy,
            max_replicas_per_node=max_replicas_per_node,
        )

    def to_proto_bytes(self):
        """Serialize to protobuf wire bytes."""
        return self.to_proto().SerializeToString()

    def to_dict(self):
        # only use for logging purposes
        return {
            "deployment_def_name": self.deployment_def_name,
            "ray_actor_options": self.ray_actor_options,
            "placement_group_bundles": self.placement_group_bundles,
            "placement_group_strategy": self.placement_group_strategy,
            "placement_group_bundle_label_selector": self.placement_group_bundle_label_selector,
            "placement_group_fallback_strategy": self.placement_group_fallback_strategy,
            "max_replicas_per_node": self.max_replicas_per_node,
        }


def prepare_imperative_http_options(
    proxy_location: Union[None, str, ProxyLocation],
    http_options: Union[None, dict, HTTPOptions],
) -> HTTPOptions:
    """Prepare `HTTPOptions` with a resolved `location` based on `proxy_location` and `http_options`.

    Precedence:
    - If `proxy_location` is provided, it overrides any `location` in `http_options`.
    - Else if `http_options` specifies a `location` explicitly (HTTPOptions(...) or
      dict with 'location'), keep it.
    - Else (no `proxy_location` and no explicit `location`) set `location` to
      `DeploymentMode.EveryNode`. A bare `HTTPOptions()` counts as an explicit
      default (`HeadOnly`).

    Args:
        proxy_location: Optional ProxyLocation (or its string representation).
        http_options: Optional HTTPOptions instance or dict. If None, a new
            HTTPOptions() is created.

    Returns:
        HTTPOptions: New instance with resolved location.

    Note:
        1. Default ProxyLocation (when unspecified) resolves to DeploymentMode.EveryNode.
        2. Default HTTPOptions() location is DeploymentMode.HeadOnly.
        3. `HTTPOptions` is used in `imperative` mode (Python API) cluster set-up.
           `Declarative` mode (CLI / REST) uses `HTTPOptionsSchema`.

    Raises:
        ValueError: If http_options is not None, dict, or HTTPOptions.
    """
    if http_options is None:
        location_set_explicitly = False
        http_options = HTTPOptions()
    elif isinstance(http_options, dict):
        location_set_explicitly = "location" in http_options
        http_options = HTTPOptions(**http_options)
    elif isinstance(http_options, HTTPOptions):
        # empty `HTTPOptions()` is considered as user specified the default
        # location value `HeadOnly` explicitly
        location_set_explicitly = True
        # Copy so the caller's instance is not mutated below.
        http_options = HTTPOptions(**http_options.dict(exclude_unset=True))
    else:
        raise ValueError(
            f"Unexpected type for http_options: `{type(http_options).__name__}`"
        )

    if proxy_location is None:
        if not location_set_explicitly:
            http_options.location = DeploymentMode.EveryNode
    else:
        http_options.location = ProxyLocation._to_deployment_mode(proxy_location)

    return http_options