| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692 |
- from __future__ import annotations
- import copy
- import importlib
- from typing import Any, Literal, cast
- from .utils.constants import MISSING_VALUE, JSONReturnType, MissingValueType
- from .utils.pattern_properties import match_pattern_properties
# Names of the repair strategies understood by SchemaRepairer.  "standard" is
# what normalize_schema_repair_mode falls back to when no mode is given;
# "salvage" additionally enables the best-effort recoveries guarded by
# ``schema_repair_mode == "salvage"`` checks in the repairer.
SchemaRepairMode = Literal["standard", "salvage"]

# Runtime tuple mirroring SchemaRepairMode, used for membership checks and for
# building the "must be one of" error message.
SUPPORTED_SCHEMA_REPAIR_MODES: tuple[SchemaRepairMode, ...] = (
    "standard",
    "salvage",
)
class SchemaDefinitionError(ValueError):
    """Signal that the schema itself is malformed or uses an unsupported feature.

    Subclasses ``ValueError`` so broad ``except ValueError`` handlers still
    catch it, while code that must not swallow schema problems can re-raise
    this type explicitly.
    """
def normalize_schema_repair_mode(mode: str | None) -> SchemaRepairMode:
    """Validate a repair-mode name, defaulting to ``"standard"``.

    Args:
        mode: Requested mode name, or ``None`` to use the default.

    Returns:
        The validated mode name.

    Raises:
        ValueError: If *mode* is not listed in SUPPORTED_SCHEMA_REPAIR_MODES.
    """
    if mode is None:
        return "standard"
    if mode not in SUPPORTED_SCHEMA_REPAIR_MODES:
        allowed = ", ".join(SUPPORTED_SCHEMA_REPAIR_MODES)
        raise ValueError(f"schema_repair_mode must be one of: {allowed}.")
    return cast("SchemaRepairMode", mode)
- def _require_jsonschema() -> Any:
- try:
- return importlib.import_module("jsonschema")
- except ImportError as exc: # pragma: no cover - optional dependency
- raise ValueError("jsonschema is required when using schema-aware repair.") from exc
- def _require_pydantic() -> Any:
- try:
- return importlib.import_module("pydantic")
- except ImportError as exc: # pragma: no cover - optional dependency
- raise ValueError("pydantic is required when using schema models.") from exc
- def _prepare_schema_for_validation_node(node: Any) -> Any:
- if isinstance(node, dict):
- normalized = {key: _prepare_schema_for_validation_node(value) for key, value in node.items()}
- items = normalized.get("items")
- if isinstance(items, list):
- normalized.pop("items", None)
- normalized["prefixItems"] = items
- additional_items = normalized.pop("additionalItems", None)
- if additional_items is False:
- normalized["items"] = False
- elif isinstance(additional_items, dict):
- normalized["items"] = additional_items
- return normalized
- if isinstance(node, list):
- return [_prepare_schema_for_validation_node(item) for item in node]
- return node
def load_schema_model(path: str) -> type[Any]:
    """Import the class named by a ``"module:ClassName"`` specifier.

    Args:
        path: Dotted module name and class name separated by the first ``:``.

    Returns:
        The class object found in the module's namespace.

    Raises:
        ValueError: If *path* has no ``:`` separator, or the module has no
            attribute of that name that is a class.
    """
    module_name, separator, class_name = path.partition(":")
    if not separator:
        raise ValueError("Schema model must be in the form 'module:ClassName'.")
    module = importlib.import_module(module_name)
    candidate: object | None = vars(module).get(class_name)
    if isinstance(candidate, type):
        return candidate
    raise ValueError(f"Schema model '{class_name}' not found in module '{module_name}'.")
def normalize_missing_values(value: object) -> JSONReturnType:
    """Return a JSON-compatible copy of *value* with missing markers replaced.

    The missing-value sentinel (or any ``MissingValueType`` instance) becomes
    an empty string.  Lists and dicts are rebuilt recursively; ``None``,
    strings, numbers and booleans are returned unchanged.

    Raises:
        ValueError: If a dict key is not a string or a value has a type that
            is not JSON compatible.
    """
    if value is MISSING_VALUE or isinstance(value, MissingValueType):
        return ""
    if value is None or isinstance(value, (str, int, float, bool)):
        return value
    if isinstance(value, list):
        return [normalize_missing_values(element) for element in value]
    if isinstance(value, dict):
        cleaned: dict[str, JSONReturnType] = {}
        # Explicit loop: entries are checked and converted in insertion order,
        # so the first offending entry determines which error is raised.
        for name, element in value.items():
            if not isinstance(name, str):
                raise ValueError("Object keys must be strings.")
            cleaned[name] = normalize_missing_values(element)
        return cleaned
    raise ValueError("Value is not JSON compatible.")
