process.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. #
  2. # Copyright 2011 Facebook
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  5. # not use this file except in compliance with the License. You may obtain
  6. # a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. # License for the specific language governing permissions and limitations
  14. # under the License.
  15. """Utilities for working with multiple processes, including both forking
  16. the server into multiple processes and managing subprocesses.
  17. """
  18. import asyncio
  19. import os
  20. import multiprocessing
  21. import signal
  22. import subprocess
  23. import sys
  24. import time
  25. from binascii import hexlify
  26. from tornado.concurrent import (
  27. Future,
  28. future_set_result_unless_cancelled,
  29. future_set_exception_unless_cancelled,
  30. )
  31. from tornado import ioloop
  32. from tornado.iostream import PipeIOStream
  33. from tornado.log import gen_log
  34. import typing
  35. from typing import Optional, Any, Callable
  36. if typing.TYPE_CHECKING:
  37. from typing import List # noqa: F401
# Re-export this exception for convenience, so that code using this module
# can refer to it here without importing ``subprocess`` itself.
CalledProcessError = subprocess.CalledProcessError
  40. def cpu_count() -> int:
  41. """Returns the number of processors on this machine."""
  42. if multiprocessing is None:
  43. return 1
  44. try:
  45. return multiprocessing.cpu_count()
  46. except NotImplementedError:
  47. pass
  48. try:
  49. return os.sysconf("SC_NPROCESSORS_CONF") # type: ignore
  50. except (AttributeError, ValueError):
  51. pass
  52. gen_log.error("Could not detect number of processors; assuming 1")
  53. return 1
  54. def _reseed_random() -> None:
  55. if "random" not in sys.modules:
  56. return
  57. import random
  58. # If os.urandom is available, this method does the same thing as
  59. # random.seed (at least as of python 2.6). If os.urandom is not
  60. # available, we mix in the pid in addition to a timestamp.
  61. try:
  62. seed = int(hexlify(os.urandom(16)), 16)
  63. except NotImplementedError:
  64. seed = int(time.time() * 1000) ^ os.getpid()
  65. random.seed(seed)
