| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756 |
- # Copyright 2025-present, the HuggingFace Inc. team.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """Contains the 'hf cache' command group with cache management subcommands."""
- import re
- import time
- from collections import defaultdict
- from collections.abc import Callable, Mapping
- from dataclasses import dataclass
- from enum import Enum
- from typing import Annotated, Any
- import typer
- from huggingface_hub.errors import CLIError
- from ..utils import ANSI, CachedRepoInfo, CachedRevisionInfo, CacheNotFound, HFCacheInfo, _format_size, scan_cache_dir
- from ..utils._parsing import parse_duration, parse_size
- from ._cli_utils import FormatWithAutoOpt, RepoIdArg, RepoTypeOpt, RevisionOpt, TokenOpt, get_hf_api, typer_factory
- from ._output import OutputFormatWithAuto, out
- cache_cli = typer_factory(help="Manage local cache directory.")
- #### Cache helper utilities
- @dataclass(frozen=True)
- class _DeletionResolution:
- revisions: frozenset[str]
- selected: dict[CachedRepoInfo, frozenset[CachedRevisionInfo]]
- missing: tuple[str, ...]
- _FILTER_PATTERN = re.compile(r"^(?P<key>[a-zA-Z_]+)\s*(?P<op>==|!=|>=|<=|>|<|=)\s*(?P<value>.+)$")
- _ALLOWED_OPERATORS = {"=", "!=", ">", "<", ">=", "<="}
- _FILTER_KEYS = {"accessed", "modified", "refs", "size", "type"}
- _SORT_KEYS = {"accessed", "modified", "name", "size"}
- _SORT_PATTERN = re.compile(r"^(?P<key>[a-zA-Z_]+)(?::(?P<order>asc|desc))?$")
- _SORT_DEFAULT_ORDER = {
- # Default ordering: accessed/modified/size are descending (newest/biggest first), name is ascending
- "accessed": "desc",
- "modified": "desc",
- "size": "desc",
- "name": "asc",
- }
- # Dynamically generate SortOptions enum from _SORT_KEYS
- _sort_options_dict = {}
- for key in sorted(_SORT_KEYS):
- _sort_options_dict[key] = key
- _sort_options_dict[f"{key}_asc"] = f"{key}:asc"
- _sort_options_dict[f"{key}_desc"] = f"{key}:desc"
- SortOptions = Enum("SortOptions", _sort_options_dict, type=str, module=__name__) # type: ignore
- @dataclass(frozen=True)
- class CacheDeletionCounts:
- """Simple counters summarizing cache deletions for CLI messaging."""
- repo_count: int
- partial_revision_count: int
- total_revision_count: int
- CacheEntry = tuple[CachedRepoInfo, CachedRevisionInfo | None]
- RepoRefsMap = dict[CachedRepoInfo, frozenset[str]]
- def summarize_deletions(
- selected_by_repo: Mapping[CachedRepoInfo, frozenset[CachedRevisionInfo]],
- ) -> CacheDeletionCounts:
- """Summarize deletions across repositories."""
- repo_count = 0
- total_revisions = 0
- revisions_in_full_repos = 0
- for repo, revisions in selected_by_repo.items():
- total_revisions += len(revisions)
- if len(revisions) == len(repo.revisions):
- repo_count += 1
- revisions_in_full_repos += len(revisions)
- partial_revision_count = total_revisions - revisions_in_full_repos
- return CacheDeletionCounts(repo_count, partial_revision_count, total_revisions)
- def print_cache_selected_revisions(selected_by_repo: Mapping[CachedRepoInfo, frozenset[CachedRevisionInfo]]) -> None:
- """Pretty-print selected cache revisions during confirmation prompts."""
- for repo in sorted(selected_by_repo.keys(), key=lambda repo: (repo.repo_type, repo.repo_id.lower())):
- repo_key = f"{repo.repo_type}/{repo.repo_id}"
- revisions = sorted(selected_by_repo[repo], key=lambda rev: rev.commit_hash)
- if len(revisions) == len(repo.revisions):
- out.text(f" - {repo_key} (entire repo)")
- continue
- out.text(f" - {repo_key}:")
- for revision in revisions:
- refs = " ".join(sorted(revision.refs)) or "(detached)"
- out.text(f" {revision.commit_hash} [{refs}] {revision.size_on_disk_str}")
- def build_cache_index(
- hf_cache_info: HFCacheInfo,
- ) -> tuple[
- dict[str, CachedRepoInfo],
- dict[str, tuple[CachedRepoInfo, CachedRevisionInfo]],
- ]:
- """Create lookup tables so CLI commands can resolve repo ids and revisions quickly."""
- repo_lookup: dict[str, CachedRepoInfo] = {}
- revision_lookup: dict[str, tuple[CachedRepoInfo, CachedRevisionInfo]] = {}
- for repo in hf_cache_info.repos:
- repo_key = repo.cache_id.lower()
- repo_lookup[repo_key] = repo
- for revision in repo.revisions:
- revision_lookup[revision.commit_hash.lower()] = (repo, revision)
- return repo_lookup, revision_lookup
- def collect_cache_entries(
- hf_cache_info: HFCacheInfo, *, include_revisions: bool
- ) -> tuple[list[CacheEntry], RepoRefsMap]:
- """Flatten cache metadata into rows consumed by `hf cache ls`."""
- entries: list[CacheEntry] = []
- repo_refs_map: RepoRefsMap = {}
- sorted_repos = sorted(hf_cache_info.repos, key=lambda repo: (repo.repo_type, repo.repo_id.lower()))
- for repo in sorted_repos:
- repo_refs_map[repo] = frozenset({ref for revision in repo.revisions for ref in revision.refs})
- if include_revisions:
- for revision in sorted(repo.revisions, key=lambda rev: rev.commit_hash):
- entries.append((repo, revision))
- else:
- entries.append((repo, None))
- if include_revisions:
- entries.sort(
- key=lambda entry: (
- entry[0].cache_id,
- entry[1].commit_hash if entry[1] is not None else "",
- )
- )
- else:
- entries.sort(key=lambda entry: entry[0].cache_id)
- return entries, repo_refs_map
- def compile_cache_filter(
- expr: str, repo_refs_map: RepoRefsMap
- ) -> Callable[[CachedRepoInfo, CachedRevisionInfo | None, float], bool]:
- """Convert a `hf cache ls` filter expression into the yes/no test we apply to each cache entry before displaying it."""
- match = _FILTER_PATTERN.match(expr.strip())
- if not match:
- raise ValueError(f"Invalid filter expression: '{expr}'.")
- key = match.group("key").lower()
- op = match.group("op")
- value_raw = match.group("value").strip()
- if op not in _ALLOWED_OPERATORS:
- raise ValueError(f"Unsupported operator '{op}' in filter '{expr}'. Must be one of {list(_ALLOWED_OPERATORS)}.")
- if key not in _FILTER_KEYS:
- raise ValueError(f"Unsupported filter key '{key}' in '{expr}'. Must be one of {list(_FILTER_KEYS)}.")
- # at this point we know that key is in `_FILTER_KEYS`
- if key == "size":
- size_threshold = parse_size(value_raw)
- return lambda repo, revision, _: _compare_numeric(
- revision.size_on_disk if revision is not None else repo.size_on_disk,
- op,
- size_threshold,
- )
- if key in {"modified", "accessed"}:
- seconds = parse_duration(value_raw.strip())
- def _time_filter(repo: CachedRepoInfo, revision: CachedRevisionInfo | None, now: float) -> bool:
- timestamp = (
- repo.last_accessed
- if key == "accessed"
- else revision.last_modified
- if revision is not None
- else repo.last_modified
- )
- if timestamp is None:
- return False
- return _compare_numeric(now - timestamp, op, seconds)
- return _time_filter
- if key == "type":
- expected = value_raw.lower()
- if op != "=":
- raise ValueError(f"Only '=' is supported for 'type' filters. Got '{op}'.")
- def _type_filter(repo: CachedRepoInfo, revision: CachedRevisionInfo | None, _: float) -> bool:
- return repo.repo_type.lower() == expected
- return _type_filter
- else: # key == "refs"
- if op != "=":
- raise ValueError(f"Only '=' is supported for 'refs' filters. Got {op}.")
- def _refs_filter(repo: CachedRepoInfo, revision: CachedRevisionInfo | None, _: float) -> bool:
- refs = revision.refs if revision is not None else repo_refs_map.get(repo, frozenset())
- return value_raw.lower() in [ref.lower() for ref in refs]
- return _refs_filter
- def _compare_numeric(left: float | None, op: str, right: float) -> bool:
- """Evaluate numeric comparisons for filters."""
- if left is None:
- return False
- comparisons = {
- "=": left == right,
- "!=": left != right,
- ">": left > right,
- "<": left < right,
- ">=": left >= right,
- "<=": left <= right,
- }
- if op not in comparisons:
- raise ValueError(f"Unsupported numeric comparison operator: {op}")
- return comparisons[op]
- def compile_cache_sort(sort_expr: str) -> tuple[Callable[[CacheEntry], tuple[Any, ...]], bool]:
- """Convert a `hf cache ls` sort expression into a key function for sorting entries.
- Returns:
- A tuple of (key_function, reverse_flag) where reverse_flag indicates whether
- to sort in descending order (True) or ascending order (False).
- """
- match = _SORT_PATTERN.match(sort_expr.strip().lower())
- if not match:
- raise ValueError(f"Invalid sort expression: '{sort_expr}'. Expected format: 'key' or 'key:asc' or 'key:desc'.")
- key = match.group("key").lower()
- explicit_order = match.group("order")
- if key not in _SORT_KEYS:
- raise ValueError(f"Unsupported sort key '{key}' in '{sort_expr}'. Must be one of {list(_SORT_KEYS)}.")
- # Use explicit order if provided, otherwise use default for the key
- order = explicit_order if explicit_order else _SORT_DEFAULT_ORDER[key]
- reverse = order == "desc"
- def _sort_key(entry: CacheEntry) -> tuple[Any, ...]:
- repo, revision = entry
- if key == "name":
- # Sort by cache_id (repo type/id)
- value: Any = repo.cache_id.lower()
- return (value,)
- if key == "size":
- # Use revision size if available, otherwise repo size
- value = revision.size_on_disk if revision is not None else repo.size_on_disk
- return (value,)
- if key == "accessed":
- # For revisions, accessed is not available per-revision, use repo's last_accessed
- # For repos, use repo's last_accessed
- value = repo.last_accessed if repo.last_accessed is not None else 0.0
- return (value,)
- if key == "modified":
- # Use revision's last_modified if available, otherwise repo's last_modified
- if revision is not None:
- value = revision.last_modified if revision.last_modified is not None else 0.0
- else:
- value = repo.last_modified if repo.last_modified is not None else 0.0
- return (value,)
- # Should never reach here due to validation above
- raise ValueError(f"Unsupported sort key: {key}")
- return _sort_key, reverse
- def _resolve_deletion_targets(hf_cache_info: HFCacheInfo, targets: list[str]) -> _DeletionResolution:
- """Resolve the deletion targets into a deletion resolution."""
- repo_lookup, revision_lookup = build_cache_index(hf_cache_info)
- selected: dict[CachedRepoInfo, set[CachedRevisionInfo]] = defaultdict(set)
- revisions: set[str] = set()
- missing: list[str] = []
- for raw_target in targets:
- target = raw_target.strip()
- if not target:
- continue
- lowered = target.lower()
- if re.fullmatch(r"[0-9a-fA-F]{40}", lowered):
- match = revision_lookup.get(lowered)
- if match is None:
- missing.append(raw_target)
- continue
- repo, revision = match
- selected[repo].add(revision)
- revisions.add(revision.commit_hash)
- continue
- matched_repo = repo_lookup.get(lowered)
- if matched_repo is None:
- missing.append(raw_target)
- continue
- for revision in matched_repo.revisions:
- selected[matched_repo].add(revision)
- revisions.add(revision.commit_hash)
- frozen_selected = {repo: frozenset(revs) for repo, revs in selected.items()}
- return _DeletionResolution(
- revisions=frozenset(revisions),
- selected=frozen_selected,
- missing=tuple(missing),
- )
- #### Cache CLI commands
- @cache_cli.command(
- "list | ls",
- examples=[
- "hf cache ls",
- "hf cache ls --revisions",
- 'hf cache ls --filter "size>1GB" --limit 20',
- "hf cache ls --format json",
- ],
- )
- def ls(
- cache_dir: Annotated[
- str | None,
- typer.Option(
- help="Cache directory to scan (defaults to Hugging Face cache).",
- ),
- ] = None,
- revisions: Annotated[
- bool,
- typer.Option(
- help="Include revisions in the output instead of aggregated repositories.",
- ),
- ] = False,
- filter: Annotated[
- list[str] | None,
- typer.Option(
- "-f",
- "--filter",
- help="Filter entries (e.g. 'size>1GB', 'type=model', 'accessed>7d'). Can be used multiple times.",
- ),
- ] = None,
- format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
- sort: Annotated[
- SortOptions | None,
- typer.Option(
- help="Sort entries by key. Supported keys: 'accessed', 'modified', 'name', 'size'. "
- "Append ':asc' or ':desc' to explicitly set the order (e.g., 'modified:asc'). "
- "Defaults: 'accessed', 'modified', 'size' default to 'desc' (newest/biggest first); "
- "'name' defaults to 'asc' (alphabetical).",
- ),
- ] = None,
- limit: Annotated[
- int | None,
- typer.Option(
- help="Limit the number of results returned. Returns only the top N entries after sorting.",
- ),
- ] = None,
- ) -> None:
- """List cached repositories or revisions."""
- try:
- hf_cache_info = scan_cache_dir(cache_dir)
- except CacheNotFound as exc:
- raise CLIError(f"Cache directory not found: {exc.cache_dir}") from exc
- filters = filter or []
- entries, repo_refs_map = collect_cache_entries(hf_cache_info, include_revisions=revisions)
- try:
- filter_fns = [compile_cache_filter(expr, repo_refs_map) for expr in filters]
- except ValueError as exc:
- raise typer.BadParameter(str(exc)) from exc
- now = time.time()
- for fn in filter_fns:
- entries = [entry for entry in entries if fn(entry[0], entry[1], now)]
- # Apply sorting if requested
- if sort:
- try:
- sort_key_fn, reverse = compile_cache_sort(sort.value)
- entries.sort(key=sort_key_fn, reverse=reverse)
- except ValueError as exc:
- raise typer.BadParameter(str(exc)) from exc
- # Apply limit if requested
- if limit is not None:
- if limit < 0:
- raise typer.BadParameter(f"Limit must be a positive integer, got {limit}.")
- entries = entries[:limit]
- if revisions:
- items = [
- {
- "id": repo.cache_id,
- "repo_id": repo.repo_id,
- "repo_type": repo.repo_type,
- "revision": revision.commit_hash,
- "snapshot_path": str(revision.snapshot_path),
- "size": revision.size_on_disk_str,
- "last_modified": revision.last_modified_str,
- "refs": sorted(revision.refs),
- }
- for repo, revision in entries
- if revision is not None
- ]
- out.table(
- items,
- headers=["id", "revision", "size", "last_modified", "refs"],
- id_key="revision",
- alignments={"size": "right"},
- )
- else:
- items = [
- {
- "id": repo.cache_id,
- "repo_id": repo.repo_id,
- "repo_type": repo.repo_type,
- "size": repo.size_on_disk_str,
- "last_accessed": repo.last_accessed_str or "",
- "last_modified": repo.last_modified_str,
- "refs": sorted(repo_refs_map.get(repo, frozenset())),
- }
- for repo, _ in entries
- ]
- out.table(
- items,
- headers=["id", "size", "last_accessed", "last_modified", "refs"],
- id_key="id",
- alignments={"size": "right"},
- )
- if entries:
- unique_repos = {repo for repo, _ in entries}
- repo_count = len(unique_repos)
- if revisions:
- revision_count = sum(1 for _, rev in entries if rev is not None)
- total_size = sum(rev.size_on_disk for _, rev in entries if rev is not None)
- else:
- revision_count = sum(len(repo.revisions) for repo in unique_repos)
- total_size = sum(repo.size_on_disk for repo in unique_repos)
- out.text(
- ANSI.bold(
- f"\nFound {repo_count} repo(s) for a total of {revision_count} revision(s)"
- f" and {_format_size(total_size)} on disk."
- )
- )
- @cache_cli.command(
- examples=[
- "hf cache rm model/gpt2",
- "hf cache rm <revision_hash>",
- "hf cache rm model/gpt2 --dry-run",
- "hf cache rm model/gpt2 --yes",
- ],
- )
- def rm(
- targets: Annotated[
- list[str],
- typer.Argument(
- help="One or more repo IDs (e.g. model/bert-base-uncased) or revision hashes to delete.",
- ),
- ],
- cache_dir: Annotated[
- str | None,
- typer.Option(
- help="Cache directory to scan (defaults to Hugging Face cache).",
- ),
- ] = None,
- yes: Annotated[
- bool,
- typer.Option(
- "-y",
- "--yes",
- help="Skip confirmation prompt.",
- ),
- ] = False,
- dry_run: Annotated[
- bool,
- typer.Option(
- help="Preview deletions without removing anything.",
- ),
- ] = False,
- format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
- ) -> None:
- """Remove cached repositories or revisions."""
- try:
- hf_cache_info = scan_cache_dir(cache_dir)
- except CacheNotFound as exc:
- raise CLIError(f"Cache directory not found: {exc.cache_dir}") from exc
- resolution = _resolve_deletion_targets(hf_cache_info, targets)
- if resolution.missing:
- details = "\n".join(f" - {entry}" for entry in resolution.missing)
- out.warning(f"Could not find in cache:\n{details}")
- if len(resolution.revisions) == 0:
- out.text("Nothing to delete.")
- raise typer.Exit(code=0)
- strategy = hf_cache_info.delete_revisions(*sorted(resolution.revisions))
- counts = summarize_deletions(resolution.selected)
- summary_parts: list[str] = []
- if counts.repo_count:
- summary_parts.append(f"{counts.repo_count} repo(s)")
- if counts.partial_revision_count:
- summary_parts.append(f"{counts.partial_revision_count} revision(s)")
- if not summary_parts:
- summary_parts.append(f"{counts.total_revision_count} revision(s)")
- summary_text = " and ".join(summary_parts)
- out.text(f"About to delete {summary_text} totalling {strategy.expected_freed_size_str}.")
- print_cache_selected_revisions(resolution.selected)
- if dry_run:
- out.result(
- "Dry run: no files were deleted.",
- dry_run=True,
- repos=counts.repo_count,
- revisions=counts.total_revision_count,
- size=strategy.expected_freed_size_str,
- )
- return
- out.confirm("Proceed with deletion?", yes=yes)
- strategy.execute()
- counts = summarize_deletions(resolution.selected)
- out.result(
- f"Deleted {counts.repo_count} repo(s) and {counts.total_revision_count} revision(s);"
- f" freed {strategy.expected_freed_size_str}.",
- repos_deleted=counts.repo_count,
- revisions_deleted=counts.total_revision_count,
- freed=strategy.expected_freed_size_str,
- )
- @cache_cli.command(examples=["hf cache prune", "hf cache prune --dry-run"])
- def prune(
- cache_dir: Annotated[
- str | None,
- typer.Option(
- help="Cache directory to scan (defaults to Hugging Face cache).",
- ),
- ] = None,
- yes: Annotated[
- bool,
- typer.Option(
- "-y",
- "--yes",
- help="Skip confirmation prompt.",
- ),
- ] = False,
- dry_run: Annotated[
- bool,
- typer.Option(
- help="Preview deletions without removing anything.",
- ),
- ] = False,
- format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
- ) -> None:
- """Remove detached revisions from the cache."""
- try:
- hf_cache_info = scan_cache_dir(cache_dir)
- except CacheNotFound as exc:
- raise CLIError(f"Cache directory not found: {exc.cache_dir}") from exc
- selected: dict[CachedRepoInfo, frozenset[CachedRevisionInfo]] = {}
- revisions: set[str] = set()
- for repo in hf_cache_info.repos:
- detached = frozenset(revision for revision in repo.revisions if len(revision.refs) == 0)
- if not detached:
- continue
- selected[repo] = detached
- revisions.update(revision.commit_hash for revision in detached)
- if len(revisions) == 0:
- out.text("No unreferenced revisions found. Nothing to prune.")
- return
- resolution = _DeletionResolution(
- revisions=frozenset(revisions),
- selected=selected,
- missing=(),
- )
- strategy = hf_cache_info.delete_revisions(*sorted(resolution.revisions))
- counts = summarize_deletions(selected)
- out.text(
- f"About to delete {counts.total_revision_count} unreferenced revision(s) ({strategy.expected_freed_size_str} total)."
- )
- print_cache_selected_revisions(selected)
- if dry_run:
- out.result(
- "Dry run: no files were deleted.",
- dry_run=True,
- revisions=counts.total_revision_count,
- size=strategy.expected_freed_size_str,
- )
- return
- out.confirm("Proceed?", yes=yes)
- strategy.execute()
- out.result(
- f"Deleted {counts.total_revision_count} unreferenced revision(s); freed {strategy.expected_freed_size_str}.",
- revisions_deleted=counts.total_revision_count,
- freed=strategy.expected_freed_size_str,
- )
- @cache_cli.command(
- examples=[
- "hf cache verify gpt2",
- "hf cache verify gpt2 --revision refs/pr/1",
- "hf cache verify my-dataset --repo-type dataset",
- ],
- )
- def verify(
- repo_id: RepoIdArg,
- repo_type: RepoTypeOpt = RepoTypeOpt.model,
- revision: RevisionOpt = None,
- cache_dir: Annotated[
- str | None,
- typer.Option(
- help="Cache directory to use when verifying files from cache (defaults to Hugging Face cache).",
- ),
- ] = None,
- local_dir: Annotated[
- str | None,
- typer.Option(
- help="If set, verify files under this directory instead of the cache.",
- ),
- ] = None,
- fail_on_missing_files: Annotated[
- bool,
- typer.Option(
- "--fail-on-missing-files",
- help="Fail if some files exist on the remote but are missing locally.",
- ),
- ] = False,
- fail_on_extra_files: Annotated[
- bool,
- typer.Option(
- "--fail-on-extra-files",
- help="Fail if some files exist locally but are not present on the remote revision.",
- ),
- ] = False,
- token: TokenOpt = None,
- format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
- ) -> None:
- """Verify checksums for a single repo revision from cache or a local directory.
- Examples:
- - Verify main revision in cache: `hf cache verify gpt2`
- - Verify specific revision: `hf cache verify gpt2 --revision refs/pr/1`
- - Verify dataset: `hf cache verify karpathy/fineweb-edu-100b-shuffle --repo-type dataset`
- - Verify local dir: `hf cache verify deepseek-ai/DeepSeek-OCR --local-dir /path/to/repo`
- """
- if local_dir is not None and cache_dir is not None:
- out.error("Cannot pass both --local-dir and --cache-dir. Use one or the other.")
- raise typer.Exit(code=2)
- api = get_hf_api(token=token)
- result = api.verify_repo_checksums(
- repo_id=repo_id,
- repo_type=repo_type.value if hasattr(repo_type, "value") else str(repo_type),
- revision=revision,
- local_dir=local_dir,
- cache_dir=cache_dir,
- token=token,
- )
- exit_code = 0
- if result.mismatches:
- details = "\n".join(
- f" - {m['path']}: expected {m['expected']} ({m['algorithm']}), got {m['actual']}"
- for m in result.mismatches
- )
- out.text(f"❌ Checksum verification failed for the following file(s):\n{details}")
- exit_code = 1
- if result.missing_paths:
- if fail_on_missing_files:
- details = "\n".join(f" - {p}" for p in result.missing_paths)
- out.text(f"❌ Missing files (present remotely, absent locally):\n{details}")
- exit_code = 1
- else:
- out.warning(
- f"{len(result.missing_paths)} remote file(s) are missing locally. "
- "Use --fail-on-missing-files for details."
- )
- if result.extra_paths:
- if fail_on_extra_files:
- details = "\n".join(f" - {p}" for p in result.extra_paths)
- out.text(f"❌ Extra files (present locally, absent remotely):\n{details}")
- exit_code = 1
- else:
- out.warning(
- f"{len(result.extra_paths)} local file(s) do not exist on the remote repo. "
- "Use --fail-on-extra-files for details."
- )
- verified_location = result.verified_path
- if exit_code != 0:
- out.error(
- f"Verification failed for '{repo_id}' ({repo_type.value}) in {verified_location}.\n Revision: {result.revision}"
- )
- raise typer.Exit(code=exit_code)
- out.result(
- f"Verified {result.checked_count} file(s) for {repo_type.value} '{repo_id}'. All checksums match.",
- repo_id=repo_id,
- repo_type=repo_type.value,
- checked=result.checked_count,
- path=str(verified_location),
- )
|