utils.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. """Module containing utility functions for augmentation operations.
  2. This module provides a collection of helper functions and utilities used throughout
  3. the augmentation pipeline. It includes functions for image loading, type checking,
  4. error handling, mathematical operations, and decorators that add functionality to
  5. other functions in the codebase. These utilities help ensure consistent behavior
  6. and simplify common operations across different augmentation transforms.
  7. """
  8. from __future__ import annotations
  9. import functools
  10. from functools import wraps
  11. from typing import TYPE_CHECKING, Any, Callable, TypeVar, cast
  12. import cv2
  13. import numpy as np
  14. from albucore.utils import (
  15. is_grayscale_image,
  16. is_multispectral_image,
  17. is_rgb_image,
  18. )
  19. from typing_extensions import Concatenate, ParamSpec
  20. from albumentations.core.keypoints_utils import angle_to_2pi_range
  21. if TYPE_CHECKING:
  22. from pathlib import Path
# Names exported by `from <module> import *`; helpers defined in this file but
# not listed here have to be imported explicitly by name.
__all__ = [
    "angle_2pi_range",
    "non_rgb_error",
    "read_bgr_image",
    "read_grayscale",
    "read_rgb_image",
]
# Stands for the extra positional/keyword parameters of a function wrapped by
# `angle_2pi_range`, so the wrapper can advertise the same signature.
P = ParamSpec("P")
# Type variable restricted to numpy arrays (or subclasses).
T = TypeVar("T", bound=np.ndarray)
# Type variable for "any callable"; used by `handle_empty_array` so the decorated
# function keeps its own type.
F = TypeVar("F", bound=Callable[..., Any])
  33. def read_bgr_image(path: str | Path) -> np.ndarray:
  34. """Read an image in BGR format from the specified path.
  35. Args:
  36. path (str | Path): Path to the image file.
  37. Returns:
  38. np.ndarray: Image in BGR format as a numpy array.
  39. """
  40. return cv2.imread(str(path), cv2.IMREAD_COLOR)
  41. def read_rgb_image(path: str | Path) -> np.ndarray:
  42. """Read an image in RGB format from the specified path.
  43. This function reads an image in BGR format using OpenCV and then
  44. converts it to RGB format.
  45. Args:
  46. path (str | Path): Path to the image file.
  47. Returns:
  48. np.ndarray: Image in RGB format as a numpy array.
  49. """
  50. image = read_bgr_image(path)
  51. return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
  52. def read_grayscale(path: str | Path) -> np.ndarray:
  53. """Read a grayscale image from the specified path.
  54. Args:
  55. path (str | Path): Path to the image file.
  56. Returns:
  57. np.ndarray: Grayscale image as a numpy array.
  58. """
  59. return cv2.imread(str(path), cv2.IMREAD_GRAYSCALE)
  60. def angle_2pi_range(
  61. func: Callable[Concatenate[np.ndarray, P], np.ndarray],
  62. ) -> Callable[Concatenate[np.ndarray, P], np.ndarray]:
  63. """Decorator to normalize angle values to the range [0, 2π).
  64. This decorator wraps a function that processes keypoints, ensuring that
  65. angle values (stored in the 4th column, index 3) are normalized to the
  66. range [0, 2π) after the wrapped function executes.
  67. Args:
  68. func (Callable): Function that processes keypoints and returns a numpy array.
  69. The function should take a keypoints array as its first parameter.
  70. Returns:
  71. Callable: Wrapped function that normalizes angles after processing keypoints.
  72. """
  73. @wraps(func)
  74. def wrapped_function(keypoints: np.ndarray, *args: P.args, **kwargs: P.kwargs) -> np.ndarray:
  75. result = func(keypoints, *args, **kwargs)
  76. if len(result) > 0 and result.shape[1] > 3:
  77. result[:, 3] = angle_to_2pi_range(result[:, 3])
  78. return result
  79. return wrapped_function
  80. def non_rgb_error(image: np.ndarray) -> None:
  81. """Check if the input image is RGB and raise a ValueError if it's not.
  82. This function is used to ensure that certain transformations are only applied to
  83. RGB images. It provides helpful error messages for grayscale and multi-spectral images.
  84. Args:
  85. image (np.ndarray): The input image to check. Expected to be a numpy array
  86. representing an image.
  87. Raises:
  88. ValueError: If the input image is not an RGB image (i.e., does not have exactly 3 channels).
  89. The error message includes specific instructions for grayscale images
  90. and a note about incompatibility with multi-spectral images.
  91. Note:
  92. - RGB images are expected to have exactly 3 channels.
  93. - Grayscale images (1 channel) will trigger an error with conversion instructions.
  94. - Multi-spectral images (more than 3 channels) will trigger an error stating incompatibility.
  95. Example:
  96. >>> import numpy as np
  97. >>> rgb_image = np.random.randint(0, 256, (100, 100, 3), dtype=np.uint8)
  98. >>> non_rgb_error(rgb_image) # No error raised
  99. >>>
  100. >>> grayscale_image = np.random.randint(0, 256, (100, 100), dtype=np.uint8)
  101. >>> non_rgb_error(grayscale_image) # Raises ValueError with conversion instructions
  102. >>>
  103. >>> multispectral_image = np.random.randint(0, 256, (100, 100, 5), dtype=np.uint8)
  104. >>> non_rgb_error(multispectral_image) # Raises ValueError stating incompatibility
  105. """
  106. if not is_rgb_image(image):
  107. message = "This transformation expects 3-channel images"
  108. if is_grayscale_image(image):
  109. message += "\nYou can convert your grayscale image to RGB using cv2.cvtColor(image, cv2.COLOR_GRAY2RGB))"
  110. if is_multispectral_image(image): # Any image with a number of channels other than 1 and 3
  111. message += "\nThis transformation cannot be applied to multi-spectral images"
  112. raise ValueError(message)
  113. def check_range(value: tuple[float, float], lower_bound: float, upper_bound: float, name: str | None) -> None:
  114. """Checks if the given value is within the specified bounds
  115. Args:
  116. value (tuple[float, float]): The value to check and convert. Can be a single float or a tuple of floats.
  117. lower_bound (float): The lower bound for the range check.
  118. upper_bound (float): The upper bound for the range check.
  119. name (str | None): The name of the parameter being checked. Used for error messages.
  120. Raises:
  121. ValueError: If the value is outside the bounds or if the tuple values are not ordered correctly.
  122. """
  123. if not all(lower_bound <= x <= upper_bound for x in value):
  124. raise ValueError(f"All values in {name} must be within [{lower_bound}, {upper_bound}] for tuple inputs.")
  125. if not value[0] <= value[1]:
  126. raise ValueError(f"{name!s} tuple values must be ordered as (min, max). Got: {value}")
  127. class PCA:
  128. def __init__(self, n_components: int | None = None) -> None:
  129. if n_components is not None and n_components <= 0:
  130. raise ValueError("Number of components must be greater than zero.")
  131. self.n_components = n_components
  132. self.mean: np.ndarray | None = None
  133. self.components_: np.ndarray | None = None
  134. self.explained_variance_: np.ndarray | None = None
  135. def fit(self, x: np.ndarray) -> None:
  136. x = x.astype(np.float64, copy=False) # avoid unnecessary copy if already float64
  137. n_samples, n_features = x.shape
  138. # Determine the number of components if not set
  139. if self.n_components is None:
  140. self.n_components = min(n_samples, n_features)
  141. self.mean, eigenvectors, eigenvalues = cv2.PCACompute2(x, mean=None, maxComponents=self.n_components)
  142. self.components_ = eigenvectors
  143. self.explained_variance_ = eigenvalues.flatten()
  144. def transform(self, x: np.ndarray) -> np.ndarray:
  145. if self.components_ is None:
  146. raise ValueError(
  147. "This PCA instance is not fitted yet. "
  148. "Call 'fit' with appropriate arguments before using this estimator.",
  149. )
  150. x = x.astype(np.float64, copy=False) # avoid unnecessary copy if already float64
  151. return cv2.PCAProject(x, self.mean, self.components_)
  152. def fit_transform(self, x: np.ndarray) -> np.ndarray:
  153. self.fit(x)
  154. return self.transform(x)
  155. def inverse_transform(self, x: np.ndarray) -> np.ndarray:
  156. if self.components_ is None:
  157. raise ValueError(
  158. "This PCA instance is not fitted yet. "
  159. "Call 'fit' with appropriate arguments before using this estimator.",
  160. )
  161. return cv2.PCABackProject(x, self.mean, self.components_)
  162. def explained_variance_ratio(self) -> np.ndarray:
  163. if self.explained_variance_ is None:
  164. raise ValueError(
  165. "This PCA instance is not fitted yet. Call 'fit' with appropriate arguments before using this method.",
  166. )
  167. total_variance = np.sum(self.explained_variance_)
  168. return self.explained_variance_ / total_variance
  169. def cumulative_explained_variance_ratio(self) -> np.ndarray:
  170. return np.cumsum(self.explained_variance_ratio())
  171. def handle_empty_array(param_name: str) -> Callable[[F], F]:
  172. def decorator(func: F) -> F:
  173. @functools.wraps(func)
  174. def wrapper(*args: Any, **kwargs: Any) -> Any:
  175. # Check if the parameter is passed as positional argument
  176. if len(args) > 0:
  177. array = args[0]
  178. # Check if the parameter is passed as keyword argument
  179. elif param_name in kwargs:
  180. array = kwargs[param_name]
  181. else:
  182. raise ValueError(f"Missing required argument: {param_name}")
  183. if len(array) == 0:
  184. return array
  185. return func(*args, **kwargs)
  186. return cast("F", wrapper)
  187. return decorator