| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242 |
- import logging
- from collections import Counter
- from functools import partial
- from typing import (
- TYPE_CHECKING,
- Any,
- Callable,
- Dict,
- Hashable,
- List,
- Optional,
- Set,
- Tuple,
- Union,
- )
- import numpy as np
- import pandas as pd
- import pandas.api.types
- import pyarrow as pa
- import pyarrow.compute as pc
- from ray.data._internal.util import is_null
- from ray.data.block import BlockAccessor
- from ray.data.preprocessor import (
- Preprocessor,
- PreprocessorNotFittedException,
- SerializablePreprocessorBase,
- )
- from ray.data.preprocessors.utils import (
- make_post_processor,
- )
- from ray.data.preprocessors.version_support import SerializablePreprocessor
- from ray.data.util.data_batch_conversion import BatchFormat
- from ray.util.annotations import DeveloperAPI, PublicAPI
- if TYPE_CHECKING:
- from ray.data.dataset import Dataset
- logger = logging.getLogger(__name__)
def _get_unique_value_arrow_arrays(
    stats: Dict[str, Any], input_col: str
) -> Tuple[pa.Array, pa.Array]:
    """Return the ordinal mapping of ``input_col`` as a pair of Arrow arrays.

    The entry ``stats["unique_values(<input_col>)"]`` is accepted in two shapes:
    a ``{value: index}`` dict, or an already-built ``(keys_array, values_array)``
    pair.

    Args:
        stats: The encoder's ``stats_`` dictionary.
        input_col: Name of the column whose mapping is requested.

    Returns:
        ``(keys_array, values_array)``, where ``values_array[i]`` is the index
        assigned to ``keys_array[i]``.
    """
    mapping = stats[f"unique_values({input_col})"]

    if not isinstance(mapping, dict):
        # Already in Arrow form; unpacking enforces the two-element shape.
        keys_array, values_array = mapping
        return keys_array, values_array

    # Dict form: emit keys in sorted order, each paired with its own index.
    ordered_keys = sorted(mapping)
    keys_array = pa.array(ordered_keys)
    values_array = pa.array([mapping[key] for key in ordered_keys], type=pa.int64())
    return keys_array, values_array
