| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550 |
import builtins
import logging
import logging.handlers
import os
import sys
import traceback
from typing import Any, Optional

import ray
from ray._common.filters import CoreContextFilter
from ray._common.formatters import JSONFormatter, TextFormatter
from ray._common.ray_constants import LOGGING_ROTATE_BACKUP_COUNT, LOGGING_ROTATE_BYTES
from ray.serve._private.common import ServeComponentType
from ray.serve._private.constants import (
    RAY_SERVE_ENABLE_MEMORY_PROFILING,
    RAY_SERVE_LOG_TO_STDERR,
    SERVE_LOG_APPLICATION,
    SERVE_LOG_COMPONENT,
    SERVE_LOG_COMPONENT_ID,
    SERVE_LOG_DEPLOYMENT,
    SERVE_LOG_LEVEL_NAME,
    SERVE_LOG_MESSAGE,
    SERVE_LOG_RECORD_FORMAT,
    SERVE_LOG_REPLICA,
    SERVE_LOG_REQUEST_ID,
    SERVE_LOG_ROUTE,
    SERVE_LOG_TIME,
    SERVE_LOG_UNWANTED_ATTRS,
    SERVE_LOGGER_NAME,
)
from ray.serve._private.utils import get_component_file_name
from ray.serve.schema import EncodingType, LoggingConfig
# Keep a reference to the real built-in ``print``. ``configure_component_logger``
# may replace ``builtins.print`` with ``redirected_print``, which still needs the
# original function for output that is not aimed at stdout/stderr.
buildin_print = builtins.print
def should_skip_context_filter(record: logging.LogRecord) -> bool:
    """Report whether ``record`` asked to bypass the Serve context filters.

    Args:
        record: The log record to inspect.

    Returns:
        The value of the record's ``skip_context_filter`` attribute, or
        ``False`` when the record has no such attribute.
    """
    try:
        return record.skip_context_filter
    except AttributeError:
        return False
