| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048 |
- # cython: language_level = 3str
- # cython: freethreading_compatible = True
- """Cython backend for pyzmq"""
- # Copyright (C) PyZMQ Developers
- # Distributed under the terms of the Modified BSD License.
- from __future__ import annotations
- try:
- import cython
- if not cython.compiled:
- raise ImportError()
- except ImportError:
- from pathlib import Path
- zmq_root = Path(__file__).parents[3]
- msg = f"""
- Attempting to import zmq Cython backend, which has not been compiled.
- This probably means you are importing zmq from its source tree.
- if this is what you want, make sure to do an in-place build first:
- pip install -e '{zmq_root}'
- If it is not, then '{zmq_root}' is probably on your sys.path,
- when it shouldn't be. Is that your current working directory?
- If neither of those is true and this file is actually installed,
- something seems to have gone wrong with the install!
- Please report at https://github.com/zeromq/pyzmq/issues
- """
- raise ImportError(msg)
- import warnings
- from threading import Event
- from time import monotonic
- from weakref import ref
- import cython as C
- from cython import (
- NULL,
- Py_ssize_t,
- address,
- bint,
- cast,
- cclass,
- cfunc,
- char,
- declare,
- inline,
- nogil,
- p_char,
- p_void,
- pointer,
- size_t,
- sizeof,
- )
- from cython.cimports.cpython.buffer import (
- Py_buffer,
- PyBUF_ANY_CONTIGUOUS,
- PyBUF_WRITABLE,
- PyBuffer_Release,
- PyObject_GetBuffer,
- )
- from cython.cimports.cpython.bytes import (
- PyBytes_AsString,
- PyBytes_FromStringAndSize,
- PyBytes_Size,
- )
- from cython.cimports.cpython.exc import PyErr_CheckSignals
- from cython.cimports.libc.errno import EAGAIN, EINTR, ENAMETOOLONG, ENOENT, ENOTSOCK
- from cython.cimports.libc.stdint import uint32_t
- from cython.cimports.libc.stdio import fprintf
- from cython.cimports.libc.stdio import stderr as cstderr
- from cython.cimports.libc.stdlib import free, malloc
- from cython.cimports.libc.string import memcpy
- from cython.cimports.zmq.backend.cython import libzmq
- from cython.cimports.zmq.backend.cython._externs import (
- get_ipc_path_max_len,
- getpid,
- mutex_allocate,
- mutex_lock,
- mutex_t,
- mutex_unlock,
- )
- from cython.cimports.zmq.backend.cython.libzmq import (
- ZMQ_ENOTSOCK,
- ZMQ_ETERM,
- ZMQ_EVENT_ALL,
- ZMQ_FD,
- ZMQ_IDENTITY,
- ZMQ_IO_THREADS,
- ZMQ_LINGER,
- ZMQ_POLLIN,
- ZMQ_POLLOUT,
- ZMQ_RCVMORE,
- ZMQ_ROUTER,
- ZMQ_SNDMORE,
- ZMQ_THREAD_SAFE,
- ZMQ_TYPE,
- _zmq_version,
- fd_t,
- int64_t,
- zmq_bind,
- zmq_close,
- zmq_connect,
- zmq_ctx_destroy,
- zmq_ctx_get,
- zmq_ctx_new,
- zmq_ctx_set,
- zmq_curve_keypair,
- zmq_curve_public,
- zmq_disconnect,
- zmq_free_fn,
- zmq_getsockopt,
- zmq_has,
- zmq_join,
- zmq_leave,
- zmq_msg_close,
- zmq_msg_copy,
- zmq_msg_data,
- zmq_msg_get,
- zmq_msg_gets,
- zmq_msg_group,
- zmq_msg_init,
- zmq_msg_init_data,
- zmq_msg_init_size,
- zmq_msg_recv,
- zmq_msg_routing_id,
- zmq_msg_send,
- zmq_msg_set,
- zmq_msg_set_group,
- zmq_msg_set_routing_id,
- zmq_msg_size,
- zmq_msg_t,
- zmq_poller_add,
- zmq_poller_destroy,
- zmq_poller_fd,
- zmq_poller_new,
- zmq_pollitem_t,
- zmq_proxy,
- zmq_proxy_steerable,
- zmq_recv,
- zmq_setsockopt,
- zmq_socket,
- zmq_socket_monitor,
- zmq_strerror,
- zmq_unbind,
- )
- from cython.cimports.zmq.backend.cython.libzmq import zmq_errno as _zmq_errno
- from cython.cimports.zmq.backend.cython.libzmq import zmq_poll as zmq_poll_c
- import zmq
- from zmq.constants import SocketOption, _OptType
- from zmq.error import (
- Again,
- ContextTerminated,
- InterruptedSystemCall,
- ZMQError,
- _check_version,
- )
- IPC_PATH_MAX_LEN: int = get_ipc_path_max_len()
- PYZMQ_DRAFT_API: bool = bool(libzmq.PYZMQ_DRAFT_API)
- @cfunc
- @inline
- @C.exceptval(-1)
- def _check_rc(rc: C.int, error_without_errno: bint = False) -> C.int:
- """internal utility for checking zmq return condition
- and raising the appropriate Exception class
- """
- errno: C.int = _zmq_errno()
- PyErr_CheckSignals()
- if errno == 0 and not error_without_errno:
- return 0
- if rc == -1: # if rc < -1, it's a bug in libzmq. Should we warn?
- if errno == EINTR:
- raise InterruptedSystemCall(errno)
- elif errno == EAGAIN:
- raise Again(errno)
- elif errno == ZMQ_ETERM:
- raise ContextTerminated(errno)
- else:
- raise ZMQError(errno)
- return 0
- # message Frame class
- _zhint = C.struct(
- sock=p_void,
- mutex=pointer(mutex_t),
- id=size_t,
- )
- @cfunc
- @nogil
- def free_python_msg(data: p_void, vhint: p_void) -> C.int:
- """A pure-C function for DECREF'ing Python-owned message data.
- Sends a message on a PUSH socket
- The hint is a `zhint` struct with two values:
- sock (void *): pointer to the Garbage Collector's PUSH socket
- id (size_t): the id to be used to construct a zmq_msg_t that should be sent on a PUSH socket,
- signaling the Garbage Collector to remove its reference to the object.
- When the Garbage Collector's PULL socket receives the message,
- it deletes its reference to the object,
- allowing Python to free the memory.
- """
- msg = declare(zmq_msg_t)
- msg_ptr: pointer(zmq_msg_t) = address(msg)
- hint: pointer(_zhint) = cast(pointer(_zhint), vhint)
- rc: C.int
- if hint != NULL:
- zmq_msg_init_size(msg_ptr, sizeof(size_t))
- memcpy(zmq_msg_data(msg_ptr), address(hint.id), sizeof(size_t))
- rc = mutex_lock(hint.mutex)
- if rc != 0:
- fprintf(cstderr, "pyzmq-gc mutex lock failed rc=%d\n", rc)
- rc = zmq_msg_send(msg_ptr, hint.sock, 0)
- if rc < 0:
- # gc socket could have been closed, e.g. during process teardown.
- # If so, ignore the failure because there's nothing to do.
- if _zmq_errno() != ZMQ_ENOTSOCK:
- fprintf(
- cstderr, "pyzmq-gc send failed: %s\n", zmq_strerror(_zmq_errno())
- )
- rc = mutex_unlock(hint.mutex)
- if rc != 0:
- fprintf(cstderr, "pyzmq-gc mutex unlock failed rc=%d\n", rc)
- zmq_msg_close(msg_ptr)
- free(hint)
- return 0
- @cfunc
- @inline
- def _copy_zmq_msg_bytes(zmq_msg: pointer(zmq_msg_t)) -> bytes:
- """Copy the data from a zmq_msg_t"""
- data_c: p_char = NULL
- data_len_c: Py_ssize_t
- data_c = cast(p_char, zmq_msg_data(zmq_msg))
- data_len_c = zmq_msg_size(zmq_msg)
- return PyBytes_FromStringAndSize(data_c, data_len_c)
- @cfunc
- @inline
- def _asbuffer(obj, data_c: pointer(p_void), writable: bint = False) -> size_t:
- """Get a C buffer from a memoryview"""
- pybuf = declare(Py_buffer)
- flags: C.int = PyBUF_ANY_CONTIGUOUS
- if writable:
- flags |= PyBUF_WRITABLE
- rc: C.int = PyObject_GetBuffer(obj, address(pybuf), flags)
- if rc < 0:
- raise ValueError("Couldn't create buffer")
- data_c[0] = pybuf.buf
- data_size: size_t = pybuf.len
- PyBuffer_Release(address(pybuf))
- return data_size
- _gc = None
- @cclass
- class Frame:
- def __init__(
- self, data=None, track=False, copy=None, copy_threshold=None, **kwargs
- ):
- rc: C.int
- data_c: p_char = NULL
- data_len_c: Py_ssize_t = 0
- hint: pointer(_zhint)
- if copy_threshold is None:
- copy_threshold = zmq.COPY_THRESHOLD
- c_copy_threshold: C.size_t = 0
- if copy_threshold is not None:
- c_copy_threshold = copy_threshold
- zmq_msg_ptr: pointer(zmq_msg_t) = address(self.zmq_msg)
- # init more as False
- self.more = False
- # Save the data object in case the user wants the the data as a str.
- self._data = data
- self._failed_init = True # bool switch for dealloc
- self._buffer = None # buffer view of data
- self._bytes = None # bytes copy of data
- self.tracker_event = None
- self.tracker = None
- # self.tracker should start finished
- # except in the case where we are sharing memory with libzmq
- if track:
- self.tracker = zmq._FINISHED_TRACKER
- if isinstance(data, str):
- raise TypeError("Str objects not allowed. Only: bytes, buffer interfaces.")
- if data is None:
- rc = zmq_msg_init(zmq_msg_ptr)
- _check_rc(rc)
- self._failed_init = False
- return
- data_len_c = _asbuffer(data, cast(pointer(p_void), address(data_c)))
- # copy unspecified, apply copy_threshold
- c_copy: bint = True
- if copy is None:
- if c_copy_threshold and data_len_c < c_copy_threshold:
- c_copy = True
- else:
- c_copy = False
- else:
- c_copy = copy
- if c_copy:
- # copy message data instead of sharing memory
- rc = zmq_msg_init_size(zmq_msg_ptr, data_len_c)
- _check_rc(rc)
- memcpy(zmq_msg_data(zmq_msg_ptr), data_c, data_len_c)
- self._failed_init = False
- return
- # Getting here means that we are doing a true zero-copy Frame,
- # where libzmq and Python are sharing memory.
- # Hook up garbage collection with MessageTracker and zmq_free_fn
- # Event and MessageTracker for monitoring when zmq is done with data:
- if track:
- evt = Event()
- self.tracker_event = evt
- self.tracker = zmq.MessageTracker(evt)
- # create the hint for zmq_free_fn
- # two pointers: the gc context and a message to be sent to the gc PULL socket
- # allows libzmq to signal to Python when it is done with Python-owned memory.
- global _gc
- if _gc is None:
- from zmq.utils.garbage import gc as _gc
- hint: pointer(_zhint) = cast(pointer(_zhint), malloc(sizeof(_zhint)))
- hint.id = _gc.store(data, self.tracker_event)
- if not _gc._push_mutex:
- hint.mutex = mutex_allocate()
- _gc._push_mutex = cast(size_t, hint.mutex)
- else:
- hint.mutex = cast(pointer(mutex_t), cast(size_t, _gc._push_mutex))
- hint.sock = cast(p_void, cast(size_t, _gc._push_socket.underlying))
- rc = zmq_msg_init_data(
- zmq_msg_ptr,
- cast(p_void, data_c),
- data_len_c,
- cast(pointer(zmq_free_fn), free_python_msg),
- cast(p_void, hint),
- )
- if rc != 0:
- free(hint)
- _check_rc(rc)
- self._failed_init = False
- def __dealloc__(self):
- if self._failed_init:
- return
- # decrease the 0MQ ref-count of zmq_msg
- with nogil:
- rc: C.int = zmq_msg_close(address(self.zmq_msg))
- _check_rc(rc)
- def __copy__(self):
- return self.fast_copy()
- def fast_copy(self) -> Frame:
- new_msg: Frame = Frame()
- # This does not copy the contents, but just increases the ref-count
- # of the zmq_msg by one.
- zmq_msg_copy(address(new_msg.zmq_msg), address(self.zmq_msg))
- # Copy the ref to data so the copy won't create a copy when str is
- # called.
- if self._data is not None:
- new_msg._data = self._data
- if self._buffer is not None:
- new_msg._buffer = self._buffer
- if self._bytes is not None:
- new_msg._bytes = self._bytes
- # Frame copies share the tracker and tracker_event
- new_msg.tracker_event = self.tracker_event
- new_msg.tracker = self.tracker
- return new_msg
- # buffer interface code adapted from petsc4py by Lisandro Dalcin, a BSD project
- def __getbuffer__(self, buffer: pointer(Py_buffer), flags: C.int): # noqa: F821
- # new-style (memoryview) buffer interface
- buffer.buf = zmq_msg_data(address(self.zmq_msg))
- buffer.len = zmq_msg_size(address(self.zmq_msg))
- buffer.obj = self
- buffer.readonly = 0
- buffer.format = "B"
- buffer.ndim = 1
- buffer.shape = address(buffer.len)
- buffer.strides = NULL
- buffer.suboffsets = NULL
- buffer.itemsize = 1
- buffer.internal = NULL
- def __len__(self) -> size_t:
- """Return the length of the message in bytes."""
- sz: size_t = zmq_msg_size(address(self.zmq_msg))
- return sz
- @property
- def buffer(self):
- """A memoryview of the message contents."""
- _buffer = self._buffer and self._buffer()
- if _buffer is not None:
- return _buffer
- _buffer = memoryview(self)
- self._buffer = ref(_buffer)
- return _buffer
- @property
- def bytes(self):
- """The message content as a Python bytes object.
- The first time this property is accessed, a copy of the message
- contents is made. From then on that same copy of the message is
- returned.
- """
- if self._bytes is None:
- self._bytes = _copy_zmq_msg_bytes(address(self.zmq_msg))
- return self._bytes
- def get(self, option):
- """
- Get a Frame option or property.
- See the 0MQ API documentation for zmq_msg_get and zmq_msg_gets
- for details on specific options.
- .. versionadded:: libzmq-3.2
- .. versionadded:: 13.0
- .. versionchanged:: 14.3
- add support for zmq_msg_gets (requires libzmq-4.1)
- All message properties are strings.
- .. versionchanged:: 17.0
- Added support for `routing_id` and `group`.
- Only available if draft API is enabled
- with libzmq >= 4.2.
- """
- rc: C.int = 0
- property_c: p_char = NULL
- # zmq_msg_get
- if isinstance(option, int):
- rc = zmq_msg_get(address(self.zmq_msg), option)
- _check_rc(rc)
- return rc
- if option == 'routing_id':
- routing_id: uint32_t = zmq_msg_routing_id(address(self.zmq_msg))
- if routing_id == 0:
- _check_rc(-1)
- return routing_id
- elif option == 'group':
- buf = zmq_msg_group(address(self.zmq_msg))
- if buf == NULL:
- _check_rc(-1)
- return buf.decode('utf8')
- # zmq_msg_gets
- _check_version((4, 1), "get string properties")
- if isinstance(option, str):
- option = option.encode('utf8')
- if not isinstance(option, bytes):
- raise TypeError(f"expected str, got: {option!r}")
- property_c = option
- result: p_char = cast(p_char, zmq_msg_gets(address(self.zmq_msg), property_c))
- if result == NULL:
- _check_rc(-1)
- return result.decode('utf8')
- def set(self, option, value):
- """Set a Frame option.
- See the 0MQ API documentation for zmq_msg_set
- for details on specific options.
- .. versionadded:: libzmq-3.2
- .. versionadded:: 13.0
- .. versionchanged:: 17.0
- Added support for `routing_id` and `group`.
- Only available if draft API is enabled
- with libzmq >= 4.2.
- """
- rc: C.int
- if option == 'routing_id':
- routing_id: uint32_t = value
- rc = zmq_msg_set_routing_id(address(self.zmq_msg), routing_id)
- _check_rc(rc)
- return
- elif option == 'group':
- if isinstance(value, str):
- value = value.encode('utf8')
- rc = zmq_msg_set_group(address(self.zmq_msg), value)
- _check_rc(rc)
- return
- rc = zmq_msg_set(address(self.zmq_msg), option, value)
- _check_rc(rc)
- @cclass
- class Context:
- """
- Manage the lifecycle of a 0MQ context.
- Parameters
- ----------
- io_threads : int
- The number of IO threads.
- """
- def __init__(self, io_threads: C.int = 1, shadow: size_t = 0):
- self.handle = NULL
- self._pid = 0
- self._shadow = False
- if shadow:
- self.handle = cast(p_void, shadow)
- self._shadow = True
- else:
- self._shadow = False
- self.handle = zmq_ctx_new()
- if self.handle == NULL:
- raise ZMQError()
- rc: C.int = 0
- if not self._shadow:
- rc = zmq_ctx_set(self.handle, ZMQ_IO_THREADS, io_threads)
- _check_rc(rc)
- self.closed = False
- self._pid = getpid()
- @property
- def underlying(self):
- """The address of the underlying libzmq context"""
- return cast(size_t, self.handle)
- @cfunc
- @inline
- def _term(self) -> C.int:
- rc: C.int = 0
- if self.handle != NULL and not self.closed and getpid() == self._pid:
- with nogil:
- rc = zmq_ctx_destroy(self.handle)
- self.handle = NULL
- return rc
- def term(self):
- """
- Close or terminate the context.
- This can be called to close the context by hand. If this is not called,
- the context will automatically be closed when it is garbage collected.
- """
- rc: C.int = self._term()
- try:
- _check_rc(rc)
- except InterruptedSystemCall:
- # ignore interrupted term
- # see PEP 475 notes about close & EINTR for why
- pass
- self.closed = True
- def set(self, option: C.int, optval):
- """
- Set a context option.
- See the 0MQ API documentation for zmq_ctx_set
- for details on specific options.
- .. versionadded:: libzmq-3.2
- .. versionadded:: 13.0
- Parameters
- ----------
- option : int
- The option to set. Available values will depend on your
- version of libzmq. Examples include::
- zmq.IO_THREADS, zmq.MAX_SOCKETS
- optval : int
- The value of the option to set.
- """
- optval_int_c: C.int
- rc: C.int
- if self.closed:
- raise RuntimeError("Context has been destroyed")
- if not isinstance(optval, int):
- raise TypeError(f'expected int, got: {optval!r}')
- optval_int_c = optval
- rc = zmq_ctx_set(self.handle, option, optval_int_c)
- _check_rc(rc)
- def get(self, option: C.int):
- """
- Get the value of a context option.
- See the 0MQ API documentation for zmq_ctx_get
- for details on specific options.
- .. versionadded:: libzmq-3.2
- .. versionadded:: 13.0
- Parameters
- ----------
- option : int
- The option to get. Available values will depend on your
- version of libzmq. Examples include::
- zmq.IO_THREADS, zmq.MAX_SOCKETS
- Returns
- -------
- optval : int
- The value of the option as an integer.
- """
- rc: C.int
- if self.closed:
- raise RuntimeError("Context has been destroyed")
- rc = zmq_ctx_get(self.handle, option)
- _check_rc(rc, error_without_errno=False)
- return rc
- @cfunc
- @inline
- def _c_addr(addr) -> bytes:
- """cast an address input to bytes
- Expects a str, but accepts bytes
- and raises informative TypeError otherwise.
- """
- if isinstance(addr, str):
- addr = addr.encode("utf-8")
- try:
- c_addr: bytes = addr
- except TypeError:
- raise TypeError(f"Expected addr to be str, got addr={addr!r}")
- return c_addr
- @cclass
- class Socket:
- """
- A 0MQ socket.
- These objects will generally be constructed via the socket() method of a Context object.
- Note: 0MQ Sockets are *not* threadsafe. **DO NOT** share them across threads.
- Parameters
- ----------
- context : Context
- The 0MQ Context this Socket belongs to.
- socket_type : int
- The socket type, which can be any of the 0MQ socket types:
- REQ, REP, PUB, SUB, PAIR, DEALER, ROUTER, PULL, PUSH, XPUB, XSUB.
- See Also
- --------
- .Context.socket : method for creating a socket bound to a Context.
- """
- def __init__(
- self,
- context=None,
- socket_type: C.int = -1,
- shadow: size_t = 0,
- copy_threshold=None,
- ):
- # pre-init
- self.handle = NULL
- self._draft_poller = NULL
- self._pid = 0
- self._shadow = False
- self.context = None
- if copy_threshold is None:
- copy_threshold = zmq.COPY_THRESHOLD
- self.copy_threshold = copy_threshold
- self.handle = NULL
- self.context = context
- if shadow:
- self._shadow = True
- self.handle = cast(p_void, shadow)
- else:
- if context is None:
- raise TypeError("context must be specified")
- if socket_type < 0:
- raise TypeError("socket_type must be specified")
- self._shadow = False
- self.handle = zmq_socket(self.context.handle, socket_type)
- if self.handle == NULL:
- raise ZMQError()
- self._closed = False
- self._pid = getpid()
- @property
- def underlying(self):
- """The address of the underlying libzmq socket"""
- return cast(size_t, self.handle)
- @property
- def closed(self):
- """Whether the socket is closed"""
- return _check_closed_deep(self)
- def close(self, linger: int | None = None):
- """
- Close the socket.
- If linger is specified, LINGER sockopt will be set prior to closing.
- This can be called to close the socket by hand. If this is not
- called, the socket will automatically be closed when it is
- garbage collected.
- """
- rc: C.int = 0
- linger_c: C.int
- setlinger: bint = False
- if linger is not None:
- linger_c = linger
- setlinger = True
- if self.handle != NULL and not self._closed and getpid() == self._pid:
- if setlinger:
- zmq_setsockopt(self.handle, ZMQ_LINGER, address(linger_c), sizeof(int))
- # teardown draft poller
- if self._draft_poller != NULL:
- zmq_poller_destroy(address(self._draft_poller))
- self._draft_poller = NULL
- rc = zmq_close(self.handle)
- if rc < 0 and _zmq_errno() != ENOTSOCK:
- # ignore ENOTSOCK (closed by Context)
- _check_rc(rc)
- self._closed = True
- self.handle = NULL
- def set(self, option: C.int, optval):
- """
- Set socket options.
- See the 0MQ API documentation for details on specific options.
- Parameters
- ----------
- option : int
- The option to set. Available values will depend on your
- version of libzmq. Examples include::
- zmq.SUBSCRIBE, UNSUBSCRIBE, IDENTITY, HWM, LINGER, FD
- optval : int or bytes
- The value of the option to set.
- Notes
- -----
- .. warning::
- All options other than zmq.SUBSCRIBE, zmq.UNSUBSCRIBE and
- zmq.LINGER only take effect for subsequent socket bind/connects.
- """
- optval_int64_c: int64_t
- optval_int_c: C.int
- optval_c: p_char
- sz: Py_ssize_t
- _check_closed(self)
- if isinstance(optval, str):
- raise TypeError("unicode not allowed, use setsockopt_string")
- try:
- sopt = SocketOption(option)
- except ValueError:
- # unrecognized option,
- # assume from the future,
- # let EINVAL raise
- opt_type = _OptType.int
- else:
- opt_type = sopt._opt_type
- if opt_type == _OptType.bytes:
- if not isinstance(optval, bytes):
- raise TypeError(f'expected bytes, got: {optval!r}')
- optval_c = PyBytes_AsString(optval)
- sz = PyBytes_Size(optval)
- _setsockopt(self.handle, option, optval_c, sz)
- elif opt_type == _OptType.int64:
- if not isinstance(optval, int):
- raise TypeError(f'expected int, got: {optval!r}')
- optval_int64_c = optval
- _setsockopt(self.handle, option, address(optval_int64_c), sizeof(int64_t))
- else:
- # default is to assume int, which is what most new sockopts will be
- # this lets pyzmq work with newer libzmq which may add constants
- # pyzmq has not yet added, rather than artificially raising. Invalid
- # sockopts will still raise just the same, but it will be libzmq doing
- # the raising.
- if not isinstance(optval, int):
- raise TypeError(f'expected int, got: {optval!r}')
- optval_int_c = optval
- _setsockopt(self.handle, option, address(optval_int_c), sizeof(int))
- def get(self, option: C.int):
- """
- Get the value of a socket option.
- See the 0MQ API documentation for details on specific options.
- .. versionchanged:: 27
- Added experimental support for ZMQ_FD for draft sockets via `zmq_poller_fd`.
- Requires libzmq >=4.3.2 built with draft support.
- Parameters
- ----------
- option : int
- The option to get. Available values will depend on your
- version of libzmq. Examples include::
- zmq.IDENTITY, HWM, LINGER, FD, EVENTS
- Returns
- -------
- optval : int or bytes
- The value of the option as a bytestring or int.
- """
- optval_int64_c = declare(int64_t)
- optval_int_c = declare(C.int)
- optval_fd_c = declare(fd_t)
- identity_str_c = declare(char[255])
- sz: size_t
- _check_closed(self)
- try:
- sopt = SocketOption(option)
- except ValueError:
- # unrecognized option,
- # assume from the future,
- # let EINVAL raise
- opt_type = _OptType.int
- else:
- opt_type = sopt._opt_type
- if opt_type == _OptType.bytes:
- sz = 255
- _getsockopt(self.handle, option, cast(p_void, identity_str_c), address(sz))
- # strip null-terminated strings *except* identity
- if (
- option != ZMQ_IDENTITY
- and sz > 0
- and (cast(p_char, identity_str_c))[sz - 1] == b'\0'
- ):
- sz -= 1
- result = PyBytes_FromStringAndSize(cast(p_char, identity_str_c), sz)
- elif opt_type == _OptType.int64:
- sz = sizeof(int64_t)
- _getsockopt(
- self.handle, option, cast(p_void, address(optval_int64_c)), address(sz)
- )
- result = optval_int64_c
- elif option == ZMQ_FD and self._draft_poller != NULL:
- # draft sockets use FD of a draft zmq_poller as proxy
- rc = zmq_poller_fd(self._draft_poller, address(optval_fd_c))
- _check_rc(rc)
- result = optval_fd_c
- elif opt_type == _OptType.fd:
- sz = sizeof(fd_t)
- try:
- _getsockopt(
- self.handle, option, cast(p_void, address(optval_fd_c)), address(sz)
- )
- except ZMQError as e:
- # threadsafe sockets don't support ZMQ_FD (yet!)
- # fallback on zmq_poller_fd as proxy with the same behavior
- # until libzmq fixes this.
- # if upstream fixes it, this branch will never be taken
- if (
- option == ZMQ_FD
- and e.errno == zmq.Errno.EINVAL
- and self.get(ZMQ_THREAD_SAFE)
- ):
- _check_version(
- (4, 3, 2), "draft socket FD support via zmq_poller_fd"
- )
- if not zmq.DRAFT_API:
- raise RuntimeError(
- "libzmq and pyzmq must be built with draft support"
- )
- warnings.warn(zmq.error.DraftFDWarning(), stacklevel=2)
- # create a poller and retrieve its fd
- self._draft_poller = zmq_poller_new()
- if self._draft_poller == NULL:
- # failed (why?), raise original error
- raise
- # register self with poller
- rc = zmq_poller_add(
- self._draft_poller, self.handle, NULL, ZMQ_POLLIN | ZMQ_POLLOUT
- )
- _check_rc(rc)
- # use poller fd as proxy for ours
- rc = zmq_poller_fd(self._draft_poller, address(optval_fd_c))
- _check_rc(rc)
- else:
- raise
- result = optval_fd_c
- else:
- # default is to assume int, which is what most new sockopts will be
- # this lets pyzmq work with newer libzmq which may add constants
- # pyzmq has not yet added, rather than artificially raising. Invalid
- # sockopts will still raise just the same, but it will be libzmq doing
- # the raising.
- sz = sizeof(int)
- _getsockopt(
- self.handle, option, cast(p_void, address(optval_int_c)), address(sz)
- )
- result = optval_int_c
- return result
- def bind(self, addr: str | bytes):
- """
- Bind the socket to an address.
- This causes the socket to listen on a network port. Sockets on the
- other side of this connection will use ``Socket.connect(addr)`` to
- connect to this socket.
- Parameters
- ----------
- addr : str
- The address string. This has the form 'protocol://interface:port',
- for example 'tcp://127.0.0.1:5555'. Protocols supported include
- tcp, udp, pgm, epgm, inproc and ipc. If the address is unicode, it is
- encoded to utf-8 first.
- """
- _addr_bytes: bytes = _c_addr(addr)
- c_addr: p_char = _addr_bytes
- _check_closed(self)
- rc: C.int = zmq_bind(self.handle, c_addr)
- if rc != 0:
- _errno: C.int = _zmq_errno()
- _ipc_max: C.int = get_ipc_path_max_len()
- if _ipc_max and _errno == ENAMETOOLONG:
- path = addr.split('://', 1)[-1]
- msg = (
- f'ipc path "{path}" is longer than {_ipc_max} '
- 'characters (sizeof(sockaddr_un.sun_path)). '
- 'zmq.IPC_PATH_MAX_LEN constant can be used '
- 'to check addr length (if it is defined).'
- )
- raise ZMQError(msg=msg)
- elif _errno == ENOENT:
- path = addr.split('://', 1)[-1]
- msg = f'No such file or directory for ipc path "{path}".'
- raise ZMQError(msg=msg)
- while True:
- try:
- _check_rc(rc)
- except InterruptedSystemCall:
- rc = zmq_bind(self.handle, c_addr)
- continue
- else:
- break
- def connect(self, addr: str | bytes) -> None:
- """
- Connect to a remote 0MQ socket.
- Parameters
- ----------
- addr : str
- The address string. This has the form 'protocol://interface:port',
- for example 'tcp://127.0.0.1:5555'. Protocols supported are
- tcp, udp, pgm, inproc and ipc. If the address is unicode, it is
- encoded to utf-8 first.
- """
- rc: C.int
- _addr_bytes: bytes = _c_addr(addr)
- c_addr: p_char = _addr_bytes
- _check_closed(self)
- while True:
- try:
- rc = zmq_connect(self.handle, c_addr)
- _check_rc(rc)
- except InterruptedSystemCall:
- # retry syscall
- continue
- else:
- break
- def unbind(self, addr: str | bytes):
- """
- Unbind from an address (undoes a call to bind).
- .. versionadded:: libzmq-3.2
- .. versionadded:: 13.0
- Parameters
- ----------
- addr : str
- The address string. This has the form 'protocol://interface:port',
- for example 'tcp://127.0.0.1:5555'. Protocols supported are
- tcp, udp, pgm, inproc and ipc. If the address is unicode, it is
- encoded to utf-8 first.
- """
- _addr_bytes: bytes = _c_addr(addr)
- c_addr: p_char = _addr_bytes
- _check_closed(self)
- rc: C.int = zmq_unbind(self.handle, c_addr)
- if rc != 0:
- raise ZMQError()
- def disconnect(self, addr: str | bytes):
- """
- Disconnect from a remote 0MQ socket (undoes a call to connect).
- .. versionadded:: libzmq-3.2
- .. versionadded:: 13.0
- Parameters
- ----------
- addr : str
- The address string. This has the form 'protocol://interface:port',
- for example 'tcp://127.0.0.1:5555'. Protocols supported are
- tcp, udp, pgm, inproc and ipc. If the address is unicode, it is
- encoded to utf-8 first.
- """
- _addr_bytes: bytes = _c_addr(addr)
- c_addr: p_char = _addr_bytes
- _check_closed(self)
- rc: C.int = zmq_disconnect(self.handle, c_addr)
- if rc != 0:
- raise ZMQError()
- def monitor(self, addr: str | bytes | None, events: C.int = ZMQ_EVENT_ALL):
- """
- Start publishing socket events on inproc.
- See libzmq docs for zmq_monitor for details.
- While this function is available from libzmq 3.2,
- pyzmq cannot parse monitor messages from libzmq prior to 4.0.
- .. versionadded: libzmq-3.2
- .. versionadded: 14.0
- Parameters
- ----------
- addr : str | None
- The inproc url used for monitoring. Passing None as
- the addr will cause an existing socket monitor to be
- deregistered.
- events : int
- default: zmq.EVENT_ALL
- The zmq event bitmask for which events will be sent to the monitor.
- """
- c_addr: p_char = NULL
- if addr is not None:
- _addr_bytes: bytes = _c_addr(addr)
- c_addr: p_char = _addr_bytes
- _check_closed(self)
- _check_rc(zmq_socket_monitor(self.handle, c_addr, events))
- def join(self, group: str | bytes):
- """
- Join a RADIO-DISH group
- Only for DISH sockets.
- libzmq and pyzmq must have been built with ZMQ_BUILD_DRAFT_API
- .. versionadded:: 17
- """
- _check_version((4, 2), "RADIO-DISH")
- if not zmq.DRAFT_API:
- raise RuntimeError("libzmq and pyzmq must be built with draft support")
- if isinstance(group, str):
- group = group.encode('utf8')
- c_group: bytes = group
- rc: C.int = zmq_join(self.handle, c_group)
- _check_rc(rc)
- def leave(self, group):
- """
- Leave a RADIO-DISH group
- Only for DISH sockets.
- libzmq and pyzmq must have been built with ZMQ_BUILD_DRAFT_API
- .. versionadded:: 17
- """
- _check_version((4, 2), "RADIO-DISH")
- if not zmq.DRAFT_API:
- raise RuntimeError("libzmq and pyzmq must be built with draft support")
- rc: C.int = zmq_leave(self.handle, group)
- _check_rc(rc)
- def send(self, data, flags=0, copy: bint = True, track: bint = False):
- """
- Send a single zmq message frame on this socket.
- This queues the message to be sent by the IO thread at a later time.
- With flags=NOBLOCK, this raises :class:`ZMQError` if the queue is full;
- otherwise, this waits until space is available.
- See :class:`Poller` for more general non-blocking I/O.
- Parameters
- ----------
- data : bytes, Frame, memoryview
- The content of the message. This can be any object that provides
- the Python buffer API (`memoryview(data)` can be called).
- flags : int
- 0, NOBLOCK, SNDMORE, or NOBLOCK|SNDMORE.
- copy : bool
- Should the message be sent in a copying or non-copying manner.
- track : bool
- Should the message be tracked for notification that ZMQ has
- finished with it? (ignored if copy=True)
- Returns
- -------
- None : if `copy` or not track
- None if message was sent, raises an exception otherwise.
- MessageTracker : if track and not copy
- a MessageTracker object, whose `done` property will
- be False until the send is completed.
- Raises
- ------
- TypeError
- If a unicode object is passed
- ValueError
- If `track=True`, but an untracked Frame is passed.
- ZMQError
- for any of the reasons zmq_msg_send might fail (including
- if NOBLOCK is set and the outgoing queue is full).
- """
- _check_closed(self)
- if isinstance(data, str):
- raise TypeError("unicode not allowed, use send_string")
- if copy and not isinstance(data, Frame):
- return _send_copy(self.handle, data, flags)
- else:
- if isinstance(data, Frame):
- if track and not data.tracker:
- raise ValueError('Not a tracked message')
- msg = data
- else:
- if self.copy_threshold:
- buf = memoryview(data)
- nbytes: size_t = buf.nbytes
- copy_threshold: size_t = self.copy_threshold
- # always copy messages smaller than copy_threshold
- if nbytes < copy_threshold:
- _send_copy(self.handle, buf, flags)
- return zmq._FINISHED_TRACKER
- msg = Frame(data, track=track, copy_threshold=self.copy_threshold)
- return _send_frame(self.handle, msg, flags)
- def recv(self, flags=0, copy: bint = True, track: bint = False):
- """
- Receive a message.
- With flags=NOBLOCK, this raises :class:`ZMQError` if no messages have
- arrived; otherwise, this waits until a message arrives.
- See :class:`Poller` for more general non-blocking I/O.
- Parameters
- ----------
- flags : int
- 0 or NOBLOCK.
- copy : bool
- Should the message be received in a copying or non-copying manner?
- If False a Frame object is returned, if True a string copy of
- message is returned.
- track : bool
- Should the message be tracked for notification that ZMQ has
- finished with it? (ignored if copy=True)
- Returns
- -------
- msg : bytes or Frame
- The received message frame. If `copy` is False, then it will be a Frame,
- otherwise it will be bytes.
- Raises
- ------
- ZMQError
- for any of the reasons zmq_msg_recv might fail (including if
- NOBLOCK is set and no new messages have arrived).
- """
- _check_closed(self)
- if copy:
- return _recv_copy(self.handle, flags)
- else:
- frame = _recv_frame(self.handle, flags, track)
- more: bint = False
- sz: size_t = sizeof(bint)
- _getsockopt(
- self.handle, ZMQ_RCVMORE, cast(p_void, address(more)), address(sz)
- )
- frame.more = more
- return frame
- def recv_into(self, buffer, /, *, nbytes=0, flags=0) -> C.int:
- """
- Receive up to nbytes bytes from the socket,
- storing the data into a buffer rather than allocating a new Frame.
- The next message frame can be discarded by receiving into an empty buffer::
- sock.recv_into(bytearray())
- .. versionadded:: 26.4
- Parameters
- ----------
- buffer : memoryview
- Any object providing the buffer interface (i.e. `memoryview(buffer)` works),
- where the memoryview is contiguous and writable.
- nbytes: int, default=0
- The maximum number of bytes to receive.
- If nbytes is not specified (or 0), receive up to the size available in the given buffer.
- If the next frame is larger than this, the frame will be truncated and message content discarded.
- flags: int, default=0
- See `socket.recv`
- Returns
- -------
- bytes_received: int
- Returns the number of bytes received.
- This is always the size of the received frame.
- If the returned `bytes_received` is larger than `nbytes` (or size of `buffer` if `nbytes=0`),
- the message has been truncated and the rest of the frame discarded.
- Truncated data cannot be recovered.
- Raises
- ------
- ZMQError
- for any of the reasons `zmq_recv` might fail.
- BufferError
- for invalid buffers, such as readonly or not contiguous.
- """
- c_flags: C.int = flags
- _check_closed(self)
- c_nbytes: size_t = nbytes
- if c_nbytes < 0:
- raise ValueError(f"{nbytes=} must be non-negative")
- view = memoryview(buffer)
- c_data = declare(pointer(C.void))
- view_bytes: C.size_t = _asbuffer(view, address(c_data), True)
- if nbytes == 0:
- c_nbytes = view_bytes
- elif c_nbytes > view_bytes:
- raise ValueError(f"{nbytes=} too big for memoryview of {view_bytes}B")
- # call zmq_recv, with retries
- while True:
- with nogil:
- rc: C.int = zmq_recv(self.handle, c_data, c_nbytes, c_flags)
- try:
- _check_rc(rc)
- except InterruptedSystemCall:
- continue
- else:
- return rc
- # inline socket methods
- @inline
- @cfunc
- def _check_closed(s: Socket):
- """raise ENOTSUP if socket is closed
- Does not do a deep check
- """
- if s._closed:
- raise ZMQError(ENOTSOCK)
- @inline
- @cfunc
- def _check_closed_deep(s: Socket) -> bint:
- """thorough check of whether the socket has been closed,
- even if by another entity (e.g. ctx.destroy).
- Only used by the `closed` property.
- returns True if closed, False otherwise
- """
- rc: C.int
- errno: C.int
- stype = declare(C.int)
- sz: size_t = sizeof(int)
- if s._closed:
- return True
- else:
- rc = zmq_getsockopt(
- s.handle, ZMQ_TYPE, cast(p_void, address(stype)), address(sz)
- )
- if rc < 0:
- errno = _zmq_errno()
- if errno == ENOTSOCK:
- s._closed = True
- return True
- elif errno == ZMQ_ETERM:
- # don't raise ETERM when checking if we're closed
- return False
- else:
- _check_rc(rc)
- return False
- @cfunc
- @inline
- def _recv_frame(handle: p_void, flags: C.int = 0, track: bint = False) -> Frame:
- """Receive a message in a non-copying manner and return a Frame."""
- rc: C.int
- msg = zmq.Frame(track=track)
- cmsg: Frame = msg
- while True:
- with nogil:
- rc = zmq_msg_recv(address(cmsg.zmq_msg), handle, flags)
- try:
- _check_rc(rc)
- except InterruptedSystemCall:
- continue
- else:
- break
- return msg
- @cfunc
- @inline
- def _recv_copy(handle: p_void, flags: C.int = 0):
- """Receive a message and return a copy"""
- zmq_msg = declare(zmq_msg_t)
- zmq_msg_p: pointer(zmq_msg_t) = address(zmq_msg)
- rc: C.int = zmq_msg_init(zmq_msg_p)
- _check_rc(rc)
- while True:
- with nogil:
- rc = zmq_msg_recv(zmq_msg_p, handle, flags)
- try:
- _check_rc(rc)
- except InterruptedSystemCall:
- continue
- except Exception:
- zmq_msg_close(zmq_msg_p) # ensure msg is closed on failure
- raise
- else:
- break
- msg_bytes = _copy_zmq_msg_bytes(zmq_msg_p)
- zmq_msg_close(zmq_msg_p)
- return msg_bytes
- @cfunc
- @inline
- def _send_frame(handle: p_void, msg: Frame, flags: C.int = 0):
- """Send a Frame on this socket in a non-copy manner."""
- rc: C.int
- msg_copy: Frame
- # Always copy so the original message isn't garbage collected.
- # This doesn't do a real copy, just a reference.
- msg_copy = msg.fast_copy()
- while True:
- with nogil:
- rc = zmq_msg_send(address(msg_copy.zmq_msg), handle, flags)
- try:
- _check_rc(rc)
- except InterruptedSystemCall:
- continue
- else:
- break
- return msg.tracker
- @cfunc
- @inline
- def _send_copy(handle: p_void, buf, flags: C.int = 0):
- """Send a message on this socket by copying its content."""
- rc: C.int
- msg = declare(zmq_msg_t)
- c_bytes = declare(p_void)
- # copy to c array:
- c_bytes_len = _asbuffer(buf, address(c_bytes))
- # Copy the msg before sending. This avoids any complications with
- # the GIL, etc.
- # If zmq_msg_init_* fails we must not call zmq_msg_close (Bus Error)
- rc = zmq_msg_init_size(address(msg), c_bytes_len)
- _check_rc(rc)
- while True:
- with nogil:
- memcpy(zmq_msg_data(address(msg)), c_bytes, zmq_msg_size(address(msg)))
- rc = zmq_msg_send(address(msg), handle, flags)
- try:
- _check_rc(rc)
- except InterruptedSystemCall:
- continue
- except Exception:
- zmq_msg_close(address(msg)) # close the unused msg
- raise # raise original exception
- else:
- rc = zmq_msg_close(address(msg))
- _check_rc(rc)
- break
- @cfunc
- @inline
- def _getsockopt(handle: p_void, option: C.int, optval: p_void, sz: pointer(size_t)):
- """getsockopt, retrying interrupted calls
- checks rc, raising ZMQError on failure.
- """
- rc: C.int = 0
- while True:
- rc = zmq_getsockopt(handle, option, optval, sz)
- try:
- _check_rc(rc)
- except InterruptedSystemCall:
- continue
- else:
- break
- @cfunc
- @inline
- def _setsockopt(handle: p_void, option: C.int, optval: p_void, sz: size_t):
- """setsockopt, retrying interrupted calls
- checks rc, raising ZMQError on failure.
- """
- rc: C.int = 0
- while True:
- rc = zmq_setsockopt(handle, option, optval, sz)
- try:
- _check_rc(rc)
- except InterruptedSystemCall:
- continue
- else:
- break
- # General utility functions
- def zmq_errno() -> C.int:
- """Return the integer errno of the most recent zmq error."""
- return _zmq_errno()
- def strerror(errno: C.int) -> str:
- """
- Return the error string given the error number.
- """
- str_e: bytes = zmq_strerror(errno)
- return str_e.decode("utf8", "replace")
- def zmq_version_info() -> tuple[int, int, int]:
- """Return the version of ZeroMQ itself as a 3-tuple of ints."""
- major: C.int = 0
- minor: C.int = 0
- patch: C.int = 0
- _zmq_version(address(major), address(minor), address(patch))
- return (major, minor, patch)
- def has(capability: str) -> bool:
- """Check for zmq capability by name (e.g. 'ipc', 'curve')
- .. versionadded:: libzmq-4.1
- .. versionadded:: 14.1
- """
- _check_version((4, 1), 'zmq.has')
- ccap: bytes = capability.encode('utf8')
- return bool(zmq_has(ccap))
- def curve_keypair() -> tuple[bytes, bytes]:
- """generate a Z85 key pair for use with zmq.CURVE security
- Requires libzmq (≥ 4.0) to have been built with CURVE support.
- .. versionadded:: libzmq-4.0
- .. versionadded:: 14.0
- Returns
- -------
- public: bytes
- The public key as 40 byte z85-encoded bytestring.
- private: bytes
- The private key as 40 byte z85-encoded bytestring.
- """
- rc: C.int
- public_key = declare(char[64])
- secret_key = declare(char[64])
- _check_version((4, 0), "curve_keypair")
- # see huge comment in libzmq/src/random.cpp
- # about threadsafety of random initialization
- rc = zmq_curve_keypair(public_key, secret_key)
- _check_rc(rc)
- return public_key, secret_key
- def curve_public(secret_key) -> bytes:
- """Compute the public key corresponding to a secret key for use
- with zmq.CURVE security
- Requires libzmq (≥ 4.2) to have been built with CURVE support.
- Parameters
- ----------
- private
- The private key as a 40 byte z85-encoded bytestring
- Returns
- -------
- bytes
- The public key as a 40 byte z85-encoded bytestring
- """
- if isinstance(secret_key, str):
- secret_key = secret_key.encode('utf8')
- if not len(secret_key) == 40:
- raise ValueError('secret key must be a 40 byte z85 encoded string')
- rc: C.int
- public_key = declare(char[64])
- c_secret_key: pointer(char) = secret_key
- _check_version((4, 2), "curve_public")
- # see huge comment in libzmq/src/random.cpp
- # about threadsafety of random initialization
- rc = zmq_curve_public(public_key, c_secret_key)
- _check_rc(rc)
- return public_key[:40]
- # polling
- def zmq_poll(sockets, timeout: C.int = -1):
- """zmq_poll(sockets, timeout=-1)
- Poll a set of 0MQ sockets, native file descs. or sockets.
- Parameters
- ----------
- sockets : list of tuples of (socket, flags)
- Each element of this list is a two-tuple containing a socket
- and a flags. The socket may be a 0MQ socket or any object with
- a ``fileno()`` method. The flags can be zmq.POLLIN (for detecting
- for incoming messages), zmq.POLLOUT (for detecting that send is OK)
- or zmq.POLLIN|zmq.POLLOUT for detecting both.
- timeout : int
- The number of milliseconds to poll for. Negative means no timeout.
- """
- rc: C.int
- i: C.int
- fileno: fd_t
- events: C.int
- pollitems: pointer(zmq_pollitem_t) = NULL
- nsockets: C.int = len(sockets)
- if nsockets == 0:
- return []
- pollitems = cast(pointer(zmq_pollitem_t), malloc(nsockets * sizeof(zmq_pollitem_t)))
- if pollitems == NULL:
- raise MemoryError("Could not allocate poll items")
- for i in range(nsockets):
- s, events = sockets[i]
- if isinstance(s, Socket):
- pollitems[i].socket = cast(Socket, s).handle
- pollitems[i].fd = 0
- pollitems[i].events = events
- pollitems[i].revents = 0
- elif isinstance(s, int):
- fileno = s
- pollitems[i].socket = NULL
- pollitems[i].fd = fileno
- pollitems[i].events = events
- pollitems[i].revents = 0
- elif hasattr(s, 'fileno'):
- try:
- fileno = int(s.fileno())
- except Exception:
- free(pollitems)
- raise ValueError('fileno() must return a valid integer fd')
- else:
- pollitems[i].socket = NULL
- pollitems[i].fd = fileno
- pollitems[i].events = events
- pollitems[i].revents = 0
- else:
- free(pollitems)
- raise TypeError(
- "Socket must be a 0MQ socket, an integer fd or have "
- f"a fileno() method: {s!r}"
- )
- ms_passed: C.int = 0
- tic: C.int
- try:
- while True:
- start: C.int = monotonic()
- with nogil:
- rc = zmq_poll_c(pollitems, nsockets, timeout)
- try:
- _check_rc(rc)
- except InterruptedSystemCall:
- if timeout > 0:
- tic = monotonic()
- ms_passed = int(1000 * (tic - start))
- if ms_passed < 0:
- # don't allow negative ms_passed,
- # which can happen on old Python versions without time.monotonic.
- warnings.warn(
- f"Negative elapsed time for interrupted poll: {ms_passed}."
- " Did the clock change?",
- RuntimeWarning,
- )
- # treat this case the same as no time passing,
- # since it should be rare and not happen twice in a row.
- ms_passed = 0
- timeout = max(0, timeout - ms_passed)
- continue
- else:
- break
- except Exception:
- free(pollitems)
- raise
- results = []
- for i in range(nsockets):
- revents = pollitems[i].revents
- # for compatibility with select.poll:
- # - only return sockets with non-zero status
- # - return the fd for plain sockets
- if revents > 0:
- if pollitems[i].socket != NULL:
- s = sockets[i][0]
- else:
- s = pollitems[i].fd
- results.append((s, revents))
- free(pollitems)
- return results
- def proxy(frontend: Socket, backend: Socket, capture: Socket = None):
- """
- Start a zeromq proxy (replacement for device).
- .. versionadded:: libzmq-3.2
- .. versionadded:: 13.0
- Parameters
- ----------
- frontend : Socket
- The Socket instance for the incoming traffic.
- backend : Socket
- The Socket instance for the outbound traffic.
- capture : Socket (optional)
- The Socket instance for capturing traffic.
- """
- rc: C.int = 0
- capture_handle: p_void
- if isinstance(capture, Socket):
- capture_handle = capture.handle
- else:
- capture_handle = NULL
- while True:
- with nogil:
- rc = zmq_proxy(frontend.handle, backend.handle, capture_handle)
- try:
- _check_rc(rc)
- except InterruptedSystemCall:
- continue
- else:
- break
- return rc
- def proxy_steerable(
- frontend: Socket,
- backend: Socket,
- capture: Socket = None,
- control: Socket = None,
- ):
- """
- Start a zeromq proxy with control flow.
- .. versionadded:: libzmq-4.1
- .. versionadded:: 18.0
- Parameters
- ----------
- frontend : Socket
- The Socket instance for the incoming traffic.
- backend : Socket
- The Socket instance for the outbound traffic.
- capture : Socket (optional)
- The Socket instance for capturing traffic.
- control : Socket (optional)
- The Socket instance for control flow.
- """
- rc: C.int = 0
- capture_handle: p_void
- if isinstance(capture, Socket):
- capture_handle = capture.handle
- else:
- capture_handle = NULL
- if isinstance(control, Socket):
- control_handle = control.handle
- else:
- control_handle = NULL
- while True:
- with nogil:
- rc = zmq_proxy_steerable(
- frontend.handle, backend.handle, capture_handle, control_handle
- )
- try:
- _check_rc(rc)
- except InterruptedSystemCall:
- continue
- else:
- break
- return rc
- # monitored queue - like proxy (predates libzmq proxy)
- # but supports ROUTER-ROUTER devices
- @cfunc
- @inline
- @nogil
- def _mq_relay(
- in_socket: p_void,
- out_socket: p_void,
- side_socket: p_void,
- msg: zmq_msg_t,
- side_msg: zmq_msg_t,
- id_msg: zmq_msg_t,
- swap_ids: bint,
- ) -> C.int:
- rc: C.int
- flags: C.int
- flagsz = declare(size_t)
- more = declare(int)
- flagsz = sizeof(int)
- if swap_ids: # both router, must send second identity first
- # recv two ids into msg, id_msg
- rc = zmq_msg_recv(address(msg), in_socket, 0)
- if rc < 0:
- return rc
- rc = zmq_msg_recv(address(id_msg), in_socket, 0)
- if rc < 0:
- return rc
- # send second id (id_msg) first
- # !!!! always send a copy before the original !!!!
- rc = zmq_msg_copy(address(side_msg), address(id_msg))
- if rc < 0:
- return rc
- rc = zmq_msg_send(address(side_msg), out_socket, ZMQ_SNDMORE)
- if rc < 0:
- return rc
- rc = zmq_msg_send(address(id_msg), side_socket, ZMQ_SNDMORE)
- if rc < 0:
- return rc
- # send first id (msg) second
- rc = zmq_msg_copy(address(side_msg), address(msg))
- if rc < 0:
- return rc
- rc = zmq_msg_send(address(side_msg), out_socket, ZMQ_SNDMORE)
- if rc < 0:
- return rc
- rc = zmq_msg_send(address(msg), side_socket, ZMQ_SNDMORE)
- if rc < 0:
- return rc
- while True:
- rc = zmq_msg_recv(address(msg), in_socket, 0)
- if rc < 0:
- return rc
- # assert (rc == 0)
- rc = zmq_getsockopt(in_socket, ZMQ_RCVMORE, address(more), address(flagsz))
- if rc < 0:
- return rc
- flags = 0
- if more:
- flags |= ZMQ_SNDMORE
- rc = zmq_msg_copy(address(side_msg), address(msg))
- if rc < 0:
- return rc
- if flags:
- rc = zmq_msg_send(address(side_msg), out_socket, flags)
- if rc < 0:
- return rc
- # only SNDMORE for side-socket
- rc = zmq_msg_send(address(msg), side_socket, ZMQ_SNDMORE)
- if rc < 0:
- return rc
- else:
- rc = zmq_msg_send(address(side_msg), out_socket, 0)
- if rc < 0:
- return rc
- rc = zmq_msg_send(address(msg), side_socket, 0)
- if rc < 0:
- return rc
- break
- return rc
- @cfunc
- @inline
- @nogil
- def _mq_inline(
- in_socket: p_void,
- out_socket: p_void,
- side_socket: p_void,
- in_msg_ptr: pointer(zmq_msg_t),
- out_msg_ptr: pointer(zmq_msg_t),
- swap_ids: bint,
- ) -> C.int:
- """
- inner C function for monitored_queue
- """
- msg: zmq_msg_t = declare(zmq_msg_t)
- rc: C.int = zmq_msg_init(address(msg))
- id_msg = declare(zmq_msg_t)
- rc = zmq_msg_init(address(id_msg))
- if rc < 0:
- return rc
- side_msg = declare(zmq_msg_t)
- rc = zmq_msg_init(address(side_msg))
- if rc < 0:
- return rc
- items = declare(zmq_pollitem_t[2])
- items[0].socket = in_socket
- items[0].events = ZMQ_POLLIN
- items[0].fd = items[0].revents = 0
- items[1].socket = out_socket
- items[1].events = ZMQ_POLLIN
- items[1].fd = items[1].revents = 0
- while True:
- # wait for the next message to process
- rc = zmq_poll_c(address(items[0]), 2, -1)
- if rc < 0:
- return rc
- if items[0].revents & ZMQ_POLLIN:
- # send in_prefix to side socket
- rc = zmq_msg_copy(address(side_msg), in_msg_ptr)
- if rc < 0:
- return rc
- rc = zmq_msg_send(address(side_msg), side_socket, ZMQ_SNDMORE)
- if rc < 0:
- return rc
- # relay the rest of the message
- rc = _mq_relay(
- in_socket, out_socket, side_socket, msg, side_msg, id_msg, swap_ids
- )
- if rc < 0:
- return rc
- if items[1].revents & ZMQ_POLLIN:
- # send out_prefix to side socket
- rc = zmq_msg_copy(address(side_msg), out_msg_ptr)
- if rc < 0:
- return rc
- rc = zmq_msg_send(address(side_msg), side_socket, ZMQ_SNDMORE)
- if rc < 0:
- return rc
- # relay the rest of the message
- rc = _mq_relay(
- out_socket, in_socket, side_socket, msg, side_msg, id_msg, swap_ids
- )
- if rc < 0:
- return rc
- return rc
- def monitored_queue(
- in_socket: Socket,
- out_socket: Socket,
- mon_socket: Socket,
- in_prefix: bytes = b'in',
- out_prefix: bytes = b'out',
- ):
- """
- Start a monitored queue device.
- A monitored queue is very similar to the zmq.proxy device (monitored queue came first).
- Differences from zmq.proxy:
- - monitored_queue supports both in and out being ROUTER sockets
- (via swapping IDENTITY prefixes).
- - monitor messages are prefixed, making in and out messages distinguishable.
- Parameters
- ----------
- in_socket : zmq.Socket
- One of the sockets to the Queue. Its messages will be prefixed with
- 'in'.
- out_socket : zmq.Socket
- One of the sockets to the Queue. Its messages will be prefixed with
- 'out'. The only difference between in/out socket is this prefix.
- mon_socket : zmq.Socket
- This socket sends out every message received by each of the others
- with an in/out prefix specifying which one it was.
- in_prefix : str
- Prefix added to broadcast messages from in_socket.
- out_prefix : str
- Prefix added to broadcast messages from out_socket.
- """
- ins: p_void = in_socket.handle
- outs: p_void = out_socket.handle
- mons: p_void = mon_socket.handle
- in_msg = declare(zmq_msg_t)
- out_msg = declare(zmq_msg_t)
- swap_ids: bint
- msg_c: p_void = NULL
- msg_c_len = declare(Py_ssize_t)
- rc: C.int
- # force swap_ids if both ROUTERs
- swap_ids = in_socket.type == ZMQ_ROUTER and out_socket.type == ZMQ_ROUTER
- # build zmq_msg objects from str prefixes
- msg_c_len = _asbuffer(in_prefix, address(msg_c))
- rc = zmq_msg_init_size(address(in_msg), msg_c_len)
- _check_rc(rc)
- memcpy(zmq_msg_data(address(in_msg)), msg_c, zmq_msg_size(address(in_msg)))
- msg_c_len = _asbuffer(out_prefix, address(msg_c))
- rc = zmq_msg_init_size(address(out_msg), msg_c_len)
- _check_rc(rc)
- while True:
- with nogil:
- memcpy(
- zmq_msg_data(address(out_msg)), msg_c, zmq_msg_size(address(out_msg))
- )
- rc = _mq_inline(
- ins, outs, mons, address(in_msg), address(out_msg), swap_ids
- )
- try:
- _check_rc(rc)
- except InterruptedSystemCall:
- continue
- else:
- break
- return rc
- __all__ = [
- 'IPC_PATH_MAX_LEN',
- 'PYZMQ_DRAFT_API',
- 'Context',
- 'Socket',
- 'Frame',
- 'has',
- 'curve_keypair',
- 'curve_public',
- 'zmq_version_info',
- 'zmq_errno',
- 'zmq_poll',
- 'strerror',
- 'proxy',
- 'proxy_steerable',
- ]
|