test_file2.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. # This file is part of h5py, a Python interface to the HDF5 library.
  2. #
  3. # http://www.h5py.org
  4. #
  5. # Copyright 2008-2013 Andrew Collette and contributors
  6. #
  7. # License: Standard 3-clause BSD; see "license.txt" for full license terms
  8. # and contributor agreement.
  9. """
  10. Tests the h5py.File object.
  11. """
  12. import h5py
  13. from h5py._hl.files import _drivers
  14. from h5py import File
  15. from .common import ut, TestCase, make_name
  16. import pytest
  17. import io
  18. import tempfile
  19. import os
  20. def nfiles():
  21. return h5py.h5f.get_obj_count(h5py.h5f.OBJ_ALL, h5py.h5f.OBJ_FILE)
  22. def ngroups():
  23. return h5py.h5f.get_obj_count(h5py.h5f.OBJ_ALL, h5py.h5f.OBJ_GROUP)
  24. class TestDealloc(TestCase):
  25. """
  26. Behavior on object deallocation. Note most of this behavior is
  27. delegated to FileID.
  28. """
  29. @pytest.mark.thread_unsafe(reason="global object counters")
  30. def test_autoclose(self):
  31. """ File objects close automatically when out of scope, but
  32. other objects remain open. """
  33. start_nfiles = nfiles()
  34. start_ngroups = ngroups()
  35. fname = self.mktemp()
  36. f = h5py.File(fname, 'w')
  37. g = f['/']
  38. self.assertEqual(nfiles(), start_nfiles+1)
  39. self.assertEqual(ngroups(), start_ngroups+1)
  40. del f
  41. self.assertTrue(g)
  42. self.assertEqual(nfiles(), start_nfiles)
  43. self.assertEqual(ngroups(), start_ngroups+1)
  44. f = g.file
  45. self.assertTrue(f)
  46. self.assertEqual(nfiles(), start_nfiles+1)
  47. self.assertEqual(ngroups(), start_ngroups+1)
  48. del g
  49. self.assertEqual(nfiles(), start_nfiles+1)
  50. self.assertEqual(ngroups(), start_ngroups)
  51. del f
  52. self.assertEqual(nfiles(), start_nfiles)
  53. self.assertEqual(ngroups(), start_ngroups)
  54. class TestDriverRegistration(TestCase):
  55. def test_register_driver(self):
  56. called_with = [None]
  57. def set_fapl(plist, *args, **kwargs):
  58. called_with[0] = args, kwargs
  59. return _drivers['sec2'](plist)
  60. name = make_name()
  61. h5py.register_driver(name, set_fapl)
  62. self.assertIn(name, h5py.registered_drivers())
  63. fname = self.mktemp()
  64. h5py.File(fname, driver=name, driver_arg_0=0, driver_arg_1=1,
  65. mode='w')
  66. self.assertEqual(
  67. called_with,
  68. [((), {'driver_arg_0': 0, 'driver_arg_1': 1})],
  69. )
  70. def test_unregister_driver(self):
  71. name = make_name()
  72. h5py.register_driver(name, lambda plist: None)
  73. self.assertIn(name, h5py.registered_drivers())
  74. h5py.unregister_driver(name)
  75. self.assertNotIn(name, h5py.registered_drivers())
  76. with self.assertRaises(ValueError) as e:
  77. fname = self.mktemp()
  78. h5py.File(fname, driver=name, mode='w')
  79. self.assertEqual(str(e.exception), f"Unknown driver type '{name}'")
  80. class TestCache(TestCase):
  81. def setUp(self):
  82. MiB = 1024 * 1024
  83. if h5py.version.hdf5_version_tuple < (2, 0, 0):
  84. self.dflt_chunk_cache = MiB
  85. self.dflt_chunk_nslots = 521
  86. else:
  87. self.dflt_chunk_cache = 8 * MiB
  88. self.dflt_chunk_nslots = 8191
  89. def test_defaults(self):
  90. fname = self.mktemp()
  91. f = h5py.File(fname, 'w')
  92. self.assertEqual(list(f.id.get_access_plist().get_cache()),
  93. [0, self.dflt_chunk_nslots, self.dflt_chunk_cache, 0.75])
  94. def test_nbytes(self):
  95. fname = self.mktemp()
  96. f = h5py.File(fname, 'w', rdcc_nbytes=1024)
  97. self.assertEqual(list(f.id.get_access_plist().get_cache()),
  98. [0, self.dflt_chunk_nslots, 1024, 0.75])
  99. def test_nslots(self):
  100. fname = self.mktemp()
  101. f = h5py.File(fname, 'w', rdcc_nslots=125)
  102. self.assertEqual(list(f.id.get_access_plist().get_cache()),
  103. [0, 125, self.dflt_chunk_cache, 0.75])
  104. def test_w0(self):
  105. fname = self.mktemp()
  106. f = h5py.File(fname, 'w', rdcc_w0=0.25)
  107. self.assertEqual(list(f.id.get_access_plist().get_cache()),
  108. [0, self.dflt_chunk_nslots, self.dflt_chunk_cache, 0.25])
  109. class TestFileObj(TestCase):
  110. def check_write(self, fileobj):
  111. f = h5py.File(fileobj, 'w')
  112. self.assertEqual(f.driver, 'fileobj')
  113. self.assertEqual(f.filename, repr(fileobj))
  114. f.create_dataset('test', data=list(range(12)))
  115. self.assertEqual(list(f), ['test'])
  116. self.assertEqual(list(f['test'][:]), list(range(12)))
  117. f.close()
  118. def check_read(self, fileobj):
  119. f = h5py.File(fileobj, 'r')
  120. self.assertEqual(list(f), ['test'])
  121. self.assertEqual(list(f['test'][:]), list(range(12)))
  122. self.assertRaises(Exception, f.create_dataset, 'another.test', data=list(range(3)))
  123. f.close()
  124. def test_BytesIO(self):
  125. with io.BytesIO() as fileobj:
  126. self.assertEqual(len(fileobj.getvalue()), 0)
  127. self.check_write(fileobj)
  128. self.assertGreater(len(fileobj.getvalue()), 0)
  129. self.check_read(fileobj)
  130. def test_file(self):
  131. fname = self.mktemp()
  132. try:
  133. with open(fname, 'wb+') as fileobj:
  134. self.assertEqual(os.path.getsize(fname), 0)
  135. self.check_write(fileobj)
  136. self.assertGreater(os.path.getsize(fname), 0)
  137. self.check_read(fileobj)
  138. with open(fname, 'rb') as fileobj:
  139. self.check_read(fileobj)
  140. finally:
  141. os.remove(fname)
  142. @pytest.mark.filterwarnings(
  143. # at least on Windows and MacOS, a resource warning may be emitted
  144. # when this test returns
  145. "ignore::ResourceWarning"
  146. )
  147. def test_TemporaryFile(self):
  148. # in this test, we check explicitly that temp file gets
  149. # automatically deleted upon h5py.File.close()...
  150. fileobj = tempfile.NamedTemporaryFile()
  151. fname = fileobj.name
  152. f = h5py.File(fileobj, 'w')
  153. del fileobj
  154. # ... but in your code feel free to simply
  155. # f = h5py.File(tempfile.TemporaryFile())
  156. f.create_dataset('test', data=list(range(12)))
  157. self.assertEqual(list(f), ['test'])
  158. self.assertEqual(list(f['test'][:]), list(range(12)))
  159. self.assertTrue(os.path.isfile(fname))
  160. f.close()
  161. self.assertFalse(os.path.isfile(fname))
  162. def test_exception_open(self):
  163. self.assertRaises(Exception, h5py.File, None,
  164. driver='fileobj', mode='x')
  165. self.assertRaises(Exception, h5py.File, 'rogue',
  166. driver='fileobj', mode='x')
  167. self.assertRaises(Exception, h5py.File, self,
  168. driver='fileobj', mode='x')
  169. def test_exception_read(self):
  170. class BrokenBytesIO(io.BytesIO):
  171. def readinto(self, b):
  172. raise Exception('I am broken')
  173. f = h5py.File(BrokenBytesIO(), 'w')
  174. f.create_dataset('test', data=list(range(12)))
  175. self.assertRaises(Exception, list, f['test'])
  176. def test_exception_write(self):
  177. class BrokenBytesIO(io.BytesIO):
  178. allow_write = False
  179. def write(self, b):
  180. if self.allow_write:
  181. return super().write(b)
  182. else:
  183. raise Exception('I am broken')
  184. bio = BrokenBytesIO()
  185. f = h5py.File(bio, 'w')
  186. try:
  187. self.assertRaises(Exception, f.create_dataset, 'test',
  188. data=list(range(12)))
  189. finally:
  190. # Un-break writing so we can close: errors while closing get messy.
  191. bio.allow_write = True
  192. f.close()
  193. @ut.skip("Incompletely closed files can cause segfaults")
  194. def test_exception_close(self):
  195. fileobj = io.BytesIO()
  196. f = h5py.File(fileobj, 'w')
  197. fileobj.close()
  198. self.assertRaises(Exception, f.close)
  199. def test_exception_writeonly(self):
  200. # HDF5 expects read & write access to a file it's writing;
  201. # check that we get the correct exception on a write-only file object.
  202. fileobj = open(os.path.join(self.tempdir, make_name("foo{}.h5")), 'wb')
  203. f = h5py.File(fileobj, 'w')
  204. group = f.create_group("group")
  205. with self.assertRaises(io.UnsupportedOperation):
  206. group.create_dataset("data", data='foo', dtype=h5py.string_dtype())
  207. f.close()
  208. fileobj.close()
  209. def test_method_vanish(self):
  210. fileobj = io.BytesIO()
  211. f = h5py.File(fileobj, 'w')
  212. f.create_dataset('test', data=list(range(12)))
  213. self.assertEqual(list(f['test'][:]), list(range(12)))
  214. fileobj.readinto = None
  215. self.assertRaises(Exception, list, f['test'])
  216. class TestTrackOrder(TestCase):
  217. def populate(self, f):
  218. for i in range(100):
  219. # Mix group and dataset creation.
  220. if i % 10 == 0:
  221. f.create_group(str(i))
  222. else:
  223. f[str(i)] = [i]
  224. def test_track_order(self):
  225. fname = self.mktemp()
  226. f = h5py.File(fname, 'w', track_order=True) # creation order
  227. self.populate(f)
  228. self.assertEqual(list(f), [str(i) for i in range(100)])
  229. f.close()
  230. # Check order tracking after reopening the file
  231. f2 = h5py.File(fname)
  232. self.assertEqual(list(f2), [str(i) for i in range(100)])
  233. def test_no_track_order(self):
  234. fname = self.mktemp()
  235. f = h5py.File(fname, 'w', track_order=False) # name alphanumeric
  236. self.populate(f)
  237. self.assertEqual(list(f),
  238. sorted([str(i) for i in range(100)]))
  239. class TestFileMetaBlockSize(TestCase):
  240. """
  241. Feature: The meta block size can be manipulated, changing how metadata
  242. is aggregated and the offset of the first dataset.
  243. """
  244. def test_file_create_with_meta_block_size_4096(self):
  245. # Test a large meta block size of 4 kibibytes
  246. meta_block_size = 4096
  247. with File(
  248. self.mktemp(), 'w',
  249. meta_block_size=meta_block_size,
  250. libver="latest"
  251. ) as f:
  252. f["test"] = 5
  253. self.assertEqual(f.meta_block_size, meta_block_size)
  254. # Equality is expected for HDF5 1.10
  255. self.assertGreaterEqual(f["test"].id.get_offset(), meta_block_size)
  256. def test_file_create_with_meta_block_size_512(self):
  257. # Test a small meta block size of 512 bytes
  258. # The smallest verifiable meta_block_size is 463
  259. meta_block_size = 512
  260. libver = "latest"
  261. with File(
  262. self.mktemp(), 'w',
  263. meta_block_size=meta_block_size,
  264. libver=libver
  265. ) as f:
  266. f["test"] = 3
  267. self.assertEqual(f.meta_block_size, meta_block_size)
  268. # Equality is expected for HDF5 1.10
  269. self.assertGreaterEqual(f["test"].id.get_offset(), meta_block_size)
  270. # Default meta_block_size is 2048. This should fail if meta_block_size is not set.
  271. self.assertLess(f["test"].id.get_offset(), meta_block_size*2)