serve_head.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. import asyncio
  2. import dataclasses
  3. import json
  4. import logging
  5. from functools import wraps
  6. from typing import Optional
  7. import aiohttp
  8. from aiohttp.web import Request, Response
  9. import ray
  10. import ray.dashboard.optional_utils as dashboard_optional_utils
  11. from ray._common.pydantic_compat import ValidationError
  12. from ray.dashboard.modules.version import CURRENT_VERSION, VersionResponse
  13. from ray.dashboard.subprocesses.module import SubprocessModule
  14. from ray.dashboard.subprocesses.routes import SubprocessRouteTable as routes
  15. from ray.exceptions import RayTaskError
  16. logger = logging.getLogger(__name__)
  17. logger.setLevel(logging.INFO)
  18. def validate_endpoint():
  19. def decorator(func):
  20. @wraps(func)
  21. async def check(self, *args, **kwargs):
  22. try:
  23. from ray import serve # noqa: F401
  24. except ImportError:
  25. return Response(
  26. status=501,
  27. text=(
  28. "Serve dependencies are not installed. Please run `pip "
  29. 'install "ray[serve]"`.'
  30. ),
  31. )
  32. return await func(self, *args, **kwargs)
  33. return check
  34. return decorator
  35. # NOTE (shrekris-anyscale): This class uses delayed imports for all
  36. # Ray Serve-related modules. That way, users can use the Ray dashboard agent for
  37. # non-Serve purposes without downloading Serve dependencies.
  38. class ServeHead(SubprocessModule):
  39. def __init__(self, *args, **kwargs):
  40. super().__init__(*args, **kwargs)
  41. self._controller = None
  42. self._controller_lock = asyncio.Lock()
  43. # serve_start_async is not thread-safe call. This lock
  44. # will make sure there is only one call that starts the serve instance.
  45. # If the lock is already acquired by another async task, the async task
  46. # will asynchronously wait for the lock.
  47. self._controller_start_lock = asyncio.Lock()
  48. # To init gcs_client in internal_kv for record_extra_usage_tag.
  49. assert self.gcs_client is not None
  50. assert ray.experimental.internal_kv._internal_kv_initialized()
  51. # TODO: It's better to use `/api/version`.
  52. # It requires a refactor of ClassMethodRouteTable to differentiate the server.
  53. @routes.get("/api/ray/version")
  54. async def get_version(self, req: Request) -> Response:
  55. # NOTE(edoakes): CURRENT_VERSION should be bumped and checked on the
  56. # client when we have backwards-incompatible changes.
  57. resp = VersionResponse(
  58. version=CURRENT_VERSION,
  59. ray_version=ray.__version__,
  60. ray_commit=ray.__commit__,
  61. session_name=self.session_name,
  62. )
  63. return Response(
  64. text=json.dumps(dataclasses.asdict(resp)),
  65. content_type="application/json",
  66. status=aiohttp.web.HTTPOk.status_code,
  67. )
  68. @routes.get("/api/serve/applications/")
  69. @dashboard_optional_utils.init_ray_and_catch_exceptions()
  70. @validate_endpoint()
  71. async def get_serve_instance_details(self, req: Request) -> Response:
  72. from ray.serve.schema import APIType, ServeInstanceDetails
  73. api_type: Optional[APIType] = None
  74. api_type_str = req.query.get("api_type")
  75. if api_type_str:
  76. api_type_lower = api_type_str.lower()
  77. valid_values = APIType.get_valid_user_values()
  78. if api_type_lower not in valid_values:
  79. # Explicitly check against valid user values (excludes 'unknown')
  80. return Response(
  81. status=400,
  82. text=(
  83. f"Invalid 'api_type' value: '{api_type_str}'. "
  84. f"Must be one of: {', '.join(valid_values)}"
  85. ),
  86. content_type="text/plain",
  87. )
  88. api_type = APIType(api_type_lower)
  89. controller = await self.get_serve_controller()
  90. if controller is None:
  91. # If no serve instance is running, return a dict that represents that.
  92. details = ServeInstanceDetails.get_empty_schema_dict()
  93. else:
  94. try:
  95. details = await controller.get_serve_instance_details.remote(
  96. source=api_type
  97. )
  98. except ray.exceptions.RayTaskError as e:
  99. # Task failure sometimes are due to GCS
  100. # failure. When GCS failed, we expect a longer time
  101. # to recover.
  102. return Response(
  103. status=503,
  104. text=(
  105. "Failed to get a response from the controller. "
  106. f"The GCS may be down, please retry later: {e}"
  107. ),
  108. )
  109. return Response(
  110. text=json.dumps(details),
  111. content_type="application/json",
  112. )
  113. @routes.delete("/api/serve/applications/")
  114. @dashboard_optional_utils.init_ray_and_catch_exceptions()
  115. async def delete_serve_applications(self, req: Request) -> Response:
  116. from ray import serve
  117. if await self.get_serve_controller() is not None:
  118. serve.shutdown()
  119. return Response()
  120. @routes.put("/api/serve/applications/")
  121. @dashboard_optional_utils.init_ray_and_catch_exceptions()
  122. @validate_endpoint()
  123. async def put_all_applications(self, req: Request) -> Response:
  124. from ray._common.usage.usage_lib import TagKey, record_extra_usage_tag
  125. from ray.serve._private.api import serve_start_async
  126. from ray.serve.config import ProxyLocation
  127. from ray.serve.schema import ServeDeploySchema
  128. try:
  129. config: ServeDeploySchema = ServeDeploySchema.parse_obj(await req.json())
  130. except ValidationError as e:
  131. return Response(
  132. status=400,
  133. text=repr(e),
  134. )
  135. config_http_options = config.http_options.dict()
  136. location = ProxyLocation._to_deployment_mode(config.proxy_location)
  137. full_http_options = dict({"location": location}, **config_http_options)
  138. grpc_options = config.grpc_options.dict()
  139. async with self._controller_start_lock:
  140. client = await serve_start_async(
  141. http_options=full_http_options,
  142. grpc_options=grpc_options,
  143. global_logging_config=config.logging_config,
  144. )
  145. # Serve ignores HTTP options if it was already running when
  146. # serve_start_async() is called. Therefore we validate that no
  147. # existing HTTP options are updated and print warning in case they are
  148. self.validate_http_options(client, full_http_options)
  149. try:
  150. if config.logging_config:
  151. client.update_global_logging_config(config.logging_config)
  152. client.deploy_apps(config)
  153. record_extra_usage_tag(TagKey.SERVE_REST_API_VERSION, "v2")
  154. except RayTaskError as e:
  155. return Response(
  156. status=400,
  157. text=str(e),
  158. )
  159. else:
  160. return Response()
  161. def _create_json_response(self, data, status: int) -> Response:
  162. """Create a JSON response with the given data and status."""
  163. return Response(
  164. status=status,
  165. text=json.dumps(data),
  166. content_type="application/json",
  167. )
  168. @routes.post(
  169. "/api/v1/applications/{application_name}/deployments/{deployment_name}/scale"
  170. )
  171. @dashboard_optional_utils.init_ray_and_catch_exceptions()
  172. @validate_endpoint()
  173. async def scale_deployment(self, req: Request) -> Response:
  174. from ray.serve._private.common import DeploymentID
  175. from ray.serve._private.exceptions import (
  176. DeploymentIsBeingDeletedError,
  177. ExternalScalerDisabledError,
  178. )
  179. from ray.serve.schema import ScaleDeploymentRequest
  180. # Extract path parameters
  181. application_name = req.match_info.get("application_name")
  182. deployment_name = req.match_info.get("deployment_name")
  183. if not application_name or not deployment_name:
  184. return self._create_json_response(
  185. {"error": "Missing application_name or deployment_name in path"}, 400
  186. )
  187. try:
  188. request_data = await req.json()
  189. scale_request = ScaleDeploymentRequest(**request_data)
  190. except Exception as e:
  191. return self._create_json_response(
  192. {"error": f"Invalid request body: {str(e)}"}, 400
  193. )
  194. controller = await self.get_serve_controller()
  195. if controller is None:
  196. return self._create_json_response(
  197. {"error": "Serve controller is not available"}, 503
  198. )
  199. try:
  200. deployment_id = DeploymentID(
  201. name=deployment_name, app_name=application_name
  202. )
  203. # Update the target number of replicas
  204. logger.info(
  205. f"Scaling deployment {deployment_name}, application {application_name} to {scale_request.target_num_replicas} replicas"
  206. )
  207. await controller.update_deployment_replicas.remote(
  208. deployment_id, scale_request.target_num_replicas
  209. )
  210. return self._create_json_response(
  211. {
  212. "message": "Scaling request received. Deployment will get scaled asynchronously."
  213. },
  214. 200,
  215. )
  216. except Exception as e:
  217. if isinstance(e, DeploymentIsBeingDeletedError):
  218. # From customer's viewpoint, the deployment is deleted instead of being deleted
  219. # as they must have already executed the delete command
  220. return self._create_json_response(
  221. {"error": "Deployment is deleted"}, 412
  222. )
  223. elif isinstance(e, ExternalScalerDisabledError):
  224. return self._create_json_response({"error": str(e.cause)}, 412)
  225. if isinstance(e, ValueError) and "not found" in str(e):
  226. return self._create_json_response(
  227. {"error": "Application or Deployment not found"}, 400
  228. )
  229. else:
  230. logger.error(
  231. f"Got an Internal Server Error while scaling deployment, error: {e}"
  232. )
  233. return self._create_json_response(
  234. {"error": "Internal Server Error"}, 503
  235. )
  236. def validate_http_options(self, client, http_options):
  237. divergent_http_options = []
  238. for option, new_value in http_options.items():
  239. prev_value = getattr(client.http_config, option)
  240. if prev_value != new_value:
  241. divergent_http_options.append(option)
  242. if divergent_http_options:
  243. logger.warning(
  244. "Serve is already running on this Ray cluster and "
  245. "it's not possible to update its HTTP options without "
  246. "restarting it. Following options are attempted to be "
  247. f"updated: {divergent_http_options}."
  248. )
  249. async def get_serve_controller(self):
  250. """Gets the ServeController to the this cluster's Serve app.
  251. return: If Serve is running on this Ray cluster, returns a client to
  252. the Serve controller. If Serve is not running, returns None.
  253. """
  254. async with self._controller_lock:
  255. if self._controller is not None:
  256. try:
  257. await self._controller.check_alive.remote()
  258. return self._controller
  259. except ray.exceptions.RayActorError:
  260. logger.info("Controller is dead")
  261. self._controller = None
  262. # Try to connect to serve even when we detect the actor is dead
  263. # because the user might have started a new
  264. # serve cluter.
  265. from ray.serve._private.constants import (
  266. SERVE_CONTROLLER_NAME,
  267. SERVE_NAMESPACE,
  268. )
  269. try:
  270. # get_actor is a sync call but it'll timeout after
  271. # ray.dashboard.consts.GCS_RPC_TIMEOUT_SECONDS
  272. self._controller = ray.get_actor(
  273. SERVE_CONTROLLER_NAME, namespace=SERVE_NAMESPACE
  274. )
  275. except Exception as e:
  276. logger.debug(
  277. "There is no "
  278. "instance running on this Ray cluster. Please "
  279. "call `serve.start(detached=True) to start "
  280. f"one: {e}"
  281. )
  282. return self._controller