queues_test.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  2. # not use this file except in compliance with the License. You may obtain
  3. # a copy of the License at
  4. #
  5. # http://www.apache.org/licenses/LICENSE-2.0
  6. #
  7. # Unless required by applicable law or agreed to in writing, software
  8. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  9. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  10. # License for the specific language governing permissions and limitations
  11. # under the License.
  12. import asyncio
  13. from datetime import timedelta
  14. from random import random
  15. import unittest
  16. from tornado import gen, queues
  17. from tornado.gen import TimeoutError
  18. from tornado.testing import gen_test, AsyncTestCase
  19. class QueueBasicTest(AsyncTestCase):
  20. def test_repr_and_str(self):
  21. q = queues.Queue(maxsize=1) # type: queues.Queue[None]
  22. self.assertIn(hex(id(q)), repr(q))
  23. self.assertNotIn(hex(id(q)), str(q))
  24. q.get()
  25. for q_str in repr(q), str(q):
  26. self.assertTrue(q_str.startswith("<Queue"))
  27. self.assertIn("maxsize=1", q_str)
  28. self.assertIn("getters[1]", q_str)
  29. self.assertNotIn("putters", q_str)
  30. self.assertNotIn("tasks", q_str)
  31. q.put(None)
  32. q.put(None)
  33. # Now the queue is full, this putter blocks.
  34. q.put(None)
  35. for q_str in repr(q), str(q):
  36. self.assertNotIn("getters", q_str)
  37. self.assertIn("putters[1]", q_str)
  38. self.assertIn("tasks=2", q_str)
  39. def test_order(self):
  40. q = queues.Queue() # type: queues.Queue[int]
  41. for i in [1, 3, 2]:
  42. q.put_nowait(i)
  43. items = [q.get_nowait() for _ in range(3)]
  44. self.assertEqual([1, 3, 2], items)
  45. @gen_test
  46. def test_maxsize(self):
  47. self.assertRaises(TypeError, queues.Queue, maxsize=None)
  48. self.assertRaises(ValueError, queues.Queue, maxsize=-1)
  49. q = queues.Queue(maxsize=2) # type: queues.Queue[int]
  50. self.assertTrue(q.empty())
  51. self.assertFalse(q.full())
  52. self.assertEqual(2, q.maxsize)
  53. self.assertTrue(q.put(0).done())
  54. self.assertTrue(q.put(1).done())
  55. self.assertFalse(q.empty())
  56. self.assertTrue(q.full())
  57. put2 = q.put(2)
  58. self.assertFalse(put2.done())
  59. self.assertEqual(0, (yield q.get())) # Make room.
  60. self.assertTrue(put2.done())
  61. self.assertFalse(q.empty())
  62. self.assertTrue(q.full())
  63. class QueueGetTest(AsyncTestCase):
  64. @gen_test
  65. def test_blocking_get(self):
  66. q = queues.Queue() # type: queues.Queue[int]
  67. q.put_nowait(0)
  68. self.assertEqual(0, (yield q.get()))
  69. def test_nonblocking_get(self):
  70. q = queues.Queue() # type: queues.Queue[int]
  71. q.put_nowait(0)
  72. self.assertEqual(0, q.get_nowait())
  73. def test_nonblocking_get_exception(self):
  74. q = queues.Queue() # type: queues.Queue[int]
  75. self.assertRaises(queues.QueueEmpty, q.get_nowait)
  76. @gen_test
  77. def test_get_with_putters(self):
  78. q = queues.Queue(1) # type: queues.Queue[int]
  79. q.put_nowait(0)
  80. put = q.put(1)
  81. self.assertEqual(0, (yield q.get()))
  82. self.assertIsNone((yield put))
  83. @gen_test
  84. def test_blocking_get_wait(self):
  85. q = queues.Queue() # type: queues.Queue[int]
  86. q.put(0)
  87. self.io_loop.call_later(0.01, q.put_nowait, 1)
  88. self.io_loop.call_later(0.02, q.put_nowait, 2)
  89. self.assertEqual(0, (yield q.get(timeout=timedelta(seconds=1))))
  90. self.assertEqual(1, (yield q.get(timeout=timedelta(seconds=1))))
  91. @gen_test
  92. def test_get_timeout(self):
  93. q = queues.Queue() # type: queues.Queue[int]
  94. get_timeout = q.get(timeout=timedelta(seconds=0.01))
  95. get = q.get()
  96. with self.assertRaises(TimeoutError):
  97. yield get_timeout
  98. q.put_nowait(0)
  99. self.assertEqual(0, (yield get))
  100. @gen_test
  101. def test_get_timeout_preempted(self):
  102. q = queues.Queue() # type: queues.Queue[int]
  103. get = q.get(timeout=timedelta(seconds=0.01))
  104. q.put(0)
  105. yield gen.sleep(0.02)
  106. self.assertEqual(0, (yield get))
  107. @gen_test
  108. def test_get_clears_timed_out_putters(self):
  109. q = queues.Queue(1) # type: queues.Queue[int]
  110. # First putter succeeds, remainder block.
  111. putters = [q.put(i, timedelta(seconds=0.01)) for i in range(10)]
  112. put = q.put(10)
  113. self.assertEqual(10, len(q._putters))
  114. yield gen.sleep(0.02)
  115. self.assertEqual(10, len(q._putters))
  116. self.assertFalse(put.done()) # Final waiter is still active.
  117. q.put(11)
  118. self.assertEqual(0, (yield q.get())) # get() clears the waiters.
  119. self.assertEqual(1, len(q._putters))
  120. for putter in putters[1:]:
  121. self.assertRaises(TimeoutError, putter.result)
  122. @gen_test
  123. def test_get_clears_timed_out_getters(self):
  124. q = queues.Queue() # type: queues.Queue[int]
  125. getters = [
  126. asyncio.ensure_future(q.get(timedelta(seconds=0.01))) for _ in range(10)
  127. ]
  128. get = asyncio.ensure_future(q.get())
  129. self.assertEqual(11, len(q._getters))
  130. yield gen.sleep(0.02)
  131. self.assertEqual(11, len(q._getters))
  132. self.assertFalse(get.done()) # Final waiter is still active.
  133. q.get() # get() clears the waiters.
  134. self.assertEqual(2, len(q._getters))
  135. for getter in getters:
  136. self.assertRaises(TimeoutError, getter.result)
  137. @gen_test
  138. def test_async_for(self):
  139. q = queues.Queue() # type: queues.Queue[int]
  140. for i in range(5):
  141. q.put(i)
  142. async def f():
  143. results = []
  144. async for i in q:
  145. results.append(i)
  146. if i == 4:
  147. return results
  148. results = yield f()
  149. self.assertEqual(results, list(range(5)))
  150. class QueuePutTest(AsyncTestCase):
  151. @gen_test
  152. def test_blocking_put(self):
  153. q = queues.Queue() # type: queues.Queue[int]
  154. q.put(0)
  155. self.assertEqual(0, q.get_nowait())
  156. def test_nonblocking_put_exception(self):
  157. q = queues.Queue(1) # type: queues.Queue[int]
  158. q.put(0)
  159. self.assertRaises(queues.QueueFull, q.put_nowait, 1)
  160. @gen_test
  161. def test_put_with_getters(self):
  162. q = queues.Queue() # type: queues.Queue[int]
  163. get0 = q.get()
  164. get1 = q.get()
  165. yield q.put(0)
  166. self.assertEqual(0, (yield get0))
  167. yield q.put(1)
  168. self.assertEqual(1, (yield get1))
  169. @gen_test
  170. def test_nonblocking_put_with_getters(self):
  171. q = queues.Queue() # type: queues.Queue[int]
  172. get0 = q.get()
  173. get1 = q.get()
  174. q.put_nowait(0)
  175. # put_nowait does *not* immediately unblock getters.
  176. yield gen.moment
  177. self.assertEqual(0, (yield get0))
  178. q.put_nowait(1)
  179. yield gen.moment
  180. self.assertEqual(1, (yield get1))
  181. @gen_test
  182. def test_blocking_put_wait(self):
  183. q = queues.Queue(1) # type: queues.Queue[int]
  184. q.put_nowait(0)
  185. def get_and_discard():
  186. q.get()
  187. self.io_loop.call_later(0.01, get_and_discard)
  188. self.io_loop.call_later(0.02, get_and_discard)
  189. futures = [q.put(0), q.put(1)]
  190. self.assertFalse(any(f.done() for f in futures))
  191. yield futures
  192. @gen_test
  193. def test_put_timeout(self):
  194. q = queues.Queue(1) # type: queues.Queue[int]
  195. q.put_nowait(0) # Now it's full.
  196. put_timeout = q.put(1, timeout=timedelta(seconds=0.01))
  197. put = q.put(2)
  198. with self.assertRaises(TimeoutError):
  199. yield put_timeout
  200. self.assertEqual(0, q.get_nowait())
  201. # 1 was never put in the queue.
  202. self.assertEqual(2, (yield q.get()))
  203. # Final get() unblocked this putter.
  204. yield put
  205. @gen_test
  206. def test_put_timeout_preempted(self):
  207. q = queues.Queue(1) # type: queues.Queue[int]
  208. q.put_nowait(0)
  209. put = q.put(1, timeout=timedelta(seconds=0.01))
  210. q.get()
  211. yield gen.sleep(0.02)
  212. yield put # No TimeoutError.
  213. @gen_test
  214. def test_put_clears_timed_out_putters(self):
  215. q = queues.Queue(1) # type: queues.Queue[int]
  216. # First putter succeeds, remainder block.
  217. putters = [q.put(i, timedelta(seconds=0.01)) for i in range(10)]
  218. put = q.put(10)
  219. self.assertEqual(10, len(q._putters))
  220. yield gen.sleep(0.02)
  221. self.assertEqual(10, len(q._putters))
  222. self.assertFalse(put.done()) # Final waiter is still active.
  223. q.put(11) # put() clears the waiters.
  224. self.assertEqual(2, len(q._putters))
  225. for putter in putters[1:]:
  226. self.assertRaises(TimeoutError, putter.result)
  227. @gen_test
  228. def test_put_clears_timed_out_getters(self):
  229. q = queues.Queue() # type: queues.Queue[int]
  230. getters = [
  231. asyncio.ensure_future(q.get(timedelta(seconds=0.01))) for _ in range(10)
  232. ]
  233. get = asyncio.ensure_future(q.get())
  234. q.get()
  235. self.assertEqual(12, len(q._getters))
  236. yield gen.sleep(0.02)
  237. self.assertEqual(12, len(q._getters))
  238. self.assertFalse(get.done()) # Final waiters still active.
  239. q.put(0) # put() clears the waiters.
  240. self.assertEqual(1, len(q._getters))
  241. self.assertEqual(0, (yield get))
  242. for getter in getters:
  243. self.assertRaises(TimeoutError, getter.result)
  244. @gen_test
  245. def test_float_maxsize(self):
  246. # If a float is passed for maxsize, a reasonable limit should
  247. # be enforced, instead of being treated as unlimited.
  248. # It happens to be rounded up.
  249. # http://bugs.python.org/issue21723
  250. q = queues.Queue(maxsize=1.3) # type: ignore
  251. self.assertTrue(q.empty())
  252. self.assertFalse(q.full())
  253. q.put_nowait(0)
  254. q.put_nowait(1)
  255. self.assertFalse(q.empty())
  256. self.assertTrue(q.full())
  257. self.assertRaises(queues.QueueFull, q.put_nowait, 2)
  258. self.assertEqual(0, q.get_nowait())
  259. self.assertFalse(q.empty())
  260. self.assertFalse(q.full())
  261. yield q.put(2)
  262. put = q.put(3)
  263. self.assertFalse(put.done())
  264. self.assertEqual(1, (yield q.get()))
  265. yield put
  266. self.assertTrue(q.full())
  267. class QueueJoinTest(AsyncTestCase):
  268. queue_class = queues.Queue
  269. def test_task_done_underflow(self):
  270. q = self.queue_class() # type: queues.Queue
  271. self.assertRaises(ValueError, q.task_done)
  272. @gen_test
  273. def test_task_done(self):
  274. q = self.queue_class() # type: queues.Queue
  275. for i in range(100):
  276. q.put_nowait(i)
  277. self.accumulator = 0
  278. @gen.coroutine
  279. def worker():
  280. while True:
  281. item = yield q.get()
  282. self.accumulator += item
  283. q.task_done()
  284. yield gen.sleep(random() * 0.01)
  285. # Two coroutines share work.
  286. worker()
  287. worker()
  288. yield q.join()
  289. self.assertEqual(sum(range(100)), self.accumulator)
  290. @gen_test
  291. def test_task_done_delay(self):
  292. # Verify it is task_done(), not get(), that unblocks join().
  293. q = self.queue_class() # type: queues.Queue
  294. q.put_nowait(0)
  295. join = asyncio.ensure_future(q.join())
  296. self.assertFalse(join.done())
  297. yield q.get()
  298. self.assertFalse(join.done())
  299. yield gen.moment
  300. self.assertFalse(join.done())
  301. q.task_done()
  302. self.assertTrue(join.done())
  303. @gen_test
  304. def test_join_empty_queue(self):
  305. q = self.queue_class() # type: queues.Queue
  306. yield q.join()
  307. yield q.join()
  308. @gen_test
  309. def test_join_timeout(self):
  310. q = self.queue_class() # type: queues.Queue
  311. q.put(0)
  312. with self.assertRaises(TimeoutError):
  313. yield q.join(timeout=timedelta(seconds=0.01))
  314. class PriorityQueueJoinTest(QueueJoinTest):
  315. queue_class = queues.PriorityQueue
  316. @gen_test
  317. def test_order(self):
  318. q = self.queue_class(maxsize=2)
  319. q.put_nowait((1, "a"))
  320. q.put_nowait((0, "b"))
  321. self.assertTrue(q.full())
  322. q.put((3, "c"))
  323. q.put((2, "d"))
  324. self.assertEqual((0, "b"), q.get_nowait())
  325. self.assertEqual((1, "a"), (yield q.get()))
  326. self.assertEqual((2, "d"), q.get_nowait())
  327. self.assertEqual((3, "c"), (yield q.get()))
  328. self.assertTrue(q.empty())
  329. class LifoQueueJoinTest(QueueJoinTest):
  330. queue_class = queues.LifoQueue
  331. @gen_test
  332. def test_order(self):
  333. q = self.queue_class(maxsize=2)
  334. q.put_nowait(1)
  335. q.put_nowait(0)
  336. self.assertTrue(q.full())
  337. q.put(3)
  338. q.put(2)
  339. self.assertEqual(3, q.get_nowait())
  340. self.assertEqual(2, (yield q.get()))
  341. self.assertEqual(0, q.get_nowait())
  342. self.assertEqual(1, (yield q.get()))
  343. self.assertTrue(q.empty())
  344. class ProducerConsumerTest(AsyncTestCase):
  345. @gen_test
  346. def test_producer_consumer(self):
  347. q = queues.Queue(maxsize=3) # type: queues.Queue[int]
  348. history = []
  349. # We don't yield between get() and task_done(), so get() must wait for
  350. # the next tick. Otherwise we'd immediately call task_done and unblock
  351. # join() before q.put() resumes, and we'd only process the first four
  352. # items.
  353. @gen.coroutine
  354. def consumer():
  355. while True:
  356. history.append((yield q.get()))
  357. q.task_done()
  358. @gen.coroutine
  359. def producer():
  360. for item in range(10):
  361. yield q.put(item)
  362. consumer()
  363. yield producer()
  364. yield q.join()
  365. self.assertEqual(list(range(10)), history)
  366. if __name__ == "__main__":
  367. unittest.main()