| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067 |
- import abc
- import time
- import warnings
- from typing import (
- TYPE_CHECKING,
- Any,
- Callable,
- Dict,
- Iterable,
- Iterator,
- List,
- Literal,
- Optional,
- Tuple,
- TypeVar,
- Union,
- )
- import numpy as np
- from ray.data._internal.block_batching.iter_batches import BatchIterator
- from ray.data._internal.execution.interfaces import RefBundle
- from ray.data._internal.logical.interfaces import LogicalPlan
- from ray.data._internal.logical.operators import InputData
- from ray.data._internal.plan import ExecutionPlan
- from ray.data._internal.stats import DatasetStats
- from ray.data.block import BlockAccessor, DataBatch, _apply_batch_format
- from ray.data.collate_fn import (
- ArrowBatchCollateFn,
- CollateFn,
- DefaultCollateFn,
- NumpyBatchCollateFn,
- PandasBatchCollateFn,
- TensorBatchReturnType,
- TensorBatchType,
- is_tensor_batch_type,
- )
- from ray.data.context import DataContext
- from ray.util.annotations import PublicAPI, RayDeprecationWarning
- if TYPE_CHECKING:
- import tensorflow as tf
- import torch
- from ray.data._internal.execution.streaming_executor import StreamingExecutor
- from ray.data.dataset import (
- CollatedData,
- MaterializedDataset,
- Schema,
- TensorFlowTensorBatchType,
- TorchBatchType,
- TorchDeviceType,
- )
T = TypeVar("T")


class _IterableFromIterator(Iterable[T]):
    """Re-iterable wrapper around a zero-argument iterator factory.

    Every ``iter()`` call invokes the stored factory again, so the wrapped
    data can be traversed any number of times. A bare iterator, by contrast,
    is exhausted after a single pass.
    """

    def __init__(self, iterator_gen: Callable[[], Iterator[T]]):
        """Remember the factory used to produce fresh iterators.

        Args:
            iterator_gen: Callable that returns a new iterator every time it
                is invoked, e.g. a generator function.
        """
        self.iterator_gen = iterator_gen

    def __iter__(self) -> Iterator[T]:
        fresh_iterator = self.iterator_gen()
        return fresh_iterator
- @PublicAPI
- class DataIterator(abc.ABC):
- """An iterator for reading records from a :class:`~Dataset`.
- For Datasets, each iteration call represents a complete read of all items in the
- Dataset.
- If using Ray Train, each trainer actor should get its own iterator by calling
- :meth:`ray.train.get_dataset_shard("train")
- <ray.train.get_dataset_shard>`.
- Examples:
- >>> import ray
- >>> ds = ray.data.range(5)
- >>> ds
- shape: (5, 1)
- ╭───────╮
- │ id │
- │ --- │
- │ int64 │
- ╰───────╯
- (Dataset isn't materialized)
- >>> ds.iterator()
- DataIterator(shape: (5, 1)
- ╭───────╮
- │ id │
- │ --- │
- │ int64 │
- ╰───────╯
- (Dataset isn't materialized))
- """
@abc.abstractmethod
def _to_ref_bundle_iterator(
    self,
) -> Tuple[
    Iterator[RefBundle], Optional[DatasetStats], bool, Optional["StreamingExecutor"]
]:
    """Produce the RefBundle source that ``iter_batches`` consumes.

    Subclasses must implement this. ``_iter_batches`` invokes it at the start
    of every pass over the data.

    Returns:
        A 4-tuple of:
            - an iterator over ``RefBundle`` objects;
            - a ``DatasetStats`` for recording iteration stats, or None;
            - a flag telling whether blocks may be cleared once read;
            - an optional ``StreamingExecutor`` to which prefetched byte
              counts are reported.
    """
@PublicAPI
def iter_batches(
    self,
    *,
    prefetch_batches: int = 1,
    batch_size: Optional[int] = 256,
    batch_format: Optional[str] = "default",
    drop_last: bool = False,
    local_shuffle_buffer_size: Optional[int] = None,
    local_shuffle_seed: Optional[int] = None,
) -> Iterable[DataBatch]:
    """Return a batched iterable over the dataset.

    Examples:
        >>> import ray
        >>> for batch in ray.data.range(
        ...     1000000
        ... ).iterator().iter_batches(): # doctest: +SKIP
        ...     print(batch) # doctest: +SKIP

    Time complexity: O(1)

    Args:
        prefetch_batches: The number of batches to fetch ahead of the current
            batch. If set to greater than 0, a separate threadpool will be used
            to fetch the objects to the local node, format the batches, and
            apply the collate_fn. Defaults to 1.
        batch_size: The number of rows in each batch, or None to use entire
            blocks as batches (blocks may contain different number of rows).
            The final batch may include fewer than ``batch_size`` rows if
            ``drop_last`` is ``False``. Defaults to 256.
        batch_format: Specify ``"default"`` to use the default block format
            (NumPy), ``"pandas"`` to select ``pandas.DataFrame``, "pyarrow" to
            select ``pyarrow.Table``, or ``"numpy"`` to select
            ``Dict[str, numpy.ndarray]``, or None to return the underlying block
            exactly as is with no additional formatting.
        drop_last: Whether to drop the last batch if it's incomplete.
        local_shuffle_buffer_size: If non-None, the data will be randomly
            shuffled using a local in-memory shuffle buffer, and this value will
            serve as the minimum number of rows that must be in the local
            in-memory shuffle buffer in order to yield a batch. When there are
            no more rows to add to the buffer, the remaining rows in the buffer
            will be drained.
        local_shuffle_seed: The seed to use for the local random shuffle.

    Returns:
        An iterable over record batches. Each ``iter()`` call starts a new
        pass over the data.
    """
    # `batch_size` is Optional: None is documented above as "use whole blocks".
    return self._iter_batches(
        prefetch_batches=prefetch_batches,
        batch_size=batch_size,
        batch_format=batch_format,
        drop_last=drop_last,
        local_shuffle_buffer_size=local_shuffle_buffer_size,
        local_shuffle_seed=local_shuffle_seed,
    )
