# PdfParser.py
  1. from __future__ import annotations
  2. import calendar
  3. import codecs
  4. import collections
  5. import mmap
  6. import os
  7. import re
  8. import time
  9. import zlib
  10. from typing import Any, NamedTuple
# Poor man's typing.TYPE_CHECKING: keeps typing out of the runtime import path.
TYPE_CHECKING = False
if TYPE_CHECKING:
    from typing import IO

    # The subscripted form is seen only by static type checkers.
    _DictBase = collections.UserDict[str | bytes, Any]
else:
    _DictBase = collections.UserDict
  17. # see 7.9.2.2 Text String Type on page 86 and D.3 PDFDocEncoding Character Set
  18. # on page 656
  19. def encode_text(s: str) -> bytes:
  20. return codecs.BOM_UTF16_BE + s.encode("utf_16_be")
# Code points where PDFDocEncoding differs from a plain chr(byte) mapping;
# decode_text() falls back to chr() for every byte not listed here.
PDFDocEncoding = {
    0x16: "\u0017",
    0x18: "\u02d8",
    0x19: "\u02c7",
    0x1A: "\u02c6",
    0x1B: "\u02d9",
    0x1C: "\u02dd",
    0x1D: "\u02db",
    0x1E: "\u02da",
    0x1F: "\u02dc",
    0x80: "\u2022",
    0x81: "\u2020",
    0x82: "\u2021",
    0x83: "\u2026",
    0x84: "\u2014",
    0x85: "\u2013",
    0x86: "\u0192",
    0x87: "\u2044",
    0x88: "\u2039",
    0x89: "\u203a",
    0x8A: "\u2212",
    0x8B: "\u2030",
    0x8C: "\u201e",
    0x8D: "\u201c",
    0x8E: "\u201d",
    0x8F: "\u2018",
    0x90: "\u2019",
    0x91: "\u201a",
    0x92: "\u2122",
    0x93: "\ufb01",
    0x94: "\ufb02",
    0x95: "\u0141",
    0x96: "\u0152",
    0x97: "\u0160",
    0x98: "\u0178",
    0x99: "\u017d",
    0x9A: "\u0131",
    0x9B: "\u0142",
    0x9C: "\u0153",
    0x9D: "\u0161",
    0x9E: "\u017e",
    0xA0: "\u20ac",
}
  64. def decode_text(b: bytes) -> str:
  65. if b[: len(codecs.BOM_UTF16_BE)] == codecs.BOM_UTF16_BE:
  66. return b[len(codecs.BOM_UTF16_BE) :].decode("utf_16_be")
  67. else:
  68. return "".join(PDFDocEncoding.get(byte, chr(byte)) for byte in b)
  69. class PdfFormatError(RuntimeError):
  70. """An error that probably indicates a syntactic or semantic error in the
  71. PDF file structure"""
  72. pass
  73. def check_format_condition(condition: bool, error_message: str) -> None:
  74. if not condition:
  75. raise PdfFormatError(error_message)
  76. class IndirectReferenceTuple(NamedTuple):
  77. object_id: int
  78. generation: int
  79. class IndirectReference(IndirectReferenceTuple):
  80. def __str__(self) -> str:
  81. return f"{self.object_id} {self.generation} R"
  82. def __bytes__(self) -> bytes:
  83. return self.__str__().encode("us-ascii")
  84. def __eq__(self, other: object) -> bool:
  85. if self.__class__ is not other.__class__:
  86. return False
  87. assert isinstance(other, IndirectReference)
  88. return other.object_id == self.object_id and other.generation == self.generation
  89. def __ne__(self, other: object) -> bool:
  90. return not (self == other)
  91. def __hash__(self) -> int:
  92. return hash((self.object_id, self.generation))
  93. class IndirectObjectDef(IndirectReference):
  94. def __str__(self) -> str:
  95. return f"{self.object_id} {self.generation} obj"
  96. class XrefTable:
  97. def __init__(self) -> None:
  98. self.existing_entries: dict[int, tuple[int, int]] = (
  99. {}
  100. ) # object ID => (offset, generation)
  101. self.new_entries: dict[int, tuple[int, int]] = (
  102. {}
  103. ) # object ID => (offset, generation)
  104. self.deleted_entries = {0: 65536} # object ID => generation
  105. self.reading_finished = False
  106. def __setitem__(self, key: int, value: tuple[int, int]) -> None:
  107. if self.reading_finished:
  108. self.new_entries[key] = value
  109. else:
  110. self.existing_entries[key] = value
  111. if key in self.deleted_entries:
  112. del self.deleted_entries[key]
  113. def __getitem__(self, key: int) -> tuple[int, int]:
  114. try:
  115. return self.new_entries[key]
  116. except KeyError:
  117. return self.existing_entries[key]
  118. def __delitem__(self, key: int) -> None:
  119. if key in self.new_entries:
  120. generation = self.new_entries[key][1] + 1
  121. del self.new_entries[key]
  122. self.deleted_entries[key] = generation
  123. elif key in self.existing_entries:
  124. generation = self.existing_entries[key][1] + 1
  125. self.deleted_entries[key] = generation
  126. elif key in self.deleted_entries:
  127. generation = self.deleted_entries[key]
  128. else:
  129. msg = f"object ID {key} cannot be deleted because it doesn't exist"
  130. raise IndexError(msg)
  131. def __contains__(self, key: int) -> bool:
  132. return key in self.existing_entries or key in self.new_entries
  133. def __len__(self) -> int:
  134. return len(
  135. set(self.existing_entries.keys())
  136. | set(self.new_entries.keys())
  137. | set(self.deleted_entries.keys())
  138. )
  139. def keys(self) -> set[int]:
  140. return (
  141. set(self.existing_entries.keys()) - set(self.deleted_entries.keys())
  142. ) | set(self.new_entries.keys())
  143. def write(self, f: IO[bytes]) -> int:
  144. keys = sorted(set(self.new_entries.keys()) | set(self.deleted_entries.keys()))
  145. deleted_keys = sorted(set(self.deleted_entries.keys()))
  146. startxref = f.tell()
  147. f.write(b"xref\n")
  148. while keys:
  149. # find a contiguous sequence of object IDs
  150. prev: int | None = None
  151. for index, key in enumerate(keys):
  152. if prev is None or prev + 1 == key:
  153. prev = key
  154. else:
  155. contiguous_keys = keys[:index]
  156. keys = keys[index:]
  157. break
  158. else:
  159. contiguous_keys = keys
  160. keys = []
  161. f.write(b"%d %d\n" % (contiguous_keys[0], len(contiguous_keys)))
  162. for object_id in contiguous_keys:
  163. if object_id in self.new_entries:
  164. f.write(b"%010d %05d n \n" % self.new_entries[object_id])
  165. else:
  166. this_deleted_object_id = deleted_keys.pop(0)
  167. check_format_condition(
  168. object_id == this_deleted_object_id,
  169. f"expected the next deleted object ID to be {object_id}, "
  170. f"instead found {this_deleted_object_id}",
  171. )
  172. try:
  173. next_in_linked_list = deleted_keys[0]
  174. except IndexError:
  175. next_in_linked_list = 0
  176. f.write(
  177. b"%010d %05d f \n"
  178. % (next_in_linked_list, self.deleted_entries[object_id])
  179. )
  180. return startxref
  181. class PdfName:
  182. name: bytes
  183. def __init__(self, name: PdfName | bytes | str) -> None:
  184. if isinstance(name, PdfName):
  185. self.name = name.name
  186. elif isinstance(name, bytes):
  187. self.name = name
  188. else:
  189. self.name = name.encode("us-ascii")
  190. def name_as_str(self) -> str:
  191. return self.name.decode("us-ascii")
  192. def __eq__(self, other: object) -> bool:
  193. return (
  194. isinstance(other, PdfName) and other.name == self.name
  195. ) or other == self.name
  196. def __hash__(self) -> int:
  197. return hash(self.name)
  198. def __repr__(self) -> str:
  199. return f"{self.__class__.__name__}({repr(self.name)})"
  200. @classmethod
  201. def from_pdf_stream(cls, data: bytes) -> PdfName:
  202. return cls(PdfParser.interpret_name(data))
  203. allowed_chars = set(range(33, 127)) - {ord(c) for c in "#%/()<>[]{}"}
  204. def __bytes__(self) -> bytes:
  205. result = bytearray(b"/")
  206. for b in self.name:
  207. if b in self.allowed_chars:
  208. result.append(b)
  209. else:
  210. result.extend(b"#%02X" % b)
  211. return bytes(result)
  212. class PdfArray(list[Any]):
  213. def __bytes__(self) -> bytes:
  214. return b"[ " + b" ".join(pdf_repr(x) for x in self) + b" ]"
