barrier.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. from typing import Any, Callable, List, Optional, Tuple
  2. class Barrier:
  3. """Barrier to collect results and process them in bulk.
  4. A barrier can be used to collect multiple results and process them in bulk once
  5. a certain count or a timeout is reached.
  6. For instance, if ``max_results=N``, the ``on_completion`` callback will be
  7. invoked once :meth:`arrive` has been called ``N`` times.
  8. The completion callback will only be invoked once, even if more results
  9. arrive after completion. The collected results can be resetted
  10. with :meth:`reset`, after which the callback may be invoked again.
  11. The completion callback should expect one argument, which is the barrier
  12. object that completed.
  13. Args:
  14. max_results: Maximum number of results to collect before a call to
  15. :meth:`wait` resolves or the :meth:`on_completion` callback is invoked.
  16. on_completion: Callback to invoke when ``max_results`` results
  17. arrived at the barrier.
  18. """
  19. def __init__(
  20. self,
  21. max_results: int,
  22. *,
  23. on_completion: Optional[Callable[["Barrier"], None]] = None,
  24. ):
  25. self._max_results = max_results
  26. # on_completion callback
  27. self._completed = False
  28. self._on_completion = on_completion
  29. # Collect received results
  30. self._results: List[Tuple[Any]] = []
  31. def arrive(self, *data):
  32. """Notify barrier that a result successfully arrived.
  33. This will count against the ``max_results`` limit. The received result
  34. will be included in a call to :meth:`get_results`.
  35. Args:
  36. *data: Result data to be cached. Can be obtained via :meth:`get_results`.
  37. """
  38. if len(data) == 1:
  39. data = data[0]
  40. self._results.append(data)
  41. self._check_completion()
  42. def _check_completion(self):
  43. if self._completed:
  44. # Already fired completion callback
  45. return
  46. if self.num_results >= self._max_results:
  47. # Barrier is complete
  48. self._completed = True
  49. if self._on_completion:
  50. self._on_completion(self)
  51. @property
  52. def completed(self) -> bool:
  53. """Returns True if the barrier is completed."""
  54. return self._completed
  55. @property
  56. def num_results(self) -> int:
  57. """Number of received (successful) results."""
  58. return len(self._results)
  59. def get_results(self) -> List[Tuple[Any]]:
  60. """Return list of received results."""
  61. return self._results
  62. def reset(self) -> None:
  63. """Reset barrier, removing all received results.
  64. Resetting the barrier will reset the completion status. When ``max_results``
  65. is set and enough new events arrive after resetting, the
  66. :meth:`on_completion` callback will be invoked again.
  67. """
  68. self._completed = False
  69. self._results = []