| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287 |
- import io
- import json
- import logging
- import os
- from dataclasses import dataclass
- from pathlib import Path
- from typing import Any, Dict, List, Optional, Tuple, Union
- import pandas as pd
- import pyarrow
- import ray
- from ray._private.dict import unflattened_lookup
- from ray.air.constants import (
- EXPR_ERROR_PICKLE_FILE,
- EXPR_PROGRESS_FILE,
- EXPR_RESULT_FILE,
- )
- from ray.util.annotations import PublicAPI
- logger = logging.getLogger(__name__)
- @dataclass
- @PublicAPI(stability="stable")
- class Result:
- """The final result of a ML training run or a Tune trial.
- This is the output produced by ``Trainer.fit``.
- ``Tuner.fit`` outputs a :class:`~ray.tune.ResultGrid` that is a collection
- of ``Result`` objects.
- This API is the recommended way to access the outputs such as:
- - checkpoints (``Result.checkpoint``)
- - the history of reported metrics (``Result.metrics_dataframe``, ``Result.metrics``)
- - errors encountered during a training run (``Result.error``)
- The constructor is a private API -- use ``Result.from_path`` to create a result
- object from a directory.
- Attributes:
- metrics: The latest set of reported metrics.
- checkpoint: The latest checkpoint.
- error: The execution error of the Trainable run, if the trial finishes in error.
- path: Path pointing to the result directory on persistent storage. This can
- point to a remote storage location (e.g. S3) or to a local location (path
- on the head node). The path is accessible via the result's associated
- `filesystem`. For instance, for a result stored in S3 at
- ``s3://bucket/location``, ``path`` will have the value ``bucket/location``.
- metrics_dataframe: The full result dataframe of the Trainable.
- The dataframe is indexed by iterations and contains reported
- metrics. Note that the dataframe columns are indexed with the
- *flattened* keys of reported metrics, so the format of this dataframe
- may be slightly different than ``Result.metrics``, which is an unflattened
- dict of the latest set of reported metrics.
- best_checkpoints: A list of tuples of the best checkpoints and
- their associated metrics. The number of
- saved checkpoints is determined by :class:`~ray.train.CheckpointConfig`
- (by default, all checkpoints will be saved).
- """
- metrics: Optional[Dict[str, Any]]
- checkpoint: Optional["ray.tune.Checkpoint"]
- error: Optional[Exception]
- path: str
- metrics_dataframe: Optional["pd.DataFrame"] = None
- best_checkpoints: Optional[
- List[Tuple["ray.tune.Checkpoint", Dict[str, Any]]]
- ] = None
- _storage_filesystem: Optional[pyarrow.fs.FileSystem] = None
- _items_to_repr = ["error", "metrics", "path", "filesystem", "checkpoint"]
- @property
- def config(self) -> Optional[Dict[str, Any]]:
- """The config associated with the result."""
- if not self.metrics:
- return None
- return self.metrics.get("config", None)
- @property
- def filesystem(self) -> pyarrow.fs.FileSystem:
- """Return the filesystem that can be used to access the result path.
- Returns:
- pyarrow.fs.FileSystem implementation.
- """
- return self._storage_filesystem or pyarrow.fs.LocalFileSystem()
- def _repr(self, indent: int = 0) -> str:
- """Construct the representation with specified number of space indent."""
- from ray.tune.experimental.output import BLACKLISTED_KEYS
- from ray.tune.result import AUTO_RESULT_KEYS
- shown_attributes = {k: getattr(self, k) for k in self._items_to_repr}
- if self.error:
- shown_attributes["error"] = type(self.error).__name__
- else:
- shown_attributes.pop("error")
- shown_attributes["filesystem"] = shown_attributes["filesystem"].type_name
- if self.metrics:
- exclude = set(AUTO_RESULT_KEYS)
- exclude.update(BLACKLISTED_KEYS)
- shown_attributes["metrics"] = {
- k: v for k, v in self.metrics.items() if k not in exclude
- }
- cls_indent = " " * indent
- kws_indent = " " * (indent + 2)
- kws = [
- f"{kws_indent}{key}={value!r}" for key, value in shown_attributes.items()
- ]
- kws_repr = ",\n".join(kws)
- return "{0}{1}(\n{2}\n{0})".format(cls_indent, type(self).__name__, kws_repr)
- def __repr__(self) -> str:
- return self._repr(indent=0)
- @staticmethod
- def _read_file_as_str(
- storage_filesystem: pyarrow.fs.FileSystem,
- storage_path: str,
- ) -> str:
- """Opens a file as an input stream reading all byte content sequentially and
- decoding read bytes as utf-8 string.
- Args:
- storage_filesystem: The filesystem to use.
- storage_path: The source to open for reading.
- """
- with storage_filesystem.open_input_stream(storage_path) as f:
- return f.readall().decode()
- @classmethod
- def from_path(
- cls,
- path: Union[str, os.PathLike],
- storage_filesystem: Optional[pyarrow.fs.FileSystem] = None,
- ) -> "Result":
- """Restore a Result object from local or remote trial directory.
- Args:
- path: A path of a trial directory on local or remote storage
- (ex: s3://bucket/path or /tmp/ray_results).
- storage_filesystem: A custom filesystem to use. If not provided,
- this will be auto-resolved by pyarrow. If provided, the path
- is assumed to be prefix-stripped already, and must be a valid path
- on the filesystem.
- Returns:
- A :py:class:`Result` object of that trial.
- """
- # TODO(justinvyu): Fix circular dependency.
- from ray.train import Checkpoint
- from ray.train._internal.storage import (
- _exists_at_fs_path,
- _list_at_fs_path,
- get_fs_and_path,
- )
- from ray.train.constants import CHECKPOINT_DIR_NAME
- fs, fs_path = get_fs_and_path(path, storage_filesystem)
- if not _exists_at_fs_path(fs, fs_path):
- raise RuntimeError(f"Trial folder {fs_path} doesn't exist!")
- # Restore metrics from result.json
- result_json_file = Path(fs_path, EXPR_RESULT_FILE).as_posix()
- progress_csv_file = Path(fs_path, EXPR_PROGRESS_FILE).as_posix()
- if _exists_at_fs_path(fs, result_json_file):
- lines = cls._read_file_as_str(fs, result_json_file).split("\n")
- json_list = [json.loads(line) for line in lines if line]
- metrics_df = pd.json_normalize(json_list, sep="/")
- latest_metrics = json_list[-1] if json_list else {}
- # Fallback to restore from progress.csv
- elif _exists_at_fs_path(fs, progress_csv_file):
- metrics_df = pd.read_csv(
- io.StringIO(cls._read_file_as_str(fs, progress_csv_file))
- )
- latest_metrics = (
- metrics_df.iloc[-1].to_dict() if not metrics_df.empty else {}
- )
- else:
- raise RuntimeError(
- f"Failed to restore the Result object: Neither {EXPR_RESULT_FILE}"
- f" nor {EXPR_PROGRESS_FILE} exists in the trial folder!"
- )
- # Restore all checkpoints from the checkpoint folders
- checkpoint_dir_names = sorted(
- _list_at_fs_path(
- fs,
- fs_path,
- file_filter=lambda file_info: file_info.type
- == pyarrow.fs.FileType.Directory
- and file_info.base_name.startswith("checkpoint_"),
- )
- )
- if checkpoint_dir_names:
- checkpoints = [
- Checkpoint(
- path=Path(fs_path, checkpoint_dir_name).as_posix(), filesystem=fs
- )
- for checkpoint_dir_name in checkpoint_dir_names
- ]
- metrics = []
- for checkpoint_dir_name in checkpoint_dir_names:
- metrics_corresponding_to_checkpoint = metrics_df[
- metrics_df[CHECKPOINT_DIR_NAME] == checkpoint_dir_name
- ]
- if metrics_corresponding_to_checkpoint.empty:
- logger.warning(
- "Could not find metrics corresponding to "
- f"{checkpoint_dir_name}. These will default to an empty dict."
- )
- metrics.append(
- {}
- if metrics_corresponding_to_checkpoint.empty
- else metrics_corresponding_to_checkpoint.iloc[-1].to_dict()
- )
- latest_checkpoint = checkpoints[-1]
- # TODO(justinvyu): These are ordered by checkpoint index, since we don't
- # know the metric to order these with.
- best_checkpoints = list(zip(checkpoints, metrics))
- else:
- best_checkpoints = latest_checkpoint = None
- # Restore the trial error if it exists
- error = None
- error_file_path = Path(fs_path, EXPR_ERROR_PICKLE_FILE).as_posix()
- if _exists_at_fs_path(fs, error_file_path):
- with fs.open_input_stream(error_file_path) as f:
- error = ray.cloudpickle.load(f)
- return Result(
- metrics=latest_metrics,
- checkpoint=latest_checkpoint,
- path=fs_path,
- _storage_filesystem=fs,
- metrics_dataframe=metrics_df,
- best_checkpoints=best_checkpoints,
- error=error,
- )
- @PublicAPI(stability="alpha")
- def get_best_checkpoint(
- self, metric: str, mode: str
- ) -> Optional["ray.tune.Checkpoint"]:
- """Get the best checkpoint from this trial based on a specific metric.
- Any checkpoints without an associated metric value will be filtered out.
- Args:
- metric: The key for checkpoints to order on.
- mode: One of ["min", "max"].
- Returns:
- :class:`Checkpoint <ray.train.Checkpoint>` object, or None if there is
- no valid checkpoint associated with the metric.
- """
- if not self.best_checkpoints:
- raise RuntimeError("No checkpoint exists in the trial directory!")
- if mode not in ["max", "min"]:
- raise ValueError(
- f'Unsupported mode: {mode}. Please choose from ["min", "max"]!'
- )
- op = max if mode == "max" else min
- valid_checkpoints = [
- ckpt_info
- for ckpt_info in self.best_checkpoints
- if unflattened_lookup(metric, ckpt_info[1], default=None) is not None
- ]
- if not valid_checkpoints:
- raise RuntimeError(
- f"Invalid metric name {metric}! "
- f"You may choose from the following metrics: {self.metrics.keys()}."
- )
- return op(valid_checkpoints, key=lambda x: unflattened_lookup(metric, x[1]))[0]
|