monitor.py 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736
  1. """Autoscaler monitoring loop daemon."""
  2. import argparse
  3. import json
  4. import logging
  5. import os
  6. import signal
  7. import sys
  8. import time
  9. import traceback
  10. from collections import Counter
  11. from dataclasses import asdict
  12. from typing import Any, Callable, Dict, Optional, Union
  13. import ray
  14. import ray._private.ray_constants as ray_constants
  15. from ray._common.network_utils import build_address, parse_address
  16. from ray._common.ray_constants import (
  17. LOGGING_ROTATE_BACKUP_COUNT,
  18. LOGGING_ROTATE_BYTES,
  19. )
  20. from ray._private import logging_utils
  21. from ray._private.event.event_logger import get_event_logger
  22. from ray._private.ray_logging import setup_component_logger
  23. from ray._raylet import GcsClient
  24. from ray.autoscaler._private.autoscaler import StandardAutoscaler
  25. from ray.autoscaler._private.commands import teardown_cluster
  26. from ray.autoscaler._private.constants import (
  27. AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE,
  28. AUTOSCALER_METRIC_PORT,
  29. AUTOSCALER_UPDATE_INTERVAL_S,
  30. DISABLE_LAUNCH_CONFIG_CHECK_KEY,
  31. )
  32. from ray.autoscaler._private.event_summarizer import EventSummarizer
  33. from ray.autoscaler._private.load_metrics import LoadMetrics
  34. from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
  35. from ray.autoscaler._private.util import format_readonly_node_type
  36. from ray.autoscaler.v2.sdk import get_cluster_resource_state
  37. from ray.core.generated import gcs_pb2
  38. from ray.core.generated.event_pb2 import Event as RayEvent
  39. from ray.experimental.internal_kv import (
  40. _initialize_internal_kv,
  41. _internal_kv_del,
  42. _internal_kv_get,
  43. _internal_kv_initialized,
  44. _internal_kv_put,
  45. )
  46. try:
  47. import prometheus_client
  48. except ImportError:
  49. prometheus_client = None
  50. logger = logging.getLogger(__name__)
  51. def parse_resource_demands(resource_load_by_shape):
  52. """Handle the message.resource_load_by_shape protobuf for the demand
  53. based autoscaling. Catch and log all exceptions so this doesn't
  54. interfere with the utilization based autoscaler until we're confident
  55. this is stable. Worker queue backlogs are added to the appropriate
  56. resource demand vector.
  57. Args:
  58. resource_load_by_shape (pb2.gcs.ResourceLoad): The resource demands
  59. in protobuf form or None.
  60. Returns:
  61. List[ResourceDict]: Waiting bundles (ready and feasible).
  62. List[ResourceDict]: Infeasible bundles.
  63. """
  64. waiting_bundles, infeasible_bundles = [], []
  65. try:
  66. for resource_demand_pb in list(resource_load_by_shape.resource_demands):
  67. request_shape = dict(resource_demand_pb.shape)
  68. for _ in range(resource_demand_pb.num_ready_requests_queued):
  69. waiting_bundles.append(request_shape)
  70. for _ in range(resource_demand_pb.num_infeasible_requests_queued):
  71. infeasible_bundles.append(request_shape)
  72. # Infeasible and ready states for tasks are (logically)
  73. # mutually exclusive.
  74. if resource_demand_pb.num_infeasible_requests_queued > 0:
  75. backlog_queue = infeasible_bundles
  76. else:
  77. backlog_queue = waiting_bundles
  78. for _ in range(resource_demand_pb.backlog_size):
  79. backlog_queue.append(request_shape)
  80. if (
  81. len(waiting_bundles + infeasible_bundles)
  82. > AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE
  83. ):
  84. break
  85. except Exception:
  86. logger.exception("Failed to parse resource demands.")
  87. return waiting_bundles, infeasible_bundles
