head.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474
  1. import asyncio
  2. import logging
  3. import os
  4. from concurrent.futures import ThreadPoolExecutor
  5. from pathlib import Path
  6. from typing import TYPE_CHECKING, List, Optional, Set, Tuple
  7. import ray
  8. import ray.dashboard.consts as dashboard_consts
  9. import ray.dashboard.utils as dashboard_utils
  10. import ray.experimental.internal_kv as internal_kv
  11. from ray._common.network_utils import build_address
  12. from ray._common.usage.usage_lib import TagKey, record_extra_usage_tag
  13. from ray._private import ray_constants
  14. from ray._private.async_utils import enable_monitor_loop_lag
  15. from ray._private.ray_constants import env_integer
  16. from ray._raylet import GcsClient
  17. from ray.dashboard.consts import (
  18. AVAILABLE_COMPONENT_NAMES_FOR_METRICS,
  19. DASHBOARD_METRIC_PORT,
  20. )
  21. from ray.dashboard.dashboard_metrics import DashboardPrometheusMetrics
  22. from ray.dashboard.utils import (
  23. DashboardHeadModule,
  24. DashboardHeadModuleConfig,
  25. async_loop_forever,
  26. )
  27. import psutil
  28. try:
  29. import prometheus_client
  30. except ImportError:
  31. prometheus_client = None
  32. if TYPE_CHECKING:
  33. from ray.dashboard.subprocesses.handle import SubprocessModuleHandle
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# NOTE: Executor in this head is intentionally constrained to just 1 thread by
#       default to limit its concurrency, therefore reducing potential for
#       GIL contention.
#       The value is resolved via `env_integer` with a default of 1; presumably
#       it can be overridden through the environment variable of the same
#       name -- confirm against `ray_constants.env_integer`.
RAY_DASHBOARD_DASHBOARD_HEAD_TPE_MAX_WORKERS = env_integer(
    "RAY_DASHBOARD_DASHBOARD_HEAD_TPE_MAX_WORKERS", 1
)
  41. class DashboardHead:
  42. def __init__(
  43. self,
  44. http_host: str,
  45. http_port: int,
  46. http_port_retries: int,
  47. gcs_address: str,
  48. cluster_id_hex: str,
  49. node_ip_address: str,
  50. log_dir: str,
  51. logging_level: int,
  52. logging_format: str,
  53. logging_filename: str,
  54. logging_rotate_bytes: int,
  55. logging_rotate_backup_count: int,
  56. temp_dir: str,
  57. session_dir: str,
  58. minimal: bool,
  59. serve_frontend: bool,
  60. modules_to_load: Optional[Set[str]] = None,
  61. ):
  62. """
  63. Args:
  64. http_host: The host address for the Http server.
  65. http_port: The port for the Http server.
  66. http_port_retries: The maximum retry to bind ports for the Http server.
  67. gcs_address: The GCS address in the {address}:{port} format.
  68. log_dir: The log directory. E.g., /tmp/session_latest/logs.
  69. logging_level: The logging level (e.g. logging.INFO, logging.DEBUG)
  70. logging_format: The format string for log messages
  71. logging_filename: The name of the log file
  72. logging_rotate_bytes: Max size in bytes before rotating log file
  73. logging_rotate_backup_count: Number of backup files to keep when rotating
  74. temp_dir: The temp directory. E.g., /tmp.
  75. session_dir: The session directory. E.g., tmp/session_latest.
  76. minimal: Whether or not it will load the minimal modules.
  77. serve_frontend: If configured, frontend HTML is
  78. served from the dashboard.
  79. modules_to_load: A set of module name in string to load.
  80. By default (None), it loads all available modules.
  81. Note that available modules could be changed depending on
  82. minimal flags.
  83. """
  84. self.minimal = minimal
  85. self.serve_frontend = serve_frontend
  86. # If it is the minimal mode, we shouldn't serve frontend.
  87. if self.minimal:
  88. self.serve_frontend = False
  89. # Public attributes are accessible for all head modules.
  90. # Walkaround for issue: https://github.com/ray-project/ray/issues/7084
  91. self.http_host = "127.0.0.1" if http_host == "localhost" else http_host
  92. self.http_port = http_port
  93. self.http_port_retries = http_port_retries
  94. self._modules_to_load = modules_to_load
  95. self._modules_loaded = False
  96. self.metrics = None
  97. self._executor = ThreadPoolExecutor(
  98. max_workers=RAY_DASHBOARD_DASHBOARD_HEAD_TPE_MAX_WORKERS,
  99. thread_name_prefix="dashboard_head_executor",
  100. )
  101. assert gcs_address is not None
  102. self.gcs_address = gcs_address
  103. self.cluster_id_hex = cluster_id_hex
  104. self.log_dir = log_dir
  105. self.logging_level = logging_level
  106. self.logging_format = logging_format
  107. self.logging_filename = logging_filename
  108. self.logging_rotate_bytes = logging_rotate_bytes
  109. self.logging_rotate_backup_count = logging_rotate_backup_count
  110. self.temp_dir = temp_dir
  111. self.session_dir = session_dir
  112. self.session_name = Path(session_dir).name
  113. self.gcs_error_subscriber = None
  114. self.gcs_log_subscriber = None
  115. self.ip = node_ip_address
  116. self.pid = os.getpid()
  117. self.dashboard_proc = psutil.Process()
  118. # If the dashboard is started as non-minimal version, http server should
  119. # be configured to expose APIs.
  120. self.http_server = None
  121. async def _configure_http_server(
  122. self,
  123. dashboard_head_modules: List[DashboardHeadModule],
  124. subprocess_module_handles: List["SubprocessModuleHandle"],
  125. ):
  126. from ray.dashboard.http_server_head import HttpServerDashboardHead
  127. self.http_server = HttpServerDashboardHead(
  128. self.ip,
  129. self.http_host,
  130. self.http_port,
  131. self.http_port_retries,
  132. self.gcs_address,
  133. self.session_name,
  134. self.metrics,
  135. )
  136. await self.http_server.run(dashboard_head_modules, subprocess_module_handles)
  137. @property
  138. def http_session(self):
  139. if not self._modules_loaded and not self.http_server:
  140. # When the dashboard is still starting up, this property gets
  141. # called as part of the method_route_table_factory magic. In
  142. # this case, the property is not actually used but the magic
  143. # method calls every property to look for a route to add to
  144. # the global route table. It should be okay for http_server
  145. # to still be None at this point.
  146. return None
  147. assert self.http_server, "Accessing unsupported API in a minimal ray."
  148. return self.http_server.http_session
  149. @async_loop_forever(dashboard_consts.GCS_CHECK_ALIVE_INTERVAL_SECONDS)
  150. async def _gcs_check_alive(self):
  151. try:
  152. # If gcs is permanently dead, gcs client will exit the process
  153. # (see gcs_rpc_client.h)
  154. await self.gcs_client.async_check_alive(node_ids=[], timeout=None)
  155. except Exception:
  156. logger.warning("Failed to check gcs aliveness, will retry", exc_info=True)
  157. def _load_modules(
  158. self, modules_to_load: Optional[Set[str]] = None
  159. ) -> Tuple[List[DashboardHeadModule], List["SubprocessModuleHandle"]]:
  160. """
  161. If minimal, only load DashboardHeadModule.
  162. If non-minimal, load both kinds of modules: DashboardHeadModule, SubprocessModule.
  163. If modules_to_load is not None, only load the modules in the set.
  164. """
  165. dashboard_head_modules = self._load_dashboard_head_modules(modules_to_load)
  166. subprocess_module_handles = self._load_subprocess_module_handles(
  167. modules_to_load
  168. )
  169. all_names = {type(m).__name__ for m in dashboard_head_modules} | {
  170. h.module_cls.__name__ for h in subprocess_module_handles
  171. }
  172. assert len(all_names) == len(dashboard_head_modules) + len(
  173. subprocess_module_handles
  174. ), "Duplicate module names. A module name can't be a DashboardHeadModule and a SubprocessModule at the same time."
  175. # Verify modules are loaded as expected.
  176. if modules_to_load is not None and all_names != modules_to_load:
  177. assert False, (
  178. f"Actual loaded modules {all_names}, doesn't match the requested modules "
  179. f"to load, {modules_to_load}."
  180. )
  181. self._modules_loaded = True
  182. return dashboard_head_modules, subprocess_module_handles
  183. def _load_dashboard_head_modules(
  184. self, modules_to_load: Optional[Set[str]] = None
  185. ) -> List[DashboardHeadModule]:
  186. """Load `DashboardHeadModule`s.
  187. Args:
  188. modules: A list of module names to load. By default (None),
  189. it loads all modules.
  190. """
  191. modules = []
  192. head_cls_list = dashboard_utils.get_all_modules(DashboardHeadModule)
  193. config = DashboardHeadModuleConfig(
  194. minimal=self.minimal,
  195. cluster_id_hex=self.cluster_id_hex,
  196. session_name=self.session_name,
  197. gcs_address=self.gcs_address,
  198. log_dir=self.log_dir,
  199. temp_dir=self.temp_dir,
  200. session_dir=self.session_dir,
  201. ip=self.ip,
  202. http_host=self.http_host,
  203. http_port=self.http_port,
  204. )
  205. # Select modules to load.
  206. if modules_to_load is not None:
  207. head_cls_list = [
  208. cls for cls in head_cls_list if cls.__name__ in modules_to_load
  209. ]
  210. logger.info(f"DashboardHeadModules to load: {modules_to_load}.")
  211. for cls in head_cls_list:
  212. logger.info(f"Loading {DashboardHeadModule.__name__}: {cls}.")
  213. c = cls(config)
  214. modules.append(c)
  215. logger.info(f"Loaded {len(modules)} dashboard head modules: {modules}.")
  216. return modules
  217. def _load_subprocess_module_handles(
  218. self, modules_to_load: Optional[Set[str]] = None
  219. ) -> List["SubprocessModuleHandle"]:
  220. """
  221. If minimal, return an empty list.
  222. If non-minimal, load `SubprocessModule`s by creating Handles to them.
  223. Args:
  224. modules: A list of module names to load. By default (None),
  225. it loads all modules.
  226. """
  227. if self.minimal:
  228. logger.info("Subprocess modules not loaded in minimal mode.")
  229. return []
  230. from ray.dashboard.subprocesses.handle import SubprocessModuleHandle
  231. from ray.dashboard.subprocesses.module import (
  232. SubprocessModule,
  233. SubprocessModuleConfig,
  234. )
  235. handles = []
  236. subprocess_cls_list = dashboard_utils.get_all_modules(SubprocessModule)
  237. loop = ray._common.utils.get_or_create_event_loop()
  238. config = SubprocessModuleConfig(
  239. cluster_id_hex=self.cluster_id_hex,
  240. gcs_address=self.gcs_address,
  241. session_name=self.session_name,
  242. temp_dir=self.temp_dir,
  243. session_dir=self.session_dir,
  244. logging_level=self.logging_level,
  245. logging_format=self.logging_format,
  246. log_dir=self.log_dir,
  247. logging_filename=self.logging_filename,
  248. logging_rotate_bytes=self.logging_rotate_bytes,
  249. logging_rotate_backup_count=self.logging_rotate_backup_count,
  250. socket_dir=str(Path(self.session_dir) / "sockets"),
  251. )
  252. # Select modules to load.
  253. if modules_to_load is not None:
  254. subprocess_cls_list = [
  255. cls for cls in subprocess_cls_list if cls.__name__ in modules_to_load
  256. ]
  257. for cls in subprocess_cls_list:
  258. logger.info(f"Loading {SubprocessModule.__name__}: {cls}.")
  259. handle = SubprocessModuleHandle(loop, cls, config)
  260. handles.append(handle)
  261. logger.info(f"Loaded {len(handles)} subprocess modules: {handles}.")
  262. return handles
  263. async def _setup_metrics(self, gcs_client):
  264. metrics = DashboardPrometheusMetrics()
  265. # Setup prometheus metrics export server
  266. assert internal_kv._internal_kv_initialized()
  267. assert gcs_client is not None
  268. address = build_address(self.ip, DASHBOARD_METRIC_PORT)
  269. await gcs_client.async_internal_kv_put(
  270. "DashboardMetricsAddress".encode(), address.encode(), True, namespace=None
  271. )
  272. if prometheus_client:
  273. try:
  274. logger.info(
  275. "Starting dashboard metrics server on port {}".format(
  276. DASHBOARD_METRIC_PORT
  277. )
  278. )
  279. kwargs = {"addr": "127.0.0.1"} if self.ip == "127.0.0.1" else {}
  280. prometheus_client.start_http_server(
  281. port=DASHBOARD_METRIC_PORT,
  282. registry=metrics.registry,
  283. **kwargs,
  284. )
  285. except Exception:
  286. logger.exception(
  287. "An exception occurred while starting the metrics server."
  288. )
  289. elif not prometheus_client:
  290. logger.warning(
  291. "`prometheus_client` not found, so metrics will not be exported."
  292. )
  293. return metrics
  294. @dashboard_utils.async_loop_forever(dashboard_consts.METRICS_RECORD_INTERVAL_S)
  295. async def _record_dashboard_metrics(
  296. self, subprocess_module_handles: List["SubprocessModuleHandle"]
  297. ):
  298. labels = {
  299. "ip": self.ip,
  300. "pid": self.pid,
  301. "Version": ray.__version__,
  302. "Component": "dashboard",
  303. "SessionName": self.session_name,
  304. }
  305. assert "dashboard" in AVAILABLE_COMPONENT_NAMES_FOR_METRICS
  306. self._record_cpu_mem_metrics_for_proc(self.dashboard_proc)
  307. for subprocess_module_handle in subprocess_module_handles:
  308. assert subprocess_module_handle.process is not None
  309. proc = psutil.Process(subprocess_module_handle.process.pid)
  310. self._record_cpu_mem_metrics_for_proc(
  311. proc, subprocess_module_handle.module_cls.__name__
  312. )
  313. loop = ray._common.utils.get_or_create_event_loop()
  314. self.metrics.metrics_event_loop_tasks.labels(**labels).set(
  315. len(asyncio.all_tasks(loop))
  316. )
  317. # Report the max lag since the last export, if any.
  318. if self._event_loop_lag_s_max is not None:
  319. self.metrics.metrics_event_loop_lag.labels(**labels).set(
  320. float(self._event_loop_lag_s_max)
  321. )
  322. self._event_loop_lag_s_max = None
  323. def _record_cpu_mem_metrics_for_proc(
  324. self, proc: psutil.Process, module_name: str = ""
  325. ):
  326. labels = {
  327. "ip": self.ip,
  328. "pid": proc.pid,
  329. "Version": ray.__version__,
  330. "Component": "dashboard" if not module_name else "dashboard_" + module_name,
  331. "SessionName": self.session_name,
  332. }
  333. proc_attrs = proc.as_dict(attrs=["cpu_percent", "memory_full_info"])
  334. self.metrics.metrics_dashboard_cpu.labels(**labels).set(
  335. float(proc_attrs.get("cpu_percent", 0.0))
  336. )
  337. # memory_full_info is None on Mac due to the permission issue
  338. # (https://github.com/giampaolo/psutil/issues/883)
  339. if proc_attrs.get("memory_full_info") is not None:
  340. self.metrics.metrics_dashboard_mem_uss.labels(**labels).set(
  341. float(proc_attrs.get("memory_full_info").uss) / 1.0e6
  342. )
  343. self.metrics.metrics_dashboard_mem_rss.labels(**labels).set(
  344. float(proc_attrs.get("memory_full_info").rss) / 1.0e6
  345. )
  346. async def run(self):
  347. gcs_address = self.gcs_address
  348. # Dashboard will handle connection failure automatically
  349. self.gcs_client = GcsClient(address=gcs_address, cluster_id=self.cluster_id_hex)
  350. internal_kv._initialize_internal_kv(self.gcs_client)
  351. dashboard_head_modules, subprocess_module_handles = self._load_modules(
  352. self._modules_to_load
  353. )
  354. # Parallel start all subprocess modules.
  355. for handle in subprocess_module_handles:
  356. handle.start_module()
  357. # Wait for all subprocess modules to be ready.
  358. for handle in subprocess_module_handles:
  359. handle.wait_for_module_ready()
  360. if not self.minimal:
  361. self.metrics = await self._setup_metrics(self.gcs_client)
  362. self._event_loop_lag_s_max: Optional[float] = None
  363. def on_new_lag(lag_s):
  364. # Record the lag. It's exported in `record_dashboard_metrics`
  365. self._event_loop_lag_s_max = max(self._event_loop_lag_s_max or 0, lag_s)
  366. enable_monitor_loop_lag(on_new_lag)
  367. self.record_dashboard_metrics_task = asyncio.create_task(
  368. self._record_dashboard_metrics(subprocess_module_handles)
  369. )
  370. try:
  371. assert internal_kv._internal_kv_initialized()
  372. # Note: We always record the usage, but it is not reported
  373. # if the usage stats is disabled.
  374. record_extra_usage_tag(TagKey.DASHBOARD_USED, "False")
  375. except Exception as e:
  376. logger.warning(
  377. "Failed to record the dashboard usage. "
  378. "This error message is harmless and can be ignored. "
  379. f"Error: {e}"
  380. )
  381. http_host, http_port = self.http_host, self.http_port
  382. if self.serve_frontend:
  383. logger.info("Initialize the http server.")
  384. await self._configure_http_server(
  385. dashboard_head_modules, subprocess_module_handles
  386. )
  387. http_host, http_port = self.http_server.get_address()
  388. logger.info(
  389. f"http server initialized at {build_address(http_host, http_port)}"
  390. )
  391. else:
  392. logger.info("http server disabled.")
  393. # We need to expose dashboard's node's ip for other worker nodes
  394. # if it's listening to all interfaces.
  395. dashboard_http_host = (
  396. self.ip
  397. if self.http_host != ray_constants.DEFAULT_DASHBOARD_IP
  398. else http_host
  399. )
  400. # This synchronous code inside an async context is not great.
  401. # It is however acceptable, because this only gets run once
  402. # during initialization and therefore cannot block the event loop.
  403. # This could be done better in the future, including
  404. # removing the polling on the Ray side, by communicating the
  405. # server address to Ray via stdin / stdout or a pipe.
  406. self.gcs_client.internal_kv_put(
  407. ray_constants.DASHBOARD_ADDRESS.encode(),
  408. build_address(dashboard_http_host, http_port).encode(),
  409. True,
  410. namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
  411. )
  412. concurrent_tasks = [
  413. self._gcs_check_alive(),
  414. ]
  415. for m in dashboard_head_modules:
  416. concurrent_tasks.append(m.run())
  417. await asyncio.gather(*concurrent_tasks)
  418. if self.http_server:
  419. await self.http_server.cleanup()