data_head.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. import json
  2. import logging
  3. import os
  4. from enum import Enum
  5. from urllib.parse import quote
  6. import aiohttp
  7. from aiohttp.web import Request, Response
  8. import ray.dashboard.optional_utils as optional_utils
  9. from ray.dashboard.modules.metrics.metrics_head import (
  10. DEFAULT_PROMETHEUS_HEADERS,
  11. DEFAULT_PROMETHEUS_HOST,
  12. PROMETHEUS_HEADERS_ENV_VAR,
  13. PROMETHEUS_HOST_ENV_VAR,
  14. PrometheusQueryError,
  15. parse_prom_headers,
  16. )
  17. from ray.dashboard.subprocesses.module import SubprocessModule
  18. from ray.dashboard.subprocesses.routes import SubprocessRouteTable as routes
  19. logger = logging.getLogger(__name__)
  20. logger.setLevel(logging.INFO)
  21. # Window and sampling rate used for certain Prometheus queries.
  22. # Datapoints up until `MAX_TIME_WINDOW` ago are queried at `SAMPLE_RATE` intervals.
  23. MAX_TIME_WINDOW = "1h"
  24. SAMPLE_RATE = "1s"
  25. class PrometheusQuery(Enum):
  26. """Enum to store types of Prometheus queries for a given metric and grouping."""
  27. VALUE = ("value", "sum({}{{SessionName='{}'}}) by ({})")
  28. MAX = (
  29. "max",
  30. "max_over_time(sum({}{{SessionName='{}'}}) by ({})["
  31. + f"{MAX_TIME_WINDOW}:{SAMPLE_RATE}])",
  32. )
  33. DATASET_METRICS = {
  34. "ray_data_output_rows": (PrometheusQuery.MAX,),
  35. "ray_data_spilled_bytes": (PrometheusQuery.MAX,),
  36. "ray_data_current_bytes": (PrometheusQuery.VALUE, PrometheusQuery.MAX),
  37. "ray_data_cpu_usage_cores": (PrometheusQuery.VALUE, PrometheusQuery.MAX),
  38. "ray_data_gpu_usage_cores": (PrometheusQuery.VALUE, PrometheusQuery.MAX),
  39. }
  40. class DataHead(SubprocessModule):
  41. def __init__(self, *args, **kwargs):
  42. super().__init__(*args, **kwargs)
  43. self.prometheus_host = os.environ.get(
  44. PROMETHEUS_HOST_ENV_VAR, DEFAULT_PROMETHEUS_HOST
  45. )
  46. self.prometheus_headers = parse_prom_headers(
  47. os.environ.get(
  48. PROMETHEUS_HEADERS_ENV_VAR,
  49. DEFAULT_PROMETHEUS_HEADERS,
  50. )
  51. )
  52. @routes.get("/api/data/datasets/{job_id}")
  53. @optional_utils.init_ray_and_catch_exceptions()
  54. async def get_datasets(self, req: Request) -> Response:
  55. job_id = req.match_info["job_id"]
  56. try:
  57. from ray.data._internal.stats import get_or_create_stats_actor
  58. _stats_actor = get_or_create_stats_actor()
  59. datasets = await _stats_actor.get_datasets.remote(job_id)
  60. # Initializes dataset metric values
  61. for dataset in datasets:
  62. for metric, queries in DATASET_METRICS.items():
  63. datasets[dataset][metric] = {query.value[0]: 0 for query in queries}
  64. for operator in datasets[dataset]["operators"]:
  65. datasets[dataset]["operators"][operator][metric] = {
  66. query.value[0]: 0 for query in queries
  67. }
  68. # Query dataset metric values from prometheus
  69. try:
  70. # TODO (Zandew): store results of completed datasets in stats actor.
  71. for metric, queries in DATASET_METRICS.items():
  72. for query in queries:
  73. query_name, prom_query = query.value
  74. # Dataset level
  75. dataset_result = await self._query_prometheus(
  76. prom_query.format(metric, self.session_name, "dataset")
  77. )
  78. for res in dataset_result["data"]["result"]:
  79. dataset, value = res["metric"]["dataset"], res["value"][1]
  80. if dataset in datasets:
  81. datasets[dataset][metric][query_name] = value
  82. # Operator level
  83. operator_result = await self._query_prometheus(
  84. prom_query.format(
  85. metric, self.session_name, "dataset, operator"
  86. )
  87. )
  88. for res in operator_result["data"]["result"]:
  89. dataset, operator, value = (
  90. res["metric"]["dataset"],
  91. res["metric"]["operator"],
  92. res["value"][1],
  93. )
  94. # Check if dataset/operator is in current _StatsActor scope.
  95. # Prometheus server may contain metrics from previous
  96. # cluster if not reset.
  97. if (
  98. dataset in datasets
  99. and operator in datasets[dataset]["operators"]
  100. ):
  101. datasets[dataset]["operators"][operator][metric][
  102. query_name
  103. ] = value
  104. except aiohttp.client_exceptions.ClientConnectorError:
  105. # Prometheus server may not be running,
  106. # leave these values blank and return other data
  107. logging.exception(
  108. "Exception occurred while querying Prometheus. "
  109. "The Prometheus server may not be running."
  110. )
  111. # Flatten response
  112. for dataset in datasets:
  113. datasets[dataset]["operators"] = list(
  114. map(
  115. lambda item: {"operator": item[0], **item[1]},
  116. datasets[dataset]["operators"].items(),
  117. )
  118. )
  119. datasets = list(
  120. map(lambda item: {"dataset": item[0], **item[1]}, datasets.items())
  121. )
  122. # Sort by descending start time
  123. datasets = sorted(datasets, key=lambda x: x["start_time"], reverse=True)
  124. return Response(
  125. text=json.dumps({"datasets": datasets}),
  126. content_type="application/json",
  127. )
  128. except Exception as e:
  129. logging.exception("Exception occurred while getting datasets.")
  130. return Response(
  131. status=503,
  132. text=str(e),
  133. )
  134. async def _query_prometheus(self, query):
  135. async with self.http_session.get(
  136. f"{self.prometheus_host}/api/v1/query?query={quote(query)}",
  137. headers=self.prometheus_headers,
  138. ) as resp:
  139. if resp.status == 200:
  140. prom_data = await resp.json()
  141. return prom_data
  142. message = await resp.text()
  143. raise PrometheusQueryError(resp.status, message)