| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103 |
- import asyncio
- import asyncio.events
- import functools
- import inspect
- import io
- import numbers
- import os
- import re
- import threading
- from collections.abc import Iterable
- from glob import has_magic
- from typing import TYPE_CHECKING
- from .callbacks import DEFAULT_CALLBACK
- from .exceptions import FSTimeoutError
- from .implementations.local import LocalFileSystem, make_path_posix, trailing_sep
- from .spec import AbstractBufferedFile, AbstractFileSystem
- from .utils import glob_translate, is_exception, other_paths
# Matches names with exactly one leading underscore (e.g. "_ls" but not
# "__init__"); used to pick out the async twins of public sync methods.
private = re.compile(r"_[^_]")

# One-element lists serve as mutable module-level slots: they are updated
# in place, so the global names themselves never need rebinding.
iothread = [None]  # dedicated fsspec IO thread
loop = [None]  # global event loop for any non-async instance
_lock = None  # global lock placeholder; allocated lazily by get_lock()

get_running_loop = asyncio.get_running_loop
def get_lock():
    """Return the module-wide threading lock, creating it on first call.

    The lock is created lazily rather than at import time so that a forked
    process, after ``_lock`` has been reset to None, gets a lock of its own.
    """
    global _lock
    if _lock:
        return _lock
    _lock = threading.Lock()
    return _lock
def reset_lock():
    """Forget the global lock, IO thread and event loop.

    Intended only for the start-up of a forked process: with ``_lock`` back
    at None, the child allocates a fresh lock instead of sharing the parent's.
    """
    global _lock
    _lock = None
    loop[0] = None
    iothread[0] = None
- async def _runner(event, coro, result, timeout=None):
- timeout = timeout if timeout else None # convert 0 or 0.0 to None
- if timeout is not None:
- coro = asyncio.wait_for(coro, timeout=timeout)
- try:
- result[0] = await coro
- except Exception as ex:
- result[0] = ex
- finally:
- event.set()
def sync(loop, func, *args, timeout=None, **kwargs):
    """Run coroutine function ``func`` on ``loop`` and block until it finishes.

    The coroutine executes on the thread that runs ``loop``. The calling
    thread waits in one-second slices so that it can still be interrupted.

    Raises RuntimeError if ``loop`` is missing or closed, NotImplementedError
    when called from inside ``loop`` itself, and FSTimeoutError when the
    (truthy) ``timeout`` expires. Any other exception raised by the coroutine
    is re-raised here.

    Examples
    --------
    >>> fsspec.asyn.sync(fsspec.asyn.get_loop(), func, *args,
    ...                  timeout=timeout, **kwargs)
    """
    # A falsy timeout (None, 0, 0.0) means "wait indefinitely".
    timeout = timeout or None

    # A loop that has not started *yet* is fine: the work is queued and we
    # wait for it. A missing or closed loop can never run it.
    if loop is None or loop.is_closed():
        raise RuntimeError("Loop is not running")

    try:
        current = asyncio.events.get_running_loop()
    except RuntimeError:
        current = None  # no loop running in this thread
    if current is loop:
        # Blocking here would deadlock the very loop we are waiting on.
        raise NotImplementedError("Calling sync() from within a running loop")

    coro = func(*args, **kwargs)
    result = [None]
    event = threading.Event()
    asyncio.run_coroutine_threadsafe(_runner(event, coro, result, timeout), loop)

    remaining = timeout
    while not event.wait(1):
        if remaining is None:
            continue
        remaining -= 1
        if remaining < 0:
            raise FSTimeoutError

    outcome = result[0]
    if isinstance(outcome, asyncio.TimeoutError):
        # Present asyncio timeouts to callers as fsspec's own timeout error.
        raise FSTimeoutError from outcome
    if isinstance(outcome, BaseException):
        raise outcome
    return outcome
