test_utils.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. # Copyright (c) Jupyter Development Team.
  2. # Distributed under the terms of the Modified BSD License.
  3. """Testing utils."""
  4. from __future__ import annotations
  5. import json
  6. import os
  7. import sys
  8. from http.cookies import SimpleCookie
  9. from pathlib import Path
  10. from urllib.parse import parse_qs, urlparse
  11. import tornado.httpclient
  12. import tornado.web
  13. from openapi_core import V30RequestValidator, V30ResponseValidator
  14. from openapi_core.spec.paths import Spec
  15. from openapi_core.validation.request.datatypes import RequestParameters
  16. from tornado.httpclient import HTTPRequest, HTTPResponse
  17. from werkzeug.datastructures import Headers, ImmutableMultiDict
  18. from jupyterlab_server.spec import get_openapi_spec
  # Absolute path of the directory that contains this module.
  HERE = Path(os.path.dirname(__file__)).resolve()

  # Value of the "comment" entry of the unicode-extension plugin, read from the
  # app-settings overrides fixture that lives next to this module.
  with (HERE / "test_data" / "app-settings" / "overrides.json").open(encoding="utf-8") as fid:
      big_unicode_string = json.load(fid)["@jupyterlab/unicode-extension:plugin"]["comment"]
  class TornadoOpenAPIRequest:
      """
      Converts a tornado request to an OpenAPI one
      """

      def __init__(self, request: HTTPRequest, spec: Spec):
          """Initialize the request.

          Stores ``request`` and ``spec`` and builds ``self.parameters`` from
          the request's query string, headers and cookies.

          Raises ``RuntimeError`` when ``request.url`` is ``None``.
          """
          self.request = request
          self.spec = spec
          if request.url is None:
              msg = "Request URL is missing"  # type:ignore[unreachable]
              raise RuntimeError(msg)
          # Parse the URL once; the result is reused for the query string below.
          self._url_parsed = urlparse(request.url)

          # NOTE(review): cookies are read from the "Set-Cookie" header of the
          # request rather than "Cookie" - confirm this is intended.
          cookie: SimpleCookie = SimpleCookie()
          cookie.load(request.headers.get("Set-Cookie", ""))
          cookies = {key: morsel.value for key, morsel in cookie.items()}

          # gets deduced by path finder against spec
          path: dict = {}

          self.parameters = RequestParameters(
              query=ImmutableMultiDict(parse_qs(self._url_parsed.query)),
              header=dict(request.headers),
              cookie=ImmutableMultiDict(cookies),
              path=path,
          )

      @property
      def content_type(self) -> str:
          """Content type of the request; always ``application/json``."""
          return "application/json"

      @property
      def host_url(self) -> str:
          """Part of the request URL that precedes the first ``/lab``.

          Raises ``ValueError`` when the URL does not contain ``/lab``.
          """
          url = self.request.url
          return url[: url.index("/lab")]

      @property
      def path(self) -> str:
          """Request path reduced to the first matching path of the spec.

          A spec path without a parameter matches when the request path ends
          with exactly that path.  A spec path with a parameter matches on the
          text before its first ``{``; the parameter value is then replaced by
          the literal ``foo``.

          Raises ``ValueError`` when no spec path matches.
          """
          # extract the best matching url
          # work around lack of support for path parameters which can contain slashes
          # https://github.com/OAI/OpenAPI-Specification/issues/892
          url = None
          o = urlparse(self.request.url)
          for path_ in self.spec["paths"]:
              if url:
                  # The first match wins, so there is nothing left to scan.
                  break  # type:ignore[unreachable]
              has_arg = "{" in path_
              path = path_[: path_.index("{")] if has_arg else path_
              if path in o.path:
                  u = o.path[o.path.index(path) :]
                  if not has_arg and len(u) == len(path):
                      url = u
                  if has_arg and not u.endswith("/"):
                      # Substitute a placeholder for the (possibly slash-containing) argument.
                      url = u[: len(path)] + r"foo"
          if url is None:
              msg = f"Could not find matching pattern for {o.path}"
              raise ValueError(msg)
          return url

      @property
      def method(self) -> str:
          """Lower-cased HTTP method, or ``""`` when the request has none."""
          method = self.request.method
          return method.lower() if method else ""

      @property
      def body(self) -> bytes | None:
          """Raw request body, or ``None`` when the request has no body.

          Raises ``AssertionError`` when the body is neither ``None`` nor bytes.
          """
          if self.request.body is None:
              return None  # type:ignore[unreachable]
          if not isinstance(self.request.body, bytes):
              msg = "Request body is invalid"  # type:ignore[unreachable]
              raise AssertionError(msg)
          return self.request.body

      @property
      def mimetype(self) -> str:
          """``Content-Type`` header, else ``Accept`` header, else ``application/json``."""
          # Order matters because all tornado requests
          # include Accept */* which does not necessarily match the content type
          request = self.request
          return (
              request.headers.get("Content-Type")
              or request.headers.get("Accept")
              or "application/json"
          )
  class TornadoOpenAPIResponse:
      """Wrapper exposing a tornado response through OpenAPI-style properties."""

      def __init__(self, response: HTTPResponse):
          """Keep a reference to the wrapped tornado response."""
          self.response = response

      @property
      def data(self) -> bytes | None:
          """Response body; raises ``AssertionError`` when it is not bytes."""
          if isinstance(self.response.body, bytes):
              return self.response.body
          msg = "Response body is invalid"  # type:ignore[unreachable]
          raise AssertionError(msg)

      @property
      def status_code(self) -> int:
          """Response code converted to ``int``."""
          code = self.response.code
          return int(code)

      @property
      def content_type(self) -> str:
          """Content type of the response; always ``application/json``."""
          return "application/json"

      @property
      def mimetype(self) -> str:
          """``Content-Type`` header as a string, ``application/json`` if absent."""
          declared = self.response.headers.get("Content-Type", "application/json")
          return str(declared)

      @property
      def headers(self) -> Headers:
          """Response headers copied into a werkzeug ``Headers`` object."""
          plain = dict(self.response.headers)
          return Headers(plain)
  def validate_request(response: HTTPResponse) -> None:
      """Validate a response and the request that produced it against the OpenAPI spec.

      The request attached to ``response`` is validated first, then the
      response itself.  Nothing is caught here, so any exception raised by the
      validators propagates to the caller.
      """
      spec = get_openapi_spec()

      # Request side: wrap the tornado request and check it against the spec.
      api_request = TornadoOpenAPIRequest(response.request, spec)
      request_validator = V30RequestValidator(spec)
      request_validator.validate(api_request)

      # Response side: validated in the context of the request above.
      api_response = TornadoOpenAPIResponse(response)
      response_validator = V30ResponseValidator(spec)
      response_validator.validate(api_request, api_response)
  def maybe_patch_ioloop() -> None:
      """Fall back to the selector event loop policy on affected Windows setups.

      Acts only on Windows with tornado older than 6.1 and Python 3.8 or
      newer, and only when the active asyncio policy is exactly
      ``WindowsProactorEventLoopPolicy``.  Otherwise it does nothing.
      """
      if not sys.platform.startswith("win"):
          return
      if tornado.version_info >= (6, 1) or sys.version_info < (3, 8):
          return

      try:
          from asyncio import WindowsProactorEventLoopPolicy, WindowsSelectorEventLoopPolicy
      except ImportError:
          # not affected
          return

      from asyncio import get_event_loop_policy, set_event_loop_policy

      active_policy = get_event_loop_policy()
      if type(active_policy) is WindowsProactorEventLoopPolicy:
          # WindowsProactorEventLoopPolicy is not compatible with tornado 6
          # fallback to the pre-3.8 default of Selector
          set_event_loop_policy(WindowsSelectorEventLoopPolicy())
  def expected_http_error(
      error: Exception, expected_code: int, expected_message: str | None = None
  ) -> bool:
      """Check that the error matches the expected output error.

      ``error`` must expose the raised exception as ``error.value``
      (presumably a pytest ``ExceptionInfo`` - confirm against callers).

      For a ``tornado.web.HTTPError`` the status code is compared with
      ``expected_code`` and, when ``expected_message`` is not ``None``, the
      message is compared with ``str(e)``.

      For a tornado HTTP client error the code is compared with
      ``expected_code`` and, when ``expected_message`` is non-empty, with the
      ``"message"`` field of the JSON response body.  A client error that
      carries no response cannot match an expected message and yields ``False``.

      Returns ``True`` on a match and ``False`` otherwise, including when the
      exception is of neither kind.
      """
      e = error.value  # type:ignore[attr-defined]
      if isinstance(e, tornado.web.HTTPError):
          if expected_code != e.status_code:
              return False
          if expected_message is not None and expected_message != str(e):
              return False
          return True
      if isinstance(e, (tornado.httpclient.HTTPClientError, tornado.httpclient.HTTPError)):
          if expected_code != e.code:
              return False
          if expected_message:
              # No HTTP response was received, so there is no body to inspect.
              if e.response is None:
                  return False
              message = json.loads(e.response.body.decode())["message"]
              if expected_message != message:
                  return False
          return True
      return False