| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337 |
- # This file is part of h5py, a Python interface to the HDF5 library.
- #
- # http://www.h5py.org
- #
- # Copyright 2008-2013 Andrew Collette and contributors
- #
- # License: Standard 3-clause BSD; see "license.txt" for full license terms
- # and contributor agreement.
- """
- Tests the h5py.File object.
- """
- import h5py
- from h5py._hl.files import _drivers
- from h5py import File
- from .common import ut, TestCase, make_name
- import pytest
- import io
- import tempfile
- import os
def nfiles():
    """Return HDF5's count of currently open file identifiers.

    The count is requested with ``OBJ_ALL`` as the location, so it is not
    limited to a single file.
    """
    h5f = h5py.h5f
    return h5f.get_obj_count(h5f.OBJ_ALL, h5f.OBJ_FILE)
def ngroups():
    """Return HDF5's count of currently open group identifiers.

    The count is requested with ``OBJ_ALL`` as the location, so it is not
    limited to a single file.
    """
    location = h5py.h5f.OBJ_ALL
    kind = h5py.h5f.OBJ_GROUP
    return h5py.h5f.get_obj_count(location, kind)
class TestDealloc(TestCase):

    """
    Behavior on object deallocation.  Note most of this behavior is
    delegated to FileID.
    """

    @pytest.mark.thread_unsafe(reason="global object counters")
    def test_autoclose(self):
        """ File objects close automatically when out of scope, but
        other objects remain open. """

        # Record the baseline counts first: the assertions below are all
        # relative to these, because nfiles()/ngroups() count across the
        # whole process rather than just this test's file.
        start_nfiles = nfiles()
        start_ngroups = ngroups()

        fname = self.mktemp()
        f = h5py.File(fname, 'w')
        g = f['/']

        # One File object and one group object are now alive.
        self.assertEqual(nfiles(), start_nfiles+1)
        self.assertEqual(ngroups(), start_ngroups+1)

        # Drop the only File reference: the file count returns to the
        # baseline while the group handle stays usable and counted.
        del f

        self.assertTrue(g)
        self.assertEqual(nfiles(), start_nfiles)
        self.assertEqual(ngroups(), start_ngroups+1)

        # Asking the surviving group for its file yields a File again,
        # which brings the file count back up by one.
        f = g.file

        self.assertTrue(f)
        self.assertEqual(nfiles(), start_nfiles+1)
        self.assertEqual(ngroups(), start_ngroups+1)

        # Releasing the group only affects the group count.
        del g

        self.assertEqual(nfiles(), start_nfiles+1)
        self.assertEqual(ngroups(), start_ngroups)

        # Releasing the last File reference restores both baselines.
        del f

        self.assertEqual(nfiles(), start_nfiles)
        self.assertEqual(ngroups(), start_ngroups)
class TestDriverRegistration(TestCase):
    """Registering and unregistering user-supplied file drivers."""

    def test_register_driver(self):
        """A registered driver callback receives File's extra keyword args."""
        called_with = [None]

        def set_fapl(plist, *args, **kwargs):
            # Remember what was passed through, then delegate to the stock
            # 'sec2' driver setup so the file can actually be created.
            called_with[0] = args, kwargs
            return _drivers['sec2'](plist)

        name = make_name()
        h5py.register_driver(name, set_fapl)
        # Make sure the throw-away driver does not stay registered once
        # this test is over, whether it passes or fails.
        self.addCleanup(h5py.unregister_driver, name)
        self.assertIn(name, h5py.registered_drivers())

        fname = self.mktemp()
        # Open through a context manager so the file is closed again
        # instead of being left open for the rest of the test run.
        with h5py.File(fname, driver=name, driver_arg_0=0, driver_arg_1=1,
                       mode='w'):
            pass

        self.assertEqual(
            called_with,
            [((), {'driver_arg_0': 0, 'driver_arg_1': 1})],
        )

    def test_unregister_driver(self):
        """An unregistered driver name is rejected when opening a file."""
        name = make_name()
        h5py.register_driver(name, lambda plist: None)
        self.assertIn(name, h5py.registered_drivers())

        h5py.unregister_driver(name)
        self.assertNotIn(name, h5py.registered_drivers())

        # Build the path outside the assertRaises body so that only the
        # File() call itself can satisfy the expected ValueError.
        fname = self.mktemp()
        with self.assertRaises(ValueError) as e:
            h5py.File(fname, driver=name, mode='w')

        self.assertEqual(str(e.exception), f"Unknown driver type '{name}'")