def sync_wrapper(func, obj=None):
    """Wrap coroutine function ``func`` so it can be called in blocking contexts.

    Leave ``obj=None`` when defining the wrapper within a class: the owning
    instance is then taken from the first positional argument (``self``).
    Pass the instance explicitly when attaching the wrapper as an attribute
    of that instance. Either way the call is run via ``sync`` on the owning
    instance's ``loop``.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Test against None rather than truthiness, so an instance that is
        # falsy (e.g. defines __len__ returning 0) is still used as the owner.
        self = obj if obj is not None else args[0]
        return sync(self.loop, func, *args, **kwargs)

    return wrapper
def get_loop():
    """Return the shared fsspec IO event loop, starting it on first use.

    The loop runs forever on a daemon thread named "fsspecIO". The loop is
    stored in ``loop[0]`` and the thread in ``iothread[0]``.
    """
    if loop[0] is not None:
        return loop[0]
    with get_lock():
        # Another thread may have created the loop while this one waited for
        # the lock, so test again before creating a second one.
        if loop[0] is None:
            loop[0] = asyncio.new_event_loop()
            worker = threading.Thread(
                target=loop[0].run_forever, name="fsspecIO", daemon=True
            )
            worker.start()
            iothread[0] = worker
    return loop[0]
def reset_after_fork():
    """Drop the IO loop, IO thread and lock inherited by a forked child.

    Registered below with ``os.register_at_fork``. The child then creates a
    fresh loop and lock the first time it needs them.
    """
    # The module-level lock lives in ``_lock``. Declaring ``global lock``
    # (as before) bound an unrelated name and left the child holding the
    # parent's lock object, which may have been held by a parent thread
    # that does not exist in the child.
    global _lock
    loop[0] = None
    iothread[0] = None
    _lock = None


if hasattr(os, "register_at_fork"):
    # should be posix; this will do nothing for spawn or forkserver subprocesses
    os.register_at_fork(after_in_child=reset_after_fork)
# ``resource`` (POSIX only) lets batch sizes follow the open-file limit.
# At runtime it may be unavailable (ImportError), in which case fixed
# defaults are used. Type checkers always see the real module.
if not TYPE_CHECKING:
    try:
        import resource
    except ImportError:
        resource = None
        ResourceError = OSError
    else:
        ResourceError = getattr(resource, "error", OSError)
else:
    import resource

    ResourceError = resource.error

# Fallback concurrency when neither config nor the file limit decides.
_DEFAULT_BATCH_SIZE = 128
# Default concurrency for operations flagged ``nofiles`` (no local files).
_NOFILES_DEFAULT_BATCH_SIZE = 1280
def _get_batch_size(nofiles=False):
    """Pick how many coroutines to run at once when no batch size was given.

    Parameters
    ----------
    nofiles: bool
        True for operations that do not touch local files; these use the
        larger ``nofiles`` defaults.

    Returns
    -------
    int
        Resolved in this order:

        1. ``conf["nofiles_gather_batch_size"]`` (if ``nofiles``) or
           ``conf["gather_batch_size"]`` from ``fsspec.config``;
        2. ``_NOFILES_DEFAULT_BATCH_SIZE`` when ``nofiles``;
        3. ``_DEFAULT_BATCH_SIZE`` if the open-file limit cannot be read;
        4. -1 (no throttling) if the soft limit is unlimited, otherwise one
           eighth of the soft limit, but never less than 1.
    """
    from fsspec.config import conf

    key = "nofiles_gather_batch_size" if nofiles else "gather_batch_size"
    if key in conf:
        return conf[key]

    if nofiles:
        return _NOFILES_DEFAULT_BATCH_SIZE
    if resource is None:
        return _DEFAULT_BATCH_SIZE

    try:
        soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
    except (ImportError, ValueError, ResourceError):
        return _DEFAULT_BATCH_SIZE

    if soft_limit == resource.RLIM_INFINITY:
        return -1
    # A soft limit below 8 would give 0, which _run_coros_in_chunks rejects;
    # always allow at least one coroutine in flight.
    return max(soft_limit // 8, 1)
def running_async() -> bool:
    """Return True when called from code running inside an event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True