class ServeCoreContextFilter(CoreContextFilter):
    """``CoreContextFilter`` that leaves opted-out records untouched.

    Records flagged via ``should_skip_context_filter`` are accepted as-is;
    every other record is handed to the parent ``CoreContextFilter``.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        bypass = should_skip_context_filter(record)
        return True if bypass else super().filter(record)
class ServeComponentFilter(logging.Filter):
    """Serve component filter.

    The filter will add the component name, id, and type to the log record.
    """

    def __init__(
        self,
        component_name: str,
        component_id: str,
        component_type: Optional[ServeComponentType] = None,
    ):
        """Remember the component identity to stamp onto log records.

        Args:
            component_name: Name of the component emitting the logs.
            component_id: Unique id of this component instance.
            component_type: Optional component type; replicas get dedicated
                record attributes (see ``filter``).
        """
        # Initialize the logging.Filter base class so its attributes exist.
        super().__init__()
        self.component_name = component_name
        self.component_id = component_id
        self.component_type = component_type

    def filter(self, record: logging.LogRecord) -> bool:
        """Add component attributes to the log record.

        Note: the filter doesn't do any filtering, it only adds the component
        attributes.

        Args:
            record: The log record to annotate in place.

        Returns:
            Always ``True``.
        """
        if should_skip_context_filter(record):
            return True

        if self.component_type and self.component_type == ServeComponentType.REPLICA:
            # Replicas are reported as deployment / replica id / component type.
            setattr(record, SERVE_LOG_DEPLOYMENT, self.component_name)
            setattr(record, SERVE_LOG_REPLICA, self.component_id)
            setattr(record, SERVE_LOG_COMPONENT, self.component_type)
        else:
            # Every other component is reported as component name / component id.
            setattr(record, SERVE_LOG_COMPONENT, self.component_name)
            setattr(record, SERVE_LOG_COMPONENT_ID, self.component_id)

        return True
class ServeContextFilter(logging.Filter):
    """Serve context filter.

    Copies the route, request id and application name of the current Serve
    request context onto the log record.

    Note: this filter never rejects a record; it only annotates it.
    """

    def filter(self, record):
        if should_skip_context_filter(record):
            return True

        ctx = ray.serve.context._get_serve_request_context()
        # (record attribute name, request-context field) pairs; only truthy
        # context values are copied onto the record.
        field_map = (
            (SERVE_LOG_ROUTE, "route"),
            (SERVE_LOG_REQUEST_ID, "request_id"),
            (SERVE_LOG_APPLICATION, "app_name"),
        )
        for record_attr, ctx_field in field_map:
            value = getattr(ctx, ctx_field)
            if value:
                setattr(record, record_attr, value)
        return True
class ServeLogAttributeRemovalFilter(logging.Filter):
    """Serve log attribute removal filter.

    Strips every attribute named in ``SERVE_LOG_UNWANTED_ATTRS`` from the log
    record so it does not show up in the structured logs.

    Note: this filter never rejects a record; it only removes attributes.
    """

    def filter(self, record):
        for attr_name in SERVE_LOG_UNWANTED_ATTRS:
            if not hasattr(record, attr_name):
                continue
            delattr(record, attr_name)
        return True
class ServeFormatter(TextFormatter):
    """Serve Logging Formatter

    Builds two ``logging.Formatter`` objects up front -- one for records that
    carry a request id and one for records that do not -- and picks between
    them for every record instead of assembling a format string per call.
    """

    COMPONENT_LOG_FMT = f"%({SERVE_LOG_LEVEL_NAME})s %({SERVE_LOG_TIME})s {{{SERVE_LOG_COMPONENT}}} {{{SERVE_LOG_COMPONENT_ID}}} "  # noqa:E501

    def __init__(
        self,
        component_name: str,
        component_id: str,
        fmt: Optional[str] = None,
        datefmt: Optional[str] = None,
        style: str = "%",
        validate: bool = True,
    ):
        super().__init__(fmt, datefmt, style, validate)
        # Bake the component name/id into the prefix shared by both formatters.
        prefix_template = ServeFormatter.COMPONENT_LOG_FMT
        self.component_log_fmt = prefix_template.format(
            component_name=component_name, component_id=component_id
        )
        self._precompute_formatters()

    def set_additional_log_standard_attrs(self, *args, **kwargs):
        """Update the extra standard attributes and rebuild both formatters."""
        super().set_additional_log_standard_attrs(*args, **kwargs)
        self._precompute_formatters()

    def _precompute_formatters(self):
        """(Re)build the formatters used with and without a request id."""
        request_id_field = SERVE_LOG_RECORD_FORMAT[SERVE_LOG_REQUEST_ID]
        self.base_formatter = self._create_formatter([])
        self.request_formatter = self._create_formatter([request_id_field])

    def _create_formatter(self, initial_attrs: list) -> logging.Formatter:
        """Build a formatter: prefix + initial attrs + extra attrs + message."""
        extra_fields = [f"%({name})s" for name in self.additional_log_standard_attrs]
        message_field = SERVE_LOG_RECORD_FORMAT[SERVE_LOG_MESSAGE]
        pieces = [*initial_attrs, *extra_fields, message_field]
        return logging.Formatter(self.component_log_fmt + " ".join(pieces))

    def format(self, record: logging.LogRecord) -> str:
        """Format the log record with the matching pre-built formatter.

        Args:
            record: The log record to be formatted.

        Returns:
            The formatted log record in string format.
        """
        has_request_id = SERVE_LOG_REQUEST_ID in record.__dict__
        chosen = self.request_formatter if has_request_id else self.base_formatter
        return chosen.format(record)
def access_log_msg(*, method: str, route: str, status: str, latency_ms: float):
    """Build the access-log line for an HTTP or ServeHandle request.

    Args:
        method: Request method.
        route: Request route.
        status: Response status.
        latency_ms: Request latency in milliseconds; rendered with one decimal.

    Returns:
        A string of the form ``"<method> <route> <status> <latency>ms"``.
    """
    template = "{} {} {} {:.1f}ms"
    return template.format(method, route, status, latency_ms)
def log_to_stderr_filter(record: logging.LogRecord) -> bool:
    """Decide whether a record passes, based on its ``log_to_stderr`` attribute.

    Args:
        record: The log record to inspect.

    Returns:
        ``True`` when the attribute is missing or ``None``; otherwise the
        attribute's own value.
    """
    flag = getattr(record, "log_to_stderr", None)
    return True if flag is None else flag
def log_access_log_filter(record: logging.LogRecord) -> bool:
    """Reject records that are marked as Serve access logs.

    Args:
        record: The log record to inspect.

    Returns:
        ``True`` when the ``serve_access_log`` attribute is missing or ``None``;
        otherwise the negation of that attribute.
    """
    is_access_log = getattr(record, "serve_access_log", None)
    if is_access_log is None:
        return True
    return not is_access_log
def get_component_logger_file_path() -> Optional[str]:
    """Returns the relative file path for the Serve logger, if it exists.

    If a logger was configured through configure_component_logger() for the Serve
    component that's calling this function, this returns the location of the log file
    relative to the ray logs directory.

    Returns:
        The log file path with the ray logs directory prefix stripped (whatever
        follows the prefix, including a leading separator, is kept), or ``None``
        when no file-backed memory handler under the ray logs directory is
        attached to the Serve logger.
    """
    logger = logging.getLogger(SERVE_LOGGER_NAME)
    for handler in logger.handlers:
        if not isinstance(handler, logging.handlers.MemoryHandler):
            continue

        # A MemoryHandler has no target until one is set and loses it again on
        # close(); a target that is not file-backed has no ``baseFilename``.
        # Neither case has a log file to report.
        absolute_path = getattr(handler.target, "baseFilename", None)
        if absolute_path is None:
            continue

        ray_logs_dir = ray._private.worker._global_node.get_logs_dir_path()
        if absolute_path.startswith(ray_logs_dir):
            return absolute_path[len(ray_logs_dir) :]

    return None
class StreamToLogger(object):
    """
    Fake file-like stream object that redirects writes to a logger instance.

    Adapted from https://stackoverflow.com/a/36296215.

    Text is buffered until a ``"\\n"`` arrives; each completed line is logged as
    one record with trailing whitespace stripped. Attributes that this class does
    not define are looked up on the wrapped original stream.
    """

    def __init__(self, logger: logging.Logger, log_level: int, original_object: Any):
        """
        Args:
            logger: Logger that receives the written lines.
            log_level: Level used for every emitted record.
            original_object: The stream being replaced; used for attribute
                fallback in ``__getattr__``.
        """
        self._logger = logger
        self._log_level = log_level
        self._original_object = original_object
        self._linebuf = ""

    def __getattr__(self, attr: str) -> Any:
        # Only called when normal lookup fails. If ``_original_object`` itself is
        # missing (instance not initialized yet), fail instead of recursing.
        if attr == "_original_object":
            raise AttributeError(attr)
        # getting attributes from the original object
        return getattr(self._original_object, attr)

    @staticmethod
    def get_stacklevel() -> int:
        """Rewind stack to get the stacklevel for the user code.

        Going from the back of the traceback and traverse until it's no longer in
        logging_utils.py or site-packages.

        Returns:
            The index of the first such frame counted from this function's own
            frame, or 1 if every frame matches.
        """
        reverse_traces = traceback.extract_stack()[::-1]
        for index, trace in enumerate(reverse_traces):
            if (
                "logging_utils.py" not in trace.filename
                and "site-packages" not in trace.filename
            ):
                return index
        return 1

    def write(self, buf: str) -> int:
        """Buffer ``buf`` and log every line completed by a ``"\\n"``.

        Only ``"\\n"`` ends a line. Other separators such as a lone ``"\\r"`` stay
        inside the line, so text is always emitted in the order it was written.

        Args:
            buf: Text to write.

        Returns:
            The number of characters accepted, i.e. ``len(buf)``, matching the
            contract of text-stream ``write``.
        """
        pending = self._linebuf + buf
        # Everything before the last "\n" is complete; the rest waits for more.
        *complete_lines, self._linebuf = pending.split("\n")
        if complete_lines:
            # The call stack is the same for every line of this write, so walk
            # it only once.
            stacklevel = self.get_stacklevel()
            for line in complete_lines:
                self._logger.log(
                    self._log_level,
                    line.rstrip(),
                    stacklevel=stacklevel,
                )
        return len(buf)

    def flush(self):
        """Log any buffered partial line and clear the buffer."""
        if self._linebuf != "":
            self._logger.log(
                self._log_level,
                self._linebuf.rstrip(),
                stacklevel=self.get_stacklevel(),
            )
        self._linebuf = ""

    def isatty(self) -> bool:
        """Always report this stream as a TTY."""
        return True
def redirected_print(*objects, sep=" ", end="\n", file=None, flush=False):
    """Implement python's print function to redirect logs to Serve's logger.

    If the file is set to anything other than stdout, stderr, or None, call the
    builtin print. Else, construct the message and redirect to Serve's logger.

    See https://docs.python.org/3/library/functions.html#print

    Args:
        *objects: Values to print; each is converted with ``str``.
        sep: Separator placed between objects. ``None`` means the default ``" "``,
            as with the built-in ``print``.
        end: Text appended after the last object. ``None`` means the default
            ``"\\n"``, as with the built-in ``print``.
        file: Target stream. Anything other than stdout/stderr/None is handed to
            the built-in ``print`` unchanged.
        flush: Forwarded to the built-in ``print``; unused when the message goes
            to the logger.
    """
    if file not in [sys.stdout, sys.stderr, None]:
        # Unpack the objects so the built-in print receives them as separate
        # positional arguments rather than as a single tuple.
        return buildin_print(*objects, sep=sep, end=end, file=file, flush=flush)

    # The built-in print accepts None for sep/end and falls back to its defaults.
    sep = " " if sep is None else sep
    end = "\n" if end is None else end

    serve_logger = logging.getLogger(SERVE_LOGGER_NAME)
    message = sep.join(map(str, objects)) + end
    # We monkey patched print function, so this is always at stack level 2.
    serve_logger.log(logging.INFO, message, stacklevel=2)
def _close_replaced_handlers(logger: logging.Logger) -> None:
    """Detach every handler from ``logger``, flushing and closing each one."""
    for stale_handler in list(logger.handlers):
        logger.removeHandler(stale_handler)
        # MemoryHandler.close() flushes its buffer into the target but does not
        # close the target, so grab the target first and close it explicitly.
        wrapped_target = getattr(stale_handler, "target", None)
        stale_handler.close()
        if wrapped_target is not None:
            wrapped_target.close()


def configure_component_logger(
    *,
    component_name: str,
    component_id: str,
    logging_config: LoggingConfig,
    component_type: Optional[ServeComponentType] = None,
    max_bytes: Optional[int] = None,
    backup_count: Optional[int] = None,
    stream_handler_only: bool = False,
    buffer_size: int = 1,
):
    """Configure a logger to be used by a Serve component.

    The logger will log using a standard format to make components identifiable
    using the provided name and unique ID for this instance (e.g., replica ID).

    This logger will *not* propagate its log messages to the parent logger(s).

    Handlers already attached to the Serve logger are flushed and closed before
    the new ones are installed, so reconfiguring neither drops buffered records
    nor leaves the previous log file open.

    Args:
        component_name: Name of the component; used in the log format and the
            log file name.
        component_id: Unique id of this component instance.
        logging_config: Supplies the log level, encoding, optional logs
            directory, access-log switch and additional standard attributes.
        component_type: Optional component type. Replicas additionally get
            ``print``/stdout/stderr redirected into the logger unless
            ``RAY_SERVE_LOG_TO_STDERR`` is set.
        max_bytes: Rotation size for the log file; defaults to the global
            node's ``max_bytes``.
        backup_count: Number of rotated files kept; defaults to the global
            node's ``backup_count``.
        stream_handler_only: If True, only a stream handler is installed and no
            file handler or stdout/stderr redirection is set up.
        buffer_size: Capacity (in records) of the memory handler that buffers
            in front of the file handler.
    """
    logger = logging.getLogger(SERVE_LOGGER_NAME)
    logger.propagate = False
    logger.setLevel(logging_config.log_level)
    _close_replaced_handlers(logger)

    serve_formatter = ServeFormatter(component_name, component_id)
    json_formatter = JSONFormatter()
    if logging_config.additional_log_standard_attrs:
        json_formatter.set_additional_log_standard_attrs(
            logging_config.additional_log_standard_attrs
        )
        serve_formatter.set_additional_log_standard_attrs(
            logging_config.additional_log_standard_attrs
        )

    # Only add stream handler if RAY_SERVE_LOG_TO_STDERR is True or if
    # `stream_handler_only` is set to True.
    if RAY_SERVE_LOG_TO_STDERR or stream_handler_only:
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(serve_formatter)
        stream_handler.addFilter(log_to_stderr_filter)
        stream_handler.addFilter(ServeContextFilter())
        logger.addHandler(stream_handler)

    # Skip setting up file handler and stdout/stderr redirect if `stream_handler_only`
    # is set to True. Logger such as default serve logger can be configured outside the
    # context of a Serve component, we don't want those logs to redirect into serve's
    # logger and log files.
    if stream_handler_only:
        return

    if logging_config.logs_dir:
        logs_dir = logging_config.logs_dir
    else:
        logs_dir = get_serve_logs_dir()
    os.makedirs(logs_dir, exist_ok=True)

    if max_bytes is None:
        max_bytes = ray._private.worker._global_node.max_bytes
    if backup_count is None:
        backup_count = ray._private.worker._global_node.backup_count

    log_file_name = get_component_file_name(
        component_name=component_name,
        component_id=component_id,
        component_type=component_type,
        suffix=".log",
    )

    file_handler = logging.handlers.RotatingFileHandler(
        os.path.join(logs_dir, log_file_name),
        maxBytes=max_bytes,
        backupCount=backup_count,
    )
    # Create a memory handler that buffers log records and flushes to file handler
    # Buffer capacity: buffer_size records
    # Flush triggers: buffer full, ERROR messages, or explicit flush
    memory_handler = logging.handlers.MemoryHandler(
        capacity=buffer_size,
        target=file_handler,
        flushLevel=logging.ERROR,  # Auto-flush on ERROR/CRITICAL
    )

    # Filters go on the memory handler so they run before a record is buffered.
    if logging_config.encoding == EncodingType.JSON:
        memory_handler.addFilter(ServeCoreContextFilter())
        memory_handler.addFilter(ServeContextFilter())
        memory_handler.addFilter(
            ServeComponentFilter(component_name, component_id, component_type)
        )
        file_handler.setFormatter(json_formatter)
    else:
        file_handler.setFormatter(serve_formatter)

    if logging_config.enable_access_log is False:
        memory_handler.addFilter(log_access_log_filter)
    else:
        memory_handler.addFilter(ServeContextFilter())

    # Remove unwanted attributes from the log record.
    memory_handler.addFilter(ServeLogAttributeRemovalFilter())

    # Redirect print, stdout, and stderr to Serve logger, only when it's on the replica.
    if not RAY_SERVE_LOG_TO_STDERR and component_type == ServeComponentType.REPLICA:
        builtins.print = redirected_print
        # Do not wrap a stream that is already redirected; repeated calls would
        # otherwise stack StreamToLogger wrappers on top of each other.
        if not isinstance(sys.stdout, StreamToLogger):
            sys.stdout = StreamToLogger(logger, logging.INFO, sys.stdout)
        if not isinstance(sys.stderr, StreamToLogger):
            sys.stderr = StreamToLogger(logger, logging.INFO, sys.stderr)

    # Add the memory handler instead of the file handler directly
    logger.addHandler(memory_handler)
# Configure a dedicated rotating file logger for autoscaling snapshots.
def configure_autoscaling_snapshot_logger(
    *, component_id: str, logging_config: LoggingConfig
) -> logging.Logger:
    """Configure a dedicated logger for autoscaling snapshots.

    The logger is named ``f"{SERVE_LOGGER_NAME}.snapshot"``, does not propagate
    to its parents, and writes JSON-formatted records to a rotating file. The
    file name is built by ``get_component_file_name`` from the component name
    ``"autoscaling_snapshot"`` and ``component_id``, and the file lives in
    ``logging_config.logs_dir`` or, if that is unset, the Serve logs dir.

    Handlers from a previous configuration are closed before the new one is
    installed so the old log file is not left open.

    Args:
        component_id: Id used when building the log file name.
        logging_config: Supplies the log level and the optional logs directory.

    Returns:
        The configured snapshot logger.
    """
    logger_obj = logging.getLogger(f"{SERVE_LOGGER_NAME}.snapshot")
    logger_obj.propagate = False
    logger_obj.setLevel(logging_config.log_level)
    # Close replaced handlers instead of merely dropping them.
    for stale_handler in list(logger_obj.handlers):
        logger_obj.removeHandler(stale_handler)
        stale_handler.close()

    logs_dir = logging_config.logs_dir or get_serve_logs_dir()
    os.makedirs(logs_dir, exist_ok=True)

    # Rotation limits come from the global Ray node.
    max_bytes = ray._private.worker._global_node.max_bytes
    backup_count = ray._private.worker._global_node.backup_count

    file_name = get_component_file_name(
        component_name="autoscaling_snapshot",
        component_id=component_id,
        component_type=None,
        suffix=".log",
    )
    file_path = os.path.join(logs_dir, file_name)

    handler = logging.handlers.RotatingFileHandler(
        file_path, maxBytes=max_bytes, backupCount=backup_count
    )
    handler.setFormatter(JSONFormatter())
    logger_obj.addHandler(handler)
    return logger_obj
def configure_default_serve_logger():
    """Set up the default Serve logger used outside of individual components.

    Delegates to ``configure_component_logger`` with the component name
    ``"serve"``, the current process id as component id, a default
    ``LoggingConfig`` and ``stream_handler_only=True``.
    """
    default_settings = dict(
        component_name="serve",
        component_id=str(os.getpid()),
        logging_config=LoggingConfig(),
        max_bytes=LOGGING_ROTATE_BYTES,
        backup_count=LOGGING_ROTATE_BACKUP_COUNT,
        stream_handler_only=True,
    )
    configure_component_logger(**default_settings)
def configure_component_memory_profiler(
    component_name: str,
    component_id: str,
    component_type: Optional[ServeComponentType] = None,
):
    """Configures the memory logger for this component.

    Does nothing if RAY_SERVE_ENABLE_MEMORY_PROFILING is disabled. If memray is
    not installed, logs a warning and returns without profiling.

    Args:
        component_name: Component name used to build the tracker file name.
        component_id: Component id used to build the tracker file name.
        component_type: Optional component type used to build the file name.
    """
    if not RAY_SERVE_ENABLE_MEMORY_PROFILING:
        return

    logger = logging.getLogger(SERVE_LOGGER_NAME)

    # Guard only the import, so an ImportError raised by anything else is not
    # misreported as "memray is not installed".
    try:
        import memray
    except ImportError:
        logger.warning(
            "RAY_SERVE_ENABLE_MEMORY_PROFILING is enabled, but memray "
            "is not installed. No memory profiling is happening. "
            "`pip install memray` to enable memory profiling."
        )
        return

    logs_dir = get_serve_logs_dir()
    # Make sure the directory exists before the tracker file is created in it.
    os.makedirs(logs_dir, exist_ok=True)

    # If the actor restarted, memray requires a new file to start
    # tracking memory, so pick the first unused ``_memray_<n>.bin`` name.
    restart_counter = 0
    while True:
        memray_file_name = get_component_file_name(
            component_name=component_name,
            component_id=component_id,
            component_type=component_type,
            suffix=f"_memray_{restart_counter}.bin",
        )
        memray_file_path = os.path.join(logs_dir, memray_file_name)
        if not os.path.exists(memray_file_path):
            break
        restart_counter += 1

    # Memray usually tracks the memory usage of only a block of code
    # within a context manager. We explicitly call __enter__ here
    # instead of using a context manager to track memory usage across
    # all of the caller's code instead.
    memray.Tracker(memray_file_path, native_traces=True).__enter__()

    logger.info(
        "RAY_SERVE_ENABLE_MEMORY_PROFILING is enabled. Started a "
        "memray tracker on this actor. Tracker file located at "
        f'"{memray_file_path}"'
    )
def get_serve_logs_dir() -> str:
    """Return the directory that holds Serve log files.

    When ``ray._private.worker._global_node`` is None (running outside the
    context of Ray), the result is the ``serve`` subdirectory of the current
    working directory. Otherwise it is the ``serve`` subdirectory of the global
    node's logs directory.
    """
    global_node = ray._private.worker._global_node
    if global_node is None:
        base_dir = os.getcwd()
    else:
        base_dir = global_node.get_logs_dir_path()
    return os.path.join(base_dir, "serve")
class LoggingContext:
    """
    Context manager to manage logging behaviors within a particular block, such as:
    1) Overriding logging level

    Source (python3 official documentation)
    https://docs.python.org/3/howto/logging-cookbook.html#using-a-context-manager-for-selective-logging # noqa: E501
    """

    def __init__(self, logger, level=None):
        """
        Args:
            logger: The logger whose level is temporarily overridden.
            level: Level to apply inside the block; ``None`` leaves the logger
                untouched.
        """
        self.logger = logger
        self.level = level
        # Level to restore on exit; filled in by __enter__ when overriding.
        self.old_level = None

    def __enter__(self):
        """Apply the override (if any) and return this context manager."""
        if self.level is not None:
            self.old_level = self.logger.level
            self.logger.setLevel(self.level)
        # Returning self makes ``with LoggingContext(...) as ctx`` usable.
        return self

    def __exit__(self, et, ev, tb):
        """Restore the previous level; exceptions from the block propagate."""
        if self.level is not None:
            self.logger.setLevel(self.old_level)
|