| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766 |
- # Copyright 2024-present, the HuggingFace Inc. team.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import enum
- import logging
- import os
- import queue
- import shutil
- import sys
- import threading
- import time
- import traceback
- from datetime import datetime
- from pathlib import Path
- from threading import Lock
- from typing import TYPE_CHECKING, Any
- from urllib.parse import quote
- from ._commit_api import CommitOperationAdd, UploadInfo, _fetch_upload_modes
- from ._local_folder import LocalUploadFileMetadata, LocalUploadFilePaths, get_local_upload_paths, read_upload_metadata
- from .constants import DEFAULT_REVISION, REPO_TYPES
- from .utils import DEFAULT_IGNORE_PATTERNS, _format_size, filter_repo_objects, tqdm
- from .utils._runtime import is_xet_available
- from .utils.sha import sha_fileobj
- if TYPE_CHECKING:
- from .hf_api import HfApi
logger = logging.getLogger(__name__)

# --- Worker scheduling ---
# Seconds a worker sleeps when no task can be picked up yet.
WAITING_TIME_IF_NO_TASKS = 10
# Maximum number of files sent in a single "get upload mode" batch.
MAX_NB_FILES_FETCH_UPLOAD_MODE = 100
# Ladder of commit sizes (number of files per commit). The current commit size moves one step
# down after a failed commit and one step up after a fast, full-size successful commit.
COMMIT_SIZE_SCALE: list[int] = [
    20, 50, 75, 100, 125,
    200, 250, 400, 600, 1000,
]

# --- Pre-upload batching ---
UPLOAD_BATCH_SIZE_XET = 256  # Up to 256 files per pre-upload batch when Xet is available
UPLOAD_BATCH_SIZE_LFS = 1  # Otherwise, files are pre-uploaded one at a time (regular LFS upload)

# --- Repository limits ---
# Source: https://huggingface.co/docs/hub/repositories-recommendations
MAX_FILES_PER_REPO = 100_000  # Recommended maximum number of files in a repository
MAX_FILES_PER_FOLDER = 10_000  # Recommended maximum number of entries (files + sub-folders) per folder
MAX_FILE_SIZE_GB = 200  # Files above this size trigger a "consider splitting this file" warning
RECOMMENDED_FILE_SIZE_GB = 20  # Files above this size trigger a "large file" warning
def _validate_upload_limits(paths_list: list[LocalUploadFilePaths]) -> None:
    """
    Validate upload against repository limits and warn about potential issues.

    Nothing is raised for limit violations: each check only emits a `logger.warning`.

    Args:
        paths_list: List of file paths to be uploaded. `path_in_repo` is parsed as a "/"-separated
            repo path, and `file_path` is `stat()`-ed to read the file size.

    Warns about:
        - Too many files in the repository (>100k)
        - Too many entries (files or subdirectories) in a single folder (>10k)
        - Files exceeding size limits (>20GB recommended, >200GB maximum)
    """
    from collections import defaultdict
    from pathlib import PurePosixPath

    logger.info("Running validation checks on files to upload...")

    # Check 1: Total file count
    if len(paths_list) > MAX_FILES_PER_REPO:
        logger.warning(
            f"You are about to upload {len(paths_list):,} files. "
            f"This exceeds the recommended limit of {MAX_FILES_PER_REPO:,} files per repository.\n"
            f"Consider:\n"
            f" - Splitting your data into multiple repositories\n"
            f" - Using fewer, larger files (e.g., parquet files)\n"
            f" - See: https://huggingface.co/docs/hub/repositories-recommendations"
        )

    # Check 2: Files and subdirectories per folder.
    # Track immediate children (files and subdirs) for each folder, keyed by "/"-joined folder path.
    # Repo paths are "/"-separated, so they are parsed with PurePosixPath: with `Path`, Windows would render
    # parent folders with "\" and file counts would land under different keys than subdirectory counts.
    entries_per_folder: dict[str, Any] = defaultdict(lambda: {"files": 0, "subdirs": set()})
    for paths in paths_list:
        path = PurePosixPath(paths.path_in_repo)
        parts = path.parts
        # Count this file in its immediate parent directory ("." for files at the repo root)
        entries_per_folder[path.parent.as_posix()]["files"] += 1
        # Register each intermediate directory as a subdirectory of its own parent
        for i, child in enumerate(parts[:-1]):
            parent = "." if i == 0 else "/".join(parts[:i])
            entries_per_folder[parent]["subdirs"].add(child)

    # Check limits for each folder
    for folder, data in entries_per_folder.items():
        file_count = data["files"]
        subdir_count = len(data["subdirs"])
        total_entries = file_count + subdir_count
        if total_entries > MAX_FILES_PER_FOLDER:
            folder_display = "root" if folder == "." else folder
            logger.warning(
                f"Folder '{folder_display}' contains {total_entries:,} entries "
                f"({file_count:,} files and {subdir_count:,} subdirectories). "
                f"This exceeds the recommended {MAX_FILES_PER_FOLDER:,} entries per folder.\n"
                "Consider reorganising into sub-folders."
            )

    # Check 3: File sizes
    large_files = []
    very_large_files = []
    for paths in paths_list:
        size = paths.file_path.stat().st_size
        size_gb = size / 1_000_000_000  # Use decimal GB as per Hub limits
        if size_gb > MAX_FILE_SIZE_GB:
            very_large_files.append((paths.path_in_repo, size_gb))
        elif size_gb > RECOMMENDED_FILE_SIZE_GB:
            large_files.append((paths.path_in_repo, size_gb))

    # Warn about very large files (>200GB); only the first 5 are listed
    if very_large_files:
        files_str = "\n - ".join(f"{path}: {size:.1f}GB" for path, size in very_large_files[:5])
        more_str = f"\n ... and {len(very_large_files) - 5} more files" if len(very_large_files) > 5 else ""
        logger.warning(
            f"Found {len(very_large_files)} files exceeding the {MAX_FILE_SIZE_GB}GB recommended maximum:\n"
            f" - {files_str}{more_str}\n"
            f"Consider splitting these files into smaller chunks."
        )

    # Warn about large files (>20GB); only the first 5 are listed
    if large_files:
        files_str = "\n - ".join(f"{path}: {size:.1f}GB" for path, size in large_files[:5])
        more_str = f"\n ... and {len(large_files) - 5} more files" if len(large_files) > 5 else ""
        logger.warning(
            f"Found {len(large_files)} files larger than {RECOMMENDED_FILE_SIZE_GB}GB (recommended limit):\n"
            f" - {files_str}{more_str}\n"
            f"Large files may slow down loading and processing."
        )

    logger.info("Validation checks complete.")
