monitor.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. """Autoscaler monitoring loop daemon.
  2. See autoscaler._private/monitor.py for the legacy implementation. All the legacy flags
  3. are supported here, but the new implementation uses the new autoscaler v2.
  4. """
  5. import argparse
  6. import logging
  7. import os
  8. import sys
  9. import time
  10. from typing import Optional
  11. import ray
  12. import ray._private.ray_constants as ray_constants
  13. from ray._common.network_utils import build_address, parse_address
  14. from ray._common.ray_constants import (
  15. LOGGING_ROTATE_BACKUP_COUNT,
  16. LOGGING_ROTATE_BYTES,
  17. )
  18. from ray._common.usage.usage_lib import record_extra_usage_tag
  19. from ray._private import logging_utils
  20. from ray._private.event.event_logger import get_event_logger
  21. from ray._private.ray_logging import setup_component_logger
  22. from ray._private.worker import SCRIPT_MODE
  23. from ray._raylet import GcsClient
  24. from ray.autoscaler._private.constants import (
  25. AUTOSCALER_METRIC_PORT,
  26. AUTOSCALER_UPDATE_INTERVAL_S,
  27. )
  28. from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
  29. from ray.autoscaler.v2.autoscaler import Autoscaler
  30. from ray.autoscaler.v2.event_logger import AutoscalerEventLogger
  31. from ray.autoscaler.v2.instance_manager.config import (
  32. FileConfigReader,
  33. IConfigReader,
  34. ReadOnlyProviderConfigReader,
  35. )
  36. from ray.autoscaler.v2.metrics_reporter import AutoscalerMetricsReporter
  37. from ray.core.generated.autoscaler_pb2 import AutoscalingState
  38. from ray.core.generated.event_pb2 import Event as RayEvent
  39. from ray.core.generated.usage_pb2 import TagKey
# prometheus_client is an optional dependency. When it is missing, the name is
# bound to None and AutoscalerMonitor skips starting the metrics HTTP server,
# logging a warning instead.
try:
    import prometheus_client
except ImportError:
    prometheus_client = None

