test_h5d_direct_chunk.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. import h5py
  2. import numpy
  3. import numpy.testing
  4. import pytest
  5. from .common import ut, TestCase, make_name
  6. class TestWriteDirectChunk(TestCase):
  7. def test_write_direct_chunk(self):
  8. filename = self.mktemp().encode()
  9. with h5py.File(filename, "w") as filehandle:
  10. dataset = filehandle.create_dataset("data", (100, 100, 100),
  11. maxshape=(None, 100, 100),
  12. chunks=(1, 100, 100),
  13. dtype='float32')
  14. # writing
  15. array = numpy.zeros((10, 100, 100))
  16. for index in range(10):
  17. a = numpy.random.rand(100, 100).astype('float32')
  18. dataset.id.write_direct_chunk((index, 0, 0), a.tobytes(), filter_mask=1)
  19. array[index] = a
  20. # checking
  21. with h5py.File(filename, "r") as filehandle:
  22. for i in range(10):
  23. read_data = filehandle["data"][i]
  24. numpy.testing.assert_array_equal(array[i], read_data)
  25. @ut.skipIf('gzip' not in h5py.filters.encode, "DEFLATE is not installed")
  26. class TestReadDirectChunk(TestCase):
  27. def test_read_compressed_offsets(self):
  28. filename = self.mktemp().encode()
  29. with h5py.File(filename, "w") as filehandle:
  30. frame = numpy.arange(16).reshape(4, 4)
  31. frame_dataset = filehandle.create_dataset("frame",
  32. data=frame,
  33. compression="gzip",
  34. compression_opts=9)
  35. dataset = filehandle.create_dataset("compressed_chunked",
  36. data=[frame, frame, frame],
  37. compression="gzip",
  38. compression_opts=9,
  39. chunks=(1, ) + frame.shape)
  40. filter_mask, compressed_frame = frame_dataset.id.read_direct_chunk((0, 0))
  41. # No filter must be disabled
  42. self.assertEqual(filter_mask, 0)
  43. for i in range(dataset.shape[0]):
  44. filter_mask, data = dataset.id.read_direct_chunk((i, 0, 0))
  45. self.assertEqual(compressed_frame, data)
  46. # No filter must be disabled
  47. self.assertEqual(filter_mask, 0)
  48. def test_read_uncompressed_offsets(self):
  49. filename = self.mktemp().encode()
  50. frame = numpy.arange(16).reshape(4, 4)
  51. with h5py.File(filename, "w") as filehandle:
  52. dataset = filehandle.create_dataset("frame",
  53. maxshape=(1,) + frame.shape,
  54. shape=(1,) + frame.shape,
  55. dtype="f4",
  56. compression="gzip",
  57. compression_opts=9)
  58. # Write uncompressed data
  59. DISABLE_ALL_FILTERS = 0xFFFFFFFF
  60. dataset.id.write_direct_chunk((0, 0, 0), frame.tobytes(), filter_mask=DISABLE_ALL_FILTERS)
  61. # FIXME: Here we have to close the file and load it back else
  62. # a runtime error occurs:
  63. # RuntimeError: Can't get storage size of chunk (chunk storage is not allocated)
  64. with h5py.File(filename, "r") as filehandle:
  65. dataset = filehandle["frame"]
  66. filter_mask, compressed_frame = dataset.id.read_direct_chunk((0, 0, 0))
  67. # At least 1 filter is supposed to be disabled
  68. self.assertNotEqual(filter_mask, 0)
  69. self.assertEqual(compressed_frame, frame.tobytes())
  70. def test_read_write_chunk(self):
  71. filename = self.mktemp().encode()
  72. with h5py.File(filename, "w") as filehandle:
  73. # create a reference
  74. frame = numpy.arange(16).reshape(4, 4)
  75. frame_dataset = filehandle.create_dataset("source",
  76. data=frame,
  77. compression="gzip",
  78. compression_opts=9)
  79. # configure an empty dataset
  80. filter_mask, compressed_frame = frame_dataset.id.read_direct_chunk((0, 0))
  81. dataset = filehandle.create_dataset("created",
  82. shape=frame_dataset.shape,
  83. maxshape=frame_dataset.shape,
  84. chunks=frame_dataset.chunks,
  85. dtype=frame_dataset.dtype,
  86. compression="gzip",
  87. compression_opts=9)
  88. # copy the data
  89. dataset.id.write_direct_chunk((0, 0), compressed_frame, filter_mask=filter_mask)
  90. # checking
  91. with h5py.File(filename, "r") as filehandle:
  92. dataset = filehandle["created"][...]
  93. numpy.testing.assert_array_equal(dataset, frame)
  94. class TestReadDirectChunkToOut:
  95. def test_uncompressed_data(self, writable_file):
  96. ref_data = numpy.arange(16).reshape(4, 4)
  97. dataset = writable_file.create_dataset(
  98. make_name(), data=ref_data, chunks=ref_data.shape)
  99. out = bytearray(ref_data.nbytes)
  100. filter_mask, chunk = dataset.id.read_direct_chunk((0, 0), out=out)
  101. assert numpy.array_equal(
  102. numpy.frombuffer(out, dtype=ref_data.dtype).reshape(ref_data.shape),
  103. ref_data,
  104. )
  105. assert filter_mask == 0
  106. assert len(chunk) == ref_data.nbytes
  107. @pytest.mark.skipif(
  108. 'gzip' not in h5py.filters.encode,
  109. reason="DEFLATE is not installed",
  110. )
  111. def test_compressed_data(self, writable_file):
  112. ref_data = numpy.arange(16).reshape(4, 4)
  113. dataset = writable_file.create_dataset(
  114. make_name(),
  115. data=ref_data,
  116. chunks=ref_data.shape,
  117. compression="gzip",
  118. compression_opts=9,
  119. )
  120. chunk_info = dataset.id.get_chunk_info(0)
  121. out = bytearray(chunk_info.size)
  122. filter_mask, chunk = dataset.id.read_direct_chunk(
  123. chunk_info.chunk_offset,
  124. out=out,
  125. )
  126. assert filter_mask == chunk_info.filter_mask
  127. assert len(chunk) == chunk_info.size
  128. assert out == dataset.id.read_direct_chunk(chunk_info.chunk_offset)[1]
  129. def test_fail_buffer_too_small(self, writable_file):
  130. ref_data = numpy.arange(16).reshape(4, 4)
  131. dataset = writable_file.create_dataset(
  132. make_name(), data=ref_data, chunks=ref_data.shape)
  133. out = bytearray(ref_data.nbytes // 2)
  134. with pytest.raises(ValueError):
  135. dataset.id.read_direct_chunk((0, 0), out=out)
  136. def test_fail_buffer_readonly(self, writable_file):
  137. ref_data = numpy.arange(16).reshape(4, 4)
  138. dataset = writable_file.create_dataset(
  139. make_name(), data=ref_data, chunks=ref_data.shape)
  140. out = bytes(ref_data.nbytes)
  141. with pytest.raises(BufferError):
  142. dataset.id.read_direct_chunk((0, 0), out=out)
  143. def test_fail_buffer_not_contiguous(self, writable_file):
  144. ref_data = numpy.arange(16).reshape(4, 4)
  145. dataset = writable_file.create_dataset(
  146. make_name(), data=ref_data, chunks=ref_data.shape)
  147. array = numpy.empty(ref_data.shape + (2,), dtype=ref_data.dtype)
  148. out = array[:, :, ::2] # Array is not contiguous
  149. with pytest.raises(ValueError):
  150. dataset.id.read_direct_chunk((0, 0), out=out)