using System; using System.Collections.Concurrent; using System.Net; using System.Net.Sockets; using System.Threading; using UnityEngine; namespace LightGlue.Unity.Networking { /// /// UDP JPEG receiver (transparent mode). /// - No protocol header; reassembles a JPEG by searching for SOI (FFD8) and EOI (FFD9). /// - Ignores false SOI: in JPEG scan data 0xFF is byte-stuffed as 0xFF 0x00, so 0xFF 0xD8 after 0xFF 0x00 is not a real SOI. /// - Receives on a background thread; enqueues complete JPEG byte[] for main-thread decoding. /// public sealed class UDPJpegReceiver : IDisposable { private static readonly byte[] JpegStart = { 0xFF, 0xD8 }; // SOI private static readonly byte[] JpegEnd = { 0xFF, 0xD9 }; // EOI private readonly string _bindIp; private readonly int _port; private readonly float _timeoutSeconds; private readonly int _maxQueueSize; private UdpClient _client; private Thread _thread; private volatile bool _running; private byte[] _buffer = Array.Empty(); private int _bufferLen; private bool _receiving; private DateTime _startTimeUtc; private readonly ConcurrentQueue _jpegQueue = new ConcurrentQueue(); private int _queuedCount; public UDPJpegReceiver(string bindIp, int port, float timeoutSeconds = 2.0f, int maxQueueSize = 2) { _bindIp = string.IsNullOrWhiteSpace(bindIp) ? "0.0.0.0" : bindIp; _port = port; _timeoutSeconds = Mathf.Max(0.1f, timeoutSeconds); _maxQueueSize = Mathf.Max(1, maxQueueSize); } public void Start() { if (_running) return; var ip = IPAddress.Parse(_bindIp); _client = new UdpClient(new IPEndPoint(ip, _port)); _client.Client.ReceiveTimeout = 100; // 100ms, used to periodically check timeout/running _running = true; _thread = new Thread(ReceiveLoop) { IsBackground = true, Name = "UDPJpegReceiver" }; _thread.Start(); Debug.Log($"[UDP] JPEG receiver started on {_bindIp}:{_port}"); } public void Stop() { _running = false; // 先关闭UDP客户端,确保socket释放 if (_client != null) { try { _client.Close(); _client.Dispose(); // 确保完全释放资源 } catch { /* ignore */ } _client = null; } // 等待接收线程结束 if (_thread != null && _thread.IsAlive) { if (!_thread.Join(1000)) { try { _thread.Interrupt(); } catch { /* ignore */ } } } _thread = null; ResetState(); Debug.Log("[UDP] JPEG receiver stopped"); } public bool TryDequeueJpeg(out byte[] jpegBytes) { if (_jpegQueue.TryDequeue(out jpegBytes)) { Interlocked.Decrement(ref _queuedCount); return true; } return false; } private void ReceiveLoop() { IPEndPoint remoteEp = new IPEndPoint(IPAddress.Any, 0); while (_running) { try { byte[] data = _client.Receive(ref remoteEp); ProcessData(data); CheckTimeout(); } catch (SocketException ex) { // timeout => WSAETIMEDOUT (10060) or generic; just check timeout and continue CheckTimeout(); if (!_running) break; continue; } catch (ObjectDisposedException) { break; } catch (ThreadInterruptedException) { break; } catch (Exception ex) { Debug.LogError($"[UDP] Receive error: {ex}"); break; } } } private void ProcessData(byte[] data) { int startIdx = IndexOfRealJpegSoi(data, 0, data.Length); if (startIdx >= 0 && !_receiving) { _receiving = true; _startTimeUtc = DateTime.UtcNow; EnsureCapacity(data.Length - startIdx); _bufferLen = 0; AppendToBuffer(data, startIdx, data.Length - startIdx); return; } if (!_receiving) return; EnsureCapacity(_bufferLen + data.Length); AppendToBuffer(data, 0, data.Length); int endIdx = IndexOf(_buffer, JpegEnd, 0, _bufferLen); if (endIdx < 0) return; int jpegLen = endIdx + 2; if (jpegLen >= 4 && _buffer[0] == 0xFF && _buffer[1] == 0xD8 && _buffer[jpegLen - 2] == 0xFF && _buffer[jpegLen - 1] == 0xD9) { byte[] jpeg = new byte[jpegLen]; Buffer.BlockCopy(_buffer, 0, jpeg, 0, jpegLen); EnqueueLatest(jpeg); } // 检查剩余缓冲区是否包含下一帧 SOI,避免丢弃下一帧开头导致后续损坏 int remainCount = _bufferLen - jpegLen; int nextSoi = remainCount >= 2 ? IndexOfRealJpegSoi(_buffer, jpegLen, remainCount) : -1; if (nextSoi >= jpegLen) { int remain = _bufferLen - nextSoi; Buffer.BlockCopy(_buffer, nextSoi, _buffer, 0, remain); _bufferLen = remain; _startTimeUtc = DateTime.UtcNow; } else { ResetState(); } } /// /// 查找“真实”的 JPEG SOI (FF D8)。JPEG 熵编码中 0xFF 会被填充为 0xFF 0x00, /// 故 0xFF 0x00 0xD8 中的 FFD8 是假 SOI,应跳过继续找下一个。 /// private static int IndexOfRealJpegSoi(byte[] data, int startIndex, int count) { if (data == null || count < 2 || startIndex + count > data.Length) return -1; int limit = startIndex + count - 2; for (int i = startIndex; i <= limit; i++) { if (data[i] != 0xFF || data[i + 1] != 0xD8) continue; if (i >= 2 && data[i - 2] == 0xFF && data[i - 1] == 0x00) continue; return i; } return -1; } private void CheckTimeout() { if (!_receiving) return; double elapsed = (DateTime.UtcNow - _startTimeUtc).TotalSeconds; if (elapsed > _timeoutSeconds) { Debug.LogWarning($"[UDP] Timeout: clearing incomplete image buffer ({_bufferLen} bytes)"); ResetState(); } } private void EnqueueLatest(byte[] jpeg) { // keep only latest N frames while (Volatile.Read(ref _queuedCount) >= _maxQueueSize && _jpegQueue.TryDequeue(out _)) { Interlocked.Decrement(ref _queuedCount); } _jpegQueue.Enqueue(jpeg); Interlocked.Increment(ref _queuedCount); } private void ResetState() { _receiving = false; _bufferLen = 0; _startTimeUtc = default(DateTime); } private void EnsureCapacity(int desired) { if (_buffer.Length >= desired) return; int newSize = Mathf.NextPowerOfTwo(Mathf.Max(1024, desired)); Array.Resize(ref _buffer, newSize); } private void AppendToBuffer(byte[] src, int srcOffset, int count) { Buffer.BlockCopy(src, srcOffset, _buffer, _bufferLen, count); _bufferLen += count; } private static int IndexOf(byte[] haystack, byte[] needle, int startIndex) { return IndexOf(haystack, needle, startIndex, haystack.Length); } private static int IndexOf(byte[] haystack, byte[] needle, int startIndex, int haystackCount) { if (needle == null || needle.Length == 0) return -1; if (haystack == null || haystackCount < needle.Length) return -1; int limit = haystackCount - needle.Length; for (int i = startIndex; i <= limit; i++) { bool match = true; for (int j = 0; j < needle.Length; j++) { if (haystack[i + j] != needle[j]) { match = false; break; } } if (match) return i; } return -1; } public void Dispose() { Stop(); } } }