| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- from typing import List, Set, Tuple
- from ray.autoscaler._private import constants
- class NodeTracker:
- """Map nodes to their corresponding logs.
- We need to be a little careful here. At an given point in time, node_id <->
- ip can be interchangeably used, but the node_id -> ip relation is not
- bijective _across time_ since IP addresses can be reused. Therefore, we
- should treat node_id as the only unique identifier.
- """
- def __init__(self):
- # Mapping from node_id -> (ip, node type, stdout_path, process runner)
- self.node_mapping = {}
- # A quick, inefficient FIFO cache implementation.
- self.lru_order = []
- def _add_node_mapping(self, node_id: str, value: str):
- if node_id in self.node_mapping:
- return
- assert len(self.lru_order) == len(self.node_mapping)
- if len(self.lru_order) >= constants.AUTOSCALER_MAX_NODES_TRACKED:
- # The LRU eviction case
- node_id = self.lru_order.pop(0)
- del self.node_mapping[node_id]
- self.node_mapping[node_id] = value
- self.lru_order.append(node_id)
- def track(self, node_id: str, ip: str, node_type: str):
- """
- Begin to track a new node.
- Args:
- node_id: The node id.
- ip: The node ip address.
- node_type: The node type.
- """
- if node_id not in self.node_mapping:
- self._add_node_mapping(node_id, (ip, node_type))
- def untrack(self, node_id: str):
- """Gracefully stop tracking a node. If a node is intentionally removed from
- the cluster, we should stop tracking it so we don't mistakenly mark it
- as failed.
- Args:
- node_id: The node id which failed.
- """
- if node_id in self.node_mapping:
- self.lru_order.remove(node_id)
- del self.node_mapping[node_id]
- def get_all_failed_node_info(
- self, non_failed_ids: Set[str]
- ) -> List[Tuple[str, str]]:
- """Get the information about all failed nodes. A failed node is any node which
- we began to track that is not pending or alive (i.e. not failed).
- Args:
- non_failed_ids: Nodes are failed unless they are in this set.
- Returns:
- List[Tuple[str, str]]: A list of tuples. Each tuple is the ip
- address and type of a failed node.
- """
- failed_nodes = self.node_mapping.keys() - non_failed_ids
- failed_info = []
- # Returning the list in order is important for display purposes.
- for node_id in filter(lambda node_id: node_id in failed_nodes, self.lru_order):
- failed_info.append(self.node_mapping[node_id])
- return failed_info
|