| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374 |
# NOTE: This file has been copied from the OpenCensus Python Prometheus
# exporter. The upstream exporter has not had a release for a while, and
# its latest released version has a compatibility issue with the latest
# OpenCensus library.
- import logging
- import re
- import threading
- from wsgiref.simple_server import make_server
- from opencensus.common.transports import sync
- from opencensus.stats import aggregation_data as aggregation_data_module, base_exporter
- from prometheus_client import make_wsgi_app
- from prometheus_client.core import (
- REGISTRY,
- CounterMetricFamily,
- GaugeMetricFamily,
- HistogramMetricFamily,
- UnknownMetricFamily,
- )
- logger = logging.getLogger(__name__)
class Options:
    """Options contains options for configuring the exporter.

    The address can be empty, as the Prometheus client will assume it is
    localhost.

    :type namespace: str
    :param namespace: The Prometheus namespace to be used. Defaults to ''.

    :type port: int
    :param port: The Prometheus port to be used. Defaults to 8000.

    :type address: str
    :param address: The Prometheus address to be used. Defaults to ''.

    :type registry: :class:`~prometheus_client.core.CollectorRegistry`
    :param registry: A Prometheus collector registry instance. Defaults to
        the ``REGISTRY`` object imported from ``prometheus_client.core``.
    """

    def __init__(self, namespace="", port=8000, address="", registry=REGISTRY):
        self._namespace = namespace
        self._registry = registry
        # Coerce to int so that a numeric string is accepted as a port.
        self._port = int(port)
        self._address = address

    @property
    def registry(self):
        """Prometheus Collector Registry instance"""
        return self._registry

    @property
    def namespace(self):
        """Prefix to be used with view name"""
        return self._namespace

    @property
    def port(self):
        """Port number to listen on"""
        return self._port

    @property
    def address(self):
        """Endpoint address (default is localhost)"""
        return self._address
