block.py 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840
  1. import collections
  2. import logging
  3. import time
  4. from dataclasses import dataclass, fields
  5. from enum import Enum
  6. from typing import (
  7. TYPE_CHECKING,
  8. Any,
  9. Callable,
  10. Dict,
  11. Iterator,
  12. List,
  13. Optional,
  14. Protocol,
  15. Tuple,
  16. TypeVar,
  17. Union,
  18. )
  19. import numpy as np
  20. import pyarrow as pa
  21. import ray
  22. from ray.data._internal.util import _check_pyarrow_version, _truncated_repr
  23. from ray.types import ObjectRef
  24. from ray.util import log_once
  25. from ray.util.annotations import DeveloperAPI
  26. if TYPE_CHECKING:
  27. import pandas
  28. import pyarrow
  29. from ray.data._internal.block_builder import BlockBuilder
  30. from ray.data._internal.pandas_block import PandasBlockSchema
  31. from ray.data._internal.planner.exchange.sort_task_spec import SortKey
  32. from ray.data.aggregate import AggregateFn
# Generic type variables used in the annotations of this module. ``T`` is
# declared contravariant and ``U`` covariant; ``KeyType`` and ``AggType`` are
# invariant.
T = TypeVar("T", contravariant=True)
U = TypeVar("U", covariant=True)
KeyType = TypeVar("KeyType")
AggType = TypeVar("AggType")

# Represents a batch of records to be stored in the Ray object store.
#
# Block data can be accessed in a uniform way via ``BlockAccessors`` like
# ``ArrowBlockAccessor``.
Block = Union["pyarrow.Table", "pandas.DataFrame"]

# Represents the schema of a block, which can be a Python type, a
# ``PandasBlockSchema``, or a pyarrow schema. This is used to describe the
# structure of the data in a block.
Schema = Union[type, "PandasBlockSchema", "pyarrow.lib.Schema"]

# Represents a single column of the ``Block``
BlockColumn = Union["pyarrow.ChunkedArray", "pyarrow.Array", "pandas.Series"]

# Represents a single column of the ``Batch``
BatchColumn = Union[
    "pandas.Series", "np.ndarray", "pyarrow.Array", "pyarrow.ChunkedArray"
]

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@DeveloperAPI
class BlockType(Enum):
    """Identifies the kind of a block, as reported by ``BlockAccessor.block_type``."""

    ARROW = "arrow"
    PANDAS = "pandas"
@DeveloperAPI
class BatchFormat(str, Enum):
    """Names of the batch formats; members are also plain ``str`` values."""

    # NOTE: This is to maintain compatibility w/ existing APIs
    ARROW = "pyarrow"
    PANDAS = "pandas"
    NUMPY = "numpy"
# User-facing data batch type. This is the data type for data that is supplied to and
# returned from batch UDFs.
DataBatch = Union["pyarrow.Table", "pandas.DataFrame", Dict[str, np.ndarray]]

# User-facing data column type. This is the data type for data that is supplied to and
# returned from column UDFs.
DataBatchColumn = Union[BlockColumn, np.ndarray]

# A class type that implements __call__.
CallableClass = type
class _CallableClassProtocol(Protocol[T, U]):
    """Structural type for objects callable with one ``T`` argument.

    The call returns either a single ``U`` or an iterator of ``U``.
    """

    def __call__(self, __arg: T) -> Union[U, Iterator[U]]:
        ...
# A user defined function passed to flat_map, map_batches, etc. It is either a
# plain callable (returning a value or an iterator of values) or a callable
# class.
UserDefinedFunction = Union[
    Callable[[T], U],
    Callable[[T], Iterator[U]],
    type["_CallableClassProtocol"],
]

# A list of block references pending computation by a single task. For example,
# this may be the output of a task reading a file.
BlockPartition = List[Tuple[ObjectRef[Block], "BlockMetadata"]]

# The metadata that describes the output of a BlockPartition. This has the
# same type as the metadata that describes each block in the partition.
BlockPartitionMetadata = List["BlockMetadata"]

# Batch formats accepted by ``_apply_batch_format`` (``None`` is accepted too).
VALID_BATCH_FORMATS = ["pandas", "pyarrow", "numpy", None]

