default_impl.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. import asyncio
  2. from typing import Callable, Optional, Tuple
  3. import ray
  4. from ray._common.constants import HEAD_NODE_RESOURCE_NAME
  5. from ray._raylet import GcsClient
  6. from ray.serve._private.cluster_node_info_cache import (
  7. ClusterNodeInfoCache,
  8. DefaultClusterNodeInfoCache,
  9. )
  10. from ray.serve._private.common import (
  11. CreatePlacementGroupRequest,
  12. DeploymentHandleSource,
  13. DeploymentID,
  14. EndpointInfo,
  15. RequestMetadata,
  16. RequestProtocol,
  17. )
  18. from ray.serve._private.constants import (
  19. CONTROLLER_MAX_CONCURRENCY,
  20. RAY_SERVE_ENABLE_TASK_EVENTS,
  21. RAY_SERVE_PROXY_PREFER_LOCAL_NODE_ROUTING,
  22. RAY_SERVE_PROXY_USE_GRPC,
  23. RAY_SERVE_RUN_ROUTER_IN_SEPARATE_LOOP,
  24. SERVE_CONTROLLER_NAME,
  25. SERVE_NAMESPACE,
  26. )
  27. from ray.serve._private.deployment_scheduler import (
  28. DefaultDeploymentScheduler,
  29. DeploymentScheduler,
  30. )
  31. from ray.serve._private.event_loop_monitoring import EventLoopMonitor
  32. from ray.serve._private.grpc_util import gRPCGenericServer
  33. from ray.serve._private.handle_options import DynamicHandleOptions, InitHandleOptions
  34. from ray.serve._private.router import CurrentLoopRouter, Router, SingletonThreadRouter
  35. from ray.serve._private.utils import (
  36. asyncio_grpc_exception_handler,
  37. generate_request_id,
  38. get_current_actor_id,
  39. get_head_node_id,
  40. inside_ray_client_context,
  41. resolve_deployment_response,
  42. )
  43. from ray.util.placement_group import PlacementGroup
