| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577 |
- import pickle
- import pytest
- import numpy as np
- from numpy.linalg import LinAlgError
- from scipy._lib._array_api import xp_assert_close, make_xp_test_case
- from scipy.stats.qmc import Halton
- from scipy.spatial import cKDTree # type: ignore[attr-defined]
- from scipy.interpolate._rbfinterp import (
- _AVAILABLE, _SCALE_INVARIANT, _NAME_TO_MIN_DEGREE, RBFInterpolator,
- _get_backend
- )
- from scipy.interpolate import _rbfinterp_pythran
- from scipy._lib._testutils import _run_concurrent_barrier
- skip_xp_backends = pytest.mark.skip_xp_backends
- def _vandermonde(x, degree, xp=np):
- # Returns a matrix of monomials that span polynomials with the specified
- # degree evaluated at x.
- backend = _get_backend(xp)
- powers = backend._monomial_powers(x.shape[1], degree, xp)
- return backend.polynomial_matrix(x, powers, xp)
- def _1d_test_function(x, xp):
- # Test function used in Wahba's "Spline Models for Observational Data".
- # domain ~= (0, 3), range ~= (-1.0, 0.2)
- x = x[:, 0]
- y = 4.26*(xp.exp(-x) - 4*xp.exp(-2*x) + 3*xp.exp(-3*x))
- return y
- def _2d_test_function(x, xp):
- # Franke's test function.
- # domain ~= (0, 1) X (0, 1), range ~= (0.0, 1.2)
- x1, x2 = x[:, 0], x[:, 1]
- term1 = 0.75 * xp.exp(-(9*x1-2)**2/4 - (9*x2-2)**2/4)
- term2 = 0.75 * xp.exp(-(9*x1+1)**2/49 - (9*x2+1)/10)
- term3 = 0.5 * xp.exp(-(9*x1-7)**2/4 - (9*x2-3)**2/4)
- term4 = -0.2 * xp.exp(-(9*x1-4)**2 - (9*x2-7)**2)
- y = term1 + term2 + term3 + term4
- return y
- def _is_conditionally_positive_definite(kernel, m):
- # Tests whether the kernel is conditionally positive definite of order m.
- # See chapter 7 of Fasshauer's "Meshfree Approximation Methods with
- # MATLAB".
- nx = 10
- ntests = 100
- for ndim in [1, 2, 3, 4, 5]:
- # Generate sample points with a Halton sequence to avoid samples that
- # are too close to each other, which can make the matrix singular.
- seq = Halton(ndim, scramble=False, seed=np.random.RandomState())
- for _ in range(ntests):
- x = 2*seq.random(nx) - 1
- A = _rbfinterp_pythran._kernel_matrix(x, kernel)
- P = _vandermonde(x, m - 1)
- Q, R = np.linalg.qr(P, mode='complete')
- # Q2 forms a basis spanning the space where P.T.dot(x) = 0. Project
- # A onto this space, and then see if it is positive definite using
- # the Cholesky decomposition. If not, then the kernel is not c.p.d.
- # of order m.
- Q2 = Q[:, P.shape[1]:]
- B = Q2.T.dot(A).dot(Q2)
- try:
- np.linalg.cholesky(B)
- except np.linalg.LinAlgError:
- return False
- return True
- # Sorting the parametrize arguments is necessary to avoid a parallelization
- # issue described here: https://github.com/pytest-dev/pytest-xdist/issues/432.
- @pytest.mark.parametrize('kernel', sorted(_AVAILABLE))
- def test_conditionally_positive_definite(kernel):
- # Test if each kernel in _AVAILABLE is conditionally positive definite of
- # order m, where m comes from _NAME_TO_MIN_DEGREE. This is a necessary
- # condition for the smoothed RBF interpolant to be well-posed in general.
- m = _NAME_TO_MIN_DEGREE.get(kernel, -1) + 1
- assert _is_conditionally_positive_definite(kernel, m)
- class _TestRBFInterpolator:
- @pytest.mark.parametrize('kernel', sorted(_SCALE_INVARIANT))
- def test_scale_invariance_1d(self, kernel, xp):
- # Verify that the functions in _SCALE_INVARIANT are insensitive to the
- # shape parameter (when smoothing == 0) in 1d.
- seq = Halton(1, scramble=False, seed=np.random.RandomState())
- x = 3*seq.random(50)
- x = xp.asarray(x)
- y = _1d_test_function(x, xp)
- xitp = 3*seq.random(50)
- xitp = xp.asarray(xitp)
- yitp1 = self.build(x, y, epsilon=1.0, kernel=kernel)(xitp)
- yitp2 = self.build(x, y, epsilon=2.0, kernel=kernel)(xitp)
- xp_assert_close(yitp1, yitp2, atol=1e-8)
- @pytest.mark.parametrize('kernel', sorted(_SCALE_INVARIANT))
- def test_scale_invariance_2d(self, kernel, xp):
- # Verify that the functions in _SCALE_INVARIANT are insensitive to the
- # shape parameter (when smoothing == 0) in 2d.
- seq = Halton(2, scramble=False, seed=np.random.RandomState())
- x = seq.random(100)
- x = xp.asarray(x)
- y = _2d_test_function(x, xp)
- xitp = seq.random(100)
- xitp = xp.asarray(xitp)
- yitp1 = self.build(x, y, epsilon=1.0, kernel=kernel)(xitp)
- yitp2 = self.build(x, y, epsilon=2.0, kernel=kernel)(xitp)
- xp_assert_close(yitp1, yitp2, atol=1e-8)
- @pytest.mark.parametrize('kernel', sorted(_AVAILABLE))
- def test_extreme_domains(self, kernel, xp):
- # Make sure the interpolant remains numerically stable for very
- # large/small domains.
- seq = Halton(2, scramble=False, seed=np.random.RandomState())
- scale = 1e50
- shift = 1e55
- x = seq.random(100)
- x = xp.asarray(x)
- y = _2d_test_function(x, xp)
- xitp = seq.random(100)
- xitp = xp.asarray(xitp)
- if kernel in _SCALE_INVARIANT:
- yitp1 = self.build(x, y, kernel=kernel)(xitp)
- yitp2 = self.build(
- x*scale + shift, y,
- kernel=kernel
- )(xitp*scale + shift)
- else:
- yitp1 = self.build(x, y, epsilon=5.0, kernel=kernel)(xitp)
- yitp2 = self.build(
- x*scale + shift, y,
- epsilon=5.0/scale,
- kernel=kernel
- )(xitp*scale + shift)
- xp_assert_close(yitp1, yitp2, atol=1e-8)
- def test_polynomial_reproduction(self, xp):
- # If the observed data comes from a polynomial, then the interpolant
- # should be able to reproduce the polynomial exactly, provided that
- # `degree` is sufficiently high.
- rng = np.random.RandomState(0)
- seq = Halton(2, scramble=False, seed=rng)
- degree = 3
- x = seq.random(50)
- xitp = seq.random(50)
- x = xp.asarray(x)
- xitp = xp.asarray(xitp)
- P = _vandermonde(x, degree, xp)
- Pitp = _vandermonde(xitp, degree, xp)
- poly_coeffs = rng.normal(0.0, 1.0, P.shape[1])
- poly_coeffs = xp.asarray(poly_coeffs)
- y = P @ poly_coeffs #y = P.dot(poly_coeffs)
- yitp1 = Pitp @ poly_coeffs #yitp1 = Pitp.dot(poly_coeffs)
- yitp2 = self.build(x, y, degree=degree)(xitp)
- xp_assert_close(yitp1, yitp2, atol=1e-8)
- @pytest.mark.slow
- def test_chunking(self, monkeypatch, xp):
- # If the observed data comes from a polynomial, then the interpolant
- # should be able to reproduce the polynomial exactly, provided that
- # `degree` is sufficiently high.
- rng = np.random.RandomState(0)
- seq = Halton(2, scramble=False, seed=rng)
- degree = 3
- largeN = 1000 + 33
- # this is large to check that chunking of the RBFInterpolator is tested
- x = seq.random(50)
- xitp = seq.random(largeN)
- x = xp.asarray(x)
- xitp = xp.asarray(xitp)
- P = _vandermonde(x, degree, xp)
- Pitp = _vandermonde(xitp, degree, xp)
- poly_coeffs = rng.normal(0.0, 1.0, P.shape[1])
- poly_coeffs = xp.asarray(poly_coeffs)
- y = P @ poly_coeffs # y = P.dot(poly_coeffs)
- yitp1 = Pitp @ poly_coeffs # yitp1 = Pitp.dot(poly_coeffs)
- interp = self.build(x, y, degree=degree)
- ce_real = interp._chunk_evaluator
- def _chunk_evaluator(*args, **kwargs):
- kwargs.update(memory_budget=100)
- return ce_real(*args, **kwargs)
- monkeypatch.setattr(interp, '_chunk_evaluator', _chunk_evaluator)
- yitp2 = interp(xitp)
- xp_assert_close(yitp1, yitp2, atol=1e-8)
- def test_vector_data(self, xp):
- # Make sure interpolating a vector field is the same as interpolating
- # each component separately.
- seq = Halton(2, scramble=False, seed=np.random.RandomState())
- x = seq.random(100)
- xitp = seq.random(100)
- x = xp.asarray(x)
- xitp = xp.asarray(xitp)
- y = xp.stack([_2d_test_function(x, xp),
- _2d_test_function(xp.flip(x, axis=1), xp)]).T
- yitp1 = self.build(x, y)(xitp)
- yitp2 = self.build(x, y[:, 0])(xitp)
- yitp3 = self.build(x, y[:, 1])(xitp)
- xp_assert_close(yitp1[:, 0], yitp2)
- xp_assert_close(yitp1[:, 1], yitp3)
- def test_complex_data(self, xp):
- # Interpolating complex input should be the same as interpolating the
- # real and complex components.
- seq = Halton(2, scramble=False, seed=np.random.RandomState())
- x = seq.random(100)
- xitp = seq.random(100)
- y = _2d_test_function(x, np) + 1j*_2d_test_function(x[:, ::-1], np)
- x, xitp, y = map(xp.asarray, (x, xitp, y))
- yitp1 = self.build(x, y)(xitp)
- yitp2 = self.build(x, y.real)(xitp)
- yitp3 = self.build(x, y.imag)(xitp)
- xp_assert_close(yitp1.real, yitp2)
- xp_assert_close(yitp1.imag, yitp3)
- @pytest.mark.parametrize('kernel', sorted(_AVAILABLE))
- def test_interpolation_misfit_1d(self, kernel, xp):
- # Make sure that each kernel, with its default `degree` and an
- # appropriate `epsilon`, does a good job at interpolation in 1d.
- seq = Halton(1, scramble=False, seed=np.random.RandomState())
- x = 3*seq.random(50)
- xitp = 3*seq.random(50)
- x = xp.asarray(x)
- xitp = xp.asarray(xitp)
- y = _1d_test_function(x, xp)
- ytrue = _1d_test_function(xitp, xp)
- yitp = self.build(x, y, epsilon=5.0, kernel=kernel)(xitp)
- mse = xp.mean((yitp - ytrue)**2)
- assert mse < 1.0e-4
- @pytest.mark.parametrize('kernel', sorted(_AVAILABLE))
- def test_interpolation_misfit_2d(self, kernel, xp):
- # Make sure that each kernel, with its default `degree` and an
- # appropriate `epsilon`, does a good job at interpolation in 2d.
- seq = Halton(2, scramble=False, seed=np.random.RandomState())
- x = seq.random(100)
- xitp = seq.random(100)
- x = xp.asarray(x)
- xitp = xp.asarray(xitp)
- y = _2d_test_function(x, xp)
- ytrue = _2d_test_function(xitp, xp)
- yitp = self.build(x, y, epsilon=5.0, kernel=kernel)(xitp)
- mse = xp.mean((yitp - ytrue)**2)
- assert mse < 2.0e-4
- @pytest.mark.parametrize('kernel', sorted(_AVAILABLE))
- def test_smoothing_misfit(self, kernel, xp):
- # Make sure we can find a smoothing parameter for each kernel that
- # removes a sufficient amount of noise.
- rng = np.random.RandomState(0)
- seq = Halton(1, scramble=False, seed=rng)
- noise = 0.2
- rmse_tol = 0.1
- smoothing_range = 10**xp.linspace(-4, 1, 20)
- x = 3*seq.random(100)
- y = _1d_test_function(x, np) + rng.normal(0.0, noise, (100,))
- x = xp.asarray(x)
- y = xp.asarray(y)
- ytrue = _1d_test_function(x, xp)
- rmse_within_tol = False
- for smoothing in smoothing_range:
- ysmooth = self.build(
- x, y,
- epsilon=1.0,
- smoothing=smoothing,
- kernel=kernel)(x)
- rmse = xp.sqrt(xp.mean((ysmooth - ytrue)**2))
- if rmse < rmse_tol:
- rmse_within_tol = True
- break
- assert rmse_within_tol
- def test_array_smoothing(self, xp):
- # Test using an array for `smoothing` to give less weight to a known
- # outlier.
- rng = np.random.RandomState(0)
- seq = Halton(1, scramble=False, seed=rng)
- degree = 2
- x = seq.random(50)
- P = _vandermonde(x, degree)
- poly_coeffs = rng.normal(0.0, 1.0, P.shape[1])
- y = P @ poly_coeffs # y = P.dot(poly_coeffs)
- y_with_outlier = y.copy()
- y_with_outlier[10] += 1.0
- smoothing = np.zeros((50,))
- smoothing[10] = 1000.0
- x, P, poly_coeffs, y = map(xp.asarray, (x, P, poly_coeffs, y))
- y_with_outlier, smoothing = map(xp.asarray, (y_with_outlier, smoothing))
- yitp = self.build(x, y_with_outlier, smoothing=smoothing)(x)
- # Should be able to reproduce the uncorrupted data almost exactly.
- xp_assert_close(yitp, y, atol=1e-4)
- def test_inconsistent_x_dimensions_error(self):
- # ValueError should be raised if the observation points and evaluation
- # points have a different number of dimensions.
- y = Halton(2, scramble=False, seed=np.random.RandomState()).random(10)
- d = _2d_test_function(y, np)
- x = Halton(1, scramble=False, seed=np.random.RandomState()).random(10)
- match = 'Expected the second axis of `x`'
- with pytest.raises(ValueError, match=match):
- self.build(y, d)(x)
- def test_inconsistent_d_length_error(self):
- y = np.linspace(0, 1, 5)[:, None]
- d = np.zeros(1)
- match = 'Expected the first axis of `d`'
- with pytest.raises(ValueError, match=match):
- self.build(y, d)
- def test_y_not_2d_error(self):
- y = np.linspace(0, 1, 5)
- d = np.zeros(5)
- match = '`y` must be a 2-dimensional array.'
- with pytest.raises(ValueError, match=match):
- self.build(y, d)
- def test_inconsistent_smoothing_length_error(self):
- y = np.linspace(0, 1, 5)[:, None]
- d = np.zeros(5)
- smoothing = np.ones(1)
- match = 'Expected `smoothing` to be'
- with pytest.raises(ValueError, match=match):
- self.build(y, d, smoothing=smoothing)
- def test_invalid_kernel_name_error(self):
- y = np.linspace(0, 1, 5)[:, None]
- d = np.zeros(5)
- match = '`kernel` must be one of'
- with pytest.raises(ValueError, match=match):
- self.build(y, d, kernel='test')
- def test_epsilon_not_specified_error(self):
- y = np.linspace(0, 1, 5)[:, None]
- d = np.zeros(5)
- for kernel in _AVAILABLE:
- if kernel in _SCALE_INVARIANT:
- continue
- match = '`epsilon` must be specified'
- with pytest.raises(ValueError, match=match):
- self.build(y, d, kernel=kernel)
- def test_x_not_2d_error(self):
- y = np.linspace(0, 1, 5)[:, None]
- x = np.linspace(0, 1, 5)
- d = np.zeros(5)
- match = '`x` must be a 2-dimensional array.'
- with pytest.raises(ValueError, match=match):
- self.build(y, d)(x)
- def test_not_enough_observations_error(self):
- y = np.linspace(0, 1, 1)[:, None]
- d = np.zeros(1)
- match = 'At least 2 data points are required'
- with pytest.raises(ValueError, match=match):
- self.build(y, d, kernel='thin_plate_spline')
- def test_degree_warning(self):
- y = np.linspace(0, 1, 5)[:, None]
- d = np.zeros(5)
- for kernel, deg in _NAME_TO_MIN_DEGREE.items():
- # Only test for kernels that its minimum degree is not 0.
- if deg >= 1:
- match = f'`degree` should not be below {deg}'
- with pytest.warns(Warning, match=match):
- self.build(y, d, epsilon=1.0, kernel=kernel, degree=deg-1)
- def test_minus_one_degree(self):
- # Make sure a degree of -1 is accepted without any warning.
- y = np.linspace(0, 1, 5)[:, None]
- d = np.zeros(5)
- for kernel, _ in _NAME_TO_MIN_DEGREE.items():
- self.build(y, d, epsilon=1.0, kernel=kernel, degree=-1)
- @skip_xp_backends("jax.numpy", reason="solve raises no error for a singular matrix")
- @skip_xp_backends("cupy", reason="solve raises no error for a singular matrix")
- def test_rank_error(self, xp):
- # An error should be raised when `kernel` is "thin_plate_spline" and
- # observations are 2-D and collinear.
- y = xp.asarray([[2.0, 0.0], [1.0, 0.0], [0.0, 0.0]])
- d = xp.asarray([0.0, 0.0, 0.0])
- match = 'does not have full column rank'
- with pytest.raises(LinAlgError, match=match):
- self.build(y, d, kernel='thin_plate_spline')(y)
- def test_single_point(self, xp):
- # Make sure interpolation still works with only one point (in 1, 2, and
- # 3 dimensions).
- for dim in [1, 2, 3]:
- y = xp.zeros((1, dim))
- d = xp.ones((1,), dtype=xp.float64)
- f = self.build(y, d, kernel='linear')(y)
- xp_assert_close(f, d)
- def test_pickleable(self, xp):
- # Make sure we can pickle and unpickle the interpolant without any
- # changes in the behavior.
- seq = Halton(1, scramble=False, seed=np.random.RandomState(2305982309))
- x = 3*seq.random(50)
- xitp = 3*seq.random(50)
- x, xitp = xp.asarray(x), xp.asarray(xitp)
- y = _1d_test_function(x, xp)
- interp = self.build(x, y)
- yitp1 = interp(xitp)
- yitp2 = pickle.loads(pickle.dumps(interp))(xitp)
- xp_assert_close(yitp1, yitp2, atol=1e-16)
- @make_xp_test_case(RBFInterpolator)
- class TestRBFInterpolatorNeighborsNone(_TestRBFInterpolator):
- def build(self, *args, **kwargs):
- return RBFInterpolator(*args, **kwargs)
- def test_smoothing_limit_1d(self):
- # For large smoothing parameters, the interpolant should approach a
- # least squares fit of a polynomial with the specified degree.
- seq = Halton(1, scramble=False, seed=np.random.RandomState())
- degree = 3
- smoothing = 1e8
- x = 3*seq.random(50)
- xitp = 3*seq.random(50)
- y = _1d_test_function(x, np)
- yitp1 = self.build(
- x, y,
- degree=degree,
- smoothing=smoothing
- )(xitp)
- P = _vandermonde(x, degree)
- Pitp = _vandermonde(xitp, degree)
- yitp2 = Pitp.dot(np.linalg.lstsq(P, y, rcond=None)[0])
- xp_assert_close(yitp1, yitp2, atol=1e-8)
- def test_smoothing_limit_2d(self):
- # For large smoothing parameters, the interpolant should approach a
- # least squares fit of a polynomial with the specified degree.
- seq = Halton(2, scramble=False, seed=np.random.RandomState())
- degree = 3
- smoothing = 1e8
- x = seq.random(100)
- xitp = seq.random(100)
- y = _2d_test_function(x, np)
- yitp1 = self.build(
- x, y,
- degree=degree,
- smoothing=smoothing
- )(xitp)
- P = _vandermonde(x, degree)
- Pitp = _vandermonde(xitp, degree)
- yitp2 = Pitp.dot(np.linalg.lstsq(P, y, rcond=None)[0])
- xp_assert_close(yitp1, yitp2, atol=1e-8)
- @skip_xp_backends(np_only=True, reason="neighbors not None uses KDTree")
- class TestRBFInterpolatorNeighbors20(_TestRBFInterpolator):
- # RBFInterpolator using 20 nearest neighbors.
- def build(self, *args, **kwargs):
- return RBFInterpolator(*args, **kwargs, neighbors=20)
- def test_equivalent_to_rbf_interpolator(self):
- seq = Halton(2, scramble=False, seed=np.random.RandomState())
- x = seq.random(100)
- xitp = seq.random(100)
- y = _2d_test_function(x, np)
- yitp1 = self.build(x, y)(xitp)
- yitp2 = []
- tree = cKDTree(x)
- for xi in xitp:
- _, nbr = tree.query(xi, 20)
- yitp2.append(RBFInterpolator(x[nbr], y[nbr])(xi[None])[0])
- xp_assert_close(yitp1, yitp2, atol=1e-8)
- def test_concurrency(self):
- # Check that no segfaults appear with concurrent access to
- # RbfInterpolator
- seq = Halton(2, scramble=False, seed=np.random.RandomState(0))
- x = seq.random(100)
- xitp = seq.random(100)
- y = _2d_test_function(x, np)
- interp = self.build(x, y)
- def worker_fn(_, interp, xp):
- interp(xp)
- _run_concurrent_barrier(10, worker_fn, interp, xitp)
- @skip_xp_backends(np_only=True, reason="neighbors not None uses KDTree")
- class TestRBFInterpolatorNeighborsInf(TestRBFInterpolatorNeighborsNone):
- # RBFInterpolator using neighbors=np.inf. This should give exactly the same
- # results as neighbors=None, but it will be slower.
- def build(self, *args, **kwargs):
- return RBFInterpolator(*args, **kwargs, neighbors=np.inf)
- def test_equivalent_to_rbf_interpolator(self):
- seq = Halton(1, scramble=False, seed=np.random.RandomState())
- x = 3*seq.random(50)
- xitp = 3*seq.random(50)
- y = _1d_test_function(x, np)
- yitp1 = self.build(x, y)(xitp)
- yitp2 = RBFInterpolator(x, y)(xitp)
- xp_assert_close(yitp1, yitp2, atol=1e-8)
|