| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281 |
- import inspect
- import logging
- from types import FunctionType
- from typing import Any, Dict, Union
- import ray
- from ray._common.pydantic_compat import is_subclass_of_base_model
- from ray._common.usage import usage_lib
- from ray.actor import ActorHandle
- from ray.serve._private.client import ServeControllerClient
- from ray.serve._private.constants import (
- HTTP_PROXY_TIMEOUT,
- SERVE_LOGGER_NAME,
- SERVE_NAMESPACE,
- )
- from ray.serve._private.default_impl import get_controller_impl
- from ray.serve.config import HTTPOptions, gRPCOptions
- from ray.serve.context import _get_global_client, _set_global_client
- from ray.serve.deployment import Application
- from ray.serve.exceptions import RayServeException
- from ray.serve.schema import LoggingConfig
- logger = logging.getLogger(SERVE_LOGGER_NAME)
def _check_http_options(
    client: ServeControllerClient, http_options: Union[dict, HTTPOptions]
) -> None:
    """Warn when requested HTTP options differ from the running client's.

    Nothing is applied or changed: the requested options are only compared,
    field by field, against ``client.http_config`` and any mismatching field
    names are reported in a single warning.

    Args:
        client: Client whose ``http_config`` is used as the reference.
        http_options: Requested options, given either as an ``HTTPOptions``
            object or as a dict parsed with ``HTTPOptions.parse_obj``. A falsy
            value makes this function a no-op.
    """
    if not http_options:
        return

    current = client.http_config
    if isinstance(http_options, HTTPOptions):
        requested = http_options
    else:
        requested = HTTPOptions.parse_obj(http_options)

    # Field names come from the requested options object's instance dict.
    mismatched = [
        name
        for name in requested.__dict__
        if getattr(requested, name) != getattr(current, name)
    ]
    if mismatched:
        logger.warning(
            "The new client HTTP config differs from the existing one "
            f"in the following fields: {mismatched}. "
            "The new HTTP config is ignored."
        )
def _start_controller(
    http_options: Union[None, dict, HTTPOptions] = None,
    grpc_options: Union[None, dict, gRPCOptions] = None,
    global_logging_config: Union[None, dict, LoggingConfig] = None,
    **kwargs,
) -> ActorHandle:
    """Launch the Ray Serve controller actor and wait for its proxies.

    Ray is initialized in the Serve namespace when it is not initialized yet.
    Dict-valued options are converted to their config objects before the
    controller is created. If the controller reports any proxies, this call
    blocks until all of them are ready or ``HTTP_PROXY_TIMEOUT`` seconds pass.

    Parameters are the same as ray.serve._private.api.serve_start().

    Returns:
        The controller actor handle.

    Raises:
        ValueError: If one of the deprecated ``http_host``, ``http_port`` or
            ``http_middlewares`` keyword arguments is passed.
        TimeoutError: If the proxies are not ready within
            ``HTTP_PROXY_TIMEOUT`` seconds.
    """
    # Clear the worker's per-job log filter flag, then bring Ray up if needed.
    ray._private.worker.global_worker._filter_logs_by_job = False
    if not ray.is_initialized():
        ray.init(namespace=SERVE_NAMESPACE)

    # Legacy http proxy actor check: these keywords moved into http_options.
    for deprecated in ("http_host", "http_port", "http_middlewares"):
        if deprecated not in kwargs:
            continue
        raise ValueError(
            f"{deprecated} is deprecated, please use serve.start(http_options="
            f'{{"{deprecated}": {kwargs[deprecated]}}}) instead.'
        )

    # Normalize every option to its config object (grpc_options may stay None).
    if http_options is None:
        http_options = HTTPOptions()
    elif isinstance(http_options, dict):
        http_options = HTTPOptions.parse_obj(http_options)

    if isinstance(grpc_options, dict):
        grpc_options = gRPCOptions(**grpc_options)

    if isinstance(global_logging_config, dict):
        global_logging_config = LoggingConfig(**global_logging_config)
    elif global_logging_config is None:
        global_logging_config = LoggingConfig()

    controller = get_controller_impl().remote(
        http_options=http_options,
        grpc_options=grpc_options,
        global_logging_config=global_logging_config,
    )

    # Block until every proxy the controller knows about reports ready.
    proxies = ray.get(controller.get_proxies.remote())
    if proxies:
        try:
            ready_refs = [proxy.ready.remote() for proxy in proxies.values()]
            ray.get(ready_refs, timeout=HTTP_PROXY_TIMEOUT)
        except ray.exceptions.GetTimeoutError:
            raise TimeoutError(
                f"HTTP proxies not available after {HTTP_PROXY_TIMEOUT}s."
            )
    return controller
async def serve_start_async(
    http_options: Union[None, dict, HTTPOptions] = None,
    grpc_options: Union[None, dict, gRPCOptions] = None,
    global_logging_config: Union[None, dict, LoggingConfig] = None,
    **kwargs,
) -> ServeControllerClient:
    """Initialize a Serve instance from async code.

    Behaves like ray.serve._private.api.serve_start(), except that the
    controller start-up routine is submitted as a zero-CPU Ray task and
    awaited instead of being run inline.

    This function is not thread-safe. The caller should hold an async lock
    around it when starting the Serve instance asynchronously.

    Parameters & Returns are the same as
    ray.serve._private.api.serve_start().
    """
    usage_lib.record_library_usage("serve")

    try:
        client = _get_global_client(_health_check_controller=True)
        logger.info(
            f'Connecting to existing Serve app in namespace "{SERVE_NAMESPACE}".'
            " New http options will not be applied."
        )
        if http_options:
            _check_http_options(client, http_options)
    except RayServeException:
        # No usable existing client; fall through and start a new controller.
        pass
    else:
        return client

    start_task = ray.remote(_start_controller).options(num_cpus=0)
    controller = await start_task.remote(
        http_options, grpc_options, global_logging_config, **kwargs
    )

    client = ServeControllerClient(controller)
    _set_global_client(client)
    logger.info(f'Started Serve in namespace "{SERVE_NAMESPACE}".')
    return client
