| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500 |
- """Storage providers backends for Memory caching."""
- import collections
- import datetime
- import json
- import operator
- import os
- import os.path
- import re
- import shutil
- import threading
- import time
- import uuid
- import warnings
- from abc import ABCMeta, abstractmethod
- from pickle import PicklingError
- from . import numpy_pickle
- from .backports import concurrency_safe_rename
- from .disk import memstr_to_bytes, mkdirp, rm_subdirs
- from .logger import format_time
# Record describing one cached item: the path identifying it in the store,
# its size and the time it was last accessed.
CacheItemInfo = collections.namedtuple(
    "CacheItemInfo", ["path", "size", "last_access"]
)
class CacheWarning(Warning):
    """Warning category for cache dump failures other than PicklingError."""
def concurrency_safe_write(object_to_write, filename, write_func):
    """Write an object to a uniquely named temporary file.

    The temporary name is ``filename`` followed by a suffix made of a random
    UUID, the current process id and the id of the current thread object, so
    that concurrent writers do not collide. The UUID part protects against
    collisions even when the cache folder is shared by several Python
    processes having the same pid and thread id, e.g. on different nodes of
    a cluster.

    Parameters
    ----------
    object_to_write: object
        The object handed over to ``write_func``.
    filename: string
        The final destination name; only used as a prefix here.
    write_func: callable
        Called as ``write_func(object_to_write, temporary_filename)`` to
        perform the actual write.

    Returns
    -------
    The name of the temporary file that was written. Moving it to
    ``filename`` is left to the caller.
    """
    unique_suffix = "-".join(
        (
            uuid.uuid4().hex,
            str(os.getpid()),
            str(id(threading.current_thread())),
        )
    )
    temporary_filename = "{}.{}".format(filename, unique_suffix)
    write_func(object_to_write, temporary_filename)
    return temporary_filename
class StoreBackendBase(metaclass=ABCMeta):
    """Helper Abstract Base Class which defines all methods that
    a StorageBackend must implement."""

    # Base location of the store; concrete backends assign it.
    location = None

    @abstractmethod
    def _open_item(self, f, mode):
        """Opens an item on the store and return a file-like object.

        This method is private and only used by the StoreBackendMixin object.

        Parameters
        ----------
        f: string
            The location of the item to open. StoreBackendMixin passes the
            path of the item here.
        mode: string
            The mode in which the item is opened. Allowed values are
            'rb' and 'wb'.

        Returns
        -------
        a file-like object
        """

    @abstractmethod
    def _item_exists(self, location):
        """Checks if an item location exists in the store.

        This method is private and only used by the StoreBackendMixin object.

        Parameters
        ----------
        location: string
            The location of an item. On a filesystem, this corresponds to the
            absolute path, including the filename, of a file.

        Returns
        -------
        True if the item exists, False otherwise
        """

    @abstractmethod
    def _move_item(self, src, dst):
        """Moves an item from src to dst in the store.

        This method is private and only used by the StoreBackendMixin object.

        Parameters
        ----------
        src: string
            The source location of an item
        dst: string
            The destination location of an item
        """

    @abstractmethod
    def create_location(self, location):
        """Creates a location on the store.

        Parameters
        ----------
        location: string
            The location in the store. On a filesystem, this corresponds to a
            directory.
        """

    @abstractmethod
    def clear_location(self, location):
        """Clears a location on the store.

        Parameters
        ----------
        location: string
            The location in the store. On a filesystem, this corresponds to a
            directory or a filename absolute path
        """

    @abstractmethod
    def get_items(self):
        """Returns the whole list of items available in the store.

        Returns
        -------
        The list of items identified by their ids (e.g filename in a
        filesystem).
        """

    @abstractmethod
    def configure(self, location, verbose=0, backend_options=None):
        """Configures the store.

        Parameters
        ----------
        location: string
            The base location used by the store. On a filesystem, this
            corresponds to a directory.
        verbose: int
            The level of verbosity of the store
        backend_options: dict, optional
            Contains a dictionary of named parameters used to configure the
            store backend. The default is None rather than a dict literal so
            that no mutable object is shared between calls; implementations
            should treat None as "no options".
        """
