utils.py 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. from typing import List, Optional, Tuple
  2. import ray
  3. def get_self_actor() -> Optional["ray.actor.ActorHandle"]:
  4. """
  5. Get the current actor handle in this worker.
  6. If this is called in a driver process, it will return None.
  7. """
  8. try:
  9. return ray.get_runtime_context().current_actor
  10. except RuntimeError:
  11. return None
  12. def split_readers_by_locality(
  13. writer: "ray.actor.ActorHandle",
  14. reader_and_node_list: List[Tuple["ray.actor.ActorHandle", str]],
  15. ) -> Tuple[
  16. List[Tuple["ray.actor.ActorHandle", str]], List[Tuple["ray.actor.ActorHandle", str]]
  17. ]:
  18. """Split readers into remote and local readers based on writer.
  19. Args:
  20. writer: The actor handle of the writer
  21. reader_and_node_list: List of (reader, node) tuples
  22. Returns:
  23. Tuple containing:
  24. - List of (reader, node) tuples for remote readers
  25. - List of (reader, node) tuples for local readers
  26. """
  27. remote_readers = []
  28. local_readers = []
  29. for reader, node in reader_and_node_list:
  30. if reader != writer:
  31. remote_readers.append((reader, node))
  32. else:
  33. local_readers.append((reader, node))
  34. return remote_readers, local_readers
  35. def split_actors_by_node_locality(
  36. node: str,
  37. actor_and_node_list: List[Tuple["ray.actor.ActorHandle", str]],
  38. ) -> Tuple[
  39. List[Tuple["ray.actor.ActorHandle", str]], List[Tuple["ray.actor.ActorHandle", str]]
  40. ]:
  41. """Split actors into remote and local actors based on node. The local actors will be
  42. on the same node as the given node. The remote actors will be on a different node.
  43. Args:
  44. writer_node: The node of the writer
  45. actor_and_node_list: List of (actor, node) tuples
  46. Returns:
  47. Tuple containing:
  48. - List of (actor, node) tuples for actors on the same node
  49. - List of (actor, node) tuples for actors on a different node
  50. """
  51. actors_on_same_node = []
  52. actors_on_different_node = []
  53. for actor, actor_node in actor_and_node_list:
  54. if node == actor_node:
  55. actors_on_same_node.append((actor, actor_node))
  56. else:
  57. actors_on_different_node.append((actor, actor_node))
  58. return actors_on_same_node, actors_on_different_node
  59. def get_actor_node(actor: Optional["ray.actor.ActorHandle"]) -> str:
  60. """Get the node of the actor.
  61. Args:
  62. actor: The actor handle of the actor
  63. Returns:
  64. The node of the actor
  65. """
  66. if actor is None or actor == ray.get_runtime_context().current_actor:
  67. return ray.get_runtime_context().get_node_id()
  68. else:
  69. return ray.get(
  70. actor.__ray_call__.remote(
  71. lambda self: ray.get_runtime_context().get_node_id()
  72. )
  73. )