tcpserver.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. #
  2. # Copyright 2011 Facebook
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  5. # not use this file except in compliance with the License. You may obtain
  6. # a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. # License for the specific language governing permissions and limitations
  14. # under the License.
  15. """A non-blocking, single-threaded TCP server."""
  16. import errno
  17. import os
  18. import socket
  19. import ssl
  20. from tornado import gen
  21. from tornado.log import app_log
  22. from tornado.ioloop import IOLoop
  23. from tornado.iostream import IOStream, SSLIOStream
  24. from tornado.netutil import (
  25. bind_sockets,
  26. add_accept_handler,
  27. ssl_wrap_socket,
  28. _DEFAULT_BACKLOG,
  29. )
  30. from tornado import process
  31. from tornado.util import errno_from_exception
  32. import typing
  33. from typing import Union, Dict, Any, Iterable, Optional, Awaitable
  34. if typing.TYPE_CHECKING:
  35. from typing import Callable, List # noqa: F401
  36. class TCPServer:
  37. r"""A non-blocking, single-threaded TCP server.
  38. To use `TCPServer`, define a subclass which overrides the `handle_stream`
  39. method. For example, a simple echo server could be defined like this::
  40. from tornado.tcpserver import TCPServer
  41. from tornado.iostream import StreamClosedError
  42. class EchoServer(TCPServer):
  43. async def handle_stream(self, stream, address):
  44. while True:
  45. try:
  46. data = await stream.read_until(b"\n") await
  47. stream.write(data)
  48. except StreamClosedError:
  49. break
  50. To make this server serve SSL traffic, send the ``ssl_options`` keyword
  51. argument with an `ssl.SSLContext` object. For compatibility with older
  52. versions of Python ``ssl_options`` may also be a dictionary of keyword
  53. arguments for the `ssl.SSLContext.wrap_socket` method.::
  54. ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
  55. ssl_ctx.load_cert_chain(os.path.join(data_dir, "mydomain.crt"),
  56. os.path.join(data_dir, "mydomain.key"))
  57. TCPServer(ssl_options=ssl_ctx)
  58. `TCPServer` initialization follows one of three patterns:
  59. 1. `listen`: single-process::
  60. async def main():
  61. server = TCPServer()
  62. server.listen(8888)
  63. await asyncio.Event().wait()
  64. asyncio.run(main())
  65. While this example does not create multiple processes on its own, when
  66. the ``reuse_port=True`` argument is passed to ``listen()`` you can run
  67. the program multiple times to create a multi-process service.
  68. 2. `add_sockets`: multi-process::
  69. sockets = bind_sockets(8888)
  70. tornado.process.fork_processes(0)
  71. async def post_fork_main():
  72. server = TCPServer()
  73. server.add_sockets(sockets)
  74. await asyncio.Event().wait()
  75. asyncio.run(post_fork_main())
  76. The `add_sockets` interface is more complicated, but it can be used with
  77. `tornado.process.fork_processes` to run a multi-process service with all
  78. worker processes forked from a single parent. `add_sockets` can also be
  79. used in single-process servers if you want to create your listening
  80. sockets in some way other than `~tornado.netutil.bind_sockets`.
  81. Note that when using this pattern, nothing that touches the event loop
  82. can be run before ``fork_processes``.
  83. 3. `bind`/`start`: simple **deprecated** multi-process::
  84. server = TCPServer()
  85. server.bind(8888)
  86. server.start(0) # Forks multiple sub-processes
  87. IOLoop.current().start()
  88. This pattern is deprecated because it requires interfaces in the
  89. `asyncio` module that have been deprecated since Python 3.10. Support for
  90. creating multiple processes in the ``start`` method will be removed in a
  91. future version of Tornado.
  92. .. versionadded:: 3.1
  93. The ``max_buffer_size`` argument.
  94. .. versionchanged:: 5.0
  95. The ``io_loop`` argument has been removed.
  96. """
  97. def __init__(
  98. self,
  99. ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None,
  100. max_buffer_size: Optional[int] = None,
  101. read_chunk_size: Optional[int] = None,
  102. ) -> None:
  103. self.ssl_options = ssl_options
  104. self._sockets = {} # type: Dict[int, socket.socket]
  105. self._handlers = {} # type: Dict[int, Callable[[], None]]
  106. self._pending_sockets = [] # type: List[socket.socket]
  107. self._started = False
  108. self._stopped = False
  109. self.max_buffer_size = max_buffer_size
  110. self.read_chunk_size = read_chunk_size
  111. # Verify the SSL options. Otherwise we don't get errors until clients
  112. # connect. This doesn't verify that the keys are legitimate, but
  113. # the SSL module doesn't do that until there is a connected socket
  114. # which seems like too much work
  115. if self.ssl_options is not None and isinstance(self.ssl_options, dict):
  116. # Only certfile is required: it can contain both keys
  117. if "certfile" not in self.ssl_options:
  118. raise KeyError('missing key "certfile" in ssl_options')
  119. if not os.path.exists(self.ssl_options["certfile"]):
  120. raise ValueError(
  121. 'certfile "%s" does not exist' % self.ssl_options["certfile"]
  122. )
  123. if "keyfile" in self.ssl_options and not os.path.exists(
  124. self.ssl_options["keyfile"]
  125. ):
  126. raise ValueError(
  127. 'keyfile "%s" does not exist' % self.ssl_options["keyfile"]
  128. )
  129. def listen(
  130. self,
  131. port: int,
  132. address: Optional[str] = None,
  133. family: socket.AddressFamily = socket.AF_UNSPEC,
  134. backlog: int = _DEFAULT_BACKLOG,
  135. flags: Optional[int] = None,
  136. reuse_port: bool = False,
  137. ) -> None:
  138. """Starts accepting connections on the given port.
  139. This method may be called more than once to listen on multiple ports.
  140. `listen` takes effect immediately; it is not necessary to call
  141. `TCPServer.start` afterwards. It is, however, necessary to start the
  142. event loop if it is not already running.
  143. All arguments have the same meaning as in
  144. `tornado.netutil.bind_sockets`.
  145. .. versionchanged:: 6.2
  146. Added ``family``, ``backlog``, ``flags``, and ``reuse_port``
  147. arguments to match `tornado.netutil.bind_sockets`.
  148. """
  149. sockets = bind_sockets(
  150. port,
  151. address=address,
  152. family=family,
  153. backlog=backlog,
  154. flags=flags,
  155. reuse_port=reuse_port,
  156. )
  157. self.add_sockets(sockets)
  158. def add_sockets(self, sockets: Iterable[socket.socket]) -> None:
  159. """Makes this server start accepting connections on the given sockets.
  160. The ``sockets`` parameter is a list of socket objects such as
  161. those returned by `~tornado.netutil.bind_sockets`.
  162. `add_sockets` is typically used in combination with that
  163. method and `tornado.process.fork_processes` to provide greater
  164. control over the initialization of a multi-process server.
  165. """
  166. for sock in sockets:
  167. self._sockets[sock.fileno()] = sock
  168. self._handlers[sock.fileno()] = add_accept_handler(
  169. sock, self._handle_connection
  170. )
  171. def add_socket(self, socket: socket.socket) -> None:
  172. """Singular version of `add_sockets`. Takes a single socket object."""
  173. self.add_sockets([socket])
  174. def bind(
  175. self,
  176. port: int,
  177. address: Optional[str] = None,
  178. family: socket.AddressFamily = socket.AF_UNSPEC,
  179. backlog: int = _DEFAULT_BACKLOG,
  180. flags: Optional[int] = None,
  181. reuse_port: bool = False,
  182. ) -> None:
  183. """Binds this server to the given port on the given address.
  184. To start the server, call `start`. If you want to run this server in a
  185. single process, you can call `listen` as a shortcut to the sequence of
  186. `bind` and `start` calls.
  187. Address may be either an IP address or hostname. If it's a hostname,
  188. the server will listen on all IP addresses associated with the name.
  189. Address may be an empty string or None to listen on all available
  190. interfaces. Family may be set to either `socket.AF_INET` or
  191. `socket.AF_INET6` to restrict to IPv4 or IPv6 addresses, otherwise both
  192. will be used if available.
  193. The ``backlog`` argument has the same meaning as for `socket.listen
  194. <socket.socket.listen>`. The ``reuse_port`` argument has the same
  195. meaning as for `.bind_sockets`.
  196. This method may be called multiple times prior to `start` to listen on
  197. multiple ports or interfaces.
  198. .. versionchanged:: 4.4
  199. Added the ``reuse_port`` argument.
  200. .. versionchanged:: 6.2
  201. Added the ``flags`` argument to match `.bind_sockets`.
  202. .. deprecated:: 6.2
  203. Use either ``listen()`` or ``add_sockets()`` instead of ``bind()``
  204. and ``start()``.
  205. """
  206. sockets = bind_sockets(
  207. port,
  208. address=address,
  209. family=family,
  210. backlog=backlog,
  211. flags=flags,
  212. reuse_port=reuse_port,
  213. )
  214. if self._started:
  215. self.add_sockets(sockets)
  216. else:
  217. self._pending_sockets.extend(sockets)
  218. def start(
  219. self, num_processes: Optional[int] = 1, max_restarts: Optional[int] = None
  220. ) -> None:
  221. """Starts this server in the `.IOLoop`.
  222. By default, we run the server in this process and do not fork any
  223. additional child process.
  224. If num_processes is ``None`` or <= 0, we detect the number of cores
  225. available on this machine and fork that number of child
  226. processes. If num_processes is given and > 1, we fork that
  227. specific number of sub-processes.
  228. Since we use processes and not threads, there is no shared memory
  229. between any server code.
  230. Note that multiple processes are not compatible with the autoreload
  231. module (or the ``autoreload=True`` option to `tornado.web.Application`
  232. which defaults to True when ``debug=True``).
  233. When using multiple processes, no IOLoops can be created or
  234. referenced until after the call to ``TCPServer.start(n)``.
  235. Values of ``num_processes`` other than 1 are not supported on Windows.
  236. The ``max_restarts`` argument is passed to `.fork_processes`.
  237. .. versionchanged:: 6.0
  238. Added ``max_restarts`` argument.
  239. .. deprecated:: 6.2
  240. Use either ``listen()`` or ``add_sockets()`` instead of ``bind()``
  241. and ``start()``.
  242. """
  243. assert not self._started
  244. self._started = True
  245. if num_processes != 1:
  246. process.fork_processes(num_processes, max_restarts)
  247. sockets = self._pending_sockets
  248. self._pending_sockets = []
  249. self.add_sockets(sockets)
  250. def stop(self) -> None:
  251. """Stops listening for new connections.
  252. Requests currently in progress may still continue after the
  253. server is stopped.
  254. """
  255. if self._stopped:
  256. return
  257. self._stopped = True
  258. for fd, sock in self._sockets.items():
  259. assert sock.fileno() == fd
  260. # Unregister socket from IOLoop
  261. self._handlers.pop(fd)()
  262. sock.close()
  263. def handle_stream(
  264. self, stream: IOStream, address: tuple
  265. ) -> Optional[Awaitable[None]]:
  266. """Override to handle a new `.IOStream` from an incoming connection.
  267. This method may be a coroutine; if so any exceptions it raises
  268. asynchronously will be logged. Accepting of incoming connections
  269. will not be blocked by this coroutine.
  270. If this `TCPServer` is configured for SSL, ``handle_stream``
  271. may be called before the SSL handshake has completed. Use
  272. `.SSLIOStream.wait_for_handshake` if you need to verify the client's
  273. certificate or use NPN/ALPN.
  274. .. versionchanged:: 4.2
  275. Added the option for this method to be a coroutine.
  276. """
  277. raise NotImplementedError()
  278. def _handle_connection(self, connection: socket.socket, address: Any) -> None:
  279. if self.ssl_options is not None:
  280. assert ssl, "OpenSSL required for SSL"
  281. try:
  282. connection = ssl_wrap_socket(
  283. connection,
  284. self.ssl_options,
  285. server_side=True,
  286. do_handshake_on_connect=False,
  287. )
  288. except ssl.SSLError as err:
  289. if err.args[0] == ssl.SSL_ERROR_EOF:
  290. return connection.close()
  291. else:
  292. raise
  293. except OSError as err:
  294. # If the connection is closed immediately after it is created
  295. # (as in a port scan), we can get one of several errors.
  296. # wrap_socket makes an internal call to getpeername,
  297. # which may return either EINVAL (Mac OS X) or ENOTCONN
  298. # (Linux). If it returns ENOTCONN, this error is
  299. # silently swallowed by the ssl module, so we need to
  300. # catch another error later on (AttributeError in
  301. # SSLIOStream._do_ssl_handshake).
  302. # To test this behavior, try nmap with the -sT flag.
  303. # https://github.com/tornadoweb/tornado/pull/750
  304. if errno_from_exception(err) in (errno.ECONNABORTED, errno.EINVAL):
  305. return connection.close()
  306. else:
  307. raise
  308. try:
  309. if self.ssl_options is not None:
  310. stream = SSLIOStream(
  311. connection,
  312. max_buffer_size=self.max_buffer_size,
  313. read_chunk_size=self.read_chunk_size,
  314. ) # type: IOStream
  315. else:
  316. stream = IOStream(
  317. connection,
  318. max_buffer_size=self.max_buffer_size,
  319. read_chunk_size=self.read_chunk_size,
  320. )
  321. future = self.handle_stream(stream, address)
  322. if future is not None:
  323. IOLoop.current().add_future(
  324. gen.convert_yielded(future), lambda f: f.result()
  325. )
  326. except Exception:
  327. app_log.error("Error in connection callback", exc_info=True)