client.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626
  1. import asyncio
  2. import inspect
  3. import logging
  4. import random
  5. import time
  6. from collections.abc import Sequence
  7. from functools import wraps
  8. from typing import Callable, Dict, List, Optional, Tuple, Union
  9. import ray
  10. from ray.actor import ActorHandle
  11. from ray.serve._private.application_state import StatusOverview
  12. from ray.serve._private.build_app import BuiltApplication
  13. from ray.serve._private.common import (
  14. DeploymentID,
  15. DeploymentStatus,
  16. DeploymentStatusInfo,
  17. RequestRoutingInfo,
  18. )
  19. from ray.serve._private.constants import (
  20. CLIENT_CHECK_CREATION_POLLING_INTERVAL_S,
  21. CLIENT_POLLING_INTERVAL_S,
  22. HTTP_PROXY_TIMEOUT,
  23. MAX_CACHED_HANDLES,
  24. SERVE_DEFAULT_APP_NAME,
  25. SERVE_LOGGER_NAME,
  26. )
  27. from ray.serve._private.controller import ServeController
  28. from ray.serve._private.deploy_utils import get_deploy_args
  29. from ray.serve._private.deployment_info import DeploymentInfo
  30. from ray.serve._private.http_util import ASGIAppReplicaWrapper
  31. from ray.serve._private.utils import get_random_string
  32. from ray.serve.config import HTTPOptions
  33. from ray.serve.exceptions import RayServeException
  34. from ray.serve.generated.serve_pb2 import (
  35. ApplicationArgs,
  36. DeploymentArgs,
  37. DeploymentRoute,
  38. DeploymentStatusInfo as DeploymentStatusInfoProto,
  39. StatusOverview as StatusOverviewProto,
  40. )
  41. from ray.serve.handle import DeploymentHandle
  42. from ray.serve.schema import (
  43. ApplicationStatus,
  44. LoggingConfig,
  45. ServeApplicationSchema,
  46. ServeDeploySchema,
  47. )
