logservicer.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
"""Serve Ray client log-stream requests by forwarding the server's "ray"
logger output and worker stdout/stderr to the client through handlers.
"""
  4. import io
  5. import logging
  6. import queue
  7. import threading
  8. import uuid
  9. import grpc
  10. import ray.core.generated.ray_client_pb2 as ray_client_pb2
  11. import ray.core.generated.ray_client_pb2_grpc as ray_client_pb2_grpc
  12. from ray._private.ray_logging import global_worker_stdstream_dispatcher
  13. from ray._private.worker import print_worker_logs
  14. from ray.util.client.common import CLIENT_SERVER_MAX_THREADS
# Module-level logger used for this file's own diagnostic messages
# (connection accepted/rejected, log thread/channel closing).
logger = logging.getLogger(__name__)
  16. class LogstreamHandler(logging.Handler):
  17. def __init__(self, queue, level):
  18. super().__init__()
  19. self.queue = queue
  20. self.level = level
  21. def emit(self, record: logging.LogRecord):
  22. logdata = ray_client_pb2.LogData()
  23. logdata.msg = record.getMessage()
  24. logdata.level = record.levelno
  25. logdata.name = record.name
  26. self.queue.put(logdata)
  27. class StdStreamHandler:
  28. def __init__(self, queue):
  29. self.queue = queue
  30. self.id = str(uuid.uuid4())
  31. def handle(self, data):
  32. logdata = ray_client_pb2.LogData()
  33. logdata.level = -2 if data["is_err"] else -1
  34. logdata.name = "stderr" if data["is_err"] else "stdout"
  35. with io.StringIO() as file:
  36. print_worker_logs(data, file)
  37. logdata.msg = file.getvalue()
  38. self.queue.put(logdata)
  39. def register_global(self):
  40. global_worker_stdstream_dispatcher.add_handler(self.id, self.handle)
  41. def unregister_global(self):
  42. global_worker_stdstream_dispatcher.remove_handler(self.id)
  43. def log_status_change_thread(log_queue, request_iterator):
  44. std_handler = StdStreamHandler(log_queue)
  45. current_handler = None
  46. root_logger = logging.getLogger("ray")
  47. default_level = root_logger.getEffectiveLevel()
  48. try:
  49. for req in request_iterator:
  50. if current_handler is not None:
  51. root_logger.setLevel(default_level)
  52. root_logger.removeHandler(current_handler)
  53. std_handler.unregister_global()
  54. if not req.enabled:
  55. current_handler = None
  56. continue
  57. current_handler = LogstreamHandler(log_queue, req.loglevel)
  58. std_handler.register_global()
  59. root_logger.addHandler(current_handler)
  60. root_logger.setLevel(req.loglevel)
  61. except grpc.RpcError as e:
  62. logger.debug(f"closing log thread " f"grpc error reading request_iterator: {e}")
  63. finally:
  64. if current_handler is not None:
  65. root_logger.setLevel(default_level)
  66. root_logger.removeHandler(current_handler)
  67. std_handler.unregister_global()
  68. log_queue.put(None)
  69. class LogstreamServicer(ray_client_pb2_grpc.RayletLogStreamerServicer):
  70. def __init__(self):
  71. super().__init__()
  72. self.num_clients = 0
  73. self.client_lock = threading.Lock()
  74. def Logstream(self, request_iterator, context):
  75. initialized = False
  76. with self.client_lock:
  77. threshold = CLIENT_SERVER_MAX_THREADS / 2
  78. if self.num_clients + 1 >= threshold:
  79. context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
  80. logger.warning(
  81. f"Logstream: Num clients {self.num_clients} has reached "
  82. f"the threshold {threshold}. Rejecting new connection."
  83. )
  84. return
  85. self.num_clients += 1
  86. initialized = True
  87. logger.info(
  88. "New logs connection established. " f"Total clients: {self.num_clients}"
  89. )
  90. log_queue = queue.Queue()
  91. thread = threading.Thread(
  92. target=log_status_change_thread,
  93. args=(log_queue, request_iterator),
  94. daemon=True,
  95. )
  96. thread.start()
  97. try:
  98. queue_iter = iter(log_queue.get, None)
  99. for record in queue_iter:
  100. if record is None:
  101. break
  102. yield record
  103. except grpc.RpcError as e:
  104. logger.debug(f"Closing log channel: {e}")
  105. finally:
  106. thread.join()
  107. with self.client_lock:
  108. if initialized:
  109. self.num_clients -= 1