async def _run_coros_in_chunks(
    coros,
    batch_size=None,
    callback=DEFAULT_CALLBACK,
    timeout=None,
    return_exceptions=False,
    nofiles=False,
):
    """Run the given coroutines in chunks.

    Parameters
    ----------
    coros: list of coroutines to run
    batch_size: int or None
        Number of coroutines to submit/wait on simultaneously.
        If -1, then there is no throttling. If None, it is
        inferred from _get_batch_size()
    callback: fsspec.callbacks.Callback instance
        Gets a relative_update when each coroutine completes
    timeout: number or None
        If given, each coroutine times out after this time. Note that, since
        there are multiple batches, the total run time of this function will in
        general be longer
    return_exceptions: bool
        Same meaning as in asyncio.gather
    nofiles: bool
        If inferring the batch_size: True when the operation involves no
        local files, which allows larger batches.

    Returns
    -------
    list
        One result (or exception, with ``return_exceptions``) per coroutine,
        in the same order as ``coros``.

    Raises
    ------
    ValueError
        If the batch size resolves to a non-positive value other than -1.
    """
    if batch_size is None:
        batch_size = _get_batch_size(nofiles=nofiles)

    if batch_size == -1:
        # No throttling: submit everything at once. ``max`` keeps an empty
        # ``coros`` list valid instead of producing a batch size of 0.
        batch_size = max(len(coros), 1)

    # An explicit check rather than ``assert``: under ``python -O`` a batch
    # size of 0 would otherwise skip every coroutine and return all Nones.
    if batch_size <= 0:
        raise ValueError(
            f"batch_size must be a positive integer or -1, got {batch_size!r}"
        )

    async def _run_coro(coro, i):
        # Returns (outcome, original index) so results can be placed in order.
        try:
            return await asyncio.wait_for(coro, timeout=timeout), i
        except Exception as e:
            if not return_exceptions:
                raise
            return e, i
        finally:
            callback.relative_update(1)

    i = 0
    n = len(coros)
    results = [None] * n
    pending = set()

    while pending or i < n:
        # Top up the in-flight set to at most ``batch_size`` tasks.
        while len(pending) < batch_size and i < n:
            pending.add(asyncio.ensure_future(_run_coro(coros[i], i)))
            i += 1

        if not pending:
            break

        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        while done:
            result, k = await done.pop()
            results[k] = result

    return results
