message.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. """Dummy Frame object"""
  2. # Copyright (C) PyZMQ Developers
  3. # Distributed under the terms of the Modified BSD License.
  4. import errno
  5. from threading import Event
  6. import zmq
  7. import zmq.error
  8. from zmq.constants import ETERM
  9. from ._cffi import ffi
  10. from ._cffi import lib as C
  11. zmq_gc = None
  12. try:
  13. from __pypy__.bufferable import bufferable as maybe_bufferable
  14. except ImportError:
  15. maybe_bufferable = object
  16. def _content(obj):
  17. """Return content of obj as bytes"""
  18. if type(obj) is bytes:
  19. return obj
  20. if not isinstance(obj, memoryview):
  21. obj = memoryview(obj)
  22. return obj.tobytes()
  23. def _check_rc(rc):
  24. err = C.zmq_errno()
  25. if rc == -1:
  26. if err == errno.EINTR:
  27. raise zmq.error.InterrruptedSystemCall(err)
  28. elif err == errno.EAGAIN:
  29. raise zmq.error.Again(errno)
  30. elif err == ETERM:
  31. raise zmq.error.ContextTerminated(err)
  32. else:
  33. raise zmq.error.ZMQError(err)
  34. return 0
  35. class Frame(maybe_bufferable):
  36. _data = None
  37. tracker = None
  38. closed = False
  39. more = False
  40. _buffer = None
  41. _bytes = None
  42. _failed_init = False
  43. tracker_event = None
  44. zmq_msg = None
  45. def __init__(self, data=None, track=False, copy=None, copy_threshold=None):
  46. self._failed_init = True
  47. self.zmq_msg = ffi.cast('zmq_msg_t[1]', C.malloc(ffi.sizeof("zmq_msg_t")))
  48. # self.tracker should start finished
  49. # except in the case where we are sharing memory with libzmq
  50. if track:
  51. self.tracker = zmq._FINISHED_TRACKER
  52. if isinstance(data, str):
  53. raise TypeError(
  54. "Unicode strings are not allowed. Only: bytes, buffer interfaces."
  55. )
  56. if data is None:
  57. rc = C.zmq_msg_init(self.zmq_msg)
  58. _check_rc(rc)
  59. self._failed_init = False
  60. return
  61. self._data = data
  62. if type(data) is bytes:
  63. # avoid unnecessary copy on .bytes access
  64. self._bytes = data
  65. self._buffer = memoryview(data)
  66. if not self._buffer.contiguous:
  67. raise BufferError("memoryview: underlying buffer is not contiguous")
  68. # from_buffer silently copies if memory is not contiguous
  69. c_data = ffi.from_buffer(self._buffer)
  70. data_len_c = self._buffer.nbytes
  71. if copy is None:
  72. if copy_threshold and data_len_c < copy_threshold:
  73. copy = True
  74. else:
  75. copy = False
  76. if copy:
  77. # copy message data instead of sharing memory
  78. rc = C.zmq_msg_init_size(self.zmq_msg, data_len_c)
  79. _check_rc(rc)
  80. ffi.buffer(C.zmq_msg_data(self.zmq_msg), data_len_c)[:] = self._buffer
  81. self._failed_init = False
  82. return
  83. # Getting here means that we are doing a true zero-copy Frame,
  84. # where libzmq and Python are sharing memory.
  85. # Hook up garbage collection with MessageTracker and zmq_free_fn
  86. # Event and MessageTracker for monitoring when zmq is done with data:
  87. if track:
  88. evt = Event()
  89. self.tracker_event = evt
  90. self.tracker = zmq.MessageTracker(evt)
  91. # create the hint for zmq_free_fn
  92. # two pointers: the zmq_gc context and a message to be sent to the zmq_gc PULL socket
  93. # allows libzmq to signal to Python when it is done with Python-owned memory.
  94. global zmq_gc
  95. if zmq_gc is None:
  96. from zmq.utils.garbage import gc as zmq_gc
  97. # can't use ffi.new because it will be freed at the wrong time!
  98. hint = ffi.cast("zhint[1]", C.malloc(ffi.sizeof("zhint")))
  99. hint[0].id = zmq_gc.store(data, self.tracker_event)
  100. if not zmq_gc._push_mutex:
  101. zmq_gc._push_mutex = C.mutex_allocate()
  102. hint[0].mutex = ffi.cast("mutex_t*", zmq_gc._push_mutex)
  103. hint[0].sock = ffi.cast("void*", zmq_gc._push_socket.underlying)
  104. # calls zmq_wrap_msg_init_data with the C.free_python_msg callback
  105. rc = C.zmq_wrap_msg_init_data(
  106. self.zmq_msg,
  107. c_data,
  108. data_len_c,
  109. hint,
  110. )
  111. if rc != 0:
  112. C.free(hint)
  113. C.free(self.zmq_msg)
  114. _check_rc(rc)
  115. self._failed_init = False
  116. def __del__(self):
  117. if not self.closed and not self._failed_init:
  118. self.close()
  119. def close(self):
  120. if self.closed or self._failed_init or self.zmq_msg is None:
  121. return
  122. self.closed = True
  123. rc = C.zmq_msg_close(self.zmq_msg)
  124. C.free(self.zmq_msg)
  125. self.zmq_msg = None
  126. if rc != 0:
  127. _check_rc(rc)
  128. def _buffer_from_zmq_msg(self):
  129. """one-time extract buffer from zmq_msg
  130. for Frames created by recv
  131. """
  132. if self._data is None:
  133. self._data = ffi.buffer(
  134. C.zmq_msg_data(self.zmq_msg), C.zmq_msg_size(self.zmq_msg)
  135. )
  136. if self._buffer is None:
  137. self._buffer = memoryview(self._data)
  138. @property
  139. def buffer(self):
  140. if self._buffer is None:
  141. self._buffer_from_zmq_msg()
  142. return self._buffer
  143. @property
  144. def bytes(self):
  145. if self._bytes is None:
  146. self._bytes = self.buffer.tobytes()
  147. return self._bytes
  148. def __len__(self):
  149. return self.buffer.nbytes
  150. def __eq__(self, other):
  151. return self.bytes == _content(other)
  152. @property
  153. def done(self):
  154. return self.tracker.done()
  155. def __buffer__(self, flags):
  156. return self.buffer
  157. def __copy__(self):
  158. """Create a shallow copy of the message.
  159. This does not copy the contents of the Frame, just the pointer.
  160. This will increment the 0MQ ref count of the message, but not
  161. the ref count of the Python object. That is only done once when
  162. the Python is first turned into a 0MQ message.
  163. """
  164. return self.fast_copy()
  165. def fast_copy(self):
  166. """Fast shallow copy of the Frame.
  167. Does not copy underlying data.
  168. """
  169. new_msg = Frame()
  170. # This does not copy the contents, but just increases the ref-count
  171. # of the zmq_msg by one.
  172. C.zmq_msg_copy(new_msg.zmq_msg, self.zmq_msg)
  173. # Copy the ref to underlying data
  174. new_msg._data = self._data
  175. new_msg._buffer = self._buffer
  176. # Frame copies share the tracker and tracker_event
  177. new_msg.tracker_event = self.tracker_event
  178. new_msg.tracker = self.tracker
  179. return new_msg
  180. Message = Frame
  181. __all__ = ['Frame', 'Message']