#!/usr/bin/env python3
"""Locate text inside an image with the repository's bundled ``python/RapidOCR``.

``ocr_find_text_center``
    Finds the single OCR line whose text fully contains the target substring
    and returns the centre of its box.  This is the rule the CLI uses.

``ocr_find_text_center_allowing_note_title_wrapped_across_two_ocr_lines``
    Note-title anchor.  Only the **first logical line** of the user string
    (everything before the first newline) is used; later lines take no part in
    matching or geometry.  That line is NFKC-normalised and characters that
    are hard to OCR are cut out, leaving one or more recognisable segments.
    The image is OCR'd **once**; each segment is located via the
    smallest-area matching box, and when there are several segments the
    midpoint between the first and the last segment centre is returned.

The module is independent of Playwright / the browser DOM: it only takes an
image path, image bytes, or an OpenCV BGR array.

Usage::

    python workplace/ocr-pos.py <target text> <image path>

On success a single line ``cx cy`` (integers) is written to stdout.

Tunable module-level constants: ``OCR_RAPIDOCR_GLOBAL_MAX_SIDE_LEN``,
``OCR_RAPIDOCR_INFERENCE_ENGINE_NAME`` and
``OCR_RAPIDOCR_ONNX_RUNTIME_USE_CUDA`` (the latter is only applied for the
``onnxruntime`` engine, as ``EngineConfig.onnxruntime.use_cuda``).
Optional inference dependencies are listed in
``python/RapidOCR/requirements-inference-*.txt``.
"""

from __future__ import annotations

import itertools
import sys
import unicodedata
from pathlib import Path
from typing import Any

import numpy as np

_REPO_ROOT = Path(__file__).resolve().parent.parent
_RAPIDOCR_PKG_ROOT = _REPO_ROOT / "python" / "RapidOCR" / "python"
# Make the vendored RapidOCR package importable ahead of any installed copy.
if str(_RAPIDOCR_PKG_ROOT) not in sys.path:
    sys.path.insert(0, str(_RAPIDOCR_PKG_ROOT))

REPO_ROOT: Path = _REPO_ROOT

# Value passed to RapidOCR as ``Global.max_side_len``.  1280 or 960 is faster
# and is usually enough for top-bar OCR; too small a value tends to lose
# small text.
OCR_RAPIDOCR_GLOBAL_MAX_SIDE_LEN: int = 2000
# "onnxruntime" matches the requirements file.  Alternatives: "openvino"
# (often faster on Intel CPUs) or "tensorrt" (needs an NVIDIA environment).
# Any other value falls back to onnxruntime.
OCR_RAPIDOCR_INFERENCE_ENGINE_NAME: str = "onnxruntime"
# Set to True when onnxruntime-gpu is installed and the GPU should be used.
OCR_RAPIDOCR_ONNX_RUNTIME_USE_CUDA: bool = False

# RapidOCR singleton cache -- do not edit.  Only the three constants above are
# meant to be switched; the process must be restarted afterwards for the
# engine to be rebuilt.
_ocr_engine_singleton = None

# Full-width punctuation -> ASCII counterpart.  Written as escapes so that the
# two sides can never be collapsed into the same character by an editor or a
# text-normalising tool.
_FULLWIDTH_TO_HALFWIDTH_PUNCTUATION_TABLE = str.maketrans(
    {
        "\uff0c": ",",
        "\uff1f": "?",
        "\uff08": "(",
        "\uff09": ")",
        "\uff1a": ":",
        "\uff1b": ";",
        "\uff01": "!",
    },
)