def _create_batch_iterator(
    self,
    ref_bundles_iter: Iterator[RefBundle],
    prefetch_bytes_callback: Optional[Callable[[int], None]] = None,
    **kwargs,
) -> BatchIterator:
    """Build the ``BatchIterator`` that turns RefBundles into batches.

    Factored out of ``_iter_batches`` so construction happens in one place.
    All extra keyword arguments are forwarded to ``BatchIterator`` unchanged.

    Args:
        ref_bundles_iter: Source of RefBundles to batch.
        prefetch_bytes_callback: Optional callable receiving the number of
            prefetched bytes.
        **kwargs: Additional ``BatchIterator`` options.

    Returns:
        The constructed ``BatchIterator``.
    """
    # `prefetch_bytes_callback` is a named parameter, so it can never already
    # be present in `kwargs`; merging it in is therefore collision-free.
    kwargs["prefetch_bytes_callback"] = prefetch_bytes_callback
    return BatchIterator(ref_bundles_iter, **kwargs)
def _iter_batches(
    self,
    *,
    prefetch_batches: int = 1,
    batch_size: Optional[int] = 256,
    batch_format: Optional[str] = "default",
    drop_last: bool = False,
    local_shuffle_buffer_size: Optional[int] = None,
    local_shuffle_seed: Optional[int] = None,
    _collate_fn: Optional[Callable[[DataBatch], "CollatedData"]] = None,
    _finalize_fn: Optional[Callable[[Any], Any]] = None,
) -> Iterable[DataBatch]:
    """Shared batching implementation behind the public iteration methods.

    Args:
        prefetch_batches: Number of batches to prefetch; forwarded to the
            batch iterator.
        batch_size: Rows per batch, or None to use whole blocks as batches.
        batch_format: Requested batch format; normalized with
            ``_apply_batch_format`` once, before any iteration starts.
        drop_last: Whether to drop a trailing incomplete batch.
        local_shuffle_buffer_size: Minimum local shuffle-buffer size, or None
            to disable local shuffling.
        local_shuffle_seed: Seed for the local shuffle.
        _collate_fn: Optional hook forwarded to the batch iterator as
            ``collate_fn``.
        _finalize_fn: Optional hook forwarded to the batch iterator as
            ``finalize_fn`` (``iter_torch_batches`` uses it for device
            transfer).

    Returns:
        A re-iterable: every ``iter()`` call starts a fresh pass over the data
        via ``_to_ref_bundle_iterator``.
    """
    batch_format = _apply_batch_format(batch_format)

    def _create_iterator() -> Iterator[DataBatch]:
        time_start = time.perf_counter()
        # Iterate through the dataset from the start each time this generator
        # function is called. This allows multiple iterations of the dataset
        # without needing to explicitly call `iter_batches()` multiple times.
        (
            ref_bundles_iterator,
            stats,
            blocks_owned_by_consumer,
            executor,
        ) = self._to_ref_bundle_iterator()

        dataset_tag = self._get_dataset_tag()

        # Report prefetched bytes to the executor's resource manager, if an
        # executor was provided. `executor` is never rebound afterwards, so
        # capturing it directly in the closure is safe.
        prefetch_bytes_callback = None
        if executor is not None:

            def _report_prefetched_bytes(num_bytes: int) -> None:
                executor.set_external_consumer_bytes(num_bytes)

            prefetch_bytes_callback = _report_prefetched_bytes

        batch_iterator = self._create_batch_iterator(
            ref_bundles_iterator,
            stats=stats,
            dataset_tag=dataset_tag,
            clear_block_after_read=blocks_owned_by_consumer,
            batch_size=batch_size,
            batch_format=batch_format,
            drop_last=drop_last,
            collate_fn=_collate_fn,
            finalize_fn=_finalize_fn,
            shuffle_buffer_min_size=local_shuffle_buffer_size,
            shuffle_seed=local_shuffle_seed,
            prefetch_batches=prefetch_batches,
            prefetch_bytes_callback=prefetch_bytes_callback,
        )

        if stats:
            stats.iter_initialize_s.add(time.perf_counter() - time_start)

        yield from batch_iterator

        # Only reached when the consumer exhausts the iterator; stopping early
        # (e.g. `break`) skips recording the total iteration time.
        if stats:
            stats.iter_total_s.add(time.perf_counter() - time_start)

    return _IterableFromIterator(_create_iterator)
def _get_dataset_tag(self) -> str:
    """Return the tag passed to the batch iterator as ``dataset_tag``.

    The base implementation has no dataset name available and reports a
    fixed placeholder.
    """
    placeholder_tag = "unknown_dataset"
    return placeholder_tag
@PublicAPI
def iter_rows(self) -> Iterable[Dict[str, Any]]:
    """Return a local row iterable over the dataset.

    For tabular datasets (Arrow/Pandas blocks), each row is yielded as a dict.
    For non-tabular datasets, the raw row is yielded.

    Examples:
        >>> import ray
        >>> dataset = ray.data.range(10)
        >>> next(iter(dataset.iterator().iter_rows()))
        {'id': 0}

    Time complexity: O(1)

    Returns:
        An iterable over rows of the dataset. Each ``iter()`` call starts a new
        pass over the data.
    """
    # Pull whole, unformatted blocks; rows are extracted from them below.
    whole_blocks = self._iter_batches(
        batch_size=None, batch_format=None, prefetch_batches=1
    )

    def _row_generator():
        for raw_batch in whole_blocks:
            accessor = BlockAccessor.for_block(
                BlockAccessor.batch_to_block(raw_batch)
            )
            yield from accessor.iter_rows(public_row_format=True)

    return _IterableFromIterator(_row_generator)
