placement_group.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. import time
  2. from collections import defaultdict
  3. from dataclasses import dataclass
  4. from typing import Dict, List, Optional, Set
  5. import ray
  6. from ray.air.execution.resources.request import (
  7. AcquiredResources,
  8. RemoteRayEntity,
  9. ResourceRequest,
  10. )
  11. from ray.air.execution.resources.resource_manager import ResourceManager
  12. from ray.util.annotations import DeveloperAPI
  13. from ray.util.placement_group import PlacementGroup, remove_placement_group
  14. from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
  15. @DeveloperAPI
  16. @dataclass
  17. class PlacementGroupAcquiredResources(AcquiredResources):
  18. placement_group: PlacementGroup
  19. def _annotate_remote_entity(
  20. self, entity: RemoteRayEntity, bundle: Dict[str, float], bundle_index: int
  21. ) -> RemoteRayEntity:
  22. bundle = bundle.copy()
  23. num_cpus = bundle.pop("CPU", 0)
  24. num_gpus = bundle.pop("GPU", 0)
  25. memory = bundle.pop("memory", 0.0)
  26. return entity.options(
  27. scheduling_strategy=PlacementGroupSchedulingStrategy(
  28. placement_group=self.placement_group,
  29. placement_group_bundle_index=bundle_index,
  30. placement_group_capture_child_tasks=True,
  31. ),
  32. num_cpus=num_cpus,
  33. num_gpus=num_gpus,
  34. memory=memory,
  35. resources=bundle,
  36. )
  37. @DeveloperAPI
  38. class PlacementGroupResourceManager(ResourceManager):
  39. """Resource manager using placement groups as the resource backend.
  40. This manager will use placement groups to fulfill resource requests. Requesting
  41. a resource will schedule the placement group. Acquiring a resource will
  42. return a ``PlacementGroupAcquiredResources`` that can be used to schedule
  43. Ray tasks and actors on the placement group. Freeing an acquired resource
  44. will destroy the associated placement group.
  45. Ray core does not emit events when resources are available. Instead, the
  46. scheduling state has to be periodically updated.
  47. Per default, placement group scheduling state is refreshed every time when
  48. resource state is inquired, but not more often than once every ``update_interval_s``
  49. seconds. Alternatively, staging futures can be retrieved (and awaited) with
  50. ``get_resource_futures()`` and state update can be force with ``update_state()``.
  51. Args:
  52. update_interval_s: Minimum interval in seconds between updating scheduling
  53. state of placement groups.
  54. """
  55. _resource_cls: AcquiredResources = PlacementGroupAcquiredResources
  56. def __init__(self, update_interval_s: float = 0.1):
  57. # Internally, the placement group lifecycle is like this:
  58. # - Resources are requested with ``request_resources()``
  59. # - A placement group is scheduled ("staged")
  60. # - A ``PlacementGroup.ready()`` future is scheduled ("staging future")
  61. # - We update the scheduling state when we need to
  62. # (e.g. when ``has_resources_ready()`` is called)
  63. # - When staging futures resolve, a placement group is moved from "staging"
  64. # to "ready"
  65. # - When a resource request is canceled, we remove a placement group from
  66. # "staging". If there are not staged placement groups
  67. # (because they are already "ready"), we remove one from "ready" instead.
  68. # - When a resource is acquired, the pg is removed from "ready" and moved
  69. # to "acquired"
  70. # - When a resource is freed, the pg is removed from "acquired" and destroyed
  71. # Mapping of placement group to request
  72. self._pg_to_request: Dict[PlacementGroup, ResourceRequest] = {}
  73. # PGs that are staged but not "ready", yet (i.e. not CREATED)
  74. self._request_to_staged_pgs: Dict[
  75. ResourceRequest, Set[PlacementGroup]
  76. ] = defaultdict(set)
  77. # PGs that are CREATED and can be used by tasks and actors
  78. self._request_to_ready_pgs: Dict[
  79. ResourceRequest, Set[PlacementGroup]
  80. ] = defaultdict(set)
  81. # Staging futures used to update internal state.
  82. # We keep a double mapping here for better lookup efficiency.
  83. self._staging_future_to_pg: Dict[ray.ObjectRef, PlacementGroup] = dict()
  84. self._pg_to_staging_future: Dict[PlacementGroup, ray.ObjectRef] = dict()
  85. # Set of acquired PGs. We keep track of these here to make sure we
  86. # only free PGs that this manager managed.
  87. self._acquired_pgs: Set[PlacementGroup] = set()
  88. # Minimum time between updates of the internal state
  89. self.update_interval_s = update_interval_s
  90. self._last_update = time.monotonic() - self.update_interval_s - 1
  91. def get_resource_futures(self) -> List[ray.ObjectRef]:
  92. return list(self._staging_future_to_pg.keys())
  93. def _maybe_update_state(self):
  94. now = time.monotonic()
  95. if now > self._last_update + self.update_interval_s:
  96. self.update_state()
  97. def update_state(self):
  98. ready, not_ready = ray.wait(
  99. list(self._staging_future_to_pg.keys()),
  100. num_returns=len(self._staging_future_to_pg),
  101. timeout=0,
  102. )
  103. for future in ready:
  104. # Remove staging future
  105. pg = self._staging_future_to_pg.pop(future)
  106. self._pg_to_staging_future.pop(pg)
  107. # Fetch resource request
  108. request = self._pg_to_request[pg]
  109. # Remove from staging, add to ready
  110. self._request_to_staged_pgs[request].remove(pg)
  111. self._request_to_ready_pgs[request].add(pg)
  112. self._last_update = time.monotonic()
  113. def request_resources(self, resource_request: ResourceRequest):
  114. pg = resource_request.to_placement_group()
  115. self._pg_to_request[pg] = resource_request
  116. self._request_to_staged_pgs[resource_request].add(pg)
  117. future = pg.ready()
  118. self._staging_future_to_pg[future] = pg
  119. self._pg_to_staging_future[pg] = future
  120. def cancel_resource_request(self, resource_request: ResourceRequest):
  121. if self._request_to_staged_pgs[resource_request]:
  122. pg = self._request_to_staged_pgs[resource_request].pop()
  123. # PG was staging
  124. future = self._pg_to_staging_future.pop(pg)
  125. self._staging_future_to_pg.pop(future)
  126. # Cancel the pg.ready task.
  127. # Otherwise, it will be pending node assignment forever.
  128. ray.cancel(future)
  129. else:
  130. # PG might be ready
  131. pg = self._request_to_ready_pgs[resource_request].pop()
  132. if not pg:
  133. raise RuntimeError(
  134. "Cannot cancel resource request: No placement group was "
  135. f"staged or is ready. Make sure to not cancel more resource "
  136. f"requests than you've created. Request: {resource_request}"
  137. )
  138. self._pg_to_request.pop(pg)
  139. ray.util.remove_placement_group(pg)
  140. def has_resources_ready(self, resource_request: ResourceRequest) -> bool:
  141. if not bool(len(self._request_to_ready_pgs[resource_request])):
  142. # Only update state if needed
  143. self._maybe_update_state()
  144. return bool(len(self._request_to_ready_pgs[resource_request]))
  145. def acquire_resources(
  146. self, resource_request: ResourceRequest
  147. ) -> Optional[PlacementGroupAcquiredResources]:
  148. if not self.has_resources_ready(resource_request):
  149. return None
  150. pg = self._request_to_ready_pgs[resource_request].pop()
  151. self._acquired_pgs.add(pg)
  152. return self._resource_cls(placement_group=pg, resource_request=resource_request)
  153. def free_resources(self, acquired_resource: PlacementGroupAcquiredResources):
  154. pg = acquired_resource.placement_group
  155. self._acquired_pgs.remove(pg)
  156. remove_placement_group(pg)
  157. self._pg_to_request.pop(pg)
  158. def clear(self):
  159. if not ray.is_initialized():
  160. return
  161. for staged_pgs in self._request_to_staged_pgs.values():
  162. for staged_pg in staged_pgs:
  163. remove_placement_group(staged_pg)
  164. for ready_pgs in self._request_to_ready_pgs.values():
  165. for ready_pg in ready_pgs:
  166. remove_placement_group(ready_pg)
  167. for acquired_pg in self._acquired_pgs:
  168. remove_placement_group(acquired_pg)
  169. # Reset internal state
  170. self.__init__(update_interval_s=self.update_interval_s)
  171. def __del__(self):
  172. self.clear()