| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848 |
- import json
- import logging
- import os
- import re
- import threading
- import time
- import traceback
- from collections import defaultdict, namedtuple
- from typing import Any, Dict, List, Set, Tuple, Union
- from opencensus.metrics.export.metric_descriptor import MetricDescriptorType
- from opencensus.metrics.export.value import ValueDouble
- from opencensus.stats import aggregation, measure as measure_module
- from opencensus.stats.aggregation_data import (
- CountAggregationData,
- DistributionAggregationData,
- LastValueAggregationData,
- SumAggregationData,
- )
- from opencensus.stats.base_exporter import StatsExporter
- from opencensus.stats.stats_recorder import StatsRecorder
- from opencensus.stats.view import View
- from opencensus.stats.view_manager import ViewManager
- from opencensus.tags import (
- tag_key as tag_key_module,
- tag_map as tag_map_module,
- tag_value as tag_value_module,
- )
- from prometheus_client.core import (
- CounterMetricFamily,
- GaugeMetricFamily,
- HistogramMetricFamily,
- Metric as PrometheusMetric,
- )
- import ray
- from ray._common.network_utils import build_address
- from ray._private.ray_constants import env_bool
- from ray._private.telemetry.metric_cardinality import (
- WORKER_ID_TAG_KEY,
- MetricCardinality,
- )
- from ray._raylet import GcsClient
- from ray.core.generated.metrics_pb2 import Metric
- from ray.util.metrics import _is_invalid_metric_name
# Logger shared by everything defined in this module.
logger = logging.getLogger(__name__)

# Name of the environment variable that configures the worker timeout
# (in seconds). A worker that has not reported for longer than this is
# treated as dead.
RAY_WORKER_TIMEOUT_S = "RAY_WORKER_TIMEOUT_S"

# Component key used for metrics that are not reported by a worker.
GLOBAL_COMPONENT_KEY = "CORE"

# Matches every character that is not an ASCII letter or digit.
RE_NON_ALPHANUMS = re.compile(r"[^a-zA-Z0-9]")
class Gauge(View):
    """Gauge representation of an OpenCensus view.

    This class is used to collect process metrics from the reporter agent.
    Cpp metrics should be collected in a different way.
    """

    def __init__(self, name, description, unit, tags: List[str]):
        """Build the measure and the last-value view backing this gauge.

        Args:
            name: Metric name. Must pass `_is_invalid_metric_name`.
            description: Human readable description of the metric.
            unit: Unit of the metric.
            tags: Names of the tag keys attached to the view.

        Raises:
            ValueError: If `name` is not a valid metric name.
        """
        if _is_invalid_metric_name(name):
            raise ValueError(
                f"Invalid metric name: {name}. Metric will be discarded "
                "and data will not be collected or published. "
                "Metric names can only contain letters, numbers, _, and :. "
                "Metric names cannot start with numbers."
            )

        self._measure = measure_module.MeasureInt(name, description, unit)
        self._description = description

        # The view takes TagKey objects rather than plain strings.
        tag_keys = [tag_key_module.TagKey(tag) for tag in tags]
        self._view = View(
            name,
            description,
            tag_keys,
            self._measure,
            aggregation.LastValueAggregation(),
        )

    @property
    def measure(self):
        """The OpenCensus measure values are recorded against."""
        return self._measure

    @property
    def view(self):
        """The OpenCensus view that aggregates the measure."""
        return self._view

    @property
    def name(self):
        """The metric name, taken from the underlying measure."""
        return self._measure.name

    @property
    def description(self):
        """The description given at construction time."""
        return self._description
