batcher.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. import warnings
  2. from typing import Optional
  3. from ray.data._internal.arrow_block import ArrowBlockAccessor
  4. from ray.data._internal.arrow_ops import transform_pyarrow
  5. from ray.data._internal.arrow_ops.transform_pyarrow import try_combine_chunked_columns
  6. from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
  7. from ray.data._internal.execution.util import memory_string
  8. from ray.data._internal.util import get_total_obj_store_mem_on_node
  9. from ray.data.block import Block, BlockAccessor
  10. from ray.util import log_once
  # Compaction of the shuffle buffer is deferred until the buffer has grown to this
  # multiple of the minimum shuffle buffer size. A value of 1 keeps memory usage as
  # low as possible but compacts frequently; larger values compact less often at the
  # cost of holding more rows in memory.
  SHUFFLE_BUFFER_COMPACTION_RATIO: float = 1.5
  class BatcherInterface:
      """Common interface for objects that buffer blocks and hand out batches.

      Every method here raises ``NotImplementedError``; concrete batchers are
      expected to override all of them.
      """

      def add(self, block: Block):
          """Append a block to the batcher's block buffer.

          Args:
              block: The block to buffer.
          """
          raise NotImplementedError

      def done_adding(self) -> bool:
          """Signal to the batcher that no further blocks will be added."""
          raise NotImplementedError

      def has_batch(self) -> bool:
          """Report whether at least one full batch is available."""
          raise NotImplementedError

      def has_any(self) -> bool:
          """Report whether the batcher currently holds any data."""
          raise NotImplementedError

      def next_batch(self) -> Block:
          """Fetch the next batch from the block buffer.

          Returns:
              The batch, represented as a Block.
          """
          raise NotImplementedError
  class Batcher(BatcherInterface):
      """Chunks blocks into batches.

      Blocks are appended to an internal list via ``add()``; ``next_batch()``
      then carves ``batch_size`` rows off the front of that list, keeping any
      unused remainder for the following call.
      """

      # Implementation Note: When there are multiple batches per block, this batcher
      # will slice off and return each batch and add the remaining block back to the
      # buffer instead of optimally slicing and returning all batches from the block
      # at once. This will result in extra (and nested) block slicing. However, since
      # slices are zero-copy views, we sacrifice what should be a small performance
      # hit for better readability.

      def __init__(self, batch_size: Optional[int], ensure_copy: bool = False):
          """Construct a batcher that yields batches of ``batch_size`` rows.

          Args:
              batch_size: The size of batches to yield. If None, each buffered
                  block is returned as-is as a single batch.
              ensure_copy: Whether batches are always copied from the underlying
                  base blocks (not zero-copy views).
          """
          self._batch_size = batch_size
          # Blocks waiting to be turned into batches, in arrival order.
          self._buffer = []
          # Total number of rows held across all blocks in ``_buffer``.
          self._buffer_size = 0
          self._done_adding = False
          self._ensure_copy = ensure_copy

      def add(self, block: Block):
          """Add a block to the block buffer.

          Empty blocks are ignored and never enter the buffer.

          Args:
              block: Block to add to the block buffer.
          """
          # Resolve the accessor and row count once instead of once per use.
          num_rows = BlockAccessor.for_block(block).num_rows()
          if num_rows > 0:
              self._buffer.append(block)
              self._buffer_size += num_rows

      def done_adding(self) -> bool:
          """Indicate to the batcher that no more blocks will be added to it."""
          # NOTE: the ``bool`` annotation mirrors BatcherInterface; nothing is
          # actually returned.
          self._done_adding = True

      def has_batch(self) -> bool:
          """Whether this Batcher has any full batches.

          With ``batch_size=None`` any buffered data counts as a full batch.
          """
          return self.has_any() and (
              self._batch_size is None or self._buffer_size >= self._batch_size
          )

      def has_any(self) -> bool:
          """Whether this Batcher has any data."""
          return self._buffer_size > 0

      def next_batch(self) -> Block:
          """Get the next batch from the block buffer.

          A full batch is returned when one is available. Once ``done_adding()``
          has been called, a final batch with fewer than ``batch_size`` rows may
          be returned.

          Returns:
              A batch represented as a Block.
          """
          assert self.has_batch() or (self._done_adding and self.has_any())

          needs_copy = self._ensure_copy

          # If no batch size, short-circuit: the single buffered block is the batch.
          if self._batch_size is None:
              assert len(self._buffer) == 1
              block = self._buffer[0]
              if needs_copy:
                  # Copy block if needing to ensure fresh batch copy.
                  accessor = BlockAccessor.for_block(block)
                  block = accessor.slice(0, accessor.num_rows(), copy=True)
              self._buffer = []
              self._buffer_size = 0
              return block

          output = DelegatingBlockBuilder()
          leftover = []
          # Number of rows still missing from the batch being assembled.
          needed = self._batch_size
          for block in self._buffer:
              accessor = BlockAccessor.for_block(block)
              if needed <= 0:
                  # We already have a full batch, so add this block to
                  # the leftovers.
                  leftover.append(block)
              elif accessor.num_rows() <= needed:
                  # The whole block fits into the batch.
                  output.add_block(accessor.to_block())
                  needed -= accessor.num_rows()
              else:
                  # Try de-fragmenting table in case its columns
                  # have too many chunks (potentially hindering performance of
                  # subsequent slicing operation)
                  if isinstance(accessor, ArrowBlockAccessor):
                      accessor = BlockAccessor.for_block(
                          transform_pyarrow.try_combine_chunked_columns(block)
                      )
                  # We only need part of the block to fill out a batch.
                  output.add_block(accessor.slice(0, needed, copy=False))
                  # Add the rest of the block to the leftovers.
                  leftover.append(
                      accessor.slice(needed, accessor.num_rows(), copy=False)
                  )
                  needed = 0

          # Move the leftovers into the block buffer so they're the first
          # blocks consumed on the next batch extraction.
          self._buffer = leftover
          # Deduct only the rows that were actually emitted. For the final, short
          # batch ``needed`` is still positive, and subtracting the full batch size
          # would push the row counter below zero.
          self._buffer_size -= self._batch_size - needed

          needs_copy = needs_copy and not output.will_build_yield_copy()
          batch = output.build()
          if needs_copy:
              # Need to ensure that the batch is a fresh copy.
              batch_accessor = BlockAccessor.for_block(batch)
              batch = batch_accessor.slice(0, batch_accessor.num_rows(), copy=True)
          return batch
  class ShufflingBatcher(BatcherInterface):
      """Chunks blocks into shuffled batches, using a local in-memory shuffle buffer."""

      # Implementation Note:
      #
      # This shuffling batcher lazily builds a shuffle buffer from added blocks, and
      # once a batch is requested via .next_batch(), it concatenates the blocks into a
      # concrete shuffle buffer and randomly shuffles the entire buffer.
      #
      # Adding of more blocks can be intermixed with retrieving batches, but it should
      # be noted that we can end up performing two expensive operations on each
      # retrieval:
      #  1. Build added blocks into a concrete shuffle buffer.
      #  2. Shuffling the entire buffer.
      # To amortize the overhead of this process, we only shuffle the blocks after a
      # delay designated by SHUFFLE_BUFFER_COMPACTION_RATIO.
      #
      # Similarly, adding blocks is very cheap. Each added block will be appended to a
      # list, with concatenation of the underlying data delayed until the next batch
      # compaction.

      def __init__(
          self,
          batch_size: Optional[int],
          shuffle_buffer_min_size: int,
          shuffle_seed: Optional[int] = None,
      ):
          """Constructs a random-shuffling block batcher.

          Args:
              batch_size: Record batch size.
              shuffle_buffer_min_size: Minimum number of rows that must be in the
                  local in-memory shuffle buffer in order to yield a batch. When
                  there are no more rows to be added to the buffer, the number of
                  rows in the buffer *will* decrease below this value while yielding
                  the remaining batches, and the final batch may have less than
                  ``batch_size`` rows. Increasing this will improve the randomness
                  of the shuffle but may increase the latency to the first batch.
              shuffle_seed: The seed to use for the local random shuffle.

          Raises:
              ValueError: If ``batch_size`` is None.
          """
          if batch_size is None:
              raise ValueError("Must specify a batch_size if using a local shuffle.")
          self._batch_size = batch_size
          self._shuffle_seed = shuffle_seed
          if shuffle_buffer_min_size < batch_size:
              # Round it up internally to `batch_size` since our algorithm requires
              # it. This is harmless since it only offers extra randomization.
              shuffle_buffer_min_size = batch_size
          self._min_rows_to_yield_batch = shuffle_buffer_min_size
          self._min_rows_to_trigger_compaction = int(
              shuffle_buffer_min_size * SHUFFLE_BUFFER_COMPACTION_RATIO
          )
          # Uncompacted buffer: blocks added since the last compaction.
          self._builder = DelegatingBlockBuilder()
          # Compacted (materialized and shuffled) buffer; None until first built.
          self._shuffle_buffer: Optional[Block] = None
          # Offset of the next unyielded row inside ``_shuffle_buffer``.
          self._batch_head = 0
          self._done_adding = False
          self._total_object_store_nbytes = get_total_obj_store_mem_on_node()
          # Running totals over every non-empty block ever added; used to estimate
          # the average row size.
          self._total_num_rows_added = 0
          self._total_nbytes_added = 0

      def add(self, block: Block):
          """Add a block to the shuffle buffer.

          Empty blocks are ignored and never enter the buffer.

          Args:
              block: Block to add to the shuffle buffer.
          """
          # Because Arrow tables are memory mapped, blocks in the builder reside in
          # object store memory and not local heap memory. So, if you specify a large
          # buffer size and there isn't enough object store memory on the node, you
          # encounter spilling.
          #
          # The estimate is a computed property, so read it once and reuse it.
          estimated_nbytes = self._estimated_min_nbytes_in_buffers
          if (
              estimated_nbytes is not None
              and estimated_nbytes > self._total_object_store_nbytes
              and log_once("shuffle_buffer_mem_warning")
          ):
              warnings.warn(
                  "The node you're iterating on has "
                  f"{memory_string(self._total_object_store_nbytes)} object "
                  "store memory, but the shuffle buffer is estimated to use "
                  f"{memory_string(estimated_nbytes)}. If you don't "
                  f"decrease the shuffle buffer size from "
                  f"{self._min_rows_to_yield_batch} rows, you might encounter spilling."
              )

          block_accessor = BlockAccessor.for_block(block)
          num_rows = block_accessor.num_rows()
          if num_rows > 0:
              self._builder.add_block(block)
              self._total_num_rows_added += num_rows
              self._total_nbytes_added += block_accessor.size_bytes()

      @property
      def _average_row_nbytes(self) -> Optional[int]:
          """Return the average number of bytes per row added to this batcher.

          Returns None while no rows have been added yet.
          """
          if self._total_num_rows_added > 0:
              return self._total_nbytes_added // self._total_num_rows_added
          return None

      @property
      def _estimated_min_nbytes_in_buffers(self) -> Optional[int]:
          """Return the estimated minimum number of bytes across all buffers.

          This includes data in both the compacted and uncompacted buffers.
          Returns None while no rows have been added yet.
          """
          average_row_nbytes = self._average_row_nbytes
          if average_row_nbytes is None:
              return None
          return average_row_nbytes * self._min_rows_to_trigger_compaction

      def done_adding(self) -> bool:
          """Indicate to the batcher that no more blocks will be added to the batcher.

          No more blocks should be added to the batcher after calling this.
          """
          # NOTE: the ``bool`` annotation mirrors BatcherInterface; nothing is
          # actually returned.
          self._done_adding = True

      def has_any(self) -> bool:
          """Whether this batcher has any data."""
          return self._num_rows() > 0

      def has_batch(self) -> bool:
          """Whether this batcher has any batches."""
          num_rows = self._num_rows()

          if not self._done_adding:
              # Delay pulling of batches until the buffer is large enough in order to
              # amortize compaction overhead.
              return (
                  self._num_compacted_rows() >= self._min_rows_to_yield_batch
                  or num_rows - self._batch_size
                  >= self._min_rows_to_trigger_compaction
              )
          else:
              return num_rows >= self._batch_size

      def _num_rows(self) -> int:
          """Return the total number of rows that haven't been yielded yet.

          This includes rows in both the compacted and uncompacted buffers.
          """
          return self._num_compacted_rows() + self._num_uncompacted_rows()

      def _num_compacted_rows(self) -> int:
          """Return number of unyielded rows in the compacted (shuffle) buffer."""
          if self._shuffle_buffer is None:
              return 0
          # The size of the concrete (materialized) shuffle buffer, adjusting
          # for the batch head position, which also serves as a counter of the number
          # of already-yielded rows from the current concrete shuffle buffer.
          return max(
              0,
              BlockAccessor.for_block(self._shuffle_buffer).num_rows()
              - self._batch_head,
          )

      def _num_uncompacted_rows(self) -> int:
          """Return number of unyielded rows in the uncompacted buffer."""
          return self._builder.num_rows()

      def next_batch(self) -> Block:
          """Get the next shuffled batch from the shuffle buffer.

          If uncompacted rows are pending and either no more blocks will arrive or
          the compacted buffer has shrunk to the minimum size, the buffers are
          first merged and reshuffled. The batch is then sliced off at the batch
          head.

          Returns:
              A batch represented as a Block.
          """
          assert self.has_batch() or (self._done_adding and self.has_any())
          # Add rows in the builder to the shuffle buffer. Note that we delay
          # compaction as much as possible to amortize the concatenation overhead.
          # Compaction is only necessary when the materialized buffer size falls
          # below the min size.
          if self._num_uncompacted_rows() > 0 and (
              self._done_adding
              or self._num_compacted_rows() <= self._min_rows_to_yield_batch
          ):
              if self._shuffle_buffer is not None:
                  if self._batch_head > 0:
                      # Compact the materialized shuffle buffer by dropping the
                      # rows that were already yielded.
                      block = BlockAccessor.for_block(self._shuffle_buffer)
                      self._shuffle_buffer = block.slice(
                          self._batch_head, block.num_rows()
                      )
                  # Add the unyielded rows from the existing shuffle buffer.
                  self._builder.add_block(self._shuffle_buffer)
              # Build the new shuffle buffer.
              self._shuffle_buffer = self._builder.build()
              self._shuffle_buffer = BlockAccessor.for_block(
                  self._shuffle_buffer
              ).random_shuffle(self._shuffle_seed)
              if self._shuffle_seed is not None:
                  # Advance the seed so the next reshuffle does not reuse it.
                  self._shuffle_seed += 1
              if isinstance(
                  BlockAccessor.for_block(self._shuffle_buffer), ArrowBlockAccessor
              ):
                  self._shuffle_buffer = try_combine_chunked_columns(
                      self._shuffle_buffer
                  )
              # Reset the builder.
              self._builder = DelegatingBlockBuilder()
              self._batch_head = 0

          assert self._shuffle_buffer is not None
          buffer_accessor = BlockAccessor.for_block(self._shuffle_buffer)
          buffer_size = buffer_accessor.num_rows()
          # Truncate the batch to the rows that remain past the batch head, so that
          # neither the slice end nor the head ever runs beyond the buffer.
          batch_size = min(self._batch_size, buffer_size - self._batch_head)
          slice_start = self._batch_head
          self._batch_head += batch_size
          # Yield the shuffled batch.
          return buffer_accessor.slice(slice_start, self._batch_head)