- def upload_large_folder_internal(
- api: "HfApi",
- repo_id: str,
- folder_path: str | Path,
- *,
- repo_type: str, # Repo type is required!
- revision: str | None = None,
- private: bool | None = None,
- allow_patterns: list[str] | str | None = None,
- ignore_patterns: list[str] | str | None = None,
- num_workers: int | None = None,
- print_report: bool = True,
- print_report_every: int = 60,
- ):
- """Upload a large folder to the Hub in the most resilient way possible.
- See [`HfApi.upload_large_folder`] for the full documentation.
- """
- # 1. Check args and setup
- if repo_type is None:
- raise ValueError(
- "For large uploads, `repo_type` is explicitly required. Please set it to `model`, `dataset` or `space`."
- " If you are using the CLI, pass it as `--repo-type=model`."
- )
- if repo_type not in REPO_TYPES:
- raise ValueError(f"Invalid repo type, must be one of {REPO_TYPES}")
- if revision is None:
- revision = DEFAULT_REVISION
- folder_path = Path(folder_path).expanduser().resolve()
- if not folder_path.is_dir():
- raise ValueError(f"Provided path: '{folder_path}' is not a directory")
- if ignore_patterns is None:
- ignore_patterns = []
- elif isinstance(ignore_patterns, str):
- ignore_patterns = [ignore_patterns]
- ignore_patterns += DEFAULT_IGNORE_PATTERNS
- if num_workers is None:
- nb_cores = os.cpu_count() or 1
- num_workers = max(nb_cores // 2, 1) # Use at most half of cpu cores
- # 2. Create repo if missing
- repo_url = api.create_repo(repo_id=repo_id, repo_type=repo_type, private=private, exist_ok=True)
- logger.info(f"Repo created: {repo_url}")
- repo_id = repo_url.repo_id
- # Warn on too many commits
- try:
- commits = api.list_repo_commits(repo_id=repo_id, repo_type=repo_type, revision=revision)
- commit_count = len(commits)
- if commit_count > 500:
- logger.warning(
- f"\n{'=' * 80}\n"
- f"WARNING: This repository has {commit_count} commits.\n"
- f"Repositories with a large number of commits can experience performance issues.\n"
- f"\n"
- f"Consider squashing your commit history using `super_squash_history()`.\n"
- "To do so, you need to stop this process, run the snippet below and restart the upload command."
- f" from huggingface_hub import super_squash_history\n"
- f" super_squash_history(repo_id='{repo_id}', repo_type='{repo_type}')\n"
- f"\n"
- f"Note: This is a non-revertible operation. See the documentation for more details:\n"
- f"https://huggingface.co/docs/huggingface_hub/main/en/package_reference/hf_api#huggingface_hub.HfApi.super_squash_history\n"
- f"{'=' * 80}\n"
- )
- except Exception as e:
- # Don't fail the upload if we can't check commit count
- logger.debug(f"Could not check commit count: {e}")
- # 2.1 Check if xet is enabled to set batch file upload size
- upload_batch_size = UPLOAD_BATCH_SIZE_XET if is_xet_available() else UPLOAD_BATCH_SIZE_LFS
- # 3. List files to upload
- filtered_paths_list = filter_repo_objects(
- (path.relative_to(folder_path).as_posix() for path in folder_path.glob("**/*") if path.is_file()),
- allow_patterns=allow_patterns,
- ignore_patterns=ignore_patterns,
- )
- paths_list = [get_local_upload_paths(folder_path, relpath) for relpath in filtered_paths_list]
- logger.info(f"Found {len(paths_list)} candidate files to upload")
- # Validate upload against repository limits
- _validate_upload_limits(paths_list)
- logger.info("Starting upload...")
- # Read metadata for each file
- items = [
- (paths, read_upload_metadata(folder_path, paths.path_in_repo))
- for paths in tqdm(paths_list, desc="Recovering from metadata files")
- ]
- # 4. Start workers
- status = LargeUploadStatus(items, upload_batch_size)
- threads = [
- threading.Thread(
- target=_worker_job,
- kwargs={
- "status": status,
- "api": api,
- "repo_id": repo_id,
- "repo_type": repo_type,
- "revision": revision,
- },
- )
- for _ in range(num_workers)
- ]
- for thread in threads:
- thread.start()
- # 5. Print regular reports
- if print_report:
- print("\n\n" + status.current_report())
- last_report_ts = time.time()
- while True:
- time.sleep(1)
- if time.time() - last_report_ts >= print_report_every:
- if print_report:
- _print_overwrite(status.current_report())
- last_report_ts = time.time()
- if status.is_done():
- logger.info("Is done: exiting main loop")
- break
- for thread in threads:
- thread.join()
- logger.info(status.current_report())
- logger.info("Upload is complete!")
- ####################
- # Logic to manage workers and synchronize tasks
- ####################
class WorkerJob(enum.Enum):
    """Kind of task a worker thread can be assigned by `_determine_next_job`."""

    SHA256 = enum.auto()  # hash a single local file
    GET_UPLOAD_MODE = enum.auto()  # fetch upload mode ("lfs" / "regular") and ignore flag for a batch of files
    PREUPLOAD_LFS = enum.auto()  # pre-upload the content of LFS files
    COMMIT = enum.auto()  # commit a batch of files to the repo
    WAIT = enum.auto()  # if no tasks are available but we don't want to exit
# One unit of work moved between the worker queues: the local paths of a file together with its
# upload metadata (which workers update in place and persist with `metadata.save(paths)`).
JOB_ITEM_T = tuple[LocalUploadFilePaths, LocalUploadFileMetadata]
class LargeUploadStatus:
    """Contains information, queues and tasks for a large upload process.

    Each item is a `(paths, metadata)` tuple that moves through up to four queues:
    sha256 -> get upload mode -> pre-upload LFS (LFS files only) -> commit.

    `self.lock` guards the `nb_workers_*` counters and `last_commit_attempt`, and is held while
    reading item metadata for reports. `self._chunk_lock` guards the adaptive commit size
    (see `target_chunk` / `update_chunk`).

    Args:
        items: Files to upload, with the metadata recovered from previous runs (used to resume).
        upload_batch_size: Number of files per pre-upload batch.
    """

    def __init__(self, items: list[JOB_ITEM_T], upload_batch_size: int = 1):
        self.items = items
        self.queue_sha256: "queue.Queue[JOB_ITEM_T]" = queue.Queue()
        self.queue_get_upload_mode: "queue.Queue[JOB_ITEM_T]" = queue.Queue()
        self.queue_preupload_lfs: "queue.Queue[JOB_ITEM_T]" = queue.Queue()
        self.queue_commit: "queue.Queue[JOB_ITEM_T]" = queue.Queue()
        self.lock = Lock()

        # Number of workers currently running each kind of job (updated under `self.lock`)
        self.nb_workers_sha256: int = 0
        self.nb_workers_get_upload_mode: int = 0
        self.nb_workers_preupload_lfs: int = 0
        self.nb_workers_commit: int = 0
        self.nb_workers_waiting: int = 0

        self.upload_batch_size: int = upload_batch_size
        self.last_commit_attempt: float | None = None

        self._started_at = datetime.now()
        # Index in COMMIT_SIZE_SCALE of the current commit size (starts at the second step)
        self._chunk_idx: int = 1
        self._chunk_lock: Lock = Lock()

        # Setup queues: each file resumes at the first step it has not completed yet
        for item in self.items:
            paths, metadata = item
            if metadata.should_ignore:
                # Ignored files are dropped, exactly like files flagged as ignored while fetching upload
                # modes in `_worker_job`; `is_done` already counts them as done.
                logger.debug(f"Skipping file {paths.path_in_repo} (ignored)")
            elif metadata.sha256 is None:
                self.queue_sha256.put(item)
            elif metadata.upload_mode is None:
                self.queue_get_upload_mode.put(item)
            elif metadata.upload_mode == "lfs" and not metadata.is_uploaded:
                self.queue_preupload_lfs.put(item)
            elif not metadata.is_committed:
                self.queue_commit.put(item)
            else:
                logger.debug(f"Skipping file {paths.path_in_repo} (already uploaded and committed)")

    def target_chunk(self) -> int:
        """Return the current target number of files per commit (a value of COMMIT_SIZE_SCALE)."""
        with self._chunk_lock:
            return COMMIT_SIZE_SCALE[self._chunk_idx]

    def update_chunk(self, success: bool, nb_items: int, duration: float) -> None:
        """Adapt the commit size after a commit attempt.

        Moves one step down COMMIT_SIZE_SCALE after a failure, one step up after a successful commit
        of at least the current target size that took less than 40 seconds. The index is clamped to
        the bounds of the scale.
        """
        with self._chunk_lock:
            if not success:
                logger.warning(f"Failed to commit {nb_items} files at once. Will retry with less files in next batch.")
                self._chunk_idx -= 1
            elif nb_items >= COMMIT_SIZE_SCALE[self._chunk_idx] and duration < 40:
                logger.info(f"Successfully committed {nb_items} at once. Increasing the limit for next batch.")
                self._chunk_idx += 1
            self._chunk_idx = max(0, min(self._chunk_idx, len(COMMIT_SIZE_SCALE) - 1))

    def current_report(self) -> str:
        """Generate a report of the current status of the large upload."""
        nb_hashed = 0
        size_hashed = 0
        nb_preuploaded = 0
        nb_lfs = 0
        nb_lfs_unsure = 0
        size_preuploaded = 0
        nb_committed = 0
        size_committed = 0
        total_size = 0
        ignored_files = 0
        total_files = 0

        with self.lock:
            for _, metadata in self.items:
                if metadata.should_ignore:
                    ignored_files += 1
                    continue
                total_size += metadata.size
                total_files += 1
                if metadata.sha256 is not None:
                    nb_hashed += 1
                    size_hashed += metadata.size
                if metadata.upload_mode == "lfs":
                    nb_lfs += 1
                if metadata.upload_mode is None:
                    nb_lfs_unsure += 1
                if metadata.is_uploaded:
                    nb_preuploaded += 1
                    size_preuploaded += metadata.size
                if metadata.is_committed:
                    nb_committed += 1
                    size_committed += metadata.size

            # Snapshot worker counters under the same lock that guards their updates
            nb_workers_sha256 = self.nb_workers_sha256
            nb_workers_get_upload_mode = self.nb_workers_get_upload_mode
            nb_workers_preupload_lfs = self.nb_workers_preupload_lfs
            nb_workers_commit = self.nb_workers_commit
            nb_workers_waiting = self.nb_workers_waiting

        total_size_str = _format_size(total_size)

        now = datetime.now()
        now_str = now.strftime("%Y-%m-%d %H:%M:%S")
        elapsed = now - self._started_at
        elapsed_str = str(elapsed).split(".")[0]  # remove milliseconds

        message = "\n" + "-" * 10
        message += f" {now_str} ({elapsed_str}) "
        message += "-" * 10 + "\n"

        message += "Files: "
        message += f"hashed {nb_hashed}/{total_files} ({_format_size(size_hashed)}/{total_size_str}) | "
        message += f"pre-uploaded: {nb_preuploaded}/{nb_lfs} ({_format_size(size_preuploaded)}/{total_size_str})"
        if nb_lfs_unsure > 0:
            message += f" (+{nb_lfs_unsure} unsure)"
        message += f" | committed: {nb_committed}/{total_files} ({_format_size(size_committed)}/{total_size_str})"
        message += f" | ignored: {ignored_files}\n"

        message += "Workers: "
        message += f"hashing: {nb_workers_sha256} | "
        message += f"get upload mode: {nb_workers_get_upload_mode} | "
        message += f"pre-uploading: {nb_workers_preupload_lfs} | "
        message += f"committing: {nb_workers_commit} | "
        message += f"waiting: {nb_workers_waiting}\n"
        message += "-" * 51

        return message

    def is_done(self) -> bool:
        """Return True once every item is either committed or ignored."""
        with self.lock:
            return all(metadata.is_committed or metadata.should_ignore for _, metadata in self.items)
- def _worker_job(
- status: LargeUploadStatus,
- api: "HfApi",
- repo_id: str,
- repo_type: str,
- revision: str,
- ):
- """
- Main process for a worker. The worker will perform tasks based on the priority list until all files are uploaded
- and committed. If no tasks are available, the worker will wait for 10 seconds before checking again.
- If a task fails for any reason, the item(s) are put back in the queue for another worker to pick up.
- Read `upload_large_folder` docstring for more information on how tasks are prioritized.
- """
- while True:
- next_job: tuple[WorkerJob, list[JOB_ITEM_T]] | None = None
- # Determine next task
- next_job = _determine_next_job(status)
- if next_job is None:
- return
- job, items = next_job
- # Perform task
- match job:
- case WorkerJob.SHA256:
- item = items[0] # single item
- try:
- _compute_sha256(item)
- status.queue_get_upload_mode.put(item)
- except KeyboardInterrupt:
- raise
- except Exception as e:
- logger.error(f"Failed to compute sha256: {e}")
- traceback.format_exc()
- status.queue_sha256.put(item)
- with status.lock:
- status.nb_workers_sha256 -= 1
- case WorkerJob.GET_UPLOAD_MODE:
- try:
- _get_upload_mode(items, api=api, repo_id=repo_id, repo_type=repo_type, revision=revision)
- except KeyboardInterrupt:
- raise
- except Exception as e:
- logger.error(f"Failed to get upload mode: {e}")
- traceback.format_exc()
- # Items are either:
- # - dropped (if should_ignore)
- # - put in LFS queue (if LFS)
- # - put in commit queue (if regular)
- # - or put back (if error occurred).
- for item in items:
- _, metadata = item
- if metadata.should_ignore:
- continue
- match metadata.upload_mode:
- case "lfs":
- status.queue_preupload_lfs.put(item)
- case "regular":
- status.queue_commit.put(item)
- case _:
- status.queue_get_upload_mode.put(item)
- with status.lock:
- status.nb_workers_get_upload_mode -= 1
- case WorkerJob.PREUPLOAD_LFS:
- try:
- _preupload_lfs(items, api=api, repo_id=repo_id, repo_type=repo_type, revision=revision)
- for item in items:
- status.queue_commit.put(item)
- except KeyboardInterrupt:
- raise
- except Exception as e:
- logger.error(f"Failed to preupload LFS: {e}")
- traceback.format_exc()
- for item in items:
- status.queue_preupload_lfs.put(item)
- with status.lock:
- status.nb_workers_preupload_lfs -= 1
- case WorkerJob.COMMIT:
- start_ts = time.time()
- success = True
- try:
- _commit(items, api=api, repo_id=repo_id, repo_type=repo_type, revision=revision)
- except KeyboardInterrupt:
- raise
- except Exception as e:
- logger.error(f"Failed to commit: {e}")
- traceback.format_exc()
- for item in items:
- status.queue_commit.put(item)
- success = False
- duration = time.time() - start_ts
- status.update_chunk(success, len(items), duration)
- with status.lock:
- status.last_commit_attempt = time.time()
- status.nb_workers_commit -= 1
- case WorkerJob.WAIT:
- time.sleep(WAITING_TIME_IF_NO_TASKS)
- with status.lock:
- status.nb_workers_waiting -= 1
def _determine_next_job(status: LargeUploadStatus) -> tuple[WorkerJob, list[JOB_ITEM_T]] | None:
    """Pick the next task for a worker by walking a fixed priority list.

    Runs entirely under `status.lock`. For the chosen job, the matching `nb_workers_*` counter is incremented
    and the job's items are popped from the corresponding queue before returning.

    Returns:
        `(job, items)` to perform, `(WorkerJob.WAIT, [])` if nothing can be done right now, or `None` once every
        file is committed or ignored (the worker should then exit).
    """
    with status.lock:
        # 1. Commit if more than 5 minutes since last commit attempt (and at least 1 file)
        if (
            status.nb_workers_commit == 0
            and status.queue_commit.qsize() > 0
            and status.last_commit_attempt is not None
            and time.time() - status.last_commit_attempt > 5 * 60
        ):
            status.nb_workers_commit += 1
            logger.debug("Job: commit (more than 5 minutes since last commit attempt)")
            return (WorkerJob.COMMIT, _get_n(status.queue_commit, status.target_chunk()))

        # 2. Commit if at least 150 files are ready to commit
        elif status.nb_workers_commit == 0 and status.queue_commit.qsize() >= 150:
            status.nb_workers_commit += 1
            logger.debug("Job: commit (>=150 files ready)")
            return (WorkerJob.COMMIT, _get_n(status.queue_commit, status.target_chunk()))

        # 3. Get upload mode if at least `MAX_NB_FILES_FETCH_UPLOAD_MODE` files
        elif status.queue_get_upload_mode.qsize() >= MAX_NB_FILES_FETCH_UPLOAD_MODE:
            status.nb_workers_get_upload_mode += 1
            logger.debug(f"Job: get upload mode (>{MAX_NB_FILES_FETCH_UPLOAD_MODE} files ready)")
            return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, MAX_NB_FILES_FETCH_UPLOAD_MODE))

        # 4. Preupload LFS file if at least `status.upload_batch_size` files and no worker is preuploading LFS
        elif status.queue_preupload_lfs.qsize() >= status.upload_batch_size and status.nb_workers_preupload_lfs == 0:
            status.nb_workers_preupload_lfs += 1
            logger.debug("Job: preupload LFS (no other worker preuploading LFS)")
            return (WorkerJob.PREUPLOAD_LFS, _get_n(status.queue_preupload_lfs, status.upload_batch_size))

        # 5. Compute sha256 if at least 1 file and no worker is computing sha256
        elif status.queue_sha256.qsize() > 0 and status.nb_workers_sha256 == 0:
            status.nb_workers_sha256 += 1
            logger.debug("Job: sha256 (no other worker computing sha256)")
            return (WorkerJob.SHA256, _get_one(status.queue_sha256))

        # 6. Get upload mode if at least 1 file and no worker is getting upload mode
        elif status.queue_get_upload_mode.qsize() > 0 and status.nb_workers_get_upload_mode == 0:
            status.nb_workers_get_upload_mode += 1
            logger.debug("Job: get upload mode (no other worker getting upload mode)")
            return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, MAX_NB_FILES_FETCH_UPLOAD_MODE))

        # 7. Preupload LFS file if at least `status.upload_batch_size` files
        elif status.queue_preupload_lfs.qsize() >= status.upload_batch_size:
            status.nb_workers_preupload_lfs += 1
            logger.debug("Job: preupload LFS")
            return (WorkerJob.PREUPLOAD_LFS, _get_n(status.queue_preupload_lfs, status.upload_batch_size))

        # 8. Compute sha256 if at least 1 file
        elif status.queue_sha256.qsize() > 0:
            status.nb_workers_sha256 += 1
            logger.debug("Job: sha256")
            return (WorkerJob.SHA256, _get_one(status.queue_sha256))

        # 9. Get upload mode if at least 1 file
        elif status.queue_get_upload_mode.qsize() > 0:
            status.nb_workers_get_upload_mode += 1
            logger.debug("Job: get upload mode")
            return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, MAX_NB_FILES_FETCH_UPLOAD_MODE))

        # 10. Preupload LFS file if at least 1 file
        elif status.queue_preupload_lfs.qsize() > 0:
            status.nb_workers_preupload_lfs += 1
            logger.debug("Job: preupload LFS")
            return (WorkerJob.PREUPLOAD_LFS, _get_n(status.queue_preupload_lfs, status.upload_batch_size))

        # 11. Commit if at least 1 file and 1 min since last commit attempt
        elif (
            status.nb_workers_commit == 0
            and status.queue_commit.qsize() > 0
            and status.last_commit_attempt is not None
            and time.time() - status.last_commit_attempt > 1 * 60
        ):
            status.nb_workers_commit += 1
            logger.debug("Job: commit (1 min since last commit attempt)")
            return (WorkerJob.COMMIT, _get_n(status.queue_commit, status.target_chunk()))

        # 12. Commit if at least 1 file, all other queues are empty and no worker is hashing, getting upload
        #     modes or preuploading (e.g. when it's the last commit)
        elif (
            status.nb_workers_commit == 0
            and status.queue_commit.qsize() > 0
            and status.queue_sha256.qsize() == 0
            and status.queue_get_upload_mode.qsize() == 0
            and status.queue_preupload_lfs.qsize() == 0
            and status.nb_workers_sha256 == 0
            and status.nb_workers_get_upload_mode == 0
            and status.nb_workers_preupload_lfs == 0
        ):
            status.nb_workers_commit += 1
            logger.debug("Job: commit")
            return (WorkerJob.COMMIT, _get_n(status.queue_commit, status.target_chunk()))

        # 13. If every file is committed or ignored, exit
        elif all(metadata.is_committed or metadata.should_ignore for _, metadata in status.items):
            logger.info("All files have been processed! Exiting worker.")
            return None

        # 14. If no task is available, wait
        else:
            status.nb_workers_waiting += 1
            logger.debug(f"No task available, waiting... ({WAITING_TIME_IF_NO_TASKS}s)")
            return (WorkerJob.WAIT, [])
