| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108 |
- import logging
- import os
- import sys
- from traceback import format_exception
- from typing import Optional, Union
- import colorama
- import ray._private.ray_constants as ray_constants
- import ray.cloudpickle as pickle
- from ray._raylet import ActorID, TaskID, WorkerID
- from ray.core.generated.common_pb2 import (
- PYTHON,
- ActorDiedErrorContext,
- Address,
- ErrorType,
- Language,
- NodeDeathInfo,
- RayException,
- )
- from ray.util.annotations import DeveloperAPI, PublicAPI
- logger = logging.getLogger(__name__)
@PublicAPI
class RayError(Exception):
    """Base class for every exception type defined by Ray."""

    def to_bytes(self):
        """Serialize this exception into the bytes of a ``RayException`` message.

        The message carries the pickled exception together with a formatted
        traceback string; the latter is what ``from_ray_exception`` falls
        back to when unpickling fails.
        """
        # Build the traceback text from the exception object itself.
        traceback_lines = format_exception(type(self), self, self.__traceback__)
        message = RayException(
            language=PYTHON,
            serialized_exception=pickle.dumps(self),
            formatted_exception_string="\n".join(traceback_lines),
        )
        return message.SerializeToString()

    @staticmethod
    def from_bytes(b):
        """Parse ``b`` as a ``RayException`` message and rebuild the exception."""
        proto = RayException()
        proto.ParseFromString(b)
        return RayError.from_ray_exception(proto)

    @staticmethod
    def from_ray_exception(ray_exception):
        """Turn a ``RayException`` message back into an exception object.

        Returns the unpickled exception for Python payloads, an
        ``UnserializableException`` when unpickling fails, and a
        ``CrossLanguageError`` for every other language.
        """
        if ray_exception.language != PYTHON:
            return CrossLanguageError(ray_exception)

        try:
            return pickle.loads(ray_exception.serialized_exception)
        except Exception:
            # to_bytes() stored the output of traceback.format_exception() in
            # formatted_exception_string, so it still describes the original
            # error (message and stack trace) even though the pickle is
            # unusable.
            fallback_trace = getattr(
                ray_exception,
                "formatted_exception_string",
                "No formatted exception string available.",
            )
            return UnserializableException(fallback_trace)
@PublicAPI
class CrossLanguageError(RayError):
    """Wraps an exception that originated in a non-Python language."""

    def __init__(self, ray_exception):
        """Build the message from the proto's language and formatted text."""
        language_name = Language.Name(ray_exception.language)
        details = ray_exception.formatted_exception_string
        message = "An exception raised from {}:\n{}".format(language_name, details)
        super().__init__(message)
@PublicAPI
class TaskCancelledError(RayError):
    """Raised when this task is cancelled.

    Args:
        task_id: The TaskID of the function that was directly
            cancelled.
        error_message: Optional extra text appended to the string form.
    """

    def __init__(
        self, task_id: Optional[TaskID] = None, error_message: Optional[str] = None
    ):
        self.task_id = task_id
        self.error_message = error_message

    def __str__(self):
        """Return the cancellation notice followed by the optional message."""
        text = ""
        if self.task_id:
            text = "Task: " + str(self.task_id) + " was cancelled. "
        if not self.error_message:
            return text
        return text + self.error_message
