event_utils.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. import asyncio
  2. import collections
  3. import fnmatch
  4. import itertools
  5. import json
  6. import logging.handlers
  7. import mmap
  8. import os
  9. import time
  10. from concurrent.futures import ThreadPoolExecutor
  11. from ray._common.utils import get_or_create_event_loop, run_background_task
  12. from ray.dashboard.modules.event import event_consts
  13. from ray.dashboard.utils import async_loop_forever
  14. logger = logging.getLogger(__name__)
  15. def _get_source_files(event_dir, source_types=None, event_file_filter=None):
  16. event_log_names = os.listdir(event_dir)
  17. source_files = {}
  18. all_source_types = set(event_consts.EVENT_SOURCE_ALL)
  19. for source_type in source_types or event_consts.EVENT_SOURCE_ALL:
  20. assert source_type in all_source_types, f"Invalid source type: {source_type}"
  21. files = []
  22. for n in event_log_names:
  23. if fnmatch.fnmatch(n, f"*{source_type}*.log"):
  24. f = os.path.join(event_dir, n)
  25. if event_file_filter is not None and not event_file_filter(f):
  26. continue
  27. files.append(f)
  28. if files:
  29. source_files[source_type] = files
  30. return source_files
  31. def _restore_newline(event_dict):
  32. try:
  33. event_dict["message"] = (
  34. event_dict["message"].replace("\\n", "\n").replace("\\r", "\n")
  35. )
  36. except Exception:
  37. logger.exception("Restore newline for event failed: %s", event_dict)
  38. return event_dict
  39. def _parse_line(event_str):
  40. return _restore_newline(json.loads(event_str))
  41. def parse_event_strings(event_string_list):
  42. events = []
  43. for data in event_string_list:
  44. if not data:
  45. continue
  46. try:
  47. event = _parse_line(data)
  48. events.append(event)
  49. except Exception:
  50. logger.exception("Parse event line failed: %s", repr(data))
  51. return events
  52. ReadFileResult = collections.namedtuple(
  53. "ReadFileResult", ["fid", "size", "mtime", "position", "lines"]
  54. )
  55. def _read_file(
  56. file, pos, n_lines=event_consts.EVENT_READ_LINE_COUNT_LIMIT, closefd=True
  57. ):
  58. with open(file, "rb", closefd=closefd) as f:
  59. # The ino may be 0 on Windows.
  60. stat = os.stat(f.fileno())
  61. fid = stat.st_ino or file
  62. lines = []
  63. with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
  64. start = pos
  65. for _ in range(n_lines):
  66. sep = mm.find(b"\n", start)
  67. if sep == -1:
  68. break
  69. if sep - start <= event_consts.EVENT_READ_LINE_LENGTH_LIMIT:
  70. lines.append(mm[start:sep].decode("utf-8"))
  71. else:
  72. truncated_size = min(100, event_consts.EVENT_READ_LINE_LENGTH_LIMIT)
  73. logger.warning(
  74. "Ignored long string: %s...(%s chars)",
  75. mm[start : start + truncated_size].decode("utf-8"),
  76. sep - start,
  77. )
  78. start = sep + 1
  79. return ReadFileResult(fid, stat.st_size, stat.st_mtime, start, lines)
  80. def monitor_events(
  81. event_dir,
  82. callback,
  83. monitor_thread_pool_executor: ThreadPoolExecutor,
  84. scan_interval_seconds=event_consts.SCAN_EVENT_DIR_INTERVAL_SECONDS,
  85. start_mtime=time.time() + event_consts.SCAN_EVENT_START_OFFSET_SECONDS,
  86. monitor_files=None,
  87. source_types=None,
  88. ):
  89. """Monitor events in directory. New events will be read and passed to the
  90. callback.
  91. Args:
  92. event_dir: The event log directory.
  93. callback (def callback(List[str]): pass): A callback accepts a list of
  94. event strings.
  95. monitor_thread_pool_executor: A thread pool exector to monitor/update
  96. events. None means it will use the default execturo which uses
  97. num_cpus of the machine * 5 threads (before python 3.8) or
  98. min(32, num_cpus + 5) (from Python 3.8).
  99. scan_interval_seconds: An interval seconds between two scans.
  100. start_mtime: Only the event log files whose last modification
  101. time is greater than start_mtime are monitored.
  102. monitor_files (Dict[int, MonitorFile]): The map from event log file id
  103. to MonitorFile object. Monitor all files start from the beginning
  104. if the value is None.
  105. source_types (List[str]): A list of source type name from
  106. event_pb2.Event.SourceType.keys(). Monitor all source types if the
  107. value is None.
  108. """
  109. loop = get_or_create_event_loop()
  110. if monitor_files is None:
  111. monitor_files = {}
  112. logger.info(
  113. "Monitor events logs modified after %s on %s, the source types are %s.",
  114. start_mtime,
  115. event_dir,
  116. "all" if source_types is None else source_types,
  117. )
  118. MonitorFile = collections.namedtuple("MonitorFile", ["size", "mtime", "position"])
  119. def _source_file_filter(source_file):
  120. stat = os.stat(source_file)
  121. return stat.st_mtime > start_mtime
  122. def _read_monitor_file(file, pos):
  123. assert isinstance(
  124. file, str
  125. ), f"File should be a str, but a {type(file)}({file}) found"
  126. fd = os.open(file, os.O_RDONLY)
  127. try:
  128. stat = os.stat(fd)
  129. # Check the file size to avoid raising the exception
  130. # ValueError: cannot mmap an empty file
  131. if stat.st_size <= 0:
  132. return []
  133. fid = stat.st_ino or file
  134. monitor_file = monitor_files.get(fid)
  135. if monitor_file:
  136. if (
  137. monitor_file.position == monitor_file.size
  138. and monitor_file.size == stat.st_size
  139. and monitor_file.mtime == stat.st_mtime
  140. ):
  141. logger.debug(
  142. "Skip reading the file because there is no change: %s", file
  143. )
  144. return []
  145. position = monitor_file.position
  146. else:
  147. logger.info("Found new event log file: %s", file)
  148. position = pos
  149. # Close the fd in finally.
  150. r = _read_file(fd, position, closefd=False)
  151. # It should be fine to update the dict in executor thread.
  152. monitor_files[r.fid] = MonitorFile(r.size, r.mtime, r.position)
  153. loop.call_soon_threadsafe(callback, r.lines)
  154. except Exception as e:
  155. raise Exception(f"Read event file failed: {file}") from e
  156. finally:
  157. os.close(fd)
  158. @async_loop_forever(scan_interval_seconds, cancellable=True)
  159. async def _scan_event_log_files():
  160. # Scan event files.
  161. source_files = await loop.run_in_executor(
  162. monitor_thread_pool_executor,
  163. _get_source_files,
  164. event_dir,
  165. source_types,
  166. _source_file_filter,
  167. )
  168. # Limit concurrent read to avoid fd exhaustion.
  169. semaphore = asyncio.Semaphore(event_consts.CONCURRENT_READ_LIMIT)
  170. async def _concurrent_coro(filename):
  171. async with semaphore:
  172. return await loop.run_in_executor(
  173. monitor_thread_pool_executor, _read_monitor_file, filename, 0
  174. )
  175. # Read files.
  176. await asyncio.gather(
  177. *[
  178. _concurrent_coro(filename)
  179. for filename in list(itertools.chain(*source_files.values()))
  180. ]
  181. )
  182. return run_background_task(_scan_event_log_files())