udp_jpeg_receiver.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. """
  2. UDP JPEG receiver for the OrangePi framed protocol.
  3. Packet format:
  4. 16-byte header + JPEG payload fragment
  5. Header layout (little-endian):
  6. uint32 magic = 0x57494649 ("WIFI")
  7. uint32 frame_id
  8. uint16 seq
  9. uint16 total
  10. uint16 data_len
  11. uint16 reserved
  12. """
  13. import socket
  14. import struct
  15. import threading
  16. import queue
  17. import cv2
  18. import numpy as np
  19. import time
  20. class UDPJPEGReceiver:
  21. """UDP JPEG receiver for frame/fragment protocol."""
  22. MAGIC = 0x57494649
  23. HEADER_FMT = "<I I H H H H"
  24. HEADER_SIZE = struct.calcsize(HEADER_FMT)
  25. MAX_PACKET_SIZE = 2048
  26. def __init__(self, host="0.0.0.0", port=5000, timeout=0.5, max_frame_buffers=8):
  27. self.host = host
  28. self.port = port
  29. self.timeout = timeout
  30. self.max_frame_buffers = max_frame_buffers
  31. self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  32. try:
  33. self.socket.bind((host, port))
  34. except OSError as err:
  35. if getattr(err, "winerror", None) == 10049:
  36. raise OSError(
  37. f"Cannot bind to {host}:{port}. "
  38. "This address is not a local address. "
  39. "Use '0.0.0.0' or your local IP address."
  40. ) from err
  41. raise
  42. self.socket.settimeout(0.1)
  43. # frame_id -> frame state
  44. self.frame_buffers = {}
  45. self.last_completed_frame_id = -1
  46. self.image_queue = queue.Queue(maxsize=2)
  47. self._latest_jpeg = None
  48. self._latest_lock = threading.Lock()
  49. self._latest_sender = None
  50. self._sender_lock = threading.Lock()
  51. self.running = False
  52. self.receive_thread = None
  53. def _new_frame_state(self, total):
  54. return {
  55. "total": total,
  56. "parts": [None] * total,
  57. "received_count": 0,
  58. "created_at": time.time(),
  59. "last_update_at": time.time(),
  60. }
  61. def _cleanup_expired_frames(self):
  62. now = time.time()
  63. expired = []
  64. for frame_id, state in self.frame_buffers.items():
  65. if (now - state["last_update_at"]) > self.timeout:
  66. expired.append(frame_id)
  67. for frame_id in expired:
  68. del self.frame_buffers[frame_id]
  69. # Bound memory usage for pathological packet loss/OOO conditions.
  70. if len(self.frame_buffers) > self.max_frame_buffers:
  71. oldest = sorted(
  72. self.frame_buffers.items(),
  73. key=lambda item: item[1]["created_at"],
  74. )[: len(self.frame_buffers) - self.max_frame_buffers]
  75. for frame_id, _ in oldest:
  76. del self.frame_buffers[frame_id]
  77. def _receive_loop(self):
  78. while self.running:
  79. try:
  80. data, _addr = self.socket.recvfrom(self.MAX_PACKET_SIZE)
  81. with self._sender_lock:
  82. self._latest_sender = _addr
  83. self._process_packet(data)
  84. self._cleanup_expired_frames()
  85. except socket.timeout:
  86. self._cleanup_expired_frames()
  87. continue
  88. except Exception as exc:
  89. print(f"[UDP] Receive error: {exc}")
  90. break
  91. def _process_packet(self, packet: bytes):
  92. if len(packet) < self.HEADER_SIZE:
  93. return
  94. header = struct.unpack(self.HEADER_FMT, packet[: self.HEADER_SIZE])
  95. magic, frame_id, seq, total, data_len, _reserved = header
  96. if magic != self.MAGIC:
  97. return
  98. if total == 0:
  99. return
  100. if seq >= total:
  101. return
  102. payload_end = self.HEADER_SIZE + data_len
  103. if payload_end > len(packet):
  104. # Malformed/truncated packet
  105. return
  106. payload = packet[self.HEADER_SIZE : payload_end]
  107. # Drop very old out-of-order packets once we already completed newer frames.
  108. if frame_id < self.last_completed_frame_id - 2:
  109. return
  110. state = self.frame_buffers.get(frame_id)
  111. if state is None:
  112. state = self._new_frame_state(total)
  113. self.frame_buffers[frame_id] = state
  114. elif state["total"] != total:
  115. # Inconsistent metadata for same frame_id; reset this frame.
  116. state = self._new_frame_state(total)
  117. self.frame_buffers[frame_id] = state
  118. # Ignore duplicate fragments.
  119. if state["parts"][seq] is None:
  120. state["parts"][seq] = payload
  121. state["received_count"] += 1
  122. state["last_update_at"] = time.time()
  123. if state["received_count"] == state["total"]:
  124. jpeg_data = b"".join(state["parts"])
  125. del self.frame_buffers[frame_id]
  126. self.last_completed_frame_id = max(self.last_completed_frame_id, frame_id)
  127. self._enqueue_if_valid_jpeg(jpeg_data)
  128. def _enqueue_if_valid_jpeg(self, jpeg_data: bytes):
  129. if len(jpeg_data) < 4:
  130. return
  131. if jpeg_data[:2] != b"\xFF\xD8" or jpeg_data[-2:] != b"\xFF\xD9":
  132. return
  133. # decode check: drops corrupted reassemblies quickly
  134. arr = np.frombuffer(jpeg_data, dtype=np.uint8)
  135. test = cv2.imdecode(arr, cv2.IMREAD_COLOR)
  136. if test is None:
  137. return
  138. if self.image_queue.full():
  139. try:
  140. self.image_queue.get_nowait()
  141. except queue.Empty:
  142. pass
  143. self.image_queue.put(jpeg_data)
  144. with self._latest_lock:
  145. self._latest_jpeg = jpeg_data
  146. def start(self):
  147. self.running = True
  148. self.receive_thread = threading.Thread(
  149. target=self._receive_loop,
  150. daemon=True,
  151. name="UDPJPEGReceiver",
  152. )
  153. self.receive_thread.start()
  154. print(f"[UDP] Receiver started on {self.host}:{self.port}")
  155. def stop(self):
  156. self.running = False
  157. if self.receive_thread:
  158. self.receive_thread.join(timeout=1.0)
  159. self.socket.close()
  160. print("[UDP] Receiver stopped")
  161. def get_image(self, timeout=0.1):
  162. try:
  163. jpeg_data = self.image_queue.get(timeout=timeout)
  164. image_array = np.frombuffer(jpeg_data, dtype=np.uint8)
  165. frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
  166. if frame is None:
  167. return None
  168. return frame
  169. except queue.Empty:
  170. return None
  171. except Exception as exc:
  172. print(f"[UDP] Error decoding JPEG: {exc}")
  173. return None
  174. def get_jpeg(self, timeout=0.1):
  175. """直接获取重组后的 JPEG 字节(不解码)。用于转发给 Unity 显示端。"""
  176. try:
  177. jpeg_data = self.image_queue.get(timeout=timeout)
  178. return jpeg_data
  179. except queue.Empty:
  180. return None
  181. except Exception as exc:
  182. print(f"[UDP] Error getting JPEG: {exc}")
  183. return None
  184. def get_latest_jpeg(self):
  185. """获取最近一次重组成功的 JPEG(不出队)。用于“推理 + 转发”共享同一帧数据。"""
  186. with self._latest_lock:
  187. return self._latest_jpeg
  188. def get_latest_sender(self):
  189. """获取最近一次收到包的发送端 (ip, port)。"""
  190. with self._sender_lock:
  191. return self._latest_sender