using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using UnityEngine;
namespace LightGlue.Unity.Networking
{
///
/// UDP结果接收器(用于接收Python发送的LightGlue算法结果)
/// - 接收二进制格式的结果数据包
/// - 在后台线程接收,主线程通过队列消费结果
/// - 协议格式:版本(2) + 长度(2) + 有效性(1) + 匹配数(2) + 内点比例(4) + 相机X(4) + 相机Y(4) + 校验和(1) = 20字节
///
public sealed class UDPResultReceiver : IDisposable
{
private const ushort ProtocolVersion = 0x0001;
private const int PacketSize = 20; // 包含校验和的完整包大小
private const int DataSize = 15; // 数据段大小(不包括版本和长度字段)
private readonly string _bindIp;
private readonly int _port;
private readonly int _maxQueueSize;
private UdpClient _client;
private Thread _thread;
private volatile bool _running;
private readonly ConcurrentQueue _resultQueue = new ConcurrentQueue();
private int _queuedCount;
public UDPResultReceiver(string bindIp, int port, int maxQueueSize = 10)
{
_bindIp = string.IsNullOrWhiteSpace(bindIp) ? "0.0.0.0" : bindIp;
_port = port;
_maxQueueSize = Mathf.Max(1, maxQueueSize);
}
public void Start()
{
if (_running) return;
var ip = IPAddress.Parse(_bindIp);
_client = new UdpClient(new IPEndPoint(ip, _port));
_client.Client.ReceiveTimeout = 100; // 100ms超时,用于定期检查
_running = true;
_thread = new Thread(ReceiveLoop)
{
IsBackground = true,
Name = "UDPResultReceiver"
};
_thread.Start();
Debug.Log($"[UDP] Result receiver started on {_bindIp}:{_port}");
}
public void Stop()
{
_running = false;
if (_client != null)
{
try
{
_client.Close();
_client.Dispose();
}
catch { /* ignore */ }
_client = null;
}
if (_thread != null && _thread.IsAlive)
{
if (!_thread.Join(1000))
{
try { _thread.Interrupt(); } catch { /* ignore */ }
}
}
_thread = null;
// 清空队列
while (_resultQueue.TryDequeue(out _))
{
Interlocked.Decrement(ref _queuedCount);
}
Debug.Log("[UDP] Result receiver stopped");
}
///
/// 尝试从队列中获取最新结果(非阻塞)
///
public bool TryDequeueResult(out LightGlueResult result)
{
if (_resultQueue.TryDequeue(out result))
{
Interlocked.Decrement(ref _queuedCount);
return true;
}
return false;
}
private void ReceiveLoop()
{
IPEndPoint remoteEp = new IPEndPoint(IPAddress.Any, 0);
while (_running)
{
try
{
byte[] data = _client.Receive(ref remoteEp);
if (data != null && data.Length >= PacketSize)
{
ProcessPacket(data);
}
}
catch (SocketException ex)
{
// timeout or other socket errors - just continue
if (!_running) break;
continue;
}
catch (ObjectDisposedException)
{
break;
}
catch (ThreadInterruptedException)
{
break;
}
catch (Exception ex)
{
Debug.LogError($"[UDP] Result receive error: {ex}");
break;
}
}
}
private void ProcessPacket(byte[] data)
{
try
{
// 解析协议版本(小端序)
ushort version = (ushort)(data[0] | (data[1] << 8));
if (version != ProtocolVersion)
{
Debug.LogWarning($"[UDP] Invalid protocol version: {version}, expected {ProtocolVersion}");
return;
}
// 解析数据长度(小端序)
ushort dataLength = (ushort)(data[2] | (data[3] << 8));
if (dataLength != DataSize)
{
Debug.LogWarning($"[UDP] Invalid data length: {dataLength}, expected {DataSize}");
return;
}
// 解析数据段(从索引4开始)
// 格式:is_valid(1) + num_matches(2) + inliers_ratio(4) + camera_x(4) + camera_y(4) + checksum(1)
bool isValid = data[4] != 0;
ushort numMatches = (ushort)(data[5] | (data[6] << 8));
// 解析浮点数(小端序)
float inliersRatio = BitConverter.ToSingle(data, 7);
float cameraX = BitConverter.ToSingle(data, 11);
float cameraY = BitConverter.ToSingle(data, 15);
// 可选:验证校验和
byte checksum = data[19];
if (checksum != 0) // 如果校验和不为0,则验证
{
byte calculatedChecksum = 0;
for (int i = 4; i < 19; i++)
{
calculatedChecksum = (byte)((calculatedChecksum + data[i]) % 256);
}
if (calculatedChecksum != checksum)
{
Debug.LogWarning($"[UDP] Checksum mismatch: calculated={calculatedChecksum}, received={checksum}");
return;
}
}
// 创建结果对象
LightGlueResult result = new LightGlueResult(isValid, numMatches, inliersRatio, cameraX, cameraY);
// 入队(只保留最新N个结果)
EnqueueLatest(result);
}
catch (Exception ex)
{
Debug.LogError($"[UDP] Failed to parse result packet: {ex}");
}
}
private void EnqueueLatest(LightGlueResult result)
{
// 只保留最新N个结果
while (Volatile.Read(ref _queuedCount) >= _maxQueueSize && _resultQueue.TryDequeue(out _))
{
Interlocked.Decrement(ref _queuedCount);
}
_resultQueue.Enqueue(result);
Interlocked.Increment(ref _queuedCount);
}
public void Dispose()
{
Stop();
}
}
}