# json_parser.py
  1. from collections.abc import Callable
  2. from typing import TYPE_CHECKING, Any, TextIO
  3. from .parse_array import parse_array as _parse_array
  4. from .parse_comment import parse_comment as _parse_comment
  5. from .parse_number import parse_number as _parse_number
  6. from .parse_object import parse_object as _parse_object
  7. from .parse_string import parse_string as _parse_string
  8. from .utils.constants import STRING_DELIMITERS, JSONReturnType
  9. from .utils.json_context import JsonContext
  10. from .utils.object_comparer import ObjectComparer
  11. from .utils.string_file_wrapper import StringFileWrapper
  12. if TYPE_CHECKING:
  13. from .schema_repair import SchemaRepairer
  14. class JSONParser:
  15. # Split the parse methods into separate files because this one was like 3000 lines
  16. def parse_array(
  17. self,
  18. schema: dict[str, Any] | bool | None = None,
  19. path: str = "$",
  20. ) -> list[JSONReturnType]:
  21. return _parse_array(self, schema, path)
  22. def parse_comment(self) -> JSONReturnType:
  23. return _parse_comment(self)
  24. def parse_number(self) -> JSONReturnType:
  25. return _parse_number(self)
  26. def parse_object(
  27. self,
  28. schema: dict[str, Any] | bool | None = None,
  29. path: str = "$",
  30. ) -> JSONReturnType:
  31. return _parse_object(self, schema, path)
  32. def parse_string(self) -> JSONReturnType:
  33. return _parse_string(self)
  34. def __init__(
  35. self,
  36. json_str: str | StringFileWrapper,
  37. json_fd: TextIO | None,
  38. logging: bool | None,
  39. json_fd_chunk_length: int = 0,
  40. stream_stable: bool = False,
  41. strict: bool = False,
  42. ) -> None:
  43. # The string to parse
  44. self.json_str: str | StringFileWrapper = json_str
  45. # Alternatively, the file description with a json file in it
  46. if json_fd:
  47. # This is a trick we do to treat the file wrapper as an array
  48. self.json_str = StringFileWrapper(json_fd, json_fd_chunk_length)
  49. # Index is our iterator that will keep track of which character we are looking at right now
  50. self.index: int = 0
  51. # This is used in the object member parsing to manage the special cases of missing quotes in key or value
  52. self.context = JsonContext()
  53. # Use this to log the activity, but only if logging is active
  54. # This is a trick but a beautiful one. We call self.log in the code over and over even if it's not needed.
  55. # We could add a guard in the code for each call but that would make this code unreadable, so here's this neat trick
  56. # Replace self.log with a noop
  57. self.logging = logging
  58. self.logger: list[dict[str, str]] = []
  59. if logging:
  60. self.log = self._log
  61. else:
  62. # No-op
  63. self.log = lambda *args, **kwargs: None # noqa: ARG005
  64. # When the json to be repaired is the accumulation of streaming json at a certain moment.
  65. # e.g. json obtained from llm response.
  66. # If this parameter to True will keep the repair results stable. For example:
  67. # case 1: '{"key": "val\\' => '{"key": "val"}'
  68. # case 2: '{"key": "val\\n' => '{"key": "val\\n"}'
  69. # case 3: '{"key": "val\\n123,`key2:value2' => '{"key": "val\\n123,`key2:value2"}'
  70. # case 4: '{"key": "val\\n123,`key2:value2`"}' => '{"key": "val\\n123,`key2:value2`"}'
  71. self.stream_stable = stream_stable
  72. # Over time the library got more and more complex heuristics to repair JSON. Some of these heuristics
  73. # may not be desirable in some use cases and the user would prefer json_repair to return an exception.
  74. # So strict mode was added to disable some of those heuristics.
  75. self.strict = strict
  76. self.schema_repairer: SchemaRepairer | None = None
  77. def parse(
  78. self,
  79. ) -> JSONReturnType:
  80. return self._parse_top_level(self.parse_json)
  81. def parse_with_schema(
  82. self,
  83. repairer: "SchemaRepairer",
  84. schema: dict[str, Any] | bool,
  85. ) -> JSONReturnType:
  86. """Parse with schema guidance enabled for all nested values."""
  87. self.schema_repairer = repairer
  88. return self._parse_top_level(lambda: self.parse_json(schema, "$"))
  89. # Consolidate top-level parsing so we handle multiple sequential JSON values consistently
  90. # (including update semantics and strict-mode validation).
  91. def _parse_top_level(self, parse_element: Callable[[], JSONReturnType]) -> JSONReturnType:
  92. json = parse_element()
  93. if self.index < len(self.json_str):
  94. self.log(
  95. "The parser returned early, checking if there's more json elements",
  96. )
  97. json = [json]
  98. while self.index < len(self.json_str):
  99. self.context.reset()
  100. j = parse_element()
  101. if j:
  102. if ObjectComparer.is_same_object(json[-1], j):
  103. # Treat repeated objects as updates: keep the newest value.
  104. json.pop()
  105. else:
  106. if not json[-1]:
  107. json.pop()
  108. json.append(j)
  109. else:
  110. self.index += 1
  111. if len(json) == 1:
  112. self.log(
  113. "There were no more elements, returning the element without the array",
  114. )
  115. json = json[0]
  116. elif self.strict:
  117. self.log(
  118. "Multiple top-level JSON elements found in strict mode, raising an error",
  119. )
  120. raise ValueError("Multiple top-level JSON elements found in strict mode.")
  121. return json
  122. def parse_json(
  123. self,
  124. schema: dict[str, Any] | bool | None = None,
  125. path: str = "$",
  126. ) -> JSONReturnType:
  127. """Parse the next JSON value and, when configured, enforce schema constraints."""
  128. repairer = self.schema_repairer if self.schema_repairer is not None and schema not in (None, True) else None
  129. if repairer is not None:
  130. # Resolve references once and decide whether schema-guided repairs are needed.
  131. schema = repairer.resolve_schema(schema)
  132. if schema is True:
  133. repairer = None
  134. elif schema is False:
  135. raise ValueError("Schema does not allow any values.")
  136. while True:
  137. char = self.get_char_at()
  138. # None means that we are at the end of the string provided
  139. if char is None:
  140. return ""
  141. # <object> starts with '{'
  142. if char == "{":
  143. self.index += 1
  144. value = self.parse_object(schema, path) if repairer else self.parse_object()
  145. return repairer.repair_value(value, schema, path) if repairer else value
  146. # <array> starts with '['
  147. if char == "[":
  148. self.index += 1
  149. value = self.parse_array(schema, path) if repairer else self.parse_array()
  150. return repairer.repair_value(value, schema, path) if repairer else value
  151. # <string> starts with a quote
  152. if not self.context.empty and (char in STRING_DELIMITERS or char.isalpha()):
  153. value = self.parse_string()
  154. return repairer.repair_value(value, schema, path) if repairer else value
  155. # <number> starts with [0-9] or minus
  156. if not self.context.empty and (char.isdigit() or char == "-" or char == "."):
  157. value = self.parse_number()
  158. return repairer.repair_value(value, schema, path) if repairer else value
  159. if char in ["#", "/"]:
  160. value = self.parse_comment()
  161. return repairer.repair_value(value, schema, path) if repairer else value
  162. # If everything else fails, we just ignore and move on
  163. self.index += 1
  164. def get_char_at(self, count: int = 0) -> str | None:
  165. # Why not use something simpler? Because try/except in python is a faster alternative to an "if" statement that is often True
  166. try:
  167. return self.json_str[self.index + count]
  168. except IndexError:
  169. return None
  170. def skip_whitespaces(self) -> None:
  171. """
  172. This function quickly iterates on whitespaces, moving the self.index forward
  173. """
  174. try:
  175. char = self.json_str[self.index]
  176. while char.isspace():
  177. self.index += 1
  178. char = self.json_str[self.index]
  179. except IndexError:
  180. pass
  181. def scroll_whitespaces(self, idx: int = 0) -> int:
  182. """
  183. This function quickly iterates on whitespaces. Doesn't move the self.index and returns the offset from self.index
  184. """
  185. try:
  186. char = self.json_str[self.index + idx]
  187. while char.isspace():
  188. idx += 1
  189. char = self.json_str[self.index + idx]
  190. except IndexError:
  191. pass
  192. return idx
  193. def skip_to_character(self, character: str | list[str], idx: int = 0) -> int:
  194. """
  195. Advance from (self.index + idx) until we hit an *unescaped* target character.
  196. Returns the offset (idx) from self.index to that position, or the distance to the end if not found.
  197. """
  198. targets = set(character) if isinstance(character, list) else {character}
  199. i = self.index + idx
  200. n = len(self.json_str)
  201. backslashes = 0 # count of consecutive '\' immediately before current char
  202. while i < n:
  203. ch = self.json_str[i]
  204. if ch == "\\":
  205. backslashes += 1
  206. i += 1
  207. continue
  208. # ch is not a backslash; if it's a target and not escaped (even backslashes), we're done
  209. if ch in targets and (backslashes % 2 == 0):
  210. return i - self.index
  211. # reset backslash run when we see a non-backslash
  212. backslashes = 0
  213. i += 1
  214. # not found; return distance to end
  215. return n - self.index
  216. def _log(self, text: str) -> None:
  217. window: int = 10
  218. start: int = max(self.index - window, 0)
  219. end: int = min(self.index + window, len(self.json_str))
  220. context: str = self.json_str[start:end]
  221. self.logger.append(
  222. {
  223. "text": text,
  224. "context": context,
  225. }
  226. )