# NOTE: This file has been copied from OpenCensus Python exporter. # It is because OpenCensus Prometheus exporter hasn't released for a while # and the latest version has a compatibility issue with the latest OpenCensus # library. import logging import re import threading from wsgiref.simple_server import make_server from opencensus.common.transports import sync from opencensus.stats import aggregation_data as aggregation_data_module, base_exporter from prometheus_client import make_wsgi_app from prometheus_client.core import ( REGISTRY, CounterMetricFamily, GaugeMetricFamily, HistogramMetricFamily, UnknownMetricFamily, ) logger = logging.getLogger(__name__) class Options(object): """Options contains options for configuring the exporter. The address can be empty as the prometheus client will assume it's localhost :type namespace: str :param namespace: The prometheus namespace to be used. Defaults to ''. :type port: int :param port: The Prometheus port to be used. Defaults to 8000. :type address: str :param address: The Prometheus address to be used. Defaults to ''. :type registry: registry :param registry: The Prometheus address to be used. Defaults to ''. :type registry: :class:`~prometheus_client.core.CollectorRegistry` :param registry: A Prometheus collector registry instance. """ def __init__(self, namespace="", port=8000, address="", registry=REGISTRY): self._namespace = namespace self._registry = registry self._port = int(port) self._address = address @property def registry(self): """Prometheus Collector Registry instance""" return self._registry @property def namespace(self): """Prefix to be used with view name""" return self._namespace @property def port(self): """Port number to listen""" return self._port @property def address(self): """Endpoint address (default is localhost)""" return self._address class Collector(object): """Collector represents the Prometheus Collector object""" def __init__(self, options=Options(), view_name_to_data_map=None): if view_name_to_data_map is None: view_name_to_data_map = {} self._options = options self._registry = options.registry self._view_name_to_data_map = view_name_to_data_map self._registered_views = {} @property def options(self): """Options to be used to configure the exporter""" return self._options @property def registry(self): """Prometheus Collector Registry instance""" return self._registry @property def view_name_to_data_map(self): """Map with all view data objects that will be sent to Prometheus """ return self._view_name_to_data_map @property def registered_views(self): """Map with all registered views""" return self._registered_views def register_view(self, view): """register_view will create the needed structure in order to be able to sent all data to Prometheus """ v_name = get_view_name(self.options.namespace, view) if v_name not in self.registered_views: desc = { "name": v_name, "documentation": view.description, "labels": list(map(sanitize, view.columns)), "units": view.measure.unit, } self.registered_views[v_name] = desc def add_view_data(self, view_data): """Add view data object to be sent to server""" self.register_view(view_data.view) v_name = get_view_name(self.options.namespace, view_data.view) self.view_name_to_data_map[v_name] = view_data # TODO: add start and end timestamp def to_metric(self, desc, tag_values, agg_data, metrics_map): """to_metric translate the data that OpenCensus create to Prometheus format, using Prometheus Metric object :type desc: dict :param desc: The map that describes view definition :type tag_values: tuple of :class: `~opencensus.tags.tag_value.TagValue` :param object of opencensus.tags.tag_value.TagValue: TagValue object used as label values :type agg_data: object of :class: `~opencensus.stats.aggregation_data.AggregationData` :param object of opencensus.stats.aggregation_data.AggregationData: Aggregated data that needs to be converted as Prometheus samples :rtype: :class:`~prometheus_client.core.CounterMetricFamily` or :class:`~prometheus_client.core.HistogramMetricFamily` or :class:`~prometheus_client.core.UnknownMetricFamily` or :class:`~prometheus_client.core.GaugeMetricFamily` """ metric_name = desc["name"] metric_description = desc["documentation"] label_keys = desc["labels"] metric_units = desc["units"] assert len(tag_values) == len(label_keys), (tag_values, label_keys) # Prometheus requires that all tag values be strings hence # the need to cast none to the empty string before exporting. See # https://github.com/census-instrumentation/opencensus-python/issues/480 tag_values = [tv if tv else "" for tv in tag_values] if isinstance(agg_data, aggregation_data_module.CountAggregationData): metric = metrics_map.get(metric_name) if not metric: metric = CounterMetricFamily( name=metric_name, documentation=metric_description, unit=metric_units, labels=label_keys, ) metrics_map[metric_name] = metric metric.add_metric(labels=tag_values, value=agg_data.count_data) return elif isinstance(agg_data, aggregation_data_module.DistributionAggregationData): assert agg_data.bounds == sorted(agg_data.bounds) # buckets are a list of buckets. Each bucket is another list with # a pair of bucket name and value, or a triple of bucket name, # value, and exemplar. buckets need to be in order. buckets = [] cum_count = 0 # Prometheus buckets expect cumulative count. for ii, bound in enumerate(agg_data.bounds): cum_count += agg_data.counts_per_bucket[ii] bucket = [str(bound), cum_count] buckets.append(bucket) # Prometheus requires buckets to be sorted, and +Inf present. # In OpenCensus we don't have +Inf in the bucket bonds so need to # append it here. buckets.append(["+Inf", agg_data.count_data]) metric = metrics_map.get(metric_name) if not metric: metric = HistogramMetricFamily( name=metric_name, documentation=metric_description, labels=label_keys, ) metrics_map[metric_name] = metric metric.add_metric( labels=tag_values, buckets=buckets, sum_value=agg_data.sum, ) return elif isinstance(agg_data, aggregation_data_module.SumAggregationData): metric = metrics_map.get(metric_name) if not metric: metric = UnknownMetricFamily( name=metric_name, documentation=metric_description, labels=label_keys, ) metrics_map[metric_name] = metric metric.add_metric(labels=tag_values, value=agg_data.sum_data) return elif isinstance(agg_data, aggregation_data_module.LastValueAggregationData): metric = metrics_map.get(metric_name) if not metric: metric = GaugeMetricFamily( name=metric_name, documentation=metric_description, labels=label_keys, ) metrics_map[metric_name] = metric metric.add_metric(labels=tag_values, value=agg_data.value) return else: raise ValueError(f"unsupported aggregation type {type(agg_data)}") def collect(self): # pragma: NO COVER """Collect fetches the statistics from OpenCensus and delivers them as Prometheus Metrics. Collect is invoked every time a prometheus.Gatherer is run for example when the HTTP endpoint is invoked by Prometheus. """ # Make a shallow copy of self._view_name_to_data_map, to avoid seeing # concurrent modifications when iterating through the dictionary. metrics_map = {} for v_name, view_data in self._view_name_to_data_map.copy().items(): if v_name not in self.registered_views: continue desc = self.registered_views[v_name] for tag_values in view_data.tag_value_aggregation_data_map: agg_data = view_data.tag_value_aggregation_data_map[tag_values] self.to_metric(desc, tag_values, agg_data, metrics_map) for metric in metrics_map.values(): yield metric class PrometheusStatsExporter(base_exporter.StatsExporter): """Exporter exports stats to Prometheus, users need to register the exporter as an HTTP Handler to be able to export. :type options: :class:`~opencensus.ext.prometheus.stats_exporter.Options` :param options: An options object with the parameters to instantiate the prometheus exporter. :type gatherer: :class:`~prometheus_client.core.CollectorRegistry` :param gatherer: A Prometheus collector registry instance. :type transport: :class:`opencensus.common.transports.sync.SyncTransport` or :class:`opencensus.common.transports.async_.AsyncTransport` :param transport: An instance of a Transpor to send data with. :type collector: :class:`~opencensus.ext.prometheus.stats_exporter.Collector` :param collector: An instance of the Prometheus Collector object. """ def __init__( self, options, gatherer, transport=sync.SyncTransport, collector=Collector() ): self._options = options self._gatherer = gatherer self._collector = collector self._transport = transport(self) self._port = self.serve_http() REGISTRY.register(self._collector) @property def transport(self): """The transport way to be sent data to server (default is sync). """ return self._transport @property def collector(self): """Collector class instance to be used to communicate with Prometheus """ return self._collector @property def gatherer(self): """Prometheus Collector Registry instance""" return self._gatherer @property def options(self): """Options to be used to configure the exporter""" return self._options @property def port(self): """The port the HTTP server is listening on.""" return self._port def export(self, view_data): """export send the data to the transport class in order to be sent to Prometheus in a sync or async way. """ if view_data is not None: # pragma: NO COVER self.transport.export(view_data) def on_register_view(self, view): return NotImplementedError("Not supported by Prometheus") def emit(self, view_data): # pragma: NO COVER """Emit exports to the Prometheus if view data has one or more rows. Each OpenCensus AggregationData will be converted to corresponding Prometheus Metric: SumData will be converted to Untyped Metric, CountData will be a Counter Metric DistributionData will be a Histogram Metric. """ for v_data in view_data: if v_data.tag_value_aggregation_data_map is None: v_data.tag_value_aggregation_data_map = {} self.collector.add_view_data(v_data) def serve_http(self): """serve_http serves the Prometheus endpoint.""" address = self.options.address or "" httpd = make_server(address, self.options.port, make_wsgi_app()) t = threading.Thread(target=httpd.serve_forever) t.daemon = True t.start() # Return the actual port (in case port=0 was specified) return httpd.server_address[1] def new_stats_exporter(option): """new_stats_exporter returns an exporter that exports stats to Prometheus. """ if option.namespace == "": raise ValueError("Namespace can not be empty string.") collector = new_collector(option) exporter = PrometheusStatsExporter( options=option, gatherer=option.registry, collector=collector ) return exporter def new_collector(options): """new_collector should be used to create instance of Collector class in order to prevent the usage of constructor directly """ return Collector(options=options) def get_view_name(namespace, view): """create the name for the view""" name = "" if namespace != "": name = namespace + "_" return sanitize(name + view.name) _NON_LETTERS_NOR_DIGITS_RE = re.compile(r"[^\w]", re.UNICODE | re.IGNORECASE) def sanitize(key): """sanitize the given metric name or label according to Prometheus rule. Replace all characters other than [A-Za-z0-9_] with '_'. """ return _NON_LETTERS_NOR_DIGITS_RE.sub("_", key)