| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313 |
- #!/usr/bin/env python3
- """download-note 流水线共用:路径、封面高度、OCR 锚点清洗、详情页 DOM 提取 JS、轮播翻页、选详情页 Playwright Page。"""
- from __future__ import annotations
- import importlib.util
- from pathlib import Path
- from typing import Any
- from PIL import Image
- from playwright.sync_api import Page
# Absolute directory two levels above this file, i.e. the parent of the folder
# that contains this module. The name indicates it is used as the repository root.
REPOSITORY_ROOT_DIRECTORY = Path(__file__).resolve().parent.parent
def cover_image_height_px_from_note_output_sequence_folder(
    note_output_sequence_folder_path: Path,
) -> int:
    """Return the pixel height of the cover image stored in a note output folder.

    Files directly inside the folder whose name matches ``cover_image.*`` are
    collected and sorted; the first one is opened with Pillow and its height
    is returned.

    Args:
        note_output_sequence_folder_path: Folder expected to contain a
            ``cover_image.<extension>`` file.

    Returns:
        The image height in pixels.

    Raises:
        IndexError: If no file in the folder matches ``cover_image.*``.
    """
    candidate_cover_paths = list(note_output_sequence_folder_path.glob("cover_image.*"))
    candidate_cover_paths.sort()
    # Deliberately index rather than use min()/next(): an empty folder must
    # keep raising IndexError, as it always has.
    first_cover_path = candidate_cover_paths[0]
    with Image.open(first_cover_path) as opened_cover:
        height_px = int(opened_cover.height)
    return height_px
def half_cover_image_height_pixels_from_disk_cover_file_under_note_output_folder(
    note_output_sequence_folder_path: Path,
) -> float:
    """Return half of the on-disk cover image height, in pixels, as a float.

    Args:
        note_output_sequence_folder_path: Folder containing the
            ``cover_image.*`` file to measure.

    Returns:
        The cover height divided by ``2.0``.
    """
    full_height_px = cover_image_height_px_from_note_output_sequence_folder(
        note_output_sequence_folder_path,
    )
    return full_height_px / 2.0
# Non-ASCII punctuation that survives normalization. The fullwidth forms are
# spelled as escapes so they cannot be silently folded to their ASCII
# look-alikes: ASCII characters never reach this set (they are handled by the
# ASCII branch first), so ASCII entries here would be dead.
_OCR_ANCHOR_ALLOWED_CHINESE_PUNCTUATION = frozenset(
    # fullwidth comma, ideographic full stop, fullwidth exclamation mark,
    # fullwidth question mark, ideographic comma, fullwidth colon,
    # fullwidth semicolon, fullwidth left/right parenthesis
    "\uff0c\u3002\uff01\uff1f\u3001\uff1a\uff1b\uff08\uff09"
    "「」『』【】《》…—·"
    # curly double and single quotation marks
    "\u201c\u201d\u2018\u2019"
)

# ASCII characters kept in addition to ASCII letters and digits.
_OCR_ANCHOR_ALLOWED_ASCII_EXTRA_CHARACTERS = frozenset(" \t.-")

# Inclusive code point ranges of the CJK ideographs that are kept.
_OCR_ANCHOR_CJK_IDEOGRAPH_CODE_POINT_RANGES = (
    (0x4E00, 0x9FFF),
    (0x3400, 0x4DBF),
    (0x20000, 0x2CEAF),
)


def _is_character_kept_for_ocr_anchor(single_character: str) -> bool:
    """Return True when *single_character* belongs in an OCR anchor string."""
    if single_character.isascii():
        # ASCII: only letters, digits, space, tab, dot and hyphen survive.
        return (
            single_character.isalnum()
            or single_character in _OCR_ANCHOR_ALLOWED_ASCII_EXTRA_CHARACTERS
        )
    code_point = ord(single_character)
    if any(
        lower <= code_point <= upper
        for lower, upper in _OCR_ANCHOR_CJK_IDEOGRAPH_CODE_POINT_RANGES
    ):
        return True
    return single_character in _OCR_ANCHOR_ALLOWED_CHINESE_PUNCTUATION


