_zmq.py 61 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048
  1. # cython: language_level = 3str
  2. # cython: freethreading_compatible = True
  3. """Cython backend for pyzmq"""
  4. # Copyright (C) PyZMQ Developers
  5. # Distributed under the terms of the Modified BSD License.
  6. from __future__ import annotations
  7. try:
  8. import cython
  9. if not cython.compiled:
  10. raise ImportError()
  11. except ImportError:
  12. from pathlib import Path
  13. zmq_root = Path(__file__).parents[3]
  14. msg = f"""
  15. Attempting to import zmq Cython backend, which has not been compiled.
  16. This probably means you are importing zmq from its source tree.
  17. if this is what you want, make sure to do an in-place build first:
  18. pip install -e '{zmq_root}'
  19. If it is not, then '{zmq_root}' is probably on your sys.path,
  20. when it shouldn't be. Is that your current working directory?
  21. If neither of those is true and this file is actually installed,
  22. something seems to have gone wrong with the install!
  23. Please report at https://github.com/zeromq/pyzmq/issues
  24. """
  25. raise ImportError(msg)
  26. import warnings
  27. from threading import Event
  28. from time import monotonic
  29. from weakref import ref
  30. import cython as C
  31. from cython import (
  32. NULL,
  33. Py_ssize_t,
  34. address,
  35. bint,
  36. cast,
  37. cclass,
  38. cfunc,
  39. char,
  40. declare,
  41. inline,
  42. nogil,
  43. p_char,
  44. p_void,
  45. pointer,
  46. size_t,
  47. sizeof,
  48. )
  49. from cython.cimports.cpython.buffer import (
  50. Py_buffer,
  51. PyBUF_ANY_CONTIGUOUS,
  52. PyBUF_WRITABLE,
  53. PyBuffer_Release,
  54. PyObject_GetBuffer,
  55. )
  56. from cython.cimports.cpython.bytes import (
  57. PyBytes_AsString,
  58. PyBytes_FromStringAndSize,
  59. PyBytes_Size,
  60. )
  61. from cython.cimports.cpython.exc import PyErr_CheckSignals
  62. from cython.cimports.libc.errno import EAGAIN, EINTR, ENAMETOOLONG, ENOENT, ENOTSOCK
  63. from cython.cimports.libc.stdint import uint32_t
  64. from cython.cimports.libc.stdio import fprintf
  65. from cython.cimports.libc.stdio import stderr as cstderr
  66. from cython.cimports.libc.stdlib import free, malloc
  67. from cython.cimports.libc.string import memcpy
  68. from cython.cimports.zmq.backend.cython import libzmq
  69. from cython.cimports.zmq.backend.cython._externs import (
  70. get_ipc_path_max_len,
  71. getpid,
  72. mutex_allocate,
  73. mutex_lock,
  74. mutex_t,
  75. mutex_unlock,
  76. )
  77. from cython.cimports.zmq.backend.cython.libzmq import (
  78. ZMQ_ENOTSOCK,
  79. ZMQ_ETERM,
  80. ZMQ_EVENT_ALL,
  81. ZMQ_FD,
  82. ZMQ_IDENTITY,
  83. ZMQ_IO_THREADS,
  84. ZMQ_LINGER,
  85. ZMQ_POLLIN,
  86. ZMQ_POLLOUT,
  87. ZMQ_RCVMORE,
  88. ZMQ_ROUTER,
  89. ZMQ_SNDMORE,
  90. ZMQ_THREAD_SAFE,
  91. ZMQ_TYPE,
  92. _zmq_version,
  93. fd_t,
  94. int64_t,
  95. zmq_bind,
  96. zmq_close,
  97. zmq_connect,
  98. zmq_ctx_destroy,
  99. zmq_ctx_get,
  100. zmq_ctx_new,
  101. zmq_ctx_set,
  102. zmq_curve_keypair,
  103. zmq_curve_public,
  104. zmq_disconnect,
  105. zmq_free_fn,
  106. zmq_getsockopt,
  107. zmq_has,
  108. zmq_join,
  109. zmq_leave,
  110. zmq_msg_close,
  111. zmq_msg_copy,
  112. zmq_msg_data,
  113. zmq_msg_get,
  114. zmq_msg_gets,
  115. zmq_msg_group,
  116. zmq_msg_init,
  117. zmq_msg_init_data,
  118. zmq_msg_init_size,
  119. zmq_msg_recv,
  120. zmq_msg_routing_id,
  121. zmq_msg_send,
  122. zmq_msg_set,
  123. zmq_msg_set_group,
  124. zmq_msg_set_routing_id,
  125. zmq_msg_size,
  126. zmq_msg_t,
  127. zmq_poller_add,
  128. zmq_poller_destroy,
  129. zmq_poller_fd,
  130. zmq_poller_new,
  131. zmq_pollitem_t,
  132. zmq_proxy,
  133. zmq_proxy_steerable,
  134. zmq_recv,
  135. zmq_setsockopt,
  136. zmq_socket,
  137. zmq_socket_monitor,
  138. zmq_strerror,
  139. zmq_unbind,
  140. )
  141. from cython.cimports.zmq.backend.cython.libzmq import zmq_errno as _zmq_errno
  142. from cython.cimports.zmq.backend.cython.libzmq import zmq_poll as zmq_poll_c
  143. import zmq
  144. from zmq.constants import SocketOption, _OptType
  145. from zmq.error import (
  146. Again,
  147. ContextTerminated,
  148. InterruptedSystemCall,
  149. ZMQError,
  150. _check_version,
  151. )
  # Value reported by the C helper get_ipc_path_max_len()
  # (presumably the platform limit on ipc:// path length -- confirm in _externs).
  IPC_PATH_MAX_LEN: int = get_ipc_path_max_len()
  # Truthiness of libzmq.PYZMQ_DRAFT_API, exposed as a Python bool.
  PYZMQ_DRAFT_API: bool = bool(libzmq.PYZMQ_DRAFT_API)
  @cfunc
  @inline
  @C.exceptval(-1)
  def _check_rc(rc: C.int, error_without_errno: bint = False) -> C.int:
      """Turn a libzmq return code into the matching Python exception.

      The zmq errno is captured first, then ``PyErr_CheckSignals`` is called
      so pending Python signal handlers get a chance to run.

      Returns 0 when nothing is raised, i.e. when ``rc`` is not -1, or when
      errno is 0 and ``error_without_errno`` is false.

      Raises ``InterruptedSystemCall`` (EINTR), ``Again`` (EAGAIN),
      ``ContextTerminated`` (ETERM) or ``ZMQError`` (any other errno).
      """
      zmq_err: C.int = _zmq_errno()
      PyErr_CheckSignals()
      # rc < -1 would be a bug in libzmq; like any rc other than -1 it is
      # reported as success here.
      if rc != -1 or (zmq_err == 0 and not error_without_errno):
          return 0
      if zmq_err == EINTR:
          raise InterruptedSystemCall(zmq_err)
      if zmq_err == EAGAIN:
          raise Again(zmq_err)
      if zmq_err == ZMQ_ETERM:
          raise ContextTerminated(zmq_err)
      raise ZMQError(zmq_err)
  # message Frame class

  # Hint handed to libzmq together with zero-copy message data and read back
  # by free_python_msg.  Field order is part of the struct layout.
  _zhint = C.struct(
      # socket the "done with this buffer" notification is sent on
      sock=p_void,
      # lock taken around the send on that socket
      mutex=pointer(mutex_t),
      # value sent as the notification payload
      id=size_t,
  )
  @cfunc
  @nogil
  def free_python_msg(data: p_void, vhint: p_void) -> C.int:
      """A pure-C callback for releasing Python-owned message data.

      ``data`` is not used.  ``vhint`` points at a ``_zhint`` struct:

      sock (void *): pointer to the Garbage Collector's PUSH socket
      mutex (mutex_t *): lock held while sending on that socket
      id (size_t): payload of the zmq_msg_t sent on the PUSH socket,
          signaling the Garbage Collector to remove its reference to the object.

      When the Garbage Collector's PULL socket receives the message,
      it deletes its reference to the object,
      allowing Python to free the memory.

      Lock, send and unlock failures are reported on stderr, except a send
      failing with ENOTSOCK, which is ignored.  The hint is freed before
      returning.  Always returns 0; a NULL hint is a no-op.
      """
      notice = declare(zmq_msg_t)
      notice_ptr: pointer(zmq_msg_t) = address(notice)
      gc_hint: pointer(_zhint) = cast(pointer(_zhint), vhint)
      rc: C.int
      if gc_hint == NULL:
          return 0

      # build the notification: a message whose body is the stored id
      zmq_msg_init_size(notice_ptr, sizeof(size_t))
      memcpy(zmq_msg_data(notice_ptr), address(gc_hint.id), sizeof(size_t))

      rc = mutex_lock(gc_hint.mutex)
      if rc != 0:
          fprintf(cstderr, "pyzmq-gc mutex lock failed rc=%d\n", rc)

      rc = zmq_msg_send(notice_ptr, gc_hint.sock, 0)
      # gc socket could have been closed, e.g. during process teardown.
      # If so, ignore the failure because there's nothing to do.
      if rc < 0 and _zmq_errno() != ZMQ_ENOTSOCK:
          fprintf(cstderr, "pyzmq-gc send failed: %s\n", zmq_strerror(_zmq_errno()))

      rc = mutex_unlock(gc_hint.mutex)
      if rc != 0:
          fprintf(cstderr, "pyzmq-gc mutex unlock failed rc=%d\n", rc)

      zmq_msg_close(notice_ptr)
      free(gc_hint)
      return 0
  @cfunc
  @inline
  def _copy_zmq_msg_bytes(zmq_msg: pointer(zmq_msg_t)) -> bytes:
      """Return a new bytes object holding a copy of the message body."""
      payload: p_char = cast(p_char, zmq_msg_data(zmq_msg))
      nbytes: Py_ssize_t = zmq_msg_size(zmq_msg)
      return PyBytes_FromStringAndSize(payload, nbytes)
  @cfunc
  @inline
  def _asbuffer(obj, data_c: pointer(p_void), writable: bint = False) -> size_t:
      """Fetch the C data pointer and byte length of a buffer-providing object.

      Requests a contiguous buffer from ``obj`` (also writable when
      ``writable`` is true), stores the data pointer in ``data_c[0]`` and
      returns the buffer length.

      The buffer view is released before returning.
      NOTE(review): the pointer is presumably only usable while the caller
      keeps ``obj`` alive -- confirm at call sites.

      Raises ``ValueError`` if the buffer request reports failure.
      """
      view = declare(Py_buffer)
      request: C.int = PyBUF_ANY_CONTIGUOUS
      if writable:
          request = PyBUF_ANY_CONTIGUOUS | PyBUF_WRITABLE
      if PyObject_GetBuffer(obj, address(view), request) < 0:
          raise ValueError("Couldn't create buffer")
      nbytes: size_t = view.len
      data_c[0] = view.buf
      PyBuffer_Release(address(view))
      return nbytes
  # zmq.utils.garbage.gc, imported lazily by Frame.__init__ the first time a
  # zero-copy Frame is created.
  _gc = None


  @cclass
  class Frame:
      """A single message frame wrapping a libzmq ``zmq_msg_t``.

      A Frame either owns a copy of its data inside the ``zmq_msg_t`` or shares
      the memory of the Python object it was built from (zero-copy).  In the
      zero-copy case the object is registered with the pyzmq garbage collector
      and released when libzmq calls ``free_python_msg``.
      """

      def __init__(
          self, data=None, track=False, copy=None, copy_threshold=None, **kwargs
      ):
          """Initialise the underlying ``zmq_msg_t``.

          Parameters
          ----------
          data : buffer-providing object, optional
              Message content.  ``None`` creates an empty message.
              ``str`` is rejected.
          track : bool
              If true, ``self.tracker`` is set: an already-finished tracker when
              the data is copied (or empty), a live ``zmq.MessageTracker`` when
              memory is shared with libzmq.
          copy : bool, optional
              Force copying (true) or sharing (false).  When ``None``, data
              shorter than ``copy_threshold`` is copied, everything else shared;
              a threshold of 0 means nothing is copied.
          copy_threshold : int, optional
              Defaults to ``zmq.COPY_THRESHOLD``.
          **kwargs
              Accepted and ignored.

          Raises
          ------
          TypeError
              If ``data`` is a ``str``.
          MemoryError
              If the zero-copy hint cannot be allocated.
          ZMQError
              If libzmq fails to initialise the message.
          """
          rc: C.int
          data_c: p_char = NULL
          data_len_c: Py_ssize_t = 0
          hint: pointer(_zhint)
          if copy_threshold is None:
              copy_threshold = zmq.COPY_THRESHOLD

          c_copy_threshold: C.size_t = 0
          if copy_threshold is not None:
              c_copy_threshold = copy_threshold

          zmq_msg_ptr: pointer(zmq_msg_t) = address(self.zmq_msg)
          # init more as False
          self.more = False

          # Save the data object in case the user wants the data as a str.
          self._data = data
          # stays True until the zmq_msg_t is initialised, so that
          # __dealloc__ never closes an uninitialised message
          self._failed_init = True
          self._buffer = None  # weakref to a memoryview of the message
          self._bytes = None  # cached bytes copy of the message

          self.tracker_event = None
          self.tracker = None
          # self.tracker should start finished
          # except in the case where we are sharing memory with libzmq
          if track:
              self.tracker = zmq._FINISHED_TRACKER

          if isinstance(data, str):
              raise TypeError("Str objects not allowed. Only: bytes, buffer interfaces.")

          if data is None:
              rc = zmq_msg_init(zmq_msg_ptr)
              _check_rc(rc)
              self._failed_init = False
              return

          data_len_c = _asbuffer(data, cast(pointer(p_void), address(data_c)))

          # copy unspecified, apply copy_threshold
          c_copy: bint = True
          if copy is None:
              if c_copy_threshold and data_len_c < c_copy_threshold:
                  c_copy = True
              else:
                  c_copy = False
          else:
              c_copy = copy

          if c_copy:
              # copy message data instead of sharing memory
              rc = zmq_msg_init_size(zmq_msg_ptr, data_len_c)
              _check_rc(rc)
              memcpy(zmq_msg_data(zmq_msg_ptr), data_c, data_len_c)
              self._failed_init = False
              return

          # Getting here means that we are doing a true zero-copy Frame,
          # where libzmq and Python are sharing memory.
          # Hook up garbage collection with MessageTracker and zmq_free_fn

          # Event and MessageTracker for monitoring when zmq is done with data:
          if track:
              evt = Event()
              self.tracker_event = evt
              self.tracker = zmq.MessageTracker(evt)

          # create the hint for zmq_free_fn
          # it carries the gc PUSH socket, its mutex and the id to send,
          # which lets libzmq signal to Python when it is done with
          # Python-owned memory.
          global _gc
          if _gc is None:
              from zmq.utils.garbage import gc as _gc

          hint = cast(pointer(_zhint), malloc(sizeof(_zhint)))
          if hint == NULL:
              # malloc failed: bail out before touching the struct or
              # registering anything with the gc
              raise MemoryError()
          hint.id = _gc.store(data, self.tracker_event)
          if not _gc._push_mutex:
              hint.mutex = mutex_allocate()
              _gc._push_mutex = cast(size_t, hint.mutex)
          else:
              hint.mutex = cast(pointer(mutex_t), cast(size_t, _gc._push_mutex))
          hint.sock = cast(p_void, cast(size_t, _gc._push_socket.underlying))

          rc = zmq_msg_init_data(
              zmq_msg_ptr,
              cast(p_void, data_c),
              data_len_c,
              cast(pointer(zmq_free_fn), free_python_msg),
              cast(p_void, hint),
          )
          if rc != 0:
              # libzmq did not take ownership of the hint
              free(hint)
              _check_rc(rc)
          self._failed_init = False

      def __dealloc__(self):
          """Close the ``zmq_msg_t`` unless initialisation never completed."""
          if self._failed_init:
              return
          # decrease the 0MQ ref-count of zmq_msg
          with nogil:
              rc: C.int = zmq_msg_close(address(self.zmq_msg))
          _check_rc(rc)

      def __copy__(self):
          """Support ``copy.copy``: same as :meth:`fast_copy`."""
          return self.fast_copy()

      def fast_copy(self) -> Frame:
          """Return a new Frame sharing this frame's message content.

          Uses ``zmq_msg_copy``; the cached data/buffer/bytes objects and the
          tracker are shared with the copy.

          Raises
          ------
          ZMQError
              If ``zmq_msg_copy`` fails.
          """
          new_msg: Frame = Frame()
          # This does not copy the contents, but just increases the ref-count
          # of the zmq_msg by one.
          rc: C.int = zmq_msg_copy(address(new_msg.zmq_msg), address(self.zmq_msg))
          # do not hand back a silently-empty frame when the copy failed
          _check_rc(rc)
          # Copy the ref to data so the copy won't create a copy when str is
          # called.
          if self._data is not None:
              new_msg._data = self._data
          if self._buffer is not None:
              new_msg._buffer = self._buffer
          if self._bytes is not None:
              new_msg._bytes = self._bytes

          # Frame copies share the tracker and tracker_event
          new_msg.tracker_event = self.tracker_event
          new_msg.tracker = self.tracker

          return new_msg

      # buffer interface code adapted from petsc4py by Lisandro Dalcin, a BSD project

      def __getbuffer__(self, buffer: pointer(Py_buffer), flags: C.int):  # noqa: F821
          """Expose the message body as a 1-d buffer of unsigned bytes.

          ``flags`` is not consulted.
          """
          # new-style (memoryview) buffer interface
          buffer.buf = zmq_msg_data(address(self.zmq_msg))
          buffer.len = zmq_msg_size(address(self.zmq_msg))

          buffer.obj = self
          buffer.readonly = 0
          buffer.format = "B"
          buffer.ndim = 1
          # a single dimension whose extent is the byte length
          buffer.shape = address(buffer.len)
          buffer.strides = NULL
          buffer.suboffsets = NULL
          buffer.itemsize = 1
          buffer.internal = NULL

      def __len__(self) -> size_t:
          """Return the length of the message in bytes."""
          sz: size_t = zmq_msg_size(address(self.zmq_msg))
          return sz

      @property
      def buffer(self):
          """A memoryview of the message contents.

          Only a weak reference to the view is cached, so a new view is
          created whenever the previous one has been garbage collected.
          """
          _buffer = self._buffer and self._buffer()
          if _buffer is not None:
              return _buffer
          _buffer = memoryview(self)
          self._buffer = ref(_buffer)
          return _buffer

      @property
      def bytes(self):
          """The message content as a Python bytes object.

          The first time this property is accessed, a copy of the message
          contents is made. From then on that same copy of the message is
          returned.
          """
          if self._bytes is None:
              self._bytes = _copy_zmq_msg_bytes(address(self.zmq_msg))
          return self._bytes

      def get(self, option):
          """
          Get a Frame option or property.

          See the 0MQ API documentation for zmq_msg_get and zmq_msg_gets
          for details on specific options.

          An ``int`` option is looked up with zmq_msg_get; ``'routing_id'`` and
          ``'group'`` use the dedicated accessors; any other str/bytes option
          is looked up with zmq_msg_gets and returned as a str.

          .. versionadded:: libzmq-3.2
          .. versionadded:: 13.0

          .. versionchanged:: 14.3
              add support for zmq_msg_gets (requires libzmq-4.1)
              All message properties are strings.

          .. versionchanged:: 17.0
              Added support for `routing_id` and `group`.
              Only available if draft API is enabled
              with libzmq >= 4.2.
          """
          rc: C.int = 0
          property_c: p_char = NULL

          # zmq_msg_get
          if isinstance(option, int):
              rc = zmq_msg_get(address(self.zmq_msg), option)
              _check_rc(rc)
              return rc

          if option == 'routing_id':
              routing_id: uint32_t = zmq_msg_routing_id(address(self.zmq_msg))
              # 0 is the accessor's failure value
              if routing_id == 0:
                  _check_rc(-1)
              return routing_id
          elif option == 'group':
              buf = zmq_msg_group(address(self.zmq_msg))
              if buf == NULL:
                  _check_rc(-1)
              return buf.decode('utf8')

          # zmq_msg_gets
          _check_version((4, 1), "get string properties")
          if isinstance(option, str):
              option = option.encode('utf8')

          if not isinstance(option, bytes):
              raise TypeError(f"expected str, got: {option!r}")

          property_c = option

          result: p_char = cast(p_char, zmq_msg_gets(address(self.zmq_msg), property_c))
          if result == NULL:
              _check_rc(-1)
          return result.decode('utf8')

      def set(self, option, value):
          """Set a Frame option.

          See the 0MQ API documentation for zmq_msg_set
          for details on specific options.

          ``'routing_id'`` and ``'group'`` use the dedicated setters (a str
          group is utf8-encoded first); anything else goes to zmq_msg_set.

          .. versionadded:: libzmq-3.2
          .. versionadded:: 13.0
          .. versionchanged:: 17.0
              Added support for `routing_id` and `group`.
              Only available if draft API is enabled
              with libzmq >= 4.2.
          """
          rc: C.int

          if option == 'routing_id':
              routing_id: uint32_t = value
              rc = zmq_msg_set_routing_id(address(self.zmq_msg), routing_id)
              _check_rc(rc)
              return
          elif option == 'group':
              if isinstance(value, str):
                  value = value.encode('utf8')
              rc = zmq_msg_set_group(address(self.zmq_msg), value)
              _check_rc(rc)
              return

          rc = zmq_msg_set(address(self.zmq_msg), option, value)
          _check_rc(rc)
  @cclass
  class Context:
      """
      Manage the lifecycle of a 0MQ context.

      Parameters
      ----------
      io_threads : int
          The number of IO threads.
      """

      def __init__(self, io_threads: C.int = 1, shadow: size_t = 0):
          """Create a new libzmq context, or wrap the one at address ``shadow``.

          ``io_threads`` is only applied to a newly created context.
          Raises ``ZMQError`` if no context handle could be obtained.
          """
          # pre-init
          self.handle = NULL
          self._pid = 0
          self._shadow = shadow != 0

          if self._shadow:
              # wrap an existing context given by its address
              self.handle = cast(p_void, shadow)
          else:
              self.handle = zmq_ctx_new()

          if self.handle == NULL:
              raise ZMQError()

          status: C.int = 0
          if not self._shadow:
              status = zmq_ctx_set(self.handle, ZMQ_IO_THREADS, io_threads)
              _check_rc(status)

          self.closed = False
          # remember the creating process; _term only destroys the context
          # when called from that same pid
          self._pid = getpid()

      @property
      def underlying(self):
          """The address of the underlying libzmq context"""
          address_value: size_t = cast(size_t, self.handle)
          return address_value

      @cfunc
      @inline
      def _term(self) -> C.int:
          """Destroy the libzmq context (if open and owned by this process).

          The handle is cleared in every case.  Returns the return code of
          ``zmq_ctx_destroy``, or 0 when it was not called.
          """
          status: C.int = 0
          owned_here: bint = (
              self.handle != NULL and not self.closed and getpid() == self._pid
          )
          if owned_here:
              with nogil:
                  status = zmq_ctx_destroy(self.handle)
          self.handle = NULL
          return status

      def term(self):
          """
          Close or terminate the context.

          This can be called to close the context by hand. If this is not called,
          the context will automatically be closed when it is garbage collected.
          """
          status: C.int = self._term()
          try:
              _check_rc(status)
          except InterruptedSystemCall:
              # an interrupted term is deliberately not reported;
              # see PEP 475 notes about close & EINTR for why
              pass

          self.closed = True

      def set(self, option: C.int, optval):
          """
          Set a context option.

          See the 0MQ API documentation for zmq_ctx_set
          for details on specific options.

          .. versionadded:: libzmq-3.2
          .. versionadded:: 13.0

          Parameters
          ----------
          option : int
              The option to set.  Available values will depend on your
              version of libzmq.  Examples include::

                  zmq.IO_THREADS, zmq.MAX_SOCKETS

          optval : int
              The value of the option to set.

          Raises
          ------
          RuntimeError
              If the context is closed.
          TypeError
              If ``optval`` is not an int.
          """
          value_c: C.int
          status: C.int

          if self.closed:
              raise RuntimeError("Context has been destroyed")

          if not isinstance(optval, int):
              raise TypeError(f'expected int, got: {optval!r}')
          value_c = optval
          status = zmq_ctx_set(self.handle, option, value_c)
          _check_rc(status)

      def get(self, option: C.int):
          """
          Get the value of a context option.

          See the 0MQ API documentation for zmq_ctx_get
          for details on specific options.

          .. versionadded:: libzmq-3.2
          .. versionadded:: 13.0

          Parameters
          ----------
          option : int
              The option to get.  Available values will depend on your
              version of libzmq.  Examples include::

                  zmq.IO_THREADS, zmq.MAX_SOCKETS

          Returns
          -------
          optval : int
              The value of the option as an integer.

          Raises
          ------
          RuntimeError
              If the context is closed.
          """
          value: C.int

          if self.closed:
              raise RuntimeError("Context has been destroyed")

          # zmq_ctx_get returns the option value itself (or -1 on error)
          value = zmq_ctx_get(self.handle, option)
          _check_rc(value)
          return value
  @cfunc
  @inline
  def _c_addr(addr) -> bytes:
      """Coerce an address argument to bytes.

      A str is utf-8 encoded; bytes pass through unchanged.  Anything that
      cannot be assigned to a ``bytes`` variable produces a TypeError naming
      the offending value.
      """
      candidate = addr.encode("utf-8") if isinstance(addr, str) else addr
      try:
          # the typed assignment is what performs the type check
          as_bytes: bytes = candidate
      except TypeError:
          addr = candidate
          raise TypeError(f"Expected addr to be str, got addr={addr!r}")
      return as_bytes
  576. @cclass
  577. class Socket:
  578. """
  579. A 0MQ socket.
  580. These objects will generally be constructed via the socket() method of a Context object.
  581. Note: 0MQ Sockets are *not* threadsafe. **DO NOT** share them across threads.
  582. Parameters
  583. ----------
  584. context : Context
  585. The 0MQ Context this Socket belongs to.
  586. socket_type : int
  587. The socket type, which can be any of the 0MQ socket types:
  588. REQ, REP, PUB, SUB, PAIR, DEALER, ROUTER, PULL, PUSH, XPUB, XSUB.
  589. See Also
  590. --------
  591. .Context.socket : method for creating a socket bound to a Context.
  592. """
  def __init__(
      self,
      context=None,
      socket_type: C.int = -1,
      shadow: size_t = 0,
      copy_threshold=None,
  ):
      """Create a libzmq socket on ``context``, or wrap the one at ``shadow``.

      ``copy_threshold`` defaults to ``zmq.COPY_THRESHOLD``.

      Raises ``TypeError`` when neither ``shadow`` nor both ``context`` and a
      non-negative ``socket_type`` are given, and ``ZMQError`` when no socket
      handle could be obtained.
      """
      # pre-init
      self.handle = NULL
      self._draft_poller = NULL
      self._pid = 0
      self._shadow = False
      self.context = None

      self.copy_threshold = (
          zmq.COPY_THRESHOLD if copy_threshold is None else copy_threshold
      )

      self.context = context

      if shadow:
          # wrap an existing socket given by its address
          self._shadow = True
          self.handle = cast(p_void, shadow)
      elif context is None:
          raise TypeError("context must be specified")
      elif socket_type < 0:
          raise TypeError("socket_type must be specified")
      else:
          self.handle = zmq_socket(self.context.handle, socket_type)

      if self.handle == NULL:
          raise ZMQError()

      self._closed = False
      # remember the creating process; close() only acts from that same pid
      self._pid = getpid()
  @property
  def underlying(self):
      """The address of the underlying libzmq socket"""
      address_value: size_t = cast(size_t, self.handle)
      return address_value
  @property
  def closed(self):
      """Whether the socket is closed (as determined by ``_check_closed_deep``)"""
      is_closed = _check_closed_deep(self)
      return is_closed
  def close(self, linger: int | None = None):
      """
      Close the socket.

      If linger is specified, LINGER sockopt will be set prior to closing.

      This can be called to close the socket by hand. If this is not
      called, the socket will automatically be closed when it is
      garbage collected.

      Nothing happens if the socket has no handle, is already closed,
      or was created by a different process.
      """
      status: C.int = 0
      linger_c: C.int
      setlinger: bint = linger is not None

      if setlinger:
          linger_c = linger

      if self.handle == NULL or self._closed or getpid() != self._pid:
          return

      if setlinger:
          zmq_setsockopt(self.handle, ZMQ_LINGER, address(linger_c), sizeof(int))

      # teardown draft poller
      if self._draft_poller != NULL:
          zmq_poller_destroy(address(self._draft_poller))
          self._draft_poller = NULL

      status = zmq_close(self.handle)
      # ignore ENOTSOCK (closed by Context)
      if status < 0 and _zmq_errno() != ENOTSOCK:
          _check_rc(status)
      self._closed = True
      self.handle = NULL
  def set(self, option: C.int, optval):
      """
      Set socket options.

      See the 0MQ API documentation for details on specific options.

      Parameters
      ----------
      option : int
          The option to set.  Available values will depend on your
          version of libzmq.  Examples include::

              zmq.SUBSCRIBE, UNSUBSCRIBE, IDENTITY, HWM, LINGER, FD

      optval : int or bytes
          The value of the option to set.

      Raises
      ------
      TypeError
          If ``optval`` is a str, or does not match the option's type
          (bytes for bytes options, int otherwise).

      Notes
      -----
      .. warning::

          All options other than zmq.SUBSCRIBE, zmq.UNSUBSCRIBE and
          zmq.LINGER only take effect for subsequent socket bind/connects.
      """
      value_i64: int64_t
      value_int: C.int
      value_buf: p_char
      nbytes: Py_ssize_t

      _check_closed(self)
      if isinstance(optval, str):
          raise TypeError("unicode not allowed, use setsockopt_string")

      try:
          known_option = SocketOption(option)
      except ValueError:
          # unrecognized option,
          # assume from the future,
          # let EINVAL raise
          opt_type = _OptType.int
      else:
          opt_type = known_option._opt_type

      if opt_type == _OptType.bytes:
          if not isinstance(optval, bytes):
              raise TypeError(f'expected bytes, got: {optval!r}')
          value_buf = PyBytes_AsString(optval)
          nbytes = PyBytes_Size(optval)
          _setsockopt(self.handle, option, value_buf, nbytes)
          return

      # every remaining option type takes an integer value
      if not isinstance(optval, int):
          raise TypeError(f'expected int, got: {optval!r}')

      if opt_type == _OptType.int64:
          value_i64 = optval
          _setsockopt(self.handle, option, address(value_i64), sizeof(int64_t))
      else:
          # default is to assume int, which is what most new sockopts will be
          # this lets pyzmq work with newer libzmq which may add constants
          # pyzmq has not yet added, rather than artificially raising. Invalid
          # sockopts will still raise just the same, but it will be libzmq doing
          # the raising.
          value_int = optval
          _setsockopt(self.handle, option, address(value_int), sizeof(int))
    def get(self, option: C.int):
        """
        Get the value of a socket option.

        See the 0MQ API documentation for details on specific options.

        .. versionchanged:: 27
            Added experimental support for ZMQ_FD for draft sockets via `zmq_poller_fd`.
            Requires libzmq >=4.3.2 built with draft support.

        Parameters
        ----------
        option : int
            The option to get. Available values will depend on your
            version of libzmq. Examples include::

                zmq.IDENTITY, HWM, LINGER, FD, EVENTS

        Returns
        -------
        optval : int or bytes
            The value of the option as a bytestring or int.

        Raises
        ------
        ZMQError
            If the socket is closed or the underlying getsockopt call fails.
        RuntimeError
            If the ZMQ_FD fallback for thread-safe sockets is needed but
            ``zmq.DRAFT_API`` is false.
        """
        # one C buffer per supported option representation
        optval_int64_c = declare(int64_t)
        optval_int_c = declare(C.int)
        optval_fd_c = declare(fd_t)
        identity_str_c = declare(char[255])
        sz: size_t

        _check_closed(self)

        # map the option number to its value type
        try:
            sopt = SocketOption(option)
        except ValueError:
            # unrecognized option,
            # assume from the future,
            # let EINVAL raise
            opt_type = _OptType.int
        else:
            opt_type = sopt._opt_type

        if opt_type == _OptType.bytes:
            # sz is in/out: buffer capacity going in, value length coming out
            sz = 255
            _getsockopt(self.handle, option, cast(p_void, identity_str_c), address(sz))
            # strip null-terminated strings *except* identity
            if (
                option != ZMQ_IDENTITY
                and sz > 0
                and (cast(p_char, identity_str_c))[sz - 1] == b'\0'
            ):
                sz -= 1
            result = PyBytes_FromStringAndSize(cast(p_char, identity_str_c), sz)
        elif opt_type == _OptType.int64:
            sz = sizeof(int64_t)
            _getsockopt(
                self.handle, option, cast(p_void, address(optval_int64_c)), address(sz)
            )
            result = optval_int64_c
        elif option == ZMQ_FD and self._draft_poller != NULL:
            # draft sockets use FD of a draft zmq_poller as proxy
            # (a poller already exists from an earlier call, so reuse it)
            rc = zmq_poller_fd(self._draft_poller, address(optval_fd_c))
            _check_rc(rc)
            result = optval_fd_c
        elif opt_type == _OptType.fd:
            sz = sizeof(fd_t)
            try:
                _getsockopt(
                    self.handle, option, cast(p_void, address(optval_fd_c)), address(sz)
                )
            except ZMQError as e:
                # threadsafe sockets don't support ZMQ_FD (yet!)
                # fallback on zmq_poller_fd as proxy with the same behavior
                # until libzmq fixes this.
                # if upstream fixes it, this branch will never be taken
                if (
                    option == ZMQ_FD
                    and e.errno == zmq.Errno.EINVAL
                    and self.get(ZMQ_THREAD_SAFE)
                ):
                    _check_version(
                        (4, 3, 2), "draft socket FD support via zmq_poller_fd"
                    )
                    if not zmq.DRAFT_API:
                        raise RuntimeError(
                            "libzmq and pyzmq must be built with draft support"
                        )
                    warnings.warn(zmq.error.DraftFDWarning(), stacklevel=2)

                    # create a poller and retrieve its fd
                    self._draft_poller = zmq_poller_new()
                    if self._draft_poller == NULL:
                        # failed (why?), raise original error
                        raise
                    # register self with poller
                    rc = zmq_poller_add(
                        self._draft_poller, self.handle, NULL, ZMQ_POLLIN | ZMQ_POLLOUT
                    )
                    _check_rc(rc)
                    # use poller fd as proxy for ours
                    rc = zmq_poller_fd(self._draft_poller, address(optval_fd_c))
                    _check_rc(rc)
                else:
                    # any other failure is re-raised unchanged
                    raise
            result = optval_fd_c
        else:
            # default is to assume int, which is what most new sockopts will be
            # this lets pyzmq work with newer libzmq which may add constants
            # pyzmq has not yet added, rather than artificially raising. Invalid
            # sockopts will still raise just the same, but it will be libzmq doing
            # the raising.
            sz = sizeof(int)
            _getsockopt(
                self.handle, option, cast(p_void, address(optval_int_c)), address(sz)
            )
            result = optval_int_c

        return result
  822. def bind(self, addr: str | bytes):
  823. """
  824. Bind the socket to an address.
  825. This causes the socket to listen on a network port. Sockets on the
  826. other side of this connection will use ``Socket.connect(addr)`` to
  827. connect to this socket.
  828. Parameters
  829. ----------
  830. addr : str
  831. The address string. This has the form 'protocol://interface:port',
  832. for example 'tcp://127.0.0.1:5555'. Protocols supported include
  833. tcp, udp, pgm, epgm, inproc and ipc. If the address is unicode, it is
  834. encoded to utf-8 first.
  835. """
  836. _addr_bytes: bytes = _c_addr(addr)
  837. c_addr: p_char = _addr_bytes
  838. _check_closed(self)
  839. rc: C.int = zmq_bind(self.handle, c_addr)
  840. if rc != 0:
  841. _errno: C.int = _zmq_errno()
  842. _ipc_max: C.int = get_ipc_path_max_len()
  843. if _ipc_max and _errno == ENAMETOOLONG:
  844. path = addr.split('://', 1)[-1]
  845. msg = (
  846. f'ipc path "{path}" is longer than {_ipc_max} '
  847. 'characters (sizeof(sockaddr_un.sun_path)). '
  848. 'zmq.IPC_PATH_MAX_LEN constant can be used '
  849. 'to check addr length (if it is defined).'
  850. )
  851. raise ZMQError(msg=msg)
  852. elif _errno == ENOENT:
  853. path = addr.split('://', 1)[-1]
  854. msg = f'No such file or directory for ipc path "{path}".'
  855. raise ZMQError(msg=msg)
  856. while True:
  857. try:
  858. _check_rc(rc)
  859. except InterruptedSystemCall:
  860. rc = zmq_bind(self.handle, c_addr)
  861. continue
  862. else:
  863. break
  864. def connect(self, addr: str | bytes) -> None:
  865. """
  866. Connect to a remote 0MQ socket.
  867. Parameters
  868. ----------
  869. addr : str
  870. The address string. This has the form 'protocol://interface:port',
  871. for example 'tcp://127.0.0.1:5555'. Protocols supported are
  872. tcp, udp, pgm, inproc and ipc. If the address is unicode, it is
  873. encoded to utf-8 first.
  874. """
  875. rc: C.int
  876. _addr_bytes: bytes = _c_addr(addr)
  877. c_addr: p_char = _addr_bytes
  878. _check_closed(self)
  879. while True:
  880. try:
  881. rc = zmq_connect(self.handle, c_addr)
  882. _check_rc(rc)
  883. except InterruptedSystemCall:
  884. # retry syscall
  885. continue
  886. else:
  887. break
  888. def unbind(self, addr: str | bytes):
  889. """
  890. Unbind from an address (undoes a call to bind).
  891. .. versionadded:: libzmq-3.2
  892. .. versionadded:: 13.0
  893. Parameters
  894. ----------
  895. addr : str
  896. The address string. This has the form 'protocol://interface:port',
  897. for example 'tcp://127.0.0.1:5555'. Protocols supported are
  898. tcp, udp, pgm, inproc and ipc. If the address is unicode, it is
  899. encoded to utf-8 first.
  900. """
  901. _addr_bytes: bytes = _c_addr(addr)
  902. c_addr: p_char = _addr_bytes
  903. _check_closed(self)
  904. rc: C.int = zmq_unbind(self.handle, c_addr)
  905. if rc != 0:
  906. raise ZMQError()
  907. def disconnect(self, addr: str | bytes):
  908. """
  909. Disconnect from a remote 0MQ socket (undoes a call to connect).
  910. .. versionadded:: libzmq-3.2
  911. .. versionadded:: 13.0
  912. Parameters
  913. ----------
  914. addr : str
  915. The address string. This has the form 'protocol://interface:port',
  916. for example 'tcp://127.0.0.1:5555'. Protocols supported are
  917. tcp, udp, pgm, inproc and ipc. If the address is unicode, it is
  918. encoded to utf-8 first.
  919. """
  920. _addr_bytes: bytes = _c_addr(addr)
  921. c_addr: p_char = _addr_bytes
  922. _check_closed(self)
  923. rc: C.int = zmq_disconnect(self.handle, c_addr)
  924. if rc != 0:
  925. raise ZMQError()
  926. def monitor(self, addr: str | bytes | None, events: C.int = ZMQ_EVENT_ALL):
  927. """
  928. Start publishing socket events on inproc.
  929. See libzmq docs for zmq_monitor for details.
  930. While this function is available from libzmq 3.2,
  931. pyzmq cannot parse monitor messages from libzmq prior to 4.0.
  932. .. versionadded: libzmq-3.2
  933. .. versionadded: 14.0
  934. Parameters
  935. ----------
  936. addr : str | None
  937. The inproc url used for monitoring. Passing None as
  938. the addr will cause an existing socket monitor to be
  939. deregistered.
  940. events : int
  941. default: zmq.EVENT_ALL
  942. The zmq event bitmask for which events will be sent to the monitor.
  943. """
  944. c_addr: p_char = NULL
  945. if addr is not None:
  946. _addr_bytes: bytes = _c_addr(addr)
  947. c_addr: p_char = _addr_bytes
  948. _check_closed(self)
  949. _check_rc(zmq_socket_monitor(self.handle, c_addr, events))
  950. def join(self, group: str | bytes):
  951. """
  952. Join a RADIO-DISH group
  953. Only for DISH sockets.
  954. libzmq and pyzmq must have been built with ZMQ_BUILD_DRAFT_API
  955. .. versionadded:: 17
  956. """
  957. _check_version((4, 2), "RADIO-DISH")
  958. if not zmq.DRAFT_API:
  959. raise RuntimeError("libzmq and pyzmq must be built with draft support")
  960. if isinstance(group, str):
  961. group = group.encode('utf8')
  962. c_group: bytes = group
  963. rc: C.int = zmq_join(self.handle, c_group)
  964. _check_rc(rc)
  965. def leave(self, group):
  966. """
  967. Leave a RADIO-DISH group
  968. Only for DISH sockets.
  969. libzmq and pyzmq must have been built with ZMQ_BUILD_DRAFT_API
  970. .. versionadded:: 17
  971. """
  972. _check_version((4, 2), "RADIO-DISH")
  973. if not zmq.DRAFT_API:
  974. raise RuntimeError("libzmq and pyzmq must be built with draft support")
  975. rc: C.int = zmq_leave(self.handle, group)
  976. _check_rc(rc)
  977. def send(self, data, flags=0, copy: bint = True, track: bint = False):
  978. """
  979. Send a single zmq message frame on this socket.
  980. This queues the message to be sent by the IO thread at a later time.
  981. With flags=NOBLOCK, this raises :class:`ZMQError` if the queue is full;
  982. otherwise, this waits until space is available.
  983. See :class:`Poller` for more general non-blocking I/O.
  984. Parameters
  985. ----------
  986. data : bytes, Frame, memoryview
  987. The content of the message. This can be any object that provides
  988. the Python buffer API (`memoryview(data)` can be called).
  989. flags : int
  990. 0, NOBLOCK, SNDMORE, or NOBLOCK|SNDMORE.
  991. copy : bool
  992. Should the message be sent in a copying or non-copying manner.
  993. track : bool
  994. Should the message be tracked for notification that ZMQ has
  995. finished with it? (ignored if copy=True)
  996. Returns
  997. -------
  998. None : if `copy` or not track
  999. None if message was sent, raises an exception otherwise.
  1000. MessageTracker : if track and not copy
  1001. a MessageTracker object, whose `done` property will
  1002. be False until the send is completed.
  1003. Raises
  1004. ------
  1005. TypeError
  1006. If a unicode object is passed
  1007. ValueError
  1008. If `track=True`, but an untracked Frame is passed.
  1009. ZMQError
  1010. for any of the reasons zmq_msg_send might fail (including
  1011. if NOBLOCK is set and the outgoing queue is full).
  1012. """
  1013. _check_closed(self)
  1014. if isinstance(data, str):
  1015. raise TypeError("unicode not allowed, use send_string")
  1016. if copy and not isinstance(data, Frame):
  1017. return _send_copy(self.handle, data, flags)
  1018. else:
  1019. if isinstance(data, Frame):
  1020. if track and not data.tracker:
  1021. raise ValueError('Not a tracked message')
  1022. msg = data
  1023. else:
  1024. if self.copy_threshold:
  1025. buf = memoryview(data)
  1026. nbytes: size_t = buf.nbytes
  1027. copy_threshold: size_t = self.copy_threshold
  1028. # always copy messages smaller than copy_threshold
  1029. if nbytes < copy_threshold:
  1030. _send_copy(self.handle, buf, flags)
  1031. return zmq._FINISHED_TRACKER
  1032. msg = Frame(data, track=track, copy_threshold=self.copy_threshold)
  1033. return _send_frame(self.handle, msg, flags)
  1034. def recv(self, flags=0, copy: bint = True, track: bint = False):
  1035. """
  1036. Receive a message.
  1037. With flags=NOBLOCK, this raises :class:`ZMQError` if no messages have
  1038. arrived; otherwise, this waits until a message arrives.
  1039. See :class:`Poller` for more general non-blocking I/O.
  1040. Parameters
  1041. ----------
  1042. flags : int
  1043. 0 or NOBLOCK.
  1044. copy : bool
  1045. Should the message be received in a copying or non-copying manner?
  1046. If False a Frame object is returned, if True a string copy of
  1047. message is returned.
  1048. track : bool
  1049. Should the message be tracked for notification that ZMQ has
  1050. finished with it? (ignored if copy=True)
  1051. Returns
  1052. -------
  1053. msg : bytes or Frame
  1054. The received message frame. If `copy` is False, then it will be a Frame,
  1055. otherwise it will be bytes.
  1056. Raises
  1057. ------
  1058. ZMQError
  1059. for any of the reasons zmq_msg_recv might fail (including if
  1060. NOBLOCK is set and no new messages have arrived).
  1061. """
  1062. _check_closed(self)
  1063. if copy:
  1064. return _recv_copy(self.handle, flags)
  1065. else:
  1066. frame = _recv_frame(self.handle, flags, track)
  1067. more: bint = False
  1068. sz: size_t = sizeof(bint)
  1069. _getsockopt(
  1070. self.handle, ZMQ_RCVMORE, cast(p_void, address(more)), address(sz)
  1071. )
  1072. frame.more = more
  1073. return frame
  1074. def recv_into(self, buffer, /, *, nbytes=0, flags=0) -> C.int:
  1075. """
  1076. Receive up to nbytes bytes from the socket,
  1077. storing the data into a buffer rather than allocating a new Frame.
  1078. The next message frame can be discarded by receiving into an empty buffer::
  1079. sock.recv_into(bytearray())
  1080. .. versionadded:: 26.4
  1081. Parameters
  1082. ----------
  1083. buffer : memoryview
  1084. Any object providing the buffer interface (i.e. `memoryview(buffer)` works),
  1085. where the memoryview is contiguous and writable.
  1086. nbytes: int, default=0
  1087. The maximum number of bytes to receive.
  1088. If nbytes is not specified (or 0), receive up to the size available in the given buffer.
  1089. If the next frame is larger than this, the frame will be truncated and message content discarded.
  1090. flags: int, default=0
  1091. See `socket.recv`
  1092. Returns
  1093. -------
  1094. bytes_received: int
  1095. Returns the number of bytes received.
  1096. This is always the size of the received frame.
  1097. If the returned `bytes_received` is larger than `nbytes` (or size of `buffer` if `nbytes=0`),
  1098. the message has been truncated and the rest of the frame discarded.
  1099. Truncated data cannot be recovered.
  1100. Raises
  1101. ------
  1102. ZMQError
  1103. for any of the reasons `zmq_recv` might fail.
  1104. BufferError
  1105. for invalid buffers, such as readonly or not contiguous.
  1106. """
  1107. c_flags: C.int = flags
  1108. _check_closed(self)
  1109. c_nbytes: size_t = nbytes
  1110. if c_nbytes < 0:
  1111. raise ValueError(f"{nbytes=} must be non-negative")
  1112. view = memoryview(buffer)
  1113. c_data = declare(pointer(C.void))
  1114. view_bytes: C.size_t = _asbuffer(view, address(c_data), True)
  1115. if nbytes == 0:
  1116. c_nbytes = view_bytes
  1117. elif c_nbytes > view_bytes:
  1118. raise ValueError(f"{nbytes=} too big for memoryview of {view_bytes}B")
  1119. # call zmq_recv, with retries
  1120. while True:
  1121. with nogil:
  1122. rc: C.int = zmq_recv(self.handle, c_data, c_nbytes, c_flags)
  1123. try:
  1124. _check_rc(rc)
  1125. except InterruptedSystemCall:
  1126. continue
  1127. else:
  1128. return rc
  1129. # inline socket methods