# Coroutine methods an async-capable backend is expected to implement;
# mirror_sync_methods() derives the blocking public versions from them.
async_methods = """
    _ls _cat_file _get_file _put_file _rm_file _cp_file _pipe_file
    _expand_path _info _isfile _isdir _exists _walk _glob _find _du
    _size _mkdir _makedirs
""".split()
class AsyncFileSystem(AbstractFileSystem):
    """Async file operations, default implementations

    Bulk operations run their per-path coroutines concurrently, in batches
    (see ``batch_size``).

    Implementations that have concurrent batch operations and/or async methods
    should inherit from this class instead of AbstractFileSystem. Docstrings are
    copied from the un-underscored method in AbstractFileSystem, if not given.
    """

    # note that methods do not have docstring here; they will be copied
    # for _* methods and inferred for overridden methods. Explanations in the
    # _* methods below are therefore written as comments, not docstrings.

    async_impl = True
    # NOTE(review): presumably consulted outside this module to decide whether
    # to call mirror_sync_methods() on instances -- confirm in spec.py.
    mirror_sync_methods = True
    disable_throttling = False  # not referenced within this module

    def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
        # asynchronous=True: the caller awaits the _* coroutines on its own
        # loop, so none is stored. Otherwise blocking calls run on ``loop``,
        # defaulting to the shared fsspec IO loop.
        self.asynchronous = asynchronous
        self._pid = os.getpid()
        if not asynchronous:
            self._loop = loop or get_loop()
        else:
            self._loop = None
        self.batch_size = batch_size
        super().__init__(*args, **kwargs)

    @property
    def loop(self):
        # The stored loop belongs to the process that created this instance.
        if self._pid != os.getpid():
            raise RuntimeError("This class is not fork-safe")
        return self._loop

    async def _rm_file(self, path, **kwargs):
        # Fall back to a subclass-provided async _rm on a single path. The
        # base _rm calls _rm_file, so delegating to it would recurse forever.
        if (
            inspect.iscoroutinefunction(self._rm)
            and type(self)._rm is not AsyncFileSystem._rm
        ):
            return await self._rm(path, recursive=False, batch_size=1, **kwargs)
        raise NotImplementedError

    async def _rm(self, path, recursive=False, batch_size=None, **kwargs):
        # TODO: implement on_error
        batch_size = batch_size or self.batch_size
        path = await self._expand_path(path, recursive=recursive)
        # _expand_path returns a sorted list; reversing it submits children
        # (e.g. "a/b") ahead of their parent ("a").
        return await _run_coros_in_chunks(
            [self._rm_file(p, **kwargs) for p in reversed(path)],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _cp_file(self, path1, path2, **kwargs):
        raise NotImplementedError

    async def _mv_file(self, path1, path2):
        # Move implemented as copy followed by deletion of the source.
        await self._cp_file(path1, path2)
        await self._rm_file(path1)

    async def _copy(
        self,
        path1,
        path2,
        recursive=False,
        on_error=None,
        maxdepth=None,
        batch_size=None,
        **kwargs,
    ):
        # By default, missing sources are tolerated only for recursive copies.
        if on_error is None and recursive:
            on_error = "ignore"
        elif on_error is None:
            on_error = "raise"

        if isinstance(path1, list) and isinstance(path2, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            paths1 = path1
            paths2 = path2
        else:
            source_is_str = isinstance(path1, str)
            paths1 = await self._expand_path(
                path1, maxdepth=maxdepth, recursive=recursive
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                paths1 = [
                    p for p in paths1 if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not paths1:
                    return

            source_is_file = len(paths1) == 1
            dest_is_dir = isinstance(path2, str) and (
                trailing_sep(path2) or await self._isdir(path2)
            )

            exists = source_is_str and (
                (has_magic(path1) and source_is_file)
                or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
            )
            paths2 = other_paths(
                paths1,
                path2,
                exists=exists,
                flatten=not source_is_str,
            )

        batch_size = batch_size or self.batch_size
        coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths1, paths2)]
        result = await _run_coros_in_chunks(
            coros, batch_size=batch_size, return_exceptions=True, nofiles=True
        )

        for ex in filter(is_exception, result):
            if on_error == "ignore" and isinstance(ex, FileNotFoundError):
                continue
            raise ex

    async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
        raise NotImplementedError

    async def _pipe(self, path, value=None, batch_size=None, **kwargs):
        # A single path is normalised to a one-entry {path: value} mapping.
        if isinstance(path, str):
            path = {path: value}
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._pipe_file(k, v, **kwargs) for k, v in path.items()],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _process_limits(self, url, start, end):
        """Helper for "Range"-based _cat_file

        Returns an HTTP ``Range`` value of the form ``"bytes=<start>-<end>"``
        for the half-open interval [start, end) (the header's end is
        inclusive). A negative ``start`` with ``end=None`` becomes a suffix
        range ("bytes=-N"). Other negative values are counted from the end
        of the file, whose size is fetched via ``_info``.
        """
        size = None
        suff = False
        if start is not None and start < 0:
            # if start is negative and end None, end is the "suffix length"
            if end is None:
                end = -start
                start = ""
                suff = True
            else:
                size = size or (await self._info(url))["size"]
                start = size + start
        elif start is None:
            start = 0
        if not suff:
            if end is not None and end < 0:
                if start is not None:
                    size = size or (await self._info(url))["size"]
                    end = size + end
            elif end is None:
                end = ""
            if isinstance(end, numbers.Integral):
                end -= 1  # bytes range is inclusive
        return f"bytes={start}-{end}"

    async def _cat_file(self, path, start=None, end=None, **kwargs):
        raise NotImplementedError

    async def _cat(
        self, path, recursive=False, on_error="raise", batch_size=None, **kwargs
    ):
        paths = await self._expand_path(path, recursive=recursive)
        coros = [self._cat_file(p, **kwargs) for p in paths]
        batch_size = batch_size or self.batch_size
        out = await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )
        if on_error == "raise":
            ex = next(filter(is_exception, out), False)
            if ex:
                raise ex
        # A {path: contents} mapping unless exactly one literal path was
        # requested, in which case its contents are returned directly.
        if (
            len(paths) > 1
            or isinstance(path, list)
            or paths[0] != self._strip_protocol(path)
        ):
            return {
                k: v
                for k, v in zip(paths, out)
                if on_error != "omit" or not is_exception(v)
            }
        else:
            return out[0]

    async def _cat_ranges(
        self,
        paths,
        starts,
        ends,
        max_gap=None,
        batch_size=None,
        on_error="return",
        **kwargs,
    ):
        """Get the contents of byte ranges from one or more files

        Parameters
        ----------
        paths: list
            A list of filepaths on this filesystem
        starts, ends: int or list
            Bytes limits of the read. If using a single int, the same value will be
            used to read all the specified files.

        Returns
        -------
        list
            The bytes read for each range (or the exception raised for it),
            in the same order as ``paths``.
        """
        # TODO: on_error
        if max_gap is not None:
            # use utils.merge_offset_ranges
            raise NotImplementedError
        if not isinstance(paths, list):
            raise TypeError
        if not isinstance(starts, Iterable):
            starts = [starts] * len(paths)
        if not isinstance(ends, Iterable):
            ends = [ends] * len(paths)
        if len(starts) != len(paths) or len(ends) != len(paths):
            raise ValueError
        coros = [
            self._cat_file(p, start=s, end=e, **kwargs)
            for p, s, e in zip(paths, starts, ends)
        ]
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )

    async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
        raise NotImplementedError

    async def _put(
        self,
        lpath,
        rpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        batch_size=None,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) from local.

        Copies a specific file or tree of files (if recursive=True). If rpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within.

        The put_file method will be called concurrently on a batch of files. The
        batch_size option can configure the amount of futures that can be executed
        at the same time. If it is -1, then all the files will be uploaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(lpath, str)
            if source_is_str:
                lpath = make_path_posix(lpath)
            fs = LocalFileSystem()
            lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
                if not lpaths:
                    return

            source_is_file = len(lpaths) == 1
            dest_is_dir = isinstance(rpath, str) and (
                trailing_sep(rpath) or await self._isdir(rpath)
            )

            rpath = self._strip_protocol(rpath)
            exists = source_is_str and (
                (has_magic(lpath) and source_is_file)
                or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
            )
            rpaths = other_paths(
                lpaths,
                rpath,
                exists=exists,
                flatten=not source_is_str,
            )

        # Local directories become remote directories; only files are uploaded.
        is_dir = {lp: os.path.isdir(lp) for lp in lpaths}
        rdirs = [rp for lp, rp in zip(lpaths, rpaths) if is_dir[lp]]
        file_pairs = [(lp, rp) for lp, rp in zip(lpaths, rpaths) if not is_dir[lp]]

        await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs])
        batch_size = batch_size or self.batch_size

        coros = []
        callback.set_size(len(file_pairs))
        for lfile, rfile in file_pairs:
            put_file = callback.branch_coro(self._put_file)
            coros.append(put_file(lfile, rfile, **kwargs))

        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _get_file(self, rpath, lpath, **kwargs):
        raise NotImplementedError

    async def _get(
        self,
        rpath,
        lpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) to local.

        Copies a specific file or tree of files (if recursive=True). If lpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within. Can submit a list of paths, which may be glob-patterns
        and will be expanded.

        The get_file method will be called concurrently on a batch of files. The
        batch_size option can configure the amount of futures that can be executed
        at the same time. If it is -1, then all the files will be downloaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(rpath, str)
            # First check for rpath trailing slash as _strip_protocol removes it.
            source_not_trailing_sep = source_is_str and not trailing_sep(rpath)
            rpath = self._strip_protocol(rpath)
            rpaths = await self._expand_path(
                rpath, recursive=recursive, maxdepth=maxdepth
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                rpaths = [
                    p for p in rpaths if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not rpaths:
                    return

            lpath = make_path_posix(lpath)
            source_is_file = len(rpaths) == 1
            dest_is_dir = isinstance(lpath, str) and (
                trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
            )

            exists = source_is_str and (
                (has_magic(rpath) and source_is_file)
                or (not has_magic(rpath) and dest_is_dir and source_not_trailing_sep)
            )
            lpaths = other_paths(
                rpaths,
                lpath,
                exists=exists,
                flatten=not source_is_str,
            )

        # Create each target's parent directory. A bare relative filename
        # (possible with list input) has an empty dirname, and
        # os.makedirs("") raises FileNotFoundError, so skip those.
        for lp in lpaths:
            parent = os.path.dirname(lp)
            if parent:
                os.makedirs(parent, exist_ok=True)
        # Same resolution as the other bulk methods: an explicit falsy value
        # falls back to the instance default.
        batch_size = kwargs.pop("batch_size", None) or self.batch_size

        coros = []
        callback.set_size(len(lpaths))
        for lfile, rfile in zip(lpaths, rpaths):
            get_file = callback.branch_coro(self._get_file)
            coros.append(get_file(rfile, lfile, **kwargs))
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _isfile(self, path):
        try:
            return (await self._info(path))["type"] == "file"
        except Exception:
            # Any lookup failure means "not a file". Catching Exception (not a
            # bare except) lets asyncio.CancelledError and KeyboardInterrupt,
            # both BaseExceptions, propagate.
            return False

    async def _isdir(self, path):
        try:
            return (await self._info(path))["type"] == "directory"
        except OSError:
            return False

    async def _size(self, path):
        return (await self._info(path)).get("size", None)

    async def _sizes(self, paths, batch_size=None):
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._size(p) for p in paths], batch_size=batch_size
        )

    async def _exists(self, path, **kwargs):
        try:
            await self._info(path, **kwargs)
            return True
        except FileNotFoundError:
            return False

    async def _info(self, path, **kwargs):
        raise NotImplementedError

    async def _ls(self, path, detail=True, **kwargs):
        raise NotImplementedError

    async def _walk(self, path, maxdepth=None, on_error="omit", **kwargs):
        # Async generator yielding (path, dirs, files) for path and then,
        # depth-first, for each subdirectory. on_error: "raise" re-raises
        # listing errors, a callable is called with them, anything else
        # skips them; in the latter two cases the failed directory is
        # yielded as empty.
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        path = self._strip_protocol(path)
        full_dirs = {}
        dirs = {}
        files = {}

        detail = kwargs.pop("detail", False)
        try:
            listing = await self._ls(path, detail=True, **kwargs)
        except (FileNotFoundError, OSError) as e:
            if on_error == "raise":
                raise
            elif callable(on_error):
                on_error(e)
            if detail:
                yield path, {}, {}
            else:
                yield path, [], []
            return

        for info in listing:
            # each info name must be at least [path]/part , but here
            # we check also for names like [path]/part/
            pathname = info["name"].rstrip("/")
            name = pathname.rsplit("/", 1)[-1]
            if info["type"] == "directory" and pathname != path:
                # do not include "self" path
                full_dirs[name] = pathname
                dirs[name] = info
            elif pathname == path:
                # file-like with same name as give path
                files[""] = info
            else:
                files[name] = info

        if detail:
            yield path, dirs, files
        else:
            yield path, list(dirs), list(files)

        if maxdepth is not None:
            maxdepth -= 1
            if maxdepth < 1:
                return

        for d in dirs:
            # Forward on_error so the error policy covers the whole tree, not
            # only the top-level listing.
            async for _ in self._walk(
                full_dirs[d],
                maxdepth=maxdepth,
                detail=detail,
                on_error=on_error,
                **kwargs,
            ):
                yield _

    async def _glob(self, path, maxdepth=None, **kwargs):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        import re

        seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,)
        ends_with_sep = path.endswith(seps)  # _strip_protocol strips trailing slash
        path = self._strip_protocol(path)
        append_slash_to_dirname = ends_with_sep or path.endswith(
            tuple(sep + "**" for sep in seps)
        )
        # Position of the first glob metacharacter (len(path) if none).
        idx_star = path.find("*") if path.find("*") >= 0 else len(path)
        idx_qmark = path.find("?") if path.find("?") >= 0 else len(path)
        idx_brace = path.find("[") if path.find("[") >= 0 else len(path)

        min_idx = min(idx_star, idx_qmark, idx_brace)

        detail = kwargs.pop("detail", False)
        withdirs = kwargs.pop("withdirs", True)

        if not has_magic(path):
            if await self._exists(path, **kwargs):
                if not detail:
                    return [path]
                else:
                    return {path: await self._info(path, **kwargs)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:min_idx]:
            # Search from the last literal directory before the first wildcard.
            min_idx = path[:min_idx].rindex("/")
            root = path[: min_idx + 1]
            depth = path[min_idx + 1 :].count("/") + 1
        else:
            root = ""
            depth = path[min_idx + 1 :].count("/") + 1

        if "**" in path:
            if maxdepth is not None:
                idx_double_stars = path.find("**")
                depth_double_stars = path[idx_double_stars:].count("/") + 1
                depth = depth - depth_double_stars + maxdepth
            else:
                depth = None

        allpaths = await self._find(
            root, maxdepth=depth, withdirs=withdirs, detail=True, **kwargs
        )

        pattern = glob_translate(path + ("/" if ends_with_sep else ""))
        pattern = re.compile(pattern)

        out = {
            p: info
            for p, info in sorted(allpaths.items())
            if pattern.match(
                p + "/"
                if append_slash_to_dirname and info["type"] == "directory"
                else p
            )
        }

        if detail:
            return out
        else:
            return list(out)

    async def _du(self, path, total=True, maxdepth=None, **kwargs):
        # Sum of file sizes (total=True) or a {name: size} mapping.
        sizes = {}
        # async for?
        for f in await self._find(path, maxdepth=maxdepth, **kwargs):
            info = await self._info(f)
            sizes[info["name"]] = info["size"]
        if total:
            return sum(sizes.values())
        else:
            return sizes

    async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
        path = self._strip_protocol(path)
        out = {}
        detail = kwargs.pop("detail", False)

        # Add the root directory if withdirs is requested
        # This is needed for posix glob compliance
        if withdirs and path != "" and await self._isdir(path):
            out[path] = await self._info(path)

        # async for?
        async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs):
            if withdirs:
                files.update(dirs)
            out.update({info["name"]: info for name, info in files.items()})
        if not out and (await self._isfile(path)):
            # walk works on directories, but find should also return [path]
            # when path happens to be a file. With detail=True return its
            # real info; an empty dict would make callers such as _glob fail
            # on info["type"].
            out[path] = (await self._info(path)) if detail else {}
        names = sorted(out)
        if not detail:
            return names
        else:
            return {name: out[name] for name in names}

    async def _expand_path(self, path, recursive=False, maxdepth=None):
        # Resolve a path, glob or list of them into a sorted list of concrete
        # paths; raises FileNotFoundError if nothing matches.
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        if isinstance(path, str):
            out = await self._expand_path([path], recursive, maxdepth)
        else:
            out = set()
            path = [self._strip_protocol(p) for p in path]
            for p in path:  # can gather here
                if has_magic(p):
                    bit = set(await self._glob(p, maxdepth=maxdepth))
                    out |= bit
                    if recursive:
                        # glob call above expanded one depth so if maxdepth is defined
                        # then decrement it in expand_path call below. If it is zero
                        # after decrementing then avoid expand_path call.
                        if maxdepth is not None and maxdepth <= 1:
                            continue
                        out |= set(
                            await self._expand_path(
                                list(bit),
                                recursive=recursive,
                                maxdepth=maxdepth - 1 if maxdepth is not None else None,
                            )
                        )
                    continue
                elif recursive:
                    rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True))
                    out |= rec
                if p not in out and (recursive is False or (await self._exists(p))):
                    # should only check once, for the root
                    out.add(p)
        if not out:
            raise FileNotFoundError(path)
        return sorted(out)

    async def _mkdir(self, path, create_parents=True, **kwargs):
        pass  # not necessary to implement, may not have directories

    async def _makedirs(self, path, exist_ok=False):
        pass  # not necessary to implement, may not have directories

    async def open_async(self, path, mode="rb", **kwargs):
        # Only binary, uncompressed modes are accepted; backends supporting
        # async file objects override this.
        if "b" not in mode or kwargs.get("compression"):
            raise ValueError
        raise NotImplementedError