@abc.abstractmethod
@PublicAPI
def stats(self) -> str:
    """Return a string describing execution timing information.

    Concrete iterators must implement this.
    """
@abc.abstractmethod
def schema(self) -> "Schema":
    """Return the schema of the dataset this iterator reads from.

    Concrete iterators must implement this.
    """
@abc.abstractmethod
def get_context(self) -> DataContext:
    """Return the ``DataContext`` associated with this iterator.

    Concrete iterators must implement this.
    """
@PublicAPI
def iter_torch_batches(
    self,
    *,
    prefetch_batches: int = 1,
    batch_size: Optional[int] = 256,
    dtypes: Optional[Union["torch.dtype", Dict[str, "torch.dtype"]]] = None,
    device: Union["TorchDeviceType", Literal["auto"]] = "auto",
    collate_fn: Optional[
        Union[Callable[[Dict[str, np.ndarray]], "CollatedData"], CollateFn]
    ] = None,
    drop_last: bool = False,
    local_shuffle_buffer_size: Optional[int] = None,
    local_shuffle_seed: Optional[int] = None,
    pin_memory: bool = False,
) -> Iterable["TorchBatchType"]:
    """Return a batched iterable of Torch Tensors over the dataset.

    This iterable yields a dictionary of column-tensors. If you are looking for
    more flexibility in the tensor conversion (e.g. casting dtypes) or the batch
    format, try using :meth:`~ray.data.DataIterator.iter_batches` directly.

    Examples:
        >>> import ray
        >>> for batch in ray.data.range(
        ...     12,
        ... ).iterator().iter_torch_batches(batch_size=4):
        ...     print(batch)
        {'id': tensor([0, 1, 2, 3])}
        {'id': tensor([4, 5, 6, 7])}
        {'id': tensor([ 8,  9, 10, 11])}

        Use the ``ArrowBatchCollateFn`` to customize how the tensor batch is
        created from an Arrow batch.

        >>> import pyarrow as pa
        >>> import torch
        >>> import ray
        >>> from ray.data.collate_fn import ArrowBatchCollateFn
        >>> class CustomArrowBatchCollateFn(ArrowBatchCollateFn):
        ...     def __call__(self, batch: pa.Table) -> torch.Tensor:
        ...         return torch.as_tensor(batch["col_1"].to_numpy() + 5)
        >>> iterator = ray.data.from_items([
        ...     {"col_1": 1, "col_2": 2},
        ...     {"col_1": 3, "col_2": 4}]).iterator()
        >>> for batch in iterator.iter_torch_batches(collate_fn=CustomArrowBatchCollateFn()):
        ...     print(batch)
        tensor([6, 8])

        Use the ``NumpyBatchCollateFn`` to customize how the tensor batch is
        created from a Numpy batch.

        >>> from typing import Dict
        >>> import numpy as np
        >>> import torch
        >>> import ray
        >>> from ray.data.collate_fn import NumpyBatchCollateFn
        >>> class CustomNumpyBatchCollateFn(NumpyBatchCollateFn):
        ...     def __call__(self, batch: Dict[str, np.ndarray]) -> torch.Tensor:
        ...         return torch.as_tensor(batch["col_1"] + 5)
        >>> iterator = ray.data.from_items([
        ...     {"col_1": 1, "col_2": 2},
        ...     {"col_1": 3, "col_2": 4}]).iterator()
        >>> for batch in iterator.iter_torch_batches(collate_fn=CustomNumpyBatchCollateFn()):
        ...     print(batch)
        tensor([6, 8])

        Use the ``PandasBatchCollateFn`` to customize how the tensor batch is
        created from a Pandas batch.

        >>> import pandas as pd
        >>> import torch
        >>> import ray
        >>> from ray.data.collate_fn import PandasBatchCollateFn
        >>> class CustomPandasBatchCollateFn(PandasBatchCollateFn):
        ...     def __call__(self, batch: pd.DataFrame) -> torch.Tensor:
        ...         return torch.as_tensor(batch["col_1"].to_numpy() + 5)
        >>> iterator = ray.data.from_items([
        ...     {"col_1": 1, "col_2": 2},
        ...     {"col_1": 3, "col_2": 4}]).iterator()
        >>> for batch in iterator.iter_torch_batches(collate_fn=CustomPandasBatchCollateFn()):
        ...     print(batch)
        tensor([6, 8])

    Time complexity: O(1)

    Args:
        prefetch_batches: The number of batches to fetch ahead of the current
            batch. If set to greater than 0, a separate threadpool will be used
            to fetch the objects to the local node, format the batches, and
            apply the collate_fn. Defaults to 1.
        batch_size: The number of rows in each batch, or None to use entire
            blocks as batches (blocks may contain different number of rows).
            The final batch may include fewer than ``batch_size`` rows if
            ``drop_last`` is ``False``. Defaults to 256.
        dtypes: The Torch dtype(s) for the created tensor(s); if None, the dtype
            will be inferred from the tensor data. You can't use this parameter
            with ``collate_fn``.
        device: The device on which the tensor should be placed. Defaults to
            "auto" which moves the tensors to the appropriate device when the
            Dataset is passed to Ray Train and ``collate_fn`` is not provided.
            Otherwise, defaults to CPU. You can't use this parameter with
            ``collate_fn``.
        collate_fn: [Alpha] A function to customize how data batches are
            collated before being passed to the model. This is useful for
            last-mile data formatting such as padding, masking, or packaging
            tensors into custom data structures. If not provided,
            `iter_torch_batches` automatically converts batches to
            `torch.Tensor`s and moves them to the device assigned to the
            current worker. The input to `collate_fn` may be:

            1. pyarrow.Table, where you should provide a callable class that
               subclasses `ArrowBatchCollateFn` (recommended for best
               performance). Note that you should use util function
               `arrow_batch_to_tensors` to convert the pyarrow.Table to a
               dictionary of non-contiguous tensor batches.
            2. Dict[str, np.ndarray], where you should provide a callable class
               that subclasses `NumpyBatchCollateFn`.
            3. pd.DataFrame, where you should provide a callable class that
               subclasses `PandasBatchCollateFn`.

            The output can be any type. If the output is a `TensorBatchType`,
            it will be automatically moved to the current worker's device. For
            other types, you must handle device transfer manually in your
            training loop.

            Note: This function is called in a multi-threaded context; avoid
            using thread-unsafe code.
        drop_last: Whether to drop the last batch if it's incomplete.
        local_shuffle_buffer_size: If non-None, the data will be randomly
            shuffled using a local in-memory shuffle buffer, and this value will
            serve as the minimum number of rows that must be in the local
            in-memory shuffle buffer in order to yield a batch. When there are
            no more rows to add to the buffer, the remaining rows in the buffer
            will be drained. This buffer size must be greater than or equal to
            ``batch_size``, and therefore ``batch_size`` must also be specified
            when using local shuffling.
        local_shuffle_seed: The seed to use for the local random shuffle.
        pin_memory: [Alpha] If True, copies the tensor to pinned memory. Note
            that `pin_memory` is only supported when using `DefaultCollateFn`.

    Returns:
        An iterable over Torch Tensor batches.

    Raises:
        ValueError: If ``collate_fn`` is combined with ``dtypes`` or a
            non-"auto" ``device``; if ``pin_memory`` is combined with a custom
            ``collate_fn``; or if ``collate_fn`` is not callable.
    """
    from ray.train.torch import get_device
    from ray.train.utils import _in_ray_train_worker

    if collate_fn is not None and (dtypes is not None or device != "auto"):
        # Adjacent literals are concatenated verbatim, so each piece must end
        # with a space to keep the words separated.
        raise ValueError(
            "collate_fn cannot be used with dtypes and device. "
            "You should manually move the output Torch tensors to the "
            "desired dtype and device outside of collate_fn."
        )

    if pin_memory and collate_fn is not None:
        raise ValueError(
            "pin_memory is only supported when using `DefaultCollateFn`."
        )

    if device == "auto":
        # Use the appropriate device for Ray Train, or fall back to CPU if
        # Ray Train is not being used.
        device = get_device() if _in_ray_train_worker() else "cpu"

    from ray.data.util.torch_utils import (
        move_tensors_to_device,
    )

    # The default finalize_fn handles the host to device data transfer.
    # This is executed in a 1-thread pool separately from collate_fn
    # to allow independent parallelism of these steps.
    def default_finalize_fn(
        batch: TensorBatchType,
    ) -> Union[TensorBatchReturnType, Any]:
        """Default finalize function for moving PyTorch tensors to device.

        If batch is of type `TensorBatchType`, it will be automatically moved
        to the current worker's device. For other types, you must handle device
        transfer manually in your training loop.

        Args:
            batch: Input batch to move to device.

        Returns:
            Batch with tensors moved to the target device.
                - If input is TensorBatchType, returns tensors moved to device
                - Otherwise returns the same type as input without moving
                  tensors to device.
        """
        if is_tensor_batch_type(batch):
            return move_tensors_to_device(batch, device=device)
        else:
            return batch

    if collate_fn is None:
        # The default collate_fn handles formatting and Tensor creation.
        # Here, we defer host to device data transfer to the subsequent
        # finalize_fn.
        collate_fn = DefaultCollateFn(
            dtypes=dtypes,
            device=device,
            pin_memory=pin_memory,
        )
        batch_format = "pyarrow"
    elif isinstance(collate_fn, ArrowBatchCollateFn):
        # The ArrowBatchCollateFn handles formatting and Tensor creation.
        # Here, we defer host to device data transfer to the subsequent
        # finalize_fn.
        batch_format = "pyarrow"
    elif isinstance(collate_fn, NumpyBatchCollateFn):
        batch_format = "numpy"
    elif isinstance(collate_fn, PandasBatchCollateFn):
        batch_format = "pandas"
    elif callable(collate_fn):
        batch_format = "numpy"
        warnings.warn(
            "Passing a function to `iter_torch_batches(collate_fn)` is "
            "deprecated in Ray 2.47. Please switch to using a callable class that "
            "inherits from `ArrowBatchCollateFn`, `NumpyBatchCollateFn`, or "
            "`PandasBatchCollateFn`.",
            RayDeprecationWarning,
        )
    else:
        raise ValueError(f"Unsupported collate function: {type(collate_fn)}")

    return self._iter_batches(
        prefetch_batches=prefetch_batches,
        batch_size=batch_size,
        batch_format=batch_format,
        drop_last=drop_last,
        local_shuffle_buffer_size=local_shuffle_buffer_size,
        local_shuffle_seed=local_shuffle_seed,
        _collate_fn=collate_fn,
        _finalize_fn=default_finalize_fn,
    )
