using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using UnityEngine;
namespace LightGlue.Unity.Networking
{
/// <summary>
/// UDP JPEG receiver (transparent mode).
/// - No protocol header; reassembles a JPEG by searching for SOI (FFD8) and EOI (FFD9).
/// - Ignores false SOI: in JPEG scan data 0xFF is byte-stuffed as 0xFF 0x00, so an 0xFF 0xD8 pair immediately preceded by 0xFF 0x00 is not treated as a real SOI.
/// - Receives on a background thread; enqueues complete JPEG byte[] for main-thread decoding.
/// </summary>
public sealed class UDPJpegReceiver : IDisposable
{
    private static readonly byte[] JpegStart = { 0xFF, 0xD8 }; // SOI marker
    private static readonly byte[] JpegEnd = { 0xFF, 0xD9 };   // EOI marker

    private readonly string _bindIp;
    private readonly int _port;
    private readonly float _timeoutSeconds;
    private readonly int _maxQueueSize;

    private UdpClient _client;
    private Thread _thread;
    private volatile bool _running;

    // Reassembly state. Written by the receive thread while it runs, and reset by Stop().
    private byte[] _buffer = Array.Empty<byte>();
    private int _bufferLen;
    private bool _receiving;
    private DateTime _startTimeUtc;

    // Completed frames waiting for the consumer; _queuedCount mirrors the queue length.
    private readonly ConcurrentQueue<byte[]> _jpegQueue = new ConcurrentQueue<byte[]>();
    private int _queuedCount;

    /// <summary>
    /// Creates a receiver. No socket is opened until <see cref="Start"/> is called.
    /// </summary>
    /// <param name="bindIp">Local IP address to bind to; null/blank falls back to "0.0.0.0".</param>
    /// <param name="port">Local UDP port to bind to.</param>
    /// <param name="timeoutSeconds">Maximum age of an incomplete frame before it is discarded (clamped to at least 0.1).</param>
    /// <param name="maxQueueSize">Maximum number of completed frames kept in the queue (clamped to at least 1).</param>
    public UDPJpegReceiver(string bindIp, int port, float timeoutSeconds = 2.0f, int maxQueueSize = 2)
    {
        _bindIp = string.IsNullOrWhiteSpace(bindIp) ? "0.0.0.0" : bindIp;
        _port = port;
        _timeoutSeconds = Mathf.Max(0.1f, timeoutSeconds);
        _maxQueueSize = Mathf.Max(1, maxQueueSize);
    }

    /// <summary>
    /// Binds the UDP socket and starts the background receive thread.
    /// Does nothing when the receiver is already running.
    /// Exceptions from parsing the address or binding the socket propagate to the caller.
    /// </summary>
    public void Start()
    {
        if (_running) return;

        var ip = IPAddress.Parse(_bindIp);
        var client = new UdpClient(new IPEndPoint(ip, _port));
        try
        {
            // 100 ms receive timeout so the loop wakes up regularly to check timeout/running.
            client.Client.ReceiveTimeout = 100;
        }
        catch
        {
            // Do not leak the bound socket if it cannot be configured.
            client.Close();
            throw;
        }

        _client = client;
        _running = true;
        _thread = new Thread(ReceiveLoop)
        {
            IsBackground = true,
            Name = "UDPJpegReceiver"
        };
        _thread.Start();
        Debug.Log($"[UDP] JPEG receiver started on {_bindIp}:{_port}");
    }

    /// <summary>
    /// Stops the receive thread, closes the socket and clears the reassembly state.
    /// Frames that are already queued stay available through <see cref="TryDequeueJpeg"/>.
    /// </summary>
    public void Stop()
    {
        _running = false;

        // Close the UDP client first so the socket is released and a blocked Receive returns.
        UdpClient client = _client;
        _client = null;
        if (client != null)
        {
            try
            {
                client.Close();
            }
            catch (Exception ex)
            {
                // Best effort: shutting down must not throw, but do not hide the reason either.
                Debug.LogWarning($"[UDP] Error while closing socket: {ex.Message}");
            }
        }

        // Wait for the receive thread to finish.
        Thread thread = _thread;
        _thread = null;
        if (thread != null && thread.IsAlive && !thread.Join(1000))
        {
            try { thread.Interrupt(); } catch (Exception) { /* best effort */ }
        }

        ResetState();
        Debug.Log("[UDP] JPEG receiver stopped");
    }

