autoscaler.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. import logging
  2. from queue import Queue
  3. from typing import List, Optional
  4. from urllib.parse import urlsplit
  5. from ray._raylet import GcsClient
  6. from ray.autoscaler._private.providers import _get_node_provider
  7. from ray.autoscaler.v2.event_logger import AutoscalerEventLogger
  8. from ray.autoscaler.v2.instance_manager.cloud_providers.kuberay.cloud_provider import (
  9. KubeRayProvider,
  10. )
  11. from ray.autoscaler.v2.instance_manager.cloud_providers.read_only.cloud_provider import ( # noqa
  12. ReadOnlyProvider,
  13. )
  14. from ray.autoscaler.v2.instance_manager.config import (
  15. AutoscalingConfig,
  16. IConfigReader,
  17. Provider,
  18. )
  19. from ray.autoscaler.v2.instance_manager.instance_manager import (
  20. InstanceManager,
  21. InstanceUpdatedSubscriber,
  22. )
  23. from ray.autoscaler.v2.instance_manager.instance_storage import InstanceStorage
  24. from ray.autoscaler.v2.instance_manager.node_provider import (
  25. ICloudInstanceProvider,
  26. NodeProviderAdapter,
  27. )
  28. from ray.autoscaler.v2.instance_manager.ray_installer import RayInstaller
  29. from ray.autoscaler.v2.instance_manager.reconciler import Reconciler
  30. from ray.autoscaler.v2.instance_manager.storage import InMemoryStorage
  31. from ray.autoscaler.v2.instance_manager.subscribers.cloud_instance_updater import (
  32. CloudInstanceUpdater,
  33. )
  34. from ray.autoscaler.v2.instance_manager.subscribers.cloud_resource_monitor import (
  35. CloudResourceMonitor,
  36. )
  37. from ray.autoscaler.v2.instance_manager.subscribers.ray_stopper import RayStopper
  38. from ray.autoscaler.v2.instance_manager.subscribers.threaded_ray_installer import (
  39. ThreadedRayInstaller,
  40. )
  41. from ray.autoscaler.v2.metrics_reporter import AutoscalerMetricsReporter
  42. from ray.autoscaler.v2.scheduler import ResourceDemandScheduler
  43. from ray.autoscaler.v2.sdk import get_cluster_resource_state
  44. from ray.core.generated.autoscaler_pb2 import AutoscalingState
  45. logger = logging.getLogger(__name__)
  46. class Autoscaler:
  47. def __init__(
  48. self,
  49. session_name: str,
  50. config_reader: IConfigReader,
  51. gcs_client: GcsClient,
  52. event_logger: Optional[AutoscalerEventLogger] = None,
  53. metrics_reporter: Optional[AutoscalerMetricsReporter] = None,
  54. ) -> None:
  55. """
  56. Args:
  57. session_name: The current Ray session name.
  58. config_reader: The config reader.
  59. gcs_client: The GCS client.
  60. event_logger: The event logger for emitting cluster events.
  61. metrics_reporter: The metrics reporter for emitting cluster metrics.
  62. """
  63. self._config_reader = config_reader
  64. config = config_reader.get_cached_autoscaling_config()
  65. logger.info(f"Using Autoscaling Config: \n{config.dump()}")
  66. self._gcs_client = gcs_client
  67. self._cloud_instance_provider = None
  68. self._instance_manager = None
  69. self._ray_stop_errors_queue = Queue()
  70. self._ray_install_errors_queue = Queue()
  71. self._event_logger = event_logger
  72. self._metrics_reporter = metrics_reporter
  73. self._init_cloud_instance_provider(config, config_reader)
  74. self._cloud_resource_monitor = None
  75. self._init_instance_manager(
  76. session_name=session_name,
  77. config=config,
  78. cloud_provider=self._cloud_instance_provider,
  79. gcs_client=self._gcs_client,
  80. )
  81. self._scheduler = ResourceDemandScheduler(self._event_logger)
  82. def _init_cloud_instance_provider(
  83. self, config: AutoscalingConfig, config_reader: IConfigReader
  84. ):
  85. """
  86. Initialize the cloud provider, and its dependencies (the v1 node provider)
  87. Args:
  88. config: The autoscaling config.
  89. config_reader: The config reader.
  90. """
  91. provider_config = config.get_provider_config()
  92. if provider_config["type"] == "kuberay":
  93. provider_config["head_node_type"] = config.get_head_node_type()
  94. self._cloud_instance_provider = KubeRayProvider(
  95. config.get_config("cluster_name"),
  96. provider_config,
  97. )
  98. elif config.provider == Provider.READ_ONLY:
  99. provider_config["gcs_address"] = self._gcs_client.address
  100. self._cloud_instance_provider = ReadOnlyProvider(
  101. provider_config=provider_config,
  102. )
  103. else:
  104. node_provider_v1 = _get_node_provider(
  105. provider_config,
  106. config.get_config("cluster_name"),
  107. )
  108. self._cloud_instance_provider = NodeProviderAdapter(
  109. v1_provider=node_provider_v1,
  110. config_reader=config_reader,
  111. )
  112. def _init_instance_manager(
  113. self,
  114. session_name: str,
  115. cloud_provider: ICloudInstanceProvider,
  116. gcs_client: GcsClient,
  117. config: AutoscalingConfig,
  118. ):
  119. """
  120. Initialize the instance manager, and its dependencies.
  121. """
  122. instance_storage = InstanceStorage(
  123. cluster_id=session_name,
  124. storage=InMemoryStorage(),
  125. )
  126. subscribers: List[InstanceUpdatedSubscriber] = []
  127. subscribers.append(CloudInstanceUpdater(cloud_provider=cloud_provider))
  128. subscribers.append(
  129. RayStopper(gcs_client=gcs_client, error_queue=self._ray_stop_errors_queue)
  130. )
  131. if not config.disable_node_updaters() and isinstance(
  132. cloud_provider, NodeProviderAdapter
  133. ):
  134. head_node_ip = urlsplit("//" + self._gcs_client.address).hostname
  135. assert head_node_ip is not None, "Invalid GCS address format"
  136. subscribers.append(
  137. ThreadedRayInstaller(
  138. head_node_ip=head_node_ip,
  139. instance_storage=instance_storage,
  140. ray_installer=RayInstaller(
  141. provider=cloud_provider.v1_provider,
  142. config=config,
  143. ),
  144. error_queue=self._ray_install_errors_queue,
  145. # TODO(rueian): Rewrite the ThreadedRayInstaller and its underlying
  146. # NodeUpdater and CommandRunner to use the asyncio, so that we don't
  147. # need to use so many threads. We use so many threads now because
  148. # they are blocking and letting the new cloud machines to wait for
  149. # previous machines to finish installing Ray is quite inefficient.
  150. max_concurrent_installs=config.get_max_num_worker_nodes() or 50,
  151. )
  152. )
  153. self._cloud_resource_monitor = CloudResourceMonitor()
  154. subscribers.append(self._cloud_resource_monitor)
  155. self._instance_manager = InstanceManager(
  156. instance_storage=instance_storage,
  157. instance_status_update_subscribers=subscribers,
  158. )
  159. def update_autoscaling_state(
  160. self,
  161. ) -> Optional[AutoscalingState]:
  162. """Update the autoscaling state of the cluster by reconciling the current
  163. state of the cluster resources, the cloud providers as well as instance
  164. update subscribers with the desired state.
  165. Returns:
  166. AutoscalingState: The new autoscaling state of the cluster or None if
  167. the state is not updated.
  168. Raises:
  169. None: No exception.
  170. """
  171. try:
  172. ray_stop_errors = []
  173. while not self._ray_stop_errors_queue.empty():
  174. ray_stop_errors.append(self._ray_stop_errors_queue.get())
  175. ray_install_errors = []
  176. while not self._ray_install_errors_queue.empty():
  177. ray_install_errors.append(self._ray_install_errors_queue.get())
  178. # Get the current state of the ray cluster resources.
  179. ray_cluster_resource_state = get_cluster_resource_state(self._gcs_client)
  180. # Refresh the config from the source
  181. self._config_reader.refresh_cached_autoscaling_config()
  182. autoscaling_config = self._config_reader.get_cached_autoscaling_config()
  183. return Reconciler.reconcile(
  184. instance_manager=self._instance_manager,
  185. scheduler=self._scheduler,
  186. cloud_provider=self._cloud_instance_provider,
  187. cloud_resource_monitor=self._cloud_resource_monitor,
  188. ray_cluster_resource_state=ray_cluster_resource_state,
  189. non_terminated_cloud_instances=(
  190. self._cloud_instance_provider.get_non_terminated()
  191. ),
  192. cloud_provider_errors=self._cloud_instance_provider.poll_errors(),
  193. ray_install_errors=ray_install_errors,
  194. ray_stop_errors=ray_stop_errors,
  195. autoscaling_config=autoscaling_config,
  196. metrics_reporter=self._metrics_reporter,
  197. )
  198. except Exception as e:
  199. logger.exception(e)
  200. return None