test_parallel.py 76 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250
  1. """
  2. Test the parallel module.
  3. """
  4. # Author: Gael Varoquaux <gael dot varoquaux at normalesup dot org>
  5. # Copyright (c) 2010-2011 Gael Varoquaux
  6. # License: BSD Style, 3 clauses.
  7. import mmap
  8. import os
  9. import re
  10. import sys
  11. import threading
  12. import time
  13. import warnings
  14. import weakref
  15. from contextlib import nullcontext
  16. from math import sqrt
  17. from multiprocessing import TimeoutError
  18. from pickle import PicklingError
  19. from time import sleep
  20. from traceback import format_exception
  21. import pytest
  22. import joblib
  23. from joblib import dump, load, parallel
  24. from joblib._multiprocessing_helpers import mp
  25. from joblib.test.common import (
  26. IS_GIL_DISABLED,
  27. np,
  28. with_multiprocessing,
  29. with_numpy,
  30. )
  31. from joblib.testing import check_subprocess_call, parametrize, raises, skipif, warns
  32. if mp is not None:
  33. # Loky is not available if multiprocessing is not
  34. from joblib.externals.loky import get_reusable_executor
  35. from queue import Queue
  36. try:
  37. import posix
  38. except ImportError:
  39. posix = None
  40. try:
  41. from ._openmp_test_helper.parallel_sum import parallel_sum
  42. except ImportError:
  43. parallel_sum = None
  44. try:
  45. import distributed
  46. except ImportError:
  47. distributed = None
  48. from joblib._parallel_backends import (
  49. LokyBackend,
  50. MultiprocessingBackend,
  51. ParallelBackendBase,
  52. SequentialBackend,
  53. ThreadingBackend,
  54. )
  55. from joblib.parallel import (
  56. BACKENDS,
  57. Parallel,
  58. cpu_count,
  59. delayed,
  60. effective_n_jobs,
  61. mp,
  62. parallel_backend,
  63. parallel_config,
  64. register_parallel_backend,
  65. )
  # Registered backends minus "multiprocessing"; used below to parametrize the
  # tests that exercise return_as="generator".
  RETURN_GENERATOR_BACKENDS = {
      name: backend_factory
      for name, backend_factory in BACKENDS.items()
      if name != "multiprocessing"
  }

  # Every accepted spelling of a backend: None, each registered name (sorted),
  # then one freshly built instance per registered backend factory.
  ALL_VALID_BACKENDS = [None, *sorted(BACKENDS)]
  ALL_VALID_BACKENDS.extend(
      backend_factory() for backend_factory in BACKENDS.values()
  )

  # No process-based backend can be exercised when multiprocessing is missing.
  PROCESS_BACKENDS = [] if mp is None else ["multiprocessing", "loky"]
  PARALLEL_BACKENDS = [*PROCESS_BACKENDS, "threading"]

  if hasattr(mp, "get_context"):
      # A custom multiprocessing context (Python 3.4+) is also a valid backend
      ALL_VALID_BACKENDS.append(mp.get_context("spawn"))
  def get_default_backend_instance():
      """Return the ``BACKENDS`` entry registered under the default backend name.

      The name is looked up on the ``parallel`` module at call time instead of
      being imported once with ``from joblib.parallel import DEFAULT_BACKEND``:
      the default can be changed before running the tests through the
      JOBLIB_DEFAULT_PARALLEL_BACKEND environment variable.
      """
      default_name = parallel.DEFAULT_BACKEND
      return BACKENDS[default_name]
  def get_workers(backend):
      """Return ``backend._pool`` if set, else ``backend._workers``, else None."""
      fallback = getattr(backend, "_workers", None)
      return getattr(backend, "_pool", fallback)
  def division(x, y):
      """Return the true division of ``x`` by ``y``."""
      quotient = x / y
      return quotient
  def square(x):
      """Return ``x`` raised to the power two."""
      return pow(x, 2)
  class MyExceptionWithFinickyInit(Exception):
      """An exception whose constructor requires exactly four arguments."""

      def __init__(self, a, b, c, d):
          """Accept four mandatory arguments and deliberately do nothing."""
  def exception_raiser(x, custom_exception=False):
      """Return ``x`` unchanged, except for ``x == 7`` which raises.

      Raises ``MyExceptionWithFinickyInit`` when ``custom_exception`` is
      truthy and ``ValueError`` otherwise.
      """
      if x != 7:
          return x
      if custom_exception:
          raise MyExceptionWithFinickyInit("a", "b", "c", "d")
      raise ValueError
  def interrupt_raiser(x):
      """Wait 50 ms, then raise ``KeyboardInterrupt`` (``x`` is ignored)."""
      pause = 0.05
      time.sleep(pause)
      raise KeyboardInterrupt
  def f(x, y=0, z=0):
      """Return ``x**2 + y + z``.

      Defined at module level so that it can be spawned with multiprocessing.
      """
      squared = x**2
      return squared + y + z
  def _active_backend_type():
      """Return the type of the first item given by ``get_active_backend()``."""
      active = parallel.get_active_backend()
      return type(active[0])
  def parallel_func(inner_n_jobs, backend):
      """Square 0, 1 and 2 through a ``Parallel`` call and return the results."""
      runner = Parallel(n_jobs=inner_n_jobs, backend=backend)
      return runner(delayed(square)(i) for i in range(3))
  117. ###############################################################################
  def test_cpu_count():
      """``cpu_count()`` reports a strictly positive number."""
      n_cpus = cpu_count()
      assert n_cpus > 0
  def test_effective_n_jobs():
      """``effective_n_jobs()`` without arguments is strictly positive."""
      n_jobs = effective_n_jobs()
      assert n_jobs > 0
  @parametrize("context", [parallel_config, parallel_backend])
  @pytest.mark.parametrize(
      "backend_n_jobs, expected_n_jobs",
      [(3, 3), (-1, effective_n_jobs(n_jobs=-1)), (None, 1)],
      ids=["positive-int", "negative-int", "None"],
  )
  @with_multiprocessing
  def test_effective_n_jobs_None(context, backend_n_jobs, expected_n_jobs):
      """Check the number of effective jobs when ``n_jobs=None``.

      Non-regression test for https://github.com/joblib/joblib/issues/984
      """
      with context("threading", n_jobs=backend_n_jobs):
          # Inside a backend context, None resolves to the n_jobs configured
          # on that context.
          n_jobs_in_context = effective_n_jobs(n_jobs=None)
          assert n_jobs_in_context == expected_n_jobs

      # Outside of any backend context, None means a single job.
      n_jobs_outside = effective_n_jobs(n_jobs=None)
      assert n_jobs_outside == 1
  138. ###############################################################################
  139. # Test parallel
  @parametrize("backend", ALL_VALID_BACKENDS)
  @parametrize("n_jobs", [1, 2, -1, -2])
  @parametrize("verbose", [2, 11, 100])
  def test_simple_parallel(backend, n_jobs, verbose):
      """Parallel results match a plain sequential list comprehension."""
      expected = [square(x) for x in range(5)]
      runner = Parallel(n_jobs=n_jobs, backend=backend, verbose=verbose)
      assert expected == runner(delayed(square)(x) for x in range(5))
  @parametrize("backend", ALL_VALID_BACKENDS)
  @parametrize("n_jobs", [1, 2])
  def test_parallel_pretty_print(backend, n_jobs):
      """All "Done x out of y |" progress prefixes have the same width."""
      n_tasks = 100
      progress_re = re.compile(r"(Done\s+\d+ out of \d+ \|)")

      class ParallelLog(Parallel):
          # Collects what would have been printed instead of emitting it.
          messages = []

          def _print(self, msg):
              self.messages.append(msg)

      executor = ParallelLog(n_jobs=n_jobs, backend=backend, verbose=10000)
      executor([delayed(f)(i) for i in range(n_tasks)])

      # Width of the matched prefix for every message that contains one.
      found = (progress_re.search(message) for message in executor.messages)
      widths = {match.end() - match.start() for match in found if match}
      assert len(widths) == 1
  @parametrize("backend", ALL_VALID_BACKENDS)
  def test_main_thread_renamed_no_warning(backend, monkeypatch):
      """No backend may rely on the name of the main thread.

      See https://github.com/joblib/joblib/issues/180#issuecomment-253266247:
      some programs (uWSGI apps for instance) rename their main thread.
      """
      monkeypatch.setattr(
          target=threading.current_thread(),
          name="name",
          value="some_new_name_for_the_main_thread",
      )

      with warnings.catch_warnings(record=True) as caught:
          results = Parallel(n_jobs=2, backend=backend)(
              delayed(square)(x) for x in range(3)
          )
          assert results == [0, 1, 4]

      # Under Python 3.13 the multiprocessing backend warns that forking a
      # multi-threaded process is not a good idea; this is ignored here.
      uses_multiprocessing = backend in [None, "multiprocessing"] or isinstance(
          backend, MultiprocessingBackend
      )
      fork_message = "multi-threaded, use of fork() may lead to deadlocks"

      def is_relevant(caught_warning):
          text = str(caught_warning.message)
          # LokyBackend defaults may produce worker timeout warnings.
          if "worker timeout" in text:
              return False
          # DeprecationWarnings could lead to false negatives.
          if isinstance(caught_warning.message, DeprecationWarning):
              return False
          if uses_multiprocessing and fork_message in text:
              return False
          return True

      remaining = [w for w in caught if is_relevant(w)]

      # The multiprocessing backend warns when it detects it is started from
      # a non-main thread: the renaming must not trigger a false positive.
      assert len(remaining) == 0
  def _assert_warning_nested(backend, inner_n_jobs, expected):
      """Run ``parallel_func`` and report whether warnings match ``expected``.

      When ``expected`` is falsy, assert that nothing was emitted and return
      True. Otherwise return True only if at least one warning was recorded,
      every recorded message contains "backed parallel loops cannot", and
      the number of warnings is acceptable.
      """
      with warnings.catch_warnings(record=True) as caught:
          warnings.simplefilter("always")
          parallel_func(backend=backend, inner_n_jobs=inner_n_jobs)

      messages = [w.message for w in caught]

      if not expected:
          assert not messages
          return True

      if not messages:
          return False

      all_match = all(
          "backed parallel loops cannot" in message.args[0] for message in messages
      )
      # With free-threaded Python and a threading outer backend, more than
      # one warning might be seen.
      if IS_GIL_DISABLED:
          count_is_ok = len(messages) >= 1
      else:
          count_is_ok = len(messages) == 1
      return all_match and count_is_ok
  @with_multiprocessing
  @parametrize(
      "parent_backend,child_backend,expected",
      [
          ("loky", "multiprocessing", True),
          ("loky", "loky", False),
          ("multiprocessing", "multiprocessing", True),
          ("multiprocessing", "loky", True),
          ("threading", "multiprocessing", True),
          ("threading", "loky", True),
      ],
  )
  def test_nested_parallel_warnings(parent_backend, child_backend, expected):
      """Nested Parallel calls warn only for the expected backend pairs."""

      def run_nested(inner_n_jobs, expect_warning):
          # Five outer tasks, each running the nested check once.
          return Parallel(n_jobs=2, backend=parent_backend)(
              delayed(_assert_warning_nested)(
                  backend=child_backend,
                  inner_n_jobs=inner_n_jobs,
                  expected=expect_warning,
              )
              for _ in range(5)
          )

      # No warning whatsoever when the inner loop uses a single job.
      run_nested(inner_n_jobs=1, expect_warning=False)

      # Warnings when the inner loop is parallel and the pair expects them.
      res = run_nested(inner_n_jobs=2, expect_warning=expected)

      # Warning handling is not thread safe: one thread might see several
      # warnings or none at all, so only require one success with threads.
      check = any if parent_backend == "threading" else all
      assert check(res)
  @with_multiprocessing
  @parametrize("backend", ["loky", "multiprocessing", "threading"])
  def test_background_thread_parallelism(backend):
      """Parallel can run from a background thread without warning.

      The parametrized ``backend`` is forwarded to ``Parallel`` so that each
      listed backend is actually exercised.
      """
      is_run_parallel = [False]

      def background_thread(is_run_parallel):
          with warnings.catch_warnings(record=True) as warninfo:
              Parallel(n_jobs=2, backend=backend)(
                  delayed(sleep)(0.1) for _ in range(4)
              )
          # Same noise filtering as test_main_thread_renamed_no_warning:
          # loky worker timeouts and DeprecationWarnings (e.g. about forking
          # a multi-threaded process) are unrelated to what is checked here.
          unexpected = [
              w
              for w in warninfo
              if "worker timeout" not in str(w.message)
              and not isinstance(w.message, DeprecationWarning)
          ]
          # Stays False if Parallel raised in the thread, failing the test.
          is_run_parallel[0] = len(unexpected) == 0

      t = threading.Thread(target=background_thread, args=(is_run_parallel,))
      t.start()
      t.join()
      assert is_run_parallel[0]
  def nested_loop(backend):
      """Run a two-task, two-job Parallel loop with the given backend."""
      runner = Parallel(n_jobs=2, backend=backend)
      runner(delayed(square)(0.01) for _ in range(2))
  @parametrize("child_backend", BACKENDS)
  @parametrize("parent_backend", BACKENDS)
  def test_nested_loop(parent_backend, child_backend):
      """Every pair of registered backends can be nested without error."""
      outer = Parallel(n_jobs=2, backend=parent_backend)
      outer(delayed(nested_loop)(child_backend) for _ in range(2))
  def raise_exception(backend):
      """Raise ``ValueError`` unconditionally (``backend`` is ignored)."""
      raise ValueError
  @with_multiprocessing
  def test_nested_loop_with_exception_with_loky():
      """A ValueError raised next to a nested loky loop reaches the caller."""
      with raises(ValueError):
          with Parallel(n_jobs=2, backend="loky") as p:
              tasks = [
                  delayed(nested_loop)("loky"),
                  delayed(raise_exception)("loky"),
              ]
              p(tasks)
  def test_mutate_input_with_threads():
      """Input is mutable when using the threading backend"""
      capacity = 5
      q = Queue(maxsize=capacity)
      runner = Parallel(n_jobs=2, backend="threading")
      runner(delayed(q.put)(1) for _ in range(capacity))
      assert q.full()
  @parametrize("n_jobs", [1, 2, 3])
  def test_parallel_kwargs(n_jobs):
      """Check the keyword argument processing of pmap."""
      inputs = range(10)
      expected = [f(x, y=1) for x in inputs]
      runner = Parallel(n_jobs=n_jobs)
      assert expected == runner(delayed(f)(x, y=1) for x in inputs)
  @parametrize("backend", PARALLEL_BACKENDS)
  def test_parallel_as_context_manager(backend):
      """A Parallel object can be used both managed and non-managed."""
      inputs = range(10)
      expected = [f(x, y=1) for x in inputs]
      has_mp = mp is not None

      with Parallel(n_jobs=4, backend=backend) as p:
          # A pool has been eagerly created on entry and is managed through
          # the context manager protocol.
          managed_backend = p._backend

          # Call the managed parallel object twice inside the block...
          for _ in range(2):
              assert expected == p(delayed(f)(x, y=1) for x in inputs)

          # ...both calls must have used the same pool instance.
          if has_mp:
              assert get_workers(managed_backend) is get_workers(p._backend)

      # Once the block is exited the pool is terminated and no longer
      # referenced from the parallel object.
      if has_mp:
          assert get_workers(p._backend) is None

      # The instance can still be used in non-managed mode.
      assert expected == p(delayed(f)(x, y=1) for x in inputs)
      if has_mp:
          assert get_workers(p._backend) is None
  @with_multiprocessing
  def test_parallel_pickling():
      """Check that pmap captures the errors when it is passed an object
      that cannot be pickled.
      """

      class UnpicklableObject:
          def __reduce__(self):
              raise RuntimeError("123")

      with raises(PicklingError, match=r"the task to send"):
          runner = Parallel(n_jobs=2, backend="loky")
          runner(delayed(id)(UnpicklableObject()) for _ in range(10))
  @with_numpy
  @with_multiprocessing
  @parametrize("byteorder", ["<", ">", "="])
  @parametrize("max_nbytes", [1, "1M"])
  def test_parallel_byteorder_corruption(byteorder, max_nbytes):
      """An array's byte order is the same in the worker and on return."""

      def inspect_byteorder(x):
          return x, x.dtype.byteorder

      dtype_spec = f"{byteorder}i4"
      x = np.arange(6).reshape((2, 3)).view(dtype_spec)
      initial_np_byteorder = x.dtype.byteorder

      runner = Parallel(n_jobs=2, backend="loky", max_nbytes=max_nbytes)
      result = runner(delayed(inspect_byteorder)(x) for _ in range(3))

      for x_returned, byteorder_in_worker in result:
          assert byteorder_in_worker == initial_np_byteorder
          assert byteorder_in_worker == x_returned.dtype.byteorder
          np.testing.assert_array_equal(x, x_returned)
  @parametrize("backend", PARALLEL_BACKENDS)
  def test_parallel_timeout_success(backend):
      """No timeout is raised when the tasks are fast enough."""
      runner = Parallel(n_jobs=2, backend=backend, timeout=30)
      results = runner(delayed(sleep)(0.001) for _ in range(10))
      assert len(results) == 10
  @with_multiprocessing
  @parametrize("backend", PARALLEL_BACKENDS)
  def test_parallel_timeout_fail(backend):
      """A TimeoutError is raised when the tasks are too slow."""
      with raises(TimeoutError):
          runner = Parallel(n_jobs=2, backend=backend, timeout=0.01)
          runner(delayed(sleep)(10) for _ in range(10))
  @with_multiprocessing
  @parametrize("backend", set(RETURN_GENERATOR_BACKENDS) - {"sequential"})
  @parametrize("return_as", ["generator", "generator_unordered"])
  def test_parallel_timeout_fail_with_generator(backend, return_as):
      """Timeouts are honoured when results are returned as a generator."""

      def consume(timeout, task_duration):
          # Build the lazy result stream and exhaust it.
          runner = Parallel(
              n_jobs=2, backend=backend, return_as=return_as, timeout=timeout
          )
          return list(runner(delayed(sleep)(task_duration) for _ in range(10)))

      # Slow tasks with a short timeout must fail...
      with raises(TimeoutError):
          consume(timeout=0.1, task_duration=10)

      # ...while fast tasks with a high timeout should not raise.
      consume(timeout=10, task_duration=0.01)
  @with_multiprocessing
  @parametrize("backend", PROCESS_BACKENDS)
  def test_error_capture(backend):
      """Errors raised in tasks are captured and re-raised with their type."""

      def division_tasks():
          # The second task divides by zero.
          return [delayed(division)(x, y) for x, y in zip((0, 1), (1, 0))]

      def interrupt_tasks():
          return [delayed(interrupt_raiser)(x) for x in (1, 0)]

      expected = [f(x, y=1) for x in range(10)]

      if mp is None:
          with raises(KeyboardInterrupt):
              Parallel(n_jobs=2)(interrupt_tasks())
      else:
          with raises(ZeroDivisionError):
              Parallel(n_jobs=2, backend=backend)(division_tasks())

          with raises(KeyboardInterrupt):
              Parallel(n_jobs=2, backend=backend)(interrupt_tasks())

          # Same checks through the context manager API
          with Parallel(n_jobs=2, backend=backend) as p:
              assert get_workers(p._backend) is not None
              original_workers = get_workers(p._backend)

              with raises(ZeroDivisionError):
                  p(division_tasks())

              # The managed pool is still available and in a working state
              # despite the previously raised (and caught) exception...
              assert get_workers(p._backend) is not None
              # ...because it has been interrupted and restarted.
              assert get_workers(p._backend) is not original_workers
              assert expected == p(delayed(f)(x, y=1) for x in range(10))

              original_workers = get_workers(p._backend)
              with raises(KeyboardInterrupt):
                  p(interrupt_tasks())

              # Again: still available, but interrupted and restarted.
              assert get_workers(p._backend) is not None
              assert get_workers(p._backend) is not original_workers

              # On failure, report the internal progress counters.
              assert expected == p(delayed(f)(x, y=1) for x in range(10)), (
                  p._iterating,
                  p.n_completed_tasks,
                  p.n_dispatched_tasks,
                  p._aborting,
              )

          # The inner pool has been terminated on context manager exit
          assert get_workers(p._backend) is None

      # Wrapped exceptions should inherit from the class of the original
      # exception to make them easy to catch.
      with raises(ZeroDivisionError):
          Parallel(n_jobs=2)(division_tasks())

      with raises(MyExceptionWithFinickyInit):
          Parallel(n_jobs=2, verbose=0)(
              (delayed(exception_raiser)(i, custom_exception=True) for i in range(30))
          )
  @with_multiprocessing
  @parametrize("backend", BACKENDS)
  def test_error_in_task_iterator(backend):
      """An error raised by the task iterator itself reaches the caller."""

      def my_generator(raise_at=0):
          for i in range(20):
              if i == raise_at:
                  raise ValueError("Iterator Raising Error")
              yield i

      # 0: the error is raised in the pre-dispatch phase.
      # 5: raised when dispatching a new task after the pre-dispatch (likely
      #    to happen in a different thread).
      # 19: same, but long after the pre-dispatch phase.
      failure_points = (0, 5, 19)

      with Parallel(n_jobs=2, backend=backend) as p:
          for raise_at in failure_points:
              with raises(ValueError, match="Iterator Raising Error"):
                  p(delayed(square)(i) for i in my_generator(raise_at=raise_at))
  def consumer(queue, item):
      """Append a "Consumed <item>" entry to ``queue``."""
      entry = "Consumed %s" % item
      queue.append(entry)
  @parametrize("backend", BACKENDS)
  @parametrize(
      "batch_size, expected_queue",
      [
          (
              1,
              [
                  "Produced 0", "Consumed 0",
                  "Produced 1", "Consumed 1",
                  "Produced 2", "Consumed 2",
                  "Produced 3", "Consumed 3",
                  "Produced 4", "Consumed 4",
                  "Produced 5", "Consumed 5",
              ],
          ),
          (
              4,
              [
                  # First batch
                  "Produced 0", "Produced 1", "Produced 2", "Produced 3",
                  "Consumed 0", "Consumed 1", "Consumed 2", "Consumed 3",
                  # Second batch
                  "Produced 4", "Produced 5",
                  "Consumed 4", "Consumed 5",
              ],
          ),
      ],
  )
  def test_dispatch_one_job(backend, batch_size, expected_queue):
      """Test that with only one job, Parallel does act as a iterator."""
      events = []

      def producer():
          # Log each item right before handing it over to Parallel.
          for i in range(6):
              events.append("Produced %i" % i)
              yield i

      runner = Parallel(n_jobs=1, batch_size=batch_size, backend=backend)
      runner(delayed(consumer)(events, x) for x in producer())

      assert events == expected_queue
      assert len(events) == 12
  @with_multiprocessing
  @parametrize("backend", PARALLEL_BACKENDS)
  def test_dispatch_multiprocessing(backend):
      """Check that using pre_dispatch Parallel does indeed dispatch items
      lazily.
      """
      # The manager is used as a context manager so that it is shut down
      # explicitly once the shared list has been read back.
      with mp.Manager() as manager:
          queue = manager.list()

          def producer():
              for i in range(6):
                  queue.append("Produced %i" % i)
                  yield i

          Parallel(n_jobs=2, batch_size=1, pre_dispatch=3, backend=backend)(
              delayed(consumer)(queue, "any") for _ in producer()
          )

          # Snapshot the shared list while the manager is still alive.
          queue_contents = list(queue)

      assert queue_contents[0] == "Produced 0"

      # Only 3 tasks are pre-dispatched out of 6. The 4th task is dispatched
      # only after any of the first 3 jobs have completed, hence a consumption
      # must show up within the first 4 entries. A membership test is used
      # because list.index raises ValueError instead of returning -1.
      assert "Consumed any" in queue_contents[:4]
      first_consumption_index = queue_contents.index("Consumed any")
      produced_3_index = queue_contents.index("Produced 3")  # 4th task produced
      assert produced_3_index > first_consumption_index

      assert len(queue_contents) == 12
  def test_batching_auto_threading():
      """batch_size="auto" keeps a batch size of 1 with the threading backend.

      Batching has been found to never be beneficial with this low-overhead
      backend, so the effective batch size stays at 1 (no batching).
      """
      n_fast_tasks = 5000
      with Parallel(n_jobs=2, batch_size="auto", backend="threading") as p:
          p(delayed(id)(i) for i in range(n_fast_tasks))
          batch_size = p._backend.compute_batch_size()
          assert batch_size == 1
  @with_multiprocessing
  @parametrize("backend", PROCESS_BACKENDS)
  def test_batching_auto_subprocesses(backend):
      """batch_size="auto" yields a positive batch size with processes."""
      n_fast_tasks = 5000
      with Parallel(n_jobs=2, batch_size="auto", backend=backend) as p:
          p(delayed(id)(i) for i in range(n_fast_tasks))

          # It should be strictly larger than 1, but to avoid heisen failures
          # on clogged CI workers only check that it is strictly positive.
          batch_size = p._backend.compute_batch_size()
          assert batch_size > 0
  def test_exception_dispatch():
      """Make sure that exception raised during dispatch are indeed captured"""
      runner = Parallel(n_jobs=2, pre_dispatch=16, verbose=0)
      with raises(ValueError):
          runner(delayed(exception_raiser)(i) for i in range(30))
  def nested_function_inner(i):
      """Run ``exception_raiser`` over 0..29 with two jobs (``i`` is unused)."""
      runner = Parallel(n_jobs=2)
      runner(delayed(exception_raiser)(j) for j in range(30))
  def nested_function_outer(i):
      """Run ``nested_function_inner`` 30 times with two jobs (``i`` is unused)."""
      runner = Parallel(n_jobs=2)
      runner(delayed(nested_function_inner)(j) for j in range(30))
  @with_multiprocessing
  @parametrize("backend", PARALLEL_BACKENDS)
  @pytest.mark.xfail(reason="https://github.com/joblib/loky/pull/255")
  def test_nested_exception_dispatch(backend):
      """Ensure errors for nested joblib cases gets propagated

      We rely on the Python 3 built-in __cause__ system that already
      report this kind of information to the user.
      """
      with raises(ValueError) as excinfo:
          runner = Parallel(n_jobs=2, backend=backend)
          runner(delayed(nested_function_outer)(i) for i in range(30))

      # Important information such as function names must be visible in the
      # final error message reported to the user.
      report = "".join(
          format_exception(excinfo.type, excinfo.value, excinfo.tb)
      )
      for function_name in (
          "nested_function_outer",
          "nested_function_inner",
          "exception_raiser",
      ):
          assert function_name in report

      assert type(excinfo.value) is ValueError
  class FakeParallelBackend(SequentialBackend):
      """Pretends to run concurrently while running sequentially."""

      def configure(self, n_jobs=1, parallel=None, **backend_args):
          """Store the resolved job count and the ``parallel`` argument.

          Returns ``n_jobs`` exactly as it was requested, not the resolved
          value stored on ``self.n_jobs``.
          """
          self.n_jobs = self.effective_n_jobs(n_jobs)
          self.parallel = parallel
          return n_jobs

      def effective_n_jobs(self, n_jobs=1):
          """Resolve a negative ``n_jobs`` relative to the CPU count.

          A negative value maps to ``cpu_count + 1 + n_jobs``, floored at 1;
          any other value is returned unchanged.
          """
          if n_jobs < 0:
              # ``mp`` is None when multiprocessing is unavailable: assume a
              # single CPU instead of failing with an AttributeError.
              n_cpus = 1 if mp is None else mp.cpu_count()
              n_jobs = max(n_cpus + 1 + n_jobs, 1)
          return n_jobs
  593. def test_invalid_backend():
  594. with raises(ValueError, match="Invalid backend:"):
  595. Parallel(backend="unit-testing")
  596. with raises(ValueError, match="Invalid backend:"):
  597. with parallel_config(backend="unit-testing"):
  598. pass
  599. with raises(ValueError, match="Invalid backend:"):
  600. with parallel_config(backend="unit-testing"):
  601. pass
  602. @parametrize("backend", ALL_VALID_BACKENDS)
  603. def test_invalid_njobs(backend):
  604. with raises(ValueError) as excinfo:
  605. Parallel(n_jobs=0, backend=backend)._initialize_backend()
  606. assert "n_jobs == 0 in Parallel has no meaning" in str(excinfo.value)
  607. with raises(ValueError) as excinfo:
  608. Parallel(n_jobs=0.5, backend=backend)._initialize_backend()
  609. assert "n_jobs == 0 in Parallel has no meaning" in str(excinfo.value)
  610. with raises(ValueError) as excinfo:
  611. Parallel(n_jobs="2.3", backend=backend)._initialize_backend()
  612. assert "n_jobs could not be converted to int" in str(excinfo.value)
  613. with raises(ValueError) as excinfo:
  614. Parallel(n_jobs="invalid_str", backend=backend)._initialize_backend()
  615. assert "n_jobs could not be converted to int" in str(excinfo.value)
  616. @with_multiprocessing
  617. @parametrize("backend", PARALLEL_BACKENDS)
  618. @parametrize("n_jobs", ["2", 2.3, 2])
  619. def test_njobs_converted_to_int(backend, n_jobs):
  620. p = Parallel(n_jobs=n_jobs, backend=backend)
  621. assert p._effective_n_jobs() == 2
  622. res = p(delayed(square)(i) for i in range(10))
  623. assert all(r == square(i) for i, r in enumerate(res))
  624. def test_register_parallel_backend():
  625. try:
  626. register_parallel_backend("test_backend", FakeParallelBackend)
  627. assert "test_backend" in BACKENDS
  628. assert BACKENDS["test_backend"] == FakeParallelBackend
  629. finally:
  630. del BACKENDS["test_backend"]
  631. def test_overwrite_default_backend():
  632. default_backend_orig = parallel.DEFAULT_BACKEND
  633. assert _active_backend_type() == get_default_backend_instance()
  634. try:
  635. register_parallel_backend("threading", BACKENDS["threading"], make_default=True)
  636. assert _active_backend_type() == ThreadingBackend
  637. finally:
  638. # Restore the global default manually
  639. parallel.DEFAULT_BACKEND = default_backend_orig
  640. assert _active_backend_type() == get_default_backend_instance()
  641. @skipif(mp is not None, reason="Only without multiprocessing")
  642. def test_backend_no_multiprocessing():
  643. with warns(UserWarning, match="joblib backend '.*' is not available on.*"):
  644. Parallel(backend="loky")(delayed(square)(i) for i in range(3))
  645. # The below should now work without problems
  646. with parallel_config(backend="loky"):
  647. Parallel()(delayed(square)(i) for i in range(3))
  648. def check_backend_context_manager(context, backend_name):
  649. with context(backend_name, n_jobs=3):
  650. active_backend, active_n_jobs = parallel.get_active_backend()
  651. assert active_n_jobs == 3
  652. assert effective_n_jobs(3) == 3
  653. p = Parallel()
  654. assert p.n_jobs == 3
  655. if backend_name == "multiprocessing":
  656. assert type(active_backend) is MultiprocessingBackend
  657. assert type(p._backend) is MultiprocessingBackend
  658. elif backend_name == "loky":
  659. assert type(active_backend) is LokyBackend
  660. assert type(p._backend) is LokyBackend
  661. elif backend_name == "threading":
  662. assert type(active_backend) is ThreadingBackend
  663. assert type(p._backend) is ThreadingBackend
  664. elif backend_name.startswith("test_"):
  665. assert type(active_backend) is FakeParallelBackend
  666. assert type(p._backend) is FakeParallelBackend
  667. all_backends_for_context_manager = PARALLEL_BACKENDS[:]
  668. all_backends_for_context_manager.extend(["test_backend_%d" % i for i in range(3)])
  669. @with_multiprocessing
  670. @parametrize("backend", all_backends_for_context_manager)
  671. @parametrize("context", [parallel_backend, parallel_config])
  672. def test_backend_context_manager(monkeypatch, backend, context):
  673. if backend not in BACKENDS:
  674. monkeypatch.setitem(BACKENDS, backend, FakeParallelBackend)
  675. assert _active_backend_type() == get_default_backend_instance()
  676. # check that this possible to switch parallel backends sequentially
  677. check_backend_context_manager(context, backend)
  678. # The default backend is restored
  679. assert _active_backend_type() == get_default_backend_instance()
  680. # Check that context manager switching is thread safe:
  681. Parallel(n_jobs=2, backend="threading")(
  682. delayed(check_backend_context_manager)(context, b)
  683. for b in all_backends_for_context_manager
  684. if not b
  685. )
  686. # The default backend is again restored
  687. assert _active_backend_type() == get_default_backend_instance()
