api.py 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114
  1. import collections
  2. import inspect
  3. import logging
  4. from functools import wraps
  5. from typing import Any, Callable, Dict, List, Optional, Sequence, Type, Union
  6. from attr import dataclass
  7. from fastapi import APIRouter, FastAPI
  8. from starlette.types import ASGIApp
  9. import ray
  10. from ray import cloudpickle
  11. from ray._common.serialization import pickle_dumps
  12. from ray.serve._private.build_app import build_app
  13. from ray.serve._private.config import (
  14. DeploymentConfig,
  15. ReplicaConfig,
  16. handle_num_replicas_auto,
  17. prepare_imperative_http_options,
  18. )
  19. from ray.serve._private.constants import (
  20. RAY_SERVE_FORCE_LOCAL_TESTING_MODE,
  21. SERVE_DEFAULT_APP_NAME,
  22. SERVE_LOGGER_NAME,
  23. )
  24. from ray.serve._private.http_util import (
  25. ASGIAppReplicaWrapper,
  26. make_fastapi_class_based_view,
  27. )
  28. from ray.serve._private.local_testing_mode import make_local_deployment_handle
  29. from ray.serve._private.logging_utils import configure_component_logger
  30. from ray.serve._private.usage import ServeUsageTag
  31. from ray.serve._private.utils import (
  32. DEFAULT,
  33. Default,
  34. copy_class_metadata,
  35. ensure_serialization_context,
  36. extract_self_if_method_call,
  37. validate_route_prefix,
  38. wait_for_interrupt,
  39. )
  40. from ray.serve.config import (
  41. AutoscalingConfig,
  42. HTTPOptions,
  43. ProxyLocation,
  44. RequestRouterConfig,
  45. gRPCOptions,
  46. )
  47. from ray.serve.context import (
  48. ReplicaContext,
  49. _get_global_client,
  50. _get_internal_replica_context,
  51. _set_global_client,
  52. )
  53. from ray.serve.deployment import Application, Deployment
  54. from ray.serve.exceptions import RayServeException
  55. from ray.serve.handle import DeploymentHandle
  56. from ray.serve.multiplex import _ModelMultiplexWrapper
  57. from ray.serve.schema import LoggingConfig, ServeInstanceDetails, ServeStatus
  58. from ray.util.annotations import DeveloperAPI, PublicAPI
  59. from ray.serve._private import api as _private_api # isort:skip
  60. logger = logging.getLogger(SERVE_LOGGER_NAME)
  61. @PublicAPI(stability="stable")
  62. def start(
  63. proxy_location: Union[None, str, ProxyLocation] = None,
  64. http_options: Union[None, dict, HTTPOptions] = None,
  65. grpc_options: Union[None, dict, gRPCOptions] = None,
  66. logging_config: Union[None, dict, LoggingConfig] = None,
  67. **kwargs,
  68. ):
  69. """Start Serve on the cluster.
  70. Used to set cluster-scoped configurations such as HTTP options. In most cases, this
  71. does not need to be called manually and Serve will be started when an application is
  72. first deployed to the cluster.
  73. These cluster-scoped options cannot be updated dynamically. To update them, start a
  74. new cluster or shut down Serve on the cluster and start it again.
  75. These options can also be set in the config file deployed via REST API.
  76. Args:
  77. proxy_location: Where to run proxies that handle ingress traffic to the
  78. cluster (defaults to every node in the cluster with at least one replica on
  79. it). See `ProxyLocation` for supported options.
  80. http_options: HTTP config options for the proxies. These can be passed as an
  81. unstructured dictionary or the structured `HTTPOptions` class. See
  82. `HTTPOptions` for supported options.
  83. grpc_options: [EXPERIMENTAL] gRPC config options for the proxies. These can
  84. be passed as an unstructured dictionary or the structured `gRPCOptions`
  85. class See `gRPCOptions` for supported options.
  86. logging_config: logging config options for the serve component (
  87. controller & proxy).
  88. """
  89. http_options = prepare_imperative_http_options(proxy_location, http_options)
  90. _private_api.serve_start(
  91. http_options=http_options,
  92. grpc_options=grpc_options,
  93. global_logging_config=logging_config,
  94. **kwargs,
  95. )
  96. @PublicAPI(stability="stable")
  97. def shutdown():
  98. """Completely shut down Serve on the cluster.
  99. Deletes all applications and shuts down Serve system actors.
  100. """
  101. try:
  102. client = _get_global_client()
  103. except RayServeException:
  104. logger.info(
  105. "Nothing to shut down. There's no Serve application "
  106. "running on this Ray cluster."
  107. )
  108. return
  109. client.shutdown()
  110. _set_global_client(None)
  111. @PublicAPI(stability="alpha")
  112. async def shutdown_async():
  113. """Completely shut down Serve on the cluster asynchronously.
  114. Deletes all applications and shuts down Serve system actors.
  115. """
  116. try:
  117. client = _get_global_client()
  118. except RayServeException:
  119. logger.info(
  120. "Nothing to shut down. There's no Serve application "
  121. "running on this Ray cluster."
  122. )
  123. return
  124. await client.shutdown_async()
  125. _set_global_client(None)
  126. @DeveloperAPI
  127. def get_replica_context() -> ReplicaContext:
  128. """Returns the deployment and replica tag from within a replica at runtime.
  129. A replica tag uniquely identifies a single replica for a Ray Serve
  130. deployment.
  131. Raises:
  132. RayServeException: if not called from within a Ray Serve deployment.
  133. Example:
  134. .. code-block:: python
  135. from ray import serve
  136. @serve.deployment
  137. class MyDeployment:
  138. def __init__(self):
  139. # Prints "MyDeployment"
  140. print(serve.get_replica_context().deployment)
  141. """
  142. internal_replica_context = _get_internal_replica_context()
  143. if internal_replica_context is None:
  144. raise RayServeException(
  145. "`serve.get_replica_context()` "
  146. "may only be called from within a "
  147. "Ray Serve deployment."
  148. )
  149. return internal_replica_context