@PublicAPI(stability="alpha")
@SerializablePreprocessor(version=1, identifier="io.ray.preprocessors.ordinal_encoder")
class OrdinalEncoder(SerializablePreprocessorBase):
    r"""Encode values within columns as ordered integer values.

    :class:`OrdinalEncoder` encodes categorical features as integers that range from
    :math:`0` to :math:`n - 1`, where :math:`n` is the number of categories.

    If you transform a value that isn't in the fitted dataset, then the value is
    encoded as ``float("nan")``.

    Columns must contain either hashable values or lists of hashable values. Also, you
    can't have both scalars and lists in the same column.

    Examples:
        Use :class:`OrdinalEncoder` to encode categorical features as integers.

        >>> import pandas as pd
        >>> import ray
        >>> from ray.data.preprocessors import OrdinalEncoder
        >>> df = pd.DataFrame({
        ...     "sex": ["male", "female", "male", "female"],
        ...     "level": ["L4", "L5", "L3", "L4"],
        ... })
        >>> ds = ray.data.from_pandas(df)  # doctest: +SKIP
        >>> encoder = OrdinalEncoder(columns=["sex", "level"])
        >>> encoder.fit_transform(ds).to_pandas()  # doctest: +SKIP
           sex  level
        0    1      1
        1    0      2
        2    1      0
        3    0      1

        :class:`OrdinalEncoder` can also be used in append mode by providing the
        name of the output_columns that should hold the encoded values.

        >>> encoder = OrdinalEncoder(columns=["sex", "level"], output_columns=["sex_encoded", "level_encoded"])
        >>> encoder.fit_transform(ds).to_pandas()  # doctest: +SKIP
              sex level  sex_encoded  level_encoded
        0    male    L4            1              1
        1  female    L5            0              2
        2    male    L3            1              0
        3  female    L4            0              1

        If you transform a value not present in the original dataset, then the value
        is encoded as ``float("nan")``.

        >>> df = pd.DataFrame({"sex": ["female"], "level": ["L6"]})
        >>> ds = ray.data.from_pandas(df)  # doctest: +SKIP
        >>> encoder.transform(ds).to_pandas()  # doctest: +SKIP
           sex  level
        0    0    NaN

        :class:`OrdinalEncoder` can also encode categories in a list.

        >>> df = pd.DataFrame({
        ...     "name": ["Shaolin Soccer", "Moana", "The Smartest Guys in the Room"],
        ...     "genre": [
        ...         ["comedy", "action", "sports"],
        ...         ["animation", "comedy", "action"],
        ...         ["documentary"],
        ...     ],
        ... })
        >>> ds = ray.data.from_pandas(df)  # doctest: +SKIP
        >>> encoder = OrdinalEncoder(columns=["genre"])
        >>> encoder.fit_transform(ds).to_pandas()  # doctest: +SKIP
                                    name      genre
        0                 Shaolin Soccer  [2, 0, 4]
        1                          Moana  [1, 2, 0]
        2  The Smartest Guys in the Room        [3]

    Args:
        columns: The columns to separately encode.
        encode_lists: If ``True``, encode list elements.  If ``False``, encode
            whole lists (i.e., replace each list with an integer). ``True``
            by default.
        output_columns: The names of the transformed columns. If None, the transformed
            columns will be the same as the input columns. If not None, the length of
            ``output_columns`` must match the length of ``columns``, otherwise an error
            will be raised.

    .. seealso::

        :class:`OneHotEncoder`
            Another preprocessor that encodes categorical data.
    """

    def __init__(
        self,
        columns: List[str],
        *,
        encode_lists: bool = True,
        output_columns: Optional[List[str]] = None,
    ):
        super().__init__()
        # TODO: allow user to specify order of values within each column.
        self.columns = columns
        self.encode_lists = encode_lists
        self.output_columns = Preprocessor._derive_and_validate_output_columns(
            columns, output_columns
        )

    def _fit(self, dataset: "Dataset") -> Preprocessor:
        """Register the unique-value statistic for every configured column."""
        self.stat_computation_plan.add_callable_stat(
            stat_fn=lambda key_gen: compute_unique_value_indices(
                dataset=dataset,
                columns=self.columns,
                encode_lists=self.encode_lists,
                key_gen=key_gen,
            ),
            post_process_fn=unique_post_fn(),
            stat_key_fn=lambda col: f"unique({col})",
            post_key_fn=lambda col: f"unique_values({col})",
            columns=self.columns,
        )
        return self

    def _get_ordinal_map(self, column_name: str) -> Dict[Any, int]:
        """Get the ordinal mapping for a column as a dict.

        Stats can be stored in either:
        - Dict format: {value: index} (from pandas-style processing)
        - Arrow format: (keys_array, values_array) tuple

        This method returns a dict in either case. For the Arrow format a new
        dict is built on every call, so callers should resolve the map once
        per column rather than once per row.
        """
        stat_value = self.stats_[f"unique_values({column_name})"]
        if isinstance(stat_value, dict):
            return stat_value
        # Arrow tuple format (keys_array, values_array)
        keys_array, values_array = stat_value
        return {k.as_py(): v.as_py() for k, v in zip(keys_array, values_array)}

    def _get_arrow_arrays(self, input_col: str) -> Tuple[pa.Array, pa.Array]:
        """Get Arrow arrays for keys and values."""
        return _get_unique_value_arrow_arrays(self.stats_, input_col)

    @staticmethod
    def _encode_list_with_map(
        element: list, ordinal_map: Dict[Any, int], encode_lists: bool
    ):
        """Encode one list-valued cell with an already-resolved ordinal map.

        With ``encode_lists`` the fitted column was flattened, so each item of
        the list is mapped individually (unknown items become ``None``).
        Otherwise the whole list, as a tuple, is looked up as one category.
        """
        if encode_lists:
            return [ordinal_map.get(x) for x in element]
        return ordinal_map.get(tuple(element))

    def _encode_list_element(self, element: list, *, column_name: str):
        """Encode one list-valued cell of ``column_name``.

        Resolves the ordinal map on each call; bulk callers should resolve the
        map once and use ``_encode_list_with_map`` instead.
        """
        return self._encode_list_with_map(
            element, self._get_ordinal_map(column_name), self.encode_lists
        )

    def _transform_pandas(self, df: pd.DataFrame):
        """Encode the configured columns of a pandas batch.

        Values missing from the fitted mapping come out as missing values
        (``Series.map`` / ``dict.get`` semantics).
        """
        _validate_df(df, *self.columns)

        def column_ordinal_encoder(s: pd.Series):
            # Resolve the mapping once per column: for Arrow-format stats
            # ``_get_ordinal_map`` rebuilds a dict, which must not happen
            # per row.
            ordinal_map = self._get_ordinal_map(s.name)
            if _is_series_composed_of_lists(s):
                encode_lists = self.encode_lists
                return s.map(
                    lambda elem: self._encode_list_with_map(
                        elem, ordinal_map, encode_lists
                    )
                )
            return s.map(ordinal_map)

        df[self.output_columns] = df[self.columns].apply(column_ordinal_encoder)
        return df

    def _transform_arrow(self, table: pa.Table) -> pa.Table:
        """Transform using fast native PyArrow operations for scalar columns.

        List-type columns are preferably handled by _transform_pandas, which is selected
        via _determine_transform_to_use when a PyArrow schema is available. However,
        for pandas-backed datasets (PandasBlockSchema), we can't detect list columns
        until runtime, so we fall back to pandas here if list columns are found.
        """
        # Validate that columns don't contain null values (consistent with pandas path)
        _validate_arrow(table, *self.columns)

        # Check for list columns (runtime fallback for PandasBlockSchema datasets)
        for col_name in self.columns:
            col_type = table.schema.field(col_name).type
            if pa.types.is_list(col_type) or pa.types.is_large_list(col_type):
                # Fall back to pandas transform for list columns
                df = table.to_pandas()
                result_df = self._transform_pandas(df)
                return pa.Table.from_pandas(result_df, preserve_index=False)

        for input_col, output_col in zip(self.columns, self.output_columns):
            column = table.column(input_col)
            encoded_column = self._encode_column_vectorized(column, input_col)
            table = BlockAccessor.for_block(table).upsert_column(
                output_col, encoded_column
            )
        return table

    def _encode_column_vectorized(
        self, column: pa.ChunkedArray, input_col: str
    ) -> pa.Array:
        """Encode column using PyArrow's vectorized pc.index_in.

        Unseen categories are encoded as null in the output, which becomes NaN
        when converted to pandas. Null values should be validated before calling
        this method via _validate_arrow.
        """
        keys_array, values_array = self._get_arrow_arrays(input_col)
        if keys_array.type != column.type:
            keys_array = pc.cast(keys_array, column.type)

        # pc.index_in returns null for values not found in keys_array
        # (including null input values and unseen categories)
        indices = pc.index_in(column, keys_array)

        # pc.take preserves nulls from indices, so null inputs -> null outputs
        return pc.take(values_array, indices)

    @classmethod
    @DeveloperAPI
    def preferred_batch_format(cls) -> BatchFormat:
        return BatchFormat.ARROW

    def _get_serializable_fields(self) -> Dict[str, Any]:
        """Return the constructor state plus the fitted flag for serialization."""
        return {
            "columns": self.columns,
            "output_columns": self.output_columns,
            "encode_lists": self.encode_lists,
            "_fitted": getattr(self, "_fitted", None),
        }

    def _set_serializable_fields(self, fields: Dict[str, Any], version: int):
        """Restore state produced by ``_get_serializable_fields``."""
        # required fields
        self.columns = fields["columns"]
        self.output_columns = fields["output_columns"]
        self.encode_lists = fields["encode_lists"]
        # optional fields
        self._fitted = fields.get("_fitted")

    def __repr__(self):
        return (
            f"{self.__class__.__name__}(columns={self.columns!r}, "
            f"encode_lists={self.encode_lists!r}, "
            f"output_columns={self.output_columns!r})"
        )
