router.py 50 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190
  1. import asyncio
  2. import concurrent.futures
  3. import logging
  4. import threading
  5. import time
  6. import weakref
  7. from abc import ABC, abstractmethod
  8. from asyncio import AbstractEventLoop, ensure_future, futures
  9. from collections import defaultdict
  10. from collections.abc import MutableMapping
  11. from contextlib import contextmanager
  12. from functools import lru_cache, partial
  13. from typing import (
  14. Any,
  15. Callable,
  16. Coroutine,
  17. DefaultDict,
  18. Dict,
  19. List,
  20. Optional,
  21. Union,
  22. )
  23. import ray
  24. from ray.actor import ActorHandle
  25. from ray.exceptions import ActorDiedError, ActorUnavailableError, RayError
  26. from ray.serve._private.common import (
  27. RUNNING_REQUESTS_KEY,
  28. DeploymentHandleSource,
  29. DeploymentID,
  30. DeploymentTargetInfo,
  31. HandleMetricReport,
  32. ReplicaID,
  33. RequestMetadata,
  34. RunningReplicaInfo,
  35. )
  36. from ray.serve._private.config import DeploymentConfig
  37. from ray.serve._private.constants import (
  38. RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE,
  39. RAY_SERVE_HANDLE_AUTOSCALING_METRIC_RECORD_INTERVAL_S,
  40. RAY_SERVE_METRICS_EXPORT_INTERVAL_MS,
  41. RAY_SERVE_PROXY_PREFER_LOCAL_AZ_ROUTING,
  42. SERVE_LOGGER_NAME,
  43. )
  44. from ray.serve._private.event_loop_monitoring import EventLoopMonitor
  45. from ray.serve._private.long_poll import LongPollClient, LongPollNamespace
  46. from ray.serve._private.metrics_utils import (
  47. QUEUED_REQUESTS_KEY,
  48. InMemoryMetricsStore,
  49. MetricsPusher,
  50. TimeStampedValue,
  51. )
  52. from ray.serve._private.replica_result import ReplicaResult
  53. from ray.serve._private.request_router import PendingRequest, RequestRouter
  54. from ray.serve._private.request_router.pow_2_router import (
  55. PowerOfTwoChoicesRequestRouter,
  56. )
  57. from ray.serve._private.request_router.replica_wrapper import RunningReplica
  58. from ray.serve._private.usage import ServeUsageTag
  59. from ray.serve._private.utils import (
  60. generate_request_id,
  61. resolve_deployment_response,
  62. )
  63. from ray.serve.config import AutoscalingConfig
  64. from ray.serve.exceptions import BackPressureError, DeploymentUnavailableError
  65. from ray.util import metrics
  66. logger = logging.getLogger(SERVE_LOGGER_NAME)
  67. class RouterMetricsManager:
  68. """Manages metrics for the router."""
  69. PUSH_METRICS_TO_CONTROLLER_TASK_NAME = "push_metrics_to_controller"
  70. RECORD_METRICS_TASK_NAME = "record_metrics"
  71. def __init__(
  72. self,
  73. deployment_id: DeploymentID,
  74. handle_id: str,
  75. self_actor_id: str,
  76. handle_source: DeploymentHandleSource,
  77. controller_handle: ActorHandle,
  78. router_requests_counter: metrics.Counter,
  79. queued_requests_gauge: metrics.Gauge,
  80. running_requests_gauge: metrics.Gauge,
  81. event_loop: asyncio.BaseEventLoop,
  82. ):
  83. self._handle_id = handle_id
  84. self._deployment_id = deployment_id
  85. self._self_actor_id = self_actor_id
  86. self._handle_source = handle_source
  87. self._controller_handle = controller_handle
  88. # Exported metrics
  89. self.num_router_requests = router_requests_counter
  90. self.num_router_requests.set_default_tags(
  91. {
  92. "deployment": deployment_id.name,
  93. "application": deployment_id.app_name,
  94. "handle": self._handle_id,
  95. "actor_id": self._self_actor_id,
  96. }
  97. )
  98. self.num_queued_requests = 0
  99. self.num_queued_requests_gauge = queued_requests_gauge
  100. self.num_queued_requests_gauge.set_default_tags(
  101. {
  102. "deployment": deployment_id.name,
  103. "application": deployment_id.app_name,
  104. "handle": self._handle_id,
  105. "actor_id": self._self_actor_id,
  106. }
  107. )
  108. self.num_queued_requests_gauge.set(0)
  109. # Track queries sent to replicas for the autoscaling algorithm.
  110. self.num_requests_sent_to_replicas: DefaultDict[ReplicaID, int] = defaultdict(
  111. int
  112. )
  113. self.num_running_requests_gauge = running_requests_gauge
  114. self.num_running_requests_gauge.set_default_tags(
  115. {
  116. "deployment": deployment_id.name,
  117. "application": deployment_id.app_name,
  118. "handle": self._handle_id,
  119. "actor_id": self._self_actor_id,
  120. }
  121. )
  122. # We use Ray object ref callbacks to update state when tracking
  123. # number of requests running on replicas. The callbacks will be
  124. # called from a C++ thread into the router's async event loop,
  125. # so non-atomic read and write operations need to be guarded by
  126. # this thread-safe lock.
  127. self._queries_lock = threading.Lock()
  128. # Regularly aggregate and push autoscaling metrics to controller
  129. self.metrics_pusher = MetricsPusher()
  130. self.metrics_store = InMemoryMetricsStore()
  131. # The config for the deployment this router sends requests to will be broadcast
  132. # by the controller. That means it is not available until we get the first
  133. # update. This includes an optional autoscaling config.
  134. self._deployment_config: Optional[DeploymentConfig] = None
  135. # Track whether the metrics manager has been shutdown
  136. self._shutdown: bool = False
  137. # If the interval is set to 0, eagerly sets all metrics.
  138. self._cached_metrics_enabled = RAY_SERVE_METRICS_EXPORT_INTERVAL_MS != 0
  139. self._cached_metrics_interval_s = RAY_SERVE_METRICS_EXPORT_INTERVAL_MS / 1000
  140. if self._cached_metrics_enabled:
  141. self._cached_num_router_requests = defaultdict(int)
  142. def create_metrics_task():
  143. event_loop.create_task(self._report_cached_metrics_forever())
  144. # the constructor is called in the user thread, but its trying to create a task on the event loop
  145. # which is running in the router thread. This is not thread safe, so we need to use call_soon_threadsafe
  146. # to create the task on the event loop thread safely.
  147. event_loop.call_soon_threadsafe(create_metrics_task)
  148. @contextmanager
  149. def wrap_request_assignment(self, request_meta: RequestMetadata):
  150. max_queued_requests = (
  151. self._deployment_config.max_queued_requests
  152. if self._deployment_config is not None
  153. else -1
  154. )
  155. if (
  156. max_queued_requests != -1
  157. and self.num_queued_requests >= max_queued_requests
  158. ):
  159. # Due to the async nature of request handling, we may reject more requests
  160. # than strictly necessary. This is more likely to happen during
  161. # high concurrency. Here's why:
  162. #
  163. # When multiple requests arrive simultaneously with max_queued_requests=1:
  164. # 1. First request increments num_queued_requests to 1
  165. # 2. Before that request gets assigned to a replica and decrements the counter,
  166. # we yield to the event loop
  167. # 3. Other requests see num_queued_requests=1 and get rejected, even though
  168. # the first request will soon free up the queue slot
  169. #
  170. # For example, with max_queued_requests=1 and 4 simultaneous requests:
  171. # - Request 1 gets queued (num_queued_requests=1)
  172. # - Requests 2,3,4 get rejected since queue appears full
  173. # - Request 1 gets assigned and frees queue slot (num_queued_requests=0)
  174. # - But we already rejected Request 2 which could have been queued
  175. e = BackPressureError(
  176. num_queued_requests=self.num_queued_requests,
  177. max_queued_requests=max_queued_requests,
  178. )
  179. logger.warning(e.message)
  180. raise e
  181. self.inc_num_total_requests(request_meta.route)
  182. yield
  183. @contextmanager
  184. def wrap_queued_request(self, is_retry: bool, num_curr_replicas: int):
  185. """Increment queued requests gauge and maybe push autoscaling metrics to controller."""
  186. try:
  187. self.inc_num_queued_requests()
  188. # Optimization: if there are currently zero replicas for a deployment,
  189. # push handle metric to controller to allow for fast cold start time.
  190. # Only do this on the first attempt to route the request.
  191. if not is_retry and self.should_send_scaled_to_zero_optimized_push(
  192. curr_num_replicas=num_curr_replicas
  193. ):
  194. self.push_autoscaling_metrics_to_controller()
  195. yield
  196. finally:
  197. # If the request is disconnected before assignment, this coroutine
  198. # gets cancelled by the caller and an asyncio.CancelledError is
  199. # raised. The finally block ensures that num_queued_requests
  200. # is correctly decremented in this case.
  201. self.dec_num_queued_requests()
  202. def _update_running_replicas(self, running_replicas: List[RunningReplicaInfo]):
  203. """Prune list of replica ids in self.num_queries_sent_to_replicas.
  204. We want to avoid self.num_queries_sent_to_replicas from growing
  205. in memory as the deployment upscales and downscales over time.
  206. """
  207. running_replica_set = {replica.replica_id for replica in running_replicas}
  208. with self._queries_lock:
  209. self.num_requests_sent_to_replicas = defaultdict(
  210. int,
  211. {
  212. id: self.num_requests_sent_to_replicas[id]
  213. for id, num_queries in self.num_requests_sent_to_replicas.items()
  214. if num_queries or id in running_replica_set
  215. },
  216. )
  217. @property
  218. def autoscaling_config(self) -> Optional[AutoscalingConfig]:
  219. if self._deployment_config is None:
  220. return None
  221. return self._deployment_config.autoscaling_config
  222. def update_deployment_config(
  223. self, deployment_config: DeploymentConfig, curr_num_replicas: int
  224. ):
  225. """Update the config for the deployment this router sends requests to."""
  226. if self._shutdown:
  227. return
  228. self._deployment_config = deployment_config
  229. # Start the metrics pusher if autoscaling is enabled.
  230. autoscaling_config = self.autoscaling_config
  231. if autoscaling_config:
  232. self.metrics_pusher.start()
  233. # Optimization for autoscaling cold start time. If there are
  234. # currently 0 replicas for the deployment, and there is at
  235. # least one queued request on this router, then immediately
  236. # push handle metric to the controller.
  237. if self.should_send_scaled_to_zero_optimized_push(curr_num_replicas):
  238. self.push_autoscaling_metrics_to_controller()
  239. # Record number of queued + ongoing requests at regular
  240. # intervals into the in-memory metrics store
  241. self.metrics_pusher.register_or_update_task(
  242. self.RECORD_METRICS_TASK_NAME,
  243. self._add_autoscaling_metrics_point,
  244. min(
  245. RAY_SERVE_HANDLE_AUTOSCALING_METRIC_RECORD_INTERVAL_S,
  246. autoscaling_config.metrics_interval_s,
  247. ),
  248. )
  249. # Push metrics to the controller periodically.
  250. self.metrics_pusher.register_or_update_task(
  251. self.PUSH_METRICS_TO_CONTROLLER_TASK_NAME,
  252. self.push_autoscaling_metrics_to_controller,
  253. autoscaling_config.metrics_interval_s,
  254. )
  255. else:
  256. if self.metrics_pusher:
  257. self.metrics_pusher.stop_tasks()
  258. def _report_cached_metrics(self):
  259. for route, count in self._cached_num_router_requests.items():
  260. self.num_router_requests.inc(count, tags={"route": route})
  261. self._cached_num_router_requests.clear()
  262. self.num_queued_requests_gauge.set(self.num_queued_requests)
  263. self.num_running_requests_gauge.set(
  264. sum(self.num_requests_sent_to_replicas.values())
  265. )
  266. async def _report_cached_metrics_forever(self):
  267. assert self._cached_metrics_interval_s > 0
  268. consecutive_errors = 0
  269. while True:
  270. try:
  271. await asyncio.sleep(self._cached_metrics_interval_s)
  272. self._report_cached_metrics()
  273. consecutive_errors = 0
  274. except Exception:
  275. logger.exception("Unexpected error reporting metrics.")
  276. # Exponential backoff starting at 1s and capping at 10s.
  277. backoff_time_s = min(10, 2**consecutive_errors)
  278. consecutive_errors += 1
  279. await asyncio.sleep(backoff_time_s)
  280. def inc_num_total_requests(self, route: str):
  281. if self._cached_metrics_enabled:
  282. self._cached_num_router_requests[route] += 1
  283. else:
  284. self.num_router_requests.inc(tags={"route": route})
  285. def inc_num_queued_requests(self):
  286. self.num_queued_requests += 1
  287. if not self._cached_metrics_enabled:
  288. self.num_queued_requests_gauge.set(self.num_queued_requests)
  289. def dec_num_queued_requests(self):
  290. self.num_queued_requests -= 1
  291. if not self._cached_metrics_enabled:
  292. self.num_queued_requests_gauge.set(self.num_queued_requests)
  293. def inc_num_running_requests_for_replica(self, replica_id: ReplicaID):
  294. with self._queries_lock:
  295. self.num_requests_sent_to_replicas[replica_id] += 1
  296. if not self._cached_metrics_enabled:
  297. self.num_running_requests_gauge.set(
  298. sum(self.num_requests_sent_to_replicas.values())
  299. )
  300. def dec_num_running_requests_for_replica(self, replica_id: ReplicaID):
  301. with self._queries_lock:
  302. self.num_requests_sent_to_replicas[replica_id] -= 1
  303. if not self._cached_metrics_enabled:
  304. self.num_running_requests_gauge.set(
  305. sum(self.num_requests_sent_to_replicas.values())
  306. )
  307. def should_send_scaled_to_zero_optimized_push(self, curr_num_replicas: int) -> bool:
  308. return (
  309. self.autoscaling_config is not None
  310. and curr_num_replicas == 0
  311. and self.num_queued_requests > 0
  312. )
  313. def push_autoscaling_metrics_to_controller(self):
  314. """Pushes queued and running request metrics to the controller.
  315. These metrics are used by the controller for autoscaling.
  316. """
  317. self._controller_handle.record_autoscaling_metrics_from_handle.remote(
  318. self._get_metrics_report()
  319. )
  320. def _add_autoscaling_metrics_point(self):
  321. """Adds metrics point for queued and running requests at replicas.
  322. Also prunes keys in the in memory metrics store with outdated datapoints.
  323. ┌─────────────────────────────────────────────────────────────────┐
  324. │ Handle-based metrics collection │
  325. ├─────────────────────────────────────────────────────────────────┤
  326. │ │
  327. │ Client Handle Replicas │
  328. │ ┌──────┐ ┌────────┐ ┌─────────┐ │
  329. │ │ App │───────────>│ Handle │─────────>│ Replica │ │
  330. │ │ │ Requests │ │ Forwards │ 1 │ │
  331. │ └──────┘ │ Tracks │ └─────────┘ │
  332. │ │ Queued │ │
  333. │ │ + │ ┌─────────┐ │
  334. │ │Running │─────────>│ Replica │ │
  335. │ │Requests│ Forwards │ 2 │ │
  336. │ └────────┘ └─────────┘ │
  337. │ │ │
  338. │ │ Push metrics │
  339. │ └─────────────────> Controller │
  340. │ │
  341. └─────────────────────────────────────────────────────────────────┘
  342. :::{note}
  343. The long-term plan is to deprecate handle-based metrics collection in favor of
  344. replica-based collection. Replica-based collection will become the default in a
  345. future release. Queued requests will be continues to be tracked at the handle.
  346. :::
  347. """
  348. timestamp = time.time()
  349. self.metrics_store.add_metrics_point(
  350. {QUEUED_REQUESTS_KEY: self.num_queued_requests}, timestamp
  351. )
  352. if RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE:
  353. self.metrics_store.add_metrics_point(
  354. self.num_requests_sent_to_replicas, timestamp
  355. )
  356. # Prevent in memory metrics store memory from growing
  357. start_timestamp = time.time() - self.autoscaling_config.look_back_period_s
  358. self.metrics_store.prune_keys_and_compact_data(start_timestamp)
  359. def _get_metrics_report(self) -> HandleMetricReport:
  360. timestamp = time.time()
  361. running_requests = dict()
  362. avg_running_requests = dict()
  363. look_back_period = self.autoscaling_config.look_back_period_s
  364. self.metrics_store.prune_keys_and_compact_data(time.time() - look_back_period)
  365. avg_queued_requests = self.metrics_store.aggregate_avg([QUEUED_REQUESTS_KEY])[0]
  366. if avg_queued_requests is None:
  367. # If the queued requests timeseries is empty, we set the
  368. # average to the current number of queued requests.
  369. avg_queued_requests = self.num_queued_requests
  370. # If the queued requests timeseries is empty, we set the number of data points to 1.
  371. # This is to avoid division by zero.
  372. num_data_points = self.metrics_store.timeseries_count(QUEUED_REQUESTS_KEY) or 1
  373. queued_requests = self.metrics_store.data.get(
  374. QUEUED_REQUESTS_KEY, [TimeStampedValue(timestamp, self.num_queued_requests)]
  375. )
  376. if RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE and self.autoscaling_config:
  377. for replica_id, num_requests in self.num_requests_sent_to_replicas.items():
  378. # Calculate avg running requests.
  379. # NOTE (abrar): The number of data points from queued requests is often higher than
  380. # those from running requests. This is because replica metrics are only collected
  381. # once a replica is up, whereas queued request metrics are collected continuously
  382. # as long as the handle is alive. To approximate the true average of ongoing requests,
  383. # we should normalize by using the same number of data points for both queued and
  384. # running request time series.
  385. running_requests_sum = self.metrics_store.aggregate_sum([replica_id])[0]
  386. if running_requests_sum is None:
  387. # If the running requests timeseries is empty, we set the sum
  388. # to the current number of requests.
  389. running_requests_sum = num_requests
  390. avg_running_requests[replica_id] = (
  391. running_requests_sum / num_data_points
  392. )
  393. # Get running requests data
  394. running_requests[replica_id] = self.metrics_store.data.get(
  395. replica_id, [TimeStampedValue(timestamp, num_requests)]
  396. )
  397. handle_metric_report = HandleMetricReport(
  398. deployment_id=self._deployment_id,
  399. handle_id=self._handle_id,
  400. actor_id=self._self_actor_id,
  401. handle_source=self._handle_source,
  402. aggregated_queued_requests=avg_queued_requests,
  403. queued_requests=queued_requests,
  404. aggregated_metrics={
  405. RUNNING_REQUESTS_KEY: avg_running_requests,
  406. },
  407. metrics={
  408. RUNNING_REQUESTS_KEY: running_requests,
  409. },
  410. timestamp=timestamp,
  411. )
  412. return handle_metric_report
  413. async def shutdown(self):
  414. """Shutdown metrics manager gracefully."""
  415. if self.metrics_pusher:
  416. await self.metrics_pusher.graceful_shutdown()
  417. self._shutdown = True
  418. class Router(ABC):
  419. @abstractmethod
  420. def running_replicas_populated(self) -> bool:
  421. pass
  422. @abstractmethod
  423. def assign_request(
  424. self,
  425. request_meta: RequestMetadata,
  426. *request_args,
  427. **request_kwargs,
  428. ) -> concurrent.futures.Future[ReplicaResult]:
  429. pass
  430. @abstractmethod
  431. def shutdown(self) -> concurrent.futures.Future:
  432. pass
  433. async def create_event() -> asyncio.Event:
  434. """Helper to create an asyncio event in the current event loop."""
  435. return asyncio.Event()
  436. class AsyncioRouter:
  437. def __init__(
  438. self,
  439. controller_handle: ActorHandle,
  440. deployment_id: DeploymentID,
  441. handle_id: str,
  442. self_actor_id: str,
  443. handle_source: DeploymentHandleSource,
  444. event_loop: asyncio.BaseEventLoop,
  445. enable_strict_max_ongoing_requests: bool,
  446. node_id: str,
  447. availability_zone: Optional[str],
  448. prefer_local_node_routing: bool,
  449. resolve_request_arg_func: Coroutine = resolve_deployment_response,
  450. request_router_class: Optional[Callable] = None,
  451. request_router_kwargs: Optional[Dict[str, Any]] = None,
  452. request_router: Optional[RequestRouter] = None,
  453. _request_router_initialized_event: Optional[asyncio.Event] = None,
  454. ):
  455. """Used to assign requests to downstream replicas for a deployment.
  456. The routing behavior is delegated to a RequestRouter; this is a thin
  457. wrapper that adds metrics and logging.
  458. """
  459. self._controller_handle = controller_handle
  460. self.deployment_id = deployment_id
  461. self._self_actor_id = self_actor_id
  462. self._handle_source = handle_source
  463. self._event_loop = event_loop
  464. self._request_router_class = request_router_class
  465. self._request_router_kwargs = (
  466. request_router_kwargs if request_router_kwargs else {}
  467. )
  468. self._enable_strict_max_ongoing_requests = enable_strict_max_ongoing_requests
  469. self._node_id = node_id
  470. self._availability_zone = availability_zone
  471. self._prefer_local_node_routing = prefer_local_node_routing
  472. # By default, deployment is available unless we receive news
  473. # otherwise through a long poll broadcast from the controller.
  474. self._deployment_available = True
  475. # The request router will be lazy loaded to decouple form the initialization.
  476. self._request_router: Optional[RequestRouter] = request_router
  477. if _request_router_initialized_event:
  478. self._request_router_initialized = _request_router_initialized_event
  479. else:
  480. future = asyncio.run_coroutine_threadsafe(create_event(), self._event_loop)
  481. self._request_router_initialized = future.result()
  482. if self._request_router:
  483. self._request_router_initialized.set()
  484. self._resolve_request_arg_func = resolve_request_arg_func
  485. self._running_replicas: Optional[List[RunningReplicaInfo]] = None
  486. # Flipped to `True` once the router has received a non-empty
  487. # replica set at least once.
  488. self._running_replicas_populated: bool = False
  489. # Initializing `self._metrics_manager` before `self.long_poll_client` is
  490. # necessary to avoid race condition where `self.update_deployment_config()`
  491. # might be called before `self._metrics_manager` instance is created.
  492. self._metrics_manager = RouterMetricsManager(
  493. deployment_id,
  494. handle_id,
  495. self_actor_id,
  496. handle_source,
  497. controller_handle,
  498. metrics.Counter(
  499. "serve_num_router_requests",
  500. description="The number of requests processed by the router.",
  501. tag_keys=("deployment", "route", "application", "handle", "actor_id"),
  502. ),
  503. metrics.Gauge(
  504. "serve_deployment_queued_queries",
  505. description=(
  506. "The current number of queries to this deployment waiting"
  507. " to be assigned to a replica."
  508. ),
  509. tag_keys=("deployment", "application", "handle", "actor_id"),
  510. ),
  511. metrics.Gauge(
  512. "serve_num_ongoing_requests_at_replicas",
  513. description=(
  514. "The current number of requests to this deployment that "
  515. "have been submitted to a replica."
  516. ),
  517. tag_keys=("deployment", "application", "handle", "actor_id"),
  518. ),
  519. event_loop,
  520. )
  521. # The Router needs to stay informed about changes to the target deployment's
  522. # running replicas and deployment config. We do this via the long poll system.
  523. # However, for efficiency, we don't want to create a LongPollClient for every
  524. # DeploymentHandle, so we use a shared LongPollClient that all Routers
  525. # register themselves with. But first, the router needs to get a fast initial
  526. # update so that it can start serving requests, which we do with a dedicated
  527. # LongPollClient that stops running once the shared client takes over.
  528. self.long_poll_client = LongPollClient(
  529. controller_handle,
  530. {
  531. (
  532. LongPollNamespace.DEPLOYMENT_TARGETS,
  533. deployment_id,
  534. ): self.update_deployment_targets,
  535. (
  536. LongPollNamespace.DEPLOYMENT_CONFIG,
  537. deployment_id,
  538. ): self.update_deployment_config,
  539. },
  540. call_in_event_loop=self._event_loop,
  541. )
  542. shared = SharedRouterLongPollClient.get_or_create(
  543. controller_handle, self._event_loop
  544. )
  545. shared.register(self)
  546. @property
  547. def request_router(self) -> Optional[RequestRouter]:
  548. """Get and lazy loading request router.
  549. If the request_router_class not provided, and the request router is not
  550. yet initialized, then it will return None. Otherwise, if request router
  551. is not yet initialized, it will be initialized and returned. Also,
  552. setting `self._request_router_initialized` to signal that the request
  553. router is initialized.
  554. """
  555. if not self._request_router and self._request_router_class:
  556. request_router = self._request_router_class(
  557. deployment_id=self.deployment_id,
  558. handle_source=self._handle_source,
  559. self_node_id=self._node_id,
  560. self_actor_id=self._self_actor_id,
  561. self_actor_handle=ray.get_runtime_context().current_actor
  562. if ray.get_runtime_context().get_actor_id()
  563. else None,
  564. # Streaming ObjectRefGenerators are not supported in Ray Client
  565. use_replica_queue_len_cache=self._enable_strict_max_ongoing_requests,
  566. create_replica_wrapper_func=lambda r: RunningReplica(r),
  567. prefer_local_node_routing=self._prefer_local_node_routing,
  568. prefer_local_az_routing=RAY_SERVE_PROXY_PREFER_LOCAL_AZ_ROUTING,
  569. self_availability_zone=self._availability_zone,
  570. )
  571. request_router.initialize_state(**(self._request_router_kwargs))
  572. # Populate the running replicas if they are already available.
  573. if self._running_replicas is not None:
  574. request_router._update_running_replicas(self._running_replicas)
  575. self._request_router = request_router
  576. self._request_router_initialized.set()
  577. # Log usage telemetry to indicate that custom request router
  578. # feature is being used in this cluster.
  579. if (
  580. self._request_router_class.__name__
  581. != PowerOfTwoChoicesRequestRouter.__name__
  582. ):
  583. ServeUsageTag.CUSTOM_REQUEST_ROUTER_USED.record("1")
  584. return self._request_router
  585. def running_replicas_populated(self) -> bool:
  586. return self._running_replicas_populated
  587. def update_deployment_targets(self, deployment_target_info: DeploymentTargetInfo):
  588. self._deployment_available = deployment_target_info.is_available
  589. running_replicas = deployment_target_info.running_replicas
  590. if self.request_router:
  591. self.request_router._update_running_replicas(running_replicas)
  592. else:
  593. # In this case, the request router hasn't been initialized yet.
  594. # Store the running replicas so that we can update the request
  595. # router once it is initialized.
  596. self._running_replicas = running_replicas
  597. self._metrics_manager._update_running_replicas(running_replicas)
  598. if running_replicas:
  599. self._running_replicas_populated = True
  600. def update_deployment_config(self, deployment_config: DeploymentConfig):
  601. self._request_router_class = (
  602. deployment_config.request_router_config.get_request_router_class()
  603. )
  604. self._request_router_kwargs = (
  605. deployment_config.request_router_config.request_router_kwargs
  606. )
  607. self._metrics_manager.update_deployment_config(
  608. deployment_config,
  609. curr_num_replicas=len(self.request_router.curr_replicas),
  610. )
  async def _resolve_request_arguments(
      self,
      pr: PendingRequest,
  ) -> None:
      """Resolve the top-level args and kwargs of ``pr`` and store them back.

      ``_resolve_request_arg_func`` is awaited once per positional and per
      keyword argument. Whenever it returns a task (anything other than
      ``None``), the result of that task replaces the argument. Requests that
      are already marked ``resolved`` are returned untouched; otherwise
      ``pr.args``, ``pr.kwargs`` and ``pr.resolved`` are updated at the end.
      """
      if pr.resolved:
          return

      # Work on copies; ``pr`` is only updated once everything has resolved.
      resolved_args = list(pr.args)
      resolved_kwargs = pr.kwargs.copy()

      # Positional index -> task that resolves that positional argument.
      positional_tasks = {}
      for position, value in enumerate(pr.args):
          pending = await self._resolve_request_arg_func(value, pr.metadata)
          if pending is not None:
              positional_tasks[position] = pending

      # Keyword name -> task that resolves that keyword argument.
      keyword_tasks = {}
      for name, value in pr.kwargs.items():
          pending = await self._resolve_request_arg_func(value, pr.metadata)
          if pending is not None:
              keyword_tasks[name] = pending

      # Wait for all resolution tasks together. asyncio.wait() refuses an
      # empty collection, so it is skipped when nothing needs resolving.
      outstanding = [*positional_tasks.values(), *keyword_tasks.values()]
      if outstanding:
          await asyncio.wait(outstanding)

      # Swap each resolved value into its slot.
      for position, finished in positional_tasks.items():
          resolved_args[position] = finished.result()
      for name, finished in keyword_tasks.items():
          resolved_kwargs[name] = finished.result()

      pr.args = resolved_args
      pr.kwargs = resolved_kwargs
      pr.resolved = True
  def _process_finished_request(
      self,
      replica_id: ReplicaID,
      parent_request_id: str,
      response_id: str,
      result: Union[Any, RayError],
  ):
      """Account for a finished request and react to replica failures.

      Always decrements the running-request count for ``replica_id``. When
      ``result`` is an ``ActorDiedError`` or ``ActorUnavailableError`` and a
      request router exists, the router is told about the failure and a
      warning is logged.

      Args:
          replica_id: The replica the request was sent to.
          parent_request_id: Accepted for the callback signature; not read here.
          response_id: Accepted for the callback signature; not read here.
          result: The value or error the request finished with.
      """
      self._metrics_manager.dec_num_running_requests_for_replica(replica_id)

      if isinstance(result, ActorDiedError):
          # The replica has died but the controller hasn't notified the
          # router yet. Stop considering this replica for future requests.
          if not self.request_router:
              return
          self.request_router.on_replica_actor_died(replica_id)
          logger.warning(
              f"{replica_id} will not be considered for future "
              "requests because it has died."
          )
          return

      if isinstance(result, ActorUnavailableError):
          # There are network issues, or the replica has died but GCS is
          # down, in which case ActorUnavailableError is raised until GCS
          # recovers. Invalidate the cache entry so requests aren't sent to
          # this replica without actively probing it first.
          if not self.request_router:
              return
          self.request_router.on_replica_actor_unavailable(replica_id)
          logger.warning(
              f"Request failed because {replica_id} is temporarily unavailable."
          )
  async def _route_and_send_request_once(
      self,
      pr: PendingRequest,
      response_id: str,
      is_retry: bool,
  ) -> Optional[ReplicaResult]:
      """Make one attempt to choose a replica for ``pr`` and send it.

      Args:
          pr: The pending request. Its arguments are resolved first if
              ``pr.resolved`` is not yet set.
          response_id: Identifier bound into the done-callback registered
              when handle-side autoscaling metrics are collected.
          is_retry: Whether this attempt follows an earlier unsuccessful
              one; forwarded to the queued-request metric and to replica
              selection.

      Returns:
          The ``ReplicaResult`` when the request was sent without rejection
          support, or when the replica reported that it accepted the
          request. ``None`` when the replica did not accept it, or when
          ``ActorDiedError`` / ``ActorUnavailableError`` was raised, so the
          caller can attempt routing again.

      Raises:
          asyncio.CancelledError: Re-raised after cancelling any result that
              had already been obtained.
      """
      # Both start as None so the exception handlers can tell how far the
      # attempt got before it failed.
      result: Optional[ReplicaResult] = None
      replica: Optional[RunningReplica] = None
      try:
          # Resolve request arguments BEFORE incrementing queued requests.
          # This ensures that queue metrics reflect actual pending work,
          # not time spent waiting for upstream DeploymentResponse arguments.
          # See: https://github.com/ray-project/ray/issues/60624
          if not pr.resolved:
              await self._resolve_request_arguments(pr)

          num_curr_replicas = len(self.request_router.curr_replicas)
          with self._metrics_manager.wrap_queued_request(is_retry, num_curr_replicas):
              replica = await self.request_router._choose_replica_for_request(
                  pr, is_retry=is_retry
              )

          # If the queue len cache is disabled or we're sending a request to Java,
          # then directly send the query and hand the response back. The replica will
          # never reject requests in this code path.
          with_rejection = (
              self._enable_strict_max_ongoing_requests
              and not replica.is_cross_language
          )
          result = replica.try_send_request(pr, with_rejection=with_rejection)

          # Proactively update the queue length cache.
          self.request_router.on_send_request(replica.replica_id)

          # Keep track of requests that have been sent out to replicas
          if RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE:
              _request_context = ray.serve.context._get_serve_request_context()
              request_id: str = _request_context.request_id
              self._metrics_manager.inc_num_running_requests_for_replica(
                  replica.replica_id
              )
              # The increment above is undone by _process_finished_request,
              # which runs as a done-callback on the result.
              callback = partial(
                  self._process_finished_request,
                  replica.replica_id,
                  request_id,
                  response_id,
              )
              result.add_done_callback(callback)

          # Without rejection support there is nothing more to wait for.
          if not with_rejection:
              return result

          # Wait for the replica to report whether it accepted the request,
          # and feed the reported queue info back to the request router.
          queue_info = await result.get_rejection_response()
          self.request_router.on_new_queue_len_info(replica.replica_id, queue_info)
          if queue_info.accepted:
              self.request_router.on_request_routed(pr, replica.replica_id, result)
              return result

          # Not accepted: fall through to the final `return None`.
      except asyncio.CancelledError:
          # NOTE(edoakes): this is not strictly necessary because there are
          # currently no `await` statements between getting the ref and returning,
          # but I'm adding it defensively.
          # NOTE(review): the rejection path above does await
          # `result.get_rejection_response()` after `result` is assigned, so
          # this cleanup can run with a non-None result.
          if result is not None:
              result.cancel()
          raise
      except ActorDiedError:
          # Replica has died but controller hasn't notified the router yet.
          # Don't consider this replica for requests in the future, and retry
          # routing request.
          if replica is not None:
              self.request_router.on_replica_actor_died(replica.replica_id)
              logger.warning(
                  f"{replica.replica_id} will not be considered for future "
                  "requests because it has died."
              )
      except ActorUnavailableError:
          # There are network issues, or replica has died but GCS is down so
          # ActorUnavailableError will be raised until GCS recovers. For the
          # time being, invalidate the cache entry so that we don't try to
          # send requests to this replica without actively probing, and retry
          # routing request.
          if replica is not None:
              self.request_router.on_replica_actor_unavailable(replica.replica_id)
              logger.warning(f"{replica.replica_id} is temporarily unavailable.")

      # Signals the caller that this attempt did not produce a usable result.
      return None
  async def route_and_send_request(
      self,
      pr: PendingRequest,
      response_id: str,
  ) -> ReplicaResult:
      """Pick a replica for the request and send it, retrying until it sticks.

      This blocks indefinitely when no replica is available to take the
      request, so timing out or cancelling is the caller's responsibility.
      """
      # Nothing can be routed until the request router has been initialized.
      await self._request_router_initialized.wait()

      # A ``None`` outcome means the attempt did not produce a result (for
      # example the replica rejected the request), so routing is repeated
      # with ``is_retry`` set. The request will be placed on the front of the
      # queue to avoid tail latencies.
      # TODO(edoakes): this retry procedure is not perfect because it'll reset the
      # process of choosing candidates replicas (i.e., for locality-awareness).
      is_retry = False
      while (
          result := await self._route_and_send_request_once(
              pr,
              response_id,
              is_retry,
          )
      ) is None:
          is_retry = True

      return result
  async def assign_request(
      self,
      request_meta: RequestMetadata,
      *request_args,
      **request_kwargs,
  ) -> ReplicaResult:
      """Assign a request to a replica and return the resulting object_ref.

      The running asyncio task is registered as "pending assignment" under
      the request's internal id and a freshly generated response id, and is
      unregistered again when the task finishes.

      Raises:
          DeploymentUnavailableError: If the deployment is not available.
      """
      if not self._deployment_available:
          raise DeploymentUnavailableError(self.deployment_id)

      response_id = generate_request_id()
      current_task = asyncio.current_task()

      def _forget_pending_assignment(_finished_task):
          # Runs when the assignment task is done, whatever the outcome.
          ray.serve.context._remove_request_pending_assignment(
              request_meta.internal_request_id, response_id
          )

      ray.serve.context._add_request_pending_assignment(
          request_meta.internal_request_id, response_id, current_task
      )
      current_task.add_done_callback(_forget_pending_assignment)

      # Wait for the router to be initialized before sending the request.
      await self._request_router_initialized.wait()

      with self._metrics_manager.wrap_request_assignment(request_meta):
          replica_result = None
          try:
              pending_request = PendingRequest(
                  args=list(request_args),
                  kwargs=request_kwargs,
                  metadata=request_meta,
              )
              replica_result = await self.route_and_send_request(
                  pending_request,
                  response_id,
              )
          except asyncio.CancelledError:
              # NOTE(edoakes): this is not strictly necessary because
              # there are currently no `await` statements between
              # getting the ref and returning, but I'm adding it defensively.
              if replica_result is not None:
                  replica_result.cancel()
              raise
          return replica_result
  async def shutdown(self):
      """Shut the router down by awaiting the metrics manager's shutdown."""
      metrics_manager = self._metrics_manager
      await metrics_manager.shutdown()
  class SingletonThreadRouter(Router):
      """Wrapper class that runs an AsyncioRouter on a separate thread.

      The motivation for this is to avoid user code blocking the event loop and
      preventing the router from making progress.

      Maintains a singleton event loop running in a daemon thread that is shared by
      all AsyncioRouters.
      """

      # Class-level state: one loop, one creation lock and one monitor are
      # shared by every instance of this class.
      _asyncio_loop: Optional[asyncio.AbstractEventLoop] = None
      _asyncio_loop_creation_lock = threading.Lock()
      _event_loop_monitor: Optional[EventLoopMonitor] = None

      def __init__(self, **passthrough_kwargs):
          """Create the wrapped AsyncioRouter bound to the shared router loop.

          Args:
              **passthrough_kwargs: Forwarded unchanged to ``AsyncioRouter``.
                  Must not contain ``event_loop``; ``handle_source`` (if
                  present) selects the monitoring component label.
          """
          assert (
              "event_loop" not in passthrough_kwargs
          ), "SingletonThreadRouter manages the router event loop."

          # Map the handle source onto the component label used by the
          # event loop monitor; anything else is reported as unknown.
          if passthrough_kwargs.get("handle_source") == DeploymentHandleSource.REPLICA:
              component = EventLoopMonitor.COMPONENT_REPLICA
          elif passthrough_kwargs.get("handle_source") == DeploymentHandleSource.PROXY:
              component = EventLoopMonitor.COMPONENT_PROXY
          else:
              component = EventLoopMonitor.COMPONENT_UNKNOWN

          self._asyncio_router = AsyncioRouter(
              event_loop=self._get_singleton_asyncio_loop(component), **passthrough_kwargs
          )

      @classmethod
      def _get_singleton_asyncio_loop(cls, component: str) -> asyncio.AbstractEventLoop:
          """Get singleton asyncio loop running in a daemon thread.

          This method is thread safe.

          Args:
              component: Component label passed to the ``EventLoopMonitor``.
                  Only the call that actually creates the loop uses it;
                  later calls return the existing loop unchanged.

          Returns:
              The shared event loop.
          """
          with cls._asyncio_loop_creation_lock:
              if cls._asyncio_loop is None:
                  cls._asyncio_loop = asyncio.new_event_loop()

                  # Create event loop monitor for the router loop.
                  # This is shared across all replicas in this process.
                  actor_id = ray.get_runtime_context().get_actor_id()
                  cls._event_loop_monitor = EventLoopMonitor(
                      component=component,
                      loop_type=EventLoopMonitor.LOOP_TYPE_ROUTER,
                      # actor_id is None when using DeploymentHandle.remote()
                      # from the driver.
                      actor_id=actor_id or "",
                  )

                  def _run_router_event_loop():
                      # Thread target: bind the loop to this thread and run it
                      # until the process exits.
                      asyncio.set_event_loop(cls._asyncio_loop)
                      # Start monitoring before run_forever so the task is scheduled.
                      cls._event_loop_monitor.start(cls._asyncio_loop)
                      cls._asyncio_loop.run_forever()

                  thread = threading.Thread(
                      daemon=True,
                      target=_run_router_event_loop,
                  )
                  thread.start()

          return cls._asyncio_loop

      def running_replicas_populated(self) -> bool:
          """Return the wrapped router's ``running_replicas_populated()``."""
          return self._asyncio_router.running_replicas_populated()

      def assign_request(
          self,
          request_meta: RequestMetadata,
          *request_args,
          **request_kwargs,
      ) -> concurrent.futures.Future[ReplicaResult]:
          """Routes assign_request call on the internal asyncio loop.

          This method schedules the actual request assignment logic
          (`_asyncio_router.assign_request`) as a task on the dedicated asyncio
          event loop thread via `call_soon_threadsafe`. It returns a
          `concurrent.futures.Future` that can be awaited or queried from the
          calling thread.

          Returns:
              A concurrent.futures.Future resolving to the ReplicaResult representing
              the assigned request.
          """

          def asyncio_future_callback(
              asyncio_future: asyncio.Future, concurrent_future: concurrent.futures.Future
          ):
              """Callback attached to the asyncio Task running assign_request.

              This runs when the asyncio Task finishes (completes, fails, or is cancelled).
              Its primary goal is to propagate cancellation initiated via the
              `concurrent_future` back to the `ReplicaResult` in situations where
              asyncio_future didn't see the cancellation event in time. Think of it
              like a second line of defense for cancellation of replica results.
              """
              # Check if the cancellation originated from the concurrent.futures.Future
              if (
                  concurrent_future.cancelled()
                  and not asyncio_future.cancelled()
                  and asyncio_future.exception() is None
              ):
                  # The task finished successfully even though the caller
                  # cancelled, so the produced result is cancelled instead.
                  result: ReplicaResult = asyncio_future.result()
                  logger.info(
                      "Asyncio task completed despite cancellation attempt. "
                      "Attempting to cancel the request that was assigned to a replica."
                  )
                  result.cancel()

          # The future handed back to the caller.
          concurrent_future = concurrent.futures.Future()

          def create_task_and_setup():
              # Runs on the router event loop thread (scheduled below).
              task = self._asyncio_loop.create_task(
                  self._asyncio_router.assign_request(
                      request_meta, *request_args, **request_kwargs
                  )
              )

              # Set up your cancellation callback
              task.add_done_callback(
                  lambda _: asyncio_future_callback(_, concurrent_future)
              )

              try:
                  # chain the two futures to handle direction channel of cancellation
                  # NOTE(review): `futures` / `ensure_future` are presumably
                  # imported from asyncio at the top of the file; confirm.
                  futures._chain_future(
                      ensure_future(task, loop=self._asyncio_loop), concurrent_future
                  )
              except (SystemExit, KeyboardInterrupt):
                  raise
              except BaseException as exc:
                  # Surface the failure to the caller unless they already
                  # cancelled, then re-raise on the loop thread.
                  if concurrent_future.set_running_or_notify_cancel():
                      concurrent_future.set_exception(exc)
                  raise

          # Schedule on the event loop thread
          self._asyncio_loop.call_soon_threadsafe(create_task_and_setup)
          return concurrent_future

      def shutdown(self) -> concurrent.futures.Future:
          """Schedule the wrapped router's shutdown on the router loop.

          Returns:
              The ``concurrent.futures.Future`` produced by
              ``asyncio.run_coroutine_threadsafe`` for the shutdown coroutine.
          """
          return asyncio.run_coroutine_threadsafe(
              self._asyncio_router.shutdown(), loop=self._asyncio_loop
          )
  class SharedRouterLongPollClient:
      """Long poll client shared by all routers registered with it.

      Routers are grouped by deployment id. Updates received for a deployment
      (targets or config) are forwarded to every router currently registered
      for that deployment.
      """

      def __init__(self, controller_handle: ActorHandle, event_loop: AbstractEventLoop):
          """Create the shared client and its underlying ``LongPollClient``.

          Args:
              controller_handle: Handle passed to the ``LongPollClient``.
              event_loop: Loop the long poll callbacks are invoked in and on
                  which router registration is scheduled.
          """
          self.controller_handler = controller_handle
          self.event_loop = event_loop

          # We use a WeakSet to store the Routers so that we don't prevent them
          # from being garbage-collected.
          self.routers: MutableMapping[
              DeploymentID, weakref.WeakSet[AsyncioRouter]
          ] = defaultdict(weakref.WeakSet)

          # Creating the LongPollClient implicitly starts it
          self.long_poll_client = LongPollClient(
              controller_handle,
              key_listeners={},
              call_in_event_loop=self.event_loop,
          )

      @classmethod
      @lru_cache(maxsize=None)
      def get_or_create(
          cls, controller_handle: ActorHandle, event_loop: AbstractEventLoop
      ) -> "SharedRouterLongPollClient":
          """Return the shared client for this handle/loop pair.

          The ``lru_cache`` makes repeated calls with the same arguments return
          the same instance instead of constructing a new one.
          """
          shared = cls(controller_handle=controller_handle, event_loop=event_loop)
          logger.info(f"Started {shared}.")
          return shared

      def update_deployment_targets(
          self,
          deployment_target_info: DeploymentTargetInfo,
          deployment_id: DeploymentID,
      ) -> None:
          """Forward new deployment targets to every router of ``deployment_id``."""
          # Look up with .get() instead of indexing: `self.routers` is a
          # defaultdict, so indexing a deployment that has no routers left
          # would insert a fresh empty WeakSet under a key that `_register`
          # deliberately cleans up.
          for router in self.routers.get(deployment_id, ()):
              router.update_deployment_targets(deployment_target_info)
              # NOTE(review): presumably the router's own long poll client is
              # no longer needed once this shared client has delivered an
              # update; confirm against AsyncioRouter.
              router.long_poll_client.stop()

      def update_deployment_config(
          self, deployment_config: DeploymentConfig, deployment_id: DeploymentID
      ) -> None:
          """Forward a new deployment config to every router of ``deployment_id``."""
          # See update_deployment_targets for why .get() is used here.
          for router in self.routers.get(deployment_id, ()):
              router.update_deployment_config(deployment_config)
              router.long_poll_client.stop()

      def register(self, router: AsyncioRouter) -> None:
          """Schedule registration of ``router`` on the long poll event loop."""
          # We need to run the underlying method in the same event loop that runs
          # the long poll loop, because we need to mutate the mapping of routers,
          # which are also being iterated over by the key listener callbacks.
          # If those happened concurrently in different threads,
          # we could get a `RuntimeError: Set changed size during iteration`.
          # See https://github.com/ray-project/ray/pull/53613 for more details.
          self.event_loop.call_soon_threadsafe(self._register, router)

      def _register(self, router: AsyncioRouter) -> None:
          """Add ``router`` to the mapping and (re-)register the key listeners."""
          self.routers[router.deployment_id].add(router)

          # Remove the entries for any deployment ids that no longer have any routers.
          # The WeakSets will automatically lose track of Routers that get GC'd,
          # but the outer dict will keep the key around, so we need to clean up manually.
          # Note the list(...) to avoid mutating self.routers while iterating over it.
          for deployment_id, routers in list(self.routers.items()):
              if not routers:
                  self.routers.pop(deployment_id)

          # Register the new listeners on the long poll client.
          # Some of these listeners may already exist, but it's safe to add them again.
          key_listeners = {
              (LongPollNamespace.DEPLOYMENT_TARGETS, deployment_id): partial(
                  self.update_deployment_targets, deployment_id=deployment_id
              )
              for deployment_id in self.routers.keys()
          } | {
              (LongPollNamespace.DEPLOYMENT_CONFIG, deployment_id): partial(
                  self.update_deployment_config, deployment_id=deployment_id
              )
              for deployment_id in self.routers.keys()
          }
          self.long_poll_client.add_key_listeners(key_listeners)
  class CurrentLoopRouter(Router):
      """Router wrapper that drives an AsyncioRouter on the current asyncio loop.

      This class is NOT THREAD-SAFE: all of its methods are expected to be
      invoked from one single asyncio event loop.
      """

      def __init__(self, **passthrough_kwargs):
          """Bind to the running loop and build the wrapped AsyncioRouter.

          Args:
              **passthrough_kwargs: Forwarded unchanged to ``AsyncioRouter``;
                  must not contain ``event_loop``.
          """
          assert (
              "event_loop" not in passthrough_kwargs
          ), "CurrentLoopRouter uses the current event loop."

          running_loop = asyncio.get_running_loop()
          self._asyncio_loop = running_loop

          initialized_event = asyncio.Event()
          self._asyncio_router = AsyncioRouter(
              event_loop=running_loop,
              _request_router_initialized_event=initialized_event,
              **passthrough_kwargs,
          )

      def running_replicas_populated(self) -> bool:
          """Report whether the wrapped router's running replicas are populated."""
          wrapped_router = self._asyncio_router
          return wrapped_router.running_replicas_populated()

      def assign_request(
          self,
          request_meta: RequestMetadata,
          *request_args,
          **request_kwargs,
      ) -> asyncio.Future[ReplicaResult]:
          """Start the wrapped router's ``assign_request`` as a task on the loop."""
          loop = self._asyncio_loop
          assignment = self._asyncio_router.assign_request(
              request_meta, *request_args, **request_kwargs
          )
          return loop.create_task(assignment)

      def shutdown(self) -> asyncio.Future:
          """Start the wrapped router's ``shutdown`` as a task on the loop."""
          loop = self._asyncio_loop
          return loop.create_task(self._asyncio_router.shutdown())