runtime_context.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656
  1. import logging
  2. import threading
  3. from typing import Any, Dict, List, Optional
  4. import ray._private.worker
  5. from ray._private.client_mode_hook import client_mode_hook
  6. from ray._private.state import actors
  7. from ray._private.utils import parse_pg_formatted_resources_to_original
  8. from ray._raylet import TaskID
  9. from ray.runtime_env import RuntimeEnv
  10. from ray.util.annotations import Deprecated, PublicAPI
# Module-level logger; the RuntimeContext accessors below use it to report
# calls made from a process mode where the requested value is unavailable.
logger = logging.getLogger(__name__)
  12. @PublicAPI
  13. class RuntimeContext(object):
  14. """A class used for getting runtime context."""
  15. def __init__(self, worker):
  16. assert worker is not None
  17. self.worker = worker
  18. @Deprecated(
  19. message="Use get_xxx_id() methods to get relevant ids instead", warning=True
  20. )
  21. def get(self) -> Dict[str, Any]:
  22. """Get a dictionary of the current context.
  23. Returns:
  24. dict: Dictionary of the current context.
  25. """
  26. context = {
  27. "job_id": self.job_id,
  28. "node_id": self.node_id,
  29. "namespace": self.namespace,
  30. }
  31. if self.worker.mode == ray._private.worker.WORKER_MODE:
  32. if self.task_id is not None:
  33. context["task_id"] = self.task_id
  34. if self.actor_id is not None:
  35. context["actor_id"] = self.actor_id
  36. return context
  37. @property
  38. @Deprecated(message="Use get_job_id() instead", warning=True)
  39. def job_id(self):
  40. """Get current job ID for this worker or driver.
  41. Job ID is the id of your Ray drivers that create tasks or actors.
  42. Returns:
  43. If called by a driver, this returns the job ID. If called in
  44. a task, return the job ID of the associated driver.
  45. """
  46. job_id = self.worker.current_job_id
  47. assert not job_id.is_nil()
  48. return job_id
  49. def get_job_id(self) -> str:
  50. """Get current job ID for this worker or driver.
  51. Job ID is the id of your Ray drivers that create tasks or actors.
  52. Returns:
  53. If called by a driver, this returns the job ID. If called in
  54. a task, return the job ID of the associated driver. The
  55. job ID will be hex format.
  56. Raises:
  57. RuntimeError: If Ray has not been initialized.
  58. """
  59. if not ray.is_initialized():
  60. raise RuntimeError(
  61. "Job ID is not available because Ray has not been initialized."
  62. )
  63. job_id = self.worker.current_job_id
  64. return job_id.hex()
  65. @property
  66. @Deprecated(message="Use get_node_id() instead", warning=True)
  67. def node_id(self):
  68. """Get the ID for the node that this process is running on.
  69. This can be called from within a driver, task, or actor.
  70. When called from a driver that is connected to a remote Ray cluster using
  71. Ray Client, this returns the ID of the head node.
  72. Returns:
  73. A node id for this worker or driver.
  74. """
  75. node_id = self.worker.current_node_id
  76. assert not node_id.is_nil()
  77. return node_id
  78. def get_node_id(self) -> str:
  79. """Get the ID for the node that this process is running on.
  80. This can be called from within a driver, task, or actor.
  81. When called from a driver that is connected to a remote Ray cluster using
  82. Ray Client, this returns the ID of the head node.
  83. Returns:
  84. A node id in hex format for this worker or driver.
  85. Raises:
  86. RuntimeError: If Ray has not been initialized.
  87. """
  88. if not ray.is_initialized():
  89. raise RuntimeError(
  90. "Node ID is not available because Ray has not been initialized."
  91. )
  92. node_id = self.worker.current_node_id
  93. return node_id.hex()
  94. def get_session_name(self) -> str:
  95. """Get the session name for the Ray cluster this process is connected to.
  96. The session name uniquely identifies a Ray cluster instance. This is the
  97. same value that appears as the ``SessionName`` label in Ray metrics,
  98. making it useful for filtering metrics when multiple clusters run the same
  99. application name.
  100. This can be called from within a driver, task, or actor.
  101. Example:
  102. .. testcode::
  103. import ray
  104. ray.init()
  105. session_name = ray.get_runtime_context().get_session_name()
  106. print(f"Session Name: {session_name}")
  107. @ray.remote
  108. def get_session_name():
  109. return ray.get_runtime_context().get_session_name()
  110. # Session name is the same across all processes in the cluster
  111. assert ray.get(get_session_name.remote()) == session_name
  112. # Use SessionName label to filter metrics by cluster, e.g.:
  113. # ray_serve_http_request_latency_ms_bucket{SessionName="<session_name>"}
  114. Returns:
  115. A session name string for the Ray cluster (e.g.,
  116. "session_2025-01-01_00-00-00_000000_1234").
  117. Raises:
  118. RuntimeError: If Ray has not been initialized.
  119. """
  120. if not ray.is_initialized():
  121. raise RuntimeError(
  122. "Session name is not available because Ray has not been initialized."
  123. )
  124. return self.worker.node.session_name
  125. def get_worker_id(self) -> str:
  126. """Get current worker ID for this worker or driver process.
  127. Returns:
  128. A worker id in hex format for this worker or driver process.
  129. Raises:
  130. RuntimeError: If Ray has not been initialized.
  131. """
  132. if not ray.is_initialized():
  133. raise RuntimeError(
  134. "Worker ID is not available because Ray has not been initialized."
  135. )
  136. return self.worker.worker_id.hex()
  137. @property
  138. @Deprecated(message="Use get_task_id() instead", warning=True)
  139. def task_id(self):
  140. """Get current task ID for this worker.
  141. Task ID is the id of a Ray task.
  142. This shouldn't be used in a driver process.
  143. Example:
  144. .. testcode::
  145. import ray
  146. @ray.remote
  147. class Actor:
  148. def ready(self):
  149. return True
  150. @ray.remote
  151. def f():
  152. return True
  153. # All the below code generates different task ids.
  154. # Task ids are available for actor creation.
  155. a = Actor.remote()
  156. # Task ids are available for actor tasks.
  157. a.ready.remote()
  158. # Task ids are available for normal tasks.
  159. f.remote()
  160. Returns:
  161. The current worker's task id. None if there's no task id.
  162. """
  163. # only worker mode has task_id
  164. assert (
  165. self.worker.mode == ray._private.worker.WORKER_MODE
  166. ), f"This method is only available when the process is a\
  167. worker. Current mode: {self.worker.mode}"
  168. task_id = self._get_current_task_id()
  169. return task_id if not task_id.is_nil() else None
  170. def get_task_id(self) -> Optional[str]:
  171. """Get current task ID for this worker.
  172. Task ID is the id of a Ray task. The ID will be in hex format.
  173. This shouldn't be used in a driver process.
  174. Example:
  175. .. testcode::
  176. import ray
  177. @ray.remote
  178. class Actor:
  179. def get_task_id(self):
  180. return ray.get_runtime_context().get_task_id()
  181. @ray.remote
  182. def get_task_id():
  183. return ray.get_runtime_context().get_task_id()
  184. # All the below code generates different task ids.
  185. a = Actor.remote()
  186. # Task ids are available for actor tasks.
  187. print(ray.get(a.get_task_id.remote()))
  188. # Task ids are available for normal tasks.
  189. print(ray.get(get_task_id.remote()))
  190. .. testoutput::
  191. :options: +MOCK
  192. 16310a0f0a45af5c2746a0e6efb235c0962896a201000000
  193. c2668a65bda616c1ffffffffffffffffffffffff01000000
  194. Returns:
  195. The current worker's task id in hex. None if there's no task id.
  196. """
  197. # only worker mode has task_id
  198. if self.worker.mode != ray._private.worker.WORKER_MODE:
  199. logger.warning(
  200. "This method is only available when the process is a "
  201. f"worker. Current mode: {self.worker.mode}"
  202. )
  203. return None
  204. task_id = self._get_current_task_id()
  205. return task_id.hex() if not task_id.is_nil() else None
  206. def _get_current_task_id(self) -> TaskID:
  207. return self.worker.current_task_id
  208. def get_task_name(self) -> Optional[str]:
  209. """Get current task name for this worker.
  210. Task name by default is the task's function call string. It can also be
  211. specified in options when triggering a task.
  212. Example:
  213. .. testcode::
  214. import ray
  215. @ray.remote
  216. class Actor:
  217. def get_task_name(self):
  218. return ray.get_runtime_context().get_task_name()
  219. @ray.remote
  220. class AsyncActor:
  221. async def get_task_name(self):
  222. return ray.get_runtime_context().get_task_name()
  223. @ray.remote
  224. def get_task_name():
  225. return ray.get_runtime_context().get_task_name()
  226. a = Actor.remote()
  227. b = AsyncActor.remote()
  228. # Task names are available for actor tasks.
  229. print(ray.get(a.get_task_name.remote()))
  230. # Task names are available for async actor tasks.
  231. print(ray.get(b.get_task_name.remote()))
  232. # Task names are available for normal tasks.
  233. # Get default task name
  234. print(ray.get(get_task_name.remote()))
  235. # Get specified task name
  236. print(ray.get(get_task_name.options(name="task_name").remote()))
  237. .. testoutput::
  238. :options: +MOCK
  239. Actor.get_task_name
  240. AsyncActor.get_task_name
  241. get_task_name
  242. task_name
  243. Returns:
  244. The current worker's task name
  245. """
  246. # only worker mode has task_name
  247. if self.worker.mode != ray._private.worker.WORKER_MODE:
  248. logger.warning(
  249. "This method is only available when the process is a "
  250. f"worker. Current mode: {self.worker.mode}"
  251. )
  252. return None
  253. return self.worker.current_task_name
  254. def get_task_function_name(self) -> Optional[str]:
  255. """Get current task function name string for this worker.
  256. Example:
  257. .. testcode::
  258. import ray
  259. @ray.remote
  260. class Actor:
  261. def get_task_function_name(self):
  262. return ray.get_runtime_context().get_task_function_name()
  263. @ray.remote
  264. class AsyncActor:
  265. async def get_task_function_name(self):
  266. return ray.get_runtime_context().get_task_function_name()
  267. @ray.remote
  268. def get_task_function_name():
  269. return ray.get_runtime_context().get_task_function_name()
  270. a = Actor.remote()
  271. b = AsyncActor.remote()
  272. # Task functions are available for actor tasks.
  273. print(ray.get(a.get_task_function_name.remote()))
  274. # Task functions are available for async actor tasks.
  275. print(ray.get(b.get_task_function_name.remote()))
  276. # Task functions are available for normal tasks.
  277. print(ray.get(get_task_function_name.remote()))
  278. .. testoutput::
  279. :options: +MOCK
  280. [python module name].Actor.get_task_function_name
  281. [python module name].AsyncActor.get_task_function_name
  282. [python module name].get_task_function_name
  283. Returns:
  284. The current worker's task function call string
  285. """
  286. # only worker mode has task_function_name
  287. if self.worker.mode != ray._private.worker.WORKER_MODE:
  288. logger.warning(
  289. "This method is only available when the process is a "
  290. f"worker. Current mode: {self.worker.mode}"
  291. )
  292. return None
  293. return self.worker.current_task_function_name
  294. @property
  295. @Deprecated(message="Use get_actor_id() instead", warning=True)
  296. def actor_id(self):
  297. """Get the current actor ID in this worker.
  298. ID of the actor of the current process.
  299. This shouldn't be used in a driver process.
  300. Returns:
  301. The current actor id in this worker. None if there's no actor id.
  302. """
  303. # only worker mode has actor_id
  304. assert (
  305. self.worker.mode == ray._private.worker.WORKER_MODE
  306. ), f"This method is only available when the process is a\
  307. worker. Current mode: {self.worker.mode}"
  308. actor_id = self.worker.actor_id
  309. return actor_id if not actor_id.is_nil() else None
  310. def get_actor_id(self) -> Optional[str]:
  311. """Get the current actor ID in this worker.
  312. ID of the actor of the current process.
  313. This shouldn't be used in a driver process.
  314. The ID will be in hex format.
  315. Returns:
  316. The current actor id in hex format in this worker. None if there's no
  317. actor id.
  318. """
  319. # only worker mode has actor_id
  320. if self.worker.mode != ray._private.worker.WORKER_MODE:
  321. logger.debug(
  322. "This method is only available when the process is a "
  323. f"worker. Current mode: {self.worker.mode}"
  324. )
  325. return None
  326. actor_id = self.worker.actor_id
  327. return actor_id.hex() if not actor_id.is_nil() else None
  328. def get_actor_name(self) -> Optional[str]:
  329. """Get the current actor name of this worker.
  330. This shouldn't be used in a driver process.
  331. The name is in string format.
  332. Returns:
  333. The current actor name of this worker.
  334. If a current worker is an actor, and
  335. if actor name doesn't exist, it returns an empty string.
  336. If a current worker is not an actor, it returns None.
  337. """
  338. # only worker mode has actor_id
  339. if self.worker.mode != ray._private.worker.WORKER_MODE:
  340. logger.warning(
  341. "This method is only available when the process is a "
  342. f"worker. Current mode: {self.worker.mode}"
  343. )
  344. return None
  345. actor_id = self.worker.actor_id
  346. return self.worker.actor_name if not actor_id.is_nil() else None
  347. @property
  348. def namespace(self):
  349. """Get the current namespace of this worker.
  350. Returns:
  351. The current namespace of this worker.
  352. """
  353. return self.worker.namespace
  354. @property
  355. def was_current_actor_reconstructed(self):
  356. """Check whether this actor has been restarted.
  357. Returns:
  358. Whether this actor has been ever restarted.
  359. """
  360. assert (
  361. not self.actor_id.is_nil()
  362. ), "This method should't be called inside Ray tasks."
  363. actor_info = actors(actor_id=self.actor_id.hex())
  364. return actor_info and actor_info["NumRestarts"] != 0
  365. @property
  366. @Deprecated(message="Use get_placement_group_id() instead", warning=True)
  367. def current_placement_group_id(self):
  368. """Get the current Placement group ID of this worker.
  369. Returns:
  370. The current placement group id of this worker.
  371. """
  372. return self.worker.placement_group_id
  373. def get_placement_group_id(self) -> Optional[str]:
  374. """Get the current Placement group ID of this worker.
  375. Returns:
  376. The current placement group id in hex format of this worker.
  377. """
  378. pg_id = self.worker.placement_group_id
  379. return pg_id.hex() if not pg_id.is_nil() else None
  380. @property
  381. def should_capture_child_tasks_in_placement_group(self):
  382. """Get if the current task should capture parent's placement group.
  383. This returns True if it is called inside a driver.
  384. Returns:
  385. Return True if the current task should implicitly
  386. capture the parent placement group.
  387. """
  388. return self.worker.should_capture_child_tasks_in_placement_group
  389. def get_assigned_resources(self):
  390. """Get the assigned resources to this worker.
  391. By default for tasks, this will return {"CPU": 1}.
  392. By default for actors, this will return {}. This is because
  393. actors do not have CPUs assigned to them by default.
  394. Returns:
  395. A dictionary mapping the name of a resource to a float, where
  396. the float represents the amount of that resource reserved
  397. for this worker.
  398. """
  399. assert (
  400. self.worker.mode == ray._private.worker.WORKER_MODE
  401. ), f"This method is only available when the process is a\
  402. worker. Current mode: {self.worker.mode}"
  403. self.worker.check_connected()
  404. resource_id_map = self.worker.core_worker.resource_ids()
  405. resource_map = {
  406. res: sum(amt for _, amt in mapping)
  407. for res, mapping in resource_id_map.items()
  408. }
  409. result = parse_pg_formatted_resources_to_original(resource_map)
  410. return result
  411. def get_runtime_env_string(self):
  412. """Get the runtime env string used for the current driver or worker.
  413. Returns:
  414. The runtime env string currently using by this worker.
  415. """
  416. return self.worker.runtime_env
  417. @property
  418. def runtime_env(self):
  419. """Get the runtime env used for the current driver or worker.
  420. Returns:
  421. The runtime env currently using by this worker. The type of
  422. return value is ray.runtime_env.RuntimeEnv.
  423. """
  424. return RuntimeEnv.deserialize(self.get_runtime_env_string())
  425. @property
  426. def current_actor(self):
  427. """Get the current actor handle of this actor itself.
  428. Returns:
  429. The handle of current actor.
  430. """
  431. worker = self.worker
  432. worker.check_connected()
  433. actor_id = worker.actor_id
  434. if actor_id.is_nil():
  435. raise RuntimeError("This method is only available in an actor.")
  436. return worker.core_worker.get_actor_handle(actor_id)
  437. @property
  438. def gcs_address(self):
  439. """Get the GCS address of the ray cluster.
  440. Returns:
  441. The GCS address of the cluster.
  442. """
  443. self.worker.check_connected()
  444. return self.worker.gcs_client.address
  445. @Deprecated(message="Use get_accelerator_ids() instead", warning=True)
  446. def get_resource_ids(self) -> Dict[str, List[str]]:
  447. return self.get_accelerator_ids()
  448. def get_accelerator_ids(self) -> Dict[str, List[str]]:
  449. """
  450. Get the current worker's visible accelerator ids.
  451. Returns:
  452. A dictionary keyed by the accelerator resource name. The values are a list
  453. of ids `{'GPU': ['0', '1'], 'neuron_cores': ['0', '1'],
  454. 'TPU': ['0', '1']}`.
  455. """
  456. worker = self.worker
  457. worker.check_connected()
  458. ids_dict: Dict[str, List[str]] = {}
  459. for (
  460. accelerator_resource_name
  461. ) in ray._private.accelerators.get_all_accelerator_resource_names():
  462. accelerator_ids = worker.get_accelerator_ids_for_accelerator_resource(
  463. accelerator_resource_name,
  464. f"^{accelerator_resource_name}_group_[0-9A-Za-z]+$",
  465. )
  466. ids_dict[accelerator_resource_name] = [str(id) for id in accelerator_ids]
  467. return ids_dict
  468. def get_node_labels(self) -> Dict[str, List[str]]:
  469. """
  470. Get the node labels of the current worker.
  471. Returns:
  472. A dictionary of label key-value pairs.
  473. """
  474. worker = self.worker
  475. worker.check_connected()
  476. return worker.current_node_labels
  477. def is_canceled(self) -> bool:
  478. """Check if the current task has been canceled.
  479. This can be used to periodically check if ray.cancel() has been
  480. called on the current task and perform graceful cleanup.
  481. Returns:
  482. True if the task has been canceled, False otherwise.
  483. Raises:
  484. RuntimeError: If called from a driver or async actor context.
  485. """
  486. if self.worker.mode != ray._private.worker.WORKER_MODE:
  487. raise RuntimeError(
  488. "This method is only available when the process is a worker. "
  489. f"Current mode: {self.worker.mode}"
  490. )
  491. if self.worker.core_worker.current_actor_is_asyncio():
  492. raise RuntimeError("This method is not supported in an async actor.")
  493. return self.worker.is_canceled
# Process-wide RuntimeContext instance; stays None until the first call to
# get_runtime_context() creates it.
_runtime_context = None
# Held by get_runtime_context() while it checks and creates the instance above.
_runtime_context_lock = threading.Lock()
  496. @PublicAPI
  497. @client_mode_hook
  498. def get_runtime_context() -> RuntimeContext:
  499. """Get the runtime context of the current driver/worker.
  500. The obtained runtime context can be used to get the metadata
  501. of the current driver, task, or actor.
  502. Example:
  503. .. testcode::
  504. import ray
  505. # Get the job id.
  506. ray.get_runtime_context().get_job_id()
  507. # Get the session name (used as SessionName label in Ray metrics).
  508. ray.get_runtime_context().get_session_name()
  509. # Get the actor id.
  510. ray.get_runtime_context().get_actor_id()
  511. # Get the task id.
  512. ray.get_runtime_context().get_task_id()
  513. """
  514. with _runtime_context_lock:
  515. global _runtime_context
  516. if _runtime_context is None:
  517. _runtime_context = RuntimeContext(ray._private.worker.global_worker)
  518. return _runtime_context