def serve_start(
    http_options: Union[None, dict, HTTPOptions] = None,
    grpc_options: Union[None, dict, gRPCOptions] = None,
    global_logging_config: Union[None, dict, LoggingConfig] = None,
    **kwargs,
) -> ServeControllerClient:
    """Initialize a serve instance.

    If a healthy global client already exists it is returned as-is; the given
    HTTP options are then only compared against the running configuration
    and are not applied. Otherwise a new controller is started and a client
    for it is registered as the global client.

    By default, the instance will be scoped to the lifetime of the returned
    Client object (or when the script exits). This is only relevant if
    connecting to a long-running Ray cluster (e.g., with
    ray.init(address="auto") or ray.init("ray://<remote_addr>")).

    Args:
        http_options (Optional[Dict, serve.HTTPOptions]): Configuration options
            for HTTP proxy. You can pass in a dictionary or HTTPOptions object
            with fields:

            - host(str, None): Host for HTTP servers to listen on. Defaults to
              "127.0.0.1". To expose Serve publicly, you probably want to set
              this to "0.0.0.0".
            - port(int): Port for HTTP server. Defaults to 8000.
            - root_path(str): Root path to mount the serve application
              (for example, "/serve"). All deployment routes will be prefixed
              with this path. Defaults to "".
            - middlewares(list): A list of Starlette middlewares that will be
              applied to the HTTP servers in the cluster. Defaults to [].
            - location(str, serve.config.DeploymentMode): The deployment
              location of HTTP servers:

                - "HeadOnly": start one HTTP server on the head node. Serve
                  assumes the head node is the node you executed serve.start
                  on. This is the default.
                - "EveryNode": start one HTTP server per node.
                - "NoServer" or None: disable HTTP server.
            - num_cpus (int): The number of CPU cores to reserve for each
              internal Serve HTTP proxy actor. Defaults to 0.
        grpc_options: [Experimental] Configuration options for gRPC proxy.
            You can pass in a gRPCOptions object with fields:

            - port(int): Port for gRPC server. Defaults to 9000.
            - grpc_servicer_functions(list): List of import paths for gRPC
              `add_servicer_to_server` functions to add to Serve's gRPC proxy.
              Default empty list, meaning not to start the gRPC server.
        global_logging_config: Forwarded to `_start_controller`, which accepts
            a dict or a LoggingConfig object.
        **kwargs: Forwarded to `_start_controller`.

    Returns:
        The existing global client, or a new client for the controller that
        was just started.
    """
    usage_lib.record_library_usage("serve")

    try:
        client = _get_global_client(_health_check_controller=True)
        logger.info(
            f'Connecting to existing Serve app in namespace "{SERVE_NAMESPACE}".'
            " New http options will not be applied."
        )
        if http_options:
            _check_http_options(client, http_options)
    except RayServeException:
        # No usable existing client; fall through and start a new controller.
        pass
    else:
        return client

    new_controller = _start_controller(
        http_options, grpc_options, global_logging_config, **kwargs
    )
    client = ServeControllerClient(new_controller)
    _set_global_client(client)
    logger.info(f'Started Serve in namespace "{SERVE_NAMESPACE}".')
    return client
def call_user_app_builder_with_args_if_necessary(
    builder: Union[Application, FunctionType],
    args: Dict[str, Any],
) -> Application:
    """Calls a user-provided function that returns Serve application.

    If an Application object is passed, this is a no-op.

    Else, we validate the signature of the function, convert the args dictionary to
    the user-annotated Pydantic model if provided, and call the function.

    The output of the function is returned (must be an Application).

    Args:
        builder: An already built Application, or a plain function taking
            exactly one parameter and returning an Application.
        args: Arguments for the builder function. Must be empty when
            `builder` is already an Application.

    Returns:
        `builder` itself if it is an Application, otherwise the Application
        returned by calling `builder`.

    Raises:
        ValueError: If arguments are passed along with a built Application.
        TypeError: If `builder` is neither an Application nor a function, if
            the function does not take exactly one parameter, or if it does
            not return an Application.
    """
    if isinstance(builder, Application):
        if args:
            raise ValueError(
                "Arguments can only be passed to an application builder function, "
                "not an already built application."
            )
        return builder

    if not isinstance(builder, FunctionType):
        raise TypeError(
            "Expected a built Serve application or an application builder function "
            f"but got: {type(builder)}."
        )

    # Check that the builder only takes a single argument.
    # TODO(edoakes): we may want to loosen this to allow optional kwargs in the future.
    signature = inspect.signature(builder)
    if len(signature.parameters) != 1:
        raise TypeError(
            "Application builder functions should take exactly one parameter, "
            "a dictionary containing the passed arguments."
        )

    # If the sole argument to the builder is a pydantic model, convert the args dict to
    # that model. This will perform standard pydantic validation (e.g., raise an
    # exception if required fields are missing).
    (param,) = signature.parameters.values()
    annotation = param.annotation
    if inspect.isclass(annotation) and is_subclass_of_base_model(annotation):
        args = annotation.parse_obj(args)

    app = builder(args)
    if not isinstance(app, Application):
        raise TypeError(
            "Application builder functions must return an `Application` returned "
            f"from `Deployment.bind()`, but got: {type(app)}."
        )
    return app
|