using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using UnityEngine;
using Stopwatch = System.Diagnostics.Stopwatch;
namespace LightGlue.Unity.Roma
{
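/// <summary>
/// Reassembles JPEG frames from UDP fragments that carry the 16-byte FrameHeader described in
/// orangepizero2w/UDP_PROTOCOL.md.
/// Used to receive the video stream forwarded from Python to Unity (or to receive the OrangePi
/// WiFi video link directly).
/// </summary>
/// <remarks>
/// Header (little-endian):
/// magic(uint32)="WIFI"(0x57494649), frame_id(uint32), seq(uint16), total(uint16), data_len(uint16), reserved(uint16)
/// </remarks>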
public sealed class RomaFrameHeaderJpegReceiver : IDisposable
{
// Header magic: ASCII 'W','I','F','I' packed as 0x57494649. It is compared against the value
// ReadU32LE decodes from offset 0, so on the wire the bytes appear as 49 46 49 57.
private const uint Magic = 0x57494649;
// Size in bytes of the fixed header that precedes every fragment payload.
private const int HeaderSize = 16;
// Multiplier that converts Stopwatch ticks to milliseconds.
private static readonly double TickToMs = 1000.0 / Stopwatch.Frequency;
// Local address and port the UDP socket is bound to in Start().
private readonly string _bindIp;
private readonly int _port;
// Upper bound on completed frames kept in _jpegQueue; EnqueueLatest drops the oldest beyond it.
private readonly int _maxQueueSize;
// Upper bound on partially received frames tracked at once.
private readonly int _maxInFlightFrames;
// Age (ms since the last fragment) after which a partial frame is discarded.
private readonly int _frameTimeoutMs;
private UdpClient _client;
private Thread _thread;
// Written by Start()/Stop() and polled by the receive thread.
private volatile bool _running;
// Completed JPEG frames waiting to be consumed through TryDequeueJpeg.
private readonly ConcurrentQueue<byte[]> _jpegQueue = new ConcurrentQueue<byte[]>();
// Item count of _jpegQueue, maintained separately with Interlocked operations.
private int _queuedCount;
/// <summary>
/// Bookkeeping for one frame that is still being reassembled from its fragments.
/// </summary>
private sealed class Assembly
{
    /// <summary>Number of fragments the frame consists of.</summary>
    public readonly int Total;

    /// <summary>One slot per fragment; a slot stays null until that fragment is stored.</summary>
    public readonly byte[][] Parts;

    /// <summary>Count of slots in <see cref="Parts"/> that have been filled (starts at 0).</summary>
    public int Received;

    /// <summary>Stopwatch timestamp of the most recent activity on this frame.</summary>
    public long LastTick;

    /// <summary>Creates an empty assembly with <paramref name="total"/> fragment slots.</summary>
    /// <param name="total">Number of fragments expected.</param>
    /// <param name="nowTick">Stopwatch timestamp recorded as the initial activity time.</param>
    public Assembly(int total, long nowTick)
    {
        Total = total;
        LastTick = nowTick;
        Parts = new byte[total][];
    }
}
// Partially received frames keyed by the frame_id header field. Guarded by _asmLock.
private readonly Dictionary<uint, Assembly> _assemblies = new Dictionary<uint, Assembly>();
// Private lock object protecting _assemblies.
private readonly object _asmLock = new object();
/// <summary>
/// Stores the receiver configuration. No socket is opened until <see cref="Start"/> is called.
/// </summary>
/// <param name="bindIp">Local address to bind; null or blank falls back to "0.0.0.0".</param>
/// <param name="port">UDP port to bind.</param>
/// <param name="maxQueueSize">Completed frames kept for the consumer; raised to at least 1.</param>
/// <param name="maxInFlightFrames">Partial frames tracked at once; clamped to 1..64.</param>
/// <param name="frameTimeoutMs">Partial-frame timeout in milliseconds; clamped to 50..5000.</param>
public RomaFrameHeaderJpegReceiver(
    string bindIp,
    int port,
    int maxQueueSize = 2,
    int maxInFlightFrames = 8,
    int frameTimeoutMs = 500)
{
    bool addressMissing = string.IsNullOrWhiteSpace(bindIp);

    _port = port;
    _bindIp = addressMissing ? "0.0.0.0" : bindIp;

    // Sanitize the tuning knobs so later code never sees zero or negative limits.
    _frameTimeoutMs = Mathf.Clamp(frameTimeoutMs, 50, 5000);
    _maxInFlightFrames = Mathf.Clamp(maxInFlightFrames, 1, 64);
    _maxQueueSize = Mathf.Max(1, maxQueueSize);
}
/// <summary>
/// Binds the UDP socket and launches the background receive thread.
/// Does nothing when the receiver is already running.
/// </summary>
public void Start()
{
    if (_running)
    {
        return;
    }

    IPAddress address = IPAddress.Parse(_bindIp);
    var endpoint = new IPEndPoint(address, _port);
    _client = new UdpClient(endpoint);
    // Receive() gives up after 100 ms, which lets the loop re-check _running and sweep stale frames.
    _client.Client.ReceiveTimeout = 100;

    _running = true;

    var worker = new Thread(ReceiveLoop);
    worker.IsBackground = true;
    worker.Name = "RomaFrameHeaderJpegReceiver";
    _thread = worker;
    worker.Start();

    Debug.Log($"[RomaFH] Receiver started on {_bindIp}:{_port}");
}
/// <summary>
/// Stops the receive thread, releases the socket and discards all buffered data
/// (partial frames and queued JPEGs).
/// </summary>
/// <remarks>
/// Can be called when the receiver was never started or is already stopped; every step
/// is null-guarded.
/// </remarks>
public void Stop()
{
    _running = false;

    // Closing the socket unblocks a pending Receive() on the worker thread. The _client field
    // is deliberately left assigned until the worker has been joined: clearing it first would
    // let the worker dereference a null _client and log a spurious warning during shutdown.
    UdpClient client = _client;
    try { client?.Close(); } catch { /* ignore */ }

    Thread worker = _thread;
    if (worker != null && worker.IsAlive)
    {
        // Give the loop up to one second to notice the shutdown, then interrupt it.
        if (!worker.Join(1000))
        {
            try { worker.Interrupt(); } catch { /* ignore */ }
        }
    }
    _thread = null;

    try { client?.Dispose(); } catch { /* ignore */ }
    _client = null;

    lock (_asmLock) { _assemblies.Clear(); }

    // Drain the output queue, keeping the separate counter in step.
    while (_jpegQueue.TryDequeue(out _)) { Interlocked.Decrement(ref _queuedCount); }
}
/// <summary>
/// Removes the oldest completed JPEG frame from the queue, if there is one.
/// </summary>
/// <param name="jpeg">Receives the frame bytes on success; null when the queue is empty.</param>
/// <returns>true when a frame was dequeued; otherwise false.</returns>
public bool TryDequeueJpeg(out byte[] jpeg)
{
    bool dequeued = _jpegQueue.TryDequeue(out jpeg);
    if (dequeued)
    {
        // Keep the separate item counter in step with the queue.
        Interlocked.Decrement(ref _queuedCount);
    }
    return dequeued;
}
/// <summary>
/// Body of the background thread: receives datagrams until <c>_running</c> is cleared,
/// feeding each one to <see cref="ProcessPacket"/> and sweeping timed-out partial frames.
/// </summary>
private void ReceiveLoop()
{
    // Snapshot the socket once. Stop() clears the _client field from another thread, so
    // re-reading the field on every iteration could dereference null mid-shutdown.
    UdpClient client = _client;
    if (client == null) return;

    var remote = new IPEndPoint(IPAddress.Any, 0);
    while (_running)
    {
        try
        {
            byte[] data = client.Receive(ref remote);
            // Datagrams too short to hold a header are ignored.
            if (data == null || data.Length < HeaderSize) continue;
            ProcessPacket(data);
            CleanupTimeouts();
        }
        catch (SocketException)
        {
            // Includes the receive timeout configured in Start(); use the idle moment to
            // drop stale partial frames, then re-check _running.
            CleanupTimeouts();
            continue;
        }
        catch (ObjectDisposedException) { break; }
        catch (ThreadInterruptedException) { break; }
        catch (Exception ex)
        {
            // Errors raised while shutting down are expected and not worth a warning.
            if (!_running) break;
            Debug.LogWarning($"[RomaFH] Receive error: {ex.Message}");
        }
    }
}
/// <summary>
/// Validates one datagram's header and stores its payload in the matching frame assembly.
/// When the last missing fragment arrives, the frame is joined and handed to
/// <see cref="EnqueueLatest"/>.
/// </summary>
/// <param name="data">Raw datagram; the receive loop only passes buffers of at least HeaderSize bytes.</param>
private void ProcessPacket(byte[] data)
{
    if (ReadU32LE(data, 0) != Magic)
    {
        return;
    }

    uint id = ReadU32LE(data, 4);
    ushort fragmentIndex = ReadU16LE(data, 8);
    ushort fragmentCount = ReadU16LE(data, 10);
    ushort payloadLength = ReadU16LE(data, 12);

    // Reject headers whose fragment index cannot exist.
    if (fragmentCount == 0 || fragmentIndex >= fragmentCount)
    {
        return;
    }

    // Reject headers that claim more payload than the datagram carries.
    if (data.Length < HeaderSize + payloadLength)
    {
        return;
    }

    long stamp = Stopwatch.GetTimestamp();
    lock (_asmLock)
    {
        bool known = _assemblies.TryGetValue(id, out Assembly frame);

        // Cap the number of in-flight frames so network jitter cannot grow memory use unbounded.
        if (!known && _assemblies.Count >= _maxInFlightFrames)
        {
            // Evict the frame with the oldest activity timestamp.
            uint victim = 0;
            long victimTick = long.MaxValue;
            foreach (var entry in _assemblies)
            {
                long tick = entry.Value.LastTick;
                if (tick >= victimTick)
                {
                    continue;
                }
                victimTick = tick;
                victim = entry.Key;
            }

            if (victimTick != long.MaxValue)
            {
                _assemblies.Remove(victim);
            }
        }

        // Start a fresh assembly for an unknown frame, and also when the advertised fragment
        // count changed mid-frame (a protocol anomaly that would otherwise index out of range).
        if (!known || frame.Total != fragmentCount)
        {
            frame = new Assembly(fragmentCount, stamp);
            _assemblies[id] = frame;
        }

        frame.LastTick = stamp;

        // Only the first copy of a fragment is stored; later copies leave the slot untouched.
        if (frame.Parts[fragmentIndex] == null)
        {
            var payload = new byte[payloadLength];
            Array.Copy(data, HeaderSize, payload, 0, payloadLength);
            frame.Parts[fragmentIndex] = payload;
            frame.Received++;
        }

        if (frame.Received != frame.Total)
        {
            return;
        }

        byte[] jpeg = JoinParts(frame.Parts);
        _assemblies.Remove(id);
        EnqueueLatest(jpeg);
    }
}
/// <summary>
/// Drops every partial frame whose last activity is older than <c>_frameTimeoutMs</c>.
/// </summary>
private void CleanupTimeouts()
{
    long now = Stopwatch.GetTimestamp();
    lock (_asmLock)
    {
        if (_assemblies.Count == 0) return;

        // Collect the expired keys first: the dictionary must not be modified while it is
        // being enumerated. The list is only allocated when something actually expired.
        List<uint> toRemove = null;
        foreach (var kv in _assemblies)
        {
            double elapsedMs = (now - kv.Value.LastTick) * TickToMs;
            if (elapsedMs > _frameTimeoutMs)
            {
                toRemove ??= new List<uint>(4);
                toRemove.Add(kv.Key);
            }
        }

        if (toRemove != null)
        {
            foreach (uint k in toRemove) _assemblies.Remove(k);
        }
    }
}
/// <summary>
/// Queues a completed frame for the consumer, first dropping the oldest queued frames while
/// the queue is at its size limit.
/// </summary>
/// <param name="jpeg">Joined frame bytes; null or shorter than 4 bytes is ignored.</param>
private void EnqueueLatest(byte[] jpeg)
{
    if (jpeg == null || jpeg.Length < 4)
    {
        return;
    }

    // Make room: discard from the front until the counter is below the limit or the queue is empty.
    while (Volatile.Read(ref _queuedCount) >= _maxQueueSize)
    {
        if (!_jpegQueue.TryDequeue(out _))
        {
            break;
        }
        Interlocked.Decrement(ref _queuedCount);
    }

    _jpegQueue.Enqueue(jpeg);
    Interlocked.Increment(ref _queuedCount);
}
/// <summary>
/// Concatenates the fragment payloads in index order into a single buffer.
/// Null and empty entries contribute nothing.
/// </summary>
/// <param name="parts">Fragment payloads, indexed by sequence number.</param>
/// <returns>A new array holding all payload bytes back to back.</returns>
private static byte[] JoinParts(byte[][] parts)
{
    // First pass: size the output buffer.
    int combinedLength = 0;
    foreach (byte[] chunk in parts)
    {
        if (chunk != null)
        {
            combinedLength += chunk.Length;
        }
    }

    // Second pass: copy each non-empty chunk behind the previous one.
    var result = new byte[combinedLength];
    int cursor = 0;
    foreach (byte[] chunk in parts)
    {
        if (chunk == null || chunk.Length == 0)
        {
            continue;
        }
        chunk.CopyTo(result, cursor);
        cursor += chunk.Length;
    }

    return result;
}
/// <summary>Reads an unsigned 16-bit little-endian value.</summary>
/// <param name="d">Source buffer.</param>
/// <param name="o">Offset of the least significant byte.</param>
/// <returns>The value formed by <c>d[o]</c> (low byte) and <c>d[o + 1]</c> (high byte).</returns>
private static ushort ReadU16LE(byte[] d, int o)
{
    int low = d[o];
    int high = d[o + 1];
    return (ushort)((high << 8) | low);
}
/// <summary>Reads an unsigned 32-bit little-endian value.</summary>
/// <param name="d">Source buffer.</param>
/// <param name="o">Offset of the least significant byte.</param>
/// <returns>The value formed by <c>d[o]</c> (lowest byte) through <c>d[o + 3]</c> (highest byte).</returns>
private static uint ReadU32LE(byte[] d, int o)
{
    // Assemble in uint arithmetic: shifting a byte >= 0x80 left by 24 as a signed int gives a
    // negative value, and casting that to uint throws when compiled with checked arithmetic.
    return (uint)d[o]
        | ((uint)d[o + 1] << 8)
        | ((uint)d[o + 2] << 16)
        | ((uint)d[o + 3] << 24);
}
/// <summary>
/// Releases the socket and the receive thread by delegating to <see cref="Stop"/>.
/// </summary>
public void Dispose() => Stop();
}
}