test_memmapping.py 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280
  1. import faulthandler
  2. import gc
  3. import itertools
  4. import mmap
  5. import os
  6. import pickle
  7. import platform
  8. import subprocess
  9. import sys
  10. import threading
  11. from time import sleep
  12. import pytest
  13. import joblib._memmapping_reducer as jmr
  14. from joblib._memmapping_reducer import (
  15. ArrayMemmapForwardReducer,
  16. _get_backing_memmap,
  17. _get_temp_dir,
  18. _strided_from_memmap,
  19. _WeakArrayKeyMap,
  20. has_shareable_memory,
  21. )
  22. from joblib.backports import make_memmap
  23. from joblib.executor import _TestingMemmappingExecutor as TestExecutor
  24. from joblib.parallel import Parallel, delayed
  25. from joblib.pool import MemmappingPool
  26. from joblib.test.common import (
  27. IS_GIL_DISABLED,
  28. np,
  29. with_dev_shm,
  30. with_multiprocessing,
  31. with_numpy,
  32. )
  33. from joblib.testing import parametrize, raises, skipif
  34. def setup_module():
  35. faulthandler.dump_traceback_later(timeout=300, exit=True)
  36. def teardown_module():
  37. faulthandler.cancel_dump_traceback_later()
  38. def check_memmap_and_send_back(array):
  39. assert _get_backing_memmap(array) is not None
  40. return array
  41. def check_array(args):
  42. """Dummy helper function to be executed in subprocesses
  43. Check that the provided array has the expected values in the provided
  44. range.
  45. """
  46. data, position, expected = args
  47. np.testing.assert_array_equal(data[position], expected)
  48. def inplace_double(args):
  49. """Dummy helper function to be executed in subprocesses
  50. Check that the input array has the right values in the provided range
  51. and perform an inplace modification to double the values in the range by
  52. two.
  53. """
  54. data, position, expected = args
  55. assert data[position] == expected
  56. data[position] *= 2
  57. np.testing.assert_array_equal(data[position], 2 * expected)
  58. @with_numpy
  59. @with_multiprocessing
  60. def test_memmap_based_array_reducing(tmpdir):
  61. """Check that it is possible to reduce a memmap backed array"""
  62. assert_array_equal = np.testing.assert_array_equal
  63. filename = tmpdir.join("test.mmap").strpath
  64. # Create a file larger than what will be used by a
  65. buffer = np.memmap(filename, dtype=np.float64, shape=500, mode="w+")
  66. # Fill the original buffer with negative markers to detect over of
  67. # underflow in case of test failures
  68. buffer[:] = -1.0 * np.arange(buffer.shape[0], dtype=buffer.dtype)
  69. buffer.flush()
  70. # Memmap a 2D fortran array on a offsetted subsection of the previous
  71. # buffer
  72. a = np.memmap(
  73. filename, dtype=np.float64, shape=(3, 5, 4), mode="r+", order="F", offset=4
  74. )
  75. a[:] = np.arange(60).reshape(a.shape)
  76. # Build various views that share the buffer with the original memmap
  77. # b is an memmap sliced view on an memmap instance
  78. b = a[1:-1, 2:-1, 2:4]
  79. # b2 is a memmap 2d with memmap 1d as base
  80. # non-regression test for https://github.com/joblib/joblib/issues/1703
  81. b2 = buffer.reshape(10, 50)
  82. # c and d are array views
  83. c = np.asarray(b)
  84. d = c.T
  85. # Array reducer with auto dumping disabled
  86. reducer = ArrayMemmapForwardReducer(None, tmpdir.strpath, "c", True)
  87. def reconstruct_array_or_memmap(x):
  88. cons, args = reducer(x)
  89. return cons(*args)
  90. # Reconstruct original memmap
  91. a_reconstructed = reconstruct_array_or_memmap(a)
  92. assert has_shareable_memory(a_reconstructed)
  93. assert isinstance(a_reconstructed, np.memmap)
  94. assert_array_equal(a_reconstructed, a)
  95. # Reconstruct strided memmap view
  96. b_reconstructed = reconstruct_array_or_memmap(b)
  97. assert has_shareable_memory(b_reconstructed)
  98. assert_array_equal(b_reconstructed, b)
  99. # Reconstruct memmap 2d with memmap 1d as base
  100. b2_reconstructed = reconstruct_array_or_memmap(b2)
  101. assert has_shareable_memory(b2_reconstructed)
  102. assert_array_equal(b2_reconstructed, b2)
  103. # Reconstruct arrays views on memmap base
  104. c_reconstructed = reconstruct_array_or_memmap(c)
  105. assert not isinstance(c_reconstructed, np.memmap)
  106. assert has_shareable_memory(c_reconstructed)
  107. assert_array_equal(c_reconstructed, c)
  108. d_reconstructed = reconstruct_array_or_memmap(d)
  109. assert not isinstance(d_reconstructed, np.memmap)
  110. assert has_shareable_memory(d_reconstructed)
  111. assert_array_equal(d_reconstructed, d)
  112. # Test graceful degradation on fake memmap instances with in-memory
  113. # buffers
  114. a3 = a * 3
  115. assert not has_shareable_memory(a3)
  116. a3_reconstructed = reconstruct_array_or_memmap(a3)
  117. assert not has_shareable_memory(a3_reconstructed)
  118. assert not isinstance(a3_reconstructed, np.memmap)
  119. assert_array_equal(a3_reconstructed, a * 3)
  120. # Test graceful degradation on arrays derived from fake memmap instances
  121. b3 = np.asarray(a3)
  122. assert not has_shareable_memory(b3)
  123. b3_reconstructed = reconstruct_array_or_memmap(b3)
  124. assert isinstance(b3_reconstructed, np.ndarray)
  125. assert not has_shareable_memory(b3_reconstructed)
  126. assert_array_equal(b3_reconstructed, b3)
  127. @with_numpy
  128. @with_multiprocessing
  129. @skipif(
  130. sys.platform != "win32", reason="PermissionError only easily triggerable on Windows"
  131. )
  132. def test_resource_tracker_retries_when_permissionerror(tmpdir):
  133. # Test resource_tracker retry mechanism when unlinking memmaps. See more
  134. # thorough information in the ``unlink_file`` documentation of joblib.
  135. filename = tmpdir.join("test.mmap").strpath
  136. cmd = """if 1:
  137. import os
  138. import numpy as np
  139. import time
  140. from joblib.externals.loky.backend import resource_tracker
  141. resource_tracker.VERBOSE = 1
  142. # Start the resource tracker
  143. resource_tracker.ensure_running()
  144. time.sleep(1)
  145. # Create a file containing numpy data
  146. memmap = np.memmap(r"{filename}", dtype=np.float64, shape=10, mode='w+')
  147. memmap[:] = np.arange(10).astype(np.int8).data
  148. memmap.flush()
  149. assert os.path.exists(r"{filename}")
  150. del memmap
  151. # Create a np.memmap backed by this file
  152. memmap = np.memmap(r"{filename}", dtype=np.float64, shape=10, mode='w+')
  153. resource_tracker.register(r"{filename}", "file")
  154. # Ask the resource_tracker to delete the file backing the np.memmap , this
  155. # should raise PermissionError that the resource_tracker will log.
  156. resource_tracker.maybe_unlink(r"{filename}", "file")
  157. # Wait for the resource_tracker to process the maybe_unlink before cleaning
  158. # up the memmap
  159. time.sleep(2)
  160. """.format(filename=filename)
  161. p = subprocess.Popen(
  162. [sys.executable, "-c", cmd], stderr=subprocess.PIPE, stdout=subprocess.PIPE
  163. )
  164. p.wait()
  165. out, err = p.communicate()
  166. assert p.returncode == 0, err.decode()
  167. assert out == b""
  168. msg = "tried to unlink {}, got PermissionError".format(filename)
  169. assert msg in err.decode()
  170. @with_numpy
  171. @with_multiprocessing
  172. def test_high_dimension_memmap_array_reducing(tmpdir):
  173. assert_array_equal = np.testing.assert_array_equal
  174. filename = tmpdir.join("test.mmap").strpath
  175. # Create a high dimensional memmap
  176. a = np.memmap(filename, dtype=np.float64, shape=(100, 15, 15, 3), mode="w+")
  177. a[:] = np.arange(100 * 15 * 15 * 3).reshape(a.shape)
  178. # Create some slices/indices at various dimensions
  179. b = a[0:10]
  180. c = a[:, 5:10]
  181. d = a[:, :, :, 0]
  182. e = a[1:3:4]
  183. # Array reducer with auto dumping disabled
  184. reducer = ArrayMemmapForwardReducer(None, tmpdir.strpath, "c", True)
  185. def reconstruct_array_or_memmap(x):
  186. cons, args = reducer(x)
  187. return cons(*args)
  188. a_reconstructed = reconstruct_array_or_memmap(a)
  189. assert has_shareable_memory(a_reconstructed)
  190. assert isinstance(a_reconstructed, np.memmap)
  191. assert_array_equal(a_reconstructed, a)
  192. b_reconstructed = reconstruct_array_or_memmap(b)
  193. assert has_shareable_memory(b_reconstructed)
  194. assert_array_equal(b_reconstructed, b)
  195. c_reconstructed = reconstruct_array_or_memmap(c)
  196. assert has_shareable_memory(c_reconstructed)
  197. assert_array_equal(c_reconstructed, c)
  198. d_reconstructed = reconstruct_array_or_memmap(d)
  199. assert has_shareable_memory(d_reconstructed)
  200. assert_array_equal(d_reconstructed, d)
  201. e_reconstructed = reconstruct_array_or_memmap(e)
  202. assert has_shareable_memory(e_reconstructed)
  203. assert_array_equal(e_reconstructed, e)
  204. @with_numpy
  205. def test__strided_from_memmap(tmpdir):
  206. fname = tmpdir.join("test.mmap").strpath
  207. size = 5 * mmap.ALLOCATIONGRANULARITY
  208. offset = mmap.ALLOCATIONGRANULARITY + 1
  209. # This line creates the mmap file that is reused later
  210. memmap_obj = np.memmap(fname, mode="w+", shape=size + offset)
  211. # filename, dtype, mode, offset, order, shape, strides, total_buffer_len
  212. memmap_obj = _strided_from_memmap(
  213. fname,
  214. dtype="uint8",
  215. mode="r",
  216. offset=offset,
  217. order="C",
  218. shape=size,
  219. strides=None,
  220. total_buffer_len=None,
  221. unlink_on_gc_collect=False,
  222. )
  223. assert isinstance(memmap_obj, np.memmap)
  224. assert memmap_obj.offset == offset
  225. memmap_backed_obj = _strided_from_memmap(
  226. fname,
  227. dtype="uint8",
  228. mode="r",
  229. offset=offset,
  230. order="C",
  231. shape=(size // 2,),
  232. strides=(2,),
  233. total_buffer_len=size,
  234. unlink_on_gc_collect=False,
  235. )
  236. assert _get_backing_memmap(memmap_backed_obj).offset == offset
  237. @with_numpy
  238. @with_multiprocessing
  239. @parametrize(
  240. "factory",
  241. [MemmappingPool, TestExecutor.get_memmapping_executor],
  242. ids=["multiprocessing", "loky"],
  243. )
  244. def test_pool_with_memmap(factory, tmpdir):
  245. """Check that subprocess can access and update shared memory memmap"""
  246. assert_array_equal = np.testing.assert_array_equal
  247. # Fork the subprocess before allocating the objects to be passed
  248. pool_temp_folder = tmpdir.mkdir("pool").strpath
  249. p = factory(10, max_nbytes=2, temp_folder=pool_temp_folder)
  250. try:
  251. filename = tmpdir.join("test.mmap").strpath
  252. a = np.memmap(filename, dtype=np.float32, shape=(3, 5), mode="w+")
  253. a.fill(1.0)
  254. p.map(
  255. inplace_double,
  256. [(a, (i, j), 1.0) for i in range(a.shape[0]) for j in range(a.shape[1])],
  257. )
  258. assert_array_equal(a, 2 * np.ones(a.shape))
  259. # Open a copy-on-write view on the previous data
  260. b = np.memmap(filename, dtype=np.float32, shape=(5, 3), mode="c")
  261. p.map(
  262. inplace_double,
  263. [(b, (i, j), 2.0) for i in range(b.shape[0]) for j in range(b.shape[1])],
  264. )
  265. # Passing memmap instances to the pool should not trigger the creation
  266. # of new files on the FS
  267. assert os.listdir(pool_temp_folder) == []
  268. # the original data is untouched
  269. assert_array_equal(a, 2 * np.ones(a.shape))
  270. assert_array_equal(b, 2 * np.ones(b.shape))
  271. # readonly maps can be read but not updated
  272. c = np.memmap(filename, dtype=np.float32, shape=(10,), mode="r", offset=5 * 4)
  273. with raises(AssertionError):
  274. p.map(check_array, [(c, i, 3.0) for i in range(c.shape[0])])
  275. # depending on the version of numpy one can either get a RuntimeError
  276. # or a ValueError
  277. with raises((RuntimeError, ValueError)):
  278. p.map(inplace_double, [(c, i, 2.0) for i in range(c.shape[0])])
  279. finally:
  280. # Clean all filehandlers held by the pool
  281. p.terminate()
  282. del p
  283. @with_numpy
  284. @with_multiprocessing
  285. @parametrize(
  286. "factory",
  287. [MemmappingPool, TestExecutor.get_memmapping_executor],
  288. ids=["multiprocessing", "loky"],
  289. )
  290. def test_pool_with_memmap_array_view(factory, tmpdir):
  291. """Check that subprocess can access and update shared memory array"""
  292. assert_array_equal = np.testing.assert_array_equal
  293. # Fork the subprocess before allocating the objects to be passed
  294. pool_temp_folder = tmpdir.mkdir("pool").strpath
  295. p = factory(10, max_nbytes=2, temp_folder=pool_temp_folder)
  296. try:
  297. filename = tmpdir.join("test.mmap").strpath
  298. a = np.memmap(filename, dtype=np.float32, shape=(3, 5), mode="w+")
  299. a.fill(1.0)
  300. # Create an ndarray view on the memmap instance
  301. a_view = np.asarray(a)
  302. assert not isinstance(a_view, np.memmap)
  303. assert has_shareable_memory(a_view)
  304. p.map(
  305. inplace_double,
  306. [
  307. (a_view, (i, j), 1.0)
  308. for i in range(a.shape[0])
  309. for j in range(a.shape[1])
  310. ],
  311. )
  312. # Both a and the a_view have been updated
  313. assert_array_equal(a, 2 * np.ones(a.shape))
  314. assert_array_equal(a_view, 2 * np.ones(a.shape))
  315. # Passing memmap array view to the pool should not trigger the
  316. # creation of new files on the FS
  317. assert os.listdir(pool_temp_folder) == []
  318. finally:
  319. p.terminate()
  320. del p
  321. @with_numpy
  322. @with_multiprocessing
  323. @parametrize("backend", ["multiprocessing", "loky"])
  324. def test_permission_error_windows_reference_cycle(backend):
  325. # Non regression test for:
  326. # https://github.com/joblib/joblib/issues/806
  327. #
  328. # The issue happens when trying to delete a memory mapped file that has
  329. # not yet been closed by one of the worker processes.
  330. cmd = """if 1:
  331. import numpy as np
  332. from joblib import Parallel, delayed
  333. data = np.random.rand(int(2e6)).reshape((int(1e6), 2))
  334. # Build a complex cyclic reference that is likely to delay garbage
  335. # collection of the memmapped array in the worker processes.
  336. first_list = current_list = [data]
  337. for i in range(10):
  338. current_list = [current_list]
  339. first_list.append(current_list)
  340. if __name__ == "__main__":
  341. results = Parallel(n_jobs=2, backend="{b}")(
  342. delayed(len)(current_list) for i in range(10))
  343. assert results == [1] * 10
  344. """.format(b=backend)
  345. p = subprocess.Popen(
  346. [sys.executable, "-c", cmd], stderr=subprocess.PIPE, stdout=subprocess.PIPE
  347. )
  348. p.wait()
  349. out, err = p.communicate()
  350. assert p.returncode == 0, out.decode() + "\n\n" + err.decode()
  351. @with_numpy
  352. @with_multiprocessing
  353. @parametrize("backend", ["multiprocessing", "loky"])
  354. def test_permission_error_windows_memmap_sent_to_parent(backend):
  355. # Second non-regression test for:
  356. # https://github.com/joblib/joblib/issues/806
  357. # previously, child process would not convert temporary memmaps to numpy
  358. # arrays when sending the data back to the parent process. This would lead
  359. # to permission errors on windows when deleting joblib's temporary folder,
  360. # as the memmaped files handles would still opened in the parent process.
  361. cmd = """if 1:
  362. import os
  363. import time
  364. import numpy as np
  365. from joblib import Parallel, delayed
  366. from testutils import return_slice_of_data
  367. data = np.ones(int(2e6))
  368. if __name__ == '__main__':
  369. # warm-up call to launch the workers and start the resource_tracker
  370. _ = Parallel(n_jobs=2, verbose=5, backend='{b}')(
  371. delayed(id)(i) for i in range(20))
  372. time.sleep(0.5)
  373. slice_of_data = Parallel(n_jobs=2, verbose=5, backend='{b}')(
  374. delayed(return_slice_of_data)(data, 0, 20) for _ in range(10))
  375. """.format(b=backend)
  376. for _ in range(3):
  377. env = os.environ.copy()
  378. env["PYTHONPATH"] = os.path.dirname(__file__)
  379. p = subprocess.Popen(
  380. [sys.executable, "-c", cmd],
  381. stderr=subprocess.PIPE,
  382. stdout=subprocess.PIPE,
  383. env=env,
  384. )
  385. p.wait()
  386. out, err = p.communicate()
  387. assert p.returncode == 0, err
  388. assert out == b""
  389. assert b"resource_tracker" not in err
  390. @with_numpy
  391. @with_multiprocessing
  392. @parametrize("backend", ["multiprocessing", "loky"])
  393. def test_parallel_isolated_temp_folders(backend):
  394. # Test that consecutive Parallel call use isolated subfolders, even
  395. # for the loky backend that reuses its executor instance across calls.
  396. array = np.arange(int(1e2))
  397. [filename_1] = Parallel(n_jobs=2, backend=backend, max_nbytes=10)(
  398. delayed(getattr)(array, "filename") for _ in range(1)
  399. )
  400. [filename_2] = Parallel(n_jobs=2, backend=backend, max_nbytes=10)(
  401. delayed(getattr)(array, "filename") for _ in range(1)
  402. )
  403. assert os.path.dirname(filename_2) != os.path.dirname(filename_1)
  404. @with_numpy
  405. @with_multiprocessing
  406. @parametrize("backend", ["multiprocessing", "loky"])
  407. def test_managed_backend_reuse_temp_folder(backend):
  408. # Test that calls to a managed parallel object reuse the same memmaps.
  409. array = np.arange(int(1e2))
  410. with Parallel(n_jobs=2, backend=backend, max_nbytes=10) as p:
  411. [filename_1] = p(delayed(getattr)(array, "filename") for _ in range(1))
  412. [filename_2] = p(delayed(getattr)(array, "filename") for _ in range(1))
  413. assert os.path.dirname(filename_2) == os.path.dirname(filename_1)
  414. @with_numpy
  415. @with_multiprocessing
  416. def test_memmapping_temp_folder_thread_safety():
  417. # Concurrent calls to Parallel with the loky backend will use the same
  418. # executor, and thus the same reducers. Make sure that those reducers use
  419. # different temporary folders depending on which Parallel objects called
  420. # them, which is necessary to limit potential race conditions during the
  421. # garbage collection of temporary memmaps.
  422. array = np.arange(int(1e2))
  423. temp_dirs_thread_1 = set()
  424. temp_dirs_thread_2 = set()
  425. def concurrent_get_filename(array, temp_dirs):
  426. with Parallel(backend="loky", n_jobs=2, max_nbytes=10) as p:
  427. for i in range(10):
  428. [filename] = p(delayed(getattr)(array, "filename") for _ in range(1))
  429. temp_dirs.add(os.path.dirname(filename))
  430. t1 = threading.Thread(
  431. target=concurrent_get_filename, args=(array, temp_dirs_thread_1)
  432. )
  433. t2 = threading.Thread(
  434. target=concurrent_get_filename, args=(array, temp_dirs_thread_2)
  435. )
  436. t1.start()
  437. t2.start()
  438. t1.join()
  439. t2.join()
  440. assert len(temp_dirs_thread_1) == 1
  441. assert len(temp_dirs_thread_2) == 1
  442. assert temp_dirs_thread_1 != temp_dirs_thread_2
  443. @with_numpy
  444. @with_multiprocessing
  445. def test_multithreaded_parallel_termination_resource_tracker_silent():
  446. # test that concurrent termination attempts of a same executor does not
  447. # emit any spurious error from the resource_tracker. We test various
  448. # situations making 0, 1 or both parallel call sending a task that will
  449. # make the worker (and thus the whole Parallel call) error out.
  450. cmd = """if 1:
  451. import os
  452. import numpy as np
  453. from joblib import Parallel, delayed
  454. from joblib.externals.loky.backend import resource_tracker
  455. from concurrent.futures import ThreadPoolExecutor, wait
  456. resource_tracker.VERBOSE = 0
  457. array = np.arange(int(1e2))
  458. temp_dirs_thread_1 = set()
  459. temp_dirs_thread_2 = set()
  460. def raise_error(array):
  461. raise ValueError
  462. def parallel_get_filename(array, temp_dirs):
  463. with Parallel(backend="loky", n_jobs=2, max_nbytes=10) as p:
  464. for i in range(10):
  465. [filename] = p(
  466. delayed(getattr)(array, "filename") for _ in range(1)
  467. )
  468. temp_dirs.add(os.path.dirname(filename))
  469. def parallel_raise(array, temp_dirs):
  470. with Parallel(backend="loky", n_jobs=2, max_nbytes=10) as p:
  471. for i in range(10):
  472. [filename] = p(
  473. delayed(raise_error)(array) for _ in range(1)
  474. )
  475. temp_dirs.add(os.path.dirname(filename))
  476. executor = ThreadPoolExecutor(max_workers=2)
  477. # both function calls will use the same loky executor, but with a
  478. # different Parallel object.
  479. future_1 = executor.submit({f1}, array, temp_dirs_thread_1)
  480. future_2 = executor.submit({f2}, array, temp_dirs_thread_2)
  481. # Wait for both threads to terminate their backend
  482. wait([future_1, future_2])
  483. future_1.result()
  484. future_2.result()
  485. """
  486. functions_and_returncodes = [
  487. ("parallel_get_filename", "parallel_get_filename", 0),
  488. ("parallel_get_filename", "parallel_raise", 1),
  489. ("parallel_raise", "parallel_raise", 1),
  490. ]
  491. for f1, f2, returncode in functions_and_returncodes:
  492. p = subprocess.Popen(
  493. [sys.executable, "-c", cmd.format(f1=f1, f2=f2)],
  494. stderr=subprocess.PIPE,
  495. stdout=subprocess.PIPE,
  496. )
  497. p.wait()
  498. _, err = p.communicate()
  499. assert p.returncode == returncode, err.decode()
  500. assert b"resource_tracker" not in err, err.decode()
  501. @with_numpy
  502. @with_multiprocessing
  503. @parametrize("backend", ["multiprocessing", "loky"])
  504. def test_many_parallel_calls_on_same_object(backend):
  505. # After #966 got merged, consecutive Parallel objects were sharing temp
  506. # folder, which would lead to race conditions happening during the
  507. # temporary resources management with the resource_tracker. This is a
  508. # non-regression test that makes sure that consecutive Parallel operations
  509. # on the same object do not error out.
  510. cmd = """if 1:
  511. import os
  512. import time
  513. import numpy as np
  514. from joblib import Parallel, delayed
  515. from testutils import return_slice_of_data
  516. data = np.ones(100)
  517. if __name__ == '__main__':
  518. for i in range(5):
  519. slice_of_data = Parallel(
  520. n_jobs=2, max_nbytes=1, backend='{b}')(
  521. delayed(return_slice_of_data)(data, 0, 20)
  522. for _ in range(10)
  523. )
  524. """.format(b=backend)
  525. env = os.environ.copy()
  526. env["PYTHONPATH"] = os.path.dirname(__file__)
  527. p = subprocess.Popen(
  528. [sys.executable, "-c", cmd],
  529. stderr=subprocess.PIPE,
  530. stdout=subprocess.PIPE,
  531. env=env,
  532. )
  533. p.wait()
  534. out, err = p.communicate()
  535. assert p.returncode == 0, err.decode()
  536. assert out == b"", out.decode()
  537. assert b"resource_tracker" not in err
  538. @with_numpy
  539. @with_multiprocessing
  540. @parametrize("backend", ["multiprocessing", "loky"])
  541. def test_memmap_returned_as_regular_array(backend):
  542. data = np.ones(int(1e3))
  543. # Check that child processes send temporary memmaps back as numpy arrays.
  544. [result] = Parallel(n_jobs=2, backend=backend, max_nbytes=100)(
  545. delayed(check_memmap_and_send_back)(data) for _ in range(1)
  546. )
  547. assert _get_backing_memmap(result) is None
  548. @with_numpy
  549. @with_multiprocessing
  550. @parametrize("backend", ["multiprocessing", "loky"])
  551. def test_resource_tracker_silent_when_reference_cycles(backend):
  552. # There is a variety of reasons that can make joblib with loky backend
  553. # output noisy warnings when a reference cycle is preventing a memmap from
  554. # being garbage collected. Especially, joblib's main process finalizer
  555. # deletes the temporary folder if it was not done before, which can
  556. # interact badly with the resource_tracker. We don't risk leaking any
  557. # resources, but this will likely make joblib output a lot of low-level
  558. # confusing messages.
  559. #
  560. # This test makes sure that the resource_tracker is silent when a reference
  561. # has been collected concurrently on non-Windows platforms.
  562. #
  563. # Note that the script in ``cmd`` is the exact same script as in
  564. # test_permission_error_windows_reference_cycle.
  565. if backend == "loky" and sys.platform.startswith("win"):
  566. # XXX: on Windows, reference cycles can delay timely garbage collection
  567. # and make it impossible to properly delete the temporary folder in the
  568. # main process because of permission errors.
  569. pytest.xfail(
  570. "The temporary folder cannot be deleted on Windows in the "
  571. "presence of a reference cycle"
  572. )
  573. cmd = """if 1:
  574. import numpy as np
  575. from joblib import Parallel, delayed
  576. data = np.random.rand(int(2e6)).reshape((int(1e6), 2))
  577. # Build a complex cyclic reference that is likely to delay garbage
  578. # collection of the memmapped array in the worker processes.
  579. first_list = current_list = [data]
  580. for i in range(10):
  581. current_list = [current_list]
  582. first_list.append(current_list)
  583. if __name__ == "__main__":
  584. results = Parallel(n_jobs=2, backend="{b}")(
  585. delayed(len)(current_list) for i in range(10))
  586. assert results == [1] * 10
  587. """.format(b=backend)
  588. p = subprocess.Popen(
  589. [sys.executable, "-c", cmd], stderr=subprocess.PIPE, stdout=subprocess.PIPE
  590. )
  591. p.wait()
  592. out, err = p.communicate()
  593. out = out.decode()
  594. err = err.decode()
  595. assert p.returncode == 0, out + "\n\n" + err
  596. assert "resource_tracker" not in err, err
  597. @with_numpy
  598. @with_multiprocessing
  599. @parametrize(
  600. "factory",
  601. [MemmappingPool, TestExecutor.get_memmapping_executor],
  602. ids=["multiprocessing", "loky"],
  603. )
  604. def test_memmapping_pool_for_large_arrays(factory, tmpdir):
  605. """Check that large arrays are not copied in memory"""
  606. # Check that the tempfolder is empty
  607. assert os.listdir(tmpdir.strpath) == []
  608. # Build an array reducers that automatically dump large array content
  609. # to filesystem backed memmap instances to avoid memory explosion
  610. p = factory(3, max_nbytes=40, temp_folder=tmpdir.strpath, verbose=2)
  611. try:
  612. # The temporary folder for the pool is not provisioned in advance
  613. assert os.listdir(tmpdir.strpath) == []
  614. assert not os.path.exists(p._temp_folder)
  615. small = np.ones(5, dtype=np.float32)
  616. assert small.nbytes == 20
  617. p.map(check_array, [(small, i, 1.0) for i in range(small.shape[0])])
  618. # Memory has been copied, the pool filesystem folder is unused
  619. assert os.listdir(tmpdir.strpath) == []
  620. # Try with a file larger than the memmap threshold of 40 bytes
  621. large = np.ones(100, dtype=np.float64)
  622. assert large.nbytes == 800
  623. p.map(check_array, [(large, i, 1.0) for i in range(large.shape[0])])
  624. # The data has been dumped in a temp folder for subprocess to share it
  625. # without per-child memory copies
  626. assert os.path.isdir(p._temp_folder)
  627. dumped_filenames = os.listdir(p._temp_folder)
  628. assert len(dumped_filenames) == 1
  629. # Check that memory mapping is not triggered for arrays with
  630. # dtype='object'
  631. objects = np.array(["abc"] * 100, dtype="object")
  632. results = p.map(has_shareable_memory, [objects])
  633. assert not results[0]
  634. finally:
  635. # check FS garbage upon pool termination
  636. p.terminate()
  637. for i in range(10):
  638. sleep(0.1)
  639. if not os.path.exists(p._temp_folder):
  640. break
  641. else: # pragma: no cover
  642. raise AssertionError(
  643. "temporary folder {} was not deleted".format(p._temp_folder)
  644. )
  645. del p
  646. @with_numpy
  647. @with_multiprocessing
  648. @parametrize(
  649. "backend",
  650. [
  651. pytest.param(
  652. "multiprocessing",
  653. marks=pytest.mark.xfail(
  654. reason="https://github.com/joblib/joblib/issues/1086"
  655. ),
  656. ),
  657. "loky",
  658. ],
  659. )
  660. def test_child_raises_parent_exits_cleanly(backend):
  661. # When a task executed by a child process raises an error, the parent
  662. # process's backend is notified, and calls abort_everything.
  663. # In loky, abort_everything itself calls shutdown(kill_workers=True) which
  664. # sends SIGKILL to the worker, preventing it from running the finalizers
  665. # supposed to signal the resource_tracker when the worker is done using
  666. # objects relying on a shared resource (e.g np.memmaps). Because this
  667. # behavior is prone to :
  668. # - cause a resource leak
  669. # - make the resource tracker emit noisy resource warnings
  670. # we explicitly test that, when the said situation occurs:
  671. # - no resources are actually leaked
  672. # - the temporary resources are deleted as soon as possible (typically, at
  673. # the end of the failing Parallel call)
  674. # - the resource_tracker does not emit any warnings.
  675. cmd = """if 1:
  676. import os
  677. from pathlib import Path
  678. from time import sleep
  679. import numpy as np
  680. from joblib import Parallel, delayed
  681. from testutils import print_filename_and_raise
  682. data = np.random.rand(1000)
  683. def get_temp_folder(parallel_obj, backend):
  684. if "{b}" == "loky":
  685. return Path(parallel_obj._backend._workers._temp_folder)
  686. else:
  687. return Path(parallel_obj._backend._pool._temp_folder)
  688. if __name__ == "__main__":
  689. try:
  690. with Parallel(n_jobs=2, backend="{b}", max_nbytes=100) as p:
  691. temp_folder = get_temp_folder(p, "{b}")
  692. p(delayed(print_filename_and_raise)(data)
  693. for i in range(1))
  694. except ValueError as e:
  695. # the temporary folder should be deleted by the end of this
  696. # call but apparently on some file systems, this takes
  697. # some time to be visible.
  698. #
  699. # We attempt to write into the temporary folder to test for
  700. # its existence and we wait for a maximum of 10 seconds.
  701. for i in range(100):
  702. try:
  703. with open(temp_folder / "some_file.txt", "w") as f:
  704. f.write("some content")
  705. except FileNotFoundError:
  706. # temp_folder has been deleted, all is fine
  707. break
  708. # ... else, wait a bit and try again
  709. sleep(.1)
  710. else:
  711. raise AssertionError(
  712. str(temp_folder) + " was not deleted"
  713. ) from e
  714. """.format(b=backend)
  715. env = os.environ.copy()
  716. env["PYTHONPATH"] = os.path.dirname(__file__)
  717. p = subprocess.Popen(
  718. [sys.executable, "-c", cmd],
  719. stderr=subprocess.PIPE,
  720. stdout=subprocess.PIPE,
  721. env=env,
  722. )
  723. p.wait()
  724. out, err = p.communicate()
  725. out, err = out.decode(), err.decode()
  726. filename = out.split("\n")[0]
  727. assert p.returncode == 0, err or out
  728. assert err == "" # no resource_tracker warnings.
  729. assert not os.path.exists(filename)
  730. @with_numpy
  731. @with_multiprocessing
  732. @parametrize(
  733. "factory",
  734. [MemmappingPool, TestExecutor.get_memmapping_executor],
  735. ids=["multiprocessing", "loky"],
  736. )
  737. def test_memmapping_pool_for_large_arrays_disabled(factory, tmpdir):
  738. """Check that large arrays memmapping can be disabled"""
  739. # Set max_nbytes to None to disable the auto memmapping feature
  740. p = factory(3, max_nbytes=None, temp_folder=tmpdir.strpath)
  741. try:
  742. # Check that the tempfolder is empty
  743. assert os.listdir(tmpdir.strpath) == []
  744. # Try with a file largish than the memmap threshold of 40 bytes
  745. large = np.ones(100, dtype=np.float64)
  746. assert large.nbytes == 800
  747. p.map(check_array, [(large, i, 1.0) for i in range(large.shape[0])])
  748. # Check that the tempfolder is still empty
  749. assert os.listdir(tmpdir.strpath) == []
  750. finally:
  751. # Cleanup open file descriptors
  752. p.terminate()
  753. del p
  754. @with_numpy
  755. @with_multiprocessing
  756. @with_dev_shm
  757. @parametrize(
  758. "factory",
  759. [MemmappingPool, TestExecutor.get_memmapping_executor],
  760. ids=["multiprocessing", "loky"],
  761. )
  762. def test_memmapping_on_large_enough_dev_shm(factory):
  763. """Check that memmapping uses /dev/shm when possible"""
  764. orig_size = jmr.SYSTEM_SHARED_MEM_FS_MIN_SIZE
  765. try:
  766. # Make joblib believe that it can use /dev/shm even when running on a
  767. # CI container where the size of the /dev/shm is not very large (that
  768. # is at least 32 MB instead of 2 GB by default).
  769. jmr.SYSTEM_SHARED_MEM_FS_MIN_SIZE = int(32e6)
  770. p = factory(3, max_nbytes=10)
  771. try:
  772. # Check that the pool has correctly detected the presence of the
  773. # shared memory filesystem.
  774. pool_temp_folder = p._temp_folder
  775. folder_prefix = "/dev/shm/joblib_memmapping_folder_"
  776. assert pool_temp_folder.startswith(folder_prefix)
  777. assert os.path.exists(pool_temp_folder)
  778. # Try with a file larger than the memmap threshold of 10 bytes
  779. a = np.ones(100, dtype=np.float64)
  780. assert a.nbytes == 800
  781. p.map(id, [a] * 10)
  782. # a should have been memmapped to the pool temp folder: the joblib
  783. # pickling procedure generate one .pkl file:
  784. assert len(os.listdir(pool_temp_folder)) == 1
  785. # create a new array with content that is different from 'a' so
  786. # that it is mapped to a different file in the temporary folder of
  787. # the pool.
  788. b = np.ones(100, dtype=np.float64) * 2
  789. assert b.nbytes == 800
  790. p.map(id, [b] * 10)
  791. # A copy of both a and b are now stored in the shared memory folder
  792. assert len(os.listdir(pool_temp_folder)) == 2
  793. finally:
  794. # Cleanup open file descriptors
  795. p.terminate()
  796. del p
  797. for i in range(100):
  798. # The temp folder is cleaned up upon pool termination
  799. if not os.path.exists(pool_temp_folder):
  800. break
  801. sleep(0.1)
  802. else: # pragma: no cover
  803. raise AssertionError("temporary folder of pool was not deleted")
  804. finally:
  805. jmr.SYSTEM_SHARED_MEM_FS_MIN_SIZE = orig_size
  806. @with_numpy
  807. @with_multiprocessing
  808. @with_dev_shm
  809. @parametrize(
  810. "factory",
  811. [MemmappingPool, TestExecutor.get_memmapping_executor],
  812. ids=["multiprocessing", "loky"],
  813. )
  814. def test_memmapping_on_too_small_dev_shm(factory):
  815. orig_size = jmr.SYSTEM_SHARED_MEM_FS_MIN_SIZE
  816. try:
  817. # Make joblib believe that it cannot use /dev/shm unless there is
  818. # 42 exabytes of available shared memory in /dev/shm
  819. jmr.SYSTEM_SHARED_MEM_FS_MIN_SIZE = int(42e18)
  820. p = factory(3, max_nbytes=10)
  821. try:
  822. # Check that the pool has correctly detected the presence of the
  823. # shared memory filesystem.
  824. pool_temp_folder = p._temp_folder
  825. assert not pool_temp_folder.startswith("/dev/shm")
  826. finally:
  827. # Cleanup open file descriptors
  828. p.terminate()
  829. del p
  830. # The temp folder is cleaned up upon pool termination
  831. assert not os.path.exists(pool_temp_folder)
  832. finally:
  833. jmr.SYSTEM_SHARED_MEM_FS_MIN_SIZE = orig_size
  834. @with_numpy
  835. @with_multiprocessing
  836. @parametrize(
  837. "factory",
  838. [MemmappingPool, TestExecutor.get_memmapping_executor],
  839. ids=["multiprocessing", "loky"],
  840. )
  841. def test_memmapping_pool_for_large_arrays_in_return(factory, tmpdir):
  842. """Check that large arrays are not copied in memory in return"""
  843. assert_array_equal = np.testing.assert_array_equal
  844. # Build an array reducers that automatically dump large array content
  845. # but check that the returned datastructure are regular arrays to avoid
  846. # passing a memmap array pointing to a pool controlled temp folder that
  847. # might be confusing to the user
  848. # The MemmappingPool user can always return numpy.memmap object explicitly
  849. # to avoid memory copy
  850. p = factory(3, max_nbytes=10, temp_folder=tmpdir.strpath)
  851. try:
  852. res = p.apply_async(np.ones, args=(1000,))
  853. large = res.get()
  854. assert not has_shareable_memory(large)
  855. assert_array_equal(large, np.ones(1000))
  856. finally:
  857. p.terminate()
  858. del p
  859. def _worker_multiply(a, n_times):
  860. """Multiplication function to be executed by subprocess"""
  861. assert has_shareable_memory(a)
  862. return a * n_times
  863. @with_numpy
  864. @with_multiprocessing
  865. @parametrize(
  866. "factory",
  867. [MemmappingPool, TestExecutor.get_memmapping_executor],
  868. ids=["multiprocessing", "loky"],
  869. )
  870. def test_workaround_against_bad_memmap_with_copied_buffers(factory, tmpdir):
  871. """Check that memmaps with a bad buffer are returned as regular arrays
  872. Unary operations and ufuncs on memmap instances return a new memmap
  873. instance with an in-memory buffer (probably a numpy bug).
  874. """
  875. assert_array_equal = np.testing.assert_array_equal
  876. p = factory(3, max_nbytes=10, temp_folder=tmpdir.strpath)
  877. try:
  878. # Send a complex, large-ish view on a array that will be converted to
  879. # a memmap in the worker process
  880. a = np.asarray(np.arange(6000).reshape((1000, 2, 3)), order="F")[:, :1, :]
  881. # Call a non-inplace multiply operation on the worker and memmap and
  882. # send it back to the parent.
  883. b = p.apply_async(_worker_multiply, args=(a, 3)).get()
  884. assert not has_shareable_memory(b)
  885. assert_array_equal(b, 3 * a)
  886. finally:
  887. p.terminate()
  888. del p
  889. def identity(arg):
  890. return arg
  891. @with_numpy
  892. @with_multiprocessing
  893. @parametrize(
  894. "factory,retry_no",
  895. list(
  896. itertools.product(
  897. [MemmappingPool, TestExecutor.get_memmapping_executor], range(3)
  898. )
  899. ),
  900. ids=[
  901. "{}, {}".format(x, y)
  902. for x, y in itertools.product(["multiprocessing", "loky"], map(str, range(3)))
  903. ],
  904. )
  905. def test_pool_memmap_with_big_offset(factory, retry_no, tmpdir):
  906. # Test that numpy memmap offset is set correctly if greater than
  907. # mmap.ALLOCATIONGRANULARITY, see
  908. # https://github.com/joblib/joblib/issues/451 and
  909. # https://github.com/numpy/numpy/pull/8443 for more details.
  910. fname = tmpdir.join("test.mmap").strpath
  911. size = 5 * mmap.ALLOCATIONGRANULARITY
  912. offset = mmap.ALLOCATIONGRANULARITY + 1
  913. obj = make_memmap(fname, mode="w+", shape=size, dtype="uint8", offset=offset)
  914. p = factory(2, temp_folder=tmpdir.strpath)
  915. result = p.apply_async(identity, args=(obj,)).get()
  916. assert isinstance(result, np.memmap)
  917. assert result.offset == offset
  918. np.testing.assert_array_equal(obj, result)
  919. p.terminate()
  920. def test_pool_get_temp_dir(tmpdir):
  921. pool_folder_name = "test.tmpdir"
  922. pool_folder, shared_mem = _get_temp_dir(pool_folder_name, tmpdir.strpath)
  923. assert shared_mem is False
  924. assert pool_folder == tmpdir.join("test.tmpdir").strpath
  925. pool_folder, shared_mem = _get_temp_dir(pool_folder_name, temp_folder=None)
  926. if sys.platform.startswith("win"):
  927. assert shared_mem is False
  928. assert pool_folder.endswith(pool_folder_name)
  929. def test_pool_get_temp_dir_no_statvfs(tmpdir, monkeypatch):
  930. """Check that _get_temp_dir works when os.statvfs is not defined
  931. Regression test for #902
  932. """
  933. pool_folder_name = "test.tmpdir"
  934. import joblib._memmapping_reducer
  935. if hasattr(joblib._memmapping_reducer.os, "statvfs"):
  936. # We are on Unix, since Windows doesn't have this function
  937. monkeypatch.delattr(joblib._memmapping_reducer.os, "statvfs")
  938. pool_folder, shared_mem = _get_temp_dir(pool_folder_name, temp_folder=None)
  939. if sys.platform.startswith("win"):
  940. assert shared_mem is False
  941. assert pool_folder.endswith(pool_folder_name)
  942. @with_numpy
  943. @skipif(
  944. sys.platform == "win32", reason="This test fails with a PermissionError on Windows"
  945. )
  946. @parametrize("mmap_mode", ["r+", "w+"])
  947. def test_numpy_arrays_use_different_memory(mmap_mode):
  948. def func(arr, value):
  949. arr[:] = value
  950. return arr
  951. arrays = [np.zeros((10, 10), dtype="float64") for i in range(10)]
  952. results = Parallel(mmap_mode=mmap_mode, max_nbytes=0, n_jobs=2)(
  953. delayed(func)(arr, i) for i, arr in enumerate(arrays)
  954. )
  955. for i, arr in enumerate(results):
  956. np.testing.assert_array_equal(arr, i)
  957. @with_numpy
  958. def test_weak_array_key_map():
  959. def assert_empty_after_gc_collect(container, retries=100):
  960. for i in range(retries):
  961. if len(container) == 0:
  962. return
  963. gc.collect()
  964. sleep(0.1)
  965. assert len(container) == 0
  966. a = np.ones(42)
  967. m = _WeakArrayKeyMap()
  968. m.set(a, "a")
  969. assert m.get(a) == "a"
  970. b = a
  971. assert m.get(b) == "a"
  972. m.set(b, "b")
  973. assert m.get(a) == "b"
  974. del a
  975. gc.collect()
  976. assert len(m._data) == 1
  977. assert m.get(b) == "b"
  978. del b
  979. assert_empty_after_gc_collect(m._data)
  980. c = np.ones(42)
  981. m.set(c, "c")
  982. assert len(m._data) == 1
  983. assert m.get(c) == "c"
  984. with raises(KeyError):
  985. m.get(np.ones(42))
  986. del c
  987. assert_empty_after_gc_collect(m._data)
  988. # Check that creating and dropping numpy arrays with potentially the same
  989. # object id will not cause the map to get confused.
  990. def get_set_get_collect(m, i):
  991. a = np.ones(42)
  992. with raises(KeyError):
  993. m.get(a)
  994. m.set(a, i)
  995. assert m.get(a) == i
  996. return id(a)
  997. unique_ids = set([get_set_get_collect(m, i) for i in range(1000)])
  998. if platform.python_implementation() == "CPython":
  999. # On CPython (at least) the same id is often reused many times for the
  1000. # temporary arrays created under the local scope of the
  1001. # get_set_get_collect function without causing any spurious lookups /
  1002. # insertions in the map. Apparently on free-threaded Python, the id is
  1003. # not reused as often.
  1004. max_len_unique_ids = 400 if IS_GIL_DISABLED else 100
  1005. assert len(unique_ids) < max_len_unique_ids
  1006. def test_weak_array_key_map_no_pickling():
  1007. m = _WeakArrayKeyMap()
  1008. with raises(pickle.PicklingError):
  1009. pickle.dumps(m)
  1010. @with_numpy
  1011. @with_multiprocessing
  1012. def test_direct_mmap(tmpdir):
  1013. testfile = str(tmpdir.join("arr.dat"))
  1014. a = np.arange(10, dtype="uint8")
  1015. a.tofile(testfile)
  1016. def _read_array():
  1017. with open(testfile) as fd:
  1018. mm = mmap.mmap(fd.fileno(), 0, access=mmap.ACCESS_READ, offset=0)
  1019. return np.ndarray((10,), dtype=np.uint8, buffer=mm, offset=0)
  1020. def func(x):
  1021. return x**2
  1022. arr = _read_array()
  1023. # this gives the reference result of the function with an array
  1024. ref = Parallel(n_jobs=2)(delayed(func)(x) for x in [a])
  1025. # now test that it works with the mmap array
  1026. results = Parallel(n_jobs=2)(delayed(func)(x) for x in [arr])
  1027. np.testing.assert_array_equal(results, ref)
  1028. # also test that a mmap array read in the subprocess is correctly returned
  1029. results = Parallel(n_jobs=2)(delayed(_read_array)() for _ in range(1))
  1030. np.testing.assert_array_equal(results[0], arr)
  1031. @with_numpy
  1032. @with_multiprocessing
  1033. def test_parallel_memmap2d_as_memmap_1d_base(tmpdir):
  1034. # non-regression test for https://github.com/joblib/joblib/issues/1703,
  1035. # where 2D arrays backed by 1D memmap had un-wanted order changes.
  1036. testfile = str(tmpdir.join("arr2.dat"))
  1037. a = np.arange(10, dtype="uint8").reshape(5, 2)
  1038. a.tofile(testfile)
  1039. def _read_array():
  1040. mm = np.memmap(testfile)
  1041. return mm.reshape(5, 2)
  1042. def func(x):
  1043. return x**2
  1044. arr = _read_array()
  1045. # this gives the reference result of the function with an array
  1046. ref = Parallel(n_jobs=2)(delayed(func)(x) for x in [a])
  1047. # now test that it works with a view on a 1D mmap array
  1048. results = Parallel(n_jobs=2)(delayed(func)(x) for x in [arr])
  1049. assert not results[0].flags["F_CONTIGUOUS"]
  1050. np.testing.assert_array_equal(results, ref)
  1051. # also test that returned memmap arrays are correctly ordered
  1052. results = Parallel(n_jobs=2)(delayed(_read_array)() for _ in range(1))
  1053. np.testing.assert_array_equal(results[0], a)