utils.py 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090
  1. from collections import Counter, defaultdict
  2. from copy import deepcopy
  3. from datetime import datetime
  4. from enum import Enum
  5. from itertools import chain
  6. from typing import Any, Dict, List, Optional, Tuple
  7. import ray
  8. from ray._common.utils import binary_to_hex
  9. from ray._raylet import GcsClient
  10. from ray.autoscaler._private import constants
  11. from ray.autoscaler._private.util import (
  12. format_pg,
  13. format_resource_demand_summary,
  14. parse_usage,
  15. )
  16. from ray.autoscaler.v2.schema import (
  17. NODE_DEATH_CAUSE_RAYLET_DIED,
  18. ClusterConstraintDemand,
  19. ClusterStatus,
  20. LaunchRequest,
  21. NodeInfo,
  22. NodeUsage,
  23. PlacementGroupResourceDemand,
  24. RayTaskActorDemand,
  25. ResourceDemand,
  26. ResourceDemandSummary,
  27. ResourceRequestByCount,
  28. ResourceUsage,
  29. Stats,
  30. )
  31. from ray.core.generated.autoscaler_pb2 import (
  32. AffinityConstraint,
  33. AntiAffinityConstraint,
  34. AutoscalingState,
  35. ClusterResourceState,
  36. GetClusterStatusReply,
  37. NodeState,
  38. NodeStatus,
  39. PlacementConstraint,
  40. ResourceRequest,
  41. ResourceRequestByCount as ResourceRequestByCountProto,
  42. )
  43. from ray.core.generated.common_pb2 import (
  44. LabelSelector,
  45. LabelSelectorConstraint,
  46. )
  47. from ray.experimental.internal_kv import internal_kv_get_gcs_client
  48. def _count_by(data: Any, key: str) -> Dict[str, int]:
  49. """
  50. Count the number of items by the given keys.
  51. Args:
  52. data: the data to be counted
  53. keys: the keys to count by
  54. Returns:
  55. counts: the counts
  56. """
  57. counts = defaultdict(int)
  58. for item in data:
  59. key_name = getattr(item, key)
  60. counts[key_name] += 1
  61. return counts
  62. class ProtobufUtil:
  63. """
  64. A utility class for protobuf objects.
  65. """
  66. @staticmethod
  67. def to_dict(proto):
  68. """
  69. Convert a protobuf object to a dict.
  70. This is a slow conversion, and should only be used for debugging or
  71. latency insensitve code.
  72. Args:
  73. proto: the protobuf object
  74. Returns:
  75. dict: the dict
  76. """
  77. from ray._private.protobuf_compat import message_to_dict
  78. return message_to_dict(
  79. proto,
  80. preserving_proto_field_name=True,
  81. always_print_fields_with_no_presence=True,
  82. )
  83. @staticmethod
  84. def to_dict_list(protos):
  85. """
  86. Convert a list of protobuf objects to a list of dicts.
  87. Args:
  88. protos: the list of protobuf objects
  89. Returns:
  90. dict_list: the list of dicts
  91. """
  92. return [ProtobufUtil.to_dict(proto) for proto in protos]
  93. class ResourceRequestUtil(ProtobufUtil):
  94. """
  95. A utility class for resource requests, autoscaler.proto.ResourceRequest
  96. """
  97. class PlacementConstraintType(Enum):
  98. """
  99. The affinity type for the resource request.
  100. """
  101. ANTI_AFFINITY = "ANTI_AFFINITY"
  102. AFFINITY = "AFFINITY"
  103. @staticmethod
  104. def group_by_count(
  105. requests: List[ResourceRequest],
  106. ) -> List[ResourceRequestByCountProto]:
  107. """
  108. Aggregate resource requests by shape.
  109. Args:
  110. requests: the list of resource requests
  111. Returns:
  112. resource_requests_by_count: the aggregated resource requests by count
  113. """
  114. resource_requests_by_count = defaultdict(int)
  115. for request in requests:
  116. serialized_request = request.SerializeToString()
  117. resource_requests_by_count[serialized_request] += 1
  118. results = []
  119. for serialized_request, count in resource_requests_by_count.items():
  120. request = ResourceRequest()
  121. request.ParseFromString(serialized_request)
  122. results.append(ResourceRequestByCountProto(request=request, count=count))
  123. return results
  124. @staticmethod
  125. def ungroup_by_count(
  126. requests_by_count: List[ResourceRequestByCountProto],
  127. ) -> List[ResourceRequest]:
  128. """
  129. Flatten the resource requests by count to resource requests.
  130. Args:
  131. requests_by_count: the resource requests by count
  132. Returns:
  133. requests: the flattened resource requests
  134. """
  135. reqs = []
  136. for r in requests_by_count:
  137. reqs += [r.request] * r.count
  138. return reqs
  139. @staticmethod
  140. def to_resource_map(
  141. request: ResourceRequest,
  142. ) -> Dict[str, float]:
  143. """
  144. Convert the resource request by count to resource map.
  145. Args:
  146. request: the resource request
  147. Returns:
  148. resource_map: the resource map
  149. """
  150. resource_map = defaultdict(float)
  151. for k, v in request.resources_bundle.items():
  152. resource_map[k] += v
  153. return dict(resource_map)
  154. @staticmethod
  155. def to_resource_maps(
  156. requests: List[ResourceRequest],
  157. ) -> List[Dict[str, float]]:
  158. """
  159. Convert the resource requests by count to resource map.
  160. Args:
  161. requests: the resource requests
  162. Returns:
  163. resource_maps: list of resource map
  164. """
  165. return [ResourceRequestUtil.to_resource_map(r) for r in requests]
  166. @staticmethod
  167. def make(
  168. resources_map: Dict[str, float],
  169. constraints: Optional[List[Tuple[PlacementConstraintType, str, str]]] = None,
  170. label_selectors: Optional[List[List[Tuple[str, int, List[str]]]]] = None,
  171. ) -> ResourceRequest:
  172. """
  173. Make a resource request from the given resources map.
  174. Args:
  175. resources_map: Mapping of resource names to quantities.
  176. constraints: Placement constraints. Each tuple is (constraint_type,
  177. label_key, label_value), where `constraint_type` is a
  178. PlacementConstraintType (AFFINITY or ANTI_AFFINITY).
  179. label_selectors: Optional list of label selectors. Each selector is
  180. a list of (label_key, operator_enum, label_values) tuples.
  181. Returns:
  182. request: the ResourceRequest object
  183. """
  184. request = ResourceRequest()
  185. for resource_name, quantity in resources_map.items():
  186. request.resources_bundle[resource_name] = quantity
  187. if constraints is not None:
  188. for constraint_type, label, value in constraints:
  189. if (
  190. constraint_type
  191. == ResourceRequestUtil.PlacementConstraintType.AFFINITY
  192. ):
  193. request.placement_constraints.append(
  194. PlacementConstraint(
  195. affinity=AffinityConstraint(
  196. label_name=label, label_value=value
  197. )
  198. )
  199. )
  200. elif (
  201. constraint_type
  202. == ResourceRequestUtil.PlacementConstraintType.ANTI_AFFINITY
  203. ):
  204. request.placement_constraints.append(
  205. PlacementConstraint(
  206. anti_affinity=AntiAffinityConstraint(
  207. label_name=label, label_value=value
  208. )
  209. )
  210. )
  211. else:
  212. raise ValueError(f"Unknown constraint type: {constraint_type}")
  213. if label_selectors is not None:
  214. for selector in label_selectors:
  215. selector_proto = LabelSelector()
  216. for label_key, operator_enum, label_values in selector:
  217. selector_proto.label_constraints.append(
  218. LabelSelectorConstraint(
  219. label_key=label_key,
  220. operator=operator_enum,
  221. label_values=label_values,
  222. )
  223. )
  224. request.label_selectors.append(selector_proto)
  225. return request
  226. @staticmethod
  227. def combine_requests_with_affinity(
  228. resource_requests: List[ResourceRequest],
  229. ) -> List[ResourceRequest]:
  230. """
  231. Combine the resource requests with affinity constraints
  232. into the same request. This is so that requests with affinity
  233. constraints could be considered and placed together.
  234. It merges the resource requests with the same affinity constraints
  235. into one request, and dedup the placement constraints.
  236. This assumes following:
  237. 1. There's only at most 1 placement constraint, either an affinity
  238. constraint OR an anti-affinity constraint.
  239. Args:
  240. resource_requests: The list of resource requests to be combined.
  241. Returns:
  242. A list of combined resource requests.
  243. """
  244. # Map of set of serialized affinity constraint to the list of resource requests
  245. requests_by_affinity: Dict[
  246. Tuple[str, str, Tuple], List[ResourceRequest]
  247. ] = defaultdict(list)
  248. combined_requests: List[ResourceRequest] = []
  249. for request in resource_requests:
  250. assert len(request.placement_constraints) <= 1, (
  251. "There should be at most 1 placement constraint, "
  252. "either an affinity constraint OR an anti-affinity constraint."
  253. )
  254. if len(request.placement_constraints) == 0:
  255. # No affinity constraints, just add to the combined requests.
  256. combined_requests.append(request)
  257. continue
  258. constraint = request.placement_constraints[0]
  259. if constraint.HasField("affinity"):
  260. # Combine requests with affinity and label selectors.
  261. affinity = constraint.affinity
  262. key = (
  263. affinity.label_name,
  264. affinity.label_value,
  265. ResourceRequestUtil._label_selector_key(request.label_selectors),
  266. )
  267. requests_by_affinity[key].append(request)
  268. elif constraint.HasField("anti_affinity"):
  269. # We don't need to combine requests with anti-affinity constraints.
  270. combined_requests.append(request)
  271. for (
  272. affinity_label_name,
  273. affinity_label_value,
  274. label_selector_key,
  275. ), requests in requests_by_affinity.items():
  276. combined_request = ResourceRequest()
  277. # Merge the resource bundles with the same affinity constraint.
  278. for request in requests:
  279. for k, v in request.resources_bundle.items():
  280. combined_request.resources_bundle[k] = (
  281. combined_request.resources_bundle.get(k, 0) + v
  282. )
  283. # Add the placement constraint to the combined request.
  284. affinity_constraint = AffinityConstraint(
  285. label_name=affinity_label_name, label_value=affinity_label_value
  286. )
  287. combined_request.placement_constraints.append(
  288. PlacementConstraint(affinity=affinity_constraint)
  289. )
  290. combined_request.label_selectors.extend(requests[0].label_selectors)
  291. combined_requests.append(combined_request)
  292. return combined_requests
  293. def _label_selector_key(
  294. label_selectors: List[LabelSelector],
  295. ) -> Tuple:
  296. """
  297. Convert label selectors into a hashable form for grouping.
  298. This is used for gang requests with identical label_selectors.
  299. """
  300. result = []
  301. for selector in label_selectors:
  302. constraints = []
  303. for constraint in selector.label_constraints:
  304. constraints.append(
  305. (
  306. constraint.label_key,
  307. constraint.operator,
  308. tuple(sorted(constraint.label_values)),
  309. )
  310. )
  311. result.append(tuple(constraints))
  312. return tuple(result)
  313. class ClusterStatusFormatter:
  314. """
  315. A formatter to format the ClusterStatus into a string.
  316. """
  317. @classmethod
  318. def format(cls, data: ClusterStatus, verbose: bool = False) -> str:
  319. header, separator_len = cls._header_info(data, verbose)
  320. separator = "-" * separator_len
  321. # Parse ClusterStatus information to reportable format
  322. available_node_report = cls._available_node_report(data)
  323. idle_node_report = cls._idle_node_report(data)
  324. pending_report = cls._pending_node_report(data)
  325. failure_report = cls._failed_node_report(data, verbose)
  326. cluster_usage_report = cls._cluster_usage_report(data, verbose)
  327. constraints_report = cls._constraint_report(
  328. data.resource_demands.cluster_constraint_demand
  329. )
  330. demand_report = cls._demand_report(data)
  331. node_usage_report = (
  332. ""
  333. if not verbose
  334. else cls._node_usage_report(data.active_nodes, data.idle_nodes)
  335. )
  336. # Format Cluster Status reports into one output
  337. formatted_output_lines = [
  338. header,
  339. "Node status",
  340. separator,
  341. "Active:",
  342. available_node_report,
  343. "Idle:",
  344. idle_node_report,
  345. "Pending:",
  346. pending_report,
  347. failure_report,
  348. "",
  349. "Resources",
  350. separator,
  351. "Total Usage:",
  352. cluster_usage_report,
  353. "From request_resources:",
  354. constraints_report,
  355. "Pending Demands:",
  356. demand_report,
  357. node_usage_report,
  358. ]
  359. formatted_output = "\n".join(formatted_output_lines)
  360. return formatted_output.strip()
  361. @staticmethod
  362. def _node_usage_report(
  363. active_nodes: List[NodeInfo], idle_nodes: List[NodeInfo]
  364. ) -> str:
  365. """[Example]:
  366. Node: raycluster-autoscaler-small-group-worker-n8hrw (small-group)
  367. Id: cc22041297e5fc153b5357e41f184c8000869e8de97252cc0291fd17
  368. Usage:
  369. 1.0/1.0 CPU
  370. 0B/953.67MiB memory
  371. 0B/251.76MiB object_store_memory
  372. Activity:
  373. Resource: CPU currently in use.
  374. Busy workers on node.
  375. """
  376. node_id_to_usage: Dict[str, Dict[str, Tuple[float, float]]] = {}
  377. node_id_to_type: Dict[str, str] = {}
  378. node_id_to_idle_time: Dict[str, int] = {}
  379. node_id_to_instance_id: Dict[str, str] = {}
  380. node_id_to_activities: Dict[str, List[str]] = {}
  381. # Populate mappings for node types, idle times, instance ids, and activities
  382. for node in chain(active_nodes, idle_nodes):
  383. node_id_to_usage[node.node_id] = {
  384. u.resource_name: (u.used, u.total) for u in node.resource_usage.usage
  385. }
  386. node_id_to_type[node.node_id] = node.ray_node_type_name
  387. node_id_to_idle_time[node.node_id] = node.resource_usage.idle_time_ms
  388. node_id_to_instance_id[node.node_id] = node.instance_id
  389. node_id_to_activities[node.node_id] = node.node_activity
  390. node_usage_report_lines = []
  391. for node_id, usage in node_id_to_usage.items():
  392. node_usage_report_lines.append("") # Add a blank line between nodes
  393. node_type_line = f"Node: {node_id_to_instance_id[node_id]}"
  394. if node_id in node_id_to_type:
  395. node_type = node_id_to_type[node_id]
  396. node_type_line += f" ({node_type})"
  397. node_usage_report_lines.append(node_type_line)
  398. node_usage_report_lines.append(f" Id: {node_id}")
  399. if node_id_to_idle_time.get(node_id, 0) > 0:
  400. node_usage_report_lines.append(
  401. f" Idle: {node_id_to_idle_time[node_id]} ms"
  402. )
  403. node_usage_report_lines.append(" Usage:")
  404. for line in parse_usage(usage, verbose=True):
  405. node_usage_report_lines.append(f" {line}")
  406. activities = node_id_to_activities.get(node_id, [])
  407. node_usage_report_lines.append(" Activity:")
  408. if activities is None or len(activities) == 0:
  409. node_usage_report_lines.append(" (no activity)")
  410. else:
  411. for activity in activities:
  412. node_usage_report_lines.append(f" {activity}")
  413. # Join the list into a single string with new lines
  414. return "\n".join(node_usage_report_lines)
  415. @staticmethod
  416. def _header_info(data: ClusterStatus, verbose: bool) -> (str, int):
  417. # Get the request timestamp or default to the current time
  418. time = (
  419. datetime.fromtimestamp(data.stats.request_ts_s)
  420. if data.stats.request_ts_s
  421. else datetime.now()
  422. )
  423. # Gather the time statistics
  424. gcs_request_time = data.stats.gcs_request_time_s
  425. non_terminated_nodes_time = data.stats.none_terminated_node_request_time_s
  426. autoscaler_update_time = data.stats.autoscaler_iteration_time_s
  427. # Create the header with autoscaler status
  428. header = "=" * 8 + f" Autoscaler status: {time} " + "=" * 8
  429. separator_len = len(header)
  430. # Add verbose details if required
  431. if verbose:
  432. details = []
  433. if gcs_request_time:
  434. details.append(f"GCS request time: {gcs_request_time:3f}s")
  435. if non_terminated_nodes_time:
  436. details.append(
  437. f"Node Provider non_terminated_nodes time: {non_terminated_nodes_time:3f}s"
  438. )
  439. if autoscaler_update_time:
  440. details.append(
  441. f"Autoscaler iteration time: {autoscaler_update_time:3f}s"
  442. )
  443. if details:
  444. header += "\n" + "\n".join(details) + "\n"
  445. return header, separator_len
  446. @staticmethod
  447. def _available_node_report(data: ClusterStatus) -> str:
  448. active_nodes = _count_by(data.active_nodes, "ray_node_type_name")
  449. # Build the available node report
  450. if not active_nodes:
  451. return " (no active nodes)"
  452. return "\n".join(
  453. f" {count} {node_type}" for node_type, count in active_nodes.items()
  454. )
  455. @staticmethod
  456. def _idle_node_report(data: ClusterStatus) -> str:
  457. idle_nodes = _count_by(data.idle_nodes, "ray_node_type_name")
  458. # Build the idle node report
  459. if not idle_nodes:
  460. return " (no idle nodes)"
  461. return "\n".join(
  462. f" {count} {node_type}" for node_type, count in idle_nodes.items()
  463. )
  464. @staticmethod
  465. def _failed_node_report(data: ClusterStatus, verbose: bool) -> str:
  466. failure_lines = []
  467. # Process failed launches
  468. if data.failed_launches:
  469. sorted_failed_launches = sorted(
  470. data.failed_launches,
  471. key=lambda launch: launch.request_ts_s,
  472. reverse=True,
  473. )
  474. for failed_launch in sorted_failed_launches:
  475. node_type = failed_launch.ray_node_type_name
  476. category = "LaunchFailed"
  477. description = failed_launch.details
  478. attempted_time = datetime.fromtimestamp(failed_launch.request_ts_s)
  479. formatted_time = f"{attempted_time.hour:02d}:{attempted_time.minute:02d}:{attempted_time.second:02d}"
  480. line = f" {node_type}: {category} (latest_attempt: {formatted_time})"
  481. if verbose:
  482. line += f" - {description}"
  483. failure_lines.append(line)
  484. # Process failed nodes
  485. for node in data.failed_nodes:
  486. failure_lines.append(
  487. f" {node.ray_node_type_name}: NodeTerminated (instance_id: {node.instance_id})"
  488. )
  489. # Limit the number of failures displayed
  490. failure_lines = failure_lines[: constants.AUTOSCALER_MAX_FAILURES_DISPLAYED]
  491. # Build the failure report
  492. failure_report = "Recent failures:\n"
  493. failure_report += (
  494. "\n".join(failure_lines) if failure_lines else " (no failures)"
  495. )
  496. return failure_report
  497. @staticmethod
  498. def _pending_node_report(data: ClusterStatus) -> str:
  499. # Prepare pending launch lines
  500. pending_lines = [
  501. f" {node_type}, {count} launching"
  502. for node_type, count in _count_by(
  503. data.pending_launches, "ray_node_type_name"
  504. ).items()
  505. ]
  506. # Prepare pending node lines
  507. pending_lines.extend(
  508. f" {ip}: {node_type}, {status.lower()}"
  509. for ip, node_type, status in (
  510. (node.instance_id, node.ray_node_type_name, node.details)
  511. for node in data.pending_nodes
  512. )
  513. )
  514. # Construct the pending report
  515. if pending_lines:
  516. return "\n".join(pending_lines)
  517. return " (no pending nodes)"
  518. @staticmethod
  519. def _constraint_report(
  520. cluster_constraint_demand: List[ClusterConstraintDemand],
  521. ) -> str:
  522. """Returns a formatted string describing the resource constraints from request_resources().
  523. Args:
  524. data: ClusterStatus object containing resource demand information.
  525. Returns:
  526. String containing the formatted constraints report, either listing each constraint
  527. and count or indicating no constraints exist.
  528. Example:
  529. >>> cluster_constraint_demand = [
  530. ... ClusterConstraintDemand(bundles_by_count=[
  531. ... ResourceRequestByCount(bundle={"CPU": 4}, count=2),
  532. ... ResourceRequestByCount(bundle={"GPU": 1}, count=1)
  533. ... ])
  534. ... ]
  535. >>> ClusterStatusFormatter._constraint_report(cluster_constraint_demand)
  536. " {'CPU': 4}: 2 from request_resources()\\n {'GPU': 1}: 1 from request_resources()"
  537. """
  538. constraint_lines = []
  539. request_demand = [
  540. (bc.bundle, bc.count)
  541. for constraint_demand in cluster_constraint_demand
  542. for bc in constraint_demand.bundles_by_count
  543. ]
  544. for bundle, count in request_demand:
  545. constraint_lines.append(f" {bundle}: {count} from request_resources()")
  546. if constraint_lines:
  547. return "\n".join(constraint_lines)
  548. return " (none)"
  549. @staticmethod
  550. def _demand_report(data: ClusterStatus) -> str:
  551. # Process resource demands
  552. resource_demands = [
  553. (bundle.bundle, bundle.count)
  554. for demand in data.resource_demands.ray_task_actor_demand
  555. for bundle in demand.bundles_by_count
  556. ]
  557. demand_lines = []
  558. if resource_demands:
  559. demand_lines.extend(format_resource_demand_summary(resource_demands))
  560. # Process placement group demands
  561. pg_demand_strs = [
  562. f"{pg_demand.strategy}|{pg_demand.state}"
  563. for pg_demand in data.resource_demands.placement_group_demand
  564. ]
  565. pg_demand_str_to_demand = {
  566. f"{pg_demand.strategy}|{pg_demand.state}": pg_demand
  567. for pg_demand in data.resource_demands.placement_group_demand
  568. }
  569. pg_freqs = Counter(pg_demand_strs)
  570. pg_demand = [
  571. (
  572. {
  573. "strategy": pg_demand_str_to_demand[pg_str].strategy,
  574. "bundles": [
  575. (bundle.bundle, bundle.count)
  576. for bundle in pg_demand_str_to_demand[pg_str].bundles_by_count
  577. ],
  578. },
  579. freq,
  580. )
  581. for pg_str, freq in pg_freqs.items()
  582. ]
  583. for pg, count in pg_demand:
  584. pg_str = format_pg(pg)
  585. demand_lines.append(f" {pg_str}: {count}+ pending placement groups")
  586. # Generate demand report
  587. if demand_lines:
  588. return "\n".join(demand_lines)
  589. return " (no resource demands)"
  590. @staticmethod
  591. def _cluster_usage_report(data: ClusterStatus, verbose: bool) -> str:
  592. # Build usage dictionary
  593. usage = {
  594. u.resource_name: (u.used, u.total) for u in data.cluster_resource_usage
  595. }
  596. # Parse usage lines
  597. usage_lines = parse_usage(usage, verbose)
  598. # Generate usage report
  599. usage_report = [f" {line}" for line in usage_lines] + [""]
  600. return "\n".join(usage_report)
  601. class ClusterStatusParser:
  602. @classmethod
  603. def from_get_cluster_status_reply(
  604. cls, proto: GetClusterStatusReply, stats: Stats
  605. ) -> ClusterStatus:
  606. # parse nodes info
  607. active_nodes, idle_nodes, failed_nodes = cls._parse_nodes(
  608. proto.cluster_resource_state
  609. )
  610. # parse pending nodes info
  611. pending_nodes = cls._parse_pending(proto.autoscaling_state)
  612. # parse launch requests
  613. pending_launches, failed_launches = cls._parse_launch_requests(
  614. proto.autoscaling_state
  615. )
  616. # parse cluster resource usage
  617. cluster_resource_usage = cls._parse_cluster_resource_usage(
  618. proto.cluster_resource_state
  619. )
  620. # parse resource demands
  621. resource_demands = cls._parse_resource_demands(proto.cluster_resource_state)
  622. # parse stats
  623. stats = cls._parse_stats(proto, stats)
  624. return ClusterStatus(
  625. active_nodes=active_nodes,
  626. idle_nodes=idle_nodes,
  627. pending_launches=pending_launches,
  628. failed_launches=failed_launches,
  629. pending_nodes=pending_nodes,
  630. failed_nodes=failed_nodes,
  631. cluster_resource_usage=cluster_resource_usage,
  632. resource_demands=resource_demands,
  633. stats=stats,
  634. )
  635. @classmethod
  636. def _parse_stats(cls, reply: GetClusterStatusReply, stats: Stats) -> Stats:
  637. """
  638. Parse the stats from the get cluster status reply.
  639. Args:
  640. reply: the get cluster status reply
  641. stats: the stats
  642. Returns:
  643. stats: the parsed stats
  644. """
  645. stats = deepcopy(stats)
  646. stats.gcs_request_time_s = stats.gcs_request_time_s
  647. # TODO(rickyx): Populate other autoscaler stats once available.
  648. stats.autoscaler_version = str(reply.autoscaling_state.autoscaler_state_version)
  649. stats.cluster_resource_state_version = str(
  650. reply.cluster_resource_state.cluster_resource_state_version
  651. )
  652. return stats
  653. @classmethod
  654. def _parse_resource_demands(
  655. cls, state: ClusterResourceState
  656. ) -> List[ResourceDemand]:
  657. """
  658. Parse the resource demands from the cluster resource state.
  659. Args:
  660. state: the cluster resource state
  661. Returns:
  662. resource_demands: the resource demands
  663. """
  664. task_actor_demand = []
  665. pg_demand = []
  666. constraint_demand = []
  667. for request_count in state.pending_resource_requests:
  668. # TODO(rickyx): constraints?
  669. demand = RayTaskActorDemand(
  670. bundles_by_count=[
  671. ResourceRequestByCount(
  672. request_count.request.resources_bundle, request_count.count
  673. )
  674. ],
  675. )
  676. task_actor_demand.append(demand)
  677. for gang_request in state.pending_gang_resource_requests:
  678. demand = PlacementGroupResourceDemand(
  679. bundles_by_count=cls._aggregate_resource_requests_by_shape(
  680. gang_request.requests
  681. ),
  682. details=gang_request.details,
  683. )
  684. pg_demand.append(demand)
  685. for constraint_request in state.cluster_resource_constraints:
  686. demand = ClusterConstraintDemand(
  687. bundles_by_count=[
  688. ResourceRequestByCount(
  689. bundle=dict(r.request.resources_bundle.items()), count=r.count
  690. )
  691. for r in constraint_request.resource_requests
  692. ]
  693. )
  694. constraint_demand.append(demand)
  695. return ResourceDemandSummary(
  696. ray_task_actor_demand=task_actor_demand,
  697. placement_group_demand=pg_demand,
  698. cluster_constraint_demand=constraint_demand,
  699. )
  700. @classmethod
  701. def _aggregate_resource_requests_by_shape(
  702. cls,
  703. requests: List[ResourceRequest],
  704. ) -> List[ResourceRequestByCount]:
  705. """
  706. Aggregate resource requests by shape.
  707. Args:
  708. requests: the list of resource requests
  709. Returns:
  710. resource_requests_by_count: the aggregated resource requests by count
  711. """
  712. resource_requests_by_count = defaultdict(int)
  713. for request in requests:
  714. bundle = frozenset(request.resources_bundle.items())
  715. resource_requests_by_count[bundle] += 1
  716. return [
  717. ResourceRequestByCount(dict(bundle), count)
  718. for bundle, count in resource_requests_by_count.items()
  719. ]
  720. @classmethod
  721. def _parse_node_resource_usage(
  722. cls, node_state: NodeState, usage: Dict[str, ResourceUsage]
  723. ) -> Dict[str, ResourceUsage]:
  724. """
  725. Parse the node resource usage from the node state.
  726. Args:
  727. node_state: the node state
  728. usage: the usage dict to be updated. This is a dict of
  729. {resource_name: ResourceUsage}
  730. Returns:
  731. usage: the updated usage dict
  732. """
  733. # Tuple of {resource_name : (used, total)}
  734. d = defaultdict(lambda: [0.0, 0.0])
  735. for resource_name, resource_total in node_state.total_resources.items():
  736. d[resource_name][1] += resource_total
  737. # Will be subtracted from available later.
  738. d[resource_name][0] += resource_total
  739. for (
  740. resource_name,
  741. resource_available,
  742. ) in node_state.available_resources.items():
  743. d[resource_name][0] -= resource_available
  744. # Merge with the passed in usage.
  745. for k, (used, total) in d.items():
  746. usage[k].resource_name = k
  747. usage[k].used += used
  748. usage[k].total += total
  749. return usage
  750. @classmethod
  751. def _parse_cluster_resource_usage(
  752. cls,
  753. state: ClusterResourceState,
  754. ) -> List[ResourceUsage]:
  755. """
  756. Parse the cluster resource usage from the cluster resource state.
  757. Args:
  758. state: the cluster resource state
  759. Returns:
  760. cluster_resource_usage: the cluster resource usage
  761. """
  762. cluster_resource_usage = defaultdict(ResourceUsage)
  763. for node_state in state.node_states:
  764. if node_state.status != NodeStatus.DEAD:
  765. cluster_resource_usage = cls._parse_node_resource_usage(
  766. node_state, cluster_resource_usage
  767. )
  768. return list(cluster_resource_usage.values())
  @classmethod
  def _parse_nodes(
      cls,
      state: ClusterResourceState,
  ) -> Tuple[List[NodeInfo], List[NodeInfo], List[NodeInfo]]:
      """
      Parse the node info from the cluster resource state.

      Each node is classified by its status: DEAD nodes go to ``dead_nodes``,
      IDLE nodes go to ``idle_nodes``, and every other node goes to
      ``active_nodes``.

      Args:
          state: the cluster resource state
      Returns:
          A 3-tuple ``(active_nodes, idle_nodes, dead_nodes)``:
          active_nodes: the list of nodes that are neither idle nor dead
          idle_nodes: the list of idle nodes
          dead_nodes: the list of dead nodes
      """
      active_nodes = []
      idle_nodes = []
      dead_nodes = []
      for node_state in state.node_states:
          # Basic node info.
          node_id = binary_to_hex(node_state.node_id)
          if node_state.ray_node_type_name:
              ray_node_type_name = node_state.ray_node_type_name
          else:
              # We don't have a node type name, but this is needed for showing
              # healthy nodes. This happens when we don't use cluster launcher
              # but start ray manually. We will use node id as node type name.
              ray_node_type_name = f"node_{node_id}"

          # Resource usage is only parsed for nodes that are not dead.
          node_resource_usage = None
          failure_detail = None
          if node_state.status == NodeStatus.DEAD:
              # TODO(rickyx): Technically we could get a more verbose
              # failure detail from GCS, but existing ray status treats
              # all ray failures as raylet death.
              failure_detail = NODE_DEATH_CAUSE_RAYLET_DIED
          else:
              usage = cls._parse_node_resource_usage(
                  node_state, defaultdict(ResourceUsage)
              )
              node_resource_usage = NodeUsage(
                  usage=list(usage.values()),
                  # Nodes that are not IDLE report an idle time of 0.
                  idle_time_ms=(
                      node_state.idle_duration_ms
                      if node_state.status == NodeStatus.IDLE
                      else 0
                  ),
              )

          node_info = NodeInfo(
              instance_type_name=node_state.instance_type_name,
              node_status=NodeStatus.Name(node_state.status),
              # Reuse the hex id computed above rather than converting twice.
              node_id=node_id,
              ip_address=node_state.node_ip_address,
              ray_node_type_name=ray_node_type_name,
              instance_id=node_state.instance_id,
              resource_usage=node_resource_usage,
              failure_detail=failure_detail,
              node_activity=node_state.node_activity,
              labels=dict(node_state.labels),
          )

          if node_state.status == NodeStatus.DEAD:
              dead_nodes.append(node_info)
          elif node_state.status == NodeStatus.IDLE:
              idle_nodes.append(node_info)
          else:
              active_nodes.append(node_info)
      return active_nodes, idle_nodes, dead_nodes
  @classmethod
  def _parse_launch_requests(
      cls, state: AutoscalingState
  ) -> Tuple[List[LaunchRequest], List[LaunchRequest]]:
      """
      Convert the autoscaler's pending and failed instance requests into
      LaunchRequest objects.

      Args:
          state: the autoscaling state, empty if there's no autoscaling state
              being reported.
      Returns:
          A tuple ``(pending_launches, failed_launches)``:
          pending_launches: one PENDING LaunchRequest per entry of
              ``state.pending_instance_requests``, in order.
          failed_launches: one FAILED LaunchRequest per entry of
              ``state.failed_instance_requests``, in order, carrying the
              failure reason and failure timestamp.
      """
      pending_launches = [
          LaunchRequest(
              instance_type_name=req.instance_type_name,
              ray_node_type_name=req.ray_node_type_name,
              count=req.count,
              state=LaunchRequest.Status.PENDING,
              request_ts_s=req.request_ts,
          )
          for req in state.pending_instance_requests
      ]
      # Failed requests expose their request time as `start_ts` rather than
      # `request_ts`, plus the failure reason/time.
      failed_launches = [
          LaunchRequest(
              instance_type_name=req.instance_type_name,
              ray_node_type_name=req.ray_node_type_name,
              count=req.count,
              state=LaunchRequest.Status.FAILED,
              request_ts_s=req.start_ts,
              details=req.reason,
              failed_ts_s=req.failed_ts,
          )
          for req in state.failed_instance_requests
      ]
      return pending_launches, failed_launches
  @classmethod
  def _parse_pending(cls, state: AutoscalingState) -> List[NodeInfo]:
      """
      Build NodeInfo entries for the instances reported as pending.

      Args:
          state: the autoscaling state, empty if there's no autoscaling state
              being reported.
      Returns:
          pending_nodes: one NodeInfo per entry of ``state.pending_instances``,
          in the same order.
      """
      return [
          NodeInfo(
              instance_type_name=instance.instance_type_name,
              ray_node_type_name=instance.ray_node_type_name,
              details=instance.details,
              instance_id=instance.instance_id,
              ip_address=instance.ip_address,
          )
          for instance in state.pending_instances
      ]
  # Module-level cache of the value last read from GCS by is_autoscaler_v2();
  # None means nothing has been fetched yet.
  cached_is_autoscaler_v2: Optional[bool] = None


  def is_autoscaler_v2(
      fetch_from_server: bool = False, gcs_client: Optional[GcsClient] = None
  ) -> bool:
      """
      Check whether the autoscaler is v2 by reading the GCS internal KV.

      The value read from GCS is cached in this module, so later calls reuse
      it unless ``fetch_from_server`` is True.

      Args:
          fetch_from_server: If True, always read the value from the GCS
              server (refreshing the cache); otherwise return the cached value
              when one is available.
          gcs_client: The GCS client to use. If not provided, the client
              returned by internal_kv_get_gcs_client() is used.
      Returns:
          is_v2: True if the autoscaler is v2, False otherwise.
      Raises:
          Exception: if no GCS client is available (e.g. ray.init() not
              called); the check is an ``assert``, so it surfaces as
              AssertionError.
      """
      global cached_is_autoscaler_v2

      # If env var is set to enable autoscaler v2, we should always return True.
      v2_forced_by_config = ray._config.enable_autoscaler_v2()
      if v2_forced_by_config and not fetch_from_server:
          # TODO(rickyx): Once we migrate completely to v2, we should remove this.
          # While this short-circuit may allow client-server inconsistency
          # (e.g. client running v1, while server running v2), it's currently
          # not possible with existing use-cases.
          return True

      # Serve from the module cache unless the caller asked for a fresh read.
      if not fetch_from_server and cached_is_autoscaler_v2 is not None:
          return cached_is_autoscaler_v2

      client = internal_kv_get_gcs_client() if gcs_client is None else gcs_client
      assert client, (
          "GCS client is not available. Please initialize the global GCS client "
          "first by calling ray.init() or explicitly calls to _initialize_internal_kv()."
      )

      # See src/ray/common/constants.h for the definition of this key.
      raw_flag = client.internal_kv_get(
          ray._raylet.GCS_AUTOSCALER_V2_ENABLED_KEY.encode(),
          namespace=ray._raylet.GCS_AUTOSCALER_STATE_NAMESPACE.encode(),
      )
      cached_is_autoscaler_v2 = raw_flag == b"1"
      return cached_is_autoscaler_v2
  def is_head_node(node_state: NodeState) -> bool:
      """
      Report whether the given node state belongs to the head node.

      The check looks for the special ``node:__internal_head__`` resource
      among the node's total resources.

      Args:
          node_state: the node state
      Returns:
          is_head: True if the node is a head node, False otherwise.
      """
      # TODO: we should include this bit of information in the future.
      # NOTE: we could use labels in the future to determine if it's a head node.
      head_marker = "node:__internal_head__"
      total_resources = dict(node_state.total_resources)
      return head_marker in total_resources
  939. """
  940. # TODO: we should include this bit of information in the future.
  941. # NOTE: we could use labels in the future to determine if it's a head node.
  942. return "node:__internal_head__" in dict(node_state.total_resources)