class PdfDict(_DictBase):
    """A PDF dictionary with bytes keys.

    Attribute access is mapped onto item access: ``d.Foo`` reads
    ``d[b"Foo"]`` (decoding bytes values as text), and attributes whose
    name ends in ``Date`` are additionally parsed into a
    ``time.struct_time``.
    """

    def __setattr__(self, key: str, value: Any) -> None:
        if key == "data":
            # "data" is UserDict's own storage attribute; don't trap it
            collections.UserDict.__setattr__(self, key, value)
        else:
            self[key.encode("us-ascii")] = value

    def __getattr__(self, key: str) -> str | time.struct_time:
        try:
            value = self[key.encode("us-ascii")]
        except KeyError as e:
            raise AttributeError(key) from e
        if isinstance(value, bytes):
            value = decode_text(value)
        if key.endswith("Date"):
            # PDF date string: D:YYYYMMDDHHMMSS, optionally followed by a
            # timezone such as +HH'mm', -HH'mm' or Z
            if value.startswith("D:"):
                value = value[2:]

            relationship = "Z"
            if len(value) > 17:
                # position 14 holds the UTC relationship (+, - or Z);
                # 15:17 are the offset hours, 18:20 the offset minutes
                relationship = value[14]
                offset = int(value[15:17]) * 60
                if len(value) > 20:
                    offset += int(value[18:20])

            # truncate the format to however much of the date is present
            format = "%Y%m%d%H%M%S"[: len(value) - 2]
            value = time.strptime(value[: len(format) + 2], format)
            if relationship in ["+", "-"]:
                offset *= 60
                if relationship == "+":
                    # a positive offset means local time is ahead of UTC
                    offset *= -1
                value = time.gmtime(calendar.timegm(value) + offset)
        return value

    def __bytes__(self) -> bytes:
        out = bytearray(b"<<")
        for key, value in self.items():
            # None values mean "absent" and are not serialized
            if value is None:
                continue
            value = pdf_repr(value)
            out.extend(b"\n")
            out.extend(bytes(PdfName(key)))
            out.extend(b" ")
            out.extend(value)
        out.extend(b"\n>>")
        return bytes(out)
  257. class PdfBinary:
  258. def __init__(self, data: list[int] | bytes) -> None:
  259. self.data = data
  260. def __bytes__(self) -> bytes:
  261. return b"<%s>" % b"".join(b"%02X" % b for b in self.data)
  262. class PdfStream:
  263. def __init__(self, dictionary: PdfDict, buf: bytes) -> None:
  264. self.dictionary = dictionary
  265. self.buf = buf
  266. def decode(self) -> bytes:
  267. try:
  268. filter = self.dictionary[b"Filter"]
  269. except KeyError:
  270. return self.buf
  271. if filter == b"FlateDecode":
  272. try:
  273. expected_length = self.dictionary[b"DL"]
  274. except KeyError:
  275. expected_length = self.dictionary[b"Length"]
  276. return zlib.decompress(self.buf, bufsize=int(expected_length))
  277. else:
  278. msg = f"stream filter {repr(filter)} unknown/unsupported"
  279. raise NotImplementedError(msg)
