| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374 |
- import logging
- from types import ModuleType
- from typing import TYPE_CHECKING, Callable, List, Optional, Tuple
- import ray
- from ray.exceptions import RayChannelError
- from ray.experimental.channel.accelerator_context import AcceleratorContext
- from ray.experimental.channel.communicator import Communicator, TorchTensorAllocator
- from ray.experimental.util.types import ReduceOp
- if TYPE_CHECKING:
- import torch
- # Logger for this module. It should be configured at the entry point
- # into the program using Ray. Ray provides a default configuration at
- # entry/init points.
- logger = logging.getLogger(__name__)
class _NcclGroup(Communicator):
    """
    Represents an actor's NCCL communicator. This is the default NCCL communicator
    to be used in Compiled Graph if a custom communicator is not provided.

    This class is not thread-safe.
    """

    def __init__(
        self,
        world_size: int,
        comm_id: tuple,
        rank: Optional[int],
        actor_handles: List["ray.actor.ActorHandle"],
        cuda_stream: Optional["torch.cuda.Stream"],
        use_communication_streams: bool = False,
    ):
        """
        Initialize a NCCL communicator that can be used to communicate p2p with
        other GPU actors.

        This method blocks until the same call has been made on all other
        actors in the group, with the same arguments for world_size and
        comm_id.

        NOTE: A concurrent NCCL group can coexist with this one but using the
        two groups concurrently on different CUDA streams may cause deadlock.
        See
        https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage/communicators.html
        #using-multiple-nccl-communicators-concurrently.

        If the user can guarantee that all involved actors execute the same ops
        in the same order, then the other NCCL group should use the given
        `cuda_stream`, and there will not be a concurrency issue. Otherwise,
        the other stream needs to synchronize with the given `cuda_stream`
        before and after it launches NCCL ops, e.g., at the beginning and end
        of a DAG task.

        Args:
            world_size: The number of participating actors/devices.
            comm_id: A unique communicator ID returned by
                cupy.cuda.nccl.get_unique_id().
            rank: The rank of this actor. If None, then the caller is not a
                participant of the NCCL group.
            actor_handles: A list of actor handles, in rank order.
            cuda_stream: A raw CUDA stream to dispatch NCCL ops to. If rank is
                specified, then this must be specified too.
            use_communication_streams: Whether to use dedicated send and recv
                streams for communication. If True, communication and computation
                can be overlapped to improve performance.
        """
        self._world_size = world_size
        self._rank: Optional[int] = rank
        self.nccl_util: Optional[ModuleType] = None
        # Must be set before get_rank() is called below.
        self._actor_handles = actor_handles
        self._use_communication_streams = use_communication_streams

        if rank is not None:
            assert ray.get_gpu_ids(), "NCCL actor has no GPUs assigned"
            assert cuda_stream is not None, "NCCL actor must specify cuda_stream"

            # The declared rank must agree with this actor's position in
            # `actor_handles`.
            expected_rank = self.get_rank(ray.get_runtime_context().current_actor)
            assert (
                rank == expected_rank
            ), f"NCCL actor's rank {rank} does not match expected rank {expected_rank}"

            # Imported lazily so that only participants of the group need the
            # NCCL utilities to be importable.
            from ray.util.collective.collective_group import nccl_util

            self.nccl_util = nccl_util
            self._comm = self.nccl_util.NcclCommunicator(world_size, comm_id, rank)
        else:
            # Driver does not have a rank.
            self._comm = None

        self._cuda_stream: Optional["torch.cuda.Stream"] = None
        self._send_stream: Optional["torch.cuda.Stream"] = None
        self._recv_stream: Optional["torch.cuda.Stream"] = None
        if cuda_stream is not None:
            assert rank is not None, "NCCL actor has no rank assigned"
            self._cuda_stream = cuda_stream

            if use_communication_streams:
                import torch

                # TODO(swang): Allow default device to be overridden.
                device = AcceleratorContext.get().get_accelerator_devices()[0]
                self._send_stream = torch.cuda.Stream(device=device)
                self._recv_stream = torch.cuda.Stream(device=device)
            else:
                # Without dedicated streams, sends and receives share the
                # caller-provided stream.
                self._send_stream = self._cuda_stream
                self._recv_stream = self._cuda_stream

        self._closed = False

    def initialize(self, rank: int) -> None:
        """
        No-op hook; all setup for this communicator happens in `__init__`.

        Args:
            rank: Unused.
        """
        # No additional initialization is needed.
        pass

    def get_actor_handles(self) -> List["ray.actor.ActorHandle"]:
        """
        Return the actor handles passed at construction, in rank order.
        """
        return self._actor_handles

    def get_rank(self, actor: "ray.actor.ActorHandle") -> int:
        """
        Return the given actor's rank in the NCCL communicator.

        The rank is the actor's index in the `actor_handles` list given at
        construction, matched by actor ID.

        Args:
            actor: The actor handle to look up.

        Raises:
            ValueError: If the actor is not one of the group's actors.
        """
        actor_ids = [a._ray_actor_id for a in self._actor_handles]
        try:
            return actor_ids.index(actor._ray_actor_id)
        except ValueError:
            # The "x is not in list" context adds nothing for the caller.
            raise ValueError("Actor is not in the NCCL group.") from None

    def get_self_rank(self) -> Optional[int]:
        """
        Return this actor's rank, or None if the caller is not a participant
        of the group (e.g., the driver).
        """
        return self._rank

    def get_world_size(self) -> int:
        """
        Return the number of ranks in the NCCL communicator.
        """
        return self._world_size

    def send(self, buf: "torch.Tensor", peer_rank: int) -> None:
        """
        Send a torch.Tensor to a peer.

        This returns when the send kernel has been queued, but the kernel may
        not have completed. Therefore, the caller should ensure that there are
        no concurrent writes to the sent `buf` until the send has finished.
        That is, either all writes should be submitted on the stream the send
        is queued on (`self._send_stream`, which is `self._cuda_stream` unless
        dedicated communication streams are enabled) or, if on a different
        stream, that stream should synchronize with it.

        Args:
            buf: The torch.Tensor to send. It should already be on this
                actor's default device.
            peer_rank: The rank of the actor to send to.

        Raises:
            RayChannelError: If the group has already been destroyed.
        """
        if self._closed:
            raise RayChannelError("NCCL group has been destroyed.")

        if self._use_communication_streams:
            # We observed that if all recv/compute/send operations run on GPU,
            # since there is no synchronization, the CPU execution loop may be
            # far ahead of the GPU operations and lead to runtime failures.
            # To avoid that, we synchronize on the send stream.
            # TODO(rui): find a better approach
            self._send_stream.synchronize()

        # TODO(swang): Handle send/recv async NCCL errors such as network
        # failures.
        self._comm.send(
            self.nccl_util.get_tensor_ptr(buf),
            buf.numel(),
            self.nccl_util.get_nccl_tensor_dtype(buf),
            peer_rank,
            self._send_stream.cuda_stream,
        )

    def recv(
        self,
        shape: Tuple[int, ...],
        dtype: "torch.dtype",
        peer_rank: int,
        allocator: Optional[TorchTensorAllocator] = None,
    ) -> "torch.Tensor":
        """
        Receive a torch.Tensor from a peer.

        A receive buffer is created with `allocator(shape, dtype)` and the
        receive is queued on `self._recv_stream`.

        When dedicated communication streams are disabled, `self._cuda_stream`
        is synchronized after the receive is queued, so the returned buffer
        has been written by the time this call returns. When they are enabled,
        the recv stream is synchronized only *before* the receive is queued;
        this method does not wait for the receive itself, so readers
        presumably need to order their work after the recv stream
        (NOTE(review): confirm against callers).

        Args:
            shape: The shape of the tensor to receive.
            dtype: The dtype of the tensor to receive.
            peer_rank: The rank of the actor to receive from.
            allocator: Callable invoked as `allocator(shape, dtype)` to create
                the receive buffer. Required, despite the None default.

        Returns:
            The buffer that the tensor is received into.

        Raises:
            RayChannelError: If the group was destroyed before or during the
                receive (e.g., a remote actor died), in which case the buffer
                is not safe to read.
        """
        if self._closed:
            raise RayChannelError("NCCL group has been destroyed.")

        assert allocator is not None, "NCCL group requires a tensor allocator"
        buf = allocator(shape, dtype)

        if self._use_communication_streams:
            # We observed that if all recv/compute/send operations run on GPU,
            # since there is no synchronization, the CPU execution loop may be
            # far ahead of the GPU operations and lead to runtime failures.
            # To avoid that, we synchronize on the recv stream.
            # TODO(rui): find a better approach
            self._recv_stream.synchronize()

        self._comm.recv(
            self.nccl_util.get_tensor_ptr(buf),
            buf.numel(),
            self.nccl_util.get_nccl_tensor_dtype(buf),
            peer_rank,
            self._recv_stream.cuda_stream,
        )

        if not self._use_communication_streams:
            # Buffer values are undefined if NCCL ops are aborted. Therefore, we
            # need to synchronize here and check that the channel is still open to
            # ensure that the receive buffer is valid.
            # TODO(swang): Avoid CUDA synchronization.
            self._cuda_stream.synchronize()

        # destroy() sets _closed before aborting the communicator, so a
        # receive that was unblocked by the abort is detected here.
        if self._closed:
            raise RayChannelError("NCCL group has been destroyed.")
        return buf

    def _exec_collective(
        self,
        send_buf: "torch.Tensor",
        recv_buf: "torch.Tensor",
        operation: "Callable[..., None]",
        *operation_args,
    ) -> None:
        """
        Run a collective operation and wait for it on `self._cuda_stream`.

        Shared by allgather, allreduce and reducescatter.

        Args:
            send_buf: The input tensor; used here only for the dtype check.
            recv_buf: The output tensor; used here only for the dtype check.
            operation: The communicator method to invoke.
            *operation_args: Positional arguments forwarded to `operation`.

        Raises:
            RayChannelError: If the group was destroyed before or during the
                operation.
        """
        if self._closed:
            raise RayChannelError("NCCL group has been destroyed.")

        assert send_buf.dtype == recv_buf.dtype, (
            "Ray Compiled Graph derived the dtype of recv_buf from send_buf, "
            "so send_buf and recv_buf must have the same dtype. "
            "If you see this error, please file an issue at Ray repository."
        )

        operation(*operation_args)

        # Buffer values are undefined if NCCL ops are aborted. Therefore, we
        # need to synchronize here and check that the channel is still open to
        # ensure that the receive buffer is valid.
        # TODO(swang): Avoid CUDA synchronization.
        # TODO(wxdeng): This synchronize will be optional after merging the unify PR.
        self._cuda_stream.synchronize()

        if self._closed:
            # NOTE(review): the message names allreduce, but this path is also
            # reached from allgather and reducescatter. The text is kept
            # unchanged in case callers match on it.
            raise RayChannelError(
                "NCCL group has been destroyed during allreduce operation. "
                "There may be a dtype mismatch between input tensors from "
                "different ranks."
            )

    def allgather(
        self,
        send_buf: "torch.Tensor",
        recv_buf: "torch.Tensor",
    ):
        """
        All-gather `send_buf` from every rank into `recv_buf`.

        The element count passed to NCCL is `send_buf.numel()`.

        Args:
            send_buf: This rank's input tensor.
            recv_buf: The output tensor; must have the same dtype as
                `send_buf`.

        Raises:
            RayChannelError: If the group was destroyed before or during the
                operation.
        """
        operation_args = [
            self.nccl_util.get_tensor_ptr(send_buf),
            self.nccl_util.get_tensor_ptr(recv_buf),
            send_buf.numel(),
            self.nccl_util.get_nccl_tensor_dtype(send_buf),
            self._cuda_stream.cuda_stream,
        ]
        self._exec_collective(
            send_buf,
            recv_buf,
            self._comm.allGather,
            *operation_args,
        )

    def allreduce(
        self,
        send_buf: "torch.Tensor",
        recv_buf: "torch.Tensor",
        op: ReduceOp = ReduceOp.SUM,
    ):
        """
        All-reduce `send_buf` across ranks into `recv_buf`.

        The element count passed to NCCL is `send_buf.numel()`.

        Args:
            send_buf: This rank's input tensor.
            recv_buf: The output tensor; must have the same dtype as
                `send_buf`.
            op: The reduction to apply. Defaults to `ReduceOp.SUM`.

        Raises:
            RayChannelError: If the group was destroyed before or during the
                operation.
        """
        operation_args = [
            self.nccl_util.get_tensor_ptr(send_buf),
            self.nccl_util.get_tensor_ptr(recv_buf),
            send_buf.numel(),
            self.nccl_util.get_nccl_tensor_dtype(send_buf),
            op.value,
            self._cuda_stream.cuda_stream,
        ]
        self._exec_collective(
            send_buf,
            recv_buf,
            self._comm.allReduce,
            *operation_args,
        )

    def reducescatter(
        self,
        send_buf: "torch.Tensor",
        recv_buf: "torch.Tensor",
        op: ReduceOp = ReduceOp.SUM,
    ):
        """
        Reduce-scatter `send_buf` across ranks into `recv_buf`.

        Unlike the other collectives here, the element count passed to NCCL
        is `recv_buf.numel()`, i.e. the size of the output.

        Args:
            send_buf: This rank's input tensor.
            recv_buf: The output tensor; must have the same dtype as
                `send_buf`.
            op: The reduction to apply. Defaults to `ReduceOp.SUM`.

        Raises:
            RayChannelError: If the group was destroyed before or during the
                operation.
        """
        operation_args = [
            self.nccl_util.get_tensor_ptr(send_buf),
            self.nccl_util.get_tensor_ptr(recv_buf),
            recv_buf.numel(),
            self.nccl_util.get_nccl_tensor_dtype(send_buf),
            op.value,
            self._cuda_stream.cuda_stream,
        ]
        self._exec_collective(
            send_buf,
            recv_buf,
            self._comm.reduceScatter,
            *operation_args,
        )

    @property
    def recv_stream(self):
        """
        A `torch.cuda.StreamContext` wrapping the stream receives are queued on.
        """
        import torch

        return torch.cuda.StreamContext(self._recv_stream)

    @property
    def send_stream(self):
        """
        A `torch.cuda.StreamContext` wrapping the stream sends are queued on.
        """
        import torch

        return torch.cuda.StreamContext(self._send_stream)

    def destroy(self) -> None:
        """
        Destroy the NCCL group.

        Calling this more than once is a no-op. When there is no underlying
        communicator (the caller has no rank), only the closed flag is set.
        """
        if self._closed:
            return

        self._closed = True

        if self._comm is not None:
            logger.info(
                "Destructing NCCL group on actor: %s",
                ray.get_runtime_context().current_actor,
            )
            # Abort *after* setting the _closed flag. This ensures that NCCL
            # ops that were blocked on a remote peer will see that the _closed
            # flag is True when they exit from the abort.
            self._comm.abort()
            self._comm.destroy()

    def get_transport_name(self) -> str:
        """
        Return the transport name for this communicator: "accelerator".
        """
        return "accelerator"

    @classmethod
    def generate_communicator_id(cls) -> str:
        """
        Return a new unique communicator ID from cupy.cuda.nccl.get_unique_id().

        NOTE(review): the return annotation is `str`, but `__init__` types the
        value from this same cupy call as a `tuple`. Confirm which is right
        before relying on the annotation.
        """
        from cupy.cuda import nccl

        return nccl.get_unique_id()
|