partitioning.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539
  1. import logging
  2. import posixpath
  3. import urllib.parse
  4. from dataclasses import dataclass
  5. from enum import Enum
  6. from typing import (
  7. TYPE_CHECKING,
  8. Any,
  9. Callable,
  10. Dict,
  11. List,
  12. Optional,
  13. Type,
  14. Union,
  15. )
  16. import numpy as np
  17. from ray.util.annotations import DeveloperAPI, PublicAPI
  18. if TYPE_CHECKING:
  19. import pyarrow
  20. from ray.data.expressions import Expr
  21. PartitionDataType = Type[Union[int, float, str, bool]]
  22. logger = logging.getLogger(__name__)
  @DeveloperAPI
  class PartitionStyle(str, Enum):
      """Enumerates the path-based partition layouts that can be parsed.

      Every member is also a `str`, which keeps plain-text serialization and
      deserialization trivial.

      Examples:
          >>> # Dump a member as JSON text.
          >>> json.dumps(PartitionStyle.HIVE)  # doctest: +SKIP
          '"hive"'
          >>> # Rebuild the member from JSON text.
          >>> PartitionStyle(json.loads('"hive"'))  # doctest: +SKIP
          <PartitionStyle.HIVE: 'hive'>
      """

      # Partition directories named "{key}={value}".
      HIVE = "hive"
      # Partition directories named "{value}"; keys come from a separate
      # ordered list of field names.
      DIRECTORY = "dir"
  @DeveloperAPI
  @dataclass
  class Partitioning:
      """Describes how partition keys and values are encoded in file paths.

      With a path-based partition format, every partition key and value is
      embedded directly in the dataset file paths.

      For example, to read a dataset with
      `Hive-style partitions <https://athena.guide/articles/hive-style-partitioning>`_:

      >>> import ray
      >>> from ray.data.datasource.partitioning import Partitioning
      >>> ds = ray.data.read_csv(
      ...     "s3://anonymous@ray-example-data/iris.csv",
      ...     partitioning=Partitioning("hive"),
      ... )

      Instead, if your files are arranged in a directory structure such as:

      .. code::

          root/dog/dog_0.jpeg
          root/dog/dog_1.jpeg
          ...

          root/cat/cat_0.jpeg
          root/cat/cat_1.jpeg
          ...

      Then you can use directory-based partitioning:

      >>> import ray
      >>> from ray.data.datasource.partitioning import Partitioning
      >>> root = "s3://anonymous@air-example-data/cifar-10/images"
      >>> partitioning = Partitioning("dir", field_names=["class"], base_dir=root)
      >>> ds = ray.data.read_images(root, partitioning=partitioning)
      """

      #: Layout of the partitioned paths - either HIVE or DIRECTORY.
      style: PartitionStyle
      #: "/"-delimited directory that every partitioned path must live under
      #: (exclusive). A file path outside of this directory, or at its first
      #: level, is considered unpartitioned. Specify `None` or an empty string
      #: to search for partitions in all file path directories.
      base_dir: Optional[str] = None
      #: Partition key field names (i.e. column names for tabular datasets).
      #: When non-empty, their order and length must match the order and
      #: length of partition values. Required when parsing DIRECTORY
      #: partitioned paths or generating HIVE partitioned paths.
      field_names: Optional[List[str]] = None
      #: Maps partition key names to their desired data type. If not provided,
      #: the data type defaults to string.
      field_types: Optional[Dict[str, PartitionDataType]] = None
      #: Filesystem that will be used for partition path file I/O.
      filesystem: Optional["pyarrow.fs.FileSystem"] = None

      def __post_init__(self):
          # Replace the `None` defaults with empty values so later code can use
          # them without a None check.
          self.base_dir = "" if self.base_dir is None else self.base_dir
          self.field_types = {} if self.field_types is None else self.field_types
          # Both caches are filled together by `_normalize_base_dir` on first use.
          self._normalized_base_dir = self._resolved_filesystem = None

      @property
      def normalized_base_dir(self) -> str:
          """Returns the base directory normalized for compatibility with a filesystem."""
          cached_dir = self._normalized_base_dir
          if cached_dir is not None:
              return cached_dir
          self._normalize_base_dir()
          return self._normalized_base_dir

      @property
      def resolved_filesystem(self) -> "pyarrow.fs.FileSystem":
          """Returns the filesystem resolved for compatibility with a base directory."""
          cached_fs = self._resolved_filesystem
          if cached_fs is not None:
              return cached_fs
          self._normalize_base_dir()
          return self._resolved_filesystem

      def _normalize_base_dir(self):
          """Resolves the base directory and filesystem and caches both.

          The base directory is resolved together with the configured filesystem,
          and a non-empty result is stored with a trailing "/" so that it can be
          matched as a prefix of partitioned file paths.
          """
          from ray.data.datasource.path_util import _resolve_paths_and_filesystem

          resolved_paths, resolved_fs = _resolve_paths_and_filesystem(
              self.base_dir,
              self.filesystem,
          )
          self._resolved_filesystem = resolved_fs
          assert (
              len(resolved_paths) == 1
          ), f"Expected 1 normalized base directory, but found {len(resolved_paths)}"
          base = resolved_paths[0]
          # An empty base directory stays empty so that it matches every path.
          if base and not base.endswith("/"):
              base = base + "/"
          self._normalized_base_dir = base
  @DeveloperAPI
  class PathPartitionParser:
      """Partition parser for path-based partition formats.

      Path-based partition formats embed all partition keys and values directly in
      their dataset file paths.

      Two path partition formats are currently supported - `HIVE` and `DIRECTORY`.

      For `HIVE` Partitioning, all partition directories under the base directory
      will be discovered based on `{key1}={value1}/{key2}={value2}` naming
      conventions. Key/value pairs do not need to be presented in the same
      order across all paths. Directory names nested under the base directory that
      don't follow this naming condition will be considered unpartitioned. If a
      partition filter is defined, then it will be called with an empty input
      dictionary for each unpartitioned file.

      For `DIRECTORY` Partitioning, all directories under the base directory will
      be interpreted as partition values of the form `{value1}/{value2}`. An
      accompanying ordered list of partition field names must also be provided,
      where the order and length of all partition values must match the order and
      length of field names. Files stored directly in the base directory will
      be considered unpartitioned. If a partition filter is defined, then it will
      be called with an empty input dictionary for each unpartitioned file. For
      example, if the base directory is `"foo"`, then `"foo.csv"` and `"foo/bar.csv"`
      would be considered unpartitioned files but `"foo/bar/baz.csv"` would be associated
      with partition `"bar"`. If the base directory is undefined, then `"foo.csv"` would
      be unpartitioned, `"foo/bar.csv"` would be associated with partition `"foo"`, and
      `"foo/bar/baz.csv"` would be associated with partition `("foo", "bar")`.
      """

      @staticmethod
      def of(
          style: PartitionStyle = PartitionStyle.HIVE,
          base_dir: Optional[str] = None,
          field_names: Optional[List[str]] = None,
          field_types: Optional[Dict[str, PartitionDataType]] = None,
          filesystem: Optional["pyarrow.fs.FileSystem"] = None,
      ) -> "PathPartitionParser":
          """Creates a path-based partition parser using a flattened argument list.

          Args:
              style: The partition style - may be either HIVE or DIRECTORY.
              base_dir: "/"-delimited base directory to start searching for partitions
                  (exclusive). File paths outside of this directory will be considered
                  unpartitioned. Specify `None` or an empty string to search for
                  partitions in all file path directories.
              field_names: The partition key names. Required for DIRECTORY partitioning.
                  Optional for HIVE partitioning. When non-empty, the order and length of
                  partition key field names must match the order and length of partition
                  directories discovered. Partition key field names are not required to
                  exist in the dataset schema.
              field_types: A dictionary that maps partition key names to their desired
                  data type. If not provided, the data type defaults to string.
              filesystem: Filesystem that will be used for partition path file I/O.

          Returns:
              The new path-based partition parser.
          """
          scheme = Partitioning(style, base_dir, field_names, field_types, filesystem)
          return PathPartitionParser(scheme)

      def __init__(self, partitioning: Partitioning):
          """Creates a path-based partition parser.

          Args:
              partitioning: The path-based partition scheme. The parser starts
                  searching for partitions from this scheme's base directory. File paths
                  outside the base directory will be considered unpartitioned. If the
                  base directory is `None` or an empty string then this will search for
                  partitions in all file path directories. Field names are required for
                  DIRECTORY partitioning, and optional for HIVE partitioning. When
                  non-empty, the order and length of partition key field names must match
                  the order and length of partition directories discovered.

          Raises:
              ValueError: If DIRECTORY partitioning is requested without field names,
                  or if the partition style is not supported.
          """
          style = partitioning.style
          field_names = partitioning.field_names
          if style == PartitionStyle.DIRECTORY and not field_names:
              raise ValueError(
                  "Directory partitioning requires a corresponding list of "
                  "partition key field names. Please retry your request with one "
                  "or more field names specified."
              )
          # Dispatch table from partition style to the matching path parser.
          parsers = {
              PartitionStyle.HIVE: self._parse_hive_path,
              PartitionStyle.DIRECTORY: self._parse_dir_path,
          }
          self._parser_fn: Callable[[str], Dict[str, str]] = parsers.get(style)
          if self._parser_fn is None:
              raise ValueError(
                  f"Unsupported partition style: {style}. "
                  f"Supported styles: {parsers.keys()}"
              )
          self._scheme = partitioning

      def __call__(self, path: str) -> Dict[str, str]:
          """Parses partition keys and values from a single file path.

          Values of keys listed in the scheme's `field_types` are cast to the
          requested type. A key listed there but not found in the path is
          skipped, so unpartitioned files still produce an empty dictionary.

          Args:
              path: Input file path to parse.

          Returns:
              Dictionary mapping directory partition keys to values from the input file
              path. Returns an empty dictionary for unpartitioned files.
          """
          dir_path = self._dir_path_trim_base(path)
          if dir_path is None:
              return {}
          partitions: Dict[str, str] = self._parser_fn(dir_path)

          for field, data_type in self._scheme.field_types.items():
              # Cast only the keys that were parsed. Indexing unconditionally
              # raised KeyError for unpartitioned files under the base
              # directory and for Hive paths missing a typed key.
              if field in partitions:
                  partitions[field] = _cast_value(partitions[field], data_type)

          return partitions

      def evaluate_predicate_on_partition(self, path: str, predicate: "Expr") -> bool:
          """Evaluate a predicate expression against partition values from a path.

          This method enables partition pruning by evaluating predicates that reference
          partition columns against the partition values parsed from file paths.

          Args:
              path: File path to parse partition values from.
              predicate: Expression that references partition columns.

          Returns:
              True if the partition satisfies the predicate (should read the file),
              False if it doesn't (can skip the file for partition pruning). Files
              without partition values return False; if evaluation raises, the file
              is conservatively included (True).
          """
          import pyarrow as pa

          from ray.data._internal.planner.plan_expression.expression_evaluator import (
              NativeExpressionEvaluator,
          )

          # Parse partition values from the file path
          partition_values = self(path)

          if not partition_values:
              # Unpartitioned file - exclude it when filtering on partition columns
              # If the predicate references partition columns and the file doesn't have
              # partition values in its path, we can't determine if it matches
              return False

          try:
              # Create a single-row table with partition values
              partition_table = pa.table(
                  {col: [val] for col, val in partition_values.items()}
              )

              # Evaluate using Ray Data's native evaluator
              evaluator = NativeExpressionEvaluator(partition_table)
              result = evaluator.visit(predicate)

              # Extract boolean result from array-like types
              # Check for specific array types to avoid issues with strings (which are iterable)
              if isinstance(result, (pa.Array, pa.ChunkedArray, np.ndarray)):
                  return bool(result[0])

              # Import pandas here to avoid circular dependencies
              import pandas as pd

              if isinstance(result, pd.Series):
                  return bool(result.iloc[0])

              # Scalar result (shouldn't happen with table evaluation, but handle conservatively)
              return bool(result)
          except Exception:
              # Deliberately best-effort: a predicate that cannot be evaluated
              # must not cause a file to be pruned.
              logger.debug(
                  "Failed to evaluate predicate on partition for path %s, "
                  "conservatively including file.",
                  path,
                  exc_info=True,
              )
              return True

      @property
      def scheme(self) -> Partitioning:
          """Returns the partitioning for this parser."""
          return self._scheme

      def _dir_path_trim_base(self, path: str) -> Optional[str]:
          """Trims the normalized base directory and returns the directory path.

          Returns None if the path does not start with the normalized base directory.
          Simply returns the directory path if the base directory is undefined.
          """
          base_dir = self._scheme.normalized_base_dir
          if not path.startswith(base_dir):
              return None
          # Drop the base directory prefix, then the trailing file name.
          return posixpath.dirname(path[len(base_dir) :])

      def _parse_hive_path(self, dir_path: str) -> Dict[str, str]:
          """Hive partition path parser.

          Returns a dictionary mapping partition keys to values given a hive-style
          partition path of the form "{key1}={value1}/{key2}={value2}/..." or an empty
          dictionary for unpartitioned files.

          Raises:
              ValueError: If field names are configured and the discovered keys do
                  not match them in count or order.
          """
          # Only directory names containing exactly one "=" count as partitions.
          dirs = [d for d in dir_path.split("/") if d.count("=") == 1]
          # NOTE: PyArrow URL-encodes partition values when writing to cloud storage. To
          # ensure the values are consistent when you read them back, we need to
          # URL-decode them. See https://github.com/apache/arrow/issues/34905.
          kv_pairs = []
          for d in dirs:
              key, value = d.split("=")
              kv_pairs.append([key, urllib.parse.unquote(value)])

          field_names = self._scheme.field_names
          if field_names and kv_pairs:
              if len(kv_pairs) != len(field_names):
                  raise ValueError(
                      f"Expected {len(field_names)} partition value(s) but found "
                      f"{len(kv_pairs)}: {kv_pairs}."
                  )
              for i, field_name in enumerate(field_names):
                  if kv_pairs[i][0] != field_name:
                      raise ValueError(
                          f"Expected partition key {field_name} but found "
                          f"{kv_pairs[i][0]}"
                      )
          return dict(kv_pairs)

      def _parse_dir_path(self, dir_path: str) -> Dict[str, str]:
          """Directory partition path parser.

          Returns a dictionary mapping directory partition keys to values from a
          partition path of the form "{value1}/{value2}/..." or an empty dictionary for
          unpartitioned files.

          Requires a corresponding ordered list of partition key field names to map the
          correct key to each value.

          Raises:
              ValueError: If the number of directories differs from the number of
                  configured field names.
          """
          dirs = [d for d in dir_path.split("/") if d]
          if not dirs:
              return {}

          field_names = self._scheme.field_names
          if len(dirs) != len(field_names):
              raise ValueError(
                  f"Expected {len(field_names)} partition value(s) but found "
                  f"{len(dirs)}: {dirs}."
              )
          # A `None` field name drops that directory level from the result.
          return {
              field: directory
              for field, directory in zip(field_names, dirs)
              if field is not None
          }
  @PublicAPI(stability="beta")
  class PathPartitionFilter:
      """Partition filter for path-based partition formats.

      Used to explicitly keep or reject files based on a custom filter function that
      takes partition keys and values parsed from the file's path as input.
      """

      @staticmethod
      def of(
          filter_fn: Callable[[Dict[str, str]], bool],
          style: PartitionStyle = PartitionStyle.HIVE,
          base_dir: Optional[str] = None,
          field_names: Optional[List[str]] = None,
          field_types: Optional[Dict[str, PartitionDataType]] = None,
          filesystem: Optional["pyarrow.fs.FileSystem"] = None,
      ) -> "PathPartitionFilter":
          """Creates a path-based partition filter using a flattened argument list.

          Args:
              filter_fn: Callback used to filter partitions. Takes a dictionary mapping
                  partition keys to values as input. Unpartitioned files are denoted with
                  an empty input dictionary. Returns `True` to read a file for that
                  partition or `False` to skip it. Partition keys and values are
                  strings read from the filesystem path, except for values of keys
                  listed in `field_types`, which the parser casts to the requested
                  type. For example, this removes all unpartitioned files:

                  .. code:: python

                      lambda d: True if d else False

                  This raises an assertion error for any unpartitioned file found:

                  .. code:: python

                      def do_assert(val, msg):
                          assert val, msg

                      lambda d: do_assert(d, "Expected all files to be partitioned!")

                  And this only reads files from January, 2022 partitions:

                  .. code:: python

                      lambda d: d["month"] == "January" and d["year"] == "2022"

              style: The partition style - may be either HIVE or DIRECTORY.
              base_dir: "/"-delimited base directory to start searching for partitions
                  (exclusive). File paths outside of this directory will be considered
                  unpartitioned. Specify `None` or an empty string to search for
                  partitions in all file path directories.
              field_names: The partition key names. Required for DIRECTORY partitioning.
                  Optional for HIVE partitioning. When non-empty, the order and length of
                  partition key field names must match the order and length of partition
                  directories discovered. Partition key field names are not required to
                  exist in the dataset schema.
              field_types: A dictionary that maps partition key names to their desired
                  data type. If not provided, the data type defaults to string.
              filesystem: Filesystem that will be used for partition path file I/O.

          Returns:
              The new path-based partition filter.
          """
          scheme = Partitioning(style, base_dir, field_names, field_types, filesystem)
          path_partition_parser = PathPartitionParser(scheme)
          return PathPartitionFilter(path_partition_parser, filter_fn)

      def __init__(
          self,
          path_partition_parser: PathPartitionParser,
          filter_fn: Callable[[Dict[str, str]], bool],
      ):
          """Creates a new path-based partition filter based on a parser.

          Args:
              path_partition_parser: The path-based partition parser.
              filter_fn: Callback used to filter partitions. Takes a dictionary mapping
                  partition keys to values as input. Unpartitioned files are denoted with
                  an empty input dictionary. Returns `True` to read a file for that
                  partition or `False` to skip it. Partition keys and values are
                  strings read from the filesystem path, except for values the
                  parser casts according to its scheme's `field_types`. For example,
                  this removes all unpartitioned files:
                  ``lambda d: True if d else False``
                  This raises an assertion error for any unpartitioned file found
                  (``assert`` is a statement and cannot appear inside a lambda, so
                  it is wrapped in a helper ``do_assert(val, msg)``):
                  ``lambda d: do_assert(d, "Expected all files to be partitioned!")``
                  And this only reads files from January, 2022 partitions:
                  ``lambda d: d["month"] == "January" and d["year"] == "2022"``
          """
          self._parser = path_partition_parser
          self._filter_fn = filter_fn

      def __call__(self, paths: List[str]) -> List[str]:
          """Returns all paths that pass this partition scheme's partition filter.

          If no partition filter is set, then returns all input paths. If a base
          directory is set, then only paths under this base directory will be parsed
          for partitions. All paths outside of this base directory will automatically
          be considered unpartitioned, and passed into the filter function as empty
          dictionaries.

          Also normalizes the partition base directory for compatibility with the
          given filesystem before applying the filter.

          Args:
              paths: Paths to pass through the partition filter function. All
                  paths should be normalized for compatibility with the given
                  filesystem.

          Returns:
              List of paths that pass the partition filter, or all paths if no
              partition filter is defined.
          """
          if self._filter_fn is None:
              # No filter configured: return the input list unchanged.
              return paths
          return [path for path in paths if self.apply(path)]

      def apply(self, path: str) -> bool:
          """Returns whether a single path passes the partition filter.

          Args:
              path: File path whose partition keys and values are parsed and
                  handed to the filter function.

          Returns:
              The filter function's verdict for the path, or `True` when no
              filter function is set (consistent with `__call__`, which keeps
              every path in that case).
          """
          if self._filter_fn is None:
              # Without this guard the call below raised TypeError, although
              # `__call__` treats a missing filter as "keep everything".
              return True
          return self._filter_fn(self._parser(path))

      @property
      def parser(self) -> PathPartitionParser:
          """Returns the path partition parser for this filter."""
          return self._parser
  433. def _cast_value(value: str, data_type: PartitionDataType) -> Any:
  434. if data_type is int:
  435. return int(value)
  436. elif data_type is float:
  437. return float(value)
  438. elif data_type is bool:
  439. return value.lower() == "true"
  440. else:
  441. return value