pandas_block.py 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714
  1. import collections
  2. import logging
  3. import sys
  4. from typing import (
  5. TYPE_CHECKING,
  6. Any,
  7. Dict,
  8. Iterator,
  9. List,
  10. Mapping,
  11. Optional,
  12. Tuple,
  13. TypeVar,
  14. Union,
  15. )
  16. import numpy as np
  17. import pandas as pd
  18. from pandas.api.types import is_object_dtype, is_scalar, is_string_dtype
  19. from ray.data._internal.numpy_support import convert_to_numpy
  20. from ray.data._internal.row import row_repr, row_repr_pretty, row_str
  21. from ray.data._internal.table_block import TableBlockAccessor, TableBlockBuilder
  22. from ray.data._internal.tensor_extensions.utils import _should_convert_to_tensor
  23. from ray.data._internal.util import is_null
  24. from ray.data.block import (
  25. Block,
  26. BlockAccessor,
  27. BlockColumn,
  28. BlockColumnAccessor,
  29. BlockExecStats,
  30. BlockType,
  31. U,
  32. )
  33. from ray.data.context import DataContext
  34. from ray.data.expressions import Expr
  35. if TYPE_CHECKING:
  36. import pandas
  37. import pyarrow
  38. from ray.data._internal.planner.exchange.sort_task_spec import SortKey
  39. from ray.data.block import BlockMetadataWithSchema
# Generic type variable; used below to annotate sort boundary values.
T = TypeVar("T")