def mirror_sync_methods(obj):
    """Attach blocking wrappers for the async methods of ``obj``.

    For every single-underscore name ``_x`` listed in ``async_methods`` or
    defined on ``AsyncFileSystem``: if ``obj._x`` is a coroutine function and
    ``obj.x`` is still the unmodified ``AbstractFileSystem.x``, then ``obj.x``
    is replaced by ``sync_wrapper(obj._x, obj=obj)``. A wrapper that ends up
    without a docstring borrows the one from ``AbstractFileSystem.x``.
    """
    from fsspec import AbstractFileSystem

    for async_name in async_methods + dir(AsyncFileSystem):
        # Only "_x" names (not dunders or public names) have a public twin.
        if not private.match(async_name):
            continue
        sync_name = async_name[1:]

        is_coroutine = inspect.iscoroutinefunction(getattr(obj, async_name, None))
        current_sync = getattr(getattr(obj, sync_name, False), "__func__", None)
        inherits_default = current_sync is getattr(AbstractFileSystem, sync_name, "")
        if not (is_coroutine and inherits_default):
            continue

        blocking = sync_wrapper(getattr(obj, async_name), obj=obj)
        setattr(obj, sync_name, blocking)
        if not blocking.__doc__:
            blocking.__doc__ = getattr(
                getattr(AbstractFileSystem, sync_name, None), "__doc__", ""
            )