def normalize_raw_note_card_metadata_string_to_plain_text_for_ocr_anchor(
    raw_string: str,
) -> str:
    """Strip a raw metadata string down to the characters usable as an OCR anchor.

    Kept characters:

    * ASCII letters and digits, plus space, tab, ``.`` and ``-``;
    * CJK ideographs in U+4E00-U+9FFF, U+3400-U+4DBF and U+20000-U+2CEAF;
    * a fixed set of Chinese punctuation, including the fullwidth forms of
      ``, ! ? : ; ( )``.

    Everything else (emoji, other ASCII punctuation, other scripts) is
    dropped. Leading and trailing whitespace is removed from the result.

    Args:
        raw_string: Text to clean.

    Returns:
        The filtered, stripped text; may be empty.
    """
    return "".join(
        single_character
        for single_character in raw_string
        if _is_character_kept_for_ocr_anchor(single_character)
    ).strip()
def clip_note_title_plain_text_for_feed_ocr_anchor(
    normalized_title_plain_text: str,
    *,
    maximum_anchor_character_count_inclusive: int = 11,
) -> str:
    """Clip an already-normalized note title for use as a feed-card OCR anchor.

    The title is stripped of surrounding whitespace and then cut to at most
    ``maximum_anchor_character_count_inclusive`` leading characters. Python
    strings are indexed by Unicode code point, so each Chinese character or
    punctuation mark counts as one.

    Args:
        normalized_title_plain_text: Normalized title text; a falsy value
            (``None`` or ``""``) yields an empty string.
        maximum_anchor_character_count_inclusive: Largest number of leading
            characters to keep.

    Returns:
        The stripped title, truncated to its leading characters if too long.
    """
    trimmed_title = (normalized_title_plain_text or "").strip()
    # A slice already returns the whole string when it is short enough (and an
    # empty string stays empty), so no explicit length checks are needed.
    return trimmed_title[:maximum_anchor_character_count_inclusive]
