values.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. import os
  2. from threading import Lock
  3. import warnings
  4. from .mmap_dict import mmap_key, MmapedDict
  5. class MutexValue:
  6. """A float protected by a mutex."""
  7. _multiprocess = False
  8. def __init__(self, typ, metric_name, name, labelnames, labelvalues, help_text, **kwargs):
  9. self._value = 0.0
  10. self._exemplar = None
  11. self._lock = Lock()
  12. def inc(self, amount):
  13. with self._lock:
  14. self._value += amount
  15. def set(self, value, timestamp=None):
  16. with self._lock:
  17. self._value = value
  18. def set_exemplar(self, exemplar):
  19. with self._lock:
  20. self._exemplar = exemplar
  21. def get(self):
  22. with self._lock:
  23. return self._value
  24. def get_exemplar(self):
  25. with self._lock:
  26. return self._exemplar
  27. def MultiProcessValue(process_identifier=os.getpid):
  28. """Returns a MmapedValue class based on a process_identifier function.
  29. The 'process_identifier' function MUST comply with this simple rule:
  30. when called in simultaneously running processes it MUST return distinct values.
  31. Using a different function than the default 'os.getpid' is at your own risk.
  32. """
  33. files = {}
  34. values = []
  35. pid = {'value': process_identifier()}
  36. # Use a single global lock when in multi-processing mode
  37. # as we presume this means there is no threading going on.
  38. # This avoids the need to also have mutexes in __MmapDict.
  39. lock = Lock()
  40. class MmapedValue:
  41. """A float protected by a mutex backed by a per-process mmaped file."""
  42. _multiprocess = True
  43. def __init__(self, typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode='', **kwargs):
  44. self._params = typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode
  45. # This deprecation warning can go away in a few releases when removing the compatibility
  46. if 'prometheus_multiproc_dir' in os.environ and 'PROMETHEUS_MULTIPROC_DIR' not in os.environ:
  47. os.environ['PROMETHEUS_MULTIPROC_DIR'] = os.environ['prometheus_multiproc_dir']
  48. warnings.warn("prometheus_multiproc_dir variable has been deprecated in favor of the upper case naming PROMETHEUS_MULTIPROC_DIR", DeprecationWarning)
  49. with lock:
  50. self.__check_for_pid_change()
  51. self.__reset()
  52. values.append(self)
  53. def __reset(self):
  54. typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode = self._params
  55. if typ == 'gauge':
  56. file_prefix = typ + '_' + multiprocess_mode
  57. else:
  58. file_prefix = typ
  59. if file_prefix not in files:
  60. filename = os.path.join(
  61. os.environ.get('PROMETHEUS_MULTIPROC_DIR'),
  62. '{}_{}.db'.format(file_prefix, pid['value']))
  63. files[file_prefix] = MmapedDict(filename)
  64. self._file = files[file_prefix]
  65. self._key = mmap_key(metric_name, name, labelnames, labelvalues, help_text)
  66. self._value, self._timestamp = self._file.read_value(self._key)
  67. def __check_for_pid_change(self):
  68. actual_pid = process_identifier()
  69. if pid['value'] != actual_pid:
  70. pid['value'] = actual_pid
  71. # There has been a fork(), reset all the values.
  72. for f in files.values():
  73. f.close()
  74. files.clear()
  75. for value in values:
  76. value.__reset()
  77. def inc(self, amount):
  78. with lock:
  79. self.__check_for_pid_change()
  80. self._value += amount
  81. self._timestamp = 0.0
  82. self._file.write_value(self._key, self._value, self._timestamp)
  83. def set(self, value, timestamp=None):
  84. with lock:
  85. self.__check_for_pid_change()
  86. self._value = value
  87. self._timestamp = timestamp or 0.0
  88. self._file.write_value(self._key, self._value, self._timestamp)
  89. def set_exemplar(self, exemplar):
  90. # TODO: Implement exemplars for multiprocess mode.
  91. return
  92. def get(self):
  93. with lock:
  94. self.__check_for_pid_change()
  95. return self._value
  96. def get_exemplar(self):
  97. # TODO: Implement exemplars for multiprocess mode.
  98. return None
  99. return MmapedValue
  100. def get_value_class():
  101. # Should we enable multi-process mode?
  102. # This needs to be chosen before the first metric is constructed,
  103. # and as that may be in some arbitrary library the user/admin has
  104. # no control over we use an environment variable.
  105. if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
  106. return MultiProcessValue()
  107. else:
  108. return MutexValue
  109. ValueClass = get_value_class()