file_meta_provider.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
import itertools
import logging
import os
import pathlib
import re
from typing import (
    TYPE_CHECKING,
    Callable,
    Iterator,
    List,
    NoReturn,
    Optional,
    Tuple,
    TypeVar,
    Union,
)

import numpy as np

from ray.data._internal.progress.progress_bar import ProgressBar
from ray.data._internal.remote_fn import cached_remote_fn
from ray.data._internal.util import RetryingPyFileSystem
from ray.data.block import BlockMetadata
from ray.data.datasource.partitioning import Partitioning, PathPartitionFilter
from ray.data.datasource.path_util import _has_file_extension
from ray.util.annotations import DeveloperAPI

if TYPE_CHECKING:
    import pyarrow
  26. logger = logging.getLogger(__name__)
  27. @DeveloperAPI
  28. class FileMetadataProvider:
  29. """Abstract callable that provides metadata for the files of a single dataset block.
  30. Current subclasses:
  31. - :class:`BaseFileMetadataProvider`
  32. """
  33. def _get_block_metadata(
  34. self,
  35. paths: List[str],
  36. **kwargs,
  37. ) -> BlockMetadata:
  38. """Resolves and returns block metadata for files in the given paths.
  39. All file paths provided should belong to a single dataset block.
  40. Args:
  41. paths: The file paths for a single dataset block.
  42. **kwargs: Additional kwargs used to determine block metadata.
  43. Returns:
  44. BlockMetadata aggregated across the given paths.
  45. """
  46. raise NotImplementedError
  47. def __call__(
  48. self,
  49. paths: List[str],
  50. **kwargs,
  51. ) -> BlockMetadata:
  52. return self._get_block_metadata(paths, **kwargs)
  53. @DeveloperAPI
  54. class BaseFileMetadataProvider(FileMetadataProvider):
  55. """Abstract callable that provides metadata for
  56. :class:`~ray.data.datasource.file_based_datasource.FileBasedDatasource`
  57. implementations that reuse the base :meth:`~ray.data.Datasource.prepare_read`
  58. method.
  59. Also supports file and file size discovery in input directory paths.
  60. Current subclasses:
  61. - :class:`DefaultFileMetadataProvider`
  62. """
  63. def _get_block_metadata(
  64. self,
  65. paths: List[str],
  66. *,
  67. rows_per_file: Optional[int],
  68. file_sizes: List[Optional[int]],
  69. ) -> BlockMetadata:
  70. """Resolves and returns block metadata for files of a single dataset block.
  71. Args:
  72. paths: The file paths for a single dataset block. These
  73. paths will always be a subset of those previously returned from
  74. :meth:`.expand_paths`.
  75. rows_per_file: The fixed number of rows per input file, or None.
  76. file_sizes: Optional file size per input file previously returned
  77. from :meth:`.expand_paths`, where `file_sizes[i]` holds the size of
  78. the file at `paths[i]`.
  79. Returns:
  80. BlockMetadata aggregated across the given file paths.
  81. """
  82. raise NotImplementedError
  83. def expand_paths(
  84. self,
  85. paths: List[str],
  86. filesystem: Optional["RetryingPyFileSystem"],
  87. partitioning: Optional[Partitioning] = None,
  88. ignore_missing_paths: bool = False,
  89. ) -> Iterator[Tuple[str, int]]:
  90. """Expands all paths into concrete file paths by walking directories.
  91. Also returns a sidecar of file sizes.
  92. The input paths must be normalized for compatibility with the input
  93. filesystem prior to invocation.
  94. Args:
  95. paths: A list of file and/or directory paths compatible with the
  96. given filesystem.
  97. filesystem: The filesystem implementation that should be used for
  98. expanding all paths and reading their files.
  99. ignore_missing_paths: If True, ignores any file paths in ``paths`` that
  100. are not found. Defaults to False.
  101. Returns:
  102. An iterator of `(file_path, file_size)` pairs. None may be returned for the
  103. file size if it is either unknown or will be fetched later by
  104. `_get_block_metadata()`, but the length of
  105. both lists must be equal.
  106. """
  107. raise NotImplementedError
  108. @DeveloperAPI
  109. class DefaultFileMetadataProvider(BaseFileMetadataProvider):
  110. """Default metadata provider for
  111. :class:`~ray.data.datasource.file_based_datasource.FileBasedDatasource`
  112. implementations that reuse the base `prepare_read` method.
  113. Calculates block size in bytes as the sum of its constituent file sizes,
  114. and assumes a fixed number of rows per file.
  115. """
  116. def _get_block_metadata(
  117. self,
  118. paths: List[str],
  119. *,
  120. rows_per_file: Optional[int],
  121. file_sizes: List[Optional[int]],
  122. ) -> BlockMetadata:
  123. if rows_per_file is None:
  124. num_rows = None
  125. else:
  126. num_rows = len(paths) * rows_per_file
  127. return BlockMetadata(
  128. num_rows=num_rows,
  129. size_bytes=None if None in file_sizes else int(sum(file_sizes)),
  130. input_files=paths,
  131. exec_stats=None,
  132. ) # Exec stats filled in later.
  133. def expand_paths(
  134. self,
  135. paths: List[str],
  136. filesystem: "RetryingPyFileSystem",
  137. partitioning: Optional[Partitioning] = None,
  138. ignore_missing_paths: bool = False,
  139. ) -> Iterator[Tuple[str, int]]:
  140. yield from _expand_paths(paths, filesystem, partitioning, ignore_missing_paths)
  141. def _handle_read_os_error(error: OSError, paths: Union[str, List[str]]) -> str:
  142. # NOTE: this is not comprehensive yet, and should be extended as more errors arise.
  143. # NOTE: The latter patterns are raised in Arrow 10+, while the former is raised in
  144. # Arrow < 10.
  145. aws_error_pattern = (
  146. r"^(?:(.*)AWS Error \[code \d+\]: No response body\.(.*))|"
  147. r"(?:(.*)AWS Error UNKNOWN \(HTTP status 400\) during HeadObject operation: "
  148. r"No response body\.(.*))|"
  149. r"(?:(.*)AWS Error ACCESS_DENIED during HeadObject operation: No response "
  150. r"body\.(.*))$"
  151. )
  152. if re.match(aws_error_pattern, str(error)):
  153. # Specially handle AWS error when reading files, to give a clearer error
  154. # message to avoid confusing users. The real issue is most likely that the AWS
  155. # S3 file credentials have not been properly configured yet.
  156. if isinstance(paths, str):
  157. # Quote to highlight single file path in error message for better
  158. # readability. List of file paths will be shown up as ['foo', 'boo'],
  159. # so only quote single file path here.
  160. paths = f'"{paths}"'
  161. raise OSError(
  162. (
  163. f"Failing to read AWS S3 file(s): {paths}. "
  164. "Please check that file exists and has properly configured access. "
  165. "You can also run AWS CLI command to get more detailed error message "
  166. "(e.g., aws s3 ls <file-name>). "
  167. "See https://awscli.amazonaws.com/v2/documentation/api/latest/reference/s3/index.html " # noqa
  168. "and https://docs.ray.io/en/latest/data/creating-datasets.html#reading-from-remote-storage " # noqa
  169. "for more information."
  170. )
  171. )
  172. else:
  173. raise error
  174. def _list_files(
  175. paths: List[str],
  176. filesystem: "RetryingPyFileSystem",
  177. *,
  178. partition_filter: Optional[PathPartitionFilter],
  179. file_extensions: Optional[List[str]],
  180. ) -> List[Tuple[str, int]]:
  181. return list(
  182. _list_files_internal(
  183. paths,
  184. filesystem,
  185. partition_filter=partition_filter,
  186. file_extensions=file_extensions,
  187. )
  188. )
  189. def _list_files_internal(
  190. paths: List[str],
  191. filesystem: "RetryingPyFileSystem",
  192. *,
  193. partition_filter: Optional[PathPartitionFilter],
  194. file_extensions: Optional[List[str]],
  195. ) -> Iterator[Tuple[str, int]]:
  196. default_meta_provider = DefaultFileMetadataProvider()
  197. for path, file_size in default_meta_provider.expand_paths(paths, filesystem):
  198. # HACK: PyArrow's `ParquetDataset` errors if input paths contain non-parquet
  199. # files. To avoid this, we expand the input paths with the default metadata
  200. # provider and then apply the partition filter or file extensions.
  201. if (
  202. partition_filter
  203. and not partition_filter.apply(path)
  204. or not _has_file_extension(path, file_extensions)
  205. ):
  206. continue
  207. yield path, file_size
  208. def _expand_paths(
  209. paths: List[str],
  210. filesystem: "RetryingPyFileSystem",
  211. partitioning: Optional[Partitioning],
  212. ignore_missing_paths: bool = False,
  213. ) -> Iterator[Tuple[str, int]]:
  214. """Get the file sizes for all provided file paths."""
  215. from pyarrow.fs import LocalFileSystem
  216. from ray.data.datasource.file_based_datasource import (
  217. FILE_SIZE_FETCH_PARALLELIZATION_THRESHOLD,
  218. )
  219. from ray.data.datasource.path_util import _is_http_url, _unwrap_protocol
  220. # We break down our processing paths into a few key cases:
  221. # 1. If len(paths) < threshold, fetch the file info for the individual files/paths
  222. # serially.
  223. # 2. If all paths are contained under the same parent directory (or base directory,
  224. # if using partitioning), fetch all file infos at this prefix and filter to the
  225. # provided paths on the client; this should be a single file info request.
  226. # 3. If more than threshold requests required, parallelize them via Ray tasks.
  227. # 1. Small # of paths case.
  228. is_local = isinstance(filesystem, LocalFileSystem)
  229. if isinstance(filesystem, RetryingPyFileSystem):
  230. is_local = isinstance(filesystem.unwrap(), LocalFileSystem)
  231. if (
  232. len(paths) < FILE_SIZE_FETCH_PARALLELIZATION_THRESHOLD
  233. # Local file systems are very fast to hit.
  234. or is_local
  235. ):
  236. yield from _get_file_infos_serial(paths, filesystem, ignore_missing_paths)
  237. else:
  238. # 2. Common path prefix case.
  239. # Get longest common path of all paths.
  240. common_path = os.path.commonpath(paths)
  241. # If parent directory (or base directory, if using partitioning) is common to
  242. # all paths, fetch all file infos at that prefix and filter the response to the
  243. # provided paths.
  244. if not _is_http_url(common_path) and (
  245. (
  246. partitioning is not None
  247. and common_path == _unwrap_protocol(partitioning.base_dir)
  248. )
  249. or all(str(pathlib.Path(path).parent) == common_path for path in paths)
  250. ):
  251. yield from _get_file_infos_common_path_prefix(
  252. paths, common_path, filesystem, ignore_missing_paths
  253. )
  254. # 3. Parallelization case.
  255. else:
  256. # Parallelize requests via Ray tasks.
  257. yield from _get_file_infos_parallel(paths, filesystem, ignore_missing_paths)
  258. def _get_file_infos_serial(
  259. paths: List[str],
  260. filesystem: "RetryingPyFileSystem",
  261. ignore_missing_paths: bool = False,
  262. ) -> Iterator[Tuple[str, int]]:
  263. for path in paths:
  264. yield from _get_file_infos(path, filesystem, ignore_missing_paths)
  265. def _get_file_infos_common_path_prefix(
  266. paths: List[str],
  267. common_path: str,
  268. filesystem: "pyarrow.fs.FileSystem",
  269. ignore_missing_paths: bool = False,
  270. ) -> Iterator[Tuple[str, int]]:
  271. path_to_size = {path: None for path in paths}
  272. for path, file_size in _get_file_infos(
  273. common_path, filesystem, ignore_missing_paths
  274. ):
  275. if path in path_to_size:
  276. path_to_size[path] = file_size
  277. # Check if all `paths` have file size metadata.
  278. # If any of paths has no file size, fall back to get files metadata in parallel.
  279. # This can happen when path is a directory, but not a file.
  280. have_missing_path = False
  281. for path in paths:
  282. if path_to_size[path] is None:
  283. logger.debug(
  284. f"Finding path {path} not have file size metadata. "
  285. "Fall back to get files metadata in parallel for all paths."
  286. )
  287. have_missing_path = True
  288. break
  289. if have_missing_path:
  290. # Parallelize requests via Ray tasks.
  291. yield from _get_file_infos_parallel(paths, filesystem, ignore_missing_paths)
  292. else:
  293. # Iterate over `paths` to yield each path in original order.
  294. # NOTE: do not iterate over `path_to_size` because the dictionary skips
  295. # duplicated path, while `paths` might contain duplicated path if one wants
  296. # to read same file multiple times.
  297. for path in paths:
  298. yield path, path_to_size[path]
  299. def _get_file_infos_parallel(
  300. paths: List[str],
  301. filesystem: "RetryingPyFileSystem",
  302. ignore_missing_paths: bool = False,
  303. ) -> Iterator[Tuple[str, int]]:
  304. from ray.data.datasource.file_based_datasource import (
  305. PATHS_PER_FILE_SIZE_FETCH_TASK,
  306. _unwrap_s3_serialization_workaround,
  307. _wrap_s3_serialization_workaround,
  308. )
  309. logger.warning(
  310. f"Expanding {len(paths)} path(s). This may be a HIGH LATENCY "
  311. f"operation on some cloud storage services. Moving all the "
  312. "paths to a common parent directory will lead to faster "
  313. "metadata fetching."
  314. )
  315. # Capture the filesystem in the fetcher func closure, but wrap it in our
  316. # serialization workaround to make sure that the pickle roundtrip works as expected.
  317. filesystem = _wrap_s3_serialization_workaround(filesystem)
  318. def _file_infos_fetcher(paths: List[str]) -> List[Tuple[str, int]]:
  319. fs = _unwrap_s3_serialization_workaround(filesystem)
  320. return list(
  321. itertools.chain.from_iterable(
  322. _get_file_infos(path, fs, ignore_missing_paths) for path in paths
  323. )
  324. )
  325. yield from _fetch_metadata_parallel(
  326. paths, _file_infos_fetcher, PATHS_PER_FILE_SIZE_FETCH_TASK
  327. )
  328. Uri = TypeVar("Uri")
  329. Meta = TypeVar("Meta")
  330. def _fetch_metadata_parallel(
  331. uris: List[Uri],
  332. fetch_func: Callable[[List[Uri]], List[Meta]],
  333. desired_uris_per_task: int,
  334. **ray_remote_args,
  335. ) -> Iterator[Meta]:
  336. """Fetch file metadata in parallel using Ray tasks."""
  337. remote_fetch_func = cached_remote_fn(fetch_func)
  338. if ray_remote_args:
  339. remote_fetch_func = remote_fetch_func.options(**ray_remote_args)
  340. # Choose a parallelism that results in a # of metadata fetches per task that
  341. # dominates the Ray task overhead while ensuring good parallelism.
  342. # Always launch at least 2 parallel fetch tasks.
  343. parallelism = max(len(uris) // desired_uris_per_task, 2)
  344. metadata_fetch_bar = ProgressBar(
  345. "Metadata Fetch Progress", total=parallelism, unit="task"
  346. )
  347. fetch_tasks = []
  348. for uri_chunk in np.array_split(uris, parallelism):
  349. if len(uri_chunk) == 0:
  350. continue
  351. fetch_tasks.append(remote_fetch_func.remote(uri_chunk))
  352. results = metadata_fetch_bar.fetch_until_complete(fetch_tasks)
  353. yield from itertools.chain.from_iterable(results)
  354. def _get_file_infos(
  355. path: str, filesystem: "RetryingPyFileSystem", ignore_missing_path: bool = False
  356. ) -> List[Tuple[str, int]]:
  357. """Get the file info for all files at or under the provided path."""
  358. from pyarrow.fs import FileType
  359. file_infos = []
  360. try:
  361. file_info = filesystem.get_file_info(path)
  362. except OSError as e:
  363. _handle_read_os_error(e, path)
  364. if file_info.type == FileType.Directory:
  365. for file_path, file_size in _expand_directory(path, filesystem):
  366. file_infos.append((file_path, file_size))
  367. elif file_info.type == FileType.File:
  368. file_infos.append((path, file_info.size))
  369. elif file_info.type == FileType.NotFound and ignore_missing_path:
  370. pass
  371. else:
  372. raise FileNotFoundError(path)
  373. return file_infos
  374. def _expand_directory(
  375. path: str,
  376. filesystem: "RetryingPyFileSystem",
  377. exclude_prefixes: Optional[List[str]] = None,
  378. ignore_missing_path: bool = False,
  379. ) -> List[Tuple[str, int]]:
  380. """
  381. Expand the provided directory path to a list of file paths.
  382. Args:
  383. path: The directory path to expand.
  384. filesystem: The filesystem implementation that should be used for
  385. reading these files.
  386. exclude_prefixes: The file relative path prefixes that should be
  387. excluded from the returned file set. Default excluded prefixes are
  388. "." and "_".
  389. Returns:
  390. An iterator of (file_path, file_size) tuples.
  391. """
  392. if exclude_prefixes is None:
  393. exclude_prefixes = [".", "_"]
  394. from pyarrow.fs import FileSelector
  395. selector = FileSelector(path, recursive=True, allow_not_found=ignore_missing_path)
  396. files = filesystem.get_file_info(selector)
  397. base_path = selector.base_dir
  398. out = []
  399. for file_ in files:
  400. if not file_.is_file:
  401. continue
  402. file_path = file_.path
  403. if not file_path.startswith(base_path):
  404. continue
  405. relative = file_path[len(base_path) :]
  406. if any(relative.startswith(prefix) for prefix in exclude_prefixes):
  407. continue
  408. out.append((file_path, file_.size))
  409. # We sort the paths to guarantee a stable order.
  410. return sorted(out)