def schema_from_input(schema: Any) -> dict[str, Any] | bool:
    """Turn a user-supplied schema into a JSON Schema dict or boolean schema.

    Dicts and booleans are returned unchanged.  Any object exposing
    ``model_json_schema`` is treated as a pydantic v2 model: its JSON schema
    is generated and, for every non-required field whose property schema has
    no ``default``, the field's default (or the result of its default factory)
    is written into the property schema.

    Raises:
        ValueError: If the input is none of the supported kinds, pydantic is
            not installed, or the installed pydantic major version is below 2.
    """
    if isinstance(schema, (dict, bool)):
        return schema
    if not hasattr(schema, "model_json_schema"):
        raise ValueError("Schema must be a JSON Schema dict, boolean schema, or pydantic v2 model.")

    pydantic = _require_pydantic()
    version = getattr(pydantic, "VERSION", getattr(pydantic, "__version__", "0"))
    major = int(version.split(".")[0])
    if major < 2:
        raise ValueError("pydantic v2 is required for schema models.")

    schema_dict: dict[str, Any] = schema.model_json_schema()
    if not hasattr(schema, "model_fields"):
        return schema_dict

    properties = schema_dict.setdefault("properties", {})
    if not isinstance(properties, dict):
        # Replace a malformed "properties" entry so defaults have somewhere to go.
        properties = schema_dict["properties"] = {}

    for name, field in schema.model_fields.items():
        if field.is_required():
            continue
        property_schema = properties.setdefault(name, {})
        if not isinstance(property_schema, dict):
            property_schema = properties[name] = {}
        if "default" in property_schema:
            continue
        factory = field.default_factory
        property_schema["default"] = field.default if factory is None else factory()
    return schema_dict
