deployment.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540
  1. import logging
  2. import warnings
  3. from copy import deepcopy
  4. from typing import Any, Callable, Dict, List, Optional, Tuple, Union
  5. from ray.serve._private.config import (
  6. DeploymentConfig,
  7. ReplicaConfig,
  8. RequestRouterConfig,
  9. handle_num_replicas_auto,
  10. )
  11. from ray.serve._private.constants import SERVE_LOGGER_NAME
  12. from ray.serve._private.usage import ServeUsageTag
  13. from ray.serve._private.utils import DEFAULT, Default
  14. from ray.serve.config import AutoscalingConfig
  15. from ray.serve.schema import DeploymentSchema, LoggingConfig, RayActorOptionsSchema
  16. from ray.util.annotations import PublicAPI
  17. logger = logging.getLogger(SERVE_LOGGER_NAME)
  @PublicAPI(stability="stable")
  class Application:
      """A set of one or more bound deployments that are deployed as a unit.

      An Application can be handed to another `Deployment.bind()` call to compose
      several deployments into a single application, passed to `serve.run`, or
      deployed through a Serve config file.

      Defining an Application and running it from Python:

      .. code-block:: python

          from ray import serve
          from ray.serve import Application

          @serve.deployment
          class MyDeployment:
              pass

          app: Application = MyDeployment.bind(OtherDeployment.bind())
          serve.run(app)

      Running the same app from the command line interface (CLI):

      .. code-block:: bash

          serve run python_file:app

      Deploying the same app through a config file:

      .. code-block:: yaml

          applications:
              my_app:
                  import_path: python_file:app
      """

      def __init__(self, bound_deployment: "Deployment"):
          # Deliberately private: `build_app` reads this attribute, but users
          # are not meant to touch it.
          self._bound_deployment = bound_deployment
  @PublicAPI(stability="stable")
  class Deployment:
      """Class (or function) decorated with the `@serve.deployment` decorator.

      This is run on a number of replica actors. Requests to those replicas call
      this class.

      One or more deployments can be composed together into an `Application` which is
      then run via `serve.run` or a config file.

      Example:

      .. code-block:: python

          @serve.deployment
          class MyDeployment:
              def __init__(self, name: str):
                  self._name = name

              def __call__(self, request):
                  return "Hello world!"

          app = MyDeployment.bind()
          # Run via `serve.run` or the `serve run` CLI command.
          serve.run(app)
      """

      def __init__(
          self,
          name: str,
          deployment_config: DeploymentConfig,
          replica_config: ReplicaConfig,
          version: Optional[str] = None,
          _internal=False,
      ) -> None:
          """Store the name, version and configs of the deployment.

          Args:
              name: Name of the deployment; validated by `_validate_name`.
              deployment_config: Deployment-level configuration.
              replica_config: Replica-level configuration.
              version: Optional version string.
              _internal: Must be True; guards against direct construction.

          Raises:
              RuntimeError: If `_internal` is falsy.
              TypeError: If `name` is not a string, or `version` is neither
                  None nor a string.
          """
          if not _internal:
              raise RuntimeError(
                  "The Deployment constructor should not be called "
                  "directly. Use `@serve.deployment` instead."
              )

          self._validate_name(name)
          if not (version is None or isinstance(version, str)):
              raise TypeError("version must be a string.")

          self._name = name
          self._version = version
          self._deployment_config = deployment_config
          self._replica_config = replica_config

      def _validate_name(self, name: str):
          """Check the type of `name` and warn about the '#' character.

          Raises:
              TypeError: If `name` is not a string.
          """
          if not isinstance(name, str):
              raise TypeError("name must be a string.")

          # A '#' in the name is only warned about for now, not rejected.
          if "#" in name:
              warnings.warn(
                  f"Deployment names should not contain the '#' character, this will raise an error starting in Ray 2.46.0. "
                  f"Current name: {name}."
              )

      @property
      def name(self) -> str:
          """Unique name of this deployment."""
          return self._name

      @property
      def version(self) -> Optional[str]:
          """Version string given at construction, or None."""
          return self._version

      @property
      def func_or_class(self) -> Union[Callable, str]:
          """Underlying class or function that this deployment wraps."""
          return self._replica_config.deployment_def

      @property
      def num_replicas(self) -> int:
          """Target number of replicas."""
          return self._deployment_config.num_replicas

      @property
      def user_config(self) -> Any:
          """Dynamic user-provided config options."""
          return self._deployment_config.user_config

      @property
      def max_ongoing_requests(self) -> int:
          """Max number of requests a replica can handle at once."""
          return self._deployment_config.max_ongoing_requests

      @property
      def max_queued_requests(self) -> int:
          """Max number of requests that can be queued in each deployment handle."""
          return self._deployment_config.max_queued_requests

      @property
      def route_prefix(self):
          """Always raises: `route_prefix` is no longer a deployment-level option."""
          raise ValueError(
              "`route_prefix` can no longer be specified at the deployment level. "
              "Pass it to `serve.run` or in the application config instead."
          )

      @property
      def ray_actor_options(self) -> Optional[Dict]:
          """Actor options such as resources required for each replica."""
          return self._replica_config.ray_actor_options

      @property
      def init_args(self) -> Tuple[Any, ...]:
          """Positional arguments stored in the replica config."""
          return self._replica_config.init_args

      @property
      def init_kwargs(self) -> Dict[Any, Any]:
          """Keyword arguments stored in the replica config."""
          return self._replica_config.init_kwargs

      @property
      def url(self) -> Optional[str]:
          """Deprecated; logs a warning and always returns None."""
          logger.warning(
              "DeprecationWarning: `Deployment.url` is deprecated "
              "and will be removed in the future."
          )
          return None

      @property
      def logging_config(self) -> Dict:
          """Logging config stored in the deployment config."""
          return self._deployment_config.logging_config

      def set_logging_config(self, logging_config: Dict):
          """Overwrite the logging config in the deployment config in place."""
          self._deployment_config.logging_config = logging_config

      def __call__(self):
          """Always raises: a Deployment object is not directly callable."""
          raise RuntimeError(
              "Deployments cannot be constructed directly. "
              "Use `deployment.deploy() instead.`"
          )

      def bind(self, *args, **kwargs) -> Application:
          """Bind the arguments to the deployment and return an Application.

          The returned Application can be deployed using `serve.run` (or via
          config file) or bound to another deployment for composition.
          """
          return Application(self.options(_init_args=args, _init_kwargs=kwargs))

      def options(
          self,
          func_or_class: Optional[Callable] = None,
          name: Default[str] = DEFAULT.VALUE,
          version: Default[str] = DEFAULT.VALUE,
          num_replicas: Default[Optional[Union[int, str]]] = DEFAULT.VALUE,
          route_prefix: Default[Union[str, None]] = DEFAULT.VALUE,
          ray_actor_options: Default[Optional[Dict]] = DEFAULT.VALUE,
          placement_group_bundles: Default[List[Dict[str, float]]] = DEFAULT.VALUE,
          placement_group_strategy: Default[str] = DEFAULT.VALUE,
          placement_group_bundle_label_selector: Default[
              List[Dict[str, str]]
          ] = DEFAULT.VALUE,
          max_replicas_per_node: Default[int] = DEFAULT.VALUE,
          user_config: Default[Optional[Any]] = DEFAULT.VALUE,
          max_ongoing_requests: Default[int] = DEFAULT.VALUE,
          max_queued_requests: Default[int] = DEFAULT.VALUE,
          autoscaling_config: Default[
              Union[Dict, AutoscalingConfig, None]
          ] = DEFAULT.VALUE,
          graceful_shutdown_wait_loop_s: Default[float] = DEFAULT.VALUE,
          graceful_shutdown_timeout_s: Default[float] = DEFAULT.VALUE,
          health_check_period_s: Default[float] = DEFAULT.VALUE,
          health_check_timeout_s: Default[float] = DEFAULT.VALUE,
          logging_config: Default[Union[Dict, LoggingConfig, None]] = DEFAULT.VALUE,
          request_router_config: Default[
              Union[Dict, RequestRouterConfig, None]
          ] = DEFAULT.VALUE,
          _init_args: Default[Tuple[Any]] = DEFAULT.VALUE,
          _init_kwargs: Default[Dict[Any, Any]] = DEFAULT.VALUE,
          _internal: bool = False,
          max_constructor_retry_count: Default[int] = DEFAULT.VALUE,
      ) -> "Deployment":
          """Return a copy of this deployment with updated options.

          Only those options passed in will be updated, all others will remain
          unchanged from the existing deployment.

          Refer to the `@serve.deployment` decorator docs for available arguments.

          Raises:
              ValueError: If `route_prefix` is passed, if `max_ongoing_requests`
                  is None, if `num_replicas` is set to a concrete value together
                  with `autoscaling_config`, or if `num_replicas` is 0.
          """
          if route_prefix is not DEFAULT.VALUE:
              raise ValueError(
                  "`route_prefix` can no longer be specified at the deployment level. "
                  "Pass it to `serve.run` or in the application config instead."
              )

          # Modify max_ongoing_requests and autoscaling_config if
          # `num_replicas="auto"`
          if max_ongoing_requests is None:
              raise ValueError("`max_ongoing_requests` must be non-null, got None.")
          if num_replicas == "auto":
              num_replicas = None
              max_ongoing_requests, autoscaling_config = handle_num_replicas_auto(
                  max_ongoing_requests, autoscaling_config
              )

              ServeUsageTag.AUTO_NUM_REPLICAS_USED.record("1")

          # NOTE: The user_configured_option_names should be the first thing that's
          # defined in this method. It depends on the locals() dictionary storing
          # only the function args/kwargs.
          # Create list of all user-configured options from keyword args
          user_configured_option_names = [
              option
              for option, value in locals().items()
              if option not in {"self", "func_or_class", "_internal"}
              and value is not DEFAULT.VALUE
          ]

          new_deployment_config = deepcopy(self._deployment_config)
          if not _internal:
              new_deployment_config.user_configured_option_names.update(
                  user_configured_option_names
              )

          if num_replicas not in [
              DEFAULT.VALUE,
              None,
              "auto",
          ] and autoscaling_config not in [
              DEFAULT.VALUE,
              None,
          ]:
              raise ValueError(
                  "Manually setting num_replicas is not allowed when "
                  "autoscaling_config is provided."
              )

          if num_replicas == 0:
              raise ValueError("num_replicas is expected to larger than 0")

          if not _internal and version is not DEFAULT.VALUE:
              logger.warning(
                  "DeprecationWarning: `version` in `Deployment.options()` has been "
                  "deprecated. Explicitly specifying version will raise an error in the "
                  "future!"
              )

          # Independent of the `version` warning above: an explicit replica
          # count must be applied even when `version` is passed in the same
          # call (previously an `elif`, which silently dropped it).
          if num_replicas not in [DEFAULT.VALUE, None]:
              new_deployment_config.num_replicas = num_replicas

          if user_config is not DEFAULT.VALUE:
              new_deployment_config.user_config = user_config

          if max_ongoing_requests is not DEFAULT.VALUE:
              new_deployment_config.max_ongoing_requests = max_ongoing_requests

          if max_queued_requests is not DEFAULT.VALUE:
              new_deployment_config.max_queued_requests = max_queued_requests

          if max_constructor_retry_count is not DEFAULT.VALUE:
              new_deployment_config.max_constructor_retry_count = (
                  max_constructor_retry_count
              )

          # Options that were not passed fall back to this deployment's values.
          if func_or_class is None:
              func_or_class = self._replica_config.deployment_def

          if name is DEFAULT.VALUE:
              name = self._name

          if version is DEFAULT.VALUE:
              version = self._version

          if _init_args is DEFAULT.VALUE:
              _init_args = self._replica_config.init_args

          if _init_kwargs is DEFAULT.VALUE:
              _init_kwargs = self._replica_config.init_kwargs

          if ray_actor_options is DEFAULT.VALUE:
              ray_actor_options = self._replica_config.ray_actor_options

          if placement_group_bundles is DEFAULT.VALUE:
              placement_group_bundles = self._replica_config.placement_group_bundles

          if placement_group_strategy is DEFAULT.VALUE:
              placement_group_strategy = self._replica_config.placement_group_strategy

          if placement_group_bundle_label_selector is DEFAULT.VALUE:
              placement_group_bundle_label_selector = (
                  self._replica_config.placement_group_bundle_label_selector
              )

          # TODO(ryanaoleary@): Add conditional check once fallback_strategy is
          # added to placement group options.
          placement_group_fallback_strategy = (
              self._replica_config.placement_group_fallback_strategy
          )

          if max_replicas_per_node is DEFAULT.VALUE:
              max_replicas_per_node = self._replica_config.max_replicas_per_node

          if autoscaling_config is not DEFAULT.VALUE:
              new_deployment_config.autoscaling_config = autoscaling_config

          if request_router_config is not DEFAULT.VALUE:
              new_deployment_config.request_router_config = request_router_config

          if graceful_shutdown_wait_loop_s is not DEFAULT.VALUE:
              new_deployment_config.graceful_shutdown_wait_loop_s = (
                  graceful_shutdown_wait_loop_s
              )

          if graceful_shutdown_timeout_s is not DEFAULT.VALUE:
              new_deployment_config.graceful_shutdown_timeout_s = (
                  graceful_shutdown_timeout_s
              )

          if health_check_period_s is not DEFAULT.VALUE:
              new_deployment_config.health_check_period_s = health_check_period_s

          if health_check_timeout_s is not DEFAULT.VALUE:
              new_deployment_config.health_check_timeout_s = health_check_timeout_s

          if logging_config is not DEFAULT.VALUE:
              # A LoggingConfig object is stored in its dict form.
              if isinstance(logging_config, LoggingConfig):
                  logging_config = logging_config.dict()
              new_deployment_config.logging_config = logging_config

          new_replica_config = ReplicaConfig.create(
              func_or_class,
              init_args=_init_args,
              init_kwargs=_init_kwargs,
              ray_actor_options=ray_actor_options,
              placement_group_bundles=placement_group_bundles,
              placement_group_strategy=placement_group_strategy,
              placement_group_bundle_label_selector=placement_group_bundle_label_selector,
              placement_group_fallback_strategy=placement_group_fallback_strategy,
              max_replicas_per_node=max_replicas_per_node,
          )

          return Deployment(
              name,
              new_deployment_config,
              new_replica_config,
              version=version,
              _internal=True,
          )

      def __eq__(self, other):
          """Compare name, version, deployment config and selected replica fields.

          Returns NotImplemented for objects that are not Deployments so that
          Python falls back to its default comparison instead of raising
          AttributeError on the missing private attributes.
          """
          if not isinstance(other, Deployment):
              return NotImplemented
          return all(
              [
                  self._name == other._name,
                  self._version == other._version,
                  self._deployment_config == other._deployment_config,
                  self._replica_config.init_args == other._replica_config.init_args,
                  self._replica_config.init_kwargs == other._replica_config.init_kwargs,
                  self._replica_config.ray_actor_options
                  == other._replica_config.ray_actor_options,
              ]
          )

      def __str__(self):
          return f"Deployment(name={self._name})"

      def __repr__(self):
          return str(self)
  def deployment_to_schema(d: Deployment) -> DeploymentSchema:
      """Build the structured schema that corresponds to a deployment object.

      Args:
          d: Deployment object to convert.

      Returns:
          A DeploymentSchema holding the deployment's name plus only those
          options recorded in the deployment config's
          `user_configured_option_names`.
      """
      config = d._deployment_config
      replica = d._replica_config

      actor_options = d.ray_actor_options
      ray_actor_options_schema = (
          None
          if actor_options is None
          else RayActorOptionsSchema.parse_obj(actor_options)
      )

      all_options = {
          "name": d.name,
          # With an autoscaling config present, the replica count is left unset.
          "num_replicas": None if config.autoscaling_config else d.num_replicas,
          "max_ongoing_requests": d.max_ongoing_requests,
          "max_queued_requests": d.max_queued_requests,
          "user_config": d.user_config,
          "autoscaling_config": config.autoscaling_config,
          "graceful_shutdown_wait_loop_s": config.graceful_shutdown_wait_loop_s,
          "graceful_shutdown_timeout_s": config.graceful_shutdown_timeout_s,
          "health_check_period_s": config.health_check_period_s,
          "health_check_timeout_s": config.health_check_timeout_s,
          "ray_actor_options": ray_actor_options_schema,
          "placement_group_strategy": replica.placement_group_strategy,
          "placement_group_bundles": replica.placement_group_bundles,
          "max_replicas_per_node": replica.max_replicas_per_node,
          "logging_config": config.logging_config,
          "request_router_config": config.request_router_config,
      }

      # Let non-user-configured options be set to defaults. If the schema
      # is converted back to a deployment, this lets Serve continue tracking
      # which options were set by the user. Name is a required field in the
      # schema, so it should be passed in explicitly.
      deployment_options = {
          option: value
          for option, value in all_options.items()
          if option == "name" or option in config.user_configured_option_names
      }

      # TODO(Sihan) DeploymentConfig num_replicas and auto_config can be set together
      # because internally we use these two field for autoscale and deploy.
      # We can improve the code after we separate the user faced deployment config and
      # internal deployment config.
      return DeploymentSchema(**deployment_options)
  def schema_to_deployment(s: DeploymentSchema) -> Deployment:
      """Create a deployment from the parameters given in a schema.

      The returned deployment CANNOT be deployed immediately. Its func_or_class
      value is an empty string (""), which is not a valid import path. The
      func_or_class value must be overwritten with a valid function or class
      before the deployment can be deployed.
      """
      # Replica-level options left at DEFAULT.VALUE in the schema become None.
      ray_actor_options = (
          None
          if s.ray_actor_options is DEFAULT.VALUE
          else s.ray_actor_options.dict(exclude_unset=True)
      )
      placement_group_bundles = (
          None
          if s.placement_group_bundles is DEFAULT.VALUE
          else s.placement_group_bundles
      )
      placement_group_strategy = (
          None
          if s.placement_group_strategy is DEFAULT.VALUE
          else s.placement_group_strategy
      )
      max_replicas_per_node = (
          None if s.max_replicas_per_node is DEFAULT.VALUE else s.max_replicas_per_node
      )

      deployment_config = DeploymentConfig.from_default(
          num_replicas=s.num_replicas,
          user_config=s.user_config,
          max_ongoing_requests=s.max_ongoing_requests,
          max_queued_requests=s.max_queued_requests,
          autoscaling_config=s.autoscaling_config,
          graceful_shutdown_wait_loop_s=s.graceful_shutdown_wait_loop_s,
          graceful_shutdown_timeout_s=s.graceful_shutdown_timeout_s,
          health_check_period_s=s.health_check_period_s,
          health_check_timeout_s=s.health_check_timeout_s,
          logging_config=s.logging_config,
          request_router_config=s.request_router_config,
      )
      # Carry over the schema's record of which options the user configured.
      deployment_config.user_configured_option_names = (
          s._get_user_configured_option_names()
      )

      # Placeholder definition and empty init arguments; see the docstring.
      replica_config = ReplicaConfig.create(
          deployment_def="",
          init_args=(),
          init_kwargs={},
          ray_actor_options=ray_actor_options,
          placement_group_bundles=placement_group_bundles,
          placement_group_strategy=placement_group_strategy,
          max_replicas_per_node=max_replicas_per_node,
      )

      return Deployment(
          name=s.name,
          deployment_config=deployment_config,
          replica_config=replica_config,
          _internal=True,
      )
  436. )