| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- """Future-returning APIs for tornado coroutines.
- .. seealso::
- :mod:`zmq.asyncio`
- """
- # Copyright (c) PyZMQ Developers.
- # Distributed under the terms of the Modified BSD License.
- from __future__ import annotations
- import asyncio
- import warnings
- from typing import Any
- from tornado.concurrent import Future
- from tornado.ioloop import IOLoop
- import zmq as _zmq
- from zmq._future import _AsyncPoller, _AsyncSocket
- class CancelledError(Exception):
- pass
- class _TornadoFuture(Future):
- """Subclass Tornado Future, reinstating cancellation."""
- def cancel(self):
- if self.done():
- return False
- self.set_exception(CancelledError())
- return True
- def cancelled(self):
- return self.done() and isinstance(self.exception(), CancelledError)
- class _CancellableTornadoTimeout:
- def __init__(self, loop, timeout):
- self.loop = loop
- self.timeout = timeout
- def cancel(self):
- self.loop.remove_timeout(self.timeout)
- # mixin for tornado/asyncio compatibility
- class _AsyncTornado:
- _Future: type[asyncio.Future] = _TornadoFuture
- _READ = IOLoop.READ
- _WRITE = IOLoop.WRITE
- def _default_loop(self):
- return IOLoop.current()
- def _call_later(self, delay, callback):
- io_loop = self._get_loop()
- timeout = io_loop.call_later(delay, callback)
- return _CancellableTornadoTimeout(io_loop, timeout)
- class Poller(_AsyncTornado, _AsyncPoller):
- def _watch_raw_socket(self, loop, socket, evt, f):
- """Schedule callback for a raw socket"""
- loop.add_handler(socket, lambda *args: f(), evt)
- def _unwatch_raw_sockets(self, loop, *sockets):
- """Unschedule callback for a raw socket"""
- for socket in sockets:
- loop.remove_handler(socket)
- class Socket(_AsyncTornado, _AsyncSocket):
- _poller_class = Poller
- Poller._socket_class = Socket
- class Context(_zmq.Context[Socket]):
- # avoid sharing instance with base Context class
- _instance = None
- io_loop = None
- @staticmethod
- def _socket_class(self, socket_type):
- return Socket(self, socket_type)
- def __init__(self: Context, *args: Any, **kwargs: Any) -> None:
- io_loop = kwargs.pop('io_loop', None)
- if io_loop is not None:
- warnings.warn(
- f"{self.__class__.__name__}(io_loop) argument is deprecated in pyzmq 22.2."
- " The currently active loop will always be used.",
- DeprecationWarning,
- stacklevel=2,
- )
- super().__init__(*args, **kwargs) # type: ignore
|