log_agent.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. import asyncio
  2. import concurrent.futures
  3. import io
  4. import logging
  5. import os
  6. from pathlib import Path
  7. from typing import Optional
  8. import grpc
  9. import ray.dashboard.modules.log.log_consts as log_consts
  10. import ray.dashboard.modules.log.log_utils as log_utils
  11. import ray.dashboard.optional_utils as dashboard_optional_utils
  12. import ray.dashboard.utils as dashboard_utils
  13. from ray._private.ray_constants import env_integer
  14. from ray.core.generated import reporter_pb2, reporter_pb2_grpc
# Module-level logger and the route table used to register HTTP routes.
logger = logging.getLogger(__name__)
routes = dashboard_optional_utils.DashboardAgentRouteTable

# Number of bytes read from a log file per block: 1 << 16 == 65536 (64 KB).
BLOCK_SIZE = 1 << 16

# Keep-alive interval (seconds) used when following a file and the request
# does not specify its own interval.
DEFAULT_KEEP_ALIVE_INTERVAL_SEC = 1

# Size of the task-log-search thread pool. Presumably read from the
# environment variable of the same name, falling back to 2 -- confirm against
# `env_integer`.
RAY_DASHBOARD_LOG_TASK_LOG_SEARCH_MAX_WORKER_COUNT = env_integer(
    "RAY_DASHBOARD_LOG_TASK_LOG_SEARCH_MAX_WORKER_COUNT", default=2
)
  24. def find_offset_of_content_in_file(
  25. file: io.BufferedIOBase, content: bytes, start_offset: int = 0
  26. ) -> int:
  27. """Find the offset of the first occurrence of content in a file.
  28. Args:
  29. file: File object
  30. content: Content to find
  31. start_offset: Start offset to read from, inclusive.
  32. Returns:
  33. Offset of the first occurrence of content in a file.
  34. """
  35. logger.debug(f"Finding offset of content {content} in file")
  36. file.seek(start_offset, io.SEEK_SET) # move file pointer to start of file
  37. offset = start_offset
  38. while True:
  39. # Read in block
  40. block_data = file.read(BLOCK_SIZE)
  41. if block_data == b"":
  42. # Stop reading
  43. return -1
  44. # Find the offset of the first occurrence of content in the block
  45. block_offset = block_data.find(content)
  46. if block_offset != -1:
  47. # Found the offset in the block
  48. return offset + block_offset
  49. # Continue reading
  50. offset += len(block_data)
  51. def find_end_offset_file(file: io.BufferedIOBase) -> int:
  52. """
  53. Find the offset of the end of a file without changing the file pointer.
  54. Args:
  55. file: File object
  56. Returns:
  57. Offset of the end of a file.
  58. """
  59. old_pos = file.tell() # store old position
  60. file.seek(0, io.SEEK_END) # move file pointer to end of file
  61. end = file.tell() # return end of file offset
  62. file.seek(old_pos, io.SEEK_SET)
  63. return end
  64. def find_end_offset_next_n_lines_from_offset(
  65. file: io.BufferedIOBase, start_offset: int, n: int
  66. ) -> int:
  67. """
  68. Find the offsets of next n lines from a start offset.
  69. Args:
  70. file: File object
  71. start_offset: Start offset to read from, inclusive.
  72. n: Number of lines to find.
  73. Returns:
  74. Offset of the end of the next n line (exclusive)
  75. """
  76. file.seek(start_offset) # move file pointer to start offset
  77. end_offset = None
  78. for _ in range(n): # loop until we find n lines or reach end of file
  79. line = file.readline() # read a line and consume new line character
  80. if not line: # end of file
  81. break
  82. end_offset = file.tell() # end offset.
  83. logger.debug(f"Found next {n} lines from {start_offset} offset")
  84. return (
  85. end_offset if end_offset is not None else file.seek(0, io.SEEK_END)
  86. ) # return last line offset or end of file offset if no lines found
  87. def find_start_offset_last_n_lines_from_offset(
  88. file: io.BufferedIOBase, offset: int, n: int, block_size: int = BLOCK_SIZE
  89. ) -> int:
  90. """
  91. Find the offset of the beginning of the line of the last X lines from an offset.
  92. Args:
  93. file: File object
  94. offset: Start offset from which to find last X lines, -1 means end of file.
  95. The offset is exclusive, i.e. data at the offset is not included
  96. in the result.
  97. n: Number of lines to find
  98. block_size: Block size to read from file
  99. Returns:
  100. Offset of the beginning of the line of the last X lines from a start offset.
  101. """
  102. logger.debug(f"Finding last {n} lines from {offset} offset")
  103. if offset == -1:
  104. offset = file.seek(0, io.SEEK_END) # move file pointer to end of file
  105. else:
  106. file.seek(offset, io.SEEK_SET) # move file pointer to start offset
  107. if n == 0:
  108. return offset
  109. nbytes_from_end = (
  110. 0 # Number of bytes that should be tailed from the end of the file
  111. )
  112. # Non new line terminating offset, adjust the line count and treat the non-newline
  113. # terminated line as the last line. e.g. line 1\nline 2
  114. file.seek(max(0, offset - 1), os.SEEK_SET)
  115. if file.read(1) != b"\n":
  116. n -= 1
  117. # Remaining number of lines to tail
  118. lines_more = n
  119. read_offset = max(0, offset - block_size)
  120. # So that we know how much to read on the last block (the block 0)
  121. prev_offset = offset
  122. while lines_more >= 0 and read_offset >= 0:
  123. # Seek to the current block start
  124. file.seek(read_offset, 0)
  125. # Read the current block (or less than block) data
  126. block_data = file.read(min(block_size, prev_offset - read_offset))
  127. num_lines = block_data.count(b"\n")
  128. if num_lines > lines_more:
  129. # This is the last block to read.
  130. # Need to find the offset of exact number of lines to tail
  131. # in the block.
  132. # Use `split` here to split away the extra lines, i.e.
  133. # first `num_lines - lines_more` lines.
  134. lines = block_data.split(b"\n", num_lines - lines_more)
  135. # Added the len of those lines that at the end of the block.
  136. nbytes_from_end += len(lines[-1])
  137. break
  138. # Need to read more blocks.
  139. lines_more -= num_lines
  140. nbytes_from_end += len(block_data)
  141. if read_offset == 0:
  142. # We have read all blocks (since the start)
  143. break
  144. # Continuing with the previous block
  145. prev_offset = read_offset
  146. read_offset = max(0, read_offset - block_size)
  147. offset_read_start = offset - nbytes_from_end
  148. assert (
  149. offset_read_start >= 0
  150. ), f"Read start offset({offset_read_start}) should be non-negative"
  151. return offset_read_start
  152. async def _stream_log_in_chunk(
  153. context: grpc.aio.ServicerContext,
  154. file: io.BufferedIOBase,
  155. start_offset: int,
  156. end_offset: int = -1,
  157. keep_alive_interval_sec: int = -1,
  158. block_size: int = BLOCK_SIZE,
  159. ):
  160. """Streaming log in chunk from start to end offset.
  161. Stream binary file content in chunks from start offset to an end
  162. offset if provided, else to the end of the file.
  163. Args:
  164. context: gRPC server side context
  165. file: Binary file to stream
  166. start_offset: File offset where streaming starts
  167. end_offset: If -1, implying streaming til the EOF.
  168. keep_alive_interval_sec: Duration for which streaming will be
  169. retried when reaching the file end, -1 means no retry.
  170. block_size: Number of bytes per chunk, exposed for testing
  171. Return:
  172. Async generator of StreamReply
  173. """
  174. assert "b" in file.mode, "Only binary file is supported."
  175. assert not (
  176. keep_alive_interval_sec >= 0 and end_offset != -1
  177. ), "Keep-alive is not allowed when specifying an end offset"
  178. file.seek(start_offset, 0)
  179. cur_offset = start_offset
  180. # Until gRPC is done
  181. while not context.done():
  182. # Read in block
  183. if end_offset != -1:
  184. to_read = min(end_offset - cur_offset, block_size)
  185. else:
  186. to_read = block_size
  187. bytes = file.read(to_read)
  188. if bytes == b"":
  189. # Stop reading
  190. if keep_alive_interval_sec >= 0:
  191. await asyncio.sleep(keep_alive_interval_sec)
  192. # Try reading again
  193. continue
  194. # Have read the entire file, done
  195. break
  196. logger.debug(f"Sending {len(bytes)} bytes at {cur_offset}")
  197. yield reporter_pb2.StreamLogReply(data=bytes)
  198. # Have read the requested section [start_offset, end_offset), done
  199. cur_offset += len(bytes)
  200. if end_offset != -1 and cur_offset >= end_offset:
  201. break
  202. class LogAgent(dashboard_utils.DashboardAgentModule):
  203. def __init__(self, dashboard_agent):
  204. super().__init__(dashboard_agent)
  205. log_utils.register_mimetypes()
  206. routes.static("/logs", self._dashboard_agent.log_dir, show_index=True)
  207. async def run(self, server):
  208. pass
  209. @staticmethod
  210. def is_minimal_module():
  211. return False
