functional.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438
  1. """Functional implementations of various blur operations for image processing.
  2. This module provides a collection of low-level functions for applying different blur effects
  3. to images, including standard blur, median blur, glass blur, defocus, and zoom effects.
  4. These functions form the foundation for the corresponding transform classes.
  5. """
  6. from __future__ import annotations
  7. import random
  8. from collections.abc import Sequence
  9. from itertools import product
  10. from math import ceil
  11. from typing import Literal
  12. from warnings import warn
  13. import cv2
  14. import numpy as np
  15. from albucore import clipped, float32_io, maybe_process_in_chunks, preserve_channel_dim, uint8_io
  16. from pydantic import ValidationInfo
  17. from albumentations.augmentations.geometric.functional import scale
  18. from albumentations.augmentations.pixel.functional import convolve
  19. from albumentations.core.type_definitions import EIGHT
  20. __all__ = ["box_blur", "central_zoom", "defocus", "glass_blur", "median_blur", "zoom_blur"]
  21. @preserve_channel_dim
  22. def box_blur(img: np.ndarray, ksize: int) -> np.ndarray:
  23. """Blur an image.
  24. This function applies a blur to an image.
  25. Args:
  26. img (np.ndarray): Input image.
  27. ksize (int): Kernel size.
  28. Returns:
  29. np.ndarray: Blurred image.
  30. """
  31. blur_fn = maybe_process_in_chunks(cv2.blur, ksize=(ksize, ksize))
  32. return blur_fn(img)
  33. @preserve_channel_dim
  34. @uint8_io
  35. def median_blur(img: np.ndarray, ksize: int) -> np.ndarray:
  36. """Median blur an image.
  37. This function applies a median blur to an image.
  38. Args:
  39. img (np.ndarray): Input image.
  40. ksize (int): Kernel size.
  41. Returns:
  42. np.ndarray: Median blurred image.
  43. """
  44. blur_fn = maybe_process_in_chunks(cv2.medianBlur, ksize=ksize)
  45. return blur_fn(img)
  46. @preserve_channel_dim
  47. def glass_blur(
  48. img: np.ndarray,
  49. sigma: float,
  50. max_delta: int,
  51. iterations: int,
  52. dxy: np.ndarray,
  53. mode: Literal["fast", "exact"],
  54. ) -> np.ndarray:
  55. """Glass blur an image.
  56. This function applies a glass blur to an image.
  57. Args:
  58. img (np.ndarray): Input image.
  59. sigma (float): Sigma.
  60. max_delta (int): Maximum delta.
  61. iterations (int): Number of iterations.
  62. dxy (np.ndarray): Dxy.
  63. mode (Literal["fast", "exact"]): Mode.
  64. Returns:
  65. np.ndarray: Glass blurred image.
  66. """
  67. x = cv2.GaussianBlur(np.array(img), sigmaX=sigma, ksize=(0, 0))
  68. if mode == "fast":
  69. hs = np.arange(img.shape[0] - max_delta, max_delta, -1)
  70. ws = np.arange(img.shape[1] - max_delta, max_delta, -1)
  71. h: int | np.ndarray = np.tile(hs, ws.shape[0])
  72. w: int | np.ndarray = np.repeat(ws, hs.shape[0])
  73. for i in range(iterations):
  74. dy = dxy[:, i, 0]
  75. dx = dxy[:, i, 1]
  76. x[h, w], x[h + dy, w + dx] = x[h + dy, w + dx], x[h, w]
  77. elif mode == "exact":
  78. for ind, (i, h, w) in enumerate(
  79. product(
  80. range(iterations),
  81. range(img.shape[0] - max_delta, max_delta, -1),
  82. range(img.shape[1] - max_delta, max_delta, -1),
  83. ),
  84. ):
  85. idx = ind if ind < len(dxy) else ind % len(dxy)
  86. dy = dxy[idx, i, 0]
  87. dx = dxy[idx, i, 1]
  88. x[h, w], x[h + dy, w + dx] = x[h + dy, w + dx], x[h, w]
  89. else:
  90. raise ValueError(f"Unsupported mode `{mode}`. Supports only `fast` and `exact`.")
  91. return cv2.GaussianBlur(x, sigmaX=sigma, ksize=(0, 0))
  92. def defocus(img: np.ndarray, radius: int, alias_blur: float) -> np.ndarray:
  93. """Defocus an image.
  94. This function defocuses an image.
  95. Args:
  96. img (np.ndarray): Input image.
  97. radius (int): Radius.
  98. alias_blur (float): Alias blur.
  99. Returns:
  100. np.ndarray: Defocused image.
  101. """
  102. length = np.arange(-max(8, radius), max(8, radius) + 1)
  103. ksize = 3 if radius <= EIGHT else 5
  104. x, y = np.meshgrid(length, length)
  105. aliased_disk = np.array((x**2 + y**2) <= radius**2, dtype=np.float32)
  106. aliased_disk /= np.sum(aliased_disk)
  107. kernel = cv2.GaussianBlur(aliased_disk, (ksize, ksize), sigmaX=alias_blur)
  108. return convolve(img, kernel=kernel)
  109. def central_zoom(img: np.ndarray, zoom_factor: int) -> np.ndarray:
  110. """Central zoom an image.
  111. This function zooms an image.
  112. Args:
  113. img (np.ndarray): Input image.
  114. zoom_factor (int): Zoom factor.
  115. Returns:
  116. np.ndarray: Zoomed image.
  117. """
  118. height, width = img.shape[:2]
  119. h_ch, w_ch = ceil(height / zoom_factor), ceil(width / zoom_factor)
  120. h_top, w_top = (height - h_ch) // 2, (width - w_ch) // 2
  121. img = scale(img[h_top : h_top + h_ch, w_top : w_top + w_ch], zoom_factor, cv2.INTER_LINEAR)
  122. h_trim_top, w_trim_top = (img.shape[0] - height) // 2, (img.shape[1] - width) // 2
  123. return img[h_trim_top : h_trim_top + height, w_trim_top : w_trim_top + width]
  124. @float32_io
  125. @clipped
  126. def zoom_blur(img: np.ndarray, zoom_factors: np.ndarray | Sequence[int]) -> np.ndarray:
  127. """Zoom blur an image.
  128. This function zooms and blurs an image.
  129. Args:
  130. img (np.ndarray): Input image.
  131. zoom_factors (np.ndarray | Sequence[int]): Zoom factors.
  132. Returns:
  133. np.ndarray: Zoomed and blurred image.
  134. """
  135. out = np.zeros_like(img, dtype=np.float32)
  136. for zoom_factor in zoom_factors:
  137. out += central_zoom(img, zoom_factor)
  138. return (img + out) / (len(zoom_factors) + 1)
  139. def _ensure_min_value(result: tuple[int, int], min_value: int, field_name: str | None) -> tuple[int, int]:
  140. if result[0] < min_value or result[1] < min_value:
  141. new_result = (max(min_value, result[0]), max(min_value, result[1]))
  142. warn(
  143. f"{field_name}: Invalid kernel size range {result}. "
  144. f"Values less than {min_value} are not allowed. "
  145. f"Range automatically adjusted to {new_result}.",
  146. UserWarning,
  147. stacklevel=2,
  148. )
  149. return new_result
  150. return result
  151. def _ensure_odd_values(result: tuple[int, int], field_name: str | None = None) -> tuple[int, int]:
  152. new_result = (
  153. result[0] if result[0] == 0 or result[0] % 2 == 1 else result[0] + 1,
  154. result[1] if result[1] == 0 or result[1] % 2 == 1 else result[1] + 1,
  155. )
  156. if new_result != result:
  157. warn(
  158. f"{field_name}: Non-zero kernel sizes must be odd. Range {result} automatically adjusted to {new_result}.",
  159. UserWarning,
  160. stacklevel=2,
  161. )
  162. return new_result
  163. def process_blur_limit(value: int | tuple[int, int], info: ValidationInfo, min_value: int = 0) -> tuple[int, int]:
  164. """Process blur limit to ensure valid kernel sizes."""
  165. # Convert value to tuple[int, int]
  166. if isinstance(value, Sequence):
  167. if len(value) != 2:
  168. raise ValueError("Sequence must contain exactly 2 elements")
  169. result = (int(value[0]), int(value[1]))
  170. else:
  171. result = (min_value, int(value))
  172. result = _ensure_min_value(result, min_value, info.field_name)
  173. result = _ensure_odd_values(result, info.field_name)
  174. if result[0] > result[1]:
  175. final_result = (result[1], result[1])
  176. warn(
  177. f"{info.field_name}: Invalid range {result} (min > max). Range automatically adjusted to {final_result}.",
  178. UserWarning,
  179. stacklevel=2,
  180. )
  181. return final_result
  182. return result
  183. def create_motion_kernel(
  184. kernel_size: int,
  185. angle: float,
  186. direction: float,
  187. allow_shifted: bool,
  188. random_state: random.Random,
  189. ) -> np.ndarray:
  190. """Create a motion blur kernel.
  191. Args:
  192. kernel_size (int): Size of the kernel (must be odd)
  193. angle (float): Angle in degrees (counter-clockwise)
  194. direction (float): Blur direction (-1.0 to 1.0)
  195. allow_shifted (bool): Allow kernel to be randomly shifted from center
  196. random_state (random.Random): Python's random.Random instance
  197. Returns:
  198. np.ndarray: Motion blur kernel
  199. """
  200. # Validate direction range to prevent unexpected interpolation results
  201. direction = np.clip(direction, -1.0, 1.0)
  202. kernel = np.zeros((kernel_size, kernel_size), dtype=np.float32)
  203. center = kernel_size // 2
  204. # Convert angle to radians
  205. angle_rad = np.deg2rad(angle)
  206. # Calculate direction vector
  207. dx = np.cos(angle_rad)
  208. dy = np.sin(angle_rad)
  209. # Create line points with direction bias
  210. line_length = kernel_size // 2
  211. # Apply direction bias to control the distribution of blur
  212. if direction < 0:
  213. # Backward bias: interpolate between symmetric and backward-only
  214. # direction = -1: only backward, direction = 0: symmetric
  215. bias_factor = abs(direction)
  216. t_start = float(-line_length)
  217. t_end = line_length * (1 - bias_factor)
  218. elif direction > 0:
  219. # Forward bias: interpolate between symmetric and forward-only
  220. # direction = 1: only forward, direction = 0: symmetric
  221. bias_factor = direction
  222. t_start = -line_length * (1 - bias_factor)
  223. t_end = float(line_length)
  224. else:
  225. # Symmetric case (direction = 0)
  226. t_start = float(-line_length)
  227. t_end = float(line_length)
  228. # Generate points along the biased line
  229. t = np.linspace(t_start, t_end, kernel_size)
  230. # Generate line coordinates
  231. x = center + dx * t
  232. y = center + dy * t
  233. # Apply random shift if allowed
  234. if allow_shifted:
  235. shift_x = random_state.uniform(-1, 1) * line_length / 2
  236. shift_y = random_state.uniform(-1, 1) * line_length / 2
  237. x += shift_x
  238. y += shift_y
  239. # Round coordinates and clip to kernel bounds
  240. x = np.clip(np.round(x), 0, kernel_size - 1).astype(int)
  241. y = np.clip(np.round(y), 0, kernel_size - 1).astype(int)
  242. # Keep only unique points to avoid multiple assignments
  243. points = np.unique(np.column_stack([y, x]), axis=0)
  244. kernel[points[:, 0], points[:, 1]] = 1
  245. # Ensure at least one point is set
  246. if not kernel.any():
  247. kernel[center, center] = 1
  248. return kernel
  249. def sample_odd_from_range(random_state: random.Random, low: int, high: int) -> int:
  250. """Sample an odd number from the range [low, high] (inclusive).
  251. Args:
  252. random_state (random.Random): instance of random.Random
  253. low (int): lower bound (will be converted to nearest valid odd number)
  254. high (int): upper bound (will be converted to nearest valid odd number)
  255. Returns:
  256. int: Randomly sampled odd number from the range
  257. Note:
  258. - Input values will be converted to nearest valid odd numbers:
  259. * Values less than 3 will become 3
  260. * Even values will be rounded up to next odd number
  261. - After normalization, high must be >= low
  262. """
  263. # Normalize low value
  264. low = max(3, low + (low % 2 == 0))
  265. # Normalize high value
  266. high = max(3, high + (high % 2 == 0))
  267. # Ensure high >= low after normalization
  268. high = max(high, low)
  269. if low == high:
  270. return low
  271. # Calculate number of possible odd values
  272. num_odd_values = (high - low) // 2 + 1
  273. # Generate random index and convert to corresponding odd number
  274. rand_idx = random_state.randint(0, num_odd_values - 1)
  275. return low + (2 * rand_idx)
  276. def create_gaussian_kernel(sigma: float, ksize: int = 0) -> np.ndarray:
  277. """Create a Gaussian kernel following PIL's approach.
  278. Args:
  279. sigma (float): Standard deviation for Gaussian kernel.
  280. ksize (int): Kernel size. If 0, size is computed as int(sigma * 3.5) * 2 + 1
  281. to match PIL's implementation. Otherwise, must be positive and odd.
  282. Returns:
  283. np.ndarray: 2D normalized Gaussian kernel.
  284. """
  285. # PIL's kernel creation approach
  286. size = int(sigma * 3.5) * 2 + 1 if ksize == 0 else ksize
  287. # Ensure odd size
  288. size = size + 1 if size % 2 == 0 else size
  289. # Create x coordinates
  290. x = np.linspace(-(size // 2), size // 2, size)
  291. # Compute 1D kernel using vectorized operations
  292. kernel_1d = np.exp(-0.5 * (x / sigma) ** 2)
  293. kernel_1d = kernel_1d / kernel_1d.sum()
  294. # Create 2D kernel
  295. return kernel_1d[:, np.newaxis] @ kernel_1d[np.newaxis, :]
  296. def create_gaussian_kernel_1d(sigma: float, ksize: int = 0) -> np.ndarray:
  297. """Create a 1D Gaussian kernel following PIL's approach.
  298. Args:
  299. sigma (float): Standard deviation for Gaussian kernel.
  300. ksize (int): Kernel size. If 0, size is computed as int(sigma * 3.5) * 2 + 1
  301. to match PIL's implementation. Otherwise, must be positive and odd.
  302. Returns:
  303. np.ndarray: 1D normalized Gaussian kernel.
  304. """
  305. # PIL's kernel creation approach
  306. size = int(sigma * 3.5) * 2 + 1 if ksize == 0 else ksize
  307. # Ensure odd size
  308. size = size + 1 if size % 2 == 0 else size
  309. # Create x coordinates
  310. x = create_gaussian_kernel_input_array(size=size)
  311. # Compute 1D kernel using vectorized operations
  312. kernel_1d = np.exp(-0.5 * (x / sigma) ** 2)
  313. return kernel_1d / kernel_1d.sum()
  314. def create_gaussian_kernel_input_array(size: int) -> np.ndarray:
  315. """Creates a 1-D array which will create an array of x-coordinates which will be input for the
  316. gaussian function (values from -size/2 to size/2 with step size of 1)
  317. Piecewise function is needed as equivalent python list comprehension is faster than np.linspace
  318. for values of size < 100
  319. Args:
  320. size (int): kernel size
  321. Returns:
  322. np.ndarray: x-coordinate array which will be input for gaussian function that will be used for
  323. separable gaussian blur
  324. """
  325. if size < 100:
  326. return np.array(list(range(-(size // 2), (size // 2) + 1, 1)))
  327. return np.linspace(-(size // 2), size // 2, size)