parquet.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572
  1. import io
  2. import json
  3. import warnings
  4. import fsspec
  5. from .core import url_to_fs
  6. from .spec import AbstractBufferedFile
  7. from .utils import merge_offset_ranges
# Parquet-Specific Utilities for fsspec
#
# Most of the functions defined in this module are NOT
# intended for public consumption. The only exceptions
# are `open_parquet_file` and `open_parquet_files`, which
# should be used in place of `fs.open()` to open
# parquet-formatted files on remote file systems.
  15. class AlreadyBufferedFile(AbstractBufferedFile):
  16. def _fetch_range(self, start, end):
  17. raise NotImplementedError
  18. def open_parquet_files(
  19. path: list[str],
  20. fs: None | fsspec.AbstractFileSystem = None,
  21. metadata=None,
  22. columns: None | list[str] = None,
  23. row_groups: None | list[int] = None,
  24. storage_options: None | dict = None,
  25. engine: str = "auto",
  26. max_gap: int = 64_000,
  27. max_block: int = 256_000_000,
  28. footer_sample_size: int = 1_000_000,
  29. filters: None | list[list[list[str]]] = None,
  30. **kwargs,
  31. ):
  32. """
  33. Return a file-like object for a single Parquet file.
  34. The specified parquet `engine` will be used to parse the
  35. footer metadata, and determine the required byte ranges
  36. from the file. The target path will then be opened with
  37. the "parts" (`KnownPartsOfAFile`) caching strategy.
  38. Note that this method is intended for usage with remote
  39. file systems, and is unlikely to improve parquet-read
  40. performance on local file systems.
  41. Parameters
  42. ----------
  43. path: str
  44. Target file path.
  45. metadata: Any, optional
  46. Parquet metadata object. Object type must be supported
  47. by the backend parquet engine. For now, only the "fastparquet"
  48. engine supports an explicit `ParquetFile` metadata object.
  49. If a metadata object is supplied, the remote footer metadata
  50. will not need to be transferred into local memory.
  51. fs: AbstractFileSystem, optional
  52. Filesystem object to use for opening the file. If nothing is
  53. specified, an `AbstractFileSystem` object will be inferred.
  54. engine : str, default "auto"
  55. Parquet engine to use for metadata parsing. Allowed options
  56. include "fastparquet", "pyarrow", and "auto". The specified
  57. engine must be installed in the current environment. If
  58. "auto" is specified, and both engines are installed,
  59. "fastparquet" will take precedence over "pyarrow".
  60. columns: list, optional
  61. List of all column names that may be read from the file.
  62. row_groups : list, optional
  63. List of all row-groups that may be read from the file. This
  64. may be a list of row-group indices (integers), or it may be
  65. a list of `RowGroup` metadata objects (if the "fastparquet"
  66. engine is used).
  67. storage_options : dict, optional
  68. Used to generate an `AbstractFileSystem` object if `fs` was
  69. not specified.
  70. max_gap : int, optional
  71. Neighboring byte ranges will only be merged when their
  72. inter-range gap is <= `max_gap`. Default is 64KB.
  73. max_block : int, optional
  74. Neighboring byte ranges will only be merged when the size of
  75. the aggregated range is <= `max_block`. Default is 256MB.
  76. footer_sample_size : int, optional
  77. Number of bytes to read from the end of the path to look
  78. for the footer metadata. If the sampled bytes do not contain
  79. the footer, a second read request will be required, and
  80. performance will suffer. Default is 1MB.
  81. filters : list[list], optional
  82. List of filters to apply to prevent reading row groups, of the
  83. same format as accepted by the loading engines. Ignored if
  84. ``row_groups`` is specified.
  85. **kwargs :
  86. Optional key-word arguments to pass to `fs.open`
  87. """
  88. # Make sure we have an `AbstractFileSystem` object
  89. # to work with
  90. if fs is None:
  91. path0 = path
  92. if isinstance(path, (list, tuple)):
  93. path = path[0]
  94. fs, path = url_to_fs(path, **(storage_options or {}))
  95. else:
  96. path0 = path
  97. # For now, `columns == []` not supported, is the same
  98. # as all columns
  99. if columns is not None and len(columns) == 0:
  100. columns = None
  101. # Set the engine
  102. engine = _set_engine(engine)
  103. if isinstance(path0, (list, tuple)):
  104. paths = path0
  105. elif "*" in path:
  106. paths = fs.glob(path)
  107. elif path0.endswith("/"): # or fs.isdir(path):
  108. paths = [
  109. _
  110. for _ in fs.find(path, withdirs=False, detail=False)
  111. if _.endswith((".parquet", ".parq"))
  112. ]
  113. else:
  114. paths = [path]
  115. data = _get_parquet_byte_ranges(
  116. paths,
  117. fs,
  118. metadata=metadata,
  119. columns=columns,
  120. row_groups=row_groups,
  121. engine=engine,
  122. max_gap=max_gap,
  123. max_block=max_block,
  124. footer_sample_size=footer_sample_size,
  125. filters=filters,
  126. )
  127. # Call self.open with "parts" caching
  128. options = kwargs.pop("cache_options", {}).copy()
  129. return [
  130. AlreadyBufferedFile(
  131. fs=None,
  132. path=fn,
  133. mode="rb",
  134. cache_type="parts",
  135. cache_options={
  136. **options,
  137. "data": ranges,
  138. },
  139. size=max(_[1] for _ in ranges),
  140. **kwargs,
  141. )
  142. for fn, ranges in data.items()
  143. ]
  144. def open_parquet_file(*args, **kwargs):
  145. """Create files tailed to reading specific parts of parquet files
  146. Please see ``open_parquet_files`` for details of the arguments. The
  147. difference is, this function always returns a single ``AlreadyBufferedFile``,
  148. whereas `open_parquet_files`` always returns a list of files, even if
  149. there are one or zero matching parquet files.
  150. """
  151. return open_parquet_files(*args, **kwargs)[0]
  152. def _get_parquet_byte_ranges(
  153. paths,
  154. fs,
  155. metadata=None,
  156. columns=None,
  157. row_groups=None,
  158. max_gap=64_000,
  159. max_block=256_000_000,
  160. footer_sample_size=1_000_000,
  161. engine="auto",
  162. filters=None,
  163. ):
  164. """Get a dictionary of the known byte ranges needed
  165. to read a specific column/row-group selection from a
  166. Parquet dataset. Each value in the output dictionary
  167. is intended for use as the `data` argument for the
  168. `KnownPartsOfAFile` caching strategy of a single path.
  169. """
  170. # Set engine if necessary
  171. if isinstance(engine, str):
  172. engine = _set_engine(engine)
  173. # Pass to a specialized function if metadata is defined
  174. if metadata is not None:
  175. # Use the provided parquet metadata object
  176. # to avoid transferring/parsing footer metadata
  177. return _get_parquet_byte_ranges_from_metadata(
  178. metadata,
  179. fs,
  180. engine,
  181. columns=columns,
  182. row_groups=row_groups,
  183. max_gap=max_gap,
  184. max_block=max_block,
  185. filters=filters,
  186. )
  187. # Populate global paths, starts, & ends
  188. if columns is None and row_groups is None and filters is None:
  189. # We are NOT selecting specific columns or row-groups.
  190. #
  191. # We can avoid sampling the footers, and just transfer
  192. # all file data with cat_ranges
  193. result = {path: {(0, len(data)): data} for path, data in fs.cat(paths).items()}
  194. else:
  195. # We ARE selecting specific columns or row-groups.
  196. #
  197. # Get file sizes asynchronously
  198. file_sizes = fs.sizes(paths)
  199. data_paths = []
  200. data_starts = []
  201. data_ends = []
  202. # Gather file footers.
  203. # We just take the last `footer_sample_size` bytes of each
  204. # file (or the entire file if it is smaller than that)
  205. footer_starts = [
  206. max(0, file_size - footer_sample_size) for file_size in file_sizes
  207. ]
  208. footer_samples = fs.cat_ranges(paths, footer_starts, file_sizes)
  209. # Check our footer samples and re-sample if necessary.
  210. large_footer = []
  211. for i, path in enumerate(paths):
  212. footer_size = int.from_bytes(footer_samples[i][-8:-4], "little")
  213. real_footer_start = file_sizes[i] - (footer_size + 8)
  214. if real_footer_start < footer_starts[i]:
  215. large_footer.append((i, real_footer_start))
  216. if large_footer:
  217. warnings.warn(
  218. f"Not enough data was used to sample the parquet footer. "
  219. f"Try setting footer_sample_size >= {large_footer}."
  220. )
  221. path0 = [paths[i] for i, _ in large_footer]
  222. starts = [_[1] for _ in large_footer]
  223. ends = [file_sizes[i] - footer_sample_size for i, _ in large_footer]
  224. data = fs.cat_ranges(path0, starts, ends)
  225. for i, (path, start, block) in enumerate(zip(path0, starts, data)):
  226. footer_samples[i] = block + footer_samples[i]
  227. footer_starts[i] = start
  228. result = {
  229. path: {(start, size): data}
  230. for path, start, size, data in zip(
  231. paths, footer_starts, file_sizes, footer_samples
  232. )
  233. }
  234. # Calculate required byte ranges for each path
  235. for i, path in enumerate(paths):
  236. # Use "engine" to collect data byte ranges
  237. path_data_starts, path_data_ends = engine._parquet_byte_ranges(
  238. columns,
  239. row_groups=row_groups,
  240. footer=footer_samples[i],
  241. footer_start=footer_starts[i],
  242. filters=filters,
  243. )
  244. data_paths += [path] * len(path_data_starts)
  245. data_starts += path_data_starts
  246. data_ends += path_data_ends
  247. # Merge adjacent offset ranges
  248. data_paths, data_starts, data_ends = merge_offset_ranges(
  249. data_paths,
  250. data_starts,
  251. data_ends,
  252. max_gap=max_gap,
  253. max_block=max_block,
  254. sort=True,
  255. )
  256. # Transfer the data byte-ranges into local memory
  257. _transfer_ranges(fs, result, data_paths, data_starts, data_ends)
  258. # Add b"PAR1" to headers
  259. _add_header_magic(result)
  260. return result
  261. def _get_parquet_byte_ranges_from_metadata(
  262. metadata,
  263. fs,
  264. engine,
  265. columns=None,
  266. row_groups=None,
  267. max_gap=64_000,
  268. max_block=256_000_000,
  269. filters=None,
  270. ):
  271. """Simplified version of `_get_parquet_byte_ranges` for
  272. the case that an engine-specific `metadata` object is
  273. provided, and the remote footer metadata does not need to
  274. be transferred before calculating the required byte ranges.
  275. """
  276. # Use "engine" to collect data byte ranges
  277. data_paths, data_starts, data_ends = engine._parquet_byte_ranges(
  278. columns, row_groups=row_groups, metadata=metadata, filters=filters
  279. )
  280. # Merge adjacent offset ranges
  281. data_paths, data_starts, data_ends = merge_offset_ranges(
  282. data_paths,
  283. data_starts,
  284. data_ends,
  285. max_gap=max_gap,
  286. max_block=max_block,
  287. sort=False, # Should be sorted
  288. )
  289. # Transfer the data byte-ranges into local memory
  290. result = {fn: {} for fn in list(set(data_paths))}
  291. _transfer_ranges(fs, result, data_paths, data_starts, data_ends)
  292. # Add b"PAR1" to header
  293. _add_header_magic(result)
  294. return result
  295. def _transfer_ranges(fs, blocks, paths, starts, ends):
  296. # Use cat_ranges to gather the data byte_ranges
  297. ranges = (paths, starts, ends)
  298. for path, start, stop, data in zip(*ranges, fs.cat_ranges(*ranges)):
  299. blocks[path][(start, stop)] = data
  300. def _add_header_magic(data):
  301. # Add b"PAR1" to file headers
  302. for path in list(data):
  303. add_magic = True
  304. for k in data[path]:
  305. if k[0] == 0 and k[1] >= 4:
  306. add_magic = False
  307. break
  308. if add_magic:
  309. data[path][(0, 4)] = b"PAR1"
  310. def _set_engine(engine_str):
  311. # Define a list of parquet engines to try
  312. if engine_str == "auto":
  313. try_engines = ("fastparquet", "pyarrow")
  314. elif not isinstance(engine_str, str):
  315. raise ValueError(
  316. "Failed to set parquet engine! "
  317. "Please pass 'fastparquet', 'pyarrow', or 'auto'"
  318. )
  319. elif engine_str not in ("fastparquet", "pyarrow"):
  320. raise ValueError(f"{engine_str} engine not supported by `fsspec.parquet`")
  321. else:
  322. try_engines = [engine_str]
  323. # Try importing the engines in `try_engines`,
  324. # and choose the first one that succeeds
  325. for engine in try_engines:
  326. try:
  327. if engine == "fastparquet":
  328. return FastparquetEngine()
  329. elif engine == "pyarrow":
  330. return PyarrowEngine()
  331. except ImportError:
  332. pass
  333. # Raise an error if a supported parquet engine
  334. # was not found
  335. raise ImportError(
  336. f"The following parquet engines are not installed "
  337. f"in your python environment: {try_engines}."
  338. f"Please install 'fastparquert' or 'pyarrow' to "
  339. f"utilize the `fsspec.parquet` module."
  340. )
  341. class FastparquetEngine:
  342. # The purpose of the FastparquetEngine class is
  343. # to check if fastparquet can be imported (on initialization)
  344. # and to define a `_parquet_byte_ranges` method. In the
  345. # future, this class may also be used to define other
  346. # methods/logic that are specific to fastparquet.
  347. def __init__(self):
  348. import fastparquet as fp
  349. self.fp = fp
  350. def _parquet_byte_ranges(
  351. self,
  352. columns,
  353. row_groups=None,
  354. metadata=None,
  355. footer=None,
  356. footer_start=None,
  357. filters=None,
  358. ):
  359. # Initialize offset ranges and define ParqetFile metadata
  360. pf = metadata
  361. data_paths, data_starts, data_ends = [], [], []
  362. if filters and row_groups:
  363. raise ValueError("filters and row_groups cannot be used together")
  364. if pf is None:
  365. pf = self.fp.ParquetFile(io.BytesIO(footer))
  366. # Convert columns to a set and add any index columns
  367. # specified in the pandas metadata (just in case)
  368. column_set = None if columns is None else {c.split(".", 1)[0] for c in columns}
  369. if column_set is not None and hasattr(pf, "pandas_metadata"):
  370. md_index = [
  371. ind
  372. for ind in pf.pandas_metadata.get("index_columns", [])
  373. # Ignore RangeIndex information
  374. if not isinstance(ind, dict)
  375. ]
  376. column_set |= set(md_index)
  377. # Check if row_groups is a list of integers
  378. # or a list of row-group metadata
  379. if filters:
  380. from fastparquet.api import filter_row_groups
  381. row_group_indices = None
  382. row_groups = filter_row_groups(pf, filters)
  383. elif row_groups and not isinstance(row_groups[0], int):
  384. # Input row_groups contains row-group metadata
  385. row_group_indices = None
  386. else:
  387. # Input row_groups contains row-group indices
  388. row_group_indices = row_groups
  389. row_groups = pf.row_groups
  390. if column_set is not None:
  391. column_set = [
  392. _ if isinstance(_, list) else _.split(".") for _ in column_set
  393. ]
  394. # Loop through column chunks to add required byte ranges
  395. for r, row_group in enumerate(row_groups):
  396. # Skip this row-group if we are targeting
  397. # specific row-groups
  398. if row_group_indices is None or r in row_group_indices:
  399. # Find the target parquet-file path for `row_group`
  400. fn = pf.row_group_filename(row_group)
  401. for column in row_group.columns:
  402. name = column.meta_data.path_in_schema
  403. # Skip this column if we are targeting specific columns
  404. if column_set is None or _cmp(name, column_set):
  405. file_offset0 = column.meta_data.dictionary_page_offset
  406. if file_offset0 is None:
  407. file_offset0 = column.meta_data.data_page_offset
  408. num_bytes = column.meta_data.total_compressed_size
  409. if footer_start is None or file_offset0 < footer_start:
  410. data_paths.append(fn)
  411. data_starts.append(file_offset0)
  412. data_ends.append(
  413. min(
  414. file_offset0 + num_bytes,
  415. footer_start or (file_offset0 + num_bytes),
  416. )
  417. )
  418. if metadata:
  419. # The metadata in this call may map to multiple
  420. # file paths. Need to include `data_paths`
  421. return data_paths, data_starts, data_ends
  422. return data_starts, data_ends
  423. class PyarrowEngine:
  424. # The purpose of the PyarrowEngine class is
  425. # to check if pyarrow can be imported (on initialization)
  426. # and to define a `_parquet_byte_ranges` method. In the
  427. # future, this class may also be used to define other
  428. # methods/logic that are specific to pyarrow.
  429. def __init__(self):
  430. import pyarrow.parquet as pq
  431. self.pq = pq
  432. def _parquet_byte_ranges(
  433. self,
  434. columns,
  435. row_groups=None,
  436. metadata=None,
  437. footer=None,
  438. footer_start=None,
  439. filters=None,
  440. ):
  441. if metadata is not None:
  442. raise ValueError("metadata input not supported for PyarrowEngine")
  443. if filters:
  444. # there must be a way!
  445. raise NotImplementedError
  446. data_starts, data_ends = [], []
  447. md = self.pq.ParquetFile(io.BytesIO(footer)).metadata
  448. # Convert columns to a set and add any index columns
  449. # specified in the pandas metadata (just in case)
  450. column_set = None if columns is None else set(columns)
  451. if column_set is not None:
  452. schema = md.schema.to_arrow_schema()
  453. has_pandas_metadata = (
  454. schema.metadata is not None and b"pandas" in schema.metadata
  455. )
  456. if has_pandas_metadata:
  457. md_index = [
  458. ind
  459. for ind in json.loads(
  460. schema.metadata[b"pandas"].decode("utf8")
  461. ).get("index_columns", [])
  462. # Ignore RangeIndex information
  463. if not isinstance(ind, dict)
  464. ]
  465. column_set |= set(md_index)
  466. if column_set is not None:
  467. column_set = [
  468. _[:1] if isinstance(_, list) else _.split(".")[:1] for _ in column_set
  469. ]
  470. # Loop through column chunks to add required byte ranges
  471. for r in range(md.num_row_groups):
  472. # Skip this row-group if we are targeting
  473. # specific row-groups
  474. if row_groups is None or r in row_groups:
  475. row_group = md.row_group(r)
  476. for c in range(row_group.num_columns):
  477. column = row_group.column(c)
  478. name = column.path_in_schema.split(".")
  479. # Skip this column if we are targeting specific columns
  480. if column_set is None or _cmp(name, column_set):
  481. meta = column.to_dict()
  482. # Any offset could be the first one
  483. file_offset0 = min(
  484. _
  485. for _ in [
  486. meta.get("dictionary_page_offset"),
  487. meta.get("data_page_offset"),
  488. meta.get("index_page_offset"),
  489. ]
  490. if _ is not None
  491. )
  492. if file_offset0 < footer_start:
  493. data_starts.append(file_offset0)
  494. data_ends.append(
  495. min(
  496. meta["total_compressed_size"] + file_offset0,
  497. footer_start,
  498. )
  499. )
  500. data_starts.append(footer_start)
  501. data_ends.append(footer_start + len(footer))
  502. return data_starts, data_ends
  503. def _cmp(name, column_set):
  504. return any(all(a == b for a, b in zip(name, _)) for _ in column_set)