| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299 |
- import copy
- import json
- import logging
- import os
- from typing import Any, Dict
- import yaml
- from ray.autoscaler._private.loader import load_function_or_class
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Cache of instantiated providers, shared by all API calls made within one
# Python session.
_provider_instances = {}
# Smallest config skeleton used for "external" providers, kept compatible
# with legacy-style external configs: one empty head node type, one empty
# worker node type, and empty legacy head/worker node sections.
MINIMAL_EXTERNAL_CONFIG = {
    "available_node_types": {
        "ray.head.default": {},
        "ray.worker.default": {},
    },
    "head_node_type": "ray.head.default",
    "head_node": {},
    "worker_nodes": {},
}
def _import_aws(provider_config):
    """Return the AWS node provider class, checking that boto3 is importable.

    Args:
        provider_config: provider section of the autoscaler config (not
            inspected by this importer).

    Returns:
        The AWSNodeProvider class.

    Raises:
        ImportError: if boto3 cannot be imported.
    """
    try:
        # boto3 and botocore are imported in many places in the codebase;
        # probing the import here reports a missing install up front.
        import boto3  # noqa: F401
    except ImportError as import_err:
        install_hint = (
            "The Ray AWS VM launcher requires the AWS SDK for Python (Boto3) "
            "to be installed. You can install it with `pip install boto3`."
        )
        raise ImportError(install_hint) from import_err

    from ray.autoscaler._private.aws.node_provider import (
        AWSNodeProvider as provider_cls,
    )

    return provider_cls
def _import_gcp(provider_config):
    """Return the GCP node provider class, checking googleapiclient is importable.

    Args:
        provider_config: provider section of the autoscaler config (not
            inspected by this importer).

    Returns:
        The GCPNodeProvider class.

    Raises:
        ImportError: if googleapiclient cannot be imported.
    """
    try:
        # Imported only to verify that the dependency is installed.
        import googleapiclient  # noqa: F401
    except ImportError as import_err:
        install_hint = (
            "The Ray GCP VM launcher requires the Google API Client to be installed. "
            "You can install it with `pip install google-api-python-client`."
        )
        raise ImportError(install_hint) from import_err

    from ray.autoscaler._private.gcp.node_provider import (
        GCPNodeProvider as provider_cls,
    )

    return provider_cls
def _import_azure(provider_config):
    """Return the Azure node provider class, imported on first use.

    Args:
        provider_config: provider section of the autoscaler config (unused).
    """
    from ray.autoscaler._private._azure.node_provider import (
        AzureNodeProvider as provider_cls,
    )

    return provider_cls
def _import_vsphere(provider_config):
    """Return the vSphere node provider class, imported on first use.

    Args:
        provider_config: provider section of the autoscaler config (unused).
    """
    from ray.autoscaler._private.vsphere.node_provider import (
        VsphereWcpNodeProvider as provider_cls,
    )

    return provider_cls
def _import_local(provider_config):
    """Return the node provider class for the "local" provider type.

    Args:
        provider_config: provider section of the autoscaler config. If it
            contains a "coordinator_address" key, the coordinator-backed
            provider is selected.

    Returns:
        CoordinatorSenderNodeProvider when "coordinator_address" is present,
        otherwise LocalNodeProvider.
    """
    if "coordinator_address" not in provider_config:
        from ray.autoscaler._private.local.node_provider import LocalNodeProvider

        return LocalNodeProvider

    from ray.autoscaler._private.local.coordinator_node_provider import (
        CoordinatorSenderNodeProvider,
    )

    return CoordinatorSenderNodeProvider
def _import_readonly(provider_config):
    """Return the read-only node provider class, imported on first use.

    Args:
        provider_config: provider section of the autoscaler config (unused).
    """
    from ray.autoscaler._private.readonly.node_provider import (
        ReadOnlyNodeProvider as provider_cls,
    )

    return provider_cls
def _import_fake_multinode(provider_config):
    """Return the fake multi-node provider class, imported on first use.

    Args:
        provider_config: provider section of the autoscaler config (unused).
    """
    from ray.autoscaler._private.fake_multi_node.node_provider import (
        FakeMultiNodeProvider as provider_cls,
    )

    return provider_cls
def _import_fake_multinode_docker(provider_config):
    """Return the Docker-based fake multi-node provider class, imported on first use.

    Args:
        provider_config: provider section of the autoscaler config (unused).
    """
    from ray.autoscaler._private.fake_multi_node.node_provider import (
        FakeMultiNodeDockerProvider as provider_cls,
    )

    return provider_cls
def _import_kuberay(provider_config):
    """Return the KubeRay node provider class, imported on first use.

    Args:
        provider_config: provider section of the autoscaler config (unused).
    """
    from ray.autoscaler._private.kuberay.node_provider import (
        KubeRayNodeProvider as provider_cls,
    )

    return provider_cls
