_format.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546
  1. from __future__ import annotations
  2. from contextlib import suppress
  3. from datetime import date, datetime
  4. from uuid import UUID
  5. import ipaddress
  6. import re
  7. import typing
  8. import warnings
  9. from jsonschema.exceptions import FormatError
# Signature shared by every format-checking function: it is given the
# instance being validated and returns whether that instance conforms.
_FormatCheckCallable = typing.Callable[[object], bool]
#: A format checker callable.
_F = typing.TypeVar("_F", bound=_FormatCheckCallable)
# Accepted shape of the ``raises`` argument used when registering a checker:
# a single exception class, or a tuple of exception classes (the empty tuple
# is the default used throughout this module).
_RaisesType = type[Exception] | tuple[type[Exception], ...]
# ``YYYY-MM-DD`` shape check used by ``is_date``; ``re.ASCII`` restricts
# ``\d`` to the digits 0-9.
_RE_DATE = re.compile(r"^\d{4}-\d{2}-\d{2}$", re.ASCII)
  15. class FormatChecker:
  16. """
  17. A ``format`` property checker.
  18. JSON Schema does not mandate that the ``format`` property actually do any
  19. validation. If validation is desired however, instances of this class can
  20. be hooked into validators to enable format validation.
  21. `FormatChecker` objects always return ``True`` when asked about
  22. formats that they do not know how to validate.
  23. To add a check for a custom format use the `FormatChecker.checks`
  24. decorator.
  25. Arguments:
  26. formats:
  27. The known formats to validate. This argument can be used to
  28. limit which formats will be used during validation.
  29. """
  30. checkers: dict[
  31. str,
  32. tuple[_FormatCheckCallable, _RaisesType],
  33. ] = {} # noqa: RUF012
  34. def __init__(self, formats: typing.Iterable[str] | None = None):
  35. if formats is None:
  36. formats = self.checkers.keys()
  37. self.checkers = {k: self.checkers[k] for k in formats}
  38. def __repr__(self):
  39. return f"<FormatChecker checkers={sorted(self.checkers)}>"
  40. def checks(
  41. self, format: str, raises: _RaisesType = (),
  42. ) -> typing.Callable[[_F], _F]:
  43. """
  44. Register a decorated function as validating a new format.
  45. Arguments:
  46. format:
  47. The format that the decorated function will check.
  48. raises:
  49. The exception(s) raised by the decorated function when an
  50. invalid instance is found.
  51. The exception object will be accessible as the
  52. `jsonschema.exceptions.ValidationError.cause` attribute of the
  53. resulting validation error.
  54. """
  55. def _checks(func: _F) -> _F:
  56. self.checkers[format] = (func, raises)
  57. return func
  58. return _checks
  59. @classmethod
  60. def cls_checks(
  61. cls, format: str, raises: _RaisesType = (),
  62. ) -> typing.Callable[[_F], _F]:
  63. warnings.warn(
  64. (
  65. "FormatChecker.cls_checks is deprecated. Call "
  66. "FormatChecker.checks on a specific FormatChecker instance "
  67. "instead."
  68. ),
  69. DeprecationWarning,
  70. stacklevel=2,
  71. )
  72. return cls._cls_checks(format=format, raises=raises)
  73. @classmethod
  74. def _cls_checks(
  75. cls, format: str, raises: _RaisesType = (),
  76. ) -> typing.Callable[[_F], _F]:
  77. def _checks(func: _F) -> _F:
  78. cls.checkers[format] = (func, raises)
  79. return func
  80. return _checks
  81. def check(self, instance: object, format: str) -> None:
  82. """
  83. Check whether the instance conforms to the given format.
  84. Arguments:
  85. instance (*any primitive type*, i.e. str, number, bool):
  86. The instance to check
  87. format:
  88. The format that instance should conform to
  89. Raises:
  90. FormatError:
  91. if the instance does not conform to ``format``
  92. """
  93. if format not in self.checkers:
  94. return
  95. func, raises = self.checkers[format]
  96. result, cause = None, None
  97. try:
  98. result = func(instance)
  99. except raises as e:
  100. cause = e
  101. if not result:
  102. raise FormatError(f"{instance!r} is not a {format!r}", cause=cause)
  103. def conforms(self, instance: object, format: str) -> bool:
  104. """
  105. Check whether the instance conforms to the given format.
  106. Arguments:
  107. instance (*any primitive type*, i.e. str, number, bool):
  108. The instance to check
  109. format:
  110. The format that instance should conform to
  111. Returns:
  112. bool: whether it conformed
  113. """
  114. try:
  115. self.check(instance, format)
  116. except FormatError:
  117. return False
  118. else:
  119. return True
  120. draft3_format_checker = FormatChecker()
  121. draft4_format_checker = FormatChecker()
  122. draft6_format_checker = FormatChecker()
  123. draft7_format_checker = FormatChecker()
  124. draft201909_format_checker = FormatChecker()
  125. draft202012_format_checker = FormatChecker()
  126. _draft_checkers: dict[str, FormatChecker] = dict(
  127. draft3=draft3_format_checker,
  128. draft4=draft4_format_checker,
  129. draft6=draft6_format_checker,
  130. draft7=draft7_format_checker,
  131. draft201909=draft201909_format_checker,
  132. draft202012=draft202012_format_checker,
  133. )
  134. def _checks_drafts(
  135. name=None,
  136. draft3=None,
  137. draft4=None,
  138. draft6=None,
  139. draft7=None,
  140. draft201909=None,
  141. draft202012=None,
  142. raises=(),
  143. ) -> typing.Callable[[_F], _F]:
  144. draft3 = draft3 or name
  145. draft4 = draft4 or name
  146. draft6 = draft6 or name
  147. draft7 = draft7 or name
  148. draft201909 = draft201909 or name
  149. draft202012 = draft202012 or name
  150. def wrap(func: _F) -> _F:
  151. if draft3:
  152. func = _draft_checkers["draft3"].checks(draft3, raises)(func)
  153. if draft4:
  154. func = _draft_checkers["draft4"].checks(draft4, raises)(func)
  155. if draft6:
  156. func = _draft_checkers["draft6"].checks(draft6, raises)(func)
  157. if draft7:
  158. func = _draft_checkers["draft7"].checks(draft7, raises)(func)
  159. if draft201909:
  160. func = _draft_checkers["draft201909"].checks(draft201909, raises)(
  161. func,
  162. )
  163. if draft202012:
  164. func = _draft_checkers["draft202012"].checks(draft202012, raises)(
  165. func,
  166. )
  167. # Oy. This is bad global state, but relied upon for now, until
  168. # deprecation. See #519 and test_format_checkers_come_with_defaults
  169. FormatChecker._cls_checks(
  170. draft202012 or draft201909 or draft7 or draft6 or draft4 or draft3,
  171. raises,
  172. )(func)
  173. return func
  174. return wrap
  175. @_checks_drafts(name="idn-email")
  176. @_checks_drafts(name="email")
  177. def is_email(instance: object) -> bool:
  178. if not isinstance(instance, str):
  179. return True
  180. return "@" in instance
  181. @_checks_drafts(
  182. draft3="ip-address",
  183. draft4="ipv4",
  184. draft6="ipv4",
  185. draft7="ipv4",
  186. draft201909="ipv4",
  187. draft202012="ipv4",
  188. raises=ipaddress.AddressValueError,
  189. )
  190. def is_ipv4(instance: object) -> bool:
  191. if not isinstance(instance, str):
  192. return True
  193. return bool(ipaddress.IPv4Address(instance))
  194. @_checks_drafts(name="ipv6", raises=ipaddress.AddressValueError)
  195. def is_ipv6(instance: object) -> bool:
  196. if not isinstance(instance, str):
  197. return True
  198. address = ipaddress.IPv6Address(instance)
  199. return not getattr(address, "scope_id", "")
  200. with suppress(ImportError):
  201. from fqdn import FQDN
  202. @_checks_drafts(
  203. draft3="host-name",
  204. draft4="hostname",
  205. draft6="hostname",
  206. draft7="hostname",
  207. draft201909="hostname",
  208. draft202012="hostname",
  209. # fqdn.FQDN("") raises a ValueError due to a bug
  210. # however, it's not clear when or if that will be fixed, so catch it
  211. # here for now
  212. raises=ValueError,
  213. )
  214. def is_host_name(instance: object) -> bool:
  215. if not isinstance(instance, str):
  216. return True
  217. return FQDN(instance, min_labels=1).is_valid
  218. with suppress(ImportError):
  219. # The built-in `idna` codec only implements RFC 3890, so we go elsewhere.
  220. import idna
  221. @_checks_drafts(
  222. draft7="idn-hostname",
  223. draft201909="idn-hostname",
  224. draft202012="idn-hostname",
  225. raises=(idna.IDNAError, UnicodeError),
  226. )
  227. def is_idn_host_name(instance: object) -> bool:
  228. if not isinstance(instance, str):
  229. return True
  230. idna.encode(instance)
  231. return True
  232. try:
  233. import rfc3987
  234. except ImportError:
  235. with suppress(ImportError):
  236. from rfc3986_validator import validate_rfc3986
  237. @_checks_drafts(name="uri")
  238. def is_uri(instance: object) -> bool:
  239. if not isinstance(instance, str):
  240. return True
  241. return validate_rfc3986(instance, rule="URI")
  242. @_checks_drafts(
  243. draft6="uri-reference",
  244. draft7="uri-reference",
  245. draft201909="uri-reference",
  246. draft202012="uri-reference",
  247. raises=ValueError,
  248. )
  249. def is_uri_reference(instance: object) -> bool:
  250. if not isinstance(instance, str):
  251. return True
  252. return validate_rfc3986(instance, rule="URI_reference")
  253. with suppress(ImportError):
  254. from rfc3987_syntax import is_valid_syntax as _rfc3987_is_valid_syntax
  255. @_checks_drafts(
  256. draft7="iri",
  257. draft201909="iri",
  258. draft202012="iri",
  259. raises=ValueError,
  260. )
  261. def is_iri(instance: object) -> bool:
  262. if not isinstance(instance, str):
  263. return True
  264. return _rfc3987_is_valid_syntax("iri", instance)
  265. @_checks_drafts(
  266. draft7="iri-reference",
  267. draft201909="iri-reference",
  268. draft202012="iri-reference",
  269. raises=ValueError,
  270. )
  271. def is_iri_reference(instance: object) -> bool:
  272. if not isinstance(instance, str):
  273. return True
  274. return _rfc3987_is_valid_syntax("iri_reference", instance)
  275. else:
  276. @_checks_drafts(
  277. draft7="iri",
  278. draft201909="iri",
  279. draft202012="iri",
  280. raises=ValueError,
  281. )
  282. def is_iri(instance: object) -> bool:
  283. if not isinstance(instance, str):
  284. return True
  285. return rfc3987.parse(instance, rule="IRI")
  286. @_checks_drafts(
  287. draft7="iri-reference",
  288. draft201909="iri-reference",
  289. draft202012="iri-reference",
  290. raises=ValueError,
  291. )
  292. def is_iri_reference(instance: object) -> bool:
  293. if not isinstance(instance, str):
  294. return True
  295. return rfc3987.parse(instance, rule="IRI_reference")
  296. @_checks_drafts(name="uri", raises=ValueError)
  297. def is_uri(instance: object) -> bool:
  298. if not isinstance(instance, str):
  299. return True
  300. return rfc3987.parse(instance, rule="URI")
  301. @_checks_drafts(
  302. draft6="uri-reference",
  303. draft7="uri-reference",
  304. draft201909="uri-reference",
  305. draft202012="uri-reference",
  306. raises=ValueError,
  307. )
  308. def is_uri_reference(instance: object) -> bool:
  309. if not isinstance(instance, str):
  310. return True
  311. return rfc3987.parse(instance, rule="URI_reference")
  312. with suppress(ImportError):
  313. from rfc3339_validator import validate_rfc3339
  314. @_checks_drafts(name="date-time")
  315. def is_datetime(instance: object) -> bool:
  316. if not isinstance(instance, str):
  317. return True
  318. return validate_rfc3339(instance.upper())
  319. @_checks_drafts(
  320. draft7="time",
  321. draft201909="time",
  322. draft202012="time",
  323. )
  324. def is_time(instance: object) -> bool:
  325. if not isinstance(instance, str):
  326. return True
  327. return is_datetime("1970-01-01T" + instance)
  328. @_checks_drafts(name="regex", raises=re.error)
  329. def is_regex(instance: object) -> bool:
  330. if not isinstance(instance, str):
  331. return True
  332. return bool(re.compile(instance))
  333. @_checks_drafts(
  334. draft3="date",
  335. draft7="date",
  336. draft201909="date",
  337. draft202012="date",
  338. raises=ValueError,
  339. )
  340. def is_date(instance: object) -> bool:
  341. if not isinstance(instance, str):
  342. return True
  343. return bool(_RE_DATE.fullmatch(instance) and date.fromisoformat(instance))
  344. @_checks_drafts(draft3="time", raises=ValueError)
  345. def is_draft3_time(instance: object) -> bool:
  346. if not isinstance(instance, str):
  347. return True
  348. return bool(datetime.strptime(instance, "%H:%M:%S")) # noqa: DTZ007
  349. with suppress(ImportError):
  350. import webcolors
  351. @_checks_drafts(draft3="color", raises=(ValueError, TypeError))
  352. def is_css21_color(instance: object) -> bool:
  353. if isinstance(instance, str):
  354. try:
  355. webcolors.name_to_hex(instance)
  356. except ValueError:
  357. webcolors.normalize_hex(instance.lower())
  358. return True
  359. with suppress(ImportError):
  360. import jsonpointer
  361. @_checks_drafts(
  362. draft6="json-pointer",
  363. draft7="json-pointer",
  364. draft201909="json-pointer",
  365. draft202012="json-pointer",
  366. raises=jsonpointer.JsonPointerException,
  367. )
  368. def is_json_pointer(instance: object) -> bool:
  369. if not isinstance(instance, str):
  370. return True
  371. return bool(jsonpointer.JsonPointer(instance))
  372. # TODO: I don't want to maintain this, so it
  373. # needs to go either into jsonpointer (pending
  374. # https://github.com/stefankoegl/python-json-pointer/issues/34) or
  375. # into a new external library.
  376. @_checks_drafts(
  377. draft7="relative-json-pointer",
  378. draft201909="relative-json-pointer",
  379. draft202012="relative-json-pointer",
  380. raises=jsonpointer.JsonPointerException,
  381. )
  382. def is_relative_json_pointer(instance: object) -> bool:
  383. # Definition taken from:
  384. # https://tools.ietf.org/html/draft-handrews-relative-json-pointer-01#section-3
  385. if not isinstance(instance, str):
  386. return True
  387. if not instance:
  388. return False
  389. non_negative_integer, rest = [], ""
  390. for i, character in enumerate(instance):
  391. if character.isdigit():
  392. # digits with a leading "0" are not allowed
  393. if i > 0 and int(instance[i - 1]) == 0:
  394. return False
  395. non_negative_integer.append(character)
  396. continue
  397. if not non_negative_integer:
  398. return False
  399. rest = instance[i:]
  400. break
  401. return (rest == "#") or bool(jsonpointer.JsonPointer(rest))
  402. with suppress(ImportError):
  403. import uri_template
  404. @_checks_drafts(
  405. draft6="uri-template",
  406. draft7="uri-template",
  407. draft201909="uri-template",
  408. draft202012="uri-template",
  409. )
  410. def is_uri_template(instance: object) -> bool:
  411. if not isinstance(instance, str):
  412. return True
  413. return uri_template.validate(instance)
  414. with suppress(ImportError):
  415. import isoduration
  416. @_checks_drafts(
  417. draft201909="duration",
  418. draft202012="duration",
  419. raises=isoduration.DurationParsingException,
  420. )
  421. def is_duration(instance: object) -> bool:
  422. if not isinstance(instance, str):
  423. return True
  424. isoduration.parse_duration(instance)
  425. # FIXME: See bolsote/isoduration#25 and bolsote/isoduration#21
  426. return instance.endswith(tuple("DMYWHMS"))
  427. @_checks_drafts(
  428. draft201909="uuid",
  429. draft202012="uuid",
  430. raises=ValueError,
  431. )
  432. def is_uuid(instance: object) -> bool:
  433. if not isinstance(instance, str):
  434. return True
  435. UUID(instance)
  436. return all(instance[position] == "-" for position in (8, 13, 18, 23))