@PublicAPI
class RayTaskError(RayError):
    """Indicates that a task threw an exception during execution.

    If a task throws an exception during execution, a RayTaskError is stored in
    the object store for each of the task's outputs. When an object is
    retrieved from the object store, the Python method that retrieved it checks
    to see if the object is a RayTaskError and if it is then an exception is
    thrown propagating the error message.
    """

    def __init__(
        self,
        function_name,
        traceback_str,
        cause,
        proctitle=None,
        pid=None,
        ip=None,
        actor_repr=None,
        actor_id=None,
    ):
        """Initialize a RayTaskError.

        Args:
            function_name: Name of the function that failed.
            traceback_str: Formatted traceback of the failure; must not be None.
            cause: The exception raised by the task. If it cannot be pickled
                it is replaced by a ``RayError`` describing the failure.
            proctitle: Process title; defaults to the current process title.
            pid: Process id; defaults to ``os.getpid()``.
            ip: Node IP address; defaults to the current node's address.
            actor_repr: Optional repr of the actor, shown in ``__str__``.
            actor_id: Optional actor id, shown in ``__str__`` with the repr.
        """
        import ray

        if proctitle:
            self.proctitle = proctitle
        else:
            self.proctitle = ray._raylet.getproctitle()
        self.pid = pid or os.getpid()
        self.ip = ip or ray.util.get_node_ip_address()
        self.function_name = function_name
        self.traceback_str = traceback_str
        self.actor_repr = actor_repr
        self._actor_id = actor_id
        self.cause = cause

        # Probe picklability up front so that an unserializable user exception
        # is swapped for a plain RayError carrying the same information.
        try:
            pickle.dumps(cause)
        except (pickle.PicklingError, TypeError) as e:
            err_type = f"{cause.__class__.__module__}.{cause.__class__.__name__}"
            err_msg = (
                f"Exception {err_type} isn't serializable: {e}.\n"
                f"Original exception details:\n{traceback_str}"
            )
            logger.exception(
                f"The original cause of the RayTaskError ({err_type}) isn't serializable."
            )
            self.cause = RayError(err_msg)

        # BaseException implements a __reduce__ method that returns
        # a tuple with the type and the value of self.args.
        # https://stackoverflow.com/a/49715949/2213289
        self.args = (function_name, traceback_str, self.cause, proctitle, pid, ip)
        assert traceback_str is not None

    def make_dual_exception_instance(self) -> "RayTaskError":
        """Makes a object instance that inherits from both RayTaskError and the type of
        `self.cause`. Raises TypeError if the cause class can't be subclassed"""
        # For normal user Exceptions, we subclass from both
        # RayTaskError and the user exception. For ExceptionGroup,
        # we special handle it because it has a different __new__()
        # signature from Exception.
        # Ref: https://docs.python.org/3/library/exceptions.html#exception-groups
        if sys.version_info >= (3, 11) and isinstance(
            self.cause, ExceptionGroup  # noqa: F821
        ):
            return self._make_exceptiongroup_dual_exception_instance()
        return self._make_normal_dual_exception_instance()

    def _make_normal_dual_exception_instance(self) -> "RayTaskError":
        """Build the dual-inheritance instance for an ordinary exception cause."""
        cause_cls = self.cause.__class__
        error_msg = str(self)

        class cls(RayTaskError, cause_cls):
            def __init__(self, cause):
                self.cause = cause
                # Store args separately to avoid writing to user-defined
                # read-only or property-based `args`.
                self._ray_task_error_args = (cause,)

            @property
            def args(self):
                return self._ray_task_error_args

            @args.setter
            def args(self, value):
                self._ray_task_error_args = value

            def __reduce__(self):
                return (cls, self._ray_task_error_args)

            def __getattr__(self, name):
                # Attributes not found on the wrapper are looked up on the cause.
                return getattr(self.cause, name)

            def __str__(self):
                return error_msg

        name = f"RayTaskError({cause_cls.__name__})"
        cls.__name__ = name
        cls.__qualname__ = name

        return cls(self.cause)

    def _make_exceptiongroup_dual_exception_instance(self) -> "RayTaskError":
        """Build the dual-inheritance instance for an ExceptionGroup cause."""
        cause_cls = self.cause.__class__
        error_msg = str(self)

        class cls(RayTaskError, cause_cls):
            def __new__(cls, cause):
                # The group's __new__ is given the cause's message and
                # exceptions rather than the cause itself.
                self = super().__new__(cls, cause.message, cause.exceptions)
                return self

            def __init__(self, cause):
                self.cause = cause
                self._ray_task_error_args = (cause,)

            @property
            def args(self):
                return self._ray_task_error_args

            @args.setter
            def args(self, value):
                self._ray_task_error_args = value

            def __reduce__(self):
                return (cls, self._ray_task_error_args)

            def __getattr__(self, name):
                # Attributes not found on the wrapper are looked up on the cause.
                return getattr(self.cause, name)

            def __str__(self):
                return error_msg

        name = f"RayTaskError({cause_cls.__name__})"
        cls.__name__ = name
        cls.__qualname__ = name

        return cls(self.cause)

    def as_instanceof_cause(self):
        """Returns an exception that's an instance of the cause's class.

        The returned exception inherits from both RayTaskError and the
        cause class and contains all of the attributes of the cause
        exception.

        If the cause class can't be subclassed, issues a warning and returns `self`.
        """
        cause_cls = self.cause.__class__
        if issubclass(RayTaskError, cause_cls):
            return self  # already satisfied

        try:
            return self.make_dual_exception_instance()
        except TypeError as e:
            logger.warning(
                f"User exception type {type(self.cause)} in RayTaskError can't"
                " be subclassed! This exception is raised as"
                " RayTaskError only. You can use `ray_task_error.cause` to"
                f" access the user exception. Failure in subclassing: {e}"
            )
            return self
        except Exception as e:
            logger.warning(
                "Failed to combine RayTaskError with user exception type "
                f"{type(self.cause)}; raising RayTaskError only. This can "
                "happen when the user exception overrides attributes like "
                "`args` or otherwise blocks subclass construction. "
                f"Failure in subclassing: {e}"
            )
            return self

    def __str__(self):
        """Format a RayTaskError as a string.

        "Traceback" header lines are replaced by a line identifying the
        process (and actor, when known); frames that belong to Ray-internal
        files are dropped together with their source-code line.
        """
        # Path fragments that mark a traceback frame as Ray-internal.
        internal_markers = (
            "ray/worker.py",
            "ray/_private/",
            "ray/util/tracing/",
            "ray/_raylet.pyx",
        )
        lines = self.traceback_str.strip().split("\n")
        out = []
        code_from_internal_file = False

        # Format tracebacks.
        # Python stacktrace consists of
        # Traceback...: Indicate the next line will be a traceback.
        #   File [file_name + line number]
        #     code
        # XError: [message]
        # NOTE: For _raylet.pyx (Cython), the code is not always included.
        for i, line in enumerate(lines):
            # Convert traceback to the readable information.
            if line.startswith("Traceback "):
                traceback_line = (
                    f"{colorama.Fore.CYAN}"
                    f"{self.proctitle}()"
                    f"{colorama.Fore.RESET} "
                    f"(pid={self.pid}, ip={self.ip}"
                )
                if self.actor_repr:
                    traceback_line += (
                        f", actor_id={self._actor_id}, repr={self.actor_repr})"
                    )
                else:
                    traceback_line += ")"
                code_from_internal_file = False
                out.append(traceback_line)
            # A frame line always starts with exactly two spaces and "File";
            # matching on a single space would never recognise a real frame.
            elif line.startswith("  File ") and any(
                marker in line for marker in internal_markers
            ):
                # TODO(windows)
                # Process the internal file line.
                # https://github.com/python/cpython/blob/0a0a135bae2692d069b18d2d590397fbe0a0d39a/Lib/traceback.py#L421 # noqa
                if "ray._raylet.raise_if_dependency_failed" in line:
                    # The current task failed because one of its dependencies
                    # failed; print a user-friendly explanation instead.
                    out.append(
                        " At least one of the input arguments for "
                        "this task could not be computed:"
                    )
                # Source-code lines are indented with four spaces, whereas the
                # next frame line only has two. Only a four-space line is the
                # code belonging to this internal frame.
                if i + 1 < len(lines) and lines[i + 1].startswith("    "):
                    code_from_internal_file = True
            elif code_from_internal_file:
                # This line is the internal frame's code: drop it, and the
                # line after it is not code anymore.
                code_from_internal_file = False
            else:
                out.append(line)
        return "\n".join(out)
