_asyncio_loop.py 597 B

123456789101112131415161718192021222324252627
  1. import asyncio
  2. import sys
  3. def load_loop_functions():
  4. if sys.version_info >= (3, 7):
  5. def get_task_loop(task):
  6. return task.get_loop()
  7. get_running_loop = asyncio.get_running_loop
  8. else:
  9. def get_task_loop(task):
  10. return task._loop
  11. def get_running_loop():
  12. loop = asyncio.get_event_loop()
  13. if not loop.is_running():
  14. raise RuntimeError("There is no running event loop")
  15. return loop
  16. return get_task_loop, get_running_loop
  17. get_task_loop, get_running_loop = load_loop_functions()