| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466 |
- import logging
- import re
- from collections import defaultdict
- from typing import AsyncIterable, Awaitable, Callable, Dict, List, Optional, Tuple
- from ray import ActorID, NodeID, WorkerID
- from ray._common.pydantic_compat import BaseModel
- from ray.core.generated.gcs_pb2 import ActorTableData
- from ray.dashboard.modules.job.common import JOB_LOGS_PATH_TEMPLATE
- from ray.util.state.common import (
- DEFAULT_RPC_TIMEOUT,
- GetLogOptions,
- protobuf_to_task_state_dict,
- )
- from ray.util.state.state_manager import StateDataSourceClient
# ``BaseModel`` comes from Ray's pydantic compatibility shim. This check relies
# on it being None when pydantic is unavailable (presumably the shim's contract;
# verify in ray._common.pydantic_compat), and fails at import time with an
# actionable install hint.
if BaseModel is None:
    raise ModuleNotFoundError("Please install pydantic via `pip install pydantic`.")
logger = logging.getLogger(__name__)

# Matches worker log file names such as ``worker-[worker_id]-[job_id]-[pid].out``,
# optionally preceded by any prefix (e.g. a directory path).
# Groups: 1 = worker id (hex), 2 = job id (hex), 3 = pid, 4 = "out" or "err".
# The dot before the suffix is escaped so it only matches a literal ".".
WORKER_LOG_PATTERN = re.compile(r".*worker-([0-9a-f]+)-([0-9a-f]+)-(\d+)\.(out|err)")
class ResolvedStreamFileInfo(BaseModel):
    """Location and byte range of a log file to stream."""

    # The node id where the log file is located.
    node_id: str
    # The log file path name. Could be a path relative to Ray's logging folder,
    # or an absolute path.
    filename: str
    # Start offset in the log file to stream from. None to indicate the
    # beginning of the file, or that the start is determined by the last tail
    # lines.
    # The explicit ``= None`` defaults keep behavior the same across pydantic
    # versions: v1 implicitly defaults Optional fields to None, while v2 treats
    # un-defaulted Optional fields as required.
    start_offset: Optional[int] = None
    # End offset in the log file to stream up to. None to indicate the end of
    # the file.
    end_offset: Optional[int] = None