@inline
@cfunc
def _check_closed(s: Socket):
    """Raise ZMQError(ENOTSOCK) if the socket is flagged as closed.

    Does not do a deep check: only the cached ``s._closed`` flag is consulted.
    """
    if s._closed:
        raise ZMQError(ENOTSOCK)
  1138. @inline
  1139. @cfunc
  1140. def _check_closed_deep(s: Socket) -> bint:
  1141. """thorough check of whether the socket has been closed,
  1142. even if by another entity (e.g. ctx.destroy).
  1143. Only used by the `closed` property.
  1144. returns True if closed, False otherwise
  1145. """
  1146. rc: C.int
  1147. errno: C.int
  1148. stype = declare(C.int)
  1149. sz: size_t = sizeof(int)
  1150. if s._closed:
  1151. return True
  1152. else:
  1153. rc = zmq_getsockopt(
  1154. s.handle, ZMQ_TYPE, cast(p_void, address(stype)), address(sz)
  1155. )
  1156. if rc < 0:
  1157. errno = _zmq_errno()
  1158. if errno == ENOTSOCK:
  1159. s._closed = True
  1160. return True
  1161. elif errno == ZMQ_ETERM:
  1162. # don't raise ETERM when checking if we're closed
  1163. return False
  1164. else:
  1165. _check_rc(rc)
  1166. return False
  1167. @cfunc
  1168. @inline
  1169. def _recv_frame(handle: p_void, flags: C.int = 0, track: bint = False) -> Frame:
  1170. """Receive a message in a non-copying manner and return a Frame."""
  1171. rc: C.int
  1172. msg = zmq.Frame(track=track)
  1173. cmsg: Frame = msg
  1174. while True:
  1175. with nogil:
  1176. rc = zmq_msg_recv(address(cmsg.zmq_msg), handle, flags)
  1177. try:
  1178. _check_rc(rc)
  1179. except InterruptedSystemCall:
  1180. continue
  1181. else:
  1182. break
  1183. return msg
  1184. @cfunc
  1185. @inline
  1186. def _recv_copy(handle: p_void, flags: C.int = 0):
  1187. """Receive a message and return a copy"""
  1188. zmq_msg = declare(zmq_msg_t)
  1189. zmq_msg_p: pointer(zmq_msg_t) = address(zmq_msg)
  1190. rc: C.int = zmq_msg_init(zmq_msg_p)
  1191. _check_rc(rc)
  1192. while True:
  1193. with nogil:
  1194. rc = zmq_msg_recv(zmq_msg_p, handle, flags)
  1195. try:
  1196. _check_rc(rc)
  1197. except InterruptedSystemCall:
  1198. continue
  1199. except Exception:
  1200. zmq_msg_close(zmq_msg_p) # ensure msg is closed on failure
  1201. raise
  1202. else:
  1203. break
  1204. msg_bytes = _copy_zmq_msg_bytes(zmq_msg_p)
  1205. zmq_msg_close(zmq_msg_p)
  1206. return msg_bytes
  1207. @cfunc
  1208. @inline
  1209. def _send_frame(handle: p_void, msg: Frame, flags: C.int = 0):
  1210. """Send a Frame on this socket in a non-copy manner."""
  1211. rc: C.int
  1212. msg_copy: Frame
  1213. # Always copy so the original message isn't garbage collected.
  1214. # This doesn't do a real copy, just a reference.
  1215. msg_copy = msg.fast_copy()
  1216. while True:
  1217. with nogil:
  1218. rc = zmq_msg_send(address(msg_copy.zmq_msg), handle, flags)
  1219. try:
  1220. _check_rc(rc)
  1221. except InterruptedSystemCall:
  1222. continue
  1223. else:
  1224. break
  1225. return msg.tracker