def _build_rapid_ocr_params_dictionary() -> dict[str, Any]:
    """Build the ``params`` dict for ``RapidOCR`` from the module constants.

    The det/cls/rec stages all use the same engine.  ``use_cuda`` is only
    added when the onnxruntime engine is selected (which is also the fallback
    for unknown engine names).
    """
    from rapidocr.utils.typings import EngineType

    engine_name = OCR_RAPIDOCR_INFERENCE_ENGINE_NAME.strip().lower()
    non_default_engine_types = {
        "openvino": EngineType.OPENVINO,
        "tensorrt": EngineType.TENSORRT,
    }
    uses_onnxruntime = engine_name not in non_default_engine_types
    engine_type = non_default_engine_types.get(engine_name, EngineType.ONNXRUNTIME)

    rapid_ocr_params: dict[str, Any] = {
        "Global.max_side_len": int(OCR_RAPIDOCR_GLOBAL_MAX_SIDE_LEN),
        "Global.use_cls": False,
    }
    for stage_name in ("Det", "Cls", "Rec"):
        rapid_ocr_params[f"{stage_name}.engine_type"] = engine_type
    if uses_onnxruntime:
        rapid_ocr_params["EngineConfig.onnxruntime.use_cuda"] = bool(
            OCR_RAPIDOCR_ONNX_RUNTIME_USE_CUDA
        )
    return rapid_ocr_params


def _get_rapid_ocr():
    """Return the process-wide ``RapidOCR`` engine, creating it on first use."""
    global _ocr_engine_singleton
    if _ocr_engine_singleton is None:
        from rapidocr import RapidOCR

        _ocr_engine_singleton = RapidOCR(
            params=_build_rapid_ocr_params_dictionary(),
        )
    return _ocr_engine_singleton


def _rapidocr_input_from_path_bytes_or_bgr_numpy(
    image: str | Path | bytes | np.ndarray,
):
    """Return ``image`` in the form handed to the OCR engine.

    Arrays are treated as OpenCV BGR images and converted to RGB channel
    order; paths and bytes are passed through untouched.
    """
    if isinstance(image, np.ndarray):
        # Imported lazily, like rapidocr: path/bytes callers (including the
        # CLI) never need OpenCV.
        import cv2

        return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    return image


def _run_rapid_ocr_or_none(image: str | Path | bytes | np.ndarray) -> Any:
    """Run OCR once on ``image``; return ``None`` when there is no usable result."""
    result = _get_rapid_ocr()(_rapidocr_input_from_path_bytes_or_bgr_numpy(image))
    if result is None or not result.txts or result.boxes is None:
        return None
    return result


def _quad_center_xy(box: Any) -> tuple[float, float]:
    """Return the mean of the corner points of ``box`` (four ``(x, y)`` corners)."""
    xs = [float(point[0]) for point in box]
    ys = [float(point[1]) for point in box]
    return sum(xs) / len(xs), sum(ys) / len(ys)


def _axis_aligned_bounding_box_area(box: Any) -> float:
    """Return the area of the axis-aligned rectangle enclosing ``box``'s corners."""
    xs = [float(point[0]) for point in box]
    ys = [float(point[1]) for point in box]
    return (max(xs) - min(xs)) * (max(ys) - min(ys))


def _round_center_to_pixel(center_xy: tuple[float, float]) -> tuple[int, int]:
    """Round a float ``(x, y)`` centre to integer pixel coordinates."""
    return int(round(center_xy[0])), int(round(center_xy[1]))


def _normalize_ocr_match_string(s: str) -> str:
    """Normalise text so metadata strings and RapidOCR output can be compared.

    Leading/trailing and inner whitespace is removed, the full-width
    punctuation ``, ? ( ) : ; !`` is mapped to its ASCII form, and the result
    is case-folded.  ``None`` or an empty string yields ``""``.
    """
    without_whitespace = "".join((s or "").strip().split())
    return without_whitespace.translate(
        _FULLWIDTH_TO_HALFWIDTH_PUNCTUATION_TABLE,
    ).casefold()


def _ocr_line_contains_full_target(ocr_line: str, target_text: str) -> bool:
    """Return True only if the whole ``target_text`` occurs contiguously in the line.

    The reverse direction (the recognised string being a mere fragment of the
    target) is deliberately not accepted, so that the returned centre never
    lands on an incomplete chunk of characters.  Both sides are normalised
    first; an empty target never matches.
    """
    target = _normalize_ocr_match_string(target_text)
    if not target:
        return False
    return target in _normalize_ocr_match_string(ocr_line)