# Max number of samples used to estimate the Pandas block size.
_PANDAS_SIZE_BYTES_MAX_SAMPLE_COUNT = 200

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
  44. _pandas = None
  45. def lazy_import_pandas():
  46. global _pandas
  47. if _pandas is None:
  48. import pandas
  49. _pandas = pandas
  50. return _pandas
  51. class PandasRow(Mapping):
  52. """
  53. Row of a tabular Dataset backed by a Pandas DataFrame block.
  54. """
  55. def __init__(self, row: Any):
  56. self._row = row
  57. def __getitem__(self, key: Union[str, List[str]]) -> Any:
  58. from ray.data.extensions import TensorArrayElement
  59. def get_item(keys: List[str]) -> Any:
  60. col = self._row[keys]
  61. if len(col) == 0:
  62. return None
  63. items = col.iloc[0]
  64. if isinstance(items.iloc[0], TensorArrayElement):
  65. # Getting an item in a Pandas tensor column may return
  66. # a TensorArrayElement, which we have to convert to an ndarray.
  67. return tuple(item.to_numpy() for item in items)
  68. try:
  69. # Try to interpret this as a numpy-type value.
  70. # See https://stackoverflow.com/questions/9452775/converting-numpy-dtypes-to-native-python-types. # noqa: E501
  71. return tuple(item for item in items)
  72. except (AttributeError, ValueError) as e:
  73. logger.warning(f"Failed to convert {items} to a tuple", exc_info=e)
  74. # Fallback to the original form.
  75. return items
  76. is_single_item = isinstance(key, str)
  77. keys = [key] if is_single_item else key
  78. items = get_item(keys)
  79. if items is None:
  80. return None
  81. elif is_single_item:
  82. return items[0]
  83. else:
  84. return items
  85. def __iter__(self) -> Iterator:
  86. for k in self._row.columns:
  87. yield k
  88. def __len__(self):
  89. return self._row.shape[1]
  90. def as_pydict(self) -> Dict[str, Any]:
  91. pydict: Dict[str, Any] = {}
  92. for key, value in self.items():
  93. # Convert NA to None for consistency across block formats. `pd.isna`
  94. # returns True for both NA and NaN, but since we want to preserve NaN
  95. # values, we check for identity instead.
  96. if is_scalar(value) and value is pd.NA:
  97. pydict[key] = None
  98. else:
  99. pydict[key] = value
  100. return pydict
  101. def __str__(self):
  102. return row_str(self)
  103. def __repr__(self):
  104. return row_repr(self)
  105. def _repr_pretty_(self, p, cycle):
  106. return row_repr_pretty(self, p, cycle)
  107. class PandasBlockColumnAccessor(BlockColumnAccessor):
  108. def __init__(self, col: "pandas.Series"):
  109. super().__init__(col)
  110. def count(self, *, ignore_nulls: bool, as_py: bool = True) -> Optional[U]:
  111. return self._column.count() if ignore_nulls else len(self._column)
  112. def sum(self, *, ignore_nulls: bool, as_py: bool = True) -> Optional[U]:
  113. # NOTE: Pandas ``Series`` isn't able to properly handle the case with
  114. # all-null/NaN values in the column, hence we have to handle it here
  115. if self._is_all_null():
  116. return None
  117. # NOTE: We pass `min_count=1` to workaround quirky Pandas behavior,
  118. # where (by default) when min_count=0 it will return 0.0 for
  119. # all-null/NaN series
  120. return self._column.sum(skipna=ignore_nulls, min_count=1)
  121. def min(self, *, ignore_nulls: bool, as_py: bool = True) -> Optional[U]:
  122. # NOTE: Pandas ``Series`` isn't able to properly handle the case with
  123. # all-null/NaN values in the column, hence we have to handle it here
  124. if self._is_all_null():
  125. return None
  126. return self._column.min(skipna=ignore_nulls)
  127. def max(self, *, ignore_nulls: bool, as_py: bool = True) -> Optional[U]:
  128. # NOTE: Pandas ``Series`` isn't able to properly handle the case with
  129. # all-null/NaN values in the column, hence we have to handle it here
  130. if self._is_all_null():
  131. return None
  132. return self._column.max(skipna=ignore_nulls)
  133. def mean(self, *, ignore_nulls: bool, as_py: bool = True) -> Optional[U]:
  134. # NOTE: We manually implement mean here to keep implementation consistent
  135. # with behavior of ``sum`` method returning null if the series
  136. # contains exclusively null values
  137. sum_ = self.sum(ignore_nulls=ignore_nulls)
  138. return (
  139. sum_ / self.count(ignore_nulls=ignore_nulls) if not is_null(sum_) else sum_
  140. )
  141. def quantile(
  142. self, *, q: float, ignore_nulls: bool, as_py: bool = True
  143. ) -> Optional[U]:
  144. return self._column.quantile(q=q)
  145. def value_counts(self) -> Optional[Dict[str, List]]:
  146. value_counts = self._column.value_counts()
  147. if len(value_counts) == 0:
  148. return None
  149. return {
  150. "values": value_counts.index.tolist(),
  151. "counts": value_counts.values.tolist(),
  152. }
  153. def hash(self) -> BlockColumn:
  154. from ray.data._internal.tensor_extensions.pandas import TensorArrayElement
  155. first_non_null = next((x for x in self._column if x is not None), None)
  156. if isinstance(first_non_null, TensorArrayElement):
  157. self._column = self._column.apply(lambda x: x.to_numpy())
  158. import polars as pl
  159. df = pl.from_pandas(self._column.to_frame())
  160. hashes = df.hash_rows().cast(pl.Int64, wrap_numerical=True)
  161. return hashes.to_pandas()
  162. def unique(self) -> BlockColumn:
  163. pd = lazy_import_pandas()
  164. try:
  165. if self.is_composed_of_lists():
  166. # NOTE: Pandas uses hashing internally to compute unique values,
  167. # and hence we have to convert lists into tuples to make
  168. # them hashable
  169. col = self._column.map(lambda l: l if l is None else tuple(l))
  170. else:
  171. col = self._column
  172. return pd.Series(col.unique())
  173. except ValueError as e:
  174. if "buffer source array is read-only" in str(e):
  175. # NOTE: Pandas < 2.0 somehow tries to update the underlying buffer
  176. # when computing unique values hence failing
  177. return pd.Series(self._column.copy().unique())
  178. else:
  179. raise
  180. def flatten(self) -> BlockColumn:
  181. from ray.data._internal.tensor_extensions.pandas import TensorArrayElement
  182. first_non_null = next((x for x in self._column if x is not None), None)
  183. if not isinstance(first_non_null, TensorArrayElement):
  184. column = self._column
  185. else:
  186. column = self._column.apply(
  187. lambda x: x.to_numpy() if isinstance(x, TensorArrayElement) else x
  188. )
  189. # NOTE: `Series.explode` explodes empty lists into NaNs, necessitating
  190. # filtering out of empty lists first
  191. if self.is_composed_of_lists():
  192. mask = column.apply(lambda x: x is not None and len(x) > 0)
  193. column = column[mask]
  194. return column.explode(ignore_index=True)
  195. def dropna(self) -> BlockColumn:
  196. return self._column.dropna()
  197. def sum_of_squared_diffs_from_mean(
  198. self,
  199. ignore_nulls: bool,
  200. mean: Optional[U] = None,
  201. as_py: bool = True,
  202. ) -> Optional[U]:
  203. if mean is None:
  204. mean = self.mean(ignore_nulls=ignore_nulls)
  205. if is_null(mean):
  206. return mean
  207. return ((self._column - mean) ** 2).sum(skipna=ignore_nulls)
  208. def to_pylist(self) -> List[Any]:
  209. return self._column.to_list()
  210. def to_numpy(self, zero_copy_only: bool = False) -> np.ndarray:
  211. """NOTE: Unlike Arrow, specifying `zero_copy_only=True` isn't a guarantee
  212. that no copy will be made
  213. """
  214. return self._column.to_numpy(copy=not zero_copy_only)
  215. def _as_arrow_compatible(self) -> Union[List[Any], "pyarrow.Array"]:
  216. return self.to_pylist()
  217. def _is_all_null(self):
  218. return not self._column.notna().any()
  219. def is_composed_of_lists(self) -> bool:
  220. from ray.data._internal.tensor_extensions.pandas import TensorArrayElement
  221. types = (list, np.ndarray, TensorArrayElement)
  222. first_non_null = next((x for x in self._column if x is not None), None)
  223. return isinstance(first_non_null, types)
  224. class PandasBlockBuilder(TableBlockBuilder):
  225. def __init__(self):
  226. pandas = lazy_import_pandas()
  227. super().__init__(pandas.DataFrame)
  228. @staticmethod
  229. def _table_from_pydict(columns: Dict[str, List[Any]]) -> "pandas.DataFrame":
  230. from ray.data.extensions.tensor_extension import TensorArray
  231. pandas = lazy_import_pandas()
  232. return pandas.DataFrame(
  233. {
  234. column_name: (
  235. TensorArray(convert_to_numpy(column_values))
  236. if len(column_values) > 0
  237. and _should_convert_to_tensor(column_values, column_name)
  238. else column_values
  239. )
  240. for column_name, column_values in columns.items()
  241. }
  242. )
  243. @staticmethod
  244. def _combine_tables(tables: List["pandas.DataFrame"]) -> "pandas.DataFrame":
  245. pandas = lazy_import_pandas()
  246. from ray.data.util.data_batch_conversion import (
  247. _cast_ndarray_columns_to_tensor_extension,
  248. )
  249. if len(tables) > 1:
  250. df = pandas.concat(tables, ignore_index=True)
  251. df.reset_index(drop=True, inplace=True)
  252. else:
  253. df = tables[0]
  254. ctx = DataContext.get_current()
  255. if ctx.enable_tensor_extension_casting:
  256. df = _cast_ndarray_columns_to_tensor_extension(df)
  257. return df
  258. @staticmethod
  259. def _concat_would_copy() -> bool:
  260. return True
  261. @staticmethod
  262. def _empty_table() -> "pandas.DataFrame":
  263. pandas = lazy_import_pandas()
  264. return pandas.DataFrame()
  265. def block_type(self) -> BlockType:
  266. return BlockType.PANDAS