class FSSpecCoroutineCancel(Exception):
    """Default exception thrown into coroutines by ``_dump_running_tasks``."""
def _dump_running_tasks(
    printout=True, cancel=True, exc=FSSpecCoroutineCancel, with_task=False
):
    """Inspect, and optionally force-cancel, unfinished tasks on the fsspec loop.

    Debugging aid. It reads private asyncio attributes (``Task._coro``,
    ``Task._callbacks``). NOTE(review): these are implementation details
    that may differ between Python versions.

    Parameters
    ----------
    printout: bool
        If True, print each task's stack via ``Task.print_stack``.
    cancel: bool
        If True, for each task: cancel it, set ``exc`` as its exception,
        invoke its previously registered done-callbacks, and throw ``exc``
        into its coroutine.
    exc: exception class
        Exception used for the forced cancellation.
    with_task: bool
        If True, include the Task object itself in each returned record.

    Returns
    -------
    list of dict
        One record per unfinished task, with keys "locals", "file",
        "firstline", "linelo", "stack" and "task" (None unless
        ``with_task``). All are taken from the coroutine's current frame.
    """
    import traceback

    # Only tasks of the shared fsspec IO loop (loop[0]) that are still pending.
    tasks = [t for t in asyncio.tasks.all_tasks(loop[0]) if not t.done()]
    if printout:
        [task.print_stack() for task in tasks]
    out = [
        {
            "locals": task._coro.cr_frame.f_locals,
            "file": task._coro.cr_frame.f_code.co_filename,
            "firstline": task._coro.cr_frame.f_code.co_firstlineno,
            "linelo": task._coro.cr_frame.f_lineno,
            "stack": traceback.format_stack(task._coro.cr_frame),
            "task": task if with_task else None,
        }
        for task in tasks
    ]
    if cancel:
        for t in tasks:
            # Grab the callbacks before completing the task below.
            cbs = t._callbacks
            t.cancel()
            # Call the base Future methods directly on the Task to force its
            # state (NOTE(review): Task overrides set_exception to refuse it).
            asyncio.futures.Future.set_exception(t, exc)
            asyncio.futures.Future.cancel(t)
            [cb[0](t) for cb in cbs]  # cancels any dependent concurrent.futures
            try:
                t._coro.throw(exc)  # exits coro, unless explicitly handled
            except exc:
                pass
    return out