@cfunc
@inline
def _send_copy(handle: p_void, buf, flags: C.int = 0):
    """Send a message on this socket by copying its content.

    A fresh zmq message of the buffer's size is initialized, filled with
    memcpy, and sent; interrupted sends are retried. The message is closed
    both after a successful send and after a failed one.
    """
    rc: C.int
    msg = declare(zmq_msg_t)
    c_bytes = declare(p_void)

    # copy to c array:
    # (_asbuffer fills c_bytes and returns the length used below;
    #  presumably a pointer into buf's memory - confirm against _asbuffer)
    c_bytes_len = _asbuffer(buf, address(c_bytes))

    # Copy the msg before sending. This avoids any complications with
    # the GIL, etc.
    # If zmq_msg_init_* fails we must not call zmq_msg_close (Bus Error)
    rc = zmq_msg_init_size(address(msg), c_bytes_len)
    _check_rc(rc)

    while True:
        with nogil:
            # the copy is redone on every attempt, right before the send
            memcpy(zmq_msg_data(address(msg)), c_bytes, zmq_msg_size(address(msg)))
            rc = zmq_msg_send(address(msg), handle, flags)
        try:
            _check_rc(rc)
        except InterruptedSystemCall:
            continue
        except Exception:
            zmq_msg_close(address(msg))  # close the unused msg
            raise  # raise original exception
        else:
            rc = zmq_msg_close(address(msg))
            _check_rc(rc)
            break
  1255. @cfunc
  1256. @inline
  1257. def _getsockopt(handle: p_void, option: C.int, optval: p_void, sz: pointer(size_t)):
  1258. """getsockopt, retrying interrupted calls
  1259. checks rc, raising ZMQError on failure.
  1260. """
  1261. rc: C.int = 0
  1262. while True:
  1263. rc = zmq_getsockopt(handle, option, optval, sz)
  1264. try:
  1265. _check_rc(rc)
  1266. except InterruptedSystemCall:
  1267. continue
  1268. else:
  1269. break
  1270. @cfunc
  1271. @inline
  1272. def _setsockopt(handle: p_void, option: C.int, optval: p_void, sz: size_t):
  1273. """setsockopt, retrying interrupted calls
  1274. checks rc, raising ZMQError on failure.
  1275. """
  1276. rc: C.int = 0
  1277. while True:
  1278. rc = zmq_setsockopt(handle, option, optval, sz)
  1279. try:
  1280. _check_rc(rc)
  1281. except InterruptedSystemCall:
  1282. continue
  1283. else:
  1284. break
  1285. # General utility functions
  1286. def zmq_errno() -> C.int:
  1287. """Return the integer errno of the most recent zmq error."""
  1288. return _zmq_errno()
  1289. def strerror(errno: C.int) -> str:
  1290. """
  1291. Return the error string given the error number.
  1292. """
  1293. str_e: bytes = zmq_strerror(errno)
  1294. return str_e.decode("utf8", "replace")
  1295. def zmq_version_info() -> tuple[int, int, int]:
  1296. """Return the version of ZeroMQ itself as a 3-tuple of ints."""
  1297. major: C.int = 0
  1298. minor: C.int = 0
  1299. patch: C.int = 0
  1300. _zmq_version(address(major), address(minor), address(patch))
  1301. return (major, minor, patch)
  1302. def has(capability: str) -> bool:
  1303. """Check for zmq capability by name (e.g. 'ipc', 'curve')
  1304. .. versionadded:: libzmq-4.1
  1305. .. versionadded:: 14.1
  1306. """
  1307. _check_version((4, 1), 'zmq.has')
  1308. ccap: bytes = capability.encode('utf8')
  1309. return bool(zmq_has(ccap))
  1310. def curve_keypair() -> tuple[bytes, bytes]:
  1311. """generate a Z85 key pair for use with zmq.CURVE security
  1312. Requires libzmq (≥ 4.0) to have been built with CURVE support.
  1313. .. versionadded:: libzmq-4.0
  1314. .. versionadded:: 14.0
  1315. Returns
  1316. -------
  1317. public: bytes
  1318. The public key as 40 byte z85-encoded bytestring.
  1319. private: bytes
  1320. The private key as 40 byte z85-encoded bytestring.
  1321. """
  1322. rc: C.int
  1323. public_key = declare(char[64])
  1324. secret_key = declare(char[64])
  1325. _check_version((4, 0), "curve_keypair")
  1326. # see huge comment in libzmq/src/random.cpp
  1327. # about threadsafety of random initialization
  1328. rc = zmq_curve_keypair(public_key, secret_key)
  1329. _check_rc(rc)
  1330. return public_key, secret_key
  1331. def curve_public(secret_key) -> bytes:
  1332. """Compute the public key corresponding to a secret key for use
  1333. with zmq.CURVE security
  1334. Requires libzmq (≥ 4.2) to have been built with CURVE support.
  1335. Parameters
  1336. ----------
  1337. private
  1338. The private key as a 40 byte z85-encoded bytestring
  1339. Returns
  1340. -------
  1341. bytes
  1342. The public key as a 40 byte z85-encoded bytestring
  1343. """
  1344. if isinstance(secret_key, str):
  1345. secret_key = secret_key.encode('utf8')
  1346. if not len(secret_key) == 40:
  1347. raise ValueError('secret key must be a 40 byte z85 encoded string')
  1348. rc: C.int
  1349. public_key = declare(char[64])
  1350. c_secret_key: pointer(char) = secret_key
  1351. _check_version((4, 2), "curve_public")
  1352. # see huge comment in libzmq/src/random.cpp
  1353. # about threadsafety of random initialization
  1354. rc = zmq_curve_public(public_key, c_secret_key)
  1355. _check_rc(rc)
  1356. return public_key[:40]
  1357. # polling
  1358. def zmq_poll(sockets, timeout: C.int = -1):
  1359. """zmq_poll(sockets, timeout=-1)
  1360. Poll a set of 0MQ sockets, native file descs. or sockets.
  1361. Parameters
  1362. ----------
  1363. sockets : list of tuples of (socket, flags)
  1364. Each element of this list is a two-tuple containing a socket
  1365. and a flags. The socket may be a 0MQ socket or any object with
  1366. a ``fileno()`` method. The flags can be zmq.POLLIN (for detecting
  1367. for incoming messages), zmq.POLLOUT (for detecting that send is OK)
  1368. or zmq.POLLIN|zmq.POLLOUT for detecting both.
  1369. timeout : int
  1370. The number of milliseconds to poll for. Negative means no timeout.
  1371. """
  1372. rc: C.int
  1373. i: C.int
  1374. fileno: fd_t
  1375. events: C.int
  1376. pollitems: pointer(zmq_pollitem_t) = NULL
  1377. nsockets: C.int = len(sockets)
  1378. if nsockets == 0:
  1379. return []
  1380. pollitems = cast(pointer(zmq_pollitem_t), malloc(nsockets * sizeof(zmq_pollitem_t)))
  1381. if pollitems == NULL:
  1382. raise MemoryError("Could not allocate poll items")
  1383. for i in range(nsockets):
  1384. s, events = sockets[i]
  1385. if isinstance(s, Socket):
  1386. pollitems[i].socket = cast(Socket, s).handle
  1387. pollitems[i].fd = 0
  1388. pollitems[i].events = events
  1389. pollitems[i].revents = 0
  1390. elif isinstance(s, int):
  1391. fileno = s
  1392. pollitems[i].socket = NULL
  1393. pollitems[i].fd = fileno
  1394. pollitems[i].events = events
  1395. pollitems[i].revents = 0
  1396. elif hasattr(s, 'fileno'):
  1397. try:
  1398. fileno = int(s.fileno())
  1399. except Exception:
  1400. free(pollitems)
  1401. raise ValueError('fileno() must return a valid integer fd')
  1402. else:
  1403. pollitems[i].socket = NULL
  1404. pollitems[i].fd = fileno
  1405. pollitems[i].events = events
  1406. pollitems[i].revents = 0
  1407. else:
  1408. free(pollitems)
  1409. raise TypeError(
  1410. "Socket must be a 0MQ socket, an integer fd or have "
  1411. f"a fileno() method: {s!r}"
  1412. )
  1413. ms_passed: C.int = 0
  1414. tic: C.int
  1415. try:
  1416. while True:
  1417. start: C.int = monotonic()
  1418. with nogil:
  1419. rc = zmq_poll_c(pollitems, nsockets, timeout)
  1420. try:
  1421. _check_rc(rc)
  1422. except InterruptedSystemCall:
  1423. if timeout > 0:
  1424. tic = monotonic()
  1425. ms_passed = int(1000 * (tic - start))
  1426. if ms_passed < 0:
  1427. # don't allow negative ms_passed,
  1428. # which can happen on old Python versions without time.monotonic.
  1429. warnings.warn(
  1430. f"Negative elapsed time for interrupted poll: {ms_passed}."
  1431. " Did the clock change?",
  1432. RuntimeWarning,
  1433. )
  1434. # treat this case the same as no time passing,
  1435. # since it should be rare and not happen twice in a row.
  1436. ms_passed = 0
  1437. timeout = max(0, timeout - ms_passed)
  1438. continue
  1439. else:
  1440. break
  1441. except Exception:
  1442. free(pollitems)
  1443. raise
  1444. results = []
  1445. for i in range(nsockets):
  1446. revents = pollitems[i].revents
  1447. # for compatibility with select.poll:
  1448. # - only return sockets with non-zero status
  1449. # - return the fd for plain sockets
  1450. if revents > 0:
  1451. if pollitems[i].socket != NULL:
  1452. s = sockets[i][0]
  1453. else:
  1454. s = pollitems[i].fd
  1455. results.append((s, revents))
  1456. free(pollitems)
  1457. return results
  1458. def proxy(frontend: Socket, backend: Socket, capture: Socket = None):
  1459. """
  1460. Start a zeromq proxy (replacement for device).
  1461. .. versionadded:: libzmq-3.2
  1462. .. versionadded:: 13.0
  1463. Parameters
  1464. ----------
  1465. frontend : Socket
  1466. The Socket instance for the incoming traffic.
  1467. backend : Socket
  1468. The Socket instance for the outbound traffic.
  1469. capture : Socket (optional)
  1470. The Socket instance for capturing traffic.
  1471. """
  1472. rc: C.int = 0
  1473. capture_handle: p_void
  1474. if isinstance(capture, Socket):
  1475. capture_handle = capture.handle
  1476. else:
  1477. capture_handle = NULL
  1478. while True:
  1479. with nogil:
  1480. rc = zmq_proxy(frontend.handle, backend.handle, capture_handle)
  1481. try:
  1482. _check_rc(rc)
  1483. except InterruptedSystemCall:
  1484. continue
  1485. else:
  1486. break
  1487. return rc
  1488. def proxy_steerable(
  1489. frontend: Socket,
  1490. backend: Socket,
  1491. capture: Socket = None,
  1492. control: Socket = None,
  1493. ):
  1494. """
  1495. Start a zeromq proxy with control flow.
  1496. .. versionadded:: libzmq-4.1
  1497. .. versionadded:: 18.0
  1498. Parameters
  1499. ----------
  1500. frontend : Socket
  1501. The Socket instance for the incoming traffic.
  1502. backend : Socket
  1503. The Socket instance for the outbound traffic.
  1504. capture : Socket (optional)
  1505. The Socket instance for capturing traffic.
  1506. control : Socket (optional)
  1507. The Socket instance for control flow.
  1508. """
  1509. rc: C.int = 0
  1510. capture_handle: p_void
  1511. if isinstance(capture, Socket):
  1512. capture_handle = capture.handle
  1513. else:
  1514. capture_handle = NULL
  1515. if isinstance(control, Socket):
  1516. control_handle = control.handle
  1517. else:
  1518. control_handle = NULL
  1519. while True:
  1520. with nogil:
  1521. rc = zmq_proxy_steerable(
  1522. frontend.handle, backend.handle, capture_handle, control_handle
  1523. )
  1524. try:
  1525. _check_rc(rc)
  1526. except InterruptedSystemCall:
  1527. continue
  1528. else:
  1529. break
  1530. return rc
  1531. # monitored queue - like proxy (predates libzmq proxy)
  1532. # but supports ROUTER-ROUTER devices
