|
- """
- Implementation of a custom transfer agent for the transfer type "multipart" for
- git-lfs.
- Inspired by:
- github.com/cbartz/git-lfs-swift-transfer-agent/blob/master/git_lfs_swift_transfer.py
- Spec is: github.com/git-lfs/git-lfs/blob/master/docs/custom-transfers.md
- To launch debugger while developing:
- ```
- [lfs "customtransfer.multipart"]
- path = /path/to/huggingface_hub/.venv/bin/python
- args = -m debugpy --listen 5678 --wait-for-client /path/to/huggingface_hub/src/huggingface_hub/commands/huggingface_cli.py lfs-multipart-upload
- ```
- """
- import json
- import os
- import subprocess
- import sys
- from typing import Annotated
- import typer
- from huggingface_hub.errors import CLIError
- from huggingface_hub.lfs import LFS_MULTIPART_UPLOAD_COMMAND
- from ..utils import get_session, hf_raise_for_status, logging
- from ..utils._lfs import SliceFileObj
- logger = logging.get_logger(__name__)
- def lfs_enable_largefiles(
-     path: Annotated[
-         str,
-         typer.Argument(
-             help="Local path to repository you want to configure.",
-         ),
-     ],
- ) -> None:
-     """
-     Configure your repository to enable upload of files > 5GB.
-     This command sets up git-lfs to use the custom multipart transfer agent
-     which enables efficient uploading of large files in chunks.
-     """
-     local_path = os.path.abspath(path)
-     if not os.path.isdir(local_path):
-         raise CLIError("This does not look like a valid git repo.")
-     subprocess.run(
-         "git config lfs.customtransfer.multipart.path hf".split(),
-         check=True,
-         cwd=local_path,
-     )
-     subprocess.run(
-         f"git config lfs.customtransfer.multipart.args {LFS_MULTIPART_UPLOAD_COMMAND}".split(),
-         check=True,
-         cwd=local_path,
-     )
-     print("Local repo set up for largefiles")
- def write_msg(msg: dict):
-     """Write out the message in line-delimited JSON."""
-     msg_str = json.dumps(msg) + "\n"
-     sys.stdout.write(msg_str)
-     sys.stdout.flush()
- def read_msg() -> dict | None:
-     """Read line-delimited JSON from stdin."""
-     msg = json.loads(sys.stdin.readline().strip())
-     if "terminate" in (msg.get("type"), msg.get("event")):
-         # terminate message received
-         return None
-     if msg.get("event") not in ("download", "upload"):
-         logger.critical("Received unexpected message")
-         sys.exit(1)
-     return msg
- def lfs_multipart_upload() -> None:
-     """Internal git-lfs custom transfer agent for multipart uploads.
-     This function implements the custom transfer protocol for git-lfs multipart uploads.
-     Handles chunked uploads of large files to Hugging Face Hub.
-     """
-     # Immediately after invoking a custom transfer process, git-lfs
-     # sends initiation data to the process over stdin.
-     # This tells the process useful information about the configuration.
-     init_msg = json.loads(sys.stdin.readline().strip())
-     if not (init_msg.get("event") == "init" and init_msg.get("operation") == "upload"):
-         write_msg({"error": {"code": 32, "message": "Wrong lfs init operation"}})
-         sys.exit(1)
-     # The transfer process should use the information it needs from the
-     # initiation structure, and also perform any one-off setup tasks it
-     # needs to do. It should then respond on stdout with a simple empty
-     # confirmation structure, as follows:
-     write_msg({})
-     # After the initiation exchange, git-lfs will send any number of
-     # transfer requests to the stdin of the transfer process, in a serial sequence.
-     while True:
-         msg = read_msg()
-         if msg is None:
-             # When all transfers have been processed, git-lfs will send
-             # a terminate event to the stdin of the transfer process.
-             # On receiving this message the transfer process should
-             # clean up and terminate. No response is expected.
-             sys.exit(0)
-         oid = msg["oid"]
-         filepath = msg["path"]
-         completion_url = msg["action"]["href"]
-         header = msg["action"]["header"]
-         chunk_size = int(header.pop("chunk_size"))
-         presigned_urls: list[str] = list(header.values())
-         # Send a "started" progress event to allow other workers to start.
-         # Otherwise they're delayed until first "progress" event is reported,
-         # i.e. after the first 5GB by default (!)
-         write_msg(
-             {
-                 "event": "progress",
-                 "oid": oid,
-                 "bytesSoFar": 1,
-                 "bytesSinceLast": 0,
-             }
-         )
-         parts = []
-         with open(filepath, "rb") as file:
-             for i, presigned_url in enumerate(presigned_urls):
-                 with SliceFileObj(
-                     file,
-                     seek_from=i * chunk_size,
-                     read_limit=chunk_size,
-                 ) as data:
-                     r = get_session().put(presigned_url, data=data)
-                     hf_raise_for_status(r)
-                     parts.append(
-                         {
-                             "etag": r.headers.get("etag"),
-                             "partNumber": i + 1,
-                         }
-                     )
-                     # In order to support progress reporting while data is uploading / downloading,
-                     # the transfer process should post messages to stdout
-                     write_msg(
-                         {
-                             "event": "progress",
-                             "oid": oid,
-                             "bytesSoFar": (i + 1) * chunk_size,
-                             "bytesSinceLast": chunk_size,
-                         }
-                     )
-         r = get_session().post(
-             completion_url,
-             json={
-                 "oid": oid,
-                 "parts": parts,
-             },
-         )
-         hf_raise_for_status(r)
-         write_msg({"event": "complete", "oid": oid})
|
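
The upload loop in the file above maps part `i` to the byte range starting at `i * chunk_size` and reports `(i + 1) * chunk_size` as `bytesSoFar`, which can exceed the file size when the last part is shorter than a full chunk. The following is a minimal sketch of that part-to-offset mapping with the progress values clamped to the file size. The helper names `plan_parts` and `progress_events` are hypothetical and are not part of `huggingface_hub` or git-lfs. The sketch performs no network or file I/O and only builds the messages.

```python
import json


def plan_parts(file_size: int, chunk_size: int) -> list[tuple[int, int, int]]:
    """Return (part_number, offset, length) for each chunk of a file.

    Hypothetical helper: part numbers are 1-based, as in the "partNumber"
    field the agent sends to the completion URL.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    parts = []
    offset = 0
    while offset < file_size:
        # The last part may be shorter than chunk_size.
        length = min(chunk_size, file_size - offset)
        parts.append((len(parts) + 1, offset, length))
        offset += length
    return parts


def progress_events(oid: str, file_size: int, chunk_size: int) -> list[dict]:
    """Build one "progress" message per part, clamped to the file size."""
    events = []
    for _part_number, offset, length in plan_parts(file_size, chunk_size):
        events.append(
            {
                "event": "progress",
                "oid": oid,
                "bytesSoFar": offset + length,
                "bytesSinceLast": length,
            }
        )
    return events


# Usage: serialize the events as line-delimited JSON, one message per line.
for event in progress_events("abc123", file_size=25, chunk_size=10):
    print(json.dumps(event))
```

With a 25-byte file and 10-byte chunks, the sketch yields three parts, and the final event reports `bytesSoFar` equal to the file size instead of 30.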