cached.py 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021
  1. from __future__ import annotations
  2. import inspect
  3. import logging
  4. import os
  5. import tempfile
  6. import time
  7. import weakref
  8. from collections.abc import Callable
  9. from shutil import rmtree
  10. from typing import TYPE_CHECKING, Any, ClassVar
  11. from fsspec import filesystem
  12. from fsspec.callbacks import DEFAULT_CALLBACK
  13. from fsspec.compression import compr
  14. from fsspec.core import BaseCache, MMapCache
  15. from fsspec.exceptions import BlocksizeMismatchError
  16. from fsspec.implementations.cache_mapper import create_cache_mapper
  17. from fsspec.implementations.cache_metadata import CacheMetadata
  18. from fsspec.implementations.chained import ChainedFileSystem
  19. from fsspec.implementations.local import LocalFileSystem
  20. from fsspec.spec import AbstractBufferedFile
  21. from fsspec.transaction import Transaction
  22. from fsspec.utils import infer_compression
  23. if TYPE_CHECKING:
  24. from fsspec.implementations.cache_mapper import AbstractCacheMapper
# Module logger used by all the caching filesystems defined below; the
# explicit name "fsspec.cached" is what users configure to see cache activity.
logger = logging.getLogger("fsspec.cached")
  26. class WriteCachedTransaction(Transaction):
  27. def complete(self, commit=True):
  28. rpaths = [f.path for f in self.files]
  29. lpaths = [f.fn for f in self.files]
  30. if commit:
  31. self.fs.put(lpaths, rpaths)
  32. self.files.clear()
  33. self.fs._intrans = False
  34. self.fs._transaction = None
  35. self.fs = None # break cycle
  36. class CachingFileSystem(ChainedFileSystem):
  37. """Locally caching filesystem, layer over any other FS
  38. This class implements chunk-wise local storage of remote files, for quick
  39. access after the initial download. The files are stored in a given
  40. directory with hashes of URLs for the filenames. If no directory is given,
  41. a temporary one is used, which should be cleaned up by the OS after the
  42. process ends. The files themselves are sparse (as implemented in
  43. :class:`~fsspec.caching.MMapCache`), so only the data which is accessed
  44. takes up space.
  45. Restrictions:
  46. - the block-size must be the same for each access of a given file, unless
  47. all blocks of the file have already been read
  48. - caching can only be applied to file-systems which produce files
  49. derived from fsspec.spec.AbstractBufferedFile ; LocalFileSystem is also
  50. allowed, for testing
  51. """
  52. protocol: ClassVar[str | tuple[str, ...]] = ("blockcache", "cached")
  53. _strip_tokenize_options = ("fo",)
  54. def __init__(
  55. self,
  56. target_protocol=None,
  57. cache_storage="TMP",
  58. cache_check=10,
  59. check_files=False,
  60. expiry_time=604800,
  61. target_options=None,
  62. fs=None,
  63. same_names: bool | None = None,
  64. compression=None,
  65. cache_mapper: AbstractCacheMapper | None = None,
  66. **kwargs,
  67. ):
  68. """
  69. Parameters
  70. ----------
  71. target_protocol: str (optional)
  72. Target filesystem protocol. Provide either this or ``fs``.
  73. cache_storage: str or list(str)
  74. Location to store files. If "TMP", this is a temporary directory,
  75. and will be cleaned up by the OS when this process ends (or later).
  76. If a list, each location will be tried in the order given, but
  77. only the last will be considered writable.
  78. cache_check: int
  79. Number of seconds between reload of cache metadata
  80. check_files: bool
  81. Whether to explicitly see if the UID of the remote file matches
  82. the stored one before using. Warning: some file systems such as
  83. HTTP cannot reliably give a unique hash of the contents of some
  84. path, so be sure to set this option to False.
  85. expiry_time: int
  86. The time in seconds after which a local copy is considered useless.
  87. Set to falsy to prevent expiry. The default is equivalent to one
  88. week.
  89. target_options: dict or None
  90. Passed to the instantiation of the FS, if fs is None.
  91. fs: filesystem instance
  92. The target filesystem to run against. Provide this or ``protocol``.
  93. same_names: bool (optional)
  94. By default, target URLs are hashed using a ``HashCacheMapper`` so
  95. that files from different backends with the same basename do not
  96. conflict. If this argument is ``true``, a ``BasenameCacheMapper``
  97. is used instead. Other cache mapper options are available by using
  98. the ``cache_mapper`` keyword argument. Only one of this and
  99. ``cache_mapper`` should be specified.
  100. compression: str (optional)
  101. To decompress on download. Can be 'infer' (guess from the URL name),
  102. one of the entries in ``fsspec.compression.compr``, or None for no
  103. decompression.
  104. cache_mapper: AbstractCacheMapper (optional)
  105. The object use to map from original filenames to cached filenames.
  106. Only one of this and ``same_names`` should be specified.
  107. """
  108. super().__init__(**kwargs)
  109. if fs is None and target_protocol is None:
  110. raise ValueError(
  111. "Please provide filesystem instance(fs) or target_protocol"
  112. )
  113. if not (fs is None) ^ (target_protocol is None):
  114. raise ValueError(
  115. "Both filesystems (fs) and target_protocol may not be both given."
  116. )
  117. if cache_storage == "TMP":
  118. tempdir = tempfile.mkdtemp()
  119. storage = [tempdir]
  120. weakref.finalize(self, self._remove_tempdir, tempdir)
  121. else:
  122. if isinstance(cache_storage, str):
  123. storage = [cache_storage]
  124. else:
  125. storage = cache_storage
  126. os.makedirs(storage[-1], exist_ok=True)
  127. self.storage = storage
  128. self.kwargs = target_options or {}
  129. self.cache_check = cache_check
  130. self.check_files = check_files
  131. self.expiry = expiry_time
  132. self.compression = compression
  133. # Size of cache in bytes. If None then the size is unknown and will be
  134. # recalculated the next time cache_size() is called. On writes to the
  135. # cache this is reset to None.
  136. self._cache_size = None
  137. if same_names is not None and cache_mapper is not None:
  138. raise ValueError(
  139. "Cannot specify both same_names and cache_mapper in "
  140. "CachingFileSystem.__init__"
  141. )
  142. if cache_mapper is not None:
  143. self._mapper = cache_mapper
  144. else:
  145. self._mapper = create_cache_mapper(
  146. same_names if same_names is not None else False
  147. )
  148. self.target_protocol = (
  149. target_protocol
  150. if isinstance(target_protocol, str)
  151. else (fs.protocol if isinstance(fs.protocol, str) else fs.protocol[0])
  152. )
  153. self._metadata = CacheMetadata(self.storage)
  154. self.load_cache()
  155. self.fs = fs if fs is not None else filesystem(target_protocol, **self.kwargs)
  156. def _strip_protocol(path):
  157. # acts as a method, since each instance has a difference target
  158. return self.fs._strip_protocol(type(self)._strip_protocol(path))
  159. self._strip_protocol: Callable = _strip_protocol
  160. @staticmethod
  161. def _remove_tempdir(tempdir):
  162. try:
  163. rmtree(tempdir)
  164. except Exception:
  165. pass
  166. def _mkcache(self):
  167. os.makedirs(self.storage[-1], exist_ok=True)
  168. def cache_size(self):
  169. """Return size of cache in bytes.
  170. If more than one cache directory is in use, only the size of the last
  171. one (the writable cache directory) is returned.
  172. """
  173. if self._cache_size is None:
  174. cache_dir = self.storage[-1]
  175. self._cache_size = filesystem("file").du(cache_dir, withdirs=True)
  176. return self._cache_size
  177. def load_cache(self):
  178. """Read set of stored blocks from file"""
  179. self._metadata.load()
  180. self._mkcache()
  181. self.last_cache = time.time()
  182. def save_cache(self):
  183. """Save set of stored blocks from file"""
  184. self._mkcache()
  185. self._metadata.save()
  186. self.last_cache = time.time()
  187. self._cache_size = None
  188. def _check_cache(self):
  189. """Reload caches if time elapsed or any disappeared"""
  190. self._mkcache()
  191. if not self.cache_check:
  192. # explicitly told not to bother checking
  193. return
  194. timecond = time.time() - self.last_cache > self.cache_check
  195. existcond = all(os.path.exists(storage) for storage in self.storage)
  196. if timecond or not existcond:
  197. self.load_cache()
  198. def _check_file(self, path):
  199. """Is path in cache and still valid"""
  200. path = self._strip_protocol(path)
  201. self._check_cache()
  202. return self._metadata.check_file(path, self)
  203. def clear_cache(self):
  204. """Remove all files and metadata from the cache
  205. In the case of multiple cache locations, this clears only the last one,
  206. which is assumed to be the read/write one.
  207. """
  208. rmtree(self.storage[-1])
  209. self.load_cache()
  210. self._cache_size = None
  211. def clear_expired_cache(self, expiry_time=None):
  212. """Remove all expired files and metadata from the cache
  213. In the case of multiple cache locations, this clears only the last one,
  214. which is assumed to be the read/write one.
  215. Parameters
  216. ----------
  217. expiry_time: int
  218. The time in seconds after which a local copy is considered useless.
  219. If not defined the default is equivalent to the attribute from the
  220. file caching instantiation.
  221. """
  222. if not expiry_time:
  223. expiry_time = self.expiry
  224. self._check_cache()
  225. expired_files, writable_cache_empty = self._metadata.clear_expired(expiry_time)
  226. for fn in expired_files:
  227. if os.path.exists(fn):
  228. os.remove(fn)
  229. if writable_cache_empty:
  230. rmtree(self.storage[-1])
  231. self.load_cache()
  232. self._cache_size = None
  233. def pop_from_cache(self, path):
  234. """Remove cached version of given file
  235. Deletes local copy of the given (remote) path. If it is found in a cache
  236. location which is not the last, it is assumed to be read-only, and
  237. raises PermissionError
  238. """
  239. path = self._strip_protocol(path)
  240. fn = self._metadata.pop_file(path)
  241. if fn is not None:
  242. os.remove(fn)
  243. self._cache_size = None
  244. def _open(
  245. self,
  246. path,
  247. mode="rb",
  248. block_size=None,
  249. autocommit=True,
  250. cache_options=None,
  251. **kwargs,
  252. ):
  253. """Wrap the target _open
  254. If the whole file exists in the cache, just open it locally and
  255. return that.
  256. Otherwise, open the file on the target FS, and make it have a mmap
  257. cache pointing to the location which we determine, in our cache.
  258. The ``blocks`` instance is shared, so as the mmap cache instance
  259. updates, so does the entry in our ``cached_files`` attribute.
  260. We monkey-patch this file, so that when it closes, we call
  261. ``close_and_update`` to save the state of the blocks.
  262. """
  263. path = self._strip_protocol(path)
  264. path = self.fs._strip_protocol(path)
  265. if "r" not in mode:
  266. return self.fs._open(
  267. path,
  268. mode=mode,
  269. block_size=block_size,
  270. autocommit=autocommit,
  271. cache_options=cache_options,
  272. **kwargs,
  273. )
  274. detail = self._check_file(path)
  275. if detail:
  276. # file is in cache
  277. detail, fn = detail
  278. hash, blocks = detail["fn"], detail["blocks"]
  279. if blocks is True:
  280. # stored file is complete
  281. logger.debug("Opening local copy of %s", path)
  282. return open(fn, mode)
  283. # TODO: action where partial file exists in read-only cache
  284. logger.debug("Opening partially cached copy of %s", path)
  285. else:
  286. hash = self._mapper(path)
  287. fn = os.path.join(self.storage[-1], hash)
  288. blocks = set()
  289. detail = {
  290. "original": path,
  291. "fn": hash,
  292. "blocks": blocks,
  293. "time": time.time(),
  294. "uid": self.fs.ukey(path),
  295. }
  296. self._metadata.update_file(path, detail)
  297. logger.debug("Creating local sparse file for %s", path)
  298. # explicitly submitting the size to the open call will avoid extra
  299. # operations when opening. This is particularly relevant
  300. # for any file that is read over a network, e.g. S3.
  301. size = detail.get("size")
  302. # call target filesystems open
  303. self._mkcache()
  304. f = self.fs._open(
  305. path,
  306. mode=mode,
  307. block_size=block_size,
  308. autocommit=autocommit,
  309. cache_options=cache_options,
  310. cache_type="none",
  311. size=size,
  312. **kwargs,
  313. )
  314. # set size if not already set
  315. if size is None:
  316. detail["size"] = f.size
  317. self._metadata.update_file(path, detail)
  318. if self.compression:
  319. comp = (
  320. infer_compression(path)
  321. if self.compression == "infer"
  322. else self.compression
  323. )
  324. f = compr[comp](f, mode="rb")
  325. if "blocksize" in detail:
  326. if detail["blocksize"] != f.blocksize:
  327. raise BlocksizeMismatchError(
  328. f"Cached file must be reopened with same block"
  329. f" size as original (old: {detail['blocksize']},"
  330. f" new {f.blocksize})"
  331. )
  332. else:
  333. detail["blocksize"] = f.blocksize
  334. def _fetch_ranges(ranges):
  335. return self.fs.cat_ranges(
  336. [path] * len(ranges),
  337. [r[0] for r in ranges],
  338. [r[1] for r in ranges],
  339. **kwargs,
  340. )
  341. multi_fetcher = None if self.compression else _fetch_ranges
  342. f.cache = MMapCache(
  343. f.blocksize, f._fetch_range, f.size, fn, blocks, multi_fetcher=multi_fetcher
  344. )
  345. close = f.close
  346. f.close = lambda: self.close_and_update(f, close)
  347. self.save_cache()
  348. return f
  349. def _parent(self, path):
  350. return self.fs._parent(path)
  351. def hash_name(self, path: str, *args: Any) -> str:
  352. # Kept for backward compatibility with downstream libraries.
  353. # Ignores extra arguments, previously same_name boolean.
  354. return self._mapper(path)
  355. def close_and_update(self, f, close):
  356. """Called when a file is closing, so store the set of blocks"""
  357. if f.closed:
  358. return
  359. path = self._strip_protocol(f.path)
  360. self._metadata.on_close_cached_file(f, path)
  361. try:
  362. logger.debug("going to save")
  363. self.save_cache()
  364. logger.debug("saved")
  365. except OSError:
  366. logger.debug("Cache saving failed while closing file")
  367. except NameError:
  368. logger.debug("Cache save failed due to interpreter shutdown")
  369. close()
  370. f.closed = True
  371. def ls(self, path, detail=True):
  372. return self.fs.ls(path, detail)
  373. def __getattribute__(self, item):
  374. if item in {
  375. "load_cache",
  376. "_get_cached_file_before_open",
  377. "_open",
  378. "save_cache",
  379. "close_and_update",
  380. "__init__",
  381. "__getattribute__",
  382. "__reduce__",
  383. "_make_local_details",
  384. "open",
  385. "cat",
  386. "cat_file",
  387. "_cat_file",
  388. "cat_ranges",
  389. "_cat_ranges",
  390. "get",
  391. "read_block",
  392. "tail",
  393. "head",
  394. "info",
  395. "ls",
  396. "exists",
  397. "isfile",
  398. "isdir",
  399. "_check_file",
  400. "_check_cache",
  401. "_mkcache",
  402. "clear_cache",
  403. "clear_expired_cache",
  404. "pop_from_cache",
  405. "local_file",
  406. "_paths_from_path",
  407. "get_mapper",
  408. "open_many",
  409. "commit_many",
  410. "hash_name",
  411. "__hash__",
  412. "__eq__",
  413. "to_json",
  414. "to_dict",
  415. "cache_size",
  416. "pipe_file",
  417. "pipe",
  418. "start_transaction",
  419. "end_transaction",
  420. }:
  421. # all the methods defined in this class. Note `open` here, since
  422. # it calls `_open`, but is actually in superclass
  423. return lambda *args, **kw: getattr(type(self), item).__get__(self)(
  424. *args, **kw
  425. )
  426. if item in ["__reduce_ex__"]:
  427. raise AttributeError
  428. if item in ["transaction"]:
  429. # property
  430. return type(self).transaction.__get__(self)
  431. if item in {"_cache", "transaction_type", "protocol"}:
  432. # class attributes
  433. return getattr(type(self), item)
  434. if item == "__class__":
  435. return type(self)
  436. d = object.__getattribute__(self, "__dict__")
  437. fs = d.get("fs", None) # fs is not immediately defined
  438. if item in d:
  439. return d[item]
  440. elif fs is not None:
  441. if item in fs.__dict__:
  442. # attribute of instance
  443. return fs.__dict__[item]
  444. # attributed belonging to the target filesystem
  445. cls = type(fs)
  446. m = getattr(cls, item)
  447. if (inspect.isfunction(m) or inspect.isdatadescriptor(m)) and (
  448. not hasattr(m, "__self__") or m.__self__ is None
  449. ):
  450. # instance method
  451. return m.__get__(fs, cls)
  452. return m # class method or attribute
  453. else:
  454. # attributes of the superclass, while target is being set up
  455. return super().__getattribute__(item)
  456. def __eq__(self, other):
  457. """Test for equality."""
  458. if self is other:
  459. return True
  460. if not isinstance(other, type(self)):
  461. return False
  462. return (
  463. self.storage == other.storage
  464. and self.kwargs == other.kwargs
  465. and self.cache_check == other.cache_check
  466. and self.check_files == other.check_files
  467. and self.expiry == other.expiry
  468. and self.compression == other.compression
  469. and self._mapper == other._mapper
  470. and self.target_protocol == other.target_protocol
  471. )
  472. def __hash__(self):
  473. """Calculate hash."""
  474. return (
  475. hash(tuple(self.storage))
  476. ^ hash(str(self.kwargs))
  477. ^ hash(self.cache_check)
  478. ^ hash(self.check_files)
  479. ^ hash(self.expiry)
  480. ^ hash(self.compression)
  481. ^ hash(self._mapper)
  482. ^ hash(self.target_protocol)
  483. )
  484. class WholeFileCacheFileSystem(CachingFileSystem):
  485. """Caches whole remote files on first access
  486. This class is intended as a layer over any other file system, and
  487. will make a local copy of each file accessed, so that all subsequent
  488. reads are local. This is similar to ``CachingFileSystem``, but without
  489. the block-wise functionality and so can work even when sparse files
  490. are not allowed. See its docstring for definition of the init
  491. arguments.
  492. The class still needs access to the remote store for listing files,
  493. and may refresh cached files.
  494. """
  495. protocol = "filecache"
  496. local_file = True
  497. def open_many(self, open_files, **kwargs):
  498. paths = [of.path for of in open_files]
  499. if "r" in open_files.mode:
  500. self._mkcache()
  501. else:
  502. return [
  503. LocalTempFile(
  504. self.fs,
  505. path,
  506. mode=open_files.mode,
  507. fn=os.path.join(self.storage[-1], self._mapper(path)),
  508. **kwargs,
  509. )
  510. for path in paths
  511. ]
  512. if self.compression:
  513. raise NotImplementedError
  514. details = [self._check_file(sp) for sp in paths]
  515. downpath = [p for p, d in zip(paths, details) if not d]
  516. downfn0 = [
  517. os.path.join(self.storage[-1], self._mapper(p))
  518. for p, d in zip(paths, details)
  519. ] # keep these path names for opening later
  520. downfn = [fn for fn, d in zip(downfn0, details) if not d]
  521. if downpath:
  522. # skip if all files are already cached and up to date
  523. self.fs.get(downpath, downfn)
  524. # update metadata - only happens when downloads are successful
  525. newdetail = [
  526. {
  527. "original": path,
  528. "fn": self._mapper(path),
  529. "blocks": True,
  530. "time": time.time(),
  531. "uid": self.fs.ukey(path),
  532. }
  533. for path in downpath
  534. ]
  535. for path, detail in zip(downpath, newdetail):
  536. self._metadata.update_file(path, detail)
  537. self.save_cache()
  538. def firstpart(fn):
  539. # helper to adapt both whole-file and simple-cache
  540. return fn[1] if isinstance(fn, tuple) else fn
  541. return [
  542. open(firstpart(fn0) if fn0 else fn1, mode=open_files.mode)
  543. for fn0, fn1 in zip(details, downfn0)
  544. ]
  545. def commit_many(self, open_files):
  546. self.fs.put([f.fn for f in open_files], [f.path for f in open_files])
  547. [f.close() for f in open_files]
  548. for f in open_files:
  549. # in case autocommit is off, and so close did not already delete
  550. try:
  551. os.remove(f.name)
  552. except FileNotFoundError:
  553. pass
  554. self._cache_size = None
  555. def _make_local_details(self, path):
  556. hash = self._mapper(path)
  557. fn = os.path.join(self.storage[-1], hash)
  558. detail = {
  559. "original": path,
  560. "fn": hash,
  561. "blocks": True,
  562. "time": time.time(),
  563. "uid": self.fs.ukey(path),
  564. }
  565. self._metadata.update_file(path, detail)
  566. logger.debug("Copying %s to local cache", path)
  567. return fn
  568. def cat(
  569. self,
  570. path,
  571. recursive=False,
  572. on_error="raise",
  573. callback=DEFAULT_CALLBACK,
  574. **kwargs,
  575. ):
  576. paths = self.expand_path(
  577. path, recursive=recursive, maxdepth=kwargs.get("maxdepth")
  578. )
  579. getpaths = []
  580. storepaths = []
  581. fns = []
  582. out = {}
  583. for p in paths.copy():
  584. try:
  585. detail = self._check_file(p)
  586. if not detail:
  587. fn = self._make_local_details(p)
  588. getpaths.append(p)
  589. storepaths.append(fn)
  590. else:
  591. detail, fn = detail if isinstance(detail, tuple) else (None, detail)
  592. fns.append(fn)
  593. except Exception as e:
  594. if on_error == "raise":
  595. raise
  596. if on_error == "return":
  597. out[p] = e
  598. paths.remove(p)
  599. if getpaths:
  600. self.fs.get(getpaths, storepaths)
  601. self.save_cache()
  602. callback.set_size(len(paths))
  603. for p, fn in zip(paths, fns):
  604. with open(fn, "rb") as f:
  605. out[p] = f.read()
  606. callback.relative_update(1)
  607. if isinstance(path, str) and len(paths) == 1 and recursive is False:
  608. out = out[paths[0]]
  609. return out
  610. def _get_cached_file_before_open(self, path, **kwargs):
  611. fn = self._make_local_details(path)
  612. # call target filesystems open
  613. self._mkcache()
  614. if self.compression:
  615. with self.fs._open(path, mode="rb", **kwargs) as f, open(fn, "wb") as f2:
  616. if isinstance(f, AbstractBufferedFile):
  617. # want no type of caching if just downloading whole thing
  618. f.cache = BaseCache(0, f.cache.fetcher, f.size)
  619. comp = (
  620. infer_compression(path)
  621. if self.compression == "infer"
  622. else self.compression
  623. )
  624. f = compr[comp](f, mode="rb")
  625. data = True
  626. while data:
  627. block = getattr(f, "blocksize", 5 * 2**20)
  628. data = f.read(block)
  629. f2.write(data)
  630. else:
  631. self.fs.get_file(path, fn)
  632. self.save_cache()
  633. def _open(self, path, mode="rb", **kwargs):
  634. path = self._strip_protocol(path)
  635. # For read (or append), (try) download from remote
  636. if "r" in mode or "a" in mode:
  637. if not self._check_file(path):
  638. if self.fs.exists(path):
  639. self._get_cached_file_before_open(path, **kwargs)
  640. elif "r" in mode:
  641. raise FileNotFoundError(path)
  642. detail, fn = self._check_file(path)
  643. _, blocks = detail["fn"], detail["blocks"]
  644. if blocks is True:
  645. logger.debug("Opening local copy of %s", path)
  646. else:
  647. raise ValueError(
  648. f"Attempt to open partially cached file {path}"
  649. f" as a wholly cached file"
  650. )
  651. # Just reading does not need special file handling
  652. if "r" in mode and "+" not in mode:
  653. # In order to support downstream filesystems to be able to
  654. # infer the compression from the original filename, like
  655. # the `TarFileSystem`, let's extend the `io.BufferedReader`
  656. # fileobject protocol by adding a dedicated attribute
  657. # `original`.
  658. f = open(fn, mode)
  659. f.original = detail.get("original")
  660. return f
  661. hash = self._mapper(path)
  662. fn = os.path.join(self.storage[-1], hash)
  663. user_specified_kwargs = {
  664. k: v
  665. for k, v in kwargs.items()
  666. # those kwargs were added by open(), we don't want them
  667. if k not in ["autocommit", "block_size", "cache_options"]
  668. }
  669. return LocalTempFile(self, path, mode=mode, fn=fn, **user_specified_kwargs)
  670. class SimpleCacheFileSystem(WholeFileCacheFileSystem):
  671. """Caches whole remote files on first access
  672. This class is intended as a layer over any other file system, and
  673. will make a local copy of each file accessed, so that all subsequent
  674. reads are local. This implementation only copies whole files, and
  675. does not keep any metadata about the download time or file details.
  676. It is therefore safer to use in multi-threaded/concurrent situations.
  677. This is the only of the caching filesystems that supports write: you will
  678. be given a real local open file, and upon close and commit, it will be
  679. uploaded to the target filesystem; the writability or the target URL is
  680. not checked until that time.
  681. """
  682. protocol = "simplecache"
  683. local_file = True
  684. transaction_type = WriteCachedTransaction
  685. def __init__(self, **kwargs):
  686. kw = kwargs.copy()
  687. for key in ["cache_check", "expiry_time", "check_files"]:
  688. kw[key] = False
  689. super().__init__(**kw)
  690. for storage in self.storage:
  691. if not os.path.exists(storage):
  692. os.makedirs(storage, exist_ok=True)
  693. def _check_file(self, path):
  694. self._check_cache()
  695. sha = self._mapper(path)
  696. for storage in self.storage:
  697. fn = os.path.join(storage, sha)
  698. if os.path.exists(fn):
  699. return fn
  700. def save_cache(self):
  701. pass
  702. def load_cache(self):
  703. pass
  704. def pipe_file(self, path, value=None, **kwargs):
  705. if self._intrans:
  706. with self.open(path, "wb") as f:
  707. f.write(value)
  708. else:
  709. super().pipe_file(path, value)
  710. def ls(self, path, detail=True, **kwargs):
  711. path = self._strip_protocol(path)
  712. details = []
  713. try:
  714. details = self.fs.ls(
  715. path, detail=True, **kwargs
  716. ).copy() # don't edit original!
  717. except FileNotFoundError as e:
  718. ex = e
  719. else:
  720. ex = None
  721. if self._intrans:
  722. path1 = path.rstrip("/") + "/"
  723. for f in self.transaction.files:
  724. if f.path == path:
  725. details.append(
  726. {"name": path, "size": f.size or f.tell(), "type": "file"}
  727. )
  728. elif f.path.startswith(path1):
  729. if f.path.count("/") == path1.count("/"):
  730. details.append(
  731. {"name": f.path, "size": f.size or f.tell(), "type": "file"}
  732. )
  733. else:
  734. dname = "/".join(f.path.split("/")[: path1.count("/") + 1])
  735. details.append({"name": dname, "size": 0, "type": "directory"})
  736. if ex is not None and not details:
  737. raise ex
  738. if detail:
  739. return details
  740. return sorted(_["name"] for _ in details)
  741. def info(self, path, **kwargs):
  742. path = self._strip_protocol(path)
  743. if self._intrans:
  744. f = [_ for _ in self.transaction.files if _.path == path]
  745. if f:
  746. size = os.path.getsize(f[0].fn) if f[0].closed else f[0].tell()
  747. return {"name": path, "size": size, "type": "file"}
  748. f = any(_.path.startswith(path + "/") for _ in self.transaction.files)
  749. if f:
  750. return {"name": path, "size": 0, "type": "directory"}
  751. return self.fs.info(path, **kwargs)
  752. def pipe(self, path, value=None, **kwargs):
  753. if isinstance(path, str):
  754. self.pipe_file(self._strip_protocol(path), value, **kwargs)
  755. elif isinstance(path, dict):
  756. for k, v in path.items():
  757. self.pipe_file(self._strip_protocol(k), v, **kwargs)
  758. else:
  759. raise ValueError("path must be str or dict")
  760. async def _cat_file(self, path, start=None, end=None, **kwargs):
  761. logger.debug("async cat_file %s", path)
  762. path = self._strip_protocol(path)
  763. sha = self._mapper(path)
  764. fn = self._check_file(path)
  765. if not fn:
  766. fn = os.path.join(self.storage[-1], sha)
  767. await self.fs._get_file(path, fn, **kwargs)
  768. with open(fn, "rb") as f: # noqa ASYNC230
  769. if start:
  770. f.seek(start)
  771. size = -1 if end is None else end - f.tell()
  772. return f.read(size)
  async def _cat_ranges(
      self, paths, starts, ends, max_gap=None, on_error="return", **kwargs
  ):
      """Read byte ranges from several files, downloading uncached ones first.

      ``paths``, ``starts`` and ``ends`` are parallel sequences: range ``i`` is
      ``[starts[i], ends[i])`` of ``paths[i]``. Each path missing from the local
      cache is fetched once into the last storage location with
      ``self.fs._get``, even if it is listed several times. The ranges are then
      read from the local copies by ``LocalFileSystem.cat_ranges``.
      ``on_error`` is forwarded to both steps.
      """
      logger.debug("async cat ranges %s", paths)
      lpaths = []
      to_fetch = {}  # remote path -> local cache target, in first-seen order
      for p in paths:
          # strip like _cat_file/_open do, so every method uses one cache key
          p = self._strip_protocol(p)
          fn = self._check_file(p)
          if not fn:
              # any falsy result is a cache miss; a path repeated in `paths`
              # must reuse the target already chosen for it, not get None
              if p not in to_fetch:
                  to_fetch[p] = os.path.join(self.storage[-1], self._mapper(p))
              fn = to_fetch[p]
          lpaths.append(fn)
      if to_fetch:
          await self.fs._get(
              list(to_fetch), list(to_fetch.values()), on_error=on_error
          )
      return LocalFileSystem().cat_ranges(
          lpaths, starts, ends, max_gap=max_gap, on_error=on_error, **kwargs
      )
  def cat_ranges(
      self, paths, starts, ends, max_gap=None, on_error="return", **kwargs
  ):
      """Read byte ranges from several files, downloading uncached ones first.

      ``paths``, ``starts`` and ``ends`` are parallel sequences: range ``i`` is
      ``[starts[i], ends[i])`` of ``paths[i]``. Each path missing from the local
      cache is fetched once into the last storage location with
      ``self.fs.get``, even if it is listed several times. The ranges are then
      read from the local copies by ``LocalFileSystem.cat_ranges``.
      """
      logger.debug("cat ranges %s", paths)
      lpaths = []
      to_fetch = {}  # remote path -> local cache target, in first-seen order
      for p in paths:
          # strip like _cat_file/_open do, so every method uses one cache key
          p = self._strip_protocol(p)
          fn = self._check_file(p)
          if not fn:
              # any falsy result (None or False) is a cache miss; download to
              # the real cache location rather than to the falsy value itself
              if p not in to_fetch:
                  to_fetch[p] = os.path.join(self.storage[-1], self._mapper(p))
              fn = to_fetch[p]
          lpaths.append(fn)
      if to_fetch:
          self.fs.get(list(to_fetch), list(to_fetch.values()))
      return LocalFileSystem().cat_ranges(
          lpaths, starts, ends, max_gap=max_gap, on_error=on_error, **kwargs
      )
  def _get_cached_file_before_open(self, path, **kwargs):
      """Download ``path`` in full into the last cache storage location.

      When ``self.compression`` is set, the remote file is streamed through the
      matching decompressor, so the cache holds decompressed bytes. The value
      ``"infer"`` picks the codec from the file name. Otherwise
      ``self.fs.get_file`` copies the file verbatim.

      If the transfer fails or is interrupted, any partially written cache
      file is removed and the error is re-raised. ``_open`` skips the download
      whenever ``_check_file`` finds something, so a truncated file left
      behind would otherwise presumably be served as a complete cached copy.
      """
      sha = self._mapper(path)
      fn = os.path.join(self.storage[-1], sha)
      logger.debug("Copying %s to local cache", path)
      self._mkcache()
      self._cache_size = None
      try:
          if self.compression:
              with self.fs._open(path, mode="rb", **kwargs) as f, open(fn, "wb") as f2:
                  if isinstance(f, AbstractBufferedFile):
                      # want no type of caching if just downloading whole thing
                      f.cache = BaseCache(0, f.cache.fetcher, f.size)
                  comp = (
                      infer_compression(path)
                      if self.compression == "infer"
                      else self.compression
                  )
                  f = compr[comp](f, mode="rb")
                  block = getattr(f, "blocksize", 5 * 2**20)
                  while True:
                      data = f.read(block)
                      if not data:
                          break
                      f2.write(data)
          else:
              self.fs.get_file(path, fn)
      except BaseException:
          # best-effort cleanup of the partial download; never let a cleanup
          # failure mask the original error, which is re-raised below
          try:
              os.remove(fn)
          except OSError:
              pass
          raise
  def _open(self, path, mode="rb", **kwargs):
      """Open ``path`` through the local cache.

      For reading or appending, a missing local copy is first downloaded with
      ``_get_cached_file_before_open`` if the remote exists.

      - Reading a file that does not exist remotely raises
        ``FileNotFoundError``. Appending proceeds without a local copy.
      - Pure read modes (``"r"`` without ``"+"``) return a plain local file
        object for the cached copy.
      - Every other mode returns a ``LocalTempFile`` on the last storage
        location. It commits automatically unless a transaction is in
        progress.
      """
      path = self._strip_protocol(path)
      digest = self._mapper(path)
      reading = "r" in mode
      if reading or "a" in mode:
          if not self._check_file(path):
              if self.fs.exists(path):
                  self._get_cached_file_before_open(path, **kwargs)
              elif reading:
                  # append does not require an existing file but read does
                  raise FileNotFoundError(path)
          cached = self._check_file(path)
          if reading and "+" not in mode:
              # plain reads are served straight from the cached copy
              return open(cached, mode)
      target = os.path.join(self.storage[-1], digest)
      # autocommit/block_size/cache_options are injected by open(); the rest
      # are caller options that LocalTempFile hands to fs.put on commit
      injected = ["autocommit", "block_size", "cache_options"]
      extra = {}
      for key, val in kwargs.items():
          if key not in injected:
              extra[key] = val
      return LocalTempFile(
          self,
          path,
          mode=mode,
          autocommit=not self._intrans,
          fn=target,
          **extra,
      )
  class LocalTempFile:
      """A temporary local file, which will be uploaded on commit.

      Writes go to the local file ``fn``. ``commit()`` uploads it to ``path``
      with ``fs.put``, forwarding the extra keyword arguments given to the
      constructor. With ``autocommit`` the upload happens on ``close()``.
      Otherwise ``commit()`` or ``discard()`` must be called explicitly.
      Attributes not defined here are forwarded to the local file handle.
      """

      def __init__(self, fs, path, fn, mode="wb", autocommit=True, seek=0, **kwargs):
          self.fn = fn
          self.fh = open(fn, mode)
          self.mode = mode
          if seek:
              self.fh.seek(seek)
          self.path = path
          self.size = None  # never updated by this class
          self.fs = fs
          self.closed = False
          self.autocommit = autocommit
          self.kwargs = kwargs  # forwarded to fs.put on commit

      def __reduce__(self):
          # Always reopen in r+b so writing can continue at the current
          # location without truncating what was already written. The put()
          # kwargs cannot be passed positionally, so they travel as state.
          return (
              LocalTempFile,
              (self.fs, self.path, self.fn, "r+b", self.autocommit, self.tell()),
              {"kwargs": self.kwargs},
          )

      def __setstate__(self, state):
          # explicit, so restoring state never goes through __getattr__
          self.__dict__.update(state)

      def __enter__(self):
          return self.fh

      def __exit__(self, exc_type, exc_val, exc_tb):
          self.close()

      def close(self):
          """Close the local file and upload it if autocommitting; idempotent."""
          if self.closed:
              return
          self.fh.close()
          self.closed = True
          if self.autocommit:
              self.commit()

      def discard(self):
          """Close and delete the local file without uploading it.

          Marks the file as closed, so a later ``close()`` (e.g. from
          ``__exit__``) does not try to upload the deleted file. Safe to call
          more than once.
          """
          self.fh.close()
          self.closed = True
          try:
              os.remove(self.fn)
          except FileNotFoundError:
              pass  # already discarded: nothing left to remove

      def commit(self):
          """Upload the local file to ``self.path`` on ``self.fs``."""
          # calling put() with list arguments avoids path expansion and additional operations
          # like isdir()
          self.fs.put([self.fn], [self.path], **self.kwargs)
          # we do not delete the local copy, it's still in the cache.

      @property
      def name(self):
          return self.fn

      def __repr__(self) -> str:
          return f"LocalTempFile: {self.path}"

      def __getattr__(self, item):
          # Only reached for names not found normally: delegate to the local
          # handle. "fh" itself is guarded so an instance created without
          # __init__ (e.g. via __new__) raises AttributeError instead of
          # recursing forever.
          if item == "fh":
              raise AttributeError(item)
          return getattr(self.fh, item)