| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 |
- import time
- from collections import defaultdict
- from dataclasses import dataclass
- from typing import Dict, List, Optional, Set
- import ray
- from ray.air.execution.resources.request import (
- AcquiredResources,
- RemoteRayEntity,
- ResourceRequest,
- )
- from ray.air.execution.resources.resource_manager import ResourceManager
- from ray.util.annotations import DeveloperAPI
- from ray.util.placement_group import PlacementGroup, remove_placement_group
- from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
- @DeveloperAPI
- @dataclass
- class PlacementGroupAcquiredResources(AcquiredResources):
- placement_group: PlacementGroup
- def _annotate_remote_entity(
- self, entity: RemoteRayEntity, bundle: Dict[str, float], bundle_index: int
- ) -> RemoteRayEntity:
- bundle = bundle.copy()
- num_cpus = bundle.pop("CPU", 0)
- num_gpus = bundle.pop("GPU", 0)
- memory = bundle.pop("memory", 0.0)
- return entity.options(
- scheduling_strategy=PlacementGroupSchedulingStrategy(
- placement_group=self.placement_group,
- placement_group_bundle_index=bundle_index,
- placement_group_capture_child_tasks=True,
- ),
- num_cpus=num_cpus,
- num_gpus=num_gpus,
- memory=memory,
- resources=bundle,
- )
- @DeveloperAPI
- class PlacementGroupResourceManager(ResourceManager):
- """Resource manager using placement groups as the resource backend.
- This manager will use placement groups to fulfill resource requests. Requesting
- a resource will schedule the placement group. Acquiring a resource will
- return a ``PlacementGroupAcquiredResources`` that can be used to schedule
- Ray tasks and actors on the placement group. Freeing an acquired resource
- will destroy the associated placement group.
- Ray core does not emit events when resources are available. Instead, the
- scheduling state has to be periodically updated.
- Per default, placement group scheduling state is refreshed every time when
- resource state is inquired, but not more often than once every ``update_interval_s``
- seconds. Alternatively, staging futures can be retrieved (and awaited) with
- ``get_resource_futures()`` and state update can be force with ``update_state()``.
- Args:
- update_interval_s: Minimum interval in seconds between updating scheduling
- state of placement groups.
- """
- _resource_cls: AcquiredResources = PlacementGroupAcquiredResources
- def __init__(self, update_interval_s: float = 0.1):
- # Internally, the placement group lifecycle is like this:
- # - Resources are requested with ``request_resources()``
- # - A placement group is scheduled ("staged")
- # - A ``PlacementGroup.ready()`` future is scheduled ("staging future")
- # - We update the scheduling state when we need to
- # (e.g. when ``has_resources_ready()`` is called)
- # - When staging futures resolve, a placement group is moved from "staging"
- # to "ready"
- # - When a resource request is canceled, we remove a placement group from
- # "staging". If there are not staged placement groups
- # (because they are already "ready"), we remove one from "ready" instead.
- # - When a resource is acquired, the pg is removed from "ready" and moved
- # to "acquired"
- # - When a resource is freed, the pg is removed from "acquired" and destroyed
- # Mapping of placement group to request
- self._pg_to_request: Dict[PlacementGroup, ResourceRequest] = {}
- # PGs that are staged but not "ready", yet (i.e. not CREATED)
- self._request_to_staged_pgs: Dict[
- ResourceRequest, Set[PlacementGroup]
- ] = defaultdict(set)
- # PGs that are CREATED and can be used by tasks and actors
- self._request_to_ready_pgs: Dict[
- ResourceRequest, Set[PlacementGroup]
- ] = defaultdict(set)
- # Staging futures used to update internal state.
- # We keep a double mapping here for better lookup efficiency.
- self._staging_future_to_pg: Dict[ray.ObjectRef, PlacementGroup] = dict()
- self._pg_to_staging_future: Dict[PlacementGroup, ray.ObjectRef] = dict()
- # Set of acquired PGs. We keep track of these here to make sure we
- # only free PGs that this manager managed.
- self._acquired_pgs: Set[PlacementGroup] = set()
- # Minimum time between updates of the internal state
- self.update_interval_s = update_interval_s
- self._last_update = time.monotonic() - self.update_interval_s - 1
- def get_resource_futures(self) -> List[ray.ObjectRef]:
- return list(self._staging_future_to_pg.keys())
- def _maybe_update_state(self):
- now = time.monotonic()
- if now > self._last_update + self.update_interval_s:
- self.update_state()
- def update_state(self):
- ready, not_ready = ray.wait(
- list(self._staging_future_to_pg.keys()),
- num_returns=len(self._staging_future_to_pg),
- timeout=0,
- )
- for future in ready:
- # Remove staging future
- pg = self._staging_future_to_pg.pop(future)
- self._pg_to_staging_future.pop(pg)
- # Fetch resource request
- request = self._pg_to_request[pg]
- # Remove from staging, add to ready
- self._request_to_staged_pgs[request].remove(pg)
- self._request_to_ready_pgs[request].add(pg)
- self._last_update = time.monotonic()
- def request_resources(self, resource_request: ResourceRequest):
- pg = resource_request.to_placement_group()
- self._pg_to_request[pg] = resource_request
- self._request_to_staged_pgs[resource_request].add(pg)
- future = pg.ready()
- self._staging_future_to_pg[future] = pg
- self._pg_to_staging_future[pg] = future
- def cancel_resource_request(self, resource_request: ResourceRequest):
- if self._request_to_staged_pgs[resource_request]:
- pg = self._request_to_staged_pgs[resource_request].pop()
- # PG was staging
- future = self._pg_to_staging_future.pop(pg)
- self._staging_future_to_pg.pop(future)
- # Cancel the pg.ready task.
- # Otherwise, it will be pending node assignment forever.
- ray.cancel(future)
- else:
- # PG might be ready
- pg = self._request_to_ready_pgs[resource_request].pop()
- if not pg:
- raise RuntimeError(
- "Cannot cancel resource request: No placement group was "
- f"staged or is ready. Make sure to not cancel more resource "
- f"requests than you've created. Request: {resource_request}"
- )
- self._pg_to_request.pop(pg)
- ray.util.remove_placement_group(pg)
- def has_resources_ready(self, resource_request: ResourceRequest) -> bool:
- if not bool(len(self._request_to_ready_pgs[resource_request])):
- # Only update state if needed
- self._maybe_update_state()
- return bool(len(self._request_to_ready_pgs[resource_request]))
- def acquire_resources(
- self, resource_request: ResourceRequest
- ) -> Optional[PlacementGroupAcquiredResources]:
- if not self.has_resources_ready(resource_request):
- return None
- pg = self._request_to_ready_pgs[resource_request].pop()
- self._acquired_pgs.add(pg)
- return self._resource_cls(placement_group=pg, resource_request=resource_request)
- def free_resources(self, acquired_resource: PlacementGroupAcquiredResources):
- pg = acquired_resource.placement_group
- self._acquired_pgs.remove(pg)
- remove_placement_group(pg)
- self._pg_to_request.pop(pg)
- def clear(self):
- if not ray.is_initialized():
- return
- for staged_pgs in self._request_to_staged_pgs.values():
- for staged_pg in staged_pgs:
- remove_placement_group(staged_pg)
- for ready_pgs in self._request_to_ready_pgs.values():
- for ready_pg in ready_pgs:
- remove_placement_group(ready_pg)
- for acquired_pg in self._acquired_pgs:
- remove_placement_group(acquired_pg)
- # Reset internal state
- self.__init__(update_interval_s=self.update_interval_s)
- def __del__(self):
- self.clear()
|