discretizer.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427
  1. from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Type, Union
  2. import numpy as np
  3. import pandas as pd
  4. from ray.data.aggregate import Max, Min
  5. from ray.data.preprocessor import Preprocessor
  6. from ray.util.annotations import PublicAPI
  7. if TYPE_CHECKING:
  8. from ray.data.dataset import Dataset
  9. class _AbstractKBinsDiscretizer(Preprocessor):
  10. """Abstract base class for all KBinsDiscretizers.
  11. Essentially a thin wraper around ``pd.cut``.
  12. Expects either ``self.stats_`` or ``self.bins`` to be set and
  13. contain {column:list_of_bin_intervals}.
  14. """
  15. def _transform_pandas(self, df: pd.DataFrame):
  16. def bin_values(s: pd.Series) -> pd.Series:
  17. if s.name not in self.columns:
  18. return s
  19. labels = self.dtypes.get(s.name) if self.dtypes else False
  20. ordered = True
  21. if labels:
  22. if isinstance(labels, pd.CategoricalDtype):
  23. ordered = labels.ordered
  24. labels = list(labels.categories)
  25. else:
  26. labels = False
  27. bins = self.stats_ if self._is_fittable else self.bins
  28. return pd.cut(
  29. s,
  30. bins[s.name] if isinstance(bins, dict) else bins,
  31. right=self.right,
  32. labels=labels,
  33. ordered=ordered,
  34. retbins=False,
  35. include_lowest=self.include_lowest,
  36. duplicates=self.duplicates,
  37. )
  38. binned_df = df.apply(bin_values, axis=0)
  39. df[self.output_columns] = binned_df[self.columns]
  40. return df
  41. def _validate_bins_columns(self):
  42. if isinstance(self.bins, dict) and not all(
  43. col in self.bins for col in self.columns
  44. ):
  45. raise ValueError(
  46. "If `bins` is a dictionary, all elements of `columns` must be present "
  47. "in it."
  48. )
  49. def __repr__(self):
  50. attr_str = ", ".join(
  51. [
  52. f"{attr_name}={attr_value!r}"
  53. for attr_name, attr_value in vars(self).items()
  54. if not attr_name.startswith("_")
  55. ]
  56. )
  57. return f"{self.__class__.__name__}({attr_str})"
  58. @PublicAPI(stability="alpha")
  59. class CustomKBinsDiscretizer(_AbstractKBinsDiscretizer):
  60. """Bin values into discrete intervals using custom bin edges.
  61. Columns must contain numerical values.
  62. Examples:
  63. Use :class:`CustomKBinsDiscretizer` to bin continuous features.
  64. >>> import pandas as pd
  65. >>> import ray
  66. >>> from ray.data.preprocessors import CustomKBinsDiscretizer
  67. >>> df = pd.DataFrame({
  68. ... "value_1": [0.2, 1.4, 2.5, 6.2, 9.7, 2.1],
  69. ... "value_2": [10, 15, 13, 12, 23, 25],
  70. ... })
  71. >>> ds = ray.data.from_pandas(df)
  72. >>> discretizer = CustomKBinsDiscretizer(
  73. ... columns=["value_1", "value_2"],
  74. ... bins=[0, 1, 4, 10, 25]
  75. ... )
  76. >>> discretizer.transform(ds).to_pandas()
  77. value_1 value_2
  78. 0 0 2
  79. 1 1 3
  80. 2 1 3
  81. 3 2 3
  82. 4 2 3
  83. 5 1 3
  84. :class:`CustomKBinsDiscretizer` can also be used in append mode by providing the
  85. name of the output_columns that should hold the encoded values.
  86. >>> discretizer = CustomKBinsDiscretizer(
  87. ... columns=["value_1", "value_2"],
  88. ... bins=[0, 1, 4, 10, 25],
  89. ... output_columns=["value_1_discretized", "value_2_discretized"]
  90. ... )
  91. >>> discretizer.fit_transform(ds).to_pandas() # doctest: +SKIP
  92. value_1 value_2 value_1_discretized value_2_discretized
  93. 0 0.2 10 0 2
  94. 1 1.4 15 1 3
  95. 2 2.5 13 1 3
  96. 3 6.2 12 2 3
  97. 4 9.7 23 2 3
  98. 5 2.1 25 1 3
  99. You can also specify different bin edges per column.
  100. >>> discretizer = CustomKBinsDiscretizer(
  101. ... columns=["value_1", "value_2"],
  102. ... bins={"value_1": [0, 1, 4], "value_2": [0, 18, 35, 70]},
  103. ... )
  104. >>> discretizer.transform(ds).to_pandas()
  105. value_1 value_2
  106. 0 0.0 0
  107. 1 1.0 0
  108. 2 1.0 0
  109. 3 NaN 0
  110. 4 NaN 1
  111. 5 1.0 1
  112. Args:
  113. columns: The columns to discretize.
  114. bins: Defines custom bin edges. Can be an iterable of numbers,
  115. a ``pd.IntervalIndex``, or a dict mapping columns to either of them.
  116. Note that ``pd.IntervalIndex`` for bins must be non-overlapping.
  117. right: Indicates whether bins include the rightmost edge.
  118. include_lowest: Indicates whether the first interval should be left-inclusive.
  119. duplicates: Can be either 'raise' or 'drop'. If bin edges are not unique,
  120. raise ``ValueError`` or drop non-uniques.
  121. dtypes: An optional dictionary that maps columns to ``pd.CategoricalDtype``
  122. objects or ``np.integer`` types. If you don't include a column in ``dtypes``
  123. or specify it as an integer dtype, the outputted column will consist of
  124. ordered integers corresponding to bins. If you use a
  125. ``pd.CategoricalDtype``, the outputted column will be a
  126. ``pd.CategoricalDtype`` with the categories being mapped to bins.
  127. You can use ``pd.CategoricalDtype(categories, ordered=True)`` to
  128. preserve information about bin order.
  129. output_columns: The names of the transformed columns. If None, the transformed
  130. columns will be the same as the input columns. If not None, the length of
  131. ``output_columns`` must match the length of ``columns``, othwerwise an error
  132. will be raised.
  133. .. seealso::
  134. :class:`UniformKBinsDiscretizer`
  135. If you want to bin data into uniform width bins.
  136. """
  137. def __init__(
  138. self,
  139. columns: List[str],
  140. bins: Union[
  141. Iterable[float],
  142. pd.IntervalIndex,
  143. Dict[str, Union[Iterable[float], pd.IntervalIndex]],
  144. ],
  145. *,
  146. right: bool = True,
  147. include_lowest: bool = False,
  148. duplicates: str = "raise",
  149. dtypes: Optional[
  150. Dict[str, Union[pd.CategoricalDtype, Type[np.integer]]]
  151. ] = None,
  152. output_columns: Optional[List[str]] = None,
  153. ):
  154. self.columns = columns
  155. self.bins = bins
  156. self.right = right
  157. self.include_lowest = include_lowest
  158. self.duplicates = duplicates
  159. self.dtypes = dtypes
  160. self.output_columns = Preprocessor._derive_and_validate_output_columns(
  161. columns, output_columns
  162. )
  163. self._validate_bins_columns()
  164. _is_fittable = False
  165. @PublicAPI(stability="alpha")
  166. class UniformKBinsDiscretizer(_AbstractKBinsDiscretizer):
  167. """Bin values into discrete intervals (bins) of uniform width.
  168. Columns must contain numerical values.
  169. Examples:
  170. Use :class:`UniformKBinsDiscretizer` to bin continuous features.
  171. >>> import pandas as pd
  172. >>> import ray
  173. >>> from ray.data.preprocessors import UniformKBinsDiscretizer
  174. >>> df = pd.DataFrame({
  175. ... "value_1": [0.2, 1.4, 2.5, 6.2, 9.7, 2.1],
  176. ... "value_2": [10, 15, 13, 12, 23, 25],
  177. ... })
  178. >>> ds = ray.data.from_pandas(df)
  179. >>> discretizer = UniformKBinsDiscretizer(
  180. ... columns=["value_1", "value_2"], bins=4
  181. ... )
  182. >>> discretizer.fit_transform(ds).to_pandas()
  183. value_1 value_2
  184. 0 0 0
  185. 1 0 1
  186. 2 0 0
  187. 3 2 0
  188. 4 3 3
  189. 5 0 3
  190. :class:`UniformKBinsDiscretizer` can also be used in append mode by providing the
  191. name of the output_columns that should hold the encoded values.
  192. >>> discretizer = UniformKBinsDiscretizer(
  193. ... columns=["value_1", "value_2"],
  194. ... bins=4,
  195. ... output_columns=["value_1_discretized", "value_2_discretized"]
  196. ... )
  197. >>> discretizer.fit_transform(ds).to_pandas() # doctest: +SKIP
  198. value_1 value_2 value_1_discretized value_2_discretized
  199. 0 0.2 10 0 0
  200. 1 1.4 15 0 1
  201. 2 2.5 13 0 0
  202. 3 6.2 12 2 0
  203. 4 9.7 23 3 3
  204. 5 2.1 25 0 3
  205. You can also specify different number of bins per column.
  206. >>> discretizer = UniformKBinsDiscretizer(
  207. ... columns=["value_1", "value_2"], bins={"value_1": 4, "value_2": 3}
  208. ... )
  209. >>> discretizer.fit_transform(ds).to_pandas()
  210. value_1 value_2
  211. 0 0 0
  212. 1 0 0
  213. 2 0 0
  214. 3 2 0
  215. 4 3 2
  216. 5 0 2
  217. Args:
  218. columns: The columns to discretize.
  219. bins: Defines the number of equal-width bins.
  220. Can be either an integer (which will be applied to all columns),
  221. or a dict that maps columns to integers.
  222. The range is extended by .1% on each side to include
  223. the minimum and maximum values.
  224. right: Indicates whether bins includes the rightmost edge or not.
  225. include_lowest: Whether the first interval should be left-inclusive
  226. or not.
  227. duplicates: Can be either 'raise' or 'drop'. If bin edges are not unique,
  228. raise ``ValueError`` or drop non-uniques.
  229. dtypes: An optional dictionary that maps columns to ``pd.CategoricalDtype``
  230. objects or ``np.integer`` types. If you don't include a column in ``dtypes``
  231. or specify it as an integer dtype, the outputted column will consist of
  232. ordered integers corresponding to bins. If you use a
  233. ``pd.CategoricalDtype``, the outputted column will be a
  234. ``pd.CategoricalDtype`` with the categories being mapped to bins.
  235. You can use ``pd.CategoricalDtype(categories, ordered=True)`` to
  236. preserve information about bin order.
  237. output_columns: The names of the transformed columns. If None, the transformed
  238. columns will be the same as the input columns. If not None, the length of
  239. ``output_columns`` must match the length of ``columns``, othwerwise an error
  240. will be raised.
  241. .. seealso::
  242. :class:`CustomKBinsDiscretizer`
  243. If you want to specify your own bin edges.
  244. """
  245. def __init__(
  246. self,
  247. columns: List[str],
  248. bins: Union[int, Dict[str, int]],
  249. *,
  250. right: bool = True,
  251. include_lowest: bool = False,
  252. duplicates: str = "raise",
  253. dtypes: Optional[
  254. Dict[str, Union[pd.CategoricalDtype, Type[np.integer]]]
  255. ] = None,
  256. output_columns: Optional[List[str]] = None,
  257. ):
  258. super().__init__()
  259. self.columns = columns
  260. self.bins = bins
  261. self.right = right
  262. self.include_lowest = include_lowest
  263. self.duplicates = duplicates
  264. self.dtypes = dtypes
  265. self.output_columns = Preprocessor._derive_and_validate_output_columns(
  266. columns, output_columns
  267. )
  268. def _fit(self, dataset: "Dataset") -> Preprocessor:
  269. self._validate_on_fit()
  270. if isinstance(self.bins, dict):
  271. columns = self.bins.keys()
  272. else:
  273. columns = self.columns
  274. for column in columns:
  275. bins = self.bins[column] if isinstance(self.bins, dict) else self.bins
  276. if not isinstance(bins, int):
  277. raise TypeError(
  278. f"`bins` must be an integer or a dict of integers, got {bins}"
  279. )
  280. self.stat_computation_plan.add_aggregator(
  281. aggregator_fn=Min,
  282. columns=columns,
  283. )
  284. self.stat_computation_plan.add_aggregator(
  285. aggregator_fn=Max,
  286. columns=columns,
  287. )
  288. return self
  289. def _validate_on_fit(self):
  290. self._validate_bins_columns()
  291. def _fit_execute(self, dataset: "Dataset"):
  292. stats = self.stat_computation_plan.compute(dataset)
  293. self.stats_ = post_fit_processor(stats, self.bins, self.right)
  294. return self
  295. def post_fit_processor(aggregate_stats: dict, bins: Union[str, Dict], right: bool):
  296. mins, maxes, stats = {}, {}, {}
  297. for key, value in aggregate_stats.items():
  298. column_name = key[4:-1] # min(column) -> column
  299. if key.startswith("min"):
  300. mins[column_name] = value
  301. if key.startswith("max"):
  302. maxes[column_name] = value
  303. for column in mins.keys():
  304. stats[column] = _translate_min_max_number_of_bins_to_bin_edges(
  305. mn=mins[column],
  306. mx=maxes[column],
  307. bins=bins[column] if isinstance(bins, dict) else bins,
  308. right=right,
  309. )
  310. return stats
  311. # Copied from
  312. # https://github.com/pandas-dev/pandas/blob/v1.4.4/pandas/core/reshape/tile.py#L257
  313. # under
  314. # BSD 3-Clause License
  315. #
  316. # Copyright (c) 2008-2011, AQR Capital Management, LLC, Lambda Foundry, Inc.
  317. # and PyData Development Team
  318. # All rights reserved.
  319. #
  320. # Copyright (c) 2011-2022, Open source contributors.
  321. #
  322. # Redistribution and use in source and binary forms, with or without
  323. # modification, are permitted provided that the following conditions are met:
  324. #
  325. # * Redistributions of source code must retain the above copyright notice, this
  326. # list of conditions and the following disclaimer.
  327. #
  328. # * Redistributions in binary form must reproduce the above copyright notice,
  329. # this list of conditions and the following disclaimer in the documentation
  330. # and/or other materials provided with the distribution.
  331. #
  332. # * Neither the name of the copyright holder nor the names of its
  333. # contributors may be used to endorse or promote products derived from
  334. # this software without specific prior written permission.
  335. #
  336. # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  337. # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  338. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  339. # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
  340. # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  341. # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
  342. # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
  343. # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
  344. # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  345. # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  346. def _translate_min_max_number_of_bins_to_bin_edges(
  347. mn: float, mx: float, bins: int, right: bool
  348. ) -> List[float]:
  349. """Translates a range and desired number of bins into list of bin edges."""
  350. rng = (mn, mx)
  351. mn, mx = (mi + 0.0 for mi in rng)
  352. if np.isinf(mn) or np.isinf(mx):
  353. raise ValueError(
  354. "Cannot specify integer `bins` when input data contains infinity."
  355. )
  356. elif mn == mx: # adjust end points before binning
  357. mn -= 0.001 * abs(mn) if mn != 0 else 0.001
  358. mx += 0.001 * abs(mx) if mx != 0 else 0.001
  359. bins = np.linspace(mn, mx, bins + 1, endpoint=True)
  360. else: # adjust end points after binning
  361. bins = np.linspace(mn, mx, bins + 1, endpoint=True)
  362. adj = (mx - mn) * 0.001 # 0.1% of the range
  363. if right:
  364. bins[0] -= adj
  365. else:
  366. bins[-1] += adj
  367. return bins
  368. # TODO(ml-team)
  369. # Add QuantileKBinsDiscretizer