    /// <summary>
    /// Removes the oldest queued JPEG frame, if any.
    /// </summary>
    /// <param name="jpegBytes">The dequeued JPEG bytes, or null when the queue is empty.</param>
    /// <returns>True when a frame was dequeued.</returns>
    public bool TryDequeueJpeg(out byte[] jpegBytes)
    {
        if (_jpegQueue.TryDequeue(out jpegBytes))
        {
            Interlocked.Decrement(ref _queuedCount);
            return true;
        }
        return false;
    }

    /// <summary>
    /// Body of the background thread: receives datagrams until the receiver is stopped.
    /// </summary>
    private void ReceiveLoop()
    {
        // Snapshot the client: Stop() clears the field concurrently, and dereferencing the
        // field here could otherwise throw a NullReferenceException during shutdown.
        UdpClient client = _client;
        if (client == null) return;

        var remoteEp = new IPEndPoint(IPAddress.Any, 0);
        while (_running)
        {
            try
            {
                byte[] data = client.Receive(ref remoteEp);
                ProcessData(data);
                CheckTimeout();
            }
            catch (SocketException)
            {
                // Usually the 100 ms receive timeout; other socket errors are tolerated too.
                // Expire a stale frame, then let the loop condition re-check _running.
                CheckTimeout();
            }
            catch (ObjectDisposedException)
            {
                break;
            }
            catch (ThreadInterruptedException)
            {
                break;
            }
            catch (Exception ex)
            {
                // Failures while shutting down are expected; only report genuine errors.
                if (_running) Debug.LogError($"[UDP] Receive error: {ex}");
                break;
            }
        }
    }

    /// <summary>
    /// Feeds one datagram into the reassembly buffer and enqueues every frame it completes.
    /// The SOI search is done per datagram, so an SOI marker split across two datagrams
    /// is not detected while no frame is in progress.
    /// </summary>
    private void ProcessData(byte[] data)
    {
        if (data == null || data.Length == 0) return;

        int searchFrom;
        if (_receiving)
        {
            // Everything already buffered was scanned without finding an EOI, so resume one
            // byte before the old end (an EOI may straddle the datagram boundary).
            searchFrom = Math.Max(JpegStart.Length, _bufferLen - 1);
            EnsureCapacity(_bufferLen + data.Length);
            AppendToBuffer(data, 0, data.Length);
        }
        else
        {
            int startIdx = IndexOfRealJpegSoi(data, 0, data.Length);
            if (startIdx < 0) return; // no frame in progress and no SOI: drop the datagram

            int count = data.Length - startIdx;
            _receiving = true;
            _startTimeUtc = DateTime.UtcNow;
            _bufferLen = 0;
            EnsureCapacity(count);
            AppendToBuffer(data, startIdx, count);
            searchFrom = JpegStart.Length;
        }

        // Also runs for the datagram that started the frame, so a JPEG that fits in a
        // single datagram is delivered immediately instead of waiting for the next one.
        ExtractCompletedFrames(searchFrom);
    }

    /// <summary>
    /// Emits every complete frame at the front of the buffer. After each frame the bytes
    /// from the next SOI onward (if any) are moved to the front so the start of the
    /// following frame is not lost; otherwise the state is reset.
    /// </summary>
    /// <param name="searchFrom">Buffer index at which the first EOI search starts.</param>
    private void ExtractCompletedFrames(int searchFrom)
    {
        while (_receiving)
        {
            int endIdx = IndexOf(_buffer, JpegEnd, searchFrom, _bufferLen);
            if (endIdx < 0) return;

            int jpegLen = endIdx + JpegEnd.Length;
            if (_buffer[0] == JpegStart[0] && _buffer[1] == JpegStart[1])
            {
                var jpeg = new byte[jpegLen];
                Buffer.BlockCopy(_buffer, 0, jpeg, 0, jpegLen);
                EnqueueLatest(jpeg);
            }

            // Check whether the remainder contains the SOI of the next frame.
            int remainCount = _bufferLen - jpegLen;
            int nextSoi = remainCount >= 2 ? IndexOfRealJpegSoi(_buffer, jpegLen, remainCount) : -1;
            if (nextSoi < 0)
            {
                ResetState();
                return;
            }

            int remain = _bufferLen - nextSoi;
            Buffer.BlockCopy(_buffer, nextSoi, _buffer, 0, remain);
            _bufferLen = remain;
            _startTimeUtc = DateTime.UtcNow;
            searchFrom = JpegStart.Length; // the moved bytes have not been scanned for EOI yet
        }
    }

