metric_cardinality.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. from enum import Enum
  2. from typing import Callable, Dict, List
  3. from ray._private.ray_constants import RAY_METRIC_CARDINALITY_LEVEL
  4. from ray._private.telemetry.metric_types import MetricType
  5. # Keep in sync with the WorkerIdKey in src/ray/stats/tag_defs.cc
  6. WORKER_ID_TAG_KEY = "WorkerId"
  7. # Keep in sync with the NameKey in src/ray/stats/tag_defs.cc
  8. TASK_OR_ACTOR_NAME_TAG_KEY = "Name"
  9. # Aggregation functions for high-cardinality gauge metrics when labels are dropped.
  10. # Counter and Sum metrics always use sum() aggregation.
  11. HIGH_CARDINALITY_GAUGE_AGGREGATION: Dict[str, Callable[[List[float]], float]] = {
  12. "tasks": sum,
  13. "actors": sum,
  14. }
  15. _CARDINALITY_LEVEL = None
  16. _HIGH_CARDINALITY_LABELS: Dict[str, List[str]] = {}
  17. class MetricCardinality(str, Enum):
  18. """Cardinality level configuration for all Ray metrics (ray_tasks, ray_actors,
  19. etc.). This configurtion is used to determine whether to globally drop high
  20. cardinality labels. This is important for high scale clusters that might consist
  21. thousands of workers, millions of tasks.
  22. - LEGACY: Keep all labels. This is the default behavior.
  23. - RECOMMENDED: Drop high cardinality labels. The set of high cardinality labels
  24. are determined internally by Ray and not exposed to users. Currently, this includes
  25. the following labels: WorkerId
  26. - LOW: Same as RECOMMENDED, but also drop the Name label for tasks and actors.
  27. """
  28. LEGACY = "legacy"
  29. RECOMMENDED = "recommended"
  30. LOW = "low"
  31. @staticmethod
  32. def get_cardinality_level() -> "MetricCardinality":
  33. global _CARDINALITY_LEVEL
  34. if _CARDINALITY_LEVEL is not None:
  35. return _CARDINALITY_LEVEL
  36. try:
  37. _CARDINALITY_LEVEL = MetricCardinality(RAY_METRIC_CARDINALITY_LEVEL.lower())
  38. except ValueError:
  39. _CARDINALITY_LEVEL = MetricCardinality.LEGACY
  40. return _CARDINALITY_LEVEL
  41. @staticmethod
  42. def get_aggregation_function(
  43. metric_name: str, metric_type: MetricType = MetricType.GAUGE
  44. ) -> Callable[[List[float]], float]:
  45. """Get the aggregation function for a metric when labels are dropped. This method does not currently support histogram metrics.
  46. Args:
  47. metric_name: The name of the metric.
  48. metric_type: The type of the metric. If provided, Counter and Sum
  49. metrics always use sum() aggregation.
  50. Returns:
  51. A function that takes a list of values and returns the aggregated value.
  52. """
  53. # Counter and Sum metrics always aggregate by summing
  54. if metric_type in (MetricType.COUNTER, MetricType.SUM):
  55. return sum
  56. # Histogram metrics are not supported by this method
  57. if metric_type == MetricType.HISTOGRAM:
  58. raise ValueError("No Aggregation function for histogram metrics.")
  59. # Gauge metrics use metric-specific aggregation or default to first value
  60. if metric_name in HIGH_CARDINALITY_GAUGE_AGGREGATION:
  61. return HIGH_CARDINALITY_GAUGE_AGGREGATION[metric_name]
  62. return lambda values: values[0]
  63. @staticmethod
  64. def get_high_cardinality_metrics() -> List[str]:
  65. return list(HIGH_CARDINALITY_GAUGE_AGGREGATION.keys())
  66. @staticmethod
  67. def get_high_cardinality_labels_to_drop(metric_name: str) -> List[str]:
  68. """
  69. Get the high cardinality labels of the metric.
  70. """
  71. if metric_name in _HIGH_CARDINALITY_LABELS:
  72. return _HIGH_CARDINALITY_LABELS[metric_name]
  73. cardinality_level = MetricCardinality.get_cardinality_level()
  74. if (
  75. cardinality_level == MetricCardinality.LEGACY
  76. or metric_name not in MetricCardinality.get_high_cardinality_metrics()
  77. ):
  78. _HIGH_CARDINALITY_LABELS[metric_name] = []
  79. return []
  80. _HIGH_CARDINALITY_LABELS[metric_name] = [WORKER_ID_TAG_KEY]
  81. if cardinality_level == MetricCardinality.LOW and metric_name in [
  82. "tasks",
  83. "actors",
  84. ]:
  85. _HIGH_CARDINALITY_LABELS[metric_name].append(TASK_OR_ACTOR_NAME_TAG_KEY)
  86. return _HIGH_CARDINALITY_LABELS[metric_name]