| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256 |
- from typing import TYPE_CHECKING, Any, Iterable, Iterator, Optional, Sequence, Union
- import ray
- from ray.util.annotations import PublicAPI
- if TYPE_CHECKING:
- from ray import ObjectRef
- from ray.remote_function import RemoteFunction
# ray.wait() only returns one ready ref per call by default (num_returns=1).
# A slightly larger batch is used here until the optimization tracked in
# https://github.com/ray-project/ray/issues/49905 is fully implemented.
DEFAULT_CHUNK_SIZE: int = 10

# Default cap on the number of in-flight tasks used by map_unordered().
DEFAULT_BACKPRESSURE_SIZE: int = 100
def _wait_and_get_single_batch(
    refs: "Sequence[ObjectRef]",
    *,
    chunk_size: int,
    yield_obj_refs: bool = False,
    **kwargs,
) -> tuple[list[Union[Any, "ObjectRef"]], "list[ObjectRef]"]:
    """Run one `ray.wait()` round and split `refs` into a finished batch and the rest.

    Args:
        refs: A list of Ray object refs to wait on.
        chunk_size: Upper bound on how many refs to wait for in this round. It is
            forwarded to `ray.wait()` as `num_returns`, clamped to `len(refs)`.
        yield_obj_refs: If True, the finished batch is returned as object refs.
            If False (the default), the batch is resolved with :meth:`~ray.get`
            and the resulting values are returned instead.
        **kwargs: Additional keyword arguments forwarded to `ray.wait()`.

    Returns:
        A `(batch, remaining)` tuple: `batch` holds the finished results (or their
        refs when `yield_obj_refs` is True) and `remaining` holds the refs that
        `ray.wait()` reported as not ready.

    Raises:
        ValueError: If `chunk_size` is smaller than 1.
    """
    if chunk_size < 1:
        raise ValueError("`chunk_size` must be >= 1")

    # num_returns must be <= len(refs), so clamp the requested batch size.
    batch_size = min(chunk_size, len(refs))
    done, pending = ray.wait(refs, num_returns=batch_size, **kwargs)

    if yield_obj_refs:
        return done, pending
    return ray.get(done), pending
@PublicAPI(stability="alpha")
def as_completed(
    refs: "Sequence[ObjectRef]",
    *,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    yield_obj_refs: bool = False,
    **kwargs,
) -> Iterator[Union[Any, "ObjectRef"]]:
    """Given a list of Ray task references, yield results as they become available.

    Unlike calling :meth:`~ray.get` on a list of references (i.e., `ray.get(refs)`),
    which waits for all results to be ready, this function begins to yield results
    as soon as a batch of `chunk_size` results is ready.

    .. note::
        Generally there is no guarantee on the order of results. For example, the
        first result is not necessarily the first one completed, but rather the
        first one submitted in the first available batch (see :meth:`~ray.wait`
        for more details about preservation of submission order).

    .. note::
        Use this function instead of calling :meth:`~ray.get` inside a for loop. See
        https://docs.ray.io/en/latest/ray-core/patterns/ray-get-loop.html for more
        details.

    .. note::
        This is a generator function, so the argument checks below run (and may
        raise) when the returned iterator is first advanced, not when
        `as_completed()` itself is called.

    Example:
        Suppose we have a function that sleeps for x seconds depending on the input.
        We expect to obtain a partially sorted list of results.

        .. testcode:: python

            import ray
            import time

            @ray.remote
            def f(x):
                time.sleep(x)
                return x

            refs = [f.remote(i) for i in [10, 4, 6, 8, 2]]
            for x in ray.util.as_completed(refs, chunk_size=2):
                print(x)

        .. testoutput::
            :options: +MOCK

            # Output:
            4
            2
            6
            8
            10

    Args:
        refs: A list of Ray object refs.
        chunk_size: The number of tasks to wait for in each iteration (default 10).
            The parameter is passed as `num_returns` to :meth:`~ray.wait` internally.
        yield_obj_refs: If True, return Ray remote refs instead of results (by
            calling :meth:`~ray.get`).
        **kwargs: Additional keyword arguments to pass to :meth:`~ray.wait`, e.g.,
            `timeout` and `fetch_local`.

    Yields:
        Union[Any, ObjectRef]: The results (or optionally their Ray references) of
        the Ray tasks as they complete.

    Raises:
        ValueError: If `chunk_size` is smaller than 1, or if `num_returns` is
            supplied through `**kwargs` (use `chunk_size` instead).
    """
    if chunk_size < 1:
        raise ValueError("`chunk_size` must be >= 1")

    # `num_returns` is controlled exclusively through `chunk_size`; passing both
    # would collide inside the ray.wait() call.
    if "num_returns" in kwargs:
        raise ValueError("Use the `chunk_size` argument instead of `num_returns`.")

    # Each round peels off up to `chunk_size` finished refs; loop until none remain.
    while refs:
        results, refs = _wait_and_get_single_batch(
            refs,
            chunk_size=chunk_size,
            yield_obj_refs=yield_obj_refs,
            **kwargs,
        )
        yield from results