def _ocr_dom_anchor_line_matches_normalized_text(
    ocr_line: str,
    anchor_norm: str,
) -> bool:
    """Match an OCR line against an already-normalised anchor.

    Same rule as ``clear-input``: anchors of two or more characters match as
    a substring, a single-character anchor must equal the whole normalised
    line.  An empty anchor never matches.
    """
    if not anchor_norm:
        return False
    line_norm = _normalize_ocr_match_string(ocr_line)
    if len(anchor_norm) >= 2:
        return anchor_norm in line_norm
    return line_norm == anchor_norm


def ocr_smallest_line_center_for_dom_text(
    image: str | Path | bytes | np.ndarray,
    anchor_dom_text: str,
) -> tuple[int, int] | None:
    """Return the centre of the smallest OCR box whose text matches the anchor.

    Lines are matched with ``_ocr_dom_anchor_line_matches_normalized_text``.
    When several lines match, the one whose bounding box has the smallest
    area wins (the first such line on ties).

    Returns:
        ``(cx, cy)`` in image pixels, or ``None`` when the anchor is empty
        after normalisation, OCR yields nothing, or no line matches.
    """
    anchor_norm = _normalize_ocr_match_string(anchor_dom_text)
    if not anchor_norm:
        return None
    result = _run_rapid_ocr_or_none(image)
    if result is None:
        return None

    matching_line_indices = [
        line_index
        for line_index, line_text in enumerate(result.txts)
        if _ocr_dom_anchor_line_matches_normalized_text(line_text, anchor_norm)
    ]
    if not matching_line_indices:
        return None
    best_line_index = min(
        matching_line_indices,
        key=lambda line_index: _axis_aligned_bounding_box_area(
            result.boxes[line_index],
        ),
    )
    return _round_center_to_pixel(_quad_center_xy(result.boxes[best_line_index]))


# Non-ASCII punctuation accepted inside a recognisable title segment.  The
# ASCII members are never consulted (ASCII characters are decided before this
# set is looked at); they are kept so membership is unchanged.
# NOTE(review): the previous literal contained two adjacent double quotes,
# which silently concatenated two string literals, so no double-quote
# character was ever a member -- confirm whether curly quotes were intended.
_CJK_AND_COMMON_TITLE_PUNCTUATION_CHARACTER_FROZEN_SET = frozenset(
    ",。、:;?!''()《》·—…-—「」『』【】〈〉&%#@…—",
)


def _is_single_character_treated_as_recognizable_by_rapidocr_line_text_anchor(
    single_unicode_character: str,
) -> bool:
    """Return True if the character may stay inside an anchor segment.

    Accepted: ASCII alphanumerics and the ASCII characters `` \\t.-_/:&!?``;
    code points in 0x4E00-0x9FFF, 0x3400-0x4DBF and 0x20000-0x2CEAF; and the
    members of ``_CJK_AND_COMMON_TITLE_PUNCTUATION_CHARACTER_FROZEN_SET``.
    Anything that is not exactly one character is rejected.
    """
    if len(single_unicode_character) != 1:
        return False
    if single_unicode_character.isascii():
        return (
            single_unicode_character.isalnum()
            or single_unicode_character in " \t.-_/:&!?"
        )
    code_point = ord(single_unicode_character)
    if (
        0x4E00 <= code_point <= 0x9FFF
        or 0x3400 <= code_point <= 0x4DBF
        or 0x20000 <= code_point <= 0x2CEAF
    ):
        return True
    return (
        single_unicode_character
        in _CJK_AND_COMMON_TITLE_PUNCTUATION_CHARACTER_FROZEN_SET
    )


def _split_user_anchor_string_into_recognizable_text_segment_string_list(
    raw_anchor_string_from_user: str,
) -> list[str]:
    """NFKC-normalise the string and return its maximal recognisable runs.

    Characters rejected by
    ``_is_single_character_treated_as_recognizable_by_rapidocr_line_text_anchor``
    act as separators and are dropped.
    """
    nfkc_normalized = unicodedata.normalize(
        "NFKC",
        raw_anchor_string_from_user or "",
    )
    return [
        "".join(run_characters)
        for is_recognizable, run_characters in itertools.groupby(
            nfkc_normalized,
            key=_is_single_character_treated_as_recognizable_by_rapidocr_line_text_anchor,
        )
        if is_recognizable
    ]


