build_app.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. import logging
  2. from copy import deepcopy
  3. from dataclasses import dataclass
  4. from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar, Union
  5. from ray.dag.py_obj_scanner import _PyObjScanner
  6. from ray.serve._private.constants import SERVE_LOGGER_NAME
  7. from ray.serve.deployment import Application, Deployment
  8. from ray.serve.handle import DeploymentHandle
  9. from ray.serve.schema import LoggingConfig
  10. logger = logging.getLogger(SERVE_LOGGER_NAME)
  11. K = TypeVar("K")
  12. V = TypeVar("V")
  13. class IDDict(dict, Generic[K, V]):
  14. """Dictionary that uses id() for keys instead of hash().
  15. This is necessary because Application objects aren't hashable and we want each
  16. instance to map to a unique key.
  17. """
  18. def __getitem__(self, key: K) -> V:
  19. if not isinstance(key, int):
  20. key = id(key)
  21. return super().__getitem__(key)
  22. def __setitem__(self, key: K, value: V):
  23. if not isinstance(key, int):
  24. key = id(key)
  25. return super().__setitem__(key, value)
  26. def __delitem__(self, key: K):
  27. if not isinstance(key, int):
  28. key = id(key)
  29. return super().__delitem__(key)
  30. def __contains__(self, key: object):
  31. if not isinstance(key, int):
  32. key = id(key)
  33. return super().__contains__(key)
  34. @dataclass(frozen=True)
  35. class BuiltApplication:
  36. # Name of the application.
  37. name: str
  38. route_prefix: Optional[str]
  39. logging_config: Optional[LoggingConfig]
  40. # Name of the application's 'ingress' deployment
  41. # (the one exposed over gRPC/HTTP/handle).
  42. ingress_deployment_name: str
  43. # List of unique deployments comprising the app.
  44. deployments: List[Deployment]
  45. # Dict[name, DeploymentHandle] mapping deployment names to the handles that replaced
  46. # them in other deployments' init args/kwargs.
  47. deployment_handles: Dict[str, DeploymentHandle]
  48. external_scaler_enabled: bool
  49. def _make_deployment_handle_default(
  50. deployment: Deployment, app_name: str
  51. ) -> DeploymentHandle:
  52. return DeploymentHandle(
  53. deployment.name,
  54. app_name=app_name,
  55. )
  56. def build_app(
  57. app: Application,
  58. *,
  59. name: str,
  60. route_prefix: Optional[str] = None,
  61. logging_config: Optional[Union[Dict, LoggingConfig]] = None,
  62. default_runtime_env: Optional[Dict[str, Any]] = None,
  63. make_deployment_handle: Optional[
  64. Callable[[Deployment, str], DeploymentHandle]
  65. ] = None,
  66. external_scaler_enabled: bool = False,
  67. ) -> BuiltApplication:
  68. """Builds the application into a list of finalized deployments.
  69. The following transformations are made:
  70. - Application objects in constructor args/kwargs are converted to
  71. DeploymentHandles for injection at runtime.
  72. - Name conflicts from deployments that use the same class are handled
  73. by appending a monotonically increasing suffix (e.g., SomeClass_1).
  74. Returns: BuiltApplication
  75. """
  76. if make_deployment_handle is None:
  77. make_deployment_handle = _make_deployment_handle_default
  78. handles = IDDict()
  79. deployment_names = IDDict()
  80. deployments = _build_app_recursive(
  81. app,
  82. app_name=name,
  83. handles=handles,
  84. deployment_names=deployment_names,
  85. default_runtime_env=default_runtime_env,
  86. make_deployment_handle=make_deployment_handle,
  87. )
  88. return BuiltApplication(
  89. name=name,
  90. route_prefix=route_prefix,
  91. logging_config=logging_config,
  92. ingress_deployment_name=deployment_names[app],
  93. deployments=deployments,
  94. deployment_handles={
  95. deployment_names[app]: handle for app, handle in handles.items()
  96. },
  97. external_scaler_enabled=external_scaler_enabled,
  98. )
  99. def _build_app_recursive(
  100. app: Application,
  101. *,
  102. app_name: str,
  103. deployment_names: IDDict[Application, str],
  104. handles: IDDict[Application, DeploymentHandle],
  105. default_runtime_env: Optional[Dict[str, Any]] = None,
  106. make_deployment_handle: Callable[[Deployment, str], DeploymentHandle],
  107. ) -> List[Deployment]:
  108. """Recursively traverses the graph of Application objects.
  109. Each Application will have an associated DeploymentHandle created that will replace
  110. it in any occurrences in other Applications' args or kwargs.
  111. Also collects a list of the unique Applications encountered and returns them as
  112. deployable Deployment objects.
  113. """
  114. # This application has already been encountered.
  115. # There's no need to recurse into its child args and we don't want to create
  116. # a duplicate entry for it in the list of deployments.
  117. if app in handles:
  118. return []
  119. deployments = []
  120. scanner = _PyObjScanner(source_type=Application)
  121. try:
  122. # Recursively traverse any Application objects bound to init args/kwargs.
  123. child_apps = scanner.find_nodes(
  124. (app._bound_deployment.init_args, app._bound_deployment.init_kwargs)
  125. )
  126. for child_app in child_apps:
  127. deployments.extend(
  128. _build_app_recursive(
  129. child_app,
  130. app_name=app_name,
  131. handles=handles,
  132. deployment_names=deployment_names,
  133. make_deployment_handle=make_deployment_handle,
  134. default_runtime_env=default_runtime_env,
  135. )
  136. )
  137. # Replace Application objects with their corresponding DeploymentHandles.
  138. new_init_args, new_init_kwargs = scanner.replace_nodes(handles)
  139. final_deployment = app._bound_deployment.options(
  140. name=_get_unique_deployment_name_memoized(app, deployment_names),
  141. _init_args=new_init_args,
  142. _init_kwargs=new_init_kwargs,
  143. )
  144. final_deployment = _set_default_runtime_env(
  145. final_deployment, default_runtime_env
  146. )
  147. # Create the DeploymentHandle that will be used to replace this application
  148. # in the arguments of its parent(s).
  149. handles[app] = make_deployment_handle(
  150. final_deployment,
  151. app_name,
  152. )
  153. return deployments + [final_deployment]
  154. finally:
  155. scanner.clear()
  156. def _set_default_runtime_env(
  157. d: Deployment, default_runtime_env: Optional[Dict[str, Any]]
  158. ) -> Deployment:
  159. """Configures the deployment with the provided default runtime_env.
  160. If the deployment does not have a runtime_env configured, the default will be set.
  161. If it does have a runtime_env configured but that runtime_env does not have a
  162. working_dir, only the working_dir field will be set.
  163. Else the deployment's runtime_env will be left untouched.
  164. """
  165. if not default_runtime_env:
  166. return d
  167. ray_actor_options = deepcopy(d.ray_actor_options or {})
  168. default_working_dir = default_runtime_env.get("working_dir", None)
  169. if "runtime_env" not in ray_actor_options:
  170. ray_actor_options["runtime_env"] = default_runtime_env
  171. elif default_working_dir is not None:
  172. ray_actor_options["runtime_env"].setdefault("working_dir", default_working_dir)
  173. return d.options(ray_actor_options=ray_actor_options)
  174. def _get_unique_deployment_name_memoized(
  175. app: Application, deployment_names: IDDict[Application, str]
  176. ) -> str:
  177. """Generates a name for the deployment.
  178. This is used to handle collisions when the user does not specify a name
  179. explicitly, so typically we'd use the class name as the default.
  180. In that case, we append a monotonically increasing suffix to the name, e.g.,
  181. Deployment, then Deployment_1, then Deployment_2, ...
  182. Names are memoized in the `deployment_names` dict, which should be passed to
  183. subsequent calls to this function.
  184. """
  185. if app in deployment_names:
  186. return deployment_names[app]
  187. idx = 1
  188. name = app._bound_deployment.name
  189. while name in deployment_names.values():
  190. name = f"{app._bound_deployment.name}_{idx}"
  191. idx += 1
  192. if idx != 1:
  193. logger.warning(
  194. "There are multiple deployments with the same name "
  195. f"'{app._bound_deployment.name}'. Renaming one to '{name}'."
  196. )
  197. deployment_names[app] = name
  198. return name