@PublicAPI
class LocalRayletDiedError(RayError):
    """Raised when the raylet local to the task has died."""

    def __str__(self):
        """Return the fixed explanation pointing at raylet.out."""
        message = "The task's local raylet died. Check raylet.out for more information."
        return message
@PublicAPI
class WorkerCrashedError(RayError):
    """Raised when a worker dies unexpectedly in the middle of a task."""

    def __str__(self):
        """Return the fixed explanation pointing at the core-worker logs."""
        what_happened = "The worker died unexpectedly while executing this task. "
        where_to_look = "Check python-core-worker-*.log files for more information."
        return what_happened + where_to_look
@PublicAPI
class RayActorError(RayError):
    """Indicates that the actor has outages unexpectedly before finishing a task.

    This exception could happen because the actor process is dead, or is unavailable for
    the moment. Ray raises subclasses `ActorDiedError` and `ActorUnavailableError`
    respectively.

    Args:
        actor_id: The actor ID in hex string, if known.
        error_msg: The full error message returned by ``str()``.
        actor_init_failed: Whether the actor failed in the middle of __init__.
        preempted: Whether the actor died because the node was preempted.
    """

    BASE_ERROR_MSG = "The actor experienced an error before finishing this task."

    def __init__(
        self,
        actor_id: Optional[str] = None,
        error_msg: str = BASE_ERROR_MSG,
        actor_init_failed: bool = False,
        preempted: bool = False,
    ):
        #: The actor ID in hex string.
        self.actor_id = actor_id
        #: The full error message.
        self.error_msg = error_msg
        #: Whether the actor failed in the middle of __init__.
        self._actor_init_failed = actor_init_failed
        #: Whether the actor died because the node was preempted.
        self._preempted = preempted

    def __str__(self) -> str:
        return self.error_msg

    @property
    def preempted(self) -> bool:
        """Whether the actor died because the node was preempted."""
        return self._preempted

    @property
    def actor_init_failed(self) -> bool:
        """Whether the actor failed in the middle of __init__."""
        return self._actor_init_failed
