| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221 |
- """
- UDP JPEG receiver for the OrangePi framed protocol.
- Packet format:
- 16-byte header + JPEG payload fragment
- Header layout (little-endian):
- uint32 magic = 0x57494649 ("WIFI")
- uint32 frame_id
- uint16 seq
- uint16 total
- uint16 data_len
- uint16 reserved
- """
- import socket
- import struct
- import threading
- import queue
- import cv2
- import numpy as np
- import time
- class UDPJPEGReceiver:
- """UDP JPEG receiver for frame/fragment protocol."""
- MAGIC = 0x57494649
- HEADER_FMT = "<I I H H H H"
- HEADER_SIZE = struct.calcsize(HEADER_FMT)
- MAX_PACKET_SIZE = 2048
- def __init__(self, host="0.0.0.0", port=5000, timeout=0.5, max_frame_buffers=8):
- self.host = host
- self.port = port
- self.timeout = timeout
- self.max_frame_buffers = max_frame_buffers
- self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- try:
- self.socket.bind((host, port))
- except OSError as err:
- if getattr(err, "winerror", None) == 10049:
- raise OSError(
- f"Cannot bind to {host}:{port}. "
- "This address is not a local address. "
- "Use '0.0.0.0' or your local IP address."
- ) from err
- raise
- self.socket.settimeout(0.1)
- # frame_id -> frame state
- self.frame_buffers = {}
- self.last_completed_frame_id = -1
- self.image_queue = queue.Queue(maxsize=2)
- self._latest_jpeg = None
- self._latest_lock = threading.Lock()
- self._latest_sender = None
- self._sender_lock = threading.Lock()
- self.running = False
- self.receive_thread = None
- def _new_frame_state(self, total):
- return {
- "total": total,
- "parts": [None] * total,
- "received_count": 0,
- "created_at": time.time(),
- "last_update_at": time.time(),
- }
- def _cleanup_expired_frames(self):
- now = time.time()
- expired = []
- for frame_id, state in self.frame_buffers.items():
- if (now - state["last_update_at"]) > self.timeout:
- expired.append(frame_id)
- for frame_id in expired:
- del self.frame_buffers[frame_id]
- # Bound memory usage for pathological packet loss/OOO conditions.
- if len(self.frame_buffers) > self.max_frame_buffers:
- oldest = sorted(
- self.frame_buffers.items(),
- key=lambda item: item[1]["created_at"],
- )[: len(self.frame_buffers) - self.max_frame_buffers]
- for frame_id, _ in oldest:
- del self.frame_buffers[frame_id]
- def _receive_loop(self):
- while self.running:
- try:
- data, _addr = self.socket.recvfrom(self.MAX_PACKET_SIZE)
- with self._sender_lock:
- self._latest_sender = _addr
- self._process_packet(data)
- self._cleanup_expired_frames()
- except socket.timeout:
- self._cleanup_expired_frames()
- continue
- except Exception as exc:
- print(f"[UDP] Receive error: {exc}")
- break
- def _process_packet(self, packet: bytes):
- if len(packet) < self.HEADER_SIZE:
- return
- header = struct.unpack(self.HEADER_FMT, packet[: self.HEADER_SIZE])
- magic, frame_id, seq, total, data_len, _reserved = header
- if magic != self.MAGIC:
- return
- if total == 0:
- return
- if seq >= total:
- return
- payload_end = self.HEADER_SIZE + data_len
- if payload_end > len(packet):
- # Malformed/truncated packet
- return
- payload = packet[self.HEADER_SIZE : payload_end]
- # Drop very old out-of-order packets once we already completed newer frames.
- if frame_id < self.last_completed_frame_id - 2:
- return
- state = self.frame_buffers.get(frame_id)
- if state is None:
- state = self._new_frame_state(total)
- self.frame_buffers[frame_id] = state
- elif state["total"] != total:
- # Inconsistent metadata for same frame_id; reset this frame.
- state = self._new_frame_state(total)
- self.frame_buffers[frame_id] = state
- # Ignore duplicate fragments.
- if state["parts"][seq] is None:
- state["parts"][seq] = payload
- state["received_count"] += 1
- state["last_update_at"] = time.time()
- if state["received_count"] == state["total"]:
- jpeg_data = b"".join(state["parts"])
- del self.frame_buffers[frame_id]
- self.last_completed_frame_id = max(self.last_completed_frame_id, frame_id)
- self._enqueue_if_valid_jpeg(jpeg_data)
- def _enqueue_if_valid_jpeg(self, jpeg_data: bytes):
- if len(jpeg_data) < 4:
- return
- if jpeg_data[:2] != b"\xFF\xD8" or jpeg_data[-2:] != b"\xFF\xD9":
- return
- # decode check: drops corrupted reassemblies quickly
- arr = np.frombuffer(jpeg_data, dtype=np.uint8)
- test = cv2.imdecode(arr, cv2.IMREAD_COLOR)
- if test is None:
- return
- if self.image_queue.full():
- try:
- self.image_queue.get_nowait()
- except queue.Empty:
- pass
- self.image_queue.put(jpeg_data)
- with self._latest_lock:
- self._latest_jpeg = jpeg_data
- def start(self):
- self.running = True
- self.receive_thread = threading.Thread(
- target=self._receive_loop,
- daemon=True,
- name="UDPJPEGReceiver",
- )
- self.receive_thread.start()
- print(f"[UDP] Receiver started on {self.host}:{self.port}")
- def stop(self):
- self.running = False
- if self.receive_thread:
- self.receive_thread.join(timeout=1.0)
- self.socket.close()
- print("[UDP] Receiver stopped")
- def get_image(self, timeout=0.1):
- try:
- jpeg_data = self.image_queue.get(timeout=timeout)
- image_array = np.frombuffer(jpeg_data, dtype=np.uint8)
- frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
- if frame is None:
- return None
- return frame
- except queue.Empty:
- return None
- except Exception as exc:
- print(f"[UDP] Error decoding JPEG: {exc}")
- return None
- def get_jpeg(self, timeout=0.1):
- """直接获取重组后的 JPEG 字节(不解码)。用于转发给 Unity 显示端。"""
- try:
- jpeg_data = self.image_queue.get(timeout=timeout)
- return jpeg_data
- except queue.Empty:
- return None
- except Exception as exc:
- print(f"[UDP] Error getting JPEG: {exc}")
- return None
- def get_latest_jpeg(self):
- """获取最近一次重组成功的 JPEG(不出队)。用于“推理 + 转发”共享同一帧数据。"""
- with self._latest_lock:
- return self._latest_jpeg
- def get_latest_sender(self):
- """获取最近一次收到包的发送端 (ip, port)。"""
- with self._sender_lock:
- return self._latest_sender
|