poll.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. """0MQ polling related functions and classes."""
  2. # Copyright (C) PyZMQ Developers
  3. # Distributed under the terms of the Modified BSD License.
  4. from __future__ import annotations
  5. from typing import Any
  6. from zmq.backend import zmq_poll
  7. from zmq.constants import POLLERR, POLLIN, POLLOUT
  8. # -----------------------------------------------------------------------------
  9. # Polling related methods
  10. # -----------------------------------------------------------------------------
  11. class Poller:
  12. """A stateful poll interface that mirrors Python's built-in poll."""
  13. sockets: list[tuple[Any, int]]
  14. _map: dict
  15. def __init__(self) -> None:
  16. self.sockets = []
  17. self._map = {}
  18. def __contains__(self, socket: Any) -> bool:
  19. return socket in self._map
  20. def register(self, socket: Any, flags: int = POLLIN | POLLOUT):
  21. """p.register(socket, flags=POLLIN|POLLOUT)
  22. Register a 0MQ socket or native fd for I/O monitoring.
  23. register(s,0) is equivalent to unregister(s).
  24. Parameters
  25. ----------
  26. socket : zmq.Socket or native socket
  27. A zmq.Socket or any Python object having a ``fileno()``
  28. method that returns a valid file descriptor.
  29. flags : int
  30. The events to watch for. Can be POLLIN, POLLOUT or POLLIN|POLLOUT.
  31. If `flags=0`, socket will be unregistered.
  32. """
  33. if flags:
  34. if socket in self._map:
  35. idx = self._map[socket]
  36. self.sockets[idx] = (socket, flags)
  37. else:
  38. idx = len(self.sockets)
  39. self.sockets.append((socket, flags))
  40. self._map[socket] = idx
  41. elif socket in self._map:
  42. # uregister sockets registered with no events
  43. self.unregister(socket)
  44. else:
  45. # ignore new sockets with no events
  46. pass
  47. def modify(self, socket, flags=POLLIN | POLLOUT):
  48. """Modify the flags for an already registered 0MQ socket or native fd."""
  49. self.register(socket, flags)
  50. def unregister(self, socket: Any):
  51. """Remove a 0MQ socket or native fd for I/O monitoring.
  52. Parameters
  53. ----------
  54. socket : Socket
  55. The socket instance to stop polling.
  56. """
  57. idx = self._map.pop(socket)
  58. self.sockets.pop(idx)
  59. # shift indices after deletion
  60. for socket, flags in self.sockets[idx:]:
  61. self._map[socket] -= 1
  62. def poll(self, timeout: int | None = None) -> list[tuple[Any, int]]:
  63. """Poll the registered 0MQ or native fds for I/O.
  64. If there are currently events ready to be processed, this function will return immediately.
  65. Otherwise, this function will return as soon the first event is available or after timeout
  66. milliseconds have elapsed.
  67. Parameters
  68. ----------
  69. timeout : int
  70. The timeout in milliseconds. If None, no `timeout` (infinite). This
  71. is in milliseconds to be compatible with ``select.poll()``.
  72. Returns
  73. -------
  74. events : list
  75. The list of events that are ready to be processed.
  76. This is a list of tuples of the form ``(socket, event_mask)``, where the 0MQ Socket
  77. or integer fd is the first element, and the poll event mask (POLLIN, POLLOUT) is the second.
  78. It is common to call ``events = dict(poller.poll())``,
  79. which turns the list of tuples into a mapping of ``socket : event_mask``.
  80. """
  81. if timeout is None or timeout < 0:
  82. timeout = -1
  83. elif isinstance(timeout, float):
  84. timeout = int(timeout)
  85. return zmq_poll(self.sockets, timeout=timeout)
  86. def select(
  87. rlist: list, wlist: list, xlist: list, timeout: float | None = None
  88. ) -> tuple[list, list, list]:
  89. """select(rlist, wlist, xlist, timeout=None) -> (rlist, wlist, xlist)
  90. Return the result of poll as a lists of sockets ready for r/w/exception.
  91. This has the same interface as Python's built-in ``select.select()`` function.
  92. Parameters
  93. ----------
  94. timeout : float, optional
  95. The timeout in seconds. If None, no timeout (infinite). This is in seconds to be
  96. compatible with ``select.select()``.
  97. rlist : list
  98. sockets/FDs to be polled for read events
  99. wlist : list
  100. sockets/FDs to be polled for write events
  101. xlist : list
  102. sockets/FDs to be polled for error events
  103. Returns
  104. -------
  105. rlist: list
  106. list of sockets or FDs that are readable
  107. wlist: list
  108. list of sockets or FDs that are writable
  109. xlist: list
  110. list of sockets or FDs that had error events (rare)
  111. """
  112. if timeout is None:
  113. timeout = -1
  114. # Convert from sec -> ms for zmq_poll.
  115. # zmq_poll accepts 3.x style timeout in ms
  116. timeout = int(timeout * 1000.0)
  117. if timeout < 0:
  118. timeout = -1
  119. sockets = []
  120. for s in set(rlist + wlist + xlist):
  121. flags = 0
  122. if s in rlist:
  123. flags |= POLLIN
  124. if s in wlist:
  125. flags |= POLLOUT
  126. if s in xlist:
  127. flags |= POLLERR
  128. sockets.append((s, flags))
  129. return_sockets = zmq_poll(sockets, timeout)
  130. rlist, wlist, xlist = [], [], []
  131. for s, flags in return_sockets:
  132. if flags & POLLIN:
  133. rlist.append(s)
  134. if flags & POLLOUT:
  135. wlist.append(s)
  136. if flags & POLLERR:
  137. xlist.append(s)
  138. return rlist, wlist, xlist
  139. # -----------------------------------------------------------------------------
  140. # Symbols to export
  141. # -----------------------------------------------------------------------------
  142. __all__ = ['Poller', 'select']