| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- import os
- from typing import List, Optional
- from ray.train.v2._internal.constants import (
- COLLECTIVE_TIMEOUT_S_ENV_VAR,
- DEFAULT_WORKER_GROUP_START_TIMEOUT_S,
- DEFAULT_WORKER_HEALTH_CHECK_TIMEOUT_S,
- WORKER_GROUP_START_TIMEOUT_S_ENV_VAR,
- WORKER_HEALTH_CHECK_TIMEOUT_S_ENV_VAR,
- )
# TODO: Distinguish between user and system exceptions.
class RayTrainError(Exception):
    """Root of the Ray Train exception hierarchy.

    All Ray Train exceptions derive from this class, so catching
    ``RayTrainError`` handles any of them.
    """

    pass
class WorkerHealthCheckTimeoutError(RayTrainError):
    """Exception raised when a worker health check hangs for long enough.

    The given message is extended with a hint naming the environment variable
    that controls the health check timeout, and its current value.

    Args:
        message: Description of the hung health check.
    """

    def __init__(self, message: str):
        # Remember the caller's message so ``__reduce__`` can rebuild the
        # error without appending the timeout hint a second time.
        self._message = message
        timeout = os.getenv(
            WORKER_HEALTH_CHECK_TIMEOUT_S_ENV_VAR, DEFAULT_WORKER_HEALTH_CHECK_TIMEOUT_S
        )
        message += (
            f"\nSet the {WORKER_HEALTH_CHECK_TIMEOUT_S_ENV_VAR} "
            "environment variable to increase the timeout "
            f"(current value: {timeout} seconds)."
        )
        super().__init__(message)

    def __reduce__(self):
        # Default exception pickling would call ``__init__(*self.args)`` with
        # the already-extended message, duplicating the hint on every round
        # trip. Rebuild from the original message instead.
        return (self.__class__, (self._message,))
class WorkerHealthCheckFailedError(RayTrainError):
    """Exception raised when a worker health check fails.

    Args:
        message: Human-readable description of the failure.
        failure: The underlying exception reported by the health check;
            exposed as ``health_check_failure`` and appended to ``str()``.
    """

    def __init__(self, message, failure: Exception):
        super().__init__(message)
        self._message = message
        self.health_check_failure = failure

    def __reduce__(self):
        # Rebuild from both constructor arguments so the wrapped failure
        # survives pickling.
        ctor_args = (self._message, self.health_check_failure)
        return self.__class__, ctor_args

    def __str__(self):
        return "\n".join((self._message, str(self.health_check_failure)))
class WorkerGroupStartupTimeoutError(RayTrainError):
    """Exception raised when the worker group startup times out.

    Example scenario: 4 GPUs are detected in the cluster, but when the workers
    are actually scheduled, one of the nodes goes down and only 3 GPUs are
    available. One of the worker tasks may be stuck pending until a timeout is
    reached.

    Args:
        num_workers: Number of workers the group was waiting for.
    """

    def __init__(self, num_workers: int):
        # Read the configured timeout from the environment (falling back to
        # the default) so it can be reported in the message.
        timeout = float(
            os.environ.get(
                WORKER_GROUP_START_TIMEOUT_S_ENV_VAR,
                DEFAULT_WORKER_GROUP_START_TIMEOUT_S,
            )
        )
        self.num_workers = num_workers
        super().__init__(
            f"The worker group startup timed out after {timeout} seconds waiting "
            f"for {num_workers} workers. "
            "Potential causes include: "
            "(1) temporary insufficient cluster resources while waiting for "
            "autoscaling (ignore this warning in this case), "
            "(2) infeasible resource request where the provided `ScalingConfig` "
            "cannot be satisfied, "
            "and (3) transient network issues. "
            f"Set the {WORKER_GROUP_START_TIMEOUT_S_ENV_VAR} "
            "environment variable to increase the timeout."
        )

    def __reduce__(self):
        # Only ``num_workers`` is needed; the message is rebuilt (re-reading
        # the environment) when the exception is unpickled.
        return (self.__class__, (self.num_workers,))
class WorkerGroupStartupFailedError(RayTrainError):
    """Raised when the worker group fails to start.

    Example: a worker gets scheduled onto a node, and that node dies while
    the worker actor is still initializing.
    """

    pass
class InsufficientClusterResourcesError(RayTrainError):
    """Raised when the cluster does not have enough resources.

    Example: a worker requiring 1 GPU is scheduled onto a cluster whose
    worker node types provide only CPUs.
    """

    pass
class CheckpointManagerInitializationError(RayTrainError):
    """Raised when the checkpoint manager cannot be initialized from a snapshot.

    Example scenarios:
    1. The snapshot was written by an older, incompatible version of
       Ray Train.
    2. The snapshot JSON file is corrupted.
    3. The snapshot references checkpoints that are missing from the run
       storage path.
    """

    pass
class CollectiveTimeoutError(RayTrainError):
    """Raised when an internal Ray Train collective operation across the
    worker group times out.
    """

    pass
class BroadcastCollectiveTimeoutError(CollectiveTimeoutError):
    """Exception raised when the broadcast operation times out.

    There are two main timeout examples:
    1. If not all workers call `ray.train.report`, the entire worker group will
       hang until the timeout before raising. This prevents indefinite worker
       group hangs.
    2. If a worker is slow in the training loop and fails to reach the
       broadcast in time, the collective will time out.

    Args:
        time_elapsed: Seconds waited before the timeout, or ``None``, in which
            case the message omits the duration.
        missing_ranks: Ranks that had not joined the collective operation.
        timeout_s: The current timeout value in seconds, reported in the message.
    """

    def __init__(
        self, time_elapsed: Optional[float], missing_ranks: List[int], timeout_s: float
    ):
        self._time_elapsed = time_elapsed
        self._missing_ranks = missing_ranks
        self._timeout_s = timeout_s
        # ``time_elapsed`` is Optional, and formatting ``None`` with ``:.2f``
        # raises TypeError, so only mention the duration when it is given.
        elapsed = (
            "" if time_elapsed is None else f" after {time_elapsed:.2f} seconds"
        )
        message = (
            f"The collective operation timed out{elapsed}. "
            f"The following ranks have not joined the collective operation: {missing_ranks}\n"
            f"You can set the timeout with the {COLLECTIVE_TIMEOUT_S_ENV_VAR} "
            f"environment variable (current value: {timeout_s:.2f} seconds). "
            "Disable the timeout by setting the environment variable to -1."
        )
        super().__init__(message)

    def __reduce__(self):
        # Rebuild from the constructor arguments; the message is regenerated.
        return (
            self.__class__,
            (self._time_elapsed, self._missing_ranks, self._timeout_s),
        )
class UserExceptionWithTraceback(RayTrainError):
    """Wraps a user-code exception raised on a worker together with its
    original traceback string, for logging and debugging.

    The traceback is carried as a string because the original exception's
    traceback is not serialized when the exception is *returned* back to the
    main process.

    Args:
        exc: The exception raised by user code.
        traceback_str: The traceback text; also what ``str()`` returns.
    """

    def __init__(self, exc: BaseException, traceback_str: str):
        self._traceback_str = traceback_str
        self._base_exc = exc

    def __reduce__(self):
        ctor_args = (self._base_exc, self._traceback_str)
        return self.__class__, ctor_args

    def __str__(self):
        return self._traceback_str
|