# Source text of a JavaScript arrow function that reads an opened note detail
# page and returns a plain object. It is not evaluated anywhere in this module;
# presumably callers pass it to Playwright's ``Page.evaluate`` (verify at the
# call sites).
#
# Returned object:
#   detailTitleText             trimmed innerText of ``#detail-title`` ("" if absent/blank)
#   detailBodyPlainText         trimmed innerText of ``#detail-desc`` ("" if absent)
#   detailBodyHashtagObjectList ``{display_text, relative_href}`` for every
#                               ``a.tag`` / ``a[id='hash-tag']`` inside ``#detail-desc``
#   detailBodyEmojiImageUrlList de-duplicated URLs of ``img.note-content-emoji``
#                               inside ``#detail-desc`` (``src`` first, then ``currentSrc``)
#   noteDetailImageUrlList      note image URLs, in DOM order
#
# Note image collection:
#   * Layout root: first match of ``#noteContainer``, ``.interaction-container``,
#     ``[class*='note-detail']``, else ``document.body``.
#   * Carousel scope inside that root: first match of ``.swiper``,
#     ``.note-slider``, ``[class*='swiper']``, else the root itself.
#   * If the scope has ``.swiper-slide`` elements that are not
#     ``swiper-slide-duplicate``: for each slide, take the first ``<img>`` whose
#     ``currentSrc`` (preferred) or ``src`` contains ``sns-webpic``.
#   * Otherwise: take every ``img[src*='sns-webpic']`` in the scope that is not
#     inside ``#detail-desc``, not under an element whose class contains
#     ``comment``, and not under ``.note-content-emoji``.
#   * URLs starting with ``//`` get an ``https:`` prefix; anything that does not
#     then start with ``http`` is discarded. URLs are de-duplicated both by the
#     full URL and by the last path segment with everything from ``!`` onward
#     removed, so the same picture served with different ``!`` suffixes is
#     listed once.
EXTRACT_NOTE_DETAIL_DOM_FOR_DISK_CACHE_JS = r"""
() => {
  const extraction_result = {
    detailTitleText: "",
    detailBodyPlainText: "",
    detailBodyHashtagObjectList: [],
    detailBodyEmojiImageUrlList: [],
    noteDetailImageUrlList: [],
  };
  const detailTitleElement = document.querySelector("#detail-title");
  if (detailTitleElement && detailTitleElement.innerText.trim()) {
    extraction_result.detailTitleText = detailTitleElement.innerText.trim();
  }
  const detailDescElement = document.querySelector("#detail-desc");
  if (detailDescElement) {
    extraction_result.detailBodyPlainText = detailDescElement.innerText.trim();
    detailDescElement.querySelectorAll("a.tag, a[id='hash-tag']").forEach((anchorElement) => {
      extraction_result.detailBodyHashtagObjectList.push({
        display_text: (anchorElement.innerText || "").trim(),
        relative_href: (anchorElement.getAttribute("href") || "").trim(),
      });
    });
    detailDescElement.querySelectorAll("img.note-content-emoji").forEach((imgElement) => {
      let emojiImageUrl = (
        imgElement.getAttribute("src") ||
        imgElement.currentSrc ||
        ""
      ).trim();
      if (emojiImageUrl.startsWith("//")) emojiImageUrl = "https:" + emojiImageUrl;
      if (
        emojiImageUrl.startsWith("http") &&
        !extraction_result.detailBodyEmojiImageUrlList.includes(emojiImageUrl)
      ) {
        extraction_result.detailBodyEmojiImageUrlList.push(emojiImageUrl);
      }
    });
  }
  const deduplicatedAbsoluteImageUrlSet = new Set();
  const deduplicatedSnsWebpicNoteImageStemSet = new Set();
  const noteImageCanonicalStemFromSnsWebpicUrl = (absoluteImageUrl) => {
    let urlPathname = "";
    try {
      urlPathname = new URL(absoluteImageUrl).pathname;
    } catch {
      return "";
    }
    const pathSegmentList = urlPathname.split("/").filter(Boolean);
    if (!pathSegmentList.length) return "";
    const lastPathSegment = pathSegmentList[pathSegmentList.length - 1];
    const stemBeforeBang = lastPathSegment.split("!")[0];
    return stemBeforeBang || "";
  };
  const pushNormalizedImageUrlForDetailCache = (rawUrl, applySnsWebpicStemDedup) => {
    let normalizedUrl = (rawUrl || "").trim();
    if (!normalizedUrl) return;
    if (normalizedUrl.startsWith("//")) normalizedUrl = "https:" + normalizedUrl;
    if (!normalizedUrl.startsWith("http")) return;
    if (deduplicatedAbsoluteImageUrlSet.has(normalizedUrl)) return;
    if (
      applySnsWebpicStemDedup &&
      normalizedUrl.includes("sns-webpic")
    ) {
      const canonicalStem = noteImageCanonicalStemFromSnsWebpicUrl(normalizedUrl);
      if (
        canonicalStem &&
        deduplicatedSnsWebpicNoteImageStemSet.has(canonicalStem)
      ) {
        return;
      }
      if (canonicalStem) {
        deduplicatedSnsWebpicNoteImageStemSet.add(canonicalStem);
      }
    }
    deduplicatedAbsoluteImageUrlSet.add(normalizedUrl);
    extraction_result.noteDetailImageUrlList.push(normalizedUrl);
  };
  const pickSnsWebpicUrlFromCarouselImgElement = (imgElement) => {
    const fromSrc = (imgElement.getAttribute("src") || "").trim();
    const fromCurrent = (imgElement.currentSrc || "").trim();
    if (fromCurrent.includes("sns-webpic")) return fromCurrent;
    if (fromSrc.includes("sns-webpic")) return fromSrc;
    return "";
  };
  const carouselImgIsOutsideDetailDescAndCommentAreas = (imgElement) => {
    if (detailDescElement && detailDescElement.contains(imgElement)) return false;
    if (imgElement.closest("[class*='comment']")) return false;
    if (imgElement.closest(".note-content-emoji")) return false;
    return true;
  };
  const noteDetailLayoutRootElement =
    document.querySelector("#noteContainer") ||
    document.querySelector(".interaction-container") ||
    document.querySelector("[class*='note-detail']") ||
    document.body;
  const noteImageCarouselScopeElement =
    noteDetailLayoutRootElement.querySelector(".swiper") ||
    noteDetailLayoutRootElement.querySelector(".note-slider") ||
    noteDetailLayoutRootElement.querySelector("[class*='swiper']") ||
    noteDetailLayoutRootElement;
  const swiperSlideElementList = Array.from(
    noteImageCarouselScopeElement.querySelectorAll(".swiper-slide"),
  ).filter(
    (slideElement) => !slideElement.classList.contains("swiper-slide-duplicate"),
  );
  if (swiperSlideElementList.length > 0) {
    swiperSlideElementList.forEach((slideElement) => {
      const imgNodeListInSlide = slideElement.querySelectorAll("img");
      let chosenCarouselRawUrl = "";
      imgNodeListInSlide.forEach((imgElement) => {
        if (chosenCarouselRawUrl) return;
        chosenCarouselRawUrl = pickSnsWebpicUrlFromCarouselImgElement(imgElement);
      });
      if (chosenCarouselRawUrl) {
        pushNormalizedImageUrlForDetailCache(chosenCarouselRawUrl, true);
      }
    });
  } else {
    noteImageCarouselScopeElement.querySelectorAll("img[src*='sns-webpic']").forEach(
      (imgElement) => {
        if (!carouselImgIsOutsideDetailDescAndCommentAreas(imgElement)) return;
        const fallbackRawUrl = pickSnsWebpicUrlFromCarouselImgElement(imgElement);
        if (fallbackRawUrl) {
          pushNormalizedImageUrlForDetailCache(fallbackRawUrl, true);
        }
      },
    );
  }
  return extraction_result;
}
"""
# Source text of a JavaScript arrow function, evaluated in the page via
# ``Page.evaluate``, that returns how many ``.swiper-slide`` elements without
# the ``swiper-slide-duplicate`` class exist in the note image carousel.
#
# The carousel is located with a two-step fallback chain: the layout root is
# the first match of ``#noteContainer``, ``.interaction-container``,
# ``[class*='note-detail']``, else ``document.body``; the carousel scope inside
# it is the first match of ``.swiper``, ``.note-slider``, ``[class*='swiper']``,
# else the root itself.
COUNT_NOTE_DETAIL_CAROUSEL_NON_DUPLICATE_SLIDE_ELEMENT_JS = r"""
() => {
  const noteDetailLayoutRootElement =
    document.querySelector("#noteContainer") ||
    document.querySelector(".interaction-container") ||
    document.querySelector("[class*='note-detail']") ||
    document.body;
  const noteImageCarouselScopeElement =
    noteDetailLayoutRootElement.querySelector(".swiper") ||
    noteDetailLayoutRootElement.querySelector(".note-slider") ||
    noteDetailLayoutRootElement.querySelector("[class*='swiper']") ||
    noteDetailLayoutRootElement;
  return noteImageCarouselScopeElement.querySelectorAll(
    ".swiper-slide:not(.swiper-slide-duplicate)",
  ).length;
}
"""
# Upper bound on the slide count considered when paging through the carousel;
# a larger count reported by the page is capped to this value.
MAX_NOTE_DETAIL_CAROUSEL_SLIDE_ITERATION_COUNT = 48
# Pause, in milliseconds, after each ArrowRight key press (and the single pause
# used when the carousel has at most one slide).
MILLISECONDS_TO_PAUSE_AFTER_EACH_NOTE_DETAIL_CAROUSEL_SLIDE_NAVIGATION = 600
# Divisor used to turn "slide transitions needed" into "key presses sent"
# (rounded up). Per its name, an estimate of how many new slide images one
# ArrowRight press causes the page to load.
NOTE_DETAIL_CAROUSEL_ESTIMATED_NEW_SLIDE_IMAGE_COUNT_LOADED_EACH_ARROW_RIGHT_KEY_PRESS = 3
def iterate_note_detail_carousel_each_slide_so_playwright_image_response_cache_populates(
    note_detail_playwright_page: Page,
) -> None:
    """Page through the note image carousel with ArrowRight key presses.

    The number of non-duplicate slides is read from the page and capped at
    ``MAX_NOTE_DETAIL_CAROUSEL_SLIDE_ITERATION_COUNT``. With at most one slide
    the function only waits once and returns. Otherwise the page is brought to
    the front and ArrowRight is pressed ``ceil((slides - 1) / batch)`` times,
    where ``batch`` is the configured estimate of slides loaded per key press
    (at least 1), pausing after every press. As the function name states, the
    intent is to give the page time to request each slide's image.

    Args:
        note_detail_playwright_page: Playwright page showing the note detail.
    """
    reported_slide_count = int(
        note_detail_playwright_page.evaluate(
            COUNT_NOTE_DETAIL_CAROUSEL_NON_DUPLICATE_SLIDE_ELEMENT_JS,
        )
    )
    slides_to_cover = min(
        reported_slide_count,
        MAX_NOTE_DETAIL_CAROUSEL_SLIDE_ITERATION_COUNT,
    )

    if slides_to_cover <= 1:
        # Nothing to page through; still pause once.
        note_detail_playwright_page.wait_for_timeout(
            MILLISECONDS_TO_PAUSE_AFTER_EACH_NOTE_DETAIL_CAROUSEL_SLIDE_NAVIGATION,
        )
        return

    note_detail_playwright_page.bring_to_front()

    slides_per_key_press = max(
        1,
        int(NOTE_DETAIL_CAROUSEL_ESTIMATED_NEW_SLIDE_IMAGE_COUNT_LOADED_EACH_ARROW_RIGHT_KEY_PRESS),
    )
    # Ceiling division: a partial batch still needs one more key press.
    whole_batches, leftover_transitions = divmod(
        slides_to_cover - 1,
        slides_per_key_press,
    )
    key_press_total = whole_batches + (1 if leftover_transitions else 0)

    for _ in range(key_press_total):
        note_detail_playwright_page.keyboard.press("ArrowRight")
        note_detail_playwright_page.wait_for_timeout(
            MILLISECONDS_TO_PAUSE_AFTER_EACH_NOTE_DETAIL_CAROUSEL_SLIDE_NAVIGATION,
        )
