functional_sgd.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. # mypy: allow-untyped-defs
  2. import torch
  3. import torch.optim._functional as F
  4. from torch import Tensor
  5. from torch.distributed.optim._deprecation_warning import (
  6. _scripted_functional_optimizer_deprecation_warning,
  7. )
  8. __all__: list[str] = []
# Define a TorchScript-compatible functional SGD optimizer,
# where we use the optimizer in a functional way.
# Instead of using `param.grad` when updating parameters,
# we explicitly allow the distributed optimizer to pass gradients to
# the `step` function. In this way, we can separate the gradients
# and parameters and allow a multithreaded trainer to update the
# parameters without data races on accumulating to the same .grad.
# NOTE: This should only be used by distributed optimizer internals
# and is not meant to be exposed to the user.
  18. @torch.jit.script
  19. class _FunctionalSGD:
  20. def __init__(
  21. self,
  22. params: list[Tensor],
  23. lr: float = 1e-2,
  24. momentum: float = 0.0,
  25. dampening: float = 0.0,
  26. weight_decay: float = 0.0,
  27. nesterov: bool = False,
  28. maximize: bool = False,
  29. foreach: bool = False,
  30. fused: bool = False,
  31. _allow_empty_param_list: bool = False,
  32. ):
  33. _scripted_functional_optimizer_deprecation_warning(stacklevel=2)
  34. self.defaults = {
  35. "lr": lr,
  36. "momentum": momentum,
  37. "dampening": dampening,
  38. "weight_decay": weight_decay,
  39. }
  40. self.nesterov = nesterov
  41. self.maximize = maximize
  42. self.foreach = foreach
  43. self.fused = fused
  44. self.state = torch.jit.annotate(dict[torch.Tensor, dict[str, torch.Tensor]], {})
  45. if len(params) == 0 and not _allow_empty_param_list:
  46. raise ValueError("optimizer got an empty parameter list")
  47. # NOTE: we only have one param_group and don't allow user to add additional
  48. # param group as it's not a common use case.
  49. self.param_group = {"params": params}
  50. def step_param(self, param: Tensor, grad: Tensor | None):
  51. """Similar to self.step, but operates on a single parameter and
  52. its gradient.
  53. """
  54. # TODO: Once step_param interface is robust, refactor step to call
  55. # step param on each param.
  56. weight_decay = self.defaults["weight_decay"]
  57. momentum = self.defaults["momentum"]
  58. dampening = self.defaults["dampening"]
  59. lr = self.defaults["lr"]
  60. params = [param]
  61. momentum_buffer_list: list[Tensor | None] = []
  62. grads = []
  63. has_sparse_grad = False
  64. if grad is not None:
  65. grads.append(grad)
  66. if grad.is_sparse:
  67. has_sparse_grad = True
  68. if param not in self.state:
  69. self.state[param] = {}
  70. state = self.state[param]
  71. if "momentum_buffer" not in state:
  72. momentum_buffer_list.append(None)
  73. else:
  74. momentum_buffer_list.append(state["momentum_buffer"])
  75. with torch.no_grad():
  76. F.sgd(
  77. params,
  78. grads,
  79. momentum_buffer_list,
  80. weight_decay=weight_decay,
  81. momentum=momentum,
  82. lr=lr,
  83. dampening=dampening,
  84. nesterov=self.nesterov,
  85. maximize=self.maximize,
  86. has_sparse_grad=has_sparse_grad,
  87. foreach=self.foreach,
  88. fused=self.fused,
  89. grad_scale=None,
  90. found_inf=None,
  91. )
  92. # update momentum_buffer in state
  93. state = self.state[param]
  94. momentum_buffer = momentum_buffer_list[0]
  95. if momentum_buffer is not None:
  96. state["momentum_buffer"] = momentum_buffer
  97. def step(self, gradients: list[Tensor | None]):
  98. params = self.param_group["params"]
  99. params_with_grad = []
  100. grads = []
  101. momentum_buffer_list: list[Tensor | None] = []
  102. lr = self.defaults["lr"]
  103. weight_decay = self.defaults["weight_decay"]
  104. momentum = self.defaults["momentum"]
  105. dampening = self.defaults["dampening"]
  106. if len(params) != len(gradients):
  107. raise ValueError(
  108. "the gradients passed in does not equal to the size of the parameters!"
  109. + f"Params length: {len(params)}. "
  110. + f"Gradients length: {len(gradients)}"
  111. )
  112. has_sparse_grad = False
  113. for param, gradient in zip(params, gradients):
  114. if gradient is not None:
  115. params_with_grad.append(param)
  116. grads.append(gradient)
  117. if gradient.is_sparse:
  118. has_sparse_grad = True
  119. if param not in self.state:
  120. self.state[param] = {}
  121. state = self.state[param]
  122. if "momentum_buffer" not in state:
  123. momentum_buffer_list.append(None)
  124. else:
  125. momentum_buffer_list.append(state["momentum_buffer"])
  126. with torch.no_grad():
  127. F.sgd(
  128. params_with_grad,
  129. grads,
  130. momentum_buffer_list,
  131. weight_decay=weight_decay,
  132. momentum=momentum,
  133. lr=lr,
  134. dampening=dampening,
  135. nesterov=self.nesterov,
  136. maximize=self.maximize,
  137. has_sparse_grad=has_sparse_grad,
  138. foreach=self.foreach,
  139. fused=self.fused,
  140. grad_scale=None,
  141. found_inf=None,
  142. )
  143. # update momentum_buffers in state
  144. for i, p in enumerate(params_with_grad):
  145. state = self.state[p]
  146. momentum_buffer = momentum_buffer_list[i]
  147. if momentum_buffer is not None:
  148. state["momentum_buffer"] = momentum_buffer