proxy_state.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855
  1. import asyncio
  2. import json
  3. import logging
  4. import os
  5. from abc import ABC, abstractmethod
  6. from copy import deepcopy
  7. from typing import Dict, List, Optional, Set, Tuple, Type
  8. import ray
  9. from ray import ObjectRef
  10. from ray._common.network_utils import build_address
  11. from ray._common.utils import Timer, TimerBase
  12. from ray.actor import ActorHandle
  13. from ray.exceptions import GetTimeoutError, RayActorError
  14. from ray.serve._private.cluster_node_info_cache import ClusterNodeInfoCache
  15. from ray.serve._private.common import NodeId, RequestProtocol
  16. from ray.serve._private.constants import (
  17. ASYNC_CONCURRENCY,
  18. PROXY_DRAIN_CHECK_PERIOD_S,
  19. PROXY_HEALTH_CHECK_PERIOD_S,
  20. PROXY_HEALTH_CHECK_TIMEOUT_S,
  21. PROXY_HEALTH_CHECK_UNHEALTHY_THRESHOLD,
  22. PROXY_READY_CHECK_TIMEOUT_S,
  23. RAY_SERVE_ENABLE_TASK_EVENTS,
  24. REPLICA_STARTUP_SHUTDOWN_LATENCY_BUCKETS_MS,
  25. SERVE_LOGGER_NAME,
  26. SERVE_NAMESPACE,
  27. SERVE_PROXY_NAME,
  28. )
  29. from ray.serve._private.proxy import ProxyActor
  30. from ray.serve._private.utils import (
  31. format_actor_name,
  32. is_grpc_enabled,
  33. )
  34. from ray.serve.config import DeploymentMode, HTTPOptions, gRPCOptions
  35. from ray.serve.schema import (
  36. LoggingConfig,
  37. ProxyDetails,
  38. ProxyStatus,
  39. Target,
  40. )
  41. from ray.util import metrics
  42. from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
  43. logger = logging.getLogger(SERVE_LOGGER_NAME)
  44. class ProxyWrapper(ABC):
  45. @property
  46. @abstractmethod
  47. def actor_id(self) -> str:
  48. """Return the actor id of the proxy actor."""
  49. raise NotImplementedError
  50. @abstractmethod
  51. def is_ready(self, timeout_s: float) -> Optional[bool]:
  52. """Return whether proxy is ready to be serving requests.
  53. Since actual readiness check is asynchronous, this method could return
  54. any of the following statuses:
  55. - None: Readiness check is pending
  56. - True: Readiness check completed successfully (proxy is ready)
  57. - False: Readiness check completed with failure (either timing out
  58. or failing)
  59. """
  60. raise NotImplementedError
  61. @abstractmethod
  62. def is_healthy(self, timeout_s: float) -> Optional[bool]:
  63. """Return whether the proxy actor is healthy.
  64. Since actual health-check is asynchronous, this method could return
  65. either of the following statuses:
  66. - None: Health-check is pending
  67. - True: Health-check completed successfully (proxy is healthy)
  68. - False: Health-check completed with failure (either timing out or failing)
  69. """
  70. raise NotImplementedError
  71. @abstractmethod
  72. def is_drained(self, timeout_s: float) -> Optional[bool]:
  73. """Return whether the proxy actor is drained.
  74. Since actual check whether proxy is drained is asynchronous, this method could
  75. return either of the following statuses:
  76. - None: Drain-check is pending
  77. - True: Drain-check completed, node *is drained*
  78. - False: Drain-check completed, node is *NOT* drained
  79. """
  80. raise NotImplementedError
  81. @abstractmethod
  82. def is_shutdown(self):
  83. """Return whether the proxy actor is shutdown."""
  84. raise NotImplementedError
  85. @abstractmethod
  86. def update_draining(self, draining: bool):
  87. """Update the draining status of the proxy actor."""
  88. raise NotImplementedError
  89. @abstractmethod
  90. def kill(self):
  91. """Kill the proxy actor."""
  92. raise NotImplementedError
  93. class ActorProxyWrapper(ProxyWrapper):
  94. def __init__(
  95. self,
  96. logging_config: LoggingConfig,
  97. actor_handle: Optional[ActorHandle] = None,
  98. http_options: Optional[HTTPOptions] = None,
  99. grpc_options: Optional[gRPCOptions] = None,
  100. name: Optional[str] = None,
  101. node_id: Optional[str] = None,
  102. node_ip_address: Optional[str] = None,
  103. port: Optional[int] = None,
  104. proxy_actor_class: Type[ProxyActor] = ProxyActor,
  105. ):
  106. # initialize with provided proxy actor handle or get or create a new one.
  107. self._actor_handle = actor_handle or self._get_or_create_proxy_actor(
  108. http_options=http_options,
  109. grpc_options=grpc_options,
  110. name=name,
  111. node_id=node_id,
  112. node_ip_address=node_ip_address,
  113. port=port,
  114. proxy_actor_class=proxy_actor_class,
  115. logging_config=logging_config,
  116. )
  117. self._ready_check_future = None
  118. self._health_check_future = None
  119. self._drained_check_future = None
  120. self._update_draining_obj_ref = None
  121. self._node_id = node_id
  122. self.worker_id = None
  123. self.log_file_path = None
  124. @staticmethod
  125. def _get_or_create_proxy_actor(
  126. http_options: HTTPOptions,
  127. grpc_options: gRPCOptions,
  128. name: str,
  129. node_id: str,
  130. node_ip_address: str,
  131. port: int,
  132. logging_config: LoggingConfig,
  133. proxy_actor_class: Type[ProxyActor] = ProxyActor,
  134. ) -> ProxyWrapper:
  135. """Helper to start or reuse existing proxy.
  136. Takes the name of the proxy, the node id, and the node ip address, and look up
  137. or creates a new ProxyActor actor handle for the proxy.
  138. """
  139. proxy = None
  140. try:
  141. proxy = ray.get_actor(name, namespace=SERVE_NAMESPACE)
  142. except ValueError:
  143. addr = build_address(http_options.host, http_options.port)
  144. logger.info(
  145. f"Starting proxy on node '{node_id}' listening on '{addr}'.",
  146. extra={"log_to_stderr": False},
  147. )
  148. return proxy or proxy_actor_class.options(
  149. num_cpus=http_options.num_cpus,
  150. name=name,
  151. namespace=SERVE_NAMESPACE,
  152. lifetime="detached",
  153. max_concurrency=ASYNC_CONCURRENCY,
  154. max_restarts=0,
  155. scheduling_strategy=NodeAffinitySchedulingStrategy(node_id, soft=False),
  156. enable_task_events=RAY_SERVE_ENABLE_TASK_EVENTS,
  157. ).remote(
  158. http_options,
  159. grpc_options=grpc_options,
  160. node_id=node_id,
  161. node_ip_address=node_ip_address,
  162. logging_config=logging_config,
  163. )
  164. @property
  165. def actor_id(self) -> str:
  166. """Return the actor id of the proxy actor."""
  167. return self._actor_handle._actor_id.hex()
  168. @property
  169. def actor_handle(self) -> ActorHandle:
  170. """Return the actor handle of the proxy actor.
  171. This is used in _start_controller() in _private/controller.py to check whether
  172. the proxies exist. It is also used in some tests to access proxy's actor handle.
  173. """
  174. return self._actor_handle
  175. def is_ready(self, timeout_s: float) -> Optional[bool]:
  176. if self._ready_check_future is None:
  177. self._ready_check_future = wrap_as_future(
  178. self._actor_handle.ready.remote(), timeout_s=timeout_s
  179. )
  180. if not self._ready_check_future.done():
  181. return None
  182. try:
  183. worker_id, log_file_path = json.loads(self._ready_check_future.result())
  184. self.worker_id = worker_id
  185. self.log_file_path = log_file_path
  186. return True
  187. except TimeoutError:
  188. logger.warning(
  189. f"Proxy actor readiness check for proxy on {self._node_id}"
  190. f" didn't complete in {timeout_s}s."
  191. )
  192. except Exception:
  193. logger.exception(
  194. f"Unexpected error invoking readiness check for proxy"
  195. f" on {self._node_id}",
  196. )
  197. finally:
  198. self._ready_check_future = None
  199. return False
  200. def is_healthy(self, timeout_s: float) -> Optional[bool]:
  201. if self._health_check_future is None:
  202. self._health_check_future = wrap_as_future(
  203. self._actor_handle.check_health.remote(), timeout_s=timeout_s
  204. )
  205. if not self._health_check_future.done():
  206. return None
  207. try:
  208. return self._health_check_future.result()
  209. except TimeoutError:
  210. logger.warning(
  211. f"Didn't receive health check response for proxy"
  212. f" on {self._node_id} after {timeout_s}s."
  213. )
  214. except Exception:
  215. logger.exception(
  216. f"Unexpected error invoking health check for proxy "
  217. f"on {self._node_id}",
  218. )
  219. finally:
  220. self._health_check_future = None
  221. return False
  222. def is_drained(self, timeout_s: float) -> Optional[bool]:
  223. if self._drained_check_future is None:
  224. self._drained_check_future = wrap_as_future(
  225. self._actor_handle.is_drained.remote(),
  226. timeout_s=timeout_s,
  227. )
  228. if not self._drained_check_future.done():
  229. return None
  230. try:
  231. is_drained = self._drained_check_future.result()
  232. return is_drained
  233. except TimeoutError:
  234. logger.warning(
  235. f"Didn't receive drain check response for proxy"
  236. f" on {self._node_id} after {timeout_s}s."
  237. )
  238. except Exception:
  239. logger.exception(
  240. f"Unexpected error invoking drain-check for proxy "
  241. f"on {self._node_id}",
  242. )
  243. finally:
  244. self._drained_check_future = None
  245. return False
  246. def is_shutdown(self) -> bool:
  247. """Return whether the proxy actor is shutdown.
  248. If the actor is dead, the health check will return RayActorError.
  249. """
  250. try:
  251. ray.get(self._actor_handle.check_health.remote(), timeout=0)
  252. except RayActorError:
  253. # The actor is dead, so it's ready for shutdown.
  254. return True
  255. except GetTimeoutError:
  256. pass
  257. # The actor is still alive, so it's not ready for shutdown.
  258. return False
  259. def update_draining(self, draining: bool):
  260. """Update the draining status of the proxy actor."""
  261. # NOTE: All update_draining calls are implicitly serialized, by specifying
  262. # `ObjectRef` of the previous call
  263. self._update_draining_obj_ref = self._actor_handle.update_draining.remote(
  264. draining, _after=self._update_draining_obj_ref
  265. )
  266. # In case of cancelled draining, make sure pending draining check is cancelled
  267. # as well
  268. if not draining:
  269. future = self._drained_check_future
  270. self._drained_check_future = None
  271. if future:
  272. future.cancel()
  273. def kill(self):
  274. """Kill the proxy actor."""
  275. ray.kill(self._actor_handle, no_restart=True)
  276. class ProxyState:
  277. def __init__(
  278. self,
  279. actor_proxy_wrapper: ProxyWrapper,
  280. actor_name: str,
  281. node_id: str,
  282. node_ip: str,
  283. node_instance_id: str,
  284. proxy_restart_count: int = 0,
  285. timer: TimerBase = Timer(),
  286. ):
  287. self._actor_proxy_wrapper = actor_proxy_wrapper
  288. self._actor_name = actor_name
  289. self._node_id = node_id
  290. self._node_ip = node_ip
  291. self._status = ProxyStatus.STARTING
  292. self._timer = timer
  293. self._shutting_down = False
  294. self._consecutive_health_check_failures: int = 0
  295. self._proxy_restart_count = proxy_restart_count
  296. self._last_health_check_time: Optional[float] = None
  297. self._last_drain_check_time: Optional[float] = None
  298. self._actor_details = ProxyDetails(
  299. node_id=node_id,
  300. node_ip=node_ip,
  301. node_instance_id=node_instance_id,
  302. actor_id=self._actor_proxy_wrapper.actor_id,
  303. actor_name=self._actor_name,
  304. status=self._status,
  305. )
  306. # Metric to track proxy status as a numeric value
  307. # 1=STARTING, 2=HEALTHY, 3=UNHEALTHY, 4=DRAINING, 5=DRAINED (0=UNKNOWN reserved)
  308. self._status_gauge = metrics.Gauge(
  309. "serve_proxy_status",
  310. description=(
  311. "The current status of the proxy. "
  312. "1=STARTING, 2=HEALTHY, 3=UNHEALTHY, 4=DRAINING, 5=DRAINED."
  313. ),
  314. tag_keys=("node_id", "node_ip_address"),
  315. ).set_default_tags({"node_id": node_id, "node_ip_address": node_ip})
  316. # Set initial status (STARTING = 1)
  317. self._status_gauge.set(ProxyStatus.STARTING.to_numeric())
  318. # Metric to track proxy shutdown duration
  319. self._shutdown_duration_histogram = metrics.Histogram(
  320. "serve_proxy_shutdown_duration_ms",
  321. description=(
  322. "The time it takes for the proxy to shut down in milliseconds."
  323. ),
  324. boundaries=REPLICA_STARTUP_SHUTDOWN_LATENCY_BUCKETS_MS,
  325. tag_keys=("node_id", "node_ip_address"),
  326. ).set_default_tags({"node_id": node_id, "node_ip_address": node_ip})
  327. self._shutdown_start_time: Optional[float] = None
  328. @property
  329. def actor_handle(self) -> ActorHandle:
  330. return self._actor_proxy_wrapper.actor_handle
  331. @property
  332. def actor_name(self) -> str:
  333. return self._actor_name
  334. @property
  335. def actor_id(self) -> str:
  336. return self._actor_proxy_wrapper.actor_id
  337. @property
  338. def status(self) -> ProxyStatus:
  339. return self._status
  340. @property
  341. def actor_details(self) -> ProxyDetails:
  342. return self._actor_details
  343. @property
  344. def proxy_restart_count(self) -> int:
  345. return self._proxy_restart_count
  346. def _set_status(self, status: ProxyStatus) -> None:
  347. """Sets _status and updates _actor_details with the new status.
  348. NOTE: This method should not be used directly, instead please
  349. use `try_update_status` method
  350. """
  351. self._status = status
  352. self.update_actor_details(status=self._status)
  353. # Update the status gauge with the numeric value of the status
  354. self._status_gauge.set(status.to_numeric())
  355. def try_update_status(self, status: ProxyStatus):
  356. """Try update with the new status and only update when the conditions are met.
  357. Status will only set to UNHEALTHY after PROXY_HEALTH_CHECK_UNHEALTHY_THRESHOLD
  358. consecutive failures. A warning will be logged when the status is set to
  359. UNHEALTHY. Also, when status is set to HEALTHY, we will reset
  360. self._consecutive_health_check_failures to 0.
  361. """
  362. if status == ProxyStatus.UNHEALTHY:
  363. self._consecutive_health_check_failures += 1
  364. # Early return to skip setting UNHEALTHY status if there are still room for
  365. # retry.
  366. if (
  367. self._consecutive_health_check_failures
  368. < PROXY_HEALTH_CHECK_UNHEALTHY_THRESHOLD
  369. ):
  370. return
  371. else:
  372. # If all retries have been exhausted and setting the status to
  373. # UNHEALTHY, log a warning message to the user.
  374. logger.warning(
  375. f"Proxy {self._actor_name} failed the health check "
  376. f"{self._consecutive_health_check_failures} times in a row, marking"
  377. f" it unhealthy."
  378. )
  379. else:
  380. # Reset self._consecutive_health_check_failures when status is not
  381. # UNHEALTHY
  382. self._consecutive_health_check_failures = 0
  383. self._set_status(status=status)
  384. def update_actor_details(self, **kwargs) -> None:
  385. """Updates _actor_details with passed in kwargs."""
  386. details_kwargs = self._actor_details.dict()
  387. details_kwargs.update(kwargs)
  388. self._actor_details = ProxyDetails(**details_kwargs)
  389. def reconcile(self, draining: bool = False):
  390. try:
  391. self._reconcile_internal(draining)
  392. except Exception as e:
  393. self.try_update_status(ProxyStatus.UNHEALTHY)
  394. logger.error(
  395. "Unexpected error occurred when reconciling stae of "
  396. f"proxy on node {self._node_id}",
  397. exc_info=e,
  398. )
  399. def _reconcile_internal(self, draining: bool):
  400. """Update the status of the current proxy.
  401. The state machine is:
  402. STARTING -> HEALTHY or UNHEALTHY
  403. HEALTHY -> DRAINING or UNHEALTHY
  404. DRAINING -> HEALTHY or UNHEALTHY or DRAINED
  405. UNHEALTHY is a terminal state upon reaching which, Proxy is going to be
  406. restarted by the controller
  407. """
  408. if (
  409. self._shutting_down
  410. or self._status == ProxyStatus.DRAINED
  411. or self._status == ProxyStatus.UNHEALTHY
  412. ):
  413. return
  414. # Doing a linear backoff for the ready check timeout.
  415. ready_check_timeout = (
  416. self.proxy_restart_count + 1
  417. ) * PROXY_READY_CHECK_TIMEOUT_S
  418. if self._status == ProxyStatus.STARTING:
  419. is_ready_response = self._actor_proxy_wrapper.is_ready(ready_check_timeout)
  420. if is_ready_response is not None:
  421. if is_ready_response:
  422. self.try_update_status(ProxyStatus.HEALTHY)
  423. self.update_actor_details(
  424. worker_id=self._actor_proxy_wrapper.worker_id,
  425. log_file_path=self._actor_proxy_wrapper.log_file_path,
  426. status=self._status,
  427. )
  428. else:
  429. self.try_update_status(ProxyStatus.UNHEALTHY)
  430. logger.warning(
  431. f"Proxy actor reported not ready on node {self._node_id}"
  432. )
  433. else:
  434. # At this point, the proxy is either in HEALTHY or DRAINING status.
  435. assert self._status in {ProxyStatus.HEALTHY, ProxyStatus.DRAINING}
  436. should_check_health = self._last_health_check_time is None or (
  437. self._timer.time() - self._last_health_check_time
  438. >= PROXY_HEALTH_CHECK_PERIOD_S
  439. )
  440. # Perform health-check for proxy's actor (if necessary)
  441. if should_check_health:
  442. is_healthy_response = self._actor_proxy_wrapper.is_healthy(
  443. PROXY_HEALTH_CHECK_TIMEOUT_S
  444. )
  445. if is_healthy_response is not None:
  446. if is_healthy_response:
  447. # At this stage status is either HEALTHY or DRAINING, and here
  448. # we simply reset the status
  449. self.try_update_status(self._status)
  450. else:
  451. self.try_update_status(ProxyStatus.UNHEALTHY)
  452. self._last_health_check_time = self._timer.time()
  453. # Handle state transitions (if necessary)
  454. if self._status == ProxyStatus.UNHEALTHY:
  455. return
  456. elif self._status == ProxyStatus.HEALTHY:
  457. if draining:
  458. logger.info(f"Draining proxy on node '{self._node_id}'.")
  459. assert self._last_drain_check_time is None
  460. self._actor_proxy_wrapper.update_draining(draining=True)
  461. self.try_update_status(ProxyStatus.DRAINING)
  462. elif self._status == ProxyStatus.DRAINING:
  463. if not draining:
  464. logger.info(f"No longer draining proxy on node '{self._node_id}'.")
  465. self._last_drain_check_time = None
  466. self._actor_proxy_wrapper.update_draining(draining=False)
  467. self.try_update_status(ProxyStatus.HEALTHY)
  468. else:
  469. should_check_drain = self._last_drain_check_time is None or (
  470. self._timer.time() - self._last_drain_check_time
  471. >= PROXY_DRAIN_CHECK_PERIOD_S
  472. )
  473. if should_check_drain:
  474. # NOTE: We use the same timeout as for readiness checking
  475. is_drained_response = self._actor_proxy_wrapper.is_drained(
  476. PROXY_READY_CHECK_TIMEOUT_S
  477. )
  478. if is_drained_response is not None:
  479. if is_drained_response:
  480. self.try_update_status(ProxyStatus.DRAINED)
  481. self._last_drain_check_time = self._timer.time()
  482. def shutdown(self):
  483. self._shutting_down = True
  484. self._shutdown_start_time = self._timer.time()
  485. self._actor_proxy_wrapper.kill()
  486. def is_ready_for_shutdown(self) -> bool:
  487. """Return whether the proxy actor is shutdown.
  488. For a proxy actor to be considered shutdown, it must be marked as
  489. _shutting_down and the actor must be shut down.
  490. """
  491. if not self._shutting_down:
  492. return False
  493. is_shutdown = self._actor_proxy_wrapper.is_shutdown()
  494. if is_shutdown and self._shutdown_start_time is not None:
  495. shutdown_duration_ms = (
  496. self._timer.time() - self._shutdown_start_time
  497. ) * 1000
  498. self._shutdown_duration_histogram.observe(shutdown_duration_ms)
  499. self._shutdown_start_time = None # Prevent recording multiple times
  500. return is_shutdown
  501. class ProxyStateManager:
  502. """Manages all state for proxies in the system.
  503. This class is *not* thread safe, so any state-modifying methods should be
  504. called with a lock held.
  505. """
  506. def __init__(
  507. self,
  508. http_options: HTTPOptions,
  509. head_node_id: str,
  510. cluster_node_info_cache: ClusterNodeInfoCache,
  511. logging_config: LoggingConfig,
  512. grpc_options: Optional[gRPCOptions] = None,
  513. proxy_actor_class: Type[ProxyActor] = ProxyActor,
  514. actor_proxy_wrapper_class: Type[ProxyWrapper] = ActorProxyWrapper,
  515. timer: TimerBase = Timer(),
  516. ):
  517. self.logging_config = logging_config
  518. self._http_options = http_options or HTTPOptions()
  519. self._grpc_options = grpc_options or gRPCOptions()
  520. self._proxy_states: Dict[NodeId, ProxyState] = dict()
  521. self._proxy_restart_counts: Dict[NodeId, int] = dict()
  522. self._head_node_id: str = head_node_id
  523. self._proxy_actor_class = proxy_actor_class
  524. self._actor_proxy_wrapper_class = actor_proxy_wrapper_class
  525. self._timer = timer
  526. self._cluster_node_info_cache = cluster_node_info_cache
  527. assert isinstance(head_node_id, str)
  528. def reconfigure_logging_config(self, logging_config: LoggingConfig):
  529. self.logging_config = logging_config
  530. def shutdown(self) -> None:
  531. for proxy_state in self._proxy_states.values():
  532. proxy_state.shutdown()
  533. def is_ready_for_shutdown(self) -> bool:
  534. """Return whether all proxies are shutdown.
  535. Iterate through all proxy states and check if all their proxy actors
  536. are shutdown.
  537. """
  538. return all(
  539. proxy_state.is_ready_for_shutdown()
  540. for proxy_state in self._proxy_states.values()
  541. )
  542. def get_config(self) -> HTTPOptions:
  543. return self._http_options
  544. def get_grpc_config(self) -> gRPCOptions:
  545. return self._grpc_options
  546. def get_proxy_handles(self) -> Dict[NodeId, ActorHandle]:
  547. return {
  548. node_id: state.actor_handle for node_id, state in self._proxy_states.items()
  549. }
  550. def get_proxy_names(self) -> Dict[NodeId, str]:
  551. return {
  552. node_id: state.actor_name for node_id, state in self._proxy_states.items()
  553. }
  554. def get_proxy_details(self) -> Dict[NodeId, ProxyDetails]:
  555. return {
  556. node_id: state.actor_details
  557. for node_id, state in self._proxy_states.items()
  558. }
  559. def get_targets(self, protocol: RequestProtocol) -> List[Target]:
  560. """In Ray Serve, every proxy is responsible for routing requests to the
  561. correct application. Here we curate a list of targets for the given protocol.
  562. Where each target represents how to reach a proxy.
  563. Args:
  564. protocol: Either "http" or "grpc"
  565. """
  566. targets = []
  567. if protocol == RequestProtocol.HTTP:
  568. port = self._http_options.port
  569. elif protocol == RequestProtocol.GRPC:
  570. if not is_grpc_enabled(self._grpc_options):
  571. return []
  572. port = self._grpc_options.port
  573. else:
  574. raise ValueError(f"Invalid protocol: {protocol}")
  575. targets = [
  576. Target(
  577. ip=state.actor_details.node_ip,
  578. port=port,
  579. instance_id=state.actor_details.node_instance_id,
  580. name=state.actor_name,
  581. )
  582. for _, state in self._proxy_states.items()
  583. if state.actor_details.status == ProxyStatus.HEALTHY
  584. ]
  585. return targets
  586. def get_alive_proxy_actor_ids(self) -> Set[str]:
  587. return {state.actor_id for state in self._proxy_states.values()}
  588. def update(self, proxy_nodes: Set[NodeId] = None) -> Set[str]:
  589. """Update the state of all proxies.
  590. Start proxies on all nodes if not already exist and stop the proxies on nodes
  591. that are no longer exist. Update all proxy states. Kill and restart
  592. unhealthy proxies.
  593. """
  594. if proxy_nodes is None:
  595. proxy_nodes = set()
  596. target_nodes = self._get_target_nodes(proxy_nodes)
  597. target_node_ids = {node_id for node_id, _, _ in target_nodes}
  598. for node_id, proxy_state in self._proxy_states.items():
  599. draining = node_id not in target_node_ids
  600. proxy_state.reconcile(draining)
  601. self._stop_proxies_if_needed()
  602. self._start_proxies_if_needed(target_nodes)
  603. def _get_target_nodes(self, proxy_nodes) -> List[Tuple[str, str, str]]:
  604. """Return the list of (node_id, ip_address) to deploy HTTP and gRPC servers
  605. on."""
  606. location = self._http_options.location
  607. if location == DeploymentMode.NoServer:
  608. return []
  609. target_nodes = [
  610. (node_id, ip_address, instance_id)
  611. for node_id, ip_address, instance_id in self._cluster_node_info_cache.get_alive_nodes()
  612. if node_id in proxy_nodes
  613. ]
  614. if location == DeploymentMode.HeadOnly:
  615. nodes = [
  616. (node_id, ip_address, instance_id)
  617. for node_id, ip_address, instance_id in target_nodes
  618. if node_id == self._head_node_id
  619. ]
  620. assert len(nodes) == 1, (
  621. f"Head node not found! Head node id: {self._head_node_id}, "
  622. f"all nodes: {target_nodes}."
  623. )
  624. return nodes
  625. return target_nodes
  626. def _generate_actor_name(self, node_id: str) -> str:
  627. return format_actor_name(SERVE_PROXY_NAME, node_id)
  628. def _start_proxy(
  629. self,
  630. name: str,
  631. node_id: str,
  632. node_ip_address: str,
  633. ) -> ProxyWrapper:
  634. """Helper to start or reuse existing proxy and wrap in the proxy actor wrapper.
  635. Compute the HTTP port based on `TEST_WORKER_NODE_HTTP_PORT` env var and gRPC
  636. port based on `TEST_WORKER_NODE_GRPC_PORT` env var. Passed all the required
  637. variables into the proxy actor wrapper class and return the proxy actor wrapper.
  638. """
  639. http_options = self._http_options
  640. grpc_options = self._grpc_options
  641. if (
  642. node_id != self._head_node_id
  643. and os.getenv("TEST_WORKER_NODE_HTTP_PORT") is not None
  644. ):
  645. logger.warning(
  646. f"`TEST_WORKER_NODE_HTTP_PORT` env var is set. "
  647. f"Using it for worker node {node_id}."
  648. )
  649. http_options = deepcopy(http_options)
  650. http_options.port = int(os.getenv("TEST_WORKER_NODE_HTTP_PORT"))
  651. if (
  652. node_id != self._head_node_id
  653. and os.getenv("TEST_WORKER_NODE_GRPC_PORT") is not None
  654. ):
  655. logger.warning(
  656. f"`TEST_WORKER_NODE_GRPC_PORT` env var is set. "
  657. f"Using it for worker node {node_id}."
  658. f"{int(os.getenv('TEST_WORKER_NODE_GRPC_PORT'))}"
  659. )
  660. grpc_options = deepcopy(grpc_options)
  661. grpc_options.port = int(os.getenv("TEST_WORKER_NODE_GRPC_PORT"))
  662. return self._actor_proxy_wrapper_class(
  663. logging_config=self.logging_config,
  664. http_options=http_options,
  665. grpc_options=grpc_options,
  666. name=name,
  667. node_id=node_id,
  668. node_ip_address=node_ip_address,
  669. proxy_actor_class=self._proxy_actor_class,
  670. )
  671. def _start_proxies_if_needed(self, target_nodes) -> None:
  672. """Start a proxy on every node if it doesn't already exist."""
  673. for node_id, node_ip_address, node_instance_id in target_nodes:
  674. if node_id in self._proxy_states:
  675. continue
  676. name = self._generate_actor_name(node_id=node_id)
  677. actor_proxy_wrapper = self._start_proxy(
  678. name=name,
  679. node_id=node_id,
  680. node_ip_address=node_ip_address,
  681. )
  682. self._proxy_states[node_id] = ProxyState(
  683. actor_proxy_wrapper=actor_proxy_wrapper,
  684. actor_name=name,
  685. node_id=node_id,
  686. node_ip=node_ip_address,
  687. node_instance_id=node_instance_id,
  688. proxy_restart_count=self._proxy_restart_counts.get(node_id, 0),
  689. timer=self._timer,
  690. )
  691. def _stop_proxies_if_needed(self) -> bool:
  692. """Removes proxy actors.
  693. Removes proxy actors from any nodes that no longer exist or unhealthy proxy.
  694. """
  695. alive_node_ids = self._cluster_node_info_cache.get_alive_node_ids()
  696. to_stop = []
  697. for node_id, proxy_state in self._proxy_states.items():
  698. if node_id not in alive_node_ids:
  699. logger.info(f"Removing proxy on removed node '{node_id}'.")
  700. to_stop.append(node_id)
  701. elif proxy_state.status == ProxyStatus.UNHEALTHY:
  702. logger.info(
  703. f"Proxy on node '{node_id}' is unhealthy. Shutting down "
  704. "the unhealthy proxy and starting a new one."
  705. )
  706. to_stop.append(node_id)
  707. elif proxy_state.status == ProxyStatus.DRAINED:
  708. logger.info(f"Removing drained proxy on node '{node_id}'.")
  709. to_stop.append(node_id)
  710. for node_id in to_stop:
  711. proxy_state = self._proxy_states.pop(node_id)
  712. self._proxy_restart_counts[node_id] = proxy_state.proxy_restart_count + 1
  713. proxy_state.shutdown()
  714. def _try_set_exception(fut: asyncio.Future, e: Exception):
  715. if not fut.done():
  716. fut.set_exception(e)
  717. def wrap_as_future(ref: ObjectRef, timeout_s: Optional[float] = None) -> asyncio.Future:
  718. loop = asyncio.get_running_loop()
  719. aio_fut = asyncio.wrap_future(ref.future())
  720. if timeout_s is not None:
  721. assert timeout_s >= 0, "Timeout value should be non-negative"
  722. # Schedule handler to complete future exceptionally
  723. timeout_handler = loop.call_later(
  724. max(timeout_s, 0),
  725. _try_set_exception,
  726. aio_fut,
  727. TimeoutError(f"Future cancelled after timeout {timeout_s}s"),
  728. )
  729. # Cancel timeout handler upon completion of the future
  730. aio_fut.add_done_callback(lambda _: timeout_handler.cancel())
  731. return aio_fut