def iter_tf_batches(
    self,
    *,
    prefetch_batches: int = 1,
    batch_size: Optional[int] = 256,
    dtypes: Optional[Union["tf.dtypes.DType", Dict[str, "tf.dtypes.DType"]]] = None,
    drop_last: bool = False,
    local_shuffle_buffer_size: Optional[int] = None,
    local_shuffle_seed: Optional[int] = None,
) -> Iterable["TensorFlowTensorBatchType"]:
    """Return a batched iterable of TensorFlow Tensors over the dataset.

    If the underlying dataset consists of a single column, this iterable
    yields single-tensor batches; otherwise, it yields a dictionary of
    column-tensors.

    .. tip::
        If you don't need the additional flexibility provided by this method,
        consider using :meth:`~ray.data.Dataset.to_tf` instead. It's easier
        to use.

    Examples:
        >>> import ray
        >>> for batch in ray.data.range( # doctest: +SKIP
        ...     12,
        ... ).iter_tf_batches(batch_size=4):
        ...     print(batch.shape) # doctest: +SKIP
        (4, 1)
        (4, 1)
        (4, 1)

    Time complexity: O(1)

    Args:
        prefetch_batches: The number of batches to fetch ahead of the current
            batch. If set to greater than 0, a separate threadpool will be used
            to fetch the objects to the local node, format the batches, and
            apply the collate_fn. Defaults to 1.
        batch_size: The number of rows in each batch, or None to use entire
            blocks as batches (blocks may contain different number of rows).
            The final batch may include fewer than ``batch_size`` rows if
            ``drop_last`` is ``False``. Defaults to 256.
        dtypes: The TensorFlow dtype(s) for the created tensor(s); if None, the
            dtype will be inferred from the tensor data.
        drop_last: Whether to drop the last batch if it's incomplete.
        local_shuffle_buffer_size: If non-None, the data will be randomly
            shuffled using a local in-memory shuffle buffer, and this value will
            serve as the minimum number of rows that must be in the local
            in-memory shuffle buffer in order to yield a batch. When there are
            no more rows to add to the buffer, the remaining rows in the buffer
            will be drained. This buffer size must be greater than or equal to
            ``batch_size``, and therefore ``batch_size`` must also be specified
            when using local shuffling.
        local_shuffle_seed: The seed to use for the local random shuffle.

    Returns:
        An iterator over TensorFlow Tensor batches (a single-pass ``map``
        object).
    """
    from ray.data._internal.utils.tensorflow_utils import (
        convert_ndarray_batch_to_tf_tensor_batch,
    )

    def _to_tf_batch(numpy_batch):
        return convert_ndarray_batch_to_tf_tensor_batch(numpy_batch, dtypes=dtypes)

    # Default batch format ("default" -> NumPy) feeds the ndarray converter.
    numpy_batches = self._iter_batches(
        prefetch_batches=prefetch_batches,
        batch_size=batch_size,
        drop_last=drop_last,
        local_shuffle_buffer_size=local_shuffle_buffer_size,
        local_shuffle_seed=local_shuffle_seed,
    )
    return map(_to_tf_batch, numpy_batches)
