collective.py 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874
  1. """APIs exposed under the namespace ray.util.collective."""
  2. import logging
  3. import os
  4. import socket
  5. import threading
  6. import time
  7. from typing import List, Tuple
  8. import numpy as np
  9. import ray
  10. import ray.experimental.internal_kv as _internal_kv
  11. from . import types
  12. from ray._common.network_utils import find_free_port, is_ipv6
  13. from ray.util.collective.collective_group.torch_gloo_collective_group import (
  14. get_master_address_metadata_key as _get_master_addr_key,
  15. )
  logger = logging.getLogger(__name__)

  # Optional communicator backends: record whether each implementation could
  # be imported so the availability helpers can report it.
  try:
      from ray.util.collective.collective_group.nccl_collective_group import NCCLGroup
  except ImportError:
      # NCCL is missing; nccl_available() warns about this at most once.
      _NCCL_AVAILABLE = False
      _LOG_NCCL_WARNING = True
  else:
      _NCCL_AVAILABLE = True
      _LOG_NCCL_WARNING = False

  try:
      from ray.util.collective.collective_group.torch_gloo_collective_group import (
          TorchGLOOGroup,
      )
  except ImportError:
      _TORCH_DISTRIBUTED_AVAILABLE = False
  else:
      _TORCH_DISTRIBUTED_AVAILABLE = True
  def nccl_available():
      """Report whether the NCCL backend could be imported.

      When this process has GPU ids assigned and the one-time warning flag is
      still set, a warning pointing at the CuPy install guide is logged and
      the flag is cleared so it is not repeated.

      Returns:
          The module-level ``_NCCL_AVAILABLE`` flag.
      """
      global _LOG_NCCL_WARNING
      assigned_gpus = ray.get_gpu_ids()
      if assigned_gpus and _LOG_NCCL_WARNING:
          logger.warning(
              "NCCL seems unavailable. Please install Cupy "
              "following the guide at: "
              "https://docs.cupy.dev/en/stable/install.html."
          )
          # Warn only once per process.
          _LOG_NCCL_WARNING = False
      return _NCCL_AVAILABLE
  def gloo_available():
      """Return whether the GLOO backend can be used.

      GLOO collectives are implemented with torch.distributed, so GLOO is
      available exactly when the torch GLOO group class could be imported.
      """
      torch_gloo_ok = _TORCH_DISTRIBUTED_AVAILABLE
      return torch_gloo_ok
  def torch_distributed_available():
      """Return True if the torch.distributed GLOO group class was importable."""
      is_available = _TORCH_DISTRIBUTED_AVAILABLE
      return is_available
  def get_address_and_port() -> Tuple[str, int]:
      """Return this node's IP address together with a currently free port.

      The socket family used to probe for a free port follows the node IP:
      IPv6 addresses use ``AF_INET6``, everything else ``AF_INET``.
      """
      node_ip = ray.util.get_node_ip_address()
      family = socket.AF_INET6 if is_ipv6(node_ip) else socket.AF_INET
      free_port = find_free_port(family)
      return node_ip, free_port
  class GroupManager(object):
      """Use this class to manage the collective groups created so far.

      Each process has one instance of `GroupManager` (the global
      `_group_mgr` object). A process may belong to multiple collective
      groups; this manager maps each group name to its communicator object.
      """

      def __init__(self):
          # group_name -> communicator (TorchGLOOGroup or NCCLGroup instance).
          self._name_group_map = {}

      def create_collective_group(
          self, backend, world_size, rank, group_name, gloo_timeout
      ):
          """Create a communicator for ``group_name`` and register it.

          For the GLOO backend this first performs a rendezvous through Ray's
          internal KV store: rank 0 publishes ``"<addr>:<port>"`` under the
          group's master-address key and the other ranks wait for it.

          Args:
              backend: a ``types.Backend`` value (or anything it accepts).
              world_size: total number of processes in the group.
              rank: rank of this process within the group.
              group_name: name under which the group is registered.
              gloo_timeout: GLOO timeout in milliseconds; when falsy, non-zero
                  ranks wait 30 seconds for the rendezvous metadata.

          Returns:
              The newly created communicator object.

          Raises:
              RuntimeError: if the backend library is unavailable or the
                  backend is not GLOO/NCCL.
              TimeoutError: if a GLOO rank > 0 does not see rank 0's metadata
                  before the deadline.
          """
          backend = types.Backend(backend)
          # Check every backend (not only NCCL) so a missing GLOO library is
          # reported clearly, before any rendezvous metadata is published.
          _check_backend_availability(backend)
          if backend == types.Backend.GLOO:
              self._gloo_rendezvous(rank, group_name, gloo_timeout)
              logger.debug(
                  "Creating torch.distributed GLOO group: '{}'...".format(group_name)
              )
              g = TorchGLOOGroup(world_size, rank, group_name, gloo_timeout)
          elif backend == types.Backend.NCCL:
              logger.debug("Creating NCCL group: '{}'...".format(group_name))
              g = NCCLGroup(world_size, rank, group_name)
          else:
              raise RuntimeError(f"Unexpected backend: {backend}")
          self._name_group_map[group_name] = g
          return g

      @staticmethod
      def _gloo_rendezvous(rank, group_name, gloo_timeout):
          """Publish (rank 0) or wait for (other ranks) the GLOO master address.

          Rank 0 writes ``"<addr>:<port>"`` to the internal KV store; every
          other rank polls the same key every 50 ms.

          Raises:
              TimeoutError: if a non-zero rank does not see the key within
                  ``gloo_timeout`` milliseconds (30 s when it is falsy).
          """
          metadata_key = _get_master_addr_key(group_name)
          if rank == 0:
              addr, port = get_address_and_port()
              _internal_kv._internal_kv_put(metadata_key, f"{addr}:{port}")
              return
          wait_s = gloo_timeout / 1000.0 if gloo_timeout else 30.0
          # Use a monotonic clock so wall-clock adjustments (NTP, manual
          # changes) cannot shorten or extend the wait.
          deadline_s = time.monotonic() + wait_s
          while _internal_kv._internal_kv_get(metadata_key) is None:
              if time.monotonic() > deadline_s:
                  raise TimeoutError(
                      f"Timed out waiting for GLOO rendezvous metadata for group '{group_name}'."
                  )
              time.sleep(0.05)

      def is_group_exist(self, group_name):
          """Return True if ``group_name`` is registered in this manager."""
          return group_name in self._name_group_map

      def get_group_by_name(self, group_name):
          """Get the collective group handle by its name, or None (with a
          warning) if it is not registered."""
          if not self.is_group_exist(group_name):
              logger.warning("The group '{}' is not initialized.".format(group_name))
              return None
          return self._name_group_map[group_name]

      def destroy_collective_group(self, group_name):
          """Group destructor.

          Unregisters ``group_name``, releases its communicator resources, and
          kills the ``"info_" + group_name`` detached actor if one exists.
          A missing group only logs a warning.
          """
          if not self.is_group_exist(group_name):
              logger.warning("The group '{}' does not exist.".format(group_name))
              return
          # Unregister first, then release the communicator resources.
          g = self._name_group_map.pop(group_name)
          g.destroy_group()
          # Release the detached actor spawned by `create_collective_group()`.
          name = "info_" + group_name
          try:
              store = ray.get_actor(name)
              ray.kill(store)
          except ValueError:
              # No such actor (e.g. the group was not declared from a driver).
              pass
  # Per-process registry of collective groups. The lock below is used to make
  # external calls to _group_mgr thread-safe.
  _group_mgr_lock = threading.Lock()
  _group_mgr = GroupManager()
  def is_group_initialized(group_name):
      """Return True if ``group_name`` is registered in this process's manager."""
      with _group_mgr_lock:
          initialized = _group_mgr.is_group_exist(group_name)
      return initialized
  def init_collective_group(
      world_size: int,
      rank: int,
      backend=types.Backend.NCCL,
      group_name: str = "default",
      gloo_timeout: int = 30000,
  ):
      """Initialize a collective group from inside an actor process.

      Args:
          world_size: the total number of processes in the group.
          rank: the rank of the calling process, in ``[0, world_size)``.
          backend: the CCL backend to use, NCCL or GLOO.
          group_name: the name of the collective group; must be non-empty.
          gloo_timeout: GLOO timeout in milliseconds.

      Returns:
          None

      Raises:
          RuntimeError: if called outside an actor, if the backend is not
              available, or if the group already exists in this process.
          ValueError: if ``group_name`` is empty.
      """
      _check_inside_actor()
      backend = types.Backend(backend)
      _check_backend_availability(backend)
      # TODO(Hao): implement a group auto-counter.
      if not group_name:
          raise ValueError("group_name '{}' needs to be a string.".format(group_name))
      with _group_mgr_lock:
          if _group_mgr.is_group_exist(group_name):
              raise RuntimeError("Trying to initialize a group twice.")
          assert world_size > 0
          assert 0 <= rank < world_size
          _group_mgr.create_collective_group(
              backend, world_size, rank, group_name, gloo_timeout
          )
  def create_collective_group(
      actors,
      world_size: int,
      ranks: List[int],
      backend=types.Backend.NCCL,
      group_name: str = "default",
      gloo_timeout: int = 30000,
  ):
      """Declare a list of actors as a collective group.

      Note: This function should be called in a driver process. It stores the
      group declaration in a detached named actor (``"info_" + group_name``);
      the communicators themselves are created later inside the actors.

      Args:
          actors: a list of actors to be set in a collective group.
          world_size: the total number of processes in the group.
          ranks (List[int]): the rank of each actor, aligned with ``actors``.
          backend: the CCL backend to use, NCCL or GLOO.
          group_name: the name of the collective group.
          gloo_timeout: GLOO timeout in milliseconds.

      Returns:
          None

      Raises:
          RuntimeError: if the group was already declared, or if ``actors``,
              ``ranks`` and ``world_size`` are inconsistent.
      """
      backend = types.Backend(backend)
      _check_backend_availability(backend)
      name = "info_" + group_name
      try:
          ray.get_actor(name)
      except ValueError:
          pass  # No info actor under this name: the group is new.
      else:
          raise RuntimeError("Trying to initialize a group twice.")

      if len(ranks) != len(actors):
          raise RuntimeError(
              "Each actor should correspond to one rank. Got '{}' "
              "ranks but '{}' actors".format(len(ranks), len(actors))
          )
      if set(ranks) != set(range(len(ranks))):
          raise RuntimeError(
              "Ranks must be a permutation from 0 to '{}'. Got '{}'.".format(
                  len(ranks) - 1, ", ".join(str(r) for r in ranks)
              )
          )
      if world_size <= 0:
          raise RuntimeError(
              "World size must be greater than zero. Got '{}'.".format(world_size)
          )
      # Validate each rank individually; comparing all(ranks) (a bool) with
      # the bounds would never detect an out-of-range rank.
      if any(r < 0 for r in ranks):
          raise RuntimeError("Ranks must be non-negative.")
      if any(r >= world_size for r in ranks):
          raise RuntimeError("Ranks must be less than world_size.")

      # avoid a circular dependency
      from ray.util.collective.util import Info

      # Store the information into a NamedActor that can be accessed later.
      actors_id = [a._ray_actor_id for a in actors]
      # TODO (Dacheng): how do we recycle this name actor?
      info = Info.options(name=name, lifetime="detached").remote()
      ray.get([info.set_info.remote(actors_id, world_size, ranks, backend, gloo_timeout)])


  # TODO (we need a declarative destroy() API here.)
  def destroy_collective_group(group_name: str = "default") -> None:
      """Tear down the named collective group in the calling process.

      Must be called from inside a Ray actor or task. A group that does not
      exist is reported by the manager with a warning, not an error.
      """
      _check_inside_actor()
      with _group_mgr_lock:
          _group_mgr.destroy_collective_group(group_name)
  def get_rank(group_name: str = "default") -> int:
      """Return the rank of this process in the given group.

      Args:
          group_name: the name of the group to query.

      Returns:
          The rank of this process in the named group, or -1 if the group is
          not registered in this process.
      """
      _check_inside_actor()
      with _group_mgr_lock:
          if _group_mgr.is_group_exist(group_name):
              return _group_mgr.get_group_by_name(group_name).rank
          return -1
  def get_collective_group_size(group_name: str = "default") -> int:
      """Return the world size of the named collective group.

      Args:
          group_name: the name of the group to query.

      Returns:
          The group's world size, or -1 if the group is not registered in
          this process.
      """
      _check_inside_actor()
      with _group_mgr_lock:
          if _group_mgr.is_group_exist(group_name):
              return _group_mgr.get_group_by_name(group_name).world_size
          return -1
  def allreduce(tensor, group_name: str = "default", op=types.ReduceOp.SUM):
      """Collective allreduce the tensor across the group.

      Args:
          tensor: the tensor to be all-reduced on this process.
          group_name: the collective group name to perform allreduce.
          op: The reduce operation.

      Returns:
          None
      """
      _check_single_tensor_input(tensor)
      g = get_group_handle(group_name)
      # Build a fresh options object per call. Binding the class itself and
      # setting ``reduceOp`` on it would mutate shared class state that leaks
      # into every later (or concurrent) allreduce.
      opts = types.AllReduceOptions()
      opts.reduceOp = op
      g.allreduce([tensor], opts)
  def allreduce_multigpu(
      tensor_list: list, group_name: str = "default", op=types.ReduceOp.SUM
  ):
      """Collective allreduce a list of tensors across the group.

      Args:
          tensor_list (List[tensor]): list of tensors to be allreduced,
              each on a GPU.
          group_name: the collective group name to perform allreduce.
          op: The reduce operation.

      Returns:
          None

      Raises:
          RuntimeError: if CuPy is unavailable or ``tensor_list`` is invalid.
      """
      if not types.cupy_available():
          raise RuntimeError("Multigpu calls requires NCCL and Cupy.")
      _check_tensor_list_input(tensor_list)
      g = get_group_handle(group_name)
      # Per-call instance: never mutate the AllReduceOptions class itself.
      opts = types.AllReduceOptions()
      opts.reduceOp = op
      g.allreduce(tensor_list, opts)
  def barrier(group_name: str = "default"):
      """Barrier all processes in the collective group.

      Args:
          group_name: the name of the group to barrier.

      Returns:
          None
      """
      get_group_handle(group_name).barrier()
  def reduce(
      tensor, dst_rank: int = 0, group_name: str = "default", op=types.ReduceOp.SUM
  ):
      """Reduce a tensor across the group onto the destination rank.

      Args:
          tensor: the tensor to be reduced on this process.
          dst_rank: the rank of the destination process.
          group_name: the collective group name to perform reduce.
          op: The reduce operation.

      Returns:
          None
      """
      _check_single_tensor_input(tensor)
      group = get_group_handle(group_name)
      _check_rank_valid(group, dst_rank)
      options = types.ReduceOptions()
      # Single-tensor call, so the root tensor index is always 0.
      options.reduceOp, options.root_rank, options.root_tensor = op, dst_rank, 0
      group.reduce([tensor], options)
  def reduce_multigpu(
      tensor_list: list,
      dst_rank: int = 0,
      dst_tensor: int = 0,
      group_name: str = "default",
      op=types.ReduceOp.SUM,
  ):
      """Reduce tensors across the group onto one GPU of the destination rank.

      Args:
          tensor_list: the list of tensors to be reduced on this process;
              each tensor located on a GPU.
          dst_rank: the rank of the destination process.
          dst_tensor: the index of GPU at the destination.
          group_name: the collective group name to perform reduce.
          op: The reduce operation.

      Returns:
          None
      """
      if not types.cupy_available():
          raise RuntimeError("Multigpu calls requires NCCL and Cupy.")
      _check_tensor_list_input(tensor_list)
      group = get_group_handle(group_name)
      _check_rank_valid(group, dst_rank)
      _check_root_tensor_valid(len(tensor_list), dst_tensor)
      options = types.ReduceOptions()
      options.reduceOp, options.root_rank, options.root_tensor = (
          op,
          dst_rank,
          dst_tensor,
      )
      group.reduce(tensor_list, options)
  def broadcast(tensor, src_rank: int = 0, group_name: str = "default"):
      """Broadcast a tensor from the source rank to every other process.

      Args:
          tensor: the tensor to be broadcasted (src) or received (destination).
          src_rank: the rank of the source process.
          group_name: the collective group name to perform broadcast.

      Returns:
          None
      """
      _check_single_tensor_input(tensor)
      group = get_group_handle(group_name)
      _check_rank_valid(group, src_rank)
      options = types.BroadcastOptions()
      # Single-tensor call, so the root tensor index is always 0.
      options.root_rank, options.root_tensor = src_rank, 0
      group.broadcast([tensor], options)
  def broadcast_multigpu(
      tensor_list, src_rank: int = 0, src_tensor: int = 0, group_name: str = "default"
  ):
      """Broadcast one GPU's tensor on the source rank to all other GPUs.

      Args:
          tensor_list: the tensors to broadcast (src) or receive (dst).
          src_rank: the rank of the source process.
          src_tensor: the index of the source GPU on the source process.
          group_name: the collective group name to perform broadcast.

      Returns:
          None
      """
      if not types.cupy_available():
          raise RuntimeError("Multigpu calls requires NCCL and Cupy.")
      _check_tensor_list_input(tensor_list)
      group = get_group_handle(group_name)
      _check_rank_valid(group, src_rank)
      _check_root_tensor_valid(len(tensor_list), src_tensor)
      options = types.BroadcastOptions()
      options.root_rank, options.root_tensor = src_rank, src_tensor
      group.broadcast(tensor_list, options)
  def allgather(tensor_list: list, tensor, group_name: str = "default"):
      """Gather ``tensor`` from every process of the group into ``tensor_list``.

      Args:
          tensor_list: the results, stored as a list of tensors; its length
              must equal the group's world size.
          tensor: the tensor (to be gathered) in the current process.
          group_name: the name of the collective group.

      Returns:
          None
      """
      _check_single_tensor_input(tensor)
      _check_tensor_list_input(tensor_list)
      group = get_group_handle(group_name)
      # CCL libraries typically only need len(tensor_list) >= world_size;
      # this API is stricter and requires an exact match.
      if len(tensor_list) != group.world_size:
          raise RuntimeError(
              "The length of the tensor list operands to allgather "
              "must be equal to world_size."
          )
      group.allgather([tensor_list], [tensor], types.AllGatherOptions())
  def allgather_multigpu(
      output_tensor_lists: list, input_tensor_list: list, group_name: str = "default"
  ):
      """Allgather tensors from every GPU of the group into lists.

      Args:
          output_tensor_lists (List[List[tensor]]): gathered results, with shape
              num_gpus * world_size * shape(tensor).
          input_tensor_list (List[tensor]): a list of tensors, with shape
              num_gpus * shape(tensor).
          group_name: the name of the collective group.

      Returns:
          None
      """
      if not types.cupy_available():
          raise RuntimeError("Multigpu calls requires NCCL and Cupy.")
      _check_tensor_lists_input(output_tensor_lists)
      _check_tensor_list_input(input_tensor_list)
      group = get_group_handle(group_name)
      group.allgather(output_tensor_lists, input_tensor_list, types.AllGatherOptions())
  def reducescatter(
      tensor, tensor_list: list, group_name: str = "default", op=types.ReduceOp.SUM
  ):
      """Reducescatter a list of tensors across the group.

      Reduce the list of the tensors across each process in the group, then
      scatter the reduced list of tensors -- one tensor for each process.

      Args:
          tensor: the resulted tensor on this process.
          tensor_list: The list of tensors to be reduced and scattered; its
              length must equal the group's world size.
          group_name: the name of the collective group.
          op: The reduce operation.

      Returns:
          None

      Raises:
          RuntimeError: if ``len(tensor_list)`` differs from the world size.
      """
      _check_single_tensor_input(tensor)
      _check_tensor_list_input(tensor_list)
      g = get_group_handle(group_name)
      opts = types.ReduceScatterOptions()
      opts.reduceOp = op
      if len(tensor_list) != g.world_size:
          # The check requires equality, so the message must say "must be".
          raise RuntimeError(
              "The length of the tensor list operands to reducescatter "
              "must be equal to world_size."
          )
      g.reducescatter([tensor], [tensor_list], opts)
  def reducescatter_multigpu(
      output_tensor_list,
      input_tensor_lists,
      group_name: str = "default",
      op=types.ReduceOp.SUM,
  ):
      """Reducescatter a list of tensors across all GPUs of the group.

      Args:
          output_tensor_list: the resulted list of tensors, with
              shape: num_gpus * shape(tensor).
          input_tensor_lists: the original tensors, with shape:
              num_gpus * world_size * shape(tensor).
          group_name: the name of the collective group.
          op: The reduce operation.

      Returns:
          None.
      """
      if not types.cupy_available():
          raise RuntimeError("Multigpu calls requires NCCL and Cupy.")
      _check_tensor_lists_input(input_tensor_lists)
      _check_tensor_list_input(output_tensor_list)
      group = get_group_handle(group_name)
      options = types.ReduceScatterOptions()
      options.reduceOp = op
      group.reducescatter(output_tensor_list, input_tensor_lists, options)
  def send(tensor, dst_rank: int, group_name: str = "default"):
      """Synchronously send ``tensor`` to the process with rank ``dst_rank``.

      Args:
          tensor: the tensor to send.
          dst_rank: the rank of the destination process.
          group_name: the name of the collective group.

      Returns:
          None
      """
      _check_single_tensor_input(tensor)
      group = get_group_handle(group_name)
      _check_rank_valid(group, dst_rank)
      if dst_rank == group.rank:
          raise RuntimeError(f"The destination rank '{dst_rank}' is self.")
      options = types.SendOptions()
      options.dst_rank = dst_rank
      group.send([tensor], options)
  def send_multigpu(
      tensor,
      dst_rank: int,
      dst_gpu_index: int,
      group_name: str = "default",
      n_elements: int = 0,
  ):
      """Send a tensor to a remote GPU synchronously.

      The function assumes each process owns >1 GPUs, and the sender
      process and receiver process have an equal number of GPUs.

      Args:
          tensor: the tensor to send, located on a GPU.
          dst_rank: the rank of the destination process.
          dst_gpu_index: the destination gpu index.
          group_name: the name of the collective group.
          n_elements: if specified, send the next n elements
              from the starting address of tensor.

      Returns:
          None

      Raises:
          RuntimeError: if CuPy is unavailable, ``dst_rank`` is this process's
              own rank, or ``n_elements`` is negative.
          ValueError: if ``dst_rank`` is out of range.
      """
      if not types.cupy_available():
          raise RuntimeError("send_multigpu call requires NCCL.")
      _check_single_tensor_input(tensor)
      g = get_group_handle(group_name)
      _check_rank_valid(g, dst_rank)
      if dst_rank == g.rank:
          raise RuntimeError(
              "The dst_rank '{}' is self. Consider "
              "doing GPU to GPU memcpy instead.".format(dst_rank)
          )
      if n_elements < 0:
          raise RuntimeError("The n_elements '{}' should be >= 0.".format(n_elements))
      opts = types.SendOptions()
      opts.dst_rank = dst_rank
      opts.dst_gpu_index = dst_gpu_index
      opts.n_elements = n_elements
      g.send([tensor], opts)
  def recv(tensor, src_rank: int, group_name: str = "default"):
      """Receive a tensor from a remote process synchronously.

      Args:
          tensor: the received tensor.
          src_rank: the rank of the source process.
          group_name: the name of the collective group.

      Returns:
          None

      Raises:
          RuntimeError: if ``src_rank`` is this process's own rank.
          ValueError: if ``src_rank`` is out of range.
      """
      _check_single_tensor_input(tensor)
      g = get_group_handle(group_name)
      _check_rank_valid(g, src_rank)
      if src_rank == g.rank:
          # This is the *source* rank; the old message called it the destination.
          raise RuntimeError("The source rank '{}' is self.".format(src_rank))
      opts = types.RecvOptions()
      opts.src_rank = src_rank
      g.recv([tensor], opts)
  def recv_multigpu(
      tensor,
      src_rank: int,
      src_gpu_index: int,
      group_name: str = "default",
      n_elements: int = 0,
  ):
      """Receive a tensor from a remote GPU synchronously.

      The function assumes each process owns >1 GPUs, and the sender
      process and receiver process have an equal number of GPUs.

      Args:
          tensor: The received tensor, located on a GPU.
          src_rank: The rank of the source process.
          src_gpu_index: The index of the source GPU on the src process.
          group_name: The name of the collective group.
          n_elements: if specified, receive the next n elements into the
              starting address of tensor.

      Returns:
          None

      Raises:
          RuntimeError: if CuPy is unavailable, ``src_rank`` is this process's
              own rank, or ``n_elements`` is negative.
          ValueError: if ``src_rank`` is out of range.
      """
      if not types.cupy_available():
          raise RuntimeError("recv_multigpu call requires NCCL.")
      _check_single_tensor_input(tensor)
      g = get_group_handle(group_name)
      _check_rank_valid(g, src_rank)
      if src_rank == g.rank:
          raise RuntimeError(
              "The src_rank '{}' is self. Consider "
              "doing GPU to GPU memcpy instead.".format(src_rank)
          )
      if n_elements < 0:
          raise RuntimeError("The n_elements '{}' should be >= 0.".format(n_elements))
      opts = types.RecvOptions()
      opts.src_rank = src_rank
      opts.src_gpu_index = src_gpu_index
      opts.n_elements = n_elements
      g.recv([tensor], opts)
  def synchronize(gpu_id: int):
      """Synchronize the calling process with CUDA device ``gpu_id`` via CuPy.

      Args:
          gpu_id: the GPU device id to synchronize.

      Returns:
          None
      """
      if not types.cupy_available():
          raise RuntimeError("synchronize call requires CUDA and NCCL.")
      import cupy as cp

      device = cp.cuda.Device(gpu_id)
      device.synchronize()
  def get_group_handle(group_name: str = "default"):
      """Return the handle of a collective group, creating it lazily if needed.

      If the group is not yet registered in this process, the function first
      looks for a ``"info_" + group_name`` actor (as written by
      ``create_collective_group``). If that lookup fails with ValueError, it
      falls back to the ``collective_*`` environment variables when
      ``collective_group_name`` matches ``group_name``.

      Args:
          group_name: the name of the collective group.

      Returns:
          The collective group handle.

      Raises:
          RuntimeError: if the group cannot be found or created.
      """
      _check_inside_actor()
      with _group_mgr_lock:
          if not _group_mgr.is_group_exist(group_name):
              # try loading from remote info store
              try:
                  # if the information is stored in an Info object,
                  # get and create the group.
                  name = "info_" + group_name
                  mgr = ray.get_actor(name=name)
                  ids, world_size, rank, backend, gloo_timeout = ray.get(
                      mgr.get_info.remote()
                  )
                  worker = ray._private.worker.global_worker
                  id_ = worker.core_worker.get_actor_id()
                  r = rank[ids.index(id_)]
                  _group_mgr.create_collective_group(
                      backend, world_size, r, group_name, gloo_timeout
                  )
              except ValueError as exc:
                  # check if this group is initialized using options()
                  if (
                      "collective_group_name" in os.environ
                      and os.environ["collective_group_name"] == group_name
                  ):
                      rank = int(os.environ["collective_rank"])
                      world_size = int(os.environ["collective_world_size"])
                      backend = os.environ["collective_backend"]
                      # Environment values are strings; the timeout is used
                      # numerically (milliseconds), so convert it like the
                      # other numeric settings above.
                      gloo_timeout = int(os.getenv("collective_gloo_timeout", 30000))
                      _group_mgr.create_collective_group(
                          backend, world_size, rank, group_name, gloo_timeout
                      )
                  else:
                      raise RuntimeError(
                          "The collective group '{}' is not "
                          "initialized in the process.".format(group_name)
                      ) from exc
          g = _group_mgr.get_group_by_name(group_name)
          return g
  651. def _check_single_tensor_input(tensor):
  652. """Check if the tensor is with a supported type."""
  653. if isinstance(tensor, np.ndarray):
  654. return
  655. if types.cupy_available():
  656. if isinstance(tensor, types.cp.ndarray):
  657. return
  658. if types.torch_available():
  659. if isinstance(tensor, types.th.Tensor):
  660. return
  661. raise RuntimeError(
  662. "Unrecognized tensor type '{}'. Supported types are: "
  663. "np.ndarray, torch.Tensor, cupy.ndarray.".format(type(tensor))
  664. )
  def _check_backend_availability(backend: types.Backend):
      """Raise RuntimeError if the library behind ``backend`` is unavailable.

      GLOO is served by torch.distributed (pygloo is deprecated); NCCL is
      checked through nccl_available(). Other backends are not checked.
      """
      if backend == types.Backend.GLOO:
          if not torch_distributed_available():
              raise RuntimeError("torch.distributed is not available.")
          return
      if backend == types.Backend.NCCL and not nccl_available():
          raise RuntimeError("NCCL is not available.")
  def _check_inside_actor():
      """Raise RuntimeError unless the global worker is in ``ray.WORKER_MODE``."""
      current_mode = ray._private.worker.global_worker.mode
      if not current_mode == ray.WORKER_MODE:
          raise RuntimeError(
              "The collective APIs shall be only used inside a Ray actor or task."
          )
  683. def _check_rank_valid(g, rank: int):
  684. """Check the rank: 0 <= rank < world_size."""
  685. if rank < 0:
  686. raise ValueError("rank '{}' is negative.".format(rank))
  687. if rank >= g.world_size:
  688. raise ValueError(
  689. "rank '{}' must be less than world size '{}'".format(rank, g.world_size)
  690. )
  691. def _check_tensor_list_input(tensor_list):
  692. """Check if the input is a list of supported tensor types."""
  693. if not isinstance(tensor_list, list):
  694. raise RuntimeError(
  695. "The input must be a list of tensors. "
  696. "Got '{}'.".format(type(tensor_list))
  697. )
  698. if not tensor_list:
  699. raise RuntimeError("Got an empty list of tensors.")
  700. for t in tensor_list:
  701. _check_single_tensor_input(t)
  702. def _check_tensor_lists_input(tensor_lists):
  703. """Check if the input is a list of lists of supported tensor types."""
  704. if not isinstance(tensor_lists, list):
  705. raise RuntimeError(
  706. "The input must be a list of lists of tensors. "
  707. "Got '{}'.".format(type(tensor_lists))
  708. )
  709. if not tensor_lists:
  710. raise RuntimeError(f"Did not receive tensors. Got: {tensor_lists}")
  711. for t in tensor_lists:
  712. _check_tensor_list_input(t)
  713. def _check_root_tensor_valid(length, root_tensor):
  714. """Check the root_tensor device is 0 <= root_tensor < length"""
  715. if root_tensor < 0:
  716. raise ValueError("root_tensor '{}' is negative.".format(root_tensor))
  717. if root_tensor >= length:
  718. raise ValueError(
  719. "root_tensor '{}' is greater than the number of GPUs: "
  720. "'{}'".format(root_tensor, length)
  721. )