logging.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. import logging
  2. import logging.config
  3. import os
  4. from typing import List, Optional
  5. import yaml
  6. import ray
# Plain-text layout for a log line: timestamp, a tab, level name, "file:line",
# then the message.
DEFAULT_TEXT_FORMATTER = (
    "%(asctime)s\t%(levelname)s %(filename)s:%(lineno)s -- %(message)s"  # noqa: E501
)
# Formatter class used for JSON-encoded log output.
DEFAULT_JSON_FORMATTER = ray._common.formatters.JSONFormatter

# Default logging configuration, laid out like a `logging.config` dictionary
# schema (formatters / filters / handlers / loggers).
DEFAULT_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        # Text formatter.
        "ray": {"format": DEFAULT_TEXT_FORMATTER},
        # JSON formatter, referenced by its fully qualified class path.
        "ray_json": {
            "class": f"{DEFAULT_JSON_FORMATTER.__module__}.{DEFAULT_JSON_FORMATTER.__name__}"
        },
    },
    "filters": {
        "console_filter": {"()": "ray.data._internal.logging.HiddenRecordFilter"},
        "core_context_filter": {"()": "ray._common.filters.CoreContextFilter"},
    },
    "handlers": {
        # Text file handler writing "ray-data.log".
        "file": {
            "class": "ray.data._internal.logging.SessionFileHandler",
            "formatter": "ray",
            "filename": "ray-data.log",
        },
        # JSON variant of the file handler; it targets the same file name and
        # additionally applies the core context filter.
        "file_json": {
            "class": "ray.data._internal.logging.SessionFileHandler",
            "formatter": "ray_json",
            "filename": "ray-data.log",
            "filters": ["core_context_filter"],
        },
        # Console handler: INFO and above, minus records dropped by
        # "console_filter".
        "console": {
            "class": "ray._private.log.PlainRayHandler",
            "formatter": "ray",
            "level": "INFO",
            "filters": ["console_filter"],
        },
    },
    "loggers": {
        # The "ray.data" logger accepts DEBUG and above, writes to the file and
        # console handlers, and does not propagate to ancestor loggers.
        "ray.data": {
            "level": "DEBUG",
            "handlers": ["file", "console"],
            "propagate": False,
        },
    },
}

# Dictionary of substitutions to be performed when using JSON mode. Handlers with names
# corresponding to keys will be replaced by those corresponding to values.
RAY_DATA_LOG_HANDLER_JSON_SUBSTITUTIONS = {"file": "file_json"}

# Env. variable to specify the encoding of the file logs when using the default config.
RAY_DATA_LOG_ENCODING_ENV_VAR_NAME = "RAY_DATA_LOG_ENCODING"

# Env. variable to specify the path of a logging config file. If it isn't set, the
# default config is used.
RAY_DATA_LOGGING_CONFIG_ENV_VAR_NAME = "RAY_DATA_LOGGING_CONFIG"

# Maps a dataset ID to the per-dataset log handler created for it by
# `register_dataset_logger`.
_DATASET_LOGGER_HANDLER = {}
# ID of the dataset whose handler is currently attached to the configured loggers,
# or None if there is no such dataset.
_ACTIVE_DATASET = None