# Batch format substituted by ``_apply_batch_format`` when "default" is requested.
DEFAULT_BATCH_FORMAT = "numpy"
  87. def _is_empty_schema(schema: Optional[Schema]) -> bool:
  88. from ray.data._internal.pandas_block import PandasBlockSchema
  89. return schema is None or (
  90. not schema.names
  91. if isinstance(schema, PandasBlockSchema)
  92. else not schema # pyarrow schema check
  93. )
  94. def _take_first_non_empty_schema(schemas: Iterator["Schema"]) -> Optional["Schema"]:
  95. """Return the first non-empty schema from an iterator of schemas.
  96. Args:
  97. schemas: Iterator of schemas to check.
  98. Returns:
  99. The first non-empty schema, or None if all schemas are empty.
  100. """
  101. for schema in schemas:
  102. if not _is_empty_schema(schema):
  103. return schema
  104. return None
  105. def _apply_batch_format(given_batch_format: Optional[str]) -> Optional[str]:
  106. if given_batch_format == "default":
  107. given_batch_format = DEFAULT_BATCH_FORMAT
  108. if given_batch_format not in VALID_BATCH_FORMATS:
  109. raise ValueError(
  110. f"The given batch format {given_batch_format} isn't allowed (must be one of"
  111. f" {VALID_BATCH_FORMATS})."
  112. )
  113. return given_batch_format
  114. @DeveloperAPI
  115. def to_stats(metas: List["BlockMetadata"]) -> List["BlockStats"]:
  116. return [m.to_stats() for m in metas]
  117. @DeveloperAPI
  118. class BlockExecStats:
  119. """Execution stats for this block.
  120. Attributes:
  121. wall_time_s: The wall-clock time it took to compute this block.
  122. cpu_time_s: The CPU time it took to compute this block.
  123. node_id: A unique id for the node that computed this block.
  124. max_uss_bytes: An estimate of the maximum amount of physical memory that the
  125. process was using while computing this block.
  126. """
  127. def __init__(self):
  128. self.start_time_s: Optional[float] = None
  129. self.end_time_s: Optional[float] = None
  130. self.wall_time_s: Optional[float] = None
  131. self.udf_time_s: Optional[float] = 0
  132. self.block_ser_time_s: Optional[float] = None
  133. self.cpu_time_s: Optional[float] = None
  134. self.node_id = ray.runtime_context.get_runtime_context().get_node_id()
  135. self.max_uss_bytes: int = 0
  136. self.task_idx: Optional[int] = None
  137. @staticmethod
  138. def builder() -> "_BlockExecStatsBuilder":
  139. return _BlockExecStatsBuilder()
  140. def __repr__(self):
  141. return repr(
  142. {
  143. "wall_time_s": self.wall_time_s,
  144. "cpu_time_s": self.cpu_time_s,
  145. "udf_time_s": self.udf_time_s,
  146. "node_id": self.node_id,
  147. }
  148. )
  149. class _BlockExecStatsBuilder:
  150. """Helper class for building block stats.
  151. When this class is created, we record the start time. When build() is
  152. called, the time delta is saved as part of the stats.
  153. """
  154. def __init__(self):
  155. self._start_time = time.perf_counter()
  156. self._start_cpu = time.process_time()
  157. def build(self, block_ser_time_s: Optional[int] = None) -> "BlockExecStats":
  158. # Record end times.
  159. end_time = time.perf_counter()
  160. end_cpu = time.process_time()
  161. # Build the stats.
  162. stats = BlockExecStats()
  163. stats.start_time_s = self._start_time
  164. stats.end_time_s = end_time
  165. stats.wall_time_s = end_time - self._start_time
  166. stats.cpu_time_s = end_cpu - self._start_cpu
  167. stats.block_ser_time_s = block_ser_time_s
  168. return stats
  169. @DeveloperAPI
  170. @dataclass
  171. class BlockStats:
  172. """Statistics about the block produced"""
  173. #: The number of rows contained in this block, or None.
  174. num_rows: Optional[int]
  175. #: The approximate size in bytes of this block, or None.
  176. size_bytes: Optional[int]
  177. #: Execution stats for this block.
  178. exec_stats: Optional[BlockExecStats]
  179. def __post_init__(self):
  180. if self.size_bytes is not None:
  181. # Require size_bytes to be int, ray.util.metrics objects
  182. # will not take other types like numpy.int64
  183. assert isinstance(self.size_bytes, int)
  184. _BLOCK_STATS_FIELD_NAMES = {f.name for f in fields(BlockStats)}
  185. @DeveloperAPI
  186. @dataclass
  187. class BlockMetadata(BlockStats):
  188. """Metadata about the block."""
  189. #: The pyarrow schema or types of the block elements, or None.
  190. #: The list of file paths used to generate this block, or
  191. #: the empty list if indeterminate.
  192. input_files: Optional[List[str]]
  193. def to_stats(self):
  194. return BlockStats(
  195. **{key: self.__getattribute__(key) for key in _BLOCK_STATS_FIELD_NAMES}
  196. )
  197. def __post_init__(self):
  198. super().__post_init__()
  199. if self.input_files is None:
  200. self.input_files = []
  201. @DeveloperAPI(stability="alpha")
  202. @dataclass
  203. class BlockMetadataWithSchema(BlockMetadata):
  204. schema: Optional[Schema] = None
  205. def __init__(self, metadata: BlockMetadata, schema: Optional["Schema"] = None):
  206. super().__init__(
  207. input_files=metadata.input_files,
  208. size_bytes=metadata.size_bytes,
  209. num_rows=metadata.num_rows,
  210. exec_stats=metadata.exec_stats,
  211. )
  212. self.schema = schema
  213. def from_block(
  214. block: Block, stats: Optional["BlockExecStats"] = None
  215. ) -> "BlockMetadataWithSchema":
  216. accessor = BlockAccessor.for_block(block)
  217. meta = accessor.get_metadata(exec_stats=stats)
  218. schema = accessor.schema()
  219. return BlockMetadataWithSchema(metadata=meta, schema=schema)
  220. @property
  221. def metadata(self) -> BlockMetadata:
  222. return BlockMetadata(
  223. num_rows=self.num_rows,
  224. size_bytes=self.size_bytes,
  225. exec_stats=self.exec_stats,
  226. input_files=self.input_files,
  227. )
  228. @DeveloperAPI
  229. class BlockAccessor:
  230. """Provides accessor methods for a specific block.
  231. Ideally, we wouldn't need a separate accessor classes for blocks. However,
  232. this is needed if we want to support storing ``pyarrow.Table`` directly
  233. as a top-level Ray object, without a wrapping class (issue #17186).
  234. """
  235. def num_rows(self) -> int:
  236. """Return the number of rows contained in this block."""
  237. raise NotImplementedError
  238. def iter_rows(self, public_row_format: bool) -> Iterator[T]:
  239. """Iterate over the rows of this block.
  240. Args:
  241. public_row_format: Whether to cast rows into the public Dict row
  242. format (this incurs extra copy conversions).
  243. """
  244. raise NotImplementedError
  245. def slice(self, start: int, end: int, copy: bool = False) -> Block:
  246. """Return a slice of this block.
  247. Args:
  248. start: The starting index of the slice (inclusive).
  249. end: The ending index of the slice (exclusive).
  250. copy: Whether to perform a data copy for the slice.
  251. Returns:
  252. The sliced block result.
  253. """
  254. raise NotImplementedError
  255. def take(self, indices: List[int]) -> Block:
  256. """Return a new block containing the provided row indices.
  257. Args:
  258. indices: The row indices to return.
  259. Returns:
  260. A new block containing the provided row indices.
  261. """
  262. raise NotImplementedError
  263. def drop(self, columns: List[str]) -> Block:
  264. """Return a new block with the list of provided columns dropped"""
  265. raise NotImplementedError
  266. def select(self, columns: List[Optional[str]]) -> Block:
  267. """Return a new block containing the provided columns."""
  268. raise NotImplementedError
  269. def rename_columns(self, columns_rename: Dict[str, str]) -> Block:
  270. """Return the block reflecting the renamed columns."""
  271. raise NotImplementedError
  272. def upsert_column(self, column_name: str, column_data: BlockColumn) -> Block:
  273. """
  274. Upserts a column into the block. If the column already exists, it will be replaced.
  275. Args:
  276. column_name: The name of the column to upsert.
  277. column_data: The data to upsert into the column. (Arrow Array/ChunkedArray for Arrow blocks, Series or array-like for Pandas blocks)
  278. Returns:
  279. The updated block.
  280. """
  281. raise NotImplementedError()
  282. def random_shuffle(self, random_seed: Optional[int]) -> Block:
  283. """Randomly shuffle this block."""
  284. raise NotImplementedError
  285. def to_pandas(self) -> "pandas.DataFrame":
  286. """Convert this block into a Pandas dataframe."""
  287. raise NotImplementedError
  288. def to_numpy(
  289. self, columns: Optional[Union[str, List[str]]] = None
  290. ) -> Union[np.ndarray, Dict[str, np.ndarray]]:
  291. """Convert this block (or columns of block) into a NumPy ndarray.
  292. Args:
  293. columns: Name of columns to convert, or None if converting all columns.
  294. """
  295. raise NotImplementedError
  296. def to_arrow(self) -> "pyarrow.Table":
  297. """Convert this block into an Arrow table."""
  298. raise NotImplementedError
  299. def to_block(self) -> Block:
  300. """Return the base block that this accessor wraps."""
  301. raise NotImplementedError
  302. def to_default(self) -> Block:
  303. """Return the default data format for this accessor."""
  304. return self.to_block()
  305. def to_batch_format(self, batch_format: Optional[str]) -> DataBatch:
  306. """Convert this block into the provided batch format.
  307. Args:
  308. batch_format: The batch format to convert this block to.
  309. Returns:
  310. This block formatted as the provided batch format.
  311. """
  312. if batch_format is None:
  313. return self.to_block()
  314. elif batch_format == "default" or batch_format == "native":
  315. return self.to_default()
  316. elif batch_format == "pandas":
  317. return self.to_pandas()
  318. elif batch_format == "pyarrow":
  319. return self.to_arrow()
  320. elif batch_format == "numpy":
  321. return self.to_numpy()
  322. else:
  323. raise ValueError(
  324. f"The batch format must be one of {VALID_BATCH_FORMATS}, got: "
  325. f"{batch_format}"
  326. )
  327. def size_bytes(self) -> int:
  328. """Return the approximate size in bytes of this block."""
  329. raise NotImplementedError
  330. def schema(self) -> Union[type, "pyarrow.lib.Schema"]:
  331. """Return the Python type or pyarrow schema of this block."""
  332. raise NotImplementedError
  333. def get_metadata(
  334. self,
  335. input_files: Optional[List[str]] = None,
  336. exec_stats: Optional[BlockExecStats] = None,
  337. ) -> BlockMetadata:
  338. """Create a metadata object from this block."""
  339. return BlockMetadata(
  340. num_rows=self.num_rows(),
  341. size_bytes=self.size_bytes(),
  342. input_files=input_files,
  343. exec_stats=exec_stats,
  344. )
  345. def zip(self, other: "Block") -> "Block":
  346. """Zip this block with another block of the same type and size."""
  347. raise NotImplementedError
  348. @staticmethod
  349. def builder() -> "BlockBuilder":
  350. """Create a builder for this block type."""
  351. raise NotImplementedError
  352. @classmethod
  353. def batch_to_block(
  354. cls,
  355. batch: DataBatch,
  356. block_type: Optional[BlockType] = None,
  357. ) -> Block:
  358. """Create a block from user-facing data formats."""
  359. if isinstance(batch, np.ndarray):
  360. raise ValueError(
  361. f"Error validating {_truncated_repr(batch)}: "
  362. "Standalone numpy arrays are not "
  363. "allowed in Ray 2.5. Return a dict of field -> array, "
  364. "e.g., `{'data': array}` instead of `array`."
  365. )
  366. elif isinstance(batch, collections.abc.Mapping):
  367. if block_type is None or block_type == BlockType.ARROW:
  368. from ray.data._internal.tensor_extensions.arrow import (
  369. ArrowConversionError,
  370. )
  371. try:
  372. return cls.batch_to_arrow_block(batch)
  373. except ArrowConversionError as e:
  374. if log_once("_fallback_to_pandas_block_warning"):
  375. logger.warning(
  376. f"Failed to convert batch to Arrow due to: {e}; "
  377. f"falling back to Pandas block"
  378. )
  379. if block_type is None:
  380. return cls.batch_to_pandas_block(batch)
  381. else:
  382. raise e
  383. else:
  384. assert block_type == BlockType.PANDAS
  385. return cls.batch_to_pandas_block(batch)
  386. return batch
  387. @classmethod
  388. def batch_to_arrow_block(cls, batch: Dict[str, Any]) -> Block:
  389. """Create an Arrow block from user-facing data formats."""
  390. from ray.data._internal.arrow_block import ArrowBlockBuilder
  391. return ArrowBlockBuilder._table_from_pydict(batch)
  392. @classmethod
  393. def batch_to_pandas_block(cls, batch: Dict[str, Any]) -> Block:
  394. """Create a Pandas block from user-facing data formats."""
  395. from ray.data._internal.pandas_block import PandasBlockBuilder
  396. return PandasBlockBuilder._table_from_pydict(batch)
  397. @staticmethod
  398. def for_block(block: Block) -> "BlockAccessor[T]":
  399. """Create a block accessor for the given block."""
  400. _check_pyarrow_version()
  401. import pandas
  402. import pyarrow
  403. if isinstance(block, (pyarrow.Table, pyarrow.RecordBatch)):
  404. from ray.data._internal.arrow_block import ArrowBlockAccessor
  405. return ArrowBlockAccessor(block)
  406. elif isinstance(block, pandas.DataFrame):
  407. from ray.data._internal.pandas_block import PandasBlockAccessor
  408. return PandasBlockAccessor(block)
  409. elif isinstance(block, bytes):
  410. from ray.data._internal.arrow_block import ArrowBlockAccessor
  411. return ArrowBlockAccessor.from_bytes(block)
  412. elif isinstance(block, list):
  413. raise ValueError(
  414. f"Error validating {_truncated_repr(block)}: "
  415. "Standalone Python objects are not "
  416. "allowed in Ray 2.5. To use Python objects in a dataset, "
  417. "wrap them in a dict of numpy arrays, e.g., "
  418. "return `{'item': batch}` instead of just `batch`."
  419. )
  420. else:
  421. raise TypeError("Not a block type: {} ({})".format(block, type(block)))
  422. def sample(self, n_samples: int, sort_key: "SortKey") -> "Block":
  423. """Return a random sample of items from this block."""
  424. raise NotImplementedError
  425. def count(self, on: str, ignore_nulls: bool = False) -> Optional[U]:
  426. """Returns a count of the distinct values in the provided column"""
  427. raise NotImplementedError
  428. def sum(self, on: str, ignore_nulls: bool) -> Optional[U]:
  429. """Returns a sum of the values in the provided column"""
  430. raise NotImplementedError
  431. def min(self, on: str, ignore_nulls: bool) -> Optional[U]:
  432. """Returns a min of the values in the provided column"""
  433. raise NotImplementedError
  434. def max(self, on: str, ignore_nulls: bool) -> Optional[U]:
  435. """Returns a max of the values in the provided column"""
  436. raise NotImplementedError
  437. def mean(self, on: str, ignore_nulls: bool) -> Optional[U]:
  438. """Returns a mean of the values in the provided column"""
  439. raise NotImplementedError
  440. def sum_of_squared_diffs_from_mean(
  441. self,
  442. on: str,
  443. ignore_nulls: bool,
  444. mean: Optional[U] = None,
  445. ) -> Optional[U]:
  446. """Returns a sum of diffs (from mean) squared for the provided column"""
  447. raise NotImplementedError
  448. def sort(self, sort_key: "SortKey") -> "Block":
  449. """Returns new block sorted according to provided `sort_key`"""
  450. raise NotImplementedError
  451. def sort_and_partition(
  452. self, boundaries: List[T], sort_key: "SortKey"
  453. ) -> List["Block"]:
  454. """Return a list of sorted partitions of this block."""
  455. raise NotImplementedError
  456. def _aggregate(self, key: "SortKey", aggs: Tuple["AggregateFn"]) -> Block:
  457. """Combine rows with the same key into an accumulator."""
  458. raise NotImplementedError
  459. @staticmethod
  460. def merge_sorted_blocks(
  461. blocks: List["Block"], sort_key: "SortKey"
  462. ) -> Tuple[Block, BlockMetadataWithSchema]:
  463. """Return a sorted block by merging a list of sorted blocks."""
  464. raise NotImplementedError
  465. @staticmethod
  466. def _combine_aggregated_blocks(
  467. blocks: List[Block],
  468. sort_key: "SortKey",
  469. aggs: Tuple["AggregateFn"],
  470. finalize: bool = True,
  471. ) -> Tuple[Block, BlockMetadataWithSchema]:
  472. """Aggregate partially combined and sorted blocks."""
  473. raise NotImplementedError
  474. def _find_partitions_sorted(
  475. self,
  476. boundaries: List[Tuple[Any]],
  477. sort_key: "SortKey",
  478. ) -> List[Block]:
  479. """NOTE: PLEASE READ CAREFULLY
  480. Returns dataset partitioned using list of boundaries
  481. This method requires that
  482. - Block being sorted (according to `sort_key`)
  483. - Boundaries is a sorted list of tuples
  484. """
  485. raise NotImplementedError
  486. def block_type(self) -> BlockType:
  487. """Return the block type of this block."""
  488. raise NotImplementedError
  489. def _get_group_boundaries_sorted(self, keys: List[str]) -> np.ndarray:
  490. """
  491. NOTE: THIS METHOD ASSUMES THAT PROVIDED BLOCK IS ALREADY SORTED
  492. Compute boundaries of the groups within a block based on provided
  493. key (a column or a list of columns)
  494. NOTE: In each column, NaNs/None are considered to be the same group.
  495. Args:
  496. block: sorted block for which grouping of rows will be determined
  497. based on provided key
  498. keys: list of columns determining the key for every row based on
  499. which the block will be grouped
  500. Returns:
  501. A list of starting indices of each group and an end index of the last
  502. group, i.e., there are ``num_groups + 1`` entries and the first and last
  503. entries are 0 and ``len(array)`` respectively.
  504. """
  505. if self.num_rows() == 0:
  506. return np.array([], dtype=np.int32)
  507. elif not keys:
  508. # If no keys are specified, whole block is considered a single group
  509. return np.array([0, self.num_rows()])
  510. # Convert key columns to Numpy (to perform vectorized
  511. # ops on them)
  512. projected_block = self.to_numpy(keys)
  513. return _get_group_boundaries_sorted_numpy(list(projected_block.values()))
  514. @DeveloperAPI(stability="beta")
  515. class BlockColumnAccessor:
  516. """Provides vendor-neutral interface to apply common operations
  517. to block's (Pandas/Arrow) columns"""
  518. def __init__(self, col: BlockColumn):
  519. self._column = col
  520. def count(self, *, ignore_nulls: bool, as_py: bool = True) -> Optional[U]:
  521. """Returns a count of the distinct values in the column"""
  522. raise NotImplementedError()
  523. def sum(self, *, ignore_nulls: bool, as_py: bool = True) -> Optional[U]:
  524. """Returns a sum of the values in the column"""
  525. return NotImplementedError()
  526. def min(self, *, ignore_nulls: bool, as_py: bool = True) -> Optional[U]:
  527. """Returns a min of the values in the column"""
  528. raise NotImplementedError()
  529. def max(self, *, ignore_nulls: bool, as_py: bool = True) -> Optional[U]:
  530. """Returns a max of the values in the column"""
  531. raise NotImplementedError()
  532. def mean(self, *, ignore_nulls: bool, as_py: bool = True) -> Optional[U]:
  533. """Returns a mean of the values in the column"""
  534. raise NotImplementedError()
  535. def quantile(
  536. self, *, q: float, ignore_nulls: bool, as_py: bool = True
  537. ) -> Optional[U]:
  538. """Returns requested quantile of the given column"""
  539. raise NotImplementedError()
  540. def unique(self) -> BlockColumn:
  541. """Returns new column holding only distinct values of the current one"""
  542. raise NotImplementedError()
  543. def value_counts(self) -> Dict[str, List]:
  544. raise NotImplementedError()
  545. def hash(self) -> BlockColumn:
  546. """
  547. Computes a 64-bit hash value for each row in the column.
  548. Provides a unified hashing method across supported backends.
  549. Handles complex types like lists or nested structures by producing a single hash per row.
  550. These hashes are useful for downstream operations such as deduplication, grouping, or partitioning.
  551. Internally, Polars is used to compute row-level hashes even when the original column
  552. is backed by Pandas or PyArrow.
  553. Returns:
  554. A column of 64-bit integer hashes, returned in the same format as the
  555. underlying backend (e.g., Pandas Series or PyArrow Array).
  556. """
  557. raise NotImplementedError()
  558. def flatten(self) -> BlockColumn:
  559. """Flattens nested lists merging them into top-level container"""
  560. raise NotImplementedError()
  561. def dropna(self) -> BlockColumn:
  562. raise NotImplementedError()
  563. def is_composed_of_lists(self) -> bool:
  564. """
  565. Checks whether the column is composed of list-like elements.
  566. Returns:
  567. True if the column is made up of list-like values; False otherwise.
  568. """
  569. raise NotImplementedError()
  570. def sum_of_squared_diffs_from_mean(
  571. self,
  572. *,
  573. ignore_nulls: bool,
  574. mean: Optional[U] = None,
  575. as_py: bool = True,
  576. ) -> Optional[U]:
  577. """Returns a sum of diffs (from mean) squared for the column"""
  578. raise NotImplementedError()
  579. def to_pylist(self) -> List[Any]:
  580. """Converts block column to a list of Python native objects"""
  581. raise NotImplementedError()
  582. def to_numpy(self, zero_copy_only: bool = False) -> np.ndarray:
  583. """Converts underlying column to Numpy"""
  584. raise NotImplementedError()
  585. def _as_arrow_compatible(self) -> Union[List[Any], "pyarrow.Array"]:
  586. """Converts block column into a representation compatible with Arrow"""
  587. raise NotImplementedError()
  588. @staticmethod
  589. def for_column(col: BlockColumn) -> "BlockColumnAccessor":
  590. """Create a column accessor for the given column"""
  591. _check_pyarrow_version()
  592. import pandas as pd
  593. if isinstance(col, pa.Array) or isinstance(col, pa.ChunkedArray):
  594. from ray.data._internal.arrow_block import ArrowBlockColumnAccessor
  595. return ArrowBlockColumnAccessor(col)
  596. elif isinstance(col, pd.Series):
  597. from ray.data._internal.pandas_block import PandasBlockColumnAccessor
  598. return PandasBlockColumnAccessor(col)
  599. else:
  600. raise TypeError(
  601. f"Expected either a pandas.Series or pyarrow.Array (ChunkedArray) "
  602. f"(got {type(col)})"
  603. )
  604. def _get_group_boundaries_sorted_numpy(columns: list[np.ndarray]) -> np.ndarray:
  605. # There are 3 categories: general, numerics with NaN, and categorical with None.
  606. # We only needed to check the last element for NaNs/None, as they are assumed to
  607. # be sorted.
  608. general_arrays = []
  609. num_arrays_with_nan = []
  610. cat_arrays_with_none = []
  611. for arr in columns:
  612. if np.issubdtype(arr.dtype, np.number) and np.isnan(arr[-1]):
  613. num_arrays_with_nan.append(arr)
  614. elif not np.issubdtype(arr.dtype, np.number) and arr[-1] is None:
  615. cat_arrays_with_none.append(arr)
  616. else:
  617. general_arrays.append(arr)
  618. # Compute the difference between each pair of elements. Handle the cases
  619. # where neighboring elements are both NaN or None. Output as a list of
  620. # boolean arrays.
  621. diffs = []
  622. if len(general_arrays) > 0:
  623. diffs.append(
  624. np.vstack([arr[1:] != arr[:-1] for arr in general_arrays]).any(axis=0)
  625. )
  626. if len(num_arrays_with_nan) > 0:
  627. # Two neighboring numeric elements belong to the same group when they are
  628. # 1) both finite and equal
  629. # or 2) both np.nan
  630. diffs.append(
  631. np.vstack(
  632. [
  633. (arr[1:] != arr[:-1])
  634. & (np.isfinite(arr[1:]) | np.isfinite(arr[:-1]))
  635. for arr in num_arrays_with_nan
  636. ]
  637. ).any(axis=0)
  638. )
  639. if len(cat_arrays_with_none) > 0:
  640. # Two neighboring str/object elements belong to the same group when they are
  641. # 1) both finite and equal
  642. # or 2) both None
  643. diffs.append(
  644. np.vstack(
  645. [
  646. (arr[1:] != arr[:-1])
  647. & ~(np.equal(arr[1:], None) & np.equal(arr[:-1], None))
  648. for arr in cat_arrays_with_none
  649. ]
  650. ).any(axis=0)
  651. )
  652. # A series of vectorized operations to compute the boundaries:
  653. # - column_stack: stack the bool arrays into a single 2D bool array
  654. # - any() and nonzero(): find the indices where any of the column diffs are True
  655. # - add 1 to get the index of the first element of the next group
  656. # - hstack(): include the 0 and last indices to the boundaries
  657. boundaries = np.hstack(
  658. [
  659. [0],
  660. (np.column_stack(diffs).any(axis=1).nonzero()[0] + 1),
  661. [len(columns[0])],
  662. ]
  663. ).astype(int)
  664. return boundaries