queues.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. ###############################################################################
  2. # Queue and SimpleQueue implementation for loky
  3. #
  4. # authors: Thomas Moreau, Olivier Grisel
  5. #
  6. # based on multiprocessing/queues.py (16/02/2017)
  7. # * Add some custom reducers for the Queues/SimpleQueue to tweak the
  8. # pickling process. (overload Queue._feed/SimpleQueue.put)
  9. #
  10. import os
  11. import sys
  12. import errno
  13. import weakref
  14. import threading
  15. from multiprocessing import util
  16. from multiprocessing.queues import (
  17. Full,
  18. Queue as mp_Queue,
  19. SimpleQueue as mp_SimpleQueue,
  20. _sentinel,
  21. )
  22. from multiprocessing.context import assert_spawning
  23. from .reduction import dumps
  24. __all__ = ["Queue", "SimpleQueue", "Full"]
  25. class Queue(mp_Queue):
  26. def __init__(self, maxsize=0, reducers=None, ctx=None):
  27. super().__init__(maxsize=maxsize, ctx=ctx)
  28. self._reducers = reducers
  29. # Use custom queue set/get state to be able to reduce the custom reducers
  30. def __getstate__(self):
  31. assert_spawning(self)
  32. return (
  33. self._ignore_epipe,
  34. self._maxsize,
  35. self._reader,
  36. self._writer,
  37. self._reducers,
  38. self._rlock,
  39. self._wlock,
  40. self._sem,
  41. self._opid,
  42. )
  43. def __setstate__(self, state):
  44. (
  45. self._ignore_epipe,
  46. self._maxsize,
  47. self._reader,
  48. self._writer,
  49. self._reducers,
  50. self._rlock,
  51. self._wlock,
  52. self._sem,
  53. self._opid,
  54. ) = state
  55. if sys.version_info >= (3, 9):
  56. self._reset()
  57. else:
  58. self._after_fork()
  59. # Overload _start_thread to correctly call our custom _feed
  60. def _start_thread(self):
  61. util.debug("Queue._start_thread()")
  62. # Start thread which transfers data from buffer to pipe
  63. self._buffer.clear()
  64. self._thread = threading.Thread(
  65. target=Queue._feed,
  66. args=(
  67. self._buffer,
  68. self._notempty,
  69. self._send_bytes,
  70. self._wlock,
  71. self._writer.close,
  72. self._reducers,
  73. self._ignore_epipe,
  74. self._on_queue_feeder_error,
  75. self._sem,
  76. ),
  77. name="QueueFeederThread",
  78. )
  79. self._thread.daemon = True
  80. util.debug("doing self._thread.start()")
  81. self._thread.start()
  82. util.debug("... done self._thread.start()")
  83. # On process exit we will wait for data to be flushed to pipe.
  84. #
  85. # However, if this process created the queue then all
  86. # processes which use the queue will be descendants of this
  87. # process. Therefore waiting for the queue to be flushed
  88. # is pointless once all the child processes have been joined.
  89. created_by_this_process = self._opid == os.getpid()
  90. if not self._joincancelled and not created_by_this_process:
  91. self._jointhread = util.Finalize(
  92. self._thread,
  93. Queue._finalize_join,
  94. [weakref.ref(self._thread)],
  95. exitpriority=-5,
  96. )
  97. # Send sentinel to the thread queue object when garbage collected
  98. self._close = util.Finalize(
  99. self,
  100. Queue._finalize_close,
  101. [self._buffer, self._notempty],
  102. exitpriority=10,
  103. )
  104. # Overload the _feed methods to use our custom pickling strategy.
  105. @staticmethod
  106. def _feed(
  107. buffer,
  108. notempty,
  109. send_bytes,
  110. writelock,
  111. close,
  112. reducers,
  113. ignore_epipe,
  114. onerror,
  115. queue_sem,
  116. ):
  117. util.debug("starting thread to feed data to pipe")
  118. nacquire = notempty.acquire
  119. nrelease = notempty.release
  120. nwait = notempty.wait
  121. bpopleft = buffer.popleft
  122. sentinel = _sentinel
  123. if sys.platform != "win32":
  124. wacquire = writelock.acquire
  125. wrelease = writelock.release
  126. else:
  127. wacquire = None
  128. while True:
  129. try:
  130. nacquire()
  131. try:
  132. if not buffer:
  133. nwait()
  134. finally:
  135. nrelease()
  136. try:
  137. while True:
  138. obj = bpopleft()
  139. if obj is sentinel:
  140. util.debug("feeder thread got sentinel -- exiting")
  141. close()
  142. return
  143. # serialize the data before acquiring the lock
  144. obj_ = dumps(obj, reducers=reducers)
  145. if wacquire is None:
  146. send_bytes(obj_)
  147. else:
  148. wacquire()
  149. try:
  150. send_bytes(obj_)
  151. finally:
  152. wrelease()
  153. # Remove references early to avoid leaking memory
  154. del obj, obj_
  155. except IndexError:
  156. pass
  157. except BaseException as e:
  158. if ignore_epipe and getattr(e, "errno", 0) == errno.EPIPE:
  159. return
  160. # Since this runs in a daemon thread the resources it uses
  161. # may be become unusable while the process is cleaning up.
  162. # We ignore errors which happen after the process has
  163. # started to cleanup.
  164. if util.is_exiting():
  165. util.info(f"error in queue thread: {e}")
  166. return
  167. else:
  168. queue_sem.release()
  169. onerror(e, obj)
  170. def _on_queue_feeder_error(self, e, obj):
  171. """
  172. Private API hook called when feeding data in the background thread
  173. raises an exception. For overriding by concurrent.futures.
  174. """
  175. import traceback
  176. traceback.print_exc()
  177. class SimpleQueue(mp_SimpleQueue):
  178. def __init__(self, reducers=None, ctx=None):
  179. super().__init__(ctx=ctx)
  180. # Add possiblity to use custom reducers
  181. self._reducers = reducers
  182. def close(self):
  183. self._reader.close()
  184. self._writer.close()
  185. # Use custom queue set/get state to be able to reduce the custom reducers
  186. def __getstate__(self):
  187. assert_spawning(self)
  188. return (
  189. self._reader,
  190. self._writer,
  191. self._reducers,
  192. self._rlock,
  193. self._wlock,
  194. )
  195. def __setstate__(self, state):
  196. (
  197. self._reader,
  198. self._writer,
  199. self._reducers,
  200. self._rlock,
  201. self._wlock,
  202. ) = state
  203. # Overload put to use our customizable reducer
  204. def put(self, obj):
  205. # serialize the data before acquiring the lock
  206. obj = dumps(obj, reducers=self._reducers)
  207. if self._wlock is None:
  208. # writes to a message oriented win32 pipe are atomic
  209. self._writer.send_bytes(obj)
  210. else:
  211. with self._wlock:
  212. self._writer.send_bytes(obj)