def pdf_repr(x: Any) -> bytes:
    """Serialize a Python value into PDF syntax as bytes.

    Branch order is significant: True/False/None are identity-checked
    before the int test (bool is a subclass of int), and PdfArray/PdfDict
    are checked before plain list/dict.
    """
    if x is True:
        return b"true"
    elif x is False:
        return b"false"
    elif x is None:
        return b"null"
    elif isinstance(x, (PdfName, PdfDict, PdfArray, PdfBinary)):
        return bytes(x)
    elif isinstance(x, (int, float)):
        return str(x).encode("us-ascii")
    elif isinstance(x, time.struct_time):
        # dates are written as UTC literal strings: (D:YYYYMMDDHHMMSSZ)
        return b"(D:" + time.strftime("%Y%m%d%H%M%SZ", x).encode("us-ascii") + b")"
    elif isinstance(x, dict):
        return bytes(PdfDict(x))
    elif isinstance(x, list):
        return bytes(PdfArray(x))
    elif isinstance(x, str):
        # text strings become BOM-prefixed UTF-16BE, then literal strings
        return pdf_repr(encode_text(x))
    elif isinstance(x, bytes):
        # XXX escape more chars? handle binary garbage
        x = x.replace(b"\\", b"\\\\")
        x = x.replace(b"(", b"\\(")
        x = x.replace(b")", b"\\)")
        return b"(" + x + b")"
    else:
        return bytes(x)
  307. class PdfParser:
  308. """Based on
  309. https://www.adobe.com/content/dam/acom/en/devnet/acrobat/pdfs/PDF32000_2008.pdf
  310. Supports PDF up to 1.4
  311. """
    def __init__(
        self,
        filename: str | None = None,
        f: IO[bytes] | None = None,
        buf: bytes | bytearray | None = None,
        start_offset: int = 0,
        mode: str = "rb",
    ) -> None:
        """Open a PDF from *filename*, an open file *f*, or a buffer *buf*.

        With no source given at all, an empty document is initialized for
        writing.  Passing both *buf* and *f* is rejected.
        """
        if buf and f:
            msg = "specify buf or f or filename, but not both buf and f"
            raise RuntimeError(msg)
        self.filename = filename
        self.buf: bytes | bytearray | mmap.mmap | None = buf
        self.f = f
        self.start_offset = start_offset
        self.should_close_buf = False
        self.should_close_file = False
        if filename is not None and f is None:
            # we opened the file, so we are responsible for closing it
            self.f = f = open(filename, mode)
            self.should_close_file = True
        if f is not None:
            self.buf = self.get_buf_from_file(f)
            self.should_close_buf = True
            if not filename and hasattr(f, "name"):
                self.filename = f.name
        self.cached_objects: dict[IndirectReference, Any] = {}
        self.root_ref: IndirectReference | None
        self.info_ref: IndirectReference | None
        self.pages_ref: IndirectReference | None
        self.last_xref_section_offset: int | None
        if self.buf:
            self.read_pdf_info()
        else:
            # no existing content: set up blank structures for writing
            self.file_size_total = self.file_size_this = 0
            self.root = PdfDict()
            self.root_ref = None
            self.info = PdfDict()
            self.info_ref = None
            self.page_tree_root = PdfDict()
            self.pages: list[IndirectReference] = []
            self.orig_pages: list[IndirectReference] = []
            self.pages_ref = None
            self.last_xref_section_offset = None
            self.trailer_dict: dict[bytes, Any] = {}
            self.xref_table = XrefTable()
            self.xref_table.reading_finished = True
        if f:
            self.seek_end()
  360. def __enter__(self) -> PdfParser:
  361. return self
  362. def __exit__(self, *args: object) -> None:
  363. self.close()
  364. def start_writing(self) -> None:
  365. self.close_buf()
  366. self.seek_end()
  367. def close_buf(self) -> None:
  368. if isinstance(self.buf, mmap.mmap):
  369. self.buf.close()
  370. self.buf = None
  371. def close(self) -> None:
  372. if self.should_close_buf:
  373. self.close_buf()
  374. if self.f is not None and self.should_close_file:
  375. self.f.close()
  376. self.f = None
  377. def seek_end(self) -> None:
  378. assert self.f is not None
  379. self.f.seek(0, os.SEEK_END)
  380. def write_header(self) -> None:
  381. assert self.f is not None
  382. self.f.write(b"%PDF-1.4\n")
  383. def write_comment(self, s: str) -> None:
  384. assert self.f is not None
  385. self.f.write(f"% {s}\n".encode())
    def write_catalog(self) -> IndirectReference:
        """Write a fresh catalog and pages tree root; return the catalog ref.

        Order matters here: object IDs are allocated before rewrite_pages()
        so the rewritten pages can point at the new /Pages object.
        """
        assert self.f is not None
        self.del_root()
        self.root_ref = self.next_object_id(self.f.tell())
        # offset 0: the pages object itself is written below via write_obj
        self.pages_ref = self.next_object_id(0)
        self.rewrite_pages()
        self.write_obj(self.root_ref, Type=PdfName(b"Catalog"), Pages=self.pages_ref)
        self.write_obj(
            self.pages_ref,
            Type=PdfName(b"Pages"),
            Count=len(self.pages),
            Kids=self.pages,
        )
        return self.root_ref
    def rewrite_pages(self) -> None:
        """Rewrite every original page under the new pages tree root.

        Pages removed from self.pages are dropped; the rest are written
        anew with /Parent repointed at self.pages_ref.  The old page
        objects and now-redundant Pages tree nodes are deleted from the
        xref table.
        """
        pages_tree_nodes_to_delete = []
        for i, page_ref in enumerate(self.orig_pages):
            page_info = self.cached_objects[page_ref]
            del self.xref_table[page_ref.object_id]
            pages_tree_nodes_to_delete.append(page_info[PdfName(b"Parent")])
            if page_ref not in self.pages:
                # the page has been deleted
                continue
            # make dict keys into strings for passing to write_page
            stringified_page_info = {}
            for key, value in page_info.items():
                # key should be a PdfName
                stringified_page_info[key.name_as_str()] = value
            stringified_page_info["Parent"] = self.pages_ref
            new_page_ref = self.write_page(None, **stringified_page_info)
            for j, cur_page_ref in enumerate(self.pages):
                if cur_page_ref == page_ref:
                    # replace the page reference with the new one
                    self.pages[j] = new_page_ref
        # delete redundant Pages tree nodes from xref table
        for pages_tree_node_ref in pages_tree_nodes_to_delete:
            while pages_tree_node_ref:
                pages_tree_node = self.cached_objects[pages_tree_node_ref]
                if pages_tree_node_ref.object_id in self.xref_table:
                    del self.xref_table[pages_tree_node_ref.object_id]
                pages_tree_node_ref = pages_tree_node.get(b"Parent", None)
        self.orig_pages = []
    def write_xref_and_trailer(
        self, new_root_ref: IndirectReference | None = None
    ) -> None:
        """Write the incremental xref section and the trailer dictionary.

        If *new_root_ref* is given, it replaces the current catalog.  The
        /Info object, when present, is (re)written first so the trailer
        can reference it.
        """
        assert self.f is not None
        if new_root_ref:
            self.del_root()
            self.root_ref = new_root_ref
        if self.info:
            self.info_ref = self.write_obj(None, self.info)
        start_xref = self.xref_table.write(self.f)
        num_entries = len(self.xref_table)
        trailer_dict: dict[str | bytes, Any] = {
            b"Root": self.root_ref,
            b"Size": num_entries,
        }
        # chain back to the previous xref section, if this is an update
        if self.last_xref_section_offset is not None:
            trailer_dict[b"Prev"] = self.last_xref_section_offset
        if self.info:
            trailer_dict[b"Info"] = self.info_ref
        self.last_xref_section_offset = start_xref
        self.f.write(
            b"trailer\n"
            + bytes(PdfDict(trailer_dict))
            + b"\nstartxref\n%d\n%%%%EOF" % start_xref
        )
  453. def write_page(
  454. self, ref: int | IndirectReference | None, *objs: Any, **dict_obj: Any
  455. ) -> IndirectReference:
  456. obj_ref = self.pages[ref] if isinstance(ref, int) else ref
  457. if "Type" not in dict_obj:
  458. dict_obj["Type"] = PdfName(b"Page")
  459. if "Parent" not in dict_obj:
  460. dict_obj["Parent"] = self.pages_ref
  461. return self.write_obj(obj_ref, *objs, **dict_obj)
    def write_obj(
        self, ref: IndirectReference | None, *objs: Any, **dict_obj: Any
    ) -> IndirectReference:
        """Write one indirect object and register it in the xref table.

        A new object ID is allocated when *ref* is None.  Keyword
        arguments become the object's dictionary; the special "stream"
        keyword supplies raw stream data, whose /Length is filled in
        automatically.  Returns the reference of the written object.
        """
        assert self.f is not None
        f = self.f
        if ref is None:
            ref = self.next_object_id(f.tell())
        else:
            self.xref_table[ref.object_id] = (f.tell(), ref.generation)
        f.write(bytes(IndirectObjectDef(*ref)))
        stream = dict_obj.pop("stream", None)
        if stream is not None:
            dict_obj["Length"] = len(stream)
        if dict_obj:
            f.write(pdf_repr(dict_obj))
        for obj in objs:
            f.write(pdf_repr(obj))
        if stream is not None:
            f.write(b"stream\n")
            f.write(stream)
            f.write(b"\nendstream\n")
        f.write(b"endobj\n")
        return ref
  485. def del_root(self) -> None:
  486. if self.root_ref is None:
  487. return
  488. del self.xref_table[self.root_ref.object_id]
  489. del self.xref_table[self.root[b"Pages"].object_id]
  490. @staticmethod
  491. def get_buf_from_file(f: IO[bytes]) -> bytes | mmap.mmap:
  492. if hasattr(f, "getbuffer"):
  493. return f.getbuffer()
  494. elif hasattr(f, "getvalue"):
  495. return f.getvalue()
  496. else:
  497. try:
  498. return mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
  499. except ValueError: # cannot mmap an empty file
  500. return b""
    def read_pdf_info(self) -> None:
        """Parse the document structure: trailer, catalog, info, page tree."""
        assert self.buf is not None
        self.file_size_total = len(self.buf)
        self.file_size_this = self.file_size_total - self.start_offset
        self.read_trailer()
        check_format_condition(
            self.trailer_dict.get(b"Root") is not None, "Root is missing"
        )
        self.root_ref = self.trailer_dict[b"Root"]
        assert self.root_ref is not None
        self.info_ref = self.trailer_dict.get(b"Info", None)
        self.root = PdfDict(self.read_indirect(self.root_ref))
        if self.info_ref is None:
            self.info = PdfDict()
        else:
            self.info = PdfDict(self.read_indirect(self.info_ref))
        # sanity-check the catalog before trusting its /Pages entry
        check_format_condition(b"Type" in self.root, "/Type missing in Root")
        check_format_condition(
            self.root[b"Type"] == b"Catalog", "/Type in Root is not /Catalog"
        )
        check_format_condition(
            self.root.get(b"Pages") is not None, "/Pages missing in Root"
        )
        check_format_condition(
            isinstance(self.root[b"Pages"], IndirectReference),
            "/Pages in Root is not an indirect reference",
        )
        self.pages_ref = self.root[b"Pages"]
        assert self.pages_ref is not None
        self.page_tree_root = self.read_indirect(self.pages_ref)
        self.pages = self.linearize_page_tree(self.page_tree_root)
        # save the original list of page references
        # in case the user modifies, adds or deletes some pages
        # and we need to rewrite the pages and their list
        self.orig_pages = self.pages[:]
  536. def next_object_id(self, offset: int | None = None) -> IndirectReference:
  537. try:
  538. # TODO: support reuse of deleted objects
  539. reference = IndirectReference(max(self.xref_table.keys()) + 1, 0)
  540. except ValueError:
  541. reference = IndirectReference(1, 0)
  542. if offset is not None:
  543. self.xref_table[reference.object_id] = (offset, 0)
  544. return reference
    # Lexical character classes (octal escapes: NUL, TAB, LF, FF, CR, space).
    delimiter = rb"[][()<>{}/%]"
    delimiter_or_ws = rb"[][()<>{}/%\000\011\012\014\015\040]"
    whitespace = rb"[\000\011\012\014\015\040]"
    whitespace_or_hex = rb"[\000\011\012\014\015\0400-9a-fA-F]"
    whitespace_optional = whitespace + b"*"
    whitespace_mandatory = whitespace + b"+"
    # No "\012" aka "\n" or "\015" aka "\r":
    whitespace_optional_no_nl = rb"[\000\011\014\040]*"
    newline_only = rb"[\r\n]+"
    newline = whitespace_optional_no_nl + newline_only + whitespace_optional_no_nl
    # "trailer << ... >> startxref NNN %%EOF" anchored at the end of the buffer
    re_trailer_end = re.compile(
        whitespace_mandatory
        + rb"trailer"
        + whitespace_optional
        + rb"<<(.*>>)"
        + newline
        + rb"startxref"
        + newline
        + rb"([0-9]+)"
        + newline
        + rb"%%EOF"
        + whitespace_optional
        + rb"$",
        re.DOTALL,
    )
    # same shape, unanchored and non-greedy: for earlier (previous) trailers
    re_trailer_prev = re.compile(
        whitespace_optional
        + rb"trailer"
        + whitespace_optional
        + rb"<<(.*?>>)"
        + newline
        + rb"startxref"
        + newline
        + rb"([0-9]+)"
        + newline
        + rb"%%EOF"
        + whitespace_optional,
        re.DOTALL,
    )
  584. def read_trailer(self) -> None:
  585. assert self.buf is not None
  586. search_start_offset = len(self.buf) - 16384
  587. if search_start_offset < self.start_offset:
  588. search_start_offset = self.start_offset
  589. m = self.re_trailer_end.search(self.buf, search_start_offset)
  590. check_format_condition(m is not None, "trailer end not found")
  591. # make sure we found the LAST trailer
  592. last_match = m
  593. while m:
  594. last_match = m
  595. m = self.re_trailer_end.search(self.buf, m.start() + 16)
  596. if not m:
  597. m = last_match
  598. assert m is not None
  599. trailer_data = m.group(1)
  600. self.last_xref_section_offset = int(m.group(2))
  601. self.trailer_dict = self.interpret_trailer(trailer_data)
  602. self.xref_table = XrefTable()
  603. self.read_xref_table(xref_section_offset=self.last_xref_section_offset)
  604. if b"Prev" in self.trailer_dict:
  605. self.read_prev_trailer(self.trailer_dict[b"Prev"])
    def read_prev_trailer(self, xref_section_offset: int) -> None:
        """Read the xref section at *xref_section_offset* and the trailer
        after it, recursing into any further /Prev chain."""
        assert self.buf is not None
        trailer_offset = self.read_xref_table(xref_section_offset=xref_section_offset)
        # the trailer should follow its xref section within 16 KiB
        m = self.re_trailer_prev.search(
            self.buf[trailer_offset : trailer_offset + 16384]
        )
        check_format_condition(m is not None, "previous trailer not found")
        assert m is not None
        trailer_data = m.group(1)
        check_format_condition(
            int(m.group(2)) == xref_section_offset,
            "xref section offset in previous trailer doesn't match what was expected",
        )
        trailer_dict = self.interpret_trailer(trailer_data)
        if b"Prev" in trailer_dict:
            self.read_prev_trailer(trailer_dict[b"Prev"])
    re_whitespace_optional = re.compile(whitespace_optional)
    # a name token: "/Name", running up to the next delimiter or whitespace
    re_name = re.compile(
        whitespace_optional
        + rb"/([!-$&'*-.0-;=?-Z\\^-z|~]+)(?="
        + delimiter_or_ws
        + rb")"
    )
    re_dict_start = re.compile(whitespace_optional + rb"<<")
    re_dict_end = re.compile(whitespace_optional + rb">>" + whitespace_optional)
    @classmethod
    def interpret_trailer(cls, trailer_data: bytes) -> dict[bytes, Any]:
        """Parse the key/value pairs of a trailer dictionary body.

        Requires /Size (int) and /Root (indirect reference) to be present.
        """
        trailer = {}
        offset = 0
        while True:
            m = cls.re_name.match(trailer_data, offset)
            if not m:
                # no more keys: we must be exactly at the closing >>
                m = cls.re_dict_end.match(trailer_data, offset)
                check_format_condition(
                    m is not None and m.end() == len(trailer_data),
                    "name not found in trailer, remaining data: "
                    + repr(trailer_data[offset:]),
                )
                break
            key = cls.interpret_name(m.group(1))
            assert isinstance(key, bytes)
            value, value_offset = cls.get_value(trailer_data, m.end())
            trailer[key] = value
            if value_offset is None:
                break
            offset = value_offset
        check_format_condition(
            b"Size" in trailer and isinstance(trailer[b"Size"], int),
            "/Size not in trailer or not an integer",
        )
        check_format_condition(
            b"Root" in trailer and isinstance(trailer[b"Root"], IndirectReference),
            "/Root not in trailer or not an indirect reference",
        )
        return trailer
  661. re_hashes_in_name = re.compile(rb"([^#]*)(#([0-9a-fA-F]{2}))?")
  662. @classmethod
  663. def interpret_name(cls, raw: bytes, as_text: bool = False) -> str | bytes:
  664. name = b""
  665. for m in cls.re_hashes_in_name.finditer(raw):
  666. if m.group(3):
  667. name += m.group(1) + bytearray.fromhex(m.group(3).decode("us-ascii"))
  668. else:
  669. name += m.group(1)
  670. if as_text:
  671. return name.decode("utf-8")
  672. else:
  673. return bytes(name)
    # literal keywords; the lookahead requires a delimiter or whitespace after
    re_null = re.compile(whitespace_optional + rb"null(?=" + delimiter_or_ws + rb")")
    re_true = re.compile(whitespace_optional + rb"true(?=" + delimiter_or_ws + rb")")
    re_false = re.compile(whitespace_optional + rb"false(?=" + delimiter_or_ws + rb")")
    re_int = re.compile(
        whitespace_optional + rb"([-+]?[0-9]+)(?=" + delimiter_or_ws + rb")"
    )
    # a real number needs a decimal point on at least one side of the dot
    re_real = re.compile(
        whitespace_optional
        + rb"([-+]?([0-9]+\.[0-9]*|[0-9]*\.[0-9]+))(?="
        + delimiter_or_ws
        + rb")"
    )
    re_array_start = re.compile(whitespace_optional + rb"\[")
    re_array_end = re.compile(whitespace_optional + rb"]")
    re_string_hex = re.compile(
        whitespace_optional + rb"<(" + whitespace_or_hex + rb"*)>"
    )
    re_string_lit = re.compile(whitespace_optional + rb"\(")
    # "12 0 R" — reference to an indirect object
    re_indirect_reference = re.compile(
        whitespace_optional
        + rb"([-+]?[0-9]+)"
        + whitespace_mandatory
        + rb"([-+]?[0-9]+)"
        + whitespace_mandatory
        + rb"R(?="
        + delimiter_or_ws
        + rb")"
    )
    # "12 0 obj" — start of an indirect object definition
    re_indirect_def_start = re.compile(
        whitespace_optional
        + rb"([-+]?[0-9]+)"
        + whitespace_mandatory
        + rb"([-+]?[0-9]+)"
        + whitespace_mandatory
        + rb"obj(?="
        + delimiter_or_ws
        + rb")"
    )
    re_indirect_def_end = re.compile(
        whitespace_optional + rb"endobj(?=" + delimiter_or_ws + rb")"
    )
    # zero or more %-comment lines, each ending in a newline
    re_comment = re.compile(
        rb"(" + whitespace_optional + rb"%[^\r\n]*" + newline + rb")*"
    )
    re_stream_start = re.compile(whitespace_optional + rb"stream\r?\n")
    re_stream_end = re.compile(
        whitespace_optional + rb"endstream(?=" + delimiter_or_ws + rb")"
    )
  722. @classmethod
  723. def get_value(
  724. cls,
  725. data: bytes | bytearray | mmap.mmap,
  726. offset: int,
  727. expect_indirect: IndirectReference | None = None,
  728. max_nesting: int = -1,
  729. ) -> tuple[Any, int | None]:
  730. if max_nesting == 0:
  731. return None, None
  732. m = cls.re_comment.match(data, offset)
  733. if m:
  734. offset = m.end()
  735. m = cls.re_indirect_def_start.match(data, offset)
  736. if m:
  737. check_format_condition(
  738. int(m.group(1)) > 0,
  739. "indirect object definition: object ID must be greater than 0",
  740. )
  741. check_format_condition(
  742. int(m.group(2)) >= 0,
  743. "indirect object definition: generation must be non-negative",
  744. )
  745. check_format_condition(
  746. expect_indirect is None
  747. or expect_indirect
  748. == IndirectReference(int(m.group(1)), int(m.group(2))),
  749. "indirect object definition different than expected",
  750. )
  751. object, object_offset = cls.get_value(
  752. data, m.end(), max_nesting=max_nesting - 1
  753. )
  754. if object_offset is None:
  755. return object, None
  756. m = cls.re_indirect_def_end.match(data, object_offset)
  757. check_format_condition(
  758. m is not None, "indirect object definition end not found"
  759. )
  760. assert m is not None
  761. return object, m.end()
  762. check_format_condition(
  763. not expect_indirect, "indirect object definition not found"
  764. )
  765. m = cls.re_indirect_reference.match(data, offset)
  766. if m:
  767. check_format_condition(
  768. int(m.group(1)) > 0,
  769. "indirect object reference: object ID must be greater than 0",
  770. )
  771. check_format_condition(
  772. int(m.group(2)) >= 0,
  773. "indirect object reference: generation must be non-negative",
  774. )
  775. return IndirectReference(int(m.group(1)), int(m.group(2))), m.end()
  776. m = cls.re_dict_start.match(data, offset)
  777. if m:
  778. offset = m.end()
  779. result: dict[Any, Any] = {}
  780. m = cls.re_dict_end.match(data, offset)
  781. current_offset: int | None = offset
  782. while not m:
  783. assert current_offset is not None
  784. key, current_offset = cls.get_value(
  785. data, current_offset, max_nesting=max_nesting - 1
  786. )
  787. if current_offset is None:
  788. return result, None
  789. value, current_offset = cls.get_value(
  790. data, current_offset, max_nesting=max_nesting - 1
  791. )
  792. result[key] = value
  793. if current_offset is None:
  794. return result, None
  795. m = cls.re_dict_end.match(data, current_offset)
  796. current_offset = m.end()
  797. m = cls.re_stream_start.match(data, current_offset)
  798. if m:
  799. stream_len = result.get(b"Length")
  800. if stream_len is None or not isinstance(stream_len, int):
  801. msg = f"bad or missing Length in stream dict ({stream_len})"
  802. raise PdfFormatError(msg)
  803. stream_data = data[m.end() : m.end() + stream_len]
  804. m = cls.re_stream_end.match(data, m.end() + stream_len)
  805. check_format_condition(m is not None, "stream end not found")
  806. assert m is not None
  807. current_offset = m.end()
  808. return PdfStream(PdfDict(result), stream_data), current_offset
  809. return PdfDict(result), current_offset
  810. m = cls.re_array_start.match(data, offset)
  811. if m:
  812. offset = m.end()
  813. results = []
  814. m = cls.re_array_end.match(data, offset)
  815. current_offset = offset
  816. while not m:
  817. assert current_offset is not None
  818. value, current_offset = cls.get_value(
  819. data, current_offset, max_nesting=max_nesting - 1
  820. )
  821. results.append(value)
  822. if current_offset is None:
  823. return results, None
  824. m = cls.re_array_end.match(data, current_offset)
  825. return results, m.end()
  826. m = cls.re_null.match(data, offset)
  827. if m:
  828. return None, m.end()
  829. m = cls.re_true.match(data, offset)
  830. if m:
  831. return True, m.end()
  832. m = cls.re_false.match(data, offset)
  833. if m:
  834. return False, m.end()
  835. m = cls.re_name.match(data, offset)
  836. if m:
  837. return PdfName(cls.interpret_name(m.group(1))), m.end()
  838. m = cls.re_int.match(data, offset)
  839. if m:
  840. return int(m.group(1)), m.end()
  841. m = cls.re_real.match(data, offset)
  842. if m:
  843. # XXX Decimal instead of float???
  844. return float(m.group(1)), m.end()
  845. m = cls.re_string_hex.match(data, offset)
  846. if m:
  847. # filter out whitespace
  848. hex_string = bytearray(
  849. b for b in m.group(1) if b in b"0123456789abcdefABCDEF"
  850. )
  851. if len(hex_string) % 2 == 1:
  852. # append a 0 if the length is not even - yes, at the end
  853. hex_string.append(ord(b"0"))
  854. return bytearray.fromhex(hex_string.decode("us-ascii")), m.end()
  855. m = cls.re_string_lit.match(data, offset)
  856. if m:
  857. return cls.get_literal_string(data, m.end())
  858. # return None, offset # fallback (only for debugging)
  859. msg = f"unrecognized object: {repr(data[offset : offset + 32])}"
  860. raise PdfFormatError(msg)
  861. re_lit_str_token = re.compile(
  862. rb"(\\[nrtbf()\\])|(\\[0-9]{1,3})|(\\(\r\n|\r|\n))|(\r\n|\r|\n)|(\()|(\))"
  863. )
  864. escaped_chars = {
  865. b"n": b"\n",
  866. b"r": b"\r",
  867. b"t": b"\t",
  868. b"b": b"\b",
  869. b"f": b"\f",
  870. b"(": b"(",
  871. b")": b")",
  872. b"\\": b"\\",
  873. ord(b"n"): b"\n",
  874. ord(b"r"): b"\r",
  875. ord(b"t"): b"\t",
  876. ord(b"b"): b"\b",
  877. ord(b"f"): b"\f",
  878. ord(b"("): b"(",
  879. ord(b")"): b")",
  880. ord(b"\\"): b"\\",
  881. }
  882. @classmethod
  883. def get_literal_string(
  884. cls, data: bytes | bytearray | mmap.mmap, offset: int
  885. ) -> tuple[bytes, int]:
  886. nesting_depth = 0
  887. result = bytearray()
  888. for m in cls.re_lit_str_token.finditer(data, offset):
  889. result.extend(data[offset : m.start()])
  890. if m.group(1):
  891. result.extend(cls.escaped_chars[m.group(1)[1]])
  892. elif m.group(2):
  893. result.append(int(m.group(2)[1:], 8))
  894. elif m.group(3):
  895. pass
  896. elif m.group(5):
  897. result.extend(b"\n")
  898. elif m.group(6):
  899. result.extend(b"(")
  900. nesting_depth += 1
  901. elif m.group(7):
  902. if nesting_depth == 0:
  903. return bytes(result), m.end()
  904. result.extend(b")")
  905. nesting_depth -= 1
  906. offset = m.end()
  907. msg = "unfinished literal string"
  908. raise PdfFormatError(msg)
  909. re_xref_section_start = re.compile(whitespace_optional + rb"xref" + newline)
  910. re_xref_subsection_start = re.compile(
  911. whitespace_optional
  912. + rb"([0-9]+)"
  913. + whitespace_mandatory
  914. + rb"([0-9]+)"
  915. + whitespace_optional
  916. + newline_only
  917. )
  918. re_xref_entry = re.compile(rb"([0-9]{10}) ([0-9]{5}) ([fn])( \r| \n|\r\n)")
  919. def read_xref_table(self, xref_section_offset: int) -> int:
  920. assert self.buf is not None
  921. subsection_found = False
  922. m = self.re_xref_section_start.match(
  923. self.buf, xref_section_offset + self.start_offset
  924. )
  925. check_format_condition(m is not None, "xref section start not found")
  926. assert m is not None
  927. offset = m.end()
  928. while True:
  929. m = self.re_xref_subsection_start.match(self.buf, offset)
  930. if not m:
  931. check_format_condition(
  932. subsection_found, "xref subsection start not found"
  933. )
  934. break
  935. subsection_found = True
  936. offset = m.end()
  937. first_object = int(m.group(1))
  938. num_objects = int(m.group(2))
  939. for i in range(first_object, first_object + num_objects):
  940. m = self.re_xref_entry.match(self.buf, offset)
  941. check_format_condition(m is not None, "xref entry not found")
  942. assert m is not None
  943. offset = m.end()
  944. is_free = m.group(3) == b"f"
  945. if not is_free:
  946. generation = int(m.group(2))
  947. new_entry = (int(m.group(1)), generation)
  948. if i not in self.xref_table:
  949. self.xref_table[i] = new_entry
  950. return offset
  951. def read_indirect(self, ref: IndirectReference, max_nesting: int = -1) -> Any:
  952. offset, generation = self.xref_table[ref[0]]
  953. check_format_condition(
  954. generation == ref[1],
  955. f"expected to find generation {ref[1]} for object ID {ref[0]} in xref "
  956. f"table, instead found generation {generation} at offset {offset}",
  957. )
  958. assert self.buf is not None
  959. value = self.get_value(
  960. self.buf,
  961. offset + self.start_offset,
  962. expect_indirect=IndirectReference(*ref),
  963. max_nesting=max_nesting,
  964. )[0]
  965. self.cached_objects[ref] = value
  966. return value
  967. def linearize_page_tree(
  968. self, node: PdfDict | None = None
  969. ) -> list[IndirectReference]:
  970. page_node = node if node is not None else self.page_tree_root
  971. check_format_condition(
  972. page_node[b"Type"] == b"Pages", "/Type of page tree node is not /Pages"
  973. )
  974. pages = []
  975. for kid in page_node[b"Kids"]:
  976. kid_object = self.read_indirect(kid)
  977. if kid_object[b"Type"] == b"Page":
  978. pages.append(kid)
  979. else:
  980. pages.extend(self.linearize_page_tree(node=kid_object))
  981. return pages