@PublicAPI(stability="alpha")
@SerializablePreprocessor(version=1, identifier="io.ray.preprocessors.one_hot_encoder")
class OneHotEncoder(SerializablePreprocessorBase):
    r"""`One-hot encode <https://en.wikipedia.org/wiki/One-hot#Machine_learning_and_statistics>`_
    categorical data.

    This preprocessor transforms each specified column into a one-hot encoded vector.
    Each element in the vector corresponds to a unique category in the column, with a
    value of 1 if the category matches and 0 otherwise.

    If a category is infrequent (based on ``max_categories``) or not present in the
    fitted dataset, it is encoded as all 0s.

    Columns must contain hashable objects or lists of hashable objects.

    .. note::
        Lists are treated as categories. If you want to encode individual list
        elements, use :class:`MultiHotEncoder`.

    Example:
        >>> import pandas as pd
        >>> import ray
        >>> from ray.data.preprocessors import OneHotEncoder
        >>>
        >>> df = pd.DataFrame({"color": ["red", "green", "red", "red", "blue", "green"]})
        >>> ds = ray.data.from_pandas(df)  # doctest: +SKIP
        >>> encoder = OneHotEncoder(columns=["color"])
        >>> encoder.fit_transform(ds).to_pandas()  # doctest: +SKIP
               color
        0  [0, 0, 1]
        1  [0, 1, 0]
        2  [0, 0, 1]
        3  [0, 0, 1]
        4  [1, 0, 0]
        5  [0, 1, 0]

        OneHotEncoder can also be used in append mode by providing the
        name of the output_columns that should hold the encoded values.

        >>> encoder = OneHotEncoder(columns=["color"], output_columns=["color_encoded"])
        >>> encoder.fit_transform(ds).to_pandas()  # doctest: +SKIP
           color color_encoded
        0    red     [0, 0, 1]
        1  green     [0, 1, 0]
        2    red     [0, 0, 1]
        3    red     [0, 0, 1]
        4   blue     [1, 0, 0]
        5  green     [0, 1, 0]

        If you one-hot encode a value that isn't in the fitted dataset, then the
        value is encoded with zeros.

        >>> df = pd.DataFrame({"color": ["yellow"]})
        >>> batch = ray.data.from_pandas(df)  # doctest: +SKIP
        >>> encoder.transform(batch).to_pandas()  # doctest: +SKIP
            color color_encoded
        0  yellow     [0, 0, 0]

        Likewise, if you one-hot encode an infrequent value, then the value is encoded
        with zeros.

        >>> encoder = OneHotEncoder(columns=["color"], max_categories={"color": 2})
        >>> encoder.fit_transform(ds).to_pandas()  # doctest: +SKIP
            color
        0  [1, 0]
        1  [0, 1]
        2  [1, 0]
        3  [1, 0]
        4  [0, 0]
        5  [0, 1]

    Args:
        columns: The columns to separately encode.
        max_categories: The maximum number of features to create for each column.
            If a value isn't specified for a column, then a feature is created
            for every category in that column.
        output_columns: The names of the transformed columns. If None, the transformed
            columns will be the same as the input columns. If not None, the length of
            ``output_columns`` must match the length of ``columns``, otherwise an error
            will be raised.

    .. seealso::

        :class:`MultiHotEncoder`
            If you want to encode individual list elements, use
            :class:`MultiHotEncoder`.

        :class:`OrdinalEncoder`
            If your categories are ordered, you may want to use
            :class:`OrdinalEncoder`.
    """  # noqa: E501

    def __init__(
        self,
        columns: List[str],
        *,
        max_categories: Optional[Dict[str, int]] = None,
        output_columns: Optional[List[str]] = None,
    ):
        super().__init__()
        # TODO: add `drop` parameter.
        self.columns = columns
        self.max_categories = max_categories or {}
        self.output_columns = Preprocessor._derive_and_validate_output_columns(
            columns, output_columns
        )

    def _fit(self, dataset: "Dataset") -> Preprocessor:
        """Register the unique-value statistic for every configured column."""
        self.stat_computation_plan.add_callable_stat(
            stat_fn=lambda key_gen: compute_unique_value_indices(
                dataset=dataset,
                columns=self.columns,
                encode_lists=False,
                key_gen=key_gen,
                max_categories=self.max_categories,
            ),
            post_process_fn=unique_post_fn(),
            stat_key_fn=lambda col: f"unique({col})",
            post_key_fn=lambda col: f"unique_values({col})",
            columns=self.columns,
        )
        return self

    @classmethod
    @DeveloperAPI
    def preferred_batch_format(cls) -> BatchFormat:
        return BatchFormat.ARROW

    def safe_get(self, v: Any, stats: Dict[str, int]):
        """Look up the category index of ``v``, returning ``-1`` when absent.

        Lists and NumPy arrays are converted to tuples so that they can be used
        as dictionary keys. Values that still cannot be hashed (for example a
        list whose items are themselves lists) are treated as a missing
        category instead of raising.
        """
        if isinstance(v, (list, np.ndarray)):
            v = tuple(v)
        try:
            return stats.get(v, -1)
        except TypeError:
            # Unhashable value. ``isinstance(v, Hashable)`` is not a sufficient
            # guard: a tuple always passes it even when its items are
            # unhashable, so rely on the lookup itself.
            return -1

    def _get_ordinal_map(self, column_name: str) -> Dict[Any, int]:
        """Get the category -> index mapping for a column as a dict.

        Accepts both stat layouts handled by ``_get_unique_value_arrow_arrays``:
        a ``{value: index}`` dict (returned as is) or a
        ``(keys_array, values_array)`` tuple (converted to a dict).
        """
        stat_value = self.stats_[f"unique_values({column_name})"]
        if isinstance(stat_value, dict):
            return stat_value
        keys_array, values_array = stat_value
        ordinal_map = {}
        for key, value in zip(keys_array, values_array):
            key = key.as_py()
            if isinstance(key, list):
                # ``safe_get`` looks lists up as tuples; store them the same way.
                key = tuple(key)
            ordinal_map[key] = value.as_py()
        return ordinal_map

    def _transform_pandas(self, df: pd.DataFrame):
        """One-hot encode the configured columns of a pandas batch."""
        _validate_df(df, *self.columns)

        # Compute new one-hot encoded columns
        for column, output_column in zip(self.columns, self.output_columns):
            # Normalise to a dict so that ``len`` and ``.get`` mean the same
            # thing regardless of how the stats were stored.
            stats = self._get_ordinal_map(column)
            num_categories = len(stats)
            one_hot = np.zeros((len(df), num_categories), dtype=np.uint8)
            # Integer indices for each category in the column
            codes = df[column].apply(lambda v: self.safe_get(v, stats)).to_numpy()
            # Filter to only the rows that have a valid category
            valid_category_mask = codes != -1
            # Dimension should be (num_rows, ) - 1D boolean array
            non_zero_indices = np.nonzero(valid_category_mask)[0]
            # Mark the corresponding categories as 1
            one_hot[
                non_zero_indices,
                codes[valid_category_mask],
            ] = 1
            df[output_column] = one_hot.tolist()
        return df

    def _transform_arrow(self, table: pa.Table) -> pa.Table:
        """Transform using fast native PyArrow operations for scalar columns.

        List-type columns are preferably handled by _transform_pandas, which is selected
        via _determine_transform_to_use when a PyArrow schema is available. However,
        for pandas-backed datasets (PandasBlockSchema), we can't detect list columns
        until runtime, so we fall back to pandas here if list columns are found.
        """
        # Validate that columns don't contain null values (consistent with pandas path)
        _validate_arrow(table, *self.columns)

        # Check for list columns (runtime fallback for PandasBlockSchema datasets)
        for col_name in self.columns:
            col_type = table.schema.field(col_name).type
            if pa.types.is_list(col_type) or pa.types.is_large_list(col_type):
                # Fall back to pandas transform for list columns
                df = table.to_pandas()
                result_df = self._transform_pandas(df)
                return pa.Table.from_pandas(result_df, preserve_index=False)

        for input_col, output_col in zip(self.columns, self.output_columns):
            column = table.column(input_col)
            encoded_column = self._encode_column_one_hot(column, input_col)
            table = BlockAccessor.for_block(table).upsert_column(
                output_col, encoded_column
            )
        return table

    def _get_arrow_arrays(self, input_col: str) -> Tuple[pa.Array, pa.Array]:
        """Get Arrow arrays for keys and values."""
        return _get_unique_value_arrow_arrays(self.stats_, input_col)

    def _encode_column_one_hot(
        self, column: pa.ChunkedArray, input_col: str
    ) -> pa.FixedSizeListArray:
        """Encode a column to one-hot vectors using Arrow arrays.

        Unseen categories are encoded as all-zeros vectors, matching the pandas
        behavior. Null values should be validated before calling this method
        via _validate_arrow.
        """
        keys_array, values_array = self._get_arrow_arrays(input_col)
        num_categories = len(keys_array)

        # Cast keys to match column type if needed
        if keys_array.type != column.type:
            keys_array = pc.cast(keys_array, column.type)

        # Use pc.index_in to find position of each value in keys_array
        # Returns null for null inputs and unseen categories (values not in keys_array)
        key_positions = pc.index_in(column, keys_array)

        # Translate the key position into the index assigned to that key. The
        # pandas path uses the assigned index as the one-hot slot, and the key
        # order of ``keys_array`` is not guaranteed to match it (dict-format
        # stats are re-sorted by key). pc.take preserves nulls.
        indices = pc.take(values_array, key_positions)

        # Fill nulls with -1 so they can be filtered out below (resulting in all-zeros)
        indices_filled = pc.fill_null(indices, -1)

        # Create one-hot encoded matrix using vectorized NumPy operations
        num_rows = len(column)
        indices_np = indices_filled.to_numpy()
        one_hot_matrix = np.zeros((num_rows, num_categories), dtype=np.uint8)

        # Find valid indices (not -1) and set 1s at the appropriate positions
        valid_mask = indices_np != -1
        valid_indices = np.nonzero(valid_mask)[0]
        if len(valid_indices) > 0:
            one_hot_matrix[valid_indices, indices_np[valid_mask]] = 1

        # Convert to Arrow FixedSizeListArray for efficient storage
        return pa.FixedSizeListArray.from_arrays(one_hot_matrix.ravel(), num_categories)

    def _get_serializable_fields(self) -> Dict[str, Any]:
        """Return the constructor state plus the fitted flag for serialization."""
        return {
            "columns": self.columns,
            "output_columns": self.output_columns,
            "max_categories": self.max_categories,
            "_fitted": getattr(self, "_fitted", None),
        }

    def _set_serializable_fields(self, fields: Dict[str, Any], version: int):
        """Restore state produced by ``_get_serializable_fields``."""
        # required fields
        self.columns = fields["columns"]
        self.output_columns = fields["output_columns"]
        self.max_categories = fields["max_categories"]
        # optional fields
        self._fitted = fields.get("_fitted")

    def __repr__(self):
        return (
            f"{self.__class__.__name__}(columns={self.columns!r}, "
            f"max_categories={self.max_categories!r}, "
            f"output_columns={self.output_columns!r})"
        )
