proxy_router.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. import logging
  2. from typing import Any, Callable, Dict, List, Optional, Tuple
  3. from starlette.applications import Starlette
  4. from starlette.requests import Request
  5. from starlette.routing import Route
  6. from starlette.types import Scope
  7. from ray.serve._private.common import ApplicationName, DeploymentID, EndpointInfo
  8. from ray.serve._private.constants import SERVE_LOGGER_NAME
  9. from ray.serve._private.thirdparty.get_asgi_route_name import (
  10. RoutePattern,
  11. get_asgi_route_name,
  12. )
  13. from ray.serve.handle import DeploymentHandle
  # Human-readable reasons returned by `ProxyRouter.ready_for_traffic` when the
  # proxy is not ready to accept traffic.
  NO_ROUTES_MESSAGE = "Route table is not populated yet."
  NO_REPLICAS_MESSAGE = "No replicas are available yet."

  # Module logger, named by SERVE_LOGGER_NAME.
  logger = logging.getLogger(SERVE_LOGGER_NAME)
  class ProxyRouter:
      """Route table the proxy uses to map requests to deployment handles.

      - `match_route` does longest-prefix matching of a request path against
        the configured route prefixes.
      - `get_handle_for_endpoint` looks a handle up by application name, using
        the endpoint info kept for the gRPC proxy.
      - `match_route_pattern` refines a matched prefix into an ASGI route
        pattern (e.g. ``/api/{user_id}``) for more granular metrics.
      """

      def __init__(
          self,
          get_handle: Callable[[DeploymentID, EndpointInfo], DeploymentHandle],
      ):
          # Factory that builds a handle for an endpoint; injectable so tests can
          # mock it. `update_routes` calls it as `get_handle(endpoint, info)`.
          self._get_handle = get_handle
          # One DeploymentHandle per endpoint currently in the route table.
          self.handles: Dict[DeploymentID, DeploymentHandle] = dict()
          # Flipped to `True` the first time `update_routes` receives a non-empty
          # endpoint map and never reset. The router is not ready for traffic
          # until the route table is populated.
          self._route_table_populated = False

          # --- Info used by the HTTP proxy ---
          # Route prefixes sorted by decreasing length (longest-prefix matching).
          self.sorted_routes: List[str] = list()
          # Route prefix -> endpoint that serves it.
          self.route_info: Dict[str, DeploymentID] = dict()
          # Application name -> whether the application is cross-language.
          self.app_to_is_cross_language: Dict[ApplicationName, bool] = dict()

          # --- Info used by the gRPC proxy ---
          # Endpoint -> EndpointInfo, as last passed to `update_routes`.
          self.endpoints: Dict[DeploymentID, EndpointInfo] = dict()

          # Route prefix -> ASGI route patterns for that endpoint, used to match
          # incoming requests to route patterns for metrics. Each pattern exposes
          # `.path` and `.methods` (methods may be None). Only prefixes with a
          # non-empty pattern list are stored.
          self.route_patterns: Dict[str, List[RoutePattern]] = dict()
          # Route prefix -> cached mock Starlette app built from its patterns, or
          # None if building the app failed (so the failure is not retried on
          # every request). Cleared whenever `update_routes` runs.
          self._route_pattern_apps: Dict[str, Optional[Starlette]] = dict()

      def ready_for_traffic(self, is_head: bool) -> Tuple[bool, str]:
          """Report whether the proxy router is ready to serve traffic.

          Args:
              is_head: Whether this proxy runs on the head node.

          Returns:
              ``(ready, reason)``. ``ready`` is False, with ``reason`` set to a
              human-readable message, if either:
                - the route table has not yet been populated with a non-empty
                  set of routes, or
                - the route table is populated, this proxy is NOT on the head
                  node, and none of its handles has received running replicas.
              Otherwise ``(True, "")`` is returned.
          """
          if not self._route_table_populated:
              return False, NO_ROUTES_MESSAGE

          # NOTE(zcin): For the proxy on the head node, even if none of its handles
          # have been populated with running replicas yet, we MUST mark the proxy as
          # ready for traffic. This is to handle the case when all deployments have
          # scaled to zero. If the deployments (more precisely, ingress deployments)
          # have all scaled down to zero, at least one proxy needs to be able to
          # receive incoming requests to trigger upscale.
          if is_head:
              return True, ""

          if any(
              handle.running_replicas_populated() for handle in self.handles.values()
          ):
              return True, ""
          return False, NO_REPLICAS_MESSAGE

      def update_routes(self, endpoints: Dict[DeploymentID, EndpointInfo]):
          """Replace the route table with ``endpoints``.

          - Creates handles (via the injected ``get_handle``) for new endpoints.
          - Keeps existing handles for endpoints that are still present.
          - Deletes handles for endpoints that are no longer present.
          - Rebuilds the HTTP routing data and the route-pattern data.
          - Invalidates all cached mock apps used for route-pattern matching.

          Args:
              endpoints: Mapping from endpoint to its EndpointInfo. A non-empty
                  mapping marks the route table as populated.
          """
          logger.info(
              f"Got updated endpoints: {endpoints}.", extra={"log_to_stderr": True}
          )
          if endpoints:
              self._route_table_populated = True

          self.endpoints = endpoints

          # Start by assuming every current handle is stale; endpoints still
          # present are removed from this set in the loop below.
          existing_handles = set(self.handles.keys())
          routes = []
          route_info = {}
          app_to_is_cross_language = {}
          route_patterns = {}
          for endpoint, info in endpoints.items():
              routes.append(info.route)
              route_info[info.route] = endpoint
              app_to_is_cross_language[endpoint.app_name] = info.app_is_cross_language
              if info.route_patterns:
                  route_patterns[info.route] = info.route_patterns

              if endpoint in self.handles:
                  existing_handles.remove(endpoint)
              else:
                  self.handles[endpoint] = self._get_handle(endpoint, info)

          # Clean up any handles that are no longer used.
          if existing_handles:
              logger.info(
                  f"Deleting {len(existing_handles)} unused handles.",
                  extra={"log_to_stderr": False},
              )
              for endpoint in existing_handles:
                  del self.handles[endpoint]

          # Routes are sorted in order of decreasing length to enable longest
          # prefix matching.
          self.sorted_routes = sorted(routes, key=len, reverse=True)
          self.route_info = route_info
          self.app_to_is_cross_language = app_to_is_cross_language
          self.route_patterns = route_patterns
          # Invalidate cached mock apps (including cached failures) since the
          # route patterns may have changed.
          self._route_pattern_apps.clear()

      def match_route(
          self, target_route: str
      ) -> Optional[Tuple[str, DeploymentHandle, bool]]:
          """Return the longest prefix match among existing routes for the route.

          Matching respects path-segment boundaries. The route ``/route`` matches
          ``/route`` and ``/route/x`` but not ``/routesuffix``. A route ending in
          ``/`` matches any target that starts with it.

          Args:
              target_route: Route to match against.

          Returns:
              ``(route, handle, is_cross_language)`` if found, else None.
          """
          for route in self.sorted_routes:
              if not target_route.startswith(route):
                  continue
              # If the route ends in '/', the prefix match already ends on a
              # segment boundary. Otherwise require either an exact match or a
              # '/' immediately after the prefix, so that the route '/route'
              # does not match a request to '/routesuffix'.
              if (
                  route.endswith("/")
                  or len(target_route) == len(route)
                  or target_route[len(route)] == "/"
              ):
                  endpoint = self.route_info[route]
                  return (
                      route,
                      self.handles[endpoint],
                      self.app_to_is_cross_language[endpoint.app_name],
                  )
          return None

      def get_handle_for_endpoint(
          self, target_app_name: str
      ) -> Optional[Tuple[str, DeploymentHandle, bool]]:
          """Return the handle for ``target_app_name``.

          If exactly one handle exists, it is returned regardless of
          ``target_app_name``. Otherwise the first handle whose endpoint belongs
          to ``target_app_name`` is returned.

          Args:
              target_app_name: Application name to match against.

          Returns:
              ``(route, handle, is_cross_language)`` for the chosen endpoint, or
              None if no endpoint matches.
          """
          # Loop-invariant: with a single endpoint, any app name resolves to it.
          only_one_endpoint = len(self.handles) == 1
          for endpoint_tag, handle in self.handles.items():
              if only_one_endpoint or target_app_name == endpoint_tag.app_name:
                  endpoint_info = self.endpoints[endpoint_tag]
                  return (
                      endpoint_info.route,
                      handle,
                      endpoint_info.app_is_cross_language,
                  )
          return None

      def match_route_pattern(self, route_prefix: str, asgi_scope: Scope) -> str:
          """Match an incoming request to a specific route pattern.

          This attempts to match the request path to a route pattern (e.g.,
          ``/api/{user_id}``) rather than just the route prefix, which provides
          more granular metrics.

          The mock Starlette app is cached per route_prefix for performance,
          avoiding the overhead of recreating the app and routes on every
          request. A failed build is cached too, so it is not retried until the
          next `update_routes`.

          Args:
              route_prefix: The matched route prefix from `match_route`.
              asgi_scope: The ASGI scope containing the request path and method.

          Returns:
              The matched route pattern if available, otherwise route_prefix.
          """
          patterns = self.route_patterns.get(route_prefix)
          if not patterns:
              return route_prefix

          if route_prefix in self._route_pattern_apps:
              mock_app = self._route_pattern_apps[route_prefix]
          else:
              mock_app = self._build_route_pattern_app(route_prefix, patterns)
              # Cache the result even when it is None. The patterns for a prefix
              # are only replaced by `update_routes`, which clears this cache,
              # so retrying per request would just repeat the same failure.
              self._route_pattern_apps[route_prefix] = mock_app

          if mock_app is None:
              return route_prefix

          try:
              matched = get_asgi_route_name(mock_app, asgi_scope)
          except Exception:
              # If matching fails for any reason, fall back to route prefix.
              logger.debug(
                  f"Failed to match route pattern for {route_prefix}",
                  exc_info=True,
              )
              return route_prefix
          # Fall back to the route prefix if no pattern matched.
          return matched or route_prefix

      def _build_route_pattern_app(
          self, route_prefix: str, patterns: List[RoutePattern]
      ) -> Optional[Starlette]:
          """Build a mock Starlette app whose routes mirror ``patterns``.

          Returns None (after logging at debug level) if constructing the routes
          or the app raises.
          """

          # A dummy endpoint suffices: the app is only used for pattern matching.
          async def dummy_endpoint(request: Request):
              pass

          try:
              routes = [
                  Route(pattern.path, dummy_endpoint, methods=pattern.methods)
                  for pattern in patterns
              ]
              return Starlette(routes=routes)
          except Exception:
              logger.debug(
                  f"Failed to create mock app for route pattern matching: {route_prefix}",
                  exc_info=True,
              )
              return None