| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303 |
- """
- Vendoring of pickleshare, reduced to used functionalities.
- ---
- PickleShare - a small 'shelve' like datastore with concurrency support
- Like shelve, a PickleShareDB object acts like a normal dictionary. Unlike
- shelve, many processes can access the database simultaneously. Changing a
- value in database is immediately visible to other processes accessing the
- same database.
- Concurrency is possible because the values are stored in separate files. Hence
- the "database" is a directory where *all* files are governed by PickleShare.
- Example usage::
- from pickleshare import *
- db = PickleShareDB('~/testpickleshare')
- db.clear()
- print "Should be empty:",db.items()
- db['hello'] = 15
- db['aku ankka'] = [1,2,313]
- db['paths/are/ok/key'] = [1,(5,46)]
- print db.keys()
- del db['aku ankka']
- This module is certainly not ZODB, but can be used for low-load
- (non-mission-critical) situations where tiny code size trumps the
- advanced features of a "real" object database.
- Installation guide: pip install pickleshare
- Author: Ville Vainio <vivainio@gmail.com>
- License: MIT open source license.
- """
- __version__ = "0.7.5"
- from pathlib import Path
- import os, stat, time
- try:
- import collections.abc as collections_abc
- except ImportError:
- import collections as collections_abc
- try:
- import cPickle as pickle
- except ImportError:
- import pickle
- import errno
- import sys
- def gethashfile(key):
- return ("%02x" % abs(hash(key) % 256))[-2:]
- _sentinel = object()
- class PickleShareDB(collections_abc.MutableMapping):
- """The main 'connection' object for PickleShare database"""
- def __init__(self, root):
- """Return a db object that will manage the specied directory"""
- if not isinstance(root, str):
- root = str(root)
- root = os.path.abspath(os.path.expanduser(root))
- self.root = Path(root)
- if not self.root.is_dir():
- # catching the exception is necessary if multiple processes are concurrently trying to create a folder
- # exists_ok keyword argument of mkdir does the same but only from Python 3.5
- try:
- self.root.mkdir(parents=True)
- except OSError as e:
- if e.errno != errno.EEXIST:
- raise
- # cache has { 'key' : (obj, orig_mod_time) }
- self.cache = {}
- def __getitem__(self, key):
- """db['key'] reading"""
- fil = self.root / key
- try:
- mtime = fil.stat()[stat.ST_MTIME]
- except OSError:
- raise KeyError(key)
- if fil in self.cache and mtime == self.cache[fil][1]:
- return self.cache[fil][0]
- try:
- # The cached item has expired, need to read
- with fil.open("rb") as f:
- obj = pickle.loads(f.read())
- except:
- raise KeyError(key)
- self.cache[fil] = (obj, mtime)
- return obj
- def __setitem__(self, key, value):
- """db['key'] = 5"""
- fil = self.root / key
- parent = fil.parent
- if parent and not parent.is_dir():
- parent.mkdir(parents=True)
- # We specify protocol 2, so that we can mostly go between Python 2
- # and Python 3. We can upgrade to protocol 3 when Python 2 is obsolete.
- with fil.open("wb") as f:
- pickle.dump(value, f, protocol=2)
- try:
- self.cache[fil] = (value, fil.stat().st_mtime)
- except OSError as e:
- if e.errno != errno.ENOENT:
- raise
- def hset(self, hashroot, key, value):
- """hashed set"""
- hroot = self.root / hashroot
- if not hroot.is_dir():
- hroot.mkdir()
- hfile = hroot / gethashfile(key)
- d = self.get(hfile, {})
- d.update({key: value})
- self[hfile] = d
- def hget(self, hashroot, key, default=_sentinel, fast_only=True):
- """hashed get"""
- hroot = self.root / hashroot
- hfile = hroot / gethashfile(key)
- d = self.get(hfile, _sentinel)
- # print "got dict",d,"from",hfile
- if d is _sentinel:
- if fast_only:
- if default is _sentinel:
- raise KeyError(key)
- return default
- # slow mode ok, works even after hcompress()
- d = self.hdict(hashroot)
- return d.get(key, default)
- def hdict(self, hashroot):
- """Get all data contained in hashed category 'hashroot' as dict"""
- hfiles = self.keys(hashroot + "/*")
- hfiles.sort()
- last = len(hfiles) and hfiles[-1] or ""
- if last.endswith("xx"):
- # print "using xx"
- hfiles = [last] + hfiles[:-1]
- all = {}
- for f in hfiles:
- # print "using",f
- try:
- all.update(self[f])
- except KeyError:
- print("Corrupt", f, "deleted - hset is not threadsafe!")
- del self[f]
- self.uncache(f)
- return all
- def hcompress(self, hashroot):
- """Compress category 'hashroot', so hset is fast again
- hget will fail if fast_only is True for compressed items (that were
- hset before hcompress).
- """
- hfiles = self.keys(hashroot + "/*")
- all = {}
- for f in hfiles:
- # print "using",f
- all.update(self[f])
- self.uncache(f)
- self[hashroot + "/xx"] = all
- for f in hfiles:
- p = self.root / f
- if p.name == "xx":
- continue
- p.unlink()
- def __delitem__(self, key):
- """del db["key"]"""
- fil = self.root / key
- self.cache.pop(fil, None)
- try:
- fil.unlink()
- except OSError:
- # notfound and permission denied are ok - we
- # lost, the other process wins the conflict
- pass
- def _normalized(self, p):
- """Make a key suitable for user's eyes"""
- return str(p.relative_to(self.root)).replace("\\", "/")
- def keys(self, globpat=None):
- """All keys in DB, or all keys matching a glob"""
- if globpat is None:
- files = self.root.rglob("*")
- else:
- files = self.root.glob(globpat)
- return [self._normalized(p) for p in files if p.is_file()]
- def __iter__(self):
- return iter(self.keys())
- def __len__(self):
- return len(self.keys())
- def uncache(self, *items):
- """Removes all, or specified items from cache
- Use this after reading a large amount of large objects
- to free up memory, when you won't be needing the objects
- for a while.
- """
- if not items:
- self.cache = {}
- for it in items:
- self.cache.pop(it, None)
- def waitget(self, key, maxwaittime=60):
- """Wait (poll) for a key to get a value
- Will wait for `maxwaittime` seconds before raising a KeyError.
- The call exits normally if the `key` field in db gets a value
- within the timeout period.
- Use this for synchronizing different processes or for ensuring
- that an unfortunately timed "db['key'] = newvalue" operation
- in another process (which causes all 'get' operation to cause a
- KeyError for the duration of pickling) won't screw up your program
- logic.
- """
- wtimes = [0.2] * 3 + [0.5] * 2 + [1]
- tries = 0
- waited = 0
- while 1:
- try:
- val = self[key]
- return val
- except KeyError:
- pass
- if waited > maxwaittime:
- raise KeyError(key)
- time.sleep(wtimes[tries])
- waited += wtimes[tries]
- if tries < len(wtimes) - 1:
- tries += 1
- def getlink(self, folder):
- """Get a convenient link for accessing items"""
- return PickleShareLink(self, folder)
- def __repr__(self):
- return "PickleShareDB('%s')" % self.root
- class PickleShareLink:
- """A shortdand for accessing nested PickleShare data conveniently.
- Created through PickleShareDB.getlink(), example::
- lnk = db.getlink('myobjects/test')
- lnk.foo = 2
- lnk.bar = lnk.foo + 5
- """
- def __init__(self, db, keydir):
- self.__dict__.update(locals())
- def __getattr__(self, key):
- return self.__dict__["db"][self.__dict__["keydir"] + "/" + key]
- def __setattr__(self, key, val):
- self.db[self.keydir + "/" + key] = val
- def __repr__(self):
- db = self.__dict__["db"]
- keys = db.keys(self.__dict__["keydir"] + "/*")
- return "<PickleShareLink '%s': %s>" % (
- self.__dict__["keydir"],
- ";".join([Path(k).basename() for k in keys]),
- )
|