logsclient.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
"""This file implements a threaded stream controller that returns logs from
the Ray client server.
"""
  4. import logging
  5. import queue
  6. import sys
  7. import threading
  8. import time
  9. from typing import TYPE_CHECKING
  10. import grpc
  11. import ray.core.generated.ray_client_pb2 as ray_client_pb2
  12. import ray.core.generated.ray_client_pb2_grpc as ray_client_pb2_grpc
  13. from ray.util.debug import log_once
  14. if TYPE_CHECKING:
  15. from ray.util.client.worker import Worker
  16. logger = logging.getLogger(__name__)
  17. # TODO(barakmich): Running a logger in a logger causes loopback.
  18. # The client logger need its own root -- possibly this one.
  19. # For the moment, let's just not propogate beyond this point.
  20. logger.propagate = False
  21. class LogstreamClient:
  22. def __init__(self, client_worker: "Worker", metadata: list):
  23. """Initializes a thread-safe log stream over a Ray Client gRPC channel.
  24. Args:
  25. client_worker: The Ray Client worker that manages this client
  26. metadata: metadata to pass to gRPC requests
  27. """
  28. self.client_worker = client_worker
  29. self._metadata = metadata
  30. self.request_queue = queue.Queue()
  31. self.log_thread = self._start_logthread()
  32. self.log_thread.start()
  33. self.last_req = None
  34. def _start_logthread(self) -> threading.Thread:
  35. return threading.Thread(target=self._log_main, args=(), daemon=True)
  36. def _log_main(self) -> None:
  37. reconnecting = False
  38. while not self.client_worker._in_shutdown:
  39. if reconnecting:
  40. # Refresh queue and retry last request
  41. self.request_queue = queue.Queue()
  42. if self.last_req:
  43. self.request_queue.put(self.last_req)
  44. stub = ray_client_pb2_grpc.RayletLogStreamerStub(self.client_worker.channel)
  45. try:
  46. log_stream = stub.Logstream(
  47. iter(self.request_queue.get, None), metadata=self._metadata
  48. )
  49. except ValueError:
  50. # Trying to use the stub on a cancelled channel will raise
  51. # ValueError. This should only happen when the data client
  52. # is attempting to reset the connection -- sleep and try
  53. # again.
  54. time.sleep(0.5)
  55. continue
  56. try:
  57. for record in log_stream:
  58. if record.level < 0:
  59. self.stdstream(level=record.level, msg=record.msg)
  60. self.log(level=record.level, msg=record.msg)
  61. return
  62. except grpc.RpcError as e:
  63. reconnecting = self._process_rpc_error(e)
  64. if not reconnecting:
  65. return
  66. def _process_rpc_error(self, e: grpc.RpcError) -> bool:
  67. """
  68. Processes RPC errors that occur while reading from data stream.
  69. Returns True if the error can be recovered from, False otherwise.
  70. """
  71. if self.client_worker._can_reconnect(e):
  72. if log_once("lost_reconnect_logs"):
  73. logger.warning(
  74. "Log channel is reconnecting. Logs produced while "
  75. "the connection was down can be found on the head "
  76. "node of the cluster in "
  77. "`ray_client_server_[port].out`"
  78. )
  79. logger.debug("Log channel dropped, retrying.")
  80. time.sleep(0.5)
  81. return True
  82. logger.debug("Shutting down log channel.")
  83. if not self.client_worker._in_shutdown:
  84. logger.exception("Unexpected exception:")
  85. return False
  86. def log(self, level: int, msg: str):
  87. """Log the message from the log stream.
  88. By default, calls logger.log but this can be overridden.
  89. Args:
  90. level: The loglevel of the received log message
  91. msg: The content of the message
  92. """
  93. logger.log(level=level, msg=msg)
  94. def stdstream(self, level: int, msg: str):
  95. """Log the stdout/stderr entry from the log stream.
  96. By default, calls print but this can be overridden.
  97. Args:
  98. level: The loglevel of the received log message
  99. msg: The content of the message
  100. """
  101. print_file = sys.stderr if level == -2 else sys.stdout
  102. print(msg, file=print_file, end="")
  103. def set_logstream_level(self, level: int):
  104. logger.setLevel(level)
  105. req = ray_client_pb2.LogSettingsRequest()
  106. req.enabled = True
  107. req.loglevel = level
  108. self.request_queue.put(req)
  109. self.last_req = req
  110. def close(self) -> None:
  111. self.request_queue.put(None)
  112. if self.log_thread is not None:
  113. self.log_thread.join()
  114. def disable_logs(self) -> None:
  115. req = ray_client_pb2.LogSettingsRequest()
  116. req.enabled = False
  117. self.request_queue.put(req)
  118. self.last_req = req