load_metrics.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. import logging
  2. import time
  3. from collections import Counter
  4. from functools import reduce
  5. from typing import Dict, List
  6. from ray._private.gcs_utils import PlacementGroupTableData
  7. from ray.autoscaler._private.constants import (
  8. AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE,
  9. AUTOSCALER_REPORT_PER_NODE_STATUS,
  10. )
  11. from ray.autoscaler._private.util import (
  12. DictCount,
  13. LoadMetricsSummary,
  14. NodeIP,
  15. ResourceDict,
  16. )
  17. from ray.core.generated.common_pb2 import PlacementStrategy
  18. logger = logging.getLogger(__name__)
  19. def add_resources(dict1: Dict[str, float], dict2: Dict[str, float]) -> Dict[str, float]:
  20. """Add the values in two dictionaries.
  21. Returns:
  22. dict: A new dictionary (inputs remain unmodified).
  23. """
  24. new_dict = dict1.copy()
  25. for k, v in dict2.items():
  26. new_dict[k] = v + new_dict.get(k, 0)
  27. return new_dict
  28. def freq_of_dicts(dicts: List[Dict], serializer=None, deserializer=dict) -> DictCount:
  29. """Count a list of dictionaries (or unhashable types).
  30. This is somewhat annoying because mutable data structures aren't hashable,
  31. and set/dict keys must be hashable.
  32. Args:
  33. dicts (List[D]): A list of dictionaries to be counted.
  34. serializer (D -> S): A custom serialization function. The output type S
  35. must be hashable. The default serializer converts a dictionary into
  36. a frozenset of KV pairs.
  37. deserializer (S -> U): A custom deserialization function. See the
  38. serializer for information about type S. For dictionaries U := D.
  39. Returns:
  40. List[Tuple[U, int]]: Returns a list of tuples. Each entry in the list
  41. is a tuple containing a unique entry from `dicts` and its
  42. corresponding frequency count.
  43. """
  44. if serializer is None:
  45. serializer = lambda d: frozenset(d.items()) # noqa: E731
  46. freqs = Counter(serializer(d) for d in dicts)
  47. as_list = []
  48. for as_set, count in freqs.items():
  49. as_list.append((deserializer(as_set), count))
  50. return as_list
  51. class LoadMetrics:
  52. """Container for cluster load metrics.
  53. Metrics here are updated from raylet heartbeats. The autoscaler
  54. queries these metrics to determine when to scale up, and which nodes
  55. can be removed.
  56. """
  57. def __init__(self):
  58. self.last_heartbeat_time_by_ip = {}
  59. self.static_resources_by_ip = {}
  60. self.dynamic_resources_by_ip = {}
  61. self.node_id_by_ip = {}
  62. self.waiting_bundles = []
  63. self.infeasible_bundles = []
  64. self.pending_placement_groups = []
  65. self.resource_requests = []
  66. self.ray_nodes_last_used_time_by_ip = {}
  67. def __bool__(self):
  68. """A load metrics instance is Falsey iff the autoscaler process
  69. has not received a resource message from the GCS.
  70. """
  71. return bool(self.node_id_by_ip)
  72. def update(
  73. self,
  74. ip: str,
  75. node_id: bytes,
  76. static_resources: Dict[str, Dict],
  77. dynamic_resources: Dict[str, Dict],
  78. node_idle_duration_s: float,
  79. waiting_bundles: List[Dict[str, float]] = None,
  80. infeasible_bundles: List[Dict[str, float]] = None,
  81. pending_placement_groups: List[PlacementGroupTableData] = None,
  82. ):
  83. self.static_resources_by_ip[ip] = static_resources
  84. self.node_id_by_ip[ip] = node_id
  85. if not waiting_bundles:
  86. waiting_bundles = []
  87. if not infeasible_bundles:
  88. infeasible_bundles = []
  89. if not pending_placement_groups:
  90. pending_placement_groups = []
  91. # We are not guaranteed to have a corresponding dynamic resource
  92. # for every static resource because dynamic resources are based on
  93. # the available resources in the heartbeat, which does not exist
  94. # if it is zero. Thus, we have to update dynamic resources here.
  95. dynamic_resources_update = dynamic_resources.copy()
  96. for resource_name, capacity in self.static_resources_by_ip[ip].items():
  97. if resource_name not in dynamic_resources_update:
  98. dynamic_resources_update[resource_name] = 0.0
  99. self.dynamic_resources_by_ip[ip] = dynamic_resources_update
  100. now = time.time()
  101. self.ray_nodes_last_used_time_by_ip[ip] = now - node_idle_duration_s
  102. self.last_heartbeat_time_by_ip[ip] = now
  103. self.waiting_bundles = waiting_bundles
  104. self.infeasible_bundles = infeasible_bundles
  105. self.pending_placement_groups = pending_placement_groups
  106. def mark_active(self, ip):
  107. assert ip is not None, "IP should be known at this time"
  108. logger.debug("Node {} is newly setup, treating as active".format(ip))
  109. self.last_heartbeat_time_by_ip[ip] = time.time()
  110. def prune_active_ips(self, active_ips: List[str]):
  111. """The Raylet ips stored by LoadMetrics are obtained by polling
  112. the GCS in Monitor.update_load_metrics().
  113. On the other hand, the autoscaler gets a list of node ips from
  114. its NodeProvider.
  115. This method removes from LoadMetrics the ips unknown to the autoscaler.
  116. Args:
  117. active_ips (List[str]): The node ips known to the autoscaler.
  118. """
  119. active_ips = set(active_ips)
  120. def prune(mapping, should_log):
  121. unwanted_ips = set(mapping) - active_ips
  122. for unwanted_ip in unwanted_ips:
  123. if should_log:
  124. logger.info("LoadMetrics: " f"Removed ip: {unwanted_ip}.")
  125. del mapping[unwanted_ip]
  126. if unwanted_ips and should_log:
  127. logger.info(
  128. "LoadMetrics: "
  129. "Removed {} stale ip mappings: {} not in {}".format(
  130. len(unwanted_ips), unwanted_ips, active_ips
  131. )
  132. )
  133. assert not (unwanted_ips & set(mapping))
  134. prune(self.ray_nodes_last_used_time_by_ip, should_log=True)
  135. prune(self.static_resources_by_ip, should_log=False)
  136. prune(self.node_id_by_ip, should_log=False)
  137. prune(self.dynamic_resources_by_ip, should_log=False)
  138. prune(self.last_heartbeat_time_by_ip, should_log=False)
  139. def get_node_resources(self):
  140. """Return a list of node resources (static resource sizes).
  141. Example:
  142. >>> from ray.autoscaler._private.load_metrics import LoadMetrics
  143. >>> metrics = LoadMetrics(...) # doctest: +SKIP
  144. >>> metrics.get_node_resources() # doctest: +SKIP
  145. [{"CPU": 1}, {"CPU": 4, "GPU": 8}] # for two different nodes
  146. """
  147. return self.static_resources_by_ip.values()
  148. def get_static_node_resources_by_ip(self) -> Dict[NodeIP, ResourceDict]:
  149. """Return a dict of node resources for every node ip.
  150. Example:
  151. >>> from ray.autoscaler._private.load_metrics import LoadMetrics
  152. >>> metrics = LoadMetrics(...) # doctest: +SKIP
  153. >>> metrics.get_static_node_resources_by_ip() # doctest: +SKIP
  154. {127.0.0.1: {"CPU": 1}, 127.0.0.2: {"CPU": 4, "GPU": 8}}
  155. """
  156. return self.static_resources_by_ip
  157. def get_resource_utilization(self):
  158. return self.dynamic_resources_by_ip
  159. def _get_resource_usage(self):
  160. resources_used = {}
  161. resources_total = {}
  162. for ip, max_resources in self.static_resources_by_ip.items():
  163. avail_resources = self.dynamic_resources_by_ip[ip]
  164. for resource_id, amount in max_resources.items():
  165. used = amount - avail_resources[resource_id]
  166. if resource_id not in resources_used:
  167. resources_used[resource_id] = 0.0
  168. resources_total[resource_id] = 0.0
  169. resources_used[resource_id] += used
  170. resources_total[resource_id] += amount
  171. used = max(0, used)
  172. return resources_used, resources_total
  173. def get_resource_demand_vector(self, clip=True):
  174. if clip:
  175. # Bound the total number of bundles to
  176. # 2xMAX_RESOURCE_DEMAND_VECTOR_SIZE. This guarantees the resource
  177. # demand scheduler bin packing algorithm takes a reasonable amount
  178. # of time to run.
  179. return (
  180. self.waiting_bundles[:AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE]
  181. + self.infeasible_bundles[:AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE]
  182. )
  183. else:
  184. return self.waiting_bundles + self.infeasible_bundles
  185. def get_resource_requests(self):
  186. return self.resource_requests
  187. def get_pending_placement_groups(self):
  188. return self.pending_placement_groups
  189. def resources_avail_summary(self) -> str:
  190. """Return a concise string of cluster size to report to event logs.
  191. For example, "3 CPUs, 4 GPUs".
  192. """
  193. total_resources = (
  194. reduce(add_resources, self.static_resources_by_ip.values())
  195. if self.static_resources_by_ip
  196. else {}
  197. )
  198. out = "{} CPUs".format(int(total_resources.get("CPU", 0)))
  199. if "GPU" in total_resources:
  200. out += ", {} GPUs".format(int(total_resources["GPU"]))
  201. if "TPU" in total_resources:
  202. out += ", {} TPUs".format(int(total_resources["TPU"]))
  203. return out
  204. def summary(self):
  205. available_resources = (
  206. reduce(add_resources, self.dynamic_resources_by_ip.values())
  207. if self.dynamic_resources_by_ip
  208. else {}
  209. )
  210. total_resources = (
  211. reduce(add_resources, self.static_resources_by_ip.values())
  212. if self.static_resources_by_ip
  213. else {}
  214. )
  215. usage_dict = {}
  216. for key in total_resources:
  217. if key in ["memory", "object_store_memory"]:
  218. total = total_resources[key]
  219. available = available_resources[key]
  220. usage_dict[key] = (total - available, total)
  221. else:
  222. total = total_resources[key]
  223. usage_dict[key] = (total - available_resources[key], total)
  224. summarized_demand_vector = freq_of_dicts(
  225. self.get_resource_demand_vector(clip=False)
  226. )
  227. summarized_resource_requests = freq_of_dicts(self.get_resource_requests())
  228. def placement_group_serializer(pg):
  229. bundles = tuple(
  230. frozenset(bundle.unit_resources.items()) for bundle in pg.bundles
  231. )
  232. return (bundles, pg.strategy)
  233. def placement_group_deserializer(pg_tuple):
  234. # We marshal this as a dictionary so that we can easily json.dumps
  235. # it later.
  236. # TODO (Alex): Would there be a benefit to properly
  237. # marshalling this (into a protobuf)?
  238. bundles = list(map(dict, pg_tuple[0]))
  239. return {
  240. "bundles": freq_of_dicts(bundles),
  241. "strategy": PlacementStrategy.Name(pg_tuple[1]),
  242. }
  243. summarized_placement_groups = freq_of_dicts(
  244. self.get_pending_placement_groups(),
  245. serializer=placement_group_serializer,
  246. deserializer=placement_group_deserializer,
  247. )
  248. nodes_summary = freq_of_dicts(self.static_resources_by_ip.values())
  249. usage_by_node = None
  250. if AUTOSCALER_REPORT_PER_NODE_STATUS:
  251. usage_by_node = {}
  252. for ip, totals in self.static_resources_by_ip.items():
  253. available = self.dynamic_resources_by_ip.get(ip, {})
  254. usage_by_node[ip] = {}
  255. for resource, total in totals.items():
  256. usage_by_node[ip][resource] = (
  257. total - available.get(resource, 0),
  258. total,
  259. )
  260. return LoadMetricsSummary(
  261. usage=usage_dict,
  262. resource_demand=summarized_demand_vector,
  263. pg_demand=summarized_placement_groups,
  264. request_demand=summarized_resource_requests,
  265. node_types=nodes_summary,
  266. usage_by_node=usage_by_node,
  267. )
  268. def set_resource_requests(self, requested_resources):
  269. if requested_resources is not None:
  270. assert isinstance(requested_resources, list), requested_resources
  271. self.resource_requests = [
  272. request for request in requested_resources if len(request) > 0
  273. ]
  274. def info_string(self):
  275. return " - " + "\n - ".join(
  276. ["{}: {}".format(k, v) for k, v in sorted(self._info().items())]
  277. )
  278. def _info(self):
  279. resources_used, resources_total = self._get_resource_usage()
  280. now = time.time()
  281. idle_times = [now - t for t in self.ray_nodes_last_used_time_by_ip.values()]
  282. heartbeat_times = [now - t for t in self.last_heartbeat_time_by_ip.values()]
  283. most_delayed_heartbeats = sorted(
  284. self.last_heartbeat_time_by_ip.items(), key=lambda pair: pair[1]
  285. )[:5]
  286. most_delayed_heartbeats = {ip: (now - t) for ip, t in most_delayed_heartbeats}
  287. def format_resource(key, value):
  288. if key in ["object_store_memory", "memory"]:
  289. return "{} GiB".format(round(value / (1024 * 1024 * 1024), 2))
  290. else:
  291. return round(value, 2)
  292. return {
  293. "ResourceUsage": ", ".join(
  294. [
  295. "{}/{} {}".format(
  296. format_resource(rid, resources_used[rid]),
  297. format_resource(rid, resources_total[rid]),
  298. rid,
  299. )
  300. for rid in sorted(resources_used)
  301. if not rid.startswith("node:")
  302. ]
  303. ),
  304. "NodeIdleSeconds": "Min={} Mean={} Max={}".format(
  305. int(min(idle_times)) if idle_times else -1,
  306. int(float(sum(idle_times)) / len(idle_times)) if idle_times else -1,
  307. int(max(idle_times)) if idle_times else -1,
  308. ),
  309. "TimeSinceLastHeartbeat": "Min={} Mean={} Max={}".format(
  310. int(min(heartbeat_times)) if heartbeat_times else -1,
  311. int(float(sum(heartbeat_times)) / len(heartbeat_times))
  312. if heartbeat_times
  313. else -1,
  314. int(max(heartbeat_times)) if heartbeat_times else -1,
  315. ),
  316. "MostDelayedHeartbeats": most_delayed_heartbeats,
  317. }