path_util.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. import logging
  2. import pathlib
  3. import sys
  4. from typing import TYPE_CHECKING, List, Optional, Tuple, Union
  5. from urllib.parse import quote, unquote, urlparse
  6. from ray.data._internal.util import RetryingPyFileSystem, _resolve_custom_scheme
  7. logger = logging.getLogger(__name__)
  8. if TYPE_CHECKING:
  9. import fsspec.spec
  10. import pyarrow
  11. def _get_fsspec_http_filesystem() -> "pyarrow.fs.PyFileSystem":
  12. """Get fsspec HTTPFileSystem wrapped in PyArrow PyFileSystem.
  13. Returns:
  14. PyFileSystem wrapping fsspec HTTPFileSystem.
  15. Raises:
  16. ImportError: If fsspec is not installed.
  17. """
  18. try:
  19. import fsspec # noqa: F401
  20. from fsspec.implementations.http import HTTPFileSystem
  21. except ModuleNotFoundError:
  22. raise ImportError("Please install fsspec to read files from HTTP.") from None
  23. from pyarrow.fs import FSSpecHandler, PyFileSystem
  24. return PyFileSystem(FSSpecHandler(HTTPFileSystem()))
  25. def _validate_and_wrap_filesystem(
  26. filesystem: Optional[
  27. Union["pyarrow.fs.FileSystem", "fsspec.spec.AbstractFileSystem"]
  28. ],
  29. ) -> Optional["pyarrow.fs.FileSystem"]:
  30. """Validate filesystem and wrap fsspec filesystems in PyArrow.
  31. Args:
  32. filesystem: Filesystem to validate and potentially wrap. Can be None,
  33. a pyarrow.fs.FileSystem, or an fsspec.spec.AbstractFileSystem.
  34. Returns:
  35. None if filesystem is None, otherwise a pyarrow.fs.FileSystem
  36. (either the original if already PyArrow, or wrapped if fsspec).
  37. Raises:
  38. TypeError: If filesystem is not None and not a valid pyarrow or fsspec filesystem.
  39. """
  40. if filesystem is None:
  41. return None
  42. from pyarrow.fs import FileSystem
  43. if isinstance(filesystem, FileSystem):
  44. return filesystem
  45. try:
  46. import fsspec # noqa: F401
  47. except ModuleNotFoundError:
  48. raise TypeError("fsspec is not installed") from None
  49. if not isinstance(filesystem, fsspec.spec.AbstractFileSystem):
  50. raise TypeError(
  51. f"Filesystem must conform to pyarrow.fs.FileSystem or "
  52. f"fsspec.spec.AbstractFileSystem, got: {type(filesystem).__name__}"
  53. )
  54. from pyarrow.fs import FSSpecHandler, PyFileSystem
  55. return PyFileSystem(FSSpecHandler(filesystem))
  56. def _try_resolve_with_encoding(
  57. path: str,
  58. filesystem: Optional["pyarrow.fs.FileSystem"],
  59. ) -> Tuple["pyarrow.fs.FileSystem", str]:
  60. """Try resolving a path with URL encoding for special characters.
  61. This handles paths with special characters like ';', '?', '#' that
  62. may cause URI parsing errors.
  63. Args:
  64. path: The path to resolve.
  65. filesystem: Optional filesystem to validate against.
  66. Returns:
  67. Tuple of (resolved_filesystem, resolved_path).
  68. """
  69. from pyarrow.fs import _resolve_filesystem_and_path
  70. encoded_path = quote(path, safe="/:", errors="ignore")
  71. resolved_filesystem, resolved_path = _resolve_filesystem_and_path(
  72. encoded_path, filesystem
  73. )
  74. return resolved_filesystem, unquote(resolved_path, errors="ignore")
  75. def _has_file_extension(path: str, extensions: Optional[List[str]]) -> bool:
  76. """Check if a path has a file extension in the provided list.
  77. Examples:
  78. >>> _has_file_extension("foo.csv", ["csv"])
  79. True
  80. >>> _has_file_extension("foo.CSV", ["csv"])
  81. True
  82. >>> _has_file_extension("foo.CSV", [".csv"])
  83. True
  84. >>> _has_file_extension("foo.csv", ["json", "jsonl"])
  85. False
  86. >>> _has_file_extension("foo.csv", None)
  87. True
  88. Args:
  89. path: The path to check.
  90. extensions: A list of extensions to check against. If `None`, any extension is
  91. considered valid.
  92. """
  93. assert extensions is None or isinstance(extensions, list), type(extensions)
  94. if extensions is None:
  95. return True
  96. # If the user-specified extensions don't contain a leading dot, we add it here
  97. extensions = [
  98. f".{ext.lower()}" if not ext.startswith(".") else ext.lower()
  99. for ext in extensions
  100. ]
  101. return any(path.lower().endswith(ext) for ext in extensions)
  102. # Mapping from URI schemes to compatible filesystem type_name values.
  103. # Used to validate that a cached filesystem is compatible with a given URI scheme
  104. # before attempting to use it, avoiding silent failures from PyArrow when the
  105. # wrong filesystem type is passed to _resolve_filesystem_and_path.
  106. _SCHEME_TO_FS_TYPE_NAMES = {
  107. "": ("local",), # No scheme = local filesystem
  108. "file": ("local",), # file:// = local filesystem
  109. "s3": ("s3",), # s3:// = S3 filesystem
  110. "s3a": ("s3",), # s3a:// = S3 filesystem (Hadoop compat)
  111. "gs": ("gcs",), # gs:// = GCS filesystem
  112. "gcs": ("gcs",), # gcs:// = GCS filesystem
  113. "hdfs": ("hdfs",), # hdfs:// = Hadoop filesystem
  114. "viewfs": ("hdfs",), # viewfs:// = Hadoop filesystem
  115. "abfs": ("abfs",), # abfs:// = Azure Blob FileSystem
  116. "abfss": ("abfs",), # abfss:// = Azure Blob FileSystem (TLS)
  117. "http": ("py",), # http:// = fsspec HTTP (wrapped in PyFileSystem)
  118. "https": ("py",), # https:// = fsspec HTTP (wrapped in PyFileSystem)
  119. }
  120. def _is_filesystem_compatible_with_scheme(
  121. filesystem: "pyarrow.fs.FileSystem",
  122. scheme: str,
  123. ) -> bool:
  124. """Check if a filesystem is compatible with a URI scheme.
  125. Uses PyArrow's `type_name` property for reliable filesystem type detection.
  126. This prevents silently using the wrong filesystem for a URI, which can result
  127. in malformed paths or incorrect behavior.
  128. Args:
  129. filesystem: The PyArrow filesystem to check.
  130. scheme: The URI scheme (e.g., 's3', 'gs', 'http', 'file', '').
  131. Returns:
  132. True if the filesystem can handle the scheme, False otherwise.
  133. """
  134. # Get expected type names for this scheme
  135. expected_types = _SCHEME_TO_FS_TYPE_NAMES.get(scheme.lower())
  136. if expected_types is None:
  137. # Unknown scheme (e.g., abfs://, az://, custom protocols) - trust user's filesystem
  138. # This preserves backward compatibility for custom filesystems
  139. return True
  140. # Get the actual filesystem type
  141. fs_type = filesystem.type_name
  142. # For PyFileSystem (fsspec wrappers), also check if it's HTTP
  143. if fs_type == "py" and scheme in ("http", "https"):
  144. return _is_http_filesystem(filesystem)
  145. return fs_type in expected_types
  146. def _resolve_single_path_with_fallback(
  147. path: str,
  148. filesystem: Optional["pyarrow.fs.FileSystem"] = None,
  149. ) -> Tuple["pyarrow.fs.FileSystem", str]:
  150. """Resolve a single path with filesystem, with fallback to re-resolution on error.
  151. This is a helper for lazy filesystem resolution. If a filesystem is provided,
  152. it first validates that the filesystem type is compatible with the URI scheme,
  153. then attempts to resolve the path. If the filesystem is incompatible or
  154. resolution fails, it re-resolves without the cached filesystem.
  155. Args:
  156. path: A single file/directory path.
  157. filesystem: Optional cached filesystem from previous resolution.
  158. Returns:
  159. Tuple of (resolved_filesystem, resolved_path).
  160. Raises:
  161. ValueError: If path resolution fails.
  162. ImportError: If required dependencies are missing.
  163. """
  164. import pyarrow as pa
  165. from pyarrow.fs import _resolve_filesystem_and_path
  166. path = _resolve_custom_scheme(path)
  167. # Validate/wrap filesystem if needed
  168. try:
  169. filesystem = _validate_and_wrap_filesystem(filesystem)
  170. except TypeError as e:
  171. raise ValueError(f"Invalid filesystem provided: {e}") from e
  172. # Parse scheme to validate filesystem compatibility
  173. parsed = urlparse(path, allow_fragments=False)
  174. scheme = parsed.scheme.lower() if parsed.scheme else ""
  175. # Check HTTP scheme FIRST - PyArrow doesn't support HTTP/HTTPS natively
  176. if scheme in ("http", "https"):
  177. # If we have a compatible cached HTTP filesystem, use it
  178. if filesystem is not None and _is_filesystem_compatible_with_scheme(
  179. filesystem, scheme
  180. ):
  181. return filesystem, path
  182. # Otherwise create a new HTTP filesystem
  183. try:
  184. resolved_filesystem = _get_fsspec_http_filesystem()
  185. resolved_path = path
  186. return resolved_filesystem, resolved_path
  187. except ImportError as import_error:
  188. raise ImportError(
  189. f"Cannot resolve HTTP path '{path}': {import_error}"
  190. ) from import_error
  191. # Try with provided filesystem only if scheme is compatible (fast path for cached FS)
  192. if filesystem is not None and _is_filesystem_compatible_with_scheme(
  193. filesystem, scheme
  194. ):
  195. try:
  196. _, resolved_path = _resolve_filesystem_and_path(path, filesystem)
  197. # Return the wrapped filesystem we passed in.
  198. return filesystem, resolved_path
  199. except Exception:
  200. # Fall through to full resolution without cached filesystem
  201. pass
  202. # Full resolution without cached filesystem
  203. try:
  204. resolved_filesystem, resolved_path = _resolve_filesystem_and_path(path, None)
  205. except (pa.lib.ArrowInvalid, ValueError) as original_error:
  206. # Try URL encoding for paths with special characters that may cause parsing issues
  207. try:
  208. resolved_filesystem, resolved_path = _try_resolve_with_encoding(path, None)
  209. except (pa.lib.ArrowInvalid, ValueError, TypeError) as encoding_error:
  210. # If encoding doesn't help, raise with both errors for full context
  211. raise ValueError(
  212. f"Failed to resolve path '{path}'. Initial error: {original_error}. "
  213. f"URL encoding fallback also failed: {encoding_error}"
  214. ) from original_error
  215. except TypeError as e:
  216. raise ValueError(f"The path: '{path}' has an invalid type {e}") from e
  217. return resolved_filesystem, resolved_path
  218. def _resolve_paths_and_filesystem(
  219. paths: Union[str, List[str]],
  220. filesystem: Optional["pyarrow.fs.FileSystem"] = None,
  221. ) -> Tuple[List[str], "pyarrow.fs.FileSystem"]:
  222. """
  223. Resolves and normalizes all provided paths, infers a filesystem from the
  224. paths and assumes that all paths use the same filesystem.
  225. Args:
  226. paths: A single file/directory path or a list of file/directory paths.
  227. A list of paths can contain both files and directories.
  228. filesystem: The filesystem implementation that should be used for
  229. reading these files. If None, a filesystem will be inferred. If not
  230. None, the provided filesystem will still be validated against all
  231. filesystems inferred from the provided paths to ensure
  232. compatibility.
  233. """
  234. if isinstance(paths, str):
  235. paths = [paths]
  236. if isinstance(paths, pathlib.Path):
  237. paths = [str(paths)]
  238. elif not isinstance(paths, list) or any(not isinstance(p, str) for p in paths):
  239. raise ValueError(
  240. "Expected `paths` to be a `str`, `pathlib.Path`, or `list[str]`, but got "
  241. f"`{paths}`"
  242. )
  243. elif len(paths) == 0:
  244. raise ValueError("Must provide at least one path.")
  245. # Validate/wrap filesystem upfront so we return a proper PyArrow filesystem
  246. filesystem = _validate_and_wrap_filesystem(filesystem)
  247. resolved_paths = []
  248. for path in paths:
  249. try:
  250. resolved_filesystem, resolved_path = _resolve_single_path_with_fallback(
  251. path, filesystem
  252. )
  253. except (ValueError, ImportError) as e:
  254. logger.warning(f"Failed to resolve path '{path}': {e}, skipping")
  255. continue
  256. if filesystem is None:
  257. filesystem = resolved_filesystem
  258. # If the PyArrow filesystem is handled by a fsspec HTTPFileSystem, the protocol/
  259. # scheme of paths should not be unwrapped/removed, because HTTPFileSystem
  260. # expects full file paths including protocol/scheme. This is different behavior
  261. # compared to other file system implementation in pyarrow.fs.FileSystem.
  262. if not _is_http_filesystem(resolved_filesystem):
  263. resolved_path = _unwrap_protocol(resolved_path)
  264. resolved_path = resolved_filesystem.normalize_path(resolved_path)
  265. resolved_paths.append(resolved_path)
  266. return resolved_paths, filesystem
  267. def _is_http_filesystem(fs: "pyarrow.fs.FileSystem") -> bool:
  268. """Return whether ``fs`` is a PyFileSystem handled by a fsspec HTTPFileSystem."""
  269. from pyarrow.fs import FSSpecHandler, PyFileSystem
  270. # Try to import HTTPFileSystem
  271. try:
  272. from fsspec.implementations.http import HTTPFileSystem
  273. except ModuleNotFoundError:
  274. return False
  275. if isinstance(fs, RetryingPyFileSystem):
  276. fs = fs.unwrap()
  277. if not isinstance(fs, PyFileSystem):
  278. return False
  279. return isinstance(fs.handler, FSSpecHandler) and isinstance(
  280. fs.handler.fs, HTTPFileSystem
  281. )
  282. def _unwrap_protocol(path):
  283. """
  284. Slice off any protocol prefixes on path.
  285. """
  286. if sys.platform == "win32" and _is_local_windows_path(path):
  287. # Represent as posix path such that downstream functions properly handle it.
  288. # This is executed when 'file://' is NOT included in the path.
  289. return pathlib.Path(path).as_posix()
  290. parsed = urlparse(path, allow_fragments=False) # support '#' in path
  291. params = ";" + parsed.params if parsed.params else "" # support ';' in path
  292. query = "?" + parsed.query if parsed.query else "" # support '?' in path
  293. netloc = parsed.netloc
  294. if parsed.scheme == "s3" and "@" in parsed.netloc:
  295. # If the path contains an @, it is assumed to be an anonymous
  296. # credentialed path, and we need to strip off the credentials.
  297. netloc = parsed.netloc.split("@")[-1]
  298. parsed_path = parsed.path
  299. # urlparse prepends the path with a '/'. This does not work on Windows
  300. # so if this is the case strip the leading slash.
  301. if (
  302. sys.platform == "win32"
  303. and not netloc
  304. and len(parsed_path) >= 3
  305. and parsed_path[0] == "/" # The problematic leading slash
  306. and parsed_path[1].isalpha() # Ensure it is a drive letter.
  307. and parsed_path[2:4] in (":", ":/")
  308. ):
  309. parsed_path = parsed_path[1:]
  310. return netloc + parsed_path + params + query
  311. def _is_url(path) -> bool:
  312. return urlparse(path).scheme != ""
  313. def _is_http_url(path) -> bool:
  314. parsed = urlparse(path)
  315. return parsed.scheme in ("http", "https")
  316. def _is_local_windows_path(path: str) -> bool:
  317. """Determines if path is a Windows file-system location."""
  318. if sys.platform != "win32":
  319. return False
  320. if len(path) >= 1 and path[0] == "\\":
  321. return True
  322. if (
  323. len(path) >= 3
  324. and path[1] == ":"
  325. and (path[2] == "/" or path[2] == "\\")
  326. and path[0].isalpha()
  327. ):
  328. return True
  329. return False