# metrics_agent.py
import json
import logging
import os
import re
import threading
import time
import traceback
from collections import defaultdict, namedtuple
from typing import Any, Dict, List, Set, Tuple, Union

from opencensus.metrics.export.metric_descriptor import MetricDescriptorType
from opencensus.metrics.export.value import ValueDouble
from opencensus.stats import aggregation, measure as measure_module
from opencensus.stats.aggregation_data import (
    CountAggregationData,
    DistributionAggregationData,
    LastValueAggregationData,
    SumAggregationData,
)
from opencensus.stats.base_exporter import StatsExporter
from opencensus.stats.stats_recorder import StatsRecorder
from opencensus.stats.view import View
from opencensus.stats.view_manager import ViewManager
from opencensus.tags import (
    tag_key as tag_key_module,
    tag_map as tag_map_module,
    tag_value as tag_value_module,
)
from prometheus_client.core import (
    CounterMetricFamily,
    GaugeMetricFamily,
    HistogramMetricFamily,
    Metric as PrometheusMetric,
)

import ray
from ray._common.network_utils import build_address
from ray._private.ray_constants import env_bool
from ray._private.telemetry.metric_cardinality import (
    WORKER_ID_TAG_KEY,
    MetricCardinality,
)
from ray._raylet import GcsClient
from ray.core.generated.metrics_pb2 import Metric
from ray.util.metrics import _is_invalid_metric_name

  44. logger = logging.getLogger(__name__)
  45. # Env var key to decide worker timeout.
  46. # If the worker doesn't report for more than
  47. # this time, we treat workers as dead.
  48. RAY_WORKER_TIMEOUT_S = "RAY_WORKER_TIMEOUT_S"
  49. GLOBAL_COMPONENT_KEY = "CORE"
  50. RE_NON_ALPHANUMS = re.compile(r"[^a-zA-Z0-9]")
  51. class Gauge(View):
  52. """Gauge representation of opencensus view.
  53. This class is used to collect process metrics from the reporter agent.
  54. Cpp metrics should be collected in a different way.
  55. """
  56. def __init__(self, name, description, unit, tags: List[str]):
  57. if _is_invalid_metric_name(name):
  58. raise ValueError(
  59. f"Invalid metric name: {name}. Metric will be discarded "
  60. "and data will not be collected or published. "
  61. "Metric names can only contain letters, numbers, _, and :. "
  62. "Metric names cannot start with numbers."
  63. )
  64. self._measure = measure_module.MeasureInt(name, description, unit)
  65. self._description = description
  66. tags = [tag_key_module.TagKey(tag) for tag in tags]
  67. self._view = View(
  68. name, description, tags, self.measure, aggregation.LastValueAggregation()
  69. )
  70. @property
  71. def measure(self):
  72. return self._measure
  73. @property
  74. def view(self):
  75. return self._view
  76. @property
  77. def name(self):
  78. return self.measure.name
  79. @property
  80. def description(self):
  81. return self._description
  82. Record = namedtuple("Record", ["gauge", "value", "tags"])
  83. def fix_grpc_metric(metric: Metric):
  84. """
  85. Fix the inbound `opencensus.proto.metrics.v1.Metric` protos to make it acceptable
  86. by opencensus.stats.DistributionAggregationData.
  87. - metric name: gRPC OpenCensus metrics have names with slashes and dots, e.g.
  88. `grpc.io/client/server_latency`[1]. However Prometheus metric names only take
  89. alphanums,underscores and colons[2]. We santinize the name by replacing non-alphanum
  90. chars to underscore, like the official opencensus prometheus exporter[3].
  91. - distribution bucket bounds: The Metric proto asks distribution bucket bounds to
  92. be > 0 [4]. However, gRPC OpenCensus metrics have their first bucket bound == 0 [1].
  93. This makes the `DistributionAggregationData` constructor to raise Exceptions. This
  94. applies to all bytes and milliseconds (latencies). The fix: we update the initial 0
  95. bounds to be 0.000_000_1. This will not affect the precision of the metrics, since
  96. we don't expect any less-than-1 bytes, or less-than-1-nanosecond times.
  97. [1] https://github.com/census-instrumentation/opencensus-specs/blob/master/stats/gRPC.md#units # noqa: E501
  98. [2] https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
  99. [3] https://github.com/census-instrumentation/opencensus-cpp/blob/50eb5de762e5f87e206c011a4f930adb1a1775b1/opencensus/exporters/stats/prometheus/internal/prometheus_utils.cc#L39 # noqa: E501
  100. [4] https://github.com/census-instrumentation/opencensus-proto/blob/master/src/opencensus/proto/metrics/v1/metrics.proto#L218 # noqa: E501
  101. """
  102. if not metric.metric_descriptor.name.startswith("grpc.io/"):
  103. return
  104. metric.metric_descriptor.name = RE_NON_ALPHANUMS.sub(
  105. "_", metric.metric_descriptor.name
  106. )
  107. for series in metric.timeseries:
  108. for point in series.points:
  109. if point.HasField("distribution_value"):
  110. dist_value = point.distribution_value
  111. bucket_bounds = dist_value.bucket_options.explicit.bounds
  112. if len(bucket_bounds) > 0 and bucket_bounds[0] == 0:
  113. bucket_bounds[0] = 0.000_000_1
  114. class OpencensusProxyMetric:
  115. def __init__(self, name: str, desc: str, unit: str, label_keys: List[str]):
  116. """Represents the OpenCensus metrics that will be proxy exported."""
  117. self._name = name
  118. self._desc = desc
  119. self._unit = unit
  120. # -- The label keys of the metric --
  121. self._label_keys = label_keys
  122. # -- The data that needs to be proxy exported --
  123. # tuple of label values -> data (OpenCesnsus Aggregation data)
  124. self._data = {}
  125. @property
  126. def name(self):
  127. return self._name
  128. @property
  129. def desc(self):
  130. return self._desc
  131. @property
  132. def unit(self):
  133. return self._unit
  134. @property
  135. def label_keys(self):
  136. return self._label_keys
  137. @property
  138. def data(self):
  139. return self._data
  140. def is_distribution_aggregation_data(self):
  141. """Check if the metric is a distribution aggreation metric."""
  142. return len(self._data) > 0 and isinstance(
  143. next(iter(self._data.values())), DistributionAggregationData
  144. )
  145. def add_data(self, label_values: Tuple, data: Any):
  146. """Add the data to the metric.
  147. Args:
  148. label_values: The label values of the metric.
  149. data: The data to be added.
  150. """
  151. self._data[label_values] = data
  152. def record(self, metric: Metric):
  153. """Parse the Opencensus Protobuf and store the data.
  154. The data can be accessed via `data` API once recorded.
  155. """
  156. timeseries = metric.timeseries
  157. if len(timeseries) == 0:
  158. return
  159. # Create the aggregation and fill it in the our stats
  160. for series in timeseries:
  161. labels = tuple(val.value for val in series.label_values)
  162. # Aggregate points.
  163. for point in series.points:
  164. if (
  165. metric.metric_descriptor.type
  166. == MetricDescriptorType.CUMULATIVE_INT64
  167. ):
  168. data = CountAggregationData(point.int64_value)
  169. elif (
  170. metric.metric_descriptor.type
  171. == MetricDescriptorType.CUMULATIVE_DOUBLE
  172. ):
  173. data = SumAggregationData(ValueDouble, point.double_value)
  174. elif metric.metric_descriptor.type == MetricDescriptorType.GAUGE_DOUBLE:
  175. data = LastValueAggregationData(ValueDouble, point.double_value)
  176. elif (
  177. metric.metric_descriptor.type
  178. == MetricDescriptorType.CUMULATIVE_DISTRIBUTION
  179. ):
  180. dist_value = point.distribution_value
  181. counts_per_bucket = [bucket.count for bucket in dist_value.buckets]
  182. bucket_bounds = dist_value.bucket_options.explicit.bounds
  183. data = DistributionAggregationData(
  184. dist_value.sum / dist_value.count,
  185. dist_value.count,
  186. dist_value.sum_of_squared_deviation,
  187. counts_per_bucket,
  188. bucket_bounds,
  189. )
  190. else:
  191. raise ValueError("Summary is not supported")
  192. self._data[labels] = data
  193. class Component:
  194. def __init__(self, id: str):
  195. """Represent a component that requests to proxy export metrics
  196. Args:
  197. id: Id of this component.
  198. """
  199. self.id = id
  200. # -- The time this component reported its metrics last time --
  201. # It is used to figure out if this component is stale.
  202. self._last_reported_time = time.monotonic()
  203. # -- Metrics requested to proxy export from this component --
  204. # metrics_name (str) -> metric (OpencensusProxyMetric)
  205. self._metrics = {}
  206. @property
  207. def metrics(self) -> Dict[str, OpencensusProxyMetric]:
  208. """Return the metrics requested to proxy export from this component."""
  209. return self._metrics
  210. @property
  211. def last_reported_time(self):
  212. return self._last_reported_time
  213. def record(self, metrics: List[Metric]):
  214. """Parse the Opencensus protobuf and store metrics.
  215. Metrics can be accessed via `metrics` API for proxy export.
  216. Args:
  217. metrics: A list of Opencensus protobuf for proxy export.
  218. """
  219. self._last_reported_time = time.monotonic()
  220. for metric in metrics:
  221. fix_grpc_metric(metric)
  222. descriptor = metric.metric_descriptor
  223. name = descriptor.name
  224. label_keys = [label_key.key for label_key in descriptor.label_keys]
  225. if name not in self._metrics:
  226. self._metrics[name] = OpencensusProxyMetric(
  227. name, descriptor.description, descriptor.unit, label_keys
  228. )
  229. self._metrics[name].record(metric)
  230. class OpenCensusProxyCollector:
  231. def __init__(self, namespace: str, component_timeout_s: int = 60):
  232. """Prometheus collector implementation for opencensus proxy export.
  233. Prometheus collector requires to implement `collect` which is
  234. invoked whenever Prometheus queries the endpoint.
  235. The class is thread-safe.
  236. Args:
  237. namespace: Prometheus namespace.
  238. """
  239. # -- Protect `self._components` --
  240. self._components_lock = threading.Lock()
  241. # -- Timeout until the component is marked as stale --
  242. # Once the component is considered as stale,
  243. # the metrics from that worker won't be exported.
  244. self._component_timeout_s = component_timeout_s
  245. # -- Prometheus namespace --
  246. self._namespace = namespace
  247. # -- Component that requests to proxy export metrics --
  248. # Component means core worker, raylet, and GCS.
  249. # component_id -> Components
  250. # For workers, they contain worker ids.
  251. # For other components (raylet, GCS),
  252. # they contain the global key `GLOBAL_COMPONENT_KEY`.
  253. self._components = {}
  254. # Whether we want to export counter as gauge.
  255. # This is for bug compatibility.
  256. # See https://github.com/ray-project/ray/pull/43795.
  257. self._export_counter_as_gauge = env_bool("RAY_EXPORT_COUNTER_AS_GAUGE", True)
  258. def record(self, metrics: List[Metric], worker_id_hex: str = None):
  259. """Record the metrics reported from the component that reports it.
  260. Args:
  261. metrics: A list of opencensus protobuf to proxy export metrics.
  262. worker_id_hex: A worker id that reports these metrics.
  263. If None, it means they are reported from Raylet or GCS.
  264. """
  265. key = GLOBAL_COMPONENT_KEY if not worker_id_hex else worker_id_hex
  266. with self._components_lock:
  267. if key not in self._components:
  268. self._components[key] = Component(key)
  269. self._components[key].record(metrics)
  270. def clean_stale_components(self):
  271. """Clean up stale components.
  272. Stale means the component is dead or unresponsive.
  273. Stale components won't be reported to Prometheus anymore.
  274. """
  275. with self._components_lock:
  276. stale_components = []
  277. stale_component_ids = []
  278. for id, component in self._components.items():
  279. elapsed = time.monotonic() - component.last_reported_time
  280. if elapsed > self._component_timeout_s:
  281. stale_component_ids.append(id)
  282. logger.info(
  283. "Metrics from a worker ({}) is cleaned up due to "
  284. "timeout. Time since last report {}s".format(id, elapsed)
  285. )
  286. for id in stale_component_ids:
  287. stale_components.append(self._components.pop(id))
  288. return stale_components
  289. # TODO(sang): add start and end timestamp
  290. def to_prometheus_metrics(
  291. self,
  292. metric_name: str,
  293. metric_description: str,
  294. label_keys: List[str],
  295. metric_units: str,
  296. label_values: Tuple[tag_value_module.TagValue],
  297. agg_data: Any,
  298. metrics_map: Dict[str, List[PrometheusMetric]],
  299. ) -> None:
  300. """to_metric translate the data that OpenCensus create
  301. to Prometheus format, using Prometheus Metric object.
  302. This method is from Opencensus Prometheus Exporter.
  303. Args:
  304. metric_name: Name of the metric.
  305. metric_description: Description of the metric.
  306. label_keys: The fixed label keys of the metric.
  307. metric_units: Units of the metric.
  308. label_values: The values of `label_keys`.
  309. agg_data: `opencensus.stats.aggregation_data.AggregationData` object.
  310. Aggregated data that needs to be converted as Prometheus samples
  311. metrics_map: The converted metric is added to this map.
  312. """
  313. assert self._components_lock.locked()
  314. metric_name = f"{self._namespace}_{metric_name}"
  315. assert len(label_values) == len(label_keys), (label_values, label_keys)
  316. # Prometheus requires that all tag values be strings hence
  317. # the need to cast none to the empty string before exporting. See
  318. # https://github.com/census-instrumentation/opencensus-python/issues/480
  319. label_values = [tv if tv else "" for tv in label_values]
  320. if isinstance(agg_data, CountAggregationData):
  321. metrics = metrics_map.get(metric_name)
  322. if not metrics:
  323. metric = CounterMetricFamily(
  324. name=metric_name,
  325. documentation=metric_description,
  326. unit=metric_units,
  327. labels=label_keys,
  328. )
  329. metrics = [metric]
  330. metrics_map[metric_name] = metrics
  331. metrics[0].add_metric(labels=label_values, value=agg_data.count_data)
  332. return
  333. if isinstance(agg_data, SumAggregationData):
  334. # This should be emitted as prometheus counter
  335. # but we used to emit it as prometheus gauge.
  336. # To keep the backward compatibility
  337. # (changing from counter to gauge changes the metric name
  338. # since prometheus client will add "_total" suffix to counter
  339. # per OpenMetrics specification),
  340. # we now emit both counter and gauge and in the
  341. # next major Ray release (3.0) we can stop emitting gauge.
  342. # This leaves people enough time to migrate their dashboards.
  343. # See https://github.com/ray-project/ray/pull/43795.
  344. metrics = metrics_map.get(metric_name)
  345. if not metrics:
  346. metric = CounterMetricFamily(
  347. name=metric_name,
  348. documentation=metric_description,
  349. labels=label_keys,
  350. )
  351. metrics = [metric]
  352. metrics_map[metric_name] = metrics
  353. metrics[0].add_metric(labels=label_values, value=agg_data.sum_data)
  354. if not self._export_counter_as_gauge:
  355. pass
  356. elif metric_name.endswith("_total"):
  357. # In this case, we only need to emit prometheus counter
  358. # since for metric name already ends with _total suffix
  359. # prometheus client won't change it
  360. # so there is no backward compatibility issue.
  361. # See https://prometheus.github.io/client_python/instrumenting/counter/
  362. pass
  363. else:
  364. if len(metrics) == 1:
  365. metric = GaugeMetricFamily(
  366. name=metric_name,
  367. documentation=(
  368. f"(DEPRECATED, use {metric_name}_total metric instead) "
  369. f"{metric_description}"
  370. ),
  371. labels=label_keys,
  372. )
  373. metrics.append(metric)
  374. assert len(metrics) == 2
  375. metrics[1].add_metric(labels=label_values, value=agg_data.sum_data)
  376. return
  377. elif isinstance(agg_data, DistributionAggregationData):
  378. assert agg_data.bounds == sorted(agg_data.bounds)
  379. # buckets are a list of buckets. Each bucket is another list with
  380. # a pair of bucket name and value, or a triple of bucket name,
  381. # value, and exemplar. buckets need to be in order.
  382. buckets = []
  383. cum_count = 0 # Prometheus buckets expect cumulative count.
  384. for ii, bound in enumerate(agg_data.bounds):
  385. cum_count += agg_data.counts_per_bucket[ii]
  386. bucket = [str(bound), cum_count]
  387. buckets.append(bucket)
  388. # Prometheus requires buckets to be sorted, and +Inf present.
  389. # In OpenCensus we don't have +Inf in the bucket bonds so need to
  390. # append it here.
  391. buckets.append(["+Inf", agg_data.count_data])
  392. metrics = metrics_map.get(metric_name)
  393. if not metrics:
  394. metric = HistogramMetricFamily(
  395. name=metric_name,
  396. documentation=metric_description,
  397. labels=label_keys,
  398. )
  399. metrics = [metric]
  400. metrics_map[metric_name] = metrics
  401. metrics[0].add_metric(
  402. labels=label_values,
  403. buckets=buckets,
  404. sum_value=agg_data.sum,
  405. )
  406. return
  407. elif isinstance(agg_data, LastValueAggregationData):
  408. metrics = metrics_map.get(metric_name)
  409. if not metrics:
  410. metric = GaugeMetricFamily(
  411. name=metric_name,
  412. documentation=metric_description,
  413. labels=label_keys,
  414. )
  415. metrics = [metric]
  416. metrics_map[metric_name] = metrics
  417. metrics[0].add_metric(labels=label_values, value=agg_data.value)
  418. return
  419. else:
  420. raise ValueError(f"unsupported aggregation type {type(agg_data)}")
  421. def _aggregate_metric_data(
  422. self,
  423. datas: List[
  424. Union[LastValueAggregationData, CountAggregationData, SumAggregationData]
  425. ],
  426. ) -> Union[LastValueAggregationData, CountAggregationData, SumAggregationData]:
  427. assert len(datas) > 0
  428. sample = datas[0]
  429. if isinstance(sample, LastValueAggregationData):
  430. return LastValueAggregationData(
  431. ValueDouble, sum([data.value for data in datas])
  432. )
  433. if isinstance(sample, CountAggregationData):
  434. return CountAggregationData(sum([data.count_data for data in datas]))
  435. if isinstance(sample, SumAggregationData):
  436. return SumAggregationData(
  437. ValueDouble, sum([data.sum_data for data in datas])
  438. )
  439. raise ValueError(
  440. f"Unsupported aggregation type {type(sample)}. "
  441. "Supported types are "
  442. f"{CountAggregationData}, {LastValueAggregationData}, {SumAggregationData}."
  443. f"Got {datas}."
  444. )
  445. def _aggregate_with_recommended_cardinality(
  446. self,
  447. per_worker_metrics: List[OpencensusProxyMetric],
  448. ) -> List[OpencensusProxyMetric]:
  449. """Collect per-worker metrics, aggregate them into per-node metrics and convert
  450. them to Prometheus format.
  451. Args:
  452. per_worker_metrics: A list of per-worker metrics for the same metric name.
  453. Returns:
  454. A list of per-node metrics for the same metric name, with the high
  455. cardinality labels removed and the values aggregated.
  456. """
  457. metric = next(iter(per_worker_metrics), None)
  458. if not metric or WORKER_ID_TAG_KEY not in metric.label_keys:
  459. # No high cardinality labels, return the original metrics.
  460. return per_worker_metrics
  461. worker_id_label_index = metric.label_keys.index(WORKER_ID_TAG_KEY)
  462. # map from the tuple of label values without worker_id to the list of per worker
  463. # task metrics
  464. label_value_to_data: Dict[
  465. Tuple,
  466. List[
  467. Union[
  468. LastValueAggregationData,
  469. CountAggregationData,
  470. SumAggregationData,
  471. ]
  472. ],
  473. ] = defaultdict(list)
  474. for metric in per_worker_metrics:
  475. for label_values, data in metric.data.items():
  476. # remove the worker_id from the label values
  477. label_value_to_data[
  478. label_values[:worker_id_label_index]
  479. + label_values[worker_id_label_index + 1 :]
  480. ].append(data)
  481. aggregated_metric = OpencensusProxyMetric(
  482. name=metric.name,
  483. desc=metric.desc,
  484. unit=metric.unit,
  485. # remove the worker_id from the label keys
  486. label_keys=metric.label_keys[:worker_id_label_index]
  487. + metric.label_keys[worker_id_label_index + 1 :],
  488. )
  489. for label_values, datas in label_value_to_data.items():
  490. aggregated_metric.add_data(
  491. label_values,
  492. self._aggregate_metric_data(datas),
  493. )
  494. return [aggregated_metric]
  495. def collect(self): # pragma: NO COVER
  496. """Collect fetches the statistics from OpenCensus
  497. and delivers them as Prometheus Metrics.
  498. Collect is invoked every time a prometheus.Gatherer is run
  499. for example when the HTTP endpoint is invoked by Prometheus.
  500. This method is required as a Prometheus Collector.
  501. """
  502. with self._components_lock:
  503. # First construct the list of opencensus metrics to be converted to
  504. # prometheus metrics. For LEGACY cardinality level, this comprises all
  505. # metrics from all components. For RECOMMENDED cardinality level, we need
  506. # to remove the high cardinality labels and aggreate the component metrics.
  507. open_cencus_metrics: List[OpencensusProxyMetric] = []
  508. # The metrics that need to be aggregated with recommended cardinality. Key
  509. # is the metric name and value is the list of per-worker metrics.
  510. to_lower_cardinality: Dict[str, List[OpencensusProxyMetric]] = defaultdict(
  511. list
  512. )
  513. cardinality_level = MetricCardinality.get_cardinality_level()
  514. for component in self._components.values():
  515. for metric in component.metrics.values():
  516. if (
  517. cardinality_level == MetricCardinality.RECOMMENDED
  518. and not metric.is_distribution_aggregation_data()
  519. ):
  520. # We reduce the cardinality for all metrics except for histogram
  521. # metrics. The aggregation of histogram metrics from worker
  522. # level to node level is not well defined. In addition, we
  523. # currently have very few histogram metrics in Ray
  524. # so the impact of them is negligible.
  525. to_lower_cardinality[metric.name].append(metric)
  526. else:
  527. open_cencus_metrics.append(metric)
  528. for per_worker_metrics in to_lower_cardinality.values():
  529. open_cencus_metrics.extend(
  530. self._aggregate_with_recommended_cardinality(
  531. per_worker_metrics,
  532. )
  533. )
  534. prometheus_metrics_map = {}
  535. for metric in open_cencus_metrics:
  536. for label_values, data in metric.data.items():
  537. self.to_prometheus_metrics(
  538. metric.name,
  539. metric.desc,
  540. metric.label_keys,
  541. metric.unit,
  542. label_values,
  543. data,
  544. prometheus_metrics_map,
  545. )
  546. for metrics in prometheus_metrics_map.values():
  547. for metric in metrics:
  548. yield metric
  549. class MetricsAgent:
  550. def __init__(
  551. self,
  552. view_manager: ViewManager,
  553. stats_recorder: StatsRecorder,
  554. stats_exporter: StatsExporter = None,
  555. ):
  556. """A class to record and export metrics.
  557. The class exports metrics in 2 different ways.
  558. - Directly record and export metrics using OpenCensus.
  559. - Proxy metrics from other core components
  560. (e.g., raylet, GCS, core workers).
  561. This class is thread-safe.
  562. """
  563. # Lock required because gRPC server uses
  564. # multiple threads to process requests.
  565. self._lock = threading.Lock()
  566. #
  567. # Opencensus components to record metrics.
  568. #
  569. # Managing views to export metrics
  570. # If the stats_exporter is None, we disable all metrics export.
  571. self.view_manager = view_manager
  572. # A class that's used to record metrics
  573. # emitted from the current process.
  574. self.stats_recorder = stats_recorder
  575. # A class to export metrics.
  576. self.stats_exporter = stats_exporter
  577. # -- A Prometheus custom collector to proxy export metrics --
  578. # `None` if the prometheus server is not started.
  579. self.proxy_exporter_collector = None
  580. if self.stats_exporter is None:
  581. # If the exporter is not given,
  582. # we disable metrics collection.
  583. self.view_manager = None
  584. else:
  585. self.view_manager.register_exporter(stats_exporter)
  586. self.proxy_exporter_collector = OpenCensusProxyCollector(
  587. self.stats_exporter.options.namespace,
  588. component_timeout_s=int(os.getenv(RAY_WORKER_TIMEOUT_S, 120)),
  589. )
  590. # Registered view names.
  591. self._registered_views: Set[str] = set()
  592. def record_and_export(self, records: List[Record], global_tags=None):
  593. """Directly record and export stats from the same process."""
  594. global_tags = global_tags or {}
  595. with self._lock:
  596. if not self.view_manager:
  597. return
  598. for record in records:
  599. gauge = record.gauge
  600. value = record.value
  601. tags = record.tags
  602. try:
  603. self._record_gauge(gauge, value, {**tags, **global_tags})
  604. except Exception as e:
  605. logger.error(
  606. f"Failed to record metric {gauge.name} with value {value} with tags {tags!r} and global tags {global_tags!r} due to: {e!r}"
  607. )
  608. def _record_gauge(self, gauge: Gauge, value: float, tags: dict):
  609. if gauge.name not in self._registered_views:
  610. self.view_manager.register_view(gauge.view)
  611. self._registered_views.add(gauge.name)
  612. measurement_map = self.stats_recorder.new_measurement_map()
  613. tag_map = tag_map_module.TagMap()
  614. for key, tag_val in tags.items():
  615. try:
  616. tag_key = tag_key_module.TagKey(key)
  617. except ValueError as e:
  618. logger.error(
  619. f"Failed to create tag key {key} for metric {gauge.name} due to: {e!r}"
  620. )
  621. raise e
  622. try:
  623. tag_value = tag_value_module.TagValue(tag_val)
  624. except ValueError as e:
  625. logger.error(
  626. f"Failed to create tag value {tag_val} for key {key} for metric {gauge.name} due to: {e!r}"
  627. )
  628. raise e
  629. tag_map.insert(tag_key, tag_value)
  630. measurement_map.measure_float_put(gauge.measure, value)
  631. # NOTE: When we record this metric, timestamp will be renewed.
  632. measurement_map.record(tag_map)
  633. def proxy_export_metrics(self, metrics: List[Metric], worker_id_hex: str = None):
  634. """Proxy export metrics specified by a Opencensus Protobuf.
  635. This API is used to export metrics emitted from
  636. core components.
  637. Args:
  638. metrics: A list of protobuf Metric defined from OpenCensus.
  639. worker_id_hex: The worker ID it proxies metrics export. None
  640. if the metric is not from a worker (i.e., raylet, GCS).
  641. """
  642. with self._lock:
  643. if not self.view_manager:
  644. return
  645. self._proxy_export_metrics(metrics, worker_id_hex)
  646. def _proxy_export_metrics(self, metrics: List[Metric], worker_id_hex: str = None):
  647. self.proxy_exporter_collector.record(metrics, worker_id_hex)
  648. def clean_all_dead_worker_metrics(self):
  649. """Clean dead worker's metrics.
  650. Worker metrics are cleaned up and won't be exported once
  651. it is considered as dead.
  652. This method has to be periodically called by a caller.
  653. """
  654. with self._lock:
  655. if not self.view_manager:
  656. return
  657. self.proxy_exporter_collector.clean_stale_components()
  658. class PrometheusServiceDiscoveryWriter(threading.Thread):
  659. """A class to support Prometheus service discovery.
  660. It supports file-based service discovery. Checkout
  661. https://prometheus.io/docs/guides/file-sd/ for more details.
  662. Args:
  663. gcs_address: Gcs address for this cluster.
  664. temp_dir: Temporary directory used by
  665. Ray to store logs and metadata.
  666. """
  667. def __init__(self, gcs_address, temp_dir):
  668. gcs_client_options = ray._raylet.GcsClientOptions.create(
  669. gcs_address, None, allow_cluster_id_nil=True, fetch_cluster_id_if_nil=False
  670. )
  671. self.gcs_address = gcs_address
  672. ray._private.state.state._initialize_global_state(gcs_client_options)
  673. self.temp_dir = temp_dir
  674. self.default_service_discovery_flush_period = 5
  675. # The last service discovery content that PrometheusServiceDiscoveryWriter has seen
  676. self.latest_service_discovery_content = []
  677. self._content_lock = threading.RLock()
  678. super().__init__()
  679. def get_latest_service_discovery_content(self):
  680. """Return the latest stored service discovery content."""
  681. with self._content_lock:
  682. return self.latest_service_discovery_content
  683. def get_file_discovery_content(self):
  684. """Return the content for Prometheus service discovery."""
  685. nodes = ray.nodes()
  686. metrics_export_addresses = [
  687. build_address(node["NodeManagerAddress"], node["MetricsExportPort"])
  688. for node in nodes
  689. if node["alive"] is True
  690. ]
  691. gcs_client = GcsClient(address=self.gcs_address)
  692. autoscaler_addr = gcs_client.internal_kv_get(b"AutoscalerMetricsAddress", None)
  693. if autoscaler_addr:
  694. metrics_export_addresses.append(autoscaler_addr.decode("utf-8"))
  695. dashboard_addr = gcs_client.internal_kv_get(b"DashboardMetricsAddress", None)
  696. if dashboard_addr:
  697. metrics_export_addresses.append(dashboard_addr.decode("utf-8"))
  698. content = [{"labels": {"job": "ray"}, "targets": metrics_export_addresses}]
  699. with self._content_lock:
  700. self.latest_service_discovery_content = content
  701. return json.dumps(content)
  702. def write(self):
  703. # Write a file based on https://prometheus.io/docs/guides/file-sd/
  704. # Write should be atomic. Otherwise, Prometheus raises an error that
  705. # json file format is invalid because it reads a file when
  706. # file is re-written. Note that Prometheus still works although we
  707. # have this error.
  708. temp_file_name = self.get_temp_file_name()
  709. with open(temp_file_name, "w") as json_file:
  710. json_file.write(self.get_file_discovery_content())
  711. # NOTE: os.replace is atomic on both Linux and Windows, so we won't
  712. # have race condition reading this file.
  713. os.replace(temp_file_name, self.get_target_file_name())
  714. def get_target_file_name(self):
  715. return os.path.join(
  716. self.temp_dir, ray._private.ray_constants.PROMETHEUS_SERVICE_DISCOVERY_FILE
  717. )
  718. def get_temp_file_name(self):
  719. return os.path.join(
  720. self.temp_dir,
  721. "{}_{}".format(
  722. "tmp", ray._private.ray_constants.PROMETHEUS_SERVICE_DISCOVERY_FILE
  723. ),
  724. )
  725. def run(self):
  726. while True:
  727. # This thread won't be broken by exceptions.
  728. try:
  729. self.write()
  730. except Exception as e:
  731. logger.warning(
  732. "Writing a service discovery file, {},"
  733. "failed.".format(self.get_target_file_name())
  734. )
  735. logger.warning(traceback.format_exc())
  736. logger.warning(f"Error message: {e}")
  737. time.sleep(self.default_service_discovery_flush_period)