def to_torch(
    self,
    *,
    label_column: Optional[str] = None,
    feature_columns: Optional[
        Union[List[str], List[List[str]], Dict[str, List[str]]]
    ] = None,
    label_column_dtype: Optional["torch.dtype"] = None,
    feature_column_dtypes: Optional[
        Union["torch.dtype", List["torch.dtype"], Dict[str, "torch.dtype"]]
    ] = None,
    batch_size: int = 1,
    prefetch_batches: int = 1,
    drop_last: bool = False,
    local_shuffle_buffer_size: Optional[int] = None,
    local_shuffle_seed: Optional[int] = None,
    unsqueeze_label_tensor: bool = True,
    unsqueeze_feature_tensors: bool = True,
) -> "torch.utils.data.IterableDataset":
    """Return a Torch IterableDataset over this dataset.

    This is only supported for datasets convertible to Arrow records.
    It is recommended to use the returned ``IterableDataset`` directly
    instead of passing it into a torch ``DataLoader``.

    Each element in IterableDataset will be a tuple consisting of 2
    elements. The first item contains the feature tensor(s), and the
    second item is the label tensor. Those can take on different
    forms, depending on the specified arguments.

    For the features tensor (N is the ``batch_size`` and n, m, k
    are the number of features per tensor):

    * If ``feature_columns`` is a ``List[str]``, the features will be
      a tensor of shape (N, n), with columns corresponding to
      ``feature_columns``
    * If ``feature_columns`` is a ``List[List[str]]``, the features will be
      a list of tensors of shape [(N, m),...,(N, k)], with columns of each
      tensor corresponding to the elements of ``feature_columns``
    * If ``feature_columns`` is a ``Dict[str, List[str]]``, the features
      will be a dict of key-tensor pairs of shape
      {key1: (N, m),..., keyN: (N, k)}, with columns of each
      tensor corresponding to the value of ``feature_columns`` under the
      key.

    If ``unsqueeze_label_tensor=True`` (default), the label tensor will be
    of shape (N, 1). Otherwise, it will be of shape (N,).
    If ``label_column`` is specified as ``None``, then no column from the
    ``Dataset`` will be treated as the label, and the output label tensor
    will be ``None``.

    Note that you probably want to call ``.split()`` on this dataset if
    there are to be multiple Torch workers consuming the data.

    Time complexity: O(1)

    Args:
        label_column: The name of the column used as the
            label (second element of the output list). Can be None for
            prediction, in which case the second element of returned
            tuple will also be None.
        feature_columns: The names of the columns
            to use as the features. Can be a list of lists or
            a dict of string-list pairs for multi-tensor output.
            If None, then use all columns except the label column as
            the features.
        label_column_dtype: The torch dtype to
            use for the label column. If None, then automatically infer
            the dtype.
        feature_column_dtypes: The dtypes to use for the feature
            tensors. This should match the format of ``feature_columns``,
            or be a single dtype, in which case it will be applied to
            all tensors. If None, then automatically infer the dtype.
        batch_size: How many samples per batch to yield at a time.
            Defaults to 1.
        prefetch_batches: The number of batches to fetch ahead of the current
            batch. If set to greater than 0, a separate threadpool will be used
            to fetch the objects to the local node, format the batches, and
            apply the collate_fn. Defaults to 1.
        drop_last: Set to True to drop the last incomplete batch,
            if the dataset size is not divisible by the batch size. If
            False and the size of dataset is not divisible by the batch
            size, then the last batch will be smaller. Defaults to False.
        local_shuffle_buffer_size: If non-None, the data will be randomly
            shuffled using a local in-memory shuffle buffer, and this value will
            serve as the minimum number of rows that must be in the local
            in-memory shuffle buffer in order to yield a batch. When there are
            no more rows to add to the buffer, the remaining rows in the buffer
            will be drained. This buffer size must be greater than or equal to
            ``batch_size``, and therefore ``batch_size`` must also be specified
            when using local shuffling.
        local_shuffle_seed: The seed to use for the local random shuffle.
        unsqueeze_label_tensor: If set to True, the label tensor
            will be unsqueezed (reshaped to (N, 1)). Otherwise, it will
            be left as is, that is (N, ). In general, regression loss
            functions expect an unsqueezed tensor, while classification
            loss functions expect a squeezed one. Defaults to True.
        unsqueeze_feature_tensors: If set to True, the features tensors
            will be unsqueezed (reshaped to (N, 1)) before being concatenated
            into the final features tensor. Otherwise, they will be left as is,
            that is (N, ). Defaults to True.

    Returns:
        A torch IterableDataset.

    Raises:
        TypeError: If ``feature_column_dtypes`` is a per-column collection
            (not a single ``torch.dtype``) whose container type does not match
            ``feature_columns``, or if ``feature_columns`` is empty/None.
        ValueError: If dict keys / list lengths of ``feature_columns`` and
            ``feature_column_dtypes`` disagree, or a column list is empty.
    """
    import torch

    from ray.data._internal.torch_iterable_dataset import TorchIterableDataset
    from ray.data.util.torch_utils import convert_pandas_to_torch_tensor

    # If an empty collection is passed in, treat it the same as None
    if not feature_columns:
        feature_columns = None

    if feature_column_dtypes and not isinstance(feature_column_dtypes, torch.dtype):
        if feature_columns is None:
            # Per-column dtypes need explicit feature columns to line up with.
            # Without this guard the `feature_columns[0]` lookup below failed
            # with an opaque "'NoneType' object is not subscriptable" error.
            raise TypeError(
                "If `feature_columns` is None, `feature_column_dtypes` must be "
                f"None or `torch.dtype`, got {type(feature_column_dtypes)}."
            )
        if isinstance(feature_columns, dict):
            if not isinstance(feature_column_dtypes, dict):
                raise TypeError(
                    "If `feature_columns` is a dict, "
                    "`feature_column_dtypes` must be None, `torch.dtype`,"
                    f" or dict, got {type(feature_column_dtypes)}."
                )
            if set(feature_columns) != set(feature_column_dtypes):
                raise ValueError(
                    "`feature_columns` and `feature_column_dtypes` "
                    "must have the same keys."
                )
            if any(not subcolumns for subcolumns in feature_columns.values()):
                raise ValueError("column list may not be empty")
        elif isinstance(feature_columns[0], (list, tuple)):
            if not isinstance(feature_column_dtypes, (list, tuple)):
                raise TypeError(
                    "If `feature_columns` is a list of lists, "
                    "`feature_column_dtypes` must be None, `torch.dtype`,"
                    f" or a sequence, got {type(feature_column_dtypes)}."
                )
            if len(feature_columns) != len(feature_column_dtypes):
                raise ValueError(
                    "`feature_columns` and `feature_column_dtypes` "
                    "must have the same length."
                )
            if any(not subcolumns for subcolumns in feature_columns):
                raise ValueError("column list may not be empty")

    def make_generator():
        for batch in self._iter_batches(
            batch_size=batch_size,
            batch_format="pandas",
            prefetch_batches=prefetch_batches,
            drop_last=drop_last,
            local_shuffle_buffer_size=local_shuffle_buffer_size,
            local_shuffle_seed=local_shuffle_seed,
        ):
            if label_column:
                label_tensor = convert_pandas_to_torch_tensor(
                    batch,
                    [label_column],
                    label_column_dtype,
                    unsqueeze=unsqueeze_label_tensor,
                )
                # Drop the label so that, when `feature_columns` is None,
                # the remaining columns are used as features.
                batch.pop(label_column)
            else:
                label_tensor = None

            if isinstance(feature_columns, dict):
                features_tensor = {
                    key: convert_pandas_to_torch_tensor(
                        batch,
                        feature_columns[key],
                        (
                            feature_column_dtypes[key]
                            if isinstance(feature_column_dtypes, dict)
                            else feature_column_dtypes
                        ),
                        unsqueeze=unsqueeze_feature_tensors,
                    )
                    for key in feature_columns
                }
            else:
                features_tensor = convert_pandas_to_torch_tensor(
                    batch,
                    columns=feature_columns,
                    column_dtypes=feature_column_dtypes,
                    unsqueeze=unsqueeze_feature_tensors,
                )

            yield (features_tensor, label_tensor)

    return TorchIterableDataset(make_generator)
