| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278 |
- using System;
- using System.Collections.Concurrent;
- using System.Net;
- using System.Net.Sockets;
- using System.Threading;
- using UnityEngine;
- namespace LightGlue.Unity.Networking
- {
- /// <summary>
- /// UDP JPEG receiver (transparent mode).
- /// - No protocol header; reassembles a JPEG by searching for SOI (FFD8) and EOI (FFD9).
- /// - Ignores false SOI: in JPEG scan data 0xFF is byte-stuffed as 0xFF 0x00, so 0xFF 0xD8 after 0xFF 0x00 is not a real SOI.
- /// - Receives on a background thread; enqueues complete JPEG byte[] for main-thread decoding.
- /// </summary>
- public sealed class UDPJpegReceiver : IDisposable
- {
- private static readonly byte[] JpegStart = { 0xFF, 0xD8 }; // SOI
- private static readonly byte[] JpegEnd = { 0xFF, 0xD9 }; // EOI
- private readonly string _bindIp;
- private readonly int _port;
- private readonly float _timeoutSeconds;
- private readonly int _maxQueueSize;
- private UdpClient _client;
- private Thread _thread;
- private volatile bool _running;
- private byte[] _buffer = Array.Empty<byte>();
- private int _bufferLen;
- private bool _receiving;
- private DateTime _startTimeUtc;
- private readonly ConcurrentQueue<byte[]> _jpegQueue = new ConcurrentQueue<byte[]>();
- private int _queuedCount;
- public UDPJpegReceiver(string bindIp, int port, float timeoutSeconds = 2.0f, int maxQueueSize = 2)
- {
- _bindIp = string.IsNullOrWhiteSpace(bindIp) ? "0.0.0.0" : bindIp;
- _port = port;
- _timeoutSeconds = Mathf.Max(0.1f, timeoutSeconds);
- _maxQueueSize = Mathf.Max(1, maxQueueSize);
- }
- public void Start()
- {
- if (_running) return;
- var ip = IPAddress.Parse(_bindIp);
- _client = new UdpClient(new IPEndPoint(ip, _port));
- _client.Client.ReceiveTimeout = 100; // 100ms, used to periodically check timeout/running
- _running = true;
- _thread = new Thread(ReceiveLoop)
- {
- IsBackground = true,
- Name = "UDPJpegReceiver"
- };
- _thread.Start();
- Debug.Log($"[UDP] JPEG receiver started on {_bindIp}:{_port}");
- }
- public void Stop()
- {
- _running = false;
-
- // 先关闭UDP客户端,确保socket释放
- if (_client != null)
- {
- try
- {
- _client.Close();
- _client.Dispose(); // 确保完全释放资源
- }
- catch { /* ignore */ }
- _client = null;
- }
- // 等待接收线程结束
- if (_thread != null && _thread.IsAlive)
- {
- if (!_thread.Join(1000))
- {
- try { _thread.Interrupt(); } catch { /* ignore */ }
- }
- }
- _thread = null;
- ResetState();
- Debug.Log("[UDP] JPEG receiver stopped");
- }
- public bool TryDequeueJpeg(out byte[] jpegBytes)
- {
- if (_jpegQueue.TryDequeue(out jpegBytes))
- {
- Interlocked.Decrement(ref _queuedCount);
- return true;
- }
- return false;
- }
- private void ReceiveLoop()
- {
- IPEndPoint remoteEp = new IPEndPoint(IPAddress.Any, 0);
- while (_running)
- {
- try
- {
- byte[] data = _client.Receive(ref remoteEp);
- ProcessData(data);
- CheckTimeout();
- }
- catch (SocketException ex)
- {
- // timeout => WSAETIMEDOUT (10060) or generic; just check timeout and continue
- CheckTimeout();
- if (!_running) break;
- continue;
- }
- catch (ObjectDisposedException)
- {
- break;
- }
- catch (ThreadInterruptedException)
- {
- break;
- }
- catch (Exception ex)
- {
- Debug.LogError($"[UDP] Receive error: {ex}");
- break;
- }
- }
- }
- private void ProcessData(byte[] data)
- {
- int startIdx = IndexOfRealJpegSoi(data, 0, data.Length);
- if (startIdx >= 0 && !_receiving)
- {
- _receiving = true;
- _startTimeUtc = DateTime.UtcNow;
- EnsureCapacity(data.Length - startIdx);
- _bufferLen = 0;
- AppendToBuffer(data, startIdx, data.Length - startIdx);
- return;
- }
- if (!_receiving) return;
- EnsureCapacity(_bufferLen + data.Length);
- AppendToBuffer(data, 0, data.Length);
- int endIdx = IndexOf(_buffer, JpegEnd, 0, _bufferLen);
- if (endIdx < 0) return;
- int jpegLen = endIdx + 2;
- if (jpegLen >= 4 && _buffer[0] == 0xFF && _buffer[1] == 0xD8 && _buffer[jpegLen - 2] == 0xFF && _buffer[jpegLen - 1] == 0xD9)
- {
- byte[] jpeg = new byte[jpegLen];
- Buffer.BlockCopy(_buffer, 0, jpeg, 0, jpegLen);
- EnqueueLatest(jpeg);
- }
- // 检查剩余缓冲区是否包含下一帧 SOI,避免丢弃下一帧开头导致后续损坏
- int remainCount = _bufferLen - jpegLen;
- int nextSoi = remainCount >= 2 ? IndexOfRealJpegSoi(_buffer, jpegLen, remainCount) : -1;
- if (nextSoi >= jpegLen)
- {
- int remain = _bufferLen - nextSoi;
- Buffer.BlockCopy(_buffer, nextSoi, _buffer, 0, remain);
- _bufferLen = remain;
- _startTimeUtc = DateTime.UtcNow;
- }
- else
- {
- ResetState();
- }
- }
- /// <summary>
- /// 查找“真实”的 JPEG SOI (FF D8)。JPEG 熵编码中 0xFF 会被填充为 0xFF 0x00,
- /// 故 0xFF 0x00 0xD8 中的 FFD8 是假 SOI,应跳过继续找下一个。
- /// </summary>
- private static int IndexOfRealJpegSoi(byte[] data, int startIndex, int count)
- {
- if (data == null || count < 2 || startIndex + count > data.Length)
- return -1;
- int limit = startIndex + count - 2;
- for (int i = startIndex; i <= limit; i++)
- {
- if (data[i] != 0xFF || data[i + 1] != 0xD8) continue;
- if (i >= 2 && data[i - 2] == 0xFF && data[i - 1] == 0x00)
- continue;
- return i;
- }
- return -1;
- }
- private void CheckTimeout()
- {
- if (!_receiving) return;
- double elapsed = (DateTime.UtcNow - _startTimeUtc).TotalSeconds;
- if (elapsed > _timeoutSeconds)
- {
- Debug.LogWarning($"[UDP] Timeout: clearing incomplete image buffer ({_bufferLen} bytes)");
- ResetState();
- }
- }
- private void EnqueueLatest(byte[] jpeg)
- {
- // keep only latest N frames
- while (Volatile.Read(ref _queuedCount) >= _maxQueueSize && _jpegQueue.TryDequeue(out _))
- {
- Interlocked.Decrement(ref _queuedCount);
- }
- _jpegQueue.Enqueue(jpeg);
- Interlocked.Increment(ref _queuedCount);
- }
- private void ResetState()
- {
- _receiving = false;
- _bufferLen = 0;
- _startTimeUtc = default(DateTime);
- }
- private void EnsureCapacity(int desired)
- {
- if (_buffer.Length >= desired) return;
- int newSize = Mathf.NextPowerOfTwo(Mathf.Max(1024, desired));
- Array.Resize(ref _buffer, newSize);
- }
- private void AppendToBuffer(byte[] src, int srcOffset, int count)
- {
- Buffer.BlockCopy(src, srcOffset, _buffer, _bufferLen, count);
- _bufferLen += count;
- }
- private static int IndexOf(byte[] haystack, byte[] needle, int startIndex)
- {
- return IndexOf(haystack, needle, startIndex, haystack.Length);
- }
- private static int IndexOf(byte[] haystack, byte[] needle, int startIndex, int haystackCount)
- {
- if (needle == null || needle.Length == 0) return -1;
- if (haystack == null || haystackCount < needle.Length) return -1;
- int limit = haystackCount - needle.Length;
- for (int i = startIndex; i <= limit; i++)
- {
- bool match = true;
- for (int j = 0; j < needle.Length; j++)
- {
- if (haystack[i + j] != needle[j])
- {
- match = false;
- break;
- }
- }
- if (match) return i;
- }
- return -1;
- }
- public void Dispose()
- {
- Stop();
- }
- }
- }
|