config.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552
  1. import copy
  2. import logging
  3. from abc import ABC, abstractmethod
  4. from dataclasses import dataclass, field
  5. from enum import Enum
  6. from pathlib import Path
  7. from typing import Any, Dict, List, Optional
  8. import yaml
  9. from ray._common.utils import binary_to_hex
  10. from ray._private.ray_constants import env_integer
  11. from ray._raylet import GcsClient
  12. from ray.autoscaler._private.constants import (
  13. AUTOSCALER_MAX_CONCURRENT_LAUNCHES,
  14. DEFAULT_UPSCALING_SPEED,
  15. DISABLE_LAUNCH_CONFIG_CHECK_KEY,
  16. DISABLE_NODE_UPDATERS_KEY,
  17. )
  18. from ray.autoscaler._private.kuberay.autoscaling_config import AutoscalingConfigProducer
  19. from ray.autoscaler._private.monitor import BASE_READONLY_CONFIG
  20. from ray.autoscaler._private.util import (
  21. format_readonly_node_type,
  22. hash_launch_conf,
  23. hash_runtime_conf,
  24. prepare_config,
  25. validate_config,
  26. )
  27. from ray.autoscaler.v2.schema import NodeType
  28. from ray.autoscaler.v2.sdk import get_cluster_resource_state
  29. from ray.autoscaler.v2.utils import is_head_node
  30. logger = logging.getLogger(__name__)
  31. class Provider(Enum):
  32. UNKNOWN = 0
  33. ALIYUN = 1
  34. AWS = 2
  35. AZURE = 3
  36. GCP = 4
  37. KUBERAY = 5
  38. LOCAL = 6
  39. READ_ONLY = 7
  40. class IConfigReader(ABC):
  41. """An interface for reading Autoscaling config.
  42. A utility class that reads autoscaling configs from various sources:
  43. - File
  44. - In-memory dict
  45. - Remote config service (e.g. KubeRay's config)
  46. Example:
  47. reader = FileConfigReader("path/to/config.yaml")
  48. # Get the recently cached config.
  49. config = reader.get_cached_autoscaling_config()
  50. ...
  51. # Refresh the cached config.
  52. reader.refresh_cached_autoscaling_config()
  53. config = reader.get_cached_autoscaling_config()
  54. """
  55. @abstractmethod
  56. def get_cached_autoscaling_config(self) -> "AutoscalingConfig":
  57. """Returns the recently read autoscaling config.
  58. Returns:
  59. AutoscalingConfig: The recently read autoscaling config.
  60. """
  61. pass
  62. @abstractmethod
  63. def refresh_cached_autoscaling_config(self):
  64. """Read the config from the source."""
  65. pass
  66. @dataclass(frozen=True)
  67. class InstanceReconcileConfig:
  68. # The timeout for waiting for a REQUESTED instance to be ALLOCATED.
  69. request_status_timeout_s: int = env_integer(
  70. "RAY_AUTOSCALER_RECONCILE_REQUEST_STATUS_TIMEOUT_S", 10 * 60
  71. )
  72. # The timeout for waiting for a ALLOCATED instance to be RAY_RUNNING.
  73. allocate_status_timeout_s: int = env_integer(
  74. "RAY_AUTOSCALER_RECONCILE_ALLOCATE_STATUS_TIMEOUT_S", 60 * 60
  75. )
  76. # The timeout for waiting for a RAY_INSTALLING instance to be RAY_RUNNING.
  77. ray_install_status_timeout_s: int = env_integer(
  78. "RAY_AUTOSCALER_RECONCILE_RAY_INSTALL_STATUS_TIMEOUT_S", 30 * 60
  79. )
  80. # The timeout for waiting for a TERMINATING instance to be TERMINATED.
  81. terminating_status_timeout_s: int = env_integer(
  82. "RAY_AUTOSCALER_RECONCILE_TERMINATING_STATUS_TIMEOUT_S", 300
  83. )
  84. # The timeout for waiting for a RAY_STOP_REQUESTED instance
  85. # to be RAY_STOPPING or RAY_STOPPED.
  86. ray_stop_requested_status_timeout_s: int = env_integer(
  87. "RAY_AUTOSCALER_RECONCILE_RAY_STOP_REQUESTED_STATUS_TIMEOUT_S", 300
  88. )
  89. # The interval for raise a warning when an instance in transient status
  90. # is not updated for a long time.
  91. transient_status_warn_interval_s: int = env_integer(
  92. "RAY_AUTOSCALER_RECONCILE_TRANSIENT_STATUS_WARN_INTERVAL_S", 90
  93. )
  94. # The number of times to retry requesting to allocate an instance.
  95. max_num_retry_request_to_allocate: int = env_integer(
  96. "RAY_AUTOSCALER_RECONCILE_MAX_NUM_RETRY_REQUEST_TO_ALLOCATE", 3
  97. )
  98. @dataclass
  99. class NodeTypeConfig:
  100. """
  101. NodeTypeConfig is the helper class to provide node type specific configs.
  102. This maps to subset of the `available_node_types` field in the
  103. autoscaling config.
  104. """
  105. # Node type name
  106. name: NodeType
  107. # The minimal number of worker nodes to be launched for this node type.
  108. min_worker_nodes: int
  109. # The maximal number of worker nodes can be launched for this node type.
  110. max_worker_nodes: int
  111. # Idle timeout seconds for worker nodes of this node type.
  112. idle_timeout_s: Optional[float] = None
  113. # The total resources on the node.
  114. resources: Dict[str, float] = field(default_factory=dict)
  115. # The labels on the node.
  116. labels: Dict[str, str] = field(default_factory=dict)
  117. # The node config's launch config hash. It's calculated from the auth
  118. # config, and the node's config in the `AutoscalingConfig` for the node
  119. # type when launching the node. It's used to detect config changes.
  120. launch_config_hash: str = ""
  121. def __post_init__(self):
  122. assert self.min_worker_nodes <= self.max_worker_nodes
  123. assert self.min_worker_nodes >= 0
  124. class AutoscalingConfig:
  125. """
  126. AutoscalingConfig is the helper class to provide autoscaling
  127. related configs.
  128. # TODO(rickyx):
  129. 1. Move the config validation logic here.
  130. 2. Deprecate the ray-schema.json for validation because it's
  131. static thus not possible to validate the config with interdependency
  132. of each other.
  133. """
  134. def __init__(
  135. self,
  136. configs: Dict[str, Any],
  137. skip_content_hash: bool = False,
  138. ) -> None:
  139. """
  140. Args:
  141. configs : The raw configs dict.
  142. skip_content_hash :
  143. Whether to skip file mounts/ray command hash calculation.
  144. """
  145. self._sync_continuously = False
  146. self.update_configs(configs, skip_content_hash)
  147. def update_configs(self, configs: Dict[str, Any], skip_content_hash: bool) -> None:
  148. self._configs = prepare_config(configs)
  149. validate_config(self._configs)
  150. if skip_content_hash:
  151. return
  152. self._calculate_hashes()
  153. self._sync_continuously = self._configs.get(
  154. "generate_file_mounts_contents_hash", True
  155. )
  156. def _calculate_hashes(self) -> None:
  157. logger.info("Calculating hashes for file mounts and ray commands.")
  158. self._runtime_hash, self._file_mounts_contents_hash = hash_runtime_conf(
  159. self._configs.get("file_mounts", {}),
  160. self._configs.get("cluster_synced_files", []),
  161. [
  162. self._configs.get("worker_setup_commands", []),
  163. self._configs.get("worker_start_ray_commands", []),
  164. ],
  165. generate_file_mounts_contents_hash=self._configs.get(
  166. "generate_file_mounts_contents_hash", True
  167. ),
  168. )
  169. def get_cloud_node_config(self, ray_node_type: NodeType) -> Dict[str, Any]:
  170. return copy.deepcopy(
  171. self.get_node_type_specific_config(ray_node_type, "node_config") or {}
  172. )
  173. def get_docker_config(self, ray_node_type: NodeType) -> Dict[str, Any]:
  174. """
  175. Return the docker config for the specified node type.
  176. If it's a head node, the image will be chosen in the following order:
  177. 1. Node specific docker image.
  178. 2. The 'docker' config's 'head_image' field.
  179. 3. The 'docker' config's 'image' field.
  180. If it's a worker node, the image will be chosen in the following order:
  181. 1. Node specific docker image.
  182. 2. The 'docker' config's 'worker_image' field.
  183. 3. The 'docker' config's 'image' field.
  184. """
  185. # TODO(rickyx): It's unfortunate we have multiple fields in ray-schema.json
  186. # that can specify docker images. We should consolidate them.
  187. docker_config = copy.deepcopy(self._configs.get("docker", {}))
  188. node_specific_docker_config = self._configs["available_node_types"][
  189. ray_node_type
  190. ].get("docker", {})
  191. # Override the global docker config with node specific docker config.
  192. docker_config.update(node_specific_docker_config)
  193. if self._configs.get("head_node_type") == ray_node_type:
  194. if "head_image" in docker_config:
  195. logger.info(
  196. "Overwriting image={} by head_image({}) for head node docker.".format( # noqa: E501
  197. docker_config["image"], docker_config["head_image"]
  198. )
  199. )
  200. docker_config["image"] = docker_config["head_image"]
  201. else:
  202. if "worker_image" in docker_config:
  203. logger.info(
  204. "Overwriting image={} by worker_image({}) for worker node docker.".format( # noqa: E501
  205. docker_config["image"], docker_config["worker_image"]
  206. )
  207. )
  208. docker_config["image"] = docker_config["worker_image"]
  209. # These fields should be merged.
  210. docker_config.pop("head_image", None)
  211. docker_config.pop("worker_image", None)
  212. return docker_config
  213. def get_worker_start_ray_commands(self) -> List[str]:
  214. return self._configs.get("worker_start_ray_commands", [])
  215. def get_head_setup_commands(self) -> List[str]:
  216. return self._configs.get("head_setup_commands", [])
  217. def get_head_start_ray_commands(self) -> List[str]:
  218. return self._configs.get("head_start_ray_commands", [])
  219. def get_worker_setup_commands(self, ray_node_type: NodeType) -> List[str]:
  220. """
  221. Return the worker setup commands for the specified node type.
  222. If the node type specific worker setup commands are not specified,
  223. return the global worker setup commands.
  224. """
  225. worker_setup_command = self.get_node_type_specific_config(
  226. ray_node_type, "worker_setup_commands"
  227. )
  228. if worker_setup_command is None:
  229. # Return global worker setup commands if node type specific
  230. # worker setup commands are not specified.
  231. logger.info(
  232. "Using global worker setup commands for {}".format(ray_node_type)
  233. )
  234. return self._configs.get("worker_setup_commands", [])
  235. return worker_setup_command
  236. def get_initialization_commands(self, ray_node_type: NodeType) -> List[str]:
  237. """
  238. Return the initialization commands for the specified node type.
  239. If the node type specific initialization commands are not specified,
  240. return the global initialization commands.
  241. """
  242. initialization_command = self.get_node_type_specific_config(
  243. ray_node_type, "initialization_commands"
  244. )
  245. if initialization_command is None:
  246. logger.info(
  247. "Using global initialization commands for {}".format(ray_node_type)
  248. )
  249. return self._configs.get("initialization_commands", [])
  250. return initialization_command
  251. def get_node_type_specific_config(
  252. self, ray_node_type: NodeType, config_name: str
  253. ) -> Optional[Any]:
  254. node_specific_config = self._configs["available_node_types"].get(
  255. ray_node_type, {}
  256. )
  257. return node_specific_config.get(config_name, None)
  258. def get_node_resources(self, ray_node_type: NodeType) -> Dict[str, float]:
  259. return copy.deepcopy(
  260. self.get_node_type_specific_config(ray_node_type, "resources") or {}
  261. )
  262. def get_node_labels(self, ray_node_type: NodeType) -> Dict[str, str]:
  263. return copy.deepcopy(
  264. self.get_node_type_specific_config(ray_node_type, "labels") or {}
  265. )
  266. def get_config(self, config_name, default=None) -> Any:
  267. return self._configs.get(config_name, default)
  268. def get_provider_instance_type(self, ray_node_type: NodeType) -> str:
  269. provider = self.provider
  270. node_config = self.get_node_type_specific_config(ray_node_type, "node_config")
  271. if provider in [Provider.AWS, Provider.ALIYUN]:
  272. return node_config.get("InstanceType", "")
  273. elif provider == Provider.AZURE:
  274. return node_config.get("azure_arm_parameters", {}).get("vmSize", "")
  275. elif provider == Provider.GCP:
  276. return node_config.get("machineType", "")
  277. elif provider in [Provider.KUBERAY, Provider.LOCAL, Provider.UNKNOWN]:
  278. return ""
  279. else:
  280. raise ValueError(f"Unknown provider {provider}")
  281. def get_node_type_configs(self) -> Dict[NodeType, NodeTypeConfig]:
  282. """
  283. Returns the node type configs from the `available_node_types` field.
  284. Returns:
  285. Dict[NodeType, NodeTypeConfig]: The node type configs.
  286. """
  287. available_node_types = self._configs.get("available_node_types", {})
  288. if not available_node_types:
  289. return None
  290. node_type_configs = {}
  291. auth_config = self._configs.get("auth", {})
  292. head_node_type = self.get_head_node_type()
  293. assert head_node_type
  294. for node_type, node_config in available_node_types.items():
  295. launch_config_hash = hash_launch_conf(
  296. node_config.get("node_config", {}), auth_config
  297. )
  298. max_workers_nodes = node_config.get("max_workers", 0)
  299. if head_node_type == node_type:
  300. max_workers_nodes += 1
  301. node_type_configs[node_type] = NodeTypeConfig(
  302. name=node_type,
  303. min_worker_nodes=node_config.get("min_workers", 0),
  304. max_worker_nodes=max_workers_nodes,
  305. idle_timeout_s=node_config.get("idle_timeout_s", None),
  306. resources=node_config.get("resources", {}),
  307. labels=node_config.get("labels", {}),
  308. launch_config_hash=launch_config_hash,
  309. )
  310. return node_type_configs
  311. def get_head_node_type(self) -> NodeType:
  312. """
  313. Returns the head node type.
  314. If there is only one node type, return the only node type as the head
  315. node type.
  316. If there are multiple node types, return the head node type specified
  317. in the config.
  318. """
  319. available_node_types = self._configs.get("available_node_types", {})
  320. if len(available_node_types) == 1:
  321. return list(available_node_types.keys())[0]
  322. return self._configs.get("head_node_type")
  323. def get_max_num_worker_nodes(self) -> Optional[int]:
  324. return self.get_config("max_workers", None)
  325. def get_max_num_nodes(self) -> Optional[int]:
  326. max_num_workers = self.get_max_num_worker_nodes()
  327. if max_num_workers is not None:
  328. return max_num_workers + 1 # For head node
  329. return None
  330. def get_raw_config_mutable(self) -> Dict[str, Any]:
  331. return self._configs
  332. def get_upscaling_speed(self) -> float:
  333. return self.get_config("upscaling_speed", DEFAULT_UPSCALING_SPEED)
  334. def get_max_concurrent_launches(self) -> int:
  335. return AUTOSCALER_MAX_CONCURRENT_LAUNCHES
  336. def disable_node_updaters(self) -> bool:
  337. provider_config = self._configs.get("provider", {})
  338. return provider_config.get(DISABLE_NODE_UPDATERS_KEY, False)
  339. def get_idle_timeout_s(self) -> Optional[float]:
  340. """
  341. Returns the idle timeout in seconds if present in config, otherwise None.
  342. """
  343. idle_timeout_s = self.get_config("idle_timeout_minutes", None)
  344. return idle_timeout_s * 60 if idle_timeout_s is not None else None
  345. def disable_launch_config_check(self) -> bool:
  346. provider_config = self.get_provider_config()
  347. return provider_config.get(DISABLE_LAUNCH_CONFIG_CHECK_KEY, True)
  348. def get_instance_reconcile_config(self) -> InstanceReconcileConfig:
  349. # TODO(rickyx): we need a way to customize these configs,
  350. # either extending the current ray-schema.json, or just use another
  351. # schema validation paths.
  352. return InstanceReconcileConfig()
  353. def get_provider_config(self) -> Dict[str, Any]:
  354. return self._configs.get("provider", {})
  355. def dump(self) -> str:
  356. return yaml.safe_dump(self._configs)
  357. @property
  358. def provider(self) -> Provider:
  359. provider_str = self._configs.get("provider", {}).get("type", "")
  360. if provider_str == "local":
  361. return Provider.LOCAL
  362. elif provider_str == "aws":
  363. return Provider.AWS
  364. elif provider_str == "azure":
  365. return Provider.AZURE
  366. elif provider_str == "gcp":
  367. return Provider.GCP
  368. elif provider_str == "aliyun":
  369. return Provider.ALIYUN
  370. elif provider_str == "kuberay":
  371. return Provider.KUBERAY
  372. elif provider_str == "readonly":
  373. return Provider.READ_ONLY
  374. else:
  375. return Provider.UNKNOWN
  376. @property
  377. def runtime_hash(self) -> str:
  378. if not hasattr(self, "_runtime_hash"):
  379. self._calculate_hashes()
  380. return self._runtime_hash
  381. @property
  382. def file_mounts_contents_hash(self) -> str:
  383. if not hasattr(self, "_file_mounts_contents_hash"):
  384. self._calculate_hashes()
  385. return self._file_mounts_contents_hash
  386. class FileConfigReader(IConfigReader):
  387. """A class that reads cluster config from a yaml file."""
  388. def __init__(self, config_file: str, skip_content_hash: bool = True) -> None:
  389. """
  390. Args:
  391. config_file: The path to the config file.
  392. skip_content_hash: Whether to skip file mounts/ray command
  393. hash calculation. Default to True.
  394. """
  395. self._config_file_path = Path(config_file).resolve()
  396. self._skip_content_hash = skip_content_hash
  397. self._cached_config = self._read()
  398. def _read(self) -> AutoscalingConfig:
  399. with open(self._config_file_path) as f:
  400. config = yaml.safe_load(f.read())
  401. return AutoscalingConfig(config, skip_content_hash=self._skip_content_hash)
  402. def get_cached_autoscaling_config(self) -> AutoscalingConfig:
  403. """
  404. Returns:
  405. AutoscalingConfig: The autoscaling config.
  406. """
  407. return self._cached_config
  408. def refresh_cached_autoscaling_config(self):
  409. self._cached_config = self._read()
  410. class KubeRayConfigReader(IConfigReader):
  411. """A class that reads cluster config from a K8s RayCluster CR."""
  412. def __init__(self, config_producer: AutoscalingConfigProducer):
  413. self._config_producer = config_producer
  414. self._cached_config = self._generate_configs_from_k8s()
  415. def _generate_configs_from_k8s(self) -> AutoscalingConfig:
  416. return AutoscalingConfig(self._config_producer())
  417. def get_cached_autoscaling_config(self) -> AutoscalingConfig:
  418. """
  419. Returns:
  420. AutoscalingConfig: The autoscaling config.
  421. """
  422. return self._cached_config
  423. def refresh_cached_autoscaling_config(self):
  424. """
  425. Reads the configs from the K8s RayCluster CR.
  426. This reads from the K8s API server every time to pick up changes.
  427. """
  428. self._cached_config = self._generate_configs_from_k8s()
  429. class ReadOnlyProviderConfigReader(IConfigReader):
  430. """A class that reads cluster config for a read-only provider.
  431. This is used for laptop mode / manual cluster setup modes, in order to
  432. provide status reporting in the same way for users."""
  433. def __init__(self, gcs_address: str):
  434. self._configs = BASE_READONLY_CONFIG
  435. self._gcs_client = GcsClient(address=gcs_address)
  436. def refresh_cached_autoscaling_config(self) -> AutoscalingConfig:
  437. # Update the config with node types from GCS.
  438. ray_cluster_resource_state = get_cluster_resource_state(self._gcs_client)
  439. # Format each node type's config from the running nodes.
  440. available_node_types = {}
  441. head_node_type = None
  442. for node_state in ray_cluster_resource_state.node_states:
  443. node_type = node_state.ray_node_type_name
  444. if not node_type:
  445. node_type = format_readonly_node_type(binary_to_hex(node_state.node_id))
  446. if is_head_node(node_state):
  447. head_node_type = node_type
  448. if node_type not in available_node_types:
  449. available_node_types[node_type] = {
  450. "resources": dict(node_state.total_resources),
  451. "min_workers": 0,
  452. "max_workers": 0 if is_head_node(node_state) else 1,
  453. "node_config": {},
  454. }
  455. elif not is_head_node(node_state):
  456. available_node_types[node_type]["max_workers"] += 1
  457. if available_node_types:
  458. self._configs["available_node_types"].update(available_node_types)
  459. self._configs["max_workers"] = len(available_node_types)
  460. assert head_node_type, "Head node type should be found."
  461. self._configs["head_node_type"] = head_node_type
  462. # Don't idle terminated nodes in read-only mode.
  463. self._configs.pop("idle_timeout_minutes", None)
  464. def get_cached_autoscaling_config(self) -> AutoscalingConfig:
  465. return AutoscalingConfig(self._configs, skip_content_hash=True)