| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474 |
- import asyncio
- import logging
- import os
- from concurrent.futures import ThreadPoolExecutor
- from pathlib import Path
- from typing import TYPE_CHECKING, List, Optional, Set, Tuple
- import ray
- import ray.dashboard.consts as dashboard_consts
- import ray.dashboard.utils as dashboard_utils
- import ray.experimental.internal_kv as internal_kv
- from ray._common.network_utils import build_address
- from ray._common.usage.usage_lib import TagKey, record_extra_usage_tag
- from ray._private import ray_constants
- from ray._private.async_utils import enable_monitor_loop_lag
- from ray._private.ray_constants import env_integer
- from ray._raylet import GcsClient
- from ray.dashboard.consts import (
- AVAILABLE_COMPONENT_NAMES_FOR_METRICS,
- DASHBOARD_METRIC_PORT,
- )
- from ray.dashboard.dashboard_metrics import DashboardPrometheusMetrics
- from ray.dashboard.utils import (
- DashboardHeadModule,
- DashboardHeadModuleConfig,
- async_loop_forever,
- )
- import psutil
- try:
- import prometheus_client
- except ImportError:
- prometheus_client = None
- if TYPE_CHECKING:
- from ray.dashboard.subprocesses.handle import SubprocessModuleHandle
logger = logging.getLogger(__name__)