def _split_anchor_raw_string_into_logical_line_then_recognizable_segment_nested_string_list(
    raw_anchor_string_from_user: str,
) -> list[list[str]]:
    """Split into logical lines, then each line into recognisable segments.

    ``\\r\\n`` and ``\\r`` count as newlines.  Lines that are blank or yield no
    recognisable segment are omitted from the result.
    """
    newline_normalized = (
        (raw_anchor_string_from_user or "").replace("\r\n", "\n").replace("\r", "\n")
    )
    segments_per_logical_line = (
        _split_user_anchor_string_into_recognizable_text_segment_string_list(
            logical_line.strip(),
        )
        for logical_line in newline_normalized.split("\n")
    )
    return [segments for segments in segments_per_logical_line if segments]


def _midpoint_float_xy_pair_from_first_and_last_center_in_ordered_float_pair_list(
    ordered_center_xy_float_pair_list: list[tuple[float, float]],
) -> tuple[float, float] | None:
    """Return the midpoint between the first and last point of the list.

    A single point is returned as is; an empty list gives ``None``.
    """
    if not ordered_center_xy_float_pair_list:
        return None
    first_x, first_y = ordered_center_xy_float_pair_list[0]
    if len(ordered_center_xy_float_pair_list) == 1:
        return first_x, first_y
    last_x, last_y = ordered_center_xy_float_pair_list[-1]
    return (first_x + last_x) / 2.0, (first_y + last_y) / 2.0


def _midpoint_float_xy_pair_from_closest_pair_of_points_in_float_pair_list(
    center_xy_float_pair_list: list[tuple[float, float]],
) -> tuple[float, float] | None:
    """Return the midpoint of the two points that are closest to each other.

    A single point is returned as is; an empty list gives ``None``.  On equal
    distances the earliest pair in list order wins.
    """
    if not center_xy_float_pair_list:
        return None
    if len(center_xy_float_pair_list) == 1:
        lone_x, lone_y = center_xy_float_pair_list[0]
        return lone_x, lone_y

    def squared_distance(
        pair: tuple[tuple[float, float], tuple[float, float]],
    ) -> float:
        (ax, ay), (bx, by) = pair
        return (ax - bx) * (ax - bx) + (ay - by) * (ay - by)

    # ``combinations`` yields pairs in the same order as a nested index loop
    # and ``min`` keeps the first minimum, so ties resolve to the earliest pair.
    (first_x, first_y), (second_x, second_y) = min(
        itertools.combinations(center_xy_float_pair_list, 2),
        key=squared_distance,
    )
    return (first_x + second_x) / 2.0, (first_y + second_y) / 2.0


def _ocr_smallest_area_line_index_where_normalized_line_contains_substring(
    ocr_result: Any,
    substring_normalized: str,
) -> int | None:
    """Return the index of the smallest-area OCR line containing the substring.

    ``substring_normalized`` must already be normalised; each OCR line is
    normalised before the containment test.  Returns ``None`` for an empty
    substring or when no line contains it.
    """
    if not substring_normalized:
        return None
    matching_line_indices = [
        line_index
        for line_index, line_text in enumerate(ocr_result.txts)
        if substring_normalized in _normalize_ocr_match_string(line_text)
    ]
    return min(
        matching_line_indices,
        key=lambda line_index: _axis_aligned_bounding_box_area(
            ocr_result.boxes[line_index],
        ),
        default=None,
    )


def _first_logical_line_only_from_multiline_anchor_string(raw_anchor_string: str) -> str:
    """Return only the text before the first newline, stripped.

    Used for list-page title anchors, where the second line onward is ignored.
    """
    newline_normalized = (
        (raw_anchor_string or "").replace("\r\n", "\n").replace("\r", "\n")
    )
    return newline_normalized.split("\n", 1)[0].strip()


