| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- import copy
- from threading import Lock
- from typing import Dict, Iterable, List, Optional, Protocol
- from .metrics_core import Metric
- class Collector(Protocol):
- def collect(self) -> Iterable[Metric]:
- """Collect metrics."""
- class _EmptyCollector:
- def collect(self) -> Iterable[Metric]:
- return []
- class CollectorRegistry:
- """Metric collector registry.
- Collectors must have a no-argument method 'collect' that returns a list of
- Metric objects. The returned metrics should be consistent with the Prometheus
- exposition formats.
- """
- def __init__(self, auto_describe: bool = False, target_info: Optional[Dict[str, str]] = None):
- self._collector_to_names: Dict[Collector, List[str]] = {}
- self._names_to_collectors: Dict[str, Collector] = {}
- self._auto_describe = auto_describe
- self._lock = Lock()
- self._target_info: Optional[Dict[str, str]] = {}
- self.set_target_info(target_info)
- def register(self, collector: Collector) -> None:
- """Add a collector to the registry."""
- with self._lock:
- names = self._get_names(collector)
- duplicates = set(self._names_to_collectors).intersection(names)
- if duplicates:
- raise ValueError(
- 'Duplicated timeseries in CollectorRegistry: {}'.format(
- duplicates))
- for name in names:
- self._names_to_collectors[name] = collector
- self._collector_to_names[collector] = names
- def unregister(self, collector: Collector) -> None:
- """Remove a collector from the registry."""
- with self._lock:
- for name in self._collector_to_names[collector]:
- del self._names_to_collectors[name]
- del self._collector_to_names[collector]
- def _get_names(self, collector):
- """Get names of timeseries the collector produces and clashes with."""
- desc_func = None
- # If there's a describe function, use it.
- try:
- desc_func = collector.describe
- except AttributeError:
- pass
- # Otherwise, if auto describe is enabled use the collect function.
- if not desc_func and self._auto_describe:
- desc_func = collector.collect
- if not desc_func:
- return []
- result = []
- type_suffixes = {
- 'counter': ['_total', '_created'],
- 'summary': ['_sum', '_count', '_created'],
- 'histogram': ['_bucket', '_sum', '_count', '_created'],
- 'gaugehistogram': ['_bucket', '_gsum', '_gcount'],
- 'info': ['_info'],
- }
- for metric in desc_func():
- result.append(metric.name)
- for suffix in type_suffixes.get(metric.type, []):
- result.append(metric.name + suffix)
- return result
- def collect(self) -> Iterable[Metric]:
- """Yields metrics from the collectors in the registry."""
- collectors = None
- ti = None
- with self._lock:
- collectors = copy.copy(self._collector_to_names)
- if self._target_info:
- ti = self._target_info_metric()
- if ti:
- yield ti
- for collector in collectors:
- yield from collector.collect()
- def restricted_registry(self, names: Iterable[str]) -> "RestrictedRegistry":
- """Returns object that only collects some metrics.
- Returns an object which upon collect() will return
- only samples with the given names.
- Intended usage is:
- generate_latest(REGISTRY.restricted_registry(['a_timeseries']), escaping)
- Experimental."""
- names = set(names)
- return RestrictedRegistry(names, self)
- def set_target_info(self, labels: Optional[Dict[str, str]]) -> None:
- with self._lock:
- if labels:
- if not self._target_info and 'target_info' in self._names_to_collectors:
- raise ValueError('CollectorRegistry already contains a target_info metric')
- self._names_to_collectors['target_info'] = _EmptyCollector()
- elif self._target_info:
- self._names_to_collectors.pop('target_info', None)
- self._target_info = labels
- def get_target_info(self) -> Optional[Dict[str, str]]:
- with self._lock:
- return self._target_info
- def _target_info_metric(self):
- m = Metric('target', 'Target metadata', 'info')
- m.add_sample('target_info', self._target_info, 1)
- return m
- def get_sample_value(self, name: str, labels: Optional[Dict[str, str]] = None) -> Optional[float]:
- """Returns the sample value, or None if not found.
- This is inefficient, and intended only for use in unittests.
- """
- if labels is None:
- labels = {}
- for metric in self.collect():
- for s in metric.samples:
- if s.name == name and s.labels == labels:
- return s.value
- return None
- class RestrictedRegistry:
- def __init__(self, names: Iterable[str], registry: CollectorRegistry):
- self._name_set = set(names)
- self._registry = registry
- def collect(self) -> Iterable[Metric]:
- collectors = set()
- target_info_metric = None
- with self._registry._lock:
- if 'target_info' in self._name_set and self._registry._target_info:
- target_info_metric = self._registry._target_info_metric()
- for name in self._name_set:
- if name != 'target_info' and name in self._registry._names_to_collectors:
- collectors.add(self._registry._names_to_collectors[name])
- if target_info_metric:
- yield target_info_metric
- for collector in collectors:
- for metric in collector.collect():
- m = metric._restricted_metric(self._name_set)
- if m:
- yield m
- REGISTRY = CollectorRegistry(auto_describe=True)
|