@PublicAPI(stability="stable")
def ingress(app: Union[ASGIApp, Callable]) -> Callable:
    """Wrap a deployment class with an ASGI application for HTTP request parsing.

    There are a few different ways to use this functionality.

    Example:

        FastAPI app routes are defined inside the deployment class.

        .. code-block:: python

            from ray import serve
            from fastapi import FastAPI

            app = FastAPI()

            @serve.deployment
            @serve.ingress(app)
            class MyFastAPIDeployment:
                @app.get("/hi")
                def say_hi(self) -> str:
                    return "Hello world!"

            app = MyFastAPIDeployment.bind()

        You can also use a standalone FastAPI app without registering
        routes inside the deployment.

        .. code-block:: python

            from ray import serve
            from fastapi import FastAPI

            app = FastAPI()

            @app.get("/hi")
            def say_hi():
                return "Hello world!"

            deployment = serve.deployment(serve.ingress(app)())
            app = deployment.bind()

        You can also pass in a builder function that returns an ASGI app.
        The builder function is evaluated when the deployment is initialized on
        replicas. This example shows how to use a sub-deployment inside the routes
        defined outside the deployment class.

        .. code-block:: python

            from ray import serve

            @serve.deployment
            class SubDeployment:
                def __call__(self):
                    return "Hello world!"

            def build_asgi_app():
                from fastapi import Depends, FastAPI

                app = FastAPI()

                def get_sub_deployment_handle():
                    return serve.get_deployment_handle(SubDeployment.name, app_name="my_app")

                @app.get("/hi")
                async def say_hi(handle=Depends(get_sub_deployment_handle)):
                    return await handle.remote()

                return app

            deployment = serve.deployment(serve.ingress(build_asgi_app)())
            app = deployment.bind(SubDeployment.bind(), name="my_app", route_prefix="/")

    Args:
        app: the FastAPI app to wrap this class with.
            Can be any ASGI-compatible callable.
            You can also pass in a builder function that returns an ASGI app.

    Returns:
        A class decorator. Applied to a class (or called with no argument, in
        which case a minimal placeholder class is used), it returns a new class
        deriving from both that class and `ASGIAppReplicaWrapper`.

    Raises:
        ValueError: (raised by the returned decorator) if its argument is not a
            class, or if the class is callable, i.e. defines `__call__`.
    """

    def decorator(cls: Optional[Type[Any]] = None) -> Callable:
        if cls is None:
            # Called as `serve.ingress(app)()`: synthesize a class that only
            # records its constructor arguments.

            class ASGIIngressDeployment:
                def __init__(self, *args, **kwargs):
                    self.args = args
                    self.kwargs = kwargs

            cls = ASGIIngressDeployment

        if not inspect.isclass(cls):
            raise ValueError("@serve.ingress must be used with a class.")

        if issubclass(cls, collections.abc.Callable):
            raise ValueError(
                "Classes passed to @serve.ingress may not have __call__ method."
            )

        # Sometimes there are decorators on the methods. We want to fix
        # the fast api routes here.
        if isinstance(app, (FastAPI, APIRouter)):
            make_fastapi_class_based_view(app, cls)

        frozen_app_or_func: Union[ASGIApp, Callable] = None

        if inspect.isfunction(app):
            # Builder functions are kept as-is and handed to the replica wrapper.
            frozen_app_or_func = app
        else:
            # Free the state of the app so subsequent modification won't affect
            # this ingress deployment. We don't use copy.copy here to avoid
            # recursion issue.
            ensure_serialization_context()
            frozen_app_or_func = cloudpickle.loads(
                pickle_dumps(app, error_msg="Failed to serialize the ASGI app.")
            )

        class ASGIIngressWrapper(cls, ASGIAppReplicaWrapper):
            def __init__(self, *args, **kwargs):
                # Call user-defined constructor.
                cls.__init__(self, *args, **kwargs)

                ServeUsageTag.FASTAPI_USED.record("1")
                ASGIAppReplicaWrapper.__init__(self, frozen_app_or_func)

            async def __del__(self):
                # The wrapper's own cleanup runs before the user's destructor.
                await ASGIAppReplicaWrapper.__del__(self)

                # Call user-defined destructor if defined.
                if hasattr(cls, "__del__"):
                    # Support both async and sync user destructors.
                    if inspect.iscoroutinefunction(cls.__del__):
                        await cls.__del__(self)
                    else:
                        cls.__del__(self)

        # Carry the user class's metadata over to the generated wrapper.
        copy_class_metadata(ASGIIngressWrapper, cls)

        return ASGIIngressWrapper

    return decorator