# Module-wide thread pool, capped at
# RAY_DASHBOARD_LOG_TASK_LOG_SEARCH_MAX_WORKER_COUNT worker threads.
# NOTE(review): not referenced in the code visible here; presumably used to
# run task log searches off the event loop -- confirm against callers.
_task_log_search_worker_pool = concurrent.futures.ThreadPoolExecutor(
    max_workers=RAY_DASHBOARD_LOG_TASK_LOG_SEARCH_MAX_WORKER_COUNT
)
  215. class LogAgentV1Grpc(dashboard_utils.DashboardAgentModule):
  216. def __init__(self, dashboard_agent):
  217. super().__init__(dashboard_agent)
  218. async def run(self, server):
  219. if server:
  220. reporter_pb2_grpc.add_LogServiceServicer_to_server(self, server)
  221. @property
  222. def node_id(self) -> Optional[str]:
  223. return self._dashboard_agent.get_node_id()
  224. @staticmethod
  225. def is_minimal_module():
  226. # Dashboard is only available with non-minimal install now.
  227. return False
  228. async def ListLogs(self, request, context):
  229. """
  230. Lists all files in the active Ray logs directory.
  231. Part of `LogService` gRPC.
  232. NOTE: These RPCs are used by state_head.py, not log_head.py
  233. """
  234. path = Path(self._dashboard_agent.log_dir)
  235. if not path.exists():
  236. raise FileNotFoundError(
  237. f"Could not find log dir at path: {self._dashboard_agent.log_dir}"
  238. "It is unexpected. Please report an issue to Ray Github."
  239. )
  240. log_files = []
  241. for p in path.glob(request.glob_filter):
  242. log_files.append(str(p.relative_to(path)) + ("/" if p.is_dir() else ""))
  243. return reporter_pb2.ListLogsReply(log_files=log_files)
  244. @classmethod
  245. def _resolve_filename(cls, root_log_dir: Path, filename: str) -> Path:
  246. """
  247. Resolves the file path relative to the root log directory.
  248. Args:
  249. root_log_dir: Root log directory.
  250. filename: File path relative to the root log directory.
  251. Raises:
  252. FileNotFoundError: If the file path is invalid.
  253. Returns:
  254. The absolute file path resolved from the root log directory.
  255. """
  256. if not Path(filename).is_absolute():
  257. filepath = root_log_dir / filename
  258. else:
  259. filepath = Path(filename)
  260. # We want to allow relative paths that include symlinks pointing outside of the
  261. # `root_log_dir`, so use `os.path.abspath` instead of `Path.resolve()` because
  262. # `os.path.abspath` does not resolve symlinks.
  263. filepath = Path(os.path.abspath(filepath))
  264. if not filepath.is_file():
  265. raise FileNotFoundError(f"A file is not found at: {filepath}")
  266. try:
  267. filepath.relative_to(root_log_dir)
  268. except ValueError as e:
  269. raise FileNotFoundError(f"{filepath} not in {root_log_dir}: {e}")
  270. # Fully resolve the path before returning (including following symlinks).
  271. return filepath.resolve()
  272. async def StreamLog(self, request, context):
  273. """
  274. Streams the log in real time starting from `request.lines` number of lines from
  275. the end of the file if `request.keep_alive == True`. Else, it terminates the
  276. stream once there are no more bytes to read from the log file.
  277. Part of `LogService` gRPC.
  278. NOTE: These RPCs are used by state_head.py, not log_head.py
  279. """
  280. # NOTE: If the client side connection is closed, this handler will
  281. # be automatically terminated.
  282. lines = request.lines if request.lines else 1000
  283. try:
  284. filepath = self._resolve_filename(
  285. Path(self._dashboard_agent.log_dir), request.log_file_name
  286. )
  287. except FileNotFoundError as e:
  288. await context.send_initial_metadata([[log_consts.LOG_GRPC_ERROR, str(e)]])
  289. else:
  290. with open(filepath, "rb") as f:
  291. await context.send_initial_metadata([])
  292. # Default stream entire file
  293. start_offset = (
  294. request.start_offset if request.HasField("start_offset") else 0
  295. )
  296. end_offset = (
  297. request.end_offset
  298. if request.HasField("end_offset")
  299. else find_end_offset_file(f)
  300. )
  301. if lines != -1:
  302. # If specified tail line number, cap the start offset
  303. # with lines from the current end offset
  304. start_offset = max(
  305. find_start_offset_last_n_lines_from_offset(
  306. f, offset=end_offset, n=lines
  307. ),
  308. start_offset,
  309. )
  310. # If keep alive: following the log every 'interval'
  311. keep_alive_interval_sec = -1
  312. if request.keep_alive:
  313. keep_alive_interval_sec = (
  314. request.interval
  315. if request.interval
  316. else DEFAULT_KEEP_ALIVE_INTERVAL_SEC
  317. )
  318. # When following (keep_alive), it will read beyond the end
  319. end_offset = -1
  320. logger.info(
  321. f"Tailing logs from {start_offset} to {end_offset} for "
  322. f"lines={lines}, with keep_alive={keep_alive_interval_sec}"
  323. )
  324. # Read and send the file data in chunk
  325. async for chunk_res in _stream_log_in_chunk(
  326. context=context,
  327. file=f,
  328. start_offset=start_offset,
  329. end_offset=end_offset,
  330. keep_alive_interval_sec=keep_alive_interval_sec,
  331. ):
  332. yield chunk_res