#!/usr/bin/env python3
"""Observe image responses on a Playwright page.

``start`` attaches a ``response`` listener to a page.  For every response
that looks like an image and whose URL passes
``should_cache_image_request_url_for_note_pipeline``, the listener can store
the response body in a caller-supplied dict and/or append a
``<resource name>\\t<url>`` line to a log file.
"""

from __future__ import annotations

import sys
from collections.abc import Callable
from pathlib import Path
from urllib.parse import unquote, urlparse

from playwright.sync_api import Page, Response

# The directory two levels above this file is put on ``sys.path`` before the
# ``workplace`` import below (presumably so that import resolves when this
# file is executed from inside its own directory).
_REPOSITORY_ROOT_DIRECTORY = Path(__file__).resolve().parent.parent
if str(_REPOSITORY_ROOT_DIRECTORY) not in sys.path:
    sys.path.insert(0, str(_REPOSITORY_ROOT_DIRECTORY))

from workplace.note_pipeline_forbidden_standalone_url_image_fetch import (
    XHS_NOTE_PIPELINE_ABSOLUTE_PROHIBITION_STANDALONE_HTTP_GET_FOR_IMAGE_BYTES,
)

_CONTENT_TYPE_IMAGE_PREFIX = "image/"
_CACHEABLE_HOSTNAME_SUFFIX = ".xhscdn.com"
_CACHEABLE_HOSTNAME_PREFIXES = ("sns-webpic", "sns-avatar")
_FALLBACK_RESOURCE_NAME = "image"


def should_cache_image_request_url_for_note_pipeline(image_request_url: str) -> bool:
    """Return whether *image_request_url* points at a host this module records.

    A URL qualifies when its hostname (compared case-insensitively) ends with
    ``.xhscdn.com`` and starts with ``sns-webpic`` or ``sns-avatar``.  URLs
    without a hostname never qualify.
    """
    hostname_only = (urlparse(image_request_url).hostname or "").lower()
    if not hostname_only.endswith(_CACHEABLE_HOSTNAME_SUFFIX):
        return False
    return hostname_only.startswith(_CACHEABLE_HOSTNAME_PREFIXES)


def _http_response_represents_image_payload(response: Response) -> bool:
    """Return whether the response's ``content-type`` header is an image type.

    The header value must *start* with ``image/`` (ignoring case and
    surrounding whitespace); a value that merely contains ``image/`` somewhere
    else, such as inside a parameter, does not count.
    """
    content_type_header_value = response.headers.get("content-type", "")
    normalized_content_type = content_type_header_value.strip().lower()
    return normalized_content_type.startswith(_CONTENT_TYPE_IMAGE_PREFIX)


def _last_path_segment_unquoted_as_resource_name(image_request_url: str) -> str:
    """Return the percent-decoded last path segment of *image_request_url*.

    Falls back to ``"image"`` when the URL path is empty (or only slashes) or
    when the decoded segment is empty.
    """
    stripped_path = urlparse(image_request_url).path.strip("/")
    if not stripped_path:
        return _FALLBACK_RESOURCE_NAME
    decoded_segment = unquote(stripped_path.split("/")[-1])
    return decoded_segment or _FALLBACK_RESOURCE_NAME


def start(
    page: Page,
    *,
    image_name_log_output_file_path: Path | None = None,
    cached_image_body_by_request_url: dict[str, bytes] | None = None,
) -> Callable[[], None]:
    """Start recording qualifying image responses seen on *page*.

    Args:
        page: Playwright page whose ``response`` events are observed.
        image_name_log_output_file_path: When given, one
            ``<resource name>\\t<url>`` line is appended (UTF-8) per recorded
            URL; the parent directory is created on demand.
        cached_image_body_by_request_url: When given, the response body of
            each recorded URL is stored in this dict under its URL.

    Returns:
        A zero-argument callable that detaches the listener from *page*.
    """
    # Reference the imported prohibition constant so the import is visibly
    # used by this function; its value is not inspected here.
    _ = XHS_NOTE_PIPELINE_ABSOLUTE_PROHIBITION_STANDALONE_HTTP_GET_FOR_IMAGE_BYTES

    recorded_image_request_url_set: set[str] = set()

    def on_http_response_finished(response: Response) -> None:
        """Handle one ``response`` event: filter, dedupe, cache and log."""
        looks_like_image = (
            response.request.resource_type == "image"
            or _http_response_represents_image_payload(response)
        )
        if not looks_like_image:
            return

        image_request_url = response.url
        if not should_cache_image_request_url_for_note_pipeline(image_request_url):
            return
        if image_request_url in recorded_image_request_url_set:
            return

        # Mark the URL before fetching the body so that a handler invocation
        # arriving while the body is being read does not process it twice.
        recorded_image_request_url_set.add(image_request_url)

        if cached_image_body_by_request_url is not None:
            try:
                image_body = response.body()
            except Exception:
                # The body was not captured: un-mark the URL so a later
                # response for it can still be cached, then let the original
                # error propagate unchanged.
                recorded_image_request_url_set.discard(image_request_url)
                raise
            cached_image_body_by_request_url[image_request_url] = image_body

        if image_name_log_output_file_path is None:
            return

        cache_image_resource_name = _last_path_segment_unquoted_as_resource_name(
            image_request_url,
        )
        image_name_log_output_file_path.parent.mkdir(parents=True, exist_ok=True)
        log_line = f"{cache_image_resource_name}\t{image_request_url}\n"
        with image_name_log_output_file_path.open(
            "a",
            encoding="utf-8",
        ) as image_name_log_output_file_handle:
            image_name_log_output_file_handle.write(log_line)

    page.on("response", on_http_response_finished)

    def stop_listening_page_image_responses() -> None:
        """Detach the response listener registered by ``start``."""
        page.remove_listener("response", on_http_response_finished)

    return stop_listening_page_image_responses


__all__ = ["should_cache_image_request_url_for_note_pipeline", "start"]