fixed.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
from dataclasses import dataclass
from typing import Dict, List, Optional, Type

import ray
from ray import LOCAL_MODE, SCRIPT_MODE
from ray.air.execution.resources.request import (
    AcquiredResources,
    RemoteRayEntity,
    ResourceRequest,
)
from ray.air.execution.resources.resource_manager import ResourceManager
from ray.util.annotations import DeveloperAPI
  12. # Avoid numerical errors by multiplying and subtracting with this number.
  13. # Compare: 0.99 - 0.33 = 0.65999... vs (0.99 * 1000 - 0.33 * 1000) / 1000 = 0.66
  14. _DIGITS = 100000
  15. @DeveloperAPI
  16. @dataclass
  17. class FixedAcquiredResources(AcquiredResources):
  18. bundles: List[Dict[str, float]]
  19. def _annotate_remote_entity(
  20. self, entity: RemoteRayEntity, bundle: Dict[str, float], bundle_index: int
  21. ) -> RemoteRayEntity:
  22. bundle = bundle.copy()
  23. num_cpus = bundle.pop("CPU", 0)
  24. num_gpus = bundle.pop("GPU", 0)
  25. memory = bundle.pop("memory", 0.0)
  26. return entity.options(
  27. num_cpus=num_cpus,
  28. num_gpus=num_gpus,
  29. memory=memory,
  30. resources=bundle,
  31. )
  32. @DeveloperAPI
  33. class FixedResourceManager(ResourceManager):
  34. """Fixed budget based resource manager.
  35. This resource manager keeps track of a fixed set of resources. When resources
  36. are acquired, they are subtracted from the budget. When resources are freed,
  37. they are added back to the budget.
  38. The resource manager still requires resources to be requested before they become
  39. available. However, because the resource requests are virtual, this will not
  40. trigger autoscaling.
  41. Additionally, resources are not reserved on request, only on acquisition. Thus,
  42. acquiring a resource can change the availability of other requests. Note that
  43. this behavior may be changed in future implementations.
  44. The fixed resource manager does not support placement strategies. Using
  45. ``STRICT_SPREAD`` will result in an error. ``STRICT_PACK`` will succeed only
  46. within a placement group bundle. All other placement group arguments will be
  47. ignored.
  48. Args:
  49. total_resources: Budget of resources to manage. Defaults to all available
  50. resources in the current task or all cluster resources (if outside a task).
  51. """
  52. _resource_cls: AcquiredResources = FixedAcquiredResources
  53. def __init__(self, total_resources: Optional[Dict[str, float]] = None):
  54. rtc = ray.get_runtime_context()
  55. if not total_resources:
  56. if rtc.worker.mode in {None, SCRIPT_MODE, LOCAL_MODE}:
  57. total_resources = ray.cluster_resources()
  58. else:
  59. total_resources = rtc.get_assigned_resources()
  60. # If we are in a placement group, all of our resources will be in a bundle
  61. # and thus fulfill requirements of STRICT_PACK - but only if child tasks
  62. # are captured by the pg.
  63. self._allow_strict_pack = (
  64. ray.util.get_current_placement_group() is not None
  65. and rtc.should_capture_child_tasks_in_placement_group
  66. )
  67. self._total_resources = total_resources
  68. self._requested_resources = []
  69. self._used_resources = []
  70. @property
  71. def _available_resources(self) -> Dict[str, float]:
  72. available_resources = self._total_resources.copy()
  73. for used_resources in self._used_resources:
  74. all_resources = used_resources.required_resources
  75. for k, v in all_resources.items():
  76. available_resources[k] = (
  77. available_resources[k] * _DIGITS - v * _DIGITS
  78. ) / _DIGITS
  79. return available_resources
  80. def request_resources(self, resource_request: ResourceRequest):
  81. if resource_request.strategy == "STRICT_SPREAD" or (
  82. not self._allow_strict_pack and resource_request.strategy == "STRICT_PACK"
  83. ):
  84. raise RuntimeError(
  85. f"Requested a resource with placement strategy "
  86. f"{resource_request.strategy}, but this cannot be fulfilled by a "
  87. f"FixedResourceManager. In a nested setting, please set the inner "
  88. f"placement strategy to be less restrictive (i.e. no STRICT_ strategy)."
  89. )
  90. self._requested_resources.append(resource_request)
  91. def cancel_resource_request(self, resource_request: ResourceRequest):
  92. self._requested_resources.remove(resource_request)
  93. def has_resources_ready(self, resource_request: ResourceRequest) -> bool:
  94. if resource_request not in self._requested_resources:
  95. return False
  96. available_resources = self._available_resources
  97. all_resources = resource_request.required_resources
  98. for k, v in all_resources.items():
  99. if available_resources.get(k, 0.0) < v:
  100. return False
  101. return True
  102. def acquire_resources(
  103. self, resource_request: ResourceRequest
  104. ) -> Optional[AcquiredResources]:
  105. if not self.has_resources_ready(resource_request):
  106. return None
  107. self._used_resources.append(resource_request)
  108. return self._resource_cls(
  109. bundles=resource_request.bundles, resource_request=resource_request
  110. )
  111. def free_resources(self, acquired_resource: AcquiredResources):
  112. resources = acquired_resource.resource_request
  113. self._used_resources.remove(resources)
  114. def clear(self):
  115. # Reset internal state
  116. self._requested_resources = []
  117. self._used_resources = []