lfs.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. """
  2. Implementation of a custom transfer agent for the transfer type "multipart" for
  3. git-lfs.
  4. Inspired by:
  5. github.com/cbartz/git-lfs-swift-transfer-agent/blob/master/git_lfs_swift_transfer.py
  6. Spec is: github.com/git-lfs/git-lfs/blob/master/docs/custom-transfers.md
  7. To launch debugger while developing:
```
[lfs "customtransfer.multipart"]
path = /path/to/huggingface_hub/.venv/bin/python
args = -m debugpy --listen 5678 --wait-for-client /path/to/huggingface_hub/src/huggingface_hub/commands/huggingface_cli.py lfs-multipart-upload
```
"""
  13. import json
  14. import os
  15. import subprocess
  16. import sys
  17. from typing import Annotated
  18. import typer
  19. from huggingface_hub.errors import CLIError
  20. from huggingface_hub.lfs import LFS_MULTIPART_UPLOAD_COMMAND
  21. from ..utils import get_session, hf_raise_for_status, logging
  22. from ..utils._lfs import SliceFileObj
  23. logger = logging.get_logger(__name__)
  24. def lfs_enable_largefiles(
  25. path: Annotated[
  26. str,
  27. typer.Argument(
  28. help="Local path to repository you want to configure.",
  29. ),
  30. ],
  31. ) -> None:
  32. """
  33. Configure your repository to enable upload of files > 5GB.
  34. This command sets up git-lfs to use the custom multipart transfer agent
  35. which enables efficient uploading of large files in chunks.
  36. """
  37. local_path = os.path.abspath(path)
  38. if not os.path.isdir(local_path):
  39. raise CLIError("This does not look like a valid git repo.")
  40. subprocess.run(
  41. "git config lfs.customtransfer.multipart.path hf".split(),
  42. check=True,
  43. cwd=local_path,
  44. )
  45. subprocess.run(
  46. f"git config lfs.customtransfer.multipart.args {LFS_MULTIPART_UPLOAD_COMMAND}".split(),
  47. check=True,
  48. cwd=local_path,
  49. )
  50. print("Local repo set up for largefiles")
  51. def write_msg(msg: dict):
  52. """Write out the message in Line delimited JSON."""
  53. msg_str = json.dumps(msg) + "\n"
  54. sys.stdout.write(msg_str)
  55. sys.stdout.flush()
  56. def read_msg() -> dict | None:
  57. """Read Line delimited JSON from stdin."""
  58. msg = json.loads(sys.stdin.readline().strip())
  59. if "terminate" in (msg.get("type"), msg.get("event")):
  60. # terminate message received
  61. return None
  62. if msg.get("event") not in ("download", "upload"):
  63. logger.critical("Received unexpected message")
  64. sys.exit(1)
  65. return msg
  66. def lfs_multipart_upload() -> None:
  67. """Internal git-lfs custom transfer agent for multipart uploads.
  68. This function implements the custom transfer protocol for git-lfs multipart uploads.
  69. Handles chunked uploads of large files to Hugging Face Hub.
  70. """
  71. # Immediately after invoking a custom transfer process, git-lfs
  72. # sends initiation data to the process over stdin.
  73. # This tells the process useful information about the configuration.
  74. init_msg = json.loads(sys.stdin.readline().strip())
  75. if not (init_msg.get("event") == "init" and init_msg.get("operation") == "upload"):
  76. write_msg({"error": {"code": 32, "message": "Wrong lfs init operation"}})
  77. sys.exit(1)
  78. # The transfer process should use the information it needs from the
  79. # initiation structure, and also perform any one-off setup tasks it
  80. # needs to do. It should then respond on stdout with a simple empty
  81. # confirmation structure, as follows:
  82. write_msg({})
  83. # After the initiation exchange, git-lfs will send any number of
  84. # transfer requests to the stdin of the transfer process, in a serial sequence.
  85. while True:
  86. msg = read_msg()
  87. if msg is None:
  88. # When all transfers have been processed, git-lfs will send
  89. # a terminate event to the stdin of the transfer process.
  90. # On receiving this message the transfer process should
  91. # clean up and terminate. No response is expected.
  92. sys.exit(0)
  93. oid = msg["oid"]
  94. filepath = msg["path"]
  95. completion_url = msg["action"]["href"]
  96. header = msg["action"]["header"]
  97. chunk_size = int(header.pop("chunk_size"))
  98. presigned_urls: list[str] = list(header.values())
  99. # Send a "started" progress event to allow other workers to start.
  100. # Otherwise they're delayed until first "progress" event is reported,
  101. # i.e. after the first 5GB by default (!)
  102. write_msg(
  103. {
  104. "event": "progress",
  105. "oid": oid,
  106. "bytesSoFar": 1,
  107. "bytesSinceLast": 0,
  108. }
  109. )
  110. parts = []
  111. with open(filepath, "rb") as file:
  112. for i, presigned_url in enumerate(presigned_urls):
  113. with SliceFileObj(
  114. file,
  115. seek_from=i * chunk_size,
  116. read_limit=chunk_size,
  117. ) as data:
  118. r = get_session().put(presigned_url, data=data)
  119. hf_raise_for_status(r)
  120. parts.append(
  121. {
  122. "etag": r.headers.get("etag"),
  123. "partNumber": i + 1,
  124. }
  125. )
  126. # In order to support progress reporting while data is uploading / downloading,
  127. # the transfer process should post messages to stdout
  128. write_msg(
  129. {
  130. "event": "progress",
  131. "oid": oid,
  132. "bytesSoFar": (i + 1) * chunk_size,
  133. "bytesSinceLast": chunk_size,
  134. }
  135. )
  136. r = get_session().post(
  137. completion_url,
  138. json={
  139. "oid": oid,
  140. "parts": parts,
  141. },
  142. )
  143. hf_raise_for_status(r)
  144. write_msg({"event": "complete", "oid": oid})