gen_test.py 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123
  1. import asyncio
  2. from concurrent import futures
  3. import gc
  4. import datetime
  5. import platform
  6. import sys
  7. import time
  8. import weakref
  9. import unittest
  10. from tornado.concurrent import Future
  11. from tornado.log import app_log
  12. from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, ExpectLog, gen_test
  13. from tornado.test.util import skipNotCPython
  14. from tornado.web import Application, RequestHandler, HTTPError
  15. from tornado import gen
  16. try:
  17. import contextvars
  18. except ImportError:
  19. contextvars = None # type: ignore
  20. import typing
  21. if typing.TYPE_CHECKING:
  22. from typing import List, Optional # noqa: F401
  23. class GenBasicTest(AsyncTestCase):
  24. @gen.coroutine
  25. def delay(self, iterations, arg):
  26. """Returns arg after a number of IOLoop iterations."""
  27. for i in range(iterations):
  28. yield gen.moment
  29. raise gen.Return(arg)
  30. @gen.coroutine
  31. def async_future(self, result):
  32. yield gen.moment
  33. return result
  34. @gen.coroutine
  35. def async_exception(self, e):
  36. yield gen.moment
  37. raise e
  38. @gen.coroutine
  39. def add_one_async(self, x):
  40. yield gen.moment
  41. raise gen.Return(x + 1)
  42. def test_no_yield(self):
  43. @gen.coroutine
  44. def f():
  45. pass
  46. self.io_loop.run_sync(f)
  47. def test_exception_phase1(self):
  48. @gen.coroutine
  49. def f():
  50. 1 / 0
  51. self.assertRaises(ZeroDivisionError, self.io_loop.run_sync, f)
  52. def test_exception_phase2(self):
  53. @gen.coroutine
  54. def f():
  55. yield gen.moment
  56. 1 / 0
  57. self.assertRaises(ZeroDivisionError, self.io_loop.run_sync, f)
  58. def test_bogus_yield(self):
  59. @gen.coroutine
  60. def f():
  61. yield 42
  62. self.assertRaises(gen.BadYieldError, self.io_loop.run_sync, f)
  63. def test_bogus_yield_tuple(self):
  64. @gen.coroutine
  65. def f():
  66. yield (1, 2)
  67. self.assertRaises(gen.BadYieldError, self.io_loop.run_sync, f)
  68. def test_reuse(self):
  69. @gen.coroutine
  70. def f():
  71. yield gen.moment
  72. self.io_loop.run_sync(f)
  73. self.io_loop.run_sync(f)
  74. def test_none(self):
  75. @gen.coroutine
  76. def f():
  77. yield None
  78. self.io_loop.run_sync(f)
  79. def test_multi(self):
  80. @gen.coroutine
  81. def f():
  82. results = yield [self.add_one_async(1), self.add_one_async(2)]
  83. self.assertEqual(results, [2, 3])
  84. self.io_loop.run_sync(f)
  85. def test_multi_dict(self):
  86. @gen.coroutine
  87. def f():
  88. results = yield dict(foo=self.add_one_async(1), bar=self.add_one_async(2))
  89. self.assertEqual(results, dict(foo=2, bar=3))
  90. self.io_loop.run_sync(f)
  91. def test_multi_delayed(self):
  92. @gen.coroutine
  93. def f():
  94. # callbacks run at different times
  95. responses = yield gen.multi_future(
  96. [self.delay(3, "v1"), self.delay(1, "v2")]
  97. )
  98. self.assertEqual(responses, ["v1", "v2"])
  99. self.io_loop.run_sync(f)
  100. def test_multi_dict_delayed(self):
  101. @gen.coroutine
  102. def f():
  103. # callbacks run at different times
  104. responses = yield gen.multi_future(
  105. dict(foo=self.delay(3, "v1"), bar=self.delay(1, "v2"))
  106. )
  107. self.assertEqual(responses, dict(foo="v1", bar="v2"))
  108. self.io_loop.run_sync(f)
  109. @gen_test
  110. def test_multi_performance(self):
  111. # Yielding a list used to have quadratic performance; make
  112. # sure a large list stays reasonable. On my laptop a list of
  113. # 2000 used to take 1.8s, now it takes 0.12.
  114. start = time.time()
  115. yield [gen.moment for i in range(2000)]
  116. end = time.time()
  117. self.assertLess(end - start, 1.0)
  118. @gen_test
  119. def test_multi_empty(self):
  120. # Empty lists or dicts should return the same type.
  121. x = yield []
  122. self.assertTrue(isinstance(x, list))
  123. y = yield {}
  124. self.assertTrue(isinstance(y, dict))
  125. @gen_test
  126. def test_future(self):
  127. result = yield self.async_future(1)
  128. self.assertEqual(result, 1)
  129. @gen_test
  130. def test_multi_future(self):
  131. results = yield [self.async_future(1), self.async_future(2)]
  132. self.assertEqual(results, [1, 2])
  133. @gen_test
  134. def test_multi_future_duplicate(self):
  135. # Note that this doesn't work with native corotines, only with
  136. # decorated coroutines.
  137. f = self.async_future(2)
  138. results = yield [self.async_future(1), f, self.async_future(3), f]
  139. self.assertEqual(results, [1, 2, 3, 2])
  140. @gen_test
  141. def test_multi_dict_future(self):
  142. results = yield dict(foo=self.async_future(1), bar=self.async_future(2))
  143. self.assertEqual(results, dict(foo=1, bar=2))
  144. @gen_test
  145. def test_multi_exceptions(self):
  146. with ExpectLog(app_log, "Multiple exceptions in yield list"):
  147. with self.assertRaises(RuntimeError) as cm:
  148. yield gen.Multi(
  149. [
  150. self.async_exception(RuntimeError("error 1")),
  151. self.async_exception(RuntimeError("error 2")),
  152. ]
  153. )
  154. self.assertEqual(str(cm.exception), "error 1")
  155. # With only one exception, no error is logged.
  156. with self.assertRaises(RuntimeError):
  157. yield gen.Multi(
  158. [self.async_exception(RuntimeError("error 1")), self.async_future(2)]
  159. )
  160. # Exception logging may be explicitly quieted.
  161. with self.assertRaises(RuntimeError):
  162. yield gen.Multi(
  163. [
  164. self.async_exception(RuntimeError("error 1")),
  165. self.async_exception(RuntimeError("error 2")),
  166. ],
  167. quiet_exceptions=RuntimeError,
  168. )
  169. @gen_test
  170. def test_multi_future_exceptions(self):
  171. with ExpectLog(app_log, "Multiple exceptions in yield list"):
  172. with self.assertRaises(RuntimeError) as cm:
  173. yield [
  174. self.async_exception(RuntimeError("error 1")),
  175. self.async_exception(RuntimeError("error 2")),
  176. ]
  177. self.assertEqual(str(cm.exception), "error 1")
  178. # With only one exception, no error is logged.
  179. with self.assertRaises(RuntimeError):
  180. yield [self.async_exception(RuntimeError("error 1")), self.async_future(2)]
  181. # Exception logging may be explicitly quieted.
  182. with self.assertRaises(RuntimeError):
  183. yield gen.multi_future(
  184. [
  185. self.async_exception(RuntimeError("error 1")),
  186. self.async_exception(RuntimeError("error 2")),
  187. ],
  188. quiet_exceptions=RuntimeError,
  189. )
  190. def test_sync_raise_return(self):
  191. @gen.coroutine
  192. def f():
  193. raise gen.Return()
  194. self.io_loop.run_sync(f)
  195. def test_async_raise_return(self):
  196. @gen.coroutine
  197. def f():
  198. yield gen.moment
  199. raise gen.Return()
  200. self.io_loop.run_sync(f)
  201. def test_sync_raise_return_value(self):
  202. @gen.coroutine
  203. def f():
  204. raise gen.Return(42)
  205. self.assertEqual(42, self.io_loop.run_sync(f))
  206. def test_sync_raise_return_value_tuple(self):
  207. @gen.coroutine
  208. def f():
  209. raise gen.Return((1, 2))
  210. self.assertEqual((1, 2), self.io_loop.run_sync(f))
  211. def test_async_raise_return_value(self):
  212. @gen.coroutine
  213. def f():
  214. yield gen.moment
  215. raise gen.Return(42)
  216. self.assertEqual(42, self.io_loop.run_sync(f))
  217. def test_async_raise_return_value_tuple(self):
  218. @gen.coroutine
  219. def f():
  220. yield gen.moment
  221. raise gen.Return((1, 2))
  222. self.assertEqual((1, 2), self.io_loop.run_sync(f))
  223. class GenCoroutineTest(AsyncTestCase):
  224. def setUp(self):
  225. # Stray StopIteration exceptions can lead to tests exiting prematurely,
  226. # so we need explicit checks here to make sure the tests run all
  227. # the way through.
  228. self.finished = False
  229. super().setUp()
  230. def tearDown(self):
  231. super().tearDown()
  232. assert self.finished
  233. def test_attributes(self):
  234. self.finished = True
  235. def f():
  236. yield gen.moment
  237. coro = gen.coroutine(f)
  238. self.assertEqual(coro.__name__, f.__name__)
  239. self.assertEqual(coro.__module__, f.__module__)
  240. self.assertIs(coro.__wrapped__, f) # type: ignore
  241. def test_is_coroutine_function(self):
  242. self.finished = True
  243. def f():
  244. yield gen.moment
  245. coro = gen.coroutine(f)
  246. self.assertFalse(gen.is_coroutine_function(f))
  247. self.assertTrue(gen.is_coroutine_function(coro))
  248. self.assertFalse(gen.is_coroutine_function(coro()))
  249. @gen_test
  250. def test_sync_gen_return(self):
  251. @gen.coroutine
  252. def f():
  253. raise gen.Return(42)
  254. result = yield f()
  255. self.assertEqual(result, 42)
  256. self.finished = True
  257. @gen_test
  258. def test_async_gen_return(self):
  259. @gen.coroutine
  260. def f():
  261. yield gen.moment
  262. raise gen.Return(42)
  263. result = yield f()
  264. self.assertEqual(result, 42)
  265. self.finished = True
  266. @gen_test
  267. def test_sync_return(self):
  268. @gen.coroutine
  269. def f():
  270. return 42
  271. result = yield f()
  272. self.assertEqual(result, 42)
  273. self.finished = True
  274. @gen_test
  275. def test_async_return(self):
  276. @gen.coroutine
  277. def f():
  278. yield gen.moment
  279. return 42
  280. result = yield f()
  281. self.assertEqual(result, 42)
  282. self.finished = True
  283. @gen_test
  284. def test_async_early_return(self):
  285. # A yield statement exists but is not executed, which means
  286. # this function "returns" via an exception. This exception
  287. # doesn't happen before the exception handling is set up.
  288. @gen.coroutine
  289. def f():
  290. if True:
  291. return 42
  292. yield gen.Task(self.io_loop.add_callback)
  293. result = yield f()
  294. self.assertEqual(result, 42)
  295. self.finished = True
  296. @gen_test
  297. def test_async_await(self):
  298. @gen.coroutine
  299. def f1():
  300. yield gen.moment
  301. raise gen.Return(42)
  302. # This test verifies that an async function can await a
  303. # yield-based gen.coroutine, and that a gen.coroutine
  304. # (the test method itself) can yield an async function.
  305. async def f2():
  306. result = await f1()
  307. return result
  308. result = yield f2()
  309. self.assertEqual(result, 42)
  310. self.finished = True
  311. @gen_test
  312. def test_asyncio_sleep_zero(self):
  313. # asyncio.sleep(0) turns into a special case (equivalent to
  314. # `yield None`)
  315. async def f():
  316. import asyncio
  317. await asyncio.sleep(0)
  318. return 42
  319. result = yield f()
  320. self.assertEqual(result, 42)
  321. self.finished = True
  322. @gen_test
  323. def test_async_await_mixed_multi_native_future(self):
  324. @gen.coroutine
  325. def f1():
  326. yield gen.moment
  327. async def f2():
  328. await f1()
  329. return 42
  330. @gen.coroutine
  331. def f3():
  332. yield gen.moment
  333. raise gen.Return(43)
  334. results = yield [f2(), f3()]
  335. self.assertEqual(results, [42, 43])
  336. self.finished = True
  337. @gen_test
  338. def test_async_with_timeout(self):
  339. async def f1():
  340. return 42
  341. result = yield gen.with_timeout(datetime.timedelta(hours=1), f1())
  342. self.assertEqual(result, 42)
  343. self.finished = True
  344. @gen_test
  345. def test_sync_return_no_value(self):
  346. @gen.coroutine
  347. def f():
  348. return
  349. result = yield f()
  350. self.assertIsNone(result)
  351. self.finished = True
  352. @gen_test
  353. def test_async_return_no_value(self):
  354. @gen.coroutine
  355. def f():
  356. yield gen.moment
  357. return
  358. result = yield f()
  359. self.assertIsNone(result)
  360. self.finished = True
  361. @gen_test
  362. def test_sync_raise(self):
  363. @gen.coroutine
  364. def f():
  365. 1 / 0
  366. # The exception is raised when the future is yielded
  367. # (or equivalently when its result method is called),
  368. # not when the function itself is called).
  369. future = f()
  370. with self.assertRaises(ZeroDivisionError):
  371. yield future
  372. self.finished = True
  373. @gen_test
  374. def test_async_raise(self):
  375. @gen.coroutine
  376. def f():
  377. yield gen.moment
  378. 1 / 0
  379. future = f()
  380. with self.assertRaises(ZeroDivisionError):
  381. yield future
  382. self.finished = True
  383. @gen_test
  384. def test_replace_yieldpoint_exception(self):
  385. # Test exception handling: a coroutine can catch one exception
  386. # raised by a yield point and raise a different one.
  387. @gen.coroutine
  388. def f1():
  389. 1 / 0
  390. @gen.coroutine
  391. def f2():
  392. try:
  393. yield f1()
  394. except ZeroDivisionError:
  395. raise KeyError()
  396. future = f2()
  397. with self.assertRaises(KeyError):
  398. yield future
  399. self.finished = True
  400. @gen_test
  401. def test_swallow_yieldpoint_exception(self):
  402. # Test exception handling: a coroutine can catch an exception
  403. # raised by a yield point and not raise a different one.
  404. @gen.coroutine
  405. def f1():
  406. 1 / 0
  407. @gen.coroutine
  408. def f2():
  409. try:
  410. yield f1()
  411. except ZeroDivisionError:
  412. raise gen.Return(42)
  413. result = yield f2()
  414. self.assertEqual(result, 42)
  415. self.finished = True
  416. @gen_test
  417. def test_moment(self):
  418. calls = []
  419. @gen.coroutine
  420. def f(name, yieldable):
  421. for i in range(5):
  422. calls.append(name)
  423. yield yieldable
  424. # First, confirm the behavior without moment: each coroutine
  425. # monopolizes the event loop until it finishes.
  426. immediate = Future() # type: Future[None]
  427. immediate.set_result(None)
  428. yield [f("a", immediate), f("b", immediate)]
  429. self.assertEqual("".join(calls), "aaaaabbbbb")
  430. # With moment, they take turns.
  431. calls = []
  432. yield [f("a", gen.moment), f("b", gen.moment)]
  433. self.assertEqual("".join(calls), "ababababab")
  434. self.finished = True
  435. calls = []
  436. yield [f("a", gen.moment), f("b", immediate)]
  437. self.assertEqual("".join(calls), "abbbbbaaaa")
  438. @gen_test
  439. def test_sleep(self):
  440. yield gen.sleep(0.01)
  441. self.finished = True
  442. @gen_test
  443. def test_py3_leak_exception_context(self):
  444. class LeakedException(Exception):
  445. pass
  446. @gen.coroutine
  447. def inner(iteration):
  448. raise LeakedException(iteration)
  449. try:
  450. yield inner(1)
  451. except LeakedException as e:
  452. self.assertEqual(str(e), "1")
  453. self.assertIsNone(e.__context__)
  454. try:
  455. yield inner(2)
  456. except LeakedException as e:
  457. self.assertEqual(str(e), "2")
  458. self.assertIsNone(e.__context__)
  459. self.finished = True
  460. @skipNotCPython
  461. def test_coroutine_refcounting(self):
  462. # On CPython, tasks and their arguments should be released immediately
  463. # without waiting for garbage collection.
  464. @gen.coroutine
  465. def inner():
  466. class Foo:
  467. pass
  468. local_var = Foo()
  469. self.local_ref = weakref.ref(local_var)
  470. def dummy():
  471. pass
  472. yield gen.coroutine(dummy)()
  473. raise ValueError("Some error")
  474. @gen.coroutine
  475. def inner2():
  476. try:
  477. yield inner()
  478. except ValueError:
  479. pass
  480. self.io_loop.run_sync(inner2, timeout=3)
  481. self.assertIsNone(self.local_ref())
  482. self.finished = True
  483. def test_asyncio_future_debug_info(self):
  484. self.finished = True
  485. # Enable debug mode
  486. asyncio_loop = asyncio.get_event_loop()
  487. self.addCleanup(asyncio_loop.set_debug, asyncio_loop.get_debug())
  488. asyncio_loop.set_debug(True)
  489. def f():
  490. yield gen.moment
  491. coro = gen.coroutine(f)()
  492. self.assertIsInstance(coro, asyncio.Future)
  493. # We expect the coroutine repr() to show the place where
  494. # it was instantiated
  495. expected = "created at %s:%d" % (__file__, f.__code__.co_firstlineno + 3)
  496. actual = repr(coro)
  497. self.assertIn(expected, actual)
  498. @gen_test
  499. def test_asyncio_gather(self):
  500. # This demonstrates that tornado coroutines can be understood
  501. # by asyncio (This failed prior to Tornado 5.0).
  502. @gen.coroutine
  503. def f():
  504. yield gen.moment
  505. raise gen.Return(1)
  506. ret = yield asyncio.gather(f(), f())
  507. self.assertEqual(ret, [1, 1])
  508. self.finished = True
  509. class GenCoroutineSequenceHandler(RequestHandler):
  510. @gen.coroutine
  511. def get(self):
  512. yield gen.moment
  513. self.write("1")
  514. yield gen.moment
  515. self.write("2")
  516. yield gen.moment
  517. self.finish("3")
  518. class GenCoroutineUnfinishedSequenceHandler(RequestHandler):
  519. @gen.coroutine
  520. def get(self):
  521. yield gen.moment
  522. self.write("1")
  523. yield gen.moment
  524. self.write("2")
  525. yield gen.moment
  526. # just write, don't finish
  527. self.write("3")
  528. # "Undecorated" here refers to the absence of @asynchronous.
  529. class UndecoratedCoroutinesHandler(RequestHandler):
  530. @gen.coroutine
  531. def prepare(self):
  532. self.chunks = [] # type: List[str]
  533. yield gen.moment
  534. self.chunks.append("1")
  535. @gen.coroutine
  536. def get(self):
  537. self.chunks.append("2")
  538. yield gen.moment
  539. self.chunks.append("3")
  540. yield gen.moment
  541. self.write("".join(self.chunks))