- ####################
- # Atomic jobs (sha256, get_upload_mode, preupload_lfs, commit)
- ####################
def _compute_sha256(item: JOB_ITEM_T) -> None:
    """Hash the local file of `item` and persist the hex digest in its metadata.

    Does nothing if `metadata.sha256` is already set (e.g. recovered from a previous run).
    """
    paths, metadata = item
    if metadata.sha256 is not None:
        return
    with paths.file_path.open("rb") as fileobj:
        digest = sha_fileobj(fileobj)
    metadata.sha256 = digest.hex()
    metadata.save(paths)
def _get_upload_mode(items: list[JOB_ITEM_T], api: "HfApi", repo_id: str, repo_type: str, revision: str) -> None:
    """Ask the Hub how each file must be uploaded and store the answer in its metadata.

    For every item, the upload mode, the "should ignore" flag and the remote oid returned by the
    Hub are written to the metadata, which is then saved.
    """
    operations = [_build_hacky_operation(item) for item in items]
    _fetch_upload_modes(
        additions=operations,
        repo_type=repo_type,
        repo_id=repo_id,
        headers=api._build_hf_headers(),
        revision=quote(revision, safe=""),  # fully URL-encoded, "/" included
        endpoint=api.endpoint,
    )
    for (paths, metadata), operation in zip(items, operations):
        metadata.upload_mode = operation._upload_mode
        metadata.should_ignore = operation._should_ignore
        metadata.remote_oid = operation._remote_oid
        metadata.save(paths)
