_commit_api.py 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976
  1. """
  2. Type definitions and utilities for the `create_commit` API
  3. """
  4. import base64
  5. import io
  6. import os
  7. import warnings
  8. from collections import defaultdict
  9. from collections.abc import Iterable, Iterator
  10. from contextlib import contextmanager
  11. from dataclasses import dataclass, field
  12. from itertools import groupby
  13. from pathlib import Path, PurePosixPath
  14. from typing import TYPE_CHECKING, Any, BinaryIO, Literal, Union
  15. from tqdm.contrib.concurrent import thread_map
  16. from . import constants
  17. from .errors import EntryNotFoundError, HfHubHTTPError, XetAuthorizationError, XetRefreshTokenError
  18. from .file_download import hf_hub_url
  19. from .lfs import UploadInfo, lfs_upload, post_lfs_batch_info
  20. from .utils import (
  21. FORBIDDEN_FOLDERS,
  22. XetTokenType,
  23. are_progress_bars_disabled,
  24. chunk_iterable,
  25. fetch_xet_connection_info_from_repo_info,
  26. get_session,
  27. hf_raise_for_status,
  28. http_backoff,
  29. logging,
  30. sha,
  31. tqdm_stream_file,
  32. validate_hf_hub_args,
  33. )
  34. from .utils import tqdm as hf_tqdm
  35. from .utils._runtime import is_xet_available
  36. if TYPE_CHECKING:
  37. from .hf_api import RepoFile
logger = logging.get_logger(__name__)

# How a file is sent to the Hub, as returned by the "preupload" endpoint: through LFS or as a regular git blob.
UploadMode = Literal["lfs", "regular"]

# Max is 1,000 per request on the Hub for HfApi.get_paths_info
# Otherwise we get:
# HfHubHTTPError: 413 Client Error: Payload Too Large for url: https://huggingface.co/api/datasets/xxx (Request ID: xxx)\n\ntoo many parameters
# See https://github.com/huggingface/huggingface_hub/issues/1503
FETCH_LFS_BATCH_SIZE = 500

