| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- """This file implements a threaded stream controller to return logs back from
- the ray clientserver.
- """
- import logging
- import queue
- import sys
- import threading
- import time
- from typing import TYPE_CHECKING
- import grpc
- import ray.core.generated.ray_client_pb2 as ray_client_pb2
- import ray.core.generated.ray_client_pb2_grpc as ray_client_pb2_grpc
- from ray.util.debug import log_once
- if TYPE_CHECKING:
- from ray.util.client.worker import Worker
- logger = logging.getLogger(__name__)
- # TODO(barakmich): Running a logger in a logger causes loopback.
- # The client logger needs its own root -- possibly this one.
- # For the moment, let's just not propagate beyond this point.
- logger.propagate = False
class LogstreamClient:
    """Threaded client that streams log records from the Ray Client server.

    Constructing an instance starts a daemon thread running ``_log_main``.
    That thread opens a ``Logstream`` RPC fed from ``request_queue`` and hands
    every received record to ``stdstream`` and/or ``log``. Settings requests
    are enqueued by ``set_logstream_level`` and ``disable_logs``; the most
    recent one is kept in ``last_req`` so it can be replayed after a reconnect.
    """

    def __init__(self, client_worker: "Worker", metadata: list):
        """Initializes a thread-safe log stream over a Ray Client gRPC channel.

        Args:
            client_worker: The Ray Client worker that manages this client
            metadata: metadata to pass to gRPC requests
        """
        self.client_worker = client_worker
        self._metadata = metadata
        self.request_queue = queue.Queue()
        # Must be assigned before the thread starts: ``_log_main`` reads it on
        # the reconnect path, and the thread may run before ``__init__``
        # returns.
        self.last_req = None
        self.log_thread = self._start_logthread()
        self.log_thread.start()

    def _start_logthread(self) -> threading.Thread:
        """Create (but do not start) the daemon thread that runs ``_log_main``."""
        return threading.Thread(target=self._log_main, args=(), daemon=True)

    def _log_main(self) -> None:
        """Thread body: stream log records until shutdown or a fatal error.

        Each loop iteration opens one ``Logstream`` call. The loop ends when
        the stream finishes normally, when an RPC error is not recoverable, or
        when the client worker reports that it is shutting down.
        """
        reconnecting = False
        while not self.client_worker._in_shutdown:
            if reconnecting:
                # Refresh queue and retry last request
                self.request_queue = queue.Queue()
                if self.last_req:
                    self.request_queue.put(self.last_req)
            stub = ray_client_pb2_grpc.RayletLogStreamerStub(self.client_worker.channel)
            try:
                # ``iter(get, None)`` yields queued requests until the ``None``
                # sentinel that ``close`` enqueues.
                log_stream = stub.Logstream(
                    iter(self.request_queue.get, None), metadata=self._metadata
                )
            except ValueError:
                # Trying to use the stub on a cancelled channel will raise
                # ValueError. This should only happen when the data client
                # is attempting to reset the connection -- sleep and try
                # again.
                time.sleep(0.5)
                continue
            try:
                for record in log_stream:
                    # Negative levels mark stdout/stderr entries. ``log`` is
                    # still invoked for them afterwards, as for every record.
                    if record.level < 0:
                        self.stdstream(level=record.level, msg=record.msg)
                    self.log(level=record.level, msg=record.msg)
                # The stream ended without an error: nothing left to do.
                return
            except grpc.RpcError as e:
                reconnecting = self._process_rpc_error(e)
                if not reconnecting:
                    return

    def _process_rpc_error(self, e: "grpc.RpcError") -> bool:
        """Processes RPC errors that occur while reading from the log stream.

        Args:
            e: The RPC error raised while iterating the stream

        Returns:
            True if the error can be recovered from, False otherwise.
        """
        if self.client_worker._can_reconnect(e):
            if log_once("lost_reconnect_logs"):
                logger.warning(
                    "Log channel is reconnecting. Logs produced while "
                    "the connection was down can be found on the head "
                    "node of the cluster in "
                    "`ray_client_server_[port].out`"
                )
            logger.debug("Log channel dropped, retrying.")
            # Short pause before the caller attempts to reconnect.
            time.sleep(0.5)
            return True
        logger.debug("Shutting down log channel.")
        if not self.client_worker._in_shutdown:
            # Called from within the caller's ``except`` block, so the active
            # exception is included in the log output.
            logger.exception("Unexpected exception:")
        return False

    def log(self, level: int, msg: str):
        """Log the message from the log stream.

        By default, calls logger.log but this can be overridden.

        Args:
            level: The loglevel of the received log message
            msg: The content of the message
        """
        logger.log(level=level, msg=msg)

    def stdstream(self, level: int, msg: str):
        """Log the stdout/stderr entry from the log stream.

        By default, calls print but this can be overridden.

        Args:
            level: The loglevel of the received log message
            msg: The content of the message
        """
        # Level -2 is routed to stderr; every other level goes to stdout.
        print_file = sys.stderr if level == -2 else sys.stdout
        print(msg, file=print_file, end="")

    def set_logstream_level(self, level: int):
        """Set the local logger level and request log streaming at that level.

        Args:
            level: The loglevel to apply locally and send in the request
        """
        logger.setLevel(level)
        req = ray_client_pb2.LogSettingsRequest()
        req.enabled = True
        req.loglevel = level
        # Record the request before enqueueing it, so that a reconnect which
        # swaps ``request_queue`` in between still replays this request
        # instead of a stale one.
        self.last_req = req
        self.request_queue.put(req)

    def close(self) -> None:
        """Signal the request iterator to stop and wait for the log thread."""
        # ``None`` is the sentinel that ends ``iter(self.request_queue.get, None)``.
        self.request_queue.put(None)
        if self.log_thread is not None:
            self.log_thread.join()

    def disable_logs(self) -> None:
        """Send a settings request with log streaming disabled."""
        req = ray_client_pb2.LogSettingsRequest()
        req.enabled = False
        # Record first, then enqueue; see ``set_logstream_level``.
        self.last_req = req
        self.request_queue.put(req)
|