# Readonly provider config (e.g., for laptop mode, manually setup clusters).
# Monitor falls back to this config when it is started without an
# autoscaling config. In that mode the monitor adds one entry per
# GCS-reported node to the "available_node_types" of its readonly config.
BASE_READONLY_CONFIG = {
    "cluster_name": "default",
    "max_workers": 0,
    "upscaling_speed": 1.0,
    "docker": {},
    "idle_timeout_minutes": 0,
    "provider": {
        "type": "readonly",
        "use_node_id_as_ip": True,  # For emulated multi-node on laptop.
        DISABLE_LAUNCH_CONFIG_CHECK_KEY: True,  # No launch check.
    },
    "auth": {},
    # Starts with a single head node type that declares no resources.
    "available_node_types": {
        "ray.head.default": {"resources": {}, "node_config": {}, "max_workers": 0}
    },
    "head_node_type": "ray.head.default",
    # No file syncing and no setup/start commands are configured.
    "file_mounts": {},
    "cluster_synced_files": [],
    "file_mounts_sync_continuously": False,
    "rsync_exclude": [],
    "rsync_filter": [],
    "initialization_commands": [],
    "setup_commands": [],
    "head_setup_commands": [],
    "worker_setup_commands": [],
    "head_start_ray_commands": [],
    "worker_start_ray_commands": [],
}
  117. class Monitor:
  118. """Autoscaling monitor.
  119. This process periodically collects stats from the GCS and triggers
  120. autoscaler updates.
  121. """
  122. def __init__(
  123. self,
  124. address: str,
  125. autoscaling_config: Union[str, Callable[[], Dict[str, Any]]],
  126. log_dir: str = None,
  127. prefix_cluster_info: bool = False,
  128. monitor_ip: Optional[str] = None,
  129. retry_on_failure: bool = True,
  130. ):
  131. self.gcs_address = address
  132. worker = ray._private.worker.global_worker
  133. # TODO: eventually plumb ClusterID through to here
  134. self.gcs_client = GcsClient(address=self.gcs_address)
  135. _initialize_internal_kv(self.gcs_client)
  136. if monitor_ip:
  137. monitor_addr = build_address(monitor_ip, AUTOSCALER_METRIC_PORT)
  138. self.gcs_client.internal_kv_put(
  139. b"AutoscalerMetricsAddress", monitor_addr.encode(), True, None
  140. )
  141. self._session_name = self.get_session_name(self.gcs_client)
  142. logger.info(f"session_name: {self._session_name}")
  143. worker.mode = 0
  144. head_node_ip = parse_address(self.gcs_address)[0]
  145. self.load_metrics = LoadMetrics()
  146. self.last_avail_resources = None
  147. self.event_summarizer = EventSummarizer()
  148. self.prefix_cluster_info = prefix_cluster_info
  149. self.retry_on_failure = retry_on_failure
  150. self.autoscaling_config = autoscaling_config
  151. self.autoscaler = None
  152. # If set, we are in a manually created cluster (non-autoscaling) and
  153. # simply mirroring what the GCS tells us the cluster node types are.
  154. self.readonly_config = None
  155. if log_dir:
  156. try:
  157. self.event_logger = get_event_logger(
  158. RayEvent.SourceType.AUTOSCALER, log_dir
  159. )
  160. except Exception:
  161. self.event_logger = None
  162. else:
  163. self.event_logger = None
  164. self.prom_metrics = AutoscalerPrometheusMetrics(session_name=self._session_name)
  165. if monitor_ip and prometheus_client:
  166. # If monitor_ip wasn't passed in, then don't attempt to start the
  167. # metric server to keep behavior identical to before metrics were
  168. # introduced
  169. try:
  170. logger.info(
  171. "Starting autoscaler metrics server on port {}".format(
  172. AUTOSCALER_METRIC_PORT
  173. )
  174. )
  175. kwargs = {"addr": "127.0.0.1"} if head_node_ip == "127.0.0.1" else {}
  176. prometheus_client.start_http_server(
  177. port=AUTOSCALER_METRIC_PORT,
  178. registry=self.prom_metrics.registry,
  179. **kwargs,
  180. )
  181. # Reset some gauges, since we don't know which labels have
  182. # leaked if the autoscaler was restarted.
  183. self.prom_metrics.pending_nodes.clear()
  184. self.prom_metrics.active_nodes.clear()
  185. except Exception:
  186. logger.exception(
  187. "An exception occurred while starting the metrics server."
  188. )
  189. elif not prometheus_client:
  190. logger.warning(
  191. "`prometheus_client` not found, so metrics will not be exported."
  192. )
  193. logger.info("Monitor: Started")
  194. def _initialize_autoscaler(self):
  195. if self.autoscaling_config:
  196. autoscaling_config = self.autoscaling_config
  197. else:
  198. # This config mirrors the current setup of the manually created
  199. # cluster. Each node gets its own unique node type.
  200. self.readonly_config = BASE_READONLY_CONFIG
  201. # Note that the "available_node_types" of the config can change.
  202. def get_latest_readonly_config():
  203. return self.readonly_config
  204. autoscaling_config = get_latest_readonly_config
  205. self.autoscaler = StandardAutoscaler(
  206. autoscaling_config,
  207. self.load_metrics,
  208. self.gcs_client,
  209. self._session_name,
  210. prefix_cluster_info=self.prefix_cluster_info,
  211. event_summarizer=self.event_summarizer,
  212. prom_metrics=self.prom_metrics,
  213. )
  214. def update_load_metrics(self):
  215. """Fetches resource usage data from GCS and updates load metrics."""
  216. # TODO(jinbum-kim): Still needed since some fields aren't in cluster_resource_state.
  217. # Remove after v1 autoscaler fully migrates to get_cluster_resource_state().
  218. # ref: https://github.com/ray-project/ray/pull/57130
  219. response = self.gcs_client.get_all_resource_usage(timeout=60)
  220. resources_batch_data = response.resource_usage_data
  221. log_resource_batch_data_if_desired(resources_batch_data)
  222. # This is a workaround to get correct idle_duration_ms
  223. # from "get_cluster_resource_state"
  224. # ref: https://github.com/ray-project/ray/pull/48519#issuecomment-2481659346
  225. cluster_resource_state = get_cluster_resource_state(self.gcs_client)
  226. ray_node_states = cluster_resource_state.node_states
  227. ray_nodes_idle_duration_ms_by_id = {
  228. node.node_id: node.idle_duration_ms for node in ray_node_states
  229. }
  230. # Tell the readonly node provider what nodes to report.
  231. if self.readonly_config:
  232. new_nodes = []
  233. for msg in list(cluster_resource_state.node_states):
  234. node_id = msg.node_id.hex()
  235. new_nodes.append((node_id, msg.node_ip_address))
  236. self.autoscaler.provider._set_nodes(new_nodes)
  237. waiting_bundles, infeasible_bundles = parse_resource_demands(
  238. resources_batch_data.resource_load_by_shape
  239. )
  240. pending_placement_groups = list(
  241. resources_batch_data.placement_group_load.placement_group_data
  242. )
  243. mirror_node_types = {}
  244. for resource_message in cluster_resource_state.node_states:
  245. node_id = resource_message.node_id
  246. # Generate node type config based on GCS reported node list.
  247. if self.readonly_config:
  248. # Keep prefix in sync with ReadonlyNodeProvider.
  249. node_type = format_readonly_node_type(node_id.hex())
  250. resources = {}
  251. for k, v in resource_message.total_resources.items():
  252. resources[k] = v
  253. mirror_node_types[node_type] = {
  254. "resources": resources,
  255. "node_config": {},
  256. "max_workers": 1,
  257. }
  258. total_resources = dict(resource_message.total_resources)
  259. available_resources = dict(resource_message.available_resources)
  260. use_node_id_as_ip = self.autoscaler is not None and self.autoscaler.config[
  261. "provider"
  262. ].get("use_node_id_as_ip", False)
  263. # "use_node_id_as_ip" is a hack meant to address situations in
  264. # which there's more than one Ray node residing at a given ip.
  265. # TODO (Dmitri): Stop using ips as node identifiers.
  266. # https://github.com/ray-project/ray/issues/19086
  267. if use_node_id_as_ip:
  268. peloton_id = total_resources.get("NODE_ID_AS_RESOURCE")
  269. # Legacy support https://github.com/ray-project/ray/pull/17312
  270. if peloton_id is not None:
  271. ip = str(int(peloton_id))
  272. else:
  273. ip = node_id.hex()
  274. else:
  275. ip = resource_message.node_ip_address
  276. idle_duration_s = 0.0
  277. if node_id in ray_nodes_idle_duration_ms_by_id:
  278. idle_duration_s = ray_nodes_idle_duration_ms_by_id[node_id] / 1000
  279. else:
  280. logger.warning(
  281. f"node_id {node_id} not found in ray_nodes_idle_duration_ms_by_id"
  282. )
  283. self.load_metrics.update(
  284. ip,
  285. node_id,
  286. total_resources,
  287. available_resources,
  288. idle_duration_s,
  289. waiting_bundles,
  290. infeasible_bundles,
  291. pending_placement_groups,
  292. )
  293. if self.readonly_config:
  294. self.readonly_config["available_node_types"].update(mirror_node_types)
  295. def get_session_name(self, gcs_client: GcsClient) -> Optional[str]:
  296. """Obtain the session name from the GCS.
  297. If the GCS doesn't respond, session name is considered None.
  298. In this case, the metrics reported from the monitor won't have
  299. the correct session name.
  300. """
  301. if not _internal_kv_initialized():
  302. return None
  303. session_name = gcs_client.internal_kv_get(
  304. b"session_name",
  305. ray_constants.KV_NAMESPACE_SESSION,
  306. timeout=10,
  307. )
  308. if session_name:
  309. session_name = session_name.decode()
  310. return session_name
  311. def update_resource_requests(self):
  312. """Fetches resource requests from the internal KV and updates load."""
  313. if not _internal_kv_initialized():
  314. return
  315. data = _internal_kv_get(
  316. ray._private.ray_constants.AUTOSCALER_RESOURCE_REQUEST_CHANNEL
  317. )
  318. if data:
  319. try:
  320. resource_request = json.loads(data)
  321. self.load_metrics.set_resource_requests(resource_request)
  322. except Exception:
  323. logger.exception("Error parsing resource requests")
  324. def _run(self):
  325. """Run the monitor loop."""
  326. while True:
  327. try:
  328. gcs_request_start_time = time.time()
  329. self.update_load_metrics()
  330. gcs_request_time = time.time() - gcs_request_start_time
  331. self.update_resource_requests()
  332. self.update_event_summary()
  333. load_metrics_summary = self.load_metrics.summary()
  334. status = {
  335. "gcs_request_time": gcs_request_time,
  336. "time": time.time(),
  337. "monitor_pid": os.getpid(),
  338. }
  339. if self.autoscaler and not self.load_metrics:
  340. # load_metrics is Falsey iff we haven't collected any
  341. # resource messages from the GCS, which can happen at startup if
  342. # the GCS hasn't yet received data from the Raylets.
  343. # In this case, do not do an autoscaler update.
  344. # Wait to get load metrics.
  345. logger.info(
  346. "Autoscaler has not yet received load metrics. Waiting."
  347. )
  348. elif self.autoscaler:
  349. # Process autoscaling actions
  350. update_start_time = time.time()
  351. self.autoscaler.update()
  352. status["autoscaler_update_time"] = time.time() - update_start_time
  353. autoscaler_summary = self.autoscaler.summary()
  354. try:
  355. self.emit_metrics(
  356. load_metrics_summary,
  357. autoscaler_summary,
  358. self.autoscaler.all_node_types,
  359. )
  360. except Exception:
  361. logger.exception("Error emitting metrics")
  362. if autoscaler_summary:
  363. status["autoscaler_report"] = asdict(autoscaler_summary)
  364. status[
  365. "non_terminated_nodes_time"
  366. ] = (
  367. self.autoscaler.non_terminated_nodes.non_terminated_nodes_time # noqa: E501
  368. )
  369. for msg in self.event_summarizer.summary():
  370. # Need to prefix each line of the message for the lines to
  371. # get pushed to the driver logs.
  372. for line in msg.split("\n"):
  373. logger.info(
  374. "{}{}".format(
  375. ray_constants.LOG_PREFIX_EVENT_SUMMARY, line
  376. )
  377. )
  378. if self.event_logger:
  379. self.event_logger.info(line)
  380. self.event_summarizer.clear()
  381. status["load_metrics_report"] = asdict(load_metrics_summary)
  382. as_json = json.dumps(status)
  383. if _internal_kv_initialized():
  384. _internal_kv_put(
  385. ray_constants.DEBUG_AUTOSCALING_STATUS, as_json, overwrite=True
  386. )
  387. except Exception:
  388. # By default, do not exit the monitor on failure.
  389. if self.retry_on_failure:
  390. logger.exception("Monitor: Execution exception. Trying again...")
  391. else:
  392. raise
  393. # Wait for a autoscaler update interval before processing the next
  394. # round of messages.
  395. time.sleep(AUTOSCALER_UPDATE_INTERVAL_S)
  396. def emit_metrics(self, load_metrics_summary, autoscaler_summary, node_types):
  397. if autoscaler_summary is None:
  398. return None
  399. for resource_name in ["CPU", "GPU", "TPU"]:
  400. _, total = load_metrics_summary.usage.get(resource_name, (0, 0))
  401. pending = autoscaler_summary.pending_resources.get(resource_name, 0)
  402. self.prom_metrics.cluster_resources.labels(
  403. resource=resource_name,
  404. SessionName=self.prom_metrics.session_name,
  405. ).set(total)
  406. self.prom_metrics.pending_resources.labels(
  407. resource=resource_name,
  408. SessionName=self.prom_metrics.session_name,
  409. ).set(pending)
  410. pending_node_count = Counter()
  411. for _, node_type, _ in autoscaler_summary.pending_nodes:
  412. pending_node_count[node_type] += 1
  413. for node_type, count in autoscaler_summary.pending_launches.items():
  414. pending_node_count[node_type] += count
  415. for node_type in node_types:
  416. count = pending_node_count[node_type]
  417. self.prom_metrics.pending_nodes.labels(
  418. SessionName=self.prom_metrics.session_name,
  419. NodeType=node_type,
  420. ).set(count)
  421. for node_type in node_types:
  422. count = autoscaler_summary.active_nodes.get(node_type, 0)
  423. self.prom_metrics.active_nodes.labels(
  424. SessionName=self.prom_metrics.session_name,
  425. NodeType=node_type,
  426. ).set(count)
  427. failed_node_counts = Counter()
  428. for _, node_type in autoscaler_summary.failed_nodes:
  429. failed_node_counts[node_type] += 1
  430. # NOTE: This metric isn't reset with monitor resets. This means it will
  431. # only be updated when the autoscaler' node tracker remembers failed
  432. # nodes. If the node type failure is evicted from the autoscaler, the
  433. # metric may not update for a while.
  434. for node_type, count in failed_node_counts.items():
  435. self.prom_metrics.recently_failed_nodes.labels(
  436. SessionName=self.prom_metrics.session_name,
  437. NodeType=node_type,
  438. ).set(count)
  439. def update_event_summary(self):
  440. """Report the current size of the cluster.
  441. To avoid log spam, only cluster size changes (CPU, GPU or TPU count change)
  442. are reported to the event summarizer. The event summarizer will report
  443. only the latest cluster size per batch.
  444. """
  445. avail_resources = self.load_metrics.resources_avail_summary()
  446. if not self.readonly_config and avail_resources != self.last_avail_resources:
  447. self.event_summarizer.add(
  448. "Resized to {}.", # e.g., Resized to 100 CPUs, 4 GPUs, 4 TPUs.
  449. quantity=avail_resources,
  450. aggregate=lambda old, new: new,
  451. )
  452. self.last_avail_resources = avail_resources
  453. def destroy_autoscaler_workers(self):
  454. """Cleanup the autoscaler, in case of an exception in the run() method.
  455. We kill the worker nodes, but retain the head node in order to keep
  456. logs around, keeping costs minimal. This monitor process runs on the
  457. head node anyway, so this is more reliable."""
  458. if self.autoscaler is None:
  459. return # Nothing to clean up.
  460. if self.autoscaling_config is None:
  461. # This is a logic error in the program. Can't do anything.
  462. logger.error("Monitor: Cleanup failed due to lack of autoscaler config.")
  463. return
  464. logger.info("Monitor: Exception caught. Taking down workers...")
  465. clean = False
  466. while not clean:
  467. try:
  468. teardown_cluster(
  469. config_file=self.autoscaling_config,
  470. yes=True, # Non-interactive.
  471. workers_only=True, # Retain head node for logs.
  472. override_cluster_name=None,
  473. keep_min_workers=True, # Retain minimal amount of workers.
  474. )
  475. clean = True
  476. logger.info("Monitor: Workers taken down.")
  477. except Exception:
  478. logger.error("Monitor: Cleanup exception. Trying again...")
  479. time.sleep(2)
  480. def _handle_failure(self, error):
  481. if (
  482. self.autoscaler is not None
  483. and os.environ.get("RAY_AUTOSCALER_FATESHARE_WORKERS", "") == "1"
  484. ):
  485. self.autoscaler.kill_workers()
  486. # Take down autoscaler workers if necessary.
  487. self.destroy_autoscaler_workers()
  488. # Something went wrong, so push an error to all current and future
  489. # drivers.
  490. message = f"The autoscaler failed with the following error:\n{error}"
  491. if _internal_kv_initialized():
  492. _internal_kv_put(
  493. ray_constants.DEBUG_AUTOSCALING_ERROR, message, overwrite=True
  494. )
  495. from ray._private.utils import publish_error_to_driver
  496. publish_error_to_driver(
  497. ray_constants.MONITOR_DIED_ERROR,
  498. message,
  499. gcs_client=self.gcs_client,
  500. )
  501. def _signal_handler(self, sig, frame):
  502. try:
  503. self._handle_failure(
  504. f"Terminated with signal {sig}\n"
  505. + "".join(traceback.format_stack(frame))
  506. )
  507. except Exception:
  508. logger.exception("Monitor: Failure in signal handler.")
  509. sys.exit(sig + 128)
  510. def run(self):
  511. # Register signal handlers for autoscaler termination.
  512. # Signals will not be received on windows
  513. signal.signal(signal.SIGINT, self._signal_handler)
  514. signal.signal(signal.SIGTERM, self._signal_handler)
  515. try:
  516. if _internal_kv_initialized():
  517. # Delete any previous autoscaling errors.
  518. _internal_kv_del(ray_constants.DEBUG_AUTOSCALING_ERROR)
  519. self._initialize_autoscaler()
  520. self._run()
  521. except Exception:
  522. logger.exception("Error in monitor loop")
  523. self._handle_failure(traceback.format_exc())
  524. raise
  525. def log_resource_batch_data_if_desired(
  526. resources_batch_data: gcs_pb2.ResourceUsageBatchData,
  527. ) -> None:
  528. if os.getenv("AUTOSCALER_LOG_RESOURCE_BATCH_DATA") == "1":
  529. logger.info("Logging raw resource message pulled from GCS.")
  530. logger.info(resources_batch_data)
  531. logger.info("Done logging raw resource message.")
  532. if __name__ == "__main__":
  533. parser = argparse.ArgumentParser(
  534. description=("Parse GCS server for the monitor to connect to.")
  535. )
  536. parser.add_argument(
  537. "--gcs-address", required=False, type=str, help="The address (ip:port) of GCS."
  538. )
  539. parser.add_argument(
  540. "--autoscaling-config",
  541. required=False,
  542. type=str,
  543. help="the path to the autoscaling config file",
  544. )
  545. parser.add_argument(
  546. "--logging-level",
  547. required=False,
  548. type=str,
  549. default=ray_constants.LOGGER_LEVEL,
  550. choices=ray_constants.LOGGER_LEVEL_CHOICES,
  551. help=ray_constants.LOGGER_LEVEL_HELP,
  552. )
  553. parser.add_argument(
  554. "--logging-format",
  555. required=False,
  556. type=str,
  557. default=ray_constants.LOGGER_FORMAT,
  558. help=ray_constants.LOGGER_FORMAT_HELP,
  559. )
  560. parser.add_argument(
  561. "--logging-filename",
  562. required=False,
  563. type=str,
  564. default=ray_constants.MONITOR_LOG_FILE_NAME,
  565. help="Specify the name of log file, "
  566. "log to stdout if set empty, default is "
  567. f'"{ray_constants.MONITOR_LOG_FILE_NAME}"',
  568. )
  569. parser.add_argument(
  570. "--logs-dir",
  571. required=True,
  572. type=str,
  573. help="Specify the path of the temporary directory used by Ray processes.",
  574. )
  575. parser.add_argument(
  576. "--logging-rotate-bytes",
  577. required=False,
  578. type=int,
  579. default=LOGGING_ROTATE_BYTES,
  580. help="Specify the max bytes for rotating "
  581. "log file, default is "
  582. f"{LOGGING_ROTATE_BYTES} bytes.",
  583. )
  584. parser.add_argument(
  585. "--logging-rotate-backup-count",
  586. required=False,
  587. type=int,
  588. default=LOGGING_ROTATE_BACKUP_COUNT,
  589. help="Specify the backup count of rotated log file, default is "
  590. f"{LOGGING_ROTATE_BACKUP_COUNT}.",
  591. )
  592. parser.add_argument(
  593. "--monitor-ip",
  594. required=False,
  595. type=str,
  596. default=None,
  597. help="The IP address of the machine hosting the monitor process.",
  598. )
  599. parser.add_argument(
  600. "--stdout-filepath",
  601. required=False,
  602. type=str,
  603. default="",
  604. help="The filepath to dump monitor stdout.",
  605. )
  606. parser.add_argument(
  607. "--stderr-filepath",
  608. required=False,
  609. type=str,
  610. default="",
  611. help="The filepath to dump monitor stderr.",
  612. )
  613. args = parser.parse_args()
  614. # Disable log rotation for windows, because NTFS doesn't allow file deletion when there're multiple owners or borrowers, which happens to be how ray accesses log files.
  615. logging_rotation_bytes = args.logging_rotate_bytes if sys.platform != "win32" else 0
  616. logging_rotation_backup_count = (
  617. args.logging_rotate_backup_count if sys.platform != "win32" else 1
  618. )
  619. setup_component_logger(
  620. logging_level=args.logging_level,
  621. logging_format=args.logging_format,
  622. log_dir=args.logs_dir,
  623. filename=args.logging_filename,
  624. max_bytes=logging_rotation_bytes,
  625. backup_count=logging_rotation_backup_count,
  626. )
  627. # Setup stdout/stderr redirect files if redirection enabled.
  628. logging_utils.redirect_stdout_stderr_if_needed(
  629. args.stdout_filepath,
  630. args.stderr_filepath,
  631. logging_rotation_bytes,
  632. logging_rotation_backup_count,
  633. )
  634. logger.info(f"Starting monitor using ray installation: {ray.__file__}")
  635. logger.info(f"Ray version: {ray.__version__}")
  636. logger.info(f"Ray commit: {ray.__commit__}")
  637. logger.info(f"Monitor started with command: {sys.argv}")
  638. if args.autoscaling_config:
  639. autoscaling_config = os.path.expanduser(args.autoscaling_config)
  640. else:
  641. autoscaling_config = None
  642. bootstrap_address = args.gcs_address
  643. if bootstrap_address is None:
  644. raise ValueError("--gcs-address must be set!")
  645. monitor = Monitor(
  646. bootstrap_address,
  647. autoscaling_config,
  648. log_dir=args.logs_dir,
  649. monitor_ip=args.monitor_ip,
  650. )
  651. monitor.run()