class AsyncPrepareErrorHandler(RequestHandler):
    """Handler whose coroutine ``prepare`` raises ``HTTPError(403)``."""

    @gen.coroutine
    def prepare(self):
        # Yield once first so the error is raised after the coroutine
        # has been suspended and resumed.
        yield gen.moment
        raise HTTPError(403)

    def get(self):
        self.finish("ok")
class NativeCoroutineHandler(RequestHandler):
    """Handler whose ``get`` is a native ``async def`` coroutine."""

    async def get(self):
        await asyncio.sleep(0)
        self.write("ok")
  553. class GenWebTest(AsyncHTTPTestCase):
  554. def get_app(self):
  555. return Application(
  556. [
  557. ("/coroutine_sequence", GenCoroutineSequenceHandler),
  558. (
  559. "/coroutine_unfinished_sequence",
  560. GenCoroutineUnfinishedSequenceHandler,
  561. ),
  562. ("/undecorated_coroutine", UndecoratedCoroutinesHandler),
  563. ("/async_prepare_error", AsyncPrepareErrorHandler),
  564. ("/native_coroutine", NativeCoroutineHandler),
  565. ]
  566. )
  567. def test_coroutine_sequence_handler(self):
  568. response = self.fetch("/coroutine_sequence")
  569. self.assertEqual(response.body, b"123")
  570. def test_coroutine_unfinished_sequence_handler(self):
  571. response = self.fetch("/coroutine_unfinished_sequence")
  572. self.assertEqual(response.body, b"123")
  573. def test_undecorated_coroutines(self):
  574. response = self.fetch("/undecorated_coroutine")
  575. self.assertEqual(response.body, b"123")
  576. def test_async_prepare_error_handler(self):
  577. response = self.fetch("/async_prepare_error")
  578. self.assertEqual(response.code, 403)
  579. def test_native_coroutine_handler(self):
  580. response = self.fetch("/native_coroutine")
  581. self.assertEqual(response.code, 200)
  582. self.assertEqual(response.body, b"ok")
  583. class WithTimeoutTest(AsyncTestCase):
  584. @gen_test
  585. def test_timeout(self):
  586. with self.assertRaises(gen.TimeoutError):
  587. yield gen.with_timeout(datetime.timedelta(seconds=0.1), Future())
  588. @gen_test
  589. def test_completes_before_timeout(self):
  590. future = Future() # type: Future[str]
  591. self.io_loop.add_timeout(
  592. datetime.timedelta(seconds=0.1), lambda: future.set_result("asdf")
  593. )
  594. result = yield gen.with_timeout(datetime.timedelta(seconds=3600), future)
  595. self.assertEqual(result, "asdf")
  596. @gen_test
  597. def test_fails_before_timeout(self):
  598. future = Future() # type: Future[str]
  599. self.io_loop.add_timeout(
  600. datetime.timedelta(seconds=0.1),
  601. lambda: future.set_exception(ZeroDivisionError()),
  602. )
  603. with self.assertRaises(ZeroDivisionError):
  604. yield gen.with_timeout(datetime.timedelta(seconds=3600), future)
  605. @gen_test
  606. def test_already_resolved(self):
  607. future = Future() # type: Future[str]
  608. future.set_result("asdf")
  609. result = yield gen.with_timeout(datetime.timedelta(seconds=3600), future)
  610. self.assertEqual(result, "asdf")
  611. @gen_test
  612. def test_timeout_concurrent_future(self):
  613. # A concurrent future that does not resolve before the timeout.
  614. with futures.ThreadPoolExecutor(1) as executor:
  615. with self.assertRaises(gen.TimeoutError):
  616. yield gen.with_timeout(
  617. self.io_loop.time(), executor.submit(time.sleep, 0.1)
  618. )
  619. @gen_test
  620. def test_completed_concurrent_future(self):
  621. # A concurrent future that is resolved before we even submit it
  622. # to with_timeout.
  623. with futures.ThreadPoolExecutor(1) as executor:
  624. def dummy():
  625. pass
  626. f = executor.submit(dummy)
  627. f.result() # wait for completion
  628. yield gen.with_timeout(datetime.timedelta(seconds=3600), f)
  629. @gen_test
  630. def test_normal_concurrent_future(self):
  631. # A conccurrent future that resolves while waiting for the timeout.
  632. with futures.ThreadPoolExecutor(1) as executor:
  633. yield gen.with_timeout(
  634. datetime.timedelta(seconds=3600),
  635. executor.submit(lambda: time.sleep(0.01)),
  636. )
  637. class WaitIteratorTest(AsyncTestCase):
  638. @gen_test
  639. def test_empty_iterator(self):
  640. g = gen.WaitIterator()
  641. self.assertTrue(g.done(), "empty generator iterated")
  642. with self.assertRaises(ValueError):
  643. g = gen.WaitIterator(Future(), bar=Future())
  644. self.assertIsNone(g.current_index, "bad nil current index")
  645. self.assertIsNone(g.current_future, "bad nil current future")
  646. @gen_test
  647. def test_already_done(self):
  648. f1 = Future() # type: Future[int]
  649. f2 = Future() # type: Future[int]
  650. f3 = Future() # type: Future[int]
  651. f1.set_result(24)
  652. f2.set_result(42)
  653. f3.set_result(84)
  654. g = gen.WaitIterator(f1, f2, f3)
  655. i = 0
  656. while not g.done():
  657. r = yield g.next()
  658. # Order is not guaranteed, but the current implementation
  659. # preserves ordering of already-done Futures.
  660. if i == 0:
  661. self.assertEqual(g.current_index, 0)
  662. self.assertIs(g.current_future, f1)
  663. self.assertEqual(r, 24)
  664. elif i == 1:
  665. self.assertEqual(g.current_index, 1)
  666. self.assertIs(g.current_future, f2)
  667. self.assertEqual(r, 42)
  668. elif i == 2:
  669. self.assertEqual(g.current_index, 2)
  670. self.assertIs(g.current_future, f3)
  671. self.assertEqual(r, 84)
  672. i += 1
  673. self.assertIsNone(g.current_index, "bad nil current index")
  674. self.assertIsNone(g.current_future, "bad nil current future")
  675. dg = gen.WaitIterator(f1=f1, f2=f2)
  676. while not dg.done():
  677. dr = yield dg.next()
  678. if dg.current_index == "f1":
  679. self.assertTrue(
  680. dg.current_future == f1 and dr == 24,
  681. "WaitIterator dict status incorrect",
  682. )
  683. elif dg.current_index == "f2":
  684. self.assertTrue(
  685. dg.current_future == f2 and dr == 42,
  686. "WaitIterator dict status incorrect",
  687. )
  688. else:
  689. self.fail(f"got bad WaitIterator index {dg.current_index}")
  690. i += 1
  691. self.assertIsNone(g.current_index, "bad nil current index")
  692. self.assertIsNone(g.current_future, "bad nil current future")
  693. def finish_coroutines(self, iteration, futures):
  694. if iteration == 3:
  695. futures[2].set_result(24)
  696. elif iteration == 5:
  697. futures[0].set_exception(ZeroDivisionError())
  698. elif iteration == 8:
  699. futures[1].set_result(42)
  700. futures[3].set_result(84)
  701. if iteration < 8:
  702. self.io_loop.add_callback(self.finish_coroutines, iteration + 1, futures)
  703. @gen_test
  704. def test_iterator(self):
  705. futures = [Future(), Future(), Future(), Future()] # type: List[Future[int]]
  706. self.finish_coroutines(0, futures)
  707. g = gen.WaitIterator(*futures)
  708. i = 0
  709. while not g.done():
  710. try:
  711. r = yield g.next()
  712. except ZeroDivisionError:
  713. self.assertIs(g.current_future, futures[0], "exception future invalid")
  714. else:
  715. if i == 0:
  716. self.assertEqual(r, 24, "iterator value incorrect")
  717. self.assertEqual(g.current_index, 2, "wrong index")
  718. elif i == 2:
  719. self.assertEqual(r, 42, "iterator value incorrect")
  720. self.assertEqual(g.current_index, 1, "wrong index")
  721. elif i == 3:
  722. self.assertEqual(r, 84, "iterator value incorrect")
  723. self.assertEqual(g.current_index, 3, "wrong index")
  724. i += 1
  725. @gen_test
  726. def test_iterator_async_await(self):
  727. # Recreate the previous test with py35 syntax. It's a little clunky
  728. # because of the way the previous test handles an exception on
  729. # a single iteration.
  730. futures = [Future(), Future(), Future(), Future()] # type: List[Future[int]]
  731. self.finish_coroutines(0, futures)
  732. self.finished = False
  733. async def f():
  734. i = 0
  735. g = gen.WaitIterator(*futures)
  736. try:
  737. async for r in g:
  738. if i == 0:
  739. self.assertEqual(r, 24, "iterator value incorrect")
  740. self.assertEqual(g.current_index, 2, "wrong index")
  741. else:
  742. raise Exception("expected exception on iteration 1")
  743. i += 1
  744. except ZeroDivisionError:
  745. i += 1
  746. async for r in g:
  747. if i == 2:
  748. self.assertEqual(r, 42, "iterator value incorrect")
  749. self.assertEqual(g.current_index, 1, "wrong index")
  750. elif i == 3:
  751. self.assertEqual(r, 84, "iterator value incorrect")
  752. self.assertEqual(g.current_index, 3, "wrong index")
  753. else:
  754. raise Exception("didn't expect iteration %d" % i)
  755. i += 1
  756. self.finished = True
  757. yield f()
  758. self.assertTrue(self.finished)
  759. @gen_test
  760. def test_no_ref(self):
  761. # In this usage, there is no direct hard reference to the
  762. # WaitIterator itself, only the Future it returns. Since
  763. # WaitIterator uses weak references internally to improve GC
  764. # performance, this used to cause problems.
  765. yield gen.with_timeout(
  766. datetime.timedelta(seconds=0.1), gen.WaitIterator(gen.sleep(0)).next()
  767. )
  768. class RunnerGCTest(AsyncTestCase):
  769. def is_pypy3(self):
  770. return platform.python_implementation() == "PyPy" and sys.version_info > (3,)
  771. @gen_test
  772. def test_gc(self):
  773. # GitHub issue 1769: Runner objects can get GCed unexpectedly
  774. # while their future is alive.
  775. weakref_scope = [None] # type: List[Optional[weakref.ReferenceType]]
  776. def callback():
  777. gc.collect(2)
  778. weakref_scope[0]().set_result(123) # type: ignore
  779. @gen.coroutine
  780. def tester():
  781. fut = Future() # type: Future[int]
  782. weakref_scope[0] = weakref.ref(fut)
  783. self.io_loop.add_callback(callback)
  784. yield fut
  785. yield gen.with_timeout(datetime.timedelta(seconds=0.2), tester())
  786. def test_gc_infinite_coro(self):
  787. # GitHub issue 2229: suspended coroutines should be GCed when
  788. # their loop is closed, even if they're involved in a reference
  789. # cycle.
  790. loop = self.get_new_ioloop()
  791. result = [] # type: List[Optional[bool]]
  792. wfut = []
  793. @gen.coroutine
  794. def infinite_coro():
  795. try:
  796. while True:
  797. yield gen.sleep(1e-3)
  798. result.append(True)
  799. finally:
  800. # coroutine finalizer
  801. result.append(None)
  802. @gen.coroutine
  803. def do_something():
  804. fut = infinite_coro()
  805. fut._refcycle = fut # type: ignore
  806. wfut.append(weakref.ref(fut))
  807. yield gen.sleep(0.2)
  808. loop.run_sync(do_something)
  809. loop.close()
  810. gc.collect()
  811. # Future was collected
  812. self.assertIsNone(wfut[0]())
  813. # At least one wakeup
  814. self.assertGreaterEqual(len(result), 2)
  815. if not self.is_pypy3():
  816. # coroutine finalizer was called (not on PyPy3 apparently)
  817. self.assertIsNone(result[-1])
  818. def test_gc_infinite_async_await(self):
  819. # Same as test_gc_infinite_coro, but with a `async def` function
  820. import asyncio
  821. async def infinite_coro(result):
  822. try:
  823. while True:
  824. await gen.sleep(1e-3)
  825. result.append(True)
  826. finally:
  827. # coroutine finalizer
  828. result.append(None)
  829. loop = self.get_new_ioloop()
  830. result = [] # type: List[Optional[bool]]
  831. wfut = []
  832. @gen.coroutine
  833. def do_something():
  834. fut = asyncio.get_event_loop().create_task(infinite_coro(result))
  835. fut._refcycle = fut # type: ignore
  836. wfut.append(weakref.ref(fut))
  837. yield gen.sleep(0.2)
  838. loop.run_sync(do_something)
  839. with ExpectLog("asyncio", "Task was destroyed but it is pending"):
  840. loop.close()
  841. gc.collect()
  842. # Future was collected
  843. self.assertIsNone(wfut[0]())
  844. # At least one wakeup and one finally
  845. self.assertGreaterEqual(len(result), 2)
  846. if not self.is_pypy3():
  847. # coroutine finalizer was called (not on PyPy3 apparently)
  848. self.assertIsNone(result[-1])
  849. def test_multi_moment(self):
  850. # Test gen.multi with moment
  851. # now that it's not a real Future
  852. @gen.coroutine
  853. def wait_a_moment():
  854. result = yield gen.multi([gen.moment, gen.moment])
  855. raise gen.Return(result)
  856. loop = self.get_new_ioloop()
  857. result = loop.run_sync(wait_a_moment)
  858. self.assertEqual(result, [None, None])
  859. if contextvars is not None:
  860. ctx_var = contextvars.ContextVar("ctx_var") # type: contextvars.ContextVar[int]
  861. @unittest.skipIf(contextvars is None, "contextvars module not present")
  862. class ContextVarsTest(AsyncTestCase):
  863. async def native_root(self, x):
  864. ctx_var.set(x)
  865. await self.inner(x)
  866. @gen.coroutine
  867. def gen_root(self, x):
  868. ctx_var.set(x)
  869. yield
  870. yield self.inner(x)
  871. async def inner(self, x):
  872. self.assertEqual(ctx_var.get(), x)
  873. await self.gen_inner(x)
  874. self.assertEqual(ctx_var.get(), x)
  875. # IOLoop.run_in_executor doesn't automatically copy context
  876. ctx = contextvars.copy_context()
  877. await self.io_loop.run_in_executor(None, lambda: ctx.run(self.thread_inner, x))
  878. self.assertEqual(ctx_var.get(), x)
  879. # Neither does asyncio's run_in_executor.
  880. await asyncio.get_event_loop().run_in_executor(
  881. None, lambda: ctx.run(self.thread_inner, x)
  882. )
  883. self.assertEqual(ctx_var.get(), x)
  884. @gen.coroutine
  885. def gen_inner(self, x):
  886. self.assertEqual(ctx_var.get(), x)
  887. yield
  888. self.assertEqual(ctx_var.get(), x)
  889. def thread_inner(self, x):
  890. self.assertEqual(ctx_var.get(), x)
  891. @gen_test
  892. def test_propagate(self):
  893. # Verify that context vars get propagated across various
  894. # combinations of native and decorated coroutines.
  895. yield [
  896. self.native_root(1),
  897. self.native_root(2),
  898. self.gen_root(3),
  899. self.gen_root(4),
  900. ]
  901. @gen_test
  902. def test_reset(self):
  903. token = ctx_var.set(1)
  904. yield
  905. # reset asserts that we are still at the same level of the context tree,
  906. # so we must make sure that we maintain that property across yield.
  907. ctx_var.reset(token)
  908. @gen_test
  909. def test_propagate_to_first_yield_with_native_async_function(self):
  910. x = 10
  911. async def native_async_function():
  912. self.assertEqual(ctx_var.get(), x)
  913. ctx_var.set(x)
  914. yield native_async_function()
# Run the tests in this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()