sdk.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386
  1. """IMPORTANT: this is an experimental interface and not currently stable."""
  2. import json
  3. import os
  4. import tempfile
  5. from contextlib import contextmanager
  6. from typing import Any, Callable, Dict, Iterator, List, Optional, Union
  7. from ray._private.label_utils import validate_label_selector
  8. from ray.autoscaler._private import commands
  9. from ray.autoscaler._private.cli_logger import cli_logger
  10. from ray.autoscaler._private.event_system import (
  11. CreateClusterEvent, # noqa: F401
  12. global_event_system, # noqa: F401
  13. )
  14. from ray.util.annotations import DeveloperAPI
  15. @DeveloperAPI
  16. def create_or_update_cluster(
  17. cluster_config: Union[dict, str],
  18. *,
  19. no_restart: bool = False,
  20. restart_only: bool = False,
  21. no_config_cache: bool = False,
  22. ) -> Dict[str, Any]:
  23. """Create or updates an autoscaling Ray cluster from a config json.
  24. Args:
  25. cluster_config (Union[str, dict]): Either the config dict of the
  26. cluster, or a path pointing to a file containing the config.
  27. no_restart: Whether to skip restarting Ray services during the
  28. update. This avoids interrupting running jobs and can be used to
  29. dynamically adjust autoscaler configuration.
  30. restart_only: Whether to skip running setup commands and only
  31. restart Ray. This cannot be used with 'no-restart'.
  32. no_config_cache: Whether to disable the config cache and fully
  33. resolve all environment settings from the Cloud provider again.
  34. """
  35. with _as_config_file(cluster_config) as config_file:
  36. return commands.create_or_update_cluster(
  37. config_file=config_file,
  38. override_min_workers=None,
  39. override_max_workers=None,
  40. no_restart=no_restart,
  41. restart_only=restart_only,
  42. yes=True,
  43. override_cluster_name=None,
  44. no_config_cache=no_config_cache,
  45. redirect_command_output=None,
  46. use_login_shells=True,
  47. )
  48. @DeveloperAPI
  49. def teardown_cluster(
  50. cluster_config: Union[dict, str],
  51. workers_only: bool = False,
  52. keep_min_workers: bool = False,
  53. ) -> None:
  54. """Destroys all nodes of a Ray cluster described by a config json.
  55. Args:
  56. cluster_config (Union[str, dict]): Either the config dict of the
  57. cluster, or a path pointing to a file containing the config.
  58. workers_only: Whether to keep the head node running and only
  59. teardown worker nodes.
  60. keep_min_workers: Whether to keep min_workers (as specified
  61. in the YAML) still running.
  62. """
  63. with _as_config_file(cluster_config) as config_file:
  64. return commands.teardown_cluster(
  65. config_file=config_file,
  66. yes=True,
  67. workers_only=workers_only,
  68. override_cluster_name=None,
  69. keep_min_workers=keep_min_workers,
  70. )
  71. @DeveloperAPI
  72. def run_on_cluster(
  73. cluster_config: Union[dict, str],
  74. *,
  75. cmd: Optional[str] = None,
  76. run_env: str = "auto",
  77. tmux: bool = False,
  78. stop: bool = False,
  79. no_config_cache: bool = False,
  80. port_forward: Optional[commands.Port_forward] = None,
  81. with_output: bool = False,
  82. ) -> Optional[str]:
  83. """Runs a command on the specified cluster.
  84. Args:
  85. cluster_config (Union[str, dict]): Either the config dict of the
  86. cluster, or a path pointing to a file containing the config.
  87. cmd: the command to run, or None for a no-op command.
  88. run_env: whether to run the command on the host or in a
  89. container. Select between "auto", "host" and "docker".
  90. tmux: whether to run in a tmux session
  91. stop: whether to stop the cluster after command run
  92. no_config_cache: Whether to disable the config cache and fully
  93. resolve all environment settings from the Cloud provider again.
  94. port_forward ( (int,int) or list[(int,int)]): port(s) to forward.
  95. with_output: Whether to capture command output.
  96. Returns:
  97. The output of the command as a string.
  98. """
  99. with _as_config_file(cluster_config) as config_file:
  100. return commands.exec_cluster(
  101. config_file,
  102. cmd=cmd,
  103. run_env=run_env,
  104. screen=False,
  105. tmux=tmux,
  106. stop=stop,
  107. start=False,
  108. override_cluster_name=None,
  109. no_config_cache=no_config_cache,
  110. port_forward=port_forward,
  111. with_output=with_output,
  112. )
  113. @DeveloperAPI
  114. def rsync(
  115. cluster_config: Union[dict, str],
  116. *,
  117. source: Optional[str],
  118. target: Optional[str],
  119. down: bool,
  120. ip_address: Optional[str] = None,
  121. use_internal_ip: bool = False,
  122. no_config_cache: bool = False,
  123. should_bootstrap: bool = True,
  124. ):
  125. """Rsyncs files to or from the cluster.
  126. Args:
  127. cluster_config (Union[str, dict]): Either the config dict of the
  128. cluster, or a path pointing to a file containing the config.
  129. source: rsync source argument.
  130. target: rsync target argument.
  131. down: whether we're syncing remote -> local.
  132. ip_address: Address of node.
  133. use_internal_ip: Whether the provided ip_address is
  134. public or private.
  135. no_config_cache: Whether to disable the config cache and fully
  136. resolve all environment settings from the Cloud provider again.
  137. should_bootstrap: whether to bootstrap cluster config before syncing
  138. Raises:
  139. RuntimeError: If the cluster head node is not found.
  140. """
  141. with _as_config_file(cluster_config) as config_file:
  142. return commands.rsync(
  143. config_file=config_file,
  144. source=source,
  145. target=target,
  146. override_cluster_name=None,
  147. down=down,
  148. ip_address=ip_address,
  149. use_internal_ip=use_internal_ip,
  150. no_config_cache=no_config_cache,
  151. all_nodes=False,
  152. should_bootstrap=should_bootstrap,
  153. )
  154. @DeveloperAPI
  155. def get_head_node_ip(cluster_config: Union[dict, str]) -> str:
  156. """Returns head node IP for given configuration file if exists.
  157. Args:
  158. cluster_config (Union[str, dict]): Either the config dict of the
  159. cluster, or a path pointing to a file containing the config.
  160. Returns:
  161. The ip address of the cluster head node.
  162. Raises:
  163. RuntimeError: If the cluster is not found.
  164. """
  165. with _as_config_file(cluster_config) as config_file:
  166. return commands.get_head_node_ip(config_file)
  167. @DeveloperAPI
  168. def get_worker_node_ips(cluster_config: Union[dict, str]) -> List[str]:
  169. """Returns worker node IPs for given configuration file.
  170. Args:
  171. cluster_config (Union[str, dict]): Either the config dict of the
  172. cluster, or a path pointing to a file containing the config.
  173. Returns:
  174. List of worker node ip addresses.
  175. Raises:
  176. RuntimeError: If the cluster is not found.
  177. """
  178. with _as_config_file(cluster_config) as config_file:
  179. return commands.get_worker_node_ips(config_file)
  180. @DeveloperAPI
  181. def request_resources(
  182. num_cpus: Optional[int] = None,
  183. bundles: Optional[List[dict]] = None,
  184. bundle_label_selectors: Optional[List[dict]] = None,
  185. ) -> None:
  186. """Command the autoscaler to scale to accommodate the specified requests.
  187. The cluster will immediately attempt to scale to accommodate the requested
  188. resources, bypassing normal upscaling speed constraints. This takes into
  189. account existing resource usage.
  190. For example, suppose you call ``request_resources(num_cpus=100)`` and
  191. there are 45 currently running tasks, each requiring 1 CPU. Then, enough
  192. nodes will be added so up to 100 tasks can run concurrently. It does
  193. **not** add enough nodes so that 145 tasks can run.
  194. This call is only a hint to the autoscaler. The actual resulting cluster
  195. size may be slightly larger or smaller than expected depending on the
  196. internal bin packing algorithm and max worker count restrictions.
  197. Args:
  198. num_cpus: Scale the cluster to ensure this number of CPUs are
  199. available. This request is persistent until another call to
  200. request_resources() is made to override.
  201. bundles (List[ResourceDict]): Scale the cluster to ensure this set of
  202. resource shapes can fit. This request is persistent until another
  203. call to request_resources() is made to override.
  204. bundle_label_selectors: A list of label selectors, applied per-bundle to the same
  205. index in the `bundles` list. For bundles without a label requirement, the
  206. corresponding item in the list is an empty dictionary. For each bundle.
  207. Label selectors consist of zero or more key-value pairs where the key is
  208. a label and the value is a operator (in, !in, etc.) and label value.
  209. Examples:
  210. >>> from ray.autoscaler.sdk import request_resources
  211. >>> # Request 1000 CPUs.
  212. >>> request_resources(num_cpus=1000) # doctest: +SKIP
  213. >>> # Request 64 CPUs and also fit a 1-GPU/4-CPU task.
  214. >>> request_resources( # doctest: +SKIP
  215. ... num_cpus=64, bundles=[{"GPU": 1, "CPU": 4}])
  216. >>> # Same as requesting num_cpus=3.
  217. >>> request_resources( # doctest: +SKIP
  218. ... bundles=[{"CPU": 1}, {"CPU": 1}, {"CPU": 1}])
  219. >>> # Requests 2 num_cpus=1 bundles, the first with
  220. >>> # label_selector={"accelerator-type": "in(A100)"} and second with
  221. >>> # label_selector={"market-type": "spot"}.
  222. >>> request_resources( # doctest: +SKIP
  223. ... bundles=[{"CPU": 1}, {"CPU": 1}]),
  224. ... bundle_label_selectors=[{"accelerator-type": "in(A100)"},
  225. ... {"market-type": "spot"}])
  226. """
  227. if num_cpus is not None and not isinstance(num_cpus, int):
  228. raise TypeError("num_cpus should be of type int.")
  229. if bundles is not None:
  230. if isinstance(bundles, List):
  231. for bundle in bundles:
  232. if isinstance(bundle, Dict):
  233. for key in bundle.keys():
  234. if not (isinstance(key, str) and isinstance(bundle[key], int)):
  235. raise TypeError(
  236. "each bundle key should be str and value as int."
  237. )
  238. else:
  239. raise TypeError("each bundle should be a Dict.")
  240. else:
  241. raise TypeError("bundles should be of type List")
  242. if bundle_label_selectors is not None:
  243. if bundles is None:
  244. raise ValueError(
  245. "`bundles` must be provided when `bundle_label_selectors` is specified."
  246. )
  247. if len(bundle_label_selectors) != len(bundles):
  248. raise ValueError(
  249. "`bundle_label_selector` must be a list with length equal to the number of bundles."
  250. )
  251. for label_selector in bundle_label_selectors:
  252. if (
  253. not isinstance(label_selector, dict)
  254. or not all(isinstance(k, str) for k in label_selector.keys())
  255. or not all(isinstance(v, str) for v in label_selector.values())
  256. ):
  257. raise ValueError(
  258. "Bundle label selector must be a list of string dictionary"
  259. " label selectors. For example: "
  260. '`[{ray.io/market_type": "spot"}, {"ray.io/accelerator-type": "A100"}]`.'
  261. )
  262. error_message = validate_label_selector(label_selector)
  263. if error_message:
  264. raise ValueError(
  265. f"Invalid label selector provided in bundle_label_selectors list."
  266. f" Detailed error: '{error_message}'"
  267. )
  268. return commands.request_resources(num_cpus, bundles, bundle_label_selectors)
  269. @DeveloperAPI
  270. def configure_logging(
  271. log_style: Optional[str] = None,
  272. color_mode: Optional[str] = None,
  273. verbosity: Optional[int] = None,
  274. ):
  275. """Configures logging for cluster command calls.
  276. Args:
  277. log_style: If 'pretty', outputs with formatting and color.
  278. If 'record', outputs record-style without formatting.
  279. 'auto' defaults to 'pretty', and disables pretty logging
  280. if stdin is *not* a TTY. Defaults to "auto".
  281. color_mode (str):
  282. Can be "true", "false", or "auto".
  283. Enables or disables `colorful`.
  284. If `color_mode` is "auto", is set to `not stdout.isatty()`
  285. vebosity (int):
  286. Output verbosity (0, 1, 2, 3).
  287. Low verbosity will disable `verbose` and `very_verbose` messages.
  288. """
  289. cli_logger.configure(
  290. log_style=log_style, color_mode=color_mode, verbosity=verbosity
  291. )
  292. @contextmanager
  293. @DeveloperAPI
  294. def _as_config_file(cluster_config: Union[dict, str]) -> Iterator[str]:
  295. if isinstance(cluster_config, dict):
  296. tmp = tempfile.NamedTemporaryFile("w", prefix="autoscaler-sdk-tmp-")
  297. tmp.write(json.dumps(cluster_config))
  298. tmp.flush()
  299. cluster_config = tmp.name
  300. if not os.path.exists(cluster_config):
  301. raise ValueError("Cluster config not found {}".format(cluster_config))
  302. yield cluster_config
  303. @DeveloperAPI
  304. def bootstrap_config(
  305. cluster_config: Dict[str, Any], no_config_cache: bool = False
  306. ) -> Dict[str, Any]:
  307. """Validate and add provider-specific fields to the config. For example,
  308. IAM/authentication may be added here."""
  309. return commands._bootstrap_config(cluster_config, no_config_cache)
  310. @DeveloperAPI
  311. def fillout_defaults(config: Dict[str, Any]) -> Dict[str, Any]:
  312. """Fillout default values for a cluster_config based on the provider."""
  313. from ray.autoscaler._private.util import fillout_defaults
  314. return fillout_defaults(config)
  315. @DeveloperAPI
  316. def register_callback_handler(
  317. event_name: str,
  318. callback: Union[Callable[[Dict], None], List[Callable[[Dict], None]]],
  319. ) -> None:
  320. """Registers a callback handler for autoscaler events.
  321. Args:
  322. event_name: Event that callback should be called on. See
  323. CreateClusterEvent for details on the events available to be
  324. registered against.
  325. callback: Callable object that is invoked
  326. when specified event occurs.
  327. """
  328. global_event_system.add_callback_handler(event_name, callback)
  329. @DeveloperAPI
  330. def get_docker_host_mount_location(cluster_name: str) -> str:
  331. """Return host path that Docker mounts attach to."""
  332. docker_mount_prefix = "/tmp/ray_tmp_mount/{cluster_name}"
  333. return docker_mount_prefix.format(cluster_name=cluster_name)