# A single measurement: the gauge it belongs to, the value, and its tags.
Record = namedtuple("Record", ("gauge", "value", "tags"))
def fix_grpc_metric(metric: Metric):
    """Patch an inbound gRPC OpenCensus metric proto in place.

    Makes `opencensus.proto.metrics.v1.Metric` protos coming from gRPC acceptable
    to opencensus.stats.DistributionAggregationData. Metrics whose name does not
    start with `grpc.io/` are left untouched.

      - metric name: gRPC OpenCensus metrics have names with slashes and dots, e.g.
        `grpc.io/client/server_latency`[1]. However Prometheus metric names only
        take alphanums, underscores and colons[2]. We sanitize the name by
        replacing non-alphanum chars with underscores, like the official
        opencensus prometheus exporter[3].
      - distribution bucket bounds: The Metric proto asks distribution bucket
        bounds to be > 0 [4]. However, gRPC OpenCensus metrics have their first
        bucket bound == 0 [1]. This makes the `DistributionAggregationData`
        constructor raise exceptions. This applies to all bytes and milliseconds
        (latencies). The fix: we update the initial 0 bound to be 0.000_000_1.
        This will not affect the precision of the metrics, since we don't expect
        any less-than-1 bytes, or less-than-1-nanosecond times.

    [1] https://github.com/census-instrumentation/opencensus-specs/blob/master/stats/gRPC.md#units  # noqa: E501
    [2] https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
    [3] https://github.com/census-instrumentation/opencensus-cpp/blob/50eb5de762e5f87e206c011a4f930adb1a1775b1/opencensus/exporters/stats/prometheus/internal/prometheus_utils.cc#L39  # noqa: E501
    [4] https://github.com/census-instrumentation/opencensus-proto/blob/master/src/opencensus/proto/metrics/v1/metrics.proto#L218  # noqa: E501

    Args:
        metric: The metric proto to fix. It is mutated in place.
    """
    descriptor = metric.metric_descriptor
    if not descriptor.name.startswith("grpc.io/"):
        return

    descriptor.name = RE_NON_ALPHANUMS.sub("_", descriptor.name)

    for series in metric.timeseries:
        for point in series.points:
            if not point.HasField("distribution_value"):
                continue
            bounds = point.distribution_value.bucket_options.explicit.bounds
            # Only a leading bound of exactly 0 needs to be nudged above zero.
            if len(bounds) > 0 and bounds[0] == 0:
                bounds[0] = 0.000_000_1
class OpencensusProxyMetric:
    """An OpenCensus metric whose data is proxy exported.

    Holds the descriptor fields of the metric together with the most recently
    recorded aggregation data for every tuple of label values.
    """

    def __init__(self, name: str, desc: str, unit: str, label_keys: List[str]):
        """Represents the OpenCensus metrics that will be proxy exported.

        Args:
            name: Name of the metric.
            desc: Description of the metric.
            unit: Unit of the metric.
            label_keys: The label keys of the metric.
        """
        self._name = name
        self._desc = desc
        self._unit = unit
        # -- The label keys of the metric --
        self._label_keys = label_keys
        # -- The data that needs to be proxy exported --
        # tuple of label values -> data (OpenCensus Aggregation data)
        self._data = {}

    @property
    def name(self):
        return self._name

    @property
    def desc(self):
        return self._desc

    @property
    def unit(self):
        return self._unit

    @property
    def label_keys(self):
        return self._label_keys

    @property
    def data(self):
        return self._data

    def is_distribution_aggregation_data(self):
        """Check if the metric is a distribution aggregation metric.

        Only the first stored entry is inspected. A metric without any data is
        not considered a distribution metric.
        """
        return len(self._data) > 0 and isinstance(
            next(iter(self._data.values())), DistributionAggregationData
        )

    def add_data(self, label_values: Tuple, data: Any):
        """Add the data to the metric.

        Args:
            label_values: The label values of the metric.
            data: The data to be added.
        """
        self._data[label_values] = data

    def record(self, metric: Metric):
        """Parse the Opencensus Protobuf and store the data.

        The data can be accessed via `data` API once recorded. Every point
        replaces whatever was previously stored for the same label values.

        Args:
            metric: The OpenCensus metric proto to parse.

        Raises:
            ValueError: If the metric descriptor type is not one of
                CUMULATIVE_INT64, CUMULATIVE_DOUBLE, GAUGE_DOUBLE or
                CUMULATIVE_DISTRIBUTION.
        """
        timeseries = metric.timeseries
        if len(timeseries) == 0:
            return

        # The descriptor type is the same for every point; look it up once.
        metric_type = metric.metric_descriptor.type

        # Create the aggregation and fill it in the our stats
        for series in timeseries:
            labels = tuple(val.value for val in series.label_values)

            # Aggregate points.
            for point in series.points:
                if metric_type == MetricDescriptorType.CUMULATIVE_INT64:
                    data = CountAggregationData(point.int64_value)
                elif metric_type == MetricDescriptorType.CUMULATIVE_DOUBLE:
                    data = SumAggregationData(ValueDouble, point.double_value)
                elif metric_type == MetricDescriptorType.GAUGE_DOUBLE:
                    data = LastValueAggregationData(ValueDouble, point.double_value)
                elif metric_type == MetricDescriptorType.CUMULATIVE_DISTRIBUTION:
                    dist_value = point.distribution_value
                    counts_per_bucket = [bucket.count for bucket in dist_value.buckets]
                    bucket_bounds = dist_value.bucket_options.explicit.bounds
                    count = dist_value.count
                    # An empty distribution (count == 0) is a valid proto and
                    # must not raise ZeroDivisionError; its mean is 0.
                    mean = dist_value.sum / count if count else 0.0
                    data = DistributionAggregationData(
                        mean,
                        count,
                        dist_value.sum_of_squared_deviation,
                        counts_per_bucket,
                        bucket_bounds,
                    )
                else:
                    raise ValueError("Summary is not supported")
                self._data[labels] = data
