config.py 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770
  1. import json
  2. import logging
  3. import warnings
  4. from enum import Enum
  5. from functools import cached_property
  6. from typing import Any, Callable, Dict, List, Optional, Union
  7. from ray import cloudpickle
  8. from ray._common.pydantic_compat import (
  9. BaseModel,
  10. Field,
  11. NonNegativeFloat,
  12. NonNegativeInt,
  13. PositiveFloat,
  14. PositiveInt,
  15. PrivateAttr,
  16. validator,
  17. )
  18. from ray._common.utils import import_attr, import_module_and_attr
  19. # Import types needed for AutoscalingContext
  20. from ray.serve._private.common import DeploymentID, ReplicaID, TimeSeries
  21. from ray.serve._private.constants import (
  22. DEFAULT_AUTOSCALING_POLICY_NAME,
  23. DEFAULT_GRPC_PORT,
  24. DEFAULT_HTTP_HOST,
  25. DEFAULT_HTTP_PORT,
  26. DEFAULT_REQUEST_ROUTER_PATH,
  27. DEFAULT_REQUEST_ROUTING_STATS_PERIOD_S,
  28. DEFAULT_REQUEST_ROUTING_STATS_TIMEOUT_S,
  29. DEFAULT_TARGET_ONGOING_REQUESTS,
  30. DEFAULT_UVICORN_KEEP_ALIVE_TIMEOUT_S,
  31. SERVE_LOGGER_NAME,
  32. )
  33. from ray.serve._private.utils import validate_ssl_config
  34. from ray.util.annotations import Deprecated, PublicAPI
