# providers.py
  1. import copy
  2. import json
  3. import logging
  4. import os
  5. from typing import Any, Dict
  6. import yaml
  7. from ray.autoscaler._private.loader import load_function_or_class
  8. logger = logging.getLogger(__name__)
  9. # For caching provider instantiations across API calls of one python session
  10. _provider_instances = {}
  11. # Minimal config for compatibility with legacy-style external configs.
  12. MINIMAL_EXTERNAL_CONFIG = {
  13. "available_node_types": {
  14. "ray.head.default": {},
  15. "ray.worker.default": {},
  16. },
  17. "head_node_type": "ray.head.default",
  18. "head_node": {},
  19. "worker_nodes": {},
  20. }
  21. def _import_aws(provider_config):
  22. try:
  23. # boto3 and botocore are imported in multiple places in the codebase,
  24. # so we just import them here to ensure that they are installed.
  25. import boto3 # noqa: F401
  26. except ImportError as e:
  27. raise ImportError(
  28. "The Ray AWS VM launcher requires the AWS SDK for Python (Boto3) "
  29. "to be installed. You can install it with `pip install boto3`."
  30. ) from e
  31. from ray.autoscaler._private.aws.node_provider import AWSNodeProvider
  32. return AWSNodeProvider
  33. def _import_gcp(provider_config):
  34. try:
  35. import googleapiclient # noqa: F401
  36. except ImportError as e:
  37. raise ImportError(
  38. "The Ray GCP VM launcher requires the Google API Client to be installed. "
  39. "You can install it with `pip install google-api-python-client`."
  40. ) from e
  41. from ray.autoscaler._private.gcp.node_provider import GCPNodeProvider
  42. return GCPNodeProvider
  43. def _import_azure(provider_config):
  44. from ray.autoscaler._private._azure.node_provider import AzureNodeProvider
  45. return AzureNodeProvider
  46. def _import_vsphere(provider_config):
  47. from ray.autoscaler._private.vsphere.node_provider import VsphereWcpNodeProvider
  48. return VsphereWcpNodeProvider
  49. def _import_local(provider_config):
  50. if "coordinator_address" in provider_config:
  51. from ray.autoscaler._private.local.coordinator_node_provider import (
  52. CoordinatorSenderNodeProvider,
  53. )
  54. return CoordinatorSenderNodeProvider
  55. else:
  56. from ray.autoscaler._private.local.node_provider import LocalNodeProvider
  57. return LocalNodeProvider
  58. def _import_readonly(provider_config):
  59. from ray.autoscaler._private.readonly.node_provider import ReadOnlyNodeProvider
  60. return ReadOnlyNodeProvider
  61. def _import_fake_multinode(provider_config):
  62. from ray.autoscaler._private.fake_multi_node.node_provider import (
  63. FakeMultiNodeProvider,
  64. )
  65. return FakeMultiNodeProvider
  66. def _import_fake_multinode_docker(provider_config):
  67. from ray.autoscaler._private.fake_multi_node.node_provider import (
  68. FakeMultiNodeDockerProvider,
  69. )
  70. return FakeMultiNodeDockerProvider
  71. def _import_kuberay(provider_config):
  72. from ray.autoscaler._private.kuberay.node_provider import KubeRayNodeProvider
  73. return KubeRayNodeProvider
  74. def _import_aliyun(provider_config):
  75. from ray.autoscaler._private.aliyun.node_provider import AliyunNodeProvider
  76. return AliyunNodeProvider
  77. def _import_spark(provider_config):
  78. from ray.autoscaler._private.spark.node_provider import SparkNodeProvider
  79. return SparkNodeProvider
  80. def _load_fake_multinode_defaults_config():
  81. import ray.autoscaler._private.fake_multi_node as ray_fake_multinode
  82. return os.path.join(os.path.dirname(ray_fake_multinode.__file__), "example.yaml")
  83. def _load_read_only_defaults_config():
  84. import ray.autoscaler._private.readonly as ray_readonly
  85. return os.path.join(os.path.dirname(ray_readonly.__file__), "defaults.yaml")
  86. def _load_fake_multinode_docker_defaults_config():
  87. import ray.autoscaler._private.fake_multi_node as ray_fake_multinode
  88. return os.path.join(
  89. os.path.dirname(ray_fake_multinode.__file__), "example_docker.yaml"
  90. )
  91. def _load_local_defaults_config():
  92. import ray.autoscaler.local as ray_local
  93. return os.path.join(os.path.dirname(ray_local.__file__), "defaults.yaml")
  94. def _load_aws_defaults_config():
  95. import ray.autoscaler.aws as ray_aws
  96. return os.path.join(os.path.dirname(ray_aws.__file__), "defaults.yaml")
  97. def _load_vsphere_defaults_config():
  98. import ray.autoscaler.vsphere as ray_vsphere
  99. return os.path.join(os.path.dirname(ray_vsphere.__file__), "defaults.yaml")
  100. def _load_gcp_defaults_config():
  101. import ray.autoscaler.gcp as ray_gcp
  102. return os.path.join(os.path.dirname(ray_gcp.__file__), "defaults.yaml")
  103. def _load_azure_defaults_config():
  104. import ray.autoscaler.azure as ray_azure
  105. return os.path.join(os.path.dirname(ray_azure.__file__), "defaults.yaml")
  106. def _load_aliyun_defaults_config():
  107. import ray.autoscaler.aliyun as ray_aliyun
  108. return os.path.join(os.path.dirname(ray_aliyun.__file__), "defaults.yaml")
  109. def _import_external(provider_config):
  110. provider_cls = load_function_or_class(path=provider_config["module"])
  111. return provider_cls
  112. _NODE_PROVIDERS = {
  113. "local": _import_local,
  114. "fake_multinode": _import_fake_multinode,
  115. "fake_multinode_docker": _import_fake_multinode_docker,
  116. "readonly": _import_readonly,
  117. "aws": _import_aws,
  118. "gcp": _import_gcp,
  119. "vsphere": _import_vsphere,
  120. "azure": _import_azure,
  121. "kuberay": _import_kuberay,
  122. "aliyun": _import_aliyun,
  123. "external": _import_external, # Import an external module
  124. "spark": _import_spark,
  125. }
  126. _PROVIDER_PRETTY_NAMES = {
  127. "readonly": "Readonly (Manual Cluster Setup)",
  128. "fake_multinode": "Fake Multinode",
  129. "fake_multinode_docker": "Fake Multinode Docker",
  130. "local": "Local",
  131. "aws": "AWS",
  132. "gcp": "GCP",
  133. "azure": "Azure",
  134. "kuberay": "KubeRay",
  135. "aliyun": "Aliyun",
  136. "external": "External",
  137. "vsphere": "vSphere",
  138. "spark": "Spark",
  139. }
  140. _DEFAULT_CONFIGS = {
  141. "fake_multinode": _load_fake_multinode_defaults_config,
  142. "fake_multinode_docker": _load_fake_multinode_docker_defaults_config,
  143. "local": _load_local_defaults_config,
  144. "aws": _load_aws_defaults_config,
  145. "gcp": _load_gcp_defaults_config,
  146. "azure": _load_azure_defaults_config,
  147. "aliyun": _load_aliyun_defaults_config,
  148. "vsphere": _load_vsphere_defaults_config,
  149. "readonly": _load_read_only_defaults_config,
  150. }
  151. def _get_node_provider_cls(provider_config: Dict[str, Any]):
  152. """Get the node provider class for a given provider config.
  153. Note that this may be used by private node providers that proxy methods to
  154. built-in node providers, so we should maintain backwards compatibility.
  155. Args:
  156. provider_config: provider section of the autoscaler config.
  157. Returns:
  158. NodeProvider class
  159. """
  160. importer = _NODE_PROVIDERS.get(provider_config["type"])
  161. if importer is None:
  162. raise NotImplementedError(
  163. "Unsupported node provider: {}".format(provider_config["type"])
  164. )
  165. return importer(provider_config)
  166. def _get_node_provider(
  167. provider_config: Dict[str, Any], cluster_name: str, use_cache: bool = True
  168. ) -> Any:
  169. """Get the instantiated node provider for a given provider config.
  170. Note that this may be used by private node providers that proxy methods to
  171. built-in node providers, so we should maintain backwards compatibility.
  172. Args:
  173. provider_config: provider section of the autoscaler config.
  174. cluster_name: cluster name from the autoscaler config.
  175. use_cache: whether or not to use a cached definition if available. If
  176. False, the returned object will also not be stored in the cache.
  177. Returns:
  178. NodeProvider
  179. """
  180. provider_key = (json.dumps(provider_config, sort_keys=True), cluster_name)
  181. if use_cache and provider_key in _provider_instances:
  182. return _provider_instances[provider_key]
  183. provider_cls = _get_node_provider_cls(provider_config)
  184. new_provider = provider_cls(provider_config, cluster_name)
  185. if use_cache:
  186. _provider_instances[provider_key] = new_provider
  187. return new_provider
  188. def _clear_provider_cache():
  189. global _provider_instances
  190. _provider_instances = {}
  191. def _get_default_config(provider_config):
  192. """Retrieve a node provider.
  193. This is an INTERNAL API. It is not allowed to call this from any Ray
  194. package outside the autoscaler.
  195. """
  196. if provider_config["type"] == "external":
  197. return copy.deepcopy(MINIMAL_EXTERNAL_CONFIG)
  198. load_config = _DEFAULT_CONFIGS.get(provider_config["type"])
  199. if load_config is None:
  200. raise NotImplementedError(
  201. "Unsupported node provider: {}".format(provider_config["type"])
  202. )
  203. path_to_default = load_config()
  204. with open(path_to_default) as f:
  205. defaults = yaml.safe_load(f)
  206. return defaults