# Task id of the current process. ``fork_processes`` sets it in each child it
# forks and ``task_id`` reads it back; it stays None in any process that was
# not created by ``fork_processes``.
_task_id = None
  67. def fork_processes(
  68. num_processes: Optional[int], max_restarts: Optional[int] = None
  69. ) -> int:
  70. """Starts multiple worker processes.
  71. If ``num_processes`` is None or <= 0, we detect the number of cores
  72. available on this machine and fork that number of child
  73. processes. If ``num_processes`` is given and > 0, we fork that
  74. specific number of sub-processes.
  75. Since we use processes and not threads, there is no shared memory
  76. between any server code.
  77. Note that multiple processes are not compatible with the autoreload
  78. module (or the ``autoreload=True`` option to `tornado.web.Application`
  79. which defaults to True when ``debug=True``).
  80. When using multiple processes, no IOLoops can be created or
  81. referenced until after the call to ``fork_processes``.
  82. In each child process, ``fork_processes`` returns its *task id*, a
  83. number between 0 and ``num_processes``. Processes that exit
  84. abnormally (due to a signal or non-zero exit status) are restarted
  85. with the same id (up to ``max_restarts`` times). In the parent
  86. process, ``fork_processes`` calls ``sys.exit(0)`` after all child
  87. processes have exited normally.
  88. max_restarts defaults to 100.
  89. Availability: Unix
  90. """
  91. if sys.platform == "win32":
  92. # The exact form of this condition matters to mypy; it understands
  93. # if but not assert in this context.
  94. raise Exception("fork not available on windows")
  95. if max_restarts is None:
  96. max_restarts = 100
  97. global _task_id
  98. assert _task_id is None
  99. if num_processes is None or num_processes <= 0:
  100. num_processes = cpu_count()
  101. gen_log.info("Starting %d processes", num_processes)
  102. children = {}
  103. def start_child(i: int) -> Optional[int]:
  104. pid = os.fork()
  105. if pid == 0:
  106. # child process
  107. _reseed_random()
  108. global _task_id
  109. _task_id = i
  110. return i
  111. else:
  112. children[pid] = i
  113. return None
  114. for i in range(num_processes):
  115. id = start_child(i)
  116. if id is not None:
  117. return id
  118. num_restarts = 0
  119. while children:
  120. pid, status = os.wait()
  121. if pid not in children:
  122. continue
  123. id = children.pop(pid)
  124. if os.WIFSIGNALED(status):
  125. gen_log.warning(
  126. "child %d (pid %d) killed by signal %d, restarting",
  127. id,
  128. pid,
  129. os.WTERMSIG(status),
  130. )
  131. elif os.WEXITSTATUS(status) != 0:
  132. gen_log.warning(
  133. "child %d (pid %d) exited with status %d, restarting",
  134. id,
  135. pid,
  136. os.WEXITSTATUS(status),
  137. )
  138. else:
  139. gen_log.info("child %d (pid %d) exited normally", id, pid)
  140. continue
  141. num_restarts += 1
  142. if num_restarts > max_restarts:
  143. raise RuntimeError("Too many child restarts, giving up")
  144. new_id = start_child(id)
  145. if new_id is not None:
  146. return new_id
  147. # All child processes exited cleanly, so exit the master process
  148. # instead of just returning to right after the call to
  149. # fork_processes (which will probably just start up another IOLoop
  150. # unless the caller checks the return value).
  151. sys.exit(0)
  152. def task_id() -> Optional[int]:
  153. """Returns the current task id, if any.
  154. Returns None if this process was not created by `fork_processes`.
  155. """
  156. global _task_id
  157. return _task_id
  158. class Subprocess:
  159. """Wraps ``subprocess.Popen`` with IOStream support.
  160. The constructor is the same as ``subprocess.Popen`` with the following
  161. additions:
  162. * ``stdin``, ``stdout``, and ``stderr`` may have the value
  163. ``tornado.process.Subprocess.STREAM``, which will make the corresponding
  164. attribute of the resulting Subprocess a `.PipeIOStream`. If this option
  165. is used, the caller is responsible for closing the streams when done
  166. with them.
  167. The ``Subprocess.STREAM`` option and the ``set_exit_callback`` and
  168. ``wait_for_exit`` methods do not work on Windows. There is
  169. therefore no reason to use this class instead of
  170. ``subprocess.Popen`` on that platform.
  171. .. versionchanged:: 5.0
  172. The ``io_loop`` argument (deprecated since version 4.1) has been removed.
  173. """
  174. STREAM = object()
  175. _initialized = False
  176. _waiting = {} # type: ignore
  177. def __init__(self, *args: Any, **kwargs: Any) -> None:
  178. self.io_loop = ioloop.IOLoop.current()
  179. # All FDs we create should be closed on error; those in to_close
  180. # should be closed in the parent process on success.
  181. pipe_fds = [] # type: List[int]
  182. to_close = [] # type: List[int]
  183. if kwargs.get("stdin") is Subprocess.STREAM:
  184. in_r, in_w = os.pipe()
  185. kwargs["stdin"] = in_r
  186. pipe_fds.extend((in_r, in_w))
  187. to_close.append(in_r)
  188. self.stdin = PipeIOStream(in_w)
  189. if kwargs.get("stdout") is Subprocess.STREAM:
  190. out_r, out_w = os.pipe()
  191. kwargs["stdout"] = out_w
  192. pipe_fds.extend((out_r, out_w))
  193. to_close.append(out_w)
  194. self.stdout = PipeIOStream(out_r)
  195. if kwargs.get("stderr") is Subprocess.STREAM:
  196. err_r, err_w = os.pipe()
  197. kwargs["stderr"] = err_w
  198. pipe_fds.extend((err_r, err_w))
  199. to_close.append(err_w)
  200. self.stderr = PipeIOStream(err_r)
  201. try:
  202. self.proc = subprocess.Popen(*args, **kwargs)
  203. except:
  204. for fd in pipe_fds:
  205. os.close(fd)
  206. raise
  207. for fd in to_close:
  208. os.close(fd)
  209. self.pid = self.proc.pid
  210. for attr in ["stdin", "stdout", "stderr"]:
  211. if not hasattr(self, attr): # don't clobber streams set above
  212. setattr(self, attr, getattr(self.proc, attr))
  213. self._exit_callback = None # type: Optional[Callable[[int], None]]
  214. self.returncode = None # type: Optional[int]
  215. def set_exit_callback(self, callback: Callable[[int], None]) -> None:
  216. """Runs ``callback`` when this process exits.
  217. The callback takes one argument, the return code of the process.
  218. This method uses a ``SIGCHLD`` handler, which is a global setting
  219. and may conflict if you have other libraries trying to handle the
  220. same signal. If you are using more than one ``IOLoop`` it may
  221. be necessary to call `Subprocess.initialize` first to designate
  222. one ``IOLoop`` to run the signal handlers.
  223. In many cases a close callback on the stdout or stderr streams
  224. can be used as an alternative to an exit callback if the
  225. signal handler is causing a problem.
  226. Availability: Unix
  227. """
  228. self._exit_callback = callback
  229. Subprocess.initialize()
  230. Subprocess._waiting[self.pid] = self
  231. Subprocess._try_cleanup_process(self.pid)
  232. def wait_for_exit(self, raise_error: bool = True) -> "Future[int]":
  233. """Returns a `.Future` which resolves when the process exits.
  234. Usage::
  235. ret = yield proc.wait_for_exit()
  236. This is a coroutine-friendly alternative to `set_exit_callback`
  237. (and a replacement for the blocking `subprocess.Popen.wait`).
  238. By default, raises `subprocess.CalledProcessError` if the process
  239. has a non-zero exit status. Use ``wait_for_exit(raise_error=False)``
  240. to suppress this behavior and return the exit status without raising.
  241. .. versionadded:: 4.2
  242. Availability: Unix
  243. """
  244. future = Future() # type: Future[int]
  245. def callback(ret: int) -> None:
  246. if ret != 0 and raise_error:
  247. # Unfortunately we don't have the original args any more.
  248. future_set_exception_unless_cancelled(
  249. future, CalledProcessError(ret, "unknown")
  250. )
  251. else:
  252. future_set_result_unless_cancelled(future, ret)
  253. self.set_exit_callback(callback)
  254. return future
  255. @classmethod
  256. def initialize(cls) -> None:
  257. """Initializes the ``SIGCHLD`` handler.
  258. The signal handler is run on an `.IOLoop` to avoid locking issues.
  259. Note that the `.IOLoop` used for signal handling need not be the
  260. same one used by individual Subprocess objects (as long as the
  261. ``IOLoops`` are each running in separate threads).
  262. .. versionchanged:: 5.0
  263. The ``io_loop`` argument (deprecated since version 4.1) has been
  264. removed.
  265. Availability: Unix
  266. """
  267. if cls._initialized:
  268. return
  269. loop = asyncio.get_event_loop()
  270. loop.add_signal_handler(signal.SIGCHLD, cls._cleanup)
  271. cls._initialized = True
  272. @classmethod
  273. def uninitialize(cls) -> None:
  274. """Removes the ``SIGCHLD`` handler."""
  275. if not cls._initialized:
  276. return
  277. loop = asyncio.get_event_loop()
  278. loop.remove_signal_handler(signal.SIGCHLD)
  279. cls._initialized = False
  280. @classmethod
  281. def _cleanup(cls) -> None:
  282. for pid in list(cls._waiting.keys()): # make a copy
  283. cls._try_cleanup_process(pid)
  284. @classmethod
  285. def _try_cleanup_process(cls, pid: int) -> None:
  286. try:
  287. ret_pid, status = os.waitpid(pid, os.WNOHANG) # type: ignore
  288. except ChildProcessError:
  289. return
  290. if ret_pid == 0:
  291. return
  292. assert ret_pid == pid
  293. subproc = cls._waiting.pop(pid)
  294. subproc.io_loop.add_callback(subproc._set_returncode, status)
  295. def _set_returncode(self, status: int) -> None:
  296. if sys.platform == "win32":
  297. self.returncode = -1
  298. else:
  299. if os.WIFSIGNALED(status):
  300. self.returncode = -os.WTERMSIG(status)
  301. else:
  302. assert os.WIFEXITED(status)
  303. self.returncode = os.WEXITSTATUS(status)
  304. # We've taken over wait() duty from the subprocess.Popen
  305. # object. If we don't inform it of the process's return code,
  306. # it will log a warning at destruction in python 3.6+.
  307. self.proc.returncode = self.returncode
  308. if self._exit_callback:
  309. callback = self._exit_callback
  310. self._exit_callback = None
  311. callback(self.returncode)