# Module-level logger, looked up under SERVE_LOGGER_NAME.
logger = logging.getLogger(SERVE_LOGGER_NAME)
  36. @PublicAPI(stability="alpha")
  37. class AutoscalingContext:
  38. """Rich context provided to custom autoscaling policies.
  39. This class provides comprehensive information about a deployment's current state,
  40. metrics, and configuration that can be used by custom autoscaling policies to
  41. make intelligent scaling decisions.
  42. The context includes deployment metadata, current replica state, built-in and
  43. custom metrics, capacity bounds, policy state, and timing information.
  44. Note: The aggregated_metrics and raw_metrics fields support lazy evaluation.
  45. You can pass callables that will be evaluated only when accessed, with results
  46. cached for subsequent accesses.
  47. """
  48. def __init__(
  49. self,
  50. deployment_id: DeploymentID,
  51. deployment_name: str,
  52. app_name: Optional[str],
  53. current_num_replicas: int,
  54. target_num_replicas: int,
  55. running_replicas: List[ReplicaID],
  56. total_num_requests: Union[float, Callable[[], float]],
  57. total_queued_requests: Optional[Union[float, Callable[[], float]]],
  58. aggregated_metrics: Optional[
  59. Union[
  60. Dict[str, Dict[ReplicaID, float]],
  61. Callable[[], Dict[str, Dict[ReplicaID, float]]],
  62. ]
  63. ],
  64. raw_metrics: Optional[
  65. Union[
  66. Dict[str, Dict[ReplicaID, TimeSeries]],
  67. Callable[[], Dict[str, Dict[ReplicaID, TimeSeries]]],
  68. ]
  69. ],
  70. capacity_adjusted_min_replicas: int,
  71. capacity_adjusted_max_replicas: int,
  72. policy_state: Dict[str, Any],
  73. last_scale_up_time: Optional[float],
  74. last_scale_down_time: Optional[float],
  75. current_time: Optional[float],
  76. config: Optional[Any],
  77. ):
  78. # Deployment information
  79. self.deployment_id = deployment_id #: Unique identifier for the deployment.
  80. self.deployment_name = deployment_name #: Name of the deployment.
  81. self.app_name = app_name #: Name of the application containing this deployment.
  82. # Current state
  83. self.current_num_replicas = (
  84. current_num_replicas #: Current number of running replicas.
  85. )
  86. self.target_num_replicas = (
  87. target_num_replicas #: Target number of replicas set by the autoscaler.
  88. )
  89. self.running_replicas = (
  90. running_replicas #: List of currently running replica IDs.
  91. )
  92. # Built-in metrics
  93. self._total_num_requests_value = (
  94. total_num_requests #: Total number of requests across all replicas.
  95. )
  96. self._total_queued_requests_value = (
  97. total_queued_requests #: Number of requests currently queued.
  98. )
  99. # Custom metrics - store potentially lazy callables privately
  100. self._aggregated_metrics_value = aggregated_metrics
  101. self._raw_metrics_value = raw_metrics
  102. # Capacity and bounds
  103. self.capacity_adjusted_min_replicas = capacity_adjusted_min_replicas #: Minimum replicas adjusted for cluster capacity.
  104. self.capacity_adjusted_max_replicas = capacity_adjusted_max_replicas #: Maximum replicas adjusted for cluster capacity.
  105. # Policy state
  106. self.policy_state = (
  107. policy_state #: Persistent state dictionary for the autoscaling policy.
  108. )
  109. # Timing
  110. self.last_scale_up_time = (
  111. last_scale_up_time #: Timestamp of last scale-up action.
  112. )
  113. self.last_scale_down_time = (
  114. last_scale_down_time #: Timestamp of last scale-down action.
  115. )
  116. self.current_time = current_time #: Current timestamp.
  117. # Config
  118. self.config = config #: Autoscaling configuration for this deployment.
  119. @cached_property
  120. def aggregated_metrics(self) -> Optional[Dict[str, Dict[ReplicaID, float]]]:
  121. if callable(self._aggregated_metrics_value):
  122. return self._aggregated_metrics_value()
  123. return self._aggregated_metrics_value
  124. @cached_property
  125. def raw_metrics(self) -> Optional[Dict[str, Dict[ReplicaID, TimeSeries]]]:
  126. if callable(self._raw_metrics_value):
  127. return self._raw_metrics_value()
  128. return self._raw_metrics_value
  129. @cached_property
  130. def total_num_requests(self) -> float:
  131. if callable(self._total_num_requests_value):
  132. return self._total_num_requests_value()
  133. return self._total_num_requests_value
  134. @cached_property
  135. def total_queued_requests(self) -> float:
  136. if callable(self._total_queued_requests_value):
  137. return self._total_queued_requests_value()
  138. return self._total_queued_requests_value
  139. @property
  140. def total_running_requests(self) -> float:
  141. # NOTE: for non-additive aggregation functions, total_running_requests is not
  142. # accurate, consider this is an approximation.
  143. return self.total_num_requests - self.total_queued_requests
  144. @PublicAPI(stability="alpha")
  145. class RequestRouterConfig(BaseModel):
  146. """Config for the Serve request router.
  147. This class configures how Ray Serve routes requests to deployment replicas. The router is
  148. responsible for selecting which replica should handle each incoming request based on the
  149. configured routing policy. You can customize the routing behavior by specifying a custom
  150. request router class and providing configuration parameters.
  151. The router also manages periodic health checks and scheduling statistics collection from
  152. replicas to make informed routing decisions.
  153. Example:
  154. .. code-block:: python
  155. from ray.serve.config import RequestRouterConfig, DeploymentConfig
  156. from ray import serve
  157. # Use default router with custom stats collection interval
  158. request_router_config = RequestRouterConfig(
  159. request_routing_stats_period_s=5.0,
  160. request_routing_stats_timeout_s=15.0
  161. )
  162. # Use custom router class
  163. request_router_config = RequestRouterConfig(
  164. request_router_class="ray.serve.llm.request_router.PrefixCacheAffinityRouter",
  165. request_router_kwargs={"imbalanced_threshold": 20}
  166. )
  167. deployment_config = DeploymentConfig(
  168. request_router_config=request_router_config
  169. )
  170. deployment = serve.deploy(
  171. "my_deployment",
  172. deployment_config=deployment_config
  173. )
  174. """
  175. _serialized_request_router_cls: bytes = PrivateAttr(default=b"")
  176. request_router_class: Union[str, Callable] = Field(
  177. default=DEFAULT_REQUEST_ROUTER_PATH,
  178. description=(
  179. "The class of the request router that Ray Serve uses for this deployment. This value can be "
  180. "a string or a class. All the deployment handles that you create for this "
  181. "deployment use the routing policy defined by the request router. "
  182. "Default to Serve's PowerOfTwoChoicesRequestRouter."
  183. ),
  184. )
  185. request_router_kwargs: Dict[str, Any] = Field(
  186. default_factory=dict,
  187. description=(
  188. "Keyword arguments that Ray Serve passes to the request router class "
  189. "initialize_state method."
  190. ),
  191. )
  192. request_routing_stats_period_s: PositiveFloat = Field(
  193. default=DEFAULT_REQUEST_ROUTING_STATS_PERIOD_S,
  194. description=(
  195. "Duration between record scheduling stats calls for the replica. "
  196. "Defaults to 10s. The health check is by default a no-op Actor call "
  197. "to the replica, but you can define your own request scheduling stats "
  198. "using the 'record_scheduling_stats' method in your deployment."
  199. ),
  200. )
  201. request_routing_stats_timeout_s: PositiveFloat = Field(
  202. default=DEFAULT_REQUEST_ROUTING_STATS_TIMEOUT_S,
  203. description=(
  204. "Duration in seconds, that replicas wait for a request scheduling "
  205. "stats method to return before considering it as failed. Defaults to 30s."
  206. ),
  207. )
  208. @validator("request_router_kwargs", always=True)
  209. def request_router_kwargs_json_serializable(cls, v):
  210. if isinstance(v, bytes):
  211. return v
  212. if v is not None:
  213. try:
  214. json.dumps(v)
  215. except TypeError as e:
  216. raise ValueError(
  217. f"request_router_kwargs is not JSON-serializable: {str(e)}."
  218. )
  219. return v
  220. def __init__(self, **kwargs: dict[str, Any]):
  221. """Initialize RequestRouterConfig with the given parameters.
  222. Needed to serialize the request router class since validators are not called
  223. for attributes that begin with an underscore.
  224. Args:
  225. **kwargs: Keyword arguments to pass to BaseModel.
  226. """
  227. serialized_request_router_cls = kwargs.pop(
  228. "_serialized_request_router_cls", None
  229. )
  230. super().__init__(**kwargs)
  231. if serialized_request_router_cls:
  232. self._serialized_request_router_cls = serialized_request_router_cls
  233. else:
  234. self._serialize_request_router_cls()
  235. def set_serialized_request_router_cls(
  236. self, serialized_request_router_cls: bytes
  237. ) -> None:
  238. self._serialized_request_router_cls = serialized_request_router_cls
  239. @classmethod
  240. def from_serialized_request_router_cls(
  241. cls, request_router_config: dict, serialized_request_router_cls: bytes
  242. ) -> "RequestRouterConfig":
  243. config = request_router_config.copy()
  244. config["_serialized_request_router_cls"] = serialized_request_router_cls
  245. return cls(**config)
  246. def get_serialized_request_router_cls(self) -> Optional[bytes]:
  247. return self._serialized_request_router_cls
  248. def _serialize_request_router_cls(self) -> None:
  249. """Import and serialize request router class with cloudpickle.
  250. Import the request router if you pass it in as a string import path.
  251. Then cloudpickle the request router and set to
  252. `_serialized_request_router_cls`.
  253. """
  254. request_router_class = self.request_router_class
  255. if isinstance(request_router_class, Callable):
  256. request_router_class = (
  257. f"{request_router_class.__module__}.{request_router_class.__name__}"
  258. )
  259. request_router_path = request_router_class or DEFAULT_REQUEST_ROUTER_PATH
  260. request_router_module, request_router_class = import_module_and_attr(
  261. request_router_path
  262. )
  263. cloudpickle.register_pickle_by_value(request_router_module)
  264. self.set_serialized_request_router_cls(cloudpickle.dumps(request_router_class))
  265. cloudpickle.unregister_pickle_by_value(request_router_module)
  266. # Update the request_router_class field to be the string path
  267. self.request_router_class = request_router_path
  268. def get_request_router_class(self) -> Callable:
  269. """Deserialize the request router from cloudpickled bytes."""
  270. try:
  271. return cloudpickle.loads(self._serialized_request_router_cls)
  272. except (ModuleNotFoundError, ImportError) as e:
  273. raise ImportError(
  274. f"Failed to deserialize custom request router: {e}\n\n"
  275. "This typically happens when the router depends on external modules "
  276. "that aren't available in the current environment. To fix this:\n"
  277. " - Ensure all dependencies are installed in your Docker image or environment\n"
  278. " - Package your router as a Python package and install it\n"
  279. " - Place the router module in PYTHONPATH\n\n"
  280. "For more details, see: https://docs.ray.io/en/latest/serve/advanced-guides/"
  281. "custom-request-router.html#gotchas-and-limitations"
  282. ) from e
