processing_donut.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. # Copyright 2022 The HuggingFace Inc. team.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """
  15. Processor class for Donut.
  16. """
  17. import re
  18. from ...image_utils import ImageInput
  19. from ...processing_utils import ProcessingKwargs, ProcessorMixin, Unpack
  20. from ...tokenization_utils_base import PreTokenizedInput, TextInput
  21. from ...utils import auto_docstring, logging
  22. class DonutProcessorKwargs(ProcessingKwargs, total=False):
  23. _defaults = {}
  24. logger = logging.get_logger(__name__)
  25. @auto_docstring
  26. class DonutProcessor(ProcessorMixin):
  27. def __init__(self, image_processor=None, tokenizer=None, **kwargs):
  28. super().__init__(image_processor, tokenizer)
  29. @auto_docstring
  30. def __call__(
  31. self,
  32. images: ImageInput | None = None,
  33. text: str | list[str] | TextInput | PreTokenizedInput | None = None,
  34. **kwargs: Unpack[DonutProcessorKwargs],
  35. ):
  36. if images is None and text is None:
  37. raise ValueError("You need to specify either an `images` or `text` input to process.")
  38. output_kwargs = self._merge_kwargs(
  39. DonutProcessorKwargs,
  40. tokenizer_init_kwargs=self.tokenizer.init_kwargs,
  41. **kwargs,
  42. )
  43. if images is not None:
  44. inputs = self.image_processor(images, **output_kwargs["images_kwargs"])
  45. if text is not None:
  46. if images is not None:
  47. output_kwargs["text_kwargs"].setdefault("add_special_tokens", False)
  48. encodings = self.tokenizer(text, **output_kwargs["text_kwargs"])
  49. if text is None:
  50. return inputs
  51. elif images is None:
  52. return encodings
  53. else:
  54. inputs["labels"] = encodings["input_ids"] # for BC
  55. inputs["input_ids"] = encodings["input_ids"]
  56. return inputs
  57. @property
  58. def model_input_names(self):
  59. image_processor_input_names = self.image_processor.model_input_names
  60. return list(image_processor_input_names + ["input_ids", "labels"])
  61. def token2json(self, tokens, is_inner_value=False, added_vocab=None):
  62. """
  63. Convert a (generated) token sequence into an ordered JSON format.
  64. """
  65. if added_vocab is None:
  66. added_vocab = self.tokenizer.get_added_vocab()
  67. output = {}
  68. while tokens:
  69. # We want r"<s_(.*?)>" but without ReDOS risk, so do it manually in two parts
  70. potential_start = re.search(r"<s_", tokens, re.IGNORECASE)
  71. if potential_start is None:
  72. break
  73. start_token = tokens[potential_start.start() :]
  74. if ">" not in start_token:
  75. break
  76. start_token = start_token[: start_token.index(">") + 1]
  77. key = start_token[len("<s_") : -len(">")]
  78. key_escaped = re.escape(key)
  79. end_token = re.search(rf"</s_{key_escaped}>", tokens, re.IGNORECASE)
  80. if end_token is None:
  81. tokens = tokens.replace(start_token, "")
  82. else:
  83. end_token = end_token.group()
  84. start_token_escaped = re.escape(start_token)
  85. end_token_escaped = re.escape(end_token)
  86. content = re.search(
  87. f"{start_token_escaped}(.*?){end_token_escaped}", tokens, re.IGNORECASE | re.DOTALL
  88. )
  89. if content is not None:
  90. content = content.group(1).strip()
  91. if r"<s_" in content and r"</s_" in content: # non-leaf node
  92. value = self.token2json(content, is_inner_value=True, added_vocab=added_vocab)
  93. if value:
  94. if len(value) == 1:
  95. value = value[0]
  96. output[key] = value
  97. else: # leaf nodes
  98. output[key] = []
  99. for leaf in content.split(r"<sep/>"):
  100. leaf = leaf.strip()
  101. if leaf in added_vocab and leaf[0] == "<" and leaf[-2:] == "/>":
  102. leaf = leaf[1:-2] # for categorical special tokens
  103. output[key].append(leaf)
  104. if len(output[key]) == 1:
  105. output[key] = output[key][0]
  106. tokens = tokens[tokens.find(end_token) + len(end_token) :].strip()
  107. if tokens[:6] == r"<sep/>": # non-leaf nodes
  108. return [output] + self.token2json(tokens[6:], is_inner_value=True, added_vocab=added_vocab)
  109. if output:
  110. return [output] if is_inner_value else output
  111. else:
  112. return [] if is_inner_value else {"text_sequence": tokens}
  113. __all__ = ["DonutProcessor"]