simple_httpclient.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
  1. from tornado.escape import _unicode
  2. from tornado import gen, version
  3. from tornado.httpclient import (
  4. HTTPResponse,
  5. HTTPError,
  6. AsyncHTTPClient,
  7. main,
  8. _RequestProxy,
  9. HTTPRequest,
  10. )
  11. from tornado import httputil
  12. from tornado.http1connection import HTTP1Connection, HTTP1ConnectionParameters
  13. from tornado.ioloop import IOLoop
  14. from tornado.iostream import StreamClosedError, IOStream
  15. from tornado.netutil import (
  16. Resolver,
  17. OverrideResolver,
  18. _client_ssl_defaults,
  19. is_valid_ip,
  20. )
  21. from tornado.log import gen_log
  22. from tornado.tcpclient import TCPClient
  23. import base64
  24. import collections
  25. import copy
  26. import functools
  27. import re
  28. import socket
  29. import ssl
  30. import sys
  31. import time
  32. from io import BytesIO
  33. import urllib.parse
  34. from typing import Dict, Any, Callable, Optional, Type, Union
  35. from types import TracebackType
  36. import typing
  37. if typing.TYPE_CHECKING:
  38. from typing import Deque, Tuple, List # noqa: F401
  class HTTPTimeoutError(HTTPError):
      """Timeout error reported by SimpleAsyncHTTPClient.

      For historical reasons, this is a subclass of `.HTTPClientError`
      and simulates a response code of 599.

      .. versionadded:: 5.1
      """

      def __init__(self, message: str) -> None:
          # Timeouts are always reported with the synthetic status code 599.
          super().__init__(599, message=message)

      def __str__(self) -> str:
          # Use a generic label when the stored message is empty.
          if self.message:
              return self.message
          return "Timeout"
  class HTTPStreamClosedError(HTTPError):
      """Error reported by SimpleAsyncHTTPClient when the stream is closed.

      When a more specific exception is available (such as
      `ConnectionResetError`), it may be raised instead of this one.

      For historical reasons, this is a subclass of `.HTTPClientError`
      and simulates a response code of 599.

      .. versionadded:: 5.1
      """

      def __init__(self, message: str) -> None:
          # Closed-stream failures always carry the synthetic status code 599.
          super().__init__(599, message=message)

      def __str__(self) -> str:
          # Use a generic label when the stored message is empty.
          if self.message:
              return self.message
          return "Stream closed"
  class SimpleAsyncHTTPClient(AsyncHTTPClient):
      """Non-blocking HTTP client with no external dependencies.

      This class implements an HTTP 1.1 client on top of Tornado's IOStreams.
      Some features found in the curl-based AsyncHTTPClient are not yet
      supported. In particular, proxies are not supported and connections
      are not reused. The request's ``network_interface`` is only accepted
      as an IPv4/IPv6 address literal (used as the source address of the
      connection); interface names are rejected.

      This implementation supports the following arguments, which can be passed
      to ``configure()`` to control the global singleton, or to the constructor
      when ``force_instance=True``.

      ``max_clients`` is the number of concurrent requests that can be
      in progress; when this limit is reached additional requests will be
      queued. Note that time spent waiting in this queue still counts
      against the ``request_timeout``.

      ``defaults`` is a dict of parameters that will be used as defaults on all
      `.HTTPRequest` objects submitted to this client.

      ``hostname_mapping`` is a dictionary mapping hostnames to IP addresses.
      It can be used to make local DNS changes when modifying system-wide
      settings like ``/etc/hosts`` is not possible or desirable (e.g. in
      unittests). ``resolver`` is similar, but using the `.Resolver` interface
      instead of a simple mapping.

      ``max_buffer_size`` (default 100MB) is the number of bytes
      that can be read into memory at once. ``max_body_size``
      (defaults to ``max_buffer_size``) is the largest response body
      that the client will accept. Without a
      ``streaming_callback``, the smaller of these two limits
      applies; with a ``streaming_callback`` only ``max_body_size``
      does.

      .. versionchanged:: 4.2
         Added the ``max_body_size`` argument.
      """

      def initialize(  # type: ignore
          self,
          max_clients: int = 10,
          hostname_mapping: Optional[Dict[str, str]] = None,
          max_buffer_size: int = 104857600,
          resolver: Optional[Resolver] = None,
          defaults: Optional[Dict[str, Any]] = None,
          max_header_size: Optional[int] = None,
          max_body_size: Optional[int] = None,
      ) -> None:
          """Set up the request bookkeeping, the resolver and the TCP client.

          See the class docstring for the meaning of each argument.
          """
          super().initialize(defaults=defaults)
          self.max_clients = max_clients
          # Requests in submission order, waiting for a free slot.
          self.queue = (
              collections.deque()
          )  # type: Deque[Tuple[object, HTTPRequest, Callable[[HTTPResponse], None]]]
          # Requests currently being handled, keyed by their marker object.
          self.active = (
              {}
          )  # type: Dict[object, Tuple[HTTPRequest, Callable[[HTTPResponse], None]]]
          # Requests not yet started, with their optional queue-timeout handle.
          self.waiting = (
              {}
          )  # type: Dict[object, Tuple[HTTPRequest, Callable[[HTTPResponse], None], object]]
          self.max_buffer_size = max_buffer_size
          self.max_header_size = max_header_size
          self.max_body_size = max_body_size
          # TCPClient could create a Resolver for us, but we have to do it
          # ourselves to support hostname_mapping.
          if resolver:
              self.resolver = resolver
              self.own_resolver = False
          else:
              self.resolver = Resolver()
              self.own_resolver = True
          if hostname_mapping is not None:
              self.resolver = OverrideResolver(
                  resolver=self.resolver, mapping=hostname_mapping
              )
          self.tcp_client = TCPClient(resolver=self.resolver)

      def close(self) -> None:
          """Close the client, its TCP client and the resolver if we created it."""
          super().close()
          if self.own_resolver:
              self.resolver.close()
          self.tcp_client.close()

      def fetch_impl(
          self, request: HTTPRequest, callback: Callable[[HTTPResponse], None]
      ) -> None:
          """Enqueue ``request`` and start it as soon as a slot is available.

          When ``max_clients`` requests are already active, a timeout is armed
          so that the request fails with a 599 response if it is still queued
          when the shorter non-zero value of ``connect_timeout`` and
          ``request_timeout`` elapses.
          """
          key = object()
          self.queue.append((key, request, callback))
          assert request.connect_timeout is not None
          assert request.request_timeout is not None
          timeout_handle = None
          if len(self.active) >= self.max_clients:
              timeout = (
                  min(request.connect_timeout, request.request_timeout)
                  or request.connect_timeout
                  or request.request_timeout
              )  # min but skip zero
              if timeout:
                  timeout_handle = self.io_loop.add_timeout(
                      self.io_loop.time() + timeout,
                      functools.partial(self._on_timeout, key, "in request queue"),
                  )
          # Every queued request gets a ``waiting`` entry, even without a
          # timeout: _process_queue skips queue items that lack one.
          self.waiting[key] = (request, callback, timeout_handle)
          self._process_queue()
          if self.queue:
              # Pass the counts as logging arguments so the message is only
              # formatted when debug logging is enabled.
              gen_log.debug(
                  "max_clients limit reached, request queued. "
                  "%d active, %d queued requests.",
                  len(self.active),
                  len(self.queue),
              )

      def _process_queue(self) -> None:
          """Start queued requests while fewer than ``max_clients`` are active."""
          while self.queue and len(self.active) < self.max_clients:
              key, request, callback = self.queue.popleft()
              if key not in self.waiting:
                  # No longer waiting (its entry was already removed); skip it.
                  continue
              self._remove_timeout(key)
              self.active[key] = (request, callback)
              release_callback = functools.partial(self._release_fetch, key)
              self._handle_request(request, release_callback, callback)

      def _connection_class(self) -> type:
          """Return the class instantiated to handle each request."""
          return _HTTPConnection

      def _handle_request(
          self,
          request: HTTPRequest,
          release_callback: Callable[[], None],
          final_callback: Callable[[HTTPResponse], None],
      ) -> None:
          """Create a connection object for ``request``.

          The object returned by ``_connection_class()`` is constructed with
          this client's limits and TCP client; the instance is not retained.
          """
          self._connection_class()(
              self,
              request,
              release_callback,
              final_callback,
              self.max_buffer_size,
              self.tcp_client,
              self.max_header_size,
              self.max_body_size,
          )

      def _release_fetch(self, key: object) -> None:
          """Free the slot held by ``key`` and start the next queued request."""
          del self.active[key]
          self._process_queue()

      def _remove_timeout(self, key: object) -> None:
          """Drop the waiting entry for ``key``, cancelling its queue timeout."""
          if key in self.waiting:
              request, callback, timeout_handle = self.waiting[key]
              if timeout_handle is not None:
                  self.io_loop.remove_timeout(timeout_handle)
              del self.waiting[key]

      def _on_timeout(self, key: object, info: Optional[str] = None) -> None:
          """Timeout callback of request.

          Construct a timeout HTTPResponse when a timeout occurs.

          :arg object key: A simple object to mark the request.
          :arg string info: More detailed timeout information.
          """
          request, callback, timeout_handle = self.waiting[key]
          self.queue.remove((key, request, callback))

          error_message = f"Timeout {info}" if info else "Timeout"
          timeout_response = HTTPResponse(
              request,
              599,
              error=HTTPTimeoutError(error_message),
              request_time=self.io_loop.time() - request.start_time,
          )
          self.io_loop.add_callback(callback, timeout_response)
          del self.waiting[key]
  class _HTTPConnection(httputil.HTTPMessageDelegate):
      """Handles a single request for `SimpleAsyncHTTPClient`.

      The constructor schedules `run` on the current IOLoop. The outcome
      (a response, or a 599 response carrying the error) is delivered through
      ``final_callback``, and ``release_callback`` is invoked once so the
      owning client can start another request.
      """

      _SUPPORTED_METHODS = {"GET", "HEAD", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"}

      def __init__(
          self,
          client: Optional[SimpleAsyncHTTPClient],
          request: HTTPRequest,
          release_callback: Callable[[], None],
          final_callback: Callable[[HTTPResponse], None],
          max_buffer_size: int,
          tcp_client: TCPClient,
          max_header_size: Optional[int],
          max_body_size: Optional[int],
      ) -> None:
          """Store the request state and schedule `run` on the current IOLoop.

          :arg client: Client used to re-issue the request when following a
              redirect.
          :arg request: The request to perform.
          :arg release_callback: Called once when this connection no longer
              occupies a client slot.
          :arg final_callback: Called with the final `HTTPResponse`.
          :arg max_buffer_size: Passed to ``TCPClient.connect``.
          :arg tcp_client: Used to open the connection.
          :arg max_header_size: Passed to `HTTP1ConnectionParameters`; may be
              ``None``.
          :arg max_body_size: Passed to `HTTP1ConnectionParameters`; may be
              ``None``.
          """
          self.io_loop = IOLoop.current()
          self.start_time = self.io_loop.time()
          self.start_wall_time = time.time()
          self.client = client
          self.request = request
          self.release_callback = release_callback
          self.final_callback = final_callback
          self.max_buffer_size = max_buffer_size
          self.tcp_client = tcp_client
          self.max_header_size = max_header_size
          self.max_body_size = max_body_size
          self.code = None  # type: Optional[int]
          self.headers = None  # type: Optional[httputil.HTTPHeaders]
          self.chunks = []  # type: List[bytes]
          self._decompressor = None
          # Timeout handle returned by IOLoop.add_timeout
          self._timeout = None  # type: object
          self._sockaddr = None
          # Calling f.result() re-raises anything run() did not handle itself.
          IOLoop.current().add_future(
              gen.convert_yielded(self.run()), lambda f: f.result()
          )

      async def run(self) -> None:
          """Connect, validate the request, send it and read the response.

          Any exception raised along the way is passed to `_handle_exception`,
          which reports it through ``final_callback`` as a 599 response; the
          exception is re-raised only when `_handle_exception` returns False.
          """
          try:
              self.parsed = urllib.parse.urlsplit(_unicode(self.request.url))
              if self.parsed.scheme not in ("http", "https"):
                  raise ValueError("Unsupported url scheme: %s" % self.request.url)
              # urlsplit results have hostname and port results, but they
              # didn't support ipv6 literals until python 2.7.
              netloc = self.parsed.netloc
              if "@" in netloc:
                  # Drop the "user:password@" prefix; only the host part is
                  # needed here.
                  netloc = netloc.rpartition("@")[-1]
              host, port = httputil.split_host_and_port(netloc)
              if port is None:
                  port = 443 if self.parsed.scheme == "https" else 80
              if re.match(r"^\[.*\]$", host):
                  # raw ipv6 addresses in urls are enclosed in brackets
                  host = host[1:-1]
              # Final host (IPv6 brackets stripped), kept on the instance.
              self.parsed_hostname = host

              if self.request.allow_ipv6 is False:
                  af = socket.AF_INET
              else:
                  af = socket.AF_UNSPEC

              ssl_options = self._get_ssl_options(self.parsed.scheme)

              source_ip = None
              if self.request.network_interface:
                  if is_valid_ip(self.request.network_interface):
                      source_ip = self.request.network_interface
                  else:
                      raise ValueError(
                          "Unrecognized IPv4 or IPv6 address for network_interface, got %r"
                          % (self.request.network_interface,)
                      )

              # The connect phase is bounded by the smaller non-zero timeout.
              if self.request.connect_timeout and self.request.request_timeout:
                  timeout = min(
                      self.request.connect_timeout, self.request.request_timeout
                  )
              elif self.request.connect_timeout:
                  timeout = self.request.connect_timeout
              elif self.request.request_timeout:
                  timeout = self.request.request_timeout
              else:
                  timeout = 0
              if timeout:
                  self._timeout = self.io_loop.add_timeout(
                      self.start_time + timeout,
                      functools.partial(self._on_timeout, "while connecting"),
                  )
              stream = await self.tcp_client.connect(
                  host,
                  port,
                  af=af,
                  ssl_options=ssl_options,
                  max_buffer_size=self.max_buffer_size,
                  source_ip=source_ip,
              )

              if self.final_callback is None:
                  # final_callback is cleared if we've hit our timeout.
                  stream.close()
                  return
              self.stream = stream
              self.stream.set_close_callback(self.on_connection_close)
              self._remove_timeout()
              if self.final_callback is None:
                  return
              if self.request.request_timeout:
                  # Replace the connect timeout with one for the whole request,
                  # still measured from start_time.
                  self._timeout = self.io_loop.add_timeout(
                      self.start_time + self.request.request_timeout,
                      functools.partial(self._on_timeout, "during request"),
                  )
              if (
                  self.request.method not in self._SUPPORTED_METHODS
                  and not self.request.allow_nonstandard_methods
              ):
                  raise KeyError("unknown method %s" % self.request.method)
              for key in (
                  "proxy_host",
                  "proxy_port",
                  "proxy_username",
                  "proxy_password",
                  "proxy_auth_mode",
              ):
                  if getattr(self.request, key, None):
                      raise NotImplementedError("%s not supported" % key)
              if "Connection" not in self.request.headers:
                  self.request.headers["Connection"] = "close"
              if "Host" not in self.request.headers:
                  if "@" in self.parsed.netloc:
                      self.request.headers["Host"] = self.parsed.netloc.rpartition("@")[
                          -1
                      ]
                  else:
                      self.request.headers["Host"] = self.parsed.netloc
              # Credentials embedded in the URL take precedence over the
              # request's auth_username/auth_password.
              username, password = None, None
              if self.parsed.username is not None:
                  username, password = self.parsed.username, self.parsed.password
              elif self.request.auth_username is not None:
                  username = self.request.auth_username
                  password = self.request.auth_password or ""
              if username is not None:
                  assert password is not None
                  if self.request.auth_mode not in (None, "basic"):
                      # Format the mode into the message; passing it as a second
                      # argument would make the message render as a tuple.
                      raise ValueError(
                          "unsupported auth_mode %s" % self.request.auth_mode
                      )
                  self.request.headers["Authorization"] = "Basic " + _unicode(
                      base64.b64encode(
                          httputil.encode_username_password(username, password)
                      )
                  )
              if self.request.user_agent:
                  self.request.headers["User-Agent"] = self.request.user_agent
              elif self.request.headers.get("User-Agent") is None:
                  self.request.headers["User-Agent"] = f"Tornado/{version}"
              if not self.request.allow_nonstandard_methods:
                  # Some HTTP methods nearly always have bodies while others
                  # almost never do. Fail in this case unless the user has
                  # opted out of sanity checks with allow_nonstandard_methods.
                  body_expected = self.request.method in ("POST", "PATCH", "PUT")
                  body_present = (
                      self.request.body is not None
                      or self.request.body_producer is not None
                  )
                  if (body_expected and not body_present) or (
                      body_present and not body_expected
                  ):
                      raise ValueError(
                          "Body must %sbe None for method %s (unless "
                          "allow_nonstandard_methods is true)"
                          % ("not " if body_expected else "", self.request.method)
                      )
              if self.request.expect_100_continue:
                  self.request.headers["Expect"] = "100-continue"
              if self.request.body is not None:
                  # When body_producer is used the caller is responsible for
                  # setting Content-Length (or else chunked encoding will be used).
                  self.request.headers["Content-Length"] = str(len(self.request.body))
              if (
                  self.request.method == "POST"
                  and "Content-Type" not in self.request.headers
              ):
                  self.request.headers["Content-Type"] = (
                      "application/x-www-form-urlencoded"
                  )
              if self.request.decompress_response:
                  self.request.headers["Accept-Encoding"] = "gzip"
              req_path = (self.parsed.path or "/") + (
                  ("?" + self.parsed.query) if self.parsed.query else ""
              )
              self.connection = self._create_connection(stream)
              start_line = httputil.RequestStartLine(self.request.method, req_path, "")
              self.connection.write_headers(start_line, self.request.headers)
              if self.request.expect_100_continue:
                  # The body is written from headers_received once the
                  # 100 response arrives.
                  await self.connection.read_response(self)
              else:
                  await self._write_body(True)
          except Exception:
              if not self._handle_exception(*sys.exc_info()):
                  raise

      def _get_ssl_options(
          self, scheme: str
      ) -> Union[None, Dict[str, Any], ssl.SSLContext]:
          """Return the SSL options to connect with, or None for non-https.

          An explicit ``request.ssl_options`` wins. Otherwise the shared default
          options are returned when no certificate settings are customised, and
          a dedicated `ssl.SSLContext` is built from the request when they are.
          """
          if scheme == "https":
              if self.request.ssl_options is not None:
                  return self.request.ssl_options
              # If we are using the defaults, don't construct a
              # new SSLContext.
              if (
                  self.request.validate_cert
                  and self.request.ca_certs is None
                  and self.request.client_cert is None
                  and self.request.client_key is None
              ):
                  return _client_ssl_defaults
              ssl_ctx = ssl.create_default_context(
                  ssl.Purpose.SERVER_AUTH, cafile=self.request.ca_certs
              )
              if not self.request.validate_cert:
                  # check_hostname is turned off before verify_mode is relaxed.
                  ssl_ctx.check_hostname = False
                  ssl_ctx.verify_mode = ssl.CERT_NONE
              if self.request.client_cert is not None:
                  ssl_ctx.load_cert_chain(
                      self.request.client_cert, self.request.client_key
                  )
              if hasattr(ssl, "OP_NO_COMPRESSION"):
                  # See netutil.ssl_options_to_context
                  ssl_ctx.options |= ssl.OP_NO_COMPRESSION
              return ssl_ctx
          return None

      def _on_timeout(self, info: Optional[str] = None) -> None:
          """Timeout callback of _HTTPConnection instance.

          Report a `HTTPTimeoutError` through `_handle_exception` when a timeout
          occurs and the request has not completed yet.

          :arg string info: More detailed timeout information.
          """
          self._timeout = None
          error_message = f"Timeout {info}" if info else "Timeout"
          if self.final_callback is not None:
              self._handle_exception(
                  HTTPTimeoutError, HTTPTimeoutError(error_message), None
              )

      def _remove_timeout(self) -> None:
          """Cancel the pending timeout, if any."""
          if self._timeout is not None:
              self.io_loop.remove_timeout(self._timeout)
              self._timeout = None

      def _create_connection(self, stream: IOStream) -> HTTP1Connection:
          """Wrap ``stream`` in a client-side `HTTP1Connection` (no keep-alive)."""
          stream.set_nodelay(True)
          connection = HTTP1Connection(
              stream,
              True,
              HTTP1ConnectionParameters(
                  no_keep_alive=True,
                  max_header_size=self.max_header_size,
                  max_body_size=self.max_body_size,
                  decompress=bool(self.request.decompress_response),
              ),
              self._sockaddr,
          )
          return connection

      async def _write_body(self, start_read: bool) -> None:
          """Write the request body and finish the request.

          :arg bool start_read: When true, also read the response afterwards.
          """
          if self.request.body is not None:
              self.connection.write(self.request.body)
          elif self.request.body_producer is not None:
              fut = self.request.body_producer(self.connection.write)
              if fut is not None:
                  await fut
          self.connection.finish()
          if start_read:
              try:
                  await self.connection.read_response(self)
              except StreamClosedError:
                  if not self._handle_exception(*sys.exc_info()):
                      raise

      def _release(self) -> None:
          """Invoke ``release_callback`` at most once."""
          if self.release_callback is not None:
              release_callback = self.release_callback
              self.release_callback = None  # type: ignore
              release_callback()

      def _run_callback(self, response: HTTPResponse) -> None:
          """Release the client slot and schedule ``final_callback`` once."""
          self._release()
          if self.final_callback is not None:
              final_callback = self.final_callback
              self.final_callback = None  # type: ignore
              self.io_loop.add_callback(final_callback, response)

      def _handle_exception(
          self,
          typ: "Optional[Type[BaseException]]",
          value: Optional[BaseException],
          tb: Optional[TracebackType],
      ) -> bool:
          """Report an error as a 599 response if the request is still pending.

          :returns: True when the exception was consumed. If the final callback
              has already run, only `StreamClosedError` is treated as consumed.
          """
          if self.final_callback is not None:
              self._remove_timeout()
              if isinstance(value, StreamClosedError):
                  # Prefer the underlying error when the stream recorded one.
                  if value.real_error is None:
                      value = HTTPStreamClosedError("Stream closed")
                  else:
                      value = value.real_error
              self._run_callback(
                  HTTPResponse(
                      self.request,
                      599,
                      error=value,
                      request_time=self.io_loop.time() - self.start_time,
                      start_time=self.start_wall_time,
                  )
              )

              if hasattr(self, "stream"):
                  # TODO: this may cause a StreamClosedError to be raised
                  # by the connection's Future. Should we cancel the
                  # connection more gracefully?
                  self.stream.close()
              return True
          else:
              # If our callback has already been called, we are probably
              # catching an exception that is not caused by us but rather
              # some child of our callback. Rather than drop it on the floor,
              # pass it along, unless it's just the stream being closed.
              return isinstance(value, StreamClosedError)

      def on_connection_close(self) -> None:
          """Stream close callback; report the close if no response was sent.

          Re-raises the stream's own error when it has one.
          """
          if self.final_callback is not None:
              message = "Connection closed"
              if self.stream.error:
                  raise self.stream.error
              try:
                  # Raised and caught so sys.exc_info() carries a traceback.
                  raise HTTPStreamClosedError(message)
              except HTTPStreamClosedError:
                  self._handle_exception(*sys.exc_info())

      async def headers_received(
          self,
          first_line: Union[httputil.ResponseStartLine, httputil.RequestStartLine],
          headers: httputil.HTTPHeaders,
      ) -> None:
          """Record the response status and headers.

          A 100 response to an ``Expect: 100-continue`` request triggers the
          body write instead. Headers are forwarded to ``header_callback``
          unless the response is a redirect that will be followed.
          """
          assert isinstance(first_line, httputil.ResponseStartLine)
          if self.request.expect_100_continue and first_line.code == 100:
              await self._write_body(False)
              return
          self.code = first_line.code
          self.reason = first_line.reason
          self.headers = headers

          if self._should_follow_redirect():
              return

          if self.request.header_callback is not None:
              # Reassemble the start line.
              self.request.header_callback("%s %s %s\r\n" % first_line)
              for k, v in self.headers.get_all():
                  self.request.header_callback(f"{k}: {v}\r\n")
              self.request.header_callback("\r\n")

      def _should_follow_redirect(self) -> bool:
          """Return whether the current response is a redirect to follow."""
          if self.request.follow_redirects:
              assert self.request.max_redirects is not None
              return (
                  self.code in (301, 302, 303, 307, 308)
                  and self.request.max_redirects > 0
                  and self.headers is not None
                  and self.headers.get("Location") is not None
              )
          return False

      def finish(self) -> None:
          """Complete the response: follow a redirect or deliver the result."""
          assert self.code is not None
          data = b"".join(self.chunks)
          self._remove_timeout()
          original_request = getattr(self.request, "original_request", self.request)
          if self._should_follow_redirect():
              assert isinstance(self.request, _RequestProxy)
              assert self.headers is not None
              new_request = copy.copy(self.request.request)
              new_request.url = urllib.parse.urljoin(
                  self.request.url, self.headers["Location"]
              )
              assert self.request.max_redirects is not None
              new_request.max_redirects = self.request.max_redirects - 1
              del new_request.headers["Host"]
              # https://tools.ietf.org/html/rfc7231#section-6.4
              #
              # The original HTTP spec said that after a 301 or 302
              # redirect, the request method should be preserved.
              # However, browsers implemented this by changing the
              # method to GET, and the behavior stuck. 303 redirects
              # always specified this POST-to-GET behavior, arguably
              # for *all* methods, but libcurl < 7.70 only does this
              # for POST, while libcurl >= 7.70 does it for other methods.
              if (self.code == 303 and self.request.method != "HEAD") or (
                  self.code in (301, 302) and self.request.method == "POST"
              ):
                  new_request.method = "GET"
                  new_request.body = None  # type: ignore
                  # NOTE(review): these are deleted via self.request.headers
                  # while "Host" above is deleted via new_request.headers;
                  # presumably both are the same object because copy.copy is
                  # shallow - confirm.
                  for h in [
                      "Content-Length",
                      "Content-Type",
                      "Content-Encoding",
                      "Transfer-Encoding",
                  ]:
                      try:
                          del self.request.headers[h]
                      except KeyError:
                          pass
              new_request.original_request = original_request  # type: ignore
              # Hand the callback over to the follow-up fetch so this
              # connection does not deliver a response itself.
              final_callback = self.final_callback
              self.final_callback = None  # type: ignore
              self._release()
              assert self.client is not None
              fut = self.client.fetch(new_request, raise_error=False)
              fut.add_done_callback(lambda f: final_callback(f.result()))
              self._on_end_request()
              return
          if self.request.streaming_callback:
              # Body chunks were already passed to streaming_callback.
              buffer = BytesIO()
          else:
              buffer = BytesIO(data)  # TODO: don't require one big string?
          response = HTTPResponse(
              original_request,
              self.code,
              reason=getattr(self, "reason", None),
              headers=self.headers,
              request_time=self.io_loop.time() - self.start_time,
              start_time=self.start_wall_time,
              buffer=buffer,
              effective_url=self.request.url,
          )
          self._run_callback(response)
          self._on_end_request()

      def _on_end_request(self) -> None:
          """Close the stream once the request is finished."""
          self.stream.close()

      def data_received(self, chunk: bytes) -> None:
          """Pass a body chunk to ``streaming_callback`` or buffer it."""
          if self._should_follow_redirect():
              # We're going to follow a redirect so just discard the body.
              return
          if self.request.streaming_callback is not None:
              self.request.streaming_callback(chunk)
          else:
              self.chunks.append(chunk)
  # Script entry point: select SimpleAsyncHTTPClient as the AsyncHTTPClient
  # implementation, then run the ``main`` imported from tornado.httpclient.
  if __name__ == "__main__":
      AsyncHTTPClient.configure(SimpleAsyncHTTPClient)
      main()