multiprocess.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. from collections import defaultdict
  2. import glob
  3. import json
  4. import os
  5. import warnings
  6. from .metrics import Gauge
  7. from .metrics_core import Metric
  8. from .mmap_dict import MmapedDict
  9. from .samples import Sample
  10. from .utils import floatToGoString
# Compatibility shim: make sure the name ``FileNotFoundError`` can be used in
# ``except`` clauses below. If the builtin is missing (looking it up raises
# NameError), the name is bound at module level to ``IOError`` instead.
# NOTE(review): this module also uses f-strings, which need Python 3.6+, so the
# fallback branch looks unreachable today -- confirm before removing it.
try:  # Python3
    FileNotFoundError
except NameError:  # Python >= 2.5
    FileNotFoundError = IOError
  15. class MultiProcessCollector:
  16. """Collector for files for multi-process mode."""
  17. def __init__(self, registry, path=None):
  18. if path is None:
  19. # This deprecation warning can go away in a few releases when removing the compatibility
  20. if 'prometheus_multiproc_dir' in os.environ and 'PROMETHEUS_MULTIPROC_DIR' not in os.environ:
  21. os.environ['PROMETHEUS_MULTIPROC_DIR'] = os.environ['prometheus_multiproc_dir']
  22. warnings.warn("prometheus_multiproc_dir variable has been deprecated in favor of the upper case naming PROMETHEUS_MULTIPROC_DIR", DeprecationWarning)
  23. path = os.environ.get('PROMETHEUS_MULTIPROC_DIR')
  24. if not path or not os.path.isdir(path):
  25. raise ValueError('env PROMETHEUS_MULTIPROC_DIR is not set or not a directory')
  26. self._path = path
  27. if registry:
  28. registry.register(self)
  29. @staticmethod
  30. def merge(files, accumulate=True):
  31. """Merge metrics from given mmap files.
  32. By default, histograms are accumulated, as per prometheus wire format.
  33. But if writing the merged data back to mmap files, use
  34. accumulate=False to avoid compound accumulation.
  35. """
  36. metrics = MultiProcessCollector._read_metrics(files)
  37. return MultiProcessCollector._accumulate_metrics(metrics, accumulate)
  38. @staticmethod
  39. def _read_metrics(files):
  40. metrics = {}
  41. key_cache = {}
  42. def _parse_key(key):
  43. val = key_cache.get(key)
  44. if not val:
  45. metric_name, name, labels, help_text = json.loads(key)
  46. labels_key = tuple(sorted(labels.items()))
  47. val = key_cache[key] = (metric_name, name, labels, labels_key, help_text)
  48. return val
  49. for f in files:
  50. parts = os.path.basename(f).split('_')
  51. typ = parts[0]
  52. try:
  53. file_values = MmapedDict.read_all_values_from_file(f)
  54. except FileNotFoundError:
  55. if typ == 'gauge' and parts[1].startswith('live'):
  56. # Files for 'live*' gauges can be deleted between the glob of collect
  57. # and now (via a mark_process_dead call) so don't fail if
  58. # the file is missing
  59. continue
  60. raise
  61. for key, value, timestamp, _ in file_values:
  62. metric_name, name, labels, labels_key, help_text = _parse_key(key)
  63. metric = metrics.get(metric_name)
  64. if metric is None:
  65. metric = Metric(metric_name, help_text, typ)
  66. metrics[metric_name] = metric
  67. if typ == 'gauge':
  68. pid = parts[2][:-3]
  69. metric._multiprocess_mode = parts[1]
  70. metric.add_sample(name, labels_key + (('pid', pid),), value, timestamp)
  71. else:
  72. # The duplicates and labels are fixed in the next for.
  73. metric.add_sample(name, labels_key, value)
  74. return metrics
  75. @staticmethod
  76. def _accumulate_metrics(metrics, accumulate):
  77. for metric in metrics.values():
  78. samples = defaultdict(lambda: defaultdict(float))
  79. sample_timestamps = defaultdict(lambda: defaultdict(float))
  80. buckets = defaultdict(lambda: defaultdict(float))
  81. for s in metric.samples:
  82. name, labels, value, timestamp, exemplar, native_histogram_value = s
  83. if (
  84. metric.type == 'gauge'
  85. and metric._multiprocess_mode in (
  86. 'min', 'livemin',
  87. 'max', 'livemax',
  88. 'sum', 'livesum',
  89. 'mostrecent', 'livemostrecent',
  90. )
  91. ):
  92. labels = tuple(l for l in labels if l[0] != 'pid')
  93. if metric.type == 'gauge':
  94. if metric._multiprocess_mode in ('min', 'livemin'):
  95. current = samples[labels].setdefault((name, labels), value)
  96. if value < current:
  97. samples[labels][(name, labels)] = value
  98. elif metric._multiprocess_mode in ('max', 'livemax'):
  99. current = samples[labels].setdefault((name, labels), value)
  100. if value > current:
  101. samples[labels][(name, labels)] = value
  102. elif metric._multiprocess_mode in ('sum', 'livesum'):
  103. samples[labels][(name, labels)] += value
  104. elif metric._multiprocess_mode in ('mostrecent', 'livemostrecent'):
  105. current_timestamp = sample_timestamps[labels][name]
  106. timestamp = float(timestamp or 0)
  107. if current_timestamp < timestamp:
  108. samples[labels][(name, labels)] = value
  109. sample_timestamps[labels][name] = timestamp
  110. else: # all/liveall
  111. samples[labels][(name, labels)] = value
  112. elif metric.type == 'histogram':
  113. # A for loop with early exit is faster than a genexpr
  114. # or a listcomp that ends up building unnecessary things
  115. for l in labels:
  116. if l[0] == 'le':
  117. bucket_value = float(l[1])
  118. # _bucket
  119. without_le = tuple(l for l in labels if l[0] != 'le')
  120. buckets[without_le][bucket_value] += value
  121. break
  122. else: # did not find the `le` key
  123. # _sum/_count
  124. samples[labels][(name, labels)] += value
  125. else:
  126. # Counter and Summary.
  127. samples[labels][(name, labels)] += value
  128. # Accumulate bucket values.
  129. if metric.type == 'histogram':
  130. for labels, values in buckets.items():
  131. acc = 0.0
  132. for bucket, value in sorted(values.items()):
  133. sample_key = (
  134. metric.name + '_bucket',
  135. labels + (('le', floatToGoString(bucket)),),
  136. )
  137. if accumulate:
  138. acc += value
  139. samples[labels][sample_key] = acc
  140. else:
  141. samples[labels][sample_key] = value
  142. if accumulate:
  143. samples[labels][(metric.name + '_count', labels)] = acc
  144. # Convert to correct sample format.
  145. metric.samples = []
  146. for _, samples_by_labels in samples.items():
  147. for (name_, labels), value in samples_by_labels.items():
  148. metric.samples.append(Sample(name_, dict(labels), value))
  149. return metrics.values()
  150. def collect(self):
  151. files = glob.glob(os.path.join(self._path, '*.db'))
  152. return self.merge(files, accumulate=True)
  153. _LIVE_GAUGE_MULTIPROCESS_MODES = {m for m in Gauge._MULTIPROC_MODES if m.startswith('live')}
  154. def mark_process_dead(pid, path=None):
  155. """Do bookkeeping for when one process dies in a multi-process setup."""
  156. if path is None:
  157. path = os.environ.get('PROMETHEUS_MULTIPROC_DIR', os.environ.get('prometheus_multiproc_dir'))
  158. for mode in _LIVE_GAUGE_MULTIPROCESS_MODES:
  159. for f in glob.glob(os.path.join(path, f'gauge_{mode}_{pid}.db')):
  160. os.remove(f)