| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- from dataclasses import dataclass
- from typing import Dict, List, Optional
- import ray
- from ray import LOCAL_MODE, SCRIPT_MODE
- from ray.air.execution.resources.request import (
- AcquiredResources,
- RemoteRayEntity,
- ResourceRequest,
- )
- from ray.air.execution.resources.resource_manager import ResourceManager
- from ray.util.annotations import DeveloperAPI
# Scaling factor used to sidestep floating point round-off when subtracting
# resource amounts: both operands are multiplied by this factor before the
# subtraction, and the difference is divided by it afterwards.
# Illustration (with a factor of 1000):
#   0.99 - 0.33 = 0.65999...
#   (0.99 * 1000 - 0.33 * 1000) / 1000 = 0.66
_DIGITS = 100_000
@DeveloperAPI
@dataclass
class FixedAcquiredResources(AcquiredResources):
    """Acquired resources handed out by the fixed resource manager."""

    # One resource dict (resource name -> amount) per bundle.
    bundles: List[Dict[str, float]]

    def _annotate_remote_entity(
        self, entity: RemoteRayEntity, bundle: Dict[str, float], bundle_index: int
    ) -> RemoteRayEntity:
        """Return ``entity`` with the resources of ``bundle`` set as options.

        Args:
            entity: Remote entity whose ``options()`` method is called.
            bundle: Resource dict for the entity. It is not modified; a copy
                is taken before entries are removed.
            bundle_index: Not used by this implementation.

        Returns:
            The result of ``entity.options(...)``.
        """
        # Work on a copy so the caller's bundle stays intact.
        custom_resources = bundle.copy()

        # "CPU", "GPU" and "memory" are passed as dedicated options; whatever
        # is left in the copy afterwards is passed as ``resources``.
        builtin_options = {
            "num_cpus": custom_resources.pop("CPU", 0),
            "num_gpus": custom_resources.pop("GPU", 0),
            "memory": custom_resources.pop("memory", 0.0),
        }
        return entity.options(**builtin_options, resources=custom_resources)
@DeveloperAPI
class FixedResourceManager(ResourceManager):
    """Fixed budget based resource manager.

    This resource manager keeps track of a fixed set of resources. When resources
    are acquired, they are subtracted from the budget. When resources are freed,
    they are added back to the budget.

    The resource manager still requires resources to be requested before they become
    available. However, because the resource requests are virtual, this will not
    trigger autoscaling.

    Additionally, resources are not reserved on request, only on acquisition. Thus,
    acquiring a resource can change the availability of other requests. Note that
    this behavior may be changed in future implementations.

    The fixed resource manager does not support placement strategies. Using
    ``STRICT_SPREAD`` will result in an error. ``STRICT_PACK`` will succeed only
    within a placement group bundle. All other placement group arguments will be
    ignored.

    Args:
        total_resources: Budget of resources to manage. Defaults to all available
            resources in the current task or all cluster resources (if outside a
            task).
    """

    # Class instantiated by ``acquire_resources`` to wrap an acquired request.
    _resource_cls: AcquiredResources = FixedAcquiredResources

    def __init__(self, total_resources: Optional[Dict[str, float]] = None):
        rtc = ray.get_runtime_context()

        # Note: an empty dict is treated the same as ``None`` here and also
        # triggers the lookup of the default budget.
        if not total_resources:
            if rtc.worker.mode in {None, SCRIPT_MODE, LOCAL_MODE}:
                total_resources = ray.cluster_resources()
            else:
                total_resources = rtc.get_assigned_resources()

        # If we are in a placement group, all of our resources will be in a bundle
        # and thus fulfill requirements of STRICT_PACK - but only if child tasks
        # are captured by the pg.
        self._allow_strict_pack = (
            ray.util.get_current_placement_group() is not None
            and rtc.should_capture_child_tasks_in_placement_group
        )

        self._total_resources = total_resources
        # Requests registered via ``request_resources`` and not yet cancelled.
        self._requested_resources = []
        # Requests that are currently acquired (and not yet freed).
        self._used_resources = []

    @property
    def _available_resources(self) -> Dict[str, float]:
        """Budget that remains after subtracting all acquired requests.

        Returns:
            A new dict; the stored total budget is not modified.
        """
        available_resources = self._total_resources.copy()

        for used_resources in self._used_resources:
            all_resources = used_resources.required_resources
            for k, v in all_resources.items():
                # Scale by _DIGITS before subtracting to avoid float round-off.
                # A key that is missing from the budget counts as 0.0, matching
                # the lookup in ``has_resources_ready`` (a plain ``[k]`` lookup
                # would raise a KeyError for such a key).
                available_resources[k] = (
                    available_resources.get(k, 0.0) * _DIGITS - v * _DIGITS
                ) / _DIGITS
        return available_resources

    def request_resources(self, resource_request: ResourceRequest) -> None:
        """Register a resource request so that it can be acquired later.

        Args:
            resource_request: Request to register.

        Raises:
            RuntimeError: If the request uses the ``STRICT_SPREAD`` strategy, or
                the ``STRICT_PACK`` strategy while strict packing is not allowed
                for this manager.
        """
        if resource_request.strategy == "STRICT_SPREAD" or (
            not self._allow_strict_pack and resource_request.strategy == "STRICT_PACK"
        ):
            raise RuntimeError(
                f"Requested a resource with placement strategy "
                f"{resource_request.strategy}, but this cannot be fulfilled by a "
                f"FixedResourceManager. In a nested setting, please set the inner "
                f"placement strategy to be less restrictive (i.e. no STRICT_ strategy)."
            )

        self._requested_resources.append(resource_request)

    def cancel_resource_request(self, resource_request: ResourceRequest) -> None:
        """Remove one previously registered request.

        Args:
            resource_request: Request to cancel.

        Raises:
            ValueError: If no matching request is currently registered.
        """
        self._requested_resources.remove(resource_request)

    def has_resources_ready(self, resource_request: ResourceRequest) -> bool:
        """Check whether a registered request fits into the remaining budget.

        Args:
            resource_request: Request to check.

        Returns:
            ``False`` if the request is not registered or if any required
            resource exceeds what is currently available, ``True`` otherwise.
        """
        if resource_request not in self._requested_resources:
            return False

        available_resources = self._available_resources
        all_resources = resource_request.required_resources
        for k, v in all_resources.items():
            # Resources that are not part of the budget count as 0.0.
            if available_resources.get(k, 0.0) < v:
                return False
        return True

    def acquire_resources(
        self, resource_request: ResourceRequest
    ) -> Optional[AcquiredResources]:
        """Acquire the resources of a request if they are ready.

        Args:
            resource_request: Request to acquire resources for.

        Returns:
            An instance of ``_resource_cls`` wrapping the request, or ``None``
            if ``has_resources_ready`` reports that it cannot be fulfilled.
        """
        if not self.has_resources_ready(resource_request):
            return None

        # Marking the request as used is what subtracts it from the budget.
        self._used_resources.append(resource_request)
        return self._resource_cls(
            bundles=resource_request.bundles, resource_request=resource_request
        )

    def free_resources(self, acquired_resource: AcquiredResources) -> None:
        """Return previously acquired resources to the budget.

        Args:
            acquired_resource: Object returned by ``acquire_resources``.

        Raises:
            ValueError: If the underlying request is not currently marked as used.
        """
        resources = acquired_resource.resource_request
        self._used_resources.remove(resources)

    def clear(self) -> None:
        """Forget all registered and all acquired requests."""
        # Reset internal state
        self._requested_resources = []
        self._used_resources = []
|