utils.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381
  1. # mypy: allow-untyped-defs
  2. import dataclasses
  3. import traceback
  4. from collections import OrderedDict
  5. from collections.abc import Callable, Container
  6. from typing import Any, Optional, overload, TypeVar
  7. import torch
  8. import torch.distributed as dist
  9. from torch import nn
  10. from torch.nn.utils.rnn import PackedSequence
  # The export list is intentionally empty: ``from <this module> import *``
  # exposes nothing. The helpers visible in this file are underscore-prefixed
  # and must be imported by name.
  __all__ = []  # type: ignore[var-annotated]
  12. def _pack_kwargs(*args: Any, **kwargs: Any) -> tuple[tuple[Any, ...], tuple[str, ...]]:
  13. """
  14. Turn argument list into separate key list and value list (unpack_kwargs does the opposite).
  15. Inspiration: https://github.com/facebookresearch/fairscale/blob/eeb6684/fairscale/internal/containers.py#L70
  16. Usage::
  17. kwarg_keys, flat_args = pack_kwargs(1, 2, a=3, b=4)
  18. assert kwarg_keys == ("a", "b")
  19. assert flat_args == (1, 2, 3, 4)
  20. args, kwargs = unpack_kwargs(kwarg_keys, flat_args)
  21. assert args == (1, 2)
  22. assert kwargs == {"a": 3, "b": 4}
  23. Returns:
  24. Tuple[Tuple[Any, ...], Tuple[str, ...]]: The first tuple element gives
  25. gives both positional args and kwarg values, where the positional args
  26. proceed kwarg values and kwarg values are ordered consistently with the
  27. kwarg keys. The second tuple element gives the kwarg keys.
  28. The second tuple element's length is at most the first tuple element's length.
  29. """
  30. kwarg_keys: list[str] = []
  31. flat_args: list[Any] = list(args)
  32. for k, v in kwargs.items():
  33. kwarg_keys.append(k)
  34. flat_args.append(v)
  35. return tuple(flat_args), tuple(kwarg_keys)
  36. def _cast_forward_inputs(
  37. dtype: torch.dtype | None,
  38. *args: Any,
  39. **kwargs: Any,
  40. ) -> tuple[Any, Any]:
  41. """
  42. Cast floating point tensors in ``args`` and ``kwargs`` to ``input_dtype``.
  43. This respects the existing ``requires_grad`` on the tensors.
  44. """
  45. if dtype is None:
  46. return args, kwargs
  47. def cast_fn(x: torch.Tensor) -> torch.Tensor:
  48. if not torch.is_floating_point(x) or x.dtype == dtype:
  49. return x
  50. return x.to(dtype)
  51. return (_apply_to_tensors(cast_fn, args), _apply_to_tensors(cast_fn, kwargs))
  52. def _unpack_kwargs(
  53. flat_args: tuple[Any, ...], kwarg_keys: tuple[str, ...]
  54. ) -> tuple[tuple[Any, ...], dict[str, Any]]:
  55. """See _pack_kwargs."""
  56. if len(kwarg_keys) > len(flat_args):
  57. raise AssertionError(f"too many keys {len(kwarg_keys)} vs. {len(flat_args)}")
  58. if len(kwarg_keys) == 0:
  59. return flat_args, {}
  60. args = flat_args[: -len(kwarg_keys)]
  61. kwargs = dict(zip(kwarg_keys, flat_args[-len(kwarg_keys) :]))
  62. return args, kwargs
  # Constrained type variables used only by the ``_recursive_to`` overloads:
  # ``S`` stands for a built-in container input, ``T`` for a tensor-like input.
  S = TypeVar("S", dict, list, tuple)
  T = TypeVar("T", torch.Tensor, PackedSequence)


  # Container inputs come back wrapped in a list.
  @overload
  def _recursive_to(
      inputs: S, target_device: torch.device, use_side_stream_for_tensor_copies: bool
  ) -> list[S]: ...


  # A bare tensor / PackedSequence input comes back wrapped in a 1-tuple.
  @overload
  def _recursive_to(
      inputs: T, target_device: torch.device, use_side_stream_for_tensor_copies: bool
  ) -> tuple[T]: ...


  def _recursive_to(inputs, target_device, use_side_stream_for_tensor_copies):
      r"""Recursively moves input to the target_device.

      Tensors and ``PackedSequence`` objects are moved with ``.to(target_device)``
      (or returned as-is when already on ``target_device``). Namedtuples, and
      non-empty tuples, lists and dicts, are traversed element by element and
      rebuilt. Any other object (including empty containers) is returned
      unchanged.

      Args:
          inputs: object or (nested) container to move.
          target_device: device the tensors should end up on.
          use_side_stream_for_tensor_copies: when true, copies of tensors that
              do not live on CPU are issued on the stream returned by
              ``_get_stream(target_device)`` and the current stream is made to
              wait for it.

      Returns:
          The moved result wrapped in a one-element sequence: a 1-tuple for a
          bare tensor / ``PackedSequence``, a one-element list otherwise.
      """

      def to_map(obj):
          # Every return path of ``to_map`` yields a one-element sequence; the
          # ``zip(*map(to_map, ...))`` calls below rely on that to transpose the
          # per-element results back into a single rebuilt container.
          if isinstance(obj, (torch.Tensor, PackedSequence)):
              device = obj.data.device if isinstance(obj, PackedSequence) else obj.device
              if device == target_device:
                  # Already in place: no copy.
                  return (obj,)
              if not use_side_stream_for_tensor_copies:
                  return (obj.to(target_device),)
              else:
                  # If the custom module is not registered to torch, stream is not used for acceleration
                  # NOTE(review): ``device`` here is the *source* device of ``obj``;
                  # CPU-resident inputs take this plain copy and never reach the
                  # side-stream path below.
                  if device.type == "cpu":
                      return (obj.to(target_device),)

                  from torch.nn.parallel._functions import _get_stream

                  # Perform CPU -> target_device copies in a background stream. This code is
                  # motivated from similar logic in torch/nn/parallel/_functions.py
                  # NOTE(review): given the early return above, only non-CPU
                  # sources get here despite the "CPU ->" wording -- confirm intent.
                  stream = _get_stream(target_device)
                  with stream:
                      output = obj.to(target_device)
                  # synchronize with the copy stream
                  with torch.accelerator.device_index(target_device.index):
                      current_stream = torch.accelerator.current_stream()
                      # Sync the current stream with the copy stream
                      current_stream.wait_stream(stream)
                      # Ensure tensor memory is not reused until work on
                      # main stream is complete
                      if isinstance(obj, PackedSequence):
                          output.data.record_stream(current_stream)  # type: ignore[arg-type]
                      else:
                          if not isinstance(output, torch.Tensor):
                              raise AssertionError("output must be a torch.Tensor")
                          output.record_stream(current_stream)  # type: ignore[arg-type]
                  return (output,)

          from torch.nn.parallel.scatter_gather import _is_namedtuple

          # Namedtuples are checked before plain tuples so the concrete
          # namedtuple type is preserved when rebuilding.
          if _is_namedtuple(obj):
              # pyrefly: ignore [no-matching-overload]
              return [type(obj)(*args) for args in zip(*map(to_map, obj))]
          if isinstance(obj, tuple) and len(obj) > 0:
              # pyrefly: ignore [no-matching-overload]
              return list(zip(*map(to_map, obj)))
          if isinstance(obj, list) and len(obj) > 0:
              # pyrefly: ignore [no-matching-overload]
              return [list(i) for i in zip(*map(to_map, obj))]
          if isinstance(obj, dict) and len(obj) > 0:
              # Each ``(key, value)`` item is itself mapped as a tuple, so keys go
              # through ``to_map`` as well; the mapped pairs rebuild the dict type.
              # pyrefly: ignore [no-matching-overload]
              return [type(obj)(i) for i in zip(*map(to_map, obj.items()))]
          # Non-tensor leaves and empty containers are passed through untouched.
          return [obj]

      # Avoid reference cycle
      # (``to_map`` refers to itself recursively through this frame's closure;
      # clearing the name once the work is done drops that self-reference.)
      try:
          res = to_map(inputs)
      finally:
          to_map = None  # type: ignore[assignment]
      return res
  126. def _p_assert(cond: Any, s: str, raise_assertion_error: bool = True) -> None:
  127. """Alternate to ``assert`` when in the backward context to print the error message ``s`` since otherwise, it is swallowed."""
  128. if not cond:
  129. print(s)
  130. traceback.print_stack()
  131. if raise_assertion_error:
  132. raise AssertionError(s)
  def _alloc_storage(tensor: torch.Tensor, size: torch.Size) -> None:
      """
      Allocate storage for ``tensor`` with the given size.

      The tensor's typed storage is resized in place to ``size.numel()``
      elements unless it already holds exactly that many. Nothing is returned.
      The whole check-and-resize is skipped when
      ``is_torchdynamo_compiling()`` reports true.

      Raises:
          AssertionError: (via ``_p_assert``) if the storage is neither already
              the requested size nor empty.
      """
      with torch.no_grad():
          if not torch.distributed._functional_collectives.is_torchdynamo_compiling():
              already_allocated = tensor._typed_storage()._size() == size.numel()
              if not already_allocated:
                  tensor_storage_size = tensor._typed_storage()._size()
                  # Report the actual stale size; the message previously carried
                  # a literal placeholder instead of the value.
                  _p_assert(
                      tensor_storage_size == 0,
                      "Tensor storage should have been resized to be 0 but got "
                      f"{tensor_storage_size}",
                  )
                  tensor._typed_storage()._resize_(size.numel())
  def _free_storage(tensor: torch.Tensor) -> None:
      """
      Frees the underlying storage of ``tensor``.

      The tensor's typed storage is resized in place to 0 elements unless it is
      already empty. Nothing is returned. The whole check-and-resize is skipped
      when ``is_torchdynamo_compiling()`` reports true.

      Raises:
          AssertionError: (via ``_p_assert``) if the storage is non-empty and
              ``tensor`` has a non-zero storage offset.
      """
      with torch.no_grad():
          if not torch.distributed._functional_collectives.is_torchdynamo_compiling():
              already_freed = tensor._typed_storage()._size() == 0
              if not already_freed:
                  # A non-zero offset means ``tensor`` is a view into a larger
                  # storage, so dropping that storage would affect more than
                  # this tensor.
                  _p_assert(
                      tensor.storage_offset() == 0,
                      "Freeing a tensor's storage is unsafe when it is not the sole occupant\n"
                      f"storage offset: {tensor.storage_offset()}\n"
                      f"storage size: {tensor._typed_storage()._size()}\n"
                      f"tensor shape: {tensor.shape}",
                  )
                  tensor._typed_storage()._resize_(0)
  169. Q = TypeVar("Q")
  170. R = TypeVar("R", dict, list, tuple, set, OrderedDict, PackedSequence, Any)
  171. @overload
  172. def _apply_to_tensors(
  173. fn: Callable[[torch.Tensor], Q], container: torch.Tensor
  174. ) -> Q: ...
  175. @overload
  176. def _apply_to_tensors(fn: Callable[[torch.Tensor], Any], container: R) -> R: ...
  177. def _apply_to_tensors(fn, container):
  178. """Recursively apply to all tensor in different kinds of container types."""
  179. def apply(x):
  180. from torch.nn.parallel.scatter_gather import _is_namedtuple
  181. if isinstance(x, torch.Tensor):
  182. return fn(x)
  183. elif hasattr(x, "__dataclass_fields__"):
  184. dc = dataclasses.replace(x)
  185. changes = {
  186. f.name: apply(getattr(dc, f.name)) for f in dataclasses.fields(dc)
  187. }
  188. return dataclasses.replace(dc, **changes)
  189. elif isinstance(x, OrderedDict):
  190. od = x.__class__()
  191. for key, value in x.items():
  192. od[key] = apply(value)
  193. return od
  194. elif isinstance(x, PackedSequence):
  195. apply(x.data)
  196. return x
  197. elif isinstance(x, dict):
  198. return {key: apply(value) for key, value in x.items()}
  199. elif _is_namedtuple(x):
  200. res = (apply(el) for el in x)
  201. return type(x)(*res)
  202. elif isinstance(x, (list, tuple, set)):
  203. return type(x)(apply(el) for el in x)
  204. else:
  205. return x
  206. return apply(container)
  207. def _to_kwargs(
  208. inputs: tuple[Any, ...],
  209. kwargs: dict[str, Any] | None,
  210. target_device: torch.device,
  211. use_side_stream_for_tensor_copies: bool,
  212. ) -> tuple[tuple[Any, ...], tuple[dict[str, Any], ...]]:
  213. moved_inputs = (
  214. _recursive_to(inputs, target_device, use_side_stream_for_tensor_copies)
  215. if inputs
  216. else []
  217. )
  218. moved_kwargs = (
  219. _recursive_to(kwargs, target_device, use_side_stream_for_tensor_copies)
  220. if kwargs
  221. else []
  222. )
  223. if len(moved_inputs) < len(moved_kwargs):
  224. moved_inputs.extend([() for _ in range(len(moved_kwargs) - len(inputs))])
  225. elif len(moved_kwargs) < len(moved_inputs):
  226. moved_kwargs.extend([{} for _ in range(len(moved_inputs) - len(moved_kwargs))])
  227. return tuple(moved_inputs), tuple(moved_kwargs)
  def _verify_param_shape_across_processes(
      process_group: dist.ProcessGroup,
      tensors: list[torch.Tensor],
      logger: Optional["dist.Logger"] = None,
  ):
      """
      Forward to ``dist._verify_params_across_processes``.

      This is a thin wrapper: the three arguments are passed through
      positionally and the callee's result is returned unchanged.
      NOTE(review): judging by the name, the callee presumably checks that the
      shapes of ``tensors`` agree across the ranks of ``process_group`` -- see
      that function for the exact contract.
      """
      return dist._verify_params_across_processes(process_group, tensors, logger)
  def _sync_module_states(
      module: nn.Module,
      process_group: dist.ProcessGroup,
      broadcast_bucket_size: int,
      src: int,
      params_and_buffers_to_ignore: Container[str],
      broadcast_buffers: bool = True,
  ) -> None:
      """
      Sync ``module``'s parameters and buffers state.

      Collects the detached parameters of ``module`` (and, when
      ``broadcast_buffers`` is true, its detached buffers), skipping every name
      found in ``params_and_buffers_to_ignore``, and hands them to
      ``_sync_params_and_buffers`` so that all ranks end up with the same module
      state. Note that this API assumes that all parameter shapes are consistent
      before running the synchronization. This can be checked with
      ``_verify_param_shape_across_processes``.
      """
      ignored = params_and_buffers_to_ignore
      # Parameters first, then buffers: the order of ``module_states`` is
      # what gets handed to the broadcast.
      module_states: list[torch.Tensor] = [
          param.detach()
          for name, param in module.named_parameters()
          if name not in ignored
      ]
      if broadcast_buffers:
          module_states.extend(
              buffer.detach()
              for name, buffer in module.named_buffers()
              if name not in ignored
          )
      _sync_params_and_buffers(process_group, module_states, broadcast_bucket_size, src)
  258. def _sync_params_and_buffers(
  259. process_group: dist.ProcessGroup,
  260. module_states: list[torch.Tensor],
  261. broadcast_bucket_size: int,
  262. src: int,
  263. ) -> None:
  264. """Synchronize ``module_states`` (list of tensors) across all processes by broadcasting them from rank 0."""
  265. if len(module_states) > 0:
  266. dist._broadcast_coalesced(
  267. process_group, module_states, broadcast_bucket_size, src
  268. )
  269. def _replace_by_prefix(
  270. state_dict: dict[str, Any],
  271. old_prefix: str,
  272. new_prefix: str,
  273. ) -> None:
  274. """
  275. Replace all keys that match a given old_prefix with a new_prefix (in-place).
  276. Usage::
  277. state_dict = {"layer.xyz": torch.tensor(1)}
  278. replace_by_prefix_(state_dict, "layer.", "module.layer.")
  279. assert state_dict == {"module.layer.xyz": torch.tensor(1)}
  280. """
  281. if old_prefix == new_prefix:
  282. raise ValueError("old_prefix and new_prefix must be distinct")
  283. for key in list(state_dict.keys()):
  284. if not key.startswith(old_prefix):
  285. continue
  286. new_key = new_prefix + key[len(old_prefix) :]
  287. state_dict[new_key] = state_dict[key]
  288. del state_dict[key]
  289. def _data_ptr_allocated(tensor: torch.Tensor) -> bool:
  290. return tensor.untyped_storage().data_ptr() > 0
  291. def _get_root_modules(modules: list[nn.Module]) -> list[nn.Module]:
  292. """
  293. Returns the modules in ``modules`` that are root modules (i.e.
  294. parent-less) with respect to the set ``modules``. In other words, these
  295. are the modules in ``modules`` that are the not child of any other
  296. module in ``modules``.
  297. """
  298. root_modules: list[nn.Module] = []
  299. module_to_modules: dict[nn.Module, set[nn.Module]] = {
  300. module: set(module.modules()) for module in modules
  301. }
  302. for candidate_module in modules:
  303. is_root_module = True
  304. for module, _modules in module_to_modules.items():
  305. is_child_module = (
  306. candidate_module is not module and candidate_module in _modules
  307. )
  308. if is_child_module:
  309. is_root_module = False
  310. break
  311. if is_root_module:
  312. root_modules.append(candidate_module)
  313. return root_modules