| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329 |
- import asyncio
- import dataclasses
- import json
- import logging
- from functools import wraps
- from typing import Optional
- import aiohttp
- from aiohttp.web import Request, Response
- import ray
- import ray.dashboard.optional_utils as dashboard_optional_utils
- from ray._common.pydantic_compat import ValidationError
- from ray.dashboard.modules.version import CURRENT_VERSION, VersionResponse
- from ray.dashboard.subprocesses.module import SubprocessModule
- from ray.dashboard.subprocesses.routes import SubprocessRouteTable as routes
- from ray.exceptions import RayTaskError
# Module-level logger. Its level is set explicitly to INFO rather than being
# inherited from the parent logger, so info-level messages from the Serve
# REST handlers in this module are not filtered by the logger itself.
logger = logging.getLogger(name=__name__)
logger.setLevel(level=logging.INFO)
def validate_endpoint():
    """Return a decorator that guards a route handler on Ray Serve being importable.

    On every call, the wrapped coroutine first attempts ``from ray import serve``.
    If that raises ImportError, the request is answered with HTTP 501 and a
    ``pip install`` hint. Otherwise the original handler is awaited and its
    result is returned unchanged.
    """

    def decorator(handler):
        @wraps(handler)
        async def check(self, *args, **kwargs):
            serve_installed = True
            try:
                from ray import serve  # noqa: F401
            except ImportError:
                serve_installed = False

            if not serve_installed:
                return Response(
                    status=501,
                    text=(
                        "Serve dependencies are not installed. Please run `pip "
                        'install "ray[serve]"`.'
                    ),
                )
            return await handler(self, *args, **kwargs)

        return check

    return decorator
