using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Net; using System.Net.Sockets; using System.Threading; using UnityEngine; using Stopwatch = System.Diagnostics.Stopwatch; namespace LightGlue.Unity.Roma { /// /// 按 orangepizero2w/UDP_PROTOCOL.md 的 FrameHeader(16B) 重组 JPEG 帧。 /// 用于接收 Python->Unity 的转发视频流(或直接接收 OrangePi WiFi 图传)。 /// /// Header(小端): /// magic(uint32)="WIFI"(0x57494649), frame_id(uint32), seq(uint16), total(uint16), data_len(uint16), reserved(uint16) /// public sealed class RomaFrameHeaderJpegReceiver : IDisposable { private const uint Magic = 0x57494649; // "WIFI" little-endian private const int HeaderSize = 16; private static readonly double TickToMs = 1000.0 / Stopwatch.Frequency; private readonly string _bindIp; private readonly int _port; private readonly int _maxQueueSize; private readonly int _maxInFlightFrames; private readonly int _frameTimeoutMs; private UdpClient _client; private Thread _thread; private volatile bool _running; private readonly ConcurrentQueue _jpegQueue = new ConcurrentQueue(); private int _queuedCount; private sealed class Assembly { public readonly int Total; public readonly byte[][] Parts; public int Received; public long LastTick; public Assembly(int total, long nowTick) { Total = total; Parts = new byte[total][]; Received = 0; LastTick = nowTick; } } private readonly Dictionary _assemblies = new Dictionary(); private readonly object _asmLock = new object(); public RomaFrameHeaderJpegReceiver( string bindIp, int port, int maxQueueSize = 2, int maxInFlightFrames = 8, int frameTimeoutMs = 500) { _bindIp = string.IsNullOrWhiteSpace(bindIp) ? "0.0.0.0" : bindIp; _port = port; _maxQueueSize = Mathf.Max(1, maxQueueSize); _maxInFlightFrames = Mathf.Clamp(maxInFlightFrames, 1, 64); _frameTimeoutMs = Mathf.Clamp(frameTimeoutMs, 50, 5000); } public void Start() { if (_running) return; var ip = IPAddress.Parse(_bindIp); _client = new UdpClient(new IPEndPoint(ip, _port)); _client.Client.ReceiveTimeout = 100; _running = true; _thread = new Thread(ReceiveLoop) { IsBackground = true, Name = "RomaFrameHeaderJpegReceiver" }; _thread.Start(); Debug.Log($"[RomaFH] Receiver started on {_bindIp}:{_port}"); } public void Stop() { _running = false; try { _client?.Close(); } catch { /* ignore */ } try { _client?.Dispose(); } catch { /* ignore */ } _client = null; if (_thread != null && _thread.IsAlive) { if (!_thread.Join(1000)) { try { _thread.Interrupt(); } catch { /* ignore */ } } } _thread = null; lock (_asmLock) { _assemblies.Clear(); } while (_jpegQueue.TryDequeue(out _)) { Interlocked.Decrement(ref _queuedCount); } } public bool TryDequeueJpeg(out byte[] jpeg) { if (_jpegQueue.TryDequeue(out jpeg)) { Interlocked.Decrement(ref _queuedCount); return true; } return false; } private void ReceiveLoop() { var remote = new IPEndPoint(IPAddress.Any, 0); while (_running) { try { byte[] data = _client.Receive(ref remote); if (data == null || data.Length < HeaderSize) continue; ProcessPacket(data); CleanupTimeouts(); } catch (SocketException) { CleanupTimeouts(); continue; } catch (ObjectDisposedException) { break; } catch (ThreadInterruptedException) { break; } catch (Exception ex) { Debug.LogWarning($"[RomaFH] Receive error: {ex.Message}"); } } } private void ProcessPacket(byte[] data) { uint magic = ReadU32LE(data, 0); if (magic != Magic) return; uint frameId = ReadU32LE(data, 4); ushort seq = ReadU16LE(data, 8); ushort total = ReadU16LE(data, 10); ushort dataLen = ReadU16LE(data, 12); if (total == 0 || seq >= total) return; if (HeaderSize + dataLen > data.Length) return; long nowTick = Stopwatch.GetTimestamp(); lock (_asmLock) { if (!_assemblies.TryGetValue(frameId, out Assembly asm)) { // 控制在途帧数量,避免网络抖动时占用过多内存 if (_assemblies.Count >= _maxInFlightFrames) { // 丢弃最旧的一帧 uint oldestKey = 0; long oldestTick = long.MaxValue; foreach (var kv in _assemblies) { if (kv.Value.LastTick < oldestTick) { oldestTick = kv.Value.LastTick; oldestKey = kv.Key; } } if (oldestTick != long.MaxValue) _assemblies.Remove(oldestKey); } asm = new Assembly(total, nowTick); _assemblies[frameId] = asm; } // total 变化时:直接重置该帧(避免协议异常导致越界) if (asm.Total != total) { asm = new Assembly(total, nowTick); _assemblies[frameId] = asm; } asm.LastTick = nowTick; if (asm.Parts[seq] == null) { byte[] payload = new byte[dataLen]; Buffer.BlockCopy(data, HeaderSize, payload, 0, dataLen); asm.Parts[seq] = payload; asm.Received++; } if (asm.Received == asm.Total) { byte[] jpeg = JoinParts(asm.Parts); _assemblies.Remove(frameId); EnqueueLatest(jpeg); } } } private void CleanupTimeouts() { long now = Stopwatch.GetTimestamp(); lock (_asmLock) { if (_assemblies.Count == 0) return; List toRemove = null; foreach (var kv in _assemblies) { double elapsedMs = (now - kv.Value.LastTick) * TickToMs; if (elapsedMs > _frameTimeoutMs) { toRemove ??= new List(4); toRemove.Add(kv.Key); } } if (toRemove != null) { foreach (uint k in toRemove) _assemblies.Remove(k); } } } private void EnqueueLatest(byte[] jpeg) { if (jpeg == null || jpeg.Length < 4) return; while (Volatile.Read(ref _queuedCount) >= _maxQueueSize && _jpegQueue.TryDequeue(out _)) Interlocked.Decrement(ref _queuedCount); _jpegQueue.Enqueue(jpeg); Interlocked.Increment(ref _queuedCount); } private static byte[] JoinParts(byte[][] parts) { int totalLen = 0; for (int i = 0; i < parts.Length; i++) totalLen += parts[i]?.Length ?? 0; byte[] joined = new byte[totalLen]; int offset = 0; for (int i = 0; i < parts.Length; i++) { byte[] p = parts[i]; if (p == null || p.Length == 0) continue; Buffer.BlockCopy(p, 0, joined, offset, p.Length); offset += p.Length; } return joined; } private static ushort ReadU16LE(byte[] d, int o) { return (ushort)(d[o] | (d[o + 1] << 8)); } private static uint ReadU32LE(byte[] d, int o) { return (uint)(d[o] | (d[o + 1] << 8) | (d[o + 2] << 16) | (d[o + 3] << 24)); } public void Dispose() { Stop(); } } }