| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613 |
- import logging
- import random
- from typing import (
- TYPE_CHECKING,
- Any,
- Callable,
- Dict,
- Iterator,
- List,
- Mapping,
- Optional,
- Tuple,
- TypeVar,
- Union,
- )
- import numpy as np
- from packaging.version import parse as parse_version
- from ray._private.ray_constants import env_integer
- from ray.data._internal.arrow_ops import transform_polars, transform_pyarrow
- from ray.data._internal.arrow_ops.transform_pyarrow import shuffle
- from ray.data._internal.row import row_repr, row_repr_pretty, row_str
- from ray.data._internal.table_block import TableBlockAccessor, TableBlockBuilder
- from ray.data._internal.tensor_extensions.arrow import (
- convert_to_pyarrow_array,
- pyarrow_table_from_pydict,
- )
- from ray.data._internal.utils.arrow_utils import get_pyarrow_version
- from ray.data.block import (
- Block,
- BlockAccessor,
- BlockColumn,
- BlockColumnAccessor,
- BlockExecStats,
- BlockMetadataWithSchema,
- BlockType,
- U,
- )
- from ray.data.context import DEFAULT_TARGET_MAX_BLOCK_SIZE, DataContext
- from ray.data.expressions import Expr
- try:
- import pyarrow
- except ImportError:
- pyarrow = None
- if TYPE_CHECKING:
- import pandas
- from ray.data._internal.planner.exchange.sort_task_spec import SortKey
# Generic type variable used in annotations in this module.
T = TypeVar("T")

logger = logging.getLogger(__name__)

# Threshold compared against the installed pyarrow version by
# ArrowBlockColumnAccessor.to_numpy() before passing ``zero_copy_only`` to a
# ChunkedArray.
_MIN_PYARROW_VERSION_TO_NUMPY_ZERO_COPY_ONLY = parse_version("13.0.0")

# Name of the placeholder column that ArrowBlockAccessor.select() adds when it
# is asked for an empty projection.
_BATCH_SIZE_PRESERVING_STUB_COL_NAME = "__bsp_stub"

# Upper bound (in bytes) for the chunks produced during the Arrow to Batches
# conversion in ArrowBlockAccessor.iter_rows(). Can be overridden through the
# RAY_DATA_ARROW_MAX_CHUNK_SIZE_BYTES environment variable; the default is
# 1/32 of DEFAULT_TARGET_MAX_BLOCK_SIZE (4MB), chosen to optimize for image
# datasets in parquet format.
ARROW_MAX_CHUNK_SIZE_BYTES = env_integer(
    "RAY_DATA_ARROW_MAX_CHUNK_SIZE_BYTES",
    int(DEFAULT_TARGET_MAX_BLOCK_SIZE / 32),
)
- # We offload some transformations to polars for performance.
def get_sort_transform(context: DataContext) -> Callable:
    """Pick the sort implementation that matches ``context``.

    Args:
        context: The data context whose ``use_polars`` / ``use_polars_sort``
            flags are consulted.

    Returns:
        ``transform_polars.sort`` if either flag is truthy, otherwise
        ``transform_pyarrow.sort``.
    """
    prefer_polars = context.use_polars or context.use_polars_sort
    return transform_polars.sort if prefer_polars else transform_pyarrow.sort
def get_concat_and_sort_transform(context: DataContext) -> Callable:
    """Pick the concat-and-sort implementation that matches ``context``.

    Args:
        context: The data context whose ``use_polars`` / ``use_polars_sort``
            flags are consulted.

    Returns:
        ``transform_polars.concat_and_sort`` if either flag is truthy,
        otherwise ``transform_pyarrow.concat_and_sort``.
    """
    prefer_polars = context.use_polars or context.use_polars_sort
    if prefer_polars:
        return transform_polars.concat_and_sort
    return transform_pyarrow.concat_and_sort
