hf_file_system.py 60 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442
  1. import os
  2. import tempfile
  3. import threading
  4. from collections import deque
  5. from collections.abc import Iterable, Iterator
  6. from contextlib import ExitStack
  7. from copy import deepcopy
  8. from dataclasses import dataclass, field
  9. from datetime import datetime
  10. from itertools import chain
  11. from pathlib import Path, PurePosixPath
  12. from typing import Any, NoReturn, Union
  13. from urllib.parse import quote, unquote
  14. import fsspec
  15. import httpx
  16. from fsspec.callbacks import _DEFAULT_CALLBACK, NoOpCallback, TqdmCallback
  17. from fsspec.utils import isfilelike
  18. from . import constants
  19. from ._commit_api import CommitOperationCopy, CommitOperationDelete
  20. from .errors import (
  21. BucketNotFoundError,
  22. EntryNotFoundError,
  23. HfHubHTTPError,
  24. RepositoryNotFoundError,
  25. RevisionNotFoundError,
  26. )
  27. from .file_download import hf_hub_url, http_get
  28. from .hf_api import SPECIAL_REFS_REVISION_REGEX, BucketFile, BucketFolder, HfApi, LastCommitInfo, RepoFile, RepoFolder
  29. from .utils import HFValidationError, hf_raise_for_status, http_backoff, http_stream_backoff
  30. from .utils.insecure_hashlib import md5
  31. @dataclass
  32. class HfFileSystemResolvedPath:
  33. """Top level Data structure containing information about a resolved Hugging Face file system path."""
  34. root: str
  35. path: str
  36. def unresolve(self) -> str:
  37. return f"{self.root}/{self.path}".rstrip("/")
  38. @dataclass
  39. class HfFileSystemResolvedRepositoryPath(HfFileSystemResolvedPath):
  40. """Data structure containing information about a resolved path in a repository."""
  41. repo_type: str
  42. repo_id: str
  43. revision: str
  44. path_in_repo: str
  45. root: str = field(init=False)
  46. path: str = field(init=False)
  47. # The part placed after '@' in the initial path. It can be a quoted or unquoted refs revision.
  48. # Used to reconstruct the unresolved path to return to the user.
  49. _raw_revision: str | None = field(default=None, repr=False)
  50. def __post_init__(self):
  51. repo_path = constants.REPO_TYPES_URL_PREFIXES.get(self.repo_type, "") + self.repo_id
  52. if self._raw_revision:
  53. self.root = f"{repo_path}@{self._raw_revision}"
  54. elif self.revision != constants.DEFAULT_REVISION:
  55. self.root = f"{repo_path}@{safe_revision(self.revision)}"
  56. else:
  57. self.root = repo_path
  58. self.path = self.path_in_repo
  59. @dataclass
  60. class HfFileSystemResolvedBucketPath(HfFileSystemResolvedPath):
  61. """Data structure containing information about a resolved path in a bucket."""
  62. bucket_id: str
  63. root: str = field(init=False)
  64. def __post_init__(self):
  65. self.root = "buckets/" + self.bucket_id
  66. # We need to improve fsspec.spec._Cached which is AbstractFileSystem's metaclass
  67. _cached_base: Any = type(fsspec.AbstractFileSystem)
  68. class _Cached(_cached_base):
  69. """
  70. Metaclass for caching HfFileSystem instances according to the args.
  71. This creates an additional reference to the filesystem, which prevents the
  72. filesystem from being garbage collected when all *user* references go away.
  73. A call to the :meth:`AbstractFileSystem.clear_instance_cache` must *also*
  74. be made for a filesystem instance to be garbage collected.
  75. This is a slightly modified version of `fsspec.spec._Cached` to improve it.
  76. In particular in `_tokenize` the pid isn't taken into account for the
  77. `fs_token` used to identify cached instances. The `fs_token` logic is also
  78. robust to defaults values and the order of the args. Finally new instances
  79. reuse the states from sister instances in the main thread.
  80. """
  81. def __init__(cls, *args, **kwargs):
  82. # Hack: override https://github.com/fsspec/filesystem_spec/blob/dcb167e8f50e6273d4cfdfc4cab8fc5aa4c958bf/fsspec/spec.py#L53
  83. super().__init__(*args, **kwargs)
  84. # Note: we intentionally create a reference here, to avoid garbage
  85. # collecting instances when all other references are gone. To really
  86. # delete a FileSystem, the cache must be cleared.
  87. cls._cache = {}
  88. def __call__(cls, *args, **kwargs):
  89. # Hack: override https://github.com/fsspec/filesystem_spec/blob/dcb167e8f50e6273d4cfdfc4cab8fc5aa4c958bf/fsspec/spec.py#L65
  90. skip = kwargs.pop("skip_instance_cache", False)
  91. fs_token = cls._tokenize(cls, threading.get_ident(), *args, **kwargs)
  92. fs_token_main_thread = cls._tokenize(cls, threading.main_thread().ident, *args, **kwargs)
  93. if not skip and cls.cachable and fs_token in cls._cache:
  94. # reuse cached instance
  95. cls._latest = fs_token
  96. return cls._cache[fs_token]
  97. else:
  98. # create new instance
  99. obj = type.__call__(cls, *args, **kwargs)
  100. if not skip and cls.cachable and fs_token_main_thread in cls._cache:
  101. # reuse the cache from the main thread instance in the new instance
  102. instance_state = cls._cache[fs_token_main_thread]._get_instance_state()
  103. for attr, state_value in instance_state.items():
  104. setattr(obj, attr, state_value)
  105. obj._fs_token_ = fs_token
  106. obj.storage_args = args
  107. obj.storage_options = kwargs
  108. if cls.cachable and not skip:
  109. cls._latest = fs_token
  110. cls._cache[fs_token] = obj
  111. return obj
  112. class HfFileSystem(fsspec.AbstractFileSystem, metaclass=_Cached):
  113. """
  114. Access a remote Hugging Face Hub repository or bucket as if it were a local file system.
  115. > [!WARNING]
  116. > [`HfFileSystem`] provides fsspec compatibility, which is useful for libraries that require it (e.g., reading
  117. > Hugging Face datasets directly with `pandas`). However, it introduces additional overhead due to this compatibility
  118. > layer. For better performance and reliability, it's recommended to use `HfApi` methods when possible.
  119. The file system supports paths for the `hf://` protocol, which follows those URL schemes:
  120. * Models, Datasets and Spaces repositories:
  121. ```
  122. hf://<repo-id>[@<revision>]/<path/in/repo>
  123. hf://datasets/<repo-id>[@<revision>]/<path/in/repo>
  124. hf://spaces/<repo-id>[@<revision>]/<path/in/repo>
  125. ```
  126. * Buckets (generic storage):
  127. ```
  128. hf://buckets/<bucket-id>/<path/in/bucket>
  129. ```
  130. Note: when using the [`HfFileSystem`] directly, passing the `hf://` protocol prefix is optional in paths.
  131. Args:
  132. endpoint (`str`, *optional*):
  133. Endpoint of the Hub. Defaults to <https://huggingface.co>.
  134. token (`bool` or `str`, *optional*):
  135. A valid user access token (string). Defaults to the locally saved
  136. token, which is the recommended method for authentication (see
  137. https://huggingface.co/docs/huggingface_hub/quick-start#authentication).
  138. To disable authentication, pass `False`.
  139. block_size (`int`, *optional*):
  140. Block size for reading and writing files.
  141. expand_info (`bool`, *optional*):
  142. Whether to expand the information of the files.
  143. **storage_options (`dict`, *optional*):
  144. Additional options for the filesystem. See [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.__init__).
  145. Usage:
  146. ```python
  147. >>> from huggingface_hub import hffs
  148. >>> # List files
  149. >>> hffs.glob("my-username/my-model/*.bin")
  150. ['my-username/my-model/pytorch_model.bin']
  151. >>> hffs.ls("datasets/my-username/my-dataset", detail=False)
  152. ['datasets/my-username/my-dataset/.gitattributes', 'datasets/my-username/my-dataset/README.md', 'datasets/my-username/my-dataset/data.json']
  153. >>> # Read/write files
  154. >>> with hffs.open("my-username/my-model/pytorch_model.bin") as f:
  155. ... data = f.read()
  156. >>> with hffs.open("my-username/my-model/pytorch_model.bin", "wb") as f:
  157. ... f.write(data)
  158. ```
  159. Specify a token for authentication:
  160. ```python
  161. >>> from huggingface_hub import HfFileSystem
  162. >>> hffs = HfFileSystem(token=token)
  163. ```
  164. """
  165. root_marker = ""
  166. protocol = "hf"
  167. def __init__(
  168. self,
  169. *args,
  170. endpoint: str | None = None,
  171. token: bool | str | None = None,
  172. block_size: int | None = None,
  173. expand_info: bool | None = None,
  174. **storage_options,
  175. ):
  176. super().__init__(*args, **storage_options)
  177. self.endpoint = endpoint or constants.ENDPOINT
  178. self.token = token
  179. self._api = HfApi(endpoint=endpoint, token=token)
  180. self.block_size = block_size
  181. self.expand_info = expand_info
  182. # Maps (repo_type, repo_id, revision) to a 2-tuple with:
  183. # * the 1st element indicating whether the repository and the revision exist
  184. # * the 2nd element being the exception raised if the repository or revision doesn't exist
  185. self._repo_and_revision_exists_cache: dict[tuple[str, str, str | None], tuple[bool, Exception | None]] = {}
  186. # Same for buckets
  187. self._bucket_exists_cache: dict[str, tuple[bool, Exception | None]] = {}
  188. # Note: special case for buckets: revision is always None
  189. # Maps parent directory path to path infos
  190. self.dircache: dict[str, list[dict[str, Any]]] = {}
  191. @classmethod
  192. def _tokenize(cls, threading_ident: int, *args, **kwargs) -> str:
  193. """Deterministic token for caching"""
  194. # make fs_token robust to default values and to kwargs order
  195. kwargs["endpoint"] = kwargs.get("endpoint") or constants.ENDPOINT
  196. kwargs["token"] = kwargs.get("token")
  197. kwargs = {key: kwargs[key] for key in sorted(kwargs)}
  198. # contrary to fsspec, we don't include pid here
  199. tokenize_args = (cls, threading_ident, args, kwargs)
  200. h = md5(str(tokenize_args).encode())
  201. return h.hexdigest()
  202. def _repo_and_revision_exist(
  203. self, repo_type: str, repo_id: str, revision: str | None
  204. ) -> tuple[bool, Exception | None]:
  205. if (repo_type, repo_id, revision) not in self._repo_and_revision_exists_cache:
  206. try:
  207. self._api.repo_info(
  208. repo_id, revision=revision, repo_type=repo_type, timeout=constants.HF_HUB_ETAG_TIMEOUT
  209. )
  210. except (RepositoryNotFoundError, HFValidationError) as e:
  211. self._repo_and_revision_exists_cache[(repo_type, repo_id, revision)] = False, e
  212. self._repo_and_revision_exists_cache[(repo_type, repo_id, None)] = False, e
  213. except RevisionNotFoundError as e:
  214. self._repo_and_revision_exists_cache[(repo_type, repo_id, revision)] = False, e
  215. self._repo_and_revision_exists_cache[(repo_type, repo_id, None)] = True, None
  216. else:
  217. self._repo_and_revision_exists_cache[(repo_type, repo_id, revision)] = True, None
  218. self._repo_and_revision_exists_cache[(repo_type, repo_id, None)] = True, None
  219. return self._repo_and_revision_exists_cache[(repo_type, repo_id, revision)]
  220. def _bucket_exists(self, bucket_id: str) -> tuple[bool, Exception | None]:
  221. if bucket_id not in self._bucket_exists_cache:
  222. try:
  223. self._api.bucket_info(bucket_id)
  224. except BucketNotFoundError as e:
  225. self._bucket_exists_cache[bucket_id] = False, e
  226. else:
  227. self._bucket_exists_cache[bucket_id] = True, None
  228. return self._bucket_exists_cache[bucket_id]
    def resolve_path(
        self, path: str, revision: str | None = None
    ) -> HfFileSystemResolvedRepositoryPath | HfFileSystemResolvedBucketPath:
        """
        Resolve a Hugging Face file system path into its components.

        Paths whose first segment is `buckets` resolve to a bucket path. Any other path resolves to a repository
        path; the repository type comes from a known type prefix (e.g. `datasets/`) and defaults to model.

        Args:
            path (`str`):
                Path to resolve. The protocol prefix is stripped first.
            revision (`str`, *optional*):
                The revision of the repo to resolve. Defaults to the revision specified in the path.
                Not used for bucket paths.

        Returns:
            [`HfFileSystemResolvedRepositoryPath`] or [`HfFileSystemResolvedBucketPath`]: For repositories, the
            resolved path contains `repo_type`, `repo_id`, `revision` and `path_in_repo`. For buckets, it contains
            `bucket_id` and `path`.

        Raises:
            `ValueError`:
                If path contains conflicting revision information.
            `NotImplementedError`:
                If trying to list buckets or repositories.
            Whatever `_raise_file_not_found` (defined elsewhere in this file; presumably `FileNotFoundError`)
            raises when the bucket, repository or revision does not exist.
        """

        def _align_revision_in_path_with_revision(revision_in_path: str | None, revision: str | None) -> str | None:
            # Combine the revision parsed from the path with the explicit `revision` argument:
            # they must agree when both are set; otherwise whichever one is set is used.
            if revision is not None:
                if revision_in_path is not None and revision_in_path != revision:
                    raise ValueError(
                        f'Revision specified in path ("{revision_in_path}") and in `revision` argument ("{revision}")'
                        " are not the same."
                    )
            else:
                revision = revision_in_path
            return revision

        path = self._strip_protocol(path)
        if not path:
            # can't list repositories at root
            raise NotImplementedError("Access to buckets and repositories lists is not implemented.")
        elif path.split("/")[0] == "buckets":
            # Bucket path: the bucket id is the two segments following "buckets/", the rest is the in-bucket path
            bucket_id = "/".join(path.split("/")[1:3])
            path = "/".join(path.split("/")[3:])
            bucket_exists, err = self._bucket_exists(bucket_id)
            if not bucket_exists:
                # NOTE(review): `path` has already been reduced to the in-bucket part here, so the error
                # reports that part only (an empty string for a bucket root).
                _raise_file_not_found(path, err)
            return HfFileSystemResolvedBucketPath(bucket_id=bucket_id, path=path)
        elif path.split("/")[0] + "/" in constants.REPO_TYPES_URL_PREFIXES.values():
            if "/" not in path:
                # can't list repositories at the repository type level
                raise NotImplementedError("Access to repositories lists is not implemented.")
            repo_type, path = path.split("/", 1)
            repo_type = constants.REPO_TYPES_MAPPING[repo_type]
        else:
            repo_type = constants.REPO_TYPE_MODEL
        if path.count("/") > 0:
            if "@" in "/".join(path.split("/")[:2]):
                # Explicit revision in the first two segments: "<repo_id>@<revision>[/<path/in/repo>]"
                repo_id, revision_in_path = path.split("@", 1)
                if "/" in revision_in_path:
                    match = SPECIAL_REFS_REVISION_REGEX.search(revision_in_path)
                    if match is not None and revision in (None, match.group()):
                        # Handle `refs/convert/parquet` and PR revisions separately
                        path_in_repo = SPECIAL_REFS_REVISION_REGEX.sub("", revision_in_path).lstrip("/")
                        revision_in_path = match.group()
                    else:
                        revision_in_path, path_in_repo = revision_in_path.split("/", 1)
                else:
                    path_in_repo = ""
                revision = _align_revision_in_path_with_revision(unquote(revision_in_path), revision)
                repo_and_revision_exist, err = self._repo_and_revision_exist(repo_type, repo_id, revision)
                if not repo_and_revision_exist:
                    _raise_file_not_found(path, err)
            else:
                revision_in_path = None
                # No revision in the path: first try "<first>/<second>" as the repo id; if that repo is not found
                # (or the id is invalid), retry with only the first segment as the repo id.
                repo_id_with_namespace = "/".join(path.split("/")[:2])
                path_in_repo_with_namespace = "/".join(path.split("/")[2:])
                repo_id_without_namespace = path.split("/")[0]
                path_in_repo_without_namespace = "/".join(path.split("/")[1:])
                repo_id = repo_id_with_namespace
                path_in_repo = path_in_repo_with_namespace
                repo_and_revision_exist, err = self._repo_and_revision_exist(repo_type, repo_id, revision)
                if not repo_and_revision_exist:
                    if isinstance(err, (RepositoryNotFoundError, HFValidationError)):
                        repo_id = repo_id_without_namespace
                        path_in_repo = path_in_repo_without_namespace
                        repo_and_revision_exist, _ = self._repo_and_revision_exist(repo_type, repo_id, revision)
                        if not repo_and_revision_exist:
                            # Report the error from the first (namespaced) attempt
                            _raise_file_not_found(path, err)
                    else:
                        _raise_file_not_found(path, err)
        else:
            # Single segment: "<repo_id>[@<revision>]" with nothing inside the repo
            repo_id = path
            path_in_repo = ""
            if "@" in path:
                repo_id, revision_in_path = path.split("@", 1)
                revision = _align_revision_in_path_with_revision(unquote(revision_in_path), revision)
            else:
                revision_in_path = None
            repo_and_revision_exist, _ = self._repo_and_revision_exist(repo_type, repo_id, revision)
            if not repo_and_revision_exist:
                # A lone segment that is not an existing repo would mean listing repositories, which is unsupported
                raise NotImplementedError("Access to repositories lists is not implemented.")
        revision = revision if revision is not None else constants.DEFAULT_REVISION
        # Keep the revision exactly as written in the path so `unresolve()` reproduces it for the user
        return HfFileSystemResolvedRepositoryPath(
            repo_type, repo_id, revision, path_in_repo, _raw_revision=revision_in_path
        )
  326. def invalidate_cache(self, path: str | None = None) -> None:
  327. """
  328. Clear the cache for a given path.
  329. For more details, refer to [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.invalidate_cache).
  330. Args:
  331. path (`str`, *optional*):
  332. Path to clear from cache. If not provided, clear the entire cache.
  333. """
  334. if not path:
  335. self.dircache.clear()
  336. self._repo_and_revision_exists_cache.clear()
  337. else:
  338. resolved_path = self.resolve_path(path)
  339. path = resolved_path.unresolve()
  340. while path:
  341. self.dircache.pop(path, None)
  342. path = self._parent(path)
  343. # Only clear repo cache if path is to repo root
  344. if not resolved_path.path:
  345. if isinstance(resolved_path, HfFileSystemResolvedRepositoryPath):
  346. self._repo_and_revision_exists_cache.pop(
  347. (resolved_path.repo_type, resolved_path.repo_id, None), None
  348. )
  349. self._repo_and_revision_exists_cache.pop(
  350. (resolved_path.repo_type, resolved_path.repo_id, resolved_path.revision), None
  351. )
  352. else:
  353. self._bucket_exists_cache.pop(resolved_path.bucket_id, None)
  354. def _open( # type: ignore
  355. self,
  356. path: str,
  357. mode: str = "rb",
  358. block_size: int | None = None,
  359. revision: str | None = None,
  360. **kwargs,
  361. ) -> Union["HfFileSystemFile", "HfFileSystemStreamFile"]:
  362. block_size = block_size if block_size is not None else self.block_size
  363. if block_size is not None:
  364. kwargs["block_size"] = block_size
  365. if "a" in mode:
  366. raise NotImplementedError("Appending to remote files is not yet supported.")
  367. if block_size == 0:
  368. return HfFileSystemStreamFile(self, path, mode=mode, revision=revision, **kwargs)
  369. else:
  370. return HfFileSystemFile(self, path, mode=mode, revision=revision, **kwargs)
  371. def _rm(self, path: str, revision: str | None = None, **kwargs) -> None:
  372. resolved_path = self.resolve_path(path, revision=revision)
  373. if isinstance(resolved_path, HfFileSystemResolvedBucketPath):
  374. self._api.batch_bucket_files(resolved_path.bucket_id, delete=[resolved_path.path])
  375. else:
  376. self._api.delete_file(
  377. path_in_repo=resolved_path.path_in_repo,
  378. repo_id=resolved_path.repo_id,
  379. token=self.token,
  380. repo_type=resolved_path.repo_type,
  381. revision=resolved_path.revision,
  382. commit_message=kwargs.get("commit_message"),
  383. commit_description=kwargs.get("commit_description"),
  384. )
  385. self.invalidate_cache(path=resolved_path.unresolve())
  386. def rm(
  387. self,
  388. path: str,
  389. recursive: bool = False,
  390. maxdepth: int | None = None,
  391. revision: str | None = None,
  392. **kwargs,
  393. ) -> None:
  394. """
  395. Delete files from a repository.
  396. For more details, refer to [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.rm).
  397. > [!WARNING]
  398. > Note: When possible, use `HfApi.delete_file()` for better performance.
  399. Args:
  400. path (`str`):
  401. Path to delete.
  402. recursive (`bool`, *optional*):
  403. If True, delete directory and all its contents. Defaults to False.
  404. maxdepth (`int`, *optional*):
  405. Maximum number of subdirectories to visit when deleting recursively.
  406. revision (`str`, *optional*):
  407. The git revision to delete from.
  408. """
  409. resolved_path = self.resolve_path(path, revision=revision)
  410. paths = self.expand_path(path, recursive=recursive, maxdepth=maxdepth, revision=revision)
  411. if isinstance(resolved_path, HfFileSystemResolvedBucketPath):
  412. delete = [self.resolve_path(path).path for path in paths if not self.isdir(path)]
  413. self._api.batch_bucket_files(resolved_path.bucket_id, delete=delete)
  414. else:
  415. paths_in_repo = [self.resolve_path(path).path for path in paths if not self.isdir(path)]
  416. operations = [CommitOperationDelete(path_in_repo=path_in_repo) for path_in_repo in paths_in_repo]
  417. commit_message = f"Delete {path} "
  418. commit_message += "recursively " if recursive else ""
  419. commit_message += f"up to depth {maxdepth} " if maxdepth is not None else ""
  420. # TODO: use `commit_description` to list all the deleted paths?
  421. self._api.create_commit(
  422. repo_id=resolved_path.repo_id,
  423. repo_type=resolved_path.repo_type,
  424. token=self.token,
  425. operations=operations,
  426. revision=resolved_path.revision,
  427. commit_message=kwargs.get("commit_message", commit_message),
  428. commit_description=kwargs.get("commit_description"),
  429. )
  430. self.invalidate_cache(path=resolved_path.unresolve())
  431. def ls(
  432. self, path: str, detail: bool = True, refresh: bool = False, revision: str | None = None, **kwargs
  433. ) -> list[str | dict[str, Any]]:
  434. """
  435. List the contents of a directory.
  436. For more details, refer to [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.ls).
  437. > [!WARNING]
  438. > Note: When possible, use `HfApi.list_repo_tree()` for better performance.
  439. Args:
  440. path (`str`):
  441. Path to the directory.
  442. detail (`bool`, *optional*):
  443. If True, returns a list of dictionaries containing file information. If False,
  444. returns a list of file paths. Defaults to True.
  445. refresh (`bool`, *optional*):
  446. If True, bypass the cache and fetch the latest data. Defaults to False.
  447. revision (`str`, *optional*):
  448. The git revision to list from.
  449. Returns:
  450. `list[Union[str, dict[str, Any]]]`: List of file paths (if detail=False) or list of file information
  451. dictionaries (if detail=True).
  452. """
  453. resolved_path = self.resolve_path(path, revision=revision)
  454. path = resolved_path.unresolve()
  455. try:
  456. out = self._ls_tree(path, refresh=refresh, revision=revision, **kwargs)
  457. except EntryNotFoundError:
  458. # Path could be a file
  459. if not resolved_path.path:
  460. _raise_file_not_found(path, None)
  461. try:
  462. out = self._ls_tree(self._parent(path), refresh=refresh, revision=revision, **kwargs)
  463. except EntryNotFoundError:
  464. out = []
  465. out = [o for o in out if o["name"] == path]
  466. if len(out) == 0:
  467. _raise_file_not_found(path, None)
  468. return out if detail else [o["name"] for o in out]
  469. def _ls_tree(
  470. self,
  471. path: str,
  472. recursive: bool = False,
  473. refresh: bool = False,
  474. revision: str | None = None,
  475. expand_info: bool | None = None,
  476. maxdepth: int | None = None,
  477. ):
  478. expand_info = (
  479. expand_info if expand_info is not None else (self.expand_info if self.expand_info is not None else False)
  480. )
  481. resolved_path = self.resolve_path(path, revision=revision)
  482. path = resolved_path.unresolve()
  483. root_path = resolved_path.root
  484. maxdepth = maxdepth if recursive else 1
  485. out = []
  486. if path in self.dircache and not refresh:
  487. cached_path_infos = self.dircache[path]
  488. out.extend(cached_path_infos)
  489. dirs_not_in_dircache = []
  490. if recursive:
  491. # Use BFS to traverse the cache and build the "recursive "output
  492. # (The Hub uses a so-called "tree first" strategy for the tree endpoint but we sort the output to follow the spec so the result is (eventually) the same)
  493. depth = 2
  494. dirs_to_visit = deque(
  495. [(depth, path_info) for path_info in cached_path_infos if path_info["type"] == "directory"]
  496. )
  497. while dirs_to_visit:
  498. depth, dir_info = dirs_to_visit.popleft()
  499. if maxdepth is None or depth <= maxdepth:
  500. if dir_info["name"] not in self.dircache:
  501. dirs_not_in_dircache.append(dir_info["name"])
  502. else:
  503. cached_path_infos = self.dircache[dir_info["name"]]
  504. out.extend(cached_path_infos)
  505. dirs_to_visit.extend(
  506. [
  507. (depth + 1, path_info)
  508. for path_info in cached_path_infos
  509. if path_info["type"] == "directory"
  510. ]
  511. )
  512. dirs_not_expanded = []
  513. if expand_info and isinstance(resolved_path, HfFileSystemResolvedRepositoryPath):
  514. # Check if there are directories in repos with non-expanded entries
  515. dirs_not_expanded = [self._parent(o["name"]) for o in out if o["last_commit"] is None]
  516. if (recursive and dirs_not_in_dircache) or (expand_info and dirs_not_expanded):
  517. # If the dircache is incomplete, find the common path of the missing and non-expanded entries
  518. # and extend the output with the result of `_ls_tree(common_path, recursive=True)`
  519. common_prefix = os.path.commonprefix(dirs_not_in_dircache + dirs_not_expanded)
  520. # Get the parent directory if the common prefix itself is not a directory
  521. common_path = (
  522. common_prefix.rstrip("/")
  523. if common_prefix.endswith("/")
  524. or common_prefix == root_path
  525. or common_prefix in chain(dirs_not_in_dircache, dirs_not_expanded)
  526. else self._parent(common_prefix)
  527. )
  528. if maxdepth is not None:
  529. common_path_depth = common_path[len(path) :].count("/")
  530. maxdepth -= common_path_depth
  531. out = [o for o in out if not o["name"].startswith(common_path + "/")]
  532. for cached_path in list(self.dircache):
  533. if cached_path.startswith(common_path + "/"):
  534. self.dircache.pop(cached_path, None)
  535. self.dircache.pop(common_path, None)
  536. out.extend(
  537. self._ls_tree(
  538. common_path,
  539. recursive=recursive,
  540. refresh=True,
  541. revision=revision,
  542. expand_info=expand_info,
  543. maxdepth=maxdepth,
  544. )
  545. )
  546. else:
  547. tree: Iterable[RepoFile | RepoFolder | BucketFile | BucketFolder]
  548. if isinstance(resolved_path, HfFileSystemResolvedBucketPath):
  549. tree = self._list_bucket_tree_with_folders(
  550. resolved_path.bucket_id,
  551. prefix=resolved_path.path,
  552. recursive=recursive,
  553. )
  554. else:
  555. tree = self._api.list_repo_tree(
  556. resolved_path.repo_id,
  557. resolved_path.path,
  558. recursive=recursive,
  559. expand=expand_info,
  560. revision=resolved_path.revision,
  561. repo_type=resolved_path.repo_type,
  562. )
  563. for path_info in tree:
  564. cache_path = root_path + "/" + path_info.path
  565. if isinstance(path_info, RepoFile):
  566. cache_path_info = {
  567. "name": cache_path,
  568. "size": path_info.size,
  569. "type": "file",
  570. "blob_id": path_info.blob_id,
  571. "lfs": path_info.lfs,
  572. "xet_hash": path_info.xet_hash,
  573. "last_commit": path_info.last_commit,
  574. "security": path_info.security,
  575. }
  576. elif isinstance(path_info, BucketFile):
  577. cache_path_info = {
  578. "name": cache_path,
  579. "size": path_info.size,
  580. "type": "file",
  581. "xet_hash": path_info.xet_hash,
  582. "mtime": path_info.mtime,
  583. "uploaded_at": path_info.uploaded_at,
  584. }
  585. elif isinstance(path_info, RepoFolder):
  586. cache_path_info = {
  587. "name": cache_path,
  588. "size": 0,
  589. "type": "directory",
  590. "tree_id": path_info.tree_id,
  591. "last_commit": path_info.last_commit,
  592. }
  593. else:
  594. cache_path_info = {
  595. "name": cache_path,
  596. "size": 0,
  597. "type": "directory",
  598. "uploaded_at": path_info.uploaded_at,
  599. }
  600. parent_path = self._parent(cache_path_info["name"])
  601. self.dircache.setdefault(parent_path, []).append(cache_path_info)
  602. depth = cache_path[len(path) :].count("/")
  603. if maxdepth is None or depth <= maxdepth:
  604. out.append(cache_path_info)
  605. return out
  606. def _list_bucket_tree_with_folders(
  607. self, bucket_id: str, prefix: str, recursive: bool
  608. ) -> Iterable[BucketFile | BucketFolder]:
  609. """Same as `HfApi.list_bucket_tree` but always includes folders"""
  610. bucket_files = self._api.list_bucket_tree(bucket_id, prefix, recursive=recursive)
  611. bucket_folders: dict[str, BucketFolder] = {}
  612. min_depth = 1 + prefix.count("/") if prefix else 0
  613. out: list[BucketFile | BucketFolder] = []
  614. for bucket_entry in bucket_files:
  615. out.append(bucket_entry)
  616. # If recursive=False, both files and folders are returned by the server => nothing to do
  617. if not recursive:
  618. continue
  619. # Otherwise, let's rebuild BucketFolders manually
  620. for parent_bucket_folder_str in list(PurePosixPath(bucket_entry.path).parents)[: -min_depth - 1]:
  621. parent_bucket_folder = BucketFolder(
  622. type="directory", path=str(parent_bucket_folder_str), uploaded_at=bucket_entry.uploaded_at
  623. )
  624. # If folder not visited yet, add it
  625. if parent_bucket_folder.path not in bucket_folders:
  626. out.append(parent_bucket_folder)
  627. bucket_folders[parent_bucket_folder.path] = parent_bucket_folder
  628. continue
  629. # Otherwise, get back BucketFolder object and update its 'uploaded_at'
  630. if parent_bucket_folder.uploaded_at is not None:
  631. bucket_folder = bucket_folders[parent_bucket_folder.path]
  632. if bucket_folder.uploaded_at is None or (
  633. bucket_folder.uploaded_at < parent_bucket_folder.uploaded_at
  634. ):
  635. bucket_folder.uploaded_at = parent_bucket_folder.uploaded_at
  636. if not out:
  637. raise EntryNotFoundError(f"File not found in bucket '{bucket_id}': '{prefix}'")
  638. return out
  639. def walk(self, path: str, *args, **kwargs) -> Iterator[tuple[str, list[str], list[str]]]:
  640. """
  641. Return all files below the given path.
  642. For more details, refer to [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.walk).
  643. Args:
  644. path (`str`):
  645. Root path to list files from.
  646. Returns:
  647. `Iterator[tuple[str, list[str], list[str]]]`: An iterator of (path, list of directory names, list of file names) tuples.
  648. """
  649. path = self.resolve_path(path, revision=kwargs.get("revision")).unresolve()
  650. yield from super().walk(path, *args, **kwargs)
  651. def glob(self, path: str, maxdepth: int | None = None, **kwargs) -> list[str]:
  652. """
  653. Find files by glob-matching.
  654. For more details, refer to [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.glob).
  655. Args:
  656. path (`str`):
  657. Path pattern to match.
  658. Returns:
  659. `list[str]`: List of paths matching the pattern.
  660. """
  661. path = self.resolve_path(path, revision=kwargs.get("revision")).unresolve()
  662. return super().glob(path, maxdepth=maxdepth, **kwargs)
  663. def find(
  664. self,
  665. path: str,
  666. maxdepth: int | None = None,
  667. withdirs: bool = False,
  668. detail: bool = False,
  669. refresh: bool = False,
  670. revision: str | None = None,
  671. **kwargs,
  672. ) -> list[str] | dict[str, dict[str, Any]]:
  673. """
  674. List all files below path.
  675. For more details, refer to [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.find).
  676. Args:
  677. path (`str`):
  678. Root path to list files from.
  679. maxdepth (`int`, *optional*):
  680. Maximum depth to descend into subdirectories.
  681. withdirs (`bool`, *optional*):
  682. Include directory paths in the output. Defaults to False.
  683. detail (`bool`, *optional*):
  684. If True, returns a dict mapping paths to file information. Defaults to False.
  685. refresh (`bool`, *optional*):
  686. If True, bypass the cache and fetch the latest data. Defaults to False.
  687. revision (`str`, *optional*):
  688. The git revision to list from.
  689. Returns:
  690. `Union[list[str], dict[str, dict[str, Any]]]`: List of paths or dict of file information.
  691. """
  692. if maxdepth is not None and maxdepth < 1:
  693. raise ValueError("maxdepth must be at least 1")
  694. resolved_path = self.resolve_path(path, revision=revision)
  695. path = resolved_path.unresolve()
  696. try:
  697. out = self._ls_tree(path, recursive=True, refresh=refresh, maxdepth=maxdepth, **kwargs)
  698. except EntryNotFoundError:
  699. # Path could be a file
  700. try:
  701. if self.info(path, revision=revision, **kwargs)["type"] == "file":
  702. out = {path: {}}
  703. else:
  704. out = {}
  705. except FileNotFoundError:
  706. out = {}
  707. else:
  708. if not withdirs:
  709. out = [o for o in out if o["type"] != "directory"]
  710. else:
  711. # If `withdirs=True`, include the directory itself to be consistent with the spec
  712. path_info = self.info(path, **kwargs)
  713. out = [path_info] + out if path_info["type"] == "directory" else out
  714. out = {o["name"]: o for o in out}
  715. names = sorted(out)
  716. if not detail:
  717. return names
  718. else:
  719. return {name: out[name] for name in names}
  720. def cp_file(self, path1: str, path2: str, revision: str | None = None, **kwargs) -> None:
  721. """
  722. Copy a file within or between repositories.
  723. > [!WARNING]
  724. > Note: When possible, use `HfApi.upload_file()` for better performance.
  725. Args:
  726. path1 (`str`):
  727. Source path to copy from.
  728. path2 (`str`):
  729. Destination path to copy to.
  730. revision (`str`, *optional*):
  731. The git revision to copy from.
  732. """
  733. resolved_path1 = self.resolve_path(path1, revision=revision)
  734. resolved_path2 = self.resolve_path(path2, revision=revision)
  735. if isinstance(resolved_path1, HfFileSystemResolvedBucketPath) or isinstance(
  736. resolved_path2, HfFileSystemResolvedBucketPath
  737. ):
  738. raise NotImplementedError("Copy from/to buckets is not available yet")
  739. same_repo = (
  740. resolved_path1.repo_type == resolved_path2.repo_type and resolved_path1.repo_id == resolved_path2.repo_id
  741. )
  742. if same_repo:
  743. commit_message = f"Copy {path1} to {path2}"
  744. self._api.create_commit(
  745. repo_id=resolved_path1.repo_id,
  746. repo_type=resolved_path1.repo_type,
  747. revision=resolved_path2.revision,
  748. commit_message=kwargs.get("commit_message", commit_message),
  749. commit_description=kwargs.get("commit_description", ""),
  750. operations=[
  751. CommitOperationCopy(
  752. src_path_in_repo=resolved_path1.path_in_repo,
  753. path_in_repo=resolved_path2.path_in_repo,
  754. src_revision=resolved_path1.revision,
  755. )
  756. ],
  757. )
  758. else:
  759. with self.open(path1, "rb", revision=resolved_path1.revision) as f:
  760. content = f.read()
  761. commit_message = f"Copy {path1} to {path2}"
  762. self._api.upload_file(
  763. path_or_fileobj=content,
  764. path_in_repo=resolved_path2.path_in_repo,
  765. repo_id=resolved_path2.repo_id,
  766. token=self.token,
  767. repo_type=resolved_path2.repo_type,
  768. revision=resolved_path2.revision,
  769. commit_message=kwargs.get("commit_message", commit_message),
  770. commit_description=kwargs.get("commit_description"),
  771. )
  772. self.invalidate_cache(path=resolved_path1.unresolve())
  773. self.invalidate_cache(path=resolved_path2.unresolve())
  774. def modified(self, path: str, **kwargs) -> datetime:
  775. """
  776. Get the last modified time of a file.
  777. For more details, refer to [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.modified).
  778. Args:
  779. path (`str`):
  780. Path to the file.
  781. Returns:
  782. `datetime`: Last modified time of the file.
  783. """
  784. info = self.info(path, **{**kwargs, "expand_info": True}) # type: ignore
  785. if "last_commit" in info:
  786. if info["last_commit"] is None:
  787. raise NotImplementedError(f"'modified' is not implemented for repository paths like '{path}'")
  788. return info["last_commit"].date
  789. elif "mtime" in info and info["mtime"]:
  790. return info["mtime"]
  791. elif "uploaded_at" in info and info["uploaded_at"]:
  792. return info["uploaded_at"]
  793. else:
  794. raise NotImplementedError(f"Cannot determined 'modified' for path '{path}' (info: {info})")
  795. def info(self, path: str, refresh: bool = False, revision: str | None = None, **kwargs) -> dict[str, Any]:
  796. """
  797. Get information about a file or directory.
  798. For more details, refer to [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.info).
  799. > [!WARNING]
  800. > Note: When possible, use `HfApi.get_paths_info()` or `HfApi.repo_info()` for better performance
  801. > (or `HfApi.get_bucket_paths_info()` or `HfApi.bucket_info()` for buckets)
  802. Args:
  803. path (`str`):
  804. Path to get info for.
  805. refresh (`bool`, *optional*):
  806. If True, bypass the cache and fetch the latest data. Defaults to False.
  807. revision (`str`, *optional*):
  808. The git revision to get info from.
  809. Returns:
  810. `dict[str, Any]`: Dictionary containing file information (type, size, commit info, etc.).
  811. """
  812. resolved_path = self.resolve_path(path, revision=revision)
  813. path = resolved_path.unresolve()
  814. expand_info = kwargs.get(
  815. "expand_info", self.expand_info if self.expand_info is not None else False
  816. ) # don't expose it as a parameter in the public API to follow the spec
  817. out: dict[str, Any] | None
  818. if not resolved_path.path:
  819. # Path is the root directory
  820. out = {
  821. "name": path,
  822. "size": 0,
  823. "type": "directory",
  824. }
  825. if isinstance(resolved_path, HfFileSystemResolvedRepositoryPath):
  826. out["last_commit"] = None
  827. if isinstance(resolved_path, HfFileSystemResolvedRepositoryPath) and expand_info:
  828. last_commit = self._api.list_repo_commits(
  829. resolved_path.repo_id, repo_type=resolved_path.repo_type, revision=resolved_path.revision
  830. )[-1]
  831. out = {
  832. **out,
  833. "tree_id": None, # TODO: tree_id of the root directory?
  834. "last_commit": LastCommitInfo(
  835. oid=last_commit.commit_id, title=last_commit.title, date=last_commit.created_at
  836. ),
  837. }
  838. elif isinstance(resolved_path, HfFileSystemResolvedBucketPath):
  839. parent_path = self._parent(path)
  840. # Fill the cache with cheap call
  841. self.ls(parent_path, refresh=refresh)
  842. out1 = [o for o in self.dircache[parent_path] if o["name"] == path]
  843. if not out1:
  844. _raise_file_not_found(path, None)
  845. out = out1[0]
  846. else:
  847. out = None
  848. parent_path = self._parent(path)
  849. if not expand_info and parent_path not in self.dircache:
  850. # Fill the cache with cheap call
  851. self.ls(parent_path)
  852. if parent_path in self.dircache:
  853. # Check if the path is in the cache
  854. out1 = [o for o in self.dircache[parent_path] if o["name"] == path]
  855. if not out1:
  856. _raise_file_not_found(path, None)
  857. out = out1[0]
  858. if refresh or out is None or (expand_info and out and out["last_commit"] is None):
  859. paths_info = self._api.get_paths_info(
  860. resolved_path.repo_id,
  861. resolved_path.path_in_repo,
  862. expand=expand_info,
  863. revision=resolved_path.revision,
  864. repo_type=resolved_path.repo_type,
  865. )
  866. if not paths_info:
  867. _raise_file_not_found(path, None)
  868. path_info = paths_info[0]
  869. root_path = HfFileSystemResolvedRepositoryPath(
  870. resolved_path.repo_type,
  871. resolved_path.repo_id,
  872. resolved_path.revision,
  873. path_in_repo="",
  874. _raw_revision=resolved_path._raw_revision,
  875. ).unresolve()
  876. if isinstance(path_info, RepoFile):
  877. out = {
  878. "name": root_path + "/" + path_info.path,
  879. "size": path_info.size,
  880. "type": "file",
  881. "blob_id": path_info.blob_id,
  882. "lfs": path_info.lfs,
  883. "xet_hash": path_info.xet_hash,
  884. "last_commit": path_info.last_commit,
  885. "security": path_info.security,
  886. }
  887. else:
  888. out = {
  889. "name": root_path + "/" + path_info.path,
  890. "size": 0,
  891. "type": "directory",
  892. "tree_id": path_info.tree_id,
  893. "last_commit": path_info.last_commit,
  894. }
  895. if not expand_info:
  896. out = {k: out[k] for k in ["name", "size", "type"]}
  897. assert out is not None
  898. return out
  899. def exists(self, path, **kwargs):
  900. """
  901. Check if a file exists.
  902. For more details, refer to [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.exists).
  903. > [!WARNING]
  904. > Note: When possible, use `HfApi.file_exists()` for better performance.
  905. Args:
  906. path (`str`):
  907. Path to check.
  908. Returns:
  909. `bool`: True if file exists, False otherwise.
  910. """
  911. try:
  912. if kwargs.get("refresh", False):
  913. self.invalidate_cache(path)
  914. self.info(path, **kwargs)
  915. return True
  916. except OSError:
  917. return False
  918. def isdir(self, path):
  919. """
  920. Check if a path is a directory.
  921. For more details, refer to [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.isdir).
  922. Args:
  923. path (`str`):
  924. Path to check.
  925. Returns:
  926. `bool`: True if path is a directory, False otherwise.
  927. """
  928. try:
  929. return self.info(path)["type"] == "directory"
  930. except OSError:
  931. return False
  932. def isfile(self, path):
  933. """
  934. Check if a path is a file.
  935. For more details, refer to [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.isfile).
  936. Args:
  937. path (`str`):
  938. Path to check.
  939. Returns:
  940. `bool`: True if path is a file, False otherwise.
  941. """
  942. try:
  943. return self.info(path)["type"] == "file"
  944. except OSError:
  945. return False
  946. def url(self, path: str) -> str:
  947. """
  948. Get the HTTP URL of the given path.
  949. Args:
  950. path (`str`):
  951. Path to get URL for.
  952. Returns:
  953. `str`: HTTP URL to access the file or directory on the Hub.
  954. """
  955. resolved_path = self.resolve_path(path)
  956. if isinstance(resolved_path, HfFileSystemResolvedBucketPath):
  957. url = f"{self.endpoint}/buckets/{resolved_path.bucket_id}/resolve/{quote(resolved_path.path)}"
  958. else:
  959. url = hf_hub_url(
  960. resolved_path.repo_id,
  961. resolved_path.path_in_repo,
  962. repo_type=resolved_path.repo_type,
  963. revision=resolved_path.revision,
  964. endpoint=self.endpoint,
  965. )
  966. if self.isdir(path):
  967. url = url.replace("/resolve/", "/tree/", 1)
  968. return url
  969. def get_file(self, rpath, lpath, callback=_DEFAULT_CALLBACK, outfile=None, **kwargs) -> None:
  970. """
  971. Copy single remote file to local.
  972. > [!WARNING]
  973. > Note: When possible, use `HfApi.hf_hub_download()` or `HfApi.download_bucket_files` for better performance.
  974. Args:
  975. rpath (`str`):
  976. Remote path to download from.
  977. lpath (`str`):
  978. Local path to download to.
  979. callback (`Callback`, *optional*):
  980. Optional callback to track download progress. Defaults to no callback.
  981. outfile (`IO`, *optional*):
  982. Optional file-like object to write to. If provided, `lpath` is ignored.
  983. """
  984. revision = kwargs.get("revision")
  985. unhandled_kwargs = set(kwargs.keys()) - {"revision"}
  986. if not isinstance(callback, (NoOpCallback, TqdmCallback)) or len(unhandled_kwargs) > 0:
  987. # for now, let's not handle custom callbacks
  988. # and let's not handle custom kwargs
  989. return super().get_file(rpath, lpath, callback=callback, outfile=outfile, **kwargs)
  990. # Taken from https://github.com/fsspec/filesystem_spec/blob/47b445ae4c284a82dd15e0287b1ffc410e8fc470/fsspec/spec.py#L883
  991. if isfilelike(lpath):
  992. outfile = lpath
  993. elif self.isdir(rpath):
  994. os.makedirs(lpath, exist_ok=True)
  995. return None
  996. if isinstance(lpath, (str, Path)): # otherwise, let's assume it's a file-like object
  997. os.makedirs(os.path.dirname(lpath), exist_ok=True)
  998. # Open file if not already open
  999. close_file = False
  1000. if outfile is None:
  1001. outfile = open(lpath, "wb")
  1002. close_file = True
  1003. initial_pos = outfile.tell()
  1004. # Custom implementation of `get_file` to use `http_get`.
  1005. resolve_remote_path = self.resolve_path(rpath, revision=revision)
  1006. expected_size = self.info(rpath, revision=revision)["size"]
  1007. callback.set_size(expected_size)
  1008. try:
  1009. http_get(
  1010. url=self.url(resolve_remote_path.unresolve()),
  1011. temp_file=outfile, # type: ignore
  1012. displayed_filename=rpath,
  1013. expected_size=expected_size,
  1014. resume_size=0,
  1015. headers=self._api._build_hf_headers(),
  1016. _tqdm_bar=callback.tqdm if isinstance(callback, TqdmCallback) else None,
  1017. )
  1018. outfile.seek(initial_pos)
  1019. finally:
  1020. # Close file only if we opened it ourselves
  1021. if close_file:
  1022. outfile.close()
  1023. @property
  1024. def transaction(self):
  1025. """A context within which files are committed together upon exit
  1026. Requires the file class to implement `.commit()` and `.discard()`
  1027. for the normal and exception cases.
  1028. """
  1029. # Taken from https://github.com/fsspec/filesystem_spec/blob/3fbb6fee33b46cccb015607630843dea049d3243/fsspec/spec.py#L231
  1030. # See https://github.com/huggingface/huggingface_hub/issues/1733
  1031. raise NotImplementedError("Transactional commits are not supported.")
  1032. def start_transaction(self):
  1033. """Begin write transaction for deferring files, non-context version"""
  1034. # Taken from https://github.com/fsspec/filesystem_spec/blob/3fbb6fee33b46cccb015607630843dea049d3243/fsspec/spec.py#L241
  1035. # See https://github.com/huggingface/huggingface_hub/issues/1733
  1036. raise NotImplementedError("Transactional commits are not supported.")
  1037. def __reduce__(self):
  1038. # re-populate the instance cache at HfFileSystem._cache and re-populate the state of every instance
  1039. return make_instance, (
  1040. type(self),
  1041. self.storage_args,
  1042. self.storage_options,
  1043. self._get_instance_state(),
  1044. )
  1045. def _get_instance_state(self):
  1046. return {
  1047. "dircache": deepcopy(self.dircache),
  1048. "_repo_and_revision_exists_cache": deepcopy(self._repo_and_revision_exists_cache),
  1049. "_bucket_exists_cache": deepcopy(self._bucket_exists_cache),
  1050. }
  1051. class HfFileSystemFile(fsspec.spec.AbstractBufferedFile):
  1052. def __init__(self, fs: HfFileSystem, path: str, revision: str | None = None, **kwargs):
  1053. try:
  1054. self.resolved_path = fs.resolve_path(path, revision=revision)
  1055. except FileNotFoundError as e:
  1056. if "w" in kwargs.get("mode", ""):
  1057. raise FileNotFoundError(
  1058. f"{e}.\nMake sure the repository and revision exist before writing data."
  1059. ) from e
  1060. raise
  1061. super().__init__(fs, self.resolved_path.unresolve(), **kwargs)
  1062. self.fs: HfFileSystem
  1063. def __del__(self):
  1064. if not hasattr(self, "resolved_path"):
  1065. # Means that the constructor failed. Nothing to do.
  1066. return
  1067. return super().__del__()
  1068. def _fetch_range(self, start: int, end: int) -> bytes:
  1069. headers = {
  1070. "range": f"bytes={start}-{end - 1}",
  1071. **self.fs._api._build_hf_headers(),
  1072. }
  1073. url = self.url()
  1074. r = http_backoff("GET", url, headers=headers, timeout=constants.HF_HUB_DOWNLOAD_TIMEOUT)
  1075. hf_raise_for_status(r)
  1076. return r.content
  1077. def _initiate_upload(self) -> None:
  1078. self.temp_file = tempfile.NamedTemporaryFile(prefix="hffs-", delete=False)
  1079. def _upload_chunk(self, final: bool = False) -> None:
  1080. self.buffer.seek(0)
  1081. block = self.buffer.read()
  1082. self.temp_file.write(block)
  1083. if final:
  1084. self.temp_file.close()
  1085. if isinstance(self.resolved_path, HfFileSystemResolvedBucketPath):
  1086. self.fs._api.batch_bucket_files(
  1087. self.resolved_path.bucket_id, add=[(self.temp_file.name, self.resolved_path.path)]
  1088. )
  1089. else:
  1090. self.fs._api.upload_file(
  1091. path_or_fileobj=self.temp_file.name,
  1092. path_in_repo=self.resolved_path.path_in_repo,
  1093. repo_id=self.resolved_path.repo_id,
  1094. token=self.fs.token,
  1095. repo_type=self.resolved_path.repo_type,
  1096. revision=self.resolved_path.revision,
  1097. commit_message=self.kwargs.get("commit_message"),
  1098. commit_description=self.kwargs.get("commit_description"),
  1099. )
  1100. os.remove(self.temp_file.name)
  1101. self.fs.invalidate_cache(
  1102. path=self.resolved_path.unresolve(),
  1103. )
  1104. def read(self, length=-1):
  1105. """Read remote file.
  1106. If `length` is not provided or is -1, the entire file is downloaded and read. On POSIX systems the file is
  1107. loaded in memory directly. Otherwise, the file is downloaded to a temporary file and read from there.
  1108. """
  1109. if self.mode == "rb" and (length is None or length == -1) and self.loc == 0:
  1110. with self.fs.open(self.path, "rb", block_size=0) as f: # block_size=0 enables fast streaming
  1111. out = f.read()
  1112. self.loc += len(out)
  1113. return out
  1114. return super().read(length)
  1115. def url(self) -> str:
  1116. return self.fs.url(self.path)
  1117. class HfFileSystemStreamFile(fsspec.spec.AbstractBufferedFile):
  1118. def __init__(
  1119. self,
  1120. fs: HfFileSystem,
  1121. path: str,
  1122. mode: str = "rb",
  1123. revision: str | None = None,
  1124. block_size: int = 0,
  1125. cache_type: str = "none",
  1126. **kwargs,
  1127. ):
  1128. if block_size != 0:
  1129. raise ValueError(f"HfFileSystemStreamFile only supports block_size=0 but got {block_size}")
  1130. if cache_type != "none":
  1131. raise ValueError(f"HfFileSystemStreamFile only supports cache_type='none' but got {cache_type}")
  1132. if "w" in mode:
  1133. raise ValueError(f"HfFileSystemStreamFile only supports reading but got mode='{mode}'")
  1134. try:
  1135. self.resolved_path = fs.resolve_path(path, revision=revision)
  1136. except FileNotFoundError as e:
  1137. if "w" in kwargs.get("mode", ""):
  1138. raise FileNotFoundError(
  1139. f"{e}.\nMake sure the repository and revision exist before writing data."
  1140. ) from e
  1141. # avoid an unnecessary .info() call to instantiate .details
  1142. self.details = {"name": self.resolved_path.unresolve(), "size": None}
  1143. super().__init__(
  1144. fs, self.resolved_path.unresolve(), mode=mode, block_size=block_size, cache_type=cache_type, **kwargs
  1145. )
  1146. self.response: httpx.Response | None = None
  1147. self.fs: HfFileSystem
  1148. self._exit_stack = ExitStack()
  1149. # streaming state
  1150. self._stream_iterator: Iterator[bytes] | None = None
  1151. self._stream_buffer = bytearray()
  1152. def seek(self, loc: int, whence: int = 0):
  1153. if loc == 0 and whence == 1:
  1154. return
  1155. if loc == self.loc and whence == 0:
  1156. return
  1157. raise ValueError("Cannot seek streaming HF file")
  1158. def read(self, length: int = -1):
  1159. """Read the remote file.
  1160. If the file is already open, we reuse the connection.
  1161. Otherwise, open a new connection and read from it.
  1162. If reading the stream fails, we retry with a new connection.
  1163. """
  1164. if self.response is None:
  1165. self._open_connection()
  1166. retried_once = False
  1167. while True:
  1168. try:
  1169. if self.response is None or self._stream_iterator is None:
  1170. return b"" # Already read the entire file
  1171. out = self._read_from_stream(self._stream_iterator, length)
  1172. self.loc += len(out)
  1173. return out
  1174. except Exception:
  1175. if self.response is not None:
  1176. self.response.close()
  1177. if retried_once: # Already retried once, give up
  1178. raise
  1179. # First failure, retry with range header
  1180. self._open_connection()
  1181. retried_once = True
  1182. def _read_from_stream(self, iterator: Iterator[bytes], length: int = -1) -> bytes:
  1183. """Read up to `length` bytes from stream buffer and stream.
  1184. If length < 0, read until EOF.
  1185. If EOF is reached before length, fewer bytes may be returned.
  1186. """
  1187. if length == 0:
  1188. return b""
  1189. if length < 0:
  1190. buf = bytearray(self._stream_buffer)
  1191. self._stream_buffer.clear()
  1192. for chunk in iterator:
  1193. buf.extend(chunk)
  1194. return bytes(buf)
  1195. if length <= len(self._stream_buffer):
  1196. result = bytes(self._stream_buffer[:length])
  1197. del self._stream_buffer[:length]
  1198. return result
  1199. buf = bytearray(self._stream_buffer)
  1200. self._stream_buffer.clear()
  1201. for chunk in iterator:
  1202. need = length - len(buf)
  1203. if need > len(chunk):
  1204. buf.extend(chunk)
  1205. else:
  1206. buf.extend(chunk[:need])
  1207. self._stream_buffer.extend(chunk[need:])
  1208. break
  1209. return bytes(buf)
  1210. def url(self) -> str:
  1211. return self.fs.url(self.path)
  1212. def __del__(self):
  1213. if not hasattr(self, "resolved_path"):
  1214. # Means that the constructor failed. Nothing to do.
  1215. return
  1216. self._exit_stack.close()
  1217. return super().__del__()
  1218. def __reduce__(self):
  1219. return reopen, (self.fs, self.path, self.mode, self.blocksize, self.cache.name)
  1220. def _open_connection(self):
  1221. """Open a connection to the remote file."""
  1222. # reset streaming state
  1223. self._stream_buffer.clear()
  1224. self._stream_iterator = None
  1225. url = self.url()
  1226. headers = self.fs._api._build_hf_headers()
  1227. if self.loc > 0:
  1228. headers["Range"] = f"bytes={self.loc}-"
  1229. self.response = self._exit_stack.enter_context(
  1230. http_stream_backoff(
  1231. "GET",
  1232. url,
  1233. headers=headers,
  1234. timeout=constants.HF_HUB_DOWNLOAD_TIMEOUT,
  1235. )
  1236. )
  1237. try:
  1238. hf_raise_for_status(self.response)
  1239. except HfHubHTTPError as e:
  1240. if e.response.status_code == 416:
  1241. # Range not satisfiable => means that we have already read the entire file
  1242. self.response = None
  1243. return
  1244. raise
  1245. self._stream_iterator = self.response.iter_bytes()
  1246. def safe_revision(revision: str) -> str:
  1247. return revision if SPECIAL_REFS_REVISION_REGEX.match(revision) else safe_quote(revision)
  1248. def safe_quote(s: str) -> str:
  1249. return quote(s, safe="")
  1250. def _raise_file_not_found(path: str, err: Exception | None) -> NoReturn:
  1251. msg = path
  1252. if isinstance(err, RepositoryNotFoundError):
  1253. msg = f"{path} (repository not found)"
  1254. elif isinstance(err, RevisionNotFoundError):
  1255. msg = f"{path} (revision not found)"
  1256. elif isinstance(err, HFValidationError):
  1257. msg = f"{path} (invalid repository id)"
  1258. raise FileNotFoundError(msg) from err
  1259. def reopen(fs: HfFileSystem, path: str, mode: str, block_size: int, cache_type: str):
  1260. return fs.open(path, mode=mode, block_size=block_size, cache_type=cache_type)
  1261. def make_instance(cls, args, kwargs, instance_state):
  1262. fs = cls(*args, **kwargs)
  1263. for attr, state_value in instance_state.items():
  1264. setattr(fs, attr, state_value)
  1265. return fs
# Module-level `HfFileSystem` instance, constructed with default arguments when this module is imported.
hffs = HfFileSystem()