logging_utils.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550
import builtins
import logging
import logging.handlers
import os
import sys
import traceback
from typing import Any, Optional

import ray
from ray._common.filters import CoreContextFilter
from ray._common.formatters import JSONFormatter, TextFormatter
from ray._common.ray_constants import LOGGING_ROTATE_BACKUP_COUNT, LOGGING_ROTATE_BYTES
from ray.serve._private.common import ServeComponentType
from ray.serve._private.constants import (
    RAY_SERVE_ENABLE_MEMORY_PROFILING,
    RAY_SERVE_LOG_TO_STDERR,
    SERVE_LOG_APPLICATION,
    SERVE_LOG_COMPONENT,
    SERVE_LOG_COMPONENT_ID,
    SERVE_LOG_DEPLOYMENT,
    SERVE_LOG_LEVEL_NAME,
    SERVE_LOG_MESSAGE,
    SERVE_LOG_RECORD_FORMAT,
    SERVE_LOG_REPLICA,
    SERVE_LOG_REQUEST_ID,
    SERVE_LOG_ROUTE,
    SERVE_LOG_TIME,
    SERVE_LOG_UNWANTED_ATTRS,
    SERVE_LOGGER_NAME,
)
from ray.serve._private.utils import get_component_file_name
from ray.serve.schema import EncodingType, LoggingConfig
  31. buildin_print = builtins.print
  32. def should_skip_context_filter(record: logging.LogRecord) -> bool:
  33. """Check if the log record should skip the context filter."""
  34. return getattr(record, "skip_context_filter", False)
  35. class ServeCoreContextFilter(CoreContextFilter):
  36. def filter(self, record: logging.LogRecord) -> bool:
  37. if should_skip_context_filter(record):
  38. return True
  39. return super().filter(record)
  40. class ServeComponentFilter(logging.Filter):
  41. """Serve component filter.
  42. The filter will add the component name, id, and type to the log record.
  43. """
  44. def __init__(
  45. self,
  46. component_name: str,
  47. component_id: str,
  48. component_type: Optional[ServeComponentType] = None,
  49. ):
  50. self.component_name = component_name
  51. self.component_id = component_id
  52. self.component_type = component_type
  53. def filter(self, record: logging.LogRecord) -> bool:
  54. """Add component attributes to the log record.
  55. Note: the filter doesn't do any filtering, it only adds the component
  56. attributes.
  57. """
  58. if should_skip_context_filter(record):
  59. return True
  60. if self.component_type and self.component_type == ServeComponentType.REPLICA:
  61. setattr(record, SERVE_LOG_DEPLOYMENT, self.component_name)
  62. setattr(record, SERVE_LOG_REPLICA, self.component_id)
  63. setattr(record, SERVE_LOG_COMPONENT, self.component_type)
  64. else:
  65. setattr(record, SERVE_LOG_COMPONENT, self.component_name)
  66. setattr(record, SERVE_LOG_COMPONENT_ID, self.component_id)
  67. return True
  68. class ServeContextFilter(logging.Filter):
  69. """Serve context filter.
  70. The filter will add the route, request id, app name to the log record.
  71. Note: the filter doesn't do any filtering, it only adds the serve request context
  72. attributes.
  73. """
  74. def filter(self, record):
  75. if should_skip_context_filter(record):
  76. return True
  77. request_context = ray.serve.context._get_serve_request_context()
  78. if request_context.route:
  79. setattr(record, SERVE_LOG_ROUTE, request_context.route)
  80. if request_context.request_id:
  81. setattr(record, SERVE_LOG_REQUEST_ID, request_context.request_id)
  82. if request_context.app_name:
  83. setattr(record, SERVE_LOG_APPLICATION, request_context.app_name)
  84. return True
  85. class ServeLogAttributeRemovalFilter(logging.Filter):
  86. """Serve log attribute removal filter.
  87. The filter will remove unwanted attributes on the log record so they won't be
  88. included in the structured logs.
  89. Note: the filter doesn't do any filtering, it only removes unwanted attributes.
  90. """
  91. def filter(self, record):
  92. for key in SERVE_LOG_UNWANTED_ATTRS:
  93. if hasattr(record, key):
  94. delattr(record, key)
  95. return True
  96. class ServeFormatter(TextFormatter):
  97. """Serve Logging Formatter
  98. The formatter will generate the log format on the fly based on the field of record.
  99. Optimized to pre-compute format strings and formatters for better performance.
  100. """
  101. COMPONENT_LOG_FMT = f"%({SERVE_LOG_LEVEL_NAME})s %({SERVE_LOG_TIME})s {{{SERVE_LOG_COMPONENT}}} {{{SERVE_LOG_COMPONENT_ID}}} " # noqa:E501
  102. def __init__(
  103. self,
  104. component_name: str,
  105. component_id: str,
  106. fmt: Optional[str] = None,
  107. datefmt: Optional[str] = None,
  108. style: str = "%",
  109. validate: bool = True,
  110. ):
  111. super().__init__(fmt, datefmt, style, validate)
  112. self.component_log_fmt = ServeFormatter.COMPONENT_LOG_FMT.format(
  113. component_name=component_name, component_id=component_id
  114. )
  115. # Pre-compute format strings and formatters for performance
  116. self._precompute_formatters()
  117. def set_additional_log_standard_attrs(self, *args, **kwargs):
  118. super().set_additional_log_standard_attrs(*args, **kwargs)
  119. self._precompute_formatters()
  120. def _precompute_formatters(self):
  121. self.base_formatter = self._create_formatter([])
  122. self.request_formatter = self._create_formatter(
  123. [SERVE_LOG_RECORD_FORMAT[SERVE_LOG_REQUEST_ID]]
  124. )
  125. def _create_formatter(self, initial_attrs: list) -> logging.Formatter:
  126. attrs = initial_attrs.copy()
  127. attrs.extend([f"%({k})s" for k in self.additional_log_standard_attrs])
  128. attrs.append(SERVE_LOG_RECORD_FORMAT[SERVE_LOG_MESSAGE])
  129. format_string = self.component_log_fmt + " ".join(attrs)
  130. return logging.Formatter(format_string)
  131. def format(self, record: logging.LogRecord) -> str:
  132. """Format the log record into the format string.
  133. Args:
  134. record: The log record to be formatted.
  135. Returns:
  136. The formatted log record in string format.
  137. """
  138. # Use pre-computed formatters for better performance
  139. if SERVE_LOG_REQUEST_ID in record.__dict__:
  140. return self.request_formatter.format(record)
  141. else:
  142. return self.base_formatter.format(record)
  143. def access_log_msg(*, method: str, route: str, status: str, latency_ms: float):
  144. """Returns a formatted message for an HTTP or ServeHandle access log."""
  145. return f"{method} {route} {status} {latency_ms:.1f}ms"
  146. def log_to_stderr_filter(record: logging.LogRecord) -> bool:
  147. """Filters log records based on a parameter in the `extra` dictionary."""
  148. if not hasattr(record, "log_to_stderr") or record.log_to_stderr is None:
  149. return True
  150. return record.log_to_stderr
  151. def log_access_log_filter(record: logging.LogRecord) -> bool:
  152. """Filters ray serve access log based on 'serve_access_log' key in `extra` dict."""
  153. if not hasattr(record, "serve_access_log") or record.serve_access_log is None:
  154. return True
  155. return not record.serve_access_log
  156. def get_component_logger_file_path() -> Optional[str]:
  157. """Returns the relative file path for the Serve logger, if it exists.
  158. If a logger was configured through configure_component_logger() for the Serve
  159. component that's calling this function, this returns the location of the log file
  160. relative to the ray logs directory.
  161. """
  162. logger = logging.getLogger(SERVE_LOGGER_NAME)
  163. for handler in logger.handlers:
  164. if isinstance(handler, logging.handlers.MemoryHandler):
  165. absolute_path = handler.target.baseFilename
  166. ray_logs_dir = ray._private.worker._global_node.get_logs_dir_path()
  167. if absolute_path.startswith(ray_logs_dir):
  168. return absolute_path[len(ray_logs_dir) :]
  169. class StreamToLogger(object):
  170. """
  171. Fake file-like stream object that redirects writes to a logger instance.
  172. This comes from https://stackoverflow.com/a/36296215 directly.
  173. """
  174. def __init__(self, logger: logging.Logger, log_level: int, original_object: Any):
  175. self._logger = logger
  176. self._log_level = log_level
  177. self._original_object = original_object
  178. self._linebuf = ""
  179. def __getattr__(self, attr: str) -> Any:
  180. # getting attributes from the original object
  181. return getattr(self._original_object, attr)
  182. @staticmethod
  183. def get_stacklevel() -> int:
  184. """Rewind stack to get the stacklevel for the user code.
  185. Going from the back of the traceback and traverse until it's no longer in
  186. logging_utils.py or site-packages.
  187. """
  188. reverse_traces = traceback.extract_stack()[::-1]
  189. for index, trace in enumerate(reverse_traces):
  190. if (
  191. "logging_utils.py" not in trace.filename
  192. and "site-packages" not in trace.filename
  193. ):
  194. return index
  195. return 1
  196. def write(self, buf: str):
  197. temp_linebuf = self._linebuf + buf
  198. self._linebuf = ""
  199. for line in temp_linebuf.splitlines(True):
  200. # From the io.TextIOWrapper docs:
  201. # On output, if newline is None, any '\n' characters written
  202. # are translated to the system default line separator.
  203. # By default sys.stdout.write() expects '\n' newlines and then
  204. # translates them so this is still cross-platform.
  205. if line[-1] == "\n":
  206. self._logger.log(
  207. self._log_level,
  208. line.rstrip(),
  209. stacklevel=self.get_stacklevel(),
  210. )
  211. else:
  212. self._linebuf += line
  213. def flush(self):
  214. if self._linebuf != "":
  215. self._logger.log(
  216. self._log_level,
  217. self._linebuf.rstrip(),
  218. stacklevel=self.get_stacklevel(),
  219. )
  220. self._linebuf = ""
  221. def isatty(self) -> bool:
  222. return True
  223. def redirected_print(*objects, sep=" ", end="\n", file=None, flush=False):
  224. """Implement python's print function to redirect logs to Serve's logger.
  225. If the file is set to anything other than stdout, stderr, or None, call the
  226. builtin print. Else, construct the message and redirect to Serve's logger.
  227. See https://docs.python.org/3/library/functions.html#print
  228. """
  229. if file not in [sys.stdout, sys.stderr, None]:
  230. return buildin_print(objects, sep=sep, end=end, file=file, flush=flush)
  231. serve_logger = logging.getLogger(SERVE_LOGGER_NAME)
  232. message = sep.join(map(str, objects)) + end
  233. # We monkey patched print function, so this is always at stack level 2.
  234. serve_logger.log(logging.INFO, message, stacklevel=2)
  235. def configure_component_logger(
  236. *,
  237. component_name: str,
  238. component_id: str,
  239. logging_config: LoggingConfig,
  240. component_type: Optional[ServeComponentType] = None,
  241. max_bytes: Optional[int] = None,
  242. backup_count: Optional[int] = None,
  243. stream_handler_only: bool = False,
  244. buffer_size: int = 1,
  245. ):
  246. """Configure a logger to be used by a Serve component.
  247. The logger will log using a standard format to make components identifiable
  248. using the provided name and unique ID for this instance (e.g., replica ID).
  249. This logger will *not* propagate its log messages to the parent logger(s).
  250. """
  251. logger = logging.getLogger(SERVE_LOGGER_NAME)
  252. logger.propagate = False
  253. logger.setLevel(logging_config.log_level)
  254. logger.handlers.clear()
  255. serve_formatter = ServeFormatter(component_name, component_id)
  256. json_formatter = JSONFormatter()
  257. if logging_config.additional_log_standard_attrs:
  258. json_formatter.set_additional_log_standard_attrs(
  259. logging_config.additional_log_standard_attrs
  260. )
  261. serve_formatter.set_additional_log_standard_attrs(
  262. logging_config.additional_log_standard_attrs
  263. )
  264. # Only add stream handler if RAY_SERVE_LOG_TO_STDERR is True or if
  265. # `stream_handler_only` is set to True.
  266. if RAY_SERVE_LOG_TO_STDERR or stream_handler_only:
  267. stream_handler = logging.StreamHandler()
  268. stream_handler.setFormatter(serve_formatter)
  269. stream_handler.addFilter(log_to_stderr_filter)
  270. stream_handler.addFilter(ServeContextFilter())
  271. logger.addHandler(stream_handler)
  272. # Skip setting up file handler and stdout/stderr redirect if `stream_handler_only`
  273. # is set to True. Logger such as default serve logger can be configured outside the
  274. # context of a Serve component, we don't want those logs to redirect into serve's
  275. # logger and log files.
  276. if stream_handler_only:
  277. return
  278. if logging_config.logs_dir:
  279. logs_dir = logging_config.logs_dir
  280. else:
  281. logs_dir = get_serve_logs_dir()
  282. os.makedirs(logs_dir, exist_ok=True)
  283. if max_bytes is None:
  284. max_bytes = ray._private.worker._global_node.max_bytes
  285. if backup_count is None:
  286. backup_count = ray._private.worker._global_node.backup_count
  287. log_file_name = get_component_file_name(
  288. component_name=component_name,
  289. component_id=component_id,
  290. component_type=component_type,
  291. suffix=".log",
  292. )
  293. file_handler = logging.handlers.RotatingFileHandler(
  294. os.path.join(logs_dir, log_file_name),
  295. maxBytes=max_bytes,
  296. backupCount=backup_count,
  297. )
  298. # Create a memory handler that buffers log records and flushes to file handler
  299. # Buffer capacity: buffer_size records
  300. # Flush triggers: buffer full, ERROR messages, or explicit flush
  301. memory_handler = logging.handlers.MemoryHandler(
  302. capacity=buffer_size,
  303. target=file_handler,
  304. flushLevel=logging.ERROR, # Auto-flush on ERROR/CRITICAL
  305. )
  306. # Add filters directly to the memory handler effective for both buffered and non buffered cases
  307. if logging_config.encoding == EncodingType.JSON:
  308. memory_handler.addFilter(ServeCoreContextFilter())
  309. memory_handler.addFilter(ServeContextFilter())
  310. memory_handler.addFilter(
  311. ServeComponentFilter(component_name, component_id, component_type)
  312. )
  313. file_handler.setFormatter(json_formatter)
  314. else:
  315. file_handler.setFormatter(serve_formatter)
  316. if logging_config.enable_access_log is False:
  317. memory_handler.addFilter(log_access_log_filter)
  318. else:
  319. memory_handler.addFilter(ServeContextFilter())
  320. # Remove unwanted attributes from the log record.
  321. memory_handler.addFilter(ServeLogAttributeRemovalFilter())
  322. # Redirect print, stdout, and stderr to Serve logger, only when it's on the replica.
  323. if not RAY_SERVE_LOG_TO_STDERR and component_type == ServeComponentType.REPLICA:
  324. builtins.print = redirected_print
  325. sys.stdout = StreamToLogger(logger, logging.INFO, sys.stdout)
  326. sys.stderr = StreamToLogger(logger, logging.INFO, sys.stderr)
  327. # Add the memory handler instead of the file handler directly
  328. logger.addHandler(memory_handler)
  329. # Configure a dedicated rotating file logger for autoscaling snapshots.
  330. def configure_autoscaling_snapshot_logger(
  331. *, component_id: str, logging_config: LoggingConfig
  332. ) -> logging.Logger:
  333. """Configure a dedicated logger for autoscaling snapshots.
  334. - Writes to `autoscaling_snapshot_<pid>.log` under the Serve logs dir.
  335. """
  336. logger_obj = logging.getLogger(f"{SERVE_LOGGER_NAME}.snapshot")
  337. logger_obj.propagate = False
  338. logger_obj.setLevel(logging_config.log_level)
  339. logger_obj.handlers.clear()
  340. logs_dir = logging_config.logs_dir or get_serve_logs_dir()
  341. os.makedirs(logs_dir, exist_ok=True)
  342. max_bytes = ray._private.worker._global_node.max_bytes
  343. backup_count = ray._private.worker._global_node.backup_count
  344. file_name = get_component_file_name(
  345. component_name="autoscaling_snapshot",
  346. component_id=component_id,
  347. component_type=None,
  348. suffix=".log",
  349. )
  350. file_path = os.path.join(logs_dir, file_name)
  351. handler = logging.handlers.RotatingFileHandler(
  352. file_path, maxBytes=max_bytes, backupCount=backup_count
  353. )
  354. handler.setFormatter(JSONFormatter())
  355. logger_obj.addHandler(handler)
  356. return logger_obj
  357. def configure_default_serve_logger():
  358. """Helper function to configure the default Serve logger that's used outside of
  359. individual Serve components."""
  360. configure_component_logger(
  361. component_name="serve",
  362. component_id=str(os.getpid()),
  363. logging_config=LoggingConfig(),
  364. max_bytes=LOGGING_ROTATE_BYTES,
  365. backup_count=LOGGING_ROTATE_BACKUP_COUNT,
  366. stream_handler_only=True,
  367. )
  368. def configure_component_memory_profiler(
  369. component_name: str,
  370. component_id: str,
  371. component_type: Optional[ServeComponentType] = None,
  372. ):
  373. """Configures the memory logger for this component.
  374. Does nothing if RAY_SERVE_ENABLE_MEMORY_PROFILING is disabled.
  375. """
  376. if RAY_SERVE_ENABLE_MEMORY_PROFILING:
  377. logger = logging.getLogger(SERVE_LOGGER_NAME)
  378. try:
  379. import memray
  380. logs_dir = get_serve_logs_dir()
  381. memray_file_name = get_component_file_name(
  382. component_name=component_name,
  383. component_id=component_id,
  384. component_type=component_type,
  385. suffix="_memray_0.bin",
  386. )
  387. memray_file_path = os.path.join(logs_dir, memray_file_name)
  388. # If the actor restarted, memray requires a new file to start
  389. # tracking memory.
  390. restart_counter = 1
  391. while os.path.exists(memray_file_path):
  392. memray_file_name = get_component_file_name(
  393. component_name=component_name,
  394. component_id=component_id,
  395. component_type=component_type,
  396. suffix=f"_memray_{restart_counter}.bin",
  397. )
  398. memray_file_path = os.path.join(logs_dir, memray_file_name)
  399. restart_counter += 1
  400. # Memray usually tracks the memory usage of only a block of code
  401. # within a context manager. We explicitly call __enter__ here
  402. # instead of using a context manager to track memory usage across
  403. # all of the caller's code instead.
  404. memray.Tracker(memray_file_path, native_traces=True).__enter__()
  405. logger.info(
  406. "RAY_SERVE_ENABLE_MEMORY_PROFILING is enabled. Started a "
  407. "memray tracker on this actor. Tracker file located at "
  408. f'"{memray_file_path}"'
  409. )
  410. except ImportError:
  411. logger.warning(
  412. "RAY_SERVE_ENABLE_MEMORY_PROFILING is enabled, but memray "
  413. "is not installed. No memory profiling is happening. "
  414. "`pip install memray` to enable memory profiling."
  415. )
  416. def get_serve_logs_dir() -> str:
  417. """Get the directory that stores Serve log files.
  418. If `ray._private.worker._global_node` is None (running outside the context of Ray),
  419. then the current working directory with subdirectory of serve is used as the logs
  420. directory. Otherwise, the logs directory is determined by the global node's logs
  421. directory path.
  422. """
  423. if ray._private.worker._global_node is None:
  424. return os.path.join(os.getcwd(), "serve")
  425. return os.path.join(ray._private.worker._global_node.get_logs_dir_path(), "serve")
  426. class LoggingContext:
  427. """
  428. Context manager to manage logging behaviors within a particular block, such as:
  429. 1) Overriding logging level
  430. Source (python3 official documentation)
  431. https://docs.python.org/3/howto/logging-cookbook.html#using-a-context-manager-for-selective-logging # noqa: E501
  432. """
  433. def __init__(self, logger, level=None):
  434. self.logger = logger
  435. self.level = level
  436. def __enter__(self):
  437. if self.level is not None:
  438. self.old_level = self.logger.level
  439. self.logger.setLevel(self.level)
  440. def __exit__(self, et, ev, tb):
  441. if self.level is not None:
  442. self.logger.setLevel(self.old_level)