utils.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. from collections.abc import Callable, Sequence
  2. from functools import update_wrapper
  3. from typing import Any, Final, Generic, overload, TypeVar
  4. import torch
  5. import torch.nn.functional as F
  6. from torch import SymInt, Tensor
  7. from torch.overrides import is_tensor_like
  8. from torch.types import _dtype, _Number, Device, Number
  # Euler-Mascheroni constant (gamma). The literal carries more digits than a
  # double can hold; Python rounds it to the nearest representable float.
  euler_constant: Final[float] = 0.57721566490153286060
  # Names exported by ``from <this module> import *``. Underscore-prefixed
  # helpers defined in this file are intentionally not listed here.
  __all__ = [
      "broadcast_all",
      "logits_to_probs",
      "clamp_probs",
      "probs_to_logits",
      "lazy_property",
      "tril_matrix_to_vec",
      "vec_to_tril_matrix",
  ]
  19. # FIXME: Use (*values: *Ts) -> tuple[Tensor for T in Ts] if Mapping-Type is ever added.
  20. # See https://github.com/python/typing/issues/1216#issuecomment-2126153831
  21. def broadcast_all(*values: Tensor | Number) -> tuple[Tensor, ...]:
  22. r"""
  23. Given a list of values (possibly containing numbers), returns a list where each
  24. value is broadcasted based on the following rules:
  25. - `torch.*Tensor` instances are broadcasted as per :ref:`_broadcasting-semantics`.
  26. - Number instances (scalars) are upcast to tensors having
  27. the same size and type as the first tensor passed to `values`. If all the
  28. values are scalars, then they are upcasted to scalar Tensors.
  29. Args:
  30. values (list of `Number`, `torch.*Tensor` or objects implementing __torch_function__)
  31. Raises:
  32. ValueError: if any of the values is not a `Number` instance,
  33. a `torch.*Tensor` instance, or an instance implementing __torch_function__
  34. """
  35. if not all(is_tensor_like(v) or isinstance(v, _Number) for v in values):
  36. raise ValueError(
  37. "Input arguments must all be instances of Number, "
  38. "torch.Tensor or objects implementing __torch_function__."
  39. )
  40. if not all(is_tensor_like(v) for v in values):
  41. options: dict[str, Any] = dict(dtype=torch.get_default_dtype())
  42. for value in values:
  43. if isinstance(value, torch.Tensor):
  44. options = dict(dtype=value.dtype, device=value.device)
  45. break
  46. new_values = [
  47. v if is_tensor_like(v) else torch.tensor(v, **options) for v in values
  48. ]
  49. return torch.broadcast_tensors(*new_values)
  50. return torch.broadcast_tensors(*values)
  51. def _standard_normal(
  52. shape: Sequence[int | SymInt],
  53. dtype: _dtype | None,
  54. device: Device | None,
  55. ) -> Tensor:
  56. if torch._C._get_tracing_state():
  57. # [JIT WORKAROUND] lack of support for .normal_()
  58. return torch.normal(
  59. torch.zeros(shape, dtype=dtype, device=device),
  60. torch.ones(shape, dtype=dtype, device=device),
  61. )
  62. return torch.empty(shape, dtype=dtype, device=device).normal_()
  def _sum_rightmost(value: Tensor, dim: int) -> Tensor:
      r"""
      Collapse the last ``dim`` dimensions of ``value`` by summing over them.

      Args:
          value (Tensor): A tensor of ``.dim()`` at least ``dim``.
          dim (int): The number of rightmost dims to sum out.

      Returns:
          ``value`` itself when ``dim`` is 0; otherwise a tensor with the
          trailing ``dim`` dimensions reduced by summation.
      """
      if dim != 0:
          # Merge the trailing dims into a single axis, then reduce that axis.
          leading_shape = value.shape[:-dim]
          flattened = value.reshape(leading_shape + (-1,))
          return flattened.sum(-1)
      return value
  def logits_to_probs(logits: Tensor, is_binary: bool = False) -> Tensor:
      r"""
      Map logits to probabilities.

      In the binary case every entry is treated as log odds and passed through
      a sigmoid. Otherwise the entries along the last dimension are treated as
      (possibly unnormalized) log probabilities of the events and are
      normalized with a softmax over that dimension.

      Args:
          logits (Tensor): The logits to convert.
          is_binary (bool): Selects the sigmoid (``True``) or softmax
              (``False``) interpretation.
      """
      return torch.sigmoid(logits) if is_binary else F.softmax(logits, dim=-1)
  def clamp_probs(probs: Tensor) -> Tensor:
      """Clamp probabilities so that they lie in the open interval `(0, 1)`.

      Values are limited to the range `[eps, 1 - eps]`, where `eps` is the
      machine epsilon of the input dtype as reported by
      ``torch.finfo(probs.dtype).eps``.

      Args:
          probs (Tensor): A tensor of probabilities.

      Returns:
          Tensor: The clamped probabilities.

      Examples:
          >>> probs = torch.tensor([0.0, 0.5, 1.0])
          >>> clamp_probs(probs)
          tensor([1.1921e-07, 5.0000e-01, 1.0000e+00])

          >>> probs = torch.tensor([0.0, 0.5, 1.0], dtype=torch.float64)
          >>> clamp_probs(probs)
          tensor([2.2204e-16, 5.0000e-01, 1.0000e+00], dtype=torch.float64)
      """
      margin = torch.finfo(probs.dtype).eps
      lower, upper = margin, 1 - margin
      return probs.clamp(min=lower, max=upper)
  def probs_to_logits(probs: Tensor, is_binary: bool = False) -> Tensor:
      r"""
      Map probabilities to logits.

      The input is first passed through ``clamp_probs``. In the binary case
      each entry is the probability of occurrence of the event indexed by `1`,
      and the result is the log odds ``log(p) - log1p(-p)``. Otherwise the
      values along the last dimension are the probabilities of each event and
      the result is simply ``log(p)``.

      Args:
          probs (Tensor): The probabilities to convert.
          is_binary (bool): Selects the log-odds (``True``) or plain-log
              (``False``) conversion.
      """
      safe_probs = clamp_probs(probs)
      log_p = torch.log(safe_probs)
      if not is_binary:
          return log_p
      # log(p / (1 - p)) written with log1p.
      return log_p - torch.log1p(-safe_probs)
  113. T = TypeVar("T", contravariant=True)
  114. R = TypeVar("R", covariant=True)
  115. class lazy_property(Generic[T, R]):
  116. r"""
  117. Used as a decorator for lazy loading of class attributes. This uses a
  118. non-data descriptor that calls the wrapped method to compute the property on
  119. first call; thereafter replacing the wrapped method into an instance
  120. attribute.
  121. """
  122. def __init__(self, wrapped: Callable[[T], R]) -> None:
  123. self.wrapped: Callable[[T], R] = wrapped
  124. update_wrapper(self, wrapped) # type:ignore[arg-type]
  125. @overload
  126. def __get__(
  127. self, instance: None, obj_type: Any = None
  128. ) -> "_lazy_property_and_property[T, R]": ...
  129. @overload
  130. def __get__(self, instance: T, obj_type: Any = None) -> R: ...
  131. def __get__(
  132. self, instance: T | None, obj_type: Any = None
  133. ) -> "R | _lazy_property_and_property[T, R]":
  134. if instance is None:
  135. return _lazy_property_and_property(self.wrapped)
  136. with torch.enable_grad():
  137. value = self.wrapped(instance)
  138. setattr(instance, self.wrapped.__name__, value)
  139. return value
  class _lazy_property_and_property(lazy_property[T, R], property):
      """Object that is both a ``lazy_property`` and a builtin ``property``.

      We want lazy properties to look like multiple things:

      * property when Sphinx autodoc looks
      * lazy_property when Distribution validate_args looks
      """

      def __init__(self, wrapped: Callable[[T], R]) -> None:
          # Only the ``property`` initialiser runs; ``wrapped`` becomes its getter.
          property.__init__(self, fget=wrapped)
  def tril_matrix_to_vec(mat: Tensor, diag: int = 0) -> Tensor:
      r"""
      Flatten the lower-triangular part of a `D x D` matrix (or a batch of
      matrices) into a (batched) vector, taking the elements in row order.

      Args:
          mat (Tensor): Input whose last dimension has size `D`; the mask is
              applied over the last two dimensions.
          diag (int): Offset of the highest diagonal that is kept; entry
              ``(i, j)`` is selected when ``j <= i + diag``.

      Raises:
          ValueError: if ``diag`` is outside ``[-D, D - 1]`` (not checked
              while tracing).
      """
      n = mat.shape[-1]
      if not torch._C._get_tracing_state() and not (-n <= diag < n):
          raise ValueError(f"diag ({diag}) provided is outside [{-n}, {n - 1}].")
      index = torch.arange(n, device=mat.device)
      # keep[i, j] is True exactly when column j is at most i + diag.
      keep = index < index.unsqueeze(-1) + (diag + 1)
      return mat[..., keep]
  def vec_to_tril_matrix(vec: Tensor, diag: int = 0) -> Tensor:
      r"""
      Convert a vector or a batch of vectors into a batched `D x D`
      lower triangular matrix containing elements from the vector in row order.

      Args:
          vec (Tensor): Input whose last dimension holds the triangular
              elements; leading dimensions are kept as batch dimensions.
          diag (int): Offset of the highest diagonal that is filled; entry
              ``(i, j)`` is written when ``j <= i + diag``.

      Returns:
          Tensor: A new tensor of shape ``vec.shape[:-1] + (D, D)`` that is
          zero outside the selected triangle.

      Raises:
          ValueError: if the size of the last dimension of ``vec`` does not
              correspond to an integer `D` (not checked while tracing).
      """
      # +ve root of D**2 + (1+2*diag)*D - |diag| * (diag+1) - 2*vec.shape[-1] = 0
      n = (
          -(1 + 2 * diag)
          + ((1 + 2 * diag) ** 2 + 8 * vec.shape[-1] + 4 * abs(diag) * (diag + 1)) ** 0.5
      ) / 2
      eps = torch.finfo(vec.dtype).eps
      # Measure the distance to the nearest integer in both directions. A
      # one-sided ``round(n) - n > eps`` misses roots whose fractional part is
      # below 0.5 (the difference is negative), which would let an invalid
      # length through to the masked assignment below.
      if not torch._C._get_tracing_state() and abs(round(n) - n) > eps:
          raise ValueError(
              f"The size of last dimension is {vec.shape[-1]} which cannot be expressed as "
              + "the lower triangular part of a square D x D matrix."
          )
      # n can be a tensor here (presumably only while tracing); otherwise it is a float.
      n = round(n.item()) if isinstance(n, torch.Tensor) else round(n)
      mat = vec.new_zeros(vec.shape[:-1] + torch.Size((n, n)))
      arange = torch.arange(n, device=vec.device)
      # tril_mask[i, j] is True exactly when column j is at most i + diag.
      tril_mask = arange < arange.view(-1, 1) + (diag + 1)
      mat[..., tril_mask] = vec
      return mat