locks.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570
  1. # Copyright 2015 The Tornado Authors
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  4. # not use this file except in compliance with the License. You may obtain
  5. # a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  11. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  12. # License for the specific language governing permissions and limitations
  13. # under the License.
  14. import collections
  15. import datetime
  16. import types
  17. from tornado import gen, ioloop
  18. from tornado.concurrent import Future, future_set_result_unless_cancelled
  19. from typing import Union, Optional, Type, Any, Awaitable
  20. import typing
  21. if typing.TYPE_CHECKING:
  22. from typing import Deque, Set # noqa: F401
  23. __all__ = ["Condition", "Event", "Semaphore", "BoundedSemaphore", "Lock"]
  24. class _TimeoutGarbageCollector:
  25. """Base class for objects that periodically clean up timed-out waiters.
  26. Avoids memory leak in a common pattern like:
  27. while True:
  28. yield condition.wait(short_timeout)
  29. print('looping....')
  30. """
  31. def __init__(self) -> None:
  32. self._waiters = collections.deque() # type: Deque[Future]
  33. self._timeouts = 0
  34. def _garbage_collect(self) -> None:
  35. # Occasionally clear timed-out waiters.
  36. self._timeouts += 1
  37. if self._timeouts > 100:
  38. self._timeouts = 0
  39. self._waiters = collections.deque(w for w in self._waiters if not w.done())
  40. class Condition(_TimeoutGarbageCollector):
  41. """A condition allows one or more coroutines to wait until notified.
  42. Like a standard `threading.Condition`, but does not need an underlying lock
  43. that is acquired and released.
  44. With a `Condition`, coroutines can wait to be notified by other coroutines:
  45. .. testcode::
  46. import asyncio
  47. from tornado import gen
  48. from tornado.locks import Condition
  49. condition = Condition()
  50. async def waiter():
  51. print("I'll wait right here")
  52. await condition.wait()
  53. print("I'm done waiting")
  54. async def notifier():
  55. print("About to notify")
  56. condition.notify()
  57. print("Done notifying")
  58. async def runner():
  59. # Wait for waiter() and notifier() in parallel
  60. await gen.multi([waiter(), notifier()])
  61. asyncio.run(runner())
  62. .. testoutput::
  63. I'll wait right here
  64. About to notify
  65. Done notifying
  66. I'm done waiting
  67. `wait` takes an optional ``timeout`` argument, which is either an absolute
  68. timestamp::
  69. io_loop = IOLoop.current()
  70. # Wait up to 1 second for a notification.
  71. await condition.wait(timeout=io_loop.time() + 1)
  72. ...or a `datetime.timedelta` for a timeout relative to the current time::
  73. # Wait up to 1 second.
  74. await condition.wait(timeout=datetime.timedelta(seconds=1))
  75. The method returns False if there's no notification before the deadline.
  76. .. versionchanged:: 5.0
  77. Previously, waiters could be notified synchronously from within
  78. `notify`. Now, the notification will always be received on the
  79. next iteration of the `.IOLoop`.
  80. """
  81. def __repr__(self) -> str:
  82. result = f"<{self.__class__.__name__}"
  83. if self._waiters:
  84. result += " waiters[%s]" % len(self._waiters)
  85. return result + ">"
  86. def wait(
  87. self, timeout: Optional[Union[float, datetime.timedelta]] = None
  88. ) -> Awaitable[bool]:
  89. """Wait for `.notify`.
  90. Returns a `.Future` that resolves ``True`` if the condition is notified,
  91. or ``False`` after a timeout.
  92. """
  93. waiter = Future() # type: Future[bool]
  94. self._waiters.append(waiter)
  95. if timeout:
  96. def on_timeout() -> None:
  97. if not waiter.done():
  98. future_set_result_unless_cancelled(waiter, False)
  99. self._garbage_collect()
  100. io_loop = ioloop.IOLoop.current()
  101. timeout_handle = io_loop.add_timeout(timeout, on_timeout)
  102. waiter.add_done_callback(lambda _: io_loop.remove_timeout(timeout_handle))
  103. return waiter
  104. def notify(self, n: int = 1) -> None:
  105. """Wake ``n`` waiters."""
  106. waiters = [] # Waiters we plan to run right now.
  107. while n and self._waiters:
  108. waiter = self._waiters.popleft()
  109. if not waiter.done(): # Might have timed out.
  110. n -= 1
  111. waiters.append(waiter)
  112. for waiter in waiters:
  113. future_set_result_unless_cancelled(waiter, True)
  114. def notify_all(self) -> None:
  115. """Wake all waiters."""
  116. self.notify(len(self._waiters))
  117. class Event:
  118. """An event blocks coroutines until its internal flag is set to True.
  119. Similar to `threading.Event`.
  120. A coroutine can wait for an event to be set. Once it is set, calls to
  121. ``yield event.wait()`` will not block unless the event has been cleared:
  122. .. testcode::
  123. import asyncio
  124. from tornado import gen
  125. from tornado.locks import Event
  126. event = Event()
  127. async def waiter():
  128. print("Waiting for event")
  129. await event.wait()
  130. print("Not waiting this time")
  131. await event.wait()
  132. print("Done")
  133. async def setter():
  134. print("About to set the event")
  135. event.set()
  136. async def runner():
  137. await gen.multi([waiter(), setter()])
  138. asyncio.run(runner())
  139. .. testoutput::
  140. Waiting for event
  141. About to set the event
  142. Not waiting this time
  143. Done
  144. """
  145. def __init__(self) -> None:
  146. self._value = False
  147. self._waiters = set() # type: Set[Future[None]]
  148. def __repr__(self) -> str:
  149. return "<{} {}>".format(
  150. self.__class__.__name__,
  151. "set" if self.is_set() else "clear",
  152. )
  153. def is_set(self) -> bool:
  154. """Return ``True`` if the internal flag is true."""
  155. return self._value
  156. def set(self) -> None:
  157. """Set the internal flag to ``True``. All waiters are awakened.
  158. Calling `.wait` once the flag is set will not block.
  159. """
  160. if not self._value:
  161. self._value = True
  162. for fut in self._waiters:
  163. if not fut.done():
  164. fut.set_result(None)
  165. def clear(self) -> None:
  166. """Reset the internal flag to ``False``.
  167. Calls to `.wait` will block until `.set` is called.
  168. """
  169. self._value = False
  170. def wait(
  171. self, timeout: Optional[Union[float, datetime.timedelta]] = None
  172. ) -> Awaitable[None]:
  173. """Block until the internal flag is true.
  174. Returns an awaitable, which raises `tornado.util.TimeoutError` after a
  175. timeout.
  176. """
  177. fut = Future() # type: Future[None]
  178. if self._value:
  179. fut.set_result(None)
  180. return fut
  181. self._waiters.add(fut)
  182. fut.add_done_callback(lambda fut: self._waiters.remove(fut))
  183. if timeout is None:
  184. return fut
  185. else:
  186. timeout_fut = gen.with_timeout(timeout, fut)
  187. # This is a slightly clumsy workaround for the fact that
  188. # gen.with_timeout doesn't cancel its futures. Cancelling
  189. # fut will remove it from the waiters list.
  190. timeout_fut.add_done_callback(
  191. lambda tf: fut.cancel() if not fut.done() else None
  192. )
  193. return timeout_fut
  194. class _ReleasingContextManager:
  195. """Releases a Lock or Semaphore at the end of a "with" statement.
  196. with (yield semaphore.acquire()):
  197. pass
  198. # Now semaphore.release() has been called.
  199. """
  200. def __init__(self, obj: Any) -> None:
  201. self._obj = obj
  202. def __enter__(self) -> None:
  203. pass
  204. def __exit__(
  205. self,
  206. exc_type: "Optional[Type[BaseException]]",
  207. exc_val: Optional[BaseException],
  208. exc_tb: Optional[types.TracebackType],
  209. ) -> None:
  210. self._obj.release()
  211. class Semaphore(_TimeoutGarbageCollector):
  212. """A lock that can be acquired a fixed number of times before blocking.
  213. A Semaphore manages a counter representing the number of `.release` calls
  214. minus the number of `.acquire` calls, plus an initial value. The `.acquire`
  215. method blocks if necessary until it can return without making the counter
  216. negative.
  217. Semaphores limit access to a shared resource. To allow access for two
  218. workers at a time:
  219. .. testsetup:: semaphore
  220. from collections import deque
  221. from tornado import gen
  222. from tornado.ioloop import IOLoop
  223. from tornado.concurrent import Future
  224. inited = False
  225. async def simulator(futures):
  226. for f in futures:
  227. # simulate the asynchronous passage of time
  228. await gen.sleep(0)
  229. await gen.sleep(0)
  230. f.set_result(None)
  231. def use_some_resource():
  232. global inited
  233. global futures_q
  234. if not inited:
  235. inited = True
  236. # Ensure reliable doctest output: resolve Futures one at a time.
  237. futures_q = deque([Future() for _ in range(3)])
  238. IOLoop.current().add_callback(simulator, list(futures_q))
  239. return futures_q.popleft()
  240. .. testcode:: semaphore
  241. import asyncio
  242. from tornado import gen
  243. from tornado.locks import Semaphore
  244. sem = Semaphore(2)
  245. async def worker(worker_id):
  246. await sem.acquire()
  247. try:
  248. print("Worker %d is working" % worker_id)
  249. await use_some_resource()
  250. finally:
  251. print("Worker %d is done" % worker_id)
  252. sem.release()
  253. async def runner():
  254. # Join all workers.
  255. await gen.multi([worker(i) for i in range(3)])
  256. asyncio.run(runner())
  257. .. testoutput:: semaphore
  258. Worker 0 is working
  259. Worker 1 is working
  260. Worker 0 is done
  261. Worker 2 is working
  262. Worker 1 is done
  263. Worker 2 is done
  264. Workers 0 and 1 are allowed to run concurrently, but worker 2 waits until
  265. the semaphore has been released once, by worker 0.
  266. The semaphore can be used as an async context manager::
  267. async def worker(worker_id):
  268. async with sem:
  269. print("Worker %d is working" % worker_id)
  270. await use_some_resource()
  271. # Now the semaphore has been released.
  272. print("Worker %d is done" % worker_id)
  273. For compatibility with older versions of Python, `.acquire` is a
  274. context manager, so ``worker`` could also be written as::
  275. @gen.coroutine
  276. def worker(worker_id):
  277. with (yield sem.acquire()):
  278. print("Worker %d is working" % worker_id)
  279. yield use_some_resource()
  280. # Now the semaphore has been released.
  281. print("Worker %d is done" % worker_id)
  282. .. versionchanged:: 4.3
  283. Added ``async with`` support in Python 3.5.
  284. """
  285. def __init__(self, value: int = 1) -> None:
  286. super().__init__()
  287. if value < 0:
  288. raise ValueError("semaphore initial value must be >= 0")
  289. self._value = value
  290. def __repr__(self) -> str:
  291. res = super().__repr__()
  292. extra = "locked" if self._value == 0 else f"unlocked,value:{self._value}"
  293. if self._waiters:
  294. extra = f"{extra},waiters:{len(self._waiters)}"
  295. return f"<{res[1:-1]} [{extra}]>"
  296. def release(self) -> None:
  297. """Increment the counter and wake one waiter."""
  298. self._value += 1
  299. while self._waiters:
  300. waiter = self._waiters.popleft()
  301. if not waiter.done():
  302. self._value -= 1
  303. # If the waiter is a coroutine paused at
  304. #
  305. # with (yield semaphore.acquire()):
  306. #
  307. # then the context manager's __exit__ calls release() at the end
  308. # of the "with" block.
  309. waiter.set_result(_ReleasingContextManager(self))
  310. break
  311. def acquire(
  312. self, timeout: Optional[Union[float, datetime.timedelta]] = None
  313. ) -> Awaitable[_ReleasingContextManager]:
  314. """Decrement the counter. Returns an awaitable.
  315. Block if the counter is zero and wait for a `.release`. The awaitable
  316. raises `.TimeoutError` after the deadline.
  317. """
  318. waiter = Future() # type: Future[_ReleasingContextManager]
  319. if self._value > 0:
  320. self._value -= 1
  321. waiter.set_result(_ReleasingContextManager(self))
  322. else:
  323. self._waiters.append(waiter)
  324. if timeout:
  325. def on_timeout() -> None:
  326. if not waiter.done():
  327. waiter.set_exception(gen.TimeoutError())
  328. self._garbage_collect()
  329. io_loop = ioloop.IOLoop.current()
  330. timeout_handle = io_loop.add_timeout(timeout, on_timeout)
  331. waiter.add_done_callback(
  332. lambda _: io_loop.remove_timeout(timeout_handle)
  333. )
  334. return waiter
  335. def __enter__(self) -> None:
  336. raise RuntimeError("Use 'async with' instead of 'with' for Semaphore")
  337. def __exit__(
  338. self,
  339. typ: "Optional[Type[BaseException]]",
  340. value: Optional[BaseException],
  341. traceback: Optional[types.TracebackType],
  342. ) -> None:
  343. self.__enter__()
  344. async def __aenter__(self) -> None:
  345. await self.acquire()
  346. async def __aexit__(
  347. self,
  348. typ: "Optional[Type[BaseException]]",
  349. value: Optional[BaseException],
  350. tb: Optional[types.TracebackType],
  351. ) -> None:
  352. self.release()
  353. class BoundedSemaphore(Semaphore):
  354. """A semaphore that prevents release() being called too many times.
  355. If `.release` would increment the semaphore's value past the initial
  356. value, it raises `ValueError`. Semaphores are mostly used to guard
  357. resources with limited capacity, so a semaphore released too many times
  358. is a sign of a bug.
  359. """
  360. def __init__(self, value: int = 1) -> None:
  361. super().__init__(value=value)
  362. self._initial_value = value
  363. def release(self) -> None:
  364. """Increment the counter and wake one waiter."""
  365. if self._value >= self._initial_value:
  366. raise ValueError("Semaphore released too many times")
  367. super().release()
  368. class Lock:
  369. """A lock for coroutines.
  370. A Lock begins unlocked, and `acquire` locks it immediately. While it is
  371. locked, a coroutine that yields `acquire` waits until another coroutine
  372. calls `release`.
  373. Releasing an unlocked lock raises `RuntimeError`.
  374. A Lock can be used as an async context manager with the ``async
  375. with`` statement:
  376. >>> from tornado import locks
  377. >>> lock = locks.Lock()
  378. >>>
  379. >>> async def f():
  380. ... async with lock:
  381. ... # Do something holding the lock.
  382. ... pass
  383. ...
  384. ... # Now the lock is released.
  385. For compatibility with older versions of Python, the `.acquire`
  386. method asynchronously returns a regular context manager:
  387. >>> async def f2():
  388. ... with (yield lock.acquire()):
  389. ... # Do something holding the lock.
  390. ... pass
  391. ...
  392. ... # Now the lock is released.
  393. .. versionchanged:: 4.3
  394. Added ``async with`` support in Python 3.5.
  395. """
  396. def __init__(self) -> None:
  397. self._block = BoundedSemaphore(value=1)
  398. def __repr__(self) -> str:
  399. return f"<{self.__class__.__name__} _block={self._block}>"
  400. def acquire(
  401. self, timeout: Optional[Union[float, datetime.timedelta]] = None
  402. ) -> Awaitable[_ReleasingContextManager]:
  403. """Attempt to lock. Returns an awaitable.
  404. Returns an awaitable, which raises `tornado.util.TimeoutError` after a
  405. timeout.
  406. """
  407. return self._block.acquire(timeout)
  408. def release(self) -> None:
  409. """Unlock.
  410. The first coroutine in line waiting for `acquire` gets the lock.
  411. If not locked, raise a `RuntimeError`.
  412. """
  413. try:
  414. self._block.release()
  415. except ValueError:
  416. raise RuntimeError("release unlocked lock")
  417. def __enter__(self) -> None:
  418. raise RuntimeError("Use `async with` instead of `with` for Lock")
  419. def __exit__(
  420. self,
  421. typ: "Optional[Type[BaseException]]",
  422. value: Optional[BaseException],
  423. tb: Optional[types.TracebackType],
  424. ) -> None:
  425. self.__enter__()
  426. async def __aenter__(self) -> None:
  427. await self.acquire()
  428. async def __aexit__(
  429. self,
  430. typ: "Optional[Type[BaseException]]",
  431. value: Optional[BaseException],
  432. tb: Optional[types.TracebackType],
  433. ) -> None:
  434. self.release()