class AbstractAsyncStreamedFile(AbstractBufferedFile):
    """Async streamed file with no read buffering that always auto-commits.

    Reads go straight to ``_fetch_range``. Writes accumulate in ``buffer``
    and are sent through ``_upload_chunk`` once a block fills, and finally
    on ``close``. Subclasses implement ``_fetch_range`` and
    ``_upload_chunk``, and optionally ``_initiate_upload``.
    """

    # TODO: readahead might still be useful here, but needs async version

    async def read(self, length=-1):
        """Read and return up to ``length`` bytes from the current position.

        Parameters
        ----------
        length: int (-1)
            Number of bytes to read; None or a negative value means all
            remaining bytes.
        """
        length = int(length) if length is not None else -1
        if self.mode != "rb":
            raise ValueError("File not in read mode")
        if length < 0:
            length = self.size - self.loc
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if length == 0:
            return b""  # nothing requested, skip the fetch entirely
        chunk = await self._fetch_range(self.loc, self.loc + length)
        self.loc += len(chunk)
        return chunk

    async def write(self, data):
        """Append ``data`` to the write buffer and return the bytes accepted.

        The buffer is uploaded by ``flush()``, which is triggered here once
        the buffered amount reaches ``blocksize``.

        Parameters
        ----------
        data: bytes
            Set of bytes to be written.
        """
        if self.mode not in {"wb", "ab"}:
            raise ValueError("File not in write mode")
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if self.forced:
            raise ValueError("This file has been force-flushed, can only close")
        accepted = self.buffer.write(data)
        self.loc += accepted
        if self.buffer.tell() >= self.blocksize:
            await self.flush()
        return accepted

    async def close(self):
        """Close the file: finalise pending writes, or drop the read cache."""
        if getattr(self, "_unclosable", False) or self.closed:
            return
        if self.mode != "rb":
            if not self.forced:
                await self.flush(force=True)
            if self.fs is not None:
                self.fs.invalidate_cache(self.path)
                self.fs.invalidate_cache(self.fs._parent(self.path))
        else:
            self.cache = None
        self.closed = True

    async def flush(self, force=False):
        """Upload buffered data.

        Without ``force``, data is only sent once a full block is buffered.
        ``force=True`` sends whatever remains as the final chunk and may be
        used only once per file.
        """
        if self.closed:
            raise ValueError("Flush on closed file")
        if force:
            if self.forced:
                raise ValueError("Force flush cannot be called more than once")
            self.forced = True

        if self.mode not in {"wb", "ab"}:
            return  # flushing a read-mode file does nothing
        if not force and self.buffer.tell() < self.blocksize:
            return  # hold back small writes until a block accumulates

        if self.offset is None:
            # First upload for this file: start the (multipart) upload.
            self.offset = 0
            try:
                await self._initiate_upload()
            except BaseException:
                self.closed = True
                raise

        if await self._upload_chunk(final=force) is not False:
            self.offset += self.buffer.seek(0, 2)
            self.buffer = io.BytesIO()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def _fetch_range(self, start, end):
        """Return the bytes in [start, end); implemented by subclasses."""
        raise NotImplementedError

    async def _initiate_upload(self):
        """Hook called before the first chunk is uploaded; no-op by default."""

    async def _upload_chunk(self, final=False):
        """Send the buffer's contents; returning False keeps the buffer."""
        raise NotImplementedError
|