| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- import copy
- import os
- from typing import Any, Dict, Tuple
- from ray._common.utils import get_ray_temp_dir
- from ray.autoscaler._private.cli_logger import cli_logger
- unsupported_field_message = "The field {} is not supported for on-premise clusters."
- LOCAL_CLUSTER_NODE_TYPE = "local.cluster.node"
- def prepare_local(config: Dict[str, Any]) -> Tuple[Dict[str, Any], bool]:
- """
- Prepare local cluster config for ingestion by cluster launcher and
- autoscaler.
- """
- config = copy.deepcopy(config)
- for field in "head_node", "worker_nodes", "available_node_types":
- if config.get(field):
- # If the config already contains the internal node type, it's been prepared via ray up already hence return as-is.
- if (
- field == "available_node_types"
- and LOCAL_CLUSTER_NODE_TYPE in config.get(field, {})
- ):
- return config, False
- err_msg = unsupported_field_message.format(field)
- cli_logger.abort(err_msg)
- # We use a config with a single node type for on-prem clusters.
- # Resources internally detected by Ray are not overridden by the autoscaler
- # (see NodeProvider.do_update)
- config["available_node_types"] = {
- LOCAL_CLUSTER_NODE_TYPE: {"node_config": {}, "resources": {}}
- }
- config["head_node_type"] = LOCAL_CLUSTER_NODE_TYPE
- if "coordinator_address" in config["provider"]:
- config = prepare_coordinator(config)
- else:
- config = prepare_manual(config)
- return config, True
- def prepare_coordinator(config: Dict[str, Any]) -> Dict[str, Any]:
- config = copy.deepcopy(config)
- # User should explicitly set the max number of workers for the coordinator
- # to allocate.
- if "max_workers" not in config:
- cli_logger.abort(
- "The field `max_workers` is required when using an "
- "automatically managed on-premise cluster."
- )
- node_type = config["available_node_types"][LOCAL_CLUSTER_NODE_TYPE]
- # The autoscaler no longer uses global `min_workers`.
- # Move `min_workers` to the node_type config.
- node_type["min_workers"] = config.pop("min_workers", 0)
- node_type["max_workers"] = config["max_workers"]
- return config
- def prepare_manual(config: Dict[str, Any]) -> Dict[str, Any]:
- """Validates and sets defaults for configs of manually managed on-prem
- clusters.
- - Checks for presence of required `worker_ips` and `head_ips` fields.
- - Defaults min and max workers to the number of `worker_ips`.
- - Caps min and max workers at the number of `worker_ips`.
- - Writes min and max worker info into the single worker node type.
- """
- config = copy.deepcopy(config)
- if ("worker_ips" not in config["provider"]) or (
- "head_ip" not in config["provider"]
- ):
- cli_logger.abort(
- "Please supply a `head_ip` and list of `worker_ips`. "
- "Alternatively, supply a `coordinator_address`."
- )
- num_ips = len(config["provider"]["worker_ips"])
- node_type = config["available_node_types"][LOCAL_CLUSTER_NODE_TYPE]
- # Default to keeping all provided ips in the cluster.
- config.setdefault("max_workers", num_ips)
- # The autoscaler no longer uses global `min_workers`.
- # We will move `min_workers` to the node_type config.
- min_workers = config.pop("min_workers", num_ips)
- max_workers = config["max_workers"]
- if min_workers > num_ips:
- cli_logger.warning(
- f"The value of `min_workers` supplied ({min_workers}) is greater"
- f" than the number of available worker ips ({num_ips})."
- f" Setting `min_workers={num_ips}`."
- )
- node_type["min_workers"] = num_ips
- else:
- node_type["min_workers"] = min_workers
- if max_workers > num_ips:
- cli_logger.warning(
- f"The value of `max_workers` supplied ({max_workers}) is greater"
- f" than the number of available worker ips ({num_ips})."
- f" Setting `max_workers={num_ips}`."
- )
- node_type["max_workers"] = num_ips
- config["max_workers"] = num_ips
- else:
- node_type["max_workers"] = max_workers
- if max_workers < num_ips:
- cli_logger.warning(
- f"The value of `max_workers` supplied ({max_workers}) is less"
- f" than the number of available worker ips ({num_ips})."
- f" At most {max_workers} Ray worker nodes will connect to the cluster."
- )
- return config
- def get_lock_path(cluster_name: str) -> str:
- return os.path.join(get_ray_temp_dir(), "cluster-{}.lock".format(cluster_name))
- def get_state_path(cluster_name: str) -> str:
- return os.path.join(get_ray_temp_dir(), "cluster-{}.state".format(cluster_name))
- def bootstrap_local(config: Dict[str, Any]) -> Dict[str, Any]:
- return config
|