| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419 |
- """Internal helpers for Hugging Face marketplace skill installation and upgrades."""
- import base64
- import io
- import json
- import shutil
- import tarfile
- import tempfile
- from dataclasses import dataclass, replace
- from pathlib import Path, PurePosixPath
- from typing import Any, Literal
- from huggingface_hub.errors import CLIError
- from huggingface_hub.utils import get_session
- DEFAULT_SKILLS_REPO_ID = "huggingface/skills"
- DEFAULT_SKILLS_REPO_OWNER, DEFAULT_SKILLS_REPO_NAME = DEFAULT_SKILLS_REPO_ID.split("/")
- DEFAULT_SKILLS_REF = "main"
- MARKETPLACE_PATH = ".claude-plugin/marketplace.json"
- GITHUB_API_TIMEOUT = 10
- SKILL_MANIFEST_FILENAME = ".hf-skill-manifest.json"
- SKILL_MANIFEST_SCHEMA_VERSION = 1
- SkillUpdateStatus = Literal[
- "up_to_date",
- "update_available",
- "updated",
- "unmanaged",
- "invalid_metadata",
- "source_unreachable",
- ]
- @dataclass(frozen=True)
- class MarketplaceSkill:
- name: str
- repo_path: str
- @dataclass(frozen=True)
- class InstalledSkillManifest:
- schema_version: int
- installed_revision: str
- @dataclass(frozen=True)
- class SkillUpdateInfo:
- name: str
- skill_dir: Path
- status: SkillUpdateStatus
- detail: str | None = None
- current_revision: str | None = None
- available_revision: str | None = None
- def load_marketplace_skills() -> list[MarketplaceSkill]:
- """Load skills from the default Hugging Face marketplace."""
- payload = _load_marketplace_payload()
- plugins = payload.get("plugins")
- if not isinstance(plugins, list):
- raise CLIError("Invalid marketplace payload: expected a top-level 'plugins' list.")
- skills: list[MarketplaceSkill] = []
- for plugin in plugins:
- if not isinstance(plugin, dict):
- continue
- name = plugin.get("name")
- source = plugin.get("source")
- if not isinstance(name, str) or not isinstance(source, str):
- continue
- skills.append(MarketplaceSkill(name=name, repo_path=_normalize_repo_path(source)))
- return skills
- def get_marketplace_skill(selector: str) -> MarketplaceSkill:
- """Resolve a marketplace skill by name."""
- selected = _select_marketplace_skill(load_marketplace_skills(), selector)
- if selected is None:
- raise CLIError(
- f"Skill '{selector}' not found in {DEFAULT_SKILLS_REPO_ID}. "
- "Try `hf skills add` to install `hf-cli` or use a known skill name."
- )
- return selected
- def install_marketplace_skill(skill: MarketplaceSkill, destination_root: Path, force: bool = False) -> Path:
- """Install a marketplace skill into a local skills directory."""
- destination_root = destination_root.expanduser().resolve()
- destination_root.mkdir(parents=True, exist_ok=True)
- install_dir = destination_root / skill.name
- if install_dir.exists() and not force:
- raise FileExistsError(f"Skill already exists: {install_dir}")
- if install_dir.exists():
- with tempfile.TemporaryDirectory(dir=destination_root, prefix=f".{install_dir.name}.install-") as tmp_dir_str:
- tmp_dir = Path(tmp_dir_str)
- staged_dir = tmp_dir / install_dir.name
- _populate_install_dir(skill=skill, install_dir=staged_dir)
- _atomic_replace_directory(existing_dir=install_dir, staged_dir=staged_dir)
- return install_dir
- try:
- _populate_install_dir(skill=skill, install_dir=install_dir)
- except Exception:
- if install_dir.exists():
- shutil.rmtree(install_dir)
- raise
- return install_dir
- def check_for_updates(
- roots: list[Path],
- selector: str | None = None,
- ) -> list[SkillUpdateInfo]:
- """Check managed skill installs for newer upstream revisions."""
- marketplace_skills = {skill.name.lower(): skill for skill in load_marketplace_skills()}
- updates = [_evaluate_update(skill_dir, marketplace_skills) for skill_dir in _iter_unique_skill_dirs(roots)]
- filtered = _filter_updates(updates, selector)
- if selector is not None and not filtered:
- raise CLIError(f"No installed skills match '{selector}'.")
- return filtered
- def apply_updates(
- roots: list[Path],
- selector: str | None = None,
- ) -> list[SkillUpdateInfo]:
- """Upgrade managed skills in place when the upstream revision changes."""
- updates = check_for_updates(roots, selector)
- results: list[SkillUpdateInfo] = []
- for update in updates:
- results.append(_apply_single_update(update))
- return results
- def read_installed_skill_manifest(skill_dir: Path) -> tuple[InstalledSkillManifest | None, str | None]:
- """Read local skill metadata written by `hf skills add`."""
- manifest_path = skill_dir / SKILL_MANIFEST_FILENAME
- if not manifest_path.exists():
- return None, None
- try:
- payload = json.loads(manifest_path.read_text(encoding="utf-8"))
- except Exception as exc: # noqa: BLE001
- return None, f"invalid json: {exc}"
- if not isinstance(payload, dict):
- return None, "metadata root must be an object"
- try:
- return _parse_installed_skill_manifest(payload), None
- except ValueError as exc:
- return None, str(exc)
- def write_installed_skill_manifest(skill_dir: Path, manifest: InstalledSkillManifest) -> None:
- payload = {
- "schema_version": manifest.schema_version,
- "installed_revision": manifest.installed_revision,
- }
- (skill_dir / SKILL_MANIFEST_FILENAME).write_text(
- json.dumps(payload, indent=2, sort_keys=True) + "\n",
- encoding="utf-8",
- )
- def _load_marketplace_payload() -> dict[str, Any]:
- response = _fetch_from_skills_repo(
- f"contents/{MARKETPLACE_PATH}",
- params={"ref": DEFAULT_SKILLS_REF},
- )
- try:
- payload = response.json()
- except Exception as exc: # noqa: BLE001
- raise CLIError(f"Failed to decode GitHub API response for 'contents/{MARKETPLACE_PATH}': {exc}") from exc
- if not isinstance(payload, dict):
- raise CLIError("Invalid marketplace response: expected a JSON object.")
- content = payload.get("content")
- encoding = payload.get("encoding")
- if not isinstance(content, str) or encoding != "base64":
- raise CLIError("Invalid marketplace payload: expected base64-encoded content.")
- try:
- decoded = base64.b64decode(content).decode("utf-8")
- parsed = json.loads(decoded)
- except Exception as exc: # noqa: BLE001
- raise CLIError(f"Failed to decode marketplace payload: {exc}") from exc
- if not isinstance(parsed, dict):
- raise CLIError("Invalid marketplace payload: expected a JSON object.")
- return parsed
- def _select_marketplace_skill(skills: list[MarketplaceSkill], selector: str) -> MarketplaceSkill | None:
- selector_lower = selector.strip().lower()
- for skill in skills:
- if skill.name.lower() == selector_lower:
- return skill
- return None
- def _normalize_repo_path(path: str) -> str:
- normalized = path.strip()
- while normalized.startswith("./"):
- normalized = normalized[2:]
- normalized = normalized.strip("/")
- if not normalized:
- raise CLIError("Invalid marketplace entry: empty source path.")
- return normalized
- def _populate_install_dir(skill: MarketplaceSkill, install_dir: Path) -> None:
- installed_revision = _resolve_available_revision(skill)
- install_dir.mkdir(parents=True, exist_ok=True)
- _extract_remote_github_path(
- revision=installed_revision,
- source_path=skill.repo_path,
- install_dir=install_dir,
- )
- _validate_installed_skill_dir(install_dir)
- write_installed_skill_manifest(
- install_dir,
- InstalledSkillManifest(
- schema_version=SKILL_MANIFEST_SCHEMA_VERSION,
- installed_revision=installed_revision,
- ),
- )
- def _validate_installed_skill_dir(skill_dir: Path) -> None:
- skill_file = skill_dir / "SKILL.md"
- if not skill_file.is_file():
- raise RuntimeError(f"Installed skill is missing SKILL.md: {skill_file}")
- def _extract_remote_github_path(revision: str, source_path: str, install_dir: Path) -> None:
- tar_bytes = _fetch_from_skills_repo(f"tarball/{revision}").content
- _extract_tar_subpath(tar_bytes, source_path=source_path, install_dir=install_dir)
- def _extract_tar_subpath(tar_bytes: bytes, source_path: str, install_dir: Path) -> None:
- """Extract a skill subdirectory from a tar archive.
- GitHub tarballs include a leading `<repo>-<revision>/` directory. The helper also
- accepts archives that start directly at `skills/<name>/...` to keep tests simple.
- """
- source_parts = PurePosixPath(source_path).parts
- with tarfile.open(fileobj=io.BytesIO(tar_bytes), mode="r:*") as archive:
- members = archive.getmembers()
- matched = False
- for member in members:
- relative_parts = _member_relative_parts(member_name=member.name, source_parts=source_parts)
- if relative_parts is None:
- continue
- if not relative_parts:
- matched = True
- continue
- matched = True
- relative_path = Path(*relative_parts)
- if ".." in relative_path.parts:
- raise RuntimeError(f"Invalid path found in archive for {source_path}.")
- destination_path = install_dir / relative_path
- if member.isdir():
- destination_path.mkdir(parents=True, exist_ok=True)
- continue
- if not member.isfile():
- continue
- destination_path.parent.mkdir(parents=True, exist_ok=True)
- extracted = archive.extractfile(member)
- if extracted is None:
- raise RuntimeError(f"Failed to extract {member.name}.")
- destination_path.write_bytes(extracted.read())
- if not matched:
- raise FileNotFoundError(f"Path '{source_path}' not found in source archive.")
- def _member_relative_parts(member_name: str, source_parts: tuple[str, ...]) -> tuple[str, ...] | None:
- path_parts = PurePosixPath(member_name).parts
- if tuple(path_parts[: len(source_parts)]) == source_parts:
- return path_parts[len(source_parts) :]
- if len(path_parts) > len(source_parts) and tuple(path_parts[1 : 1 + len(source_parts)]) == source_parts:
- return path_parts[1 + len(source_parts) :]
- return None
- def _atomic_replace_directory(existing_dir: Path, staged_dir: Path) -> None:
- backup_dir = staged_dir.parent / f"{existing_dir.name}.backup"
- try:
- existing_dir.rename(backup_dir)
- staged_dir.rename(existing_dir)
- shutil.rmtree(backup_dir)
- except Exception:
- if backup_dir.exists() and not existing_dir.exists():
- backup_dir.rename(existing_dir)
- raise
- def _iter_unique_skill_dirs(roots: list[Path]) -> list[Path]:
- seen: set[Path] = set()
- discovered: list[Path] = []
- for root in roots:
- root = root.expanduser().resolve()
- if not root.is_dir():
- continue
- for child in sorted(root.iterdir()):
- if child.name.startswith("."):
- continue
- if not child.is_dir() and not child.is_symlink():
- continue
- resolved = child.resolve()
- if resolved in seen or not resolved.is_dir():
- continue
- seen.add(resolved)
- discovered.append(resolved)
- return discovered
- def _evaluate_update(skill_dir: Path, marketplace_skills: dict[str, MarketplaceSkill]) -> SkillUpdateInfo:
- base = SkillUpdateInfo(name=skill_dir.name, skill_dir=skill_dir, status="unmanaged")
- manifest, error = read_installed_skill_manifest(skill_dir)
- if manifest is None:
- return replace(base, status="invalid_metadata" if error else "unmanaged", detail=error)
- skill = marketplace_skills.get(skill_dir.name.lower())
- if skill is None:
- return replace(
- base,
- status="source_unreachable",
- detail=f"Skill '{skill_dir.name}' is no longer available in {DEFAULT_SKILLS_REPO_ID}.",
- current_revision=manifest.installed_revision,
- )
- current_revision = manifest.installed_revision
- try:
- available_revision = _resolve_available_revision(skill)
- except Exception as exc:
- return replace(base, status="source_unreachable", detail=str(exc), current_revision=current_revision)
- status: SkillUpdateStatus = "up_to_date" if available_revision == current_revision else "update_available"
- return replace(
- base,
- status=status,
- detail="update available" if status == "update_available" else None,
- current_revision=current_revision,
- available_revision=available_revision,
- )
- def _apply_single_update(update: SkillUpdateInfo) -> SkillUpdateInfo:
- if update.status != "update_available":
- return update
- try:
- skill = get_marketplace_skill(update.skill_dir.name)
- install_marketplace_skill(skill, update.skill_dir.parent, force=True)
- except Exception as exc:
- return replace(update, status="source_unreachable", detail=str(exc))
- return replace(update, status="updated", detail="updated")
- def _filter_updates(updates: list[SkillUpdateInfo], selector: str | None) -> list[SkillUpdateInfo]:
- if selector is None:
- return updates
- selector_lower = selector.strip().lower()
- return [update for update in updates if update.name.lower() == selector_lower]
- def _resolve_available_revision(skill: MarketplaceSkill) -> str:
- response = _fetch_from_skills_repo(
- "commits",
- params={"sha": DEFAULT_SKILLS_REF, "path": skill.repo_path, "per_page": 1},
- )
- try:
- payload = response.json()
- except Exception as exc: # noqa: BLE001
- raise CLIError(f"Failed to decode GitHub API response for 'commits': {exc}") from exc
- if not isinstance(payload, list) or not payload:
- raise CLIError(f"Unable to resolve the current revision for skill '{skill.name}'.")
- latest = payload[0]
- if not isinstance(latest, dict):
- raise CLIError(f"Invalid commit response while resolving skill '{skill.name}'.")
- revision = latest.get("sha")
- if not isinstance(revision, str) or not revision:
- raise CLIError(f"Invalid commit response while resolving skill '{skill.name}'.")
- return revision
- def _parse_installed_skill_manifest(payload: dict[str, Any]) -> InstalledSkillManifest:
- if payload.get("schema_version") != SKILL_MANIFEST_SCHEMA_VERSION:
- raise ValueError(f"unsupported schema_version: {payload.get('schema_version')}")
- installed_revision = payload.get("installed_revision")
- if not isinstance(installed_revision, str) or not installed_revision:
- raise ValueError("missing installed_revision")
- return InstalledSkillManifest(
- schema_version=SKILL_MANIFEST_SCHEMA_VERSION,
- installed_revision=installed_revision,
- )
- def _fetch_from_skills_repo(endpoint: str, params: dict[str, Any] | None = None) -> Any:
- url = f"https://api.github.com/repos/{DEFAULT_SKILLS_REPO_OWNER}/{DEFAULT_SKILLS_REPO_NAME}/{endpoint.lstrip('/')}"
- try:
- response = get_session().get(
- url,
- params=params,
- headers={"Accept": "application/vnd.github+json"},
- follow_redirects=True,
- timeout=GITHUB_API_TIMEOUT,
- )
- response.raise_for_status()
- except Exception as exc: # noqa: BLE001
- raise CLIError(f"Failed to fetch '{endpoint}' from {DEFAULT_SKILLS_REPO_ID}: {exc}") from exc
- return response
|