node.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. from typing import Dict, Optional, Union
  2. import ray
  3. def _get_node_id_from_node_ip(node_ip: str) -> Optional[str]:
  4. """Returns the node ID for the first alive node with the input IP."""
  5. for node in ray.nodes():
  6. if node["Alive"] and node["NodeManagerAddress"] == node_ip:
  7. return node["NodeID"]
  8. return None
  9. def _force_on_node(
  10. node_id: str,
  11. remote_func_or_actor_class: Optional[
  12. Union[ray.remote_function.RemoteFunction, ray.actor.ActorClass]
  13. ] = None,
  14. ) -> Union[Union[ray.remote_function.RemoteFunction, ray.actor.ActorClass], Dict]:
  15. """Schedule a remote function or actor class on a given node.
  16. Args:
  17. node_id: The node to schedule on.
  18. remote_func_or_actor_class: A Ray remote function or actor class
  19. to schedule on the input node. If None, this function will directly
  20. return the options dict to pass to another remote function or actor class
  21. as remote options.
  22. Returns:
  23. The provided remote function or actor class, but with options modified to force
  24. placement on the input node. If remote_func_or_actor_class is None,
  25. the options dict to pass to another remote function or
  26. actor class as remote options kwargs.
  27. """
  28. scheduling_strategy = ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
  29. node_id=node_id, soft=False
  30. )
  31. options = {"scheduling_strategy": scheduling_strategy}
  32. if remote_func_or_actor_class is None:
  33. return options
  34. return remote_func_or_actor_class.options(**options)
  35. def _force_on_current_node(
  36. remote_func_or_actor_class: Optional[
  37. Union[ray.remote_function.RemoteFunction, ray.actor.ActorClass]
  38. ] = None
  39. ) -> Union[Union[ray.remote_function.RemoteFunction, ray.actor.ActorClass], Dict]:
  40. """Schedule a remote function or actor class on the current node.
  41. If using Ray Client, the current node is the client server node.
  42. Args:
  43. remote_func_or_actor_class: A Ray remote function or actor class
  44. to schedule on the current node. If None, this function will directly
  45. return the options dict to pass to another remote function or actor class
  46. as remote options.
  47. Returns:
  48. The provided remote function or actor class, but with options modified to force
  49. placement on the current node. If remote_func_or_actor_class is None,
  50. the options dict to pass to another remote function or
  51. actor class as remote options kwargs.
  52. """
  53. current_node_id = ray.get_runtime_context().get_node_id()
  54. return _force_on_node(current_node_id, remote_func_or_actor_class)