util.py 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056
  1. import base64
  2. import collections
  3. import copy
  4. import hashlib
  5. import json
  6. import logging
  7. import os
  8. import sys
  9. import threading
  10. from dataclasses import dataclass
  11. from datetime import datetime
  12. from io import StringIO
  13. from numbers import Number, Real
  14. from typing import Any, Dict, List, Optional, Tuple, Union
  15. import ray
  16. import ray._private.services as services
  17. from ray._common.utils import PLACEMENT_GROUP_BUNDLE_RESOURCE_NAME
  18. from ray._private.utils import (
  19. PLACEMENT_GROUP_INDEXED_BUNDLED_RESOURCE_PATTERN,
  20. PLACEMENT_GROUP_WILDCARD_RESOURCE_PATTERN,
  21. )
  22. from ray.autoscaler._private import constants
  23. from ray.autoscaler._private.cli_logger import cli_logger
  24. from ray.autoscaler._private.docker import validate_docker_config
  25. from ray.autoscaler._private.local.config import prepare_local
  26. from ray.autoscaler._private.providers import _get_default_config
  27. from ray.autoscaler.tags import NODE_TYPE_LEGACY_HEAD, NODE_TYPE_LEGACY_WORKER
# Boolean flags named for marking config fields as required/optional.
# NOTE(review): not referenced in the visible part of this module.
REQUIRED, OPTIONAL = True, False

# Warning text for when a node type's `max_workers` falls back to the global
# `max_workers`. Placeholders: {node_type}, {max_workers}, {version}.
# NOTE(review): not formatted anywhere in the visible part of this module.
HEAD_TYPE_MAX_WORKERS_WARN_TEMPLATE = (
    "Setting `max_workers` for node type"
    " `{node_type}` to the global `max_workers` value of {max_workers}. To"
    " avoid spawning worker nodes of type `{node_type}`, explicitly set"
    " `max_workers: 0` for `{node_type}`.\n"
    "Note that `max_workers: 0` was the default value prior to Ray 1.3.0."
    " Your current version is Ray {version}.\n"
    "See the docs for more information:\n"
    "https://docs.ray.io/en/master/cluster/config.html"
    "#cluster-configuration-node-max-workers\n"
    "https://docs.ray.io/en/master/cluster/config.html#full-configuration"
)

# A single resource request, e.g. {"CPU": 1, "GPU": 0.5}.
ResourceBundle = Dict[str, Union[int, float]]
# A Dict and the count of how many times it occurred.
# Refer to freq_of_dicts() below.
DictCount = Tuple[Dict, Number]
# e.g., cpu_4_ondemand.
NodeType = str
# e.g., head, worker, unmanaged
NodeKind = str
# e.g., {"resources": ..., "max_workers": ...}.
NodeTypeConfigDict = Dict[str, Any]
# e.g., {"GPU": 1}.
ResourceDict = Dict[str, Real]
# e.g., "node-1".
NodeID = str
# e.g., "127.0.0.1".
NodeIP = str
# Number of nodes to launch
NodeCount = int
# e.g. "up-to-date", "update-failed"
# See autoscaler/tags.py for other status
# values used by the autoscaler.
NodeStatus = str
# Map of resource name to a (used, total) pair; parse_usage() unpacks each
# value as `(used, total)` and renders it as "used/total".
Usage = Dict[str, Tuple[Number, Number]]

logger = logging.getLogger(__name__)
  65. def base32hex(data: bytes) -> str:
  66. """Encode bytes using base32hex, without padding and in lower case.
  67. This is used to create a shorter hash string that is compatible with
  68. GCP label value constraints (<= 63 chars, lowercase, no padding).
  69. """
  70. return base64.b32hexencode(data).decode("ascii").lower().rstrip("=")
  71. def is_placement_group_resource(resource_name: str) -> bool:
  72. """
  73. Check if a resource name is structured like a placement group.
  74. """
  75. return bool(
  76. PLACEMENT_GROUP_WILDCARD_RESOURCE_PATTERN.match(resource_name)
  77. or PLACEMENT_GROUP_INDEXED_BUNDLED_RESOURCE_PATTERN.match(resource_name)
  78. )
