socket_pair.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. """Pair of ZMQ inproc sockets used for communication between threads."""
  2. from __future__ import annotations
  3. from typing import Any
  4. import zmq
  5. from tornado.ioloop import IOLoop
  6. from zmq.eventloop.zmqstream import ZMQStream
  7. class SocketPair:
  8. """Pair of ZMQ inproc sockets for one-direction communication between 2 threads.
  9. One of the threads is always the shell_channel_thread, the other may be the control
  10. thread, main thread or a subshell thread.
  11. .. versionadded:: 7
  12. """
  13. from_socket: zmq.Socket[Any]
  14. to_socket: zmq.Socket[Any]
  15. to_stream: ZMQStream | None = None
  16. def __init__(self, context: zmq.Context[Any], name: str):
  17. """Initialize the inproc socker pair."""
  18. self.from_socket = context.socket(zmq.PAIR)
  19. self.to_socket = context.socket(zmq.PAIR)
  20. address = self._address(name)
  21. self.from_socket.bind(address)
  22. self.to_socket.connect(address) # Or do I need to do this in another thread?
  23. def close(self):
  24. """Close the inproc socker pair."""
  25. self.from_socket.close()
  26. if self.to_stream is not None:
  27. self.to_stream.close()
  28. self.to_socket.close()
  29. def on_recv(self, io_loop: IOLoop, on_recv_callback, copy: bool = False):
  30. """Set the callback used when a message is received on the to stream."""
  31. # io_loop is that of the 'to' thread.
  32. if self.to_stream is None:
  33. self.to_stream = ZMQStream(self.to_socket, io_loop)
  34. self.to_stream.on_recv(on_recv_callback, copy=copy)
  35. def _address(self, name) -> str:
  36. """Return the address used for this inproc socket pair."""
  37. return f"inproc://subshell{name}"