# _deprecated.py (6.3 KB)
  1. """tornado IOLoop API with zmq compatibility
  2. If you have tornado ≥ 3.0, this is a subclass of tornado's IOLoop,
  3. otherwise we ship a minimal subset of tornado in zmq.eventloop.minitornado.
  4. The minimal shipped version of tornado's IOLoop does not include
  5. support for concurrent futures - this will only be available if you
  6. have tornado ≥ 3.0.
  7. """
  8. # Copyright (C) PyZMQ Developers
  9. # Distributed under the terms of the Modified BSD License.
  10. import time
  11. import warnings
  12. from typing import Tuple
  13. from zmq import ETERM, POLLERR, POLLIN, POLLOUT, Poller, ZMQError
# Version tuple of the installed tornado. It stays () when tornado cannot be
# imported or does not expose ``version_info``.
tornado_version: Tuple = ()
try:
    import tornado

    tornado_version = tornado.version_info
except (ImportError, AttributeError):
    # Keep the empty-tuple default, so the ``tornado_version >= (3,)`` checks
    # in this module evaluate to False.
    pass
  20. from .minitornado.ioloop import PeriodicCallback, PollIOLoop
  21. from .minitornado.log import gen_log
  22. class DelayedCallback(PeriodicCallback):
  23. """Schedules the given callback to be called once.
  24. The callback is called once, after callback_time milliseconds.
  25. `start` must be called after the DelayedCallback is created.
  26. The timeout is calculated from when `start` is called.
  27. """
  28. def __init__(self, callback, callback_time, io_loop=None):
  29. # PeriodicCallback require callback_time to be positive
  30. warnings.warn(
  31. """DelayedCallback is deprecated.
  32. Use loop.add_timeout instead.""",
  33. DeprecationWarning,
  34. )
  35. callback_time = max(callback_time, 1e-3)
  36. super().__init__(callback, callback_time, io_loop)
  37. def start(self):
  38. """Starts the timer."""
  39. self._running = True
  40. self._firstrun = True
  41. self._next_timeout = time.time() + self.callback_time / 1000.0
  42. self.io_loop.add_timeout(self._next_timeout, self._run)
  43. def _run(self):
  44. if not self._running:
  45. return
  46. self._running = False
  47. try:
  48. self.callback()
  49. except Exception:
  50. gen_log.error("Error in delayed callback", exc_info=True)
  51. class ZMQPoller:
  52. """A poller that can be used in the tornado IOLoop.
  53. This simply wraps a regular zmq.Poller, scaling the timeout
  54. by 1000, so that it is in seconds rather than milliseconds.
  55. """
  56. def __init__(self):
  57. self._poller = Poller()
  58. @staticmethod
  59. def _map_events(events):
  60. """translate IOLoop.READ/WRITE/ERROR event masks into zmq.POLLIN/OUT/ERR"""
  61. z_events = 0
  62. if events & IOLoop.READ:
  63. z_events |= POLLIN
  64. if events & IOLoop.WRITE:
  65. z_events |= POLLOUT
  66. if events & IOLoop.ERROR:
  67. z_events |= POLLERR
  68. return z_events
  69. @staticmethod
  70. def _remap_events(z_events):
  71. """translate zmq.POLLIN/OUT/ERR event masks into IOLoop.READ/WRITE/ERROR"""
  72. events = 0
  73. if z_events & POLLIN:
  74. events |= IOLoop.READ
  75. if z_events & POLLOUT:
  76. events |= IOLoop.WRITE
  77. if z_events & POLLERR:
  78. events |= IOLoop.ERROR
  79. return events
  80. def register(self, fd, events):
  81. return self._poller.register(fd, self._map_events(events))
  82. def modify(self, fd, events):
  83. return self._poller.modify(fd, self._map_events(events))
  84. def unregister(self, fd):
  85. return self._poller.unregister(fd)
  86. def poll(self, timeout):
  87. """poll in seconds rather than milliseconds.
  88. Event masks will be IOLoop.READ/WRITE/ERROR
  89. """
  90. z_events = self._poller.poll(1000 * timeout)
  91. return [(fd, self._remap_events(evt)) for (fd, evt) in z_events]
  92. def close(self):
  93. pass
  94. class ZMQIOLoop(PollIOLoop):
  95. """ZMQ subclass of tornado's IOLoop
  96. Minor modifications, so that .current/.instance return self
  97. """
  98. _zmq_impl = ZMQPoller
  99. def initialize(self, impl=None, **kwargs):
  100. impl = self._zmq_impl() if impl is None else impl
  101. super().initialize(impl=impl, **kwargs)
  102. @classmethod
  103. def instance(cls, *args, **kwargs):
  104. """Returns a global `IOLoop` instance.
  105. Most applications have a single, global `IOLoop` running on the
  106. main thread. Use this method to get this instance from
  107. another thread. To get the current thread's `IOLoop`, use `current()`.
  108. """
  109. # install ZMQIOLoop as the active IOLoop implementation
  110. # when using tornado 3
  111. if tornado_version >= (3,):
  112. PollIOLoop.configure(cls)
  113. loop = PollIOLoop.instance(*args, **kwargs)
  114. if not isinstance(loop, cls):
  115. warnings.warn(
  116. f"IOLoop.current expected instance of {cls!r}, got {loop!r}",
  117. RuntimeWarning,
  118. stacklevel=2,
  119. )
  120. return loop
  121. @classmethod
  122. def current(cls, *args, **kwargs):
  123. """Returns the current thread’s IOLoop."""
  124. # install ZMQIOLoop as the active IOLoop implementation
  125. # when using tornado 3
  126. if tornado_version >= (3,):
  127. PollIOLoop.configure(cls)
  128. loop = PollIOLoop.current(*args, **kwargs)
  129. if not isinstance(loop, cls):
  130. warnings.warn(
  131. f"IOLoop.current expected instance of {cls!r}, got {loop!r}",
  132. RuntimeWarning,
  133. stacklevel=2,
  134. )
  135. return loop
  136. def start(self):
  137. try:
  138. super().start()
  139. except ZMQError as e:
  140. if e.errno == ETERM:
  141. # quietly return on ETERM
  142. pass
  143. else:
  144. raise
# Public API name: ZMQPoller and install() in this module refer to the
# pyzmq loop class through this alias.
IOLoop = ZMQIOLoop
  147. def install():
  148. """set the tornado IOLoop instance with the pyzmq IOLoop.
  149. After calling this function, tornado's IOLoop.instance() and pyzmq's
  150. IOLoop.instance() will return the same object.
  151. An assertion error will be raised if tornado's IOLoop has been initialized
  152. prior to calling this function.
  153. """
  154. from tornado import ioloop
  155. # check if tornado's IOLoop is already initialized to something other
  156. # than the pyzmq IOLoop instance:
  157. assert (
  158. not ioloop.IOLoop.initialized()
  159. ) or ioloop.IOLoop.instance() is IOLoop.instance(), (
  160. "tornado IOLoop already initialized"
  161. )
  162. if tornado_version >= (3,):
  163. # tornado 3 has an official API for registering new defaults, yay!
  164. ioloop.IOLoop.configure(ZMQIOLoop)
  165. else:
  166. # we have to set the global instance explicitly
  167. ioloop.IOLoop._instance = IOLoop.instance()