def playwright_page_for_opened_note_detail_extraction(playwright_page: Page) -> Page:
    """Pick the page in the browser context that currently shows a note detail.

    The given page is checked first, then every other page of the same
    context in context order. Closed pages are skipped. A page qualifies when
    its DOM contains ``#detail-title`` or ``#detail-desc``.

    Args:
        playwright_page: The page the caller is working with.

    Returns:
        The first qualifying page, or ``playwright_page`` itself when none
        qualifies.
    """
    detail_root_probe_js = (
        "() => !!(document.querySelector('#detail-title') || document.querySelector('#detail-desc'))"
    )
    sibling_pages = [
        context_page
        for context_page in playwright_page.context.pages
        if context_page is not playwright_page
    ]
    for page_under_test in (playwright_page, *sibling_pages):
        if page_under_test.is_closed():
            continue
        if page_under_test.evaluate(detail_root_probe_js):
            return page_under_test
    return playwright_page
def load_python_module_from_file(
    python_file_path: Path,
    importlib_logical_module_name: str,
):
    """Execute a Python source file and return it as a module object.

    The module is built from a file-location spec and executed immediately.
    This function does not insert it into ``sys.modules``.

    Args:
        python_file_path: Path of the file to load.
        importlib_logical_module_name: Name given to the resulting module
            (becomes its ``__name__``).

    Returns:
        The executed module object.

    Raises:
        ImportError: If no spec or no loader can be determined for the path.
    """
    module_spec = importlib.util.spec_from_file_location(
        importlib_logical_module_name,
        python_file_path,
    )
    module_loader = module_spec.loader if module_spec is not None else None
    if module_loader is None:
        raise ImportError(f"Cannot load {python_file_path}")

    module_object = importlib.util.module_from_spec(module_spec)
    module_loader.exec_module(module_object)
    return module_object
