RomaFrameHeaderJpegReceiver.cs 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Net;
  5. using System.Net.Sockets;
  6. using System.Threading;
  7. using UnityEngine;
  8. using Stopwatch = System.Diagnostics.Stopwatch;
  9. namespace LightGlue.Unity.Roma
  10. {
  11. /// <summary>
  12. /// 按 orangepizero2w/UDP_PROTOCOL.md 的 FrameHeader(16B) 重组 JPEG 帧。
  13. /// 用于接收 Python->Unity 的转发视频流(或直接接收 OrangePi WiFi 图传)。
  14. ///
  15. /// Header(小端):
  16. /// magic(uint32)="WIFI"(0x57494649), frame_id(uint32), seq(uint16), total(uint16), data_len(uint16), reserved(uint16)
  17. /// </summary>
  18. public sealed class RomaFrameHeaderJpegReceiver : IDisposable
  19. {
  20. private const uint Magic = 0x57494649; // "WIFI" little-endian
  21. private const int HeaderSize = 16;
  22. private static readonly double TickToMs = 1000.0 / Stopwatch.Frequency;
  23. private readonly string _bindIp;
  24. private readonly int _port;
  25. private readonly int _maxQueueSize;
  26. private readonly int _maxInFlightFrames;
  27. private readonly int _frameTimeoutMs;
  28. private UdpClient _client;
  29. private Thread _thread;
  30. private volatile bool _running;
  31. private readonly ConcurrentQueue<byte[]> _jpegQueue = new ConcurrentQueue<byte[]>();
  32. private int _queuedCount;
  33. private sealed class Assembly
  34. {
  35. public readonly int Total;
  36. public readonly byte[][] Parts;
  37. public int Received;
  38. public long LastTick;
  39. public Assembly(int total, long nowTick)
  40. {
  41. Total = total;
  42. Parts = new byte[total][];
  43. Received = 0;
  44. LastTick = nowTick;
  45. }
  46. }
  47. private readonly Dictionary<uint, Assembly> _assemblies = new Dictionary<uint, Assembly>();
  48. private readonly object _asmLock = new object();
  49. public RomaFrameHeaderJpegReceiver(
  50. string bindIp,
  51. int port,
  52. int maxQueueSize = 2,
  53. int maxInFlightFrames = 8,
  54. int frameTimeoutMs = 500)
  55. {
  56. _bindIp = string.IsNullOrWhiteSpace(bindIp) ? "0.0.0.0" : bindIp;
  57. _port = port;
  58. _maxQueueSize = Mathf.Max(1, maxQueueSize);
  59. _maxInFlightFrames = Mathf.Clamp(maxInFlightFrames, 1, 64);
  60. _frameTimeoutMs = Mathf.Clamp(frameTimeoutMs, 50, 5000);
  61. }
  62. public void Start()
  63. {
  64. if (_running) return;
  65. var ip = IPAddress.Parse(_bindIp);
  66. _client = new UdpClient(new IPEndPoint(ip, _port));
  67. _client.Client.ReceiveTimeout = 100;
  68. _running = true;
  69. _thread = new Thread(ReceiveLoop)
  70. {
  71. IsBackground = true,
  72. Name = "RomaFrameHeaderJpegReceiver"
  73. };
  74. _thread.Start();
  75. Debug.Log($"[RomaFH] Receiver started on {_bindIp}:{_port}");
  76. }
  77. public void Stop()
  78. {
  79. _running = false;
  80. try { _client?.Close(); } catch { /* ignore */ }
  81. try { _client?.Dispose(); } catch { /* ignore */ }
  82. _client = null;
  83. if (_thread != null && _thread.IsAlive)
  84. {
  85. if (!_thread.Join(1000))
  86. {
  87. try { _thread.Interrupt(); } catch { /* ignore */ }
  88. }
  89. }
  90. _thread = null;
  91. lock (_asmLock) { _assemblies.Clear(); }
  92. while (_jpegQueue.TryDequeue(out _)) { Interlocked.Decrement(ref _queuedCount); }
  93. }
  94. public bool TryDequeueJpeg(out byte[] jpeg)
  95. {
  96. if (_jpegQueue.TryDequeue(out jpeg))
  97. {
  98. Interlocked.Decrement(ref _queuedCount);
  99. return true;
  100. }
  101. return false;
  102. }
  103. private void ReceiveLoop()
  104. {
  105. var remote = new IPEndPoint(IPAddress.Any, 0);
  106. while (_running)
  107. {
  108. try
  109. {
  110. byte[] data = _client.Receive(ref remote);
  111. if (data == null || data.Length < HeaderSize) continue;
  112. ProcessPacket(data);
  113. CleanupTimeouts();
  114. }
  115. catch (SocketException)
  116. {
  117. CleanupTimeouts();
  118. continue;
  119. }
  120. catch (ObjectDisposedException) { break; }
  121. catch (ThreadInterruptedException) { break; }
  122. catch (Exception ex)
  123. {
  124. Debug.LogWarning($"[RomaFH] Receive error: {ex.Message}");
  125. }
  126. }
  127. }
  128. private void ProcessPacket(byte[] data)
  129. {
  130. uint magic = ReadU32LE(data, 0);
  131. if (magic != Magic) return;
  132. uint frameId = ReadU32LE(data, 4);
  133. ushort seq = ReadU16LE(data, 8);
  134. ushort total = ReadU16LE(data, 10);
  135. ushort dataLen = ReadU16LE(data, 12);
  136. if (total == 0 || seq >= total) return;
  137. if (HeaderSize + dataLen > data.Length) return;
  138. long nowTick = Stopwatch.GetTimestamp();
  139. lock (_asmLock)
  140. {
  141. if (!_assemblies.TryGetValue(frameId, out Assembly asm))
  142. {
  143. // 控制在途帧数量,避免网络抖动时占用过多内存
  144. if (_assemblies.Count >= _maxInFlightFrames)
  145. {
  146. // 丢弃最旧的一帧
  147. uint oldestKey = 0;
  148. long oldestTick = long.MaxValue;
  149. foreach (var kv in _assemblies)
  150. {
  151. if (kv.Value.LastTick < oldestTick)
  152. {
  153. oldestTick = kv.Value.LastTick;
  154. oldestKey = kv.Key;
  155. }
  156. }
  157. if (oldestTick != long.MaxValue)
  158. _assemblies.Remove(oldestKey);
  159. }
  160. asm = new Assembly(total, nowTick);
  161. _assemblies[frameId] = asm;
  162. }
  163. // total 变化时:直接重置该帧(避免协议异常导致越界)
  164. if (asm.Total != total)
  165. {
  166. asm = new Assembly(total, nowTick);
  167. _assemblies[frameId] = asm;
  168. }
  169. asm.LastTick = nowTick;
  170. if (asm.Parts[seq] == null)
  171. {
  172. byte[] payload = new byte[dataLen];
  173. Buffer.BlockCopy(data, HeaderSize, payload, 0, dataLen);
  174. asm.Parts[seq] = payload;
  175. asm.Received++;
  176. }
  177. if (asm.Received == asm.Total)
  178. {
  179. byte[] jpeg = JoinParts(asm.Parts);
  180. _assemblies.Remove(frameId);
  181. EnqueueLatest(jpeg);
  182. }
  183. }
  184. }
  185. private void CleanupTimeouts()
  186. {
  187. long now = Stopwatch.GetTimestamp();
  188. lock (_asmLock)
  189. {
  190. if (_assemblies.Count == 0) return;
  191. List<uint> toRemove = null;
  192. foreach (var kv in _assemblies)
  193. {
  194. double elapsedMs = (now - kv.Value.LastTick) * TickToMs;
  195. if (elapsedMs > _frameTimeoutMs)
  196. {
  197. toRemove ??= new List<uint>(4);
  198. toRemove.Add(kv.Key);
  199. }
  200. }
  201. if (toRemove != null)
  202. {
  203. foreach (uint k in toRemove) _assemblies.Remove(k);
  204. }
  205. }
  206. }
  207. private void EnqueueLatest(byte[] jpeg)
  208. {
  209. if (jpeg == null || jpeg.Length < 4) return;
  210. while (Volatile.Read(ref _queuedCount) >= _maxQueueSize && _jpegQueue.TryDequeue(out _))
  211. Interlocked.Decrement(ref _queuedCount);
  212. _jpegQueue.Enqueue(jpeg);
  213. Interlocked.Increment(ref _queuedCount);
  214. }
  215. private static byte[] JoinParts(byte[][] parts)
  216. {
  217. int totalLen = 0;
  218. for (int i = 0; i < parts.Length; i++)
  219. totalLen += parts[i]?.Length ?? 0;
  220. byte[] joined = new byte[totalLen];
  221. int offset = 0;
  222. for (int i = 0; i < parts.Length; i++)
  223. {
  224. byte[] p = parts[i];
  225. if (p == null || p.Length == 0) continue;
  226. Buffer.BlockCopy(p, 0, joined, offset, p.Length);
  227. offset += p.Length;
  228. }
  229. return joined;
  230. }
  231. private static ushort ReadU16LE(byte[] d, int o)
  232. {
  233. return (ushort)(d[o] | (d[o + 1] << 8));
  234. }
  235. private static uint ReadU32LE(byte[] d, int o)
  236. {
  237. return (uint)(d[o] | (d[o + 1] << 8) | (d[o + 2] << 16) | (d[o + 3] << 24));
  238. }
  239. public void Dispose()
  240. {
  241. Stop();
  242. }
  243. }
  244. }