@dataclass
class LoadMetricsSummary:
    """Summary of cluster resource usage and pending demand.

    Consumed by the status-report helpers in this module, e.g.
    ``get_usage_report``, ``get_demand_report`` and the per-node breakdown
    functions.
    """

    # Map of resource name (e.g. "memory") to a pair of (used, total)
    # numbers; parse_usage() renders each pair as "used/total".
    usage: Usage
    # Counts of demand bundles from task/actor demand.
    # e.g. [({"CPU": 1}, 5), ({"GPU":1}, 2)]
    resource_demand: List[DictCount]
    # Counts of pending placement groups. Each entry is unpacked as
    # (pg, count) by get_demand_report(); format_pg() reads pg["strategy"]
    # and pg["bundles"].
    pg_demand: List[DictCount]
    # Counts of demand bundles requested by autoscaler.sdk.request_resources
    request_demand: List[DictCount]
    # Presumably counts of nodes per node type.
    # NOTE(review): not read in the visible part of this module.
    node_types: List[DictCount]
    # Optionally included for backwards compatibility: IP of the head node. See
    # https://github.com/ray-project/ray/pull/20623 for details.
    head_ip: Optional[NodeIP] = None
    # Optionally included for backwards compatibility: Resource breakdown by
    # node. Mapping from node id to resource usage.
    usage_by_node: Optional[Dict[str, Usage]] = None
    # A mapping from node name (the same key as `usage_by_node`) to node type.
    # Optional for deployment modes which have the concept of node types and
    # backwards compatibility.
    node_type_mapping: Optional[Dict[str, str]] = None
    # Mapping from node id to idle time; the per-node breakdown only reports
    # entries greater than 0. Units are not visible here -- TODO confirm.
    idle_time_map: Optional[Dict[str, int]] = None
  102. class ConcurrentCounter:
  103. def __init__(self):
  104. self._lock = threading.RLock()
  105. self._counter = collections.defaultdict(int)
  106. def inc(self, key, count):
  107. with self._lock:
  108. self._counter[key] += count
  109. return self.value
  110. def dec(self, key, count):
  111. with self._lock:
  112. self._counter[key] -= count
  113. assert self._counter[key] >= 0, "counter cannot go negative"
  114. return self.value
  115. def breakdown(self):
  116. with self._lock:
  117. return dict(self._counter)
  118. @property
  119. def value(self):
  120. with self._lock:
  121. return sum(self._counter.values())
  122. def validate_config(config: Dict[str, Any]) -> None:
  123. """Required Dicts indicate that no extra fields can be introduced."""
  124. if not isinstance(config, dict):
  125. raise ValueError("Config {} is not a dictionary".format(config))
  126. schema_path = os.path.join(
  127. os.path.dirname(ray.autoscaler.__file__), "ray-schema.json"
  128. )
  129. with open(schema_path) as f:
  130. schema = json.load(f)
  131. try:
  132. import jsonschema
  133. except (ModuleNotFoundError, ImportError) as e:
  134. # Don't log a warning message here. Logging be handled by upstream.
  135. raise e from None
  136. try:
  137. jsonschema.validate(config, schema)
  138. except jsonschema.ValidationError as e:
  139. raise e from None
  140. # Detect out of date defaults. This happens when the autoscaler that filled
  141. # out the default values is older than the version of the autoscaler that
  142. # is running on the cluster.
  143. if "cluster_synced_files" not in config:
  144. raise RuntimeError(
  145. "Missing 'cluster_synced_files' field in the cluster "
  146. "configuration. This is likely due to the Ray version running "
  147. "in the cluster {ray_version} is greater than the Ray version "
  148. "running on your laptop. Please try updating Ray on your local "
  149. "machine and make sure the versions match.".format(
  150. ray_version=ray.__version__
  151. )
  152. )
  153. if "available_node_types" in config:
  154. if "head_node_type" not in config:
  155. raise ValueError(
  156. "You must specify `head_node_type` if `available_node_types is set."
  157. )
  158. if config["head_node_type"] not in config["available_node_types"]:
  159. raise ValueError("`head_node_type` must be one of `available_node_types`.")
  160. sum_min_workers = sum(
  161. config["available_node_types"][node_type].get("min_workers", 0)
  162. for node_type in config["available_node_types"]
  163. )
  164. if sum_min_workers > config["max_workers"]:
  165. raise ValueError(
  166. "The specified global `max_workers` is smaller than the "
  167. "sum of `min_workers` of all the available node types."
  168. )
  169. if sys.platform == "win32" and config.get("file_mounts_sync_continuously", False):
  170. raise ValueError(
  171. "`file_mounts_sync_continuously` is not supported on Windows. "
  172. "Please set this to False when running on Windows."
  173. )
  174. def check_legacy_fields(config: Dict[str, Any]) -> None:
  175. """For use in providers that have completed the migration to
  176. available_node_types.
  177. Warns user that head_node and worker_nodes fields are being ignored.
  178. Throws an error if available_node_types and head_node_type aren't
  179. specified.
  180. """
  181. # log warning if non-empty head_node field
  182. if "head_node" in config and config["head_node"]:
  183. cli_logger.warning(
  184. "The `head_node` field is deprecated and will be ignored. "
  185. "Use `head_node_type` and `available_node_types` instead."
  186. )
  187. # log warning if non-empty worker_nodes field
  188. if "worker_nodes" in config and config["worker_nodes"]:
  189. cli_logger.warning(
  190. "The `worker_nodes` field is deprecated and will be ignored. "
  191. "Use `available_node_types` instead."
  192. )
  193. if "available_node_types" not in config:
  194. cli_logger.error("`available_node_types` not specified in config")
  195. raise ValueError("`available_node_types` not specified in config")
  196. if "head_node_type" not in config:
  197. cli_logger.error("`head_node_type` not specified in config")
  198. raise ValueError("`head_node_type` not specified in config")
  199. def prepare_config(config: Dict[str, Any]) -> Dict[str, Any]:
  200. """
  201. The returned config has the following properties:
  202. - Uses the multi-node-type autoscaler configuration.
  203. - Merged with the appropriate defaults.yaml
  204. - Has a valid Docker configuration if provided.
  205. - Has max_worker set for each node type.
  206. """
  207. is_local = config.get("provider", {}).get("type") == "local"
  208. is_kuberay = config.get("provider", {}).get("type") == "kuberay"
  209. if is_local:
  210. config, modified = prepare_local(config)
  211. # If the config is already prepared via ray up, return it as is.
  212. if not modified:
  213. return config
  214. elif is_kuberay:
  215. # With KubeRay, we don't need to do anything here since KubeRay
  216. # generate the autoscaler config from the RayCluster CR instead
  217. # of loading from the files.
  218. return config
  219. with_defaults = fillout_defaults(config)
  220. merge_setup_commands(with_defaults)
  221. validate_docker_config(with_defaults)
  222. fill_node_type_min_max_workers(with_defaults)
  223. return with_defaults
  224. def translate_trivial_legacy_config(config: Dict[str, Any]):
  225. """
  226. Drop empty deprecated fields ("head_node" and "worker_node").
  227. """
  228. REMOVABLE_FIELDS = ["head_node", "worker_nodes"]
  229. for field in REMOVABLE_FIELDS:
  230. if field in config and not config[field]:
  231. logger.warning(
  232. f"Dropping the empty legacy field {field}. {field}"
  233. "is not supported for ray>=2.0.0. It is recommended to remove"
  234. f"{field} from the cluster config."
  235. )
  236. del config[field]
  237. def fillout_defaults(config: Dict[str, Any]) -> Dict[str, Any]:
  238. defaults = _get_default_config(config["provider"])
  239. defaults.update(config)
  240. # Just for clarity:
  241. merged_config = copy.deepcopy(defaults)
  242. # Fill auth field to avoid key errors.
  243. # This field is accessed when calling NodeUpdater but is not relevant to
  244. # certain node providers and is thus left out of some cluster launching
  245. # configs.
  246. merged_config["auth"] = merged_config.get("auth", {})
  247. # A legacy config is one which doesn't have available_node_types,
  248. # but has at least one of head_node or worker_nodes.
  249. is_legacy_config = ("available_node_types" not in config) and (
  250. "head_node" in config or "worker_nodes" in config
  251. )
  252. # Do merging logic for legacy configs.
  253. if is_legacy_config:
  254. merged_config = merge_legacy_yaml_with_defaults(merged_config)
  255. # Take care of this here, in case a config does not specify any of head,
  256. # workers, node types, but does specify min workers:
  257. merged_config.pop("min_workers", None)
  258. translate_trivial_legacy_config(merged_config)
  259. return merged_config
  260. def merge_legacy_yaml_with_defaults(merged_config: Dict[str, Any]) -> Dict[str, Any]:
  261. """Rewrite legacy config's available node types after it has been merged
  262. with defaults yaml.
  263. """
  264. cli_logger.warning(
  265. "Converting legacy cluster config to a multi node type cluster "
  266. "config. Multi-node-type cluster configs are the recommended "
  267. "format for configuring Ray clusters. "
  268. "See the docs for more information:\n"
  269. "https://docs.ray.io/en/master/cluster/config.html#full-configuration"
  270. )
  271. # Get default head and worker types.
  272. default_head_type = merged_config["head_node_type"]
  273. # Default configs are assumed to have two node types -- one for the head
  274. # and one for the workers.
  275. assert len(merged_config["available_node_types"].keys()) == 2
  276. default_worker_type = (
  277. merged_config["available_node_types"].keys() - {default_head_type}
  278. ).pop()
  279. if merged_config["head_node"]:
  280. # User specified a head node in legacy config.
  281. # Convert it into data for the head's node type.
  282. head_node_info = {
  283. "node_config": merged_config["head_node"],
  284. "resources": merged_config["head_node"].get("resources") or {},
  285. "min_workers": 0,
  286. "max_workers": 0,
  287. }
  288. else:
  289. # Use default data for the head's node type.
  290. head_node_info = merged_config["available_node_types"][default_head_type]
  291. if merged_config["worker_nodes"]:
  292. # User specified a worker node in legacy config.
  293. # Convert it into data for the workers' node type.
  294. worker_node_info = {
  295. "node_config": merged_config["worker_nodes"],
  296. "resources": merged_config["worker_nodes"].get("resources") or {},
  297. "min_workers": merged_config.get("min_workers", 0),
  298. "max_workers": merged_config["max_workers"],
  299. }
  300. else:
  301. # Use default data for the workers' node type.
  302. worker_node_info = merged_config["available_node_types"][default_worker_type]
  303. # Rewrite available_node_types.
  304. merged_config["available_node_types"] = {
  305. NODE_TYPE_LEGACY_HEAD: head_node_info,
  306. NODE_TYPE_LEGACY_WORKER: worker_node_info,
  307. }
  308. merged_config["head_node_type"] = NODE_TYPE_LEGACY_HEAD
  309. # Resources field in head/worker fields cause node launch to fail.
  310. merged_config["head_node"].pop("resources", None)
  311. merged_config["worker_nodes"].pop("resources", None)
  312. return merged_config
  313. def merge_setup_commands(config):
  314. config["head_setup_commands"] = (
  315. config["setup_commands"] + config["head_setup_commands"]
  316. )
  317. config["worker_setup_commands"] = (
  318. config["setup_commands"] + config["worker_setup_commands"]
  319. )
  320. return config
  321. def fill_node_type_min_max_workers(config):
  322. """Sets default per-node max workers to global max_workers.
  323. This equivalent to setting the default per-node max workers to infinity,
  324. with the only upper constraint coming from the global max_workers.
  325. Sets default per-node min workers to zero.
  326. Also sets default max_workers for the head node to zero.
  327. """
  328. assert "max_workers" in config, "Global max workers should be set."
  329. node_types = config["available_node_types"]
  330. for node_type_name in node_types:
  331. node_type_data = node_types[node_type_name]
  332. node_type_data.setdefault("min_workers", 0)
  333. if "max_workers" not in node_type_data:
  334. if node_type_name == config["head_node_type"]:
  335. logger.info("setting max workers for head node type to 0")
  336. node_type_data.setdefault("max_workers", 0)
  337. else:
  338. global_max_workers = config["max_workers"]
  339. logger.info(
  340. f"setting max workers for {node_type_name} to "
  341. f"{global_max_workers}"
  342. )
  343. node_type_data.setdefault("max_workers", global_max_workers)
  344. def with_envs(cmds: List[str], kv: Dict[str, str]) -> str:
  345. """
  346. Returns a list of commands with the given environment variables set.
  347. Args:
  348. cmds (List[str]): List of commands to set environment variables for.
  349. kv (Dict[str, str]): Dictionary of environment variables to set.
  350. Returns:
  351. List[str]: List of commands with the given environment variables set.
  352. Example:
  353. with_envs(["echo $FOO"], {"FOO": "BAR"})
  354. -> ["export FOO=BAR; echo $FOO"]
  355. """
  356. out_cmds = []
  357. for cmd in cmds:
  358. kv_str = ""
  359. for k, v in kv.items():
  360. # We will need to do export here so that it works correctly with
  361. # shell if the cmd args uses the argument.
  362. kv_str += f"export {k}={v}; "
  363. out_cmds.append(f"{kv_str}{cmd}")
  364. return out_cmds
  365. def with_head_node_ip(cmds, head_ip=None):
  366. if head_ip is None:
  367. head_ip = services.get_node_ip_address()
  368. return with_envs(cmds, {"RAY_HEAD_IP": head_ip})
  369. def hash_launch_conf(node_conf, auth):
  370. hasher = hashlib.sha256()
  371. # For hashing, we replace the path to the key with the
  372. # key itself. This is to make sure the hashes are the
  373. # same even if keys live at different locations on different
  374. # machines.
  375. full_auth = auth.copy()
  376. for key_type in ["ssh_private_key", "ssh_public_key"]:
  377. if key_type in auth:
  378. with open(os.path.expanduser(auth[key_type])) as key:
  379. full_auth[key_type] = key.read()
  380. hasher.update(json.dumps([node_conf, full_auth], sort_keys=True).encode("utf-8"))
  381. return base32hex(hasher.digest())
