| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405 |
- ###############################################################################
- # Basic context management with LokyContext
- #
- # author: Thomas Moreau and Olivier Grisel
- #
- # adapted from multiprocessing/context.py
- # * Create a context ensuring loky uses only objects that are compatible
- # * Add LokyContext to the list of context of multiprocessing so loky can be
- # used with multiprocessing.set_start_method
- # * Implement a CFS-aware and physical-core aware cpu_count function.
- #
- import os
- import sys
- import math
- import subprocess
- import traceback
- import warnings
- import multiprocessing as mp
- from multiprocessing import get_context as mp_get_context
- from multiprocessing.context import BaseContext
- from concurrent.futures.process import _MAX_WINDOWS_WORKERS
- from .process import LokyProcess, LokyInitMainProcess
# On Python < 3.10, loky cannot work with 61 workers on Windows and is limited
# to 60 instead (¯\_(ツ)_/¯), so lower the stdlib cap by one there.
if sys.version_info < (3, 10):
    _MAX_WINDOWS_WORKERS -= 1

# Start methods accepted by ``set_start_method``; the fork-based ones are
# only offered outside Windows.
START_METHODS = ["loky", "loky_init_main", "spawn"]
if sys.platform != "win32":
    START_METHODS.extend(["fork", "forkserver"])

# Start method chosen through ``set_start_method``. ``None`` means no choice
# has been made, in which case ``get_context`` falls back to "loky".
_DEFAULT_START_METHOD = None

# Number of physical cores, cached after the first lookup so the subprocess
# probes are not repeated. It is not expected to change while the program
# runs.
physical_cores_cache = None
def get_context(method=None):
    """Return the multiprocessing context registered under ``method``.

    When ``method`` is not given, the start method selected with
    ``set_start_method`` is used, and failing that ``"loky"``. The lookup is
    delegated to ``multiprocessing.get_context``, in which this module
    registers the ``"loky"`` and ``"loky_init_main"`` contexts.

    A ``UserWarning`` is emitted when ``"fork"`` is requested. A
    ``ValueError`` listing ``START_METHODS`` is raised for an unknown
    ``method``; the original ``multiprocessing`` error is chained as its
    cause.
    """
    # Try to overload the default context
    method = method or _DEFAULT_START_METHOD or "loky"
    if method == "fork":
        # If 'fork' is explicitly requested, warn user about potential issues.
        warnings.warn(
            "`fork` start method should not be used with "
            "`loky` as it does not respect POSIX. Try using "
            "`spawn` or `loky` instead.",
            UserWarning,
        )
    try:
        return mp_get_context(method)
    except ValueError as err:
        # Chain explicitly so the traceback shows the multiprocessing error
        # as the cause rather than "during handling ... another exception".
        raise ValueError(
            f"Unknown context '{method}'. Value should be in "
            f"{START_METHODS}."
        ) from err
def set_start_method(method, force=False):
    """Choose the start method ``get_context`` uses when none is passed.

    Raises ``RuntimeError`` if a start method has already been chosen and
    ``force`` is false. ``method`` must be ``None`` or one of
    ``START_METHODS``.

    NOTE(review): the membership check is an ``assert`` and is therefore
    skipped under ``python -O``.
    """
    global _DEFAULT_START_METHOD

    already_chosen = _DEFAULT_START_METHOD is not None
    if already_chosen and not force:
        raise RuntimeError("context has already been set")

    is_valid = method is None or method in START_METHODS
    assert is_valid, (
        f"'{method}' is not a valid start_method. It should be in "
        f"{START_METHODS}"
    )

    _DEFAULT_START_METHOD = method
def get_start_method():
    """Return the start method chosen with ``set_start_method``.

    ``None`` is returned when no start method has been chosen.
    """
    chosen = _DEFAULT_START_METHOD
    return chosen