class Component:
    """A component (identified by `id`) that asks for its metrics to be proxy exported."""

    def __init__(self, id: str):
        """Represent a component that requests to proxy export metrics

        Args:
            id: Id of this component.
        """
        self.id = id
        # -- Metrics requested to proxy export from this component --
        # metrics_name (str) -> metric (OpencensusProxyMetric)
        self._metrics = {}
        # -- The time this component reported its metrics last time --
        # It is used to figure out if this component is stale.
        self._last_reported_time = time.monotonic()

    @property
    def metrics(self) -> Dict[str, OpencensusProxyMetric]:
        """Return the metrics requested to proxy export from this component."""
        return self._metrics

    @property
    def last_reported_time(self):
        """Monotonic timestamp of creation or of the latest `record` call."""
        return self._last_reported_time

    def record(self, metrics: List[Metric]):
        """Parse the Opencensus protobuf and store metrics.

        Metrics can be accessed via `metrics` API for proxy export.
        Calling this also refreshes `last_reported_time`, even when
        `metrics` is empty.

        Args:
            metrics: A list of Opencensus protobuf for proxy export.
        """
        self._last_reported_time = time.monotonic()

        for metric in metrics:
            # gRPC metrics are patched in place before their name is read.
            fix_grpc_metric(metric)
            descriptor = metric.metric_descriptor
            name = descriptor.name

            proxy_metric = self._metrics.get(name)
            if proxy_metric is None:
                proxy_metric = OpencensusProxyMetric(
                    name,
                    descriptor.description,
                    descriptor.unit,
                    [label_key.key for label_key in descriptor.label_keys],
                )
                self._metrics[name] = proxy_metric
            proxy_metric.record(metric)