def _preupload_lfs(items: list[JOB_ITEM_T], api: "HfApi", repo_id: str, repo_type: str, revision: str) -> None:
    """Pre-upload the content of LFS files, then flag every item as uploaded in its saved metadata."""
    operations = [_build_hacky_operation(item) for item in items]
    api.preupload_lfs_files(
        repo_id=repo_id,
        repo_type=repo_type,
        revision=revision,
        additions=operations,
    )
    # Only reached when the pre-upload call did not raise
    for item in items:
        paths, metadata = item
        metadata.is_uploaded = True
        metadata.save(paths)
def _commit(items: list[JOB_ITEM_T], api: "HfApi", repo_id: str, repo_type: str, revision: str) -> None:
    """Create one commit adding all `items` to the repo, then flag each of them as committed."""
    operations = [_build_hacky_operation(item) for item in items]
    api.create_commit(
        repo_id=repo_id,
        repo_type=repo_type,
        revision=revision,
        operations=operations,
        commit_message="Add files using upload-large-folder tool",
    )
    # Only reached when the commit call did not raise
    for item in items:
        paths, metadata = item
        metadata.is_committed = True
        metadata.save(paths)
- ####################
- # Hacks with CommitOperationAdd to bypass checks/sha256 calculation
- ####################
class HackyCommitOperationAdd(CommitOperationAdd):
    """`CommitOperationAdd` whose `__post_init__` bypasses the parent's checks and sha256 computation.

    Upload info, upload mode, ignore flag and remote oid are filled in afterwards from the locally
    stored metadata (see `_build_hacky_operation`).
    """

    def __post_init__(self) -> None:
        # The only post-init work kept: normalize a `Path` into a `str`.
        if not isinstance(self.path_or_fileobj, Path):
            return
        self.path_or_fileobj = str(self.path_or_fileobj)
