| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667 |
- #!/usr/bin/env python3
- """
- RoMa demo with asynchronous video streaming and camera position tracking.
- This script mirrors the input handling style used in the LightGlue demos:
- - webcam / IP camera / video file / image directory
- - UDP JPEG stream via udp://host:port (requires udp_jpeg_receiver.py)
- - async frame reader to reduce capture-side blocking
- """
- from pathlib import Path
- import argparse
- import queue
- import threading
- import time
- import os
- import sys
- import cv2
- import numpy as np
- import torch
- from PIL import Image
- from romatch import roma_outdoor, tiny_roma_v1_outdoor
- try:
- from udp_jpeg_receiver import UDPJPEGReceiver
- except ImportError:
- UDPJPEGReceiver = None
- torch.set_grad_enabled(False)
- class AverageTimer:
- """Class to help manage printing simple timing of code execution."""
- def __init__(self, smoothing=0.3, newline=False):
- self.smoothing = smoothing
- self.newline = newline
- self.times = {}
- self.will_print = {}
- self.reset()
- def reset(self):
- now = time.time()
- self.start = now
- self.last_time = now
- for name in self.will_print:
- self.will_print[name] = False
- def update(self, name="default"):
- now = time.time()
- dt = now - self.last_time
- if name in self.times:
- dt = self.smoothing * dt + (1 - self.smoothing) * self.times[name]
- self.times[name] = dt
- self.will_print[name] = True
- self.last_time = now
- def print(self, text="Timer"):
- total = 0.0
- print(f"[{text}]", end=" ")
- for key in self.times:
- val = self.times[key]
- if self.will_print[key]:
- print(f"{key}={val:.3f}", end=" ")
- total += val
- if total > 0:
- print(f"total={total:.3f} sec {{{1.0 / total:.1f} FPS}}", end=" ")
- else:
- print("total=0.000 sec {inf FPS}", end=" ")
- if self.newline:
- print(flush=True)
- else:
- print(end="\r", flush=True)
- self.reset()
- class VideoStreamer:
- """Class to help with reading images from a video stream."""
- def __init__(self, source, resize, skip, image_glob, max_length=1_000_000):
- self.source = source
- self.skip = skip
- self.max_length = max_length
- self.resize = resize
- self.i = 0
- self.cap = None
- self.is_ip_camera = False
- self.is_udp_jpeg = False
- self.udp_receiver = None
- self._is_digit_source = isinstance(source, int) or (
- isinstance(source, str) and source.isdigit()
- )
- if isinstance(source, str) and source.startswith("udp://"):
- if UDPJPEGReceiver is None:
- raise ImportError(
- "UDPJPEGReceiver not available. Make sure udp_jpeg_receiver.py exists."
- )
- parts = source.replace("udp://", "").split(":")
- if len(parts) == 2:
- host = parts[0] if parts[0] else "0.0.0.0"
- port = int(parts[1])
- else:
- host = "0.0.0.0"
- port = int(parts[0])
- self.is_udp_jpeg = True
- self.udp_receiver = UDPJPEGReceiver(host=host, port=port)
- self.udp_receiver.start()
- print(f"UDP JPEG receiver initialized: {host}:{port}")
- elif Path(source).is_dir():
- self.listing = []
- for ext in image_glob:
- self.listing.extend(list(Path(source).glob(ext)))
- self.listing = self.listing[: self.max_length]
- self.max_length = len(self.listing)
- if self.max_length == 0:
- raise IOError(f"No images found in directory: {source}")
- print(f"Found {self.max_length} images in {source}")
- elif Path(source).exists():
- self.cap = cv2.VideoCapture(source)
- else:
- if not self._is_digit_source and not Path(source).exists():
- self.is_ip_camera = True
- self.cap = cv2.VideoCapture(source, cv2.CAP_FFMPEG)
- self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
- else:
- self.cap = cv2.VideoCapture(int(source) if self._is_digit_source else source)
- def _resize_if_needed(self, frame):
- if len(self.resize) == 2:
- h, w = frame.shape[:2]
- if w != self.resize[0] or h != self.resize[1]:
- frame = cv2.resize(frame, tuple(self.resize))
- elif len(self.resize) == 1 and self.resize[0] > 0:
- h, w = frame.shape[:2]
- scale = self.resize[0] / max(h, w)
- new_w, new_h = int(w * scale), int(h * scale)
- frame = cv2.resize(frame, (new_w, new_h))
- return frame
- def next_frame(self):
- if self.is_udp_jpeg:
- frame = self.udp_receiver.get_image(timeout=0.1)
- if frame is None:
- return None, False
- return self._resize_if_needed(frame), True
- if self.cap is not None:
- if self.is_ip_camera:
- for _ in range(2):
- if not self.cap.grab():
- break
- ret, frame = self.cap.read()
- if not ret:
- return None, False
- frame = self._resize_if_needed(frame)
- for _ in range(self.skip):
- ret, _ = self.cap.read()
- if not ret:
- break
- return frame, True
- if self.i >= self.max_length:
- return None, False
- image_file = self.listing[self.i]
- frame = cv2.imread(str(image_file), cv2.IMREAD_COLOR)
- if frame is None:
- print(f"Failed to load image: {image_file}")
- return None, False
- self.i += 1
- return self._resize_if_needed(frame), True
- def cleanup(self):
- if self.is_udp_jpeg and self.udp_receiver is not None:
- self.udp_receiver.stop()
- if self.cap is not None:
- self.cap.release()
- class AsyncVideoStreamer:
- """Background frame reader that keeps only the latest frame."""
- def __init__(self, streamer: VideoStreamer, queue_size: int = 1, timeout: float = 1.0):
- self.streamer = streamer
- self.queue: "queue.Queue[np.ndarray]" = queue.Queue(maxsize=max(queue_size, 1))
- self.timeout = timeout
- self._stop_requested = False
- self._has_error = False
- self._thread = threading.Thread(target=self._reader, name="AsyncVideoStreamer", daemon=True)
- self._thread.start()
- def _reader(self):
- try:
- while not self._stop_requested:
- frame, ret = self.streamer.next_frame()
- if not ret:
- if hasattr(self.streamer, "is_udp_jpeg") and self.streamer.is_udp_jpeg:
- time.sleep(0.01)
- continue
- self._stop_requested = True
- break
- if self.queue.full():
- try:
- self.queue.get_nowait()
- except queue.Empty:
- pass
- self.queue.put(frame)
- except Exception as exc: # pylint: disable=broad-except
- self._has_error = True
- print(f"[AsyncVideoStreamer] Reader thread error: {exc}")
- finally:
- self._stop_requested = True
- def read(self):
- if self._has_error:
- return None, False
- try:
- frame = self.queue.get(timeout=self.timeout)
- return frame, True
- except queue.Empty:
- return None, False
- def stop(self):
- self._stop_requested = True
- if self._thread.is_alive():
- self._thread.join(timeout=1.0)
- self.streamer.cleanup()
- def maybe_resize(input_frame, resize_opt):
- if len(resize_opt) == 2:
- return cv2.resize(input_frame, tuple(resize_opt))
- if len(resize_opt) == 1 and resize_opt[0] > 0:
- h, w = input_frame.shape[:2]
- scale = resize_opt[0] / max(h, w)
- return cv2.resize(input_frame, (int(w * scale), int(h * scale)))
- return input_frame
- def apply_orientation(frame, opt):
- if opt.rotate == 90:
- frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
- elif opt.rotate == 180:
- frame = cv2.rotate(frame, cv2.ROTATE_180)
- elif opt.rotate == 270:
- frame = cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE)
- if opt.flip_horizontal:
- frame = cv2.flip(frame, 1)
- if opt.flip_vertical:
- frame = cv2.flip(frame, 0)
- return frame
- def draw_camera_position_on_reference(
- reference_frame_bgr,
- camera_center_ref,
- has_valid_pose,
- num_matches=0,
- min_matches=30,
- inliers_ratio=0.0,
- trail_points=None,
- trail_thickness=2,
- status_text="",
- ):
- h_ref, w_ref = reference_frame_bgr.shape[:2]
- ref_colored = reference_frame_bgr.copy()
- center_ref = (w_ref // 2, h_ref // 2)
- cv2.circle(ref_colored, center_ref, 12, (0, 255, 0), 2)
- cv2.line(ref_colored, (center_ref[0] - 15, center_ref[1]), (center_ref[0] + 15, center_ref[1]), (0, 255, 0), 2)
- cv2.line(ref_colored, (center_ref[0], center_ref[1] - 15), (center_ref[0], center_ref[1] + 15), (0, 255, 0), 2)
- if (not has_valid_pose) or camera_center_ref is None or num_matches < min_matches:
- if not status_text:
- status_text = f"Pose unavailable. matches={num_matches}, inliers={inliers_ratio:.1%}"
- cv2.putText(ref_colored, status_text, (10, 30),
- cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
- return ref_colored
- cxy = (int(camera_center_ref[0]), int(camera_center_ref[1]))
- if trail_points is not None and len(trail_points) >= 2:
- for idx in range(1, len(trail_points)):
- p0 = (int(trail_points[idx - 1][0]), int(trail_points[idx - 1][1]))
- p1 = (int(trail_points[idx][0]), int(trail_points[idx][1]))
- cv2.line(ref_colored, p0, p1, (255, 255, 0), trail_thickness, cv2.LINE_AA)
- cv2.circle(ref_colored, cxy, 12, (0, 0, 255), 2)
- cv2.line(ref_colored, (cxy[0] - 15, cxy[1]), (cxy[0] + 15, cxy[1]), (0, 0, 255), 2)
- cv2.line(ref_colored, (cxy[0], cxy[1] - 15), (cxy[0], cxy[1] + 15), (0, 0, 255), 2)
- cv2.line(ref_colored, center_ref, cxy, (255, 0, 255), 2)
- cv2.putText(ref_colored, f"Matches: {num_matches}", (10, 30),
- cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
- cv2.putText(ref_colored, f"Inliers: {inliers_ratio:.1%}", (10, 60),
- cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
- return ref_colored
- def frame_bgr_to_pil_rgb(frame_bgr):
- return Image.fromarray(cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB))
- def parse_args():
- parser = argparse.ArgumentParser(
- description="RoMa demo (asynchronous capture)",
- formatter_class=argparse.ArgumentDefaultsHelpFormatter,
- )
- parser.add_argument("--input", type=str, default="0", help="USB webcam index, IP camera URL, UDP stream (udp://host:port), or video path")
- parser.add_argument("--reference_image", type=str, default=None, help="Optional reference image path")
- parser.add_argument("--output_dir", type=str, default=None, help="Directory to save visualisations")
- parser.add_argument("--image_glob", type=str, nargs="+", default=["*.png", "*.jpg", "*.jpeg"], help="Glob for image sequences")
- parser.add_argument("--skip", type=int, default=0, help="Number of frames to skip between reads")
- parser.add_argument("--max_length", type=int, default=1_000_000, help="Maximum frames")
- parser.add_argument(
- "--resize",
- type=int,
- nargs="+",
- default=[640, 480],
- help="Resize input image. Two numbers = width height, one number = max dimension, -1 = no resize",
- )
- parser.add_argument("--model", type=str, default="tiny", choices=["tiny", "roma"], help="Model variant")
- parser.add_argument("--sample_num", type=int, default=2500, help="Number of sampled correspondences")
- parser.add_argument("--sample_thresh", type=float, default=0.05, help="Sampling certainty threshold")
- parser.add_argument("--ransac_reproj_threshold", type=float, default=3.0, help="RANSAC reprojection threshold in pixels")
- parser.add_argument("--min_matches", type=int, default=30, help="Minimum matches required to compute homography")
- parser.add_argument("--min_inlier_ratio", type=float, default=0.25, help="Minimum inlier ratio to accept homography")
- parser.add_argument("--smooth_alpha", type=float, default=0.8, help="EMA smoothing factor for camera position")
- parser.add_argument("--roma_interval", type=int, default=10, help="Run RoMa every N frames (higher N = faster)")
- parser.add_argument("--trail_len", type=int, default=80, help="Max number of points in trajectory")
- parser.add_argument("--trail_thickness", type=int, default=2, help="Trajectory line thickness")
- parser.add_argument("--queue_size", type=int, default=1, help="Frame queue size for async reader")
- parser.add_argument("--read_timeout", type=float, default=1.0, help="Seconds to wait for a frame from async reader")
- parser.add_argument("--flip_horizontal", action="store_true", help="Flip frames horizontally")
- parser.add_argument("--flip_vertical", action="store_true", help="Flip frames vertically")
- parser.add_argument("--rotate", type=int, default=0, choices=[0, 90, 180, 270], help="Rotate frames clockwise")
- parser.add_argument("--show_fps", action="store_true", help="Render FPS overlay")
- parser.add_argument("--max_fps", type=float, default=90.0, help="Cap processing FPS (<=0 disables cap)")
- parser.add_argument(
- "--timer_print_interval",
- type=float,
- default=0.5,
- help="Seconds between console timer prints (0 = every frame).",
- )
- parser.add_argument(
- "--max_display_fps",
- type=float,
- default=0.0,
- help="Cap rendering/display FPS only (0 = unlimited).",
- )
- parser.add_argument(
- "--idle_sleep_ms",
- type=float,
- default=2.0,
- help="Sleep time in milliseconds when a frame is skipped for rendering (<=0 disables).",
- )
- parser.add_argument("--force_cpu", action="store_true", help="Run inference on CPU even if CUDA is available")
- parser.add_argument("--no_ip_grab", action="store_true", help="Disable extra grab calls for IP cameras")
- parser.add_argument("--no_display", action="store_true", help="Disable OpenCV window")
- parser.add_argument("--no_ui", action="store_true", help="Suppress console output (UI embedding)")
- return parser.parse_args()
- def main():
- opt = parse_args()
- if len(opt.resize) == 2 and opt.resize[1] == -1:
- opt.resize = opt.resize[0:1]
- if len(opt.resize) == 2:
- print(f"Will resize to {opt.resize[0]}x{opt.resize[1]} (WxH)")
- elif len(opt.resize) == 1 and opt.resize[0] > 0:
- print(f"Will resize max dimension to {opt.resize[0]}")
- elif len(opt.resize) == 1:
- print("Will not resize images")
- else:
- raise ValueError("Cannot specify more than two integers for --resize")
- if opt.no_ui:
- sys.stdout = open(os.devnull, "w")
- sys.stderr = open(os.devnull, "w")
- device = "cuda" if torch.cuda.is_available() and not opt.force_cpu else "cpu"
- print(f'Running inference on device "{device}"')
- if opt.model == "tiny":
- roma_model = tiny_roma_v1_outdoor(device=torch.device(device))
- else:
- roma_model = roma_outdoor(device=torch.device(device))
- roma_model.sample_thresh = opt.sample_thresh
- if opt.reference_image is not None:
- print(f"==> Loading reference image: {opt.reference_image}")
- ref_frame = cv2.imread(opt.reference_image, cv2.IMREAD_COLOR)
- if ref_frame is None:
- raise IOError(f"Cannot load reference image: {opt.reference_image}")
- ref_frame = maybe_resize(ref_frame, opt.resize)
- ref_frame = apply_orientation(ref_frame, opt)
- else:
- ref_frame = None
- streamer = VideoStreamer(opt.input, opt.resize, opt.skip, opt.image_glob, opt.max_length)
- if hasattr(streamer, "is_udp_jpeg") and streamer.is_udp_jpeg:
- print("UDP JPEG mode: receiver started in background thread")
- elif hasattr(streamer, "cap") and streamer.cap is not None and opt.no_ip_grab and hasattr(streamer, "is_ip_camera"):
- streamer.is_ip_camera = False
- print("IP camera buffer flush disabled (no extra grab calls).")
- async_streamer = AsyncVideoStreamer(streamer, queue_size=opt.queue_size, timeout=opt.read_timeout)
- if ref_frame is None:
- first_frame, ret = async_streamer.read()
- if not ret:
- raise RuntimeError("Error when reading first frame.")
- ref_frame = apply_orientation(first_frame, opt)
- print("Using first frame as reference")
- if opt.output_dir is not None:
- Path(opt.output_dir).mkdir(exist_ok=True)
- print(f"==> Will write outputs to {opt.output_dir}")
- window_name_ref = "Camera Position in Reference"
- window_name_cam = "Live Camera"
- if not opt.no_display:
- try:
- cv2.namedWindow(window_name_ref, cv2.WINDOW_NORMAL)
- cv2.resizeWindow(window_name_ref, 640, 480)
- cv2.namedWindow(window_name_cam, cv2.WINDOW_NORMAL)
- cv2.resizeWindow(window_name_cam, 640, 480)
- except cv2.error as err:
- print(f"Warning: Could not create OpenCV windows: {err}")
- opt.no_display = True
- print("==> Keyboard control:\n\tn: set current frame as reference\n\tq: quit\n\tf: toggle FPS overlay\n")
- timer = AverageTimer(newline=True)
- show_fps = opt.show_fps
- fps_display = 0.0
- last_fps_print_time = time.time()
- fps_print_interval = 2.0
- last_timer_print_time = time.time()
- last_display_time = time.perf_counter()
- frame_id = 0
- # Global / fast-tracker state
- last_good_H_slow = None # from RoMa
- H_ref2cur_fast = None # fused pose used for drawing
- prev_gray = None
- prev_kpts = None
- orb = cv2.ORB_create(nfeatures=800)
- last_camera_center_ref = None
- trail_points = []
- try:
- while True:
- loop_start_time = time.time()
- frame_id += 1
- frame, ret = async_streamer.read()
- if not ret:
- if hasattr(streamer, "is_udp_jpeg") and streamer.is_udp_jpeg:
- continue
- print("Stream ended or timeout exceeded.")
- break
- frame = apply_orientation(frame, opt)
- timer.update("data")
- h_ref, w_ref = ref_frame.shape[:2]
- h_cur, w_cur = frame.shape[:2]
- H_ref_to_cur = None
- inliers_ratio = 0.0
- num_matches = 0
- status_text = ""
- # -------- Slow level: RoMa global alignment every roma_interval frames --------
- run_roma_this_frame = (frame_id % opt.roma_interval == 0) or (last_good_H_slow is None)
- if run_roma_this_frame:
- ref_pil = frame_bgr_to_pil_rgb(ref_frame)
- cur_pil = frame_bgr_to_pil_rgb(frame)
- if opt.model == "tiny":
- warp, certainty = roma_model.match(ref_pil, cur_pil)
- else:
- warp, certainty = roma_model.match(ref_pil, cur_pil, device=torch.device(device))
- matches, _certainty_samples = roma_model.sample(warp, certainty, num=opt.sample_num)
- k_ref, k_cur = roma_model.to_pixel_coordinates(matches, h_ref, w_ref, h_cur, w_cur)
- pts_ref = k_ref.detach().cpu().numpy().astype(np.float32)
- pts_cur = k_cur.detach().cpu().numpy().astype(np.float32)
- num_matches = len(pts_ref)
- if num_matches >= opt.min_matches:
- H_tmp, mask = cv2.findHomography(
- pts_ref, pts_cur, cv2.RANSAC, opt.ransac_reproj_threshold
- )
- if H_tmp is not None and mask is not None:
- inliers_ratio = float(mask.mean())
- if inliers_ratio >= opt.min_inlier_ratio:
- last_good_H_slow = H_tmp
- H_ref2cur_fast = H_tmp.copy()
- status_text = f"RoMa pose. matches={num_matches}, inliers={inliers_ratio:.1%}"
- else:
- status_text = (
- f"RoMa low inliers: {inliers_ratio:.1%} < {opt.min_inlier_ratio:.1%}"
- )
- else:
- status_text = "RoMa homography failed."
- else:
- status_text = f"RoMa insufficient matches: {num_matches}/{opt.min_matches}"
- # reset fast tracker state on each RoMa update
- prev_gray = None
- prev_kpts = None
- timer.update("forward")
- else:
- # -------- Fast level: ORB + LK between consecutive frames --------
- frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
- if prev_gray is None:
- prev_gray = frame_gray
- prev_kpts = None
- if prev_kpts is None or len(prev_kpts) < 100:
- kpts = orb.detect(prev_gray, None)
- kpts = sorted(kpts, key=lambda k: -k.response)[:500]
- prev_kpts = np.array([k.pt for k in kpts], dtype=np.float32) if kpts else None
- if prev_kpts is not None and len(prev_kpts) >= 20:
- next_pts, status, _err = cv2.calcOpticalFlowPyrLK(
- prev_gray, frame_gray, prev_kpts, None,
- winSize=(21, 21), maxLevel=3,
- criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01),
- )
- st = status.squeeze()
- good_prev = prev_kpts[st == 1]
- good_next = next_pts[st == 1]
- num_matches = len(good_prev)
- if num_matches >= 20:
- H_prev2cur_fast, mask = cv2.findHomography(
- good_prev, good_next, cv2.RANSAC, 3.0
- )
- if H_prev2cur_fast is not None and H_ref2cur_fast is not None:
- inliers_ratio = float(mask.mean()) if mask is not None else 0.0
- H_ref2cur_fast = H_prev2cur_fast @ H_ref2cur_fast
- status_text = f"Fast pose. matches={num_matches}, inliers={inliers_ratio:.1%}"
- else:
- status_text = "Fast homography failed."
- prev_kpts = good_next.reshape(-1, 2)
- prev_gray = frame_gray
- else:
- status_text = "Fast tracker: not enough keypoints."
- timer.update("forward")
- camera_center_current = (w_cur // 2, h_cur // 2)
- camera_center_ref = None
- has_valid_pose = False
- H_for_pose = H_ref2cur_fast if H_ref2cur_fast is not None else last_good_H_slow
- if H_for_pose is not None and num_matches >= opt.min_matches:
- try:
- H_cur_to_ref = np.linalg.inv(H_for_pose)
- center_cur = np.array([[camera_center_current]], dtype=np.float32)
- center_ref_now = cv2.perspectiveTransform(center_cur, H_cur_to_ref)[0, 0]
- center_ref_now = np.clip(center_ref_now, [0, 0], [w_ref - 1, h_ref - 1])
- if last_camera_center_ref is None:
- camera_center_ref = center_ref_now
- else:
- camera_center_ref = (
- opt.smooth_alpha * last_camera_center_ref
- + (1.0 - opt.smooth_alpha) * center_ref_now
- )
- last_camera_center_ref = camera_center_ref
- has_valid_pose = True
- trail_points.append(camera_center_ref.copy())
- if len(trail_points) > opt.trail_len:
- trail_points = trail_points[-opt.trail_len :]
- except np.linalg.LinAlgError:
- has_valid_pose = False
- should_render = True
- if opt.max_display_fps and opt.max_display_fps > 0:
- display_dt = 1.0 / float(opt.max_display_fps)
- if (time.perf_counter() - last_display_time) < display_dt:
- should_render = False
- if should_render:
- ref_view = draw_camera_position_on_reference(
- ref_frame,
- camera_center_ref,
- has_valid_pose,
- num_matches=num_matches,
- min_matches=opt.min_matches,
- inliers_ratio=inliers_ratio,
- trail_points=trail_points,
- trail_thickness=opt.trail_thickness,
- status_text=status_text,
- )
- if show_fps:
- cv2.putText(ref_view, f"FPS: {fps_display:.1f}", (10, 90),
- cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
- if not opt.no_display:
- cv2.imshow(window_name_ref, ref_view)
- # Show raw incoming frame for debugging / monitoring.
- # (frame is already oriented according to --rotate/flip)
- cv2.imshow(window_name_cam, frame)
- key = cv2.waitKey(1) & 0xFF
- else:
- key = 0
- last_display_time = time.perf_counter()
- else:
- if opt.idle_sleep_ms > 0:
- time.sleep(opt.idle_sleep_ms / 1000.0)
- key = 0
- now = time.time()
- if opt.timer_print_interval <= 0 or (now - last_timer_print_time) >= opt.timer_print_interval:
- timer.print("RoMa-Async")
- last_timer_print_time = now
- if key == ord("q"):
- print("Exiting via keyboard (q)")
- break
- if key == ord("n"):
- ref_frame = frame.copy()
- last_good_H_slow = None
- H_ref2cur_fast = None
- last_camera_center_ref = None
- trail_points = []
- print("Updated reference frame")
- elif key == ord("f"):
- show_fps = not show_fps
- if opt.output_dir is not None and should_render:
- out_file = Path(opt.output_dir, f"camera_pos_{frame_id:06d}.png")
- cv2.imwrite(str(out_file), ref_view)
- frame_id += 1
- loop_elapsed = time.time() - loop_start_time
- if opt.max_fps > 0:
- target_dt = 1.0 / opt.max_fps
- if loop_elapsed < target_dt:
- time.sleep(target_dt - loop_elapsed)
- loop_elapsed = target_dt
- dt = max(loop_elapsed, 1e-6)
- fps_display = 0.9 * fps_display + 0.1 * (1.0 / dt)
- now = time.time()
- if now - last_fps_print_time >= fps_print_interval:
- print(f"[FPS] {fps_display:.1f} FPS")
- last_fps_print_time = now
- finally:
- async_streamer.stop()
- try:
- cv2.destroyAllWindows()
- except Exception:
- pass
- if __name__ == "__main__":
- main()
|