parallel.py 85 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075
  1. """
  2. Helpers for embarrassingly parallel code.
  3. """
  4. # Author: Gael Varoquaux < gael dot varoquaux at normalesup dot org >
  5. # Copyright: 2010, Gael Varoquaux
  6. # License: BSD 3 clause
  7. from __future__ import division
  8. import collections
  9. import functools
  10. import itertools
  11. import os
  12. import queue
  13. import sys
  14. import threading
  15. import time
  16. import warnings
  17. import weakref
  18. from contextlib import nullcontext
  19. from math import floor, log10, sqrt
  20. from multiprocessing import TimeoutError
  21. from numbers import Integral
  22. from uuid import uuid4
  23. from ._multiprocessing_helpers import mp
  24. # Make sure that those two classes are part of the public joblib.parallel API
  25. # so that 3rd party backend implementers can import them from here.
  26. from ._parallel_backends import (
  27. AutoBatchingMixin, # noqa
  28. FallbackToBackend,
  29. LokyBackend,
  30. MultiprocessingBackend,
  31. ParallelBackendBase, # noqa
  32. SequentialBackend,
  33. ThreadingBackend,
  34. )
  35. from ._utils import _Sentinel, eval_expr
  36. from .disk import memstr_to_bytes
  37. from .logger import Logger, short_format_time
  38. BACKENDS = {
  39. "threading": ThreadingBackend,
  40. "sequential": SequentialBackend,
  41. }
  42. # name of the backend used by default by Parallel outside of any context
  43. # managed by ``parallel_config`` or ``parallel_backend``.
  44. # threading is the only backend that is always everywhere
  45. DEFAULT_BACKEND = "threading"
  46. DEFAULT_THREAD_BACKEND = "threading"
  47. DEFAULT_PROCESS_BACKEND = "threading"
  48. MAYBE_AVAILABLE_BACKENDS = {"multiprocessing", "loky"}
  49. # if multiprocessing is available, so is loky, we set it as the default
  50. # backend
  51. if mp is not None:
  52. BACKENDS["multiprocessing"] = MultiprocessingBackend
  53. from .externals import loky
  54. BACKENDS["loky"] = LokyBackend
  55. DEFAULT_BACKEND = "loky"
  56. DEFAULT_PROCESS_BACKEND = "loky"
  57. # Thread local value that can be overridden by the ``parallel_config`` context
  58. # manager
  59. _backend = threading.local()
  60. def _register_dask():
  61. """Register Dask Backend if called with parallel_config(backend="dask")"""
  62. try:
  63. from ._dask import DaskDistributedBackend
  64. register_parallel_backend("dask", DaskDistributedBackend)
  65. except ImportError as e:
  66. msg = (
  67. "To use the dask.distributed backend you must install both "
  68. "the `dask` and distributed modules.\n\n"
  69. "See https://dask.pydata.org/en/latest/install.html for more "
  70. "information."
  71. )
  72. raise ImportError(msg) from e
  73. EXTERNAL_BACKENDS = {
  74. "dask": _register_dask,
  75. }
  76. # Sentinels for the default values of the Parallel constructor and
  77. # the parallel_config and parallel_backend context managers
  78. default_parallel_config = {
  79. "backend": _Sentinel(default_value=None),
  80. "n_jobs": _Sentinel(default_value=None),
  81. "verbose": _Sentinel(default_value=0),
  82. "temp_folder": _Sentinel(default_value=None),
  83. "max_nbytes": _Sentinel(default_value="1M"),
  84. "mmap_mode": _Sentinel(default_value="r"),
  85. "prefer": _Sentinel(default_value=None),
  86. "require": _Sentinel(default_value=None),
  87. }
  88. VALID_BACKEND_HINTS = ("processes", "threads", None)
  89. VALID_BACKEND_CONSTRAINTS = ("sharedmem", None)
  90. def _get_config_param(param, context_config, key):
  91. """Return the value of a parallel config parameter
  92. Explicitly setting it in Parallel has priority over setting in a
  93. parallel_(config/backend) context manager.
  94. """
  95. if param is not default_parallel_config[key]:
  96. # param is explicitly set, return it
  97. return param
  98. if context_config[key] is not default_parallel_config[key]:
  99. # there's a context manager and the key is set, return it
  100. return context_config[key]
  101. # Otherwise, we are in the default_parallel_config,
  102. # return the default value
  103. return param.default_value
  104. def get_active_backend(
  105. prefer=default_parallel_config["prefer"],
  106. require=default_parallel_config["require"],
  107. verbose=default_parallel_config["verbose"],
  108. ):
  109. """Return the active default backend"""
  110. backend, config = _get_active_backend(prefer, require, verbose)
  111. n_jobs = _get_config_param(default_parallel_config["n_jobs"], config, "n_jobs")
  112. return backend, n_jobs
  113. def _get_active_backend(
  114. prefer=default_parallel_config["prefer"],
  115. require=default_parallel_config["require"],
  116. verbose=default_parallel_config["verbose"],
  117. ):
  118. """Return the active default backend"""
  119. backend_config = getattr(_backend, "config", default_parallel_config)
  120. backend = _get_config_param(
  121. default_parallel_config["backend"], backend_config, "backend"
  122. )
  123. prefer = _get_config_param(prefer, backend_config, "prefer")
  124. require = _get_config_param(require, backend_config, "require")
  125. verbose = _get_config_param(verbose, backend_config, "verbose")
  126. if prefer not in VALID_BACKEND_HINTS:
  127. raise ValueError(
  128. f"prefer={prefer} is not a valid backend hint, "
  129. f"expected one of {VALID_BACKEND_HINTS}"
  130. )
  131. if require not in VALID_BACKEND_CONSTRAINTS:
  132. raise ValueError(
  133. f"require={require} is not a valid backend constraint, "
  134. f"expected one of {VALID_BACKEND_CONSTRAINTS}"
  135. )
  136. if prefer == "processes" and require == "sharedmem":
  137. raise ValueError(
  138. "prefer == 'processes' and require == 'sharedmem' are inconsistent settings"
  139. )
  140. explicit_backend = True
  141. if backend is None:
  142. # We are either outside of the scope of any parallel_(config/backend)
  143. # context manager or the context manager did not set a backend.
  144. # create the default backend instance now.
  145. backend = BACKENDS[DEFAULT_BACKEND](nesting_level=0)
  146. explicit_backend = False
  147. # Try to use the backend set by the user with the context manager.
  148. nesting_level = backend.nesting_level
  149. uses_threads = getattr(backend, "uses_threads", False)
  150. supports_sharedmem = getattr(backend, "supports_sharedmem", False)
  151. # Force to use thread-based backend if the provided backend does not
  152. # match the shared memory constraint or if the backend is not explicitly
  153. # given and threads are preferred.
  154. force_threads = (require == "sharedmem" and not supports_sharedmem) or (
  155. not explicit_backend and prefer == "threads" and not uses_threads
  156. )
  157. force_processes = not explicit_backend and prefer == "processes" and uses_threads
  158. if force_threads:
  159. # This backend does not match the shared memory constraint:
  160. # fallback to the default thead-based backend.
  161. sharedmem_backend = BACKENDS[DEFAULT_THREAD_BACKEND](
  162. nesting_level=nesting_level
  163. )
  164. # Warn the user if we forced the backend to thread-based, while the
  165. # user explicitly specified a non-thread-based backend.
  166. if verbose >= 10 and explicit_backend:
  167. print(
  168. f"Using {sharedmem_backend.__class__.__name__} as "
  169. f"joblib backend instead of {backend.__class__.__name__} "
  170. "as the latter does not provide shared memory semantics."
  171. )
  172. # Force to n_jobs=1 by default
  173. thread_config = backend_config.copy()
  174. thread_config["n_jobs"] = 1
  175. return sharedmem_backend, thread_config
  176. if force_processes:
  177. # This backend does not match the prefer="processes" constraint:
  178. # fallback to the default process-based backend.
  179. process_backend = BACKENDS[DEFAULT_PROCESS_BACKEND](nesting_level=nesting_level)
  180. return process_backend, backend_config.copy()
  181. return backend, backend_config
  182. class parallel_config:
  183. """Set the default backend or configuration for :class:`~joblib.Parallel`.
  184. This is an alternative to directly passing keyword arguments to the
  185. :class:`~joblib.Parallel` class constructor. It is particularly useful when
  186. calling into library code that uses joblib internally but does not expose
  187. the various parallel configuration arguments in its own API.
  188. Parameters
  189. ----------
  190. backend: str or ParallelBackendBase instance, default=None
  191. If ``backend`` is a string it must match a previously registered
  192. implementation using the :func:`~register_parallel_backend` function.
  193. By default the following backends are available:
  194. - 'loky': single-host, process-based parallelism (used by default),
  195. - 'threading': single-host, thread-based parallelism,
  196. - 'multiprocessing': legacy single-host, process-based parallelism.
  197. 'loky' is recommended to run functions that manipulate Python objects.
  198. 'threading' is a low-overhead alternative that is most efficient for
  199. functions that release the Global Interpreter Lock: e.g. I/O-bound
  200. code or CPU-bound code in a few calls to native code that explicitly
  201. releases the GIL. Note that on some rare systems (such as pyodide),
  202. multiprocessing and loky may not be available, in which case joblib
  203. defaults to threading.
  204. In addition, if the ``dask`` and ``distributed`` Python packages are
  205. installed, it is possible to use the 'dask' backend for better
  206. scheduling of nested parallel calls without over-subscription and
  207. potentially distribute parallel calls over a networked cluster of
  208. several hosts.
  209. It is also possible to use the distributed 'ray' backend for
  210. distributing the workload to a cluster of nodes. See more details
  211. in the Examples section below.
  212. Alternatively the backend can be passed directly as an instance.
  213. n_jobs: int, default=None
  214. The maximum number of concurrently running jobs, such as the number
  215. of Python worker processes when ``backend="loky"`` or the size of the
  216. thread-pool when ``backend="threading"``.
  217. This argument is converted to an integer, rounded below for float.
  218. If -1 is given, `joblib` tries to use all CPUs. The number of CPUs
  219. ``n_cpus`` is obtained with :func:`~cpu_count`.
  220. For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. For instance,
  221. using ``n_jobs=-2`` will result in all CPUs but one being used.
  222. This argument can also go above ``n_cpus``, which will cause
  223. oversubscription. In some cases, slight oversubscription can be
  224. beneficial, e.g., for tasks with large I/O operations.
  225. If 1 is given, no parallel computing code is used at all, and the
  226. behavior amounts to a simple python `for` loop. This mode is not
  227. compatible with `timeout`.
  228. None is a marker for 'unset' that will be interpreted as n_jobs=1
  229. unless the call is performed under a :func:`~parallel_config`
  230. context manager that sets another value for ``n_jobs``.
  231. If n_jobs = 0 then a ValueError is raised.
  232. verbose: int, default=0
  233. The verbosity level: if non zero, progress messages are
  234. printed. Above 50, the output is sent to stdout.
  235. The frequency of the messages increases with the verbosity level.
  236. If it more than 10, all iterations are reported.
  237. temp_folder: str or None, default=None
  238. Folder to be used by the pool for memmapping large arrays
  239. for sharing memory with worker processes. If None, this will try in
  240. order:
  241. - a folder pointed by the ``JOBLIB_TEMP_FOLDER`` environment
  242. variable,
  243. - ``/dev/shm`` if the folder exists and is writable: this is a
  244. RAM disk filesystem available by default on modern Linux
  245. distributions,
  246. - the default system temporary folder that can be
  247. overridden with ``TMP``, ``TMPDIR`` or ``TEMP`` environment
  248. variables, typically ``/tmp`` under Unix operating systems.
  249. max_nbytes: int, str, or None, optional, default='1M'
  250. Threshold on the size of arrays passed to the workers that
  251. triggers automated memory mapping in temp_folder. Can be an int
  252. in Bytes, or a human-readable string, e.g., '1M' for 1 megabyte.
  253. Use None to disable memmapping of large arrays.
  254. mmap_mode: {None, 'r+', 'r', 'w+', 'c'}, default='r'
  255. Memmapping mode for numpy arrays passed to workers. None will
  256. disable memmapping, other modes defined in the numpy.memmap doc:
  257. https://numpy.org/doc/stable/reference/generated/numpy.memmap.html
  258. Also, see 'max_nbytes' parameter documentation for more details.
  259. prefer: str in {'processes', 'threads'} or None, default=None
  260. Soft hint to choose the default backend.
  261. The default process-based backend is 'loky' and the default
  262. thread-based backend is 'threading'. Ignored if the ``backend``
  263. parameter is specified.
  264. require: 'sharedmem' or None, default=None
  265. Hard constraint to select the backend. If set to 'sharedmem',
  266. the selected backend will be single-host and thread-based.
  267. inner_max_num_threads: int, default=None
  268. If not None, overwrites the limit set on the number of threads
  269. usable in some third-party library threadpools like OpenBLAS,
  270. MKL or OpenMP. This is only used with the ``loky`` backend.
  271. backend_params: dict
  272. Additional parameters to pass to the backend constructor when
  273. backend is a string.
  274. Notes
  275. -----
  276. Joblib tries to limit the oversubscription by limiting the number of
  277. threads usable in some third-party library threadpools like OpenBLAS, MKL
  278. or OpenMP. The default limit in each worker is set to
  279. ``max(cpu_count() // effective_n_jobs, 1)`` but this limit can be
  280. overwritten with the ``inner_max_num_threads`` argument which will be used
  281. to set this limit in the child processes.
  282. .. versionadded:: 1.3
  283. Examples
  284. --------
  285. >>> from operator import neg
  286. >>> with parallel_config(backend='threading'):
  287. ... print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
  288. ...
  289. [-1, -2, -3, -4, -5]
  290. To use the 'ray' joblib backend add the following lines:
  291. >>> from ray.util.joblib import register_ray # doctest: +SKIP
  292. >>> register_ray() # doctest: +SKIP
  293. >>> with parallel_config(backend="ray"): # doctest: +SKIP
  294. ... print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
  295. [-1, -2, -3, -4, -5]
  296. """
  297. def __init__(
  298. self,
  299. backend=default_parallel_config["backend"],
  300. *,
  301. n_jobs=default_parallel_config["n_jobs"],
  302. verbose=default_parallel_config["verbose"],
  303. temp_folder=default_parallel_config["temp_folder"],
  304. max_nbytes=default_parallel_config["max_nbytes"],
  305. mmap_mode=default_parallel_config["mmap_mode"],
  306. prefer=default_parallel_config["prefer"],
  307. require=default_parallel_config["require"],
  308. inner_max_num_threads=None,
  309. **backend_params,
  310. ):
  311. # Save the parallel info and set the active parallel config
  312. self.old_parallel_config = getattr(_backend, "config", default_parallel_config)
  313. backend = self._check_backend(backend, inner_max_num_threads, **backend_params)
  314. new_config = {
  315. "n_jobs": n_jobs,
  316. "verbose": verbose,
  317. "temp_folder": temp_folder,
  318. "max_nbytes": max_nbytes,
  319. "mmap_mode": mmap_mode,
  320. "prefer": prefer,
  321. "require": require,
  322. "backend": backend,
  323. }
  324. self.parallel_config = self.old_parallel_config.copy()
  325. self.parallel_config.update(
  326. {k: v for k, v in new_config.items() if not isinstance(v, _Sentinel)}
  327. )
  328. setattr(_backend, "config", self.parallel_config)
  329. def _check_backend(self, backend, inner_max_num_threads, **backend_params):
  330. if backend is default_parallel_config["backend"]:
  331. if inner_max_num_threads is not None or len(backend_params) > 0:
  332. raise ValueError(
  333. "inner_max_num_threads and other constructor "
  334. "parameters backend_params are only supported "
  335. "when backend is not None."
  336. )
  337. return backend
  338. if isinstance(backend, str):
  339. # Handle non-registered or missing backends
  340. if backend not in BACKENDS:
  341. if backend in EXTERNAL_BACKENDS:
  342. register = EXTERNAL_BACKENDS[backend]
  343. register()
  344. elif backend in MAYBE_AVAILABLE_BACKENDS:
  345. warnings.warn(
  346. f"joblib backend '{backend}' is not available on "
  347. f"your system, falling back to {DEFAULT_BACKEND}.",
  348. UserWarning,
  349. stacklevel=2,
  350. )
  351. BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
  352. else:
  353. raise ValueError(
  354. f"Invalid backend: {backend}, expected one of "
  355. f"{sorted(BACKENDS.keys())}"
  356. )
  357. backend = BACKENDS[backend](**backend_params)
  358. else:
  359. if len(backend_params) > 0:
  360. raise ValueError(
  361. "Constructor parameters backend_params are only "
  362. "supported when backend is a string."
  363. )
  364. if inner_max_num_threads is not None:
  365. msg = (
  366. f"{backend.__class__.__name__} does not accept setting the "
  367. "inner_max_num_threads argument."
  368. )
  369. assert backend.supports_inner_max_num_threads, msg
  370. backend.inner_max_num_threads = inner_max_num_threads
  371. # If the nesting_level of the backend is not set previously, use the
  372. # nesting level from the previous active_backend to set it
  373. if backend.nesting_level is None:
  374. parent_backend = self.old_parallel_config["backend"]
  375. if parent_backend is default_parallel_config["backend"]:
  376. nesting_level = 0
  377. else:
  378. nesting_level = parent_backend.nesting_level
  379. backend.nesting_level = nesting_level
  380. return backend
  381. def __enter__(self):
  382. return self.parallel_config
  383. def __exit__(self, type, value, traceback):
  384. self.unregister()
  385. def unregister(self):
  386. setattr(_backend, "config", self.old_parallel_config)
  387. class parallel_backend(parallel_config):
  388. """Change the default backend used by Parallel inside a with block.
  389. .. warning::
  390. It is advised to use the :class:`~joblib.parallel_config` context
  391. manager instead, which allows more fine-grained control over the
  392. backend configuration.
  393. If ``backend`` is a string it must match a previously registered
  394. implementation using the :func:`~register_parallel_backend` function.
  395. By default the following backends are available:
  396. - 'loky': single-host, process-based parallelism (used by default),
  397. - 'threading': single-host, thread-based parallelism,
  398. - 'multiprocessing': legacy single-host, process-based parallelism.
  399. 'loky' is recommended to run functions that manipulate Python objects.
  400. 'threading' is a low-overhead alternative that is most efficient for
  401. functions that release the Global Interpreter Lock: e.g. I/O-bound code or
  402. CPU-bound code in a few calls to native code that explicitly releases the
  403. GIL. Note that on some rare systems (such as Pyodide),
  404. multiprocessing and loky may not be available, in which case joblib
  405. defaults to threading.
  406. You can also use the `Dask <https://docs.dask.org/en/stable/>`_ joblib
  407. backend to distribute work across machines. This works well with
  408. scikit-learn estimators with the ``n_jobs`` parameter, for example::
  409. >>> import joblib # doctest: +SKIP
  410. >>> from sklearn.model_selection import GridSearchCV # doctest: +SKIP
  411. >>> from dask.distributed import Client, LocalCluster # doctest: +SKIP
  412. >>> # create a local Dask cluster
  413. >>> cluster = LocalCluster() # doctest: +SKIP
  414. >>> client = Client(cluster) # doctest: +SKIP
  415. >>> grid_search = GridSearchCV(estimator, param_grid, n_jobs=-1)
  416. ... # doctest: +SKIP
  417. >>> with joblib.parallel_backend("dask", scatter=[X, y]): # doctest: +SKIP
  418. ... grid_search.fit(X, y)
  419. It is also possible to use the distributed 'ray' backend for distributing
  420. the workload to a cluster of nodes. To use the 'ray' joblib backend add
  421. the following lines::
  422. >>> from ray.util.joblib import register_ray # doctest: +SKIP
  423. >>> register_ray() # doctest: +SKIP
  424. >>> with parallel_backend("ray"): # doctest: +SKIP
  425. ... print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
  426. [-1, -2, -3, -4, -5]
  427. Alternatively the backend can be passed directly as an instance.
  428. By default all available workers will be used (``n_jobs=-1``) unless the
  429. caller passes an explicit value for the ``n_jobs`` parameter.
  430. This is an alternative to passing a ``backend='backend_name'`` argument to
  431. the :class:`~Parallel` class constructor. It is particularly useful when
  432. calling into library code that uses joblib internally but does not expose
  433. the backend argument in its own API.
  434. >>> from operator import neg
  435. >>> with parallel_backend('threading'):
  436. ... print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
  437. ...
  438. [-1, -2, -3, -4, -5]
  439. Joblib also tries to limit the oversubscription by limiting the number of
  440. threads usable in some third-party library threadpools like OpenBLAS, MKL
  441. or OpenMP. The default limit in each worker is set to
  442. ``max(cpu_count() // effective_n_jobs, 1)`` but this limit can be
  443. overwritten with the ``inner_max_num_threads`` argument which will be used
  444. to set this limit in the child processes.
  445. .. versionadded:: 0.10
  446. See Also
  447. --------
  448. joblib.parallel_config: context manager to change the backend configuration.
  449. """
  450. def __init__(
  451. self, backend, n_jobs=-1, inner_max_num_threads=None, **backend_params
  452. ):
  453. super().__init__(
  454. backend=backend,
  455. n_jobs=n_jobs,
  456. inner_max_num_threads=inner_max_num_threads,
  457. **backend_params,
  458. )
  459. if self.old_parallel_config is None:
  460. self.old_backend_and_jobs = None
  461. else:
  462. self.old_backend_and_jobs = (
  463. self.old_parallel_config["backend"],
  464. self.old_parallel_config["n_jobs"],
  465. )
  466. self.new_backend_and_jobs = (
  467. self.parallel_config["backend"],
  468. self.parallel_config["n_jobs"],
  469. )
  470. def __enter__(self):
  471. return self.new_backend_and_jobs
  472. # Under Linux or OS X the default start method of multiprocessing
  473. # can cause third party libraries to crash. Under Python 3.4+ it is possible
  474. # to set an environment variable to switch the default start method from
  475. # 'fork' to 'forkserver' or 'spawn' to avoid this issue albeit at the cost
  476. # of causing semantic changes and some additional pool instantiation overhead.
  477. DEFAULT_MP_CONTEXT = None
  478. if hasattr(mp, "get_context"):
  479. method = os.environ.get("JOBLIB_START_METHOD", "").strip() or None
  480. if method is not None:
  481. DEFAULT_MP_CONTEXT = mp.get_context(method=method)
  482. class BatchedCalls(object):
  483. """Wrap a sequence of (func, args, kwargs) tuples as a single callable"""
  484. def __init__(
  485. self, iterator_slice, backend_and_jobs, reducer_callback=None, pickle_cache=None
  486. ):
  487. self.items = list(iterator_slice)
  488. self._size = len(self.items)
  489. self._reducer_callback = reducer_callback
  490. if isinstance(backend_and_jobs, tuple):
  491. self._backend, self._n_jobs = backend_and_jobs
  492. else:
  493. # this is for backward compatibility purposes. Before 0.12.6,
  494. # nested backends were returned without n_jobs indications.
  495. self._backend, self._n_jobs = backend_and_jobs, None
  496. self._pickle_cache = pickle_cache if pickle_cache is not None else {}
  497. def __call__(self):
  498. # Set the default nested backend to self._backend but do not set the
  499. # change the default number of processes to -1
  500. with parallel_config(backend=self._backend, n_jobs=self._n_jobs):
  501. return [func(*args, **kwargs) for func, args, kwargs in self.items]
  502. def __reduce__(self):
  503. if self._reducer_callback is not None:
  504. self._reducer_callback()
  505. # no need to pickle the callback.
  506. return (
  507. BatchedCalls,
  508. (self.items, (self._backend, self._n_jobs), None, self._pickle_cache),
  509. )
  510. def __len__(self):
  511. return self._size
  512. # Possible exit status for a task
  513. TASK_DONE = "Done"
  514. TASK_ERROR = "Error"
  515. TASK_PENDING = "Pending"
  516. ###############################################################################
  517. # CPU count that works also when multiprocessing has been disabled via
  518. # the JOBLIB_MULTIPROCESSING environment variable
  519. def cpu_count(only_physical_cores=False):
  520. """Return the number of CPUs.
  521. This delegates to loky.cpu_count that takes into account additional
  522. constraints such as Linux CFS scheduler quotas (typically set by container
  523. runtimes such as docker) and CPU affinity (for instance using the taskset
  524. command on Linux).
  525. Parameters
  526. ----------
  527. only_physical_cores : boolean, default=False
  528. If True, does not take hyperthreading / SMT logical cores into account.
  529. """
  530. if mp is None:
  531. return 1
  532. return loky.cpu_count(only_physical_cores=only_physical_cores)
  533. ###############################################################################
  534. # For verbosity
  535. def _verbosity_filter(index, verbose):
  536. """Returns False for indices increasingly apart, the distance
  537. depending on the value of verbose.
  538. We use a lag increasing as the square of index
  539. """
  540. if not verbose:
  541. return True
  542. elif verbose > 10:
  543. return False
  544. if index == 0:
  545. return False
  546. verbose = 0.5 * (11 - verbose) ** 2
  547. scale = sqrt(index / verbose)
  548. next_scale = sqrt((index + 1) / verbose)
  549. return int(next_scale) == int(scale)
  550. ###############################################################################
  551. def delayed(function):
  552. """Decorator used to capture the arguments of a function."""
  553. def delayed_function(*args, **kwargs):
  554. return function, args, kwargs
  555. try:
  556. delayed_function = functools.wraps(function)(delayed_function)
  557. except AttributeError:
  558. " functools.wraps fails on some callable objects "
  559. return delayed_function
  560. ###############################################################################
