queues.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. # Copyright 2015 The Tornado Authors
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  4. # not use this file except in compliance with the License. You may obtain
  5. # a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  11. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  12. # License for the specific language governing permissions and limitations
  13. # under the License.
  14. """Asynchronous queues for coroutines. These classes are very similar
  15. to those provided in the standard library's `asyncio package
  16. <https://docs.python.org/3/library/asyncio-queue.html>`_.
  17. .. warning::
  18. Unlike the standard library's `queue` module, the classes defined here
  19. are *not* thread-safe. To use these queues from another thread,
  20. use `.IOLoop.add_callback` to transfer control to the `.IOLoop` thread
  21. before calling any queue methods.
  22. """
  23. import collections
  24. import datetime
  25. import heapq
  26. from tornado import gen, ioloop
  27. from tornado.concurrent import Future, future_set_result_unless_cancelled
  28. from tornado.locks import Event
  29. from typing import Union, TypeVar, Generic, Awaitable, Optional
  30. import typing
  31. if typing.TYPE_CHECKING:
  32. from typing import Deque, Tuple, Any # noqa: F401
# Type variable standing for the type of the items held by a queue.
_T = TypeVar("_T")

# Names exported by a star-import of this module.
__all__ = ["Queue", "PriorityQueue", "LifoQueue", "QueueFull", "QueueEmpty"]
  35. class QueueEmpty(Exception):
  36. """Raised by `.Queue.get_nowait` when the queue has no items."""
  37. pass
  38. class QueueFull(Exception):
  39. """Raised by `.Queue.put_nowait` when a queue is at its maximum size."""
  40. pass
  41. def _set_timeout(
  42. future: Future, timeout: Union[None, float, datetime.timedelta]
  43. ) -> None:
  44. if timeout:
  45. def on_timeout() -> None:
  46. if not future.done():
  47. future.set_exception(gen.TimeoutError())
  48. io_loop = ioloop.IOLoop.current()
  49. timeout_handle = io_loop.add_timeout(timeout, on_timeout)
  50. future.add_done_callback(lambda _: io_loop.remove_timeout(timeout_handle))
  51. class _QueueIterator(Generic[_T]):
  52. def __init__(self, q: "Queue[_T]") -> None:
  53. self.q = q
  54. def __anext__(self) -> Awaitable[_T]:
  55. return self.q.get()
  56. class Queue(Generic[_T]):
  57. """Coordinate producer and consumer coroutines.
  58. If maxsize is 0 (the default) the queue size is unbounded.
  59. .. testcode::
  60. import asyncio
  61. from tornado.ioloop import IOLoop
  62. from tornado.queues import Queue
  63. q = Queue(maxsize=2)
  64. async def consumer():
  65. async for item in q:
  66. try:
  67. print('Doing work on %s' % item)
  68. await asyncio.sleep(0.01)
  69. finally:
  70. q.task_done()
  71. async def producer():
  72. for item in range(5):
  73. await q.put(item)
  74. print('Put %s' % item)
  75. async def main():
  76. # Start consumer without waiting (since it never finishes).
  77. IOLoop.current().spawn_callback(consumer)
  78. await producer() # Wait for producer to put all tasks.
  79. await q.join() # Wait for consumer to finish all tasks.
  80. print('Done')
  81. asyncio.run(main())
  82. .. testoutput::
  83. Put 0
  84. Put 1
  85. Doing work on 0
  86. Put 2
  87. Doing work on 1
  88. Put 3
  89. Doing work on 2
  90. Put 4
  91. Doing work on 3
  92. Doing work on 4
  93. Done
  94. In versions of Python without native coroutines (before 3.5),
  95. ``consumer()`` could be written as::
  96. @gen.coroutine
  97. def consumer():
  98. while True:
  99. item = yield q.get()
  100. try:
  101. print('Doing work on %s' % item)
  102. yield gen.sleep(0.01)
  103. finally:
  104. q.task_done()
  105. .. versionchanged:: 4.3
  106. Added ``async for`` support in Python 3.5.
  107. """
  108. # Exact type depends on subclass. Could be another generic
  109. # parameter and use protocols to be more precise here.
  110. _queue = None # type: Any
  111. def __init__(self, maxsize: int = 0) -> None:
  112. if maxsize is None:
  113. raise TypeError("maxsize can't be None")
  114. if maxsize < 0:
  115. raise ValueError("maxsize can't be negative")
  116. self._maxsize = maxsize
  117. self._init()
  118. self._getters = collections.deque([]) # type: Deque[Future[_T]]
  119. self._putters = collections.deque([]) # type: Deque[Tuple[_T, Future[None]]]
  120. self._unfinished_tasks = 0
  121. self._finished = Event()
  122. self._finished.set()
  123. @property
  124. def maxsize(self) -> int:
  125. """Number of items allowed in the queue."""
  126. return self._maxsize
  127. def qsize(self) -> int:
  128. """Number of items in the queue."""
  129. return len(self._queue)
  130. def empty(self) -> bool:
  131. return not self._queue
  132. def full(self) -> bool:
  133. if self.maxsize == 0:
  134. return False
  135. else:
  136. return self.qsize() >= self.maxsize
  137. def put(
  138. self, item: _T, timeout: Optional[Union[float, datetime.timedelta]] = None
  139. ) -> "Future[None]":
  140. """Put an item into the queue, perhaps waiting until there is room.
  141. Returns a Future, which raises `tornado.util.TimeoutError` after a
  142. timeout.
  143. ``timeout`` may be a number denoting a time (on the same
  144. scale as `tornado.ioloop.IOLoop.time`, normally `time.time`), or a
  145. `datetime.timedelta` object for a deadline relative to the
  146. current time.
  147. """
  148. future = Future() # type: Future[None]
  149. try:
  150. self.put_nowait(item)
  151. except QueueFull:
  152. self._putters.append((item, future))
  153. _set_timeout(future, timeout)
  154. else:
  155. future.set_result(None)
  156. return future
  157. def put_nowait(self, item: _T) -> None:
  158. """Put an item into the queue without blocking.
  159. If no free slot is immediately available, raise `QueueFull`.
  160. """
  161. self._consume_expired()
  162. if self._getters:
  163. assert self.empty(), "queue non-empty, why are getters waiting?"
  164. getter = self._getters.popleft()
  165. self.__put_internal(item)
  166. future_set_result_unless_cancelled(getter, self._get())
  167. elif self.full():
  168. raise QueueFull
  169. else:
  170. self.__put_internal(item)
  171. def get(
  172. self, timeout: Optional[Union[float, datetime.timedelta]] = None
  173. ) -> Awaitable[_T]:
  174. """Remove and return an item from the queue.
  175. Returns an awaitable which resolves once an item is available, or raises
  176. `tornado.util.TimeoutError` after a timeout.
  177. ``timeout`` may be a number denoting a time (on the same
  178. scale as `tornado.ioloop.IOLoop.time`, normally `time.time`), or a
  179. `datetime.timedelta` object for a deadline relative to the
  180. current time.
  181. .. note::
  182. The ``timeout`` argument of this method differs from that
  183. of the standard library's `queue.Queue.get`. That method
  184. interprets numeric values as relative timeouts; this one
  185. interprets them as absolute deadlines and requires
  186. ``timedelta`` objects for relative timeouts (consistent
  187. with other timeouts in Tornado).
  188. """
  189. future = Future() # type: Future[_T]
  190. try:
  191. future.set_result(self.get_nowait())
  192. except QueueEmpty:
  193. self._getters.append(future)
  194. _set_timeout(future, timeout)
  195. return future
  196. def get_nowait(self) -> _T:
  197. """Remove and return an item from the queue without blocking.
  198. Return an item if one is immediately available, else raise
  199. `QueueEmpty`.
  200. """
  201. self._consume_expired()
  202. if self._putters:
  203. assert self.full(), "queue not full, why are putters waiting?"
  204. item, putter = self._putters.popleft()
  205. self.__put_internal(item)
  206. future_set_result_unless_cancelled(putter, None)
  207. return self._get()
  208. elif self.qsize():
  209. return self._get()
  210. else:
  211. raise QueueEmpty
  212. def task_done(self) -> None:
  213. """Indicate that a formerly enqueued task is complete.
  214. Used by queue consumers. For each `.get` used to fetch a task, a
  215. subsequent call to `.task_done` tells the queue that the processing
  216. on the task is complete.
  217. If a `.join` is blocking, it resumes when all items have been
  218. processed; that is, when every `.put` is matched by a `.task_done`.
  219. Raises `ValueError` if called more times than `.put`.
  220. """
  221. if self._unfinished_tasks <= 0:
  222. raise ValueError("task_done() called too many times")
  223. self._unfinished_tasks -= 1
  224. if self._unfinished_tasks == 0:
  225. self._finished.set()
  226. def join(
  227. self, timeout: Optional[Union[float, datetime.timedelta]] = None
  228. ) -> Awaitable[None]:
  229. """Block until all items in the queue are processed.
  230. Returns an awaitable, which raises `tornado.util.TimeoutError` after a
  231. timeout.
  232. """
  233. return self._finished.wait(timeout)
  234. def __aiter__(self) -> _QueueIterator[_T]:
  235. return _QueueIterator(self)
  236. # These three are overridable in subclasses.
  237. def _init(self) -> None:
  238. self._queue = collections.deque()
  239. def _get(self) -> _T:
  240. return self._queue.popleft()
  241. def _put(self, item: _T) -> None:
  242. self._queue.append(item)
  243. # End of the overridable methods.
  244. def __put_internal(self, item: _T) -> None:
  245. self._unfinished_tasks += 1
  246. self._finished.clear()
  247. self._put(item)
  248. def _consume_expired(self) -> None:
  249. # Remove timed-out waiters.
  250. while self._putters and self._putters[0][1].done():
  251. self._putters.popleft()
  252. while self._getters and self._getters[0].done():
  253. self._getters.popleft()
  254. def __repr__(self) -> str:
  255. return f"<{type(self).__name__} at {hex(id(self))} {self._format()}>"
  256. def __str__(self) -> str:
  257. return f"<{type(self).__name__} {self._format()}>"
  258. def _format(self) -> str:
  259. result = f"maxsize={self.maxsize!r}"
  260. if getattr(self, "_queue", None):
  261. result += " queue=%r" % self._queue
  262. if self._getters:
  263. result += " getters[%s]" % len(self._getters)
  264. if self._putters:
  265. result += " putters[%s]" % len(self._putters)
  266. if self._unfinished_tasks:
  267. result += " tasks=%s" % self._unfinished_tasks
  268. return result
  269. class PriorityQueue(Queue):
  270. """A `.Queue` that retrieves entries in priority order, lowest first.
  271. Entries are typically tuples like ``(priority number, data)``.
  272. .. testcode::
  273. import asyncio
  274. from tornado.queues import PriorityQueue
  275. async def main():
  276. q = PriorityQueue()
  277. q.put((1, 'medium-priority item'))
  278. q.put((0, 'high-priority item'))
  279. q.put((10, 'low-priority item'))
  280. print(await q.get())
  281. print(await q.get())
  282. print(await q.get())
  283. asyncio.run(main())
  284. .. testoutput::
  285. (0, 'high-priority item')
  286. (1, 'medium-priority item')
  287. (10, 'low-priority item')
  288. """
  289. def _init(self) -> None:
  290. self._queue = []
  291. def _put(self, item: _T) -> None:
  292. heapq.heappush(self._queue, item)
  293. def _get(self) -> _T: # type: ignore[type-var]
  294. return heapq.heappop(self._queue)
  295. class LifoQueue(Queue):
  296. """A `.Queue` that retrieves the most recently put items first.
  297. .. testcode::
  298. import asyncio
  299. from tornado.queues import LifoQueue
  300. async def main():
  301. q = LifoQueue()
  302. q.put(3)
  303. q.put(2)
  304. q.put(1)
  305. print(await q.get())
  306. print(await q.get())
  307. print(await q.get())
  308. asyncio.run(main())
  309. .. testoutput::
  310. 1
  311. 2
  312. 3
  313. """
  314. def _init(self) -> None:
  315. self._queue = []
  316. def _put(self, item: _T) -> None:
  317. self._queue.append(item)
  318. def _get(self) -> _T: # type: ignore[type-var]
  319. return self._queue.pop()