executor.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. """Utility function to construct a loky.ReusableExecutor with custom pickler.
  2. This module provides efficient ways of working with data stored in
  3. shared memory with numpy.memmap arrays without inducing any memory
  4. copy between the parent and child processes.
  5. """
  6. # Author: Thomas Moreau <thomas.moreau.2010@gmail.com>
  7. # Copyright: 2017, Thomas Moreau
  8. # License: BSD 3 clause
  9. from ._memmapping_reducer import TemporaryResourcesManager, get_memmapping_reducers
  10. from .externals.loky.reusable_executor import _ReusablePoolExecutor
  # Module-level record of the arguments passed to the most recent call of
  # MemmappingExecutor.get_memmapping_executor (backend args, env entries,
  # timeout, initializer and initargs merged in one dict). The factory
  # compares it with the arguments of the current call to decide whether the
  # executor may be reused, then overwrites it. ``None`` means the factory
  # has not been called yet, in which case reuse is allowed.
  11. _executor_args = None
  12. def get_memmapping_executor(n_jobs, **kwargs):
  13. return MemmappingExecutor.get_memmapping_executor(n_jobs, **kwargs)
  14. class MemmappingExecutor(_ReusablePoolExecutor):
  15. @classmethod
  16. def get_memmapping_executor(
  17. cls,
  18. n_jobs,
  19. timeout=300,
  20. initializer=None,
  21. initargs=(),
  22. env=None,
  23. temp_folder=None,
  24. context_id=None,
  25. **backend_args,
  26. ):
  27. """Factory for ReusableExecutor with automatic memmapping for large
  28. numpy arrays.
  29. """
  30. global _executor_args
  31. # Check if we can reuse the executor here instead of deferring the test
  32. # to loky as the reducers are objects that changes at each call.
  33. executor_args = backend_args.copy()
  34. executor_args.update(env if env else {})
  35. executor_args.update(
  36. dict(timeout=timeout, initializer=initializer, initargs=initargs)
  37. )
  38. reuse = _executor_args is None or _executor_args == executor_args
  39. _executor_args = executor_args
  40. manager = TemporaryResourcesManager(temp_folder)
  41. # reducers access the temporary folder in which to store temporary
  42. # pickles through a call to manager.resolve_temp_folder_name. resolving
  43. # the folder name dynamically is useful to use different folders across
  44. # calls of a same reusable executor
  45. job_reducers, result_reducers = get_memmapping_reducers(
  46. unlink_on_gc_collect=True,
  47. temp_folder_resolver=manager.resolve_temp_folder_name,
  48. **backend_args,
  49. )
  50. _executor, executor_is_reused = super().get_reusable_executor(
  51. n_jobs,
  52. job_reducers=job_reducers,
  53. result_reducers=result_reducers,
  54. reuse=reuse,
  55. timeout=timeout,
  56. initializer=initializer,
  57. initargs=initargs,
  58. env=env,
  59. )
  60. if not executor_is_reused:
  61. # Only set a _temp_folder_manager for new executors. Reused
  62. # executors already have a _temporary_folder_manager that must not
  63. # be re-assigned like that because it is referenced in various
  64. # places in the reducing machinery of the executor.
  65. _executor._temp_folder_manager = manager
  66. if context_id is not None:
  67. # Only register the specified context once we know which manager
  68. # the current executor is using, in order to not register an atexit
  69. # finalizer twice for the same folder.
  70. _executor._temp_folder_manager.register_new_context(context_id)
  71. return _executor
  72. def terminate(self, kill_workers=False):
  73. self.shutdown(kill_workers=kill_workers)
  74. # When workers are killed in a brutal manner, they cannot execute the
  75. # finalizer of their shared memmaps. The refcount of those memmaps may
  76. # be off by an unknown number, so instead of decref'ing them, we force
  77. # delete the whole temporary folder, and unregister them. There is no
  78. # risk of PermissionError at folder deletion because at this
  79. # point, all child processes are dead, so all references to temporary
  80. # memmaps are closed. Otherwise, just try to delete as much as possible
  81. # with allow_non_empty=True but if we can't, it will be clean up later
  82. # on by the resource_tracker.
  83. with self._submit_resize_lock:
  84. self._temp_folder_manager._clean_temporary_resources(
  85. force=kill_workers, allow_non_empty=True
  86. )
  87. @property
  88. def _temp_folder(self):
  89. # Legacy property in tests. could be removed if we refactored the
  90. # memmapping tests. SHOULD ONLY BE USED IN TESTS!
  91. # We cache this property because it is called late in the tests - at
  92. # this point, all context have been unregistered, and
  93. # resolve_temp_folder_name raises an error.
  94. if getattr(self, "_cached_temp_folder", None) is not None:
  95. return self._cached_temp_folder
  96. else:
  97. self._cached_temp_folder = (
  98. self._temp_folder_manager.resolve_temp_folder_name()
  99. ) # noqa
  100. return self._cached_temp_folder
  101. class _TestingMemmappingExecutor(MemmappingExecutor):
  102. """Wrapper around ReusableExecutor to ease memmapping testing with Pool
  103. and Executor. This is only for testing purposes.
  104. """
  105. def apply_async(self, func, args):
  106. """Schedule a func to be run"""
  107. future = self.submit(func, *args)
  108. future.get = future.result
  109. return future
  110. def map(self, f, *args):
  111. return list(super().map(f, *args))