# Default for `AutoscalingConfig.metrics_interval_s`; it is also the value the
# deprecation-warning validator compares against and the fallback used by the
# `look_back_period_s` validator.
DEFAULT_METRICS_INTERVAL_S = 10.0
@PublicAPI(stability="alpha")
class AggregationFunction(str, Enum):
    """Accepted values for `AutoscalingConfig.aggregation_function`."""

    MEAN = "mean"
    MAX = "max"
    MIN = "min"
  289. @PublicAPI(stability="alpha")
  290. class AutoscalingPolicy(BaseModel):
  291. # Cloudpickled policy definition.
  292. _serialized_policy_def: bytes = PrivateAttr(default=b"")
  293. policy_function: Union[str, Callable] = Field(
  294. default=DEFAULT_AUTOSCALING_POLICY_NAME,
  295. description="Policy function can be a string import path or a function callable. "
  296. "If it's a string import path, it must be of the form `path.to.module:function_name`. ",
  297. )
  298. def __init__(self, **kwargs):
  299. serialized_policy_def = kwargs.pop("_serialized_policy_def", None)
  300. super().__init__(**kwargs)
  301. if serialized_policy_def:
  302. self._serialized_policy_def = serialized_policy_def
  303. else:
  304. self.serialize_policy()
  305. def set_serialized_policy_def(self, serialized_policy_def: bytes) -> None:
  306. self._serialized_policy_def = serialized_policy_def
  307. @classmethod
  308. def from_serialized_policy_def(
  309. cls, policy_config: dict, serialized_policy_def: bytes
  310. ) -> "AutoscalingPolicy":
  311. config = policy_config.copy()
  312. config["_serialized_policy_def"] = serialized_policy_def
  313. return cls(**config)
  314. def get_serialized_policy_def(self) -> Optional[bytes]:
  315. return self._serialized_policy_def
  316. def serialize_policy(self) -> None:
  317. """Serialize policy with cloudpickle.
  318. Import the policy if it's passed in as a string import path. Then cloudpickle
  319. the policy and set `serialized_policy_def` if it's empty.
  320. """
  321. policy_path = self.policy_function
  322. if isinstance(policy_path, Callable):
  323. policy_path = f"{policy_path.__module__}.{policy_path.__name__}"
  324. if not self._serialized_policy_def:
  325. policy_module, policy_function = import_module_and_attr(policy_path)
  326. cloudpickle.register_pickle_by_value(policy_module)
  327. self.set_serialized_policy_def(cloudpickle.dumps(policy_function))
  328. cloudpickle.unregister_pickle_by_value(policy_module)
  329. self.policy_function = policy_path
  330. def is_default_policy_function(self) -> bool:
  331. return self.policy_function == DEFAULT_AUTOSCALING_POLICY_NAME
  332. def get_policy(self) -> Callable:
  333. """Deserialize policy from cloudpickled bytes."""
  334. try:
  335. return cloudpickle.loads(self._serialized_policy_def)
  336. except (ModuleNotFoundError, ImportError) as e:
  337. raise ImportError(
  338. f"Failed to deserialize custom autoscaling policy: {e}\n\n"
  339. "This typically happens when the policy depends on external modules "
  340. "that aren't available in the current environment. To fix this:\n"
  341. " - Ensure all dependencies are installed in your Docker image or environment\n"
  342. " - Package your policy as a Python package and install it\n"
  343. " - Place the policy module in PYTHONPATH\n\n"
  344. "For more details, see: https://docs.ray.io/en/latest/serve/advanced-guides/"
  345. "advanced-autoscaling.html#gotchas-and-limitations"
  346. ) from e
  347. @PublicAPI(stability="stable")
  348. class AutoscalingConfig(BaseModel):
  349. """Config for the Serve Autoscaler."""
  350. # Please keep these options in sync with those in
  351. # `src/ray/protobuf/serve.proto`.
  352. # Publicly exposed options
  353. min_replicas: NonNegativeInt = 1
  354. initial_replicas: Optional[NonNegativeInt] = None
  355. max_replicas: PositiveInt = 1
  356. target_ongoing_requests: Optional[PositiveFloat] = DEFAULT_TARGET_ONGOING_REQUESTS
  357. metrics_interval_s: PositiveFloat = Field(
  358. default=DEFAULT_METRICS_INTERVAL_S,
  359. description="[DEPRECATED] How often to scrape for metrics. "
  360. "Will be replaced by the environment variables "
  361. "`RAY_SERVE_REPLICA_AUTOSCALING_METRIC_PUSH_INTERVAL_S` and "
  362. "`RAY_SERVE_HANDLE_AUTOSCALING_METRIC_PUSH_INTERVAL_S` in a future release.",
  363. )
  364. look_back_period_s: PositiveFloat = Field(
  365. default=30.0, description="Time window to average over for metrics."
  366. )
  367. smoothing_factor: PositiveFloat = Field(
  368. default=1.0,
  369. description="[DEPRECATED] Smoothing factor for autoscaling decisions.",
  370. )
  371. # DEPRECATED: replaced by `downscaling_factor`
  372. upscale_smoothing_factor: Optional[PositiveFloat] = Field(
  373. default=None, description="[DEPRECATED] Please use `upscaling_factor` instead."
  374. )
  375. # DEPRECATED: replaced by `upscaling_factor`
  376. downscale_smoothing_factor: Optional[PositiveFloat] = Field(
  377. default=None,
  378. description="[DEPRECATED] Please use `downscaling_factor` instead.",
  379. )
  380. upscaling_factor: Optional[PositiveFloat] = Field(
  381. default=None,
  382. description='Multiplicative "gain" factor to limit upscaling decisions.',
  383. )
  384. downscaling_factor: Optional[PositiveFloat] = Field(
  385. default=None,
  386. description='Multiplicative "gain" factor to limit downscaling decisions.',
  387. )
  388. # How frequently to make autoscaling decisions
  389. # loop_period_s: float = CONTROL_LOOP_PERIOD_S
  390. downscale_delay_s: NonNegativeFloat = Field(
  391. default=600.0,
  392. description="How long to wait before scaling down replicas to a value greater than 0.",
  393. )
  394. # Optionally set for 1->0 transition
  395. downscale_to_zero_delay_s: Optional[NonNegativeFloat] = Field(
  396. default=None,
  397. description="How long to wait before scaling down replicas from 1 to 0. If not set, the value of `downscale_delay_s` will be used.",
  398. )
  399. upscale_delay_s: NonNegativeFloat = Field(
  400. default=30.0, description="How long to wait before scaling up replicas."
  401. )
  402. aggregation_function: Union[str, AggregationFunction] = Field(
  403. default=AggregationFunction.MEAN,
  404. description="Function used to aggregate metrics across a time window.",
  405. )
  406. # Autoscaling policy. This policy is deployment scoped. Defaults to the request-based autoscaler.
  407. policy: AutoscalingPolicy = Field(
  408. default_factory=AutoscalingPolicy,
  409. description="The autoscaling policy for the deployment. This option is experimental.",
  410. )
  411. @validator("max_replicas", always=True)
  412. def replicas_settings_valid(cls, max_replicas, values):
  413. min_replicas = values.get("min_replicas")
  414. initial_replicas = values.get("initial_replicas")
  415. if min_replicas is not None and max_replicas < min_replicas:
  416. raise ValueError(
  417. f"max_replicas ({max_replicas}) must be greater than "
  418. f"or equal to min_replicas ({min_replicas})!"
  419. )
  420. if initial_replicas is not None:
  421. if initial_replicas < min_replicas:
  422. raise ValueError(
  423. f"min_replicas ({min_replicas}) must be less than "
  424. f"or equal to initial_replicas ({initial_replicas})!"
  425. )
  426. elif initial_replicas > max_replicas:
  427. raise ValueError(
  428. f"max_replicas ({max_replicas}) must be greater than "
  429. f"or equal to initial_replicas ({initial_replicas})!"
  430. )
  431. return max_replicas
  432. @validator("metrics_interval_s")
  433. def metrics_interval_s_deprecation_warning(cls, v: PositiveFloat) -> PositiveFloat:
  434. if v != DEFAULT_METRICS_INTERVAL_S:
  435. warnings.warn(
  436. "The `metrics_interval_s` field in AutoscalingConfig is deprecated and "
  437. "will be replaced by the environment variables "
  438. "`RAY_SERVE_REPLICA_AUTOSCALING_METRIC_PUSH_INTERVAL_S` and "
  439. "`RAY_SERVE_HANDLE_AUTOSCALING_METRIC_PUSH_INTERVAL_S` in a future release.",
  440. DeprecationWarning,
  441. )
  442. return v
  443. @validator("look_back_period_s", always=True)
  444. def look_back_period_s_valid(cls, v: PositiveFloat, values):
  445. # Get metrics_interval_s from values, or use default if not set
  446. metrics_interval_s = values.get(
  447. "metrics_interval_s", DEFAULT_METRICS_INTERVAL_S
  448. )
  449. if v <= metrics_interval_s:
  450. # Warns currently, will raise an exception in a future release
  451. warnings.warn(
  452. f"`look_back_period_s` ({v}) must be greater than `metrics_interval_s` "
  453. f"({metrics_interval_s}). This will raise an exception in a future "
  454. f"release. Please set `look_back_period_s` > `metrics_interval_s`.",
  455. FutureWarning,
  456. )
  457. return v
  458. @validator("aggregation_function", always=True)
  459. def aggregation_function_valid(cls, v: Union[str, AggregationFunction]):
  460. if isinstance(v, AggregationFunction):
  461. return v
  462. return AggregationFunction(str(v).lower())
  463. @classmethod
  464. def default(cls):
  465. return cls(
  466. target_ongoing_requests=DEFAULT_TARGET_ONGOING_REQUESTS,
  467. min_replicas=1,
  468. max_replicas=100,
  469. )
  470. def get_upscaling_factor(self) -> PositiveFloat:
  471. if self.upscaling_factor:
  472. return self.upscaling_factor
  473. return self.upscale_smoothing_factor or self.smoothing_factor
  474. def get_downscaling_factor(self) -> PositiveFloat:
  475. if self.downscaling_factor:
  476. return self.downscaling_factor
  477. return self.downscale_smoothing_factor or self.smoothing_factor
  478. def get_target_ongoing_requests(self) -> PositiveFloat:
  479. return self.target_ongoing_requests
