| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353 |
- import atexit
- import logging
- import os
- import time
- from concurrent.futures import Future
- from dataclasses import dataclass
- from io import SEEK_END, SEEK_SET, BytesIO
- from pathlib import Path
- from threading import Lock, Thread
- from typing import Optional
- from .hf_api import DEFAULT_IGNORE_PATTERNS, CommitInfo, CommitOperationAdd, HfApi
- from .utils import filter_repo_objects
- logger = logging.getLogger(__name__)
- @dataclass(frozen=True)
- class _FileToUpload:
- """Temporary dataclass to store info about files to upload. Not meant to be used directly."""
- local_path: Path
- path_in_repo: str
- size_limit: int
- last_modified: float
- class CommitScheduler:
- """
- Scheduler to upload a local folder to the Hub at regular intervals (e.g. push to hub every 5 minutes).
- The recommended way to use the scheduler is to use it as a context manager. This ensures that the scheduler is
- properly stopped and the last commit is triggered when the script ends. The scheduler can also be stopped manually
- with the `stop` method. Checkout the [upload guide](https://huggingface.co/docs/huggingface_hub/guides/upload#scheduled-uploads)
- to learn more about how to use it.
- Args:
- repo_id (`str`):
- The id of the repo to commit to.
- folder_path (`str` or `Path`):
- Path to the local folder to upload regularly.
- every (`int` or `float`, *optional*):
- The number of minutes between each commit. Defaults to 5 minutes.
- path_in_repo (`str`, *optional*):
- Relative path of the directory in the repo, for example: `"checkpoints/"`. Defaults to the root folder
- of the repository.
- repo_type (`str`, *optional*):
- The type of the repo to commit to. Defaults to `model`.
- revision (`str`, *optional*):
- The revision of the repo to commit to. Defaults to `main`.
- private (`bool`, *optional*):
- Whether to make the repo private. If `None` (default), the repo will be public unless the organization's default is private. This value is ignored if the repo already exists.
- token (`str`, *optional*):
- The token to use to commit to the repo. Defaults to the token saved on the machine.
- allow_patterns (`list[str]` or `str`, *optional*):
- If provided, only files matching at least one pattern are uploaded.
- ignore_patterns (`list[str]` or `str`, *optional*):
- If provided, files matching any of the patterns are not uploaded.
- squash_history (`bool`, *optional*):
- Whether to squash the history of the repo after each commit. Defaults to `False`. Squashing commits is
- useful to avoid degraded performances on the repo when it grows too large.
- hf_api (`HfApi`, *optional*):
- The [`HfApi`] client to use to commit to the Hub. Can be set with custom settings (user agent, token,...).
- Example:
- ```py
- >>> from pathlib import Path
- >>> from huggingface_hub import CommitScheduler
- # Scheduler uploads every 10 minutes
- >>> csv_path = Path("watched_folder/data.csv")
- >>> CommitScheduler(repo_id="test_scheduler", repo_type="dataset", folder_path=csv_path.parent, every=10)
- >>> with csv_path.open("a") as f:
- ... f.write("first line")
- # Some time later (...)
- >>> with csv_path.open("a") as f:
- ... f.write("second line")
- ```
- Example using a context manager:
- ```py
- >>> from pathlib import Path
- >>> from huggingface_hub import CommitScheduler
- >>> with CommitScheduler(repo_id="test_scheduler", repo_type="dataset", folder_path="watched_folder", every=10) as scheduler:
- ... csv_path = Path("watched_folder/data.csv")
- ... with csv_path.open("a") as f:
- ... f.write("first line")
- ... (...)
- ... with csv_path.open("a") as f:
- ... f.write("second line")
- # Scheduler is now stopped and last commit have been triggered
- ```
- """
- def __init__(
- self,
- *,
- repo_id: str,
- folder_path: str | Path,
- every: int | float = 5,
- path_in_repo: str | None = None,
- repo_type: str | None = None,
- revision: str | None = None,
- private: bool | None = None,
- token: str | None = None,
- allow_patterns: list[str] | str | None = None,
- ignore_patterns: list[str] | str | None = None,
- squash_history: bool = False,
- hf_api: Optional["HfApi"] = None,
- ) -> None:
- self.api = hf_api or HfApi(token=token)
- # Folder
- self.folder_path = Path(folder_path).expanduser().resolve()
- self.path_in_repo = path_in_repo or ""
- self.allow_patterns = allow_patterns
- if ignore_patterns is None:
- ignore_patterns = []
- elif isinstance(ignore_patterns, str):
- ignore_patterns = [ignore_patterns]
- self.ignore_patterns = ignore_patterns + DEFAULT_IGNORE_PATTERNS
- if self.folder_path.is_file():
- raise ValueError(f"'folder_path' must be a directory, not a file: '{self.folder_path}'.")
- self.folder_path.mkdir(parents=True, exist_ok=True)
- # Repository
- repo_url = self.api.create_repo(repo_id=repo_id, private=private, repo_type=repo_type, exist_ok=True)
- self.repo_id = repo_url.repo_id
- self.repo_type = repo_type
- self.revision = revision
- self.token = token
- # Keep track of already uploaded files
- self.last_uploaded: dict[Path, float] = {} # key is local path, value is timestamp
- # Scheduler
- if not every > 0:
- raise ValueError(f"'every' must be a positive integer, not '{every}'.")
- self.lock = Lock()
- self.every = every
- self.squash_history = squash_history
- logger.info(f"Scheduled job to push '{self.folder_path}' to '{self.repo_id}' every {self.every} minutes.")
- self._scheduler_thread = Thread(target=self._run_scheduler, daemon=True)
- self._scheduler_thread.start()
- atexit.register(self._push_to_hub)
- self.__stopped = False
- def stop(self) -> None:
- """Stop the scheduler.
- A stopped scheduler cannot be restarted. Mostly for tests purposes.
- """
- self.__stopped = True
- def __enter__(self) -> "CommitScheduler":
- return self
- def __exit__(self, exc_type, exc_value, traceback) -> None:
- # Upload last changes before exiting
- self.trigger().result()
- self.stop()
- return
- def _run_scheduler(self) -> None:
- """Dumb thread waiting between each scheduled push to Hub."""
- while True:
- self.last_future = self.trigger()
- time.sleep(self.every * 60)
- if self.__stopped:
- break
- def trigger(self) -> Future:
- """Trigger a `push_to_hub` and return a future.
- This method is automatically called every `every` minutes. You can also call it manually to trigger a commit
- immediately, without waiting for the next scheduled commit.
- """
- return self.api.run_as_future(self._push_to_hub)
- def _push_to_hub(self) -> CommitInfo | None:
- if self.__stopped: # If stopped, already scheduled commits are ignored
- return None
- logger.info("(Background) scheduled commit triggered.")
- try:
- value = self.push_to_hub()
- if self.squash_history:
- logger.info("(Background) squashing repo history.")
- self.api.super_squash_history(repo_id=self.repo_id, repo_type=self.repo_type, branch=self.revision)
- return value
- except Exception as e:
- logger.error(f"Error while pushing to Hub: {e}") # Depending on the setup, error might be silenced
- raise
- def push_to_hub(self) -> CommitInfo | None:
- """
- Push folder to the Hub and return the commit info.
- > [!WARNING]
- > This method is not meant to be called directly. It is run in the background by the scheduler, respecting a
- > queue mechanism to avoid concurrent commits. Making a direct call to the method might lead to concurrency
- > issues.
- The default behavior of `push_to_hub` is to assume an append-only folder. It lists all files in the folder and
- uploads only changed files. If no changes are found, the method returns without committing anything. If you want
- to change this behavior, you can inherit from [`CommitScheduler`] and override this method. This can be useful
- for example to compress data together in a single file before committing. For more details and examples, check
- out our [integration guide](https://huggingface.co/docs/huggingface_hub/main/en/guides/upload#scheduled-uploads).
- """
- # Check files to upload (with lock)
- with self.lock:
- logger.debug("Listing files to upload for scheduled commit.")
- # List files from folder (taken from `_prepare_upload_folder_additions`)
- relpath_to_abspath = {
- path.relative_to(self.folder_path).as_posix(): path
- for path in sorted(self.folder_path.glob("**/*")) # sorted to be deterministic
- if path.is_file()
- }
- prefix = f"{self.path_in_repo.strip('/')}/" if self.path_in_repo else ""
- # Filter with pattern + filter out unchanged files + retrieve current file size
- files_to_upload: list[_FileToUpload] = []
- for relpath in filter_repo_objects(
- relpath_to_abspath.keys(), allow_patterns=self.allow_patterns, ignore_patterns=self.ignore_patterns
- ):
- local_path = relpath_to_abspath[relpath]
- stat = local_path.stat()
- if self.last_uploaded.get(local_path) is None or self.last_uploaded[local_path] != stat.st_mtime:
- files_to_upload.append(
- _FileToUpload(
- local_path=local_path,
- path_in_repo=prefix + relpath,
- size_limit=stat.st_size,
- last_modified=stat.st_mtime,
- )
- )
- # Return if nothing to upload
- if len(files_to_upload) == 0:
- logger.debug("Dropping schedule commit: no changed file to upload.")
- return None
- # Convert `_FileToUpload` as `CommitOperationAdd` (=> compute file shas + limit to file size)
- logger.debug("Removing unchanged files since previous scheduled commit.")
- add_operations = [
- CommitOperationAdd(
- # Cap the file to its current size, even if the user append data to it while a scheduled commit is happening
- path_or_fileobj=PartialFileIO(file_to_upload.local_path, size_limit=file_to_upload.size_limit),
- path_in_repo=file_to_upload.path_in_repo,
- )
- for file_to_upload in files_to_upload
- ]
- # Upload files (append mode expected - no need for lock)
- logger.debug("Uploading files for scheduled commit.")
- commit_info = self.api.create_commit(
- repo_id=self.repo_id,
- repo_type=self.repo_type,
- operations=add_operations,
- commit_message="Scheduled Commit",
- revision=self.revision,
- )
- # Successful commit: keep track of the latest "last_modified" for each file
- for file in files_to_upload:
- self.last_uploaded[file.local_path] = file.last_modified
- return commit_info
- class PartialFileIO(BytesIO):
- """A file-like object that reads only the first part of a file.
- Useful to upload a file to the Hub when the user might still be appending data to it. Only the first part of the
- file is uploaded (i.e. the part that was available when the filesystem was first scanned).
- In practice, only used internally by the CommitScheduler to regularly push a folder to the Hub with minimal
- disturbance for the user. The object is passed to `CommitOperationAdd`.
- Only supports `read`, `tell` and `seek` methods.
- Args:
- file_path (`str` or `Path`):
- Path to the file to read.
- size_limit (`int`):
- The maximum number of bytes to read from the file. If the file is larger than this, only the first part
- will be read (and uploaded).
- """
- def __init__(self, file_path: str | Path, size_limit: int) -> None:
- self._file_path = Path(file_path)
- self._file = self._file_path.open("rb")
- self._size_limit = min(size_limit, os.fstat(self._file.fileno()).st_size)
- def __del__(self) -> None:
- self._file.close()
- return super().__del__()
- def __repr__(self) -> str:
- return f"<PartialFileIO file_path={self._file_path} size_limit={self._size_limit}>"
- def __len__(self) -> int:
- return self._size_limit
- def __getattribute__(self, name: str):
- if name.startswith("_") or name in ("read", "tell", "seek", "fileno"): # only 4 public methods supported
- return super().__getattribute__(name)
- raise NotImplementedError(f"PartialFileIO does not support '{name}'.")
- def fileno(self):
- raise AttributeError("PartialFileIO does not have a fileno.")
- def tell(self) -> int:
- """Return the current file position."""
- return self._file.tell()
- def seek(self, __offset: int, __whence: int = SEEK_SET) -> int:
- """Change the stream position to the given offset.
- Behavior is the same as a regular file, except that the position is capped to the size limit.
- """
- if __whence == SEEK_END:
- # SEEK_END => set from the truncated end
- __offset = len(self) + __offset
- __whence = SEEK_SET
- pos = self._file.seek(__offset, __whence)
- if pos > self._size_limit:
- return self._file.seek(self._size_limit)
- return pos
- def read(self, __size: int | None = -1) -> bytes:
- """Read at most `__size` bytes from the file.
- Behavior is the same as a regular file, except that it is capped to the size limit.
- """
- current = self._file.tell()
- if __size is None or __size < 0:
- # Read until file limit
- truncated_size = self._size_limit - current
- else:
- # Read until file limit or __size
- truncated_size = min(__size, self._size_limit - current)
- return self._file.read(truncated_size)
|