compute.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. import logging
  2. from typing import Any, Callable, Iterable, Optional, TypeVar, Union
  3. from ray.data._internal.execution.interfaces import TaskContext
  4. from ray.data.block import Block, UserDefinedFunction
  5. from ray.util.annotations import DeveloperAPI, PublicAPI
  6. logger = logging.getLogger(__name__)
  7. T = TypeVar("T")
  8. U = TypeVar("U")
  9. # Block transform function applied by task and actor pools.
  10. BlockTransform = Union[
  11. # TODO(Clark): Once Ray only supports Python 3.8+, use protocol to constrain block
  12. # transform type.
  13. # Callable[[Block, ...], Iterable[Block]]
  14. # Callable[[Block, UserDefinedFunction, ...], Iterable[Block]],
  15. Callable[[Iterable[Block], TaskContext], Iterable[Block]],
  16. Callable[[Iterable[Block], TaskContext, UserDefinedFunction], Iterable[Block]],
  17. Callable[..., Iterable[Block]],
  18. ]
  19. @DeveloperAPI
  20. class ComputeStrategy:
  21. pass
  22. @PublicAPI
  23. class TaskPoolStrategy(ComputeStrategy):
  24. """Specify the task-based compute strategy for a Dataset transform.
  25. TaskPoolStrategy executes dataset transformations using Ray tasks that are
  26. scheduled through a pool. Provide ``size`` to cap the number of concurrent
  27. tasks; leave it unset to allow Ray Data to scale the task count
  28. automatically.
  29. """
  30. def __init__(
  31. self,
  32. size: Optional[int] = None,
  33. ):
  34. """Construct TaskPoolStrategy for a Dataset transform.
  35. Args:
  36. size: Specify the maximum size of the task pool.
  37. """
  38. if size is not None and size < 1:
  39. raise ValueError("`size` must be >= 1", size)
  40. self.size = size
  41. def __eq__(self, other: Any) -> bool:
  42. return (isinstance(other, TaskPoolStrategy) and self.size == other.size) or (
  43. other == "tasks" and self.size is None
  44. )
  45. def __repr__(self) -> str:
  46. return f"TaskPoolStrategy(size={self.size})"
  47. @PublicAPI
  48. class ActorPoolStrategy(ComputeStrategy):
  49. """Specify the actor-based compute strategy for a Dataset transform.
  50. ActorPoolStrategy specifies that an autoscaling pool of actors should be used
  51. for a given Dataset transform. This is useful for stateful setup of callable
  52. classes.
  53. For a fixed-sized pool of size ``n``, use ``ActorPoolStrategy(size=n)``.
  54. To autoscale from ``m`` to ``n`` actors, use
  55. ``ActorPoolStrategy(min_size=m, max_size=n)``.
  56. To autoscale from ``m`` to ``n`` actors, with an initial size of ``initial``, use
  57. ``ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)``.
  58. To increase opportunities for pipelining task dependency prefetching with
  59. computation and avoiding actor startup delays, set max_tasks_in_flight_per_actor
  60. to 2 or greater; to try to decrease the delay due to queueing of tasks on the worker
  61. actors, set max_tasks_in_flight_per_actor to 1.
  62. The `enable_true_multi_threading` argument primarily exists to prevent GPU OOM issues with multi-threaded actors.
  63. The life cycle of an actor task involves 3 main steps:
  64. 1. Batching Inputs
  65. 2. Running actor UDF
  66. 3. Batching Outputs
  67. The `enable_true_multi_threading` flag affects step 2. If set to `True`, then the UDF can be run concurrently.
  68. By default, it is set to `False`, so at most 1 actor UDF is running at a time per actor. The `max_concurrency`
  69. flag on `ray.remote` affects steps 1 and 3. Below is a matrix summary:
  70. - [`enable_true_multi_threading=False or True`, `max_concurrency=1`] = 1 actor task running per actor. So at most 1
  71. of steps 1, 2, or 3 is running at any point in time.
  72. - [`enable_true_multi_threading=False`, `max_concurrency>1`] = multiple tasks running per actor
  73. (respecting GIL) but UDF runs 1 at a time. This is useful for doing CPU and GPU work,
  74. where you want to use a large batch size but want to hide the overhead of *batching*
  75. the inputs. In this case, CPU *batching* is done concurrently, while GPU *inference*
  76. is done 1 at a time. Concretely, steps 1 and 3 can have multiple threads, while step 2 is done serially.
  77. - [`enable_true_multi_threading=True`, `max_concurrency>1`] = multiple tasks running per actor.
  78. Unlike bullet #3 ^, the UDF runs concurrently (respecting GIL). No restrictions on steps 1, 2, or 3
  79. NOTE: `enable_true_multi_threading` does not apply to async actors
  80. """
  81. def __init__(
  82. self,
  83. *,
  84. size: Optional[int] = None,
  85. min_size: Optional[int] = None,
  86. max_size: Optional[int] = None,
  87. initial_size: Optional[int] = None,
  88. max_tasks_in_flight_per_actor: Optional[int] = None,
  89. enable_true_multi_threading: bool = False,
  90. ):
  91. """Construct ActorPoolStrategy for a Dataset transform.
  92. Args:
  93. size: Specify a fixed size actor pool of this size. It is an error to
  94. specify both `size` and `min_size` or `max_size`.
  95. min_size: The minimum size of the actor pool.
  96. max_size: The maximum size of the actor pool.
  97. initial_size: The initial number of actors to start with. If not specified,
  98. defaults to min_size. Must be between min_size and max_size.
  99. max_tasks_in_flight_per_actor: The maximum number of tasks to concurrently
  100. send to a single actor worker. Increasing this will increase
  101. opportunities for pipelining task dependency prefetching with
  102. computation and avoiding actor startup delays, but will also increase
  103. queueing delay.
  104. enable_true_multi_threading: If enable_true_multi_threading=False, no more than 1 UDF
  105. runs per actor. Otherwise, respects the `max_concurrency` argument. For more details, see
  106. the `ActorPoolStrategy` class docstring.
  107. """
  108. if size is not None:
  109. if size < 1:
  110. raise ValueError("size must be >= 1", size)
  111. if max_size is not None or min_size is not None or initial_size is not None:
  112. raise ValueError(
  113. "min_size, max_size, and initial_size cannot be set at the same time as `size`"
  114. )
  115. min_size = size
  116. max_size = size
  117. initial_size = size
  118. if min_size is not None and min_size < 1:
  119. raise ValueError("min_size must be >= 1", min_size)
  120. if max_size is not None:
  121. if min_size is None:
  122. min_size = 1 # Legacy default.
  123. if min_size > max_size:
  124. raise ValueError("min_size must be <= max_size", min_size, max_size)
  125. if (
  126. max_tasks_in_flight_per_actor is not None
  127. and max_tasks_in_flight_per_actor < 1
  128. ):
  129. raise ValueError(
  130. "max_tasks_in_flight_per_actor must be >= 1, got: ",
  131. max_tasks_in_flight_per_actor,
  132. )
  133. self.min_size = min_size or 1
  134. self.max_size = max_size or float("inf")
  135. # Validate and set initial_size
  136. if initial_size is not None:
  137. if initial_size < self.min_size:
  138. raise ValueError(
  139. f"initial_size ({initial_size}) must be >= min_size ({self.min_size})"
  140. )
  141. if self.max_size != float("inf") and initial_size > self.max_size:
  142. raise ValueError(
  143. f"initial_size ({initial_size}) must be <= max_size ({self.max_size})"
  144. )
  145. self.initial_size = initial_size or self.min_size
  146. self.max_tasks_in_flight_per_actor = max_tasks_in_flight_per_actor
  147. self.num_workers = 0
  148. self.ready_to_total_workers_ratio = 0.8
  149. self.enable_true_multi_threading = enable_true_multi_threading
  150. def __eq__(self, other: Any) -> bool:
  151. return isinstance(other, ActorPoolStrategy) and (
  152. self.min_size == other.min_size
  153. and self.max_size == other.max_size
  154. and self.initial_size == other.initial_size
  155. and self.enable_true_multi_threading == other.enable_true_multi_threading
  156. and self.max_tasks_in_flight_per_actor
  157. == other.max_tasks_in_flight_per_actor
  158. )
  159. def __repr__(self) -> str:
  160. return (
  161. f"ActorPoolStrategy(min_size={self.min_size}, "
  162. f"max_size={self.max_size}, "
  163. f"initial_size={self.initial_size}, "
  164. f"max_tasks_in_flight_per_actor={self.max_tasks_in_flight_per_actor})"
  165. f"num_workers={self.num_workers}, "
  166. f"enable_true_multi_threading={self.enable_true_multi_threading}, "
  167. f"ready_to_total_workers_ratio={self.ready_to_total_workers_ratio})"
  168. )
  169. def get_compute(compute_spec: Union[str, ComputeStrategy]) -> ComputeStrategy:
  170. if not isinstance(compute_spec, (TaskPoolStrategy, ActorPoolStrategy)):
  171. raise ValueError(
  172. "In Ray 2.5, the compute spec must be either "
  173. f"TaskPoolStrategy or ActorPoolStrategy, was: {compute_spec}."
  174. )
  175. elif not compute_spec or compute_spec == "tasks":
  176. return TaskPoolStrategy()
  177. elif compute_spec == "actors":
  178. return ActorPoolStrategy()
  179. elif isinstance(compute_spec, ComputeStrategy):
  180. return compute_spec
  181. else:
  182. raise ValueError("compute must be one of [`tasks`, `actors`, ComputeStrategy]")