class ArrowRow(Mapping):
    """
    Row of a tabular Dataset backed by an Arrow Table block.

    The wrapped object is kept in ``self._row``; the methods below rely on its
    ``schema``, ``select``, ``column_names`` and ``num_columns`` members.
    """

    def __init__(self, row: Any):
        self._row = row

    def __getitem__(self, key: Union[str, List[str]]) -> Any:
        """Return the value(s) stored under one or several column names.

        Args:
            key: A single column name, or a list of column names.

        Returns:
            For a ``str`` key, the value of that column. For a list of names,
            a tuple with one value per name (a plain list of the raw elements
            if those elements have no ``as_py`` method). ``None`` if the
            selected columns form a zero-length table.
        """
        from ray.data.extensions import get_arrow_extension_tensor_types

        tensor_types = get_arrow_extension_tensor_types()

        single = isinstance(key, str)
        names = [key] if single else key

        # NOTE: Only the type of the first requested column decides whether the
        # whole lookup is treated as a tensor lookup.
        first_type = self._row.schema.field(names[0]).type
        if isinstance(first_type, tensor_types):
            # Build a tensor row.
            values = tuple(
                ArrowBlockAccessor._build_tensor_row(
                    self._row, col_name=name, row_idx=0
                )
                for name in names
            )
        else:
            projected = self._row.select(names)
            if len(projected) == 0:
                return None
            cells = [column[0] for column in projected.columns]
            try:
                # Try to interpret these as pyarrow.Scalar values.
                values = tuple(cell.as_py() for cell in cells)
            except AttributeError:
                # Assume that this row is an element of an extension array, and
                # that it is bypassing pyarrow's scalar model for Arrow < 8.0.0.
                values = cells

        return values[0] if single else values

    def __iter__(self) -> Iterator:
        """Iterate over the column names of the wrapped row."""
        yield from self._row.column_names

    def __len__(self):
        """Return the number of columns of the wrapped row."""
        return self._row.num_columns

    def as_pydict(self) -> Dict[str, Any]:
        """Materialize this row as a plain ``dict`` of column name to value."""
        return {name: value for name, value in self.items()}

    def __str__(self):
        return row_str(self)

    def __repr__(self):
        return row_repr(self)

    def _repr_pretty_(self, p, cycle):
        return row_repr_pretty(self, p, cycle)
class ArrowBlockBuilder(TableBlockBuilder):
    """Table block builder that produces ``pyarrow.Table`` blocks."""

    def __init__(self):
        if pyarrow is None:
            raise ImportError("Run `pip install pyarrow` for Arrow support")
        super().__init__((pyarrow.Table, bytes))

    @staticmethod
    def _table_from_pydict(columns: Dict[str, List[Any]]) -> Block:
        """Build a table from a mapping of column name to list of values."""
        arrays = {}
        for name, values in columns.items():
            arrays[name] = convert_to_pyarrow_array(values, name)
        return pyarrow_table_from_pydict(arrays)

    @staticmethod
    def _combine_tables(tables: List[Block]) -> Block:
        """Concatenate ``tables``; a single table is returned unchanged."""
        if len(tables) <= 1:
            return tables[0]
        return transform_pyarrow.concat(tables, promote_types=True)

    @staticmethod
    def _concat_would_copy() -> bool:
        return False

    @staticmethod
    def _empty_table() -> "pyarrow.Table":
        """Return a table built from an empty column dict."""
        return pyarrow_table_from_pydict({})

    def block_type(self) -> BlockType:
        return BlockType.ARROW
def _get_max_chunk_size(
    table: "pyarrow.Table", max_chunk_size_bytes: int
) -> Optional[int]:
    """
    Calculate the max chunk size in rows for Arrow to Batches conversion in
    ArrowBlockAccessor.iter_rows().

    Args:
        table: The pyarrow table to calculate the max chunk size for.
        max_chunk_size_bytes: The max chunk size in bytes.

    Returns:
        The max chunk size in rows (always at least 1), or None if the table
        has no rows or occupies no bytes.
    """
    # Guard on the row count as well as the byte count: a zero-row table that
    # still reports a non-zero ``nbytes`` would otherwise divide by zero below.
    if table.num_rows == 0 or table.nbytes == 0:
        return None

    avg_row_size = table.nbytes / table.num_rows
    # Never return less than one row per chunk, even when a single row is
    # larger than the byte budget.
    return max(1, int(max_chunk_size_bytes / avg_row_size))