class Collector:
    """Collector represents the Prometheus Collector object.

    It stores the most recent view data per view name and converts it to
    Prometheus metric families whenever :meth:`collect` is called.

    :type options: Options
    :param options: Exporter options; ``options.registry`` and
        ``options.namespace`` are read. Defaults to ``Options()``.

    :type view_name_to_data_map: dict
    :param view_name_to_data_map: Initial mapping of view name to view
        data. Defaults to a new empty dict.
    """

    def __init__(self, options=None, view_name_to_data_map=None):
        # Defaults are built here rather than in the signature, where they
        # would be evaluated once at definition time and then shared.
        if options is None:
            options = Options()
        if view_name_to_data_map is None:
            view_name_to_data_map = {}
        self._options = options
        self._registry = options.registry
        self._view_name_to_data_map = view_name_to_data_map
        self._registered_views = {}

    @property
    def options(self):
        """Options to be used to configure the exporter"""
        return self._options

    @property
    def registry(self):
        """Prometheus Collector Registry instance"""
        return self._registry

    @property
    def view_name_to_data_map(self):
        """Map with all view data objects
        that will be sent to Prometheus
        """
        return self._view_name_to_data_map

    @property
    def registered_views(self):
        """Map with all registered views"""
        return self._registered_views

    def register_view(self, view):
        """Create the description needed to send the view's data to
        Prometheus.

        A view whose (namespaced, sanitized) name is already registered is
        left untouched.
        """
        v_name = get_view_name(self.options.namespace, view)
        if v_name not in self.registered_views:
            self.registered_views[v_name] = {
                "name": v_name,
                "documentation": view.description,
                "labels": [sanitize(column) for column in view.columns],
                "units": view.measure.unit,
            }

    def add_view_data(self, view_data):
        """Add view data object to be sent to server.

        The view is registered first if needed; any data previously stored
        under the same view name is replaced.
        """
        self.register_view(view_data.view)
        v_name = get_view_name(self.options.namespace, view_data.view)
        self.view_name_to_data_map[v_name] = view_data

    @staticmethod
    def _get_or_create_family(metrics_map, family_cls, name, **family_kwargs):
        """Return the metric family stored under ``name`` in
        ``metrics_map``, creating and storing it first if it is absent.
        """
        metric = metrics_map.get(name)
        if metric is None:
            metric = family_cls(name=name, **family_kwargs)
            metrics_map[name] = metric
        return metric

    # TODO: add start and end timestamp
    def to_metric(self, desc, tag_values, agg_data, metrics_map):
        """Translate the data that OpenCensus creates to Prometheus format.

        The metric family for ``desc`` is looked up in ``metrics_map`` and
        created there on first use, then one sample labelled with
        ``tag_values`` is added to it. Nothing is returned; the result is
        the in-place update of ``metrics_map``.

        Aggregation types map to families as follows: count -> counter,
        distribution -> histogram, sum -> unknown (untyped), last value ->
        gauge.

        :type desc: dict
        :param desc: The map that describes the view definition, with the
            keys ``name``, ``documentation``, ``labels`` and ``units``.

        :type tag_values: tuple of
            :class:`~opencensus.tags.tag_value.TagValue`
        :param tag_values: Values used as label values; there must be one
            per entry of ``desc["labels"]``.

        :type agg_data:
            :class:`~opencensus.stats.aggregation_data.AggregationData`
        :param agg_data: Aggregated data that needs to be converted into a
            Prometheus sample.

        :type metrics_map: dict
        :param metrics_map: Metric families keyed by metric name; updated
            in place.

        :raises ValueError: If ``agg_data`` is of an unsupported
            aggregation type.
        """
        metric_name = desc["name"]
        metric_description = desc["documentation"]
        label_keys = desc["labels"]
        metric_units = desc["units"]
        assert len(tag_values) == len(label_keys), (tag_values, label_keys)
        # Prometheus requires that all tag values be strings hence
        # the need to cast none to the empty string before exporting. See
        # https://github.com/census-instrumentation/opencensus-python/issues/480
        tag_values = [tv if tv else "" for tv in tag_values]

        if isinstance(agg_data, aggregation_data_module.CountAggregationData):
            metric = self._get_or_create_family(
                metrics_map,
                CounterMetricFamily,
                metric_name,
                documentation=metric_description,
                unit=metric_units,
                labels=label_keys,
            )
            metric.add_metric(labels=tag_values, value=agg_data.count_data)
        elif isinstance(agg_data, aggregation_data_module.DistributionAggregationData):
            assert agg_data.bounds == sorted(agg_data.bounds)
            # buckets are a list of buckets. Each bucket is another list with
            # a pair of bucket name and value, or a triple of bucket name,
            # value, and exemplar. buckets need to be in order.
            buckets = []
            cum_count = 0  # Prometheus buckets expect cumulative count.
            for ii, bound in enumerate(agg_data.bounds):
                cum_count += agg_data.counts_per_bucket[ii]
                buckets.append([str(bound), cum_count])
            # Prometheus requires buckets to be sorted, and +Inf present.
            # In OpenCensus we don't have +Inf in the bucket bounds so need
            # to append it here.
            buckets.append(["+Inf", agg_data.count_data])
            metric = self._get_or_create_family(
                metrics_map,
                HistogramMetricFamily,
                metric_name,
                documentation=metric_description,
                labels=label_keys,
            )
            metric.add_metric(
                labels=tag_values,
                buckets=buckets,
                sum_value=agg_data.sum,
            )
        elif isinstance(agg_data, aggregation_data_module.SumAggregationData):
            metric = self._get_or_create_family(
                metrics_map,
                UnknownMetricFamily,
                metric_name,
                documentation=metric_description,
                labels=label_keys,
            )
            metric.add_metric(labels=tag_values, value=agg_data.sum_data)
        elif isinstance(agg_data, aggregation_data_module.LastValueAggregationData):
            metric = self._get_or_create_family(
                metrics_map,
                GaugeMetricFamily,
                metric_name,
                documentation=metric_description,
                labels=label_keys,
            )
            metric.add_metric(labels=tag_values, value=agg_data.value)
        else:
            raise ValueError(f"unsupported aggregation type {type(agg_data)}")

    def collect(self):  # pragma: NO COVER
        """Collect fetches the statistics from OpenCensus
        and delivers them as Prometheus Metrics.

        Collect is invoked every time a prometheus.Gatherer is run,
        for example when the HTTP endpoint is invoked by Prometheus.
        View data stored under a name that is not registered is skipped.
        """
        metrics_map = {}
        # Make a shallow copy of self._view_name_to_data_map, to avoid seeing
        # concurrent modifications when iterating through the dictionary.
        for v_name, view_data in self._view_name_to_data_map.copy().items():
            if v_name not in self.registered_views:
                continue
            desc = self.registered_views[v_name]
            data_by_tags = view_data.tag_value_aggregation_data_map
            for tag_values, agg_data in data_by_tags.items():
                self.to_metric(desc, tag_values, agg_data, metrics_map)

        yield from metrics_map.values()