@DeveloperAPI
class ActorDiedError(RayActorError):
    """Indicates that the actor died unexpectedly before finishing a task.

    This exception could happen either because the actor process dies while
    executing a task, or because a task is submitted to a dead actor.

    Args:
        cause: The cause of the actor error. `RayTaskError` type means
            the actor has died because of an exception within `__init__`.
            `ActorDiedErrorContext` means the actor has died because of
            an unexpected system error. None means the cause isn't known.
            Theoretically, this shouldn't happen,
            but it's there as a safety check.
    """

    BASE_ERROR_MSG = "The actor died unexpectedly before finishing this task."

    def __init__(
        self, cause: Optional[Union[RayTaskError, ActorDiedErrorContext]] = None
    ):
        """Derive the RayActorError arguments from ``cause`` and forward them."""
        if not cause:
            # Unknown cause: only the generic message is available.
            super().__init__(None, ActorDiedError.BASE_ERROR_MSG, False, False)
            return

        if isinstance(cause, RayTaskError):
            # The creation task raised, i.e. the actor failed inside __init__.
            creation_failure_msg = (
                "The actor died because of an error"
                " raised in its creation task, "
                f"{cause.__str__()}"
            )
            super().__init__(cause._actor_id, creation_failure_msg, True, False)
            return

        # Anything else must describe a system-level actor failure.
        assert isinstance(cause, ActorDiedErrorContext)
        report = [
            ActorDiedError.BASE_ERROR_MSG,
            f"\tclass_name: {cause.class_name}",
            f"\tactor_id: {ActorID(cause.actor_id).hex()}",
        ]
        # The remaining fields are optional and only reported when set.
        if cause.pid != 0:
            report.append(f"\tpid: {cause.pid}")
        if cause.name != "":
            report.append(f"\tname: {cause.name}")
        if cause.ray_namespace != "":
            report.append(f"\tnamespace: {cause.ray_namespace}")
        if cause.node_ip_address != "":
            report.append(f"\tip: {cause.node_ip_address}")
        report.append(cause.error_message)
        if cause.never_started:
            report.append(
                "The actor never ran - it was cancelled before it started running."
            )

        was_preempted = False
        if (
            cause.node_death_info
            and cause.node_death_info.reason
            == NodeDeathInfo.AUTOSCALER_DRAIN_PREEMPTED
        ):
            was_preempted = True

        full_message = "\n".join(report)
        hex_actor_id = ActorID(cause.actor_id).hex()
        super().__init__(hex_actor_id, full_message, False, was_preempted)

    @staticmethod
    def from_task_error(task_error: RayTaskError):
        """Wrap a creation-task ``RayTaskError`` in an ``ActorDiedError``."""
        return ActorDiedError(task_error)
@DeveloperAPI
class ActorUnavailableError(RayActorError):
    """Raised when the actor is temporarily unavailable but may be available later."""

    def __init__(self, error_message: str, actor_id: Optional[bytes]):
        """Convert the binary actor id to hex and build the full message."""
        if actor_id is None:
            hex_actor_id = None
        else:
            hex_actor_id = ActorID(actor_id).hex()
        full_message = (
            f"The actor {hex_actor_id} is unavailable: {error_message}. The task may or "
            "may not have been executed on the actor."
        )
        # Unavailability is never an __init__ failure nor a preemption.
        super().__init__(hex_actor_id, full_message, False, False)
@PublicAPI
class RaySystemError(RayError):
    """Indicates that Ray encountered a system error.

    This exception can be thrown when the raylet is killed.
    """

    def __init__(self, client_exc, traceback_str=None):
        self.client_exc = client_exc
        self.traceback_str = traceback_str

    def __str__(self):
        """Return the system error text, with the traceback when one is set."""
        summary = f"System error: {self.client_exc}"
        if not self.traceback_str:
            return summary
        return summary + f"\ntraceback: {self.traceback_str}"
