api.py 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. import traceback as tb
  2. from typing import Any
# Type alias for a "wrapped" exception: a 2-tuple holding the exception object
# and a ``traceback.StackSummary`` — the shape built by ``_wrap_exception`` and
# recognised by ``_is_wrapped_exception`` in this module.
WRAPPED_EXCEPTION = tuple[BaseException, tb.StackSummary]

# Names exported by a star-import of this module; the underscore-prefixed
# helpers are intentionally not listed.
__all__ = ["CheckpointException"]
  5. def _wrap_exception(exc: BaseException) -> WRAPPED_EXCEPTION:
  6. return (exc, tb.extract_tb(exc.__traceback__))
  7. def _is_wrapped_exception(obj: Any) -> bool:
  8. if not isinstance(obj, tuple):
  9. return False
  10. if len(obj) != 2:
  11. return False
  12. return isinstance(obj[0], BaseException) and isinstance(obj[1], tb.StackSummary)
  13. class CheckpointException(BaseException):
  14. """Exception raised if failure was detected as part of a checkpoint load or save."""
  15. def __init__(self, msg: str, failures: dict[int, WRAPPED_EXCEPTION]):
  16. super().__init__(msg, failures)
  17. self._failures = failures
  18. @property
  19. def failures(self) -> dict[int, WRAPPED_EXCEPTION]:
  20. """Return a dictionary mapping node ranks to their associated exceptions in case of failure."""
  21. return self._failures
  22. def __str__(self) -> str:
  23. str = f"CheckpointException ranks:{self._failures.keys()}\n"
  24. for rank, exc_pair in self._failures.items():
  25. exc, trace = exc_pair
  26. str += f"Traceback (most recent call last): (RANK {rank})\n"
  27. if trace is not None:
  28. str += "".join(tb.format_list(trace))
  29. str += "".join(tb.format_exception_only(type(exc), value=exc))
  30. return str