node_provider.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. import logging
  2. from types import ModuleType
  3. from typing import Any, Dict, List, Optional
  4. from ray.autoscaler._private.command_runner import DockerCommandRunner, SSHCommandRunner
  5. from ray.autoscaler.command_runner import CommandRunnerInterface
  6. from ray.util.annotations import DeveloperAPI
  7. logger = logging.getLogger(__name__)
  8. @DeveloperAPI
  9. class NodeProvider:
  10. """Interface for getting and returning nodes from a Cloud.
  11. **Important**: This is an INTERNAL API that is only exposed for the purpose
  12. of implementing custom node providers. It is not allowed to call into
  13. NodeProvider methods from any Ray package outside the autoscaler, only to
  14. define new implementations of NodeProvider for use with the "external" node
  15. provider option.
  16. NodeProviders are namespaced by the `cluster_name` parameter; they only
  17. operate on nodes within that namespace.
  18. Nodes may be in one of three states: {pending, running, terminated}. Nodes
  19. appear immediately once started by `create_node`, and transition
  20. immediately to terminated when `terminate_node` is called.
  21. Threading and concurrency:
  22. - The autoscaler calls the following methods from multiple threads
  23. (NodeLauncher, NodeUpdaterThread, autoscaler main loop, and
  24. NodeProviderAdapter executors).
  25. - These methods MUST be thread-safe:
  26. non_terminated_nodes, is_running, is_terminated, node_tags, internal_ip,
  27. external_ip, get_node_id, create_node/create_node_with_resources_and_labels,
  28. set_node_tags, terminate_node/terminate_nodes.
  29. TODO (rueian): make sure all the existing implementations are thread-safe.
  30. """
  31. def __init__(self, provider_config: Dict[str, Any], cluster_name: str) -> None:
  32. self.provider_config = provider_config
  33. self.cluster_name = cluster_name
  34. self._internal_ip_cache: Dict[str, str] = {}
  35. self._external_ip_cache: Dict[str, str] = {}
  36. def is_readonly(self) -> bool:
  37. """Returns whether this provider is readonly.
  38. Readonly node providers do not allow nodes to be created or terminated.
  39. """
  40. return False
  41. def non_terminated_nodes(self, tag_filters: Dict[str, str]) -> List[str]:
  42. """Return a list of node ids filtered by the specified tags dict.
  43. This list must not include terminated nodes. For performance reasons,
  44. providers are allowed to cache the result of a call to
  45. non_terminated_nodes() to serve single-node queries
  46. (e.g. is_running(node_id)). This means that non_terminate_nodes() must
  47. be called again to refresh results.
  48. Examples:
  49. >>> from ray.autoscaler.node_provider import NodeProvider
  50. >>> from ray.autoscaler.tags import TAG_RAY_NODE_KIND
  51. >>> provider = NodeProvider(...) # doctest: +SKIP
  52. >>> provider.non_terminated_nodes( # doctest: +SKIP
  53. ... {TAG_RAY_NODE_KIND: "worker"})
  54. ["node-1", "node-2"]
  55. """
  56. raise NotImplementedError
  57. def is_running(self, node_id: str) -> bool:
  58. """Return whether the specified node is running."""
  59. raise NotImplementedError
  60. def is_terminated(self, node_id: str) -> bool:
  61. """Return whether the specified node is terminated."""
  62. raise NotImplementedError
  63. def node_tags(self, node_id: str) -> Dict[str, str]:
  64. """Returns the tags of the given node (string dict)."""
  65. raise NotImplementedError
  66. def external_ip(self, node_id: str) -> str:
  67. """Returns the external ip of the given node."""
  68. raise NotImplementedError
  69. def internal_ip(self, node_id: str) -> str:
  70. """Returns the internal ip (Ray ip) of the given node."""
  71. raise NotImplementedError
  72. def get_node_id(self, ip_address: str, use_internal_ip: bool = False) -> str:
  73. """Returns the node_id given an IP address.
  74. Assumes ip-address is unique per node.
  75. Args:
  76. ip_address: Address of node.
  77. use_internal_ip: Whether the ip address is
  78. public or private.
  79. Raises:
  80. ValueError: If not found.
  81. """
  82. def find_node_id():
  83. if use_internal_ip:
  84. return self._internal_ip_cache.get(ip_address)
  85. else:
  86. return self._external_ip_cache.get(ip_address)
  87. if not find_node_id():
  88. all_nodes = self.non_terminated_nodes({})
  89. ip_func = self.internal_ip if use_internal_ip else self.external_ip
  90. ip_cache = (
  91. self._internal_ip_cache if use_internal_ip else self._external_ip_cache
  92. )
  93. for node_id in all_nodes:
  94. ip_cache[ip_func(node_id)] = node_id
  95. if not find_node_id():
  96. if use_internal_ip:
  97. known_msg = f"Worker internal IPs: {list(self._internal_ip_cache)}"
  98. else:
  99. known_msg = f"Worker external IP: {list(self._external_ip_cache)}"
  100. raise ValueError(f"ip {ip_address} not found. " + known_msg)
  101. return find_node_id()
  102. def create_node(
  103. self, node_config: Dict[str, Any], tags: Dict[str, str], count: int
  104. ) -> Optional[Dict[str, Any]]:
  105. """Creates a number of nodes within the namespace.
  106. Optionally returns a mapping from created node ids to node metadata.
  107. Optionally may throw a
  108. ray.autoscaler.node_launch_exception.NodeLaunchException which the
  109. autoscaler may use to provide additional functionality such as
  110. observability.
  111. """
  112. raise NotImplementedError
  113. def create_node_with_resources_and_labels(
  114. self,
  115. node_config: Dict[str, Any],
  116. tags: Dict[str, str],
  117. count: int,
  118. resources: Dict[str, float],
  119. labels: Dict[str, str],
  120. ) -> Optional[Dict[str, Any]]:
  121. """Create nodes with a given resource and label config.
  122. This is the method actually called by the autoscaler. Prefer to
  123. implement this when possible directly, otherwise it delegates to the
  124. create_node() implementation.
  125. Optionally may throw a ray.autoscaler.node_launch_exception.NodeLaunchException.
  126. """
  127. return self.create_node(node_config, tags, count)
  128. def set_node_tags(self, node_id: str, tags: Dict[str, str]) -> None:
  129. """Sets the tag values (string dict) for the specified node."""
  130. raise NotImplementedError
  131. def terminate_node(self, node_id: str) -> Optional[Dict[str, Any]]:
  132. """Terminates the specified node.
  133. Optionally return a mapping from deleted node ids to node
  134. metadata.
  135. """
  136. raise NotImplementedError
  137. def terminate_nodes(self, node_ids: List[str]) -> Optional[Dict[str, Any]]:
  138. """Terminates a set of nodes.
  139. May be overridden with a batch method, which optionally may return a
  140. mapping from deleted node ids to node metadata.
  141. """
  142. for node_id in node_ids:
  143. logger.info("NodeProvider: {}: Terminating node".format(node_id))
  144. self.terminate_node(node_id)
  145. return None
  146. @property
  147. def max_terminate_nodes(self) -> Optional[int]:
  148. """The maximum number of nodes which can be terminated in one single
  149. API request. By default, this is "None", which means that the node
  150. provider's underlying API allows infinite requests to be terminated
  151. with one request.
  152. For example, AWS only allows 1000 nodes to be terminated
  153. at once; to terminate more, we must issue multiple separate API
  154. requests. If the limit is infinity, then simply set this to None.
  155. This may be overridden. The value may be useful when overriding the
  156. "terminate_nodes" method.
  157. """
  158. return None
  159. @staticmethod
  160. def bootstrap_config(cluster_config: Dict[str, Any]) -> Dict[str, Any]:
  161. """Bootstraps the cluster config by adding env defaults if needed."""
  162. return cluster_config
  163. def get_command_runner(
  164. self,
  165. log_prefix: str,
  166. node_id: str,
  167. auth_config: Dict[str, Any],
  168. cluster_name: str,
  169. process_runner: ModuleType,
  170. use_internal_ip: bool,
  171. docker_config: Optional[Dict[str, Any]] = None,
  172. ) -> CommandRunnerInterface:
  173. """Returns the CommandRunner class used to perform SSH commands.
  174. Args:
  175. log_prefix: stores "NodeUpdater: {}: ".format(<node_id>). Used
  176. to print progress in the CommandRunner.
  177. node_id: the node ID.
  178. auth_config: the authentication configs from the autoscaler
  179. yaml file.
  180. cluster_name: the name of the cluster.
  181. process_runner: the module to use to run the commands
  182. in the CommandRunner. E.g., subprocess.
  183. use_internal_ip: whether the node_id belongs to an internal ip
  184. or external ip.
  185. docker_config: If set, the docker information of the docker
  186. container that commands should be run on.
  187. """
  188. common_args = {
  189. "log_prefix": log_prefix,
  190. "node_id": node_id,
  191. "provider": self,
  192. "auth_config": auth_config,
  193. "cluster_name": cluster_name,
  194. "process_runner": process_runner,
  195. "use_internal_ip": use_internal_ip,
  196. }
  197. if docker_config and docker_config["container_name"] != "":
  198. return DockerCommandRunner(docker_config, **common_args)
  199. else:
  200. return SSHCommandRunner(**common_args)
  201. def prepare_for_head_node(self, cluster_config: Dict[str, Any]) -> Dict[str, Any]:
  202. """Returns a new cluster config with custom configs for head node."""
  203. return cluster_config
  204. @staticmethod
  205. def fillout_available_node_types_resources(
  206. cluster_config: Dict[str, Any]
  207. ) -> Dict[str, Any]:
  208. """Fills out missing "resources" field for available_node_types."""
  209. return cluster_config
  210. def safe_to_scale(self) -> bool:
  211. """Optional condition to determine if it's safe to proceed with an autoscaling
  212. update. Can be used to wait for convergence of state managed by an external
  213. cluster manager.
  214. Called by the autoscaler immediately after non_terminated_nodes().
  215. If False is returned, the autoscaler will abort the update.
  216. """
  217. return True
  218. def post_process(self) -> None:
  219. """This optional method is executed at the end of
  220. StandardAutoscaler._update().
  221. """
  222. pass