class ParameterizedParallelBackend(SequentialBackend):
    """Pretends to run concurrently while running sequentially.

    The constructor requires a ``param`` value so tests can check that
    backend parameters are forwarded to the backend instance.
    """

    def __init__(self, param=None):
        # ``param`` defaults to None only so that omitting it can be detected
        # and reported with a ValueError.
        if param is None:
            raise ValueError("param should not be None")
        self.param = param
  694. @parametrize("context", [parallel_config, parallel_backend])
  695. def test_parameterized_backend_context_manager(monkeypatch, context):
  696. monkeypatch.setitem(BACKENDS, "param_backend", ParameterizedParallelBackend)
  697. assert _active_backend_type() == get_default_backend_instance()
  698. with context("param_backend", param=42, n_jobs=3):
  699. active_backend, active_n_jobs = parallel.get_active_backend()
  700. assert type(active_backend) is ParameterizedParallelBackend
  701. assert active_backend.param == 42
  702. assert active_n_jobs == 3
  703. p = Parallel()
  704. assert p.n_jobs == 3
  705. assert p._backend is active_backend
  706. results = p(delayed(sqrt)(i) for i in range(5))
  707. assert results == [sqrt(i) for i in range(5)]
  708. # The default backend is again restored
  709. assert _active_backend_type() == get_default_backend_instance()
  710. @parametrize("context", [parallel_config, parallel_backend])
  711. def test_directly_parameterized_backend_context_manager(context):
  712. assert _active_backend_type() == get_default_backend_instance()
  713. # Check that it's possible to pass a backend instance directly,
  714. # without registration
  715. with context(ParameterizedParallelBackend(param=43), n_jobs=5):
  716. active_backend, active_n_jobs = parallel.get_active_backend()
  717. assert type(active_backend) is ParameterizedParallelBackend
  718. assert active_backend.param == 43
  719. assert active_n_jobs == 5
  720. p = Parallel()
  721. assert p.n_jobs == 5
  722. assert p._backend is active_backend
  723. results = p(delayed(sqrt)(i) for i in range(5))
  724. assert results == [sqrt(i) for i in range(5)]
  725. # The default backend is again restored
  726. assert _active_backend_type() == get_default_backend_instance()
  727. def sleep_and_return_pid():
  728. sleep(0.1)
  729. return os.getpid()
  730. def get_nested_pids():
  731. assert _active_backend_type() == ThreadingBackend
  732. # Assert that the nested backend does not change the default number of
  733. # jobs used in Parallel
  734. assert Parallel()._effective_n_jobs() == 1
  735. # Assert that the tasks are running only on one process
  736. return Parallel(n_jobs=2)(delayed(sleep_and_return_pid)() for _ in range(2))
  737. class MyBackend(joblib._parallel_backends.LokyBackend):
  738. """Backend to test backward compatibility with older backends"""
  739. def get_nested_backend(
  740. self,
  741. ):
  742. # Older backends only return a backend, without n_jobs indications.
  743. return super(MyBackend, self).get_nested_backend()[0]
