| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225 |
- """Dummy Frame object"""
- # Copyright (C) PyZMQ Developers
- # Distributed under the terms of the Modified BSD License.
- import errno
- from threading import Event
- import zmq
- import zmq.error
- from zmq.constants import ETERM
- from ._cffi import ffi
- from ._cffi import lib as C
- zmq_gc = None
- try:
- from __pypy__.bufferable import bufferable as maybe_bufferable
- except ImportError:
- maybe_bufferable = object
- def _content(obj):
- """Return content of obj as bytes"""
- if type(obj) is bytes:
- return obj
- if not isinstance(obj, memoryview):
- obj = memoryview(obj)
- return obj.tobytes()
- def _check_rc(rc):
- err = C.zmq_errno()
- if rc == -1:
- if err == errno.EINTR:
- raise zmq.error.InterrruptedSystemCall(err)
- elif err == errno.EAGAIN:
- raise zmq.error.Again(errno)
- elif err == ETERM:
- raise zmq.error.ContextTerminated(err)
- else:
- raise zmq.error.ZMQError(err)
- return 0
- class Frame(maybe_bufferable):
- _data = None
- tracker = None
- closed = False
- more = False
- _buffer = None
- _bytes = None
- _failed_init = False
- tracker_event = None
- zmq_msg = None
- def __init__(self, data=None, track=False, copy=None, copy_threshold=None):
- self._failed_init = True
- self.zmq_msg = ffi.cast('zmq_msg_t[1]', C.malloc(ffi.sizeof("zmq_msg_t")))
- # self.tracker should start finished
- # except in the case where we are sharing memory with libzmq
- if track:
- self.tracker = zmq._FINISHED_TRACKER
- if isinstance(data, str):
- raise TypeError(
- "Unicode strings are not allowed. Only: bytes, buffer interfaces."
- )
- if data is None:
- rc = C.zmq_msg_init(self.zmq_msg)
- _check_rc(rc)
- self._failed_init = False
- return
- self._data = data
- if type(data) is bytes:
- # avoid unnecessary copy on .bytes access
- self._bytes = data
- self._buffer = memoryview(data)
- if not self._buffer.contiguous:
- raise BufferError("memoryview: underlying buffer is not contiguous")
- # from_buffer silently copies if memory is not contiguous
- c_data = ffi.from_buffer(self._buffer)
- data_len_c = self._buffer.nbytes
- if copy is None:
- if copy_threshold and data_len_c < copy_threshold:
- copy = True
- else:
- copy = False
- if copy:
- # copy message data instead of sharing memory
- rc = C.zmq_msg_init_size(self.zmq_msg, data_len_c)
- _check_rc(rc)
- ffi.buffer(C.zmq_msg_data(self.zmq_msg), data_len_c)[:] = self._buffer
- self._failed_init = False
- return
- # Getting here means that we are doing a true zero-copy Frame,
- # where libzmq and Python are sharing memory.
- # Hook up garbage collection with MessageTracker and zmq_free_fn
- # Event and MessageTracker for monitoring when zmq is done with data:
- if track:
- evt = Event()
- self.tracker_event = evt
- self.tracker = zmq.MessageTracker(evt)
- # create the hint for zmq_free_fn
- # two pointers: the zmq_gc context and a message to be sent to the zmq_gc PULL socket
- # allows libzmq to signal to Python when it is done with Python-owned memory.
- global zmq_gc
- if zmq_gc is None:
- from zmq.utils.garbage import gc as zmq_gc
- # can't use ffi.new because it will be freed at the wrong time!
- hint = ffi.cast("zhint[1]", C.malloc(ffi.sizeof("zhint")))
- hint[0].id = zmq_gc.store(data, self.tracker_event)
- if not zmq_gc._push_mutex:
- zmq_gc._push_mutex = C.mutex_allocate()
- hint[0].mutex = ffi.cast("mutex_t*", zmq_gc._push_mutex)
- hint[0].sock = ffi.cast("void*", zmq_gc._push_socket.underlying)
- # calls zmq_wrap_msg_init_data with the C.free_python_msg callback
- rc = C.zmq_wrap_msg_init_data(
- self.zmq_msg,
- c_data,
- data_len_c,
- hint,
- )
- if rc != 0:
- C.free(hint)
- C.free(self.zmq_msg)
- _check_rc(rc)
- self._failed_init = False
- def __del__(self):
- if not self.closed and not self._failed_init:
- self.close()
- def close(self):
- if self.closed or self._failed_init or self.zmq_msg is None:
- return
- self.closed = True
- rc = C.zmq_msg_close(self.zmq_msg)
- C.free(self.zmq_msg)
- self.zmq_msg = None
- if rc != 0:
- _check_rc(rc)
- def _buffer_from_zmq_msg(self):
- """one-time extract buffer from zmq_msg
- for Frames created by recv
- """
- if self._data is None:
- self._data = ffi.buffer(
- C.zmq_msg_data(self.zmq_msg), C.zmq_msg_size(self.zmq_msg)
- )
- if self._buffer is None:
- self._buffer = memoryview(self._data)
- @property
- def buffer(self):
- if self._buffer is None:
- self._buffer_from_zmq_msg()
- return self._buffer
- @property
- def bytes(self):
- if self._bytes is None:
- self._bytes = self.buffer.tobytes()
- return self._bytes
- def __len__(self):
- return self.buffer.nbytes
- def __eq__(self, other):
- return self.bytes == _content(other)
- @property
- def done(self):
- return self.tracker.done()
- def __buffer__(self, flags):
- return self.buffer
- def __copy__(self):
- """Create a shallow copy of the message.
- This does not copy the contents of the Frame, just the pointer.
- This will increment the 0MQ ref count of the message, but not
- the ref count of the Python object. That is only done once when
- the Python is first turned into a 0MQ message.
- """
- return self.fast_copy()
- def fast_copy(self):
- """Fast shallow copy of the Frame.
- Does not copy underlying data.
- """
- new_msg = Frame()
- # This does not copy the contents, but just increases the ref-count
- # of the zmq_msg by one.
- C.zmq_msg_copy(new_msg.zmq_msg, self.zmq_msg)
- # Copy the ref to underlying data
- new_msg._data = self._data
- new_msg._buffer = self._buffer
- # Frame copies share the tracker and tracker_event
- new_msg.tracker_event = self.tracker_event
- new_msg.tracker = self.tracker
- return new_msg
- Message = Frame
- __all__ = ['Frame', 'Message']
|