resource_tracker.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. ###############################################################################
  2. # Server process to keep track of unlinked resources, like folders and
  3. # semaphores and clean them.
  4. #
  5. # author: Thomas Moreau
  6. #
  7. # Adapted from multiprocessing/resource_tracker.py
  8. # * add some VERBOSE logging,
  9. # * add support to track folders,
  10. # * add Windows support,
  11. # * refcounting scheme to avoid unlinking resources still in use.
  12. #
  13. # On Unix we run a server process which keeps track of unlinked
  14. # resources. The server ignores SIGINT and SIGTERM and reads from a
  15. # pipe. The resource_tracker implements a reference counting scheme: each time
  16. # a Python process anticipates the shared usage of a resource by another
  17. # process, it signals the resource_tracker of this shared usage, and in return,
  18. # the resource_tracker increments the resource's reference count by 1.
  19. # Similarly, when access to a resource is closed by a Python process, the
  20. # process notifies the resource_tracker by asking it to decrement the
  21. # resource's reference count by 1. When the reference count drops to 0, the
  22. # resource_tracker attempts to clean up the underlying resource.
# Finally, every other process connected to the resource tracker has a copy of
# the writable end of the pipe used to communicate with it, so the resource
# tracker gets EOF when all other processes have exited. Then the
# resource_tracker process unlinks any remaining leaked resources (those with
# a reference count above 0).
  28. # For semaphores, this is important because the system only supports a limited
  29. # number of named semaphores, and they will not be automatically removed till
  30. # the next reboot. Without this resource tracker process, "killall python"
  31. # would probably leave unlinked semaphores.