def ocr_find_text_center_allowing_note_title_wrapped_across_two_ocr_lines(
    image: str | Path | bytes | np.ndarray,
    target_text: str,
) -> tuple[int, int] | None:
    """Locate a note title using only the first logical line of ``target_text``.

    The first line is split into recognisable segments.  OCR runs once; every
    segment must be contained in some OCR line, and for each segment the
    smallest-area matching line is used.  With several segments the result is
    the midpoint between the first and the last segment centre.

    Returns:
        ``(cx, cy)`` in image pixels, or ``None`` when the first line has no
        recognisable segment, OCR yields nothing, or any segment is missing
        from the OCR output.
    """
    first_logical_line = _first_logical_line_only_from_multiline_anchor_string(
        target_text,
    )
    segments_by_logical_line = (
        _split_anchor_raw_string_into_logical_line_then_recognizable_segment_nested_string_list(
            first_logical_line,
        )
    )
    if not segments_by_logical_line:
        return None
    result = _run_rapid_ocr_or_none(image)
    if result is None:
        return None

    logical_line_centers: list[tuple[float, float]] = []
    for segments_of_one_line in segments_by_logical_line:
        segment_centers: list[tuple[float, float]] = []
        for raw_segment in segments_of_one_line:
            normalized_segment = _normalize_ocr_match_string(raw_segment)
            if not normalized_segment:
                continue
            matched_line_index = (
                _ocr_smallest_area_line_index_where_normalized_line_contains_substring(
                    result,
                    normalized_segment,
                )
            )
            if matched_line_index is None:
                # Every segment is required; one miss invalidates the anchor.
                return None
            segment_centers.append(_quad_center_xy(result.boxes[matched_line_index]))
        line_center = (
            _midpoint_float_xy_pair_from_first_and_last_center_in_ordered_float_pair_list(
                segment_centers,
            )
        )
        if line_center is not None:
            logical_line_centers.append(line_center)

    final_center = _midpoint_float_xy_pair_from_closest_pair_of_points_in_float_pair_list(
        logical_line_centers,
    )
    if final_center is None:
        return None
    return _round_center_to_pixel(final_center)


def _best_matching_line_center_in_ocr_result(
    result: Any,
    target_text: str,
    *,
    prefer_highest_score: bool,
) -> tuple[int, int] | None:
    """Pick the best line for ``target_text`` from an existing OCR result.

    Candidates are the lines that fully contain the target.  They are ranked
    by: exact match of the normalised text first, then shorter normalised
    text, then score (highest first when ``prefer_highest_score`` is true,
    lowest first otherwise).  Remaining ties go to the earliest line.
    """
    target_norm = _normalize_ocr_match_string(target_text)
    if not target_norm:
        return None
    scores = result.scores
    score_sign = -1.0 if prefer_highest_score else 1.0

    ranked_candidates: list[tuple[tuple[int, int, float], int]] = []
    for line_index, line_text in enumerate(result.txts):
        if not _ocr_line_contains_full_target(line_text, target_text):
            continue
        line_norm = _normalize_ocr_match_string(line_text)
        # Explicit None/length checks instead of truthiness, so an array of
        # scores is handled as well as a tuple or list.
        has_score = scores is not None and line_index < len(scores)
        score = float(scores[line_index]) if has_score else 0.0
        exact_rank = 0 if line_norm == target_norm else 1
        ranked_candidates.append(
            ((exact_rank, len(line_norm), score_sign * score), line_index),
        )
    if not ranked_candidates:
        return None
    # ``min`` keeps the first minimum, matching a stable sort's first element.
    _, best_line_index = min(ranked_candidates, key=lambda candidate: candidate[0])
    return _round_center_to_pixel(_quad_center_xy(result.boxes[best_line_index]))


