| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- from __future__ import annotations
- import asyncio
- import pprint
- from typing import Any
- import wandb
- import wandb.apis.public as public
- from wandb.apis.internal import Api
- from wandb.errors import CommError
- from wandb.sdk.launch.builder.build import build_image_from_project
- from wandb.sdk.launch.errors import LaunchError
- from wandb.sdk.launch.utils import (
- LAUNCH_DEFAULT_PROJECT,
- LOG_PREFIX,
- construct_launch_spec,
- validate_launch_spec_source,
- )
- from ._project_spec import LaunchProject
def push_to_queue(
    api: Api,
    queue_name: str,
    launch_spec: dict[str, Any],
    template_variables: dict | None,
    project_queue: str,
    priority: int | None = None,
) -> Any:
    """Submit a launch spec to a run queue through the internal API.

    Thin wrapper that forwards every argument, positionally and in the
    same order, to ``api.push_to_run_queue``.

    Arguments:
        api: Internal API client used to perform the push.
        queue_name: Name of the queue to push to.
        launch_spec: Launch spec to enqueue.
        template_variables: Optional template variable values for the queue.
        project_queue: Name of the project that owns the queue.
        priority: Optional priority for the queued item.

    Returns:
        Whatever ``api.push_to_run_queue`` returns, unmodified.
    """
    response = api.push_to_run_queue(
        queue_name,
        launch_spec,
        template_variables,
        project_queue,
        priority,
    )
    return response
def launch_add(
    uri: str | None = None,
    job: str | None = None,
    config: dict[str, Any] | None = None,
    template_variables: dict[str, float | int | str] | None = None,
    project: str | None = None,
    entity: str | None = None,
    queue_name: str | None = None,
    resource: str | None = None,
    entry_point: list[str] | None = None,
    name: str | None = None,
    version: str | None = None,
    docker_image: str | None = None,
    project_queue: str | None = None,
    resource_args: dict[str, Any] | None = None,
    run_id: str | None = None,
    build: bool | None = False,
    repository: str | None = None,
    sweep_id: str | None = None,
    author: str | None = None,
    priority: int | None = None,
) -> public.QueuedRun:
    """Enqueue a W&B launch experiment from a job or a docker image.

    Creates an internal ``Api`` client and delegates to ``_launch_add``.

    Arguments:
        uri: Ignored. The value is not forwarded to ``_launch_add``; a warning
            is emitted when it is supplied.
        job: string reference to a wandb.Job eg: wandb/test/my-job:latest
        config: A dictionary containing the configuration for the run. May also contain
            resource specific arguments under the key "resource_args"
        template_variables: A dictionary containing values of template variables for a run queue.
            Expected format of `{"VAR_NAME": VAR_VALUE}`
        project: Target project to send launched run to
        entity: Target entity to send launched run to
        queue_name: the name of the queue to enqueue the run to. ``_launch_add``
            falls back to "default" when this is None.
        resource: Execution backend for the run: W&B provides built-in support for "local-container" backend
        entry_point: Entry point to run within the project.
        name: Name run under which to launch the run.
        version: For Git-based projects, either a commit hash or a branch name.
        docker_image: The name of the docker image to use for the run.
        project_queue: optional string to control the name of the project for the queue. Primarily used
            for back compatibility with project scoped queues
        resource_args: Resource related arguments for launching runs onto a remote backend.
            Will be stored on the constructed launch config under ``resource_args``.
        run_id: optional string indicating the id of the launched run
        build: optional flag defaulting to false.
            if build, an image is created, creates a job artifact, pushes a reference
            to that job artifact to queue. Not supported for the
            "local-process" resource.
        repository: optional string to control the name of the remote repository, used when
            pushing images to a registry
        sweep_id: optional string forwarded to the launch spec (presumably the
            id of the sweep this run belongs to -- confirm).
        author: optional string forwarded to the launch spec (presumably the
            user the run is attributed to -- confirm).
        priority: the priority level of the job, where 1 is the highest priority

    Example:
        ```python
        from wandb.sdk.launch import launch_add

        # Enqueue an existing job on the "default" queue
        queued_run = launch_add(
            job="wandb/test/my-job:latest",
            queue_name="default",
        )
        ```

    Returns:
        an instance of `wandb.apis.public.QueuedRun` which gives information about the
        queued run, or if `wait_until_started` or `wait_until_finished` are called, gives access
        to the underlying Run information.

    Raises:
        `wandb.sdk.launch.errors.LaunchError` if unsuccessful
    """
    if uri is not None:
        # The URI never reaches the launch spec, so tell the caller instead of
        # silently dropping it.
        wandb.termwarn(
            "The `uri` argument to launch_add is ignored; "
            "specify `job` or `docker_image` instead."
        )

    api = Api()
    # Keyword arguments keep this long call in sync with _launch_add's signature.
    return _launch_add(
        api,
        job=job,
        config=config,
        template_variables=template_variables,
        project=project,
        entity=entity,
        queue_name=queue_name,
        resource=resource,
        entry_point=entry_point,
        name=name,
        version=version,
        docker_image=docker_image,
        project_queue=project_queue,
        resource_args=resource_args,
        run_id=run_id,
        build=build,
        repository=repository,
        sweep_id=sweep_id,
        author=author,
        priority=priority,
    )
