- #!/usr/bin/env python3
- """
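- Step one: check whether a browser is already listening on the CDP remote-debugging port (``cdp_endpoint_open`` plus ``/json/version``).
- If one is, only attach and do not launch a new Chrome; otherwise connect to or launch a debugging instance following the browser_core logic.
- If the page is already at HOME_URL, do **not** goto. By default a same-site, non-blank page is not navigated either (the current sub-path, such as a search results page, is kept); the main flow can pass ``same_site_preserves_current_page=False`` to always open ``home_url``; if ``skip_home_navigation_when_top_search_input_has_text=True`` is also set, the home page is **not** reloaded first when the top-bar search box already contains text, and emptying it is left to ``clear-input``. Navigation to ``home_url`` happens only for a blank page or (when allowed) a cross-site page.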
- """
- from __future__ import annotations
- import sys
- import urllib.request
- from collections.abc import Callable
- from pathlib import Path
- from typing import Any, TypeVar
- from urllib.parse import urlparse
- _ROOT = Path(__file__).resolve().parent.parent
- if str(_ROOT) not in sys.path:
-     sys.path.insert(0, str(_ROOT))
- from workplace.cdp_chrome_browser_core import cdp_endpoint_open  # noqa: E402
- from workplace.playwright import (  # noqa: E402
-     DEFAULT_CDP_URL,
-     _is_blank_url,
-     connect_browser_via_cdp,
-     pick_target_page,
-     sync_playwright,
- )
- _T = TypeVar("_T")
- _HOME_PAGE_DOCUMENT_LOAD_WAIT_TIMEOUT_MILLISECONDS = 20_000
- # Matches the top-bar search box in ``workplace/input-keyword/find_search_input.py`` (used to check whether it already holds text, so that a goto does not wipe it out first).
- _TOP_BAR_SEARCH_INPUT_SELECTOR = "#search-input"
- def _top_search_input_has_non_empty_text(page: Any) -> bool:
-     """Return True if the current page has the top-bar ``#search-input`` and its ``input_value`` is non-empty after stripping whitespace; otherwise False."""
-     try:
-         locator = page.locator(_TOP_BAR_SEARCH_INPUT_SELECTOR).first
-         if locator.count() == 0:
-             return False
-         raw = str(locator.input_value(timeout=3_000)).strip()
-         return bool(raw)
-     except Exception:
-         return False
- def _wait_current_page_document_load_completed_or_raise_error(page) -> None:
-     page.wait_for_load_state(
-         "load",
-         timeout=_HOME_PAGE_DOCUMENT_LOAD_WAIT_TIMEOUT_MILLISECONDS,
-     )
- def _path_key(path: str | None) -> str:
-     p = (path or "").strip() or "/"
-     p = p.rstrip("/")
-     return "" if p in ("", "/") else p
- def _maximize_chrome_main_window_and_bring_to_foreground_win32(page) -> None:
-     """
-     Windows: **maximize** the Chrome main window (fills the work area; not F11 full screen) and bring it to the foreground.
-     A minimized window is restored as well. Playwright's ``bring_to_front`` only acts at the tab level.
-     """
-     if sys.platform != "win32":
-         return
-     try:
-         tab_title = (page.title() or "").strip()
-     except Exception:
-         tab_title = ""
-     import ctypes
-     from ctypes import wintypes
-     user32 = ctypes.windll.user32
-     SW_MAXIMIZE = 3
-     candidates: list[tuple[int, str, int]] = []
-     @ctypes.WINFUNCTYPE(ctypes.c_bool, wintypes.HWND, wintypes.LPARAM)
-     def enum_proc(hwnd, _lparam):
-         if not user32.IsWindowVisible(hwnd):
-             return True
-         cls = ctypes.create_unicode_buffer(256)
-         user32.GetClassNameW(hwnd, cls, 256)
-         if cls.value != "Chrome_WidgetWin_1":
-             return True
-         ln = user32.GetWindowTextLengthW(hwnd)
-         if ln <= 0:
-             return True
-         buf = ctypes.create_unicode_buffer(ln + 1)
-         user32.GetWindowTextW(hwnd, buf, ln + 1)
-         title = buf.value
-         if not title:
-             return True
-         rect = wintypes.RECT()
-         if not user32.GetWindowRect(hwnd, ctypes.byref(rect)):
-             return True
-         w = max(0, rect.right - rect.left)
-         h = max(0, rect.bottom - rect.top)
-         if w * h < 80_000:
-             return True
-         candidates.append((int(hwnd), title, w * h))
-         return True
-     user32.EnumWindows(enum_proc, 0)
-     if not candidates:
-         return
-     def score(item: tuple[int, str, int]) -> int:
-         _hwnd, title, area = item
-         s = min(area // 500, 8000)
-         if tab_title:
-             if tab_title in title:
-                 s += 50_000
-             elif len(tab_title) >= 4 and tab_title[:12] in title:
-                 s += 25_000
-         if "Chrome" in title or "chrome" in title.lower():
-             s += 100
-         return s
-     hwnd = max(candidates, key=score)[0]
-     try:
-         user32.ShowWindow(hwnd, SW_MAXIMIZE)
-         user32.SetForegroundWindow(hwnd)
-     except Exception:
-         pass
- def _chrome_remote_debugging_http_endpoint_ready(cdp_browser_http_url: str) -> bool:
-     """
-     If Chrome already has remote debugging enabled at this address, ``GET …/json/version`` normally returns 200.
-     Supplements ``cdp_endpoint_open`` so that a false negative does not make ``auto_chrome`` launch a second instance.
-     """
-     raw = (cdp_browser_http_url or "").strip()
-     if not raw:
-         return False
-     parsed = urlparse(raw)
-     if not parsed.hostname:
-         return False
-     scheme = (parsed.scheme or "http").lower()
-     port = parsed.port
-     if port is None:
-         port = 443 if scheme == "https" else 80
-     version_probe_url = f"{scheme}://{parsed.hostname}:{port}/json/version"
-     try:
-         with urllib.request.urlopen(version_probe_url, timeout=2.0) as http_response:
-             return http_response.status == 200
-     except Exception:
-         return False
- def _same_page_as_home(current: str, home: str) -> bool:
-     h = (home or "").strip()
-     if not h:
-         return False
-     c = (current or "").strip()
-     if not c.startswith("http"):
-         return False
-     a = urlparse(c)
-     b = urlparse(h)
-     return (
-         a.scheme.lower() == b.scheme.lower()
-         and a.netloc.lower() == b.netloc.lower()
-         and _path_key(a.path) == _path_key(b.path)
-         and (a.query or "") == (b.query or "")
-     )
- def _same_site_as_home_not_blank(current: str, home: str) -> bool:
-     """Same host as ``home_url`` and the current page is not blank: the precondition for being a candidate to skip goto."""
-     h = (home or "").strip()
-     if not h.startswith("http"):
-         return False
-     c = (current or "").strip()
-     if not c.startswith("http") or _is_blank_url(c):
-         return False
-     return urlparse(c).netloc.lower() == urlparse(h).netloc.lower()
- def prepare_page_at_home_url(
-     playwright_inst,
-     home_url: str,
-     *,
-     cdp: str | None = None,
-     same_site_preserves_current_page: bool = True,
-     skip_home_navigation_when_top_search_input_has_text: bool = False,
- ):
-     """
-     Runs inside the caller's existing ``with sync_playwright() as p:`` block: attaches over CDP, picks a page, and ``goto``s ``home_url`` according to the rules.
-     ``same_site_preserves_current_page``: when ``True`` (the default), a page that is already on the Xiaohongshu site and not blank is **not** navigated, so existing search results and the like are not wiped out; the main-flow entry point should pass ``False`` to guarantee that the configured home URL is opened.
-     ``skip_home_navigation_when_top_search_input_has_text``: when ``True``, if the attached page is on the Xiaohongshu site and the top-bar ``#search-input`` already contains text, the home page is **not** ``goto``-ed first; the current page is kept, and ``clear-input`` in ``input-keyword`` empties the box before new input (for scenarios such as the second-round keyword in ``start.bat``).
-     Returns ``(exit code, page | None)``. On success ``page`` is still bound to the current browser context; do not hold on to it outside the ``with`` block.
-     """
-     home_url = (home_url or "").strip()
-     if not home_url.startswith("http"):
-         print("Invalid HOME_URL.", file=sys.stderr)
-         return 1, None
-     url = (cdp or "").strip() or DEFAULT_CDP_URL
-     debugging_endpoint_already_listening = cdp_endpoint_open(url) or (
-         _chrome_remote_debugging_http_endpoint_ready(url)
-     )
-     if debugging_endpoint_already_listening:
-         print("Browser already listening on the remote-debugging port; attaching only, not launching a new Chrome:", url, file=sys.stderr)
-     else:
-         print(
-             f"No listener detected on the debugging port; will connect and launch a debugging Chrome if needed: {url}",
-             file=sys.stderr,
-         )
-     try:
-         browser = connect_browser_via_cdp(
-             playwright_inst,
-             cdp=url,
-             auto_chrome=not debugging_endpoint_already_listening,
-         )
-     except Exception as e:
-         print("CDP connect failed:", e, file=sys.stderr)
-         return 1, None
-     page = pick_target_page(browser)
-     if not page:
-         print("No browser page available.", file=sys.stderr)
-         return 1, None
-     try:
-         page.bring_to_front()
-     except Exception:
-         pass
-     _maximize_chrome_main_window_and_bring_to_foreground_win32(page)
-     url_now = (page.url or "").strip()
-     if _is_blank_url(url_now):
-         print("Blank tab, opening HOME_URL:", home_url, file=sys.stderr)
-         page.goto(
-             home_url,
-             wait_until="load",
-             timeout=_HOME_PAGE_DOCUMENT_LOAD_WAIT_TIMEOUT_MILLISECONDS,
-         )
-         return 0, page
-     if _same_page_as_home(url_now, home_url):
-         print("Already at HOME_URL; skipping navigation.", file=sys.stderr)
-         _wait_current_page_document_load_completed_or_raise_error(page)
-         return 0, page
-     if (
-         same_site_preserves_current_page
-         and _same_site_as_home_not_blank(url_now, home_url)
-     ):
-         print("Same site already open; skipping navigation and reload:", url_now, file=sys.stderr)
-         _wait_current_page_document_load_completed_or_raise_error(page)
-         return 0, page
-     if (
-         skip_home_navigation_when_top_search_input_has_text
-         and _same_site_as_home_not_blank(url_now, home_url)
-         and _top_search_input_has_non_empty_text(page)
-     ):
-         print(
-             "Top-bar search box already has text; keeping the current page (no home reload first), clear-input will empty it afterwards.",
-             file=sys.stderr,
-         )
-         _wait_current_page_document_load_completed_or_raise_error(page)
-         return 0, page
-     print("Navigating to HOME_URL:", home_url, file=sys.stderr)
-     page.goto(
-         home_url,
-         wait_until="load",
-         timeout=_HOME_PAGE_DOCUMENT_LOAD_WAIT_TIMEOUT_MILLISECONDS,
-     )
-     return 0, page
- def prepare_page_at_home_url_then(
-     playwright_inst,
-     home_url: str,
-     then: Callable[[Any], _T],
-     *,
-     cdp: str | None = None,
-     same_site_preserves_current_page: bool = True,
-     skip_home_navigation_when_top_search_input_has_text: bool = False,
- ) -> _T:
-     """
-     After ``prepare_page_at_home_url`` succeeds, hand ``page`` to ``then``; on failure raise an error (no ``(code, page)`` is returned).
-     ``then`` has the signature ``(page) -> T``.
-     """
-     code, page = prepare_page_at_home_url(
-         playwright_inst,
-         home_url,
-         cdp=cdp,
-         same_site_preserves_current_page=same_site_preserves_current_page,
-         skip_home_navigation_when_top_search_input_has_text=skip_home_navigation_when_top_search_input_has_text,
-     )
-     if code != 0 or page is None:
-         raise RuntimeError(f"prepare_page_at_home_url failed: code={code}")
-     return then(page)
- def start(
-     playwright_inst,
-     home_url: str,
-     then: Callable[[Any], _T],
-     *,
-     cdp: str | None = None,
-     same_site_preserves_current_page: bool = True,
-     skip_home_navigation_when_top_search_input_has_text: bool = False,
- ) -> _T:
-     """
-     Attach over CDP; ``goto`` only when needed (see ``prepare_page_at_home_url``).
-     ``wait_for_load_state`` is not repeated here, to avoid waiting on the same page again or anything that feels like a reload.
-     ``then`` has the signature ``(page) -> T``; failures raise ``RuntimeError`` in the ``prepare`` stage.
-     """
-     def _after_browser_and_navigation_ready(page: Any) -> _T:
-         return then(page)
-     return prepare_page_at_home_url_then(
-         playwright_inst,
-         home_url,
-         _after_browser_and_navigation_ready,
-         cdp=cdp,
-         same_site_preserves_current_page=same_site_preserves_current_page,
-         skip_home_navigation_when_top_search_input_has_text=skip_home_navigation_when_top_search_input_has_text,
-     )
- def ensure_browser_on_home_url(
-     home_url: str,
-     *,
-     cdp: str | None = None,
-     same_site_preserves_current_page: bool = True,
-     skip_home_navigation_when_top_search_input_has_text: bool = False,
- ) -> tuple[int, str | None]:
-     """
-     Attach to Chrome over CDP and pick the target page.
-     - Blank page: goto home_url
-     - Already at HOME_URL: skip navigation
-     - Same site as home and not blank: skip navigation (including search results pages and the like; no forced return to the home page)
-     - Anything else (cross-site etc.): goto home_url
-     If the CDP port is already listening, the browser is treated as having debugging enabled: attach only and do **not** launch a second Chrome automatically;
-     if it is not listening, try ``connect_browser_via_cdp(..., auto_chrome=True)`` to start a debugging instance.
-     On success returns ``(0, html)``, where ``html`` is the current page's ``page.content()``; on failure returns ``(non-zero, None)``.
-     On Windows, the **Chrome main window is maximized** and brought to the foreground after attaching (convenient for later PyAutoGUI use and similar).
-     """
-     with sync_playwright() as p:
-         code, page = prepare_page_at_home_url(
-             p,
-             home_url,
-             cdp=cdp,
-             same_site_preserves_current_page=same_site_preserves_current_page,
-             skip_home_navigation_when_top_search_input_has_text=skip_home_navigation_when_top_search_input_has_text,
-         )
-         if code != 0 or page is None:
-             return code, None
-         try:
-             return 0, page.content()
-         except Exception:
-             return 1, None
|
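The navigation rules in `prepare_page_at_home_url` can be read as a pure decision over the current URL, the home URL, and the two flags. The sketch below restates that order of checks without a browser, which may help when reasoning about or unit-testing the rules. It is an illustration, not part of the module: `decide_navigation`, `is_blank`, and the `search_has_text` parameter are hypothetical names, and `is_blank` is only an assumed stand-in for `_is_blank_url`, whose real definition is not shown here.

```python
from urllib.parse import urlparse


def is_blank(url):
    # Assumed stand-in for the module's _is_blank_url (its real rules are not shown).
    return (url or "").strip().lower() in ("", "about:blank")


def path_key(path):
    # Treat "", "/" and a trailing slash as equivalent, as _path_key does.
    return (path or "").strip().rstrip("/")


def same_page(current, home):
    c, h = (current or "").strip(), (home or "").strip()
    if not h or not c.startswith("http"):
        return False
    a, b = urlparse(c), urlparse(h)
    return (
        a.scheme.lower() == b.scheme.lower()
        and a.netloc.lower() == b.netloc.lower()
        and path_key(a.path) == path_key(b.path)
        and (a.query or "") == (b.query or "")
    )


def same_site_not_blank(current, home):
    c, h = (current or "").strip(), (home or "").strip()
    if not h.startswith("http") or not c.startswith("http") or is_blank(c):
        return False
    return urlparse(c).netloc.lower() == urlparse(h).netloc.lower()


def decide_navigation(
    current,
    home,
    *,
    same_site_preserves_current_page=True,
    skip_home_navigation_when_top_search_input_has_text=False,
    search_has_text=False,  # hypothetical: result of the #search-input check
):
    """Return "goto" or "stay", checking the rules in the same order as the module."""
    if is_blank(current):
        return "goto"
    if same_page(current, home):
        return "stay"
    on_site = same_site_not_blank(current, home)
    if same_site_preserves_current_page and on_site:
        return "stay"
    if skip_home_navigation_when_top_search_input_has_text and on_site and search_has_text:
        return "stay"
    return "goto"
```

With a placeholder home URL such as `https://example.com/explore`, a same-site search page yields `"stay"` under the defaults, and `"goto"` once `same_site_preserves_current_page=False` is passed, unless the search-box flag is set and the box holds text.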