metrics.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. import logging
  2. import re
  3. import warnings
  4. from typing import Any, Dict, List, Optional, Tuple, Union
  5. from ray._private.ray_constants import env_bool
  6. from ray._raylet import (
  7. Count as CythonCount,
  8. Gauge as CythonGauge,
  9. Histogram as CythonHistogram,
  10. Sum as CythonSum,
  11. ) # noqa: E402
  12. # CythonSum backs Counter only on the legacy (non-OpenTelemetry) path, because
  13. # Sum allows incrementing by positive values that are different from one.
  14. from ray.util.annotations import DeveloperAPI
  15. logger = logging.getLogger(__name__)
  16. # Copied from Prometheus Python Client. While the regex is not part of the public API
  17. # for Prometheus, it's not expected to change.
  18. # https://github.com/prometheus/client_python/blob/46eae7bae88f76951f7246d9f359f2dd5eeff110/prometheus_client/validation.py#L4
  19. _VALID_METRIC_NAME_RE = re.compile(r"^[a-zA-Z_:][a-zA-Z0-9_:]*$")
  20. def _is_invalid_metric_name(name: str) -> bool:
  21. if len(name) == 0:
  22. raise ValueError("Empty name is not allowed. Please provide a metric name.")
  23. if not _VALID_METRIC_NAME_RE.match(name):
  24. warnings.warn(
  25. f"Invalid metric name: {name}. Metric will be discarded "
  26. "and data will not be collected or published. "
  27. "Metric names can only contain letters, numbers, _, and :. "
  28. "Metric names cannot start with numbers.",
  29. UserWarning,
  30. )
  31. return True
  32. return False
  33. @DeveloperAPI
  34. class Metric:
  35. """The parent class of custom metrics.
  36. Ray's custom metrics APIs are rooted from this class and share
  37. the same public methods.
  38. """
  39. def __init__(
  40. self,
  41. name: str,
  42. description: str = "",
  43. tag_keys: Optional[Tuple[str, ...]] = None,
  44. ):
  45. # Metrics with invalid names will be discarded and will not be collected
  46. # by Prometheus.
  47. self._discard_metric = _is_invalid_metric_name(name)
  48. self._name = name
  49. self._description = description
  50. # The default tags key-value pair.
  51. self._default_tags = {}
  52. # Keys of tags.
  53. self._tag_keys = tag_keys or tuple()
  54. # The Cython metric class. This should be set in the child class.
  55. self._metric = None
  56. if not isinstance(self._tag_keys, tuple):
  57. raise TypeError(
  58. "tag_keys should be a tuple type, got: " f"{type(self._tag_keys)}"
  59. )
  60. for key in self._tag_keys:
  61. if not isinstance(key, str):
  62. raise TypeError(f"Tag keys must be str, got {type(key)}.")
  63. if ":" in self._name:
  64. warnings.warn(
  65. f"Metric name {self._name} contains a : character, which is no longer allowed. "
  66. f"Please migrate to the new metric name format. "
  67. f"This will be an error in the future.",
  68. FutureWarning,
  69. )
  70. def set_default_tags(self, default_tags: Dict[str, str]):
  71. """Set default tags of metrics.
  72. Example:
  73. >>> from ray.util.metrics import Counter
  74. >>> # Note that set_default_tags returns the instance itself.
  75. >>> counter = Counter("name", tag_keys=("a",))
  76. >>> counter2 = counter.set_default_tags({"a": "b"})
  77. >>> assert counter is counter2
  78. >>> # this means you can instantiate it in this way.
  79. >>> counter = Counter("name", tag_keys=("a",)).set_default_tags({"a": "b"})
  80. Args:
  81. default_tags: Default tags that are
  82. used for every record method.
  83. Returns:
  84. Metric: it returns the instance itself.
  85. """
  86. for key, val in default_tags.items():
  87. if key not in self._tag_keys:
  88. raise ValueError(f"Unrecognized tag key {key}.")
  89. if not isinstance(val, str):
  90. raise TypeError(f"Tag values must be str, got {type(val)}.")
  91. self._default_tags = default_tags
  92. return self
  93. def _record(
  94. self,
  95. value: Union[int, float],
  96. tags: Optional[Dict[str, str]] = None,
  97. ) -> None:
  98. """Record the metric point of the metric.
  99. Tags passed in will take precedence over the metric's default tags.
  100. Args:
  101. value: The value to be recorded as a metric point.
  102. """
  103. if self._discard_metric:
  104. return
  105. assert self._metric is not None
  106. final_tags = self._get_final_tags(tags)
  107. self._validate_tags(final_tags)
  108. self._metric.record(value, tags=final_tags)
  109. def _get_final_tags(self, tags):
  110. if not tags:
  111. return self._default_tags
  112. for val in tags.values():
  113. if not isinstance(val, str):
  114. raise TypeError(f"Tag values must be str, got {type(val)}.")
  115. return {**self._default_tags, **tags}
  116. def _validate_tags(self, final_tags):
  117. missing_tags = []
  118. for tag_key in self._tag_keys:
  119. # Prefer passed tags over default tags.
  120. if tag_key not in final_tags:
  121. missing_tags.append(tag_key)
  122. # Strict validation: if any required tag_keys are missing, raise error
  123. if missing_tags:
  124. raise ValueError(f"Missing value for tag key(s): {','.join(missing_tags)}.")
  125. @property
  126. def info(self) -> Dict[str, Any]:
  127. """Return the information of this metric.
  128. Example:
  129. >>> from ray.util.metrics import Counter
  130. >>> counter = Counter("name", description="desc")
  131. >>> print(counter.info)
  132. {'name': 'name', 'description': 'desc', 'tag_keys': (), 'default_tags': {}}
  133. """
  134. return {
  135. "name": self._name,
  136. "description": self._description,
  137. "tag_keys": self._tag_keys,
  138. "default_tags": self._default_tags,
  139. }
  140. @DeveloperAPI
  141. class Counter(Metric):
  142. """A cumulative metric that is monotonically increasing.
  143. This corresponds to Prometheus' counter metric:
  144. https://prometheus.io/docs/concepts/metric_types/#counter
  145. Before Ray 2.10, this exports a Prometheus gauge metric instead of
  146. a counter metric, which is wrong.
  147. Since 2.10, this exports both counter (with a suffix "_total") and
  148. gauge metrics (for bug compatibility).
  149. Use `RAY_EXPORT_COUNTER_AS_GAUGE=0` to disable exporting the gauge metric.
  150. Args:
  151. name: Name of the metric.
  152. description: Description of the metric.
  153. tag_keys: Tag keys of the metric.
  154. """
  155. def __init__(
  156. self,
  157. name: str,
  158. description: str = "",
  159. tag_keys: Optional[Tuple[str, ...]] = None,
  160. ):
  161. super().__init__(name, description, tag_keys)
  162. if self._discard_metric:
  163. self._metric = None
  164. else:
  165. if env_bool("RAY_enable_open_telemetry", True):
  166. """
  167. For the new opentelemetry implementation, we'll correctly use Counter
  168. rather than Sum.
  169. """
  170. self._metric = CythonCount(
  171. self._name, self._description, self._tag_keys
  172. )
  173. else:
  174. """
  175. For the previous opencensus implementation, we used Sum to support
  176. exporting Counter as a gauge metric. We'll drop that feature in the
  177. new opentelemetry implementation.
  178. """
  179. self._metric = CythonSum(self._name, self._description, self._tag_keys)
  180. def __reduce__(self):
  181. deserializer = self.__class__
  182. serialized_data = (self._name, self._description, self._tag_keys)
  183. return deserializer, serialized_data
  184. def inc(self, value: Union[int, float] = 1.0, tags: Dict[str, str] = None):
  185. """Increment the counter by `value` (defaults to 1).
  186. Tags passed in will take precedence over the metric's default tags.
  187. Args:
  188. value(int, float): Value to increment the counter by (default=1).
  189. tags(Dict[str, str]): Tags to set or override for this counter.
  190. """
  191. if not isinstance(value, (int, float)):
  192. raise TypeError(f"value must be int or float, got {type(value)}.")
  193. if value <= 0:
  194. raise ValueError(f"value must be >0, got {value}")
  195. self._record(value, tags=tags)
  196. @DeveloperAPI
  197. class Histogram(Metric):
  198. """Tracks the size and number of events in buckets.
  199. Histograms allow you to calculate aggregate quantiles
  200. such as 25, 50, 95, 99 percentile latency for an RPC.
  201. This corresponds to Prometheus' histogram metric:
  202. https://prometheus.io/docs/concepts/metric_types/#histogram
  203. Args:
  204. name: Name of the metric.
  205. description: Description of the metric.
  206. boundaries: Boundaries of histogram buckets.
  207. tag_keys: Tag keys of the metric.
  208. """
  209. def __init__(
  210. self,
  211. name: str,
  212. description: str = "",
  213. boundaries: List[float] = None,
  214. tag_keys: Optional[Tuple[str, ...]] = None,
  215. ):
  216. super().__init__(name, description, tag_keys)
  217. if boundaries is None or len(boundaries) == 0:
  218. raise ValueError(
  219. "boundaries argument should be provided when using "
  220. "the Histogram class. e.g., "
  221. 'Histogram("name", boundaries=[1.0, 2.0])'
  222. )
  223. for i, boundary in enumerate(boundaries):
  224. if boundary <= 0:
  225. raise ValueError(
  226. "Invalid `boundaries` argument at index "
  227. f"{i}, {boundaries}. Use positive values for the arguments."
  228. )
  229. self.boundaries = boundaries
  230. if self._discard_metric:
  231. self._metric = None
  232. else:
  233. self._metric = CythonHistogram(
  234. self._name, self._description, self.boundaries, self._tag_keys
  235. )
  236. def observe(self, value: Union[int, float], tags: Dict[str, str] = None):
  237. """Observe a given `value` and add it to the appropriate bucket.
  238. Tags passed in will take precedence over the metric's default tags.
  239. Args:
  240. value(int, float): Value to set the gauge to.
  241. tags(Dict[str, str]): Tags to set or override for this gauge.
  242. """
  243. if not isinstance(value, (int, float)):
  244. raise TypeError(f"value must be int or float, got {type(value)}.")
  245. self._record(value, tags)
  246. def __reduce__(self):
  247. deserializer = Histogram
  248. serialized_data = (
  249. self._name,
  250. self._description,
  251. self.boundaries,
  252. self._tag_keys,
  253. )
  254. return deserializer, serialized_data
  255. @property
  256. def info(self):
  257. """Return information about histogram metric."""
  258. info = super().info
  259. info.update({"boundaries": self.boundaries})
  260. return info
  261. @DeveloperAPI
  262. class Gauge(Metric):
  263. """Gauges keep the last recorded value and drop everything before.
  264. Unlike counters, gauges can go up or down over time.
  265. This corresponds to Prometheus' gauge metric:
  266. https://prometheus.io/docs/concepts/metric_types/#gauge
  267. Args:
  268. name: Name of the metric.
  269. description: Description of the metric.
  270. tag_keys: Tag keys of the metric.
  271. """
  272. def __init__(
  273. self,
  274. name: str,
  275. description: str = "",
  276. tag_keys: Optional[Tuple[str, ...]] = None,
  277. ):
  278. super().__init__(name, description, tag_keys)
  279. if self._discard_metric:
  280. self._metric = None
  281. else:
  282. self._metric = CythonGauge(self._name, self._description, self._tag_keys)
  283. def set(self, value: Optional[Union[int, float]], tags: Dict[str, str] = None):
  284. """Set the gauge to the given `value`.
  285. Tags passed in will take precedence over the metric's default tags.
  286. Args:
  287. value(int, float): Value to set the gauge to. If `None`, this method is a
  288. no-op.
  289. tags(Dict[str, str]): Tags to set or override for this gauge.
  290. """
  291. if value is None:
  292. return
  293. if not isinstance(value, (int, float)):
  294. raise TypeError(f"value must be int or float, got {type(value)}.")
  295. self._record(value, tags)
  296. def __reduce__(self):
  297. deserializer = Gauge
  298. serialized_data = (self._name, self._description, self._tag_keys)
  299. return deserializer, serialized_data
  300. __all__ = [
  301. "Counter",
  302. "Histogram",
  303. "Gauge",
  304. ]