test_upfirdn.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. # Code adapted from "upfirdn" python library with permission:
  2. #
  3. # Copyright (c) 2009, Motorola, Inc
  4. #
  5. # All Rights Reserved.
  6. #
  7. # Redistribution and use in source and binary forms, with or without
  8. # modification, are permitted provided that the following conditions are
  9. # met:
  10. #
  11. # * Redistributions of source code must retain the above copyright notice,
  12. # this list of conditions and the following disclaimer.
  13. #
  14. # * Redistributions in binary form must reproduce the above copyright
  15. # notice, this list of conditions and the following disclaimer in the
  16. # documentation and/or other materials provided with the distribution.
  17. #
  18. # * Neither the name of Motorola nor the names of its contributors may be
  19. # used to endorse or promote products derived from this software without
  20. # specific prior written permission.
  21. #
  22. # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
  23. # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
  24. # THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
  25. # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
  26. # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
  27. # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
  28. # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
  29. # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
  30. # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
  31. # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  32. # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  33. import numpy as np
  34. from itertools import product
  35. from pytest import raises as assert_raises
  36. import pytest
  37. from scipy._lib import array_api_extra as xpx
  38. from scipy._lib._array_api import (
  39. xp_assert_close, array_namespace, _xp_copy_to_numpy, is_cupy,
  40. make_xp_test_case
  41. )
  42. from scipy._lib.array_api_compat import numpy as np_compat
  43. from scipy.signal import upfirdn, firwin
  44. from scipy.signal._upfirdn import _output_len, _upfirdn_modes
  45. from scipy.signal._upfirdn_apply import _pad_test
  46. skip_xp_backends = pytest.mark.skip_xp_backends
  47. def upfirdn_naive(x, h, up=1, down=1):
  48. """Naive upfirdn processing in Python.
  49. Note: arg order (x, h) differs to facilitate apply_along_axis use.
  50. """
  51. x = np.asarray(x)
  52. h = np.asarray(h)
  53. out = np.zeros(len(x) * up, x.dtype)
  54. out[::up] = x
  55. out = np.convolve(h, out)[::down][:_output_len(len(h), len(x), up, down)]
  56. return out
  57. class UpFIRDnCase:
  58. """Test _UpFIRDn object"""
  59. def __init__(self, up, down, h, x_dtype, *, xp=None):
  60. if xp is None:
  61. xp = np_compat
  62. self.up = up
  63. self.down = down
  64. self.h = np.atleast_1d(h)
  65. self.x_dtype = x_dtype
  66. self.rng = np.random.RandomState(17)
  67. self.xp = xp
  68. def __call__(self):
  69. # tiny signal
  70. self.scrub(np.ones(1, self.x_dtype))
  71. # ones
  72. self.scrub(np.ones(10, self.x_dtype)) # ones
  73. # randn
  74. x = self.rng.randn(10).astype(self.x_dtype)
  75. if self.x_dtype in (np.complex64, np.complex128):
  76. x += 1j * self.rng.randn(10)
  77. self.scrub(x)
  78. # ramp
  79. self.scrub(np.arange(10).astype(self.x_dtype))
  80. # 3D, random
  81. if is_cupy(self.xp):
  82. # ndim > 2 is unsupported in CuPy.
  83. return
  84. size = (2, 3, 5)
  85. x = self.rng.randn(*size).astype(self.x_dtype)
  86. if self.x_dtype in (np.complex64, np.complex128):
  87. x += 1j * self.rng.randn(*size)
  88. for axis in range(len(size)):
  89. self.scrub(x, axis=axis)
  90. x = x[:, ::2, 1::3].T
  91. for axis in range(len(size)):
  92. self.scrub(x, axis=axis)
  93. def scrub(self, x, axis=-1):
  94. xp = self.xp
  95. yr = np.apply_along_axis(upfirdn_naive, axis, x,
  96. self.h, self.up, self.down)
  97. want_len = _output_len(len(self.h), x.shape[axis], self.up, self.down)
  98. assert yr.shape[axis] == want_len
  99. y = upfirdn(xp.asarray(self.h), xp.asarray(x), self.up, self.down,
  100. axis=axis)
  101. assert y.shape[axis] == want_len
  102. assert y.shape == yr.shape
  103. dtypes = (self.h.dtype, x.dtype)
  104. if all(d == np.complex64 for d in dtypes):
  105. assert y.dtype == xp.complex64
  106. elif np.complex64 in dtypes and np.float32 in dtypes:
  107. assert y.dtype == xp.complex64
  108. elif all(d == np.float32 for d in dtypes):
  109. assert y.dtype == xp.float32
  110. elif np.complex128 in dtypes or np.complex64 in dtypes:
  111. assert y.dtype == xp.complex128
  112. else:
  113. assert y.dtype == xp.float64
  114. yr = xp.asarray(yr, dtype=y.dtype)
  115. xp_assert_close(yr, y)
  116. _UPFIRDN_TYPES = ("int64", "float32", "complex64", "float64", "complex128")
  117. @make_xp_test_case(upfirdn)
  118. class TestUpfirdn:
  119. @skip_xp_backends(np_only=True, reason="enough to only test on numpy")
  120. def test_valid_input(self, xp):
  121. assert_raises(ValueError, upfirdn, [1], [1], 1, 0) # up or down < 1
  122. assert_raises(ValueError, upfirdn, [], [1], 1, 1) # h.ndim != 1
  123. assert_raises(ValueError, upfirdn, [[1]], [1], 1, 1)
  124. @pytest.mark.parametrize('len_h', [1, 2, 3, 4, 5])
  125. @pytest.mark.parametrize('len_x', [1, 2, 3, 4, 5])
  126. def test_singleton(self, len_h, len_x, xp):
  127. # gh-9844: lengths producing expected outputs
  128. h = xp.zeros(len_h)
  129. h = xpx.at(h)[len_h // 2].set(1.) # make h a delta
  130. x = xp.ones(len_x)
  131. y = upfirdn(h, x, 1, 1)
  132. want = xpx.pad(x, (len_h // 2, (len_h - 1) // 2), 'constant', xp=xp)
  133. xp_assert_close(y, want)
  134. def test_shift_x(self, xp):
  135. # gh-9844: shifted x can change values?
  136. y = upfirdn(xp.asarray([1, 1]), xp.asarray([1.]), 1, 1)
  137. xp_assert_close(
  138. y, xp.asarray([1.0, 1.0], dtype=xp.float64) # was [0, 1] in the issue
  139. )
  140. y = upfirdn(xp.asarray([1, 1]), xp.asarray([0., 1.]), 1, 1)
  141. xp_assert_close(y, xp.asarray([0.0, 1.0, 1.0], dtype=xp.float64))
  142. # A bunch of lengths/factors chosen because they exposed differences
  143. # between the "old way" and new way of computing length, and then
  144. # got `expected` from MATLAB
  145. @pytest.mark.parametrize('len_h, len_x, up, down, expected', [
  146. (2, 2, 5, 2, [1, 0, 0, 0]),
  147. (2, 3, 6, 3, [1, 0, 1, 0, 1]),
  148. (2, 4, 4, 3, [1, 0, 0, 0, 1]),
  149. (3, 2, 6, 2, [1, 0, 0, 1, 0]),
  150. (4, 11, 3, 5, [1, 0, 0, 1, 0, 0, 1]),
  151. ])
  152. def test_length_factors(self, len_h, len_x, up, down, expected, xp):
  153. # gh-9844: weird factors
  154. h = xp.zeros(len_h)
  155. h = xpx.at(h)[0].set(1.)
  156. x = xp.ones(len_x, dtype=xp.float64)
  157. y = upfirdn(h, x, up, down)
  158. expected = xp.asarray(expected, dtype=xp.float64)
  159. xp_assert_close(y, expected)
  160. @pytest.mark.parametrize(
  161. 'dtype', ["int64", "float32", "complex64", "float64", "complex128"]
  162. )
  163. @pytest.mark.parametrize('down, want_len', [ # lengths from MATLAB
  164. (2, 5015),
  165. (11, 912),
  166. (79, 127),
  167. ])
  168. def test_vs_convolve(self, down, want_len, dtype, xp):
  169. # Check that up=1.0 gives same answer as convolve + slicing
  170. random_state = np.random.RandomState(17)
  171. size = 10000
  172. np_dtype = getattr(np, dtype)
  173. x = random_state.randn(size).astype(np_dtype)
  174. if np_dtype in (np.complex64, np.complex128):
  175. x += 1j * random_state.randn(size)
  176. dtype = getattr(xp, dtype)
  177. x = xp.asarray(x, dtype=dtype)
  178. h = xp.asarray(firwin(31, 1. / down, window='hamming'))
  179. yl = xp.asarray(
  180. upfirdn_naive(_xp_copy_to_numpy(x), _xp_copy_to_numpy(h), 1, down)
  181. )
  182. y = upfirdn(h, x, up=1, down=down)
  183. assert y.shape == (want_len,)
  184. assert yl.shape[0] == y.shape[0]
  185. xp_assert_close(yl, y, atol=1e-7, rtol=1e-7)
  186. @pytest.mark.parametrize('x_dtype', _UPFIRDN_TYPES)
  187. @pytest.mark.parametrize('h', (1., 1j))
  188. @pytest.mark.parametrize('up, down', [(1, 1), (2, 2), (3, 2), (2, 3)])
  189. def test_vs_naive_delta(self, x_dtype, h, up, down, xp):
  190. UpFIRDnCase(up, down, h, x_dtype, xp=xp)()
  191. @pytest.mark.parametrize('x_dtype', _UPFIRDN_TYPES)
  192. @pytest.mark.parametrize('h_dtype', _UPFIRDN_TYPES)
  193. @pytest.mark.parametrize('p_max, q_max',
  194. list(product((10, 100), (10, 100))))
  195. def test_vs_naive(self, x_dtype, h_dtype, p_max, q_max, xp):
  196. tests = self._random_factors(p_max, q_max, h_dtype, x_dtype, xp=xp)
  197. for test in tests:
  198. test()
  199. def _random_factors(self, p_max, q_max, h_dtype, x_dtype, *, xp):
  200. n_rep = 3
  201. longest_h = 25
  202. random_state = np.random.RandomState(17)
  203. tests = []
  204. for _ in range(n_rep):
  205. # Randomize the up/down factors somewhat
  206. p_add = q_max if p_max > q_max else 1
  207. q_add = p_max if q_max > p_max else 1
  208. p = random_state.randint(p_max) + p_add
  209. q = random_state.randint(q_max) + q_add
  210. # Generate random FIR coefficients
  211. len_h = random_state.randint(longest_h) + 1
  212. h = np.atleast_1d(random_state.randint(len_h))
  213. h = h.astype(h_dtype)
  214. if h_dtype is complex:
  215. h += 1j * random_state.randint(len_h)
  216. tests.append(UpFIRDnCase(p, q, h, x_dtype, xp=xp))
  217. return tests
  218. @pytest.mark.parametrize('mode', _upfirdn_modes)
  219. def test_extensions(self, mode, xp):
  220. """Test vs. manually computed results for modes not in numpy's pad."""
  221. x = np.asarray([1, 2, 3, 1], dtype=np.float64)
  222. npre, npost = 6, 6
  223. y = _pad_test(x, npre=npre, npost=npost, mode=mode)
  224. x = xp.asarray(x)
  225. y = xp.asarray(y)
  226. if mode == 'antisymmetric':
  227. y_expected = xp.asarray(
  228. [3.0, 1, -1, -3, -2, -1, 1, 2, 3, 1, -1, -3, -2, -1, 1, 2])
  229. elif mode == 'antireflect':
  230. y_expected = xp.asarray(
  231. [1.0, 2, 3, 1, -1, 0, 1, 2, 3, 1, -1, 0, 1, 2, 3, 1])
  232. elif mode == 'smooth':
  233. y_expected = xp.asarray(
  234. [-5.0, -4, -3, -2, -1, 0, 1, 2, 3, 1, -1, -3, -5, -7, -9, -11])
  235. elif mode == "line":
  236. lin_slope = (x[-1] - x[0]) / (x.shape[0] - 1)
  237. left = x[0] + xp.arange(-npre, 0, 1, dtype=xp.float64) * lin_slope
  238. right = x[-1] + xp.arange(1, npost + 1, dtype=xp.float64) * lin_slope
  239. concat = array_namespace(left).concat
  240. y_expected = concat((left, x, right))
  241. else:
  242. y_expected = np.pad(_xp_copy_to_numpy(x), (npre, npost), mode=mode)
  243. y_expected = xp.asarray(y_expected)
  244. y_expected = xp.asarray(y_expected, dtype=xp.float64)
  245. xp_assert_close(y, y_expected)
  246. @pytest.mark.parametrize(
  247. 'size, h_len, mode, dtype',
  248. product(
  249. [8],
  250. [4, 5, 26], # include cases with h_len > 2*size
  251. _upfirdn_modes,
  252. ["float32", "float64", "complex64", "complex128"],
  253. )
  254. )
  255. def test_modes(self, size, h_len, mode, dtype, xp):
  256. if is_cupy(xp) and mode != "constant":
  257. pytest.skip(reason="only mode='constant' supported by CuPy")
  258. dtype_np = getattr(np, dtype)
  259. dtype_xp = getattr(xp, dtype)
  260. random_state = np.random.RandomState(5)
  261. x = random_state.randn(size).astype(dtype_np)
  262. if dtype in ("complex64", "complex128"):
  263. x += 1j * random_state.randn(size)
  264. h = np.arange(1, 1 + h_len, dtype=x.real.dtype)
  265. x = xp.asarray(x, dtype=dtype_xp)
  266. h = xp.asarray(h)
  267. y = upfirdn(h, x, up=1, down=1, mode=mode)
  268. # expected result: pad the input, filter with zero padding, then crop
  269. npad = h_len - 1
  270. if mode in ['antisymmetric', 'antireflect', 'smooth', 'line']:
  271. # use _pad_test test function for modes not supported by np.pad.
  272. xpad = _pad_test(_xp_copy_to_numpy(x), npre=npad, npost=npad, mode=mode)
  273. else:
  274. xpad = np.pad(_xp_copy_to_numpy(x), npad, mode=mode)
  275. xpad = xp.asarray(xpad)
  276. ypad = upfirdn(h, xpad, up=1, down=1, mode='constant')
  277. y_expected = ypad[npad:-npad]
  278. atol = rtol = xp.finfo(dtype_xp).eps * 1e2
  279. xp_assert_close(y, y_expected, atol=atol, rtol=rtol)
  280. @make_xp_test_case(upfirdn)
  281. def test_output_len_long_input(xp):
  282. # Regression test for gh-17375. On Windows, a large enough input
  283. # that should have been well within the capabilities of 64 bit integers
  284. # would result in a 32 bit overflow because of a bug in Cython 0.29.32.
  285. len_h = 1001
  286. in_len = 10**8
  287. up = 320
  288. down = 441
  289. out_len = _output_len(len_h, in_len, up, down)
  290. # The expected value was computed "by hand" from the formula
  291. # (((in_len - 1) * up + len_h) - 1) // down + 1
  292. assert out_len == 72562360