| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075 |
- """
- Helpers for embarrassingly parallel code.
- """
- # Author: Gael Varoquaux < gael dot varoquaux at normalesup dot org >
- # Copyright: 2010, Gael Varoquaux
- # License: BSD 3 clause
- from __future__ import division
- import collections
- import functools
- import itertools
- import os
- import queue
- import sys
- import threading
- import time
- import warnings
- import weakref
- from contextlib import nullcontext
- from math import floor, log10, sqrt
- from multiprocessing import TimeoutError
- from numbers import Integral
- from uuid import uuid4
- from ._multiprocessing_helpers import mp
- # Make sure that those two classes are part of the public joblib.parallel API
- # so that 3rd party backend implementers can import them from here.
- from ._parallel_backends import (
- AutoBatchingMixin, # noqa
- FallbackToBackend,
- LokyBackend,
- MultiprocessingBackend,
- ParallelBackendBase, # noqa
- SequentialBackend,
- ThreadingBackend,
- )
- from ._utils import _Sentinel, eval_expr
- from .disk import memstr_to_bytes
- from .logger import Logger, short_format_time
- BACKENDS = {
- "threading": ThreadingBackend,
- "sequential": SequentialBackend,
- }
- # name of the backend used by default by Parallel outside of any context
- # managed by ``parallel_config`` or ``parallel_backend``.
- # threading is the only backend that is always everywhere
- DEFAULT_BACKEND = "threading"
- DEFAULT_THREAD_BACKEND = "threading"
- DEFAULT_PROCESS_BACKEND = "threading"
- MAYBE_AVAILABLE_BACKENDS = {"multiprocessing", "loky"}
- # if multiprocessing is available, so is loky, we set it as the default
- # backend
- if mp is not None:
- BACKENDS["multiprocessing"] = MultiprocessingBackend
- from .externals import loky
- BACKENDS["loky"] = LokyBackend
- DEFAULT_BACKEND = "loky"
- DEFAULT_PROCESS_BACKEND = "loky"
- # Thread local value that can be overridden by the ``parallel_config`` context
- # manager
- _backend = threading.local()
- def _register_dask():
- """Register Dask Backend if called with parallel_config(backend="dask")"""
- try:
- from ._dask import DaskDistributedBackend
- register_parallel_backend("dask", DaskDistributedBackend)
- except ImportError as e:
- msg = (
- "To use the dask.distributed backend you must install both "
- "the `dask` and distributed modules.\n\n"
- "See https://dask.pydata.org/en/latest/install.html for more "
- "information."
- )
- raise ImportError(msg) from e
- EXTERNAL_BACKENDS = {
- "dask": _register_dask,
- }
- # Sentinels for the default values of the Parallel constructor and
- # the parallel_config and parallel_backend context managers
- default_parallel_config = {
- "backend": _Sentinel(default_value=None),
- "n_jobs": _Sentinel(default_value=None),
- "verbose": _Sentinel(default_value=0),
- "temp_folder": _Sentinel(default_value=None),
- "max_nbytes": _Sentinel(default_value="1M"),
- "mmap_mode": _Sentinel(default_value="r"),
- "prefer": _Sentinel(default_value=None),
- "require": _Sentinel(default_value=None),
- }
- VALID_BACKEND_HINTS = ("processes", "threads", None)
- VALID_BACKEND_CONSTRAINTS = ("sharedmem", None)
- def _get_config_param(param, context_config, key):
- """Return the value of a parallel config parameter
- Explicitly setting it in Parallel has priority over setting in a
- parallel_(config/backend) context manager.
- """
- if param is not default_parallel_config[key]:
- # param is explicitly set, return it
- return param
- if context_config[key] is not default_parallel_config[key]:
- # there's a context manager and the key is set, return it
- return context_config[key]
- # Otherwise, we are in the default_parallel_config,
- # return the default value
- return param.default_value
- def get_active_backend(
- prefer=default_parallel_config["prefer"],
- require=default_parallel_config["require"],
- verbose=default_parallel_config["verbose"],
- ):
- """Return the active default backend"""
- backend, config = _get_active_backend(prefer, require, verbose)
- n_jobs = _get_config_param(default_parallel_config["n_jobs"], config, "n_jobs")
- return backend, n_jobs
- def _get_active_backend(
- prefer=default_parallel_config["prefer"],
- require=default_parallel_config["require"],
- verbose=default_parallel_config["verbose"],
- ):
- """Return the active default backend"""
- backend_config = getattr(_backend, "config", default_parallel_config)
- backend = _get_config_param(
- default_parallel_config["backend"], backend_config, "backend"
- )
- prefer = _get_config_param(prefer, backend_config, "prefer")
- require = _get_config_param(require, backend_config, "require")
- verbose = _get_config_param(verbose, backend_config, "verbose")
- if prefer not in VALID_BACKEND_HINTS:
- raise ValueError(
- f"prefer={prefer} is not a valid backend hint, "
- f"expected one of {VALID_BACKEND_HINTS}"
- )
- if require not in VALID_BACKEND_CONSTRAINTS:
- raise ValueError(
- f"require={require} is not a valid backend constraint, "
- f"expected one of {VALID_BACKEND_CONSTRAINTS}"
- )
- if prefer == "processes" and require == "sharedmem":
- raise ValueError(
- "prefer == 'processes' and require == 'sharedmem' are inconsistent settings"
- )
- explicit_backend = True
- if backend is None:
- # We are either outside of the scope of any parallel_(config/backend)
- # context manager or the context manager did not set a backend.
- # create the default backend instance now.
- backend = BACKENDS[DEFAULT_BACKEND](nesting_level=0)
- explicit_backend = False
- # Try to use the backend set by the user with the context manager.
- nesting_level = backend.nesting_level
- uses_threads = getattr(backend, "uses_threads", False)
- supports_sharedmem = getattr(backend, "supports_sharedmem", False)
- # Force to use thread-based backend if the provided backend does not
- # match the shared memory constraint or if the backend is not explicitly
- # given and threads are preferred.
- force_threads = (require == "sharedmem" and not supports_sharedmem) or (
- not explicit_backend and prefer == "threads" and not uses_threads
- )
- force_processes = not explicit_backend and prefer == "processes" and uses_threads
- if force_threads:
- # This backend does not match the shared memory constraint:
- # fallback to the default thead-based backend.
- sharedmem_backend = BACKENDS[DEFAULT_THREAD_BACKEND](
- nesting_level=nesting_level
- )
- # Warn the user if we forced the backend to thread-based, while the
- # user explicitly specified a non-thread-based backend.
- if verbose >= 10 and explicit_backend:
- print(
- f"Using {sharedmem_backend.__class__.__name__} as "
- f"joblib backend instead of {backend.__class__.__name__} "
- "as the latter does not provide shared memory semantics."
- )
- # Force to n_jobs=1 by default
- thread_config = backend_config.copy()
- thread_config["n_jobs"] = 1
- return sharedmem_backend, thread_config
- if force_processes:
- # This backend does not match the prefer="processes" constraint:
- # fallback to the default process-based backend.
- process_backend = BACKENDS[DEFAULT_PROCESS_BACKEND](nesting_level=nesting_level)
- return process_backend, backend_config.copy()
- return backend, backend_config
- class parallel_config:
- """Set the default backend or configuration for :class:`~joblib.Parallel`.
- This is an alternative to directly passing keyword arguments to the
- :class:`~joblib.Parallel` class constructor. It is particularly useful when
- calling into library code that uses joblib internally but does not expose
- the various parallel configuration arguments in its own API.
- Parameters
- ----------
- backend: str or ParallelBackendBase instance, default=None
- If ``backend`` is a string it must match a previously registered
- implementation using the :func:`~register_parallel_backend` function.
- By default the following backends are available:
- - 'loky': single-host, process-based parallelism (used by default),
- - 'threading': single-host, thread-based parallelism,
- - 'multiprocessing': legacy single-host, process-based parallelism.
- 'loky' is recommended to run functions that manipulate Python objects.
- 'threading' is a low-overhead alternative that is most efficient for
- functions that release the Global Interpreter Lock: e.g. I/O-bound
- code or CPU-bound code in a few calls to native code that explicitly
- releases the GIL. Note that on some rare systems (such as pyodide),
- multiprocessing and loky may not be available, in which case joblib
- defaults to threading.
- In addition, if the ``dask`` and ``distributed`` Python packages are
- installed, it is possible to use the 'dask' backend for better
- scheduling of nested parallel calls without over-subscription and
- potentially distribute parallel calls over a networked cluster of
- several hosts.
- It is also possible to use the distributed 'ray' backend for
- distributing the workload to a cluster of nodes. See more details
- in the Examples section below.
- Alternatively the backend can be passed directly as an instance.
- n_jobs: int, default=None
- The maximum number of concurrently running jobs, such as the number
- of Python worker processes when ``backend="loky"`` or the size of the
- thread-pool when ``backend="threading"``.
- This argument is converted to an integer, rounded below for float.
- If -1 is given, `joblib` tries to use all CPUs. The number of CPUs
- ``n_cpus`` is obtained with :func:`~cpu_count`.
- For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. For instance,
- using ``n_jobs=-2`` will result in all CPUs but one being used.
- This argument can also go above ``n_cpus``, which will cause
- oversubscription. In some cases, slight oversubscription can be
- beneficial, e.g., for tasks with large I/O operations.
- If 1 is given, no parallel computing code is used at all, and the
- behavior amounts to a simple python `for` loop. This mode is not
- compatible with `timeout`.
- None is a marker for 'unset' that will be interpreted as n_jobs=1
- unless the call is performed under a :func:`~parallel_config`
- context manager that sets another value for ``n_jobs``.
- If n_jobs = 0 then a ValueError is raised.
- verbose: int, default=0
- The verbosity level: if non zero, progress messages are
- printed. Above 50, the output is sent to stdout.
- The frequency of the messages increases with the verbosity level.
- If it more than 10, all iterations are reported.
- temp_folder: str or None, default=None
- Folder to be used by the pool for memmapping large arrays
- for sharing memory with worker processes. If None, this will try in
- order:
- - a folder pointed by the ``JOBLIB_TEMP_FOLDER`` environment
- variable,
- - ``/dev/shm`` if the folder exists and is writable: this is a
- RAM disk filesystem available by default on modern Linux
- distributions,
- - the default system temporary folder that can be
- overridden with ``TMP``, ``TMPDIR`` or ``TEMP`` environment
- variables, typically ``/tmp`` under Unix operating systems.
- max_nbytes: int, str, or None, optional, default='1M'
- Threshold on the size of arrays passed to the workers that
- triggers automated memory mapping in temp_folder. Can be an int
- in Bytes, or a human-readable string, e.g., '1M' for 1 megabyte.
- Use None to disable memmapping of large arrays.
- mmap_mode: {None, 'r+', 'r', 'w+', 'c'}, default='r'
- Memmapping mode for numpy arrays passed to workers. None will
- disable memmapping, other modes defined in the numpy.memmap doc:
- https://numpy.org/doc/stable/reference/generated/numpy.memmap.html
- Also, see 'max_nbytes' parameter documentation for more details.
- prefer: str in {'processes', 'threads'} or None, default=None
- Soft hint to choose the default backend.
- The default process-based backend is 'loky' and the default
- thread-based backend is 'threading'. Ignored if the ``backend``
- parameter is specified.
- require: 'sharedmem' or None, default=None
- Hard constraint to select the backend. If set to 'sharedmem',
- the selected backend will be single-host and thread-based.
- inner_max_num_threads: int, default=None
- If not None, overwrites the limit set on the number of threads
- usable in some third-party library threadpools like OpenBLAS,
- MKL or OpenMP. This is only used with the ``loky`` backend.
- backend_params: dict
- Additional parameters to pass to the backend constructor when
- backend is a string.
- Notes
- -----
- Joblib tries to limit the oversubscription by limiting the number of
- threads usable in some third-party library threadpools like OpenBLAS, MKL
- or OpenMP. The default limit in each worker is set to
- ``max(cpu_count() // effective_n_jobs, 1)`` but this limit can be
- overwritten with the ``inner_max_num_threads`` argument which will be used
- to set this limit in the child processes.
- .. versionadded:: 1.3
- Examples
- --------
- >>> from operator import neg
- >>> with parallel_config(backend='threading'):
- ... print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
- ...
- [-1, -2, -3, -4, -5]
- To use the 'ray' joblib backend add the following lines:
- >>> from ray.util.joblib import register_ray # doctest: +SKIP
- >>> register_ray() # doctest: +SKIP
- >>> with parallel_config(backend="ray"): # doctest: +SKIP
- ... print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
- [-1, -2, -3, -4, -5]
- """
- def __init__(
- self,
- backend=default_parallel_config["backend"],
- *,
- n_jobs=default_parallel_config["n_jobs"],
- verbose=default_parallel_config["verbose"],
- temp_folder=default_parallel_config["temp_folder"],
- max_nbytes=default_parallel_config["max_nbytes"],
- mmap_mode=default_parallel_config["mmap_mode"],
- prefer=default_parallel_config["prefer"],
- require=default_parallel_config["require"],
- inner_max_num_threads=None,
- **backend_params,
- ):
- # Save the parallel info and set the active parallel config
- self.old_parallel_config = getattr(_backend, "config", default_parallel_config)
- backend = self._check_backend(backend, inner_max_num_threads, **backend_params)
- new_config = {
- "n_jobs": n_jobs,
- "verbose": verbose,
- "temp_folder": temp_folder,
- "max_nbytes": max_nbytes,
- "mmap_mode": mmap_mode,
- "prefer": prefer,
- "require": require,
- "backend": backend,
- }
- self.parallel_config = self.old_parallel_config.copy()
- self.parallel_config.update(
- {k: v for k, v in new_config.items() if not isinstance(v, _Sentinel)}
- )
- setattr(_backend, "config", self.parallel_config)
- def _check_backend(self, backend, inner_max_num_threads, **backend_params):
- if backend is default_parallel_config["backend"]:
- if inner_max_num_threads is not None or len(backend_params) > 0:
- raise ValueError(
- "inner_max_num_threads and other constructor "
- "parameters backend_params are only supported "
- "when backend is not None."
- )
- return backend
- if isinstance(backend, str):
- # Handle non-registered or missing backends
- if backend not in BACKENDS:
- if backend in EXTERNAL_BACKENDS:
- register = EXTERNAL_BACKENDS[backend]
- register()
- elif backend in MAYBE_AVAILABLE_BACKENDS:
- warnings.warn(
- f"joblib backend '{backend}' is not available on "
- f"your system, falling back to {DEFAULT_BACKEND}.",
- UserWarning,
- stacklevel=2,
- )
- BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
- else:
- raise ValueError(
- f"Invalid backend: {backend}, expected one of "
- f"{sorted(BACKENDS.keys())}"
- )
- backend = BACKENDS[backend](**backend_params)
- else:
- if len(backend_params) > 0:
- raise ValueError(
- "Constructor parameters backend_params are only "
- "supported when backend is a string."
- )
- if inner_max_num_threads is not None:
- msg = (
- f"{backend.__class__.__name__} does not accept setting the "
- "inner_max_num_threads argument."
- )
- assert backend.supports_inner_max_num_threads, msg
- backend.inner_max_num_threads = inner_max_num_threads
- # If the nesting_level of the backend is not set previously, use the
- # nesting level from the previous active_backend to set it
- if backend.nesting_level is None:
- parent_backend = self.old_parallel_config["backend"]
- if parent_backend is default_parallel_config["backend"]:
- nesting_level = 0
- else:
- nesting_level = parent_backend.nesting_level
- backend.nesting_level = nesting_level
- return backend
- def __enter__(self):
- return self.parallel_config
- def __exit__(self, type, value, traceback):
- self.unregister()
- def unregister(self):
- setattr(_backend, "config", self.old_parallel_config)
- class parallel_backend(parallel_config):
- """Change the default backend used by Parallel inside a with block.
- .. warning::
- It is advised to use the :class:`~joblib.parallel_config` context
- manager instead, which allows more fine-grained control over the
- backend configuration.
- If ``backend`` is a string it must match a previously registered
- implementation using the :func:`~register_parallel_backend` function.
- By default the following backends are available:
- - 'loky': single-host, process-based parallelism (used by default),
- - 'threading': single-host, thread-based parallelism,
- - 'multiprocessing': legacy single-host, process-based parallelism.
- 'loky' is recommended to run functions that manipulate Python objects.
- 'threading' is a low-overhead alternative that is most efficient for
- functions that release the Global Interpreter Lock: e.g. I/O-bound code or
- CPU-bound code in a few calls to native code that explicitly releases the
- GIL. Note that on some rare systems (such as Pyodide),
- multiprocessing and loky may not be available, in which case joblib
- defaults to threading.
- You can also use the `Dask <https://docs.dask.org/en/stable/>`_ joblib
- backend to distribute work across machines. This works well with
- scikit-learn estimators with the ``n_jobs`` parameter, for example::
- >>> import joblib # doctest: +SKIP
- >>> from sklearn.model_selection import GridSearchCV # doctest: +SKIP
- >>> from dask.distributed import Client, LocalCluster # doctest: +SKIP
- >>> # create a local Dask cluster
- >>> cluster = LocalCluster() # doctest: +SKIP
- >>> client = Client(cluster) # doctest: +SKIP
- >>> grid_search = GridSearchCV(estimator, param_grid, n_jobs=-1)
- ... # doctest: +SKIP
- >>> with joblib.parallel_backend("dask", scatter=[X, y]): # doctest: +SKIP
- ... grid_search.fit(X, y)
- It is also possible to use the distributed 'ray' backend for distributing
- the workload to a cluster of nodes. To use the 'ray' joblib backend add
- the following lines::
- >>> from ray.util.joblib import register_ray # doctest: +SKIP
- >>> register_ray() # doctest: +SKIP
- >>> with parallel_backend("ray"): # doctest: +SKIP
- ... print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
- [-1, -2, -3, -4, -5]
- Alternatively the backend can be passed directly as an instance.
- By default all available workers will be used (``n_jobs=-1``) unless the
- caller passes an explicit value for the ``n_jobs`` parameter.
- This is an alternative to passing a ``backend='backend_name'`` argument to
- the :class:`~Parallel` class constructor. It is particularly useful when
- calling into library code that uses joblib internally but does not expose
- the backend argument in its own API.
- >>> from operator import neg
- >>> with parallel_backend('threading'):
- ... print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
- ...
- [-1, -2, -3, -4, -5]
- Joblib also tries to limit the oversubscription by limiting the number of
- threads usable in some third-party library threadpools like OpenBLAS, MKL
- or OpenMP. The default limit in each worker is set to
- ``max(cpu_count() // effective_n_jobs, 1)`` but this limit can be
- overwritten with the ``inner_max_num_threads`` argument which will be used
- to set this limit in the child processes.
- .. versionadded:: 0.10
- See Also
- --------
- joblib.parallel_config: context manager to change the backend configuration.
- """
- def __init__(
- self, backend, n_jobs=-1, inner_max_num_threads=None, **backend_params
- ):
- super().__init__(
- backend=backend,
- n_jobs=n_jobs,
- inner_max_num_threads=inner_max_num_threads,
- **backend_params,
- )
- if self.old_parallel_config is None:
- self.old_backend_and_jobs = None
- else:
- self.old_backend_and_jobs = (
- self.old_parallel_config["backend"],
- self.old_parallel_config["n_jobs"],
- )
- self.new_backend_and_jobs = (
- self.parallel_config["backend"],
- self.parallel_config["n_jobs"],
- )
- def __enter__(self):
- return self.new_backend_and_jobs
- # Under Linux or OS X the default start method of multiprocessing
- # can cause third party libraries to crash. Under Python 3.4+ it is possible
- # to set an environment variable to switch the default start method from
- # 'fork' to 'forkserver' or 'spawn' to avoid this issue albeit at the cost
- # of causing semantic changes and some additional pool instantiation overhead.
- DEFAULT_MP_CONTEXT = None
- if hasattr(mp, "get_context"):
- method = os.environ.get("JOBLIB_START_METHOD", "").strip() or None
- if method is not None:
- DEFAULT_MP_CONTEXT = mp.get_context(method=method)
- class BatchedCalls(object):
- """Wrap a sequence of (func, args, kwargs) tuples as a single callable"""
- def __init__(
- self, iterator_slice, backend_and_jobs, reducer_callback=None, pickle_cache=None
- ):
- self.items = list(iterator_slice)
- self._size = len(self.items)
- self._reducer_callback = reducer_callback
- if isinstance(backend_and_jobs, tuple):
- self._backend, self._n_jobs = backend_and_jobs
- else:
- # this is for backward compatibility purposes. Before 0.12.6,
- # nested backends were returned without n_jobs indications.
- self._backend, self._n_jobs = backend_and_jobs, None
- self._pickle_cache = pickle_cache if pickle_cache is not None else {}
- def __call__(self):
- # Set the default nested backend to self._backend but do not set the
- # change the default number of processes to -1
- with parallel_config(backend=self._backend, n_jobs=self._n_jobs):
- return [func(*args, **kwargs) for func, args, kwargs in self.items]
- def __reduce__(self):
- if self._reducer_callback is not None:
- self._reducer_callback()
- # no need to pickle the callback.
- return (
- BatchedCalls,
- (self.items, (self._backend, self._n_jobs), None, self._pickle_cache),
- )
- def __len__(self):
- return self._size
- # Possible exit status for a task
- TASK_DONE = "Done"
- TASK_ERROR = "Error"
- TASK_PENDING = "Pending"
- ###############################################################################
- # CPU count that works also when multiprocessing has been disabled via
- # the JOBLIB_MULTIPROCESSING environment variable
- def cpu_count(only_physical_cores=False):
- """Return the number of CPUs.
- This delegates to loky.cpu_count that takes into account additional
- constraints such as Linux CFS scheduler quotas (typically set by container
- runtimes such as docker) and CPU affinity (for instance using the taskset
- command on Linux).
- Parameters
- ----------
- only_physical_cores : boolean, default=False
- If True, does not take hyperthreading / SMT logical cores into account.
- """
- if mp is None:
- return 1
- return loky.cpu_count(only_physical_cores=only_physical_cores)
- ###############################################################################
- # For verbosity
- def _verbosity_filter(index, verbose):
- """Returns False for indices increasingly apart, the distance
- depending on the value of verbose.
- We use a lag increasing as the square of index
- """
- if not verbose:
- return True
- elif verbose > 10:
- return False
- if index == 0:
- return False
- verbose = 0.5 * (11 - verbose) ** 2
- scale = sqrt(index / verbose)
- next_scale = sqrt((index + 1) / verbose)
- return int(next_scale) == int(scale)
- ###############################################################################
- def delayed(function):
- """Decorator used to capture the arguments of a function."""
- def delayed_function(*args, **kwargs):
- return function, args, kwargs
- try:
- delayed_function = functools.wraps(function)(delayed_function)
- except AttributeError:
- " functools.wraps fails on some callable objects "
- return delayed_function
- ###############################################################################
- class BatchCompletionCallBack(object):
- """Callback to keep track of completed results and schedule the next tasks.
- This callable is executed by the parent process whenever a worker process
- has completed a batch of tasks.
- It is used for progress reporting, to update estimate of the batch
- processing duration and to schedule the next batch of tasks to be
- processed.
- It is assumed that this callback will always be triggered by the backend
- right after the end of a task, in case of success as well as in case of
- failure.
- """
- ##########################################################################
- # METHODS CALLED BY THE MAIN THREAD #
- ##########################################################################
- def __init__(self, dispatch_timestamp, batch_size, parallel):
- self.dispatch_timestamp = dispatch_timestamp
- self.batch_size = batch_size
- self.parallel = parallel
- self.parallel_call_id = parallel._call_id
- self._completion_timeout_counter = None
- # Internals to keep track of the status and outcome of the task.
- # Used to hold a reference to the future-like object returned by the
- # backend after launching this task
- # This will be set later when calling `register_job`, as it is only
- # created once the task has been submitted.
- self.job = None
- if not parallel._backend.supports_retrieve_callback:
- # The status is only used for asynchronous result retrieval in the
- # callback.
- self.status = None
- else:
- # The initial status for the job is TASK_PENDING.
- # Once it is done, it will be either TASK_DONE, or TASK_ERROR.
- self.status = TASK_PENDING
- def register_job(self, job):
- """Register the object returned by `submit`."""
- self.job = job
- def get_result(self, timeout):
- """Returns the raw result of the task that was submitted.
- If the task raised an exception rather than returning, this same
- exception will be raised instead.
- If the backend supports the retrieval callback, it is assumed that this
- method is only called after the result has been registered. It is
- ensured by checking that `self.status(timeout)` does not return
- TASK_PENDING. In this case, `get_result` directly returns the
- registered result (or raise the registered exception).
- For other backends, there are no such assumptions, but `get_result`
- still needs to synchronously retrieve the result before it can
- return it or raise. It will block at most `self.timeout` seconds
- waiting for retrieval to complete, after that it raises a TimeoutError.
- """
- backend = self.parallel._backend
- if backend.supports_retrieve_callback:
- # We assume that the result has already been retrieved by the
- # callback thread, and is stored internally. It's just waiting to
- # be returned.
- return self._return_or_raise()
- # For other backends, the main thread needs to run the retrieval step.
- try:
- result = backend.retrieve_result(self.job, timeout=timeout)
- outcome = dict(result=result, status=TASK_DONE)
- except BaseException as e:
- outcome = dict(result=e, status=TASK_ERROR)
- self._register_outcome(outcome)
- return self._return_or_raise()
- def _return_or_raise(self):
- try:
- if self.status == TASK_ERROR:
- raise self._result
- return self._result
- finally:
- del self._result
- def get_status(self, timeout):
- """Get the status of the task.
- This function also checks if the timeout has been reached and register
- the TimeoutError outcome when it is the case.
- """
- if timeout is None or self.status != TASK_PENDING:
- return self.status
- # The computation are running and the status is pending.
- # Check that we did not wait for this jobs more than `timeout`.
- now = time.time()
- if self._completion_timeout_counter is None:
- self._completion_timeout_counter = now
- if (now - self._completion_timeout_counter) > timeout:
- outcome = dict(result=TimeoutError(), status=TASK_ERROR)
- self._register_outcome(outcome)
- return self.status
- ##########################################################################
- # METHODS CALLED BY CALLBACK THREADS #
- ##########################################################################
- def __call__(self, *args, **kwargs):
- """Function called by the callback thread after a job is completed."""
- # If the backend doesn't support callback retrievals, the next batch of
- # tasks is dispatched regardless. The result will be retrieved by the
- # main thread when calling `get_result`.
- if not self.parallel._backend.supports_retrieve_callback:
- self._dispatch_new()
- return
- # If the backend supports retrieving the result in the callback, it
- # registers the task outcome (TASK_ERROR or TASK_DONE), and schedules
- # the next batch if needed.
- with self.parallel._lock:
- # Edge case where while the task was processing, the `parallel`
- # instance has been reset and a new call has been issued, but the
- # worker managed to complete the task and trigger this callback
- # call just before being aborted by the reset.
- if self.parallel._call_id != self.parallel_call_id:
- return
- # When aborting, stop as fast as possible and do not retrieve the
- # result as it won't be returned by the Parallel call.
- if self.parallel._aborting:
- return
- # Retrieves the result of the task in the main process and dispatch
- # a new batch if needed.
- job_succeeded = self._retrieve_result(*args, **kwargs)
- if job_succeeded:
- self._dispatch_new()
- def _dispatch_new(self):
- """Schedule the next batch of tasks to be processed."""
- # This steps ensure that auto-batching works as expected.
- this_batch_duration = time.time() - self.dispatch_timestamp
- self.parallel._backend.batch_completed(self.batch_size, this_batch_duration)
- # Schedule the next batch of tasks.
- with self.parallel._lock:
- self.parallel.n_completed_tasks += self.batch_size
- self.parallel.print_progress()
- if self.parallel._original_iterator is not None:
- self.parallel.dispatch_next()
- def _retrieve_result(self, out):
- """Fetch and register the outcome of a task.
- Return True if the task succeeded, False otherwise.
- This function is only called by backends that support retrieving
- the task result in the callback thread.
- """
- try:
- result = self.parallel._backend.retrieve_result_callback(out)
- outcome = dict(status=TASK_DONE, result=result)
- except BaseException as e:
- # Avoid keeping references to parallel in the error.
- e.__traceback__ = None
- outcome = dict(result=e, status=TASK_ERROR)
- self._register_outcome(outcome)
- return outcome["status"] != TASK_ERROR
- ##########################################################################
- # This method can be called either in the main thread #
- # or in the callback thread. #
- ##########################################################################
- def _register_outcome(self, outcome):
- """Register the outcome of a task.
- This method can be called only once, future calls will be ignored.
- """
- # Covers the edge case where the main thread tries to register a
- # `TimeoutError` while the callback thread tries to register a result
- # at the same time.
- with self.parallel._lock:
- if self.status not in (TASK_PENDING, None):
- return
- self.status = outcome["status"]
- self._result = outcome["result"]
- # Once the result and the status are extracted, the last reference to
- # the job can be deleted.
- self.job = None
- # As soon as an error as been spotted, early stopping flags are sent to
- # the `parallel` instance.
- if self.status == TASK_ERROR:
- self.parallel._exception = True
- self.parallel._aborting = True
- if self.parallel.return_ordered:
- return
- with self.parallel._lock:
- # For `return_as=generator_unordered`, append the job to the queue
- # in the order of completion instead of submission.
- self.parallel._jobs.append(self)
- ###############################################################################
- def register_parallel_backend(name, factory, make_default=False):
- """Register a new Parallel backend factory.
- The new backend can then be selected by passing its name as the backend
- argument to the :class:`~Parallel` class. Moreover, the default backend can
- be overwritten globally by setting make_default=True.
- The factory can be any callable that takes no argument and return an
- instance of ``ParallelBackendBase``.
- Warning: this function is experimental and subject to change in a future
- version of joblib.
- .. versionadded:: 0.10
- """
- BACKENDS[name] = factory
- if make_default:
- global DEFAULT_BACKEND
- DEFAULT_BACKEND = name
- def effective_n_jobs(n_jobs=-1):
- """Determine the number of jobs that can actually run in parallel
- n_jobs is the number of workers requested by the callers. Passing n_jobs=-1
- means requesting all available workers for instance matching the number of
- CPU cores on the worker host(s).
- This method should return a guesstimate of the number of workers that can
- actually perform work concurrently with the currently enabled default
- backend. The primary use case is to make it possible for the caller to know
- in how many chunks to slice the work.
- In general working on larger data chunks is more efficient (less scheduling
- overhead and better use of CPU cache prefetching heuristics) as long as all
- the workers have enough work to do.
- Warning: this function is experimental and subject to change in a future
- version of joblib.
- .. versionadded:: 0.10
- """
- if n_jobs == 1:
- return 1
- backend, backend_n_jobs = get_active_backend()
- if n_jobs is None:
- n_jobs = backend_n_jobs
- return backend.effective_n_jobs(n_jobs=n_jobs)
- ###############################################################################
- class Parallel(Logger):
- """Helper class for readable parallel mapping.
- Read more in the :ref:`User Guide <parallel>`.
- Parameters
- ----------
- n_jobs: int, default=None
- The maximum number of concurrently running jobs, such as the number
- of Python worker processes when ``backend="loky"`` or the size of
- the thread-pool when ``backend="threading"``.
- This argument is converted to an integer, rounded below for float.
- If -1 is given, `joblib` tries to use all CPUs. The number of CPUs
- ``n_cpus`` is obtained with :func:`~cpu_count`.
- For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. For instance,
- using ``n_jobs=-2`` will result in all CPUs but one being used.
- This argument can also go above ``n_cpus``, which will cause
- oversubscription. In some cases, slight oversubscription can be
- beneficial, e.g., for tasks with large I/O operations.
- If 1 is given, no parallel computing code is used at all, and the
- behavior amounts to a simple python `for` loop. This mode is not
- compatible with ``timeout``.
- None is a marker for 'unset' that will be interpreted as n_jobs=1
- unless the call is performed under a :func:`~parallel_config`
- context manager that sets another value for ``n_jobs``.
- If n_jobs = 0 then a ValueError is raised.
- backend: str, ParallelBackendBase instance or None, default='loky'
- Specify the parallelization backend implementation.
- Supported backends are:
- - "loky" used by default, can induce some
- communication and memory overhead when exchanging input and
- output data with the worker Python processes. On some rare
- systems (such as Pyiodide), the loky backend may not be
- available.
- - "multiprocessing" previous process-based backend based on
- `multiprocessing.Pool`. Less robust than `loky`.
- - "threading" is a very low-overhead backend but it suffers
- from the Python Global Interpreter Lock if the called function
- relies a lot on Python objects. "threading" is mostly useful
- when the execution bottleneck is a compiled extension that
- explicitly releases the GIL (for instance a Cython loop wrapped
- in a "with nogil" block or an expensive call to a library such
- as NumPy).
- - finally, you can register backends by calling
- :func:`~register_parallel_backend`. This will allow you to
- implement a backend of your liking.
- It is not recommended to hard-code the backend name in a call to
- :class:`~Parallel` in a library. Instead it is recommended to set
- soft hints (prefer) or hard constraints (require) so as to make it
- possible for library users to change the backend from the outside
- using the :func:`~parallel_config` context manager.
- return_as: str in {'list', 'generator', 'generator_unordered'}, default='list'
- If 'list', calls to this instance will return a list, only when
- all results have been processed and retrieved.
- If 'generator', it will return a generator that yields the results
- as soon as they are available, in the order the tasks have been
- submitted with.
- If 'generator_unordered', the generator will immediately yield
- available results independently of the submission order. The output
- order is not deterministic in this case because it depends on the
- concurrency of the workers.
- prefer: str in {'processes', 'threads'} or None, default=None
- Soft hint to choose the default backend if no specific backend
- was selected with the :func:`~parallel_config` context manager.
- The default process-based backend is 'loky' and the default
- thread-based backend is 'threading'. Ignored if the ``backend``
- parameter is specified.
- require: 'sharedmem' or None, default=None
- Hard constraint to select the backend. If set to 'sharedmem',
- the selected backend will be single-host and thread-based even
- if the user asked for a non-thread based backend with
- :func:`~joblib.parallel_config`.
- verbose: int, default=0
- The verbosity level: if non zero, progress messages are
- printed. Above 50, the output is sent to stdout.
- The frequency of the messages increases with the verbosity level.
- If it more than 10, all iterations are reported.
- timeout: float or None, default=None
- Timeout limit for each task to complete. If any task takes longer
- a TimeOutError will be raised. Only applied when n_jobs != 1
- pre_dispatch: {'all', integer, or expression, as in '3*n_jobs'}, default='2*n_jobs'
- The number of batches (of tasks) to be pre-dispatched.
- Default is '2*n_jobs'. When batch_size="auto" this is reasonable
- default and the workers should never starve. Note that only basic
- arithmetic are allowed here and no modules can be used in this
- expression.
- batch_size: int or 'auto', default='auto'
- The number of atomic tasks to dispatch at once to each
- worker. When individual evaluations are very fast, dispatching
- calls to workers can be slower than sequential computation because
- of the overhead. Batching fast computations together can mitigate
- this.
- The ``'auto'`` strategy keeps track of the time it takes for a
- batch to complete, and dynamically adjusts the batch size to keep
- the time on the order of half a second, using a heuristic. The
- initial batch size is 1.
- ``batch_size="auto"`` with ``backend="threading"`` will dispatch
- batches of a single task at a time as the threading backend has
- very little overhead and using larger batch size has not proved to
- bring any gain in that case.
- temp_folder: str or None, default=None
- Folder to be used by the pool for memmapping large arrays
- for sharing memory with worker processes. If None, this will try in
- order:
- - a folder pointed by the JOBLIB_TEMP_FOLDER environment
- variable,
- - /dev/shm if the folder exists and is writable: this is a
- RAM disk filesystem available by default on modern Linux
- distributions,
- - the default system temporary folder that can be
- overridden with TMP, TMPDIR or TEMP environment
- variables, typically /tmp under Unix operating systems.
- Only active when ``backend="loky"`` or ``"multiprocessing"``.
- max_nbytes int, str, or None, optional, default='1M'
- Threshold on the size of arrays passed to the workers that
- triggers automated memory mapping in temp_folder. Can be an int
- in Bytes, or a human-readable string, e.g., '1M' for 1 megabyte.
- Use None to disable memmapping of large arrays.
- Only active when ``backend="loky"`` or ``"multiprocessing"``.
- mmap_mode: {None, 'r+', 'r', 'w+', 'c'}, default='r'
- Memmapping mode for numpy arrays passed to workers. None will
- disable memmapping, other modes defined in the numpy.memmap doc:
- https://numpy.org/doc/stable/reference/generated/numpy.memmap.html
- Also, see 'max_nbytes' parameter documentation for more details.
- backend_kwargs: dict, optional
- Additional parameters to pass to the backend `configure` method.
- Notes
- -----
- This object uses workers to compute in parallel the application of a
- function to many different arguments. The main functionality it brings
- in addition to using the raw multiprocessing or concurrent.futures API
- are (see examples for details):
- * More readable code, in particular since it avoids
- constructing list of arguments.
- * Easier debugging:
- - informative tracebacks even when the error happens on
- the client side
- - using 'n_jobs=1' enables to turn off parallel computing
- for debugging without changing the codepath
- - early capture of pickling errors
- * An optional progress meter.
- * Interruption of multiprocesses jobs with 'Ctrl-C'
- * Flexible pickling control for the communication to and from
- the worker processes.
- * Ability to use shared memory efficiently with worker
- processes for large numpy-based datastructures.
- Note that the intended usage is to run one call at a time. Multiple
- calls to the same Parallel object will result in a ``RuntimeError``
- Examples
- --------
- A simple example:
- >>> from math import sqrt
- >>> from joblib import Parallel, delayed
- >>> Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10))
- [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
- Reshaping the output when the function has several return
- values:
- >>> from math import modf
- >>> from joblib import Parallel, delayed
- >>> r = Parallel(n_jobs=1)(delayed(modf)(i/2.) for i in range(10))
- >>> res, i = zip(*r)
- >>> res
- (0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5)
- >>> i
- (0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0)
- The progress meter: the higher the value of `verbose`, the more
- messages:
- >>> from time import sleep
- >>> from joblib import Parallel, delayed
- >>> r = Parallel(n_jobs=2, verbose=10)(
- ... delayed(sleep)(.2) for _ in range(10)) #doctest: +SKIP
- [Parallel(n_jobs=2)]: Done 1 tasks | elapsed: 0.6s
- [Parallel(n_jobs=2)]: Done 4 tasks | elapsed: 0.8s
- [Parallel(n_jobs=2)]: Done 10 out of 10 | elapsed: 1.4s finished
- Traceback example, note how the line of the error is indicated
- as well as the values of the parameter passed to the function that
- triggered the exception, even though the traceback happens in the
- child process:
- >>> from heapq import nlargest
- >>> from joblib import Parallel, delayed
- >>> Parallel(n_jobs=2)(
- ... delayed(nlargest)(2, n) for n in (range(4), 'abcde', 3))
- ... # doctest: +SKIP
- -----------------------------------------------------------------------
- Sub-process traceback:
- -----------------------------------------------------------------------
- TypeError Mon Nov 12 11:37:46 2012
- PID: 12934 Python 2.7.3: /usr/bin/python
- ........................................................................
- /usr/lib/python2.7/heapq.pyc in nlargest(n=2, iterable=3, key=None)
- 419 if n >= size:
- 420 return sorted(iterable, key=key, reverse=True)[:n]
- 421
- 422 # When key is none, use simpler decoration
- 423 if key is None:
- --> 424 it = izip(iterable, count(0,-1)) # decorate
- 425 result = _nlargest(n, it)
- 426 return map(itemgetter(0), result) # undecorate
- 427
- 428 # General case, slowest method
- TypeError: izip argument #1 must support iteration
- _______________________________________________________________________
- Using pre_dispatch in a producer/consumer situation, where the
- data is generated on the fly. Note how the producer is first
- called 3 times before the parallel loop is initiated, and then
- called to generate new data on the fly:
- >>> from math import sqrt
- >>> from joblib import Parallel, delayed
- >>> def producer():
- ... for i in range(6):
- ... print('Produced %s' % i)
- ... yield i
- >>> out = Parallel(n_jobs=2, verbose=100, pre_dispatch='1.5*n_jobs')(
- ... delayed(sqrt)(i) for i in producer()) #doctest: +SKIP
- Produced 0
- Produced 1
- Produced 2
- [Parallel(n_jobs=2)]: Done 1 jobs | elapsed: 0.0s
- Produced 3
- [Parallel(n_jobs=2)]: Done 2 jobs | elapsed: 0.0s
- Produced 4
- [Parallel(n_jobs=2)]: Done 3 jobs | elapsed: 0.0s
- Produced 5
- [Parallel(n_jobs=2)]: Done 4 jobs | elapsed: 0.0s
- [Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s remaining: 0.0s
- [Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s finished
- """ # noqa: E501
- def __init__(
- self,
- n_jobs=default_parallel_config["n_jobs"],
- backend=default_parallel_config["backend"],
- return_as="list",
- verbose=default_parallel_config["verbose"],
- timeout=None,
- pre_dispatch="2 * n_jobs",
- batch_size="auto",
- temp_folder=default_parallel_config["temp_folder"],
- max_nbytes=default_parallel_config["max_nbytes"],
- mmap_mode=default_parallel_config["mmap_mode"],
- prefer=default_parallel_config["prefer"],
- require=default_parallel_config["require"],
- **backend_kwargs,
- ):
- # Initiate parent Logger class state
- super().__init__()
- # Interpret n_jobs=None as 'unset'
- if n_jobs is None:
- n_jobs = default_parallel_config["n_jobs"]
- active_backend, context_config = _get_active_backend(
- prefer=prefer, require=require, verbose=verbose
- )
- nesting_level = active_backend.nesting_level
- self.verbose = _get_config_param(verbose, context_config, "verbose")
- self.timeout = timeout
- self.pre_dispatch = pre_dispatch
- if return_as not in {"list", "generator", "generator_unordered"}:
- raise ValueError(
- 'Expected `return_as` parameter to be a string equal to "list"'
- f',"generator" or "generator_unordered", but got {return_as} '
- "instead."
- )
- self.return_as = return_as
- self.return_generator = return_as != "list"
- self.return_ordered = return_as != "generator_unordered"
- # Check if we are under a parallel_config or parallel_backend
- # context manager and use the config from the context manager
- # for arguments that are not explicitly set.
- self._backend_kwargs = {
- **backend_kwargs,
- **{
- k: _get_config_param(param, context_config, k)
- for param, k in [
- (max_nbytes, "max_nbytes"),
- (temp_folder, "temp_folder"),
- (mmap_mode, "mmap_mode"),
- (prefer, "prefer"),
- (require, "require"),
- (verbose, "verbose"),
- ]
- },
- }
- if isinstance(self._backend_kwargs["max_nbytes"], str):
- self._backend_kwargs["max_nbytes"] = memstr_to_bytes(
- self._backend_kwargs["max_nbytes"]
- )
- self._backend_kwargs["verbose"] = max(0, self._backend_kwargs["verbose"] - 50)
- if DEFAULT_MP_CONTEXT is not None:
- self._backend_kwargs["context"] = DEFAULT_MP_CONTEXT
- elif hasattr(mp, "get_context"):
- self._backend_kwargs["context"] = mp.get_context()
- if backend is default_parallel_config["backend"] or backend is None:
- backend = active_backend
- elif isinstance(backend, ParallelBackendBase):
- # Use provided backend as is, with the current nesting_level if it
- # is not set yet.
- if backend.nesting_level is None:
- backend.nesting_level = nesting_level
- elif hasattr(backend, "Pool") and hasattr(backend, "Lock"):
- # Make it possible to pass a custom multiprocessing context as
- # backend to change the start method to forkserver or spawn or
- # preload modules on the forkserver helper process.
- self._backend_kwargs["context"] = backend
- backend = MultiprocessingBackend(nesting_level=nesting_level)
- elif backend not in BACKENDS and backend in MAYBE_AVAILABLE_BACKENDS:
- warnings.warn(
- f"joblib backend '{backend}' is not available on "
- f"your system, falling back to {DEFAULT_BACKEND}.",
- UserWarning,
- stacklevel=2,
- )
- BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
- backend = BACKENDS[DEFAULT_BACKEND](nesting_level=nesting_level)
- else:
- try:
- backend_factory = BACKENDS[backend]
- except KeyError as e:
- raise ValueError(
- "Invalid backend: %s, expected one of %r"
- % (backend, sorted(BACKENDS.keys()))
- ) from e
- backend = backend_factory(nesting_level=nesting_level)
- n_jobs = _get_config_param(n_jobs, context_config, "n_jobs")
- if n_jobs is None:
- # No specific context override and no specific value request:
- # default to the default of the backend.
- n_jobs = backend.default_n_jobs
- try:
- n_jobs = int(n_jobs)
- except ValueError:
- raise ValueError("n_jobs could not be converted to int")
- self.n_jobs = n_jobs
- if require == "sharedmem" and not getattr(backend, "supports_sharedmem", False):
- raise ValueError("Backend %s does not support shared memory" % backend)
- if batch_size == "auto" or isinstance(batch_size, Integral) and batch_size > 0:
- self.batch_size = batch_size
- else:
- raise ValueError(
- "batch_size must be 'auto' or a positive integer, got: %r" % batch_size
- )
- if not isinstance(backend, SequentialBackend):
- if self.return_generator and not backend.supports_return_generator:
- raise ValueError(
- "Backend {} does not support return_as={}".format(
- backend, return_as
- )
- )
- # This lock is used to coordinate the main thread of this process
- # with the async callback thread of our the pool.
- self._lock = threading.RLock()
- self._jobs = collections.deque()
- self._jobs_set = set()
- self._pending_outputs = list()
- self._ready_batches = queue.Queue()
- self._reducer_callback = None
- # Internal variables
- self._backend = backend
- self._running = False
- self._managed_backend = False
- self._id = uuid4().hex
- self._call_ref = None
- def __enter__(self):
- self._managed_backend = True
- self._calling = False
- self._initialize_backend()
- return self
- def __exit__(self, exc_type, exc_value, traceback):
- self._managed_backend = False
- if self.return_generator and self._calling:
- self._abort()
- self._terminate_and_reset()
- def _initialize_backend(self):
- """Build a process or thread pool and return the number of workers"""
- try:
- n_jobs = self._backend.configure(
- n_jobs=self.n_jobs, parallel=self, **self._backend_kwargs
- )
- if self.timeout is not None and not self._backend.supports_timeout:
- warnings.warn(
- "The backend class {!r} does not support timeout. "
- "You have set 'timeout={}' in Parallel but "
- "the 'timeout' parameter will not be used.".format(
- self._backend.__class__.__name__, self.timeout
- )
- )
- except FallbackToBackend as e:
- # Recursively initialize the backend in case of requested fallback.
- self._backend = e.backend
- n_jobs = self._initialize_backend()
- return n_jobs
- def _effective_n_jobs(self):
- if self._backend:
- return self._backend.effective_n_jobs(self.n_jobs)
- return 1
- def _terminate_and_reset(self):
- if hasattr(self._backend, "stop_call") and self._calling:
- self._backend.stop_call()
- self._calling = False
- if not self._managed_backend:
- self._backend.terminate()
- def _dispatch(self, batch):
- """Queue the batch for computing, with or without multiprocessing
- WARNING: this method is not thread-safe: it should be only called
- indirectly via dispatch_one_batch.
- """
- # If job.get() catches an exception, it closes the queue:
- if self._aborting:
- return
- batch_size = len(batch)
- self.n_dispatched_tasks += batch_size
- self.n_dispatched_batches += 1
- dispatch_timestamp = time.time()
- batch_tracker = BatchCompletionCallBack(dispatch_timestamp, batch_size, self)
- self._register_new_job(batch_tracker)
- # If return_ordered is False, the batch_tracker is not stored in the
- # jobs queue at the time of submission. Instead, it will be appended to
- # the queue by itself as soon as the callback is triggered to be able
- # to return the results in the order of completion.
- job = self._backend.submit(batch, callback=batch_tracker)
- batch_tracker.register_job(job)
- def _register_new_job(self, batch_tracker):
- if self.return_ordered:
- self._jobs.append(batch_tracker)
- else:
- self._jobs_set.add(batch_tracker)
- def dispatch_next(self):
- """Dispatch more data for parallel processing
- This method is meant to be called concurrently by the multiprocessing
- callback. We rely on the thread-safety of dispatch_one_batch to protect
- against concurrent consumption of the unprotected iterator.
- """
- if not self.dispatch_one_batch(self._original_iterator):
- self._iterating = False
- self._original_iterator = None
- def dispatch_one_batch(self, iterator):
- """Prefetch the tasks for the next batch and dispatch them.
- The effective size of the batch is computed here.
- If there are no more jobs to dispatch, return False, else return True.
- The iterator consumption and dispatching is protected by the same
- lock so calling this function should be thread safe.
- """
- if self._aborting:
- return False
- batch_size = self._get_batch_size()
- with self._lock:
- # to ensure an even distribution of the workload between workers,
- # we look ahead in the original iterators more than batch_size
- # tasks - However, we keep consuming only one batch at each
- # dispatch_one_batch call. The extra tasks are stored in a local
- # queue, _ready_batches, that is looked-up prior to re-consuming
- # tasks from the origal iterator.
- try:
- tasks = self._ready_batches.get(block=False)
- except queue.Empty:
- # slice the iterator n_jobs * batchsize items at a time. If the
- # slice returns less than that, then the current batchsize puts
- # too much weight on a subset of workers, while other may end
- # up starving. So in this case, re-scale the batch size
- # accordingly to distribute evenly the last items between all
- # workers.
- n_jobs = self._cached_effective_n_jobs
- big_batch_size = batch_size * n_jobs
- try:
- islice = list(itertools.islice(iterator, big_batch_size))
- except Exception as e:
- # Handle the fact that the generator of task raised an
- # exception. As this part of the code can be executed in
- # a thread internal to the backend, register a task with
- # an error that will be raised in the user's thread.
- if isinstance(e.__context__, queue.Empty):
- # Suppress the cause of the exception if it is
- # queue.Empty to avoid cluttered traceback. Only do it
- # if the __context__ is really empty to avoid messing
- # with causes of the original error.
- e.__cause__ = None
- batch_tracker = BatchCompletionCallBack(0, batch_size, self)
- self._register_new_job(batch_tracker)
- batch_tracker._register_outcome(dict(result=e, status=TASK_ERROR))
- return True
- if len(islice) == 0:
- return False
- elif (
- iterator is self._original_iterator and len(islice) < big_batch_size
- ):
- # We reached the end of the original iterator (unless
- # iterator is the ``pre_dispatch``-long initial slice of
- # the original iterator) -- decrease the batch size to
- # account for potential variance in the batches running
- # time.
- final_batch_size = max(1, len(islice) // (10 * n_jobs))
- else:
- final_batch_size = max(1, len(islice) // n_jobs)
- # enqueue n_jobs batches in a local queue
- for i in range(0, len(islice), final_batch_size):
- tasks = BatchedCalls(
- islice[i : i + final_batch_size],
- self._backend.get_nested_backend(),
- self._reducer_callback,
- self._pickle_cache,
- )
- self._ready_batches.put(tasks)
- # finally, get one task.
- tasks = self._ready_batches.get(block=False)
- if len(tasks) == 0:
- # No more tasks available in the iterator: tell caller to stop.
- return False
- else:
- self._dispatch(tasks)
- return True
- def _get_batch_size(self):
- """Returns the effective batch size for dispatch"""
- if self.batch_size == "auto":
- return self._backend.compute_batch_size()
- else:
- # Fixed batch size strategy
- return self.batch_size
- def _print(self, msg):
- """Display the message on stout or stderr depending on verbosity"""
- # XXX: Not using the logger framework: need to
- # learn to use logger better.
- if not self.verbose:
- return
- if self.verbose < 50:
- writer = sys.stderr.write
- else:
- writer = sys.stdout.write
- writer(f"[{self}]: {msg}\n")
- def _is_completed(self):
- """Check if all tasks have been completed"""
- return self.n_completed_tasks == self.n_dispatched_tasks and not (
- self._iterating or self._aborting
- )
- def print_progress(self):
- """Display the process of the parallel execution only a fraction
- of time, controlled by self.verbose.
- """
- if not self.verbose:
- return
- if self.n_tasks is not None and self.n_tasks > 0:
- width = floor(log10(self.n_tasks)) + 1
- else:
- width = 3
- elapsed_time = time.time() - self._start_time
- if self._is_completed():
- # Make sure that we get a last message telling us we are done
- self._print(
- f"Done {self.n_completed_tasks:{width}d} out of "
- f"{self.n_completed_tasks:{width}d} | elapsed: "
- f"{short_format_time(elapsed_time)} finished"
- )
- return
- # Original job iterator becomes None once it has been fully
- # consumed: at this point we know the total number of jobs and we are
- # able to display an estimation of the remaining time based on already
- # completed jobs. Otherwise, we simply display the number of completed
- # tasks.
- elif self._original_iterator is not None:
- if _verbosity_filter(self.n_dispatched_batches, self.verbose):
- return
- fmt_time = f"| elapsed: {short_format_time(elapsed_time)}"
- index = self.n_completed_tasks
- if self.n_tasks is not None:
- self._print(
- f"Done {index:{width}d} out of {self.n_tasks:{width}d} {fmt_time}"
- )
- else:
- pad = " " * (len("out of ") + width - len("tasks"))
- self._print(f"Done {index:{width}d} tasks {pad}{fmt_time}")
- else:
- index = self.n_completed_tasks
- # We are finished dispatching
- total_tasks = self.n_dispatched_tasks
- # We always display the first loop
- if index != 0:
- # Display depending on the number of remaining items
- # A message as soon as we finish dispatching, cursor is 0
- cursor = total_tasks - index + 1 - self._pre_dispatch_amount
- frequency = (total_tasks // self.verbose) + 1
- is_last_item = index + 1 == total_tasks
- if is_last_item or cursor % frequency:
- return
- remaining_time = (elapsed_time / max(index, 1)) * (
- self.n_dispatched_tasks - index
- )
- # only display status if remaining time is greater or equal to 0
- self._print(
- f"Done {index:{width}d} out of {total_tasks:{width}d} "
- f"| elapsed: {short_format_time(elapsed_time)} remaining: "
- f"{short_format_time(remaining_time)}"
- )
- def _abort(self):
- # Stop dispatching new jobs in the async callback thread
- self._aborting = True
- # If the backend allows it, cancel or kill remaining running
- # tasks without waiting for the results as we will raise
- # the exception we got back to the caller instead of returning
- # any result.
- backend = self._backend
- if not self._aborted and hasattr(backend, "abort_everything"):
- # If the backend is managed externally we need to make sure
- # to leave it in a working state to allow for future jobs
- # scheduling.
- ensure_ready = self._managed_backend
- backend.abort_everything(ensure_ready=ensure_ready)
- self._aborted = True
- def _start(self, iterator, pre_dispatch):
- # Only set self._iterating to True if at least a batch
- # was dispatched. In particular this covers the edge
- # case of Parallel used with an exhausted iterator. If
- # self._original_iterator is None, then this means either
- # that pre_dispatch == "all", n_jobs == 1 or that the first batch
- # was very quick and its callback already dispatched all the
- # remaining jobs.
- self._iterating = False
- if self.dispatch_one_batch(iterator):
- self._iterating = self._original_iterator is not None
- while self.dispatch_one_batch(iterator):
- pass
- if pre_dispatch == "all":
- # The iterable was consumed all at once by the above for loop.
- # No need to wait for async callbacks to trigger to
- # consumption.
- self._iterating = False
- def _get_outputs(self, iterator, pre_dispatch):
- """Iterator returning the tasks' output as soon as they are ready."""
- dispatch_thread_id = threading.get_ident()
- detach_generator_exit = False
- try:
- self._start(iterator, pre_dispatch)
- # first yield returns None, for internal use only. This ensures
- # that we enter the try/except block and start dispatching the
- # tasks.
- yield
- with self._backend.retrieval_context():
- yield from self._retrieve()
- except GeneratorExit:
- # The generator has been garbage collected before being fully
- # consumed. This aborts the remaining tasks if possible and warn
- # the user if necessary.
- self._exception = True
- # In some interpreters such as PyPy, GeneratorExit can be raised in
- # a different thread than the one used to start the dispatch of the
- # parallel tasks. This can lead to hang when a thread attempts to
- # join itself. As workaround, we detach the execution of the
- # aborting code to a dedicated thread. We then need to make sure
- # the rest of the function does not call `_terminate_and_reset`
- # in finally.
- if dispatch_thread_id != threading.get_ident():
- warnings.warn(
- "A generator produced by joblib.Parallel has been "
- "gc'ed in an unexpected thread. This behavior should "
- "not cause major -issues but to make sure, please "
- "report this warning and your use case at "
- "https://github.com/joblib/joblib/issues so it can "
- "be investigated."
- )
- detach_generator_exit = True
- _parallel = self
- class _GeneratorExitThread(threading.Thread):
- def run(self):
- _parallel._abort()
- if _parallel.return_generator:
- _parallel._warn_exit_early()
- _parallel._terminate_and_reset()
- _GeneratorExitThread(name="GeneratorExitThread").start()
- return
- # Otherwise, we are in the thread that started the dispatch: we can
- # safely abort the execution and warn the user.
- self._abort()
- if self.return_generator:
- self._warn_exit_early()
- raise
- # Note: we catch any BaseException instead of just Exception instances
- # to also include KeyboardInterrupt
- except BaseException:
- self._exception = True
- self._abort()
- raise
- finally:
- # Store the unconsumed tasks and terminate the workers if necessary
- _remaining_outputs = [] if self._exception else self._jobs
- self._jobs = collections.deque()
- self._jobs_set = set()
- self._running = False
- if not detach_generator_exit:
- self._terminate_and_reset()
- while len(_remaining_outputs) > 0:
- batched_results = _remaining_outputs.popleft()
- batched_results = batched_results.get_result(self.timeout)
- for result in batched_results:
- yield result
- def _wait_retrieval(self):
- """Return True if we need to continue retrieving some tasks."""
- # If the input load is still being iterated over, it means that tasks
- # are still on the dispatch waitlist and their results will need to
- # be retrieved later on.
- if self._iterating:
- return True
- # If some of the dispatched tasks are still being processed by the
- # workers, wait for the compute to finish before starting retrieval
- if self.n_completed_tasks < self.n_dispatched_tasks:
- return True
- # For backends that does not support retrieving asynchronously the
- # result to the main process, all results must be carefully retrieved
- # in the _retrieve loop in the main thread while the backend is alive.
- # For other backends, the actual retrieval is done asynchronously in
- # the callback thread, and we can terminate the backend before the
- # `self._jobs` result list has been emptied. The remaining results
- # will be collected in the `finally` step of the generator.
- if not self._backend.supports_retrieve_callback:
- if len(self._jobs) > 0:
- return True
- return False
- def _retrieve(self):
- timeout_control_job = None
- while self._wait_retrieval():
- # If the callback thread of a worker has signaled that its task
- # triggered an exception, or if the retrieval loop has raised an
- # exception (e.g. `GeneratorExit`), exit the loop and surface the
- # worker traceback.
- if self._aborting:
- self._raise_error_fast()
- break
- nb_jobs = len(self._jobs)
- # Now wait for a job to be ready for retrieval.
- if self.return_ordered:
- # Case ordered: wait for completion (or error) of the next job
- # that have been dispatched and not retrieved yet. If no job
- # have been dispatched yet, wait for dispatch.
- # We assume that the time to wait for the next job to be
- # dispatched is always low, so that the timeout
- # control only have to be done on the amount of time the next
- # dispatched job is pending.
- if (nb_jobs == 0) or (
- self._jobs[0].get_status(timeout=self.timeout) == TASK_PENDING
- ):
- time.sleep(0.01)
- continue
- elif nb_jobs == 0:
- # Case unordered: jobs are added to the list of jobs to
- # retrieve `self._jobs` only once completed or in error, which
- # is too late to enable timeout control in the same way than in
- # the previous case.
- # Instead, if no job is ready to be retrieved yet, we
- # arbitrarily pick a dispatched job, and the timeout control is
- # done such that an error is raised if this control job
- # timeouts before any other dispatched job has completed and
- # been added to `self._jobs` to be retrieved.
- if timeout_control_job is None:
- timeout_control_job = next(iter(self._jobs_set), None)
- # NB: it can be None if no job has been dispatched yet.
- if timeout_control_job is not None:
- timeout_control_job.get_status(timeout=self.timeout)
- time.sleep(0.01)
- continue
- elif timeout_control_job is not None:
- # Case unordered, when `nb_jobs > 0`:
- # It means that a job is ready to be retrieved, so no timeout
- # will occur during this iteration.
- # Before proceeding to retrieval of the next ready job, reset
- # the timeout control state to prepare the next iteration.
- timeout_control_job._completion_timeout_counter = None
- timeout_control_job = None
- # We need to be careful: the job list can be filling up as
- # we empty it and Python list are not thread-safe by
- # default hence the use of the lock
- with self._lock:
- batched_results = self._jobs.popleft()
- if not self.return_ordered:
- self._jobs_set.remove(batched_results)
- # Flatten the batched results to output one output at a time
- batched_results = batched_results.get_result(self.timeout)
- for result in batched_results:
- self._nb_consumed += 1
- yield result
- def _raise_error_fast(self):
- """If we are aborting, raise if a job caused an error."""
- # Find the first job whose status is TASK_ERROR if it exists.
- with self._lock:
- error_job = next(
- (job for job in self._jobs if job.status == TASK_ERROR), None
- )
- # If this error job exists, immediately raise the error by
- # calling get_result. This job might not exists if abort has been
- # called directly or if the generator is gc'ed.
- if error_job is not None:
- error_job.get_result(self.timeout)
- def _warn_exit_early(self):
- """Warn the user if the generator is gc'ed before being consumned."""
- ready_outputs = self.n_completed_tasks - self._nb_consumed
- is_completed = self._is_completed()
- msg = ""
- if ready_outputs:
- msg += (
- f"{ready_outputs} tasks have been successfully executed but not used."
- )
- if not is_completed:
- msg += " Additionally, "
- if not is_completed:
- msg += (
- f"{self.n_dispatched_tasks - self.n_completed_tasks} tasks "
- "which were still being processed by the workers have been "
- "cancelled."
- )
- if msg:
- msg += (
- " You could benefit from adjusting the input task "
- "iterator to limit unnecessary computation time."
- )
- warnings.warn(msg)
- def _get_sequential_output(self, iterable):
- """Separate loop for sequential output.
- This simplifies the traceback in case of errors and reduces the
- overhead of calling sequential tasks with `joblib`.
- """
- try:
- self._iterating = True
- self._original_iterator = iterable
- batch_size = self._get_batch_size()
- if batch_size != 1:
- it = iter(iterable)
- iterable_batched = iter(
- lambda: tuple(itertools.islice(it, batch_size)), ()
- )
- iterable = (task for batch in iterable_batched for task in batch)
- # first yield returns None, for internal use only. This ensures
- # that we enter the try/except block and setup the generator.
- yield None
- # Sequentially call the tasks and yield the results.
- for func, args, kwargs in iterable:
- self.n_dispatched_batches += 1
- self.n_dispatched_tasks += 1
- res = func(*args, **kwargs)
- self.n_completed_tasks += 1
- self.print_progress()
- yield res
- self._nb_consumed += 1
- except BaseException:
- self._exception = True
- self._aborting = True
- self._aborted = True
- raise
- finally:
- self._running = False
- self._iterating = False
- self._original_iterator = None
- self.print_progress()
- def _reset_run_tracking(self):
- """Reset the counters and flags used to track the execution."""
- # Makes sur the parallel instance was not previously running in a
- # thread-safe way.
- with getattr(self, "_lock", nullcontext()):
- if self._running:
- msg = "This Parallel instance is already running !"
- if self.return_generator is True:
- msg += (
- " Before submitting new tasks, you must wait for the "
- "completion of all the previous tasks, or clean all "
- "references to the output generator."
- )
- raise RuntimeError(msg)
- self._running = True
- # Counter to keep track of the task dispatched and completed.
- self.n_dispatched_batches = 0
- self.n_dispatched_tasks = 0
- self.n_completed_tasks = 0
- # Following count is incremented by one each time the user iterates
- # on the output generator, it is used to prepare an informative
- # warning message in case the generator is deleted before all the
- # dispatched tasks have been consumed.
- self._nb_consumed = 0
- # Following flags are used to synchronize the threads in case one of
- # the tasks error-out to ensure that all workers abort fast and that
- # the backend terminates properly.
- # Set to True as soon as a worker signals that a task errors-out
- self._exception = False
- # Set to True in case of early termination following an incident
- self._aborting = False
- # Set to True after abortion is complete
- self._aborted = False
- def __call__(self, iterable):
- """Main function to dispatch parallel tasks."""
- self._reset_run_tracking()
- self.n_tasks = len(iterable) if hasattr(iterable, "__len__") else None
- self._start_time = time.time()
- if not self._managed_backend:
- n_jobs = self._initialize_backend()
- else:
- n_jobs = self._effective_n_jobs()
- if n_jobs == 1:
- # If n_jobs==1, run the computation sequentially and return
- # immediately to avoid overheads.
- output = self._get_sequential_output(iterable)
- next(output)
- return output if self.return_generator else list(output)
- # Let's create an ID that uniquely identifies the current call. If the
- # call is interrupted early and that the same instance is immediately
- # reused, this id will be used to prevent workers that were
- # concurrently finalizing a task from the previous call to run the
- # callback.
- with self._lock:
- self._call_id = uuid4().hex
- # self._effective_n_jobs should be called in the Parallel.__call__
- # thread only -- store its value in an attribute for further queries.
- self._cached_effective_n_jobs = n_jobs
- if isinstance(self._backend, LokyBackend):
- # For the loky backend, we add a callback executed when reducing
- # BatchCalls, that makes the loky executor use a temporary folder
- # specific to this Parallel object when pickling temporary memmaps.
- # This callback is necessary to ensure that several Parallel
- # objects using the same reusable executor don't use the same
- # temporary resources.
- def _batched_calls_reducer_callback():
- # Relevant implementation detail: the following lines, called
- # when reducing BatchedCalls, are called in a thread-safe
- # situation, meaning that the context of the temporary folder
- # manager will not be changed in between the callback execution
- # and the end of the BatchedCalls pickling. The reason is that
- # pickling (the only place where set_current_context is used)
- # is done from a single thread (the queue_feeder_thread).
- self._backend._workers._temp_folder_manager.set_current_context( # noqa
- self._id
- )
- self._reducer_callback = _batched_calls_reducer_callback
- # self._effective_n_jobs should be called in the Parallel.__call__
- # thread only -- store its value in an attribute for further queries.
- self._cached_effective_n_jobs = n_jobs
- backend_name = self._backend.__class__.__name__
- if n_jobs == 0:
- raise RuntimeError("%s has no active worker." % backend_name)
- self._print(f"Using backend {backend_name} with {n_jobs} concurrent workers.")
- if hasattr(self._backend, "start_call"):
- self._backend.start_call()
- # Following flag prevents double calls to `backend.stop_call`.
- self._calling = True
- iterator = iter(iterable)
- pre_dispatch = self.pre_dispatch
- if pre_dispatch == "all":
- # prevent further dispatch via multiprocessing callback thread
- self._original_iterator = None
- self._pre_dispatch_amount = 0
- else:
- self._original_iterator = iterator
- if hasattr(pre_dispatch, "endswith"):
- pre_dispatch = eval_expr(pre_dispatch.replace("n_jobs", str(n_jobs)))
- self._pre_dispatch_amount = pre_dispatch = int(pre_dispatch)
- # The main thread will consume the first pre_dispatch items and
- # the remaining items will later be lazily dispatched by async
- # callbacks upon task completions.
- # TODO: this iterator should be batch_size * n_jobs
- iterator = itertools.islice(iterator, self._pre_dispatch_amount)
- # Use a caching dict for callables that are pickled with cloudpickle to
- # improve performances. This cache is used only in the case of
- # functions that are defined in the __main__ module, functions that
- # are defined locally (inside another function) and lambda expressions.
- self._pickle_cache = dict()
- output = self._get_outputs(iterator, pre_dispatch)
- self._call_ref = weakref.ref(output)
- # The first item from the output is blank, but it makes the interpreter
- # progress until it enters the Try/Except block of the generator and
- # reaches the first `yield` statement. This starts the asynchronous
- # dispatch of the tasks to the workers.
- next(output)
- return output if self.return_generator else list(output)
- def __repr__(self):
- return "%s(n_jobs=%s)" % (self.__class__.__name__, self.n_jobs)
|