# This is to be compatible with pyarrow.lib.schema
# TODO (kfstorm): We need a format-independent way to represent schema.
# ``names`` holds the column names and ``types`` the matching Pandas dtypes,
# as populated by ``PandasBlockAccessor.schema``.
PandasBlockSchema = collections.namedtuple("PandasBlockSchema", ["names", "types"])
  270. class PandasBlockAccessor(TableBlockAccessor):
  271. ROW_TYPE = PandasRow
  272. def __init__(self, table: "pandas.DataFrame"):
  273. super().__init__(table)
  274. def _get_row(self, index: int) -> PandasRow:
  275. base_row = self.slice(index, index + 1, copy=False)
  276. return PandasRow(base_row)
  277. def column_names(self) -> List[str]:
  278. return self._table.columns.tolist()
  279. def fill_column(self, name: str, value: Any) -> Block:
  280. # Check if value is array-like - if so, use upsert_column logic
  281. if isinstance(value, (pd.Series, np.ndarray)):
  282. return self.upsert_column(name, value)
  283. # Scalar value - use original fill_column logic
  284. return self._table.assign(**{name: value})
  285. def slice(self, start: int, end: int, copy: bool = False) -> "pandas.DataFrame":
  286. view = self._table[start:end]
  287. view.reset_index(drop=True, inplace=True)
  288. if copy:
  289. view = view.copy(deep=True)
  290. return view
  291. def take(self, indices: List[int]) -> "pandas.DataFrame":
  292. table = self._table.take(indices)
  293. table.reset_index(drop=True, inplace=True)
  294. return table
  295. def drop(self, columns: List[str]) -> Block:
  296. return self._table.drop(columns, axis="columns")
  297. def select(self, columns: List[str]) -> "pandas.DataFrame":
  298. if not all(isinstance(col, str) for col in columns):
  299. raise ValueError(
  300. "Columns must be a list of column name strings when aggregating on "
  301. f"Pandas blocks, but got: {columns}."
  302. )
  303. return self._table[columns]
  304. def rename_columns(self, columns_rename: Dict[str, str]) -> "pandas.DataFrame":
  305. return self._table.rename(columns=columns_rename, inplace=False, copy=False)
  306. def upsert_column(
  307. self, column_name: str, column_data: BlockColumn
  308. ) -> "pandas.DataFrame":
  309. import pyarrow
  310. if isinstance(column_data, (pyarrow.Array, pyarrow.ChunkedArray)):
  311. column_data = column_data.to_pandas()
  312. return self._table.assign(**{column_name: column_data})
  313. def random_shuffle(self, random_seed: Optional[int]) -> "pandas.DataFrame":
  314. table = self._table.sample(frac=1, random_state=random_seed)
  315. table.reset_index(drop=True, inplace=True)
  316. return table
  317. def schema(self) -> PandasBlockSchema:
  318. dtypes = self._table.dtypes
  319. schema = PandasBlockSchema(
  320. names=dtypes.index.tolist(), types=dtypes.values.tolist()
  321. )
  322. # Column names with non-str types of a pandas DataFrame is not
  323. # supported by Ray Dataset.
  324. if any(not isinstance(name, str) for name in schema.names):
  325. raise ValueError(
  326. "A Pandas DataFrame with column names of non-str types"
  327. " is not supported by Ray Dataset. Column names of this"
  328. f" DataFrame: {schema.names!r}."
  329. )
  330. return schema
  331. def to_pandas(self) -> "pandas.DataFrame":
  332. from ray.data.util.data_batch_conversion import _cast_tensor_columns_to_ndarrays
  333. ctx = DataContext.get_current()
  334. table = self._table
  335. if ctx.enable_tensor_extension_casting:
  336. table = _cast_tensor_columns_to_ndarrays(table)
  337. return table
  338. def to_numpy(
  339. self, columns: Optional[Union[str, List[str]]] = None
  340. ) -> Union[np.ndarray, Dict[str, np.ndarray]]:
  341. if columns is None:
  342. columns = self._table.columns.tolist()
  343. should_be_single_ndarray = False
  344. elif isinstance(columns, list):
  345. should_be_single_ndarray = False
  346. else:
  347. columns = [columns]
  348. should_be_single_ndarray = True
  349. column_names_set = set(self._table.columns)
  350. for column in columns:
  351. if column not in column_names_set:
  352. raise ValueError(
  353. f"Cannot find column {column}, available columns: "
  354. f"{self._table.columns.tolist()}"
  355. )
  356. arrays = []
  357. for column in columns:
  358. arrays.append(self._table[column].to_numpy())
  359. if should_be_single_ndarray:
  360. arrays = arrays[0]
  361. else:
  362. arrays = dict(zip(columns, arrays))
  363. return arrays
  364. def to_arrow(self) -> "pyarrow.Table":
  365. import pyarrow as pa
  366. from ray.data._internal.tensor_extensions.pandas import TensorDtype
  367. # Set `preserve_index=False` so that Arrow doesn't add a '__index_level_0__'
  368. # column to the resulting table.
  369. arrow_table = pa.Table.from_pandas(self._table, preserve_index=False)
  370. # NOTE: Pandas by default coerces all-null column types (including None,
  371. # NaN, etc) into "double" type by default, which is incorrect in a
  372. # a lot of cases.
  373. #
  374. # To fix that, we traverse all the columns after conversion and
  375. # replace all-null ones with the column of null-type that allows
  376. # these columns to be properly combined with the same column
  377. # containing non-null values and carrying appropriate type later.
  378. null_coerced_columns = {}
  379. for idx, col_name in enumerate(self._table.columns):
  380. col = self._table[col_name]
  381. # Skip coercing tensors to null-type to avoid type information loss
  382. # See https://github.com/ray-project/ray/issues/59087 for context
  383. if isinstance(col.dtype, TensorDtype):
  384. continue
  385. if not col.notna().any():
  386. # If there are only null-values, coerce column to Arrow's `NullType`
  387. null_coerced_columns[(idx, col_name)] = pa.nulls(
  388. len(col), type=pa.null()
  389. )
  390. # NOTE: We're updating columns in place to preserve any potential metadata
  391. # set from conversion from original Pandas data-frame
  392. for (idx, col_name), null_col in null_coerced_columns.items():
  393. arrow_table = arrow_table.set_column(idx, col_name, null_col)
  394. return arrow_table
  395. def num_rows(self) -> int:
  396. return self._table.shape[0]
  397. def size_bytes(self) -> int:
  398. from ray.data._internal.tensor_extensions.pandas import TensorArray
  399. from ray.data.extensions import TensorArrayElement, TensorDtype
  400. pd = lazy_import_pandas()
  401. def get_deep_size(obj):
  402. """Calculates the memory size of objects,
  403. including nested objects using an iterative approach."""
  404. seen = set()
  405. total_size = 0
  406. objects = collections.deque([obj])
  407. while objects:
  408. current = objects.pop()
  409. # Skip interning-eligible immutable objects
  410. if isinstance(current, (str, bytes, int, float)):
  411. size = sys.getsizeof(current)
  412. total_size += size
  413. continue
  414. # Check if the object has been seen before
  415. # i.e. a = np.ndarray([1,2,3]), b = [a,a]
  416. # The patten above will have only one memory copy
  417. if id(current) in seen:
  418. continue
  419. seen.add(id(current))
  420. try:
  421. size = sys.getsizeof(current)
  422. except TypeError:
  423. size = 0
  424. total_size += size
  425. # Handle specific cases
  426. if isinstance(current, np.ndarray):
  427. total_size += current.nbytes - size # Avoid double counting
  428. elif isinstance(current, pd.DataFrame):
  429. total_size += (
  430. current.memory_usage(index=True, deep=True).sum() - size
  431. )
  432. elif isinstance(current, (list, tuple, set)):
  433. objects.extend(current)
  434. elif isinstance(current, dict):
  435. objects.extend(current.keys())
  436. objects.extend(current.values())
  437. elif isinstance(current, TensorArrayElement):
  438. objects.extend(current.to_numpy())
  439. return total_size
  440. # Get initial memory usage.
  441. # No need for deep inspection here, as we will handle the str, object and
  442. # extension columns separately.
  443. memory_usage = self._table.memory_usage(index=True, deep=False)
  444. # TensorDtype for ray.data._internal.tensor_extensions.pandas.TensorDtype
  445. object_need_check = (TensorDtype,)
  446. max_sample_count = _PANDAS_SIZE_BYTES_MAX_SAMPLE_COUNT
  447. # Handle object columns separately
  448. for column in self._table.columns:
  449. # For str, object and extension dtypes, we calculate the size
  450. # by sampling the data.
  451. dtype = self._table[column].dtype
  452. if (
  453. is_string_dtype(dtype)
  454. or is_object_dtype(dtype)
  455. or isinstance(dtype, object_need_check)
  456. ):
  457. total_size = len(self._table[column])
  458. # Determine the sample size based on max_sample_count
  459. sample_size = min(total_size, max_sample_count)
  460. # Skip size calculation for empty columns
  461. if sample_size == 0:
  462. continue
  463. # Following codes can also handel case that sample_size == total_size
  464. sampled_data = self._table[column].sample(n=sample_size).values
  465. try:
  466. if isinstance(sampled_data, TensorArray) and np.issubdtype(
  467. sampled_data[0].numpy_dtype, np.number
  468. ):
  469. column_memory_sample = sampled_data.nbytes
  470. else:
  471. vectorized_size_calc = np.vectorize(lambda x: get_deep_size(x))
  472. column_memory_sample = np.sum(
  473. vectorized_size_calc(sampled_data)
  474. )
  475. # Scale back to the full column size if we sampled
  476. column_memory = column_memory_sample * (total_size / sample_size)
  477. # Add the data memory usage on top of the index memory usage.
  478. memory_usage[column] += int(column_memory)
  479. except Exception as e:
  480. # Handle or log the exception as needed
  481. logger.warning(f"Error calculating size for column '{column}': {e}")
  482. # Sum up total memory usage
  483. total_memory_usage = memory_usage.sum()
  484. return int(total_memory_usage)
  485. def _zip(self, acc: BlockAccessor) -> "pandas.DataFrame":
  486. r = self.to_pandas().copy(deep=False)
  487. s = acc.to_pandas()
  488. for col_name in s.columns:
  489. col = s[col_name]
  490. column_names = list(r.columns)
  491. # Ensure the column names are unique after zip.
  492. if col_name in column_names:
  493. i = 1
  494. new_name = col_name
  495. while new_name in column_names:
  496. new_name = "{}_{}".format(col_name, i)
  497. i += 1
  498. col_name = new_name
  499. r[col_name] = col
  500. return r
  501. @staticmethod
  502. def builder() -> PandasBlockBuilder:
  503. return PandasBlockBuilder()
  504. @staticmethod
  505. def _empty_table() -> "pandas.DataFrame":
  506. return PandasBlockBuilder._empty_table()
  507. def _sample(self, n_samples: int, sort_key: "SortKey") -> "pandas.DataFrame":
  508. return self._table[sort_key.get_columns()].sample(n_samples, ignore_index=True)
  509. def sort(self, sort_key: "SortKey"):
  510. assert (
  511. sort_key.get_columns()
  512. ), f"Sorting columns couldn't be empty (got {sort_key.get_columns()})"
  513. if self._table.shape[0] == 0:
  514. return self._empty_table()
  515. columns, ascending = sort_key.to_pandas_sort_args()
  516. return self._table.sort_values(by=columns, ascending=ascending)
  517. def sort_and_partition(
  518. self, boundaries: List[T], sort_key: "SortKey"
  519. ) -> List[Block]:
  520. table = self.sort(sort_key)
  521. if table.shape[0] == 0:
  522. # If the pyarrow table is empty we may not have schema
  523. # so calling sort_indices() will raise an error.
  524. return [self._empty_table() for _ in range(len(boundaries) + 1)]
  525. elif len(boundaries) == 0:
  526. return [table]
  527. return BlockAccessor.for_block(table)._find_partitions_sorted(
  528. boundaries, sort_key
  529. )
  530. @staticmethod
  531. def merge_sorted_blocks(
  532. blocks: List[Block], sort_key: "SortKey"
  533. ) -> Tuple[Block, "BlockMetadataWithSchema"]:
  534. pd = lazy_import_pandas()
  535. stats = BlockExecStats.builder()
  536. blocks = [b for b in blocks if b.shape[0] > 0]
  537. if len(blocks) == 0:
  538. ret = PandasBlockAccessor._empty_table()
  539. else:
  540. # Handle blocks of different types.
  541. blocks = TableBlockAccessor.normalize_block_types(blocks, BlockType.PANDAS)
  542. ret = pd.concat(blocks, ignore_index=True)
  543. columns, ascending = sort_key.to_pandas_sort_args()
  544. ret = ret.sort_values(by=columns, ascending=ascending)
  545. from ray.data.block import BlockMetadataWithSchema
  546. return ret, BlockMetadataWithSchema.from_block(ret, stats=stats.build())
  547. def block_type(self) -> BlockType:
  548. return BlockType.PANDAS
  549. def iter_rows(
  550. self, public_row_format: bool
  551. ) -> Iterator[Union[Mapping, np.ndarray]]:
  552. num_rows = self.num_rows()
  553. for i in range(num_rows):
  554. row = self._get_row(i)
  555. if public_row_format:
  556. yield row.as_pydict()
  557. else:
  558. yield row
  559. def filter(self, predicate_expr: "Expr") -> "pandas.DataFrame":
  560. """Filter rows based on a predicate expression."""
  561. if self._table.empty:
  562. return self._table
  563. from ray.data._internal.planner.plan_expression.expression_evaluator import (
  564. eval_expr,
  565. )
  566. # Evaluate the expression to get a boolean mask
  567. mask = eval_expr(predicate_expr, self._table)
  568. # Use pandas boolean indexing
  569. return self._table[mask]