class StoreBackendMixin:
    """Class providing all logic for managing the store in a generic way.

    The StoreBackend subclass has to implement the public methods
    create_location, clear_location, get_items and configure. The
    StoreBackend also has to provide private _open_item, _item_exists and
    _move_item methods. The _open_item method has to have the same signature
    as the builtin open and return a file-like object.

    The methods below read the ``location``, ``compress`` and ``verbose``
    attributes of the instance, and the ``mmap_mode`` attribute when present.
    """

    def load_item(self, call_id, verbose=1, timestamp=None, metadata=None):
        """Load an item from the store given its id as a list of str.

        Parameters
        ----------
        call_id: list of str
            Path components of the item, relative to the store location.
        verbose: int
            Above 1 a loading message is printed; from 10 on, the message
            also contains the full path of the item.
        timestamp: float, optional
            Reference time (as returned by ``time.time()``) used to print the
            elapsed time in the loading message.
        metadata: dict, optional
            When it holds an "input_args" mapping, the arguments are appended
            to the function name in the loading message.

        Returns
        -------
        The object stored in the item's "output.pkl" file.

        Raises
        ------
        KeyError
            If the item's "output.pkl" file does not exist in the store.
        """
        full_path = os.path.join(self.location, *call_id)

        if verbose > 1:
            ts_string = (
                "{: <16}".format(format_time(time.time() - timestamp))
                if timestamp is not None
                else ""
            )
            signature = os.path.basename(call_id[0])
            if metadata is not None and "input_args" in metadata:
                kwargs = ", ".join(
                    "{}={}".format(*item) for item in metadata["input_args"].items()
                )
                signature += "({})".format(kwargs)
            msg = "[Memory]{}: Loading {}".format(ts_string, signature)
            if verbose < 10:
                print("{0}...".format(msg))
            else:
                print("{0} from {1}".format(msg, full_path))

        # Not every backend defines mmap_mode: fall back to None.
        mmap_mode = getattr(self, "mmap_mode", None)

        filename = os.path.join(full_path, "output.pkl")
        if not self._item_exists(filename):
            raise KeyError(
                "Non-existing item (may have been "
                "cleared).\nFile %s does not exist" % filename
            )

        # file-like object cannot be used when mmap_mode is set
        if mmap_mode is None:
            with self._open_item(filename, "rb") as f:
                item = numpy_pickle.load(f)
        else:
            item = numpy_pickle.load(filename, mmap_mode=mmap_mode)
        return item

    def dump_item(self, call_id, item, verbose=1):
        """Dump an item in the store at the id given as a list of str.

        Failures never propagate to the caller: a PicklingError raised while
        dumping is reported as a FutureWarning and any other exception as a
        CacheWarning.
        """
        try:
            item_path = os.path.join(self.location, *call_id)
            if not self._item_exists(item_path):
                self.create_location(item_path)
            filename = os.path.join(item_path, "output.pkl")
            if verbose > 10:
                print("Persisting in %s" % item_path)

            def write_func(to_write, dest_filename):
                with self._open_item(dest_filename, "wb") as f:
                    try:
                        numpy_pickle.dump(to_write, f, compress=self.compress)
                    except PicklingError as e:
                        # TODO(1.5) turn into error
                        warnings.warn(
                            "Unable to cache to disk: failed to pickle "
                            "output. In version 1.5 this will raise an "
                            f"exception. Exception: {e}.",
                            FutureWarning,
                        )

            self._concurrency_safe_write(item, filename, write_func)
        except Exception as e:
            warnings.warn(
                "Unable to cache to disk. Possibly a race condition in the "
                f"creation of the directory. Exception: {e}.",
                CacheWarning,
            )

    def clear_item(self, call_id):
        """Clear the item at the id, given as a list of str."""
        item_path = os.path.join(self.location, *call_id)
        if self._item_exists(item_path):
            self.clear_location(item_path)

    def contains_item(self, call_id):
        """Check if there is an item at the id, given as a list of str."""
        item_path = os.path.join(self.location, *call_id)
        filename = os.path.join(item_path, "output.pkl")
        return self._item_exists(filename)

    def get_item_info(self, call_id):
        """Return information about item."""
        return {"location": os.path.join(self.location, *call_id)}

    def get_metadata(self, call_id):
        """Return actual metadata of an item.

        The metadata is read from the item's "metadata.json" file. An empty
        dict is returned when it cannot be read or decoded.
        """
        try:
            item_path = os.path.join(self.location, *call_id)
            filename = os.path.join(item_path, "metadata.json")
            with self._open_item(filename, "rb") as f:
                return json.loads(f.read().decode("utf-8"))
        except Exception:
            # Best effort: missing or corrupted metadata is not an error.
            return {}

    def store_metadata(self, call_id, metadata):
        """Store metadata of a computation.

        The metadata is written as JSON to the item's "metadata.json" file.
        This is best effort: any failure is silently ignored.
        """
        try:
            item_path = os.path.join(self.location, *call_id)
            self.create_location(item_path)
            filename = os.path.join(item_path, "metadata.json")

            def write_func(to_write, dest_filename):
                with self._open_item(dest_filename, "wb") as f:
                    f.write(json.dumps(to_write).encode("utf-8"))

            self._concurrency_safe_write(metadata, filename, write_func)
        except Exception:
            # Best effort: failing to store metadata must not break callers.
            pass

    def contains_path(self, call_id):
        """Check cached function is available in store."""
        func_path = os.path.join(self.location, *call_id)
        # _item_exists is the existence primitive that backends provide.
        return self._item_exists(func_path)

    def clear_path(self, call_id):
        """Clear all items with a common path in the store."""
        func_path = os.path.join(self.location, *call_id)
        if self._item_exists(func_path):
            self.clear_location(func_path)

    def store_cached_func_code(self, call_id, func_code=None):
        """Store the code of the cached function.

        The location of the function is created if needed. When func_code is
        not None, it is written UTF-8 encoded to a "func_code.py" file.
        """
        func_path = os.path.join(self.location, *call_id)
        if not self._item_exists(func_path):
            self.create_location(func_path)

        if func_code is not None:
            filename = os.path.join(func_path, "func_code.py")
            with self._open_item(filename, "wb") as f:
                f.write(func_code.encode("utf-8"))

    def get_cached_func_code(self, call_id):
        """Return the stored code of the cached function.

        Any exception raised while opening or reading the "func_code.py" file
        propagates to the caller.
        """
        filename = os.path.join(self.location, *call_id, "func_code.py")
        with self._open_item(filename, "rb") as f:
            return f.read().decode("utf-8")

    def get_cached_func_info(self, call_id):
        """Return information related to the cached function if it exists."""
        return {"location": os.path.join(self.location, *call_id)}

    def clear(self):
        """Clear the whole store content."""
        self.clear_location(self.location)

    def enforce_store_limits(self, bytes_limit, items_limit=None, age_limit=None):
        """
        Remove the store's oldest files to enforce item, byte, and age limits.
        """
        items_to_delete = self._get_items_to_delete(bytes_limit, items_limit, age_limit)

        for item in items_to_delete:
            if self.verbose > 10:
                print("Deleting item {0}".format(item))
            try:
                self.clear_location(item.path)
            except OSError:
                # Even with ignore_errors=True shutil.rmtree can raise OSError
                # with:
                # [Errno 116] Stale file handle if another process has deleted
                # the folder already.
                pass

    def _get_items_to_delete(self, bytes_limit, items_limit=None, age_limit=None):
        """
        Get items to delete to keep the store under size, file, & age limits.

        Parameters
        ----------
        bytes_limit: int, str or None
            Maximum total size of the items. A string is converted with
            memstr_to_bytes. None disables the limit.
        items_limit: int, optional
            Maximum number of items. None disables the limit.
        age_limit: datetime.timedelta, optional
            Maximum time elapsed since the last access of an item. None
            disables the limit.

        Returns
        -------
        The list of items to delete, least recently accessed first.

        Raises
        ------
        ValueError
            If age_limit is negative (only checked when the store is not
            empty).
        """
        if isinstance(bytes_limit, str):
            bytes_limit = memstr_to_bytes(bytes_limit)

        items = self.get_items()
        if not items:
            return []

        size = sum(item.size for item in items)

        # How much has to go for each limit; values <= 0 mean the limit is
        # already respected.
        to_delete_size = size - bytes_limit if bytes_limit is not None else 0
        to_delete_items = len(items) - items_limit if items_limit is not None else 0

        if age_limit is not None:
            older_item = min(item.last_access for item in items)
            if age_limit.total_seconds() < 0:
                raise ValueError("age_limit has to be a positive timedelta")
            deadline = datetime.datetime.now() - age_limit
        else:
            deadline = None

        # older_item is only evaluated when deadline is set, i.e. when it has
        # been computed above.
        if (
            to_delete_size <= 0
            and to_delete_items <= 0
            and (deadline is None or older_item > deadline)
        ):
            return []

        # We want to delete first the cache items that were accessed a
        # long time ago
        items.sort(key=operator.attrgetter("last_access"))

        items_to_delete = []
        size_so_far = 0
        items_so_far = 0

        for item in items:
            # Stop as soon as all three limits are satisfied.
            if (
                (size_so_far >= to_delete_size)
                and items_so_far >= to_delete_items
                and (deadline is None or deadline < item.last_access)
            ):
                break

            items_to_delete.append(item)
            size_so_far += item.size
            items_so_far += 1

        return items_to_delete

    def _concurrency_safe_write(self, to_write, filename, write_func):
        """Writes an object into a file in a concurrency-safe way.

        The object is first written to a temporary file which is then moved
        to filename with the backend's _move_item.
        """
        temporary_filename = concurrency_safe_write(to_write, filename, write_func)
        self._move_item(temporary_filename, filename)

    def __repr__(self):
        """Printable representation of the store location."""
        return '{class_name}(location="{location}")'.format(
            class_name=self.__class__.__name__, location=self.location
        )
