# log_monitor.py (24 KB)
  1. import argparse
  2. import errno
  3. import glob
  4. import logging
  5. import logging.handlers
  6. import os
  7. import platform
  8. import re
  9. import shutil
  10. import sys
  11. import time
  12. import traceback
  13. from typing import Callable, List, Optional, Set
  14. import ray._private.ray_constants as ray_constants
  15. import ray._private.utils
  16. from ray._private import logging_utils
  17. from ray._private.ray_logging import setup_component_logger
  18. from ray._raylet import GcsClient
  19. # Logger for this module. It should be configured at the entry point
  20. # into the program using Ray. Ray provides a default configuration at
  21. # entry/init points.
  22. logger = logging.getLogger(__name__)
  23. # The groups are job id, and pid.
  24. WORKER_LOG_PATTERN = re.compile(r".*worker.*-([0-9a-f]+)-(\d+)")
  25. # The groups are job id.
  26. RUNTIME_ENV_SETUP_PATTERN = re.compile(r".*runtime_env_setup-(\d+).log")
  27. # Log name update interval under pressure.
  28. # We need it because log name update is CPU intensive and uses 100%
  29. # of cpu when there are many log files.
  30. LOG_NAME_UPDATE_INTERVAL_S = float(os.getenv("LOG_NAME_UPDATE_INTERVAL_S", 0.5))
  31. # Once there are more files than this threshold,
  32. # log monitor start giving backpressure to lower cpu usages.
  33. RAY_LOG_MONITOR_MANY_FILES_THRESHOLD = int(
  34. os.getenv("RAY_LOG_MONITOR_MANY_FILES_THRESHOLD", 1000)
  35. )
  36. RAY_RUNTIME_ENV_LOG_TO_DRIVER_ENABLED = int(
  37. os.getenv("RAY_RUNTIME_ENV_LOG_TO_DRIVER_ENABLED", 0)
  38. )
  39. class LogFileInfo:
  40. def __init__(
  41. self,
  42. filename=None,
  43. size_when_last_opened=None,
  44. file_position=None,
  45. file_handle=None,
  46. is_err_file=False,
  47. job_id=None,
  48. worker_pid=None,
  49. ):
  50. assert (
  51. filename is not None
  52. and size_when_last_opened is not None
  53. and file_position is not None
  54. )
  55. self.filename = filename
  56. self.size_when_last_opened = size_when_last_opened
  57. self.file_position = file_position
  58. self.file_handle = file_handle
  59. self.is_err_file = is_err_file
  60. self.job_id = job_id
  61. self.worker_pid = worker_pid
  62. self.actor_name = None
  63. self.task_name = None
  64. def reopen_if_necessary(self):
  65. """Check if the file's inode has changed and reopen it if necessary.
  66. There are a variety of reasons what we would logically consider a file
  67. would have different inodes, such as log rotation or file syncing
  68. semantics.
  69. If the file is smaller than our recorded file position, we assume it has been
  70. rotated and start reading it from the beginning.
  71. """
  72. try:
  73. open_inode = None
  74. if self.file_handle and not self.file_handle.closed:
  75. open_inode = os.fstat(self.file_handle.fileno()).st_ino
  76. new_statinfo = os.stat(self.filename)
  77. if new_statinfo.st_ino != open_inode:
  78. self.file_handle = open(self.filename, "rb")
  79. # If the new file is smaller than the last read position, assume that
  80. # the file has been rotated and read from the beginning. Else, continue
  81. # from the existing file position.
  82. if new_statinfo.st_size < self.file_position:
  83. self.file_position = 0
  84. self.file_handle.seek(self.file_position)
  85. self.size_when_last_opened = new_statinfo.st_size
  86. except Exception:
  87. logger.debug(f"file no longer exists, skip re-opening of {self.filename}")
  88. def __repr__(self):
  89. return (
  90. "FileInfo(\n"
  91. f"\tfilename: {self.filename}\n"
  92. f"\tsize_when_last_opened: {self.size_when_last_opened}\n"
  93. f"\tfile_position: {self.file_position}\n"
  94. f"\tfile_handle: {self.file_handle}\n"
  95. f"\tis_err_file: {self.is_err_file}\n"
  96. f"\tjob_id: {self.job_id}\n"
  97. f"\tworker_pid: {self.worker_pid}\n"
  98. f"\tactor_name: {self.actor_name}\n"
  99. f"\ttask_name: {self.task_name}\n"
  100. ")"
  101. )
  102. class LogMonitor:
  103. """A monitor process for monitoring Ray log files.
  104. This class maintains a list of open files and a list of closed log files. We
  105. can't simply leave all files open because we'll run out of file
  106. descriptors.
  107. The "run" method of this class will cycle between doing several things:
  108. 1. First, it will check if any new files have appeared in the log
  109. directory. If so, they will be added to the list of closed files.
  110. 2. Then, if we are unable to open any new files, we will close all of the
  111. files.
  112. 3. Then, we will open as many closed files as we can that may have new
  113. lines (judged by an increase in file size since the last time the file
  114. was opened).
  115. 4. Then we will loop through the open files and see if there are any new
  116. lines in the file. If so, we will publish them to Ray pubsub.
  117. Attributes:
  118. ip: The hostname of this machine, for grouping log messages.
  119. logs_dir: The directory that the log files are in.
  120. log_filenames: This is the set of filenames of all files in
  121. open_file_infos and closed_file_infos.
  122. open_file_infos (list[LogFileInfo]): Info for all of the open files.
  123. closed_file_infos (list[LogFileInfo]): Info for all of the closed
  124. files.
  125. can_open_more_files: True if we can still open more files and
  126. false otherwise.
  127. max_files_open: The maximum number of files that can be open.
  128. """
  129. def __init__(
  130. self,
  131. node_ip_address: str,
  132. logs_dir: str,
  133. gcs_client: GcsClient,
  134. is_proc_alive_fn: Callable[[int], bool],
  135. max_files_open: int = ray_constants.LOG_MONITOR_MAX_OPEN_FILES,
  136. gcs_address: Optional[str] = None,
  137. ):
  138. """Initialize the log monitor object."""
  139. self.ip: str = node_ip_address
  140. self.logs_dir: str = logs_dir
  141. self.gcs_client = gcs_client
  142. self.log_filenames: Set[str] = set()
  143. self.open_file_infos: List[LogFileInfo] = []
  144. self.closed_file_infos: List[LogFileInfo] = []
  145. self.can_open_more_files: bool = True
  146. self.max_files_open: int = max_files_open
  147. self.is_proc_alive_fn: Callable[[int], bool] = is_proc_alive_fn
  148. self.is_autoscaler_v2: bool = self.get_is_autoscaler_v2(gcs_address)
  149. logger.info(
  150. f"Starting log monitor with [max open files={max_files_open}],"
  151. f" [is_autoscaler_v2={self.is_autoscaler_v2}]"
  152. )
  153. def get_is_autoscaler_v2(self, gcs_address: Optional[str]) -> bool:
  154. """Check if autoscaler v2 is enabled."""
  155. if gcs_address is None:
  156. return False
  157. if not ray.experimental.internal_kv._internal_kv_initialized():
  158. ray.experimental.internal_kv._initialize_internal_kv(self.gcs_client)
  159. from ray.autoscaler.v2.utils import is_autoscaler_v2
  160. return is_autoscaler_v2()
  161. def _close_all_files(self):
  162. """Close all open files (so that we can open more)."""
  163. while len(self.open_file_infos) > 0:
  164. file_info = self.open_file_infos.pop(0)
  165. file_info.file_handle.close()
  166. file_info.file_handle = None
  167. proc_alive = True
  168. # Test if the worker process that generated the log file
  169. # is still alive. Only applies to worker processes.
  170. # For all other system components, we always assume they are alive.
  171. if (
  172. file_info.worker_pid != "raylet"
  173. and file_info.worker_pid != "gcs_server"
  174. and file_info.worker_pid != "autoscaler"
  175. and file_info.worker_pid != "runtime_env"
  176. and file_info.worker_pid is not None
  177. ):
  178. assert not isinstance(file_info.worker_pid, str), (
  179. "PID should be an int type. " f"Given PID: {file_info.worker_pid}."
  180. )
  181. proc_alive = self.is_proc_alive_fn(file_info.worker_pid)
  182. if not proc_alive:
  183. # The process is not alive any more, so move the log file
  184. # out of the log directory so glob.glob will not be slowed
  185. # by it.
  186. target = os.path.join(
  187. self.logs_dir, "old", os.path.basename(file_info.filename)
  188. )
  189. try:
  190. shutil.move(file_info.filename, target)
  191. except (IOError, OSError) as e:
  192. if e.errno == errno.ENOENT:
  193. logger.warning(
  194. f"Warning: The file {file_info.filename} was not found."
  195. )
  196. else:
  197. raise e
  198. if proc_alive:
  199. self.closed_file_infos.append(file_info)
  200. self.can_open_more_files = True
  201. def update_log_filenames(self):
  202. """Update the list of log files to monitor."""
  203. monitor_log_paths = []
  204. # output of user code is written here
  205. monitor_log_paths += glob.glob(
  206. f"{self.logs_dir}/worker*[.out|.err]"
  207. ) + glob.glob(f"{self.logs_dir}/java-worker*.log")
  208. # segfaults and other serious errors are logged here
  209. monitor_log_paths += glob.glob(f"{self.logs_dir}/raylet*.err")
  210. # monitor logs are needed to report autoscaler events
  211. # TODO(rickyx): remove this after migration.
  212. if not self.is_autoscaler_v2:
  213. # We publish monitor logs in autoscaler v1
  214. monitor_log_paths += glob.glob(f"{self.logs_dir}/monitor.log")
  215. else:
  216. # We publish autoscaler events directly in autoscaler v2
  217. monitor_log_paths += glob.glob(
  218. f"{self.logs_dir}/events/event_AUTOSCALER.log"
  219. )
  220. # If gcs server restarts, there can be multiple log files.
  221. monitor_log_paths += glob.glob(f"{self.logs_dir}/gcs_server*.err")
  222. # Add libtpu logs if they exist in the Ray container.
  223. tpu_log_dir = f"{self.logs_dir}/tpu_logs"
  224. if os.path.isdir(tpu_log_dir):
  225. monitor_log_paths += glob.glob(f"{self.logs_dir}/tpu_logs/**")
  226. # runtime_env setup process is logged here
  227. if RAY_RUNTIME_ENV_LOG_TO_DRIVER_ENABLED:
  228. monitor_log_paths += glob.glob(f"{self.logs_dir}/runtime_env*.log")
  229. for file_path in monitor_log_paths:
  230. if os.path.isfile(file_path) and file_path not in self.log_filenames:
  231. worker_match = WORKER_LOG_PATTERN.match(file_path)
  232. if worker_match:
  233. worker_pid = int(worker_match.group(2))
  234. else:
  235. worker_pid = None
  236. job_id = None
  237. # Perform existence check first because most file will not be
  238. # including runtime_env. This saves some cpu cycle.
  239. if "runtime_env" in file_path:
  240. runtime_env_job_match = RUNTIME_ENV_SETUP_PATTERN.match(file_path)
  241. if runtime_env_job_match:
  242. job_id = runtime_env_job_match.group(1)
  243. is_err_file = file_path.endswith("err")
  244. self.log_filenames.add(file_path)
  245. self.closed_file_infos.append(
  246. LogFileInfo(
  247. filename=file_path,
  248. size_when_last_opened=0,
  249. file_position=0,
  250. file_handle=None,
  251. is_err_file=is_err_file,
  252. job_id=job_id,
  253. worker_pid=worker_pid,
  254. )
  255. )
  256. log_filename = os.path.basename(file_path)
  257. logger.info(f"Beginning to track file {log_filename}")
  258. def open_closed_files(self):
  259. """Open some closed files if they may have new lines.
  260. Opening more files may require us to close some of the already open
  261. files.
  262. """
  263. if not self.can_open_more_files:
  264. # If we can't open any more files. Close all of the files.
  265. self._close_all_files()
  266. files_with_no_updates = []
  267. while len(self.closed_file_infos) > 0:
  268. if len(self.open_file_infos) >= self.max_files_open:
  269. self.can_open_more_files = False
  270. break
  271. file_info = self.closed_file_infos.pop(0)
  272. assert file_info.file_handle is None
  273. # Get the file size to see if it has gotten bigger since we last
  274. # opened it.
  275. try:
  276. file_size = os.path.getsize(file_info.filename)
  277. except (IOError, OSError) as e:
  278. # Catch "file not found" errors.
  279. if e.errno == errno.ENOENT:
  280. logger.warning(
  281. f"Warning: The file {file_info.filename} was not found."
  282. )
  283. self.log_filenames.remove(file_info.filename)
  284. continue
  285. raise e
  286. # If some new lines have been added to this file, try to reopen the
  287. # file.
  288. if file_size > file_info.size_when_last_opened:
  289. try:
  290. f = open(file_info.filename, "rb")
  291. except (IOError, OSError) as e:
  292. if e.errno == errno.ENOENT:
  293. logger.warning(
  294. f"Warning: The file {file_info.filename} was not found."
  295. )
  296. self.log_filenames.remove(file_info.filename)
  297. continue
  298. else:
  299. raise e
  300. f.seek(file_info.file_position)
  301. file_info.size_when_last_opened = file_size
  302. file_info.file_handle = f
  303. self.open_file_infos.append(file_info)
  304. else:
  305. files_with_no_updates.append(file_info)
  306. if len(self.open_file_infos) >= self.max_files_open:
  307. self.can_open_more_files = False
  308. # Add the files with no changes back to the list of closed files.
  309. self.closed_file_infos += files_with_no_updates
  310. def check_log_files_and_publish_updates(self):
  311. """Gets updates to the log files and publishes them.
  312. Returns:
  313. True if anything was published and false otherwise.
  314. """
  315. anything_published = False
  316. lines_to_publish = []
  317. def flush():
  318. nonlocal lines_to_publish
  319. nonlocal anything_published
  320. if len(lines_to_publish) > 0:
  321. data = {
  322. "ip": self.ip,
  323. "pid": file_info.worker_pid,
  324. "job": file_info.job_id,
  325. "is_err": file_info.is_err_file,
  326. "lines": lines_to_publish,
  327. "actor_name": file_info.actor_name,
  328. "task_name": file_info.task_name,
  329. }
  330. try:
  331. self.gcs_client.publish_logs(data)
  332. except Exception:
  333. logger.exception(f"Failed to publish log messages {data}")
  334. anything_published = True
  335. lines_to_publish = []
  336. for file_info in self.open_file_infos:
  337. assert not file_info.file_handle.closed
  338. file_info.reopen_if_necessary()
  339. max_num_lines_to_read = ray_constants.LOG_MONITOR_NUM_LINES_TO_READ
  340. for _ in range(max_num_lines_to_read):
  341. try:
  342. next_line = file_info.file_handle.readline()
  343. # Replace any characters not in UTF-8 with
  344. # a replacement character, see
  345. # https://stackoverflow.com/a/38565489/10891801
  346. next_line = next_line.decode("utf-8", "replace")
  347. if next_line == "":
  348. break
  349. next_line = next_line.rstrip("\r\n")
  350. if next_line.startswith(ray_constants.LOG_PREFIX_ACTOR_NAME):
  351. flush() # Possible change of task/actor name.
  352. file_info.actor_name = next_line.split(
  353. ray_constants.LOG_PREFIX_ACTOR_NAME, 1
  354. )[1]
  355. file_info.task_name = None
  356. elif next_line.startswith(ray_constants.LOG_PREFIX_TASK_NAME):
  357. flush() # Possible change of task/actor name.
  358. file_info.task_name = next_line.split(
  359. ray_constants.LOG_PREFIX_TASK_NAME, 1
  360. )[1]
  361. elif next_line.startswith(ray_constants.LOG_PREFIX_JOB_ID):
  362. file_info.job_id = next_line.split(
  363. ray_constants.LOG_PREFIX_JOB_ID, 1
  364. )[1]
  365. elif next_line.startswith(
  366. "Windows fatal exception: access violation"
  367. ):
  368. # We are suppressing the
  369. # 'Windows fatal exception: access violation'
  370. # message on workers on Windows here.
  371. # As far as we know it is harmless,
  372. # but is frequently popping up if Python
  373. # functions are run inside the core
  374. # worker C extension. See the investigation in
  375. # github.com/ray-project/ray/issues/18944
  376. # Also skip the following line, which is an
  377. # empty line.
  378. file_info.file_handle.readline()
  379. else:
  380. lines_to_publish.append(next_line)
  381. except Exception:
  382. logger.error(
  383. f"Error: Reading file: {file_info.filename}, "
  384. f"position: {file_info.file_info.file_handle.tell()} "
  385. "failed."
  386. )
  387. raise
  388. if file_info.file_position == 0:
  389. # make filename windows-agnostic
  390. filename = file_info.filename.replace("\\", "/")
  391. if "/raylet" in filename:
  392. file_info.worker_pid = "raylet"
  393. elif "/gcs_server" in filename:
  394. file_info.worker_pid = "gcs_server"
  395. elif "/monitor" in filename or "event_AUTOSCALER" in filename:
  396. file_info.worker_pid = "autoscaler"
  397. elif "/runtime_env" in filename:
  398. file_info.worker_pid = "runtime_env"
  399. # Record the current position in the file.
  400. file_info.file_position = file_info.file_handle.tell()
  401. flush()
  402. return anything_published
  403. def should_update_filenames(self, last_file_updated_time: float) -> bool:
  404. """Return true if filenames should be updated.
  405. This method is used to apply the backpressure on file updates because
  406. that requires heavy glob operations which use lots of CPUs.
  407. Args:
  408. last_file_updated_time: The last time filenames are updated.
  409. Returns:
  410. True if filenames should be updated. False otherwise.
  411. """
  412. elapsed_seconds = float(time.time() - last_file_updated_time)
  413. return (
  414. len(self.log_filenames) < RAY_LOG_MONITOR_MANY_FILES_THRESHOLD
  415. or elapsed_seconds > LOG_NAME_UPDATE_INTERVAL_S
  416. )
  417. def run(self):
  418. """Run the log monitor.
  419. This will scan the file system once every LOG_NAME_UPDATE_INTERVAL_S to
  420. check if there are new log files to monitor. It will also publish new
  421. log lines.
  422. """
  423. last_updated = time.time()
  424. while True:
  425. if self.should_update_filenames(last_updated):
  426. self.update_log_filenames()
  427. last_updated = time.time()
  428. self.open_closed_files()
  429. anything_published = self.check_log_files_and_publish_updates()
  430. # If nothing was published, then wait a little bit before checking
  431. # for logs to avoid using too much CPU.
  432. if not anything_published:
  433. time.sleep(0.1)
  434. def is_proc_alive(pid):
  435. # Import locally to make sure the bundled version is used if needed
  436. import psutil
  437. try:
  438. return psutil.Process(pid).is_running()
  439. except psutil.NoSuchProcess:
  440. # The process does not exist.
  441. return False
  442. if __name__ == "__main__":
  443. parser = argparse.ArgumentParser(
  444. description=("Parse GCS server address for the log monitor to connect to.")
  445. )
  446. parser.add_argument(
  447. "--gcs-address", required=False, type=str, help="The address (ip:port) of GCS."
  448. )
  449. parser.add_argument(
  450. "--node-ip-address",
  451. required=False,
  452. type=str,
  453. help="The IP address of the node.",
  454. )
  455. parser.add_argument(
  456. "--logging-level",
  457. required=False,
  458. type=str,
  459. default=ray_constants.LOGGER_LEVEL,
  460. choices=ray_constants.LOGGER_LEVEL_CHOICES,
  461. help=ray_constants.LOGGER_LEVEL_HELP,
  462. )
  463. parser.add_argument(
  464. "--logging-format",
  465. required=False,
  466. type=str,
  467. default=ray_constants.LOGGER_FORMAT,
  468. help=ray_constants.LOGGER_FORMAT_HELP,
  469. )
  470. parser.add_argument(
  471. "--logging-filename",
  472. required=False,
  473. type=str,
  474. default=ray_constants.LOG_MONITOR_LOG_FILE_NAME,
  475. help="Specify the name of log file, "
  476. "log to stderr if set empty, default is "
  477. f'"{ray_constants.LOG_MONITOR_LOG_FILE_NAME}"',
  478. )
  479. parser.add_argument(
  480. "--session-dir",
  481. required=True,
  482. type=str,
  483. help="Specify the path of the session directory used by Ray processes.",
  484. )
  485. parser.add_argument(
  486. "--logs-dir",
  487. required=True,
  488. type=str,
  489. help="Specify the path of the log directory used by Ray processes.",
  490. )
  491. parser.add_argument(
  492. "--logging-rotate-bytes",
  493. required=True,
  494. type=int,
  495. help="Specify the max bytes for rotating log file.",
  496. )
  497. parser.add_argument(
  498. "--logging-rotate-backup-count",
  499. required=True,
  500. type=int,
  501. help="Specify the backup count of rotated log file.",
  502. )
  503. parser.add_argument(
  504. "--stdout-filepath",
  505. required=False,
  506. default="",
  507. type=str,
  508. help="The filepath to dump log monitor stdout.",
  509. )
  510. parser.add_argument(
  511. "--stderr-filepath",
  512. required=False,
  513. default="",
  514. type=str,
  515. help="The filepath to dump log monitor stderr.",
  516. )
  517. args = parser.parse_args()
  518. # Disable log rotation for windows platform.
  519. logging_rotation_bytes = args.logging_rotate_bytes if sys.platform != "win32" else 0
  520. logging_rotation_backup_count = (
  521. args.logging_rotate_backup_count if sys.platform != "win32" else 1
  522. )
  523. logging_params = dict(
  524. logging_level=args.logging_level,
  525. logging_format=args.logging_format,
  526. log_dir=args.logs_dir,
  527. filename=args.logging_filename,
  528. max_bytes=logging_rotation_bytes,
  529. backup_count=logging_rotation_backup_count,
  530. )
  531. logger = setup_component_logger(**logging_params)
  532. # Setup stdout/stderr redirect files if redirection enabled
  533. logging_utils.redirect_stdout_stderr_if_needed(
  534. args.stdout_filepath,
  535. args.stderr_filepath,
  536. logging_rotation_bytes,
  537. logging_rotation_backup_count,
  538. )
  539. gcs_client = GcsClient(address=args.gcs_address)
  540. log_monitor = LogMonitor(
  541. args.node_ip_address,
  542. args.logs_dir,
  543. gcs_client,
  544. is_proc_alive,
  545. gcs_address=args.gcs_address,
  546. )
  547. try:
  548. log_monitor.run()
  549. except Exception as e:
  550. # Something went wrong, so push an error to all drivers.
  551. traceback_str = ray._private.utils.format_error_message(traceback.format_exc())
  552. message = (
  553. f"The log monitor on node {platform.node()} "
  554. f"failed with the following error:\n{traceback_str}"
  555. )
  556. ray._private.utils.publish_error_to_driver(
  557. ray_constants.LOG_MONITOR_DIED_ERROR,
  558. message,
  559. gcs_client=gcs_client,
  560. )
  561. logger.error(message)
  562. raise e