# log_manager.py
  1. import logging
  2. import re
  3. from collections import defaultdict
  4. from typing import AsyncIterable, Awaitable, Callable, Dict, List, Optional, Tuple
  5. from ray import ActorID, NodeID, WorkerID
  6. from ray._common.pydantic_compat import BaseModel
  7. from ray.core.generated.gcs_pb2 import ActorTableData
  8. from ray.dashboard.modules.job.common import JOB_LOGS_PATH_TEMPLATE
  9. from ray.util.state.common import (
  10. DEFAULT_RPC_TIMEOUT,
  11. GetLogOptions,
  12. protobuf_to_task_state_dict,
  13. )
  14. from ray.util.state.state_manager import StateDataSourceClient
  15. if BaseModel is None:
  16. raise ModuleNotFoundError("Please install pydantic via `pip install pydantic`.")
  17. logger = logging.getLogger(__name__)
  18. WORKER_LOG_PATTERN = re.compile(r".*worker-([0-9a-f]+)-([0-9a-f]+)-(\d+).(out|err)")
  19. class ResolvedStreamFileInfo(BaseModel):
  20. # The node id where the log file is located.
  21. node_id: str
  22. # The log file path name. Could be a relative path relative to ray's logging folder,
  23. # or an absolute path.
  24. filename: str
  25. # Start offset in the log file to stream from. None to indicate beginning of
  26. # the file, or determined by last tail lines.
  27. start_offset: Optional[int]
  28. # End offset in the log file to stream from. None to indicate the end of the file.
  29. end_offset: Optional[int]
  30. class LogsManager:
  31. def __init__(self, data_source_client: StateDataSourceClient):
  32. self.client = data_source_client
  33. @property
  34. def data_source_client(self) -> StateDataSourceClient:
  35. return self.client
  36. async def ip_to_node_id(self, node_ip: Optional[str]) -> Optional[str]:
  37. """Resolve the node id in hex from a given node ip.
  38. Args:
  39. node_ip: The node ip.
  40. Returns:
  41. node_id if there's a node id that matches the given node ip and is alive.
  42. None otherwise.
  43. """
  44. return await self.client.ip_to_node_id(node_ip)
  45. async def list_logs(
  46. self, node_id: str, timeout: int, glob_filter: str = "*"
  47. ) -> Dict[str, List[str]]:
  48. """Return a list of log files on a given node id filtered by the glob.
  49. Args:
  50. node_id: The node id where log files present.
  51. timeout: The timeout of the API.
  52. glob_filter: The glob filter to filter out log files.
  53. Returns:
  54. Dictionary of {component_name -> list of log files}
  55. Raises:
  56. ValueError: If a source is unresponsive.
  57. """
  58. reply = await self.client.list_logs(node_id, glob_filter, timeout=timeout)
  59. return self._categorize_log_files(reply.log_files)
  60. async def stream_logs(
  61. self,
  62. options: GetLogOptions,
  63. get_actor_fn: Callable[[ActorID], Awaitable[Optional[ActorTableData]]],
  64. ) -> AsyncIterable[bytes]:
  65. """Generate a stream of logs in bytes.
  66. Args:
  67. options: The option for streaming logs.
  68. Return:
  69. Async generator of streamed logs in bytes.
  70. """
  71. node_id = options.node_id
  72. if node_id is None:
  73. node_id = await self.ip_to_node_id(options.node_ip)
  74. res = await self.resolve_filename(
  75. node_id=node_id,
  76. log_filename=options.filename,
  77. actor_id=options.actor_id,
  78. task_id=options.task_id,
  79. attempt_number=options.attempt_number,
  80. pid=options.pid,
  81. get_actor_fn=get_actor_fn,
  82. timeout=options.timeout,
  83. suffix=options.suffix,
  84. submission_id=options.submission_id,
  85. )
  86. keep_alive = options.media_type == "stream"
  87. stream = await self.client.stream_log(
  88. node_id=res.node_id,
  89. log_file_name=res.filename,
  90. keep_alive=keep_alive,
  91. lines=options.lines,
  92. interval=options.interval,
  93. # If we keepalive logs connection, we shouldn't have timeout
  94. # otherwise the stream will be terminated forcefully
  95. # after the deadline is expired.
  96. timeout=options.timeout if not keep_alive else None,
  97. start_offset=res.start_offset,
  98. end_offset=res.end_offset,
  99. )
  100. async for streamed_log in stream:
  101. yield streamed_log.data
  102. async def _resolve_job_filename(self, sub_job_id: str) -> Tuple[str, str]:
  103. """Return the log file name and node id for a given job submission id.
  104. Args:
  105. sub_job_id: The job submission id.
  106. Returns:
  107. The log file name and node id.
  108. """
  109. job_infos = await self.client.get_job_info(timeout=DEFAULT_RPC_TIMEOUT)
  110. target_job = None
  111. for job_info in job_infos:
  112. if job_info.submission_id == sub_job_id:
  113. target_job = job_info
  114. break
  115. if target_job is None:
  116. logger.info(f"Submission job ID {sub_job_id} not found.")
  117. return None, None
  118. node_id = job_info.driver_node_id
  119. if node_id is None:
  120. raise ValueError(
  121. f"Job {sub_job_id} has no driver node id info. "
  122. "This is likely a bug. Please file an issue."
  123. )
  124. log_filename = JOB_LOGS_PATH_TEMPLATE.format(submission_id=sub_job_id)
  125. return node_id, log_filename
  126. async def _resolve_worker_file(
  127. self,
  128. node_id_hex: str,
  129. worker_id_hex: Optional[str],
  130. pid: Optional[int],
  131. suffix: str,
  132. timeout: int,
  133. ) -> Optional[str]:
  134. """Resolve worker log file."""
  135. if worker_id_hex is not None and pid is not None:
  136. raise ValueError(
  137. f"Only one of worker id({worker_id_hex}) or pid({pid}) should be"
  138. "provided."
  139. )
  140. if worker_id_hex is not None:
  141. log_files = await self.list_logs(
  142. node_id_hex, timeout, glob_filter=f"*{worker_id_hex}*{suffix}"
  143. )
  144. else:
  145. log_files = await self.list_logs(
  146. node_id_hex, timeout, glob_filter=f"*{pid}*{suffix}"
  147. )
  148. # Find matching worker logs.
  149. for filename in [*log_files["worker_out"], *log_files["worker_err"]]:
  150. # Worker logs look like worker-[worker_id]-[job_id]-[pid].out
  151. if worker_id_hex is not None:
  152. worker_id_from_filename = WORKER_LOG_PATTERN.match(filename).group(1)
  153. if worker_id_from_filename == worker_id_hex:
  154. return filename
  155. else:
  156. worker_pid_from_filename = int(
  157. WORKER_LOG_PATTERN.match(filename).group(3)
  158. )
  159. if worker_pid_from_filename == pid:
  160. return filename
  161. return None
  162. async def _resolve_actor_filename(
  163. self,
  164. actor_id: ActorID,
  165. get_actor_fn: Callable[[ActorID], Awaitable[Optional[ActorTableData]]],
  166. suffix: str,
  167. timeout: int,
  168. ):
  169. """Resolve actor log file.
  170. Args:
  171. actor_id: The actor id.
  172. get_actor_fn: The function to get actor information.
  173. suffix: The suffix of the log file.
  174. timeout: Timeout in seconds.
  175. Returns:
  176. The log file name and node id.
  177. Raises:
  178. ValueError: If actor data is not found or get_actor_fn is not provided.
  179. """
  180. if get_actor_fn is None:
  181. raise ValueError("get_actor_fn needs to be specified for actor_id")
  182. actor_data = await get_actor_fn(actor_id)
  183. if actor_data is None:
  184. raise ValueError(f"Actor ID {actor_id} not found.")
  185. # TODO(sang): Only the latest worker id can be obtained from
  186. # actor information now. That means, if actors are restarted,
  187. # there's no way for us to get the past worker ids.
  188. worker_id_binary = actor_data.address.worker_id
  189. if not worker_id_binary:
  190. raise ValueError(
  191. f"Worker ID for Actor ID {actor_id} not found. "
  192. "Actor is not scheduled yet."
  193. )
  194. worker_id = WorkerID(worker_id_binary)
  195. node_id_binary = actor_data.address.node_id
  196. if not node_id_binary:
  197. raise ValueError(
  198. f"Node ID for Actor ID {actor_id} not found. "
  199. "Actor is not scheduled yet."
  200. )
  201. node_id = NodeID(node_id_binary)
  202. log_filename = await self._resolve_worker_file(
  203. node_id_hex=node_id.hex(),
  204. worker_id_hex=worker_id.hex(),
  205. pid=None,
  206. suffix=suffix,
  207. timeout=timeout,
  208. )
  209. return node_id.hex(), log_filename
  210. async def _resolve_task_filename(
  211. self, task_id: str, attempt_number: int, suffix: str, timeout: int
  212. ):
  213. """Resolve log file for a task.
  214. Args:
  215. task_id: The task id.
  216. attempt_number: The attempt number.
  217. suffix: The suffix of the log file, e.g. out or err.
  218. timeout: Timeout in seconds.
  219. Returns:
  220. The log file name, node id, the start and end offsets of the
  221. corresponding task log in the file.
  222. Raises:
  223. FileNotFoundError: If the log file is not found.
  224. ValueError: If the suffix is not out or err.
  225. """
  226. log_filename = None
  227. node_id = None
  228. start_offset = None
  229. end_offset = None
  230. if suffix not in ["out", "err"]:
  231. raise ValueError(f"Suffix {suffix} is not supported.")
  232. reply = await self.client.get_all_task_info(
  233. filters=[("task_id", "=", task_id)], timeout=timeout
  234. )
  235. # Check if the task is found.
  236. if len(reply.events_by_task) == 0:
  237. raise FileNotFoundError(
  238. f"Could not find log file for task: {task_id}"
  239. f" (attempt {attempt_number}) with suffix: {suffix}"
  240. )
  241. task_event = None
  242. for t in reply.events_by_task:
  243. if t.attempt_number == attempt_number:
  244. task_event = t
  245. break
  246. if task_event is None:
  247. raise FileNotFoundError(
  248. "Could not find log file for task attempt:"
  249. f"{task_id}({attempt_number})"
  250. )
  251. # Get the worker id and node id.
  252. task = protobuf_to_task_state_dict(task_event)
  253. worker_id = task.get("worker_id", None)
  254. node_id = task.get("node_id", None)
  255. log_info = task.get("task_log_info", None)
  256. actor_id = task.get("actor_id", None)
  257. if node_id is None:
  258. raise FileNotFoundError(
  259. "Could not find log file for task attempt."
  260. f"{task_id}({attempt_number}) due to missing node info."
  261. )
  262. if log_info is None and actor_id is not None:
  263. # This is a concurrent actor task. The logs will be interleaved.
  264. # So we return the log file of the actor instead.
  265. raise FileNotFoundError(
  266. f"For actor task, please query actor log for "
  267. f"actor({actor_id}): e.g. ray logs actor --id {actor_id} . Or "
  268. "set RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING=1 in actor's runtime env "
  269. "or when starting the cluster. Recording actor task's log could be "
  270. "expensive, so Ray turns it off by default."
  271. )
  272. elif log_info is None:
  273. raise FileNotFoundError(
  274. "Could not find log file for task attempt:"
  275. f"{task_id}({attempt_number})."
  276. f"Worker id = {worker_id}, node id = {node_id},"
  277. f"log_info = {log_info}"
  278. )
  279. filename_key = "stdout_file" if suffix == "out" else "stderr_file"
  280. log_filename = log_info.get(filename_key, None)
  281. if log_filename is None:
  282. raise FileNotFoundError(
  283. f"Missing log filename info in {log_info} for task {task_id},"
  284. f"attempt {attempt_number}"
  285. )
  286. start_offset = log_info.get(f"std{suffix}_start", None)
  287. end_offset = log_info.get(f"std{suffix}_end", None)
  288. return node_id, log_filename, start_offset, end_offset
  289. async def resolve_filename(
  290. self,
  291. *,
  292. node_id: Optional[str] = None,
  293. log_filename: Optional[str] = None,
  294. actor_id: Optional[str] = None,
  295. task_id: Optional[str] = None,
  296. attempt_number: Optional[int] = None,
  297. pid: Optional[str] = None,
  298. get_actor_fn: Optional[
  299. Callable[[ActorID], Awaitable[Optional[ActorTableData]]]
  300. ] = None,
  301. timeout: int = DEFAULT_RPC_TIMEOUT,
  302. suffix: str = "out",
  303. submission_id: Optional[str] = None,
  304. ) -> ResolvedStreamFileInfo:
  305. """Return the file name given all options.
  306. Args:
  307. node_id: The node's id from which logs are resolved.
  308. log_filename: Filename of the log file.
  309. actor_id: Id of the actor that generates the log file.
  310. task_id: Id of the task that generates the log file.
  311. pid: Id of the worker process that generates the log file.
  312. get_actor_fn: Callback to get the actor's data by id.
  313. timeout: Timeout for the gRPC to listing logs on the node
  314. specified by `node_id`.
  315. suffix: Log suffix if no `log_filename` is provided, when
  316. resolving by other ids'. Default to "out".
  317. submission_id: The submission id for a submission job.
  318. """
  319. start_offset = None
  320. end_offset = None
  321. if suffix not in ["out", "err"]:
  322. raise ValueError(f"Suffix {suffix} is not supported. ")
  323. # TODO(rickyx): We should make sure we do some sort of checking on the log
  324. # filename
  325. if actor_id:
  326. node_id, log_filename = await self._resolve_actor_filename(
  327. ActorID.from_hex(actor_id), get_actor_fn, suffix, timeout
  328. )
  329. elif task_id:
  330. (
  331. node_id,
  332. log_filename,
  333. start_offset,
  334. end_offset,
  335. ) = await self._resolve_task_filename(
  336. task_id, attempt_number, suffix, timeout
  337. )
  338. elif submission_id:
  339. node_id, log_filename = await self._resolve_job_filename(submission_id)
  340. elif pid:
  341. if node_id is None:
  342. raise ValueError(
  343. "Node id needs to be specified for resolving"
  344. f" filenames of pid {pid}"
  345. )
  346. log_filename = await self._resolve_worker_file(
  347. node_id_hex=node_id,
  348. worker_id_hex=None,
  349. pid=pid,
  350. suffix=suffix,
  351. timeout=timeout,
  352. )
  353. if log_filename is None:
  354. raise FileNotFoundError(
  355. "Could not find a log file. Please make sure the given "
  356. "option exists in the cluster.\n"
  357. f"\tnode_id: {node_id}\n"
  358. f"\tfilename: {log_filename}\n"
  359. f"\tactor_id: {actor_id}\n"
  360. f"\ttask_id: {task_id}\n"
  361. f"\tpid: {pid}\n"
  362. f"\tsuffix: {suffix}\n"
  363. f"\tsubmission_id: {submission_id}\n"
  364. f"\tattempt_number: {attempt_number}\n"
  365. )
  366. res = ResolvedStreamFileInfo(
  367. node_id=node_id,
  368. filename=log_filename,
  369. start_offset=start_offset,
  370. end_offset=end_offset,
  371. )
  372. logger.info(f"Resolved log file: {res}")
  373. return res
  374. def _categorize_log_files(self, log_files: List[str]) -> Dict[str, List[str]]:
  375. """Categorize the given log files after filterieng them out using a given glob.
  376. Returns:
  377. Dictionary of {component_name -> list of log files}
  378. """
  379. result = defaultdict(list)
  380. for log_file in log_files:
  381. if "worker" in log_file and (log_file.endswith(".out")):
  382. result["worker_out"].append(log_file)
  383. elif "worker" in log_file and (log_file.endswith(".err")):
  384. result["worker_err"].append(log_file)
  385. elif "core-worker" in log_file and log_file.endswith(".log"):
  386. result["core_worker"].append(log_file)
  387. elif "core-driver" in log_file and log_file.endswith(".log"):
  388. result["driver"].append(log_file)
  389. elif "raylet." in log_file:
  390. result["raylet"].append(log_file)
  391. elif "gcs_server." in log_file:
  392. result["gcs_server"].append(log_file)
  393. elif "log_monitor" in log_file:
  394. result["internal"].append(log_file)
  395. elif "monitor" in log_file:
  396. result["autoscaler"].append(log_file)
  397. elif "agent." in log_file:
  398. result["agent"].append(log_file)
  399. elif "dashboard." in log_file:
  400. result["dashboard"].append(log_file)
  401. else:
  402. result["internal"].append(log_file)
  403. return result