@PublicAPI
class AuthenticationError(RayError):
    """Indicates that an authentication error occurred.

    Most commonly, this is caused by a missing or mismatching token set on the client
    (e.g., a Ray CLI command interacting with a remote cluster).

    Only applicable when `RAY_AUTH_MODE` is not set to `disabled`.

    Args:
        message: Description of the authentication failure; ``__str__``
            appends a period and troubleshooting guidance to it.
    """

    def __init__(self, message: str):
        self.message = message
        # Suppress the implicit exception context ("During handling of the
        # above exception, another exception occurred") for cleaner output.
        self.__suppress_context__ = True
        super().__init__(message)

    def __str__(self) -> str:
        """Return the message plus the auth-mode note and token guidance."""
        # Imported lazily, inside the method, as in the original code.
        from ray._private.authentication.authentication_utils import (
            get_authentication_mode_name,
        )
        from ray._raylet import AuthenticationMode, get_authentication_mode

        # Add a heads-up when RAY_AUTH_MODE is not set to token.
        auth_mode_note = ""
        current_mode = get_authentication_mode()
        if current_mode != AuthenticationMode.TOKEN:
            mode_name = get_authentication_mode_name(current_mode)
            auth_mode_note = (
                f" Note: RAY_AUTH_MODE is currently '{mode_name}' (not 'token')."
            )

        help_text = (
            " Ensure that the token for the cluster is available in a local file (e.g., ~/.ray/auth_token or via "
            "RAY_AUTH_TOKEN_PATH) or as the `RAY_AUTH_TOKEN` environment variable. "
            "To generate a token for local development, use `ray get-auth-token --generate`. "
            "For remote clusters, ensure that the token is propagated to all nodes of the cluster when token authentication is enabled. "
            "For more information, see: https://docs.ray.io/en/latest/ray-security/token-auth.html"
        )
        return self.message + "." + auth_mode_note + help_text
@DeveloperAPI
class UserCodeException(RayError):
    """Signals that user code raised an exception while it was running.

    It can, for instance, wrap an exception coming out of user code in a
    remote task or actor. The `retry_exceptions` parameter will still
    respect the underlying cause of this exception.
    """
@PublicAPI
class ObjectStoreFullError(RayError):
    """Indicates that the object store is full.

    This is raised if the attempt to store the object fails
    because the object store is full even after multiple retries.
    """

    def __str__(self):
        """Return the base message followed by an eviction hint."""
        hint = (
            "\n"
            "The local object store is full of objects that are still in "
            "scope and cannot be evicted. Tip: Use the `ray memory` command "
            "to list active objects in the cluster."
        )
        return super().__str__() + hint
@PublicAPI
class OutOfDiskError(RayError):
    """Indicates that the local disk is full.

    This is raised if the attempt to store the object fails
    because both the object store and disk are full.
    """

    def __str__(self):
        """Return the base message followed by a disk-usage explanation."""
        # TODO(scv119): expose more disk usage information and link to a doc.
        return super(OutOfDiskError, self).__str__() + (
            "\n"
            "The object cannot be created because the local object store"
            " is full and the local disk's utilization is over capacity"
            # Trailing space keeps this sentence from running into the tip.
            " (95% by default). "
            "Tip: Use `df` on this node to check disk usage and "
            "`ray memory` to check object store memory usage."
        )
@PublicAPI
class OutOfMemoryError(RayError):
    """Raised when the node is running low on memory and is close to full.

    It is raised while tasks or actors are being evicted to free up memory
    on a node that is short of it.
    """

    # TODO: (clarng) expose the error message string here and format it with proto
    def __init__(self, message):
        """Keep ``message``; it is returned verbatim by ``str()``."""
        self.message = message

    def __str__(self):
        text = self.message
        return text
@PublicAPI
class NodeDiedError(RayError):
    """Raised when a node is dead or cannot be reached."""

    # TODO: (clarng) expose the error message string here and format it with proto
    def __init__(self, message):
        """Keep ``message``; it is returned verbatim by ``str()``."""
        self.message = message

    def __str__(self):
        text = self.message
        return text
@PublicAPI
class ObjectLostError(RayError):
    """Indicates that the object is lost from distributed memory, due to
    node failure or system error.

    Args:
        object_ref_hex: Hex ID of the object.
        owner_address: Stored as-is on the instance.
        call_site: Call-site text; Ray's call-stack line delimiter is
            replaced by a newline plus indentation before it is stored.
    """

    def __init__(self, object_ref_hex, owner_address, call_site):
        self.object_ref_hex = object_ref_hex
        self.owner_address = owner_address
        delimiter = ray_constants.CALL_STACK_LINE_DELIMITER
        self.call_site = call_site.replace(delimiter, "\n ")

    def _base_str(self):
        """Return the message prefix shared by this class and its subclasses."""
        prefix = f"Failed to retrieve object {self.object_ref_hex}. "
        if self.call_site:
            return prefix + f"The ObjectRef was created at: {self.call_site}"
        # No call site was recorded: explain how to enable recording.
        return prefix + (
            "To see information about where this ObjectRef was created "
            "in Python, set the environment variable "
            "RAY_record_ref_creation_sites=1 during `ray start` and "
            "`ray.init()`."
        )

    def __str__(self):
        explanation = (
            f"All copies of {self.object_ref_hex} have been lost due to node "
            "failure. Check cluster logs (`/tmp/ray/session_latest/logs`) for "
            "more information about the failure."
        )
        return "\n\n".join([self._base_str(), explanation])