@cfunc
@inline
@nogil
def _mq_relay(
    in_socket: p_void,
    out_socket: p_void,
    side_socket: p_void,
    msg: zmq_msg_t,
    side_msg: zmq_msg_t,
    id_msg: zmq_msg_t,
    swap_ids: bint,
) -> C.int:
    """Relay one multipart message from in_socket to out_socket and side_socket.

    Every frame received from in_socket is duplicated with zmq_msg_copy:
    the copy (side_msg) goes to out_socket, the original to side_socket.
    With swap_ids set, the first two frames are received up front and
    forwarded in reversed order before the remaining frames are relayed.

    Returns the first negative return code encountered, otherwise the
    return code of the last send.
    """
    rc: C.int
    flags: C.int
    flagsz = declare(size_t)
    more = declare(int)
    flagsz = sizeof(int)

    if swap_ids:  # both router, must send second identity first
        # recv two ids into msg, id_msg
        rc = zmq_msg_recv(address(msg), in_socket, 0)
        if rc < 0:
            return rc

        rc = zmq_msg_recv(address(id_msg), in_socket, 0)
        if rc < 0:
            return rc

        # send second id (id_msg) first
        # !!!! always send a copy before the original !!!!
        rc = zmq_msg_copy(address(side_msg), address(id_msg))
        if rc < 0:
            return rc
        rc = zmq_msg_send(address(side_msg), out_socket, ZMQ_SNDMORE)
        if rc < 0:
            return rc
        rc = zmq_msg_send(address(id_msg), side_socket, ZMQ_SNDMORE)
        if rc < 0:
            return rc
        # send first id (msg) second
        rc = zmq_msg_copy(address(side_msg), address(msg))
        if rc < 0:
            return rc
        rc = zmq_msg_send(address(side_msg), out_socket, ZMQ_SNDMORE)
        if rc < 0:
            return rc
        rc = zmq_msg_send(address(msg), side_socket, ZMQ_SNDMORE)
        if rc < 0:
            return rc
    # relay the remaining frames until RCVMORE reports the last one
    while True:
        rc = zmq_msg_recv(address(msg), in_socket, 0)
        if rc < 0:
            return rc
        # assert (rc == 0)
        rc = zmq_getsockopt(in_socket, ZMQ_RCVMORE, address(more), address(flagsz))
        if rc < 0:
            return rc
        flags = 0
        if more:
            flags |= ZMQ_SNDMORE

        rc = zmq_msg_copy(address(side_msg), address(msg))
        if rc < 0:
            return rc
        if flags:
            rc = zmq_msg_send(address(side_msg), out_socket, flags)
            if rc < 0:
                return rc
            # only SNDMORE for side-socket
            rc = zmq_msg_send(address(msg), side_socket, ZMQ_SNDMORE)
            if rc < 0:
                return rc
        else:
            # last frame: send without SNDMORE on both sockets, then stop
            rc = zmq_msg_send(address(side_msg), out_socket, 0)
            if rc < 0:
                return rc
            rc = zmq_msg_send(address(msg), side_socket, 0)
            if rc < 0:
                return rc
            break
    return rc
  1610. @cfunc
  1611. @inline
  1612. @nogil
  1613. def _mq_inline(
  1614. in_socket: p_void,
  1615. out_socket: p_void,
  1616. side_socket: p_void,
  1617. in_msg_ptr: pointer(zmq_msg_t),
  1618. out_msg_ptr: pointer(zmq_msg_t),
  1619. swap_ids: bint,
  1620. ) -> C.int:
  1621. """
  1622. inner C function for monitored_queue
  1623. """
  1624. msg: zmq_msg_t = declare(zmq_msg_t)
  1625. rc: C.int = zmq_msg_init(address(msg))
  1626. id_msg = declare(zmq_msg_t)
  1627. rc = zmq_msg_init(address(id_msg))
  1628. if rc < 0:
  1629. return rc
  1630. side_msg = declare(zmq_msg_t)
  1631. rc = zmq_msg_init(address(side_msg))
  1632. if rc < 0:
  1633. return rc
  1634. items = declare(zmq_pollitem_t[2])
  1635. items[0].socket = in_socket
  1636. items[0].events = ZMQ_POLLIN
  1637. items[0].fd = items[0].revents = 0
  1638. items[1].socket = out_socket
  1639. items[1].events = ZMQ_POLLIN
  1640. items[1].fd = items[1].revents = 0
  1641. while True:
  1642. # wait for the next message to process
  1643. rc = zmq_poll_c(address(items[0]), 2, -1)
  1644. if rc < 0:
  1645. return rc
  1646. if items[0].revents & ZMQ_POLLIN:
  1647. # send in_prefix to side socket
  1648. rc = zmq_msg_copy(address(side_msg), in_msg_ptr)
  1649. if rc < 0:
  1650. return rc
  1651. rc = zmq_msg_send(address(side_msg), side_socket, ZMQ_SNDMORE)
  1652. if rc < 0:
  1653. return rc
  1654. # relay the rest of the message
  1655. rc = _mq_relay(
  1656. in_socket, out_socket, side_socket, msg, side_msg, id_msg, swap_ids
  1657. )
  1658. if rc < 0:
  1659. return rc
  1660. if items[1].revents & ZMQ_POLLIN:
  1661. # send out_prefix to side socket
  1662. rc = zmq_msg_copy(address(side_msg), out_msg_ptr)
  1663. if rc < 0:
  1664. return rc
  1665. rc = zmq_msg_send(address(side_msg), side_socket, ZMQ_SNDMORE)
  1666. if rc < 0:
  1667. return rc
  1668. # relay the rest of the message
  1669. rc = _mq_relay(
  1670. out_socket, in_socket, side_socket, msg, side_msg, id_msg, swap_ids
  1671. )
  1672. if rc < 0:
  1673. return rc
  1674. return rc
  1675. def monitored_queue(
  1676. in_socket: Socket,
  1677. out_socket: Socket,
  1678. mon_socket: Socket,
  1679. in_prefix: bytes = b'in',
  1680. out_prefix: bytes = b'out',
  1681. ):
  1682. """
  1683. Start a monitored queue device.
  1684. A monitored queue is very similar to the zmq.proxy device (monitored queue came first).
  1685. Differences from zmq.proxy:
  1686. - monitored_queue supports both in and out being ROUTER sockets
  1687. (via swapping IDENTITY prefixes).
  1688. - monitor messages are prefixed, making in and out messages distinguishable.
  1689. Parameters
  1690. ----------
  1691. in_socket : zmq.Socket
  1692. One of the sockets to the Queue. Its messages will be prefixed with
  1693. 'in'.
  1694. out_socket : zmq.Socket
  1695. One of the sockets to the Queue. Its messages will be prefixed with
  1696. 'out'. The only difference between in/out socket is this prefix.
  1697. mon_socket : zmq.Socket
  1698. This socket sends out every message received by each of the others
  1699. with an in/out prefix specifying which one it was.
  1700. in_prefix : str
  1701. Prefix added to broadcast messages from in_socket.
  1702. out_prefix : str
  1703. Prefix added to broadcast messages from out_socket.
  1704. """
  1705. ins: p_void = in_socket.handle
  1706. outs: p_void = out_socket.handle
  1707. mons: p_void = mon_socket.handle
  1708. in_msg = declare(zmq_msg_t)
  1709. out_msg = declare(zmq_msg_t)
  1710. swap_ids: bint
  1711. msg_c: p_void = NULL
  1712. msg_c_len = declare(Py_ssize_t)
  1713. rc: C.int
  1714. # force swap_ids if both ROUTERs
  1715. swap_ids = in_socket.type == ZMQ_ROUTER and out_socket.type == ZMQ_ROUTER
  1716. # build zmq_msg objects from str prefixes
  1717. msg_c_len = _asbuffer(in_prefix, address(msg_c))
  1718. rc = zmq_msg_init_size(address(in_msg), msg_c_len)
  1719. _check_rc(rc)
  1720. memcpy(zmq_msg_data(address(in_msg)), msg_c, zmq_msg_size(address(in_msg)))
  1721. msg_c_len = _asbuffer(out_prefix, address(msg_c))
  1722. rc = zmq_msg_init_size(address(out_msg), msg_c_len)
  1723. _check_rc(rc)
  1724. while True:
  1725. with nogil:
  1726. memcpy(
  1727. zmq_msg_data(address(out_msg)), msg_c, zmq_msg_size(address(out_msg))
  1728. )
  1729. rc = _mq_inline(
  1730. ins, outs, mons, address(in_msg), address(out_msg), swap_ids
  1731. )
  1732. try:
  1733. _check_rc(rc)
  1734. except InterruptedSystemCall:
  1735. continue
  1736. else:
  1737. break
  1738. return rc
  1739. __all__ = [
  1740. 'IPC_PATH_MAX_LEN',
  1741. 'PYZMQ_DRAFT_API',
  1742. 'Context',
  1743. 'Socket',
  1744. 'Frame',
  1745. 'has',
  1746. 'curve_keypair',
  1747. 'curve_public',
  1748. 'zmq_version_info',
  1749. 'zmq_errno',
  1750. 'zmq_poll',
  1751. 'strerror',
  1752. 'proxy',
  1753. 'proxy_steerable',
  1754. ]