# Module logger; registered under SERVE_LOGGER_NAME rather than __name__.
logger = logging.getLogger(SERVE_LOGGER_NAME)
  49. def _ensure_connected(f: Callable) -> Callable:
  50. @wraps(f)
  51. def check(self, *args, **kwargs):
  52. if self._shutdown:
  53. raise RayServeException("Client has already been shut down.")
  54. return f(self, *args, **kwargs)
  55. return check
  56. class ServeControllerClient:
  57. def __init__(
  58. self,
  59. controller: ActorHandle,
  60. ):
  61. self._controller: ServeController = controller
  62. self._shutdown = False
  63. self._http_config: HTTPOptions = ray.get(controller.get_http_config.remote())
  64. self._root_url = ray.get(controller.get_root_url.remote())
  65. # Each handle has the overhead of long poll client, therefore cached.
  66. self.handle_cache = dict()
  67. self._evicted_handle_keys = set()
  68. @property
  69. def root_url(self):
  70. return self._root_url
  71. @property
  72. def http_config(self):
  73. return self._http_config
  74. def __reduce__(self):
  75. raise RayServeException(("Ray Serve client cannot be serialized."))
  76. def shutdown_cached_handles(self):
  77. """Shuts down all cached handles.
  78. Remove the reference to the cached handles so that they can be
  79. garbage collected.
  80. """
  81. for cache_key in list(self.handle_cache):
  82. self.handle_cache[cache_key].shutdown()
  83. del self.handle_cache[cache_key]
  84. async def shutdown_cached_handles_async(self):
  85. """Shuts down all cached handles asynchronously.
  86. Remove the reference to the cached handles so that they can be
  87. garbage collected.
  88. """
  89. async def shutdown_task(cache_key):
  90. await self.handle_cache[cache_key].shutdown_async()
  91. del self.handle_cache[cache_key]
  92. await asyncio.gather(
  93. *[shutdown_task(cache_key) for cache_key in list(self.handle_cache)]
  94. )
  95. def shutdown(self, timeout_s: float = 30.0) -> None:
  96. """Completely shut down the connected Serve instance.
  97. Shuts down all processes and deletes all state associated with the
  98. instance.
  99. """
  100. self.shutdown_cached_handles()
  101. if ray.is_initialized() and not self._shutdown:
  102. try:
  103. ray.get(self._controller.graceful_shutdown.remote(), timeout=timeout_s)
  104. except ray.exceptions.RayActorError:
  105. # Controller has been shut down.
  106. pass
  107. except TimeoutError:
  108. logger.warning(
  109. f"Controller failed to shut down within {timeout_s}s. "
  110. "Check controller logs for more details."
  111. )
  112. self._shutdown = True
  113. async def shutdown_async(self, timeout_s: float = 30.0) -> None:
  114. """Completely shut down the connected Serve instance.
  115. Shuts down all processes and deletes all state associated with the
  116. instance.
  117. """
  118. await self.shutdown_cached_handles_async()
  119. if ray.is_initialized() and not self._shutdown:
  120. try:
  121. await asyncio.wait_for(
  122. self._controller.graceful_shutdown.remote(), timeout=timeout_s
  123. )
  124. except ray.exceptions.RayActorError:
  125. # Controller has been shut down.
  126. pass
  127. except TimeoutError:
  128. logger.warning(
  129. f"Controller failed to shut down within {timeout_s}s. "
  130. "Check controller logs for more details."
  131. )
  132. self._shutdown = True
  133. def _wait_for_deployment_healthy(self, name: str, timeout_s: int = -1):
  134. """Waits for the named deployment to enter "HEALTHY" status.
  135. Raises RuntimeError if the deployment enters the "UNHEALTHY" status
  136. instead.
  137. Raises TimeoutError if this doesn't happen before timeout_s.
  138. """
  139. start = time.time()
  140. while time.time() - start < timeout_s or timeout_s < 0:
  141. status_bytes = ray.get(self._controller.get_deployment_status.remote(name))
  142. if status_bytes is None:
  143. raise RuntimeError(
  144. f"Waiting for deployment {name} to be HEALTHY, "
  145. "but deployment doesn't exist."
  146. )
  147. status = DeploymentStatusInfo.from_proto(
  148. DeploymentStatusInfoProto.FromString(status_bytes)
  149. )
  150. if status.status == DeploymentStatus.HEALTHY:
  151. break
  152. elif status.status == DeploymentStatus.UNHEALTHY:
  153. raise RuntimeError(
  154. f"Deployment {name} is UNHEALTHY: " f"{status.message}"
  155. )
  156. else:
  157. # Guard against new unhandled statuses being added.
  158. assert status.status == DeploymentStatus.UPDATING
  159. logger.debug(
  160. f"Waiting for {name} to be healthy, current status: "
  161. f"{status.status}."
  162. )
  163. time.sleep(CLIENT_POLLING_INTERVAL_S)
  164. else:
  165. raise TimeoutError(
  166. f"Deployment {name} did not become HEALTHY after {timeout_s}s."
  167. )
  168. def _wait_for_deployment_deleted(
  169. self, name: str, app_name: str, timeout_s: int = 60
  170. ):
  171. """Waits for the named deployment to be shut down and deleted.
  172. Raises TimeoutError if this doesn't happen before timeout_s.
  173. """
  174. start = time.time()
  175. while time.time() - start < timeout_s:
  176. curr_status_bytes = ray.get(
  177. self._controller.get_deployment_status.remote(name)
  178. )
  179. if curr_status_bytes is None:
  180. break
  181. curr_status = DeploymentStatusInfo.from_proto(
  182. DeploymentStatusInfoProto.FromString(curr_status_bytes)
  183. )
  184. logger.debug(
  185. f"Waiting for {name} to be deleted, current status: {curr_status}."
  186. )
  187. time.sleep(CLIENT_POLLING_INTERVAL_S)
  188. else:
  189. raise TimeoutError(f"Deployment {name} wasn't deleted after {timeout_s}s.")
  190. def _wait_for_deployment_created(
  191. self, deployment_name: str, app_name: str, timeout_s: int = -1
  192. ):
  193. """Waits for the named deployment to be created.
  194. A deployment being created simply means that its been registered
  195. with the deployment state manager. The deployment state manager
  196. will then continue to reconcile the deployment towards its
  197. target state.
  198. Raises TimeoutError if this doesn't happen before timeout_s.
  199. """
  200. start = time.time()
  201. while time.time() - start < timeout_s or timeout_s < 0:
  202. status_bytes = ray.get(
  203. self._controller.get_deployment_status.remote(deployment_name, app_name)
  204. )
  205. if status_bytes is not None:
  206. break
  207. logger.debug(
  208. f"Waiting for deployment '{deployment_name}' in application "
  209. f"'{app_name}' to be created."
  210. )
  211. time.sleep(CLIENT_CHECK_CREATION_POLLING_INTERVAL_S)
  212. else:
  213. raise TimeoutError(
  214. f"Deployment '{deployment_name}' in application '{app_name}' "
  215. f"did not become HEALTHY after {timeout_s}s."
  216. )
  217. def _wait_for_application_running(self, name: str, timeout_s: int = -1):
  218. """Waits for the named application to enter "RUNNING" status.
  219. Raises:
  220. RuntimeError: if the application enters the "DEPLOY_FAILED" status instead.
  221. TimeoutError: if this doesn't happen before timeout_s.
  222. """
  223. start = time.time()
  224. while time.time() - start < timeout_s or timeout_s < 0:
  225. status_bytes = ray.get(self._controller.get_serve_status.remote(name))
  226. if status_bytes is None:
  227. raise RuntimeError(
  228. f"Waiting for application {name} to be RUNNING, "
  229. "but application doesn't exist."
  230. )
  231. status = StatusOverview.from_proto(
  232. StatusOverviewProto.FromString(status_bytes)
  233. )
  234. if status.app_status.status == ApplicationStatus.RUNNING:
  235. break
  236. elif status.app_status.status == ApplicationStatus.DEPLOY_FAILED:
  237. raise RuntimeError(
  238. f"Deploying application {name} failed: {status.app_status.message}"
  239. )
  240. logger.debug(
  241. f"Waiting for {name} to be RUNNING, current status: "
  242. f"{status.app_status.status}."
  243. )
  244. time.sleep(CLIENT_POLLING_INTERVAL_S)
  245. else:
  246. raise TimeoutError(
  247. f"Application {name} did not become RUNNING after {timeout_s}s."
  248. )
  249. @_ensure_connected
  250. def wait_for_proxies_serving(
  251. self, wait_for_applications_running: bool = True
  252. ) -> None:
  253. """Wait for the proxies to be ready to serve requests."""
  254. proxy_handles = ray.get(self._controller.get_proxies.remote())
  255. serving_refs = [
  256. handle.serving.remote(
  257. wait_for_applications_running=wait_for_applications_running
  258. )
  259. for handle in proxy_handles.values()
  260. ]
  261. done, pending = ray.wait(
  262. serving_refs,
  263. timeout=HTTP_PROXY_TIMEOUT,
  264. num_returns=len(serving_refs),
  265. )
  266. if len(pending) > 0:
  267. raise TimeoutError(f"Proxies not available after {HTTP_PROXY_TIMEOUT}s.")
  268. # Ensure the proxies are either serving or dead.
  269. for ref in done:
  270. try:
  271. ray.get(ref, timeout=1)
  272. except ray.exceptions.RayActorError:
  273. pass
  274. except Exception:
  275. raise TimeoutError(
  276. f"Proxies not available after {HTTP_PROXY_TIMEOUT}s."
  277. )
  278. @_ensure_connected
  279. def deploy_applications(
  280. self,
  281. built_apps: Sequence[BuiltApplication],
  282. *,
  283. wait_for_ingress_deployment_creation: bool = True,
  284. wait_for_applications_running: bool = True,
  285. ) -> List[DeploymentHandle]:
  286. name_to_deployment_args_list = {}
  287. name_to_application_args = {}
  288. for app in built_apps:
  289. deployment_args_list = []
  290. for deployment in app.deployments:
  291. if deployment.logging_config is None and app.logging_config:
  292. deployment = deployment.options(logging_config=app.logging_config)
  293. is_ingress = deployment.name == app.ingress_deployment_name
  294. deployment_args = get_deploy_args(
  295. deployment.name,
  296. ingress=is_ingress,
  297. replica_config=deployment._replica_config,
  298. deployment_config=deployment._deployment_config,
  299. version=deployment._version or get_random_string(),
  300. route_prefix=app.route_prefix if is_ingress else None,
  301. )
  302. deployment_args_proto = DeploymentArgs()
  303. deployment_args_proto.deployment_name = deployment_args[
  304. "deployment_name"
  305. ]
  306. deployment_args_proto.deployment_config = deployment_args[
  307. "deployment_config_proto_bytes"
  308. ]
  309. deployment_args_proto.replica_config = deployment_args[
  310. "replica_config_proto_bytes"
  311. ]
  312. deployment_args_proto.deployer_job_id = deployment_args[
  313. "deployer_job_id"
  314. ]
  315. if deployment_args["route_prefix"]:
  316. deployment_args_proto.route_prefix = deployment_args["route_prefix"]
  317. deployment_args_proto.ingress = deployment_args["ingress"]
  318. deployment_args_list.append(deployment_args_proto.SerializeToString())
  319. application_args_proto = ApplicationArgs()
  320. application_args_proto.external_scaler_enabled = app.external_scaler_enabled
  321. name_to_deployment_args_list[app.name] = deployment_args_list
  322. name_to_application_args[
  323. app.name
  324. ] = application_args_proto.SerializeToString()
  325. # Validate applications before sending to controller
  326. self._check_ingress_deployments(built_apps)
  327. ray.get(
  328. self._controller.deploy_applications.remote(
  329. name_to_deployment_args_list, name_to_application_args
  330. )
  331. )
  332. handles = []
  333. for app in built_apps:
  334. # The deployment state is not guaranteed to be created after
  335. # deploy_application returns; the application state manager will
  336. # need another reconcile iteration to create it.
  337. if wait_for_ingress_deployment_creation:
  338. self._wait_for_deployment_created(app.ingress_deployment_name, app.name)
  339. if wait_for_applications_running:
  340. self._wait_for_application_running(app.name)
  341. if app.route_prefix is not None:
  342. url_part = " at " + self._root_url + app.route_prefix
  343. else:
  344. url_part = ""
  345. logger.info(f"Application '{app.name}' is ready{url_part}.")
  346. handles.append(
  347. self.get_handle(
  348. app.ingress_deployment_name, app.name, check_exists=False
  349. )
  350. )
  351. return handles
  352. @_ensure_connected
  353. def deploy_apps(
  354. self,
  355. config: Union[ServeApplicationSchema, ServeDeploySchema],
  356. _blocking: bool = False,
  357. ) -> None:
  358. """Starts a task on the controller that deploys application(s) from a config.
  359. Args:
  360. config: A single-application config (ServeApplicationSchema) or a
  361. multi-application config (ServeDeploySchema)
  362. _blocking: Whether to block until the application is running.
  363. Raises:
  364. RayTaskError: If the deploy task on the controller fails. This can be
  365. because a single-app config was deployed after deploying a multi-app
  366. config, or vice versa.
  367. """
  368. ray.get(self._controller.apply_config.remote(config))
  369. if _blocking:
  370. timeout_s = 60
  371. if isinstance(config, ServeDeploySchema):
  372. app_names = {app.name for app in config.applications}
  373. else:
  374. app_names = {config.name}
  375. start = time.time()
  376. while time.time() - start < timeout_s:
  377. statuses = self.list_serve_statuses()
  378. app_to_status = {
  379. status.name: status.app_status.status
  380. for status in statuses
  381. if status.name in app_names
  382. }
  383. if len(app_names) == len(app_to_status) and set(
  384. app_to_status.values()
  385. ) == {ApplicationStatus.RUNNING}:
  386. break
  387. time.sleep(CLIENT_POLLING_INTERVAL_S)
  388. else:
  389. raise TimeoutError(
  390. f"Serve application isn't running after {timeout_s}s."
  391. )
  392. self.wait_for_proxies_serving(wait_for_applications_running=True)
  393. def _check_ingress_deployments(
  394. self, built_apps: Sequence[BuiltApplication]
  395. ) -> None:
  396. """Check @serve.ingress of deployments across applications.
  397. Raises: RayServeException if more than one @serve.ingress
  398. is found among deployments in any single application.
  399. """
  400. for app in built_apps:
  401. num_ingress_deployments = 0
  402. for deployment in app.deployments:
  403. if inspect.isclass(deployment.func_or_class) and issubclass(
  404. deployment.func_or_class, ASGIAppReplicaWrapper
  405. ):
  406. num_ingress_deployments += 1
  407. if num_ingress_deployments > 1:
  408. raise RayServeException(
  409. f'Found multiple FastAPI deployments in application "{app.name}".'
  410. "Please only include one deployment with @serve.ingress "
  411. "in your application to avoid this issue."
  412. )
  413. @_ensure_connected
  414. def delete_apps(self, names: List[str], blocking: bool = True):
  415. if not names:
  416. return
  417. logger.info(f"Deleting app {names}")
  418. self._controller.delete_apps.remote(names)
  419. if blocking:
  420. start = time.time()
  421. while time.time() - start < 60:
  422. curr_statuses_bytes = ray.get(
  423. self._controller.get_serve_statuses.remote(names)
  424. )
  425. all_deleted = True
  426. for cur_status_bytes in curr_statuses_bytes:
  427. cur_status = StatusOverview.from_proto(
  428. StatusOverviewProto.FromString(cur_status_bytes)
  429. )
  430. if cur_status.app_status.status != ApplicationStatus.NOT_STARTED:
  431. all_deleted = False
  432. if all_deleted:
  433. return
  434. time.sleep(CLIENT_POLLING_INTERVAL_S)
  435. else:
  436. raise TimeoutError(
  437. f"Some of these applications weren't deleted after 60s: {names}"
  438. )
  439. @_ensure_connected
  440. def delete_all_apps(self, blocking: bool = True):
  441. """Delete all applications"""
  442. all_apps = []
  443. for status_bytes in ray.get(self._controller.list_serve_statuses.remote()):
  444. proto = StatusOverviewProto.FromString(status_bytes)
  445. status = StatusOverview.from_proto(proto)
  446. all_apps.append(status.name)
  447. self.delete_apps(all_apps, blocking)
  448. @_ensure_connected
  449. def get_deployment_info(
  450. self, name: str, app_name: str
  451. ) -> Tuple[DeploymentInfo, str]:
  452. deployment_route = DeploymentRoute.FromString(
  453. ray.get(self._controller.get_deployment_info.remote(name, app_name))
  454. )
  455. return (
  456. DeploymentInfo.from_proto(deployment_route.deployment_info),
  457. deployment_route.route if deployment_route.route != "" else None,
  458. )
  459. @_ensure_connected
  460. def get_serve_status(self, name: str = SERVE_DEFAULT_APP_NAME) -> StatusOverview:
  461. proto = StatusOverviewProto.FromString(
  462. ray.get(self._controller.get_serve_status.remote(name))
  463. )
  464. return StatusOverview.from_proto(proto)
  465. @_ensure_connected
  466. def list_serve_statuses(self) -> List[StatusOverview]:
  467. statuses_bytes = ray.get(self._controller.list_serve_statuses.remote())
  468. return [
  469. StatusOverview.from_proto(StatusOverviewProto.FromString(status_bytes))
  470. for status_bytes in statuses_bytes
  471. ]
  472. @_ensure_connected
  473. def get_all_deployment_statuses(self) -> List[DeploymentStatusInfo]:
  474. statuses_bytes = ray.get(self._controller.get_all_deployment_statuses.remote())
  475. return [
  476. DeploymentStatusInfo.from_proto(
  477. DeploymentStatusInfoProto.FromString(status_bytes)
  478. )
  479. for status_bytes in statuses_bytes
  480. ]
  481. @_ensure_connected
  482. def get_serve_details(self) -> Dict:
  483. return ray.get(self._controller.get_serve_instance_details.remote())
  484. @_ensure_connected
  485. def get_handle(
  486. self,
  487. deployment_name: str,
  488. app_name: Optional[str] = SERVE_DEFAULT_APP_NAME,
  489. check_exists: bool = True,
  490. ) -> DeploymentHandle:
  491. """Construct a handle for the specified deployment.
  492. Args:
  493. deployment_name: Deployment name.
  494. app_name: Application name.
  495. check_exists: If False, then Serve won't check the deployment
  496. is registered. True by default.
  497. Returns:
  498. DeploymentHandle
  499. """
  500. deployment_id = DeploymentID(name=deployment_name, app_name=app_name)
  501. cache_key = (deployment_name, app_name, check_exists)
  502. if cache_key in self.handle_cache:
  503. return self.handle_cache[cache_key]
  504. if check_exists:
  505. all_deployments = ray.get(self._controller.list_deployment_ids.remote())
  506. if deployment_id not in all_deployments:
  507. raise KeyError(f"{deployment_id} does not exist.")
  508. handle = DeploymentHandle(deployment_name, app_name)
  509. self.handle_cache[cache_key] = handle
  510. if cache_key in self._evicted_handle_keys:
  511. logger.warning(
  512. "You just got a ServeHandle that was evicted from internal "
  513. "cache. This means you are getting too many ServeHandles in "
  514. "the same process, this will bring down Serve's performance. "
  515. "Please post a github issue at "
  516. "https://github.com/ray-project/ray/issues to let the Serve "
  517. "team to find workaround for your use case."
  518. )
  519. if len(self.handle_cache) > MAX_CACHED_HANDLES:
  520. # Perform random eviction to keep the handle cache from growing
  521. # infinitely. We used use WeakValueDictionary but hit
  522. # https://github.com/ray-project/ray/issues/18980.
  523. evict_key = random.choice(list(self.handle_cache.keys()))
  524. self._evicted_handle_keys.add(evict_key)
  525. self.handle_cache.pop(evict_key)
  526. return handle
  527. @_ensure_connected
  528. def record_request_routing_info(self, info: RequestRoutingInfo):
  529. """Record replica routing information for a replica.
  530. Args:
  531. info: RequestRoutingInfo including deployment name, replica tag,
  532. multiplex model ids, and routing stats.
  533. """
  534. self._controller.record_request_routing_info.remote(info)
  535. @_ensure_connected
  536. def update_global_logging_config(self, logging_config: LoggingConfig):
  537. """Reconfigure the logging config for the controller & proxies."""
  538. self._controller.reconfigure_global_logging_config.remote(logging_config)