ray_train.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940
  1. from typing import Any, Dict, List, Optional
  2. from ray.train import Checkpoint as RayTrainCheckpoint
  3. from ray.train._internal.session import get_session
  4. from ray.train.v2._internal.execution.context import TrainRunContext
  5. from ray.train.v2.api.callback import UserCallback
  6. from ray.tune.trainable.trainable_fn_utils import _in_tune_session
  7. from ray.util.annotations import DeveloperAPI
  8. CHECKPOINT_PATH_KEY = "checkpoint_path"
  9. @DeveloperAPI
  10. class TuneReportCallback(UserCallback):
  11. """Propagate metrics and checkpoint paths from Ray Train workers to Ray Tune."""
  12. def __init__(self):
  13. if not _in_tune_session():
  14. raise RuntimeError("TuneReportCallback must be used in a Tune session.")
  15. self._training_actor_item_queue = (
  16. get_session()._get_or_create_inter_actor_queue()
  17. )
  18. def after_report(
  19. self,
  20. run_context: TrainRunContext,
  21. metrics: List[Dict[str, Any]],
  22. checkpoint: Optional[RayTrainCheckpoint],
  23. ):
  24. # TODO: This can be changed to aggregate the metrics from all workers.
  25. # For now, just achieve feature parity with the old Tune+Train integration.
  26. metrics = metrics[0].copy()
  27. # If a checkpoint is provided, add the checkpoint path to the metrics.
  28. # Don't report the checkpoint again since it's already been uploaded
  29. # to storage.
  30. if checkpoint:
  31. metrics[CHECKPOINT_PATH_KEY] = checkpoint.path
  32. self._training_actor_item_queue.put(metrics)