# schema_repair.py
from __future__ import annotations

import copy
import importlib
import math
from typing import Any, Literal, cast

from .utils.constants import MISSING_VALUE, JSONReturnType, MissingValueType
from .utils.pattern_properties import match_pattern_properties
  7. SchemaRepairMode = Literal["standard", "salvage"]
  8. SUPPORTED_SCHEMA_REPAIR_MODES: tuple[SchemaRepairMode, ...] = ("standard", "salvage")
  9. class SchemaDefinitionError(ValueError):
  10. """Raised when schema metadata is invalid or unsupported."""
  11. def normalize_schema_repair_mode(mode: str | None) -> SchemaRepairMode:
  12. if mode is None:
  13. return "standard"
  14. if mode in SUPPORTED_SCHEMA_REPAIR_MODES:
  15. return cast("SchemaRepairMode", mode)
  16. expected = ", ".join(SUPPORTED_SCHEMA_REPAIR_MODES)
  17. raise ValueError(f"schema_repair_mode must be one of: {expected}.")
  18. def _require_jsonschema() -> Any:
  19. try:
  20. return importlib.import_module("jsonschema")
  21. except ImportError as exc: # pragma: no cover - optional dependency
  22. raise ValueError("jsonschema is required when using schema-aware repair.") from exc
  23. def _require_pydantic() -> Any:
  24. try:
  25. return importlib.import_module("pydantic")
  26. except ImportError as exc: # pragma: no cover - optional dependency
  27. raise ValueError("pydantic is required when using schema models.") from exc
  28. def _prepare_schema_for_validation_node(node: Any) -> Any:
  29. if isinstance(node, dict):
  30. normalized = {key: _prepare_schema_for_validation_node(value) for key, value in node.items()}
  31. items = normalized.get("items")
  32. if isinstance(items, list):
  33. normalized.pop("items", None)
  34. normalized["prefixItems"] = items
  35. additional_items = normalized.pop("additionalItems", None)
  36. if additional_items is False:
  37. normalized["items"] = False
  38. elif isinstance(additional_items, dict):
  39. normalized["items"] = additional_items
  40. return normalized
  41. if isinstance(node, list):
  42. return [_prepare_schema_for_validation_node(item) for item in node]
  43. return node
  44. def load_schema_model(path: str) -> type[Any]:
  45. if ":" not in path:
  46. raise ValueError("Schema model must be in the form 'module:ClassName'.")
  47. module_name, class_name = path.split(":", 1)
  48. module = importlib.import_module(module_name)
  49. model: object | None = module.__dict__.get(class_name)
  50. if model is None or not isinstance(model, type):
  51. raise ValueError(f"Schema model '{class_name}' not found in module '{module_name}'.")
  52. return model
  53. def normalize_missing_values(value: object) -> JSONReturnType:
  54. if value is MISSING_VALUE or isinstance(value, MissingValueType):
  55. return ""
  56. if isinstance(value, dict):
  57. normalized: dict[str, JSONReturnType] = {}
  58. for key, item in value.items():
  59. if not isinstance(key, str):
  60. raise ValueError("Object keys must be strings.")
  61. normalized[key] = normalize_missing_values(item)
  62. return normalized
  63. if isinstance(value, list):
  64. return [normalize_missing_values(item) for item in value]
  65. if value is None or isinstance(value, (str, int, float, bool)):
  66. return value
  67. raise ValueError("Value is not JSON compatible.")
  68. def schema_from_input(schema: Any) -> dict[str, Any] | bool:
  69. if isinstance(schema, dict):
  70. return schema
  71. if schema is True or schema is False:
  72. return schema
  73. if hasattr(schema, "model_json_schema"):
  74. pydantic = _require_pydantic()
  75. version = getattr(pydantic, "VERSION", getattr(pydantic, "__version__", "0"))
  76. if int(version.split(".")[0]) < 2:
  77. raise ValueError("pydantic v2 is required for schema models.")
  78. schema_dict: dict[str, Any] = schema.model_json_schema()
  79. if hasattr(schema, "model_fields"):
  80. properties = schema_dict.setdefault("properties", {})
  81. if not isinstance(properties, dict):
  82. properties = {}
  83. schema_dict["properties"] = properties
  84. for name, field in schema.model_fields.items():
  85. if field.is_required():
  86. continue
  87. property_schema = properties.setdefault(name, {})
  88. if not isinstance(property_schema, dict):
  89. property_schema = {}
  90. properties[name] = property_schema
  91. if "default" in property_schema:
  92. continue
  93. if field.default_factory is not None:
  94. property_schema["default"] = field.default_factory()
  95. else:
  96. property_schema["default"] = field.default
  97. return schema_dict
  98. raise ValueError("Schema must be a JSON Schema dict, boolean schema, or pydantic v2 model.")
  99. class SchemaRepairer:
  100. def __init__(
  101. self,
  102. schema: dict[str, Any] | bool,
  103. log: list[dict[str, str]] | None,
  104. schema_repair_mode: str = "standard",
  105. ) -> None:
  106. self.root_schema = schema
  107. self.log = log
  108. self.schema_repair_mode = normalize_schema_repair_mode(schema_repair_mode)
  109. self._validator_cache: dict[int, tuple[dict[str, Any], Any]] = {}
  110. def _log(self, text: str, path: str) -> None:
  111. if self.log is not None:
  112. self.log.append({"text": text, "context": path})
  113. def _get_validator(self, schema: dict[str, Any]) -> Any:
  114. cache_key = id(schema)
  115. cached_validator = self._validator_cache.get(cache_key)
  116. if cached_validator is not None and cached_validator[0] is schema:
  117. return cached_validator[1]
  118. prepared_schema = self._prepare_schema_for_validation(schema)
  119. jsonschema = _require_jsonschema()
  120. validator_cls = jsonschema.validators.validator_for(prepared_schema)
  121. validator = validator_cls(prepared_schema)
  122. self._validator_cache[cache_key] = (schema, validator)
  123. return validator
  124. def is_valid(self, value: JSONReturnType, schema: dict[str, Any] | bool) -> bool:
  125. schema = self.resolve_schema(schema)
  126. if schema is True:
  127. return True
  128. if schema is False:
  129. return False
  130. validator = self._get_validator(schema)
  131. return bool(validator.is_valid(value))
  132. def validate(self, value: JSONReturnType, schema: dict[str, Any] | bool) -> None:
  133. schema = self.resolve_schema(schema)
  134. if schema is True:
  135. return
  136. if schema is False:
  137. raise ValueError("Schema does not allow any values.")
  138. jsonschema = _require_jsonschema()
  139. validator = self._get_validator(schema)
  140. try:
  141. validator.validate(value)
  142. except jsonschema.exceptions.ValidationError as exc:
  143. raise ValueError(exc.message) from exc
  144. def resolve_schema(self, schema: object | None) -> dict[str, Any] | bool:
  145. if schema is None:
  146. return True
  147. if isinstance(schema, bool):
  148. return schema
  149. if not isinstance(schema, dict):
  150. raise SchemaDefinitionError("Schema must be an object.")
  151. schema_dict: dict[str, Any] = {}
  152. for key, value in schema.items():
  153. if not isinstance(key, str):
  154. raise SchemaDefinitionError("Schema keys must be strings.")
  155. schema_dict[key] = value
  156. while "$ref" in schema_dict:
  157. ref = schema_dict["$ref"]
  158. resolved = self._resolve_ref(ref)
  159. if isinstance(resolved, bool):
  160. return resolved
  161. schema_dict = resolved
  162. return schema_dict
  163. def is_object_schema(self, schema: dict[str, Any] | bool | None) -> bool:
  164. schema = self.resolve_schema(schema)
  165. if not isinstance(schema, dict):
  166. return False
  167. schema_type = schema.get("type")
  168. if schema_type == "object":
  169. return True
  170. if isinstance(schema_type, list) and "object" in schema_type:
  171. return True
  172. return any(key in schema for key in ("properties", "patternProperties", "additionalProperties", "required"))
  173. def is_array_schema(self, schema: dict[str, Any] | bool | None) -> bool:
  174. schema = self.resolve_schema(schema)
  175. if not isinstance(schema, dict):
  176. return False
  177. schema_type = schema.get("type")
  178. if schema_type == "array":
  179. return True
  180. if isinstance(schema_type, list) and "array" in schema_type:
  181. return True
  182. return "items" in schema
  183. def _allows_schema_type(self, schema: dict[str, Any], schema_type: str) -> bool:
  184. declared_type = schema.get("type")
  185. if isinstance(declared_type, str):
  186. return declared_type == schema_type
  187. if isinstance(declared_type, list):
  188. return schema_type in declared_type
  189. if schema_type == "object":
  190. return self.is_object_schema(schema)
  191. # This helper is only used for object/array checks in _can_salvage_list_as_object.
  192. return self.is_array_schema(schema)
  193. def _can_salvage_list_as_object(self, schema: dict[str, Any]) -> bool:
  194. return self._allows_schema_type(schema, "object") and not self._allows_schema_type(schema, "array")
  195. def repair_value(self, value: Any, schema: dict[str, Any] | bool | None, path: str) -> JSONReturnType:
  196. """Apply schema rules to a parsed value, including unions, coercions, and defaults."""
  197. schema = self.resolve_schema(schema)
  198. if schema is True:
  199. return normalize_missing_values(value)
  200. if schema is False:
  201. raise ValueError("Schema does not allow any values.")
  202. if not schema:
  203. return normalize_missing_values(value)
  204. if value is MISSING_VALUE:
  205. return self._fill_missing(schema, path)
  206. if "allOf" in schema:
  207. subschemas = schema["allOf"]
  208. if not subschemas:
  209. return normalize_missing_values(value)
  210. repaired = self.repair_value(value, subschemas[0], path)
  211. for subschema in subschemas[1:]:
  212. repaired = self.repair_value(repaired, subschema, path)
  213. return repaired
  214. if "oneOf" in schema:
  215. return self._repair_union(value, schema["oneOf"], path)
  216. if "anyOf" in schema:
  217. return self._repair_union(value, schema["anyOf"], path)
  218. expected_type = schema.get("type")
  219. if expected_type is None:
  220. if self.is_object_schema(schema):
  221. expected_type = "object"
  222. elif self.is_array_schema(schema):
  223. expected_type = "array"
  224. if isinstance(expected_type, list):
  225. return self._repair_type_union(value, expected_type, schema, path)
  226. if expected_type == "object":
  227. repaired = self._repair_object(value, schema, path)
  228. elif expected_type == "array":
  229. repaired = self._repair_array(value, schema, path)
  230. elif isinstance(expected_type, str):
  231. repaired = self._coerce_scalar(value, expected_type, path)
  232. else:
  233. repaired = normalize_missing_values(value)
  234. return self._apply_enum_const(repaired, schema, path)
  235. def _repair_union(self, value: Any, schemas: list[dict[str, Any] | bool], path: str) -> JSONReturnType:
  236. last_error: Exception | None = None
  237. for subschema in schemas:
  238. try:
  239. candidate = self.repair_value(copy.deepcopy(value), subschema, path)
  240. self.validate(candidate, subschema)
  241. except ValueError as exc:
  242. last_error = exc
  243. else:
  244. return candidate
  245. if last_error:
  246. raise ValueError(str(last_error)) from last_error
  247. raise ValueError("No schema matched the value.")
  248. def _repair_type_union(
  249. self,
  250. value: Any,
  251. types: list[str],
  252. schema: dict[str, Any],
  253. path: str,
  254. ) -> JSONReturnType:
  255. last_error: Exception | None = None
  256. for schema_type in types:
  257. branch_schema = {**schema, "type": schema_type}
  258. try:
  259. # Keep structural schema context for repair heuristics, but validate against the narrowed branch type.
  260. candidate = self._repair_by_type(copy.deepcopy(value), schema_type, schema, path)
  261. candidate = self._apply_enum_const(candidate, branch_schema, path)
  262. self.validate(candidate, branch_schema)
  263. except ValueError as exc:
  264. last_error = exc
  265. else:
  266. return candidate
  267. if last_error:
  268. raise ValueError(str(last_error)) from last_error
  269. raise ValueError("No schema type matched the value.")
  270. def _repair_by_type(self, value: Any, schema_type: str, schema: dict[str, Any], path: str) -> JSONReturnType:
  271. if schema_type == "array":
  272. return self._repair_array(value, schema, path)
  273. if schema_type == "object":
  274. return self._repair_object(value, schema, path)
  275. return self._coerce_scalar(value, schema_type, path)
  276. def _repair_array(self, value: Any, schema: dict[str, Any], path: str) -> JSONReturnType:
  277. if isinstance(value, list):
  278. items: list[JSONReturnType] = value
  279. else:
  280. self._log("Wrapped value in array to match schema", path)
  281. items = [normalize_missing_values(value)]
  282. salvage_mode = self.schema_repair_mode == "salvage"
  283. def repair_or_drop(raw_item: Any, item_schema: Any, item_path: str) -> tuple[bool, JSONReturnType]:
  284. try:
  285. return True, self.repair_value(raw_item, item_schema, item_path)
  286. except SchemaDefinitionError:
  287. raise
  288. except ValueError:
  289. if not salvage_mode:
  290. raise
  291. self._log("Dropped invalid array item while salvaging", item_path)
  292. return False, None
  293. items_schema = schema.get("items")
  294. if items_schema is not None:
  295. if isinstance(items_schema, list):
  296. repaired_items: list[JSONReturnType] = []
  297. for idx, item_schema in enumerate(items_schema):
  298. if idx >= len(items):
  299. break
  300. item_path = f"{path}[{idx}]"
  301. keep_item, repaired_value = repair_or_drop(items[idx], item_schema, item_path)
  302. if keep_item:
  303. repaired_items.append(repaired_value)
  304. additional_items = schema.get("additionalItems")
  305. if len(items) > len(items_schema):
  306. tail = items[len(items_schema) :]
  307. if isinstance(additional_items, dict):
  308. for offset, item in enumerate(tail, start=len(items_schema)):
  309. item_path = f"{path}[{offset}]"
  310. keep_item, repaired_value = repair_or_drop(item, additional_items, item_path)
  311. if keep_item:
  312. repaired_items.append(repaired_value)
  313. elif additional_items is True or additional_items is None:
  314. repaired_items.extend(normalize_missing_values(item) for item in tail)
  315. else:
  316. for offset, _item in enumerate(tail, start=len(items_schema)):
  317. self._log("Dropped extra array item not covered by schema", f"{path}[{offset}]")
  318. items = repaired_items
  319. else:
  320. repaired_items = []
  321. for idx, item in enumerate(items):
  322. item_path = f"{path}[{idx}]"
  323. keep_item, repaired_value = repair_or_drop(item, items_schema, item_path)
  324. if keep_item:
  325. repaired_items.append(repaired_value)
  326. items = repaired_items
  327. min_items = schema.get("minItems")
  328. if min_items is not None and len(items) < min_items:
  329. raise ValueError(f"Array at {path} does not meet minItems.")
  330. return items
  331. def _repair_object(self, value: Any, schema: dict[str, Any], path: str) -> JSONReturnType:
  332. if (
  333. self.schema_repair_mode == "salvage"
  334. and isinstance(value, list)
  335. and self._can_salvage_list_as_object(schema)
  336. ):
  337. mapped = self._map_list_to_object(value, schema, path)
  338. if mapped is not None:
  339. value = mapped
  340. elif path == "$" and len(value) == 1 and isinstance(value[0], dict):
  341. # Conservatively unwrap the common root wrapper shape: [{...}] -> {...}.
  342. value = value[0]
  343. self._log("Unwrapped single-item root array to object while salvaging", path)
  344. if not isinstance(value, dict):
  345. raise ValueError(f"Expected object at {path}, got {type(value).__name__}.")
  346. properties = schema.get("properties", {})
  347. if not isinstance(properties, dict):
  348. properties = {}
  349. required = set(schema.get("required", []))
  350. pattern_properties = schema.get("patternProperties", {})
  351. if not isinstance(pattern_properties, dict):
  352. pattern_properties = {}
  353. additional_properties = schema.get("additionalProperties")
  354. if self.schema_repair_mode == "salvage" and required:
  355. value_with_salvage_fills = dict(value)
  356. for key in required:
  357. if key in value_with_salvage_fills:
  358. continue
  359. prop_schema = properties.get(key)
  360. if prop_schema is None:
  361. continue
  362. key_path = f"{path}.{key}"
  363. filled, filled_value = self._fill_missing_required_for_salvage(prop_schema, key_path)
  364. if filled:
  365. value_with_salvage_fills[key] = filled_value
  366. self._log("Filled missing required property while salvaging", key_path)
  367. value = value_with_salvage_fills
  368. missing_required = [key for key in required if key not in value]
  369. if missing_required:
  370. raise ValueError(f"Missing required properties at {path}: {', '.join(missing_required)}")
  371. repaired: dict[str, JSONReturnType] = {}
  372. for key, prop_schema in properties.items():
  373. key_path = f"{path}.{key}"
  374. if key in value:
  375. repaired[key] = self.repair_value(value[key], prop_schema, key_path)
  376. elif isinstance(prop_schema, dict) and "default" in prop_schema and key not in required:
  377. repaired[key] = self._copy_json_value(prop_schema["default"], key_path, "default")
  378. self._log("Inserted default value for missing property", key_path)
  379. for key, raw_value in value.items():
  380. if key in properties:
  381. continue
  382. key_path = f"{path}.{key}"
  383. matched: list[Any] = []
  384. unsupported_patterns: list[str] = []
  385. if pattern_properties:
  386. matched, unsupported_patterns = match_pattern_properties(pattern_properties, key)
  387. for pattern in unsupported_patterns:
  388. self._log(f"Skipped unsupported patternProperties regex '{pattern}'", key_path)
  389. if matched:
  390. repaired_value = self.repair_value(raw_value, matched[0], key_path)
  391. for prop_schema in matched[1:]:
  392. repaired_value = self.repair_value(repaired_value, prop_schema, key_path)
  393. repaired[key] = repaired_value
  394. continue
  395. if isinstance(additional_properties, dict):
  396. repaired[key] = self.repair_value(raw_value, additional_properties, key_path)
  397. continue
  398. if additional_properties is True or additional_properties is None:
  399. repaired[key] = normalize_missing_values(raw_value)
  400. continue
  401. self._log("Dropped extra property not covered by schema", key_path)
  402. min_properties = schema.get("minProperties")
  403. if min_properties is not None and len(repaired) < min_properties:
  404. raise ValueError(f"Object at {path} does not meet minProperties.")
  405. return repaired
  406. def _map_list_to_object(
  407. self, value: list[Any], schema: dict[str, Any], path: str
  408. ) -> dict[str, JSONReturnType] | None:
  409. properties = schema.get("properties")
  410. if not isinstance(properties, dict) or not properties:
  411. return None
  412. typed_properties: dict[str, Any] = {}
  413. for key, prop_schema in properties.items():
  414. if not isinstance(key, str):
  415. raise SchemaDefinitionError("Schema object property names must be strings.")
  416. typed_properties[key] = prop_schema
  417. keys = list(typed_properties.keys())
  418. if len(value) != len(keys):
  419. return None
  420. mapped: dict[str, JSONReturnType] = {}
  421. for idx, key in enumerate(keys):
  422. key_path = f"{path}.{key}"
  423. try:
  424. mapped[key] = self.repair_value(value[idx], typed_properties[key], key_path)
  425. except SchemaDefinitionError:
  426. raise
  427. except ValueError:
  428. return None
  429. self._log("Mapped array to object by schema property order", path)
  430. return mapped
  431. def _fill_missing_required_for_salvage(self, schema: object, path: str) -> tuple[bool, JSONReturnType]:
  432. resolved_schema = self.resolve_schema(schema)
  433. if not isinstance(resolved_schema, dict):
  434. return False, ""
  435. if "default" in resolved_schema:
  436. return True, self._copy_json_value(resolved_schema["default"], path, "default")
  437. if "const" in resolved_schema:
  438. return True, self._copy_json_value(resolved_schema["const"], path, "const")
  439. enum_values = resolved_schema.get("enum")
  440. if enum_values:
  441. return True, self._copy_json_value(enum_values[0], path, "enum")
  442. expected_type = resolved_schema.get("type")
  443. if expected_type is None:
  444. if self.is_array_schema(resolved_schema):
  445. expected_type = "array"
  446. elif self.is_object_schema(resolved_schema):
  447. expected_type = "object"
  448. if expected_type == "array" and not resolved_schema.get("minItems"):
  449. return True, []
  450. if expected_type == "object" and not resolved_schema.get("minProperties"):
  451. return True, {}
  452. return False, ""
  453. def _fill_missing(self, schema: dict[str, Any], path: str) -> JSONReturnType:
  454. if "const" in schema:
  455. # Const/enum/default have priority over type inference.
  456. self._log("Filled missing value with const", path)
  457. return self._copy_json_value(schema["const"], path, "const")
  458. if "enum" in schema:
  459. enum_values = schema["enum"]
  460. if not enum_values:
  461. raise ValueError(f"Enum at {path} has no values.")
  462. self._log("Filled missing value with first enum value", path)
  463. return self._copy_json_value(enum_values[0], path, "enum")
  464. if "default" in schema:
  465. self._log("Filled missing value with default", path)
  466. return self._copy_json_value(schema["default"], path, "default")
  467. expected_type = schema.get("type")
  468. if isinstance(expected_type, list):
  469. for schema_type in expected_type:
  470. try:
  471. return self._fill_missing({**schema, "type": schema_type}, path)
  472. except ValueError:
  473. continue
  474. raise ValueError(f"Cannot infer missing value at {path}.")
  475. if expected_type is None:
  476. # Infer container types based on schema shape if type is omitted.
  477. if self.is_object_schema(schema):
  478. expected_type = "object"
  479. elif self.is_array_schema(schema):
  480. expected_type = "array"
  481. if expected_type == "string":
  482. self._log("Filled missing value with empty string", path)
  483. return ""
  484. if expected_type in ("integer", "number"):
  485. self._log("Filled missing value with 0", path)
  486. return 0
  487. if expected_type == "boolean":
  488. self._log("Filled missing value with false", path)
  489. return False
  490. if expected_type == "array":
  491. min_items = schema.get("minItems")
  492. if min_items:
  493. raise ValueError(f"Array at {path} requires at least {min_items} items.")
  494. self._log("Filled missing value with empty array", path)
  495. return []
  496. if expected_type == "object":
  497. min_properties = schema.get("minProperties")
  498. if min_properties:
  499. raise ValueError(f"Object at {path} requires at least {min_properties} properties.")
  500. self._log("Filled missing value with empty object", path)
  501. return {}
  502. if expected_type == "null":
  503. self._log("Filled missing value with null", path)
  504. return None
  505. raise ValueError(f"Cannot infer missing value at {path}.")
  506. def _coerce_scalar(self, value: Any, schema_type: str, path: str) -> JSONReturnType:
  507. if schema_type == "string":
  508. if isinstance(value, str):
  509. return value
  510. if isinstance(value, (int, float)) and not isinstance(value, bool):
  511. self._log("Coerced number to string", path)
  512. return str(value)
  513. raise ValueError(f"Expected string at {path}.")
  514. if schema_type == "integer":
  515. if isinstance(value, bool):
  516. raise ValueError(f"Expected integer at {path}.")
  517. if isinstance(value, int):
  518. return value
  519. if isinstance(value, float):
  520. if value.is_integer():
  521. self._log("Coerced number to integer", path)
  522. return int(value)
  523. raise ValueError(f"Expected integer at {path}.")
  524. if isinstance(value, str):
  525. try:
  526. int_value = int(value)
  527. except ValueError:
  528. int_value = None
  529. if int_value is not None:
  530. self._log("Coerced string to integer", path)
  531. return int_value
  532. try:
  533. num = float(value)
  534. except ValueError as exc:
  535. raise ValueError(f"Expected integer at {path}.") from exc
  536. if not num.is_integer():
  537. raise ValueError(f"Expected integer at {path}.")
  538. self._log("Coerced number to integer", path)
  539. return int(num)
  540. raise ValueError(f"Expected integer at {path}.")
  541. if schema_type == "number":
  542. if isinstance(value, bool):
  543. raise ValueError(f"Expected number at {path}.")
  544. if isinstance(value, (int, float)):
  545. return value
  546. if isinstance(value, str):
  547. try:
  548. float_value = float(value)
  549. except ValueError as exc:
  550. raise ValueError(f"Expected number at {path}.") from exc
  551. self._log("Coerced string to number", path)
  552. return float_value
  553. raise ValueError(f"Expected number at {path}.")
  554. if schema_type == "boolean":
  555. if isinstance(value, bool):
  556. return value
  557. if isinstance(value, str):
  558. lowered = value.lower()
  559. if lowered in ("true", "yes", "y", "on", "1"):
  560. self._log("Coerced string to boolean", path)
  561. return True
  562. if lowered in ("false", "no", "n", "off", "0"):
  563. self._log("Coerced string to boolean", path)
  564. return False
  565. if isinstance(value, (int, float)) and not isinstance(value, bool) and value in (0, 1):
  566. self._log("Coerced number to boolean", path)
  567. return bool(value)
  568. raise ValueError(f"Expected boolean at {path}.")
  569. if schema_type == "null":
  570. if value is None:
  571. return None
  572. raise ValueError(f"Expected null at {path}.")
  573. raise SchemaDefinitionError(f"Unsupported schema type {schema_type} at {path}.")
  574. def _apply_enum_const(self, value: JSONReturnType, schema: dict[str, Any], path: str) -> JSONReturnType:
  575. if "const" in schema and value != schema["const"]:
  576. raise ValueError(f"Value at {path} does not match const.")
  577. if "enum" in schema and value not in schema["enum"]:
  578. raise ValueError(f"Value at {path} does not match enum.")
  579. return value
  580. def _resolve_ref(self, ref: str) -> dict[str, Any] | bool:
  581. if not ref.startswith("#/"):
  582. raise SchemaDefinitionError(f"Unsupported $ref: {ref}")
  583. parts = ref.lstrip("#/").split("/")
  584. current: Any = self.root_schema
  585. for part in parts:
  586. resolved_part = part.replace("~1", "/").replace("~0", "~")
  587. if not isinstance(current, dict) or resolved_part not in current:
  588. raise SchemaDefinitionError(f"Unresolvable $ref: {ref}")
  589. current = current[resolved_part]
  590. if isinstance(current, dict):
  591. return current
  592. if current is True:
  593. return True
  594. if current is False:
  595. return False
  596. raise SchemaDefinitionError(f"Unresolvable $ref: {ref}")
  597. def _copy_json_value(self, value: Any, path: str, label: str) -> JSONReturnType:
  598. if value is None or isinstance(value, (str, int, float, bool)):
  599. return value
  600. if isinstance(value, list):
  601. return [self._copy_json_value(item, f"{path}[{idx}]", label) for idx, item in enumerate(value)]
  602. if isinstance(value, dict):
  603. copied: dict[str, JSONReturnType] = {}
  604. for key, item in value.items():
  605. if not isinstance(key, str):
  606. raise ValueError(f"{label.capitalize()} value at {path} contains a non-string key.")
  607. copied[key] = self._copy_json_value(item, f"{path}.{key}", label)
  608. return copied
  609. raise ValueError(f"{label.capitalize()} value at {path} is not JSON compatible.")
  610. def _prepare_schema_for_validation(self, schema: object) -> dict[str, Any]:
  611. normalized = _prepare_schema_for_validation_node(schema)
  612. if not isinstance(normalized, dict):
  613. raise ValueError("Schema must be an object.")
  614. return normalized