request.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. import abc
  2. import json
  3. from copy import deepcopy
  4. from dataclasses import dataclass
  5. from inspect import signature
  6. from typing import Dict, List, Union
  7. import ray
  8. from ray.util import placement_group
  9. from ray.util.annotations import DeveloperAPI
# Type alias for the Ray entities that can be annotated with acquired resources:
# either a remote function (task) or an actor class.
RemoteRayEntity = Union[ray.remote_function.RemoteFunction, ray.actor.ActorClass]
  11. def _sum_bundles(bundles: List[Dict[str, float]]) -> Dict[str, float]:
  12. """Sum all resources in a list of resource bundles.
  13. Args:
  14. bundles: List of resource bundles.
  15. Returns: Dict containing all resources summed up.
  16. """
  17. resources = {}
  18. for bundle in bundles:
  19. for k, v in bundle.items():
  20. resources[k] = resources.get(k, 0) + v
  21. return resources
  22. @DeveloperAPI
  23. class ResourceRequest:
  24. """Request for resources.
  25. This class is used to define a resource request. A resource request comprises one
  26. or more bundles of resources and instructions on the scheduling behavior.
  27. The resource request can be submitted to a resource manager, which will
  28. schedule the resources. Depending on the resource backend, this may instruct
  29. Ray to scale up (autoscaling).
  30. Resource requests are compatible with the most fine-grained low-level resource
  31. backend, which are Ray placement groups.
  32. Args:
  33. bundles: A list of bundles which represent the resources requirements.
  34. E.g. ``[{"CPU": 1, "GPU": 1}]``.
  35. strategy: The scheduling strategy to acquire the bundles.
  36. - "PACK": Packs Bundles into as few nodes as possible.
  37. - "SPREAD": Places Bundles across distinct nodes as even as possible.
  38. - "STRICT_PACK": Packs Bundles into one node. The group is
  39. not allowed to span multiple nodes.
  40. - "STRICT_SPREAD": Packs Bundles across distinct nodes.
  41. *args: Passed to the call of ``placement_group()``, if applicable.
  42. **kwargs: Passed to the call of ``placement_group()``, if applicable.
  43. """
  44. def __init__(
  45. self,
  46. bundles: List[Dict[str, Union[int, float]]],
  47. strategy: str = "PACK",
  48. *args,
  49. **kwargs,
  50. ):
  51. if not bundles:
  52. raise ValueError("Cannot initialize a ResourceRequest with zero bundles.")
  53. # Remove empty resource keys
  54. self._bundles = [
  55. {k: float(v) for k, v in bundle.items() if v != 0} for bundle in bundles
  56. ]
  57. # Check if the head bundle is empty (no resources defined or all resources
  58. # are 0 (and thus removed in the previous step)
  59. if not self._bundles[0]:
  60. # This is when the head bundle doesn't need resources.
  61. self._head_bundle_is_empty = True
  62. self._bundles.pop(0)
  63. if not self._bundles:
  64. raise ValueError(
  65. "Cannot initialize a ResourceRequest with an empty head "
  66. "and zero worker bundles."
  67. )
  68. else:
  69. self._head_bundle_is_empty = False
  70. self._strategy = strategy
  71. self._args = args
  72. self._kwargs = kwargs
  73. self._hash = None
  74. self._bound = None
  75. self._bind()
  76. @property
  77. def head_bundle_is_empty(self):
  78. """Returns True if head bundle is empty while child bundles
  79. need resources.
  80. This is considered an internal API within Tune.
  81. """
  82. return self._head_bundle_is_empty
  83. @property
  84. @DeveloperAPI
  85. def head_cpus(self) -> float:
  86. """Returns the number of cpus in the head bundle."""
  87. return 0.0 if self._head_bundle_is_empty else self._bundles[0].get("CPU", 0.0)
  88. @property
  89. @DeveloperAPI
  90. def bundles(self) -> List[Dict[str, float]]:
  91. """Returns a deep copy of resource bundles"""
  92. return deepcopy(self._bundles)
  93. @property
  94. def required_resources(self) -> Dict[str, float]:
  95. """Returns a dict containing the sums of all resources"""
  96. return _sum_bundles(self._bundles)
  97. @property
  98. @DeveloperAPI
  99. def strategy(self) -> str:
  100. """Returns the placement strategy"""
  101. return self._strategy
  102. def _bind(self):
  103. """Bind the args and kwargs to the `placement_group()` signature.
  104. We bind the args and kwargs, so we can compare equality of two resource
  105. requests. The main reason for this is that the `placement_group()` API
  106. can evolve independently from the ResourceRequest API (e.g. adding new
  107. arguments). Then, `ResourceRequest(bundles, strategy, arg=arg)` should
  108. be the same as `ResourceRequest(bundles, strategy, arg)`.
  109. """
  110. sig = signature(placement_group)
  111. try:
  112. self._bound = sig.bind(
  113. self._bundles, self._strategy, *self._args, **self._kwargs
  114. )
  115. except Exception as exc:
  116. raise RuntimeError(
  117. "Invalid definition for resource request. Please check "
  118. "that you passed valid arguments to the ResourceRequest "
  119. "object."
  120. ) from exc
  121. def to_placement_group(self):
  122. return placement_group(*self._bound.args, **self._bound.kwargs)
  123. def __eq__(self, other: "ResourceRequest"):
  124. return (
  125. isinstance(other, ResourceRequest)
  126. and self._bound == other._bound
  127. and self.head_bundle_is_empty == other.head_bundle_is_empty
  128. )
  129. def __hash__(self):
  130. if not self._hash:
  131. # Cache hash
  132. self._hash = hash(
  133. json.dumps(
  134. {"args": self._bound.args, "kwargs": self._bound.kwargs},
  135. sort_keys=True,
  136. indent=0,
  137. ensure_ascii=True,
  138. )
  139. )
  140. return self._hash
  141. def __getstate__(self):
  142. state = self.__dict__.copy()
  143. state.pop("_hash", None)
  144. state.pop("_bound", None)
  145. return state
  146. def __setstate__(self, state):
  147. self.__dict__.update(state)
  148. self._hash = None
  149. self._bound = None
  150. self._bind()
  151. def __repr__(self) -> str:
  152. return (
  153. f"<ResourceRequest (_bound={self._bound}, "
  154. f"head_bundle_is_empty={self.head_bundle_is_empty})>"
  155. )
  156. @DeveloperAPI
  157. @dataclass
  158. class AcquiredResources(abc.ABC):
  159. """Base class for resources that have been acquired.
  160. Acquired resources can be associated to Ray objects, which can then be
  161. scheduled using these resources.
  162. Internally this can point e.g. to a placement group, a placement
  163. group bundle index, or just raw resources.
  164. The main API is the `annotate_remote_entities` method. This will associate
  165. remote Ray objects (tasks and actors) with the acquired resources by setting
  166. the Ray remote options to use the acquired resources.
  167. """
  168. resource_request: ResourceRequest
  169. def annotate_remote_entities(
  170. self, entities: List[RemoteRayEntity]
  171. ) -> List[Union[RemoteRayEntity]]:
  172. """Return remote ray entities (tasks/actors) to use the acquired resources.
  173. The first entity will be associated with the first bundle, the second
  174. entity will be associated with the second bundle, etc.
  175. Args:
  176. entities: Remote Ray entities to annotate with the acquired resources.
  177. """
  178. bundles = self.resource_request.bundles
  179. # Also count the empty head bundle as a bundle
  180. num_bundles = len(bundles) + int(self.resource_request.head_bundle_is_empty)
  181. if len(entities) > num_bundles:
  182. raise RuntimeError(
  183. f"The number of callables to annotate ({len(entities)}) cannot "
  184. f"exceed the number of available bundles ({num_bundles})."
  185. )
  186. annotated = []
  187. if self.resource_request.head_bundle_is_empty:
  188. # The empty head bundle is place on the first bundle index with empty
  189. # resources.
  190. annotated.append(
  191. self._annotate_remote_entity(entities[0], {}, bundle_index=0)
  192. )
  193. # Shift the remaining entities
  194. entities = entities[1:]
  195. for i, (entity, bundle) in enumerate(zip(entities, bundles)):
  196. annotated.append(
  197. self._annotate_remote_entity(entity, bundle, bundle_index=i)
  198. )
  199. return annotated
  200. def _annotate_remote_entity(
  201. self, entity: RemoteRayEntity, bundle: Dict[str, float], bundle_index: int
  202. ) -> RemoteRayEntity:
  203. raise NotImplementedError