metrics.py 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. import time
  2. from typing import Any, Optional, TYPE_CHECKING
  3. import sentry_sdk
  4. from sentry_sdk.utils import format_attribute
  5. if TYPE_CHECKING:
  6. from sentry_sdk._types import Attributes, Metric, MetricType
  7. def _capture_metric(
  8. name: str,
  9. metric_type: "MetricType",
  10. value: float,
  11. unit: "Optional[str]" = None,
  12. attributes: "Optional[Attributes]" = None,
  13. ) -> None:
  14. attrs: "Attributes" = {}
  15. if attributes:
  16. for k, v in attributes.items():
  17. attrs[k] = format_attribute(v)
  18. metric: "Metric" = {
  19. "timestamp": time.time(),
  20. "trace_id": None,
  21. "span_id": None,
  22. "name": name,
  23. "type": metric_type,
  24. "value": float(value),
  25. "unit": unit,
  26. "attributes": attrs,
  27. }
  28. sentry_sdk.get_current_scope()._capture_metric(metric)
  29. def count(
  30. name: str,
  31. value: float,
  32. unit: "Optional[str]" = None,
  33. attributes: "Optional[dict[str, Any]]" = None,
  34. ) -> None:
  35. _capture_metric(name, "counter", value, unit, attributes)
  36. def gauge(
  37. name: str,
  38. value: float,
  39. unit: "Optional[str]" = None,
  40. attributes: "Optional[dict[str, Any]]" = None,
  41. ) -> None:
  42. _capture_metric(name, "gauge", value, unit, attributes)
  43. def distribution(
  44. name: str,
  45. value: float,
  46. unit: "Optional[str]" = None,
  47. attributes: "Optional[dict[str, Any]]" = None,
  48. ) -> None:
  49. _capture_metric(name, "distribution", value, unit, attributes)