# Key name "business-cooperation-btn-pos". Per the constant's name it is looked
# up in the project config and holds a screen (x, y) pair for the
# business-cooperation button; not used in this module. TODO confirm the value
# format at the reader.
PROJECT_CONFIG_BUSINESS_COOPERATION_BTN_SCREEN_XY_PAIR_CONFIG_KEY = "business-cooperation-btn-pos"
# On-screen label of that button (Chinese for "business cooperation"). Per the
# constant's name it is the text matched by OCR; not used in this module.
BUSINESS_COOPERATION_BUTTON_VISIBLE_TEXT_FOR_OCR_ANCHOR = "业务合作"
# Public API of this module: the names exported by ``from <module> import *``.
# Constants are listed first, then functions.
__all__ = [
    "REPOSITORY_ROOT_DIRECTORY",
    "BUSINESS_COOPERATION_BUTTON_VISIBLE_TEXT_FOR_OCR_ANCHOR",
    "COUNT_NOTE_DETAIL_CAROUSEL_NON_DUPLICATE_SLIDE_ELEMENT_JS",
    "EXTRACT_NOTE_DETAIL_DOM_FOR_DISK_CACHE_JS",
    "MAX_NOTE_DETAIL_CAROUSEL_SLIDE_ITERATION_COUNT",
    "MILLISECONDS_TO_PAUSE_AFTER_EACH_NOTE_DETAIL_CAROUSEL_SLIDE_NAVIGATION",
    "NOTE_DETAIL_CAROUSEL_ESTIMATED_NEW_SLIDE_IMAGE_COUNT_LOADED_EACH_ARROW_RIGHT_KEY_PRESS",
    "PROJECT_CONFIG_BUSINESS_COOPERATION_BTN_SCREEN_XY_PAIR_CONFIG_KEY",
    "cover_image_height_px_from_note_output_sequence_folder",
    "half_cover_image_height_pixels_from_disk_cover_file_under_note_output_folder",
    "iterate_note_detail_carousel_each_slide_so_playwright_image_response_cache_populates",
    "load_python_module_from_file",
    "normalize_raw_note_card_metadata_string_to_plain_text_for_ocr_anchor",
    "clip_note_title_plain_text_for_feed_ocr_anchor",
    "playwright_page_for_opened_note_detail_extraction",
]
|