asgi.py 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940
  1. from typing import Callable
  2. from urllib.parse import parse_qs
  3. from .exposition import _bake_output
  4. from .registry import Collector, REGISTRY
  5. def make_asgi_app(registry: Collector = REGISTRY, disable_compression: bool = False) -> Callable:
  6. """Create a ASGI app which serves the metrics from a registry."""
  7. async def prometheus_app(scope, receive, send):
  8. assert scope.get("type") == "http"
  9. # Prepare parameters
  10. params = parse_qs(scope.get('query_string', b'').decode("utf8"))
  11. accept_header = ",".join([
  12. value.decode("utf8") for (name, value) in scope.get('headers')
  13. if name.decode("utf8").lower() == 'accept'
  14. ])
  15. accept_encoding_header = ",".join([
  16. value.decode("utf8") for (name, value) in scope.get('headers')
  17. if name.decode("utf8").lower() == 'accept-encoding'
  18. ])
  19. # Bake output
  20. status, headers, output = _bake_output(registry, accept_header, accept_encoding_header, params, disable_compression)
  21. formatted_headers = []
  22. for header in headers:
  23. formatted_headers.append(tuple(x.encode('utf8') for x in header))
  24. # Return output
  25. payload = await receive()
  26. if payload.get("type") == "http.request":
  27. await send(
  28. {
  29. "type": "http.response.start",
  30. "status": int(status.split(' ')[0]),
  31. "headers": formatted_headers,
  32. }
  33. )
  34. await send({"type": "http.response.body", "body": output})
  35. return prometheus_app