class TestCache(TestCase):
    """Chunk-cache settings passed to h5py.File end up in the access plist.

    Each test compares the list form of ``get_cache()`` against
    ``[0, nslots, nbytes, w0]``, overriding one value at a time through
    the ``rdcc_nslots`` / ``rdcc_nbytes`` / ``rdcc_w0`` keyword arguments.
    """

    def setUp(self):
        """Pick the expected default cache values for the linked HDF5."""
        super().setUp()
        mib = 1024 * 1024
        # The expected defaults depend on the HDF5 version h5py reports:
        # one set below 2.0.0, a larger set from 2.0.0 onwards.
        if h5py.version.hdf5_version_tuple < (2, 0, 0):
            self.dflt_chunk_cache = mib
            self.dflt_chunk_nslots = 521
        else:
            self.dflt_chunk_cache = 8 * mib
            self.dflt_chunk_nslots = 8191

    def _cache_settings(self, **file_kwargs):
        """Create a scratch file with *file_kwargs*; return its cache list.

        The file is opened in a ``with`` block so it is closed again before
        the caller makes its assertion.
        """
        with h5py.File(self.mktemp(), 'w', **file_kwargs) as f:
            return list(f.id.get_access_plist().get_cache())

    def test_defaults(self):
        """Without overrides, the version-dependent defaults are reported."""
        self.assertEqual(
            self._cache_settings(),
            [0, self.dflt_chunk_nslots, self.dflt_chunk_cache, 0.75])

    def test_nbytes(self):
        """rdcc_nbytes replaces only the cache size entry."""
        self.assertEqual(
            self._cache_settings(rdcc_nbytes=1024),
            [0, self.dflt_chunk_nslots, 1024, 0.75])

    def test_nslots(self):
        """rdcc_nslots replaces only the slot count entry."""
        self.assertEqual(
            self._cache_settings(rdcc_nslots=125),
            [0, 125, self.dflt_chunk_cache, 0.75])

    def test_w0(self):
        """rdcc_w0 replaces only the final entry."""
        self.assertEqual(
            self._cache_settings(rdcc_w0=0.25),
            [0, self.dflt_chunk_nslots, self.dflt_chunk_cache, 0.25])
class TestFileObj(TestCase):
    """h5py.File opened on Python file-like objects instead of paths."""

    def check_write(self, fileobj):
        """Write a 12-element dataset named 'test' through *fileobj*.

        The File is managed by a ``with`` block so it is closed even when
        one of the assertions fails.
        """
        with h5py.File(fileobj, 'w') as f:
            self.assertEqual(f.driver, 'fileobj')
            self.assertEqual(f.filename, repr(fileobj))
            f.create_dataset('test', data=list(range(12)))
            self.assertEqual(list(f), ['test'])
            self.assertEqual(list(f['test'][:]), list(range(12)))

    def check_read(self, fileobj):
        """Read back what check_write stored and verify writes are refused.

        The File is managed by a ``with`` block so it is closed even when
        one of the assertions fails.
        """
        with h5py.File(fileobj, 'r') as f:
            self.assertEqual(list(f), ['test'])
            self.assertEqual(list(f['test'][:]), list(range(12)))
            # The file was opened with mode 'r', so creating data must fail.
            self.assertRaises(Exception, f.create_dataset, 'another.test',
                              data=list(range(3)))

    def test_BytesIO(self):
        """Round-trip through an in-memory io.BytesIO buffer."""
        with io.BytesIO() as fileobj:
            self.assertEqual(len(fileobj.getvalue()), 0)
            self.check_write(fileobj)
            self.assertGreater(len(fileobj.getvalue()), 0)
            self.check_read(fileobj)

    def test_file(self):
        """Round-trip through a real file opened with open()."""
        fname = self.mktemp()
        try:
            with open(fname, 'wb+') as fileobj:
                self.assertEqual(os.path.getsize(fname), 0)
                self.check_write(fileobj)
                self.assertGreater(os.path.getsize(fname), 0)
                self.check_read(fileobj)
            # Re-open read-only to check the data is really on disk.
            with open(fname, 'rb') as fileobj:
                self.check_read(fileobj)
        finally:
            os.remove(fname)

    @pytest.mark.filterwarnings(
        # at least on Windows and MacOS, a resource warning may be emitted
        # when this test returns
        "ignore::ResourceWarning"
    )
    def test_TemporaryFile(self):
        """The temporary file lives until the h5py.File is closed."""
        # in this test, we check explicitly that temp file gets
        # automatically deleted upon h5py.File.close()...
        fileobj = tempfile.NamedTemporaryFile()
        fname = fileobj.name
        f = h5py.File(fileobj, 'w')
        # Drop our own reference; the assertions below show the file still
        # exists until f.close() is called.
        del fileobj
        # ... but in your code feel free to simply
        # f = h5py.File(tempfile.TemporaryFile())

        f.create_dataset('test', data=list(range(12)))
        self.assertEqual(list(f), ['test'])
        self.assertEqual(list(f['test'][:]), list(range(12)))
        self.assertTrue(os.path.isfile(fname))
        f.close()
        self.assertFalse(os.path.isfile(fname))

    def test_exception_open(self):
        """Objects that are not usable file objects are rejected on open."""
        self.assertRaises(Exception, h5py.File, None,
                          driver='fileobj', mode='x')
        self.assertRaises(Exception, h5py.File, 'rogue',
                          driver='fileobj', mode='x')
        self.assertRaises(Exception, h5py.File, self,
                          driver='fileobj', mode='x')

    def test_exception_read(self):
        """An exception raised by readinto() surfaces when reading data."""

        class BrokenBytesIO(io.BytesIO):
            def readinto(self, b):
                raise Exception('I am broken')

        # NOTE(review): f is not closed here, as in the original test.
        # Closing through a file object whose reads always fail may itself
        # error (compare the skipped test_exception_close) - confirm
        # before adding a close().
        f = h5py.File(BrokenBytesIO(), 'w')
        f.create_dataset('test', data=list(range(12)))
        self.assertRaises(Exception, list, f['test'])

    def test_exception_write(self):
        """An exception raised by write() surfaces when creating data."""

        class BrokenBytesIO(io.BytesIO):
            allow_write = False

            def write(self, b):
                if self.allow_write:
                    return super().write(b)
                else:
                    raise Exception('I am broken')

        bio = BrokenBytesIO()
        f = h5py.File(bio, 'w')
        try:
            self.assertRaises(Exception, f.create_dataset, 'test',
                              data=list(range(12)))
        finally:
            # Un-break writing so we can close: errors while closing get messy.
            bio.allow_write = True
            f.close()

    @ut.skip("Incompletely closed files can cause segfaults")
    def test_exception_close(self):
        """Closing the File after its file object was closed should fail."""
        fileobj = io.BytesIO()
        f = h5py.File(fileobj, 'w')
        fileobj.close()
        self.assertRaises(Exception, f.close)

    def test_exception_writeonly(self):
        """A write-only file object produces io.UnsupportedOperation."""
        # HDF5 expects read & write access to a file it's writing;
        # check that we get the correct exception on a write-only file object.
        path = os.path.join(self.tempdir, make_name("foo{}.h5"))
        # Both context managers guarantee cleanup on assertion failure;
        # on exit the h5py.File is closed first, then the OS file handle,
        # matching the original close order.
        with open(path, 'wb') as fileobj, h5py.File(fileobj, 'w') as f:
            group = f.create_group("group")
            with self.assertRaises(io.UnsupportedOperation):
                group.create_dataset("data", data='foo',
                                     dtype=h5py.string_dtype())

    def test_method_vanish(self):
        """Reading fails once the file object's readinto is removed."""
        fileobj = io.BytesIO()
        # NOTE(review): f is not closed here, as in the original test;
        # after readinto is disabled a close may be unsafe - confirm
        # before adding one.
        f = h5py.File(fileobj, 'w')
        f.create_dataset('test', data=list(range(12)))
        self.assertEqual(list(f['test'][:]), list(range(12)))
        # Shadow the method on the instance so it is no longer callable.
        fileobj.readinto = None
        self.assertRaises(Exception, list, f['test'])
