generator.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. from collections import OrderedDict
  2. from decimal import Decimal
  3. import re
  4. from .exceptions import JsonSchemaValueException, JsonSchemaDefinitionException
  5. from .indent import indent
  6. from .ref_resolver import RefResolver
  def enforce_list(variable):
      """
      Return ``variable`` unchanged when it already is a ``list``;
      otherwise return a new one-element list containing it.

      Only real ``list`` instances (including subclasses) pass through;
      tuples, strings and ``None`` are wrapped like any other value.
      """
      return variable if isinstance(variable, list) else [variable]
  # pylint: disable=too-many-instance-attributes,too-many-public-methods
  class CodeGenerator:
      """
      This class is not supposed to be used directly. Anything
      inside of this class can be changed without notice.

      This class generates code of validation function from JSON
      schema object as string. Example:

      .. code-block:: python

          CodeGenerator(json_schema_definition).func_code
      """

      INDENT = 4  # spaces

      def __init__(self, definition, resolver=None, detailed_exceptions=True):
          """
          :param definition: root JSON schema definition to generate code for.
          :param resolver: reference resolver; when ``None`` one is created with
              ``RefResolver.from_schema(definition, store={})``.
          :param detailed_exceptions: when true, generated ``raise`` statements
              also pass ``value``, ``name``, ``definition`` and ``rule``.
          """
          # Generated source lines, joined with newlines by ``func_code``.
          self._code = []
          self._compile_regexps = {}
          self._custom_formats = {}
          self._detailed_exceptions = detailed_exceptions

          # Any extra library should be here to be imported only once.
          # Lines are imports to be printed in the file and objects
          # key-value pair to pass to compile function directly.
          self._extra_imports_lines = [
              "from decimal import Decimal",
          ]
          self._extra_imports_objects = {
              "Decimal": Decimal,
          }

          # Names of helper variables already emitted in the current function
          # (see the ``create_variable_*`` methods).
          self._variables = set()
          self._indent = 0
          self._indent_last_line = None
          self._variable = None
          self._variable_name = None
          self._root_definition = definition
          self._definition = None

          # map schema URIs to validation function names for functions
          # that are not yet generated, but need to be generated
          self._needed_validation_functions = {}
          # validation function names that are already done
          self._validation_functions_done = set()

          if resolver is None:
              resolver = RefResolver.from_schema(definition, store={})
          self._resolver = resolver

          # add main function to `self._needed_validation_functions`
          self._needed_validation_functions[self._resolver.get_uri()] = self._resolver.get_scope_name()

          # Maps a JSON schema keyword to the callable generating its code;
          # consulted in insertion order by ``run_generate_functions``.
          self._json_keywords_to_function = OrderedDict()

      @property
      def func_code(self):
          """
          Returns generated code of whole validation function as string.
          """
          self._generate_func_code()

          return '\n'.join(self._code)

      @property
      def global_state(self):
          """
          Returns global variables for generating function from ``func_code``. Includes
          compiled regular expressions and imports, so it does not have to do it every
          time when validation function is called.
          """
          self._generate_func_code()

          return dict(
              **self._extra_imports_objects,
              REGEX_PATTERNS=self._compile_regexps,
              re=re,
              JsonSchemaValueException=JsonSchemaValueException,
          )

      @property
      def global_state_code(self):
          """
          Returns global variables for generating function from ``func_code`` as code.
          Includes compiled regular expressions and imports.

          ``import re`` and the ``REGEX_PATTERNS`` assignment are emitted only
          when at least one regular expression was collected.
          """
          self._generate_func_code()

          has_regexps = bool(self._compile_regexps)

          lines = list(self._extra_imports_lines)
          if has_regexps:
              lines.append('import re')
          lines.extend([
              'from fastjsonschema import JsonSchemaValueException',
              '',
              '',
          ])
          if has_regexps:
              lines.extend([
                  'REGEX_PATTERNS = ' + serialize_regexes(self._compile_regexps),
                  '',
              ])
          return '\n'.join(lines)

      def _generate_func_code(self):
          """Generate the code lazily: only when nothing has been generated yet."""
          if not self._code:
              self.generate_func_code()

      def generate_func_code(self):
          """
          Creates base code of validation function and calls helper
          for creating code by definition.
          """
          self.l('NoneType = type(None)')
          # Generate parts that are referenced and not yet generated
          while self._needed_validation_functions:
              # During generation of validation function, could be needed to generate
              # new one that is added again to `_needed_validation_functions`.
              # Therefore usage of while instead of for loop.
              uri, name = self._needed_validation_functions.popitem()
              self.generate_validation_function(uri, name)

      def generate_validation_function(self, uri, name):
          """
          Generate validation function for given uri with given name
          """
          # Mark as done before generating so that a reference back to this
          # URI from inside its own body does not queue it again.
          self._validation_functions_done.add(uri)
          self.l('')
          with self._resolver.resolving(uri) as definition:
              with self.l('def {}(data, custom_formats={{}}, name_prefix=None):', name):
                  self.generate_func_code_block(definition, 'data', 'data', clear_variables=True)
                  self.l('return data')

      def generate_func_code_block(self, definition, variable, variable_name, clear_variables=False):
          """
          Creates validation rules for current definition.

          ``definition``, ``variable`` and ``variable_name`` become the current
          generator state for the duration of the call and the previous state is
          restored afterwards, even when generation raises. With
          ``clear_variables`` the set of already created helper variables is
          temporarily replaced by an empty one as well.

          Returns the number of validation rules generated as code. For a
          definition containing ``$ref`` the result of ``generate_ref`` is
          returned instead (``None`` in this class).
          """
          backup = self._definition, self._variable, self._variable_name
          backup_variables = self._variables

          self._definition, self._variable, self._variable_name = definition, variable, variable_name
          if clear_variables:
              self._variables = set()

          try:
              return self._generate_func_code_block(definition)
          finally:
              # Restore on every exit path so that an exception raised for a
              # nested definition does not leave the nested state behind.
              self._definition, self._variable, self._variable_name = backup
              if clear_variables:
                  self._variables = backup_variables

      def _generate_func_code_block(self, definition):
          """
          Dispatch generation for ``definition``.

          Raises ``JsonSchemaDefinitionException`` when ``definition`` is not a dict.
          """
          if not isinstance(definition, dict):
              raise JsonSchemaDefinitionException("definition must be an object")
          if '$ref' in definition:
              # needed because ref overrides any sibling keywords
              return self.generate_ref()
          return self.run_generate_functions(definition)

      def run_generate_functions(self, definition):
          """Returns the number of generate functions that were executed."""
          count = 0
          for key, func in self._json_keywords_to_function.items():
              if key in definition:
                  func()
                  count += 1
          return count

      def generate_ref(self):
          """
          Ref can be link to remote or local definition.

          .. code-block:: python

              {'$ref': 'http://json-schema.org/draft-04/schema#'}
              {
                  'properties': {
                      'foo': {'type': 'integer'},
                      'bar': {'$ref': '#/properties/foo'}
                  }
              }
          """
          with self._resolver.in_scope(self._definition['$ref']):
              name = self._resolver.get_scope_name()
              uri = self._resolver.get_uri()
              if uri not in self._validation_functions_done:
                  self._needed_validation_functions[uri] = name
              # call validation function
              assert self._variable_name.startswith("data")
              # Everything after the leading "data" is appended to the
              # runtime ``name_prefix`` in the generated call.
              path = self._variable_name[4:]
              name_arg = '(name_prefix or "data") + "{}"'.format(path)
              if '{' in name_arg:
                  # Placeholders in the path are filled from the generated
                  # function's local variables at validation time.
                  name_arg = name_arg + '.format(**locals())'
              self.l('{}({variable}, custom_formats, {name_arg})', name, name_arg=name_arg)

      # pylint: disable=invalid-name
      @indent
      def l(self, line, *args, **kwds):
          """
          Short-cut of line. Used for inserting line. It's formatted with parameters
          ``variable``, ``variable_name`` (as ``name`` for short-cut), all keys from
          current JSON schema ``definition`` and also passed arguments in ``args``
          and named ``kwds``.

          .. code-block:: python

              self.l('if {variable} not in {enum}: raise JsonSchemaValueException("Wrong!")')

          When you want to indent block, use it as context manager. For example:

          .. code-block:: python

              with self.l('if {variable} not in {enum}:'):
                  self.l('raise JsonSchemaValueException("Wrong!")')
          """
          spaces = ' ' * self.INDENT * self._indent

          name = self._variable_name
          if name:
              # Add name_prefix to the name when it is being outputted.
              assert name.startswith('data')
              name = '" + (name_prefix or "data") + "' + name[4:]
              if '{' in name:
                  name = name + '".format(**locals()) + "'

          # Schema keywords are exposed as format fields only when the current
          # definition is a non-empty mapping (``True`` and falsy values give none).
          context = dict(
              self._definition if self._definition and self._definition is not True else {},
              variable=self._variable,
              name=name,
              **kwds
          )
          line = line.format(*args, **context)
          # Keep every generated statement on a single physical line.
          line = line.replace('\n', '\\n').replace('\r', '\\r')
          self._code.append(spaces + line)
          return line

      def e(self, string):
          """
          Short-cut of escape. Used for inserting user values into a string message.

          The value is converted with ``str`` and escaped so that it can be placed
          inside a double-quoted string literal of the generated code: backslashes
          are doubled first, then double quotes are backslash-escaped. Without the
          backslash step a value ending in ``\\`` would swallow the closing quote.

          .. code-block:: python

              self.l('raise JsonSchemaValueException("Variable: {}")', self.e(variable))
          """
          return str(string).replace('\\', '\\\\').replace('"', '\\"')

      def exc(self, msg, *args, append_to_msg=None, rule=None):
          """
          Short-cut for creating raising exception in the code.

          :param msg: message template placed inside a double-quoted literal.
          :param args: positional format arguments forwarded to ``l``.
          :param append_to_msg: source of an expression appended to the message
              with ``+``; used only with detailed exceptions.
          :param rule: schema keyword the check belongs to; used only with
              detailed exceptions.
          """
          if not self._detailed_exceptions:
              self.l('raise JsonSchemaValueException("'+msg+'")', *args)
              return

          arg = '"'+msg+'"'
          if append_to_msg:
              arg += ' + (' + append_to_msg + ')'
          msg = 'raise JsonSchemaValueException('+arg+', value={variable}, name="{name}", definition={definition}, rule={rule})'
          definition = self._expand_refs(self._definition)
          definition_rule = self.e(definition.get(rule) if isinstance(definition, dict) else None)
          self.l(msg, *args, definition=repr(definition), rule=repr(rule), definition_rule=definition_rule)

      def _expand_refs(self, definition):
          """
          Return ``definition`` with string ``$ref`` entries replaced by the
          schema they resolve to.

          Lists and dicts are walked recursively; a resolved schema is returned
          as-is without being expanded further; any other value is returned
          unchanged.
          """
          if isinstance(definition, list):
              return [self._expand_refs(v) for v in definition]
          if not isinstance(definition, dict):
              return definition
          if "$ref" in definition and isinstance(definition["$ref"], str):
              with self._resolver.resolving(definition["$ref"]) as schema:
                  return schema
          return {k: self._expand_refs(v) for k, v in definition.items()}

      def _create_variable_once(self, suffix, line):
          """
          Emit ``line`` unless the helper variable ``<variable><suffix>`` was
          already created for the current variable.
          """
          variable_name = '{}{}'.format(self._variable, suffix)
          if variable_name in self._variables:
              return
          self._variables.add(variable_name)
          self.l(line)

      def create_variable_with_length(self):
          """
          Append code for creating variable with length of that variable
          (for example length of list or dictionary) with name ``{variable}_len``.
          It can be called several times and always it's done only when that variable
          still does not exists.
          """
          self._create_variable_once('_len', '{variable}_len = len({variable})')

      def create_variable_keys(self):
          """
          Append code for creating variable with keys of that variable (dictionary)
          with a name ``{variable}_keys``. Similar to `create_variable_with_length`.
          """
          self._create_variable_once('_keys', '{variable}_keys = set({variable}.keys())')

      def create_variable_is_list(self):
          """
          Append code for creating variable with bool if it's instance of list
          with a name ``{variable}_is_list``. Similar to `create_variable_with_length`.
          """
          self._create_variable_once('_is_list', '{variable}_is_list = isinstance({variable}, (list, tuple))')

      def create_variable_is_dict(self):
          """
          Append code for creating variable with bool if it's instance of dict
          with a name ``{variable}_is_dict``. Similar to `create_variable_with_length`.
          """
          self._create_variable_once('_is_dict', '{variable}_is_dict = isinstance({variable}, dict)')
  def serialize_regexes(patterns_dict):
      """
      Render a mapping of keys to compiled regular expressions as the source
      text of a dict literal, one ``key: re.compile(...)`` entry per line.
      """
      # Unfortunately using `pprint.pformat` is causing errors
      # specially with big regexes
      entries = [
          "{!r}: {}".format(key, repr_regex(compiled))
          for key, compiled in patterns_dict.items()
      ]
      body = ",\n ".join(entries)
      return '{\n ' + body + "\n}"
  def repr_regex(regex):
      """
      Return source code of a ``re.compile(...)`` call for ``regex``.

      Only the flags named below are written out, in this fixed order and
      joined with ``|``; when none of them is set the flags argument is omitted.
      """
      flag_names = ("A", "I", "DEBUG", "L", "M", "S", "X")
      active = ["re." + flag for flag in flag_names if regex.flags & getattr(re, flag)]
      suffix = ""
      if active:
          suffix = ", " + " | ".join(active)
      return "re.compile({!r}{})".format(regex.pattern, suffix)