context.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. ###############################################################################
  2. # Basic context management with LokyContext
  3. #
  4. # author: Thomas Moreau and Olivier Grisel
  5. #
  6. # adapted from multiprocessing/context.py
# * Create a context ensuring loky uses only objects that are compatible
#   with loky
# * Add LokyContext to the list of contexts of multiprocessing so loky can be
#   used with multiprocessing.set_start_method
# * Implement a CFS-aware and physical-core aware cpu_count function.
  11. #
  12. import os
  13. import sys
  14. import math
  15. import subprocess
  16. import traceback
  17. import warnings
  18. import multiprocessing as mp
  19. from multiprocessing import get_context as mp_get_context
  20. from multiprocessing.context import BaseContext
  21. from concurrent.futures.process import _MAX_WINDOWS_WORKERS
  22. from .process import LokyProcess, LokyInitMainProcess
  23. # Apparently, on older Python versions, loky cannot work 61 workers on Windows
  24. # but instead 60: ¯\_(ツ)_/¯
  25. if sys.version_info < (3, 10):
  26. _MAX_WINDOWS_WORKERS = _MAX_WINDOWS_WORKERS - 1
  27. START_METHODS = ["loky", "loky_init_main", "spawn"]
  28. if sys.platform != "win32":
  29. START_METHODS += ["fork", "forkserver"]
  30. _DEFAULT_START_METHOD = None
  31. # Cache for the number of physical cores to avoid repeating subprocess calls.
  32. # It should not change during the lifetime of the program.
  33. physical_cores_cache = None
  34. def get_context(method=None):
  35. # Try to overload the default context
  36. method = method or _DEFAULT_START_METHOD or "loky"
  37. if method == "fork":
  38. # If 'fork' is explicitly requested, warn user about potential issues.
  39. warnings.warn(
  40. "`fork` start method should not be used with "
  41. "`loky` as it does not respect POSIX. Try using "
  42. "`spawn` or `loky` instead.",
  43. UserWarning,
  44. )
  45. try:
  46. return mp_get_context(method)
  47. except ValueError:
  48. raise ValueError(
  49. f"Unknown context '{method}'. Value should be in "
  50. f"{START_METHODS}."
  51. )
  52. def set_start_method(method, force=False):
  53. global _DEFAULT_START_METHOD
  54. if _DEFAULT_START_METHOD is not None and not force:
  55. raise RuntimeError("context has already been set")
  56. assert method is None or method in START_METHODS, (
  57. f"'{method}' is not a valid start_method. It should be in "
  58. f"{START_METHODS}"
  59. )
  60. _DEFAULT_START_METHOD = method
  61. def get_start_method():
  62. return _DEFAULT_START_METHOD
  63. def cpu_count(only_physical_cores=False):
  64. """Return the number of CPUs the current process can use.
  65. The returned number of CPUs accounts for:
  66. * the number of CPUs in the system, as given by
  67. ``multiprocessing.cpu_count``;
  68. * the CPU affinity settings of the current process
  69. (available on some Unix systems);
  70. * Cgroup CPU bandwidth limit (available on Linux only, typically
  71. set by docker and similar container orchestration systems);
  72. * the value of the LOKY_MAX_CPU_COUNT environment variable if defined.
  73. and is given as the minimum of these constraints.
  74. If ``only_physical_cores`` is True, return the number of physical cores
  75. instead of the number of logical cores (hyperthreading / SMT). Note that
  76. this option is not enforced if the number of usable cores is controlled in
  77. any other way such as: process affinity, Cgroup restricted CPU bandwidth
  78. or the LOKY_MAX_CPU_COUNT environment variable. If the number of physical
  79. cores is not found, return the number of logical cores.
  80. Note that on Windows, the returned number of CPUs cannot exceed 61 (or 60 for
  81. Python < 3.10), see:
  82. https://bugs.python.org/issue26903.
  83. It is also always larger or equal to 1.
  84. """
  85. # Note: os.cpu_count() is allowed to return None in its docstring
  86. os_cpu_count = os.cpu_count() or 1
  87. if sys.platform == "win32":
  88. # On Windows, attempting to use more than 61 CPUs would result in a
  89. # OS-level error. See https://bugs.python.org/issue26903. According to
  90. # https://learn.microsoft.com/en-us/windows/win32/procthread/processor-groups
  91. # it might be possible to go beyond with a lot of extra work but this
  92. # does not look easy.
  93. os_cpu_count = min(os_cpu_count, _MAX_WINDOWS_WORKERS)
  94. cpu_count_user = _cpu_count_user(os_cpu_count)
  95. aggregate_cpu_count = max(min(os_cpu_count, cpu_count_user), 1)
  96. if not only_physical_cores:
  97. return aggregate_cpu_count
  98. if cpu_count_user < os_cpu_count:
  99. # Respect user setting
  100. return max(cpu_count_user, 1)
  101. cpu_count_physical, exception = _count_physical_cores()
  102. if cpu_count_physical != "not found":
  103. return cpu_count_physical
  104. # Fallback to default behavior
  105. if exception is not None:
  106. # warns only the first time
  107. warnings.warn(
  108. "Could not find the number of physical cores for the "
  109. f"following reason:\n{exception}\n"
  110. "Returning the number of logical cores instead. You can "
  111. "silence this warning by setting LOKY_MAX_CPU_COUNT to "
  112. "the number of cores you want to use."
  113. )
  114. traceback.print_tb(exception.__traceback__)
  115. return aggregate_cpu_count
  116. def _cpu_count_cgroup(os_cpu_count):
  117. # Cgroup CPU bandwidth limit available in Linux since 2.6 kernel
  118. cpu_max_fname = "/sys/fs/cgroup/cpu.max"
  119. cfs_quota_fname = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us"
  120. cfs_period_fname = "/sys/fs/cgroup/cpu/cpu.cfs_period_us"
  121. if os.path.exists(cpu_max_fname):
  122. # cgroup v2
  123. # https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html
  124. with open(cpu_max_fname) as fh:
  125. cpu_quota_us, cpu_period_us = fh.read().strip().split()
  126. elif os.path.exists(cfs_quota_fname) and os.path.exists(cfs_period_fname):
  127. # cgroup v1
  128. # https://www.kernel.org/doc/html/latest/scheduler/sched-bwc.html#management
  129. with open(cfs_quota_fname) as fh:
  130. cpu_quota_us = fh.read().strip()
  131. with open(cfs_period_fname) as fh:
  132. cpu_period_us = fh.read().strip()
  133. else:
  134. # No Cgroup CPU bandwidth limit (e.g. non-Linux platform)
  135. cpu_quota_us = "max"
  136. cpu_period_us = 100_000 # unused, for consistency with default values
  137. if cpu_quota_us == "max":
  138. # No active Cgroup quota on a Cgroup-capable platform
  139. return os_cpu_count
  140. else:
  141. cpu_quota_us = int(cpu_quota_us)
  142. cpu_period_us = int(cpu_period_us)
  143. if cpu_quota_us > 0 and cpu_period_us > 0:
  144. return math.ceil(cpu_quota_us / cpu_period_us)
  145. else: # pragma: no cover
  146. # Setting a negative cpu_quota_us value is a valid way to disable
  147. # cgroup CPU bandwith limits
  148. return os_cpu_count
  149. def _cpu_count_affinity(os_cpu_count):
  150. # Number of available CPUs given affinity settings
  151. if hasattr(os, "sched_getaffinity"):
  152. try:
  153. return len(os.sched_getaffinity(0))
  154. except NotImplementedError:
  155. pass
  156. # On some platforms, os.sched_getaffinity does not exist or raises
  157. # NotImplementedError, let's try with the psutil if installed.
  158. try:
  159. import psutil
  160. p = psutil.Process()
  161. if hasattr(p, "cpu_affinity"):
  162. return len(p.cpu_affinity())
  163. except ImportError: # pragma: no cover
  164. if (
  165. sys.platform == "linux"
  166. and os.environ.get("LOKY_MAX_CPU_COUNT") is None
  167. ):
  168. # Some platforms don't implement os.sched_getaffinity on Linux which
  169. # can cause severe oversubscription problems. Better warn the
  170. # user in this particularly pathological case which can wreck
  171. # havoc, typically on CI workers.
  172. warnings.warn(
  173. "Failed to inspect CPU affinity constraints on this system. "
  174. "Please install psutil or explictly set LOKY_MAX_CPU_COUNT."
  175. )
  176. # This can happen for platforms that do not implement any kind of CPU
  177. # infinity such as macOS-based platforms.
  178. return os_cpu_count
  179. def _cpu_count_user(os_cpu_count):
  180. """Number of user defined available CPUs"""
  181. cpu_count_affinity = _cpu_count_affinity(os_cpu_count)
  182. cpu_count_cgroup = _cpu_count_cgroup(os_cpu_count)
  183. # User defined soft-limit passed as a loky specific environment variable.
  184. cpu_count_loky = int(os.environ.get("LOKY_MAX_CPU_COUNT", os_cpu_count))
  185. return min(cpu_count_affinity, cpu_count_cgroup, cpu_count_loky)
  186. def _count_physical_cores():
  187. """Return a tuple (number of physical cores, exception)
  188. If the number of physical cores is found, exception is set to None.
  189. If it has not been found, return ("not found", exception).
  190. The number of physical cores is cached to avoid repeating subprocess calls.
  191. """
  192. exception = None
  193. # First check if the value is cached
  194. global physical_cores_cache
  195. if physical_cores_cache is not None:
  196. return physical_cores_cache, exception
  197. # Not cached yet, find it
  198. try:
  199. if sys.platform == "linux":
  200. cpu_count_physical = _count_physical_cores_linux()
  201. elif sys.platform == "win32":
  202. cpu_count_physical = _count_physical_cores_win32()
  203. elif sys.platform == "darwin":
  204. cpu_count_physical = _count_physical_cores_darwin()
  205. else:
  206. raise NotImplementedError(f"unsupported platform: {sys.platform}")
  207. # if cpu_count_physical < 1, we did not find a valid value
  208. if cpu_count_physical < 1:
  209. raise ValueError(f"found {cpu_count_physical} physical cores < 1")
  210. except Exception as e:
  211. exception = e
  212. cpu_count_physical = "not found"
  213. # Put the result in cache
  214. physical_cores_cache = cpu_count_physical
  215. return cpu_count_physical, exception
  216. def _count_physical_cores_linux():
  217. try:
  218. cpu_info = subprocess.run(
  219. "lscpu --parse=core".split(), capture_output=True, text=True
  220. )
  221. cpu_info = cpu_info.stdout.splitlines()
  222. cpu_info = {line for line in cpu_info if not line.startswith("#")}
  223. return len(cpu_info)
  224. except:
  225. pass # fallback to /proc/cpuinfo
  226. cpu_info = subprocess.run(
  227. "cat /proc/cpuinfo".split(), capture_output=True, text=True
  228. )
  229. cpu_info = cpu_info.stdout.splitlines()
  230. cpu_info = {line for line in cpu_info if line.startswith("core id")}
  231. return len(cpu_info)
  232. def _count_physical_cores_win32():
  233. try:
  234. cmd = "-Command (Get-CimInstance -ClassName Win32_Processor).NumberOfCores"
  235. cpu_info = subprocess.run(
  236. f"powershell.exe {cmd}".split(),
  237. capture_output=True,
  238. text=True,
  239. )
  240. cpu_info = cpu_info.stdout.splitlines()
  241. return int(cpu_info[0])
  242. except:
  243. pass # fallback to wmic (older Windows versions; deprecated now)
  244. cpu_info = subprocess.run(
  245. "wmic CPU Get NumberOfCores /Format:csv".split(),
  246. capture_output=True,
  247. text=True,
  248. )
  249. cpu_info = cpu_info.stdout.splitlines()
  250. cpu_info = [
  251. l.split(",")[1] for l in cpu_info if (l and l != "Node,NumberOfCores")
  252. ]
  253. return sum(map(int, cpu_info))
  254. def _count_physical_cores_darwin():
  255. cpu_info = subprocess.run(
  256. "sysctl -n hw.physicalcpu".split(),
  257. capture_output=True,
  258. text=True,
  259. )
  260. cpu_info = cpu_info.stdout
  261. return int(cpu_info)
  262. class LokyContext(BaseContext):
  263. """Context relying on the LokyProcess."""
  264. _name = "loky"
  265. Process = LokyProcess
  266. cpu_count = staticmethod(cpu_count)
  267. def Queue(self, maxsize=0, reducers=None):
  268. """Returns a queue object"""
  269. from .queues import Queue
  270. return Queue(maxsize, reducers=reducers, ctx=self.get_context())
  271. def SimpleQueue(self, reducers=None):
  272. """Returns a queue object"""
  273. from .queues import SimpleQueue
  274. return SimpleQueue(reducers=reducers, ctx=self.get_context())
  275. if sys.platform != "win32":
  276. """For Unix platform, use our custom implementation of synchronize
  277. ensuring that we use the loky.backend.resource_tracker to clean-up
  278. the semaphores in case of a worker crash.
  279. """
  280. def Semaphore(self, value=1):
  281. """Returns a semaphore object"""
  282. from .synchronize import Semaphore
  283. return Semaphore(value=value)
  284. def BoundedSemaphore(self, value):
  285. """Returns a bounded semaphore object"""
  286. from .synchronize import BoundedSemaphore
  287. return BoundedSemaphore(value)
  288. def Lock(self):
  289. """Returns a lock object"""
  290. from .synchronize import Lock
  291. return Lock()
  292. def RLock(self):
  293. """Returns a recurrent lock object"""
  294. from .synchronize import RLock
  295. return RLock()
  296. def Condition(self, lock=None):
  297. """Returns a condition object"""
  298. from .synchronize import Condition
  299. return Condition(lock)
  300. def Event(self):
  301. """Returns an event object"""
  302. from .synchronize import Event
  303. return Event()
  304. class LokyInitMainContext(LokyContext):
  305. """Extra context with LokyProcess, which does load the main module
  306. This context is used for compatibility in the case ``cloudpickle`` is not
  307. present on the running system. This permits to load functions defined in
  308. the ``main`` module, using proper safeguards. The declaration of the
  309. ``executor`` should be protected by ``if __name__ == "__main__":`` and the
  310. functions and variable used from main should be out of this block.
  311. This mimics the default behavior of multiprocessing under Windows and the
  312. behavior of the ``spawn`` start method on a posix system.
  313. For more details, see the end of the following section of python doc
  314. https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
  315. """
  316. _name = "loky_init_main"
  317. Process = LokyInitMainProcess
  318. # Register loky context so it works with multiprocessing.get_context
  319. ctx_loky = LokyContext()
  320. mp.context._concrete_contexts["loky"] = ctx_loky
  321. mp.context._concrete_contexts["loky_init_main"] = LokyInitMainContext()