@PublicAPI(stability="alpha")
@SerializablePreprocessor(
    version=1, identifier="io.ray.preprocessors.multi_hot_encoder"
)
class MultiHotEncoder(SerializablePreprocessorBase):
    r"""Multi-hot encode categorical data.

    This preprocessor replaces each list of categories with an :math:`m`-length binary
    list, where :math:`m` is the number of unique categories in the column or the value
    specified in ``max_categories``. The :math:`i\text{-th}` element of the binary list
    is :math:`1` if category :math:`i` is in the input list and :math:`0` otherwise.

    Columns must contain hashable objects or lists of hashable objects.
    Also, you can't have both types in the same column.

    .. note::
        The logic is similar to scikit-learn's [MultiLabelBinarizer][1]

    .. note::
        Each output element is the number of times the category occurs in the
        input list, so a category repeated within one list yields a value
        greater than 1.

    Examples:
        >>> import pandas as pd
        >>> import ray
        >>> from ray.data.preprocessors import MultiHotEncoder
        >>>
        >>> df = pd.DataFrame({
        ...     "name": ["Shaolin Soccer", "Moana", "The Smartest Guys in the Room"],
        ...     "genre": [
        ...         ["comedy", "action", "sports"],
        ...         ["animation", "comedy", "action"],
        ...         ["documentary"],
        ...     ],
        ... })
        >>> ds = ray.data.from_pandas(df)  # doctest: +SKIP
        >>>
        >>> encoder = MultiHotEncoder(columns=["genre"])
        >>> encoder.fit_transform(ds).to_pandas()  # doctest: +SKIP
                                    name            genre
        0                 Shaolin Soccer  [1, 0, 1, 0, 1]
        1                          Moana  [1, 1, 1, 0, 0]
        2  The Smartest Guys in the Room  [0, 0, 0, 1, 0]

        :class:`MultiHotEncoder` can also be used in append mode by providing the
        name of the output_columns that should hold the encoded values.

        >>> encoder = MultiHotEncoder(columns=["genre"], output_columns=["genre_encoded"])
        >>> encoder.fit_transform(ds).to_pandas()  # doctest: +SKIP
                                    name                        genre    genre_encoded
        0                 Shaolin Soccer     [comedy, action, sports]  [1, 0, 1, 0, 1]
        1                          Moana  [animation, comedy, action]  [1, 1, 1, 0, 0]
        2  The Smartest Guys in the Room                [documentary]  [0, 0, 0, 1, 0]

        If you specify ``max_categories``, then :class:`MultiHotEncoder`
        creates features for only the most frequent categories.

        >>> encoder = MultiHotEncoder(columns=["genre"], max_categories={"genre": 3})
        >>> encoder.fit_transform(ds).to_pandas()  # doctest: +SKIP
                                    name      genre
        0                 Shaolin Soccer  [1, 1, 1]
        1                          Moana  [1, 1, 0]
        2  The Smartest Guys in the Room  [0, 0, 0]
        >>> encoder.stats_  # doctest: +SKIP
        OrderedDict([('unique_values(genre)', {'comedy': 0, 'action': 1, 'sports': 2})])

    Args:
        columns: The columns to separately encode.
        max_categories: The maximum number of features to create for each column.
            If a value isn't specified for a column, then a feature is created
            for every unique category in that column.
        output_columns: The names of the transformed columns. If None, the transformed
            columns will be the same as the input columns. If not None, the length of
            ``output_columns`` must match the length of ``columns``, otherwise an error
            will be raised.

    .. seealso::

        :class:`OneHotEncoder`
            If you're encoding individual categories instead of lists of
            categories, use :class:`OneHotEncoder`.

        :class:`OrdinalEncoder`
            If your categories are ordered, you may want to use
            :class:`OrdinalEncoder`.

    [1]: https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.MultiLabelBinarizer.html
    """

    def __init__(
        self,
        columns: List[str],
        *,
        max_categories: Optional[Dict[str, int]] = None,
        output_columns: Optional[List[str]] = None,
    ):
        super().__init__()
        # TODO: add `drop` parameter.
        self.columns = columns
        self.max_categories = max_categories or {}
        self.output_columns = Preprocessor._derive_and_validate_output_columns(
            columns, output_columns
        )

    def _fit(self, dataset: "Dataset") -> Preprocessor:
        """Register the unique-value statistic for every configured column."""
        self.stat_computation_plan.add_callable_stat(
            stat_fn=lambda key_gen: compute_unique_value_indices(
                dataset=dataset,
                columns=self.columns,
                encode_lists=True,
                key_gen=key_gen,
                max_categories=self.max_categories,
            ),
            post_process_fn=unique_post_fn(),
            stat_key_fn=lambda col: f"unique({col})",
            post_key_fn=lambda col: f"unique_values({col})",
            columns=self.columns,
        )
        return self

    def _transform_pandas(self, df: pd.DataFrame):
        """Multi-hot encode the configured columns of a pandas batch.

        Each cell is replaced with one count per fitted category, in the
        iteration order of the column's ``unique_values`` stat.
        """
        _validate_df(df, *self.columns)

        def encode_list(element: list, *, categories):
            # Normalise the cell to a plain list: arrays are converted and a
            # scalar is treated as a single-item list.
            if isinstance(element, np.ndarray):
                element = element.tolist()
            elif not isinstance(element, list):
                element = [element]
            counter = Counter(element)
            return [counter.get(x, 0) for x in categories]

        for column, output_column in zip(self.columns, self.output_columns):
            # Look the stat up once per column instead of once per row.
            stats = self.stats_[f"unique_values({column})"]
            df[output_column] = df[column].map(partial(encode_list, categories=stats))
        return df

    def _get_serializable_fields(self) -> Dict[str, Any]:
        """Return the constructor state plus the fitted flag for serialization."""
        return {
            "columns": self.columns,
            "output_columns": self.output_columns,
            "max_categories": self.max_categories,
            "_fitted": getattr(self, "_fitted", None),
        }

    def _set_serializable_fields(self, fields: Dict[str, Any], version: int):
        """Restore state produced by ``_get_serializable_fields``."""
        # required fields
        self.columns = fields["columns"]
        self.output_columns = fields["output_columns"]
        self.max_categories = fields["max_categories"]
        # optional fields
        self._fitted = fields.get("_fitted")

    def __repr__(self):
        return (
            f"{self.__class__.__name__}(columns={self.columns!r}, "
            f"max_categories={self.max_categories!r}, "
            f"output_columns={self.output_columns!r})"
        )