# Make the legacy-style backend selectable by name; the name is used in the
# "backend" parametrization of test_nested_backend_context_manager.
register_parallel_backend("back_compat_backend", MyBackend)
  745. @with_multiprocessing
  746. @parametrize("backend", ["threading", "loky", "multiprocessing", "back_compat_backend"])
  747. @parametrize("context", [parallel_config, parallel_backend])
  748. def test_nested_backend_context_manager(context, backend):
  749. # Check that by default, nested parallel calls will always use the
  750. # ThreadingBackend
  751. with context(backend):
  752. pid_groups = Parallel(n_jobs=2)(delayed(get_nested_pids)() for _ in range(10))
  753. for pid_group in pid_groups:
  754. assert len(set(pid_group)) == 1
  755. @with_multiprocessing
  756. @parametrize("n_jobs", [2, -1, None])
  757. @parametrize("backend", PARALLEL_BACKENDS)
  758. @parametrize("context", [parallel_config, parallel_backend])
  759. def test_nested_backend_in_sequential(backend, n_jobs, context):
  760. # Check that by default, nested parallel calls will always use the
  761. # ThreadingBackend
  762. def check_nested_backend(expected_backend_type, expected_n_job):
  763. # Assert that the sequential backend at top level, does not change the
  764. # backend for nested calls.
  765. assert _active_backend_type() == BACKENDS[expected_backend_type]
  766. # Assert that the nested backend in SequentialBackend does not change
  767. # the default number of jobs used in Parallel
  768. expected_n_job = effective_n_jobs(expected_n_job)
  769. assert Parallel()._effective_n_jobs() == expected_n_job
  770. Parallel(n_jobs=1)(
  771. delayed(check_nested_backend)(parallel.DEFAULT_BACKEND, 1) for _ in range(10)
  772. )
  773. with context(backend, n_jobs=n_jobs):
  774. Parallel(n_jobs=1)(
  775. delayed(check_nested_backend)(backend, n_jobs) for _ in range(10)
  776. )
  777. def check_nesting_level(context, inner_backend, expected_level):
  778. with context(inner_backend) as ctx:
  779. if context is parallel_config:
  780. backend = ctx["backend"]
  781. if context is parallel_backend:
  782. backend = ctx[0]
  783. assert backend.nesting_level == expected_level
  784. @with_multiprocessing
  785. @parametrize("outer_backend", PARALLEL_BACKENDS)
  786. @parametrize("inner_backend", PARALLEL_BACKENDS)
  787. @parametrize("context", [parallel_config, parallel_backend])
  788. def test_backend_nesting_level(context, outer_backend, inner_backend):
  789. # Check that the nesting level for the backend is correctly set
  790. check_nesting_level(context, outer_backend, 0)
  791. Parallel(n_jobs=2, backend=outer_backend)(
  792. delayed(check_nesting_level)(context, inner_backend, 1) for _ in range(10)
  793. )
  794. with context(inner_backend, n_jobs=2):
  795. Parallel()(
  796. delayed(check_nesting_level)(context, inner_backend, 1) for _ in range(10)
  797. )
  798. @with_multiprocessing
  799. @parametrize("context", [parallel_config, parallel_backend])
  800. @parametrize("with_retrieve_callback", [True, False])
  801. def test_retrieval_context(context, with_retrieve_callback):
  802. import contextlib
  803. class MyBackend(ThreadingBackend):
  804. i = 0
  805. supports_retrieve_callback = with_retrieve_callback
  806. @contextlib.contextmanager
  807. def retrieval_context(self):
  808. self.i += 1
  809. yield
  810. register_parallel_backend("retrieval", MyBackend)
  811. def nested_call(n):
  812. return Parallel(n_jobs=2)(delayed(id)(i) for i in range(n))
  813. with context("retrieval") as ctx:
  814. Parallel(n_jobs=2)(delayed(nested_call)(i) for i in range(5))
  815. if context is parallel_config:
  816. assert ctx["backend"].i == 1
  817. if context is parallel_backend:
  818. assert ctx[0].i == 1
  819. ###############################################################################
  820. # Test helpers
