node_tracker.py 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
from typing import List, Optional, Set, Tuple

from ray.autoscaler._private import constants
  3. class NodeTracker:
  4. """Map nodes to their corresponding logs.
  5. We need to be a little careful here. At an given point in time, node_id <->
  6. ip can be interchangeably used, but the node_id -> ip relation is not
  7. bijective _across time_ since IP addresses can be reused. Therefore, we
  8. should treat node_id as the only unique identifier.
  9. """
  10. def __init__(self):
  11. # Mapping from node_id -> (ip, node type, stdout_path, process runner)
  12. self.node_mapping = {}
  13. # A quick, inefficient FIFO cache implementation.
  14. self.lru_order = []
  15. def _add_node_mapping(self, node_id: str, value: str):
  16. if node_id in self.node_mapping:
  17. return
  18. assert len(self.lru_order) == len(self.node_mapping)
  19. if len(self.lru_order) >= constants.AUTOSCALER_MAX_NODES_TRACKED:
  20. # The LRU eviction case
  21. node_id = self.lru_order.pop(0)
  22. del self.node_mapping[node_id]
  23. self.node_mapping[node_id] = value
  24. self.lru_order.append(node_id)
  25. def track(self, node_id: str, ip: str, node_type: str):
  26. """
  27. Begin to track a new node.
  28. Args:
  29. node_id: The node id.
  30. ip: The node ip address.
  31. node_type: The node type.
  32. """
  33. if node_id not in self.node_mapping:
  34. self._add_node_mapping(node_id, (ip, node_type))
  35. def untrack(self, node_id: str):
  36. """Gracefully stop tracking a node. If a node is intentionally removed from
  37. the cluster, we should stop tracking it so we don't mistakenly mark it
  38. as failed.
  39. Args:
  40. node_id: The node id which failed.
  41. """
  42. if node_id in self.node_mapping:
  43. self.lru_order.remove(node_id)
  44. del self.node_mapping[node_id]
  45. def get_all_failed_node_info(
  46. self, non_failed_ids: Set[str]
  47. ) -> List[Tuple[str, str]]:
  48. """Get the information about all failed nodes. A failed node is any node which
  49. we began to track that is not pending or alive (i.e. not failed).
  50. Args:
  51. non_failed_ids: Nodes are failed unless they are in this set.
  52. Returns:
  53. List[Tuple[str, str]]: A list of tuples. Each tuple is the ip
  54. address and type of a failed node.
  55. """
  56. failed_nodes = self.node_mapping.keys() - non_failed_ids
  57. failed_info = []
  58. # Returning the list in order is important for display purposes.
  59. for node_id in filter(lambda node_id: node_id in failed_nodes, self.lru_order):
  60. failed_info.append(self.node_mapping[node_id])
  61. return failed_info