memory-cache-image-name.py 3.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. #!/usr/bin/env python3
  2. from __future__ import annotations
  3. import sys
  4. from collections.abc import Callable
  5. from pathlib import Path
  6. from urllib.parse import unquote, urlparse
  7. from playwright.sync_api import Page, Response
# Directory two levels above this file. It is put at the front of sys.path
# (once) before the ``workplace`` import that follows.
# NOTE(review): presumably this lets the file run as a standalone script
# from inside the repository — confirm.
_REPOSITORY_ROOT_DIRECTORY = Path(__file__).resolve().parent.parent
if str(_REPOSITORY_ROOT_DIRECTORY) not in sys.path:
    sys.path.insert(0, str(_REPOSITORY_ROOT_DIRECTORY))
  11. from workplace.note_pipeline_forbidden_standalone_url_image_fetch import (
  12. XHS_NOTE_PIPELINE_ABSOLUTE_PROHIBITION_STANDALONE_HTTP_GET_FOR_IMAGE_BYTES,
  13. )
# Lower-case media-type prefix used to recognise image payloads from a
# response's Content-Type header.
_CONTENT_TYPE_IMAGE_PREFIX = "image/"
  15. def should_cache_image_request_url_for_note_pipeline(image_request_url: str) -> bool:
  16. parsed_image_request_url = urlparse(image_request_url)
  17. hostname_only = (parsed_image_request_url.hostname or "").lower()
  18. if not hostname_only.endswith(".xhscdn.com"):
  19. return False
  20. return hostname_only.startswith("sns-webpic") or hostname_only.startswith(
  21. "sns-avatar",
  22. )
  23. def _http_response_represents_image_payload(response: Response) -> bool:
  24. content_type_header_value = response.headers.get("content-type", "")
  25. return _CONTENT_TYPE_IMAGE_PREFIX in content_type_header_value.lower()
  26. def _last_path_segment_unquoted_as_resource_name(image_request_url: str) -> str:
  27. url_path = urlparse(image_request_url).path
  28. stripped_path = url_path.strip("/")
  29. if not stripped_path:
  30. return "image"
  31. last_segment = stripped_path.split("/")[-1]
  32. decoded_segment = unquote(last_segment)
  33. return decoded_segment if decoded_segment else "image"
  34. def start(
  35. page: Page,
  36. *,
  37. image_name_log_output_file_path: Path | None = None,
  38. cached_image_body_by_request_url: dict[str, bytes] | None = None,
  39. ) -> Callable[[], None]:
  40. _xhs_note_pipeline_image_bytes_must_not_use_standalone_http_get = (
  41. XHS_NOTE_PIPELINE_ABSOLUTE_PROHIBITION_STANDALONE_HTTP_GET_FOR_IMAGE_BYTES
  42. )
  43. _ = _xhs_note_pipeline_image_bytes_must_not_use_standalone_http_get
  44. recorded_image_request_url_set: set[str] = set()
  45. def on_http_response_finished(response: Response) -> None:
  46. if (
  47. response.request.resource_type != "image"
  48. and not _http_response_represents_image_payload(response)
  49. ):
  50. return
  51. image_request_url = response.url
  52. if not should_cache_image_request_url_for_note_pipeline(image_request_url):
  53. return
  54. if image_request_url in recorded_image_request_url_set:
  55. return
  56. recorded_image_request_url_set.add(image_request_url)
  57. if cached_image_body_by_request_url is not None:
  58. cached_image_body_by_request_url[image_request_url] = response.body()
  59. cache_image_resource_name = _last_path_segment_unquoted_as_resource_name(
  60. image_request_url,
  61. )
  62. if image_name_log_output_file_path is not None:
  63. image_name_log_output_file_path.parent.mkdir(parents=True, exist_ok=True)
  64. log_line = f"{cache_image_resource_name}\t{image_request_url}\n"
  65. with image_name_log_output_file_path.open(
  66. "a",
  67. encoding="utf-8",
  68. ) as image_name_log_output_file_handle:
  69. image_name_log_output_file_handle.write(log_line)
  70. page.on("response", on_http_response_finished)
  71. def stop_listening_page_image_responses() -> None:
  72. page.remove_listener("response", on_http_response_finished)
  73. return stop_listening_page_image_responses
# Names exported by ``from <module> import *``; the underscore-prefixed
# helpers above are intentionally left out.
__all__ = ["should_cache_image_request_url_for_note_pipeline", "start"]