- @PublicAPI(stability="alpha")
- @SerializablePreprocessor(version=1, identifier="io.ray.preprocessors.label_encoder")
- class LabelEncoder(SerializablePreprocessorBase):
- r"""Encode labels as integer targets.
- :class:`LabelEncoder` encodes labels as integer targets that range from
- :math:`0` to :math:`n - 1`, where :math:`n` is the number of unique labels.
- If you transform a label that isn't in the fitted datset, then the label is encoded
- as ``float("nan")``.
- Examples:
- >>> import pandas as pd
- >>> import ray
- >>> df = pd.DataFrame({
- ... "sepal_width": [5.1, 7, 4.9, 6.2],
- ... "sepal_height": [3.5, 3.2, 3, 3.4],
- ... "species": ["setosa", "versicolor", "setosa", "virginica"]
- ... })
- >>> ds = ray.data.from_pandas(df) # doctest: +SKIP
- >>>
- >>> from ray.data.preprocessors import LabelEncoder
- >>> encoder = LabelEncoder(label_column="species")
- >>> encoder.fit_transform(ds).to_pandas() # doctest: +SKIP
- sepal_width sepal_height species
- 0 5.1 3.5 0
- 1 7.0 3.2 1
- 2 4.9 3.0 0
- 3 6.2 3.4 2
- You can also provide the name of the output column that should hold the encoded
- labels if you want to use :class:`LabelEncoder` in append mode.
- >>> encoder = LabelEncoder(label_column="species", output_column="species_encoded")
- >>> encoder.fit_transform(ds).to_pandas() # doctest: +SKIP
- sepal_width sepal_height species species_encoded
- 0 5.1 3.5 setosa 0
- 1 7.0 3.2 versicolor 1
- 2 4.9 3.0 setosa 0
- 3 6.2 3.4 virginica 2
- If you transform a label not present in the original dataset, then the new
- label is encoded as ``float("nan")``.
- >>> df = pd.DataFrame({
- ... "sepal_width": [4.2],
- ... "sepal_height": [2.7],
- ... "species": ["bracteata"]
- ... })
- >>> ds = ray.data.from_pandas(df) # doctest: +SKIP
- >>> encoder.transform(ds).to_pandas() # doctest: +SKIP
- sepal_width sepal_height species
- 0 4.2 2.7 NaN
- Args:
- label_column: A column containing labels that you want to encode.
- output_column: The name of the column that will contain the encoded
- labels. If None, the output column will have the same name as the
- input column.
- .. seealso::
- :class:`OrdinalEncoder`
- If you're encoding ordered features, use :class:`OrdinalEncoder` instead of
- :class:`LabelEncoder`.
- """
- def __init__(self, label_column: str, *, output_column: Optional[str] = None):
- super().__init__()
- self.label_column = label_column
- self.output_column = output_column or label_column
- def _fit(self, dataset: "Dataset") -> Preprocessor:
- self.stat_computation_plan.add_callable_stat(
- stat_fn=lambda key_gen: compute_unique_value_indices(
- dataset=dataset,
- columns=[self.label_column],
- key_gen=key_gen,
- ),
- post_process_fn=unique_post_fn(),
- stat_key_fn=lambda col: f"unique({col})",
- post_key_fn=lambda col: f"unique_values({col})",
- columns=[self.label_column],
- )
- return self
- def _transform_pandas(self, df: pd.DataFrame):
- _validate_df(df, self.label_column)
- def column_label_encoder(s: pd.Series):
- s_values = self.stats_[f"unique_values({s.name})"]
- return s.map(s_values)
- df[self.output_column] = df[self.label_column].transform(column_label_encoder)
- return df
- def inverse_transform(self, ds: "Dataset") -> "Dataset":
- """Inverse transform the given dataset.
- Args:
- ds: Input Dataset that has been fitted and/or transformed.
- Returns:
- ray.data.Dataset: The inverse transformed Dataset.
- Raises:
- PreprocessorNotFittedException: if ``fit`` is not called yet.
- """
- fit_status = self.fit_status()
- if fit_status in (
- Preprocessor.FitStatus.PARTIALLY_FITTED,
- Preprocessor.FitStatus.NOT_FITTED,
- ):
- raise PreprocessorNotFittedException(
- "`fit` must be called before `inverse_transform`, "
- )
- kwargs = self._get_transform_config()
- return ds.map_batches(
- self._inverse_transform_pandas, batch_format=BatchFormat.PANDAS, **kwargs
- )
def _inverse_transform_pandas(self, df: pd.DataFrame):
    """Decode the encoded column of a pandas batch back to the original labels.

    Args:
        df: The batch to decode; it is modified in place.

    Returns:
        The same DataFrame, with ``label_column`` holding the decoded labels
        read from ``output_column``.
    """

    def decode(series: pd.Series):
        # Flip the fitted {label: code} mapping into {code: label}.
        forward = self.stats_[f"unique_values({self.label_column})"]
        backward = {}
        for label, code in forward.items():
            backward[code] = label
        return series.map(backward)

    encoded = df[self.output_column]
    df[self.label_column] = encoded.transform(decode)
    return df
