resource_manager.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. import abc
  2. from typing import List, Optional
  3. import ray
  4. from ray.air.execution.resources.request import AcquiredResources, ResourceRequest
  5. from ray.util.annotations import DeveloperAPI
  6. @DeveloperAPI
  7. class ResourceManager(abc.ABC):
  8. """Resource manager interface.
  9. A resource manager can be used to request resources from a Ray cluster and
  10. allocate them to remote Ray tasks or actors.
  11. Resources have to be requested before they can be acquired.
  12. Resources managed by the resource manager can be in three states:
  13. 1. "Requested": The resources have been requested but are not yet available to
  14. schedule remote Ray objects. The resource request may trigger autoscaling,
  15. and can be cancelled if no longer needed.
  16. 2. "Ready": The requested resources are now available to schedule remote Ray
  17. objects. They can be acquired and subsequently used remote Ray objects.
  18. The resource request can still be cancelled if no longer needed.
  19. 3. "Acquired": The resources have been acquired by a caller to use for scheduling
  20. remote Ray objects. Note that it is the responsibility of the caller to
  21. schedule the Ray objects with these resources.
  22. The associated resource request has been completed and can no longer be
  23. cancelled. The acquired resources can be freed by the resource manager when
  24. they are no longer used.
  25. The flow is as follows:
  26. .. code-block:: python
  27. # Create resource manager
  28. resource_manager = ResourceManager()
  29. # Create resource request
  30. resource_request = ResourceRequest([{"CPU": 4}])
  31. # Pass to resource manager
  32. resource_manager.request_resources(resource_request)
  33. # Wait until ready
  34. while not resource_manager.has_resources_ready(resource_request):
  35. time.sleep(1)
  36. # Once ready, acquire resources
  37. acquired_resource = resource_manager.acquire_resources(resource_request)
  38. # Bind to remote task or actor
  39. annotated_remote_fn = acquired_resource.annotate_remote_entities(
  40. [remote_fn])
  41. # Run remote function. This will use the acquired resources
  42. ray.get(annotated_remote_fn.remote())
  43. # After using the resources, free
  44. resource_manager.free_resources(annotated_resources)
  45. """
  46. def request_resources(self, resource_request: ResourceRequest):
  47. """Request resources.
  48. Depending on the backend, resources can trigger autoscaling. Requested
  49. resources can be ready or not ready. Once they are "ready", they can
  50. be acquired and used by remote Ray objects.
  51. Resource requests can be cancelled anytime using ``cancel_resource_request()``.
  52. Once acquired, the resource request is removed. Acquired resources can be
  53. freed with ``free_resources()``.
  54. """
  55. raise NotImplementedError
  56. def cancel_resource_request(self, resource_request: ResourceRequest):
  57. """Cancel resource request.
  58. Resource requests can be cancelled anytime before a resource is acquired.
  59. Acquiring a resource will remove the associated resource request.
  60. Acquired resources can be freed with ``free_resources()``.
  61. """
  62. raise NotImplementedError
  63. def has_resources_ready(self, resource_request: ResourceRequest) -> bool:
  64. """Returns True if resources for the given request are ready to be acquired."""
  65. raise NotImplementedError
  66. def acquire_resources(
  67. self, resource_request: ResourceRequest
  68. ) -> Optional[AcquiredResources]:
  69. """Acquire resources. Returns None if resources are not ready to be acquired.
  70. Acquiring resources will remove the associated resource request.
  71. Acquired resources can be returned with ``free_resources()``.
  72. """
  73. raise NotImplementedError
  74. def free_resources(self, acquired_resource: AcquiredResources):
  75. """Free acquired resources from usage and return them to the resource manager.
  76. Freeing resources will return the resources to the manager, but there are
  77. no guarantees about the tasks and actors scheduled on the resources. The caller
  78. should make sure that any references to tasks or actors scheduled on the
  79. resources have been removed before calling ``free_resources()``.
  80. """
  81. raise NotImplementedError
  82. def get_resource_futures(self) -> List[ray.ObjectRef]:
  83. """Return futures for resources to await.
  84. Depending on the backend, we use resource futures to determine availability
  85. of resources (e.g. placement groups) or resolution of requests.
  86. In this case, the futures can be awaited externally by the caller.
  87. When a resource future resolved, the caller may call ``update_state()``
  88. to force the resource manager to update its internal state immediately.
  89. """
  90. return []
  91. def update_state(self):
  92. """Update internal state of the resource manager.
  93. The resource manager may have internal state that needs periodic updating.
  94. For instance, depending on the backend, resource futures can be awaited
  95. externally (with ``get_resource_futures()``).
  96. If such a future resolved, the caller can instruct the resource
  97. manager to update its internal state immediately.
  98. """
  99. pass
  100. def clear(self):
  101. """Reset internal state and clear all resources.
  102. Calling this method will reset the resource manager to its initialization state.
  103. All resources will be removed.
  104. Clearing the state will remove tracked resources from the manager, but there are
  105. no guarantees about the tasks and actors scheduled on the resources. The caller
  106. should make sure that any references to tasks or actors scheduled on the
  107. resources have been removed before calling ``clear()``.
  108. """
  109. raise NotImplementedError
  110. def __reduce__(self):
  111. """We disallow serialization.
  112. Shared resource managers should live on an actor.
  113. """
  114. raise ValueError(
  115. f"Resource managers cannot be serialized. Resource manager: {str(self)}"
  116. )