helpers.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. from typing import TYPE_CHECKING, Any, Iterable, Iterator, Optional, Sequence, Union
  2. import ray
  3. from ray.util.annotations import PublicAPI
  4. if TYPE_CHECKING:
  5. from ray import ObjectRef
  6. from ray.remote_function import RemoteFunction
  7. # ray.wait() has a default num_returns of 1.
  8. # Using a slightly larger batch until the optimization is fully implemented, see
  9. # https://github.com/ray-project/ray/issues/49905
  10. DEFAULT_CHUNK_SIZE = 10
  11. DEFAULT_BACKPRESSURE_SIZE = 100
  12. def _wait_and_get_single_batch(
  13. refs: "Sequence[ObjectRef]",
  14. *,
  15. chunk_size: int,
  16. yield_obj_refs: bool = False,
  17. **kwargs,
  18. ) -> tuple[list[Union[Any, "ObjectRef"]], "list[ObjectRef]"]:
  19. """Call ray.wait and explicitly return the ready objects/results
  20. and remaining Ray remote refs.
  21. Args:
  22. refs: A list of Ray object refs.
  23. chunk_size: The `num_returns` parameter to pass to `ray.wait()`.
  24. yield_obj_refs: If True, return Ray remote refs instead of results (by calling :meth:`~ray.get`).
  25. **kwargs: Additional keyword arguments to pass to `ray.wait()`.
  26. Returns:
  27. A tuple of two lists, ready and not ready. This is the same as the return value of `ray.wait()`.
  28. """
  29. if chunk_size < 1:
  30. raise ValueError("`chunk_size` must be >= 1")
  31. kwargs = kwargs or {}
  32. # num_returns must be <= len(refs)
  33. ready, refs = ray.wait(
  34. refs,
  35. num_returns=min(chunk_size, len(refs)),
  36. **kwargs,
  37. )
  38. if not yield_obj_refs:
  39. return ray.get(ready), refs
  40. return ready, refs
  41. @PublicAPI(stability="alpha")
  42. def as_completed(
  43. refs: "Sequence[ObjectRef]",
  44. *,
  45. chunk_size: int = DEFAULT_CHUNK_SIZE,
  46. yield_obj_refs: bool = False,
  47. **kwargs,
  48. ) -> Iterator[Union[Any, "ObjectRef"]]:
  49. """Given a list of Ray task references, yield results as they become available.
  50. Unlike calling :meth:`~ray.get` on a list of references (i.e., `ray.get(refs)`) which
  51. waits for all results to be ready, this function begins to yield result as soon as
  52. a batch of `chunk_size` results are ready.
  53. .. note::
  54. Generally there is no guarantee on the order of results. For example, the first result
  55. is not necessarily the first one completed, but rather the first one submitted in the
  56. first available batch (See :meth:`~ray.wait` for more details about
  57. preservation of submission order).
  58. .. note::
  59. Use this function instead of calling :meth:`~ray.get` inside a for loop. See
  60. https://docs.ray.io/en/latest/ray-core/patterns/ray-get-loop.html for more details.
  61. Example:
  62. Suppose we have a function that sleeps for x seconds depending on the input.
  63. We expect to obtain a partially sorted list of results.
  64. .. testcode:: python
  65. import ray
  66. import time
  67. @ray.remote
  68. def f(x):
  69. time.sleep(x)
  70. return x
  71. refs = [f.remote(i) for i in [10, 4, 6, 8, 2]]
  72. for x in ray.util.as_completed(refs, chunk_size=2):
  73. print(x)
  74. .. testoutput::
  75. :options: +MOCK
  76. # Output:
  77. 4
  78. 2
  79. 6
  80. 8
  81. 10
  82. Args:
  83. refs: A list of Ray object refs.
  84. chunk_size: The number of tasks to wait for in each iteration (default 10).
  85. The parameter is passed as `num_returns` to :meth:`~ray.wait` internally.
  86. yield_obj_refs: If True, return Ray remote refs instead of results (by calling :meth:`~ray.get`).
  87. **kwargs: Additional keyword arguments to pass to :meth:`~ray.wait`, e.g.,
  88. `timeout` and `fetch_local`.
  89. Yields:
  90. Union[Any, ObjectRef]: The results (or optionally their Ray references) of the Ray tasks as they complete.
  91. """
  92. if chunk_size < 1:
  93. raise ValueError("`chunk_size` must be >= 1")
  94. if "num_returns" in kwargs:
  95. raise ValueError("Use the `chunksize` argument instead of `num_returns`.")
  96. while refs:
  97. results, refs = _wait_and_get_single_batch(
  98. refs,
  99. chunk_size=chunk_size,
  100. yield_obj_refs=yield_obj_refs,
  101. **kwargs,
  102. )
  103. yield from results
  104. @PublicAPI(stability="alpha")
  105. def map_unordered(
  106. fn: "RemoteFunction",
  107. items: Iterable[Any],
  108. *,
  109. backpressure_size: Optional[int] = DEFAULT_BACKPRESSURE_SIZE,
  110. chunk_size: int = DEFAULT_CHUNK_SIZE,
  111. yield_obj_refs: bool = False,
  112. **kwargs,
  113. ) -> Iterator[Union[Any, "ObjectRef"]]:
  114. """Apply a Ray remote function to a list of items and return an iterator that yields
  115. the completed results as they become available.
  116. This helper function applies backpressure to control the number of pending tasks, following the
  117. design pattern described in
  118. https://docs.ray.io/en/latest/ray-core/patterns/limit-pending-tasks.html.
  119. .. note::
  120. There is generally no guarantee on the order of results.
  121. Example:
  122. Suppose we have a function that sleeps for x seconds depending on the input.
  123. We expect to obtain a partially sorted list of results.
  124. .. testcode:: python
  125. import ray
  126. import time
  127. @ray.remote
  128. def f(x):
  129. time.sleep(x)
  130. return x
  131. # Example 1: chunk_size=2
  132. for x in ray.util.map_unordered(f, [10, 4, 6, 8, 2], chunk_size=2):
  133. print(x)
  134. .. testoutput::
  135. :options: +MOCK
  136. 4
  137. 2
  138. 6
  139. 8
  140. 10
  141. .. testcode:: python
  142. # Example 2: backpressure_size=2, chunk_size=1
  143. for x in ray.util.map_unordered(f, [10, 4, 6, 8, 2], backpressure_size=2, chunk_size=1):
  144. print(x)
  145. .. testoutput::
  146. :options: +MOCK
  147. 4
  148. 10
  149. 6
  150. 8
  151. 2
  152. Args:
  153. fn: A remote function to apply to the list of items. For more complex use cases, use Ray Data's
  154. :meth:`~ray.data.Dataset.map` / :meth:`~ray.data.Dataset.map_batches` instead.
  155. items: An iterable of items to apply the function to.
  156. backpressure_size: Maximum number of in-flight tasks allowed before
  157. calling a blocking :meth:`~ray.wait` (default 100). If None, no backpressure is applied.
  158. chunk_size: The number of tasks to wait for when the number of in-flight tasks exceeds
  159. `backpressure_size`. The parameter is passed as `num_returns` to :meth:`~ray.wait` internally.
  160. yield_obj_refs: If True, return Ray remote refs instead of results (by calling :meth:`~ray.get`).
  161. **kwargs: Additional keyword arguments to pass to :meth:`~ray.wait`, e.g.,
  162. `timeout` and `fetch_local`.
  163. Yields:
  164. Union[Any, ObjectRef]: The results (or optionally their Ray references) of the Ray tasks as they complete.
  165. .. seealso::
  166. :meth:`~ray.util.as_completed`
  167. Call this method for an existing list of Ray object refs.
  168. :meth:`~ray.data.Dataset.map`
  169. Use Ray Data APIs (e.g., :meth:`~ray.data.Dataset.map` and :meth:`~ray.data.Dataset.map_batches`)
  170. for better control and complex use cases, e.g., functions with multiple arguments.
  171. .. note::
  172. This is an altenative to `pool.imap_unordered()` in Ray's Actor-based `multiprocessing.Pool`.
  173. See https://docs.ray.io/en/latest/ray-more-libs/multiprocessing.html for more details.
  174. """
  175. if backpressure_size is None:
  176. backpressure_size: float = float("inf")
  177. elif backpressure_size <= 0:
  178. raise ValueError("backpressure_size must be positive.")
  179. if chunk_size < 1:
  180. raise ValueError("`chunk_size` must be >= 1")
  181. if "num_returns" in kwargs:
  182. raise ValueError("Use the `chunk_size` argument instead of `num_returns`.")
  183. refs = []
  184. for item in items:
  185. refs.append(fn.remote(item))
  186. if len(refs) >= backpressure_size:
  187. results, refs = _wait_and_get_single_batch(
  188. refs,
  189. chunk_size=chunk_size,
  190. yield_obj_refs=yield_obj_refs,
  191. **kwargs,
  192. )
  193. yield from results
  194. else:
  195. yield from as_completed(
  196. refs,
  197. chunk_size=chunk_size,
  198. yield_obj_refs=yield_obj_refs,
  199. **kwargs,
  200. )