def _build_hacky_operation(item: JOB_ITEM_T) -> HackyCommitOperationAdd:
    """Build a commit operation for `item` from its stored metadata, without re-hashing the file.

    Raises:
        ValueError: If the sha256 of the file has not been computed yet.
    """
    paths, metadata = item
    operation = HackyCommitOperationAdd(path_in_repo=paths.path_in_repo, path_or_fileobj=paths.file_path)

    # Sample = first 512 bytes of the file (peek does not advance the stream)
    with paths.file_path.open("rb") as file:
        head = file.peek(512)
    sample = head[:512]

    if metadata.sha256 is None:
        raise ValueError("sha256 must have been computed by now!")
    operation.upload_info = UploadInfo(
        sha256=bytes.fromhex(metadata.sha256),
        size=metadata.size,
        sample=sample,
    )

    # Copy the upload mode / ignore flag / remote oid stored in the metadata
    operation._upload_mode = metadata.upload_mode  # type: ignore
    operation._should_ignore = metadata.should_ignore
    operation._remote_oid = metadata.remote_oid
    return operation
- ####################
- # Misc helpers
- ####################
def _get_one(queue: "queue.Queue[JOB_ITEM_T]") -> list[JOB_ITEM_T]:
    """Pop a single item from `queue` and return it wrapped in a one-element list."""
    item = queue.get()
    return [item]
