distributed.py 109 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472
  1. # mypy: allow-untyped-defs
  2. import copy
  3. import functools
  4. import inspect
  5. import itertools
  6. import logging
  7. import os
  8. import sys
  9. import warnings
  10. import weakref
  11. from collections import defaultdict, deque
  12. from collections.abc import Callable
  13. from contextlib import contextmanager
  14. from dataclasses import dataclass, fields, is_dataclass
  15. from enum import auto, Enum
  16. from typing import Any, Optional, TYPE_CHECKING
  17. import torch
  18. import torch.distributed as dist
  19. from torch._utils import _get_device_index
  20. from torch.autograd import Function, Variable
  21. from torch.distributed.algorithms.join import Join, Joinable, JoinHook
  22. from torch.nn.modules import Module
  23. from torch.nn.parallel.scatter_gather import gather, scatter_kwargs
  24. from torch.utils._pytree import tree_flatten, tree_unflatten
  25. RPC_AVAILABLE = False
  26. if dist.is_available():
  27. from torch.distributed.distributed_c10d import (
  28. _get_default_group,
  29. _rank_not_in_group,
  30. ReduceOp,
  31. )
  32. from torch.distributed.utils import (
  33. _alloc_storage,
  34. _cast_forward_inputs,
  35. _free_storage,
  36. _sync_module_states,
  37. _to_kwargs,
  38. _verify_param_shape_across_processes,
  39. )
  40. if dist.rpc.is_available():
  41. RPC_AVAILABLE = True
  42. from torch.distributed.rpc import RRef
  43. if TYPE_CHECKING:
  44. from torch.utils.hooks import RemovableHandle
  45. __all__ = ["DistributedDataParallel"]
  46. logger = logging.getLogger(__name__)
  47. @dataclass
  48. class _MixedPrecision:
  49. """
  50. This configures DDP-native mixed precision training.
  51. Attributes:
  52. param_dtype (torch.dtype): This specifies the dtype for model
  53. parameters, inputs (when ``cast_forward_inputs`` is set to
  54. ``True``), and therefore the dtype for computation.
  55. However, outside the forward and backward passes, parameters are in
  56. full precision. Model checkpointing always happens in full
  57. precision.
  58. reduce_dtype (torch.dtype): This specifies the dtype for gradient
  59. reduction, which is permitted to differ from ``param_dtype``.
  60. buffer_dtype (torch.dtype): This specifies the dtype for buffers.
  61. .. note:: This API is experimental and subject to change.
  62. .. note:: Only floating point tensors are cast to their specified dtypes.
  63. .. note:: ``state_dict`` checkpoints parameters and buffers in full
  64. precision.
  65. .. note:: Each low precision dtype must be specified explicitly. For
  66. example, ``_MixedPrecision(reduce_dtype=torch.float16)`` only specifies
  67. the reduction dtype to be low precision, and DDP will not cast
  68. parameters or buffers.
  69. .. note:: If a ``reduce_dtype`` is not specified, then gradient reduction
  70. happens in ``param_dtype`` if specified or the original parameter dtype
  71. otherwise. For example, ``_MixedPrecision(param_dtype=torch.float16)``
  72. would result in communication occurring in fp16.
  73. """
  74. param_dtype: torch.dtype | None = None
  75. reduce_dtype: torch.dtype | None = None
  76. buffer_dtype: torch.dtype | None = None
  77. # TODO (rohan-varma): keep_low_precision_grads: bool = False
  78. # TODO (rohan-varma): APIs to allow users to run batchnorm and layernorm
  79. # in full precision. For DDP, this can be implemented by not performing the
  80. # parameter cast for BN and LN units.
  81. def _cast_buffers(mixed_precision_config, root_module):
  82. """Casts buffers to the given ``buffer_dtype``."""
  83. for buf in root_module.buffers():
  84. if hasattr(buf, "_ddp_ignored") and buf._ddp_ignored:
  85. continue
  86. buf.data = buf.to(dtype=mixed_precision_config.buffer_dtype)
  87. def _setup_mixed_precision_params(mixed_precision_config, root_module):
  88. """Create and free storage for the mixed precision parameters."""
  89. for param in root_module.parameters():
  90. # Do not setup mixed precision for DDP ignored parameters.
  91. if hasattr(param, "_ddp_ignored") and param._ddp_ignored:
  92. continue
  93. if not hasattr(param, "_mp_param"):
  94. param._mp_param = torch.zeros_like(
  95. param,
  96. device=param.device,
  97. dtype=mixed_precision_config.param_dtype,
  98. requires_grad=param.requires_grad,
  99. )
  100. _free_storage(param._mp_param)
  101. # _fp_param will point to the full precision param so it can be switched
  102. # back to at the end of forward / backward.
  103. param._fp_param = param.data
  104. def _tree_flatten_with_rref(output):
  105. output_is_rref = RPC_AVAILABLE and isinstance(output, RRef)
  106. if output_is_rref:
  107. output_tensor_list, treespec = tree_flatten(output.local_value())
  108. else:
  109. output_tensor_list, treespec = tree_flatten(output)
  110. # Need to return flattened tensors, spec to re-pack them, as well
  111. # as if the return type was actually an RRef to reconstruct.
  112. return output_tensor_list, treespec, output_is_rref
  113. def _tree_unflatten_with_rref(output, treespec, output_is_rref):
  114. output = tree_unflatten(output, treespec)
  115. if output_is_rref:
  116. output = RRef(output)
  117. return output
  118. def _find_tensors(obj):
  119. r"""Recursively find all tensors contained in the specified object."""
  120. if RPC_AVAILABLE and isinstance(obj, RRef):
  121. # If the current node is the owner of the RRef, unwrap it and try to
  122. # find Tensors.
  123. # TODO: Expand to remote RRefs.
  124. if obj.is_owner():
  125. return _find_tensors(obj.local_value())
  126. if isinstance(obj, torch.Tensor):
  127. return [obj]
  128. if isinstance(obj, (list, tuple)):
  129. return itertools.chain.from_iterable(map(_find_tensors, obj))
  130. if isinstance(obj, dict):
  131. return itertools.chain.from_iterable(map(_find_tensors, obj.values()))
  132. if is_dataclass(obj):
  133. return itertools.chain.from_iterable(
  134. map(_find_tensors, (getattr(obj, f.name) for f in fields(obj)))
  135. )
  136. return []
  137. def _dump_DDP_relevant_env_vars():
  138. relevant_env_vars = [
  139. "RANK",
  140. "LOCAL_RANK",
  141. "WORLD_SIZE",
  142. "MASTER_PORT",
  143. "MASTER_ADDR",
  144. "CUDA_VISIBLE_DEVICES",
  145. "GLOO_SOCKET_IFNAME",
  146. "GLOO_DEVICE_TRANSPORT",
  147. "NCCL_SOCKET_IFNAME",
  148. "TORCH_NCCL_BLOCKING_WAIT",
  149. "NCCL_DEBUG",
  150. "NCCL_DEBUG_SUBSYS",
  151. "NCCL_IB_DISABLE",
  152. # More NCCL env vars:
  153. "NCCL_P2P_DISABLE",
  154. "NCCL_P2P_LEVEL",
  155. "NCCL_SHM_DISABLE",
  156. "NCCL_SOCKET_NTHREADS",
  157. "NCCL_NSOCKS_PERTHREAD",
  158. "NCCL_BUFFSIZE",
  159. "NCCL_NTHREADS",
  160. "NCCL_RINGS",
  161. "NCCL_MAX_NCHANNELS",
  162. "NCCL_MIN_NCHANNELS",
  163. "NCCL_CHECKS_DISABLE",
  164. "NCCL_CHECK_POINTERS",
  165. "NCCL_LAUNCH_MODE",
  166. "NCCL_IB_HCA",
  167. "NCCL_IB_TIMEOUT",
  168. "NCCL_IB_RETRY_CNT",
  169. "NCCL_IB_GID_INDEX",
  170. "NCCL_IB_SL",
  171. "NCCL_IB_TC",
  172. "NCCL_IB_AR_THRESHOLD",
  173. "NCCL_IB_CUDA_SUPPORT",
  174. "NCCL_NET_GDR_LEVEL",
  175. "NCCL_NET_GDR_READ",
  176. "NCCL_SINGLE_RING_THRESHOLD",
  177. "NCCL_LL_THRESHOLD",
  178. "NCCL_TREE_THRESHOLD",
  179. "NCCL_ALGO",
  180. "NCCL_PROTO",
  181. "NCCL_IGNORE_CPU_AFFINITY",
  182. "NCCL_DEBUG_FILE",
  183. "NCCL_COLLNET_ENABLE",
  184. "NCCL_TOPO_FILE",
  185. "NCCL_TOPO_DUMP_FILE",
  186. "TORCH_NCCL_ASYNC_ERROR_HANDLING",
  187. ]
  188. formatted_output = ""
  189. for var in relevant_env_vars:
  190. value = os.environ.get(var, "N/A")
  191. formatted_output += f"env:{var}={value}\n"
  192. print(formatted_output)
  193. class _BufferCommHookLocation(Enum):
  194. PRE_FORWARD = auto()
  195. POST_FORWARD = auto()
  196. @dataclass
  197. class _BufferCommHook:
  198. buffer_comm_hook: Callable
  199. buffer_comm_hook_state: Any
  200. buffer_comm_hook_location: _BufferCommHookLocation
  201. # Add a DDPSink to run various functions when backwards starts, such as
  202. # queueing call back of out-most backward/graph task,
  203. # this helps call back is fired after all gradients' calculation
  204. # is completed.
  205. class _DDPSink(Function):
  206. @staticmethod
  207. # pyrefly: ignore [bad-override]
  208. def forward(ctx, ddp_weakref, *inputs):
  209. # set_materialize_grads(False) will ensure that None gradients stay as
  210. # None and are not filled with zeros.
  211. ctx.set_materialize_grads(False)
  212. ctx.ddp_weakref = ddp_weakref
  213. ret = inputs
  214. if ddp_weakref()._ddp_sink_clone:
  215. ret = tuple(
  216. inp.clone() if isinstance(inp, torch.Tensor) else inp for inp in inputs
  217. )
  218. return ret
  219. @staticmethod
  220. def backward(ctx, *grad_outputs):
  221. # Enqueue delay allreduce for static graph training on the first
  222. # iteration.
  223. ddp_weakref = ctx.ddp_weakref()
  224. reducer = ddp_weakref.reducer
  225. static_graph = ddp_weakref.static_graph
  226. delay_ar_enqueued = (
  227. static_graph and ddp_weakref._static_graph_delay_allreduce_enqueued
  228. )
  229. if static_graph and not delay_ar_enqueued:
  230. Variable._execution_engine.queue_callback( # type: ignore[call-arg,misc]
  231. reducer._delay_all_reduce
  232. )
  233. ddp_weakref._static_graph_delay_allreduce_enqueued = True
  234. return (None, *grad_outputs)
  235. class _DDPJoinHook(JoinHook):
  236. def __init__(self, ddp, divide_by_initial_world_size):
  237. """Set config variables for internal usage."""
  238. if not isinstance(ddp, DistributedDataParallel):
  239. raise AssertionError(
  240. "DDP join hook requires passing in a DistributedDataParallel "
  241. f"instance as the state, got {type(ddp).__name__}"
  242. )
  243. if ddp.logger is None:
  244. raise AssertionError("ddp.logger must not be None")
  245. ddp.logger._set_uneven_input_join()
  246. self.ddp = ddp
  247. self.ddp._divide_by_initial_world_size = divide_by_initial_world_size
  248. super().__init__()
  249. def main_hook(self):
  250. """Shadow the DDP collective communication operations in the forward and backward passes."""
  251. ddp = self.ddp
  252. # Buckets are rebuilt only once during a training period
  253. ddp.reducer._rebuild_buckets()
  254. # Schedule a broadcast if we are syncing module buffers in the
  255. # forward pass
  256. # TODO: make DDP uneven inputs context manager support buffer
  257. # comm hook (https://github.com/pytorch/pytorch/issues/65436)
  258. ddp._check_and_sync_module_buffers()
  259. # Check if need to sync in the backward pass
  260. should_sync_backwards = ddp._check_global_requires_backward_grad_sync(
  261. is_joined_rank=True
  262. )
  263. # Forward parameter sync is disabled in the next iteration if we
  264. # are skipping gradient sync this iteration, so set
  265. # `require_forward_param_sync` accordingly
  266. ddp.require_forward_param_sync = should_sync_backwards
  267. if not should_sync_backwards:
  268. return
  269. # Schedule one allreduce per gradient bucket to match the backward
  270. # pass allreduce
  271. ddp._match_all_reduce_for_bwd_pass()
  272. # Check if we need to allreduce locally unused parameters
  273. if ddp.find_unused_parameters:
  274. ddp._match_unused_params_allreduce()
  275. # Rebuilt parameters are pushed only once during a training period
  276. ddp.reducer._push_all_rebuilt_params()
  277. def post_hook(self, is_last_joiner: bool):
  278. """Sync the final model to ensure that the model is the same across all processes."""
  279. self.ddp._sync_final_model(is_last_joiner)
  280. class DistributedDataParallel(Module, Joinable):
  281. r"""Implement distributed data parallelism based on ``torch.distributed`` at module level.
  282. This container provides data parallelism by synchronizing gradients
  283. across each model replica. The devices to synchronize across are
  284. specified by the input ``process_group``, which is the entire world
  285. by default. Note that ``DistributedDataParallel`` does not chunk or
  286. otherwise shard the input across participating GPUs; the user is
  287. responsible for defining how to do so, for example through the use
  288. of a :class:`DistributedSampler`.
  289. See also: :ref:`distributed-basics` and :ref:`cuda-nn-ddp-instead`.
  290. The same constraints on input as in :class:`torch.nn.DataParallel` apply.
  291. Creation of this class requires that ``torch.distributed`` to be already
  292. initialized, by calling :func:`torch.distributed.init_process_group`.
  293. ``DistributedDataParallel`` is proven to be significantly faster than
  294. :class:`torch.nn.DataParallel` for single-node multi-GPU data
  295. parallel training.
  296. To use ``DistributedDataParallel`` on a host with N GPUs, you should spawn
  297. up ``N`` processes, ensuring that each process exclusively works on a single
  298. GPU from 0 to N-1. This can be done by either setting
  299. ``CUDA_VISIBLE_DEVICES`` for every process or by calling the following API for GPUs,
  300. >>> # xdoctest: +SKIP("undefined variables")
  301. >>> torch.cuda.set_device(i)
  302. or calling the unified API for :ref:`accelerator<accelerators>`,
  303. >>> # xdoctest: +SKIP("undefined variables")
  304. >>> torch.accelerator.set_device_index(i)
  305. where i is from 0 to N-1. In each process, you should refer the following
  306. to construct this module:
  307. >>> # xdoctest: +SKIP("undefined variables")
  308. >>> if torch.accelerator.is_available():
  309. >>> device_type = torch.accelerator.current_accelerator().type
  310. >>> vendor_backend = torch.distributed.get_default_backend_for_device(device_type)
  311. >>>
  312. >>> torch.distributed.init_process_group(
  313. >>> backend=vendor_backend, world_size=N, init_method='...'
  314. >>> )
  315. >>> model = DistributedDataParallel(model, device_ids=[i], output_device=i)
  316. Or you can use the latest API for initialization:
  317. >>> torch.distributed.init_process_group(device_id=i)
  318. In order to spawn up multiple processes per node, you can use either
  319. ``torch.distributed.launch`` or ``torch.multiprocessing.spawn``.
  320. .. note::
  321. Please refer to `PyTorch Distributed Overview <https://pytorch.org/tutorials/beginner/dist_overview.html>`__
  322. for a brief introduction to all features related to distributed training.
  323. .. note::
  324. ``DistributedDataParallel`` can be used in conjunction with
  325. :class:`torch.distributed.optim.ZeroRedundancyOptimizer` to reduce
  326. per-rank optimizer states memory footprint. Please refer to
  327. `ZeroRedundancyOptimizer recipe <https://pytorch.org/tutorials/recipes/zero_redundancy_optimizer.html>`__
  328. for more details.
  329. .. note:: ``nccl`` backend is currently the fastest and highly recommended
  330. backend when using GPUs. This applies to both single-node and
  331. multi-node distributed training.
  332. .. note:: This module also supports mixed-precision distributed training.
  333. This means that your model can have different types of parameters such
  334. as mixed types of ``fp16`` and ``fp32``, the gradient reduction on these
  335. mixed types of parameters will just work fine.
  336. .. note:: If you use ``torch.save`` on one process to checkpoint the module,
  337. and ``torch.load`` on some other processes to recover it, make sure that
  338. ``map_location`` is configured properly for every process. Without
  339. ``map_location``, ``torch.load`` would recover the module to devices
  340. where the module was saved from.
  341. .. note:: When a model is trained on ``M`` nodes with ``batch=N``, the
  342. gradient will be ``M`` times smaller when compared to the same model
  343. trained on a single node with ``batch=M*N`` if the loss is summed (NOT
  344. averaged as usual) across instances in a batch (because the gradients
  345. between different nodes are averaged). You should take this into
  346. consideration when you want to obtain a mathematically equivalent
  347. training process compared to the local training counterpart. But in most
  348. cases, you can just treat a DistributedDataParallel wrapped model, a
  349. DataParallel wrapped model and an ordinary model on a single GPU as the
  350. same (E.g. using the same learning rate for equivalent batch size).
  351. .. note::
  352. Parameters are never broadcast between processes. The module performs
  353. an all-reduce step on gradients and assumes that they will be modified
  354. by the optimizer in all processes in the same way. Buffers
  355. (e.g. BatchNorm stats) are broadcast from the module in process of rank
  356. 0, to all other replicas in the system in every iteration.
  357. .. note::
  358. If you are using DistributedDataParallel in conjunction with the
  359. :ref:`distributed-rpc-framework`, you should always use
  360. :meth:`torch.distributed.autograd.backward` to compute gradients and
  361. :class:`torch.distributed.optim.DistributedOptimizer` for optimizing
  362. parameters.
  363. Example::
  364. >>> # xdoctest: +SKIP("undefined variables")
  365. >>> import torch.distributed.autograd as dist_autograd
  366. >>> from torch.nn.parallel import DistributedDataParallel as DDP
  367. >>> import torch
  368. >>> from torch import optim
  369. >>> from torch.distributed.optim import DistributedOptimizer
  370. >>> import torch.distributed.rpc as rpc
  371. >>> from torch.distributed.rpc import RRef
  372. >>>
  373. >>> t1 = torch.rand((3, 3), requires_grad=True)
  374. >>> t2 = torch.rand((3, 3), requires_grad=True)
  375. >>> rref = rpc.remote("worker1", torch.add, args=(t1, t2))
  376. >>> ddp_model = DDP(my_model)
  377. >>>
  378. >>> # Setup optimizer
  379. >>> optimizer_params = [rref]
  380. >>> for param in ddp_model.parameters():
  381. >>> optimizer_params.append(RRef(param))
  382. >>>
  383. >>> dist_optim = DistributedOptimizer(
  384. >>> optim.SGD,
  385. >>> optimizer_params,
  386. >>> lr=0.05,
  387. >>> )
  388. >>>
  389. >>> with dist_autograd.context() as context_id:
  390. >>> pred = ddp_model(rref.to_here())
  391. >>> loss = loss_func(pred, target)
  392. >>> dist_autograd.backward(context_id, [loss])
  393. >>> dist_optim.step(context_id)
  394. .. note::
  395. DistributedDataParallel currently offers limited support for gradient
  396. checkpointing with :meth:`torch.utils.checkpoint`.
  397. If the checkpoint is done with use_reentrant=False (recommended), DDP
  398. will work as expected without any limitations.
  399. If, however, the checkpoint is done with use_reentrant=True (the default),
  400. DDP will work as expected when there are no unused parameters in the model
  401. and each layer is checkpointed at most once (make sure you are not passing
  402. `find_unused_parameters=True` to DDP). We currently do not support the
  403. case where a layer is checkpointed multiple times, or when there unused
  404. parameters in the checkpointed model.
  405. .. note::
  406. To let a non-DDP model load a state dict from a DDP model,
  407. :meth:`~torch.nn.modules.utils.consume_prefix_in_state_dict_if_present`
  408. needs to be applied to strip the prefix "module." in the DDP state dict before loading.
  409. .. warning::
  410. Constructor, forward method, and differentiation of the output (or a
  411. function of the output of this module) are distributed synchronization
  412. points. Take that into account in case different processes might be
  413. executing different code.
  414. .. warning::
  415. This module assumes all parameters are registered in the model by the
  416. time it is created. No parameters should be added nor removed later.
  417. Same applies to buffers.
  418. .. warning::
  419. This module assumes all parameters are registered in the model of each
  420. distributed processes are in the same order. The module itself will
  421. conduct gradient ``allreduce`` following the reverse order of the
  422. registered parameters of the model. In other words, it is users'
  423. responsibility to ensure that each distributed process has the exact
  424. same model and thus the exact same parameter registration order.
  425. .. warning::
  426. This module allows parameters with non-rowmajor-contiguous strides.
  427. For example, your model may contain some parameters whose
  428. :class:`torch.memory_format` is ``torch.contiguous_format``
  429. and others whose format is ``torch.channels_last``. However,
  430. corresponding parameters in different processes must have the
  431. same strides.
  432. .. warning::
  433. This module doesn't work with :func:`torch.autograd.grad` (i.e. it will
  434. only work if gradients are to be accumulated in ``.grad`` attributes of
  435. parameters).
  436. .. warning::
  437. If you plan on using this module with a ``nccl`` backend or a ``gloo``
  438. backend (that uses Infiniband), together with a DataLoader that uses
  439. multiple workers, please change the multiprocessing start method to
  440. ``forkserver`` (Python 3 only) or ``spawn``. Unfortunately
  441. Gloo (that uses Infiniband) and NCCL2 are not fork safe, and you will
  442. likely experience deadlocks if you don't change this setting.
  443. .. warning::
  444. You should never try to change your model's parameters after wrapping
  445. up your model with ``DistributedDataParallel``. Because, when
  446. wrapping up your model with ``DistributedDataParallel``, the constructor
  447. of ``DistributedDataParallel`` will register the additional gradient
  448. reduction functions on all the parameters of the model itself at the
  449. time of construction. If you change the model's parameters afterwards,
  450. gradient reduction functions no longer match the correct set of
  451. parameters.
  452. .. warning::
  453. Using ``DistributedDataParallel`` in conjunction with the
  454. :ref:`distributed-rpc-framework` is experimental and subject to change.
  455. Args:
  456. module (Module): module to be parallelized
  457. device_ids (list of int or torch.device): CUDA devices.
  458. 1) For single-device modules, ``device_ids`` can
  459. contain exactly one device id, which represents the only
  460. CUDA device where the input module corresponding to this process resides.
  461. Alternatively, ``device_ids`` can also be ``None``.
  462. 2) For multi-device modules and CPU modules,
  463. ``device_ids`` must be ``None``.
  464. When ``device_ids`` is ``None`` for both cases,
  465. both the input data for the forward pass and the actual module
  466. must be placed on the correct device.
  467. (default: ``None``)
  468. output_device (int or torch.device): Device location of output for
  469. single-device CUDA modules. For multi-device modules and
  470. CPU modules, it must be ``None``, and the module itself
  471. dictates the output location. (default: ``device_ids[0]``
  472. for single-device modules)
  473. broadcast_buffers (bool): Flag that enables syncing (broadcasting)
  474. buffers of the module at beginning of the ``forward``
  475. function. (default: ``True``)
  476. init_sync (bool): Whether to sync during initialization to verify param
  477. shapes and broadcast parameters and buffers.
  478. WARNING: if this is set to False the user is required
  479. to ensure themselves that the weights are the same on
  480. all ranks.
  481. (default: ``True``)
  482. process_group: The process group to be used for distributed data
  483. all-reduction. If ``None``, the default process group, which
  484. is created by :func:`torch.distributed.init_process_group`,
  485. will be used. (default: ``None``)
  486. bucket_cap_mb: ``DistributedDataParallel`` will bucket parameters into
  487. multiple buckets so that gradient reduction of each
  488. bucket can potentially overlap with backward computation.
  489. :attr:`bucket_cap_mb` controls the bucket size in
  490. MebiBytes (MiB). If ``None``, a default size of 25 MiB
  491. will be used. (default: ``None``)
  492. find_unused_parameters (bool): Traverse the autograd graph from all
  493. tensors contained in the return value of the
  494. wrapped module's ``forward`` function. Parameters
  495. that don't receive gradients as part of this
  496. graph are preemptively marked as being ready to
  497. be reduced. In addition, parameters that may have
  498. been used in the wrapped module's ``forward``
  499. function but were not part of loss computation and
  500. thus would also not receive gradients are
  501. preemptively marked as ready to be reduced.
  502. (default: ``False``)
  503. check_reduction: This argument is deprecated.
  504. gradient_as_bucket_view (bool): When set to ``True``, gradients will be views
  505. pointing to different offsets of ``allreduce`` communication
  506. buckets. This can reduce peak memory usage, where the
  507. saved memory size will be equal to the total gradients
  508. size. Moreover, it avoids the overhead of copying between
  509. gradients and ``allreduce`` communication buckets. When
  510. gradients are views, ``detach_()`` cannot be called on the
  511. gradients. If hitting such errors, please fix it by
  512. referring to the :meth:`~torch.optim.Optimizer.zero_grad`
  513. function in ``torch/optim/optimizer.py`` as a solution.
  514. Note that gradients will be views after first iteration, so
  515. the peak memory saving should be checked after first iteration.
  516. static_graph (bool): When set to ``True``, DDP knows the trained graph is
  517. static. Static graph means 1) The set of used and unused
  518. parameters will not change during the whole training loop; in
  519. this case, it does not matter whether users set
  520. ``find_unused_parameters = True`` or not. 2) How the graph is trained
  521. will not change during the whole training loop (meaning there is
  522. no control flow depending on iterations).
  When static_graph is set to be ``True``, DDP will support cases that
  could not be supported in the past:
  525. 1) Reentrant backwards.
  526. 2) Activation checkpointing multiple times.
  527. 3) Activation checkpointing when model has unused parameters.
  528. 4) There are model parameters that are outside of forward function.
  529. 5) Potentially improve performance when there are unused parameters,
  530. as DDP will not search graph in each iteration to detect unused
  531. parameters when static_graph is set to be ``True``.
  532. To check whether you can set static_graph to be ``True``, one way is to
  533. check ddp logging data at the end of your previous model training,
  534. if ``ddp_logging_data.get("can_set_static_graph") == True``, mostly you
  535. can set ``static_graph = True`` as well.
  536. Example::
  537. >>> # xdoctest: +SKIP("undefined variables")
  538. >>> model_DDP = torch.nn.parallel.DistributedDataParallel(model)
  539. >>> # Training loop
  540. >>> ...
  541. >>> ddp_logging_data = model_DDP._get_ddp_logging_data()
  542. >>> static_graph = ddp_logging_data.get("can_set_static_graph")
  delay_all_reduce_named_params (list of tuple of str and torch.nn.Parameter): a list
  of named parameters whose all-reduce will be delayed until the gradient of
  the parameter specified in ``param_to_hook_all_reduce`` is ready. Other
  arguments of DDP do not apply to named params specified in this argument
  as these named params will be ignored by the DDP reducer.
  548. param_to_hook_all_reduce (torch.nn.Parameter): a parameter to hook delayed all reduce
  549. of parameters specified in ``delay_all_reduce_named_params``.
  550. skip_all_reduce_unused_params: When set to True, DDP will skip reducing unused parameters.
  551. This requires that unused parameters remain the same across all ranks throughout
  552. the entire training process. If this condition is not met, it may cause
  553. desynchronization and result in training hang.
  554. Attributes:
  555. module (Module): the module to be parallelized.
  556. Example::
  557. >>> # xdoctest: +SKIP("undefined variables")
  558. >>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...')
  559. >>> net = torch.nn.parallel.DistributedDataParallel(model)
  560. """
  # Class-level marker used to track whether the given thread is inside ddp
  # forward, for torchdynamo purposes. It defaults to ``None``; the code that
  # assigns it is not part of this section of the file.
  _active_ddp_module: Optional["DistributedDataParallel"] = None
  def __init__(
      self,
      module,
      device_ids=None,
      output_device=None,
      dim=0,
      broadcast_buffers=True,
      init_sync=True,
      process_group=None,
      bucket_cap_mb=None,
      find_unused_parameters=False,
      check_reduction=False,
      gradient_as_bucket_view=False,
      static_graph=False,
      delay_all_reduce_named_params=None,
      param_to_hook_all_reduce=None,
      mixed_precision: _MixedPrecision | None = None,
      device_mesh=None,
      skip_all_reduce_unused_params=False,
      bucket_cap_mb_list: Optional[list[int]] = None,
  ):
      """
      Validate the arguments and set up the DDP state for ``module``.

      In order, the constructor:
        1. resolves ``self.process_group`` from ``process_group`` /
           ``device_mesh`` (or the default group when both are ``None``);
        2. collects the ignored and delayed-all-reduce parameters and builds
           ``self._module_parameters`` from the remaining ones;
        3. validates ``device_ids`` / ``output_device`` against the devices
           of those parameters;
        4. resolves the bucket size(s) in bytes;
        5. registers the delayed all-reduce hook when requested, and returns
           early if every parameter requiring grad is delayed;
        6. optionally verifies/syncs module state across ranks (``init_sync``),
           builds the reducer via ``_ddp_init_helper`` and installs the mixed
           precision hooks when ``mixed_precision`` is given.

      Raises:
          ValueError: for inconsistent delayed-all-reduce arguments, more than
              one entry in ``device_ids``, parameters on mixed device types,
              or ``device_ids``/``output_device`` given for a CPU or
              multi-device module.
          RuntimeError: when both ``process_group`` and ``device_mesh`` are
              given, the mesh is not 1D, no managed parameter requires a
              gradient (and nothing is delayed), or a parameter is
              uninitialized.
          AssertionError: when ``bucket_cap_mb_list`` is combined with the
              python reducer, or the resolved ``bucket_cap_mb`` is falsy.
      """
      super().__init__()
      Joinable.__init__(self)
      self._use_python_reducer = (
          torch._dynamo.utils.get_optimize_ddp_mode() == "python_reducer"
      )
      # The logger stays None until _ddp_init_helper creates it; until then
      # _log_and_throw only raises.
      self.logger: dist.Logger | None = None
      # The two delayed-all-reduce arguments are only meaningful together.
      if bool(delay_all_reduce_named_params is not None) != bool(
          param_to_hook_all_reduce is not None
      ):
          self._log_and_throw(
              ValueError,
              "delay_all_reduce_named_params and param_to_hook_all_reduce "
              "need to be set at the same time.",
          )

      if process_group and device_mesh is not None:
          raise RuntimeError(
              "Cannot specify both process_group and device_mesh arguments."
          )
      elif process_group is None and device_mesh is None:
          self.process_group = _get_default_group()
      elif device_mesh is None:
          # pyrefly: ignore [bad-assignment]
          self.process_group = process_group
      else:
          if device_mesh.ndim != 1:
              raise RuntimeError(
                  f"Only 1D device mesh is supported, but got {device_mesh}."
              )
          self.device_mesh = device_mesh
          self.process_group = device_mesh.get_group(mesh_dim=0)

          root_mesh = device_mesh._get_root_mesh()
          # if a root mesh is not the same as device_mesh,
          # meaning the device_mesh is sliced out from the root mesh.
          if root_mesh != device_mesh:
              # TODO: This is a temporary work around to enable DDP + TP.
              # We should do the logic in DDP so that the 2D implementation is
              # sound and the state_dict works out of the box.
              # This has to be done before check UninitializedParameter.
              from torch.distributed.tensor.parallel.ddp import (
                  _pre_dp_module_transform,
              )

              _pre_dp_module_transform(module)

      self._delay_all_reduce_params = []
      if hasattr(module, "_ddp_params_and_buffers_to_ignore"):
          self.parameters_to_ignore = set(module._ddp_params_and_buffers_to_ignore)
      else:
          self.parameters_to_ignore = set()
      # Delayed parameters are added to the ignore set by name, so they are
      # filtered out of self._module_parameters below.
      if delay_all_reduce_named_params is not None:
          for name, param in delay_all_reduce_named_params:
              self.parameters_to_ignore.add(name)
              self._delay_all_reduce_params.append(param)

      self._module_parameters = [
          p
          for n, p in module.named_parameters()
          if n not in self.parameters_to_ignore
      ]
      if not any(p.requires_grad for p in self._module_parameters):
          if len(self._delay_all_reduce_params):
              logger.info("Delay the AllReduce of all parameters.")
          else:
              self._log_and_throw(
                  RuntimeError,
                  "DistributedDataParallel is not needed when a module "
                  "doesn't have any parameter that requires a gradient.",
              )

      if device_ids is not None and len(device_ids) > 1:
          self._log_and_throw(
              ValueError,
              "device_ids can only be None or contain a single element.",
          )

      self.is_multi_device_module = (
          len({p.device for p in self._module_parameters}) > 1
      )
      distinct_device_types = {
          p.device.type for p in self._module_parameters if p.device is not None
      }
      if len(distinct_device_types) != 1:
          self._log_and_throw(
              ValueError,
              "DistributedDataParallel's input module must be on "
              f"the same type of devices, but input module parameters locate in {distinct_device_types}.",
          )

      self.device_type = next(iter(distinct_device_types))

      if (
          device_ids is None
          or len(device_ids) == 0  # For backward compatibility.
          or self.device_type == "cpu"
          or self.is_multi_device_module
      ):
          if device_ids or output_device:
              self._log_and_throw(
                  ValueError,
                  "DistributedDataParallel device_ids and output_device arguments "
                  "only work with single-device/multiple-device GPU modules or CPU modules, "
                  f"but got device_ids {device_ids}, output_device {output_device}, "
                  f"and module parameters { ({p.device for p in self._module_parameters}) }.",
              )

          self.device_ids = None
          self.output_device = None
      else:
          # pyrefly: ignore [bad-assignment]
          self.device_ids = [_get_device_index(x, True) for x in device_ids]

          if output_device is None:
              output_device = device_ids[0]

          # pyrefly: ignore [bad-assignment]
          self.output_device = _get_device_index(output_device, True)

      # Starts as False; self._set_static_graph() is called further down when
      # the ``static_graph`` argument is truthy.
      self.static_graph = False
      self.dim = dim
      self.module = module
      self.device = next(iter(self._module_parameters)).device
      self.broadcast_buffers = broadcast_buffers
      self.find_unused_parameters = find_unused_parameters
      self.require_backward_grad_sync = True
      self.require_forward_param_sync = True
      self.gradient_as_bucket_view = gradient_as_bucket_view
      self.mixed_precision = mixed_precision
      if self.mixed_precision is not None:
          logger.warning("Received mixed precision config %s", self.mixed_precision)

      if check_reduction:
          # This argument is no longer used since the reducer
          # will ensure reduction completes even if some parameters
          # do not receive gradients.
          warnings.warn(
              "The `check_reduction` argument in `DistributedDataParallel` "
              "module is deprecated. Please avoid using it.",
              FutureWarning,
              stacklevel=2,
          )

      # Check that a module does not have Uninitialized parameters
      for param in self._module_parameters:
          if isinstance(param, torch.nn.parameter.UninitializedParameter):
              self._log_and_throw(
                  RuntimeError,
                  "Modules with uninitialized parameters can't be used with `DistributedDataParallel`. "
                  "Run a dummy forward pass to correctly initialize the modules",
              )
      # used for intra-node param sync and inter-node sync as well
      self.broadcast_bucket_size = 250 * 1024 * 1024

      # reduction bucket size
      if bucket_cap_mb is None:
          # default case (bucket cap is 25 MiB)
          bucket_cap_mb = 25
          self.bucket_bytes_cap_default = True
      else:
          self.bucket_bytes_cap_default = False

      # Per-bucket caps in bytes; stays empty unless bucket_cap_mb_list is given.
      self.bucket_bytes_cap_list = []
      if bucket_cap_mb_list:
          if self._use_python_reducer:
              raise AssertionError(
                  "when using bucket_cap_mb_list, python reducer is not supported"
              )
          self.bucket_bytes_cap_list = [
              int(bucket_cap_mb * 1024 * 1024) for bucket_cap_mb in bucket_cap_mb_list
          ]
          # This is not supposed to be used later when custom bucket_cap_mb_list is passed,
          # just a safe placeholder for backward compatibility
          self.bucket_bytes_cap_default = False
          bucket_cap_mb = max(bucket_cap_mb_list)

      if not bucket_cap_mb:
          raise AssertionError("bucket_cap_mb should be set by now")
      self.bucket_bytes_cap = int(bucket_cap_mb * 1024 * 1024)

      # Whether to perform input tensor CPU to GPU copies on a side-stream
      self.use_side_stream_for_tensor_copies = (
          os.environ.get("PYTORCH_DDP_USE_SIDE_STREAM", "1") == "1"
      )

      # Initialize gradient buffers and register all reduce hook
      self._delay_grad_buffer: torch.Tensor | None = None
      self._delay_grad_views: list[torch.Tensor] = []
      self._delay_all_reduce_all_params = False
      if len(self._delay_all_reduce_params) != 0:
          self._register_delay_all_reduce_hook(
              bucket_cap_mb=bucket_cap_mb,
              param_to_hook_all_reduce=param_to_hook_all_reduce,
              device_ids=device_ids,
          )
          # Every parameter requiring grad has its all-reduce delayed: none
          # of the reducer setup below is performed.
          if self._delay_all_reduce_all_params:
              return

      self.skip_all_reduce_unused_params = skip_all_reduce_unused_params

      # Build parameters for reducer.
      parameters, expect_sparse_gradient = self._build_params_for_reducer()

      # All collectives during initialization are gated by this flag.
      if init_sync:
          # Verify model equivalence.
          _verify_param_shape_across_processes(self.process_group, parameters)
          # Sync params and buffers. Ensures all DDP models start off at the same value.
          _sync_module_states(
              module=self.module,
              process_group=self.process_group,
              broadcast_bucket_size=self.broadcast_bucket_size,
              src=0,
              params_and_buffers_to_ignore=self.parameters_to_ignore,
              broadcast_buffers=self.broadcast_buffers,
          )
      # In debug mode, build a mapping of parameter index -> parameter.
      param_to_name_mapping = self._build_debug_param_to_name_mapping(parameters)

      # Builds reducer.
      self._ddp_init_helper(
          parameters,
          expect_sparse_gradient,
          param_to_name_mapping,
          static_graph,
      )
      self._comm_hooks: list[tuple[Callable, object]] = []

      if self.mixed_precision is not None:
          _setup_mixed_precision_params(self.mixed_precision, self.module)
          _cast_buffers(self.mixed_precision, self.module)
          # Stream used for async low precision copies.
          self._mp_stream = torch.Stream()
          self._submodule_to_event = defaultdict(deque)  # type: ignore[var-annotated]
          # Add forward pre-hook to root module to kick off copies to lower
          # precision.
          self.module.register_forward_pre_hook(
              self._root_copy_hook, prepend=False, with_kwargs=True
          )
          # Add forward pre hook to all submodules to wait for copy events
          # before running computation.
          # NOTE: this loop rebinds the local name ``module`` (the constructor
          # argument); ``self.module`` is unaffected.
          for module in self.module.modules():
              module.register_forward_pre_hook(
                  self._module_wait_for_copy_hook,
                  prepend=False,
                  with_kwargs=True,
              )
          # Set up callbacks in backward to upcast and use full precision
          # params. TODO (rohan-varma): Make this compose with general
          # comm hooks and apply_optimizer_in_backward. Importing inline to
          # avoid circular import issue.
          from torch.distributed.algorithms.ddp_comm_hooks.mixed_precision_hooks import (
              _AllreduceUpcastHookState,
              _reducer_allreduce_and_upcast_hook,
          )

          upcast_hook_state = _AllreduceUpcastHookState(
              ddp_weakref=weakref.ref(self),
              upcast_stream=torch.Stream(),
          )
          self.register_comm_hook(
              upcast_hook_state,
              _reducer_allreduce_and_upcast_hook,
          )
          # Inform reducer of reduced precision param dtype for correctness
          # of type checks between gradient and bucket.
          self.reducer._set_mixed_precision_param_dtype(  # type: ignore[attr-defined]
              self.mixed_precision.param_dtype
          )

      self._has_rebuilt_buckets = False

      if static_graph:
          self._set_static_graph()

      self._lazy_init_ran = False

      # Register the AccumulateGrad post hooks if optimize_ddp is
      # True. The hooks will be deregistered if compiled_autograd is not
      # enabled.
      self._accum_grad_hooks: list[RemovableHandle] = []
      if self._use_python_reducer:
          # pyrefly: ignore [bad-assignment]
          torch._inductor.config._fuse_ddp_communication = True
          # pyrefly: ignore [bad-assignment]
          torch._inductor.config._fuse_ddp_bucket_size = bucket_cap_mb
          # Directly adding this to the trace rule will disturb the users
          # who are using DDPOptimizer.
          torch._dynamo.trace_rules.LEGACY_MOD_INLINELIST.add(
              "torch.nn.parallel.distributed"
          )
          torch._dynamo.trace_rules.get_legacy_mod_inlinelist.cache_clear()
          # NOTE: we should init these lazily
          self._register_accum_grad_hook()

      # Whether or not DDPSink performs a clone.
      self._ddp_sink_clone = True
  850. def _register_accum_grad_hook(self):
  851. import torch.distributed._functional_collectives as fcol
  852. def compiled_accum_grad_hook(
  853. param,
  854. *,
  855. param_index: int,
  856. ):
  857. if not self.require_backward_grad_sync:
  858. return
  859. if param.grad is None:
  860. return
  861. if self._comm_hooks:
  862. for hook, state in self._comm_hooks:
  863. hook(state, (param.grad, param))
  864. else:
  865. gradient = param.grad / self.process_group.size()
  866. gradient = fcol.all_reduce(gradient, "sum", self.process_group)
  867. param.grad.copy_(gradient)
  868. for index, param in enumerate(self._module_parameters):
  869. if not param.requires_grad:
  870. continue
  871. self._accum_grad_hooks.append(
  872. param.register_post_accumulate_grad_hook(
  873. functools.partial(
  874. compiled_accum_grad_hook,
  875. param_index=index,
  876. )
  877. )
  878. )
  def _delayed_all_reduce_hook(self, grad):
      """
      Tensor hook that all-reduces the delayed gradient buffer.

      Divides ``self._delay_grad_buffer`` in place by the world size of
      ``self.process_group`` and then launches an asynchronous all-reduce on
      it. The incoming ``grad`` is returned untouched.
      """
      group = self.process_group
      buffer = self._delay_grad_buffer
      buffer.div_(dist.get_world_size(group))  # type: ignore[union-attr]
      # The async work handle is intentionally not kept.
      dist.all_reduce(buffer, group=group, async_op=True)
      return grad
  def _register_delay_all_reduce_hook(
      self,
      bucket_cap_mb,
      param_to_hook_all_reduce,
      device_ids,
  ):
      """
      Set up the flat gradient buffer and hook for delayed all-reduce params.

      Steps:
        1. allocate ``self._delay_grad_buffer`` with one element per element
           of ``self._delay_all_reduce_params``;
        2. broadcast the (detached) delayed parameters from rank 0;
        3. register ``self._delayed_all_reduce_hook`` on
           ``param_to_hook_all_reduce``;
        4. append one view of the buffer per delayed parameter, shaped like
           that parameter, to ``self._delay_grad_views``;
        5. set ``self._delay_all_reduce_all_params`` to ``True`` only when
           every parameter requiring grad is in ``self.parameters_to_ignore``.

      Args:
          bucket_cap_mb: forwarded as the size argument of
              ``dist._broadcast_coalesced``.
          param_to_hook_all_reduce: parameter on which the all-reduce hook is
              registered.
          device_ids: ``None`` or an empty sequence places the buffer on the
              CPU; otherwise the first entry is used as the buffer device.
      """
      # 1. Create gradient buffer
      # The constructor accepts an empty ``device_ids`` for backward
      # compatibility and treats it like ``None``; do the same here instead
      # of indexing into an empty sequence.
      device = device_ids[0] if device_ids else torch.device("cpu")
      self._delay_grad_buffer = torch.zeros(
          sum(p.numel() for p in self._delay_all_reduce_params),
          device=device,
      )

      # 2. Broadcast the parameters
      # NOTE(review): bucket_cap_mb is expressed in MiB by the constructor;
      # confirm the unit dist._broadcast_coalesced expects for this argument.
      detached_params = [p.detach() for p in self._delay_all_reduce_params]
      dist._broadcast_coalesced(self.process_group, detached_params, bucket_cap_mb, 0)

      # 3. Hook all reduce to the specified parameter
      param_to_hook_all_reduce.register_hook(self._delayed_all_reduce_hook)

      # 4. Build tensor views for gradients
      offset = 0
      for param in self._delay_all_reduce_params:
          grad_view = self._delay_grad_buffer[offset : (offset + param.numel())].view(
              param.shape
          )
          self._delay_grad_views.append(grad_view)
          offset = offset + param.numel()

      # 5. Check whether the all reduce of all params requiring grad is delayed.
      for module_name, module in self.module.named_modules():
          for param_name, param in module.named_parameters(recurse=False):
              if param.requires_grad:
                  full_name = f"{module_name}.{param_name}"
                  if full_name not in self.parameters_to_ignore:
                      # There is at least a param whose all reduce will not be delayed.
                      # In this case, we should not set self._delay_all_reduce_all_params
                      # to True.
                      return
      self._delay_all_reduce_all_params = True
  922. def _setup_in_backward_optimizers(self):
  923. # Check if user has used apply_optim_in_backward to overlap optimizer
  924. # step + DDP backward. Current constraints:
  925. # 1. Only allreduce is supported at the moment, no custom communication.
  926. # 2. For DDP-managed parameters that have their optimizer run in
  927. # backward, their gradients are set to ``None``. If your use case
  928. # requires DDP parameters grad not to be set to ``None`` after their
  929. # in-backward optimizer runs, please ping
  930. # https://github.com/pytorch/pytorch/issues/90052.
  931. # NOTE: we use self._module_parameters instead of .parameters() since
  932. # the former excludes ignored (non-DDP managed) parameters.
  933. if any(hasattr(p, "_in_backward_optimizers") for p in self._module_parameters):
  934. torch._C._log_api_usage_once("ddp.optimizer_in_backward")
  935. # Remove hooks that apply_optim_in_backward had registered because
  936. # DDP customizes how optimizer is overlapped with backward due to
  937. # the allreduce.
  938. param_to_handle_map = (
  939. dist.optim.apply_optimizer_in_backward.param_to_optim_hook_handle_map
  940. )
  941. for p in self._module_parameters:
  942. for handle in param_to_handle_map.get(p, []):
  943. handle.remove()
  944. # Need a weakref to DDP instance to run all_reduce (from reducer)
  945. # and get managed DDP parameters.
  946. ddp_weakref = weakref.ref(self)
  947. # Note: importing in function, otherwise this will cause a circular
  948. # import.
  949. from torch.distributed.algorithms.ddp_comm_hooks.optimizer_overlap_hooks import (
  950. _apply_optim_in_backward_hook,
  951. )
  952. self.register_comm_hook(
  953. ddp_weakref,
  954. _apply_optim_in_backward_hook(
  955. gradient_is_bucket_view=self.gradient_as_bucket_view
  956. ),
  957. )
  958. self.reducer._set_optimizer_in_backward() # type: ignore[attr-defined]
  def _fire_reducer_autograd_hook(self, idx, *unused):
      """
      Invoke the reducer's autograd hook for parameter index ``idx``.

      This is only used during mixed precision training: the hooks the
      Reducer installed at construction time would not be called there,
      because the low precision parameters are the ones in use. Any extra
      positional arguments are ignored.
      """
      reducer = self.reducer
      reducer._autograd_hook(idx)  # type: ignore[attr-defined]
  def _root_copy_hook(self, *args: Any, **kwargs: Any) -> None:
      """
      For DDP mixed precision, put low precision copies on separate stream and create events to wait for them.

      When training with DDP mixed precision, this root pre-forward hook kicks
      off low precision copies on a separate stream and creates respective
      events to wait for them.

      For every submodule, each non-ignored parameter is copied into its
      ``_mp_param`` storage and ``param.data`` is pointed at that copy; one
      event per submodule is then recorded and queued in
      ``self._submodule_to_event``. ``args`` and ``kwargs`` are not used.
      """
      # Clear out previous iteration submodule to event. This is because we
      # may have populated some events for modules that didn't end up being
      # used.
      self._submodule_to_event = defaultdict(deque)  # type: ignore[var-annotated]
      # Everything below runs with the mixed precision stream as the context.
      with self._mp_stream:
          for submodule in self.module.modules():
              for param in submodule.parameters(recurse=False):
                  # Do not cast DDP ignored parameters.
                  if hasattr(param, "_ddp_ignored") and param._ddp_ignored:
                      continue
                  _alloc_storage(param._mp_param, param.size())
                  # copy() implicitly casts to low precision
                  with torch.no_grad():
                      param._mp_param.copy_(param.data)
                      # TODO: when zero_grad(set_to_none=False) or in grad
                      # accumulation case, accumulated grads can be in fp32
                      # which can cause errors when running DDP backwards due
                      # to mismatched incoming and accumulated gradient types.
                      # So we manually cast the accumulated grad down for now,
                      # in the future we may shift to FSDP style gradient
                      # accumulation management where the accumulated gradient
                      # is saved and .grad field is set to None, bypassing
                      # this issue.
                      if param.grad is not None:
                          param.grad.data = param.grad.to(
                              self.mixed_precision.param_dtype  # type: ignore[union-attr]
                          )
                  # Swap the parameter's data over to the low precision copy.
                  param.data = param._mp_param
              # One event per submodule, recorded after its parameter copies
              # were issued; _module_wait_for_copy_hook pops and waits on it.
              copy_event = torch.Event()
              copy_event.record()
              self._submodule_to_event[submodule].append(copy_event)
  1005. def _module_wait_for_copy_hook(
  1006. self,
  1007. module,
  1008. *args: Any,
  1009. **kwargs: Any,
  1010. ) -> None:
  1011. """Before carrying out computation, wait on the appropriate event to ensure low precision copies have finished."""
  1012. try:
  1013. event = self._submodule_to_event[module].popleft()
  1014. except IndexError:
  1015. # copy event has already been waited on
  1016. return
  1017. event.wait(stream=torch.accelerator.current_stream())
  1018. for p in module.parameters(recurse=False):
  1019. # Don't register hooks if param does not require grad
  1020. if not p.requires_grad or (hasattr(p, "_ddp_ignored") and p._ddp_ignored):
  1021. continue
  1022. # We need to register autograd hook here instead of DDP's ctor
  1023. # since we're working with the low precision param. Register them
  1024. # via obtaining the gradient accumulator.
  1025. tmp = p.expand_as(p)
  1026. grad_acc = tmp.grad_fn.next_functions[0][0]
  1027. hook = grad_acc.register_hook(
  1028. functools.partial(self._fire_reducer_autograd_hook, p._idx)
  1029. )
  1030. p._ddp_mp_hook_state = (grad_acc, hook)
  1031. def _log_and_throw(self, err_type, err_msg):
  1032. if self.logger is not None:
  1033. self.logger.set_error_and_log(f"{str(err_type)}: {err_msg}")
  1034. raise err_type(err_msg)
  def _ddp_init_helper(
      self,
      parameters,
      expect_sparse_gradient,
      param_to_name_mapping,
      static_graph,
  ):
      """
      Build the reducer and logger for this DDP instance.

      The helper performs, in order:
      (1) bucketing the parameters for reductions
      (2) resetting the bucketing states
      (3) registering the grad hooks
      (4) logging construction-time DDP logging data
      (5) passing a handle of DDP to SyncBatchNorm layers
      """
      # The parameters are not listed in the order in which they are used,
      # especially in models with control flow. If a model additionally
      #   1) has other collective comm ops in its backward graph, and
      #   2) has unused parameters on a subset of ranks,
      # bucketing could insert an ALL-REDUCE too early on the rank with the
      # unused parameter, so that it unexpectedly matches up with other
      # collectives on other ranks.
      #
      # To handle that corner case, bucketing is skipped while the parameters
      # are not in real execution order, so a single ALL-REDUCE is inserted
      # after all gradients of the whole graph are computed. This only affects
      # the first iteration: afterwards buckets may be rebuilt, because
      # "bucket rebuild" bucketizes parameters by their real execution order
      # in the backward graph.
      # Can remove this branching once #73732 is landed.
      if self.bucket_bytes_cap_list:
          # A user-supplied bucket_cap_mb_list drives both the initial
          # bucketing and the rebuilding of buckets.
          bucket_size_limits = self.bucket_bytes_cap_list
          bucket_size_limits_for_rebuilding = self.bucket_bytes_cap_list
      else:
          # Without bucket_cap_mb_list an empty list is passed, letting the
          # C++ Reducer use the original logic (first_bucket_bytes_cap_ and
          # bucket_bytes_cap_).
          bucket_size_limits_for_rebuilding = []
          if static_graph is True or self.find_unused_parameters is False:
              bucket_size_limits = [sys.maxsize]
          elif self.bucket_bytes_cap_default:
              bucket_size_limits = [
                  dist._DEFAULT_FIRST_BUCKET_BYTES,
                  self.bucket_bytes_cap,
              ]
          else:
              bucket_size_limits = [self.bucket_bytes_cap]

      assignment = dist._compute_bucket_assignment_by_size(
          parameters,
          bucket_size_limits,
          expect_sparse_gradient,
      )
      bucket_indices, per_bucket_size_limits = assignment

      # In mixed precision the parameter index has to be passed to the
      # Reducer's autograd hook from python, so remember it on each parameter.
      if self.mixed_precision is not None:
          for position, managed_param in enumerate(parameters):
              managed_param._idx = position

      # Buckets are reversed to approximate the order in which gradients are
      # produced, assuming parameters are used in the forward pass in the
      # order they are defined.
      reversed_bucket_indices = list(reversed(bucket_indices))
      reversed_size_limits = list(reversed(per_bucket_size_limits))

      # User can set dist._DEFAULT_FIRST_BUCKET_BYTES to tune DDP first bucket.
      if self.bucket_bytes_cap_default:
          first_bucket_bytes_cap = dist._DEFAULT_FIRST_BUCKET_BYTES
      else:
          first_bucket_bytes_cap = self.bucket_bytes_cap

      self.reducer = dist.Reducer(
          parameters,
          reversed_bucket_indices,
          reversed_size_limits,
          self.process_group,
          expect_sparse_gradient,
          # The bucket size limit is specified in the constructor.
          # Additionally, a single small bucket is allowed for parameters
          # that are defined first, such that their gradients don't spill into
          # a much larger bucket, adding unnecessary latency after gradient
          # computation finishes. Experiments showed 1MB is a reasonable value.
          self.bucket_bytes_cap,
          self.find_unused_parameters,
          self.gradient_as_bucket_view,
          param_to_name_mapping,
          first_bucket_bytes_cap,
          self.skip_all_reduce_unused_params,
          self._use_python_reducer,
          bucket_size_limits_for_rebuilding,
      )

      self.logger = dist.Logger(self.reducer)
      # Set as a weak reference to avoid reference cycle between
      # logger and reducer.
      self.reducer.set_logger(self.logger)

      has_sync_bn = any(
          isinstance(submodule, torch.nn.SyncBatchNorm)
          for submodule in self.module.modules()
      )

      # Record the logging data that is already known at construction time.
      logged_device_ids = [] if self.device_ids is None else self.device_ids
      logged_output_device = -1 if self.output_device is None else self.output_device
      self.logger.set_construction_data_and_log(
          self.module.__class__.__name__,
          logged_device_ids,
          logged_output_device,
          self.broadcast_buffers,
          has_sync_bn,
          static_graph,
      )

      # passing a handle to torch.nn.SyncBatchNorm layer
      self._passing_sync_batchnorm_handle(self.module)
  1149. def __getstate__(self):
  1150. self._check_default_group()
  1151. attrs = copy.copy(self.__dict__)
  1152. del attrs["process_group"]
  1153. del attrs["reducer"]
  1154. del attrs["logger"]
  1155. return attrs
  def __setstate__(self, state):
      """
      Restore the instance from ``state`` and rebuild the reducer.

      The process group is set to the default group (a serializable instance
      is expected to have used the default one), the sync flags get a default
      of ``True`` when missing from ``state``, and the reducer and logger are
      recreated through ``_ddp_init_helper``. When ``self.static_graph`` is
      set, static graph mode is re-applied to both reducer and logger.

      Raises:
          AssertionError: if ``static_graph`` is set but no logger exists
              after the reducer has been rebuilt.
      """
      # If serializable, then the process group should be the default one
      self.process_group = _get_default_group()
      super().__setstate__(state)
      for sync_flag in ("require_forward_param_sync", "require_backward_grad_sync"):
          self.__dict__.setdefault(sync_flag, True)

      parameters, expect_sparse_gradient = self._build_params_for_reducer()
      # In debug mode, build a mapping of parameter index -> parameter.
      name_mapping = self._build_debug_param_to_name_mapping(parameters)
      # Builds reducer.
      self._ddp_init_helper(
          parameters,
          expect_sparse_gradient,
          name_mapping,
          self.static_graph,
      )

      if not self.static_graph:
          return

      self.reducer._set_static_graph()
      if self.logger is None:
          raise AssertionError(
              "self.logger must not be None when static_graph is True"
          )
      self.logger._set_static_graph()
  1179. def _build_params_for_reducer(self):
  1180. # Build tuple of (module, parameter) for all parameters that require grads.
  1181. modules_and_parameters = [
  1182. (module, parameter)
  1183. for module_name, module in self.module.named_modules()
  1184. for parameter in [
  1185. param
  1186. # Note that we access module.named_parameters instead of
  1187. # parameters(module). parameters(module) is only needed in the
  1188. # single-process multi device case, where it accesses replicated
  1189. # parameters through _former_parameters.
  1190. for param_name, param in module.named_parameters(recurse=False)
  1191. if param.requires_grad
  1192. and f"{module_name}.{param_name}" not in self.parameters_to_ignore
  1193. ]
  1194. ]
  1195. # Deduplicate any parameters that might be shared across child modules.
  1196. memo = set()
  1197. modules_and_parameters = [
  1198. # "p not in memo" is the deduplication check.
  1199. # "not memo.add(p)" is always True, and it's only there to cause "add(p)" if needed.
  1200. (m, p)
  1201. for m, p in modules_and_parameters
  1202. if p not in memo and not memo.add(p) # type: ignore[func-returns-value]
  1203. ]
  1204. # Build list of parameters.
  1205. parameters = [parameter for _, parameter in modules_and_parameters]
  1206. # Checks if a module will produce a sparse gradient.
  1207. def produces_sparse_gradient(module):
  1208. if isinstance(module, (torch.nn.Embedding, torch.nn.EmbeddingBag)):
  1209. return module.sparse
  1210. return False
  1211. # Build list of booleans indicating whether or not to expect sparse
  1212. # gradients for the corresponding parameters.
  1213. expect_sparse_gradient = [
  1214. produces_sparse_gradient(module) for module, _ in modules_and_parameters
  1215. ]
  1216. self._assign_modules_buffers()
  1217. return parameters, expect_sparse_gradient
  1218. def _assign_modules_buffers(self):
  1219. """
  1220. Assign self.module.named_buffers to self.modules_buffers.
  1221. Assigns module buffers to self.modules_buffers which are then used to
  1222. broadcast across ranks when broadcast_buffers=True. Note that this
  1223. must be called every time buffers need to be synced because buffers can
  1224. be reassigned by user module,
  1225. see https://github.com/pytorch/pytorch/issues/63916.
  1226. """
  1227. # Collect buffers for modules, filtering out buffers that should be ignored.
  1228. named_module_buffers = [
  1229. (buffer, buffer_name)
  1230. for buffer_name, buffer in self.module.named_buffers()
  1231. if buffer_name not in self.parameters_to_ignore
  1232. ]
  1233. self.modules_buffers = [
  1234. buffer for (buffer, buffer_name) in named_module_buffers
  1235. ]
  1236. # Dict[str, tensor] representing module buffers not ignored by DDP.
  1237. self.named_module_buffers = {
  1238. buffer_name: buffer for (buffer, buffer_name) in named_module_buffers
  1239. }
  1240. def _build_debug_param_to_name_mapping(self, parameters):
  1241. param_to_param_index = {parameters[i]: i for i in range(len(parameters))}
  1242. param_set = set(parameters)
  1243. param_index_to_param_fqn = {}
  1244. for module_name, module in self.module.named_modules():
  1245. for param_name, param in module.named_parameters(recurse=False):
  1246. fqn = f"{module_name}.{param_name}"
  1247. # Bypass ignored parameters since those are not reduced by DDP
  1248. # to begin with.
  1249. if fqn not in self.parameters_to_ignore and param.requires_grad:
  1250. if param not in param_set:
  1251. self._log_and_throw(
  1252. ValueError,
  1253. f"Param with name {fqn} found in module parameters, but not DDP parameters."
  1254. " This indicates a bug in DDP, please report an issue to PyTorch.",
  1255. )
  1256. param_index = param_to_param_index[param]
  1257. param_index_to_param_fqn[param_index] = fqn
  1258. # Ensure we covered all parameters
  1259. if len(param_set) != len(param_index_to_param_fqn):
  1260. self._log_and_throw(
  1261. ValueError,
  1262. (
  1263. "Expected param to name mapping to cover all parameters, but"
  1264. f" got conflicting lengths: {len(param_set)} vs "
  1265. f"{len(param_index_to_param_fqn)}. This indicates a bug in DDP"
  1266. ", please report an issue to PyTorch."
  1267. ),
  1268. )
  1269. return param_index_to_param_fqn
  1270. def _get_parameters(self, m, recurse=True):
  1271. """Return a generator of module parameters."""
  1272. def model_parameters(m):
  1273. ps = (
  1274. m._former_parameters.values()
  1275. if hasattr(m, "_former_parameters")
  1276. else m.parameters(recurse=False)
  1277. )
  1278. yield from ps
  1279. for mod in m.modules() if recurse else [m]:
  1280. yield from model_parameters(mod)
  def _check_default_group(self):
      """Reject pickling unless this instance uses the default process group.

      The check fails when ``self.process_group`` differs from the default
      group, or when looking up the default group raises ``RuntimeError``.
      Failures are reported through ``self._log_and_throw`` with
      ``RuntimeError``.
      """
      try:
          pickle_not_supported = bool(self.process_group != _get_default_group())
      except RuntimeError:
          pickle_not_supported = True

      if not pickle_not_supported:
          return
      self._log_and_throw(
          RuntimeError,
          "DDP Pickling/Unpickling are only supported "
          "when using DDP with the default process "
          "group. That is, when you have called "
          "init_process_group and have not passed "
          "process_group argument to DDP constructor",
      )
  @contextmanager
  def no_sync(self):
      r"""
      Context manager to disable gradient synchronizations across DDP processes.

      Within this context, gradients will be accumulated on module
      variables, which will later be synchronized in the first
      forward-backward pass exiting the context. The previous value of
      ``require_backward_grad_sync`` is restored on exit, including when the
      body raises.

      Example::

          >>> # xdoctest: +SKIP("undefined variables")
          >>> ddp = torch.nn.parallel.DistributedDataParallel(model, pg)
          >>> with ddp.no_sync():
          >>>     for input in inputs:
          >>>         ddp(input).backward()  # no synchronization, accumulate grads
          >>> ddp(another_input).backward()  # synchronize grads

      .. warning::
          The forward pass should be included inside the context manager, or
          else gradients will still be synchronized.
      """
      saved_flag, self.require_backward_grad_sync = (
          self.require_backward_grad_sync,
          False,
      )
      try:
          yield
      finally:
          self.require_backward_grad_sync = saved_flag
  1321. @classmethod
  1322. def _get_active_ddp_module(cls):
  1323. """`TorchDynamo` requires DDP's status and module for cooperative optimization."""
  1324. return cls._active_ddp_module
  # note, this ctxmgr function is marked 'skip' in torchdynamo, so dynamo only kicks in
  # for the 'module_to_run' underneath
  # see torch._dynamo/eval_frame.py TorchPatcher.patch for more details
  @contextmanager
  @torch._disable_dynamo(recursive=False)
  def _inside_ddp_forward(self):
      """Publish ``self`` as the active DDP module for the duration of the block.

      The class attribute ``_active_ddp_module`` is set to ``self`` on entry
      and reset to ``None`` on exit, also when the body raises.
      """
      ddp_cls = DistributedDataParallel
      ddp_cls._active_ddp_module = self
      try:
          yield
      finally:
          ddp_cls._active_ddp_module = None
  1336. def _run_ddp_forward(self, *inputs, **kwargs):
  1337. if self._use_python_reducer:
  1338. return self.module(*inputs, **kwargs) # type: ignore[index]
  1339. else:
  1340. with self._inside_ddp_forward():
  1341. return self.module(*inputs, **kwargs) # type: ignore[index]
  1342. def _clear_grad_buffer(self):
  1343. # Making param.grad points to the grad buffers before backward is based on the
  1344. # assumption that the grad accumulation is done in place in autograd engine,
  1345. # for some edge cases, if the grad accumulation in autograd engine is not in
  1346. # place, then the param.grad and grad buffers are detached.
  1347. if self._delay_grad_buffer is not None:
  1348. # We batch zero_grad for all params by resetting the whole grad
  1349. # buffer when the grad of all params is set to None.
  1350. all_param_grad_none = all(
  1351. param.grad is None for param in self._delay_all_reduce_params
  1352. )
  1353. for index, param in enumerate(self._delay_all_reduce_params):
  1354. if param.grad is None:
  1355. param.grad = self._delay_grad_views[index]
  1356. if not all_param_grad_none:
  1357. param.grad.zero_()
  1358. if all_param_grad_none:
  1359. self._delay_grad_buffer.zero_()
  1360. def _lazy_init(self):
  1361. # Initialization for DDP that occurs after construction, but lazily
  1362. # before the first forward pass.
  1363. self._setup_in_backward_optimizers()
  1364. self._lazy_init_ran = True
  1365. def _pre_forward(self, *inputs, **kwargs):
  1366. if self._use_python_reducer:
  1367. return inputs, kwargs
  1368. if not self._lazy_init_ran and not torch.compiler.is_compiling():
  1369. self._lazy_init()
  1370. if self._delay_all_reduce_all_params:
  1371. return inputs, kwargs
  1372. if torch.is_grad_enabled() and self.require_backward_grad_sync:
  1373. if self.logger is None:
  1374. raise AssertionError("self.logger must not be None")
  1375. self.logger.set_runtime_stats_and_log()
  1376. self.reducer.prepare_for_forward()
  1377. # Notify the join context that this process has not joined, if
  1378. # needed
  1379. work = Join.notify_join_context(self)
  1380. if work:
  1381. self.reducer._set_forward_pass_work_handle(
  1382. work,
  1383. self._divide_by_initial_world_size, # type: ignore[arg-type]
  1384. )
  1385. # Calling _rebuild_buckets before forward computation,
  1386. # It may allocate new buckets before deallocating old buckets
  1387. # inside _rebuild_buckets. To save peak memory usage,
  1388. # call _rebuild_buckets before the peak memory usage increases
  1389. # during forward computation.
  1390. # This should be called only once during whole training period.
  1391. if torch.is_grad_enabled() and self.reducer._rebuild_buckets():
  1392. logger.info("Reducer buckets have been rebuilt in this iteration.")
  1393. self._has_rebuilt_buckets = True
  1394. # sync params according to location (before/after forward) user
  1395. # specified as part of hook, if hook was specified.
  1396. if self._check_sync_bufs_pre_fwd():
  1397. self._sync_buffers()
  1398. if self._join_config.enable:
  1399. # Notify joined ranks whether they should sync in backwards pass or not.
  1400. self._check_global_requires_backward_grad_sync(is_joined_rank=False)
  1401. if self.device_ids:
  1402. moved_inputs, moved_kwargs = _to_kwargs(
  1403. inputs,
  1404. kwargs,
  1405. torch.device(self.device_type, self.device_ids[0]),
  1406. self.use_side_stream_for_tensor_copies,
  1407. )
  1408. args, kwargs = moved_inputs[0], moved_kwargs[0]
  1409. # Cast inputs to reduced precision if needed.
  1410. if self.mixed_precision is not None:
  1411. args, kwargs = _cast_forward_inputs(
  1412. self.mixed_precision.param_dtype,
  1413. *args,
  1414. **kwargs,
  1415. )
  1416. return args, kwargs
  1417. else:
  1418. # Cast inputs to reduced precision if needed.
  1419. # TODO (rohan-varma) test this codepath.
  1420. if self.mixed_precision is not None:
  1421. inputs, kwargs = _cast_forward_inputs(
  1422. self.mixed_precision.param_dtype,
  1423. *inputs,
  1424. **kwargs,
  1425. )
  1426. return inputs, kwargs
  1427. def _post_forward(self, output):
  1428. if self._use_python_reducer:
  1429. return output
  1430. if self._delay_all_reduce_all_params:
  1431. self._clear_grad_buffer()
  1432. return output
  1433. # sync params according to location (before/after forward) user
  1434. # specified as part of hook, if hook was specified.
  1435. if self._check_sync_bufs_post_fwd():
  1436. self._sync_buffers()
  1437. if torch.is_grad_enabled() and self.require_backward_grad_sync:
  1438. self.require_forward_param_sync = True
  1439. # We'll return the output object verbatim since it is a freeform
  1440. # object. We need to find any tensors in this object, though,
  1441. # because we need to figure out which parameters were used during
  1442. # this forward pass, to ensure we short circuit reduction for any
  1443. # unused parameters. Only if `find_unused_parameters` is set.
  1444. if self.find_unused_parameters and not self.static_graph:
  1445. # Do not need to populate this for static graph.
  1446. self.reducer.prepare_for_backward(list(_find_tensors(output)))
  1447. else:
  1448. self.reducer.prepare_for_backward([])
  1449. else:
  1450. self.require_forward_param_sync = False
  1451. # TODO: DDPSink is currently enabled for unused parameter detection and
  1452. # static graph training for first iteration.
  1453. if (self.find_unused_parameters and not self.static_graph) or (
  1454. self.static_graph and not self._static_graph_delay_allreduce_enqueued
  1455. ):
  1456. (
  1457. output_tensor_list,
  1458. treespec,
  1459. output_is_rref,
  1460. ) = _tree_flatten_with_rref(output)
  1461. output_placeholders: list[torch.Tensor | None] = [
  1462. None for _ in range(len(output_tensor_list))
  1463. ]
  1464. # Do not touch tensors that have no grad_fn, which can cause issues
  1465. # such as https://github.com/pytorch/pytorch/issues/60733
  1466. for i, output in enumerate(output_tensor_list):
  1467. if torch.is_tensor(output) and output.grad_fn is None:
  1468. output_placeholders[i] = output
  1469. # When find_unused_parameters=True, makes tensors which require grad
  1470. # run through the DDPSink backward pass. When not all outputs are
  1471. # used in loss, this makes those corresponding tensors receive
  1472. # undefined gradient which the reducer then handles to ensure
  1473. # param.grad field is not touched and we don't error out.
  1474. passthrough_tensor_list = _DDPSink.apply(
  1475. weakref.ref(self),
  1476. *output_tensor_list,
  1477. )
  1478. for i in range(len(output_placeholders)):
  1479. if output_placeholders[i] is None:
  1480. output_placeholders[i] = passthrough_tensor_list[i]
  1481. # Reconstruct output data structure.
  1482. output = _tree_unflatten_with_rref(
  1483. output_placeholders, treespec, output_is_rref
  1484. )
  1485. # At the end of the forward pass, reset the grad buffer and grad views
  1486. self._clear_grad_buffer()
  1487. return output
  def forward(self, *inputs, **kwargs):
      """Run pre-forward handling, the wrapped module, then post-forward handling.

      The whole call is recorded under the profiler label
      ``DistributedDataParallel.forward``. When all-reduce is delayed for all
      parameters the wrapped module's ``forward`` is invoked directly;
      otherwise the call goes through ``_run_ddp_forward``.
      """
      with torch.autograd.profiler.record_function("DistributedDataParallel.forward"):
          inputs, kwargs = self._pre_forward(*inputs, **kwargs)
          if self._delay_all_reduce_all_params:
              output = self.module.forward(*inputs, **kwargs)
          else:
              output = self._run_ddp_forward(*inputs, **kwargs)
          return self._post_forward(output)
  def scatter(self, inputs, kwargs, device_ids):
      """Delegate to ``scatter_kwargs``, splitting along ``self.dim``."""
      scattered = scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)
      return scattered
  def to_kwargs(self, inputs, kwargs, device_id):
      """Delegate to ``_to_kwargs`` for device ``device_id`` of ``self.device_type``.

      Kept for BC.
      """
      target_device = torch.device(self.device_type, device_id)
      return _to_kwargs(
          inputs,
          kwargs,
          target_device,
          self.use_side_stream_for_tensor_copies,
      )
  def gather(self, outputs, output_device):
      """Delegate to the module-level ``gather``, joining along ``self.dim``."""
      gathered = gather(outputs, output_device, dim=self.dim)
      return gathered
  def train(self, mode=True):
      """Forward ``mode`` to the parent class's ``train`` and return ``self``."""
      super().train(mode)
      return self
  # When running in join mode, schedules an allreduce to notify joined ranks
  # of whether backwards pass synchronization will run this iteration or not.
  def _check_global_requires_backward_grad_sync(self, is_joined_rank):
      """All-reduce a one-element flag saying whether this rank will sync grads.

      The flag is 1 for a non-joined rank with ``require_backward_grad_sync``
      set and 0 otherwise.

      Returns:
          For a joined rank: ``True`` if the reduced flag is non-zero, else
          ``False``. For a non-joined rank: ``None`` (the all-reduce is only
          launched, not waited on).
      """
      contributes_sync = not is_joined_rank and self.require_backward_grad_sync
      make_flag = torch.ones if contributes_sync else torch.zeros
      requires_sync_tensor = make_flag(1, device=self.device)

      pending = dist.all_reduce(
          requires_sync_tensor, group=self.process_group, async_op=True
      )

      # (kwen2501) This if condition is a plain translation of previous
      # behavior, i.e. in the `is_joined_rank=False` case, `work.wait()`
      # is not called and it doesn't care about the result. I am guessing
      # that it just wants to fire a matching all-reduce and does not want
      # the main stream to wait.
      if not is_joined_rank:
          return None  # Return value is not/should not be used.

      pending.wait()
      return requires_sync_tensor.item() != 0
  1533. # When running in join mode, checks and performs sync of module buffers if
  1534. # the models have buffers that should be synchronized in the forward pass.
  1535. def _check_and_sync_module_buffers(self):
  1536. if self._check_sync_bufs_pre_fwd():
  1537. authoritative_rank = self._find_common_rank(self._distributed_rank, False)
  1538. self._sync_module_buffers(authoritative_rank)
  # When running in join mode, agrees upon a common rank and broadcasts model
  # parameters to all other ranks.
  def _sync_final_model(self, is_last_joiner):
      """Pick the authoritative rank and sync module states from it.

      The chosen rank is stored in ``self._authoritative_rank`` and used as
      the ``src`` of ``_sync_module_states``.

      Args:
          is_last_joiner: Whether the current rank is a candidate for being
              the authoritative model copy.
      """
      # Agree upon the process that will be the authoritative model copy.
      # The current rank is a candidate for being the authoritative copy if
      # is_last_joiner=True. We break ties via picking the larger rank.
      chosen_rank = self._find_common_rank(self._distributed_rank, is_last_joiner)
      self._authoritative_rank = chosen_rank
      _sync_module_states(
          module=self.module,
          process_group=self.process_group,
          broadcast_bucket_size=self.broadcast_bucket_size,
          src=self._authoritative_rank,
          params_and_buffers_to_ignore=self.parameters_to_ignore,
          broadcast_buffers=self.broadcast_buffers,
      )
  1556. # Schedule comm ops to match those scheduled in the reducer's backward
  1557. # pass.
  1558. def _match_all_reduce_for_bwd_pass(self):
  1559. comm_work = []
  1560. # Schedule comm in the same order as Reducer schedules them, i.e.
  1561. # the order of the buckets. Retrieving the bucket order from the reducer
  1562. # ensures that we keep the same order in join mode, such as when bucket
  1563. # order is rebuilt dynamically.
  1564. # Returns grad_buckets in order, but real tensors are substituted with
  1565. # zero tensors of the same shape.
  1566. grad_buckets = self.reducer._get_zeros_like_grad_buckets()
  1567. for grad_bucket in grad_buckets:
  1568. # Joined processes contribute zero gradient. In the case that
  1569. # divide_by_initial_world_size=True, we divide grads by the static
  1570. # world size, if not, the dividing factor is reduced by the number
  1571. # of joined processes.
  1572. work = self.reducer._run_comm_hook(grad_bucket)
  1573. comm_work.append(work)
  1574. for work in comm_work:
  1575. work.wait()
  1576. # Allreduces the used parameter mapping across ranks.
  1577. def _match_unused_params_allreduce(self):
  1578. locally_used_param_map = self.reducer._get_local_used_map()
  1579. self.process_group.allreduce(locally_used_param_map)
  def join(
      self,
      divide_by_initial_world_size: bool = True,
      enable: bool = True,
      throw_on_early_termination: bool = False,
  ):
      r"""
      Context manager for training with uneven inputs across processes in DDP.

      This context manager will keep track of already-joined DDP processes,
      and "shadow" the forward and backward passes by inserting collective
      communication operations to match with the ones created by non-joined
      DDP processes. This will ensure each collective call has a corresponding
      call by already-joined DDP processes, preventing hangs or errors that
      would otherwise happen when training with uneven inputs across
      processes. Alternatively, if the flag ``throw_on_early_termination`` is
      specified to be ``True``, all trainers will throw an error once one rank
      runs out of inputs, allowing these errors to be caught and handled
      according to application logic.

      Once all DDP processes have joined, the context manager will broadcast
      the model corresponding to the last joined process to all processes to
      ensure the model is the same across all processes
      (which is guaranteed by DDP).

      To use this to enable training with uneven inputs across processes,
      simply wrap this context manager around your training loop. No further
      modifications to the model or data loading are required.

      .. warning::
          If the model or training loop this context manager is wrapped around
          has additional distributed collective operations, such as
          ``SyncBatchNorm`` in the model's forward pass, then the flag
          ``throw_on_early_termination`` must be enabled. This is because this
          context manager is not aware of non-DDP collective communication.
          This flag will cause all ranks to throw when any one rank
          exhausts inputs, allowing these errors to be caught and recovered
          from across all ranks.

      Args:
          divide_by_initial_world_size (bool): If ``True``, will divide
              gradients by the initial ``world_size`` DDP training was launched
              with. If ``False``, will compute the effective world size
              (number of ranks that have not depleted their inputs yet) and
              divide gradients by that during allreduce. Set
              ``divide_by_initial_world_size=True`` to ensure every input
              sample including the uneven inputs have equal weight in terms of
              how much they contribute to the global gradient. This is
              achieved by always dividing the gradient by the initial
              ``world_size`` even when we encounter uneven inputs. If you set
              this to ``False``, we divide the gradient by the remaining
              number of nodes. This ensures parity with training on a smaller
              ``world_size`` although it also means the uneven inputs would
              contribute more towards the global gradient. Typically, you
              would want to set this to ``True`` for cases where the last few
              inputs of your training job are uneven. In extreme cases, where
              there is a large discrepancy in the number of inputs, setting
              this to ``False`` might provide better results.
          enable (bool): Whether to enable uneven input detection or not. Pass
              in ``enable=False`` to disable in cases where you know that
              inputs are even across participating processes. Default is
              ``True``.
          throw_on_early_termination (bool): Whether to throw an error
              or continue training when at least one rank has exhausted
              inputs. If ``True``, will throw upon the first rank reaching end
              of data. If ``False``, will continue training with a smaller
              effective world size until all ranks are joined. Note that if
              this flag is specified, then the flag
              ``divide_by_initial_world_size`` would be ignored. Default
              is ``False``.

      Returns:
          The ``Join`` context manager built around this DDP instance.

      Example::

          >>> # xdoctest: +SKIP("Distributed")
          >>> import torch
          >>> import torch.distributed as dist
          >>> import os
          >>> import torch.multiprocessing as mp
          >>> import torch.nn as nn
          >>> # On each spawned worker
          >>> def worker(rank):
          >>>     dist.init_process_group("nccl", rank=rank, world_size=2)
          >>>     torch.cuda.set_device(rank)
          >>>     model = nn.Linear(1, 1, bias=False).to(rank)
          >>>     model = torch.nn.parallel.DistributedDataParallel(
          >>>         model, device_ids=[rank], output_device=rank
          >>>     )
          >>>     # Rank 1 gets one more input than rank 0.
          >>>     inputs = [torch.tensor([1]).float() for _ in range(10 + rank)]
          >>>     with model.join():
          >>>         for _ in range(5):
          >>>             for inp in inputs:
          >>>                 loss = model(inp).sum()
          >>>                 loss.backward()
          >>>     # Without the join() API, the below synchronization will hang
          >>>     # blocking for rank 1's allreduce to complete.
          >>>     torch.cuda.synchronize(device=rank)
      """
      joinables = [self]
      join_context = Join(
          joinables,
          enable,
          throw_on_early_termination,
          divide_by_initial_world_size=divide_by_initial_world_size,
      )
      return join_context
  def join_hook(
      self,
      **kwargs,
  ):
      r"""
      DDP join hook enables training on uneven inputs by mirroring communications in forward and backward passes.

      Arguments:
          kwargs (dict): a :class:`dict` containing any keyword arguments
              to modify the behavior of the join hook at run time; all
              :class:`Joinable` instances sharing the same join context
              manager are forwarded the same value for ``kwargs``.

      The hook supports the following keyword arguments:
          divide_by_initial_world_size (bool, optional):
              If ``True``, then gradients are divided by the initial world
              size that DDP was launched with.
              If ``False``, then gradients are divided by the effective world
              size (i.e. the number of non-joined processes), meaning that
              the uneven inputs contribute more toward the global gradient.
              Typically, this should be set to ``True`` if the degree of
              unevenness is small but can be set to ``False`` in extreme
              cases for possibly better results.
              Default is ``True``.

      Returns:
          A ``_DDPJoinHook`` bound to this instance; other keyword arguments
          are not read here.
      """
      return _DDPJoinHook(
          self,
          divide_by_initial_world_size=kwargs.get(
              "divide_by_initial_world_size", True
          ),
      )
  @property
  def join_device(self):
      """Device exposed to the join context; this is ``self.device``."""
      device = self.device
      return device
  @property
  def join_process_group(self):
      """Process group exposed to the join context; this is ``self.process_group``."""
      group = self.process_group
      return group
  def _register_buffer_comm_hook(
      self,
      state,
      hook: Callable,
      comm_hook_location=_BufferCommHookLocation.POST_FORWARD,
  ):
      r"""
      Allow custom registration of hooks that define how buffers are synchronized across ranks.

      The hook takes in an optional state and is passed in a Dict[str, Tensor]
      corresponding to buffer names and the buffers, and can run arbitrary reductions
      on buffers as opposed to DDP's default broadcast from rank 0. This is useful for
      example if a counter needs to be summed or averaged across ranks every iteration.

      Args:
          state (Any): Optional state that is passed to the hook.
          hook (Callable): Callable that receives ``state`` and the
              ``Dict[str, torch.Tensor]`` of named buffers described above,
              i.e. ``hook(state: object, buffers: Dict[str, torch.Tensor])``.
              It may return a ``list[torch.futures.Future]`` (see the note
              below).
          comm_hook_location (_BufferCommHookLocation): Enum value indicating
              where to run the hook.
              _BufferCommHookLocation.PRE_FORWARD means that the
              hook will run _before_ the forward pass, and
              _BufferCommHookLocation.POST_FORWARD means that the
              hook will run _after_ the forward pass.

      Raises:
          AssertionError: If ``hook`` is not callable.

      NOTE: To maximize performance, users can return a
          list[torch.futures.Future] from their hook, and DDP will
          install and await these hooks appropriately at the end of
          the backward pass. This will ensure all buffers are
          synchronized by the end of the backward pass. If this
          setting is used, it is recommended to pass
          comm_hook_location=_BufferCommHookLocation.POST_FORWARD,
          which will trigger the hook after the forward pass.
          If _BufferCommHookLocation.PRE_FORWARD is used, users must
          ensure appropriate synchronization when manipulating GPU
          buffers in the forward pass.
      """
      hook_is_callable = callable(hook)
      if not hook_is_callable:
          raise AssertionError(f"hook must be callable, got {type(hook).__name__}")
      registration = _BufferCommHook(
          buffer_comm_hook=hook,
          buffer_comm_hook_state=state,
          buffer_comm_hook_location=comm_hook_location,
      )
      self.buffer_hook = registration
  def register_comm_hook(self, state: object, hook: Callable) -> None:
      r"""
      Register communication hook for user-defined DDP aggregation of gradients across multiple workers.

      This hook would be very useful for researchers to try out new ideas. For
      example, this hook can be used to implement several algorithms like GossipGrad
      and gradient compression which involve different communication strategies for
      parameter syncs while running Distributed DataParallel training.

      Args:
          state (object): Passed to the hook to maintain any state information during the training process.
              Examples include error feedback in gradient compression,
              peers to communicate with next in GossipGrad, etc.

              It is locally stored by each worker
              and shared by all the gradient tensors on the worker.
          hook (Callable): Callable with the following signature:
              ``hook(state: object, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]``:

              This function is called once the bucket is ready. The
              hook can perform whatever processing is needed and return
              a Future indicating completion of any async work (ex: allreduce).
              If the hook doesn't perform any communication, it still
              must return a completed Future. The Future should hold the
              new value of grad bucket's tensors. Once a bucket is ready,
              c10d reducer would call this hook and use the tensors returned
              by the Future and copy grads to individual parameters.
              Note that the future's return type must be a single tensor.

              We also provide an API called ``get_future`` to retrieve a
              Future associated with the completion of ``c10d.ProcessGroup.Work``.
              ``get_future`` is currently supported for NCCL and also supported for most
              operations on GLOO and MPI, except for peer to peer operations (send/recv).

      Raises:
          AssertionError: If ``self.logger`` is ``None``.

      .. warning ::
          Grad bucket's tensors will not be predivided by world_size. User is responsible
          to divide by the world_size in case of operations like allreduce.

      .. warning ::
          DDP communication hook can only be registered once and should be registered
          before calling backward.

      .. warning ::
          The Future object that hook returns should contain a single tensor
          that has the same shape with the tensors inside grad bucket.

      .. warning ::
          ``get_future`` API supports NCCL, and partially GLOO and MPI backends (no support
          for peer-to-peer operations like send/recv) and will return a ``torch.futures.Future``.

      Example::
          Below is an example of a noop hook that returns the same tensor.

          >>> # xdoctest: +SKIP('undefined name')
          >>> def noop(state: object, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]:
          >>>     fut = torch.futures.Future()
          >>>     fut.set_result(bucket.buffer())
          >>>     return fut
          >>> ddp.register_comm_hook(state=None, hook=noop)

      Example::
          Below is an example of a Parallel SGD algorithm where gradients are encoded before
          allreduce, and then decoded after allreduce.

          >>> # xdoctest: +SKIP('undefined name')
          >>> def encode_and_decode(state: object, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]:
          >>>     encoded_tensor = encode(bucket.buffer())  # encode gradients
          >>>     fut = torch.distributed.all_reduce(encoded_tensor).get_future()
          >>>     # Define the then callback to decode.
          >>>     def decode_callback(fut):
          >>>         decoded_tensor = decode(fut.value()[0])  # decode gradients
          >>>         return decoded_tensor
          >>>     return fut.then(decode_callback)
          >>> ddp.register_comm_hook(state=None, hook=encode_and_decode)
      """
      self._check_comm_hook(hook)
      ddp_logger = self.logger
      if ddp_logger is None:
          raise AssertionError("self.logger must not be None")
      ddp_logger._set_comm_hook_name(hook.__qualname__)
      hook_record = (hook, state)
      self._comm_hooks.append(hook_record)
      dist._register_comm_hook(self.reducer, state, hook)
  def _register_builtin_comm_hook(self, comm_hook_type):
      r"""
      Register a built-in communication hook that specifies how DDP aggregates gradients across multiple workers.

      The built-in hooks aim to provide efficient C++ implementations for certain hooks,
      which might not be as efficient if implemented in Python using a Python communication hook.

      Args:
          comm_hook_type (dist.BuiltinCommHookType): type of communication hook, such as ALLREDUCE, FP16_COMPRESS, etc.

      Raises:
          AssertionError: If ``self.logger`` is ``None``.

      .. warning ::
          DDP communication hook can only be registered once and should be registered
          before calling backward.

      Example::
          Below is an example of a FP16 compression where gradients are
          compressed into 16-bit floating-point numbers before allreduce, and
          then decompressed after allreduce.

          >>> # xdoctest: +SKIP('undefined name')
          >>> ddp._register_builtin_comm_hook(dist.BuiltinCommHookType.FP16_COMPRESS)

      """
      ddp_logger = self.logger
      if ddp_logger is None:
          raise AssertionError("self.logger must not be None")
      # The hook is recorded with the logger under the string form of its type.
      hook_name = str(comm_hook_type)
      ddp_logger._set_comm_hook_name(hook_name)
      dist._register_builtin_comm_hook(self.reducer, comm_hook_type)
  1840. def _register_fused_optim(self, optim: type, *args, optim_params=None, **kwargs):
  1841. r"""
  1842. Register an optimizer in DDP to optimize parameter immediately after its gradient reduction.
  1843. Registers an optimizer with DDP such that the optimization for a
  1844. parameter will run immediately when that parameter's gradient is
  1845. finished with reduction, instead of waiting for all parameters'
  1846. gradients to finish reduction. This can result in a training speedup
  1847. depending on your workload since the optimizer can run while gradient
  1848. reduction for other parameters are still ongoing. In addition, this has
  1849. the potential to reduce peak memory consumption during training, as it
  1850. only needs to load the per-parameter optimizer states of a single
  1851. parameter at a time, instead of loading all per-parameter optimizer
  1852. states at once.
  1853. Args:
  1854. optim (Type): a ``torch.optim.Optimizer`` class to be registered
  1855. as a fused optimizer.
  1856. *args (Sequence[Any]): Arguments to forward to `optim`.
  1857. optim_params (Optional[Iterable[torch.Tensor]]): Set of parameters
  1858. to optimize, similar to `params` argument of traditional `torch.optim`
  1859. Optimizers. If this is omitted, all DDP model parameters will be
  1860. optimized.
  1861. **kwargs: (Dict[str, Any]): Keyword arguments to forward to `optim`.
  1862. .. warning ::
  1863. _register_fused_optim should only be called once on a DDP instance,
  1864. and registering multiple fused optimizers for the same DDP model
  1865. is not currently supported. Please ping
  1866. https://github.com/pytorch/pytorch/issues/71595 if this is necessary
  1867. for your use case.
  1868. .. warning ::
  1869. _register_fused_optim and register_comm_hook currently do not
  1870. compose together, meaning that custom DDP communication hooks are
  1871. not supported with overlapped optimizers. Please ping
  1872. https://github.com/pytorch/pytorch/issues/71595 if this is necessary
  1873. for your use case.
  1874. .. warning ::
  1875. Gradient accumulation and DDP `no_sync` are currently not supported
  1876. with overlapped optimizer. Please ping
  1877. https://github.com/pytorch/pytorch/issues/71595 if this is necessary
  1878. for your use case.
  1879. Example::
  1880. >>> # xdoctest: +SKIP("No rendezvous handler")
  1881. >>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...')
  1882. >>> net = torch.nn.parallel.DistributedDataParallel(model, pg)
  1883. >>> lr = 1e-2
  1884. >>> betas = (0.9, 0.99)
  1885. >>> eps = 1e-6
  1886. >>> net._register_fused_optim(torch.optim.Adam, lr, betas=betas, eps=eps)
  1887. >>> # Example with subset of parameters
  1888. >>> params_to_opt = [list(net.parameters())[0]]
  1889. >>> net._register_fused_optim(
  1890. ... torch.optim.Adam, lr, optim_params=params_to_opt, betas=betas, eps=eps
  1891. ... )
  1892. """
  1893. # Note: importing in function, otherwise this will cause a circular
  1894. # import as optimizer_overlap module needs to import DistributedDataParallel.
  1895. from torch.distributed.algorithms._optimizer_overlap import _as_overlapped_optim
  1896. overlapped_optim = _as_overlapped_optim(optim, optim_params, *args, **kwargs)
  1897. try:
  1898. overlapped_optim.register_ddp(self)
  1899. except NotImplementedError as e:
  1900. raise RuntimeError(
  1901. f"{optim} does not support overlapped DDP. Please file an issue to PyTorch or the respective owner of {optim}."
  1902. ) from e
  1903. def _distributed_broadcast_coalesced(
  1904. self, tensors, buffer_size, authoritative_rank=0
  1905. ):
  1906. dist._broadcast_coalesced(
  1907. self.process_group, tensors, buffer_size, authoritative_rank
  1908. )
  1909. def _check_sync_bufs_post_fwd(self):
  1910. return (
  1911. self.will_sync_module_buffers()
  1912. and hasattr(self, "buffer_hook")
  1913. and self.buffer_hook.buffer_comm_hook_location
  1914. == _BufferCommHookLocation.POST_FORWARD
  1915. )
  1916. def _check_sync_bufs_pre_fwd(self):
  1917. return self.will_sync_module_buffers() and (
  1918. not hasattr(self, "buffer_hook")
  1919. or self.buffer_hook.buffer_comm_hook_location
  1920. == _BufferCommHookLocation.PRE_FORWARD
  1921. )
  1922. def will_sync_module_buffers(self):
  1923. return (
  1924. self.require_forward_param_sync
  1925. and self.broadcast_buffers
  1926. and len(self.modules_buffers) > 0
  1927. )
  1928. def _find_common_rank(self, input_rank, rank_cond):
  1929. # -1 indicates that this rank is not under consideration to be the
  1930. # common_rank
  1931. rank_to_use = torch.tensor(
  1932. [input_rank if rank_cond else -1],
  1933. device=self.device,
  1934. )
  1935. dist.all_reduce(rank_to_use, op=ReduceOp.MAX, group=self.process_group)
  1936. if rank_to_use.item() == -1:
  1937. self._log_and_throw(
  1938. ValueError,
  1939. "BUG! Expected rank_cond to be true for at least one process."
  1940. " This indicates a bug in PyTorch, please report an issue.",
  1941. )
  1942. return rank_to_use.item()
  1943. def _sync_buffers(self):
  1944. with torch.no_grad():
  1945. # module buffer sync
  1946. # Synchronize buffers across processes.
  1947. # If we are running DDP with the join manager, we have to agree
  1948. # upon a rank to sync module buffers from, since rank 0 may
  1949. # already have been joined and have stale module buffers.
  1950. if self._join_config.enable:
  1951. authoritative_rank = self._find_common_rank(
  1952. self._distributed_rank, True
  1953. )
  1954. else:
  1955. # The process with rank 0 is considered the authoritative copy.
  1956. authoritative_rank = 0
  1957. # Update self.modules_buffers in case any buffers were
  1958. # reassigned.
  1959. self._assign_modules_buffers()
  1960. self._sync_module_buffers(authoritative_rank)
  1961. def _sync_module_buffers(self, authoritative_rank):
  1962. if not hasattr(self, "buffer_hook"):
  1963. self._default_broadcast_coalesced(authoritative_rank=authoritative_rank)
  1964. else:
  1965. hook = self.buffer_hook.buffer_comm_hook
  1966. state = self.buffer_hook.buffer_comm_hook_state
  1967. futs = hook(state, self.named_module_buffers)
  1968. if futs is not None:
  1969. self.reducer._install_post_backward_futures(futs)
  1970. def _default_broadcast_coalesced(
  1971. self, bufs=None, bucket_size=None, authoritative_rank=0
  1972. ):
  1973. """
  1974. Broadcasts buffers from rank 0 to rest of workers.
  1975. If bufs, bucket_size are None, default values self.modules_buffers
  1976. and self.broadcast_bucket_size are used instead.
  1977. """
  1978. if bufs is None:
  1979. bufs = self.modules_buffers
  1980. if bucket_size is None:
  1981. bucket_size = self.broadcast_bucket_size
  1982. self._distributed_broadcast_coalesced(bufs, bucket_size, authoritative_rank)
  1983. def _passing_sync_batchnorm_handle(self, module):
  1984. for layer in module.modules():
  1985. if isinstance(layer, torch.nn.modules.SyncBatchNorm):
  1986. if self.device_type == "cpu":
  1987. self._log_and_throw(
  1988. ValueError,
  1989. "SyncBatchNorm layers only work with GPU modules",
  1990. )
  1991. def _check_comm_hook(self, hook):
  1992. if not callable(hook):
  1993. self._log_and_throw(TypeError, "Communication hook must be callable.")
  1994. sig = inspect.signature(hook)
  1995. if (
  1996. sig.parameters["bucket"].annotation != inspect._empty
  1997. and sig.parameters["bucket"].annotation != dist.GradBucket
  1998. ):
  1999. self._log_and_throw(
  2000. ValueError,
  2001. "Communication hook: bucket annotation should be dist.GradBucket.",
  2002. )
  2003. if (
  2004. sig.return_annotation != inspect._empty
  2005. and sig.return_annotation != torch.futures.Future[torch.Tensor]
  2006. ):
  2007. self._log_and_throw(
  2008. ValueError,
  2009. "Communication hook: return annotation should be torch.futures.Future[torch.Tensor].",
  2010. )
  2011. if hook.__name__ in ["bf16_compress_hook", "bf16_compress_wrapper_hook"]:
  2012. cuda_supported = (
  2013. torch.version.cuda is not None
  2014. ) or torch.version.hip is not None
  2015. nccl_supported = (
  2016. dist.is_available()
  2017. and dist.is_nccl_available()
  2018. and torch.cuda.nccl.version() >= (2, 10)
  2019. )
  2020. xpu_xccl_supported = (
  2021. dist.is_available()
  2022. and dist.is_xccl_available()
  2023. and torch.xpu.is_available()
  2024. )
  2025. if not ((cuda_supported and nccl_supported) or xpu_xccl_supported):
  2026. self._log_and_throw(
  2027. TypeError,
  2028. "BF16 all reduce communication hook required CUDA 11+ and NCCL 2.10+ or XPU and XCCL",
  2029. )
  2030. @property
  2031. def _distributed_rank(self):
  2032. return dist.get_rank(self.process_group)
  2033. @staticmethod
  2034. def _get_data_parallel_params(module, named_params=False):
  2035. """Return a generator of parameters managed by a given DDP unit."""
  2036. for param in (
  2037. module.parameters() if not named_params else module.named_parameters()
  2038. ):
  2039. if not hasattr(param, "_ddp_ignored"):
  2040. yield param
  2041. @staticmethod
  2042. def _set_params_and_buffers_to_ignore_for_model(
  2043. module, params_and_buffers_to_ignore
  2044. ):
  2045. """
  2046. Set parameters and buffers to be ignored by DDP.
  2047. Expected format for parameters is the fully qualified name: {module_name}.{param_name}, and
  2048. similarly, {module_name}.{buffer_name} for buffers. For example:
  2049. params_to_ignore = []
  2050. # NB: model here is vanilla PyTorch module, not yet wrapped with DDP.
  2051. for module_name, module in model.named_modules():
  2052. for param_name, param in module.named_parameters(recurse=False):
  2053. if should_ignore(param):
  2054. # Create expected format
  2055. fqn = f"{module_name}.{param_name}"
  2056. params_to_ignore.append(fqn)
  2057. torch.nn.parallel.DistributedDataParallel._set_params_and_buffers_to_ignore_for_model(
  2058. model,
  2059. params_to_ignore
  2060. )
  2061. """
  2062. # This is a workaround to set parameters and buffers DDP should ignore
  2063. # during synchronization. It will be removed when the API is finalized
  2064. # as part of addressing https://github.com/pytorch/pytorch/issues/43690.
  2065. module._ddp_params_and_buffers_to_ignore = params_and_buffers_to_ignore
  2066. for name, param in module.named_parameters():
  2067. if name in params_and_buffers_to_ignore:
  2068. param._ddp_ignored = True
  2069. for name, buffer in module.named_buffers():
  2070. if name in params_and_buffers_to_ignore:
  2071. buffer._ddp_ignored = True
  2072. def _get_ddp_logging_data(self):
  2073. r"""
  2074. Return a dictionary of logging data for debugging and analysis.
  2075. This interface can be called after DistributedDataParallel() is
  2076. constructed. It returns a dictionary of logging data. It could help
  2077. for debugging and analysis. The logging data includes DistributedDataParallel
  2078. constructor input parameters, some internal states of DistributedDataParallel
  2079. and performance metrics. Simply print the dictionary and see what
  2080. these metrics are.
  2081. This is a prototype interface and subject to change in the future.
  2082. """
  2083. if self.logger is None:
  2084. raise AssertionError("self.logger must not be None")
  2085. ddp_logging_data = self.logger._get_ddp_logging_data()
  2086. return {**ddp_logging_data.strs_map, **ddp_logging_data.ints_map}
  2087. def _set_ddp_runtime_logging_sample_rate(self, sample_rate):
  2088. r"""
  2089. Set sample_rate of collecting runtime stats.
  2090. This interface allows users to set sample_rate of collecting
  2091. runtime stats. The runtime stats will be recorded for the
  2092. first 10 iterations, after 10 iterations runtime stats will be
  2093. recorded once every "sample_rate" training iterations. In
  2094. default, runtime stats are recorded for the first 10 iterations,
  2095. after 10 iterations runtime stats are recorded once every
  2096. "kDDPRuntimeLoggingSampleRate=100" training iterations.
  2097. This is a prototype interface and subject to change in the future.
  2098. """
  2099. if sample_rate < 1:
  2100. self._log_and_throw(
  2101. ValueError,
  2102. "DDP runtime logging sample rate should be equal or greater than 1",
  2103. )
  2104. self.reducer._set_ddp_runtime_logging_sample_rate(sample_rate)
  2105. def _set_static_graph(self):
  2106. """
  2107. Set static graph for DDP.
  2108. It is recommended to set static graph in the DDP constructor, which will
  2109. call this private API internally.
  2110. """
  2111. # If self.static_graph has been set, no need to set it again
  2112. if self.static_graph:
  2113. warnings.warn(
  2114. "You've set static_graph to be True, no need to set it again.",
  2115. stacklevel=2,
  2116. )
  2117. return
  2118. self.static_graph = True
  2119. self._static_graph_delay_allreduce_enqueued = False
  2120. self.reducer._set_static_graph()
  2121. if self.logger is None:
  2122. raise AssertionError("self.logger must not be None")
  2123. self.logger._set_static_graph()
  2124. if self.find_unused_parameters:
  2125. warnings.warn(
  2126. "You passed find_unused_parameters=true to DistributedDataParallel, "
  2127. "`_set_static_graph` will detect unused parameters automatically, so "
  2128. "you do not need to set find_unused_parameters=true, just be sure these "
  2129. "unused parameters will not change during training loop while calling "
  2130. "`_set_static_graph`.",
  2131. stacklevel=2,
  2132. )
  2133. def _remove_autograd_hooks(self):
  2134. """Remove autograd hooks registered by the reducer on the model parameters."""
  2135. self.reducer._remove_autograd_hooks()
  2136. def _check_reducer_finalized(self):
  2137. """
  2138. Check if the reducer has processed all buckets and finalized the backward appropriately.
  2139. It is useful to call this method after calling .backward() in your training loop
  2140. in order to avoid subsequent hard to debug errors down the road due to the
  2141. reducer not finalizing backward.
  2142. """
  2143. self.reducer._check_reducer_finalized()
  2144. def _set_sparse_metadata(self, global_unique_ids):
  2145. self.reducer._set_sparse_metadata(global_unique_ids)
  2146. def _update_process_group(self, new_process_group):
  2147. """
  2148. Dynamically updates the process group for DDP so that we can shrink/expand DDP
  2149. world size without having to reinitialize DDP.
  2150. NOTE: If you are using custom communications hooks via, register_comm_hook,
  2151. you need to update the process groups for those hooks separately.
  2152. """
  2153. # Force a rebuild of buckets for a new process group. This ensures all ranks
  2154. # are synchronized in terms of when they will rebuild buckets and also
  2155. # re-evaluates previous assumptions of buckets given the world size might have
  2156. # changed.
  2157. self._has_rebuilt_buckets = False
  2158. self.reducer._reset_state()
  2159. if not _rank_not_in_group(new_process_group):
  2160. self.process_group = new_process_group
  2161. self.reducer._update_process_group(new_process_group)
  2162. def _set_ddp_sink_clone(self, val: bool):
  2163. """
  2164. Sets whether or not DDPSink should clone the output tensors or not.
  2165. The default is True since if the loss is modified in place we run
  2166. into the view is modified in-place error.
  2167. Although, cloning the tensors can add significant memory and
  2168. performance hit if the number and size of tensors are large. As
  2169. a result, this can be set to False if you are not modifying the
  2170. loss in place.
  2171. """
  2172. self._ddp_sink_clone = val