negative_binomial.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. # mypy: allow-untyped-defs
  2. import torch
  3. import torch.nn.functional as F
  4. from torch import Tensor
  5. from torch.distributions import constraints
  6. from torch.distributions.distribution import Distribution
  7. from torch.distributions.gamma import Gamma
  8. from torch.distributions.utils import (
  9. broadcast_all,
  10. lazy_property,
  11. logits_to_probs,
  12. probs_to_logits,
  13. )
  14. __all__ = ["NegativeBinomial"]
  15. class NegativeBinomial(Distribution):
  16. r"""
  17. Creates a Negative Binomial distribution, i.e. distribution
  18. of the number of successful independent and identical Bernoulli trials
  19. before :attr:`total_count` failures are achieved. The probability
  20. of success of each Bernoulli trial is :attr:`probs`.
  21. Args:
  22. total_count (float or Tensor): non-negative number of negative Bernoulli
  23. trials to stop, although the distribution is still valid for real
  24. valued count
  25. probs (Tensor): Event probabilities of success in the half open interval [0, 1)
  26. logits (Tensor): Event log-odds for probabilities of success
  27. """
  28. # pyrefly: ignore [bad-override]
  29. arg_constraints = {
  30. "total_count": constraints.greater_than_eq(0),
  31. "probs": constraints.half_open_interval(0.0, 1.0),
  32. "logits": constraints.real,
  33. }
  34. support = constraints.nonnegative_integer
  35. def __init__(
  36. self,
  37. total_count: Tensor | float,
  38. probs: Tensor | None = None,
  39. logits: Tensor | None = None,
  40. validate_args: bool | None = None,
  41. ) -> None:
  42. if (probs is None) == (logits is None):
  43. raise ValueError(
  44. "Either `probs` or `logits` must be specified, but not both."
  45. )
  46. if probs is not None:
  47. (
  48. self.total_count,
  49. # pyrefly: ignore [read-only]
  50. self.probs,
  51. ) = broadcast_all(total_count, probs)
  52. self.total_count = self.total_count.type_as(self.probs)
  53. else:
  54. if logits is None:
  55. raise AssertionError("logits is unexpectedly None")
  56. (
  57. self.total_count,
  58. # pyrefly: ignore [read-only]
  59. self.logits,
  60. ) = broadcast_all(total_count, logits)
  61. self.total_count = self.total_count.type_as(self.logits)
  62. self._param = self.probs if probs is not None else self.logits
  63. batch_shape = self._param.size()
  64. super().__init__(batch_shape, validate_args=validate_args)
  65. def expand(self, batch_shape, _instance=None):
  66. new = self._get_checked_instance(NegativeBinomial, _instance)
  67. batch_shape = torch.Size(batch_shape)
  68. new.total_count = self.total_count.expand(batch_shape)
  69. if "probs" in self.__dict__:
  70. new.probs = self.probs.expand(batch_shape)
  71. new._param = new.probs
  72. if "logits" in self.__dict__:
  73. new.logits = self.logits.expand(batch_shape)
  74. new._param = new.logits
  75. super(NegativeBinomial, new).__init__(batch_shape, validate_args=False)
  76. new._validate_args = self._validate_args
  77. return new
  78. def _new(self, *args, **kwargs):
  79. return self._param.new(*args, **kwargs)
  80. @property
  81. def mean(self) -> Tensor:
  82. return self.total_count * torch.exp(self.logits)
  83. @property
  84. def mode(self) -> Tensor:
  85. return ((self.total_count - 1) * self.logits.exp()).floor().clamp(min=0.0)
  86. @property
  87. def variance(self) -> Tensor:
  88. return self.mean / torch.sigmoid(-self.logits)
  89. @lazy_property
  90. def logits(self) -> Tensor:
  91. return probs_to_logits(self.probs, is_binary=True)
  92. @lazy_property
  93. def probs(self) -> Tensor:
  94. return logits_to_probs(self.logits, is_binary=True)
  95. @property
  96. def param_shape(self) -> torch.Size:
  97. return self._param.size()
  98. @lazy_property
  99. def _gamma(self) -> Gamma:
  100. # Note we avoid validating because self.total_count can be zero.
  101. return Gamma(
  102. concentration=self.total_count,
  103. rate=torch.exp(-self.logits),
  104. validate_args=False,
  105. )
  106. def sample(self, sample_shape=torch.Size()):
  107. with torch.no_grad():
  108. rate = self._gamma.sample(sample_shape=sample_shape)
  109. return torch.poisson(rate)
  110. def log_prob(self, value):
  111. if self._validate_args:
  112. self._validate_sample(value)
  113. log_unnormalized_prob = self.total_count * F.logsigmoid(
  114. -self.logits
  115. ) + value * F.logsigmoid(self.logits)
  116. log_normalization = (
  117. -torch.lgamma(self.total_count + value)
  118. + torch.lgamma(1.0 + value)
  119. + torch.lgamma(self.total_count)
  120. )
  121. # The case self.total_count == 0 and value == 0 has probability 1 but
  122. # lgamma(0) is infinite. Handle this case separately using a function
  123. # that does not modify tensors in place to allow Jit compilation.
  124. log_normalization = log_normalization.masked_fill(
  125. self.total_count + value == 0.0, 0.0
  126. )
  127. return log_unnormalized_prob - log_normalization