metrics_head.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424
  1. import json
  2. import logging
  3. import os
  4. import shutil
  5. from urllib.parse import quote
  6. import aiohttp
  7. import ray.dashboard.optional_utils as dashboard_optional_utils
  8. import ray.dashboard.utils as dashboard_utils
  9. from ray._private.ray_constants import (
  10. PROMETHEUS_SERVICE_DISCOVERY_FILE,
  11. SESSION_LATEST,
  12. )
  13. from ray.dashboard.modules.metrics.grafana_dashboard_factory import (
  14. generate_data_grafana_dashboard,
  15. generate_default_grafana_dashboard,
  16. generate_serve_deployment_grafana_dashboard,
  17. generate_serve_grafana_dashboard,
  18. generate_serve_llm_grafana_dashboard,
  19. generate_train_grafana_dashboard,
  20. )
  21. from ray.dashboard.modules.metrics.templates import (
  22. DASHBOARD_PROVISIONING_TEMPLATE,
  23. GRAFANA_DATASOURCE_TEMPLATE,
  24. GRAFANA_INI_TEMPLATE,
  25. PROMETHEUS_YML_TEMPLATE,
  26. )
  27. from ray.dashboard.subprocesses.module import SubprocessModule
  28. from ray.dashboard.subprocesses.routes import SubprocessRouteTable as routes