def _import_aliyun(provider_config):
    """Return the Aliyun node provider class, imported on first use.

    Args:
        provider_config: provider section of the autoscaler config (unused).
    """
    from ray.autoscaler._private.aliyun.node_provider import (
        AliyunNodeProvider as provider_cls,
    )

    return provider_cls
def _import_spark(provider_config):
    """Return the Spark node provider class, imported on first use.

    Args:
        provider_config: provider section of the autoscaler config (unused).
    """
    from ray.autoscaler._private.spark.node_provider import (
        SparkNodeProvider as provider_cls,
    )

    return provider_cls
def _load_fake_multinode_defaults_config():
    """Return the path of example.yaml in the fake_multi_node package directory.

    Despite the name, this only builds the path; the file is not read here.
    """
    import ray.autoscaler._private.fake_multi_node as fake_multinode_pkg

    package_dir = os.path.dirname(fake_multinode_pkg.__file__)
    return os.path.join(package_dir, "example.yaml")
def _load_read_only_defaults_config():
    """Return the path of defaults.yaml in the readonly package directory.

    Despite the name, this only builds the path; the file is not read here.
    """
    import ray.autoscaler._private.readonly as readonly_pkg

    package_dir = os.path.dirname(readonly_pkg.__file__)
    return os.path.join(package_dir, "defaults.yaml")
def _load_fake_multinode_docker_defaults_config():
    """Return the path of example_docker.yaml in the fake_multi_node package directory.

    Despite the name, this only builds the path; the file is not read here.
    """
    import ray.autoscaler._private.fake_multi_node as fake_multinode_pkg

    package_dir = os.path.dirname(fake_multinode_pkg.__file__)
    return os.path.join(package_dir, "example_docker.yaml")
def _load_local_defaults_config():
    """Return the path of defaults.yaml in the ray.autoscaler.local package directory.

    Despite the name, this only builds the path; the file is not read here.
    """
    import ray.autoscaler.local as local_pkg

    package_dir = os.path.dirname(local_pkg.__file__)
    return os.path.join(package_dir, "defaults.yaml")
def _load_aws_defaults_config():
    """Return the path of defaults.yaml in the ray.autoscaler.aws package directory.

    Despite the name, this only builds the path; the file is not read here.
    """
    import ray.autoscaler.aws as aws_pkg

    package_dir = os.path.dirname(aws_pkg.__file__)
    return os.path.join(package_dir, "defaults.yaml")
def _load_vsphere_defaults_config():
    """Return the path of defaults.yaml in the ray.autoscaler.vsphere package directory.

    Despite the name, this only builds the path; the file is not read here.
    """
    import ray.autoscaler.vsphere as vsphere_pkg

    package_dir = os.path.dirname(vsphere_pkg.__file__)
    return os.path.join(package_dir, "defaults.yaml")
def _load_gcp_defaults_config():
    """Return the path of defaults.yaml in the ray.autoscaler.gcp package directory.

    Despite the name, this only builds the path; the file is not read here.
    """
    import ray.autoscaler.gcp as gcp_pkg

    package_dir = os.path.dirname(gcp_pkg.__file__)
    return os.path.join(package_dir, "defaults.yaml")
def _load_azure_defaults_config():
    """Return the path of defaults.yaml in the ray.autoscaler.azure package directory.

    Despite the name, this only builds the path; the file is not read here.
    """
    import ray.autoscaler.azure as azure_pkg

    package_dir = os.path.dirname(azure_pkg.__file__)
    return os.path.join(package_dir, "defaults.yaml")
def _load_aliyun_defaults_config():
    """Return the path of defaults.yaml in the ray.autoscaler.aliyun package directory.

    Despite the name, this only builds the path; the file is not read here.
    """
    import ray.autoscaler.aliyun as aliyun_pkg

    package_dir = os.path.dirname(aliyun_pkg.__file__)
    return os.path.join(package_dir, "defaults.yaml")
def _import_external(provider_config):
    """Resolve a user-supplied provider via the config's "module" entry.

    Args:
        provider_config: provider section of the autoscaler config; its
            "module" value is passed to load_function_or_class as the path.

    Returns:
        Whatever load_function_or_class returns for that path.
    """
    module_path = provider_config["module"]
    return load_function_or_class(path=module_path)