@PublicAPI(stability="alpha")
def map_unordered(
    fn: "RemoteFunction",
    items: Iterable[Any],
    *,
    backpressure_size: Optional[int] = DEFAULT_BACKPRESSURE_SIZE,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    yield_obj_refs: bool = False,
    **kwargs,
) -> Iterator[Union[Any, "ObjectRef"]]:
    """Apply a Ray remote function to a list of items and return an iterator that
    yields the completed results as they become available.

    This helper function applies backpressure to control the number of pending
    tasks, following the design pattern described in
    https://docs.ray.io/en/latest/ray-core/patterns/limit-pending-tasks.html.

    .. note::
        There is generally no guarantee on the order of results.

    .. note::
        This is a generator function, so the argument checks below run (and may
        raise) when the returned iterator is first advanced, not when
        `map_unordered()` itself is called.

    Example:
        Suppose we have a function that sleeps for x seconds depending on the input.
        We expect to obtain a partially sorted list of results.

        .. testcode:: python

            import ray
            import time

            @ray.remote
            def f(x):
                time.sleep(x)
                return x

            # Example 1: chunk_size=2
            for x in ray.util.map_unordered(f, [10, 4, 6, 8, 2], chunk_size=2):
                print(x)

        .. testoutput::
            :options: +MOCK

            4
            2
            6
            8
            10

        .. testcode:: python

            # Example 2: backpressure_size=2, chunk_size=1
            for x in ray.util.map_unordered(
                f, [10, 4, 6, 8, 2], backpressure_size=2, chunk_size=1
            ):
                print(x)

        .. testoutput::
            :options: +MOCK

            4
            10
            6
            8
            2

    Args:
        fn: A remote function to apply to the list of items. For more complex use
            cases, use Ray Data's :meth:`~ray.data.Dataset.map` /
            :meth:`~ray.data.Dataset.map_batches` instead.
        items: An iterable of items to apply the function to.
        backpressure_size: Maximum number of in-flight tasks allowed before
            calling a blocking :meth:`~ray.wait` (default 100). If None, no
            backpressure is applied.
        chunk_size: The number of tasks to wait for when the number of in-flight
            tasks exceeds `backpressure_size`. The parameter is passed as
            `num_returns` to :meth:`~ray.wait` internally.
        yield_obj_refs: If True, return Ray remote refs instead of results (by
            calling :meth:`~ray.get`).
        **kwargs: Additional keyword arguments to pass to :meth:`~ray.wait`, e.g.,
            `timeout` and `fetch_local`.

    Yields:
        Union[Any, ObjectRef]: The results (or optionally their Ray references) of
        the Ray tasks as they complete.

    Raises:
        ValueError: If `backpressure_size` is not positive, if `chunk_size` is
            smaller than 1, or if `num_returns` is supplied through `**kwargs`
            (use `chunk_size` instead).

    .. seealso::

        :meth:`~ray.util.as_completed`
            Call this method for an existing list of Ray object refs.

        :meth:`~ray.data.Dataset.map`
            Use Ray Data APIs (e.g., :meth:`~ray.data.Dataset.map` and
            :meth:`~ray.data.Dataset.map_batches`) for better control and complex
            use cases, e.g., functions with multiple arguments.

    .. note::
        This is an alternative to `pool.imap_unordered()` in Ray's Actor-based
        `multiprocessing.Pool`.
        See https://docs.ray.io/en/latest/ray-more-libs/multiprocessing.html for
        more details.
    """
    if backpressure_size is not None and backpressure_size <= 0:
        raise ValueError("backpressure_size must be positive.")
    if chunk_size < 1:
        raise ValueError("`chunk_size` must be >= 1")
    if "num_returns" in kwargs:
        raise ValueError("Use the `chunk_size` argument instead of `num_returns`.")

    # Keep the in-flight limit in its own local instead of rebinding (and
    # re-annotating) the `backpressure_size` parameter. `inf` means "never
    # throttle", since len(refs) can never reach it.
    max_in_flight: float = (
        float("inf") if backpressure_size is None else backpressure_size
    )

    refs = []
    for item in items:
        refs.append(fn.remote(item))

        # Once the limit is reached, block until a batch finishes before
        # submitting any further tasks.
        if len(refs) >= max_in_flight:
            results, refs = _wait_and_get_single_batch(
                refs,
                chunk_size=chunk_size,
                yield_obj_refs=yield_obj_refs,
                **kwargs,
            )
            yield from results

    # Everything has been submitted; drain whatever is still in flight.
    yield from as_completed(
        refs,
        chunk_size=chunk_size,
        yield_obj_refs=yield_obj_refs,
        **kwargs,
    )
|