# endpoint_state.py (3.7 KB)
  1. import logging
  2. from typing import Any, Dict, Optional
  3. from ray import cloudpickle
  4. from ray.serve._private.common import DeploymentID, EndpointInfo
  5. from ray.serve._private.constants import SERVE_LOGGER_NAME
  6. from ray.serve._private.long_poll import LongPollHost, LongPollNamespace
  7. from ray.serve._private.storage.kv_store import KVStoreBase
  8. CHECKPOINT_KEY = "serve-endpoint-state-checkpoint"
  9. logger = logging.getLogger(SERVE_LOGGER_NAME)
  10. class EndpointState:
  11. """Manages all state for endpoints in the system.
  12. This class is *not* thread safe, so any state-modifying methods should be
  13. called with a lock held.
  14. """
  15. def __init__(self, kv_store: KVStoreBase, long_poll_host: LongPollHost):
  16. self._kv_store = kv_store
  17. self._long_poll_host = long_poll_host
  18. self._endpoints: Dict[DeploymentID, EndpointInfo] = dict()
  19. checkpoint = self._kv_store.get(CHECKPOINT_KEY)
  20. if checkpoint is not None:
  21. self._endpoints = cloudpickle.loads(checkpoint)
  22. self._notify_route_table_changed()
  23. def shutdown(self):
  24. self._kv_store.delete(CHECKPOINT_KEY)
  25. def is_ready_for_shutdown(self) -> bool:
  26. """Returns whether the endpoint checkpoint has been deleted.
  27. Get the endpoint checkpoint from the kv store. If it is None, then it has been
  28. deleted.
  29. """
  30. return self._kv_store.get(CHECKPOINT_KEY) is None
  31. def _checkpoint(self):
  32. self._kv_store.put(CHECKPOINT_KEY, cloudpickle.dumps(self._endpoints))
  33. def _notify_route_table_changed(self):
  34. self._long_poll_host.notify_changed(
  35. {LongPollNamespace.ROUTE_TABLE: self._endpoints}
  36. )
  37. def _get_endpoint_for_route(self, route: str) -> Optional[DeploymentID]:
  38. for endpoint, info in self._endpoints.items():
  39. if info.route == route:
  40. return endpoint
  41. return None
  42. def update_endpoint(
  43. self, endpoint: DeploymentID, endpoint_info: EndpointInfo
  44. ) -> None:
  45. """Create or update the given endpoint.
  46. This method is idempotent - if the endpoint already exists it will be
  47. updated to match the given parameters. Calling this twice with the same
  48. arguments is a no-op.
  49. """
  50. if self._endpoints.get(endpoint) == endpoint_info:
  51. return
  52. existing_route_endpoint = self._get_endpoint_for_route(endpoint_info.route)
  53. if existing_route_endpoint is not None and existing_route_endpoint != endpoint:
  54. logger.debug(
  55. f'route_prefix "{endpoint_info.route}" is currently '
  56. f'registered to deployment "{existing_route_endpoint.name}". '
  57. f'Re-registering route_prefix "{endpoint_info.route}" to '
  58. f'deployment "{endpoint.name}".'
  59. )
  60. del self._endpoints[existing_route_endpoint]
  61. self._endpoints[endpoint] = endpoint_info
  62. self._checkpoint()
  63. self._notify_route_table_changed()
  64. def get_endpoint_route(self, endpoint: DeploymentID) -> Optional[str]:
  65. if endpoint in self._endpoints:
  66. return self._endpoints[endpoint].route
  67. return None
  68. def get_endpoints(self) -> Dict[DeploymentID, Dict[str, Any]]:
  69. endpoints = {}
  70. for endpoint, info in self._endpoints.items():
  71. endpoints[endpoint] = {
  72. "route": info.route,
  73. }
  74. return endpoints
  75. def delete_endpoint(self, endpoint: DeploymentID) -> None:
  76. # This method must be idempotent. We should validate that the
  77. # specified endpoint exists on the client.
  78. if endpoint not in self._endpoints:
  79. return
  80. del self._endpoints[endpoint]
  81. self._checkpoint()
  82. self._notify_route_table_changed()