proxy.py 52 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416
  1. import asyncio
  2. import gc
  3. import json
  4. import logging
  5. import os
  6. import pickle
  7. import time
  8. from abc import ABC, abstractmethod
  9. from typing import Any, Callable, Dict, Generator, Optional, Set, Tuple
  10. import grpc
  11. import starlette
  12. import starlette.routing
  13. from packaging import version
  14. from starlette.types import Receive
  15. import ray
  16. from ray._common.filters import CoreContextFilter
  17. from ray._common.utils import get_or_create_event_loop
  18. from ray.serve._private.common import (
  19. DeploymentID,
  20. EndpointInfo,
  21. NodeId,
  22. ReplicaID,
  23. RequestMetadata,
  24. RequestProtocol,
  25. )
  26. from ray.serve._private.constants import (
  27. HEALTHY_MESSAGE,
  28. PROXY_MIN_DRAINING_PERIOD_S,
  29. RAY_SERVE_ENABLE_PROXY_GC_OPTIMIZATIONS,
  30. RAY_SERVE_PROXY_GC_THRESHOLD,
  31. RAY_SERVE_REQUEST_PATH_LOG_BUFFER_SIZE,
  32. REQUEST_LATENCY_BUCKETS_MS,
  33. SERVE_CONTROLLER_NAME,
  34. SERVE_HTTP_REQUEST_ID_HEADER,
  35. SERVE_LOG_COMPONENT,
  36. SERVE_LOG_COMPONENT_ID,
  37. SERVE_LOG_REQUEST_ID,
  38. SERVE_LOG_ROUTE,
  39. SERVE_LOGGER_NAME,
  40. SERVE_MULTIPLEXED_MODEL_ID,
  41. SERVE_NAMESPACE,
  42. )
  43. from ray.serve._private.default_impl import get_proxy_handle
  44. from ray.serve._private.event_loop_monitoring import EventLoopMonitor
  45. from ray.serve._private.grpc_util import (
  46. get_grpc_response_status,
  47. set_grpc_code_and_details,
  48. start_grpc_server,
  49. )
  50. from ray.serve._private.http_util import (
  51. MessageQueue,
  52. configure_http_middlewares,
  53. convert_object_to_asgi_messages,
  54. get_http_response_status,
  55. receive_http_body,
  56. send_http_response_on_exception,
  57. start_asgi_http_server,
  58. )
  59. from ray.serve._private.logging_utils import (
  60. access_log_msg,
  61. configure_component_logger,
  62. configure_component_memory_profiler,
  63. get_component_logger_file_path,
  64. )
  65. from ray.serve._private.long_poll import LongPollClient, LongPollNamespace
  66. from ray.serve._private.proxy_request_response import (
  67. ASGIProxyRequest,
  68. HandlerMetadata,
  69. ProxyRequest,
  70. ResponseGenerator,
  71. ResponseHandlerInfo,
  72. ResponseStatus,
  73. gRPCProxyRequest,
  74. )
  75. from ray.serve._private.proxy_response_generator import ProxyResponseGenerator
  76. from ray.serve._private.proxy_router import ProxyRouter
  77. from ray.serve._private.usage import ServeUsageTag
  78. from ray.serve._private.utils import (
  79. asyncio_grpc_exception_handler,
  80. generate_request_id,
  81. get_head_node_id,
  82. is_grpc_enabled,
  83. )
  84. from ray.serve.config import HTTPOptions, gRPCOptions
  85. from ray.serve.generated.serve_pb2 import HealthzResponse, ListApplicationsResponse
  86. from ray.serve.handle import DeploymentHandle
  87. from ray.serve.schema import EncodingType, LoggingConfig
  88. from ray.util import metrics
  # Logger shared by every proxy defined in this module.
  logger = logging.getLogger(SERVE_LOGGER_NAME)

  # On by default: only the exact string "1" (or the variable being unset)
  # enables it. Its consumer is not visible in this chunk.
  SOCKET_REUSE_PORT_ENABLED = "1" == os.environ.get(
      "SERVE_SOCKET_REUSE_PORT_ENABLED", "1"
  )

  # Emit a one-time (import-time) deprecation warning when the legacy timeout
  # environment variable is present, regardless of its value.
  if "SERVE_REQUEST_PROCESSING_TIMEOUT_S" in os.environ:
      logger.warning(
          "The `SERVE_REQUEST_PROCESSING_TIMEOUT_S` environment variable has "
          "been deprecated. Please set `request_timeout_s` in your Serve config's "
          "`http_options` or `grpc_options` field instead. `SERVE_REQUEST_PROCESSING_TIMEOUT_S` will be "
          "ignored in future versions. See: https://docs.ray.io/en/releases-2.5.1/serve/a"
          "pi/doc/ray.serve.schema.HTTPOptionsSchema.html#ray.serve.schema.HTTPOptionsSch"
          "ema.request_timeout_s and https://docs.ray.io/en/latest/serve/api/"
          "doc/ray.serve.config.gRPCOptions.request_timeout_s.html#"
          "ray.serve.config.gRPCOptions.request_timeout_s"
      )

  # Backoff bounds (seconds, per the `_SEC` suffix). NOTE(review): their users
  # are outside this chunk — confirm semantics there.
  INITIAL_BACKOFF_PERIOD_SEC = 0.05
  MAX_BACKOFF_PERIOD_SEC = 5

  # Message reported by the health and routes endpoints while draining.
  DRAINING_MESSAGE = "This node is being drained."
  class GenericProxy(ABC):
      """Base class for the protocol-specific Serve proxies.

      It holds the setup and logic shared by every proxy: request metrics,
      draining state, ongoing-request accounting, routing a request to a
      deployment handle, and the common `proxy_request` entry point.

      Subclasses need to implement the following:
          - `protocol` (property)
          - `not_found_response()`
          - `routes_response()`
          - `health_response()`
          - `setup_request_context_and_handle()`
          - `send_request_to_replica()`
      """

      # NOTE(edoakes): starlette<0.33.0 expected the ASGI 'root_path' to be
      # stripped from the 'path', which wasn't technically following the
      # standard. See https://github.com/encode/starlette/pull/2352.
      # Evaluated once when the class is defined instead of re-parsing both
      # version strings on every routed HTTP request.
      _STRIP_ROOT_PATH_FROM_PATH: bool = version.parse(
          starlette.__version__
      ) < version.parse("0.33.0")

      def __init__(
          self,
          node_id: NodeId,
          node_ip_address: str,
          is_head: bool,
          proxy_router: ProxyRouter,
          request_timeout_s: Optional[float] = None,
          access_log_context: Optional[Dict[str, Any]] = None,
      ):
          """Create the proxy's metrics and initial (non-draining) state.

          Args:
              node_id: ID of the node this proxy runs on. Used in log messages
                  and as a default tag on the ongoing-requests gauge.
              node_ip_address: IP address of the node. Used as a default tag
                  on the ongoing-requests gauge.
              is_head: Whether this proxy runs on the head node. Passed to
                  `ProxyRouter.ready_for_traffic`.
              proxy_router: Router used to match requests to deployment handles.
              request_timeout_s: Request timeout (seconds, per the `_s` suffix),
                  stored for subclasses. Negative values are normalized to None.
              access_log_context: Extra fields passed as `extra=` to access
                  log records. A non-empty dict passed here is mutated in place
                  with each logged request's route and request ID; an empty or
                  missing value is replaced by a fresh dict.
          """
          self.request_timeout_s = request_timeout_s
          # A negative timeout is treated the same as "no timeout".
          if self.request_timeout_s is not None and self.request_timeout_s < 0:
              self.request_timeout_s = None

          self._node_id = node_id
          self._is_head = is_head

          self.proxy_router = proxy_router
          self.request_counter = metrics.Counter(
              f"serve_num_{self.protocol.lower()}_requests",
              description=f"The number of {self.protocol} requests processed.",
              tag_keys=("route", "method", "application", "status_code"),
          )

          self.request_error_counter = metrics.Counter(
              f"serve_num_{self.protocol.lower()}_error_requests",
              description=f"The number of errored {self.protocol} responses.",
              tag_keys=(
                  "route",
                  "error_code",
                  "method",
                  "application",
              ),
          )

          self.deployment_request_error_counter = metrics.Counter(
              f"serve_num_deployment_{self.protocol.lower()}_error_requests",
              description=(
                  f"The number of errored {self.protocol} "
                  "responses returned by each deployment."
              ),
              tag_keys=(
                  "deployment",
                  "error_code",
                  "method",
                  "route",
                  "application",
              ),
          )

          # log REQUEST_LATENCY_BUCKET_MS
          logger.debug(f"REQUEST_LATENCY_BUCKET_MS: {REQUEST_LATENCY_BUCKETS_MS}")
          self.processing_latency_tracker = metrics.Histogram(
              f"serve_{self.protocol.lower()}_request_latency_ms",
              description=(
                  f"The end-to-end latency of {self.protocol} requests "
                  f"(measured from the Serve {self.protocol} proxy)."
              ),
              boundaries=REQUEST_LATENCY_BUCKETS_MS,
              tag_keys=(
                  "method",
                  "route",
                  "application",
                  "status_code",
              ),
          )

          self.num_ongoing_requests_gauge = metrics.Gauge(
              name=f"serve_num_ongoing_{self.protocol.lower()}_requests",
              description=f"The number of ongoing requests in this {self.protocol} "
              "proxy.",
              tag_keys=("node_id", "node_ip_address"),
          ).set_default_tags(
              {
                  "node_id": node_id,
                  "node_ip_address": node_ip_address,
              }
          )

          # `self._ongoing_requests` is used to count the number of ongoing requests
          self._ongoing_requests = 0
          # The time when the node starts to drain.
          # The node is not draining if it's None.
          self._draining_start_time: Optional[float] = None

          self._access_log_context = access_log_context or {}

          getattr(ServeUsageTag, f"{self.protocol.upper()}_PROXY_USED").record("1")

      @property
      @abstractmethod
      def protocol(self) -> RequestProtocol:
          """Protocol used in the proxy.

          Each proxy needs to implement its own logic for setting up the protocol.
          """
          raise NotImplementedError

      def _is_draining(self) -> bool:
          """Whether the proxy actor is currently draining."""
          return self._draining_start_time is not None

      def is_drained(self):
          """Check whether the proxy actor is drained.

          A proxy actor is drained if it has no ongoing requests AND it has been
          draining for more than `PROXY_MIN_DRAINING_PERIOD_S` seconds.
          """
          if not self._is_draining():
              return False

          return (not self._ongoing_requests) and (
              (time.time() - self._draining_start_time) > PROXY_MIN_DRAINING_PERIOD_S
          )

      def update_draining(self, draining: bool):
          """Update the draining status of the proxy.

          This is called by the proxy state manager to drain or un-drain the
          proxy actor. Repeated calls with the same value are no-ops, so the
          original draining start time is preserved.
          """
          if draining and (not self._is_draining()):
              logger.info(
                  f"Start to drain the proxy actor on node {self._node_id}.",
                  extra={"log_to_stderr": False},
              )
              self._draining_start_time = time.time()
          if (not draining) and self._is_draining():
              logger.info(
                  f"Stop draining the proxy actor on node {self._node_id}.",
                  extra={"log_to_stderr": False},
              )
              self._draining_start_time = None

      @abstractmethod
      async def not_found_response(
          self, proxy_request: ProxyRequest
      ) -> ResponseGenerator:
          """Yield the protocol-specific response for an unmatched request."""
          raise NotImplementedError

      @abstractmethod
      async def routes_response(
          self, *, healthy: bool, message: str
      ) -> ResponseGenerator:
          """Yield the protocol-specific response for the routes endpoint."""
          raise NotImplementedError

      @abstractmethod
      async def health_response(
          self, *, healthy: bool, message: str
      ) -> ResponseGenerator:
          """Yield the protocol-specific response for the health endpoint."""
          raise NotImplementedError

      def _ongoing_requests_start(self):
          """Record that a request started.

          Increments the ongoing request counter and updates the gauge. The
          counter is consulted by `is_drained`, so a draining proxy is not
          reported as drained while requests are still in flight.
          """
          self._ongoing_requests += 1
          self.num_ongoing_requests_gauge.set(self._ongoing_requests)

      def _ongoing_requests_end(self):
          """Record that a request finished.

          Decrements the ongoing request counter and updates the gauge.
          """
          self._ongoing_requests -= 1
          self.num_ongoing_requests_gauge.set(self._ongoing_requests)

      def _get_health_or_routes_reponse(
          self, proxy_request: ProxyRequest
      ) -> ResponseHandlerInfo:
          """Get the response handler for system health and route endpoints.

          If the proxy is draining or the router is not yet ready for traffic,
          both endpoints report a non-OK status. These requests are neither
          access-logged nor counted as ongoing requests.
          """
          router_ready_for_traffic, router_msg = self.proxy_router.ready_for_traffic(
              self._is_head
          )
          if self._is_draining():
              healthy = False
              message = DRAINING_MESSAGE
          elif not router_ready_for_traffic:
              healthy = False
              message = router_msg
          else:
              healthy = True
              message = HEALTHY_MESSAGE

          if proxy_request.is_health_request:
              response_generator = self.health_response(healthy=healthy, message=message)
          else:
              assert proxy_request.is_route_request
              response_generator = self.routes_response(healthy=healthy, message=message)

          return ResponseHandlerInfo(
              response_generator=response_generator,
              metadata=HandlerMetadata(
                  route=proxy_request.route_path,
              ),
              should_record_access_log=False,
              should_increment_ongoing_requests=False,
          )

      def _get_response_handler_info(
          self, proxy_request: ProxyRequest
      ) -> ResponseHandlerInfo:
          """Resolve how `proxy_request` should be handled.

          Health and routes requests are answered by the proxy itself. Other
          requests are matched (HTTP via `match_route`, gRPC via
          `get_handle_for_endpoint`): unmatched requests get the not-found
          response; matched ones get a request context set up and are forwarded
          via `send_request_to_replica`.
          """
          if proxy_request.is_health_request or proxy_request.is_route_request:
              return self._get_health_or_routes_reponse(proxy_request)

          matched_route = None
          if self.protocol == RequestProtocol.HTTP:
              matched_route = self.proxy_router.match_route(proxy_request.route_path)
          elif self.protocol == RequestProtocol.GRPC:
              matched_route = self.proxy_router.get_handle_for_endpoint(
                  proxy_request.route_path
              )

          if matched_route is None:
              return ResponseHandlerInfo(
                  response_generator=self.not_found_response(proxy_request),
                  metadata=HandlerMetadata(
                      # Don't include the invalid route prefix because it can blow up our
                      # metrics' cardinality.
                      # See: https://github.com/ray-project/ray/issues/47999
                      route="",
                  ),
                  should_record_access_log=True,
                  should_increment_ongoing_requests=False,
              )

          route_prefix, handle, app_is_cross_language = matched_route
          # Modify the path and root path so that reverse lookups and redirection
          # work as expected. We do this here instead of in replicas so it can be
          # changed without restarting the replicas.
          route_path = proxy_request.route_path
          if route_prefix != "/" and self.protocol == RequestProtocol.HTTP:
              assert not route_prefix.endswith("/")
              proxy_request.set_root_path(proxy_request.root_path + route_prefix)
              # Older starlette expects the prefix stripped from 'path'; see the
              # `_STRIP_ROOT_PATH_FROM_PATH` class attribute.
              if self._STRIP_ROOT_PATH_FROM_PATH:
                  proxy_request.set_path(route_path.replace(route_prefix, "", 1))

          # NOTE(abrar): we try to match to a specific route pattern (e.g., /api/{user_id})
          # for logs & metrics when available. If no pattern matches, we fall back to the
          # route_prefix to avoid high cardinality.
          # See: https://github.com/ray-project/ray/issues/47999 and
          # https://github.com/ray-project/ray/issues/52212
          if self.protocol == RequestProtocol.HTTP:
              logs_and_metrics_route = self.proxy_router.match_route_pattern(
                  route_prefix, proxy_request.scope
              )
          else:
              logs_and_metrics_route = handle.deployment_id.app_name

          internal_request_id = generate_request_id()
          handle, request_id = self.setup_request_context_and_handle(
              app_name=handle.deployment_id.app_name,
              handle=handle,
              route=logs_and_metrics_route,
              proxy_request=proxy_request,
              internal_request_id=internal_request_id,
          )

          response_generator = self.send_request_to_replica(
              request_id=request_id,
              internal_request_id=internal_request_id,
              handle=handle,
              proxy_request=proxy_request,
              app_is_cross_language=app_is_cross_language,
          )

          return ResponseHandlerInfo(
              response_generator=response_generator,
              metadata=HandlerMetadata(
                  application_name=handle.deployment_id.app_name,
                  deployment_name=handle.deployment_id.name,
                  route=logs_and_metrics_route,
              ),
              should_record_access_log=True,
              should_increment_ongoing_requests=True,
          )

      async def proxy_request(self, proxy_request: ProxyRequest) -> ResponseGenerator:
          """Common entry point for every request handled by the proxy.

          Resolves the response handler (health/routes, not-found, or a
          deployment) and re-yields every message its generator produces. The
          last message must be a `ResponseStatus`. The ongoing-request counter
          is always decremented, even on failure. Once the generator is
          exhausted, the access log (if enabled for the handler) and the
          request/latency/error metrics are recorded. They are skipped if the
          handler raises or this generator is closed early.
          """
          assert proxy_request.request_type in {"http", "websocket", "grpc"}

          response_handler_info = self._get_response_handler_info(proxy_request)

          start_time = time.time()
          if response_handler_info.should_increment_ongoing_requests:
              self._ongoing_requests_start()

          try:
              # The final message yielded must always be the `ResponseStatus`.
              status: Optional[ResponseStatus] = None
              async for message in response_handler_info.response_generator:
                  if isinstance(message, ResponseStatus):
                      status = message

                  yield message

              assert status is not None and isinstance(status, ResponseStatus)
          finally:
              # If anything during the request failed, we still want to ensure the ongoing
              # request counter is decremented.
              if response_handler_info.should_increment_ongoing_requests:
                  self._ongoing_requests_end()

          latency_ms = (time.time() - start_time) * 1000.0
          metadata = response_handler_info.metadata
          status_code = str(status.code)

          if response_handler_info.should_record_access_log:
              request_context = ray.serve.context._get_serve_request_context()
              self._access_log_context[SERVE_LOG_ROUTE] = request_context.route
              self._access_log_context[SERVE_LOG_REQUEST_ID] = request_context.request_id
              logger.info(
                  access_log_msg(
                      method=proxy_request.method,
                      route=request_context.route,
                      status=status_code,
                      latency_ms=latency_ms,
                  ),
                  extra=self._access_log_context,
              )

          # Tags shared by every metric below; each call gets its own dict.
          common_tags = {
              "route": metadata.route,
              "method": proxy_request.method,
              "application": metadata.application_name,
          }
          self.request_counter.inc(tags={**common_tags, "status_code": status_code})
          self.processing_latency_tracker.observe(
              latency_ms,
              tags={**common_tags, "status_code": status_code},
          )
          if status.is_error:
              self.request_error_counter.inc(
                  tags={**common_tags, "error_code": status_code}
              )
              self.deployment_request_error_counter.inc(
                  tags={
                      **common_tags,
                      "error_code": status_code,
                      "deployment": metadata.deployment_name,
                  }
              )

      @abstractmethod
      def setup_request_context_and_handle(
          self,
          app_name: str,
          handle: DeploymentHandle,
          route: str,
          proxy_request: ProxyRequest,
          internal_request_id: str,
      ) -> Tuple[DeploymentHandle, str]:
          """Setup the request context and handle for the request.

          Each proxy needs to implement its own logic for setting up the request context
          and handle. Returns the (possibly re-configured) handle and the request ID.
          """
          raise NotImplementedError

      @abstractmethod
      async def send_request_to_replica(
          self,
          request_id: str,
          internal_request_id: str,
          handle: DeploymentHandle,
          proxy_request: ProxyRequest,
          app_is_cross_language: bool = False,
      ) -> ResponseGenerator:
          """Send the request to the replica and handle streaming response.

          Each proxy needs to implement its own logic for sending the request and
          handling the streaming response. The final message yielded must be a
          `ResponseStatus`.
          """
          raise NotImplementedError
  class gRPCProxy(GenericProxy):
      """This class is meant to be instantiated and run by a gRPC server.

      This is the servicer class for the gRPC server. `service_handler_factory`
      builds `unary_unary` as the entry point for unary gRPC requests and
      `unary_stream` as the entry point for streaming gRPC requests.
      """

      @property
      def protocol(self) -> RequestProtocol:
          return RequestProtocol.GRPC

      async def not_found_response(
          self, proxy_request: ProxyRequest
      ) -> ResponseGenerator:
          """Yield a NOT_FOUND status for a request with an unknown application."""
          if not proxy_request.app_name:
              application_message = "Application metadata not set."
          else:
              application_message = f"Application '{proxy_request.app_name}' not found."
          not_found_message = (
              f"{application_message} Ping "
              "/ray.serve.RayServeAPIService/ListApplications for available applications."
          )

          yield ResponseStatus(
              code=grpc.StatusCode.NOT_FOUND,
              message=not_found_message,
              is_error=True,
          )

      async def routes_response(
          self, *, healthy: bool, message: str
      ) -> ResponseGenerator:
          """Yield a serialized `ListApplicationsResponse`, then the status.

          The status is OK when `healthy`, otherwise UNAVAILABLE.
          """
          yield ListApplicationsResponse(
              application_names=[
                  endpoint.app_name for endpoint in self.proxy_router.endpoints
              ],
          ).SerializeToString()

          yield ResponseStatus(
              code=grpc.StatusCode.OK if healthy else grpc.StatusCode.UNAVAILABLE,
              message=message,
              is_error=not healthy,
          )

      async def health_response(
          self, *, healthy: bool, message: str
      ) -> ResponseGenerator:
          """Yield a serialized `HealthzResponse` carrying `message`, then the status.

          The status is OK when `healthy`, otherwise UNAVAILABLE.
          """
          yield HealthzResponse(message=message).SerializeToString()
          yield ResponseStatus(
              code=grpc.StatusCode.OK if healthy else grpc.StatusCode.UNAVAILABLE,
              message=message,
              is_error=not healthy,
          )

      def service_handler_factory(self, service_method: str, stream: bool) -> Callable:
          """Build the gRPC handler coroutine for `service_method`.

          Args:
              service_method: The gRPC method served by the returned handler;
                  passed through to each `gRPCProxyRequest`.
              stream: If True, return the server-streaming handler
                  (`unary_stream`); otherwise the unary handler (`unary_unary`).
          """
          from typing import AsyncGenerator

          async def unary_unary(
              request_proto: Any, context: grpc._cython.cygrpc._ServicerContext
          ) -> bytes:
              """Entry point of the gRPC proxy unary request.

              This method is called by the gRPC server when a unary request is received.
              It wraps the request in a ProxyRequest object and calls proxy_request.
              The return value is serialized user defined protobuf bytes.
              """
              proxy_request = gRPCProxyRequest(
                  request_proto=request_proto,
                  context=context,
                  service_method=service_method,
                  stream=False,
              )

              status = None
              response = None
              async for message in self.proxy_request(proxy_request=proxy_request):
                  if isinstance(message, ResponseStatus):
                      status = message
                  else:
                      # Unary: only the last payload message is returned.
                      response = message

              set_grpc_code_and_details(context, status)

              return response

          async def unary_stream(
              request_proto: Any, context: grpc._cython.cygrpc._ServicerContext
          ) -> AsyncGenerator[bytes, None]:
              """Entry point of the gRPC proxy streaming request.

              This method is called by the gRPC server when a streaming request is
              received. It wraps the request in a ProxyRequest object and calls
              proxy_request. It asynchronously yields serialized user defined
              protobuf bytes.
              """
              proxy_request = gRPCProxyRequest(
                  request_proto=request_proto,
                  context=context,
                  service_method=service_method,
                  stream=True,
              )

              status = None
              async for message in self.proxy_request(proxy_request=proxy_request):
                  if isinstance(message, ResponseStatus):
                      status = message
                  else:
                      yield message

              set_grpc_code_and_details(context, status)

          return unary_stream if stream else unary_unary

      def setup_request_context_and_handle(
          self,
          app_name: str,
          handle: DeploymentHandle,
          route: str,
          proxy_request: ProxyRequest,
          internal_request_id: str,
      ) -> Tuple[DeploymentHandle, str]:
          """Setup request context and handle for the request.

          Unpack gRPC request metadata and extract info to set up request context and
          handle. A request ID is generated if the client did not provide one, and
          the final ID is sent back via `proxy_request.send_request_id`.
          """
          multiplexed_model_id = proxy_request.multiplexed_model_id
          request_id = proxy_request.request_id
          if not request_id:
              request_id = generate_request_id()
              proxy_request.request_id = request_id

          handle = handle.options(
              stream=proxy_request.stream,
              multiplexed_model_id=multiplexed_model_id,
              method_name=proxy_request.method_name,
          )

          request_context_info = {
              "route": route,
              "request_id": request_id,
              "_internal_request_id": internal_request_id,
              "app_name": app_name,
              "multiplexed_model_id": multiplexed_model_id,
              "grpc_context": proxy_request.ray_serve_grpc_context,
          }
          ray.serve.context._serve_request_context.set(
              ray.serve.context._RequestContext(**request_context_info)
          )
          proxy_request.send_request_id(request_id=request_id)
          return handle, request_id

      async def send_request_to_replica(
          self,
          request_id: str,
          internal_request_id: str,
          handle: DeploymentHandle,
          proxy_request: ProxyRequest,
          app_is_cross_language: bool = False,
      ) -> ResponseGenerator:
          """Forward the request to a replica and stream back its results.

          Yields each result from the replica, then a final `ResponseStatus`:
          OK on success, or the status derived by `get_grpc_response_status` from
          the raised exception (including cancellation).
          """
          response_generator = ProxyResponseGenerator(
              handle.remote(proxy_request.serialized_replica_arg()),
              timeout_s=self.request_timeout_s,
          )

          try:
              async for context, result in response_generator:
                  context._set_on_grpc_context(proxy_request.context)
                  yield result

              status = ResponseStatus(code=grpc.StatusCode.OK)
          except GeneratorExit:
              # The consumer closed this async generator (e.g. `aclose()` or
              # garbage collection). Yielding after GeneratorExit raises
              # RuntimeError, so let it propagate instead of mapping it to a status.
              raise
          except BaseException as e:
              # Deliberately broad: cancellation and timeouts are mapped to gRPC
              # status codes rather than propagated.
              status = get_grpc_response_status(e, self.request_timeout_s, request_id)

          # The status code should always be set.
          assert status is not None
          yield status
  621. class HTTPProxy(GenericProxy):
  622. """This class is meant to be instantiated and run by an ASGI HTTP server."""
  def __init__(
      self,
      node_id: NodeId,
      node_ip_address: str,
      is_head: bool,
      proxy_router: ProxyRouter,
      self_actor_name: str,
      request_timeout_s: Optional[float] = None,
      access_log_context: Optional[Dict[str, Any]] = None,
  ):
      """Initialize the HTTP proxy.

      All arguments except `self_actor_name` are forwarded unchanged to
      `GenericProxy.__init__`.

      Args:
          self_actor_name: Stored as `self.self_actor_name`. NOTE(review):
              presumably the name of the hosting proxy actor — confirm at
              the call site.
          access_log_context: Optional extra fields for access log records;
              `None` (the default) means no extra fields.
      """
      super().__init__(
          node_id,
          node_ip_address,
          is_head,
          proxy_router,
          request_timeout_s=request_timeout_s,
          access_log_context=access_log_context,
      )
      self.self_actor_name = self_actor_name
      # Message queues keyed by internal request ID (looked up in
      # `receive_asgi_messages`).
      self.asgi_receive_queues: Dict[str, MessageQueue] = dict()
  @property
  def protocol(self) -> RequestProtocol:
      """The request protocol this proxy serves: always HTTP."""
      http_protocol = RequestProtocol.HTTP
      return http_protocol
  async def not_found_response(
      self, proxy_request: ProxyRequest
  ) -> ResponseGenerator:
      """Yield a 404 ASGI response for an unmatched path.

      The body names the requested path and points at the routes endpoint.
      The final item is an error `ResponseStatus` with code 404.
      """
      not_found_code = 404
      body_text = (
          f"Path '{proxy_request.path}' not found. "
          "Ping http://.../-/routes for available routes."
      )
      asgi_messages = convert_object_to_asgi_messages(
          body_text, status_code=not_found_code
      )
      for asgi_message in asgi_messages:
          yield asgi_message

      yield ResponseStatus(code=not_found_code, is_error=True)
  async def routes_response(
      self, *, healthy: bool, message: str
  ) -> ResponseGenerator:
      """Yield the routes endpoint response as ASGI messages, then its status.

      When healthy, the body maps each endpoint's route to its application
      name, falling back to the deployment name when the endpoint has no
      application name (1.x compatibility), with HTTP 200. When unhealthy,
      the body is `message` with HTTP 503.
      """
      if not healthy:
          http_code = 503
          payload = message
      else:
          http_code = 200
          # 2.x deployments: {route -> app name}; 1.x: {route -> deployment name}.
          payload = {
              info.route: endpoint.app_name or endpoint.name
              for endpoint, info in self.proxy_router.endpoints.items()
          }

      for asgi_message in convert_object_to_asgi_messages(
          payload, status_code=http_code
      ):
          yield asgi_message

      yield ResponseStatus(is_error=not healthy, code=http_code, message=message)
  async def health_response(
      self, *, healthy: bool, message: str = ""
  ) -> ResponseGenerator:
      """Yield `message` as an ASGI response, then the matching status.

      Uses HTTP 200 when healthy and 503 otherwise; the status is flagged as
      an error exactly when unhealthy.
      """
      if healthy:
          http_code = 200
      else:
          http_code = 503

      outgoing = convert_object_to_asgi_messages(message, status_code=http_code)
      for asgi_message in outgoing:
          yield asgi_message

      yield ResponseStatus(message=message, code=http_code, is_error=not healthy)
  async def receive_asgi_messages(
      self, request_metadata: RequestMetadata
  ) -> ResponseGenerator:
      """Return the pending ASGI messages buffered for a request.

      Looks up the queue stored under `request_metadata.internal_request_id`,
      awaits `wait_for_message()` on it, then returns `get_messages_nowait()`.

      Raises:
          KeyError: if no queue is registered for the request.

      NOTE(review): this is a plain coroutine returning a value, not a
      generator, so the `ResponseGenerator` annotation looks inaccurate.
      """
      internal_id = request_metadata.internal_request_id
      message_queue = self.asgi_receive_queues.get(internal_id)
      if message_queue is None:
          raise KeyError(f"Request ID {request_metadata.request_id} not found.")

      await message_queue.wait_for_message()
      return message_queue.get_messages_nowait()
  704. async def __call__(self, scope, receive, send):
  705. """Implements the ASGI protocol.
  706. See details at:
  707. https://asgi.readthedocs.io/en/latest/specs/index.html.
  708. """
  709. proxy_request = ASGIProxyRequest(scope=scope, receive=receive, send=send)
  710. async for message in self.proxy_request(proxy_request):
  711. if not isinstance(message, ResponseStatus):
  712. await send(message)
  713. async def proxy_asgi_receive(
  714. self, receive: Receive, queue: MessageQueue
  715. ) -> Optional[int]:
  716. """Proxies the `receive` interface, placing its messages into the queue.
  717. Once a disconnect message is received, the call exits and `receive` is no longer
  718. called.
  719. For HTTP messages, `None` is always returned.
  720. For websocket messages, the disconnect code is returned if a disconnect code is
  721. received.
  722. """
  723. try:
  724. while True:
  725. msg = await receive()
  726. await queue(msg)
  727. if msg["type"] == "http.disconnect":
  728. return None
  729. if msg["type"] == "websocket.disconnect":
  730. return msg["code"]
  731. finally:
  732. # Close the queue so any subsequent calls to fetch messages return
  733. # immediately: https://github.com/ray-project/ray/issues/38368.
  734. queue.close()
  735. def setup_request_context_and_handle(
  736. self,
  737. app_name: str,
  738. handle: DeploymentHandle,
  739. route: str,
  740. proxy_request: ProxyRequest,
  741. internal_request_id: str,
  742. ) -> Tuple[DeploymentHandle, str]:
  743. """Setup request context and handle for the request.
  744. Unpack HTTP request headers and extract info to set up request context and
  745. handle.
  746. """
  747. request_context_info = {
  748. "route": route,
  749. "app_name": app_name,
  750. "_internal_request_id": internal_request_id,
  751. "is_http_request": True,
  752. }
  753. for key, value in proxy_request.headers:
  754. if key.decode() == SERVE_MULTIPLEXED_MODEL_ID:
  755. multiplexed_model_id = value.decode()
  756. handle = handle.options(multiplexed_model_id=multiplexed_model_id)
  757. request_context_info["multiplexed_model_id"] = multiplexed_model_id
  758. if key.decode() == SERVE_HTTP_REQUEST_ID_HEADER:
  759. request_context_info["request_id"] = value.decode()
  760. ray.serve.context._serve_request_context.set(
  761. ray.serve.context._RequestContext(**request_context_info)
  762. )
  763. return handle, request_context_info["request_id"]
  764. async def _format_handle_arg_for_java(
  765. self,
  766. proxy_request: ProxyRequest,
  767. ) -> bytes:
  768. """Convert an HTTP request to the Java-accepted format (single byte string)."""
  769. query_string = proxy_request.scope.get("query_string")
  770. http_body_bytes = await receive_http_body(
  771. proxy_request.scope, proxy_request.receive, proxy_request.send
  772. )
  773. if query_string:
  774. arg = query_string.decode().split("=", 1)[1]
  775. else:
  776. arg = http_body_bytes.decode()
  777. return arg
    async def send_request_to_replica(
        self,
        request_id: str,
        internal_request_id: str,
        handle: DeploymentHandle,
        proxy_request: ProxyRequest,
        app_is_cross_language: bool = False,
    ) -> ResponseGenerator:
        """Send the request to the replica and yield its response messages.

        The yielded values will be ASGI messages until the final one, which will be
        the status code (a `ResponseStatus`).

        Args:
            request_id: Passed to `get_http_response_status` when an exception is
                raised while streaming the response.
            internal_request_id: Key under which this request's receive queue is
                registered in `self.asgi_receive_queues`. The entry is deleted in
                the `finally` clause.
            handle: Deployment handle. The replica is called with
                `handle.remote(handle_arg_bytes)`.
            proxy_request: The incoming request. A background task forwards its
                `receive` interface into a `MessageQueue`.
            app_is_cross_language: If True, build the argument with
                `_format_handle_arg_for_java` and convert each result with
                `convert_object_to_asgi_messages`. Otherwise use
                `serialized_replica_arg` and unpickle each result.

        Yields:
            ASGI message dicts, then exactly one `ResponseStatus` as the last item.
        """
        if app_is_cross_language:
            handle_arg_bytes = await self._format_handle_arg_for_java(proxy_request)
            # Response is returned as raw bytes, convert it to ASGI messages.
            result_callback = convert_object_to_asgi_messages
        else:
            handle_arg_bytes = proxy_request.serialized_replica_arg(
                proxy_actor_name=self.self_actor_name,
            )
            # Messages are returned as pickled dictionaries.
            result_callback = pickle.loads
        # Proxy the receive interface by placing the received messages on a queue.
        # The downstream replica must call back into `receive_asgi_messages` on this
        # actor to receive the messages.
        receive_queue = MessageQueue()
        self.asgi_receive_queues[internal_request_id] = receive_queue
        proxy_asgi_receive_task = get_or_create_event_loop().create_task(
            self.proxy_asgi_receive(proxy_request.receive, receive_queue)
        )
        # The receive task is passed as `disconnected_task`, presumably so the
        # generator can react to a client disconnect until
        # `stop_checking_for_disconnect()` is called below.
        response_generator = ProxyResponseGenerator(
            handle.remote(handle_arg_bytes),
            timeout_s=self.request_timeout_s,
            disconnected_task=proxy_asgi_receive_task,
            result_callback=result_callback,
        )
        # Set from a response start / websocket close message, from an exception,
        # or (for websockets) in the `finally` clause below.
        status: Optional[ResponseStatus] = None
        # True once at least one ASGI message has been yielded to the caller.
        response_started = False
        # Taken from the "trailers" flag of the "http.response.start" message.
        expecting_trailers = False
        try:
            async for asgi_message_batch in response_generator:
                # See the ASGI spec for message details:
                # https://asgi.readthedocs.io/en/latest/specs/www.html.
                for asgi_message in asgi_message_batch:
                    if asgi_message["type"] == "http.response.start":
                        # HTTP responses begin with exactly one
                        # "http.response.start" message containing the "status"
                        # field. Other response types (e.g., WebSockets) may not.
                        status_code = str(asgi_message["status"])
                        status = ResponseStatus(
                            code=status_code,
                            is_error=status_code.startswith(("4", "5")),
                        )
                        expecting_trailers = asgi_message.get("trailers", False)
                    elif asgi_message["type"] == "websocket.accept":
                        # Websocket code explicitly handles client disconnects,
                        # so let the ASGI disconnect message propagate instead of
                        # cancelling the handler.
                        response_generator.stop_checking_for_disconnect()
                    elif (
                        asgi_message["type"] == "http.response.body"
                        and not asgi_message.get("more_body", False)
                        and not expecting_trailers
                    ):
                        # If the body is completed and we aren't expecting trailers, the
                        # response is done so we should stop listening for disconnects.
                        response_generator.stop_checking_for_disconnect()
                    elif asgi_message["type"] == "http.response.trailers":
                        # If we are expecting trailers, the response is only done when
                        # the trailers message has been sent.
                        if not asgi_message.get("more_trailers", False):
                            response_generator.stop_checking_for_disconnect()
                    elif asgi_message["type"] in [
                        "websocket.close",
                        "websocket.disconnect",
                    ]:
                        status_code = str(asgi_message["code"])
                        status = ResponseStatus(
                            code=status_code,
                            # All status codes are considered errors aside from:
                            # 1000 (CLOSE_NORMAL), 1001 (CLOSE_GOING_AWAY).
                            is_error=status_code not in ["1000", "1001"],
                        )
                        response_generator.stop_checking_for_disconnect()
                    yield asgi_message
                    response_started = True
        except BaseException as e:
            # BaseException also covers cancellation (asyncio.CancelledError), so
            # it is converted into a status here instead of propagating.
            # NOTE(review): this also catches GeneratorExit if the consumer closes
            # this generator early. Yielding after that raises RuntimeError, so
            # confirm that callers always exhaust the generator.
            status = get_http_response_status(e, self.request_timeout_s, request_id)
            for asgi_message in send_http_response_on_exception(
                status, response_started
            ):
                yield asgi_message
        finally:
            # For websocket connection, queue receive task is done when receiving
            # disconnect message from client.
            receive_client_disconnect_msg = False
            if not proxy_asgi_receive_task.done():
                proxy_asgi_receive_task.cancel()
            else:
                receive_client_disconnect_msg = True
            # If the server disconnects, status_code can be set above from the
            # disconnect message.
            # If client disconnects, the disconnect code comes from
            # a client message via the receive interface.
            if status is None and proxy_request.request_type == "websocket":
                if receive_client_disconnect_msg:
                    # The disconnect message is sent from the client.
                    # NOTE(review): `.result()` re-raises if the receive task ended
                    # with an exception instead of a disconnect message.
                    status = ResponseStatus(
                        code=str(proxy_asgi_receive_task.result()),
                        is_error=True,
                    )
                else:
                    # The server disconnect without sending a disconnect message
                    # (otherwise the `status` would be set).
                    status = ResponseStatus(
                        code="1000",  # [Sihan] is there a better code for this?
                        is_error=True,
                    )
            # Unregister the queue; later `receive_asgi_messages` calls for this
            # request will raise KeyError.
            del self.asgi_receive_queues[internal_request_id]
        # The status code should always be set.
        assert status is not None
        yield status
  900. class ProxyActorInterface(ABC):
  901. """Abstract interface for proxy actors in Ray Serve.
  902. This interface defines the contract that all proxy actor implementations must follow,
  903. allowing for different proxy backends (Ray HTTP/gRPC proxies, HAProxy, etc.).
  904. """
  905. def __init__(
  906. self,
  907. *,
  908. node_id: NodeId,
  909. node_ip_address: str,
  910. logging_config: LoggingConfig,
  911. log_buffer_size: int = RAY_SERVE_REQUEST_PATH_LOG_BUFFER_SIZE,
  912. ):
  913. """Initialize the proxy actor.
  914. Args:
  915. node_id: ID of the node this proxy is running on
  916. node_ip_address: IP address of the node
  917. logging_config: Logging configuration
  918. log_buffer_size: Size of the log buffer
  919. """
  920. self._node_id = node_id
  921. self._node_ip_address = node_ip_address
  922. self._logging_config = logging_config
  923. self._log_buffer_size = log_buffer_size
  924. self._update_logging_config(logging_config)
  925. @abstractmethod
  926. async def ready(self) -> str:
  927. """Blocks until the proxy is ready to serve requests.
  928. Returns:
  929. JSON-serialized metadata containing proxy information (worker ID, log file path, etc.)
  930. """
  931. pass
  932. @abstractmethod
  933. async def serving(self, wait_for_applications_running: bool = True) -> None:
  934. """Wait for the proxy to be ready to serve requests.
  935. Args:
  936. wait_for_applications_running: Whether to wait for the applications to be running
  937. Returns:
  938. None
  939. """
  940. pass
  941. @abstractmethod
  942. async def update_draining(
  943. self, draining: bool, _after: Optional[Any] = None
  944. ) -> None:
  945. """Update the draining status of the proxy.
  946. Args:
  947. draining: Whether the proxy should be draining
  948. _after: Optional ObjectRef for scheduling dependency
  949. """
  950. pass
  951. @abstractmethod
  952. async def is_drained(self, _after: Optional[Any] = None) -> bool:
  953. """Check whether the proxy is drained.
  954. Args:
  955. _after: Optional ObjectRef for scheduling dependency
  956. Returns:
  957. True if the proxy is drained, False otherwise
  958. """
  959. pass
  960. @abstractmethod
  961. async def check_health(self) -> bool:
  962. """Check the health of the proxy.
  963. Returns:
  964. True if the proxy is healthy, False otherwise
  965. """
  966. pass
  967. @abstractmethod
  968. def pong(self) -> str:
  969. """Respond to ping from replicas.
  970. Returns:
  971. A response string
  972. """
  973. pass
  974. @abstractmethod
  975. async def receive_asgi_messages(self, request_metadata: RequestMetadata) -> bytes:
  976. """Handle ASGI messages for HTTP requests.
  977. Args:
  978. request_metadata: Metadata about the request
  979. Returns:
  980. Serialized ASGI messages
  981. """
  982. pass
  983. # Testing and debugging methods
  984. @abstractmethod
  985. def _get_http_options(self) -> HTTPOptions:
  986. """Get HTTP options used by the proxy."""
  987. pass
  988. @abstractmethod
  989. def _get_logging_config(self) -> Optional[str]:
  990. """Get the file path for the logger (for testing purposes)."""
  991. pass
  992. @abstractmethod
  993. def _dump_ingress_replicas_for_testing(self, route: str) -> Set:
  994. """Get replicas for a route (for testing)."""
  995. pass
  996. def _update_logging_config(self, logging_config: LoggingConfig):
  997. configure_component_logger(
  998. component_name="proxy",
  999. component_id=self._node_ip_address,
  1000. logging_config=logging_config,
  1001. buffer_size=self._log_buffer_size,
  1002. )
  1003. @ray.remote(num_cpus=0)
  1004. class ProxyActor(ProxyActorInterface):
  1005. def __init__(
  1006. self,
  1007. http_options: HTTPOptions,
  1008. grpc_options: gRPCOptions,
  1009. *,
  1010. node_id: NodeId,
  1011. node_ip_address: str,
  1012. logging_config: LoggingConfig,
  1013. long_poll_client: Optional[LongPollClient] = None,
  1014. ): # noqa: F821
  1015. super().__init__(
  1016. node_id=node_id,
  1017. node_ip_address=node_ip_address,
  1018. logging_config=logging_config,
  1019. )
  1020. self._grpc_options = grpc_options
  1021. self._http_options = configure_http_middlewares(http_options)
  1022. grpc_enabled = is_grpc_enabled(self._grpc_options)
  1023. event_loop = get_or_create_event_loop()
  1024. self.long_poll_client = long_poll_client or LongPollClient(
  1025. ray.get_actor(SERVE_CONTROLLER_NAME, namespace=SERVE_NAMESPACE),
  1026. {
  1027. LongPollNamespace.GLOBAL_LOGGING_CONFIG: self._update_logging_config,
  1028. LongPollNamespace.ROUTE_TABLE: self._update_routes_in_proxies,
  1029. },
  1030. call_in_event_loop=event_loop,
  1031. )
  1032. startup_msg = f"Proxy starting on node {self._node_id} (HTTP port: {self._http_options.port}"
  1033. if grpc_enabled:
  1034. startup_msg += f", gRPC port: {self._grpc_options.port})."
  1035. else:
  1036. startup_msg += ")."
  1037. logger.info(startup_msg)
  1038. logger.debug(
  1039. f"Configure Proxy actor {ray.get_runtime_context().get_actor_id()} "
  1040. f"logger with logging config: {logging_config}"
  1041. )
  1042. configure_component_memory_profiler(
  1043. component_name="proxy", component_id=node_ip_address
  1044. )
  1045. if logging_config.encoding == EncodingType.JSON:
  1046. # Create logging context for access logs as a performance optimization.
  1047. # While logging_utils can automatically add Ray core and Serve access log context,
  1048. # we pre-compute it here since context evaluation is expensive and this context
  1049. # will be reused for multiple access log entries.
  1050. ray_core_logging_context = CoreContextFilter.get_ray_core_logging_context()
  1051. # remove task level log keys from ray core logging context, it would be nice
  1052. # to have task level log keys here but we are letting those go in favor of
  1053. # performance optimization. Also we cannot include task level log keys here because
  1054. # they would referance the current task (__init__) and not the task that is logging.
  1055. for key in CoreContextFilter.TASK_LEVEL_LOG_KEYS:
  1056. ray_core_logging_context.pop(key, None)
  1057. access_log_context = {
  1058. **ray_core_logging_context,
  1059. SERVE_LOG_COMPONENT: "proxy",
  1060. SERVE_LOG_COMPONENT_ID: self._node_ip_address,
  1061. "log_to_stderr": False,
  1062. "skip_context_filter": True,
  1063. "serve_access_log": True,
  1064. }
  1065. else:
  1066. access_log_context = {
  1067. "log_to_stderr": False,
  1068. "skip_context_filter": True,
  1069. "serve_access_log": True,
  1070. }
  1071. is_head = self._node_id == get_head_node_id()
  1072. self.proxy_router = ProxyRouter(get_proxy_handle)
  1073. self.http_proxy = HTTPProxy(
  1074. node_id=self._node_id,
  1075. node_ip_address=self._node_ip_address,
  1076. is_head=is_head,
  1077. self_actor_name=ray.get_runtime_context().get_actor_name(),
  1078. proxy_router=self.proxy_router,
  1079. request_timeout_s=self._http_options.request_timeout_s,
  1080. access_log_context=access_log_context,
  1081. )
  1082. self.grpc_proxy = (
  1083. gRPCProxy(
  1084. node_id=self._node_id,
  1085. node_ip_address=self._node_ip_address,
  1086. is_head=is_head,
  1087. proxy_router=self.proxy_router,
  1088. request_timeout_s=self._grpc_options.request_timeout_s,
  1089. access_log_context=access_log_context,
  1090. )
  1091. if grpc_enabled
  1092. else None
  1093. )
  1094. if self.grpc_proxy:
  1095. get_or_create_event_loop().set_exception_handler(
  1096. asyncio_grpc_exception_handler
  1097. )
  1098. # Start a task to initialize the HTTP server.
  1099. # The result of this task is checked in the `ready` method.
  1100. self._start_http_server_task = event_loop.create_task(
  1101. start_asgi_http_server(
  1102. self.http_proxy,
  1103. self._http_options,
  1104. event_loop=event_loop,
  1105. enable_so_reuseport=SOCKET_REUSE_PORT_ENABLED,
  1106. )
  1107. )
  1108. # A task that runs the HTTP server until it exits (currently runs forever).
  1109. # Populated with the result of self._start_http_server_task.
  1110. self._running_http_server_task: Optional[asyncio.Task] = None
  1111. # Start a task to initialize the gRPC server.
  1112. # The result of this task is checked in the `ready` method.
  1113. self._start_grpc_server_task: Optional[asyncio.Task] = None
  1114. if grpc_enabled:
  1115. self._start_grpc_server_task = event_loop.create_task(
  1116. start_grpc_server(
  1117. self.grpc_proxy.service_handler_factory,
  1118. self._grpc_options,
  1119. event_loop=event_loop,
  1120. enable_so_reuseport=SOCKET_REUSE_PORT_ENABLED,
  1121. ),
  1122. )
  1123. # A task that runs the gRPC server until it exits (currently runs forever).
  1124. # Populated with the result of self._start_grpc_server_task.
  1125. self._running_grpc_server_task: Optional[asyncio.Task] = None
  1126. _configure_gc_options()
  1127. # Start event loop monitoring for the proxy's main event loop.
  1128. self._event_loop_monitor = EventLoopMonitor(
  1129. component=EventLoopMonitor.COMPONENT_PROXY,
  1130. loop_type=EventLoopMonitor.LOOP_TYPE_MAIN,
  1131. actor_id=ray.get_runtime_context().get_actor_id(),
  1132. )
  1133. self._event_loop_monitor.start(event_loop)
  1134. def _update_routes_in_proxies(self, endpoints: Dict[DeploymentID, EndpointInfo]):
  1135. self.proxy_router.update_routes(endpoints)
  1136. def _get_logging_config(self) -> Tuple:
  1137. """Get the logging configuration (for testing purposes)."""
  1138. log_file_path = None
  1139. for handler in logger.handlers:
  1140. if isinstance(handler, logging.handlers.MemoryHandler):
  1141. log_file_path = handler.target.baseFilename
  1142. return log_file_path
  1143. def _dump_ingress_replicas_for_testing(self, route: str) -> Set[ReplicaID]:
  1144. _, handle, _ = self.http_proxy.proxy_router.match_route(route)
  1145. return handle._router._asyncio_router._request_router._replica_id_set
  1146. def _dump_ingress_cache_for_testing(self, route: str) -> Set[ReplicaID]:
  1147. """Get replica IDs that have entries in the queue length cache (for testing)."""
  1148. _, handle, _ = self.http_proxy.proxy_router.match_route(route)
  1149. request_router = handle._router._asyncio_router._request_router
  1150. cache = request_router.replica_queue_len_cache
  1151. return {
  1152. replica_id
  1153. for replica_id in request_router._replica_id_set
  1154. if cache.get(replica_id) is not None
  1155. }
  1156. async def ready(self) -> str:
  1157. """Blocks until the proxy HTTP (and optionally gRPC) servers are running.
  1158. Returns JSON-serialized metadata containing the proxy's worker ID and log
  1159. file path.
  1160. Raises any exceptions that occur setting up the HTTP or gRPC server.
  1161. """
  1162. try:
  1163. self._running_http_server_task = await self._start_http_server_task
  1164. except Exception as e:
  1165. logger.exception("Failed to start proxy HTTP server.")
  1166. raise e from None
  1167. try:
  1168. if self._start_grpc_server_task is not None:
  1169. self._running_grpc_server_task = await self._start_grpc_server_task
  1170. except Exception as e:
  1171. logger.exception("Failed to start proxy gRPC server.")
  1172. raise e from None
  1173. # Return proxy metadata used by the controller.
  1174. # NOTE(zcin): We need to convert the metadata to a json string because
  1175. # of cross-language scenarios. Java can't deserialize a Python tuple.
  1176. return json.dumps(
  1177. [
  1178. ray.get_runtime_context().get_worker_id(),
  1179. get_component_logger_file_path(),
  1180. ]
  1181. )
  1182. async def serving(self, wait_for_applications_running: bool = True) -> None:
  1183. """Wait for the proxy to be ready to serve requests."""
  1184. return
  1185. async def update_draining(self, draining: bool, _after: Optional[Any] = None):
  1186. """Update the draining status of the HTTP and gRPC proxies.
  1187. Unused `_after` argument is for scheduling: passing an ObjectRef
  1188. allows delaying this call until after the `_after` call has returned.
  1189. """
  1190. self.http_proxy.update_draining(draining)
  1191. if self.grpc_proxy:
  1192. self.grpc_proxy.update_draining(draining)
  1193. async def is_drained(self, _after: Optional[Any] = None):
  1194. """Check whether both HTTP and gRPC proxies are drained or not.
  1195. Unused `_after` argument is for scheduling: passing an ObjectRef
  1196. allows delaying this call until after the `_after` call has returned.
  1197. """
  1198. return self.http_proxy.is_drained() and (
  1199. self.grpc_proxy is None or self.grpc_proxy.is_drained()
  1200. )
  1201. async def check_health(self) -> bool:
  1202. """No-op method to check on the health of the HTTP Proxy.
  1203. Make sure the async event loop is not blocked.
  1204. """
  1205. logger.debug("Received health check.", extra={"log_to_stderr": False})
  1206. return True
  1207. def pong(self):
  1208. """Called by the replica to initialize its handle to the proxy."""
  1209. pass
  1210. async def receive_asgi_messages(self, request_metadata: RequestMetadata) -> bytes:
  1211. """Get ASGI messages for the provided `request_metadata`.
  1212. After the proxy has stopped receiving messages for this `request_metadata`,
  1213. this will always return immediately.
  1214. Raises `KeyError` if this request ID is not found. This will happen when the
  1215. request is no longer being handled (e.g., the user disconnects).
  1216. """
  1217. return pickle.dumps(
  1218. await self.http_proxy.receive_asgi_messages(request_metadata)
  1219. )
  1220. def _get_http_options(self) -> HTTPOptions:
  1221. """Internal method to get HTTP options used by the proxy."""
  1222. return self._http_options
  1223. def _configure_gc_options():
  1224. if not RAY_SERVE_ENABLE_PROXY_GC_OPTIMIZATIONS:
  1225. return
  1226. # Collect any objects that exist already and exclude them from future GC.
  1227. gc.collect(2)
  1228. gc.freeze()
  1229. # Tune the GC threshold to run less frequently (default is 700).
  1230. gc.set_threshold(RAY_SERVE_PROXY_GC_THRESHOLD)