class OpenCensusProxyCollector:
    """Prometheus collector that re-exports OpenCensus metrics sent by components."""

    def __init__(self, namespace: str, component_timeout_s: int = 60):
        """Prometheus collector implementation for opencensus proxy export.

        Prometheus collector requires to implement `collect` which is
        invoked whenever Prometheus queries the endpoint.

        The class is thread-safe.

        Args:
            namespace: Prometheus namespace.
            component_timeout_s: Seconds without a report after which a
                component is considered stale.
        """
        # -- Prometheus namespace --
        self._namespace = namespace
        # -- Timeout until the component is marked as stale --
        # Once the component is considered as stale,
        # the metrics from that worker won't be exported.
        self._component_timeout_s = component_timeout_s
        # -- Protect `self._components` --
        self._components_lock = threading.Lock()
        # -- Component that requests to proxy export metrics --
        # Component means core worker, raylet, and GCS.
        # component_id -> Components
        # For workers, they contain worker ids.
        # For other components (raylet, GCS),
        # they contain the global key `GLOBAL_COMPONENT_KEY`.
        self._components = {}
        # Whether we want to export counter as gauge.
        # This is for bug compatibility.
        # See https://github.com/ray-project/ray/pull/43795.
        self._export_counter_as_gauge = env_bool("RAY_EXPORT_COUNTER_AS_GAUGE", True)

    def record(self, metrics: List[Metric], worker_id_hex: str = None):
        """Record the metrics reported from the component that reports it.

        Args:
            metrics: A list of opencensus protobuf to proxy export metrics.
            worker_id_hex: A worker id that reports these metrics.
                If None, it means they are reported from Raylet or GCS.
        """
        key = worker_id_hex if worker_id_hex else GLOBAL_COMPONENT_KEY
        with self._components_lock:
            component = self._components.get(key)
            if component is None:
                component = Component(key)
                self._components[key] = component
            component.record(metrics)

    def clean_stale_components(self):
        """Clean up stale components.

        Stale means the component is dead or unresponsive.
        Stale components won't be reported to Prometheus anymore.

        Returns:
            The list of components that were removed.
        """
        with self._components_lock:
            # Collect ids first; the dict must not change while iterated.
            expired_ids = []
            for component_id, component in self._components.items():
                elapsed = time.monotonic() - component.last_reported_time
                if elapsed <= self._component_timeout_s:
                    continue
                expired_ids.append(component_id)
                logger.info(
                    "Metrics from a worker ({}) is cleaned up due to "
                    "timeout. Time since last report {}s".format(
                        component_id, elapsed
                    )
                )
            return [
                self._components.pop(component_id) for component_id in expired_ids
            ]

    @staticmethod
    def _get_or_create_families(metrics_map, metric_name, family_factory):
        """Return the families stored under `metric_name`, creating the first one
        with `family_factory` when nothing is stored yet."""
        families = metrics_map.get(metric_name)
        if not families:
            families = [family_factory()]
            metrics_map[metric_name] = families
        return families

    # TODO(sang): add start and end timestamp
    def to_prometheus_metrics(
        self,
        metric_name: str,
        metric_description: str,
        label_keys: List[str],
        metric_units: str,
        label_values: Tuple[tag_value_module.TagValue],
        agg_data: Any,
        metrics_map: Dict[str, List[PrometheusMetric]],
    ) -> None:
        """to_metric translate the data that OpenCensus create
        to Prometheus format, using Prometheus Metric object.

        This method is from Opencensus Prometheus Exporter.
        The caller must hold `self._components_lock`.

        Args:
            metric_name: Name of the metric.
            metric_description: Description of the metric.
            label_keys: The fixed label keys of the metric.
            metric_units: Units of the metric.
            label_values: The values of `label_keys`.
            agg_data: `opencensus.stats.aggregation_data.AggregationData` object.
                Aggregated data that needs to be converted as Prometheus samples
            metrics_map: The converted metric is added to this map.

        Raises:
            ValueError: If `agg_data` is of an unsupported aggregation type.
        """
        assert self._components_lock.locked()
        metric_name = f"{self._namespace}_{metric_name}"
        assert len(label_values) == len(label_keys), (label_values, label_keys)
        # Prometheus requires that all tag values be strings hence
        # the need to cast none to the empty string before exporting. See
        # https://github.com/census-instrumentation/opencensus-python/issues/480
        label_values = [tv if tv else "" for tv in label_values]

        if isinstance(agg_data, CountAggregationData):
            families = self._get_or_create_families(
                metrics_map,
                metric_name,
                lambda: CounterMetricFamily(
                    name=metric_name,
                    documentation=metric_description,
                    unit=metric_units,
                    labels=label_keys,
                ),
            )
            families[0].add_metric(labels=label_values, value=agg_data.count_data)
            return

        if isinstance(agg_data, SumAggregationData):
            # This should be emitted as prometheus counter
            # but we used to emit it as prometheus gauge.
            # To keep the backward compatibility
            # (changing from counter to gauge changes the metric name
            # since prometheus client will add "_total" suffix to counter
            # per OpenMetrics specification),
            # we now emit both counter and gauge and in the
            # next major Ray release (3.0) we can stop emitting gauge.
            # This leaves people enough time to migrate their dashboards.
            # See https://github.com/ray-project/ray/pull/43795.
            families = self._get_or_create_families(
                metrics_map,
                metric_name,
                lambda: CounterMetricFamily(
                    name=metric_name,
                    documentation=metric_description,
                    labels=label_keys,
                ),
            )
            families[0].add_metric(labels=label_values, value=agg_data.sum_data)

            # When the name already ends with _total, we only need to emit the
            # prometheus counter: prometheus client won't change such a name,
            # so there is no backward compatibility issue.
            # See https://prometheus.github.io/client_python/instrumenting/counter/
            emit_legacy_gauge = (
                self._export_counter_as_gauge and not metric_name.endswith("_total")
            )
            if emit_legacy_gauge:
                if len(families) == 1:
                    families.append(
                        GaugeMetricFamily(
                            name=metric_name,
                            documentation=(
                                f"(DEPRECATED, use {metric_name}_total metric instead) "
                                f"{metric_description}"
                            ),
                            labels=label_keys,
                        )
                    )
                assert len(families) == 2
                families[1].add_metric(labels=label_values, value=agg_data.sum_data)
            return

        if isinstance(agg_data, DistributionAggregationData):
            assert agg_data.bounds == sorted(agg_data.bounds)
            # buckets are a list of buckets. Each bucket is another list with
            # a pair of bucket name and value, or a triple of bucket name,
            # value, and exemplar. buckets need to be in order.
            buckets = []
            running_total = 0  # Prometheus buckets expect cumulative count.
            for index, upper_bound in enumerate(agg_data.bounds):
                running_total += agg_data.counts_per_bucket[index]
                buckets.append([str(upper_bound), running_total])
            # Prometheus requires buckets to be sorted, and +Inf present.
            # In OpenCensus we don't have +Inf in the bucket bounds so need to
            # append it here.
            buckets.append(["+Inf", agg_data.count_data])

            families = self._get_or_create_families(
                metrics_map,
                metric_name,
                lambda: HistogramMetricFamily(
                    name=metric_name,
                    documentation=metric_description,
                    labels=label_keys,
                ),
            )
            families[0].add_metric(
                labels=label_values,
                buckets=buckets,
                sum_value=agg_data.sum,
            )
            return

        if isinstance(agg_data, LastValueAggregationData):
            families = self._get_or_create_families(
                metrics_map,
                metric_name,
                lambda: GaugeMetricFamily(
                    name=metric_name,
                    documentation=metric_description,
                    labels=label_keys,
                ),
            )
            families[0].add_metric(labels=label_values, value=agg_data.value)
            return

        raise ValueError(f"unsupported aggregation type {type(agg_data)}")

    def _aggregate_metric_data(
        self,
        datas: List[
            Union[LastValueAggregationData, CountAggregationData, SumAggregationData]
        ],
    ) -> Union[LastValueAggregationData, CountAggregationData, SumAggregationData]:
        """Sum a non-empty list of aggregation data into a single one.

        The type of the first element decides how the list is interpreted.

        Raises:
            ValueError: If the first element is of an unsupported type.
        """
        assert len(datas) > 0
        sample = datas[0]

        if isinstance(sample, LastValueAggregationData):
            total = sum(data.value for data in datas)
            return LastValueAggregationData(ValueDouble, total)
        if isinstance(sample, CountAggregationData):
            total = sum(data.count_data for data in datas)
            return CountAggregationData(total)
        if isinstance(sample, SumAggregationData):
            total = sum(data.sum_data for data in datas)
            return SumAggregationData(ValueDouble, total)

        raise ValueError(
            f"Unsupported aggregation type {type(sample)}. "
            "Supported types are "
            f"{CountAggregationData}, {LastValueAggregationData}, {SumAggregationData}."
            f"Got {datas}."
        )

    def _aggregate_with_recommended_cardinality(
        self,
        per_worker_metrics: List[OpencensusProxyMetric],
    ) -> List[OpencensusProxyMetric]:
        """Collect per-worker metrics, aggregate them into per-node metrics and convert
        them to Prometheus format.

        Args:
            per_worker_metrics: A list of per-worker metrics for the same metric name.

        Returns:
            A list of per-node metrics for the same metric name, with the high
            cardinality labels removed and the values aggregated.
        """
        first = next(iter(per_worker_metrics), None)
        if not first or WORKER_ID_TAG_KEY not in first.label_keys:
            # No high cardinality labels, return the original metrics.
            return per_worker_metrics

        worker_id_pos = first.label_keys.index(WORKER_ID_TAG_KEY)

        def without_worker_id(values):
            # Works for both the label key lists and the label value tuples.
            return values[:worker_id_pos] + values[worker_id_pos + 1 :]

        # map from the tuple of label values without worker_id to the list of per
        # worker task metrics
        grouped: Dict[
            Tuple,
            List[
                Union[
                    LastValueAggregationData,
                    CountAggregationData,
                    SumAggregationData,
                ]
            ],
        ] = defaultdict(list)
        # The descriptor fields of the aggregate come from the last metric visited.
        template = first
        for worker_metric in per_worker_metrics:
            template = worker_metric
            for label_values, data in worker_metric.data.items():
                grouped[without_worker_id(label_values)].append(data)

        aggregated_metric = OpencensusProxyMetric(
            name=template.name,
            desc=template.desc,
            unit=template.unit,
            label_keys=without_worker_id(template.label_keys),
        )
        for label_values, datas in grouped.items():
            aggregated_metric.add_data(
                label_values, self._aggregate_metric_data(datas)
            )
        return [aggregated_metric]

    def collect(self):  # pragma: NO COVER
        """Collect fetches the statistics from OpenCensus
        and delivers them as Prometheus Metrics.
        Collect is invoked every time a prometheus.Gatherer is run
        for example when the HTTP endpoint is invoked by Prometheus.

        This method is required as a Prometheus Collector.
        """
        with self._components_lock:
            # First construct the list of opencensus metrics to be converted to
            # prometheus metrics. For LEGACY cardinality level, this comprises all
            # metrics from all components. For RECOMMENDED cardinality level, we need
            # to remove the high cardinality labels and aggregate the component
            # metrics.
            open_census_metrics: List[OpencensusProxyMetric] = []
            # The metrics that need to be aggregated with recommended cardinality.
            # Key is the metric name and value is the list of per-worker metrics.
            to_lower_cardinality: Dict[str, List[OpencensusProxyMetric]] = defaultdict(
                list
            )
            cardinality_level = MetricCardinality.get_cardinality_level()
            for component in self._components.values():
                for metric in component.metrics.values():
                    # We reduce the cardinality for all metrics except for histogram
                    # metrics. The aggregation of histogram metrics from worker
                    # level to node level is not well defined. In addition, we
                    # currently have very few histogram metrics in Ray
                    # so the impact of them is negligible.
                    reduce_cardinality = (
                        cardinality_level == MetricCardinality.RECOMMENDED
                        and not metric.is_distribution_aggregation_data()
                    )
                    if reduce_cardinality:
                        to_lower_cardinality[metric.name].append(metric)
                    else:
                        open_census_metrics.append(metric)

            for per_worker_metrics in to_lower_cardinality.values():
                open_census_metrics.extend(
                    self._aggregate_with_recommended_cardinality(per_worker_metrics)
                )

            prometheus_metrics_map = {}
            for metric in open_census_metrics:
                for label_values, data in metric.data.items():
                    self.to_prometheus_metrics(
                        metric.name,
                        metric.desc,
                        metric.label_keys,
                        metric.unit,
                        label_values,
                        data,
                        prometheus_metrics_map,
                    )

        # The families are fully built at this point; hand them out one by one.
        for families in prometheus_metrics_map.values():
            yield from families
