registry.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. import copy
  2. from threading import Lock
  3. from typing import Dict, Iterable, List, Optional, Protocol
  4. from .metrics_core import Metric
  5. class Collector(Protocol):
  6. def collect(self) -> Iterable[Metric]:
  7. """Collect metrics."""
  8. class _EmptyCollector:
  9. def collect(self) -> Iterable[Metric]:
  10. return []
  11. class CollectorRegistry:
  12. """Metric collector registry.
  13. Collectors must have a no-argument method 'collect' that returns a list of
  14. Metric objects. The returned metrics should be consistent with the Prometheus
  15. exposition formats.
  16. """
  17. def __init__(self, auto_describe: bool = False, target_info: Optional[Dict[str, str]] = None):
  18. self._collector_to_names: Dict[Collector, List[str]] = {}
  19. self._names_to_collectors: Dict[str, Collector] = {}
  20. self._auto_describe = auto_describe
  21. self._lock = Lock()
  22. self._target_info: Optional[Dict[str, str]] = {}
  23. self.set_target_info(target_info)
  24. def register(self, collector: Collector) -> None:
  25. """Add a collector to the registry."""
  26. with self._lock:
  27. names = self._get_names(collector)
  28. duplicates = set(self._names_to_collectors).intersection(names)
  29. if duplicates:
  30. raise ValueError(
  31. 'Duplicated timeseries in CollectorRegistry: {}'.format(
  32. duplicates))
  33. for name in names:
  34. self._names_to_collectors[name] = collector
  35. self._collector_to_names[collector] = names
  36. def unregister(self, collector: Collector) -> None:
  37. """Remove a collector from the registry."""
  38. with self._lock:
  39. for name in self._collector_to_names[collector]:
  40. del self._names_to_collectors[name]
  41. del self._collector_to_names[collector]
  42. def _get_names(self, collector):
  43. """Get names of timeseries the collector produces and clashes with."""
  44. desc_func = None
  45. # If there's a describe function, use it.
  46. try:
  47. desc_func = collector.describe
  48. except AttributeError:
  49. pass
  50. # Otherwise, if auto describe is enabled use the collect function.
  51. if not desc_func and self._auto_describe:
  52. desc_func = collector.collect
  53. if not desc_func:
  54. return []
  55. result = []
  56. type_suffixes = {
  57. 'counter': ['_total', '_created'],
  58. 'summary': ['_sum', '_count', '_created'],
  59. 'histogram': ['_bucket', '_sum', '_count', '_created'],
  60. 'gaugehistogram': ['_bucket', '_gsum', '_gcount'],
  61. 'info': ['_info'],
  62. }
  63. for metric in desc_func():
  64. result.append(metric.name)
  65. for suffix in type_suffixes.get(metric.type, []):
  66. result.append(metric.name + suffix)
  67. return result
  68. def collect(self) -> Iterable[Metric]:
  69. """Yields metrics from the collectors in the registry."""
  70. collectors = None
  71. ti = None
  72. with self._lock:
  73. collectors = copy.copy(self._collector_to_names)
  74. if self._target_info:
  75. ti = self._target_info_metric()
  76. if ti:
  77. yield ti
  78. for collector in collectors:
  79. yield from collector.collect()
  80. def restricted_registry(self, names: Iterable[str]) -> "RestrictedRegistry":
  81. """Returns object that only collects some metrics.
  82. Returns an object which upon collect() will return
  83. only samples with the given names.
  84. Intended usage is:
  85. generate_latest(REGISTRY.restricted_registry(['a_timeseries']), escaping)
  86. Experimental."""
  87. names = set(names)
  88. return RestrictedRegistry(names, self)
  89. def set_target_info(self, labels: Optional[Dict[str, str]]) -> None:
  90. with self._lock:
  91. if labels:
  92. if not self._target_info and 'target_info' in self._names_to_collectors:
  93. raise ValueError('CollectorRegistry already contains a target_info metric')
  94. self._names_to_collectors['target_info'] = _EmptyCollector()
  95. elif self._target_info:
  96. self._names_to_collectors.pop('target_info', None)
  97. self._target_info = labels
  98. def get_target_info(self) -> Optional[Dict[str, str]]:
  99. with self._lock:
  100. return self._target_info
  101. def _target_info_metric(self):
  102. m = Metric('target', 'Target metadata', 'info')
  103. m.add_sample('target_info', self._target_info, 1)
  104. return m
  105. def get_sample_value(self, name: str, labels: Optional[Dict[str, str]] = None) -> Optional[float]:
  106. """Returns the sample value, or None if not found.
  107. This is inefficient, and intended only for use in unittests.
  108. """
  109. if labels is None:
  110. labels = {}
  111. for metric in self.collect():
  112. for s in metric.samples:
  113. if s.name == name and s.labels == labels:
  114. return s.value
  115. return None
  116. class RestrictedRegistry:
  117. def __init__(self, names: Iterable[str], registry: CollectorRegistry):
  118. self._name_set = set(names)
  119. self._registry = registry
  120. def collect(self) -> Iterable[Metric]:
  121. collectors = set()
  122. target_info_metric = None
  123. with self._registry._lock:
  124. if 'target_info' in self._name_set and self._registry._target_info:
  125. target_info_metric = self._registry._target_info_metric()
  126. for name in self._name_set:
  127. if name != 'target_info' and name in self._registry._names_to_collectors:
  128. collectors.add(self._registry._names_to_collectors[name])
  129. if target_info_metric:
  130. yield target_info_metric
  131. for collector in collectors:
  132. for metric in collector.collect():
  133. m = metric._restricted_metric(self._name_set)
  134. if m:
  135. yield m
  136. REGISTRY = CollectorRegistry(auto_describe=True)