| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- from __future__ import annotations
- import functools
- import sys
- from collections.abc import Generator, Iterable, Iterator
- from typing import TYPE_CHECKING, Callable, Literal, TypeVar
- from pip._vendor.rich.progress import (
- BarColumn,
- DownloadColumn,
- FileSizeColumn,
- MofNCompleteColumn,
- Progress,
- ProgressColumn,
- SpinnerColumn,
- TextColumn,
- TimeElapsedColumn,
- TimeRemainingColumn,
- TransferSpeedColumn,
- )
- from pip._internal.cli.spinners import RateLimiter
- from pip._internal.utils.logging import get_console, get_indentation
- if TYPE_CHECKING:
- from pip._internal.req.req_install import InstallRequirement
- T = TypeVar("T")
- ProgressRenderer = Callable[[Iterable[T]], Iterator[T]]
- BarType = Literal["on", "off", "raw"]
- def _rich_download_progress_bar(
- iterable: Iterable[bytes],
- *,
- bar_type: BarType,
- size: int | None,
- initial_progress: int | None = None,
- ) -> Generator[bytes, None, None]:
- assert bar_type == "on", "This should only be used in the default mode."
- if not size:
- total = float("inf")
- columns: tuple[ProgressColumn, ...] = (
- TextColumn("[progress.description]{task.description}"),
- SpinnerColumn("line", speed=1.5),
- FileSizeColumn(),
- TransferSpeedColumn(),
- TimeElapsedColumn(),
- )
- else:
- total = size
- columns = (
- TextColumn("[progress.description]{task.description}"),
- BarColumn(),
- DownloadColumn(),
- TransferSpeedColumn(),
- TextColumn("{task.fields[time_description]}"),
- TimeRemainingColumn(elapsed_when_finished=True),
- )
- progress = Progress(*columns, refresh_per_second=5)
- task_id = progress.add_task(
- " " * (get_indentation() + 2), total=total, time_description="eta"
- )
- if initial_progress is not None:
- progress.update(task_id, advance=initial_progress)
- with progress:
- for chunk in iterable:
- yield chunk
- progress.update(task_id, advance=len(chunk))
- progress.update(task_id, time_description="")
- def _rich_install_progress_bar(
- iterable: Iterable[InstallRequirement], *, total: int
- ) -> Iterator[InstallRequirement]:
- columns = (
- TextColumn("{task.fields[indent]}"),
- BarColumn(),
- MofNCompleteColumn(),
- TextColumn("{task.description}"),
- )
- console = get_console()
- bar = Progress(*columns, refresh_per_second=6, console=console, transient=True)
- # Hiding the progress bar at initialization forces a refresh cycle to occur
- # until the bar appears, avoiding very short flashes.
- task = bar.add_task("", total=total, indent=" " * get_indentation(), visible=False)
- with bar:
- for req in iterable:
- bar.update(task, description=rf"\[{req.name}]", visible=True)
- yield req
- bar.advance(task)
- def _raw_progress_bar(
- iterable: Iterable[bytes],
- *,
- size: int | None,
- initial_progress: int | None = None,
- ) -> Generator[bytes, None, None]:
- def write_progress(current: int, total: int) -> None:
- sys.stdout.write(f"Progress {current} of {total}\n")
- sys.stdout.flush()
- current = initial_progress or 0
- total = size or 0
- rate_limiter = RateLimiter(0.25)
- write_progress(current, total)
- for chunk in iterable:
- current += len(chunk)
- if rate_limiter.ready() or current == total:
- write_progress(current, total)
- rate_limiter.reset()
- yield chunk
- def get_download_progress_renderer(
- *, bar_type: BarType, size: int | None = None, initial_progress: int | None = None
- ) -> ProgressRenderer[bytes]:
- """Get an object that can be used to render the download progress.
- Returns a callable, that takes an iterable to "wrap".
- """
- if bar_type == "on":
- return functools.partial(
- _rich_download_progress_bar,
- bar_type=bar_type,
- size=size,
- initial_progress=initial_progress,
- )
- elif bar_type == "raw":
- return functools.partial(
- _raw_progress_bar,
- size=size,
- initial_progress=initial_progress,
- )
- else:
- return iter # no-op, when passed an iterator
- def get_install_progress_renderer(
- *, bar_type: BarType, total: int
- ) -> ProgressRenderer[InstallRequirement]:
- """Get an object that can be used to render the install progress.
- Returns a callable, that takes an iterable to "wrap".
- """
- if bar_type == "on":
- return functools.partial(_rich_install_progress_bar, total=total)
- else:
- return iter
|