udp_result_sender.py 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
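"""
UDP result sender - sends algorithm results to Unity.
Packet layout: version(2) + length(2) + validity(1) + match count(2) + inlier ratio(4) + camera X(4) + camera Y(4) + checksum(1) = 20 bytes.
All fields are little-endian.
Standalone copy kept inside the Roma project (no dependency on LightGlue_Deployment).
"""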
  7. import socket
  8. import struct
  9. from typing import Optional
  10. class UDPResultSender:
  11. PROTOCOL_VERSION = 0x0001
  12. PACKET_SIZE = 20
  13. DATA_SIZE = 15
  14. def __init__(self, unity_ip: str = "127.0.0.1", unity_port: int = 12348):
  15. self.unity_ip = unity_ip
  16. self.unity_port = unity_port
  17. self.socket: Optional[socket.socket] = None
  18. self._initialize_socket()
  19. def _initialize_socket(self) -> None:
  20. try:
  21. self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  22. print(f"[UDP] Result sender initialized: {self.unity_ip}:{self.unity_port}", flush=True)
  23. except Exception as exc:
  24. print(f"[UDP] Failed to initialize result sender: {exc}", flush=True)
  25. self.socket = None
  26. def send_result(
  27. self,
  28. is_valid: bool,
  29. num_matches: int,
  30. inliers_ratio: float,
  31. camera_x: float,
  32. camera_y: float,
  33. ) -> bool:
  34. if self.socket is None:
  35. return False
  36. try:
  37. body = struct.pack(
  38. "<B H f f f",
  39. 1 if is_valid else 0,
  40. int(num_matches) & 0xFFFF,
  41. float(inliers_ratio),
  42. float(camera_x),
  43. float(camera_y),
  44. )
  45. checksum = sum(body) % 256
  46. packet = struct.pack("<HH", self.PROTOCOL_VERSION, self.DATA_SIZE) + body + struct.pack("B", checksum)
  47. self.socket.sendto(packet, (self.unity_ip, self.unity_port))
  48. return True
  49. except Exception as exc:
  50. print(f"[UDP] Failed to send result: {exc}", flush=True)
  51. return False
  52. def close(self) -> None:
  53. if self.socket is None:
  54. return
  55. try:
  56. self.socket.close()
  57. print("[UDP] Result sender closed", flush=True)
  58. except Exception as exc:
  59. print(f"[UDP] Error closing result sender: {exc}", flush=True)
  60. finally:
  61. self.socket = None