synchronize.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. ###############################################################################
  2. # Synchronization primitives based on our SemLock implementation
  3. #
  4. # author: Thomas Moreau and Olivier Grisel
  5. #
  6. # adapted from multiprocessing/synchronize.py (17/02/2017)
  7. # * Remove ctx argument for compatibility reason
  8. # * Registers a cleanup function with the loky resource_tracker to remove the
  9. # semaphore when the process dies instead.
  10. #
  11. # TODO: investigate which Python version is required to be able to use
  12. # multiprocessing.resource_tracker and therefore multiprocessing.synchronize
  13. # instead of a loky-specific fork.
  14. import os
  15. import sys
  16. import tempfile
  17. import threading
  18. import _multiprocessing
  19. from time import time as _time
  20. from multiprocessing import process, util
  21. from multiprocessing.context import assert_spawning
  22. from . import resource_tracker
  23. __all__ = [
  24. "Lock",
  25. "RLock",
  26. "Semaphore",
  27. "BoundedSemaphore",
  28. "Condition",
  29. "Event",
  30. ]
  31. # Try to import the mp.synchronize module cleanly, if it fails
  32. # raise ImportError for platforms lacking a working sem_open implementation.
  33. # See issue 3770
  34. try:
  35. from _multiprocessing import SemLock as _SemLock
  36. from _multiprocessing import sem_unlink
  37. except ImportError:
  38. raise ImportError(
  39. "This platform lacks a functioning sem_open"
  40. " implementation, therefore, the required"
  41. " synchronization primitives needed will not"
  42. " function, see issue 3770."
  43. )
  44. #
  45. # Constants
  46. #
  47. RECURSIVE_MUTEX, SEMAPHORE = range(2)
  48. SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
  49. #
  50. # Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
  51. #
  52. class SemLock:
  53. _rand = tempfile._RandomNameSequence()
  54. def __init__(self, kind, value, maxvalue, name=None):
  55. # unlink_now is only used on win32 or when we are using fork.
  56. unlink_now = False
  57. if name is None:
  58. # Try to find an unused name for the SemLock instance.
  59. for _ in range(100):
  60. try:
  61. self._semlock = _SemLock(
  62. kind, value, maxvalue, SemLock._make_name(), unlink_now
  63. )
  64. except FileExistsError: # pragma: no cover
  65. pass
  66. else:
  67. break
  68. else: # pragma: no cover
  69. raise FileExistsError("cannot find name for semaphore")
  70. else:
  71. self._semlock = _SemLock(kind, value, maxvalue, name, unlink_now)
  72. self.name = name
  73. util.debug(
  74. f"created semlock with handle {self._semlock.handle} and name "
  75. f'"{self.name}"'
  76. )
  77. self._make_methods()
  78. def _after_fork(obj):
  79. obj._semlock._after_fork()
  80. util.register_after_fork(self, _after_fork)
  81. # When the object is garbage collected or the
  82. # process shuts down we unlink the semaphore name
  83. resource_tracker.register(self._semlock.name, "semlock")
  84. util.Finalize(
  85. self, SemLock._cleanup, (self._semlock.name,), exitpriority=0
  86. )
  87. @staticmethod
  88. def _cleanup(name):
  89. try:
  90. sem_unlink(name)
  91. except FileNotFoundError:
  92. # Already unlinked, possibly by user code: ignore and make sure to
  93. # unregister the semaphore from the resource tracker.
  94. pass
  95. finally:
  96. resource_tracker.unregister(name, "semlock")
  97. def _make_methods(self):
  98. self.acquire = self._semlock.acquire
  99. self.release = self._semlock.release
  100. def __enter__(self):
  101. return self._semlock.acquire()
  102. def __exit__(self, *args):
  103. return self._semlock.release()
  104. def __getstate__(self):
  105. assert_spawning(self)
  106. sl = self._semlock
  107. h = sl.handle
  108. return (h, sl.kind, sl.maxvalue, sl.name)
  109. def __setstate__(self, state):
  110. self._semlock = _SemLock._rebuild(*state)
  111. util.debug(
  112. f'recreated blocker with handle {state[0]!r} and name "{state[3]}"'
  113. )
  114. self._make_methods()
  115. @staticmethod
  116. def _make_name():
  117. # OSX does not support long names for semaphores
  118. return f"/loky-{os.getpid()}-{next(SemLock._rand)}"
  119. #
  120. # Semaphore
  121. #
  122. class Semaphore(SemLock):
  123. def __init__(self, value=1):
  124. SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX)
  125. def get_value(self):
  126. if sys.platform == "darwin":
  127. raise NotImplementedError("OSX does not implement sem_getvalue")
  128. return self._semlock._get_value()
  129. def __repr__(self):
  130. try:
  131. value = self._semlock._get_value()
  132. except Exception:
  133. value = "unknown"
  134. return f"<{self.__class__.__name__}(value={value})>"
  135. #
  136. # Bounded semaphore
  137. #
  138. class BoundedSemaphore(Semaphore):
  139. def __init__(self, value=1):
  140. SemLock.__init__(self, SEMAPHORE, value, value)
  141. def __repr__(self):
  142. try:
  143. value = self._semlock._get_value()
  144. except Exception:
  145. value = "unknown"
  146. return (
  147. f"<{self.__class__.__name__}(value={value}, "
  148. f"maxvalue={self._semlock.maxvalue})>"
  149. )
  150. #
  151. # Non-recursive lock
  152. #
  153. class Lock(SemLock):
  154. def __init__(self):
  155. super().__init__(SEMAPHORE, 1, 1)
  156. def __repr__(self):
  157. try:
  158. if self._semlock._is_mine():
  159. name = process.current_process().name
  160. if threading.current_thread().name != "MainThread":
  161. name = f"{name}|{threading.current_thread().name}"
  162. elif self._semlock._get_value() == 1:
  163. name = "None"
  164. elif self._semlock._count() > 0:
  165. name = "SomeOtherThread"
  166. else:
  167. name = "SomeOtherProcess"
  168. except Exception:
  169. name = "unknown"
  170. return f"<{self.__class__.__name__}(owner={name})>"
  171. #
  172. # Recursive lock
  173. #
  174. class RLock(SemLock):
  175. def __init__(self):
  176. super().__init__(RECURSIVE_MUTEX, 1, 1)
  177. def __repr__(self):
  178. try:
  179. if self._semlock._is_mine():
  180. name = process.current_process().name
  181. if threading.current_thread().name != "MainThread":
  182. name = f"{name}|{threading.current_thread().name}"
  183. count = self._semlock._count()
  184. elif self._semlock._get_value() == 1:
  185. name, count = "None", 0
  186. elif self._semlock._count() > 0:
  187. name, count = "SomeOtherThread", "nonzero"
  188. else:
  189. name, count = "SomeOtherProcess", "nonzero"
  190. except Exception:
  191. name, count = "unknown", "unknown"
  192. return f"<{self.__class__.__name__}({name}, {count})>"
  193. #
  194. # Condition variable
  195. #
  196. class Condition:
  197. def __init__(self, lock=None):
  198. self._lock = lock or RLock()
  199. self._sleeping_count = Semaphore(0)
  200. self._woken_count = Semaphore(0)
  201. self._wait_semaphore = Semaphore(0)
  202. self._make_methods()
  203. def __getstate__(self):
  204. assert_spawning(self)
  205. return (
  206. self._lock,
  207. self._sleeping_count,
  208. self._woken_count,
  209. self._wait_semaphore,
  210. )
  211. def __setstate__(self, state):
  212. (
  213. self._lock,
  214. self._sleeping_count,
  215. self._woken_count,
  216. self._wait_semaphore,
  217. ) = state
  218. self._make_methods()
  219. def __enter__(self):
  220. return self._lock.__enter__()
  221. def __exit__(self, *args):
  222. return self._lock.__exit__(*args)
  223. def _make_methods(self):
  224. self.acquire = self._lock.acquire
  225. self.release = self._lock.release
  226. def __repr__(self):
  227. try:
  228. num_waiters = (
  229. self._sleeping_count._semlock._get_value()
  230. - self._woken_count._semlock._get_value()
  231. )
  232. except Exception:
  233. num_waiters = "unknown"
  234. return f"<{self.__class__.__name__}({self._lock}, {num_waiters})>"
  235. def wait(self, timeout=None):
  236. assert (
  237. self._lock._semlock._is_mine()
  238. ), "must acquire() condition before using wait()"
  239. # indicate that this thread is going to sleep
  240. self._sleeping_count.release()
  241. # release lock
  242. count = self._lock._semlock._count()
  243. for _ in range(count):
  244. self._lock.release()
  245. try:
  246. # wait for notification or timeout
  247. return self._wait_semaphore.acquire(True, timeout)
  248. finally:
  249. # indicate that this thread has woken
  250. self._woken_count.release()
  251. # reacquire lock
  252. for _ in range(count):
  253. self._lock.acquire()
  254. def notify(self):
  255. assert self._lock._semlock._is_mine(), "lock is not owned"
  256. assert not self._wait_semaphore.acquire(False)
  257. # to take account of timeouts since last notify() we subtract
  258. # woken_count from sleeping_count and rezero woken_count
  259. while self._woken_count.acquire(False):
  260. res = self._sleeping_count.acquire(False)
  261. assert res
  262. if self._sleeping_count.acquire(False): # try grabbing a sleeper
  263. self._wait_semaphore.release() # wake up one sleeper
  264. self._woken_count.acquire() # wait for the sleeper to wake
  265. # rezero _wait_semaphore in case a timeout just happened
  266. self._wait_semaphore.acquire(False)
  267. def notify_all(self):
  268. assert self._lock._semlock._is_mine(), "lock is not owned"
  269. assert not self._wait_semaphore.acquire(False)
  270. # to take account of timeouts since last notify*() we subtract
  271. # woken_count from sleeping_count and rezero woken_count
  272. while self._woken_count.acquire(False):
  273. res = self._sleeping_count.acquire(False)
  274. assert res
  275. sleepers = 0
  276. while self._sleeping_count.acquire(False):
  277. self._wait_semaphore.release() # wake up one sleeper
  278. sleepers += 1
  279. if sleepers:
  280. for _ in range(sleepers):
  281. self._woken_count.acquire() # wait for a sleeper to wake
  282. # rezero wait_semaphore in case some timeouts just happened
  283. while self._wait_semaphore.acquire(False):
  284. pass
  285. def wait_for(self, predicate, timeout=None):
  286. result = predicate()
  287. if result:
  288. return result
  289. if timeout is not None:
  290. endtime = _time() + timeout
  291. else:
  292. endtime = None
  293. waittime = None
  294. while not result:
  295. if endtime is not None:
  296. waittime = endtime - _time()
  297. if waittime <= 0:
  298. break
  299. self.wait(waittime)
  300. result = predicate()
  301. return result
  302. #
  303. # Event
  304. #
  305. class Event:
  306. def __init__(self):
  307. self._cond = Condition(Lock())
  308. self._flag = Semaphore(0)
  309. def is_set(self):
  310. with self._cond:
  311. if self._flag.acquire(False):
  312. self._flag.release()
  313. return True
  314. return False
  315. def set(self):
  316. with self._cond:
  317. self._flag.acquire(False)
  318. self._flag.release()
  319. self._cond.notify_all()
  320. def clear(self):
  321. with self._cond:
  322. self._flag.acquire(False)
  323. def wait(self, timeout=None):
  324. with self._cond:
  325. if self._flag.acquire(False):
  326. self._flag.release()
  327. else:
  328. self._cond.wait(timeout)
  329. if self._flag.acquire(False):
  330. self._flag.release()
  331. return True
  332. return False