arrow_utils.py 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. from __future__ import annotations
  2. import itertools
  3. from typing import List, Union
  4. import numpy as np
  5. import pyarrow as pa
  6. import pyarrow.compute as pc
  7. def _counts_to_offsets(counts: pa.Array) -> pa.Array:
  8. """Convert per-row counts to list offsets via cumulative sum."""
  9. cumsum = pc.cumulative_sum(counts)
  10. return pa.concat_arrays([pa.array([0], type=cumsum.type), cumsum])
  11. def _combine_as_list_array(
  12. column_values: List[Union[pa.Array, pa.ChunkedArray]] | None = None,
  13. *,
  14. offsets: pa.Array | None = None,
  15. values: pa.Array | None = None,
  16. is_large: bool = False,
  17. null_mask: pa.Array | None = None,
  18. ) -> pa.Array:
  19. """Combine list arrays or build a list array from offsets and values."""
  20. if column_values is None:
  21. if offsets is None or values is None:
  22. raise ValueError(
  23. "Either column_values or both offsets and values must be provided."
  24. )
  25. else:
  26. lens = [len(v) for v in column_values]
  27. offsets_type = pa.int64() if is_large else pa.int32()
  28. offsets = pa.array(np.concatenate([[0], np.cumsum(lens)]), type=offsets_type)
  29. values = pa.concat_arrays(
  30. itertools.chain(
  31. *[
  32. v.chunks if isinstance(v, pa.ChunkedArray) else [v]
  33. for v in column_values
  34. ]
  35. )
  36. )
  37. offsets_type = pa.int64() if is_large else pa.int32()
  38. offsets = pc.cast(offsets, offsets_type)
  39. array_cls = pa.LargeListArray if is_large else pa.ListArray
  40. list_type = pa.large_list(values.type) if is_large else pa.list_(values.type)
  41. return array_cls.from_arrays(offsets, values, list_type, mask=null_mask)