class LogsManager:
    """Resolves log file locations and streams log contents.

    All cluster access goes through the wrapped ``StateDataSourceClient``.
    """

    def __init__(self, data_source_client: StateDataSourceClient):
        self.client = data_source_client

    @property
    def data_source_client(self) -> StateDataSourceClient:
        """The client used to query logs, jobs and tasks."""
        return self.client

    async def ip_to_node_id(self, node_ip: Optional[str]) -> Optional[str]:
        """Resolve the node id in hex from a given node ip.

        Args:
            node_ip: The node ip.

        Returns:
            node_id if there's a node id that matches the given node ip and is
            alive. None otherwise.
        """
        return await self.client.ip_to_node_id(node_ip)

    async def list_logs(
        self, node_id: str, timeout: int, glob_filter: str = "*"
    ) -> Dict[str, List[str]]:
        """Return a list of log files on a given node id filtered by the glob.

        Args:
            node_id: The node id where log files present.
            timeout: The timeout of the API.
            glob_filter: The glob filter to filter out log files.

        Returns:
            Dictionary of {component_name -> list of log files}

        Raises:
            ValueError: If a source is unresponsive.
        """
        reply = await self.client.list_logs(node_id, glob_filter, timeout=timeout)
        return self._categorize_log_files(reply.log_files)

    async def stream_logs(
        self,
        options: GetLogOptions,
        get_actor_fn: Callable[[ActorID], Awaitable[Optional[ActorTableData]]],
    ) -> AsyncIterable[bytes]:
        """Generate a stream of logs in bytes.

        Args:
            options: The option for streaming logs.
            get_actor_fn: Callback used to look up actor data when
                ``options.actor_id`` is set.

        Returns:
            Async generator of streamed logs in bytes.
        """
        node_id = options.node_id
        if node_id is None:
            node_id = await self.ip_to_node_id(options.node_ip)

        res = await self.resolve_filename(
            node_id=node_id,
            log_filename=options.filename,
            actor_id=options.actor_id,
            task_id=options.task_id,
            attempt_number=options.attempt_number,
            pid=options.pid,
            get_actor_fn=get_actor_fn,
            timeout=options.timeout,
            suffix=options.suffix,
            submission_id=options.submission_id,
        )

        keep_alive = options.media_type == "stream"
        stream = await self.client.stream_log(
            node_id=res.node_id,
            log_file_name=res.filename,
            keep_alive=keep_alive,
            lines=options.lines,
            interval=options.interval,
            # If we keepalive logs connection, we shouldn't have timeout
            # otherwise the stream will be terminated forcefully
            # after the deadline is expired.
            timeout=options.timeout if not keep_alive else None,
            start_offset=res.start_offset,
            end_offset=res.end_offset,
        )

        async for streamed_log in stream:
            yield streamed_log.data

    async def _resolve_job_filename(
        self, sub_job_id: str
    ) -> Tuple[Optional[str], Optional[str]]:
        """Return the driver node id and log file name for a submission job.

        Args:
            sub_job_id: The job submission id.

        Returns:
            A ``(node_id, log_filename)`` tuple, or ``(None, None)`` if no job
            with the given submission id exists.

        Raises:
            ValueError: If the matching job has no driver node id.
        """
        job_infos = await self.client.get_job_info(timeout=DEFAULT_RPC_TIMEOUT)
        target_job = next(
            (job for job in job_infos if job.submission_id == sub_job_id), None
        )
        if target_job is None:
            logger.info(f"Submission job ID {sub_job_id} not found.")
            return None, None

        node_id = target_job.driver_node_id
        if node_id is None:
            raise ValueError(
                f"Job {sub_job_id} has no driver node id info. "
                "This is likely a bug. Please file an issue."
            )

        log_filename = JOB_LOGS_PATH_TEMPLATE.format(submission_id=sub_job_id)
        return node_id, log_filename

    async def _resolve_worker_file(
        self,
        node_id_hex: str,
        worker_id_hex: Optional[str],
        pid: Optional[int],
        suffix: str,
        timeout: int,
    ) -> Optional[str]:
        """Resolve a worker log file on a node by worker id or by pid.

        Exactly one of ``worker_id_hex`` and ``pid`` must be provided.

        Args:
            node_id_hex: The node id (hex) to search on.
            worker_id_hex: The worker id (hex) to match, or None.
            pid: The worker process id to match, or None.
            suffix: The log suffix, e.g. "out" or "err".
            timeout: Timeout for listing the node's logs.

        Returns:
            The matching worker log file name, or None if nothing matches.

        Raises:
            ValueError: If both or neither of worker_id_hex and pid are given.
        """
        if worker_id_hex is not None and pid is not None:
            raise ValueError(
                f"Only one of worker id({worker_id_hex}) or pid({pid}) should be "
                "provided."
            )
        if worker_id_hex is None and pid is None:
            raise ValueError("One of worker id or pid should be provided.")

        # The glob is only a coarse pre-filter (e.g. pid 12 also matches files
        # of pid 123); the exact match happens on the parsed filename below.
        search_key = worker_id_hex if worker_id_hex is not None else pid
        log_files = await self.list_logs(
            node_id_hex, timeout, glob_filter=f"*{search_key}*{suffix}"
        )

        # Find matching worker logs.
        for filename in [*log_files["worker_out"], *log_files["worker_err"]]:
            # Worker logs look like worker-[worker_id]-[job_id]-[pid].out
            match = WORKER_LOG_PATTERN.match(filename)
            if match is None:
                # _categorize_log_files only checks that "worker" appears in
                # the name, so files not following the worker naming scheme can
                # land here; skip them instead of crashing on ``None.group``.
                continue
            if worker_id_hex is not None:
                if match.group(1) == worker_id_hex:
                    return filename
            elif int(match.group(3)) == pid:
                return filename
        return None

    async def _resolve_actor_filename(
        self,
        actor_id: ActorID,
        get_actor_fn: Optional[
            Callable[[ActorID], Awaitable[Optional[ActorTableData]]]
        ],
        suffix: str,
        timeout: int,
    ) -> Tuple[str, Optional[str]]:
        """Resolve actor log file.

        Args:
            actor_id: The actor id.
            get_actor_fn: The function to get actor information.
            suffix: The suffix of the log file.
            timeout: Timeout in seconds.

        Returns:
            A ``(node_id_hex, log_filename)`` tuple; ``log_filename`` is None if
            no matching worker log file was found.

        Raises:
            ValueError: If actor data is not found, the actor is not scheduled
                yet, or get_actor_fn is not provided.
        """
        if get_actor_fn is None:
            raise ValueError("get_actor_fn needs to be specified for actor_id")

        actor_data = await get_actor_fn(actor_id)
        if actor_data is None:
            raise ValueError(f"Actor ID {actor_id} not found.")

        # TODO(sang): Only the latest worker id can be obtained from
        # actor information now. That means, if actors are restarted,
        # there's no way for us to get the past worker ids.
        worker_id_binary = actor_data.address.worker_id
        if not worker_id_binary:
            raise ValueError(
                f"Worker ID for Actor ID {actor_id} not found. "
                "Actor is not scheduled yet."
            )
        worker_id = WorkerID(worker_id_binary)

        node_id_binary = actor_data.address.node_id
        if not node_id_binary:
            raise ValueError(
                f"Node ID for Actor ID {actor_id} not found. "
                "Actor is not scheduled yet."
            )
        node_id = NodeID(node_id_binary)

        log_filename = await self._resolve_worker_file(
            node_id_hex=node_id.hex(),
            worker_id_hex=worker_id.hex(),
            pid=None,
            suffix=suffix,
            timeout=timeout,
        )
        return node_id.hex(), log_filename

    async def _resolve_task_filename(
        self, task_id: str, attempt_number: int, suffix: str, timeout: int
    ) -> Tuple[str, str, Optional[int], Optional[int]]:
        """Resolve log file for a task.

        Args:
            task_id: The task id.
            attempt_number: The attempt number.
            suffix: The suffix of the log file, e.g. out or err.
            timeout: Timeout in seconds.

        Returns:
            A ``(node_id, log_filename, start_offset, end_offset)`` tuple, where
            the offsets delimit the task's output in the file (None if unknown).

        Raises:
            FileNotFoundError: If the log file is not found.
            ValueError: If the suffix is not out or err.
        """
        if suffix not in ["out", "err"]:
            raise ValueError(f"Suffix {suffix} is not supported.")

        reply = await self.client.get_all_task_info(
            filters=[("task_id", "=", task_id)], timeout=timeout
        )

        # Check if the task is found.
        if len(reply.events_by_task) == 0:
            raise FileNotFoundError(
                f"Could not find log file for task: {task_id}"
                f" (attempt {attempt_number}) with suffix: {suffix}"
            )

        task_event = next(
            (t for t in reply.events_by_task if t.attempt_number == attempt_number),
            None,
        )
        if task_event is None:
            raise FileNotFoundError(
                "Could not find log file for task attempt: "
                f"{task_id}({attempt_number})"
            )

        # Get the worker id and node id.
        task = protobuf_to_task_state_dict(task_event)
        worker_id = task.get("worker_id", None)
        node_id = task.get("node_id", None)
        log_info = task.get("task_log_info", None)
        actor_id = task.get("actor_id", None)

        if node_id is None:
            raise FileNotFoundError(
                "Could not find log file for task attempt: "
                f"{task_id}({attempt_number}) due to missing node info."
            )

        if log_info is None and actor_id is not None:
            # This is a concurrent actor task. The logs will be interleaved.
            # So we return the log file of the actor instead.
            raise FileNotFoundError(
                f"For actor task, please query actor log for "
                f"actor({actor_id}): e.g. ray logs actor --id {actor_id} . Or "
                "set RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING=1 in actor's runtime env "
                "or when starting the cluster. Recording actor task's log could be "
                "expensive, so Ray turns it off by default."
            )
        elif log_info is None:
            raise FileNotFoundError(
                "Could not find log file for task attempt: "
                f"{task_id}({attempt_number}). "
                f"Worker id = {worker_id}, node id = {node_id}, "
                f"log_info = {log_info}"
            )

        filename_key = "stdout_file" if suffix == "out" else "stderr_file"
        log_filename = log_info.get(filename_key, None)
        if log_filename is None:
            raise FileNotFoundError(
                f"Missing log filename info in {log_info} for task {task_id}, "
                f"attempt {attempt_number}"
            )

        start_offset = log_info.get(f"std{suffix}_start", None)
        end_offset = log_info.get(f"std{suffix}_end", None)

        return node_id, log_filename, start_offset, end_offset

    async def resolve_filename(
        self,
        *,
        node_id: Optional[str] = None,
        log_filename: Optional[str] = None,
        actor_id: Optional[str] = None,
        task_id: Optional[str] = None,
        attempt_number: Optional[int] = None,
        pid: Optional[int] = None,
        get_actor_fn: Optional[
            Callable[[ActorID], Awaitable[Optional[ActorTableData]]]
        ] = None,
        timeout: int = DEFAULT_RPC_TIMEOUT,
        suffix: str = "out",
        submission_id: Optional[str] = None,
    ) -> ResolvedStreamFileInfo:
        """Return the file name given all options.

        Resolution precedence is actor_id, then task_id, then submission_id,
        then pid. If none of these is set, ``node_id`` and ``log_filename`` are
        used as given.

        Args:
            node_id: The node's id from which logs are resolved.
            log_filename: Filename of the log file.
            actor_id: Id of the actor that generates the log file.
            task_id: Id of the task that generates the log file.
            attempt_number: Attempt number of the task given by ``task_id``.
            pid: Id of the worker process that generates the log file.
            get_actor_fn: Callback to get the actor's data by id.
            timeout: Timeout for the gRPC to listing logs on the node
                specified by `node_id`.
            suffix: Log suffix if no `log_filename` is provided, when
                resolving by other ids'. Default to "out".
            submission_id: The submission id for a submission job.

        Returns:
            The resolved node id, file name and optional start/end offsets.

        Raises:
            ValueError: If ``suffix`` is not "out" or "err", if ``pid`` is given
                without ``node_id``, or if no node id could be resolved.
            FileNotFoundError: If no log file could be resolved.
        """
        start_offset = None
        end_offset = None
        if suffix not in ["out", "err"]:
            raise ValueError(f"Suffix {suffix} is not supported. ")

        # TODO(rickyx): We should make sure we do some sort of checking on the log
        # filename
        if actor_id:
            node_id, log_filename = await self._resolve_actor_filename(
                ActorID.from_hex(actor_id), get_actor_fn, suffix, timeout
            )
        elif task_id:
            (
                node_id,
                log_filename,
                start_offset,
                end_offset,
            ) = await self._resolve_task_filename(
                task_id, attempt_number, suffix, timeout
            )
        elif submission_id:
            node_id, log_filename = await self._resolve_job_filename(submission_id)
        elif pid:
            if node_id is None:
                raise ValueError(
                    "Node id needs to be specified for resolving"
                    f" filenames of pid {pid}"
                )
            log_filename = await self._resolve_worker_file(
                node_id_hex=node_id,
                worker_id_hex=None,
                pid=pid,
                suffix=suffix,
                timeout=timeout,
            )

        if log_filename is None:
            raise FileNotFoundError(
                "Could not find a log file. Please make sure the given "
                "option exists in the cluster.\n"
                f"\tnode_id: {node_id}\n"
                f"\tfilename: {log_filename}\n"
                f"\tactor_id: {actor_id}\n"
                f"\ttask_id: {task_id}\n"
                f"\tpid: {pid}\n"
                f"\tsuffix: {suffix}\n"
                f"\tsubmission_id: {submission_id}\n"
                f"\tattempt_number: {attempt_number}\n"
            )
        if node_id is None:
            # ``ResolvedStreamFileInfo.node_id`` is a required str; fail here
            # with a clear message rather than an opaque model validation error.
            raise ValueError(
                f"Could not resolve the node id for log file {log_filename}. "
                "Please specify a node id or a valid node ip."
            )

        res = ResolvedStreamFileInfo(
            node_id=node_id,
            filename=log_filename,
            start_offset=start_offset,
            end_offset=end_offset,
        )
        logger.info(f"Resolved log file: {res}")
        return res

    def _categorize_log_files(self, log_files: List[str]) -> Dict[str, List[str]]:
        """Group log file names by the Ray component that produced them.

        Args:
            log_files: Log file names, typically already glob-filtered by the
                node's log listing.

        Returns:
            Dictionary of {component_name -> list of log files}. Files matching
            no specific component are grouped under "internal".
        """
        result = defaultdict(list)
        for log_file in log_files:
            if "worker" in log_file and (log_file.endswith(".out")):
                result["worker_out"].append(log_file)
            elif "worker" in log_file and (log_file.endswith(".err")):
                result["worker_err"].append(log_file)
            elif "core-worker" in log_file and log_file.endswith(".log"):
                result["core_worker"].append(log_file)
            elif "core-driver" in log_file and log_file.endswith(".log"):
                result["driver"].append(log_file)
            elif "raylet." in log_file:
                result["raylet"].append(log_file)
            elif "gcs_server." in log_file:
                result["gcs_server"].append(log_file)
            elif "log_monitor" in log_file:
                # Checked before "monitor" so log_monitor files are not
                # classified as autoscaler logs.
                result["internal"].append(log_file)
            elif "monitor" in log_file:
                result["autoscaler"].append(log_file)
            elif "agent." in log_file:
                result["agent"].append(log_file)
            elif "dashboard." in log_file:
                result["dashboard"].append(log_file)
            else:
                result["internal"].append(log_file)

        return result
|