jobs.py 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150
  1. # Copyright 2025 The HuggingFace Team. All rights reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Contains commands to interact with jobs on the Hugging Face Hub.
  15. Usage:
  16. # run a job
  17. hf jobs run <image> <command>
  18. # List running or completed jobs
  19. hf jobs ps [-a] [-f key=value] [--format table|json|TEMPLATE] [-q]
  20. # Print logs from a job (non-blocking)
  21. hf jobs logs <job-id>
  22. # Stream logs from a job (blocking, like `docker logs -f`)
  23. hf jobs logs -f <job-id>
  24. # Stream resources usage stats and metrics from a job
  25. hf jobs stats <job-id>
  26. # Inspect detailed information about a job
  27. hf jobs inspect <job-id>
  28. # Cancel a running job
  29. hf jobs cancel <job-id>
  30. # List available hardware options
  31. hf jobs hardware
  32. # Run a UV script
  33. hf jobs uv run <script>
  34. # Schedule a job
  35. hf jobs scheduled run <schedule> <image> <command>
  36. # List scheduled jobs
  37. hf jobs scheduled ps [-a] [-f key=value] [--format table|json] [-q]
  38. # Inspect a scheduled job
  39. hf jobs scheduled inspect <scheduled_job_id>
  40. # Suspend a scheduled job
  41. hf jobs scheduled suspend <scheduled_job_id>
  42. # Resume a scheduled job
  43. hf jobs scheduled resume <scheduled_job_id>
  44. # Delete a scheduled job
  45. hf jobs scheduled delete <scheduled_job_id>
  46. """
  47. import json
  48. import multiprocessing
  49. import multiprocessing.pool
  50. import shutil
  51. import time
  52. from collections import deque
  53. from collections.abc import Callable, Iterable
  54. from dataclasses import asdict
  55. from fnmatch import fnmatch
  56. from queue import Empty, Queue
  57. from typing import Annotated, Any, TypeVar
  58. import typer
  59. from huggingface_hub import SpaceHardware
  60. from huggingface_hub.errors import CLIError, HfHubHTTPError
  61. from huggingface_hub.utils import logging
  62. from huggingface_hub.utils._cache_manager import _format_size
  63. from ._cli_utils import (
  64. EnvFileOpt,
  65. EnvOpt,
  66. OutputFormat,
  67. QuietOpt,
  68. SecretsFileOpt,
  69. SecretsOpt,
  70. TokenOpt,
  71. VolumesOpt,
  72. _format_cell,
  73. api_object_to_dict,
  74. get_hf_api,
  75. parse_env_map,
  76. parse_volumes,
  77. print_list_output,
  78. typer_factory,
  79. )
# Module-level logger obtained from huggingface_hub's `logging` utilities, named after this module.
logger = logging.get_logger(__name__)
  81. def _parse_namespace_from_job_id(job_id: str, namespace: str | None) -> tuple[str, str | None]:
  82. """Extract namespace from job_id if provided in 'namespace/job_id' format.
  83. Allows users to pass job IDs copied from the Hub UI (e.g. 'username/job_id')
  84. instead of only bare job IDs. If the namespace is also provided explicitly via
  85. --namespace and conflicts, a CLIError is raised.
  86. """
  87. if not job_id:
  88. raise CLIError("Job ID cannot be empty.")
  89. if job_id.count("/") > 1:
  90. raise CLIError(f"Job ID must be in the form 'job_id' or 'namespace/job_id': '{job_id}'.")
  91. if "/" not in job_id:
  92. return job_id, namespace
  93. extracted_namespace, parsed_job_id = job_id.split("/", 1)
  94. if not extracted_namespace or not parsed_job_id:
  95. raise CLIError(f"Job ID must be in the form 'job_id' or 'namespace/job_id': '{job_id}'.")
  96. if namespace is not None and namespace != extracted_namespace:
  97. raise CLIError(
  98. f"Conflicting namespace: got --namespace='{namespace}' but job ID implies namespace='{extracted_namespace}'"
  99. )
  100. return parsed_job_id, extracted_namespace
# Values of every SpaceHardware member except "zero-a10g".
# NOTE(review): not referenced in the visible part of this file; presumably used elsewhere
# (e.g. for help text or suggestions) — confirm before changing.
SUGGESTED_FLAVORS = [item.value for item in SpaceHardware if item.value != "zero-a10g"]
# Minimum delay, in seconds, between two redraws of the `hf jobs stats` table.
STATS_UPDATE_MIN_INTERVAL = 0.1  # we set a limit here since there is one update per second per job
# Common job-related options
# Each alias below bundles a Python type with its typer Argument/Option metadata so the command
# signatures in this module can share a single declaration.
# Positional Docker image (used by `hf jobs run` and `hf jobs scheduled run`).
ImageArg = Annotated[
    str,
    typer.Argument(
        help="The Docker image to use.",
    ),
]
# Optional custom image for `hf jobs uv run`.
ImageOpt = Annotated[
    str | None,
    typer.Option(
        help="Use a custom Docker image with `uv` installed.",
    ),
]
# Hardware flavor, typed as a SpaceHardware member; None is forwarded to the API calls as-is
# (the help text states the effective default is `cpu-basic`).
FlavorOpt = Annotated[
    SpaceHardware | None,
    typer.Option(
        help="Flavor for the hardware, as in HF Spaces. Run 'hf jobs hardware' to list available flavors. Defaults to `cpu-basic`.",
    ),
]
# Repeatable `-l/--label` option; the raw strings are converted by `_parse_labels_map`.
LabelsOpt = Annotated[
    list[str] | None,
    typer.Option(
        "-l",
        "--label",
        help="Set labels. E.g. --label KEY=VALUE or --label LABEL",
    ),
]
# Timeout string, forwarded unchanged to the API calls in this module.
TimeoutOpt = Annotated[
    str | None,
    typer.Option(
        help="Max duration: int/float with s (seconds, default), m (minutes), h (hours) or d (days).",
    ),
]
# `-d/--detach`: print the Job ID/URL and return instead of streaming logs.
DetachOpt = Annotated[
    bool,
    typer.Option(
        "-d",
        "--detach",
        help="Run the Job in the background and print the Job ID.",
    ),
]
# Target namespace; None is forwarded to the API calls as-is.
NamespaceOpt = Annotated[
    str | None,
    typer.Option(
        help="The namespace where the job will be running. Defaults to the current user's namespace.",
    ),
]
# Repeatable `--with` packages for `hf jobs uv run` (forwarded as `dependencies`).
WithOpt = Annotated[
    list[str] | None,
    typer.Option(
        "--with",
        help="Run with the given packages installed",
    ),
]
# `-p/--python` interpreter for `hf jobs uv run`.
PythonOpt = Annotated[
    str | None,
    typer.Option(
        "-p",
        "--python",
        help="The Python interpreter to use for the run environment",
    ),
]
# Optional flag (None when not given) forwarded to `create_scheduled_job(suspend=...)`.
SuspendOpt = Annotated[
    bool | None,
    typer.Option(
        help="Suspend (pause) the scheduled Job",
    ),
]
# Optional flag (None when not given) forwarded to `create_scheduled_job(concurrency=...)`.
ConcurrencyOpt = Annotated[
    bool | None,
    typer.Option(
        help="Allow multiple instances of this Job to run concurrently",
    ),
]
# Schedule keyword or CRON expression for `hf jobs scheduled run`.
ScheduleArg = Annotated[
    str,
    typer.Argument(
        help="One of annually, yearly, monthly, weekly, daily, hourly, or a CRON schedule expression.",
    ),
]
# Script path or URL for `hf jobs uv run`.
ScriptArg = Annotated[
    str,
    typer.Argument(
        help="UV script to run (local file or URL)",
    ),
]
# Trailing arguments passed through to the UV script.
ScriptArgsArg = Annotated[
    list[str] | None,
    typer.Argument(
        help="Arguments for the script",
    ),
]
# Command (and its arguments) to execute inside the container.
CommandArg = Annotated[
    list[str],
    typer.Argument(
        help="The command to run.",
    ),
]
# Single job ID; may carry a 'namespace/' prefix, split off by `_parse_namespace_from_job_id`.
JobIdArg = Annotated[
    str,
    typer.Argument(
        help="Job ID (or 'namespace/job_id')",
    ),
]
# Zero or more job IDs (`hf jobs stats` falls back to running jobs when this is None).
JobIdsArg = Annotated[
    list[str] | None,
    typer.Argument(
        help="Job IDs (or 'namespace/job_id')",
    ),
]
# Scheduled job ID. NOTE(review): not referenced in the visible part of this file.
ScheduledJobIdArg = Annotated[
    str,
    typer.Argument(
        help="Scheduled Job ID (or 'namespace/scheduled_job_id')",
    ),
]
# Root Typer app for the `hf jobs` command group; the commands below register on it.
jobs_cli = typer_factory(help="Run and manage Jobs on the Hub.")
  220. @jobs_cli.command(
  221. "run",
  222. context_settings={"ignore_unknown_options": True},
  223. examples=[
  224. "hf jobs run python:3.12 python -c 'print(\"Hello!\")'",
  225. "hf jobs run -e FOO=foo python:3.12 python script.py",
  226. "hf jobs run --secrets HF_TOKEN python:3.12 python script.py",
  227. "hf jobs run -v hf://gpt2:/data -v hf://buckets/org/b:/mnt python:3.12 python script.py",
  228. ],
  229. )
  230. def jobs_run(
  231. image: ImageArg,
  232. command: CommandArg,
  233. env: EnvOpt = None,
  234. secrets: SecretsOpt = None,
  235. label: LabelsOpt = None,
  236. volume: VolumesOpt = None,
  237. env_file: EnvFileOpt = None,
  238. secrets_file: SecretsFileOpt = None,
  239. flavor: FlavorOpt = None,
  240. timeout: TimeoutOpt = None,
  241. detach: DetachOpt = False,
  242. namespace: NamespaceOpt = None,
  243. token: TokenOpt = None,
  244. ) -> None:
  245. """Run a Job."""
  246. env_map = parse_env_map(env, env_file)
  247. secrets_map = parse_env_map(secrets, secrets_file)
  248. api = get_hf_api(token=token)
  249. job = api.run_job(
  250. image=image,
  251. command=command,
  252. env=env_map,
  253. secrets=secrets_map,
  254. labels=_parse_labels_map(label),
  255. volumes=parse_volumes(volume),
  256. flavor=flavor,
  257. timeout=timeout,
  258. namespace=namespace,
  259. )
  260. # Always print the job ID to the user
  261. print(f"Job started with ID: {job.id}")
  262. print(f"View at: {job.url}")
  263. if detach:
  264. return
  265. # Now let's stream the logs
  266. for log in api.fetch_job_logs(job_id=job.id, namespace=job.owner.name, follow=True):
  267. print(log)
  268. @jobs_cli.command(
  269. "logs", examples=["hf jobs logs <job_id>", "hf jobs logs -f <job_id>", "hf jobs logs --tail 20 <job_id>"]
  270. )
  271. def jobs_logs(
  272. job_id: JobIdArg,
  273. follow: Annotated[
  274. bool,
  275. typer.Option(
  276. "-f",
  277. "--follow",
  278. help="Follow log output (stream until the job completes). Without this flag, only currently available logs are printed.",
  279. ),
  280. ] = False,
  281. tail: Annotated[
  282. int | None,
  283. typer.Option(
  284. "-n",
  285. "--tail",
  286. help="Number of lines to show from the end of the logs.",
  287. ),
  288. ] = None,
  289. namespace: NamespaceOpt = None,
  290. token: TokenOpt = None,
  291. ) -> None:
  292. """Fetch the logs of a Job.
  293. By default, prints currently available logs and exits (non-blocking).
  294. Use --follow/-f to stream logs in real-time until the job completes.
  295. """
  296. job_id, namespace = _parse_namespace_from_job_id(job_id, namespace)
  297. if follow and tail is not None:
  298. raise CLIError(
  299. "Cannot use --follow and --tail together. Use --follow to stream logs or --tail to show recent logs."
  300. )
  301. api = get_hf_api(token=token)
  302. try:
  303. logs = api.fetch_job_logs(job_id=job_id, namespace=namespace, follow=follow)
  304. if tail is not None:
  305. logs = deque(logs, maxlen=tail)
  306. for log in logs:
  307. print(log)
  308. except HfHubHTTPError as e:
  309. status = e.response.status_code if e.response is not None else None
  310. if status == 404:
  311. raise CLIError("Job not found. Please check the job ID.") from e
  312. elif status == 403:
  313. raise CLIError("Access denied. You may not have permission to view this job.") from e
  314. else:
  315. raise CLIError(f"Failed to fetch job logs: {e}") from e
  316. def _matches_filters(job_properties: dict[str, str], filters: list[tuple[str, str, str]]) -> bool:
  317. """Check if scheduled job matches all specified filters."""
  318. for key, op_str, pattern in filters:
  319. value = job_properties.get(key)
  320. if value is None:
  321. if op_str == "!=":
  322. continue
  323. return False
  324. match = fnmatch(value.lower(), pattern.lower())
  325. if (op_str == "=" and not match) or (op_str == "!=" and match):
  326. return False
  327. return True
  328. def _print_output(rows: list[list[str | int]], headers: list[str], aliases: list[str], fmt: str | None) -> None:
  329. """Print output according to the chosen format."""
  330. if fmt:
  331. # Use custom template if provided
  332. template = fmt
  333. for row in rows:
  334. line = template
  335. for i, field in enumerate(aliases):
  336. placeholder = f"{{{{.{field}}}}}"
  337. if placeholder in line:
  338. line = line.replace(placeholder, str(row[i]))
  339. print(line)
  340. else:
  341. # Default tabular format
  342. print(_tabulate(rows, headers=headers))
  343. def _clear_line(n: int) -> None:
  344. LINE_UP = "\033[1A"
  345. LINE_CLEAR = "\x1b[2K"
  346. for i in range(n):
  347. print(LINE_UP, end=LINE_CLEAR)
  348. def _get_jobs_stats_rows(
  349. job_id: str, metrics_stream: Iterable[dict[str, Any]], table_headers: list[str]
  350. ) -> Iterable[tuple[bool, str, list[list[str | int]]]]:
  351. for metrics in metrics_stream:
  352. row = [
  353. job_id,
  354. f"{metrics['cpu_usage_pct']}%",
  355. round(metrics["cpu_millicores"] / 1000.0, 1),
  356. f"{round(100 * metrics['memory_used_bytes'] / metrics['memory_total_bytes'], 2)}%",
  357. f"{_format_size(metrics['memory_used_bytes'])}B / {_format_size(metrics['memory_total_bytes'])}B",
  358. f"{_format_size(metrics['rx_bps'])}bps / {_format_size(metrics['tx_bps'])}bps",
  359. ]
  360. if metrics["gpus"] and isinstance(metrics["gpus"], dict):
  361. rows = [row] + [[""] * len(row)] * (len(metrics["gpus"]) - 1)
  362. for row, gpu_id in zip(rows, sorted(metrics["gpus"])):
  363. gpu = metrics["gpus"][gpu_id]
  364. row += [
  365. f"{gpu['utilization']}%",
  366. f"{round(100 * gpu['memory_used_bytes'] / gpu['memory_total_bytes'], 2)}%",
  367. f"{_format_size(gpu['memory_used_bytes'])}B / {_format_size(gpu['memory_total_bytes'])}B",
  368. ]
  369. else:
  370. row += ["N/A"] * (len(table_headers) - len(row))
  371. rows = [row]
  372. yield False, job_id, rows
  373. yield True, job_id, []
@jobs_cli.command("stats", examples=["hf jobs stats <job_id>"])
def jobs_stats(
    job_ids: JobIdsArg = None,
    namespace: NamespaceOpt = None,
    token: TokenOpt = None,
) -> None:
    """Fetch the resource usage statistics and metrics of Jobs"""
    # Strip any 'namespace/' prefix from the given IDs. The namespace found on one ID is fed
    # into the next call, so every prefixed ID must agree (a conflict raises CLIError).
    if job_ids is not None:
        parsed_ids = []
        for job_id in job_ids:
            job_id, namespace = _parse_namespace_from_job_id(job_id, namespace)
            parsed_ids.append(job_id)
        job_ids = parsed_ids
    api = get_hf_api(token=token)
    # Fall back to the authenticated user's namespace.
    if namespace is None:
        namespace = api.whoami()["name"]
    # No IDs given: monitor every job in the namespace whose stage is RUNNING or UPDATING.
    if job_ids is None:
        job_ids = [
            job.id
            for job in api.list_jobs(namespace=namespace)
            if (job.status.stage if job.status else "UNKNOWN") in ("RUNNING", "UPDATING")
        ]
    if len(job_ids) == 0:
        print("No running jobs found")
        return
    table_headers = [
        "JOB ID",
        "CPU %",
        "NUM CPU",
        "MEM %",
        "MEM USAGE",
        "NET I/O",
        "GPU UTIL %",
        "GPU MEM %",
        "GPU MEM USAGE",
    ]
    # Template aliases, position-aligned with `table_headers` (passed to `_print_output`).
    headers_aliases = [
        "id",
        "cpu_usage_pct",
        "cpu_millicores",
        "memory_used_bytes_pct",
        "memory_used_bytes_and_total_bytes",
        "rx_bps_and_tx_bps",
        "gpu_utilization",
        "gpu_memory_used_bytes_pct",
        "gpu_memory_used_bytes_and_total_bytes",
    ]
    try:
        # One worker thread per job so that every job's metrics stream can be consumed concurrently.
        with multiprocessing.pool.ThreadPool(len(job_ids)) as pool:
            # Placeholder rows ("--" / "-- / --") shown until a job's first metrics snapshot arrives.
            rows_per_job_id: dict[str, list[list[str | int]]] = {}
            for job_id in job_ids:
                row: list[str | int] = [job_id]
                row += ["-- / --" if ("/" in header or "USAGE" in header) else "--" for header in table_headers[1:]]
                rows_per_job_id[job_id] = [row]
            last_update_time = time.time()
            total_rows = [row for job_id in rows_per_job_id for row in rows_per_job_id[job_id]]
            _print_output(total_rows, table_headers, headers_aliases, None)
            # `fetch_job_metrics` is called here for each job; the returned iterables are consumed
            # by `_get_jobs_stats_rows`.
            kwargs_list = [
                {
                    "job_id": job_id,
                    "metrics_stream": api.fetch_job_metrics(job_id=job_id, namespace=namespace),
                    "table_headers": table_headers,
                }
                for job_id in job_ids
            ]
            # NOTE(review): `iflatmap_unordered` is defined elsewhere; presumably it runs
            # `_get_jobs_stats_rows(**kwargs)` for each kwargs dict on `pool` and yields items as
            # they are produced, in no fixed order — confirm.
            for done, job_id, rows in iflatmap_unordered(pool, _get_jobs_stats_rows, kwargs_list=kwargs_list):
                if done:
                    # This job's metrics stream ended: drop it from the table.
                    rows_per_job_id.pop(job_id, None)
                else:
                    rows_per_job_id[job_id] = rows
                now = time.time()
                # Throttle redraws to at most one every STATS_UPDATE_MIN_INTERVAL seconds.
                if now - last_update_time >= STATS_UPDATE_MIN_INTERVAL:
                    # Erase the previously printed table (still sized by the old `total_rows`).
                    # NOTE(review): the `2 +` assumes `_tabulate` prints two lines above the data
                    # rows (header + separator) — confirm.
                    _clear_line(2 + len(total_rows))
                    total_rows = [row for job_id in rows_per_job_id for row in rows_per_job_id[job_id]]
                    _print_output(total_rows, table_headers, headers_aliases, None)
                    last_update_time = now
    except HfHubHTTPError as e:
        # Translate the most common HTTP failures into actionable CLI messages.
        status = e.response.status_code if e.response is not None else None
        if status == 404:
            raise CLIError("Job not found. Please check the job ID.") from e
        elif status == 403:
            raise CLIError("Access denied. You may not have permission to view this job.") from e
        else:
            raise CLIError(f"Failed to fetch job stats: {e}") from e
  458. @jobs_cli.command("ps", examples=["hf jobs ps", "hf jobs ps -a"])
  459. def jobs_ps(
  460. all: Annotated[
  461. bool,
  462. typer.Option(
  463. "-a",
  464. "--all",
  465. help="Show all Jobs (default shows just running)",
  466. ),
  467. ] = False,
  468. namespace: NamespaceOpt = None,
  469. token: TokenOpt = None,
  470. filter: Annotated[
  471. list[str] | None,
  472. typer.Option(
  473. "-f",
  474. "--filter",
  475. help="Filter output based on conditions provided (format: key=value)",
  476. ),
  477. ] = None,
  478. format: Annotated[
  479. str | None,
  480. typer.Option(help="Output format: 'table' (default), 'json', or a Go template (e.g. '{{.id}}')"),
  481. ] = None,
  482. quiet: QuietOpt = False,
  483. ) -> None:
  484. """List Jobs."""
  485. api = get_hf_api(token=token)
  486. # Fetch jobs data
  487. jobs = api.list_jobs(namespace=namespace)
  488. filters: list[tuple[str, str, str]] = []
  489. labels_filters: list[tuple[str, str, str]] = []
  490. for f in filter or []:
  491. if f.startswith("label!=") or f.startswith("label="):
  492. if f.startswith("label!="):
  493. label_part = f[len("label!=") :]
  494. if "=" in label_part:
  495. print(
  496. f"Warning: Ignoring invalid label filter format 'label!={label_part}'. Use label!=key format."
  497. )
  498. continue
  499. label_key, op, label_value = label_part, "!=", "*"
  500. else:
  501. label_part = f[len("label=") :]
  502. if "=" in label_part:
  503. label_key, label_value = label_part.split("=", 1)
  504. else:
  505. label_key, label_value = label_part, "*"
  506. # Negate predicate in case of key!=value
  507. if label_key.endswith("!"):
  508. op = "!="
  509. label_key = label_key[:-1]
  510. else:
  511. op = "="
  512. labels_filters.append((label_key.lower(), op, label_value.lower()))
  513. elif "=" in f:
  514. key, value = f.split("=", 1)
  515. # Negate predicate in case of key!=value
  516. if key.endswith("!"):
  517. op = "!="
  518. key = key[:-1]
  519. else:
  520. op = "="
  521. filters.append((key.lower(), op, value.lower()))
  522. else:
  523. print(f"Warning: Ignoring invalid filter format '{f}'. Use key=value format.")
  524. # Filter jobs (operating on JobInfo objects to preserve existing filter behavior)
  525. filtered_jobs = []
  526. for job in jobs:
  527. status = job.status.stage if job.status else "UNKNOWN"
  528. if not all and status not in ("RUNNING", "UPDATING"):
  529. continue
  530. image_or_space = job.docker_image or "N/A"
  531. cmd = job.command or []
  532. command_str = " ".join(cmd) if cmd else "N/A"
  533. props = {"id": job.id, "image": image_or_space, "status": status.lower(), "command": command_str}
  534. if not _matches_filters(props, filters):
  535. continue
  536. if not _matches_filters(job.labels or {}, labels_filters):
  537. continue
  538. filtered_jobs.append(job)
  539. if not filtered_jobs:
  540. if not quiet and format != "json":
  541. filters_msg = f" matching filters: {', '.join([f'{k}{o}{v}' for k, o, v in filters])}" if filters else ""
  542. print(f"No jobs found{filters_msg}")
  543. elif format == "json":
  544. print("[]")
  545. return
  546. headers = ["JOB ID", "IMAGE/SPACE", "COMMAND", "CREATED", "STATUS"]
  547. aliases = ["id", "image", "command", "created", "status"]
  548. items = [api_object_to_dict(job) for job in filtered_jobs]
  549. def row_fn(item: dict[str, Any]) -> list[str]:
  550. status = item.get("status", {})
  551. cmd = item.get("command") or []
  552. command_str = " ".join(cmd) if cmd else "N/A"
  553. return [
  554. str(item.get("id", "")),
  555. _format_cell(item.get("docker_image") or "N/A"),
  556. _format_cell(command_str),
  557. item["created_at"][:19].replace("T", " ") if item.get("created_at") else "N/A",
  558. str(status.get("stage", "UNKNOWN")),
  559. ]
  560. # Custom template format
  561. if format and format not in ("table", "json"):
  562. _print_output([row_fn(item) for item in items], headers, aliases, format) # type: ignore
  563. else:
  564. output_format = OutputFormat.json if format == "json" else OutputFormat.table
  565. print_list_output(
  566. items=items,
  567. format=output_format,
  568. quiet=quiet,
  569. id_key="id",
  570. headers=headers,
  571. row_fn=row_fn,
  572. )
  573. @jobs_cli.command("hardware", examples=["hf jobs hardware"])
  574. def jobs_hardware() -> None:
  575. """List available hardware options for Jobs"""
  576. api = get_hf_api()
  577. hardware_list = api.list_jobs_hardware()
  578. table_headers = ["NAME", "PRETTY NAME", "CPU", "RAM", "ACCELERATOR", "COST/MIN", "COST/HOUR"]
  579. headers_aliases = ["name", "prettyName", "cpu", "ram", "accelerator", "costMin", "costHour"]
  580. rows: list[list[str | int]] = []
  581. for hw in hardware_list:
  582. accelerator_info = "N/A"
  583. if hw.accelerator:
  584. accelerator_info = f"{hw.accelerator.quantity}x {hw.accelerator.model} ({hw.accelerator.vram})"
  585. cost_min = f"${hw.unit_cost_usd:.4f}" if hw.unit_cost_usd is not None else "N/A"
  586. cost_hour = f"${hw.unit_cost_usd * 60:.2f}" if hw.unit_cost_usd is not None else "N/A"
  587. rows.append([hw.name, hw.pretty_name or "N/A", hw.cpu, hw.ram, accelerator_info, cost_min, cost_hour])
  588. if not rows:
  589. print("No hardware options found")
  590. return
  591. _print_output(rows, table_headers, headers_aliases, None)
  592. @jobs_cli.command("inspect", examples=["hf jobs inspect <job_id>"])
  593. def jobs_inspect(
  594. job_ids: Annotated[
  595. list[str],
  596. typer.Argument(
  597. help="Job IDs to inspect (or 'namespace/job_id')",
  598. ),
  599. ],
  600. namespace: NamespaceOpt = None,
  601. token: TokenOpt = None,
  602. ) -> None:
  603. """Display detailed information on one or more Jobs"""
  604. parsed_ids = []
  605. for job_id in job_ids:
  606. job_id, namespace = _parse_namespace_from_job_id(job_id, namespace)
  607. parsed_ids.append(job_id)
  608. job_ids = parsed_ids
  609. api = get_hf_api(token=token)
  610. try:
  611. jobs = [api.inspect_job(job_id=job_id, namespace=namespace) for job_id in job_ids]
  612. print(json.dumps([asdict(job) for job in jobs], indent=4, default=str))
  613. except HfHubHTTPError as e:
  614. status = e.response.status_code if e.response is not None else None
  615. if status == 404:
  616. raise CLIError("Job not found. Please check the job ID.") from e
  617. elif status == 403:
  618. raise CLIError("Access denied. You may not have permission to view this job.") from e
  619. else:
  620. raise CLIError(f"Failed to inspect job: {e}") from e
  621. @jobs_cli.command("cancel", examples=["hf jobs cancel <job_id>"])
  622. def jobs_cancel(
  623. job_id: JobIdArg,
  624. namespace: NamespaceOpt = None,
  625. token: TokenOpt = None,
  626. ) -> None:
  627. """Cancel a Job"""
  628. job_id, namespace = _parse_namespace_from_job_id(job_id, namespace)
  629. api = get_hf_api(token=token)
  630. try:
  631. api.cancel_job(job_id=job_id, namespace=namespace)
  632. except HfHubHTTPError as e:
  633. status = e.response.status_code if e.response is not None else None
  634. if status == 404:
  635. raise CLIError("Job not found. Please check the job ID.") from e
  636. elif status == 403:
  637. raise CLIError("Access denied. You may not have permission to cancel this job.") from e
  638. else:
  639. raise CLIError(f"Failed to cancel job: {e}") from e
# Sub-app holding the `hf jobs uv ...` commands, mounted on `jobs_cli` under the name "uv".
uv_app = typer_factory(help="Run UV scripts (Python with inline dependencies) on HF infrastructure.")
jobs_cli.add_typer(uv_app, name="uv")
  642. @uv_app.command(
  643. "run",
  644. context_settings={"ignore_unknown_options": True},
  645. examples=[
  646. "hf jobs uv run my_script.py",
  647. "hf jobs uv run ml_training.py --flavor a10g-small",
  648. "hf jobs uv run --with transformers train.py",
  649. "hf jobs uv run -v hf://gpt2:/data -v hf://buckets/org/b:/mnt script.py",
  650. ],
  651. )
  652. def jobs_uv_run(
  653. script: ScriptArg,
  654. script_args: ScriptArgsArg = None,
  655. image: ImageOpt = None,
  656. flavor: FlavorOpt = None,
  657. env: EnvOpt = None,
  658. secrets: SecretsOpt = None,
  659. label: LabelsOpt = None,
  660. volume: VolumesOpt = None,
  661. env_file: EnvFileOpt = None,
  662. secrets_file: SecretsFileOpt = None,
  663. timeout: TimeoutOpt = None,
  664. detach: DetachOpt = False,
  665. namespace: NamespaceOpt = None,
  666. token: TokenOpt = None,
  667. with_: WithOpt = None,
  668. python: PythonOpt = None,
  669. ) -> None:
  670. """Run a UV script (local file or URL) on HF infrastructure"""
  671. env_map = parse_env_map(env, env_file)
  672. secrets_map = parse_env_map(secrets, secrets_file)
  673. api = get_hf_api(token=token)
  674. job = api.run_uv_job(
  675. script=script,
  676. script_args=script_args or [],
  677. dependencies=with_,
  678. python=python,
  679. image=image,
  680. env=env_map,
  681. secrets=secrets_map,
  682. labels=_parse_labels_map(label),
  683. volumes=parse_volumes(volume),
  684. flavor=flavor, # type: ignore[arg-type,misc]
  685. timeout=timeout,
  686. namespace=namespace,
  687. )
  688. # Always print the job ID to the user
  689. print(f"Job started with ID: {job.id}")
  690. print(f"View at: {job.url}")
  691. if detach:
  692. return
  693. # Now let's stream the logs
  694. for log in api.fetch_job_logs(job_id=job.id, namespace=job.owner.name, follow=True):
  695. print(log)
# Sub-app holding the `hf jobs scheduled ...` commands, mounted on `jobs_cli` under the name "scheduled".
scheduled_app = typer_factory(help="Create and manage scheduled Jobs on the Hub.")
jobs_cli.add_typer(scheduled_app, name="scheduled")
  698. @scheduled_app.command(
  699. "run",
  700. context_settings={"ignore_unknown_options": True},
  701. examples=['hf jobs scheduled run "0 0 * * *" python:3.12 python script.py'],
  702. )
  703. def scheduled_run(
  704. schedule: ScheduleArg,
  705. image: ImageArg,
  706. command: CommandArg,
  707. suspend: SuspendOpt = None,
  708. concurrency: ConcurrencyOpt = None,
  709. env: EnvOpt = None,
  710. secrets: SecretsOpt = None,
  711. label: LabelsOpt = None,
  712. volume: VolumesOpt = None,
  713. env_file: EnvFileOpt = None,
  714. secrets_file: SecretsFileOpt = None,
  715. flavor: FlavorOpt = None,
  716. timeout: TimeoutOpt = None,
  717. namespace: NamespaceOpt = None,
  718. token: TokenOpt = None,
  719. ) -> None:
  720. """Schedule a Job."""
  721. env_map = parse_env_map(env, env_file)
  722. secrets_map = parse_env_map(secrets, secrets_file)
  723. api = get_hf_api(token=token)
  724. scheduled_job = api.create_scheduled_job(
  725. image=image,
  726. command=command,
  727. schedule=schedule,
  728. suspend=suspend,
  729. concurrency=concurrency,
  730. env=env_map,
  731. secrets=secrets_map,
  732. labels=_parse_labels_map(label),
  733. volumes=parse_volumes(volume),
  734. flavor=flavor,
  735. timeout=timeout,
  736. namespace=namespace,
  737. )
  738. print(f"Scheduled Job created with ID: {scheduled_job.id}")
  739. @scheduled_app.command("ps", examples=["hf jobs scheduled ps"])
  740. def scheduled_ps(
  741. all: Annotated[
  742. bool,
  743. typer.Option(
  744. "-a",
  745. "--all",
  746. help="Show all scheduled Jobs (default hides suspended)",
  747. ),
  748. ] = False,
  749. namespace: NamespaceOpt = None,
  750. token: TokenOpt = None,
  751. filter: Annotated[
  752. list[str] | None,
  753. typer.Option(
  754. "-f",
  755. "--filter",
  756. help="Filter output based on conditions provided (format: key=value)",
  757. ),
  758. ] = None,
  759. format: Annotated[
  760. str | None,
  761. typer.Option(help="Output format: 'table' (default), 'json', or a Go template (e.g. '{{.id}}')"),
  762. ] = None,
  763. quiet: QuietOpt = False,
  764. ) -> None:
  765. """List scheduled Jobs"""
  766. api = get_hf_api(token=token)
  767. scheduled_jobs = api.list_scheduled_jobs(namespace=namespace)
  768. filters: list[tuple[str, str, str]] = []
  769. for f in filter or []:
  770. if "=" in f:
  771. key, value = f.split("=", 1)
  772. # Negate predicate in case of key!=value
  773. if key.endswith("!"):
  774. op = "!="
  775. key = key[:-1]
  776. else:
  777. op = "="
  778. filters.append((key.lower(), op, value.lower()))
  779. else:
  780. print(f"Warning: Ignoring invalid filter format '{f}'. Use key=value format.")
  781. # Filter scheduled jobs (operating on ScheduledJobInfo objects to preserve existing filter behavior)
  782. filtered_jobs = []
  783. for scheduled_job in scheduled_jobs:
  784. suspend = scheduled_job.suspend or False
  785. if not all and suspend:
  786. continue
  787. image_or_space = scheduled_job.job_spec.docker_image or "N/A"
  788. cmd = scheduled_job.job_spec.command or []
  789. command_str = " ".join(cmd) if cmd else "N/A"
  790. props = {"id": scheduled_job.id, "image": image_or_space, "suspend": str(suspend), "command": command_str}
  791. if not _matches_filters(props, filters):
  792. continue
  793. filtered_jobs.append(scheduled_job)
  794. if not filtered_jobs:
  795. if not quiet and format != "json":
  796. filters_msg = f" matching filters: {', '.join([f'{k}{o}{v}' for k, o, v in filters])}" if filters else ""
  797. print(f"No scheduled jobs found{filters_msg}")
  798. elif format == "json":
  799. print("[]")
  800. return
  801. headers = ["ID", "SCHEDULE", "IMAGE/SPACE", "COMMAND", "LAST RUN", "NEXT RUN", "SUSPEND"]
  802. aliases = ["id", "schedule", "image", "command", "last", "next", "suspend"]
  803. items = [api_object_to_dict(sj) for sj in filtered_jobs]
  804. def row_fn(item: dict[str, Any]) -> list[str]:
  805. job_spec = item.get("job_spec", {})
  806. status = item.get("status", {})
  807. last_job = status.get("last_job")
  808. cmd = job_spec.get("command") or []
  809. last_job_at = "N/A"
  810. if last_job and last_job.get("at"):
  811. last_job_at = last_job["at"][:19].replace("T", " ")
  812. next_run = "N/A"
  813. if status.get("next_job_run_at"):
  814. next_run = status["next_job_run_at"][:19].replace("T", " ")
  815. command_str = " ".join(cmd) if cmd else "N/A"
  816. return [
  817. str(item.get("id", "")),
  818. str(item.get("schedule") or "N/A"),
  819. _format_cell(job_spec.get("docker_image") or "N/A"),
  820. _format_cell(command_str),
  821. last_job_at,
  822. next_run,
  823. str(item.get("suspend", False)),
  824. ]
  825. # Custom template format (e.g. --format '{{.id}} {{.schedule}}')
  826. if format and format not in ("table", "json"):
  827. _print_output([row_fn(item) for item in items], headers, aliases, format) # type: ignore
  828. else:
  829. output_format = OutputFormat.json if format == "json" else OutputFormat.table
  830. print_list_output(
  831. items=items,
  832. format=output_format,
  833. quiet=quiet,
  834. id_key="id",
  835. headers=headers,
  836. row_fn=row_fn,
  837. )
  838. @scheduled_app.command("inspect", examples=["hf jobs scheduled inspect <id>"])
  839. def scheduled_inspect(
  840. scheduled_job_ids: Annotated[
  841. list[str],
  842. typer.Argument(
  843. help="Scheduled Job IDs to inspect (or 'namespace/scheduled_job_id')",
  844. ),
  845. ],
  846. namespace: NamespaceOpt = None,
  847. token: TokenOpt = None,
  848. ) -> None:
  849. """Display detailed information on one or more scheduled Jobs"""
  850. parsed_ids = []
  851. for job_id in scheduled_job_ids:
  852. job_id, namespace = _parse_namespace_from_job_id(job_id, namespace)
  853. parsed_ids.append(job_id)
  854. scheduled_job_ids = parsed_ids
  855. api = get_hf_api(token=token)
  856. scheduled_jobs = [
  857. api.inspect_scheduled_job(scheduled_job_id=scheduled_job_id, namespace=namespace)
  858. for scheduled_job_id in scheduled_job_ids
  859. ]
  860. print(json.dumps([asdict(scheduled_job) for scheduled_job in scheduled_jobs], indent=4, default=str))
  861. @scheduled_app.command("delete", examples=["hf jobs scheduled delete <id>"])
  862. def scheduled_delete(
  863. scheduled_job_id: ScheduledJobIdArg,
  864. namespace: NamespaceOpt = None,
  865. token: TokenOpt = None,
  866. ) -> None:
  867. """Delete a scheduled Job."""
  868. scheduled_job_id, namespace = _parse_namespace_from_job_id(scheduled_job_id, namespace)
  869. api = get_hf_api(token=token)
  870. api.delete_scheduled_job(scheduled_job_id=scheduled_job_id, namespace=namespace)
  871. @scheduled_app.command("suspend", examples=["hf jobs scheduled suspend <id>"])
  872. def scheduled_suspend(
  873. scheduled_job_id: ScheduledJobIdArg,
  874. namespace: NamespaceOpt = None,
  875. token: TokenOpt = None,
  876. ) -> None:
  877. """Suspend (pause) a scheduled Job."""
  878. scheduled_job_id, namespace = _parse_namespace_from_job_id(scheduled_job_id, namespace)
  879. api = get_hf_api(token=token)
  880. api.suspend_scheduled_job(scheduled_job_id=scheduled_job_id, namespace=namespace)
  881. @scheduled_app.command("resume", examples=["hf jobs scheduled resume <id>"])
  882. def scheduled_resume(
  883. scheduled_job_id: ScheduledJobIdArg,
  884. namespace: NamespaceOpt = None,
  885. token: TokenOpt = None,
  886. ) -> None:
  887. """Resume (unpause) a scheduled Job."""
  888. scheduled_job_id, namespace = _parse_namespace_from_job_id(scheduled_job_id, namespace)
  889. api = get_hf_api(token=token)
  890. api.resume_scheduled_job(scheduled_job_id=scheduled_job_id, namespace=namespace)
# Sub-application for the `hf jobs scheduled uv ...` commands, mounted on scheduled_app as "uv".
scheduled_uv_app = typer_factory(help="Schedule UV scripts on HF infrastructure.")
scheduled_app.add_typer(scheduled_uv_app, name="uv")
  893. @scheduled_uv_app.command(
  894. "run",
  895. context_settings={"ignore_unknown_options": True},
  896. examples=[
  897. 'hf jobs scheduled uv run "0 0 * * *" script.py',
  898. 'hf jobs scheduled uv run "0 0 * * *" script.py --with pandas',
  899. ],
  900. )
  901. def scheduled_uv_run(
  902. schedule: ScheduleArg,
  903. script: ScriptArg,
  904. script_args: ScriptArgsArg = None,
  905. suspend: SuspendOpt = None,
  906. concurrency: ConcurrencyOpt = None,
  907. image: ImageOpt = None,
  908. flavor: FlavorOpt = None,
  909. env: EnvOpt = None,
  910. secrets: SecretsOpt = None,
  911. label: LabelsOpt = None,
  912. volume: VolumesOpt = None,
  913. env_file: EnvFileOpt = None,
  914. secrets_file: SecretsFileOpt = None,
  915. timeout: TimeoutOpt = None,
  916. namespace: NamespaceOpt = None,
  917. token: TokenOpt = None,
  918. with_: WithOpt = None,
  919. python: PythonOpt = None,
  920. ) -> None:
  921. """Run a UV script (local file or URL) on HF infrastructure"""
  922. env_map = parse_env_map(env, env_file)
  923. secrets_map = parse_env_map(secrets, secrets_file)
  924. api = get_hf_api(token=token)
  925. job = api.create_scheduled_uv_job(
  926. script=script,
  927. script_args=script_args or [],
  928. schedule=schedule,
  929. suspend=suspend,
  930. concurrency=concurrency,
  931. dependencies=with_,
  932. python=python,
  933. image=image,
  934. env=env_map,
  935. secrets=secrets_map,
  936. labels=_parse_labels_map(label),
  937. volumes=parse_volumes(volume),
  938. flavor=flavor, # type: ignore[arg-type,misc]
  939. timeout=timeout,
  940. namespace=namespace,
  941. )
  942. print(f"Scheduled Job created with ID: {job.id}")
  943. ### UTILS
  944. def _parse_labels_map(labels: list[str] | None) -> dict[str, str] | None:
  945. """Parse label key-value pairs from CLI arguments.
  946. Args:
  947. labels: List of label strings in KEY=VALUE format. If KEY only, then VALUE is set to empty string.
  948. Returns:
  949. Dictionary mapping label keys to values, or None if no labels provided.
  950. """
  951. if not labels:
  952. return None
  953. labels_map: dict[str, str] = {}
  954. for label_var in labels:
  955. key, value = label_var.split("=", 1) if "=" in label_var else (label_var, "")
  956. labels_map[key] = value
  957. return labels_map
  958. def _tabulate(rows: list[list[str | int]], headers: list[str]) -> str:
  959. """
  960. Inspired by:
  961. - stackoverflow.com/a/8356620/593036
  962. - stackoverflow.com/questions/9535954/printing-lists-as-tabular-data
  963. """
  964. col_widths = [max(len(str(x)) for x in col) for col in zip(*rows, headers)]
  965. terminal_width = max(shutil.get_terminal_size().columns, len(headers) * 12)
  966. while len(headers) + sum(col_widths) > terminal_width:
  967. col_to_minimize = col_widths.index(max(col_widths))
  968. col_widths[col_to_minimize] //= 2
  969. if len(headers) + sum(col_widths) <= terminal_width:
  970. col_widths[col_to_minimize] = terminal_width - sum(col_widths) - len(headers) + col_widths[col_to_minimize]
  971. row_format = ("{{:{}}} " * len(headers)).format(*col_widths)
  972. lines = []
  973. lines.append(row_format.format(*headers))
  974. lines.append(row_format.format(*["-" * w for w in col_widths]))
  975. for row in rows:
  976. row_format_args = [
  977. str(x)[: col_width - 3] + "..." if len(str(x)) > col_width else str(x)
  978. for x, col_width in zip(row, col_widths)
  979. ]
  980. lines.append(row_format.format(*row_format_args))
  981. return "\n".join(lines)
  982. T = TypeVar("T")
  983. def _write_generator_to_queue(queue: Queue[T], func: Callable[..., Iterable[T]], kwargs: dict) -> None:
  984. for result in func(**kwargs):
  985. queue.put(result)
  986. def iflatmap_unordered(
  987. pool: multiprocessing.pool.ThreadPool,
  988. func: Callable[..., Iterable[T]],
  989. *,
  990. kwargs_list: list[dict],
  991. ) -> Iterable[T]:
  992. """
  993. Takes a function that returns an iterable of items, and run it in parallel using threads to return the flattened iterable of items as they arrive.
  994. This is inspired by those three `map()` variants, and is the mix of all three:
  995. * `imap()`: like `map()` but returns an iterable instead of a list of results
  996. * `imap_unordered()`: like `imap()` but the output is sorted by time of arrival
  997. * `flatmap()`: like `map()` but given a function which returns a list, `flatmap()` returns the flattened list that is the concatenation of all the output lists
  998. """
  999. queue: Queue[T] = Queue()
  1000. async_results = [pool.apply_async(_write_generator_to_queue, (queue, func, kwargs)) for kwargs in kwargs_list]
  1001. try:
  1002. while True:
  1003. try:
  1004. yield queue.get(timeout=0.05)
  1005. except Empty:
  1006. if all(async_result.ready() for async_result in async_results) and queue.empty():
  1007. break
  1008. except KeyboardInterrupt:
  1009. pass
  1010. finally:
  1011. # we get the result in case there's an error to raise
  1012. try:
  1013. [async_result.get(timeout=0.05) for async_result in async_results]
  1014. except multiprocessing.TimeoutError:
  1015. pass