| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802 |
- import os
- from threading import Lock
- import time
- import types
- from typing import (
- Any, Callable, Dict, Iterable, List, Literal, Optional, Sequence, Tuple,
- Type, TypeVar, Union,
- )
- import warnings
- from . import values # retain this import style for testability
- from .context_managers import ExceptionCounter, InprogressTracker, Timer
- from .metrics_core import Metric
- from .registry import Collector, CollectorRegistry, REGISTRY
- from .samples import Exemplar, Sample
- from .utils import floatToGoString, INF
- from .validation import (
- _validate_exemplar, _validate_labelnames, _validate_metric_name,
- )
- T = TypeVar('T', bound='MetricWrapperBase')
- F = TypeVar("F", bound=Callable[..., Any])
- def _build_full_name(metric_type, name, namespace, subsystem, unit):
- if not name:
- raise ValueError('Metric name should not be empty')
- full_name = ''
- if namespace:
- full_name += namespace + '_'
- if subsystem:
- full_name += subsystem + '_'
- full_name += name
- if metric_type == 'counter' and full_name.endswith('_total'):
- full_name = full_name[:-6] # Munge to OpenMetrics.
- if unit and not full_name.endswith("_" + unit):
- full_name += "_" + unit
- if unit and metric_type in ('info', 'stateset'):
- raise ValueError('Metric name is of a type that cannot have a unit: ' + full_name)
- return full_name
- def _get_use_created() -> bool:
- return os.environ.get("PROMETHEUS_DISABLE_CREATED_SERIES", 'False').lower() not in ('true', '1', 't')
- _use_created = _get_use_created()
- def disable_created_metrics():
- """Disable exporting _created metrics on counters, histograms, and summaries."""
- global _use_created
- _use_created = False
- def enable_created_metrics():
- """Enable exporting _created metrics on counters, histograms, and summaries."""
- global _use_created
- _use_created = True
- class MetricWrapperBase(Collector):
- _type: Optional[str] = None
- _reserved_labelnames: Sequence[str] = ()
- def _is_observable(self):
- # Whether this metric is observable, i.e.
- # * a metric without label names and values, or
- # * the child of a labelled metric.
- return not self._labelnames or (self._labelnames and self._labelvalues)
- def _raise_if_not_observable(self):
- # Functions that mutate the state of the metric, for example incrementing
- # a counter, will fail if the metric is not observable, because only if a
- # metric is observable will the value be initialized.
- if not self._is_observable():
- raise ValueError('%s metric is missing label values' % str(self._type))
- def _is_parent(self):
- return self._labelnames and not self._labelvalues
- def _get_metric(self):
- return Metric(self._name, self._documentation, self._type, self._unit)
- def describe(self) -> Iterable[Metric]:
- return [self._get_metric()]
- def collect(self) -> Iterable[Metric]:
- metric = self._get_metric()
- for suffix, labels, value, timestamp, exemplar, native_histogram_value in self._samples():
- metric.add_sample(self._name + suffix, labels, value, timestamp, exemplar, native_histogram_value)
- return [metric]
- def __str__(self) -> str:
- return f"{self._type}:{self._name}"
- def __repr__(self) -> str:
- metric_type = type(self)
- return f"{metric_type.__module__}.{metric_type.__name__}({self._name})"
- def __init__(self: T,
- name: str,
- documentation: str,
- labelnames: Iterable[str] = (),
- namespace: str = '',
- subsystem: str = '',
- unit: str = '',
- registry: Optional[CollectorRegistry] = REGISTRY,
- _labelvalues: Optional[Sequence[str]] = None,
- ) -> None:
- self._original_name = name
- self._namespace = namespace
- self._subsystem = subsystem
- self._name = _build_full_name(self._type, name, namespace, subsystem, unit)
- self._labelnames = _validate_labelnames(self, labelnames)
- self._labelvalues = tuple(_labelvalues or ())
- self._kwargs: Dict[str, Any] = {}
- self._documentation = documentation
- self._unit = unit
- _validate_metric_name(self._name)
- if self._is_parent():
- # Prepare the fields needed for child metrics.
- self._lock = Lock()
- self._metrics: Dict[Sequence[str], T] = {}
- if self._is_observable():
- self._metric_init()
- if not self._labelvalues:
- # Register the multi-wrapper parent metric, or if a label-less metric, the whole shebang.
- if registry:
- registry.register(self)
- def labels(self: T, *labelvalues: Any, **labelkwargs: Any) -> T:
- """Return the child for the given labelset.
- All metrics can have labels, allowing grouping of related time series.
- Taking a counter as an example:
- from prometheus_client import Counter
- c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint'])
- c.labels('get', '/').inc()
- c.labels('post', '/submit').inc()
- Labels can also be provided as keyword arguments:
- from prometheus_client import Counter
- c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint'])
- c.labels(method='get', endpoint='/').inc()
- c.labels(method='post', endpoint='/submit').inc()
- See the best practices on [naming](http://prometheus.io/docs/practices/naming/)
- and [labels](http://prometheus.io/docs/practices/instrumentation/#use-labels).
- """
- if not self._labelnames:
- raise ValueError('No label names were set when constructing %s' % self)
- if self._labelvalues:
- raise ValueError('{} already has labels set ({}); can not chain calls to .labels()'.format(
- self,
- dict(zip(self._labelnames, self._labelvalues))
- ))
- if labelvalues and labelkwargs:
- raise ValueError("Can't pass both *args and **kwargs")
- if labelkwargs:
- if sorted(labelkwargs) != sorted(self._labelnames):
- raise ValueError('Incorrect label names')
- labelvalues = tuple(str(labelkwargs[l]) for l in self._labelnames)
- else:
- if len(labelvalues) != len(self._labelnames):
- raise ValueError('Incorrect label count')
- labelvalues = tuple(str(l) for l in labelvalues)
- with self._lock:
- if labelvalues not in self._metrics:
- original_name = getattr(self, '_original_name', self._name)
- namespace = getattr(self, '_namespace', '')
- subsystem = getattr(self, '_subsystem', '')
- unit = getattr(self, '_unit', '')
- child_kwargs = dict(self._kwargs) if self._kwargs else {}
- for k in ('namespace', 'subsystem', 'unit'):
- child_kwargs.pop(k, None)
- self._metrics[labelvalues] = self.__class__(
- original_name,
- documentation=self._documentation,
- labelnames=self._labelnames,
- namespace=namespace,
- subsystem=subsystem,
- unit=unit,
- _labelvalues=labelvalues,
- **child_kwargs
- )
- return self._metrics[labelvalues]
- def remove(self, *labelvalues: Any) -> None:
- if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
- warnings.warn(
- "Removal of labels has not been implemented in multi-process mode yet.",
- UserWarning)
- if not self._labelnames:
- raise ValueError('No label names were set when constructing %s' % self)
- """Remove the given labelset from the metric."""
- if len(labelvalues) != len(self._labelnames):
- raise ValueError('Incorrect label count (expected %d, got %s)' % (len(self._labelnames), labelvalues))
- labelvalues = tuple(str(l) for l in labelvalues)
- with self._lock:
- if labelvalues in self._metrics:
- del self._metrics[labelvalues]
- def remove_by_labels(self, labels: dict[str, str]) -> None:
- """Remove all series whose labelset partially matches the given labels."""
- if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
- warnings.warn(
- "Removal of labels has not been implemented in multi-process mode yet.",
- UserWarning
- )
- if not self._labelnames:
- raise ValueError('No label names were set when constructing %s' % self)
-
- if not isinstance(labels, dict):
- raise TypeError("labels must be a dict of {label_name: label_value}")
-
- if not labels:
- return # no operation
-
- invalid = [k for k in labels.keys() if k not in self._labelnames]
- if invalid:
- raise ValueError(
- 'Unknown label names: %s; expected %s' % (invalid, self._labelnames)
- )
-
- pos_filter = {self._labelnames.index(k): str(v) for k, v in labels.items()}
- with self._lock:
- # list(...) to avoid "dictionary changed size during iteration"
- for lv in list(self._metrics.keys()):
- if all(lv[pos] == want for pos, want in pos_filter.items()):
- # pop with default avoids KeyError if concurrently removed
- self._metrics.pop(lv, None)
-
- def clear(self) -> None:
- """Remove all labelsets from the metric"""
- if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
- warnings.warn(
- "Clearing labels has not been implemented in multi-process mode yet",
- UserWarning)
- with self._lock:
- self._metrics = {}
- def _samples(self) -> Iterable[Sample]:
- if self._is_parent():
- return self._multi_samples()
- else:
- return self._child_samples()
- def _multi_samples(self) -> Iterable[Sample]:
- with self._lock:
- metrics = self._metrics.copy()
- for labels, metric in metrics.items():
- series_labels = list(zip(self._labelnames, labels))
- for suffix, sample_labels, value, timestamp, exemplar, native_histogram_value in metric._samples():
- yield Sample(suffix, dict(series_labels + list(sample_labels.items())), value, timestamp, exemplar, native_histogram_value)
- def _child_samples(self) -> Iterable[Sample]: # pragma: no cover
- raise NotImplementedError('_child_samples() must be implemented by %r' % self)
- def _metric_init(self): # pragma: no cover
- """
- Initialize the metric object as a child, i.e. when it has labels (if any) set.
- This is factored as a separate function to allow for deferred initialization.
- """
- raise NotImplementedError('_metric_init() must be implemented by %r' % self)
- class Counter(MetricWrapperBase):
- """A Counter tracks counts of events or running totals.
- Example use cases for Counters:
- - Number of requests processed
- - Number of items that were inserted into a queue
- - Total amount of data that a system has processed
- Counters can only go up (and be reset when the process restarts). If your use case can go down,
- you should use a Gauge instead.
- An example for a Counter:
- from prometheus_client import Counter
- c = Counter('my_failures_total', 'Description of counter')
- c.inc() # Increment by 1
- c.inc(1.6) # Increment by given value
- There are utilities to count exceptions raised:
- @c.count_exceptions()
- def f():
- pass
- with c.count_exceptions():
- pass
- # Count only one type of exception
- with c.count_exceptions(ValueError):
- pass
-
- You can also reset the counter to zero in case your logical "process" restarts
- without restarting the actual python process.
- c.reset()
- """
- _type = 'counter'
- def _metric_init(self) -> None:
- self._value = values.ValueClass(self._type, self._name, self._name + '_total', self._labelnames,
- self._labelvalues, self._documentation)
- self._created = time.time()
- def inc(self, amount: float = 1, exemplar: Optional[Dict[str, str]] = None) -> None:
- """Increment counter by the given amount."""
- self._raise_if_not_observable()
- if amount < 0:
- raise ValueError('Counters can only be incremented by non-negative amounts.')
- self._value.inc(amount)
- if exemplar:
- _validate_exemplar(exemplar)
- self._value.set_exemplar(Exemplar(exemplar, amount, time.time()))
- def reset(self) -> None:
- """Reset the counter to zero. Use this when a logical process restarts without restarting the actual python process."""
- self._value.set(0)
- self._created = time.time()
- def count_exceptions(self, exception: Union[Type[BaseException], Tuple[Type[BaseException], ...]] = Exception) -> ExceptionCounter:
- """Count exceptions in a block of code or function.
- Can be used as a function decorator or context manager.
- Increments the counter when an exception of the given
- type is raised up out of the code.
- """
- self._raise_if_not_observable()
- return ExceptionCounter(self, exception)
- def _child_samples(self) -> Iterable[Sample]:
- sample = Sample('_total', {}, self._value.get(), None, self._value.get_exemplar())
- if _use_created:
- return (
- sample,
- Sample('_created', {}, self._created, None, None)
- )
- return (sample,)
- class Gauge(MetricWrapperBase):
- """Gauge metric, to report instantaneous values.
- Examples of Gauges include:
- - Inprogress requests
- - Number of items in a queue
- - Free memory
- - Total memory
- - Temperature
- Gauges can go both up and down.
- from prometheus_client import Gauge
- g = Gauge('my_inprogress_requests', 'Description of gauge')
- g.inc() # Increment by 1
- g.dec(10) # Decrement by given value
- g.set(4.2) # Set to a given value
- There are utilities for common use cases:
- g.set_to_current_time() # Set to current unixtime
- # Increment when entered, decrement when exited.
- @g.track_inprogress()
- def f():
- pass
- with g.track_inprogress():
- pass
- A Gauge can also take its value from a callback:
- d = Gauge('data_objects', 'Number of objects')
- my_dict = {}
- d.set_function(lambda: len(my_dict))
- """
- _type = 'gauge'
- _MULTIPROC_MODES = frozenset(('all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'))
- _MOST_RECENT_MODES = frozenset(('mostrecent', 'livemostrecent'))
- def __init__(self,
- name: str,
- documentation: str,
- labelnames: Iterable[str] = (),
- namespace: str = '',
- subsystem: str = '',
- unit: str = '',
- registry: Optional[CollectorRegistry] = REGISTRY,
- _labelvalues: Optional[Sequence[str]] = None,
- multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'] = 'all',
- ):
- self._multiprocess_mode = multiprocess_mode
- if multiprocess_mode not in self._MULTIPROC_MODES:
- raise ValueError('Invalid multiprocess mode: ' + multiprocess_mode)
- super().__init__(
- name=name,
- documentation=documentation,
- labelnames=labelnames,
- namespace=namespace,
- subsystem=subsystem,
- unit=unit,
- registry=registry,
- _labelvalues=_labelvalues,
- )
- self._kwargs['multiprocess_mode'] = self._multiprocess_mode
- self._is_most_recent = self._multiprocess_mode in self._MOST_RECENT_MODES
- def _metric_init(self) -> None:
- self._value = values.ValueClass(
- self._type, self._name, self._name, self._labelnames, self._labelvalues,
- self._documentation, multiprocess_mode=self._multiprocess_mode
- )
- def inc(self, amount: float = 1) -> None:
- """Increment gauge by the given amount."""
- if self._is_most_recent:
- raise RuntimeError("inc must not be used with the mostrecent mode")
- self._raise_if_not_observable()
- self._value.inc(amount)
- def dec(self, amount: float = 1) -> None:
- """Decrement gauge by the given amount."""
- if self._is_most_recent:
- raise RuntimeError("dec must not be used with the mostrecent mode")
- self._raise_if_not_observable()
- self._value.inc(-amount)
- def set(self, value: float) -> None:
- """Set gauge to the given value."""
- self._raise_if_not_observable()
- if self._is_most_recent:
- self._value.set(float(value), timestamp=time.time())
- else:
- self._value.set(float(value))
- def set_to_current_time(self) -> None:
- """Set gauge to the current unixtime."""
- self.set(time.time())
- def track_inprogress(self) -> InprogressTracker:
- """Track inprogress blocks of code or functions.
- Can be used as a function decorator or context manager.
- Increments the gauge when the code is entered,
- and decrements when it is exited.
- """
- self._raise_if_not_observable()
- return InprogressTracker(self)
- def time(self) -> Timer:
- """Time a block of code or function, and set the duration in seconds.
- Can be used as a function decorator or context manager.
- """
- return Timer(self, 'set')
- def set_function(self, f: Callable[[], float]) -> None:
- """Call the provided function to return the Gauge value.
- The function must return a float, and may be called from
- multiple threads. All other methods of the Gauge become NOOPs.
- """
- self._raise_if_not_observable()
- def samples(_: Gauge) -> Iterable[Sample]:
- return (Sample('', {}, float(f()), None, None),)
- self._child_samples = types.MethodType(samples, self) # type: ignore
- def _child_samples(self) -> Iterable[Sample]:
- return (Sample('', {}, self._value.get(), None, None),)
- class Summary(MetricWrapperBase):
- """A Summary tracks the size and number of events.
- Example use cases for Summaries:
- - Response latency
- - Request size
- Example for a Summary:
- from prometheus_client import Summary
- s = Summary('request_size_bytes', 'Request size (bytes)')
- s.observe(512) # Observe 512 (bytes)
- Example for a Summary using time:
- from prometheus_client import Summary
- REQUEST_TIME = Summary('response_latency_seconds', 'Response latency (seconds)')
- @REQUEST_TIME.time()
- def create_response(request):
- '''A dummy function'''
- time.sleep(1)
- Example for using the same Summary object as a context manager:
- with REQUEST_TIME.time():
- pass # Logic to be timed
- """
- _type = 'summary'
- _reserved_labelnames = ['quantile']
- def _metric_init(self) -> None:
- self._count = values.ValueClass(self._type, self._name, self._name + '_count', self._labelnames,
- self._labelvalues, self._documentation)
- self._sum = values.ValueClass(self._type, self._name, self._name + '_sum', self._labelnames, self._labelvalues, self._documentation)
- self._created = time.time()
- def observe(self, amount: float) -> None:
- """Observe the given amount.
- The amount is usually positive or zero. Negative values are
- accepted but prevent current versions of Prometheus from
- properly detecting counter resets in the sum of
- observations. See
- https://prometheus.io/docs/practices/histograms/#count-and-sum-of-observations
- for details.
- """
- self._raise_if_not_observable()
- self._count.inc(1)
- self._sum.inc(amount)
- def time(self) -> Timer:
- """Time a block of code or function, and observe the duration in seconds.
- Can be used as a function decorator or context manager.
- """
- return Timer(self, 'observe')
- def _child_samples(self) -> Iterable[Sample]:
- samples = [
- Sample('_count', {}, self._count.get(), None, None),
- Sample('_sum', {}, self._sum.get(), None, None),
- ]
- if _use_created:
- samples.append(Sample('_created', {}, self._created, None, None))
- return tuple(samples)
- class Histogram(MetricWrapperBase):
- """A Histogram tracks the size and number of events in buckets.
- You can use Histograms for aggregatable calculation of quantiles.
- Example use cases:
- - Response latency
- - Request size
- Example for a Histogram:
- from prometheus_client import Histogram
- h = Histogram('request_size_bytes', 'Request size (bytes)')
- h.observe(512) # Observe 512 (bytes)
- Example for a Histogram using time:
- from prometheus_client import Histogram
- REQUEST_TIME = Histogram('response_latency_seconds', 'Response latency (seconds)')
- @REQUEST_TIME.time()
- def create_response(request):
- '''A dummy function'''
- time.sleep(1)
- Example of using the same Histogram object as a context manager:
- with REQUEST_TIME.time():
- pass # Logic to be timed
- The default buckets are intended to cover a typical web/rpc request from milliseconds to seconds.
- They can be overridden by passing `buckets` keyword argument to `Histogram`.
- """
- _type = 'histogram'
- _reserved_labelnames = ['le']
- DEFAULT_BUCKETS = (.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, INF)
- def __init__(self,
- name: str,
- documentation: str,
- labelnames: Iterable[str] = (),
- namespace: str = '',
- subsystem: str = '',
- unit: str = '',
- registry: Optional[CollectorRegistry] = REGISTRY,
- _labelvalues: Optional[Sequence[str]] = None,
- buckets: Sequence[Union[float, str]] = DEFAULT_BUCKETS,
- ):
- self._prepare_buckets(buckets)
- super().__init__(
- name=name,
- documentation=documentation,
- labelnames=labelnames,
- namespace=namespace,
- subsystem=subsystem,
- unit=unit,
- registry=registry,
- _labelvalues=_labelvalues,
- )
- self._kwargs['buckets'] = buckets
- def _prepare_buckets(self, source_buckets: Sequence[Union[float, str]]) -> None:
- buckets = [float(b) for b in source_buckets]
- if buckets != sorted(buckets):
- # This is probably an error on the part of the user,
- # so raise rather than sorting for them.
- raise ValueError('Buckets not in sorted order')
- if buckets and buckets[-1] != INF:
- buckets.append(INF)
- if len(buckets) < 2:
- raise ValueError('Must have at least two buckets')
- self._upper_bounds = buckets
- def _metric_init(self) -> None:
- self._buckets: List[values.ValueClass] = []
- self._created = time.time()
- bucket_labelnames = self._labelnames + ('le',)
- self._sum = values.ValueClass(self._type, self._name, self._name + '_sum', self._labelnames, self._labelvalues, self._documentation)
- for b in self._upper_bounds:
- self._buckets.append(values.ValueClass(
- self._type,
- self._name,
- self._name + '_bucket',
- bucket_labelnames,
- self._labelvalues + (floatToGoString(b),),
- self._documentation)
- )
- def observe(self, amount: float, exemplar: Optional[Dict[str, str]] = None) -> None:
- """Observe the given amount.
- The amount is usually positive or zero. Negative values are
- accepted but prevent current versions of Prometheus from
- properly detecting counter resets in the sum of
- observations. See
- https://prometheus.io/docs/practices/histograms/#count-and-sum-of-observations
- for details.
- """
- self._raise_if_not_observable()
- self._sum.inc(amount)
- for i, bound in enumerate(self._upper_bounds):
- if amount <= bound:
- self._buckets[i].inc(1)
- if exemplar:
- _validate_exemplar(exemplar)
- self._buckets[i].set_exemplar(Exemplar(exemplar, amount, time.time()))
- break
- def time(self) -> Timer:
- """Time a block of code or function, and observe the duration in seconds.
- Can be used as a function decorator or context manager.
- """
- return Timer(self, 'observe')
- def _child_samples(self) -> Iterable[Sample]:
- samples = []
- acc = 0.0
- for i, bound in enumerate(self._upper_bounds):
- acc += self._buckets[i].get()
- samples.append(Sample('_bucket', {'le': floatToGoString(bound)}, acc, None, self._buckets[i].get_exemplar()))
- samples.append(Sample('_count', {}, acc, None, None))
- if self._upper_bounds[0] >= 0:
- samples.append(Sample('_sum', {}, self._sum.get(), None, None))
- if _use_created:
- samples.append(Sample('_created', {}, self._created, None, None))
- return tuple(samples)
- class Info(MetricWrapperBase):
- """Info metric, key-value pairs.
- Examples of Info include:
- - Build information
- - Version information
- - Potential target metadata
- Example usage:
- from prometheus_client import Info
- i = Info('my_build', 'Description of info')
- i.info({'version': '1.2.3', 'buildhost': 'foo@bar'})
- Info metrics do not work in multiprocess mode.
- """
- _type = 'info'
- def _metric_init(self):
- self._labelname_set = set(self._labelnames)
- self._lock = Lock()
- self._value = {}
- def info(self, val: Dict[str, str]) -> None:
- """Set info metric."""
- if self._labelname_set.intersection(val.keys()):
- raise ValueError('Overlapping labels for Info metric, metric: {} child: {}'.format(
- self._labelnames, val))
- if any(i is None for i in val.values()):
- raise ValueError('Label value cannot be None')
- with self._lock:
- self._value = dict(val)
- def _child_samples(self) -> Iterable[Sample]:
- with self._lock:
- return (Sample('_info', self._value, 1.0, None, None),)
- class Enum(MetricWrapperBase):
- """Enum metric, which of a set of states is true.
- Example usage:
- from prometheus_client import Enum
- e = Enum('task_state', 'Description of enum',
- states=['starting', 'running', 'stopped'])
- e.state('running')
- The first listed state will be the default.
- Enum metrics do not work in multiprocess mode.
- """
- _type = 'stateset'
- def __init__(self,
- name: str,
- documentation: str,
- labelnames: Sequence[str] = (),
- namespace: str = '',
- subsystem: str = '',
- unit: str = '',
- registry: Optional[CollectorRegistry] = REGISTRY,
- _labelvalues: Optional[Sequence[str]] = None,
- states: Optional[Sequence[str]] = None,
- ):
- super().__init__(
- name=name,
- documentation=documentation,
- labelnames=labelnames,
- namespace=namespace,
- subsystem=subsystem,
- unit=unit,
- registry=registry,
- _labelvalues=_labelvalues,
- )
- if name in labelnames:
- raise ValueError(f'Overlapping labels for Enum metric: {name}')
- if not states:
- raise ValueError(f'No states provided for Enum metric: {name}')
- self._kwargs['states'] = self._states = states
- def _metric_init(self) -> None:
- self._value = 0
- self._lock = Lock()
- def state(self, state: str) -> None:
- """Set enum metric state."""
- self._raise_if_not_observable()
- with self._lock:
- self._value = self._states.index(state)
- def _child_samples(self) -> Iterable[Sample]:
- with self._lock:
- return [
- Sample('', {self._name: s}, 1 if i == self._value else 0, None, None)
- for i, s
- in enumerate(self._states)
- ]
|