wsgi_test.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. import asyncio
  2. import concurrent.futures
  3. import threading
  4. from wsgiref.validate import validator
  5. from tornado.routing import RuleRouter
  6. from tornado.testing import AsyncHTTPTestCase, gen_test
  7. from tornado.wsgi import WSGIContainer
  8. class WSGIAppMixin:
  9. # TODO: Now that WSGIAdapter is gone, this is a pretty weak test.
  10. def get_executor(self):
  11. raise NotImplementedError()
  12. def get_app(self):
  13. executor = self.get_executor()
  14. # The barrier test in DummyExecutorTest will always wait the full
  15. # value of this timeout, so we don't want it to be too high.
  16. self.barrier = threading.Barrier(2, timeout=0.3)
  17. def make_container(app):
  18. return WSGIContainer(validator(app), executor=executor)
  19. return RuleRouter(
  20. [
  21. ("/simple", make_container(self.simple_wsgi_app)),
  22. ("/barrier", make_container(self.barrier_wsgi_app)),
  23. ("/streaming_barrier", make_container(self.streaming_barrier_wsgi_app)),
  24. ]
  25. )
  26. def respond_plain(self, start_response):
  27. status = "200 OK"
  28. response_headers = [("Content-Type", "text/plain")]
  29. start_response(status, response_headers)
  30. def simple_wsgi_app(self, environ, start_response):
  31. self.respond_plain(start_response)
  32. return [b"Hello world!"]
  33. def barrier_wsgi_app(self, environ, start_response):
  34. self.respond_plain(start_response)
  35. try:
  36. n = self.barrier.wait()
  37. except threading.BrokenBarrierError:
  38. return [b"broken barrier"]
  39. else:
  40. return [b"ok %d" % n]
  41. def streaming_barrier_wsgi_app(self, environ, start_response):
  42. self.respond_plain(start_response)
  43. yield b"ok "
  44. try:
  45. n = self.barrier.wait()
  46. except threading.BrokenBarrierError:
  47. yield b"broken barrier"
  48. else:
  49. yield b"%d" % n
  50. class WSGIContainerDummyExecutorTest(WSGIAppMixin, AsyncHTTPTestCase):
  51. def get_executor(self):
  52. return None
  53. def test_simple(self):
  54. response = self.fetch("/simple")
  55. self.assertEqual(response.body, b"Hello world!")
  56. @gen_test
  57. async def test_concurrent_barrier(self):
  58. self.barrier.reset()
  59. resps = await asyncio.gather(
  60. self.http_client.fetch(self.get_url("/barrier")),
  61. self.http_client.fetch(self.get_url("/barrier")),
  62. )
  63. for resp in resps:
  64. self.assertEqual(resp.body, b"broken barrier")
  65. @gen_test
  66. async def test_concurrent_streaming_barrier(self):
  67. self.barrier.reset()
  68. resps = await asyncio.gather(
  69. self.http_client.fetch(self.get_url("/streaming_barrier")),
  70. self.http_client.fetch(self.get_url("/streaming_barrier")),
  71. )
  72. for resp in resps:
  73. self.assertEqual(resp.body, b"ok broken barrier")
  74. class WSGIContainerThreadPoolTest(WSGIAppMixin, AsyncHTTPTestCase):
  75. def get_executor(self):
  76. return concurrent.futures.ThreadPoolExecutor()
  77. def test_simple(self):
  78. response = self.fetch("/simple")
  79. self.assertEqual(response.body, b"Hello world!")
  80. @gen_test
  81. async def test_concurrent_barrier(self):
  82. self.barrier.reset()
  83. resps = await asyncio.gather(
  84. self.http_client.fetch(self.get_url("/barrier")),
  85. self.http_client.fetch(self.get_url("/barrier")),
  86. )
  87. self.assertEqual([b"ok 0", b"ok 1"], sorted([resp.body for resp in resps]))
  88. @gen_test
  89. async def test_concurrent_streaming_barrier(self):
  90. self.barrier.reset()
  91. resps = await asyncio.gather(
  92. self.http_client.fetch(self.get_url("/streaming_barrier")),
  93. self.http_client.fetch(self.get_url("/streaming_barrier")),
  94. )
  95. self.assertEqual([b"ok 0", b"ok 1"], sorted([resp.body for resp in resps]))