test_store_backends.py 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. try:
  2. # Python 2.7: use the C pickle to speed up
  3. # test_concurrency_safe_write which pickles big python objects
  4. import cPickle as cpickle
  5. except ImportError:
  6. import pickle as cpickle
  7. import functools
  8. import time
  9. from pickle import PicklingError
  10. import pytest
  11. from joblib import Parallel, delayed
  12. from joblib._store_backends import (
  13. CacheWarning,
  14. FileSystemStoreBackend,
  15. concurrency_safe_write,
  16. )
  17. from joblib.backports import concurrency_safe_rename
  18. from joblib.test.common import with_multiprocessing
  19. from joblib.testing import parametrize, timeout
  20. def write_func(output, filename):
  21. with open(filename, "wb") as f:
  22. cpickle.dump(output, f)
  23. def load_func(expected, filename):
  24. for i in range(10):
  25. try:
  26. with open(filename, "rb") as f:
  27. reloaded = cpickle.load(f)
  28. break
  29. except (OSError, IOError):
  30. # On Windows you can have WindowsError ([Error 5] Access
  31. # is denied or [Error 13] Permission denied) when reading the file,
  32. # probably because a writer process has a lock on the file
  33. time.sleep(0.1)
  34. else:
  35. raise
  36. assert expected == reloaded
  37. def concurrency_safe_write_rename(to_write, filename, write_func):
  38. temporary_filename = concurrency_safe_write(to_write, filename, write_func)
  39. concurrency_safe_rename(temporary_filename, filename)
  40. @timeout(0) # No timeout as this test can be long
  41. @with_multiprocessing
  42. @parametrize("backend", ["multiprocessing", "loky", "threading"])
  43. def test_concurrency_safe_write(tmpdir, backend):
  44. # Add one item to cache
  45. filename = tmpdir.join("test.pkl").strpath
  46. obj = {str(i): i for i in range(int(1e5))}
  47. funcs = [
  48. functools.partial(concurrency_safe_write_rename, write_func=write_func)
  49. if i % 3 != 2
  50. else load_func
  51. for i in range(12)
  52. ]
  53. Parallel(n_jobs=2, backend=backend)(delayed(func)(obj, filename) for func in funcs)
  54. def test_warning_on_dump_failure(tmpdir):
  55. # Check that a warning is raised when the dump fails for any reason but
  56. # a PicklingError.
  57. class UnpicklableObject(object):
  58. def __reduce__(self):
  59. raise RuntimeError("some exception")
  60. backend = FileSystemStoreBackend()
  61. backend.location = tmpdir.join("test_warning_on_pickling_error").strpath
  62. backend.compress = None
  63. with pytest.warns(CacheWarning, match="some exception"):
  64. backend.dump_item("testpath", UnpicklableObject())
  65. def test_warning_on_pickling_error(tmpdir):
  66. # This is separate from test_warning_on_dump_failure because in the
  67. # future we will turn this into an exception.
  68. class UnpicklableObject(object):
  69. def __reduce__(self):
  70. raise PicklingError("not picklable")
  71. backend = FileSystemStoreBackend()
  72. backend.location = tmpdir.join("test_warning_on_pickling_error").strpath
  73. backend.compress = None
  74. with pytest.warns(FutureWarning, match="not picklable"):
  75. backend.dump_item("testpath", UnpicklableObject())