class ArrowBlockAccessor(TableBlockAccessor):
    """Block accessor for blocks stored as ``pyarrow.Table``."""

    ROW_TYPE = ArrowRow

    def __init__(self, table: "pyarrow.Table"):
        if pyarrow is None:
            raise ImportError("Run `pip install pyarrow` for Arrow support")
        super().__init__(table)
        # Filled in lazily by ``iter_rows`` the first time it is needed.
        self._max_chunk_size: Optional[int] = None

    def _get_row(self, index: int) -> ArrowRow:
        """Wrap the one-row slice at ``index`` into an ``ArrowRow``."""
        return ArrowRow(self.slice(index, index + 1, copy=False))

    def column_names(self) -> List[str]:
        return self._table.column_names

    def fill_column(self, name: str, value: Any) -> Block:
        """Set column ``name`` to ``value`` and return the resulting table.

        Args:
            name: Name of the column to add or replace.
            value: Either a ``pyarrow.Array`` / ``pyarrow.ChunkedArray`` that
                is used as the column directly, or a scalar that is repeated
                for every row of the table.
        """
        import pyarrow.compute as pc

        # Array-like values are used as the column as-is.
        if isinstance(value, (pyarrow.Array, pyarrow.ChunkedArray)):
            return self.upsert_column(name, value)

        # Scalar value: start from an all-null array of the right type and
        # length, then fill the nulls with the value.
        if isinstance(value, pyarrow.Scalar):
            value_type = value.type
        else:
            value_type = pyarrow.infer_type([value])

        nulls = pyarrow.nulls(len(self._table), type=value_type)
        filled = pc.fill_null(nulls, value)
        return self.upsert_column(name, filled)

    @classmethod
    def from_bytes(cls, data: bytes) -> "ArrowBlockAccessor":
        """Create an accessor from the bytes of an Arrow IPC stream."""
        table = pyarrow.ipc.open_stream(data).read_all()
        return cls(table)

    @staticmethod
    def _build_tensor_row(row: ArrowRow, row_idx: int, col_name: str) -> np.ndarray:
        """Return the ndarray stored at ``row[col_name][row_idx]``.

        NOTE(review): ``ArrowRow.__getitem__`` passes its underlying ``_row``
        object here rather than an ``ArrowRow`` -- the ``row`` annotation may
        be narrower than what is actually accepted; confirm.
        """
        tensor = row[col_name][row_idx].as_py()
        assert isinstance(tensor, np.ndarray), type(tensor)
        return tensor

    def slice(self, start: int, end: int, copy: bool = False) -> "pyarrow.Table":
        """Return rows ``[start, end)``; chunks are combined when ``copy`` is set."""
        window = self._table.slice(start, end - start)
        if not copy:
            return window
        return transform_pyarrow.combine_chunks(window, copy)

    def random_shuffle(self, random_seed: Optional[int]) -> "pyarrow.Table":
        return shuffle(self._table, random_seed)

    def schema(self) -> "pyarrow.lib.Schema":
        return self._table.schema

    def to_pandas(self) -> "pandas.DataFrame":
        """Convert the table to a pandas DataFrame.

        Both the ``ignore_metadata`` flag and the optional tensor-column
        casting are driven by the current ``DataContext``.
        """
        from ray.data.util.data_batch_conversion import _cast_tensor_columns_to_ndarrays

        ctx = DataContext.get_current()
        # ``ignore_metadata`` is configurable because pyarrow uses the table
        # metadata during conversion, which (per the original author's note)
        # is handled incorrectly by older pyarrow versions.
        frame = self._table.to_pandas(ignore_metadata=ctx.pandas_block_ignore_metadata)
        if not ctx.enable_tensor_extension_casting:
            return frame
        return _cast_tensor_columns_to_ndarrays(frame)

    def to_numpy(
        self, columns: Optional[Union[str, List[str]]] = None
    ) -> Union[np.ndarray, Dict[str, np.ndarray]]:
        """Convert the requested columns to numpy arrays.

        Args:
            columns: ``None`` for all columns, a list of column names, or a
                single column name.

        Returns:
            A single ndarray when one column name (not a list) was given,
            otherwise a dict mapping column name to ndarray.

        Raises:
            ValueError: If a requested column is not in the table.
        """
        # Anything that is neither ``None`` nor a list is treated as a single
        # column name.
        should_be_single_ndarray = columns is not None and not isinstance(
            columns, list
        )
        if columns is None:
            columns = self._table.column_names
        elif should_be_single_ndarray:
            columns = [columns]

        column_names_set = set(self._table.column_names)
        for column in columns:
            if column not in column_names_set:
                raise ValueError(
                    f"Cannot find column {column}, available columns: "
                    f"{column_names_set}"
                )

        # Combine each column's chunks into one contiguous array first, to make
        # it compatible with the numpy format.
        arrays = [
            transform_pyarrow.to_numpy(
                transform_pyarrow.combine_chunked_array(self._table[name]),
                zero_copy_only=False,
            )
            for name in columns
        ]

        if should_be_single_ndarray:
            assert len(columns) == 1
            return arrays[0]
        return dict(zip(columns, arrays))

    def to_arrow(self) -> "pyarrow.Table":
        return self._table

    def num_rows(self) -> int:
        # Arrow may represent an empty table via an N > 0 row, 0-column table, e.g. when
        # slicing an empty table, so we return 0 if num_columns == 0.
        if self._table.num_columns > 0:
            return self._table.num_rows
        return 0

    def size_bytes(self) -> int:
        return self._table.nbytes

    def _zip(self, acc: BlockAccessor) -> "Block":
        """Append every column of ``acc``'s table to this table.

        A column whose name is already taken gets a ``_<i>`` suffix, with the
        smallest ``i >= 1`` that yields an unused name.
        """
        merged = self.to_arrow()
        other = acc.to_arrow()
        for original_name in other.column_names:
            column = other.column(original_name)
            # Ensure the column names are unique after zip.
            target_name = original_name
            suffix = 1
            while target_name in merged.column_names:
                target_name = "{}_{}".format(original_name, suffix)
                suffix += 1
            merged = merged.append_column(target_name, column)
        return merged

    def upsert_column(
        self, column_name: str, column_data: BlockColumn
    ) -> "pyarrow.Table":
        """Replace column ``column_name`` if it exists, otherwise append it."""
        assert isinstance(
            column_data, (pyarrow.Array, pyarrow.ChunkedArray)
        ), f"Expected either a pyarrow.Array or pyarrow.ChunkedArray, got: {type(column_data)}"

        # ``get_field_index`` result of -1 means there is no such column yet.
        position = self._table.schema.get_field_index(column_name)
        if position != -1:
            return self._table.set_column(position, column_name, column_data)
        return self._table.append_column(column_name, column_data)

    @staticmethod
    def builder() -> ArrowBlockBuilder:
        return ArrowBlockBuilder()

    @staticmethod
    def _empty_table() -> "pyarrow.Table":
        return ArrowBlockBuilder._empty_table()

    def take(
        self,
        indices: Union[List[int], "pyarrow.Array", "pyarrow.ChunkedArray"],
    ) -> "pyarrow.Table":
        """Select rows from the underlying table.

        This method is an alternative to pyarrow.Table.take(), which breaks for
        extension arrays.
        """
        return transform_pyarrow.take_table(self._table, indices)

    def drop(self, columns: List[str]) -> Block:
        return self._table.drop(columns)

    def select(self, columns: List[str]) -> "pyarrow.Table":
        """Project the table onto ``columns``.

        An empty ``columns`` list yields a table holding a single stub column
        filled with ``None``.

        Raises:
            ValueError: If any entry of ``columns`` is not a ``str``.
        """
        if any(not isinstance(col, str) for col in columns):
            raise ValueError(
                "Columns must be a list of column name strings when aggregating on "
                f"Arrow blocks, but got: {columns}."
            )

        if len(columns) == 0:
            # Applicable for count which does an empty projection.
            # Pyarrow returns a table with 0 columns and num_rows rows.
            return self.fill_column(_BATCH_SIZE_PRESERVING_STUB_COL_NAME, None)

        return self._table.select(columns)

    def rename_columns(self, columns_rename: Dict[str, str]) -> "pyarrow.Table":
        return self._table.rename_columns(columns_rename)

    def hstack(self, other_block: "pyarrow.Table") -> "pyarrow.Table":
        """Append all columns of ``other_block`` to this table, names unchanged."""
        stacked = self._table
        for name, column in zip(other_block.column_names, other_block.columns):
            stacked = stacked.append_column(name, column)
        return stacked

    def _sample(self, n_samples: int, sort_key: "SortKey") -> "pyarrow.Table":
        """Take ``n_samples`` random rows of the sort-key columns."""
        picked = random.sample(range(self._table.num_rows), n_samples)
        key_columns = self._table.select(sort_key.get_columns())
        return transform_pyarrow.take_table(key_columns, picked)

    def sort(self, sort_key: "SortKey") -> Block:
        """Sort the table by ``sort_key``; a zero-row table gives an empty table."""
        assert (
            sort_key.get_columns()
        ), f"Sorting columns couldn't be empty (got {sort_key.get_columns()})"

        if self._table.num_rows == 0:
            # If the pyarrow table is empty we may not have schema
            # so calling sort_indices() will raise an error.
            return self._empty_table()

        sort_fn = get_sort_transform(DataContext.get_current())
        return sort_fn(self._table, sort_key)

    def sort_and_partition(
        self, boundaries: List[T], sort_key: "SortKey"
    ) -> List["Block"]:
        """Sort the table, then split it at ``boundaries``.

        Returns ``len(boundaries) + 1`` empty tables if the sorted table has no
        rows, and the sorted table alone if no boundaries are given.
        """
        sorted_table = self.sort(sort_key)

        if sorted_table.num_rows == 0:
            return [self._empty_table() for _ in range(len(boundaries) + 1)]
        if len(boundaries) == 0:
            return [sorted_table]

        sorted_accessor = BlockAccessor.for_block(sorted_table)
        return sorted_accessor._find_partitions_sorted(boundaries, sort_key)

    @staticmethod
    def merge_sorted_blocks(
        blocks: List[Block], sort_key: "SortKey"
    ) -> Tuple[Block, BlockMetadataWithSchema]:
        """Merge ``blocks`` into one sorted block and return it with metadata.

        Blocks without rows are dropped first; if none remain, the result is
        an empty table.
        """
        stats = BlockExecStats.builder()
        non_empty = [b for b in blocks if b.num_rows > 0]

        if non_empty:
            # Handle blocks of different types.
            normalized = TableBlockAccessor.normalize_block_types(
                non_empty, BlockType.ARROW
            )
            concat_and_sort = get_concat_and_sort_transform(DataContext.get_current())
            merged = concat_and_sort(normalized, sort_key, promote_types=True)
        else:
            merged = ArrowBlockAccessor._empty_table()

        return merged, BlockMetadataWithSchema.from_block(merged, stats=stats.build())

    def block_type(self) -> BlockType:
        return BlockType.ARROW

    def iter_rows(
        self, public_row_format: bool
    ) -> Iterator[Union[Mapping, np.ndarray]]:
        """Iterate over the rows of the table.

        Args:
            public_row_format: If truthy, rows come from ``to_pylist()`` of
                each record batch; otherwise each row is an ``ArrowRow``.
        """
        table = self._table

        if not public_row_format:
            for idx in range(self.num_rows()):
                yield self._get_row(idx)
            return

        if self._max_chunk_size is None:
            # Calling _get_max_chunk_size in constructor makes it slow, so we
            # are calling it here only when needed.
            self._max_chunk_size = _get_max_chunk_size(
                table, ARROW_MAX_CHUNK_SIZE_BYTES
            )
        for batch in table.to_batches(max_chunksize=self._max_chunk_size):
            yield from batch.to_pylist()

    def filter(self, predicate_expr: "Expr") -> "pyarrow.Table":
        """Filter rows based on a predicate expression."""
        if self._table.num_rows == 0:
            return self._table

        from ray.data._internal.planner.plan_expression.expression_evaluator import (
            eval_expr,
        )

        # The evaluated expression is used as the mask for pyarrow's built-in
        # table filter.
        row_mask = eval_expr(predicate_expr, self._table)
        return self._table.filter(row_mask)
