nest_asyncio.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. """Patch asyncio to allow nested event loops."""
  2. import asyncio
  3. import asyncio.events as events
  4. import os
  5. import sys
  6. import threading
  7. from contextlib import contextmanager, suppress
  8. from heapq import heappop
  9. def apply(loop=None):
  10. """Patch asyncio to make its event loop reentrant."""
  11. _patch_asyncio()
  12. _patch_policy()
  13. _patch_tornado()
  14. loop = loop or asyncio.get_event_loop()
  15. _patch_loop(loop)
  16. def _patch_asyncio():
  17. """Patch asyncio module to use pure Python tasks and futures."""
  18. def run(main, *, debug=False):
  19. loop = asyncio.get_event_loop()
  20. loop.set_debug(debug)
  21. task = asyncio.ensure_future(main)
  22. try:
  23. return loop.run_until_complete(task)
  24. finally:
  25. if not task.done():
  26. task.cancel()
  27. with suppress(asyncio.CancelledError):
  28. loop.run_until_complete(task)
  29. def _get_event_loop(stacklevel=3):
  30. loop = events._get_running_loop()
  31. if loop is None:
  32. loop = events.get_event_loop_policy().get_event_loop()
  33. return loop
  34. # Use module level _current_tasks, all_tasks and patch run method.
  35. if hasattr(asyncio, '_nest_patched'):
  36. return
  37. if sys.version_info >= (3, 6, 0):
  38. asyncio.Task = asyncio.tasks._CTask = asyncio.tasks.Task = \
  39. asyncio.tasks._PyTask
  40. asyncio.Future = asyncio.futures._CFuture = asyncio.futures.Future = \
  41. asyncio.futures._PyFuture
  42. if sys.version_info < (3, 7, 0):
  43. asyncio.tasks._current_tasks = asyncio.tasks.Task._current_tasks
  44. asyncio.all_tasks = asyncio.tasks.Task.all_tasks
  45. if sys.version_info >= (3, 9, 0):
  46. events._get_event_loop = events.get_event_loop = \
  47. asyncio.get_event_loop = _get_event_loop
  48. asyncio.run = run
  49. asyncio._nest_patched = True
  50. def _patch_policy():
  51. """Patch the policy to always return a patched loop."""
  52. def get_event_loop(self):
  53. if self._local._loop is None:
  54. loop = self.new_event_loop()
  55. _patch_loop(loop)
  56. self.set_event_loop(loop)
  57. return self._local._loop
  58. policy = events.get_event_loop_policy()
  59. policy.__class__.get_event_loop = get_event_loop
  60. def _patch_loop(loop):
  61. """Patch loop to make it reentrant."""
  62. def run_forever(self):
  63. with manage_run(self), manage_asyncgens(self):
  64. while True:
  65. self._run_once()
  66. if self._stopping:
  67. break
  68. self._stopping = False
  69. def run_until_complete(self, future):
  70. with manage_run(self):
  71. f = asyncio.ensure_future(future, loop=self)
  72. if f is not future:
  73. f._log_destroy_pending = False
  74. while not f.done():
  75. self._run_once()
  76. if self._stopping:
  77. break
  78. if not f.done():
  79. raise RuntimeError(
  80. 'Event loop stopped before Future completed.')
  81. return f.result()
  82. def _run_once(self):
  83. """
  84. Simplified re-implementation of asyncio's _run_once that
  85. runs handles as they become ready.
  86. """
  87. ready = self._ready
  88. scheduled = self._scheduled
  89. while scheduled and scheduled[0]._cancelled:
  90. heappop(scheduled)
  91. timeout = (
  92. 0 if ready or self._stopping
  93. else min(max(
  94. scheduled[0]._when - self.time(), 0), 86400) if scheduled
  95. else None)
  96. event_list = self._selector.select(timeout)
  97. self._process_events(event_list)
  98. end_time = self.time() + self._clock_resolution
  99. while scheduled and scheduled[0]._when < end_time:
  100. handle = heappop(scheduled)
  101. ready.append(handle)
  102. for _ in range(len(ready)):
  103. if not ready:
  104. break
  105. handle = ready.popleft()
  106. if not handle._cancelled:
  107. # preempt the current task so that that checks in
  108. # Task.__step do not raise
  109. curr_task = curr_tasks.pop(self, None)
  110. try:
  111. handle._run()
  112. finally:
  113. # restore the current task
  114. if curr_task is not None:
  115. curr_tasks[self] = curr_task
  116. handle = None
  117. @contextmanager
  118. def manage_run(self):
  119. """Set up the loop for running."""
  120. self._check_closed()
  121. old_thread_id = self._thread_id
  122. old_running_loop = events._get_running_loop()
  123. try:
  124. self._thread_id = threading.get_ident()
  125. events._set_running_loop(self)
  126. self._num_runs_pending += 1
  127. if self._is_proactorloop:
  128. if self._self_reading_future is None:
  129. self.call_soon(self._loop_self_reading)
  130. yield
  131. finally:
  132. self._thread_id = old_thread_id
  133. events._set_running_loop(old_running_loop)
  134. self._num_runs_pending -= 1
  135. if self._is_proactorloop:
  136. if (self._num_runs_pending == 0
  137. and self._self_reading_future is not None):
  138. ov = self._self_reading_future._ov
  139. self._self_reading_future.cancel()
  140. if ov is not None:
  141. self._proactor._unregister(ov)
  142. self._self_reading_future = None
  143. @contextmanager
  144. def manage_asyncgens(self):
  145. if not hasattr(sys, 'get_asyncgen_hooks'):
  146. # Python version is too old.
  147. return
  148. old_agen_hooks = sys.get_asyncgen_hooks()
  149. try:
  150. self._set_coroutine_origin_tracking(self._debug)
  151. if self._asyncgens is not None:
  152. sys.set_asyncgen_hooks(
  153. firstiter=self._asyncgen_firstiter_hook,
  154. finalizer=self._asyncgen_finalizer_hook)
  155. yield
  156. finally:
  157. self._set_coroutine_origin_tracking(False)
  158. if self._asyncgens is not None:
  159. sys.set_asyncgen_hooks(*old_agen_hooks)
  160. def _check_running(self):
  161. """Do not throw exception if loop is already running."""
  162. pass
  163. if hasattr(loop, '_nest_patched'):
  164. return
  165. if not isinstance(loop, asyncio.BaseEventLoop):
  166. raise ValueError('Can\'t patch loop of type %s' % type(loop))
  167. cls = loop.__class__
  168. cls.run_forever = run_forever
  169. cls.run_until_complete = run_until_complete
  170. cls._run_once = _run_once
  171. cls._check_running = _check_running
  172. cls._check_runnung = _check_running # typo in Python 3.7 source
  173. cls._num_runs_pending = 1 if loop.is_running() else 0
  174. cls._is_proactorloop = (
  175. os.name == 'nt' and issubclass(cls, asyncio.ProactorEventLoop))
  176. if sys.version_info < (3, 7, 0):
  177. cls._set_coroutine_origin_tracking = cls._set_coroutine_wrapper
  178. curr_tasks = asyncio.tasks._current_tasks \
  179. if sys.version_info >= (3, 7, 0) else asyncio.Task._current_tasks
  180. cls._nest_patched = True
  181. def _patch_tornado():
  182. """
  183. If tornado is imported before nest_asyncio, make tornado aware of
  184. the pure-Python asyncio Future.
  185. """
  186. if 'tornado' in sys.modules:
  187. import tornado.concurrent as tc # type: ignore
  188. tc.Future = asyncio.Future
  189. if asyncio.Future not in tc.FUTURES:
  190. tc.FUTURES += (asyncio.Future,)