scaler.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573
  1. from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
  2. import numpy as np
  3. import pandas as pd
  4. import pyarrow as pa
  5. import pyarrow.compute as pc
  6. from ray.data.aggregate import AbsMax, ApproximateQuantile, Max, Mean, Min, Std
  7. from ray.data.block import BlockAccessor
  8. from ray.data.preprocessor import Preprocessor, SerializablePreprocessorBase
  9. from ray.data.preprocessors.version_support import SerializablePreprocessor
  10. from ray.data.util.data_batch_conversion import BatchFormat
  11. from ray.util.annotations import DeveloperAPI, PublicAPI
  12. if TYPE_CHECKING:
  13. from ray.data.dataset import Dataset
  # Threshold below which a scale factor (a standard deviation or a value
  # range) is treated as zero. Scalers in this file that compare against it
  # replace such a factor with 1 instead of dividing by it, which prevents
  # numerical instability when scaling columns with very small variance or
  # range. Similar to sklearn's approach.
  _EPSILON = 1e-8
  @PublicAPI(stability="alpha")
  @SerializablePreprocessor(version=1, identifier="io.ray.preprocessors.standard_scaler")
  class StandardScaler(SerializablePreprocessorBase):
      r"""Translate and scale each column by its mean and standard deviation,
      respectively.

      The general formula is given by

      .. math::

          x' = \frac{x - \bar{x}}{s}

      where :math:`x` is the column, :math:`x'` is the transformed column,
      :math:`\bar{x}` is the column average, and :math:`s` is the column's
      population standard deviation (computed with ``ddof=0``). If :math:`s`
      is smaller than ``_EPSILON`` (i.e., the column is constant-valued or
      nearly so), the column is only mean-centered, so a constant-valued column
      is transformed to zeros.

      If the fitted mean or standard deviation of a column is ``None``, the
      transformed column is filled with missing values (NaN for pandas batches,
      nulls for Arrow batches).

      .. warning::
          :class:`StandardScaler` works best when your data is normal. If your data isn't
          approximately normal, then the transformed features won't be meaningful.

      Examples:
          >>> import pandas as pd
          >>> import ray
          >>> from ray.data.preprocessors import StandardScaler
          >>>
          >>> df = pd.DataFrame({"X1": [-2, 0, 2], "X2": [-3, -3, 3], "X3": [1, 1, 1]})
          >>> ds = ray.data.from_pandas(df)  # doctest: +SKIP
          >>> ds.to_pandas()  # doctest: +SKIP
             X1  X2  X3
          0  -2  -3   1
          1   0  -3   1
          2   2   3   1

          Columns are scaled separately.

          >>> preprocessor = StandardScaler(columns=["X1", "X2"])
          >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
                   X1        X2  X3
          0 -1.224745 -0.707107   1
          1  0.000000 -0.707107   1
          2  1.224745  1.414214   1

          Constant-valued columns get filled with zeros.

          >>> preprocessor = StandardScaler(columns=["X3"])
          >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
             X1  X2   X3
          0  -2  -3  0.0
          1   0  -3  0.0
          2   2   3  0.0

          >>> preprocessor = StandardScaler(
          ...     columns=["X1", "X2"],
          ...     output_columns=["X1_scaled", "X2_scaled"]
          ... )
          >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
             X1  X2  X3  X1_scaled  X2_scaled
          0  -2  -3   1  -1.224745  -0.707107
          1   0  -3   1   0.000000  -0.707107
          2   2   3   1   1.224745   1.414214

      Args:
          columns: The columns to separately scale.
          output_columns: The names of the transformed columns. If None, the transformed
              columns will be the same as the input columns. If not None, the length of
              ``output_columns`` must match the length of ``columns``, otherwise an error
              will be raised.
      """

      def __init__(self, columns: List[str], output_columns: Optional[List[str]] = None):
          super().__init__()
          self.columns = columns
          self.output_columns = Preprocessor._derive_and_validate_output_columns(
              columns, output_columns
          )

      def _fit(self, dataset: "Dataset") -> Preprocessor:
          """Register the mean and population-std aggregations for each column."""
          self.stat_computation_plan.add_aggregator(
              aggregator_fn=Mean,
              columns=self.columns,
          )
          # ddof=0 -> population standard deviation (divide by N, not N - 1).
          self.stat_computation_plan.add_aggregator(
              aggregator_fn=lambda col: Std(col, ddof=0),
              columns=self.columns,
          )
          return self

      def _transform_pandas(self, df: pd.DataFrame):
          """Standardize ``self.columns`` of a pandas batch into ``self.output_columns``."""

          def column_standard_scaler(s: pd.Series):
              s_mean = self.stats_[f"mean({s.name})"]
              s_std = self.stats_[f"std({s.name})"]

              if s_std is None or s_mean is None:
                  # Build a fresh all-NaN column instead of assigning NaN into
                  # ``s`` in place: in-place assignment mutates the input and
                  # relies on implicit dtype upcasting (e.g. int -> float),
                  # which pandas has deprecated.
                  return pd.Series(np.nan, index=s.index, name=s.name)

              # Handle division by zero and near-zero values for numerical stability.
              # If standard deviation is very small (constant or near-constant column),
              # treat it as 1 to avoid numerical instability.
              if s_std < _EPSILON:
                  s_std = 1

              return (s - s_mean) / s_std

          df[self.output_columns] = df[self.columns].transform(column_standard_scaler)
          return df

      @staticmethod
      def _scale_column(column: pa.Array, mean: float, std: float) -> pa.Array:
          """Return ``(column - mean) / std`` computed with PyArrow kernels.

          A ``std`` smaller than ``_EPSILON`` is replaced by 1 so the column is
          only mean-centered.
          """
          # Handle division by zero and near-zero values for numerical stability.
          if std < _EPSILON:
              std = 1

          return pc.divide(
              pc.subtract(column, pa.scalar(float(mean))), pa.scalar(float(std))
          )

      def _transform_arrow(self, table: pa.Table) -> pa.Table:
          """Transform using fast native PyArrow operations."""
          # Read all input columns first to avoid reading modified data when
          # output_columns[i] == columns[j] for i < j
          input_columns = [table.column(input_col) for input_col in self.columns]

          for input_col, output_col, column in zip(
              self.columns, self.output_columns, input_columns
          ):
              s_mean = self.stats_[f"mean({input_col})"]
              s_std = self.stats_[f"std({input_col})"]

              if s_std is None or s_mean is None:
                  # Column filled with nulls, preserving original column type
                  new_column = pa.nulls(len(column), type=column.type)
              else:
                  new_column = self._scale_column(column, s_mean, s_std)

              table = BlockAccessor.for_block(table).upsert_column(
                  output_col, new_column
              )

          return table

      @classmethod
      @DeveloperAPI
      def preferred_batch_format(cls) -> BatchFormat:
          """Return the Arrow batch format as the preferred one for this scaler."""
          return BatchFormat.ARROW

      def _get_serializable_fields(self) -> Dict[str, Any]:
          """Return the constructor configuration and fitted flag for serialization."""
          return {
              "columns": self.columns,
              "output_columns": self.output_columns,
              "_fitted": getattr(self, "_fitted", None),
          }

      def _set_serializable_fields(self, fields: Dict[str, Any], version: int):
          """Restore attributes from a dict produced by ``_get_serializable_fields``."""
          # required fields
          self.columns = fields["columns"]
          self.output_columns = fields["output_columns"]
          # optional fields
          self._fitted = fields.get("_fitted")

      def __repr__(self):
          return f"{self.__class__.__name__}(columns={self.columns!r}, output_columns={self.output_columns!r})"
  @PublicAPI(stability="alpha")
  @SerializablePreprocessor(version=1, identifier="io.ray.preprocessors.min_max_scaler")
  class MinMaxScaler(SerializablePreprocessorBase):
      r"""Scale each column by its range.

      The general formula is given by

      .. math::

          x' = \frac{x - \min(x)}{\max{x} - \min{x}}

      where :math:`x` is the column and :math:`x'` is the transformed column. If
      :math:`\max{x} - \min{x}` is smaller than ``_EPSILON`` (i.e., the column is
      constant-valued or nearly so), the range is treated as 1, so a
      constant-valued column gets filled with zeros.

      Transformed values of the data the scaler was fit on are always in the
      range :math:`[0, 1]`.

      If the fitted minimum or maximum of a column is ``None``, the transformed
      column is filled with NaN.

      .. tip::
          This can be used as an alternative to :py:class:`StandardScaler`.

      Examples:
          >>> import pandas as pd
          >>> import ray
          >>> from ray.data.preprocessors import MinMaxScaler
          >>>
          >>> df = pd.DataFrame({"X1": [-2, 0, 2], "X2": [-3, -3, 3], "X3": [1, 1, 1]})   # noqa: E501
          >>> ds = ray.data.from_pandas(df)  # doctest: +SKIP
          >>> ds.to_pandas()  # doctest: +SKIP
             X1  X2  X3
          0  -2  -3   1
          1   0  -3   1
          2   2   3   1

          Columns are scaled separately.

          >>> preprocessor = MinMaxScaler(columns=["X1", "X2"])
          >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
              X1   X2  X3
          0  0.0  0.0   1
          1  0.5  0.0   1
          2  1.0  1.0   1

          Constant-valued columns get filled with zeros.

          >>> preprocessor = MinMaxScaler(columns=["X3"])
          >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
             X1  X2   X3
          0  -2  -3  0.0
          1   0  -3  0.0
          2   2   3  0.0

          >>> preprocessor = MinMaxScaler(columns=["X1", "X2"], output_columns=["X1_scaled", "X2_scaled"])
          >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
             X1  X2  X3  X1_scaled  X2_scaled
          0  -2  -3   1        0.0        0.0
          1   0  -3   1        0.5        0.0
          2   2   3   1        1.0        1.0

      Args:
          columns: The columns to separately scale.
          output_columns: The names of the transformed columns. If None, the transformed
              columns will be the same as the input columns. If not None, the length of
              ``output_columns`` must match the length of ``columns``, otherwise an error
              will be raised.
      """

      def __init__(self, columns: List[str], output_columns: Optional[List[str]] = None):
          super().__init__()
          self.columns = columns
          self.output_columns = Preprocessor._derive_and_validate_output_columns(
              columns, output_columns
          )

      def _fit(self, dataset: "Dataset") -> Preprocessor:
          """Compute the min and max of every configured column into ``self.stats_``."""
          aggregates = [Agg(col) for Agg in [Min, Max] for col in self.columns]
          self.stats_ = dataset.aggregate(*aggregates)
          return self

      def _transform_pandas(self, df: pd.DataFrame):
          """Min-max scale ``self.columns`` of a pandas batch into ``self.output_columns``."""

          def column_min_max_scaler(s: pd.Series):
              s_min = self.stats_[f"min({s.name})"]
              s_max = self.stats_[f"max({s.name})"]

              # Without fitted bounds the range cannot be computed; emit NaN
              # (as StandardScaler does for missing stats) instead of failing
              # with a TypeError on ``None - None``.
              if s_min is None or s_max is None:
                  return pd.Series(np.nan, index=s.index, name=s.name)

              diff = s_max - s_min

              # Handle division by zero and near-zero values for numerical stability.
              # If range is very small (constant or near-constant column),
              # treat it as 1 to avoid numerical instability.
              if diff < _EPSILON:
                  diff = 1

              return (s - s_min) / diff

          df[self.output_columns] = df[self.columns].transform(column_min_max_scaler)
          return df

      def _get_serializable_fields(self) -> Dict[str, Any]:
          """Return the constructor configuration and fitted flag for serialization."""
          return {
              "columns": self.columns,
              "output_columns": self.output_columns,
              "_fitted": getattr(self, "_fitted", None),
          }

      def _set_serializable_fields(self, fields: Dict[str, Any], version: int):
          """Restore attributes from a dict produced by ``_get_serializable_fields``."""
          # required fields
          self.columns = fields["columns"]
          self.output_columns = fields["output_columns"]
          # optional fields
          self._fitted = fields.get("_fitted")

      def __repr__(self):
          return f"{self.__class__.__name__}(columns={self.columns!r}, output_columns={self.output_columns!r})"
  @PublicAPI(stability="alpha")
  @SerializablePreprocessor(version=1, identifier="io.ray.preprocessors.max_abs_scaler")
  class MaxAbsScaler(SerializablePreprocessorBase):
      r"""Scale each column by its absolute max value.

      The general formula is given by

      .. math::

          x' = \frac{x}{\max{\vert x \vert}}

      where :math:`x` is the column and :math:`x'` is the transformed column. If
      :math:`\max{\vert x \vert} = 0` (i.e., the column contains all zeros), then the
      column is unmodified.

      If the fitted absolute max of a column is ``None``, the transformed column
      is filled with NaN.

      .. tip::
          This is the recommended way to scale sparse data. If your data isn't sparse,
          you can use :class:`MinMaxScaler` or :class:`StandardScaler` instead.

      Examples:
          >>> import pandas as pd
          >>> import ray
          >>> from ray.data.preprocessors import MaxAbsScaler
          >>>
          >>> df = pd.DataFrame({"X1": [-6, 3], "X2": [2, -4], "X3": [0, 0]})   # noqa: E501
          >>> ds = ray.data.from_pandas(df)  # doctest: +SKIP
          >>> ds.to_pandas()  # doctest: +SKIP
             X1  X2  X3
          0  -6   2   0
          1   3  -4   0

          Columns are scaled separately.

          >>> preprocessor = MaxAbsScaler(columns=["X1", "X2"])
          >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
              X1   X2  X3
          0 -1.0  0.5   0
          1  0.5 -1.0   0

          Zero-valued columns aren't scaled.

          >>> preprocessor = MaxAbsScaler(columns=["X3"])
          >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
             X1  X2   X3
          0  -6   2  0.0
          1   3  -4  0.0

          >>> preprocessor = MaxAbsScaler(columns=["X1", "X2"], output_columns=["X1_scaled", "X2_scaled"])
          >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
             X1  X2  X3  X1_scaled  X2_scaled
          0  -6   2   0       -1.0        0.5
          1   3  -4   0        0.5       -1.0

      Args:
          columns: The columns to separately scale.
          output_columns: The names of the transformed columns. If None, the transformed
              columns will be the same as the input columns. If not None, the length of
              ``output_columns`` must match the length of ``columns``, otherwise an error
              will be raised.
      """

      def __init__(self, columns: List[str], output_columns: Optional[List[str]] = None):
          super().__init__()
          self.columns = columns
          self.output_columns = Preprocessor._derive_and_validate_output_columns(
              columns, output_columns
          )

      def _fit(self, dataset: "Dataset") -> Preprocessor:
          """Compute the absolute max of every configured column into ``self.stats_``."""
          aggregates = [AbsMax(col) for col in self.columns]
          self.stats_ = dataset.aggregate(*aggregates)
          return self

      def _transform_pandas(self, df: pd.DataFrame):
          """Divide ``self.columns`` of a pandas batch by their fitted absolute max."""

          def column_abs_max_scaler(s: pd.Series):
              s_abs_max = self.stats_[f"abs_max({s.name})"]

              # Without a fitted statistic there is nothing to divide by; emit
              # NaN (as StandardScaler does for missing stats) instead of
              # failing with a TypeError on ``s / None``.
              if s_abs_max is None:
                  return pd.Series(np.nan, index=s.index, name=s.name)

              # Handle division by zero.
              # All values are 0.
              if s_abs_max == 0:
                  s_abs_max = 1

              return s / s_abs_max

          df[self.output_columns] = df[self.columns].transform(column_abs_max_scaler)
          return df

      def _get_serializable_fields(self) -> Dict[str, Any]:
          """Return the constructor configuration and fitted flag for serialization."""
          return {
              "columns": self.columns,
              "output_columns": self.output_columns,
              "_fitted": getattr(self, "_fitted", None),
          }

      def _set_serializable_fields(self, fields: Dict[str, Any], version: int):
          """Restore attributes from a dict produced by ``_get_serializable_fields``."""
          # required fields
          self.columns = fields["columns"]
          self.output_columns = fields["output_columns"]
          # optional fields
          self._fitted = fields.get("_fitted")

      def __repr__(self):
          return f"{self.__class__.__name__}(columns={self.columns!r}, output_columns={self.output_columns!r})"
  @PublicAPI(stability="alpha")
  @SerializablePreprocessor(version=1, identifier="io.ray.preprocessors.robust_scaler")
  class RobustScaler(SerializablePreprocessorBase):
      r"""Scale and translate each column using approximate quantiles.

      The general formula is given by

      .. math::

          x' = \frac{x - \mu_{1/2}}{\mu_h - \mu_l}

      where :math:`x` is the column, :math:`x'` is the transformed column,
      :math:`\mu_{1/2}` is the column median. :math:`\mu_{h}` and :math:`\mu_{l}` are the
      high and low quantiles, respectively. By default, :math:`\mu_{h}` is the third
      quartile and :math:`\mu_{l}` is the first quartile. If
      :math:`\mu_h - \mu_l = 0`, the transformed column is filled with zeros.

      Internally, the `ApproximateQuantile` aggregator is used to calculate the
      approximate quantiles.

      .. tip::
          This scaler works well when your data contains many outliers.

      Examples:
          >>> import pandas as pd
          >>> import ray
          >>> from ray.data.preprocessors import RobustScaler
          >>>
          >>> df = pd.DataFrame({
          ...     "X1": [1, 2, 3, 4, 5],
          ...     "X2": [13, 5, 14, 2, 8],
          ...     "X3": [1, 2, 2, 2, 3],
          ... })
          >>> ds = ray.data.from_pandas(df)  # doctest: +SKIP
          >>> ds.to_pandas()  # doctest: +SKIP
             X1  X2  X3
          0   1  13   1
          1   2   5   2
          2   3  14   2
          3   4   2   2
          4   5   8   3

          :class:`RobustScaler` separately scales each column.

          >>> preprocessor = RobustScaler(columns=["X1", "X2"])
          >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
              X1     X2  X3
          0 -1.0  0.625   1
          1 -0.5 -0.375   2
          2  0.0  0.750   2
          3  0.5 -0.750   2
          4  1.0  0.000   3

          >>> preprocessor = RobustScaler(
          ...     columns=["X1", "X2"],
          ...     output_columns=["X1_scaled", "X2_scaled"]
          ... )
          >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
             X1  X2  X3  X1_scaled  X2_scaled
          0   1  13   1       -1.0      0.625
          1   2   5   2       -0.5     -0.375
          2   3  14   2        0.0      0.750
          3   4   2   2        0.5     -0.750
          4   5   8   3        1.0      0.000

      Args:
          columns: The columns to separately scale.
          quantile_range: A tuple that defines the lower and upper quantiles. Values
              must be between 0 and 1. Defaults to the 1st and 3rd quartiles:
              ``(0.25, 0.75)``.
          output_columns: The names of the transformed columns. If None, the transformed
              columns will be the same as the input columns. If not None, the length of
              ``output_columns`` must match the length of ``columns``, otherwise an error
              will be raised.
          quantile_precision: Controls the accuracy and memory footprint of the sketch (K in KLL);
              higher values yield lower error but use more memory. Defaults to 800. See
              https://datasketches.apache.org/docs/KLL/KLLAccuracyAndSize.html
              for details on accuracy and size.
      """

      DEFAULT_QUANTILE_PRECISION = 800

      def __init__(
          self,
          columns: List[str],
          quantile_range: Tuple[float, float] = (0.25, 0.75),
          output_columns: Optional[List[str]] = None,
          quantile_precision: int = DEFAULT_QUANTILE_PRECISION,
      ):
          super().__init__()
          self.columns = columns
          self.quantile_range = quantile_range
          self.quantile_precision = quantile_precision
          self.output_columns = Preprocessor._derive_and_validate_output_columns(
              columns, output_columns
          )

      def _fit(self, dataset: "Dataset") -> Preprocessor:
          """Compute low quantile, median and high quantile for every column."""
          # The median is always requested between the two configured quantiles
          # so the three results can be unpacked positionally below.
          quantiles = [
              self.quantile_range[0],
              0.50,
              self.quantile_range[1],
          ]
          aggregates = [
              ApproximateQuantile(
                  on=col,
                  quantiles=quantiles,
                  quantile_precision=self.quantile_precision,
              )
              for col in self.columns
          ]
          aggregated = dataset.aggregate(*aggregates)

          self.stats_ = {}
          for col in self.columns:
              low_q, med_q, high_q = aggregated[f"approx_quantile({col})"]
              self.stats_[f"low_quantile({col})"] = low_q
              self.stats_[f"median({col})"] = med_q
              self.stats_[f"high_quantile({col})"] = high_q

          return self

      def _transform_pandas(self, df: pd.DataFrame):
          """Center ``self.columns`` on the median and scale by the quantile range."""

          def column_robust_scaler(s: pd.Series):
              s_low_q = self.stats_[f"low_quantile({s.name})"]
              s_median = self.stats_[f"median({s.name})"]
              s_high_q = self.stats_[f"high_quantile({s.name})"]
              diff = s_high_q - s_low_q

              # Handle division by zero.
              # Return all zeros.
              if diff == 0:
                  return np.zeros_like(s)

              return (s - s_median) / diff

          df[self.output_columns] = df[self.columns].transform(column_robust_scaler)
          return df

      def _get_serializable_fields(self) -> Dict[str, Any]:
          """Return the constructor configuration and fitted flag for serialization."""
          return {
              "columns": self.columns,
              "output_columns": self.output_columns,
              "quantile_range": self.quantile_range,
              "quantile_precision": self.quantile_precision,
              "_fitted": getattr(self, "_fitted", None),
          }

      def _set_serializable_fields(self, fields: Dict[str, Any], version: int):
          """Restore attributes from a dict produced by ``_get_serializable_fields``."""
          # required fields
          self.columns = fields["columns"]
          self.output_columns = fields["output_columns"]
          self.quantile_range = fields["quantile_range"]
          self.quantile_precision = fields["quantile_precision"]
          # optional fields
          self._fitted = fields.get("_fitted")

      def __repr__(self):
          # No closing parenthesis after quantile_range: the single ")" at the
          # very end closes the constructor-style representation.
          return (
              f"{self.__class__.__name__}(columns={self.columns!r}, "
              f"quantile_range={self.quantile_range!r}, "
              f"output_columns={self.output_columns!r})"
          )