| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596 |
- from __future__ import annotations
- import asyncio
- import base64
- import copy
- import json
- import logging
- import os
- import shutil
- import tarfile
- import tempfile
- import time
- import traceback
- from typing import Any
- import wandb
- from wandb.sdk.launch.agent.job_status_tracker import JobAndRunStatusTracker
- from wandb.sdk.launch.builder.abstract import AbstractBuilder, registry_from_uri
- from wandb.sdk.launch.environment.abstract import AbstractEnvironment
- from wandb.sdk.launch.environment.azure_environment import AzureEnvironment
- from wandb.sdk.launch.registry.abstract import AbstractRegistry
- from wandb.sdk.launch.registry.azure_container_registry import AzureContainerRegistry
- from wandb.sdk.launch.registry.elastic_container_registry import (
- ElasticContainerRegistry,
- )
- from wandb.sdk.launch.registry.google_artifact_registry import GoogleArtifactRegistry
- from wandb.util import get_module
- from .._project_spec import EntryPoint, LaunchProject
- from ..errors import LaunchError
- from ..utils import (
- LOG_PREFIX,
- get_kube_context_and_api_client,
- warn_failed_packages_from_build_logs,
- )
- from .build import _WANDB_DOCKERFILE_NAME
- from .context_manager import BuildContextManager
- get_module(
- "kubernetes_asyncio",
- required="Kaniko builder requires the kubernetes_asyncio package. Please install it with `pip install wandb[launch]`.",
- )
- import kubernetes_asyncio as kubernetes # type: ignore # noqa: E402
- from kubernetes_asyncio import client # noqa: E402
- _logger = logging.getLogger(__name__)
- _DEFAULT_BUILD_TIMEOUT_SECS = 1800 # 30 minute build timeout
- SERVICE_ACCOUNT_NAME = os.environ.get("WANDB_LAUNCH_SERVICE_ACCOUNT_NAME", "default")
- PVC_NAME = os.environ.get("WANDB_LAUNCH_KANIKO_PVC_NAME")
- PVC_MOUNT_PATH = (
- os.environ.get("WANDB_LAUNCH_KANIKO_PVC_MOUNT_PATH", "/kaniko").rstrip("/")
- if PVC_NAME
- else None
- )
- DOCKER_CONFIG_SECRET = os.environ.get("WANDB_LAUNCH_KANIKO_AUTH_SECRET")
- if os.path.exists("/var/run/secrets/kubernetes.io/serviceaccount/namespace"):
- with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace") as f:
- NAMESPACE = f.read().strip()
- else:
- NAMESPACE = "wandb"
- def get_pod_name_safe(job: client.V1Job):
- try:
- return job.spec.template.metadata.name
- except AttributeError:
- return None
- async def _wait_for_completion(
- batch_client: client.BatchV1Api, job_name: str, deadline_secs: int | None = None
- ) -> bool:
- start_time = time.time()
- while True:
- job = await batch_client.read_namespaced_job_status(job_name, NAMESPACE)
- if job.status.succeeded is not None and job.status.succeeded >= 1:
- return True
- elif job.status.failed is not None and job.status.failed >= 1:
- wandb.termerror(f"{LOG_PREFIX}Build job {job.status.failed} failed {job}")
- return False
- wandb.termlog(f"{LOG_PREFIX}Waiting for build job to complete...")
- if deadline_secs is not None and time.time() - start_time > deadline_secs:
- return False
- await asyncio.sleep(5)
- class KanikoBuilder(AbstractBuilder):
- """Builds a docker image for a project using Kaniko."""
- type = "kaniko"
- build_job_name: str
- build_context_store: str
- secret_name: str | None
- secret_key: str | None
- image: str
- def __init__(
- self,
- environment: AbstractEnvironment,
- registry: AbstractRegistry,
- build_job_name: str = "wandb-launch-container-build",
- build_context_store: str = "",
- secret_name: str = "",
- secret_key: str = "",
- image: str = "gcr.io/kaniko-project/executor:v1.11.0",
- config: dict | None = None,
- ):
- """Initialize a KanikoBuilder.
- Arguments:
- environment (AbstractEnvironment): The environment to use.
- registry (AbstractRegistry): The registry to use.
- build_job_name (str, optional): The name of the build job.
- build_context_store (str, optional): The name of the build context store.
- secret_name (str, optional): The name of the secret to use for the registry.
- secret_key (str, optional): The key of the secret to use for the registry.
- verify (bool, optional): Whether to verify the functionality of the builder.
- Defaults to True.
- """
- self.environment = environment
- self.registry = registry
- self.build_job_name = build_job_name
- self.build_context_store = build_context_store.rstrip("/")
- self.secret_name = secret_name
- self.secret_key = secret_key
- self.image = image
- self.kaniko_config = config or {}
- @classmethod
- def from_config(
- cls,
- config: dict,
- environment: AbstractEnvironment,
- registry: AbstractRegistry,
- verify: bool = True,
- login: bool = True,
- ) -> AbstractBuilder:
- """Create a KanikoBuilder from a config dict.
- Arguments:
- config: A dict containing the builder config. Must contain a "type" key
- with value "kaniko".
- environment: The environment to use for the build.
- registry: The registry to use for the build.
- verify: Whether to verify the builder config.
- Returns:
- A KanikoBuilder instance.
- """
- if config.get("type") != "kaniko":
- raise LaunchError(
- "Builder config must include 'type':'kaniko' to create a KanikoBuilder."
- )
- build_context_store = config.get("build-context-store", "")
- if build_context_store is None and not PVC_MOUNT_PATH:
- raise LaunchError(
- "You must specify a build context store for kaniko builds. "
- "You can set builder.build-context-store in your agent config "
- "to a valid s3, gcs, or azure blog storage URI. Or, configure "
- "a persistent volume claim through the agent helm chart: "
- "https://github.com/wandb/helm-charts/tree/main/charts/launch-agent"
- )
- build_job_name = config.get("build-job-name", "wandb-launch-container-build")
- secret_name = config.get("secret-name", "")
- secret_key = config.get("secret-key", "")
- kaniko_image = config.get(
- "kaniko-image", "gcr.io/kaniko-project/executor:v1.11.0"
- )
- image_uri = config.get("destination")
- if image_uri is not None:
- registry = registry_from_uri(image_uri)
- kaniko_config = config.get("kaniko-config", {})
- return cls(
- environment,
- registry,
- build_context_store=build_context_store,
- build_job_name=build_job_name,
- secret_name=secret_name,
- secret_key=secret_key,
- image=kaniko_image,
- config=kaniko_config,
- )
- async def verify(self) -> None:
- """Verify that the builder config is valid.
- Raises:
- LaunchError: If the builder config is invalid.
- """
- if self.build_context_store:
- await self.environment.verify_storage_uri(self.build_context_store)
- def login(self) -> None:
- """Login to the registry."""
- async def _create_docker_ecr_config_map(
- self, job_name: str, corev1_client: client.CoreV1Api, repository: str
- ) -> None:
- username, password = await self.registry.get_username_password()
- encoded = base64.b64encode(f"{username}:{password}".encode()).decode("utf-8")
- ecr_config_map = client.V1ConfigMap(
- api_version="v1",
- kind="ConfigMap",
- metadata=client.V1ObjectMeta(
- name=f"docker-config-{job_name}",
- namespace=NAMESPACE,
- ),
- data={
- "config.json": json.dumps(
- {
- "auths": {
- f"{await self.registry.get_repo_uri()}": {"auth": encoded}
- }
- }
- )
- },
- immutable=True,
- )
- await corev1_client.create_namespaced_config_map(NAMESPACE, ecr_config_map)
- async def _delete_docker_ecr_config_map(
- self, job_name: str, client: client.CoreV1Api
- ) -> None:
- if self.secret_name:
- await client.delete_namespaced_config_map(
- f"docker-config-{job_name}", NAMESPACE
- )
- async def _upload_build_context(self, run_id: str, context_path: str) -> str:
- # creat a tar archive of the build context and upload it to s3
- context_file = tempfile.NamedTemporaryFile(delete=False)
- with tarfile.TarFile.open(fileobj=context_file, mode="w:gz") as context_tgz:
- context_tgz.add(context_path, arcname=".")
- context_file.close()
- if PVC_MOUNT_PATH is None:
- destination = f"{self.build_context_store}/{run_id}.tgz"
- if self.environment is None:
- raise LaunchError("No environment specified for Kaniko build.")
- await self.environment.upload_file(context_file.name, destination)
- return destination
- else:
- destination = f"{PVC_MOUNT_PATH}/{run_id}.tgz"
- try:
- shutil.copy(context_file.name, destination)
- except Exception as e:
- raise LaunchError(
- f"Error copying build context to PVC mounted at {PVC_MOUNT_PATH}: {e}"
- ) from e
- return f"tar:///context/{run_id}.tgz"
- async def build_image(
- self,
- launch_project: LaunchProject,
- entrypoint: EntryPoint,
- job_tracker: JobAndRunStatusTracker | None = None,
- ) -> str:
- await self.verify()
- build_contex_manager = BuildContextManager(launch_project=launch_project)
- context_path, image_tag = build_contex_manager.create_build_context("kaniko")
- run_id = launch_project.run_id
- repo_uri = await self.registry.get_repo_uri()
- image_uri = repo_uri + ":" + image_tag
- # The DOCKER_CONFIG_SECRET option is mutually exclusive with the
- # registry classes, so we must skip the check for image existence in
- # that case.
- if not launch_project.build_required():
- if DOCKER_CONFIG_SECRET:
- wandb.termlog(
- f"Skipping check for existing image {image_uri} due to custom dockerconfig."
- )
- else:
- if await self.registry.check_image_exists(image_uri):
- return image_uri
- _logger.info(f"Building image {image_uri}...")
- _, api_client = await get_kube_context_and_api_client(
- kubernetes, launch_project.resource_args
- )
- # TODO: use same client as kubernetes_runner.py
- batch_v1 = client.BatchV1Api(api_client)
- core_v1 = client.CoreV1Api(api_client)
- build_job_name = f"{self.build_job_name}-{run_id}"
- build_context = await self._upload_build_context(run_id, context_path)
- build_job = await self._create_kaniko_job(
- build_job_name, repo_uri, image_uri, build_context, core_v1, api_client
- )
- wandb.termlog(f"{LOG_PREFIX}Created kaniko job {build_job_name}")
- try:
- # DOCKER_CONFIG_SECRET is a user provided dockerconfigjson. Skip our
- # dockerconfig handling if it's set.
- if (
- isinstance(self.registry, AzureContainerRegistry)
- and not DOCKER_CONFIG_SECRET
- ):
- dockerfile_config_map = client.V1ConfigMap(
- metadata=client.V1ObjectMeta(
- name=f"docker-config-{build_job_name}"
- ),
- data={
- "config.json": json.dumps(
- {
- "credHelpers": {
- f"{self.registry.registry_name}.azurecr.io": "acr-env"
- }
- }
- )
- },
- )
- await core_v1.create_namespaced_config_map(
- "wandb", dockerfile_config_map
- )
- if self.secret_name:
- await self._create_docker_ecr_config_map(
- build_job_name, core_v1, repo_uri
- )
- k8s_job = await batch_v1.create_namespaced_job(NAMESPACE, build_job)
- # wait for double the job deadline since it might take time to schedule
- if not await _wait_for_completion(
- batch_v1, build_job_name, 3 * _DEFAULT_BUILD_TIMEOUT_SECS
- ):
- if job_tracker:
- job_tracker.set_err_stage("build")
- msg = f"Failed to build image in kaniko for job {run_id}."
- pod_name = get_pod_name_safe(k8s_job)
- if pod_name:
- msg += f" View logs with `kubectl logs -n {NAMESPACE} {pod_name}`."
- raise Exception(msg) # noqa: TRY301
- try:
- pods_from_job = await core_v1.list_namespaced_pod(
- namespace=NAMESPACE, label_selector=f"job-name={build_job_name}"
- )
- if len(pods_from_job.items) != 1:
- raise Exception( # noqa: TRY301
- f"Expected 1 pod for job {build_job_name},"
- f" found {len(pods_from_job.items)}"
- )
- pod_name = pods_from_job.items[0].metadata.name
- logs = await core_v1.read_namespaced_pod_log(pod_name, NAMESPACE)
- warn_failed_packages_from_build_logs(
- logs, image_uri, launch_project.api, job_tracker
- )
- except Exception as e:
- wandb.termwarn(
- f"{LOG_PREFIX}Failed to get logs for kaniko job {build_job_name}: {e}"
- )
- except Exception as e:
- wandb.termerror(
- f"{LOG_PREFIX}Exception when creating Kubernetes resources: {e}\n"
- )
- raise
- finally:
- wandb.termlog(f"{LOG_PREFIX}Cleaning up resources")
- try:
- if (
- isinstance(self.registry, AzureContainerRegistry)
- and not DOCKER_CONFIG_SECRET
- ):
- await core_v1.delete_namespaced_config_map(
- f"docker-config-{build_job_name}", "wandb"
- )
- if self.secret_name:
- await self._delete_docker_ecr_config_map(build_job_name, core_v1)
- await batch_v1.delete_namespaced_job(build_job_name, NAMESPACE)
- except Exception as e:
- traceback.print_exc()
- raise LaunchError(
- f"Exception during Kubernetes resource clean up {e}"
- ) from e
- return image_uri
- async def _create_kaniko_job(
- self,
- job_name: str,
- repository: str,
- image_tag: str,
- build_context_path: str,
- core_client: client.CoreV1Api,
- api_client,
- ) -> dict[str, Any]:
- job = copy.deepcopy(self.kaniko_config)
- job_metadata = job.get("metadata", {})
- job_labels = job_metadata.get("labels", {})
- job_spec = job.get("spec", {})
- pod_template = job_spec.get("template", {})
- pod_metadata = pod_template.get("metadata", {})
- pod_labels = pod_metadata.get("labels", {})
- pod_spec = pod_template.get("spec", {})
- volumes = pod_spec.get("volumes", [])
- containers = pod_spec.get("containers") or [{}]
- if len(containers) > 1:
- raise LaunchError(
- "Multiple container configs not supported for kaniko builder."
- )
- container = containers[0]
- volume_mounts = container.get("volumeMounts", [])
- env = container.get("env", [])
- custom_args = container.get("args", [])
- if PVC_MOUNT_PATH:
- volumes.append(
- {"name": "kaniko-pvc", "persistentVolumeClaim": {"claimName": PVC_NAME}}
- )
- volume_mounts.append({"name": "kaniko-pvc", "mountPath": "/context"})
- if bool(self.secret_name) != bool(self.secret_key):
- raise LaunchError(
- "Both secret_name and secret_key or neither must be specified "
- "for kaniko build. You provided only one of them."
- )
- if isinstance(self.registry, ElasticContainerRegistry):
- env.append(
- {
- "name": "AWS_REGION",
- "value": self.registry.region,
- }
- )
- # TODO(ben): Refactor all of this environment/registry
- # specific stuff into methods of those classes.
- if isinstance(self.environment, AzureEnvironment):
- # Use the core api to check if the secret exists
- try:
- await core_client.read_namespaced_secret(
- "azure-storage-access-key",
- "wandb",
- )
- except Exception as e:
- raise LaunchError(
- "Secret azure-storage-access-key does not exist in "
- "namespace wandb. Please create it with the key password "
- "set to your azure storage access key."
- ) from e
- env.append(
- {
- "name": "AZURE_STORAGE_ACCESS_KEY",
- "valueFrom": {
- "secretKeyRef": {
- "name": "azure-storage-access-key",
- "key": "password",
- }
- },
- }
- )
- if DOCKER_CONFIG_SECRET:
- volumes.append(
- {
- "name": "kaniko-docker-config",
- "secret": {
- "secretName": DOCKER_CONFIG_SECRET,
- "items": [
- {
- "key": ".dockerconfigjson",
- "path": "config.json",
- }
- ],
- },
- }
- )
- volume_mounts.append(
- {"name": "kaniko-docker-config", "mountPath": "/kaniko/.docker"}
- )
- elif self.secret_name and self.secret_key:
- volumes.append(
- {
- "name": "docker-config",
- "configMap": {"name": f"docker-config-{job_name}"},
- }
- )
- volume_mounts.append(
- {"name": "docker-config", "mountPath": "/kaniko/.docker"}
- )
- # TODO(ben): I don't like conditioning on the registry type here. As a
- # future change I want the registry and environment classes to provide
- # a list of environment variables and volume mounts that need to be
- # added to the job. The environment class provides credentials for
- # build context access, and the registry class provides credentials
- # for pushing the image. This way we can have separate secrets for
- # each and support build contexts and registries that require
- # different credentials.
- if isinstance(self.registry, ElasticContainerRegistry):
- mount_path = "/root/.aws"
- key = "credentials"
- elif isinstance(self.registry, GoogleArtifactRegistry):
- mount_path = "/kaniko/.config/gcloud"
- key = "config.json"
- env.append(
- {
- "name": "GOOGLE_APPLICATION_CREDENTIALS",
- "value": "/kaniko/.config/gcloud/config.json",
- }
- )
- else:
- wandb.termwarn(
- f"{LOG_PREFIX}Automatic credential handling is not supported for registry type {type(self.registry)}. Build job: {self.build_job_name}"
- )
- volumes.append(
- {
- "name": self.secret_name,
- "secret": {
- "secretName": self.secret_name,
- "items": [{"key": self.secret_key, "path": key}],
- },
- }
- )
- volume_mounts.append(
- {
- "name": self.secret_name,
- "mountPath": mount_path,
- "readOnly": True,
- }
- )
- if (
- isinstance(self.registry, AzureContainerRegistry)
- and not DOCKER_CONFIG_SECRET
- ):
- # Add the docker config map
- volumes.append(
- {
- "name": "docker-config",
- "configMap": {"name": f"docker-config-{job_name}"},
- }
- )
- volume_mounts.append(
- {"name": "docker-config", "mountPath": "/kaniko/.docker/"}
- )
- # Kaniko doesn't want https:// at the beginning of the image tag.
- destination = image_tag
- if destination.startswith("https://"):
- destination = destination.replace("https://", "")
- args = {
- "--context": build_context_path,
- "--dockerfile": _WANDB_DOCKERFILE_NAME,
- "--destination": destination,
- "--cache": "true",
- "--cache-repo": repository.replace("https://", ""),
- "--snapshot-mode": "redo",
- "--compressed-caching": "false",
- }
- for custom_arg in custom_args:
- arg_name, arg_value = custom_arg.split("=", 1)
- args[arg_name] = arg_value
- parsed_args = [
- f"{arg_name}={arg_value}" for arg_name, arg_value in args.items()
- ]
- container["args"] = parsed_args
- # Apply the rest of our defaults
- pod_labels["wandb"] = "launch"
- # This annotation is required to enable azure workload identity.
- # Don't add this label if using a docker config secret for auth.
- if (
- isinstance(self.registry, AzureContainerRegistry)
- and not DOCKER_CONFIG_SECRET
- ):
- pod_labels["azure.workload.identity/use"] = "true"
- pod_spec["restartPolicy"] = pod_spec.get("restartPolicy", "Never")
- pod_spec["activeDeadlineSeconds"] = pod_spec.get(
- "activeDeadlineSeconds", _DEFAULT_BUILD_TIMEOUT_SECS
- )
- pod_spec["serviceAccountName"] = pod_spec.get(
- "serviceAccountName", SERVICE_ACCOUNT_NAME
- )
- job_spec["backoffLimit"] = job_spec.get("backoffLimit", 0)
- job_labels["wandb"] = "launch"
- job_metadata["namespace"] = job_metadata.get("namespace", NAMESPACE)
- job_metadata["name"] = job_metadata.get("name", job_name)
- job["apiVersion"] = "batch/v1"
- job["kind"] = "Job"
- # Apply all nested configs from the bottom up
- pod_metadata["labels"] = pod_labels
- pod_template["metadata"] = pod_metadata
- container["name"] = container.get("name", "wandb-container-build")
- container["image"] = container.get("image", self.image)
- container["volumeMounts"] = volume_mounts
- container["env"] = env
- pod_spec["containers"] = [container]
- pod_spec["volumes"] = volumes
- pod_template["spec"] = pod_spec
- job_spec["template"] = pod_template
- job_metadata["labels"] = job_labels
- job["metadata"] = job_metadata
- job["spec"] = job_spec
- return job
|