basedevice.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. """Classes for running 0MQ Devices in the background."""
  2. # Copyright (C) PyZMQ Developers
  3. # Distributed under the terms of the Modified BSD License.
  4. import time
  5. from multiprocessing import Process
  6. from threading import Thread
  7. from typing import Any, Callable, List, Optional, Tuple
  8. import zmq
  9. from zmq import ENOTSOCK, ETERM, PUSH, QUEUE, Context, ZMQBindError, ZMQError, proxy
  10. class Device:
  11. """A 0MQ Device to be run in the background.
  12. You do not pass Socket instances to this, but rather Socket types::
  13. Device(device_type, in_socket_type, out_socket_type)
  14. For instance::
  15. dev = Device(zmq.QUEUE, zmq.DEALER, zmq.ROUTER)
  16. Similar to zmq.device, but socket types instead of sockets themselves are
  17. passed, and the sockets are created in the work thread, to avoid issues
  18. with thread safety. As a result, additional bind_{in|out} and
  19. connect_{in|out} methods and setsockopt_{in|out} allow users to specify
  20. connections for the sockets.
  21. Parameters
  22. ----------
  23. device_type : int
  24. The 0MQ Device type
  25. {in|out}_type : int
  26. zmq socket types, to be passed later to context.socket(). e.g.
  27. zmq.PUB, zmq.SUB, zmq.REQ. If out_type is < 0, then in_socket is used
  28. for both in_socket and out_socket.
  29. Methods
  30. -------
  31. bind_{in_out}(iface)
  32. passthrough for ``{in|out}_socket.bind(iface)``, to be called in the thread
  33. connect_{in_out}(iface)
  34. passthrough for ``{in|out}_socket.connect(iface)``, to be called in the
  35. thread
  36. setsockopt_{in_out}(opt,value)
  37. passthrough for ``{in|out}_socket.setsockopt(opt, value)``, to be called in
  38. the thread
  39. Attributes
  40. ----------
  41. daemon : bool
  42. sets whether the thread should be run as a daemon
  43. Default is true, because if it is false, the thread will not
  44. exit unless it is killed
  45. context_factory : callable
  46. This is a class attribute.
  47. Function for creating the Context. This will be Context.instance
  48. in ThreadDevices, and Context in ProcessDevices. The only reason
  49. it is not instance() in ProcessDevices is that there may be a stale
  50. Context instance already initialized, and the forked environment
  51. should *never* try to use it.
  52. """
  53. context_factory: Callable[[], zmq.Context] = Context.instance
  54. """Callable that returns a context. Typically either Context.instance or Context,
  55. depending on whether the device should share the global instance or not.
  56. """
  57. daemon: bool
  58. device_type: int
  59. in_type: int
  60. out_type: int
  61. _in_binds: List[str]
  62. _in_connects: List[str]
  63. _in_sockopts: List[Tuple[int, Any]]
  64. _out_binds: List[str]
  65. _out_connects: List[str]
  66. _out_sockopts: List[Tuple[int, Any]]
  67. _random_addrs: List[str]
  68. _sockets: List[zmq.Socket]
  69. def __init__(
  70. self,
  71. device_type: int = QUEUE,
  72. in_type: Optional[int] = None,
  73. out_type: Optional[int] = None,
  74. ) -> None:
  75. self.device_type = device_type
  76. if in_type is None:
  77. raise TypeError("in_type must be specified")
  78. if out_type is None:
  79. raise TypeError("out_type must be specified")
  80. self.in_type = in_type
  81. self.out_type = out_type
  82. self._in_binds = []
  83. self._in_connects = []
  84. self._in_sockopts = []
  85. self._out_binds = []
  86. self._out_connects = []
  87. self._out_sockopts = []
  88. self._random_addrs = []
  89. self.daemon = True
  90. self.done = False
  91. self._sockets = []
  92. def bind_in(self, addr: str) -> None:
  93. """Enqueue ZMQ address for binding on in_socket.
  94. See zmq.Socket.bind for details.
  95. """
  96. self._in_binds.append(addr)
  97. def bind_in_to_random_port(self, addr: str, *args, **kwargs) -> int:
  98. """Enqueue a random port on the given interface for binding on
  99. in_socket.
  100. See zmq.Socket.bind_to_random_port for details.
  101. .. versionadded:: 18.0
  102. """
  103. port = self._reserve_random_port(addr, *args, **kwargs)
  104. self.bind_in(f'{addr}:{port}')
  105. return port
  106. def connect_in(self, addr: str) -> None:
  107. """Enqueue ZMQ address for connecting on in_socket.
  108. See zmq.Socket.connect for details.
  109. """
  110. self._in_connects.append(addr)
  111. def setsockopt_in(self, opt: int, value: Any) -> None:
  112. """Enqueue setsockopt(opt, value) for in_socket
  113. See zmq.Socket.setsockopt for details.
  114. """
  115. self._in_sockopts.append((opt, value))
  116. def bind_out(self, addr: str) -> None:
  117. """Enqueue ZMQ address for binding on out_socket.
  118. See zmq.Socket.bind for details.
  119. """
  120. self._out_binds.append(addr)
  121. def bind_out_to_random_port(self, addr: str, *args, **kwargs) -> int:
  122. """Enqueue a random port on the given interface for binding on
  123. out_socket.
  124. See zmq.Socket.bind_to_random_port for details.
  125. .. versionadded:: 18.0
  126. """
  127. port = self._reserve_random_port(addr, *args, **kwargs)
  128. self.bind_out(f'{addr}:{port}')
  129. return port
  130. def connect_out(self, addr: str):
  131. """Enqueue ZMQ address for connecting on out_socket.
  132. See zmq.Socket.connect for details.
  133. """
  134. self._out_connects.append(addr)
  135. def setsockopt_out(self, opt: int, value: Any):
  136. """Enqueue setsockopt(opt, value) for out_socket
  137. See zmq.Socket.setsockopt for details.
  138. """
  139. self._out_sockopts.append((opt, value))
  140. def _reserve_random_port(self, addr: str, *args, **kwargs) -> int:
  141. with Context() as ctx:
  142. with ctx.socket(PUSH) as binder:
  143. for i in range(5):
  144. port = binder.bind_to_random_port(addr, *args, **kwargs)
  145. new_addr = f'{addr}:{port}'
  146. if new_addr in self._random_addrs:
  147. continue
  148. else:
  149. break
  150. else:
  151. raise ZMQBindError("Could not reserve random port.")
  152. self._random_addrs.append(new_addr)
  153. return port
  154. def _setup_sockets(self) -> Tuple[zmq.Socket, zmq.Socket]:
  155. ctx: zmq.Context[zmq.Socket] = self.context_factory() # type: ignore
  156. self._context = ctx
  157. # create the sockets
  158. ins = ctx.socket(self.in_type)
  159. self._sockets.append(ins)
  160. if self.out_type < 0:
  161. outs = ins
  162. else:
  163. outs = ctx.socket(self.out_type)
  164. self._sockets.append(outs)
  165. # set sockopts (must be done first, in case of zmq.IDENTITY)
  166. for opt, value in self._in_sockopts:
  167. ins.setsockopt(opt, value)
  168. for opt, value in self._out_sockopts:
  169. outs.setsockopt(opt, value)
  170. for iface in self._in_binds:
  171. ins.bind(iface)
  172. for iface in self._out_binds:
  173. outs.bind(iface)
  174. for iface in self._in_connects:
  175. ins.connect(iface)
  176. for iface in self._out_connects:
  177. outs.connect(iface)
  178. return ins, outs
  179. def run_device(self) -> None:
  180. """The runner method.
  181. Do not call me directly, instead call ``self.start()``, just like a Thread.
  182. """
  183. ins, outs = self._setup_sockets()
  184. proxy(ins, outs)
  185. def _close_sockets(self):
  186. """Cleanup sockets we created"""
  187. for s in self._sockets:
  188. if s and not s.closed:
  189. s.close()
  190. def run(self) -> None:
  191. """wrap run_device in try/catch ETERM"""
  192. try:
  193. self.run_device()
  194. except ZMQError as e:
  195. if e.errno in {ETERM, ENOTSOCK}:
  196. # silence TERM, ENOTSOCK errors, because this should be a clean shutdown
  197. pass
  198. else:
  199. raise
  200. finally:
  201. self.done = True
  202. self._close_sockets()
  203. def start(self) -> None:
  204. """Start the device. Override me in subclass for other launchers."""
  205. return self.run()
  206. def join(self, timeout: Optional[float] = None) -> None:
  207. """wait for me to finish, like Thread.join.
  208. Reimplemented appropriately by subclasses."""
  209. tic = time.monotonic()
  210. toc = tic
  211. while not self.done and not (timeout is not None and toc - tic > timeout):
  212. time.sleep(0.001)
  213. toc = time.monotonic()
  214. class BackgroundDevice(Device):
  215. """Base class for launching Devices in background processes and threads."""
  216. launcher: Any = None
  217. _launch_class: Any = None
  218. def start(self) -> None:
  219. self.launcher = self._launch_class(target=self.run)
  220. self.launcher.daemon = self.daemon
  221. return self.launcher.start()
  222. def join(self, timeout: Optional[float] = None) -> None:
  223. return self.launcher.join(timeout=timeout)
  224. class ThreadDevice(BackgroundDevice):
  225. """A Device that will be run in a background Thread.
  226. See Device for details.
  227. """
  228. _launch_class = Thread
  229. class ProcessDevice(BackgroundDevice):
  230. """A Device that will be run in a background Process.
  231. See Device for details.
  232. """
  233. _launch_class = Process
  234. context_factory = Context
  235. """Callable that returns a context. Typically either Context.instance or Context,
  236. depending on whether the device should share the global instance or not.
  237. """
  238. __all__ = ['Device', 'ThreadDevice', 'ProcessDevice']