| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369 |
- import warnings
- from typing import Optional
- from ray.data._internal.arrow_block import ArrowBlockAccessor
- from ray.data._internal.arrow_ops import transform_pyarrow
- from ray.data._internal.arrow_ops.transform_pyarrow import try_combine_chunked_columns
- from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
- from ray.data._internal.execution.util import memory_string
- from ray.data._internal.util import get_total_obj_store_mem_on_node
- from ray.data.block import Block, BlockAccessor
- from ray.util import log_once
- # Delay compaction until the shuffle buffer has reached this ratio over the min
- # shuffle buffer size. Setting this to 1 minimizes memory usage, at the cost of
- # frequent compactions. Setting this to higher values increases memory usage but
- # reduces compaction frequency.
- SHUFFLE_BUFFER_COMPACTION_RATIO = 1.5
- class BatcherInterface:
- def add(self, block: Block):
- """Add a block to the block buffer.
- Args:
- block: Block to add to the block buffer.
- """
- raise NotImplementedError()
- def done_adding(self) -> bool:
- """Indicate to the batcher that no more blocks will be added to the buffer."""
- raise NotImplementedError()
- def has_batch(self) -> bool:
- """Whether this Batcher has any full batches."""
- raise NotImplementedError()
- def has_any(self) -> bool:
- """Whether this Batcher has any data."""
- raise NotImplementedError()
- def next_batch(self) -> Block:
- """Get the next batch from the block buffer.
- Returns:
- A batch represented as a Block.
- """
- raise NotImplementedError()
- class Batcher(BatcherInterface):
- """Chunks blocks into batches."""
- # Implementation Note: When there are multiple batches per block, this batcher will
- # slice off and return each batch and add the remaining block back to the buffer
- # instead of optimally slicing and returning all batches from the block at once.
- # This will result in extra (and nested) block slicing. However, since slices are
- # zero-copy views, we sacrifice what should be a small performance hit for better
- # readability.
- def __init__(self, batch_size: Optional[int], ensure_copy: bool = False):
- """
- Construct a batcher that yields batches of batch_sizes rows.
- Args:
- batch_size: The size of batches to yield.
- ensure_copy: Whether batches are always copied from the underlying base
- blocks (not zero-copy views).
- """
- self._batch_size = batch_size
- self._buffer = []
- self._buffer_size = 0
- self._done_adding = False
- self._ensure_copy = ensure_copy
- def add(self, block: Block):
- """Add a block to the block buffer.
- Note empty block is not added to buffer.
- Args:
- block: Block to add to the block buffer.
- """
- if BlockAccessor.for_block(block).num_rows() > 0:
- self._buffer.append(block)
- self._buffer_size += BlockAccessor.for_block(block).num_rows()
- def done_adding(self) -> bool:
- """Indicate to the batcher that no more blocks will be added to the batcher."""
- self._done_adding = True
- def has_batch(self) -> bool:
- """Whether this Batcher has any full batches."""
- return self.has_any() and (
- self._batch_size is None or self._buffer_size >= self._batch_size
- )
- def has_any(self) -> bool:
- """Whether this Batcher has any data."""
- return self._buffer_size > 0
- def next_batch(self) -> Block:
- """Get the next batch from the block buffer.
- Returns:
- A batch represented as a Block.
- """
- assert self.has_batch() or (self._done_adding and self.has_any())
- needs_copy = self._ensure_copy
- # If no batch size, short-circuit.
- if self._batch_size is None:
- assert len(self._buffer) == 1
- block = self._buffer[0]
- if needs_copy:
- # Copy block if needing to ensure fresh batch copy.
- block = BlockAccessor.for_block(block)
- block = block.slice(0, block.num_rows(), copy=True)
- self._buffer = []
- self._buffer_size = 0
- return block
- output = DelegatingBlockBuilder()
- leftover = []
- needed = self._batch_size
- for block in self._buffer:
- accessor = BlockAccessor.for_block(block)
- if needed <= 0:
- # We already have a full batch, so add this block to
- # the leftovers.
- leftover.append(block)
- elif accessor.num_rows() <= needed:
- output.add_block(accessor.to_block())
- needed -= accessor.num_rows()
- else:
- # Try de-fragmenting table in case its columns
- # have too many chunks (potentially hindering performance of
- # subsequent slicing operation)
- if isinstance(accessor, ArrowBlockAccessor):
- accessor = BlockAccessor.for_block(
- transform_pyarrow.try_combine_chunked_columns(block)
- )
- # We only need part of the block to fill out a batch.
- output.add_block(accessor.slice(0, needed, copy=False))
- # Add the rest of the block to the leftovers.
- leftover.append(accessor.slice(needed, accessor.num_rows(), copy=False))
- needed = 0
- # Move the leftovers into the block buffer so they're the first
- # blocks consumed on the next batch extraction.
- self._buffer = leftover
- self._buffer_size -= self._batch_size
- needs_copy = needs_copy and not output.will_build_yield_copy()
- batch = output.build()
- if needs_copy:
- # Need to ensure that the batch is a fresh copy.
- batch = BlockAccessor.for_block(batch)
- batch = batch.slice(0, batch.num_rows(), copy=True)
- return batch
- class ShufflingBatcher(BatcherInterface):
- """Chunks blocks into shuffled batches, using a local in-memory shuffle buffer."""
- # Implementation Note:
- #
- # This shuffling batcher lazily builds a shuffle buffer from added blocks, and once
- # a batch is requested via .next_batch(), it concatenates the blocks into a concrete
- # shuffle buffer and randomly shuffles the entire buffer.
- #
- # Adding of more blocks can be intermixed with retrieving batches, but it should be
- # noted that we can end up performing two expensive operations on each retrieval:
- # 1. Build added blocks into a concrete shuffle buffer.
- # 2. Shuffling the entire buffer.
- # To amortize the overhead of this process, we only shuffle the blocks after a
- # delay designated by SHUFFLE_BUFFER_COMPACTION_RATIO.
- #
- # Similarly, adding blocks is very cheap. Each added block will be appended to a
- # list, with concatenation of the underlying data delayed until the next batch
- # compaction.
- def __init__(
- self,
- batch_size: Optional[int],
- shuffle_buffer_min_size: int,
- shuffle_seed: Optional[int] = None,
- ):
- """Constructs a random-shuffling block batcher.
- Args:
- batch_size: Record batch size.
- shuffle_buffer_min_size: Minimum number of rows that must be in the local
- in-memory shuffle buffer in order to yield a batch. When there are no
- more rows to be added to the buffer, the number of rows in the buffer
- *will* decrease below this value while yielding the remaining batches,
- and the final batch may have less than ``batch_size`` rows. Increasing
- this will improve the randomness of the shuffle but may increase the
- latency to the first batch.
- shuffle_seed: The seed to use for the local random shuffle.
- """
- if batch_size is None:
- raise ValueError("Must specify a batch_size if using a local shuffle.")
- self._batch_size = batch_size
- self._shuffle_seed = shuffle_seed
- if shuffle_buffer_min_size < batch_size:
- # Round it up internally to `batch_size` since our algorithm requires it.
- # This is harmless since it only offers extra randomization.
- shuffle_buffer_min_size = batch_size
- self._min_rows_to_yield_batch = shuffle_buffer_min_size
- self._min_rows_to_trigger_compaction = int(
- shuffle_buffer_min_size * SHUFFLE_BUFFER_COMPACTION_RATIO
- )
- self._builder = DelegatingBlockBuilder()
- self._shuffle_buffer: Block = None
- self._batch_head = 0
- self._done_adding = False
- self._total_object_store_nbytes = get_total_obj_store_mem_on_node()
- self._total_num_rows_added = 0
- self._total_nbytes_added = 0
- def add(self, block: Block):
- """Add a block to the shuffle buffer.
- Note empty block is not added to buffer.
- Args:
- block: Block to add to the shuffle buffer.
- """
- # Because Arrow tables are memory mapped, blocks in the builder reside in object
- # store memory and not local heap memory. So, if you specify a large buffer size
- # and there isn't enough object store memory on the node, you encounter
- # spilling.
- if (
- self._estimated_min_nbytes_in_buffers is not None
- and self._estimated_min_nbytes_in_buffers > self._total_object_store_nbytes
- and log_once("shuffle_buffer_mem_warning")
- ):
- warnings.warn(
- "The node you're iterating on has "
- f"{memory_string(self._total_object_store_nbytes)} object "
- "store memory, but the shuffle buffer is estimated to use "
- f"{memory_string(self._estimated_min_nbytes_in_buffers)}. If you don't "
- f"decrease the shuffle buffer size from "
- f"{self._min_rows_to_yield_batch} rows, you might encounter spilling."
- )
- block_accessor = BlockAccessor.for_block(block)
- if block_accessor.num_rows() > 0:
- self._builder.add_block(block)
- self._total_num_rows_added += block_accessor.num_rows()
- self._total_nbytes_added += block_accessor.size_bytes()
- @property
- def _average_row_nbytes(self) -> Optional[int]:
- """Return the average number of bytes per row added to this batcher."""
- return (
- self._total_nbytes_added // self._total_num_rows_added
- if self._total_num_rows_added > 0
- else None
- )
- @property
- def _estimated_min_nbytes_in_buffers(self) -> Optional[int]:
- """Return the estimated minimum number of bytes across all buffers.
- This includes data in both the compacted and uncompacted buffers.
- """
- if self._average_row_nbytes is None:
- return None
- return self._average_row_nbytes * self._min_rows_to_trigger_compaction
- def done_adding(self) -> bool:
- """Indicate to the batcher that no more blocks will be added to the batcher.
- No more blocks should be added to the batcher after calling this.
- """
- self._done_adding = True
- def has_any(self) -> bool:
- """Whether this batcher has any data."""
- return self._num_rows() > 0
- def has_batch(self) -> bool:
- """Whether this batcher has any batches."""
- num_rows = self._num_rows()
- if not self._done_adding:
- # Delay pulling of batches until the buffer is large enough in order to
- # amortize compaction overhead.
- return (
- self._num_compacted_rows() >= self._min_rows_to_yield_batch
- or num_rows - self._batch_size >= self._min_rows_to_trigger_compaction
- )
- else:
- return num_rows >= self._batch_size
- def _num_rows(self) -> int:
- """Return the total number of rows that haven't been yielded yet.
- This includes rows in both the compacted and uncompacted buffers.
- """
- return self._num_compacted_rows() + self._num_uncompacted_rows()
- def _num_compacted_rows(self) -> int:
- """Return number of unyielded rows in the compacted (shuffle) buffer."""
- if self._shuffle_buffer is None:
- return 0
- # The size of the concrete (materialized) shuffle buffer, adjusting
- # for the batch head position, which also serves as a counter of the number
- # of already-yielded rows from the current concrete shuffle buffer.
- return max(
- 0,
- BlockAccessor.for_block(self._shuffle_buffer).num_rows() - self._batch_head,
- )
- def _num_uncompacted_rows(self) -> int:
- """Return number of unyielded rows in the uncompacted buffer."""
- return self._builder.num_rows()
- def next_batch(self) -> Block:
- """Get the next shuffled batch from the shuffle buffer.
- Returns:
- A batch represented as a Block.
- """
- assert self.has_batch() or (self._done_adding and self.has_any())
- # Add rows in the builder to the shuffle buffer. Note that we delay compaction
- # as much as possible to amortize the concatenation overhead. Compaction is
- # only necessary when the materialized buffer size falls below the min size.
- if self._num_uncompacted_rows() > 0 and (
- self._done_adding
- or self._num_compacted_rows() <= self._min_rows_to_yield_batch
- ):
- if self._shuffle_buffer is not None:
- if self._batch_head > 0:
- # Compact the materialized shuffle buffer.
- block = BlockAccessor.for_block(self._shuffle_buffer)
- self._shuffle_buffer = block.slice(
- self._batch_head, block.num_rows()
- )
- # Add the unyielded rows from the existing shuffle buffer.
- self._builder.add_block(self._shuffle_buffer)
- # Build the new shuffle buffer.
- self._shuffle_buffer = self._builder.build()
- self._shuffle_buffer = BlockAccessor.for_block(
- self._shuffle_buffer
- ).random_shuffle(self._shuffle_seed)
- if self._shuffle_seed is not None:
- self._shuffle_seed += 1
- if isinstance(
- BlockAccessor.for_block(self._shuffle_buffer), ArrowBlockAccessor
- ):
- self._shuffle_buffer = try_combine_chunked_columns(self._shuffle_buffer)
- # Reset the builder.
- self._builder = DelegatingBlockBuilder()
- self._batch_head = 0
- assert self._shuffle_buffer is not None
- buffer_size = BlockAccessor.for_block(self._shuffle_buffer).num_rows()
- # Truncate the batch to the buffer size, if necessary.
- batch_size = min(self._batch_size, buffer_size)
- slice_start = self._batch_head
- self._batch_head += batch_size
- # Yield the shuffled batch.
- return BlockAccessor.for_block(self._shuffle_buffer).slice(
- slice_start, self._batch_head
- )
|