ranker.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. """Ranker component for operator selection in streaming executor."""
  2. from abc import ABC, abstractmethod
  3. from typing import TYPE_CHECKING, Generic, List, Protocol, Tuple, TypeVar
  4. from ray.data._internal.execution.interfaces import PhysicalOperator
  5. if TYPE_CHECKING:
  6. from ray.data._internal.execution.resource_manager import ResourceManager
  7. from ray.data._internal.execution.streaming_executor_state import Topology
  8. # Protocol for comparable ranking values
  9. class Comparable(Protocol):
  10. """Protocol for types that can be compared for ranking."""
  11. def __lt__(self, other: "Comparable") -> bool:
  12. ...
  13. def __le__(self, other: "Comparable") -> bool:
  14. ...
  15. def __gt__(self, other: "Comparable") -> bool:
  16. ...
  17. def __ge__(self, other: "Comparable") -> bool:
  18. ...
  19. def __eq__(self, other: "Comparable") -> bool:
  20. ...
# Type variable for the rank values a ranker produces. It is bound to
# ``Comparable``, so any type substituted for it must satisfy that protocol.
RankingValue = TypeVar("RankingValue", bound=Comparable)
  23. class Ranker(ABC, Generic[RankingValue]):
  24. """Abstract base class for operator ranking strategies."""
  25. @abstractmethod
  26. def rank_operator(
  27. self,
  28. op: PhysicalOperator,
  29. topology: "Topology",
  30. resource_manager: "ResourceManager",
  31. ) -> RankingValue:
  32. """Rank operator for execution priority.
  33. Operator to run next is selected as the one with the *smallest* value
  34. of the lexicographically ordered ranks composed of (in order):
  35. Args:
  36. op: Operator to rank
  37. topology: Current execution topology
  38. resource_manager: Resource manager for usage information
  39. Returns:
  40. Rank (tuple) for operator
  41. """
  42. pass
  43. def rank_operators(
  44. self,
  45. ops: List[PhysicalOperator],
  46. topology: "Topology",
  47. resource_manager: "ResourceManager",
  48. ) -> List[RankingValue]:
  49. assert len(ops) > 0
  50. return [self.rank_operator(op, topology, resource_manager) for op in ops]
  51. class DefaultRanker(Ranker[Tuple[int, int]]):
  52. """Ranker implementation."""
  53. def rank_operator(
  54. self,
  55. op: PhysicalOperator,
  56. topology: "Topology",
  57. resource_manager: "ResourceManager",
  58. ) -> Tuple[int, int]:
  59. """Computes rank for op. *Lower means better rank*
  60. 1. Whether operator's could be throttled (int)
  61. 2. Operators' object store utilization
  62. Args:
  63. op: Operator to rank
  64. topology: Current execution topology
  65. resource_manager: Resource manager for usage information
  66. Returns:
  67. Rank (tuple) for operator
  68. """
  69. throttling_disabled = 0 if op.throttling_disabled() else 1
  70. return (
  71. throttling_disabled,
  72. resource_manager.get_op_usage(op).object_store_memory,
  73. )