blur_pool.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
"""
BlurPool layer inspired by
 - Kornia's Max_BlurPool2d
 - Making Convolutional Networks Shift-Invariant Again :cite:`zhang2019shiftinvar`

Hacked together by Chris Ha and Ross Wightman
"""
  7. from functools import partial
  8. from math import comb # Python 3.8
  9. from typing import Callable, Optional, Type, Union
  10. import torch
  11. import torch.nn as nn
  12. import torch.nn.functional as F
  13. from .padding import get_padding
  14. from .typing import LayerType
  15. class BlurPool2d(nn.Module):
  16. r"""Creates a module that computes blurs and downsample a given feature map.
  17. See :cite:`zhang2019shiftinvar` for more details.
  18. Corresponds to the Downsample class, which does blurring and subsampling
  19. Args:
  20. channels = Number of input channels
  21. filt_size (int): binomial filter size for blurring. currently supports 3 (default) and 5.
  22. stride (int): downsampling filter stride
  23. Returns:
  24. torch.Tensor: the transformed tensor.
  25. """
  26. def __init__(
  27. self,
  28. channels: Optional[int] = None,
  29. filt_size: int = 3,
  30. stride: int = 2,
  31. pad_mode: str = 'reflect',
  32. device=None,
  33. dtype=None
  34. ) -> None:
  35. super().__init__()
  36. assert filt_size > 1
  37. self.channels = channels
  38. self.filt_size = filt_size
  39. self.stride = stride
  40. self.pad_mode = pad_mode
  41. self.padding = [get_padding(filt_size, stride, dilation=1)] * 4
  42. # Register empty buffer with correct shape
  43. filt_shape = (channels or 1, 1, filt_size, filt_size)
  44. self.register_buffer('filt', torch.empty(filt_shape, device=device, dtype=dtype), persistent=False)
  45. # TODO: skip init when on meta device when safe to do so
  46. self.reset_parameters()
  47. def reset_parameters(self) -> None:
  48. """Initialize buffers."""
  49. self._init_buffers()
  50. def _init_buffers(self) -> None:
  51. """Compute and fill non-persistent buffer values."""
  52. # (0.5 + 0.5 x)^N => coefficients = C(N,k) / 2^N, k = 0..N
  53. coeffs = torch.tensor(
  54. [comb(self.filt_size - 1, k) for k in range(self.filt_size)],
  55. device='cpu',
  56. dtype=torch.float32,
  57. ) / (2 ** (self.filt_size - 1)) # normalise so coefficients sum to 1
  58. blur_filter = (coeffs[:, None] * coeffs[None, :])[None, None, :, :]
  59. if self.channels is not None:
  60. blur_filter = blur_filter.repeat(self.channels, 1, 1, 1)
  61. self.filt.copy_(blur_filter)
  62. def forward(self, x: torch.Tensor) -> torch.Tensor:
  63. x = F.pad(x, self.padding, mode=self.pad_mode)
  64. if self.channels is None:
  65. channels = x.shape[1]
  66. weight = self.filt.expand(channels, 1, self.filt_size, self.filt_size)
  67. else:
  68. channels = self.channels
  69. weight = self.filt
  70. return F.conv2d(x, weight, stride=self.stride, groups=channels)
  71. def init_non_persistent_buffers(self) -> None:
  72. """Initialize non-persistent buffers."""
  73. self._init_buffers()
  74. def _normalize_aa_layer(aa_layer: LayerType) -> Callable[..., nn.Module]:
  75. """Map string shorthands to callables (class or partial)."""
  76. if isinstance(aa_layer, str):
  77. key = aa_layer.lower().replace('_', '').replace('-', '')
  78. if key in ('avg', 'avgpool'):
  79. return nn.AvgPool2d
  80. if key in ('blur', 'blurpool'):
  81. return BlurPool2d
  82. if key == 'blurpc':
  83. # preconfigure a constant-pad BlurPool2d
  84. return partial(BlurPool2d, pad_mode='constant')
  85. raise AssertionError(f"Unknown anti-aliasing layer ({aa_layer}).")
  86. return aa_layer
  87. def _underlying_cls(layer_callable: Callable[..., nn.Module]):
  88. """Return the class behind a callable (unwrap partial), else None."""
  89. if isinstance(layer_callable, partial):
  90. return layer_callable.func
  91. return layer_callable if isinstance(layer_callable, type) else None
  92. def _is_blurpool(layer_callable: Callable[..., nn.Module]) -> bool:
  93. """True if callable is BlurPool2d or a partial of it."""
  94. cls = _underlying_cls(layer_callable)
  95. try:
  96. return issubclass(cls, BlurPool2d) # cls may be None, protect below
  97. except TypeError:
  98. return False
  99. except Exception:
  100. return False
  101. def create_aa(
  102. aa_layer: LayerType,
  103. channels: Optional[int] = None,
  104. stride: int = 2,
  105. enable: bool = True,
  106. noop: Optional[Type[nn.Module]] = nn.Identity,
  107. device=None,
  108. dtype=None,
  109. ) -> Optional[nn.Module]:
  110. """ Anti-aliasing factory that supports strings, classes, and partials. """
  111. if not aa_layer or not enable:
  112. return noop() if noop is not None else None
  113. # Resolve strings to callables
  114. aa_layer = _normalize_aa_layer(aa_layer)
  115. # Build kwargs we *intend* to pass
  116. call_kwargs = {"channels": channels, "stride": stride}
  117. # Only add device/dtype for BlurPool2d (or partial of it) and don't override if already provided in the partial.
  118. if _is_blurpool(aa_layer):
  119. # Check if aa_layer is a partial and already has device/dtype set
  120. existing_kw = aa_layer.keywords if isinstance(aa_layer, partial) and aa_layer.keywords else {}
  121. if "device" not in existing_kw and device is not None:
  122. call_kwargs["device"] = device
  123. if "dtype" not in existing_kw and dtype is not None:
  124. call_kwargs["dtype"] = dtype
  125. # Try (channels, stride, [device, dtype]) first; fall back to (stride) only
  126. try:
  127. return aa_layer(**call_kwargs)
  128. except TypeError:
  129. # Some layers (e.g., AvgPool2d) may not accept 'channels' and need stride passed as kernel
  130. return aa_layer(stride)