| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293 |
- import logging
- from typing import Optional
- import ray._common.utils
- import ray._private.ray_constants as ray_constants
- import ray._private.utils as utils
# Logger for this module, named after the module itself.
logger = logging.getLogger(__name__)

# Largest cpu.weight value; system_reserved_cpu is scaled against it.
# For information about cgroupv2 cpu weights, see
# https://docs.kernel.org/admin-guide/cgroup-v2.html#weights
_CGROUP_CPU_MAX_WEIGHT: int = 10_000
class ResourceIsolationConfig:
    """Configuration for enabling resource isolation by reserving memory and cpu for ray system processes through cgroupv2.

    Validates configuration for resource isolation by enforcing types, correct
    combinations of values, applying default values, and sanity checking cpu
    and memory reservations. Also converts system_reserved_cpu into a
    cpu.weight value for cgroupv2.

    Attributes:
        cgroup_path: The path for the cgroup the raylet should use to enforce
            resource isolation. When isolation is enabled and no path was
            given, this is ray_constants.DEFAULT_CGROUP_PATH.
        system_reserved_memory: When isolation is enabled, the validated
            number of bytes reserved for ray system processes *plus*
            object_store_memory. When isolation is disabled, the value that
            was passed in (which must be falsy).
        system_reserved_cpu_weight: The cgroupv2 cpu.weight derived from
            system_reserved_cpu, or None when isolation is disabled.
        system_pids: A comma-separated list of pids to move into the system
            cgroup. Empty until add_system_pids is called.

    TODO(54703): Link documentation when it's available.
    """

    def __init__(
        self,
        enable_resource_isolation: bool = False,
        cgroup_path: Optional[str] = None,
        system_reserved_cpu: Optional[float] = None,
        system_reserved_memory: Optional[int] = None,
        object_store_memory: Optional[int] = None,
    ):
        """Validate the arguments and compute the derived cgroup settings.

        Args:
            enable_resource_isolation: True if cgroupv2 based isolation of ray
                system processes is enabled.
            cgroup_path: The path for the cgroup the raylet should use to
                enforce resource isolation.
            system_reserved_cpu: The amount of cores reserved for ray system
                processes. Must be
                >= ray_constants.DEFAULT_MIN_SYSTEM_RESERVED_CPU_CORES
                and < the total number of cores available.
            system_reserved_memory: The amount of memory in bytes reserved
                for ray system processes. Must be
                >= ray_constants.DEFAULT_MIN_SYSTEM_RESERVED_MEMORY_BYTES, and
                system_reserved_memory + object_store_memory must not exceed
                the total memory available.
            object_store_memory: The amount of memory in bytes reserved for
                the object store. Must be not None when resource isolation is
                enabled.

        Raises:
            ValueError: On invalid inputs, including setting cgroup_path,
                system_reserved_cpu or system_reserved_memory while resource
                isolation is disabled.
        """
        self._resource_isolation_enabled = enable_resource_isolation
        self.cgroup_path = cgroup_path
        self.system_reserved_memory = system_reserved_memory
        self.system_pids = ""
        # cgroupv2 cpu.weight calculated from system_reserved_cpu assumes ray
        # uses all available cores. Stays None while isolation is disabled.
        self.system_reserved_cpu_weight: Optional[int] = None

        if not enable_resource_isolation:
            # None of the isolation knobs may be set without the feature
            # itself. Falsy values (None, 0, "") count as "not set".
            isolation_only_options = (
                ("cgroup_path", cgroup_path),
                ("system_reserved_cpu", system_reserved_cpu),
                ("system_reserved_memory", system_reserved_memory),
            )
            for option_name, option_value in isolation_only_options:
                if option_value:
                    raise ValueError(
                        f"{option_name} cannot be set when resource isolation is not enabled. "
                        "Set enable_resource_isolation to True if you're using ray.init or use the "
                        "--enable-resource-isolation flag if you're using the ray cli."
                    )
            return

        if object_store_memory is None:
            raise ValueError(
                "object_store_memory must be resolved before creating a ResourceIsolationConfig "
                "when resource isolation is enabled. This is likely a bug in Ray, "
                "please report it at https://github.com/ray-project/ray/issues/new/choose."
            )

        self.system_reserved_cpu_weight = self._validate_and_get_system_reserved_cpu(
            system_reserved_cpu
        )
        self.system_reserved_memory = self._validate_and_get_system_reserved_memory(
            system_reserved_memory, object_store_memory
        )
        self.cgroup_path = self._validate_and_get_cgroup_path(cgroup_path)

    def is_enabled(self) -> bool:
        """Return True if resource isolation was enabled at construction."""
        return self._resource_isolation_enabled

    def add_system_pids(self, system_pids: str):
        """A comma-separated list of pids to move into the system cgroup."""
        self.system_pids = system_pids

    @staticmethod
    def _validate_and_get_cgroup_path(cgroup_path: Optional[str]) -> str:
        """Returns the ray_constants.DEFAULT_CGROUP_PATH if cgroup_path is not specified.

        Args:
            cgroup_path: The path for the cgroup the raylet should use to
                enforce resource isolation. None or an empty string selects
                the default.

        Returns:
            str: The validated cgroup path.

        Raises:
            ValueError: If cgroup_path is not None and not a string.
        """
        # Check the type before applying the default so that falsy values of
        # the wrong type (0, False, []) are rejected rather than silently
        # replaced by the default path.
        if cgroup_path is not None and not isinstance(cgroup_path, str):
            raise ValueError(
                f"Invalid value={cgroup_path} for cgroup_path. "
                "Use a string to represent the path for the cgroup that the raylet should use "
                "to enable resource isolation."
            )
        if not cgroup_path:
            return ray_constants.DEFAULT_CGROUP_PATH
        return cgroup_path

    @staticmethod
    def _validate_and_get_system_reserved_cpu(
        system_reserved_cpu: Optional[float],
    ) -> int:
        """If system_reserved_cpu is specified, validates it, otherwise uses the default value.

        Validation entails checking the type, ensuring that the value is in
        range, and converting it into cpu.weights for cgroupv2. See
        https://docs.kernel.org/admin-guide/cgroup-v2.html#weights
        for more information.

        If system_reserved_cpu is not specified (None or 0), the default is
        DEFAULT_SYSTEM_RESERVED_CPU_PROPORTION of the available cores, clamped
        to [DEFAULT_MIN_SYSTEM_RESERVED_CPU_CORES,
        DEFAULT_MAX_SYSTEM_RESERVED_CPU_CORES].

        # TODO(54703): The errors from this method are user-facing and thus need
        to be linked the user-facing documentation once it's available.

        Args:
            system_reserved_cpu: The amount of cores reserved for ray system
                processes. Must be
                >= ray_constants.DEFAULT_MIN_SYSTEM_RESERVED_CPU_CORES
                and < the total number of cores available.

        Returns:
            int: The cgroupv2 cpu.weight equivalent of the reservation, i.e.
            the reserved fraction of available cores scaled by
            _CGROUP_CPU_MAX_WEIGHT and truncated to an integer.

        Raises:
            ValueError: If system_reserved_cpu is specified, but invalid or if
                the system does not have enough available cpus.
        """
        # Check the type before applying the default so that falsy values of
        # the wrong type ("", []) are rejected rather than silently replaced.
        if system_reserved_cpu is not None and not isinstance(
            system_reserved_cpu, (float, int)
        ):
            raise ValueError(
                f"Invalid value={system_reserved_cpu} for system_reserved_cpu. "
                "Use a float to represent the number of cores that need to be reserved for "
                "ray system processes to enable resource isolation."
            )

        # truncate=False keeps the raw (possibly fractional) cpu count.
        available_system_cpus = utils.get_num_cpus(truncate=False)
        if available_system_cpus < ray_constants.DEFAULT_MIN_SYSTEM_RESERVED_CPU_CORES:
            raise ValueError(
                f"The available number of cpu cores on this system {available_system_cpus} is less than "
                f"the minimum amount that is required for ray's system processes. "
                f"Pick a number of cpu cores greater than or equal to {ray_constants.DEFAULT_MIN_SYSTEM_RESERVED_CPU_CORES}"
            )

        if not system_reserved_cpu:
            # Unspecified: take a proportion of the machine, clamped to the
            # configured [min, max] band.
            proportional_cores = (
                ray_constants.DEFAULT_SYSTEM_RESERVED_CPU_PROPORTION
                * available_system_cpus
            )
            system_reserved_cpu = min(
                max(
                    ray_constants.DEFAULT_MIN_SYSTEM_RESERVED_CPU_CORES,
                    proportional_cores,
                ),
                ray_constants.DEFAULT_MAX_SYSTEM_RESERVED_CPU_CORES,
            )

        system_reserved_cpu = float(system_reserved_cpu)

        if system_reserved_cpu < ray_constants.DEFAULT_MIN_SYSTEM_RESERVED_CPU_CORES:
            raise ValueError(
                f"The requested system_reserved_cpu={system_reserved_cpu} is less than "
                f"the minimum number of cpus that can be used for resource isolation. "
                "Pick a number of cpu cores to reserve for ray system processes "
                f"greater than or equal to {ray_constants.DEFAULT_MIN_SYSTEM_RESERVED_CPU_CORES}"
            )

        if system_reserved_cpu >= available_system_cpus:
            raise ValueError(
                f"The requested system_reserved_cpu={system_reserved_cpu} is greater than or equal to "
                f"the number of cpus available={available_system_cpus}. "
                "Pick a smaller number of cpu cores to reserve for ray system processes."
            )

        # Converting the number of cores the user defined into cpu.weights.
        # This assumes that ray is allowed to use all available CPU cores and
        # distribute them between system, worker and user processes.
        reserved_fraction = system_reserved_cpu / float(available_system_cpus)
        return int(reserved_fraction * _CGROUP_CPU_MAX_WEIGHT)

    @staticmethod
    def _validate_and_get_system_reserved_memory(
        system_reserved_memory: Optional[int],
        object_store_memory: int,
    ) -> int:
        """Validate system_reserved_memory (or pick its default) and add the object store.

        If system_reserved_memory is not specified (None or 0), the default is
        DEFAULT_SYSTEM_RESERVED_MEMORY_PROPORTION of the available memory,
        clamped to [DEFAULT_MIN_SYSTEM_RESERVED_MEMORY_BYTES,
        DEFAULT_MAX_SYSTEM_RESERVED_MEMORY_BYTES]. Otherwise the type is
        checked and the value is verified to be in range.

        Args:
            system_reserved_memory: The amount of memory in bytes reserved
                for ray system processes. Must be
                >= ray_constants.DEFAULT_MIN_SYSTEM_RESERVED_MEMORY_BYTES.
            object_store_memory: The amount of memory in bytes reserved for
                the object store. It is added to the system reservation, and
                the sum must not exceed the total memory available.

        Returns:
            int: system_reserved_memory + object_store_memory, in bytes.

        Raises:
            ValueError: If system_reserved_memory is specified, but invalid,
                if the system does not have enough memory, or if the combined
                reservation exceeds the available memory.
        """
        # Check the type before applying the default so that falsy values of
        # the wrong type (0.0, "") are rejected rather than silently replaced.
        if system_reserved_memory is not None and not isinstance(
            system_reserved_memory, int
        ):
            raise ValueError(
                f"Invalid value {system_reserved_memory} for system_reserved_memory. "
                "Use an integer to represent the number bytes that need to be reserved for "
                "ray system processes to enable resource isolation."
            )

        available_system_memory = ray._common.utils.get_system_memory()
        if (
            available_system_memory
            < ray_constants.DEFAULT_MIN_SYSTEM_RESERVED_MEMORY_BYTES
        ):
            raise ValueError(
                f"The available memory on this system {available_system_memory} is less than "
                f"the minimum amount that is required for ray's system processes. "
                f"Pick a number of bytes greater than or equal to {ray_constants.DEFAULT_MIN_SYSTEM_RESERVED_MEMORY_BYTES}"
            )

        if not system_reserved_memory:
            # Unspecified: take a proportion of the machine, clamped to the
            # configured [min, max] band, truncated to whole bytes.
            proportional_bytes = (
                ray_constants.DEFAULT_SYSTEM_RESERVED_MEMORY_PROPORTION
                * available_system_memory
            )
            system_reserved_memory = int(
                min(
                    max(
                        ray_constants.DEFAULT_MIN_SYSTEM_RESERVED_MEMORY_BYTES,
                        proportional_bytes,
                    ),
                    ray_constants.DEFAULT_MAX_SYSTEM_RESERVED_MEMORY_BYTES,
                )
            )

        if (
            system_reserved_memory
            < ray_constants.DEFAULT_MIN_SYSTEM_RESERVED_MEMORY_BYTES
        ):
            raise ValueError(
                f"The requested system_reserved_memory {system_reserved_memory} is less than "
                f"the minimum number of bytes that can be used for resource isolation. "
                "Pick a number of bytes to reserve for ray system processes "
                f"greater than or equal to {ray_constants.DEFAULT_MIN_SYSTEM_RESERVED_MEMORY_BYTES}"
            )

        # The object store is accounted for as part of the system reservation,
        # so the combined figure is both validated and returned.
        total_system_reserved_memory = system_reserved_memory + object_store_memory
        if total_system_reserved_memory > available_system_memory:
            raise ValueError(
                f"The total requested system_reserved_memory={total_system_reserved_memory} is greater than "
                f"the amount of memory available={available_system_memory}."
            )
        return total_system_reserved_memory
|