def _get_n(queue: "queue.Queue[JOB_ITEM_T]", n: int) -> list[JOB_ITEM_T]:
    """Pop up to `n` items from `queue`, in FIFO order.

    At most `queue.qsize()` items are taken (size read once, up front), so the call should not block
    as long as no other consumer drains the queue concurrently.
    """
    batch = []
    count = min(queue.qsize(), n)
    while len(batch) < count:
        batch.append(queue.get())
    return batch
def _print_overwrite(report: str) -> None:
    """Print a report, overwriting the previous lines.

    Since tqdm is using `sys.stderr` to (re-)write progress bars, we need to use `sys.stdout`
    to print the report.

    The number of lines to clear is computed from the *new* report, so this assumes the previous
    report spanned the same number of terminal rows.

    Note: works well only if no other process is writing to `sys.stdout`!
    """
    report += "\n"

    # Get terminal width. Some terminals report 0 columns and, before Python 3.11,
    # `shutil.get_terminal_size` passes that 0 through: fall back to 80 to avoid dividing by zero.
    terminal_width = shutil.get_terminal_size().columns or 80

    # Count number of lines that should be cleared (a long line wraps over several terminal rows)
    nb_lines = sum(len(line) // terminal_width + 1 for line in report.splitlines())

    # Clear previous lines based on the number of lines in the report
    for _ in range(nb_lines):
        sys.stdout.write("\r\033[K")  # Clear line
        sys.stdout.write("\033[F")  # Move cursor up one line

    # Print the new report, filling remaining space with whitespace
    sys.stdout.write(report)
    sys.stdout.write(" " * (terminal_width - len(report.splitlines()[-1])))
    sys.stdout.flush()
|