# output_buffer.py (8.1 KB)

  1. import math
  2. from dataclasses import dataclass
  3. from typing import Any, Optional
  4. from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
  5. from ray.data.block import Block, BlockAccessor, DataBatch
  6. from ray.data.context import MAX_SAFE_BLOCK_SIZE_FACTOR
  7. @dataclass
  8. class OutputBlockSizeOption:
  9. target_max_block_size: Optional[int] = None
  10. target_num_rows_per_block: Optional[int] = None
  11. disable_block_shaping: bool = False
  12. def __post_init__(self):
  13. if (
  14. self.target_max_block_size is None
  15. and self.target_num_rows_per_block is None
  16. and not self.disable_block_shaping
  17. ):
  18. raise ValueError(
  19. "Either `target_max_block_size` or `target_num_rows_per_block` "
  20. "must be specified"
  21. )
  22. @classmethod
  23. def of(
  24. cls,
  25. target_max_block_size: Optional[int] = None,
  26. target_num_rows_per_block: Optional[int] = None,
  27. disable_block_shaping: bool = False,
  28. ) -> Optional["OutputBlockSizeOption"]:
  29. if (
  30. target_max_block_size is None
  31. and target_num_rows_per_block is None
  32. and not disable_block_shaping
  33. ):
  34. # In case
  35. # - Both target_max_block_size and target_num_rows_per_block are None and
  36. # - disable_block_shaping is False
  37. #
  38. # Buffer won't be yielding incrementally, instead producing just a single block.
  39. return None
  40. else:
  41. return OutputBlockSizeOption(
  42. target_max_block_size=target_max_block_size,
  43. target_num_rows_per_block=target_num_rows_per_block,
  44. disable_block_shaping=disable_block_shaping,
  45. )
  46. class BlockOutputBuffer:
  47. """Generates output blocks of a given size or number of rows given a stream of
  48. inputs.
  49. This class is used to turn a stream of items / blocks of arbitrary size
  50. into a stream of blocks of target max block size or
  51. target max rows per block. The caller should check ``has_next()`` after each
  52. ``add()`` call, and call ``next()`` to get the next block when ``has_next()``
  53. returns True.
  54. When all items have been added, the caller must call ``finalize()`` and
  55. then check ``has_next()`` one last time.
  56. Examples:
  57. >>> from ray.data._internal.output_buffer import BlockOutputBuffer
  58. >>> udf = ... # doctest: +SKIP
  59. >>> generator = ... # doctest: +SKIP
  60. >>> # Yield a stream of output blocks.
  61. >>> output_block_size_option = OutputBlockSizeOption(target_max_block_size=500 * 1024 * 1024) # doctest: +SKIP
  62. >>> output = BlockOutputBuffer(output_block_size_option) # doctest: +SKIP
  63. >>> for item in generator(): # doctest: +SKIP
  64. ... output.add(item) # doctest: +SKIP
  65. ... if output.has_next(): # doctest: +SKIP
  66. ... yield output.next() # doctest: +SKIP
  67. >>> output.finalize() # doctest: +SKIP
  68. >>> if output.has_next() # doctest: +SKIP
  69. ... yield output.next() # doctest: +SKIP
  70. """
  71. def __init__(self, output_block_size_option: Optional[OutputBlockSizeOption]):
  72. self._output_block_size_option = output_block_size_option
  73. self._buffer = DelegatingBlockBuilder()
  74. self._finalized = False
  75. self._has_yielded_blocks = False
  76. def add(self, item: Any) -> None:
  77. """Add a single item to this output buffer."""
  78. assert not self._finalized
  79. self._buffer.add(item)
  80. def add_batch(self, batch: DataBatch) -> None:
  81. """Add a data batch to this output buffer."""
  82. assert not self._finalized
  83. self._buffer.add_batch(batch)
  84. def add_block(self, block: Block) -> None:
  85. """Add a data block to this output buffer."""
  86. assert not self._finalized
  87. self._buffer.add_block(block)
  88. def finalize(self) -> None:
  89. """Must be called once all items have been added."""
  90. assert not self._finalized
  91. self._finalized = True
  92. def _exceeded_buffer_row_limit(self) -> bool:
  93. if self._output_block_size_option.disable_block_shaping:
  94. return False
  95. return (
  96. self._max_num_rows_per_block() is not None
  97. and self._buffer.num_rows() > self._max_num_rows_per_block()
  98. )
  99. def _exceeded_buffer_size_limit(self) -> bool:
  100. if self._output_block_size_option.disable_block_shaping:
  101. return False
  102. return (
  103. self._max_bytes_per_block() is not None
  104. and self._buffer.get_estimated_memory_usage() > self._max_bytes_per_block()
  105. )
  106. def _max_num_rows_per_block(self) -> Optional[int]:
  107. if self._output_block_size_option is None:
  108. return None
  109. if self._output_block_size_option.disable_block_shaping:
  110. return None
  111. return self._output_block_size_option.target_num_rows_per_block
  112. def _max_bytes_per_block(self) -> Optional[int]:
  113. if self._output_block_size_option is None:
  114. return None
  115. if self._output_block_size_option.disable_block_shaping:
  116. return None
  117. return self._output_block_size_option.target_max_block_size
  118. def has_next(self) -> bool:
  119. """Returns true when a complete output block is produced."""
  120. # TODO remove emitting empty blocks
  121. if self._finalized:
  122. return not self._has_yielded_blocks or self._buffer.num_rows() > 0
  123. elif self._output_block_size_option is None:
  124. # NOTE: When block sizing is disabled, buffer won't be producing
  125. # incrementally, until the whole sequence is ingested. This
  126. # is required to align it with semantic of producing 1 block
  127. # from 1 block of the input
  128. return False
  129. elif self._output_block_size_option.disable_block_shaping:
  130. # When block shaping is disabled, produce blocks immediately
  131. return self._buffer.num_rows() > 0
  132. return self._exceeded_buffer_row_limit() or self._exceeded_buffer_size_limit()
  133. def _exceeded_block_size_slice_limit(self, block: BlockAccessor) -> bool:
  134. # Slice a block to respect the target max block size. We only do this if we are
  135. # more than 50% above the target block size, because this ensures that the last
  136. # block produced will be at least half the target block size.
  137. return (
  138. self._max_bytes_per_block() is not None
  139. and block.size_bytes()
  140. >= MAX_SAFE_BLOCK_SIZE_FACTOR * self._max_bytes_per_block()
  141. )
  142. def _exceeded_block_row_slice_limit(self, block: BlockAccessor) -> bool:
  143. # Slice a block to respect the target max rows per block. We only do this if we
  144. # are more than 50% above the target rows per block, because this ensures that
  145. # the last block produced will be at least half the target row count.
  146. return (
  147. self._max_num_rows_per_block() is not None
  148. and block.num_rows() > self._max_num_rows_per_block()
  149. )
  150. def next(self) -> Block:
  151. """Returns the next complete output block."""
  152. assert self.has_next()
  153. block = self._buffer.build()
  154. accessor = BlockAccessor.for_block(block)
  155. block_remainder = None
  156. target_num_rows = None
  157. if self._exceeded_block_row_slice_limit(accessor):
  158. target_num_rows = self._max_num_rows_per_block()
  159. elif self._exceeded_block_size_slice_limit(accessor):
  160. assert accessor.num_rows() > 0, "Block may not be empty"
  161. num_bytes_per_row = accessor.size_bytes() / accessor.num_rows()
  162. target_num_rows = max(
  163. 1, math.ceil(self._max_bytes_per_block() / num_bytes_per_row)
  164. )
  165. if target_num_rows is not None and target_num_rows < accessor.num_rows():
  166. block = accessor.slice(0, target_num_rows, copy=False)
  167. block_remainder = accessor.slice(
  168. target_num_rows, accessor.num_rows(), copy=False
  169. )
  170. self._buffer = DelegatingBlockBuilder()
  171. if block_remainder is not None:
  172. self._buffer.add_block(block_remainder)
  173. self._has_yielded_blocks = True
  174. return block