categorical.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. # mypy: allow-untyped-defs
  2. import torch
  3. from torch import nan, Tensor
  4. from torch.distributions import constraints
  5. from torch.distributions.distribution import Distribution
  6. from torch.distributions.utils import lazy_property, logits_to_probs, probs_to_logits
  7. __all__ = ["Categorical"]
  8. class Categorical(Distribution):
  9. r"""
  10. Creates a categorical distribution parameterized by either :attr:`probs` or
  11. :attr:`logits` (but not both).
  12. .. note::
  13. It is equivalent to the distribution that :func:`torch.multinomial`
  14. samples from.
  15. Samples are integers from :math:`\{0, \ldots, K-1\}` where `K` is ``probs.size(-1)``.
  16. If `probs` is 1-dimensional with length-`K`, each element is the relative probability
  17. of sampling the class at that index.
  18. If `probs` is N-dimensional, the first N-1 dimensions are treated as a batch of
  19. relative probability vectors.
  20. .. note:: The `probs` argument must be non-negative, finite and have a non-zero sum,
  21. and it will be normalized to sum to 1 along the last dimension. :attr:`probs`
  22. will return this normalized value.
  23. The `logits` argument will be interpreted as unnormalized log probabilities
  24. and can therefore be any real number. It will likewise be normalized so that
  25. the resulting probabilities sum to 1 along the last dimension. :attr:`logits`
  26. will return this normalized value.
  27. See also: :func:`torch.multinomial`
  28. Example::
  29. >>> # xdoctest: +IGNORE_WANT("non-deterministic")
  30. >>> m = Categorical(torch.tensor([ 0.25, 0.25, 0.25, 0.25 ]))
  31. >>> m.sample() # equal probability of 0, 1, 2, 3
  32. tensor(3)
  33. Args:
  34. probs (Tensor): event probabilities
  35. logits (Tensor): event log probabilities (unnormalized)
  36. """
  37. # pyrefly: ignore [bad-override]
  38. arg_constraints = {"probs": constraints.simplex, "logits": constraints.real_vector}
  39. has_enumerate_support = True
  40. def __init__(
  41. self,
  42. probs: Tensor | None = None,
  43. logits: Tensor | None = None,
  44. validate_args: bool | None = None,
  45. ) -> None:
  46. if (probs is None) == (logits is None):
  47. raise ValueError(
  48. "Either `probs` or `logits` must be specified, but not both."
  49. )
  50. if probs is not None:
  51. if probs.dim() < 1:
  52. raise ValueError("`probs` parameter must be at least one-dimensional.")
  53. # pyrefly: ignore [read-only]
  54. self.probs = probs / probs.sum(-1, keepdim=True)
  55. else:
  56. if logits is None:
  57. raise AssertionError("logits is unexpectedly None")
  58. if logits.dim() < 1:
  59. raise ValueError("`logits` parameter must be at least one-dimensional.")
  60. # Normalize
  61. # pyrefly: ignore [read-only]
  62. self.logits = logits - logits.logsumexp(dim=-1, keepdim=True)
  63. self._param = self.probs if probs is not None else self.logits
  64. self._num_events = self._param.size()[-1]
  65. batch_shape = (
  66. self._param.size()[:-1] if self._param.ndimension() > 1 else torch.Size()
  67. )
  68. # pyrefly: ignore [bad-argument-type]
  69. super().__init__(batch_shape, validate_args=validate_args)
  70. def expand(self, batch_shape, _instance=None):
  71. new = self._get_checked_instance(Categorical, _instance)
  72. batch_shape = torch.Size(batch_shape)
  73. param_shape = batch_shape + torch.Size((self._num_events,))
  74. if "probs" in self.__dict__:
  75. new.probs = self.probs.expand(param_shape)
  76. new._param = new.probs
  77. if "logits" in self.__dict__:
  78. new.logits = self.logits.expand(param_shape)
  79. new._param = new.logits
  80. new._num_events = self._num_events
  81. super(Categorical, new).__init__(batch_shape, validate_args=False)
  82. new._validate_args = self._validate_args
  83. return new
  84. def _new(self, *args, **kwargs):
  85. return self._param.new(*args, **kwargs)
  86. @constraints.dependent_property(is_discrete=True, event_dim=0)
  87. # pyrefly: ignore [bad-override]
  88. def support(self):
  89. return constraints.integer_interval(0, self._num_events - 1)
  90. @lazy_property
  91. def logits(self) -> Tensor:
  92. return probs_to_logits(self.probs)
  93. @lazy_property
  94. def probs(self) -> Tensor:
  95. return logits_to_probs(self.logits)
  96. @property
  97. def param_shape(self) -> torch.Size:
  98. return self._param.size()
  99. @property
  100. def mean(self) -> Tensor:
  101. return torch.full(
  102. self._extended_shape(),
  103. nan,
  104. dtype=self.probs.dtype,
  105. device=self.probs.device,
  106. )
  107. @property
  108. def mode(self) -> Tensor:
  109. return self.probs.argmax(dim=-1)
  110. @property
  111. def variance(self) -> Tensor:
  112. return torch.full(
  113. self._extended_shape(),
  114. nan,
  115. dtype=self.probs.dtype,
  116. device=self.probs.device,
  117. )
  118. def sample(self, sample_shape=torch.Size()):
  119. if not isinstance(sample_shape, torch.Size):
  120. sample_shape = torch.Size(sample_shape)
  121. probs_2d = self.probs.reshape(-1, self._num_events)
  122. samples_2d = torch.multinomial(probs_2d, sample_shape.numel(), True).T
  123. return samples_2d.reshape(self._extended_shape(sample_shape))
  124. def log_prob(self, value):
  125. if self._validate_args:
  126. self._validate_sample(value)
  127. value = value.long().unsqueeze(-1)
  128. value, log_pmf = torch.broadcast_tensors(value, self.logits)
  129. value = value[..., :1]
  130. return log_pmf.gather(-1, value).squeeze(-1)
  131. def entropy(self):
  132. min_real = torch.finfo(self.logits.dtype).min
  133. logits = torch.clamp(self.logits, min=min_real)
  134. p_log_p = logits * self.probs
  135. return -p_log_p.sum(-1)
  136. def enumerate_support(self, expand=True):
  137. num_events = self._num_events
  138. values = torch.arange(num_events, dtype=torch.long, device=self._param.device)
  139. values = values.view((-1,) + (1,) * len(self._batch_shape))
  140. if expand:
  141. values = values.expand((-1,) + self._batch_shape)
  142. return values