@PublicAPI
def to_tf(
    self,
    feature_columns: Union[str, List[str]],
    label_columns: Union[str, List[str]],
    *,
    additional_columns: Union[Optional[str], Optional[List[str]]] = None,
    prefetch_batches: int = 1,
    batch_size: int = 1,
    drop_last: bool = False,
    local_shuffle_buffer_size: Optional[int] = None,
    local_shuffle_seed: Optional[int] = None,
    feature_type_spec: Optional[
        Union["tf.TypeSpec", Dict[str, "tf.TypeSpec"]]
    ] = None,
    label_type_spec: Optional[
        Union["tf.TypeSpec", Dict[str, "tf.TypeSpec"]]
    ] = None,
    additional_type_spec: Union[
        Optional["tf.TypeSpec"], Optional[Dict[str, "tf.TypeSpec"]]
    ] = None,
) -> "tf.data.Dataset":
    """Return a TF Dataset over this dataset.

    .. warning::
        If your dataset contains ragged tensors, this method errors. To prevent
        errors, :ref:`resize your tensors <transforming_tensors>`.

    Examples:
        >>> import ray
        >>> ds = ray.data.read_csv(
        ...     "s3://anonymous@air-example-data/iris.csv"
        ... )
        >>> it = ds.iterator(); it
        DataIterator(Dataset(num_rows=?, schema=Unknown schema))

        If your model accepts a single tensor as input, specify a single feature column.

        >>> it.to_tf(feature_columns="sepal length (cm)", label_columns="target")
        <_OptionsDataset element_spec=(TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>

        If your model accepts a dictionary as input, specify a list of feature columns.

        >>> it.to_tf(["sepal length (cm)", "sepal width (cm)"], "target")
        <_OptionsDataset element_spec=({'sepal length (cm)': TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), 'sepal width (cm)': TensorSpec(shape=(None,), dtype=tf.float64, name='sepal width (cm)')}, TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>

        If your dataset contains multiple features but your model accepts a single
        tensor as input, combine features with
        :class:`~ray.data.preprocessors.Concatenator`.

        >>> from ray.data.preprocessors import Concatenator
        >>> columns_to_concat = ["sepal length (cm)", "sepal width (cm)", "petal length (cm)", "petal width (cm)"]
        >>> preprocessor = Concatenator(columns=columns_to_concat, output_column_name="features")
        >>> it = preprocessor.transform(ds).iterator()
        >>> it.to_tf("features", "target")
        <_OptionsDataset element_spec=(TensorSpec(shape=(None, 4), dtype=tf.float64, name='features'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>

        If your model accepts different types, shapes, or names of tensors as input, specify the type spec.
        If type specs are not specified, they are automatically inferred from the schema of the iterator.

        >>> import tensorflow as tf
        >>> it.to_tf(
        ...     feature_columns="features",
        ...     label_columns="target",
        ...     feature_type_spec=tf.TensorSpec(shape=(None, 4), dtype=tf.float32, name="features"),
        ...     label_type_spec=tf.TensorSpec(shape=(None,), dtype=tf.float32, name="label")
        ... )
        <_OptionsDataset element_spec=(TensorSpec(shape=(None, 4), dtype=tf.float32, name='features'), TensorSpec(shape=(None,), dtype=tf.float32, name='label'))>

        If your model accepts additional metadata aside from features and label, specify a single additional column or a list of additional columns.
        A common use case is to include sample weights in the data samples and train a ``tf.keras.Model`` with ``tf.keras.Model.fit``.

        >>> import pandas as pd
        >>> ds = ds.add_column("sample weights", lambda df: pd.Series([1] * len(df)))
        >>> it = ds.iterator()
        >>> it.to_tf(feature_columns="sepal length (cm)", label_columns="target", additional_columns="sample weights")
        <_OptionsDataset element_spec=(TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'), TensorSpec(shape=(None,), dtype=tf.int64, name='sample weights'))>

        If your model accepts different types, shapes, or names for the additional metadata, specify the type spec of the additional column.

        >>> it.to_tf(
        ...     feature_columns="sepal length (cm)",
        ...     label_columns="target",
        ...     additional_columns="sample weights",
        ...     additional_type_spec=tf.TensorSpec(shape=(None,), dtype=tf.float32, name="weight")
        ... )
        <_OptionsDataset element_spec=(TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'), TensorSpec(shape=(None,), dtype=tf.float32, name='weight'))>

    Args:
        feature_columns: Columns that correspond to model inputs. If this is a
            string, the input data is a tensor. If this is a list, the input data
            is a ``dict`` that maps column names to their tensor representation.
        label_columns: Columns that correspond to model targets. If this is a
            string, the target data is a tensor. If this is a list, the target data
            is a ``dict`` that maps column names to their tensor representation.
        additional_columns: Columns that correspond to sample weights or other metadata.
            If this is a string, the weight data is a tensor. If this is a list, the
            weight data is a ``dict`` that maps column names to their tensor representation.
        prefetch_batches: The number of batches to fetch ahead of the current
            batch. If set to greater than 0, a separate threadpool will be used
            to fetch the objects to the local node, format the batches, and apply
            the collate_fn. Defaults to 1.
        batch_size: Record batch size. Defaults to 1.
        drop_last: Set to True to drop the last incomplete batch,
            if the dataset size is not divisible by the batch size. If
            False and the size of dataset is not divisible by the batch
            size, then the last batch will be smaller. Defaults to False.
        local_shuffle_buffer_size: If non-None, the data will be randomly shuffled
            using a local in-memory shuffle buffer, and this value will serve as the
            minimum number of rows that must be in the local in-memory shuffle
            buffer in order to yield a batch. When there are no more rows to add to
            the buffer, the remaining rows in the buffer will be drained. This
            buffer size must be greater than or equal to ``batch_size``, and
            therefore ``batch_size`` must also be specified when using local
            shuffling.
        local_shuffle_seed: The seed to use for the local random shuffle.
        feature_type_spec: The `tf.TypeSpec` of `feature_columns`. If there is
            only one column, specify a `tf.TypeSpec`. If there are multiple columns,
            specify a ``dict`` that maps column names to their `tf.TypeSpec`.
            Default is `None` to automatically infer the type of each column.
            A spec you pass is always used as-is, even if ``label_type_spec``
            is left to be inferred.
        label_type_spec: The `tf.TypeSpec` of `label_columns`. If there is
            only one column, specify a `tf.TypeSpec`. If there are multiple columns,
            specify a ``dict`` that maps column names to their `tf.TypeSpec`.
            Default is `None` to automatically infer the type of each column.
            A spec you pass is always used as-is, even if
            ``feature_type_spec`` is left to be inferred.
        additional_type_spec: The `tf.TypeSpec` of `additional_columns`. If there
            is only one column, specify a `tf.TypeSpec`. If there are multiple
            columns, specify a ``dict`` that maps column names to their `tf.TypeSpec`.
            Default is `None` to automatically infer the type of each column.

    Returns:
        A ``tf.data.Dataset`` that yields inputs and targets (and the additional
        metadata, if ``additional_columns`` is given).

    Raises:
        ValueError: If TensorFlow is not installed, or if a type spec has to be
            inferred and one of the corresponding columns is not in the
            dataset's schema.
    """  # noqa: E501
    # Import TensorFlow before the Ray TF helper module. That way a missing
    # installation is reported with the error below, regardless of what the
    # helper module imports at load time.
    try:
        import tensorflow as tf
    except ImportError as e:
        raise ValueError("tensorflow must be installed!") from e

    from ray.data._internal.utils.tensorflow_utils import (
        convert_ndarray_to_tf_tensor,
        get_type_spec,
    )

    def validate_columns(
        columns: Union[str, List[str]], valid_columns: set
    ) -> None:
        # Check that every requested column name exists in `valid_columns`.
        names = columns if isinstance(columns, list) else [columns]
        for column in names:
            if column not in valid_columns:
                raise ValueError(
                    f"You specified '{column}' in `feature_columns`, "
                    f"`label_columns`, or `additional_columns`, but there's no "
                    f"column named '{column}' in the dataset. "
                    f"Valid column names are: {valid_columns}."
                )

    def convert_batch_to_tensors(
        batch: Dict[str, np.ndarray],
        *,
        columns: Union[str, List[str]],
        type_spec: Union[tf.TypeSpec, Dict[str, tf.TypeSpec]],
    ) -> Union[tf.Tensor, Dict[str, tf.Tensor]]:
        # A single column name yields one tensor; a list yields a dict keyed by
        # column name, each converted with its own per-column spec.
        if isinstance(columns, str):
            return convert_ndarray_to_tf_tensor(batch[columns], type_spec=type_spec)
        return {
            column: convert_ndarray_to_tf_tensor(
                batch[column], type_spec=type_spec[column]
            )
            for column in columns
        }

    # Infer only the specs the caller left as None, so that an explicitly
    # provided spec is never overwritten by an inferred one. The schema is
    # fetched at most once.
    schema = None
    if feature_type_spec is None or label_type_spec is None:
        schema = self.schema()
        valid_columns = set(schema.names)
        validate_columns(feature_columns, valid_columns)
        validate_columns(label_columns, valid_columns)
        if feature_type_spec is None:
            feature_type_spec = get_type_spec(schema, columns=feature_columns)
        if label_type_spec is None:
            label_type_spec = get_type_spec(schema, columns=label_columns)
    if additional_columns is not None and additional_type_spec is None:
        if schema is None:
            schema = self.schema()
        validate_columns(additional_columns, set(schema.names))
        additional_type_spec = get_type_spec(schema, columns=additional_columns)

    def generator():
        # Converts each dict batch from this iterator into the
        # (features, labels[, additional]) tuple declared by output_signature.
        for batch in self._iter_batches(
            prefetch_batches=prefetch_batches,
            batch_size=batch_size,
            drop_last=drop_last,
            local_shuffle_buffer_size=local_shuffle_buffer_size,
            local_shuffle_seed=local_shuffle_seed,
        ):
            assert isinstance(batch, dict)
            features = convert_batch_to_tensors(
                batch, columns=feature_columns, type_spec=feature_type_spec
            )
            labels = convert_batch_to_tensors(
                batch, columns=label_columns, type_spec=label_type_spec
            )
            if additional_columns is None:
                yield features, labels
            else:
                additional_metadata = convert_batch_to_tensors(
                    batch,
                    columns=additional_columns,
                    type_spec=additional_type_spec,
                )
                yield features, labels, additional_metadata

    output_signature = (feature_type_spec, label_type_spec)
    if additional_columns is not None:
        output_signature += (additional_type_spec,)
    dataset = tf.data.Dataset.from_generator(
        generator, output_signature=output_signature
    )

    # Disable tf.data auto-sharding for this dataset.
    # NOTE(review): presumably sharding is handled on the Ray side when the
    # iterator is created -- confirm.
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = (
        tf.data.experimental.AutoShardPolicy.OFF
    )
    return dataset.with_options(options)
@PublicAPI
def materialize(self) -> "MaterializedDataset":
    """Execute this iterator eagerly and hold its blocks in object store memory.

    .. note::
        Every block of the iterator is produced and collected up front. The
        collected data is returned as a
        :class:`~ray.data.dataset.MaterializedDataset` for further processing.

    Returns:
        A ``MaterializedDataset`` whose logical plan is an ``InputData``
        operator over the collected reference bundles.
    """
    from ray.data.dataset import MaterializedDataset

    bundle_iter, iter_stats, _, _ = self._to_ref_bundle_iterator()
    # Drain the iterator completely; this is what triggers execution.
    bundles = list(bundle_iter)

    plan = ExecutionPlan(iter_stats, self.get_context())
    source_op = InputData(input_data=bundles)
    return MaterializedDataset(plan, LogicalPlan(source_op, plan._context))
DatasetIterator = DataIterator  # Older name, kept as an alias for backwards compatibility.
|