| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411 |
- import logging
- import pathlib
- import sys
- from typing import TYPE_CHECKING, List, Optional, Tuple, Union
- from urllib.parse import quote, unquote, urlparse
- from ray.data._internal.util import RetryingPyFileSystem, _resolve_custom_scheme
# Logger for this module; named after the module so its records can be
# filtered through the standard logging hierarchy.
logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    # Imported for type annotations only; neither is needed at runtime.
    import pyarrow
    import fsspec.spec
- def _get_fsspec_http_filesystem() -> "pyarrow.fs.PyFileSystem":
- """Get fsspec HTTPFileSystem wrapped in PyArrow PyFileSystem.
- Returns:
- PyFileSystem wrapping fsspec HTTPFileSystem.
- Raises:
- ImportError: If fsspec is not installed.
- """
- try:
- import fsspec # noqa: F401
- from fsspec.implementations.http import HTTPFileSystem
- except ModuleNotFoundError:
- raise ImportError("Please install fsspec to read files from HTTP.") from None
- from pyarrow.fs import FSSpecHandler, PyFileSystem
- return PyFileSystem(FSSpecHandler(HTTPFileSystem()))
- def _validate_and_wrap_filesystem(
- filesystem: Optional[
- Union["pyarrow.fs.FileSystem", "fsspec.spec.AbstractFileSystem"]
- ],
- ) -> Optional["pyarrow.fs.FileSystem"]:
- """Validate filesystem and wrap fsspec filesystems in PyArrow.
- Args:
- filesystem: Filesystem to validate and potentially wrap. Can be None,
- a pyarrow.fs.FileSystem, or an fsspec.spec.AbstractFileSystem.
- Returns:
- None if filesystem is None, otherwise a pyarrow.fs.FileSystem
- (either the original if already PyArrow, or wrapped if fsspec).
- Raises:
- TypeError: If filesystem is not None and not a valid pyarrow or fsspec filesystem.
- """
- if filesystem is None:
- return None
- from pyarrow.fs import FileSystem
- if isinstance(filesystem, FileSystem):
- return filesystem
- try:
- import fsspec # noqa: F401
- except ModuleNotFoundError:
- raise TypeError("fsspec is not installed") from None
- if not isinstance(filesystem, fsspec.spec.AbstractFileSystem):
- raise TypeError(
- f"Filesystem must conform to pyarrow.fs.FileSystem or "
- f"fsspec.spec.AbstractFileSystem, got: {type(filesystem).__name__}"
- )
- from pyarrow.fs import FSSpecHandler, PyFileSystem
- return PyFileSystem(FSSpecHandler(filesystem))
def _try_resolve_with_encoding(
    path: str,
    filesystem: Optional["pyarrow.fs.FileSystem"],
) -> Tuple["pyarrow.fs.FileSystem", str]:
    """Resolve ``path`` after percent-encoding characters that break URI parsing.

    Characters such as ';', '?' and '#' can make PyArrow's URI parser fail.
    The path is percent-encoded (leaving '/' and ':' untouched), resolved,
    and the resolved path is decoded again before being returned.

    Args:
        path: The path to resolve.
        filesystem: Optional filesystem to pass through to PyArrow's resolver.

    Returns:
        Tuple of (resolved_filesystem, resolved_path), where resolved_path has
        been percent-decoded.
    """
    from pyarrow.fs import _resolve_filesystem_and_path

    # '/' and ':' stay literal so the scheme separator and path structure survive.
    quoted = quote(path, safe="/:", errors="ignore")
    fs, quoted_result = _resolve_filesystem_and_path(quoted, filesystem)
    decoded = unquote(quoted_result, errors="ignore")
    return fs, decoded
