_memmapping_reducer.py 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715
  1. """
  2. Reducer using memory mapping for numpy arrays
  3. """
  4. # Author: Thomas Moreau <thomas.moreau.2010@gmail.com>
  5. # Copyright: 2017, Thomas Moreau
  6. # License: BSD 3 clause
  7. import atexit
  8. import errno
  9. import os
  10. import stat
  11. import tempfile
  12. import threading
  13. import time
  14. import warnings
  15. import weakref
  16. from mmap import mmap
  17. from multiprocessing import util
  18. from pickle import HIGHEST_PROTOCOL, PicklingError, dumps, loads, whichmodule
  19. from uuid import uuid4
  20. try:
  21. WindowsError
  22. except NameError:
  23. WindowsError = type(None)
  24. try:
  25. import numpy as np
  26. from numpy.lib.stride_tricks import as_strided
  27. except ImportError:
  28. np = None
  29. from .backports import make_memmap
  30. from .disk import delete_folder
  31. from .externals.loky.backend import resource_tracker
  32. from .numpy_pickle import dump, load, load_temporary_memmap
  33. # Some systems have a ramdisk mounted by default; we can use it instead of
  34. # /tmp as the default folder to dump big arrays to share with subprocesses.
  35. SYSTEM_SHARED_MEM_FS = "/dev/shm"
  36. # Minimal number of bytes available on SYSTEM_SHARED_MEM_FS to consider using
  37. # it as the default folder to dump big arrays to share with subprocesses.
  38. SYSTEM_SHARED_MEM_FS_MIN_SIZE = int(2e9)
  39. # Folder and file permissions to chmod temporary files generated by the
  40. # memmapping pool. Only the owner of the Python process can access the
  41. # temporary files and folder.
  42. FOLDER_PERMISSIONS = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
  43. FILE_PERMISSIONS = stat.S_IRUSR | stat.S_IWUSR
  44. # Set used in joblib workers, referencing the filenames of temporary memmaps
  45. # created by joblib to speed up data communication. In child processes, we add
  46. # a finalizer to these memmaps that sends a maybe_unlink call to the
  47. # resource_tracker, in order to free main memory as fast as possible.
  48. JOBLIB_MMAPS = set()
  49. def _log_and_unlink(filename):
  50. from .externals.loky.backend.resource_tracker import _resource_tracker
  51. util.debug(
  52. "[FINALIZER CALL] object mapping to {} about to be deleted,"
  53. " decrementing the refcount of the file (pid: {})".format(
  54. os.path.basename(filename), os.getpid()
  55. )
  56. )
  57. _resource_tracker.maybe_unlink(filename, "file")
  58. def add_maybe_unlink_finalizer(memmap):
  59. util.debug(
  60. "[FINALIZER ADD] adding finalizer to {} (id {}, filename {}, pid {})".format(
  61. type(memmap), id(memmap), os.path.basename(memmap.filename), os.getpid()
  62. )
  63. )
  64. weakref.finalize(memmap, _log_and_unlink, memmap.filename)
  65. def unlink_file(filename):
  66. """Wrapper around os.unlink with a retry mechanism.
  67. The retry mechanism has been implemented primarily to overcome a race
  68. condition happening during the finalizer of a np.memmap: when a process
  69. holding the last reference to a mmap-backed np.memmap/np.array is about to
  70. delete this array (and close the reference), it sends a maybe_unlink
  71. request to the resource_tracker. This request can be processed faster than
  72. it takes for the last reference of the memmap to be closed, yielding (on
  73. Windows) a PermissionError in the resource_tracker loop.
  74. """
  75. NUM_RETRIES = 10
  76. for retry_no in range(1, NUM_RETRIES + 1):
  77. try:
  78. os.unlink(filename)
  79. break
  80. except PermissionError:
  81. util.debug(
  82. "[ResourceTracker] tried to unlink {}, got PermissionError".format(
  83. filename
  84. )
  85. )
  86. if retry_no == NUM_RETRIES:
  87. raise
  88. else:
  89. time.sleep(0.2)
  90. except FileNotFoundError:
  91. # In case of a race condition when deleting the temporary folder,
  92. # avoid noisy FileNotFoundError exception in the resource tracker.
  93. pass
  94. resource_tracker._CLEANUP_FUNCS["file"] = unlink_file
  95. class _WeakArrayKeyMap:
  96. """A variant of weakref.WeakKeyDictionary for unhashable numpy arrays.
  97. This datastructure will be used with numpy arrays as obj keys, therefore we
  98. do not use the __get__ / __set__ methods to avoid any conflict with the
  99. numpy fancy indexing syntax.
  100. """
  101. def __init__(self):
  102. self._data = {}
  103. def get(self, obj):
  104. ref, val = self._data[id(obj)]
  105. if ref() is not obj:
  106. # In case of race condition with on_destroy: could never be
  107. # triggered by the joblib tests with CPython.
  108. raise KeyError(obj)
  109. return val
  110. def set(self, obj, value):
  111. key = id(obj)
  112. try:
  113. ref, _ = self._data[key]
  114. if ref() is not obj:
  115. # In case of race condition with on_destroy: could never be
  116. # triggered by the joblib tests with CPython.
  117. raise KeyError(obj)
  118. except KeyError:
  119. # Insert the new entry in the mapping along with a weakref
  120. # callback to automatically delete the entry from the mapping
  121. # as soon as the object used as key is garbage collected.
  122. def on_destroy(_):
  123. del self._data[key]
  124. ref = weakref.ref(obj, on_destroy)
  125. self._data[key] = ref, value
  126. def __getstate__(self):
  127. raise PicklingError("_WeakArrayKeyMap is not pickleable")
  128. ###############################################################################
  129. # Support for efficient transient pickling of numpy data structures
  130. def _get_backing_memmap(a):
  131. """Recursively look up the original np.memmap instance base if any."""
  132. b = getattr(a, "base", None)
  133. if b is None:
  134. # TODO: check scipy sparse datastructure if scipy is installed
  135. # a nor its descendants do not have a memmap base
  136. return None
  137. elif isinstance(b, mmap):
  138. # a is already a real memmap instance.
  139. return a
  140. else:
  141. # Recursive exploration of the base ancestry
  142. return _get_backing_memmap(b)
  143. def _get_temp_dir(pool_folder_name, temp_folder=None):
  144. """Get the full path to a subfolder inside the temporary folder.
  145. Parameters
  146. ----------
  147. pool_folder_name : str
  148. Sub-folder name used for the serialization of a pool instance.
  149. temp_folder: str, optional
  150. Folder to be used by the pool for memmapping large arrays
  151. for sharing memory with worker processes. If None, this will try in
  152. order:
  153. - a folder pointed by the JOBLIB_TEMP_FOLDER environment
  154. variable,
  155. - /dev/shm if the folder exists and is writable: this is a
  156. RAMdisk filesystem available by default on modern Linux
  157. distributions,
  158. - the default system temporary folder that can be
  159. overridden with TMP, TMPDIR or TEMP environment
  160. variables, typically /tmp under Unix operating systems.
  161. Returns
  162. -------
  163. pool_folder : str
  164. full path to the temporary folder
  165. use_shared_mem : bool
  166. whether the temporary folder is written to the system shared memory
  167. folder or some other temporary folder.
  168. """
  169. use_shared_mem = False
  170. if temp_folder is None:
  171. temp_folder = os.environ.get("JOBLIB_TEMP_FOLDER", None)
  172. if temp_folder is None:
  173. if os.path.exists(SYSTEM_SHARED_MEM_FS) and hasattr(os, "statvfs"):
  174. try:
  175. shm_stats = os.statvfs(SYSTEM_SHARED_MEM_FS)
  176. available_nbytes = shm_stats.f_bsize * shm_stats.f_bavail
  177. if available_nbytes > SYSTEM_SHARED_MEM_FS_MIN_SIZE:
  178. # Try to see if we have write access to the shared mem
  179. # folder only if it is reasonably large (that is 2GB or
  180. # more).
  181. temp_folder = SYSTEM_SHARED_MEM_FS
  182. pool_folder = os.path.join(temp_folder, pool_folder_name)
  183. if not os.path.exists(pool_folder):
  184. os.makedirs(pool_folder)
  185. use_shared_mem = True
  186. except (IOError, OSError):
  187. # Missing rights in the /dev/shm partition, fallback to regular
  188. # temp folder.
  189. temp_folder = None
  190. if temp_folder is None:
  191. # Fallback to the default tmp folder, typically /tmp
  192. temp_folder = tempfile.gettempdir()
  193. temp_folder = os.path.abspath(os.path.expanduser(temp_folder))
  194. pool_folder = os.path.join(temp_folder, pool_folder_name)
  195. return pool_folder, use_shared_mem
  196. def has_shareable_memory(a):
  197. """Return True if a is backed by some mmap buffer directly or not."""
  198. return _get_backing_memmap(a) is not None
  199. def _strided_from_memmap(
  200. filename,
  201. dtype,
  202. mode,
  203. offset,
  204. order,
  205. shape,
  206. strides,
  207. total_buffer_len,
  208. unlink_on_gc_collect,
  209. ):
  210. """Reconstruct an array view on a memory mapped file."""
  211. if mode == "w+":
  212. # Do not zero the original data when unpickling
  213. mode = "r+"
  214. if strides is None:
  215. # Simple, contiguous memmap
  216. return make_memmap(
  217. filename,
  218. dtype=dtype,
  219. shape=shape,
  220. mode=mode,
  221. offset=offset,
  222. order=order,
  223. unlink_on_gc_collect=unlink_on_gc_collect,
  224. )
  225. else:
  226. # For non-contiguous data, memmap the total enclosing buffer and then
  227. # extract the non-contiguous view with the stride-tricks API
  228. base = make_memmap(
  229. filename,
  230. dtype=dtype,
  231. shape=total_buffer_len,
  232. offset=offset,
  233. mode=mode,
  234. order=order,
  235. unlink_on_gc_collect=unlink_on_gc_collect,
  236. )
  237. return as_strided(base, shape=shape, strides=strides)
  238. def _reduce_memmap_backed(a, m):
  239. """Pickling reduction for memmap backed arrays.
  240. a is expected to be an instance of np.ndarray (or np.memmap)
  241. m is expected to be an instance of np.memmap on the top of the ``base``
  242. attribute ancestry of a. ``m.base`` should be the real python mmap object.
  243. """
  244. # offset that comes from the striding differences between a and m
  245. util.debug(
  246. "[MEMMAP REDUCE] reducing a memmap-backed array (shape, {}, pid: {})".format(
  247. a.shape, os.getpid()
  248. )
  249. )
  250. try:
  251. from numpy.lib.array_utils import byte_bounds
  252. except (ModuleNotFoundError, ImportError):
  253. # Backward-compat for numpy < 2.0
  254. from numpy import byte_bounds
  255. a_start, a_end = byte_bounds(a)
  256. m_start = byte_bounds(m)[0]
  257. offset = a_start - m_start
  258. # offset from the backing memmap
  259. offset += m.offset
  260. # 1D arrays are both F and C contiguous, so only set the flag in
  261. # higher dimensions. See https://github.com/joblib/joblib/pull/1704.
  262. if m.ndim > 1 and m.flags["F_CONTIGUOUS"]:
  263. order = "F"
  264. else:
  265. # The backing memmap buffer is necessarily contiguous hence C if not
  266. # Fortran
  267. order = "C"
  268. if a.flags["F_CONTIGUOUS"] or a.flags["C_CONTIGUOUS"]:
  269. # If the array is a contiguous view, no need to pass the strides
  270. strides = None
  271. total_buffer_len = None
  272. else:
  273. # Compute the total number of items to map from which the strided
  274. # view will be extracted.
  275. strides = a.strides
  276. total_buffer_len = (a_end - a_start) // a.itemsize
  277. return (
  278. _strided_from_memmap,
  279. (
  280. m.filename,
  281. a.dtype,
  282. m.mode,
  283. offset,
  284. order,
  285. a.shape,
  286. strides,
  287. total_buffer_len,
  288. False,
  289. ),
  290. )
  291. def reduce_array_memmap_backward(a):
  292. """reduce a np.array or a np.memmap from a child process"""
  293. m = _get_backing_memmap(a)
  294. if isinstance(m, np.memmap) and m.filename not in JOBLIB_MMAPS:
  295. # if a is backed by a memmaped file, reconstruct a using the
  296. # memmaped file.
  297. return _reduce_memmap_backed(a, m)
  298. else:
  299. # a is either a regular (not memmap-backed) numpy array, or an array
  300. # backed by a shared temporary file created by joblib. In the latter
  301. # case, in order to limit the lifespan of these temporary files, we
  302. # serialize the memmap as a regular numpy array, and decref the
  303. # file backing the memmap (done implicitly in a previously registered
  304. # finalizer, see ``unlink_on_gc_collect`` for more details)
  305. return (loads, (dumps(np.asarray(a), protocol=HIGHEST_PROTOCOL),))
  306. class ArrayMemmapForwardReducer(object):
  307. """Reducer callable to dump large arrays to memmap files.
  308. Parameters
  309. ----------
  310. max_nbytes: int
  311. Threshold to trigger memmapping of large arrays to files created
  312. a folder.
  313. temp_folder_resolver: callable
  314. An callable in charge of resolving a temporary folder name where files
  315. for backing memmapped arrays are created.
  316. mmap_mode: 'r', 'r+' or 'c'
  317. Mode for the created memmap datastructure. See the documentation of
  318. numpy.memmap for more details. Note: 'w+' is coerced to 'r+'
  319. automatically to avoid zeroing the data on unpickling.
  320. verbose: int, optional, 0 by default
  321. If verbose > 0, memmap creations are logged.
  322. If verbose > 1, both memmap creations, reuse and array pickling are
  323. logged.
  324. prewarm: bool, optional, False by default.
  325. Force a read on newly memmapped array to make sure that OS pre-cache it
  326. memory. This can be useful to avoid concurrent disk access when the
  327. same data array is passed to different worker processes.
  328. """
  329. def __init__(
  330. self,
  331. max_nbytes,
  332. temp_folder_resolver,
  333. mmap_mode,
  334. unlink_on_gc_collect,
  335. verbose=0,
  336. prewarm=True,
  337. ):
  338. self._max_nbytes = max_nbytes
  339. self._temp_folder_resolver = temp_folder_resolver
  340. self._mmap_mode = mmap_mode
  341. self.verbose = int(verbose)
  342. if prewarm == "auto":
  343. self._prewarm = not self._temp_folder.startswith(SYSTEM_SHARED_MEM_FS)
  344. else:
  345. self._prewarm = prewarm
  346. self._prewarm = prewarm
  347. self._memmaped_arrays = _WeakArrayKeyMap()
  348. self._temporary_memmaped_filenames = set()
  349. self._unlink_on_gc_collect = unlink_on_gc_collect
  350. @property
  351. def _temp_folder(self):
  352. return self._temp_folder_resolver()
  353. def __reduce__(self):
  354. # The ArrayMemmapForwardReducer is passed to the children processes: it
  355. # needs to be pickled but the _WeakArrayKeyMap need to be skipped as
  356. # it's only guaranteed to be consistent with the parent process memory
  357. # garbage collection.
  358. # Although this reducer is pickled, it is not needed in its destination
  359. # process (child processes), as we only use this reducer to send
  360. # memmaps from the parent process to the children processes. For this
  361. # reason, we can afford skipping the resolver, (which would otherwise
  362. # be unpicklable), and pass it as None instead.
  363. args = (self._max_nbytes, None, self._mmap_mode, self._unlink_on_gc_collect)
  364. kwargs = {
  365. "verbose": self.verbose,
  366. "prewarm": self._prewarm,
  367. }
  368. return ArrayMemmapForwardReducer, args, kwargs
  369. def __call__(self, a):
  370. m = _get_backing_memmap(a)
  371. if m is not None and isinstance(m, np.memmap):
  372. # a is already backed by a memmap file, let's reuse it directly
  373. return _reduce_memmap_backed(a, m)
  374. if (
  375. not a.dtype.hasobject
  376. and self._max_nbytes is not None
  377. and a.nbytes > self._max_nbytes
  378. ):
  379. # check that the folder exists (lazily create the pool temp folder
  380. # if required)
  381. try:
  382. os.makedirs(self._temp_folder)
  383. os.chmod(self._temp_folder, FOLDER_PERMISSIONS)
  384. except OSError as e:
  385. if e.errno != errno.EEXIST:
  386. raise e
  387. try:
  388. basename = self._memmaped_arrays.get(a)
  389. except KeyError:
  390. # Generate a new unique random filename. The process and thread
  391. # ids are only useful for debugging purpose and to make it
  392. # easier to cleanup orphaned files in case of hard process
  393. # kill (e.g. by "kill -9" or segfault).
  394. basename = "{}-{}-{}.pkl".format(
  395. os.getpid(), id(threading.current_thread()), uuid4().hex
  396. )
  397. self._memmaped_arrays.set(a, basename)
  398. filename = os.path.join(self._temp_folder, basename)
  399. # In case the same array with the same content is passed several
  400. # times to the pool subprocess children, serialize it only once
  401. is_new_memmap = filename not in self._temporary_memmaped_filenames
  402. # add the memmap to the list of temporary memmaps created by joblib
  403. self._temporary_memmaped_filenames.add(filename)
  404. if self._unlink_on_gc_collect:
  405. # Bump reference count of the memmap by 1 to account for
  406. # shared usage of the memmap by a child process. The
  407. # corresponding decref call will be executed upon calling
  408. # resource_tracker.maybe_unlink, registered as a finalizer in
  409. # the child.
  410. # the incref/decref calls here are only possible when the child
  411. # and the parent share the same resource_tracker. It is not the
  412. # case for the multiprocessing backend, but it does not matter
  413. # because unlinking a memmap from a child process is only
  414. # useful to control the memory usage of long-lasting child
  415. # processes, while the multiprocessing-based pools terminate
  416. # their workers at the end of a map() call.
  417. resource_tracker.register(filename, "file")
  418. if is_new_memmap:
  419. # Incref each temporary memmap created by joblib one extra
  420. # time. This means that these memmaps will only be deleted
  421. # once an extra maybe_unlink() is called, which is done once
  422. # all the jobs have completed (or been canceled) in the
  423. # Parallel._terminate_backend() method.
  424. resource_tracker.register(filename, "file")
  425. if not os.path.exists(filename):
  426. util.debug(
  427. "[ARRAY DUMP] Pickling new array (shape={}, dtype={}) "
  428. "creating a new memmap at {}".format(a.shape, a.dtype, filename)
  429. )
  430. for dumped_filename in dump(a, filename):
  431. os.chmod(dumped_filename, FILE_PERMISSIONS)
  432. if self._prewarm:
  433. # Warm up the data by accessing it. This operation ensures
  434. # that the disk access required to create the memmapping
  435. # file are performed in the reducing process and avoids
  436. # concurrent memmap creation in multiple children
  437. # processes.
  438. load(filename, mmap_mode=self._mmap_mode).max()
  439. else:
  440. util.debug(
  441. "[ARRAY DUMP] Pickling known array (shape={}, dtype={}) "
  442. "reusing memmap file: {}".format(
  443. a.shape, a.dtype, os.path.basename(filename)
  444. )
  445. )
  446. # The worker process will use joblib.load to memmap the data
  447. return (
  448. load_temporary_memmap,
  449. (filename, self._mmap_mode, self._unlink_on_gc_collect),
  450. )
  451. else:
  452. # do not convert a into memmap, let pickler do its usual copy with
  453. # the default system pickler
  454. util.debug(
  455. "[ARRAY DUMP] Pickling array (NO MEMMAPPING) (shape={}, "
  456. " dtype={}).".format(a.shape, a.dtype)
  457. )
  458. return (loads, (dumps(a, protocol=HIGHEST_PROTOCOL),))
  459. def get_memmapping_reducers(
  460. forward_reducers=None,
  461. backward_reducers=None,
  462. temp_folder_resolver=None,
  463. max_nbytes=1e6,
  464. mmap_mode="r",
  465. verbose=0,
  466. prewarm=False,
  467. unlink_on_gc_collect=True,
  468. **kwargs,
  469. ):
  470. """Construct a pair of memmapping reducer linked to a tmpdir.
  471. This function manage the creation and the clean up of the temporary folders
  472. underlying the memory maps and should be use to get the reducers necessary
  473. to construct joblib pool or executor.
  474. """
  475. if forward_reducers is None:
  476. forward_reducers = dict()
  477. if backward_reducers is None:
  478. backward_reducers = dict()
  479. if np is not None:
  480. # Register smart numpy.ndarray reducers that detects memmap backed
  481. # arrays and that is also able to dump to memmap large in-memory
  482. # arrays over the max_nbytes threshold
  483. forward_reduce_ndarray = ArrayMemmapForwardReducer(
  484. max_nbytes,
  485. temp_folder_resolver,
  486. mmap_mode,
  487. unlink_on_gc_collect,
  488. verbose,
  489. prewarm=prewarm,
  490. )
  491. forward_reducers[np.ndarray] = forward_reduce_ndarray
  492. forward_reducers[np.memmap] = forward_reduce_ndarray
  493. # Communication from child process to the parent process always
  494. # pickles in-memory numpy.ndarray without dumping them as memmap
  495. # to avoid confusing the caller and make it tricky to collect the
  496. # temporary folder
  497. backward_reducers[np.ndarray] = reduce_array_memmap_backward
  498. backward_reducers[np.memmap] = reduce_array_memmap_backward
  499. return forward_reducers, backward_reducers
  500. class TemporaryResourcesManager(object):
  501. """Stateful object able to manage temporary folder and pickles
  502. It exposes:
  503. - a per-context folder name resolving API that memmap-based reducers will
  504. rely on to know where to pickle the temporary memmaps
  505. - a temporary file/folder management API that internally uses the
  506. resource_tracker.
  507. """
  508. def __init__(self, temp_folder_root=None, context_id=None):
  509. self._current_temp_folder = None
  510. self._temp_folder_root = temp_folder_root
  511. self._use_shared_mem = None
  512. self._cached_temp_folders = dict()
  513. self._id = uuid4().hex
  514. self._finalizers = {}
  515. if context_id is None:
  516. # It would be safer to not assign a default context id (less silent
  517. # bugs), but doing this while maintaining backward compatibility
  518. # with the previous, context-unaware version get_memmaping_executor
  519. # exposes too many low-level details.
  520. context_id = uuid4().hex
  521. self.set_current_context(context_id)
  522. def set_current_context(self, context_id):
  523. self._current_context_id = context_id
  524. self.register_new_context(context_id)
  525. def register_new_context(self, context_id):
  526. # Prepare a sub-folder name specific to a context (usually a unique id
  527. # generated by each instance of the Parallel class). Do not create in
  528. # advance to spare FS write access if no array is to be dumped).
  529. if context_id in self._cached_temp_folders:
  530. return
  531. else:
  532. # During its lifecycle, one Parallel object can have several
  533. # executors associated to it (for instance, if a loky worker raises
  534. # an exception, joblib shutdowns the executor and instantly
  535. # recreates a new one before raising the error - see
  536. # ``ensure_ready``. Because we don't want two executors tied to
  537. # the same Parallel object (and thus the same context id) to
  538. # register/use/delete the same folder, we also add an id specific
  539. # to the current Manager (and thus specific to its associated
  540. # executor) to the folder name.
  541. new_folder_name = "joblib_memmapping_folder_{}_{}_{}".format(
  542. os.getpid(), self._id, context_id
  543. )
  544. new_folder_path, _ = _get_temp_dir(new_folder_name, self._temp_folder_root)
  545. self.register_folder_finalizer(new_folder_path, context_id)
  546. self._cached_temp_folders[context_id] = new_folder_path
  547. def resolve_temp_folder_name(self):
  548. """Return a folder name specific to the currently activated context"""
  549. return self._cached_temp_folders[self._current_context_id]
  550. # resource management API
  551. def register_folder_finalizer(self, pool_subfolder, context_id):
  552. # Register the garbage collector at program exit in case caller forgets
  553. # to call terminate explicitly: note we do not pass any reference to
  554. # ensure that this callback won't prevent garbage collection of
  555. # parallel instance and related file handler resources such as POSIX
  556. # semaphores and pipes
  557. pool_module_name = whichmodule(delete_folder, "delete_folder")
  558. resource_tracker.register(pool_subfolder, "folder")
  559. def _cleanup():
  560. # In some cases the Python runtime seems to set delete_folder to
  561. # None just before exiting when accessing the delete_folder
  562. # function from the closure namespace. So instead we reimport
  563. # the delete_folder function explicitly.
  564. # https://github.com/joblib/joblib/issues/328
  565. # We cannot just use from 'joblib.pool import delete_folder'
  566. # because joblib should only use relative imports to allow
  567. # easy vendoring.
  568. delete_folder = __import__(
  569. pool_module_name, fromlist=["delete_folder"]
  570. ).delete_folder
  571. try:
  572. delete_folder(pool_subfolder, allow_non_empty=True)
  573. resource_tracker.unregister(pool_subfolder, "folder")
  574. except OSError:
  575. warnings.warn(
  576. "Failed to delete temporary folder: {}".format(pool_subfolder)
  577. )
  578. self._finalizers[context_id] = atexit.register(_cleanup)
  579. def _clean_temporary_resources(
  580. self, context_id=None, force=False, allow_non_empty=False
  581. ):
  582. """Clean temporary resources created by a process-based pool"""
  583. if context_id is None:
  584. # Iterates over a copy of the cache keys to avoid Error due to
  585. # iterating over a changing size dictionary.
  586. for context_id in list(self._cached_temp_folders):
  587. self._clean_temporary_resources(
  588. context_id, force=force, allow_non_empty=allow_non_empty
  589. )
  590. else:
  591. temp_folder = self._cached_temp_folders.get(context_id)
  592. if temp_folder and os.path.exists(temp_folder):
  593. for filename in os.listdir(temp_folder):
  594. if force:
  595. # Some workers have failed and the ref counted might
  596. # be off. The workers should have shut down by this
  597. # time so forcefully clean up the files.
  598. resource_tracker.unregister(
  599. os.path.join(temp_folder, filename), "file"
  600. )
  601. else:
  602. resource_tracker.maybe_unlink(
  603. os.path.join(temp_folder, filename), "file"
  604. )
  605. # When forcing clean-up, try to delete the folder even if some
  606. # files are still in it. Otherwise, try to delete the folder
  607. allow_non_empty |= force
  608. # Clean up the folder if possible, either if it is empty or
  609. # if none of the files in it are in used and allow_non_empty.
  610. try:
  611. delete_folder(temp_folder, allow_non_empty=allow_non_empty)
  612. # Forget the folder once it has been deleted
  613. self._cached_temp_folders.pop(context_id, None)
  614. resource_tracker.unregister(temp_folder, "folder")
  615. # Also cancel the finalizers that gets triggered at gc.
  616. finalizer = self._finalizers.pop(context_id, None)
  617. if finalizer is not None:
  618. atexit.unregister(finalizer)
  619. except OSError:
  620. # Temporary folder cannot be deleted right now.
  621. # This folder will be cleaned up by an atexit
  622. # finalizer registered by the memmapping_reducer.
  623. pass