@PublicAPI(stability="stable")
def deployment(
    _func_or_class: Optional[Callable] = None,
    name: Default[str] = DEFAULT.VALUE,
    version: Default[str] = DEFAULT.VALUE,
    num_replicas: Default[Optional[Union[int, str]]] = DEFAULT.VALUE,
    route_prefix: Default[Union[str, None]] = DEFAULT.VALUE,
    ray_actor_options: Default[Dict] = DEFAULT.VALUE,
    placement_group_bundles: Default[List[Dict[str, float]]] = DEFAULT.VALUE,
    placement_group_strategy: Default[str] = DEFAULT.VALUE,
    placement_group_bundle_label_selector: Default[
        List[Dict[str, str]]
    ] = DEFAULT.VALUE,
    max_replicas_per_node: Default[int] = DEFAULT.VALUE,
    user_config: Default[Optional[Any]] = DEFAULT.VALUE,
    max_ongoing_requests: Default[int] = DEFAULT.VALUE,
    max_queued_requests: Default[int] = DEFAULT.VALUE,
    autoscaling_config: Default[Union[Dict, AutoscalingConfig, None]] = DEFAULT.VALUE,
    graceful_shutdown_wait_loop_s: Default[float] = DEFAULT.VALUE,
    graceful_shutdown_timeout_s: Default[float] = DEFAULT.VALUE,
    health_check_period_s: Default[float] = DEFAULT.VALUE,
    health_check_timeout_s: Default[float] = DEFAULT.VALUE,
    logging_config: Default[Union[Dict, LoggingConfig, None]] = DEFAULT.VALUE,
    request_router_config: Default[
        Union[Dict, RequestRouterConfig, None]
    ] = DEFAULT.VALUE,
    max_constructor_retry_count: Default[int] = DEFAULT.VALUE,
) -> Callable[[Callable], Deployment]:
    """Decorator that converts a Python class to a `Deployment`.

    Works both bare (`@serve.deployment`) and parametrized
    (`@serve.deployment(num_replicas=2)`).

    Example:

    .. code-block:: python

        from ray import serve

        @serve.deployment(num_replicas=2)
        class MyDeployment:
            pass

        app = MyDeployment.bind()

    Args:
        _func_or_class: The class or function to be decorated.
        name: Name uniquely identifying this deployment within the application.
            If not provided, the name of the class or function is used.
        version: Version of the deployment. Deprecated; passing it logs a
            deprecation warning.
        num_replicas: Number of replicas to run that handle requests to
            this deployment. Defaults to 1. May be "auto".
        route_prefix: No longer supported at the deployment level; passing any
            value raises `ValueError`. Pass it to `serve.run` or set it in the
            application config instead.
        ray_actor_options: Options to pass to the Ray Actor decorator, such as
            resource requirements. Valid options are: `accelerator_type`, `memory`,
            `num_cpus`, `num_gpus`, `resources`, `runtime_env`, and `label_selector`.
        placement_group_bundles: Defines a set of placement group bundles to be
            scheduled *for each replica* of this deployment. The replica actor will
            be scheduled in the first bundle provided, so the resources specified in
            `ray_actor_options` must be a subset of the first bundle's resources. All
            actors and tasks created by the replica actor will be scheduled in the
            placement group by default (`placement_group_capture_child_tasks` is set
            to True).
            This cannot be set together with max_replicas_per_node.
        placement_group_strategy: Strategy to use for the replica placement group
            specified via `placement_group_bundles`. Defaults to `PACK`.
        placement_group_bundle_label_selector: A list of label selectors to apply to the
            placement group on a per-bundle level. If a single label selector is provided,
            it is applied to all bundles. Otherwise, the length must match `placement_group_bundles`.
        max_replicas_per_node: The max number of replicas of this deployment that can
            run on a single node. Valid values are None (default, no limit)
            or an integer in the range of [1, 100].
            This cannot be set together with placement_group_bundles.
        user_config: Config to pass to the reconfigure method of the deployment. This
            can be updated dynamically without restarting the replicas of the
            deployment. The user_config must be fully JSON-serializable.
        max_ongoing_requests: Maximum number of requests that are sent to a
            replica of this deployment without receiving a response. Defaults to 5.
            Must not be None.
        max_queued_requests: [EXPERIMENTAL] Maximum number of requests to this
            deployment that will be queued at each *caller* (proxy or DeploymentHandle).
            Once this limit is reached, subsequent requests will raise a
            BackPressureError (for handles) or return an HTTP 503 status code (for HTTP
            requests). Defaults to -1 (no limit).
        autoscaling_config: Parameters to configure autoscaling behavior. If this
            is set, `num_replicas` should be "auto" or not set.
        graceful_shutdown_wait_loop_s: Duration that replicas wait until there is
            no more work to be done before shutting down. Defaults to 2s.
        graceful_shutdown_timeout_s: Duration to wait for a replica to gracefully
            shut down before being forcefully killed. Defaults to 20s.
        health_check_period_s: Duration between health check calls for the replica.
            Defaults to 10s. The health check is by default a no-op Actor call to the
            replica, but you can define your own health check using the "check_health"
            method in your deployment that raises an exception when unhealthy.
        health_check_timeout_s: Duration in seconds, that replicas wait for a health
            check method to return before considering it as failed. Defaults to 30s.
        logging_config: Logging config options for the deployment. If provided,
            the config will be used to set up the Serve logger on the deployment.
        request_router_config: Config for the request router used for this deployment.
        max_constructor_retry_count: Maximum number of times to retry the deployment
            constructor. Defaults to 20.

    Returns:
        `Deployment` when `_func_or_class` is callable (bare decorator usage);
        otherwise a decorator that produces a `Deployment` from the class or
        function it is applied to.

    Raises:
        ValueError: if `route_prefix` is specified, if `max_ongoing_requests`
            is None, if `num_replicas` is 0, or if `num_replicas` is set to an
            explicit value while `autoscaling_config` is also provided.
    """
    if route_prefix is not DEFAULT.VALUE:
        raise ValueError(
            "`route_prefix` can no longer be specified at the deployment level. "
            "Pass it to `serve.run` or in the application config instead."
        )

    if max_ongoing_requests is None:
        raise ValueError("`max_ongoing_requests` must be non-null, got None.")

    if num_replicas == "auto":
        # "auto" is rewritten to None here; the helper returns the
        # `max_ongoing_requests` / `autoscaling_config` values to use instead.
        num_replicas = None
        max_ongoing_requests, autoscaling_config = handle_num_replicas_auto(
            max_ongoing_requests, autoscaling_config
        )

        ServeUsageTag.AUTO_NUM_REPLICAS_USED.record("1")

    # NOTE: The user_configured_option_names should be the first thing that's
    # defined in this function. It depends on the locals() dictionary storing
    # only the function args/kwargs.
    # It runs after the "auto" handling above, so the values rewritten there
    # are the ones inspected. Do not introduce any new local variable before
    # this statement.
    # Create list of all user-configured options from keyword args
    user_configured_option_names = [
        option
        for option, value in locals().items()
        if option != "_func_or_class" and value is not DEFAULT.VALUE
    ]

    # Num of replicas should not be 0.
    # TODO(Sihan) separate num_replicas attribute from internal and api
    if num_replicas == 0:
        raise ValueError("num_replicas is expected to larger than 0")

    # An explicit replica count and an autoscaling config are mutually exclusive.
    if num_replicas not in [DEFAULT.VALUE, None, "auto"] and autoscaling_config not in [
        DEFAULT.VALUE,
        None,
    ]:
        raise ValueError(
            "Manually setting num_replicas is not allowed when "
            "autoscaling_config is provided."
        )

    if version is not DEFAULT.VALUE:
        logger.warning(
            "DeprecationWarning: `version` in `@serve.deployment` has been deprecated. "
            "Explicitly specifying version will raise an error in the future!"
        )

    # Normalize a structured LoggingConfig to a plain dict before building
    # the deployment config.
    if isinstance(logging_config, LoggingConfig):
        logging_config = logging_config.dict()

    deployment_config = DeploymentConfig.from_default(
        # None (explicit, or rewritten from "auto") falls back to one replica.
        num_replicas=num_replicas if num_replicas is not None else 1,
        user_config=user_config,
        max_ongoing_requests=max_ongoing_requests,
        max_queued_requests=max_queued_requests,
        autoscaling_config=autoscaling_config,
        graceful_shutdown_wait_loop_s=graceful_shutdown_wait_loop_s,
        graceful_shutdown_timeout_s=graceful_shutdown_timeout_s,
        health_check_period_s=health_check_period_s,
        health_check_timeout_s=health_check_timeout_s,
        logging_config=logging_config,
        request_router_config=request_router_config,
        max_constructor_retry_count=max_constructor_retry_count,
    )
    deployment_config.user_configured_option_names = set(user_configured_option_names)

    def decorator(_func_or_class):
        # Replica-level options use None, not the DEFAULT sentinel, for "unset".
        replica_config = ReplicaConfig.create(
            _func_or_class,
            init_args=None,
            init_kwargs=None,
            ray_actor_options=(
                ray_actor_options if ray_actor_options is not DEFAULT.VALUE else None
            ),
            placement_group_bundles=(
                placement_group_bundles
                if placement_group_bundles is not DEFAULT.VALUE
                else None
            ),
            placement_group_strategy=(
                placement_group_strategy
                if placement_group_strategy is not DEFAULT.VALUE
                else None
            ),
            placement_group_bundle_label_selector=(
                placement_group_bundle_label_selector
                if placement_group_bundle_label_selector is not DEFAULT.VALUE
                else None
            ),
            # TODO(ryanaoleary@): add placement_group_fallback_strategy when
            # fallback_strategy support is added to placement group options.
            max_replicas_per_node=(
                max_replicas_per_node
                if max_replicas_per_node is not DEFAULT.VALUE
                else None
            ),
        )

        return Deployment(
            # Fall back to the decorated object's own name.
            name if name is not DEFAULT.VALUE else _func_or_class.__name__,
            deployment_config,
            replica_config,
            version=(version if version is not DEFAULT.VALUE else None),
            _internal=True,
        )

    # This handles both parametrized and non-parametrized usage of the
    # decorator. See the @serve.batch code for more details.
    return decorator(_func_or_class) if callable(_func_or_class) else decorator