- def _has_file_extension(path: str, extensions: Optional[List[str]]) -> bool:
- """Check if a path has a file extension in the provided list.
- Examples:
- >>> _has_file_extension("foo.csv", ["csv"])
- True
- >>> _has_file_extension("foo.CSV", ["csv"])
- True
- >>> _has_file_extension("foo.CSV", [".csv"])
- True
- >>> _has_file_extension("foo.csv", ["json", "jsonl"])
- False
- >>> _has_file_extension("foo.csv", None)
- True
- Args:
- path: The path to check.
- extensions: A list of extensions to check against. If `None`, any extension is
- considered valid.
- """
- assert extensions is None or isinstance(extensions, list), type(extensions)
- if extensions is None:
- return True
- # If the user-specified extensions don't contain a leading dot, we add it here
- extensions = [
- f".{ext.lower()}" if not ext.startswith(".") else ext.lower()
- for ext in extensions
- ]
- return any(path.lower().endswith(ext) for ext in extensions)
- # Mapping from URI schemes to compatible filesystem type_name values.
- # Used to validate that a cached filesystem is compatible with a given URI scheme
- # before attempting to use it, avoiding silent failures from PyArrow when the
- # wrong filesystem type is passed to _resolve_filesystem_and_path.
- _SCHEME_TO_FS_TYPE_NAMES = {
- "": ("local",), # No scheme = local filesystem
- "file": ("local",), # file:// = local filesystem
- "s3": ("s3",), # s3:// = S3 filesystem
- "s3a": ("s3",), # s3a:// = S3 filesystem (Hadoop compat)
- "gs": ("gcs",), # gs:// = GCS filesystem
- "gcs": ("gcs",), # gcs:// = GCS filesystem
- "hdfs": ("hdfs",), # hdfs:// = Hadoop filesystem
- "viewfs": ("hdfs",), # viewfs:// = Hadoop filesystem
- "abfs": ("abfs",), # abfs:// = Azure Blob FileSystem
- "abfss": ("abfs",), # abfss:// = Azure Blob FileSystem (TLS)
- "http": ("py",), # http:// = fsspec HTTP (wrapped in PyFileSystem)
- "https": ("py",), # https:// = fsspec HTTP (wrapped in PyFileSystem)
- }
def _is_filesystem_compatible_with_scheme(
    filesystem: "pyarrow.fs.FileSystem",
    scheme: str,
) -> bool:
    """Check if a filesystem is compatible with a URI scheme.

    Uses PyArrow's `type_name` property for filesystem type detection. A
    ``RetryingPyFileSystem`` is unwrapped first so the wrapped filesystem is
    inspected rather than the wrapper. An fsspec filesystem wrapped in a
    ``PyFileSystem`` is also accepted when its fsspec ``protocol`` includes the
    scheme. This prevents silently using the wrong filesystem for a URI, which
    can result in malformed paths or incorrect behavior.

    Args:
        filesystem: The PyArrow filesystem to check.
        scheme: The URI scheme (e.g., 's3', 'gs', 'http', 'file', '').
            Compared case-insensitively.

    Returns:
        True if the filesystem can handle the scheme, False otherwise. Schemes
        not listed in ``_SCHEME_TO_FS_TYPE_NAMES`` always return True.
    """
    from pyarrow.fs import FSSpecHandler, PyFileSystem

    scheme = scheme.lower()

    expected_types = _SCHEME_TO_FS_TYPE_NAMES.get(scheme)
    if expected_types is None:
        # Scheme not in the mapping (e.g., az://, custom protocols) - trust the
        # user's filesystem. This preserves backward compatibility for custom
        # filesystems.
        return True

    # HTTP(S) can only be served by fsspec's HTTPFileSystem inside a
    # PyFileSystem; _is_http_filesystem also unwraps RetryingPyFileSystem.
    if scheme in ("http", "https"):
        return _is_http_filesystem(filesystem)

    # RetryingPyFileSystem wraps another filesystem (see `unwrap()`). Its own
    # type_name presumably describes the wrapper ("py") rather than what it
    # wraps, so inspect the wrapped filesystem. Otherwise a cached,
    # retry-wrapped filesystem would be rejected for every path.
    if isinstance(filesystem, RetryingPyFileSystem):
        filesystem = filesystem.unwrap()

    # fsspec filesystems wrapped via FSSpecHandler report type_name "py", which
    # never matches a native type. Accept them when their advertised fsspec
    # protocol(s) cover the scheme; otherwise fall through to the type check.
    if isinstance(filesystem, PyFileSystem) and isinstance(
        filesystem.handler, FSSpecHandler
    ):
        protocols = getattr(filesystem.handler.fs, "protocol", None)
        if isinstance(protocols, str):
            protocols = (protocols,)
        if isinstance(protocols, (tuple, list)):
            # Scheme-less paths are local paths; match them against "file".
            wanted = "file" if scheme == "" else scheme
            if wanted in protocols:
                return True

    return filesystem.type_name in expected_types
