api.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. import inspect
  2. import logging
  3. from types import FunctionType
  4. from typing import Any, Dict, Union
  5. import ray
  6. from ray._common.pydantic_compat import is_subclass_of_base_model
  7. from ray._common.usage import usage_lib
  8. from ray.actor import ActorHandle
  9. from ray.serve._private.client import ServeControllerClient
  10. from ray.serve._private.constants import (
  11. HTTP_PROXY_TIMEOUT,
  12. SERVE_LOGGER_NAME,
  13. SERVE_NAMESPACE,
  14. )
  15. from ray.serve._private.default_impl import get_controller_impl
  16. from ray.serve.config import HTTPOptions, gRPCOptions
  17. from ray.serve.context import _get_global_client, _set_global_client
  18. from ray.serve.deployment import Application
  19. from ray.serve.exceptions import RayServeException
  20. from ray.serve.schema import LoggingConfig
# Module logger, named with SERVE_LOGGER_NAME from ray.serve._private.constants.
logger = logging.getLogger(SERVE_LOGGER_NAME)
  22. def _check_http_options(
  23. client: ServeControllerClient, http_options: Union[dict, HTTPOptions]
  24. ) -> None:
  25. if http_options:
  26. client_http_options = client.http_config
  27. new_http_options = (
  28. http_options
  29. if isinstance(http_options, HTTPOptions)
  30. else HTTPOptions.parse_obj(http_options)
  31. )
  32. different_fields = []
  33. all_http_option_fields = new_http_options.__dict__
  34. for field in all_http_option_fields:
  35. if getattr(new_http_options, field) != getattr(client_http_options, field):
  36. different_fields.append(field)
  37. if len(different_fields):
  38. logger.warning(
  39. "The new client HTTP config differs from the existing one "
  40. f"in the following fields: {different_fields}. "
  41. "The new HTTP config is ignored."
  42. )
  43. def _start_controller(
  44. http_options: Union[None, dict, HTTPOptions] = None,
  45. grpc_options: Union[None, dict, gRPCOptions] = None,
  46. global_logging_config: Union[None, dict, LoggingConfig] = None,
  47. **kwargs,
  48. ) -> ActorHandle:
  49. """Start Ray Serve controller.
  50. The function makes sure controller is ready to start deploying apps
  51. after it returns.
  52. Parameters are same as ray.serve._private.api.serve_start().
  53. Returns: controller actor handle.
  54. """
  55. # Initialize ray if needed.
  56. ray._private.worker.global_worker._filter_logs_by_job = False
  57. if not ray.is_initialized():
  58. ray.init(namespace=SERVE_NAMESPACE)
  59. # Legacy http proxy actor check
  60. http_deprecated_args = ["http_host", "http_port", "http_middlewares"]
  61. for key in http_deprecated_args:
  62. if key in kwargs:
  63. raise ValueError(
  64. f"{key} is deprecated, please use serve.start(http_options="
  65. f'{{"{key}": {kwargs[key]}}}) instead.'
  66. )
  67. if isinstance(http_options, dict):
  68. http_options = HTTPOptions.parse_obj(http_options)
  69. if http_options is None:
  70. http_options = HTTPOptions()
  71. if isinstance(grpc_options, dict):
  72. grpc_options = gRPCOptions(**grpc_options)
  73. if global_logging_config is None:
  74. global_logging_config = LoggingConfig()
  75. elif isinstance(global_logging_config, dict):
  76. global_logging_config = LoggingConfig(**global_logging_config)
  77. controller_impl = get_controller_impl()
  78. controller = controller_impl.remote(
  79. http_options=http_options,
  80. grpc_options=grpc_options,
  81. global_logging_config=global_logging_config,
  82. )
  83. proxy_handles = ray.get(controller.get_proxies.remote())
  84. if len(proxy_handles) > 0:
  85. try:
  86. ray.get(
  87. [handle.ready.remote() for handle in proxy_handles.values()],
  88. timeout=HTTP_PROXY_TIMEOUT,
  89. )
  90. except ray.exceptions.GetTimeoutError:
  91. raise TimeoutError(
  92. f"HTTP proxies not available after {HTTP_PROXY_TIMEOUT}s."
  93. )
  94. return controller
  95. async def serve_start_async(
  96. http_options: Union[None, dict, HTTPOptions] = None,
  97. grpc_options: Union[None, dict, gRPCOptions] = None,
  98. global_logging_config: Union[None, dict, LoggingConfig] = None,
  99. **kwargs,
  100. ) -> ServeControllerClient:
  101. """Initialize a serve instance asynchronously.
  102. This function is not thread-safe. The caller should maintain the async lock in order
  103. to start the serve instance asynchronously.
  104. This function has the same functionality as ray.serve._private.api.serve_start().
  105. Parameters & Returns are same as ray.serve._private.api.serve_start().
  106. """
  107. usage_lib.record_library_usage("serve")
  108. try:
  109. client = _get_global_client(_health_check_controller=True)
  110. logger.info(
  111. f'Connecting to existing Serve app in namespace "{SERVE_NAMESPACE}".'
  112. " New http options will not be applied."
  113. )
  114. if http_options:
  115. _check_http_options(client, http_options)
  116. return client
  117. except RayServeException:
  118. pass
  119. controller = (
  120. await ray.remote(_start_controller)
  121. .options(num_cpus=0)
  122. .remote(http_options, grpc_options, global_logging_config, **kwargs)
  123. )
  124. client = ServeControllerClient(
  125. controller,
  126. )
  127. _set_global_client(client)
  128. logger.info(f'Started Serve in namespace "{SERVE_NAMESPACE}".')
  129. return client
  130. def serve_start(
  131. http_options: Union[None, dict, HTTPOptions] = None,
  132. grpc_options: Union[None, dict, gRPCOptions] = None,
  133. global_logging_config: Union[None, dict, LoggingConfig] = None,
  134. **kwargs,
  135. ) -> ServeControllerClient:
  136. """Initialize a serve instance.
  137. By default, the instance will be scoped to the lifetime of the returned
  138. Client object (or when the script exits). This is
  139. only relevant if connecting to a long-running Ray cluster (e.g., with
  140. ray.init(address="auto") or ray.init("ray://<remote_addr>")).
  141. Args:
  142. http_options (Optional[Dict, serve.HTTPOptions]): Configuration options
  143. for HTTP proxy. You can pass in a dictionary or HTTPOptions object
  144. with fields:
  145. - host(str, None): Host for HTTP servers to listen on. Defaults to
  146. "127.0.0.1". To expose Serve publicly, you probably want to set
  147. this to "0.0.0.0".
  148. - port(int): Port for HTTP server. Defaults to 8000.
  149. - root_path(str): Root path to mount the serve application
  150. (for example, "/serve"). All deployment routes will be prefixed
  151. with this path. Defaults to "".
  152. - middlewares(list): A list of Starlette middlewares that will be
  153. applied to the HTTP servers in the cluster. Defaults to [].
  154. - location(str, serve.config.DeploymentMode): The deployment
  155. location of HTTP servers:
  156. - "HeadOnly": start one HTTP server on the head node. Serve
  157. assumes the head node is the node you executed serve.start
  158. on. This is the default.
  159. - "EveryNode": start one HTTP server per node.
  160. - "NoServer" or None: disable HTTP server.
  161. - num_cpus (int): The number of CPU cores to reserve for each
  162. internal Serve HTTP proxy actor. Defaults to 0.
  163. grpc_options: [Experimental] Configuration options for gRPC proxy.
  164. You can pass in a gRPCOptions object with fields:
  165. - port(int): Port for gRPC server. Defaults to 9000.
  166. - grpc_servicer_functions(list): List of import paths for gRPC
  167. `add_servicer_to_server` functions to add to Serve's gRPC proxy.
  168. Default empty list, meaning not to start the gRPC server.
  169. """
  170. usage_lib.record_library_usage("serve")
  171. try:
  172. client = _get_global_client(_health_check_controller=True)
  173. logger.info(
  174. f'Connecting to existing Serve app in namespace "{SERVE_NAMESPACE}".'
  175. " New http options will not be applied."
  176. )
  177. if http_options:
  178. _check_http_options(client, http_options)
  179. return client
  180. except RayServeException:
  181. pass
  182. controller = _start_controller(
  183. http_options, grpc_options, global_logging_config, **kwargs
  184. )
  185. client = ServeControllerClient(
  186. controller,
  187. )
  188. _set_global_client(client)
  189. logger.info(f'Started Serve in namespace "{SERVE_NAMESPACE}".')
  190. return client
  191. def call_user_app_builder_with_args_if_necessary(
  192. builder: Union[Application, FunctionType],
  193. args: Dict[str, Any],
  194. ) -> Application:
  195. """Calls a user-provided function that returns Serve application.
  196. If an Application object is passed, this is a no-op.
  197. Else, we validate the signature of the function, convert the args dictionary to
  198. the user-annotated Pydantic model if provided, and call the function.
  199. The output of the function is returned (must be an Application).
  200. """
  201. if isinstance(builder, Application):
  202. if len(args) > 0:
  203. raise ValueError(
  204. "Arguments can only be passed to an application builder function, "
  205. "not an already built application."
  206. )
  207. return builder
  208. elif not isinstance(builder, FunctionType):
  209. raise TypeError(
  210. "Expected a built Serve application or an application builder function "
  211. f"but got: {type(builder)}."
  212. )
  213. # Check that the builder only takes a single argument.
  214. # TODO(edoakes): we may want to loosen this to allow optional kwargs in the future.
  215. signature = inspect.signature(builder)
  216. if len(signature.parameters) != 1:
  217. raise TypeError(
  218. "Application builder functions should take exactly one parameter, "
  219. "a dictionary containing the passed arguments."
  220. )
  221. # If the sole argument to the builder is a pydantic model, convert the args dict to
  222. # that model. This will perform standard pydantic validation (e.g., raise an
  223. # exception if required fields are missing).
  224. param = signature.parameters[list(signature.parameters.keys())[0]]
  225. if inspect.isclass(param.annotation) and is_subclass_of_base_model(
  226. param.annotation
  227. ):
  228. args = param.annotation.parse_obj(args)
  229. app = builder(args)
  230. if not isinstance(app, Application):
  231. raise TypeError(
  232. "Application builder functions must return an `Application` returned "
  233. f"`from `Deployment.bind()`, but got: {type(app)}."
  234. )
  235. return app