from dataclasses import dataclass
from typing import Dict, List, Optional

import ray
from ray import LOCAL_MODE, SCRIPT_MODE
from ray.air.execution.resources.request import (
    AcquiredResources,
    RemoteRayEntity,
    ResourceRequest,
)
from ray.air.execution.resources.resource_manager import ResourceManager
from ray.util.annotations import DeveloperAPI

# Scale factor applied before subtracting resource amounts, to reduce
# floating point artifacts in the remaining budget.
# Compare: 0.99 - 0.33 = 0.65999... vs (0.99 * 1000 - 0.33 * 1000) / 1000 = 0.66
_DIGITS = 100000


@DeveloperAPI
@dataclass
class FixedAcquiredResources(AcquiredResources):
    """Resources handed out by a :class:`FixedResourceManager`.

    Attributes:
        bundles: Resource bundles of the fulfilled request, each mapping a
            resource name to the requested amount.
    """

    bundles: List[Dict[str, float]]

    def _annotate_remote_entity(
        self, entity: RemoteRayEntity, bundle: Dict[str, float], bundle_index: int
    ) -> RemoteRayEntity:
        """Return ``entity`` with the resources of ``bundle`` set as options.

        Args:
            entity: Remote entity to annotate through its ``options()`` method.
            bundle: Resources to assign. ``"CPU"``, ``"GPU"`` and ``"memory"``
                are passed as dedicated options; all remaining keys are passed
                as ``resources``.
            bundle_index: Index of the bundle. Not used by this implementation.

        Returns:
            The result of ``entity.options(...)``.
        """
        # Work on a copy so that the caller's bundle is not mutated by pop().
        bundle = bundle.copy()
        num_cpus = bundle.pop("CPU", 0)
        num_gpus = bundle.pop("GPU", 0)
        memory = bundle.pop("memory", 0.0)

        return entity.options(
            num_cpus=num_cpus,
            num_gpus=num_gpus,
            memory=memory,
            resources=bundle,
        )


@DeveloperAPI
class FixedResourceManager(ResourceManager):
    """Fixed budget based resource manager.

    This resource manager keeps track of a fixed set of resources. When resources
    are acquired, they are subtracted from the budget. When resources are freed,
    they are added back to the budget.

    The resource manager still requires resources to be requested before they
    become available. However, because the resource requests are virtual, this
    will not trigger autoscaling.

    Additionally, resources are not reserved on request, only on acquisition. Thus,
    acquiring a resource can change the availability of other requests. Note that
    this behavior may be changed in future implementations.

    The fixed resource manager does not support placement strategies. Using
    ``STRICT_SPREAD`` will result in an error. ``STRICT_PACK`` will succeed only
    within a placement group bundle. All other placement group arguments will be
    ignored.

    Args:
        total_resources: Budget of resources to manage. Defaults to all
            available resources in the current task or all cluster resources
            (if outside a task).
    """

    _resource_cls: AcquiredResources = FixedAcquiredResources

    def __init__(self, total_resources: Optional[Dict[str, float]] = None):
        rtc = ray.get_runtime_context()

        # Note: an empty dict is treated the same as ``None`` here, i.e. the
        # budget is looked up from the runtime.
        if not total_resources:
            if rtc.worker.mode in {None, SCRIPT_MODE, LOCAL_MODE}:
                total_resources = ray.cluster_resources()
            else:
                total_resources = rtc.get_assigned_resources()

        # If we are in a placement group, all of our resources will be in a bundle
        # and thus fulfill requirements of STRICT_PACK - but only if child tasks
        # are captured by the pg.
        self._allow_strict_pack = (
            ray.util.get_current_placement_group() is not None
            and rtc.should_capture_child_tasks_in_placement_group
        )

        self._total_resources = total_resources
        # Requests registered via ``request_resources()``.
        self._requested_resources: List[ResourceRequest] = []
        # Requests whose resources are currently acquired.
        self._used_resources: List[ResourceRequest] = []

    @property
    def _available_resources(self) -> Dict[str, float]:
        """Remaining budget: total resources minus all acquired requests.

        A resource that is used but not part of the total budget is treated as
        having a budget of ``0.0`` (mirroring ``has_resources_ready()``), so
        this lookup never raises ``KeyError``.
        """
        available_resources = self._total_resources.copy()

        for used_resources in self._used_resources:
            all_resources = used_resources.required_resources
            for k, v in all_resources.items():
                # Scale by _DIGITS before subtracting to reduce float artifacts.
                available_resources[k] = (
                    available_resources.get(k, 0.0) * _DIGITS - v * _DIGITS
                ) / _DIGITS
        return available_resources

    def request_resources(self, resource_request: ResourceRequest) -> None:
        """Register a resource request with this manager.

        Args:
            resource_request: Request to register.

        Raises:
            RuntimeError: If the request uses the ``STRICT_SPREAD`` strategy, or
                the ``STRICT_PACK`` strategy while strict packing is not allowed
                (see ``__init__``).
        """
        if resource_request.strategy == "STRICT_SPREAD" or (
            not self._allow_strict_pack and resource_request.strategy == "STRICT_PACK"
        ):
            raise RuntimeError(
                f"Requested a resource with placement strategy "
                f"{resource_request.strategy}, but this cannot be fulfilled by a "
                f"FixedResourceManager. In a nested setting, please set the inner "
                f"placement strategy to be less restrictive (i.e. no STRICT_ strategy)."
            )

        self._requested_resources.append(resource_request)

    def cancel_resource_request(self, resource_request: ResourceRequest) -> None:
        """Remove one previously registered occurrence of ``resource_request``.

        Raises:
            ValueError: If the request is not currently registered.
        """
        self._requested_resources.remove(resource_request)

    def has_resources_ready(self, resource_request: ResourceRequest) -> bool:
        """Return whether ``resource_request`` could be acquired right now.

        Returns:
            ``False`` if the request was never registered or if any required
            resource exceeds the remaining budget, ``True`` otherwise.
        """
        if resource_request not in self._requested_resources:
            return False

        available_resources = self._available_resources
        all_resources = resource_request.required_resources
        for k, v in all_resources.items():
            # Resources missing from the budget count as 0.
            if available_resources.get(k, 0.0) < v:
                return False
        return True

    def acquire_resources(
        self, resource_request: ResourceRequest
    ) -> Optional[AcquiredResources]:
        """Acquire the resources of ``resource_request`` if they are ready.

        Returns:
            An instance of ``_resource_cls`` wrapping the request's bundles, or
            ``None`` if the resources are not ready.
        """
        if not self.has_resources_ready(resource_request):
            return None

        # Marking the request as used subtracts it from the available budget.
        self._used_resources.append(resource_request)
        return self._resource_cls(
            bundles=resource_request.bundles, resource_request=resource_request
        )

    def free_resources(self, acquired_resource: AcquiredResources) -> None:
        """Return the resources held by ``acquired_resource`` to the budget.

        Raises:
            ValueError: If the underlying request is not currently marked as
                used.
        """
        resources = acquired_resource.resource_request
        self._used_resources.remove(resources)

    def clear(self) -> None:
        """Forget all registered requests and all acquired resources."""
        # Reset internal state
        self._requested_resources = []
        self._used_resources = []