arrow.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. import errno
  2. import io
  3. import os
  4. import secrets
  5. import shutil
  6. from contextlib import suppress
  7. from functools import cached_property, wraps
  8. from urllib.parse import parse_qs
  9. from fsspec.spec import AbstractFileSystem
  10. from fsspec.utils import (
  11. get_package_version_without_import,
  12. infer_storage_options,
  13. mirror_from,
  14. tokenize,
  15. )
  16. def wrap_exceptions(func):
  17. @wraps(func)
  18. def wrapper(*args, **kwargs):
  19. try:
  20. return func(*args, **kwargs)
  21. except OSError as exception:
  22. if not exception.args:
  23. raise
  24. message, *args = exception.args
  25. if isinstance(message, str) and "does not exist" in message:
  26. raise FileNotFoundError(errno.ENOENT, message) from exception
  27. else:
  28. raise
  29. return wrapper
# Installed pyarrow version string. Stays None until ArrowFSWrapper.__init__
# assigns it; ArrowFSWrapper._open reads it for a major-version check.
PYARROW_VERSION = None
  31. class ArrowFSWrapper(AbstractFileSystem):
  32. """FSSpec-compatible wrapper of pyarrow.fs.FileSystem.
  33. Parameters
  34. ----------
  35. fs : pyarrow.fs.FileSystem
  36. """
  37. root_marker = "/"
  38. def __init__(self, fs, **kwargs):
  39. global PYARROW_VERSION
  40. PYARROW_VERSION = get_package_version_without_import("pyarrow")
  41. self.fs = fs
  42. super().__init__(**kwargs)
  43. @property
  44. def protocol(self):
  45. return self.fs.type_name
  46. @cached_property
  47. def fsid(self):
  48. return "hdfs_" + tokenize(self.fs.host, self.fs.port)
  49. @classmethod
  50. def _strip_protocol(cls, path):
  51. ops = infer_storage_options(path)
  52. path = ops["path"]
  53. if path.startswith("//"):
  54. # special case for "hdfs://path" (without the triple slash)
  55. path = path[1:]
  56. return path
  57. def ls(self, path, detail=False, **kwargs):
  58. path = self._strip_protocol(path)
  59. from pyarrow.fs import FileSelector
  60. try:
  61. entries = [
  62. self._make_entry(entry)
  63. for entry in self.fs.get_file_info(FileSelector(path))
  64. ]
  65. except (FileNotFoundError, NotADirectoryError):
  66. entries = [self.info(path, **kwargs)]
  67. if detail:
  68. return entries
  69. else:
  70. return [entry["name"] for entry in entries]
  71. def info(self, path, **kwargs):
  72. path = self._strip_protocol(path)
  73. [info] = self.fs.get_file_info([path])
  74. return self._make_entry(info)
  75. def exists(self, path):
  76. path = self._strip_protocol(path)
  77. try:
  78. self.info(path)
  79. except FileNotFoundError:
  80. return False
  81. else:
  82. return True
  83. def _make_entry(self, info):
  84. from pyarrow.fs import FileType
  85. if info.type is FileType.Directory:
  86. kind = "directory"
  87. elif info.type is FileType.File:
  88. kind = "file"
  89. elif info.type is FileType.NotFound:
  90. raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), info.path)
  91. else:
  92. kind = "other"
  93. return {
  94. "name": info.path,
  95. "size": info.size,
  96. "type": kind,
  97. "mtime": info.mtime,
  98. }
  99. @wrap_exceptions
  100. def cp_file(self, path1, path2, **kwargs):
  101. path1 = self._strip_protocol(path1).rstrip("/")
  102. path2 = self._strip_protocol(path2).rstrip("/")
  103. with self._open(path1, "rb") as lstream:
  104. tmp_fname = f"{path2}.tmp.{secrets.token_hex(6)}"
  105. try:
  106. with self.open(tmp_fname, "wb") as rstream:
  107. shutil.copyfileobj(lstream, rstream)
  108. self.fs.move(tmp_fname, path2)
  109. except BaseException:
  110. with suppress(FileNotFoundError):
  111. self.fs.delete_file(tmp_fname)
  112. raise
  113. @wrap_exceptions
  114. def mv(self, path1, path2, **kwargs):
  115. path1 = self._strip_protocol(path1).rstrip("/")
  116. path2 = self._strip_protocol(path2).rstrip("/")
  117. self.fs.move(path1, path2)
  118. @wrap_exceptions
  119. def rm_file(self, path):
  120. path = self._strip_protocol(path)
  121. self.fs.delete_file(path)
  122. @wrap_exceptions
  123. def rm(self, path, recursive=False, maxdepth=None):
  124. path = self._strip_protocol(path).rstrip("/")
  125. if self.isdir(path):
  126. if recursive:
  127. self.fs.delete_dir(path)
  128. else:
  129. raise ValueError("Can't delete directories without recursive=False")
  130. else:
  131. self.fs.delete_file(path)
  132. @wrap_exceptions
  133. def _open(self, path, mode="rb", block_size=None, seekable=True, **kwargs):
  134. if mode == "rb":
  135. if seekable:
  136. method = self.fs.open_input_file
  137. else:
  138. method = self.fs.open_input_stream
  139. elif mode == "wb":
  140. method = self.fs.open_output_stream
  141. elif mode == "ab":
  142. method = self.fs.open_append_stream
  143. else:
  144. raise ValueError(f"unsupported mode for Arrow filesystem: {mode!r}")
  145. _kwargs = {}
  146. if mode != "rb" or not seekable:
  147. if int(PYARROW_VERSION.split(".")[0]) >= 4:
  148. # disable compression auto-detection
  149. _kwargs["compression"] = None
  150. stream = method(path, **_kwargs)
  151. return ArrowFile(self, stream, path, mode, block_size, **kwargs)
  152. @wrap_exceptions
  153. def mkdir(self, path, create_parents=True, **kwargs):
  154. path = self._strip_protocol(path)
  155. if create_parents:
  156. self.makedirs(path, exist_ok=True)
  157. else:
  158. self.fs.create_dir(path, recursive=False)
  159. @wrap_exceptions
  160. def makedirs(self, path, exist_ok=False):
  161. path = self._strip_protocol(path)
  162. self.fs.create_dir(path, recursive=True)
  163. @wrap_exceptions
  164. def rmdir(self, path):
  165. path = self._strip_protocol(path)
  166. self.fs.delete_dir(path)
  167. @wrap_exceptions
  168. def modified(self, path):
  169. path = self._strip_protocol(path)
  170. return self.fs.get_file_info(path).mtime
  171. def cat_file(self, path, start=None, end=None, **kwargs):
  172. kwargs.setdefault("seekable", start not in [None, 0])
  173. return super().cat_file(path, start=None, end=None, **kwargs)
  174. def get_file(self, rpath, lpath, **kwargs):
  175. kwargs.setdefault("seekable", False)
  176. super().get_file(rpath, lpath, **kwargs)
  177. @mirror_from(
  178. "stream",
  179. [
  180. "read",
  181. "seek",
  182. "tell",
  183. "write",
  184. "readable",
  185. "writable",
  186. "close",
  187. "seekable",
  188. ],
  189. )
  190. class ArrowFile(io.IOBase):
  191. def __init__(self, fs, stream, path, mode, block_size=None, **kwargs):
  192. self.path = path
  193. self.mode = mode
  194. self.fs = fs
  195. self.stream = stream
  196. self.blocksize = self.block_size = block_size
  197. self.kwargs = kwargs
  198. def __enter__(self):
  199. return self
  200. @property
  201. def size(self):
  202. if self.stream.seekable():
  203. return self.stream.size()
  204. return None
  205. def __exit__(self, *args):
  206. return self.close()
  207. class HadoopFileSystem(ArrowFSWrapper):
  208. """A wrapper on top of the pyarrow.fs.HadoopFileSystem
  209. to connect it's interface with fsspec"""
  210. protocol = "hdfs"
  211. def __init__(
  212. self,
  213. host="default",
  214. port=0,
  215. user=None,
  216. kerb_ticket=None,
  217. replication=3,
  218. extra_conf=None,
  219. **kwargs,
  220. ):
  221. """
  222. Parameters
  223. ----------
  224. host: str
  225. Hostname, IP or "default" to try to read from Hadoop config
  226. port: int
  227. Port to connect on, or default from Hadoop config if 0
  228. user: str or None
  229. If given, connect as this username
  230. kerb_ticket: str or None
  231. If given, use this ticket for authentication
  232. replication: int
  233. set replication factor of file for write operations. default value is 3.
  234. extra_conf: None or dict
  235. Passed on to HadoopFileSystem
  236. """
  237. from pyarrow.fs import HadoopFileSystem
  238. fs = HadoopFileSystem(
  239. host=host,
  240. port=port,
  241. user=user,
  242. kerb_ticket=kerb_ticket,
  243. replication=replication,
  244. extra_conf=extra_conf,
  245. )
  246. super().__init__(fs=fs, **kwargs)
  247. @staticmethod
  248. def _get_kwargs_from_urls(path):
  249. ops = infer_storage_options(path)
  250. out = {}
  251. if ops.get("host", None):
  252. out["host"] = ops["host"]
  253. if ops.get("username", None):
  254. out["user"] = ops["username"]
  255. if ops.get("port", None):
  256. out["port"] = ops["port"]
  257. if ops.get("url_query", None):
  258. queries = parse_qs(ops["url_query"])
  259. if queries.get("replication", None):
  260. out["replication"] = int(queries["replication"][0])
  261. return out