def _launch_add(
    api: Api,
    job: str | None,
    config: dict[str, Any] | None,
    template_variables: dict | None,
    project: str | None,
    entity: str | None,
    queue_name: str | None,
    resource: str | None,
    entry_point: list[str] | None,
    name: str | None,
    version: str | None,
    docker_image: str | None,
    project_queue: str | None,
    resource_args: dict[str, Any] | None = None,
    run_id: str | None = None,
    build: bool | None = False,
    repository: str | None = None,
    sweep_id: str | None = None,
    author: str | None = None,
    priority: int | None = None,
) -> public.QueuedRun:
    """Construct a launch spec, optionally build an image, and enqueue the run.

    Steps, in order:
      1. Build the launch spec with ``construct_launch_spec``.
      2. If ``build`` is truthy, build a docker image from the spec, log it as
         a job artifact on the current (or a newly initialised) wandb run, and
         point the spec at that job.
      3. Default ``queue_name`` to "default" and ``project_queue`` to
         ``LAUNCH_DEFAULT_PROJECT`` when they are None.
      4. Move any dict-valued "template_variables" out of the spec and merge it
         with the ``template_variables`` argument (the argument wins on
         conflicting keys).
      5. Validate the spec source and push the spec to the queue.
      6. Copy "resource_args" / "resource" back from the returned "runSpec"
         when present and truthy, log the result, and return the queued run.

    Arguments:
        api: Internal API client used for spec construction, image build and push.
        job: Job reference; replaced by the freshly logged job when ``build`` is set.
        config: Run configuration; also passed to the image build (``{}`` if None).
        template_variables: Template variable values for the queue.
        project: Target project.
        entity: Target entity.
        queue_name: Queue to push to; "default" when None.
        resource: Execution backend; "local-process" cannot be combined with ``build``.
        entry_point: Entry point forwarded to the launch spec.
        name: Run name forwarded to the launch spec.
        version: Version forwarded to the launch spec.
        docker_image: Docker image forwarded to the launch spec.
        project_queue: Project owning the queue; ``LAUNCH_DEFAULT_PROJECT`` when None.
        resource_args: Resource arguments forwarded to the launch spec.
        run_id: Run id forwarded to the launch spec.
        build: When truthy, build an image and enqueue a job that references it.
        repository: Repository forwarded to the launch spec.
        sweep_id: Sweep id forwarded to the launch spec.
        author: Author forwarded to the launch spec.
        priority: Priority forwarded to the queue push and the queued-run lookup.

    Returns:
        The ``public.QueuedRun`` for the pushed queue item.

    Raises:
        LaunchError: if ``build`` is requested for "local-process", if the push
            response is missing or lacks "runQueueItemId", or if the job
            artifact cannot be fetched after the push.
    """
    launch_spec = construct_launch_spec(
        None,  # always None here (presumably the source-URI slot -- confirm)
        job,
        api,
        name,
        project,
        entity,
        docker_image,
        resource,
        entry_point,
        version,
        resource_args,
        config,
        run_id,
        repository,
        author,
        sweep_id,
    )

    if build:
        if resource == "local-process":
            raise LaunchError(
                "Cannot build a docker image for the resource: local-process"
            )

        if launch_spec.get("job") is not None:
            wandb.termwarn("Build doesn't support setting a job. Overwriting job.")
            launch_spec["job"] = None

        launch_project = LaunchProject.from_spec(launch_spec, api)
        docker_image_uri = asyncio.run(
            build_image_from_project(launch_project, api, config or {})
        )
        # Reuse the active run if there is one; otherwise start a run to own
        # the job artifact.
        run = wandb.run or wandb.init(
            project=launch_spec["project"],
            entity=launch_spec["entity"],
            job_type="launch_job",
        )

        job_artifact = run._log_job_artifact_with_image(  # type: ignore
            docker_image_uri, launch_project.override_args
        )
        job_name = job_artifact.wait().name

        job = f"{launch_spec['entity']}/{launch_spec['project']}/{job_name}"
        launch_spec["job"] = job
        launch_spec["uri"] = None  # Remove given URI --> now in job

    if queue_name is None:
        queue_name = "default"
    if project_queue is None:
        project_queue = LAUNCH_DEFAULT_PROJECT

    # Template variables embedded in the spec are sent separately from it;
    # explicitly passed values override the spec's on key conflicts.
    spec_template_vars = launch_spec.get("template_variables")
    if isinstance(spec_template_vars, dict):
        launch_spec.pop("template_variables")
        if template_variables is None:
            template_variables = spec_template_vars
        else:
            template_variables = {
                **spec_template_vars,
                **template_variables,
            }

    validate_launch_spec_source(launch_spec)
    res = push_to_queue(
        api, queue_name, launch_spec, template_variables, project_queue, priority
    )

    if res is None or "runQueueItemId" not in res:
        raise LaunchError("Error adding run to queue")

    # The response may carry an updated spec; mirror its resource settings
    # locally so the logged spec matches what was queued.
    updated_spec = res.get("runSpec")
    if updated_spec:
        if updated_spec.get("resource_args"):
            launch_spec["resource_args"] = updated_spec.get("resource_args")
        if updated_spec.get("resource"):
            launch_spec["resource"] = updated_spec.get("resource")

    if project_queue == LAUNCH_DEFAULT_PROJECT:
        wandb.termlog(f"{LOG_PREFIX}Added run to queue {queue_name}.")
    else:
        wandb.termlog(f"{LOG_PREFIX}Added run to queue {project_queue}/{queue_name}.")
    wandb.termlog(f"{LOG_PREFIX}Launch spec:\n{pprint.pformat(launch_spec)}\n")

    public_api = public.Api()
    if job is not None:
        # NOTE(review): this check runs after the item has been pushed, so a
        # failure here leaves the item on the queue -- confirm that is intended.
        try:
            public_api._artifact(job, type="job")
        except (ValueError, CommError) as e:
            # Chain the original error so the root cause stays in the traceback.
            raise LaunchError(f"Unable to fetch job with name {job}: {e}") from e

    queued_run = public_api.queued_run(
        launch_spec["entity"],
        launch_spec["project"],
        queue_name,
        res["runQueueItemId"],
        project_queue,
        priority,
    )
    return queued_run  # type: ignore
|