def ocr_find_text_center(
    image: str | Path | bytes | np.ndarray,
    target_text: str,
    *,
    prefer_highest_score: bool = True,
) -> tuple[int, int] | None:
    """Run RapidOCR on ``image`` and return the centre of the best matching line.

    Args:
        image: Path, ``Path``, encoded image bytes, or an OpenCV BGR array.
        target_text: Text that must appear, as one contiguous substring after
            normalisation, inside a single OCR line.
        prefer_highest_score: Tie-break direction on the recognition score.

    Returns:
        ``(cx, cy)`` of the chosen line's quadrilateral in image pixels, or
        ``None`` when the target is empty, OCR yields nothing, or no line
        contains the target.  Among several matches an exact match wins, then
        the shorter recognised text, then the score.
    """
    # Validate first: an empty target can never match, so skip the OCR pass.
    if not _normalize_ocr_match_string(target_text):
        return None
    result = _run_rapid_ocr_or_none(image)
    if result is None:
        return None
    return _best_matching_line_center_in_ocr_result(
        result,
        target_text,
        prefer_highest_score=prefer_highest_score,
    )


def ocr_find_text_center_by_shortening_prefix_of_anchor_until_match(
    image: str | Path | bytes | np.ndarray,
    anchor_text: str,
    *,
    minimum_raw_character_count_inclusive: int = 6,
    prefer_highest_score: bool = True,
) -> tuple[int, int] | None:
    """Match progressively shorter prefixes of ``anchor_text`` until one is found.

    Intended for when the full anchor does not match, e.g. OCR dropped the
    last character or two of a title.  The first prefix tried is the stripped
    anchor minus its final character; the last one has
    ``minimum_raw_character_count_inclusive`` characters.  Each prefix is
    matched with the ``ocr_find_text_center`` rules.  The image is OCR'd only
    once, and every prefix is matched against that single result.

    Returns:
        The centre for the longest matching prefix, or ``None`` when the
        anchor is not longer than the minimum, OCR yields nothing, or no
        prefix matches.
    """
    stripped_anchor_text = (anchor_text or "").strip()
    character_count = len(stripped_anchor_text)
    if character_count <= minimum_raw_character_count_inclusive:
        return None
    # An empty prefix can never match and a negative slice end is not a
    # prefix, so never go below one character.
    shortest_prefix_length = max(1, minimum_raw_character_count_inclusive)

    result = _run_rapid_ocr_or_none(image)
    if result is None:
        return None
    for prefix_length in range(character_count - 1, shortest_prefix_length - 1, -1):
        found_center = _best_matching_line_center_in_ocr_result(
            result,
            stripped_anchor_text[:prefix_length],
            prefer_highest_score=prefer_highest_score,
        )
        if found_center is not None:
            return found_center
    return None


def start(argv: list[str] | None = None) -> int:
    """CLI entry point: ``<target text> <image path>``.

    Args:
        argv: Arguments without the program name; ``sys.argv[1:]`` if ``None``.

    Returns:
        ``0`` after printing ``cx cy`` to stdout, ``1`` when the text was not
        found, ``2`` for missing arguments, an empty target or a missing file.
        Diagnostics go to stderr.
    """
    rest = argv if argv is not None else sys.argv[1:]
    if len(rest) < 2:
        print("用法: ocr-pos.py <目标文字> <图片路径>", file=sys.stderr)
        return 2
    target = (rest[0] or "").strip()
    image_path = Path(rest[1]).expanduser()
    if not target:
        print("目标文字不能为空。", file=sys.stderr)
        return 2
    if not image_path.is_file():
        print(f"图片不存在: {image_path}", file=sys.stderr)
        return 2
    pt = ocr_find_text_center(image_path, target)
    if pt is None:
        print(f"未在图中找到与「{target}」匹配的文字。", file=sys.stderr)
        return 1
    print(f"{pt[0]} {pt[1]}")
    return 0


def main() -> int:
    """Run the CLI with ``sys.argv`` and return its exit code."""
    return start()


__all__ = [
    "REPO_ROOT",
    "OCR_RAPIDOCR_GLOBAL_MAX_SIDE_LEN",
    "OCR_RAPIDOCR_INFERENCE_ENGINE_NAME",
    "OCR_RAPIDOCR_ONNX_RUNTIME_USE_CUDA",
    "ocr_find_text_center",
    "ocr_find_text_center_allowing_note_title_wrapped_across_two_ocr_lines",
    "ocr_find_text_center_by_shortening_prefix_of_anchor_until_match",
    "ocr_smallest_line_center_for_dom_text",
    "start",
    "main",
]


if __name__ == "__main__":
    raise SystemExit(start())