# Number of additions sent per request to the LFS batch endpoint by `_upload_files`.
UPLOAD_BATCH_MAX_NUM_FILES = 256
  46. @dataclass
  47. class CommitOperationDelete:
  48. """
  49. Data structure holding necessary info to delete a file or a folder from a repository
  50. on the Hub.
  51. Args:
  52. path_in_repo (`str`):
  53. Relative filepath in the repo, for example: `"checkpoints/1fec34a/weights.bin"`
  54. for a file or `"checkpoints/1fec34a/"` for a folder.
  55. is_folder (`bool` or `Literal["auto"]`, *optional*)
  56. Whether the Delete Operation applies to a folder or not. If "auto", the path
  57. type (file or folder) is guessed automatically by looking if path ends with
  58. a "/" (folder) or not (file). To explicitly set the path type, you can set
  59. `is_folder=True` or `is_folder=False`.
  60. """
  61. path_in_repo: str
  62. is_folder: bool | Literal["auto"] = "auto"
  63. def __post_init__(self):
  64. self.path_in_repo = _validate_path_in_repo(self.path_in_repo)
  65. if self.is_folder == "auto":
  66. self.is_folder = self.path_in_repo.endswith("/")
  67. if not isinstance(self.is_folder, bool):
  68. raise ValueError(
  69. f"Wrong value for `is_folder`. Must be one of [`True`, `False`, `'auto'`]. Got '{self.is_folder}'."
  70. )
  71. @dataclass
  72. class CommitOperationCopy:
  73. """
  74. Data structure holding necessary info to copy a file in a repository on the Hub.
  75. Limitations:
  76. - Only LFS files can be copied. To copy a regular file, you need to download it locally and re-upload it
  77. - Cross-repository copies are not supported.
  78. Note: you can combine a [`CommitOperationCopy`] and a [`CommitOperationDelete`] to rename an LFS file on the Hub.
  79. Args:
  80. src_path_in_repo (`str`):
  81. Relative filepath in the repo of the file to be copied, e.g. `"checkpoints/1fec34a/weights.bin"`.
  82. path_in_repo (`str`):
  83. Relative filepath in the repo where to copy the file, e.g. `"checkpoints/1fec34a/weights_copy.bin"`.
  84. src_revision (`str`, *optional*):
  85. The git revision of the file to be copied. Can be any valid git revision.
  86. Default to the target commit revision.
  87. """
  88. src_path_in_repo: str
  89. path_in_repo: str
  90. src_revision: str | None = None
  91. # set to the OID of the file to be copied if it has already been uploaded
  92. # useful to determine if a commit will be empty or not.
  93. _src_oid: str | None = None
  94. # set to the OID of the file to copy to if it has already been uploaded
  95. # useful to determine if a commit will be empty or not.
  96. _dest_oid: str | None = None
  97. def __post_init__(self):
  98. self.src_path_in_repo = _validate_path_in_repo(self.src_path_in_repo)
  99. self.path_in_repo = _validate_path_in_repo(self.path_in_repo)
  100. @dataclass
  101. class CommitOperationAdd:
  102. """
  103. Data structure holding necessary info to upload a file to a repository on the Hub.
  104. Args:
  105. path_in_repo (`str`):
  106. Relative filepath in the repo, for example: `"checkpoints/1fec34a/weights.bin"`
  107. path_or_fileobj (`str`, `Path`, `bytes`, or `BinaryIO`):
  108. Either:
  109. - a path to a local file (as `str` or `pathlib.Path`) to upload
  110. - a buffer of bytes (`bytes`) holding the content of the file to upload
  111. - a "file object" (subclass of `io.BufferedIOBase`), typically obtained
  112. with `open(path, "rb")`. It must support `seek()` and `tell()` methods.
  113. Raises:
  114. [`ValueError`](https://docs.python.org/3/library/exceptions.html#ValueError)
  115. If `path_or_fileobj` is not one of `str`, `Path`, `bytes` or `io.BufferedIOBase`.
  116. [`ValueError`](https://docs.python.org/3/library/exceptions.html#ValueError)
  117. If `path_or_fileobj` is a `str` or `Path` but not a path to an existing file.
  118. [`ValueError`](https://docs.python.org/3/library/exceptions.html#ValueError)
  119. If `path_or_fileobj` is a `io.BufferedIOBase` but it doesn't support both
  120. `seek()` and `tell()`.
  121. """
  122. path_in_repo: str
  123. path_or_fileobj: str | Path | bytes | BinaryIO
  124. upload_info: UploadInfo = field(init=False, repr=False)
  125. # Internal attributes
  126. # set to "lfs" or "regular" once known
  127. _upload_mode: UploadMode | None = field(init=False, repr=False, default=None)
  128. # set to True if .gitignore rules prevent the file from being uploaded as LFS
  129. # (server-side check)
  130. _should_ignore: bool | None = field(init=False, repr=False, default=None)
  131. # set to the remote OID of the file if it has already been uploaded
  132. # useful to determine if a commit will be empty or not
  133. _remote_oid: str | None = field(init=False, repr=False, default=None)
  134. # set to True once the file has been uploaded as LFS
  135. _is_uploaded: bool = field(init=False, repr=False, default=False)
  136. # set to True once the file has been committed
  137. _is_committed: bool = field(init=False, repr=False, default=False)
  138. def __post_init__(self) -> None:
  139. """Validates `path_or_fileobj` and compute `upload_info`."""
  140. self.path_in_repo = _validate_path_in_repo(self.path_in_repo)
  141. # Validate `path_or_fileobj` value
  142. if isinstance(self.path_or_fileobj, Path):
  143. self.path_or_fileobj = str(self.path_or_fileobj)
  144. if isinstance(self.path_or_fileobj, str):
  145. path_or_fileobj = os.path.normpath(os.path.expanduser(self.path_or_fileobj))
  146. if not os.path.isfile(path_or_fileobj):
  147. raise ValueError(f"Provided path: '{path_or_fileobj}' is not a file on the local file system")
  148. elif not isinstance(self.path_or_fileobj, (io.BufferedIOBase, bytes)):
  149. # ^^ Inspired from: https://stackoverflow.com/questions/44584829/how-to-determine-if-file-is-opened-in-binary-or-text-mode
  150. raise ValueError(
  151. "path_or_fileobj must be either an instance of str, bytes or"
  152. " io.BufferedIOBase. If you passed a file-like object, make sure it is"
  153. " in binary mode."
  154. )
  155. if isinstance(self.path_or_fileobj, io.BufferedIOBase):
  156. try:
  157. self.path_or_fileobj.tell()
  158. self.path_or_fileobj.seek(0, os.SEEK_CUR)
  159. except (OSError, AttributeError) as exc:
  160. raise ValueError(
  161. "path_or_fileobj is a file-like object but does not implement seek() and tell()"
  162. ) from exc
  163. # Compute "upload_info" attribute
  164. if isinstance(self.path_or_fileobj, str):
  165. self.upload_info = UploadInfo.from_path(self.path_or_fileobj)
  166. elif isinstance(self.path_or_fileobj, bytes):
  167. self.upload_info = UploadInfo.from_bytes(self.path_or_fileobj)
  168. else:
  169. self.upload_info = UploadInfo.from_fileobj(self.path_or_fileobj)
  170. @contextmanager
  171. def as_file(self, with_tqdm: bool = False) -> Iterator[BinaryIO]:
  172. """
  173. A context manager that yields a file-like object allowing to read the underlying
  174. data behind `path_or_fileobj`.
  175. Args:
  176. with_tqdm (`bool`, *optional*, defaults to `False`):
  177. If True, iterating over the file object will display a progress bar. Only
  178. works if the file-like object is a path to a file. Pure bytes and buffers
  179. are not supported.
  180. Example:
  181. ```python
  182. >>> operation = CommitOperationAdd(
  183. ... path_in_repo="remote/dir/weights.h5",
  184. ... path_or_fileobj="./local/weights.h5",
  185. ... )
  186. CommitOperationAdd(path_in_repo='remote/dir/weights.h5', path_or_fileobj='./local/weights.h5')
  187. >>> with operation.as_file() as file:
  188. ... content = file.read()
  189. >>> with operation.as_file(with_tqdm=True) as file:
  190. ... while True:
  191. ... data = file.read(1024)
  192. ... if not data:
  193. ... break
  194. config.json: 100%|█████████████████████████| 8.19k/8.19k [00:02<00:00, 3.72kB/s]
  195. >>> with operation.as_file(with_tqdm=True) as file:
  196. ... httpx.put(..., data=file)
  197. config.json: 100%|█████████████████████████| 8.19k/8.19k [00:02<00:00, 3.72kB/s]
  198. ```
  199. """
  200. if isinstance(self.path_or_fileobj, str) or isinstance(self.path_or_fileobj, Path):
  201. if with_tqdm:
  202. with tqdm_stream_file(self.path_or_fileobj) as file:
  203. yield file
  204. else:
  205. with open(self.path_or_fileobj, "rb") as file:
  206. yield file
  207. elif isinstance(self.path_or_fileobj, bytes):
  208. yield io.BytesIO(self.path_or_fileobj)
  209. elif isinstance(self.path_or_fileobj, io.BufferedIOBase):
  210. prev_pos = self.path_or_fileobj.tell()
  211. yield self.path_or_fileobj
  212. self.path_or_fileobj.seek(prev_pos, io.SEEK_SET)
  213. def b64content(self) -> bytes:
  214. """
  215. The base64-encoded content of `path_or_fileobj`
  216. Returns: `bytes`
  217. """
  218. with self.as_file() as file:
  219. return base64.b64encode(file.read())
  220. @property
  221. def _local_oid(self) -> str | None:
  222. """Return the OID of the local file.
  223. This OID is then compared to `self._remote_oid` to check if the file has changed compared to the remote one.
  224. If the file did not change, we won't upload it again to prevent empty commits.
  225. For LFS files, the OID corresponds to the SHA256 of the file content (used a LFS ref).
  226. For regular files, the OID corresponds to the SHA1 of the file content.
  227. Note: this is slightly different to git OID computation since the oid of an LFS file is usually the git-SHA1 of the
  228. pointer file content (not the actual file content). However, using the SHA256 is enough to detect changes
  229. and more convenient client-side.
  230. """
  231. if self._upload_mode is None:
  232. return None
  233. elif self._upload_mode == "lfs":
  234. return self.upload_info.sha256.hex()
  235. else:
  236. # Regular file => compute sha1
  237. # => no need to read by chunk since the file is guaranteed to be <=5MB.
  238. with self.as_file() as file:
  239. return sha.git_hash(file.read())
  240. def _validate_path_in_repo(path_in_repo: str) -> str:
  241. # Validate `path_in_repo` value to prevent a server-side issue
  242. if path_in_repo.startswith("/"):
  243. path_in_repo = path_in_repo[1:]
  244. if path_in_repo == "." or path_in_repo == ".." or path_in_repo.startswith("../"):
  245. raise ValueError(f"Invalid `path_in_repo` in CommitOperation: '{path_in_repo}'")
  246. if path_in_repo.startswith("./"):
  247. path_in_repo = path_in_repo[2:]
  248. for forbidden in FORBIDDEN_FOLDERS:
  249. if any(part == forbidden for part in path_in_repo.split("/")):
  250. raise ValueError(
  251. f"Invalid `path_in_repo` in CommitOperation: cannot update files under a '{forbidden}/' folder (path:"
  252. f" '{path_in_repo}')."
  253. )
  254. return path_in_repo
