poll.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. from __future__ import annotations
  2. import gevent
  3. from gevent import select
  4. import zmq
  5. from zmq import Poller as _original_Poller
  6. class _Poller(_original_Poller):
  7. """Replacement for :class:`zmq.Poller`
  8. Ensures that the greened Poller below is used in calls to
  9. :meth:`zmq.Poller.poll`.
  10. """
  11. _gevent_bug_timeout = 1.33 # minimum poll interval, for working around gevent bug
  12. def _get_descriptors(self):
  13. """Returns three elements tuple with socket descriptors ready
  14. for gevent.select.select
  15. """
  16. rlist = []
  17. wlist = []
  18. xlist = []
  19. for socket, flags in self.sockets:
  20. if isinstance(socket, zmq.Socket):
  21. rlist.append(socket.getsockopt(zmq.FD))
  22. continue
  23. elif isinstance(socket, int):
  24. fd = socket
  25. elif hasattr(socket, 'fileno'):
  26. try:
  27. fd = int(socket.fileno())
  28. except Exception:
  29. raise ValueError('fileno() must return an valid integer fd')
  30. else:
  31. raise TypeError(
  32. 'Socket must be a 0MQ socket, an integer fd '
  33. f'or have a fileno() method: {socket!r}'
  34. )
  35. if flags & zmq.POLLIN:
  36. rlist.append(fd)
  37. if flags & zmq.POLLOUT:
  38. wlist.append(fd)
  39. if flags & zmq.POLLERR:
  40. xlist.append(fd)
  41. return (rlist, wlist, xlist)
  42. def poll(self, timeout=-1):
  43. """Overridden method to ensure that the green version of
  44. Poller is used.
  45. Behaves the same as :meth:`zmq.core.Poller.poll`
  46. """
  47. if timeout is None:
  48. timeout = -1
  49. if timeout < 0:
  50. timeout = -1
  51. rlist = None
  52. wlist = None
  53. xlist = None
  54. if timeout > 0:
  55. tout = gevent.Timeout.start_new(timeout / 1000.0)
  56. else:
  57. tout = None
  58. try:
  59. # Loop until timeout or events available
  60. rlist, wlist, xlist = self._get_descriptors()
  61. while True:
  62. events = super().poll(0)
  63. if events or timeout == 0:
  64. return events
  65. # wait for activity on sockets in a green way
  66. # set a minimum poll frequency,
  67. # because gevent < 1.0 cannot be trusted to catch edge-triggered FD events
  68. _bug_timeout = gevent.Timeout.start_new(self._gevent_bug_timeout)
  69. try:
  70. select.select(rlist, wlist, xlist)
  71. except gevent.Timeout as t:
  72. if t is not _bug_timeout:
  73. raise
  74. finally:
  75. _bug_timeout.cancel()
  76. except gevent.Timeout as t:
  77. if t is not tout:
  78. raise
  79. return []
  80. finally:
  81. if timeout > 0:
  82. tout.cancel()