| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809 |
- import base64
- from contextlib import closing
- from functools import partial
- import gzip
- from http.server import BaseHTTPRequestHandler
- import os
- import socket
- from socketserver import ThreadingMixIn
- import ssl
- import sys
- import threading
- from typing import (
- Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union,
- )
- from urllib.error import HTTPError
- from urllib.parse import parse_qs, quote_plus, urlparse
- from urllib.request import (
- BaseHandler, build_opener, HTTPHandler, HTTPRedirectHandler, HTTPSHandler,
- Request,
- )
- from wsgiref.simple_server import make_server, WSGIRequestHandler, WSGIServer
- from .openmetrics import exposition as openmetrics
- from .registry import Collector, REGISTRY
- from .utils import floatToGoString, parse_version
- try:
- import snappy # type: ignore
- SNAPPY_AVAILABLE = True
- except ImportError:
- snappy = None # type: ignore
- SNAPPY_AVAILABLE = False
- __all__ = (
- 'CONTENT_TYPE_LATEST',
- 'CONTENT_TYPE_PLAIN_0_0_4',
- 'CONTENT_TYPE_PLAIN_1_0_0',
- 'delete_from_gateway',
- 'generate_latest',
- 'instance_ip_grouping_key',
- 'make_asgi_app',
- 'make_wsgi_app',
- 'MetricsHandler',
- 'push_to_gateway',
- 'pushadd_to_gateway',
- 'start_http_server',
- 'start_wsgi_server',
- 'write_to_textfile',
- )
- CONTENT_TYPE_PLAIN_0_0_4 = 'text/plain; version=0.0.4; charset=utf-8'
- """Content type of the compatibility format"""
- CONTENT_TYPE_PLAIN_1_0_0 = 'text/plain; version=1.0.0; charset=utf-8'
- """Content type of the latest format"""
- CONTENT_TYPE_LATEST = CONTENT_TYPE_PLAIN_1_0_0
- CompressionType = Optional[Literal['gzip', 'snappy']]
- class _PrometheusRedirectHandler(HTTPRedirectHandler):
- """
- Allow additional methods (e.g. PUT) and data forwarding in redirects.
- Use of this class constitute a user's explicit agreement to the
- redirect responses the Prometheus client will receive when using it.
- You should only use this class if you control or otherwise trust the
- redirect behavior involved and are certain it is safe to full transfer
- the original request (method and data) to the redirected URL. For
- example, if you know there is a cosmetic URL redirect in front of a
- local deployment of a Prometheus server, and all redirects are safe,
- this is the class to use to handle redirects in that case.
- The standard HTTPRedirectHandler does not forward request data nor
- does it allow redirected PUT requests (which Prometheus uses for some
- operations, for example `push_to_gateway`) because these cannot
- generically guarantee no violations of HTTP RFC 2616 requirements for
- the user to explicitly confirm redirects that could have unexpected
- side effects (such as rendering a PUT request non-idempotent or
- creating multiple resources not named in the original request).
- """
- def redirect_request(self, req, fp, code, msg, headers, newurl):
- """
- Apply redirect logic to a request.
- See parent HTTPRedirectHandler.redirect_request for parameter info.
- If the redirect is disallowed, this raises the corresponding HTTP error.
- If the redirect can't be determined, return None to allow other handlers
- to try. If the redirect is allowed, return the new request.
- This method specialized for the case when (a) the user knows that the
- redirect will not cause unacceptable side effects for any request method,
- and (b) the user knows that any request data should be passed through to
- the redirect. If either condition is not met, this should not be used.
- """
- # note that requests being provided by a handler will use get_method to
- # indicate the method, by monkeypatching this, instead of setting the
- # Request object's method attribute.
- m = getattr(req, "method", req.get_method())
- if not (code in (301, 302, 303, 307) and m in ("GET", "HEAD")
- or code in (301, 302, 303) and m in ("POST", "PUT")):
- raise HTTPError(req.full_url, code, msg, headers, fp)
- new_request = Request(
- newurl.replace(' ', '%20'), # space escaping in new url if needed.
- headers=req.headers,
- origin_req_host=req.origin_req_host,
- unverifiable=True,
- data=req.data,
- )
- new_request.method = m
- return new_request
- def _bake_output(registry, accept_header, accept_encoding_header, params, disable_compression):
- """Bake output for metrics output."""
- # Choose the correct plain text format of the output.
- encoder, content_type = choose_encoder(accept_header)
- if 'name[]' in params:
- registry = registry.restricted_registry(params['name[]'])
- output = encoder(registry)
- headers = [('Content-Type', content_type)]
- # If gzip encoding required, gzip the output.
- if not disable_compression and gzip_accepted(accept_encoding_header):
- output = gzip.compress(output)
- headers.append(('Content-Encoding', 'gzip'))
- return '200 OK', headers, output
- def make_wsgi_app(registry: Collector = REGISTRY, disable_compression: bool = False) -> Callable:
- """Create a WSGI app which serves the metrics from a registry."""
- def prometheus_app(environ, start_response):
- # Prepare parameters
- accept_header = environ.get('HTTP_ACCEPT')
- accept_encoding_header = environ.get('HTTP_ACCEPT_ENCODING')
- params = parse_qs(environ.get('QUERY_STRING', ''))
- method = environ['REQUEST_METHOD']
- if method == 'OPTIONS':
- status = '200 OK'
- headers = [('Allow', 'OPTIONS,GET')]
- output = b''
- elif method != 'GET':
- status = '405 Method Not Allowed'
- headers = [('Allow', 'OPTIONS,GET')]
- output = '# HTTP {}: {}; use OPTIONS or GET\n'.format(status, method).encode()
- elif environ['PATH_INFO'] == '/favicon.ico':
- # Serve empty response for browsers
- status = '200 OK'
- headers = []
- output = b''
- else:
- # Note: For backwards compatibility, the URI path for GET is not
- # constrained to the documented /metrics, but any path is allowed.
- # Bake output
- status, headers, output = _bake_output(registry, accept_header, accept_encoding_header, params, disable_compression)
- # Return output
- start_response(status, headers)
- return [output]
- return prometheus_app
- class _SilentHandler(WSGIRequestHandler):
- """WSGI handler that does not log requests."""
- def log_message(self, format, *args):
- """Log nothing."""
- class ThreadingWSGIServer(ThreadingMixIn, WSGIServer):
- """Thread per request HTTP server."""
- # Make worker threads "fire and forget". Beginning with Python 3.7 this
- # prevents a memory leak because ``ThreadingMixIn`` starts to gather all
- # non-daemon threads in a list in order to join on them at server close.
- daemon_threads = True
- def _get_best_family(address, port):
- """Automatically select address family depending on address"""
- # HTTPServer defaults to AF_INET, which will not start properly if
- # binding an ipv6 address is requested.
- # This function is based on what upstream python did for http.server
- # in https://github.com/python/cpython/pull/11767
- infos = socket.getaddrinfo(address, port, type=socket.SOCK_STREAM, flags=socket.AI_PASSIVE)
- family, _, _, _, sockaddr = next(iter(infos))
- return family, sockaddr[0]
- def _get_ssl_ctx(
- certfile: str,
- keyfile: str,
- protocol: int,
- cafile: Optional[str] = None,
- capath: Optional[str] = None,
- client_auth_required: bool = False,
- ) -> ssl.SSLContext:
- """Load context supports SSL."""
- ssl_cxt = ssl.SSLContext(protocol=protocol)
- if cafile is not None or capath is not None:
- try:
- ssl_cxt.load_verify_locations(cafile, capath)
- except IOError as exc:
- exc_type = type(exc)
- msg = str(exc)
- raise exc_type(f"Cannot load CA certificate chain from file "
- f"{cafile!r} or directory {capath!r}: {msg}")
- else:
- try:
- ssl_cxt.load_default_certs(purpose=ssl.Purpose.CLIENT_AUTH)
- except IOError as exc:
- exc_type = type(exc)
- msg = str(exc)
- raise exc_type(f"Cannot load default CA certificate chain: {msg}")
- if client_auth_required:
- ssl_cxt.verify_mode = ssl.CERT_REQUIRED
- try:
- ssl_cxt.load_cert_chain(certfile=certfile, keyfile=keyfile)
- except IOError as exc:
- exc_type = type(exc)
- msg = str(exc)
- raise exc_type(f"Cannot load server certificate file {certfile!r} or "
- f"its private key file {keyfile!r}: {msg}")
- return ssl_cxt
- def start_wsgi_server(
- port: int,
- addr: str = '0.0.0.0',
- registry: Collector = REGISTRY,
- certfile: Optional[str] = None,
- keyfile: Optional[str] = None,
- client_cafile: Optional[str] = None,
- client_capath: Optional[str] = None,
- protocol: int = ssl.PROTOCOL_TLS_SERVER,
- client_auth_required: bool = False,
- ) -> Tuple[WSGIServer, threading.Thread]:
- """Starts a WSGI server for prometheus metrics as a daemon thread."""
- class TmpServer(ThreadingWSGIServer):
- """Copy of ThreadingWSGIServer to update address_family locally"""
- TmpServer.address_family, addr = _get_best_family(addr, port)
- app = make_wsgi_app(registry)
- httpd = make_server(addr, port, app, TmpServer, handler_class=_SilentHandler)
- if certfile and keyfile:
- context = _get_ssl_ctx(certfile, keyfile, protocol, client_cafile, client_capath, client_auth_required)
- httpd.socket = context.wrap_socket(httpd.socket, server_side=True)
- t = threading.Thread(target=httpd.serve_forever)
- t.daemon = True
- t.start()
- return httpd, t
- start_http_server = start_wsgi_server
- def generate_latest(registry: Collector = REGISTRY, escaping: str = openmetrics.UNDERSCORES) -> bytes:
- """
- Generates the exposition format using the basic Prometheus text format.
- Params:
- registry: Collector to export data from.
- escaping: Escaping scheme used for metric and label names.
- Returns: UTF-8 encoded string containing the metrics in text format.
- """
- def sample_line(samples):
- if samples.labels:
- labelstr = '{0}'.format(','.join(
- # Label values always support UTF-8
- ['{}="{}"'.format(
- openmetrics.escape_label_name(k, escaping), openmetrics._escape(v, openmetrics.ALLOWUTF8, False))
- for k, v in sorted(samples.labels.items())]))
- else:
- labelstr = ''
- timestamp = ''
- if samples.timestamp is not None:
- # Convert to milliseconds.
- timestamp = f' {int(float(samples.timestamp) * 1000):d}'
- if escaping != openmetrics.ALLOWUTF8 or openmetrics._is_valid_legacy_metric_name(samples.name):
- if labelstr:
- labelstr = '{{{0}}}'.format(labelstr)
- return f'{openmetrics.escape_metric_name(samples.name, escaping)}{labelstr} {floatToGoString(samples.value)}{timestamp}\n'
- maybe_comma = ''
- if labelstr:
- maybe_comma = ','
- return f'{{{openmetrics.escape_metric_name(samples.name, escaping)}{maybe_comma}{labelstr}}} {floatToGoString(samples.value)}{timestamp}\n'
- output = []
- for metric in registry.collect():
- try:
- mname = metric.name
- mtype = metric.type
- # Munging from OpenMetrics into Prometheus format.
- if mtype == 'counter':
- mname = mname + '_total'
- elif mtype == 'info':
- mname = mname + '_info'
- mtype = 'gauge'
- elif mtype == 'stateset':
- mtype = 'gauge'
- elif mtype == 'gaugehistogram':
- # A gauge histogram is really a gauge,
- # but this captures the structure better.
- mtype = 'histogram'
- elif mtype == 'unknown':
- mtype = 'untyped'
- output.append('# HELP {} {}\n'.format(
- openmetrics.escape_metric_name(mname, escaping), metric.documentation.replace('\\', r'\\').replace('\n', r'\n')))
- output.append(f'# TYPE {openmetrics.escape_metric_name(mname, escaping)} {mtype}\n')
- om_samples: Dict[str, List[str]] = {}
- for s in metric.samples:
- for suffix in ['_created', '_gsum', '_gcount']:
- if s.name == metric.name + suffix:
- # OpenMetrics specific sample, put in a gauge at the end.
- om_samples.setdefault(suffix, []).append(sample_line(s))
- break
- else:
- output.append(sample_line(s))
- except Exception as exception:
- exception.args = (exception.args or ('',)) + (metric,)
- raise
- for suffix, lines in sorted(om_samples.items()):
- output.append('# HELP {} {}\n'.format(openmetrics.escape_metric_name(metric.name + suffix, escaping),
- metric.documentation.replace('\\', r'\\').replace('\n', r'\n')))
- output.append(f'# TYPE {openmetrics.escape_metric_name(metric.name + suffix, escaping)} gauge\n')
- output.extend(lines)
- return ''.join(output).encode('utf-8')
- def choose_encoder(accept_header: str) -> Tuple[Callable[[Collector], bytes], str]:
- # Python client library accepts a narrower range of content-types than
- # Prometheus does.
- accept_header = accept_header or ''
- escaping = openmetrics.UNDERSCORES
- for accepted in accept_header.split(','):
- if accepted.split(';')[0].strip() == 'application/openmetrics-text':
- toks = accepted.split(';')
- version = _get_version(toks)
- escaping = _get_escaping(toks)
- # Only return an escaping header if we have a good version and
- # mimetype.
- if not version:
- return (partial(openmetrics.generate_latest, escaping=openmetrics.UNDERSCORES, version="1.0.0"), openmetrics.CONTENT_TYPE_LATEST)
- if version and parse_version(version) >= (1, 0, 0):
- return (partial(openmetrics.generate_latest, escaping=escaping, version=version),
- f'application/openmetrics-text; version={version}; charset=utf-8; escaping=' + str(escaping))
- elif accepted.split(';')[0].strip() == 'text/plain':
- toks = accepted.split(';')
- version = _get_version(toks)
- escaping = _get_escaping(toks)
- # Only return an escaping header if we have a good version and
- # mimetype.
- if version and parse_version(version) >= (1, 0, 0):
- return (partial(generate_latest, escaping=escaping),
- CONTENT_TYPE_LATEST + '; escaping=' + str(escaping))
- return generate_latest, CONTENT_TYPE_PLAIN_0_0_4
- def _get_version(accept_header: List[str]) -> str:
- """Return the version tag from the Accept header.
- If no version is specified, returns empty string."""
- for tok in accept_header:
- if '=' not in tok:
- continue
- key, value = tok.strip().split('=', 1)
- if key == 'version':
- return value
- return ""
- def _get_escaping(accept_header: List[str]) -> str:
- """Return the escaping scheme from the Accept header.
- If no escaping scheme is specified or the scheme is not one of the allowed
- strings, defaults to UNDERSCORES."""
- for tok in accept_header:
- if '=' not in tok:
- continue
- key, value = tok.strip().split('=', 1)
- if key != 'escaping':
- continue
- if value == openmetrics.ALLOWUTF8:
- return openmetrics.ALLOWUTF8
- elif value == openmetrics.UNDERSCORES:
- return openmetrics.UNDERSCORES
- elif value == openmetrics.DOTS:
- return openmetrics.DOTS
- elif value == openmetrics.VALUES:
- return openmetrics.VALUES
- else:
- return openmetrics.UNDERSCORES
- return openmetrics.UNDERSCORES
- def gzip_accepted(accept_encoding_header: str) -> bool:
- accept_encoding_header = accept_encoding_header or ''
- for accepted in accept_encoding_header.split(','):
- if accepted.split(';')[0].strip().lower() == 'gzip':
- return True
- return False
- class MetricsHandler(BaseHTTPRequestHandler):
- """HTTP handler that gives metrics from ``REGISTRY``."""
- registry: Collector = REGISTRY
- def do_GET(self) -> None:
- # Prepare parameters
- registry = self.registry
- accept_header = self.headers.get('Accept')
- accept_encoding_header = self.headers.get('Accept-Encoding')
- params = parse_qs(urlparse(self.path).query)
- # Bake output
- status, headers, output = _bake_output(registry, accept_header, accept_encoding_header, params, False)
- # Return output
- self.send_response(int(status.split(' ')[0]))
- for header in headers:
- self.send_header(*header)
- self.end_headers()
- self.wfile.write(output)
- def log_message(self, format: str, *args: Any) -> None:
- """Log nothing."""
- @classmethod
- def factory(cls, registry: Collector) -> type:
- """Returns a dynamic MetricsHandler class tied
- to the passed registry.
- """
- # This implementation relies on MetricsHandler.registry
- # (defined above and defaulted to REGISTRY).
- # As we have unicode_literals, we need to create a str()
- # object for type().
- cls_name = str(cls.__name__)
- MyMetricsHandler = type(cls_name, (cls, object),
- {"registry": registry})
- return MyMetricsHandler
- def write_to_textfile(path: str, registry: Collector, escaping: str = openmetrics.ALLOWUTF8, tmpdir: Optional[str] = None) -> None:
- """Write metrics to the given path.
- This is intended for use with the Node exporter textfile collector.
- The path must end in .prom for the textfile collector to process it.
- An optional tmpdir parameter can be set to determine where the
- metrics will be temporarily written to. If not set, it will be in
- the same directory as the .prom file. If provided, the path MUST be
- on the same filesystem."""
- if tmpdir is not None:
- filename = os.path.basename(path)
- tmppath = f'{os.path.join(tmpdir, filename)}.{os.getpid()}.{threading.current_thread().ident}'
- else:
- tmppath = f'{path}.{os.getpid()}.{threading.current_thread().ident}'
- try:
- with open(tmppath, 'wb') as f:
- f.write(generate_latest(registry, escaping))
- # rename(2) is atomic but fails on Windows if the destination file exists
- if os.name == 'nt':
- os.replace(tmppath, path)
- else:
- os.rename(tmppath, path)
- except Exception:
- if os.path.exists(tmppath):
- os.remove(tmppath)
- raise
- def _make_handler(
- url: str,
- method: str,
- timeout: Optional[float],
- headers: Sequence[Tuple[str, str]],
- data: bytes,
- base_handler: Union[BaseHandler, type],
- ) -> Callable[[], None]:
- def handle() -> None:
- request = Request(url, data=data)
- request.get_method = lambda: method # type: ignore
- for k, v in headers:
- request.add_header(k, v)
- resp = build_opener(base_handler).open(request, timeout=timeout)
- if resp.code >= 400:
- raise OSError(f"error talking to pushgateway: {resp.code} {resp.msg}")
- return handle
- def default_handler(
- url: str,
- method: str,
- timeout: Optional[float],
- headers: List[Tuple[str, str]],
- data: bytes,
- ) -> Callable[[], None]:
- """Default handler that implements HTTP/HTTPS connections.
- Used by the push_to_gateway functions. Can be re-used by other handlers."""
- return _make_handler(url, method, timeout, headers, data, HTTPHandler)
- def passthrough_redirect_handler(
- url: str,
- method: str,
- timeout: Optional[float],
- headers: List[Tuple[str, str]],
- data: bytes,
- ) -> Callable[[], None]:
- """
- Handler that automatically trusts redirect responses for all HTTP methods.
- Augments standard HTTPRedirectHandler capability by permitting PUT requests,
- preserving the method upon redirect, and passing through all headers and
- data from the original request. Only use this handler if you control or
- trust the source of redirect responses you encounter when making requests
- via the Prometheus client. This handler will simply repeat the identical
- request, including same method and data, to the new redirect URL."""
- return _make_handler(url, method, timeout, headers, data, _PrometheusRedirectHandler)
- def basic_auth_handler(
- url: str,
- method: str,
- timeout: Optional[float],
- headers: List[Tuple[str, str]],
- data: bytes,
- username: Optional[str] = None,
- password: Optional[str] = None,
- ) -> Callable[[], None]:
- """Handler that implements HTTP/HTTPS connections with Basic Auth.
- Sets auth headers using supplied 'username' and 'password', if set.
- Used by the push_to_gateway functions. Can be re-used by other handlers."""
- def handle():
- """Handler that implements HTTP Basic Auth.
- """
- if username is not None and password is not None:
- auth_value = f'{username}:{password}'.encode()
- auth_token = base64.b64encode(auth_value)
- auth_header = b'Basic ' + auth_token
- headers.append(('Authorization', auth_header))
- default_handler(url, method, timeout, headers, data)()
- return handle
- def tls_auth_handler(
- url: str,
- method: str,
- timeout: Optional[float],
- headers: List[Tuple[str, str]],
- data: bytes,
- certfile: str,
- keyfile: str,
- cafile: Optional[str] = None,
- protocol: int = ssl.PROTOCOL_TLS_CLIENT,
- insecure_skip_verify: bool = False,
- ) -> Callable[[], None]:
- """Handler that implements an HTTPS connection with TLS Auth.
- The default protocol (ssl.PROTOCOL_TLS_CLIENT) will also enable
- ssl.CERT_REQUIRED and SSLContext.check_hostname by default. This can be
- disabled by setting insecure_skip_verify to True.
- Both this handler and the TLS feature on pushgateay are experimental."""
- context = ssl.SSLContext(protocol=protocol)
- if cafile is not None:
- context.load_verify_locations(cafile)
- else:
- context.load_default_certs()
- if insecure_skip_verify:
- context.check_hostname = False
- context.verify_mode = ssl.CERT_NONE
- context.load_cert_chain(certfile=certfile, keyfile=keyfile)
- handler = HTTPSHandler(context=context)
- return _make_handler(url, method, timeout, headers, data, handler)
- def push_to_gateway(
- gateway: str,
- job: str,
- registry: Collector,
- grouping_key: Optional[Dict[str, Any]] = None,
- timeout: Optional[float] = 30,
- handler: Callable = default_handler,
- compression: CompressionType = None,
- ) -> None:
- """Push metrics to the given pushgateway.
- `gateway` the url for your push gateway. Either of the form
- 'http://pushgateway.local', or 'pushgateway.local'.
- Scheme defaults to 'http' if none is provided
- `job` is the job label to be attached to all pushed metrics
- `registry` is a Collector, normally an instance of CollectorRegistry
- `grouping_key` please see the pushgateway documentation for details.
- Defaults to None
- `timeout` is how long push will attempt to connect before giving up.
- Defaults to 30s, can be set to None for no timeout.
- `handler` is an optional function which can be provided to perform
- requests to the 'gateway'.
- Defaults to None, in which case an http or https request
- will be carried out by a default handler.
- If not None, the argument must be a function which accepts
- the following arguments:
- url, method, timeout, headers, and content
- May be used to implement additional functionality not
- supported by the built-in default handler (such as SSL
- client certicates, and HTTP authentication mechanisms).
- 'url' is the URL for the request, the 'gateway' argument
- described earlier will form the basis of this URL.
- 'method' is the HTTP method which should be used when
- carrying out the request.
- 'timeout' requests not successfully completed after this
- many seconds should be aborted. If timeout is None, then
- the handler should not set a timeout.
- 'headers' is a list of ("header-name","header-value") tuples
- which must be passed to the pushgateway in the form of HTTP
- request headers.
- The function should raise an exception (e.g. IOError) on
- failure.
- 'content' is the data which should be used to form the HTTP
- Message Body.
- `compression` selects the payload compression. Supported values are 'gzip'
- and 'snappy'. Defaults to None (no compression).
- This overwrites all metrics with the same job and grouping_key.
- This uses the PUT HTTP method."""
- _use_gateway('PUT', gateway, job, registry, grouping_key, timeout, handler, compression)
- def pushadd_to_gateway(
- gateway: str,
- job: str,
- registry: Optional[Collector],
- grouping_key: Optional[Dict[str, Any]] = None,
- timeout: Optional[float] = 30,
- handler: Callable = default_handler,
- compression: CompressionType = None,
- ) -> None:
- """PushAdd metrics to the given pushgateway.
- `gateway` the url for your push gateway. Either of the form
- 'http://pushgateway.local', or 'pushgateway.local'.
- Scheme defaults to 'http' if none is provided
- `job` is the job label to be attached to all pushed metrics
- `registry` is a Collector, normally an instance of CollectorRegistry
- `grouping_key` please see the pushgateway documentation for details.
- Defaults to None
- `timeout` is how long push will attempt to connect before giving up.
- Defaults to 30s, can be set to None for no timeout.
- `handler` is an optional function which can be provided to perform
- requests to the 'gateway'.
- Defaults to None, in which case an http or https request
- will be carried out by a default handler.
- See the 'prometheus_client.push_to_gateway' documentation
- for implementation requirements.
- `compression` selects the payload compression. Supported values are 'gzip'
- and 'snappy'. Defaults to None (no compression).
- This replaces metrics with the same name, job and grouping_key.
- This uses the POST HTTP method."""
- _use_gateway('POST', gateway, job, registry, grouping_key, timeout, handler, compression)
- def delete_from_gateway(
- gateway: str,
- job: str,
- grouping_key: Optional[Dict[str, Any]] = None,
- timeout: Optional[float] = 30,
- handler: Callable = default_handler,
- ) -> None:
- """Delete metrics from the given pushgateway.
- `gateway` the url for your push gateway. Either of the form
- 'http://pushgateway.local', or 'pushgateway.local'.
- Scheme defaults to 'http' if none is provided
- `job` is the job label to be attached to all pushed metrics
- `grouping_key` please see the pushgateway documentation for details.
- Defaults to None
- `timeout` is how long delete will attempt to connect before giving up.
- Defaults to 30s, can be set to None for no timeout.
- `handler` is an optional function which can be provided to perform
- requests to the 'gateway'.
- Defaults to None, in which case an http or https request
- will be carried out by a default handler.
- See the 'prometheus_client.push_to_gateway' documentation
- for implementation requirements.
- This deletes metrics with the given job and grouping_key.
- This uses the DELETE HTTP method."""
- _use_gateway('DELETE', gateway, job, None, grouping_key, timeout, handler)
- def _use_gateway(
- method: str,
- gateway: str,
- job: str,
- registry: Optional[Collector],
- grouping_key: Optional[Dict[str, Any]],
- timeout: Optional[float],
- handler: Callable,
- compression: CompressionType = None,
- ) -> None:
- gateway_url = urlparse(gateway)
- # See https://bugs.python.org/issue27657 for details on urlparse in py>=3.7.6.
- if not gateway_url.scheme or gateway_url.scheme not in ['http', 'https']:
- gateway = f'http://{gateway}'
- gateway = gateway.rstrip('/')
- url = '{}/metrics/{}/{}'.format(gateway, *_escape_grouping_key("job", job))
- if grouping_key is None:
- grouping_key = {}
- url += ''.join(
- '/{}/{}'.format(*_escape_grouping_key(str(k), str(v)))
- for k, v in sorted(grouping_key.items()))
- data = b''
- headers: List[Tuple[str, str]] = []
- if method != 'DELETE':
- if registry is None:
- registry = REGISTRY
- data = generate_latest(registry)
- data, headers = _compress_payload(data, compression)
- else:
- # DELETE requests still need Content-Type header per test expectations
- headers = [('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)]
- if compression is not None:
- raise ValueError('Compression is not supported for DELETE requests.')
- handler(
- url=url, method=method, timeout=timeout,
- headers=headers, data=data,
- )()
- def _compress_payload(data: bytes, compression: CompressionType) -> Tuple[bytes, List[Tuple[str, str]]]:
- headers = [('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)]
- if compression is None:
- return data, headers
- encoding = compression.lower()
- if encoding == 'gzip':
- headers.append(('Content-Encoding', 'gzip'))
- return gzip.compress(data), headers
- if encoding == 'snappy':
- if not SNAPPY_AVAILABLE:
- raise RuntimeError('Snappy compression requires the python-snappy package to be installed.')
- headers.append(('Content-Encoding', 'snappy'))
- compressor = snappy.StreamCompressor()
- compressed = compressor.compress(data)
- flush = getattr(compressor, 'flush', None)
- if callable(flush):
- compressed += flush()
- return compressed, headers
- raise ValueError(f"Unsupported compression type: {compression}")
- def _escape_grouping_key(k, v):
- if v == "":
- # Per https://github.com/prometheus/pushgateway/pull/346.
- return k + "@base64", "="
- elif '/' in v:
- # Added in Pushgateway 0.9.0.
- return k + "@base64", base64.urlsafe_b64encode(v.encode("utf-8")).decode("utf-8")
- else:
- return k, quote_plus(v)
- def instance_ip_grouping_key() -> Dict[str, Any]:
- """Grouping key with instance set to the IP Address of this host."""
- with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as s:
- if sys.platform == 'darwin':
- # This check is done this way only on MacOS devices
- # it is done this way because the localhost method does
- # not work.
- # This method was adapted from this StackOverflow answer:
- # https://stackoverflow.com/a/28950776
- s.connect(('10.255.255.255', 1))
- else:
- s.connect(('localhost', 0))
- return {'instance': s.getsockname()[0]}
- from .asgi import make_asgi_app # noqa
|