event_logger.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. import json
  2. import logging
  3. import os
  4. import pathlib
  5. import random
  6. import socket
  7. import string
  8. import threading
  9. from datetime import datetime
  10. from typing import Dict, Optional
  11. from google.protobuf.json_format import Parse
  12. from ray._private.protobuf_compat import message_to_dict
  13. from ray.core.generated.event_pb2 import Event
  14. global_logger = logging.getLogger(__name__)
  15. def get_event_id():
  16. return "".join([random.choice(string.hexdigits) for _ in range(36)])
  17. class EventLoggerAdapter:
  18. def __init__(self, source: Event.SourceType, logger: logging.Logger):
  19. """Adapter for the Python logger that's used to emit events.
  20. When events are emitted, they are aggregated and available via
  21. state API and dashboard.
  22. This class is thread-safe.
  23. """
  24. self.logger = logger
  25. # Aligned with `event.proto`'s `message Event``
  26. self.source = source
  27. self.source_hostname = socket.gethostname()
  28. self.source_pid = os.getpid()
  29. # The below fields must be protected by this lock.
  30. self.lock = threading.Lock()
  31. # {str -> str} typed dict
  32. self.global_context = {}
  33. def set_global_context(self, global_context: Dict[str, str] = None):
  34. """Set the global metadata.
  35. This method overwrites the global metadata if it is called more than once.
  36. """
  37. with self.lock:
  38. self.global_context = {} if not global_context else global_context
  39. def trace(self, message: str, **kwargs):
  40. self._emit(Event.Severity.TRACE, message, **kwargs)
  41. def debug(self, message: str, **kwargs):
  42. self._emit(Event.Severity.DEBUG, message, **kwargs)
  43. def info(self, message: str, **kwargs):
  44. self._emit(Event.Severity.INFO, message, **kwargs)
  45. def warning(self, message: str, **kwargs):
  46. self._emit(Event.Severity.WARNING, message, **kwargs)
  47. def error(self, message: str, **kwargs):
  48. self._emit(Event.Severity.ERROR, message, **kwargs)
  49. def fatal(self, message: str, **kwargs):
  50. self._emit(Event.Severity.FATAL, message, **kwargs)
  51. def _emit(self, severity: Event.Severity, message: str, **kwargs):
  52. # NOTE: Python logger is thread-safe,
  53. # so we don't need to protect it using locks.
  54. event = Event()
  55. event.event_id = get_event_id()
  56. event.timestamp = int(datetime.now().timestamp())
  57. event.message = message
  58. event.severity = severity
  59. # TODO(sang): Support event type & schema.
  60. event.label = ""
  61. event.source_type = self.source
  62. event.source_hostname = self.source_hostname
  63. event.source_pid = self.source_pid
  64. custom_fields = event.custom_fields
  65. with self.lock:
  66. for k, v in self.global_context.items():
  67. if v is not None and k is not None:
  68. custom_fields[k] = v
  69. for k, v in kwargs.items():
  70. if v is not None and k is not None:
  71. custom_fields[k] = v
  72. self.logger.info(
  73. json.dumps(
  74. message_to_dict(
  75. event,
  76. always_print_fields_with_no_presence=True,
  77. preserving_proto_field_name=True,
  78. use_integers_for_enums=False,
  79. )
  80. )
  81. )
  82. # Force flush so that we won't lose events
  83. self.logger.handlers[0].flush()
  84. def _build_event_file_logger(source: Event.SourceType, sink_dir: str):
  85. logger = logging.getLogger("_ray_event_logger")
  86. logger.setLevel(logging.INFO)
  87. dir_path = pathlib.Path(sink_dir) / "events"
  88. filepath = dir_path / f"event_{source}.log"
  89. dir_path.mkdir(exist_ok=True)
  90. filepath.touch(exist_ok=True)
  91. # Configure the logger.
  92. handler = logging.FileHandler(filepath)
  93. formatter = logging.Formatter("%(message)s")
  94. handler.setFormatter(formatter)
  95. logger.addHandler(handler)
  96. logger.propagate = False
  97. return logger
  98. # This lock must be used when accessing or updating global event logger dict.
  99. _event_logger_lock = threading.Lock()
  100. _event_logger = {}
  101. def get_event_logger(source: Event.SourceType, sink_dir: str):
  102. """Get the event logger of the current process.
  103. There's only 1 event logger per (process, source).
  104. TODO(sang): Support more impl than file-based logging.
  105. Currently, the interface also ties to the
  106. file-based logging impl.
  107. Args:
  108. source: The source of the event.
  109. sink_dir: The directory to sink event logs.
  110. """
  111. with _event_logger_lock:
  112. global _event_logger
  113. source_name = Event.SourceType.Name(source)
  114. if source_name not in _event_logger:
  115. logger = _build_event_file_logger(source_name, sink_dir)
  116. _event_logger[source_name] = EventLoggerAdapter(source, logger)
  117. return _event_logger[source_name]
  118. def parse_event(event_str: str) -> Optional[Event]:
  119. """Parse an event from a string.
  120. Args:
  121. event_str: The string to parse. Expect to be a JSON serialized
  122. Event protobuf.
  123. Returns:
  124. The parsed event if parsable, else None
  125. """
  126. try:
  127. return Parse(event_str, Event())
  128. except Exception:
  129. global_logger.exception(f"Failed to parse event: {event_str}")
  130. return None
  131. def filter_event_by_level(event: Event, filter_event_level: str) -> bool:
  132. """Filter an event based on event level.
  133. Args:
  134. event: The event to filter.
  135. filter_event_level: The event level string to filter by. Any events
  136. that are lower than this level will be filtered.
  137. Returns:
  138. True if the event should be filtered, else False.
  139. """
  140. event_levels = {
  141. Event.Severity.TRACE: 0,
  142. Event.Severity.DEBUG: 1,
  143. Event.Severity.INFO: 2,
  144. Event.Severity.WARNING: 3,
  145. Event.Severity.ERROR: 4,
  146. Event.Severity.FATAL: 5,
  147. }
  148. filter_event_level = filter_event_level.upper()
  149. filter_event_level = Event.Severity.Value(filter_event_level)
  150. if event_levels[event.severity] < event_levels[filter_event_level]:
  151. return True
  152. return False