_suite.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. """
  2. Python representations of the JSON Schema Test Suite tests.
  3. """
  4. from __future__ import annotations
  5. from contextlib import suppress
  6. from functools import partial
  7. from pathlib import Path
  8. from typing import TYPE_CHECKING, Any
  9. import json
  10. import os
  11. import re
  12. import sys
  13. import unittest
  14. from attrs import field, frozen
  15. from referencing import Registry
  16. import referencing.jsonschema
  17. if TYPE_CHECKING:
  18. from collections.abc import Iterable, Mapping, Sequence
  19. from referencing.jsonschema import Schema
  20. import pyperf
  21. from jsonschema.validators import _VALIDATORS
  22. import jsonschema
  23. MAGIC_REMOTE_URL = "http://localhost:1234"
  24. _DELIMITERS = re.compile(r"[\W\- ]+")
  25. def _find_suite():
  26. root = os.environ.get("JSON_SCHEMA_TEST_SUITE")
  27. if root is not None:
  28. return Path(root)
  29. root = Path(jsonschema.__file__).parent.parent / "json"
  30. if not root.is_dir(): # pragma: no cover
  31. raise ValueError(
  32. (
  33. "Can't find the JSON-Schema-Test-Suite directory. "
  34. "Set the 'JSON_SCHEMA_TEST_SUITE' environment "
  35. "variable or run the tests from alongside a checkout "
  36. "of the suite."
  37. ),
  38. )
  39. return root
  40. @frozen
  41. class Suite:
  42. _root: Path = field(factory=_find_suite)
  43. def benchmark(self, runner: pyperf.Runner): # pragma: no cover
  44. for name, Validator in _VALIDATORS.items():
  45. self.version(name=name).benchmark(
  46. runner=runner,
  47. Validator=Validator,
  48. )
  49. def version(self, name) -> Version:
  50. Validator = _VALIDATORS[name]
  51. uri: str = Validator.ID_OF(Validator.META_SCHEMA) # type: ignore[assignment]
  52. specification = referencing.jsonschema.specification_with(uri)
  53. registry = Registry().with_contents(
  54. remotes_in(root=self._root / "remotes", name=name, uri=uri),
  55. default_specification=specification,
  56. )
  57. return Version(
  58. name=name,
  59. path=self._root / "tests" / name,
  60. remotes=registry,
  61. )
  62. @frozen
  63. class Version:
  64. _path: Path
  65. _remotes: referencing.jsonschema.SchemaRegistry
  66. name: str
  67. def benchmark(self, **kwargs): # pragma: no cover
  68. for case in self.cases():
  69. case.benchmark(**kwargs)
  70. def cases(self) -> Iterable[_Case]:
  71. return self._cases_in(paths=self._path.glob("*.json"))
  72. def format_cases(self) -> Iterable[_Case]:
  73. return self._cases_in(paths=self._path.glob("optional/format/*.json"))
  74. def optional_cases_of(self, name: str) -> Iterable[_Case]:
  75. return self._cases_in(paths=[self._path / "optional" / f"{name}.json"])
  76. def to_unittest_testcase(self, *groups, **kwargs):
  77. name = kwargs.pop("name", "Test" + self.name.title().replace("-", ""))
  78. methods = {
  79. method.__name__: method
  80. for method in (
  81. test.to_unittest_method(**kwargs)
  82. for group in groups
  83. for case in group
  84. for test in case.tests
  85. )
  86. }
  87. cls = type(name, (unittest.TestCase,), methods)
  88. # We're doing crazy things, so if they go wrong, like a function
  89. # behaving differently on some other interpreter, just make them
  90. # not happen.
  91. with suppress(Exception):
  92. cls.__module__ = _someone_save_us_the_module_of_the_caller()
  93. return cls
  94. def _cases_in(self, paths: Iterable[Path]) -> Iterable[_Case]:
  95. for path in paths:
  96. for case in json.loads(path.read_text(encoding="utf-8")):
  97. yield _Case.from_dict(
  98. case,
  99. version=self,
  100. subject=path.stem,
  101. remotes=self._remotes,
  102. )
  103. @frozen
  104. class _Case:
  105. version: Version
  106. subject: str
  107. description: str
  108. schema: Mapping[str, Any] | bool
  109. tests: list[_Test]
  110. comment: str | None = None
  111. specification: Sequence[dict[str, str]] = ()
  112. @classmethod
  113. def from_dict(cls, data, remotes, **kwargs):
  114. data.update(kwargs)
  115. tests = [
  116. _Test(
  117. version=data["version"],
  118. subject=data["subject"],
  119. case_description=data["description"],
  120. schema=data["schema"],
  121. remotes=remotes,
  122. **test,
  123. ) for test in data.pop("tests")
  124. ]
  125. return cls(tests=tests, **data)
  126. def benchmark(self, runner: pyperf.Runner, **kwargs): # pragma: no cover
  127. for test in self.tests:
  128. runner.bench_func(
  129. test.fully_qualified_name,
  130. partial(test.validate_ignoring_errors, **kwargs),
  131. )
  132. def remotes_in(
  133. root: Path,
  134. name: str,
  135. uri: str,
  136. ) -> Iterable[tuple[str, Schema]]:
  137. # This messy logic is because the test suite is terrible at indicating
  138. # what remotes are needed for what drafts, and mixes in schemas which
  139. # have no $schema and which are invalid under earlier versions, in with
  140. # other schemas which are needed for tests.
  141. for each in root.rglob("*.json"):
  142. schema = json.loads(each.read_text())
  143. relative = str(each.relative_to(root)).replace("\\", "/")
  144. if (
  145. ( # invalid boolean schema
  146. name in {"draft3", "draft4"}
  147. and each.stem == "tree"
  148. ) or
  149. ( # draft<NotThisDialect>/*.json
  150. "$schema" not in schema
  151. and relative.startswith("draft")
  152. and not relative.startswith(name)
  153. )
  154. ):
  155. continue
  156. yield f"{MAGIC_REMOTE_URL}/{relative}", schema
  157. @frozen(repr=False)
  158. class _Test:
  159. version: Version
  160. subject: str
  161. case_description: str
  162. description: str
  163. data: Any
  164. schema: Mapping[str, Any] | bool
  165. valid: bool
  166. _remotes: referencing.jsonschema.SchemaRegistry
  167. comment: str | None = None
  168. def __repr__(self): # pragma: no cover
  169. return f"<Test {self.fully_qualified_name}>"
  170. @property
  171. def fully_qualified_name(self): # pragma: no cover
  172. return " > ".join( # noqa: FLY002
  173. [
  174. self.version.name,
  175. self.subject,
  176. self.case_description,
  177. self.description,
  178. ],
  179. )
  180. def to_unittest_method(self, skip=lambda test: None, **kwargs):
  181. if self.valid:
  182. def fn(this):
  183. self.validate(**kwargs)
  184. else:
  185. def fn(this):
  186. with this.assertRaises(jsonschema.ValidationError):
  187. self.validate(**kwargs)
  188. fn.__name__ = "_".join(
  189. [
  190. "test",
  191. _DELIMITERS.sub("_", self.subject),
  192. _DELIMITERS.sub("_", self.case_description),
  193. _DELIMITERS.sub("_", self.description),
  194. ],
  195. )
  196. reason = skip(self)
  197. if reason is None or os.environ.get("JSON_SCHEMA_DEBUG", "0") != "0":
  198. return fn
  199. elif os.environ.get("JSON_SCHEMA_EXPECTED_FAILURES", "0") != "0": # pragma: no cover # noqa: E501
  200. return unittest.expectedFailure(fn)
  201. else:
  202. return unittest.skip(reason)(fn)
  203. def validate(self, Validator, **kwargs):
  204. Validator.check_schema(self.schema)
  205. validator = Validator(
  206. schema=self.schema,
  207. registry=self._remotes,
  208. **kwargs,
  209. )
  210. if os.environ.get("JSON_SCHEMA_DEBUG", "0") != "0": # pragma: no cover
  211. breakpoint() # noqa: T100
  212. validator.validate(instance=self.data)
  213. def validate_ignoring_errors(self, Validator): # pragma: no cover
  214. with suppress(jsonschema.ValidationError):
  215. self.validate(Validator=Validator)
  216. def _someone_save_us_the_module_of_the_caller():
  217. """
  218. The FQON of the module 2nd stack frames up from here.
  219. This is intended to allow us to dynamically return test case classes that
  220. are indistinguishable from being defined in the module that wants them.
  221. Otherwise, trial will mis-print the FQON, and copy pasting it won't re-run
  222. the class that really is running.
  223. Save us all, this is all so so so so so terrible.
  224. """
  225. return sys._getframe(2).f_globals["__name__"]