# NOTE: The thread pool executor owned by the dashboard head defaults to a
# single worker on purpose. Capping its concurrency lowers the potential for
# GIL contention. The value is read through `env_integer` using the key below.
_HEAD_TPE_MAX_WORKERS_ENV_VAR = "RAY_DASHBOARD_DASHBOARD_HEAD_TPE_MAX_WORKERS"
RAY_DASHBOARD_DASHBOARD_HEAD_TPE_MAX_WORKERS = env_integer(
    _HEAD_TPE_MAX_WORKERS_ENV_VAR, 1
)
class DashboardHead:
    def __init__(
        self,
        http_host: str,
        http_port: int,
        http_port_retries: int,
        gcs_address: str,
        cluster_id_hex: str,
        node_ip_address: str,
        log_dir: str,
        logging_level: int,
        logging_format: str,
        logging_filename: str,
        logging_rotate_bytes: int,
        logging_rotate_backup_count: int,
        temp_dir: str,
        session_dir: str,
        minimal: bool,
        serve_frontend: bool,
        modules_to_load: Optional[Set[str]] = None,
    ):
        """
        Args:
            http_host: The host address for the Http server. "localhost" is
                rewritten to "127.0.0.1".
            http_port: The port for the Http server.
            http_port_retries: The maximum retry to bind ports for the Http server.
            gcs_address: The GCS address in the {address}:{port} format.
            cluster_id_hex: The cluster ID in hex form. Used to connect to the
                GCS and passed to every loaded module's config.
            node_ip_address: The IP address of this node. Used in metric
                labels, the metrics export address and, when not bound to the
                default dashboard IP, the advertised dashboard address.
            log_dir: The log directory. E.g., /tmp/session_latest/logs.
            logging_level: The logging level (e.g. logging.INFO, logging.DEBUG)
            logging_format: The format string for log messages
            logging_filename: The name of the log file
            logging_rotate_bytes: Max size in bytes before rotating log file
            logging_rotate_backup_count: Number of backup files to keep when rotating
            temp_dir: The temp directory. E.g., /tmp.
            session_dir: The session directory. E.g., tmp/session_latest.
            minimal: Whether or not it will load the minimal modules.
            serve_frontend: If configured, frontend HTML is
                served from the dashboard.
            modules_to_load: A set of module names (class names) to load.
                By default (None), it loads all available modules.
                Note that available modules could be changed depending on
                minimal flags.
        """
        self.minimal = minimal
        self.serve_frontend = serve_frontend
        # If it is the minimal mode, we shouldn't serve frontend.
        if self.minimal:
            self.serve_frontend = False
        # Public attributes are accessible for all head modules.
        # Workaround for issue: https://github.com/ray-project/ray/issues/7084
        self.http_host = "127.0.0.1" if http_host == "localhost" else http_host
        self.http_port = http_port
        self.http_port_retries = http_port_retries
        self._modules_to_load = modules_to_load
        self._modules_loaded = False
        self.metrics = None

        self._executor = ThreadPoolExecutor(
            max_workers=RAY_DASHBOARD_DASHBOARD_HEAD_TPE_MAX_WORKERS,
            thread_name_prefix="dashboard_head_executor",
        )

        assert gcs_address is not None
        self.gcs_address = gcs_address
        self.cluster_id_hex = cluster_id_hex
        self.log_dir = log_dir
        self.logging_level = logging_level
        self.logging_format = logging_format
        self.logging_filename = logging_filename
        self.logging_rotate_bytes = logging_rotate_bytes
        self.logging_rotate_backup_count = logging_rotate_backup_count
        self.temp_dir = temp_dir
        self.session_dir = session_dir
        self.session_name = Path(session_dir).name
        self.gcs_error_subscriber = None
        self.gcs_log_subscriber = None
        self.ip = node_ip_address
        self.pid = os.getpid()
        self.dashboard_proc = psutil.Process()

        # Max event-loop lag (seconds) observed since the last metrics export.
        # Updated by the loop-lag callback installed in `run()` (non-minimal
        # mode only) and reset after each export.
        self._event_loop_lag_s_max: Optional[float] = None
        # pid -> psutil.Process for subprocess modules. The objects are reused
        # across exports because psutil's non-blocking cpu_percent() is measured
        # relative to the previous call on the *same* Process object. A fresh
        # object returns 0.0 on its first call.
        self._subprocess_procs = {}

        # If the dashboard is started as non-minimal version, http server should
        # be configured to expose APIs.
        self.http_server = None

    async def _configure_http_server(
        self,
        dashboard_head_modules: List[DashboardHeadModule],
        subprocess_module_handles: List["SubprocessModuleHandle"],
    ):
        """Create the HTTP server and start serving the given modules' routes."""
        from ray.dashboard.http_server_head import HttpServerDashboardHead

        self.http_server = HttpServerDashboardHead(
            self.ip,
            self.http_host,
            self.http_port,
            self.http_port_retries,
            self.gcs_address,
            self.session_name,
            self.metrics,
        )
        await self.http_server.run(dashboard_head_modules, subprocess_module_handles)

    @property
    def http_session(self):
        """The HTTP server's session, or None while the dashboard is starting."""
        if not self._modules_loaded and not self.http_server:
            # When the dashboard is still starting up, this property gets
            # called as part of the method_route_table_factory magic. In
            # this case, the property is not actually used but the magic
            # method calls every property to look for a route to add to
            # the global route table. It should be okay for http_server
            # to still be None at this point.
            return None
        assert self.http_server, "Accessing unsupported API in a minimal ray."
        return self.http_server.http_session

    @async_loop_forever(dashboard_consts.GCS_CHECK_ALIVE_INTERVAL_SECONDS)
    async def _gcs_check_alive(self):
        """Periodically ping the GCS. Failures are logged and retried."""
        try:
            # If gcs is permanently dead, gcs client will exit the process
            # (see gcs_rpc_client.h)
            await self.gcs_client.async_check_alive(node_ids=[], timeout=None)
        except Exception:
            logger.warning("Failed to check gcs aliveness, will retry", exc_info=True)

    def _load_modules(
        self, modules_to_load: Optional[Set[str]] = None
    ) -> Tuple[List[DashboardHeadModule], List["SubprocessModuleHandle"]]:
        """
        If minimal, only load DashboardHeadModule.
        If non-minimal, load both kinds of modules: DashboardHeadModule, SubprocessModule.
        If modules_to_load is not None, only load the modules in the set.
        """
        dashboard_head_modules = self._load_dashboard_head_modules(modules_to_load)
        subprocess_module_handles = self._load_subprocess_module_handles(
            modules_to_load
        )

        all_names = {type(m).__name__ for m in dashboard_head_modules} | {
            h.module_cls.__name__ for h in subprocess_module_handles
        }
        assert len(all_names) == len(dashboard_head_modules) + len(
            subprocess_module_handles
        ), "Duplicate module names. A module name can't be a DashboardHeadModule and a SubprocessModule at the same time."

        # Verify modules are loaded as expected.
        if modules_to_load is not None:
            assert all_names == modules_to_load, (
                f"Actual loaded modules {all_names}, doesn't match the requested modules "
                f"to load, {modules_to_load}."
            )

        self._modules_loaded = True
        return dashboard_head_modules, subprocess_module_handles

    def _load_dashboard_head_modules(
        self, modules_to_load: Optional[Set[str]] = None
    ) -> List[DashboardHeadModule]:
        """Load `DashboardHeadModule`s.

        Args:
            modules_to_load: A set of module class names to load. By default
                (None), it loads all modules.
        """
        modules = []
        head_cls_list = dashboard_utils.get_all_modules(DashboardHeadModule)

        config = DashboardHeadModuleConfig(
            minimal=self.minimal,
            cluster_id_hex=self.cluster_id_hex,
            session_name=self.session_name,
            gcs_address=self.gcs_address,
            log_dir=self.log_dir,
            temp_dir=self.temp_dir,
            session_dir=self.session_dir,
            ip=self.ip,
            http_host=self.http_host,
            http_port=self.http_port,
        )

        # Select modules to load.
        if modules_to_load is not None:
            head_cls_list = [
                cls for cls in head_cls_list if cls.__name__ in modules_to_load
            ]

        logger.info(f"DashboardHeadModules to load: {modules_to_load}.")

        for cls in head_cls_list:
            logger.info(f"Loading {DashboardHeadModule.__name__}: {cls}.")
            modules.append(cls(config))

        logger.info(f"Loaded {len(modules)} dashboard head modules: {modules}.")
        return modules

    def _load_subprocess_module_handles(
        self, modules_to_load: Optional[Set[str]] = None
    ) -> List["SubprocessModuleHandle"]:
        """
        If minimal, return an empty list.
        If non-minimal, load `SubprocessModule`s by creating Handles to them.

        Args:
            modules_to_load: A set of module class names to load. By default
                (None), it loads all modules.
        """
        if self.minimal:
            logger.info("Subprocess modules not loaded in minimal mode.")
            return []

        from ray.dashboard.subprocesses.handle import SubprocessModuleHandle
        from ray.dashboard.subprocesses.module import (
            SubprocessModule,
            SubprocessModuleConfig,
        )

        handles = []
        subprocess_cls_list = dashboard_utils.get_all_modules(SubprocessModule)

        loop = ray._common.utils.get_or_create_event_loop()
        config = SubprocessModuleConfig(
            cluster_id_hex=self.cluster_id_hex,
            gcs_address=self.gcs_address,
            session_name=self.session_name,
            temp_dir=self.temp_dir,
            session_dir=self.session_dir,
            logging_level=self.logging_level,
            logging_format=self.logging_format,
            log_dir=self.log_dir,
            logging_filename=self.logging_filename,
            logging_rotate_bytes=self.logging_rotate_bytes,
            logging_rotate_backup_count=self.logging_rotate_backup_count,
            socket_dir=str(Path(self.session_dir) / "sockets"),
        )

        # Select modules to load.
        if modules_to_load is not None:
            subprocess_cls_list = [
                cls for cls in subprocess_cls_list if cls.__name__ in modules_to_load
            ]

        for cls in subprocess_cls_list:
            logger.info(f"Loading {SubprocessModule.__name__}: {cls}.")
            handles.append(SubprocessModuleHandle(loop, cls, config))

        logger.info(f"Loaded {len(handles)} subprocess modules: {handles}.")
        return handles

    async def _setup_metrics(self, gcs_client):
        """Create the dashboard metrics and publish/start their export server.

        The metrics address is always written to the GCS internal KV. The
        Prometheus HTTP server is only started when `prometheus_client` is
        importable. Failures to start it are logged, not raised.
        """
        metrics = DashboardPrometheusMetrics()

        # Setup prometheus metrics export server
        assert internal_kv._internal_kv_initialized()
        assert gcs_client is not None
        address = build_address(self.ip, DASHBOARD_METRIC_PORT)
        await gcs_client.async_internal_kv_put(
            "DashboardMetricsAddress".encode(), address.encode(), True, namespace=None
        )
        if prometheus_client:
            try:
                logger.info(
                    f"Starting dashboard metrics server on port {DASHBOARD_METRIC_PORT}"
                )
                # Only bind to loopback when the node itself is loopback-only.
                kwargs = {"addr": "127.0.0.1"} if self.ip == "127.0.0.1" else {}
                prometheus_client.start_http_server(
                    port=DASHBOARD_METRIC_PORT,
                    registry=metrics.registry,
                    **kwargs,
                )
            except Exception:
                logger.exception(
                    "An exception occurred while starting the metrics server."
                )
        else:
            logger.warning(
                "`prometheus_client` not found, so metrics will not be exported."
            )

        return metrics

    @async_loop_forever(dashboard_consts.METRICS_RECORD_INTERVAL_S)
    async def _record_dashboard_metrics(
        self, subprocess_module_handles: List["SubprocessModuleHandle"]
    ):
        """Export CPU/memory, event-loop task count and loop-lag metrics.

        Runs every METRICS_RECORD_INTERVAL_S seconds. It is started from
        `run()` only in non-minimal mode.
        """
        labels = {
            "ip": self.ip,
            "pid": self.pid,
            "Version": ray.__version__,
            "Component": "dashboard",
            "SessionName": self.session_name,
        }
        assert "dashboard" in AVAILABLE_COMPONENT_NAMES_FOR_METRICS
        self._record_cpu_mem_metrics_for_proc(self.dashboard_proc)

        live_procs = {}
        for handle in subprocess_module_handles:
            assert handle.process is not None
            pid = handle.process.pid
            module_name = handle.module_cls.__name__
            # Reuse the Process object from the previous export so cpu_percent()
            # covers the interval since then. A new object reports 0.0.
            proc = self._subprocess_procs.get(pid)
            try:
                if proc is None:
                    proc = psutil.Process(pid)
                self._record_cpu_mem_metrics_for_proc(proc, module_name)
            except psutil.NoSuchProcess:
                # The subprocess has exited. Skip it so the remaining metrics
                # for this round are still exported.
                logger.warning(
                    f"Subprocess module {module_name} (pid {pid}) no longer "
                    "exists; skipping its CPU/memory metrics."
                )
                continue
            live_procs[pid] = proc
        # Keep only processes that still belong to a module (drops stale pids).
        self._subprocess_procs = live_procs

        loop = ray._common.utils.get_or_create_event_loop()

        self.metrics.metrics_event_loop_tasks.labels(**labels).set(
            len(asyncio.all_tasks(loop))
        )

        # Report the max lag since the last export, if any.
        if self._event_loop_lag_s_max is not None:
            self.metrics.metrics_event_loop_lag.labels(**labels).set(
                float(self._event_loop_lag_s_max)
            )
            self._event_loop_lag_s_max = None

    def _record_cpu_mem_metrics_for_proc(
        self, proc: psutil.Process, module_name: str = ""
    ):
        """Record CPU percent and USS/RSS (in MB) for `proc`.

        Args:
            proc: The process to sample.
            module_name: Subprocess module class name. Empty for the dashboard
                process itself. Selects the "Component" label.
        """
        labels = {
            "ip": self.ip,
            "pid": proc.pid,
            "Version": ray.__version__,
            "Component": "dashboard" if not module_name else "dashboard_" + module_name,
            "SessionName": self.session_name,
        }
        proc_attrs = proc.as_dict(attrs=["cpu_percent", "memory_full_info"])
        # as_dict() stores None for attributes it could not read
        # (AccessDenied / zombie), so both values must be None-checked.
        cpu_percent = proc_attrs.get("cpu_percent")
        if cpu_percent is not None:
            self.metrics.metrics_dashboard_cpu.labels(**labels).set(
                float(cpu_percent)
            )
        # memory_full_info is None on Mac due to the permission issue
        # (https://github.com/giampaolo/psutil/issues/883)
        mem_info = proc_attrs.get("memory_full_info")
        if mem_info is not None:
            self.metrics.metrics_dashboard_mem_uss.labels(**labels).set(
                float(mem_info.uss) / 1.0e6
            )
            self.metrics.metrics_dashboard_mem_rss.labels(**labels).set(
                float(mem_info.rss) / 1.0e6
            )

    async def run(self):
        """Start the dashboard head and run until its modules finish.

        The steps are:
        1. Connect to the GCS and load and start all modules.
        2. In non-minimal mode, set up metrics.
        3. If serve_frontend is set, start the HTTP server.
        4. Publish the dashboard address to the GCS KV.
        5. Run the GCS liveness check and all head modules concurrently.

        The HTTP server (if any) is cleaned up on exit, including on error or
        cancellation.
        """
        gcs_address = self.gcs_address

        # Dashboard will handle connection failure automatically
        self.gcs_client = GcsClient(address=gcs_address, cluster_id=self.cluster_id_hex)
        internal_kv._initialize_internal_kv(self.gcs_client)

        dashboard_head_modules, subprocess_module_handles = self._load_modules(
            self._modules_to_load
        )
        # Parallel start all subprocess modules.
        for handle in subprocess_module_handles:
            handle.start_module()
        # Wait for all subprocess modules to be ready.
        for handle in subprocess_module_handles:
            handle.wait_for_module_ready()

        if not self.minimal:
            self.metrics = await self._setup_metrics(self.gcs_client)
            self._event_loop_lag_s_max = None

            def on_new_lag(lag_s):
                # Record the lag. It's exported in `record_dashboard_metrics`
                self._event_loop_lag_s_max = max(self._event_loop_lag_s_max or 0, lag_s)

            enable_monitor_loop_lag(on_new_lag)

            self.record_dashboard_metrics_task = asyncio.create_task(
                self._record_dashboard_metrics(subprocess_module_handles)
            )

        try:
            assert internal_kv._internal_kv_initialized()
            # Note: We always record the usage, but it is not reported
            # if the usage stats is disabled.
            record_extra_usage_tag(TagKey.DASHBOARD_USED, "False")
        except Exception as e:
            logger.warning(
                "Failed to record the dashboard usage. "
                "This error message is harmless and can be ignored. "
                f"Error: {e}"
            )

        http_host, http_port = self.http_host, self.http_port
        if self.serve_frontend:
            logger.info("Initialize the http server.")
            await self._configure_http_server(
                dashboard_head_modules, subprocess_module_handles
            )
            http_host, http_port = self.http_server.get_address()
            logger.info(
                f"http server initialized at {build_address(http_host, http_port)}"
            )
        else:
            logger.info("http server disabled.")

        # Advertise this node's IP instead of the configured host whenever the
        # configured host differs from the default dashboard IP (e.g. when
        # listening on all interfaces), so other nodes can reach it.
        dashboard_http_host = (
            self.ip
            if self.http_host != ray_constants.DEFAULT_DASHBOARD_IP
            else http_host
        )

        # This synchronous code inside an async context is not great.
        # It is however acceptable, because this only gets run once
        # during initialization and therefore cannot block the event loop.
        # This could be done better in the future, including
        # removing the polling on the Ray side, by communicating the
        # server address to Ray via stdin / stdout or a pipe.
        self.gcs_client.internal_kv_put(
            ray_constants.DASHBOARD_ADDRESS.encode(),
            build_address(dashboard_http_host, http_port).encode(),
            True,
            namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
        )

        concurrent_tasks = [self._gcs_check_alive()]
        concurrent_tasks.extend(m.run() for m in dashboard_head_modules)
        try:
            await asyncio.gather(*concurrent_tasks)
        finally:
            # Release the HTTP server even when a module raises or run() is
            # cancelled, not only on normal completion.
            if self.http_server:
                await self.http_server.cleanup()
|