handle.py 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896
  1. import asyncio
  2. import concurrent.futures
  3. import logging
  4. import time
  5. import warnings
  6. from typing import (
  7. Any,
  8. AsyncIterator,
  9. Dict,
  10. Generator,
  11. Generic,
  12. Iterator,
  13. Optional,
  14. Tuple,
  15. TypeVar,
  16. Union,
  17. cast,
  18. )
  19. import ray
  20. from ray import serve
  21. from ray._raylet import ObjectRefGenerator # type: ignore[attr-defined]
  22. from ray.serve._private.common import (
  23. OBJ_REF_NOT_SUPPORTED_ERROR,
  24. DeploymentHandleSource,
  25. DeploymentID,
  26. RequestMetadata,
  27. )
  28. from ray.serve._private.constants import SERVE_LOGGER_NAME
  29. from ray.serve._private.default_impl import (
  30. CreateRouterCallable,
  31. create_dynamic_handle_options,
  32. create_init_handle_options,
  33. create_router,
  34. )
  35. from ray.serve._private.handle_options import (
  36. DynamicHandleOptionsBase,
  37. InitHandleOptionsBase,
  38. )
  39. from ray.serve._private.replica_result import ReplicaResult
  40. from ray.serve._private.router import Router
  41. from ray.serve._private.usage import ServeUsageTag
  42. from ray.serve._private.utils import (
  43. DEFAULT,
  44. calculate_remaining_timeout,
  45. get_random_string,
  46. inside_ray_client_context,
  47. is_running_in_asyncio_loop,
  48. )
  49. from ray.serve.exceptions import RayServeException, RequestCancelledError
  50. from ray.util import metrics
  51. from ray.util.annotations import DeveloperAPI, PublicAPI
  52. logger = logging.getLogger(SERVE_LOGGER_NAME)
  53. # TypeVar for the deployment class type in DeploymentHandle[T]
  54. T = TypeVar("T")
  55. # TypeVar for the response/result type in DeploymentResponse[R]
  56. R = TypeVar("R")
  57. class _DeploymentHandleBase(Generic[T]):
  58. def __init__(
  59. self,
  60. deployment_name: str,
  61. app_name: str,
  62. *,
  63. init_options: Optional[InitHandleOptionsBase] = None,
  64. handle_options: Optional[DynamicHandleOptionsBase] = None,
  65. _router: Optional[Router] = None,
  66. _create_router: Optional[CreateRouterCallable] = None,
  67. _request_counter: Optional[metrics.Counter] = None,
  68. _handle_id: Optional[str] = None,
  69. ):
  70. self.deployment_id = DeploymentID(name=deployment_name, app_name=app_name)
  71. self.init_options: Optional[InitHandleOptionsBase] = init_options
  72. self.handle_options: DynamicHandleOptionsBase = (
  73. handle_options or create_dynamic_handle_options()
  74. )
  75. # Handle ID is shared among handles that are returned by
  76. # `handle.options` or `handle.method`
  77. self.handle_id = _handle_id or get_random_string()
  78. self.request_counter = _request_counter or self._create_request_counter(
  79. app_name, deployment_name, self.handle_id
  80. )
  81. self._router: Optional[Router] = _router
  82. if _create_router is None:
  83. self._create_router = create_router
  84. else:
  85. self._create_router = _create_router
  86. @staticmethod
  87. def _gen_handle_tag(app_name: str, deployment_name: str, handle_id: str):
  88. if app_name:
  89. return f"{app_name}#{deployment_name}#{handle_id}"
  90. else:
  91. return f"{deployment_name}#{handle_id}"
  92. @classmethod
  93. def _create_request_counter(
  94. cls, app_name: str, deployment_name: str, handle_id: str
  95. ):
  96. return metrics.Counter(
  97. "serve_handle_request_counter",
  98. description=(
  99. "The number of handle.remote() calls that have been "
  100. "made on this handle."
  101. ),
  102. tag_keys=("handle", "deployment", "route", "application"),
  103. ).set_default_tags(
  104. {
  105. "handle": cls._gen_handle_tag(
  106. app_name, deployment_name, handle_id=handle_id
  107. ),
  108. "deployment": deployment_name,
  109. "application": app_name,
  110. }
  111. )
  112. def running_replicas_populated(self) -> bool:
  113. if self._router is None:
  114. return False
  115. return self._router.running_replicas_populated()
  116. @property
  117. def deployment_name(self) -> str:
  118. return self.deployment_id.name
  119. @property
  120. def app_name(self) -> str:
  121. return self.deployment_id.app_name
  122. @property
  123. def is_initialized(self) -> bool:
  124. return self._router is not None
  125. def _init(self, **kwargs):
  126. """Initialize this handle with arguments.
  127. A handle can only be initialized once. A handle is implicitly
  128. initialized when `.options()` or `.remote()` is called. Therefore
  129. to initialize a handle with custom init options, you must do it
  130. before calling `.options()` or `.remote()`.
  131. """
  132. if self._router is not None:
  133. raise RuntimeError(
  134. "Handle has already been initialized. Note that a handle is implicitly "
  135. "initialized when you call `.options()` or `.remote()`. You either "
  136. "tried to call `._init()` twice or called `._init()` after calling "
  137. "`.options()` or `.remote()`. If you want to modify the init options, "
  138. "please do so before calling `.options()` or `.remote()`. This handle "
  139. f"was initialized with {self.init_options}."
  140. )
  141. init_options = create_init_handle_options(**kwargs)
  142. self._router = self._create_router(
  143. handle_id=self.handle_id,
  144. deployment_id=self.deployment_id,
  145. handle_options=init_options,
  146. )
  147. self.init_options = init_options
  148. logger.info(
  149. f"Initialized DeploymentHandle {self.handle_id} for {self.deployment_id}.",
  150. extra={"log_to_stderr": False},
  151. )
  152. # Record handle api telemetry when not in the proxy
  153. if (
  154. self.init_options._source != DeploymentHandleSource.PROXY
  155. and self.__class__ == DeploymentHandle
  156. ):
  157. ServeUsageTag.DEPLOYMENT_HANDLE_API_USED.record("1")
  158. def _is_router_running_in_separate_loop(self) -> bool:
  159. if self.init_options is None:
  160. return False
  161. return self.init_options._run_router_in_separate_loop
  162. def _options(
  163. self, _prefer_local_routing=DEFAULT.VALUE, **kwargs
  164. ) -> "DeploymentHandle[T]":
  165. if kwargs.get("stream") is True and inside_ray_client_context():
  166. raise RuntimeError(
  167. "Streaming DeploymentHandles are not currently supported when "
  168. "connected to a remote Ray cluster using Ray Client."
  169. )
  170. new_handle_options = self.handle_options.copy_and_update(**kwargs)
  171. # TODO(zcin): remove when _prefer_local_routing is removed from options() path
  172. if _prefer_local_routing != DEFAULT.VALUE:
  173. self._init(_prefer_local_routing=_prefer_local_routing)
  174. if not self.is_initialized:
  175. self._init()
  176. return DeploymentHandle(
  177. self.deployment_name,
  178. self.app_name,
  179. init_options=self.init_options,
  180. handle_options=new_handle_options,
  181. _router=self._router,
  182. _create_router=self._create_router,
  183. _request_counter=self.request_counter,
  184. _handle_id=self.handle_id,
  185. )
  186. def _remote(
  187. self,
  188. args: Tuple[Any],
  189. kwargs: Dict[str, Any],
  190. ) -> Tuple[concurrent.futures.Future, RequestMetadata]:
  191. if not self.is_initialized:
  192. self._init()
  193. metadata = serve._private.default_impl.get_request_metadata(
  194. self.init_options, self.handle_options
  195. )
  196. self.request_counter.inc(
  197. tags={
  198. "route": metadata.route,
  199. "application": metadata.app_name,
  200. }
  201. )
  202. if self._router is None:
  203. raise RuntimeError("Router is not initialized")
  204. return self._router.assign_request(metadata, *args, **kwargs), metadata
  205. def options(
  206. self,
  207. *,
  208. method_name: Union[str, DEFAULT] = DEFAULT.VALUE,
  209. multiplexed_model_id: Union[str, DEFAULT] = DEFAULT.VALUE,
  210. stream: Union[bool, DEFAULT] = DEFAULT.VALUE,
  211. use_new_handle_api: Union[bool, DEFAULT] = DEFAULT.VALUE,
  212. _prefer_local_routing: Union[bool, DEFAULT] = DEFAULT.VALUE,
  213. ) -> "DeploymentHandle[T]":
  214. raise NotImplementedError
  215. def __getattr__(self, name: str) -> "DeploymentHandle[T]":
  216. return self.options(method_name=name)
  217. def shutdown(self):
  218. if self._router:
  219. shutdown_future = self._router.shutdown()
  220. if self._is_router_running_in_separate_loop():
  221. shutdown_future.result()
  222. else:
  223. logger.warning(
  224. "Synchronously shutting down a router that's running in the same "
  225. "event loop can only be done best effort. Please use "
  226. "`shutdown_async` instead."
  227. )
  228. async def shutdown_async(self):
  229. if self._router:
  230. shutdown_future: Union[
  231. asyncio.Future, concurrent.futures.Future
  232. ] = self._router.shutdown()
  233. if self._is_router_running_in_separate_loop:
  234. await asyncio.wrap_future(shutdown_future)
  235. else:
  236. await shutdown_future
  237. def __repr__(self) -> str:
  238. return f"{self.__class__.__name__}" f"(deployment='{self.deployment_name}')"
  239. @classmethod
  240. def _deserialize(cls, kwargs: Dict[str, Any]) -> "_DeploymentHandleBase[T]":
  241. """Required for this class's __reduce__ method to be picklable."""
  242. return cls(**kwargs)
  243. def __reduce__(self):
  244. serialized_constructor_args = {
  245. "deployment_name": self.deployment_name,
  246. "app_name": self.app_name,
  247. "handle_options": self.handle_options,
  248. }
  249. return self.__class__._deserialize, (serialized_constructor_args,)
  250. class _DeploymentResponseBase(Generic[R]):
  251. def __init__(
  252. self,
  253. replica_result_future: Union[
  254. concurrent.futures.Future[ReplicaResult], asyncio.Future[ReplicaResult]
  255. ],
  256. request_metadata: RequestMetadata,
  257. _is_router_running_in_separate_loop: bool = True,
  258. ):
  259. self._cancelled = False
  260. self._replica_result_future = replica_result_future
  261. self._replica_result: Optional[ReplicaResult] = None
  262. self._request_metadata: RequestMetadata = request_metadata
  263. self._is_router_running_in_separate_loop = _is_router_running_in_separate_loop
  264. @property
  265. def request_id(self) -> str:
  266. return self._request_metadata.request_id
  267. @property
  268. def by_reference(self) -> bool:
  269. return self._request_metadata._by_reference
  270. def _fetch_future_result_sync(
  271. self, _timeout_s: Optional[float] = None
  272. ) -> ReplicaResult:
  273. """Synchronously fetch the replica result.
  274. The result is cached in `self._replica_result`.
  275. """
  276. if self._replica_result is None:
  277. if not self._is_router_running_in_separate_loop:
  278. raise RuntimeError(
  279. "Sync methods should not be called from within an `asyncio` event "
  280. "loop. Use `await response` instead."
  281. )
  282. try:
  283. # When _is_router_running_in_separate_loop is True, the future
  284. # is a concurrent.futures.Future (not asyncio.Future)
  285. sync_future = cast(
  286. concurrent.futures.Future[ReplicaResult],
  287. self._replica_result_future,
  288. )
  289. self._replica_result = sync_future.result(timeout=_timeout_s)
  290. except concurrent.futures.TimeoutError:
  291. raise TimeoutError("Timed out resolving to ObjectRef.") from None
  292. except concurrent.futures.CancelledError:
  293. raise RequestCancelledError(self.request_id) from None
  294. return self._replica_result
  295. async def _fetch_future_result_async(self) -> ReplicaResult:
  296. """Asynchronously fetch replica result.
  297. The result is cached in `self._replica_result`..
  298. """
  299. if self._replica_result is None:
  300. if self._is_router_running_in_separate_loop:
  301. # Use `asyncio.wrap_future` so `self._replica_result_future` can be awaited
  302. # safely from any asyncio loop.
  303. # self._replica_result_future is a object of type concurrent.futures.Future
  304. self._replica_result = await asyncio.wrap_future(
  305. self._replica_result_future
  306. )
  307. else:
  308. # self._replica_result_future is a object of type asyncio.Future
  309. async_future = cast(
  310. asyncio.Future[ReplicaResult], self._replica_result_future
  311. )
  312. self._replica_result = await async_future
  313. return self._replica_result
  314. def cancel(self):
  315. """Attempt to cancel the `DeploymentHandle` call.
  316. This is best effort.
  317. - If the request hasn't been assigned to a replica, the assignment will be
  318. cancelled.
  319. - If the request has been assigned to a replica, `ray.cancel` will be
  320. called on the object ref, attempting to cancel the request and any downstream
  321. requests it makes.
  322. If the request is successfully cancelled, subsequent operations on the ref will
  323. raise an exception:
  324. - If the request was cancelled before assignment, they'll raise
  325. `asyncio.CancelledError` (or a `concurrent.futures.CancelledError` for
  326. synchronous methods like `.result()`.).
  327. - If the request was cancelled after assignment, they'll raise
  328. `ray.exceptions.TaskCancelledError`.
  329. """
  330. if self._cancelled:
  331. return
  332. self._cancelled = True
  333. self._replica_result_future.cancel()
  334. if not self._is_router_running_in_separate_loop:
  335. # Given that there is a event loop running, we can't call sync methods.
  336. # Hence optimistically cancel the replica result future and replica result.
  337. if self._replica_result:
  338. self._replica_result.cancel()
  339. return
  340. try:
  341. # try to fetch the results synchronously. if it succeeds,
  342. # we will explicitly cancel the replica result. if it fails,
  343. # the request is already cancelled and we can return early.
  344. self._fetch_future_result_sync()
  345. except RequestCancelledError:
  346. # request is already cancelled nothing to do here
  347. return
  348. self._replica_result.cancel()
  349. @DeveloperAPI
  350. def cancelled(self) -> bool:
  351. """Whether or not the request has been cancelled.
  352. This is `True` if `.cancel()` is called, but the request may actually have run
  353. to completion.
  354. """
  355. return self._cancelled
  356. @PublicAPI(stability="stable")
  357. class DeploymentResponse(_DeploymentResponseBase[R]):
  358. """A future-like object wrapping the result of a unary deployment handle call.
  359. From inside a deployment, a `DeploymentResponse` can be awaited to retrieve the
  360. output of the call without blocking the asyncio event loop.
  361. From outside a deployment, `.result()` can be used to retrieve the output in a
  362. blocking manner.
  363. Example:
  364. .. code-block:: python
  365. from ray import serve
  366. from ray.serve.handle import DeploymentHandle
  367. @serve.deployment
  368. class Downstream:
  369. def say_hi(self, message: str) -> str:
  370. return f"Hello {message}!"
  371. @serve.deployment
  372. class Caller:
  373. def __init__(self, handle: DeploymentHandle):
  374. self._downstream_handle = handle
  375. async def __call__(self, message: str) -> str:
  376. # Inside a deployment: `await` the result to enable concurrency.
  377. response = self._downstream_handle.say_hi.remote(message)
  378. return await response
  379. app = Caller.bind(Downstream.bind())
  380. handle: DeploymentHandle = serve.run(app)
  381. # Outside a deployment: call `.result()` to get output.
  382. response = handle.remote("world")
  383. assert response.result() == "Hello world!"
  384. A `DeploymentResponse` can be passed directly to another `DeploymentHandle` call
  385. without fetching the result to enable composing multiple deployments together.
  386. Example:
  387. .. code-block:: python
  388. from ray import serve
  389. from ray.serve.handle import DeploymentHandle
  390. @serve.deployment
  391. class Adder:
  392. def add(self, val: int) -> int:
  393. return val + 1
  394. @serve.deployment
  395. class Caller:
  396. def __init__(self, handle: DeploymentHandle):
  397. self._adder_handle = handle
  398. async def __call__(self, start: int) -> int:
  399. return await self._adder_handle.add.remote(
  400. # Pass the response directly to another handle call without awaiting.
  401. self._adder_handle.add.remote(start)
  402. )
  403. app = Caller.bind(Adder.bind())
  404. handle: DeploymentHandle = serve.run(app)
  405. assert handle.remote(0).result() == 2
  406. """
  407. def __await__(self) -> Generator[Any, None, R]:
  408. """Yields the final result of the deployment handle call."""
  409. try:
  410. replica_result = yield from self._fetch_future_result_async().__await__()
  411. result = yield from replica_result.get_async().__await__()
  412. return result
  413. except asyncio.CancelledError:
  414. if self._cancelled:
  415. raise RequestCancelledError(self.request_id) from None
  416. else:
  417. raise asyncio.CancelledError from None
  418. def __reduce__(self):
  419. raise RayServeException(
  420. "`DeploymentResponse` is not serializable. If you are passing the "
  421. "`DeploymentResponse` in a nested object (e.g. a list or dictionary) to a "
  422. "downstream deployment handle call, that is no longer supported. Please "
  423. "only pass `DeploymentResponse` objects as top level arguments."
  424. )
  425. def result(
  426. self,
  427. *,
  428. timeout_s: Optional[float] = None,
  429. _skip_asyncio_check: bool = False,
  430. ) -> R:
  431. """Fetch the result of the handle call synchronously.
  432. This should *not* be used from within a deployment as it runs in an asyncio
  433. event loop. For model composition, `await` the response instead.
  434. If `timeout_s` is provided and the result is not available before the timeout,
  435. a `TimeoutError` is raised.
  436. """
  437. if not _skip_asyncio_check and is_running_in_asyncio_loop():
  438. raise RuntimeError(
  439. "Sync methods should not be called from within an `asyncio` event "
  440. "loop. Use `await response` instead."
  441. )
  442. start_time_s = time.time()
  443. replica_result = self._fetch_future_result_sync(timeout_s)
  444. remaining_timeout_s = calculate_remaining_timeout(
  445. timeout_s=timeout_s, start_time_s=start_time_s, curr_time_s=time.time()
  446. )
  447. return replica_result.get(remaining_timeout_s)
  448. @DeveloperAPI
  449. async def _to_object_ref(self) -> ray.ObjectRef:
  450. """Advanced API to convert the response to a Ray `ObjectRef`.
  451. This is used to pass the output of a `DeploymentHandle` call to a Ray task or
  452. actor method call.
  453. This method is `async def` because it will block until the handle call has been
  454. assigned to a replica. If there are many requests in flight and all
  455. replicas' queues are full, this may be a slow operation.
  456. """
  457. ServeUsageTag.DEPLOYMENT_HANDLE_TO_OBJECT_REF_API_USED.record("1")
  458. if not self._request_metadata._by_reference:
  459. raise OBJ_REF_NOT_SUPPORTED_ERROR
  460. replica_result = await self._fetch_future_result_async()
  461. return await replica_result.to_object_ref_async()
  462. @DeveloperAPI
  463. def _to_object_ref_sync(
  464. self,
  465. _timeout_s: Optional[float] = None,
  466. _allow_running_in_asyncio_loop: bool = False,
  467. ) -> ray.ObjectRef:
  468. """Advanced API to convert the response to a Ray `ObjectRef`.
  469. This is used to pass the output of a `DeploymentHandle` call to a Ray task or
  470. actor method call.
  471. This method is a *blocking* call because it will block until the handle call has
  472. been assigned to a replica. If there are many requests in flight and all
  473. replicas' queues are full, this may be a slow operation.
  474. From inside a deployment, `_to_object_ref` should be used instead to avoid
  475. blocking the asyncio event loop.
  476. """
  477. ServeUsageTag.DEPLOYMENT_HANDLE_TO_OBJECT_REF_API_USED.record("1")
  478. if not self._request_metadata._by_reference:
  479. raise OBJ_REF_NOT_SUPPORTED_ERROR
  480. if not _allow_running_in_asyncio_loop and is_running_in_asyncio_loop():
  481. raise RuntimeError(
  482. "Sync methods should not be called from within an `asyncio` event "
  483. "loop. Use `await response._to_object_ref()` instead."
  484. )
  485. # First, fetch the result of the future
  486. start_time_s = time.time()
  487. replica_result = self._fetch_future_result_sync(_timeout_s)
  488. # Then, if necessary, resolve generator to ref
  489. remaining_timeout_s = calculate_remaining_timeout(
  490. timeout_s=_timeout_s,
  491. start_time_s=start_time_s,
  492. curr_time_s=time.time(),
  493. )
  494. return replica_result.to_object_ref(timeout_s=remaining_timeout_s)
  495. @PublicAPI(stability="stable")
  496. class DeploymentResponseGenerator(_DeploymentResponseBase[R]):
  497. """A future-like object wrapping the result of a streaming deployment handle call.
  498. This is returned when using `handle.options(stream=True)` and calling a generator
  499. deployment method.
  500. `DeploymentResponseGenerator` is both a synchronous and asynchronous iterator.
  501. When iterating over results from inside a deployment, `async for` should be used to
  502. avoid blocking the asyncio event loop.
  503. When iterating over results from outside a deployment, use a standard `for` loop.
  504. Example:
  505. .. code-block:: python
  506. from typing import AsyncGenerator, Generator
  507. from ray import serve
  508. from ray.serve.handle import DeploymentHandle
  509. @serve.deployment
  510. class Streamer:
  511. def generate_numbers(self, limit: int) -> Generator[int]:
  512. for i in range(limit):
  513. yield i
  514. @serve.deployment
  515. class Caller:
  516. def __init__(self, handle: DeploymentHandle):
  517. # Set `stream=True` on the handle to enable streaming calls.
  518. self._streaming_handle = handle.options(stream=True)
  519. async def __call__(self, limit: int) -> AsyncIterator[int]:
  520. gen: DeploymentResponseGenerator = (
  521. self._streaming_handle.generate_numbers.remote(limit)
  522. )
  523. # Inside a deployment: use `async for` to enable concurrency.
  524. async for i in gen:
  525. yield i
  526. app = Caller.bind(Streamer.bind())
  527. handle: DeploymentHandle = serve.run(app)
  528. # Outside a deployment: use a standard `for` loop.
  529. gen: DeploymentResponseGenerator = handle.options(stream=True).remote(10)
  530. assert [i for i in gen] == list(range(10))
  531. A `DeploymentResponseGenerator` *cannot* currently be passed to another
  532. `DeploymentHandle` call.
  533. """
  534. def __await__(self):
  535. raise TypeError(
  536. "`DeploymentResponseGenerator` cannot be awaited directly. Use `async for` "
  537. "or `await response.__anext__() instead`."
  538. )
  539. def __aiter__(self) -> AsyncIterator[R]:
  540. return self
  541. async def __anext__(self) -> R:
  542. try:
  543. replica_result = await self._fetch_future_result_async()
  544. return await replica_result.__anext__()
  545. except asyncio.CancelledError:
  546. if self._cancelled:
  547. raise RequestCancelledError(self.request_id) from None
  548. else:
  549. raise asyncio.CancelledError from None
  550. def __iter__(self) -> Iterator[R]:
  551. return self
  552. def __next__(self) -> R:
  553. if is_running_in_asyncio_loop():
  554. raise RuntimeError(
  555. "Sync methods should not be called from within an `asyncio` event "
  556. "loop. Use `async for` or `await response.__anext__()` instead."
  557. )
  558. replica_result = self._fetch_future_result_sync()
  559. return replica_result.__next__()
  560. def result(
  561. self,
  562. *,
  563. timeout_s: Optional[float] = None,
  564. _skip_asyncio_check: bool = False,
  565. ) -> Any:
  566. """Not supported on `DeploymentResponseGenerator`.
  567. This method exists only for API parity with `DeploymentResponse.result()` to
  568. aid static typing. A `DeploymentResponseGenerator` is returned when using
  569. streaming handles (e.g., `handle.options(stream=True)`).
  570. To consume results, iterate over the generator instead of calling `.result()`:
  571. - Outside a deployment: use a standard `for` loop
  572. - Inside a deployment: use `async for`
  573. Always raises `TypeError`.
  574. """
  575. raise TypeError(
  576. "`DeploymentResponseGenerator` doesn't support `.result()`. "
  577. "Use iteration instead: `for item in response` (outside deployments) "
  578. "or `async for item in response` (inside deployments)."
  579. )
  580. @DeveloperAPI
  581. async def _to_object_ref_gen(self) -> ObjectRefGenerator:
  582. """Advanced API to convert the generator to a Ray `ObjectRefGenerator`.
  583. This method is `async def` because it will block until the handle call has been
  584. assigned to a replica. If there are many requests in flight and all
  585. replicas' queues are full, this may be a slow operation.
  586. """
  587. ServeUsageTag.DEPLOYMENT_HANDLE_TO_OBJECT_REF_API_USED.record("1")
  588. if not self._request_metadata._by_reference:
  589. raise OBJ_REF_NOT_SUPPORTED_ERROR
  590. replica_result = await self._fetch_future_result_async()
  591. return replica_result.to_object_ref_gen()
  592. @DeveloperAPI
  593. def _to_object_ref_gen_sync(
  594. self,
  595. _timeout_s: Optional[float] = None,
  596. _allow_running_in_asyncio_loop: bool = False,
  597. ) -> ObjectRefGenerator:
  598. """Advanced API to convert the generator to a Ray `ObjectRefGenerator`.
  599. This method is a *blocking* call because it will block until the handle call has
  600. been assigned to a replica. If there are many requests in flight and all
  601. replicas' queues are full, this may be a slow operation.
  602. From inside a deployment, `_to_object_ref_gen` should be used instead to avoid
  603. blocking the asyncio event loop.
  604. """
  605. ServeUsageTag.DEPLOYMENT_HANDLE_TO_OBJECT_REF_API_USED.record("1")
  606. if not self._request_metadata._by_reference:
  607. raise OBJ_REF_NOT_SUPPORTED_ERROR
  608. if not _allow_running_in_asyncio_loop and is_running_in_asyncio_loop():
  609. raise RuntimeError(
  610. "Sync methods should not be called from within an `asyncio` event "
  611. "loop. Use `await response._to_object_ref()` instead."
  612. )
  613. replica_result = self._fetch_future_result_sync(_timeout_s)
  614. return replica_result.to_object_ref_gen()
  615. @PublicAPI(stability="stable")
  616. class DeploymentHandle(_DeploymentHandleBase[T]):
  617. """A handle used to make requests to a deployment at runtime.
  618. This is primarily used to compose multiple deployments within a single application.
  619. It can also be used to make calls to the ingress deployment of an application (e.g.,
  620. for programmatic testing).
  621. Example:
  622. .. code-block:: python
  623. import ray
  624. from ray import serve
  625. from ray.serve.handle import DeploymentHandle, DeploymentResponse
  626. @serve.deployment
  627. class Downstream:
  628. def say_hi(self, message: str):
  629. return f"Hello {message}!"
  630. self._message = message
  631. @serve.deployment
  632. class Ingress:
  633. def __init__(self, handle: DeploymentHandle):
  634. self._downstream_handle = handle
  635. async def __call__(self, name: str) -> str:
  636. response = self._downstream_handle.say_hi.remote(name)
  637. return await response
  638. app = Ingress.bind(Downstream.bind())
  639. handle: DeploymentHandle = serve.run(app)
  640. response = handle.remote("world")
  641. assert response.result() == "Hello world!"
  642. """
  643. def options(
  644. self,
  645. *,
  646. method_name: Union[str, DEFAULT] = DEFAULT.VALUE,
  647. multiplexed_model_id: Union[str, DEFAULT] = DEFAULT.VALUE,
  648. stream: Union[bool, DEFAULT] = DEFAULT.VALUE,
  649. use_new_handle_api: Union[bool, DEFAULT] = DEFAULT.VALUE,
  650. _prefer_local_routing: Union[bool, DEFAULT] = DEFAULT.VALUE,
  651. _by_reference: Union[bool, DEFAULT] = DEFAULT.VALUE,
  652. request_serialization: Union[str, DEFAULT] = DEFAULT.VALUE,
  653. response_serialization: Union[str, DEFAULT] = DEFAULT.VALUE,
  654. ) -> "DeploymentHandle[T]":
  655. """Set options for this handle and return an updated copy of it.
  656. Args:
  657. method_name: The method name to call on the deployment.
  658. multiplexed_model_id: The model ID to use for multiplexed model requests.
  659. stream: Whether to use streaming for the request.
  660. use_new_handle_api: Whether to use the new handle API.
  661. _prefer_local_routing: Whether to prefer local routing.
  662. _by_reference: Whether to use by reference.
  663. request_serialization: Serialization method for RPC requests.
  664. Available options: "cloudpickle", "pickle", "msgpack", "orjson".
  665. Defaults to "cloudpickle".
  666. response_serialization: Serialization method for RPC responses.
  667. Available options: "cloudpickle", "pickle", "msgpack", "orjson".
  668. Defaults to "cloudpickle".
  669. Example:
  670. .. code-block:: python
  671. response: DeploymentResponse = handle.options(
  672. method_name="other_method",
  673. multiplexed_model_id="model:v1",
  674. ).remote()
  675. """
  676. if use_new_handle_api is not DEFAULT.VALUE:
  677. warnings.warn(
  678. "Setting `use_new_handle_api` no longer has any effect. "
  679. "This argument will be removed in a future version."
  680. )
  681. if _prefer_local_routing is not DEFAULT.VALUE:
  682. warnings.warn(
  683. "Modifying `_prefer_local_routing` with `options()` is "
  684. "deprecated. Please use `init()` instead."
  685. )
  686. return self._options(
  687. method_name=method_name,
  688. multiplexed_model_id=multiplexed_model_id,
  689. stream=stream,
  690. _prefer_local_routing=_prefer_local_routing,
  691. _by_reference=_by_reference,
  692. request_serialization=request_serialization,
  693. response_serialization=response_serialization,
  694. )
  695. def remote(
  696. self, *args, **kwargs
  697. ) -> Union[DeploymentResponse[Any], DeploymentResponseGenerator[Any]]:
  698. """Issue a remote call to a method of the deployment.
  699. By default, the result is a `DeploymentResponse` that can be awaited to fetch
  700. the result of the call or passed to another `.remote()` call to compose multiple
  701. deployments.
  702. If `handle.options(stream=True)` is set and a generator method is called, this
  703. returns a `DeploymentResponseGenerator` instead.
  704. Example:
  705. .. code-block:: python
  706. # Fetch the result directly.
  707. response = handle.remote()
  708. result = await response
  709. # Pass the result to another handle call.
  710. composed_response = handle2.remote(handle1.remote())
  711. composed_result = await composed_response
  712. Args:
  713. *args: Positional arguments to be serialized and passed to the
  714. remote method call.
  715. **kwargs: Keyword arguments to be serialized and passed to the
  716. remote method call.
  717. """
  718. future, request_metadata = self._remote(args, kwargs)
  719. if self.handle_options.stream:
  720. return DeploymentResponseGenerator(
  721. future,
  722. request_metadata,
  723. _is_router_running_in_separate_loop=self._is_router_running_in_separate_loop(),
  724. )
  725. else:
  726. return DeploymentResponse(
  727. future,
  728. request_metadata,
  729. _is_router_running_in_separate_loop=self._is_router_running_in_separate_loop(),
  730. )