monitoredqueue.py 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. """pure Python monitored_queue function
  2. For use when Cython extension is unavailable (PyPy).
  3. Authors
  4. -------
  5. * MinRK
  6. """
  7. # Copyright (C) PyZMQ Developers
  8. # Distributed under the terms of the Modified BSD License.
  9. from typing import Callable
  10. import zmq
  11. from zmq.backend import monitored_queue as _backend_mq
  12. def _relay(ins, outs, sides, prefix, swap_ids):
  13. msg = ins.recv_multipart()
  14. if swap_ids:
  15. msg[:2] = msg[:2][::-1]
  16. outs.send_multipart(msg)
  17. sides.send_multipart([prefix] + msg)
  18. def _monitored_queue(
  19. in_socket, out_socket, mon_socket, in_prefix=b'in', out_prefix=b'out'
  20. ):
  21. swap_ids = in_socket.type == zmq.ROUTER and out_socket.type == zmq.ROUTER
  22. poller = zmq.Poller()
  23. poller.register(in_socket, zmq.POLLIN)
  24. poller.register(out_socket, zmq.POLLIN)
  25. while True:
  26. events = dict(poller.poll())
  27. if in_socket in events:
  28. _relay(in_socket, out_socket, mon_socket, in_prefix, swap_ids)
  29. if out_socket in events:
  30. _relay(out_socket, in_socket, mon_socket, out_prefix, swap_ids)
  31. monitored_queue: Callable
  32. if _backend_mq is not None:
  33. monitored_queue = _backend_mq # type: ignore
  34. else:
  35. # backend has no monitored_queue
  36. monitored_queue = _monitored_queue
  37. __all__ = ['monitored_queue']