# NOTE (shrekris-anyscale): This class uses delayed imports for all
# Ray Serve-related modules. That way, users can use the Ray dashboard for
# non-Serve purposes without installing Serve dependencies.
class ServeHead(SubprocessModule):
    """Dashboard head module that exposes the Ray Serve REST API.

    Registered routes:

    * ``GET /api/ray/version``: dashboard/Ray version information.
    * ``GET /api/serve/applications/``: details of the running Serve instance.
    * ``PUT /api/serve/applications/``: start Serve if needed and deploy a
      ``ServeDeploySchema`` config.
    * ``DELETE /api/serve/applications/``: shut Serve down if it is running.
    * ``POST /api/v1/applications/{application_name}/deployments/{deployment_name}/scale``:
      set the target replica count of one deployment.

    All Ray Serve modules are imported lazily inside the handlers.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Cached handle to the Serve controller actor. It is (re)populated by
        # get_serve_controller() when missing or detected dead.
        self._controller = None
        # Serializes health-checking / looking up the cached controller handle.
        self._controller_lock = asyncio.Lock()
        # serve_start_async is not a thread-safe call. This lock makes sure
        # only one call at a time starts the Serve instance. Other async tasks
        # wait for the lock asynchronously.
        self._controller_start_lock = asyncio.Lock()
        # gcs_client must be set up so internal_kv is initialized, which
        # record_extra_usage_tag relies on.
        assert self.gcs_client is not None
        assert ray.experimental.internal_kv._internal_kv_initialized()

    # TODO: It's better to use `/api/version`.
    # It requires a refactor of ClassMethodRouteTable to differentiate the server.
    @routes.get("/api/ray/version")
    async def get_version(self, req: Request) -> Response:
        """Return the REST API version, Ray version/commit and session name as JSON."""
        # NOTE(edoakes): CURRENT_VERSION should be bumped and checked on the
        # client when we have backwards-incompatible changes.
        resp = VersionResponse(
            version=CURRENT_VERSION,
            ray_version=ray.__version__,
            ray_commit=ray.__commit__,
            session_name=self.session_name,
        )
        return Response(
            text=json.dumps(dataclasses.asdict(resp)),
            content_type="application/json",
            status=aiohttp.web.HTTPOk.status_code,
        )

    @routes.get("/api/serve/applications/")
    @dashboard_optional_utils.init_ray_and_catch_exceptions()
    @validate_endpoint()
    async def get_serve_instance_details(self, req: Request) -> Response:
        """Return details of the running Serve instance as JSON.

        The optional ``api_type`` query parameter is matched case-insensitively
        against ``APIType.get_valid_user_values()`` and forwarded to the
        controller as ``source``.

        Responses:
            200: the instance details, or the empty schema dict when no Serve
                instance is running.
            400: ``api_type`` is not a valid user value.
            503: the controller call failed with a RayTaskError.
        """
        from ray.serve.schema import APIType, ServeInstanceDetails

        api_type: Optional[APIType] = None
        api_type_str = req.query.get("api_type")
        if api_type_str:
            api_type_lower = api_type_str.lower()
            valid_values = APIType.get_valid_user_values()
            # Explicitly check against valid user values (excludes 'unknown').
            if api_type_lower not in valid_values:
                return Response(
                    status=400,
                    text=(
                        f"Invalid 'api_type' value: '{api_type_str}'. "
                        f"Must be one of: {', '.join(valid_values)}"
                    ),
                    content_type="text/plain",
                )
            api_type = APIType(api_type_lower)

        controller = await self.get_serve_controller()
        if controller is None:
            # If no Serve instance is running, return a dict that represents that.
            details = ServeInstanceDetails.get_empty_schema_dict()
        else:
            try:
                details = await controller.get_serve_instance_details.remote(
                    source=api_type
                )
            except RayTaskError as e:
                # Task failures are sometimes due to GCS failure. When GCS
                # fails, we expect a longer time to recover, so ask the
                # client to retry.
                return Response(
                    status=503,
                    text=(
                        "Failed to get a response from the controller. "
                        f"The GCS may be down, please retry later: {e}"
                    ),
                )

        return Response(
            text=json.dumps(details),
            content_type="application/json",
        )

    @routes.delete("/api/serve/applications/")
    @dashboard_optional_utils.init_ray_and_catch_exceptions()
    @validate_endpoint()
    async def delete_serve_applications(self, req: Request) -> Response:
        """Shut Serve down if a controller is found, then return an empty 200.

        Guarded by ``validate_endpoint`` like the other Serve routes, so a
        missing Serve install yields the 501 install hint instead of an
        ImportError from the ``serve`` import below.
        """
        from ray import serve

        if await self.get_serve_controller() is not None:
            serve.shutdown()
        return Response()

    @routes.put("/api/serve/applications/")
    @dashboard_optional_utils.init_ray_and_catch_exceptions()
    @validate_endpoint()
    async def put_all_applications(self, req: Request) -> Response:
        """Start Serve if needed and deploy the ``ServeDeploySchema`` in the body.

        Responses:
            200: empty body; the config was submitted via ``deploy_apps``.
            400: the body failed schema validation (``repr`` of the error), or
                applying the config raised a RayTaskError (its text).
        """
        from ray._common.usage.usage_lib import TagKey, record_extra_usage_tag
        from ray.serve._private.api import serve_start_async
        from ray.serve.config import ProxyLocation
        from ray.serve.schema import ServeDeploySchema

        try:
            config: ServeDeploySchema = ServeDeploySchema.parse_obj(await req.json())
        except ValidationError as e:
            return Response(
                status=400,
                text=repr(e),
            )

        config_http_options = config.http_options.dict()
        location = ProxyLocation._to_deployment_mode(config.proxy_location)
        full_http_options = dict({"location": location}, **config_http_options)
        grpc_options = config.grpc_options.dict()

        async with self._controller_start_lock:
            client = await serve_start_async(
                http_options=full_http_options,
                grpc_options=grpc_options,
                global_logging_config=config.logging_config,
            )

        # Serve ignores HTTP options if it was already running when
        # serve_start_async() is called. Therefore we check whether any
        # existing HTTP options would change and log a warning if so.
        self.validate_http_options(client, full_http_options)

        try:
            if config.logging_config:
                client.update_global_logging_config(config.logging_config)
            client.deploy_apps(config)
            record_extra_usage_tag(TagKey.SERVE_REST_API_VERSION, "v2")
        except RayTaskError as e:
            return Response(
                status=400,
                text=str(e),
            )
        else:
            return Response()

    def _create_json_response(self, data, status: int) -> Response:
        """Create a JSON response with the given data and status."""
        return Response(
            status=status,
            text=json.dumps(data),
            content_type="application/json",
        )

    @routes.post(
        "/api/v1/applications/{application_name}/deployments/{deployment_name}/scale"
    )
    @dashboard_optional_utils.init_ray_and_catch_exceptions()
    @validate_endpoint()
    async def scale_deployment(self, req: Request) -> Response:
        """Set the target number of replicas for a single deployment.

        Path params: ``application_name`` and ``deployment_name``.
        Body: JSON object accepted by ``ScaleDeploymentRequest``. Its
        ``target_num_replicas`` is forwarded to the controller.

        Responses (JSON ``{"message": ...}`` or ``{"error": ...}``):
            200: scaling request accepted; scaling happens asynchronously.
            400: missing path params, invalid body, or unknown app/deployment.
            412: deployment is being deleted, or external scaling is disabled.
            503: controller unavailable, or any other failure while scaling.
        """
        from ray.serve._private.common import DeploymentID
        from ray.serve._private.exceptions import (
            DeploymentIsBeingDeletedError,
            ExternalScalerDisabledError,
        )
        from ray.serve.schema import ScaleDeploymentRequest

        application_name = req.match_info.get("application_name")
        deployment_name = req.match_info.get("deployment_name")
        if not application_name or not deployment_name:
            return self._create_json_response(
                {"error": "Missing application_name or deployment_name in path"}, 400
            )

        try:
            request_data = await req.json()
            scale_request = ScaleDeploymentRequest(**request_data)
        except Exception as e:
            # Deliberately broad: malformed JSON, a non-object body and schema
            # validation failures are all client errors here.
            return self._create_json_response(
                {"error": f"Invalid request body: {str(e)}"}, 400
            )

        controller = await self.get_serve_controller()
        if controller is None:
            return self._create_json_response(
                {"error": "Serve controller is not available"}, 503
            )

        try:
            deployment_id = DeploymentID(
                name=deployment_name, app_name=application_name
            )
            logger.info(
                "Scaling deployment %s, application %s to %s replicas",
                deployment_name,
                application_name,
                scale_request.target_num_replicas,
            )
            await controller.update_deployment_replicas.remote(
                deployment_id, scale_request.target_num_replicas
            )
        except Exception as e:
            # NOTE(review): controller-side errors presumably arrive as a
            # RayTaskError that also matches the original exception type (the
            # isinstance checks and `e.cause` below rely on this). Confirm.
            if isinstance(e, DeploymentIsBeingDeletedError):
                # From the customer's viewpoint the deployment is already
                # deleted, since they must have executed the delete command.
                return self._create_json_response(
                    {"error": "Deployment is deleted"}, 412
                )
            if isinstance(e, ExternalScalerDisabledError):
                return self._create_json_response({"error": str(e.cause)}, 412)
            if isinstance(e, ValueError) and "not found" in str(e):
                return self._create_json_response(
                    {"error": "Application or Deployment not found"}, 400
                )
            # logger.exception records the traceback as well as the message.
            logger.exception(
                "Got an Internal Server Error while scaling deployment, error: %s", e
            )
            return self._create_json_response(
                {"error": "Internal Server Error"}, 503
            )

        return self._create_json_response(
            {
                "message": "Scaling request received. Deployment will get scaled asynchronously."
            },
            200,
        )

    def validate_http_options(self, client, http_options):
        """Log a warning listing HTTP options that differ from the running Serve's.

        Serve cannot update its HTTP options without restarting, so mismatches
        are only reported and never applied. Nothing is returned.
        """
        divergent_http_options = [
            option
            for option, new_value in http_options.items()
            if getattr(client.http_config, option) != new_value
        ]
        if divergent_http_options:
            logger.warning(
                "Serve is already running on this Ray cluster and "
                "it's not possible to update its HTTP options without "
                "restarting it. Following options are attempted to be "
                f"updated: {divergent_http_options}."
            )

    async def get_serve_controller(self):
        """Get a handle to this cluster's Serve controller actor.

        Returns:
            The cached controller handle if it still answers ``check_alive``.
            Otherwise, a freshly looked-up handle to the named controller
            actor, or ``None`` if no Serve instance is running.
        """
        async with self._controller_lock:
            if self._controller is not None:
                try:
                    await self._controller.check_alive.remote()
                    return self._controller
                except ray.exceptions.RayActorError:
                    logger.info("Controller is dead")
                    self._controller = None

            # Try to connect to Serve even when we detected the cached actor is
            # dead, because the user might have started a new Serve instance.
            from ray.serve._private.constants import (
                SERVE_CONTROLLER_NAME,
                SERVE_NAMESPACE,
            )

            try:
                # get_actor is a sync call but it'll time out after
                # ray.dashboard.consts.GCS_RPC_TIMEOUT_SECONDS.
                self._controller = ray.get_actor(
                    SERVE_CONTROLLER_NAME, namespace=SERVE_NAMESPACE
                )
            except Exception as e:
                logger.debug(
                    "There is no Serve "
                    "instance running on this Ray cluster. Please "
                    "call `serve.start(detached=True)` to start "
                    f"one: {e}"
                )
            return self._controller
|