@parametrize("batch_size", [0, -1, 1.42])
def test_invalid_batch_size(batch_size):
    """Constructing ``Parallel`` with each parametrized batch_size must raise ValueError."""
    with raises(ValueError):
        Parallel(batch_size=batch_size)
  825. @parametrize(
  826. "n_tasks, n_jobs, pre_dispatch, batch_size",
  827. [
  828. (2, 2, "all", "auto"),
  829. (2, 2, "n_jobs", "auto"),
  830. (10, 2, "n_jobs", "auto"),
  831. (517, 2, "n_jobs", "auto"),
  832. (10, 2, "n_jobs", "auto"),
  833. (10, 4, "n_jobs", "auto"),
  834. (200, 12, "n_jobs", "auto"),
  835. (25, 12, "2 * n_jobs", 1),
  836. (250, 12, "all", 1),
  837. (250, 12, "2 * n_jobs", 7),
  838. (200, 12, "2 * n_jobs", "auto"),
  839. ],
  840. )
  841. def test_dispatch_race_condition(n_tasks, n_jobs, pre_dispatch, batch_size):
  842. # Check that using (async-)dispatch does not yield a race condition on the
  843. # iterable generator that is not thread-safe natively.
  844. # This is a non-regression test for the "Pool seems closed" class of error
  845. params = {"n_jobs": n_jobs, "pre_dispatch": pre_dispatch, "batch_size": batch_size}
  846. expected = [square(i) for i in range(n_tasks)]
  847. results = Parallel(**params)(delayed(square)(i) for i in range(n_tasks))
  848. assert results == expected
  849. @with_multiprocessing
  850. def test_default_mp_context():
  851. mp_start_method = mp.get_start_method()
  852. p = Parallel(n_jobs=2, backend="multiprocessing")
  853. context = p._backend_kwargs.get("context")
  854. start_method = context.get_start_method()
  855. assert start_method == mp_start_method
  856. @with_numpy
  857. @with_multiprocessing
  858. @parametrize("backend", PROCESS_BACKENDS)
  859. def test_no_blas_crash_or_freeze_with_subprocesses(backend):
  860. if backend == "multiprocessing":
  861. # Use the spawn backend that is both robust and available on all
  862. # platforms
  863. backend = mp.get_context("spawn")
  864. # Check that on recent Python version, the 'spawn' start method can make
  865. # it possible to use multiprocessing in conjunction of any BLAS
  866. # implementation that happens to be used by numpy with causing a freeze or
  867. # a crash
  868. rng = np.random.RandomState(42)
  869. # call BLAS DGEMM to force the initialization of the internal thread-pool
  870. # in the main process
  871. a = rng.randn(1000, 1000)
  872. np.dot(a, a.T)
  873. # check that the internal BLAS thread-pool is not in an inconsistent state
  874. # in the worker processes managed by multiprocessing
  875. Parallel(n_jobs=2, backend=backend)(delayed(np.dot)(a, a.T) for i in range(2))
# Script template without an `if __name__ == "__main__":` guard. The single
# `{}` placeholder receives the backend name through str.format; the script
# prints the squares of 0..4 computed with a 2-job Parallel.
UNPICKLABLE_CALLABLE_SCRIPT_TEMPLATE_NO_MAIN = """\
from joblib import Parallel, delayed
def square(x):
    return x ** 2
backend = "{}"
if backend == "spawn":
    from multiprocessing import get_context
    backend = get_context(backend)
print(Parallel(n_jobs=2, backend=backend)(
      delayed(square)(i) for i in range(5)))
"""
  887. @with_multiprocessing
  888. @parametrize("backend", PROCESS_BACKENDS)
  889. def test_parallel_with_interactively_defined_functions(backend):
  890. # When using the "-c" flag, interactive functions defined in __main__
  891. # should work with any backend.
  892. if backend == "multiprocessing" and mp.get_start_method() != "fork":
  893. pytest.skip(
  894. "Require fork start method to use interactively defined "
  895. "functions with multiprocessing."
  896. )
  897. code = UNPICKLABLE_CALLABLE_SCRIPT_TEMPLATE_NO_MAIN.format(backend)
  898. check_subprocess_call(
  899. [sys.executable, "-c", code], timeout=10, stdout_regex=r"\[0, 1, 4, 9, 16\]"
  900. )
# Script template guarded by `if __name__ == "__main__":`. Placeholders filled
# through str.format: {joblib_root_folder}, {define_func} (source code that
# defines `square`), {backend} and {callable_position}, which selects whether
# `square` is passed to `delayed`, as a positional argument or as a keyword
# argument.
UNPICKLABLE_CALLABLE_SCRIPT_TEMPLATE_MAIN = """\
import sys
# Make sure that joblib is importable in the subprocess launching this
# script. This is needed in case we run the tests from the joblib root
# folder without having installed joblib
sys.path.insert(0, {joblib_root_folder!r})
from joblib import Parallel, delayed
def run(f, x):
    return f(x)
{define_func}
if __name__ == "__main__":
    backend = "{backend}"
    if backend == "spawn":
        from multiprocessing import get_context
        backend = get_context(backend)
    callable_position = "{callable_position}"
    if callable_position == "delayed":
        print(Parallel(n_jobs=2, backend=backend)(
                delayed(square)(i) for i in range(5)))
    elif callable_position == "args":
        print(Parallel(n_jobs=2, backend=backend)(
                delayed(run)(square, i) for i in range(5)))
    else:
        print(Parallel(n_jobs=2, backend=backend)(
                delayed(run)(f=square, x=i) for i in range(5)))
"""
# Three alternative source snippets defining `square`; each one is substituted
# for the {define_func} placeholder of the script template above.