# Registry mapping a provider "type" string to the importer function that
# returns the matching provider class. Each importer takes the provider config.
_NODE_PROVIDERS = {
    "local": _import_local,
    "fake_multinode": _import_fake_multinode,
    "fake_multinode_docker": _import_fake_multinode_docker,
    "readonly": _import_readonly,
    "aws": _import_aws,
    "gcp": _import_gcp,
    "vsphere": _import_vsphere,
    "azure": _import_azure,
    "kuberay": _import_kuberay,
    "aliyun": _import_aliyun,
    # Provider class is loaded from the module path given in the config.
    "external": _import_external,
    "spark": _import_spark,
}
# Human-readable display name for each provider "type" string.
_PROVIDER_PRETTY_NAMES = {
    "readonly": "Readonly (Manual Cluster Setup)",
    "fake_multinode": "Fake Multinode",
    "fake_multinode_docker": "Fake Multinode Docker",
    "local": "Local",
    "aws": "AWS",
    "gcp": "GCP",
    "azure": "Azure",
    "kuberay": "KubeRay",
    "aliyun": "Aliyun",
    "external": "External",
    "vsphere": "vSphere",
    "spark": "Spark",
}
# Registry mapping a provider "type" string to a function that returns the
# path of that provider's default config YAML file.
_DEFAULT_CONFIGS = {
    "fake_multinode": _load_fake_multinode_defaults_config,
    "fake_multinode_docker": _load_fake_multinode_docker_defaults_config,
    "local": _load_local_defaults_config,
    "aws": _load_aws_defaults_config,
    "gcp": _load_gcp_defaults_config,
    "azure": _load_azure_defaults_config,
    "aliyun": _load_aliyun_defaults_config,
    "vsphere": _load_vsphere_defaults_config,
    "readonly": _load_read_only_defaults_config,
}
def _get_node_provider_cls(provider_config: Dict[str, Any]):
    """Look up and return the node provider class for a provider config.

    Private node providers that proxy methods to built-in node providers may
    call this, so backwards compatibility should be maintained.

    Args:
        provider_config: provider section of the autoscaler config. Its
            "type" entry selects the importer in _NODE_PROVIDERS.

    Returns:
        NodeProvider class

    Raises:
        NotImplementedError: if the type has no entry in _NODE_PROVIDERS.
    """
    provider_type = provider_config["type"]
    importer = _NODE_PROVIDERS.get(provider_type)
    if importer is not None:
        return importer(provider_config)
    raise NotImplementedError(f"Unsupported node provider: {provider_type}")
def _get_node_provider(
    provider_config: Dict[str, Any], cluster_name: str, use_cache: bool = True
) -> Any:
    """Get the instantiated node provider for a given provider config.

    Note that this may be used by private node providers that proxy methods to
    built-in node providers, so we should maintain backwards compatibility.

    Args:
        provider_config: provider section of the autoscaler config.
        cluster_name: cluster name from the autoscaler config.
        use_cache: whether or not to use a cached definition if available. If
            False, the returned object will also not be stored in the cache.

    Returns:
        NodeProvider
    """
    if not use_cache:
        # Bypass the cache entirely: no lookup, no store, and no need to
        # serialize the config into a cache key. This avoids wasted work and
        # a TypeError for configs that are not JSON-serializable.
        provider_cls = _get_node_provider_cls(provider_config)
        return provider_cls(provider_config, cluster_name)

    # The key is the canonical (sorted-keys) JSON form of the config plus the
    # cluster name, so equal configs map to the same cached instance.
    provider_key = (json.dumps(provider_config, sort_keys=True), cluster_name)
    if provider_key in _provider_instances:
        return _provider_instances[provider_key]

    provider_cls = _get_node_provider_cls(provider_config)
    new_provider = provider_cls(provider_config, cluster_name)
    _provider_instances[provider_key] = new_provider
    return new_provider
def _clear_provider_cache():
    """Reset the provider cache by rebinding it to a new, empty dict."""
    global _provider_instances
    _provider_instances = dict()
def _get_default_config(provider_config):
    """Return the default autoscaler config for the given provider type.

    This is an INTERNAL API. It is not allowed to call this from any Ray
    package outside the autoscaler.

    Args:
        provider_config: provider section of the autoscaler config. Only its
            "type" entry is read here.

    Returns:
        For the "external" type, a deep copy of MINIMAL_EXTERNAL_CONFIG;
        otherwise the contents of the provider's default YAML file as parsed
        by yaml.safe_load.

    Raises:
        NotImplementedError: if the type has no entry in _DEFAULT_CONFIGS.
    """
    provider_type = provider_config["type"]
    if provider_type == "external":
        # Deep copy so callers can mutate the result without touching the
        # shared module-level template.
        return copy.deepcopy(MINIMAL_EXTERNAL_CONFIG)
    load_config = _DEFAULT_CONFIGS.get(provider_type)
    if load_config is None:
        raise NotImplementedError(
            "Unsupported node provider: {}".format(provider_type)
        )
    # Despite its name, the registered function returns a file path.
    path_to_default = load_config()
    # Read as UTF-8 explicitly instead of relying on the platform's locale
    # default encoding.
    with open(path_to_default, encoding="utf-8") as f:
        defaults = yaml.safe_load(f)
    return defaults
|