class TestTrackOrder(TestCase):
    """Iteration order of a file's members with and without track_order."""

    def populate(self, f):
        """Create members named '0'..'99' in *f*, in numeric order.

        Every tenth name becomes a group; the rest become one-element
        datasets.
        """
        for i in range(100):
            # Mix group and dataset creation.
            if i % 10 == 0:
                f.create_group(str(i))
            else:
                f[str(i)] = [i]

    def test_track_order(self):
        """With track_order=True members iterate in creation order."""
        fname = self.mktemp()
        expected = [str(i) for i in range(100)]

        # Context managers make sure both handles are closed, including
        # when an assertion fails.
        with h5py.File(fname, 'w', track_order=True) as f:  # creation order
            self.populate(f)
            self.assertEqual(list(f), expected)

        # Check order tracking after reopening the file
        with h5py.File(fname) as f2:
            self.assertEqual(list(f2), expected)

    def test_no_track_order(self):
        """With track_order=False members iterate sorted by name."""
        fname = self.mktemp()
        with h5py.File(fname, 'w', track_order=False) as f:  # name alphanumeric
            self.populate(f)
            self.assertEqual(list(f),
                             sorted([str(i) for i in range(100)]))
class TestFileMetaBlockSize(TestCase):

    """
    Feature: The meta block size can be manipulated, changing how metadata
    is aggregated and the offset of the first dataset.
    """

    def test_file_create_with_meta_block_size_4096(self):
        """A 4 KiB meta block size is stored and moves the first dataset."""
        # Test a large meta block size of 4 kibibytes
        requested = 4096
        path = self.mktemp()
        with File(path, 'w', meta_block_size=requested,
                  libver="latest") as h5file:
            h5file["test"] = 5
            self.assertEqual(h5file.meta_block_size, requested)
            first_offset = h5file["test"].id.get_offset()
            # Equality is expected for HDF5 1.10
            self.assertGreaterEqual(first_offset, requested)

    def test_file_create_with_meta_block_size_512(self):
        """A 512 byte meta block size is stored and bounds the offset."""
        # Test a small meta block size of 512 bytes
        # The smallest verifiable meta_block_size is 463
        requested = 512
        path = self.mktemp()
        with File(path, 'w', meta_block_size=requested,
                  libver="latest") as h5file:
            h5file["test"] = 3
            self.assertEqual(h5file.meta_block_size, requested)
            first_offset = h5file["test"].id.get_offset()
            # Equality is expected for HDF5 1.10
            self.assertGreaterEqual(first_offset, requested)
            # Default meta_block_size is 2048. This should fail if
            # meta_block_size is not set.
            self.assertLess(first_offset, requested * 2)
|