# Cache the file hashes to avoid rescanning it each time. Also, this avoids
# inadvertently restarting workers if the file mount content is mutated on the
# head node.
# Keyed by the serialized (file_mounts, extra_objs) bytes; values are runtime
# hashes. Entries are only ever added in the visible part of this module.
_hash_cache = {}


def hash_runtime_conf(
    file_mounts,
    cluster_synced_files,
    extra_objs,
    generate_file_mounts_contents_hash=False,
):
    """Returns two hashes, a runtime hash and file_mounts_content hash.

    The runtime hash is used to determine if the configuration or file_mounts
    contents have changed. It is used at launch time (ray up) to determine if
    a restart is needed.

    The file_mounts_content hash is used to determine if the file_mounts or
    cluster_synced_files contents have changed. It is used at monitor time to
    determine if additional file syncing is needed.

    Args:
        file_mounts: Mapping whose values are local paths (files or
            directories) whose contents are hashed. The mapping itself is
            JSON-serialized into the runtime hash. Presumably keys are remote
            paths -- confirm with callers.
        cluster_synced_files: Iterable of local paths, or None. Missing paths
            are tolerated. These only affect the file_mounts_content hash.
        extra_objs: JSON-serializable object folded into the runtime hash.
        generate_file_mounts_contents_hash: Force computing the
            file_mounts_content hash even when the runtime hash is cached.

    Returns:
        (runtime_hash, file_mounts_contents_hash). The runtime hash is read
        from ``_hash_cache``, so for a given (file_mounts, extra_objs) it is
        computed once per process and then reused even if file contents
        change. file_mounts_contents_hash is None when the runtime hash was
        already cached and generate_file_mounts_contents_hash is False.
    """
    runtime_hasher = hashlib.sha256()
    contents_hasher = hashlib.sha256()

    def add_content_hashes(path, allow_non_existing_paths: bool = False):
        # Feed a file's bytes into contents_hasher in 1 MiB chunks.
        def add_hash_of_file(fpath):
            with open(fpath, "rb") as f:
                for chunk in iter(lambda: f.read(2**20), b""):
                    contents_hasher.update(chunk)

        path = os.path.expanduser(path)
        if allow_non_existing_paths and not os.path.exists(path):
            return
        if os.path.isdir(path):
            # Collect and sort the walk so the hash is independent of the
            # order os.walk returns entries in; directory and file names are
            # hashed too, so renames change the hash.
            dirs = []
            for dirpath, _, filenames in os.walk(path):
                dirs.append((dirpath, sorted(filenames)))
            for dirpath, filenames in sorted(dirs):
                contents_hasher.update(dirpath.encode("utf-8"))
                for name in filenames:
                    contents_hasher.update(name.encode("utf-8"))
                    fpath = os.path.join(dirpath, name)
                    add_hash_of_file(fpath)
        else:
            add_hash_of_file(path)

    conf_str = json.dumps(file_mounts, sort_keys=True).encode("utf-8") + json.dumps(
        extra_objs, sort_keys=True
    ).encode("utf-8")

    # Only generate a contents hash if generate_file_mounts_contents_hash is
    # True or if we need to generate the runtime_hash.
    if conf_str not in _hash_cache or generate_file_mounts_contents_hash:
        for local_path in sorted(file_mounts.values()):
            add_content_hashes(local_path)
        head_node_contents_hash = base32hex(contents_hasher.digest())

        # Generate a new runtime_hash if it's not cached.
        # The runtime hash does not depend on the cluster_synced_files hash
        # because we do not want to restart nodes only if cluster_synced_files
        # contents have changed.
        if conf_str not in _hash_cache:
            runtime_hasher.update(conf_str)
            runtime_hasher.update(head_node_contents_hash.encode("utf-8"))
            _hash_cache[conf_str] = base32hex(runtime_hasher.digest())

        # Add cluster_synced_files to the file_mounts_content hash. The same
        # hasher keeps accumulating, so the result covers both file_mounts
        # and cluster_synced_files.
        if cluster_synced_files is not None:
            for local_path in sorted(cluster_synced_files):
                # For cluster_synced_files, we let the path be non-existent
                # because it's possible that the source directory gets set up
                # anytime over the life of the head node.
                add_content_hashes(local_path, allow_non_existing_paths=True)

        file_mounts_contents_hash = base32hex(contents_hasher.digest())

    else:
        file_mounts_contents_hash = None

    return (_hash_cache[conf_str], file_mounts_contents_hash)
  450. def add_prefix(info_string, prefix):
  451. """Prefixes each line of info_string, except the first, by prefix."""
  452. lines = info_string.split("\n")
  453. prefixed_lines = [lines[0]]
  454. for line in lines[1:]:
  455. prefixed_line = ":".join([prefix, line])
  456. prefixed_lines.append(prefixed_line)
  457. prefixed_info_string = "\n".join(prefixed_lines)
  458. return prefixed_info_string
  459. def format_pg(pg):
  460. strategy = pg["strategy"]
  461. bundles = pg["bundles"]
  462. shape_strs = []
  463. for bundle, count in bundles:
  464. shape_strs.append(f"{bundle} * {count}")
  465. bundles_str = ", ".join(shape_strs)
  466. return f"{bundles_str} ({strategy})"
  467. def parse_placement_group_resource_str(
  468. placement_group_resource_str: str,
  469. ) -> Tuple[str, Optional[str], bool]:
  470. """Parse placement group resource in the form of following 3 cases:
  471. {resource_name}_group_{bundle_id}_{group_name};
  472. -> This case is ignored as it is duplicated to the case below.
  473. {resource_name}_group_{group_name};
  474. {resource_name}
  475. Returns:
  476. Tuple of (resource_name, placement_group_name, is_countable_resource).
  477. placement_group_name could be None if its not a placement group
  478. resource. is_countable_resource is True if the resource
  479. doesn't contain bundle index. We shouldn't count resources
  480. with bundle index because it will
  481. have duplicated resource information as
  482. wildcard resources (resource name without bundle index).
  483. """
  484. result = PLACEMENT_GROUP_INDEXED_BUNDLED_RESOURCE_PATTERN.match(
  485. placement_group_resource_str
  486. )
  487. if result:
  488. return (result.group(1), result.group(3), False)
  489. result = PLACEMENT_GROUP_WILDCARD_RESOURCE_PATTERN.match(
  490. placement_group_resource_str
  491. )
  492. if result:
  493. return (result.group(1), result.group(2), True)
  494. return (placement_group_resource_str, None, True)
  495. MEMORY_SUFFIXES = [
  496. ("TiB", 2**40),
  497. ("GiB", 2**30),
  498. ("MiB", 2**20),
  499. ("KiB", 2**10),
  500. ]
  501. def format_memory(mem_bytes: Number) -> str:
  502. """Formats memory in bytes in friendly unit. E.g. (2**30 + 1) bytes should
  503. be displayed as 1GiB but 1 byte should be displayed as 1B, (as opposed to
  504. rounding it to 0GiB).
  505. """
  506. for suffix, bytes_per_unit in MEMORY_SUFFIXES:
  507. if mem_bytes >= bytes_per_unit:
  508. mem_in_unit = mem_bytes / bytes_per_unit
  509. return f"{mem_in_unit:.2f}{suffix}"
  510. return f"{int(mem_bytes)}B"
  511. def parse_usage(usage: Usage, verbose: bool) -> List[str]:
  512. # first collect resources used in placement groups
  513. placement_group_resource_usage = {}
  514. placement_group_resource_total = collections.defaultdict(float)
  515. for resource, (used, total) in usage.items():
  516. (pg_resource_name, pg_name, is_countable) = parse_placement_group_resource_str(
  517. resource
  518. )
  519. if pg_name:
  520. if pg_resource_name not in placement_group_resource_usage:
  521. placement_group_resource_usage[pg_resource_name] = 0
  522. if is_countable:
  523. placement_group_resource_usage[pg_resource_name] += used
  524. placement_group_resource_total[pg_resource_name] += total
  525. continue
  526. usage_lines = []
  527. for resource, (used, total) in sorted(usage.items()):
  528. if "node:" in resource:
  529. continue # Skip the auto-added per-node "node:<ip>" resource.
  530. (_, pg_name, _) = parse_placement_group_resource_str(resource)
  531. if pg_name:
  532. continue # Skip resource used by placement groups
  533. pg_used = 0
  534. pg_total = 0
  535. used_in_pg = resource in placement_group_resource_usage
  536. if used_in_pg:
  537. pg_used = placement_group_resource_usage[resource]
  538. pg_total = placement_group_resource_total[resource]
  539. # Used includes pg_total because when pgs are created
  540. # it allocates resources.
  541. # To get the real resource usage, we should subtract the pg
  542. # reserved resources from the usage and add pg used instead.
  543. used = used - pg_total + pg_used
  544. if resource in ["memory", "object_store_memory"]:
  545. formatted_used = format_memory(used)
  546. formatted_total = format_memory(total)
  547. line = f"{formatted_used}/{formatted_total} {resource}"
  548. if used_in_pg:
  549. formatted_pg_used = format_memory(pg_used)
  550. formatted_pg_total = format_memory(pg_total)
  551. line = line + (
  552. f" ({formatted_pg_used} used of "
  553. f"{formatted_pg_total} " + "reserved in placement groups)"
  554. )
  555. usage_lines.append(line)
  556. elif resource.startswith("accelerator_type:") and not verbose:
  557. # We made a judgement call not to show this.
  558. # https://github.com/ray-project/ray/issues/33272
  559. pass
  560. else:
  561. line = f"{used}/{total} {resource}"
  562. if used_in_pg:
  563. line += (
  564. f" ({pg_used} used of " f"{pg_total} reserved in placement groups)"
  565. )
  566. usage_lines.append(line)
  567. return usage_lines
  568. def get_usage_report(lm_summary: LoadMetricsSummary, verbose: bool) -> str:
  569. usage_lines = parse_usage(lm_summary.usage, verbose)
  570. sio = StringIO()
  571. for line in usage_lines:
  572. print(f" {line}", file=sio)
  573. return sio.getvalue()
  574. def format_resource_demand_summary(
  575. resource_demand: List[Tuple[ResourceBundle, int]]
  576. ) -> List[str]:
  577. def filter_placement_group_from_bundle(bundle: ResourceBundle):
  578. """filter placement group from bundle resource name. returns
  579. filtered bundle and a bool indicate if the bundle is using
  580. placement group.
  581. Example: {"CPU_group_groupid": 1} returns {"CPU": 1}, True
  582. {"memory": 1} return {"memory": 1}, False
  583. """
  584. using_placement_group = False
  585. result_bundle = dict()
  586. for pg_resource_str, resource_count in bundle.items():
  587. (resource_name, pg_name, _) = parse_placement_group_resource_str(
  588. pg_resource_str
  589. )
  590. result_bundle[resource_name] = resource_count
  591. if pg_name:
  592. using_placement_group = True
  593. return (result_bundle, using_placement_group)
  594. bundle_demand = collections.defaultdict(int)
  595. pg_bundle_demand = collections.defaultdict(int)
  596. for bundle, count in resource_demand:
  597. (
  598. pg_filtered_bundle,
  599. using_placement_group,
  600. ) = filter_placement_group_from_bundle(bundle)
  601. # bundle is a special keyword for placement group scheduling
  602. # but it doesn't need to be exposed to users. Remove it from
  603. # the demand report.
  604. if (
  605. using_placement_group
  606. and PLACEMENT_GROUP_BUNDLE_RESOURCE_NAME in pg_filtered_bundle.keys()
  607. ):
  608. del pg_filtered_bundle[PLACEMENT_GROUP_BUNDLE_RESOURCE_NAME]
  609. # No need to report empty request to demand (e.g.,
  610. # placement group ready task).
  611. if len(pg_filtered_bundle.keys()) == 0:
  612. continue
  613. bundle_demand[tuple(sorted(pg_filtered_bundle.items()))] += count
  614. if using_placement_group:
  615. pg_bundle_demand[tuple(sorted(pg_filtered_bundle.items()))] += count
  616. demand_lines = []
  617. for bundle, count in bundle_demand.items():
  618. line = f" {dict(bundle)}: {count}+ pending tasks/actors"
  619. if bundle in pg_bundle_demand:
  620. line += f" ({pg_bundle_demand[bundle]}+ using placement groups)"
  621. demand_lines.append(line)
  622. return demand_lines
  623. def get_constraint_report(request_demand: List[DictCount]):
  624. """Returns a formatted string describing the resource constraints from request_resources().
  625. Args:
  626. request_demand: List of tuples containing resource bundle dictionaries and counts
  627. from request_resources() calls.
  628. Returns:
  629. String containing the formatted constraints report, either listing each constraint
  630. and count or indicating no constraints exist.
  631. Example:
  632. >>> request_demand = [
  633. ... ({"CPU": 4}, 2),
  634. ... ({"GPU": 1}, 1)
  635. ... ]
  636. >>> get_constraint_report(request_demand)
  637. " {'CPU': 4}: 2 from request_resources()\\n {'GPU': 1}: 1 from request_resources()"
  638. """
  639. constraint_lines = []
  640. for bundle, count in request_demand:
  641. line = f" {bundle}: {count} from request_resources()"
  642. constraint_lines.append(line)
  643. if len(constraint_lines) > 0:
  644. constraints_report = "\n".join(constraint_lines)
  645. else:
  646. constraints_report = " (none)"
  647. return constraints_report
  648. def get_demand_report(lm_summary: LoadMetricsSummary):
  649. demand_lines = []
  650. if lm_summary.resource_demand:
  651. demand_lines.extend(format_resource_demand_summary(lm_summary.resource_demand))
  652. for entry in lm_summary.pg_demand:
  653. pg, count = entry
  654. pg_str = format_pg(pg)
  655. line = f" {pg_str}: {count}+ pending placement groups"
  656. demand_lines.append(line)
  657. if len(demand_lines) > 0:
  658. demand_report = "\n".join(demand_lines)
  659. else:
  660. demand_report = " (no resource demands)"
  661. return demand_report
  662. def get_per_node_breakdown_as_dict(
  663. lm_summary: LoadMetricsSummary,
  664. ) -> dict:
  665. per_node_breakdown = {}
  666. for node_id, usage in lm_summary.usage_by_node.items():
  667. usage_string = ""
  668. for line in parse_usage(usage, verbose=True):
  669. usage_string += f"{line}\n"
  670. per_node_breakdown[node_id] = usage_string.strip()
  671. return per_node_breakdown
  672. def get_per_node_breakdown(
  673. lm_summary: LoadMetricsSummary,
  674. node_type_mapping: Optional[Dict[str, float]],
  675. node_activities: Optional[Dict[str, List[str]]],
  676. verbose: bool,
  677. ) -> str:
  678. sio = StringIO()
  679. if node_type_mapping is None:
  680. node_type_mapping = {}
  681. print(file=sio)
  682. for node_id, usage in lm_summary.usage_by_node.items():
  683. print(file=sio) # Print a newline.
  684. node_string = f"Node: {node_id}"
  685. if node_id in node_type_mapping:
  686. node_type = node_type_mapping[node_id]
  687. node_string += f" ({node_type})"
  688. print(node_string, file=sio)
  689. if (
  690. lm_summary.idle_time_map
  691. and node_id in lm_summary.idle_time_map
  692. and lm_summary.idle_time_map[node_id] > 0
  693. ):
  694. print(f" Idle: {lm_summary.idle_time_map[node_id]} ms", file=sio)
  695. print(" Usage:", file=sio)
  696. for line in parse_usage(usage, verbose):
  697. print(f" {line}", file=sio)
  698. # Don't print anything if not provided.
  699. if not node_activities:
  700. continue
  701. print(" Activity:", file=sio)
  702. if node_id not in node_activities:
  703. print(" (no activity)", file=sio)
  704. else:
  705. # Note: We have node IP here.
  706. _, reasons = node_activities[node_id]
  707. for reason in reasons:
  708. print(f" {reason}", file=sio)
  709. return sio.getvalue()
  710. def format_info_string(
  711. lm_summary,
  712. autoscaler_summary,
  713. time=None,
  714. gcs_request_time: Optional[float] = None,
  715. non_terminated_nodes_time: Optional[float] = None,
  716. autoscaler_update_time: Optional[float] = None,
  717. verbose: bool = False,
  718. ):
  719. if time is None:
  720. time = datetime.now()
  721. header = "=" * 8 + f" Autoscaler status: {time} " + "=" * 8
  722. separator = "-" * len(header)
  723. if verbose:
  724. header += "\n"
  725. if gcs_request_time:
  726. header += f"GCS request time: {gcs_request_time:3f}s\n"
  727. if non_terminated_nodes_time:
  728. header += (
  729. "Node Provider non_terminated_nodes time: "
  730. f"{non_terminated_nodes_time:3f}s\n"
  731. )
  732. if autoscaler_update_time:
  733. header += "Autoscaler iteration time: " f"{autoscaler_update_time:3f}s\n"
  734. available_node_report_lines = []
  735. if not autoscaler_summary.active_nodes:
  736. available_node_report = " (no active nodes)"
  737. else:
  738. for node_type, count in autoscaler_summary.active_nodes.items():
  739. line = f" {count} {node_type}"
  740. available_node_report_lines.append(line)
  741. available_node_report = "\n".join(available_node_report_lines)
  742. if not autoscaler_summary.idle_nodes:
  743. idle_node_report = " (no idle nodes)"
  744. else:
  745. idle_node_report_lines = []
  746. for node_type, count in autoscaler_summary.idle_nodes.items():
  747. line = f" {count} {node_type}"
  748. idle_node_report_lines.append(line)
  749. idle_node_report = "\n".join(idle_node_report_lines)
  750. pending_lines = []
  751. for node_type, count in autoscaler_summary.pending_launches.items():
  752. line = f" {node_type}, {count} launching"
  753. pending_lines.append(line)
  754. for ip, node_type, status in autoscaler_summary.pending_nodes:
  755. line = f" {ip}: {node_type}, {status.lower()}"
  756. pending_lines.append(line)
  757. if pending_lines:
  758. pending_report = "\n".join(pending_lines)
  759. else:
  760. pending_report = " (no pending nodes)"
  761. failure_lines = []
  762. for ip, node_type in autoscaler_summary.failed_nodes:
  763. line = f" {node_type}: NodeTerminated (ip: {ip})"
  764. failure_lines.append(line)
  765. if autoscaler_summary.node_availability_summary:
  766. records = sorted(
  767. autoscaler_summary.node_availability_summary.node_availabilities.values(),
  768. key=lambda record: record.last_checked_timestamp,
  769. )
  770. for record in records:
  771. if record.is_available:
  772. continue
  773. assert record.unavailable_node_information is not None
  774. node_type = record.node_type
  775. category = record.unavailable_node_information.category
  776. description = record.unavailable_node_information.description
  777. attempted_time = datetime.fromtimestamp(record.last_checked_timestamp)
  778. formatted_time = (
  779. # This `:02d` funny business is python syntax for printing a 2
  780. # digit number with a leading zero as padding if needed.
  781. f"{attempted_time.hour:02d}:"
  782. f"{attempted_time.minute:02d}:"
  783. f"{attempted_time.second:02d}"
  784. )
  785. line = f" {node_type}: {category} (latest_attempt: {formatted_time})"
  786. if verbose:
  787. line += f" - {description}"
  788. failure_lines.append(line)
  789. failure_lines = failure_lines[: -constants.AUTOSCALER_MAX_FAILURES_DISPLAYED : -1]
  790. failure_report = "Recent failures:\n"
  791. if failure_lines:
  792. failure_report += "\n".join(failure_lines)
  793. else:
  794. failure_report += " (no failures)"
  795. usage_report = get_usage_report(lm_summary, verbose)
  796. constraints_report = get_constraint_report(lm_summary.request_demand)
  797. demand_report = get_demand_report(lm_summary)
  798. formatted_output = f"""{header}
  799. Node status
  800. {separator}
  801. Active:
  802. {available_node_report}"""
  803. if not autoscaler_summary.legacy:
  804. formatted_output += f"""
  805. Idle:
  806. {idle_node_report}"""
  807. formatted_output += f"""
  808. Pending:
  809. {pending_report}
  810. {failure_report}
  811. Resources
  812. {separator}
  813. Total Usage:
  814. {usage_report}
  815. From request_resources:
  816. {constraints_report}
  817. Pending Demands:
  818. {demand_report}"""
  819. if verbose:
  820. if lm_summary.usage_by_node:
  821. formatted_output += get_per_node_breakdown(
  822. lm_summary,
  823. autoscaler_summary.node_type_mapping,
  824. autoscaler_summary.node_activities,
  825. verbose,
  826. )
  827. else:
  828. formatted_output += "\n"
  829. return formatted_output.strip()
  830. def format_readonly_node_type(node_id: str):
  831. """The anonymous node type for readonly node provider nodes."""
  832. return "node_{}".format(node_id)
  833. def format_no_node_type_string(node_type: dict):
  834. placement_group_resource_usage = {}
  835. regular_resource_usage = collections.defaultdict(float)
  836. for resource, total in node_type.items():
  837. (pg_resource_name, pg_name, is_countable) = parse_placement_group_resource_str(
  838. resource
  839. )
  840. if pg_name:
  841. if not is_countable:
  842. continue
  843. if pg_resource_name not in placement_group_resource_usage:
  844. placement_group_resource_usage[pg_resource_name] = 0
  845. placement_group_resource_usage[pg_resource_name] += total
  846. else:
  847. regular_resource_usage[resource] += total
  848. output_lines = [""]
  849. for resource, total in regular_resource_usage.items():
  850. output_line = f"{resource}: {total}"
  851. if resource in placement_group_resource_usage:
  852. pg_resource = placement_group_resource_usage[resource]
  853. output_line += f" ({pg_resource} reserved in placement groups)"
  854. output_lines.append(output_line)
  855. return "\n ".join(output_lines)
  856. def generate_rsa_key_pair():
  857. from cryptography.hazmat.backends import default_backend
  858. from cryptography.hazmat.primitives import serialization
  859. from cryptography.hazmat.primitives.asymmetric import rsa
  860. key = rsa.generate_private_key(
  861. backend=default_backend(), public_exponent=65537, key_size=2048
  862. )
  863. public_key = (
  864. key.public_key()
  865. .public_bytes(
  866. serialization.Encoding.OpenSSH, serialization.PublicFormat.OpenSSH
  867. )
  868. .decode("utf-8")
  869. )
  870. pem = key.private_bytes(
  871. encoding=serialization.Encoding.PEM,
  872. format=serialization.PrivateFormat.TraditionalOpenSSL,
  873. encryption_algorithm=serialization.NoEncryption(),
  874. ).decode("utf-8")
  875. return public_key, pem
  876. def generate_ssh_key_paths(key_name):
  877. public_key_path = os.path.expanduser("~/.ssh/{}.pub".format(key_name))
  878. private_key_path = os.path.expanduser("~/.ssh/{}".format(key_name))
  879. return public_key_path, private_key_path
  880. def generate_ssh_key_name(provider, i, region, identifier, ssh_user):
  881. RAY_PREFIX = "ray-autoscaler"
  882. if i is not None:
  883. return "{}_{}_{}_{}_{}_{}".format(
  884. RAY_PREFIX, provider, region, identifier, ssh_user, i
  885. )
  886. else:
  887. return "{}_{}_{}_{}_{}".format(
  888. RAY_PREFIX, provider, region, identifier, ssh_user
  889. )