domain_adaptation_functional.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453
  1. """Functional implementations for domain adaptation image transformations.
  2. This module provides low-level functions and classes for performing domain adaptation
  3. between images. It includes implementations for histogram matching, Fourier domain adaptation,
  4. and pixel distribution matching with various normalization techniques.
  5. """
  6. from __future__ import annotations
  7. import abc
  8. from copy import deepcopy
  9. from typing import Literal
  10. import cv2
  11. import numpy as np
  12. from albucore import add_weighted, clip, clipped, from_float, get_num_channels, preserve_channel_dim, to_float, uint8_io
  13. from typing_extensions import Protocol
  14. import albumentations.augmentations.geometric.functional as fgeometric
  15. from albumentations.augmentations.utils import PCA
  16. from albumentations.core.type_definitions import MONO_CHANNEL_DIMENSIONS
  17. __all__ = [
  18. "adapt_pixel_distribution",
  19. "apply_histogram",
  20. "fourier_domain_adaptation",
  21. ]
  22. class BaseScaler:
  23. def __init__(self) -> None:
  24. self.data_min: np.ndarray | None = None
  25. self.data_max: np.ndarray | None = None
  26. self.mean: np.ndarray | None = None
  27. self.var: np.ndarray | None = None
  28. self.scale: np.ndarray | None = None
  29. def fit(self, x: np.ndarray) -> None:
  30. raise NotImplementedError
  31. def transform(self, x: np.ndarray) -> np.ndarray:
  32. raise NotImplementedError
  33. def fit_transform(self, x: np.ndarray) -> np.ndarray:
  34. self.fit(x)
  35. return self.transform(x)
  36. def inverse_transform(self, x: np.ndarray) -> np.ndarray:
  37. raise NotImplementedError
  38. class MinMaxScaler(BaseScaler):
  39. def __init__(self, feature_range: tuple[float, float] = (0.0, 1.0)) -> None:
  40. super().__init__()
  41. self.min: float = feature_range[0]
  42. self.max: float = feature_range[1]
  43. self.data_range: np.ndarray | None = None
  44. def fit(self, x: np.ndarray) -> None:
  45. self.data_min = np.min(x, axis=0)
  46. self.data_max = np.max(x, axis=0)
  47. self.data_range = self.data_max - self.data_min
  48. # Handle case where data_min equals data_max
  49. self.data_range[self.data_range == 0] = 1
  50. def transform(self, x: np.ndarray) -> np.ndarray:
  51. if self.data_min is None or self.data_max is None or self.data_range is None:
  52. raise ValueError(
  53. "This MinMaxScaler instance is not fitted yet. "
  54. "Call 'fit' with appropriate arguments before using this estimator.",
  55. )
  56. x_std = np.subtract(x, self.data_min).astype(float)
  57. np.divide(x_std, self.data_range, out=x_std)
  58. np.multiply(x_std, (self.max - self.min), out=x_std)
  59. np.add(x_std, self.min, out=x_std)
  60. return x_std
  61. def inverse_transform(self, x: np.ndarray) -> np.ndarray:
  62. if self.data_min is None or self.data_max is None or self.data_range is None:
  63. raise ValueError(
  64. "This MinMaxScaler instance is not fitted yet. "
  65. "Call 'fit' with appropriate arguments before using this estimator.",
  66. )
  67. x_std = ((x - self.min) / (self.max - self.min)).astype(float)
  68. return x_std * self.data_range + self.data_min
  69. class StandardScaler(BaseScaler):
  70. def __init__(self) -> None:
  71. super().__init__()
  72. def fit(self, x: np.ndarray) -> None:
  73. self.mean = np.mean(x, axis=0)
  74. self.var = np.var(x, axis=0)
  75. self.scale = np.sqrt(self.var)
  76. # Handle case where variance is zero
  77. self.scale[self.scale == 0] = 1
  78. def transform(self, x: np.ndarray) -> np.ndarray:
  79. if self.mean is None or self.scale is None:
  80. raise ValueError(
  81. "This StandardScaler instance is not fitted yet. "
  82. "Call 'fit' with appropriate arguments before using this estimator.",
  83. )
  84. return (x - self.mean) / self.scale
  85. def inverse_transform(self, x: np.ndarray) -> np.ndarray:
  86. if self.mean is None or self.scale is None:
  87. raise ValueError(
  88. "This StandardScaler instance is not fitted yet. "
  89. "Call 'fit' with appropriate arguments before using this estimator.",
  90. )
  91. return (x * self.scale) + self.mean
  92. class TransformerInterface(Protocol):
  93. @abc.abstractmethod
  94. def inverse_transform(self, x: np.ndarray) -> np.ndarray: ...
  95. @abc.abstractmethod
  96. def fit(self, x: np.ndarray, y: np.ndarray | None = None) -> np.ndarray: ...
  97. @abc.abstractmethod
  98. def transform(self, x: np.ndarray, y: np.ndarray | None = None) -> np.ndarray: ...
  99. class DomainAdapter:
  100. def __init__(
  101. self,
  102. transformer: TransformerInterface,
  103. ref_img: np.ndarray,
  104. color_conversions: tuple[None, None] = (None, None),
  105. ):
  106. self.color_in, self.color_out = color_conversions
  107. self.source_transformer = deepcopy(transformer)
  108. self.target_transformer = transformer
  109. self.num_channels = get_num_channels(ref_img)
  110. self.target_transformer.fit(self.flatten(ref_img))
  111. def to_colorspace(self, img: np.ndarray) -> np.ndarray:
  112. return img if self.color_in is None else cv2.cvtColor(img, self.color_in)
  113. def from_colorspace(self, img: np.ndarray) -> np.ndarray:
  114. if self.color_out is None:
  115. return img
  116. return cv2.cvtColor(clip(img, np.uint8, inplace=True), self.color_out)
  117. def flatten(self, img: np.ndarray) -> np.ndarray:
  118. img = self.to_colorspace(img)
  119. img = to_float(img)
  120. return img.reshape(-1, self.num_channels)
  121. def reconstruct(self, pixels: np.ndarray, height: int, width: int) -> np.ndarray:
  122. pixels = clip(pixels, np.uint8, inplace=True)
  123. if self.num_channels == 1:
  124. return self.from_colorspace(pixels.reshape(height, width))
  125. return self.from_colorspace(pixels.reshape(height, width, self.num_channels))
  126. @staticmethod
  127. def _pca_sign(x: np.ndarray) -> np.ndarray:
  128. return np.sign(np.trace(x.components_))
  129. def __call__(self, image: np.ndarray) -> np.ndarray:
  130. height, width = image.shape[:2]
  131. pixels = self.flatten(image)
  132. self.source_transformer.fit(pixels)
  133. if (
  134. hasattr(self.target_transformer, "components_")
  135. and hasattr(self.source_transformer, "components_")
  136. and self._pca_sign(self.target_transformer) != self._pca_sign(self.source_transformer)
  137. ):
  138. self.target_transformer.components_ *= -1
  139. representation = self.source_transformer.transform(pixels)
  140. result = self.target_transformer.inverse_transform(representation)
  141. return self.reconstruct(result, height, width)
  142. @clipped
  143. @preserve_channel_dim
  144. def adapt_pixel_distribution(
  145. img: np.ndarray,
  146. ref: np.ndarray,
  147. transform_type: Literal["pca", "standard", "minmax"],
  148. weight: float,
  149. ) -> np.ndarray:
  150. """Adapt the pixel distribution of an image to match a reference image.
  151. This function adapts the pixel distribution of an image to match a reference image
  152. using a specified transformation type and weight.
  153. Args:
  154. img (np.ndarray): The input image to be adapted.
  155. ref (np.ndarray): The reference image.
  156. transform_type (Literal["pca", "standard", "minmax"]): The type of transformation to use.
  157. weight (float): The weight of the transformation.
  158. Returns:
  159. np.ndarray: The adapted image.
  160. Raises:
  161. ValueError: If the input image and reference image have different dtypes or numbers of channels.
  162. """
  163. if img.dtype != ref.dtype:
  164. raise ValueError("Input image and reference image must have the same dtype.")
  165. img_num_channels = get_num_channels(img)
  166. ref_num_channels = get_num_channels(ref)
  167. if img_num_channels != ref_num_channels:
  168. raise ValueError("Input image and reference image must have the same number of channels.")
  169. if img_num_channels == 1:
  170. img = np.squeeze(img)
  171. ref = np.squeeze(ref)
  172. if img.shape != ref.shape:
  173. ref = cv2.resize(ref, dsize=img.shape[:2], interpolation=cv2.INTER_AREA)
  174. original_dtype = img.dtype
  175. if original_dtype == np.float32:
  176. img = from_float(img, np.uint8)
  177. ref = from_float(ref, np.uint8)
  178. transformer = {"pca": PCA, "standard": StandardScaler, "minmax": MinMaxScaler}[transform_type]()
  179. adapter = DomainAdapter(transformer=transformer, ref_img=ref)
  180. transformed = adapter(img).astype(np.float32)
  181. result = img.astype(np.float32) * (1 - weight) + transformed * weight
  182. return result if original_dtype == np.uint8 else to_float(result)
  183. def low_freq_mutate(amp_src: np.ndarray, amp_trg: np.ndarray, beta: float) -> np.ndarray:
  184. image_shape = amp_src.shape[:2]
  185. border = int(np.floor(min(image_shape) * beta))
  186. center_x, center_y = fgeometric.center(image_shape)
  187. height, width = image_shape
  188. h1, h2 = max(0, int(center_y - border)), min(int(center_y + border), height)
  189. w1, w2 = max(0, int(center_x - border)), min(int(center_x + border), width)
  190. amp_src[h1:h2, w1:w2] = amp_trg[h1:h2, w1:w2]
  191. return amp_src
  192. @clipped
  193. @preserve_channel_dim
  194. def fourier_domain_adaptation(img: np.ndarray, target_img: np.ndarray, beta: float) -> np.ndarray:
  195. """Apply Fourier Domain Adaptation to the input image using a target image.
  196. This function performs domain adaptation in the frequency domain by modifying the amplitude
  197. spectrum of the source image based on the target image's amplitude spectrum. It preserves
  198. the phase information of the source image, which helps maintain its content while adapting
  199. its style to match the target image.
  200. Args:
  201. img (np.ndarray): The source image to be adapted. Can be grayscale or RGB.
  202. target_img (np.ndarray): The target image used as a reference for adaptation.
  203. Should have the same dimensions as the source image.
  204. beta (float): The adaptation strength, typically in the range [0, 1].
  205. Higher values result in stronger adaptation towards the target image's style.
  206. Returns:
  207. np.ndarray: The adapted image with the same shape and type as the input image.
  208. Raises:
  209. ValueError: If the source and target images have different shapes.
  210. Note:
  211. - Both input images are converted to float32 for processing.
  212. - The function handles both grayscale (2D) and color (3D) images.
  213. - For grayscale images, an extra dimension is added to facilitate uniform processing.
  214. - The adaptation is performed channel-wise for color images.
  215. - The output is clipped to the valid range and preserves the original number of channels.
  216. The adaptation process involves the following steps for each channel:
  217. 1. Compute the 2D Fourier Transform of both source and target images.
  218. 2. Shift the zero frequency component to the center of the spectrum.
  219. 3. Extract amplitude and phase information from the source image's spectrum.
  220. 4. Mutate the source amplitude using the target amplitude and the beta parameter.
  221. 5. Combine the mutated amplitude with the original phase.
  222. 6. Perform the inverse Fourier Transform to obtain the adapted channel.
  223. The `low_freq_mutate` function (not shown here) is responsible for the actual
  224. amplitude mutation, focusing on low-frequency components which carry style information.
  225. Example:
  226. >>> import numpy as np
  227. >>> import albumentations as A
  228. >>> source_img = np.random.rand(100, 100, 3).astype(np.float32)
  229. >>> target_img = np.random.rand(100, 100, 3).astype(np.float32)
  230. >>> adapted_img = A.fourier_domain_adaptation(source_img, target_img, beta=0.5)
  231. >>> assert adapted_img.shape == source_img.shape
  232. References:
  233. FDA: Fourier Domain Adaptation for Semantic Segmentation: Yang and Soatto, 2020, CVPR
  234. https://openaccess.thecvf.com/content_CVPR_2020/papers/Yang_FDA_Fourier_Domain_Adaptation_for_Semantic_Segmentation_CVPR_2020_paper.pdf
  235. """
  236. src_img = img.astype(np.float32)
  237. trg_img = target_img.astype(np.float32)
  238. if src_img.ndim == MONO_CHANNEL_DIMENSIONS:
  239. src_img = np.expand_dims(src_img, axis=-1)
  240. if trg_img.ndim == MONO_CHANNEL_DIMENSIONS:
  241. trg_img = np.expand_dims(trg_img, axis=-1)
  242. num_channels = src_img.shape[-1]
  243. # Prepare container for the output image
  244. src_in_trg = np.zeros_like(src_img)
  245. for channel_id in range(num_channels):
  246. # Perform FFT on each channel
  247. fft_src = np.fft.fft2(src_img[:, :, channel_id])
  248. fft_trg = np.fft.fft2(trg_img[:, :, channel_id])
  249. # Shift the zero frequency component to the center
  250. fft_src_shifted = np.fft.fftshift(fft_src)
  251. fft_trg_shifted = np.fft.fftshift(fft_trg)
  252. # Extract amplitude and phase
  253. amp_src, pha_src = np.abs(fft_src_shifted), np.angle(fft_src_shifted)
  254. amp_trg = np.abs(fft_trg_shifted)
  255. # Mutate the amplitude part of the source with the target
  256. mutated_amp = low_freq_mutate(amp_src.copy(), amp_trg, beta)
  257. # Combine the mutated amplitude with the original phase
  258. fft_src_mutated = np.fft.ifftshift(mutated_amp * np.exp(1j * pha_src))
  259. # Perform inverse FFT
  260. src_in_trg_channel = np.fft.ifft2(fft_src_mutated)
  261. # Store the result in the corresponding channel of the output image
  262. src_in_trg[:, :, channel_id] = np.real(src_in_trg_channel)
  263. return src_in_trg
  264. @clipped
  265. @preserve_channel_dim
  266. def apply_histogram(img: np.ndarray, reference_image: np.ndarray, blend_ratio: float) -> np.ndarray:
  267. """Apply histogram matching to an input image using a reference image and blend the result.
  268. This function performs histogram matching between the input image and a reference image,
  269. then blends the result with the original input image based on the specified blend ratio.
  270. Args:
  271. img (np.ndarray): The input image to be transformed. Can be either grayscale or RGB.
  272. Supported dtypes: uint8, float32 (values should be in [0, 1] range).
  273. reference_image (np.ndarray): The reference image used for histogram matching.
  274. Should have the same number of channels as the input image.
  275. Supported dtypes: uint8, float32 (values should be in [0, 1] range).
  276. blend_ratio (float): The ratio for blending the matched image with the original image.
  277. Should be in the range [0, 1], where 0 means no change and 1 means full histogram matching.
  278. Returns:
  279. np.ndarray: The transformed image after histogram matching and blending.
  280. The output will have the same shape and dtype as the input image.
  281. Supported image types:
  282. - Grayscale images: 2D arrays
  283. - RGB images: 3D arrays with 3 channels
  284. - Multispectral images: 3D arrays with more than 3 channels
  285. Note:
  286. - If the input and reference images have different sizes, the reference image
  287. will be resized to match the input image's dimensions.
  288. - The function uses a custom implementation of histogram matching based on OpenCV and NumPy.
  289. - The @clipped and @preserve_channel_dim decorators ensure the output is within
  290. the valid range and maintains the original number of dimensions.
  291. """
  292. # Resize reference image only if necessary
  293. if img.shape[:2] != reference_image.shape[:2]:
  294. reference_image = cv2.resize(reference_image, dsize=(img.shape[1], img.shape[0]))
  295. img = np.squeeze(img)
  296. reference_image = np.squeeze(reference_image)
  297. # Match histograms between the images
  298. matched = match_histograms(img, reference_image)
  299. # Blend the original image and the matched image
  300. return add_weighted(matched, blend_ratio, img, 1 - blend_ratio)
  301. @uint8_io
  302. @preserve_channel_dim
  303. def match_histograms(image: np.ndarray, reference: np.ndarray) -> np.ndarray:
  304. """Adjust an image so that its cumulative histogram matches that of another.
  305. The adjustment is applied separately for each channel.
  306. Args:
  307. image (np.ndarray): Input image. Can be gray-scale or in color.
  308. reference (np.ndarray): Image to match histogram of. Must have the same number of channels as image.
  309. channel_axis (int | None): If None, the image is assumed to be a grayscale (single channel) image.
  310. Otherwise, this indicates which axis of the array corresponds to channels.
  311. Returns:
  312. np.ndarray: Transformed input image.
  313. Raises:
  314. ValueError: Thrown when the number of channels in the input image and the reference differ.
  315. """
  316. if reference.dtype != np.uint8:
  317. reference = from_float(reference, np.uint8)
  318. if image.ndim != reference.ndim:
  319. raise ValueError("Image and reference must have the same number of dimensions.")
  320. # Expand dimensions for grayscale images
  321. if image.ndim == 2:
  322. image = np.expand_dims(image, axis=-1)
  323. if reference.ndim == 2:
  324. reference = np.expand_dims(reference, axis=-1)
  325. matched = np.empty(image.shape, dtype=np.uint8)
  326. num_channels = image.shape[-1]
  327. for channel in range(num_channels):
  328. matched_channel = _match_cumulative_cdf(image[..., channel], reference[..., channel]).astype(np.uint8)
  329. matched[..., channel] = matched_channel
  330. return matched
  331. def _match_cumulative_cdf(source: np.ndarray, template: np.ndarray) -> np.ndarray:
  332. src_lookup = source.reshape(-1)
  333. src_counts = np.bincount(src_lookup)
  334. tmpl_counts = np.bincount(template.reshape(-1))
  335. # omit values where the count was 0
  336. tmpl_values = np.nonzero(tmpl_counts)[0]
  337. tmpl_counts = tmpl_counts[tmpl_values]
  338. # calculate normalized quantiles for each array
  339. src_quantiles = np.cumsum(src_counts) / source.size
  340. tmpl_quantiles = np.cumsum(tmpl_counts) / template.size
  341. interp_a_values = np.interp(src_quantiles, tmpl_quantiles, tmpl_values)
  342. return interp_a_values[src_lookup].reshape(source.shape).astype(np.uint8)