# Keep in sync with ServeDeploymentMode in dashboard/client/src/type/serve.ts
@Deprecated
class DeploymentMode(str, Enum):
    """Deprecated predecessor of `ProxyLocation`.

    `ProxyLocation` converts to and from this enum; `NoServer` corresponds to
    `ProxyLocation.Disabled`, while the other members share their values.
    """

    NoServer = "NoServer"
    HeadOnly = "HeadOnly"
    EveryNode = "EveryNode"
  486. @PublicAPI(stability="stable")
  487. class ProxyLocation(str, Enum):
  488. """Config for where to run proxies to receive ingress traffic to the cluster.
  489. Options:
  490. - Disabled: don't run proxies at all. This should be used if you are only
  491. making calls to your applications via deployment handles.
  492. - HeadOnly: only run a single proxy on the head node.
  493. - EveryNode: run a proxy on every node in the cluster that has at least one
  494. replica actor. This is the default.
  495. """
  496. Disabled = "Disabled"
  497. HeadOnly = "HeadOnly"
  498. EveryNode = "EveryNode"
  499. @classmethod
  500. def _to_deployment_mode(
  501. cls, proxy_location: Union["ProxyLocation", str]
  502. ) -> DeploymentMode:
  503. if isinstance(proxy_location, str):
  504. proxy_location = ProxyLocation(proxy_location)
  505. elif not isinstance(proxy_location, ProxyLocation):
  506. raise TypeError(
  507. f"Must be a `ProxyLocation` or str, got: {type(proxy_location)}."
  508. )
  509. if proxy_location == ProxyLocation.Disabled:
  510. return DeploymentMode.NoServer
  511. else:
  512. return DeploymentMode(proxy_location.value)
  513. @classmethod
  514. def _from_deployment_mode(
  515. cls, deployment_mode: Optional[Union[DeploymentMode, str]]
  516. ) -> Optional["ProxyLocation"]:
  517. """Converts DeploymentMode enum into ProxyLocation enum.
  518. DeploymentMode is a deprecated version of ProxyLocation that's still
  519. used internally throughout Serve.
  520. """
  521. if deployment_mode is None:
  522. return None
  523. elif isinstance(deployment_mode, str):
  524. deployment_mode = DeploymentMode(deployment_mode)
  525. elif not isinstance(deployment_mode, DeploymentMode):
  526. raise TypeError(
  527. f"Must be a `DeploymentMode` or str, got: {type(deployment_mode)}."
  528. )
  529. if deployment_mode == DeploymentMode.NoServer:
  530. return ProxyLocation.Disabled
  531. else:
  532. return ProxyLocation(deployment_mode.value)
  533. @PublicAPI(stability="stable")
  534. class HTTPOptions(BaseModel):
  535. """HTTP options for the proxies. Supported fields:
  536. - host: Host that the proxies listens for HTTP on. Defaults to
  537. "127.0.0.1". To expose Serve publicly, you probably want to set
  538. this to "0.0.0.0".
  539. - port: Port that the proxies listen for HTTP on. Defaults to 8000.
  540. - root_path: An optional root path to mount the serve application
  541. (for example, "/prefix"). All deployment routes are prefixed
  542. with this path.
  543. - request_timeout_s: End-to-end timeout for HTTP requests.
  544. - keep_alive_timeout_s: Duration to keep idle connections alive when no
  545. requests are ongoing.
  546. - ssl_keyfile: Path to the SSL key file for HTTPS. If provided with
  547. ssl_certfile, the HTTP server will use HTTPS.
  548. - ssl_certfile: Path to the SSL certificate file for HTTPS. If provided
  549. with ssl_keyfile, the HTTP server will use HTTPS.
  550. - ssl_keyfile_password: Optional password for the SSL key file.
  551. - ssl_ca_certs: Optional path to CA certificate file for client certificate
  552. verification.
  553. - location: [DEPRECATED: use `proxy_location` field instead] The deployment
  554. location of HTTP servers:
  555. - "HeadOnly": start one HTTP server on the head node. Serve
  556. assumes the head node is the node you executed serve.start
  557. on. This is the default.
  558. - "EveryNode": start one HTTP server per node.
  559. - "NoServer": disable HTTP server.
  560. - num_cpus: [DEPRECATED] The number of CPU cores to reserve for each
  561. internal Serve HTTP proxy actor.
  562. """
  563. host: Optional[str] = DEFAULT_HTTP_HOST
  564. port: int = DEFAULT_HTTP_PORT
  565. middlewares: List[Any] = []
  566. location: Optional[DeploymentMode] = DeploymentMode.HeadOnly
  567. num_cpus: int = 0
  568. root_url: str = ""
  569. root_path: str = ""
  570. request_timeout_s: Optional[float] = None
  571. keep_alive_timeout_s: int = DEFAULT_UVICORN_KEEP_ALIVE_TIMEOUT_S
  572. ssl_keyfile: Optional[str] = None
  573. ssl_certfile: Optional[str] = None
  574. ssl_keyfile_password: Optional[str] = None
  575. ssl_ca_certs: Optional[str] = None
  576. @validator("location", always=True)
  577. def location_backfill_no_server(cls, v, values):
  578. if values["host"] is None or v is None:
  579. return DeploymentMode.NoServer
  580. return v
  581. @validator("ssl_certfile")
  582. def validate_ssl_certfile(cls, v, values):
  583. ssl_keyfile = values.get("ssl_keyfile")
  584. validate_ssl_config(v, ssl_keyfile)
  585. return v
  586. @validator("middlewares", always=True)
  587. def warn_for_middlewares(cls, v, values):
  588. if v:
  589. warnings.warn(
  590. "Passing `middlewares` to HTTPOptions is deprecated and will be "
  591. "removed in a future version. Consider using the FastAPI integration "
  592. "to configure middlewares on your deployments: "
  593. "https://docs.ray.io/en/latest/serve/http-guide.html#fastapi-http-deployments" # noqa 501
  594. )
  595. return v
  596. @validator("num_cpus", always=True)
  597. def warn_for_num_cpus(cls, v, values):
  598. if v:
  599. warnings.warn(
  600. "Passing `num_cpus` to HTTPOptions is deprecated and will be "
  601. "removed in a future version."
  602. )
  603. return v
  604. class Config:
  605. validate_assignment = True
  606. arbitrary_types_allowed = True
  607. @PublicAPI(stability="alpha")
  608. class gRPCOptions(BaseModel):
  609. """gRPC options for the proxies. Supported fields:
  610. Args:
  611. port (int):
  612. Port for gRPC server if started. Default to 9000. Cannot be
  613. updated once Serve has started running. Serve must be shut down and
  614. restarted with the new port instead.
  615. grpc_servicer_functions (List[str]):
  616. List of import paths for gRPC `add_servicer_to_server` functions to add to
  617. Serve's gRPC proxy. Default to empty list, which means no gRPC methods will
  618. be added and no gRPC server will be started. The servicer functions need to
  619. be importable from the context of where Serve is running.
  620. request_timeout_s: End-to-end timeout for gRPC requests.
  621. """
  622. port: int = DEFAULT_GRPC_PORT
  623. grpc_servicer_functions: List[str] = []
  624. request_timeout_s: Optional[float] = None
  625. @property
  626. def grpc_servicer_func_callable(self) -> List[Callable]:
  627. """Return a list of callable functions from the grpc_servicer_functions.
  628. If the function is not callable or not found, it will be ignored and a warning
  629. will be logged.
  630. """
  631. callables = []
  632. for func in self.grpc_servicer_functions:
  633. try:
  634. imported_func = import_attr(func)
  635. if callable(imported_func):
  636. callables.append(imported_func)
  637. else:
  638. message = (
  639. f"{func} is not a callable function! Please make sure "
  640. "the function is imported correctly."
  641. )
  642. raise ValueError(message)
  643. except ModuleNotFoundError as e:
  644. message = (
  645. f"{func} can't be imported! Please make sure there are no typo "
  646. "in those functions. Or you might want to rebuild service "
  647. "definitions if .proto file is changed."
  648. )
  649. raise ModuleNotFoundError(message) from e
  650. return callables