| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274 |
- import logging
- from types import ModuleType
- from typing import Any, Dict, List, Optional
- from ray.autoscaler._private.command_runner import DockerCommandRunner, SSHCommandRunner
- from ray.autoscaler.command_runner import CommandRunnerInterface
- from ray.util.annotations import DeveloperAPI
- logger = logging.getLogger(__name__)
- @DeveloperAPI
- class NodeProvider:
- """Interface for getting and returning nodes from a Cloud.
- **Important**: This is an INTERNAL API that is only exposed for the purpose
- of implementing custom node providers. It is not allowed to call into
- NodeProvider methods from any Ray package outside the autoscaler, only to
- define new implementations of NodeProvider for use with the "external" node
- provider option.
- NodeProviders are namespaced by the `cluster_name` parameter; they only
- operate on nodes within that namespace.
- Nodes may be in one of three states: {pending, running, terminated}. Nodes
- appear immediately once started by `create_node`, and transition
- immediately to terminated when `terminate_node` is called.
- Threading and concurrency:
- - The autoscaler calls the following methods from multiple threads
- (NodeLauncher, NodeUpdaterThread, autoscaler main loop, and
- NodeProviderAdapter executors).
- - These methods MUST be thread-safe:
- non_terminated_nodes, is_running, is_terminated, node_tags, internal_ip,
- external_ip, get_node_id, create_node/create_node_with_resources_and_labels,
- set_node_tags, terminate_node/terminate_nodes.
- TODO (rueian): make sure all the existing implementations are thread-safe.
- """
- def __init__(self, provider_config: Dict[str, Any], cluster_name: str) -> None:
- self.provider_config = provider_config
- self.cluster_name = cluster_name
- self._internal_ip_cache: Dict[str, str] = {}
- self._external_ip_cache: Dict[str, str] = {}
- def is_readonly(self) -> bool:
- """Returns whether this provider is readonly.
- Readonly node providers do not allow nodes to be created or terminated.
- """
- return False
- def non_terminated_nodes(self, tag_filters: Dict[str, str]) -> List[str]:
- """Return a list of node ids filtered by the specified tags dict.
- This list must not include terminated nodes. For performance reasons,
- providers are allowed to cache the result of a call to
- non_terminated_nodes() to serve single-node queries
- (e.g. is_running(node_id)). This means that non_terminate_nodes() must
- be called again to refresh results.
- Examples:
- >>> from ray.autoscaler.node_provider import NodeProvider
- >>> from ray.autoscaler.tags import TAG_RAY_NODE_KIND
- >>> provider = NodeProvider(...) # doctest: +SKIP
- >>> provider.non_terminated_nodes( # doctest: +SKIP
- ... {TAG_RAY_NODE_KIND: "worker"})
- ["node-1", "node-2"]
- """
- raise NotImplementedError
- def is_running(self, node_id: str) -> bool:
- """Return whether the specified node is running."""
- raise NotImplementedError
- def is_terminated(self, node_id: str) -> bool:
- """Return whether the specified node is terminated."""
- raise NotImplementedError
- def node_tags(self, node_id: str) -> Dict[str, str]:
- """Returns the tags of the given node (string dict)."""
- raise NotImplementedError
- def external_ip(self, node_id: str) -> str:
- """Returns the external ip of the given node."""
- raise NotImplementedError
- def internal_ip(self, node_id: str) -> str:
- """Returns the internal ip (Ray ip) of the given node."""
- raise NotImplementedError
- def get_node_id(self, ip_address: str, use_internal_ip: bool = False) -> str:
- """Returns the node_id given an IP address.
- Assumes ip-address is unique per node.
- Args:
- ip_address: Address of node.
- use_internal_ip: Whether the ip address is
- public or private.
- Raises:
- ValueError: If not found.
- """
- def find_node_id():
- if use_internal_ip:
- return self._internal_ip_cache.get(ip_address)
- else:
- return self._external_ip_cache.get(ip_address)
- if not find_node_id():
- all_nodes = self.non_terminated_nodes({})
- ip_func = self.internal_ip if use_internal_ip else self.external_ip
- ip_cache = (
- self._internal_ip_cache if use_internal_ip else self._external_ip_cache
- )
- for node_id in all_nodes:
- ip_cache[ip_func(node_id)] = node_id
- if not find_node_id():
- if use_internal_ip:
- known_msg = f"Worker internal IPs: {list(self._internal_ip_cache)}"
- else:
- known_msg = f"Worker external IP: {list(self._external_ip_cache)}"
- raise ValueError(f"ip {ip_address} not found. " + known_msg)
- return find_node_id()
- def create_node(
- self, node_config: Dict[str, Any], tags: Dict[str, str], count: int
- ) -> Optional[Dict[str, Any]]:
- """Creates a number of nodes within the namespace.
- Optionally returns a mapping from created node ids to node metadata.
- Optionally may throw a
- ray.autoscaler.node_launch_exception.NodeLaunchException which the
- autoscaler may use to provide additional functionality such as
- observability.
- """
- raise NotImplementedError
- def create_node_with_resources_and_labels(
- self,
- node_config: Dict[str, Any],
- tags: Dict[str, str],
- count: int,
- resources: Dict[str, float],
- labels: Dict[str, str],
- ) -> Optional[Dict[str, Any]]:
- """Create nodes with a given resource and label config.
- This is the method actually called by the autoscaler. Prefer to
- implement this when possible directly, otherwise it delegates to the
- create_node() implementation.
- Optionally may throw a ray.autoscaler.node_launch_exception.NodeLaunchException.
- """
- return self.create_node(node_config, tags, count)
- def set_node_tags(self, node_id: str, tags: Dict[str, str]) -> None:
- """Sets the tag values (string dict) for the specified node."""
- raise NotImplementedError
- def terminate_node(self, node_id: str) -> Optional[Dict[str, Any]]:
- """Terminates the specified node.
- Optionally return a mapping from deleted node ids to node
- metadata.
- """
- raise NotImplementedError
- def terminate_nodes(self, node_ids: List[str]) -> Optional[Dict[str, Any]]:
- """Terminates a set of nodes.
- May be overridden with a batch method, which optionally may return a
- mapping from deleted node ids to node metadata.
- """
- for node_id in node_ids:
- logger.info("NodeProvider: {}: Terminating node".format(node_id))
- self.terminate_node(node_id)
- return None
- @property
- def max_terminate_nodes(self) -> Optional[int]:
- """The maximum number of nodes which can be terminated in one single
- API request. By default, this is "None", which means that the node
- provider's underlying API allows infinite requests to be terminated
- with one request.
- For example, AWS only allows 1000 nodes to be terminated
- at once; to terminate more, we must issue multiple separate API
- requests. If the limit is infinity, then simply set this to None.
- This may be overridden. The value may be useful when overriding the
- "terminate_nodes" method.
- """
- return None
- @staticmethod
- def bootstrap_config(cluster_config: Dict[str, Any]) -> Dict[str, Any]:
- """Bootstraps the cluster config by adding env defaults if needed."""
- return cluster_config
- def get_command_runner(
- self,
- log_prefix: str,
- node_id: str,
- auth_config: Dict[str, Any],
- cluster_name: str,
- process_runner: ModuleType,
- use_internal_ip: bool,
- docker_config: Optional[Dict[str, Any]] = None,
- ) -> CommandRunnerInterface:
- """Returns the CommandRunner class used to perform SSH commands.
- Args:
- log_prefix: stores "NodeUpdater: {}: ".format(<node_id>). Used
- to print progress in the CommandRunner.
- node_id: the node ID.
- auth_config: the authentication configs from the autoscaler
- yaml file.
- cluster_name: the name of the cluster.
- process_runner: the module to use to run the commands
- in the CommandRunner. E.g., subprocess.
- use_internal_ip: whether the node_id belongs to an internal ip
- or external ip.
- docker_config: If set, the docker information of the docker
- container that commands should be run on.
- """
- common_args = {
- "log_prefix": log_prefix,
- "node_id": node_id,
- "provider": self,
- "auth_config": auth_config,
- "cluster_name": cluster_name,
- "process_runner": process_runner,
- "use_internal_ip": use_internal_ip,
- }
- if docker_config and docker_config["container_name"] != "":
- return DockerCommandRunner(docker_config, **common_args)
- else:
- return SSHCommandRunner(**common_args)
- def prepare_for_head_node(self, cluster_config: Dict[str, Any]) -> Dict[str, Any]:
- """Returns a new cluster config with custom configs for head node."""
- return cluster_config
- @staticmethod
- def fillout_available_node_types_resources(
- cluster_config: Dict[str, Any]
- ) -> Dict[str, Any]:
- """Fills out missing "resources" field for available_node_types."""
- return cluster_config
- def safe_to_scale(self) -> bool:
- """Optional condition to determine if it's safe to proceed with an autoscaling
- update. Can be used to wait for convergence of state managed by an external
- cluster manager.
- Called by the autoscaler immediately after non_terminated_nodes().
- If False is returned, the autoscaler will abort the update.
- """
- return True
- def post_process(self) -> None:
- """This optional method is executed at the end of
- StandardAutoscaler._update().
- """
- pass
|