def get_input_columns(self) -> List[str]:
    """Return the names of the columns this encoder reads.

    Returns:
        A new single-element list containing ``label_column``.
    """
    input_columns = [self.label_column]
    return input_columns
def get_output_columns(self) -> List[str]:
    """Return the names of the columns this encoder writes.

    Returns:
        A new single-element list containing ``output_column``.
    """
    produced_columns = [self.output_column]
    return produced_columns
def _get_serializable_fields(self) -> Dict[str, Any]:
    """Collect the attributes that are written out when serializing.

    Returns:
        A dict with ``label_column``, ``output_column`` and ``_fitted``;
        ``_fitted`` is None when the attribute has not been set.
    """
    fields: Dict[str, Any] = {}
    fields["label_column"] = self.label_column
    fields["output_column"] = self.output_column
    # ``_fitted`` may not exist yet, so fall back to None instead of raising.
    fields["_fitted"] = getattr(self, "_fitted", None)
    return fields
def _set_serializable_fields(self, fields: Dict[str, Any], version: int):
    """Restore attributes from a dict of serialized fields.

    Args:
        fields: The serialized fields. ``label_column`` and ``output_column``
            are required; ``_fitted`` is optional and defaults to None.
        version: The serialization version; not used by this method.

    Raises:
        KeyError: If a required field is missing from ``fields``.
    """
    # Required fields: a missing key raises KeyError.
    for required in ("label_column", "output_column"):
        setattr(self, required, fields[required])
    # Optional field: absent keys become None.
    self._fitted = fields.get("_fitted")
def __repr__(self):
    """Return a constructor-style description showing both column names."""
    class_name = self.__class__.__name__
    arguments = ", ".join(
        (
            f"label_column={self.label_column!r}",
            f"output_column={self.output_column!r}",
        )
    )
    return f"{class_name}({arguments})"