# `square` as a plain module-level function.
SQUARE_MAIN = """\
def square(x):
    return x ** 2
"""
# `square` as a function defined inside another function.
SQUARE_LOCAL = """\
def gen_square():
    def square(x):
        return x ** 2
    return square
square = gen_square()
"""
# `square` as a lambda.
SQUARE_LAMBDA = """\
square = lambda x: x ** 2
"""
  941. @with_multiprocessing
  942. @parametrize("backend", PROCESS_BACKENDS + ([] if mp is None else ["spawn"]))
  943. @parametrize("define_func", [SQUARE_MAIN, SQUARE_LOCAL, SQUARE_LAMBDA])
  944. @parametrize("callable_position", ["delayed", "args", "kwargs"])
  945. def test_parallel_with_unpicklable_functions_in_args(
  946. backend, define_func, callable_position, tmpdir
  947. ):
  948. if backend in ["multiprocessing", "spawn"] and (
  949. define_func != SQUARE_MAIN or sys.platform == "win32"
  950. ):
  951. pytest.skip("Not picklable with pickle")
  952. code = UNPICKLABLE_CALLABLE_SCRIPT_TEMPLATE_MAIN.format(
  953. define_func=define_func,
  954. backend=backend,
  955. callable_position=callable_position,
  956. joblib_root_folder=os.path.dirname(os.path.dirname(joblib.__file__)),
  957. )
  958. code_file = tmpdir.join("unpicklable_func_script.py")
  959. code_file.write(code)
  960. check_subprocess_call(
  961. [sys.executable, code_file.strpath],
  962. timeout=10,
  963. stdout_regex=r"\[0, 1, 4, 9, 16\]",
  964. )
# Script source (already formatted with the joblib root folder) in which a
# class, a function and a functools.partial are all defined in __main__ and
# then used with the "loky" backend, without any __main__ guard.
INTERACTIVE_DEFINED_FUNCTION_AND_CLASS_SCRIPT_CONTENT = """\
import sys
import faulthandler
# Make sure that joblib is importable in the subprocess launching this
# script. This is needed in case we run the tests from the joblib root
# folder without having installed joblib
sys.path.insert(0, {joblib_root_folder!r})
from joblib import Parallel, delayed
from functools import partial
class MyClass:
    '''Class defined in the __main__ namespace'''
    def __init__(self, value):
        self.value = value
def square(x, ignored=None, ignored2=None):
    '''Function defined in the __main__ namespace'''
    return x.value ** 2
square2 = partial(square, ignored2='something')
# Here, we do not need the `if __name__ == "__main__":` safeguard when
# using the default `loky` backend (even on Windows).
# To make debugging easier
faulthandler.dump_traceback_later(30, exit=True)
# The following baroque function call is meant to check that joblib
# introspection rightfully uses cloudpickle instead of the (faster) pickle
# module of the standard library when necessary. In particular cloudpickle is
# necessary for functions and instances of classes interactively defined in the
# __main__ module.
print(Parallel(backend="loky", n_jobs=2)(
    delayed(square2)(MyClass(i), ignored=[dict(a=MyClass(1))])
    for i in range(5)
))
""".format(joblib_root_folder=os.path.dirname(os.path.dirname(joblib.__file__)))
  996. @with_multiprocessing
  997. def test_parallel_with_interactively_defined_functions_loky(tmpdir):
  998. # loky accepts interactive functions defined in __main__ and does not
  999. # require if __name__ == '__main__' even when the __main__ module is
  1000. # defined by the result of the execution of a filesystem script.
  1001. script = tmpdir.join("joblib_interactively_defined_function.py")
  1002. script.write(INTERACTIVE_DEFINED_FUNCTION_AND_CLASS_SCRIPT_CONTENT)
  1003. check_subprocess_call(
  1004. [sys.executable, script.strpath],
  1005. stdout_regex=r"\[0, 1, 4, 9, 16\]",
  1006. timeout=None, # rely on faulthandler to kill the process
  1007. )
# Script source (already formatted with the joblib root folder) that sends the
# bound method `append` of a list subclass defined in __main__ to "loky"
# workers, with multiprocessing logging sent to stderr.
INTERACTIVELY_DEFINED_SUBCLASS_WITH_METHOD_SCRIPT_CONTENT = """\
import sys
# Make sure that joblib is importable in the subprocess launching this
# script. This is needed in case we run the tests from the joblib root
# folder without having installed joblib
sys.path.insert(0, {joblib_root_folder!r})
from joblib import Parallel, delayed, hash
import multiprocessing as mp
mp.util.log_to_stderr(5)
class MyList(list):
    '''MyList is interactively defined by MyList.append is a built-in'''
    def __hash__(self):
        # XXX: workaround limitation in cloudpickle
        return hash(self).__hash__()
l = MyList()
print(Parallel(backend="loky", n_jobs=2)(
    delayed(l.append)(i) for i in range(3)
))
""".format(joblib_root_folder=os.path.dirname(os.path.dirname(joblib.__file__)))
  1027. @with_multiprocessing
  1028. def test_parallel_with_interactively_defined_bound_method_loky(tmpdir):
  1029. script = tmpdir.join("joblib_interactive_bound_method_script.py")
  1030. script.write(INTERACTIVELY_DEFINED_SUBCLASS_WITH_METHOD_SCRIPT_CONTENT)
  1031. check_subprocess_call(
  1032. [sys.executable, script.strpath],
  1033. stdout_regex=r"\[None, None, None\]",
  1034. stderr_regex=r"LokyProcess",
  1035. timeout=15,
  1036. )
  1037. def test_parallel_with_exhausted_iterator():
  1038. exhausted_iterator = iter([])
  1039. assert Parallel(n_jobs=2)(exhausted_iterator) == []
  1040. def check_memmap(a):
  1041. if not isinstance(a, np.memmap):
  1042. raise TypeError("Expected np.memmap instance, got %r", type(a))
  1043. return a.copy() # return a regular array instead of a memmap
  1044. @with_numpy
  1045. @with_multiprocessing
  1046. @parametrize("backend", PROCESS_BACKENDS)
  1047. def test_auto_memmap_on_arrays_from_generator(backend):
  1048. # Non-regression test for a problem with a bad interaction between the
  1049. # GC collecting arrays recently created during iteration inside the
  1050. # parallel dispatch loop and the auto-memmap feature of Parallel.
  1051. # See: https://github.com/joblib/joblib/pull/294
  1052. def generate_arrays(n):
  1053. for i in range(n):
  1054. yield np.ones(10, dtype=np.float32) * i
  1055. # Use max_nbytes=1 to force the use of memory-mapping even for small
  1056. # arrays
  1057. results = Parallel(n_jobs=2, max_nbytes=1, backend=backend)(
  1058. delayed(check_memmap)(a) for a in generate_arrays(100)
  1059. )
  1060. for result, expected in zip(results, generate_arrays(len(results))):
  1061. np.testing.assert_array_equal(expected, result)
  1062. # Second call to force loky to adapt the executor by growing the number
  1063. # of worker processes. This is a non-regression test for:
  1064. # https://github.com/joblib/joblib/issues/629.
  1065. results = Parallel(n_jobs=4, max_nbytes=1, backend=backend)(
  1066. delayed(check_memmap)(a) for a in generate_arrays(100)
  1067. )
  1068. for result, expected in zip(results, generate_arrays(len(results))):
  1069. np.testing.assert_array_equal(expected, result)
def identity(arg):
    """Return ``arg`` unchanged."""
    return arg
  1072. @with_numpy
  1073. @with_multiprocessing
  1074. def test_memmap_with_big_offset(tmpdir):
  1075. fname = tmpdir.join("test.mmap").strpath
  1076. size = mmap.ALLOCATIONGRANULARITY
  1077. obj = [np.zeros(size, dtype="uint8"), np.ones(size, dtype="uint8")]
  1078. dump(obj, fname)
  1079. memmap = load(fname, mmap_mode="r")
  1080. (result,) = Parallel(n_jobs=2)(delayed(identity)(memmap) for _ in [0])
  1081. assert isinstance(memmap[1], np.memmap)
  1082. assert memmap[1].offset > size
  1083. np.testing.assert_array_equal(obj, result)
  1084. def test_warning_about_timeout_not_supported_by_backend():
  1085. with warnings.catch_warnings(record=True) as warninfo:
  1086. Parallel(n_jobs=1, timeout=1)(delayed(square)(i) for i in range(50))
  1087. assert len(warninfo) == 1
  1088. w = warninfo[0]
  1089. assert isinstance(w.message, UserWarning)
  1090. assert str(w.message) == (
  1091. "The backend class 'SequentialBackend' does not support timeout. "
  1092. "You have set 'timeout=1' in Parallel but the 'timeout' parameter "
  1093. "will not be used."
  1094. )
def set_list_value(input_list, index, value):
    """Store ``value`` at ``input_list[index]`` in place and return ``value``."""
    input_list[index] = value
    return value
  1098. @pytest.mark.parametrize("n_jobs", [1, 2, 4])
  1099. def test_parallel_return_order_with_return_as_generator_parameter(n_jobs):
  1100. # This test inserts values in a list in some expected order
  1101. # in sequential computing, and then checks that this order has been
  1102. # respected by Parallel output generator.
  1103. input_list = [0] * 5
  1104. result = Parallel(n_jobs=n_jobs, return_as="generator", backend="threading")(
  1105. delayed(set_list_value)(input_list, i, i) for i in range(5)
  1106. )
  1107. # Ensure that all the tasks are completed before checking the result
  1108. result = list(result)
  1109. assert all(v == r for v, r in zip(input_list, result))
  1110. def _sqrt_with_delay(e, delay):
  1111. if delay:
  1112. sleep(30)
  1113. return sqrt(e)
  1114. # Use a private function so it can also be called for the dask backend in
  1115. # test_dask.py without triggering the test twice.
  1116. # We isolate the test with the dask backend to simplify optional deps
  1117. # management and leaking environment variables.
  1118. def _test_parallel_unordered_generator_returns_fastest_first(backend, n_jobs):
  1119. # This test submits 10 tasks, but the second task is super slow. This test
  1120. # checks that the 9 other tasks return before the slow task is done, when
  1121. # `return_as` parameter is set to `'generator_unordered'`
  1122. result = Parallel(n_jobs=n_jobs, return_as="generator_unordered", backend=backend)(
  1123. delayed(_sqrt_with_delay)(i**2, (i == 1)) for i in range(10)
  1124. )
  1125. quickly_returned = sorted(next(result) for _ in range(9))
  1126. expected_quickly_returned = [0] + list(range(2, 10))
  1127. assert all(v == r for v, r in zip(expected_quickly_returned, quickly_returned))
  1128. del result
