| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365 |
- # This file is part of h5py, a Python interface to the HDF5 library.
- #
- # http://www.h5py.org
- #
- # Copyright 2008-2013 Andrew Collette and contributors
- #
- # License: Standard 3-clause BSD; see "license.txt" for full license terms
- # and contributor agreement.
- """
- Dataset testing operations.
- Tests all dataset operations, including creation, with the exception of:
- 1. Slicing operations for read and write, handled by module test_slicing
- 2. Type conversion for read and write (currently untested)
- """
- import pathlib
- import os
- import sys
- import numpy as np
- import platform
- import pytest
- import threading
- from concurrent.futures import ThreadPoolExecutor
- from h5py import File, Dataset
- from h5py._hl.base import is_empty_dataspace, product
- from h5py import h5f, h5t
- import h5py
- from .common import ut, TestCase, NUMPY_RELEASE_VERSION, is_main_thread, make_name
- from .data_files import get_data_file_path
- from ..h5py_warnings import H5pyDeprecationWarning
- class BaseDataset(TestCase):
- def setUp(self):
- self.f = File(self.mktemp(), 'w')
- def tearDown(self):
- if self.f:
- self.f.close()
- class TestRepr(BaseDataset):
- """
- Feature: repr(Dataset) behaves sensibly
- """
- endian_mark = '>' if sys.byteorder=='big' else '<'
- def test_repr_basic(self):
- name = make_name()
- ds = self.f.create_dataset(name, (4,), dtype='int32')
- assert repr(ds) == f'<HDF5 dataset "{name}": shape (4,), type "{self.endian_mark}i4">'
- @pytest.mark.thread_unsafe
- def test_repr_closed(self):
- """ repr() works on live and dead datasets """
- ds = self.f.create_dataset(make_name(), (4,), dtype="f4")
- self.f.close()
- assert repr(ds) == '<Closed HDF5 dataset>'
- def test_repr_anonymous(self):
- ds = self.f.create_dataset(None, (4,), dtype='int32')
- assert repr(ds) == f'<HDF5 dataset (anonymous): shape (4,), type "{self.endian_mark}i4">'
- class TestCreateShape(BaseDataset):
- """
- Feature: Datasets can be created from a shape only
- """
- def test_create_scalar(self):
- """ Create a scalar dataset """
- dset = self.f.create_dataset(make_name(), (), dtype='f4')
- self.assertEqual(dset.shape, ())
- def test_create_simple(self):
- """ Create a size-1 dataset """
- dset = self.f.create_dataset(make_name(), (1,), dtype='f4')
- self.assertEqual(dset.shape, (1,))
- def test_create_integer(self):
- """ Create a size-1 dataset with integer shape"""
- dset = self.f.create_dataset(make_name(), 1, dtype='f4')
- self.assertEqual(dset.shape, (1,))
- def test_create_extended_1d(self):
- """ Create an extended dataset with tuple shape """
- dset = self.f.create_dataset(make_name(), (63,), dtype='f4')
- self.assertEqual(dset.shape, (63,))
- self.assertEqual(dset.size, 63)
- def test_create_extended_2d(self):
- """ Create an extended dataset with 2 dimensions """
- dset = self.f.create_dataset(make_name(), (6, 10), dtype='f4')
- self.assertEqual(dset.shape, (6, 10))
- self.assertEqual(dset.size, (60))
- def test_create_integer_extended(self):
- """ Create an extended dataset with integer shape """
- dset = self.f.create_dataset(make_name(), 63, dtype='f4')
- self.assertEqual(dset.shape, (63,))
- self.assertEqual(dset.size, 63)
- def test_default_dtype(self):
- """ Confirm that the default dtype is float """
- dset = self.f.create_dataset(make_name(), (63,), dtype='f4')
- self.assertEqual(dset.dtype, np.dtype('=f4'))
- def test_missing_shape(self):
- """ Missing shape raises TypeError """
- with self.assertRaises(TypeError):
- self.f.create_dataset(make_name())
- def test_long_double(self):
- """ Confirm that the default dtype is float """
- dset = self.f.create_dataset(make_name(), (63,), dtype=np.longdouble)
- if platform.machine() in ['ppc64le']:
- pytest.xfail("Storage of long double deactivated on %s" % platform.machine())
- self.assertEqual(dset.dtype, np.longdouble)
- @ut.skipIf(not hasattr(np, "complex256"), "No support for complex256")
- def test_complex256(self):
- """ Confirm that the default dtype is float """
- dset = self.f.create_dataset(make_name(), (63,),
- dtype=np.dtype('complex256'))
- self.assertEqual(dset.dtype, np.dtype('complex256'))
- def test_name_bytes(self):
- dset = self.f.create_dataset(make_name("foo").encode('utf-8'), (1,), dtype='f4')
- self.assertEqual(dset.shape, (1,))
- dset2 = self.f.create_dataset((make_name("bar{}/baz")).encode('utf-8'), (2,), dtype='f4')
- self.assertEqual(dset2.shape, (2,))
- def test_no_dtype(self):
- # From h5py 4.0, either dtype or data will be required
- with pytest.warns(H5pyDeprecationWarning):
- dset = self.f.create_dataset(make_name(), (5,))
- assert dset.dtype == np.dtype('f4')
- class TestCreateData(BaseDataset):
- """
- Feature: Datasets can be created from existing data
- """
- def test_create_scalar(self):
- """ Create a scalar dataset from existing array """
- data = np.ones((), 'f')
- dset = self.f.create_dataset(make_name(), data=data)
- self.assertEqual(dset.shape, data.shape)
- def test_create_extended(self):
- """ Create an extended dataset from existing data """
- data = np.ones((63,), 'f')
- dset = self.f.create_dataset(make_name(), data=data)
- self.assertEqual(dset.shape, data.shape)
- def test_dataset_intermediate_group(self):
- """ Create dataset with missing intermediate groups """
- name = make_name("/foo{}/bar/baz")
- ds = self.f.create_dataset(name, shape=(10, 10), dtype='<i4')
- self.assertIsInstance(ds, h5py.Dataset)
- self.assertTrue(name in self.f)
- def test_reshape(self):
- """ Create from existing data, and make it fit a new shape """
- data = np.arange(30, dtype='f')
- dset = self.f.create_dataset(make_name(), shape=(10, 3), data=data)
- self.assertEqual(dset.shape, (10, 3))
- self.assertArrayEqual(dset[...], data.reshape((10, 3)))
- def test_appropriate_low_level_id(self):
- " Binding Dataset to a non-DatasetID identifier fails with ValueError "
- with self.assertRaises(ValueError):
- Dataset(self.f['/'].id)
- def check_h5_string(self, dset, cset, length):
- tid = dset.id.get_type()
- assert isinstance(tid, h5t.TypeStringID)
- assert tid.get_cset() == cset
- if length is None:
- assert tid.is_variable_str()
- else:
- assert not tid.is_variable_str()
- assert tid.get_size() == length
- def test_create_bytestring(self):
- """ Creating dataset with byte string yields vlen ASCII dataset """
- def check_vlen_ascii(dset):
- self.check_h5_string(dset, h5t.CSET_ASCII, length=None)
- check_vlen_ascii(self.f.create_dataset(make_name("a"), data=b'abc'))
- check_vlen_ascii(self.f.create_dataset(make_name("b"), data=[b'abc', b'def']))
- check_vlen_ascii(self.f.create_dataset(make_name("c"), data=[[b'abc'], [b'def']]))
- check_vlen_ascii(self.f.create_dataset(
- make_name("d"), data=np.array([b'abc', b'def'], dtype=object)
- ))
- def test_create_np_s(self):
- dset = self.f.create_dataset(make_name(), data=np.array([b'abc', b'def'], dtype='S3'))
- self.check_h5_string(dset, h5t.CSET_ASCII, length=3)
- def test_create_strings(self):
- def check_vlen_utf8(dset):
- self.check_h5_string(dset, h5t.CSET_UTF8, length=None)
- check_vlen_utf8(self.f.create_dataset(make_name("a"), data='abc'))
- check_vlen_utf8(self.f.create_dataset(make_name("b"), data=['abc', 'def']))
- check_vlen_utf8(self.f.create_dataset(make_name("c"), data=[['abc'], ['def']]))
- check_vlen_utf8(self.f.create_dataset(
- make_name("d"), data=np.array(['abc', 'def'], dtype=object)
- ))
- def test_create_np_u(self):
- with self.assertRaises(TypeError):
- self.f.create_dataset(make_name(), data=np.array([b'abc', b'def'], dtype='U3'))
- def test_empty_create_via_None_shape(self):
- name = make_name()
- self.f.create_dataset(name, dtype='f')
- self.assertTrue(is_empty_dataspace(self.f[name].id))
- def test_empty_create_via_Empty_class(self):
- name = make_name()
- self.f.create_dataset(name, data=h5py.Empty(dtype='f'))
- self.assertTrue(is_empty_dataspace(self.f[name].id))
- def test_create_incompatible_data(self):
- # Shape tuple is incompatible with data
- with self.assertRaises(ValueError):
- self.f.create_dataset(make_name(), shape=4, data= np.arange(3))
- class TestReadDirectly:
- """
- Feature: Read data directly from Dataset into a Numpy array
- """
- @pytest.mark.parametrize(
- 'source_shape,dest_shape,source_sel,dest_sel',
- [
- ((100,), (100,), np.s_[0:10], np.s_[50:60]),
- ((70,), (100,), np.s_[50:60], np.s_[90:]),
- ((30, 10), (20, 20), np.s_[:20, :], np.s_[:, :10]),
- ((5, 7, 9), (6,), np.s_[2, :6, 3], np.s_[:]),
- ])
- def test_read_direct(self, writable_file, source_shape, dest_shape, source_sel, dest_sel):
- source_values = np.arange(product(source_shape), dtype="int64").reshape(source_shape)
- dset = writable_file.create_dataset(make_name(), source_shape, data=source_values)
- arr = np.full(dest_shape, -1, dtype="int64")
- expected = arr.copy()
- expected[dest_sel] = source_values[source_sel]
- dset.read_direct(arr, source_sel, dest_sel)
- np.testing.assert_array_equal(arr, expected)
- def test_no_sel(self, writable_file):
- dset = writable_file.create_dataset(make_name(), (10,), data=np.arange(10, dtype="int64"))
- arr = np.ones((10,), dtype="int64")
- dset.read_direct(arr)
- np.testing.assert_array_equal(arr, np.arange(10, dtype="int64"))
- def test_empty(self, writable_file):
- empty_dset = writable_file.create_dataset(make_name(), dtype='int64')
- arr = np.ones((100,), 'int64')
- with pytest.raises(TypeError):
- empty_dset.read_direct(arr, np.s_[0:10], np.s_[50:60])
- def test_wrong_shape(self, writable_file):
- dset = writable_file.create_dataset(make_name(), (100,), dtype='int64')
- arr = np.ones((200,))
- with pytest.raises(TypeError):
- dset.read_direct(arr)
- def test_not_c_contiguous(self, writable_file):
- dset = writable_file.create_dataset(make_name(), (10, 10), dtype='int64')
- arr = np.ones((10, 10), order='F')
- with pytest.raises(TypeError):
- dset.read_direct(arr)
- def test_zero_length(self, writable_file):
- shape = (0, 20)
- dset = writable_file.create_dataset(make_name(), shape, dtype=np.int64)
- arr = np.zeros(shape, dtype=np.int64)
- dset.read_direct(arr)
- # We should still get an error if the shape is wrong
- arr2 = np.zeros((0, 25), dtype=np.int64)
- with pytest.raises(TypeError):
- dset.read_direct(arr2)
- class TestWriteDirectly:
- """
- Feature: Write Numpy array directly into Dataset
- """
- @pytest.mark.parametrize(
- 'source_shape,dest_shape,source_sel,dest_sel',
- [
- ((100,), (100,), np.s_[0:10], np.s_[50:60]),
- ((70,), (100,), np.s_[50:60], np.s_[90:]),
- ((30, 10), (20, 20), np.s_[:20, :], np.s_[:, :10]),
- ((5, 7, 9), (6,), np.s_[2, :6, 3], np.s_[:]),
- ])
- def test_write_direct(self, writable_file, source_shape, dest_shape, source_sel, dest_sel):
- dset = writable_file.create_dataset(make_name(), dest_shape, dtype='int32', fillvalue=-1)
- arr = np.arange(product(source_shape)).reshape(source_shape)
- expected = np.full(dest_shape, -1, dtype='int32')
- expected[dest_sel] = arr[source_sel]
- dset.write_direct(arr, source_sel, dest_sel)
- np.testing.assert_array_equal(dset[:], expected)
- def test_empty(self, writable_file):
- empty_dset = writable_file.create_dataset(make_name(), dtype='int64')
- with pytest.raises(TypeError):
- empty_dset.write_direct(np.ones((100,)), np.s_[0:10], np.s_[50:60])
- def test_wrong_shape(self, writable_file):
- dset = writable_file.create_dataset(make_name(), (100,), dtype='int64')
- arr = np.ones((200,))
- with pytest.raises(TypeError):
- dset.write_direct(arr)
- def test_not_c_contiguous(self, writable_file):
- dset = writable_file.create_dataset(make_name(), (10, 10), dtype='int64')
- arr = np.ones((10, 10), order='F')
- with pytest.raises(TypeError):
- dset.write_direct(arr)
- class TestCreateRequire(BaseDataset):
- """
- Feature: Datasets can be created only if they don't exist in the file
- """
- def test_create(self):
- """ Create new dataset with no conflicts """
- dset = self.f.require_dataset(make_name(), (10, 3), 'f')
- self.assertIsInstance(dset, Dataset)
- self.assertEqual(dset.shape, (10, 3))
- def test_create_existing(self):
- """ require_dataset yields existing dataset """
- name = make_name()
- dset = self.f.require_dataset(name, (10, 3), 'f')
- dset[0, 0] = 123
- dset2 = self.f.require_dataset(name, (10, 3), 'f')
- self.assertEqual(dset, dset2)
- def test_create_1D_integer(self):
- """ require_dataset with integer shape yields existing dataset"""
- name = make_name()
- dset = self.f.require_dataset(name, 10, 'f')
- dset[0] = 123
- dset2 = self.f.require_dataset(name, 10, 'f')
- self.assertEqual(dset, dset2)
- def test_create_1D_tuple(self):
- name = make_name()
- dset = self.f.require_dataset(name, (10,), 'f')
- dset[0] = 123
- dset2 = self.f.require_dataset(name, 10, 'f')
- self.assertEqual(dset, dset2)
- def test_create_1D_binary(self):
- name = make_name()
- dset = self.f.require_dataset(name, 10, 'f')
- dset[0] = 123
- dset2 = self.f.require_dataset(name.encode('utf-8'), (10,), 'f')
- self.assertEqual(dset, dset2)
- def test_shape_conflict(self):
- """ require_dataset with shape conflict yields TypeError """
- name = make_name()
- self.f.create_dataset(name, (10, 3), 'f')
- with self.assertRaises(TypeError):
- self.f.require_dataset(name, (10, 4), 'f')
- def test_type_conflict(self):
- """ require_dataset with object type conflict yields TypeError """
- name = make_name()
- self.f.create_group(name)
- with self.assertRaises(TypeError):
- self.f.require_dataset(name, (10, 3), 'f')
- def test_dtype_conflict(self):
- """ require_dataset with dtype conflict (strict mode) yields TypeError
- """
- name = make_name()
- dset = self.f.create_dataset(name, (10, 3), 'f')
- with self.assertRaises(TypeError):
- self.f.require_dataset(name, (10, 3), 'S10')
- def test_dtype_exact(self):
- """ require_dataset with exactly dtype match """
- name = make_name()
- dset = self.f.create_dataset(name, (10, 3), 'f')
- dset[0, 0] = 123
- dset2 = self.f.require_dataset(name, (10, 3), 'f', exact=True)
- self.assertEqual(dset, dset2)
- def test_dtype_close(self):
- """ require_dataset with convertible type succeeds (non-strict mode)
- """
- name = make_name()
- dset = self.f.create_dataset(name, (10, 3), 'i4')
- # Set a value too large for i2 to test for spurious intermediate conversions
- dset[0, 0] = 98765
- dset2 = self.f.require_dataset(name, (10, 3), 'i2', exact=False)
- self.assertEqual(dset, dset2)
- self.assertEqual(dset2.dtype, np.dtype('i4'))
- class TestCreateChunked(BaseDataset):
- """
- Feature: Datasets can be created by manually specifying chunks
- """
- def test_create_chunks(self):
- """ Create via chunks tuple """
- dset = self.f.create_dataset(make_name(), shape=(100,), chunks=(10,), dtype='f4')
- self.assertEqual(dset.chunks, (10,))
- def test_create_chunks_integer(self):
- """ Create via chunks integer """
- dset = self.f.create_dataset(make_name(), shape=(100,), chunks=10, dtype='f4')
- self.assertEqual(dset.chunks, (10,))
- def test_chunks_mismatch(self):
- """ Illegal chunk size raises ValueError """
- with self.assertRaises(ValueError):
- self.f.create_dataset(make_name(), shape=(100,), chunks=(200,), dtype='f4')
- def test_chunks_false(self):
- """ Chunked format required for given storage options """
- with self.assertRaises(ValueError):
- self.f.create_dataset(make_name(), shape=(10,), maxshape=100, chunks=False, dtype='f4')
- def test_chunks_scalar(self):
- """ Attempting to create chunked scalar dataset raises TypeError """
- with self.assertRaises(TypeError):
- self.f.create_dataset(make_name(), shape=(), chunks=(50,), dtype='f4')
- def test_auto_chunks(self):
- """ Auto-chunking of datasets """
- dset = self.f.create_dataset(make_name(), shape=(20, 100), chunks=True, dtype='f4')
- self.assertIsInstance(dset.chunks, tuple)
- self.assertEqual(len(dset.chunks), 2)
- def test_auto_chunks_abuse(self):
- """ Auto-chunking with pathologically large element sizes """
- dset = self.f.create_dataset(make_name(), shape=(3,), dtype='S100000000', chunks=True)
- self.assertEqual(dset.chunks, (1,))
- def test_scalar_assignment(self):
- """ Test scalar assignment of chunked dataset """
- dset = self.f.create_dataset(make_name(), shape=(3, 50, 50),
- dtype=np.int32, chunks=(1, 50, 50))
- # test assignment of selection smaller than chunk size
- dset[1, :, 40] = 10
- self.assertTrue(np.all(dset[1, :, 40] == 10))
- # test assignment of selection equal to chunk size
- dset[1] = 11
- self.assertTrue(np.all(dset[1] == 11))
- # test assignment of selection bigger than chunk size
- dset[0:2] = 12
- self.assertTrue(np.all(dset[0:2] == 12))
- def test_auto_chunks_no_shape(self):
- """ Auto-chunking of empty datasets not allowed"""
- name = make_name()
- with pytest.raises(TypeError, match='Empty') as err:
- self.f.create_dataset(name, dtype='S100', chunks=True)
- with pytest.raises(TypeError, match='Empty') as err:
- self.f.create_dataset(name, dtype='S100', maxshape=20)
- class TestCreateFillvalue(BaseDataset):
- """
- Feature: Datasets can be created with fill value
- """
- def test_create_fillval(self):
- """ Fill value is reflected in dataset contents """
- dset = self.f.create_dataset(make_name(), (10,), fillvalue=4.0, dtype='f4')
- self.assertEqual(dset[0], 4.0)
- self.assertEqual(dset[7], 4.0)
- def test_property(self):
- """ Fill value is recoverable via property """
- dset = self.f.create_dataset(make_name(), (10,), fillvalue=3.0, dtype='f4')
- self.assertEqual(dset.fillvalue, 3.0)
- self.assertNotIsInstance(dset.fillvalue, np.ndarray)
- def test_property_none(self):
- """ .fillvalue property works correctly if not set """
- dset = self.f.create_dataset(make_name(), (10,), dtype='f4')
- self.assertEqual(dset.fillvalue, 0)
- def test_compound(self):
- """ Fill value works with compound types """
- dt = np.dtype([('a', 'f4'), ('b', 'i8')])
- v = np.ones((1,), dtype=dt)[0]
- dset = self.f.create_dataset(make_name(), (10,), dtype=dt, fillvalue=v)
- self.assertEqual(dset.fillvalue, v)
- self.assertAlmostEqual(dset[4], v)
- def test_exc(self):
- """ Bogus fill value raises ValueError """
- with self.assertRaises(ValueError):
- dset = self.f.create_dataset(make_name(), (10,),
- dtype=[('a', 'i'), ('b', 'f')], fillvalue=42)
- class TestFillTime(BaseDataset):
- """
- Feature: Datasets created with specified fill time property
- """
- def test_fill_time_default(self):
- """ Fill time default to IFSET """
- dset = self.f.create_dataset(make_name(), (10,), 'f4', fillvalue=4.0)
- plist = dset.id.get_create_plist()
- self.assertEqual(plist.get_fill_time(), h5py.h5d.FILL_TIME_IFSET)
- self.assertEqual(dset[0], 4.0)
- self.assertEqual(dset[7], 4.0)
- @ut.skipIf('gzip' not in h5py.filters.encode, "DEFLATE is not installed")
- def test_compressed_default(self):
- """ Fill time is IFSET for compressed dataset (chunked) """
- dset = self.f.create_dataset(make_name(), (10,), "f4", compression='gzip',
- fillvalue=4.0)
- plist = dset.id.get_create_plist()
- self.assertEqual(plist.get_fill_time(), h5py.h5d.FILL_TIME_IFSET)
- self.assertEqual(dset[0], 4.0)
- self.assertEqual(dset[7], 4.0)
- def test_fill_time_never(self):
- """ Fill time set to NEVER """
- dset = self.f.create_dataset(make_name(), (10,), "f4", fillvalue=4.0,
- fill_time='never')
- plist = dset.id.get_create_plist()
- self.assertEqual(plist.get_fill_time(), h5py.h5d.FILL_TIME_NEVER)
- # should not be equal to the explicitly set fillvalue
- self.assertNotEqual(dset[0], 4.0)
- self.assertNotEqual(dset[7], 4.0)
- def test_fill_time_alloc(self):
- """ Fill time explicitly set to ALLOC """
- dset = self.f.create_dataset(make_name(), (10,), "f4", fillvalue=4.0,
- fill_time='alloc')
- plist = dset.id.get_create_plist()
- self.assertEqual(plist.get_fill_time(), h5py.h5d.FILL_TIME_ALLOC)
- def test_fill_time_ifset(self):
- """ Fill time explicitly set to IFSET """
- dset = self.f.create_dataset(make_name(), (10,), 'f4', chunks=(2,), fillvalue=4.0,
- fill_time='ifset')
- plist = dset.id.get_create_plist()
- self.assertEqual(plist.get_fill_time(), h5py.h5d.FILL_TIME_IFSET)
- def test_invalid_fill_time(self):
- """ Choice of fill_time is 'alloc', 'never', 'ifset' """
- with self.assertRaises(ValueError):
- dset = self.f.create_dataset(make_name(), (10,), 'f4', fill_time='fill_bad')
- def test_non_str_fill_time(self):
- """ fill_time must be a string """
- with self.assertRaises(ValueError):
- dset = self.f.create_dataset(make_name(), (10,), 'f4', fill_time=2)
- def test_resize_chunk_fill_time_default(self):
- """ The resize dataset will be filled (by default fill value 0) """
- dset = self.f.create_dataset(make_name(), (50, ), 'f4', maxshape=(100, ),
- chunks=(5, ))
- plist = dset.id.get_create_plist()
- self.assertEqual(plist.get_fill_time(), h5py.h5d.FILL_TIME_IFSET)
- assert np.isclose(dset[:], 0.0).all()
- dset.resize((100, ))
- assert np.isclose(dset[:], 0.0).all()
- def test_resize_chunk_fill_time_never(self):
- """ The resize dataset won't be filled """
- dset = self.f.create_dataset(make_name(), (50, ), 'f4', maxshape=(100, ),
- fillvalue=4.0, fill_time='never',
- chunks=(5, ))
- plist = dset.id.get_create_plist()
- self.assertEqual(plist.get_fill_time(), h5py.h5d.FILL_TIME_NEVER)
- assert not np.isclose(dset[:], 4.0).any()
- dset.resize((100, ))
- assert not np.isclose(dset[:], 4.0).any()
- @pytest.mark.parametrize('dt,expected', [
- (int, 0),
- (np.int32, 0),
- (np.int64, 0),
- (float, 0.0),
- (np.float32, 0.0),
- (np.float64, 0.0),
- (h5py.string_dtype(encoding='utf-8', length=5), b''),
- (h5py.string_dtype(encoding='ascii', length=5), b''),
- (h5py.string_dtype(encoding='utf-8'), b''),
- (h5py.string_dtype(encoding='ascii'), b''),
- (h5py.string_dtype(), b''),
- ])
- def test_get_unset_fill_value(dt, expected, writable_file):
- dset = writable_file.create_dataset(make_name(), (10,), dtype=dt)
- assert dset.fillvalue == expected
- class TestCreateNamedType(BaseDataset):
- """
- Feature: Datasets created from an existing named type
- """
- def test_named(self):
- """ Named type object works and links the dataset to type """
- name = make_name("type")
- self.f[name] = np.dtype('f8')
- dt = self.f[name]
- dset = self.f.create_dataset(make_name("x"), (100,), dtype=dt)
- self.assertEqual(dset.dtype, np.dtype('f8'))
- self.assertEqual(dset.id.get_type(), dt.id)
- self.assertTrue(dset.id.get_type().committed())
- @ut.skipIf('gzip' not in h5py.filters.encode, "DEFLATE is not installed")
- class TestCreateGzip(BaseDataset):
- """
- Feature: Datasets created with gzip compression
- """
- def test_gzip(self):
- """ Create with explicit gzip options """
- dset = self.f.create_dataset(make_name(), (20, 30), 'f4', compression='gzip',
- compression_opts=9)
- self.assertEqual(dset.compression, 'gzip')
- self.assertEqual(dset.compression_opts, 9)
- def test_gzip_implicit(self):
- """ Create with implicit gzip level (level 4) """
- dset = self.f.create_dataset(make_name(), (20, 30), 'f4', compression='gzip')
- self.assertEqual(dset.compression, 'gzip')
- self.assertEqual(dset.compression_opts, 4)
- @pytest.mark.thread_unsafe(reason="monkey-patch")
- def test_gzip_number(self):
- """ Create with gzip level by specifying integer """
- name = make_name()
- dset = self.f.create_dataset(name, (20, 30), 'f4', compression=7)
- self.assertEqual(dset.compression, 'gzip')
- self.assertEqual(dset.compression_opts, 7)
- original_compression_vals = h5py._hl.dataset._LEGACY_GZIP_COMPRESSION_VALS
- try:
- h5py._hl.dataset._LEGACY_GZIP_COMPRESSION_VALS = tuple()
- with self.assertRaises(ValueError):
- dset = self.f.create_dataset(name, (20, 30), 'f4', compression=7)
- finally:
- h5py._hl.dataset._LEGACY_GZIP_COMPRESSION_VALS = original_compression_vals
- def test_gzip_exc(self):
- """ Illegal gzip level (explicit or implicit) raises ValueError """
- name = make_name()
- with self.assertRaises((ValueError, RuntimeError)):
- self.f.create_dataset(name, (20, 30), 'f4', compression=14)
- with self.assertRaises(ValueError):
- self.f.create_dataset(name, (20, 30), 'f4', compression=-4)
- with self.assertRaises(ValueError):
- self.f.create_dataset(name, (20, 30), 'f4', compression='gzip',
- compression_opts=14)
- @ut.skipIf('gzip' not in h5py.filters.encode, "DEFLATE is not installed")
- class TestCreateCompressionNumber(BaseDataset):
- """
- Feature: Datasets created with a compression code
- """
- @pytest.mark.thread_unsafe(reason="monkey-patch")
- def test_compression_number(self):
- """ Create with compression number of gzip (h5py.h5z.FILTER_DEFLATE) and a compression level of 7"""
- original_compression_vals = h5py._hl.dataset._LEGACY_GZIP_COMPRESSION_VALS
- try:
- h5py._hl.dataset._LEGACY_GZIP_COMPRESSION_VALS = tuple()
- dset = self.f.create_dataset('foo', (20, 30), compression=h5py.h5z.FILTER_DEFLATE, compression_opts=(7,), dtype='f4')
- finally:
- h5py._hl.dataset._LEGACY_GZIP_COMPRESSION_VALS = original_compression_vals
- self.assertEqual(dset.compression, 'gzip')
- self.assertEqual(dset.compression_opts, 7)
- @pytest.mark.thread_unsafe(reason="monkey-patch")
- def test_compression_number_invalid(self):
- """ Create with invalid compression numbers """
- with self.assertRaises(ValueError) as e:
- self.f.create_dataset('foo', (20, 30), compression=-999, dtype='f4')
- self.assertIn("Invalid filter", str(e.exception))
- with self.assertRaises(ValueError) as e:
- self.f.create_dataset('foo', (20, 30), compression=100, dtype='f4')
- self.assertIn("Unknown compression", str(e.exception))
- original_compression_vals = h5py._hl.dataset._LEGACY_GZIP_COMPRESSION_VALS
- try:
- h5py._hl.dataset._LEGACY_GZIP_COMPRESSION_VALS = tuple()
- # Using gzip compression requires a compression level specified in compression_opts
- with self.assertRaises(IndexError):
- self.f.create_dataset('foo', (20, 30), compression=h5py.h5z.FILTER_DEFLATE, dtype='f4')
- finally:
- h5py._hl.dataset._LEGACY_GZIP_COMPRESSION_VALS = original_compression_vals
- @ut.skipIf('lzf' not in h5py.filters.encode, "LZF is not installed")
- class TestCreateLZF(BaseDataset):
- """
- Feature: Datasets created with LZF compression
- """
- def test_lzf_sparse(self):
- """ Create with explicit lzf (empty chunks)"""
- dset = self.f.create_dataset(make_name(), (20, 30), compression='lzf', dtype='f4')
- self.assertEqual(dset.compression, 'lzf')
- self.assertEqual(dset.compression_opts, None)
- def test_lzf_dense(self):
- """ Create with explicit lzf (populated)"""
- name = make_name()
- testdata = np.arange(100)
- dset = self.f.create_dataset(name, data=testdata, compression='lzf')
- self.assertEqual(dset.compression, 'lzf')
- self.assertEqual(dset.compression_opts, None)
- self.f.flush() # Actually write to file
- readdata = self.f[name][()]
- self.assertArrayEqual(readdata, testdata)
- def test_lzf_exc(self):
- """ Giving lzf options raises ValueError """
- with self.assertRaises(ValueError):
- self.f.create_dataset(make_name(), (20, 30), 'f4', compression='lzf',
- compression_opts=4)
- @ut.skipIf('szip' not in h5py.filters.encode, "SZIP is not installed")
- class TestCreateSZIP(BaseDataset):
- """
- Feature: Datasets created with LZF compression
- """
- def test_szip(self):
- """ Create with explicit szip """
- dset = self.f.create_dataset(make_name(), (20, 30), 'f4', compression='szip',
- compression_opts=('ec', 16))
- @ut.skipIf('shuffle' not in h5py.filters.encode, "SHUFFLE is not installed")
- class TestCreateShuffle(BaseDataset):
- """
- Feature: Datasets can use shuffling filter
- """
- def test_shuffle(self):
- """ Enable shuffle filter """
- dset = self.f.create_dataset(make_name(), (20, 30), shuffle=True, dtype='f4')
- self.assertTrue(dset.shuffle)
- @ut.skipIf('fletcher32' not in h5py.filters.encode, "FLETCHER32 is not installed")
- class TestCreateFletcher32(BaseDataset):
- """
- Feature: Datasets can use the fletcher32 filter
- """
- def test_fletcher32(self):
- """ Enable fletcher32 filter """
- dset = self.f.create_dataset(make_name(), (20, 30), fletcher32=True, dtype='f4')
- self.assertTrue(dset.fletcher32)
- @ut.skipIf('scaleoffset' not in h5py.filters.encode, "SCALEOFFSET is not installed")
- class TestCreateScaleOffset(BaseDataset):
- """
- Feature: Datasets can use the scale/offset filter
- Note: loss of precision caused by scaleoffset only becomes visible
- when closing and reopening the File.
- Can't close/reopen the shared self.f in pytest-run-parallel.
- """
- def test_float_fails_without_options(self):
- """ Ensure that a scale factor is required for scaleoffset compression of floating point data """
- with self.assertRaises(ValueError):
- dset = self.f.create_dataset(make_name(), (20, 30), dtype=float, scaleoffset=True)
- def test_non_integer(self):
- """ Check when scaleoffset is negetive"""
- with self.assertRaises(ValueError):
- dset = self.f.create_dataset(make_name(), (20, 30), dtype=float, scaleoffset=-0.1)
- def test_unsupport_dtype(self):
- """ Check when dtype is unsupported type"""
- with self.assertRaises(TypeError):
- dset = self.f.create_dataset(make_name(), (20, 30), dtype=bool, scaleoffset=True)
- def test_float(self):
- """ Scaleoffset filter works for floating point data """
- scalefac = 4
- shape = (100, 300)
- range = 20 * 10 ** scalefac
- testdata = (np.random.rand(*shape) - 0.5) * range
- fname = self.mktemp()
- with h5py.File(fname, 'w') as f:
- dset = f.create_dataset(
- 'foo', shape, dtype=np.float64, scaleoffset=scalefac
- )
- # Dataset reports that scaleoffset is in use
- assert dset.scaleoffset is not None
- # Dataset round-trips
- dset[...] = testdata
- with h5py.File(fname, 'r') as f:
- readdata = f['foo'][...]
- # Test that data round-trips to requested precision
- self.assertArrayEqual(readdata, testdata, precision=10 ** (-scalefac))
- # Test that the filter is actually active (i.e. compression is lossy)
- assert not (readdata == testdata).all()
- def test_int(self):
- """ Scaleoffset filter works for integer data with default precision """
- nbits = 12
- shape = (100, 300)
- testdata = np.random.randint(0, 2 ** nbits - 1, size=shape, dtype=np.int64)
- fname = self.mktemp()
- with h5py.File(fname, 'w') as f:
- # Create dataset; note omission of nbits (for library-determined precision)
- dset = f.create_dataset('foo', shape, dtype=np.int64, scaleoffset=True)
- # Dataset reports scaleoffset enabled
- assert dset.scaleoffset is not None
- # Data round-trips correctly and identically
- dset[...] = testdata
- with h5py.File(fname, 'r') as f:
- readdata = f['foo'][...]
- self.assertArrayEqual(readdata, testdata)
- def test_int_with_minbits(self):
- """ Scaleoffset filter works for integer data with specified precision """
- nbits = 12
- shape = (100, 300)
- testdata = np.random.randint(0, 2 ** nbits, size=shape, dtype=np.int64)
- fname = self.mktemp()
- with h5py.File(fname, 'w') as f:
- dset = f.create_dataset('foo', shape, dtype=np.int64, scaleoffset=nbits)
- # Dataset reports scaleoffset enabled with correct precision
- self.assertTrue(dset.scaleoffset == 12)
- # Data round-trips correctly
- dset[...] = testdata
- with h5py.File(fname, 'r') as f:
- readdata = f['foo'][...]
- self.assertArrayEqual(readdata, testdata)
- def test_int_with_minbits_lossy(self):
- """ Scaleoffset filter works for integer data with specified precision """
- nbits = 12
- shape = (100, 300)
- testdata = np.random.randint(0, 2 ** (nbits + 1) - 1, size=shape, dtype=np.int64)
- fname = self.mktemp()
- with h5py.File(fname, 'w') as f:
- dset = f.create_dataset('foo', shape, dtype=np.int64, scaleoffset=nbits)
- # Dataset reports scaleoffset enabled with correct precision
- self.assertTrue(dset.scaleoffset == 12)
- # Data can be written and read
- dset[...] = testdata
- with h5py.File(fname, 'r') as f:
- readdata = f['foo'][...]
- # Compression is lossy
- assert not (readdata == testdata).all()
- class TestExternal(BaseDataset):
- """
- Feature: Datasets with the external storage property
- """
- def test_contents(self):
- """ Create and access an external dataset """
- shape = (6, 100)
- testdata = np.random.random(shape)
- # create a dataset in an external file and set it
- ext_file = self.mktemp()
- external = [(ext_file, 0, h5f.UNLIMITED)]
- # ${ORIGIN} should be replaced by the parent dir of the HDF5 file
- dset = self.f.create_dataset(make_name(), shape, dtype=testdata.dtype, external=external, efile_prefix="${ORIGIN}")
- dset[...] = testdata
- assert dset.external is not None
- # verify file's existence, size, and contents
- with open(ext_file, 'rb') as fid:
- contents = fid.read()
- assert contents == testdata.tobytes()
- efile_prefix = pathlib.Path(dset.id.get_access_plist().get_efile_prefix().decode()).as_posix()
- parent = pathlib.Path(self.f.filename).parent.as_posix()
- assert efile_prefix == parent
- def test_contents_efile_prefix(self):
- """ Create and access an external dataset using an efile_prefix"""
- name = make_name()
- shape = (6, 100)
- testdata = np.random.random(shape)
- # create a dataset in an external file and set it
- ext_file = self.mktemp()
- # set only the basename, let the efile_prefix do the rest
- external = [(os.path.basename(ext_file), 0, h5f.UNLIMITED)]
- dset = self.f.create_dataset(name, shape, dtype=testdata.dtype, external=external, efile_prefix=os.path.dirname(ext_file))
- dset[...] = testdata
- assert dset.external is not None
- # verify file's existence, size, and contents
- with open(ext_file, 'rb') as fid:
- contents = fid.read()
- assert contents == testdata.tobytes()
- # check efile_prefix
- efile_prefix = pathlib.Path(dset.id.get_access_plist().get_efile_prefix().decode()).as_posix()
- parent = pathlib.Path(ext_file).parent.as_posix()
- assert efile_prefix == parent
- dset2 = self.f.require_dataset(name, shape, testdata.dtype, efile_prefix=os.path.dirname(ext_file))
- assert dset2.external is not None
- dset2[()] == testdata
- def test_name_str(self):
- """ External argument may be a file name str only """
- self.f.create_dataset(make_name(), (6, 100), external=self.mktemp(), dtype='f4')
- def test_name_path(self):
- """ External argument may be a file name path only """
- self.f.create_dataset(make_name(), (6, 100), 'f4',
- external=pathlib.Path(self.mktemp()))
- def test_iter_multi(self):
- """ External argument may be an iterable of multiple tuples """
- ext_file = self.mktemp()
- N = 100
- external = iter((ext_file, x * 1000, 1000) for x in range(N))
- dset = self.f.create_dataset(make_name(), (6, 100), external=external, dtype='f4')
- assert len(dset.external) == N
- def test_invalid(self):
- """ Test with invalid external lists """
- shape = (6, 100)
- ext_file = self.mktemp()
- for exc_type, external in [
- (TypeError, [ext_file]),
- (TypeError, [ext_file, 0]),
- (TypeError, [ext_file, 0, h5f.UNLIMITED]),
- (ValueError, [(ext_file,)]),
- (ValueError, [(ext_file, 0)]),
- (ValueError, [(ext_file, 0, h5f.UNLIMITED, 0)]),
- (TypeError, [(ext_file, 0, "h5f.UNLIMITED")]),
- ]:
- with self.assertRaises(exc_type):
- self.f.create_dataset(make_name(), shape, external=external, dtype='f4')
- def test_create_expandable(self):
- """ Create expandable external dataset """
- ext_file = self.mktemp()
- shape = (128, 64)
- maxshape = (None, 64)
- exp_dset = self.f.create_dataset(
- make_name(), shape, 'f4', maxshape=maxshape, external=ext_file
- )
- assert exp_dset.chunks is None
- assert exp_dset.shape == shape
- assert exp_dset.maxshape == maxshape
- class TestAutoCreate(BaseDataset):
- """
- Feature: Datasets auto-created from data produce the correct types
- """
- def assert_string_type(self, ds, cset, variable=True):
- tid = ds.id.get_type()
- self.assertEqual(type(tid), h5py.h5t.TypeStringID)
- self.assertEqual(tid.get_cset(), cset)
- if variable:
- assert tid.is_variable_str()
- def test_vlen_bytes(self):
- """Assigning byte strings produces a vlen string ASCII dataset """
- x = make_name("x")
- y = make_name("y")
- z = make_name("z")
- self.f[x] = b"Hello there"
- self.assert_string_type(self.f[x], h5py.h5t.CSET_ASCII)
- self.f[y] = [b"a", b"bc"]
- self.assert_string_type(self.f[y], h5py.h5t.CSET_ASCII)
- self.f[z] = np.array([b"a", b"bc"], dtype=np.object_)
- self.assert_string_type(self.f[z], h5py.h5t.CSET_ASCII)
- def test_vlen_unicode(self):
- """Assigning unicode strings produces a vlen string UTF-8 dataset """
- x = make_name("x")
- y = make_name("y")
- z = make_name("z")
- self.f[x] = "Hello there" + chr(0x2034)
- self.assert_string_type(self.f[x], h5py.h5t.CSET_UTF8)
- self.f[y] = ["a", "bc"]
- self.assert_string_type(self.f[y], h5py.h5t.CSET_UTF8)
- # 2D array; this only works with an array, not nested lists
- self.f[z] = np.array([["a", "bc"]], dtype=np.object_)
- self.assert_string_type(self.f[z], h5py.h5t.CSET_UTF8)
- def test_string_fixed(self):
- """ Assignment of fixed-length byte string produces a fixed-length
- ascii dataset """
- name = make_name()
- self.f[name] = np.bytes_("Hello there")
- ds = self.f[name]
- self.assert_string_type(ds, h5py.h5t.CSET_ASCII, variable=False)
- self.assertEqual(ds.id.get_type().get_size(), 11)
- class TestCreateLike(BaseDataset):
- def test_no_chunks(self):
- x = make_name("x")
- y = make_name("y")
- self.f[x] = np.arange(25).reshape(5, 5)
- self.f.create_dataset_like(y, self.f[x])
- dslike = self.f[y]
- self.assertEqual(dslike.shape, (5, 5))
- self.assertIs(dslike.chunks, None)
- def test_track_times(self):
- x = make_name("x")
- y = make_name("y")
- z = make_name("z")
- w = make_name("w")
- orig = self.f.create_dataset(x, data=np.arange(12),
- track_times=True)
- self.assertNotEqual(0, h5py.h5g.get_objinfo(orig._id).mtime)
- similar = self.f.create_dataset_like(y, orig)
- self.assertNotEqual(0, h5py.h5g.get_objinfo(similar._id).mtime)
- orig = self.f.create_dataset(z, data=np.arange(12),
- track_times=False)
- self.assertEqual(0, h5py.h5g.get_objinfo(orig._id).mtime)
- similar = self.f.create_dataset_like(w, orig)
- self.assertEqual(0, h5py.h5g.get_objinfo(similar._id).mtime)
- def test_maxshape(self):
- """ Test when other.maxshape != other.shape """
- other = self.f.create_dataset(make_name("x"), (10,), maxshape=20, dtype='f4')
- similar = self.f.create_dataset_like(make_name("y"), other)
- self.assertEqual(similar.shape, (10,))
- self.assertEqual(similar.maxshape, (20,))
- class TestChunkIterator(BaseDataset):
- def test_no_chunks(self):
- dset = self.f.create_dataset(make_name(), (), dtype='f4')
- with self.assertRaises(TypeError):
- dset.iter_chunks()
- def test_rank_mismatch(self):
- dset = self.f.create_dataset(make_name(), shape=(100,), chunks=(32,), dtype='f4')
- with self.assertRaises(ValueError):
- dset.iter_chunks((slice(19,67), 9))
- def test_1d(self):
- dset = self.f.create_dataset(make_name(), (100,), chunks=(32,), dtype='f4')
- expected = ((slice(0,32,1),), (slice(32,64,1),), (slice(64,96,1),),
- (slice(96,100,1),))
- self.assertEqual(list(dset.iter_chunks()), list(expected))
- expected = ((slice(50,64,1),), (slice(64,96,1),), (slice(96,97,1),))
- self.assertEqual(list(dset.iter_chunks(np.s_[50:97])), list(expected))
- expected = ((slice(0,32,1),), (slice(32,50,1),))
- self.assertEqual(list(dset.iter_chunks(np.s_[:50])), list(expected))
- expected = ((slice(96,97,1),),)
- self.assertEqual(list(dset.iter_chunks(np.s_[96])), list(expected))
- def test_2d(self):
- dset = self.f.create_dataset(make_name(), (100,100), chunks=(32,64), dtype='f4')
- expected = (
- (slice(0, 32, 1), slice(0, 64, 1)),
- (slice(0, 32, 1), slice(64, 100, 1)),
- (slice(32, 64, 1), slice(0, 64, 1)),
- (slice(32, 64, 1), slice(64, 100, 1)),
- (slice(64, 96, 1), slice(0, 64, 1)),
- (slice(64, 96, 1), slice(64, 100, 1)),
- (slice(96, 100, 1), slice(0, 64, 1)),
- (slice(96, 100, 1), slice(64, 100, 1)),
- )
- self.assertEqual(list(dset.iter_chunks()), list(expected))
- expected = ((slice(48, 52, 1), slice(40, 50, 1)),)
- self.assertEqual(list(dset.iter_chunks(np.s_[48:52,40:50])), list(expected))
- expected = (
- (slice(0, 32, 1), slice(40, 64, 1)),
- (slice(0, 32, 1), slice(64, 100, 1)),
- (slice(32, 52, 1), slice(40, 64, 1)),
- (slice(32, 52, 1), slice(64, 100, 1)),
- )
- self.assertEqual(list(dset.iter_chunks(np.s_[:52,40:])), list(expected))
- expected = (
- (slice(96, 97, 1), slice(19, 64, 1)),
- (slice(96, 97, 1), slice(64, 67, 1)),
- )
- self.assertEqual(list(dset.iter_chunks(np.s_[96,19:67])), list(expected))
- def test_2d_partial_slice(self):
- dset = self.f.create_dataset(make_name(), (5,5), chunks=(2,2), dtype='f4')
- expected = ((slice(3, 4, 1), slice(3, 4, 1)),
- (slice(3, 4, 1), slice(4, 5, 1)),
- (slice(4, 5, 1), slice(3, 4, 1)),
- (slice(4, 5, 1), slice(4, 5, 1)))
- sel = slice(3,5)
- self.assertEqual(list(dset.iter_chunks((sel, sel))), list(expected))
- class TestResize(BaseDataset):
- """
- Feature: Datasets created with "maxshape" may be resized
- """
- def test_create(self):
- """ Create dataset with "maxshape" """
- dset = self.f.create_dataset(make_name(), (20, 30), maxshape=(20, 60), dtype='f4')
- self.assertIsNot(dset.chunks, None)
- self.assertEqual(dset.maxshape, (20, 60))
- def test_create_1D_integer_maxshape_tuple(self):
- """ Create dataset with "maxshape" using tuple shape and integer maxshape"""
- dset = self.f.create_dataset(make_name(), (20,), maxshape=20, dtype='f4')
- self.assertIsNot(dset.chunks, None)
- self.assertEqual(dset.maxshape, (20,))
- def test_create_1D_integer_maxshape_integer(self):
- """ Create dataset with "maxshape" using integer shape and integer maxshape"""
- dset = self.f.create_dataset(make_name(), 20, maxshape=20, dtype='f4')
- self.assertEqual(dset.maxshape, (20,))
- def test_resize(self):
- """ Datasets may be resized up to maxshape """
- dset = self.f.create_dataset(make_name(), (20, 30), maxshape=(20, 60), dtype='f4')
- self.assertEqual(dset.shape, (20, 30))
- dset.resize((20, 50))
- self.assertEqual(dset.shape, (20, 50))
- dset.resize((20, 60))
- self.assertEqual(dset.shape, (20, 60))
- def test_resize_1D(self):
- """ Datasets may be resized up to maxshape using integer maxshape"""
- dset = self.f.create_dataset(make_name(), 20, maxshape=40, dtype='f4')
- self.assertEqual(dset.shape, (20,))
- dset.resize((30,))
- self.assertEqual(dset.shape, (30,))
- def test_resize_over(self):
- """ Resizing past maxshape triggers an exception """
- dset = self.f.create_dataset(make_name(), (20, 30), maxshape=(20, 60), dtype='f4')
- with self.assertRaises(Exception):
- dset.resize((20, 70))
- def test_resize_nonchunked(self):
- """ Resizing non-chunked dataset raises TypeError """
- dset = self.f.create_dataset(make_name(), (20, 30), dtype='f4')
- with self.assertRaises(TypeError):
- dset.resize((20, 60))
- def test_resize_axis(self):
- """ Resize specified axis """
- dset = self.f.create_dataset(make_name(), (20, 30), maxshape=(20, 60), dtype='f4')
- dset.resize(50, axis=1)
- self.assertEqual(dset.shape, (20, 50))
- def test_axis_exc(self):
- """ Illegal axis raises ValueError """
- dset = self.f.create_dataset(make_name(), (20, 30), maxshape=(20, 60), dtype='f4')
- with self.assertRaises(ValueError):
- dset.resize(50, axis=2)
- def test_zero_dim(self):
- """ Allow zero-length initial dims for unlimited axes (issue 111) """
- dset = self.f.create_dataset(make_name(), (15, 0), maxshape=(15, None), dtype='f4')
- self.assertEqual(dset.shape, (15, 0))
- self.assertEqual(dset.maxshape, (15, None))
- class TestDtype(BaseDataset):
- """
- Feature: Dataset dtype is available as .dtype property
- """
- def test_dtype(self):
- """ Retrieve dtype from dataset """
- dset = self.f.create_dataset(make_name(), (5,), '|S10')
- self.assertEqual(dset.dtype, np.dtype('|S10'))
- def test_dtype_complex32(self):
- """ Retrieve dtype from complex float16 dataset (gh-2156) """
- # No native support in numpy as of v1.23.3, so expect compound type.
- complex32 = np.dtype([('r', np.float16), ('i', np.float16)])
- dset = self.f.create_dataset(make_name(), (5,), complex32)
- self.assertEqual(dset.dtype, complex32)
- class TestLen(BaseDataset):
- """
- Feature: Size of first axis is available via Python's len
- """
- def test_len(self):
- """ Python len() (under 32 bits) """
- dset = self.f.create_dataset(make_name(), (312, 15), dtype='f4')
- self.assertEqual(len(dset), 312)
- def test_len_big(self):
- """ Python len() vs Dataset.len() """
- dset = self.f.create_dataset(make_name(), (2 ** 33, 15), dtype='f4')
- self.assertEqual(dset.shape, (2 ** 33, 15))
- if sys.maxsize == 2 ** 31 - 1:
- with self.assertRaises(OverflowError):
- len(dset)
- else:
- self.assertEqual(len(dset), 2 ** 33)
- self.assertEqual(dset.len(), 2 ** 33)
- class TestIter(BaseDataset):
- """
- Feature: Iterating over a dataset yields rows
- """
- def test_iter(self):
- """ Iterating over a dataset yields rows """
- data = np.arange(30, dtype='f').reshape((10, 3))
- dset = self.f.create_dataset(make_name(), data=data)
- for x, y in zip(dset, data, strict=True):
- self.assertEqual(len(x), 3)
- self.assertArrayEqual(x, y)
- def test_iter_scalar(self):
- """ Iterating over scalar dataset raises TypeError """
- dset = self.f.create_dataset(make_name(), shape=(), dtype='f4')
- with self.assertRaises(TypeError):
- [x for x in dset]
- class TestStrings(BaseDataset):
- """
- Feature: Datasets created with vlen and fixed datatypes correctly
- translate to and from HDF5
- """
- def test_vlen_bytes(self):
- """ Vlen bytes dataset maps to vlen ascii in the file """
- dt = h5py.string_dtype(encoding='ascii')
- ds = self.f.create_dataset(make_name(), (100,), dtype=dt)
- tid = ds.id.get_type()
- self.assertEqual(type(tid), h5py.h5t.TypeStringID)
- self.assertEqual(tid.get_cset(), h5py.h5t.CSET_ASCII)
- string_info = h5py.check_string_dtype(ds.dtype)
- self.assertEqual(string_info.encoding, 'ascii')
- def test_vlen_bytes_fillvalue(self):
- """ Vlen bytes dataset handles fillvalue """
- dt = h5py.string_dtype(encoding='ascii')
- fill_value = b'bar'
- ds = self.f.create_dataset(make_name(), (100,), dtype=dt, fillvalue=fill_value)
- self.assertEqual(ds[0], fill_value)
- self.assertEqual(ds.asstr()[0], fill_value.decode())
- self.assertEqual(ds.fillvalue, fill_value)
- def test_vlen_unicode(self):
- """ Vlen unicode dataset maps to vlen utf-8 in the file """
- dt = h5py.string_dtype()
- ds = self.f.create_dataset(make_name(), (100,), dtype=dt)
- tid = ds.id.get_type()
- self.assertEqual(type(tid), h5py.h5t.TypeStringID)
- self.assertEqual(tid.get_cset(), h5py.h5t.CSET_UTF8)
- string_info = h5py.check_string_dtype(ds.dtype)
- self.assertEqual(string_info.encoding, 'utf-8')
- def test_vlen_unicode_fillvalue(self):
- """ Vlen unicode dataset handles fillvalue """
- dt = h5py.string_dtype()
- fill_value = 'bár'
- ds = self.f.create_dataset(make_name(), (100,), dtype=dt, fillvalue=fill_value)
- self.assertEqual(ds[0], fill_value.encode("utf-8"))
- self.assertEqual(ds.asstr()[0], fill_value)
- self.assertEqual(ds.fillvalue, fill_value.encode("utf-8"))
- def test_fixed_ascii(self):
- """ Fixed-length bytes dataset maps to fixed-length ascii in the file
- """
- dt = np.dtype("|S10")
- ds = self.f.create_dataset(make_name(), (100,), dtype=dt)
- tid = ds.id.get_type()
- self.assertEqual(type(tid), h5py.h5t.TypeStringID)
- self.assertFalse(tid.is_variable_str())
- self.assertEqual(tid.get_size(), 10)
- self.assertEqual(tid.get_cset(), h5py.h5t.CSET_ASCII)
- string_info = h5py.check_string_dtype(ds.dtype)
- self.assertEqual(string_info.encoding, 'ascii')
- self.assertEqual(string_info.length, 10)
- def test_fixed_bytes_fillvalue(self):
- """ Vlen bytes dataset handles fillvalue """
- dt = h5py.string_dtype(encoding='ascii', length=10)
- fill_value = b'bar'
- ds = self.f.create_dataset(make_name(), (100,), dtype=dt, fillvalue=fill_value)
- self.assertEqual(ds[0], fill_value)
- self.assertEqual(ds.asstr()[0], fill_value.decode())
- self.assertEqual(ds.fillvalue, fill_value)
- def test_fixed_utf8(self):
- dt = h5py.string_dtype(encoding='utf-8', length=5)
- ds = self.f.create_dataset(make_name(), (100,), dtype=dt)
- tid = ds.id.get_type()
- self.assertEqual(tid.get_cset(), h5py.h5t.CSET_UTF8)
- s = 'cù'
- ds[0] = s.encode('utf-8')
- ds[1] = s
- ds[2:4] = [s, s]
- ds[4:6] = np.array([s, s], dtype=object)
- ds[6:8] = np.array([s.encode('utf-8')] * 2, dtype=dt)
- with self.assertRaises(TypeError):
- ds[8:10] = np.array([s, s], dtype='U')
- np.testing.assert_array_equal(ds[:8], np.array([s.encode('utf-8')] * 8, dtype='S'))
- def test_fixed_utf_8_fillvalue(self):
- """ Vlen unicode dataset handles fillvalue """
- dt = h5py.string_dtype(encoding='utf-8', length=10)
- fill_value = 'bár'.encode("utf-8")
- ds = self.f.create_dataset(make_name(), (100,), dtype=dt, fillvalue=fill_value)
- self.assertEqual(ds[0], fill_value)
- self.assertEqual(ds.asstr()[0], fill_value.decode("utf-8"))
- self.assertEqual(ds.fillvalue, fill_value)
- def test_fixed_unicode(self):
- """ Fixed-length unicode datasets are unsupported (raise TypeError) """
- dt = np.dtype("|U10")
- with self.assertRaises(TypeError):
- ds = self.f.create_dataset(make_name(), (100,), dtype=dt)
- def test_roundtrip_vlen_bytes(self):
- """ writing and reading to vlen bytes dataset preserves type and content
- """
- dt = h5py.string_dtype(encoding='ascii')
- ds = self.f.create_dataset(make_name(), (100,), dtype=dt)
- data = b"Hello\xef"
- ds[0] = data
- out = ds[0]
- self.assertEqual(type(out), bytes)
- self.assertEqual(out, data)
- def test_roundtrip_fixed_bytes(self):
- """ Writing to and reading from fixed-length bytes dataset preserves
- type and content """
- dt = np.dtype("|S10")
- ds = self.f.create_dataset(make_name(), (100,), dtype=dt)
- data = b"Hello\xef"
- ds[0] = data
- out = ds[0]
- self.assertEqual(type(out), np.bytes_)
- self.assertEqual(out, data)
- def test_retrieve_vlen_unicode(self):
- dt = h5py.string_dtype()
- ds = self.f.create_dataset(make_name(), (10,), dtype=dt)
- data = "fàilte"
- ds[0] = data
- self.assertIsInstance(ds[0], bytes)
- out = ds.asstr()[0]
- self.assertIsInstance(out, str)
- self.assertEqual(out, data)
- def test_asstr(self):
- ds = self.f.create_dataset(make_name(), (10,), dtype=h5py.string_dtype())
- data = "fàilte"
- ds[0] = data
- strwrap1 = ds.asstr('ascii')
- with self.assertRaises(UnicodeDecodeError):
- strwrap1[0]
- # Different errors parameter
- self.assertEqual(ds.asstr('ascii', 'ignore')[0], 'filte')
- # latin-1 will decode it but give the wrong text
- self.assertNotEqual(ds.asstr('latin-1')[0], data)
- # len of ds
- self.assertEqual(10, len(ds.asstr()))
- # Array output
- np.testing.assert_array_equal(
- ds.asstr()[:1], np.array([data], dtype=object)
- )
- np.testing.assert_array_equal(
- np.asarray(ds.asstr())[:1], np.array([data], dtype=object)
- )
- def test_asstr_fixed(self):
- dt = h5py.string_dtype(length=5)
- ds = self.f.create_dataset(make_name(), (10,), dtype=dt)
- data = 'cù'
- ds[0] = np.array(data.encode('utf-8'), dtype=dt)
- self.assertIsInstance(ds[0], np.bytes_)
- out = ds.asstr()[0]
- self.assertIsInstance(out, str)
- self.assertEqual(out, data)
- # Different errors parameter
- self.assertEqual(ds.asstr('ascii', 'ignore')[0], 'c')
- # latin-1 will decode it but give the wrong text
- self.assertNotEqual(ds.asstr('latin-1')[0], data)
- # Array output
- np.testing.assert_array_equal(
- ds.asstr()[:1], np.array([data], dtype=object)
- )
- def test_unicode_write_error(self):
- """Encoding error when writing a non-ASCII string to an ASCII vlen dataset"""
- dt = h5py.string_dtype('ascii')
- ds = self.f.create_dataset(make_name(), (100,), dtype=dt)
- data = "fàilte"
- with self.assertRaises(UnicodeEncodeError):
- ds[0] = data
- def test_unicode_write_bytes(self):
- """ Writing valid utf-8 byte strings to a unicode vlen dataset is OK
- """
- dt = h5py.string_dtype()
- ds = self.f.create_dataset(make_name(), (100,), dtype=dt)
- data = (u"Hello there" + chr(0x2034)).encode('utf8')
- ds[0] = data
- out = ds[0]
- self.assertEqual(type(out), bytes)
- self.assertEqual(out, data)
- def test_vlen_bytes_write_ascii_str(self):
- """ Writing an ascii str to ascii vlen dataset is OK
- """
- dt = h5py.string_dtype('ascii')
- ds = self.f.create_dataset(make_name(), (100,), dtype=dt)
- data = "ASCII string"
- ds[0] = data
- out = ds[0]
- self.assertEqual(type(out), bytes)
- self.assertEqual(out, data.encode('ascii'))
- class TestCompound(BaseDataset):
- """
- Feature: Compound types correctly round-trip
- """
- def test_rt(self):
- """ Compound types are read back in correct order (issue 236)"""
- dt = np.dtype([ ('weight', np.float64),
- ('cputime', np.float64),
- ('walltime', np.float64),
- ('parents_offset', np.uint32),
- ('n_parents', np.uint32),
- ('status', np.uint8),
- ('endpoint_type', np.uint8), ])
- testdata = np.ndarray((16,), dtype=dt)
- for key in dt.fields:
- testdata[key] = np.random.random((16,)) * 100
- name = make_name()
- self.f[name] = testdata
- outdata = self.f[name][...]
- self.assertTrue(np.all(outdata == testdata))
- self.assertEqual(outdata.dtype, testdata.dtype)
- def test_assign(self):
- dt = np.dtype([ ('weight', (np.float64, 3)),
- ('endpoint_type', np.uint8), ])
- testdata = np.ndarray((16,), dtype=dt)
- for key in dt.fields:
- testdata[key] = np.random.random(size=testdata[key].shape) * 100
- name = make_name()
- ds = self.f.create_dataset(name, (16,), dtype=dt)
- for key in dt.fields:
- ds[key] = testdata[key]
- outdata = self.f[name][...]
- self.assertTrue(np.all(outdata == testdata))
- self.assertEqual(outdata.dtype, testdata.dtype)
- def test_fields(self):
- dt = np.dtype([
- ('x', np.float64),
- ('y', np.float64),
- ('z', np.float64),
- ])
- testdata = np.ndarray((16,), dtype=dt)
- for key in dt.fields:
- testdata[key] = np.random.random((16,)) * 100
- name = make_name()
- self.f[name] = testdata
- ds = self.f[name]
- # Extract multiple fields
- np.testing.assert_array_equal(
- ds.fields(['x', 'y'])[:], testdata[['x', 'y']]
- )
- # Extract single field
- np.testing.assert_array_equal(
- ds.fields('x')[:], testdata['x']
- )
- # Check __array__() method of fields wrapper
- np.testing.assert_array_equal(
- np.asarray(ds.fields(['x', 'y'])), testdata[['x', 'y']]
- )
- # Check type conversion of __array__() method
- dt_int = np.dtype([('x', np.int32)])
- np.testing.assert_array_equal(
- np.asarray(ds.fields(['x']), dtype=dt_int),
- testdata[['x']].astype(dt_int)
- )
- # Check len() on fields wrapper
- assert len(ds.fields('x')) == 16
- def test_nested_compound_vlen(self):
- dt_inner = np.dtype([('a', h5py.vlen_dtype(np.int32)),
- ('b', h5py.vlen_dtype(np.int32))])
- dt = np.dtype([('f1', h5py.vlen_dtype(dt_inner)),
- ('f2', np.int64)])
- inner1 = (np.array(range(1, 3), dtype=np.int32),
- np.array(range(6, 9), dtype=np.int32))
- inner2 = (np.array(range(10, 14), dtype=np.int32),
- np.array(range(16, 21), dtype=np.int32))
- data = np.array([(np.array([inner1, inner2], dtype=dt_inner), 2),
- (np.array([inner1], dtype=dt_inner), 3)],
- dtype=dt)
- name = make_name()
- self.f[name] = data
- out = self.f[name]
- # Specifying check_alignment=False because vlen fields have 8 bytes of padding
- # because the vlen datatype in hdf5 occupies 16 bytes
- self.assertArrayEqual(out, data, check_alignment=False)
- class TestSubarray(BaseDataset):
- def test_write_list(self):
- ds = self.f.create_dataset(make_name(), (1,), dtype="3int8")
- ds[0] = [1, 2, 3]
- np.testing.assert_array_equal(ds[:], [[1, 2, 3]])
- ds[:] = [[4, 5, 6]]
- np.testing.assert_array_equal(ds[:], [[4, 5, 6]])
- def test_write_array(self):
- ds = self.f.create_dataset(make_name(), (1,), dtype="3int8")
- ds[0] = np.array([1, 2, 3])
- np.testing.assert_array_equal(ds[:], [[1, 2, 3]])
- ds[:] = np.array([[4, 5, 6]])
- np.testing.assert_array_equal(ds[:], [[4, 5, 6]])
- class TestEnum(BaseDataset):
- """
- Feature: Enum datatype info is preserved, read/write as integer
- """
- EDICT = {'RED': 0, 'GREEN': 1, 'BLUE': 42}
- def test_create(self):
- """ Enum datasets can be created and type correctly round-trips """
- dt = h5py.enum_dtype(self.EDICT, basetype='i')
- ds = self.f.create_dataset(make_name(), (100, 100), dtype=dt)
- dt2 = ds.dtype
- dict2 = h5py.check_enum_dtype(dt2)
- self.assertEqual(dict2, self.EDICT)
- def test_readwrite(self):
- """ Enum datasets can be read/written as integers """
- dt = h5py.enum_dtype(self.EDICT, basetype='i4')
- ds = self.f.create_dataset(make_name(), (100, 100), dtype=dt)
- ds[35, 37] = 42
- ds[1, :] = 1
- self.assertEqual(ds[35, 37], 42)
- self.assertArrayEqual(ds[1, :], np.array((1,) * 100, dtype='i4'))
- class TestFloats(BaseDataset):
- """
- Test support for mini and extended-precision floats
- """
- def _exectest(self, dt):
- dset = self.f.create_dataset(make_name(), (100,), dtype=dt)
- self.assertEqual(dset.dtype, dt)
- data = np.ones((100,), dtype=dt)
- dset[...] = data
- self.assertArrayEqual(dset[...], data)
- @ut.skipUnless(hasattr(np, 'float16'), "NumPy float16 support required")
- def test_mini(self):
- """ Mini-floats round trip """
- self._exectest(np.dtype('float16'))
- # TODO: move these tests to test_h5t
- def test_mini_mapping(self):
- """ Test mapping for float16 """
- if hasattr(np, 'float16'):
- self.assertEqual(h5t.IEEE_F16LE.dtype, np.dtype('<f2'))
- else:
- self.assertEqual(h5t.IEEE_F16LE.dtype, np.dtype('<f4'))
- class TestTrackTimes(BaseDataset):
- """
- Feature: track_times
- """
- def test_disable_track_times(self):
- """ check that when track_times=False, the time stamp=0 (Jan 1, 1970) """
- ds = self.f.create_dataset(make_name(), (4,), track_times=False, dtype='f4')
- ds_mtime = h5py.h5g.get_objinfo(ds._id).mtime
- self.assertEqual(0, ds_mtime)
- def test_invalid_track_times(self):
- """ check that when give track_times an invalid value """
- with self.assertRaises(TypeError):
- self.f.create_dataset(make_name(), (4,), track_times='null', dtype='f4')
- class TestZeroShape(BaseDataset):
- """
- Features of datasets with (0,)-shape axes
- """
- def test_array_conversion(self):
- """ Empty datasets can be converted to NumPy arrays """
- ds = self.f.create_dataset(make_name("x"), 0, maxshape=None, dtype='f4')
- self.assertEqual(ds.shape, np.array(ds).shape)
- ds = self.f.create_dataset(make_name("y"), (0,), maxshape=(None,), dtype='f4')
- self.assertEqual(ds.shape, np.array(ds).shape)
- ds = self.f.create_dataset(make_name("z"), (0, 0), maxshape=(None, None), dtype='f4')
- self.assertEqual(ds.shape, np.array(ds).shape)
- def test_reading(self):
- """ Slicing into empty datasets works correctly """
- dt = [('a', 'f'), ('b', 'i')]
- ds = self.f.create_dataset(make_name(), (0,), dtype=dt, maxshape=(None,))
- arr = np.empty((0,), dtype=dt)
- self.assertEqual(ds[...].shape, arr.shape)
- self.assertEqual(ds[...].dtype, arr.dtype)
- self.assertEqual(ds[()].shape, arr.shape)
- self.assertEqual(ds[()].dtype, arr.dtype)
- class TestRegionRefs(BaseDataset):
- """
- Various features of region references
- """
- def setUp(self):
- BaseDataset.setUp(self)
- self.data = np.arange(100 * 100).reshape((100, 100))
- self.dset = self.f.create_dataset('x', data=self.data)
- self.dset[...] = self.data
- def test_create_ref(self):
- """ Region references can be used as slicing arguments """
- slic = np.s_[25:35, 10:100:5]
- ref = self.dset.regionref[slic]
- self.assertArrayEqual(self.dset[ref], self.data[slic])
- def test_empty_region(self):
- ref = self.dset.regionref[:0]
- out = self.dset[ref]
- assert out.size == 0
- # Ideally we should preserve shape (0, 100), but it seems this is lost.
- def test_scalar_dataset(self):
- ds = self.f.create_dataset(make_name(), data=1.0, dtype='f4')
- sid = h5py.h5s.create(h5py.h5s.SCALAR)
- # Deselected
- sid.select_none()
- ref = h5py.h5r.create(ds.id, b'.', h5py.h5r.DATASET_REGION, sid)
- assert ds[ref] == h5py.Empty(np.dtype('f4'))
- # Selected
- sid.select_all()
- ref = h5py.h5r.create(ds.id, b'.', h5py.h5r.DATASET_REGION, sid)
- assert ds[ref] == ds[()]
- def test_ref_shape(self):
- """ Region reference shape and selection shape """
- slic = np.s_[25:35, 10:100:5]
- ref = self.dset.regionref[slic]
- self.assertEqual(self.dset.regionref.shape(ref), self.dset.shape)
- self.assertEqual(self.dset.regionref.selection(ref), (10, 18))
- class TestAstype(BaseDataset):
- """.astype() wrapper & context manager
- """
- def test_astype_wrapper(self):
- dset = self.f.create_dataset(make_name(), (100,), dtype='i2')
- dset[...] = np.arange(100)
- arr = dset.astype('f4')[:]
- self.assertArrayEqual(arr, np.arange(100, dtype='f4'))
- def test_astype_wrapper_len(self):
- dset = self.f.create_dataset(make_name(), (100,), dtype='i2')
- dset[...] = np.arange(100)
- self.assertEqual(100, len(dset.astype('f4')))
- def test_astype_wrapper_asarray(self):
- dset = self.f.create_dataset(make_name(), (100,), dtype='i2')
- dset[...] = np.arange(100)
- arr = np.asarray(dset.astype('f4'), dtype='i2')
- self.assertArrayEqual(arr, np.arange(100, dtype='i2'))
- class TestScalarCompound(BaseDataset):
- """
- Retrieval of a single field from a scalar compound dataset should
- strip the field info
- """
- def test_scalar_compound(self):
- dt = np.dtype([('a', 'i')])
- dset = self.f.create_dataset(make_name(), (), dtype=dt)
- self.assertEqual(dset['a'].dtype, np.dtype('i'))
- class TestVlen(BaseDataset):
- def test_int(self):
- dt = h5py.vlen_dtype(int)
- ds = self.f.create_dataset(make_name(), (4,), dtype=dt)
- ds[0] = np.arange(3)
- ds[1] = np.arange(0)
- ds[2] = [1, 2, 3]
- ds[3] = np.arange(1)
- self.assertArrayEqual(ds[0], np.arange(3))
- self.assertArrayEqual(ds[1], np.arange(0))
- self.assertArrayEqual(ds[2], np.array([1, 2, 3]))
- self.assertArrayEqual(ds[1], np.arange(0))
- ds[0:2] = np.array([np.arange(5), np.arange(4)], dtype=object)
- self.assertArrayEqual(ds[0], np.arange(5))
- self.assertArrayEqual(ds[1], np.arange(4))
- ds[0:2] = np.array([np.arange(3), np.arange(3)])
- self.assertArrayEqual(ds[0], np.arange(3))
- self.assertArrayEqual(ds[1], np.arange(3))
- def test_reuse_from_other(self):
- dt = h5py.vlen_dtype(int)
- ds = self.f.create_dataset(make_name("x"), (1,), dtype=dt)
- self.f.create_dataset(make_name("y"), (1,), ds[()].dtype)
- def test_reuse_struct_from_other(self):
- dt = [('a', int), ('b', h5py.vlen_dtype(int))]
- fname = self.mktemp()
- with h5py.File(fname, 'w') as f:
- f.create_dataset("x", (1,), dtype=dt)
- with h5py.File(fname, 'a') as f:
- f.create_dataset("y", (1,), f["x"]['b'][()].dtype)
- def test_convert(self):
- dt = h5py.vlen_dtype(int)
- ds = self.f.create_dataset(make_name(), (3,), dtype=dt)
- ds[0] = np.array([1.4, 1.2])
- ds[1] = np.array([1.2])
- ds[2] = [1.2, 2, 3]
- self.assertArrayEqual(ds[0], np.array([1, 1]))
- self.assertArrayEqual(ds[1], np.array([1]))
- self.assertArrayEqual(ds[2], np.array([1, 2, 3]))
- ds[0:2] = np.array([[0.1, 1.1, 2.1, 3.1, 4], np.arange(4)], dtype=object)
- self.assertArrayEqual(ds[0], np.arange(5))
- self.assertArrayEqual(ds[1], np.arange(4))
- ds[0:2] = np.array([np.array([0.1, 1.2, 2.2]),
- np.array([0.2, 1.2, 2.2])])
- self.assertArrayEqual(ds[0], np.arange(3))
- self.assertArrayEqual(ds[1], np.arange(3))
- def test_multidim(self):
- dt = h5py.vlen_dtype(int)
- ds = self.f.create_dataset(make_name(), (2, 2), dtype=dt)
- ds[0, 0] = np.arange(1)
- ds[:, :] = np.array([[np.arange(3), np.arange(2)],
- [np.arange(1), np.arange(2)]], dtype=object)
- ds[:, :] = np.array([[np.arange(2), np.arange(2)],
- [np.arange(2), np.arange(2)]])
- def _help_float_testing(self, np_dt):
- """
- Helper for testing various vlen numpy data types.
- :param np_dt: Numpy datatype to test
- """
- dt = h5py.vlen_dtype(np_dt)
- ds = self.f.create_dataset(make_name(), (5,), dtype=dt)
- # Create some arrays, and assign them to the dataset
- array_0 = np.array([1., 2., 30.], dtype=np_dt)
- array_1 = np.array([100.3, 200.4, 98.1, -10.5, -300.0], dtype=np_dt)
- # Test that a numpy array of different type gets cast correctly
- array_2 = np.array([1, 2, 8], dtype=np.dtype('int32'))
- casted_array_2 = array_2.astype(np_dt)
- # Test that we can set a list of floats.
- list_3 = [1., 2., 900., 0., -0.5]
- list_array_3 = np.array(list_3, dtype=np_dt)
- # Test that a list of integers gets casted correctly
- list_4 = [-1, -100, 0, 1, 9999, 70]
- list_array_4 = np.array(list_4, dtype=np_dt)
- ds[0] = array_0
- ds[1] = array_1
- ds[2] = array_2
- ds[3] = list_3
- ds[4] = list_4
- self.assertArrayEqual(array_0, ds[0])
- self.assertArrayEqual(array_1, ds[1])
- self.assertArrayEqual(casted_array_2, ds[2])
- self.assertArrayEqual(list_array_3, ds[3])
- self.assertArrayEqual(list_array_4, ds[4])
- # Test that we can reassign arrays in the dataset
- list_array_3 = np.array([0.3, 2.2], dtype=np_dt)
- ds[0] = list_array_3[:]
- self.assertArrayEqual(list_array_3, ds[0])
- # Make sure we can close the file.
- self.f.flush()
- if is_main_thread():
- self.f.close()
- def test_numpy_float16(self):
- np_dt = np.dtype('float16')
- self._help_float_testing(np_dt)
- def test_numpy_float32(self):
- np_dt = np.dtype('float32')
- self._help_float_testing(np_dt)
- def test_numpy_float64_from_dtype(self):
- np_dt = np.dtype('float64')
- self._help_float_testing(np_dt)
- def test_numpy_float64_2(self):
- np_dt = np.float64
- self._help_float_testing(np_dt)
- def test_non_contiguous_arrays_bool(self):
- """Test that non-contiguous arrays are stored correctly"""
- name = make_name()
- self.f.create_dataset(name, (10,), dtype=h5py.vlen_dtype('bool'))
- ds = self.f[name]
- x = np.array([True, False, True, True, False, False, False])
- ds[0] = x[::2]
- assert all(ds[0] == x[::2]), f"{ds[0]} != {x[::2]}"
- def test_non_contiguous_arrays_int(self):
- name = make_name()
- self.f.create_dataset(name, (10,), dtype=h5py.vlen_dtype('int8'))
- ds = self.f[name]
- y = np.array([2, 4, 1, 5, -1, 3, 7])
- ds[0] = y[::2]
- assert all(ds[0] == y[::2]), f"{ds[0]} != {y[::2]}"
- def test_asstr_array_dtype(self):
- dt = h5py.string_dtype(encoding='ascii')
- fill_value = b'bar'
- ds = self.f.create_dataset(make_name(), (100,), dtype=dt, fillvalue=fill_value)
- with pytest.raises(ValueError):
- np.array(ds.asstr(), dtype=int)
- class TestLowOpen(BaseDataset):
- def test_get_access_list(self):
- """ Test H5Dget_access_plist """
- ds = self.f.create_dataset(make_name(), (4,), dtype='f4')
- p_list = ds.id.get_access_plist()
- def test_dapl(self):
- """ Test the dapl keyword to h5d.open """
- name = make_name()
- dapl = h5py.h5p.create(h5py.h5p.DATASET_ACCESS)
- dset = self.f.create_dataset(name, (100,), dtype='f4')
- del dset
- dsid = h5py.h5d.open(self.f.id, name.encode('utf-8'), dapl)
- self.assertIsInstance(dsid, h5py.h5d.DatasetID)
- def test_get_chunk_details():
- from io import BytesIO
- buf = BytesIO()
- name = make_name()
- with h5py.File(buf, 'w') as fout:
- fout.create_dataset(name, shape=(100, 100), chunks=(10, 10), dtype='i4')
- fout[name][:] = 1
- buf.seek(0)
- with h5py.File(buf, 'r') as fin:
- ds = fin[name].id
- assert ds.get_num_chunks() == 100
- for j in range(100):
- offset = tuple(np.array(np.unravel_index(j, (10, 10))) * 10)
- si = ds.get_chunk_info(j)
- assert si.chunk_offset == offset
- assert si.filter_mask == 0
- assert si.byte_offset is not None
- assert si.size > 0
- si = ds.get_chunk_info_by_coord((0, 0))
- assert si.chunk_offset == (0, 0)
- assert si.filter_mask == 0
- assert si.byte_offset is not None
- assert si.size > 0
- @ut.skipUnless(h5py.version.hdf5_version_tuple >= (1, 12, 3) or
- (h5py.version.hdf5_version_tuple >= (1, 10, 10) and h5py.version.hdf5_version_tuple < (1, 10, 99)),
- "chunk iteration requires HDF5 1.10.10 and later 1.10, or 1.12.3 and later")
- def test_chunk_iter():
- """H5Dchunk_iter() for chunk information"""
- from io import BytesIO
- buf = BytesIO()
- name = make_name()
- with h5py.File(buf, 'w') as f:
- f.create_dataset(name, shape=(100, 100), chunks=(10, 10), dtype='i4')
- f[name][:] = 1
- buf.seek(0)
- with h5py.File(buf, 'r') as f:
- dsid = f[name].id
- num_chunks = dsid.get_num_chunks()
- assert num_chunks == 100
- ci = {}
- for j in range(num_chunks):
- si = dsid.get_chunk_info(j)
- ci[si.chunk_offset] = si
- def callback(chunk_info):
- known = ci[chunk_info.chunk_offset]
- assert chunk_info.chunk_offset == known.chunk_offset
- assert chunk_info.filter_mask == known.filter_mask
- assert chunk_info.byte_offset == known.byte_offset
- assert chunk_info.size == known.size
- dsid.chunk_iter(callback)
- def test_empty_shape(writable_file):
- ds = writable_file.create_dataset(make_name(), dtype='int32')
- assert ds.shape is None
- assert ds.maxshape is None
- def test_zero_storage_size():
- # https://github.com/h5py/h5py/issues/1475
- from io import BytesIO
- buf = BytesIO()
- with h5py.File(buf, 'w') as fout:
- fout.create_dataset('empty', dtype='uint8')
- buf.seek(0)
- with h5py.File(buf, 'r') as fin:
- assert fin['empty'].chunks is None
- assert fin['empty'].id.get_offset() is None
- assert fin['empty'].id.get_storage_size() == 0
- def test_python_int_uint64(writable_file):
- # https://github.com/h5py/h5py/issues/1547
- data = [np.iinfo(np.int64).max, np.iinfo(np.int64).max + 1]
- # Check creating a new dataset
- ds = writable_file.create_dataset(make_name(), data=data, dtype=np.uint64)
- assert ds.dtype == np.dtype(np.uint64)
- np.testing.assert_array_equal(ds[:], np.array(data, dtype=np.uint64))
- # Check writing to an existing dataset
- ds[:] = data
- np.testing.assert_array_equal(ds[:], np.array(data, dtype=np.uint64))
- def test_setitem_fancy_indexing(writable_file):
- # https://github.com/h5py/h5py/issues/1593
- arr = writable_file.create_dataset(make_name(), (5, 1000, 2), dtype=np.uint8)
- block = np.random.randint(255, size=(5, 3, 2))
- arr[:, [0, 2, 4], ...] = block
- def test_vlen_spacepad():
- with File(get_data_file_path("vlen_string_dset.h5")) as f:
- assert f["DS1"][0] == b"Parting"
- def test_vlen_nullterm():
- with File(get_data_file_path("vlen_string_dset_utc.h5")) as f:
- assert f["ds1"][0] == b"2009-12-20T10:16:18.662409Z"
- def test_allow_unknown_filter(writable_file):
- # apparently 256-511 are reserved for testing purposes
- fake_filter_id = 256
- ds = writable_file.create_dataset(
- make_name(), shape=(10, 10), dtype=np.uint8, compression=fake_filter_id,
- allow_unknown_filter=True
- )
- assert str(fake_filter_id) in ds._filters
- assert ds.compression == 'unknown'
- def test_dset_chunk_cache():
- """Chunk cache configuration for individual datasets."""
- from io import BytesIO
- buf = BytesIO()
- name = make_name()
- with h5py.File(buf, 'w') as fout:
- ds = fout.create_dataset(
- name, shape=(10, 20), chunks=(5, 4), dtype='i4',
- rdcc_nbytes=2 * 1024 * 1024, rdcc_w0=0.2, rdcc_nslots=997)
- ds_chunk_cache = ds.id.get_access_plist().get_chunk_cache()
- assert fout.id.get_access_plist().get_cache()[1:] != ds_chunk_cache
- assert ds_chunk_cache == (997, 2 * 1024 * 1024, 0.2)
- buf.seek(0)
- with h5py.File(buf, 'r') as fin:
- ds = fin.require_dataset(
- name, shape=(10, 20), dtype='i4',
- rdcc_nbytes=3 * 1024 * 1024, rdcc_w0=0.67, rdcc_nslots=709)
- ds_chunk_cache = ds.id.get_access_plist().get_chunk_cache()
- assert fin.id.get_access_plist().get_cache()[1:] != ds_chunk_cache
- assert ds_chunk_cache == (709, 3 * 1024 * 1024, 0.67)
- class TestCommutative(BaseDataset):
- """
- Test the symmetry of operators, at least with the numpy types.
- Issue: https://github.com/h5py/h5py/issues/1947
- """
- def test_numpy_commutative(self,):
- """
- Create a h5py dataset, extract one element convert to numpy
- Check that it returns symmetric response to == and !=
- """
- shape = (100,1)
- dset = self.f.create_dataset(make_name(), shape, dtype=float,
- data=np.random.rand(*shape))
- # grab a value from the elements, ie dset[0, 0]
- # check that mask arrays are commutative wrt ==, !=
- val = np.float64(dset[0, 0])
- assert np.all((val == dset) == (dset == val))
- assert np.all((val != dset) == (dset != val))
- # generate sample not in the dset, ie max(dset)+delta
- # check that mask arrays are commutative wrt ==, !=
- delta = 0.001
- nval = np.nanmax(dset)+delta
- assert np.all((nval == dset) == (dset == nval))
- assert np.all((nval != dset) == (dset != nval))
- def test_basetype_commutative(self,):
- """
- Create a h5py dataset and check basetype compatibility.
- Check that operation is symmetric, even if it is potentially
- not meaningful.
- """
- shape = (100,1)
- dset = self.f.create_dataset(make_name(), shape, dtype=float,
- data=np.random.rand(*shape))
- # generate float type, sample float(0.)
- # check that operation is symmetric (but potentially meaningless)
- val = float(0.)
- assert (val == dset) == (dset == val)
- assert (val != dset) == (dset != val)
- class TestVirtualPrefix(BaseDataset):
- """
- Test setting virtual prefix
- """
- def test_virtual_prefix_create(self):
- shape = (100,1)
- virtual_prefix = "/path/to/virtual"
- dset = self.f.create_dataset(make_name(), shape, dtype=float,
- data=np.random.rand(*shape),
- virtual_prefix = virtual_prefix)
- virtual_prefix_readback = pathlib.Path(dset.id.get_access_plist().get_virtual_prefix().decode()).as_posix()
- assert virtual_prefix_readback == virtual_prefix
- def test_virtual_prefix_require(self):
- virtual_prefix = "/path/to/virtual"
- dset = self.f.require_dataset(make_name(), (10, 3), 'f', virtual_prefix = virtual_prefix)
- virtual_prefix_readback = pathlib.Path(dset.id.get_access_plist().get_virtual_prefix().decode()).as_posix()
- self.assertEqual(virtual_prefix, virtual_prefix_readback)
- self.assertIsInstance(dset, Dataset)
- self.assertEqual(dset.shape, (10, 3))
- def ds_str(file, shape=(10, )):
- dt = h5py.string_dtype(encoding='ascii')
- fill_value = b'fill'
- return file.create_dataset(make_name(), shape, dtype=dt, fillvalue=fill_value)
- def ds_fields(file, shape=(10, )):
- dt = np.dtype([
- ('foo', h5py.string_dtype(encoding='ascii')),
- ('bar', np.float64),
- ])
- fill_value = np.asarray(('fill', 0.0), dtype=dt)
- name = make_name()
- file[name] = np.broadcast_to(fill_value, shape)
- return file[name]
- view_getters = pytest.mark.parametrize(
- "view_getter,make_ds",
- [
- (lambda ds: ds, ds_str),
- (lambda ds: ds.astype(dtype=object), ds_str),
- (lambda ds: ds.asstr(), ds_str),
- (lambda ds: ds.fields("foo"), ds_fields),
- ],
- ids=["ds", "astype", "asstr", "fields"],
- )
- COPY_IF_NEEDED = False if NUMPY_RELEASE_VERSION < (2, 0) else None
- @pytest.mark.parametrize("copy", [True, COPY_IF_NEEDED])
- @view_getters
- def test_array_copy(view_getter, make_ds, copy, writable_file):
- ds = make_ds(writable_file)
- view = view_getter(ds)
- np.array(view, copy=copy)
- @pytest.mark.skipif(
- NUMPY_RELEASE_VERSION < (2, 0),
- reason="forbidding copies requires numpy 2",
- )
- @view_getters
- def test_array_copy_false(view_getter, make_ds, writable_file):
- ds = make_ds(writable_file)
- view = view_getter(ds)
- with pytest.raises(ValueError, match="memory allocation cannot be avoided"):
- np.array(view, copy=False)
- @view_getters
- def test_array_dtype(view_getter, make_ds, writable_file):
- ds = make_ds(writable_file)
- view = view_getter(ds)
- assert np.array(view, dtype='|S10').dtype == np.dtype('|S10')
- @view_getters
- def test_array_scalar(view_getter, make_ds, writable_file):
- ds = make_ds(writable_file, shape=())
- view = view_getter(ds)
- assert isinstance(view[()], (bytes, str))
- assert np.array(view).shape == ()
- @view_getters
- def test_array_nd(view_getter, make_ds, writable_file):
- ds = make_ds(writable_file, shape=(5, 6))
- view = view_getter(ds)
- assert np.array(view).shape == (5, 6)
- @view_getters
- def test_view_properties(view_getter, make_ds, writable_file):
- ds = make_ds(writable_file, shape=(5, 6))
- view = view_getter(ds)
- assert view.dtype == np.dtype(object)
- assert view.ndim == 2
- assert view.shape == (5, 6)
- assert view.size == 30
- assert len(view) == 5
- @pytest.mark.thread_unsafe(reason="spawns thread pool itself")
- def test_concurrent_dataset_creation(writable_file):
- N_THREADS = 25
- N_DATASETS_PER_THREAD = 5
- # Defines a thread barrier that will be spawned before parallel execution
- # this increases the probability of concurrent access clashes.
- barrier = threading.Barrier(N_THREADS)
- def closure(ithread):
- # Ensure that all threads reach this point before concurrent execution.
- barrier.wait()
- for j in range(N_DATASETS_PER_THREAD):
- writable_file.create_dataset(f'concurrent_{ithread:02d}_{j:02d}', (1000,), dtype='i4')
- with ThreadPoolExecutor(max_workers=N_THREADS) as executor:
- futures = [executor.submit(closure, ithread) for ithread in range(N_THREADS)]
- [f.result() for f in futures]
- expected = set(f'concurrent_{i:02d}_{j:02d}' for i in range(N_THREADS) for j in range(N_DATASETS_PER_THREAD))
- assert set(writable_file) == expected
- def test_filter_properties(writable_file):
- name = make_name()
- ds = writable_file.create_dataset(
- name, shape=1000, dtype=np.float32,
- fletcher32=True, shuffle=True, compression='lzf'
- )
- assert ds.filter_ids == (
- h5py.h5z.FILTER_SHUFFLE, h5py.h5z.FILTER_LZF, h5py.h5z.FILTER_FLETCHER32
- )
- assert ds.filter_names == ('shuffle', 'lzf', 'fletcher32')
- def test_store_refs(writable_file):
- ds1 = writable_file.create_dataset(make_name("foo"), data=np.arange(12))
- refs_ds = writable_file.create_dataset(make_name("refs"), data=[writable_file.ref, ds1.ref])
- assert isinstance(refs_ds[0], h5py.Reference)
- assert writable_file[refs_ds[0]] == writable_file
- assert isinstance(refs_ds[1], h5py.Reference)
- assert writable_file[refs_ds[1]] == ds1
- # Single reference
- ref_scalar_ds = writable_file.create_dataset(make_name("ref_scalar"), data=ds1.ref)
- assert isinstance(ref_scalar_ds[()], h5py.h5r.Reference)
- assert writable_file[ref_scalar_ds[()]] == ds1
- def test_store_regionrefs(writable_file):
- ds1 = writable_file.create_dataset(make_name("foo"), data=np.arange(12))
- regionrefs_ds = writable_file.create_dataset(make_name("regrefs"), data=[
- ds1.regionref[:-1], ds1.regionref[1:]
- ])
- assert isinstance(regionrefs_ds[0], h5py.RegionReference)
- np.testing.assert_array_equal(ds1[regionrefs_ds[0]], np.arange(11))
- np.testing.assert_array_equal(ds1[regionrefs_ds[1]], np.arange(1, 12))
- refs_ds = writable_file.create_dataset(make_name("refs"), shape=(1,), dtype=h5py.ref_dtype)
- with pytest.raises(TypeError, match="convert"):
- refs_ds[0] = ds1.regionref[:6]
|