prometheus_exporter.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. # NOTE: This file has been copied from OpenCensus Python exporter.
  2. # It is because OpenCensus Prometheus exporter hasn't released for a while
  3. # and the latest version has a compatibility issue with the latest OpenCensus
  4. # library.
  5. import logging
  6. import re
  7. import threading
  8. from wsgiref.simple_server import make_server
  9. from opencensus.common.transports import sync
  10. from opencensus.stats import aggregation_data as aggregation_data_module, base_exporter
  11. from prometheus_client import make_wsgi_app
  12. from prometheus_client.core import (
  13. REGISTRY,
  14. CounterMetricFamily,
  15. GaugeMetricFamily,
  16. HistogramMetricFamily,
  17. UnknownMetricFamily,
  18. )
  19. logger = logging.getLogger(__name__)
  20. class Options(object):
  21. """Options contains options for configuring the exporter.
  22. The address can be empty as the prometheus client will
  23. assume it's localhost
  24. :type namespace: str
  25. :param namespace: The prometheus namespace to be used. Defaults to ''.
  26. :type port: int
  27. :param port: The Prometheus port to be used. Defaults to 8000.
  28. :type address: str
  29. :param address: The Prometheus address to be used. Defaults to ''.
  30. :type registry: registry
  31. :param registry: The Prometheus address to be used. Defaults to ''.
  32. :type registry: :class:`~prometheus_client.core.CollectorRegistry`
  33. :param registry: A Prometheus collector registry instance.
  34. """
  35. def __init__(self, namespace="", port=8000, address="", registry=REGISTRY):
  36. self._namespace = namespace
  37. self._registry = registry
  38. self._port = int(port)
  39. self._address = address
  40. @property
  41. def registry(self):
  42. """Prometheus Collector Registry instance"""
  43. return self._registry
  44. @property
  45. def namespace(self):
  46. """Prefix to be used with view name"""
  47. return self._namespace
  48. @property
  49. def port(self):
  50. """Port number to listen"""
  51. return self._port
  52. @property
  53. def address(self):
  54. """Endpoint address (default is localhost)"""
  55. return self._address
  56. class Collector(object):
  57. """Collector represents the Prometheus Collector object"""
  58. def __init__(self, options=Options(), view_name_to_data_map=None):
  59. if view_name_to_data_map is None:
  60. view_name_to_data_map = {}
  61. self._options = options
  62. self._registry = options.registry
  63. self._view_name_to_data_map = view_name_to_data_map
  64. self._registered_views = {}
  65. @property
  66. def options(self):
  67. """Options to be used to configure the exporter"""
  68. return self._options
  69. @property
  70. def registry(self):
  71. """Prometheus Collector Registry instance"""
  72. return self._registry
  73. @property
  74. def view_name_to_data_map(self):
  75. """Map with all view data objects
  76. that will be sent to Prometheus
  77. """
  78. return self._view_name_to_data_map
  79. @property
  80. def registered_views(self):
  81. """Map with all registered views"""
  82. return self._registered_views
  83. def register_view(self, view):
  84. """register_view will create the needed structure
  85. in order to be able to sent all data to Prometheus
  86. """
  87. v_name = get_view_name(self.options.namespace, view)
  88. if v_name not in self.registered_views:
  89. desc = {
  90. "name": v_name,
  91. "documentation": view.description,
  92. "labels": list(map(sanitize, view.columns)),
  93. "units": view.measure.unit,
  94. }
  95. self.registered_views[v_name] = desc
  96. def add_view_data(self, view_data):
  97. """Add view data object to be sent to server"""
  98. self.register_view(view_data.view)
  99. v_name = get_view_name(self.options.namespace, view_data.view)
  100. self.view_name_to_data_map[v_name] = view_data
  101. # TODO: add start and end timestamp
  102. def to_metric(self, desc, tag_values, agg_data, metrics_map):
  103. """to_metric translate the data that OpenCensus create
  104. to Prometheus format, using Prometheus Metric object
  105. :type desc: dict
  106. :param desc: The map that describes view definition
  107. :type tag_values: tuple of :class:
  108. `~opencensus.tags.tag_value.TagValue`
  109. :param object of opencensus.tags.tag_value.TagValue:
  110. TagValue object used as label values
  111. :type agg_data: object of :class:
  112. `~opencensus.stats.aggregation_data.AggregationData`
  113. :param object of opencensus.stats.aggregation_data.AggregationData:
  114. Aggregated data that needs to be converted as Prometheus samples
  115. :rtype: :class:`~prometheus_client.core.CounterMetricFamily` or
  116. :class:`~prometheus_client.core.HistogramMetricFamily` or
  117. :class:`~prometheus_client.core.UnknownMetricFamily` or
  118. :class:`~prometheus_client.core.GaugeMetricFamily`
  119. """
  120. metric_name = desc["name"]
  121. metric_description = desc["documentation"]
  122. label_keys = desc["labels"]
  123. metric_units = desc["units"]
  124. assert len(tag_values) == len(label_keys), (tag_values, label_keys)
  125. # Prometheus requires that all tag values be strings hence
  126. # the need to cast none to the empty string before exporting. See
  127. # https://github.com/census-instrumentation/opencensus-python/issues/480
  128. tag_values = [tv if tv else "" for tv in tag_values]
  129. if isinstance(agg_data, aggregation_data_module.CountAggregationData):
  130. metric = metrics_map.get(metric_name)
  131. if not metric:
  132. metric = CounterMetricFamily(
  133. name=metric_name,
  134. documentation=metric_description,
  135. unit=metric_units,
  136. labels=label_keys,
  137. )
  138. metrics_map[metric_name] = metric
  139. metric.add_metric(labels=tag_values, value=agg_data.count_data)
  140. return
  141. elif isinstance(agg_data, aggregation_data_module.DistributionAggregationData):
  142. assert agg_data.bounds == sorted(agg_data.bounds)
  143. # buckets are a list of buckets. Each bucket is another list with
  144. # a pair of bucket name and value, or a triple of bucket name,
  145. # value, and exemplar. buckets need to be in order.
  146. buckets = []
  147. cum_count = 0 # Prometheus buckets expect cumulative count.
  148. for ii, bound in enumerate(agg_data.bounds):
  149. cum_count += agg_data.counts_per_bucket[ii]
  150. bucket = [str(bound), cum_count]
  151. buckets.append(bucket)
  152. # Prometheus requires buckets to be sorted, and +Inf present.
  153. # In OpenCensus we don't have +Inf in the bucket bonds so need to
  154. # append it here.
  155. buckets.append(["+Inf", agg_data.count_data])
  156. metric = metrics_map.get(metric_name)
  157. if not metric:
  158. metric = HistogramMetricFamily(
  159. name=metric_name,
  160. documentation=metric_description,
  161. labels=label_keys,
  162. )
  163. metrics_map[metric_name] = metric
  164. metric.add_metric(
  165. labels=tag_values,
  166. buckets=buckets,
  167. sum_value=agg_data.sum,
  168. )
  169. return
  170. elif isinstance(agg_data, aggregation_data_module.SumAggregationData):
  171. metric = metrics_map.get(metric_name)
  172. if not metric:
  173. metric = UnknownMetricFamily(
  174. name=metric_name,
  175. documentation=metric_description,
  176. labels=label_keys,
  177. )
  178. metrics_map[metric_name] = metric
  179. metric.add_metric(labels=tag_values, value=agg_data.sum_data)
  180. return
  181. elif isinstance(agg_data, aggregation_data_module.LastValueAggregationData):
  182. metric = metrics_map.get(metric_name)
  183. if not metric:
  184. metric = GaugeMetricFamily(
  185. name=metric_name,
  186. documentation=metric_description,
  187. labels=label_keys,
  188. )
  189. metrics_map[metric_name] = metric
  190. metric.add_metric(labels=tag_values, value=agg_data.value)
  191. return
  192. else:
  193. raise ValueError(f"unsupported aggregation type {type(agg_data)}")
  194. def collect(self): # pragma: NO COVER
  195. """Collect fetches the statistics from OpenCensus
  196. and delivers them as Prometheus Metrics.
  197. Collect is invoked every time a prometheus.Gatherer is run
  198. for example when the HTTP endpoint is invoked by Prometheus.
  199. """
  200. # Make a shallow copy of self._view_name_to_data_map, to avoid seeing
  201. # concurrent modifications when iterating through the dictionary.
  202. metrics_map = {}
  203. for v_name, view_data in self._view_name_to_data_map.copy().items():
  204. if v_name not in self.registered_views:
  205. continue
  206. desc = self.registered_views[v_name]
  207. for tag_values in view_data.tag_value_aggregation_data_map:
  208. agg_data = view_data.tag_value_aggregation_data_map[tag_values]
  209. self.to_metric(desc, tag_values, agg_data, metrics_map)
  210. for metric in metrics_map.values():
  211. yield metric
  212. class PrometheusStatsExporter(base_exporter.StatsExporter):
  213. """Exporter exports stats to Prometheus, users need
  214. to register the exporter as an HTTP Handler to be
  215. able to export.
  216. :type options:
  217. :class:`~opencensus.ext.prometheus.stats_exporter.Options`
  218. :param options: An options object with the parameters to instantiate the
  219. prometheus exporter.
  220. :type gatherer: :class:`~prometheus_client.core.CollectorRegistry`
  221. :param gatherer: A Prometheus collector registry instance.
  222. :type transport:
  223. :class:`opencensus.common.transports.sync.SyncTransport` or
  224. :class:`opencensus.common.transports.async_.AsyncTransport`
  225. :param transport: An instance of a Transpor to send data with.
  226. :type collector:
  227. :class:`~opencensus.ext.prometheus.stats_exporter.Collector`
  228. :param collector: An instance of the Prometheus Collector object.
  229. """
  230. def __init__(
  231. self, options, gatherer, transport=sync.SyncTransport, collector=Collector()
  232. ):
  233. self._options = options
  234. self._gatherer = gatherer
  235. self._collector = collector
  236. self._transport = transport(self)
  237. self._port = self.serve_http()
  238. REGISTRY.register(self._collector)
  239. @property
  240. def transport(self):
  241. """The transport way to be sent data to server
  242. (default is sync).
  243. """
  244. return self._transport
  245. @property
  246. def collector(self):
  247. """Collector class instance to be used
  248. to communicate with Prometheus
  249. """
  250. return self._collector
  251. @property
  252. def gatherer(self):
  253. """Prometheus Collector Registry instance"""
  254. return self._gatherer
  255. @property
  256. def options(self):
  257. """Options to be used to configure the exporter"""
  258. return self._options
  259. @property
  260. def port(self):
  261. """The port the HTTP server is listening on."""
  262. return self._port
  263. def export(self, view_data):
  264. """export send the data to the transport class
  265. in order to be sent to Prometheus in a sync or async way.
  266. """
  267. if view_data is not None: # pragma: NO COVER
  268. self.transport.export(view_data)
  269. def on_register_view(self, view):
  270. return NotImplementedError("Not supported by Prometheus")
  271. def emit(self, view_data): # pragma: NO COVER
  272. """Emit exports to the Prometheus if view data has one or more rows.
  273. Each OpenCensus AggregationData will be converted to
  274. corresponding Prometheus Metric: SumData will be converted
  275. to Untyped Metric, CountData will be a Counter Metric
  276. DistributionData will be a Histogram Metric.
  277. """
  278. for v_data in view_data:
  279. if v_data.tag_value_aggregation_data_map is None:
  280. v_data.tag_value_aggregation_data_map = {}
  281. self.collector.add_view_data(v_data)
  282. def serve_http(self):
  283. """serve_http serves the Prometheus endpoint."""
  284. address = self.options.address or ""
  285. httpd = make_server(address, self.options.port, make_wsgi_app())
  286. t = threading.Thread(target=httpd.serve_forever)
  287. t.daemon = True
  288. t.start()
  289. # Return the actual port (in case port=0 was specified)
  290. return httpd.server_address[1]
  291. def new_stats_exporter(option):
  292. """new_stats_exporter returns an exporter
  293. that exports stats to Prometheus.
  294. """
  295. if option.namespace == "":
  296. raise ValueError("Namespace can not be empty string.")
  297. collector = new_collector(option)
  298. exporter = PrometheusStatsExporter(
  299. options=option, gatherer=option.registry, collector=collector
  300. )
  301. return exporter
  302. def new_collector(options):
  303. """new_collector should be used
  304. to create instance of Collector class in order to
  305. prevent the usage of constructor directly
  306. """
  307. return Collector(options=options)
  308. def get_view_name(namespace, view):
  309. """create the name for the view"""
  310. name = ""
  311. if namespace != "":
  312. name = namespace + "_"
  313. return sanitize(name + view.name)
  314. _NON_LETTERS_NOR_DIGITS_RE = re.compile(r"[^\w]", re.UNICODE | re.IGNORECASE)
  315. def sanitize(key):
  316. """sanitize the given metric name or label according to Prometheus rule.
  317. Replace all characters other than [A-Za-z0-9_] with '_'.
  318. """
  319. return _NON_LETTERS_NOR_DIGITS_RE.sub("_", key)