generic.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. from __future__ import annotations
  2. import inspect
  3. import logging
  4. import os
  5. import shutil
  6. import uuid
  7. from .asyn import AsyncFileSystem, _run_coros_in_chunks, sync_wrapper
  8. from .callbacks import DEFAULT_CALLBACK
  9. from .core import filesystem, get_filesystem_class, split_protocol, url_to_fs
# Registry of backend filesystem instances keyed by protocol. It is filled by
# ``set_generic_fs`` and read by ``_resolve_fs`` when the resolution method is
# "generic".
_generic_fs = {}
# Module logger, registered under the name "fsspec.generic".
logger = logging.getLogger("fsspec.generic")
  12. def set_generic_fs(protocol, **storage_options):
  13. """Populate the dict used for method=="generic" lookups"""
  14. _generic_fs[protocol] = filesystem(protocol, **storage_options)
  15. def _resolve_fs(url, method, protocol=None, storage_options=None):
  16. """Pick instance of backend FS"""
  17. url = url[0] if isinstance(url, (list, tuple)) else url
  18. protocol = protocol or split_protocol(url)[0]
  19. storage_options = storage_options or {}
  20. if method == "default":
  21. return filesystem(protocol)
  22. if method == "generic":
  23. return _generic_fs[protocol]
  24. if method == "current":
  25. cls = get_filesystem_class(protocol)
  26. return cls.current()
  27. if method == "options":
  28. fs, _ = url_to_fs(url, **storage_options.get(protocol, {}))
  29. return fs
  30. raise ValueError(f"Unknown FS resolution method: {method}")
  31. def rsync(
  32. source,
  33. destination,
  34. delete_missing=False,
  35. source_field="size",
  36. dest_field="size",
  37. update_cond="different",
  38. inst_kwargs=None,
  39. fs=None,
  40. **kwargs,
  41. ):
  42. """Sync files between two directory trees
  43. (experimental)
  44. Parameters
  45. ----------
  46. source: str
  47. Root of the directory tree to take files from. This must be a directory, but
  48. do not include any terminating "/" character
  49. destination: str
  50. Root path to copy into. The contents of this location should be
  51. identical to the contents of ``source`` when done. This will be made a
  52. directory, and the terminal "/" should not be included.
  53. delete_missing: bool
  54. If there are paths in the destination that don't exist in the
  55. source and this is True, delete them. Otherwise, leave them alone.
  56. source_field: str | callable
  57. If ``update_field`` is "different", this is the key in the info
  58. of source files to consider for difference. Maybe a function of the
  59. info dict.
  60. dest_field: str | callable
  61. If ``update_field`` is "different", this is the key in the info
  62. of destination files to consider for difference. May be a function of
  63. the info dict.
  64. update_cond: "different"|"always"|"never"
  65. If "always", every file is copied, regardless of whether it exists in
  66. the destination. If "never", files that exist in the destination are
  67. not copied again. If "different" (default), only copy if the info
  68. fields given by ``source_field`` and ``dest_field`` (usually "size")
  69. are different. Other comparisons may be added in the future.
  70. inst_kwargs: dict|None
  71. If ``fs`` is None, use this set of keyword arguments to make a
  72. GenericFileSystem instance
  73. fs: GenericFileSystem|None
  74. Instance to use if explicitly given. The instance defines how to
  75. to make downstream file system instances from paths.
  76. Returns
  77. -------
  78. dict of the copy operations that were performed, {source: destination}
  79. """
  80. fs = fs or GenericFileSystem(**(inst_kwargs or {}))
  81. source = fs._strip_protocol(source)
  82. destination = fs._strip_protocol(destination)
  83. allfiles = fs.find(source, withdirs=True, detail=True)
  84. if not fs.isdir(source):
  85. raise ValueError("Can only rsync on a directory")
  86. otherfiles = fs.find(destination, withdirs=True, detail=True)
  87. dirs = [
  88. a
  89. for a, v in allfiles.items()
  90. if v["type"] == "directory" and a.replace(source, destination) not in otherfiles
  91. ]
  92. logger.debug(f"{len(dirs)} directories to create")
  93. if dirs:
  94. fs.make_many_dirs(
  95. [dirn.replace(source, destination) for dirn in dirs], exist_ok=True
  96. )
  97. allfiles = {a: v for a, v in allfiles.items() if v["type"] == "file"}
  98. logger.debug(f"{len(allfiles)} files to consider for copy")
  99. to_delete = [
  100. o
  101. for o, v in otherfiles.items()
  102. if o.replace(destination, source) not in allfiles and v["type"] == "file"
  103. ]
  104. for k, v in allfiles.copy().items():
  105. otherfile = k.replace(source, destination)
  106. if otherfile in otherfiles:
  107. if update_cond == "always":
  108. allfiles[k] = otherfile
  109. elif update_cond == "never":
  110. allfiles.pop(k)
  111. elif update_cond == "different":
  112. inf1 = source_field(v) if callable(source_field) else v[source_field]
  113. v2 = otherfiles[otherfile]
  114. inf2 = dest_field(v2) if callable(dest_field) else v2[dest_field]
  115. if inf1 != inf2:
  116. # details mismatch, make copy
  117. allfiles[k] = otherfile
  118. else:
  119. # details match, don't copy
  120. allfiles.pop(k)
  121. else:
  122. # file not in target yet
  123. allfiles[k] = otherfile
  124. logger.debug(f"{len(allfiles)} files to copy")
  125. if allfiles:
  126. source_files, target_files = zip(*allfiles.items())
  127. fs.cp(source_files, target_files, **kwargs)
  128. logger.debug(f"{len(to_delete)} files to delete")
  129. if delete_missing and to_delete:
  130. fs.rm(to_delete)
  131. return allfiles
  132. class GenericFileSystem(AsyncFileSystem):
  133. """Wrapper over all other FS types
  134. <experimental!>
  135. This implementation is a single unified interface to be able to run FS operations
  136. over generic URLs, and dispatch to the specific implementations using the URL
  137. protocol prefix.
  138. Note: instances of this FS are always async, even if you never use it with any async
  139. backend.
  140. """
  141. protocol = "generic" # there is no real reason to ever use a protocol with this FS
  142. def __init__(self, default_method="default", storage_options=None, **kwargs):
  143. """
  144. Parameters
  145. ----------
  146. default_method: str (optional)
  147. Defines how to configure backend FS instances. Options are:
  148. - "default": instantiate like FSClass(), with no
  149. extra arguments; this is the default instance of that FS, and can be
  150. configured via the config system
  151. - "generic": takes instances from the `_generic_fs` dict in this module,
  152. which you must populate before use. Keys are by protocol
  153. - "options": expects storage_options, a dict mapping protocol to
  154. kwargs to use when constructing the filesystem
  155. - "current": takes the most recently instantiated version of each FS
  156. """
  157. self.method = default_method
  158. self.st_opts = storage_options
  159. super().__init__(**kwargs)
  160. def _parent(self, path):
  161. fs = _resolve_fs(path, self.method, storage_options=self.st_opts)
  162. return fs.unstrip_protocol(fs._parent(path))
  163. def _strip_protocol(self, path):
  164. # normalization only
  165. fs = _resolve_fs(path, self.method, storage_options=self.st_opts)
  166. return fs.unstrip_protocol(fs._strip_protocol(path))
  167. async def _find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs):
  168. fs = _resolve_fs(path, self.method, storage_options=self.st_opts)
  169. if fs.async_impl:
  170. out = await fs._find(
  171. path, maxdepth=maxdepth, withdirs=withdirs, detail=True, **kwargs
  172. )
  173. else:
  174. out = fs.find(
  175. path, maxdepth=maxdepth, withdirs=withdirs, detail=True, **kwargs
  176. )
  177. result = {}
  178. for k, v in out.items():
  179. v = v.copy() # don't corrupt target FS dircache
  180. name = fs.unstrip_protocol(k)
  181. v["name"] = name
  182. result[name] = v
  183. if detail:
  184. return result
  185. return list(result)
  186. async def _info(self, url, **kwargs):
  187. fs = _resolve_fs(url, self.method)
  188. if fs.async_impl:
  189. out = await fs._info(url, **kwargs)
  190. else:
  191. out = fs.info(url, **kwargs)
  192. out = out.copy() # don't edit originals
  193. out["name"] = fs.unstrip_protocol(out["name"])
  194. return out
  195. async def _ls(
  196. self,
  197. url,
  198. detail=True,
  199. **kwargs,
  200. ):
  201. fs = _resolve_fs(url, self.method)
  202. if fs.async_impl:
  203. out = await fs._ls(url, detail=True, **kwargs)
  204. else:
  205. out = fs.ls(url, detail=True, **kwargs)
  206. out = [o.copy() for o in out] # don't edit originals
  207. for o in out:
  208. o["name"] = fs.unstrip_protocol(o["name"])
  209. if detail:
  210. return out
  211. else:
  212. return [o["name"] for o in out]
  213. async def _cat_file(
  214. self,
  215. url,
  216. **kwargs,
  217. ):
  218. fs = _resolve_fs(url, self.method)
  219. if fs.async_impl:
  220. return await fs._cat_file(url, **kwargs)
  221. else:
  222. return fs.cat_file(url, **kwargs)
  223. async def _pipe_file(
  224. self,
  225. path,
  226. value,
  227. **kwargs,
  228. ):
  229. fs = _resolve_fs(path, self.method, storage_options=self.st_opts)
  230. if fs.async_impl:
  231. return await fs._pipe_file(path, value, **kwargs)
  232. else:
  233. return fs.pipe_file(path, value, **kwargs)
  234. async def _rm(self, url, **kwargs):
  235. urls = url
  236. if isinstance(urls, str):
  237. urls = [urls]
  238. fs = _resolve_fs(urls[0], self.method)
  239. if fs.async_impl:
  240. await fs._rm(urls, **kwargs)
  241. else:
  242. fs.rm(url, **kwargs)
  243. async def _makedirs(self, path, exist_ok=False):
  244. logger.debug("Make dir %s", path)
  245. fs = _resolve_fs(path, self.method, storage_options=self.st_opts)
  246. if fs.async_impl:
  247. await fs._makedirs(path, exist_ok=exist_ok)
  248. else:
  249. fs.makedirs(path, exist_ok=exist_ok)
  250. def rsync(self, source, destination, **kwargs):
  251. """Sync files between two directory trees
  252. See `func:rsync` for more details.
  253. """
  254. rsync(source, destination, fs=self, **kwargs)
  255. async def _cp_file(
  256. self,
  257. url,
  258. url2,
  259. blocksize=2**20,
  260. callback=DEFAULT_CALLBACK,
  261. tempdir: str | None = None,
  262. **kwargs,
  263. ):
  264. fs = _resolve_fs(url, self.method)
  265. fs2 = _resolve_fs(url2, self.method)
  266. if fs is fs2:
  267. # pure remote
  268. if fs.async_impl:
  269. return await fs._copy(url, url2, **kwargs)
  270. else:
  271. return fs.copy(url, url2, **kwargs)
  272. await copy_file_op(fs, [url], fs2, [url2], tempdir, 1, on_error="raise")
  273. async def _make_many_dirs(self, urls, exist_ok=True):
  274. fs = _resolve_fs(urls[0], self.method)
  275. if fs.async_impl:
  276. coros = [fs._makedirs(u, exist_ok=exist_ok) for u in urls]
  277. await _run_coros_in_chunks(coros)
  278. else:
  279. for u in urls:
  280. fs.makedirs(u, exist_ok=exist_ok)
  281. make_many_dirs = sync_wrapper(_make_many_dirs)
  282. async def _copy(
  283. self,
  284. path1: list[str],
  285. path2: list[str],
  286. recursive: bool = False,
  287. on_error: str = "ignore",
  288. maxdepth: int | None = None,
  289. batch_size: int | None = None,
  290. tempdir: str | None = None,
  291. **kwargs,
  292. ):
  293. # TODO: special case for one FS being local, which can use get/put
  294. # TODO: special case for one being memFS, which can use cat/pipe
  295. if recursive:
  296. raise NotImplementedError("Please use fsspec.generic.rsync")
  297. path1 = [path1] if isinstance(path1, str) else path1
  298. path2 = [path2] if isinstance(path2, str) else path2
  299. fs = _resolve_fs(path1, self.method)
  300. fs2 = _resolve_fs(path2, self.method)
  301. if fs is fs2:
  302. if fs.async_impl:
  303. return await fs._copy(path1, path2, **kwargs)
  304. else:
  305. return fs.copy(path1, path2, **kwargs)
  306. await copy_file_op(
  307. fs, path1, fs2, path2, tempdir, batch_size, on_error=on_error
  308. )
  309. async def copy_file_op(
  310. fs1, url1, fs2, url2, tempdir=None, batch_size=20, on_error="ignore"
  311. ):
  312. import tempfile
  313. tempdir = tempdir or tempfile.mkdtemp()
  314. try:
  315. coros = [
  316. _copy_file_op(
  317. fs1,
  318. u1,
  319. fs2,
  320. u2,
  321. os.path.join(tempdir, uuid.uuid4().hex),
  322. )
  323. for u1, u2 in zip(url1, url2)
  324. ]
  325. out = await _run_coros_in_chunks(
  326. coros, batch_size=batch_size, return_exceptions=True
  327. )
  328. finally:
  329. shutil.rmtree(tempdir)
  330. if on_error == "return":
  331. return out
  332. elif on_error == "raise":
  333. for o in out:
  334. if isinstance(o, Exception):
  335. raise o
  336. async def _copy_file_op(fs1, url1, fs2, url2, local, on_error="ignore"):
  337. if fs1.async_impl:
  338. await fs1._get_file(url1, local)
  339. else:
  340. fs1.get_file(url1, local)
  341. if fs2.async_impl:
  342. await fs2._put_file(local, url2)
  343. else:
  344. fs2.put_file(local, url2)
  345. os.unlink(local)
  346. logger.debug("Copy %s -> %s; done", url1, url2)
  347. async def maybe_await(cor):
  348. if inspect.iscoroutine(cor):
  349. return await cor
  350. else:
  351. return cor