web_test.py 123 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417
  1. import http
  2. from tornado.concurrent import Future
  3. from tornado import gen
  4. from tornado.escape import (
  5. json_decode,
  6. utf8,
  7. to_unicode,
  8. recursive_unicode,
  9. native_str,
  10. to_basestring,
  11. )
  12. from tornado.httpclient import HTTPClientError
  13. from tornado.httputil import format_timestamp
  14. from tornado.iostream import IOStream
  15. from tornado import locale
  16. from tornado.locks import Event
  17. from tornado.log import app_log, gen_log
  18. from tornado.simple_httpclient import SimpleAsyncHTTPClient
  19. from tornado.template import DictLoader
  20. from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, ExpectLog, gen_test
  21. from tornado.test.util import ignore_deprecation
  22. from tornado.util import ObjectDict, unicode_type
  23. from tornado.web import (
  24. Application,
  25. RequestHandler,
  26. StaticFileHandler,
  27. RedirectHandler as WebRedirectHandler,
  28. HTTPError,
  29. MissingArgumentError,
  30. ErrorHandler,
  31. authenticated,
  32. url,
  33. _create_signature_v1,
  34. create_signed_value,
  35. decode_signed_value,
  36. get_signature_key_version,
  37. UIModule,
  38. Finish,
  39. stream_request_body,
  40. removeslash,
  41. addslash,
  42. GZipContentEncoding,
  43. )
  44. import binascii
  45. import contextlib
  46. import copy
  47. import datetime
  48. import email.utils
  49. import gzip
  50. from io import BytesIO
  51. import itertools
  52. import logging
  53. import os
  54. import re
  55. import socket
  56. import typing # noqa: F401
  57. import unittest
  58. import urllib.parse
  59. def relpath(*a):
  60. return os.path.join(os.path.dirname(__file__), *a)
  61. class WebTestCase(AsyncHTTPTestCase):
  62. """Base class for web tests that also supports WSGI mode.
  63. Override get_handlers and get_app_kwargs instead of get_app.
  64. This class is deprecated since WSGI mode is no longer supported.
  65. """
  66. def get_app(self):
  67. self.app = Application(self.get_handlers(), **self.get_app_kwargs())
  68. return self.app
  69. def get_handlers(self):
  70. raise NotImplementedError()
  71. def get_app_kwargs(self):
  72. return {}
  73. class SimpleHandlerTestCase(WebTestCase):
  74. """Simplified base class for tests that work with a single handler class.
  75. To use, define a nested class named ``Handler``.
  76. """
  77. Handler = None
  78. def get_handlers(self):
  79. return [("/", self.Handler)]
  80. class HelloHandler(RequestHandler):
  81. def get(self):
  82. self.write("hello")
  83. class CookieTestRequestHandler(RequestHandler):
  84. # stub out enough methods to make the signed_cookie functions work
  85. def __init__(self, cookie_secret="0123456789", key_version=None):
  86. # don't call super.__init__
  87. self._cookies = {} # type: typing.Dict[str, bytes]
  88. if key_version is None:
  89. self.application = ObjectDict( # type: ignore
  90. settings=dict(cookie_secret=cookie_secret)
  91. )
  92. else:
  93. self.application = ObjectDict( # type: ignore
  94. settings=dict(cookie_secret=cookie_secret, key_version=key_version)
  95. )
  96. def get_cookie(self, name):
  97. return self._cookies.get(name)
  98. def set_cookie(self, name, value, expires_days=None):
  99. self._cookies[name] = value
  100. # See SignedValueTest below for more.
  101. class SecureCookieV1Test(unittest.TestCase):
  102. def test_round_trip(self):
  103. handler = CookieTestRequestHandler()
  104. handler.set_signed_cookie("foo", b"bar", version=1)
  105. self.assertEqual(handler.get_signed_cookie("foo", min_version=1), b"bar")
  106. def test_cookie_tampering_future_timestamp(self):
  107. handler = CookieTestRequestHandler()
  108. # this string base64-encodes to '12345678'
  109. handler.set_signed_cookie("foo", binascii.a2b_hex(b"d76df8e7aefc"), version=1)
  110. cookie = handler._cookies["foo"]
  111. match = re.match(rb"12345678\|([0-9]+)\|([0-9a-f]+)", cookie)
  112. self.assertIsNotNone(match)
  113. assert match is not None # for mypy
  114. timestamp = match.group(1)
  115. sig = match.group(2)
  116. self.assertEqual(
  117. _create_signature_v1(
  118. handler.application.settings["cookie_secret"],
  119. "foo",
  120. "12345678",
  121. timestamp,
  122. ),
  123. sig,
  124. )
  125. # shifting digits from payload to timestamp doesn't alter signature
  126. # (this is not desirable behavior, just confirming that that's how it
  127. # works)
  128. self.assertEqual(
  129. _create_signature_v1(
  130. handler.application.settings["cookie_secret"],
  131. "foo",
  132. "1234",
  133. b"5678" + timestamp,
  134. ),
  135. sig,
  136. )
  137. # tamper with the cookie
  138. handler._cookies["foo"] = utf8(
  139. f"1234|5678{to_basestring(timestamp)}|{to_basestring(sig)}"
  140. )
  141. # it gets rejected
  142. with ExpectLog(gen_log, "Cookie timestamp in future"):
  143. self.assertIsNone(handler.get_signed_cookie("foo", min_version=1))
  144. def test_arbitrary_bytes(self):
  145. # Secure cookies accept arbitrary data (which is base64 encoded).
  146. # Note that normal cookies accept only a subset of ascii.
  147. handler = CookieTestRequestHandler()
  148. handler.set_signed_cookie("foo", b"\xe9", version=1)
  149. self.assertEqual(handler.get_signed_cookie("foo", min_version=1), b"\xe9")
  150. # See SignedValueTest below for more.
  151. class SecureCookieV2Test(unittest.TestCase):
  152. KEY_VERSIONS = {0: "ajklasdf0ojaisdf", 1: "aslkjasaolwkjsdf"}
  153. def test_round_trip(self):
  154. handler = CookieTestRequestHandler()
  155. handler.set_signed_cookie("foo", b"bar", version=2)
  156. self.assertEqual(handler.get_signed_cookie("foo", min_version=2), b"bar")
  157. def test_key_version_roundtrip(self):
  158. handler = CookieTestRequestHandler(
  159. cookie_secret=self.KEY_VERSIONS, key_version=0
  160. )
  161. handler.set_signed_cookie("foo", b"bar")
  162. self.assertEqual(handler.get_signed_cookie("foo"), b"bar")
  163. def test_key_version_roundtrip_differing_version(self):
  164. handler = CookieTestRequestHandler(
  165. cookie_secret=self.KEY_VERSIONS, key_version=1
  166. )
  167. handler.set_signed_cookie("foo", b"bar")
  168. self.assertEqual(handler.get_signed_cookie("foo"), b"bar")
  169. def test_key_version_increment_version(self):
  170. handler = CookieTestRequestHandler(
  171. cookie_secret=self.KEY_VERSIONS, key_version=0
  172. )
  173. handler.set_signed_cookie("foo", b"bar")
  174. new_handler = CookieTestRequestHandler(
  175. cookie_secret=self.KEY_VERSIONS, key_version=1
  176. )
  177. new_handler._cookies = handler._cookies
  178. self.assertEqual(new_handler.get_signed_cookie("foo"), b"bar")
  179. def test_key_version_invalidate_version(self):
  180. handler = CookieTestRequestHandler(
  181. cookie_secret=self.KEY_VERSIONS, key_version=0
  182. )
  183. handler.set_signed_cookie("foo", b"bar")
  184. new_key_versions = self.KEY_VERSIONS.copy()
  185. new_key_versions.pop(0)
  186. new_handler = CookieTestRequestHandler(
  187. cookie_secret=new_key_versions, key_version=1
  188. )
  189. new_handler._cookies = handler._cookies
  190. self.assertEqual(new_handler.get_signed_cookie("foo"), None)
  191. class FinalReturnTest(WebTestCase):
  192. final_return = None # type: Future
  193. def get_handlers(self):
  194. test = self
  195. class FinishHandler(RequestHandler):
  196. @gen.coroutine
  197. def get(self):
  198. test.final_return = self.finish()
  199. yield test.final_return
  200. @gen.coroutine
  201. def post(self):
  202. self.write("hello,")
  203. yield self.flush()
  204. test.final_return = self.finish("world")
  205. yield test.final_return
  206. class RenderHandler(RequestHandler):
  207. def create_template_loader(self, path):
  208. return DictLoader({"foo.html": "hi"})
  209. @gen.coroutine
  210. def get(self):
  211. test.final_return = self.render("foo.html")
  212. return [("/finish", FinishHandler), ("/render", RenderHandler)]
  213. def get_app_kwargs(self):
  214. return dict(template_path="FinalReturnTest")
  215. def test_finish_method_return_future(self):
  216. response = self.fetch(self.get_url("/finish"))
  217. self.assertEqual(response.code, 200)
  218. self.assertIsInstance(self.final_return, Future)
  219. self.assertTrue(self.final_return.done())
  220. response = self.fetch(self.get_url("/finish"), method="POST", body=b"")
  221. self.assertEqual(response.code, 200)
  222. self.assertIsInstance(self.final_return, Future)
  223. self.assertTrue(self.final_return.done())
  224. def test_render_method_return_future(self):
  225. response = self.fetch(self.get_url("/render"))
  226. self.assertEqual(response.code, 200)
  227. self.assertIsInstance(self.final_return, Future)
  228. class CookieTest(WebTestCase):
  229. def get_handlers(self):
  230. class SetCookieHandler(RequestHandler):
  231. def get(self):
  232. # Try setting cookies with different argument types
  233. # to ensure that everything gets encoded correctly
  234. self.set_cookie("str", "asdf")
  235. self.set_cookie("unicode", "qwer")
  236. self.set_cookie("bytes", b"zxcv")
  237. class GetCookieHandler(RequestHandler):
  238. def get(self):
  239. cookie = self.get_cookie("foo", "default")
  240. assert cookie is not None
  241. self.write(cookie)
  242. class SetCookieDomainHandler(RequestHandler):
  243. def get(self):
  244. # unicode domain and path arguments shouldn't break things
  245. # either (see bug #285)
  246. self.set_cookie("unicode_args", "blah", domain="foo.com", path="/foo")
  247. class SetCookieSpecialCharHandler(RequestHandler):
  248. # "Special" characters are allowed in cookie values, but trigger special quoting.
  249. def get(self):
  250. self.set_cookie("equals", "a=b")
  251. self.set_cookie("semicolon", "a;b")
  252. self.set_cookie("quote", 'a"b')
  253. class SetCookieForbiddenCharHandler(RequestHandler):
  254. def get(self):
  255. # Control characters and semicolons raise errors in cookie names and attributes
  256. # (but not values, which are tested in SetCookieSpecialCharHandler)
  257. for char in list(map(chr, range(0x20))) + [chr(0x7F), ";"]:
  258. try:
  259. self.set_cookie("foo" + char, "bar")
  260. self.write(
  261. "Didn't get expected exception for char %r in name\n" % char
  262. )
  263. except http.cookies.CookieError as e:
  264. if "Invalid cookie attribute name" not in str(e):
  265. self.write(
  266. "unexpected exception for char %r in name: %s\n"
  267. % (char, e)
  268. )
  269. try:
  270. self.set_cookie("foo", "bar", domain="example" + char + ".com")
  271. self.write(
  272. "Didn't get expected exception for char %r in domain\n"
  273. % char
  274. )
  275. except http.cookies.CookieError as e:
  276. if "Invalid cookie attribute domain" not in str(e):
  277. self.write(
  278. "unexpected exception for char %r in domain: %s\n"
  279. % (char, e)
  280. )
  281. try:
  282. self.set_cookie("foo", "bar", path="/" + char)
  283. self.write(
  284. "Didn't get expected exception for char %r in path\n" % char
  285. )
  286. except http.cookies.CookieError as e:
  287. if "Invalid cookie attribute path" not in str(e):
  288. self.write(
  289. "unexpected exception for char %r in path: %s\n"
  290. % (char, e)
  291. )
  292. try:
  293. self.set_cookie("foo", "bar", samesite="a" + char)
  294. self.write(
  295. "Didn't get expected exception for char %r in samesite\n"
  296. % char
  297. )
  298. except http.cookies.CookieError as e:
  299. if "Invalid cookie attribute samesite" not in str(e):
  300. self.write(
  301. "unexpected exception for char %r in samesite: %s\n"
  302. % (char, e)
  303. )
  304. class SetCookieOverwriteHandler(RequestHandler):
  305. def get(self):
  306. self.set_cookie("a", "b", domain="example.com")
  307. self.set_cookie("c", "d", domain="example.com")
  308. # A second call with the same name clobbers the first.
  309. # Attributes from the first call are not carried over.
  310. self.set_cookie("a", "e")
  311. class SetCookieMaxAgeHandler(RequestHandler):
  312. def get(self):
  313. self.set_cookie("foo", "bar", max_age=10)
  314. class SetCookieExpiresDaysHandler(RequestHandler):
  315. def get(self):
  316. self.set_cookie("foo", "bar", expires_days=10)
  317. class SetCookieFalsyFlags(RequestHandler):
  318. def get(self):
  319. self.set_cookie("a", "1", secure=True)
  320. self.set_cookie("b", "1", secure=False)
  321. self.set_cookie("c", "1", httponly=True)
  322. self.set_cookie("d", "1", httponly=False)
  323. class SetCookieDeprecatedArgs(RequestHandler):
  324. def get(self):
  325. # Mixed case is supported, but deprecated
  326. self.set_cookie("a", "b", HttpOnly=True, pATH="/foo")
  327. return [
  328. ("/set", SetCookieHandler),
  329. ("/get", GetCookieHandler),
  330. ("/set_domain", SetCookieDomainHandler),
  331. ("/special_char", SetCookieSpecialCharHandler),
  332. ("/forbidden_char", SetCookieForbiddenCharHandler),
  333. ("/set_overwrite", SetCookieOverwriteHandler),
  334. ("/set_max_age", SetCookieMaxAgeHandler),
  335. ("/set_expires_days", SetCookieExpiresDaysHandler),
  336. ("/set_falsy_flags", SetCookieFalsyFlags),
  337. ("/set_deprecated", SetCookieDeprecatedArgs),
  338. ]
  339. def test_set_cookie(self):
  340. response = self.fetch("/set")
  341. self.assertEqual(
  342. sorted(response.headers.get_list("Set-Cookie")),
  343. ["bytes=zxcv; Path=/", "str=asdf; Path=/", "unicode=qwer; Path=/"],
  344. )
  345. def test_get_cookie(self):
  346. response = self.fetch("/get", headers={"Cookie": "foo=bar"})
  347. self.assertEqual(response.body, b"bar")
  348. response = self.fetch("/get", headers={"Cookie": 'foo="bar"'})
  349. self.assertEqual(response.body, b"bar")
  350. response = self.fetch("/get", headers={"Cookie": "/=exception;"})
  351. self.assertEqual(response.body, b"default")
  352. def test_set_cookie_domain(self):
  353. response = self.fetch("/set_domain")
  354. self.assertEqual(
  355. response.headers.get_list("Set-Cookie"),
  356. ["unicode_args=blah; Domain=foo.com; Path=/foo"],
  357. )
  358. def test_cookie_special_char(self):
  359. response = self.fetch("/special_char")
  360. headers = sorted(response.headers.get_list("Set-Cookie"))
  361. self.assertEqual(len(headers), 3)
  362. self.assertEqual(headers[0], 'equals="a=b"; Path=/')
  363. self.assertEqual(headers[1], 'quote="a\\"b"; Path=/')
  364. # Semicolons are octal-escaped
  365. self.assertIn(
  366. headers[2],
  367. ('semicolon="a;b"; Path=/', 'semicolon="a\\073b"; Path=/'),
  368. headers[2],
  369. )
  370. data = [
  371. ("foo=a=b", "a=b"),
  372. ('foo="a=b"', "a=b"),
  373. ('foo="a;b"', '"a'), # even quoted, ";" is a delimiter
  374. ("foo=a\\073b", "a\\073b"), # escapes only decoded in quotes
  375. ('foo="a\\073b"', "a;b"),
  376. ('foo="a\\"b"', 'a"b'),
  377. ]
  378. for header, expected in data:
  379. logging.debug("trying %r", header)
  380. response = self.fetch("/get", headers={"Cookie": header})
  381. self.assertEqual(response.body, utf8(expected))
  382. def test_set_cookie_forbidden_char(self):
  383. response = self.fetch("/forbidden_char")
  384. self.assertEqual(response.code, 200)
  385. self.maxDiff = 10000
  386. self.assertMultiLineEqual(to_unicode(response.body), "")
  387. def test_set_cookie_overwrite(self):
  388. response = self.fetch("/set_overwrite")
  389. headers = response.headers.get_list("Set-Cookie")
  390. self.assertEqual(
  391. sorted(headers), ["a=e; Path=/", "c=d; Domain=example.com; Path=/"]
  392. )
  393. def test_set_cookie_max_age(self):
  394. response = self.fetch("/set_max_age")
  395. headers = response.headers.get_list("Set-Cookie")
  396. self.assertEqual(sorted(headers), ["foo=bar; Max-Age=10; Path=/"])
  397. def test_set_cookie_expires_days(self):
  398. response = self.fetch("/set_expires_days")
  399. header = response.headers.get("Set-Cookie")
  400. self.assertIsNotNone(header)
  401. assert header is not None # for mypy
  402. match = re.match("foo=bar; expires=(?P<expires>.+); Path=/", header)
  403. self.assertIsNotNone(match)
  404. assert match is not None # for mypy
  405. expires = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
  406. days=10
  407. )
  408. header_expires = email.utils.parsedate_to_datetime(match.groupdict()["expires"])
  409. self.assertLess(abs((expires - header_expires).total_seconds()), 10)
  410. def test_set_cookie_false_flags(self):
  411. response = self.fetch("/set_falsy_flags")
  412. headers = sorted(response.headers.get_list("Set-Cookie"))
  413. self.assertEqual(headers[0], "a=1; Path=/; Secure")
  414. self.assertEqual(headers[1], "b=1; Path=/")
  415. self.assertEqual(headers[2], "c=1; HttpOnly; Path=/")
  416. self.assertEqual(headers[3], "d=1; Path=/")
  417. def test_set_cookie_deprecated(self):
  418. with ignore_deprecation():
  419. response = self.fetch("/set_deprecated")
  420. header = response.headers.get("Set-Cookie")
  421. self.assertEqual(header, "a=b; HttpOnly; Path=/foo")
  422. class AuthRedirectRequestHandler(RequestHandler):
  423. def initialize(self, login_url):
  424. self.login_url = login_url
  425. def get_login_url(self):
  426. return self.login_url
  427. @authenticated
  428. def get(self):
  429. # we'll never actually get here because the test doesn't follow redirects
  430. self.send_error(500)
  431. class AuthRedirectTest(WebTestCase):
  432. def get_handlers(self):
  433. return [
  434. ("/relative", AuthRedirectRequestHandler, dict(login_url="/login")),
  435. (
  436. "/absolute",
  437. AuthRedirectRequestHandler,
  438. dict(login_url="http://example.com/login"),
  439. ),
  440. ]
  441. def test_relative_auth_redirect(self):
  442. response = self.fetch(self.get_url("/relative"), follow_redirects=False)
  443. self.assertEqual(response.code, 302)
  444. self.assertEqual(response.headers["Location"], "/login?next=%2Frelative")
  445. def test_absolute_auth_redirect(self):
  446. response = self.fetch(self.get_url("/absolute"), follow_redirects=False)
  447. self.assertEqual(response.code, 302)
  448. self.assertTrue(
  449. re.match(
  450. r"http://example.com/login\?next=http%3A%2F%2F127.0.0.1%3A[0-9]+%2Fabsolute",
  451. response.headers["Location"],
  452. ),
  453. response.headers["Location"],
  454. )
  455. class ConnectionCloseHandler(RequestHandler):
  456. def initialize(self, test):
  457. self.test = test
  458. @gen.coroutine
  459. def get(self):
  460. self.test.on_handler_waiting()
  461. yield self.test.cleanup_event.wait()
  462. def on_connection_close(self):
  463. self.test.on_connection_close()
  464. class ConnectionCloseTest(WebTestCase):
  465. def get_handlers(self):
  466. self.cleanup_event = Event()
  467. return [("/", ConnectionCloseHandler, dict(test=self))]
  468. def test_connection_close(self):
  469. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
  470. s.connect(("127.0.0.1", self.get_http_port()))
  471. self.stream = IOStream(s)
  472. self.stream.write(b"GET / HTTP/1.0\r\n\r\n")
  473. self.wait()
  474. # Let the hanging coroutine clean up after itself
  475. self.cleanup_event.set()
  476. self.io_loop.run_sync(lambda: gen.sleep(0))
  477. def on_handler_waiting(self):
  478. logging.debug("handler waiting")
  479. self.stream.close()
  480. def on_connection_close(self):
  481. logging.debug("connection closed")
  482. self.stop()
  483. class EchoHandler(RequestHandler):
  484. def get(self, *path_args):
  485. # Type checks: web.py interfaces convert argument values to
  486. # unicode strings (by default, but see also decode_argument).
  487. # In httpserver.py (i.e. self.request.arguments), they're left
  488. # as bytes. Keys are always native strings.
  489. for key in self.request.arguments:
  490. if type(key) is not str:
  491. raise Exception("incorrect type for key: %r" % type(key))
  492. for bvalue in self.request.arguments[key]:
  493. if type(bvalue) is not bytes:
  494. raise Exception("incorrect type for value: %r" % type(bvalue))
  495. for svalue in self.get_arguments(key):
  496. if type(svalue) is not unicode_type:
  497. raise Exception("incorrect type for value: %r" % type(svalue))
  498. for arg in path_args:
  499. if type(arg) is not unicode_type:
  500. raise Exception("incorrect type for path arg: %r" % type(arg))
  501. self.write(
  502. dict(
  503. path=self.request.path,
  504. path_args=path_args,
  505. args=recursive_unicode(self.request.arguments),
  506. )
  507. )
  508. class RequestEncodingTest(WebTestCase):
  509. def get_handlers(self):
  510. return [("/group/(.*)", EchoHandler), ("/slashes/([^/]*)/([^/]*)", EchoHandler)]
  511. def fetch_json(self, path):
  512. return json_decode(self.fetch(path).body)
  513. def test_group_question_mark(self):
  514. # Ensure that url-encoded question marks are handled properly
  515. self.assertEqual(
  516. self.fetch_json("/group/%3F"),
  517. dict(path="/group/%3F", path_args=["?"], args={}),
  518. )
  519. self.assertEqual(
  520. self.fetch_json("/group/%3F?%3F=%3F"),
  521. dict(path="/group/%3F", path_args=["?"], args={"?": ["?"]}),
  522. )
  523. def test_group_encoding(self):
  524. # Path components and query arguments should be decoded the same way
  525. self.assertEqual(
  526. self.fetch_json("/group/%C3%A9?arg=%C3%A9"),
  527. {
  528. "path": "/group/%C3%A9",
  529. "path_args": ["\u00e9"],
  530. "args": {"arg": ["\u00e9"]},
  531. },
  532. )
  533. def test_slashes(self):
  534. # Slashes may be escaped to appear as a single "directory" in the path,
  535. # but they are then unescaped when passed to the get() method.
  536. self.assertEqual(
  537. self.fetch_json("/slashes/foo/bar"),
  538. dict(path="/slashes/foo/bar", path_args=["foo", "bar"], args={}),
  539. )
  540. self.assertEqual(
  541. self.fetch_json("/slashes/a%2Fb/c%2Fd"),
  542. dict(path="/slashes/a%2Fb/c%2Fd", path_args=["a/b", "c/d"], args={}),
  543. )
  544. def test_error(self):
  545. # Percent signs (encoded as %25) should not mess up printf-style
  546. # messages in logs
  547. with ExpectLog(gen_log, ".*Invalid unicode"):
  548. self.fetch("/group/?arg=%25%e9")
  549. class TypeCheckHandler(RequestHandler):
  550. def prepare(self):
  551. self.errors = {} # type: typing.Dict[str, str]
  552. self.check_type("status", self.get_status(), int)
  553. # get_argument is an exception from the general rule of using
  554. # type str for non-body data mainly for historical reasons.
  555. self.check_type("argument", self.get_argument("foo"), unicode_type)
  556. self.check_type("cookie_key", list(self.cookies.keys())[0], str)
  557. self.check_type("cookie_value", list(self.cookies.values())[0].value, str)
  558. # Secure cookies return bytes because they can contain arbitrary
  559. # data, but regular cookies are native strings.
  560. if list(self.cookies.keys()) != ["asdf"]:
  561. raise Exception(
  562. "unexpected values for cookie keys: %r" % self.cookies.keys()
  563. )
  564. self.check_type("get_signed_cookie", self.get_signed_cookie("asdf"), bytes)
  565. self.check_type("get_cookie", self.get_cookie("asdf"), str)
  566. self.check_type("xsrf_token", self.xsrf_token, bytes)
  567. self.check_type("xsrf_form_html", self.xsrf_form_html(), str)
  568. self.check_type("reverse_url", self.reverse_url("typecheck", "foo"), str)
  569. self.check_type("request_summary", self._request_summary(), str)
  570. def get(self, path_component):
  571. # path_component uses type unicode instead of str for consistency
  572. # with get_argument()
  573. self.check_type("path_component", path_component, unicode_type)
  574. self.write(self.errors)
  575. def post(self, path_component):
  576. self.check_type("path_component", path_component, unicode_type)
  577. self.write(self.errors)
  578. def check_type(self, name, obj, expected_type):
  579. actual_type = type(obj)
  580. if expected_type != actual_type:
  581. self.errors[name] = f"expected {expected_type}, got {actual_type}"
  582. class DecodeArgHandler(RequestHandler):
  583. def decode_argument(self, value, name=None):
  584. if type(value) is not bytes:
  585. raise Exception("unexpected type for value: %r" % type(value))
  586. # use self.request.arguments directly to avoid recursion
  587. if "encoding" in self.request.arguments:
  588. return value.decode(to_unicode(self.request.arguments["encoding"][0]))
  589. else:
  590. return value
  591. def get(self, arg):
  592. def describe(s):
  593. if type(s) is bytes:
  594. return ["bytes", native_str(binascii.b2a_hex(s))]
  595. elif type(s) is unicode_type:
  596. return ["unicode", s]
  597. raise Exception("unknown type")
  598. self.write({"path": describe(arg), "query": describe(self.get_argument("foo"))})
  599. class LinkifyHandler(RequestHandler):
  600. def get(self):
  601. self.render("linkify.html", message="http://example.com")
  602. class UIModuleResourceHandler(RequestHandler):
  603. def get(self):
  604. self.render("page.html", entries=[1, 2])
  605. class OptionalPathHandler(RequestHandler):
  606. def get(self, path):
  607. self.write({"path": path})
  608. class MultiHeaderHandler(RequestHandler):
  609. def get(self):
  610. self.set_header("x-overwrite", "1")
  611. self.set_header("X-Overwrite", 2)
  612. self.add_header("x-multi", 3)
  613. self.add_header("X-Multi", "4")
  614. class RedirectHandler(RequestHandler):
  615. def get(self):
  616. if self.get_argument("permanent", None) is not None:
  617. self.redirect("/", permanent=bool(int(self.get_argument("permanent"))))
  618. elif self.get_argument("status", None) is not None:
  619. self.redirect("/", status=int(self.get_argument("status")))
  620. else:
  621. raise Exception("didn't get permanent or status arguments")
  622. class EmptyFlushCallbackHandler(RequestHandler):
  623. @gen.coroutine
  624. def get(self):
  625. # Ensure that the flush callback is run whether or not there
  626. # was any output. The gen.Task and direct yield forms are
  627. # equivalent.
  628. yield self.flush() # "empty" flush, but writes headers
  629. yield self.flush() # empty flush
  630. self.write("o")
  631. yield self.flush() # flushes the "o"
  632. yield self.flush() # empty flush
  633. self.finish("k")
  634. class HeaderInjectionHandler(RequestHandler):
  635. def get(self):
  636. try:
  637. self.set_header("X-Foo", "foo\r\nX-Bar: baz")
  638. raise Exception("Didn't get expected exception")
  639. except ValueError as e:
  640. if "Unsafe header value" in str(e):
  641. self.finish(b"ok")
  642. else:
  643. raise
  644. class SetHeaderHandler(RequestHandler):
  645. def get(self):
  646. # tests the validity of web.RequestHandler._VALID_HEADER_CHARS
  647. illegal_chars = [chr(o) for o in range(0, 0x20)]
  648. illegal_chars.append(chr(0x7F))
  649. illegal_chars.remove("\t")
  650. for char in illegal_chars:
  651. try:
  652. self.set_header("X-Foo", "foo" + char + "bar")
  653. raise Exception("Didn't get expected exception")
  654. except ValueError as e:
  655. if "Unsafe header value" not in str(e):
  656. raise
  657. # an empty header value is valid as well
  658. self.set_header("X-Foo", "")
  659. self.finish(b"ok")
  660. class GetArgumentHandler(RequestHandler):
  661. def prepare(self):
  662. if self.get_argument("source", None) == "query":
  663. method = self.get_query_argument
  664. elif self.get_argument("source", None) == "body":
  665. method = self.get_body_argument
  666. else:
  667. method = self.get_argument # type: ignore
  668. self.finish(method("foo", "default"))
  669. class GetArgumentsHandler(RequestHandler):
  670. def prepare(self):
  671. self.finish(
  672. dict(
  673. default=self.get_arguments("foo"),
  674. query=self.get_query_arguments("foo"),
  675. body=self.get_body_arguments("foo"),
  676. )
  677. )
  678. # This test was shared with wsgi_test.py; now the name is meaningless.
  679. class WSGISafeWebTest(WebTestCase):
  680. COOKIE_SECRET = "WebTest.COOKIE_SECRET"
  681. def get_app_kwargs(self):
  682. loader = DictLoader(
  683. {
  684. "linkify.html": "{% module linkify(message) %}",
  685. "page.html": """\
  686. <html><head></head><body>
  687. {% for e in entries %}
  688. {% module Template("entry.html", entry=e) %}
  689. {% end %}
  690. </body></html>""",
  691. "entry.html": """\
  692. {{ set_resources(embedded_css=".entry { margin-bottom: 1em; }",
  693. embedded_javascript="js_embed()",
  694. css_files=["/base.css", "/foo.css"],
  695. javascript_files="/common.js",
  696. html_head="<meta>",
  697. html_body='<script src="/analytics.js"/>') }}
  698. <div class="entry">...</div>""",
  699. }
  700. )
  701. return dict(
  702. template_loader=loader,
  703. autoescape="xhtml_escape",
  704. cookie_secret=self.COOKIE_SECRET,
  705. )
  706. def tearDown(self):
  707. super().tearDown()
  708. RequestHandler._template_loaders.clear()
  709. def get_handlers(self):
  710. urls = [
  711. url("/typecheck/(.*)", TypeCheckHandler, name="typecheck"),
  712. url("/decode_arg/(.*)", DecodeArgHandler, name="decode_arg"),
  713. url("/decode_arg_kw/(?P<arg>.*)", DecodeArgHandler),
  714. url("/linkify", LinkifyHandler),
  715. url("/uimodule_resources", UIModuleResourceHandler),
  716. url("/optional_path/(.+)?", OptionalPathHandler),
  717. url("/multi_header", MultiHeaderHandler),
  718. url("/redirect", RedirectHandler),
  719. url(
  720. "/web_redirect_permanent",
  721. WebRedirectHandler,
  722. {"url": "/web_redirect_newpath"},
  723. ),
  724. url(
  725. "/web_redirect",
  726. WebRedirectHandler,
  727. {"url": "/web_redirect_newpath", "permanent": False},
  728. ),
  729. url(
  730. "//web_redirect_double_slash",
  731. WebRedirectHandler,
  732. {"url": "/web_redirect_newpath"},
  733. ),
  734. url("/header_injection", HeaderInjectionHandler),
  735. url("/get_argument", GetArgumentHandler),
  736. url("/get_arguments", GetArgumentsHandler),
  737. url("/set_header", SetHeaderHandler),
  738. ]
  739. return urls
  740. def fetch_json(self, *args, **kwargs):
  741. response = self.fetch(*args, **kwargs)
  742. response.rethrow()
  743. return json_decode(response.body)
  744. def test_types(self):
  745. cookie_value = to_unicode(
  746. create_signed_value(self.COOKIE_SECRET, "asdf", "qwer")
  747. )
  748. response = self.fetch(
  749. "/typecheck/asdf?foo=bar", headers={"Cookie": "asdf=" + cookie_value}
  750. )
  751. data = json_decode(response.body)
  752. self.assertEqual(data, {})
  753. response = self.fetch(
  754. "/typecheck/asdf?foo=bar",
  755. method="POST",
  756. headers={"Cookie": "asdf=" + cookie_value},
  757. body="foo=bar",
  758. )
  759. def test_decode_argument(self):
  760. # These urls all decode to the same thing
  761. urls = [
  762. "/decode_arg/%C3%A9?foo=%C3%A9&encoding=utf-8",
  763. "/decode_arg/%E9?foo=%E9&encoding=latin1",
  764. "/decode_arg_kw/%E9?foo=%E9&encoding=latin1",
  765. ]
  766. for req_url in urls:
  767. response = self.fetch(req_url)
  768. response.rethrow()
  769. data = json_decode(response.body)
  770. self.assertEqual(
  771. data,
  772. {"path": ["unicode", "\u00e9"], "query": ["unicode", "\u00e9"]},
  773. )
  774. response = self.fetch("/decode_arg/%C3%A9?foo=%C3%A9")
  775. response.rethrow()
  776. data = json_decode(response.body)
  777. self.assertEqual(data, {"path": ["bytes", "c3a9"], "query": ["bytes", "c3a9"]})
  778. def test_decode_argument_invalid_unicode(self):
  779. # test that invalid unicode in URLs causes 400, not 500
  780. with ExpectLog(gen_log, ".*Invalid unicode.*"):
  781. response = self.fetch("/typecheck/invalid%FF")
  782. self.assertEqual(response.code, 400)
  783. response = self.fetch("/typecheck/invalid?foo=%FF")
  784. self.assertEqual(response.code, 400)
  785. def test_decode_argument_plus(self):
  786. # These urls are all equivalent.
  787. urls = [
  788. "/decode_arg/1%20%2B%201?foo=1%20%2B%201&encoding=utf-8",
  789. "/decode_arg/1%20+%201?foo=1+%2B+1&encoding=utf-8",
  790. ]
  791. for req_url in urls:
  792. response = self.fetch(req_url)
  793. response.rethrow()
  794. data = json_decode(response.body)
  795. self.assertEqual(
  796. data,
  797. {"path": ["unicode", "1 + 1"], "query": ["unicode", "1 + 1"]},
  798. )
  799. def test_reverse_url(self):
  800. self.assertEqual(self.app.reverse_url("decode_arg", "foo"), "/decode_arg/foo")
  801. self.assertEqual(self.app.reverse_url("decode_arg", 42), "/decode_arg/42")
  802. self.assertEqual(self.app.reverse_url("decode_arg", b"\xe9"), "/decode_arg/%E9")
  803. self.assertEqual(
  804. self.app.reverse_url("decode_arg", "\u00e9"), "/decode_arg/%C3%A9"
  805. )
  806. self.assertEqual(
  807. self.app.reverse_url("decode_arg", "1 + 1"), "/decode_arg/1%20%2B%201"
  808. )
  809. def test_uimodule_unescaped(self):
  810. response = self.fetch("/linkify")
  811. self.assertEqual(
  812. response.body, b'<a href="http://example.com">http://example.com</a>'
  813. )
  814. def test_uimodule_resources(self):
  815. response = self.fetch("/uimodule_resources")
  816. self.assertEqual(
  817. response.body,
  818. b"""\
  819. <html><head><link href="/base.css" type="text/css" rel="stylesheet"/><link href="/foo.css" type="text/css" rel="stylesheet"/>
  820. <style type="text/css">
  821. .entry { margin-bottom: 1em; }
  822. </style>
  823. <meta>
  824. </head><body>
  825. <div class="entry">...</div>
  826. <div class="entry">...</div>
  827. <script src="/common.js" type="text/javascript"></script>
  828. <script type="text/javascript">
  829. //<![CDATA[
  830. js_embed()
  831. //]]>
  832. </script>
  833. <script src="/analytics.js"/>
  834. </body></html>""", # noqa: E501
  835. )
  836. def test_optional_path(self):
  837. self.assertEqual(self.fetch_json("/optional_path/foo"), {"path": "foo"})
  838. self.assertEqual(self.fetch_json("/optional_path/"), {"path": None})
  839. def test_multi_header(self):
  840. response = self.fetch("/multi_header")
  841. self.assertEqual(response.headers["x-overwrite"], "2")
  842. self.assertEqual(response.headers.get_list("x-multi"), ["3", "4"])
  843. def test_redirect(self):
  844. response = self.fetch("/redirect?permanent=1", follow_redirects=False)
  845. self.assertEqual(response.code, 301)
  846. response = self.fetch("/redirect?permanent=0", follow_redirects=False)
  847. self.assertEqual(response.code, 302)
  848. response = self.fetch("/redirect?status=307", follow_redirects=False)
  849. self.assertEqual(response.code, 307)
  850. def test_web_redirect(self):
  851. response = self.fetch("/web_redirect_permanent", follow_redirects=False)
  852. self.assertEqual(response.code, 301)
  853. self.assertEqual(response.headers["Location"], "/web_redirect_newpath")
  854. response = self.fetch("/web_redirect", follow_redirects=False)
  855. self.assertEqual(response.code, 302)
  856. self.assertEqual(response.headers["Location"], "/web_redirect_newpath")
  857. def test_web_redirect_double_slash(self):
  858. response = self.fetch("//web_redirect_double_slash", follow_redirects=False)
  859. self.assertEqual(response.code, 301)
  860. self.assertEqual(response.headers["Location"], "/web_redirect_newpath")
  861. def test_header_injection(self):
  862. response = self.fetch("/header_injection")
  863. self.assertEqual(response.body, b"ok")
  864. def test_set_header(self):
  865. response = self.fetch("/set_header")
  866. self.assertEqual(response.body, b"ok")
  867. def test_get_argument(self):
  868. response = self.fetch("/get_argument?foo=bar")
  869. self.assertEqual(response.body, b"bar")
  870. response = self.fetch("/get_argument?foo=")
  871. self.assertEqual(response.body, b"")
  872. response = self.fetch("/get_argument")
  873. self.assertEqual(response.body, b"default")
  874. # Test merging of query and body arguments.
  875. # In singular form, body arguments take precedence over query arguments.
  876. body = urllib.parse.urlencode(dict(foo="hello"))
  877. response = self.fetch("/get_argument?foo=bar", method="POST", body=body)
  878. self.assertEqual(response.body, b"hello")
  879. # In plural methods they are merged.
  880. response = self.fetch("/get_arguments?foo=bar", method="POST", body=body)
  881. self.assertEqual(
  882. json_decode(response.body),
  883. dict(default=["bar", "hello"], query=["bar"], body=["hello"]),
  884. )
  885. def test_get_query_arguments(self):
  886. # send as a post so we can ensure the separation between query
  887. # string and body arguments.
  888. body = urllib.parse.urlencode(dict(foo="hello"))
  889. response = self.fetch(
  890. "/get_argument?source=query&foo=bar", method="POST", body=body
  891. )
  892. self.assertEqual(response.body, b"bar")
  893. response = self.fetch(
  894. "/get_argument?source=query&foo=", method="POST", body=body
  895. )
  896. self.assertEqual(response.body, b"")
  897. response = self.fetch("/get_argument?source=query", method="POST", body=body)
  898. self.assertEqual(response.body, b"default")
  899. def test_get_body_arguments(self):
  900. body = urllib.parse.urlencode(dict(foo="bar"))
  901. response = self.fetch(
  902. "/get_argument?source=body&foo=hello", method="POST", body=body
  903. )
  904. self.assertEqual(response.body, b"bar")
  905. body = urllib.parse.urlencode(dict(foo=""))
  906. response = self.fetch(
  907. "/get_argument?source=body&foo=hello", method="POST", body=body
  908. )
  909. self.assertEqual(response.body, b"")
  910. body = urllib.parse.urlencode(dict())
  911. response = self.fetch(
  912. "/get_argument?source=body&foo=hello", method="POST", body=body
  913. )
  914. self.assertEqual(response.body, b"default")
  915. def test_no_gzip(self):
  916. response = self.fetch("/get_argument")
  917. self.assertNotIn("Accept-Encoding", response.headers.get("Vary", ""))
  918. self.assertNotIn("gzip", response.headers.get("Content-Encoding", ""))
  919. class NonWSGIWebTests(WebTestCase):
  920. def get_handlers(self):
  921. return [("/empty_flush", EmptyFlushCallbackHandler)]
  922. def test_empty_flush(self):
  923. response = self.fetch("/empty_flush")
  924. self.assertEqual(response.body, b"ok")
  925. class ErrorResponseTest(WebTestCase):
  926. def get_handlers(self):
  927. class DefaultHandler(RequestHandler):
  928. def get(self):
  929. if self.get_argument("status", None):
  930. raise HTTPError(int(self.get_argument("status")))
  931. 1 / 0
  932. class WriteErrorHandler(RequestHandler):
  933. def get(self):
  934. if self.get_argument("status", None):
  935. self.send_error(int(self.get_argument("status")))
  936. else:
  937. 1 / 0
  938. def write_error(self, status_code, **kwargs):
  939. self.set_header("Content-Type", "text/plain")
  940. if "exc_info" in kwargs:
  941. self.write("Exception: %s" % kwargs["exc_info"][0].__name__)
  942. else:
  943. self.write("Status: %d" % status_code)
  944. class FailedWriteErrorHandler(RequestHandler):
  945. def get(self):
  946. 1 / 0
  947. def write_error(self, status_code, **kwargs):
  948. raise Exception("exception in write_error")
  949. return [
  950. url("/default", DefaultHandler),
  951. url("/write_error", WriteErrorHandler),
  952. url("/failed_write_error", FailedWriteErrorHandler),
  953. ]
  954. def test_default(self):
  955. with ExpectLog(app_log, "Uncaught exception"):
  956. response = self.fetch("/default")
  957. self.assertEqual(response.code, 500)
  958. self.assertIn(b"500: Internal Server Error", response.body)
  959. response = self.fetch("/default?status=503")
  960. self.assertEqual(response.code, 503)
  961. self.assertIn(b"503: Service Unavailable", response.body)
  962. response = self.fetch("/default?status=435")
  963. self.assertEqual(response.code, 435)
  964. self.assertIn(b"435: Unknown", response.body)
  965. def test_write_error(self):
  966. with ExpectLog(app_log, "Uncaught exception"):
  967. response = self.fetch("/write_error")
  968. self.assertEqual(response.code, 500)
  969. self.assertEqual(b"Exception: ZeroDivisionError", response.body)
  970. response = self.fetch("/write_error?status=503")
  971. self.assertEqual(response.code, 503)
  972. self.assertEqual(b"Status: 503", response.body)
  973. def test_failed_write_error(self):
  974. with ExpectLog(app_log, "Uncaught exception"):
  975. response = self.fetch("/failed_write_error")
  976. self.assertEqual(response.code, 500)
  977. self.assertEqual(b"", response.body)
  978. class StaticFileTest(WebTestCase):
  979. # The expected SHA-512 hash of robots.txt, used in tests that call
  980. # StaticFileHandler.get_version
  981. robots_txt_hash = (
  982. b"63a36e950e134b5217e33c763e88840c10a07d80e6057d92b9ac97508de7fb1f"
  983. b"a6f0e9b7531e169657165ea764e8963399cb6d921ffe6078425aaafe54c04563"
  984. )
  985. static_dir = os.path.join(os.path.dirname(__file__), "static")
  986. def get_handlers(self):
  987. class StaticUrlHandler(RequestHandler):
  988. def get(self, path):
  989. with_v = int(self.get_argument("include_version", "1"))
  990. self.write(self.static_url(path, include_version=with_v))
  991. class AbsoluteStaticUrlHandler(StaticUrlHandler):
  992. include_host = True
  993. class OverrideStaticUrlHandler(RequestHandler):
  994. def get(self, path):
  995. do_include = bool(self.get_argument("include_host"))
  996. self.include_host = not do_include
  997. regular_url = self.static_url(path)
  998. override_url = self.static_url(path, include_host=do_include)
  999. if override_url == regular_url:
  1000. return self.write(str(False))
  1001. protocol = self.request.protocol + "://"
  1002. protocol_length = len(protocol)
  1003. check_regular = regular_url.find(protocol, 0, protocol_length)
  1004. check_override = override_url.find(protocol, 0, protocol_length)
  1005. if do_include:
  1006. result = check_override == 0 and check_regular == -1
  1007. else:
  1008. result = check_override == -1 and check_regular == 0
  1009. self.write(str(result))
  1010. return [
  1011. ("/static_url/(.*)", StaticUrlHandler),
  1012. ("/abs_static_url/(.*)", AbsoluteStaticUrlHandler),
  1013. ("/override_static_url/(.*)", OverrideStaticUrlHandler),
  1014. ("/root_static/(.*)", StaticFileHandler, dict(path="/")),
  1015. ]
  1016. def get_app_kwargs(self):
  1017. return dict(static_path=relpath("static"))
  1018. def test_static_files(self):
  1019. response = self.fetch("/robots.txt")
  1020. self.assertIn(b"Disallow: /", response.body)
  1021. response = self.fetch("/static/robots.txt")
  1022. self.assertIn(b"Disallow: /", response.body)
  1023. self.assertEqual(response.headers.get("Content-Type"), "text/plain")
  1024. def test_static_files_cacheable(self):
  1025. # Test that the version parameter triggers cache-control headers. This
  1026. # test is pretty weak but it gives us coverage of the code path which
  1027. # was important for detecting the deprecation of datetime.utcnow.
  1028. response = self.fetch("/robots.txt?v=12345")
  1029. self.assertIn(b"Disallow: /", response.body)
  1030. self.assertIn("Cache-Control", response.headers)
  1031. self.assertIn("Expires", response.headers)
  1032. def test_static_compressed_files(self):
  1033. response = self.fetch("/static/sample.xml.gz")
  1034. self.assertEqual(response.headers.get("Content-Type"), "application/gzip")
  1035. response = self.fetch("/static/sample.xml.bz2")
  1036. self.assertEqual(
  1037. response.headers.get("Content-Type"), "application/octet-stream"
  1038. )
  1039. # make sure the uncompressed file still has the correct type
  1040. response = self.fetch("/static/sample.xml")
  1041. self.assertIn(
  1042. response.headers.get("Content-Type"), {"text/xml", "application/xml"}
  1043. )
  1044. def test_static_url(self):
  1045. response = self.fetch("/static_url/robots.txt")
  1046. self.assertEqual(response.body, b"/static/robots.txt?v=" + self.robots_txt_hash)
  1047. def test_absolute_static_url(self):
  1048. response = self.fetch("/abs_static_url/robots.txt")
  1049. self.assertEqual(
  1050. response.body,
  1051. (utf8(self.get_url("/")) + b"static/robots.txt?v=" + self.robots_txt_hash),
  1052. )
  1053. def test_relative_version_exclusion(self):
  1054. response = self.fetch("/static_url/robots.txt?include_version=0")
  1055. self.assertEqual(response.body, b"/static/robots.txt")
  1056. def test_absolute_version_exclusion(self):
  1057. response = self.fetch("/abs_static_url/robots.txt?include_version=0")
  1058. self.assertEqual(response.body, utf8(self.get_url("/") + "static/robots.txt"))
  1059. def test_include_host_override(self):
  1060. self._trigger_include_host_check(False)
  1061. self._trigger_include_host_check(True)
  1062. def _trigger_include_host_check(self, include_host):
  1063. path = "/override_static_url/robots.txt?include_host=%s"
  1064. response = self.fetch(path % int(include_host))
  1065. self.assertEqual(response.body, utf8(str(True)))
  1066. def get_and_head(self, *args, **kwargs):
  1067. """Performs a GET and HEAD request and returns the GET response.
  1068. Fails if any ``Content-*`` headers returned by the two requests
  1069. differ.
  1070. """
  1071. head_response = self.fetch(*args, method="HEAD", **kwargs)
  1072. get_response = self.fetch(*args, method="GET", **kwargs)
  1073. content_headers = set()
  1074. for h in itertools.chain(head_response.headers, get_response.headers):
  1075. if h.startswith("Content-"):
  1076. content_headers.add(h)
  1077. for h in content_headers:
  1078. self.assertEqual(
  1079. head_response.headers.get(h),
  1080. get_response.headers.get(h),
  1081. "%s differs between GET (%s) and HEAD (%s)"
  1082. % (h, head_response.headers.get(h), get_response.headers.get(h)),
  1083. )
  1084. return get_response
  1085. def test_static_304_if_modified_since(self):
  1086. response1 = self.get_and_head("/static/robots.txt")
  1087. response2 = self.get_and_head(
  1088. "/static/robots.txt",
  1089. headers={"If-Modified-Since": response1.headers["Last-Modified"]},
  1090. )
  1091. self.assertEqual(response2.code, 304)
  1092. self.assertNotIn("Content-Length", response2.headers)
  1093. def test_static_304_if_none_match(self):
  1094. response1 = self.get_and_head("/static/robots.txt")
  1095. response2 = self.get_and_head(
  1096. "/static/robots.txt", headers={"If-None-Match": response1.headers["Etag"]}
  1097. )
  1098. self.assertEqual(response2.code, 304)
  1099. def test_static_304_etag_modified_bug(self):
  1100. response1 = self.get_and_head("/static/robots.txt")
  1101. response2 = self.get_and_head(
  1102. "/static/robots.txt",
  1103. headers={
  1104. "If-None-Match": '"MISMATCH"',
  1105. "If-Modified-Since": response1.headers["Last-Modified"],
  1106. },
  1107. )
  1108. self.assertEqual(response2.code, 200)
  1109. def test_static_304_if_modified_since_invalid(self):
  1110. response = self.get_and_head(
  1111. "/static/robots.txt",
  1112. headers={"If-Modified-Since": "!nv@l!d"},
  1113. )
  1114. self.assertEqual(response.code, 200)
  1115. def test_static_if_modified_since_pre_epoch(self):
  1116. # On windows, the functions that work with time_t do not accept
  1117. # negative values, and at least one client (processing.js) seems
  1118. # to use if-modified-since 1/1/1960 as a cache-busting technique.
  1119. response = self.get_and_head(
  1120. "/static/robots.txt",
  1121. headers={"If-Modified-Since": "Fri, 01 Jan 1960 00:00:00 GMT"},
  1122. )
  1123. self.assertEqual(response.code, 200)
  1124. def test_static_if_modified_since_time_zone(self):
  1125. # Instead of the value from Last-Modified, make requests with times
  1126. # chosen just before and after the known modification time
  1127. # of the file to ensure that the right time zone is being used
  1128. # when parsing If-Modified-Since.
  1129. stat = os.stat(relpath("static/robots.txt"))
  1130. response = self.get_and_head(
  1131. "/static/robots.txt",
  1132. headers={"If-Modified-Since": format_timestamp(stat.st_mtime - 1)},
  1133. )
  1134. self.assertEqual(response.code, 200)
  1135. response = self.get_and_head(
  1136. "/static/robots.txt",
  1137. headers={"If-Modified-Since": format_timestamp(stat.st_mtime + 1)},
  1138. )
  1139. self.assertEqual(response.code, 304)
  1140. def test_static_etag(self):
  1141. response = self.get_and_head("/static/robots.txt")
  1142. self.assertEqual(
  1143. utf8(response.headers.get("Etag")), b'"' + self.robots_txt_hash + b'"'
  1144. )
  1145. def test_static_with_range(self):
  1146. response = self.get_and_head(
  1147. "/static/robots.txt", headers={"Range": "bytes=0-9"}
  1148. )
  1149. self.assertEqual(response.code, 206)
  1150. self.assertEqual(response.body, b"User-agent")
  1151. self.assertEqual(
  1152. utf8(response.headers.get("Etag")), b'"' + self.robots_txt_hash + b'"'
  1153. )
  1154. self.assertEqual(response.headers.get("Content-Length"), "10")
  1155. self.assertEqual(response.headers.get("Content-Range"), "bytes 0-9/26")
  1156. def test_static_with_range_full_file(self):
  1157. response = self.get_and_head(
  1158. "/static/robots.txt", headers={"Range": "bytes=0-"}
  1159. )
  1160. # Note: Chrome refuses to play audio if it gets an HTTP 206 in response
  1161. # to ``Range: bytes=0-`` :(
  1162. self.assertEqual(response.code, 200)
  1163. robots_file_path = os.path.join(self.static_dir, "robots.txt")
  1164. with open(robots_file_path, encoding="utf-8") as f:
  1165. self.assertEqual(response.body, utf8(f.read()))
  1166. self.assertEqual(response.headers.get("Content-Length"), "26")
  1167. self.assertIsNone(response.headers.get("Content-Range"))
  1168. def test_static_with_range_full_past_end(self):
  1169. response = self.get_and_head(
  1170. "/static/robots.txt", headers={"Range": "bytes=0-10000000"}
  1171. )
  1172. self.assertEqual(response.code, 200)
  1173. robots_file_path = os.path.join(self.static_dir, "robots.txt")
  1174. with open(robots_file_path, encoding="utf-8") as f:
  1175. self.assertEqual(response.body, utf8(f.read()))
  1176. self.assertEqual(response.headers.get("Content-Length"), "26")
  1177. self.assertIsNone(response.headers.get("Content-Range"))
  1178. def test_static_with_range_partial_past_end(self):
  1179. response = self.get_and_head(
  1180. "/static/robots.txt", headers={"Range": "bytes=1-10000000"}
  1181. )
  1182. self.assertEqual(response.code, 206)
  1183. robots_file_path = os.path.join(self.static_dir, "robots.txt")
  1184. with open(robots_file_path, encoding="utf-8") as f:
  1185. self.assertEqual(response.body, utf8(f.read()[1:]))
  1186. self.assertEqual(response.headers.get("Content-Length"), "25")
  1187. self.assertEqual(response.headers.get("Content-Range"), "bytes 1-25/26")
  1188. def test_static_with_range_end_edge(self):
  1189. response = self.get_and_head(
  1190. "/static/robots.txt", headers={"Range": "bytes=22-"}
  1191. )
  1192. self.assertEqual(response.body, b": /\n")
  1193. self.assertEqual(response.headers.get("Content-Length"), "4")
  1194. self.assertEqual(response.headers.get("Content-Range"), "bytes 22-25/26")
  1195. def test_static_with_range_neg_end(self):
  1196. response = self.get_and_head(
  1197. "/static/robots.txt", headers={"Range": "bytes=-4"}
  1198. )
  1199. self.assertEqual(response.body, b": /\n")
  1200. self.assertEqual(response.headers.get("Content-Length"), "4")
  1201. self.assertEqual(response.headers.get("Content-Range"), "bytes 22-25/26")
  1202. def test_static_with_range_neg_past_start(self):
  1203. response = self.get_and_head(
  1204. "/static/robots.txt", headers={"Range": "bytes=-1000000"}
  1205. )
  1206. self.assertEqual(response.code, 200)
  1207. robots_file_path = os.path.join(self.static_dir, "robots.txt")
  1208. with open(robots_file_path, encoding="utf-8") as f:
  1209. self.assertEqual(response.body, utf8(f.read()))
  1210. self.assertEqual(response.headers.get("Content-Length"), "26")
  1211. self.assertIsNone(response.headers.get("Content-Range"))
  1212. def test_static_invalid_range(self):
  1213. response = self.get_and_head("/static/robots.txt", headers={"Range": "asdf"})
  1214. self.assertEqual(response.code, 200)
  1215. def test_static_unsatisfiable_range_zero_suffix(self):
  1216. response = self.get_and_head(
  1217. "/static/robots.txt", headers={"Range": "bytes=-0"}
  1218. )
  1219. self.assertEqual(response.headers.get("Content-Range"), "bytes */26")
  1220. self.assertEqual(response.code, 416)
  1221. def test_static_unsatisfiable_range_invalid_start(self):
  1222. response = self.get_and_head(
  1223. "/static/robots.txt", headers={"Range": "bytes=26"}
  1224. )
  1225. self.assertEqual(response.code, 416)
  1226. self.assertEqual(response.headers.get("Content-Range"), "bytes */26")
  1227. def test_static_unsatisfiable_range_end_less_than_start(self):
  1228. response = self.get_and_head(
  1229. "/static/robots.txt", headers={"Range": "bytes=10-3"}
  1230. )
  1231. self.assertEqual(response.code, 416)
  1232. self.assertEqual(response.headers.get("Content-Range"), "bytes */26")
  1233. def test_static_head(self):
  1234. response = self.fetch("/static/robots.txt", method="HEAD")
  1235. self.assertEqual(response.code, 200)
  1236. # No body was returned, but we did get the right content length.
  1237. self.assertEqual(response.body, b"")
  1238. self.assertEqual(response.headers["Content-Length"], "26")
  1239. self.assertEqual(
  1240. utf8(response.headers["Etag"]), b'"' + self.robots_txt_hash + b'"'
  1241. )
  1242. def test_static_head_range(self):
  1243. response = self.fetch(
  1244. "/static/robots.txt", method="HEAD", headers={"Range": "bytes=1-4"}
  1245. )
  1246. self.assertEqual(response.code, 206)
  1247. self.assertEqual(response.body, b"")
  1248. self.assertEqual(response.headers["Content-Length"], "4")
  1249. self.assertEqual(
  1250. utf8(response.headers["Etag"]), b'"' + self.robots_txt_hash + b'"'
  1251. )
  1252. def test_static_range_if_none_match(self):
  1253. response = self.get_and_head(
  1254. "/static/robots.txt",
  1255. headers={
  1256. "Range": "bytes=1-4",
  1257. "If-None-Match": b'"' + self.robots_txt_hash + b'"',
  1258. },
  1259. )
  1260. self.assertEqual(response.code, 304)
  1261. self.assertEqual(response.body, b"")
  1262. self.assertNotIn("Content-Length", response.headers)
  1263. self.assertEqual(
  1264. utf8(response.headers["Etag"]), b'"' + self.robots_txt_hash + b'"'
  1265. )
  1266. def test_static_404(self):
  1267. response = self.get_and_head("/static/blarg")
  1268. self.assertEqual(response.code, 404)
  1269. def test_path_traversal_protection(self):
  1270. # curl_httpclient processes ".." on the client side, so we
  1271. # must test this with simple_httpclient.
  1272. self.http_client.close()
  1273. self.http_client = SimpleAsyncHTTPClient()
  1274. with ExpectLog(gen_log, ".*not in root static directory"):
  1275. response = self.get_and_head("/static/../static_foo.txt")
  1276. # Attempted path traversal should result in 403, not 200
  1277. # (which means the check failed and the file was served)
  1278. # or 404 (which means that the file didn't exist and
  1279. # is probably a packaging error).
  1280. self.assertEqual(response.code, 403)
  1281. @unittest.skipIf(os.name != "posix", "non-posix OS")
  1282. def test_root_static_path(self):
  1283. # Sometimes people set the StaticFileHandler's path to '/'
  1284. # to disable Tornado's path validation (in conjunction with
  1285. # their own validation in get_absolute_path). Make sure
  1286. # that the stricter validation in 4.2.1 doesn't break them.
  1287. path = os.path.join(
  1288. os.path.dirname(os.path.abspath(__file__)), "static/robots.txt"
  1289. )
  1290. response = self.get_and_head("/root_static" + urllib.parse.quote(path))
  1291. self.assertEqual(response.code, 200)
  1292. class StaticDefaultFilenameTest(WebTestCase):
  1293. def get_app_kwargs(self):
  1294. return dict(
  1295. static_path=relpath("static"),
  1296. static_handler_args=dict(default_filename="index.html"),
  1297. )
  1298. def get_handlers(self):
  1299. return []
  1300. def test_static_default_filename(self):
  1301. response = self.fetch("/static/dir/", follow_redirects=False)
  1302. self.assertEqual(response.code, 200)
  1303. self.assertEqual(b"this is the index\n", response.body)
  1304. def test_static_default_redirect(self):
  1305. response = self.fetch("/static/dir", follow_redirects=False)
  1306. self.assertEqual(response.code, 301)
  1307. self.assertTrue(response.headers["Location"].endswith("/static/dir/"))
  1308. class StaticDefaultFilenameRootTest(WebTestCase):
  1309. def get_app_kwargs(self):
  1310. return dict(
  1311. static_path=os.path.abspath(relpath("static")),
  1312. static_handler_args=dict(default_filename="index.html"),
  1313. static_url_prefix="/",
  1314. )
  1315. def get_handlers(self):
  1316. return []
  1317. def get_http_client(self):
  1318. # simple_httpclient only: curl doesn't let you send a request starting
  1319. # with two slashes.
  1320. return SimpleAsyncHTTPClient()
  1321. def test_no_open_redirect(self):
  1322. # This test verifies that the open redirect that affected some configurations
  1323. # prior to Tornado 6.3.2 is no longer possible. The vulnerability required
  1324. # a static_url_prefix of "/" and a default_filename (any value) to be set.
  1325. # The absolute* server-side path to the static directory must also be known.
  1326. #
  1327. # * Almost absolute: On windows, the drive letter is stripped from the path.
  1328. test_dir = os.path.dirname(__file__)
  1329. drive, tail = os.path.splitdrive(test_dir)
  1330. if os.name == "posix":
  1331. self.assertEqual(tail, test_dir)
  1332. else:
  1333. test_dir = tail
  1334. with ExpectLog(gen_log, ".*cannot redirect path with two initial slashes"):
  1335. response = self.fetch(
  1336. f"//evil.com/../{test_dir}/static/dir",
  1337. follow_redirects=False,
  1338. )
  1339. self.assertEqual(response.code, 403)
  1340. class StaticFileWithPathTest(WebTestCase):
  1341. def get_app_kwargs(self):
  1342. return dict(
  1343. static_path=relpath("static"),
  1344. static_handler_args=dict(default_filename="index.html"),
  1345. )
  1346. def get_handlers(self):
  1347. return [("/foo/(.*)", StaticFileHandler, {"path": relpath("templates/")})]
  1348. def test_serve(self):
  1349. response = self.fetch("/foo/utf8.html")
  1350. self.assertEqual(response.body, b"H\xc3\xa9llo\n")
  1351. class CustomStaticFileTest(WebTestCase):
  1352. def get_handlers(self):
  1353. class MyStaticFileHandler(StaticFileHandler):
  1354. @classmethod
  1355. def make_static_url(cls, settings, path):
  1356. version_hash = cls.get_version(settings, path)
  1357. extension_index = path.rindex(".")
  1358. before_version = path[:extension_index]
  1359. after_version = path[(extension_index + 1) :]
  1360. return "/static/{}.{}.{}".format(
  1361. before_version,
  1362. version_hash,
  1363. after_version,
  1364. )
  1365. def parse_url_path(self, url_path):
  1366. extension_index = url_path.rindex(".")
  1367. version_index = url_path.rindex(".", 0, extension_index)
  1368. return f"{url_path[:version_index]}{url_path[extension_index:]}"
  1369. @classmethod
  1370. def get_absolute_path(cls, settings, path):
  1371. return "CustomStaticFileTest:" + path
  1372. def validate_absolute_path(self, root, absolute_path):
  1373. return absolute_path
  1374. @classmethod
  1375. def get_content(self, path, start=None, end=None):
  1376. assert start is None and end is None
  1377. if path == "CustomStaticFileTest:foo.txt":
  1378. return b"bar"
  1379. raise Exception("unexpected path %r" % path)
  1380. def get_content_size(self):
  1381. if self.absolute_path == "CustomStaticFileTest:foo.txt":
  1382. return 3
  1383. raise Exception("unexpected path %r" % self.absolute_path)
  1384. def get_modified_time(self):
  1385. return None
  1386. @classmethod
  1387. def get_version(cls, settings, path):
  1388. return "42"
  1389. class StaticUrlHandler(RequestHandler):
  1390. def get(self, path):
  1391. self.write(self.static_url(path))
  1392. self.static_handler_class = MyStaticFileHandler
  1393. return [("/static_url/(.*)", StaticUrlHandler)]
  1394. def get_app_kwargs(self):
  1395. return dict(static_path="dummy", static_handler_class=self.static_handler_class)
  1396. def test_serve(self):
  1397. response = self.fetch("/static/foo.42.txt")
  1398. self.assertEqual(response.body, b"bar")
  1399. def test_static_url(self):
  1400. with ExpectLog(gen_log, "Could not open static file", required=False):
  1401. response = self.fetch("/static_url/foo.txt")
  1402. self.assertEqual(response.body, b"/static/foo.42.txt")
  1403. class HostMatchingTest(WebTestCase):
  1404. class Handler(RequestHandler):
  1405. def initialize(self, reply):
  1406. self.reply = reply
  1407. def get(self):
  1408. self.write(self.reply)
  1409. def get_handlers(self):
  1410. return [("/foo", HostMatchingTest.Handler, {"reply": "wildcard"})]
  1411. def test_host_matching(self):
  1412. self.app.add_handlers(
  1413. "www.example.com", [("/foo", HostMatchingTest.Handler, {"reply": "[0]"})]
  1414. )
  1415. self.app.add_handlers(
  1416. r"www\.example\.com", [("/bar", HostMatchingTest.Handler, {"reply": "[1]"})]
  1417. )
  1418. self.app.add_handlers(
  1419. "www.example.com", [("/baz", HostMatchingTest.Handler, {"reply": "[2]"})]
  1420. )
  1421. self.app.add_handlers(
  1422. "www.e.*e.com", [("/baz", HostMatchingTest.Handler, {"reply": "[3]"})]
  1423. )
  1424. response = self.fetch("/foo")
  1425. self.assertEqual(response.body, b"wildcard")
  1426. response = self.fetch("/bar")
  1427. self.assertEqual(response.code, 404)
  1428. response = self.fetch("/baz")
  1429. self.assertEqual(response.code, 404)
  1430. response = self.fetch("/foo", headers={"Host": "www.example.com"})
  1431. self.assertEqual(response.body, b"[0]")
  1432. response = self.fetch("/bar", headers={"Host": "www.example.com"})
  1433. self.assertEqual(response.body, b"[1]")
  1434. response = self.fetch("/baz", headers={"Host": "www.example.com"})
  1435. self.assertEqual(response.body, b"[2]")
  1436. response = self.fetch("/baz", headers={"Host": "www.exe.com"})
  1437. self.assertEqual(response.body, b"[3]")
  1438. class DefaultHostMatchingTest(WebTestCase):
  1439. def get_handlers(self):
  1440. return []
  1441. def get_app_kwargs(self):
  1442. return {"default_host": "www.example.com"}
  1443. def test_default_host_matching(self):
  1444. self.app.add_handlers(
  1445. "www.example.com", [("/foo", HostMatchingTest.Handler, {"reply": "[0]"})]
  1446. )
  1447. self.app.add_handlers(
  1448. r"www\.example\.com", [("/bar", HostMatchingTest.Handler, {"reply": "[1]"})]
  1449. )
  1450. self.app.add_handlers(
  1451. "www.test.com", [("/baz", HostMatchingTest.Handler, {"reply": "[2]"})]
  1452. )
  1453. response = self.fetch("/foo")
  1454. self.assertEqual(response.body, b"[0]")
  1455. response = self.fetch("/bar")
  1456. self.assertEqual(response.body, b"[1]")
  1457. response = self.fetch("/baz")
  1458. self.assertEqual(response.code, 404)
  1459. response = self.fetch("/foo", headers={"X-Real-Ip": "127.0.0.1"})
  1460. self.assertEqual(response.code, 404)
  1461. self.app.default_host = "www.test.com"
  1462. response = self.fetch("/baz")
  1463. self.assertEqual(response.body, b"[2]")
  1464. class NamedURLSpecGroupsTest(WebTestCase):
  1465. def get_handlers(self):
  1466. class EchoHandler(RequestHandler):
  1467. def get(self, path):
  1468. self.write(path)
  1469. return [
  1470. ("/str/(?P<path>.*)", EchoHandler),
  1471. ("/unicode/(?P<path>.*)", EchoHandler),
  1472. ]
  1473. def test_named_urlspec_groups(self):
  1474. response = self.fetch("/str/foo")
  1475. self.assertEqual(response.body, b"foo")
  1476. response = self.fetch("/unicode/bar")
  1477. self.assertEqual(response.body, b"bar")
  1478. class ClearHeaderTest(SimpleHandlerTestCase):
  1479. class Handler(RequestHandler):
  1480. def get(self):
  1481. self.set_header("h1", "foo")
  1482. self.set_header("h2", "bar")
  1483. self.clear_header("h1")
  1484. self.clear_header("nonexistent")
  1485. def test_clear_header(self):
  1486. response = self.fetch("/")
  1487. self.assertNotIn("h1", response.headers)
  1488. self.assertEqual(response.headers["h2"], "bar")
  1489. class Header204Test(SimpleHandlerTestCase):
  1490. class Handler(RequestHandler):
  1491. def get(self):
  1492. self.set_status(204)
  1493. self.finish()
  1494. def test_204_headers(self):
  1495. response = self.fetch("/")
  1496. self.assertEqual(response.code, 204)
  1497. self.assertNotIn("Content-Length", response.headers)
  1498. self.assertNotIn("Transfer-Encoding", response.headers)
  1499. class Header304Test(SimpleHandlerTestCase):
  1500. class Handler(RequestHandler):
  1501. def get(self):
  1502. self.set_header("Content-Language", "en_US")
  1503. self.write("hello")
  1504. def test_304_headers(self):
  1505. response1 = self.fetch("/")
  1506. self.assertEqual(response1.headers["Content-Length"], "5")
  1507. self.assertEqual(response1.headers["Content-Language"], "en_US")
  1508. response2 = self.fetch(
  1509. "/", headers={"If-None-Match": response1.headers["Etag"]}
  1510. )
  1511. self.assertEqual(response2.code, 304)
  1512. self.assertNotIn("Content-Length", response2.headers)
  1513. self.assertNotIn("Content-Language", response2.headers)
  1514. # Not an entity header, but should not be added to 304s by chunking
  1515. self.assertNotIn("Transfer-Encoding", response2.headers)
  1516. class StatusReasonTest(SimpleHandlerTestCase):
  1517. class Handler(RequestHandler):
  1518. def get(self):
  1519. reason = self.request.arguments.get("reason", [])
  1520. raise HTTPError(
  1521. int(self.get_argument("code")),
  1522. reason=to_unicode(reason[0]) if reason else None,
  1523. )
  1524. def get_http_client(self):
  1525. # simple_httpclient only: curl doesn't expose the reason string
  1526. return SimpleAsyncHTTPClient()
  1527. def test_status(self):
  1528. response = self.fetch("/?code=304")
  1529. self.assertEqual(response.code, 304)
  1530. self.assertEqual(response.reason, "Not Modified")
  1531. response = self.fetch("/?code=304&reason=Foo")
  1532. self.assertEqual(response.code, 304)
  1533. self.assertEqual(response.reason, "Foo")
  1534. response = self.fetch("/?code=682&reason=Bar")
  1535. self.assertEqual(response.code, 682)
  1536. self.assertEqual(response.reason, "Bar")
  1537. response = self.fetch("/?code=682")
  1538. self.assertEqual(response.code, 682)
  1539. self.assertEqual(response.reason, "Unknown")
  1540. def test_header_injection(self):
  1541. response = self.fetch("/?code=200&reason=OK%0D%0AX-Injection:injected")
  1542. self.assertEqual(response.code, 200)
  1543. self.assertEqual(response.reason, "Unknown")
  1544. self.assertNotIn("X-Injection", response.headers)
  1545. def test_reason_xss(self):
  1546. response = self.fetch("/?code=400&reason=<script>alert(1)</script>")
  1547. self.assertEqual(response.code, 400)
  1548. self.assertEqual(response.reason, "Unknown")
  1549. self.assertNotIn(b"script", response.body)
  1550. self.assertIn(b"Unknown", response.body)
  1551. class DateHeaderTest(SimpleHandlerTestCase):
  1552. class Handler(RequestHandler):
  1553. def get(self):
  1554. self.write("hello")
  1555. def test_date_header(self):
  1556. response = self.fetch("/")
  1557. header_date = email.utils.parsedate_to_datetime(response.headers["Date"])
  1558. self.assertLess(
  1559. header_date - datetime.datetime.now(datetime.timezone.utc),
  1560. datetime.timedelta(seconds=2),
  1561. )
  1562. class RaiseWithReasonTest(SimpleHandlerTestCase):
  1563. class Handler(RequestHandler):
  1564. def get(self):
  1565. raise HTTPError(682, reason="Foo")
  1566. def get_http_client(self):
  1567. # simple_httpclient only: curl doesn't expose the reason string
  1568. return SimpleAsyncHTTPClient()
  1569. def test_raise_with_reason(self):
  1570. response = self.fetch("/")
  1571. self.assertEqual(response.code, 682)
  1572. self.assertEqual(response.reason, "Foo")
  1573. self.assertIn(b"682: Foo", response.body)
  1574. def test_httperror_str(self):
  1575. self.assertEqual(str(HTTPError(682, reason="Foo")), "HTTP 682: Foo")
  1576. def test_httperror_str_from_httputil(self):
  1577. self.assertEqual(str(HTTPError(682)), "HTTP 682: Unknown")
  1578. class ErrorHandlerXSRFTest(WebTestCase):
  1579. def get_handlers(self):
  1580. # note that if the handlers list is empty we get the default_host
  1581. # redirect fallback instead of a 404, so test with both an
  1582. # explicitly defined error handler and an implicit 404.
  1583. return [("/error", ErrorHandler, dict(status_code=417))]
  1584. def get_app_kwargs(self):
  1585. return dict(xsrf_cookies=True)
  1586. def test_error_xsrf(self):
  1587. response = self.fetch("/error", method="POST", body="")
  1588. self.assertEqual(response.code, 417)
  1589. def test_404_xsrf(self):
  1590. response = self.fetch("/404", method="POST", body="")
  1591. self.assertEqual(response.code, 404)
  1592. class GzipTestCase(SimpleHandlerTestCase):
  1593. class Handler(RequestHandler):
  1594. def get(self):
  1595. for v in self.get_arguments("vary"):
  1596. self.add_header("Vary", v)
  1597. # Must write at least MIN_LENGTH bytes to activate compression.
  1598. self.write("hello world" + ("!" * GZipContentEncoding.MIN_LENGTH))
  1599. def get_app_kwargs(self):
  1600. return dict(
  1601. gzip=True, static_path=os.path.join(os.path.dirname(__file__), "static")
  1602. )
  1603. def assert_compressed(self, response):
  1604. # simple_httpclient renames the content-encoding header;
  1605. # curl_httpclient doesn't.
  1606. self.assertEqual(
  1607. response.headers.get(
  1608. "Content-Encoding", response.headers.get("X-Consumed-Content-Encoding")
  1609. ),
  1610. "gzip",
  1611. )
  1612. def test_gzip(self):
  1613. response = self.fetch("/")
  1614. self.assert_compressed(response)
  1615. self.assertEqual(response.headers["Vary"], "Accept-Encoding")
  1616. def test_gzip_static(self):
  1617. # The streaming responses in StaticFileHandler have subtle
  1618. # interactions with the gzip output so test this case separately.
  1619. response = self.fetch("/robots.txt")
  1620. self.assert_compressed(response)
  1621. self.assertEqual(response.headers["Vary"], "Accept-Encoding")
  1622. def test_gzip_not_requested(self):
  1623. response = self.fetch("/", use_gzip=False)
  1624. self.assertNotIn("Content-Encoding", response.headers)
  1625. self.assertEqual(response.headers["Vary"], "Accept-Encoding")
  1626. def test_vary_already_present(self):
  1627. response = self.fetch("/?vary=Accept-Language")
  1628. self.assert_compressed(response)
  1629. self.assertEqual(
  1630. [s.strip() for s in response.headers["Vary"].split(",")],
  1631. ["Accept-Language", "Accept-Encoding"],
  1632. )
  1633. def test_vary_already_present_multiple(self):
  1634. # Regression test for https://github.com/tornadoweb/tornado/issues/1670
  1635. response = self.fetch("/?vary=Accept-Language&vary=Cookie")
  1636. self.assert_compressed(response)
  1637. self.assertEqual(
  1638. [s.strip() for s in response.headers["Vary"].split(",")],
  1639. ["Accept-Language", "Cookie", "Accept-Encoding"],
  1640. )
  1641. class PathArgsInPrepareTest(WebTestCase):
  1642. class Handler(RequestHandler):
  1643. def prepare(self):
  1644. self.write(dict(args=self.path_args, kwargs=self.path_kwargs))
  1645. def get(self, path):
  1646. assert path == "foo"
  1647. self.finish()
  1648. def get_handlers(self):
  1649. return [("/pos/(.*)", self.Handler), ("/kw/(?P<path>.*)", self.Handler)]
  1650. def test_pos(self):
  1651. response = self.fetch("/pos/foo")
  1652. response.rethrow()
  1653. data = json_decode(response.body)
  1654. self.assertEqual(data, {"args": ["foo"], "kwargs": {}})
  1655. def test_kw(self):
  1656. response = self.fetch("/kw/foo")
  1657. response.rethrow()
  1658. data = json_decode(response.body)
  1659. self.assertEqual(data, {"args": [], "kwargs": {"path": "foo"}})
  1660. class ClearAllCookiesTest(SimpleHandlerTestCase):
  1661. class Handler(RequestHandler):
  1662. def get(self):
  1663. self.clear_all_cookies()
  1664. self.write("ok")
  1665. def test_clear_all_cookies(self):
  1666. response = self.fetch("/", headers={"Cookie": "foo=bar; baz=xyzzy"})
  1667. set_cookies = sorted(response.headers.get_list("Set-Cookie"))
  1668. # Python 3.5 sends 'baz="";'; older versions use 'baz=;'
  1669. self.assertTrue(set_cookies[0].startswith('baz="";'))
  1670. self.assertTrue(set_cookies[1].startswith('foo="";'))
  1671. class PermissionError(Exception):
  1672. pass
  1673. class ExceptionHandlerTest(SimpleHandlerTestCase):
  1674. class Handler(RequestHandler):
  1675. def get(self):
  1676. exc = self.get_argument("exc")
  1677. if exc == "http":
  1678. raise HTTPError(410, "no longer here")
  1679. elif exc == "zero":
  1680. 1 / 0
  1681. elif exc == "permission":
  1682. raise PermissionError("not allowed")
  1683. def write_error(self, status_code, **kwargs):
  1684. if "exc_info" in kwargs:
  1685. typ, value, tb = kwargs["exc_info"]
  1686. if isinstance(value, PermissionError):
  1687. self.set_status(403)
  1688. self.write("PermissionError")
  1689. return
  1690. RequestHandler.write_error(self, status_code, **kwargs)
  1691. def log_exception(self, typ, value, tb):
  1692. if isinstance(value, PermissionError):
  1693. app_log.warning("custom logging for PermissionError: %s", value.args[0])
  1694. else:
  1695. RequestHandler.log_exception(self, typ, value, tb)
  1696. def test_http_error(self):
  1697. # HTTPErrors are logged as warnings with no stack trace.
  1698. # TODO: extend ExpectLog to test this more precisely
  1699. with ExpectLog(gen_log, ".*no longer here"):
  1700. response = self.fetch("/?exc=http")
  1701. self.assertEqual(response.code, 410)
  1702. def test_unknown_error(self):
  1703. # Unknown errors are logged as errors with a stack trace.
  1704. with ExpectLog(app_log, "Uncaught exception"):
  1705. response = self.fetch("/?exc=zero")
  1706. self.assertEqual(response.code, 500)
  1707. def test_known_error(self):
  1708. # log_exception can override logging behavior, and write_error
  1709. # can override the response.
  1710. with ExpectLog(app_log, "custom logging for PermissionError: not allowed"):
  1711. response = self.fetch("/?exc=permission")
  1712. self.assertEqual(response.code, 403)
  1713. class BuggyLoggingTest(SimpleHandlerTestCase):
  1714. class Handler(RequestHandler):
  1715. def get(self):
  1716. 1 / 0
  1717. def log_exception(self, typ, value, tb):
  1718. 1 / 0
  1719. def test_buggy_log_exception(self):
  1720. # Something gets logged even though the application's
  1721. # logger is broken.
  1722. with ExpectLog(app_log, ".*"):
  1723. self.fetch("/")
  1724. class UIMethodUIModuleTest(SimpleHandlerTestCase):
  1725. """Test that UI methods and modules are created correctly and
  1726. associated with the handler.
  1727. """
  1728. class Handler(RequestHandler):
  1729. def get(self):
  1730. self.render("foo.html")
  1731. def value(self):
  1732. return self.get_argument("value")
  1733. def get_app_kwargs(self):
  1734. def my_ui_method(handler, x):
  1735. return f"In my_ui_method({x}) with handler value {handler.value()}."
  1736. class MyModule(UIModule):
  1737. def render(self, x):
  1738. return "In MyModule({}) with handler value {}.".format(
  1739. x,
  1740. typing.cast(UIMethodUIModuleTest.Handler, self.handler).value(),
  1741. )
  1742. loader = DictLoader(
  1743. {"foo.html": "{{ my_ui_method(42) }} {% module MyModule(123) %}"}
  1744. )
  1745. return dict(
  1746. template_loader=loader,
  1747. ui_methods={"my_ui_method": my_ui_method},
  1748. ui_modules={"MyModule": MyModule},
  1749. )
  1750. def tearDown(self):
  1751. super().tearDown()
  1752. # TODO: fix template loader caching so this isn't necessary.
  1753. RequestHandler._template_loaders.clear()
  1754. def test_ui_method(self):
  1755. response = self.fetch("/?value=asdf")
  1756. self.assertEqual(
  1757. response.body,
  1758. b"In my_ui_method(42) with handler value asdf. "
  1759. b"In MyModule(123) with handler value asdf.",
  1760. )
  1761. class GetArgumentErrorTest(SimpleHandlerTestCase):
  1762. class Handler(RequestHandler):
  1763. def get(self):
  1764. try:
  1765. self.get_argument("foo")
  1766. self.write({})
  1767. except MissingArgumentError as e:
  1768. self.write({"arg_name": e.arg_name, "log_message": e.log_message})
  1769. def test_catch_error(self):
  1770. response = self.fetch("/")
  1771. self.assertEqual(
  1772. json_decode(response.body),
  1773. {"arg_name": "foo", "log_message": "Missing argument foo"},
  1774. )
  1775. class SetLazyPropertiesTest(SimpleHandlerTestCase):
  1776. class Handler(RequestHandler):
  1777. def prepare(self):
  1778. self.current_user = "Ben"
  1779. self.locale = locale.get("en_US")
  1780. def get_user_locale(self):
  1781. raise NotImplementedError()
  1782. def get_current_user(self):
  1783. raise NotImplementedError()
  1784. def get(self):
  1785. self.write(f"Hello {self.current_user} ({self.locale.code})")
  1786. def test_set_properties(self):
  1787. # Ensure that current_user can be assigned to normally for apps
  1788. # that want to forgo the lazy get_current_user property
  1789. response = self.fetch("/")
  1790. self.assertEqual(response.body, b"Hello Ben (en_US)")
  1791. class GetCurrentUserTest(WebTestCase):
  1792. def get_app_kwargs(self):
  1793. class WithoutUserModule(UIModule):
  1794. def render(self):
  1795. return ""
  1796. class WithUserModule(UIModule):
  1797. def render(self):
  1798. return str(self.current_user)
  1799. loader = DictLoader(
  1800. {
  1801. "without_user.html": "",
  1802. "with_user.html": "{{ current_user }}",
  1803. "without_user_module.html": "{% module WithoutUserModule() %}",
  1804. "with_user_module.html": "{% module WithUserModule() %}",
  1805. }
  1806. )
  1807. return dict(
  1808. template_loader=loader,
  1809. ui_modules={
  1810. "WithUserModule": WithUserModule,
  1811. "WithoutUserModule": WithoutUserModule,
  1812. },
  1813. )
  1814. def tearDown(self):
  1815. super().tearDown()
  1816. RequestHandler._template_loaders.clear()
  1817. def get_handlers(self):
  1818. class CurrentUserHandler(RequestHandler):
  1819. def prepare(self):
  1820. self.has_loaded_current_user = False
  1821. def get_current_user(self):
  1822. self.has_loaded_current_user = True
  1823. return ""
  1824. class WithoutUserHandler(CurrentUserHandler):
  1825. def get(self):
  1826. self.render_string("without_user.html")
  1827. self.finish(str(self.has_loaded_current_user))
  1828. class WithUserHandler(CurrentUserHandler):
  1829. def get(self):
  1830. self.render_string("with_user.html")
  1831. self.finish(str(self.has_loaded_current_user))
  1832. class CurrentUserModuleHandler(CurrentUserHandler):
  1833. def get_template_namespace(self):
  1834. # If RequestHandler.get_template_namespace is called, then
  1835. # get_current_user is evaluated. Until #820 is fixed, this
  1836. # is a small hack to circumvent the issue.
  1837. return self.ui
  1838. class WithoutUserModuleHandler(CurrentUserModuleHandler):
  1839. def get(self):
  1840. self.render_string("without_user_module.html")
  1841. self.finish(str(self.has_loaded_current_user))
  1842. class WithUserModuleHandler(CurrentUserModuleHandler):
  1843. def get(self):
  1844. self.render_string("with_user_module.html")
  1845. self.finish(str(self.has_loaded_current_user))
  1846. return [
  1847. ("/without_user", WithoutUserHandler),
  1848. ("/with_user", WithUserHandler),
  1849. ("/without_user_module", WithoutUserModuleHandler),
  1850. ("/with_user_module", WithUserModuleHandler),
  1851. ]
  1852. @unittest.skip("needs fix")
  1853. def test_get_current_user_is_lazy(self):
  1854. # TODO: Make this test pass. See #820.
  1855. response = self.fetch("/without_user")
  1856. self.assertEqual(response.body, b"False")
  1857. def test_get_current_user_works(self):
  1858. response = self.fetch("/with_user")
  1859. self.assertEqual(response.body, b"True")
  1860. def test_get_current_user_from_ui_module_is_lazy(self):
  1861. response = self.fetch("/without_user_module")
  1862. self.assertEqual(response.body, b"False")
  1863. def test_get_current_user_from_ui_module_works(self):
  1864. response = self.fetch("/with_user_module")
  1865. self.assertEqual(response.body, b"True")
  1866. class UnimplementedHTTPMethodsTest(SimpleHandlerTestCase):
  1867. class Handler(RequestHandler):
  1868. pass
  1869. def test_unimplemented_standard_methods(self):
  1870. for method in ["HEAD", "GET", "DELETE", "OPTIONS"]:
  1871. response = self.fetch("/", method=method)
  1872. self.assertEqual(response.code, 405)
  1873. for method in ["POST", "PUT"]:
  1874. response = self.fetch("/", method=method, body=b"")
  1875. self.assertEqual(response.code, 405)
  1876. class UnimplementedNonStandardMethodsTest(SimpleHandlerTestCase):
  1877. class Handler(RequestHandler):
  1878. def other(self):
  1879. # Even though this method exists, it won't get called automatically
  1880. # because it is not in SUPPORTED_METHODS.
  1881. self.write("other")
  1882. def test_unimplemented_patch(self):
  1883. # PATCH is recently standardized; Tornado supports it by default
  1884. # but wsgiref.validate doesn't like it.
  1885. response = self.fetch("/", method="PATCH", body=b"")
  1886. self.assertEqual(response.code, 405)
  1887. def test_unimplemented_other(self):
  1888. response = self.fetch("/", method="OTHER", allow_nonstandard_methods=True)
  1889. self.assertEqual(response.code, 405)
  1890. class AllHTTPMethodsTest(SimpleHandlerTestCase):
  1891. class Handler(RequestHandler):
  1892. def method(self):
  1893. assert self.request.method is not None
  1894. self.write(self.request.method)
  1895. get = delete = options = post = put = method # type: ignore
  1896. def test_standard_methods(self):
  1897. response = self.fetch("/", method="HEAD")
  1898. self.assertEqual(response.body, b"")
  1899. for method in ["GET", "DELETE", "OPTIONS"]:
  1900. response = self.fetch("/", method=method)
  1901. self.assertEqual(response.body, utf8(method))
  1902. for method in ["POST", "PUT"]:
  1903. response = self.fetch("/", method=method, body=b"")
  1904. self.assertEqual(response.body, utf8(method))
  1905. class PatchMethodTest(SimpleHandlerTestCase):
  1906. class Handler(RequestHandler):
  1907. SUPPORTED_METHODS = RequestHandler.SUPPORTED_METHODS + ( # type: ignore
  1908. "OTHER",
  1909. )
  1910. def patch(self):
  1911. self.write("patch")
  1912. def other(self):
  1913. self.write("other")
  1914. def test_patch(self):
  1915. response = self.fetch("/", method="PATCH", body=b"")
  1916. self.assertEqual(response.body, b"patch")
  1917. def test_other(self):
  1918. response = self.fetch("/", method="OTHER", allow_nonstandard_methods=True)
  1919. self.assertEqual(response.body, b"other")
  1920. class FinishInPrepareTest(SimpleHandlerTestCase):
  1921. class Handler(RequestHandler):
  1922. def prepare(self):
  1923. self.finish("done")
  1924. def get(self):
  1925. # It's difficult to assert for certain that a method did not
  1926. # or will not be called in an asynchronous context, but this
  1927. # will be logged noisily if it is reached.
  1928. raise Exception("should not reach this method")
  1929. def test_finish_in_prepare(self):
  1930. response = self.fetch("/")
  1931. self.assertEqual(response.body, b"done")
  1932. class Default404Test(WebTestCase):
  1933. def get_handlers(self):
  1934. # If there are no handlers at all a default redirect handler gets added.
  1935. return [("/foo", RequestHandler)]
  1936. def test_404(self):
  1937. response = self.fetch("/")
  1938. self.assertEqual(response.code, 404)
  1939. self.assertEqual(
  1940. response.body,
  1941. b"<html><title>404: Not Found</title>"
  1942. b"<body>404: Not Found</body></html>",
  1943. )
  1944. class Custom404Test(WebTestCase):
  1945. def get_handlers(self):
  1946. return [("/foo", RequestHandler)]
  1947. def get_app_kwargs(self):
  1948. class Custom404Handler(RequestHandler):
  1949. def get(self):
  1950. self.set_status(404)
  1951. self.write("custom 404 response")
  1952. return dict(default_handler_class=Custom404Handler)
  1953. def test_404(self):
  1954. response = self.fetch("/")
  1955. self.assertEqual(response.code, 404)
  1956. self.assertEqual(response.body, b"custom 404 response")
  1957. class DefaultHandlerArgumentsTest(WebTestCase):
  1958. def get_handlers(self):
  1959. return [("/foo", RequestHandler)]
  1960. def get_app_kwargs(self):
  1961. return dict(
  1962. default_handler_class=ErrorHandler,
  1963. default_handler_args=dict(status_code=403),
  1964. )
  1965. def test_403(self):
  1966. response = self.fetch("/")
  1967. self.assertEqual(response.code, 403)
  1968. class HandlerByNameTest(WebTestCase):
  1969. def get_handlers(self):
  1970. # All three are equivalent.
  1971. return [
  1972. ("/hello1", HelloHandler),
  1973. ("/hello2", "tornado.test.web_test.HelloHandler"),
  1974. url("/hello3", "tornado.test.web_test.HelloHandler"),
  1975. ]
  1976. def test_handler_by_name(self):
  1977. resp = self.fetch("/hello1")
  1978. self.assertEqual(resp.body, b"hello")
  1979. resp = self.fetch("/hello2")
  1980. self.assertEqual(resp.body, b"hello")
  1981. resp = self.fetch("/hello3")
  1982. self.assertEqual(resp.body, b"hello")
  1983. class StreamingRequestBodyTest(WebTestCase):
  1984. def get_handlers(self):
  1985. @stream_request_body
  1986. class StreamingBodyHandler(RequestHandler):
  1987. def initialize(self, test):
  1988. self.test = test
  1989. def prepare(self):
  1990. self.test.prepared.set_result(None)
  1991. def data_received(self, data):
  1992. self.test.data.set_result(data)
  1993. def get(self):
  1994. self.test.finished.set_result(None)
  1995. self.write({})
  1996. @stream_request_body
  1997. class EarlyReturnHandler(RequestHandler):
  1998. def prepare(self):
  1999. # If we finish the response in prepare, it won't continue to
  2000. # the (non-existent) data_received.
  2001. raise HTTPError(401)
  2002. @stream_request_body
  2003. class CloseDetectionHandler(RequestHandler):
  2004. def initialize(self, test):
  2005. self.test = test
  2006. def on_connection_close(self):
  2007. super().on_connection_close()
  2008. self.test.close_future.set_result(None)
  2009. return [
  2010. ("/stream_body", StreamingBodyHandler, dict(test=self)),
  2011. ("/early_return", EarlyReturnHandler),
  2012. ("/close_detection", CloseDetectionHandler, dict(test=self)),
  2013. ]
  2014. def connect(self, url, connection_close):
  2015. # Use a raw connection so we can control the sending of data.
  2016. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
  2017. s.connect(("127.0.0.1", self.get_http_port()))
  2018. stream = IOStream(s)
  2019. stream.write(b"GET " + url + b" HTTP/1.1\r\nHost: 127.0.0.1\r\n")
  2020. if connection_close:
  2021. stream.write(b"Connection: close\r\n")
  2022. stream.write(b"Transfer-Encoding: chunked\r\n\r\n")
  2023. return stream
  2024. @gen_test
  2025. def test_streaming_body(self):
  2026. self.prepared = Future() # type: Future[None]
  2027. self.data = Future() # type: Future[bytes]
  2028. self.finished = Future() # type: Future[None]
  2029. stream = self.connect(b"/stream_body", connection_close=True)
  2030. yield self.prepared
  2031. stream.write(b"4\r\nasdf\r\n")
  2032. # Ensure the first chunk is received before we send the second.
  2033. data = yield self.data
  2034. self.assertEqual(data, b"asdf")
  2035. self.data = Future()
  2036. stream.write(b"4\r\nqwer\r\n")
  2037. data = yield self.data
  2038. self.assertEqual(data, b"qwer")
  2039. stream.write(b"0\r\n\r\n")
  2040. yield self.finished
  2041. data = yield stream.read_until_close()
  2042. # This would ideally use an HTTP1Connection to read the response.
  2043. self.assertTrue(data.endswith(b"{}"))
  2044. stream.close()
  2045. @gen_test
  2046. def test_early_return(self):
  2047. stream = self.connect(b"/early_return", connection_close=False)
  2048. data = yield stream.read_until_close()
  2049. self.assertTrue(data.startswith(b"HTTP/1.1 401"))
  2050. @gen_test
  2051. def test_early_return_with_data(self):
  2052. stream = self.connect(b"/early_return", connection_close=False)
  2053. stream.write(b"4\r\nasdf\r\n")
  2054. data = yield stream.read_until_close()
  2055. self.assertTrue(data.startswith(b"HTTP/1.1 401"))
  2056. @gen_test
  2057. def test_close_during_upload(self):
  2058. self.close_future = Future() # type: Future[None]
  2059. stream = self.connect(b"/close_detection", connection_close=False)
  2060. stream.close()
  2061. yield self.close_future
  2062. # Each method in this handler returns a yieldable object and yields to the
  2063. # IOLoop so the future is not immediately ready. Ensure that the
  2064. # yieldables are respected and no method is called before the previous
  2065. # one has completed.
  2066. @stream_request_body
  2067. class BaseFlowControlHandler(RequestHandler):
  2068. def initialize(self, test):
  2069. self.test = test
  2070. self.method = None
  2071. self.methods = [] # type: typing.List[str]
  2072. @contextlib.contextmanager
  2073. def in_method(self, method):
  2074. if self.method is not None:
  2075. self.test.fail(f"entered method {method} while in {self.method}")
  2076. self.method = method
  2077. self.methods.append(method)
  2078. try:
  2079. yield
  2080. finally:
  2081. self.method = None
  2082. @gen.coroutine
  2083. def prepare(self):
  2084. # Note that asynchronous prepare() does not block data_received,
  2085. # so we don't use in_method here.
  2086. self.methods.append("prepare")
  2087. yield gen.moment
  2088. @gen.coroutine
  2089. def post(self):
  2090. with self.in_method("post"):
  2091. yield gen.moment
  2092. self.write(dict(methods=self.methods))
  2093. class BaseStreamingRequestFlowControlTest:
  2094. def get_httpserver_options(self):
  2095. # Use a small chunk size so flow control is relevant even though
  2096. # all the data arrives at once.
  2097. return dict(chunk_size=10, decompress_request=True)
  2098. def get_http_client(self):
  2099. # simple_httpclient only: curl doesn't support body_producer.
  2100. return SimpleAsyncHTTPClient()
  2101. # Test all the slightly different code paths for fixed, chunked, etc bodies.
  2102. def test_flow_control_fixed_body(self: typing.Any):
  2103. response = self.fetch("/", body="abcdefghijklmnopqrstuvwxyz", method="POST")
  2104. response.rethrow()
  2105. self.assertEqual(
  2106. json_decode(response.body),
  2107. dict(
  2108. methods=[
  2109. "prepare",
  2110. "data_received",
  2111. "data_received",
  2112. "data_received",
  2113. "post",
  2114. ]
  2115. ),
  2116. )
  2117. def test_flow_control_chunked_body(self: typing.Any):
  2118. chunks = [b"abcd", b"efgh", b"ijkl"]
  2119. @gen.coroutine
  2120. def body_producer(write):
  2121. for i in chunks:
  2122. yield write(i)
  2123. response = self.fetch("/", body_producer=body_producer, method="POST")
  2124. response.rethrow()
  2125. self.assertEqual(
  2126. json_decode(response.body),
  2127. dict(
  2128. methods=[
  2129. "prepare",
  2130. "data_received",
  2131. "data_received",
  2132. "data_received",
  2133. "post",
  2134. ]
  2135. ),
  2136. )
  2137. def test_flow_control_compressed_body(self: typing.Any):
  2138. bytesio = BytesIO()
  2139. gzip_file = gzip.GzipFile(mode="w", fileobj=bytesio)
  2140. gzip_file.write(b"abcdefghijklmnopqrstuvwxyz")
  2141. gzip_file.close()
  2142. compressed_body = bytesio.getvalue()
  2143. response = self.fetch(
  2144. "/",
  2145. body=compressed_body,
  2146. method="POST",
  2147. headers={"Content-Encoding": "gzip"},
  2148. )
  2149. response.rethrow()
  2150. self.assertEqual(
  2151. json_decode(response.body),
  2152. dict(
  2153. methods=[
  2154. "prepare",
  2155. "data_received",
  2156. "data_received",
  2157. "data_received",
  2158. "post",
  2159. ]
  2160. ),
  2161. )
  2162. class DecoratedStreamingRequestFlowControlTest(
  2163. BaseStreamingRequestFlowControlTest, WebTestCase
  2164. ):
  2165. def get_handlers(self):
  2166. class DecoratedFlowControlHandler(BaseFlowControlHandler):
  2167. @gen.coroutine
  2168. def data_received(self, data):
  2169. with self.in_method("data_received"):
  2170. yield gen.moment
  2171. return [("/", DecoratedFlowControlHandler, dict(test=self))]
  2172. class NativeStreamingRequestFlowControlTest(
  2173. BaseStreamingRequestFlowControlTest, WebTestCase
  2174. ):
  2175. def get_handlers(self):
  2176. class NativeFlowControlHandler(BaseFlowControlHandler):
  2177. async def data_received(self, data):
  2178. with self.in_method("data_received"):
  2179. import asyncio
  2180. await asyncio.sleep(0)
  2181. return [("/", NativeFlowControlHandler, dict(test=self))]
  2182. class IncorrectContentLengthTest(SimpleHandlerTestCase):
  2183. def get_handlers(self):
  2184. test = self
  2185. self.server_error = None
  2186. # Manually set a content-length that doesn't match the actual content.
  2187. class TooHigh(RequestHandler):
  2188. def get(self):
  2189. self.set_header("Content-Length", "42")
  2190. try:
  2191. self.finish("ok")
  2192. except Exception as e:
  2193. test.server_error = e
  2194. raise
  2195. class TooLow(RequestHandler):
  2196. def get(self):
  2197. self.set_header("Content-Length", "2")
  2198. try:
  2199. self.finish("hello")
  2200. except Exception as e:
  2201. test.server_error = e
  2202. raise
  2203. return [("/high", TooHigh), ("/low", TooLow)]
  2204. def test_content_length_too_high(self):
  2205. # When the content-length is too high, the connection is simply
  2206. # closed without completing the response. An error is logged on
  2207. # the server.
  2208. with ExpectLog(app_log, "(Uncaught exception|Exception in callback)"):
  2209. with ExpectLog(
  2210. gen_log,
  2211. "(Cannot send error response after headers written"
  2212. "|Failed to flush partial response)",
  2213. ):
  2214. with self.assertRaises(HTTPClientError):
  2215. self.fetch("/high", raise_error=True)
  2216. self.assertEqual(
  2217. str(self.server_error), "Tried to write 40 bytes less than Content-Length"
  2218. )
  2219. def test_content_length_too_low(self):
  2220. # When the content-length is too low, the connection is closed
  2221. # without writing the last chunk, so the client never sees the request
  2222. # complete (which would be a framing error).
  2223. with ExpectLog(app_log, "(Uncaught exception|Exception in callback)"):
  2224. with ExpectLog(
  2225. gen_log,
  2226. "(Cannot send error response after headers written"
  2227. "|Failed to flush partial response)",
  2228. ):
  2229. with self.assertRaises(HTTPClientError):
  2230. self.fetch("/low", raise_error=True)
  2231. self.assertEqual(
  2232. str(self.server_error), "Tried to write more data than Content-Length"
  2233. )
  2234. class ClientCloseTest(SimpleHandlerTestCase):
  2235. class Handler(RequestHandler):
  2236. def get(self):
  2237. if self.request.version.startswith("HTTP/1"):
  2238. # Simulate a connection closed by the client during
  2239. # request processing. The client will see an error, but the
  2240. # server should respond gracefully (without logging errors
  2241. # because we were unable to write out as many bytes as
  2242. # Content-Length said we would)
  2243. self.request.connection.stream.close() # type: ignore
  2244. self.write("hello")
  2245. else:
  2246. # TODO: add a HTTP2-compatible version of this test.
  2247. self.write("requires HTTP/1.x")
  2248. def test_client_close(self):
  2249. with self.assertRaises((HTTPClientError, unittest.SkipTest)): # type: ignore
  2250. response = self.fetch("/", raise_error=True)
  2251. if response.body == b"requires HTTP/1.x":
  2252. self.skipTest("requires HTTP/1.x")
  2253. self.assertEqual(response.code, 599)
  2254. class SignedValueTest(unittest.TestCase):
  2255. SECRET = "It's a secret to everybody"
  2256. SECRET_DICT = {0: "asdfbasdf", 1: "12312312", 2: "2342342"}
  2257. def past(self):
  2258. return self.present() - 86400 * 32
  2259. def present(self):
  2260. return 1300000000
  2261. def test_known_values(self):
  2262. signed_v1 = create_signed_value(
  2263. SignedValueTest.SECRET, "key", "value", version=1, clock=self.present
  2264. )
  2265. self.assertEqual(
  2266. signed_v1, b"dmFsdWU=|1300000000|31c934969f53e48164c50768b40cbd7e2daaaa4f"
  2267. )
  2268. signed_v2 = create_signed_value(
  2269. SignedValueTest.SECRET, "key", "value", version=2, clock=self.present
  2270. )
  2271. self.assertEqual(
  2272. signed_v2,
  2273. b"2|1:0|10:1300000000|3:key|8:dmFsdWU=|"
  2274. b"3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152",
  2275. )
  2276. signed_default = create_signed_value(
  2277. SignedValueTest.SECRET, "key", "value", clock=self.present
  2278. )
  2279. self.assertEqual(signed_default, signed_v2)
  2280. decoded_v1 = decode_signed_value(
  2281. SignedValueTest.SECRET, "key", signed_v1, min_version=1, clock=self.present
  2282. )
  2283. self.assertEqual(decoded_v1, b"value")
  2284. decoded_v2 = decode_signed_value(
  2285. SignedValueTest.SECRET, "key", signed_v2, min_version=2, clock=self.present
  2286. )
  2287. self.assertEqual(decoded_v2, b"value")
  2288. def test_name_swap(self):
  2289. signed1 = create_signed_value(
  2290. SignedValueTest.SECRET, "key1", "value", clock=self.present
  2291. )
  2292. signed2 = create_signed_value(
  2293. SignedValueTest.SECRET, "key2", "value", clock=self.present
  2294. )
  2295. # Try decoding each string with the other's "name"
  2296. decoded1 = decode_signed_value(
  2297. SignedValueTest.SECRET, "key2", signed1, clock=self.present
  2298. )
  2299. self.assertIsNone(decoded1)
  2300. decoded2 = decode_signed_value(
  2301. SignedValueTest.SECRET, "key1", signed2, clock=self.present
  2302. )
  2303. self.assertIsNone(decoded2)
  2304. def test_expired(self):
  2305. signed = create_signed_value(
  2306. SignedValueTest.SECRET, "key1", "value", clock=self.past
  2307. )
  2308. decoded_past = decode_signed_value(
  2309. SignedValueTest.SECRET, "key1", signed, clock=self.past
  2310. )
  2311. self.assertEqual(decoded_past, b"value")
  2312. decoded_present = decode_signed_value(
  2313. SignedValueTest.SECRET, "key1", signed, clock=self.present
  2314. )
  2315. self.assertIsNone(decoded_present)
  2316. def test_payload_tampering(self):
  2317. # These cookies are variants of the one in test_known_values.
  2318. sig = "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152"
  2319. def validate(prefix):
  2320. return b"value" == decode_signed_value(
  2321. SignedValueTest.SECRET, "key", prefix + sig, clock=self.present
  2322. )
  2323. self.assertTrue(validate("2|1:0|10:1300000000|3:key|8:dmFsdWU=|"))
  2324. # Change key version
  2325. self.assertFalse(validate("2|1:1|10:1300000000|3:key|8:dmFsdWU=|"))
  2326. # length mismatch (field too short)
  2327. self.assertFalse(validate("2|1:0|10:130000000|3:key|8:dmFsdWU=|"))
  2328. # length mismatch (field too long)
  2329. self.assertFalse(validate("2|1:0|10:1300000000|3:keey|8:dmFsdWU=|"))
  2330. def test_signature_tampering(self):
  2331. prefix = "2|1:0|10:1300000000|3:key|8:dmFsdWU=|"
  2332. def validate(sig):
  2333. return b"value" == decode_signed_value(
  2334. SignedValueTest.SECRET, "key", prefix + sig, clock=self.present
  2335. )
  2336. self.assertTrue(
  2337. validate("3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152")
  2338. )
  2339. # All zeros
  2340. self.assertFalse(validate("0" * 32))
  2341. # Change one character
  2342. self.assertFalse(
  2343. validate("4d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152")
  2344. )
  2345. # Change another character
  2346. self.assertFalse(
  2347. validate("3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e153")
  2348. )
  2349. # Truncate
  2350. self.assertFalse(
  2351. validate("3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e15")
  2352. )
  2353. # Lengthen
  2354. self.assertFalse(
  2355. validate(
  2356. "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e1538"
  2357. )
  2358. )
  2359. def test_non_ascii(self):
  2360. value = b"\xe9"
  2361. signed = create_signed_value(
  2362. SignedValueTest.SECRET, "key", value, clock=self.present
  2363. )
  2364. decoded = decode_signed_value(
  2365. SignedValueTest.SECRET, "key", signed, clock=self.present
  2366. )
  2367. self.assertEqual(value, decoded)
  2368. def test_key_versioning_read_write_default_key(self):
  2369. value = b"\xe9"
  2370. signed = create_signed_value(
  2371. SignedValueTest.SECRET_DICT, "key", value, clock=self.present, key_version=0
  2372. )
  2373. decoded = decode_signed_value(
  2374. SignedValueTest.SECRET_DICT, "key", signed, clock=self.present
  2375. )
  2376. self.assertEqual(value, decoded)
  2377. def test_key_versioning_read_write_non_default_key(self):
  2378. value = b"\xe9"
  2379. signed = create_signed_value(
  2380. SignedValueTest.SECRET_DICT, "key", value, clock=self.present, key_version=1
  2381. )
  2382. decoded = decode_signed_value(
  2383. SignedValueTest.SECRET_DICT, "key", signed, clock=self.present
  2384. )
  2385. self.assertEqual(value, decoded)
  2386. def test_key_versioning_invalid_key(self):
  2387. value = b"\xe9"
  2388. signed = create_signed_value(
  2389. SignedValueTest.SECRET_DICT, "key", value, clock=self.present, key_version=0
  2390. )
  2391. newkeys = SignedValueTest.SECRET_DICT.copy()
  2392. newkeys.pop(0)
  2393. decoded = decode_signed_value(newkeys, "key", signed, clock=self.present)
  2394. self.assertIsNone(decoded)
  2395. def test_key_version_retrieval(self):
  2396. value = b"\xe9"
  2397. signed = create_signed_value(
  2398. SignedValueTest.SECRET_DICT, "key", value, clock=self.present, key_version=1
  2399. )
  2400. key_version = get_signature_key_version(signed)
  2401. self.assertEqual(1, key_version)
  2402. class XSRFTest(SimpleHandlerTestCase):
  2403. class Handler(RequestHandler):
  2404. def get(self):
  2405. version = int(self.get_argument("version", "2"))
  2406. # This would be a bad idea in a real app, but in this test
  2407. # it's fine.
  2408. self.settings["xsrf_cookie_version"] = version
  2409. self.write(self.xsrf_token)
  2410. def post(self):
  2411. self.write("ok")
  2412. def get_app_kwargs(self):
  2413. return dict(xsrf_cookies=True)
  2414. def setUp(self):
  2415. super().setUp()
  2416. self.xsrf_token = self.get_token()
  2417. def get_token(self, old_token=None, version=None):
  2418. if old_token is not None:
  2419. headers = self.cookie_headers(old_token)
  2420. else:
  2421. headers = None
  2422. response = self.fetch(
  2423. "/" if version is None else ("/?version=%d" % version), headers=headers
  2424. )
  2425. response.rethrow()
  2426. return native_str(response.body)
  2427. def cookie_headers(self, token=None):
  2428. if token is None:
  2429. token = self.xsrf_token
  2430. return {"Cookie": "_xsrf=" + token}
  2431. def test_xsrf_fail_no_token(self):
  2432. with ExpectLog(gen_log, ".*'_xsrf' argument missing"):
  2433. response = self.fetch("/", method="POST", body=b"")
  2434. self.assertEqual(response.code, 403)
  2435. def test_xsrf_fail_body_no_cookie(self):
  2436. with ExpectLog(gen_log, ".*XSRF cookie does not match POST"):
  2437. response = self.fetch(
  2438. "/",
  2439. method="POST",
  2440. body=urllib.parse.urlencode(dict(_xsrf=self.xsrf_token)),
  2441. )
  2442. self.assertEqual(response.code, 403)
  2443. def test_xsrf_fail_argument_invalid_format(self):
  2444. with ExpectLog(gen_log, ".*'_xsrf' argument has invalid format"):
  2445. response = self.fetch(
  2446. "/",
  2447. method="POST",
  2448. headers=self.cookie_headers(),
  2449. body=urllib.parse.urlencode(dict(_xsrf="3|")),
  2450. )
  2451. self.assertEqual(response.code, 403)
  2452. def test_xsrf_fail_cookie_invalid_format(self):
  2453. with ExpectLog(gen_log, ".*XSRF cookie does not match POST"):
  2454. response = self.fetch(
  2455. "/",
  2456. method="POST",
  2457. headers=self.cookie_headers(token="3|"),
  2458. body=urllib.parse.urlencode(dict(_xsrf=self.xsrf_token)),
  2459. )
  2460. self.assertEqual(response.code, 403)
  2461. def test_xsrf_fail_cookie_no_body(self):
  2462. with ExpectLog(gen_log, ".*'_xsrf' argument missing"):
  2463. response = self.fetch(
  2464. "/", method="POST", body=b"", headers=self.cookie_headers()
  2465. )
  2466. self.assertEqual(response.code, 403)
  2467. def test_xsrf_success_short_token(self):
  2468. response = self.fetch(
  2469. "/",
  2470. method="POST",
  2471. body=urllib.parse.urlencode(dict(_xsrf="deadbeef")),
  2472. headers=self.cookie_headers(token="deadbeef"),
  2473. )
  2474. self.assertEqual(response.code, 200)
  2475. def test_xsrf_success_non_hex_token(self):
  2476. response = self.fetch(
  2477. "/",
  2478. method="POST",
  2479. body=urllib.parse.urlencode(dict(_xsrf="xoxo")),
  2480. headers=self.cookie_headers(token="xoxo"),
  2481. )
  2482. self.assertEqual(response.code, 200)
  2483. def test_xsrf_success_post_body(self):
  2484. response = self.fetch(
  2485. "/",
  2486. method="POST",
  2487. body=urllib.parse.urlencode(dict(_xsrf=self.xsrf_token)),
  2488. headers=self.cookie_headers(),
  2489. )
  2490. self.assertEqual(response.code, 200)
  2491. def test_xsrf_success_query_string(self):
  2492. response = self.fetch(
  2493. "/?" + urllib.parse.urlencode(dict(_xsrf=self.xsrf_token)),
  2494. method="POST",
  2495. body=b"",
  2496. headers=self.cookie_headers(),
  2497. )
  2498. self.assertEqual(response.code, 200)
  2499. def test_xsrf_success_header(self):
  2500. response = self.fetch(
  2501. "/",
  2502. method="POST",
  2503. body=b"",
  2504. headers=dict(
  2505. {"X-Xsrftoken": self.xsrf_token}, # type: ignore
  2506. **self.cookie_headers(),
  2507. ),
  2508. )
  2509. self.assertEqual(response.code, 200)
  2510. def test_distinct_tokens(self):
  2511. # Every request gets a distinct token.
  2512. NUM_TOKENS = 10
  2513. tokens = set()
  2514. for i in range(NUM_TOKENS):
  2515. tokens.add(self.get_token())
  2516. self.assertEqual(len(tokens), NUM_TOKENS)
  2517. def test_cross_user(self):
  2518. token2 = self.get_token()
  2519. # Each token can be used to authenticate its own request.
  2520. for token in (self.xsrf_token, token2):
  2521. response = self.fetch(
  2522. "/",
  2523. method="POST",
  2524. body=urllib.parse.urlencode(dict(_xsrf=token)),
  2525. headers=self.cookie_headers(token),
  2526. )
  2527. self.assertEqual(response.code, 200)
  2528. # Sending one in the cookie and the other in the body is not allowed.
  2529. for cookie_token, body_token in (
  2530. (self.xsrf_token, token2),
  2531. (token2, self.xsrf_token),
  2532. ):
  2533. with ExpectLog(gen_log, ".*XSRF cookie does not match POST"):
  2534. response = self.fetch(
  2535. "/",
  2536. method="POST",
  2537. body=urllib.parse.urlencode(dict(_xsrf=body_token)),
  2538. headers=self.cookie_headers(cookie_token),
  2539. )
  2540. self.assertEqual(response.code, 403)
  2541. def test_refresh_token(self):
  2542. token = self.xsrf_token
  2543. tokens_seen = {token}
  2544. # A user's token is stable over time. Refreshing the page in one tab
  2545. # might update the cookie while an older tab still has the old cookie
  2546. # in its DOM. Simulate this scenario by passing a constant token
  2547. # in the body and re-querying for the token.
  2548. for i in range(5):
  2549. token = self.get_token(token)
  2550. # Tokens are encoded uniquely each time
  2551. tokens_seen.add(token)
  2552. response = self.fetch(
  2553. "/",
  2554. method="POST",
  2555. body=urllib.parse.urlencode(dict(_xsrf=self.xsrf_token)),
  2556. headers=self.cookie_headers(token),
  2557. )
  2558. self.assertEqual(response.code, 200)
  2559. self.assertEqual(len(tokens_seen), 6)
  2560. def test_versioning(self):
  2561. # Version 1 still produces distinct tokens per request.
  2562. self.assertNotEqual(self.get_token(version=1), self.get_token(version=1))
  2563. # Refreshed v1 tokens are all identical.
  2564. v1_token = self.get_token(version=1)
  2565. for i in range(5):
  2566. self.assertEqual(self.get_token(v1_token, version=1), v1_token)
  2567. # Upgrade to a v2 version of the same token
  2568. v2_token = self.get_token(v1_token)
  2569. self.assertNotEqual(v1_token, v2_token)
  2570. # Each v1 token can map to many v2 tokens.
  2571. self.assertNotEqual(v2_token, self.get_token(v1_token))
  2572. # The tokens are cross-compatible.
  2573. for cookie_token, body_token in ((v1_token, v2_token), (v2_token, v1_token)):
  2574. response = self.fetch(
  2575. "/",
  2576. method="POST",
  2577. body=urllib.parse.urlencode(dict(_xsrf=body_token)),
  2578. headers=self.cookie_headers(cookie_token),
  2579. )
  2580. self.assertEqual(response.code, 200)
  2581. # A subset of the previous test with a different cookie name
  2582. class XSRFCookieNameTest(SimpleHandlerTestCase):
  2583. class Handler(RequestHandler):
  2584. def get(self):
  2585. self.write(self.xsrf_token)
  2586. def post(self):
  2587. self.write("ok")
  2588. def get_app_kwargs(self):
  2589. return dict(
  2590. xsrf_cookies=True,
  2591. xsrf_cookie_name="__Host-xsrf",
  2592. xsrf_cookie_kwargs={"secure": True},
  2593. )
  2594. def setUp(self):
  2595. super().setUp()
  2596. self.xsrf_token = self.get_token()
  2597. def get_token(self, old_token=None):
  2598. if old_token is not None:
  2599. headers = self.cookie_headers(old_token)
  2600. else:
  2601. headers = None
  2602. response = self.fetch("/", headers=headers)
  2603. response.rethrow()
  2604. return native_str(response.body)
  2605. def cookie_headers(self, token=None):
  2606. if token is None:
  2607. token = self.xsrf_token
  2608. return {"Cookie": "__Host-xsrf=" + token}
  2609. def test_xsrf_fail_no_token(self):
  2610. with ExpectLog(gen_log, ".*'_xsrf' argument missing"):
  2611. response = self.fetch("/", method="POST", body=b"")
  2612. self.assertEqual(response.code, 403)
  2613. def test_xsrf_fail_body_no_cookie(self):
  2614. with ExpectLog(gen_log, ".*XSRF cookie does not match POST"):
  2615. response = self.fetch(
  2616. "/",
  2617. method="POST",
  2618. body=urllib.parse.urlencode(dict(_xsrf=self.xsrf_token)),
  2619. )
  2620. self.assertEqual(response.code, 403)
  2621. def test_xsrf_success_post_body(self):
  2622. response = self.fetch(
  2623. "/",
  2624. method="POST",
  2625. # Note that renaming the cookie doesn't rename the POST param
  2626. body=urllib.parse.urlencode(dict(_xsrf=self.xsrf_token)),
  2627. headers=self.cookie_headers(),
  2628. )
  2629. self.assertEqual(response.code, 200)
  2630. class XSRFCookieKwargsTest(SimpleHandlerTestCase):
  2631. class Handler(RequestHandler):
  2632. def get(self):
  2633. self.write(self.xsrf_token)
  2634. def get_app_kwargs(self):
  2635. return dict(
  2636. xsrf_cookies=True, xsrf_cookie_kwargs=dict(httponly=True, expires_days=2)
  2637. )
  2638. def test_xsrf_httponly(self):
  2639. response = self.fetch("/")
  2640. self.assertIn("httponly;", response.headers["Set-Cookie"].lower())
  2641. self.assertIn("expires=", response.headers["Set-Cookie"].lower())
  2642. header = response.headers.get("Set-Cookie")
  2643. assert header is not None
  2644. match = re.match(".*; expires=(?P<expires>.+);.*", header)
  2645. assert match is not None
  2646. expires = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
  2647. days=2
  2648. )
  2649. header_expires = email.utils.parsedate_to_datetime(match.groupdict()["expires"])
  2650. if header_expires.tzinfo is None:
  2651. header_expires = header_expires.replace(tzinfo=datetime.timezone.utc)
  2652. self.assertTrue(abs((expires - header_expires).total_seconds()) < 10)
  2653. class FinishExceptionTest(SimpleHandlerTestCase):
  2654. class Handler(RequestHandler):
  2655. def get(self):
  2656. self.set_status(401)
  2657. self.set_header("WWW-Authenticate", 'Basic realm="something"')
  2658. if self.get_argument("finish_value", ""):
  2659. raise Finish("authentication required")
  2660. else:
  2661. self.write("authentication required")
  2662. raise Finish()
  2663. def test_finish_exception(self):
  2664. for u in ["/", "/?finish_value=1"]:
  2665. response = self.fetch(u)
  2666. self.assertEqual(response.code, 401)
  2667. self.assertEqual(
  2668. 'Basic realm="something"', response.headers.get("WWW-Authenticate")
  2669. )
  2670. self.assertEqual(b"authentication required", response.body)
  2671. class DecoratorTest(WebTestCase):
  2672. def get_handlers(self):
  2673. class RemoveSlashHandler(RequestHandler):
  2674. @removeslash
  2675. def get(self):
  2676. pass
  2677. class AddSlashHandler(RequestHandler):
  2678. @addslash
  2679. def get(self):
  2680. pass
  2681. return [("/removeslash/", RemoveSlashHandler), ("/addslash", AddSlashHandler)]
  2682. def test_removeslash(self):
  2683. response = self.fetch("/removeslash/", follow_redirects=False)
  2684. self.assertEqual(response.code, 301)
  2685. self.assertEqual(response.headers["Location"], "/removeslash")
  2686. response = self.fetch("/removeslash/?foo=bar", follow_redirects=False)
  2687. self.assertEqual(response.code, 301)
  2688. self.assertEqual(response.headers["Location"], "/removeslash?foo=bar")
  2689. def test_addslash(self):
  2690. response = self.fetch("/addslash", follow_redirects=False)
  2691. self.assertEqual(response.code, 301)
  2692. self.assertEqual(response.headers["Location"], "/addslash/")
  2693. response = self.fetch("/addslash?foo=bar", follow_redirects=False)
  2694. self.assertEqual(response.code, 301)
  2695. self.assertEqual(response.headers["Location"], "/addslash/?foo=bar")
  2696. class CacheTest(WebTestCase):
  2697. def get_handlers(self):
  2698. class EtagHandler(RequestHandler):
  2699. def get(self, computed_etag):
  2700. self.write(computed_etag)
  2701. def compute_etag(self):
  2702. return self._write_buffer[0]
  2703. return [("/etag/(.*)", EtagHandler)]
  2704. def test_wildcard_etag(self):
  2705. computed_etag = '"xyzzy"'
  2706. etags = "*"
  2707. self._test_etag(computed_etag, etags, 304)
  2708. def test_strong_etag_match(self):
  2709. computed_etag = '"xyzzy"'
  2710. etags = '"xyzzy"'
  2711. self._test_etag(computed_etag, etags, 304)
  2712. def test_multiple_strong_etag_match(self):
  2713. computed_etag = '"xyzzy1"'
  2714. etags = '"xyzzy1", "xyzzy2"'
  2715. self._test_etag(computed_etag, etags, 304)
  2716. def test_strong_etag_not_match(self):
  2717. computed_etag = '"xyzzy"'
  2718. etags = '"xyzzy1"'
  2719. self._test_etag(computed_etag, etags, 200)
  2720. def test_multiple_strong_etag_not_match(self):
  2721. computed_etag = '"xyzzy"'
  2722. etags = '"xyzzy1", "xyzzy2"'
  2723. self._test_etag(computed_etag, etags, 200)
  2724. def test_weak_etag_match(self):
  2725. computed_etag = '"xyzzy1"'
  2726. etags = 'W/"xyzzy1"'
  2727. self._test_etag(computed_etag, etags, 304)
  2728. def test_multiple_weak_etag_match(self):
  2729. computed_etag = '"xyzzy2"'
  2730. etags = 'W/"xyzzy1", W/"xyzzy2"'
  2731. self._test_etag(computed_etag, etags, 304)
  2732. def test_weak_etag_not_match(self):
  2733. computed_etag = '"xyzzy2"'
  2734. etags = 'W/"xyzzy1"'
  2735. self._test_etag(computed_etag, etags, 200)
  2736. def test_multiple_weak_etag_not_match(self):
  2737. computed_etag = '"xyzzy3"'
  2738. etags = 'W/"xyzzy1", W/"xyzzy2"'
  2739. self._test_etag(computed_etag, etags, 200)
  2740. def _test_etag(self, computed_etag, etags, status_code):
  2741. response = self.fetch(
  2742. "/etag/" + computed_etag, headers={"If-None-Match": etags}
  2743. )
  2744. self.assertEqual(response.code, status_code)
  2745. class RequestSummaryTest(SimpleHandlerTestCase):
  2746. class Handler(RequestHandler):
  2747. def get(self):
  2748. # remote_ip is optional, although it's set by
  2749. # both HTTPServer and WSGIAdapter.
  2750. # Clobber it to make sure it doesn't break logging.
  2751. self.request.remote_ip = None
  2752. self.finish(self._request_summary())
  2753. def test_missing_remote_ip(self):
  2754. resp = self.fetch("/")
  2755. self.assertEqual(resp.body, b"GET / (None)")
  2756. class HTTPErrorTest(unittest.TestCase):
  2757. def test_copy(self):
  2758. e = HTTPError(403, reason="Go away")
  2759. e2 = copy.copy(e)
  2760. self.assertIsNot(e, e2)
  2761. self.assertEqual(e.status_code, e2.status_code)
  2762. self.assertEqual(e.reason, e2.reason)
  2763. class ApplicationTest(AsyncTestCase):
  2764. def test_listen(self):
  2765. app = Application([])
  2766. server = app.listen(0, address="127.0.0.1")
  2767. server.stop()
  2768. class URLSpecReverseTest(unittest.TestCase):
  2769. def test_reverse(self):
  2770. self.assertEqual("/favicon.ico", url(r"/favicon\.ico", None).reverse())
  2771. self.assertEqual("/favicon.ico", url(r"^/favicon\.ico$", None).reverse())
  2772. def test_non_reversible(self):
  2773. # URLSpecs are non-reversible if they include non-constant
  2774. # regex features outside capturing groups. Currently, this is
  2775. # only strictly enforced for backslash-escaped character
  2776. # classes.
  2777. paths = [r"^/api/v\d+/foo/(\w+)$"]
  2778. for path in paths:
  2779. # A URLSpec can still be created even if it cannot be reversed.
  2780. url_spec = url(path, None)
  2781. try:
  2782. result = url_spec.reverse()
  2783. self.fail(
  2784. "did not get expected exception when reversing %s. "
  2785. "result: %s" % (path, result)
  2786. )
  2787. except ValueError:
  2788. pass
  2789. def test_reverse_arguments(self):
  2790. self.assertEqual(
  2791. "/api/v1/foo/bar", url(r"^/api/v1/foo/(\w+)$", None).reverse("bar")
  2792. )
  2793. self.assertEqual(
  2794. "/api.v1/foo/5/icon.png",
  2795. url(r"/api\.v1/foo/([0-9]+)/icon\.png", None).reverse(5),
  2796. )
  2797. class RedirectHandlerTest(WebTestCase):
  2798. def get_handlers(self):
  2799. return [
  2800. ("/src", WebRedirectHandler, {"url": "/dst"}),
  2801. ("/src2", WebRedirectHandler, {"url": "/dst2?foo=bar"}),
  2802. (r"/(.*?)/(.*?)/(.*)", WebRedirectHandler, {"url": "/{1}/{0}/{2}"}),
  2803. ]
  2804. def test_basic_redirect(self):
  2805. response = self.fetch("/src", follow_redirects=False)
  2806. self.assertEqual(response.code, 301)
  2807. self.assertEqual(response.headers["Location"], "/dst")
  2808. def test_redirect_with_argument(self):
  2809. response = self.fetch("/src?foo=bar", follow_redirects=False)
  2810. self.assertEqual(response.code, 301)
  2811. self.assertEqual(response.headers["Location"], "/dst?foo=bar")
  2812. def test_redirect_with_appending_argument(self):
  2813. response = self.fetch("/src2?foo2=bar2", follow_redirects=False)
  2814. self.assertEqual(response.code, 301)
  2815. self.assertEqual(response.headers["Location"], "/dst2?foo=bar&foo2=bar2")
  2816. def test_redirect_pattern(self):
  2817. response = self.fetch("/a/b/c", follow_redirects=False)
  2818. self.assertEqual(response.code, 301)
  2819. self.assertEqual(response.headers["Location"], "/b/a/c")
  2820. class AcceptLanguageTest(WebTestCase):
  2821. """Test evaluation of Accept-Language header"""
  2822. def get_handlers(self):
  2823. locale.load_gettext_translations(
  2824. os.path.join(os.path.dirname(__file__), "gettext_translations"),
  2825. "tornado_test",
  2826. )
  2827. class AcceptLanguageHandler(RequestHandler):
  2828. def get(self):
  2829. self.set_header(
  2830. "Content-Language", self.get_browser_locale().code.replace("_", "-")
  2831. )
  2832. self.finish(b"")
  2833. return [
  2834. ("/", AcceptLanguageHandler),
  2835. ]
  2836. def test_accept_language(self):
  2837. response = self.fetch("/", headers={"Accept-Language": "fr-FR;q=0.9"})
  2838. self.assertEqual(response.headers["Content-Language"], "fr-FR")
  2839. response = self.fetch("/", headers={"Accept-Language": "fr-FR; q=0.9"})
  2840. self.assertEqual(response.headers["Content-Language"], "fr-FR")
  2841. def test_accept_language_ignore(self):
  2842. response = self.fetch("/", headers={"Accept-Language": "fr-FR;q=0"})
  2843. self.assertEqual(response.headers["Content-Language"], "en-US")
  2844. def test_accept_language_invalid(self):
  2845. response = self.fetch("/", headers={"Accept-Language": "fr-FR;q=-1"})
  2846. self.assertEqual(response.headers["Content-Language"], "en-US")