| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656 |
- import logging
- import threading
- from typing import Any, Dict, List, Optional
- import ray._private.worker
- from ray._private.client_mode_hook import client_mode_hook
- from ray._private.state import actors
- from ray._private.utils import parse_pg_formatted_resources_to_original
- from ray._raylet import TaskID
- from ray.runtime_env import RuntimeEnv
- from ray.util.annotations import Deprecated, PublicAPI
- logger = logging.getLogger(__name__)
- @PublicAPI
- class RuntimeContext(object):
- """A class used for getting runtime context."""
- def __init__(self, worker):
- assert worker is not None
- self.worker = worker
- @Deprecated(
- message="Use get_xxx_id() methods to get relevant ids instead", warning=True
- )
- def get(self) -> Dict[str, Any]:
- """Get a dictionary of the current context.
- Returns:
- dict: Dictionary of the current context.
- """
- context = {
- "job_id": self.job_id,
- "node_id": self.node_id,
- "namespace": self.namespace,
- }
- if self.worker.mode == ray._private.worker.WORKER_MODE:
- if self.task_id is not None:
- context["task_id"] = self.task_id
- if self.actor_id is not None:
- context["actor_id"] = self.actor_id
- return context
- @property
- @Deprecated(message="Use get_job_id() instead", warning=True)
- def job_id(self):
- """Get current job ID for this worker or driver.
- Job ID is the id of your Ray drivers that create tasks or actors.
- Returns:
- If called by a driver, this returns the job ID. If called in
- a task, return the job ID of the associated driver.
- """
- job_id = self.worker.current_job_id
- assert not job_id.is_nil()
- return job_id
- def get_job_id(self) -> str:
- """Get current job ID for this worker or driver.
- Job ID is the id of your Ray drivers that create tasks or actors.
- Returns:
- If called by a driver, this returns the job ID. If called in
- a task, return the job ID of the associated driver. The
- job ID will be hex format.
- Raises:
- RuntimeError: If Ray has not been initialized.
- """
- if not ray.is_initialized():
- raise RuntimeError(
- "Job ID is not available because Ray has not been initialized."
- )
- job_id = self.worker.current_job_id
- return job_id.hex()
- @property
- @Deprecated(message="Use get_node_id() instead", warning=True)
- def node_id(self):
- """Get the ID for the node that this process is running on.
- This can be called from within a driver, task, or actor.
- When called from a driver that is connected to a remote Ray cluster using
- Ray Client, this returns the ID of the head node.
- Returns:
- A node id for this worker or driver.
- """
- node_id = self.worker.current_node_id
- assert not node_id.is_nil()
- return node_id
- def get_node_id(self) -> str:
- """Get the ID for the node that this process is running on.
- This can be called from within a driver, task, or actor.
- When called from a driver that is connected to a remote Ray cluster using
- Ray Client, this returns the ID of the head node.
- Returns:
- A node id in hex format for this worker or driver.
- Raises:
- RuntimeError: If Ray has not been initialized.
- """
- if not ray.is_initialized():
- raise RuntimeError(
- "Node ID is not available because Ray has not been initialized."
- )
- node_id = self.worker.current_node_id
- return node_id.hex()
- def get_session_name(self) -> str:
- """Get the session name for the Ray cluster this process is connected to.
- The session name uniquely identifies a Ray cluster instance. This is the
- same value that appears as the ``SessionName`` label in Ray metrics,
- making it useful for filtering metrics when multiple clusters run the same
- application name.
- This can be called from within a driver, task, or actor.
- Example:
- .. testcode::
- import ray
- ray.init()
- session_name = ray.get_runtime_context().get_session_name()
- print(f"Session Name: {session_name}")
- @ray.remote
- def get_session_name():
- return ray.get_runtime_context().get_session_name()
- # Session name is the same across all processes in the cluster
- assert ray.get(get_session_name.remote()) == session_name
- # Use SessionName label to filter metrics by cluster, e.g.:
- # ray_serve_http_request_latency_ms_bucket{SessionName="<session_name>"}
- Returns:
- A session name string for the Ray cluster (e.g.,
- "session_2025-01-01_00-00-00_000000_1234").
- Raises:
- RuntimeError: If Ray has not been initialized.
- """
- if not ray.is_initialized():
- raise RuntimeError(
- "Session name is not available because Ray has not been initialized."
- )
- return self.worker.node.session_name
- def get_worker_id(self) -> str:
- """Get current worker ID for this worker or driver process.
- Returns:
- A worker id in hex format for this worker or driver process.
- Raises:
- RuntimeError: If Ray has not been initialized.
- """
- if not ray.is_initialized():
- raise RuntimeError(
- "Worker ID is not available because Ray has not been initialized."
- )
- return self.worker.worker_id.hex()
- @property
- @Deprecated(message="Use get_task_id() instead", warning=True)
- def task_id(self):
- """Get current task ID for this worker.
- Task ID is the id of a Ray task.
- This shouldn't be used in a driver process.
- Example:
- .. testcode::
- import ray
- @ray.remote
- class Actor:
- def ready(self):
- return True
- @ray.remote
- def f():
- return True
- # All the below code generates different task ids.
- # Task ids are available for actor creation.
- a = Actor.remote()
- # Task ids are available for actor tasks.
- a.ready.remote()
- # Task ids are available for normal tasks.
- f.remote()
- Returns:
- The current worker's task id. None if there's no task id.
- """
- # only worker mode has task_id
- assert (
- self.worker.mode == ray._private.worker.WORKER_MODE
- ), f"This method is only available when the process is a\
- worker. Current mode: {self.worker.mode}"
- task_id = self._get_current_task_id()
- return task_id if not task_id.is_nil() else None
- def get_task_id(self) -> Optional[str]:
- """Get current task ID for this worker.
- Task ID is the id of a Ray task. The ID will be in hex format.
- This shouldn't be used in a driver process.
- Example:
- .. testcode::
- import ray
- @ray.remote
- class Actor:
- def get_task_id(self):
- return ray.get_runtime_context().get_task_id()
- @ray.remote
- def get_task_id():
- return ray.get_runtime_context().get_task_id()
- # All the below code generates different task ids.
- a = Actor.remote()
- # Task ids are available for actor tasks.
- print(ray.get(a.get_task_id.remote()))
- # Task ids are available for normal tasks.
- print(ray.get(get_task_id.remote()))
- .. testoutput::
- :options: +MOCK
- 16310a0f0a45af5c2746a0e6efb235c0962896a201000000
- c2668a65bda616c1ffffffffffffffffffffffff01000000
- Returns:
- The current worker's task id in hex. None if there's no task id.
- """
- # only worker mode has task_id
- if self.worker.mode != ray._private.worker.WORKER_MODE:
- logger.warning(
- "This method is only available when the process is a "
- f"worker. Current mode: {self.worker.mode}"
- )
- return None
- task_id = self._get_current_task_id()
- return task_id.hex() if not task_id.is_nil() else None
- def _get_current_task_id(self) -> TaskID:
- return self.worker.current_task_id
- def get_task_name(self) -> Optional[str]:
- """Get current task name for this worker.
- Task name by default is the task's function call string. It can also be
- specified in options when triggering a task.
- Example:
- .. testcode::
- import ray
- @ray.remote
- class Actor:
- def get_task_name(self):
- return ray.get_runtime_context().get_task_name()
- @ray.remote
- class AsyncActor:
- async def get_task_name(self):
- return ray.get_runtime_context().get_task_name()
- @ray.remote
- def get_task_name():
- return ray.get_runtime_context().get_task_name()
- a = Actor.remote()
- b = AsyncActor.remote()
- # Task names are available for actor tasks.
- print(ray.get(a.get_task_name.remote()))
- # Task names are available for async actor tasks.
- print(ray.get(b.get_task_name.remote()))
- # Task names are available for normal tasks.
- # Get default task name
- print(ray.get(get_task_name.remote()))
- # Get specified task name
- print(ray.get(get_task_name.options(name="task_name").remote()))
- .. testoutput::
- :options: +MOCK
- Actor.get_task_name
- AsyncActor.get_task_name
- get_task_name
- task_name
- Returns:
- The current worker's task name
- """
- # only worker mode has task_name
- if self.worker.mode != ray._private.worker.WORKER_MODE:
- logger.warning(
- "This method is only available when the process is a "
- f"worker. Current mode: {self.worker.mode}"
- )
- return None
- return self.worker.current_task_name
- def get_task_function_name(self) -> Optional[str]:
- """Get current task function name string for this worker.
- Example:
- .. testcode::
- import ray
- @ray.remote
- class Actor:
- def get_task_function_name(self):
- return ray.get_runtime_context().get_task_function_name()
- @ray.remote
- class AsyncActor:
- async def get_task_function_name(self):
- return ray.get_runtime_context().get_task_function_name()
- @ray.remote
- def get_task_function_name():
- return ray.get_runtime_context().get_task_function_name()
- a = Actor.remote()
- b = AsyncActor.remote()
- # Task functions are available for actor tasks.
- print(ray.get(a.get_task_function_name.remote()))
- # Task functions are available for async actor tasks.
- print(ray.get(b.get_task_function_name.remote()))
- # Task functions are available for normal tasks.
- print(ray.get(get_task_function_name.remote()))
- .. testoutput::
- :options: +MOCK
- [python module name].Actor.get_task_function_name
- [python module name].AsyncActor.get_task_function_name
- [python module name].get_task_function_name
- Returns:
- The current worker's task function call string
- """
- # only worker mode has task_function_name
- if self.worker.mode != ray._private.worker.WORKER_MODE:
- logger.warning(
- "This method is only available when the process is a "
- f"worker. Current mode: {self.worker.mode}"
- )
- return None
- return self.worker.current_task_function_name
- @property
- @Deprecated(message="Use get_actor_id() instead", warning=True)
- def actor_id(self):
- """Get the current actor ID in this worker.
- ID of the actor of the current process.
- This shouldn't be used in a driver process.
- Returns:
- The current actor id in this worker. None if there's no actor id.
- """
- # only worker mode has actor_id
- assert (
- self.worker.mode == ray._private.worker.WORKER_MODE
- ), f"This method is only available when the process is a\
- worker. Current mode: {self.worker.mode}"
- actor_id = self.worker.actor_id
- return actor_id if not actor_id.is_nil() else None
- def get_actor_id(self) -> Optional[str]:
- """Get the current actor ID in this worker.
- ID of the actor of the current process.
- This shouldn't be used in a driver process.
- The ID will be in hex format.
- Returns:
- The current actor id in hex format in this worker. None if there's no
- actor id.
- """
- # only worker mode has actor_id
- if self.worker.mode != ray._private.worker.WORKER_MODE:
- logger.debug(
- "This method is only available when the process is a "
- f"worker. Current mode: {self.worker.mode}"
- )
- return None
- actor_id = self.worker.actor_id
- return actor_id.hex() if not actor_id.is_nil() else None
- def get_actor_name(self) -> Optional[str]:
- """Get the current actor name of this worker.
- This shouldn't be used in a driver process.
- The name is in string format.
- Returns:
- The current actor name of this worker.
- If a current worker is an actor, and
- if actor name doesn't exist, it returns an empty string.
- If a current worker is not an actor, it returns None.
- """
- # only worker mode has actor_id
- if self.worker.mode != ray._private.worker.WORKER_MODE:
- logger.warning(
- "This method is only available when the process is a "
- f"worker. Current mode: {self.worker.mode}"
- )
- return None
- actor_id = self.worker.actor_id
- return self.worker.actor_name if not actor_id.is_nil() else None
- @property
- def namespace(self):
- """Get the current namespace of this worker.
- Returns:
- The current namespace of this worker.
- """
- return self.worker.namespace
- @property
- def was_current_actor_reconstructed(self):
- """Check whether this actor has been restarted.
- Returns:
- Whether this actor has been ever restarted.
- """
- assert (
- not self.actor_id.is_nil()
- ), "This method should't be called inside Ray tasks."
- actor_info = actors(actor_id=self.actor_id.hex())
- return actor_info and actor_info["NumRestarts"] != 0
- @property
- @Deprecated(message="Use get_placement_group_id() instead", warning=True)
- def current_placement_group_id(self):
- """Get the current Placement group ID of this worker.
- Returns:
- The current placement group id of this worker.
- """
- return self.worker.placement_group_id
- def get_placement_group_id(self) -> Optional[str]:
- """Get the current Placement group ID of this worker.
- Returns:
- The current placement group id in hex format of this worker.
- """
- pg_id = self.worker.placement_group_id
- return pg_id.hex() if not pg_id.is_nil() else None
- @property
- def should_capture_child_tasks_in_placement_group(self):
- """Get if the current task should capture parent's placement group.
- This returns True if it is called inside a driver.
- Returns:
- Return True if the current task should implicitly
- capture the parent placement group.
- """
- return self.worker.should_capture_child_tasks_in_placement_group
- def get_assigned_resources(self):
- """Get the assigned resources to this worker.
- By default for tasks, this will return {"CPU": 1}.
- By default for actors, this will return {}. This is because
- actors do not have CPUs assigned to them by default.
- Returns:
- A dictionary mapping the name of a resource to a float, where
- the float represents the amount of that resource reserved
- for this worker.
- """
- assert (
- self.worker.mode == ray._private.worker.WORKER_MODE
- ), f"This method is only available when the process is a\
- worker. Current mode: {self.worker.mode}"
- self.worker.check_connected()
- resource_id_map = self.worker.core_worker.resource_ids()
- resource_map = {
- res: sum(amt for _, amt in mapping)
- for res, mapping in resource_id_map.items()
- }
- result = parse_pg_formatted_resources_to_original(resource_map)
- return result
- def get_runtime_env_string(self):
- """Get the runtime env string used for the current driver or worker.
- Returns:
- The runtime env string currently using by this worker.
- """
- return self.worker.runtime_env
- @property
- def runtime_env(self):
- """Get the runtime env used for the current driver or worker.
- Returns:
- The runtime env currently using by this worker. The type of
- return value is ray.runtime_env.RuntimeEnv.
- """
- return RuntimeEnv.deserialize(self.get_runtime_env_string())
- @property
- def current_actor(self):
- """Get the current actor handle of this actor itself.
- Returns:
- The handle of current actor.
- """
- worker = self.worker
- worker.check_connected()
- actor_id = worker.actor_id
- if actor_id.is_nil():
- raise RuntimeError("This method is only available in an actor.")
- return worker.core_worker.get_actor_handle(actor_id)
- @property
- def gcs_address(self):
- """Get the GCS address of the ray cluster.
- Returns:
- The GCS address of the cluster.
- """
- self.worker.check_connected()
- return self.worker.gcs_client.address
- @Deprecated(message="Use get_accelerator_ids() instead", warning=True)
- def get_resource_ids(self) -> Dict[str, List[str]]:
- return self.get_accelerator_ids()
- def get_accelerator_ids(self) -> Dict[str, List[str]]:
- """
- Get the current worker's visible accelerator ids.
- Returns:
- A dictionary keyed by the accelerator resource name. The values are a list
- of ids `{'GPU': ['0', '1'], 'neuron_cores': ['0', '1'],
- 'TPU': ['0', '1']}`.
- """
- worker = self.worker
- worker.check_connected()
- ids_dict: Dict[str, List[str]] = {}
- for (
- accelerator_resource_name
- ) in ray._private.accelerators.get_all_accelerator_resource_names():
- accelerator_ids = worker.get_accelerator_ids_for_accelerator_resource(
- accelerator_resource_name,
- f"^{accelerator_resource_name}_group_[0-9A-Za-z]+$",
- )
- ids_dict[accelerator_resource_name] = [str(id) for id in accelerator_ids]
- return ids_dict
- def get_node_labels(self) -> Dict[str, List[str]]:
- """
- Get the node labels of the current worker.
- Returns:
- A dictionary of label key-value pairs.
- """
- worker = self.worker
- worker.check_connected()
- return worker.current_node_labels
- def is_canceled(self) -> bool:
- """Check if the current task has been canceled.
- This can be used to periodically check if ray.cancel() has been
- called on the current task and perform graceful cleanup.
- Returns:
- True if the task has been canceled, False otherwise.
- Raises:
- RuntimeError: If called from a driver or async actor context.
- """
- if self.worker.mode != ray._private.worker.WORKER_MODE:
- raise RuntimeError(
- "This method is only available when the process is a worker. "
- f"Current mode: {self.worker.mode}"
- )
- if self.worker.core_worker.current_actor_is_asyncio():
- raise RuntimeError("This method is not supported in an async actor.")
- return self.worker.is_canceled
- _runtime_context = None
- _runtime_context_lock = threading.Lock()
- @PublicAPI
- @client_mode_hook
- def get_runtime_context() -> RuntimeContext:
- """Get the runtime context of the current driver/worker.
- The obtained runtime context can be used to get the metadata
- of the current driver, task, or actor.
- Example:
- .. testcode::
- import ray
- # Get the job id.
- ray.get_runtime_context().get_job_id()
- # Get the session name (used as SessionName label in Ray metrics).
- ray.get_runtime_context().get_session_name()
- # Get the actor id.
- ray.get_runtime_context().get_actor_id()
- # Get the task id.
- ray.get_runtime_context().get_task_id()
- """
- with _runtime_context_lock:
- global _runtime_context
- if _runtime_context is None:
- _runtime_context = RuntimeContext(ray._private.worker.global_worker)
- return _runtime_context
|