@PublicAPI(stability="alpha")
@SerializablePreprocessor(version=1, identifier="io.ray.preprocessors.categorizer")
class Categorizer(SerializablePreprocessorBase):
    r"""Convert columns to ``pd.CategoricalDtype``.

    Use this preprocessor with frameworks that have built-in support for
    ``pd.CategoricalDtype`` like LightGBM.

    .. warning::

        If you don't specify ``dtypes``, fit this preprocessor before splitting
        your dataset into train and test splits. This ensures categories are
        consistent across splits.

    Examples:
        >>> import pandas as pd
        >>> import ray
        >>> from ray.data.preprocessors import Categorizer
        >>>
        >>> df = pd.DataFrame(
        ... {
        ...     "sex": ["male", "female", "male", "female"],
        ...     "level": ["L4", "L5", "L3", "L4"],
        ... })
        >>> ds = ray.data.from_pandas(df)  # doctest: +SKIP
        >>> categorizer = Categorizer(columns=["sex", "level"])
        >>> categorizer.fit_transform(ds).schema().types  # doctest: +SKIP
        [CategoricalDtype(categories=['female', 'male'], ordered=False), CategoricalDtype(categories=['L3', 'L4', 'L5'], ordered=False)]

        :class:`Categorizer` can also be used in append mode by providing the
        name of the output_columns that should hold the categorized values.

        >>> categorizer = Categorizer(columns=["sex", "level"], output_columns=["sex_cat", "level_cat"])
        >>> categorizer.fit_transform(ds).to_pandas()  # doctest: +SKIP
              sex level sex_cat level_cat
        0    male    L4    male        L4
        1  female    L5  female        L5
        2    male    L3    male        L3
        3  female    L4  female        L4

        If you know the categories in advance, you can specify the categories with the
        ``dtypes`` parameter.

        >>> categorizer = Categorizer(
        ...     columns=["sex", "level"],
        ...     dtypes={"level": pd.CategoricalDtype(["L3", "L4", "L5", "L6"], ordered=True)},
        ... )
        >>> categorizer.fit_transform(ds).schema().types  # doctest: +SKIP
        [CategoricalDtype(categories=['female', 'male'], ordered=False), CategoricalDtype(categories=['L3', 'L4', 'L5', 'L6'], ordered=True)]

    Args:
        columns: The columns to convert to ``pd.CategoricalDtype``.
        dtypes: An optional dictionary that maps columns to ``pd.CategoricalDtype``
            objects. If you don't include a column in ``dtypes``, the categories
            are inferred.
        output_columns: The names of the transformed columns. If None, the transformed
            columns will be the same as the input columns. If not None, the length of
            ``output_columns`` must match the length of ``columns``, otherwise an error
            will be raised.
    """  # noqa: E501

    def __init__(
        self,
        columns: List[str],
        dtypes: Optional[Dict[str, pd.CategoricalDtype]] = None,
        output_columns: Optional[List[str]] = None,
    ):
        super().__init__()

        # Normalize a missing (or empty) ``dtypes`` to an empty dict.
        if not dtypes:
            dtypes = {}

        self.columns = columns
        self.dtypes = dtypes
        self.output_columns = Preprocessor._derive_and_validate_output_columns(
            columns, output_columns
        )

    def _fit(self, dataset: "Dataset") -> Preprocessor:
        """Seed ``stats_`` with the given dtypes and plan inference for the rest.

        Args:
            dataset: The dataset from which missing categories are inferred.

        Returns:
            This preprocessor, to allow chaining.
        """
        columns_to_get = [
            column for column in self.columns if column not in self.dtypes
        ]
        # User-supplied dtypes are used as-is; only the remaining columns need
        # their categories computed from the data.
        self.stats_ |= self.dtypes
        if not columns_to_get:
            return self

        def callback(unique_indices: Dict[str, Dict]) -> pd.CategoricalDtype:
            # The keys of the encoding map are the category values.
            return pd.CategoricalDtype(unique_indices.keys())

        self.stat_computation_plan.add_callable_stat(
            stat_fn=lambda key_gen: compute_unique_value_indices(
                dataset=dataset,
                columns=columns_to_get,
                key_gen=key_gen,
            ),
            post_process_fn=make_post_processor(
                base_fn=unique_post_fn(drop_na_values=True),
                callbacks=[callback],
            ),
            stat_key_fn=lambda col: f"unique({col})",
            # The final dtype is stored under the bare column name so that
            # ``stats_`` maps column -> dtype.
            post_key_fn=lambda col: col,
            columns=columns_to_get,
        )

        return self

    def _transform_pandas(self, df: pd.DataFrame):
        """Cast the configured columns of a pandas batch to their categorical dtypes.

        Args:
            df: The batch to transform; it is modified in place.

        Returns:
            The same DataFrame, with ``output_columns`` holding the converted data.
        """
        # ``stats_`` also receives every entry of ``dtypes`` during fitting,
        # including keys that are not in ``columns``. ``DataFrame.astype``
        # rejects mapping keys that are not columns of the frame, so only the
        # dtypes of the selected columns are passed on.
        column_dtypes = {column: self.stats_[column] for column in self.columns}
        df[self.output_columns] = df[self.columns].astype(column_dtypes)
        return df

    def _get_serializable_fields(self) -> Dict[str, Any]:
        """Collect the attributes that are written out when serializing.

        Returns:
            A dict with ``columns``, ``output_columns``, ``_fitted`` and
            ``dtypes``. Each dtype is reduced to its category list and
            ``ordered`` flag; ``dtypes`` is None when no dtypes are set.
        """
        return {
            "columns": self.columns,
            "output_columns": self.output_columns,
            "_fitted": getattr(self, "_fitted", None),
            "dtypes": {
                col: {"categories": list(dtype.categories), "ordered": dtype.ordered}
                for col, dtype in self.dtypes.items()
            }
            if hasattr(self, "dtypes") and self.dtypes
            else None,
        }

    def _set_serializable_fields(self, fields: Dict[str, Any], version: int):
        """Restore attributes from a dict of serialized fields.

        Args:
            fields: The serialized fields. ``columns`` and ``output_columns``
                are required; ``dtypes`` and ``_fitted`` are optional.
            version: The serialization version; not used by this method.
        """
        # required fields
        # Handle dtypes field specially: rebuild the CategoricalDtype objects
        # from their serialized category list and ``ordered`` flag.
        self.dtypes = (
            {
                col: pd.CategoricalDtype(
                    categories=dtype_data["categories"], ordered=dtype_data["ordered"]
                )
                for col, dtype_data in fields["dtypes"].items()
            }
            if fields.get("dtypes")
            else {}
        )
        self.columns = fields["columns"]
        self.output_columns = fields["output_columns"]
        # optional fields
        self._fitted = fields.get("_fitted")

    def __repr__(self):
        return (
            f"{self.__class__.__name__}(columns={self.columns!r}, "
            f"dtypes={self.dtypes!r}, output_columns={self.output_columns!r})"
        )
def compute_unique_value_indices(
    *,
    dataset: "Dataset",
    columns: List[str],
    key_gen: Callable,
    encode_lists: bool = True,
    max_categories: Optional[Dict[str, int]] = None,
):
    """Collect the unique values of each requested column of ``dataset``.

    Value counts are computed per pandas batch and then merged on the caller
    side.

    Args:
        dataset: The dataset to scan.
        columns: Names of the columns whose unique values are collected.
        key_gen: Callable mapping a column name to the key used for that
            column in the returned dict.
        encode_lists: Applies to columns made of lists. If True, the
            individual list elements are counted; if False, each list is
            converted to a tuple and counted as a single value.
        max_categories: Optional mapping of column name to the maximum number
            of values to keep for that column. The most frequent values over
            the whole dataset are kept.

    Returns:
        A dict mapping ``key_gen(column)`` to the set of unique values of
        that column.

    Raises:
        ValueError: If ``max_categories`` names a column that is not in
            ``columns``, or if a requested column is missing from a batch.
    """
    if max_categories is None:
        max_categories = {}

    columns_set = set(columns)
    for column in max_categories:
        if column not in columns_set:
            raise ValueError(
                f"You set `max_categories` for {column}, which is not present in "
                f"{columns}."
            )

    def get_pd_value_counts_per_column(col: pd.Series) -> Dict:
        # special handling for lists
        if _is_series_composed_of_lists(col):
            if encode_lists:
                counter = Counter()

                def update_counter(element):
                    counter.update(element)
                    return element

                # ``map`` is used only for its side effect on ``counter``.
                col.map(update_counter)
                return counter
            else:
                # convert to tuples to make lists hashable
                col = col.map(lambda x: tuple(x))

        return Counter(col.value_counts(dropna=False).to_dict())

    def get_pd_value_counts(df: pd.DataFrame) -> Dict[str, List[Dict]]:
        df_columns = df.columns.tolist()
        result = {}
        for col in columns:
            if col in df_columns:
                result[col] = [get_pd_value_counts_per_column(df[col])]
            else:
                raise ValueError(
                    f"Column '{col}' does not exist in DataFrame, which has columns: {df_columns}"  # noqa: E501
                )
        return result

    value_counts_ds = dataset.map_batches(get_pd_value_counts, batch_format="pandas")

    unique_values_by_col: Dict[str, Set] = {key_gen(col): set() for col in columns}
    # Capped columns need dataset-wide frequencies: truncating every batch on
    # its own and taking the union could exceed the cap and could keep values
    # that are frequent in one batch but rare overall.
    capped_totals: Dict[str, Counter] = {
        col: Counter() for col in columns if col in max_categories
    }
    for batch in value_counts_ds.iter_batches(batch_size=None):
        for col, counters in batch.items():
            for counter in counters:
                # Entries whose count is None are skipped.
                counts: Dict[Any, int] = {
                    k: v for k, v in counter.items() if v is not None
                }
                if col in capped_totals:
                    capped_totals[col].update(counts)
                else:
                    # add only column values since frequencies are not needed
                    # beyond this point
                    unique_values_by_col[key_gen(col)].update(counts)

    for col, totals in capped_totals.items():
        most_frequent = totals.most_common(max_categories[col])
        unique_values_by_col[key_gen(col)].update(value for value, _ in most_frequent)

    return unique_values_by_col
