asyncio.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. """AsyncIO support for zmq
  2. Requires asyncio and Python 3.
  3. """
  4. # Copyright (c) PyZMQ Developers.
  5. # Distributed under the terms of the Modified BSD License.
  6. from __future__ import annotations
  7. import asyncio
  8. import selectors
  9. import sys
  10. import warnings
  11. from asyncio import Future, SelectorEventLoop
  12. from weakref import WeakKeyDictionary
  13. import zmq as _zmq
  14. from zmq import _future
  15. # registry of asyncio loop : selector thread
  16. _selectors: WeakKeyDictionary = WeakKeyDictionary()
  17. class ProactorSelectorThreadWarning(RuntimeWarning):
  18. """Warning class for notifying about the extra thread spawned by tornado
  19. We automatically support proactor via tornado's AddThreadSelectorEventLoop"""
  20. def _get_selector_windows(
  21. asyncio_loop,
  22. ) -> asyncio.AbstractEventLoop:
  23. """Get selector-compatible loop
  24. Returns an object with ``add_reader`` family of methods,
  25. either the loop itself or a SelectorThread instance.
  26. Workaround Windows proactor removal of
  27. *reader methods, which we need for zmq sockets.
  28. """
  29. if asyncio_loop in _selectors:
  30. return _selectors[asyncio_loop]
  31. # detect add_reader instead of checking for proactor?
  32. if hasattr(asyncio, "ProactorEventLoop") and isinstance(
  33. asyncio_loop,
  34. asyncio.ProactorEventLoop, # type: ignore
  35. ):
  36. try:
  37. from tornado.platform.asyncio import AddThreadSelectorEventLoop
  38. except ImportError:
  39. raise RuntimeError(
  40. "Proactor event loop does not implement add_reader family of methods required for zmq."
  41. " zmq will work with proactor if tornado >= 6.1 can be found."
  42. " Use `asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())`"
  43. " or install 'tornado>=6.1' to avoid this error."
  44. )
  45. warnings.warn(
  46. "Proactor event loop does not implement add_reader family of methods required for zmq."
  47. " Registering an additional selector thread for add_reader support via tornado."
  48. " Use `asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())`"
  49. " to avoid this warning.",
  50. RuntimeWarning,
  51. # stacklevel 5 matches most likely zmq.asyncio.Context().socket()
  52. stacklevel=5,
  53. )
  54. selector_loop = _selectors[asyncio_loop] = AddThreadSelectorEventLoop(
  55. asyncio_loop
  56. ) # type: ignore
  57. # patch loop.close to also close the selector thread
  58. loop_close = asyncio_loop.close
  59. def _close_selector_and_loop():
  60. # restore original before calling selector.close,
  61. # which in turn calls eventloop.close!
  62. asyncio_loop.close = loop_close
  63. _selectors.pop(asyncio_loop, None)
  64. selector_loop.close()
  65. asyncio_loop.close = _close_selector_and_loop # type: ignore # mypy bug - assign a function to method
  66. return selector_loop
  67. else:
  68. return asyncio_loop
  69. def _get_selector_noop(loop) -> asyncio.AbstractEventLoop:
  70. """no-op on non-Windows"""
  71. return loop
  72. if sys.platform == "win32":
  73. _get_selector = _get_selector_windows
  74. else:
  75. _get_selector = _get_selector_noop
  76. class _AsyncIO:
  77. _Future = Future
  78. _WRITE = selectors.EVENT_WRITE
  79. _READ = selectors.EVENT_READ
  80. def _default_loop(self):
  81. try:
  82. return asyncio.get_running_loop()
  83. except RuntimeError:
  84. warnings.warn(
  85. "No running event loop. zmq.asyncio should be used from within an asyncio loop.",
  86. RuntimeWarning,
  87. stacklevel=4,
  88. )
  89. # get_event_loop deprecated in 3.10:
  90. return asyncio.get_event_loop()
  91. class Poller(_AsyncIO, _future._AsyncPoller):
  92. """Poller returning asyncio.Future for poll results."""
  93. def _watch_raw_socket(self, loop, socket, evt, f):
  94. """Schedule callback for a raw socket"""
  95. selector = _get_selector(loop)
  96. if evt & self._READ:
  97. selector.add_reader(socket, lambda *args: f())
  98. if evt & self._WRITE:
  99. selector.add_writer(socket, lambda *args: f())
  100. def _unwatch_raw_sockets(self, loop, *sockets):
  101. """Unschedule callback for a raw socket"""
  102. selector = _get_selector(loop)
  103. for socket in sockets:
  104. selector.remove_reader(socket)
  105. selector.remove_writer(socket)
  106. class Socket(_AsyncIO, _future._AsyncSocket):
  107. """Socket returning asyncio Futures for send/recv/poll methods."""
  108. _poller_class = Poller
  109. def _get_selector(self, io_loop=None):
  110. if io_loop is None:
  111. io_loop = self._get_loop()
  112. return _get_selector(io_loop)
  113. def _init_io_state(self, io_loop=None):
  114. """initialize the ioloop event handler"""
  115. self._get_selector(io_loop).add_reader(
  116. self._fd, lambda: self._handle_events(0, 0)
  117. )
  118. def _clear_io_state(self):
  119. """clear any ioloop event handler
  120. called once at close
  121. """
  122. loop = self._current_loop
  123. if loop and not loop.is_closed() and self._fd != -1:
  124. self._get_selector(loop).remove_reader(self._fd)
  125. Poller._socket_class = Socket
  126. class Context(_zmq.Context[Socket]):
  127. """Context for creating asyncio-compatible Sockets"""
  128. _socket_class = Socket
  129. # avoid sharing instance with base Context class
  130. _instance = None
  131. # overload with no changes to satisfy pyright
  132. def __init__(
  133. self: Context,
  134. io_threads: int | _zmq.Context = 1,
  135. shadow: _zmq.Context | int = 0,
  136. ) -> None:
  137. super().__init__(io_threads, shadow) # type: ignore
  138. class ZMQEventLoop(SelectorEventLoop):
  139. """DEPRECATED: AsyncIO eventloop using zmq_poll.
  140. pyzmq sockets should work with any asyncio event loop as of pyzmq 17.
  141. """
  142. def __init__(self, selector=None):
  143. _deprecated()
  144. return super().__init__(selector)
  145. _loop = None
  146. def _deprecated():
  147. if _deprecated.called: # type: ignore
  148. return
  149. _deprecated.called = True # type: ignore
  150. warnings.warn(
  151. "ZMQEventLoop and zmq.asyncio.install are deprecated in pyzmq 17. Special eventloop integration is no longer needed.",
  152. DeprecationWarning,
  153. stacklevel=3,
  154. )
  155. _deprecated.called = False # type: ignore
  156. def install():
  157. """DEPRECATED: No longer needed in pyzmq 17"""
  158. _deprecated()
  159. __all__ = [
  160. "Context",
  161. "Socket",
  162. "Poller",
  163. "ZMQEventLoop",
  164. "install",
  165. ]