_parallel_backends.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753
  1. """
  2. Backends for embarrassingly parallel code.
  3. """
  4. import contextlib
  5. import gc
  6. import os
  7. import threading
  8. import warnings
  9. from abc import ABCMeta, abstractmethod
  10. from ._multiprocessing_helpers import mp
  11. from ._utils import (
  12. _retrieve_traceback_capturing_wrapped_call,
  13. _TracebackCapturingWrapper,
  14. )
  15. if mp is not None:
  16. from multiprocessing.pool import ThreadPool
  17. from .executor import get_memmapping_executor
  18. # Import loky only if multiprocessing is present
  19. from .externals.loky import cpu_count, process_executor
  20. from .externals.loky.process_executor import ShutdownExecutorError
  21. from .pool import MemmappingPool
  22. class ParallelBackendBase(metaclass=ABCMeta):
  23. """Helper abc which defines all methods a ParallelBackend must implement"""
  24. default_n_jobs = 1
  25. supports_inner_max_num_threads = False
  26. # This flag was introduced for backward compatibility reasons.
  27. # New backends should always set it to True and implement the
  28. # `retrieve_result_callback` method.
  29. supports_retrieve_callback = False
  30. @property
  31. def supports_return_generator(self):
  32. return self.supports_retrieve_callback
  33. @property
  34. def supports_timeout(self):
  35. return self.supports_retrieve_callback
  36. nesting_level = None
  37. def __init__(
  38. self, nesting_level=None, inner_max_num_threads=None, **backend_kwargs
  39. ):
  40. super().__init__()
  41. self.nesting_level = nesting_level
  42. self.inner_max_num_threads = inner_max_num_threads
  43. self.backend_kwargs = backend_kwargs
  44. MAX_NUM_THREADS_VARS = [
  45. "OMP_NUM_THREADS",
  46. "OPENBLAS_NUM_THREADS",
  47. "MKL_NUM_THREADS",
  48. "BLIS_NUM_THREADS",
  49. "VECLIB_MAXIMUM_THREADS",
  50. "NUMBA_NUM_THREADS",
  51. "NUMEXPR_NUM_THREADS",
  52. ]
  53. TBB_ENABLE_IPC_VAR = "ENABLE_IPC"
  54. @abstractmethod
  55. def effective_n_jobs(self, n_jobs):
  56. """Determine the number of jobs that can actually run in parallel
  57. n_jobs is the number of workers requested by the callers. Passing
  58. n_jobs=-1 means requesting all available workers for instance matching
  59. the number of CPU cores on the worker host(s).
  60. This method should return a guesstimate of the number of workers that
  61. can actually perform work concurrently. The primary use case is to make
  62. it possible for the caller to know in how many chunks to slice the
  63. work.
  64. In general working on larger data chunks is more efficient (less
  65. scheduling overhead and better use of CPU cache prefetching heuristics)
  66. as long as all the workers have enough work to do.
  67. """
  68. def apply_async(self, func, callback=None):
  69. """Deprecated: implement `submit` instead."""
  70. raise NotImplementedError("Implement `submit` instead.")
  71. def submit(self, func, callback=None):
  72. """Schedule a function to be run and return a future-like object.
  73. This method should return a future-like object that allow tracking
  74. the progress of the task.
  75. If ``supports_retrieve_callback`` is False, the return value of this
  76. method is passed to ``retrieve_result`` instead of calling
  77. ``retrieve_result_callback``.
  78. Parameters
  79. ----------
  80. func: callable
  81. The function to be run in parallel.
  82. callback: callable
  83. A callable that will be called when the task is completed. This callable
  84. is a wrapper around ``retrieve_result_callback``. This should be added
  85. to the future-like object returned by this method, so that the callback
  86. is called when the task is completed.
  87. For future-like backends, this can be achieved with something like
  88. ``future.add_done_callback(callback)``.
  89. Returns
  90. -------
  91. future: future-like
  92. A future-like object to track the execution of the submitted function.
  93. """
  94. warnings.warn(
  95. "`apply_async` is deprecated, implement and use `submit` instead.",
  96. DeprecationWarning,
  97. )
  98. return self.apply_async(func, callback)
  99. def retrieve_result_callback(self, out):
  100. """Called within the callback function passed to `submit`.
  101. This method can customise how the result of the function is retrieved
  102. from the future-like object.
  103. Parameters
  104. ----------
  105. future: future-like
  106. The future-like object returned by the `submit` method.
  107. Returns
  108. -------
  109. result: object
  110. The result of the function executed in parallel.
  111. """
  112. def retrieve_result(self, out, timeout=None):
  113. """Hook to retrieve the result when support_retrieve_callback=False.
  114. The argument `out` is the result of the `submit` call. This method
  115. should return the result of the computation or raise an exception if
  116. the computation failed.
  117. """
  118. if self.supports_timeout:
  119. return out.get(timeout=timeout)
  120. else:
  121. return out.get()
  122. def configure(
  123. self, n_jobs=1, parallel=None, prefer=None, require=None, **backend_kwargs
  124. ):
  125. """Reconfigure the backend and return the number of workers.
  126. This makes it possible to reuse an existing backend instance for
  127. successive independent calls to Parallel with different parameters.
  128. """
  129. self.parallel = parallel
  130. return self.effective_n_jobs(n_jobs)
  131. def start_call(self):
  132. """Call-back method called at the beginning of a Parallel call"""
  133. def stop_call(self):
  134. """Call-back method called at the end of a Parallel call"""
  135. def terminate(self):
  136. """Shutdown the workers and free the shared memory."""
  137. def compute_batch_size(self):
  138. """Determine the optimal batch size"""
  139. return 1
  140. def batch_completed(self, batch_size, duration):
  141. """Callback indicate how long it took to run a batch"""
  142. def abort_everything(self, ensure_ready=True):
  143. """Abort any running tasks
  144. This is called when an exception has been raised when executing a task
  145. and all the remaining tasks will be ignored and can therefore be
  146. aborted to spare computation resources.
  147. If ensure_ready is True, the backend should be left in an operating
  148. state as future tasks might be re-submitted via that same backend
  149. instance.
  150. If ensure_ready is False, the implementer of this method can decide
  151. to leave the backend in a closed / terminated state as no new task
  152. are expected to be submitted to this backend.
  153. Setting ensure_ready to False is an optimization that can be leveraged
  154. when aborting tasks via killing processes from a local process pool
  155. managed by the backend it-self: if we expect no new tasks, there is no
  156. point in re-creating new workers.
  157. """
  158. # Does nothing by default: to be overridden in subclasses when
  159. # canceling tasks is possible.
  160. pass
  161. def get_nested_backend(self):
  162. """Backend instance to be used by nested Parallel calls.
  163. By default a thread-based backend is used for the first level of
  164. nesting. Beyond, switch to sequential backend to avoid spawning too
  165. many threads on the host.
  166. """
  167. nesting_level = getattr(self, "nesting_level", 0) + 1
  168. if nesting_level > 1:
  169. return SequentialBackend(nesting_level=nesting_level), None
  170. else:
  171. return ThreadingBackend(nesting_level=nesting_level), None
  172. def _prepare_worker_env(self, n_jobs):
  173. """Return environment variables limiting threadpools in external libs.
  174. This function return a dict containing environment variables to pass
  175. when creating a pool of process. These environment variables limit the
  176. number of threads to `n_threads` for OpenMP, MKL, Accelerated and
  177. OpenBLAS libraries in the child processes.
  178. """
  179. explicit_n_threads = self.inner_max_num_threads
  180. default_n_threads = max(cpu_count() // n_jobs, 1)
  181. # Set the inner environment variables to self.inner_max_num_threads if
  182. # it is given. Else, default to cpu_count // n_jobs unless the variable
  183. # is already present in the parent process environment.
  184. env = {}
  185. for var in self.MAX_NUM_THREADS_VARS:
  186. if explicit_n_threads is None:
  187. var_value = os.environ.get(var, default_n_threads)
  188. else:
  189. var_value = explicit_n_threads
  190. env[var] = str(var_value)
  191. if self.TBB_ENABLE_IPC_VAR not in os.environ:
  192. # To avoid over-subscription when using TBB, let the TBB schedulers
  193. # use Inter Process Communication to coordinate:
  194. env[self.TBB_ENABLE_IPC_VAR] = "1"
  195. return env
  196. @contextlib.contextmanager
  197. def retrieval_context(self):
  198. """Context manager to manage an execution context.
  199. Calls to Parallel.retrieve will be made inside this context.
  200. By default, this does nothing. It may be useful for subclasses to
  201. handle nested parallelism. In particular, it may be required to avoid
  202. deadlocks if a backend manages a fixed number of workers, when those
  203. workers may be asked to do nested Parallel calls. Without
  204. 'retrieval_context' this could lead to deadlock, as all the workers
  205. managed by the backend may be "busy" waiting for the nested parallel
  206. calls to finish, but the backend has no free workers to execute those
  207. tasks.
  208. """
  209. yield
  210. @staticmethod
  211. def in_main_thread():
  212. return isinstance(threading.current_thread(), threading._MainThread)
  213. class SequentialBackend(ParallelBackendBase):
  214. """A ParallelBackend which will execute all batches sequentially.
  215. Does not use/create any threading objects, and hence has minimal
  216. overhead. Used when n_jobs == 1.
  217. """
  218. uses_threads = True
  219. supports_timeout = False
  220. supports_retrieve_callback = False
  221. supports_sharedmem = True
  222. def effective_n_jobs(self, n_jobs):
  223. """Determine the number of jobs which are going to run in parallel"""
  224. if n_jobs == 0:
  225. raise ValueError("n_jobs == 0 in Parallel has no meaning")
  226. return 1
  227. def submit(self, func, callback=None):
  228. """Schedule a func to be run"""
  229. raise RuntimeError("Should never be called for SequentialBackend.")
  230. def retrieve_result_callback(self, out):
  231. raise RuntimeError("Should never be called for SequentialBackend.")
  232. def get_nested_backend(self):
  233. # import is not top level to avoid cyclic import errors.
  234. from .parallel import get_active_backend
  235. # SequentialBackend should neither change the nesting level, the
  236. # default backend or the number of jobs. Just return the current one.
  237. return get_active_backend()
  238. class PoolManagerMixin(object):
  239. """A helper class for managing pool of workers."""
  240. _pool = None
  241. def effective_n_jobs(self, n_jobs):
  242. """Determine the number of jobs which are going to run in parallel"""
  243. if n_jobs == 0:
  244. raise ValueError("n_jobs == 0 in Parallel has no meaning")
  245. elif mp is None or n_jobs is None:
  246. # multiprocessing is not available or disabled, fallback
  247. # to sequential mode
  248. return 1
  249. elif n_jobs < 0:
  250. n_jobs = max(cpu_count() + 1 + n_jobs, 1)
  251. return n_jobs
  252. def terminate(self):
  253. """Shutdown the process or thread pool"""
  254. if self._pool is not None:
  255. self._pool.close()
  256. self._pool.terminate() # terminate does a join()
  257. self._pool = None
  258. def _get_pool(self):
  259. """Used by `submit` to make it possible to implement lazy init"""
  260. return self._pool
  261. def submit(self, func, callback=None):
  262. """Schedule a func to be run"""
  263. # Here, we need a wrapper to avoid crashes on KeyboardInterruptErrors.
  264. # We also call the callback on error, to make sure the pool does not
  265. # wait on crashed jobs.
  266. return self._get_pool().apply_async(
  267. _TracebackCapturingWrapper(func),
  268. (),
  269. callback=callback,
  270. error_callback=callback,
  271. )
  272. def retrieve_result_callback(self, result):
  273. """Mimic concurrent.futures results, raising an error if needed."""
  274. # In the multiprocessing Pool API, the callback are called with the
  275. # result value as an argument so `result`(`out`) is the output of
  276. # job.get(). It's either the result or the exception raised while
  277. # collecting the result.
  278. return _retrieve_traceback_capturing_wrapped_call(result)
  279. def abort_everything(self, ensure_ready=True):
  280. """Shutdown the pool and restart a new one with the same parameters"""
  281. self.terminate()
  282. if ensure_ready:
  283. self.configure(
  284. n_jobs=self.parallel.n_jobs,
  285. parallel=self.parallel,
  286. **self.parallel._backend_kwargs,
  287. )
  288. class AutoBatchingMixin(object):
  289. """A helper class for automagically batching jobs."""
  290. # In seconds, should be big enough to hide multiprocessing dispatching
  291. # overhead.
  292. # This settings was found by running benchmarks/bench_auto_batching.py
  293. # with various parameters on various platforms.
  294. MIN_IDEAL_BATCH_DURATION = 0.2
  295. # Should not be too high to avoid stragglers: long jobs running alone
  296. # on a single worker while other workers have no work to process any more.
  297. MAX_IDEAL_BATCH_DURATION = 2
  298. # Batching counters default values
  299. _DEFAULT_EFFECTIVE_BATCH_SIZE = 1
  300. _DEFAULT_SMOOTHED_BATCH_DURATION = 0.0
  301. def __init__(self, **kwargs):
  302. super().__init__(**kwargs)
  303. self._effective_batch_size = self._DEFAULT_EFFECTIVE_BATCH_SIZE
  304. self._smoothed_batch_duration = self._DEFAULT_SMOOTHED_BATCH_DURATION
  305. def compute_batch_size(self):
  306. """Determine the optimal batch size"""
  307. old_batch_size = self._effective_batch_size
  308. batch_duration = self._smoothed_batch_duration
  309. if batch_duration > 0 and batch_duration < self.MIN_IDEAL_BATCH_DURATION:
  310. # The current batch size is too small: the duration of the
  311. # processing of a batch of task is not large enough to hide
  312. # the scheduling overhead.
  313. ideal_batch_size = int(
  314. old_batch_size * self.MIN_IDEAL_BATCH_DURATION / batch_duration
  315. )
  316. # Multiply by two to limit oscilations between min and max.
  317. ideal_batch_size *= 2
  318. # dont increase the batch size too fast to limit huge batch sizes
  319. # potentially leading to starving worker
  320. batch_size = min(2 * old_batch_size, ideal_batch_size)
  321. batch_size = max(batch_size, 1)
  322. self._effective_batch_size = batch_size
  323. if self.parallel.verbose >= 10:
  324. self.parallel._print(
  325. f"Batch computation too fast ({batch_duration}s.) "
  326. f"Setting batch_size={batch_size}."
  327. )
  328. elif batch_duration > self.MAX_IDEAL_BATCH_DURATION and old_batch_size >= 2:
  329. # The current batch size is too big. If we schedule overly long
  330. # running batches some CPUs might wait with nothing left to do
  331. # while a couple of CPUs a left processing a few long running
  332. # batches. Better reduce the batch size a bit to limit the
  333. # likelihood of scheduling such stragglers.
  334. # decrease the batch size quickly to limit potential starving
  335. ideal_batch_size = int(
  336. old_batch_size * self.MIN_IDEAL_BATCH_DURATION / batch_duration
  337. )
  338. # Multiply by two to limit oscilations between min and max.
  339. batch_size = max(2 * ideal_batch_size, 1)
  340. self._effective_batch_size = batch_size
  341. if self.parallel.verbose >= 10:
  342. self.parallel._print(
  343. f"Batch computation too slow ({batch_duration}s.) "
  344. f"Setting batch_size={batch_size}."
  345. )
  346. else:
  347. # No batch size adjustment
  348. batch_size = old_batch_size
  349. if batch_size != old_batch_size:
  350. # Reset estimation of the smoothed mean batch duration: this
  351. # estimate is updated in the multiprocessing apply_async
  352. # CallBack as long as the batch_size is constant. Therefore
  353. # we need to reset the estimate whenever we re-tune the batch
  354. # size.
  355. self._smoothed_batch_duration = self._DEFAULT_SMOOTHED_BATCH_DURATION
  356. return batch_size
  357. def batch_completed(self, batch_size, duration):
  358. """Callback indicate how long it took to run a batch"""
  359. if batch_size == self._effective_batch_size:
  360. # Update the smoothed streaming estimate of the duration of a batch
  361. # from dispatch to completion
  362. old_duration = self._smoothed_batch_duration
  363. if old_duration == self._DEFAULT_SMOOTHED_BATCH_DURATION:
  364. # First record of duration for this batch size after the last
  365. # reset.
  366. new_duration = duration
  367. else:
  368. # Update the exponentially weighted average of the duration of
  369. # batch for the current effective size.
  370. new_duration = 0.8 * old_duration + 0.2 * duration
  371. self._smoothed_batch_duration = new_duration
  372. def reset_batch_stats(self):
  373. """Reset batch statistics to default values.
  374. This avoids interferences with future jobs.
  375. """
  376. self._effective_batch_size = self._DEFAULT_EFFECTIVE_BATCH_SIZE
  377. self._smoothed_batch_duration = self._DEFAULT_SMOOTHED_BATCH_DURATION
  378. class ThreadingBackend(PoolManagerMixin, ParallelBackendBase):
  379. """A ParallelBackend which will use a thread pool to execute batches in.
  380. This is a low-overhead backend but it suffers from the Python Global
  381. Interpreter Lock if the called function relies a lot on Python objects.
  382. Mostly useful when the execution bottleneck is a compiled extension that
  383. explicitly releases the GIL (for instance a Cython loop wrapped in a "with
  384. nogil" block or an expensive call to a library such as NumPy).
  385. The actual thread pool is lazily initialized: the actual thread pool
  386. construction is delayed to the first call to apply_async.
  387. ThreadingBackend is used as the default backend for nested calls.
  388. """
  389. supports_retrieve_callback = True
  390. uses_threads = True
  391. supports_sharedmem = True
  392. def configure(self, n_jobs=1, parallel=None, **backend_kwargs):
  393. """Build a process or thread pool and return the number of workers"""
  394. n_jobs = self.effective_n_jobs(n_jobs)
  395. if n_jobs == 1:
  396. # Avoid unnecessary overhead and use sequential backend instead.
  397. raise FallbackToBackend(SequentialBackend(nesting_level=self.nesting_level))
  398. self.parallel = parallel
  399. self._n_jobs = n_jobs
  400. return n_jobs
  401. def _get_pool(self):
  402. """Lazily initialize the thread pool
  403. The actual pool of worker threads is only initialized at the first
  404. call to apply_async.
  405. """
  406. if self._pool is None:
  407. self._pool = ThreadPool(self._n_jobs)
  408. return self._pool
  409. class MultiprocessingBackend(PoolManagerMixin, AutoBatchingMixin, ParallelBackendBase):
  410. """A ParallelBackend which will use a multiprocessing.Pool.
  411. Will introduce some communication and memory overhead when exchanging
  412. input and output data with the with the worker Python processes.
  413. However, does not suffer from the Python Global Interpreter Lock.
  414. """
  415. supports_retrieve_callback = True
  416. supports_return_generator = False
  417. def effective_n_jobs(self, n_jobs):
  418. """Determine the number of jobs which are going to run in parallel.
  419. This also checks if we are attempting to create a nested parallel
  420. loop.
  421. """
  422. if mp is None:
  423. return 1
  424. if mp.current_process().daemon:
  425. # Daemonic processes cannot have children
  426. if n_jobs != 1:
  427. if inside_dask_worker():
  428. msg = (
  429. "Inside a Dask worker with daemon=True, "
  430. "setting n_jobs=1.\nPossible work-arounds:\n"
  431. "- dask.config.set("
  432. "{'distributed.worker.daemon': False})"
  433. "- set the environment variable "
  434. "DASK_DISTRIBUTED__WORKER__DAEMON=False\n"
  435. "before creating your Dask cluster."
  436. )
  437. else:
  438. msg = (
  439. "Multiprocessing-backed parallel loops "
  440. "cannot be nested, setting n_jobs=1"
  441. )
  442. warnings.warn(msg, stacklevel=3)
  443. return 1
  444. if process_executor._CURRENT_DEPTH > 0:
  445. # Mixing loky and multiprocessing in nested loop is not supported
  446. if n_jobs != 1:
  447. warnings.warn(
  448. "Multiprocessing-backed parallel loops cannot be nested,"
  449. " below loky, setting n_jobs=1",
  450. stacklevel=3,
  451. )
  452. return 1
  453. elif not (self.in_main_thread() or self.nesting_level == 0):
  454. # Prevent posix fork inside in non-main posix threads
  455. if n_jobs != 1:
  456. warnings.warn(
  457. "Multiprocessing-backed parallel loops cannot be nested"
  458. " below threads, setting n_jobs=1",
  459. stacklevel=3,
  460. )
  461. return 1
  462. return super(MultiprocessingBackend, self).effective_n_jobs(n_jobs)
  463. def configure(
  464. self,
  465. n_jobs=1,
  466. parallel=None,
  467. prefer=None,
  468. require=None,
  469. **memmapping_pool_kwargs,
  470. ):
  471. """Build a process or thread pool and return the number of workers"""
  472. n_jobs = self.effective_n_jobs(n_jobs)
  473. if n_jobs == 1:
  474. raise FallbackToBackend(SequentialBackend(nesting_level=self.nesting_level))
  475. memmapping_pool_kwargs = {
  476. **self.backend_kwargs,
  477. **memmapping_pool_kwargs,
  478. }
  479. # Make sure to free as much memory as possible before forking
  480. gc.collect()
  481. self._pool = MemmappingPool(n_jobs, **memmapping_pool_kwargs)
  482. self.parallel = parallel
  483. return n_jobs
  484. def terminate(self):
  485. """Shutdown the process or thread pool"""
  486. super(MultiprocessingBackend, self).terminate()
  487. self.reset_batch_stats()
  488. class LokyBackend(AutoBatchingMixin, ParallelBackendBase):
  489. """Managing pool of workers with loky instead of multiprocessing."""
  490. supports_retrieve_callback = True
  491. supports_inner_max_num_threads = True
  492. def configure(
  493. self,
  494. n_jobs=1,
  495. parallel=None,
  496. prefer=None,
  497. require=None,
  498. idle_worker_timeout=None,
  499. **memmapping_executor_kwargs,
  500. ):
  501. """Build a process executor and return the number of workers"""
  502. n_jobs = self.effective_n_jobs(n_jobs)
  503. if n_jobs == 1:
  504. raise FallbackToBackend(SequentialBackend(nesting_level=self.nesting_level))
  505. memmapping_executor_kwargs = {
  506. **self.backend_kwargs,
  507. **memmapping_executor_kwargs,
  508. }
  509. # Prohibit the use of 'timeout' in the LokyBackend, as 'idle_worker_timeout'
  510. # better describes the backend's behavior.
  511. if "timeout" in memmapping_executor_kwargs:
  512. raise ValueError(
  513. "The 'timeout' parameter is not supported by the LokyBackend. "
  514. "Please use the `idle_worker_timeout` parameter instead."
  515. )
  516. if idle_worker_timeout is None:
  517. idle_worker_timeout = self.backend_kwargs.get("idle_worker_timeout", 300)
  518. self._workers = get_memmapping_executor(
  519. n_jobs,
  520. timeout=idle_worker_timeout,
  521. env=self._prepare_worker_env(n_jobs=n_jobs),
  522. context_id=parallel._id,
  523. **memmapping_executor_kwargs,
  524. )
  525. self.parallel = parallel
  526. return n_jobs
  527. def effective_n_jobs(self, n_jobs):
  528. """Determine the number of jobs which are going to run in parallel"""
  529. if n_jobs == 0:
  530. raise ValueError("n_jobs == 0 in Parallel has no meaning")
  531. elif mp is None or n_jobs is None:
  532. # multiprocessing is not available or disabled, fallback
  533. # to sequential mode
  534. return 1
  535. elif mp.current_process().daemon:
  536. # Daemonic processes cannot have children
  537. if n_jobs != 1:
  538. if inside_dask_worker():
  539. msg = (
  540. "Inside a Dask worker with daemon=True, "
  541. "setting n_jobs=1.\nPossible work-arounds:\n"
  542. "- dask.config.set("
  543. "{'distributed.worker.daemon': False})\n"
  544. "- set the environment variable "
  545. "DASK_DISTRIBUTED__WORKER__DAEMON=False\n"
  546. "before creating your Dask cluster."
  547. )
  548. else:
  549. msg = (
  550. "Loky-backed parallel loops cannot be called in a"
  551. " multiprocessing, setting n_jobs=1"
  552. )
  553. warnings.warn(msg, stacklevel=3)
  554. return 1
  555. elif not (self.in_main_thread() or self.nesting_level == 0):
  556. # Prevent posix fork inside in non-main posix threads
  557. if n_jobs != 1:
  558. warnings.warn(
  559. "Loky-backed parallel loops cannot be nested below "
  560. "threads, setting n_jobs=1",
  561. stacklevel=3,
  562. )
  563. return 1
  564. elif n_jobs < 0:
  565. n_jobs = max(cpu_count() + 1 + n_jobs, 1)
  566. return n_jobs
  567. def submit(self, func, callback=None):
  568. """Schedule a func to be run"""
  569. future = self._workers.submit(func)
  570. if callback is not None:
  571. future.add_done_callback(callback)
  572. return future
  573. def retrieve_result_callback(self, future):
  574. """Retrieve the result, here out is the future given by submit"""
  575. try:
  576. return future.result()
  577. except ShutdownExecutorError:
  578. raise RuntimeError(
  579. "The executor underlying Parallel has been shutdown. "
  580. "This is likely due to the garbage collection of a previous "
  581. "generator from a call to Parallel with return_as='generator'."
  582. " Make sure the generator is not garbage collected when "
  583. "submitting a new job or that it is first properly exhausted."
  584. )
  585. def terminate(self):
  586. if self._workers is not None:
  587. # Don't terminate the workers as we want to reuse them in later
  588. # calls, but cleanup the temporary resources that the Parallel call
  589. # created. This 'hack' requires a private, low-level operation.
  590. self._workers._temp_folder_manager._clean_temporary_resources(
  591. context_id=self.parallel._id, force=False
  592. )
  593. self._workers = None
  594. self.reset_batch_stats()
  595. def abort_everything(self, ensure_ready=True):
  596. """Shutdown the workers and restart a new one with the same parameters"""
  597. self._workers.terminate(kill_workers=True)
  598. self._workers = None
  599. if ensure_ready:
  600. self.configure(n_jobs=self.parallel.n_jobs, parallel=self.parallel)
  601. class FallbackToBackend(Exception):
  602. """Raised when configuration should fallback to another backend"""
  603. def __init__(self, backend):
  604. self.backend = backend
  605. def inside_dask_worker():
  606. """Check whether the current function is executed inside a Dask worker."""
  607. # This function can not be in joblib._dask because there would be a
  608. # circular import:
  609. # _dask imports _parallel_backend that imports _dask ...
  610. try:
  611. from distributed import get_worker
  612. except ImportError:
  613. return False
  614. try:
  615. get_worker()
  616. return True
  617. except ValueError:
  618. return False