| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 |
- from typing import Dict, Optional, Union
- import ray
def _get_node_id_from_node_ip(node_ip: str) -> Optional[str]:
    """Look up the ID of the first live node whose address equals ``node_ip``.

    Args:
        node_ip: The node manager address to match against.

    Returns:
        The ``NodeID`` of the first entry from ``ray.nodes()`` that is marked
        ``Alive`` and whose ``NodeManagerAddress`` equals ``node_ip``, or None
        when no entry matches.
    """
    # The generator is lazy, so scanning stops at the first matching node.
    matching_ids = (
        info["NodeID"]
        for info in ray.nodes()
        if info["Alive"] and info["NodeManagerAddress"] == node_ip
    )
    return next(matching_ids, None)
def _force_on_node(
    node_id: str,
    remote_func_or_actor_class: Optional[
        Union[ray.remote_function.RemoteFunction, ray.actor.ActorClass]
    ] = None,
) -> Union[Union[ray.remote_function.RemoteFunction, ray.actor.ActorClass], Dict]:
    """Pin a remote function or actor class to a specific node.

    Args:
        node_id: ID of the node that the work must be placed on.
        remote_func_or_actor_class: The Ray remote function or actor class to
            pin. When omitted (None), only the options dict is produced so the
            caller can apply it to some other remote function or actor class.

    Returns:
        The result of calling ``.options(...)`` on the given remote function
        or actor class with a node-affinity scheduling strategy for
        ``node_id``. If ``remote_func_or_actor_class`` is None, the options
        dict itself, suitable for passing as remote options kwargs.
    """
    # soft=False requests hard affinity to the given node.
    pinned_options = {
        "scheduling_strategy": (
            ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
                node_id=node_id, soft=False
            )
        )
    }
    if remote_func_or_actor_class is not None:
        return remote_func_or_actor_class.options(**pinned_options)
    return pinned_options
def _force_on_current_node(
    remote_func_or_actor_class: Optional[
        Union[ray.remote_function.RemoteFunction, ray.actor.ActorClass]
    ] = None
) -> Union[Union[ray.remote_function.RemoteFunction, ray.actor.ActorClass], Dict]:
    """Pin a remote function or actor class to the node running this code.

    If using Ray Client, the current node is the client server node.

    Args:
        remote_func_or_actor_class: The Ray remote function or actor class to
            pin to the current node. When omitted (None), only the options
            dict is produced so the caller can apply it to some other remote
            function or actor class.

    Returns:
        Whatever ``_force_on_node`` returns for the current node's ID: the
        given remote function or actor class with placement forced onto the
        current node, or, if ``remote_func_or_actor_class`` is None, the
        options dict to pass as remote options kwargs.
    """
    # The node ID comes from the runtime context of the calling process.
    return _force_on_node(
        node_id=ray.get_runtime_context().get_node_id(),
        remote_func_or_actor_class=remote_func_or_actor_class,
    )
|