@DeveloperAPI
@dataclass(frozen=True)
class RunTarget:
    """Represents a Serve application to run for `serve.run_many`."""

    # The application to deploy; `_run_many` requires this to be an
    # `Application` instance and raises `TypeError` otherwise.
    target: Application
    # Application name; `_run_many` rejects an empty string.
    name: str = SERVE_DEFAULT_APP_NAME
    # HTTP route prefix, validated by `validate_route_prefix` in `_run_many`.
    route_prefix: Optional[str] = "/"
    # Application-level logging config, as a dict or a `LoggingConfig`.
    logging_config: Optional[Union[Dict, LoggingConfig]] = None
    # Forwarded to `build_app` as `external_scaler_enabled`.
    external_scaler_enabled: bool = False
  449. @DeveloperAPI
  450. def _run_many(
  451. targets: Sequence[RunTarget],
  452. wait_for_ingress_deployment_creation: bool = True,
  453. wait_for_applications_running: bool = True,
  454. _local_testing_mode: bool = False,
  455. ) -> List[DeploymentHandle]:
  456. """Run many applications and return the handles to their ingress deployments.
  457. This is only used internally with the _blocking not totally blocking the following
  458. code indefinitely until Ctrl-C'd.
  459. """
  460. if not targets:
  461. raise ValueError("No applications provided.")
  462. if RAY_SERVE_FORCE_LOCAL_TESTING_MODE:
  463. if not _local_testing_mode:
  464. logger.info("Overriding local_testing_mode=True from environment variable.")
  465. _local_testing_mode = True
  466. built_apps = []
  467. for t in targets:
  468. if len(t.name) == 0:
  469. raise RayServeException("Application name must a non-empty string.")
  470. if not isinstance(t.target, Application):
  471. raise TypeError(
  472. "`serve.run` expects an `Application` returned by `Deployment.bind()`."
  473. )
  474. validate_route_prefix(t.route_prefix)
  475. built_apps.append(
  476. build_app(
  477. t.target,
  478. name=t.name,
  479. route_prefix=t.route_prefix,
  480. logging_config=t.logging_config,
  481. make_deployment_handle=make_local_deployment_handle
  482. if _local_testing_mode
  483. else None,
  484. default_runtime_env=ray.get_runtime_context().runtime_env
  485. if not _local_testing_mode
  486. else None,
  487. external_scaler_enabled=t.external_scaler_enabled,
  488. )
  489. )
  490. if _local_testing_mode:
  491. # implicitly use the last target's logging config (if provided) in local testing mode
  492. logging_config = t.logging_config or LoggingConfig()
  493. if not isinstance(logging_config, LoggingConfig):
  494. logging_config = LoggingConfig(**(logging_config or {}))
  495. configure_component_logger(
  496. component_name="local_test",
  497. component_id="-",
  498. logging_config=logging_config,
  499. stream_handler_only=True,
  500. )
  501. return [b.deployment_handles[b.ingress_deployment_name] for b in built_apps]
  502. else:
  503. client = _private_api.serve_start(
  504. http_options={"location": "EveryNode"},
  505. global_logging_config=None,
  506. )
  507. # Record after Ray has been started.
  508. ServeUsageTag.API_VERSION.record("v2")
  509. handles = client.deploy_applications(
  510. built_apps,
  511. wait_for_ingress_deployment_creation=wait_for_ingress_deployment_creation,
  512. wait_for_applications_running=wait_for_applications_running,
  513. )
  514. client.wait_for_proxies_serving(
  515. wait_for_applications_running=wait_for_applications_running
  516. )
  517. return handles
  518. @PublicAPI(stability="stable")
  519. def _run(
  520. target: Application,
  521. *,
  522. _blocking: bool = True,
  523. name: str = SERVE_DEFAULT_APP_NAME,
  524. route_prefix: Optional[str] = "/",
  525. logging_config: Optional[Union[Dict, LoggingConfig]] = None,
  526. _local_testing_mode: bool = False,
  527. external_scaler_enabled: bool = False,
  528. ) -> DeploymentHandle:
  529. """Run an application and return a handle to its ingress deployment.
  530. This is only used internally with the _blocking not totally blocking the following
  531. code indefinitely until Ctrl-C'd.
  532. """
  533. return _run_many(
  534. [
  535. RunTarget(
  536. target=target,
  537. name=name,
  538. route_prefix=route_prefix,
  539. logging_config=logging_config,
  540. external_scaler_enabled=external_scaler_enabled,
  541. )
  542. ],
  543. wait_for_applications_running=_blocking,
  544. _local_testing_mode=_local_testing_mode,
  545. )[0]
  546. @DeveloperAPI
  547. def run_many(
  548. targets: Sequence[RunTarget],
  549. blocking: bool = False,
  550. wait_for_ingress_deployment_creation: bool = True,
  551. wait_for_applications_running: bool = True,
  552. _local_testing_mode: bool = False,
  553. ) -> List[DeploymentHandle]:
  554. """Run many applications and return the handles to their ingress deployments.
  555. Args:
  556. targets:
  557. A sequence of `RunTarget`,
  558. each containing information about an application to deploy.
  559. blocking: Whether this call should be blocking. If True, it
  560. will loop and log status until Ctrl-C'd.
  561. wait_for_ingress_deployment_creation: Whether to wait for the ingress
  562. deployments to be created.
  563. wait_for_applications_running: Whether to wait for the applications to be
  564. running. Note that this effectively implies
  565. `wait_for_ingress_deployment_creation=True`,
  566. because the ingress deployments must be created
  567. before the applications can be running.
  568. Returns:
  569. List[DeploymentHandle]: A list of handles that can be used
  570. to call the applications.
  571. """
  572. handles = _run_many(
  573. targets,
  574. wait_for_ingress_deployment_creation=wait_for_ingress_deployment_creation,
  575. wait_for_applications_running=wait_for_applications_running,
  576. _local_testing_mode=_local_testing_mode,
  577. )
  578. if blocking:
  579. wait_for_interrupt()
  580. return handles
  581. @PublicAPI(stability="stable")
  582. def run(
  583. target: Application,
  584. blocking: bool = False,
  585. name: str = SERVE_DEFAULT_APP_NAME,
  586. route_prefix: Optional[str] = "/",
  587. logging_config: Optional[Union[Dict, LoggingConfig]] = None,
  588. _local_testing_mode: bool = False,
  589. external_scaler_enabled: bool = False,
  590. ) -> DeploymentHandle:
  591. """Run an application and return a handle to its ingress deployment.
  592. The application is returned by `Deployment.bind()`. Example:
  593. .. code-block:: python
  594. handle = serve.run(MyDeployment.bind())
  595. ray.get(handle.remote())
  596. Args:
  597. target:
  598. A Serve application returned by `Deployment.bind()`.
  599. blocking: Whether this call should be blocking. If True, it
  600. will loop and log status until Ctrl-C'd.
  601. name: Application name. If not provided, this will be the only
  602. application running on the cluster (it will delete all others).
  603. route_prefix: Route prefix for HTTP requests. Defaults to '/'.
  604. If `None` is passed, the application will not be exposed over HTTP
  605. (this may be useful if you only want the application to be exposed via
  606. gRPC or a `DeploymentHandle`).
  607. logging_config: Application logging config. If provided, the config will
  608. be applied to all deployments which doesn't have logging config.
  609. external_scaler_enabled: Whether external autoscaling is enabled for
  610. this application.
  611. Returns:
  612. DeploymentHandle: A handle that can be used to call the application.
  613. """
  614. handle = _run(
  615. target=target,
  616. name=name,
  617. route_prefix=route_prefix,
  618. logging_config=logging_config,
  619. _local_testing_mode=_local_testing_mode,
  620. external_scaler_enabled=external_scaler_enabled,
  621. )
  622. if blocking:
  623. wait_for_interrupt()
  624. return handle
  625. @PublicAPI(stability="stable")
  626. def delete(name: str, _blocking: bool = True):
  627. """Delete an application by its name.
  628. Deletes the app with all corresponding deployments.
  629. """
  630. client = _get_global_client()
  631. client.delete_apps([name], blocking=_blocking)
  632. @PublicAPI(stability="beta")
  633. def multiplexed(
  634. func: Optional[Callable[..., Any]] = None, max_num_models_per_replica: int = 3
  635. ):
  636. """Wrap a callable or method used to load multiplexed models in a replica.
  637. The function can be standalone function or a method of a class. The
  638. function must have exactly one argument, the model id of type `str` for the
  639. model to be loaded.
  640. It is required to define the function with `async def` and the function must be
  641. an async function. It is recommended to define coroutines for long running
  642. IO tasks in the function to avoid blocking the event loop.
  643. The multiplexed function is called to load a model with the given model ID when
  644. necessary.
  645. When the number of models in one replica is larger than max_num_models_per_replica,
  646. the models will be unloaded using an LRU policy.
  647. If you want to release resources after the model is loaded, you can define
  648. a `__del__` method in your model class. The `__del__` method will be called when
  649. the model is unloaded.
  650. Example:
  651. .. code-block:: python
  652. from ray import serve
  653. @serve.deployment
  654. class MultiplexedDeployment:
  655. def __init__(self):
  656. # Define s3 base path to load models.
  657. self.s3_base_path = "s3://my_bucket/my_models"
  658. @serve.multiplexed(max_num_models_per_replica=5)
  659. async def load_model(self, model_id: str) -> Any:
  660. # Load model with the given tag
  661. # You can use any model loading library here
  662. # and return the loaded model. load_from_s3 is
  663. # a placeholder function.
  664. return load_from_s3(model_id)
  665. async def __call__(self, request):
  666. # Get the model_id from the request context.
  667. model_id = serve.get_multiplexed_model_id()
  668. # Load the model for the requested model_id.
  669. # If the model is already cached locally,
  670. # this will just be a dictionary lookup.
  671. model = await self.load_model(model_id)
  672. return model(request)
  673. Args:
  674. max_num_models_per_replica: the maximum number of models
  675. to be loaded on each replica. By default, it is 3, which
  676. means that each replica can cache up to 3 models. You can
  677. set it to a larger number if you have enough memory on
  678. the node resource, in opposite, you can set it to a smaller
  679. number if you want to save memory on the node resource.
  680. """
  681. if func is not None:
  682. if not callable(func):
  683. raise TypeError(
  684. "The `multiplexed` decorator must be used with a function or method."
  685. )
  686. # TODO(Sihan): Make the API accept the sync function as well.
  687. # https://github.com/ray-project/ray/issues/35356
  688. if not inspect.iscoroutinefunction(func):
  689. raise TypeError(
  690. "@serve.multiplexed can only be used to decorate async "
  691. "functions or methods."
  692. )
  693. signature = inspect.signature(func)
  694. if len(signature.parameters) == 0 or len(signature.parameters) > 2:
  695. raise TypeError(
  696. "@serve.multiplexed can only be used to decorate functions or methods "
  697. "with at least one 'model_id: str' argument."
  698. )
  699. if not isinstance(max_num_models_per_replica, int):
  700. raise TypeError("max_num_models_per_replica must be an integer.")
  701. if max_num_models_per_replica != -1 and max_num_models_per_replica <= 0:
  702. raise ValueError("max_num_models_per_replica must be positive.")
  703. def _multiplex_decorator(func: Callable):
  704. @wraps(func)
  705. async def _multiplex_wrapper(*args):
  706. args_check_error_msg = (
  707. "Functions decorated with `@serve.multiplexed` must take exactly one"
  708. "the multiplexed model ID (str), but got {}"
  709. )
  710. if not args:
  711. raise TypeError(
  712. args_check_error_msg.format("no arguments are provided.")
  713. )
  714. self = extract_self_if_method_call(args, func)
  715. # User defined multiplexed function can be a standalone function or a
  716. # method of a class. If it is a method of a class, the first argument
  717. # is self.
  718. if self is None:
  719. if len(args) != 1:
  720. raise TypeError(
  721. args_check_error_msg.format("more than one arguments.")
  722. )
  723. multiplex_object = func
  724. model_id = args[0]
  725. else:
  726. # count self as an argument
  727. if len(args) != 2:
  728. raise TypeError(
  729. args_check_error_msg.format("more than one arguments.")
  730. )
  731. multiplex_object = self
  732. model_id = args[1]
  733. multiplex_attr = "__serve_multiplex_wrapper"
  734. # If the multiplexed function is called for the first time,
  735. # create a model multiplex wrapper and cache it in the multiplex object.
  736. if not hasattr(multiplex_object, multiplex_attr):
  737. model_multiplex_wrapper = _ModelMultiplexWrapper(
  738. func, self, max_num_models_per_replica
  739. )
  740. setattr(multiplex_object, multiplex_attr, model_multiplex_wrapper)
  741. else:
  742. model_multiplex_wrapper = getattr(multiplex_object, multiplex_attr)
  743. return await model_multiplex_wrapper.load_model(model_id)
  744. return _multiplex_wrapper
  745. return _multiplex_decorator(func) if callable(func) else _multiplex_decorator
  746. @PublicAPI(stability="beta")
  747. def get_multiplexed_model_id() -> str:
  748. """Get the multiplexed model ID for the current request.
  749. This is used with a function decorated with `@serve.multiplexed`
  750. to retrieve the model ID for the current request.
  751. When called from within a batched function (decorated with `@serve.batch`),
  752. this returns the multiplexed model ID that is common to all requests in
  753. the current batch. This works because batches are automatically split
  754. by model ID to ensure all requests in a batch target the same model.
  755. .. code-block:: python
  756. import ray
  757. from ray import serve
  758. import requests
  759. # Set the multiplexed model id with the key
  760. # "ray_serve_multiplexed_model_id" in the request
  761. # headers when sending requests to the http proxy.
  762. requests.get("http://localhost:8000",
  763. headers={"ray_serve_multiplexed_model_id": "model_1"})
  764. # This can also be set when using `DeploymentHandle`.
  765. handle.options(multiplexed_model_id="model_1").remote("blablabla")
  766. # In your deployment code, you can retrieve the model id from
  767. # `get_multiplexed_model_id()`.
  768. @serve.deployment
  769. def my_deployment_function(request):
  770. assert serve.get_multiplexed_model_id() == "model_1"
  771. """
  772. # First check if we're inside a batch context. If so, get the model ID
  773. # from the batch request context. All requests in a batch are guaranteed
  774. # to have the same multiplexed_model_id (batches are split by model ID).
  775. batch_request_context = ray.serve.context._get_serve_batch_request_context()
  776. if batch_request_context:
  777. return batch_request_context[0].multiplexed_model_id
  778. # Fall back to the regular request context
  779. _request_context = ray.serve.context._get_serve_request_context()
  780. return _request_context.multiplexed_model_id
  781. @PublicAPI(stability="alpha")
  782. def status() -> ServeStatus:
  783. """Get the status of Serve on the cluster.
  784. Includes status of all HTTP Proxies, all active applications, and
  785. their deployments.
  786. .. code-block:: python
  787. @serve.deployment(num_replicas=2)
  788. class MyDeployment:
  789. pass
  790. serve.run(MyDeployment.bind())
  791. status = serve.status()
  792. assert status.applications["default"].status == "RUNNING"
  793. """
  794. client = _get_global_client(raise_if_no_controller_running=False)
  795. if client is None:
  796. # Serve has not started yet
  797. return ServeStatus()
  798. ServeUsageTag.SERVE_STATUS_API_USED.record("1")
  799. details = ServeInstanceDetails(**client.get_serve_details())
  800. return details._get_status()
  801. @PublicAPI(stability="alpha")
  802. def get_app_handle(name: str) -> DeploymentHandle:
  803. """Get a handle to the application's ingress deployment by name.
  804. Args:
  805. name: Name of application to get a handle to.
  806. Raises:
  807. RayServeException: If no Serve controller is running, or if the
  808. application does not exist.
  809. .. code-block:: python
  810. import ray
  811. from ray import serve
  812. @serve.deployment
  813. def f(val: int) -> int:
  814. return val * 2
  815. serve.run(f.bind(), name="my_app")
  816. handle = serve.get_app_handle("my_app")
  817. assert handle.remote(3).result() == 6
  818. """
  819. client = _get_global_client()
  820. ingress = ray.get(client._controller.get_ingress_deployment_name.remote(name))
  821. if ingress is None:
  822. raise RayServeException(f"Application '{name}' does not exist.")
  823. ServeUsageTag.SERVE_GET_APP_HANDLE_API_USED.record("1")
  824. # There is no need to check if the deployment exists since the
  825. # deployment name was just fetched from the controller
  826. return client.get_handle(ingress, name, check_exists=False)
  827. @DeveloperAPI
  828. def get_deployment_handle(
  829. deployment_name: str,
  830. app_name: Optional[str] = None,
  831. _check_exists: bool = True,
  832. _record_telemetry: bool = True,
  833. ) -> DeploymentHandle:
  834. """Get a handle to a deployment by name.
  835. This is a developer API and is for advanced Ray users and library developers.
  836. Args:
  837. deployment_name: Name of deployment to get a handle to.
  838. app_name: Application in which deployment resides. If calling
  839. from inside a Serve application and `app_name` is not
  840. specified, this will default to the application from which
  841. this API is called.
  842. Raises:
  843. RayServeException: If no Serve controller is running, or if
  844. calling from outside a Serve application and no application
  845. name is specified.
  846. The following example gets the handle to the ingress deployment of
  847. an application, which is equivalent to using `serve.get_app_handle`.
  848. .. testcode::
  849. import ray
  850. from ray import serve
  851. @serve.deployment
  852. def f(val: int) -> int:
  853. return val * 2
  854. serve.run(f.bind(), name="my_app")
  855. handle = serve.get_deployment_handle("f", app_name="my_app")
  856. assert handle.remote(3).result() == 6
  857. serve.shutdown()
  858. The following example demonstrates how you can use this API to get
  859. the handle to a non-ingress deployment in an application.
  860. .. testcode::
  861. import ray
  862. from ray import serve
  863. from ray.serve.handle import DeploymentHandle
  864. @serve.deployment
  865. class Multiplier:
  866. def __init__(self, multiple: int):
  867. self._multiple = multiple
  868. def __call__(self, val: int) -> int:
  869. return val * self._multiple
  870. @serve.deployment
  871. class Adder:
  872. def __init__(self, handle: DeploymentHandle, increment: int):
  873. self._handle = handle
  874. self._increment = increment
  875. async def __call__(self, val: int) -> int:
  876. return await self._handle.remote(val) + self._increment
  877. # The app calculates 2 * x + 3
  878. serve.run(Adder.bind(Multiplier.bind(2), 3), name="math_app")
  879. handle = serve.get_app_handle("math_app")
  880. assert handle.remote(5).result() == 13
  881. # Get handle to Multiplier only
  882. handle = serve.get_deployment_handle("Multiplier", app_name="math_app")
  883. assert handle.remote(5).result() == 10
  884. serve.shutdown()
  885. """
  886. client = _get_global_client()
  887. internal_replica_context = _get_internal_replica_context()
  888. if app_name is None:
  889. if internal_replica_context is None:
  890. raise RayServeException(
  891. "Please specify an application name when getting a deployment handle "
  892. "outside of a Serve application."
  893. )
  894. else:
  895. app_name = internal_replica_context.app_name
  896. if _record_telemetry:
  897. ServeUsageTag.SERVE_GET_DEPLOYMENT_HANDLE_API_USED.record("1")
  898. handle: DeploymentHandle = client.get_handle(
  899. deployment_name, app_name, check_exists=_check_exists
  900. )
  901. # Track handle creation if called from within a replica
  902. if (
  903. internal_replica_context is not None
  904. and internal_replica_context._handle_registration_callback is not None
  905. ):
  906. internal_replica_context._handle_registration_callback(handle.deployment_id)
  907. return handle