gen.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903
  1. """``tornado.gen`` implements generator-based coroutines.
  2. .. note::
  3. The "decorator and generator" approach in this module is a
  4. precursor to native coroutines (using ``async def`` and ``await``)
  5. which were introduced in Python 3.5. Applications that do not
  6. require compatibility with older versions of Python should use
  7. native coroutines instead. Some parts of this module are still
  8. useful with native coroutines, notably `multi`, `sleep`,
  9. `WaitIterator`, and `with_timeout`. Some of these functions have
  10. counterparts in the `asyncio` module which may be used as well,
  11. although the two may not necessarily be 100% compatible.
  12. Coroutines provide an easier way to work in an asynchronous
  13. environment than chaining callbacks. Code using coroutines is
  14. technically asynchronous, but it is written as a single generator
  15. instead of a collection of separate functions.
  16. For example, here's a coroutine-based handler:
  17. .. testcode::
  18. class GenAsyncHandler(RequestHandler):
  19. @gen.coroutine
  20. def get(self):
  21. http_client = AsyncHTTPClient()
  22. response = yield http_client.fetch("http://example.com")
  23. do_something_with_response(response)
  24. self.render("template.html")
  25. Asynchronous functions in Tornado return an ``Awaitable`` or `.Future`;
  26. yielding this object returns its result.
  27. You can also yield a list or dict of other yieldable objects, which
  28. will be started at the same time and run in parallel; a list or dict
  29. of results will be returned when they are all finished:
  30. .. testcode::
  31. @gen.coroutine
  32. def get(self):
  33. http_client = AsyncHTTPClient()
  34. response1, response2 = yield [http_client.fetch(url1),
  35. http_client.fetch(url2)]
  36. response_dict = yield dict(response3=http_client.fetch(url3),
  37. response4=http_client.fetch(url4))
  38. response3 = response_dict['response3']
  39. response4 = response_dict['response4']
  40. If ``tornado.platform.twisted`` is imported, it is also possible to
  41. yield Twisted's ``Deferred`` objects. See the `convert_yielded`
  42. function to extend this mechanism.
  43. .. versionchanged:: 3.2
  44. Dict support added.
  45. .. versionchanged:: 4.1
  46. Support added for yielding ``asyncio`` Futures and Twisted Deferreds
  47. via ``singledispatch``.
  48. """
  49. import asyncio
  50. import builtins
  51. import collections
  52. from collections.abc import Generator
  53. import concurrent.futures
  54. import datetime
  55. import functools
  56. from functools import singledispatch
  57. from inspect import isawaitable
  58. import sys
  59. import types
  60. from tornado.concurrent import (
  61. Future,
  62. is_future,
  63. chain_future,
  64. future_set_exc_info,
  65. future_add_done_callback,
  66. future_set_result_unless_cancelled,
  67. )
  68. from tornado.ioloop import IOLoop
  69. from tornado.log import app_log
  70. from tornado.util import TimeoutError
  71. try:
  72. import contextvars
  73. except ImportError:
  74. contextvars = None # type: ignore
  75. import typing
  76. from typing import (
  77. Mapping,
  78. Union,
  79. Any,
  80. Callable,
  81. List,
  82. Type,
  83. Tuple,
  84. Awaitable,
  85. Dict,
  86. Sequence,
  87. overload,
  88. )
  89. if typing.TYPE_CHECKING:
  90. from typing import Deque, Optional, Set, Iterable # noqa: F401
  91. _T = typing.TypeVar("_T")
  92. _Yieldable = Union[
  93. None, Awaitable, List[Awaitable], Dict[Any, Awaitable], concurrent.futures.Future
  94. ]
  95. class KeyReuseError(Exception):
  96. pass
  97. class UnknownKeyError(Exception):
  98. pass
  99. class LeakedCallbackError(Exception):
  100. pass
  101. class BadYieldError(Exception):
  102. pass
  103. class ReturnValueIgnoredError(Exception):
  104. pass
  105. def _value_from_stopiteration(e: Union[StopIteration, "Return"]) -> Any:
  106. try:
  107. # StopIteration has a value attribute beginning in py33.
  108. # So does our Return class.
  109. return e.value
  110. except AttributeError:
  111. pass
  112. try:
  113. # Cython backports coroutine functionality by putting the value in
  114. # e.args[0].
  115. return e.args[0]
  116. except (AttributeError, IndexError):
  117. return None
  118. def _create_future() -> Future:
  119. future = Future() # type: Future
  120. # Fixup asyncio debug info by removing extraneous stack entries
  121. source_traceback = getattr(future, "_source_traceback", ())
  122. while source_traceback:
  123. # Each traceback entry is equivalent to a
  124. # (filename, self.lineno, self.name, self.line) tuple
  125. filename = source_traceback[-1][0]
  126. if filename == __file__:
  127. del source_traceback[-1]
  128. else:
  129. break
  130. return future
  131. def _fake_ctx_run(f: Callable[..., _T], *args: Any, **kw: Any) -> _T:
  132. return f(*args, **kw)
  133. @overload
  134. def coroutine(
  135. func: Callable[..., "Generator[Any, Any, _T]"]
  136. ) -> Callable[..., "Future[_T]"]: ...
  137. @overload
  138. def coroutine(func: Callable[..., _T]) -> Callable[..., "Future[_T]"]: ...
  139. def coroutine(
  140. func: Union[Callable[..., "Generator[Any, Any, _T]"], Callable[..., _T]]
  141. ) -> Callable[..., "Future[_T]"]:
  142. """Decorator for asynchronous generators.
  143. For compatibility with older versions of Python, coroutines may
  144. also "return" by raising the special exception `Return(value)
  145. <Return>`.
  146. Functions with this decorator return a `.Future`.
  147. .. warning::
  148. When exceptions occur inside a coroutine, the exception
  149. information will be stored in the `.Future` object. You must
  150. examine the result of the `.Future` object, or the exception
  151. may go unnoticed by your code. This means yielding the function
  152. if called from another coroutine, using something like
  153. `.IOLoop.run_sync` for top-level calls, or passing the `.Future`
  154. to `.IOLoop.add_future`.
  155. .. versionchanged:: 6.0
  156. The ``callback`` argument was removed. Use the returned
  157. awaitable object instead.
  158. """
  159. @functools.wraps(func)
  160. def wrapper(*args, **kwargs):
  161. # type: (*Any, **Any) -> Future[_T]
  162. # This function is type-annotated with a comment to work around
  163. # https://bitbucket.org/pypy/pypy/issues/2868/segfault-with-args-type-annotation-in
  164. future = _create_future()
  165. if contextvars is not None:
  166. ctx_run = contextvars.copy_context().run # type: Callable
  167. else:
  168. ctx_run = _fake_ctx_run
  169. try:
  170. result = ctx_run(func, *args, **kwargs)
  171. except (Return, StopIteration) as e:
  172. result = _value_from_stopiteration(e)
  173. except Exception:
  174. future_set_exc_info(future, sys.exc_info())
  175. try:
  176. return future
  177. finally:
  178. # Avoid circular references
  179. future = None # type: ignore
  180. else:
  181. if isinstance(result, Generator):
  182. # Inline the first iteration of Runner.run. This lets us
  183. # avoid the cost of creating a Runner when the coroutine
  184. # never actually yields, which in turn allows us to
  185. # use "optional" coroutines in critical path code without
  186. # performance penalty for the synchronous case.
  187. try:
  188. yielded = ctx_run(next, result)
  189. except (StopIteration, Return) as e:
  190. future_set_result_unless_cancelled(
  191. future, _value_from_stopiteration(e)
  192. )
  193. except Exception:
  194. future_set_exc_info(future, sys.exc_info())
  195. else:
  196. # Provide strong references to Runner objects as long
  197. # as their result future objects also have strong
  198. # references (typically from the parent coroutine's
  199. # Runner). This keeps the coroutine's Runner alive.
  200. # We do this by exploiting the public API
  201. # add_done_callback() instead of putting a private
  202. # attribute on the Future.
  203. # (GitHub issues #1769, #2229).
  204. runner = Runner(ctx_run, result, future, yielded)
  205. future.add_done_callback(lambda _: runner)
  206. yielded = None
  207. try:
  208. return future
  209. finally:
  210. # Subtle memory optimization: if next() raised an exception,
  211. # the future's exc_info contains a traceback which
  212. # includes this stack frame. This creates a cycle,
  213. # which will be collected at the next full GC but has
  214. # been shown to greatly increase memory usage of
  215. # benchmarks (relative to the refcount-based scheme
  216. # used in the absence of cycles). We can avoid the
  217. # cycle by clearing the local variable after we return it.
  218. future = None # type: ignore
  219. future_set_result_unless_cancelled(future, result)
  220. return future
  221. wrapper.__wrapped__ = func # type: ignore
  222. wrapper.__tornado_coroutine__ = True # type: ignore
  223. return wrapper
  224. def is_coroutine_function(func: Any) -> bool:
  225. """Return whether *func* is a coroutine function, i.e. a function
  226. wrapped with `~.gen.coroutine`.
  227. .. versionadded:: 4.5
  228. """
  229. return getattr(func, "__tornado_coroutine__", False)
  230. class Return(Exception):
  231. """Special exception to return a value from a `coroutine`.
  232. This exception exists for compatibility with older versions of
  233. Python (before 3.3). In newer code use the ``return`` statement
  234. instead.
  235. If this exception is raised, its value argument is used as the
  236. result of the coroutine::
  237. @gen.coroutine
  238. def fetch_json(url):
  239. response = yield AsyncHTTPClient().fetch(url)
  240. raise gen.Return(json_decode(response.body))
  241. By analogy with the return statement, the value argument is optional.
  242. """
  243. def __init__(self, value: Any = None) -> None:
  244. super().__init__()
  245. self.value = value
  246. # Cython recognizes subclasses of StopIteration with a .args tuple.
  247. self.args = (value,)
  248. class WaitIterator:
  249. """Provides an iterator to yield the results of awaitables as they finish.
  250. Yielding a set of awaitables like this:
  251. ``results = yield [awaitable1, awaitable2]``
  252. pauses the coroutine until both ``awaitable1`` and ``awaitable2``
  253. return, and then restarts the coroutine with the results of both
  254. awaitables. If either awaitable raises an exception, the
  255. expression will raise that exception and all the results will be
  256. lost.
  257. If you need to get the result of each awaitable as soon as possible,
  258. or if you need the result of some awaitables even if others produce
  259. errors, you can use ``WaitIterator``::
  260. wait_iterator = gen.WaitIterator(awaitable1, awaitable2)
  261. while not wait_iterator.done():
  262. try:
  263. result = yield wait_iterator.next()
  264. except Exception as e:
  265. print("Error {} from {}".format(e, wait_iterator.current_future))
  266. else:
  267. print("Result {} received from {} at {}".format(
  268. result, wait_iterator.current_future,
  269. wait_iterator.current_index))
  270. Because results are returned as soon as they are available the
  271. output from the iterator *will not be in the same order as the
  272. input arguments*. If you need to know which future produced the
  273. current result, you can use the attributes
  274. ``WaitIterator.current_future``, or ``WaitIterator.current_index``
  275. to get the index of the awaitable from the input list. (if keyword
  276. arguments were used in the construction of the `WaitIterator`,
  277. ``current_index`` will use the corresponding keyword).
  278. `WaitIterator` implements the async iterator
  279. protocol, so it can be used with the ``async for`` statement (note
  280. that in this version the entire iteration is aborted if any value
  281. raises an exception, while the previous example can continue past
  282. individual errors)::
  283. async for result in gen.WaitIterator(future1, future2):
  284. print("Result {} received from {} at {}".format(
  285. result, wait_iterator.current_future,
  286. wait_iterator.current_index))
  287. .. versionadded:: 4.1
  288. .. versionchanged:: 4.3
  289. Added ``async for`` support in Python 3.5.
  290. """
  291. _unfinished = {} # type: Dict[Future, Union[int, str]]
  292. def __init__(self, *args: Future, **kwargs: Future) -> None:
  293. if args and kwargs:
  294. raise ValueError("You must provide args or kwargs, not both")
  295. if kwargs:
  296. self._unfinished = {f: k for (k, f) in kwargs.items()}
  297. futures = list(kwargs.values()) # type: Sequence[Future]
  298. else:
  299. self._unfinished = {f: i for (i, f) in enumerate(args)}
  300. futures = args
  301. self._finished = collections.deque() # type: Deque[Future]
  302. self.current_index = None # type: Optional[Union[str, int]]
  303. self.current_future = None # type: Optional[Future]
  304. self._running_future = None # type: Optional[Future]
  305. for future in futures:
  306. future_add_done_callback(future, self._done_callback)
  307. def done(self) -> bool:
  308. """Returns True if this iterator has no more results."""
  309. if self._finished or self._unfinished:
  310. return False
  311. # Clear the 'current' values when iteration is done.
  312. self.current_index = self.current_future = None
  313. return True
  314. def next(self) -> Future:
  315. """Returns a `.Future` that will yield the next available result.
  316. Note that this `.Future` will not be the same object as any of
  317. the inputs.
  318. """
  319. self._running_future = Future()
  320. if self._finished:
  321. return self._return_result(self._finished.popleft())
  322. return self._running_future
  323. def _done_callback(self, done: Future) -> None:
  324. if self._running_future and not self._running_future.done():
  325. self._return_result(done)
  326. else:
  327. self._finished.append(done)
  328. def _return_result(self, done: Future) -> Future:
  329. """Called set the returned future's state that of the future
  330. we yielded, and set the current future for the iterator.
  331. """
  332. if self._running_future is None:
  333. raise Exception("no future is running")
  334. chain_future(done, self._running_future)
  335. res = self._running_future
  336. self._running_future = None
  337. self.current_future = done
  338. self.current_index = self._unfinished.pop(done)
  339. return res
  340. def __aiter__(self) -> typing.AsyncIterator:
  341. return self
  342. def __anext__(self) -> Future:
  343. if self.done():
  344. # Lookup by name to silence pyflakes on older versions.
  345. raise getattr(builtins, "StopAsyncIteration")()
  346. return self.next()
  347. @overload
  348. def multi(
  349. children: Sequence[_Yieldable],
  350. quiet_exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]] = (),
  351. ) -> Future[List]: ...
  352. @overload
  353. def multi(
  354. children: Mapping[Any, _Yieldable],
  355. quiet_exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]] = (),
  356. ) -> Future[Dict]: ...
  357. def multi(
  358. children: Union[Sequence[_Yieldable], Mapping[Any, _Yieldable]],
  359. quiet_exceptions: "Union[Type[Exception], Tuple[Type[Exception], ...]]" = (),
  360. ) -> "Union[Future[List], Future[Dict]]":
  361. """Runs multiple asynchronous operations in parallel.
  362. ``children`` may either be a list or a dict whose values are
  363. yieldable objects. ``multi()`` returns a new yieldable
  364. object that resolves to a parallel structure containing their
  365. results. If ``children`` is a list, the result is a list of
  366. results in the same order; if it is a dict, the result is a dict
  367. with the same keys.
  368. That is, ``results = yield multi(list_of_futures)`` is equivalent
  369. to::
  370. results = []
  371. for future in list_of_futures:
  372. results.append(yield future)
  373. If any children raise exceptions, ``multi()`` will raise the first
  374. one. All others will be logged, unless they are of types
  375. contained in the ``quiet_exceptions`` argument.
  376. In a ``yield``-based coroutine, it is not normally necessary to
  377. call this function directly, since the coroutine runner will
  378. do it automatically when a list or dict is yielded. However,
  379. it is necessary in ``await``-based coroutines, or to pass
  380. the ``quiet_exceptions`` argument.
  381. This function is available under the names ``multi()`` and ``Multi()``
  382. for historical reasons.
  383. Cancelling a `.Future` returned by ``multi()`` does not cancel its
  384. children. `asyncio.gather` is similar to ``multi()``, but it does
  385. cancel its children.
  386. .. versionchanged:: 4.2
  387. If multiple yieldables fail, any exceptions after the first
  388. (which is raised) will be logged. Added the ``quiet_exceptions``
  389. argument to suppress this logging for selected exception types.
  390. .. versionchanged:: 4.3
  391. Replaced the class ``Multi`` and the function ``multi_future``
  392. with a unified function ``multi``. Added support for yieldables
  393. other than ``YieldPoint`` and `.Future`.
  394. """
  395. return multi_future(children, quiet_exceptions=quiet_exceptions)
  396. Multi = multi
  397. def multi_future(
  398. children: Union[Sequence[_Yieldable], Mapping[Any, _Yieldable]],
  399. quiet_exceptions: "Union[Type[Exception], Tuple[Type[Exception], ...]]" = (),
  400. ) -> "Union[Future[List], Future[Dict]]":
  401. """Wait for multiple asynchronous futures in parallel.
  402. Since Tornado 6.0, this function is exactly the same as `multi`.
  403. .. versionadded:: 4.0
  404. .. versionchanged:: 4.2
  405. If multiple ``Futures`` fail, any exceptions after the first (which is
  406. raised) will be logged. Added the ``quiet_exceptions``
  407. argument to suppress this logging for selected exception types.
  408. .. deprecated:: 4.3
  409. Use `multi` instead.
  410. """
  411. if isinstance(children, dict):
  412. keys = list(children.keys()) # type: Optional[List]
  413. children_seq = children.values() # type: Iterable
  414. else:
  415. keys = None
  416. children_seq = children
  417. children_futs = list(map(convert_yielded, children_seq))
  418. assert all(is_future(i) or isinstance(i, _NullFuture) for i in children_futs)
  419. unfinished_children = set(children_futs)
  420. future = _create_future()
  421. if not children_futs:
  422. future_set_result_unless_cancelled(future, {} if keys is not None else [])
  423. def callback(fut: Future) -> None:
  424. unfinished_children.remove(fut)
  425. if not unfinished_children:
  426. result_list = []
  427. for f in children_futs:
  428. try:
  429. result_list.append(f.result())
  430. except Exception as e:
  431. if future.done():
  432. if not isinstance(e, quiet_exceptions):
  433. app_log.error(
  434. "Multiple exceptions in yield list", exc_info=True
  435. )
  436. else:
  437. future_set_exc_info(future, sys.exc_info())
  438. if not future.done():
  439. if keys is not None:
  440. future_set_result_unless_cancelled(
  441. future, dict(zip(keys, result_list))
  442. )
  443. else:
  444. future_set_result_unless_cancelled(future, result_list)
  445. listening = set() # type: Set[Future]
  446. for f in children_futs:
  447. if f not in listening:
  448. listening.add(f)
  449. future_add_done_callback(f, callback)
  450. return future
  451. def maybe_future(x: Any) -> Future:
  452. """Converts ``x`` into a `.Future`.
  453. If ``x`` is already a `.Future`, it is simply returned; otherwise
  454. it is wrapped in a new `.Future`. This is suitable for use as
  455. ``result = yield gen.maybe_future(f())`` when you don't know whether
  456. ``f()`` returns a `.Future` or not.
  457. .. deprecated:: 4.3
  458. This function only handles ``Futures``, not other yieldable objects.
  459. Instead of `maybe_future`, check for the non-future result types
  460. you expect (often just ``None``), and ``yield`` anything unknown.
  461. """
  462. if is_future(x):
  463. return x
  464. else:
  465. fut = _create_future()
  466. fut.set_result(x)
  467. return fut
  468. def with_timeout(
  469. timeout: Union[float, datetime.timedelta],
  470. future: _Yieldable,
  471. quiet_exceptions: "Union[Type[Exception], Tuple[Type[Exception], ...]]" = (),
  472. ) -> Future:
  473. """Wraps a `.Future` (or other yieldable object) in a timeout.
  474. Raises `tornado.util.TimeoutError` if the input future does not
  475. complete before ``timeout``, which may be specified in any form
  476. allowed by `.IOLoop.add_timeout` (i.e. a `datetime.timedelta` or
  477. an absolute time relative to `.IOLoop.time`)
  478. If the wrapped `.Future` fails after it has timed out, the exception
  479. will be logged unless it is either of a type contained in
  480. ``quiet_exceptions`` (which may be an exception type or a sequence of
  481. types), or an ``asyncio.CancelledError``.
  482. The wrapped `.Future` is not canceled when the timeout expires,
  483. permitting it to be reused. `asyncio.wait_for` is similar to this
  484. function but it does cancel the wrapped `.Future` on timeout.
  485. .. versionadded:: 4.0
  486. .. versionchanged:: 4.1
  487. Added the ``quiet_exceptions`` argument and the logging of unhandled
  488. exceptions.
  489. .. versionchanged:: 4.4
  490. Added support for yieldable objects other than `.Future`.
  491. .. versionchanged:: 6.0.3
  492. ``asyncio.CancelledError`` is now always considered "quiet".
  493. .. versionchanged:: 6.2
  494. ``tornado.util.TimeoutError`` is now an alias to ``asyncio.TimeoutError``.
  495. """
  496. # It's tempting to optimize this by cancelling the input future on timeout
  497. # instead of creating a new one, but A) we can't know if we are the only
  498. # one waiting on the input future, so cancelling it might disrupt other
  499. # callers and B) concurrent futures can only be cancelled while they are
  500. # in the queue, so cancellation cannot reliably bound our waiting time.
  501. future_converted = convert_yielded(future)
  502. result = _create_future()
  503. chain_future(future_converted, result)
  504. io_loop = IOLoop.current()
  505. def error_callback(future: Future) -> None:
  506. try:
  507. future.result()
  508. except asyncio.CancelledError:
  509. pass
  510. except Exception as e:
  511. if not isinstance(e, quiet_exceptions):
  512. app_log.error(
  513. "Exception in Future %r after timeout", future, exc_info=True
  514. )
  515. def timeout_callback() -> None:
  516. if not result.done():
  517. result.set_exception(TimeoutError("Timeout"))
  518. # In case the wrapped future goes on to fail, log it.
  519. future_add_done_callback(future_converted, error_callback)
  520. timeout_handle = io_loop.add_timeout(timeout, timeout_callback)
  521. if isinstance(future_converted, Future):
  522. # We know this future will resolve on the IOLoop, so we don't
  523. # need the extra thread-safety of IOLoop.add_future (and we also
  524. # don't care about StackContext here.
  525. future_add_done_callback(
  526. future_converted, lambda future: io_loop.remove_timeout(timeout_handle)
  527. )
  528. else:
  529. # concurrent.futures.Futures may resolve on any thread, so we
  530. # need to route them back to the IOLoop.
  531. io_loop.add_future(
  532. future_converted, lambda future: io_loop.remove_timeout(timeout_handle)
  533. )
  534. return result
  535. def sleep(duration: float) -> "Future[None]":
  536. """Return a `.Future` that resolves after the given number of seconds.
  537. When used with ``yield`` in a coroutine, this is a non-blocking
  538. analogue to `time.sleep` (which should not be used in coroutines
  539. because it is blocking)::
  540. yield gen.sleep(0.5)
  541. Note that calling this function on its own does nothing; you must
  542. wait on the `.Future` it returns (usually by yielding it).
  543. .. versionadded:: 4.1
  544. """
  545. f = _create_future()
  546. IOLoop.current().call_later(
  547. duration, lambda: future_set_result_unless_cancelled(f, None)
  548. )
  549. return f
  550. class _NullFuture:
  551. """_NullFuture resembles a Future that finished with a result of None.
  552. It's not actually a `Future` to avoid depending on a particular event loop.
  553. Handled as a special case in the coroutine runner.
  554. We lie and tell the type checker that a _NullFuture is a Future so
  555. we don't have to leak _NullFuture into lots of public APIs. But
  556. this means that the type checker can't warn us when we're passing
  557. a _NullFuture into a code path that doesn't understand what to do
  558. with it.
  559. """
  560. def result(self) -> None:
  561. return None
  562. def done(self) -> bool:
  563. return True
  564. # _null_future is used as a dummy value in the coroutine runner. It differs
  565. # from moment in that moment always adds a delay of one IOLoop iteration
  566. # while _null_future is processed as soon as possible.
  567. _null_future = typing.cast(Future, _NullFuture())
  568. moment = typing.cast(Future, _NullFuture())
  569. moment.__doc__ = """A special object which may be yielded to allow the IOLoop to run for
  570. one iteration.
  571. This is not needed in normal use but it can be helpful in long-running
  572. coroutines that are likely to yield Futures that are ready instantly.
  573. Usage: ``yield gen.moment``
  574. In native coroutines, the equivalent of ``yield gen.moment`` is
  575. ``await asyncio.sleep(0)``.
  576. .. versionadded:: 4.0
  577. .. deprecated:: 4.5
  578. ``yield None`` (or ``yield`` with no argument) is now equivalent to
  579. ``yield gen.moment``.
  580. """
  581. class Runner:
  582. """Internal implementation of `tornado.gen.coroutine`.
  583. Maintains information about pending callbacks and their results.
  584. The results of the generator are stored in ``result_future`` (a
  585. `.Future`)
  586. """
  587. def __init__(
  588. self,
  589. ctx_run: Callable,
  590. gen: "Generator[_Yieldable, Any, _T]",
  591. result_future: "Future[_T]",
  592. first_yielded: _Yieldable,
  593. ) -> None:
  594. self.ctx_run = ctx_run
  595. self.gen = gen
  596. self.result_future = result_future
  597. self.future = _null_future # type: Union[None, Future]
  598. self.running = False
  599. self.finished = False
  600. self.io_loop = IOLoop.current()
  601. if self.ctx_run(self.handle_yield, first_yielded):
  602. gen = result_future = first_yielded = None # type: ignore
  603. self.ctx_run(self.run)
  604. def run(self) -> None:
  605. """Starts or resumes the generator, running until it reaches a
  606. yield point that is not ready.
  607. """
  608. if self.running or self.finished:
  609. return
  610. try:
  611. self.running = True
  612. while True:
  613. future = self.future
  614. if future is None:
  615. raise Exception("No pending future")
  616. if not future.done():
  617. return
  618. self.future = None
  619. try:
  620. try:
  621. value = future.result()
  622. except Exception as e:
  623. # Save the exception for later. It's important that
  624. # gen.throw() not be called inside this try/except block
  625. # because that makes sys.exc_info behave unexpectedly.
  626. exc: Optional[Exception] = e
  627. else:
  628. exc = None
  629. finally:
  630. future = None
  631. if exc is not None:
  632. try:
  633. yielded = self.gen.throw(exc)
  634. finally:
  635. # Break up a circular reference for faster GC on
  636. # CPython.
  637. del exc
  638. else:
  639. yielded = self.gen.send(value)
  640. except (StopIteration, Return) as e:
  641. self.finished = True
  642. self.future = _null_future
  643. future_set_result_unless_cancelled(
  644. self.result_future, _value_from_stopiteration(e)
  645. )
  646. self.result_future = None # type: ignore
  647. return
  648. except Exception:
  649. self.finished = True
  650. self.future = _null_future
  651. future_set_exc_info(self.result_future, sys.exc_info())
  652. self.result_future = None # type: ignore
  653. return
  654. if not self.handle_yield(yielded):
  655. return
  656. yielded = None
  657. finally:
  658. self.running = False
  659. def handle_yield(self, yielded: _Yieldable) -> bool:
  660. try:
  661. self.future = convert_yielded(yielded)
  662. except BadYieldError:
  663. self.future = Future()
  664. future_set_exc_info(self.future, sys.exc_info())
  665. if self.future is moment:
  666. self.io_loop.add_callback(self.ctx_run, self.run)
  667. return False
  668. elif self.future is None:
  669. raise Exception("no pending future")
  670. elif not self.future.done():
  671. def inner(f: Any) -> None:
  672. # Break a reference cycle to speed GC.
  673. f = None # noqa: F841
  674. self.ctx_run(self.run)
  675. self.io_loop.add_future(self.future, inner)
  676. return False
  677. return True
  678. def handle_exception(
  679. self, typ: Type[Exception], value: Exception, tb: types.TracebackType
  680. ) -> bool:
  681. if not self.running and not self.finished:
  682. self.future = Future()
  683. future_set_exc_info(self.future, (typ, value, tb))
  684. self.ctx_run(self.run)
  685. return True
  686. else:
  687. return False
  688. def _wrap_awaitable(awaitable: Awaitable) -> Future:
  689. # Convert Awaitables into Futures.
  690. # Note that we use ensure_future, which handles both awaitables
  691. # and coroutines, rather than create_task, which only accepts
  692. # coroutines. (ensure_future calls create_task if given a coroutine)
  693. fut = asyncio.ensure_future(awaitable)
  694. # See comments on IOLoop._pending_tasks.
  695. loop = IOLoop.current()
  696. loop._register_task(fut)
  697. fut.add_done_callback(lambda f: loop._unregister_task(f))
  698. return fut
  699. def convert_yielded(yielded: _Yieldable) -> Future:
  700. """Convert a yielded object into a `.Future`.
  701. The default implementation accepts lists, dictionaries, and
  702. Futures. This has the side effect of starting any coroutines that
  703. did not start themselves, similar to `asyncio.ensure_future`.
  704. If the `~functools.singledispatch` library is available, this function
  705. may be extended to support additional types. For example::
  706. @convert_yielded.register(asyncio.Future)
  707. def _(asyncio_future):
  708. return tornado.platform.asyncio.to_tornado_future(asyncio_future)
  709. .. versionadded:: 4.1
  710. """
  711. if yielded is None or yielded is moment:
  712. return moment
  713. elif yielded is _null_future:
  714. return _null_future
  715. elif isinstance(yielded, (list, dict)):
  716. return multi(yielded) # type: ignore
  717. elif is_future(yielded):
  718. return typing.cast(Future, yielded)
  719. elif isawaitable(yielded):
  720. return _wrap_awaitable(yielded) # type: ignore
  721. else:
  722. raise BadYieldError(f"yielded unknown object {yielded!r}")
  723. convert_yielded = singledispatch(convert_yielded)