| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305 |
- # Copyright 2026 The HuggingFace Team. All rights reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- from __future__ import annotations
- import json
- import re
- from transformers.utils import is_jmespath_available
- if is_jmespath_available():
- import jmespath
- else:
- jmespath = None
- def _gemma4_json_to_json(text: str) -> str:
- """Convert Gemma4 tool call format (unquoted keys, ``<|"|>`` string delimiters) to valid JSON."""
- strings = []
- def _capture(m):
- strings.append(m.group(1))
- return f"\x00{len(strings) - 1}\x00"
- # Grab the inside of gemma-quotes and store them for later
- text = re.sub(r'<\|"\|>(.*?)<\|"\|>', _capture, text, flags=re.DOTALL)
- # Add quotes to the bare keys elsewhere
- text = re.sub(r"(?<=[{,])(\w+):", r'"\1":', text)
- # Put the inside of the quotes back afterwards
- for i, s in enumerate(strings):
- text = text.replace(f"\x00{i}\x00", json.dumps(s))
- return text
- def _parse_re_match(node_match: re.Match) -> dict | str:
- # If the regex has named groups, return a dict of those groups
- if node_match.groupdict():
- return {key: val for key, val in node_match.groupdict().items() if val is not None}
- # Otherwise the regex must have exactly one unnamed group, and we return that
- else:
- groups = list(node_match.groups())
- if len(groups) > 1:
- raise ValueError(f"Regex has multiple unnamed groups!\nGroups: {groups}\n")
- elif len(groups) == 0:
- raise ValueError(f"Regex has no capture groups:\n\n{node_match.group(0)}")
- return groups[0]
- def recursive_parse(
- node_content: str | list | dict,
- node_schema: dict,
- ):
- """
- This function takes content and a JSON schema which includes
- regex extractors, and recursively parses the content. The output
- should be a data structure matching the schema.
- Args:
- node_content: The content corresponding to this node. Usually a string, but can be something else
- if the parent node has multiple capture groups or named groups. In that case,
- we generally pass the capture groups straight through to the children of this node
- and don't do any parsing at this level.
- node_schema: The schema node controlling the parsing.
- Returns:
- The parsed data structure for the current node.
- """
- # If the schema has a const, we just return that value and do absolutely nothing else
- if "const" in node_schema:
- return node_schema["const"]
- # If the node content is None, we return None. EZ.
- if node_content is None:
- return None
- # If not, we have to do a little parsing. First, set some vars and do basic validation
- node_type = node_schema.get("type")
- has_regex = (
- "x-regex" in node_schema
- or "x-regex-iterator" in node_schema
- or "x-regex-key-value" in node_schema
- or "x-regex-substitutions" in node_schema
- )
- if has_regex and not isinstance(node_content, str):
- raise TypeError(
- "Schema node got a non-string input, but has a regex for parsing or substitution.\n"
- f"Input: {node_content}\n"
- f"Schema: {node_schema}"
- )
- node_subs = node_schema.get("x-regex-substitutions", [])
- for node_sub in node_subs:
- node_content = re.sub(node_sub[0], node_sub[1], node_content, flags=re.DOTALL)
- node_regex = node_schema.get("x-regex")
- node_regex_iterator = node_schema.get("x-regex-iterator")
- node_regex_to_dict = node_schema.get("x-regex-key-value")
- if node_regex is not None:
- node_match = re.search(node_regex, node_content, flags=re.DOTALL)
- if not node_match:
- return None
- node_content = _parse_re_match(node_match)
- if node_regex_iterator is not None:
- if node_type != "array":
- raise TypeError(f"Schema node with type {node_type} cannot use x-regex-iterator.\nSchema: {node_schema}")
- # Note that this can be applied after a standard node-regex search
- node_content = [
- _parse_re_match(node_match)
- for node_match in re.finditer(node_regex_iterator, node_content, flags=re.DOTALL)
- ]
- if not node_content:
- return None
- if node_regex_to_dict is not None:
- if node_type != "object":
- raise TypeError(f"Schema node with type {node_type} cannot use x-regex-key-value.\nSchema: {node_schema}")
- # Note that this can be applied after a standard node-regex search
- output_content = {}
- for node_match in re.finditer(node_regex_to_dict, node_content, flags=re.DOTALL):
- match_groups = _parse_re_match(node_match)
- if not isinstance(match_groups, dict) or "key" not in match_groups or "value" not in match_groups:
- raise ValueError(
- f"Regex for x-regex-key-value must have named groups 'key' and 'value'.\n"
- f"Match groups: {match_groups}\n"
- f"Schema: {node_schema}"
- )
- output_content[match_groups["key"]] = match_groups["value"]
- node_content = output_content
- if not node_content:
- return None
- # Next, if the node has a parser, apply it. We do this after regexes so that the regex can extract
- # a substring to parse, if needed.
- if "x-parser" in node_schema:
- parser = node_schema["x-parser"]
- if parser == "gemma4-tool-call":
- if not isinstance(node_content, str):
- raise TypeError(
- f"Node has Gemma4 tool call parser but got non-string input: {node_content}\nSchema: {node_schema}"
- )
- node_content = _gemma4_json_to_json(node_content)
- parser = "json" # fall through to the JSON parser below - don't add an elif!
- if parser == "json":
- if not isinstance(node_content, str):
- raise TypeError(
- f"Node has JSON parser but got non-string input: {node_content}\nSchema: {node_schema}"
- )
- parser_args = node_schema.get("x-parser-args", {})
- transform = parser_args.get("transform")
- allow_non_json = parser_args.get("allow_non_json", False)
- try:
- parsed_json = json.loads(node_content)
- except json.JSONDecodeError as e:
- if allow_non_json:
- parsed_json = node_content
- else:
- raise ValueError(
- f"Node has JSON parser but could not parse its contents as JSON. You can use the `allow_non_json` parser arg for nodes which may contain JSON or string content.\n\nContent: {node_content}\n\nError: {e}"
- )
- if transform is not None:
- if jmespath is None:
- raise ImportError(
- "Chat response schema includes a jmespath transformation, but jmespath is not installed. You can install it with `pip install jmespath`."
- )
- parsed_json = jmespath.search(parser_args["transform"], parsed_json)
- node_content = parsed_json
- else:
- raise ValueError(f"Unknown parser {parser} for schema node: {node_schema}")
- # Finally, handle parsed content based on schema type and recurse if required
- if node_type == "object":
- parsed_schema = {}
- if isinstance(node_content, str):
- # This means we don't have a regex at this level, so all of our child nodes need to parse the whole
- # string themselves to extract their value.
- if "properties" not in node_schema:
- raise ValueError(
- f"Object node received string content but has no regex or parser to handle it.\n"
- f"Content: {node_content}\n"
- f"Schema: {node_schema}"
- )
- for key, child_node in node_schema["properties"].items():
- child_node_content = recursive_parse(node_content, node_schema["properties"][key])
- if child_node_content is not None:
- parsed_schema[key] = child_node_content
- elif isinstance(node_content, dict):
- for key, child_node in node_schema.get("properties", {}).items():
- if "const" in child_node:
- parsed_schema[key] = child_node["const"]
- elif key in node_content:
- parsed_schema[key] = recursive_parse(node_content[key], child_node)
- elif "default" in child_node:
- parsed_schema[key] = child_node["default"]
- additional_schema = node_schema.get("additionalProperties", True)
- # We want to check only for False values; {} is "falsy" but should pass through
- if additional_schema is not False:
- additional_schema = additional_schema if isinstance(additional_schema, dict) else {}
- for key, value in node_content.items():
- if key not in node_schema.get("properties", {}):
- parsed_schema[key] = recursive_parse(value, additional_schema)
- else:
- raise TypeError(f"Expected a dict or str for schema node with type object, got {node_content}")
- required = node_schema.get("required", [])
- missing = [key for key in required if key not in parsed_schema]
- if missing:
- input_preview = repr(node_content[:500]) if isinstance(node_content, str) else repr(node_content)
- raise ValueError(
- f"Required fields {missing} are missing from parsed output.\n"
- f"Parsed: {parsed_schema}\n"
- f"Input: {input_preview}"
- )
- return parsed_schema
- elif node_type == "array":
- if not node_content:
- return []
- parsed_schema = []
- if "items" in node_schema:
- if not isinstance(node_content, list):
- raise TypeError(f"Expected a list or regex for schema node with type array, got {node_content}")
- for item in node_content:
- parsed_schema.append(recursive_parse(item, node_schema["items"]))
- return parsed_schema
- elif "prefixItems" in node_schema:
- if not isinstance(node_content, list):
- if len(node_schema["prefixItems"]) == 1:
- # If there's only one prefix item, this is a single item array, we can just wrap the string
- node_content = [node_content]
- else:
- raise TypeError(f"Expected a list or regex for schema node with type array, got {node_content}")
- if len(node_content) != len(node_schema["prefixItems"]):
- raise ValueError(
- f"Array node has {len(node_content)} items, but schema only has "
- f"{len(node_schema['prefixItems'])} prefixItems defined.\n"
- f"Content: {node_content}\n"
- f"Schema: {node_schema}"
- )
- for item, item_schema in zip(node_content, node_schema["prefixItems"]):
- parsed_schema.append(recursive_parse(item, item_schema))
- return parsed_schema
- else:
- raise ValueError(f"Array node has no items or prefixItems schema defined.\nSchema: {node_schema}")
- elif node_type in ("string", "integer", "number", "boolean"):
- if node_type == "integer":
- if isinstance(node_content, int):
- return node_content
- if not isinstance(node_content, str):
- raise TypeError(
- f"Expected a string or int for schema node with type integer, got {type(node_content).__name__}: {node_content}"
- )
- try:
- return int(node_content)
- except ValueError:
- raise ValueError(
- f"Schema node has type 'integer', but the parsed string content is not a valid integer: {node_content!r}"
- )
- elif node_type == "number":
- if isinstance(node_content, (int, float)):
- return float(node_content)
- if not isinstance(node_content, str):
- raise TypeError(
- f"Expected a string or number for schema node with type number, got {type(node_content).__name__}: {node_content}"
- )
- try:
- return float(node_content)
- except ValueError:
- raise ValueError(
- f"Schema node has type 'number', but the parsed string content is not a valid number: {node_content!r}"
- )
- elif node_type == "boolean":
- if isinstance(node_content, bool):
- return node_content
- if not isinstance(node_content, str):
- raise TypeError(
- f"Expected a string or bool for schema node with type boolean, got {type(node_content).__name__}: {node_content}"
- )
- if node_content.lower() in ("true", "1"):
- return True
- elif node_content.lower() in ("false", "0"):
- return False
- else:
- raise ValueError(f"Invalid boolean value: {node_content}")
- else:
- # String type
- if not isinstance(node_content, str):
- raise TypeError(
- f"Expected a string for schema node with type string, got {type(node_content).__name__}: {node_content}"
- )
- return node_content
- elif node_type is None or node_type == "any":
- return node_content # Don't touch it
- else:
- raise TypeError(f"Unsupported schema type {node_type} for node: {node_content}")
|