# This file is part of h5py, a Python interface to the HDF5 library. # # http://www.h5py.org # # Copyright 2008-2013 Andrew Collette and contributors # # License: Standard 3-clause BSD; see "license.txt" for full license terms # and contributor agreement. """ Dataset testing operations. Tests all dataset operations, including creation, with the exception of: 1. Slicing operations for read and write, handled by module test_slicing 2. Type conversion for read and write (currently untested) """ import pathlib import os import sys import numpy as np import platform import pytest import threading from concurrent.futures import ThreadPoolExecutor from h5py import File, Dataset from h5py._hl.base import is_empty_dataspace, product from h5py import h5f, h5t import h5py from .common import ut, TestCase, NUMPY_RELEASE_VERSION, is_main_thread, make_name from .data_files import get_data_file_path from ..h5py_warnings import H5pyDeprecationWarning class BaseDataset(TestCase): def setUp(self): self.f = File(self.mktemp(), 'w') def tearDown(self): if self.f: self.f.close() class TestRepr(BaseDataset): """ Feature: repr(Dataset) behaves sensibly """ endian_mark = '>' if sys.byteorder=='big' else '<' def test_repr_basic(self): name = make_name() ds = self.f.create_dataset(name, (4,), dtype='int32') assert repr(ds) == f'' @pytest.mark.thread_unsafe def test_repr_closed(self): """ repr() works on live and dead datasets """ ds = self.f.create_dataset(make_name(), (4,), dtype="f4") self.f.close() assert repr(ds) == '' def test_repr_anonymous(self): ds = self.f.create_dataset(None, (4,), dtype='int32') assert repr(ds) == f'' class TestCreateShape(BaseDataset): """ Feature: Datasets can be created from a shape only """ def test_create_scalar(self): """ Create a scalar dataset """ dset = self.f.create_dataset(make_name(), (), dtype='f4') self.assertEqual(dset.shape, ()) def test_create_simple(self): """ Create a size-1 dataset """ dset = self.f.create_dataset(make_name(), (1,), dtype='f4') self.assertEqual(dset.shape, (1,)) def test_create_integer(self): """ Create a size-1 dataset with integer shape""" dset = self.f.create_dataset(make_name(), 1, dtype='f4') self.assertEqual(dset.shape, (1,)) def test_create_extended_1d(self): """ Create an extended dataset with tuple shape """ dset = self.f.create_dataset(make_name(), (63,), dtype='f4') self.assertEqual(dset.shape, (63,)) self.assertEqual(dset.size, 63) def test_create_extended_2d(self): """ Create an extended dataset with 2 dimensions """ dset = self.f.create_dataset(make_name(), (6, 10), dtype='f4') self.assertEqual(dset.shape, (6, 10)) self.assertEqual(dset.size, (60)) def test_create_integer_extended(self): """ Create an extended dataset with integer shape """ dset = self.f.create_dataset(make_name(), 63, dtype='f4') self.assertEqual(dset.shape, (63,)) self.assertEqual(dset.size, 63) def test_default_dtype(self): """ Confirm that the default dtype is float """ dset = self.f.create_dataset(make_name(), (63,), dtype='f4') self.assertEqual(dset.dtype, np.dtype('=f4')) def test_missing_shape(self): """ Missing shape raises TypeError """ with self.assertRaises(TypeError): self.f.create_dataset(make_name()) def test_long_double(self): """ Confirm that the default dtype is float """ dset = self.f.create_dataset(make_name(), (63,), dtype=np.longdouble) if platform.machine() in ['ppc64le']: pytest.xfail("Storage of long double deactivated on %s" % platform.machine()) self.assertEqual(dset.dtype, np.longdouble) @ut.skipIf(not hasattr(np, "complex256"), "No support for complex256") def test_complex256(self): """ Confirm that the default dtype is float """ dset = self.f.create_dataset(make_name(), (63,), dtype=np.dtype('complex256')) self.assertEqual(dset.dtype, np.dtype('complex256')) def test_name_bytes(self): dset = self.f.create_dataset(make_name("foo").encode('utf-8'), (1,), dtype='f4') self.assertEqual(dset.shape, (1,)) dset2 = self.f.create_dataset((make_name("bar{}/baz")).encode('utf-8'), (2,), dtype='f4') self.assertEqual(dset2.shape, (2,)) def test_no_dtype(self): # From h5py 4.0, either dtype or data will be required with pytest.warns(H5pyDeprecationWarning): dset = self.f.create_dataset(make_name(), (5,)) assert dset.dtype == np.dtype('f4') class TestCreateData(BaseDataset): """ Feature: Datasets can be created from existing data """ def test_create_scalar(self): """ Create a scalar dataset from existing array """ data = np.ones((), 'f') dset = self.f.create_dataset(make_name(), data=data) self.assertEqual(dset.shape, data.shape) def test_create_extended(self): """ Create an extended dataset from existing data """ data = np.ones((63,), 'f') dset = self.f.create_dataset(make_name(), data=data) self.assertEqual(dset.shape, data.shape) def test_dataset_intermediate_group(self): """ Create dataset with missing intermediate groups """ name = make_name("/foo{}/bar/baz") ds = self.f.create_dataset(name, shape=(10, 10), dtype=' 0 si = ds.get_chunk_info_by_coord((0, 0)) assert si.chunk_offset == (0, 0) assert si.filter_mask == 0 assert si.byte_offset is not None assert si.size > 0 @ut.skipUnless(h5py.version.hdf5_version_tuple >= (1, 12, 3) or (h5py.version.hdf5_version_tuple >= (1, 10, 10) and h5py.version.hdf5_version_tuple < (1, 10, 99)), "chunk iteration requires HDF5 1.10.10 and later 1.10, or 1.12.3 and later") def test_chunk_iter(): """H5Dchunk_iter() for chunk information""" from io import BytesIO buf = BytesIO() name = make_name() with h5py.File(buf, 'w') as f: f.create_dataset(name, shape=(100, 100), chunks=(10, 10), dtype='i4') f[name][:] = 1 buf.seek(0) with h5py.File(buf, 'r') as f: dsid = f[name].id num_chunks = dsid.get_num_chunks() assert num_chunks == 100 ci = {} for j in range(num_chunks): si = dsid.get_chunk_info(j) ci[si.chunk_offset] = si def callback(chunk_info): known = ci[chunk_info.chunk_offset] assert chunk_info.chunk_offset == known.chunk_offset assert chunk_info.filter_mask == known.filter_mask assert chunk_info.byte_offset == known.byte_offset assert chunk_info.size == known.size dsid.chunk_iter(callback) def test_empty_shape(writable_file): ds = writable_file.create_dataset(make_name(), dtype='int32') assert ds.shape is None assert ds.maxshape is None def test_zero_storage_size(): # https://github.com/h5py/h5py/issues/1475 from io import BytesIO buf = BytesIO() with h5py.File(buf, 'w') as fout: fout.create_dataset('empty', dtype='uint8') buf.seek(0) with h5py.File(buf, 'r') as fin: assert fin['empty'].chunks is None assert fin['empty'].id.get_offset() is None assert fin['empty'].id.get_storage_size() == 0 def test_python_int_uint64(writable_file): # https://github.com/h5py/h5py/issues/1547 data = [np.iinfo(np.int64).max, np.iinfo(np.int64).max + 1] # Check creating a new dataset ds = writable_file.create_dataset(make_name(), data=data, dtype=np.uint64) assert ds.dtype == np.dtype(np.uint64) np.testing.assert_array_equal(ds[:], np.array(data, dtype=np.uint64)) # Check writing to an existing dataset ds[:] = data np.testing.assert_array_equal(ds[:], np.array(data, dtype=np.uint64)) def test_setitem_fancy_indexing(writable_file): # https://github.com/h5py/h5py/issues/1593 arr = writable_file.create_dataset(make_name(), (5, 1000, 2), dtype=np.uint8) block = np.random.randint(255, size=(5, 3, 2)) arr[:, [0, 2, 4], ...] = block def test_vlen_spacepad(): with File(get_data_file_path("vlen_string_dset.h5")) as f: assert f["DS1"][0] == b"Parting" def test_vlen_nullterm(): with File(get_data_file_path("vlen_string_dset_utc.h5")) as f: assert f["ds1"][0] == b"2009-12-20T10:16:18.662409Z" def test_allow_unknown_filter(writable_file): # apparently 256-511 are reserved for testing purposes fake_filter_id = 256 ds = writable_file.create_dataset( make_name(), shape=(10, 10), dtype=np.uint8, compression=fake_filter_id, allow_unknown_filter=True ) assert str(fake_filter_id) in ds._filters assert ds.compression == 'unknown' def test_dset_chunk_cache(): """Chunk cache configuration for individual datasets.""" from io import BytesIO buf = BytesIO() name = make_name() with h5py.File(buf, 'w') as fout: ds = fout.create_dataset( name, shape=(10, 20), chunks=(5, 4), dtype='i4', rdcc_nbytes=2 * 1024 * 1024, rdcc_w0=0.2, rdcc_nslots=997) ds_chunk_cache = ds.id.get_access_plist().get_chunk_cache() assert fout.id.get_access_plist().get_cache()[1:] != ds_chunk_cache assert ds_chunk_cache == (997, 2 * 1024 * 1024, 0.2) buf.seek(0) with h5py.File(buf, 'r') as fin: ds = fin.require_dataset( name, shape=(10, 20), dtype='i4', rdcc_nbytes=3 * 1024 * 1024, rdcc_w0=0.67, rdcc_nslots=709) ds_chunk_cache = ds.id.get_access_plist().get_chunk_cache() assert fin.id.get_access_plist().get_cache()[1:] != ds_chunk_cache assert ds_chunk_cache == (709, 3 * 1024 * 1024, 0.67) class TestCommutative(BaseDataset): """ Test the symmetry of operators, at least with the numpy types. Issue: https://github.com/h5py/h5py/issues/1947 """ def test_numpy_commutative(self,): """ Create a h5py dataset, extract one element convert to numpy Check that it returns symmetric response to == and != """ shape = (100,1) dset = self.f.create_dataset(make_name(), shape, dtype=float, data=np.random.rand(*shape)) # grab a value from the elements, ie dset[0, 0] # check that mask arrays are commutative wrt ==, != val = np.float64(dset[0, 0]) assert np.all((val == dset) == (dset == val)) assert np.all((val != dset) == (dset != val)) # generate sample not in the dset, ie max(dset)+delta # check that mask arrays are commutative wrt ==, != delta = 0.001 nval = np.nanmax(dset)+delta assert np.all((nval == dset) == (dset == nval)) assert np.all((nval != dset) == (dset != nval)) def test_basetype_commutative(self,): """ Create a h5py dataset and check basetype compatibility. Check that operation is symmetric, even if it is potentially not meaningful. """ shape = (100,1) dset = self.f.create_dataset(make_name(), shape, dtype=float, data=np.random.rand(*shape)) # generate float type, sample float(0.) # check that operation is symmetric (but potentially meaningless) val = float(0.) assert (val == dset) == (dset == val) assert (val != dset) == (dset != val) class TestVirtualPrefix(BaseDataset): """ Test setting virtual prefix """ def test_virtual_prefix_create(self): shape = (100,1) virtual_prefix = "/path/to/virtual" dset = self.f.create_dataset(make_name(), shape, dtype=float, data=np.random.rand(*shape), virtual_prefix = virtual_prefix) virtual_prefix_readback = pathlib.Path(dset.id.get_access_plist().get_virtual_prefix().decode()).as_posix() assert virtual_prefix_readback == virtual_prefix def test_virtual_prefix_require(self): virtual_prefix = "/path/to/virtual" dset = self.f.require_dataset(make_name(), (10, 3), 'f', virtual_prefix = virtual_prefix) virtual_prefix_readback = pathlib.Path(dset.id.get_access_plist().get_virtual_prefix().decode()).as_posix() self.assertEqual(virtual_prefix, virtual_prefix_readback) self.assertIsInstance(dset, Dataset) self.assertEqual(dset.shape, (10, 3)) def ds_str(file, shape=(10, )): dt = h5py.string_dtype(encoding='ascii') fill_value = b'fill' return file.create_dataset(make_name(), shape, dtype=dt, fillvalue=fill_value) def ds_fields(file, shape=(10, )): dt = np.dtype([ ('foo', h5py.string_dtype(encoding='ascii')), ('bar', np.float64), ]) fill_value = np.asarray(('fill', 0.0), dtype=dt) name = make_name() file[name] = np.broadcast_to(fill_value, shape) return file[name] view_getters = pytest.mark.parametrize( "view_getter,make_ds", [ (lambda ds: ds, ds_str), (lambda ds: ds.astype(dtype=object), ds_str), (lambda ds: ds.asstr(), ds_str), (lambda ds: ds.fields("foo"), ds_fields), ], ids=["ds", "astype", "asstr", "fields"], ) COPY_IF_NEEDED = False if NUMPY_RELEASE_VERSION < (2, 0) else None @pytest.mark.parametrize("copy", [True, COPY_IF_NEEDED]) @view_getters def test_array_copy(view_getter, make_ds, copy, writable_file): ds = make_ds(writable_file) view = view_getter(ds) np.array(view, copy=copy) @pytest.mark.skipif( NUMPY_RELEASE_VERSION < (2, 0), reason="forbidding copies requires numpy 2", ) @view_getters def test_array_copy_false(view_getter, make_ds, writable_file): ds = make_ds(writable_file) view = view_getter(ds) with pytest.raises(ValueError, match="memory allocation cannot be avoided"): np.array(view, copy=False) @view_getters def test_array_dtype(view_getter, make_ds, writable_file): ds = make_ds(writable_file) view = view_getter(ds) assert np.array(view, dtype='|S10').dtype == np.dtype('|S10') @view_getters def test_array_scalar(view_getter, make_ds, writable_file): ds = make_ds(writable_file, shape=()) view = view_getter(ds) assert isinstance(view[()], (bytes, str)) assert np.array(view).shape == () @view_getters def test_array_nd(view_getter, make_ds, writable_file): ds = make_ds(writable_file, shape=(5, 6)) view = view_getter(ds) assert np.array(view).shape == (5, 6) @view_getters def test_view_properties(view_getter, make_ds, writable_file): ds = make_ds(writable_file, shape=(5, 6)) view = view_getter(ds) assert view.dtype == np.dtype(object) assert view.ndim == 2 assert view.shape == (5, 6) assert view.size == 30 assert len(view) == 5 @pytest.mark.thread_unsafe(reason="spawns thread pool itself") def test_concurrent_dataset_creation(writable_file): N_THREADS = 25 N_DATASETS_PER_THREAD = 5 # Defines a thread barrier that will be spawned before parallel execution # this increases the probability of concurrent access clashes. barrier = threading.Barrier(N_THREADS) def closure(ithread): # Ensure that all threads reach this point before concurrent execution. barrier.wait() for j in range(N_DATASETS_PER_THREAD): writable_file.create_dataset(f'concurrent_{ithread:02d}_{j:02d}', (1000,), dtype='i4') with ThreadPoolExecutor(max_workers=N_THREADS) as executor: futures = [executor.submit(closure, ithread) for ithread in range(N_THREADS)] [f.result() for f in futures] expected = set(f'concurrent_{i:02d}_{j:02d}' for i in range(N_THREADS) for j in range(N_DATASETS_PER_THREAD)) assert set(writable_file) == expected def test_filter_properties(writable_file): name = make_name() ds = writable_file.create_dataset( name, shape=1000, dtype=np.float32, fletcher32=True, shuffle=True, compression='lzf' ) assert ds.filter_ids == ( h5py.h5z.FILTER_SHUFFLE, h5py.h5z.FILTER_LZF, h5py.h5z.FILTER_FLETCHER32 ) assert ds.filter_names == ('shuffle', 'lzf', 'fletcher32') def test_store_refs(writable_file): ds1 = writable_file.create_dataset(make_name("foo"), data=np.arange(12)) refs_ds = writable_file.create_dataset(make_name("refs"), data=[writable_file.ref, ds1.ref]) assert isinstance(refs_ds[0], h5py.Reference) assert writable_file[refs_ds[0]] == writable_file assert isinstance(refs_ds[1], h5py.Reference) assert writable_file[refs_ds[1]] == ds1 # Single reference ref_scalar_ds = writable_file.create_dataset(make_name("ref_scalar"), data=ds1.ref) assert isinstance(ref_scalar_ds[()], h5py.h5r.Reference) assert writable_file[ref_scalar_ds[()]] == ds1 def test_store_regionrefs(writable_file): ds1 = writable_file.create_dataset(make_name("foo"), data=np.arange(12)) regionrefs_ds = writable_file.create_dataset(make_name("regrefs"), data=[ ds1.regionref[:-1], ds1.regionref[1:] ]) assert isinstance(regionrefs_ds[0], h5py.RegionReference) np.testing.assert_array_equal(ds1[regionrefs_ds[0]], np.arange(11)) np.testing.assert_array_equal(ds1[regionrefs_ds[1]], np.arange(1, 12)) refs_ds = writable_file.create_dataset(make_name("refs"), shape=(1,), dtype=h5py.ref_dtype) with pytest.raises(TypeError, match="convert"): refs_ds[0] = ds1.regionref[:6]