| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424 |
- import json
- import logging
- import os
- import shutil
- from urllib.parse import quote
- import aiohttp
- import ray.dashboard.optional_utils as dashboard_optional_utils
- import ray.dashboard.utils as dashboard_utils
- from ray._private.ray_constants import (
- PROMETHEUS_SERVICE_DISCOVERY_FILE,
- SESSION_LATEST,
- )
- from ray.dashboard.modules.metrics.grafana_dashboard_factory import (
- generate_data_grafana_dashboard,
- generate_default_grafana_dashboard,
- generate_serve_deployment_grafana_dashboard,
- generate_serve_grafana_dashboard,
- generate_serve_llm_grafana_dashboard,
- generate_train_grafana_dashboard,
- )
- from ray.dashboard.modules.metrics.templates import (
- DASHBOARD_PROVISIONING_TEMPLATE,
- GRAFANA_DATASOURCE_TEMPLATE,
- GRAFANA_INI_TEMPLATE,
- PROMETHEUS_YML_TEMPLATE,
- )
- from ray.dashboard.subprocesses.module import SubprocessModule
- from ray.dashboard.subprocesses.routes import SubprocessRouteTable as routes
# Module-level logger; its level is pinned to INFO here.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Env var that overrides the root directory under which MetricsHead writes the
# generated Grafana and Prometheus configuration.
METRICS_OUTPUT_ROOT_ENV_VAR = "RAY_METRICS_OUTPUT_ROOT"

# Prometheus base URL used for the health check and for queries; the env var
# overrides the default.
DEFAULT_PROMETHEUS_HOST = "http://localhost:9090"
PROMETHEUS_HOST_ENV_VAR = "RAY_PROMETHEUS_HOST"
# JSON-encoded HTTP headers sent with Prometheus requests (validated by
# parse_prom_headers). The default is an empty JSON object, i.e. no headers.
DEFAULT_PROMETHEUS_HEADERS = "{}"
PROMETHEUS_HEADERS_ENV_VAR = "RAY_PROMETHEUS_HEADERS"
# Name passed to the Grafana datasource template and reported to clients as
# the dashboard datasource.
DEFAULT_PROMETHEUS_NAME = "Prometheus"
PROMETHEUS_NAME_ENV_VAR = "RAY_PROMETHEUS_NAME"
# Path appended to the Prometheus host for the health check request.
PROMETHEUS_HEALTHCHECK_PATH = "-/healthy"

# Grafana base URL used for the health check; the env var overrides the default.
DEFAULT_GRAFANA_HOST = "http://localhost:3000"
GRAFANA_HOST_ENV_VAR = "RAY_GRAFANA_HOST"
# Grafana organization id reported by the Grafana health endpoint.
GRAFANA_ORG_ID_ENV_VAR = "RAY_GRAFANA_ORG_ID"
DEFAULT_GRAFANA_ORG_ID = "1"
# Optional cluster filter reported by the Grafana health endpoint; it has no
# default, so the value is None when the env var is unset.
GRAFANA_CLUSTER_FILTER_ENV_VAR = "RAY_GRAFANA_CLUSTER_FILTER"
# When the Grafana host equals this value, the health endpoint reports Grafana
# as disabled without contacting it.
GRAFANA_HOST_DISABLED_VALUE = "DISABLED"
# Host returned to clients as `grafana_host`; falls back to the Grafana host
# when the env var is unset.
GRAFANA_IFRAME_HOST_ENV_VAR = "RAY_GRAFANA_IFRAME_HOST"
# Env var that overrides the directory the generated dashboard JSON files are
# written to.
GRAFANA_DASHBOARD_OUTPUT_DIR_ENV_VAR = "RAY_METRICS_GRAFANA_DASHBOARD_OUTPUT_DIR"
# Path appended to the Grafana host for the health check request.
GRAFANA_HEALTHCHECK_PATH = "api/health"
# parse_prom_headers will make sure the input is in one of the following formats:
# 1. {"H1": "V1", "H2": "V2"}
# 2. [["H1", "V1"], ["H2", "V2"], ["H2", "V3"]]
def parse_prom_headers(prometheus_headers):
    """Decode and validate the JSON-encoded Prometheus HTTP headers.

    Args:
        prometheus_headers: JSON text holding either an object with string
            keys and string values, or an array of two-element string arrays
            (the array form allows a header name to repeat).

    Returns:
        The decoded value unchanged: a ``dict`` for the object form or a
        ``list`` of two-element lists for the array form.

    Raises:
        ValueError: If the text is not valid JSON (``json.JSONDecodeError`` is
            a ``ValueError`` subclass) or does not match either format.
    """
    parsed = json.loads(prometheus_headers)
    if isinstance(parsed, dict):
        if all(isinstance(k, str) and isinstance(v, str) for k, v in parsed.items()):
            return parsed
    if isinstance(parsed, list):
        # Each entry must itself be a list. Checking only len() and the element
        # types would accept two-character strings ("ab") and two-key objects,
        # and would raise TypeError instead of ValueError for numbers.
        if all(
            isinstance(e, list)
            and len(e) == 2
            and all(isinstance(v, str) for v in e)
            for e in parsed
        ):
            return parsed
    raise ValueError(
        f"{PROMETHEUS_HEADERS_ENV_VAR} should be a JSON string in one of the formats:\n"
        + "1) An object with string keys and string values.\n"
        + "2) an array of string arrays with 2 string elements each.\n"
        + 'For example, {"H1": "V1", "H2": "V2"} and\n'
        + '[["H1", "V1"], ["H2", "V2"], ["H2", "V3"]] are valid.'
    )