def _resolve_single_path_with_fallback(
    path: str,
    filesystem: Optional["pyarrow.fs.FileSystem"] = None,
) -> Tuple["pyarrow.fs.FileSystem", str]:
    """Resolve one path, reusing a cached filesystem when it fits the URI scheme.

    Resolution order:
      1. HTTP(S) paths use the cached filesystem if it is an HTTP filesystem,
         otherwise a fresh fsspec HTTPFileSystem. The path is returned unchanged.
      2. Other paths try the cached filesystem first, if its type matches the
         scheme. Any failure here falls through silently.
      3. Full resolution without a filesystem, retried with percent-encoding
         if PyArrow cannot parse the path.

    Args:
        path: A single file/directory path.
        filesystem: Optional cached filesystem from previous resolution. fsspec
            filesystems are wrapped in a PyArrow PyFileSystem first.

    Returns:
        Tuple of (resolved_filesystem, resolved_path).

    Raises:
        ValueError: If the filesystem is invalid, the path has an invalid type,
            or resolution fails both as-is and percent-encoded.
        ImportError: If fsspec is needed for an HTTP path but unavailable.
    """
    import pyarrow as pa
    from pyarrow.fs import _resolve_filesystem_and_path

    path = _resolve_custom_scheme(path)

    try:
        filesystem = _validate_and_wrap_filesystem(filesystem)
    except TypeError as err:
        raise ValueError(f"Invalid filesystem provided: {err}") from err

    scheme = (urlparse(path, allow_fragments=False).scheme or "").lower()
    cached_fs_usable = filesystem is not None and _is_filesystem_compatible_with_scheme(
        filesystem, scheme
    )

    # PyArrow has no native HTTP(S) support, so these paths are handled first.
    if scheme in ("http", "https"):
        if cached_fs_usable:
            return filesystem, path
        try:
            http_fs = _get_fsspec_http_filesystem()
        except ImportError as import_error:
            raise ImportError(
                f"Cannot resolve HTTP path '{path}': {import_error}"
            ) from import_error
        return http_fs, path

    # Fast path: reuse the cached filesystem and hand back that same object.
    if cached_fs_usable:
        try:
            _, normalized = _resolve_filesystem_and_path(path, filesystem)
        except Exception:
            pass  # best effort; fall back to resolving from scratch below
        else:
            return filesystem, normalized

    try:
        fresh_fs, fresh_path = _resolve_filesystem_and_path(path, None)
    except (pa.lib.ArrowInvalid, ValueError) as original_error:
        # Special characters may confuse URI parsing; retry percent-encoded.
        try:
            fresh_fs, fresh_path = _try_resolve_with_encoding(path, None)
        except (pa.lib.ArrowInvalid, ValueError, TypeError) as encoding_error:
            raise ValueError(
                f"Failed to resolve path '{path}'. Initial error: {original_error}. "
                f"URL encoding fallback also failed: {encoding_error}"
            ) from original_error
    except TypeError as err:
        raise ValueError(f"The path: '{path}' has an invalid type {err}") from err

    return fresh_fs, fresh_path
