ref_resolver.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. """
  2. JSON Schema URI resolution scopes and dereferencing
  3. https://tools.ietf.org/id/draft-zyp-json-schema-04.html#rfc.section.7
  4. Code adapted from https://github.com/Julian/jsonschema
  5. """
  6. import contextlib
  7. import json
  8. import re
  9. from urllib import parse as urlparse
  10. from urllib.parse import unquote
  11. from .exceptions import JsonSchemaDefinitionException
  12. def get_id(schema):
  13. """
  14. Originally ID was `id` and since v7 it's `$id`.
  15. """
  16. return schema.get('$id', schema.get('id', ''))
  17. def resolve_path(schema, fragment):
  18. """
  19. Return definition from path.
  20. Path is unescaped according https://tools.ietf.org/html/rfc6901
  21. """
  22. fragment = fragment.lstrip('/')
  23. parts = unquote(fragment).split('/') if fragment else []
  24. for part in parts:
  25. part = part.replace('~1', '/').replace('~0', '~')
  26. if isinstance(schema, list):
  27. schema = schema[int(part)]
  28. elif part in schema:
  29. schema = schema[part]
  30. else:
  31. raise JsonSchemaDefinitionException('Unresolvable ref: {}'.format(part))
  32. return schema
  33. def normalize(uri):
  34. return urlparse.urlsplit(uri).geturl()
  35. def resolve_remote(uri, handlers):
  36. """
  37. Resolve a remote ``uri``.
  38. .. note::
  39. urllib library is used to fetch requests from the remote ``uri``
  40. if handlers does notdefine otherwise.
  41. """
  42. scheme = urlparse.urlsplit(uri).scheme
  43. if scheme in handlers:
  44. result = handlers[scheme](uri)
  45. else:
  46. from urllib.request import urlopen
  47. req = urlopen(uri)
  48. encoding = req.info().get_content_charset() or 'utf-8'
  49. try:
  50. result = json.loads(req.read().decode(encoding),)
  51. except ValueError as exc:
  52. raise JsonSchemaDefinitionException('{} failed to decode: {}'.format(uri, exc))
  53. finally:
  54. req.close()
  55. return result
  56. class RefResolver:
  57. """
  58. Resolve JSON References.
  59. """
  60. # pylint: disable=dangerous-default-value,too-many-arguments
  61. def __init__(self, base_uri, schema, store={}, cache=True, handlers={}):
  62. """
  63. `base_uri` is URI of the referring document from the `schema`.
  64. `store` is an dictionary that will be used to cache the fetched schemas
  65. (if `cache=True`).
  66. Please notice that you can have caching problems when compiling schemas
  67. with colliding `$ref`. To force overwriting use `cache=False` or
  68. explicitly pass the `store` argument (with a brand new dictionary)
  69. """
  70. self.base_uri = base_uri
  71. self.resolution_scope = base_uri
  72. self.schema = schema
  73. self.store = store
  74. self.cache = cache
  75. self.handlers = handlers
  76. self.walk(schema)
  77. @classmethod
  78. def from_schema(cls, schema, handlers={}, **kwargs):
  79. """
  80. Construct a resolver from a JSON schema object.
  81. """
  82. return cls(
  83. get_id(schema) if isinstance(schema, dict) else '',
  84. schema,
  85. handlers=handlers,
  86. **kwargs
  87. )
  88. @contextlib.contextmanager
  89. def in_scope(self, scope: str):
  90. """
  91. Context manager to handle current scope.
  92. """
  93. old_scope = self.resolution_scope
  94. self.resolution_scope = urlparse.urljoin(old_scope, scope)
  95. try:
  96. yield
  97. finally:
  98. self.resolution_scope = old_scope
  99. @contextlib.contextmanager
  100. def resolving(self, ref: str):
  101. """
  102. Context manager which resolves a JSON ``ref`` and enters the
  103. resolution scope of this ref.
  104. """
  105. new_uri = urlparse.urljoin(self.resolution_scope, ref)
  106. uri, fragment = urlparse.urldefrag(new_uri)
  107. if uri and normalize(uri) in self.store:
  108. schema = self.store[normalize(uri)]
  109. elif not uri or uri == self.base_uri:
  110. schema = self.schema
  111. else:
  112. schema = resolve_remote(uri, self.handlers)
  113. if self.cache:
  114. self.store[normalize(uri)] = schema
  115. old_base_uri, old_schema = self.base_uri, self.schema
  116. self.base_uri, self.schema = uri, schema
  117. try:
  118. with self.in_scope(uri):
  119. yield resolve_path(schema, fragment)
  120. finally:
  121. self.base_uri, self.schema = old_base_uri, old_schema
  122. def get_uri(self):
  123. return normalize(self.resolution_scope)
  124. def get_scope_name(self):
  125. """
  126. Get current scope and return it as a valid function name.
  127. """
  128. name = 'validate_' + unquote(self.resolution_scope).replace('~1', '_').replace('~0', '_').replace('"', '')
  129. name = re.sub(r'($[^a-zA-Z]|[^a-zA-Z0-9])', '_', name)
  130. name = name.lower().rstrip('_')
  131. return name
  132. def walk(self, node: dict):
  133. """
  134. Walk thru schema and dereferencing ``id`` and ``$ref`` instances
  135. """
  136. if isinstance(node, bool):
  137. pass
  138. elif '$ref' in node and isinstance(node['$ref'], str):
  139. ref = node['$ref']
  140. node['$ref'] = urlparse.urljoin(self.resolution_scope, ref)
  141. elif ('$id' in node or 'id' in node) and isinstance(get_id(node), str):
  142. with self.in_scope(get_id(node)):
  143. self.store[normalize(self.resolution_scope)] = node
  144. for _, item in node.items():
  145. if isinstance(item, dict):
  146. self.walk(item)
  147. else:
  148. for _, item in node.items():
  149. if isinstance(item, dict):
  150. self.walk(item)