def cpu_count(only_physical_cores=False):
    """Return the number of CPUs the current process can use.

    The returned number of CPUs accounts for:
     * the number of CPUs in the system, as given by ``os.cpu_count``
       (1 if it cannot be determined);
     * the CPU affinity settings of the current process
       (available on some Unix systems);
     * Cgroup CPU bandwidth limit (available on Linux only, typically
       set by docker and similar container orchestration systems);
     * the value of the LOKY_MAX_CPU_COUNT environment variable if defined.
    and is given as the minimum of these constraints.

    If ``only_physical_cores`` is True, return the number of physical cores
    instead of the number of logical cores (hyperthreading / SMT). Note that
    this option is not enforced if the number of usable cores is controlled in
    any other way such as: process affinity, Cgroup restricted CPU bandwidth
    or the LOKY_MAX_CPU_COUNT environment variable. If the number of physical
    cores is not found, return the number of logical cores; a warning giving
    the reason is emitted the first time this happens. The physical core
    count is never allowed to exceed the usable logical CPU count.

    Note that on Windows, the returned number of CPUs cannot exceed 61 (or 60
    for Python < 3.10), see:
    https://bugs.python.org/issue26903.

    It is always greater than or equal to 1.
    """
    # Note: os.cpu_count() is allowed to return None in its docstring
    os_cpu_count = os.cpu_count() or 1
    if sys.platform == "win32":
        # On Windows, attempting to use more than 61 CPUs would result in a
        # OS-level error. See https://bugs.python.org/issue26903. According to
        # https://learn.microsoft.com/en-us/windows/win32/procthread/processor-groups
        # it might be possible to go beyond with a lot of extra work but this
        # does not look easy.
        os_cpu_count = min(os_cpu_count, _MAX_WINDOWS_WORKERS)

    cpu_count_user = _cpu_count_user(os_cpu_count)
    aggregate_cpu_count = max(min(os_cpu_count, cpu_count_user), 1)

    if not only_physical_cores:
        return aggregate_cpu_count

    if cpu_count_user < os_cpu_count:
        # Respect user setting
        return max(cpu_count_user, 1)

    cpu_count_physical, exception = _count_physical_cores()
    if cpu_count_physical != "not found":
        # The physical core probe ignores the Windows cap applied to
        # os_cpu_count above, so clamp it: returning e.g. 64 physical cores
        # would break the documented limit and trigger the OS-level error.
        return min(cpu_count_physical, aggregate_cpu_count)

    # Fallback to default behavior
    if exception is not None:
        # Warns only the first time: later calls hit the cache in
        # _count_physical_cores, which then reports no exception.
        warnings.warn(
            "Could not find the number of physical cores for the "
            f"following reason:\n{exception}\n"
            "Returning the number of logical cores instead. You can "
            "silence this warning by setting LOKY_MAX_CPU_COUNT to "
            "the number of cores you want to use."
        )
        traceback.print_tb(exception.__traceback__)

    return aggregate_cpu_count