# Note that this behavior differs from CPython's resource_tracker, which only
# implements a list of shared resources rather than a proper refcounting
# scheme. Also, CPython's resource tracker only attempts to clean up those
# shared resources once all processes connected to the resource tracker have
# exited.
  36. import os
  37. import shutil
  38. import sys
  39. import signal
  40. import warnings
  41. from multiprocessing import util
  42. from multiprocessing.resource_tracker import (
  43. ResourceTracker as _ResourceTracker,
  44. )
  45. from . import spawn
  46. if sys.platform == "win32":
  47. import _winapi
  48. import msvcrt
  49. from multiprocessing.reduction import duplicate
  50. __all__ = ["ensure_running", "register", "unregister"]
  51. _HAVE_SIGMASK = hasattr(signal, "pthread_sigmask")
  52. _IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)
  53. def cleanup_noop(name):
  54. raise RuntimeError("noop should never be registered or cleaned up")
  55. _CLEANUP_FUNCS = {
  56. "noop": cleanup_noop,
  57. "folder": shutil.rmtree,
  58. "file": os.unlink,
  59. }
  60. if os.name == "posix":
  61. import _multiprocessing
  62. # Use sem_unlink() to clean up named semaphores.
  63. #
  64. # sem_unlink() may be missing if the Python build process detected the
  65. # absence of POSIX named semaphores. In that case, no named semaphores were
  66. # ever opened, so no cleanup would be necessary.
  67. if hasattr(_multiprocessing, "sem_unlink"):
  68. _CLEANUP_FUNCS.update(
  69. {
  70. "semlock": _multiprocessing.sem_unlink,
  71. }
  72. )
  73. VERBOSE = False
  74. class ResourceTracker(_ResourceTracker):
  75. """Resource tracker with refcounting scheme.
  76. This class is an extension of the multiprocessing ResourceTracker class
  77. which implements a reference counting scheme to avoid unlinking shared
  78. resources still in use in other processes.
  79. This feature is notably used by `joblib.Parallel` to share temporary
  80. folders and memory mapped files between the main process and the worker
  81. processes.
  82. The actual implementation of the refcounting scheme is in the main
  83. function, which is run in a dedicated process.
  84. """
  85. def maybe_unlink(self, name, rtype):
  86. """Decrement the refcount of a resource, and delete it if it hits 0"""
  87. self._send("MAYBE_UNLINK", name, rtype)
  88. def ensure_running(self):
  89. """Make sure that resource tracker process is running.
  90. This can be run from any process. Usually a child process will use
  91. the resource created by its parent.
  92. This function is necessary for backward compatibility with python
  93. versions before 3.13.7.
  94. """
  95. return self._ensure_running_and_write()
  96. def _teardown_dead_process(self):
  97. # Override this function for compatibility with windows and
  98. # for python version before 3.13.7
  99. # At this point, the resource_tracker process has been killed
  100. # or crashed.
  101. os.close(self._fd)
  102. # Let's remove the process entry from the process table on POSIX system
  103. # to avoid zombie processes.
  104. if os.name == "posix":
  105. try:
  106. # _pid can be None if this process is a child from another
  107. # python process, which has started the resource_tracker.
  108. if self._pid is not None:
  109. os.waitpid(self._pid, 0)
  110. except OSError:
  111. # The resource_tracker has already been terminated.
  112. pass
  113. self._fd = None
  114. self._pid = None
  115. warnings.warn(
  116. "resource_tracker: process died unexpectedly, relaunching. "
  117. "Some folders/semaphores might leak."
  118. )
  119. def _launch(self):
  120. # This is the overridden part of the resource tracker, which launches
  121. # loky's version, which is compatible with windows and allow to track
  122. # folders with external ref counting.
  123. fds_to_pass = []
  124. try:
  125. fds_to_pass.append(sys.stderr.fileno())
  126. except Exception:
  127. pass
  128. # Create a pipe for posix and windows
  129. r, w = os.pipe()
  130. if sys.platform == "win32":
  131. _r = duplicate(msvcrt.get_osfhandle(r), inheritable=True)
  132. os.close(r)
  133. r = _r
  134. cmd = f"from {main.__module__} import main; main({r}, {VERBOSE})"
  135. try:
  136. fds_to_pass.append(r)
  137. # process will out live us, so no need to wait on pid
  138. exe = spawn.get_executable()
  139. args = [exe, *util._args_from_interpreter_flags(), "-c", cmd]
  140. util.debug(f"launching resource tracker: {args}")
  141. # bpo-33613: Register a signal mask that will block the
  142. # signals. This signal mask will be inherited by the child
  143. # that is going to be spawned and will protect the child from a
  144. # race condition that can make the child die before it
  145. # registers signal handlers for SIGINT and SIGTERM. The mask is
  146. # unregistered after spawning the child.
  147. try:
  148. if _HAVE_SIGMASK:
  149. signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
  150. pid = spawnv_passfds(exe, args, fds_to_pass)
  151. finally:
  152. if _HAVE_SIGMASK:
  153. signal.pthread_sigmask(
  154. signal.SIG_UNBLOCK, _IGNORED_SIGNALS
  155. )
  156. except BaseException:
  157. os.close(w)
  158. raise
  159. else:
  160. self._fd = w
  161. self._pid = pid
  162. finally:
  163. if sys.platform == "win32":
  164. _winapi.CloseHandle(r)
  165. else:
  166. os.close(r)
  167. def _ensure_running_and_write(self, msg=None):
  168. """Make sure that resource tracker process is running.
  169. This can be run from any process. Usually a child process will use
  170. the resource created by its parent.
  171. This function is added for compatibility with python version before 3.13.7.
  172. """
  173. with self._lock:
  174. if (
  175. self._fd is not None
  176. ): # resource tracker was launched before, is it still running?
  177. if msg is None:
  178. to_send = b"PROBE:0:noop\n"
  179. else:
  180. to_send = msg
  181. try:
  182. self._write(to_send)
  183. except OSError:
  184. self._teardown_dead_process()
  185. self._launch()
  186. msg = None # message was sent in probe
  187. else:
  188. self._launch()
  189. if msg is not None:
  190. self._write(msg)
  191. def _write(self, msg):
  192. nbytes = os.write(self._fd, msg)
  193. assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}"
  194. def __del__(self):
  195. # ignore error due to trying to clean up child process which has already been
  196. # shutdown on windows. See https://github.com/joblib/loky/pull/450
  197. # This is only required if __del__ is defined
  198. if not hasattr(_ResourceTracker, "__del__"):
  199. return
  200. try:
  201. super().__del__()
  202. except ChildProcessError:
  203. pass
  204. _resource_tracker = ResourceTracker()
  205. ensure_running = _resource_tracker.ensure_running
  206. register = _resource_tracker.register
  207. maybe_unlink = _resource_tracker.maybe_unlink
  208. unregister = _resource_tracker.unregister
  209. getfd = _resource_tracker.getfd
  210. def main(fd, verbose=0):
  211. """Run resource tracker."""
  212. if verbose:
  213. util.log_to_stderr(level=util.DEBUG)
  214. # protect the process from ^C and "killall python" etc
  215. signal.signal(signal.SIGINT, signal.SIG_IGN)
  216. signal.signal(signal.SIGTERM, signal.SIG_IGN)
  217. if _HAVE_SIGMASK:
  218. signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)
  219. for f in (sys.stdin, sys.stdout):
  220. try:
  221. f.close()
  222. except Exception:
  223. pass
  224. if verbose:
  225. util.debug("Main resource tracker is running")
  226. registry = {rtype: {} for rtype in _CLEANUP_FUNCS.keys()}
  227. try:
  228. if sys.platform == "win32":
  229. fd = msvcrt.open_osfhandle(fd, os.O_RDONLY)
  230. # keep track of registered/unregistered resources
  231. with open(fd, "rb") as f:
  232. for line in f:
  233. try:
  234. splitted = line.strip().decode("ascii").split(":")
  235. # name can potentially contain separator symbols (for
  236. # instance folders on Windows)
  237. cmd, name, rtype = (
  238. splitted[0],
  239. ":".join(splitted[1:-1]),
  240. splitted[-1],
  241. )
  242. if rtype not in _CLEANUP_FUNCS:
  243. raise ValueError(
  244. f"Cannot register {name} for automatic cleanup: "
  245. f"unknown resource type ({rtype}). Resource type "
  246. "should be one of the following: "
  247. f"{list(_CLEANUP_FUNCS.keys())}"
  248. )
  249. if cmd == "PROBE":
  250. pass
  251. elif cmd == "REGISTER":
  252. if name not in registry[rtype]:
  253. registry[rtype][name] = 1
  254. else:
  255. registry[rtype][name] += 1
  256. if verbose:
  257. util.debug(
  258. "[ResourceTracker] incremented refcount of "
  259. f"{rtype} {name} "
  260. f"(current {registry[rtype][name]})"
  261. )
  262. elif cmd == "UNREGISTER":
  263. del registry[rtype][name]
  264. if verbose:
  265. util.debug(
  266. f"[ResourceTracker] unregister {name} {rtype}: "
  267. f"registry({len(registry)})"
  268. )
  269. elif cmd == "MAYBE_UNLINK":
  270. registry[rtype][name] -= 1
  271. if verbose:
  272. util.debug(
  273. "[ResourceTracker] decremented refcount of "
  274. f"{rtype} {name} "
  275. f"(current {registry[rtype][name]})"
  276. )
  277. if registry[rtype][name] == 0:
  278. del registry[rtype][name]
  279. try:
  280. if verbose:
  281. util.debug(
  282. f"[ResourceTracker] unlink {name}"
  283. )
  284. _CLEANUP_FUNCS[rtype](name)
  285. except Exception as e:
  286. warnings.warn(
  287. f"resource_tracker: {name}: {e!r}"
  288. )
  289. else:
  290. raise RuntimeError(f"unrecognized command {cmd!r}")
  291. except BaseException:
  292. try:
  293. sys.excepthook(*sys.exc_info())
  294. except BaseException:
  295. pass
  296. finally:
  297. # all processes have terminated; cleanup any remaining resources
  298. def _unlink_resources(rtype_registry, rtype):
  299. if rtype_registry:
  300. try:
  301. warnings.warn(
  302. "resource_tracker: There appear to be "
  303. f"{len(rtype_registry)} leaked {rtype} objects to "
  304. "clean up at shutdown"
  305. )
  306. except Exception:
  307. pass
  308. for name in rtype_registry:
  309. # For some reason the process which created and registered this
  310. # resource has failed to unregister it. Presumably it has
  311. # died. We therefore clean it up.
  312. try:
  313. _CLEANUP_FUNCS[rtype](name)
  314. if verbose:
  315. util.debug(f"[ResourceTracker] unlink {name}")
  316. except Exception as e:
  317. warnings.warn(f"resource_tracker: {name}: {e!r}")
  318. for rtype, rtype_registry in registry.items():
  319. if rtype == "folder":
  320. continue
  321. else:
  322. _unlink_resources(rtype_registry, rtype)
  323. # The default cleanup routine for folders deletes everything inside
  324. # those folders recursively, which can include other resources tracked
  325. # by the resource tracker). To limit the risk of the resource tracker
  326. # attempting to delete twice a resource (once as part of a tracked
  327. # folder, and once as a resource), we delete the folders after all
  328. # other resource types.
  329. if "folder" in registry:
  330. _unlink_resources(registry["folder"], "folder")
  331. if verbose:
  332. util.debug("resource tracker shut down")
  333. def spawnv_passfds(path, args, passfds):
  334. if sys.platform != "win32":
  335. args = [arg.encode("utf-8") for arg in args]
  336. path = path.encode("utf-8")
  337. return util.spawnv_passfds(path, args, passfds)
  338. else:
  339. passfds = sorted(passfds)
  340. cmd = " ".join(f'"{x}"' for x in args)
  341. try:
  342. _, ht, pid, _ = _winapi.CreateProcess(
  343. path, cmd, None, None, True, 0, None, None, None
  344. )
  345. _winapi.CloseHandle(ht)
  346. except BaseException:
  347. pass
  348. return pid