# Module-level logger; its level is explicitly set to INFO.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Env var that overrides the root directory under which the Grafana and
# Prometheus configuration files are written.
METRICS_OUTPUT_ROOT_ENV_VAR = "RAY_METRICS_OUTPUT_ROOT"
# Prometheus settings: each *_ENV_VAR names the environment variable that
# overrides the matching DEFAULT_* value.
DEFAULT_PROMETHEUS_HOST = "http://localhost:9090"
PROMETHEUS_HOST_ENV_VAR = "RAY_PROMETHEUS_HOST"
# JSON string; see parse_prom_headers for the accepted formats.
DEFAULT_PROMETHEUS_HEADERS = "{}"
PROMETHEUS_HEADERS_ENV_VAR = "RAY_PROMETHEUS_HEADERS"
# Name given to the Prometheus datasource in the generated Grafana config.
DEFAULT_PROMETHEUS_NAME = "Prometheus"
PROMETHEUS_NAME_ENV_VAR = "RAY_PROMETHEUS_NAME"
# Path appended to the Prometheus host by the health-check endpoint.
PROMETHEUS_HEALTHCHECK_PATH = "-/healthy"
# Grafana settings, following the same DEFAULT_* / *_ENV_VAR convention.
DEFAULT_GRAFANA_HOST = "http://localhost:3000"
GRAFANA_HOST_ENV_VAR = "RAY_GRAFANA_HOST"
GRAFANA_ORG_ID_ENV_VAR = "RAY_GRAFANA_ORG_ID"
DEFAULT_GRAFANA_ORG_ID = "1"
# Optional; there is no default, so the value is None when the variable is unset.
GRAFANA_CLUSTER_FILTER_ENV_VAR = "RAY_GRAFANA_CLUSTER_FILTER"
# When the Grafana host is set to this value, the Grafana health endpoint
# reports "Grafana disabled" without contacting Grafana.
GRAFANA_HOST_DISABLED_VALUE = "DISABLED"
# Env var for the Grafana host reported back in the health response; falls
# back to the Grafana host itself when unset.
GRAFANA_IFRAME_HOST_ENV_VAR = "RAY_GRAFANA_IFRAME_HOST"
# Env var that overrides the directory the generated dashboard JSON files go to.
GRAFANA_DASHBOARD_OUTPUT_DIR_ENV_VAR = "RAY_METRICS_GRAFANA_DASHBOARD_OUTPUT_DIR"
# Path appended to the Grafana host by the health-check endpoint.
GRAFANA_HEALTHCHECK_PATH = "api/health"
  48. # parse_prom_headers will make sure the input is in one of the following formats:
  49. # 1. {"H1": "V1", "H2": "V2"}
  50. # 2. [["H1", "V1"], ["H2", "V2"], ["H2", "V3"]]
  51. def parse_prom_headers(prometheus_headers):
  52. parsed = json.loads(prometheus_headers)
  53. if isinstance(parsed, dict):
  54. if all(isinstance(k, str) and isinstance(v, str) for k, v in parsed.items()):
  55. return parsed
  56. if isinstance(parsed, list):
  57. if all(len(e) == 2 and all(isinstance(v, str) for v in e) for e in parsed):
  58. return parsed
  59. raise ValueError(
  60. f"{PROMETHEUS_HEADERS_ENV_VAR} should be a JSON string in one of the formats:\n"
  61. + "1) An object with string keys and string values.\n"
  62. + "2) an array of string arrays with 2 string elements each.\n"
  63. + 'For example, {"H1": "V1", "H2": "V2"} and\n'
  64. + '[["H1", "V1"], ["H2", "V2"], ["H2", "V3"]] are valid.'
  65. )
  66. class PrometheusQueryError(Exception):
  67. def __init__(self, status, message):
  68. self.message = (
  69. "Error fetching data from prometheus. "
  70. f"status: {status}, message: {message}"
  71. )
  72. super().__init__(self.message)
  73. class MetricsHead(SubprocessModule):
  74. def __init__(self, *args, **kwargs):
  75. super().__init__(*args, **kwargs)
  76. self.grafana_host = os.environ.get(GRAFANA_HOST_ENV_VAR, DEFAULT_GRAFANA_HOST)
  77. self.prometheus_host = os.environ.get(
  78. PROMETHEUS_HOST_ENV_VAR, DEFAULT_PROMETHEUS_HOST
  79. )
  80. default_metrics_root = os.path.join(self.session_dir, "metrics")
  81. self.prometheus_headers = parse_prom_headers(
  82. os.environ.get(
  83. PROMETHEUS_HEADERS_ENV_VAR,
  84. DEFAULT_PROMETHEUS_HEADERS,
  85. )
  86. )
  87. session_latest_metrics_root = os.path.join(
  88. self.temp_dir, SESSION_LATEST, "metrics"
  89. )
  90. self._metrics_root = os.environ.get(
  91. METRICS_OUTPUT_ROOT_ENV_VAR, default_metrics_root
  92. )
  93. self._metrics_root_session_latest = os.environ.get(
  94. METRICS_OUTPUT_ROOT_ENV_VAR, session_latest_metrics_root
  95. )
  96. self._grafana_config_output_path = os.path.join(self._metrics_root, "grafana")
  97. self._grafana_session_latest_config_output_path = os.path.join(
  98. self._metrics_root_session_latest, "grafana"
  99. )
  100. self._grafana_dashboard_output_dir = os.environ.get(
  101. GRAFANA_DASHBOARD_OUTPUT_DIR_ENV_VAR,
  102. os.path.join(self._grafana_config_output_path, "dashboards"),
  103. )
  104. self._prometheus_name = os.environ.get(
  105. PROMETHEUS_NAME_ENV_VAR, DEFAULT_PROMETHEUS_NAME
  106. )
  107. self._grafana_org_id = os.environ.get(
  108. GRAFANA_ORG_ID_ENV_VAR, DEFAULT_GRAFANA_ORG_ID
  109. )
  110. self._grafana_cluster_filter = os.environ.get(GRAFANA_CLUSTER_FILTER_ENV_VAR)
  111. # To be set later when dashboards gets generated
  112. self._dashboard_uids = {}
  113. @routes.get("/api/grafana_health")
  114. async def grafana_health(self, req) -> aiohttp.web.Response:
  115. """
  116. Endpoint that checks if Grafana is running
  117. """
  118. # If disabled, we don't want to show the metrics tab at all.
  119. if self.grafana_host == GRAFANA_HOST_DISABLED_VALUE:
  120. return dashboard_optional_utils.rest_response(
  121. status_code=dashboard_utils.HTTPStatusCode.OK,
  122. message="Grafana disabled",
  123. grafana_host=GRAFANA_HOST_DISABLED_VALUE,
  124. )
  125. grafana_iframe_host = os.environ.get(
  126. GRAFANA_IFRAME_HOST_ENV_VAR, self.grafana_host
  127. )
  128. path = f"{self.grafana_host}/{GRAFANA_HEALTHCHECK_PATH}"
  129. try:
  130. async with self.http_session.get(path) as resp:
  131. if resp.status != 200:
  132. return dashboard_optional_utils.rest_response(
  133. status_code=dashboard_utils.HTTPStatusCode.INTERNAL_ERROR,
  134. message="Grafana healthcheck failed",
  135. status=resp.status,
  136. )
  137. json = await resp.json()
  138. # Check if the required Grafana services are running.
  139. if json["database"] != "ok":
  140. return dashboard_optional_utils.rest_response(
  141. status_code=dashboard_utils.HTTPStatusCode.INTERNAL_ERROR,
  142. message="Grafana healthcheck failed. Database not ok.",
  143. status=resp.status,
  144. json=json,
  145. )
  146. return dashboard_optional_utils.rest_response(
  147. status_code=dashboard_utils.HTTPStatusCode.OK,
  148. message="Grafana running",
  149. grafana_host=grafana_iframe_host,
  150. grafana_org_id=self._grafana_org_id,
  151. session_name=self.session_name,
  152. dashboard_uids=self._dashboard_uids,
  153. dashboard_datasource=self._prometheus_name,
  154. grafana_cluster_filter=self._grafana_cluster_filter,
  155. )
  156. except Exception as e:
  157. logger.debug(
  158. "Error fetching grafana endpoint. Is grafana running?", exc_info=e
  159. )
  160. return dashboard_optional_utils.rest_response(
  161. status_code=dashboard_utils.HTTPStatusCode.INTERNAL_ERROR,
  162. message="Grafana healthcheck failed",
  163. exception=str(e),
  164. )
  165. @routes.get("/api/prometheus_health")
  166. async def prometheus_health(self, req):
  167. try:
  168. path = f"{self.prometheus_host}/{PROMETHEUS_HEALTHCHECK_PATH}"
  169. async with self.http_session.get(
  170. path, headers=self.prometheus_headers
  171. ) as resp:
  172. if resp.status != 200:
  173. return dashboard_optional_utils.rest_response(
  174. status_code=dashboard_utils.HTTPStatusCode.INTERNAL_ERROR,
  175. message="prometheus healthcheck failed.",
  176. status=resp.status,
  177. )
  178. return dashboard_optional_utils.rest_response(
  179. status_code=dashboard_utils.HTTPStatusCode.OK,
  180. message="prometheus running",
  181. )
  182. except Exception as e:
  183. logger.debug(
  184. "Error fetching prometheus endpoint. Is prometheus running?", exc_info=e
  185. )
  186. return dashboard_optional_utils.rest_response(
  187. status_code=dashboard_utils.HTTPStatusCode.INTERNAL_ERROR,
  188. message="prometheus healthcheck failed.",
  189. reason=str(e),
  190. )
  191. def _create_default_grafana_configs(self):
  192. """
  193. Creates the Grafana configurations that are by default provided by Ray.
  194. """
  195. # Create Grafana configuration folder
  196. if os.path.exists(self._grafana_config_output_path):
  197. shutil.rmtree(self._grafana_config_output_path)
  198. os.makedirs(self._grafana_config_output_path, exist_ok=True)
  199. # Overwrite Grafana's configuration file
  200. grafana_provisioning_folder = os.path.join(
  201. self._grafana_config_output_path, "provisioning"
  202. )
  203. grafana_prov_folder_with_latest_session = os.path.join(
  204. self._grafana_session_latest_config_output_path, "provisioning"
  205. )
  206. with open(
  207. os.path.join(
  208. self._grafana_config_output_path,
  209. "grafana.ini",
  210. ),
  211. "w",
  212. ) as f:
  213. f.write(
  214. GRAFANA_INI_TEMPLATE.format(
  215. grafana_provisioning_folder=grafana_prov_folder_with_latest_session
  216. )
  217. )
  218. # Overwrite Grafana's dashboard provisioning directory based on env var
  219. dashboard_provisioning_path = os.path.join(
  220. grafana_provisioning_folder, "dashboards"
  221. )
  222. os.makedirs(
  223. dashboard_provisioning_path,
  224. exist_ok=True,
  225. )
  226. with open(
  227. os.path.join(
  228. dashboard_provisioning_path,
  229. "default.yml",
  230. ),
  231. "w",
  232. ) as f:
  233. f.write(
  234. DASHBOARD_PROVISIONING_TEMPLATE.format(
  235. dashboard_output_folder=self._grafana_dashboard_output_dir
  236. )
  237. )
  238. # Overwrite Grafana's Prometheus datasource based on env var
  239. prometheus_host = os.environ.get(
  240. PROMETHEUS_HOST_ENV_VAR, DEFAULT_PROMETHEUS_HOST
  241. )
  242. prometheus_headers = parse_prom_headers(
  243. os.environ.get(PROMETHEUS_HEADERS_ENV_VAR, DEFAULT_PROMETHEUS_HEADERS)
  244. )
  245. # parse_prom_headers will make sure the prometheus_headers is either format of:
  246. # 1. {"H1": "V1", "H2": "V2"} or
  247. # 2. [["H1", "V1"], ["H2", "V2"], ["H2", "V3"]]
  248. prometheus_header_pairs = []
  249. if isinstance(prometheus_headers, list):
  250. prometheus_header_pairs = prometheus_headers
  251. elif isinstance(prometheus_headers, dict):
  252. prometheus_header_pairs = list(prometheus_headers.items())
  253. data_sources_path = os.path.join(grafana_provisioning_folder, "datasources")
  254. os.makedirs(
  255. data_sources_path,
  256. exist_ok=True,
  257. )
  258. os.makedirs(
  259. self._grafana_dashboard_output_dir,
  260. exist_ok=True,
  261. )
  262. with open(
  263. os.path.join(
  264. data_sources_path,
  265. "default.yml",
  266. ),
  267. "w",
  268. ) as f:
  269. f.write(
  270. GRAFANA_DATASOURCE_TEMPLATE(
  271. prometheus_host=prometheus_host,
  272. prometheus_name=self._prometheus_name,
  273. jsonData={
  274. f"httpHeaderName{i+1}": header
  275. for i, (header, _) in enumerate(prometheus_header_pairs)
  276. },
  277. secureJsonData={
  278. f"httpHeaderValue{i+1}": value
  279. for i, (_, value) in enumerate(prometheus_header_pairs)
  280. },
  281. )
  282. )
  283. with open(
  284. os.path.join(
  285. self._grafana_dashboard_output_dir,
  286. "default_grafana_dashboard.json",
  287. ),
  288. "w",
  289. ) as f:
  290. (
  291. content,
  292. self._dashboard_uids["default"],
  293. ) = generate_default_grafana_dashboard()
  294. f.write(content)
  295. with open(
  296. os.path.join(
  297. self._grafana_dashboard_output_dir,
  298. "serve_grafana_dashboard.json",
  299. ),
  300. "w",
  301. ) as f:
  302. content, self._dashboard_uids["serve"] = generate_serve_grafana_dashboard()
  303. f.write(content)
  304. with open(
  305. os.path.join(
  306. self._grafana_dashboard_output_dir,
  307. "serve_deployment_grafana_dashboard.json",
  308. ),
  309. "w",
  310. ) as f:
  311. (
  312. content,
  313. self._dashboard_uids["serve_deployment"],
  314. ) = generate_serve_deployment_grafana_dashboard()
  315. f.write(content)
  316. with open(
  317. os.path.join(
  318. self._grafana_dashboard_output_dir,
  319. "serve_llm_grafana_dashboard.json",
  320. ),
  321. "w",
  322. ) as f:
  323. (
  324. content,
  325. self._dashboard_uids["serve_llm"],
  326. ) = generate_serve_llm_grafana_dashboard()
  327. f.write(content)
  328. with open(
  329. os.path.join(
  330. self._grafana_dashboard_output_dir,
  331. "data_grafana_dashboard.json",
  332. ),
  333. "w",
  334. ) as f:
  335. (
  336. content,
  337. self._dashboard_uids["data"],
  338. ) = generate_data_grafana_dashboard()
  339. f.write(content)
  340. with open(
  341. os.path.join(
  342. self._grafana_dashboard_output_dir,
  343. "train_grafana_dashboard.json",
  344. ),
  345. "w",
  346. ) as f:
  347. (
  348. content,
  349. self._dashboard_uids["train"],
  350. ) = generate_train_grafana_dashboard()
  351. f.write(content)
  352. def _create_default_prometheus_configs(self):
  353. """
  354. Creates the Prometheus configurations that are by default provided by Ray.
  355. """
  356. prometheus_config_output_path = os.path.join(
  357. self._metrics_root, "prometheus", "prometheus.yml"
  358. )
  359. # Generate the default Prometheus configurations
  360. if os.path.exists(prometheus_config_output_path):
  361. os.remove(prometheus_config_output_path)
  362. os.makedirs(os.path.dirname(prometheus_config_output_path), exist_ok=True)
  363. # This code generates the Prometheus config based on the custom temporary root
  364. # path set by the user at Ray cluster start up (via --temp-dir). In contrast,
  365. # start_prometheus in install_and_start_prometheus.py uses a hardcoded
  366. # Prometheus config at PROMETHEUS_CONFIG_INPUT_PATH that always uses "/tmp/ray".
  367. # Other than the root path, the config file generated here is identical to that
  368. # hardcoded config file.
  369. prom_discovery_file_path = os.path.join(
  370. self.temp_dir, PROMETHEUS_SERVICE_DISCOVERY_FILE
  371. )
  372. with open(prometheus_config_output_path, "w") as f:
  373. f.write(
  374. PROMETHEUS_YML_TEMPLATE.format(
  375. prom_metrics_service_discovery_file_path=prom_discovery_file_path
  376. )
  377. )
  378. async def run(self):
  379. await super().run()
  380. self._create_default_grafana_configs()
  381. self._create_default_prometheus_configs()
  382. async def _query_prometheus(self, query):
  383. async with self.http_session.get(
  384. f"{self.prometheus_host}/api/v1/query?query={quote(query)}",
  385. headers=self.prometheus_headers,
  386. ) as resp:
  387. if resp.status == 200:
  388. prom_data = await resp.json()
  389. return prom_data
  390. message = await resp.text()
  391. raise PrometheusQueryError(resp.status, message)