# Module-level logger used throughout the monitor.
logger = logging.getLogger(__name__)
  45. class AutoscalerMonitor:
  46. """Autoscaling monitor.
  47. This process periodically collects stats from the GCS and triggers
  48. autoscaler updates.
  49. TODO:
  50. We should also handle autoscaler failures properly in the future.
  51. Right now, we don't restart autoscaler if it fails (internal reconciliation
  52. however, should not fail the autoscaler process).
  53. With the Reconciler able to handle extra cloud instances, we could in fact
  54. recover the autoscaler process from reconciliation.
  55. """
  56. def __init__(
  57. self,
  58. address: str,
  59. config_reader: IConfigReader,
  60. log_dir: Optional[str] = None,
  61. monitor_ip: Optional[str] = None,
  62. ):
  63. # Record v2 usage (we do this as early as possible to capture usage)
  64. record_autoscaler_v2_usage(GcsClient(address))
  65. self.gcs_address = address
  66. worker = ray._private.worker.global_worker
  67. # TODO: eventually plumb ClusterID through to here
  68. self.gcs_client = GcsClient(address=self.gcs_address)
  69. if monitor_ip:
  70. monitor_addr = build_address(monitor_ip, AUTOSCALER_METRIC_PORT)
  71. self.gcs_client.internal_kv_put(
  72. b"AutoscalerMetricsAddress", monitor_addr.encode(), True, None
  73. )
  74. self._session_name = self._get_session_name(self.gcs_client)
  75. logger.info(f"session_name: {self._session_name}")
  76. worker.set_mode(SCRIPT_MODE)
  77. head_node_ip = parse_address(self.gcs_address)[0]
  78. self.autoscaler = None
  79. if log_dir:
  80. try:
  81. ray_event_logger = get_event_logger(
  82. RayEvent.SourceType.AUTOSCALER, log_dir
  83. )
  84. self.event_logger = AutoscalerEventLogger(ray_event_logger)
  85. except Exception:
  86. self.event_logger = None
  87. else:
  88. self.event_logger = None
  89. prom_metrics = AutoscalerPrometheusMetrics(session_name=self._session_name)
  90. self.metric_reporter = AutoscalerMetricsReporter(prom_metrics)
  91. if monitor_ip and prometheus_client:
  92. # If monitor_ip wasn't passed in, then don't attempt to start the
  93. # metric server to keep behavior identical to before metrics were
  94. # introduced
  95. try:
  96. logger.info(
  97. "Starting autoscaler metrics server on port {}".format(
  98. AUTOSCALER_METRIC_PORT
  99. )
  100. )
  101. kwargs = {"addr": "127.0.0.1"} if head_node_ip == "127.0.0.1" else {}
  102. prometheus_client.start_http_server(
  103. port=AUTOSCALER_METRIC_PORT,
  104. registry=prom_metrics.registry,
  105. **kwargs,
  106. )
  107. except Exception:
  108. logger.exception(
  109. "An exception occurred while starting the metrics server."
  110. )
  111. elif not prometheus_client:
  112. logger.warning(
  113. "`prometheus_client` not found, so metrics will not be exported."
  114. )
  115. self.autoscaler = Autoscaler(
  116. session_name=self._session_name,
  117. config_reader=config_reader,
  118. gcs_client=self.gcs_client,
  119. event_logger=self.event_logger,
  120. metrics_reporter=self.metric_reporter,
  121. )
  122. @staticmethod
  123. def _get_session_name(gcs_client: GcsClient) -> Optional[str]:
  124. """Obtain the session name from the GCS.
  125. If the GCS doesn't respond, session name is considered None.
  126. In this case, the metrics reported from the monitor won't have
  127. the correct session name.
  128. """
  129. session_name = gcs_client.internal_kv_get(
  130. b"session_name",
  131. ray_constants.KV_NAMESPACE_SESSION,
  132. timeout=10,
  133. )
  134. if session_name:
  135. session_name = session_name.decode()
  136. return session_name
  137. @staticmethod
  138. def _report_autoscaling_state(
  139. gcs_client: GcsClient, autoscaling_state: AutoscalingState
  140. ):
  141. """Report the autoscaling state to the GCS."""
  142. try:
  143. gcs_client.report_autoscaling_state(autoscaling_state.SerializeToString())
  144. except Exception:
  145. logger.exception("Error reporting autoscaling state to GCS.")
  146. def _run(self):
  147. """Run the monitor loop."""
  148. while True:
  149. autoscaling_state = self.autoscaler.update_autoscaling_state()
  150. if autoscaling_state:
  151. # report autoscaling state
  152. self._report_autoscaling_state(self.gcs_client, autoscaling_state)
  153. else:
  154. logger.warning("No autoscaling state to report.")
  155. # Wait for a autoscaler update interval before processing the next
  156. # round of messages.
  157. time.sleep(AUTOSCALER_UPDATE_INTERVAL_S)
  158. def run(self):
  159. try:
  160. self._run()
  161. except Exception:
  162. logger.exception("Error in monitor loop")
  163. raise
  164. def record_autoscaler_v2_usage(gcs_client: GcsClient) -> None:
  165. """
  166. Record usage for autoscaler v2.
  167. """
  168. try:
  169. record_extra_usage_tag(TagKey.AUTOSCALER_VERSION, "v2", gcs_client)
  170. except Exception:
  171. logger.exception("Error recording usage for autoscaler v2.")
  172. if __name__ == "__main__":
  173. parser = argparse.ArgumentParser(
  174. description=("Parse GCS server for the monitor to connect to.")
  175. )
  176. parser.add_argument(
  177. "--gcs-address", required=False, type=str, help="The address (ip:port) of GCS."
  178. )
  179. parser.add_argument(
  180. "--autoscaling-config",
  181. required=False,
  182. type=str,
  183. help="the path to the autoscaling config file",
  184. )
  185. parser.add_argument(
  186. "--logging-level",
  187. required=False,
  188. type=str,
  189. default=ray_constants.LOGGER_LEVEL,
  190. choices=ray_constants.LOGGER_LEVEL_CHOICES,
  191. help=ray_constants.LOGGER_LEVEL_HELP,
  192. )
  193. parser.add_argument(
  194. "--logging-format",
  195. required=False,
  196. type=str,
  197. default=ray_constants.LOGGER_FORMAT,
  198. help=ray_constants.LOGGER_FORMAT_HELP,
  199. )
  200. parser.add_argument(
  201. "--logging-filename",
  202. required=False,
  203. type=str,
  204. default=ray_constants.MONITOR_LOG_FILE_NAME,
  205. help="Specify the name of log file, "
  206. "log to stdout if set empty, default is "
  207. f'"{ray_constants.MONITOR_LOG_FILE_NAME}"',
  208. )
  209. parser.add_argument(
  210. "--logs-dir",
  211. required=True,
  212. type=str,
  213. help="Specify the path of the temporary directory used by Ray processes.",
  214. )
  215. parser.add_argument(
  216. "--logging-rotate-bytes",
  217. required=False,
  218. type=int,
  219. default=LOGGING_ROTATE_BYTES,
  220. help="Specify the max bytes for rotating "
  221. "log file, default is "
  222. f"{LOGGING_ROTATE_BYTES} bytes.",
  223. )
  224. parser.add_argument(
  225. "--logging-rotate-backup-count",
  226. required=False,
  227. type=int,
  228. default=LOGGING_ROTATE_BACKUP_COUNT,
  229. help="Specify the backup count of rotated log file, default is "
  230. f"{LOGGING_ROTATE_BACKUP_COUNT}.",
  231. )
  232. parser.add_argument(
  233. "--monitor-ip",
  234. required=False,
  235. type=str,
  236. default=None,
  237. help="The IP address of the machine hosting the monitor process.",
  238. )
  239. parser.add_argument(
  240. "--stdout-filepath",
  241. required=False,
  242. type=str,
  243. default="",
  244. help="The filepath to dump monitor stdout.",
  245. )
  246. parser.add_argument(
  247. "--stderr-filepath",
  248. required=False,
  249. type=str,
  250. default="",
  251. help="The filepath to dump monitor stderr.",
  252. )
  253. args = parser.parse_args()
  254. # Disable log rotation for windows platform.
  255. logging_rotation_bytes = args.logging_rotate_bytes if sys.platform != "win32" else 0
  256. logging_rotation_backup_count = (
  257. args.logging_rotate_backup_count if sys.platform != "win32" else 1
  258. )
  259. setup_component_logger(
  260. logging_level=args.logging_level,
  261. logging_format=args.logging_format,
  262. log_dir=args.logs_dir,
  263. filename=args.logging_filename,
  264. max_bytes=logging_rotation_bytes,
  265. backup_count=logging_rotation_backup_count,
  266. )
  267. # Setup stdout/stderr redirect files if redirection enabled.
  268. logging_utils.redirect_stdout_stderr_if_needed(
  269. args.stdout_filepath,
  270. args.stderr_filepath,
  271. logging_rotation_bytes,
  272. logging_rotation_backup_count,
  273. )
  274. logger.info(
  275. f"Starting autoscaler v2 monitor using ray installation: {ray.__file__}"
  276. )
  277. logger.info(f"Ray version: {ray.__version__}")
  278. logger.info(f"Ray commit: {ray.__commit__}")
  279. logger.info(f"AutoscalerMonitor started with command: {sys.argv}")
  280. gcs_address = args.gcs_address
  281. if gcs_address is None:
  282. raise ValueError("--gcs-address must be set!")
  283. if not args.autoscaling_config:
  284. logger.info("No autoscaling config provided: use read only node provider.")
  285. config_reader = ReadOnlyProviderConfigReader(gcs_address)
  286. else:
  287. autoscaling_config = os.path.expanduser(args.autoscaling_config)
  288. config_reader = FileConfigReader(
  289. config_file=autoscaling_config, skip_content_hash=True
  290. )
  291. monitor = AutoscalerMonitor(
  292. gcs_address,
  293. config_reader,
  294. log_dir=args.logs_dir,
  295. monitor_ip=args.monitor_ip,
  296. )
  297. monitor.run()