@PublicAPI
class ObjectFetchTimedOutError(ObjectLostError):
    """Raised when fetching an object timed out.

    Args:
        object_ref_hex: Hex ID of the object.
    """

    def __str__(self):
        """Return the shared prefix followed by the timeout explanation."""
        explanation = (
            f"Fetch for object {self.object_ref_hex} timed out because no "
            "locations were found for the object. This may indicate a "
            "system-level bug."
        )
        return "\n\n".join([self._base_str(), explanation])
@DeveloperAPI
class RpcError(RayError):
    """Raised for an error in the underlying RPC system.

    Args:
        message: Text returned by ``str()``.
        rpc_code: Optional RPC status code, stored on the instance.
    """

    def __init__(self, message, rpc_code=None):
        self.rpc_code = rpc_code
        self.message = message

    def __str__(self):
        text = self.message
        return text
@DeveloperAPI
class ReferenceCountingAssertionError(ObjectLostError, AssertionError):
    """Raised when an object was deleted although a reference to it
    still existed.

    Args:
        object_ref_hex: Hex ID of the object.
    """

    def __str__(self):
        """Return the shared prefix followed by the ref-counting explanation."""
        explanation = (
            "The object has already been deleted by the reference counting "
            "protocol. This should not happen."
        )
        return "\n\n".join([self._base_str(), explanation])
@DeveloperAPI
class ObjectFreedError(ObjectLostError):
    """Raised when the application manually freed the object.

    Attributes:
        object_ref_hex: Hex ID of the object.
    """

    def __str__(self):
        """Return the shared prefix followed by the manual-free explanation."""
        explanation = (
            "The object was manually freed using the internal `free` call. "
            "Please ensure that `free` is only called once the object is no "
            "longer needed."
        )
        return "\n\n".join([self._base_str(), explanation])
@PublicAPI
class OwnerDiedError(ObjectLostError):
    """Raised when the object's owner died while a reference to the
    object still existed.

    Args:
        object_ref_hex: Hex ID of the object.
    """

    def _log_location(self):
        """Return the log location to mention, as specific as can be derived."""
        default_location = "`/tmp/ray/session_latest/logs`"
        if not self.owner_address:
            return default_location
        try:
            owner = Address()
            owner.ParseFromString(self.owner_address)
            owner_ip = owner.ip_address
            owner_worker_id = WorkerID(owner.worker_id)
            return (
                f"`/tmp/ray/session_latest/logs/*{owner_worker_id.hex()}*`"
                f" at IP address {owner_ip}"
            )
        except Exception:
            # Deliberately best-effort: any failure while decoding the owner
            # address falls back to the default location so that a message
            # is always produced.
            return default_location

    def __str__(self):
        log_loc = self._log_location()
        explanation = (
            "The object's owner has exited. This is the Python "
            "worker that first created the ObjectRef via `.remote()` or "
            "`ray.put()`. "
            f"Check cluster logs ({log_loc}) for more "
            "information about the Python worker failure."
        )
        return "\n\n".join([self._base_str(), explanation])