    /// <summary>
    /// Finds the first "real" JPEG SOI (FF D8) in data[startIndex .. startIndex + count).
    /// An FF D8 pair that is immediately preceded by FF 00 is skipped as a false SOI.
    /// </summary>
    /// <returns>Index of the SOI, or -1 when none is found or the range is invalid.</returns>
    private static int IndexOfRealJpegSoi(byte[] data, int startIndex, int count)
    {
        if (data == null || startIndex < 0 || count < 2 || startIndex + count > data.Length)
            return -1;

        int limit = startIndex + count - 2;
        for (int i = startIndex; i <= limit; i++)
        {
            if (data[i] != 0xFF || data[i + 1] != 0xD8) continue;
            if (i >= 2 && data[i - 2] == 0xFF && data[i - 1] == 0x00)
                continue;
            return i;
        }
        return -1;
    }

    /// <summary>
    /// Discards the frame in progress when it has been incomplete for longer than the timeout.
    /// </summary>
    private void CheckTimeout()
    {
        if (!_receiving) return;

        double elapsed = (DateTime.UtcNow - _startTimeUtc).TotalSeconds;
        if (elapsed > _timeoutSeconds)
        {
            Debug.LogWarning($"[UDP] Timeout: clearing incomplete image buffer ({_bufferLen} bytes)");
            ResetState();
        }
    }

    /// <summary>
    /// Enqueues a completed frame, first dropping the oldest frames while the queue is full.
    /// </summary>
    private void EnqueueLatest(byte[] jpeg)
    {
        // Keep only the latest N frames.
        while (Volatile.Read(ref _queuedCount) >= _maxQueueSize && _jpegQueue.TryDequeue(out _))
        {
            Interlocked.Decrement(ref _queuedCount);
        }
        _jpegQueue.Enqueue(jpeg);
        Interlocked.Increment(ref _queuedCount);
    }

    /// <summary>
    /// Clears the reassembly state; the buffer allocation itself is kept for reuse.
    /// </summary>
    private void ResetState()
    {
        _receiving = false;
        _bufferLen = 0;
        _startTimeUtc = default(DateTime);
    }

    /// <summary>
    /// Grows the buffer (preserving its contents) so it holds at least <paramref name="desired"/> bytes.
    /// </summary>
    private void EnsureCapacity(int desired)
    {
        if (_buffer.Length >= desired) return;
        int newSize = Mathf.NextPowerOfTwo(Mathf.Max(1024, desired));
        Array.Resize(ref _buffer, newSize);
    }

    /// <summary>
    /// Copies <paramref name="count"/> bytes from <paramref name="src"/> to the end of the buffer.
    /// The caller must have ensured sufficient capacity.
    /// </summary>
    private void AppendToBuffer(byte[] src, int srcOffset, int count)
    {
        Buffer.BlockCopy(src, srcOffset, _buffer, _bufferLen, count);
        _bufferLen += count;
    }

    /// <summary>
    /// Searches the whole of <paramref name="haystack"/> for <paramref name="needle"/>,
    /// starting at <paramref name="startIndex"/>.
    /// </summary>
    private static int IndexOf(byte[] haystack, byte[] needle, int startIndex)
    {
        if (haystack == null) return -1;
        return IndexOf(haystack, needle, startIndex, haystack.Length);
    }

    /// <summary>
    /// Searches the first <paramref name="haystackCount"/> bytes of <paramref name="haystack"/>
    /// for <paramref name="needle"/>, starting at <paramref name="startIndex"/>.
    /// </summary>
    /// <returns>Index of the first match, or -1 when there is none or the arguments are invalid.</returns>
    private static int IndexOf(byte[] haystack, byte[] needle, int startIndex, int haystackCount)
    {
        if (needle == null || needle.Length == 0) return -1;
        if (haystack == null || haystackCount > haystack.Length || haystackCount < needle.Length) return -1;
        if (startIndex < 0) startIndex = 0;

        int limit = haystackCount - needle.Length;
        for (int i = startIndex; i <= limit; i++)
        {
            bool match = true;
            for (int j = 0; j < needle.Length; j++)
            {
                if (haystack[i + j] != needle[j])
                {
                    match = false;
                    break;
                }
            }
            if (match) return i;
        }
        return -1;
    }

    /// <summary>
    /// Stops the receiver and releases the socket.
    /// </summary>
    public void Dispose()
    {
        Stop();
    }
}
}