# To facilitate debugging, Ray Data writes debug logs to a file. However, if Ray Data
# logs every scheduler loop, logging might impact performance. So, we add a "TRACE"
# level where logs aren't written by default.
#
# Use the following code to log a message at the "TRACE" level:
# ```
# logger.log(logging.getLevelName("TRACE"), "Your message here.")
# ```
logging.addLevelName(logging.DEBUG - 1, "TRACE")
  69. class HiddenRecordFilter:
  70. """Filters out log records with the "hide" attribute set to True.
  71. This filter allows you to override default logging behavior. For example, if errors
  72. are printed by default, and you don't want to print a specific error, you can set
  73. the "hide" attribute to avoid printing the message.
  74. .. testcode::
  75. import logging
  76. logger = logging.getLogger("ray.data.spam")
  77. # This warning won't be printed to the console.
  78. logger.warning("ham", extra={"hide": True})
  79. """
  80. def filter(self, record):
  81. return not getattr(record, "hide", False)
  82. class SessionFileHandler(logging.Handler):
  83. """A handler that writes to a log file in the Ray session directory.
  84. The Ray session directory isn't available until Ray is initialized, so this handler
  85. lazily creates the file handler when you emit a log record.
  86. Args:
  87. filename: The name of the log file. The file is created in the 'logs' directory
  88. of the Ray session directory.
  89. """
  90. def __init__(self, filename: str):
  91. super().__init__()
  92. self._filename = filename
  93. self._handler = None
  94. self._formatter = None
  95. self._path = None
  96. def emit(self, record):
  97. if self._handler is None:
  98. self._try_create_handler()
  99. if self._handler is not None:
  100. self._handler.emit(record)
  101. def setFormatter(self, fmt: logging.Formatter) -> None:
  102. if self._handler is not None:
  103. self._handler.setFormatter(fmt)
  104. self._formatter = fmt
  105. def _try_create_handler(self):
  106. assert self._handler is None
  107. log_directory = get_log_directory()
  108. if log_directory is None:
  109. return
  110. os.makedirs(log_directory, exist_ok=True)
  111. self._path = os.path.join(log_directory, self._filename)
  112. self._handler = logging.FileHandler(self._path)
  113. if self._formatter is not None:
  114. self._handler.setFormatter(self._formatter)
  115. def _get_logging_config() -> Optional[dict]:
  116. def _load_logging_config(config_path: str):
  117. with open(config_path) as file:
  118. config = yaml.safe_load(file)
  119. return config
  120. # Dynamically load env vars
  121. config_path = os.environ.get(RAY_DATA_LOGGING_CONFIG_ENV_VAR_NAME)
  122. log_encoding = os.environ.get(RAY_DATA_LOG_ENCODING_ENV_VAR_NAME)
  123. if config_path is not None:
  124. config = _load_logging_config(config_path)
  125. else:
  126. config = DEFAULT_CONFIG
  127. if log_encoding is not None and log_encoding.upper() == "JSON":
  128. for logger in config["loggers"].values():
  129. for (
  130. old_handler_name,
  131. new_handler_name,
  132. ) in RAY_DATA_LOG_HANDLER_JSON_SUBSTITUTIONS.items():
  133. logger["handlers"].remove(old_handler_name)
  134. logger["handlers"].append(new_handler_name)
  135. return config
  136. def _get_logger_names() -> List[str]:
  137. logger_config = _get_logging_config().get("loggers", {})
  138. return list(logger_config.keys())
  139. def configure_logging() -> None:
  140. """Configure the Python logger named 'ray.data'.
  141. This function loads the configuration YAML specified by "RAY_DATA_LOGGING_CONFIG"
  142. environment variable. If the variable isn't set, this function loads the default
  143. "logging.yaml" file that is adjacent to this module.
  144. If "RAY_DATA_LOG_ENCODING" is specified as "JSON" we will enable JSON logging mode
  145. if using the default logging config.
  146. """
  147. config = _get_logging_config()
  148. # Create formatters, filters, and handlers from config
  149. formatters = _create_formatters(config)
  150. filters = _create_filters(config)
  151. handlers = _create_handlers(config, formatters, filters)
  152. # Configure each logger defined in the config
  153. _configure_loggers(config, handlers)
  154. # Warn if both env vars are set (incompatible)
  155. _warn_if_incompatible_env_vars()
  156. def _import_class(class_path: str):
  157. """Dynamically import a class from a fully qualified path."""
  158. import importlib
  159. if "." not in class_path:
  160. raise ValueError(f"Invalid class path: {class_path}")
  161. module_name, class_name = class_path.rsplit(".", 1)
  162. module = importlib.import_module(module_name)
  163. return getattr(module, class_name)
  164. def _create_formatters(config: dict) -> dict:
  165. """Create formatter instances from config."""
  166. formatters = {}
  167. for name, fmt_config in config.get("formatters", {}).items():
  168. if "class" in fmt_config:
  169. formatter_class = _import_class(fmt_config["class"])
  170. formatters[name] = formatter_class()
  171. elif "format" in fmt_config:
  172. formatters[name] = logging.Formatter(fmt_config["format"])
  173. return formatters
  174. def _create_filters(config: dict) -> dict:
  175. """Create filter instances from config."""
  176. filters = {}
  177. for name, filter_config in config.get("filters", {}).items():
  178. # https://docs.python.org/3/library/logging.config.html#dictionary-schema-details
  179. if "()" in filter_config:
  180. filter_class = _import_class(filter_config["()"])
  181. filters[name] = filter_class()
  182. return filters
  183. def _create_handlers(config: dict, formatters: dict, filters: dict) -> dict:
  184. """Create and configure handler instances from config."""
  185. handlers = {}
  186. # Keys that are not passed to handler constructor
  187. HANDLER_CONFIG_KEYS = {"class", "level", "formatter", "filters"}
  188. for name, handler_config in config.get("handlers", {}).items():
  189. # Instantiate handler with all keys except config-only keys
  190. handler_class = _import_class(handler_config["class"])
  191. handler_kwargs = {
  192. k: v for k, v in handler_config.items() if k not in HANDLER_CONFIG_KEYS
  193. }
  194. handler = handler_class(**handler_kwargs)
  195. handler.name = name
  196. # Configure handler
  197. if "level" in handler_config:
  198. handler.setLevel(handler_config["level"])
  199. if "formatter" in handler_config:
  200. formatter = formatters.get(handler_config["formatter"])
  201. if formatter:
  202. handler.setFormatter(formatter)
  203. for filter_name in handler_config.get("filters", []):
  204. filter_obj = filters.get(filter_name)
  205. if filter_obj:
  206. handler.addFilter(filter_obj)
  207. handlers[name] = handler
  208. return handlers
  209. def _configure_loggers(config: dict, handlers: dict) -> None:
  210. """Configure logger instances from config."""
  211. for logger_name, logger_config in config.get("loggers", {}).items():
  212. logger = logging.getLogger(logger_name)
  213. logger.setLevel(logger_config.get("level", logging.NOTSET))
  214. # Clear existing handlers
  215. for handler in logger.handlers[:]:
  216. logger.removeHandler(handler)
  217. # Add configured handlers
  218. for handler_name in logger_config.get("handlers", []):
  219. handler = handlers.get(handler_name)
  220. if handler:
  221. logger.addHandler(handler)
  222. logger.propagate = logger_config.get("propagate", True)
  223. def _warn_if_incompatible_env_vars() -> None:
  224. """Warn if both RAY_DATA_LOGGING_CONFIG and RAY_DATA_LOG_ENCODING are set."""
  225. config_path = os.environ.get(RAY_DATA_LOGGING_CONFIG_ENV_VAR_NAME)
  226. log_encoding = os.environ.get(RAY_DATA_LOG_ENCODING_ENV_VAR_NAME)
  227. # After configuring logger, warn if RAY_DATA_LOGGING_CONFIG is used with
  228. # RAY_DATA_LOG_ENCODING, because they are not both supported together.
  229. if config_path is not None and log_encoding is not None:
  230. logger = logging.getLogger(__name__)
  231. logger.warning(
  232. "Using `RAY_DATA_LOG_ENCODING` is not supported with "
  233. + "`RAY_DATA_LOGGING_CONFIG`"
  234. )
  235. def reset_logging() -> None:
  236. """Reset the logger named 'ray.data' to its initial state.
  237. Used for testing.
  238. """
  239. global _DATASET_LOGGER_HANDLER
  240. global _ACTIVE_DATASET
  241. logger = logging.getLogger("ray.data")
  242. logger.handlers.clear()
  243. logger.setLevel(logging.NOTSET)
  244. _DATASET_LOGGER_HANDLER = {}
  245. _ACTIVE_DATASET = None
  246. def get_log_directory() -> Optional[str]:
  247. """Return the directory where Ray Data writes log files.
  248. If Ray isn't initialized, this function returns ``None``.
  249. """
  250. global_node = ray._private.worker._global_node
  251. if global_node is None:
  252. return None
  253. session_dir = global_node.get_session_dir_path()
  254. return os.path.join(session_dir, "logs", "ray-data")
  255. def _get_default_formatter() -> logging.Formatter:
  256. log_encoding = os.environ.get(RAY_DATA_LOG_ENCODING_ENV_VAR_NAME)
  257. if log_encoding is not None and log_encoding.upper() == "JSON":
  258. return DEFAULT_JSON_FORMATTER()
  259. return logging.Formatter(DEFAULT_TEXT_FORMATTER)
  260. def _create_dataset_log_handler(dataset_id: str) -> SessionFileHandler:
  261. """Create a log handler for a dataset with the given ID.
  262. Args:
  263. dataset_id: The ID of the dataset.
  264. Returns:
  265. A log handler for the dataset.
  266. """
  267. handler = SessionFileHandler(filename=f"ray-data-{dataset_id}.log")
  268. handler.setFormatter(_get_default_formatter())
  269. return handler
  270. def update_dataset_logger_for_worker(dataset_id: Optional[str]) -> None:
  271. """Create a log handler for a dataset with the given ID. Switch the dataset logger
  272. for the worker to this dataset logger. Note that only the driver keeps track of the
  273. active dataset. The worker will just use the handler that the driver tells it to use.
  274. Args:
  275. dataset_id: The ID of the dataset.
  276. """
  277. if not dataset_id:
  278. return
  279. configure_logging()
  280. log_handler = _create_dataset_log_handler(dataset_id)
  281. loggers = [logging.getLogger(name) for name in _get_logger_names()]
  282. for logger in loggers:
  283. logger.addHandler(log_handler)
  284. def register_dataset_logger(dataset_id: str) -> Optional[int]:
  285. """Create a log handler for a dataset with the given ID. Activate the handler if
  286. this is the only active dataset. Otherwise, print a warning to that handler and
  287. keep it inactive until it becomes the only active dataset.
  288. Args:
  289. dataset_id: The ID of the dataset.
  290. """
  291. global _DATASET_LOGGER_HANDLER
  292. global _ACTIVE_DATASET
  293. loggers = [logging.getLogger(name) for name in _get_logger_names()]
  294. log_handler = _create_dataset_log_handler(dataset_id)
  295. # The per-dataset log will always have the full context about its registration,
  296. # regardless of whether it is active or inactive.
  297. local_logger = logging.getLogger(__name__)
  298. local_logger.addHandler(log_handler)
  299. local_logger.info("Registered dataset logger for dataset %s", dataset_id)
  300. _DATASET_LOGGER_HANDLER[dataset_id] = log_handler
  301. if not _ACTIVE_DATASET:
  302. _ACTIVE_DATASET = dataset_id
  303. for logger in loggers:
  304. logger.addHandler(log_handler)
  305. else:
  306. local_logger.info(
  307. f"{dataset_id} registers for logging while another dataset "
  308. f"{_ACTIVE_DATASET} is also logging. For performance reasons, we will not "
  309. f"log to the dataset {dataset_id} until it is the only active dataset."
  310. )
  311. local_logger.removeHandler(log_handler)
  312. return _ACTIVE_DATASET
  313. def unregister_dataset_logger(dataset_id: str) -> Optional[int]:
  314. """Remove the logger for a dataset with the given ID.
  315. Args:
  316. dataset_id: The ID of the dataset.
  317. """
  318. global _DATASET_LOGGER_HANDLER
  319. global _ACTIVE_DATASET
  320. loggers = [logging.getLogger(name) for name in _get_logger_names()]
  321. log_handler = _DATASET_LOGGER_HANDLER.pop(dataset_id, None)
  322. if _ACTIVE_DATASET == dataset_id:
  323. _ACTIVE_DATASET = None
  324. if _DATASET_LOGGER_HANDLER:
  325. # If there are still active dataset loggers, activate the first one.
  326. register_dataset_logger(next(iter(_DATASET_LOGGER_HANDLER.keys())))
  327. if log_handler:
  328. for logger in loggers:
  329. logger.removeHandler(log_handler)
  330. log_handler.close()
  331. return _ACTIVE_DATASET