# Type alias for any single operation that can be part of a commit: add, copy or delete.
CommitOperation = Union[CommitOperationAdd, CommitOperationCopy, CommitOperationDelete]
  256. def _warn_on_overwriting_operations(operations: list[CommitOperation]) -> None:
  257. """
  258. Warn user when a list of operations is expected to overwrite itself in a single
  259. commit.
  260. Rules:
  261. - If a filepath is updated by multiple `CommitOperationAdd` operations, a warning
  262. message is triggered.
  263. - If a filepath is updated at least once by a `CommitOperationAdd` and then deleted
  264. by a `CommitOperationDelete`, a warning is triggered.
  265. - If a `CommitOperationDelete` deletes a filepath that is then updated by a
  266. `CommitOperationAdd`, no warning is triggered. This is usually useless (no need to
  267. delete before upload) but can happen if a user deletes an entire folder and then
  268. add new files to it.
  269. """
  270. nb_additions_per_path: dict[str, int] = defaultdict(int)
  271. for operation in operations:
  272. path_in_repo = operation.path_in_repo
  273. if isinstance(operation, CommitOperationAdd):
  274. if nb_additions_per_path[path_in_repo] > 0:
  275. warnings.warn(
  276. "About to update multiple times the same file in the same commit:"
  277. f" '{path_in_repo}'. This can cause undesired inconsistencies in"
  278. " your repo."
  279. )
  280. nb_additions_per_path[path_in_repo] += 1
  281. for parent in PurePosixPath(path_in_repo).parents:
  282. # Also keep track of number of updated files per folder
  283. # => warns if deleting a folder overwrite some contained files
  284. nb_additions_per_path[str(parent)] += 1
  285. if isinstance(operation, CommitOperationDelete):
  286. if nb_additions_per_path[str(PurePosixPath(path_in_repo))] > 0:
  287. if operation.is_folder:
  288. warnings.warn(
  289. "About to delete a folder containing files that have just been"
  290. f" updated within the same commit: '{path_in_repo}'. This can"
  291. " cause undesired inconsistencies in your repo."
  292. )
  293. else:
  294. warnings.warn(
  295. "About to delete a file that have just been updated within the"
  296. f" same commit: '{path_in_repo}'. This can cause undesired"
  297. " inconsistencies in your repo."
  298. )
  299. @validate_hf_hub_args
  300. def _upload_files(
  301. *,
  302. additions: list[CommitOperationAdd],
  303. repo_type: str,
  304. repo_id: str,
  305. headers: dict[str, str],
  306. endpoint: str | None = None,
  307. num_threads: int = 5,
  308. revision: str | None = None,
  309. create_pr: bool | None = None,
  310. ):
  311. """
  312. Negotiates per-file transfer (LFS vs Xet) and uploads in batches.
  313. """
  314. xet_additions: list[CommitOperationAdd] = []
  315. lfs_actions: list[dict[str, Any]] = []
  316. lfs_oid2addop: dict[str, CommitOperationAdd] = {}
  317. for chunk in chunk_iterable(additions, chunk_size=UPLOAD_BATCH_MAX_NUM_FILES):
  318. chunk_list = [op for op in chunk]
  319. transfers: list[str] = ["basic", "multipart"]
  320. has_buffered_io_data = any(isinstance(op.path_or_fileobj, io.BufferedIOBase) for op in chunk_list)
  321. if is_xet_available():
  322. if not has_buffered_io_data:
  323. transfers.append("xet")
  324. else:
  325. logger.warning(
  326. "Uploading files as a binary IO buffer is not supported by Xet Storage. "
  327. "Falling back to HTTP upload."
  328. )
  329. actions_chunk, errors_chunk, chosen_transfer = post_lfs_batch_info(
  330. upload_infos=[op.upload_info for op in chunk_list],
  331. repo_id=repo_id,
  332. repo_type=repo_type,
  333. revision=revision,
  334. endpoint=endpoint,
  335. headers=headers,
  336. token=None, # already passed in 'headers'
  337. transfers=transfers,
  338. )
  339. if errors_chunk:
  340. message = "\n".join(
  341. [
  342. f"Encountered error for file with OID {err.get('oid')}: `{err.get('error', {}).get('message')}"
  343. for err in errors_chunk
  344. ]
  345. )
  346. raise ValueError(f"LFS batch API returned errors:\n{message}")
  347. # If server returns a transfer we didn't offer (e.g "xet" while uploading from BytesIO),
  348. # fall back to LFS for this chunk.
  349. if chosen_transfer == "xet" and ("xet" in transfers):
  350. xet_additions.extend(chunk_list)
  351. else:
  352. lfs_actions.extend(actions_chunk)
  353. for op in chunk_list:
  354. lfs_oid2addop[op.upload_info.sha256.hex()] = op
  355. if len(lfs_actions) > 0:
  356. _upload_lfs_files(
  357. actions=lfs_actions,
  358. oid2addop=lfs_oid2addop,
  359. headers=headers,
  360. endpoint=endpoint,
  361. num_threads=num_threads,
  362. )
  363. if len(xet_additions) > 0:
  364. _upload_xet_files(
  365. additions=xet_additions,
  366. repo_type=repo_type,
  367. repo_id=repo_id,
  368. headers=headers,
  369. endpoint=endpoint,
  370. revision=revision,
  371. create_pr=create_pr,
  372. )
  373. @validate_hf_hub_args
  374. def _upload_lfs_files(
  375. *,
  376. actions: list[dict[str, Any]],
  377. oid2addop: dict[str, CommitOperationAdd],
  378. headers: dict[str, str],
  379. endpoint: str | None = None,
  380. num_threads: int = 5,
  381. ):
  382. """
  383. Uploads the content of `additions` to the Hub using the large file storage protocol.
  384. Relevant external documentation:
  385. - LFS Batch API: https://github.com/git-lfs/git-lfs/blob/main/docs/api/batch.md
  386. Args:
  387. actions (`list[dict[str, Any]]`):
  388. LFS batch actions returned by the server.
  389. oid2addop (`dict[str, CommitOperationAdd]`):
  390. A dictionary mapping the OID of the file to the corresponding `CommitOperationAdd` object.
  391. headers (`dict[str, str]`):
  392. Headers to use for the request, including authorization headers and user agent.
  393. endpoint (`str`, *optional*):
  394. The endpoint to use for the request. Defaults to `constants.ENDPOINT`.
  395. num_threads (`int`, *optional*):
  396. The number of concurrent threads to use when uploading. Defaults to 5.
  397. Raises:
  398. [`EnvironmentError`](https://docs.python.org/3/library/exceptions.html#EnvironmentError)
  399. If an upload failed for any reason
  400. [`ValueError`](https://docs.python.org/3/library/exceptions.html#ValueError)
  401. Type of the repo to upload to: `"model"`, `"dataset"` or `"space"`.
  402. repo_id (`str`):
  403. A namespace (user or an organization) and a repo name separated
  404. by a `/`.
  405. headers (`dict[str, str]`):
  406. Headers to use for the request, including authorization headers and user agent.
  407. num_threads (`int`, *optional*):
  408. The number of concurrent threads to use when uploading. Defaults to 5.
  409. revision (`str`, *optional*):
  410. The git revision to upload to.
  411. Raises:
  412. [`EnvironmentError`](https://docs.python.org/3/library/exceptions.html#EnvironmentError)
  413. If an upload failed for any reason
  414. [`ValueError`](https://docs.python.org/3/library/exceptions.html#ValueError)
  415. If the server returns malformed responses
  416. [`HfHubHTTPError`]
  417. If the LFS batch endpoint returned an HTTP error.
  418. """
  419. # Filter out files already present upstream
  420. filtered_actions = []
  421. for action in actions:
  422. if action.get("actions") is None:
  423. logger.debug(
  424. f"Content of file {oid2addop[action['oid']].path_in_repo} is already present upstream - skipping upload."
  425. )
  426. else:
  427. filtered_actions.append(action)
  428. # Upload according to server-provided actions
  429. def _wrapped_lfs_upload(batch_action) -> None:
  430. try:
  431. operation = oid2addop[batch_action["oid"]]
  432. lfs_upload(operation=operation, lfs_batch_action=batch_action, headers=headers, endpoint=endpoint)
  433. except Exception as exc:
  434. raise RuntimeError(f"Error while uploading '{operation.path_in_repo}' to the Hub.") from exc
  435. if len(filtered_actions) == 1:
  436. logger.debug("Uploading 1 LFS file to the Hub")
  437. _wrapped_lfs_upload(filtered_actions[0])
  438. else:
  439. logger.debug(
  440. f"Uploading {len(filtered_actions)} LFS files to the Hub using up to {num_threads} threads concurrently"
  441. )
  442. thread_map(
  443. _wrapped_lfs_upload,
  444. filtered_actions,
  445. desc=f"Upload {len(filtered_actions)} LFS files",
  446. max_workers=num_threads,
  447. tqdm_class=hf_tqdm,
  448. )
  449. @validate_hf_hub_args
  450. def _upload_xet_files(
  451. *,
  452. additions: list[CommitOperationAdd],
  453. repo_type: str,
  454. repo_id: str,
  455. headers: dict[str, str],
  456. endpoint: str | None = None,
  457. revision: str | None = None,
  458. create_pr: bool | None = None,
  459. ):
  460. """
  461. Uploads the content of `additions` to the Hub using the xet storage protocol.
  462. This chunks the files and deduplicates the chunks before uploading them to xetcas storage.
  463. Args:
  464. additions (`` of `CommitOperationAdd`):
  465. The files to be uploaded.
  466. repo_type (`str`):
  467. Type of the repo to upload to: `"model"`, `"dataset"` or `"space"`.
  468. repo_id (`str`):
  469. A namespace (user or an organization) and a repo name separated
  470. by a `/`.
  471. headers (`dict[str, str]`):
  472. Headers to use for the request, including authorization headers and user agent.
  473. endpoint: (`str`, *optional*):
  474. The endpoint to use for the xetcas service. Defaults to `constants.ENDPOINT`.
  475. revision (`str`, *optional*):
  476. The git revision to upload to.
  477. create_pr (`bool`, *optional*):
  478. Whether or not to create a Pull Request with that commit.
  479. Raises:
  480. [`EnvironmentError`](https://docs.python.org/3/library/exceptions.html#EnvironmentError)
  481. If an upload failed for any reason.
  482. [`ValueError`](https://docs.python.org/3/library/exceptions.html#ValueError)
  483. If the server returns malformed responses or if the user is unauthorized to upload to xet storage.
  484. [`HfHubHTTPError`]
  485. If the LFS batch endpoint returned an HTTP error.
  486. **How it works:**
  487. The file download system uses Xet storage, which is a content-addressable storage system that breaks files into chunks
  488. for efficient storage and transfer.
  489. `hf_xet.upload_files` manages uploading files by:
  490. - Taking a list of file paths to upload
  491. - Breaking files into smaller chunks for efficient storage
  492. - Avoiding duplicate storage by recognizing identical chunks across files
  493. - Connecting to a storage server (CAS server) that manages these chunks
  494. The upload process works like this:
  495. 1. Create a local folder at ~/.cache/huggingface/xet/chunk-cache to store file chunks for reuse.
  496. 2. Process files in parallel (up to 8 files at once):
  497. 2.1. Read the file content.
  498. 2.2. Split the file content into smaller chunks based on content patterns: each chunk gets a unique ID based on what's in it.
  499. 2.3. For each chunk:
  500. - Check if it already exists in storage.
  501. - Skip uploading chunks that already exist.
  502. 2.4. Group chunks into larger blocks for efficient transfer.
  503. 2.5. Upload these blocks to the storage server.
  504. 2.6. Create and upload information about how the file is structured.
  505. 3. Return reference files that contain information about the uploaded files, which can be used later to download them.
  506. """
  507. if len(additions) == 0:
  508. return
  509. # at this point, we know that hf_xet is installed
  510. from hf_xet import upload_bytes, upload_files
  511. from .utils._xet_progress_reporting import XetProgressReporter
  512. try:
  513. xet_connection_info = fetch_xet_connection_info_from_repo_info(
  514. token_type=XetTokenType.WRITE,
  515. repo_id=repo_id,
  516. repo_type=repo_type,
  517. revision=revision,
  518. headers=headers,
  519. endpoint=endpoint,
  520. params={"create_pr": "1"} if create_pr else None,
  521. )
  522. except HfHubHTTPError as e:
  523. if e.response.status_code == 401:
  524. raise XetAuthorizationError(
  525. f"You are unauthorized to upload to xet storage for {repo_type}/{repo_id}. "
  526. f"Please check that you have configured your access token with write access to the repo."
  527. ) from e
  528. raise
  529. xet_endpoint = xet_connection_info.endpoint
  530. access_token_info = (xet_connection_info.access_token, xet_connection_info.expiration_unix_epoch)
  531. def token_refresher() -> tuple[str, int]:
  532. new_xet_connection = fetch_xet_connection_info_from_repo_info(
  533. token_type=XetTokenType.WRITE,
  534. repo_id=repo_id,
  535. repo_type=repo_type,
  536. revision=revision,
  537. headers=headers,
  538. endpoint=endpoint,
  539. params={"create_pr": "1"} if create_pr else None,
  540. )
  541. if new_xet_connection is None:
  542. raise XetRefreshTokenError("Failed to refresh xet token")
  543. return new_xet_connection.access_token, new_xet_connection.expiration_unix_epoch
  544. if not are_progress_bars_disabled():
  545. progress = XetProgressReporter()
  546. progress_callback = progress.update_progress
  547. else:
  548. progress, progress_callback = None, None
  549. try:
  550. all_bytes_ops = [op for op in additions if isinstance(op.path_or_fileobj, bytes)]
  551. all_paths_ops = [op for op in additions if isinstance(op.path_or_fileobj, (str, Path))]
  552. xet_headers = headers.copy()
  553. xet_headers.pop("authorization", None)
  554. if len(all_paths_ops) > 0:
  555. all_paths = [str(op.path_or_fileobj) for op in all_paths_ops]
  556. all_sha256s = [op.upload_info.sha256.hex() for op in all_paths_ops]
  557. upload_files(
  558. all_paths,
  559. xet_endpoint,
  560. access_token_info,
  561. token_refresher,
  562. progress_callback,
  563. repo_type,
  564. request_headers=xet_headers,
  565. sha256s=all_sha256s,
  566. )
  567. if len(all_bytes_ops) > 0:
  568. all_bytes = [op.path_or_fileobj for op in all_bytes_ops]
  569. all_sha256s = [op.upload_info.sha256.hex() for op in all_bytes_ops]
  570. upload_bytes(
  571. all_bytes,
  572. xet_endpoint,
  573. access_token_info,
  574. token_refresher,
  575. progress_callback,
  576. repo_type,
  577. request_headers=xet_headers,
  578. sha256s=all_sha256s,
  579. )
  580. finally:
  581. if progress is not None:
  582. progress.close(False)
  583. return
  584. def _validate_preupload_info(preupload_info: dict):
  585. files = preupload_info.get("files")
  586. if not isinstance(files, list):
  587. raise ValueError("preupload_info is improperly formatted")
  588. for file_info in files:
  589. if not (
  590. isinstance(file_info, dict)
  591. and isinstance(file_info.get("path"), str)
  592. and isinstance(file_info.get("uploadMode"), str)
  593. and (file_info["uploadMode"] in ("lfs", "regular"))
  594. ):
  595. raise ValueError("preupload_info is improperly formatted:")
  596. return preupload_info
  597. @validate_hf_hub_args
  598. def _fetch_upload_modes(
  599. additions: Iterable[CommitOperationAdd],
  600. repo_type: str,
  601. repo_id: str,
  602. headers: dict[str, str],
  603. revision: str,
  604. endpoint: str | None = None,
  605. create_pr: bool = False,
  606. gitignore_content: str | None = None,
  607. ) -> None:
  608. """
  609. Requests the Hub "preupload" endpoint to determine whether each input file should be uploaded as a regular git blob,
  610. as a git LFS blob, or as a XET file. Input `additions` are mutated in-place with the upload mode.
  611. Args:
  612. additions (`Iterable` of :class:`CommitOperationAdd`):
  613. Iterable of :class:`CommitOperationAdd` describing the files to
  614. upload to the Hub.
  615. repo_type (`str`):
  616. Type of the repo to upload to: `"model"`, `"dataset"` or `"space"`.
  617. repo_id (`str`):
  618. A namespace (user or an organization) and a repo name separated
  619. by a `/`.
  620. headers (`dict[str, str]`):
  621. Headers to use for the request, including authorization headers and user agent.
  622. revision (`str`):
  623. The git revision to upload the files to. Can be any valid git revision.
  624. gitignore_content (`str`, *optional*):
  625. The content of the `.gitignore` file to know which files should be ignored. The order of priority
  626. is to first check if `gitignore_content` is passed, then check if the `.gitignore` file is present
  627. in the list of files to commit and finally default to the `.gitignore` file already hosted on the Hub
  628. (if any).
  629. Raises:
  630. [`~utils.HfHubHTTPError`]
  631. If the Hub API returned an error.
  632. [`ValueError`](https://docs.python.org/3/library/exceptions.html#ValueError)
  633. If the Hub API response is improperly formatted.
  634. """
  635. endpoint = endpoint if endpoint is not None else constants.ENDPOINT
  636. # Fetch upload mode (LFS or regular) chunk by chunk.
  637. upload_modes: dict[str, UploadMode] = {}
  638. should_ignore_info: dict[str, bool] = {}
  639. oid_info: dict[str, str | None] = {}
  640. for chunk in chunk_iterable(additions, 256):
  641. payload: dict = {
  642. "files": [
  643. {
  644. "path": op.path_in_repo,
  645. "sample": base64.b64encode(op.upload_info.sample).decode("ascii"),
  646. "size": op.upload_info.size,
  647. }
  648. for op in chunk
  649. ]
  650. }
  651. if gitignore_content is not None:
  652. payload["gitIgnore"] = gitignore_content
  653. resp = http_backoff(
  654. "POST",
  655. f"{endpoint}/api/{repo_type}s/{repo_id}/preupload/{revision}",
  656. json=payload,
  657. headers=headers,
  658. params={"create_pr": "1"} if create_pr else None,
  659. )
  660. hf_raise_for_status(resp)
  661. preupload_info = _validate_preupload_info(resp.json())
  662. upload_modes.update(**{file["path"]: file["uploadMode"] for file in preupload_info["files"]})
  663. should_ignore_info.update(**{file["path"]: file["shouldIgnore"] for file in preupload_info["files"]})
  664. oid_info.update(**{file["path"]: file.get("oid") for file in preupload_info["files"]})
  665. # Set upload mode for each addition operation
  666. for addition in additions:
  667. addition._upload_mode = upload_modes[addition.path_in_repo]
  668. addition._should_ignore = should_ignore_info[addition.path_in_repo]
  669. addition._remote_oid = oid_info[addition.path_in_repo]
  670. # Empty files cannot be uploaded as LFS (S3 would fail with a 501 Not Implemented)
  671. # => empty files are uploaded as "regular" to still allow users to commit them.
  672. for addition in additions:
  673. if addition.upload_info.size == 0:
  674. addition._upload_mode = "regular"
  675. @validate_hf_hub_args
  676. def _fetch_files_to_copy(
  677. copies: Iterable[CommitOperationCopy],
  678. repo_type: str,
  679. repo_id: str,
  680. headers: dict[str, str],
  681. revision: str,
  682. endpoint: str | None = None,
  683. ) -> dict[tuple[str, str | None], Union["RepoFile", bytes]]:
  684. """
  685. Fetch information about the files to copy.
  686. For LFS files, we only need their metadata (file size and sha256) while for regular files
  687. we need to download the raw content from the Hub.
  688. Args:
  689. copies (`Iterable` of :class:`CommitOperationCopy`):
  690. Iterable of :class:`CommitOperationCopy` describing the files to
  691. copy on the Hub.
  692. repo_type (`str`):
  693. Type of the repo to upload to: `"model"`, `"dataset"` or `"space"`.
  694. repo_id (`str`):
  695. A namespace (user or an organization) and a repo name separated
  696. by a `/`.
  697. headers (`dict[str, str]`):
  698. Headers to use for the request, including authorization headers and user agent.
  699. revision (`str`):
  700. The git revision to upload the files to. Can be any valid git revision.
  701. Returns: `dict[tuple[str, Optional[str]], Union[RepoFile, bytes]]]`
  702. Key is the file path and revision of the file to copy.
  703. Value is the raw content as bytes (for regular files) or the file information as a RepoFile (for LFS files).
  704. Raises:
  705. [`~utils.HfHubHTTPError`]
  706. If the Hub API returned an error.
  707. [`ValueError`](https://docs.python.org/3/library/exceptions.html#ValueError)
  708. If the Hub API response is improperly formatted.
  709. """
  710. from .hf_api import HfApi, RepoFolder
  711. hf_api = HfApi(endpoint=endpoint, headers=headers)
  712. files_to_copy: dict[tuple[str, str | None], Union["RepoFile", bytes]] = {}
  713. # Store (path, revision) -> oid mapping
  714. oid_info: dict[tuple[str, str | None], str | None] = {}
  715. # 1. Fetch OIDs for destination paths in batches.
  716. dest_paths = [op.path_in_repo for op in copies]
  717. for offset in range(0, len(dest_paths), FETCH_LFS_BATCH_SIZE):
  718. dest_repo_files = hf_api.get_paths_info(
  719. repo_id=repo_id,
  720. paths=dest_paths[offset : offset + FETCH_LFS_BATCH_SIZE],
  721. revision=revision,
  722. repo_type=repo_type,
  723. )
  724. for file in dest_repo_files:
  725. if not isinstance(file, RepoFolder):
  726. oid_info[(file.path, revision)] = file.blob_id
  727. # 2. Group by source revision and fetch source file info in batches.
  728. for src_revision, operations in groupby(copies, key=lambda op: op.src_revision):
  729. operations = list(operations) # type: ignore
  730. src_paths = [op.src_path_in_repo for op in operations]
  731. for offset in range(0, len(src_paths), FETCH_LFS_BATCH_SIZE):
  732. src_repo_files = hf_api.get_paths_info(
  733. repo_id=repo_id,
  734. paths=src_paths[offset : offset + FETCH_LFS_BATCH_SIZE],
  735. revision=src_revision or revision,
  736. repo_type=repo_type,
  737. )
  738. for src_repo_file in src_repo_files:
  739. if isinstance(src_repo_file, RepoFolder):
  740. raise NotImplementedError("Copying a folder is not implemented.")
  741. oid_info[(src_repo_file.path, src_revision)] = src_repo_file.blob_id
  742. # If it's an LFS file, store the RepoFile object. Otherwise, download raw bytes.
  743. if src_repo_file.lfs:
  744. files_to_copy[(src_repo_file.path, src_revision)] = src_repo_file
  745. else:
  746. # TODO: (optimization) download regular files to copy concurrently
  747. url = hf_hub_url(
  748. endpoint=endpoint,
  749. repo_type=repo_type,
  750. repo_id=repo_id,
  751. revision=src_revision or revision,
  752. filename=src_repo_file.path,
  753. )
  754. response = get_session().get(url, headers=headers)
  755. hf_raise_for_status(response)
  756. files_to_copy[(src_repo_file.path, src_revision)] = response.content
  757. # 3. Ensure all operations found a corresponding file in the Hub
  758. # and track src/dest OIDs for each operation.
  759. for operation in operations:
  760. if (operation.src_path_in_repo, src_revision) not in files_to_copy:
  761. raise EntryNotFoundError(
  762. f"Cannot copy {operation.src_path_in_repo} at revision "
  763. f"{src_revision or revision}: file is missing on repo."
  764. )
  765. operation._src_oid = oid_info.get((operation.src_path_in_repo, operation.src_revision))
  766. operation._dest_oid = oid_info.get((operation.path_in_repo, revision))
  767. return files_to_copy
  768. def _prepare_commit_payload(
  769. operations: Iterable[CommitOperation],
  770. files_to_copy: dict[tuple[str, str | None], Union["RepoFile", bytes]],
  771. commit_message: str,
  772. commit_description: str | None = None,
  773. parent_commit: str | None = None,
  774. ) -> Iterable[dict[str, Any]]:
  775. """
  776. Builds the payload to POST to the `/commit` API of the Hub.
  777. Payload is returned as an iterator so that it can be streamed as a ndjson in the
  778. POST request.
  779. For more information, see:
  780. - https://github.com/huggingface/huggingface_hub/issues/1085#issuecomment-1265208073
  781. - http://ndjson.org/
  782. """
  783. commit_description = commit_description if commit_description is not None else ""
  784. # 1. Send a header item with the commit metadata
  785. header_value = {"summary": commit_message, "description": commit_description}
  786. if parent_commit is not None:
  787. header_value["parentCommit"] = parent_commit
  788. yield {"key": "header", "value": header_value}
  789. nb_ignored_files = 0
  790. # 2. Send operations, one per line
  791. for operation in operations:
  792. # Skip ignored files
  793. if isinstance(operation, CommitOperationAdd) and operation._should_ignore:
  794. logger.debug(f"Skipping file '{operation.path_in_repo}' in commit (ignored by gitignore file).")
  795. nb_ignored_files += 1
  796. continue
  797. # 2.a. Case adding a regular file
  798. if isinstance(operation, CommitOperationAdd) and operation._upload_mode == "regular":
  799. yield {
  800. "key": "file",
  801. "value": {
  802. "content": operation.b64content().decode(),
  803. "path": operation.path_in_repo,
  804. "encoding": "base64",
  805. },
  806. }
  807. # 2.b. Case adding an LFS file
  808. elif isinstance(operation, CommitOperationAdd) and operation._upload_mode == "lfs":
  809. yield {
  810. "key": "lfsFile",
  811. "value": {
  812. "path": operation.path_in_repo,
  813. "algo": "sha256",
  814. "oid": operation.upload_info.sha256.hex(),
  815. "size": operation.upload_info.size,
  816. },
  817. }
  818. # 2.c. Case deleting a file or folder
  819. elif isinstance(operation, CommitOperationDelete):
  820. yield {
  821. "key": "deletedFolder" if operation.is_folder else "deletedFile",
  822. "value": {"path": operation.path_in_repo},
  823. }
  824. # 2.d. Case copying a file or folder
  825. elif isinstance(operation, CommitOperationCopy):
  826. file_to_copy = files_to_copy[(operation.src_path_in_repo, operation.src_revision)]
  827. if isinstance(file_to_copy, bytes):
  828. yield {
  829. "key": "file",
  830. "value": {
  831. "content": base64.b64encode(file_to_copy).decode(),
  832. "path": operation.path_in_repo,
  833. "encoding": "base64",
  834. },
  835. }
  836. elif file_to_copy.lfs:
  837. yield {
  838. "key": "lfsFile",
  839. "value": {
  840. "path": operation.path_in_repo,
  841. "algo": "sha256",
  842. "oid": file_to_copy.lfs.sha256,
  843. },
  844. }
  845. else:
  846. raise ValueError(
  847. "Malformed files_to_copy (should be raw file content as bytes or RepoFile objects with LFS info."
  848. )
  849. # 2.e. Never expected to happen
  850. else:
  851. raise ValueError(
  852. f"Unknown operation to commit. Operation: {operation}. Upload mode:"
  853. f" {getattr(operation, '_upload_mode', None)}"
  854. )
  855. if nb_ignored_files > 0:
  856. logger.info(f"Skipped {nb_ignored_files} file(s) in commit (ignored by gitignore file).")