| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- from typing import Any, Callable, List, Optional, Tuple
class Barrier:
    """Barrier to collect results and process them in bulk.

    A barrier collects results one :meth:`arrive` call at a time and fires a
    completion callback once a configured number of results has been received.

    For instance, if ``max_results=N``, the ``on_completion`` callback will be
    invoked once :meth:`arrive` has been called ``N`` times.

    The completion callback is invoked only once, even if more results arrive
    after completion. The collected results can be reset with :meth:`reset`,
    after which the callback may be invoked again.

    The completion callback should expect one argument, which is the barrier
    object that completed.

    Args:
        max_results: Number of results to collect before the barrier is
            considered complete and ``on_completion`` is invoked.
        on_completion: Optional callback to invoke when ``max_results``
            results have arrived at the barrier.
    """

    def __init__(
        self,
        max_results: int,
        *,
        on_completion: Optional[Callable[["Barrier"], None]] = None,
    ):
        self._max_results = max_results

        # Completion state and the callback fired on completion.
        self._completed = False
        self._on_completion = on_completion

        # Collected results. A single positional argument to ``arrive`` is
        # stored as-is, anything else as a tuple, so elements are ``Any``.
        self._results: List[Any] = []

    def arrive(self, *data: Any) -> None:
        """Notify the barrier that a result successfully arrived.

        This counts against the ``max_results`` limit. The received result
        is included in the list returned by :meth:`get_results`.

        Args:
            *data: Result data to be cached. If exactly one argument is
                passed, it is stored unwrapped; otherwise the arguments are
                stored as a tuple (an empty tuple when called without
                arguments).
        """
        # Unwrap the common single-value case so callers get the value back
        # directly instead of a 1-tuple.
        entry: Any = data[0] if len(data) == 1 else data
        self._results.append(entry)
        self._check_completion()

    def _check_completion(self) -> None:
        """Mark the barrier complete and fire the callback when due."""
        if self._completed:
            # Already fired completion callback
            return

        if self.num_results >= self._max_results:
            # Set the flag before invoking the callback so that results
            # arriving from within the callback do not fire it again.
            self._completed = True

            if self._on_completion is not None:
                self._on_completion(self)

    @property
    def completed(self) -> bool:
        """Returns True if the barrier is completed."""
        return self._completed

    @property
    def num_results(self) -> int:
        """Number of received (successful) results."""
        return len(self._results)

    def get_results(self) -> List[Any]:
        """Return the list of received results.

        The returned object is the barrier's internal list, not a copy.
        """
        return self._results

    def reset(self) -> None:
        """Reset the barrier, removing all received results.

        Resetting the barrier also resets the completion status. When enough
        new results arrive after resetting, the ``on_completion`` callback
        will be invoked again.
        """
        self._completed = False
        # Rebind rather than clear, so a list handed out earlier by
        # ``get_results`` keeps its contents.
        self._results = []
|