class BatchCompletionCallBack(object):
    """Callback to keep track of completed results and schedule the next tasks.

    This callable is executed by the parent process whenever a worker process
    has completed a batch of tasks.

    It is used for progress reporting, to update the estimate of the batch
    processing duration and to schedule the next batch of tasks to be
    processed.

    It is assumed that this callback will always be triggered by the backend
    right after the end of a task, in case of success as well as in case of
    failure.

    Parameters
    ----------
    dispatch_timestamp : float
        Value of ``time.time()`` when the batch was dispatched. It is used to
        compute the duration of the batch once it has completed.
    batch_size : int
        Number of tasks in the batch.
    parallel : Parallel
        The instance that dispatched the batch. Its ``_lock``, ``_backend``,
        ``_call_id`` and bookkeeping attributes are used by this callback.
    """

    ##########################################################################
    #                   METHODS CALLED BY THE MAIN THREAD                    #
    ##########################################################################
    def __init__(self, dispatch_timestamp, batch_size, parallel):
        self.dispatch_timestamp = dispatch_timestamp
        self.batch_size = batch_size
        self.parallel = parallel
        # Remember which call of `parallel` this batch belongs to, so that a
        # late callback can be ignored if a new call has been issued since.
        self.parallel_call_id = parallel._call_id
        # Time of the first `get_status` poll made while the task is pending;
        # set lazily in `get_status`.
        self._completion_timeout_counter = None

        # Internals to keep track of the status and outcome of the task.

        # Used to hold a reference to the future-like object returned by the
        # backend after launching this task.
        # This will be set later when calling `register_job`, as it is only
        # created once the task has been submitted.
        self.job = None

        if not parallel._backend.supports_retrieve_callback:
            # The status is only used for asynchronous result retrieval in the
            # callback.
            self.status = None
        else:
            # The initial status for the job is TASK_PENDING.
            # Once it is done, it will be either TASK_DONE, or TASK_ERROR.
            self.status = TASK_PENDING

    def register_job(self, job):
        """Register the object returned by `submit`."""
        self.job = job

    def get_result(self, timeout):
        """Returns the raw result of the task that was submitted.

        If the task raised an exception rather than returning, this same
        exception will be raised instead.

        If the backend supports the retrieval callback, it is assumed that this
        method is only called after the result has been registered. Callers
        are expected to ensure it by checking that `self.get_status(timeout)`
        does not return TASK_PENDING. In this case, `get_result` directly
        returns the registered result (or raises the registered exception).

        For other backends, there are no such assumptions, but `get_result`
        still needs to synchronously retrieve the result before it can
        return it or raise. The `timeout` argument is forwarded to the
        backend's `retrieve_result`. Any exception raised during retrieval
        (including a timeout) is registered as the task outcome and then
        re-raised.
        """
        backend = self.parallel._backend

        if backend.supports_retrieve_callback:
            # We assume that the result has already been retrieved by the
            # callback thread, and is stored internally. It's just waiting to
            # be returned.
            return self._return_or_raise()

        # For other backends, the main thread needs to run the retrieval step.
        try:
            result = backend.retrieve_result(self.job, timeout=timeout)
            outcome = dict(result=result, status=TASK_DONE)
        except BaseException as e:
            # BaseException is caught on purpose: whatever was raised is
            # stored as the outcome and re-raised by `_return_or_raise`.
            outcome = dict(result=e, status=TASK_ERROR)
        self._register_outcome(outcome)

        return self._return_or_raise()

    def _return_or_raise(self):
        """Return the registered result, or raise it if the status is TASK_ERROR.

        The stored result is deleted from the instance in both cases, so this
        method can only be used once per registered outcome.
        """
        try:
            if self.status == TASK_ERROR:
                raise self._result
            return self._result
        finally:
            # Drop the reference to the result (or exception) held here.
            del self._result

    def get_status(self, timeout):
        """Get the status of the task.

        This function also checks if the timeout has been reached and registers
        the TimeoutError outcome when it is the case.

        The elapsed time is measured from the first call made while the task
        is pending, not from the dispatch of the task.
        """
        if timeout is None or self.status != TASK_PENDING:
            return self.status

        # The computation is running and the status is pending.
        # Check that we did not wait for this job more than `timeout`.
        now = time.time()
        if self._completion_timeout_counter is None:
            self._completion_timeout_counter = now

        if (now - self._completion_timeout_counter) > timeout:
            outcome = dict(result=TimeoutError(), status=TASK_ERROR)
            self._register_outcome(outcome)

        return self.status

    ##########################################################################
    #                     METHODS CALLED BY CALLBACK THREADS                 #
    ##########################################################################
    def __call__(self, *args, **kwargs):
        """Function called by the callback thread after a job is completed."""

        # If the backend doesn't support callback retrievals, the next batch of
        # tasks is dispatched regardless. The result will be retrieved by the
        # main thread when calling `get_result`.
        if not self.parallel._backend.supports_retrieve_callback:
            self._dispatch_new()
            return

        # If the backend supports retrieving the result in the callback, it
        # registers the task outcome (TASK_ERROR or TASK_DONE), and schedules
        # the next batch if needed.
        with self.parallel._lock:
            # Edge case where while the task was processing, the `parallel`
            # instance has been reset and a new call has been issued, but the
            # worker managed to complete the task and trigger this callback
            # call just before being aborted by the reset.
            if self.parallel._call_id != self.parallel_call_id:
                return

            # When aborting, stop as fast as possible and do not retrieve the
            # result as it won't be returned by the Parallel call.
            if self.parallel._aborting:
                return

            # Retrieves the result of the task in the main process and dispatch
            # a new batch if needed.
            job_succeeded = self._retrieve_result(*args, **kwargs)

            if job_succeeded:
                self._dispatch_new()

    def _dispatch_new(self):
        """Schedule the next batch of tasks to be processed."""

        # This step ensures that auto-batching works as expected: the backend
        # is told how long this batch took.
        this_batch_duration = time.time() - self.dispatch_timestamp
        self.parallel._backend.batch_completed(self.batch_size, this_batch_duration)

        # Schedule the next batch of tasks.
        with self.parallel._lock:
            self.parallel.n_completed_tasks += self.batch_size
            self.parallel.print_progress()
            # `_original_iterator` is set to None by `dispatch_next` once
            # there is nothing left to dispatch.
            if self.parallel._original_iterator is not None:
                self.parallel.dispatch_next()

    def _retrieve_result(self, out):
        """Fetch and register the outcome of a task.

        Return True if the task succeeded, False otherwise.
        This function is only called by backends that support retrieving
        the task result in the callback thread.
        """
        try:
            result = self.parallel._backend.retrieve_result_callback(out)
            outcome = dict(status=TASK_DONE, result=result)
        except BaseException as e:
            # Avoid keeping references to parallel in the error.
            e.__traceback__ = None
            outcome = dict(result=e, status=TASK_ERROR)

        self._register_outcome(outcome)
        return outcome["status"] != TASK_ERROR

    ##########################################################################
    #            This method can be called either in the main thread         #
    #                        or in the callback thread.                      #
    ##########################################################################
    def _register_outcome(self, outcome):
        """Register the outcome of a task.

        This method can be called only once, future calls will be ignored.

        `outcome` is a dict with a "status" key (TASK_DONE or TASK_ERROR) and
        a "result" key (the returned value, or the exception to raise).
        """
        # Covers the edge case where the main thread tries to register a
        # `TimeoutError` while the callback thread tries to register a result
        # at the same time.
        with self.parallel._lock:
            # A status other than TASK_PENDING / None means an outcome has
            # already been registered.
            if self.status not in (TASK_PENDING, None):
                return
            self.status = outcome["status"]

        self._result = outcome["result"]

        # Once the result and the status are extracted, the last reference to
        # the job can be deleted.
        self.job = None

        # As soon as an error has been spotted, early stopping flags are sent
        # to the `parallel` instance.
        if self.status == TASK_ERROR:
            self.parallel._exception = True
            self.parallel._aborting = True

        if self.parallel.return_ordered:
            return

        with self.parallel._lock:
            # For `return_as=generator_unordered`, append the job to the queue
            # in the order of completion instead of submission.
            self.parallel._jobs.append(self)
  735. ###############################################################################
  736. def register_parallel_backend(name, factory, make_default=False):
  737. """Register a new Parallel backend factory.
  738. The new backend can then be selected by passing its name as the backend
  739. argument to the :class:`~Parallel` class. Moreover, the default backend can
  740. be overwritten globally by setting make_default=True.
  741. The factory can be any callable that takes no argument and return an
  742. instance of ``ParallelBackendBase``.
  743. Warning: this function is experimental and subject to change in a future
  744. version of joblib.
  745. .. versionadded:: 0.10
  746. """
  747. BACKENDS[name] = factory
  748. if make_default:
  749. global DEFAULT_BACKEND
  750. DEFAULT_BACKEND = name
  751. def effective_n_jobs(n_jobs=-1):
  752. """Determine the number of jobs that can actually run in parallel
  753. n_jobs is the number of workers requested by the callers. Passing n_jobs=-1
  754. means requesting all available workers for instance matching the number of
  755. CPU cores on the worker host(s).
  756. This method should return a guesstimate of the number of workers that can
  757. actually perform work concurrently with the currently enabled default
  758. backend. The primary use case is to make it possible for the caller to know
  759. in how many chunks to slice the work.
  760. In general working on larger data chunks is more efficient (less scheduling
  761. overhead and better use of CPU cache prefetching heuristics) as long as all
  762. the workers have enough work to do.
  763. Warning: this function is experimental and subject to change in a future
  764. version of joblib.
  765. .. versionadded:: 0.10
  766. """
  767. if n_jobs == 1:
  768. return 1
  769. backend, backend_n_jobs = get_active_backend()
  770. if n_jobs is None:
  771. n_jobs = backend_n_jobs
  772. return backend.effective_n_jobs(n_jobs=n_jobs)
  773. ###############################################################################
  774. class Parallel(Logger):
  775. """Helper class for readable parallel mapping.
  776. Read more in the :ref:`User Guide <parallel>`.
  777. Parameters
  778. ----------
  779. n_jobs: int, default=None
  780. The maximum number of concurrently running jobs, such as the number
  781. of Python worker processes when ``backend="loky"`` or the size of
  782. the thread-pool when ``backend="threading"``.
  783. This argument is converted to an integer, rounded below for float.
  784. If -1 is given, `joblib` tries to use all CPUs. The number of CPUs
  785. ``n_cpus`` is obtained with :func:`~cpu_count`.
  786. For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. For instance,
  787. using ``n_jobs=-2`` will result in all CPUs but one being used.
  788. This argument can also go above ``n_cpus``, which will cause
  789. oversubscription. In some cases, slight oversubscription can be
  790. beneficial, e.g., for tasks with large I/O operations.
  791. If 1 is given, no parallel computing code is used at all, and the
  792. behavior amounts to a simple python `for` loop. This mode is not
  793. compatible with ``timeout``.
  794. None is a marker for 'unset' that will be interpreted as n_jobs=1
  795. unless the call is performed under a :func:`~parallel_config`
  796. context manager that sets another value for ``n_jobs``.
  797. If n_jobs = 0 then a ValueError is raised.
  798. backend: str, ParallelBackendBase instance or None, default='loky'
  799. Specify the parallelization backend implementation.
  800. Supported backends are:
  801. - "loky" used by default, can induce some
  802. communication and memory overhead when exchanging input and
  803. output data with the worker Python processes. On some rare
  804. systems (such as Pyodide), the loky backend may not be
  805. available.
  806. - "multiprocessing" previous process-based backend based on
  807. `multiprocessing.Pool`. Less robust than `loky`.
  808. - "threading" is a very low-overhead backend but it suffers
  809. from the Python Global Interpreter Lock if the called function
  810. relies a lot on Python objects. "threading" is mostly useful
  811. when the execution bottleneck is a compiled extension that
  812. explicitly releases the GIL (for instance a Cython loop wrapped
  813. in a "with nogil" block or an expensive call to a library such
  814. as NumPy).
  815. - finally, you can register backends by calling
  816. :func:`~register_parallel_backend`. This will allow you to
  817. implement a backend of your liking.
  818. It is not recommended to hard-code the backend name in a call to
  819. :class:`~Parallel` in a library. Instead it is recommended to set
  820. soft hints (prefer) or hard constraints (require) so as to make it
  821. possible for library users to change the backend from the outside
  822. using the :func:`~parallel_config` context manager.
  823. return_as: str in {'list', 'generator', 'generator_unordered'}, default='list'
  824. If 'list', calls to this instance will return a list, only when
  825. all results have been processed and retrieved.
  826. If 'generator', it will return a generator that yields the results
  827. as soon as they are available, in the order the tasks have been
  828. submitted with.
  829. If 'generator_unordered', the generator will immediately yield
  830. available results independently of the submission order. The output
  831. order is not deterministic in this case because it depends on the
  832. concurrency of the workers.
  833. prefer: str in {'processes', 'threads'} or None, default=None
  834. Soft hint to choose the default backend if no specific backend
  835. was selected with the :func:`~parallel_config` context manager.
  836. The default process-based backend is 'loky' and the default
  837. thread-based backend is 'threading'. Ignored if the ``backend``
  838. parameter is specified.
  839. require: 'sharedmem' or None, default=None
  840. Hard constraint to select the backend. If set to 'sharedmem',
  841. the selected backend will be single-host and thread-based even
  842. if the user asked for a non-thread based backend with
  843. :func:`~joblib.parallel_config`.
  844. verbose: int, default=0
  845. The verbosity level: if non zero, progress messages are
  846. printed. Above 50, the output is sent to stdout.
  847. The frequency of the messages increases with the verbosity level.
  848. If it is more than 10, all iterations are reported.
  849. timeout: float or None, default=None
  850. Timeout limit for each task to complete. If any task takes longer
  851. a TimeoutError will be raised. Only applied when n_jobs != 1.
  852. pre_dispatch: {'all', integer, or expression, as in '3*n_jobs'}, default='2*n_jobs'
  853. The number of batches (of tasks) to be pre-dispatched.
  854. Default is '2*n_jobs'. When batch_size="auto" this is a reasonable
  855. default and the workers should never starve. Note that only basic
  856. arithmetic is allowed here and no modules can be used in this
  857. expression.
  858. batch_size: int or 'auto', default='auto'
  859. The number of atomic tasks to dispatch at once to each
  860. worker. When individual evaluations are very fast, dispatching
  861. calls to workers can be slower than sequential computation because
  862. of the overhead. Batching fast computations together can mitigate
  863. this.
  864. The ``'auto'`` strategy keeps track of the time it takes for a
  865. batch to complete, and dynamically adjusts the batch size to keep
  866. the time on the order of half a second, using a heuristic. The
  867. initial batch size is 1.
  868. ``batch_size="auto"`` with ``backend="threading"`` will dispatch
  869. batches of a single task at a time as the threading backend has
  870. very little overhead and using larger batch size has not proved to
  871. bring any gain in that case.
  872. temp_folder: str or None, default=None
  873. Folder to be used by the pool for memmapping large arrays
  874. for sharing memory with worker processes. If None, this will try in
  875. order:
  876. - a folder pointed by the JOBLIB_TEMP_FOLDER environment
  877. variable,
  878. - /dev/shm if the folder exists and is writable: this is a
  879. RAM disk filesystem available by default on modern Linux
  880. distributions,
  881. - the default system temporary folder that can be
  882. overridden with TMP, TMPDIR or TEMP environment
  883. variables, typically /tmp under Unix operating systems.
  884. Only active when ``backend="loky"`` or ``"multiprocessing"``.
  885. max_nbytes: int, str, or None, optional, default='1M'
  886. Threshold on the size of arrays passed to the workers that
  887. triggers automated memory mapping in temp_folder. Can be an int
  888. in Bytes, or a human-readable string, e.g., '1M' for 1 megabyte.
  889. Use None to disable memmapping of large arrays.
  890. Only active when ``backend="loky"`` or ``"multiprocessing"``.
  891. mmap_mode: {None, 'r+', 'r', 'w+', 'c'}, default='r'
  892. Memmapping mode for numpy arrays passed to workers. None will
  893. disable memmapping, other modes defined in the numpy.memmap doc:
  894. https://numpy.org/doc/stable/reference/generated/numpy.memmap.html
  895. Also, see 'max_nbytes' parameter documentation for more details.
  896. backend_kwargs: dict, optional
  897. Additional parameters to pass to the backend `configure` method.
  898. Notes
  899. -----
  900. This object uses workers to compute in parallel the application of a
  901. function to many different arguments. The main functionality it brings
  902. in addition to using the raw multiprocessing or concurrent.futures API
  903. are (see examples for details):
  904. * More readable code, in particular since it avoids
  905. constructing list of arguments.
  906. * Easier debugging:
  907. - informative tracebacks even when the error happens on
  908. the client side
  909. - using 'n_jobs=1' enables to turn off parallel computing
  910. for debugging without changing the codepath
  911. - early capture of pickling errors
  912. * An optional progress meter.
  913. * Interruption of multiprocesses jobs with 'Ctrl-C'
  914. * Flexible pickling control for the communication to and from
  915. the worker processes.
  916. * Ability to use shared memory efficiently with worker
  917. processes for large numpy-based datastructures.
  918. Note that the intended usage is to run one call at a time. Multiple
  919. calls to the same Parallel object will result in a ``RuntimeError``
  920. Examples
  921. --------
  922. A simple example:
  923. >>> from math import sqrt
  924. >>> from joblib import Parallel, delayed
  925. >>> Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10))
  926. [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
  927. Reshaping the output when the function has several return
  928. values:
  929. >>> from math import modf
  930. >>> from joblib import Parallel, delayed
  931. >>> r = Parallel(n_jobs=1)(delayed(modf)(i/2.) for i in range(10))
  932. >>> res, i = zip(*r)
  933. >>> res
  934. (0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5)
  935. >>> i
  936. (0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0)
  937. The progress meter: the higher the value of `verbose`, the more
  938. messages:
  939. >>> from time import sleep
  940. >>> from joblib import Parallel, delayed
  941. >>> r = Parallel(n_jobs=2, verbose=10)(
  942. ... delayed(sleep)(.2) for _ in range(10)) #doctest: +SKIP
  943. [Parallel(n_jobs=2)]: Done 1 tasks | elapsed: 0.6s
  944. [Parallel(n_jobs=2)]: Done 4 tasks | elapsed: 0.8s
  945. [Parallel(n_jobs=2)]: Done 10 out of 10 | elapsed: 1.4s finished
  946. Traceback example, note how the line of the error is indicated
  947. as well as the values of the parameter passed to the function that
  948. triggered the exception, even though the traceback happens in the
  949. child process:
  950. >>> from heapq import nlargest
  951. >>> from joblib import Parallel, delayed
  952. >>> Parallel(n_jobs=2)(
  953. ... delayed(nlargest)(2, n) for n in (range(4), 'abcde', 3))
  954. ... # doctest: +SKIP
  955. -----------------------------------------------------------------------
  956. Sub-process traceback:
  957. -----------------------------------------------------------------------
  958. TypeError Mon Nov 12 11:37:46 2012
  959. PID: 12934 Python 2.7.3: /usr/bin/python
  960. ........................................................................
  961. /usr/lib/python2.7/heapq.pyc in nlargest(n=2, iterable=3, key=None)
  962. 419 if n >= size:
  963. 420 return sorted(iterable, key=key, reverse=True)[:n]
  964. 421
  965. 422 # When key is none, use simpler decoration
  966. 423 if key is None:
  967. --> 424 it = izip(iterable, count(0,-1)) # decorate
  968. 425 result = _nlargest(n, it)
  969. 426 return map(itemgetter(0), result) # undecorate
  970. 427
  971. 428 # General case, slowest method
  972. TypeError: izip argument #1 must support iteration
  973. _______________________________________________________________________
  974. Using pre_dispatch in a producer/consumer situation, where the
  975. data is generated on the fly. Note how the producer is first
  976. called 3 times before the parallel loop is initiated, and then
  977. called to generate new data on the fly:
  978. >>> from math import sqrt
  979. >>> from joblib import Parallel, delayed
  980. >>> def producer():
  981. ... for i in range(6):
  982. ... print('Produced %s' % i)
  983. ... yield i
  984. >>> out = Parallel(n_jobs=2, verbose=100, pre_dispatch='1.5*n_jobs')(
  985. ... delayed(sqrt)(i) for i in producer()) #doctest: +SKIP
  986. Produced 0
  987. Produced 1
  988. Produced 2
  989. [Parallel(n_jobs=2)]: Done 1 jobs | elapsed: 0.0s
  990. Produced 3
  991. [Parallel(n_jobs=2)]: Done 2 jobs | elapsed: 0.0s
  992. Produced 4
  993. [Parallel(n_jobs=2)]: Done 3 jobs | elapsed: 0.0s
  994. Produced 5
  995. [Parallel(n_jobs=2)]: Done 4 jobs | elapsed: 0.0s
  996. [Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s remaining: 0.0s
  997. [Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s finished
  998. """ # noqa: E501
    def __init__(
        self,
        n_jobs=default_parallel_config["n_jobs"],
        backend=default_parallel_config["backend"],
        return_as="list",
        verbose=default_parallel_config["verbose"],
        timeout=None,
        pre_dispatch="2 * n_jobs",
        batch_size="auto",
        temp_folder=default_parallel_config["temp_folder"],
        max_nbytes=default_parallel_config["max_nbytes"],
        mmap_mode=default_parallel_config["mmap_mode"],
        prefer=default_parallel_config["prefer"],
        require=default_parallel_config["require"],
        **backend_kwargs,
    ):
        """Validate the arguments and resolve the backend to use.

        The parameters are described in the class docstring. Arguments left
        at their default value are looked up in the active
        parallel_config / parallel_backend context, if any.

        Raises
        ------
        ValueError
            If ``return_as``, ``backend``, ``n_jobs`` or ``batch_size`` is
            invalid, if ``require='sharedmem'`` is not supported by the
            selected backend, or if the backend does not support the
            requested generator output.
        """
        # Initiate parent Logger class state
        super().__init__()

        # Interpret n_jobs=None as 'unset'
        if n_jobs is None:
            n_jobs = default_parallel_config["n_jobs"]

        # presumably returns the backend selected by the enclosing context
        # (or the default one) and that context's configuration -- see
        # `_get_active_backend`.
        active_backend, context_config = _get_active_backend(
            prefer=prefer, require=require, verbose=verbose
        )

        nesting_level = active_backend.nesting_level

        self.verbose = _get_config_param(verbose, context_config, "verbose")
        self.timeout = timeout
        self.pre_dispatch = pre_dispatch

        if return_as not in {"list", "generator", "generator_unordered"}:
            raise ValueError(
                'Expected `return_as` parameter to be a string equal to "list"'
                f',"generator" or "generator_unordered", but got {return_as} '
                "instead."
            )
        self.return_as = return_as
        # Both generator modes return a generator; only the unordered one
        # gives up the submission order.
        self.return_generator = return_as != "list"
        self.return_ordered = return_as != "generator_unordered"

        # Check if we are under a parallel_config or parallel_backend
        # context manager and use the config from the context manager
        # for arguments that are not explicitly set.
        # The named parameters come last in the merge, so they override any
        # entry with the same key passed through **backend_kwargs.
        self._backend_kwargs = {
            **backend_kwargs,
            **{
                k: _get_config_param(param, context_config, k)
                for param, k in [
                    (max_nbytes, "max_nbytes"),
                    (temp_folder, "temp_folder"),
                    (mmap_mode, "mmap_mode"),
                    (prefer, "prefer"),
                    (require, "require"),
                    (verbose, "verbose"),
                ]
            },
        }

        # Human-readable sizes such as '1M' are converted with
        # `memstr_to_bytes`.
        if isinstance(self._backend_kwargs["max_nbytes"], str):
            self._backend_kwargs["max_nbytes"] = memstr_to_bytes(
                self._backend_kwargs["max_nbytes"]
            )
        # The verbosity passed to the backend is the Parallel verbosity
        # lowered by 50, and never negative.
        self._backend_kwargs["verbose"] = max(0, self._backend_kwargs["verbose"] - 50)

        # Use the context selected through JOBLIB_START_METHOD when there is
        # one, otherwise the default multiprocessing context if available.
        if DEFAULT_MP_CONTEXT is not None:
            self._backend_kwargs["context"] = DEFAULT_MP_CONTEXT
        elif hasattr(mp, "get_context"):
            self._backend_kwargs["context"] = mp.get_context()

        if backend is default_parallel_config["backend"] or backend is None:
            # No explicit backend: use the active one.
            backend = active_backend

        elif isinstance(backend, ParallelBackendBase):
            # Use provided backend as is, with the current nesting_level if it
            # is not set yet.
            if backend.nesting_level is None:
                backend.nesting_level = nesting_level

        elif hasattr(backend, "Pool") and hasattr(backend, "Lock"):
            # Make it possible to pass a custom multiprocessing context as
            # backend to change the start method to forkserver or spawn or
            # preload modules on the forkserver helper process.
            self._backend_kwargs["context"] = backend
            backend = MultiprocessingBackend(nesting_level=nesting_level)

        elif backend not in BACKENDS and backend in MAYBE_AVAILABLE_BACKENDS:
            warnings.warn(
                f"joblib backend '{backend}' is not available on "
                f"your system, falling back to {DEFAULT_BACKEND}.",
                UserWarning,
                stacklevel=2,
            )
            # The unavailable name is registered as an alias of the default
            # backend in the module-level BACKENDS mapping, so this branch is
            # not taken again for the same name.
            BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
            backend = BACKENDS[DEFAULT_BACKEND](nesting_level=nesting_level)

        else:
            try:
                backend_factory = BACKENDS[backend]
            except KeyError as e:
                raise ValueError(
                    "Invalid backend: %s, expected one of %r"
                    % (backend, sorted(BACKENDS.keys()))
                ) from e
            backend = backend_factory(nesting_level=nesting_level)

        n_jobs = _get_config_param(n_jobs, context_config, "n_jobs")
        if n_jobs is None:
            # No specific context override and no specific value request:
            # default to the default of the backend.
            n_jobs = backend.default_n_jobs
        try:
            n_jobs = int(n_jobs)
        except ValueError:
            # NOTE(review): only ValueError is converted here; a TypeError
            # raised by int() (e.g. for a list) propagates unchanged.
            raise ValueError("n_jobs could not be converted to int")
        self.n_jobs = n_jobs

        if require == "sharedmem" and not getattr(backend, "supports_sharedmem", False):
            raise ValueError("Backend %s does not support shared memory" % backend)

        # `and` binds tighter than `or`: accept "auto", or a strictly
        # positive integer.
        if batch_size == "auto" or isinstance(batch_size, Integral) and batch_size > 0:
            self.batch_size = batch_size
        else:
            raise ValueError(
                "batch_size must be 'auto' or a positive integer, got: %r" % batch_size
            )

        if not isinstance(backend, SequentialBackend):
            if self.return_generator and not backend.supports_return_generator:
                raise ValueError(
                    "Backend {} does not support return_as={}".format(
                        backend, return_as
                    )
                )
            # This lock is used to coordinate the main thread of this process
            # with the async callback thread of the pool.
            self._lock = threading.RLock()
            self._jobs = collections.deque()
            self._jobs_set = set()
            self._pending_outputs = list()
            self._ready_batches = queue.Queue()
            self._reducer_callback = None

        # Internal variables
        self._backend = backend
        self._running = False
        self._managed_backend = False
        self._id = uuid4().hex
        self._call_ref = None
  1132. def __enter__(self):
  1133. self._managed_backend = True
  1134. self._calling = False
  1135. self._initialize_backend()
  1136. return self
  1137. def __exit__(self, exc_type, exc_value, traceback):
  1138. self._managed_backend = False
  1139. if self.return_generator and self._calling:
  1140. self._abort()
  1141. self._terminate_and_reset()
  1142. def _initialize_backend(self):
  1143. """Build a process or thread pool and return the number of workers"""
  1144. try:
  1145. n_jobs = self._backend.configure(
  1146. n_jobs=self.n_jobs, parallel=self, **self._backend_kwargs
  1147. )
  1148. if self.timeout is not None and not self._backend.supports_timeout:
  1149. warnings.warn(
  1150. "The backend class {!r} does not support timeout. "
  1151. "You have set 'timeout={}' in Parallel but "
  1152. "the 'timeout' parameter will not be used.".format(
  1153. self._backend.__class__.__name__, self.timeout
  1154. )
  1155. )
  1156. except FallbackToBackend as e:
  1157. # Recursively initialize the backend in case of requested fallback.
  1158. self._backend = e.backend
  1159. n_jobs = self._initialize_backend()
  1160. return n_jobs
  1161. def _effective_n_jobs(self):
  1162. if self._backend:
  1163. return self._backend.effective_n_jobs(self.n_jobs)
  1164. return 1
  1165. def _terminate_and_reset(self):
  1166. if hasattr(self._backend, "stop_call") and self._calling:
  1167. self._backend.stop_call()
  1168. self._calling = False
  1169. if not self._managed_backend:
  1170. self._backend.terminate()
  1171. def _dispatch(self, batch):
  1172. """Queue the batch for computing, with or without multiprocessing
  1173. WARNING: this method is not thread-safe: it should be only called
  1174. indirectly via dispatch_one_batch.
  1175. """
  1176. # If job.get() catches an exception, it closes the queue:
  1177. if self._aborting:
  1178. return
  1179. batch_size = len(batch)
  1180. self.n_dispatched_tasks += batch_size
  1181. self.n_dispatched_batches += 1
  1182. dispatch_timestamp = time.time()
  1183. batch_tracker = BatchCompletionCallBack(dispatch_timestamp, batch_size, self)
  1184. self._register_new_job(batch_tracker)
  1185. # If return_ordered is False, the batch_tracker is not stored in the
  1186. # jobs queue at the time of submission. Instead, it will be appended to
  1187. # the queue by itself as soon as the callback is triggered to be able
  1188. # to return the results in the order of completion.
  1189. job = self._backend.submit(batch, callback=batch_tracker)
  1190. batch_tracker.register_job(job)
  1191. def _register_new_job(self, batch_tracker):
  1192. if self.return_ordered:
  1193. self._jobs.append(batch_tracker)
  1194. else:
  1195. self._jobs_set.add(batch_tracker)
  def dispatch_next(self):
      """Dispatch more data for parallel processing

      This method is meant to be called concurrently by the multiprocessing
      callback. We rely on the thread-safety of dispatch_one_batch to protect
      against concurrent consumption of the unprotected iterator.
      """
      dispatched = self.dispatch_one_batch(self._original_iterator)
      if dispatched:
          return
      # Nothing could be dispatched: record that iteration is over and drop
      # the reference to the input iterator.
      self._iterating = False
      self._original_iterator = None
  def dispatch_one_batch(self, iterator):
      """Prefetch the tasks for the next batch and dispatch them.

      The effective size of the batch is computed here.

      The iterator consumption and dispatching is protected by the same
      lock so calling this function should be thread safe.

      Parameters
      ----------
      iterator : iterator
          Source of tasks. It is only consumed (through ``itertools.islice``)
          when the local queue of ready batches is empty.

      Returns
      -------
      bool
          False when the run is aborting or when there is nothing left to
          dispatch. True when a batch was handed to ``_dispatch``, and also
          when consuming ``iterator`` raised: in that case a job carrying the
          exception with status ``TASK_ERROR`` is registered instead of a
          real batch.
      """
      if self._aborting:
          return False

      batch_size = self._get_batch_size()

      with self._lock:
          # To ensure an even distribution of the workload between workers,
          # we look ahead in the original iterator more than batch_size
          # tasks. However, we keep consuming only one batch at each
          # dispatch_one_batch call. The extra tasks are stored in a local
          # queue, _ready_batches, that is looked up prior to re-consuming
          # tasks from the original iterator.
          try:
              tasks = self._ready_batches.get(block=False)
          except queue.Empty:
              # Slice the iterator n_jobs * batch_size items at a time. If
              # the slice returns less than that, then the current batch
              # size puts too much weight on a subset of workers, while
              # others may end up starving. So in this case, re-scale the
              # batch size accordingly to distribute evenly the last items
              # between all workers.
              n_jobs = self._cached_effective_n_jobs
              big_batch_size = batch_size * n_jobs

              try:
                  islice = list(itertools.islice(iterator, big_batch_size))
              except Exception as e:
                  # Handle the fact that the generator of tasks raised an
                  # exception. As this part of the code can be executed in
                  # a thread internal to the backend, register a task with
                  # an error that will be raised in the user's thread.
                  if isinstance(e.__context__, queue.Empty):
                      # We are inside the ``except queue.Empty`` handler, so
                      # an exception raised here gets queue.Empty as implicit
                      # context. Clearing ``__cause__`` hides that context
                      # from the traceback. This is only done when the
                      # context really is queue.Empty, to avoid messing with
                      # the causes of the original error.
                      e.__cause__ = None
                  batch_tracker = BatchCompletionCallBack(0, batch_size, self)
                  self._register_new_job(batch_tracker)
                  batch_tracker._register_outcome(dict(result=e, status=TASK_ERROR))
                  # Report success so that the caller keeps going; the error
                  # travels with the registered job.
                  return True
              if len(islice) == 0:
                  return False
              elif (
                  iterator is self._original_iterator and len(islice) < big_batch_size
              ):
                  # We reached the end of the original iterator (unless
                  # iterator is the ``pre_dispatch``-long initial slice of
                  # the original iterator) -- decrease the batch size to
                  # account for potential variance in the batches running
                  # time.
                  final_batch_size = max(1, len(islice) // (10 * n_jobs))
              else:
                  final_batch_size = max(1, len(islice) // n_jobs)
              # Cut the look-ahead slice into batches of final_batch_size
              # tasks and store them in the local queue.
              for i in range(0, len(islice), final_batch_size):
                  tasks = BatchedCalls(
                      islice[i : i + final_batch_size],
                      self._backend.get_nested_backend(),
                      self._reducer_callback,
                      self._pickle_cache,
                  )
                  self._ready_batches.put(tasks)
              # Finally, get one batch. The queue cannot be empty here since
              # at least one batch was just enqueued.
              tasks = self._ready_batches.get(block=False)
          if len(tasks) == 0:
              # No more tasks available in the iterator: tell caller to stop.
              return False
          else:
              self._dispatch(tasks)
              return True
  1280. def _get_batch_size(self):
  1281. """Returns the effective batch size for dispatch"""
  1282. if self.batch_size == "auto":
  1283. return self._backend.compute_batch_size()
  1284. else:
  1285. # Fixed batch size strategy
  1286. return self.batch_size
  1287. def _print(self, msg):
  1288. """Display the message on stout or stderr depending on verbosity"""
  1289. # XXX: Not using the logger framework: need to
  1290. # learn to use logger better.
  1291. if not self.verbose:
  1292. return
  1293. if self.verbose < 50:
  1294. writer = sys.stderr.write
  1295. else:
  1296. writer = sys.stdout.write
  1297. writer(f"[{self}]: {msg}\n")
  1298. def _is_completed(self):
  1299. """Check if all tasks have been completed"""
  1300. return self.n_completed_tasks == self.n_dispatched_tasks and not (
  1301. self._iterating or self._aborting
  1302. )
  def print_progress(self):
      """Display the progress of the parallel execution only a fraction
      of time, controlled by self.verbose.

      Nothing is printed when ``self.verbose`` is falsy. Otherwise three
      situations are distinguished:

      * the run is completed (see ``_is_completed``): a final "finished"
        line is always printed;
      * the input iterator is still referenced: the number of completed
        tasks is printed, subject to ``_verbosity_filter``;
      * the input iterator has been released: completed and dispatched
        counts are printed with an estimate of the remaining time, at a
        rate derived from ``self.verbose``.
      """
      if not self.verbose:
          return

      # Pad the counters to the number of digits of the total when known.
      if self.n_tasks is not None and self.n_tasks > 0:
          width = floor(log10(self.n_tasks)) + 1
      else:
          width = 3
      elapsed_time = time.time() - self._start_time

      if self._is_completed():
          # Make sure that we get a last message telling us we are done
          self._print(
              f"Done {self.n_completed_tasks:{width}d} out of "
              f"{self.n_completed_tasks:{width}d} | elapsed: "
              f"{short_format_time(elapsed_time)} finished"
          )
          return

      # Original job iterator becomes None once it has been fully
      # consumed: at this point we know the total number of jobs and we are
      # able to display an estimation of the remaining time based on already
      # completed jobs. Otherwise, we simply display the number of completed
      # tasks.
      elif self._original_iterator is not None:
          if _verbosity_filter(self.n_dispatched_batches, self.verbose):
              return
          fmt_time = f"| elapsed: {short_format_time(elapsed_time)}"
          index = self.n_completed_tasks
          if self.n_tasks is not None:
              self._print(
                  f"Done {index:{width}d} out of {self.n_tasks:{width}d} {fmt_time}"
              )
          else:
              pad = " " * (len("out of ") + width - len("tasks"))
              self._print(f"Done {index:{width}d} tasks {pad}{fmt_time}")
      else:
          index = self.n_completed_tasks
          # We are finished dispatching
          total_tasks = self.n_dispatched_tasks
          # We always display the first loop
          if index != 0:
              # ``_pre_dispatch_amount`` is only assigned by the parallel
              # branch of ``__call__``. The sequential code path also ends
              # up here (e.g. from its ``finally`` clause after a task
              # failed), possibly without the attribute having ever been
              # set: fall back to 0 rather than raising an AttributeError
              # that would mask the task's own exception.
              pre_dispatched = getattr(self, "_pre_dispatch_amount", 0)
              # Display depending on the number of remaining items
              # A message as soon as we finish dispatching, cursor is 0
              cursor = total_tasks - index + 1 - pre_dispatched
              frequency = (total_tasks // self.verbose) + 1
              is_last_item = index + 1 == total_tasks
              if is_last_item or cursor % frequency:
                  return
          # Extrapolate from the mean time per completed task; ``max``
          # avoids a division by zero before the first completion.
          remaining_time = (elapsed_time / max(index, 1)) * (
              self.n_dispatched_tasks - index
          )
          self._print(
              f"Done {index:{width}d} out of {total_tasks:{width}d} "
              f"| elapsed: {short_format_time(elapsed_time)} remaining: "
              f"{short_format_time(remaining_time)}"
          )
  1361. def _abort(self):
  1362. # Stop dispatching new jobs in the async callback thread
  1363. self._aborting = True
  1364. # If the backend allows it, cancel or kill remaining running
  1365. # tasks without waiting for the results as we will raise
  1366. # the exception we got back to the caller instead of returning
  1367. # any result.
  1368. backend = self._backend
  1369. if not self._aborted and hasattr(backend, "abort_everything"):
  1370. # If the backend is managed externally we need to make sure
  1371. # to leave it in a working state to allow for future jobs
  1372. # scheduling.
  1373. ensure_ready = self._managed_backend
  1374. backend.abort_everything(ensure_ready=ensure_ready)
  1375. self._aborted = True
  1376. def _start(self, iterator, pre_dispatch):
  1377. # Only set self._iterating to True if at least a batch
  1378. # was dispatched. In particular this covers the edge
  1379. # case of Parallel used with an exhausted iterator. If
  1380. # self._original_iterator is None, then this means either
  1381. # that pre_dispatch == "all", n_jobs == 1 or that the first batch
  1382. # was very quick and its callback already dispatched all the
  1383. # remaining jobs.
  1384. self._iterating = False
  1385. if self.dispatch_one_batch(iterator):
  1386. self._iterating = self._original_iterator is not None
  1387. while self.dispatch_one_batch(iterator):
  1388. pass
  1389. if pre_dispatch == "all":
  1390. # The iterable was consumed all at once by the above for loop.
  1391. # No need to wait for async callbacks to trigger to
  1392. # consumption.
  1393. self._iterating = False
  def _get_outputs(self, iterator, pre_dispatch):
      """Iterator returning the tasks' output as soon as they are ready.

      Generator protocol: the first value yielded is ``None`` and only
      signals that dispatching has started; the following values are the
      task results produced by ``_retrieve`` and then, once the ``finally``
      clause has run, the results of the jobs that were still queued in
      ``self._jobs``.

      Any exception, including ``GeneratorExit`` and ``KeyboardInterrupt``,
      triggers ``_abort`` before being propagated.
      """
      # Remember which thread started the dispatch, to detect a
      # GeneratorExit delivered from another thread (see below).
      dispatch_thread_id = threading.get_ident()
      detach_generator_exit = False
      try:
          self._start(iterator, pre_dispatch)
          # first yield returns None, for internal use only. This ensures
          # that we enter the try/except block and start dispatching the
          # tasks.
          yield

          with self._backend.retrieval_context():
              yield from self._retrieve()

      except GeneratorExit:
          # The generator has been garbage collected before being fully
          # consumed. This aborts the remaining tasks if possible and warn
          # the user if necessary.
          self._exception = True

          # In some interpreters such as PyPy, GeneratorExit can be raised in
          # a different thread than the one used to start the dispatch of the
          # parallel tasks. This can lead to hang when a thread attempts to
          # join itself. As workaround, we detach the execution of the
          # aborting code to a dedicated thread. We then need to make sure
          # the rest of the function does not call `_terminate_and_reset`
          # in finally.
          if dispatch_thread_id != threading.get_ident():
              warnings.warn(
                  "A generator produced by joblib.Parallel has been "
                  "gc'ed in an unexpected thread. This behavior should "
                  "not cause major -issues but to make sure, please "
                  "report this warning and your use case at "
                  "https://github.com/joblib/joblib/issues so it can "
                  "be investigated."
              )

              detach_generator_exit = True
              # Alias needed because ``self`` is rebound to the thread
              # object inside ``run`` below.
              _parallel = self

              class _GeneratorExitThread(threading.Thread):
                  def run(self):
                      _parallel._abort()
                      if _parallel.return_generator:
                          _parallel._warn_exit_early()
                      _parallel._terminate_and_reset()

              _GeneratorExitThread(name="GeneratorExitThread").start()
              # Returning (instead of re-raising) ends the generator; the
              # ``finally`` clause below still runs.
              return

          # Otherwise, we are in the thread that started the dispatch: we can
          # safely abort the execution and warn the user.
          self._abort()
          if self.return_generator:
              self._warn_exit_early()

          raise

      # Note: we catch any BaseException instead of just Exception instances
      # to also include KeyboardInterrupt
      except BaseException:
          self._exception = True
          self._abort()
          raise
      finally:
          # Store the unconsumed tasks and terminate the workers if necessary.
          # After an exception nothing is kept: the empty list makes the
          # trailing loop below a no-op.
          _remaining_outputs = [] if self._exception else self._jobs
          self._jobs = collections.deque()
          self._jobs_set = set()
          self._running = False
          if not detach_generator_exit:
              self._terminate_and_reset()

      # Only reached when no exception propagated: yield the results of the
      # jobs that were still queued when the retrieval loop stopped.
      while len(_remaining_outputs) > 0:
          batched_results = _remaining_outputs.popleft()
          batched_results = batched_results.get_result(self.timeout)
          for result in batched_results:
              yield result
  1462. def _wait_retrieval(self):
  1463. """Return True if we need to continue retrieving some tasks."""
  1464. # If the input load is still being iterated over, it means that tasks
  1465. # are still on the dispatch waitlist and their results will need to
  1466. # be retrieved later on.
  1467. if self._iterating:
  1468. return True
  1469. # If some of the dispatched tasks are still being processed by the
  1470. # workers, wait for the compute to finish before starting retrieval
  1471. if self.n_completed_tasks < self.n_dispatched_tasks:
  1472. return True
  1473. # For backends that does not support retrieving asynchronously the
  1474. # result to the main process, all results must be carefully retrieved
  1475. # in the _retrieve loop in the main thread while the backend is alive.
  1476. # For other backends, the actual retrieval is done asynchronously in
  1477. # the callback thread, and we can terminate the backend before the
  1478. # `self._jobs` result list has been emptied. The remaining results
  1479. # will be collected in the `finally` step of the generator.
  1480. if not self._backend.supports_retrieve_callback:
  1481. if len(self._jobs) > 0:
  1482. return True
  1483. return False
  def _retrieve(self):
      """Generator yielding individual task results as batches get ready.

      The loop polls (sleeping 10 ms between checks) for as long as
      ``_wait_retrieval`` reports that results are still expected. It stops
      early when the run is aborting, after giving ``_raise_error_fast`` a
      chance to raise the error of a failed job. ``self._nb_consumed`` is
      incremented for every result yielded.
      """
      # Job used for timeout control in the unordered case; see below.
      timeout_control_job = None

      while self._wait_retrieval():
          # If the callback thread of a worker has signaled that its task
          # triggered an exception, or if the retrieval loop has raised an
          # exception (e.g. `GeneratorExit`), exit the loop and surface the
          # worker traceback.
          if self._aborting:
              self._raise_error_fast()
              break

          nb_jobs = len(self._jobs)
          # Now wait for a job to be ready for retrieval.
          if self.return_ordered:
              # Case ordered: wait for completion (or error) of the next job
              # that has been dispatched and not retrieved yet. If no job
              # has been dispatched yet, wait for dispatch.
              # We assume that the time to wait for the next job to be
              # dispatched is always low, so that the timeout
              # control only has to be done on the amount of time the next
              # dispatched job is pending.
              if (nb_jobs == 0) or (
                  self._jobs[0].get_status(timeout=self.timeout) == TASK_PENDING
              ):
                  time.sleep(0.01)
                  continue

          elif nb_jobs == 0:
              # Case unordered: jobs are added to the list of jobs to
              # retrieve `self._jobs` only once completed or in error, which
              # is too late to enable timeout control in the same way as in
              # the previous case.
              # Instead, if no job is ready to be retrieved yet, we
              # arbitrarily pick a dispatched job, and the timeout control is
              # done such that an error is raised if this control job
              # times out before any other dispatched job has completed and
              # been added to `self._jobs` to be retrieved.
              if timeout_control_job is None:
                  timeout_control_job = next(iter(self._jobs_set), None)

              # NB: it can be None if no job has been dispatched yet.
              if timeout_control_job is not None:
                  timeout_control_job.get_status(timeout=self.timeout)

              time.sleep(0.01)
              continue

          elif timeout_control_job is not None:
              # Case unordered, when `nb_jobs > 0`:
              # It means that a job is ready to be retrieved, so no timeout
              # will occur during this iteration.
              # Before proceeding to retrieval of the next ready job, reset
              # the timeout control state to prepare the next iteration.
              timeout_control_job._completion_timeout_counter = None
              timeout_control_job = None

          # We need to be careful: the job queue can be filling up as
          # we empty it, hence the use of the lock.
          with self._lock:
              batched_results = self._jobs.popleft()
              if not self.return_ordered:
                  self._jobs_set.remove(batched_results)

          # Flatten the batched results to output one output at a time
          batched_results = batched_results.get_result(self.timeout)
          for result in batched_results:
              self._nb_consumed += 1
              yield result
  1546. def _raise_error_fast(self):
  1547. """If we are aborting, raise if a job caused an error."""
  1548. # Find the first job whose status is TASK_ERROR if it exists.
  1549. with self._lock:
  1550. error_job = next(
  1551. (job for job in self._jobs if job.status == TASK_ERROR), None
  1552. )
  1553. # If this error job exists, immediately raise the error by
  1554. # calling get_result. This job might not exists if abort has been
  1555. # called directly or if the generator is gc'ed.
  1556. if error_job is not None:
  1557. error_job.get_result(self.timeout)
  1558. def _warn_exit_early(self):
  1559. """Warn the user if the generator is gc'ed before being consumned."""
  1560. ready_outputs = self.n_completed_tasks - self._nb_consumed
  1561. is_completed = self._is_completed()
  1562. msg = ""
  1563. if ready_outputs:
  1564. msg += (
  1565. f"{ready_outputs} tasks have been successfully executed but not used."
  1566. )
  1567. if not is_completed:
  1568. msg += " Additionally, "
  1569. if not is_completed:
  1570. msg += (
  1571. f"{self.n_dispatched_tasks - self.n_completed_tasks} tasks "
  1572. "which were still being processed by the workers have been "
  1573. "cancelled."
  1574. )
  1575. if msg:
  1576. msg += (
  1577. " You could benefit from adjusting the input task "
  1578. "iterator to limit unnecessary computation time."
  1579. )
  1580. warnings.warn(msg)
  1581. def _get_sequential_output(self, iterable):
  1582. """Separate loop for sequential output.
  1583. This simplifies the traceback in case of errors and reduces the
  1584. overhead of calling sequential tasks with `joblib`.
  1585. """
  1586. try:
  1587. self._iterating = True
  1588. self._original_iterator = iterable
  1589. batch_size = self._get_batch_size()
  1590. if batch_size != 1:
  1591. it = iter(iterable)
  1592. iterable_batched = iter(
  1593. lambda: tuple(itertools.islice(it, batch_size)), ()
  1594. )
  1595. iterable = (task for batch in iterable_batched for task in batch)
  1596. # first yield returns None, for internal use only. This ensures
  1597. # that we enter the try/except block and setup the generator.
  1598. yield None
  1599. # Sequentially call the tasks and yield the results.
  1600. for func, args, kwargs in iterable:
  1601. self.n_dispatched_batches += 1
  1602. self.n_dispatched_tasks += 1
  1603. res = func(*args, **kwargs)
  1604. self.n_completed_tasks += 1
  1605. self.print_progress()
  1606. yield res
  1607. self._nb_consumed += 1
  1608. except BaseException:
  1609. self._exception = True
  1610. self._aborting = True
  1611. self._aborted = True
  1612. raise
  1613. finally:
  1614. self._running = False
  1615. self._iterating = False
  1616. self._original_iterator = None
  1617. self.print_progress()
  1618. def _reset_run_tracking(self):
  1619. """Reset the counters and flags used to track the execution."""
  1620. # Makes sur the parallel instance was not previously running in a
  1621. # thread-safe way.
  1622. with getattr(self, "_lock", nullcontext()):
  1623. if self._running:
  1624. msg = "This Parallel instance is already running !"
  1625. if self.return_generator is True:
  1626. msg += (
  1627. " Before submitting new tasks, you must wait for the "
  1628. "completion of all the previous tasks, or clean all "
  1629. "references to the output generator."
  1630. )
  1631. raise RuntimeError(msg)
  1632. self._running = True
  1633. # Counter to keep track of the task dispatched and completed.
  1634. self.n_dispatched_batches = 0
  1635. self.n_dispatched_tasks = 0
  1636. self.n_completed_tasks = 0
  1637. # Following count is incremented by one each time the user iterates
  1638. # on the output generator, it is used to prepare an informative
  1639. # warning message in case the generator is deleted before all the
  1640. # dispatched tasks have been consumed.
  1641. self._nb_consumed = 0
  1642. # Following flags are used to synchronize the threads in case one of
  1643. # the tasks error-out to ensure that all workers abort fast and that
  1644. # the backend terminates properly.
  1645. # Set to True as soon as a worker signals that a task errors-out
  1646. self._exception = False
  1647. # Set to True in case of early termination following an incident
  1648. self._aborting = False
  1649. # Set to True after abortion is complete
  1650. self._aborted = False
  1651. def __call__(self, iterable):
  1652. """Main function to dispatch parallel tasks."""
  1653. self._reset_run_tracking()
  1654. self.n_tasks = len(iterable) if hasattr(iterable, "__len__") else None
  1655. self._start_time = time.time()
  1656. if not self._managed_backend:
  1657. n_jobs = self._initialize_backend()
  1658. else:
  1659. n_jobs = self._effective_n_jobs()
  1660. if n_jobs == 1:
  1661. # If n_jobs==1, run the computation sequentially and return
  1662. # immediately to avoid overheads.
  1663. output = self._get_sequential_output(iterable)
  1664. next(output)
  1665. return output if self.return_generator else list(output)
  1666. # Let's create an ID that uniquely identifies the current call. If the
  1667. # call is interrupted early and that the same instance is immediately
  1668. # reused, this id will be used to prevent workers that were
  1669. # concurrently finalizing a task from the previous call to run the
  1670. # callback.
  1671. with self._lock:
  1672. self._call_id = uuid4().hex
  1673. # self._effective_n_jobs should be called in the Parallel.__call__
  1674. # thread only -- store its value in an attribute for further queries.
  1675. self._cached_effective_n_jobs = n_jobs
  1676. if isinstance(self._backend, LokyBackend):
  1677. # For the loky backend, we add a callback executed when reducing
  1678. # BatchCalls, that makes the loky executor use a temporary folder
  1679. # specific to this Parallel object when pickling temporary memmaps.
  1680. # This callback is necessary to ensure that several Parallel
  1681. # objects using the same reusable executor don't use the same
  1682. # temporary resources.
  1683. def _batched_calls_reducer_callback():
  1684. # Relevant implementation detail: the following lines, called
  1685. # when reducing BatchedCalls, are called in a thread-safe
  1686. # situation, meaning that the context of the temporary folder
  1687. # manager will not be changed in between the callback execution
  1688. # and the end of the BatchedCalls pickling. The reason is that
  1689. # pickling (the only place where set_current_context is used)
  1690. # is done from a single thread (the queue_feeder_thread).
  1691. self._backend._workers._temp_folder_manager.set_current_context( # noqa
  1692. self._id
  1693. )
  1694. self._reducer_callback = _batched_calls_reducer_callback
  1695. # self._effective_n_jobs should be called in the Parallel.__call__
  1696. # thread only -- store its value in an attribute for further queries.
  1697. self._cached_effective_n_jobs = n_jobs
  1698. backend_name = self._backend.__class__.__name__
  1699. if n_jobs == 0:
  1700. raise RuntimeError("%s has no active worker." % backend_name)
  1701. self._print(f"Using backend {backend_name} with {n_jobs} concurrent workers.")
  1702. if hasattr(self._backend, "start_call"):
  1703. self._backend.start_call()
  1704. # Following flag prevents double calls to `backend.stop_call`.
  1705. self._calling = True
  1706. iterator = iter(iterable)
  1707. pre_dispatch = self.pre_dispatch
  1708. if pre_dispatch == "all":
  1709. # prevent further dispatch via multiprocessing callback thread
  1710. self._original_iterator = None
  1711. self._pre_dispatch_amount = 0
  1712. else:
  1713. self._original_iterator = iterator
  1714. if hasattr(pre_dispatch, "endswith"):
  1715. pre_dispatch = eval_expr(pre_dispatch.replace("n_jobs", str(n_jobs)))
  1716. self._pre_dispatch_amount = pre_dispatch = int(pre_dispatch)
  1717. # The main thread will consume the first pre_dispatch items and
  1718. # the remaining items will later be lazily dispatched by async
  1719. # callbacks upon task completions.
  1720. # TODO: this iterator should be batch_size * n_jobs
  1721. iterator = itertools.islice(iterator, self._pre_dispatch_amount)
  1722. # Use a caching dict for callables that are pickled with cloudpickle to
  1723. # improve performances. This cache is used only in the case of
  1724. # functions that are defined in the __main__ module, functions that
  1725. # are defined locally (inside another function) and lambda expressions.
  1726. self._pickle_cache = dict()
  1727. output = self._get_outputs(iterator, pre_dispatch)
  1728. self._call_ref = weakref.ref(output)
  1729. # The first item from the output is blank, but it makes the interpreter
  1730. # progress until it enters the Try/Except block of the generator and
  1731. # reaches the first `yield` statement. This starts the asynchronous
  1732. # dispatch of the tasks to the workers.
  1733. next(output)
  1734. return output if self.return_generator else list(output)
  1735. def __repr__(self):
  1736. return "%s(n_jobs=%s)" % (self.__class__.__name__, self.n_jobs)