reusable_executor.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. ###############################################################################
  2. # Reusable ProcessPoolExecutor
  3. #
  4. # author: Thomas Moreau and Olivier Grisel
  5. #
  6. import time
  7. import warnings
  8. import threading
  9. import multiprocessing as mp
  10. from .process_executor import ProcessPoolExecutor, EXTRA_QUEUED_CALLS
  11. from .backend.context import cpu_count
  12. from .backend import get_context
  13. __all__ = ["get_reusable_executor"]
  14. # Singleton executor and id management
  15. _executor_lock = threading.RLock()
  16. _next_executor_id = 0
  17. _executor = None
  18. _executor_kwargs = None
  19. def _get_next_executor_id():
  20. """Ensure that each successive executor instance has a unique, monotonic id.
  21. The purpose of this monotonic id is to help debug and test automated
  22. instance creation.
  23. """
  24. global _next_executor_id
  25. with _executor_lock:
  26. executor_id = _next_executor_id
  27. _next_executor_id += 1
  28. return executor_id
  29. def get_reusable_executor(
  30. max_workers=None,
  31. context=None,
  32. timeout=10,
  33. kill_workers=False,
  34. reuse="auto",
  35. job_reducers=None,
  36. result_reducers=None,
  37. initializer=None,
  38. initargs=(),
  39. env=None,
  40. ):
  41. """Return the current ReusableExectutor instance.
  42. Start a new instance if it has not been started already or if the previous
  43. instance was left in a broken state.
  44. If the previous instance does not have the requested number of workers, the
  45. executor is dynamically resized to adjust the number of workers prior to
  46. returning.
  47. Reusing a singleton instance spares the overhead of starting new worker
  48. processes and importing common python packages each time.
  49. ``max_workers`` controls the maximum number of tasks that can be running in
  50. parallel in worker processes. By default this is set to the number of
  51. CPUs on the host.
  52. Setting ``timeout`` (in seconds) makes idle workers automatically shutdown
  53. so as to release system resources. New workers are respawn upon submission
  54. of new tasks so that ``max_workers`` are available to accept the newly
  55. submitted tasks. Setting ``timeout`` to around 100 times the time required
  56. to spawn new processes and import packages in them (on the order of 100ms)
  57. ensures that the overhead of spawning workers is negligible.
  58. Setting ``kill_workers=True`` makes it possible to forcibly interrupt
  59. previously spawned jobs to get a new instance of the reusable executor
  60. with new constructor argument values.
  61. The ``job_reducers`` and ``result_reducers`` are used to customize the
  62. pickling of tasks and results send to the executor.
  63. When provided, the ``initializer`` is run first in newly spawned
  64. processes with argument ``initargs``.
  65. The environment variable in the child process are a copy of the values in
  66. the main process. One can provide a dict ``{ENV: VAL}`` where ``ENV`` and
  67. ``VAL`` are string literals to overwrite the environment variable ``ENV``
  68. in the child processes to value ``VAL``. The environment variables are set
  69. in the children before any module is loaded. This only works with the
  70. ``loky`` context.
  71. """
  72. _executor, _ = _ReusablePoolExecutor.get_reusable_executor(
  73. max_workers=max_workers,
  74. context=context,
  75. timeout=timeout,
  76. kill_workers=kill_workers,
  77. reuse=reuse,
  78. job_reducers=job_reducers,
  79. result_reducers=result_reducers,
  80. initializer=initializer,
  81. initargs=initargs,
  82. env=env,
  83. )
  84. return _executor
  85. class _ReusablePoolExecutor(ProcessPoolExecutor):
  86. def __init__(
  87. self,
  88. submit_resize_lock,
  89. max_workers=None,
  90. context=None,
  91. timeout=None,
  92. executor_id=0,
  93. job_reducers=None,
  94. result_reducers=None,
  95. initializer=None,
  96. initargs=(),
  97. env=None,
  98. ):
  99. super().__init__(
  100. max_workers=max_workers,
  101. context=context,
  102. timeout=timeout,
  103. job_reducers=job_reducers,
  104. result_reducers=result_reducers,
  105. initializer=initializer,
  106. initargs=initargs,
  107. env=env,
  108. )
  109. self.executor_id = executor_id
  110. self._submit_resize_lock = submit_resize_lock
  111. @classmethod
  112. def get_reusable_executor(
  113. cls,
  114. max_workers=None,
  115. context=None,
  116. timeout=10,
  117. kill_workers=False,
  118. reuse="auto",
  119. job_reducers=None,
  120. result_reducers=None,
  121. initializer=None,
  122. initargs=(),
  123. env=None,
  124. ):
  125. with _executor_lock:
  126. global _executor, _executor_kwargs
  127. executor = _executor
  128. if max_workers is None:
  129. if reuse is True and executor is not None:
  130. max_workers = executor._max_workers
  131. else:
  132. max_workers = cpu_count()
  133. elif max_workers <= 0:
  134. raise ValueError(
  135. f"max_workers must be greater than 0, got {max_workers}."
  136. )
  137. if isinstance(context, str):
  138. context = get_context(context)
  139. if context is not None and context.get_start_method() == "fork":
  140. raise ValueError(
  141. "Cannot use reusable executor with the 'fork' context"
  142. )
  143. kwargs = dict(
  144. context=context,
  145. timeout=timeout,
  146. job_reducers=job_reducers,
  147. result_reducers=result_reducers,
  148. initializer=initializer,
  149. initargs=initargs,
  150. env=env,
  151. )
  152. if executor is None:
  153. is_reused = False
  154. mp.util.debug(
  155. f"Create a executor with max_workers={max_workers}."
  156. )
  157. executor_id = _get_next_executor_id()
  158. _executor_kwargs = kwargs
  159. _executor = executor = cls(
  160. _executor_lock,
  161. max_workers=max_workers,
  162. executor_id=executor_id,
  163. **kwargs,
  164. )
  165. else:
  166. if reuse == "auto":
  167. reuse = kwargs == _executor_kwargs
  168. if (
  169. executor._flags.broken
  170. or executor._flags.shutdown
  171. or not reuse
  172. or executor.queue_size < max_workers
  173. ):
  174. if executor._flags.broken:
  175. reason = "broken"
  176. elif executor._flags.shutdown:
  177. reason = "shutdown"
  178. elif executor.queue_size < max_workers:
  179. # Do not reuse the executor if the queue size is too
  180. # small as this would lead to limited parallelism.
  181. reason = "queue size is too small"
  182. else:
  183. reason = "arguments have changed"
  184. mp.util.debug(
  185. "Creating a new executor with max_workers="
  186. f"{max_workers} as the previous instance cannot be "
  187. f"reused ({reason})."
  188. )
  189. executor.shutdown(wait=True, kill_workers=kill_workers)
  190. _executor = executor = _executor_kwargs = None
  191. # Recursive call to build a new instance
  192. return cls.get_reusable_executor(
  193. max_workers=max_workers, **kwargs
  194. )
  195. else:
  196. mp.util.debug(
  197. "Reusing existing executor with "
  198. f"max_workers={executor._max_workers}."
  199. )
  200. is_reused = True
  201. executor._resize(max_workers)
  202. return executor, is_reused
  203. def submit(self, fn, *args, **kwargs):
  204. with self._submit_resize_lock:
  205. return super().submit(fn, *args, **kwargs)
  206. def _resize(self, max_workers):
  207. with self._submit_resize_lock:
  208. if max_workers is None:
  209. raise ValueError("Trying to resize with max_workers=None")
  210. elif max_workers == self._max_workers:
  211. return
  212. if self._executor_manager_thread is None:
  213. # If the executor_manager_thread has not been started
  214. # then no processes have been spawned and we can just
  215. # update _max_workers and return
  216. self._max_workers = max_workers
  217. return
  218. self._wait_job_completion()
  219. # Some process might have returned due to timeout so check how many
  220. # children are still alive. Use the _process_management_lock to
  221. # ensure that no process are spawned or timeout during the resize.
  222. with self._processes_management_lock:
  223. processes = list(self._processes.values())
  224. nb_children_alive = sum(p.is_alive() for p in processes)
  225. self._max_workers = max_workers
  226. for _ in range(max_workers, nb_children_alive):
  227. self._call_queue.put(None)
  228. while (
  229. len(self._processes) > max_workers and not self._flags.broken
  230. ):
  231. time.sleep(1e-3)
  232. self._adjust_process_count()
  233. processes = list(self._processes.values())
  234. while not all(p.is_alive() for p in processes):
  235. time.sleep(1e-3)
  236. def _wait_job_completion(self):
  237. """Wait for the cache to be empty before resizing the pool."""
  238. # Issue a warning to the user about the bad effect of this usage.
  239. if self._pending_work_items:
  240. warnings.warn(
  241. "Trying to resize an executor with running jobs: "
  242. "waiting for jobs completion before resizing.",
  243. UserWarning,
  244. )
  245. mp.util.debug(
  246. f"Executor {self.executor_id} waiting for jobs completion "
  247. "before resizing"
  248. )
  249. # Wait for the completion of the jobs
  250. while self._pending_work_items:
  251. time.sleep(1e-3)
  252. def _setup_queues(self, job_reducers, result_reducers):
  253. # As this executor can be resized, use a large queue size to avoid
  254. # underestimating capacity and introducing overhead
  255. # Also handle the case where the user set max_workers to a value larger
  256. # than cpu_count(), to avoid limiting the number of parallel jobs.
  257. min_queue_size = max(cpu_count(), self._max_workers)
  258. self.queue_size = 2 * min_queue_size + EXTRA_QUEUED_CALLS
  259. super()._setup_queues(
  260. job_reducers, result_reducers, queue_size=self.queue_size
  261. )