- def _cpu_count_cgroup(os_cpu_count):
- # Cgroup CPU bandwidth limit available in Linux since 2.6 kernel
- cpu_max_fname = "/sys/fs/cgroup/cpu.max"
- cfs_quota_fname = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us"
- cfs_period_fname = "/sys/fs/cgroup/cpu/cpu.cfs_period_us"
- if os.path.exists(cpu_max_fname):
- # cgroup v2
- # https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html
- with open(cpu_max_fname) as fh:
- cpu_quota_us, cpu_period_us = fh.read().strip().split()
- elif os.path.exists(cfs_quota_fname) and os.path.exists(cfs_period_fname):
- # cgroup v1
- # https://www.kernel.org/doc/html/latest/scheduler/sched-bwc.html#management
- with open(cfs_quota_fname) as fh:
- cpu_quota_us = fh.read().strip()
- with open(cfs_period_fname) as fh:
- cpu_period_us = fh.read().strip()
- else:
- # No Cgroup CPU bandwidth limit (e.g. non-Linux platform)
- cpu_quota_us = "max"
- cpu_period_us = 100_000 # unused, for consistency with default values
- if cpu_quota_us == "max":
- # No active Cgroup quota on a Cgroup-capable platform
- return os_cpu_count
- else:
- cpu_quota_us = int(cpu_quota_us)
- cpu_period_us = int(cpu_period_us)
- if cpu_quota_us > 0 and cpu_period_us > 0:
- return math.ceil(cpu_quota_us / cpu_period_us)
- else: # pragma: no cover
- # Setting a negative cpu_quota_us value is a valid way to disable
- # cgroup CPU bandwith limits
- return os_cpu_count
- def _cpu_count_affinity(os_cpu_count):
- # Number of available CPUs given affinity settings
- if hasattr(os, "sched_getaffinity"):
- try:
- return len(os.sched_getaffinity(0))
- except NotImplementedError:
- pass
- # On some platforms, os.sched_getaffinity does not exist or raises
- # NotImplementedError, let's try with the psutil if installed.
- try:
- import psutil
- p = psutil.Process()
- if hasattr(p, "cpu_affinity"):
- return len(p.cpu_affinity())
- except ImportError: # pragma: no cover
- if (
- sys.platform == "linux"
- and os.environ.get("LOKY_MAX_CPU_COUNT") is None
- ):
- # Some platforms don't implement os.sched_getaffinity on Linux which
- # can cause severe oversubscription problems. Better warn the
- # user in this particularly pathological case which can wreck
- # havoc, typically on CI workers.
- warnings.warn(
- "Failed to inspect CPU affinity constraints on this system. "
- "Please install psutil or explictly set LOKY_MAX_CPU_COUNT."
- )
- # This can happen for platforms that do not implement any kind of CPU
- # infinity such as macOS-based platforms.
- return os_cpu_count
def _cpu_count_user(os_cpu_count):
    """Return the tightest CPU limit imposed on the current process.

    This is the minimum of the CPU affinity count, the cgroup CPU bandwidth
    limit and ``LOKY_MAX_CPU_COUNT``. The latter is a loky-specific soft
    limit set by the user through the environment and defaults to
    ``os_cpu_count``.
    """
    limits = (
        _cpu_count_affinity(os_cpu_count),
        _cpu_count_cgroup(os_cpu_count),
        int(os.environ.get("LOKY_MAX_CPU_COUNT", os_cpu_count)),
    )
    return min(limits)
