# grouped_data.py

  1. from collections.abc import Iterator as IteratorABC
  2. from functools import partial
  3. from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, Union
  4. from ray.data._internal.compute import ComputeStrategy
  5. from ray.data._internal.logical.interfaces import LogicalPlan
  6. from ray.data._internal.logical.operators import Aggregate
  7. from ray.data.aggregate import AggregateFn, Count, Max, Mean, Min, Std, Sum
  8. from ray.data.block import (
  9. Block,
  10. BlockAccessor,
  11. CallableClass,
  12. DataBatch,
  13. UserDefinedFunction,
  14. )
  15. from ray.data.context import ShuffleStrategy
  16. from ray.data.dataset import EXPRESSION_API_GROUP, Dataset
  17. from ray.data.expressions import DownloadExpr, Expr, StarExpr
  18. from ray.util.annotations import PublicAPI
  19. CDS_API_GROUP = "Computations or Descriptive Stats"
  20. FA_API_GROUP = "Function Application"
  21. class GroupedData:
  22. """Represents a grouped dataset created by calling ``Dataset.groupby()``.
  23. The actual groupby is deferred until an aggregation is applied.
  24. """
  25. def __init__(
  26. self,
  27. dataset: Dataset,
  28. key: Optional[Union[str, List[str]]],
  29. *,
  30. num_partitions: Optional[int],
  31. ):
  32. """Construct a dataset grouped by key (internal API).
  33. The constructor is not part of the GroupedData API.
  34. Use the ``Dataset.groupby()`` method to construct one.
  35. """
  36. self._dataset: Dataset = dataset
  37. self._key: Optional[Union[str, List[str]]] = key
  38. self._num_partitions: Optional[int] = num_partitions
  39. def __repr__(self) -> str:
  40. return (
  41. f"{self.__class__.__name__}(dataset={self._dataset}, " f"key={self._key!r})"
  42. )
  43. @PublicAPI(api_group=FA_API_GROUP)
  44. def aggregate(self, *aggs: AggregateFn) -> Dataset:
  45. """Implements an accumulator-based aggregation.
  46. Args:
  47. aggs: Aggregations to do.
  48. Returns:
  49. The output is an dataset of ``n + 1`` columns where the first column
  50. is the groupby key and the second through ``n + 1`` columns are the
  51. results of the aggregations.
  52. If groupby key is ``None`` then the key part of return is omitted.
  53. """
  54. plan = self._dataset._plan.copy()
  55. op = Aggregate(
  56. self._dataset._logical_plan.dag,
  57. key=self._key,
  58. aggs=aggs,
  59. num_partitions=self._num_partitions,
  60. )
  61. logical_plan = LogicalPlan(op, self._dataset.context)
  62. return Dataset(
  63. plan,
  64. logical_plan,
  65. )
  66. def _aggregate_on(
  67. self,
  68. agg_cls: type,
  69. on: Union[str, List[str]],
  70. *args,
  71. **kwargs,
  72. ):
  73. """Helper for aggregating on a particular subset of the dataset.
  74. This validates the `on` argument, and converts a list of column names
  75. to a multi-aggregation. A null `on` results in a
  76. multi-aggregation on all columns for an Arrow Dataset, and a single
  77. aggregation on the entire row for a simple Dataset.
  78. """
  79. aggs = self._dataset._build_multicolumn_aggs(
  80. agg_cls, on, *args, skip_cols=self._key, **kwargs
  81. )
  82. return self.aggregate(*aggs)
  83. @PublicAPI(api_group=FA_API_GROUP)
  84. def map_groups(
  85. self,
  86. fn: UserDefinedFunction[DataBatch, DataBatch],
  87. *,
  88. zero_copy_batch: bool = True,
  89. compute: Union[str, ComputeStrategy] = None,
  90. batch_format: Optional[str] = "default",
  91. fn_args: Optional[Iterable[Any]] = None,
  92. fn_kwargs: Optional[Dict[str, Any]] = None,
  93. fn_constructor_args: Optional[Iterable[Any]] = None,
  94. fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
  95. num_cpus: Optional[float] = None,
  96. num_gpus: Optional[float] = None,
  97. memory: Optional[float] = None,
  98. concurrency: Optional[Union[int, Tuple[int, int], Tuple[int, int, int]]] = None,
  99. ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
  100. **ray_remote_args,
  101. ) -> "Dataset":
  102. """Apply the given function to each group of records of this dataset.
  103. While map_groups() is very flexible, note that it comes with downsides:
  104. * It may be slower than using more specific methods such as min(), max().
  105. * It requires that each group fits in memory on a single node.
  106. In general, prefer to use `aggregate()` instead of `map_groups()`.
  107. .. warning::
  108. Specifying both ``num_cpus`` and ``num_gpus`` for map tasks is experimental,
  109. and may result in scheduling or stability issues. Please
  110. `report any issues <https://github.com/ray-project/ray/issues/new/choose>`_
  111. to the Ray team.
  112. Examples:
  113. >>> # Return a single record per group (list of multiple records in,
  114. >>> # list of a single record out).
  115. >>> import ray
  116. >>> import pandas as pd
  117. >>> import numpy as np
  118. >>> # Get first value per group.
  119. >>> ds = ray.data.from_items([ # doctest: +SKIP
  120. ... {"group": 1, "value": 1},
  121. ... {"group": 1, "value": 2},
  122. ... {"group": 2, "value": 3},
  123. ... {"group": 2, "value": 4}])
  124. >>> ds.groupby("group").map_groups( # doctest: +SKIP
  125. ... lambda g: {"result": np.array([g["value"][0]])})
  126. >>> # Return multiple records per group (dataframe in, dataframe out).
  127. >>> df = pd.DataFrame(
  128. ... {"A": ["a", "a", "b"], "B": [1, 1, 3], "C": [4, 6, 5]}
  129. ... )
  130. >>> ds = ray.data.from_pandas(df) # doctest: +SKIP
  131. >>> grouped = ds.groupby("A") # doctest: +SKIP
  132. >>> grouped.map_groups( # doctest: +SKIP
  133. ... lambda g: g.apply(
  134. ... lambda c: c / g[c.name].sum() if c.name in ["B", "C"] else c
  135. ... )
  136. ... ) # doctest: +SKIP
  137. Args:
  138. fn: The function to apply to each group of records, or a class type
  139. that can be instantiated to create such a callable. It takes as
  140. input a batch of all records from a single group, and returns a
  141. batch of zero or more records, similar to map_batches().
  142. zero_copy_batch: If True, each group of rows (batch) will be provided w/o
  143. making an additional copy.
  144. compute: The compute strategy to use for the map operation.
  145. * If ``compute`` is not specified for a function, will use ``ray.data.TaskPoolStrategy()`` to launch concurrent tasks based on the available resources and number of input blocks.
  146. * Use ``ray.data.TaskPoolStrategy(size=n)`` to launch at most ``n`` concurrent Ray tasks.
  147. * If ``compute`` is not specified for a callable class, will use ``ray.data.ActorPoolStrategy(min_size=1, max_size=None)`` to launch an autoscaling actor pool from 1 to unlimited workers.
  148. * Use ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed size actor pool of ``n`` workers.
  149. * Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` to use an autoscaling actor pool from ``m`` to ``n`` workers.
  150. * Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)`` to use an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.
  151. batch_format: Specify ``"default"`` to use the default block format
  152. (NumPy), ``"pandas"`` to select ``pandas.DataFrame``, "pyarrow" to
  153. select ``pyarrow.Table``, or ``"numpy"`` to select
  154. ``Dict[str, numpy.ndarray]``, or None to return the underlying block
  155. exactly as is with no additional formatting.
  156. fn_args: Arguments to `fn`.
  157. fn_kwargs: Keyword arguments to `fn`.
  158. fn_constructor_args: Positional arguments to pass to ``fn``'s constructor.
  159. You can only provide this if ``fn`` is a callable class. These arguments
  160. are top-level arguments in the underlying Ray actor construction task.
  161. fn_constructor_kwargs: Keyword arguments to pass to ``fn``'s constructor.
  162. This can only be provided if ``fn`` is a callable class. These arguments
  163. are top-level arguments in the underlying Ray actor construction task.
  164. num_cpus: The number of CPUs to reserve for each parallel map worker.
  165. num_gpus: The number of GPUs to reserve for each parallel map worker. For
  166. example, specify `num_gpus=1` to request 1 GPU for each parallel map
  167. worker.
  168. memory: The heap memory in bytes to reserve for each parallel map worker.
  169. ray_remote_args_fn: A function that returns a dictionary of remote args
  170. passed to each map worker. The purpose of this argument is to generate
  171. dynamic arguments for each actor or task, and will be called each time prior
  172. to initializing the worker. Args returned from this dict will always
  173. override the args in ``ray_remote_args``. Note: this is an advanced,
  174. experimental feature.
  175. concurrency: This argument is deprecated. Use ``compute`` argument.
  176. ray_remote_args: Additional resource requirements to request from
  177. Ray (e.g., num_gpus=1 to request GPUs for the map tasks). See
  178. :func:`ray.remote` for details.
  179. Returns:
  180. The return type is determined by the return type of ``fn``, and the return
  181. value is combined from results of all groups.
  182. .. seealso::
  183. :meth:`GroupedData.aggregate`
  184. Use this method for common aggregation use cases.
  185. """
  186. # Prior to applying map operation we have to shuffle the data based on provided
  187. # key and (optionally) number of partitions
  188. #
  189. # - In case key is none, we repartition into a single block
  190. # - In case when hash-shuffle strategy is employed -- perform `repartition_and_sort`
  191. # - Otherwise we perform "global" sort of the dataset (to co-locate rows with the
  192. # same key values)
  193. if self._key is None:
  194. shuffled_ds = self._dataset.repartition(1)
  195. elif self._dataset.context.shuffle_strategy == ShuffleStrategy.HASH_SHUFFLE:
  196. num_partitions = (
  197. self._num_partitions
  198. or self._dataset.context.default_hash_shuffle_parallelism
  199. )
  200. shuffled_ds = self._dataset.repartition(
  201. num_partitions,
  202. keys=self._key,
  203. # Blocks must be sorted after repartitioning, such that group
  204. # of rows sharing the same key values are co-located
  205. sort=True,
  206. )
  207. else:
  208. shuffled_ds = self._dataset.sort(self._key)
  209. # The batch is the entire block, because we have batch_size=None for
  210. # map_batches() below.
  211. if self._key is None:
  212. keys = []
  213. elif isinstance(self._key, str):
  214. keys = [self._key]
  215. elif isinstance(self._key, List):
  216. keys = self._key
  217. else:
  218. raise ValueError(
  219. f"Group-by keys are expected to either be a single column (str) "
  220. f"or a list of columns (got '{self._key}')"
  221. )
  222. # NOTE: It's crucial to make sure that UDF isn't capturing `GroupedData`
  223. # object in its closure to ensure its serializability
  224. #
  225. # See https://github.com/ray-project/ray/issues/54280 for more details
  226. if isinstance(fn, CallableClass):
  227. class wrapped_fn:
  228. def __init__(self, *args, **kwargs):
  229. self.fn = fn(*args, **kwargs)
  230. def __call__(self, batch, *args, **kwargs):
  231. yield from _apply_udf_to_groups(
  232. self.fn, batch, keys, batch_format, *args, **kwargs
  233. )
  234. else:
  235. def wrapped_fn(batch, *args, **kwargs):
  236. yield from _apply_udf_to_groups(
  237. fn, batch, keys, batch_format, *args, **kwargs
  238. )
  239. # Change the name of the wrapped function so that users see the name of their
  240. # function rather than `wrapped_fn` in the progress bar.
  241. if isinstance(fn, partial):
  242. wrapped_fn.__name__ = fn.func.__name__
  243. else:
  244. wrapped_fn.__name__ = fn.__name__
  245. # NOTE: We set batch_size=None here, so that every batch contains the entire block,
  246. # guaranteeing that groups are contained in full (ie not being split)
  247. return shuffled_ds._map_batches_without_batch_size_validation(
  248. wrapped_fn,
  249. batch_size=None,
  250. compute=compute,
  251. # NOTE: We specify `batch_format` as none to avoid converting
  252. # back-n-forth between batch and block formats (instead we convert
  253. # once per group inside the method applying the UDF itself)
  254. batch_format=None,
  255. zero_copy_batch=zero_copy_batch,
  256. fn_args=fn_args,
  257. fn_kwargs=fn_kwargs,
  258. fn_constructor_args=fn_constructor_args,
  259. fn_constructor_kwargs=fn_constructor_kwargs,
  260. num_cpus=num_cpus,
  261. num_gpus=num_gpus,
  262. memory=memory,
  263. concurrency=concurrency,
  264. udf_modifying_row_count=True,
  265. ray_remote_args_fn=ray_remote_args_fn,
  266. **ray_remote_args,
  267. )
  268. @PublicAPI(api_group=EXPRESSION_API_GROUP, stability="alpha")
  269. def with_column(
  270. self,
  271. column_name: str,
  272. expr: Expr,
  273. **ray_remote_args,
  274. ) -> Dataset:
  275. """Add a new column to each group using an expression.
  276. The supplied expression is evaluated against every row in each group, and
  277. the resulting column is appended to the group's records. The output dataset
  278. preserves the original rows and columns.
  279. Examples:
  280. >>> import ray
  281. >>> from ray.data.expressions import col
  282. >>> ds = (
  283. ... ray.data.from_items([{"group": 1, "value": 1}, {"group": 1, "value": 2}])
  284. ... .groupby("group")
  285. ... .with_column("value_twice", col("value") * 2)
  286. ... .sort(["group", "value"])
  287. ... )
  288. >>> ds.take_all()
  289. [{'group': 1, 'value': 1, 'value_twice': 2}, {'group': 1, 'value': 2, 'value_twice': 4}]
  290. Args:
  291. column_name: Name of the column to add.
  292. expr: Expression that yields the values for the new column.
  293. **ray_remote_args: Additional resource requirements to request from Ray
  294. for the underlying map tasks (for example, ``num_gpus=1``).
  295. Returns:
  296. A new :class:`~ray.data.Dataset` containing all existing columns plus
  297. the newly computed column.
  298. """
  299. if not isinstance(column_name, str) or not column_name:
  300. raise ValueError(
  301. f"column_name must be a non-empty string, got: {column_name!r}"
  302. )
  303. if not isinstance(expr, Expr):
  304. raise TypeError(
  305. "expr must be a Ray Data expression created via the expression API."
  306. )
  307. if isinstance(expr, DownloadExpr):
  308. raise TypeError(
  309. "GroupedData.with_column does not yet support download expressions."
  310. )
  311. aliased_expr = expr.alias(column_name)
  312. projection_exprs = [StarExpr(), aliased_expr]
  313. def _project_group(block: Block) -> Block:
  314. from ray.data._internal.planner.plan_expression.expression_evaluator import (
  315. eval_projection,
  316. )
  317. return eval_projection(projection_exprs, block)
  318. return self.map_groups(
  319. _project_group,
  320. batch_format=None,
  321. zero_copy_batch=True,
  322. **ray_remote_args,
  323. )
  324. @PublicAPI(api_group=CDS_API_GROUP)
  325. def count(self) -> Dataset:
  326. """Compute count aggregation.
  327. Examples:
  328. >>> import ray
  329. >>> ray.data.from_items([ # doctest: +SKIP
  330. ... {"A": x % 3, "B": x} for x in range(100)]).groupby( # doctest: +SKIP
  331. ... "A").count() # doctest: +SKIP
  332. Returns:
  333. A dataset of ``[k, v]`` columns where ``k`` is the groupby key and
  334. ``v`` is the number of rows with that key.
  335. If groupby key is ``None`` then the key part of return is omitted.
  336. """
  337. return self.aggregate(Count())
  338. @PublicAPI(api_group=CDS_API_GROUP)
  339. def sum(
  340. self, on: Union[str, List[str]] = None, ignore_nulls: bool = True
  341. ) -> Dataset:
  342. r"""Compute grouped sum aggregation.
  343. Examples:
  344. >>> import ray
  345. >>> ray.data.from_items([ # doctest: +SKIP
  346. ... (i % 3, i, i**2) # doctest: +SKIP
  347. ... for i in range(100)]) # doctest: +SKIP
  348. ... .groupby(lambda x: x[0] % 3) # doctest: +SKIP
  349. ... .sum(lambda x: x[2]) # doctest: +SKIP
  350. >>> ray.data.range(100).groupby("id").sum() # doctest: +SKIP
  351. >>> ray.data.from_items([ # doctest: +SKIP
  352. ... {"A": i % 3, "B": i, "C": i**2} # doctest: +SKIP
  353. ... for i in range(100)]) # doctest: +SKIP
  354. ... .groupby("A") # doctest: +SKIP
  355. ... .sum(["B", "C"]) # doctest: +SKIP
  356. Args:
  357. on: a column name or a list of column names to aggregate.
  358. ignore_nulls: Whether to ignore null values. If ``True``, null
  359. values will be ignored when computing the sum; if ``False``,
  360. if a null value is encountered, the output will be null.
  361. We consider np.nan, None, and pd.NaT to be null values.
  362. Default is ``True``.
  363. Returns:
  364. The sum result.
  365. For different values of ``on``, the return varies:
  366. - ``on=None``: a dataset containing a groupby key column,
  367. ``"k"``, and a column-wise sum column for each original column
  368. in the dataset.
  369. - ``on=["col_1", ..., "col_n"]``: a dataset of ``n + 1``
  370. columns where the first column is the groupby key and the second
  371. through ``n + 1`` columns are the results of the aggregations.
  372. If groupby key is ``None`` then the key part of return is omitted.
  373. """
  374. return self._aggregate_on(Sum, on, ignore_nulls=ignore_nulls)
  375. @PublicAPI(api_group=CDS_API_GROUP)
  376. def min(
  377. self, on: Union[str, List[str]] = None, ignore_nulls: bool = True
  378. ) -> Dataset:
  379. r"""Compute grouped min aggregation.
  380. Examples:
  381. >>> import ray
  382. >>> ray.data.le(100).groupby("value").min() # doctest: +SKIP
  383. >>> ray.data.from_items([ # doctest: +SKIP
  384. ... {"A": i % 3, "B": i, "C": i**2} # doctest: +SKIP
  385. ... for i in range(100)]) # doctest: +SKIP
  386. ... .groupby("A") # doctest: +SKIP
  387. ... .min(["B", "C"]) # doctest: +SKIP
  388. Args:
  389. on: a column name or a list of column names to aggregate.
  390. ignore_nulls: Whether to ignore null values. If ``True``, null
  391. values will be ignored when computing the min; if ``False``,
  392. if a null value is encountered, the output will be null.
  393. We consider np.nan, None, and pd.NaT to be null values.
  394. Default is ``True``.
  395. Returns:
  396. The min result.
  397. For different values of ``on``, the return varies:
  398. - ``on=None``: a dataset containing a groupby key column,
  399. ``"k"``, and a column-wise min column for each original column in
  400. the dataset.
  401. - ``on=["col_1", ..., "col_n"]``: a dataset of ``n + 1``
  402. columns where the first column is the groupby key and the second
  403. through ``n + 1`` columns are the results of the aggregations.
  404. If groupby key is ``None`` then the key part of return is omitted.
  405. """
  406. return self._aggregate_on(Min, on, ignore_nulls=ignore_nulls)
  407. @PublicAPI(api_group=CDS_API_GROUP)
  408. def max(
  409. self, on: Union[str, List[str]] = None, ignore_nulls: bool = True
  410. ) -> Dataset:
  411. r"""Compute grouped max aggregation.
  412. Examples:
  413. >>> import ray
  414. >>> ray.data.le(100).groupby("value").max() # doctest: +SKIP
  415. >>> ray.data.from_items([ # doctest: +SKIP
  416. ... {"A": i % 3, "B": i, "C": i**2} # doctest: +SKIP
  417. ... for i in range(100)]) # doctest: +SKIP
  418. ... .groupby("A") # doctest: +SKIP
  419. ... .max(["B", "C"]) # doctest: +SKIP
  420. Args:
  421. on: a column name or a list of column names to aggregate.
  422. ignore_nulls: Whether to ignore null values. If ``True``, null
  423. values will be ignored when computing the max; if ``False``,
  424. if a null value is encountered, the output will be null.
  425. We consider np.nan, None, and pd.NaT to be null values.
  426. Default is ``True``.
  427. Returns:
  428. The max result.
  429. For different values of ``on``, the return varies:
  430. - ``on=None``: a dataset containing a groupby key column,
  431. ``"k"``, and a column-wise max column for each original column in
  432. the dataset.
  433. - ``on=["col_1", ..., "col_n"]``: a dataset of ``n + 1``
  434. columns where the first column is the groupby key and the second
  435. through ``n + 1`` columns are the results of the aggregations.
  436. If groupby key is ``None`` then the key part of return is omitted.
  437. """
  438. return self._aggregate_on(Max, on, ignore_nulls=ignore_nulls)
  439. @PublicAPI(api_group=CDS_API_GROUP)
  440. def mean(
  441. self, on: Union[str, List[str]] = None, ignore_nulls: bool = True
  442. ) -> Dataset:
  443. r"""Compute grouped mean aggregation.
  444. Examples:
  445. >>> import ray
  446. >>> ray.data.le(100).groupby("value").mean() # doctest: +SKIP
  447. >>> ray.data.from_items([ # doctest: +SKIP
  448. ... {"A": i % 3, "B": i, "C": i**2} # doctest: +SKIP
  449. ... for i in range(100)]) # doctest: +SKIP
  450. ... .groupby("A") # doctest: +SKIP
  451. ... .mean(["B", "C"]) # doctest: +SKIP
  452. Args:
  453. on: a column name or a list of column names to aggregate.
  454. ignore_nulls: Whether to ignore null values. If ``True``, null
  455. values will be ignored when computing the mean; if ``False``,
  456. if a null value is encountered, the output will be null.
  457. We consider np.nan, None, and pd.NaT to be null values.
  458. Default is ``True``.
  459. Returns:
  460. The mean result.
  461. For different values of ``on``, the return varies:
  462. - ``on=None``: a dataset containing a groupby key column,
  463. ``"k"``, and a column-wise mean column for each original column
  464. in the dataset.
  465. - ``on=["col_1", ..., "col_n"]``: a dataset of ``n + 1``
  466. columns where the first column is the groupby key and the second
  467. through ``n + 1`` columns are the results of the aggregations.
  468. If groupby key is ``None`` then the key part of return is omitted.
  469. """
  470. return self._aggregate_on(Mean, on, ignore_nulls=ignore_nulls)
  471. @PublicAPI(api_group=CDS_API_GROUP)
  472. def std(
  473. self,
  474. on: Union[str, List[str]] = None,
  475. ddof: int = 1,
  476. ignore_nulls: bool = True,
  477. ) -> Dataset:
  478. r"""Compute grouped standard deviation aggregation.
  479. Examples:
  480. >>> import ray
  481. >>> ray.data.range(100).groupby("id").std(ddof=0) # doctest: +SKIP
  482. >>> ray.data.from_items([ # doctest: +SKIP
  483. ... {"A": i % 3, "B": i, "C": i**2} # doctest: +SKIP
  484. ... for i in range(100)]) # doctest: +SKIP
  485. ... .groupby("A") # doctest: +SKIP
  486. ... .std(["B", "C"]) # doctest: +SKIP
  487. NOTE: This uses Welford's online method for an accumulator-style
  488. computation of the standard deviation. This method was chosen due to
  489. it's numerical stability, and it being computable in a single pass.
  490. This may give different (but more accurate) results than NumPy, Pandas,
  491. and sklearn, which use a less numerically stable two-pass algorithm.
  492. See
  493. https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
  494. Args:
  495. on: a column name or a list of column names to aggregate.
  496. ddof: Delta Degrees of Freedom. The divisor used in calculations
  497. is ``N - ddof``, where ``N`` represents the number of elements.
  498. ignore_nulls: Whether to ignore null values. If ``True``, null
  499. values will be ignored when computing the std; if ``False``,
  500. if a null value is encountered, the output will be null.
  501. We consider np.nan, None, and pd.NaT to be null values.
  502. Default is ``True``.
  503. Returns:
  504. The standard deviation result.
  505. For different values of ``on``, the return varies:
  506. - ``on=None``: a dataset containing a groupby key column,
  507. ``"k"``, and a column-wise std column for each original column in
  508. the dataset.
  509. - ``on=["col_1", ..., "col_n"]``: a dataset of ``n + 1``
  510. columns where the first column is the groupby key and the second
  511. through ``n + 1`` columns are the results of the aggregations.
  512. If groupby key is ``None`` then the key part of return is omitted.
  513. """
  514. return self._aggregate_on(Std, on, ignore_nulls=ignore_nulls, ddof=ddof)
  515. def _apply_udf_to_groups(
  516. udf: Union[
  517. Callable[[DataBatch, ...], DataBatch],
  518. Callable[[DataBatch, ...], Iterator[DataBatch]],
  519. ],
  520. block: Block,
  521. keys: List[str],
  522. batch_format: Optional[str],
  523. *args: Any,
  524. **kwargs: Any,
  525. ) -> Iterator[DataBatch]:
  526. """Apply UDF to groups of rows having the same set of values of the specified
  527. columns (keys).
  528. NOTE: This function is defined at module level to avoid capturing closures and make it serializable.
  529. """
  530. block_accessor = BlockAccessor.for_block(block)
  531. boundaries = block_accessor._get_group_boundaries_sorted(keys)
  532. for start, end in zip(boundaries[:-1], boundaries[1:]):
  533. group_block = block_accessor.slice(start, end, copy=False)
  534. group_block_accessor = BlockAccessor.for_block(group_block)
  535. # Convert corresponding block of each group to batch format here,
  536. # because the block format here can be different from batch format
  537. # (e.g. block is Arrow format, and batch is NumPy format).
  538. result = udf(
  539. group_block_accessor.to_batch_format(batch_format), *args, **kwargs
  540. )
  541. # Check if the UDF returned an iterator/generator.
  542. if isinstance(result, IteratorABC):
  543. # If so, yield each item from the iterator.
  544. yield from result
  545. else:
  546. # Otherwise, yield the single result.
  547. yield result
  548. # Backwards compatibility alias.
  549. GroupedDataset = GroupedData