class MetricsAgent:
    """Records metrics of the current process and proxies metrics of other components."""

    def __init__(
        self,
        view_manager: ViewManager,
        stats_recorder: StatsRecorder,
        stats_exporter: StatsExporter = None,
    ):
        """A class to record and export metrics.

        The class exports metrics in 2 different ways.
        - Directly record and export metrics using OpenCensus.
        - Proxy metrics from other core components
            (e.g., raylet, GCS, core workers).

        This class is thread-safe.

        Args:
            view_manager: Manages the views used to export metrics.
            stats_recorder: Records metrics emitted from the current process.
            stats_exporter: Exports the metrics. If None, all metrics
                collection and export is disabled.
        """
        # Lock required because gRPC server uses
        # multiple threads to process requests.
        self._lock = threading.Lock()

        #
        # Opencensus components to record metrics.
        #

        # A class that's used to record metrics
        # emitted from the current process.
        self.stats_recorder = stats_recorder
        # A class to export metrics.
        self.stats_exporter = stats_exporter

        if stats_exporter is not None:
            # Managing views to export metrics
            self.view_manager = view_manager
            self.view_manager.register_exporter(stats_exporter)
            # -- A Prometheus custom collector to proxy export metrics --
            self.proxy_exporter_collector = OpenCensusProxyCollector(
                self.stats_exporter.options.namespace,
                component_timeout_s=int(os.getenv(RAY_WORKER_TIMEOUT_S, 120)),
            )
        else:
            # If the exporter is not given,
            # we disable metrics collection.
            self.view_manager = None
            self.proxy_exporter_collector = None

        # Registered view names.
        self._registered_views: Set[str] = set()

    def record_and_export(self, records: List[Record], global_tags=None):
        """Directly record and export stats from the same process.

        A record that fails to be recorded is logged and skipped. Nothing is
        done when metrics collection is disabled.

        Args:
            records: The records to export.
            global_tags: Tags applied to every record. They take precedence
                over the record's own tags on key collisions.
        """
        global_tags = global_tags or {}
        with self._lock:
            if not self.view_manager:
                return

            for record in records:
                gauge, value, tags = record.gauge, record.value, record.tags
                merged_tags = {**tags, **global_tags}
                try:
                    self._record_gauge(gauge, value, merged_tags)
                except Exception as e:
                    logger.error(
                        f"Failed to record metric {gauge.name} with value {value} with tags {tags!r} and global tags {global_tags!r} due to: {e!r}"
                    )

    def _record_gauge(self, gauge: Gauge, value: float, tags: dict):
        """Record `value` for `gauge` with `tags`, registering its view on first use.

        Raises:
            ValueError: If a tag key or tag value cannot be created.
        """
        if gauge.name not in self._registered_views:
            self.view_manager.register_view(gauge.view)
            self._registered_views.add(gauge.name)

        measurement_map = self.stats_recorder.new_measurement_map()
        tag_map = tag_map_module.TagMap()
        for key, tag_val in tags.items():
            try:
                tag_key = tag_key_module.TagKey(key)
            except ValueError as e:
                logger.error(
                    f"Failed to create tag key {key} for metric {gauge.name} due to: {e!r}"
                )
                raise
            try:
                tag_value = tag_value_module.TagValue(tag_val)
            except ValueError as e:
                logger.error(
                    f"Failed to create tag value {tag_val} for key {key} for metric {gauge.name} due to: {e!r}"
                )
                raise
            tag_map.insert(tag_key, tag_value)

        measurement_map.measure_float_put(gauge.measure, value)
        # NOTE: When we record this metric, timestamp will be renewed.
        measurement_map.record(tag_map)

    def proxy_export_metrics(self, metrics: List[Metric], worker_id_hex: str = None):
        """Proxy export metrics specified by a Opencensus Protobuf.

        This API is used to export metrics emitted from
        core components.

        Args:
            metrics: A list of protobuf Metric defined from OpenCensus.
            worker_id_hex: The worker ID it proxies metrics export. None
                if the metric is not from a worker (i.e., raylet, GCS).
        """
        with self._lock:
            if self.view_manager:
                self._proxy_export_metrics(metrics, worker_id_hex)

    def _proxy_export_metrics(self, metrics: List[Metric], worker_id_hex: str = None):
        """Hand the metrics over to the proxy collector."""
        self.proxy_exporter_collector.record(metrics, worker_id_hex)

    def clean_all_dead_worker_metrics(self):
        """Clean dead worker's metrics.

        Worker metrics are cleaned up and won't be exported once
        it is considered as dead.

        This method has to be periodically called by a caller.
        """
        with self._lock:
            if self.view_manager:
                self.proxy_exporter_collector.clean_stale_components()