# NOTE: Please read carefully before changing!
#
# These functions are common extension points and should therefore be treated
# as a Developer API: do not rename them or modify their signatures without
# substantial justification.
  49. def create_cluster_node_info_cache(gcs_client: GcsClient) -> ClusterNodeInfoCache:
  50. return DefaultClusterNodeInfoCache(gcs_client)
  51. CreatePlacementGroupFn = Callable[[CreatePlacementGroupRequest], PlacementGroup]
  52. def _default_create_placement_group(
  53. request: CreatePlacementGroupRequest,
  54. ) -> PlacementGroup:
  55. return ray.util.placement_group(
  56. request.bundles,
  57. request.strategy,
  58. _soft_target_node_id=request.target_node_id,
  59. name=request.name,
  60. lifetime="detached",
  61. bundle_label_selector=request.bundle_label_selector,
  62. )
  63. def create_deployment_scheduler(
  64. cluster_node_info_cache: ClusterNodeInfoCache,
  65. head_node_id_override: Optional[str] = None,
  66. create_placement_group_fn_override: Optional[CreatePlacementGroupFn] = None,
  67. ) -> DeploymentScheduler:
  68. head_node_id = head_node_id_override or get_head_node_id()
  69. return DefaultDeploymentScheduler(
  70. cluster_node_info_cache,
  71. head_node_id,
  72. create_placement_group_fn=create_placement_group_fn_override
  73. or _default_create_placement_group,
  74. )
  75. def create_replica_impl(**kwargs):
  76. from ray.serve._private.replica import Replica
  77. return Replica(**kwargs)
  78. def create_replica_metrics_manager(**kwargs):
  79. from ray.serve._private.replica import ReplicaMetricsManager
  80. return ReplicaMetricsManager(**kwargs)
  81. def create_dynamic_handle_options(**kwargs):
  82. return DynamicHandleOptions(**kwargs)
  83. def create_init_handle_options(**kwargs):
  84. return InitHandleOptions.create(**kwargs)
  85. def get_request_metadata(init_options, handle_options):
  86. _request_context = ray.serve.context._get_serve_request_context()
  87. request_protocol = RequestProtocol.UNDEFINED
  88. if init_options and init_options._source == DeploymentHandleSource.PROXY:
  89. if _request_context.is_http_request:
  90. request_protocol = RequestProtocol.HTTP
  91. elif _request_context.grpc_context:
  92. request_protocol = RequestProtocol.GRPC
  93. return RequestMetadata(
  94. request_id=_request_context.request_id
  95. if _request_context.request_id
  96. else generate_request_id(),
  97. internal_request_id=_request_context._internal_request_id
  98. if _request_context._internal_request_id
  99. else generate_request_id(),
  100. call_method=handle_options.method_name,
  101. route=_request_context.route,
  102. app_name=_request_context.app_name,
  103. multiplexed_model_id=handle_options.multiplexed_model_id,
  104. is_streaming=handle_options.stream,
  105. _request_protocol=request_protocol,
  106. grpc_context=_request_context.grpc_context,
  107. _by_reference=handle_options._by_reference,
  108. _on_separate_loop=init_options._run_router_in_separate_loop,
  109. request_serialization=handle_options.request_serialization,
  110. response_serialization=handle_options.response_serialization,
  111. )
  112. def _get_node_id_and_az() -> Tuple[str, Optional[str]]:
  113. node_id = ray.get_runtime_context().get_node_id()
  114. try:
  115. cluster_node_info_cache = create_cluster_node_info_cache(
  116. GcsClient(address=ray.get_runtime_context().gcs_address)
  117. )
  118. cluster_node_info_cache.update()
  119. az = cluster_node_info_cache.get_node_az(node_id)
  120. except Exception:
  121. az = None
  122. return node_id, az
  123. # Interface definition for create_router.
  124. CreateRouterCallable = Callable[[str, DeploymentID, InitHandleOptions], Router]
  125. def create_router(
  126. handle_id: str,
  127. deployment_id: DeploymentID,
  128. handle_options: InitHandleOptions,
  129. request_router_class: Optional[Callable] = None,
  130. ) -> Router:
  131. # NOTE(edoakes): this is lazy due to a nasty circular import that should be fixed.
  132. from ray.serve.context import _get_global_client
  133. actor_id = get_current_actor_id()
  134. node_id, availability_zone = _get_node_id_and_az()
  135. controller_handle = _get_global_client()._controller
  136. is_inside_ray_client_context = inside_ray_client_context()
  137. if handle_options._run_router_in_separate_loop:
  138. router_wrapper_cls = SingletonThreadRouter
  139. # Determine the component for the event loop monitor
  140. if handle_options._source == DeploymentHandleSource.REPLICA:
  141. component = EventLoopMonitor.COMPONENT_REPLICA
  142. elif handle_options._source == DeploymentHandleSource.PROXY:
  143. component = EventLoopMonitor.COMPONENT_PROXY
  144. else:
  145. component = EventLoopMonitor.COMPONENT_UNKNOWN
  146. SingletonThreadRouter._get_singleton_asyncio_loop(
  147. component
  148. ).set_exception_handler(asyncio_grpc_exception_handler)
  149. else:
  150. try:
  151. asyncio.get_running_loop()
  152. except RuntimeError:
  153. raise RuntimeError(
  154. "No event loop running. You cannot use a handle initialized with "
  155. "`_run_router_in_separate_loop=False` when not inside an asyncio event "
  156. "loop."
  157. )
  158. router_wrapper_cls = CurrentLoopRouter
  159. return router_wrapper_cls(
  160. controller_handle=controller_handle,
  161. deployment_id=deployment_id,
  162. handle_id=handle_id,
  163. self_actor_id=actor_id,
  164. handle_source=handle_options._source,
  165. request_router_class=request_router_class,
  166. # Streaming ObjectRefGenerators are not supported in Ray Client
  167. enable_strict_max_ongoing_requests=not is_inside_ray_client_context,
  168. resolve_request_arg_func=resolve_deployment_response,
  169. node_id=node_id,
  170. availability_zone=availability_zone,
  171. prefer_local_node_routing=handle_options._prefer_local_routing,
  172. )
  173. def add_grpc_address(grpc_server: gRPCGenericServer, server_address: str):
  174. """Helper function to add an address to a gRPC server."""
  175. grpc_server.add_insecure_port(server_address)
  176. def get_proxy_handle(endpoint: DeploymentID, info: EndpointInfo):
  177. # NOTE(zcin): needs to be lazy import due to a circular dependency.
  178. # We should not be importing from application_state in context.
  179. from ray.serve.context import _get_global_client
  180. client = _get_global_client()
  181. handle = client.get_handle(endpoint.name, endpoint.app_name, check_exists=True)
  182. # NOTE(zcin): It's possible that a handle is already initialized
  183. # if a deployment with the same name and application name was
  184. # deleted, then redeployed later. However this is not an issue since
  185. # we initialize all handles with the same init options.
  186. if not handle.is_initialized:
  187. # NOTE(zcin): since the router is eagerly initialized here, the
  188. # proxy will receive the replica set from the controller early.
  189. handle._init(
  190. _prefer_local_routing=RAY_SERVE_PROXY_PREFER_LOCAL_NODE_ROUTING,
  191. _source=DeploymentHandleSource.PROXY,
  192. _run_router_in_separate_loop=RAY_SERVE_RUN_ROUTER_IN_SEPARATE_LOOP,
  193. )
  194. return handle.options(
  195. stream=not info.app_is_cross_language,
  196. _by_reference=not RAY_SERVE_PROXY_USE_GRPC,
  197. )
  198. def get_controller_impl():
  199. from ray.serve._private.controller import ServeController
  200. controller_impl = ray.remote(
  201. name=SERVE_CONTROLLER_NAME,
  202. namespace=SERVE_NAMESPACE,
  203. num_cpus=0,
  204. lifetime="detached",
  205. max_restarts=-1,
  206. max_task_retries=-1,
  207. resources={HEAD_NODE_RESOURCE_NAME: 0.001},
  208. max_concurrency=CONTROLLER_MAX_CONCURRENCY,
  209. enable_task_events=RAY_SERVE_ENABLE_TASK_EVENTS,
  210. )(ServeController)
  211. return controller_impl