class PrometheusQueryError(Exception):
    """Error carrying the HTTP status and body of a failed Prometheus request.

    The formatted text is available both as ``str(exc)`` and as ``exc.message``.
    """

    def __init__(self, status, message):
        summary = "Error fetching data from prometheus. "
        details = f"status: {status}, message: {message}"
        self.message = summary + details
        super().__init__(self.message)
class MetricsHead(SubprocessModule):
    """Dashboard module for Grafana/Prometheus integration.

    It exposes health-check endpoints for Grafana and Prometheus, writes the
    default Grafana and Prometheus configuration files when the module runs,
    and provides a helper for issuing Prometheus queries.
    """

    def __init__(self, *args, **kwargs):
        """Resolve hosts, headers and output paths from the environment."""
        super().__init__(*args, **kwargs)
        self.grafana_host = os.environ.get(GRAFANA_HOST_ENV_VAR, DEFAULT_GRAFANA_HOST)
        self.prometheus_host = os.environ.get(
            PROMETHEUS_HOST_ENV_VAR, DEFAULT_PROMETHEUS_HOST
        )
        default_metrics_root = os.path.join(self.session_dir, "metrics")
        # Raises ValueError here if the configured headers are malformed.
        self.prometheus_headers = parse_prom_headers(
            os.environ.get(
                PROMETHEUS_HEADERS_ENV_VAR,
                DEFAULT_PROMETHEUS_HEADERS,
            )
        )
        session_latest_metrics_root = os.path.join(
            self.temp_dir, SESSION_LATEST, "metrics"
        )
        # Both roots honor the same override; they only differ in their
        # defaults (this session's directory vs. the SESSION_LATEST path).
        self._metrics_root = os.environ.get(
            METRICS_OUTPUT_ROOT_ENV_VAR, default_metrics_root
        )
        self._metrics_root_session_latest = os.environ.get(
            METRICS_OUTPUT_ROOT_ENV_VAR, session_latest_metrics_root
        )
        self._grafana_config_output_path = os.path.join(self._metrics_root, "grafana")
        self._grafana_session_latest_config_output_path = os.path.join(
            self._metrics_root_session_latest, "grafana"
        )
        self._grafana_dashboard_output_dir = os.environ.get(
            GRAFANA_DASHBOARD_OUTPUT_DIR_ENV_VAR,
            os.path.join(self._grafana_config_output_path, "dashboards"),
        )
        self._prometheus_name = os.environ.get(
            PROMETHEUS_NAME_ENV_VAR, DEFAULT_PROMETHEUS_NAME
        )
        self._grafana_org_id = os.environ.get(
            GRAFANA_ORG_ID_ENV_VAR, DEFAULT_GRAFANA_ORG_ID
        )
        self._grafana_cluster_filter = os.environ.get(GRAFANA_CLUSTER_FILTER_ENV_VAR)
        # To be set later when dashboards gets generated
        self._dashboard_uids = {}

    @routes.get("/api/grafana_health")
    async def grafana_health(self, req) -> aiohttp.web.Response:
        """
        Endpoint that checks if Grafana is running.

        Responds OK with "Grafana disabled" when the Grafana host is set to the
        disabled value, OK with the Grafana connection details when the health
        check succeeds, and INTERNAL_ERROR otherwise.
        """
        # If disabled, we don't want to show the metrics tab at all.
        if self.grafana_host == GRAFANA_HOST_DISABLED_VALUE:
            return dashboard_optional_utils.rest_response(
                status_code=dashboard_utils.HTTPStatusCode.OK,
                message="Grafana disabled",
                grafana_host=GRAFANA_HOST_DISABLED_VALUE,
            )

        grafana_iframe_host = os.environ.get(
            GRAFANA_IFRAME_HOST_ENV_VAR, self.grafana_host
        )
        path = f"{self.grafana_host}/{GRAFANA_HEALTHCHECK_PATH}"
        try:
            async with self.http_session.get(path) as resp:
                if resp.status != 200:
                    return dashboard_optional_utils.rest_response(
                        status_code=dashboard_utils.HTTPStatusCode.INTERNAL_ERROR,
                        message="Grafana healthcheck failed",
                        status=resp.status,
                    )
                # Named `health` rather than `json` so the imported json module
                # is not shadowed inside this method.
                health = await resp.json()
                # Check if the required Grafana services are running.
                if health["database"] != "ok":
                    return dashboard_optional_utils.rest_response(
                        status_code=dashboard_utils.HTTPStatusCode.INTERNAL_ERROR,
                        message="Grafana healthcheck failed. Database not ok.",
                        status=resp.status,
                        json=health,
                    )
                return dashboard_optional_utils.rest_response(
                    status_code=dashboard_utils.HTTPStatusCode.OK,
                    message="Grafana running",
                    grafana_host=grafana_iframe_host,
                    grafana_org_id=self._grafana_org_id,
                    session_name=self.session_name,
                    dashboard_uids=self._dashboard_uids,
                    dashboard_datasource=self._prometheus_name,
                    grafana_cluster_filter=self._grafana_cluster_filter,
                )
        except Exception as e:
            # Any failure (connection error, bad payload, missing key) is
            # reported to the caller as a failed health check.
            logger.debug(
                "Error fetching grafana endpoint. Is grafana running?", exc_info=e
            )
            return dashboard_optional_utils.rest_response(
                status_code=dashboard_utils.HTTPStatusCode.INTERNAL_ERROR,
                message="Grafana healthcheck failed",
                exception=str(e),
            )

    @routes.get("/api/prometheus_health")
    async def prometheus_health(self, req):
        """
        Endpoint that checks if Prometheus is running.

        Responds OK when the health check request returns HTTP 200 and
        INTERNAL_ERROR on any other status or on any exception.
        """
        try:
            path = f"{self.prometheus_host}/{PROMETHEUS_HEALTHCHECK_PATH}"
            async with self.http_session.get(
                path, headers=self.prometheus_headers
            ) as resp:
                if resp.status != 200:
                    return dashboard_optional_utils.rest_response(
                        status_code=dashboard_utils.HTTPStatusCode.INTERNAL_ERROR,
                        message="prometheus healthcheck failed.",
                        status=resp.status,
                    )
                return dashboard_optional_utils.rest_response(
                    status_code=dashboard_utils.HTTPStatusCode.OK,
                    message="prometheus running",
                )
        except Exception as e:
            logger.debug(
                "Error fetching prometheus endpoint. Is prometheus running?", exc_info=e
            )
            return dashboard_optional_utils.rest_response(
                status_code=dashboard_utils.HTTPStatusCode.INTERNAL_ERROR,
                message="prometheus healthcheck failed.",
                reason=str(e),
            )

    def _create_default_grafana_configs(self):
        """
        Creates the Grafana configurations that are by default provided by Ray.

        Writes grafana.ini, the dashboard and datasource provisioning files,
        and the generated dashboard JSON files.
        """
        # Create Grafana configuration folder
        if os.path.exists(self._grafana_config_output_path):
            shutil.rmtree(self._grafana_config_output_path)
        os.makedirs(self._grafana_config_output_path, exist_ok=True)

        # Overwrite Grafana's configuration file
        grafana_provisioning_folder = os.path.join(
            self._grafana_config_output_path, "provisioning"
        )
        # grafana.ini is written under the session's config directory, but the
        # provisioning path recorded inside it is the SESSION_LATEST variant.
        grafana_prov_folder_with_latest_session = os.path.join(
            self._grafana_session_latest_config_output_path, "provisioning"
        )
        with open(
            os.path.join(self._grafana_config_output_path, "grafana.ini"), "w"
        ) as f:
            f.write(
                GRAFANA_INI_TEMPLATE.format(
                    grafana_provisioning_folder=grafana_prov_folder_with_latest_session
                )
            )

        # Overwrite Grafana's dashboard provisioning directory based on env var
        dashboard_provisioning_path = os.path.join(
            grafana_provisioning_folder, "dashboards"
        )
        os.makedirs(dashboard_provisioning_path, exist_ok=True)
        with open(os.path.join(dashboard_provisioning_path, "default.yml"), "w") as f:
            f.write(
                DASHBOARD_PROVISIONING_TEMPLATE.format(
                    dashboard_output_folder=self._grafana_dashboard_output_dir
                )
            )

        # Overwrite Grafana's Prometheus datasource based on env var
        prometheus_host = os.environ.get(
            PROMETHEUS_HOST_ENV_VAR, DEFAULT_PROMETHEUS_HOST
        )
        prometheus_headers = parse_prom_headers(
            os.environ.get(PROMETHEUS_HEADERS_ENV_VAR, DEFAULT_PROMETHEUS_HEADERS)
        )
        # parse_prom_headers will make sure the prometheus_headers is either format of:
        # 1. {"H1": "V1", "H2": "V2"} or
        # 2. [["H1", "V1"], ["H2", "V2"], ["H2", "V3"]]
        prometheus_header_pairs = []
        if isinstance(prometheus_headers, list):
            prometheus_header_pairs = prometheus_headers
        elif isinstance(prometheus_headers, dict):
            prometheus_header_pairs = list(prometheus_headers.items())

        data_sources_path = os.path.join(grafana_provisioning_folder, "datasources")
        os.makedirs(data_sources_path, exist_ok=True)
        os.makedirs(self._grafana_dashboard_output_dir, exist_ok=True)
        with open(os.path.join(data_sources_path, "default.yml"), "w") as f:
            f.write(
                GRAFANA_DATASOURCE_TEMPLATE(
                    prometheus_host=prometheus_host,
                    prometheus_name=self._prometheus_name,
                    # Header names and values are emitted as separate, 1-based
                    # numbered entries that pair up by index.
                    jsonData={
                        f"httpHeaderName{i+1}": header
                        for i, (header, _) in enumerate(prometheus_header_pairs)
                    },
                    secureJsonData={
                        f"httpHeaderValue{i+1}": value
                        for i, (_, value) in enumerate(prometheus_header_pairs)
                    },
                )
            )

        self._write_grafana_dashboards()

    def _write_grafana_dashboards(self):
        """Generate each default dashboard, write it, and record its uid.

        Files go to the dashboard output directory, which must already exist.
        Each uid is stored in ``self._dashboard_uids`` under the dashboard key.
        """
        # (uid key, output file name, generator). Built at call time so the
        # generator names are resolved when this method runs.
        dashboards = (
            (
                "default",
                "default_grafana_dashboard.json",
                generate_default_grafana_dashboard,
            ),
            ("serve", "serve_grafana_dashboard.json", generate_serve_grafana_dashboard),
            (
                "serve_deployment",
                "serve_deployment_grafana_dashboard.json",
                generate_serve_deployment_grafana_dashboard,
            ),
            (
                "serve_llm",
                "serve_llm_grafana_dashboard.json",
                generate_serve_llm_grafana_dashboard,
            ),
            ("data", "data_grafana_dashboard.json", generate_data_grafana_dashboard),
            ("train", "train_grafana_dashboard.json", generate_train_grafana_dashboard),
        )
        for uid_key, file_name, generate in dashboards:
            # Generate before opening the file so a failing generator cannot
            # leave a truncated dashboard behind.
            content, uid = generate()
            with open(
                os.path.join(self._grafana_dashboard_output_dir, file_name), "w"
            ) as f:
                f.write(content)
            self._dashboard_uids[uid_key] = uid

    def _create_default_prometheus_configs(self):
        """
        Creates the Prometheus configurations that are by default provided by Ray.
        """
        prometheus_config_output_path = os.path.join(
            self._metrics_root, "prometheus", "prometheus.yml"
        )

        # Generate the default Prometheus configurations
        if os.path.exists(prometheus_config_output_path):
            os.remove(prometheus_config_output_path)
        os.makedirs(os.path.dirname(prometheus_config_output_path), exist_ok=True)

        # This code generates the Prometheus config based on the custom temporary root
        # path set by the user at Ray cluster start up (via --temp-dir). In contrast,
        # start_prometheus in install_and_start_prometheus.py uses a hardcoded
        # Prometheus config at PROMETHEUS_CONFIG_INPUT_PATH that always uses "/tmp/ray".
        # Other than the root path, the config file generated here is identical to that
        # hardcoded config file.
        prom_discovery_file_path = os.path.join(
            self.temp_dir, PROMETHEUS_SERVICE_DISCOVERY_FILE
        )
        with open(prometheus_config_output_path, "w") as f:
            f.write(
                PROMETHEUS_YML_TEMPLATE.format(
                    prom_metrics_service_discovery_file_path=prom_discovery_file_path
                )
            )

    async def run(self):
        """Run the base module, then write the default Grafana and Prometheus configs."""
        await super().run()
        self._create_default_grafana_configs()
        self._create_default_prometheus_configs()

    async def _query_prometheus(self, query):
        """Run a query against the Prometheus query API and return the decoded JSON.

        Args:
            query: Query expression; it is URL-quoted before being sent.

        Returns:
            The decoded JSON body of a 200 response.

        Raises:
            PrometheusQueryError: If the response status is not 200.
        """
        async with self.http_session.get(
            f"{self.prometheus_host}/api/v1/query?query={quote(query)}",
            headers=self.prometheus_headers,
        ) as resp:
            if resp.status == 200:
                prom_data = await resp.json()
                return prom_data

            message = await resp.text()
            raise PrometheusQueryError(resp.status, message)
|