context.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. """Python bindings for 0MQ."""
  2. # Copyright (C) PyZMQ Developers
  3. # Distributed under the terms of the Modified BSD License.
  4. from __future__ import annotations
  5. import atexit
  6. import os
  7. from threading import Lock
  8. from typing import Any, Callable, Generic, TypeVar, overload
  9. from warnings import warn
  10. from weakref import WeakSet
  11. import zmq
  12. from zmq._typing import TypeAlias
  13. from zmq.backend import Context as ContextBase
  14. from zmq.constants import ContextOption, Errno, SocketOption
  15. from zmq.error import ZMQError
  16. from zmq.utils.interop import cast_int_addr
  17. from .attrsettr import AttributeSetter, OptValT
  18. from .socket import Socket, SyncSocket
  19. # notice when exiting, to avoid triggering term on exit
  20. _exiting = False
  21. def _notice_atexit() -> None:
  22. global _exiting
  23. _exiting = True
  24. atexit.register(_notice_atexit)
  25. _ContextType = TypeVar('_ContextType', bound='Context')
  26. _SocketType = TypeVar('_SocketType', bound='Socket', covariant=True)
  27. class Context(ContextBase, AttributeSetter, Generic[_SocketType]):
  28. """Create a zmq Context
  29. A zmq Context creates sockets via its ``ctx.socket`` method.
  30. .. versionchanged:: 24
  31. When using a Context as a context manager (``with zmq.Context()``),
  32. or deleting a context without closing it first,
  33. ``ctx.destroy()`` is called,
  34. closing any leftover sockets,
  35. instead of `ctx.term()` which requires sockets to be closed first.
  36. This prevents hangs caused by `ctx.term()` if sockets are left open,
  37. but means that unclean destruction of contexts
  38. (with sockets left open) is not safe
  39. if sockets are managed in other threads.
  40. .. versionadded:: 25
  41. Contexts can now be shadowed by passing another Context.
  42. This helps in creating an async copy of a sync context or vice versa::
  43. ctx = zmq.Context(async_ctx)
  44. Which previously had to be::
  45. ctx = zmq.Context.shadow(async_ctx.underlying)
  46. """
  47. sockopts: dict[int, Any]
  48. _instance: Any = None
  49. _instance_lock = Lock()
  50. _instance_pid: int | None = None
  51. _shadow = False
  52. _shadow_obj = None
  53. _warn_destroy_close = False
  54. _sockets: WeakSet
  55. # mypy doesn't like a default value here
  56. _socket_class: type[_SocketType] = Socket # type: ignore
  57. @overload
  58. def __init__(self: SyncContext, io_threads: int = 1): ...
  59. @overload
  60. def __init__(self: SyncContext, io_threads: Context, /): ...
  61. @overload
  62. def __init__(self: SyncContext, *, shadow: Context | int): ...
  63. def __init__(
  64. self: SyncContext,
  65. io_threads: int | Context = 1,
  66. shadow: Context | int = 0,
  67. ) -> None:
  68. if isinstance(io_threads, Context):
  69. # allow positional shadow `zmq.Context(zmq.asyncio.Context())`
  70. # this s
  71. shadow = io_threads
  72. io_threads = 1
  73. shadow_address: int = 0
  74. if shadow:
  75. self._shadow = True
  76. # hold a reference to the shadow object
  77. self._shadow_obj = shadow
  78. if not isinstance(shadow, int):
  79. try:
  80. shadow = shadow.underlying
  81. except AttributeError:
  82. pass
  83. shadow_address = cast_int_addr(shadow)
  84. else:
  85. self._shadow = False
  86. super().__init__(io_threads=io_threads, shadow=shadow_address)
  87. self.sockopts = {}
  88. self._sockets = WeakSet()
  89. def __del__(self) -> None:
  90. """Deleting a Context without closing it destroys it and all sockets.
  91. .. versionchanged:: 24
  92. Switch from threadsafe `term()` which hangs in the event of open sockets
  93. to less safe `destroy()` which
  94. warns about any leftover sockets and closes them.
  95. """
  96. # Calling locals() here conceals issue #1167 on Windows CPython 3.5.4.
  97. locals()
  98. if not self._shadow and not _exiting and not self.closed:
  99. self._warn_destroy_close = True
  100. if warn is not None and getattr(self, "_sockets", None) is not None:
  101. # warn can be None during process teardown
  102. warn(
  103. f"Unclosed context {self}",
  104. ResourceWarning,
  105. stacklevel=2,
  106. source=self,
  107. )
  108. self.destroy()
  109. _repr_cls = "zmq.Context"
  110. def __repr__(self) -> str:
  111. cls = self.__class__
  112. # look up _repr_cls on exact class, not inherited
  113. _repr_cls = cls.__dict__.get("_repr_cls", None)
  114. if _repr_cls is None:
  115. _repr_cls = f"{cls.__module__}.{cls.__name__}"
  116. closed = ' closed' if self.closed else ''
  117. if getattr(self, "_sockets", None):
  118. n_sockets = len(self._sockets)
  119. s = 's' if n_sockets > 1 else ''
  120. sockets = f"{n_sockets} socket{s}"
  121. else:
  122. sockets = ""
  123. return f"<{_repr_cls}({sockets}) at {hex(id(self))}{closed}>"
  124. def __enter__(self: _ContextType) -> _ContextType:
  125. return self
  126. def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
  127. # warn about any leftover sockets before closing them
  128. self._warn_destroy_close = True
  129. self.destroy()
  130. def __copy__(self: _ContextType, memo: Any = None) -> _ContextType:
  131. """Copying a Context creates a shadow copy"""
  132. return self.__class__.shadow(self.underlying)
  133. __deepcopy__ = __copy__
  134. @classmethod
  135. def shadow(cls: type[_ContextType], address: int | zmq.Context) -> _ContextType:
  136. """Shadow an existing libzmq context
  137. address is a zmq.Context or an integer (or FFI pointer)
  138. representing the address of the libzmq context.
  139. .. versionadded:: 14.1
  140. .. versionadded:: 25
  141. Support for shadowing `zmq.Context` objects,
  142. instead of just integer addresses.
  143. """
  144. return cls(shadow=address)
  145. @classmethod
  146. def shadow_pyczmq(cls: type[_ContextType], ctx: Any) -> _ContextType:
  147. """Shadow an existing pyczmq context
  148. ctx is the FFI `zctx_t *` pointer
  149. .. versionadded:: 14.1
  150. """
  151. from pyczmq import zctx # type: ignore
  152. from zmq.utils.interop import cast_int_addr
  153. underlying = zctx.underlying(ctx)
  154. address = cast_int_addr(underlying)
  155. return cls(shadow=address)
  156. # static method copied from tornado IOLoop.instance
  157. @classmethod
  158. def instance(cls: type[_ContextType], io_threads: int = 1) -> _ContextType:
  159. """Returns a global Context instance.
  160. Most single-process applications have a single, global Context.
  161. Use this method instead of passing around Context instances
  162. throughout your code.
  163. A common pattern for classes that depend on Contexts is to use
  164. a default argument to enable programs with multiple Contexts
  165. but not require the argument for simpler applications::
  166. class MyClass(object):
  167. def __init__(self, context=None):
  168. self.context = context or Context.instance()
  169. .. versionchanged:: 18.1
  170. When called in a subprocess after forking,
  171. a new global instance is created instead of inheriting
  172. a Context that won't work from the parent process.
  173. """
  174. if (
  175. cls._instance is None
  176. or cls._instance_pid != os.getpid()
  177. or cls._instance.closed
  178. ):
  179. with cls._instance_lock:
  180. if (
  181. cls._instance is None
  182. or cls._instance_pid != os.getpid()
  183. or cls._instance.closed
  184. ):
  185. cls._instance = cls(io_threads=io_threads)
  186. cls._instance_pid = os.getpid()
  187. return cls._instance
  188. def term(self) -> None:
  189. """Close or terminate the context.
  190. Context termination is performed in the following steps:
  191. - Any blocking operations currently in progress on sockets open within context shall
  192. raise :class:`zmq.ContextTerminated`.
  193. With the exception of socket.close(), any further operations on sockets open within this context
  194. shall raise :class:`zmq.ContextTerminated`.
  195. - After interrupting all blocking calls, term shall block until the following conditions are satisfied:
  196. - All sockets open within context have been closed.
  197. - For each socket within context, all messages sent on the socket have either been
  198. physically transferred to a network peer,
  199. or the socket's linger period set with the zmq.LINGER socket option has expired.
  200. For further details regarding socket linger behaviour refer to libzmq documentation for ZMQ_LINGER.
  201. This can be called to close the context by hand. If this is not called,
  202. the context will automatically be closed when it is garbage collected,
  203. in which case you may see a ResourceWarning about the unclosed context.
  204. """
  205. super().term()
  206. # -------------------------------------------------------------------------
  207. # Hooks for ctxopt completion
  208. # -------------------------------------------------------------------------
  209. def __dir__(self) -> list[str]:
  210. keys = dir(self.__class__)
  211. keys.extend(ContextOption.__members__)
  212. return keys
  213. # -------------------------------------------------------------------------
  214. # Creating Sockets
  215. # -------------------------------------------------------------------------
  216. def _add_socket(self, socket: Any) -> None:
  217. """Add a weakref to a socket for Context.destroy / reference counting"""
  218. self._sockets.add(socket)
  219. def _rm_socket(self, socket: Any) -> None:
  220. """Remove a socket for Context.destroy / reference counting"""
  221. # allow _sockets to be None in case of process teardown
  222. if getattr(self, "_sockets", None) is not None:
  223. self._sockets.discard(socket)
  224. def destroy(self, linger: int | None = None) -> None:
  225. """Close all sockets associated with this context and then terminate
  226. the context.
  227. .. warning::
  228. destroy involves calling :meth:`Socket.close`, which is **NOT** threadsafe.
  229. If there are active sockets in other threads, this must not be called.
  230. Parameters
  231. ----------
  232. linger : int, optional
  233. If specified, set LINGER on sockets prior to closing them.
  234. """
  235. if self.closed:
  236. return
  237. sockets: list[_SocketType] = list(getattr(self, "_sockets", None) or [])
  238. for s in sockets:
  239. if s and not s.closed:
  240. if self._warn_destroy_close and warn is not None:
  241. # warn can be None during process teardown
  242. warn(
  243. f"Destroying context with unclosed socket {s}",
  244. ResourceWarning,
  245. stacklevel=3,
  246. source=s,
  247. )
  248. if linger is not None:
  249. s.setsockopt(SocketOption.LINGER, linger)
  250. s.close()
  251. self.term()
  252. def socket(
  253. self: _ContextType,
  254. socket_type: int,
  255. socket_class: Callable[[_ContextType, int], _SocketType] | None = None,
  256. **kwargs: Any,
  257. ) -> _SocketType:
  258. """Create a Socket associated with this Context.
  259. Parameters
  260. ----------
  261. socket_type : int
  262. The socket type, which can be any of the 0MQ socket types:
  263. REQ, REP, PUB, SUB, PAIR, DEALER, ROUTER, PULL, PUSH, etc.
  264. socket_class: zmq.Socket
  265. The socket class to instantiate, if different from the default for this Context.
  266. e.g. for creating an asyncio socket attached to a default Context or vice versa.
  267. .. versionadded:: 25
  268. kwargs:
  269. will be passed to the __init__ method of the socket class.
  270. """
  271. if self.closed:
  272. raise ZMQError(Errno.ENOTSUP)
  273. if socket_class is None:
  274. socket_class = self._socket_class
  275. s: _SocketType = (
  276. socket_class( # set PYTHONTRACEMALLOC=2 to get the calling frame
  277. self, socket_type, **kwargs
  278. )
  279. )
  280. for opt, value in self.sockopts.items():
  281. try:
  282. s.setsockopt(opt, value)
  283. except ZMQError:
  284. # ignore ZMQErrors, which are likely for socket options
  285. # that do not apply to a particular socket type, e.g.
  286. # SUBSCRIBE for non-SUB sockets.
  287. pass
  288. self._add_socket(s)
  289. return s
  290. def setsockopt(self, opt: int, value: Any) -> None:
  291. """set default socket options for new sockets created by this Context
  292. .. versionadded:: 13.0
  293. """
  294. self.sockopts[opt] = value
  295. def getsockopt(self, opt: int) -> OptValT:
  296. """get default socket options for new sockets created by this Context
  297. .. versionadded:: 13.0
  298. """
  299. return self.sockopts[opt]
  300. def _set_attr_opt(self, name: str, opt: int, value: OptValT) -> None:
  301. """set default sockopts as attributes"""
  302. if name in ContextOption.__members__:
  303. return self.set(opt, value)
  304. elif name in SocketOption.__members__:
  305. self.sockopts[opt] = value
  306. else:
  307. raise AttributeError(f"No such context or socket option: {name}")
  308. def _get_attr_opt(self, name: str, opt: int) -> OptValT:
  309. """get default sockopts as attributes"""
  310. if name in ContextOption.__members__:
  311. return self.get(opt)
  312. else:
  313. if opt not in self.sockopts:
  314. raise AttributeError(name)
  315. else:
  316. return self.sockopts[opt]
  317. def __delattr__(self, key: str) -> None:
  318. """delete default sockopts as attributes"""
  319. if key in self.__dict__:
  320. self.__dict__.pop(key)
  321. return
  322. key = key.upper()
  323. try:
  324. opt = getattr(SocketOption, key)
  325. except AttributeError:
  326. raise AttributeError(f"No such socket option: {key!r}")
  327. else:
  328. if opt not in self.sockopts:
  329. raise AttributeError(key)
  330. else:
  331. del self.sockopts[opt]
  332. SyncContext: TypeAlias = Context[SyncSocket]
  333. __all__ = ['Context', 'SyncContext']