@pytest.mark.parametrize("n_jobs", [2, 4])
# NB: for this test to work, the backend must be allowed to process tasks
# concurrently, so at least two jobs with a non-sequential backend are
# mandatory.
@with_multiprocessing
@parametrize("backend", set(RETURN_GENERATOR_BACKENDS) - {"sequential"})
def test_parallel_unordered_generator_returns_fastest_first(backend, n_jobs):
    """Run the shared fastest-first check for every non-sequential generator backend."""
    _test_parallel_unordered_generator_returns_fastest_first(backend, n_jobs)
  1137. @parametrize("backend", ALL_VALID_BACKENDS)
  1138. @parametrize("n_jobs", [1, 2, -2, -1])
  1139. def test_abort_backend(n_jobs, backend):
  1140. delays = ["a"] + [10] * 100
  1141. with raises(TypeError):
  1142. t_start = time.time()
  1143. Parallel(n_jobs=n_jobs, backend=backend)(delayed(time.sleep)(i) for i in delays)
  1144. dt = time.time() - t_start
  1145. assert dt < 20
  1146. def get_large_object(arg):
  1147. result = np.ones(int(5 * 1e5), dtype=bool)
  1148. result[0] = False
  1149. return result
# Use a private function so it can also be called for the dask backend in
# test_dask.py without triggering the test twice.
# We isolate the test with the dask backend to simplify optional deps
# management and leaking environment variables.
def _test_deadlock_with_generator(backend, return_as, n_jobs):
    """Consume two results of a generator-returning call, then drop it early."""
    # Non-regression test for a race condition in the backends when the pickler
    # is delayed by a large object.
    with Parallel(n_jobs=n_jobs, backend=backend, return_as=return_as) as parallel:
        result = parallel(delayed(get_large_object)(i) for i in range(10))
        # Only two of the ten results are retrieved before the generator is
        # released.
        next(result)
        next(result)
        del result
@with_numpy
@parametrize("backend", RETURN_GENERATOR_BACKENDS)
@parametrize("return_as", ["generator", "generator_unordered"])
@parametrize("n_jobs", [1, 2, -2, -1])
def test_deadlock_with_generator(backend, return_as, n_jobs):
    """Run the shared deadlock check for each backend, return mode and n_jobs."""
    _test_deadlock_with_generator(backend, return_as, n_jobs)
@parametrize("backend", RETURN_GENERATOR_BACKENDS)
@parametrize("return_as", ["generator", "generator_unordered"])
@parametrize("n_jobs", [1, 2, -2, -1])
def test_multiple_generator_call(backend, return_as, n_jobs):
    """A second call on a still-running Parallel instance must fail quickly."""
    # Non-regression test that ensures the dispatch of the tasks starts
    # immediately when Parallel.__call__ is called. This test relies on the
    # assumption that only one generator can be submitted at a time.
    with raises(RuntimeError, match="This Parallel instance is already running"):
        parallel = Parallel(n_jobs, backend=backend, return_as=return_as)
        # `g` is kept referenced while the second call is submitted.
        g = parallel(delayed(sleep)(1) for _ in range(10))  # noqa: F841
        t_start = time.time()
        gen2 = parallel(delayed(id)(i) for i in range(100))  # noqa: F841

    # Make sure that the error is raised quickly
    assert time.time() - t_start < 2, (
        "The error should be raised immediately when submitting a new task "
        "but it took more than 2s."
    )

    del g
@parametrize("backend", RETURN_GENERATOR_BACKENDS)
@parametrize("return_as", ["generator", "generator_unordered"])
@parametrize("n_jobs", [1, 2, -2, -1])
def test_multiple_generator_call_managed(backend, return_as, n_jobs):
    """Same check as above, with Parallel used as a context manager."""
    # Non-regression test that ensures the dispatch of the tasks starts
    # immediately when Parallel.__call__ is called. This test relies on the
    # assumption that only one generator can be submitted at a time.
    with Parallel(n_jobs, backend=backend, return_as=return_as) as parallel:
        # `g` is kept referenced while the second call is submitted.
        g = parallel(delayed(sleep)(10) for _ in range(10))  # noqa: F841
        t_start = time.time()
        with raises(RuntimeError, match="This Parallel instance is already running"):
            g2 = parallel(delayed(id)(i) for i in range(100))  # noqa: F841

        # Make sure that the error is raised quickly
        assert time.time() - t_start < 2, (
            "The error should be raised immediately when submitting a new task "
            "but it took more than 2s."
        )

    del g
  1204. @parametrize("backend", RETURN_GENERATOR_BACKENDS)
  1205. @parametrize("return_as_1", ["generator", "generator_unordered"])
  1206. @parametrize("return_as_2", ["generator", "generator_unordered"])
  1207. @parametrize("n_jobs", [1, 2, -2, -1])
  1208. def test_multiple_generator_call_separated(backend, return_as_1, return_as_2, n_jobs):
  1209. # Check that for separated Parallel, both tasks are correctly returned.
  1210. g = Parallel(n_jobs, backend=backend, return_as=return_as_1)(
  1211. delayed(sqrt)(i**2) for i in range(10)
  1212. )
  1213. g2 = Parallel(n_jobs, backend=backend, return_as=return_as_2)(
  1214. delayed(sqrt)(i**2) for i in range(10, 20)
  1215. )
  1216. if return_as_1 == "generator_unordered":
  1217. g = sorted(g)
  1218. if return_as_2 == "generator_unordered":
  1219. g2 = sorted(g2)
  1220. assert all(res == i for res, i in zip(g, range(10)))
  1221. assert all(res == i for res, i in zip(g2, range(10, 20)))
