# socket.py (1.3 KB)
  1. """Defines a dummy socket implementing (part of) the zmq.Socket interface."""
  2. # Copyright (c) IPython Development Team.
  3. # Distributed under the terms of the Modified BSD License.
  4. from queue import Queue
  5. import zmq
  6. from traitlets import HasTraits, Instance, Int
  7. # -----------------------------------------------------------------------------
  8. # Dummy socket class
  9. # -----------------------------------------------------------------------------
  10. class DummySocket(HasTraits):
  11. """A dummy socket implementing (part of) the zmq.Socket interface."""
  12. queue = Instance(Queue, ())
  13. message_sent = Int(0) # Should be an Event
  14. context = Instance(zmq.Context)
  15. def _context_default(self):
  16. return zmq.Context()
  17. # -------------------------------------------------------------------------
  18. # Socket interface
  19. # -------------------------------------------------------------------------
  20. def recv_multipart(self, flags=0, copy=True, track=False):
  21. """Recv a multipart message."""
  22. return self.queue.get_nowait()
  23. def send_multipart(self, msg_parts, flags=0, copy=True, track=False):
  24. """Send a multipart message."""
  25. msg_parts = list(map(zmq.Message, msg_parts))
  26. self.queue.put_nowait(msg_parts)
  27. self.message_sent += 1
  28. def flush(self, timeout=1.0):
  29. """no-op to comply with stream API"""