class PrometheusStatsExporter(base_exporter.StatsExporter):
    """Exporter exports stats to Prometheus, users need
    to register the exporter as an HTTP Handler to be
    able to export.

    Constructing an exporter starts the HTTP endpoint (see
    :meth:`serve_http`) and registers the collector on ``REGISTRY``.

    :type options:
        :class:`~opencensus.ext.prometheus.stats_exporter.Options`
    :param options: An options object with the parameters to instantiate the
        prometheus exporter.

    :type gatherer: :class:`~prometheus_client.core.CollectorRegistry`
    :param gatherer: A Prometheus collector registry instance.

    :type transport:
        :class:`opencensus.common.transports.sync.SyncTransport` or
        :class:`opencensus.common.transports.async_.AsyncTransport`
    :param transport: A transport class; it is called with this exporter to
        build the transport used to send data.

    :type collector:
        :class:`~opencensus.ext.prometheus.stats_exporter.Collector`
    :param collector: An instance of the Prometheus Collector object.
        Defaults to a new ``Collector()`` for each exporter.
    """

    def __init__(
        self, options, gatherer, transport=sync.SyncTransport, collector=None
    ):
        # A Collector holds mutable per-exporter state, so the default is
        # created here; a default built in the signature would be evaluated
        # once and shared by every exporter that did not pass a collector.
        if collector is None:
            collector = Collector()
        self._options = options
        self._gatherer = gatherer
        self._collector = collector
        self._transport = transport(self)
        self._port = self.serve_http()
        # NOTE(review): the collector is registered on the module-level
        # REGISTRY, not on ``gatherer`` -- confirm this is intended when a
        # custom registry is passed.
        REGISTRY.register(self._collector)

    @property
    def transport(self):
        """The transport used to send data to the server
        (default is sync).
        """
        return self._transport

    @property
    def collector(self):
        """Collector class instance to be used
        to communicate with Prometheus
        """
        return self._collector

    @property
    def gatherer(self):
        """Prometheus Collector Registry instance"""
        return self._gatherer

    @property
    def options(self):
        """Options to be used to configure the exporter"""
        return self._options

    @property
    def port(self):
        """The port the HTTP server is listening on."""
        return self._port

    def export(self, view_data):
        """export sends the data to the transport class
        in order to be sent to Prometheus in a sync or async way.

        ``None`` is ignored.
        """
        if view_data is not None:  # pragma: NO COVER
            self.transport.export(view_data)

    def on_register_view(self, view):
        """Return a ``NotImplementedError`` instance; ``view`` is unused."""
        # NOTE(review): the exception is returned, not raised, so calling
        # this never fails. Confirm whether raising was intended before
        # changing it, since callers may rely on it not raising.
        return NotImplementedError("Not supported by Prometheus")

    def emit(self, view_data):  # pragma: NO COVER
        """Emit hands every view data object to the collector.

        Each OpenCensus AggregationData will be converted to the
        corresponding Prometheus Metric: SumData will be converted
        to Untyped Metric, CountData will be a Counter Metric,
        DistributionData will be a Histogram Metric.

        :param view_data: An iterable of view data objects.
        """
        for v_data in view_data:
            # Normalize a missing aggregation map to an empty one before
            # the data is stored on the collector.
            if v_data.tag_value_aggregation_data_map is None:
                v_data.tag_value_aggregation_data_map = {}
            self.collector.add_view_data(v_data)

    def serve_http(self):
        """serve_http serves the Prometheus endpoint.

        A WSGI server is bound to ``options.address`` / ``options.port`` and
        run on a daemon thread.

        :rtype: int
        :return: The port the server is actually bound to.
        """
        address = self.options.address or ""
        httpd = make_server(address, self.options.port, make_wsgi_app())
        server_thread = threading.Thread(target=httpd.serve_forever)
        # Daemon thread: it must not keep the process alive at exit.
        server_thread.daemon = True
        server_thread.start()
        # Return the actual port (in case port=0 was specified)
        return httpd.server_address[1]
def new_stats_exporter(option):
    """Build and return an exporter that exports stats to Prometheus.

    :param option: Exporter options; its ``namespace`` must not be the
        empty string, and its ``registry`` is passed on as the gatherer.

    :raises ValueError: If ``option.namespace`` is the empty string.
    """
    if option.namespace == "":
        raise ValueError("Namespace can not be empty string.")

    # Build the collector first, then hand everything to the exporter.
    prometheus_collector = new_collector(option)
    return PrometheusStatsExporter(
        options=option,
        gatherer=option.registry,
        collector=prometheus_collector,
    )
def new_collector(options):
    """Factory for :class:`Collector` instances.

    Use this instead of calling the ``Collector`` constructor directly.

    :param options: Exporter options passed to the new collector.
    :return: A new ``Collector`` configured with ``options``.
    """
    collector = Collector(options=options)
    return collector
def get_view_name(namespace, view):
    """Return the sanitized metric name for ``view``.

    A non-empty ``namespace`` is prepended to ``view.name``, separated by
    an underscore, before the result is sanitized.
    """
    prefix = namespace + "_" if namespace != "" else ""
    return sanitize(prefix + view.name)
# Matches every character outside the ASCII set [A-Za-z0-9_]. The class is
# spelled out explicitly because ``\w`` on str patterns is Unicode-aware and
# would let non-ASCII letters and digits through unchanged.
_NON_LETTERS_NOR_DIGITS_RE = re.compile(r"[^A-Za-z0-9_]")


def sanitize(key):
    """Sanitize the given metric name or label according to Prometheus rule.

    Replace all characters other than [A-Za-z0-9_] with '_'.

    :type key: str
    :param key: The metric name or label to sanitize.

    :rtype: str
    :return: ``key`` with every disallowed character replaced by '_'.
    """
    return _NON_LETTERS_NOR_DIGITS_RE.sub("_", key)
|