class ArrowBlockColumnAccessor(BlockColumnAccessor):
    """Column accessor for a ``pyarrow.Array`` or ``pyarrow.ChunkedArray``.

    The aggregation methods return a Python value when ``as_py`` is true and
    the raw pyarrow result otherwise.
    """

    def __init__(self, col: Union["pyarrow.Array", "pyarrow.ChunkedArray"]):
        super().__init__(col)

    def count(self, *, ignore_nulls: bool, as_py: bool = True) -> Optional[U]:
        """Count the values; nulls are excluded when ``ignore_nulls`` is set."""
        import pyarrow.compute as pac

        mode = "only_valid" if ignore_nulls else "all"
        result = pac.count(self._column, mode=mode)
        if as_py:
            return result.as_py()
        return result

    def sum(self, *, ignore_nulls: bool, as_py: bool = True) -> Optional[U]:
        """Sum the column, passing ``ignore_nulls`` as ``skip_nulls``."""
        import pyarrow.compute as pac

        result = pac.sum(self._column, skip_nulls=ignore_nulls)
        if as_py:
            return result.as_py()
        return result

    def min(self, *, ignore_nulls: bool, as_py: bool = True) -> Optional[U]:
        """Minimum of the column, passing ``ignore_nulls`` as ``skip_nulls``."""
        import pyarrow.compute as pac

        result = pac.min(self._column, skip_nulls=ignore_nulls)
        if as_py:
            return result.as_py()
        return result

    def max(self, *, ignore_nulls: bool, as_py: bool = True) -> Optional[U]:
        """Maximum of the column, passing ``ignore_nulls`` as ``skip_nulls``."""
        import pyarrow.compute as pac

        result = pac.max(self._column, skip_nulls=ignore_nulls)
        if as_py:
            return result.as_py()
        return result

    def mean(self, *, ignore_nulls: bool, as_py: bool = True) -> Optional[U]:
        """Mean of the column, passing ``ignore_nulls`` as ``skip_nulls``."""
        import pyarrow.compute as pac

        result = pac.mean(self._column, skip_nulls=ignore_nulls)
        if as_py:
            return result.as_py()
        return result

    def sum_of_squared_diffs_from_mean(
        self, ignore_nulls: bool, mean: Optional[U] = None, as_py: bool = True
    ) -> Optional[U]:
        """Sum of ``(x - mean) ** 2`` over the column.

        Args:
            ignore_nulls: Passed as ``skip_nulls`` to the aggregations.
            mean: Mean to subtract; computed from the column when ``None``.
            as_py: Whether to convert the result to a Python value.

        Returns:
            The aggregated value, or ``None`` if no mean was supplied and the
            computed mean is ``None``.
        """
        import pyarrow.compute as pac

        # Calculate mean if not provided
        if mean is None:
            mean = self.mean(ignore_nulls=ignore_nulls)
            if mean is None:
                return None

        squared_diffs = pac.power(pac.subtract(self._column, mean), 2)
        result = pac.sum(squared_diffs, skip_nulls=ignore_nulls)
        if as_py:
            return result.as_py()
        return result

    def quantile(
        self, *, q: float, ignore_nulls: bool, as_py: bool = True
    ) -> Optional[U]:
        """Quantile ``q`` of the column."""
        import pyarrow.compute as pac

        # NOTE: The quantile kernel returns an array, so its first element is
        #       taken as the result.
        result = pac.quantile(self._column, q=q, skip_nulls=ignore_nulls)[0]
        if as_py:
            return result.as_py()
        return result

    def unique(self) -> BlockColumn:
        """Return the distinct values of the column."""
        import pyarrow.compute as pac

        if not self.is_composed_of_lists():
            return pac.unique(self._column)

        # NOTE: Arrow doesn't provide unique kernels for `ListArray`s and
        #       such, so we rely on Polars to encode and compute unique
        #       values instead
        import polars

        return polars.from_arrow(self._column).unique().to_arrow()

    def value_counts(self) -> Optional[Dict[str, List]]:
        """Return ``{"values": [...], "counts": [...]}``, or ``None`` if empty."""
        import pyarrow.compute as pac

        # ``pac.value_counts`` yields a pyarrow.StructArray with "values" and
        # "counts" fields.
        counted = pac.value_counts(self._column)
        if len(counted) == 0:
            return None

        return {
            "values": counted.field("values").to_pylist(),
            "counts": counted.field("counts").to_pylist(),
        }

    def hash(self) -> BlockColumn:
        """Hash every value using Polars' row hashing, cast to Int64."""
        import polars as pl

        frame = pl.DataFrame({"col": self._column})
        row_hashes = frame.hash_rows().cast(pl.Int64, wrap_numerical=True)
        return row_hashes.to_arrow()

    def flatten(self) -> BlockColumn:
        import pyarrow.compute as pac

        return pac.list_flatten(self._column)

    def dropna(self) -> BlockColumn:
        import pyarrow.compute as pac

        return pac.drop_null(self._column)

    def is_composed_of_lists(self) -> bool:
        """Whether the column type is a ``ListType`` or ``LargeListType``."""
        list_types = (pyarrow.lib.ListType, pyarrow.lib.LargeListType)
        return isinstance(self._column.type, list_types)

    def to_pylist(self) -> List[Any]:
        return self._column.to_pylist()

    def to_numpy(self, zero_copy_only: bool = False) -> np.ndarray:
        """Convert the column to a numpy array.

        ``zero_copy_only`` is not forwarded for a ``ChunkedArray`` when the
        installed pyarrow is older than 13.0.0.
        """
        is_legacy_pyarrow = (
            get_pyarrow_version() < _MIN_PYARROW_VERSION_TO_NUMPY_ZERO_COPY_ONLY
        )
        # NOTE: ChunkedArray in Pyarrow < 13.0.0 does not support ``zero_copy_only``
        if is_legacy_pyarrow and isinstance(self._column, pyarrow.ChunkedArray):
            return self._column.to_numpy()
        return self._column.to_numpy(zero_copy_only=zero_copy_only)

    def _as_arrow_compatible(self) -> Union[List[Any], "pyarrow.Array"]:
        return self._column
|