config.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. import copy
  2. import os
  3. from typing import Any, Dict, Tuple
  4. from ray._common.utils import get_ray_temp_dir
  5. from ray.autoscaler._private.cli_logger import cli_logger
  6. unsupported_field_message = "The field {} is not supported for on-premise clusters."
  7. LOCAL_CLUSTER_NODE_TYPE = "local.cluster.node"
  8. def prepare_local(config: Dict[str, Any]) -> Tuple[Dict[str, Any], bool]:
  9. """
  10. Prepare local cluster config for ingestion by cluster launcher and
  11. autoscaler.
  12. """
  13. config = copy.deepcopy(config)
  14. for field in "head_node", "worker_nodes", "available_node_types":
  15. if config.get(field):
  16. # If the config already contains the internal node type, it's been prepared via ray up already hence return as-is.
  17. if (
  18. field == "available_node_types"
  19. and LOCAL_CLUSTER_NODE_TYPE in config.get(field, {})
  20. ):
  21. return config, False
  22. err_msg = unsupported_field_message.format(field)
  23. cli_logger.abort(err_msg)
  24. # We use a config with a single node type for on-prem clusters.
  25. # Resources internally detected by Ray are not overridden by the autoscaler
  26. # (see NodeProvider.do_update)
  27. config["available_node_types"] = {
  28. LOCAL_CLUSTER_NODE_TYPE: {"node_config": {}, "resources": {}}
  29. }
  30. config["head_node_type"] = LOCAL_CLUSTER_NODE_TYPE
  31. if "coordinator_address" in config["provider"]:
  32. config = prepare_coordinator(config)
  33. else:
  34. config = prepare_manual(config)
  35. return config, True
  36. def prepare_coordinator(config: Dict[str, Any]) -> Dict[str, Any]:
  37. config = copy.deepcopy(config)
  38. # User should explicitly set the max number of workers for the coordinator
  39. # to allocate.
  40. if "max_workers" not in config:
  41. cli_logger.abort(
  42. "The field `max_workers` is required when using an "
  43. "automatically managed on-premise cluster."
  44. )
  45. node_type = config["available_node_types"][LOCAL_CLUSTER_NODE_TYPE]
  46. # The autoscaler no longer uses global `min_workers`.
  47. # Move `min_workers` to the node_type config.
  48. node_type["min_workers"] = config.pop("min_workers", 0)
  49. node_type["max_workers"] = config["max_workers"]
  50. return config
  51. def prepare_manual(config: Dict[str, Any]) -> Dict[str, Any]:
  52. """Validates and sets defaults for configs of manually managed on-prem
  53. clusters.
  54. - Checks for presence of required `worker_ips` and `head_ips` fields.
  55. - Defaults min and max workers to the number of `worker_ips`.
  56. - Caps min and max workers at the number of `worker_ips`.
  57. - Writes min and max worker info into the single worker node type.
  58. """
  59. config = copy.deepcopy(config)
  60. if ("worker_ips" not in config["provider"]) or (
  61. "head_ip" not in config["provider"]
  62. ):
  63. cli_logger.abort(
  64. "Please supply a `head_ip` and list of `worker_ips`. "
  65. "Alternatively, supply a `coordinator_address`."
  66. )
  67. num_ips = len(config["provider"]["worker_ips"])
  68. node_type = config["available_node_types"][LOCAL_CLUSTER_NODE_TYPE]
  69. # Default to keeping all provided ips in the cluster.
  70. config.setdefault("max_workers", num_ips)
  71. # The autoscaler no longer uses global `min_workers`.
  72. # We will move `min_workers` to the node_type config.
  73. min_workers = config.pop("min_workers", num_ips)
  74. max_workers = config["max_workers"]
  75. if min_workers > num_ips:
  76. cli_logger.warning(
  77. f"The value of `min_workers` supplied ({min_workers}) is greater"
  78. f" than the number of available worker ips ({num_ips})."
  79. f" Setting `min_workers={num_ips}`."
  80. )
  81. node_type["min_workers"] = num_ips
  82. else:
  83. node_type["min_workers"] = min_workers
  84. if max_workers > num_ips:
  85. cli_logger.warning(
  86. f"The value of `max_workers` supplied ({max_workers}) is greater"
  87. f" than the number of available worker ips ({num_ips})."
  88. f" Setting `max_workers={num_ips}`."
  89. )
  90. node_type["max_workers"] = num_ips
  91. config["max_workers"] = num_ips
  92. else:
  93. node_type["max_workers"] = max_workers
  94. if max_workers < num_ips:
  95. cli_logger.warning(
  96. f"The value of `max_workers` supplied ({max_workers}) is less"
  97. f" than the number of available worker ips ({num_ips})."
  98. f" At most {max_workers} Ray worker nodes will connect to the cluster."
  99. )
  100. return config
  101. def get_lock_path(cluster_name: str) -> str:
  102. return os.path.join(get_ray_temp_dir(), "cluster-{}.lock".format(cluster_name))
  103. def get_state_path(cluster_name: str) -> str:
  104. return os.path.join(get_ray_temp_dir(), "cluster-{}.state".format(cluster_name))
  105. def bootstrap_local(config: Dict[str, Any]) -> Dict[str, Any]:
  106. return config