- # FIXME: the arrow format path is broken: https://anyscale1.atlassian.net/browse/DATA-1788
def unique_post_fn(
    drop_na_values: bool = False, batch_format: BatchFormat = None
) -> Callable:
    """
    Build the post-processing function that turns a collection of unique
    values into an encoding map, ordering the values by sorting them.

    Args:
        drop_na_values: If True, NA/null values are silently left out of the
            encoding map. If False, any NA/null value raises an error.
        batch_format: Selects which post-processing function is returned.
            - If BatchFormat.ARROW: the Arrow-based function, which returns a
              tuple of arrays for scalar types, or a dict for list types that
              PyArrow can't sort.
            - Otherwise: the Python function returning a dict {value: index}.

    Returns:
        A callable that takes unique values and returns an encoding map.
        The map format depends on batch_format and input types:
        - Dict format: {value: int} - used for pandas path or list-type data
        - Arrow format: (keys_array, values_array) - used for Arrow path with scalar data
    """

    def gen_value_index(values: List) -> Dict[Any, int]:
        """
        Build an encoding map from unique values using Python sorting.

        Args:
            values: List of unique values to encode (can include lists/tuples).

        Returns:
            Dict mapping each value to a unique integer index.
            List values are converted to tuples for hashability.

        Raises:
            ValueError: If null values are present and drop_na_values is False.
        """
        # NOTE: Nulls are special-cased because they would prevent the
        #       sequence of values from being sortable
        has_null = any(is_null(v) for v in values)
        if has_null and not drop_na_values:
            raise ValueError(
                "Unable to fit column because it contains null"
                " values. Consider imputing missing values first."
            )

        # NOTE: Sorting is what makes the resulting encoding stable
        ordered = sorted(v for v in values if not is_null(v))

        encoding: Dict[Any, int] = {}
        for index, value in enumerate(ordered):
            key = tuple(value) if isinstance(value, list) else value
            encoding[key] = index
        return encoding

    def gen_value_index_arrow_from_arrow(
        values: Union["pa.ListScalar", "pa.Array"],
    ) -> Union[Tuple["pa.Array", "pa.Array"], Dict[Any, int]]:
        """Build an encoding map from unique values using Arrow-native operations.

        Args:
            values: The aggregation result as a pa.ListScalar (list of unique
                values) or a pa.Array of values directly.

        Returns:
            For scalar types that PyArrow can sort natively, a tuple of
            (sorted_keys, indices) as pa.Array. For list types, which need the
            fallback, a dict mapping {value: index}.

        Note:
            PyArrow's sort_indices doesn't support list types, so columns
            containing lists fall back to the dict format. The
            _transform_arrow method handles this by detecting dict-format
            stats and converting as needed.
        """
        # Unwrap a ListScalar coming from an aggregation result
        if isinstance(values, pa.ListScalar):
            values = values.values

        # List-typed values can't be sorted by PyArrow, so they go through
        # the Python implementation and come back in dict format
        value_type = values.type
        if pa.types.is_list(value_type) or pa.types.is_large_list(value_type):
            return gen_value_index(values.to_pylist())

        if drop_na_values:
            # Nulls are dropped on request
            values = pc.drop_null(values)
        elif pc.any(pc.is_null(values)).as_py():
            raise ValueError(
                "Unable to fit column because it contains null"
                " values. Consider imputing missing values first."
            )

        # Order the values, then number them consecutively from zero
        order = pc.sort_indices(values)
        sorted_keys = pc.take(values, order)
        positions = pa.array(range(len(sorted_keys)), type=pa.int64())

        return (sorted_keys, positions)

    if batch_format == BatchFormat.ARROW:
        return gen_value_index_arrow_from_arrow
    return gen_value_index
def _validate_df(df: pd.DataFrame, *columns: str) -> None:
    """Ensure that none of the given DataFrame columns contains null values.

    Args:
        df: The DataFrame to check.
        *columns: Names of the columns to check for null values.

    Raises:
        ValueError: If any of the given columns contains a null value.
    """
    null_columns = []
    for column in columns:
        if df[column].isnull().values.any():
            null_columns.append(column)

    if not null_columns:
        return

    raise ValueError(
        f"Unable to transform columns {null_columns} because they contain "
        f"null values. Consider imputing missing values first."
    )
def _validate_arrow(table: pa.Table, *columns: str) -> None:
    """Ensure that none of the given Arrow table columns contains null values.

    Args:
        table: The Arrow table to check.
        *columns: Names of the columns to check for null values.

    Raises:
        ValueError: If any of the given columns contains a null value.
    """
    null_columns = []
    for column in columns:
        contains_null = pc.any(pc.is_null(table.column(column))).as_py()
        if contains_null:
            null_columns.append(column)

    if not null_columns:
        return

    raise ValueError(
        f"Unable to transform columns {null_columns} because they contain "
        f"null values. Consider imputing missing values first."
    )
def _is_series_composed_of_lists(series: pd.Series) -> bool:
    """Tell whether an object-dtype series holds lists (or NumPy arrays).

    Only the first element that is not None is inspected; the remaining
    elements are assumed to be of the same kind.

    Args:
        series: The series to inspect.

    Returns:
        True if the series has object dtype and its first non-None element is
        a ``list`` or ``np.ndarray``; False otherwise.
    """
    if not pandas.api.types.is_object_dtype(series.dtype):
        return False

    for element in series:
        if element is not None:
            return isinstance(element, (list, np.ndarray))

    # Empty series, or every element is None.
    return False
|