curl_httpclient_test.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. from hashlib import md5
  2. import unittest
  3. from tornado.escape import utf8
  4. from tornado.testing import AsyncHTTPTestCase
  5. from tornado.test import httpclient_test
  6. from tornado.web import Application, RequestHandler
  7. try:
  8. import pycurl
  9. except ImportError:
  10. pycurl = None # type: ignore
  11. if pycurl is not None:
  12. from tornado.curl_httpclient import CurlAsyncHTTPClient
  @unittest.skipIf(pycurl is None, "pycurl module not present")
  class CurlHTTPClientCommonTestCase(httpclient_test.HTTPClientCommonTestCase):
      """Run ``HTTPClientCommonTestCase`` with a ``CurlAsyncHTTPClient``.

      The whole class is skipped when ``pycurl`` could not be imported.
      """

      def get_http_client(self):
          """Return the curl-based client used by the inherited tests.

          The client is created with ``allow_ipv6=False`` in its defaults.
          """
          client = CurlAsyncHTTPClient(defaults=dict(allow_ipv6=False))
          # make sure AsyncHTTPClient magic doesn't give us the wrong class;
          # assertIsInstance reports the actual type when the check fails.
          self.assertIsInstance(client, CurlAsyncHTTPClient)
          return client
  class DigestAuthHandler(RequestHandler):
      """Test handler that challenges for, then checks, HTTP Digest credentials.

      The accepted ``username`` and ``password`` are supplied through
      ``initialize``. The realm, opaque and nonce values are fixed strings.
      """

      def initialize(self, username, password):
          """Remember the credentials this handler accepts."""
          self.username, self.password = username, password

      def get(self):
          """Send a 401 Digest challenge, or validate the supplied digest.

          Without an ``Authorization`` header the response is a 401 carrying a
          ``WWW-Authenticate`` challenge. Otherwise the header is parsed, its
          fields are asserted against the expected values, and the body is
          ``ok`` or ``fail`` depending on whether the digest matches.
          """
          realm = "test"
          opaque = "asdf"
          # Real implementations would use a random nonce.
          nonce = "1234"

          authorization = self.request.headers.get("Authorization", None)
          if authorization is None:
              # No credentials were sent: issue the challenge and stop here.
              self.set_status(401)
              self.set_header(
                  "WWW-Authenticate",
                  f'Digest realm="{realm}", nonce="{nonce}", opaque="{opaque}"',
              )
              return

          scheme, raw_fields = authorization.split(" ", 1)
          assert scheme == "Digest"

          # Parse the comma-separated key=value pairs, dropping enclosing quotes.
          fields = {}
          for item in raw_fields.split(","):
              name, value = item.strip().split("=", 1)
              if value[0] == value[-1] == '"':
                  value = value[1:-1]
              fields[name] = value

          # Every echoed field must match what this handler expects.
          expected_fields = (
              ("realm", realm),
              ("opaque", opaque),
              ("nonce", nonce),
              ("username", self.username),
              ("uri", self.request.path),
          )
          for name, wanted in expected_fields:
              assert fields[name] == wanted

          def md5_hex(text):
              # MD5 hex digest of the UTF-8 encoded text.
              return md5(utf8(text)).hexdigest()

          ha1 = md5_hex(f"{self.username}:{realm}:{self.password}")
          ha2 = md5_hex(f"{self.request.method}:{self.request.path}")
          computed = md5_hex(f"{ha1}:{nonce}:{ha2}")
          self.write("ok" if computed == fields["response"] else "fail")
  class CustomReasonHandler(RequestHandler):
      """Test handler that answers GET with status 200 and a custom reason."""

      def get(self):
          """Set status 200 with the reason phrase ``Custom reason``."""
          reason = "Custom reason"
          self.set_status(200, reason)
  class CustomFailReasonHandler(RequestHandler):
      """Test handler that answers GET with status 400 and a custom reason."""

      def get(self):
          """Set status 400 with the reason phrase ``Custom reason``."""
          reason = "Custom reason"
          self.set_status(400, reason)
  @unittest.skipIf(pycurl is None, "pycurl module not present")
  class CurlHTTPClientTestCase(AsyncHTTPTestCase):
      """Curl client tests covering digest auth and custom reason phrases.

      The whole class is skipped when ``pycurl`` could not be imported.
      """

      def setUp(self):
          """Run the base set-up, then install a client from ``create_client``."""
          super().setUp()
          self.http_client = self.create_client()

      def get_app(self):
          """Build the application with the digest and custom-reason routes."""
          routes = [
              ("/digest", DigestAuthHandler, {"username": "foo", "password": "bar"}),
              (
                  "/digest_non_ascii",
                  DigestAuthHandler,
                  {"username": "foo", "password": "barユ£"},
              ),
              ("/custom_reason", CustomReasonHandler),
              ("/custom_fail_reason", CustomFailReasonHandler),
          ]
          return Application(routes)

      def create_client(self, **kwargs):
          """Return a ``CurlAsyncHTTPClient`` created with ``force_instance=True``.

          ``allow_ipv6`` is disabled in the client defaults; any extra keyword
          arguments are forwarded to the constructor.
          """
          return CurlAsyncHTTPClient(
              force_instance=True, defaults={"allow_ipv6": False}, **kwargs
          )

      def test_digest_auth(self):
          """Digest auth with ASCII credentials yields an ``ok`` body."""
          credentials = {
              "auth_mode": "digest",
              "auth_username": "foo",
              "auth_password": "bar",
          }
          response = self.fetch("/digest", **credentials)
          self.assertEqual(response.body, b"ok")

      def test_custom_reason(self):
          """The custom reason phrase of a 200 response is exposed."""
          reason = self.fetch("/custom_reason").reason
          self.assertEqual(reason, "Custom reason")

      def test_fail_custom_reason(self):
          """The custom reason phrase appears in the error of a 400 response."""
          response = self.fetch("/custom_fail_reason")
          message = str(response.error)
          self.assertEqual(message, "HTTP 400: Custom reason")

      def test_digest_auth_non_ascii(self):
          """Digest auth with a non-ASCII password yields an ``ok`` body."""
          credentials = {
              "auth_mode": "digest",
              "auth_username": "foo",
              "auth_password": "barユ£",
          }
          response = self.fetch("/digest_non_ascii", **credentials)
          self.assertEqual(response.body, b"ok")
  103. self.assertEqual(response.body, b"ok")