processing_smolvlm.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. # Copyright 2025 The HuggingFace Inc. team.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """
  15. Processor class for SmolVLM.
  16. """
  17. from datetime import timedelta
  18. from typing import TYPE_CHECKING, Union
  19. from ...feature_extraction_utils import BatchFeature
  20. from ...image_utils import ImageInput, make_nested_list_of_images
  21. from ...processing_utils import ProcessingKwargs, ProcessorMixin, Unpack
  22. from ...tokenization_utils_base import BatchEncoding, TextInput
  23. from ...utils import auto_docstring, is_num2words_available, logging
  24. from ...video_utils import VideoInput
# Adapted from transformers.models.smolvlm.video_processing_smolvlm.DEFAULT_VIDEO_INTRO
# Intro sentence placed before the frames of a video; `{frame_count}` and `{video_duration}`
# are filled in by `SmolVLMProcessor.expand_text_with_video_tokens`.
DEFAULT_VIDEO_INTRO = (
    "You are provided the following series of {frame_count} frames from a {video_duration} [H:MM:SS] video.\n"
)
# Adapted from transformers.models.smolvlm.video_processing_smolvlm.DEFAULT_MEDIA_OUTTRO
# Appended after the last frame of an expanded video prompt.
DEFAULT_MEDIA_OUTTRO = "\n\n"
# Adapted from transformers.models.smolvlm.video_processing_smolvlm.FRAME_TIMESTAMP_MESSAGE
# Prefix emitted before each frame; `{timestamp}` is formatted as `MM:SS` by the processor.
FRAME_TIMESTAMP_MESSAGE = "\nFrame from {timestamp}:"

if TYPE_CHECKING:
    # Only needed for type annotations.
    from ...tokenization_utils_base import PreTokenizedInput

logger = logging.get_logger(__name__)

# `num2words` is optional at import time; `SmolVLMProcessor.__init__` raises an `ImportError`
# when it is unavailable (i.e. when `num2words` is None).
if is_num2words_available():
    from num2words import num2words
else:
    num2words = None

# The correct chat template to be used for videos after #38105
# `SmolVLMProcessor.apply_chat_template` falls back to it when the conversation contains a video
# entry and the caller did not pass a template of their own.
DEFAULT_CHAT_TEMPLATE = "<|im_start|>{% for message in messages %}{{message['role'] | capitalize}}{% if message['content'][0]['type'] == 'image' %}{{':'}}{% else %}{{': '}}{% endif %}{% for line in message['content'] %}{% if line['type'] == 'text' %}{{line['text']}}{% elif line['type'] == 'image' %}{{ '<image>' }}{% elif line['type'] == 'video' %}{{ '<video>' }}{% endif %}{% endfor %}<end_of_utterance>\n{% endfor %}{% if add_generation_prompt %}{{ 'Assistant:' }}{% endif %}"
  42. def _prompt_split_image(
  43. image_seq_len, image_rows, image_cols, fake_token_around_image, image_token, global_image_token
  44. ):
  45. """Prompt with expanded image tokens for when the image is split into patches."""
  46. text_split_images = ""
  47. for n_h in range(image_rows):
  48. for n_w in range(image_cols):
  49. text_split_images += (
  50. f"{fake_token_around_image}" + f"<row_{n_h + 1}_col_{n_w + 1}>" + f"{image_token}" * image_seq_len
  51. )
  52. text_split_images += "\n"
  53. text_split_images += (
  54. f"\n{fake_token_around_image}"
  55. + f"{global_image_token}"
  56. + f"{image_token}" * image_seq_len
  57. + f"{fake_token_around_image}"
  58. )
  59. return text_split_images
  60. def _prompt_single_image(image_seq_len, fake_token_around_image, image_token, global_image_token):
  61. """Prompt with expanded image tokens for a single image."""
  62. return (
  63. f"{fake_token_around_image}"
  64. + f"{global_image_token}"
  65. + f"{image_token}" * image_seq_len
  66. + f"{fake_token_around_image}"
  67. )
  68. def get_image_prompt_string(
  69. image_rows, image_cols, image_seq_len, fake_token_around_image, image_token, global_image_token
  70. ):
  71. if image_rows == 0 and image_cols == 0:
  72. return _prompt_single_image(
  73. image_seq_len,
  74. fake_token_around_image=fake_token_around_image,
  75. image_token=image_token,
  76. global_image_token=global_image_token,
  77. )
  78. return _prompt_split_image(
  79. image_seq_len, image_rows, image_cols, fake_token_around_image, image_token, global_image_token
  80. )
class SmolVLMProcessorKwargs(ProcessingKwargs, total=False):
    # Per-modality default kwargs. `SmolVLMProcessor.__call__` passes this class to `_merge_kwargs`
    # and forwards the resulting `text_kwargs` / `images_kwargs` / `videos_kwargs` to the tokenizer,
    # image processor and video processor respectively.
    _defaults = {
        "text_kwargs": {
            "add_special_tokens": True,
            "padding": False,
            "is_split_into_words": False,
        },
        "images_kwargs": {
            # `__call__` pops "rows" and "cols" from the image processor output to expand image tokens;
            # presumably this flag is what makes the image processor return them - TODO confirm.
            "return_row_col_info": True,
        },
        "videos_kwargs": {
            # `expand_text_with_video_tokens` reads `video_metadata` from the video processor output.
            "return_metadata": True,
        },
    }
  95. @auto_docstring
  96. class SmolVLMProcessor(ProcessorMixin):
  97. def __init__(
  98. self,
  99. image_processor,
  100. tokenizer,
  101. video_processor,
  102. image_seq_len: int = 169,
  103. chat_template: str | None = None,
  104. **kwargs,
  105. ):
  106. r"""
  107. image_seq_len (`int`, *optional*, defaults to 169):
  108. The length of the image sequence i.e. the number of <image> tokens per image in the input.
  109. This parameter is used to build the string from the input prompt and image tokens and should match the
  110. value the model used. It is computed as: image_seq_len = int(((image_size // patch_size) ** 2) / (scale_factor**2))
  111. """
  112. self.fake_image_token = getattr(tokenizer, "fake_image_token", "<fake_token_around_image>")
  113. self.image_token = getattr(tokenizer, "image_token", "<image>")
  114. self.image_token_id = tokenizer.convert_tokens_to_ids(self.image_token)
  115. self.end_of_utterance_token = getattr(tokenizer, "end_of_utterance_token", "<end_of_utterance>")
  116. self.global_image_token = getattr(tokenizer, "global_image_token", "<global-img>")
  117. self.image_seq_len = image_seq_len
  118. self.video_token = getattr(tokenizer, "video_token", "<video>")
  119. if not num2words:
  120. raise ImportError(
  121. "Package `num2words` is required to run SmolVLM processor. Install it with `pip install num2words`."
  122. )
  123. super().__init__(image_processor, tokenizer, video_processor, chat_template=chat_template, **kwargs)
  124. def expand_text_with_image_tokens(self, text, image_rows, image_cols):
  125. prompt_strings = []
  126. for sample, sample_rows, sample_cols in zip(text, image_rows, image_cols):
  127. # Replace the image token with fake tokens around the expanded image token sequence of length `image_seq_len`
  128. image_prompt_strings = []
  129. for n_rows, n_cols in zip(sample_rows, sample_cols):
  130. image_prompt_string = get_image_prompt_string(
  131. n_rows,
  132. n_cols,
  133. self.image_seq_len,
  134. image_token=self.image_token,
  135. fake_token_around_image=self.fake_image_token,
  136. global_image_token=self.global_image_token,
  137. )
  138. image_prompt_strings.append(image_prompt_string)
  139. split_sample = sample.split(self.image_token)
  140. if len(split_sample) == 0:
  141. raise ValueError("The image token should be present in the text.")
  142. # Place in the image prompt strings where the image tokens are
  143. sample = split_sample[0]
  144. for i, image_prompt_string in enumerate(image_prompt_strings):
  145. sample += image_prompt_string + split_sample[i + 1]
  146. prompt_strings.append(sample)
  147. return prompt_strings
  148. def expand_text_with_video_tokens(self, text, video_inputs):
  149. num_frames = video_inputs["pixel_values"].shape[1]
  150. video_metadata = iter(video_inputs["video_metadata"])
  151. prompt_strings = []
  152. for sample in text:
  153. while self.video_token in sample:
  154. metadata = next(video_metadata)
  155. if metadata.fps is None:
  156. logger.warning_once(
  157. "SmolVLM requires frame timestamps to construct prompts, but the `fps` of the input video could not be inferred. "
  158. "Probably `video_metadata` was missing from inputs and you passed pre-sampled frames. "
  159. "Defaulting to `fps=24`. Please provide `video_metadata` for more accurate results."
  160. )
  161. metadata.fps = 24 # Set the default fps to 24 for BC, otherwise `timestamps` can't be inferred
  162. timestamps = [(int(second // 60), int(second % 60)) for second in metadata.timestamps]
  163. duration = int(metadata.duration) if metadata.duration is not None else int(metadata.timestamps[-1])
  164. duration_td = timedelta(seconds=int(duration))
  165. image_prompt_strings = DEFAULT_VIDEO_INTRO.format(
  166. frame_count=num2words(num_frames), video_duration=str(duration_td)
  167. )
  168. for timestamp in timestamps:
  169. image_prompt_string = _prompt_single_image(
  170. self.image_seq_len,
  171. image_token=self.image_token,
  172. fake_token_around_image=self.fake_image_token,
  173. global_image_token=self.global_image_token,
  174. )
  175. timestamp = f"{timestamp[0]:02d}:{timestamp[1]:02d}"
  176. image_prompt_string = FRAME_TIMESTAMP_MESSAGE.format(timestamp=timestamp) + image_prompt_string
  177. image_prompt_strings += image_prompt_string
  178. image_prompt_strings += DEFAULT_MEDIA_OUTTRO
  179. sample = sample.replace(self.video_token, image_prompt_strings, 1)
  180. prompt_strings.append(sample)
  181. return prompt_strings
  182. @auto_docstring
  183. def __call__(
  184. self,
  185. images: ImageInput | list[ImageInput] | list[list[ImageInput]] = None,
  186. text: Union[TextInput, "PreTokenizedInput", list[TextInput], list["PreTokenizedInput"]] = None,
  187. videos: VideoInput | None = None,
  188. **kwargs: Unpack[SmolVLMProcessorKwargs],
  189. ) -> BatchEncoding:
  190. if text is None and images is None and videos is None:
  191. raise ValueError("You must provide one of `text`, `images` or `videos'.")
  192. if text is None and ((images is None) ^ (videos is not None)):
  193. raise ValueError("You must specify exactly one of `images` or `videos`")
  194. output_kwargs = self._merge_kwargs(
  195. SmolVLMProcessorKwargs,
  196. tokenizer_init_kwargs=self.tokenizer.init_kwargs,
  197. **kwargs,
  198. )
  199. if text is not None:
  200. if isinstance(text, str):
  201. text = [text]
  202. elif not isinstance(text, list) and not isinstance(text[0], str):
  203. raise ValueError("Invalid input text. Please provide a string, or a list of strings")
  204. n_images_in_text = sum(sample.count(self.image_token) for sample in text)
  205. if n_images_in_text > 0 and (images is None and videos is None):
  206. raise ValueError(f"We detected {n_images_in_text} tokens in the text but no images/videos were passed")
  207. inputs = {}
  208. # Images and videos are mutually exclusive, so process one which is present
  209. if images is not None:
  210. images = self.image_processor.fetch_images(images)
  211. images = make_nested_list_of_images(images)
  212. vision_inputs = self.image_processor(images, **output_kwargs["images_kwargs"])
  213. image_rows = vision_inputs.pop("rows", None)
  214. image_cols = vision_inputs.pop("cols", None)
  215. inputs.update(vision_inputs)
  216. if text is not None:
  217. n_images_in_text = [sample.count(self.image_token) for sample in text]
  218. n_images_in_images = [len(sublist) for sublist in images]
  219. if n_images_in_images != n_images_in_text:
  220. raise ValueError(
  221. f"The number of images in the text {n_images_in_text} and images {n_images_in_images} should be the same."
  222. )
  223. # Set default values for image_rows and image_cols if not provided
  224. if image_rows is None:
  225. image_rows = [[0] * n_images for n_images in n_images_in_text]
  226. if image_cols is None:
  227. image_cols = [[0] * n_images for n_images in n_images_in_text]
  228. text = self.expand_text_with_image_tokens(text, image_rows=image_rows, image_cols=image_cols)
  229. elif videos is not None:
  230. vision_inputs = self.video_processor(videos, **output_kwargs["videos_kwargs"])
  231. if text is not None:
  232. n_videos_in_text = [sample.count(self.video_token) for sample in text]
  233. n_videos_in_videos = [len(sublist) for sublist in videos]
  234. if n_videos_in_videos != n_videos_in_text:
  235. raise ValueError(
  236. f"The number of videos in the text {n_videos_in_text} and videos {n_videos_in_videos} should be the same."
  237. )
  238. text = self.expand_text_with_video_tokens(text, vision_inputs)
  239. # If user has not requested video metadata, pop it. By default metadata
  240. # is always returned to expand video tokens correctly
  241. if not kwargs.get("return_metadata"):
  242. vision_inputs.pop("video_metadata")
  243. inputs.update(vision_inputs)
  244. return_tensors = output_kwargs["text_kwargs"].pop("return_tensors", None)
  245. if text is not None:
  246. text_inputs = self.tokenizer(text, **output_kwargs["text_kwargs"])
  247. self._check_special_mm_tokens(text, text_inputs, modalities=["image"])
  248. inputs.update(text_inputs)
  249. return BatchFeature(inputs, tensor_type=return_tensors)
  250. def apply_chat_template(
  251. self,
  252. conversation: list[dict[str, str]] | list[list[dict[str, str]]],
  253. chat_template: str | None = None,
  254. processor_kwargs: dict | None = None,
  255. **kwargs,
  256. ) -> str:
  257. """
  258. Similar to the `apply_chat_template` method on tokenizers, this method applies a Jinja template to input
  259. conversations to turn them into a single tokenizable string.
  260. The input is expected to be in the following format, where each message content is a list consisting of text and
  261. optionally image or video inputs. One can also provide an image, video, URL or local path which will be used to form
  262. `pixel_values` when `return_dict=True`. If not provided, one will get only the formatted text, optionally tokenized text.
  263. conversation = [
  264. {
  265. "role": "user",
  266. "content": [
  267. {"type": "image", "url": "https://www.ilankelman.org/stopsigns/australia.jpg"},
  268. {"type": "text", "text": "Please describe this image in detail."},
  269. ],
  270. },
  271. ]
  272. Args:
  273. conversation (`Union[list[Dict, [str, str]], list[list[dict[str, str]]]]`):
  274. The conversation to format.
  275. chat_template (`Optional[str]`, *optional*):
  276. The Jinja template to use for formatting the conversation. If not provided, the tokenizer's
  277. chat template is used.
  278. """
  279. if isinstance(conversation, (list, tuple)) and (
  280. isinstance(conversation[0], (list, tuple)) or hasattr(conversation[0], "content")
  281. ):
  282. conversations = conversation
  283. else:
  284. conversations = [conversation]
  285. has_video = any(
  286. (isinstance(content, dict) and content["type"] == "video")
  287. for conversation in conversations
  288. for message in conversation
  289. for content in message["content"]
  290. )
  291. if chat_template is None and has_video:
  292. # re-assign to the correct default template for BC, if user is not requesting their own template
  293. chat_template = DEFAULT_CHAT_TEMPLATE
  294. # Users might be passing processor kwargs simply as `**kwargs`
  295. if processor_kwargs:
  296. processor_kwargs.setdefault("num_frames", self.video_processor.num_frames)
  297. processor_kwargs.setdefault("fps", self.video_processor.fps)
  298. else:
  299. kwargs.setdefault("num_frames", self.video_processor.num_frames)
  300. kwargs.setdefault("fps", self.video_processor.fps)
  301. return super().apply_chat_template(conversation, chat_template, processor_kwargs=processor_kwargs, **kwargs)
  302. __all__ = ["SmolVLMProcessor"]