- def _resolve_paths_and_filesystem(
- paths: Union[str, List[str]],
- filesystem: Optional["pyarrow.fs.FileSystem"] = None,
- ) -> Tuple[List[str], "pyarrow.fs.FileSystem"]:
- """
- Resolves and normalizes all provided paths, infers a filesystem from the
- paths and assumes that all paths use the same filesystem.
- Args:
- paths: A single file/directory path or a list of file/directory paths.
- A list of paths can contain both files and directories.
- filesystem: The filesystem implementation that should be used for
- reading these files. If None, a filesystem will be inferred. If not
- None, the provided filesystem will still be validated against all
- filesystems inferred from the provided paths to ensure
- compatibility.
- """
- if isinstance(paths, str):
- paths = [paths]
- if isinstance(paths, pathlib.Path):
- paths = [str(paths)]
- elif not isinstance(paths, list) or any(not isinstance(p, str) for p in paths):
- raise ValueError(
- "Expected `paths` to be a `str`, `pathlib.Path`, or `list[str]`, but got "
- f"`{paths}`"
- )
- elif len(paths) == 0:
- raise ValueError("Must provide at least one path.")
- # Validate/wrap filesystem upfront so we return a proper PyArrow filesystem
- filesystem = _validate_and_wrap_filesystem(filesystem)
- resolved_paths = []
- for path in paths:
- try:
- resolved_filesystem, resolved_path = _resolve_single_path_with_fallback(
- path, filesystem
- )
- except (ValueError, ImportError) as e:
- logger.warning(f"Failed to resolve path '{path}': {e}, skipping")
- continue
- if filesystem is None:
- filesystem = resolved_filesystem
- # If the PyArrow filesystem is handled by a fsspec HTTPFileSystem, the protocol/
- # scheme of paths should not be unwrapped/removed, because HTTPFileSystem
- # expects full file paths including protocol/scheme. This is different behavior
- # compared to other file system implementation in pyarrow.fs.FileSystem.
- if not _is_http_filesystem(resolved_filesystem):
- resolved_path = _unwrap_protocol(resolved_path)
- resolved_path = resolved_filesystem.normalize_path(resolved_path)
- resolved_paths.append(resolved_path)
- return resolved_paths, filesystem
def _is_http_filesystem(fs: "pyarrow.fs.FileSystem") -> bool:
    """Tell whether ``fs`` is a PyFileSystem backed by fsspec's HTTPFileSystem.

    A ``RetryingPyFileSystem`` is unwrapped before the check. If fsspec's HTTP
    implementation cannot be imported, no filesystem can be one, so False is
    returned.
    """
    from pyarrow.fs import FSSpecHandler, PyFileSystem

    try:
        from fsspec.implementations.http import HTTPFileSystem
    except ModuleNotFoundError:
        return False

    inner = fs.unwrap() if isinstance(fs, RetryingPyFileSystem) else fs
    if not isinstance(inner, PyFileSystem):
        return False

    handler = inner.handler
    return isinstance(handler, FSSpecHandler) and isinstance(
        handler.fs, HTTPFileSystem
    )
- def _unwrap_protocol(path):
- """
- Slice off any protocol prefixes on path.
- """
- if sys.platform == "win32" and _is_local_windows_path(path):
- # Represent as posix path such that downstream functions properly handle it.
- # This is executed when 'file://' is NOT included in the path.
- return pathlib.Path(path).as_posix()
- parsed = urlparse(path, allow_fragments=False) # support '#' in path
- params = ";" + parsed.params if parsed.params else "" # support ';' in path
- query = "?" + parsed.query if parsed.query else "" # support '?' in path
- netloc = parsed.netloc
- if parsed.scheme == "s3" and "@" in parsed.netloc:
- # If the path contains an @, it is assumed to be an anonymous
- # credentialed path, and we need to strip off the credentials.
- netloc = parsed.netloc.split("@")[-1]
- parsed_path = parsed.path
- # urlparse prepends the path with a '/'. This does not work on Windows
- # so if this is the case strip the leading slash.
- if (
- sys.platform == "win32"
- and not netloc
- and len(parsed_path) >= 3
- and parsed_path[0] == "/" # The problematic leading slash
- and parsed_path[1].isalpha() # Ensure it is a drive letter.
- and parsed_path[2:4] in (":", ":/")
- ):
- parsed_path = parsed_path[1:]
- return netloc + parsed_path + params + query
- def _is_url(path) -> bool:
- return urlparse(path).scheme != ""
- def _is_http_url(path) -> bool:
- parsed = urlparse(path)
- return parsed.scheme in ("http", "https")
- def _is_local_windows_path(path: str) -> bool:
- """Determines if path is a Windows file-system location."""
- if sys.platform != "win32":
- return False
- if len(path) >= 1 and path[0] == "\\":
- return True
- if (
- len(path) >= 3
- and path[1] == ":"
- and (path[2] == "/" or path[2] == "\\")
- and path[0].isalpha()
- ):
- return True
- return False
|