| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278 |
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Net;
- using System.Net.Sockets;
- using System.Threading;
- using UnityEngine;
- using Stopwatch = System.Diagnostics.Stopwatch;
- namespace LightGlue.Unity.Roma
- {
- /// <summary>
- /// 按 orangepizero2w/UDP_PROTOCOL.md 的 FrameHeader(16B) 重组 JPEG 帧。
- /// 用于接收 Python->Unity 的转发视频流(或直接接收 OrangePi WiFi 图传)。
- ///
- /// Header(小端):
- /// magic(uint32)="WIFI"(0x57494649), frame_id(uint32), seq(uint16), total(uint16), data_len(uint16), reserved(uint16)
- /// </summary>
- public sealed class RomaFrameHeaderJpegReceiver : IDisposable
- {
- private const uint Magic = 0x57494649; // "WIFI" little-endian
- private const int HeaderSize = 16;
- private static readonly double TickToMs = 1000.0 / Stopwatch.Frequency;
- private readonly string _bindIp;
- private readonly int _port;
- private readonly int _maxQueueSize;
- private readonly int _maxInFlightFrames;
- private readonly int _frameTimeoutMs;
- private UdpClient _client;
- private Thread _thread;
- private volatile bool _running;
- private readonly ConcurrentQueue<byte[]> _jpegQueue = new ConcurrentQueue<byte[]>();
- private int _queuedCount;
- private sealed class Assembly
- {
- public readonly int Total;
- public readonly byte[][] Parts;
- public int Received;
- public long LastTick;
- public Assembly(int total, long nowTick)
- {
- Total = total;
- Parts = new byte[total][];
- Received = 0;
- LastTick = nowTick;
- }
- }
- private readonly Dictionary<uint, Assembly> _assemblies = new Dictionary<uint, Assembly>();
- private readonly object _asmLock = new object();
- public RomaFrameHeaderJpegReceiver(
- string bindIp,
- int port,
- int maxQueueSize = 2,
- int maxInFlightFrames = 8,
- int frameTimeoutMs = 500)
- {
- _bindIp = string.IsNullOrWhiteSpace(bindIp) ? "0.0.0.0" : bindIp;
- _port = port;
- _maxQueueSize = Mathf.Max(1, maxQueueSize);
- _maxInFlightFrames = Mathf.Clamp(maxInFlightFrames, 1, 64);
- _frameTimeoutMs = Mathf.Clamp(frameTimeoutMs, 50, 5000);
- }
- public void Start()
- {
- if (_running) return;
- var ip = IPAddress.Parse(_bindIp);
- _client = new UdpClient(new IPEndPoint(ip, _port));
- _client.Client.ReceiveTimeout = 100;
- _running = true;
- _thread = new Thread(ReceiveLoop)
- {
- IsBackground = true,
- Name = "RomaFrameHeaderJpegReceiver"
- };
- _thread.Start();
- Debug.Log($"[RomaFH] Receiver started on {_bindIp}:{_port}");
- }
- public void Stop()
- {
- _running = false;
- try { _client?.Close(); } catch { /* ignore */ }
- try { _client?.Dispose(); } catch { /* ignore */ }
- _client = null;
- if (_thread != null && _thread.IsAlive)
- {
- if (!_thread.Join(1000))
- {
- try { _thread.Interrupt(); } catch { /* ignore */ }
- }
- }
- _thread = null;
- lock (_asmLock) { _assemblies.Clear(); }
- while (_jpegQueue.TryDequeue(out _)) { Interlocked.Decrement(ref _queuedCount); }
- }
- public bool TryDequeueJpeg(out byte[] jpeg)
- {
- if (_jpegQueue.TryDequeue(out jpeg))
- {
- Interlocked.Decrement(ref _queuedCount);
- return true;
- }
- return false;
- }
- private void ReceiveLoop()
- {
- var remote = new IPEndPoint(IPAddress.Any, 0);
- while (_running)
- {
- try
- {
- byte[] data = _client.Receive(ref remote);
- if (data == null || data.Length < HeaderSize) continue;
- ProcessPacket(data);
- CleanupTimeouts();
- }
- catch (SocketException)
- {
- CleanupTimeouts();
- continue;
- }
- catch (ObjectDisposedException) { break; }
- catch (ThreadInterruptedException) { break; }
- catch (Exception ex)
- {
- Debug.LogWarning($"[RomaFH] Receive error: {ex.Message}");
- }
- }
- }
- private void ProcessPacket(byte[] data)
- {
- uint magic = ReadU32LE(data, 0);
- if (magic != Magic) return;
- uint frameId = ReadU32LE(data, 4);
- ushort seq = ReadU16LE(data, 8);
- ushort total = ReadU16LE(data, 10);
- ushort dataLen = ReadU16LE(data, 12);
- if (total == 0 || seq >= total) return;
- if (HeaderSize + dataLen > data.Length) return;
- long nowTick = Stopwatch.GetTimestamp();
- lock (_asmLock)
- {
- if (!_assemblies.TryGetValue(frameId, out Assembly asm))
- {
- // 控制在途帧数量,避免网络抖动时占用过多内存
- if (_assemblies.Count >= _maxInFlightFrames)
- {
- // 丢弃最旧的一帧
- uint oldestKey = 0;
- long oldestTick = long.MaxValue;
- foreach (var kv in _assemblies)
- {
- if (kv.Value.LastTick < oldestTick)
- {
- oldestTick = kv.Value.LastTick;
- oldestKey = kv.Key;
- }
- }
- if (oldestTick != long.MaxValue)
- _assemblies.Remove(oldestKey);
- }
- asm = new Assembly(total, nowTick);
- _assemblies[frameId] = asm;
- }
- // total 变化时:直接重置该帧(避免协议异常导致越界)
- if (asm.Total != total)
- {
- asm = new Assembly(total, nowTick);
- _assemblies[frameId] = asm;
- }
- asm.LastTick = nowTick;
- if (asm.Parts[seq] == null)
- {
- byte[] payload = new byte[dataLen];
- Buffer.BlockCopy(data, HeaderSize, payload, 0, dataLen);
- asm.Parts[seq] = payload;
- asm.Received++;
- }
- if (asm.Received == asm.Total)
- {
- byte[] jpeg = JoinParts(asm.Parts);
- _assemblies.Remove(frameId);
- EnqueueLatest(jpeg);
- }
- }
- }
- private void CleanupTimeouts()
- {
- long now = Stopwatch.GetTimestamp();
- lock (_asmLock)
- {
- if (_assemblies.Count == 0) return;
- List<uint> toRemove = null;
- foreach (var kv in _assemblies)
- {
- double elapsedMs = (now - kv.Value.LastTick) * TickToMs;
- if (elapsedMs > _frameTimeoutMs)
- {
- toRemove ??= new List<uint>(4);
- toRemove.Add(kv.Key);
- }
- }
- if (toRemove != null)
- {
- foreach (uint k in toRemove) _assemblies.Remove(k);
- }
- }
- }
- private void EnqueueLatest(byte[] jpeg)
- {
- if (jpeg == null || jpeg.Length < 4) return;
- while (Volatile.Read(ref _queuedCount) >= _maxQueueSize && _jpegQueue.TryDequeue(out _))
- Interlocked.Decrement(ref _queuedCount);
- _jpegQueue.Enqueue(jpeg);
- Interlocked.Increment(ref _queuedCount);
- }
- private static byte[] JoinParts(byte[][] parts)
- {
- int totalLen = 0;
- for (int i = 0; i < parts.Length; i++)
- totalLen += parts[i]?.Length ?? 0;
- byte[] joined = new byte[totalLen];
- int offset = 0;
- for (int i = 0; i < parts.Length; i++)
- {
- byte[] p = parts[i];
- if (p == null || p.Length == 0) continue;
- Buffer.BlockCopy(p, 0, joined, offset, p.Length);
- offset += p.Length;
- }
- return joined;
- }
- private static ushort ReadU16LE(byte[] d, int o)
- {
- return (ushort)(d[o] | (d[o + 1] << 8));
- }
- private static uint ReadU32LE(byte[] d, int o)
- {
- return (uint)(d[o] | (d[o + 1] << 8) | (d[o + 2] << 16) | (d[o + 3] << 24));
- }
- public void Dispose()
- {
- Stop();
- }
- }
- }
|