class FileSystemStoreBackend(StoreBackendBase, StoreBackendMixin):
    """Store backend persisting items on a local or network file system."""

    # The private primitives required by the mixin are plain filesystem
    # functions.
    _open_item = staticmethod(open)
    _item_exists = staticmethod(os.path.exists)
    _move_item = staticmethod(concurrency_safe_rename)

    def clear_location(self, location):
        """Delete ``location`` from the store.

        The store's own base location is handed to ``rm_subdirs``; any other
        location is removed with ``shutil.rmtree``, ignoring errors.
        """
        if location != self.location:
            shutil.rmtree(location, ignore_errors=True)
            return
        rm_subdirs(location)

    def create_location(self, location):
        """Create the directory backing ``location`` in the store."""
        mkdirp(location)

    def get_items(self):
        """Return a list of CacheItemInfo, one per cache directory found.

        A directory under the store location counts as a cache directory when
        its name starts with 32 lowercase hexadecimal characters. Directories
        that vanish while being inspected are skipped.
        """
        collected = []

        for dirpath, _, filenames in os.walk(self.location):
            if not re.match("[a-f0-9]{32}", os.path.basename(dirpath)):
                continue

            # Prefer the access time of the output file and fall back to the
            # access time of the directory itself.
            try:
                atime = os.path.getatime(os.path.join(dirpath, "output.pkl"))
            except OSError:
                try:
                    atime = os.path.getatime(dirpath)
                except OSError:
                    # The directory has already been deleted
                    continue
            accessed = datetime.datetime.fromtimestamp(atime)

            try:
                total_size = sum(
                    os.path.getsize(os.path.join(dirpath, name)) for name in filenames
                )
            except OSError:
                # One of the files in dirpath does not exist any more. We
                # assume this directory is being cleaned by another process
                # already
                continue

            collected.append(CacheItemInfo(dirpath, total_size, accessed))

        return collected

    def configure(self, location, verbose=1, backend_options=None):
        """Configure the store backend.

        For this backend, valid store options are 'compress' and 'mmap_mode'

        Parameters
        ----------
        location: string
            Directory used as the base location of the store; it is created
            when it does not exist yet.
        verbose: int
            The level of verbosity of the store
        backend_options: dict, optional
            Options of the backend; None stands for no options.
        """
        options = {} if backend_options is None else backend_options

        # setup location directory
        self.location = location
        if not os.path.exists(self.location):
            mkdirp(self.location)

        # Automatically add `.gitignore` file to the cache folder.
        # XXX: the condition is necessary because in `Memory.__init__`, the user
        # passed `location` param is modified to be either `{location}` or
        # `{location}/joblib` depending on input type (`pathlib.Path` vs `str`).
        # The proper resolution of this inconsistency is tracked in:
        # https://github.com/joblib/joblib/issues/1684
        parent_dir, leaf = os.path.split(location)
        if parent_dir and leaf == "joblib":
            cache_directory = parent_dir
        else:
            cache_directory = location
        gitignore = os.path.join(cache_directory, ".gitignore")
        if not os.path.exists(gitignore):
            try:
                with open(gitignore, "w") as file:
                    file.write("# Created by joblib automatically.\n")
                    file.write("*\n")
            except OSError as e:
                warnings.warn(f"Unable to write {gitignore}. Exception: {e}.")

        # item can be stored compressed for faster I/O
        self.compress = options.get("compress", False)

        # FileSystemStoreBackend can be used with mmap_mode options under
        # certain conditions.
        mmap_mode = options.get("mmap_mode")
        if self.compress and mmap_mode is not None:
            warnings.warn(
                "Compressed items cannot be memmapped in a "
                "filesystem store. Option will be ignored.",
                stacklevel=2,
            )

        self.mmap_mode = mmap_mode
        self.verbose = verbose
|