- def _count_physical_cores():
- """Return a tuple (number of physical cores, exception)
- If the number of physical cores is found, exception is set to None.
- If it has not been found, return ("not found", exception).
- The number of physical cores is cached to avoid repeating subprocess calls.
- """
- exception = None
- # First check if the value is cached
- global physical_cores_cache
- if physical_cores_cache is not None:
- return physical_cores_cache, exception
- # Not cached yet, find it
- try:
- if sys.platform == "linux":
- cpu_count_physical = _count_physical_cores_linux()
- elif sys.platform == "win32":
- cpu_count_physical = _count_physical_cores_win32()
- elif sys.platform == "darwin":
- cpu_count_physical = _count_physical_cores_darwin()
- else:
- raise NotImplementedError(f"unsupported platform: {sys.platform}")
- # if cpu_count_physical < 1, we did not find a valid value
- if cpu_count_physical < 1:
- raise ValueError(f"found {cpu_count_physical} physical cores < 1")
- except Exception as e:
- exception = e
- cpu_count_physical = "not found"
- # Put the result in cache
- physical_cores_cache = cpu_count_physical
- return cpu_count_physical, exception
- def _count_physical_cores_linux():
- try:
- cpu_info = subprocess.run(
- "lscpu --parse=core".split(), capture_output=True, text=True
- )
- cpu_info = cpu_info.stdout.splitlines()
- cpu_info = {line for line in cpu_info if not line.startswith("#")}
- return len(cpu_info)
- except:
- pass # fallback to /proc/cpuinfo
- cpu_info = subprocess.run(
- "cat /proc/cpuinfo".split(), capture_output=True, text=True
- )
- cpu_info = cpu_info.stdout.splitlines()
- cpu_info = {line for line in cpu_info if line.startswith("core id")}
- return len(cpu_info)
- def _count_physical_cores_win32():
- try:
- cmd = "-Command (Get-CimInstance -ClassName Win32_Processor).NumberOfCores"
- cpu_info = subprocess.run(
- f"powershell.exe {cmd}".split(),
- capture_output=True,
- text=True,
- )
- cpu_info = cpu_info.stdout.splitlines()
- return int(cpu_info[0])
- except:
- pass # fallback to wmic (older Windows versions; deprecated now)
- cpu_info = subprocess.run(
- "wmic CPU Get NumberOfCores /Format:csv".split(),
- capture_output=True,
- text=True,
- )
- cpu_info = cpu_info.stdout.splitlines()
- cpu_info = [
- l.split(",")[1] for l in cpu_info if (l and l != "Node,NumberOfCores")
- ]
- return sum(map(int, cpu_info))
- def _count_physical_cores_darwin():
- cpu_info = subprocess.run(
- "sysctl -n hw.physicalcpu".split(),
- capture_output=True,
- text=True,
- )
- cpu_info = cpu_info.stdout
- return int(cpu_info)
class LokyContext(BaseContext):
    """Multiprocessing context relying on ``LokyProcess``.

    Queues are created from loky's own ``queues`` module. Outside Windows,
    the synchronization primitives also come from loky's ``synchronize``
    module, so that ``loky.backend.resource_tracker`` cleans up the
    semaphores if a worker crashes.
    """

    _name = "loky"
    Process = LokyProcess
    cpu_count = staticmethod(cpu_count)

    def Queue(self, maxsize=0, reducers=None):
        """Return a loky ``Queue`` bound to this context."""
        from .queues import Queue as _LokyQueue

        context = self.get_context()
        return _LokyQueue(maxsize, reducers=reducers, ctx=context)

    def SimpleQueue(self, reducers=None):
        """Return a loky ``SimpleQueue`` bound to this context."""
        from .queues import SimpleQueue as _LokySimpleQueue

        context = self.get_context()
        return _LokySimpleQueue(reducers=reducers, ctx=context)

    if sys.platform != "win32":
        # On Unix platforms, use loky's custom synchronize implementation so
        # that loky.backend.resource_tracker cleans up the semaphores in case
        # of a worker crash.

        def Semaphore(self, value=1):
            """Return a loky ``Semaphore``."""
            from .synchronize import Semaphore as _Semaphore

            return _Semaphore(value=value)

        def BoundedSemaphore(self, value):
            """Return a loky ``BoundedSemaphore``."""
            from .synchronize import BoundedSemaphore as _BoundedSemaphore

            return _BoundedSemaphore(value)

        def Lock(self):
            """Return a loky ``Lock``."""
            from .synchronize import Lock as _Lock

            return _Lock()

        def RLock(self):
            """Return a loky ``RLock`` (recursive lock)."""
            from .synchronize import RLock as _RLock

            return _RLock()

        def Condition(self, lock=None):
            """Return a loky ``Condition``, optionally built on ``lock``."""
            from .synchronize import Condition as _Condition

            return _Condition(lock)

        def Event(self):
            """Return a loky ``Event``."""
            from .synchronize import Event as _Event

            return _Event()
class LokyInitMainContext(LokyContext):
    """Loky context whose ``LokyInitMainProcess`` workers load ``__main__``.

    This context exists for compatibility when ``cloudpickle`` is not
    available on the running system. It allows functions defined in the
    ``__main__`` module to be loaded, provided the proper safeguards are in
    place. The creation of the ``executor`` must be protected by
    ``if __name__ == "__main__":``. The functions and variables it uses from
    ``__main__`` must be defined outside of that block.

    This mimics the default behavior of multiprocessing under Windows and the
    behavior of the ``spawn`` start method on a POSIX system. For more
    details, see the end of the following section of the Python
    documentation:
    https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
    """

    Process = LokyInitMainProcess
    _name = "loky_init_main"
# Register both loky contexts in multiprocessing's (private) registry so that
# multiprocessing.get_context can resolve "loky" and "loky_init_main".
ctx_loky = LokyContext()
mp.context._concrete_contexts.update(
    loky=ctx_loky,
    loky_init_main=LokyInitMainContext(),
)
|