@parametrize(
    "backend, error",
    [
        ("loky", True),
        ("threading", False),
        ("sequential", False),
    ],
)
@parametrize("return_as_1", ["generator", "generator_unordered"])
@parametrize("return_as_2", ["generator", "generator_unordered"])
def test_multiple_generator_call_separated_gc(backend, return_as_1, return_as_2, error):
    """Dropping a running generator must abort its Parallel instance.

    ``error`` tells whether the second call is expected to raise a
    ``RuntimeError`` (parametrized as True for "loky" only).
    """
    if (backend == "loky") and (mp is None):
        pytest.skip("Requires multiprocessing")

    # Check that in loky, only one call can be run at a time with
    # a single executor.
    parallel = Parallel(2, backend=backend, return_as=return_as_1)
    g = parallel(delayed(sleep)(10) for i in range(10))
    # The finalizer is used below to observe when the first generator has
    # been collected.
    g_wr = weakref.finalize(g, lambda: print("Generator collected"))
    ctx = (
        raises(RuntimeError, match="The executor underlying Parallel")
        if error
        else nullcontext()
    )
    with ctx:
        # For loky, this call will raise an error as the gc of the previous
        # generator will shutdown the shared executor.
        # For the other backends, as the worker pools are not shared between
        # the two calls, this should proceed correctly.
        t_start = time.time()
        # Rebinding `g` drops the only reference to the first generator.
        g = Parallel(2, backend=backend, return_as=return_as_2)(
            delayed(sqrt)(i**2) for i in range(10, 20)
        )

        if return_as_2 == "generator_unordered":
            g = sorted(g)

        assert all(res == i for res, i in zip(g, range(10, 20)))

    assert time.time() - t_start < 5

    # Make sure that the computation are stopped for the gc'ed generator
    retry = 0
    while g_wr.alive and retry < 3:
        retry += 1
        time.sleep(0.5)
    assert time.time() - t_start < 5

    if parallel._effective_n_jobs() != 1:
        # check that the first parallel object is aborting (the final _aborted
        # state might be delayed).
        assert parallel._aborting
  1268. @with_numpy
  1269. @with_multiprocessing
  1270. @parametrize("backend", PROCESS_BACKENDS)
  1271. def test_memmapping_leaks(backend, tmpdir):
  1272. # Non-regression test for memmapping backends. Ensure that the data
  1273. # does not stay too long in memory
  1274. tmpdir = tmpdir.strpath
  1275. # Use max_nbytes=1 to force the use of memory-mapping even for small
  1276. # arrays
  1277. with Parallel(n_jobs=2, max_nbytes=1, backend=backend, temp_folder=tmpdir) as p:
  1278. p(delayed(check_memmap)(a) for a in [np.random.random(10)] * 2)
  1279. # The memmap folder should not be clean in the context scope
  1280. assert len(os.listdir(tmpdir)) > 0
  1281. # Make sure that the shared memory is cleaned at the end when we exit
  1282. # the context
  1283. for _ in range(100):
  1284. if not os.listdir(tmpdir):
  1285. break
  1286. sleep(0.1)
  1287. else:
  1288. raise AssertionError("temporary directory of Parallel was not removed")
  1289. # Make sure that the shared memory is cleaned at the end of a call
  1290. p = Parallel(n_jobs=2, max_nbytes=1, backend=backend)
  1291. p(delayed(check_memmap)(a) for a in [np.random.random(10)] * 2)
  1292. for _ in range(100):
  1293. if not os.listdir(tmpdir):
  1294. break
  1295. sleep(0.1)
  1296. else:
  1297. raise AssertionError("temporary directory of Parallel was not removed")
  1298. @parametrize(
  1299. "backend", ([None, "threading"] if mp is None else [None, "loky", "threading"])
  1300. )
  1301. def test_lambda_expression(backend):
  1302. # cloudpickle is used to pickle delayed callables
  1303. results = Parallel(n_jobs=2, backend=backend)(
  1304. delayed(lambda x: x**2)(i) for i in range(10)
  1305. )
  1306. assert results == [i**2 for i in range(10)]
  1307. @with_multiprocessing
  1308. @parametrize("backend", PROCESS_BACKENDS)
  1309. def test_backend_batch_statistics_reset(backend):
  1310. """Test that a parallel backend correctly resets its batch statistics."""
  1311. n_jobs = 2
  1312. n_inputs = 500
  1313. task_time = 2.0 / n_inputs
  1314. p = Parallel(verbose=10, n_jobs=n_jobs, backend=backend)
  1315. p(delayed(time.sleep)(task_time) for i in range(n_inputs))
  1316. assert p._backend._effective_batch_size == p._backend._DEFAULT_EFFECTIVE_BATCH_SIZE
  1317. assert (
  1318. p._backend._smoothed_batch_duration
  1319. == p._backend._DEFAULT_SMOOTHED_BATCH_DURATION
  1320. )
  1321. p(delayed(time.sleep)(task_time) for i in range(n_inputs))
  1322. assert p._backend._effective_batch_size == p._backend._DEFAULT_EFFECTIVE_BATCH_SIZE
  1323. assert (
  1324. p._backend._smoothed_batch_duration
  1325. == p._backend._DEFAULT_SMOOTHED_BATCH_DURATION
  1326. )
  1327. @with_multiprocessing
  1328. @parametrize("context", [parallel_config, parallel_backend])
  1329. def test_backend_hinting_and_constraints(context):
  1330. for n_jobs in [1, 2, -1]:
  1331. assert type(Parallel(n_jobs=n_jobs)._backend) is get_default_backend_instance()
  1332. p = Parallel(n_jobs=n_jobs, prefer="threads")
  1333. assert type(p._backend) is ThreadingBackend
  1334. p = Parallel(n_jobs=n_jobs, prefer="processes")
  1335. assert type(p._backend) is LokyBackend
  1336. p = Parallel(n_jobs=n_jobs, require="sharedmem")
  1337. assert type(p._backend) is ThreadingBackend
  1338. # Explicit backend selection can override backend hinting although it
  1339. # is useless to pass a hint when selecting a backend.
  1340. p = Parallel(n_jobs=2, backend="loky", prefer="threads")
  1341. assert type(p._backend) is LokyBackend
  1342. with context("loky", n_jobs=2):
  1343. # Explicit backend selection by the user with the context manager
  1344. # should be respected when combined with backend hints only.
  1345. p = Parallel(prefer="threads")
  1346. assert type(p._backend) is LokyBackend
  1347. assert p.n_jobs == 2
  1348. with context("loky", n_jobs=2):
  1349. # Locally hard-coded n_jobs value is respected.
  1350. p = Parallel(n_jobs=3, prefer="threads")
  1351. assert type(p._backend) is LokyBackend
  1352. assert p.n_jobs == 3
  1353. with context("loky", n_jobs=2):
  1354. # Explicit backend selection by the user with the context manager
  1355. # should be ignored when the Parallel call has hard constraints.
  1356. # In this case, the default backend that supports shared mem is
  1357. # used an the default number of processes is used.
  1358. p = Parallel(require="sharedmem")
  1359. assert type(p._backend) is ThreadingBackend
  1360. assert p.n_jobs == 1
  1361. with context("loky", n_jobs=2):
  1362. p = Parallel(n_jobs=3, require="sharedmem")
  1363. assert type(p._backend) is ThreadingBackend
  1364. assert p.n_jobs == 3
  1365. @parametrize("n_jobs", [1, 2])
  1366. @parametrize("prefer", [None, "processes", "threads"])
  1367. def test_backend_hinting_always_running(n_jobs, prefer):
  1368. # Check that the backend hinting never results in an error
  1369. # Non-regression test for https://github.com/joblib/joblib/issues/1720
  1370. expected_results = [i**2 for i in range(10)]
  1371. results = Parallel(n_jobs=n_jobs, prefer=prefer)(
  1372. delayed(square)(i) for i in range(10)
  1373. )
  1374. assert results == expected_results
  1375. with parallel_config(prefer=prefer, n_jobs=n_jobs):
  1376. results = Parallel()(delayed(square)(i) for i in range(10))
  1377. assert results == expected_results
  1378. @parametrize("context", [parallel_config, parallel_backend])
  1379. def test_backend_hinting_and_constraints_with_custom_backends(capsys, context):
  1380. # Custom backends can declare that they use threads and have shared memory
  1381. # semantics:
  1382. class MyCustomThreadingBackend(ParallelBackendBase):
  1383. supports_sharedmem = True
  1384. use_threads = True
  1385. def apply_async(self):
  1386. pass
  1387. def effective_n_jobs(self, n_jobs):
  1388. return n_jobs
  1389. with context(MyCustomThreadingBackend()):
  1390. p = Parallel(n_jobs=2, prefer="processes") # ignored
  1391. assert type(p._backend) is MyCustomThreadingBackend
  1392. p = Parallel(n_jobs=2, require="sharedmem")
  1393. assert type(p._backend) is MyCustomThreadingBackend
  1394. class MyCustomProcessingBackend(ParallelBackendBase):
  1395. supports_sharedmem = False
  1396. use_threads = False
  1397. def apply_async(self):
  1398. pass
  1399. def effective_n_jobs(self, n_jobs):
  1400. return n_jobs
  1401. with context(MyCustomProcessingBackend()):
  1402. p = Parallel(n_jobs=2, prefer="processes")
  1403. assert type(p._backend) is MyCustomProcessingBackend
  1404. out, err = capsys.readouterr()
  1405. assert out == ""
  1406. assert err == ""
  1407. p = Parallel(n_jobs=2, require="sharedmem", verbose=10)
  1408. assert type(p._backend) is ThreadingBackend
  1409. out, err = capsys.readouterr()
  1410. expected = (
  1411. "Using ThreadingBackend as joblib backend "
  1412. "instead of MyCustomProcessingBackend as the latter "
  1413. "does not provide shared memory semantics."
  1414. )
  1415. assert out.strip() == expected
  1416. assert err == ""
  1417. with raises(ValueError):
  1418. Parallel(backend=MyCustomProcessingBackend(), require="sharedmem")
  1419. def test_invalid_backend_hinting_and_constraints():
  1420. with raises(ValueError):
  1421. Parallel(prefer="invalid")
  1422. with raises(ValueError):
  1423. Parallel(require="invalid")
  1424. with raises(ValueError):
  1425. # It is inconsistent to prefer process-based parallelism while
  1426. # requiring shared memory semantics.
  1427. Parallel(prefer="processes", require="sharedmem")
  1428. if mp is not None:
  1429. # It is inconsistent to ask explicitly for a process-based
  1430. # parallelism while requiring shared memory semantics.
  1431. with raises(ValueError):
  1432. Parallel(backend="loky", require="sharedmem")
  1433. with raises(ValueError):
  1434. Parallel(backend="multiprocessing", require="sharedmem")
  1435. def _recursive_backend_info(limit=3, **kwargs):
  1436. """Perform nested parallel calls and introspect the backend on the way"""
  1437. with Parallel(n_jobs=2) as p:
  1438. this_level = [(type(p._backend).__name__, p._backend.nesting_level)]
  1439. if limit == 0:
  1440. return this_level
  1441. results = p(
  1442. delayed(_recursive_backend_info)(limit=limit - 1, **kwargs)
  1443. for i in range(1)
  1444. )
  1445. return this_level + results[0]
  1446. @with_multiprocessing
  1447. @parametrize("backend", ["loky", "threading"])
  1448. @parametrize("context", [parallel_config, parallel_backend])
  1449. def test_nested_parallelism_limit(context, backend):
  1450. with context(backend, n_jobs=2):
  1451. backend_types_and_levels = _recursive_backend_info()
  1452. top_level_backend_type = backend.title() + "Backend"
  1453. expected_types_and_levels = [
  1454. (top_level_backend_type, 0),
  1455. ("ThreadingBackend", 1),
  1456. ("SequentialBackend", 2),
  1457. ("SequentialBackend", 2),
  1458. ]
  1459. assert backend_types_and_levels == expected_types_and_levels
  1460. def _recursive_parallel(nesting_limit=None):
  1461. """A horrible function that does recursive parallel calls"""
  1462. return Parallel()(delayed(_recursive_parallel)() for i in range(2))
  1463. @pytest.mark.no_cover
  1464. @parametrize("context", [parallel_config, parallel_backend])
  1465. @parametrize("backend", (["threading"] if mp is None else ["loky", "threading"]))
  1466. def test_thread_bomb_mitigation(context, backend):
  1467. # Test that recursive parallelism raises a recursion rather than
  1468. # saturating the operating system resources by creating a unbounded number
  1469. # of threads.
  1470. with context(backend, n_jobs=2):
  1471. with raises(BaseException) as excinfo:
  1472. _recursive_parallel()
  1473. exc = excinfo.value
  1474. if backend == "loky":
  1475. # Local import because loky may not be importable for lack of
  1476. # multiprocessing
  1477. from joblib.externals.loky.process_executor import TerminatedWorkerError # noqa
  1478. if isinstance(exc, (TerminatedWorkerError, PicklingError)):
  1479. # The recursion exception can itself cause an error when
  1480. # pickling it to be send back to the parent process. In this
  1481. # case the worker crashes but the original traceback is still
  1482. # printed on stderr. This could be improved but does not seem
  1483. # simple to do and this is not critical for users (as long
  1484. # as there is no process or thread bomb happening).
  1485. pytest.xfail("Loky worker crash when serializing RecursionError")
  1486. assert isinstance(exc, RecursionError)
  1487. def _run_parallel_sum():
  1488. env_vars = {}
  1489. for var in [
  1490. "OMP_NUM_THREADS",
  1491. "OPENBLAS_NUM_THREADS",
  1492. "MKL_NUM_THREADS",
  1493. "VECLIB_MAXIMUM_THREADS",
  1494. "NUMEXPR_NUM_THREADS",
  1495. "NUMBA_NUM_THREADS",
  1496. "ENABLE_IPC",
  1497. ]:
  1498. env_vars[var] = os.environ.get(var)
  1499. return env_vars, parallel_sum(100)
  1500. @parametrize("backend", ([None, "loky"] if mp is not None else [None]))
  1501. @skipif(parallel_sum is None, reason="Need OpenMP helper compiled")
  1502. def test_parallel_thread_limit(backend):
  1503. results = Parallel(n_jobs=2, backend=backend)(
  1504. delayed(_run_parallel_sum)() for _ in range(2)
  1505. )
  1506. expected_num_threads = max(cpu_count() // 2, 1)
  1507. for worker_env_vars, omp_num_threads in results:
  1508. assert omp_num_threads == expected_num_threads
  1509. for name, value in worker_env_vars.items():
  1510. if name.endswith("_THREADS"):
  1511. assert value == str(expected_num_threads)
  1512. else:
  1513. assert name == "ENABLE_IPC"
  1514. assert value == "1"
  1515. @parametrize("context", [parallel_config, parallel_backend])
  1516. @skipif(distributed is not None, reason="This test requires dask")
  1517. def test_dask_backend_when_dask_not_installed(context):
  1518. with raises(ValueError, match="Please install dask"):
  1519. context("dask")
  1520. @parametrize("context", [parallel_config, parallel_backend])
  1521. def test_zero_worker_backend(context):
  1522. # joblib.Parallel should reject with an explicit error message parallel
  1523. # backends that have no worker.
  1524. class ZeroWorkerBackend(ThreadingBackend):
  1525. def configure(self, *args, **kwargs):
  1526. return 0
  1527. def apply_async(self, func, callback=None): # pragma: no cover
  1528. raise TimeoutError("No worker available")
  1529. def effective_n_jobs(self, n_jobs): # pragma: no cover
  1530. return 0
  1531. expected_msg = "ZeroWorkerBackend has no active worker"
  1532. with context(ZeroWorkerBackend()):
  1533. with pytest.raises(RuntimeError, match=expected_msg):
  1534. Parallel(n_jobs=2)(delayed(id)(i) for i in range(2))
  1535. def test_globals_update_at_each_parallel_call():
  1536. # This is a non-regression test related to joblib issues #836 and #833.
  1537. # Cloudpickle versions between 0.5.4 and 0.7 introduced a bug where global
  1538. # variables changes in a parent process between two calls to
  1539. # joblib.Parallel would not be propagated into the workers.
  1540. global MY_GLOBAL_VARIABLE
  1541. MY_GLOBAL_VARIABLE = "original value"
  1542. def check_globals():
  1543. global MY_GLOBAL_VARIABLE
  1544. return MY_GLOBAL_VARIABLE
  1545. assert check_globals() == "original value"
  1546. workers_global_variable = Parallel(n_jobs=2)(
  1547. delayed(check_globals)() for i in range(2)
  1548. )
  1549. assert set(workers_global_variable) == {"original value"}
  1550. # Change the value of MY_GLOBAL_VARIABLE, and make sure this change gets
  1551. # propagated into the workers environment
  1552. MY_GLOBAL_VARIABLE = "changed value"
  1553. assert check_globals() == "changed value"
  1554. workers_global_variable = Parallel(n_jobs=2)(
  1555. delayed(check_globals)() for i in range(2)
  1556. )
  1557. assert set(workers_global_variable) == {"changed value"}
  1558. ##############################################################################
  1559. # Test environment variable in child env, in particular for limiting
  1560. # the maximal number of threads in C-library threadpools.
  1561. #
  1562. def _check_numpy_threadpool_limits():
  1563. import numpy as np
  1564. # Let's call BLAS on a Matrix Matrix multiplication with dimensions large
  1565. # enough to ensure that the threadpool managed by the underlying BLAS
  1566. # implementation is actually used so as to force its initialization.
  1567. a = np.random.randn(100, 100)
  1568. np.dot(a, a)
  1569. threadpoolctl = pytest.importorskip("threadpoolctl")
  1570. return threadpoolctl.threadpool_info()
  1571. def _parent_max_num_threads_for(child_module, parent_info):
  1572. for parent_module in parent_info:
  1573. if parent_module["filepath"] == child_module["filepath"]:
  1574. return parent_module["num_threads"]
  1575. raise ValueError(
  1576. "An unexpected module was loaded in child:\n{}".format(child_module)
  1577. )
  1578. def check_child_num_threads(workers_info, parent_info, num_threads):
  1579. # Check that the number of threads reported in workers_info is consistent
  1580. # with the expectation. We need to be careful to handle the cases where
  1581. # the requested number of threads is below max_num_thread for the library.
  1582. for child_threadpool_info in workers_info:
  1583. for child_module in child_threadpool_info:
  1584. parent_max_num_threads = _parent_max_num_threads_for(
  1585. child_module, parent_info
  1586. )
  1587. expected = {min(num_threads, parent_max_num_threads), num_threads}
  1588. assert child_module["num_threads"] in expected
  1589. @with_numpy
  1590. @with_multiprocessing
  1591. @parametrize("n_jobs", [2, 4, -2, -1])
  1592. def test_threadpool_limitation_in_child_loky(n_jobs):
  1593. # Check that the protection against oversubscription in workers is working
  1594. # using threadpoolctl functionalities.
  1595. # Skip this test if numpy is not linked to a BLAS library
  1596. parent_info = _check_numpy_threadpool_limits()
  1597. if len(parent_info) == 0:
  1598. pytest.skip(reason="Need a version of numpy linked to BLAS")
  1599. workers_threadpool_infos = Parallel(backend="loky", n_jobs=n_jobs)(
  1600. delayed(_check_numpy_threadpool_limits)() for i in range(2)
  1601. )
  1602. n_jobs = effective_n_jobs(n_jobs)
  1603. if n_jobs == 1:
  1604. expected_child_num_threads = parent_info[0]["num_threads"]
  1605. else:
  1606. expected_child_num_threads = max(cpu_count() // n_jobs, 1)
  1607. check_child_num_threads(
  1608. workers_threadpool_infos, parent_info, expected_child_num_threads
  1609. )
  1610. @with_numpy
  1611. @with_multiprocessing
  1612. @parametrize("inner_max_num_threads", [1, 2, 4, None])
  1613. @parametrize("n_jobs", [2, -1])
  1614. @parametrize("context", [parallel_config, parallel_backend])
  1615. def test_threadpool_limitation_in_child_context(context, n_jobs, inner_max_num_threads):
  1616. # Check that the protection against oversubscription in workers is working
  1617. # using threadpoolctl functionalities.
  1618. # Skip this test if numpy is not linked to a BLAS library
  1619. parent_info = _check_numpy_threadpool_limits()
  1620. if len(parent_info) == 0:
  1621. pytest.skip(reason="Need a version of numpy linked to BLAS")
  1622. with context("loky", inner_max_num_threads=inner_max_num_threads):
  1623. workers_threadpool_infos = Parallel(n_jobs=n_jobs)(
  1624. delayed(_check_numpy_threadpool_limits)() for i in range(2)
  1625. )
  1626. n_jobs = effective_n_jobs(n_jobs)
  1627. if n_jobs == 1:
  1628. expected_child_num_threads = parent_info[0]["num_threads"]
  1629. elif inner_max_num_threads is None:
  1630. expected_child_num_threads = max(cpu_count() // n_jobs, 1)
  1631. else:
  1632. expected_child_num_threads = inner_max_num_threads
  1633. check_child_num_threads(
  1634. workers_threadpool_infos, parent_info, expected_child_num_threads
  1635. )
  1636. @with_multiprocessing
  1637. @parametrize("n_jobs", [2, -1])
  1638. @parametrize("var_name", ["OPENBLAS_NUM_THREADS", "MKL_NUM_THREADS", "OMP_NUM_THREADS"])
  1639. @parametrize("context", [parallel_config, parallel_backend])
  1640. def test_threadpool_limitation_in_child_override(context, n_jobs, var_name):
  1641. # Check that environment variables set by the user on the main process
  1642. # always have the priority.
  1643. # Skip this test if the process is run sequetially
  1644. if effective_n_jobs(n_jobs) == 1:
  1645. pytest.skip("Skip test when n_jobs == 1")
  1646. # Clean up the existing executor because we change the environment of the
  1647. # parent at runtime and it is not detected in loky intentionally.
  1648. get_reusable_executor(reuse=True).shutdown()
  1649. def _get_env(var_name):
  1650. return os.environ.get(var_name)
  1651. original_var_value = os.environ.get(var_name)
  1652. try:
  1653. os.environ[var_name] = "4"
  1654. # Skip this test if numpy is not linked to a BLAS library
  1655. results = Parallel(n_jobs=n_jobs)(delayed(_get_env)(var_name) for i in range(2))
  1656. assert results == ["4", "4"]
  1657. with context("loky", inner_max_num_threads=1):
  1658. results = Parallel(n_jobs=n_jobs)(
  1659. delayed(_get_env)(var_name) for i in range(2)
  1660. )
  1661. assert results == ["1", "1"]
  1662. finally:
  1663. if original_var_value is None:
  1664. del os.environ[var_name]
  1665. else:
  1666. os.environ[var_name] = original_var_value
  1667. @with_multiprocessing
  1668. @parametrize("n_jobs", [2, 4, -1])
  1669. def test_loky_reuse_workers(n_jobs):
  1670. # Non-regression test for issue #967 where the workers are not reused when
  1671. # calling multiple Parallel loops.
  1672. def parallel_call(n_jobs):
  1673. x = range(10)
  1674. Parallel(n_jobs=n_jobs)(delayed(sum)(x) for i in range(10))
  1675. # Run a parallel loop and get the workers used for computations
  1676. parallel_call(n_jobs)
  1677. first_executor = get_reusable_executor(reuse=True)
  1678. # Ensure that the workers are reused for the next calls, as the executor is
  1679. # not restarted.
  1680. for _ in range(10):
  1681. parallel_call(n_jobs)
  1682. executor = get_reusable_executor(reuse=True)
  1683. assert executor == first_executor
  1684. def _set_initialized(status):
  1685. status[os.getpid()] = "initialized"
  1686. def _check_status(status, n_jobs, wait_workers=False):
  1687. pid = os.getpid()
  1688. state = status.get(pid, None)
  1689. assert state in ("initialized", "started"), (
  1690. f"worker should have been in initialized state, got {state}"
  1691. )
  1692. if not wait_workers:
  1693. return
  1694. status[pid] = "started"
  1695. # wait up to 30 seconds for the workers to be initialized
  1696. deadline = time.time() + 30
  1697. n_started = len([pid for pid, v in status.items() if v == "started"])
  1698. while time.time() < deadline and n_started < n_jobs:
  1699. time.sleep(0.1)
  1700. n_started = len([pid for pid, v in status.items() if v == "started"])
  1701. if time.time() >= deadline:
  1702. raise TimeoutError("Waited more than 30s to start all the workers")
  1703. return pid
  1704. @with_multiprocessing
  1705. @parametrize("n_jobs", [2, 4])
  1706. @parametrize("backend", PROCESS_BACKENDS)
  1707. @parametrize("context", [parallel_config, parallel_backend])
  1708. def test_initializer_context(n_jobs, backend, context):
  1709. manager = mp.Manager()
  1710. status = manager.dict()
  1711. # pass the initializer to the backend context
  1712. with context(
  1713. backend=backend,
  1714. n_jobs=n_jobs,
  1715. initializer=_set_initialized,
  1716. initargs=(status,),
  1717. ):
  1718. # check_status checks that the initializer is correctly call
  1719. Parallel()(delayed(_check_status)(status, n_jobs) for i in range(100))
  1720. @with_multiprocessing
  1721. @parametrize("n_jobs", [2, 4])
  1722. @parametrize("backend", PROCESS_BACKENDS)
  1723. def test_initializer_parallel(n_jobs, backend):
  1724. manager = mp.Manager()
  1725. status = manager.dict()
  1726. # pass the initializer directly to the Parallel call
  1727. # check_status checks that the initializer is called in all tasks
  1728. Parallel(
  1729. backend=backend,
  1730. n_jobs=n_jobs,
  1731. initializer=_set_initialized,
  1732. initargs=(status,),
  1733. )(delayed(_check_status)(status, n_jobs) for i in range(100))
  1734. @with_multiprocessing
  1735. @pytest.mark.parametrize("n_jobs", [2, 4])
  1736. def test_initializer_reused(n_jobs):
  1737. # Check that it is possible to pass initializer config via the `Parallel`
  1738. # call directly and the worker are reused when the arguments are the same.
  1739. n_repetitions = 3
  1740. manager = mp.Manager()
  1741. status = manager.dict()
  1742. pids = set()
  1743. for i in range(n_repetitions):
  1744. results = Parallel(
  1745. backend="loky",
  1746. n_jobs=n_jobs,
  1747. initializer=_set_initialized,
  1748. initargs=(status,),
  1749. )(
  1750. delayed(_check_status)(status, n_jobs, wait_workers=True)
  1751. for i in range(n_jobs)
  1752. )
  1753. pids = pids.union(set(results))
  1754. assert len(pids) == n_jobs, (
  1755. "The workers should be reused when the initializer is the same"
  1756. )
  1757. @with_multiprocessing
  1758. @pytest.mark.parametrize("n_jobs", [2, 4])
  1759. def test_initializer_not_reused(n_jobs):
  1760. # Check that when changing the initializer arguments, each parallel call uses its
  1761. # own initializer args, independently of the previous calls, hence the loky workers
  1762. # are not reused.
  1763. n_repetitions = 3
  1764. manager = mp.Manager()
  1765. pids = set()
  1766. for i in range(n_repetitions):
  1767. status = manager.dict()
  1768. results = Parallel(
  1769. backend="loky",
  1770. n_jobs=n_jobs,
  1771. initializer=_set_initialized,
  1772. initargs=(status,),
  1773. )(
  1774. delayed(_check_status)(status, n_jobs, wait_workers=True)
  1775. for i in range(n_jobs)
  1776. )
  1777. pids = pids.union(set(results))
  1778. assert len(pids) == n_repetitions * n_jobs, (
  1779. "The workers should not be reused when the initializer arguments change"
  1780. )