| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- try:
- # Python 2.7: use the C pickle to speed up
- # test_concurrency_safe_write which pickles big python objects
- import cPickle as cpickle
- except ImportError:
- import pickle as cpickle
- import functools
- import time
- from pickle import PicklingError
- import pytest
- from joblib import Parallel, delayed
- from joblib._store_backends import (
- CacheWarning,
- FileSystemStoreBackend,
- concurrency_safe_write,
- )
- from joblib.backports import concurrency_safe_rename
- from joblib.test.common import with_multiprocessing
- from joblib.testing import parametrize, timeout
def write_func(output, filename):
    """Pickle ``output`` into the file at ``filename``.

    The file is opened in binary write mode, so any existing content is
    replaced.
    """
    with open(filename, mode="wb") as stream:
        cpickle.dump(output, stream)
def load_func(expected, filename, retries=10, delay=0.1):
    """Unpickle ``filename`` and check that the result equals ``expected``.

    Reading is retried because, on Windows, opening the file can fail with
    WindowsError ([Error 5] Access is denied or [Error 13] Permission
    denied), probably because a writer process has a lock on the file.

    Parameters
    ----------
    expected : object
        Value the unpickled content is compared against.
    filename : str
        Path of the pickle file to read.
    retries : int, default 10
        Maximum number of read attempts; must be at least 1.
    delay : float, default 0.1
        Seconds to sleep after each failed attempt.

    Raises
    ------
    ValueError
        If ``retries`` is smaller than 1.
    OSError, IOError
        The error of the last attempt when every attempt failed.
    AssertionError
        If the reloaded object differs from ``expected``.
    """
    if retries < 1:
        raise ValueError("retries must be at least 1")

    last_error = None
    for _ in range(retries):
        try:
            with open(filename, "rb") as f:
                reloaded = cpickle.load(f)
            break
        except (OSError, IOError) as exc:
            # Keep the error: the name bound by ``except`` does not survive
            # the handler, and a bare ``raise`` after the loop would have no
            # active exception to re-raise.
            last_error = exc
            time.sleep(delay)
    else:
        # Every attempt failed: surface the real I/O error to the caller.
        raise last_error
    assert expected == reloaded
def concurrency_safe_write_rename(to_write, filename, write_func):
    """Write ``to_write`` with ``write_func``, then rename it to ``filename``.

    ``concurrency_safe_write`` returns the path it wrote to (treated here as
    a temporary file); that path is then moved onto ``filename`` with
    ``concurrency_safe_rename``.
    """
    concurrency_safe_rename(
        concurrency_safe_write(to_write, filename, write_func),
        filename,
    )
@timeout(0)  # No timeout as this test can be long
@with_multiprocessing
@parametrize("backend", ["multiprocessing", "loky", "threading"])
def test_concurrency_safe_write(tmpdir, backend):
    """Interleave writers and readers of one pickle file across two jobs.

    Twelve tasks are scheduled on the same file: every third task reloads
    the file and compares it with the reference object, while the others
    rewrite it through ``concurrency_safe_write_rename``.
    """
    filename = tmpdir.join("test.pkl").strpath
    # Large dictionary to pickle (100000 entries).
    obj = {str(number): number for number in range(int(1e5))}

    writer = functools.partial(concurrency_safe_write_rename, write_func=write_func)
    # Task indices 2, 5, 8 and 11 are readers; all the others are writers.
    funcs = [load_func if index % 3 == 2 else writer for index in range(12)]

    runner = Parallel(n_jobs=2, backend=backend)
    runner(delayed(func)(obj, filename) for func in funcs)
def test_warning_on_dump_failure(tmpdir):
    """A dump failing with anything but a PicklingError emits a CacheWarning."""

    class UnpicklableObject(object):
        # Pickling an instance always fails with a generic RuntimeError.
        def __reduce__(self):
            raise RuntimeError("some exception")

    store = FileSystemStoreBackend()
    store.location = tmpdir.join("test_warning_on_pickling_error").strpath
    store.compress = None

    item = UnpicklableObject()
    with pytest.warns(CacheWarning, match="some exception"):
        store.dump_item("testpath", item)
def test_warning_on_pickling_error(tmpdir):
    """A dump failing with a PicklingError emits a FutureWarning.

    Kept apart from test_warning_on_dump_failure because this warning is
    meant to become an exception in the future.
    """

    class UnpicklableObject(object):
        # Pickling an instance always fails with a PicklingError.
        def __reduce__(self):
            raise PicklingError("not picklable")

    store = FileSystemStoreBackend()
    store.location = tmpdir.join("test_warning_on_pickling_error").strpath
    store.compress = None

    item = UnpicklableObject()
    with pytest.warns(FutureWarning, match="not picklable"):
        store.dump_item("testpath", item)
|