class SchemaRepairer:
    """Reshape parsed JSON values so they conform to a JSON Schema.

    The repairer walks a value and a schema together: it coerces scalars to
    the declared type, wraps lone values in arrays, inserts defaults, drops
    properties/items the schema forbids, and picks the first matching branch
    of ``oneOf``/``anyOf``/type unions.  In ``"salvage"`` mode it additionally
    drops array items that cannot be repaired, maps positional lists onto
    object properties, unwraps a single-object root array, and fills missing
    required properties when an unambiguous value exists.

    Data problems are reported as ``ValueError``; problems with the schema
    itself are reported as ``SchemaDefinitionError`` (a ``ValueError``
    subclass).  Human-readable notes about each repair are appended to the
    optional ``log`` list.
    """

    def __init__(
        self,
        schema: dict[str, Any] | bool,
        log: list[dict[str, str]] | None,
        schema_repair_mode: str = "standard",
    ) -> None:
        """Create a repairer bound to one root schema.

        Args:
            schema: Root schema; local ``$ref`` pointers are resolved against it.
            log: List that receives ``{"text", "context"}`` entries, or ``None``
                to disable logging.
            schema_repair_mode: Repair strategy name; validated by
                ``normalize_schema_repair_mode``.
        """
        self.root_schema = schema
        self.log = log
        self.schema_repair_mode = normalize_schema_repair_mode(schema_repair_mode)
        # Keyed by id(schema).  The schema object is stored next to its
        # validator and re-checked by identity on lookup.
        self._validator_cache: dict[int, tuple[dict[str, Any], Any]] = {}

    def _log(self, text: str, path: str) -> None:
        """Append a repair note for *path* when logging is enabled."""
        if self.log is not None:
            self.log.append({"text": text, "context": path})

    def _get_validator(self, schema: dict[str, Any]) -> Any:
        """Return a jsonschema validator for *schema*, reusing a cached one.

        The cache is keyed by object identity, so it only helps when the same
        schema dict object is passed again.
        """
        cache_key = id(schema)
        cached_validator = self._validator_cache.get(cache_key)
        if cached_validator is not None and cached_validator[0] is schema:
            return cached_validator[1]

        prepared_schema = self._prepare_schema_for_validation(schema)
        jsonschema = _require_jsonschema()
        validator_cls = jsonschema.validators.validator_for(prepared_schema)
        validator = validator_cls(prepared_schema)
        self._validator_cache[cache_key] = (schema, validator)
        return validator

    def is_valid(self, value: JSONReturnType, schema: dict[str, Any] | bool) -> bool:
        """Report whether *value* satisfies *schema* (after ``$ref`` resolution)."""
        schema = self.resolve_schema(schema)
        if schema is True:
            return True
        if schema is False:
            return False
        validator = self._get_validator(schema)
        return bool(validator.is_valid(value))

    def validate(self, value: JSONReturnType, schema: dict[str, Any] | bool) -> None:
        """Validate *value* against *schema*.

        Raises:
            ValueError: If the schema is ``False`` or jsonschema reports a
                validation error (its message is re-used).
        """
        schema = self.resolve_schema(schema)
        if schema is True:
            return
        if schema is False:
            raise ValueError("Schema does not allow any values.")
        jsonschema = _require_jsonschema()
        validator = self._get_validator(schema)
        try:
            validator.validate(value)
        except jsonschema.exceptions.ValidationError as exc:
            raise ValueError(exc.message) from exc

    def resolve_schema(self, schema: object | None) -> dict[str, Any] | bool:
        """Normalize a schema node and follow any ``$ref`` chain.

        ``None`` is treated as the permissive schema ``True``.  When a node
        contains ``$ref`` the referenced schema replaces it entirely (sibling
        keywords are ignored).

        The returned dict is the caller's own object (or the referenced object
        inside the root schema), never a copy, so results must be treated as
        read-only.  Returning the same object on every call is what lets the
        identity-keyed validator cache hit.

        Raises:
            SchemaDefinitionError: If the node is not a dict/bool/None, has a
                non-string key, or its ``$ref`` chain is circular or
                unresolvable.
        """
        if schema is None:
            return True
        if isinstance(schema, bool):
            return schema
        if not isinstance(schema, dict):
            raise SchemaDefinitionError("Schema must be an object.")
        if not all(isinstance(key, str) for key in schema):
            raise SchemaDefinitionError("Schema keys must be strings.")

        current = cast("dict[str, Any]", schema)
        seen_refs: set[str] = set()
        while "$ref" in current:
            ref = current["$ref"]
            if isinstance(ref, str):
                # A pointer visited twice can never terminate; fail instead of spinning.
                if ref in seen_refs:
                    raise SchemaDefinitionError(f"Circular $ref: {ref}")
                seen_refs.add(ref)
            resolved = self._resolve_ref(ref)
            if isinstance(resolved, bool):
                return resolved
            current = resolved
        return current

    def is_object_schema(self, schema: dict[str, Any] | bool | None) -> bool:
        """Tell whether *schema* declares or implies the ``object`` type.

        A schema without ``type`` counts as an object schema when it carries
        any object-specific keyword.
        """
        schema = self.resolve_schema(schema)
        if not isinstance(schema, dict):
            return False
        schema_type = schema.get("type")
        if schema_type == "object":
            return True
        if isinstance(schema_type, list) and "object" in schema_type:
            return True
        return any(key in schema for key in ("properties", "patternProperties", "additionalProperties", "required"))

    def is_array_schema(self, schema: dict[str, Any] | bool | None) -> bool:
        """Tell whether *schema* declares or implies the ``array`` type.

        A schema without ``type`` counts as an array schema when it has ``items``.
        """
        schema = self.resolve_schema(schema)
        if not isinstance(schema, dict):
            return False
        schema_type = schema.get("type")
        if schema_type == "array":
            return True
        if isinstance(schema_type, list) and "array" in schema_type:
            return True
        return "items" in schema

    def _allows_schema_type(self, schema: dict[str, Any], schema_type: str) -> bool:
        """Check whether *schema* admits *schema_type* (``"object"`` or ``"array"``).

        An explicit ``type`` is authoritative; otherwise the answer is inferred
        from the schema's structural keywords.
        """
        declared_type = schema.get("type")
        if isinstance(declared_type, str):
            return declared_type == schema_type
        if isinstance(declared_type, list):
            return schema_type in declared_type
        if schema_type == "object":
            return self.is_object_schema(schema)
        # This helper is only used for object/array checks in _can_salvage_list_as_object.
        return self.is_array_schema(schema)

    def _can_salvage_list_as_object(self, schema: dict[str, Any]) -> bool:
        """Return True when a list may be reinterpreted as an object for *schema*.

        That is only safe when the schema accepts objects and does not also
        accept arrays.
        """
        return self._allows_schema_type(schema, "object") and not self._allows_schema_type(schema, "array")

    def repair_value(self, value: Any, schema: dict[str, Any] | bool | None, path: str) -> JSONReturnType:
        """Apply schema rules to a parsed value, including unions, coercions, and defaults.

        Args:
            value: Parsed value, possibly the missing-value sentinel.
            schema: Schema node to apply (``None`` means "anything").
            path: Location of *value*, used in log entries and error messages.

        Returns:
            The repaired, JSON-compatible value.

        Raises:
            ValueError: If the value cannot be made to fit the schema.
            SchemaDefinitionError: If the schema itself is invalid.
        """
        schema = self.resolve_schema(schema)
        if schema is True:
            return normalize_missing_values(value)
        if schema is False:
            raise ValueError("Schema does not allow any values.")
        if not schema:
            return normalize_missing_values(value)
        if value is MISSING_VALUE:
            return self._fill_missing(schema, path)

        # Combinators take precedence over the node's own keywords.
        if "allOf" in schema:
            subschemas = schema["allOf"]
            if not subschemas:
                return normalize_missing_values(value)
            # Each subschema repairs the output of the previous one.
            repaired = self.repair_value(value, subschemas[0], path)
            for subschema in subschemas[1:]:
                repaired = self.repair_value(repaired, subschema, path)
            return repaired
        if "oneOf" in schema:
            return self._repair_union(value, schema["oneOf"], path)
        if "anyOf" in schema:
            return self._repair_union(value, schema["anyOf"], path)

        expected_type = schema.get("type")
        if expected_type is None:
            if self.is_object_schema(schema):
                expected_type = "object"
            elif self.is_array_schema(schema):
                expected_type = "array"
        if isinstance(expected_type, list):
            return self._repair_type_union(value, expected_type, schema, path)

        if expected_type == "object":
            repaired = self._repair_object(value, schema, path)
        elif expected_type == "array":
            repaired = self._repair_array(value, schema, path)
        elif isinstance(expected_type, str):
            repaired = self._coerce_scalar(value, expected_type, path)
        else:
            repaired = normalize_missing_values(value)
        return self._apply_enum_const(repaired, schema, path)

    def _repair_union(self, value: Any, schemas: list[dict[str, Any] | bool], path: str) -> JSONReturnType:
        """Return the repair produced by the first subschema that also validates.

        Each branch works on a deep copy so a failed attempt cannot leak
        partial mutations into the next one.

        Raises:
            ValueError: With the last branch's message when no branch matches.
        """
        last_error: Exception | None = None
        for subschema in schemas:
            try:
                candidate = self.repair_value(copy.deepcopy(value), subschema, path)
                self.validate(candidate, subschema)
            except ValueError as exc:
                last_error = exc
            else:
                return candidate
        if last_error:
            raise ValueError(str(last_error)) from last_error
        raise ValueError("No schema matched the value.")

    def _repair_type_union(
        self,
        value: Any,
        types: list[str],
        schema: dict[str, Any],
        path: str,
    ) -> JSONReturnType:
        """Try each entry of a list-valued ``type`` in order and keep the first success.

        Raises:
            ValueError: With the last branch's message when no type matches.
        """
        last_error: Exception | None = None
        for schema_type in types:
            branch_schema = {**schema, "type": schema_type}
            try:
                # Keep structural schema context for repair heuristics, but validate against the narrowed branch type.
                candidate = self._repair_by_type(copy.deepcopy(value), schema_type, schema, path)
                candidate = self._apply_enum_const(candidate, branch_schema, path)
                self.validate(candidate, branch_schema)
            except ValueError as exc:
                last_error = exc
            else:
                return candidate
        if last_error:
            raise ValueError(str(last_error)) from last_error
        raise ValueError("No schema type matched the value.")

    def _repair_by_type(self, value: Any, schema_type: str, schema: dict[str, Any], path: str) -> JSONReturnType:
        """Dispatch to the array, object or scalar repair routine for *schema_type*."""
        if schema_type == "array":
            return self._repair_array(value, schema, path)
        if schema_type == "object":
            return self._repair_object(value, schema, path)
        return self._coerce_scalar(value, schema_type, path)

    def _repair_array(self, value: Any, schema: dict[str, Any], path: str) -> JSONReturnType:
        """Repair *value* as an array.

        A non-list value is wrapped in a one-element list.  Items are repaired
        against ``items`` (single schema or positional list, with
        ``additionalItems`` governing the tail).  In salvage mode an item that
        cannot be repaired is dropped instead of failing the whole array.

        Raises:
            ValueError: If an item cannot be repaired (standard mode) or the
                result has fewer than ``minItems`` entries.
        """
        if isinstance(value, list):
            items: list[JSONReturnType] = value
        else:
            self._log("Wrapped value in array to match schema", path)
            items = [normalize_missing_values(value)]

        salvage_mode = self.schema_repair_mode == "salvage"

        def repair_or_drop(raw_item: Any, item_schema: Any, item_path: str) -> tuple[bool, JSONReturnType]:
            """Return ``(keep, repaired)``; ``keep`` is False for an item dropped while salvaging."""
            try:
                return True, self.repair_value(raw_item, item_schema, item_path)
            except SchemaDefinitionError:
                # A broken schema is never the item's fault; always propagate.
                raise
            except ValueError:
                if not salvage_mode:
                    raise
                self._log("Dropped invalid array item while salvaging", item_path)
                return False, None

        items_schema = schema.get("items")
        if items_schema is not None:
            if isinstance(items_schema, list):
                # Positional (tuple) form: one schema per leading item.
                repaired_items: list[JSONReturnType] = []
                for idx, item_schema in enumerate(items_schema):
                    if idx >= len(items):
                        break
                    item_path = f"{path}[{idx}]"
                    keep_item, repaired_value = repair_or_drop(items[idx], item_schema, item_path)
                    if keep_item:
                        repaired_items.append(repaired_value)

                additional_items = schema.get("additionalItems")
                if len(items) > len(items_schema):
                    tail = items[len(items_schema) :]
                    if isinstance(additional_items, dict):
                        for offset, item in enumerate(tail, start=len(items_schema)):
                            item_path = f"{path}[{offset}]"
                            keep_item, repaired_value = repair_or_drop(item, additional_items, item_path)
                            if keep_item:
                                repaired_items.append(repaired_value)
                    elif additional_items is True or additional_items is None:
                        repaired_items.extend(normalize_missing_values(item) for item in tail)
                    else:
                        for offset, _item in enumerate(tail, start=len(items_schema)):
                            self._log("Dropped extra array item not covered by schema", f"{path}[{offset}]")
                items = repaired_items
            else:
                repaired_items = []
                for idx, item in enumerate(items):
                    item_path = f"{path}[{idx}]"
                    keep_item, repaired_value = repair_or_drop(item, items_schema, item_path)
                    if keep_item:
                        repaired_items.append(repaired_value)
                items = repaired_items

        min_items = schema.get("minItems")
        if min_items is not None and len(items) < min_items:
            raise ValueError(f"Array at {path} does not meet minItems.")
        return items

    def _repair_object(self, value: Any, schema: dict[str, Any], path: str) -> JSONReturnType:
        """Repair *value* as an object.

        Declared properties are repaired recursively and missing optional ones
        receive their ``default``.  Other keys are matched against
        ``patternProperties``, then ``additionalProperties``; keys allowed by
        neither are dropped.  In salvage mode a list may first be converted to
        an object and missing required properties may be filled in.

        ``required`` is processed in schema order so error messages and log
        entries are deterministic; a ``required`` value that is not a list or
        tuple, and entries that are not strings, are ignored.

        Raises:
            ValueError: If the value is not an object, a required property is
                missing, a property cannot be repaired, or ``minProperties``
                is not met.
        """
        if (
            self.schema_repair_mode == "salvage"
            and isinstance(value, list)
            and self._can_salvage_list_as_object(schema)
        ):
            mapped = self._map_list_to_object(value, schema, path)
            if mapped is not None:
                value = mapped
            elif path == "$" and len(value) == 1 and isinstance(value[0], dict):
                # Conservatively unwrap the common root wrapper shape: [{...}] -> {...}.
                value = value[0]
                self._log("Unwrapped single-item root array to object while salvaging", path)

        if not isinstance(value, dict):
            raise ValueError(f"Expected object at {path}, got {type(value).__name__}.")

        properties = schema.get("properties", {})
        if not isinstance(properties, dict):
            properties = {}

        raw_required = schema.get("required", [])
        if not isinstance(raw_required, (list, tuple)):
            raw_required = []
        # dict.fromkeys de-duplicates while keeping the schema's own order.
        required: list[str] = list(dict.fromkeys(key for key in raw_required if isinstance(key, str)))
        required_names = set(required)

        pattern_properties = schema.get("patternProperties", {})
        if not isinstance(pattern_properties, dict):
            pattern_properties = {}
        additional_properties = schema.get("additionalProperties")

        if self.schema_repair_mode == "salvage" and required:
            # Work on a shallow copy so the caller's dict is not modified.
            value_with_salvage_fills = dict(value)
            for key in required:
                if key in value_with_salvage_fills:
                    continue
                prop_schema = properties.get(key)
                if prop_schema is None:
                    continue
                key_path = f"{path}.{key}"
                filled, filled_value = self._fill_missing_required_for_salvage(prop_schema, key_path)
                if filled:
                    value_with_salvage_fills[key] = filled_value
                    self._log("Filled missing required property while salvaging", key_path)
            value = value_with_salvage_fills

        missing_required = [key for key in required if key not in value]
        if missing_required:
            raise ValueError(f"Missing required properties at {path}: {', '.join(missing_required)}")

        repaired: dict[str, JSONReturnType] = {}
        for key, prop_schema in properties.items():
            key_path = f"{path}.{key}"
            if key in value:
                repaired[key] = self.repair_value(value[key], prop_schema, key_path)
            elif isinstance(prop_schema, dict) and "default" in prop_schema and key not in required_names:
                repaired[key] = self._copy_json_value(prop_schema["default"], key_path, "default")
                self._log("Inserted default value for missing property", key_path)

        for key, raw_value in value.items():
            if key in properties:
                continue
            key_path = f"{path}.{key}"

            matched: list[Any] = []
            unsupported_patterns: list[str] = []
            if pattern_properties:
                matched, unsupported_patterns = match_pattern_properties(pattern_properties, key)
            for pattern in unsupported_patterns:
                self._log(f"Skipped unsupported patternProperties regex '{pattern}'", key_path)
            if matched:
                # Every matching pattern schema is applied in turn.
                repaired_value = self.repair_value(raw_value, matched[0], key_path)
                for prop_schema in matched[1:]:
                    repaired_value = self.repair_value(repaired_value, prop_schema, key_path)
                repaired[key] = repaired_value
                continue

            if isinstance(additional_properties, dict):
                repaired[key] = self.repair_value(raw_value, additional_properties, key_path)
                continue
            if additional_properties is True or additional_properties is None:
                repaired[key] = normalize_missing_values(raw_value)
                continue
            self._log("Dropped extra property not covered by schema", key_path)

        min_properties = schema.get("minProperties")
        if min_properties is not None and len(repaired) < min_properties:
            raise ValueError(f"Object at {path} does not meet minProperties.")
        return repaired

    def _map_list_to_object(
        self, value: list[Any], schema: dict[str, Any], path: str
    ) -> dict[str, JSONReturnType] | None:
        """Map a positional list onto the schema's properties, in declaration order.

        Returns:
            The mapped object, or ``None`` when the schema has no properties,
            the lengths differ, or any element fails to repair.

        Raises:
            SchemaDefinitionError: If a property name is not a string or a
                property schema is invalid.
        """
        properties = schema.get("properties")
        if not isinstance(properties, dict) or not properties:
            return None

        typed_properties: dict[str, Any] = {}
        for key, prop_schema in properties.items():
            if not isinstance(key, str):
                raise SchemaDefinitionError("Schema object property names must be strings.")
            typed_properties[key] = prop_schema

        keys = list(typed_properties.keys())
        if len(value) != len(keys):
            return None

        mapped: dict[str, JSONReturnType] = {}
        for idx, key in enumerate(keys):
            key_path = f"{path}.{key}"
            try:
                mapped[key] = self.repair_value(value[idx], typed_properties[key], key_path)
            except SchemaDefinitionError:
                raise
            except ValueError:
                # The list does not line up with the properties; give up on mapping.
                return None
        self._log("Mapped array to object by schema property order", path)
        return mapped

    def _fill_missing_required_for_salvage(self, schema: object, path: str) -> tuple[bool, JSONReturnType]:
        """Pick a value for a missing required property, if one is unambiguous.

        Candidates, in order: ``default``, ``const``, the first ``enum`` entry,
        an empty array (no ``minItems``), an empty object (no ``minProperties``).

        Returns:
            ``(True, value)`` when a fill value exists, otherwise ``(False, "")``.
        """
        resolved_schema = self.resolve_schema(schema)
        if not isinstance(resolved_schema, dict):
            return False, ""
        if "default" in resolved_schema:
            return True, self._copy_json_value(resolved_schema["default"], path, "default")
        if "const" in resolved_schema:
            return True, self._copy_json_value(resolved_schema["const"], path, "const")
        enum_values = resolved_schema.get("enum")
        if enum_values:
            return True, self._copy_json_value(enum_values[0], path, "enum")

        expected_type = resolved_schema.get("type")
        if expected_type is None:
            if self.is_array_schema(resolved_schema):
                expected_type = "array"
            elif self.is_object_schema(resolved_schema):
                expected_type = "object"
        if expected_type == "array" and not resolved_schema.get("minItems"):
            return True, []
        if expected_type == "object" and not resolved_schema.get("minProperties"):
            return True, {}
        return False, ""

    def _fill_missing(self, schema: dict[str, Any], path: str) -> JSONReturnType:
        """Produce a stand-in for a value the parser marked as missing.

        Preference order: ``const``, first ``enum`` entry, ``default``, then a
        neutral value for the declared (or inferred) type.

        Raises:
            ValueError: If ``enum`` is empty, a non-empty container is required,
                or no value can be inferred.
        """
        if "const" in schema:
            # Const/enum/default have priority over type inference.
            self._log("Filled missing value with const", path)
            return self._copy_json_value(schema["const"], path, "const")
        if "enum" in schema:
            enum_values = schema["enum"]
            if not enum_values:
                raise ValueError(f"Enum at {path} has no values.")
            self._log("Filled missing value with first enum value", path)
            return self._copy_json_value(enum_values[0], path, "enum")
        if "default" in schema:
            self._log("Filled missing value with default", path)
            return self._copy_json_value(schema["default"], path, "default")

        expected_type = schema.get("type")
        if isinstance(expected_type, list):
            # Use the first listed type for which a value can be produced.
            for schema_type in expected_type:
                try:
                    return self._fill_missing({**schema, "type": schema_type}, path)
                except ValueError:
                    continue
            raise ValueError(f"Cannot infer missing value at {path}.")

        if expected_type is None:
            # Infer container types based on schema shape if type is omitted.
            if self.is_object_schema(schema):
                expected_type = "object"
            elif self.is_array_schema(schema):
                expected_type = "array"

        if expected_type == "string":
            self._log("Filled missing value with empty string", path)
            return ""
        if expected_type in ("integer", "number"):
            self._log("Filled missing value with 0", path)
            return 0
        if expected_type == "boolean":
            self._log("Filled missing value with false", path)
            return False
        if expected_type == "array":
            min_items = schema.get("minItems")
            if min_items:
                raise ValueError(f"Array at {path} requires at least {min_items} items.")
            self._log("Filled missing value with empty array", path)
            return []
        if expected_type == "object":
            min_properties = schema.get("minProperties")
            if min_properties:
                raise ValueError(f"Object at {path} requires at least {min_properties} properties.")
            self._log("Filled missing value with empty object", path)
            return {}
        if expected_type == "null":
            self._log("Filled missing value with null", path)
            return None
        raise ValueError(f"Cannot infer missing value at {path}.")

    def _coerce_scalar(self, value: Any, schema_type: str, path: str) -> JSONReturnType:
        """Coerce *value* to the scalar JSON type named by *schema_type*.

        Supported conversions: number -> string; integral float or numeric
        string -> integer; numeric string -> number; common true/false words
        and 0/1 -> boolean.  Booleans are never accepted as numbers.

        Raises:
            ValueError: If the value cannot be represented as the type,
                including numeric strings that parse to NaN or infinity.
            SchemaDefinitionError: If *schema_type* is not a known scalar type.
        """
        if schema_type == "string":
            if isinstance(value, str):
                return value
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                self._log("Coerced number to string", path)
                return str(value)
            raise ValueError(f"Expected string at {path}.")

        if schema_type == "integer":
            if isinstance(value, bool):
                raise ValueError(f"Expected integer at {path}.")
            if isinstance(value, int):
                return value
            if isinstance(value, float):
                if value.is_integer():
                    self._log("Coerced number to integer", path)
                    return int(value)
                raise ValueError(f"Expected integer at {path}.")
            if isinstance(value, str):
                try:
                    int_value = int(value)
                except ValueError:
                    int_value = None
                if int_value is not None:
                    self._log("Coerced string to integer", path)
                    return int_value
                # Not a plain integer literal; accept float notation such as "3.0" or "1e3".
                try:
                    num = float(value)
                except ValueError as exc:
                    raise ValueError(f"Expected integer at {path}.") from exc
                if not num.is_integer():
                    raise ValueError(f"Expected integer at {path}.")
                self._log("Coerced number to integer", path)
                return int(num)
            raise ValueError(f"Expected integer at {path}.")

        if schema_type == "number":
            if isinstance(value, bool):
                raise ValueError(f"Expected number at {path}.")
            if isinstance(value, (int, float)):
                return value
            if isinstance(value, str):
                try:
                    float_value = float(value)
                except ValueError as exc:
                    raise ValueError(f"Expected number at {path}.") from exc
                # float() also parses "nan"/"inf"/"infinity" (and overflows to
                # inf); none of those is a JSON number.  NaN is the only float
                # that is unequal to itself.
                if float_value != float_value or abs(float_value) == float("inf"):
                    raise ValueError(f"Expected number at {path}.")
                self._log("Coerced string to number", path)
                return float_value
            raise ValueError(f"Expected number at {path}.")

        if schema_type == "boolean":
            if isinstance(value, bool):
                return value
            if isinstance(value, str):
                lowered = value.lower()
                if lowered in ("true", "yes", "y", "on", "1"):
                    self._log("Coerced string to boolean", path)
                    return True
                if lowered in ("false", "no", "n", "off", "0"):
                    self._log("Coerced string to boolean", path)
                    return False
            if isinstance(value, (int, float)) and not isinstance(value, bool) and value in (0, 1):
                self._log("Coerced number to boolean", path)
                return bool(value)
            raise ValueError(f"Expected boolean at {path}.")

        if schema_type == "null":
            if value is None:
                return None
            raise ValueError(f"Expected null at {path}.")

        raise SchemaDefinitionError(f"Unsupported schema type {schema_type} at {path}.")

    @staticmethod
    def _json_equal(left: Any, right: Any) -> bool:
        """Compare two JSON values using JSON Schema equality.

        Python treats ``True == 1`` and ``False == 0``; JSON does not, so
        booleans only equal booleans.  Numbers compare by value (``1 == 1.0``)
        and containers are compared element by element.
        """
        if isinstance(left, bool) or isinstance(right, bool):
            return left is right
        if isinstance(left, dict) and isinstance(right, dict):
            return left.keys() == right.keys() and all(
                SchemaRepairer._json_equal(item, right[key]) for key, item in left.items()
            )
        if isinstance(left, list) and isinstance(right, list):
            return len(left) == len(right) and all(
                SchemaRepairer._json_equal(a, b) for a, b in zip(left, right)
            )
        return bool(left == right)

    def _apply_enum_const(self, value: JSONReturnType, schema: dict[str, Any], path: str) -> JSONReturnType:
        """Return *value* unchanged after checking it against ``const`` and ``enum``.

        Raises:
            ValueError: If the value differs from ``const`` or is not one of
                the ``enum`` entries (JSON equality, so ``true`` is not ``1``).
        """
        if "const" in schema and not self._json_equal(value, schema["const"]):
            raise ValueError(f"Value at {path} does not match const.")
        if "enum" in schema and not any(self._json_equal(value, option) for option in schema["enum"]):
            raise ValueError(f"Value at {path} does not match enum.")
        return value

    def _resolve_ref(self, ref: str) -> dict[str, Any] | bool:
        """Look up a local JSON Pointer reference (``#/...``) in the root schema.

        Reference tokens are unescaped (``~1`` -> ``/``, ``~0`` -> ``~``).

        Raises:
            SchemaDefinitionError: If *ref* is not a ``#/`` pointer string, a
                token cannot be found, or the target is not a schema.
        """
        if not isinstance(ref, str) or not ref.startswith("#/"):
            raise SchemaDefinitionError(f"Unsupported $ref: {ref}")
        # Slice off exactly the "#/" prefix; str.lstrip would strip a
        # character set and eat tokens that begin with "#" or "/".
        current: Any = self.root_schema
        for part in ref[2:].split("/"):
            resolved_part = part.replace("~1", "/").replace("~0", "~")
            if not isinstance(current, dict) or resolved_part not in current:
                raise SchemaDefinitionError(f"Unresolvable $ref: {ref}")
            current = current[resolved_part]
        if isinstance(current, (dict, bool)):
            return current
        raise SchemaDefinitionError(f"Unresolvable $ref: {ref}")

    def _copy_json_value(self, value: Any, path: str, label: str) -> JSONReturnType:
        """Deep-copy a schema-supplied literal, checking it is JSON compatible.

        Args:
            value: The ``default``/``const``/``enum`` literal to copy.
            path: Location used in error messages.
            label: Keyword the value came from; capitalized in error messages.

        Raises:
            ValueError: If the value contains a non-string key or a type that
                is not JSON compatible.
        """
        if value is None or isinstance(value, (str, int, float, bool)):
            return value
        if isinstance(value, list):
            return [self._copy_json_value(item, f"{path}[{idx}]", label) for idx, item in enumerate(value)]
        if isinstance(value, dict):
            copied: dict[str, JSONReturnType] = {}
            for key, item in value.items():
                if not isinstance(key, str):
                    raise ValueError(f"{label.capitalize()} value at {path} contains a non-string key.")
                copied[key] = self._copy_json_value(item, f"{path}.{key}", label)
            return copied
        raise ValueError(f"{label.capitalize()} value at {path} is not JSON compatible.")

    def _prepare_schema_for_validation(self, schema: object) -> dict[str, Any]:
        """Return the normalized copy of *schema* that is handed to jsonschema.

        Raises:
            ValueError: If the normalized schema is not a dict.
        """
        normalized = _prepare_schema_for_validation_node(schema)
        if not isinstance(normalized, dict):
            raise ValueError("Schema must be an object.")
        return normalized
|