| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- #!/usr/bin/env python3
- from __future__ import annotations
- import sys
- from collections.abc import Callable
- from pathlib import Path
- from urllib.parse import unquote, urlparse
- from playwright.sync_api import Page, Response
- _REPOSITORY_ROOT_DIRECTORY = Path(__file__).resolve().parent.parent
- if str(_REPOSITORY_ROOT_DIRECTORY) not in sys.path:
- sys.path.insert(0, str(_REPOSITORY_ROOT_DIRECTORY))
- from workplace.note_pipeline_forbidden_standalone_url_image_fetch import (
- XHS_NOTE_PIPELINE_ABSOLUTE_PROHIBITION_STANDALONE_HTTP_GET_FOR_IMAGE_BYTES,
- )
- _CONTENT_TYPE_IMAGE_PREFIX = "image/"
def should_cache_image_request_url_for_note_pipeline(image_request_url: str) -> bool:
    """Return whether *image_request_url* targets a cacheable note image host.

    A URL qualifies when its hostname, compared in lower case, ends with
    ``.xhscdn.com`` and begins with either ``sns-webpic`` or ``sns-avatar``.
    A URL without a hostname never qualifies.
    """
    host = urlparse(image_request_url).hostname
    normalized_host = host.lower() if host else ""
    is_cdn_host = normalized_host.endswith(".xhscdn.com")
    has_cacheable_prefix = normalized_host.startswith(("sns-webpic", "sns-avatar"))
    return is_cdn_host and has_cacheable_prefix
def _http_response_represents_image_payload(response: Response) -> bool:
    """Return whether the response's ``content-type`` header declares an image.

    The header is read from ``response.headers``; a missing header is treated
    as an empty string. The value is trimmed, lower-cased, and tested against
    ``_CONTENT_TYPE_IMAGE_PREFIX`` as a prefix.
    """
    content_type_header_value = response.headers.get("content-type", "")
    # The media type is the leading token of the header value. A prefix test
    # avoids false positives where the marker only appears later in the value
    # (for example inside a parameter of a non-image media type).
    return content_type_header_value.strip().lower().startswith(
        _CONTENT_TYPE_IMAGE_PREFIX,
    )
def _last_path_segment_unquoted_as_resource_name(image_request_url: str) -> str:
    """Derive a resource name from the final path segment of a URL.

    The URL path is stripped of leading and trailing slashes, its last
    ``/``-separated segment is percent-decoded, and the decoded text is
    returned. Falls back to ``"image"`` when the path is empty or the decoded
    segment is empty.
    """
    fallback_name = "image"
    trimmed_path = urlparse(image_request_url).path.strip("/")
    if not trimmed_path:
        return fallback_name
    # rpartition yields the text after the final "/" (the whole string if none).
    _, _, tail_segment = trimmed_path.rpartition("/")
    return unquote(tail_segment) or fallback_name
def start(
    page: Page,
    *,
    image_name_log_output_file_path: Path | None = None,
    cached_image_body_by_request_url: dict[str, bytes] | None = None,
) -> Callable[[], None]:
    """Attach a ``response`` listener to *page* that records note image responses.

    A response is handled when its request resource type is ``"image"`` or its
    headers indicate an image payload, and its URL passes
    ``should_cache_image_request_url_for_note_pipeline``. Each URL is handled
    at most once per call to ``start``.

    Args:
        page: Page whose ``response`` events are observed.
        image_name_log_output_file_path: When given, one tab-separated line
            (resource name, then request URL) is appended to this file for
            each handled URL; parent directories are created as needed.
        cached_image_body_by_request_url: When given, the response body is
            stored in this mapping under the request URL.

    Returns:
        A zero-argument callable that removes the listener from *page*.
    """
    # The imported constant is referenced here but its value is not used.
    # NOTE(review): presumably a marker that image bytes must come from the
    # page's own responses rather than separate HTTP GETs; confirm.
    _ = XHS_NOTE_PIPELINE_ABSOLUTE_PROHIBITION_STANDALONE_HTTP_GET_FOR_IMAGE_BYTES
    recorded_image_request_url_set: set[str] = set()

    def on_http_response_finished(response: Response) -> None:
        """Cache and/or log one qualifying image response."""
        if (
            response.request.resource_type != "image"
            and not _http_response_represents_image_payload(response)
        ):
            return
        image_request_url = response.url
        if not should_cache_image_request_url_for_note_pipeline(image_request_url):
            return
        if image_request_url in recorded_image_request_url_set:
            return
        # Mark the URL before reading the body so that a duplicate response
        # arriving in the meantime is ignored.
        recorded_image_request_url_set.add(image_request_url)
        if cached_image_body_by_request_url is not None:
            try:
                image_body = response.body()
            except Exception:
                # The body could not be read: forget the URL so a later
                # response for it is not silently skipped, then propagate
                # the original error unchanged.
                recorded_image_request_url_set.discard(image_request_url)
                raise
            cached_image_body_by_request_url[image_request_url] = image_body
        cache_image_resource_name = _last_path_segment_unquoted_as_resource_name(
            image_request_url,
        )
        if image_name_log_output_file_path is not None:
            image_name_log_output_file_path.parent.mkdir(parents=True, exist_ok=True)
            log_line = f"{cache_image_resource_name}\t{image_request_url}\n"
            with image_name_log_output_file_path.open(
                "a",
                encoding="utf-8",
            ) as image_name_log_output_file_handle:
                image_name_log_output_file_handle.write(log_line)

    page.on("response", on_http_response_finished)

    def stop_listening_page_image_responses() -> None:
        """Detach the response listener registered by ``start``."""
        page.remove_listener("response", on_http_response_finished)

    return stop_listening_page_image_responses
- __all__ = ["should_cache_image_request_url_for_note_pipeline", "start"]
|