progress_bars.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. from __future__ import annotations
  2. import functools
  3. import sys
  4. from collections.abc import Generator, Iterable, Iterator
  5. from typing import TYPE_CHECKING, Callable, Literal, TypeVar
  6. from pip._vendor.rich.progress import (
  7. BarColumn,
  8. DownloadColumn,
  9. FileSizeColumn,
  10. MofNCompleteColumn,
  11. Progress,
  12. ProgressColumn,
  13. SpinnerColumn,
  14. TextColumn,
  15. TimeElapsedColumn,
  16. TimeRemainingColumn,
  17. TransferSpeedColumn,
  18. )
  19. from pip._internal.cli.spinners import RateLimiter
  20. from pip._internal.utils.logging import get_console, get_indentation
  21. if TYPE_CHECKING:
  22. from pip._internal.req.req_install import InstallRequirement
  23. T = TypeVar("T")
  24. ProgressRenderer = Callable[[Iterable[T]], Iterator[T]]
  25. BarType = Literal["on", "off", "raw"]
  26. def _rich_download_progress_bar(
  27. iterable: Iterable[bytes],
  28. *,
  29. bar_type: BarType,
  30. size: int | None,
  31. initial_progress: int | None = None,
  32. ) -> Generator[bytes, None, None]:
  33. assert bar_type == "on", "This should only be used in the default mode."
  34. if not size:
  35. total = float("inf")
  36. columns: tuple[ProgressColumn, ...] = (
  37. TextColumn("[progress.description]{task.description}"),
  38. SpinnerColumn("line", speed=1.5),
  39. FileSizeColumn(),
  40. TransferSpeedColumn(),
  41. TimeElapsedColumn(),
  42. )
  43. else:
  44. total = size
  45. columns = (
  46. TextColumn("[progress.description]{task.description}"),
  47. BarColumn(),
  48. DownloadColumn(),
  49. TransferSpeedColumn(),
  50. TextColumn("{task.fields[time_description]}"),
  51. TimeRemainingColumn(elapsed_when_finished=True),
  52. )
  53. progress = Progress(*columns, refresh_per_second=5)
  54. task_id = progress.add_task(
  55. " " * (get_indentation() + 2), total=total, time_description="eta"
  56. )
  57. if initial_progress is not None:
  58. progress.update(task_id, advance=initial_progress)
  59. with progress:
  60. for chunk in iterable:
  61. yield chunk
  62. progress.update(task_id, advance=len(chunk))
  63. progress.update(task_id, time_description="")
  64. def _rich_install_progress_bar(
  65. iterable: Iterable[InstallRequirement], *, total: int
  66. ) -> Iterator[InstallRequirement]:
  67. columns = (
  68. TextColumn("{task.fields[indent]}"),
  69. BarColumn(),
  70. MofNCompleteColumn(),
  71. TextColumn("{task.description}"),
  72. )
  73. console = get_console()
  74. bar = Progress(*columns, refresh_per_second=6, console=console, transient=True)
  75. # Hiding the progress bar at initialization forces a refresh cycle to occur
  76. # until the bar appears, avoiding very short flashes.
  77. task = bar.add_task("", total=total, indent=" " * get_indentation(), visible=False)
  78. with bar:
  79. for req in iterable:
  80. bar.update(task, description=rf"\[{req.name}]", visible=True)
  81. yield req
  82. bar.advance(task)
  83. def _raw_progress_bar(
  84. iterable: Iterable[bytes],
  85. *,
  86. size: int | None,
  87. initial_progress: int | None = None,
  88. ) -> Generator[bytes, None, None]:
  89. def write_progress(current: int, total: int) -> None:
  90. sys.stdout.write(f"Progress {current} of {total}\n")
  91. sys.stdout.flush()
  92. current = initial_progress or 0
  93. total = size or 0
  94. rate_limiter = RateLimiter(0.25)
  95. write_progress(current, total)
  96. for chunk in iterable:
  97. current += len(chunk)
  98. if rate_limiter.ready() or current == total:
  99. write_progress(current, total)
  100. rate_limiter.reset()
  101. yield chunk
  102. def get_download_progress_renderer(
  103. *, bar_type: BarType, size: int | None = None, initial_progress: int | None = None
  104. ) -> ProgressRenderer[bytes]:
  105. """Get an object that can be used to render the download progress.
  106. Returns a callable, that takes an iterable to "wrap".
  107. """
  108. if bar_type == "on":
  109. return functools.partial(
  110. _rich_download_progress_bar,
  111. bar_type=bar_type,
  112. size=size,
  113. initial_progress=initial_progress,
  114. )
  115. elif bar_type == "raw":
  116. return functools.partial(
  117. _raw_progress_bar,
  118. size=size,
  119. initial_progress=initial_progress,
  120. )
  121. else:
  122. return iter # no-op, when passed an iterator
  123. def get_install_progress_renderer(
  124. *, bar_type: BarType, total: int
  125. ) -> ProgressRenderer[InstallRequirement]:
  126. """Get an object that can be used to render the install progress.
  127. Returns a callable, that takes an iterable to "wrap".
  128. """
  129. if bar_type == "on":
  130. return functools.partial(_rich_install_progress_bar, total=total)
  131. else:
  132. return iter