| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
- import math
- from dataclasses import dataclass
- from typing import Any, Optional
- from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
- from ray.data.block import Block, BlockAccessor, DataBatch
- from ray.data.context import MAX_SAFE_BLOCK_SIZE_FACTOR
- @dataclass
- class OutputBlockSizeOption:
- target_max_block_size: Optional[int] = None
- target_num_rows_per_block: Optional[int] = None
- disable_block_shaping: bool = False
- def __post_init__(self):
- if (
- self.target_max_block_size is None
- and self.target_num_rows_per_block is None
- and not self.disable_block_shaping
- ):
- raise ValueError(
- "Either `target_max_block_size` or `target_num_rows_per_block` "
- "must be specified"
- )
- @classmethod
- def of(
- cls,
- target_max_block_size: Optional[int] = None,
- target_num_rows_per_block: Optional[int] = None,
- disable_block_shaping: bool = False,
- ) -> Optional["OutputBlockSizeOption"]:
- if (
- target_max_block_size is None
- and target_num_rows_per_block is None
- and not disable_block_shaping
- ):
- # In case
- # - Both target_max_block_size and target_num_rows_per_block are None and
- # - disable_block_shaping is False
- #
- # Buffer won't be yielding incrementally, instead producing just a single block.
- return None
- else:
- return OutputBlockSizeOption(
- target_max_block_size=target_max_block_size,
- target_num_rows_per_block=target_num_rows_per_block,
- disable_block_shaping=disable_block_shaping,
- )
- class BlockOutputBuffer:
- """Generates output blocks of a given size or number of rows given a stream of
- inputs.
- This class is used to turn a stream of items / blocks of arbitrary size
- into a stream of blocks of target max block size or
- target max rows per block. The caller should check ``has_next()`` after each
- ``add()`` call, and call ``next()`` to get the next block when ``has_next()``
- returns True.
- When all items have been added, the caller must call ``finalize()`` and
- then check ``has_next()`` one last time.
- Examples:
- >>> from ray.data._internal.output_buffer import BlockOutputBuffer
- >>> udf = ... # doctest: +SKIP
- >>> generator = ... # doctest: +SKIP
- >>> # Yield a stream of output blocks.
- >>> output_block_size_option = OutputBlockSizeOption(target_max_block_size=500 * 1024 * 1024) # doctest: +SKIP
- >>> output = BlockOutputBuffer(output_block_size_option) # doctest: +SKIP
- >>> for item in generator(): # doctest: +SKIP
- ... output.add(item) # doctest: +SKIP
- ... if output.has_next(): # doctest: +SKIP
- ... yield output.next() # doctest: +SKIP
- >>> output.finalize() # doctest: +SKIP
- >>> if output.has_next() # doctest: +SKIP
- ... yield output.next() # doctest: +SKIP
- """
- def __init__(self, output_block_size_option: Optional[OutputBlockSizeOption]):
- self._output_block_size_option = output_block_size_option
- self._buffer = DelegatingBlockBuilder()
- self._finalized = False
- self._has_yielded_blocks = False
- def add(self, item: Any) -> None:
- """Add a single item to this output buffer."""
- assert not self._finalized
- self._buffer.add(item)
- def add_batch(self, batch: DataBatch) -> None:
- """Add a data batch to this output buffer."""
- assert not self._finalized
- self._buffer.add_batch(batch)
- def add_block(self, block: Block) -> None:
- """Add a data block to this output buffer."""
- assert not self._finalized
- self._buffer.add_block(block)
- def finalize(self) -> None:
- """Must be called once all items have been added."""
- assert not self._finalized
- self._finalized = True
- def _exceeded_buffer_row_limit(self) -> bool:
- if self._output_block_size_option.disable_block_shaping:
- return False
- return (
- self._max_num_rows_per_block() is not None
- and self._buffer.num_rows() > self._max_num_rows_per_block()
- )
- def _exceeded_buffer_size_limit(self) -> bool:
- if self._output_block_size_option.disable_block_shaping:
- return False
- return (
- self._max_bytes_per_block() is not None
- and self._buffer.get_estimated_memory_usage() > self._max_bytes_per_block()
- )
- def _max_num_rows_per_block(self) -> Optional[int]:
- if self._output_block_size_option is None:
- return None
- if self._output_block_size_option.disable_block_shaping:
- return None
- return self._output_block_size_option.target_num_rows_per_block
- def _max_bytes_per_block(self) -> Optional[int]:
- if self._output_block_size_option is None:
- return None
- if self._output_block_size_option.disable_block_shaping:
- return None
- return self._output_block_size_option.target_max_block_size
- def has_next(self) -> bool:
- """Returns true when a complete output block is produced."""
- # TODO remove emitting empty blocks
- if self._finalized:
- return not self._has_yielded_blocks or self._buffer.num_rows() > 0
- elif self._output_block_size_option is None:
- # NOTE: When block sizing is disabled, buffer won't be producing
- # incrementally, until the whole sequence is ingested. This
- # is required to align it with semantic of producing 1 block
- # from 1 block of the input
- return False
- elif self._output_block_size_option.disable_block_shaping:
- # When block shaping is disabled, produce blocks immediately
- return self._buffer.num_rows() > 0
- return self._exceeded_buffer_row_limit() or self._exceeded_buffer_size_limit()
- def _exceeded_block_size_slice_limit(self, block: BlockAccessor) -> bool:
- # Slice a block to respect the target max block size. We only do this if we are
- # more than 50% above the target block size, because this ensures that the last
- # block produced will be at least half the target block size.
- return (
- self._max_bytes_per_block() is not None
- and block.size_bytes()
- >= MAX_SAFE_BLOCK_SIZE_FACTOR * self._max_bytes_per_block()
- )
- def _exceeded_block_row_slice_limit(self, block: BlockAccessor) -> bool:
- # Slice a block to respect the target max rows per block. We only do this if we
- # are more than 50% above the target rows per block, because this ensures that
- # the last block produced will be at least half the target row count.
- return (
- self._max_num_rows_per_block() is not None
- and block.num_rows() > self._max_num_rows_per_block()
- )
- def next(self) -> Block:
- """Returns the next complete output block."""
- assert self.has_next()
- block = self._buffer.build()
- accessor = BlockAccessor.for_block(block)
- block_remainder = None
- target_num_rows = None
- if self._exceeded_block_row_slice_limit(accessor):
- target_num_rows = self._max_num_rows_per_block()
- elif self._exceeded_block_size_slice_limit(accessor):
- assert accessor.num_rows() > 0, "Block may not be empty"
- num_bytes_per_row = accessor.size_bytes() / accessor.num_rows()
- target_num_rows = max(
- 1, math.ceil(self._max_bytes_per_block() / num_bytes_per_row)
- )
- if target_num_rows is not None and target_num_rows < accessor.num_rows():
- block = accessor.slice(0, target_num_rows, copy=False)
- block_remainder = accessor.slice(
- target_num_rows, accessor.num_rows(), copy=False
- )
- self._buffer = DelegatingBlockBuilder()
- if block_remainder is not None:
- self._buffer.add_block(block_remainder)
- self._has_yielded_blocks = True
- return block
|