| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970 |
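- """
- UDP result sender - sends algorithm results to Unity.
- Packet layout: version(2) + length(2) + validity(1) + match count(2) + inlier ratio(4) + camera X(4) + camera Y(4) + checksum(1) = 20 bytes
- The length field holds the payload size (15 bytes: validity through camera Y); the checksum is the sum of those payload bytes modulo 256.
- All fields are little-endian.
- This is a standalone copy kept inside the Roma project (it does not depend on LightGlue_Deployment).
- """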
- import socket
- import struct
- from typing import Optional
class UDPResultSender:
    """Send algorithm results to Unity as fixed-size UDP datagrams.

    Wire format (little-endian, no padding), 20 bytes in total:

        offset  size  type     field
        0       2     uint16   protocol version (PROTOCOL_VERSION)
        2       2     uint16   payload length (DATA_SIZE)
        4       1     uint8    validity flag (1 = valid, 0 = invalid)
        5       2     uint16   match count (saturated to 0..65535)
        7       4     float32  inlier ratio
        11      4     float32  camera X
        15      4     float32  camera Y
        19      1     uint8    checksum: sum of the 15 payload bytes mod 256

    Sending is best effort: failures are printed and reported through the
    ``bool`` return value of :meth:`send_result` instead of being raised.
    The instance can be used as a context manager so that the socket is
    closed when the ``with`` block exits.
    """

    PROTOCOL_VERSION = 0x0001
    PACKET_SIZE = 20
    DATA_SIZE = 15

    # Precompiled layouts. "<" selects little-endian with standard sizes and
    # no alignment padding, so the payload is exactly 1 + 2 + 4 + 4 + 4 bytes.
    _HEADER_STRUCT = struct.Struct("<HH")
    _BODY_STRUCT = struct.Struct("<BHfff")
    # Largest value representable in the uint16 match-count field.
    _MAX_MATCHES = 0xFFFF

    def __init__(self, unity_ip: str = "127.0.0.1", unity_port: int = 12348):
        """Remember the destination address and open the UDP socket.

        Args:
            unity_ip: Address the datagrams are sent to.
            unity_port: UDP port the datagrams are sent to.
        """
        self.unity_ip = unity_ip
        self.unity_port = unity_port
        self.socket: Optional[socket.socket] = None
        self._initialize_socket()

    def __enter__(self) -> "UDPResultSender":
        """Return the sender itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the socket on leaving the ``with`` block.

        Returns ``None`` so an exception raised inside the block propagates.
        """
        self.close()

    def _initialize_socket(self) -> None:
        """Create the datagram socket; on failure leave ``self.socket`` as None."""
        try:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        except OSError as exc:
            print(f"[UDP] Failed to initialize result sender: {exc}", flush=True)
            self.socket = None
        else:
            print(
                f"[UDP] Result sender initialized: {self.unity_ip}:{self.unity_port}",
                flush=True,
            )

    @classmethod
    def build_packet(
        cls,
        is_valid: bool,
        num_matches: int,
        inliers_ratio: float,
        camera_x: float,
        camera_y: float,
    ) -> bytes:
        """Serialize one result into the wire format described on the class.

        Args:
            is_valid: Truthy values are encoded as 1, falsy values as 0.
            num_matches: Match count; clamped into 0..65535 so an
                out-of-range count saturates instead of wrapping around
                (e.g. 65536 must not be transmitted as 0).
            inliers_ratio: Encoded as a 32-bit float.
            camera_x: Encoded as a 32-bit float.
            camera_y: Encoded as a 32-bit float.

        Returns:
            The header, payload and checksum concatenated into one packet.

        Raises:
            TypeError, ValueError, OverflowError, struct.error: If a value
                cannot be converted to ``int``/``float`` or does not fit
                its field.
        """
        matches = min(max(int(num_matches), 0), cls._MAX_MATCHES)
        body = cls._BODY_STRUCT.pack(
            1 if is_valid else 0,
            matches,
            float(inliers_ratio),
            float(camera_x),
            float(camera_y),
        )
        # The checksum covers the payload only, not the header.
        checksum = sum(body) % 256
        header = cls._HEADER_STRUCT.pack(cls.PROTOCOL_VERSION, cls.DATA_SIZE)
        return header + body + bytes((checksum,))

    def send_result(
        self,
        is_valid: bool,
        num_matches: int,
        inliers_ratio: float,
        camera_x: float,
        camera_y: float,
    ) -> bool:
        """Pack one result and send it to the configured address.

        Args:
            is_valid: Whether the result is valid.
            num_matches: Match count (saturated to the uint16 range).
            inliers_ratio: Inlier ratio, sent as a 32-bit float.
            camera_x: Camera X, sent as a 32-bit float.
            camera_y: Camera Y, sent as a 32-bit float.

        Returns:
            True if the datagram was handed to the socket; False if there is
            no open socket or if packing/sending failed.
        """
        if self.socket is None:
            return False
        try:
            packet = self.build_packet(
                is_valid, num_matches, inliers_ratio, camera_x, camera_y
            )
            self.socket.sendto(packet, (self.unity_ip, self.unity_port))
        except Exception as exc:
            # Deliberately broad: this is a best-effort boundary and callers
            # rely on a False return rather than an exception.
            print(f"[UDP] Failed to send result: {exc}", flush=True)
            return False
        return True

    def close(self) -> None:
        """Close the socket if it is open; calling it again is a no-op."""
        if self.socket is None:
            return
        try:
            self.socket.close()
            print("[UDP] Result sender closed", flush=True)
        except OSError as exc:
            print(f"[UDP] Error closing result sender: {exc}", flush=True)
        finally:
            # Drop the reference even if close() failed so later calls
            # short-circuit instead of reusing a broken socket.
            self.socket = None
|