| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150 |
- # Copyright 2025 The HuggingFace Team. All rights reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """Contains commands to interact with jobs on the Hugging Face Hub.
- Usage:
- # run a job
- hf jobs run <image> <command>
- # List running or completed jobs
- hf jobs ps [-a] [-f key=value] [--format table|json|TEMPLATE] [-q]
- # Print logs from a job (non-blocking)
- hf jobs logs <job-id>
- # Stream logs from a job (blocking, like `docker logs -f`)
- hf jobs logs -f <job-id>
- # Stream resources usage stats and metrics from a job
- hf jobs stats <job-id>
- # Inspect detailed information about a job
- hf jobs inspect <job-id>
- # Cancel a running job
- hf jobs cancel <job-id>
- # List available hardware options
- hf jobs hardware
- # Run a UV script
- hf jobs uv run <script>
- # Schedule a job
- hf jobs scheduled run <schedule> <image> <command>
- # List scheduled jobs
- hf jobs scheduled ps [-a] [-f key=value] [--format table|json] [-q]
- # Inspect a scheduled job
- hf jobs scheduled inspect <scheduled_job_id>
- # Suspend a scheduled job
- hf jobs scheduled suspend <scheduled_job_id>
- # Resume a scheduled job
- hf jobs scheduled resume <scheduled_job_id>
- # Delete a scheduled job
- hf jobs scheduled delete <scheduled_job_id>
- """
- import json
- import multiprocessing
- import multiprocessing.pool
- import shutil
- import time
- from collections import deque
- from collections.abc import Callable, Iterable
- from dataclasses import asdict
- from fnmatch import fnmatch
- from queue import Empty, Queue
- from typing import Annotated, Any, TypeVar
- import typer
- from huggingface_hub import SpaceHardware
- from huggingface_hub.errors import CLIError, HfHubHTTPError
- from huggingface_hub.utils import logging
- from huggingface_hub.utils._cache_manager import _format_size
- from ._cli_utils import (
- EnvFileOpt,
- EnvOpt,
- OutputFormat,
- QuietOpt,
- SecretsFileOpt,
- SecretsOpt,
- TokenOpt,
- VolumesOpt,
- _format_cell,
- api_object_to_dict,
- get_hf_api,
- parse_env_map,
- parse_volumes,
- print_list_output,
- typer_factory,
- )
- logger = logging.get_logger(__name__)
- def _parse_namespace_from_job_id(job_id: str, namespace: str | None) -> tuple[str, str | None]:
- """Extract namespace from job_id if provided in 'namespace/job_id' format.
- Allows users to pass job IDs copied from the Hub UI (e.g. 'username/job_id')
- instead of only bare job IDs. If the namespace is also provided explicitly via
- --namespace and conflicts, a CLIError is raised.
- """
- if not job_id:
- raise CLIError("Job ID cannot be empty.")
- if job_id.count("/") > 1:
- raise CLIError(f"Job ID must be in the form 'job_id' or 'namespace/job_id': '{job_id}'.")
- if "/" not in job_id:
- return job_id, namespace
- extracted_namespace, parsed_job_id = job_id.split("/", 1)
- if not extracted_namespace or not parsed_job_id:
- raise CLIError(f"Job ID must be in the form 'job_id' or 'namespace/job_id': '{job_id}'.")
- if namespace is not None and namespace != extracted_namespace:
- raise CLIError(
- f"Conflicting namespace: got --namespace='{namespace}' but job ID implies namespace='{extracted_namespace}'"
- )
- return parsed_job_id, extracted_namespace
- SUGGESTED_FLAVORS = [item.value for item in SpaceHardware if item.value != "zero-a10g"]
- STATS_UPDATE_MIN_INTERVAL = 0.1 # we set a limit here since there is one update per second per job
- # Common job-related options
- ImageArg = Annotated[
- str,
- typer.Argument(
- help="The Docker image to use.",
- ),
- ]
- ImageOpt = Annotated[
- str | None,
- typer.Option(
- help="Use a custom Docker image with `uv` installed.",
- ),
- ]
- FlavorOpt = Annotated[
- SpaceHardware | None,
- typer.Option(
- help="Flavor for the hardware, as in HF Spaces. Run 'hf jobs hardware' to list available flavors. Defaults to `cpu-basic`.",
- ),
- ]
- LabelsOpt = Annotated[
- list[str] | None,
- typer.Option(
- "-l",
- "--label",
- help="Set labels. E.g. --label KEY=VALUE or --label LABEL",
- ),
- ]
- TimeoutOpt = Annotated[
- str | None,
- typer.Option(
- help="Max duration: int/float with s (seconds, default), m (minutes), h (hours) or d (days).",
- ),
- ]
- DetachOpt = Annotated[
- bool,
- typer.Option(
- "-d",
- "--detach",
- help="Run the Job in the background and print the Job ID.",
- ),
- ]
- NamespaceOpt = Annotated[
- str | None,
- typer.Option(
- help="The namespace where the job will be running. Defaults to the current user's namespace.",
- ),
- ]
- WithOpt = Annotated[
- list[str] | None,
- typer.Option(
- "--with",
- help="Run with the given packages installed",
- ),
- ]
- PythonOpt = Annotated[
- str | None,
- typer.Option(
- "-p",
- "--python",
- help="The Python interpreter to use for the run environment",
- ),
- ]
- SuspendOpt = Annotated[
- bool | None,
- typer.Option(
- help="Suspend (pause) the scheduled Job",
- ),
- ]
- ConcurrencyOpt = Annotated[
- bool | None,
- typer.Option(
- help="Allow multiple instances of this Job to run concurrently",
- ),
- ]
- ScheduleArg = Annotated[
- str,
- typer.Argument(
- help="One of annually, yearly, monthly, weekly, daily, hourly, or a CRON schedule expression.",
- ),
- ]
- ScriptArg = Annotated[
- str,
- typer.Argument(
- help="UV script to run (local file or URL)",
- ),
- ]
- ScriptArgsArg = Annotated[
- list[str] | None,
- typer.Argument(
- help="Arguments for the script",
- ),
- ]
- CommandArg = Annotated[
- list[str],
- typer.Argument(
- help="The command to run.",
- ),
- ]
- JobIdArg = Annotated[
- str,
- typer.Argument(
- help="Job ID (or 'namespace/job_id')",
- ),
- ]
- JobIdsArg = Annotated[
- list[str] | None,
- typer.Argument(
- help="Job IDs (or 'namespace/job_id')",
- ),
- ]
- ScheduledJobIdArg = Annotated[
- str,
- typer.Argument(
- help="Scheduled Job ID (or 'namespace/scheduled_job_id')",
- ),
- ]
- jobs_cli = typer_factory(help="Run and manage Jobs on the Hub.")
- @jobs_cli.command(
- "run",
- context_settings={"ignore_unknown_options": True},
- examples=[
- "hf jobs run python:3.12 python -c 'print(\"Hello!\")'",
- "hf jobs run -e FOO=foo python:3.12 python script.py",
- "hf jobs run --secrets HF_TOKEN python:3.12 python script.py",
- "hf jobs run -v hf://gpt2:/data -v hf://buckets/org/b:/mnt python:3.12 python script.py",
- ],
- )
- def jobs_run(
- image: ImageArg,
- command: CommandArg,
- env: EnvOpt = None,
- secrets: SecretsOpt = None,
- label: LabelsOpt = None,
- volume: VolumesOpt = None,
- env_file: EnvFileOpt = None,
- secrets_file: SecretsFileOpt = None,
- flavor: FlavorOpt = None,
- timeout: TimeoutOpt = None,
- detach: DetachOpt = False,
- namespace: NamespaceOpt = None,
- token: TokenOpt = None,
- ) -> None:
- """Run a Job."""
- env_map = parse_env_map(env, env_file)
- secrets_map = parse_env_map(secrets, secrets_file)
- api = get_hf_api(token=token)
- job = api.run_job(
- image=image,
- command=command,
- env=env_map,
- secrets=secrets_map,
- labels=_parse_labels_map(label),
- volumes=parse_volumes(volume),
- flavor=flavor,
- timeout=timeout,
- namespace=namespace,
- )
- # Always print the job ID to the user
- print(f"Job started with ID: {job.id}")
- print(f"View at: {job.url}")
- if detach:
- return
- # Now let's stream the logs
- for log in api.fetch_job_logs(job_id=job.id, namespace=job.owner.name, follow=True):
- print(log)
- @jobs_cli.command(
- "logs", examples=["hf jobs logs <job_id>", "hf jobs logs -f <job_id>", "hf jobs logs --tail 20 <job_id>"]
- )
- def jobs_logs(
- job_id: JobIdArg,
- follow: Annotated[
- bool,
- typer.Option(
- "-f",
- "--follow",
- help="Follow log output (stream until the job completes). Without this flag, only currently available logs are printed.",
- ),
- ] = False,
- tail: Annotated[
- int | None,
- typer.Option(
- "-n",
- "--tail",
- help="Number of lines to show from the end of the logs.",
- ),
- ] = None,
- namespace: NamespaceOpt = None,
- token: TokenOpt = None,
- ) -> None:
- """Fetch the logs of a Job.
- By default, prints currently available logs and exits (non-blocking).
- Use --follow/-f to stream logs in real-time until the job completes.
- """
- job_id, namespace = _parse_namespace_from_job_id(job_id, namespace)
- if follow and tail is not None:
- raise CLIError(
- "Cannot use --follow and --tail together. Use --follow to stream logs or --tail to show recent logs."
- )
- api = get_hf_api(token=token)
- try:
- logs = api.fetch_job_logs(job_id=job_id, namespace=namespace, follow=follow)
- if tail is not None:
- logs = deque(logs, maxlen=tail)
- for log in logs:
- print(log)
- except HfHubHTTPError as e:
- status = e.response.status_code if e.response is not None else None
- if status == 404:
- raise CLIError("Job not found. Please check the job ID.") from e
- elif status == 403:
- raise CLIError("Access denied. You may not have permission to view this job.") from e
- else:
- raise CLIError(f"Failed to fetch job logs: {e}") from e
- def _matches_filters(job_properties: dict[str, str], filters: list[tuple[str, str, str]]) -> bool:
- """Check if scheduled job matches all specified filters."""
- for key, op_str, pattern in filters:
- value = job_properties.get(key)
- if value is None:
- if op_str == "!=":
- continue
- return False
- match = fnmatch(value.lower(), pattern.lower())
- if (op_str == "=" and not match) or (op_str == "!=" and match):
- return False
- return True
- def _print_output(rows: list[list[str | int]], headers: list[str], aliases: list[str], fmt: str | None) -> None:
- """Print output according to the chosen format."""
- if fmt:
- # Use custom template if provided
- template = fmt
- for row in rows:
- line = template
- for i, field in enumerate(aliases):
- placeholder = f"{{{{.{field}}}}}"
- if placeholder in line:
- line = line.replace(placeholder, str(row[i]))
- print(line)
- else:
- # Default tabular format
- print(_tabulate(rows, headers=headers))
- def _clear_line(n: int) -> None:
- LINE_UP = "\033[1A"
- LINE_CLEAR = "\x1b[2K"
- for i in range(n):
- print(LINE_UP, end=LINE_CLEAR)
- def _get_jobs_stats_rows(
- job_id: str, metrics_stream: Iterable[dict[str, Any]], table_headers: list[str]
- ) -> Iterable[tuple[bool, str, list[list[str | int]]]]:
- for metrics in metrics_stream:
- row = [
- job_id,
- f"{metrics['cpu_usage_pct']}%",
- round(metrics["cpu_millicores"] / 1000.0, 1),
- f"{round(100 * metrics['memory_used_bytes'] / metrics['memory_total_bytes'], 2)}%",
- f"{_format_size(metrics['memory_used_bytes'])}B / {_format_size(metrics['memory_total_bytes'])}B",
- f"{_format_size(metrics['rx_bps'])}bps / {_format_size(metrics['tx_bps'])}bps",
- ]
- if metrics["gpus"] and isinstance(metrics["gpus"], dict):
- rows = [row] + [[""] * len(row)] * (len(metrics["gpus"]) - 1)
- for row, gpu_id in zip(rows, sorted(metrics["gpus"])):
- gpu = metrics["gpus"][gpu_id]
- row += [
- f"{gpu['utilization']}%",
- f"{round(100 * gpu['memory_used_bytes'] / gpu['memory_total_bytes'], 2)}%",
- f"{_format_size(gpu['memory_used_bytes'])}B / {_format_size(gpu['memory_total_bytes'])}B",
- ]
- else:
- row += ["N/A"] * (len(table_headers) - len(row))
- rows = [row]
- yield False, job_id, rows
- yield True, job_id, []
- @jobs_cli.command("stats", examples=["hf jobs stats <job_id>"])
- def jobs_stats(
- job_ids: JobIdsArg = None,
- namespace: NamespaceOpt = None,
- token: TokenOpt = None,
- ) -> None:
- """Fetch the resource usage statistics and metrics of Jobs"""
- if job_ids is not None:
- parsed_ids = []
- for job_id in job_ids:
- job_id, namespace = _parse_namespace_from_job_id(job_id, namespace)
- parsed_ids.append(job_id)
- job_ids = parsed_ids
- api = get_hf_api(token=token)
- if namespace is None:
- namespace = api.whoami()["name"]
- if job_ids is None:
- job_ids = [
- job.id
- for job in api.list_jobs(namespace=namespace)
- if (job.status.stage if job.status else "UNKNOWN") in ("RUNNING", "UPDATING")
- ]
- if len(job_ids) == 0:
- print("No running jobs found")
- return
- table_headers = [
- "JOB ID",
- "CPU %",
- "NUM CPU",
- "MEM %",
- "MEM USAGE",
- "NET I/O",
- "GPU UTIL %",
- "GPU MEM %",
- "GPU MEM USAGE",
- ]
- headers_aliases = [
- "id",
- "cpu_usage_pct",
- "cpu_millicores",
- "memory_used_bytes_pct",
- "memory_used_bytes_and_total_bytes",
- "rx_bps_and_tx_bps",
- "gpu_utilization",
- "gpu_memory_used_bytes_pct",
- "gpu_memory_used_bytes_and_total_bytes",
- ]
- try:
- with multiprocessing.pool.ThreadPool(len(job_ids)) as pool:
- rows_per_job_id: dict[str, list[list[str | int]]] = {}
- for job_id in job_ids:
- row: list[str | int] = [job_id]
- row += ["-- / --" if ("/" in header or "USAGE" in header) else "--" for header in table_headers[1:]]
- rows_per_job_id[job_id] = [row]
- last_update_time = time.time()
- total_rows = [row for job_id in rows_per_job_id for row in rows_per_job_id[job_id]]
- _print_output(total_rows, table_headers, headers_aliases, None)
- kwargs_list = [
- {
- "job_id": job_id,
- "metrics_stream": api.fetch_job_metrics(job_id=job_id, namespace=namespace),
- "table_headers": table_headers,
- }
- for job_id in job_ids
- ]
- for done, job_id, rows in iflatmap_unordered(pool, _get_jobs_stats_rows, kwargs_list=kwargs_list):
- if done:
- rows_per_job_id.pop(job_id, None)
- else:
- rows_per_job_id[job_id] = rows
- now = time.time()
- if now - last_update_time >= STATS_UPDATE_MIN_INTERVAL:
- _clear_line(2 + len(total_rows))
- total_rows = [row for job_id in rows_per_job_id for row in rows_per_job_id[job_id]]
- _print_output(total_rows, table_headers, headers_aliases, None)
- last_update_time = now
- except HfHubHTTPError as e:
- status = e.response.status_code if e.response is not None else None
- if status == 404:
- raise CLIError("Job not found. Please check the job ID.") from e
- elif status == 403:
- raise CLIError("Access denied. You may not have permission to view this job.") from e
- else:
- raise CLIError(f"Failed to fetch job stats: {e}") from e
- @jobs_cli.command("ps", examples=["hf jobs ps", "hf jobs ps -a"])
- def jobs_ps(
- all: Annotated[
- bool,
- typer.Option(
- "-a",
- "--all",
- help="Show all Jobs (default shows just running)",
- ),
- ] = False,
- namespace: NamespaceOpt = None,
- token: TokenOpt = None,
- filter: Annotated[
- list[str] | None,
- typer.Option(
- "-f",
- "--filter",
- help="Filter output based on conditions provided (format: key=value)",
- ),
- ] = None,
- format: Annotated[
- str | None,
- typer.Option(help="Output format: 'table' (default), 'json', or a Go template (e.g. '{{.id}}')"),
- ] = None,
- quiet: QuietOpt = False,
- ) -> None:
- """List Jobs."""
- api = get_hf_api(token=token)
- # Fetch jobs data
- jobs = api.list_jobs(namespace=namespace)
- filters: list[tuple[str, str, str]] = []
- labels_filters: list[tuple[str, str, str]] = []
- for f in filter or []:
- if f.startswith("label!=") or f.startswith("label="):
- if f.startswith("label!="):
- label_part = f[len("label!=") :]
- if "=" in label_part:
- print(
- f"Warning: Ignoring invalid label filter format 'label!={label_part}'. Use label!=key format."
- )
- continue
- label_key, op, label_value = label_part, "!=", "*"
- else:
- label_part = f[len("label=") :]
- if "=" in label_part:
- label_key, label_value = label_part.split("=", 1)
- else:
- label_key, label_value = label_part, "*"
- # Negate predicate in case of key!=value
- if label_key.endswith("!"):
- op = "!="
- label_key = label_key[:-1]
- else:
- op = "="
- labels_filters.append((label_key.lower(), op, label_value.lower()))
- elif "=" in f:
- key, value = f.split("=", 1)
- # Negate predicate in case of key!=value
- if key.endswith("!"):
- op = "!="
- key = key[:-1]
- else:
- op = "="
- filters.append((key.lower(), op, value.lower()))
- else:
- print(f"Warning: Ignoring invalid filter format '{f}'. Use key=value format.")
- # Filter jobs (operating on JobInfo objects to preserve existing filter behavior)
- filtered_jobs = []
- for job in jobs:
- status = job.status.stage if job.status else "UNKNOWN"
- if not all and status not in ("RUNNING", "UPDATING"):
- continue
- image_or_space = job.docker_image or "N/A"
- cmd = job.command or []
- command_str = " ".join(cmd) if cmd else "N/A"
- props = {"id": job.id, "image": image_or_space, "status": status.lower(), "command": command_str}
- if not _matches_filters(props, filters):
- continue
- if not _matches_filters(job.labels or {}, labels_filters):
- continue
- filtered_jobs.append(job)
- if not filtered_jobs:
- if not quiet and format != "json":
- filters_msg = f" matching filters: {', '.join([f'{k}{o}{v}' for k, o, v in filters])}" if filters else ""
- print(f"No jobs found{filters_msg}")
- elif format == "json":
- print("[]")
- return
- headers = ["JOB ID", "IMAGE/SPACE", "COMMAND", "CREATED", "STATUS"]
- aliases = ["id", "image", "command", "created", "status"]
- items = [api_object_to_dict(job) for job in filtered_jobs]
- def row_fn(item: dict[str, Any]) -> list[str]:
- status = item.get("status", {})
- cmd = item.get("command") or []
- command_str = " ".join(cmd) if cmd else "N/A"
- return [
- str(item.get("id", "")),
- _format_cell(item.get("docker_image") or "N/A"),
- _format_cell(command_str),
- item["created_at"][:19].replace("T", " ") if item.get("created_at") else "N/A",
- str(status.get("stage", "UNKNOWN")),
- ]
- # Custom template format
- if format and format not in ("table", "json"):
- _print_output([row_fn(item) for item in items], headers, aliases, format) # type: ignore
- else:
- output_format = OutputFormat.json if format == "json" else OutputFormat.table
- print_list_output(
- items=items,
- format=output_format,
- quiet=quiet,
- id_key="id",
- headers=headers,
- row_fn=row_fn,
- )
- @jobs_cli.command("hardware", examples=["hf jobs hardware"])
- def jobs_hardware() -> None:
- """List available hardware options for Jobs"""
- api = get_hf_api()
- hardware_list = api.list_jobs_hardware()
- table_headers = ["NAME", "PRETTY NAME", "CPU", "RAM", "ACCELERATOR", "COST/MIN", "COST/HOUR"]
- headers_aliases = ["name", "prettyName", "cpu", "ram", "accelerator", "costMin", "costHour"]
- rows: list[list[str | int]] = []
- for hw in hardware_list:
- accelerator_info = "N/A"
- if hw.accelerator:
- accelerator_info = f"{hw.accelerator.quantity}x {hw.accelerator.model} ({hw.accelerator.vram})"
- cost_min = f"${hw.unit_cost_usd:.4f}" if hw.unit_cost_usd is not None else "N/A"
- cost_hour = f"${hw.unit_cost_usd * 60:.2f}" if hw.unit_cost_usd is not None else "N/A"
- rows.append([hw.name, hw.pretty_name or "N/A", hw.cpu, hw.ram, accelerator_info, cost_min, cost_hour])
- if not rows:
- print("No hardware options found")
- return
- _print_output(rows, table_headers, headers_aliases, None)
- @jobs_cli.command("inspect", examples=["hf jobs inspect <job_id>"])
- def jobs_inspect(
- job_ids: Annotated[
- list[str],
- typer.Argument(
- help="Job IDs to inspect (or 'namespace/job_id')",
- ),
- ],
- namespace: NamespaceOpt = None,
- token: TokenOpt = None,
- ) -> None:
- """Display detailed information on one or more Jobs"""
- parsed_ids = []
- for job_id in job_ids:
- job_id, namespace = _parse_namespace_from_job_id(job_id, namespace)
- parsed_ids.append(job_id)
- job_ids = parsed_ids
- api = get_hf_api(token=token)
- try:
- jobs = [api.inspect_job(job_id=job_id, namespace=namespace) for job_id in job_ids]
- print(json.dumps([asdict(job) for job in jobs], indent=4, default=str))
- except HfHubHTTPError as e:
- status = e.response.status_code if e.response is not None else None
- if status == 404:
- raise CLIError("Job not found. Please check the job ID.") from e
- elif status == 403:
- raise CLIError("Access denied. You may not have permission to view this job.") from e
- else:
- raise CLIError(f"Failed to inspect job: {e}") from e
- @jobs_cli.command("cancel", examples=["hf jobs cancel <job_id>"])
- def jobs_cancel(
- job_id: JobIdArg,
- namespace: NamespaceOpt = None,
- token: TokenOpt = None,
- ) -> None:
- """Cancel a Job"""
- job_id, namespace = _parse_namespace_from_job_id(job_id, namespace)
- api = get_hf_api(token=token)
- try:
- api.cancel_job(job_id=job_id, namespace=namespace)
- except HfHubHTTPError as e:
- status = e.response.status_code if e.response is not None else None
- if status == 404:
- raise CLIError("Job not found. Please check the job ID.") from e
- elif status == 403:
- raise CLIError("Access denied. You may not have permission to cancel this job.") from e
- else:
- raise CLIError(f"Failed to cancel job: {e}") from e
- uv_app = typer_factory(help="Run UV scripts (Python with inline dependencies) on HF infrastructure.")
- jobs_cli.add_typer(uv_app, name="uv")
- @uv_app.command(
- "run",
- context_settings={"ignore_unknown_options": True},
- examples=[
- "hf jobs uv run my_script.py",
- "hf jobs uv run ml_training.py --flavor a10g-small",
- "hf jobs uv run --with transformers train.py",
- "hf jobs uv run -v hf://gpt2:/data -v hf://buckets/org/b:/mnt script.py",
- ],
- )
- def jobs_uv_run(
- script: ScriptArg,
- script_args: ScriptArgsArg = None,
- image: ImageOpt = None,
- flavor: FlavorOpt = None,
- env: EnvOpt = None,
- secrets: SecretsOpt = None,
- label: LabelsOpt = None,
- volume: VolumesOpt = None,
- env_file: EnvFileOpt = None,
- secrets_file: SecretsFileOpt = None,
- timeout: TimeoutOpt = None,
- detach: DetachOpt = False,
- namespace: NamespaceOpt = None,
- token: TokenOpt = None,
- with_: WithOpt = None,
- python: PythonOpt = None,
- ) -> None:
- """Run a UV script (local file or URL) on HF infrastructure"""
- env_map = parse_env_map(env, env_file)
- secrets_map = parse_env_map(secrets, secrets_file)
- api = get_hf_api(token=token)
- job = api.run_uv_job(
- script=script,
- script_args=script_args or [],
- dependencies=with_,
- python=python,
- image=image,
- env=env_map,
- secrets=secrets_map,
- labels=_parse_labels_map(label),
- volumes=parse_volumes(volume),
- flavor=flavor, # type: ignore[arg-type,misc]
- timeout=timeout,
- namespace=namespace,
- )
- # Always print the job ID to the user
- print(f"Job started with ID: {job.id}")
- print(f"View at: {job.url}")
- if detach:
- return
- # Now let's stream the logs
- for log in api.fetch_job_logs(job_id=job.id, namespace=job.owner.name, follow=True):
- print(log)
- scheduled_app = typer_factory(help="Create and manage scheduled Jobs on the Hub.")
- jobs_cli.add_typer(scheduled_app, name="scheduled")
- @scheduled_app.command(
- "run",
- context_settings={"ignore_unknown_options": True},
- examples=['hf jobs scheduled run "0 0 * * *" python:3.12 python script.py'],
- )
- def scheduled_run(
- schedule: ScheduleArg,
- image: ImageArg,
- command: CommandArg,
- suspend: SuspendOpt = None,
- concurrency: ConcurrencyOpt = None,
- env: EnvOpt = None,
- secrets: SecretsOpt = None,
- label: LabelsOpt = None,
- volume: VolumesOpt = None,
- env_file: EnvFileOpt = None,
- secrets_file: SecretsFileOpt = None,
- flavor: FlavorOpt = None,
- timeout: TimeoutOpt = None,
- namespace: NamespaceOpt = None,
- token: TokenOpt = None,
- ) -> None:
- """Schedule a Job."""
- env_map = parse_env_map(env, env_file)
- secrets_map = parse_env_map(secrets, secrets_file)
- api = get_hf_api(token=token)
- scheduled_job = api.create_scheduled_job(
- image=image,
- command=command,
- schedule=schedule,
- suspend=suspend,
- concurrency=concurrency,
- env=env_map,
- secrets=secrets_map,
- labels=_parse_labels_map(label),
- volumes=parse_volumes(volume),
- flavor=flavor,
- timeout=timeout,
- namespace=namespace,
- )
- print(f"Scheduled Job created with ID: {scheduled_job.id}")
- @scheduled_app.command("ps", examples=["hf jobs scheduled ps"])
- def scheduled_ps(
- all: Annotated[
- bool,
- typer.Option(
- "-a",
- "--all",
- help="Show all scheduled Jobs (default hides suspended)",
- ),
- ] = False,
- namespace: NamespaceOpt = None,
- token: TokenOpt = None,
- filter: Annotated[
- list[str] | None,
- typer.Option(
- "-f",
- "--filter",
- help="Filter output based on conditions provided (format: key=value)",
- ),
- ] = None,
- format: Annotated[
- str | None,
- typer.Option(help="Output format: 'table' (default), 'json', or a Go template (e.g. '{{.id}}')"),
- ] = None,
- quiet: QuietOpt = False,
- ) -> None:
- """List scheduled Jobs"""
- api = get_hf_api(token=token)
- scheduled_jobs = api.list_scheduled_jobs(namespace=namespace)
- filters: list[tuple[str, str, str]] = []
- for f in filter or []:
- if "=" in f:
- key, value = f.split("=", 1)
- # Negate predicate in case of key!=value
- if key.endswith("!"):
- op = "!="
- key = key[:-1]
- else:
- op = "="
- filters.append((key.lower(), op, value.lower()))
- else:
- print(f"Warning: Ignoring invalid filter format '{f}'. Use key=value format.")
- # Filter scheduled jobs (operating on ScheduledJobInfo objects to preserve existing filter behavior)
- filtered_jobs = []
- for scheduled_job in scheduled_jobs:
- suspend = scheduled_job.suspend or False
- if not all and suspend:
- continue
- image_or_space = scheduled_job.job_spec.docker_image or "N/A"
- cmd = scheduled_job.job_spec.command or []
- command_str = " ".join(cmd) if cmd else "N/A"
- props = {"id": scheduled_job.id, "image": image_or_space, "suspend": str(suspend), "command": command_str}
- if not _matches_filters(props, filters):
- continue
- filtered_jobs.append(scheduled_job)
- if not filtered_jobs:
- if not quiet and format != "json":
- filters_msg = f" matching filters: {', '.join([f'{k}{o}{v}' for k, o, v in filters])}" if filters else ""
- print(f"No scheduled jobs found{filters_msg}")
- elif format == "json":
- print("[]")
- return
- headers = ["ID", "SCHEDULE", "IMAGE/SPACE", "COMMAND", "LAST RUN", "NEXT RUN", "SUSPEND"]
- aliases = ["id", "schedule", "image", "command", "last", "next", "suspend"]
- items = [api_object_to_dict(sj) for sj in filtered_jobs]
- def row_fn(item: dict[str, Any]) -> list[str]:
- job_spec = item.get("job_spec", {})
- status = item.get("status", {})
- last_job = status.get("last_job")
- cmd = job_spec.get("command") or []
- last_job_at = "N/A"
- if last_job and last_job.get("at"):
- last_job_at = last_job["at"][:19].replace("T", " ")
- next_run = "N/A"
- if status.get("next_job_run_at"):
- next_run = status["next_job_run_at"][:19].replace("T", " ")
- command_str = " ".join(cmd) if cmd else "N/A"
- return [
- str(item.get("id", "")),
- str(item.get("schedule") or "N/A"),
- _format_cell(job_spec.get("docker_image") or "N/A"),
- _format_cell(command_str),
- last_job_at,
- next_run,
- str(item.get("suspend", False)),
- ]
- # Custom template format (e.g. --format '{{.id}} {{.schedule}}')
- if format and format not in ("table", "json"):
- _print_output([row_fn(item) for item in items], headers, aliases, format) # type: ignore
- else:
- output_format = OutputFormat.json if format == "json" else OutputFormat.table
- print_list_output(
- items=items,
- format=output_format,
- quiet=quiet,
- id_key="id",
- headers=headers,
- row_fn=row_fn,
- )
- @scheduled_app.command("inspect", examples=["hf jobs scheduled inspect <id>"])
- def scheduled_inspect(
- scheduled_job_ids: Annotated[
- list[str],
- typer.Argument(
- help="Scheduled Job IDs to inspect (or 'namespace/scheduled_job_id')",
- ),
- ],
- namespace: NamespaceOpt = None,
- token: TokenOpt = None,
- ) -> None:
- """Display detailed information on one or more scheduled Jobs"""
- parsed_ids = []
- for job_id in scheduled_job_ids:
- job_id, namespace = _parse_namespace_from_job_id(job_id, namespace)
- parsed_ids.append(job_id)
- scheduled_job_ids = parsed_ids
- api = get_hf_api(token=token)
- scheduled_jobs = [
- api.inspect_scheduled_job(scheduled_job_id=scheduled_job_id, namespace=namespace)
- for scheduled_job_id in scheduled_job_ids
- ]
- print(json.dumps([asdict(scheduled_job) for scheduled_job in scheduled_jobs], indent=4, default=str))
- @scheduled_app.command("delete", examples=["hf jobs scheduled delete <id>"])
- def scheduled_delete(
- scheduled_job_id: ScheduledJobIdArg,
- namespace: NamespaceOpt = None,
- token: TokenOpt = None,
- ) -> None:
- """Delete a scheduled Job."""
- scheduled_job_id, namespace = _parse_namespace_from_job_id(scheduled_job_id, namespace)
- api = get_hf_api(token=token)
- api.delete_scheduled_job(scheduled_job_id=scheduled_job_id, namespace=namespace)
- @scheduled_app.command("suspend", examples=["hf jobs scheduled suspend <id>"])
- def scheduled_suspend(
- scheduled_job_id: ScheduledJobIdArg,
- namespace: NamespaceOpt = None,
- token: TokenOpt = None,
- ) -> None:
- """Suspend (pause) a scheduled Job."""
- scheduled_job_id, namespace = _parse_namespace_from_job_id(scheduled_job_id, namespace)
- api = get_hf_api(token=token)
- api.suspend_scheduled_job(scheduled_job_id=scheduled_job_id, namespace=namespace)
- @scheduled_app.command("resume", examples=["hf jobs scheduled resume <id>"])
- def scheduled_resume(
- scheduled_job_id: ScheduledJobIdArg,
- namespace: NamespaceOpt = None,
- token: TokenOpt = None,
- ) -> None:
- """Resume (unpause) a scheduled Job."""
- scheduled_job_id, namespace = _parse_namespace_from_job_id(scheduled_job_id, namespace)
- api = get_hf_api(token=token)
- api.resume_scheduled_job(scheduled_job_id=scheduled_job_id, namespace=namespace)
- scheduled_uv_app = typer_factory(help="Schedule UV scripts on HF infrastructure.")
- scheduled_app.add_typer(scheduled_uv_app, name="uv")
- @scheduled_uv_app.command(
- "run",
- context_settings={"ignore_unknown_options": True},
- examples=[
- 'hf jobs scheduled uv run "0 0 * * *" script.py',
- 'hf jobs scheduled uv run "0 0 * * *" script.py --with pandas',
- ],
- )
- def scheduled_uv_run(
- schedule: ScheduleArg,
- script: ScriptArg,
- script_args: ScriptArgsArg = None,
- suspend: SuspendOpt = None,
- concurrency: ConcurrencyOpt = None,
- image: ImageOpt = None,
- flavor: FlavorOpt = None,
- env: EnvOpt = None,
- secrets: SecretsOpt = None,
- label: LabelsOpt = None,
- volume: VolumesOpt = None,
- env_file: EnvFileOpt = None,
- secrets_file: SecretsFileOpt = None,
- timeout: TimeoutOpt = None,
- namespace: NamespaceOpt = None,
- token: TokenOpt = None,
- with_: WithOpt = None,
- python: PythonOpt = None,
- ) -> None:
- """Run a UV script (local file or URL) on HF infrastructure"""
- env_map = parse_env_map(env, env_file)
- secrets_map = parse_env_map(secrets, secrets_file)
- api = get_hf_api(token=token)
- job = api.create_scheduled_uv_job(
- script=script,
- script_args=script_args or [],
- schedule=schedule,
- suspend=suspend,
- concurrency=concurrency,
- dependencies=with_,
- python=python,
- image=image,
- env=env_map,
- secrets=secrets_map,
- labels=_parse_labels_map(label),
- volumes=parse_volumes(volume),
- flavor=flavor, # type: ignore[arg-type,misc]
- timeout=timeout,
- namespace=namespace,
- )
- print(f"Scheduled Job created with ID: {job.id}")
- ### UTILS
- def _parse_labels_map(labels: list[str] | None) -> dict[str, str] | None:
- """Parse label key-value pairs from CLI arguments.
- Args:
- labels: List of label strings in KEY=VALUE format. If KEY only, then VALUE is set to empty string.
- Returns:
- Dictionary mapping label keys to values, or None if no labels provided.
- """
- if not labels:
- return None
- labels_map: dict[str, str] = {}
- for label_var in labels:
- key, value = label_var.split("=", 1) if "=" in label_var else (label_var, "")
- labels_map[key] = value
- return labels_map
- def _tabulate(rows: list[list[str | int]], headers: list[str]) -> str:
- """
- Inspired by:
- - stackoverflow.com/a/8356620/593036
- - stackoverflow.com/questions/9535954/printing-lists-as-tabular-data
- """
- col_widths = [max(len(str(x)) for x in col) for col in zip(*rows, headers)]
- terminal_width = max(shutil.get_terminal_size().columns, len(headers) * 12)
- while len(headers) + sum(col_widths) > terminal_width:
- col_to_minimize = col_widths.index(max(col_widths))
- col_widths[col_to_minimize] //= 2
- if len(headers) + sum(col_widths) <= terminal_width:
- col_widths[col_to_minimize] = terminal_width - sum(col_widths) - len(headers) + col_widths[col_to_minimize]
- row_format = ("{{:{}}} " * len(headers)).format(*col_widths)
- lines = []
- lines.append(row_format.format(*headers))
- lines.append(row_format.format(*["-" * w for w in col_widths]))
- for row in rows:
- row_format_args = [
- str(x)[: col_width - 3] + "..." if len(str(x)) > col_width else str(x)
- for x, col_width in zip(row, col_widths)
- ]
- lines.append(row_format.format(*row_format_args))
- return "\n".join(lines)
- T = TypeVar("T")
- def _write_generator_to_queue(queue: Queue[T], func: Callable[..., Iterable[T]], kwargs: dict) -> None:
- for result in func(**kwargs):
- queue.put(result)
- def iflatmap_unordered(
- pool: multiprocessing.pool.ThreadPool,
- func: Callable[..., Iterable[T]],
- *,
- kwargs_list: list[dict],
- ) -> Iterable[T]:
- """
- Takes a function that returns an iterable of items, and run it in parallel using threads to return the flattened iterable of items as they arrive.
- This is inspired by those three `map()` variants, and is the mix of all three:
- * `imap()`: like `map()` but returns an iterable instead of a list of results
- * `imap_unordered()`: like `imap()` but the output is sorted by time of arrival
- * `flatmap()`: like `map()` but given a function which returns a list, `flatmap()` returns the flattened list that is the concatenation of all the output lists
- """
- queue: Queue[T] = Queue()
- async_results = [pool.apply_async(_write_generator_to_queue, (queue, func, kwargs)) for kwargs in kwargs_list]
- try:
- while True:
- try:
- yield queue.get(timeout=0.05)
- except Empty:
- if all(async_result.ready() for async_result in async_results) and queue.empty():
- break
- except KeyboardInterrupt:
- pass
- finally:
- # we get the result in case there's an error to raise
- try:
- [async_result.get(timeout=0.05) for async_result in async_results]
- except multiprocessing.TimeoutError:
- pass
|