class PrometheusServiceDiscoveryWriter(threading.Thread):
    """A class to support Prometheus service discovery.

    It supports file-based service discovery. Checkout
    https://prometheus.io/docs/guides/file-sd/ for more details.

    Args:
        gcs_address: Gcs address for this cluster.
        temp_dir: Temporary directory used by
            Ray to store logs and metadata.
    """

    def __init__(self, gcs_address, temp_dir):
        gcs_client_options = ray._raylet.GcsClientOptions.create(
            gcs_address,
            None,
            allow_cluster_id_nil=True,
            fetch_cluster_id_if_nil=False,
        )
        self.gcs_address = gcs_address
        ray._private.state.state._initialize_global_state(gcs_client_options)
        self.temp_dir = temp_dir
        # Seconds to sleep between two writes of the discovery file.
        self.default_service_discovery_flush_period = 5

        # The last service discovery content that PrometheusServiceDiscoveryWriter
        # has seen
        self.latest_service_discovery_content = []
        self._content_lock = threading.RLock()

        super().__init__()

    def get_latest_service_discovery_content(self):
        """Return the latest stored service discovery content."""
        with self._content_lock:
            return self.latest_service_discovery_content

    def get_file_discovery_content(self):
        """Return the content for Prometheus service discovery.

        The content is also stored as the latest service discovery content.

        Returns:
            The JSON-encoded service discovery content.
        """
        targets = [
            build_address(node["NodeManagerAddress"], node["MetricsExportPort"])
            for node in ray.nodes()
            if node["alive"] is True
        ]

        # Extra targets that are published through the GCS internal KV.
        gcs_client = GcsClient(address=self.gcs_address)
        for kv_key in (b"AutoscalerMetricsAddress", b"DashboardMetricsAddress"):
            address = gcs_client.internal_kv_get(kv_key, None)
            if address:
                targets.append(address.decode("utf-8"))

        content = [{"labels": {"job": "ray"}, "targets": targets}]
        with self._content_lock:
            self.latest_service_discovery_content = content
        return json.dumps(content)

    def write(self):
        """Write the service discovery file into `temp_dir`."""
        # Write a file based on https://prometheus.io/docs/guides/file-sd/
        # Write should be atomic. Otherwise, Prometheus raises an error that
        # json file format is invalid because it reads a file when
        # file is re-written. Note that Prometheus still works although we
        # have this error.
        staging_path = self.get_temp_file_name()
        with open(staging_path, "w") as json_file:
            json_file.write(self.get_file_discovery_content())
        # NOTE: os.replace is atomic on both Linux and Windows, so we won't
        # have race condition reading this file.
        os.replace(staging_path, self.get_target_file_name())

    def get_target_file_name(self):
        """Path of the service discovery file."""
        file_name = ray._private.ray_constants.PROMETHEUS_SERVICE_DISCOVERY_FILE
        return os.path.join(self.temp_dir, file_name)

    def get_temp_file_name(self):
        """Path of the file that is written first and then moved into place."""
        file_name = "{}_{}".format(
            "tmp", ray._private.ray_constants.PROMETHEUS_SERVICE_DISCOVERY_FILE
        )
        return os.path.join(self.temp_dir, file_name)

    def run(self):
        """Rewrite the discovery file forever, sleeping between attempts."""
        while True:
            # This thread won't be broken by exceptions.
            try:
                self.write()
            except Exception as e:
                logger.warning(
                    "Writing a service discovery file, {},"
                    "failed.".format(self.get_target_file_name())
                )
                logger.warning(traceback.format_exc())
                logger.warning(f"Error message: {e}")
            time.sleep(self.default_service_discovery_flush_period)
|