UDPJpegReceiver.cs 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Net;
  4. using System.Net.Sockets;
  5. using System.Threading;
  6. using UnityEngine;
  7. namespace LightGlue.Unity.Networking
  8. {
  9. /// <summary>
  10. /// UDP JPEG receiver (transparent mode).
  11. /// - No protocol header; reassembles a JPEG by searching for SOI (FFD8) and EOI (FFD9).
  12. /// - Ignores false SOI: in JPEG scan data 0xFF is byte-stuffed as 0xFF 0x00, so 0xFF 0xD8 after 0xFF 0x00 is not a real SOI.
  13. /// - Receives on a background thread; enqueues complete JPEG byte[] for main-thread decoding.
  14. /// </summary>
  15. public sealed class UDPJpegReceiver : IDisposable
  16. {
  17. private static readonly byte[] JpegStart = { 0xFF, 0xD8 }; // SOI
  18. private static readonly byte[] JpegEnd = { 0xFF, 0xD9 }; // EOI
  19. private readonly string _bindIp;
  20. private readonly int _port;
  21. private readonly float _timeoutSeconds;
  22. private readonly int _maxQueueSize;
  23. private UdpClient _client;
  24. private Thread _thread;
  25. private volatile bool _running;
  26. private byte[] _buffer = Array.Empty<byte>();
  27. private int _bufferLen;
  28. private bool _receiving;
  29. private DateTime _startTimeUtc;
  30. private readonly ConcurrentQueue<byte[]> _jpegQueue = new ConcurrentQueue<byte[]>();
  31. private int _queuedCount;
  32. public UDPJpegReceiver(string bindIp, int port, float timeoutSeconds = 2.0f, int maxQueueSize = 2)
  33. {
  34. _bindIp = string.IsNullOrWhiteSpace(bindIp) ? "0.0.0.0" : bindIp;
  35. _port = port;
  36. _timeoutSeconds = Mathf.Max(0.1f, timeoutSeconds);
  37. _maxQueueSize = Mathf.Max(1, maxQueueSize);
  38. }
  39. public void Start()
  40. {
  41. if (_running) return;
  42. var ip = IPAddress.Parse(_bindIp);
  43. _client = new UdpClient(new IPEndPoint(ip, _port));
  44. _client.Client.ReceiveTimeout = 100; // 100ms, used to periodically check timeout/running
  45. _running = true;
  46. _thread = new Thread(ReceiveLoop)
  47. {
  48. IsBackground = true,
  49. Name = "UDPJpegReceiver"
  50. };
  51. _thread.Start();
  52. Debug.Log($"[UDP] JPEG receiver started on {_bindIp}:{_port}");
  53. }
  54. public void Stop()
  55. {
  56. _running = false;
  57. // 先关闭UDP客户端,确保socket释放
  58. if (_client != null)
  59. {
  60. try
  61. {
  62. _client.Close();
  63. _client.Dispose(); // 确保完全释放资源
  64. }
  65. catch { /* ignore */ }
  66. _client = null;
  67. }
  68. // 等待接收线程结束
  69. if (_thread != null && _thread.IsAlive)
  70. {
  71. if (!_thread.Join(1000))
  72. {
  73. try { _thread.Interrupt(); } catch { /* ignore */ }
  74. }
  75. }
  76. _thread = null;
  77. ResetState();
  78. Debug.Log("[UDP] JPEG receiver stopped");
  79. }
  80. public bool TryDequeueJpeg(out byte[] jpegBytes)
  81. {
  82. if (_jpegQueue.TryDequeue(out jpegBytes))
  83. {
  84. Interlocked.Decrement(ref _queuedCount);
  85. return true;
  86. }
  87. return false;
  88. }
  89. private void ReceiveLoop()
  90. {
  91. IPEndPoint remoteEp = new IPEndPoint(IPAddress.Any, 0);
  92. while (_running)
  93. {
  94. try
  95. {
  96. byte[] data = _client.Receive(ref remoteEp);
  97. ProcessData(data);
  98. CheckTimeout();
  99. }
  100. catch (SocketException ex)
  101. {
  102. // timeout => WSAETIMEDOUT (10060) or generic; just check timeout and continue
  103. CheckTimeout();
  104. if (!_running) break;
  105. continue;
  106. }
  107. catch (ObjectDisposedException)
  108. {
  109. break;
  110. }
  111. catch (ThreadInterruptedException)
  112. {
  113. break;
  114. }
  115. catch (Exception ex)
  116. {
  117. Debug.LogError($"[UDP] Receive error: {ex}");
  118. break;
  119. }
  120. }
  121. }
  122. private void ProcessData(byte[] data)
  123. {
  124. int startIdx = IndexOfRealJpegSoi(data, 0, data.Length);
  125. if (startIdx >= 0 && !_receiving)
  126. {
  127. _receiving = true;
  128. _startTimeUtc = DateTime.UtcNow;
  129. EnsureCapacity(data.Length - startIdx);
  130. _bufferLen = 0;
  131. AppendToBuffer(data, startIdx, data.Length - startIdx);
  132. return;
  133. }
  134. if (!_receiving) return;
  135. EnsureCapacity(_bufferLen + data.Length);
  136. AppendToBuffer(data, 0, data.Length);
  137. int endIdx = IndexOf(_buffer, JpegEnd, 0, _bufferLen);
  138. if (endIdx < 0) return;
  139. int jpegLen = endIdx + 2;
  140. if (jpegLen >= 4 && _buffer[0] == 0xFF && _buffer[1] == 0xD8 && _buffer[jpegLen - 2] == 0xFF && _buffer[jpegLen - 1] == 0xD9)
  141. {
  142. byte[] jpeg = new byte[jpegLen];
  143. Buffer.BlockCopy(_buffer, 0, jpeg, 0, jpegLen);
  144. EnqueueLatest(jpeg);
  145. }
  146. // 检查剩余缓冲区是否包含下一帧 SOI,避免丢弃下一帧开头导致后续损坏
  147. int remainCount = _bufferLen - jpegLen;
  148. int nextSoi = remainCount >= 2 ? IndexOfRealJpegSoi(_buffer, jpegLen, remainCount) : -1;
  149. if (nextSoi >= jpegLen)
  150. {
  151. int remain = _bufferLen - nextSoi;
  152. Buffer.BlockCopy(_buffer, nextSoi, _buffer, 0, remain);
  153. _bufferLen = remain;
  154. _startTimeUtc = DateTime.UtcNow;
  155. }
  156. else
  157. {
  158. ResetState();
  159. }
  160. }
  161. /// <summary>
  162. /// 查找“真实”的 JPEG SOI (FF D8)。JPEG 熵编码中 0xFF 会被填充为 0xFF 0x00,
  163. /// 故 0xFF 0x00 0xD8 中的 FFD8 是假 SOI,应跳过继续找下一个。
  164. /// </summary>
  165. private static int IndexOfRealJpegSoi(byte[] data, int startIndex, int count)
  166. {
  167. if (data == null || count < 2 || startIndex + count > data.Length)
  168. return -1;
  169. int limit = startIndex + count - 2;
  170. for (int i = startIndex; i <= limit; i++)
  171. {
  172. if (data[i] != 0xFF || data[i + 1] != 0xD8) continue;
  173. if (i >= 2 && data[i - 2] == 0xFF && data[i - 1] == 0x00)
  174. continue;
  175. return i;
  176. }
  177. return -1;
  178. }
  179. private void CheckTimeout()
  180. {
  181. if (!_receiving) return;
  182. double elapsed = (DateTime.UtcNow - _startTimeUtc).TotalSeconds;
  183. if (elapsed > _timeoutSeconds)
  184. {
  185. Debug.LogWarning($"[UDP] Timeout: clearing incomplete image buffer ({_bufferLen} bytes)");
  186. ResetState();
  187. }
  188. }
  189. private void EnqueueLatest(byte[] jpeg)
  190. {
  191. // keep only latest N frames
  192. while (Volatile.Read(ref _queuedCount) >= _maxQueueSize && _jpegQueue.TryDequeue(out _))
  193. {
  194. Interlocked.Decrement(ref _queuedCount);
  195. }
  196. _jpegQueue.Enqueue(jpeg);
  197. Interlocked.Increment(ref _queuedCount);
  198. }
  199. private void ResetState()
  200. {
  201. _receiving = false;
  202. _bufferLen = 0;
  203. _startTimeUtc = default(DateTime);
  204. }
  205. private void EnsureCapacity(int desired)
  206. {
  207. if (_buffer.Length >= desired) return;
  208. int newSize = Mathf.NextPowerOfTwo(Mathf.Max(1024, desired));
  209. Array.Resize(ref _buffer, newSize);
  210. }
  211. private void AppendToBuffer(byte[] src, int srcOffset, int count)
  212. {
  213. Buffer.BlockCopy(src, srcOffset, _buffer, _bufferLen, count);
  214. _bufferLen += count;
  215. }
  216. private static int IndexOf(byte[] haystack, byte[] needle, int startIndex)
  217. {
  218. return IndexOf(haystack, needle, startIndex, haystack.Length);
  219. }
  220. private static int IndexOf(byte[] haystack, byte[] needle, int startIndex, int haystackCount)
  221. {
  222. if (needle == null || needle.Length == 0) return -1;
  223. if (haystack == null || haystackCount < needle.Length) return -1;
  224. int limit = haystackCount - needle.Length;
  225. for (int i = startIndex; i <= limit; i++)
  226. {
  227. bool match = true;
  228. for (int j = 0; j < needle.Length; j++)
  229. {
  230. if (haystack[i + j] != needle[j])
  231. {
  232. match = false;
  233. break;
  234. }
  235. }
  236. if (match) return i;
  237. }
  238. return -1;
  239. }
  240. public void Dispose()
  241. {
  242. Stop();
  243. }
  244. }
  245. }