future.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. """Future-returning APIs for tornado coroutines.
  2. .. seealso::
  3. :mod:`zmq.asyncio`
  4. """
  5. # Copyright (c) PyZMQ Developers.
  6. # Distributed under the terms of the Modified BSD License.
  7. from __future__ import annotations
  8. import asyncio
  9. import warnings
  10. from typing import Any
  11. from tornado.concurrent import Future
  12. from tornado.ioloop import IOLoop
  13. import zmq as _zmq
  14. from zmq._future import _AsyncPoller, _AsyncSocket
  15. class CancelledError(Exception):
  16. pass
  17. class _TornadoFuture(Future):
  18. """Subclass Tornado Future, reinstating cancellation."""
  19. def cancel(self):
  20. if self.done():
  21. return False
  22. self.set_exception(CancelledError())
  23. return True
  24. def cancelled(self):
  25. return self.done() and isinstance(self.exception(), CancelledError)
  26. class _CancellableTornadoTimeout:
  27. def __init__(self, loop, timeout):
  28. self.loop = loop
  29. self.timeout = timeout
  30. def cancel(self):
  31. self.loop.remove_timeout(self.timeout)
  32. # mixin for tornado/asyncio compatibility
  33. class _AsyncTornado:
  34. _Future: type[asyncio.Future] = _TornadoFuture
  35. _READ = IOLoop.READ
  36. _WRITE = IOLoop.WRITE
  37. def _default_loop(self):
  38. return IOLoop.current()
  39. def _call_later(self, delay, callback):
  40. io_loop = self._get_loop()
  41. timeout = io_loop.call_later(delay, callback)
  42. return _CancellableTornadoTimeout(io_loop, timeout)
  43. class Poller(_AsyncTornado, _AsyncPoller):
  44. def _watch_raw_socket(self, loop, socket, evt, f):
  45. """Schedule callback for a raw socket"""
  46. loop.add_handler(socket, lambda *args: f(), evt)
  47. def _unwatch_raw_sockets(self, loop, *sockets):
  48. """Unschedule callback for a raw socket"""
  49. for socket in sockets:
  50. loop.remove_handler(socket)
  51. class Socket(_AsyncTornado, _AsyncSocket):
  52. _poller_class = Poller
  53. Poller._socket_class = Socket
  54. class Context(_zmq.Context[Socket]):
  55. # avoid sharing instance with base Context class
  56. _instance = None
  57. io_loop = None
  58. @staticmethod
  59. def _socket_class(self, socket_type):
  60. return Socket(self, socket_type)
  61. def __init__(self: Context, *args: Any, **kwargs: Any) -> None:
  62. io_loop = kwargs.pop('io_loop', None)
  63. if io_loop is not None:
  64. warnings.warn(
  65. f"{self.__class__.__name__}(io_loop) argument is deprecated in pyzmq 22.2."
  66. " The currently active loop will always be used.",
  67. DeprecationWarning,
  68. stacklevel=2,
  69. )
  70. super().__init__(*args, **kwargs) # type: ignore