@PublicAPI
class ObjectReconstructionFailedError(ObjectLostError):
    """Raised when an object cannot be reconstructed."""

    # Default human-readable explanation for each reconstruction-failure reason.
    REASON_MESSAGES = {
        ErrorType.OBJECT_UNRECONSTRUCTABLE_MAX_ATTEMPTS_EXCEEDED: (
            "The object cannot be reconstructed because the maximum number of "
            "task retries has been exceeded. "
            "Consider increasing the number of retries using `@ray.remote(max_retries=N)`."
        ),
        ErrorType.OBJECT_UNRECONSTRUCTABLE_LINEAGE_EVICTED: (
            "The object cannot be reconstructed because its lineage has been "
            "evicted to reduce memory pressure. "
            "To prevent this error, set the environment variable "
            "RAY_max_lineage_bytes=<bytes> (default 1GB) during `ray start`."
        ),
        ErrorType.OBJECT_UNRECONSTRUCTABLE_PUT: (
            "The object cannot be reconstructed because it was created by "
            "ray.put(), which has no task lineage. "
            "To prevent this error, return the value from a task instead."
        ),
        ErrorType.OBJECT_UNRECONSTRUCTABLE_RETRIES_DISABLED: (
            "The object cannot be reconstructed because the task was created "
            "with max_retries=0. "
            "Consider enabling retries using `@ray.remote(max_retries=N)`."
        ),
        ErrorType.OBJECT_UNRECONSTRUCTABLE_BORROWED: (
            "The object cannot be reconstructed because it crossed an ownership "
            "boundary. Only the owner of an object can trigger reconstruction, "
            "but this worker borrowed the object from another worker."
        ),
        ErrorType.OBJECT_UNRECONSTRUCTABLE_LOCAL_MODE: (
            "The object cannot be reconstructed because Ray is running in "
            "local mode. Local mode does not support object reconstruction."
        ),
        ErrorType.OBJECT_UNRECONSTRUCTABLE_REF_NOT_FOUND: (
            "The object cannot be reconstructed because its reference was "
            "not found in the reference counter. "
            "Please file an issue at https://github.com/ray-project/ray/issues."
        ),
        ErrorType.OBJECT_UNRECONSTRUCTABLE_TASK_CANCELLED: (
            "The object cannot be reconstructed because the task that would "
            "produce it was cancelled."
        ),
        ErrorType.OBJECT_UNRECONSTRUCTABLE_LINEAGE_DISABLED: (
            "The object cannot be reconstructed because lineage reconstruction "
            "is disabled system-wide (object_reconstruction_enabled=False)."
        ),
    }

    def __init__(
        self,
        object_ref_hex: str,
        reason: "ErrorType" = None,
        reason_message: str = None,
        owner_address: Optional[Address] = None,
        call_site: str = "",
    ):
        """Initialize ObjectReconstructionFailedError.

        Args:
            object_ref_hex: Hex string of the object reference.
            reason: ErrorType enum value indicating why reconstruction failed.
            reason_message: Human-readable explanation of the failure. When
                empty, the entry for ``reason`` in ``REASON_MESSAGES`` (or a
                generic fallback) is used.
            owner_address: Address of the object's owner.
            call_site: Call site where the object was created.
        """
        super().__init__(object_ref_hex, owner_address, call_site)
        self.reason = reason
        if reason_message:
            self.reason_message = reason_message
        else:
            self.reason_message = self.REASON_MESSAGES.get(
                self.reason,
                "Unknown error reason. This should not happen, please file an issue "
                "at https://github.com/ray-project/ray/issues.",
            )

    def __str__(self):
        """Return the shared prefix plus ``[REASON] explanation`` when set."""
        prefix = self._base_str()
        if not self.reason_message:
            return prefix
        if self.reason:
            label = ErrorType.Name(self.reason)
        else:
            label = "UNKNOWN"
        return prefix + f"\n\n[{label}] {self.reason_message}"
@PublicAPI
class GetTimeoutError(RayError, TimeoutError):
    """Raised when a call to the worker timed out.

    Also a ``TimeoutError``, so it can be caught as either type.
    """
@PublicAPI
class PlasmaObjectNotAvailable(RayError):
    """Raised when an object was not available within the given timeout."""
@PublicAPI
class AsyncioActorExit(RayError):
    """Raised when an asyncio actor exits on purpose through exit_actor()."""
@PublicAPI
class RuntimeEnvSetupError(RayError):
    """Raised when setting up a runtime environment fails.

    Args:
        error_message: The error message that explains
            why runtime env setup has failed.
    """

    def __init__(self, error_message: str = None):
        self.error_message = error_message

    def __str__(self):
        """Return the fixed headline, plus the detail on a second line if set."""
        headline = "Failed to set up runtime environment."
        if not self.error_message:
            return headline
        return "\n".join([headline, self.error_message])
@PublicAPI
class TaskPlacementGroupRemoved(RayError):
    """Raised when the placement group of a task was removed."""

    def __str__(self):
        message = "The placement group corresponding to this task has been removed."
        return message
@PublicAPI
class ActorPlacementGroupRemoved(RayError):
    """Raised when the placement group of an actor was removed."""

    def __str__(self):
        message = "The placement group corresponding to this Actor has been removed."
        return message
@PublicAPI
class PendingCallsLimitExceeded(RayError):
    """Raised when the number of pending actor calls exceeds the
    `max_pending_calls` option.

    This probably happens because the caller calls the callee too
    frequently.
    """
@PublicAPI
class TaskUnschedulableError(RayError):
    """Raised when a task cannot be scheduled.

    For example, the node specified through
    NodeAffinitySchedulingStrategy is dead.
    """

    def __init__(self, error_message: str):
        self.error_message = error_message

    def __str__(self):
        """Return the fixed prefix followed by the stored reason."""
        return "The task is not schedulable: {}".format(self.error_message)
@PublicAPI
class ActorUnschedulableError(RayError):
    """Raised when an actor cannot be scheduled.

    For example, the node specified through
    NodeAffinitySchedulingStrategy is dead.
    """

    def __init__(self, error_message: str):
        self.error_message = error_message

    def __str__(self):
        """Return the fixed prefix followed by the stored reason."""
        return "The actor is not schedulable: {}".format(self.error_message)
@DeveloperAPI
class ObjectRefStreamEndOfStreamError(RayError):
    """Raised by streaming generator tasks once no more ObjectRefs are
    left to read.
    """
@DeveloperAPI
class OufOfBandObjectRefSerializationException(RayError):
    """Raised when a `ray.ObjectRef` is serialized out of band by
    `ray.cloudpickle`, which is an anti-pattern.
    """
@PublicAPI(stability="alpha")
class RayChannelError(RaySystemError):
    """Raised when Ray hits a system error related to
    ray.experimental.channel.
    """
@PublicAPI(stability="alpha")
class RayChannelTimeoutError(RayChannelError, TimeoutError):
    """Raised when a Compiled Graph channel operation times out.

    Also a ``TimeoutError``, so it can be caught as either type.
    """
@PublicAPI(stability="alpha")
class RayCgraphCapacityExceeded(RaySystemError):
    """Raised when the buffer of a Compiled Graph channel is at max capacity."""
@PublicAPI(stability="alpha")
class RayDirectTransportError(RaySystemError):
    """Raised when a Ray direct transport transfer runs into an error."""
@PublicAPI(stability="alpha")
class UnserializableException(RayError):
    """Raised when a serialized exception cannot be deserialized.

    When unpickling a previously serialized exception fails, Ray falls back
    to raising this error, which carries the string representation of the
    original exception and the stack trace captured at serialization time.

    For more details and how to handle this with custom serializers, :ref:`configuring custom exception serializers <custom-exception-serializer>`

    Args:
        original_stack_trace: The string representation and stack trace of the
            original exception that was captured during serialization.
    """

    def __init__(self, original_stack_trace: str):
        self._original_stack_trace = original_stack_trace

    def __str__(self):
        """Return the fixed notice followed by the original stack trace."""
        notice = "Failed to deserialize exception. Refer to https://docs.ray.io/en/latest/ray-core/objects/serialization.html#custom-serializers-for-exceptions for more information.\n"
        header = "Original exception:\n"
        return notice + header + f"{self._original_stack_trace}"
@DeveloperAPI
class ActorAlreadyExistsError(ValueError, RayError):
    """Raised when a named actor already exists.

    For backward compatibility this error subclasses ValueError in addition
    to RayError.

    Args:
        error_message: The error message that contains information about the actor name and namespace.
    """

    def __init__(self, error_message: str):
        self.error_message = error_message
        super().__init__(error_message)

    def __str__(self):
        text = self.error_message
        return text
@DeveloperAPI
class ActorHandleNotFoundError(ValueError, RayError):
    """Raised when trying to kill an actor handle that doesn't exist.

    This typically happens when an actor handle from a previous Ray session
    is used after calling ray.shutdown() and ray.init().

    For backward compatibility this error subclasses ValueError in addition
    to RayError.

    Args:
        error_message: The error message that contains information about the actor handle.
    """

    def __init__(self, error_message: str):
        self.error_message = error_message
        super().__init__(error_message)

    def __str__(self):
        text = self.error_message
        return text
# Exception classes collected in one list; the element order is kept as-is.
RAY_EXCEPTION_TYPES = [
    # Generic and task-level errors.
    PlasmaObjectNotAvailable,
    RayError,
    RayTaskError,
    WorkerCrashedError,
    RayActorError,
    # Object store and object lifetime errors.
    ObjectStoreFullError,
    ObjectLostError,
    ObjectFetchTimedOutError,
    ReferenceCountingAssertionError,
    ObjectReconstructionFailedError,
    OwnerDiedError,
    GetTimeoutError,
    AsyncioActorExit,
    RuntimeEnvSetupError,
    # Scheduling and placement errors.
    TaskPlacementGroupRemoved,
    ActorPlacementGroupRemoved,
    PendingCallsLimitExceeded,
    LocalRayletDiedError,
    TaskUnschedulableError,
    ActorDiedError,
    ActorUnschedulableError,
    ActorUnavailableError,
    # Channel / compiled graph errors.
    RayChannelError,
    RayChannelTimeoutError,
    OufOfBandObjectRefSerializationException,
    RayCgraphCapacityExceeded,
    # Serialization, naming and authentication errors.
    UnserializableException,
    ActorAlreadyExistsError,
    ActorHandleNotFoundError,
    AuthenticationError,
]
|