| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750 |
- """W&B Public API for Runs.
- This module provides classes for interacting with W&B runs and their associated
- data.
- Example:
- ```python
- from wandb.apis.public import Api
- # Get runs matching filters
- runs = Api().runs(
- path="entity/project", filters={"state": "finished", "config.batch_size": 32}
- )
- # Access run data
- for run in runs:
- print(f"Run: {run.name}")
- print(f"Config: {run.config}")
- print(f"Metrics: {run.summary}")
- # Get history with pandas
- history_df = run.history(keys=["loss", "accuracy"], pandas=True)
- # Work with artifacts
- for artifact in run.logged_artifacts():
- print(f"Artifact: {artifact.name}")
- ```
- Note:
- This module is part of the W&B Public API and provides read/write access
- to run data. For logging new runs, use the wandb.init() function from
- the main wandb package.
- """
- from __future__ import annotations
- import json
- import os
- import pathlib
- import tempfile
- import time
- import urllib
- from collections.abc import Collection, Iterator, Mapping
- from typing import TYPE_CHECKING, Any, Literal
- from typing_extensions import override
- from wandb_gql import gql
- import wandb
- import wandb.apis.public.runhistory as runhistory
- from wandb import env, util
- from wandb._strutils import nameof
- from wandb.apis import public
- from wandb.apis._generated import GET_AGENT_RUNS_GQL
- from wandb.apis._generated.get_agent_runs import GetAgentRuns
- from wandb.apis.attrs import Attrs
- from wandb.apis.internal import Api as InternalApi
- from wandb.apis.normalize import normalize_exceptions
- from wandb.apis.paginator import SizedPaginator
- from wandb.apis.public.const import RETRY_TIMEDELTA
- from wandb.apis.public.service_api import ServiceApi
- from wandb.proto import wandb_api_pb2 as apb
- from wandb.sdk import wandb_setup
- from wandb.sdk.lib import ipython, json_util, runid
- from wandb.sdk.lib.paths import LogicalPath
- from wandb.sdk.lib.service.service_connection import WandbApiFailedError
- if TYPE_CHECKING:
- import pandas as pd
- import polars as pl
- from typing_extensions import Self
- from wandb_graphql.language.ast import Document
- from wandb.apis.public import RetryingClient
- from wandb.old.summary import HTTPSummary
# Config keys that Run._load_from_attrs keeps only in the raw config
# ("rawconfig") and leaves out of the user-facing config dict.
WANDB_INTERNAL_KEYS = {"_wandb", "wandb_version"}

# Full run fragment: everything in the lightweight fragment plus config,
# systemMetrics, summaryMetrics and historyKeys.
RUN_FRAGMENT = """fragment RunFragment on Run {
    id
    tags
    name
    displayName
    sweepName
    state
    config
    group
    jobType
    commit
    readOnly
    createdAt
    heartbeatAt
    description
    notes
    systemMetrics
    summaryMetrics
    historyLineCount
    user {
        name
        username
    }
    historyKeys
}"""

# Lightweight fragment for listing operations - excludes heavy fields
# (config, systemMetrics, summaryMetrics, historyKeys).
LIGHTWEIGHT_RUN_FRAGMENT = """fragment LightweightRunFragment on Run {
    id
    tags
    name
    displayName
    sweepName
    state
    group
    jobType
    commit
    readOnly
    createdAt
    heartbeatAt
    description
    notes
    historyLineCount
    user {
        name
        username
    }
}"""

# Fragment name constants to avoid string parsing.
# Each must match the name declared inside the corresponding fragment text,
# because queries spread the fragment by this name.
RUN_FRAGMENT_NAME = "RunFragment"
LIGHTWEIGHT_RUN_FRAGMENT_NAME = "LightweightRunFragment"
def _create_runs_query(*, lazy: bool) -> gql:
    """Build the paginated ``Runs`` GraphQL query for a project.

    Args:
        lazy: When True the query spreads the lightweight run fragment;
            otherwise it spreads the full run fragment.

    Returns:
        The result of passing the assembled query text (query plus the
        selected fragment definition) to ``gql``.
    """
    if lazy:
        fragment_name = LIGHTWEIGHT_RUN_FRAGMENT_NAME
        fragment = LIGHTWEIGHT_RUN_FRAGMENT
    else:
        fragment_name = RUN_FRAGMENT_NAME
        fragment = RUN_FRAGMENT

    # The fragment definition is appended after the query so the spread
    # inside `node` resolves.
    query_text = f"""#graphql
        query Runs($project: String!, $entity: String!, $cursor: String, $perPage: Int = 50, $order: String, $filters: JSONString) {{
            project(name: $project, entityName: $entity) {{
                internalId
                runCount(filters: $filters)
                readOnly
                runs(filters: $filters, after: $cursor, first: $perPage, order: $order) {{
                    edges {{
                        node {{
                            projectId
                            ...{fragment_name}
                        }}
                        cursor
                    }}
                    pageInfo {{
                        endCursor
                        hasNextPage
                    }}
                }}
            }}
        }}
        {fragment}
        """
    return gql(query_text)
@normalize_exceptions
def _convert_to_dict(value: Any) -> dict[str, Any]:
    """Coerce a server-provided value into a dictionary.

    Args:
        value: ``None``, a dict, or a JSON document given as ``str``,
            ``bytes`` or ``bytearray``.

    Returns:
        An empty dict for ``None``, the same object for a dict, and the
        result of ``json.loads`` for string-like input.

    Raises:
        TypeError: If ``value`` is of any other type.
    """
    if value is None:
        return {}
    if isinstance(value, dict):
        return value
    if not isinstance(value, (str, bytes, bytearray)):
        raise TypeError(f"Unable to convert {value} to a dict")

    try:
        parsed = json.loads(value)
    except json.decoder.JSONDecodeError:
        # Retry leniently so control characters inside strings are accepted.
        parsed = json.loads(value, strict=False)
    return parsed
class Runs(SizedPaginator["Run"]):
    """A lazy iterator of `Run` objects associated with a project and optional filter.

    Runs are retrieved in pages from the W&B server as needed.

    This is generally used indirectly using the `Api.runs` namespace.

    Args:
        client: (`wandb.apis.public.RetryingClient`) The API client to use
            for requests.
        entity: (str) The entity (username or team) that owns the project.
        project: (str) The name of the project to fetch runs from.
        filters: (Optional[Dict[str, Any]]) A dictionary of filters to apply
            to the runs query.
        order: (str) Order can be `created_at`, `heartbeat_at`, `config.*.value`, or `summary_metrics.*`.
            If you prepend order with a + order is ascending (default).
            If you prepend order with a - order is descending.
            The default order is run.created_at from oldest to newest.
        per_page: (int) The number of runs to fetch per request (default is 50).
        include_sweeps: (bool) Whether to include sweep information in the
            runs. Defaults to True.
        lazy: (bool) Whether to query with the lightweight run fragment,
            which leaves out config, summary metrics, system metrics and
            history keys. Defaults to True. See `upgrade_to_full`.
        service_api: (Optional[ServiceApi]) Passed through unchanged to every
            `Run` this paginator creates.
    """

    def __init__(
        self,
        client: RetryingClient,
        entity: str,
        project: str,
        filters: dict[str, Any] | None = None,
        order: str = "+created_at",
        per_page: int = 50,
        include_sweeps: bool = True,
        lazy: bool = True,
        service_api: ServiceApi | None = None,
    ):
        # An empty or None order falls back to the default ordering.
        if not order:
            order = "+created_at"

        self.QUERY = _create_runs_query(lazy=lazy)
        self.entity = entity
        self.project = project
        self._project_internal_id = None
        self.filters = filters or {}
        self.order = order
        # Cache of sweeps already fetched, keyed by sweep name, so runs that
        # share a sweep trigger a single lookup.
        self._sweeps: dict[str, public.Sweep] = {}
        self._include_sweeps = include_sweeps
        self._lazy = lazy
        self._service_api = service_api
        variables = {
            "project": self.project,
            "entity": self.entity,
            "order": self.order,
            # The query declares $filters as JSONString, so serialize here.
            "filters": json.dumps(self.filters),
        }
        super().__init__(client, variables, per_page)

    @property
    def _length(self) -> int:
        """Returns the total number of runs.

        <!-- lazydoc-ignore: internal -->
        """
        if not self.last_response:
            self._load_page()
        return self.last_response["project"]["runCount"]

    @property
    def more(self) -> bool:
        """Returns whether there are more runs to fetch.

        <!-- lazydoc-ignore: internal -->
        """
        if self.last_response:
            return bool(
                self.last_response["project"]["runs"]["pageInfo"]["hasNextPage"]
            )
        else:
            return True

    @property
    def cursor(self) -> str | None:
        """Returns the cursor position for pagination of runs results.

        Returns `None` before the first page is loaded and when the most
        recent page contained no edges.

        <!-- lazydoc-ignore: internal -->
        """
        if not self.last_response:
            return None
        edges = self.last_response["project"]["runs"]["edges"]
        # An empty page has no last edge; indexing [-1] would raise IndexError.
        return edges[-1]["cursor"] if edges else None

    def convert_objects(self) -> list[Run]:
        """Converts GraphQL edges to Runs objects.

        Raises:
            ValueError: If the last response is missing or has no project.

        <!-- lazydoc-ignore: internal -->
        """
        objs = []
        if self.last_response is None or self.last_response.get("project") is None:
            raise ValueError(f"Could not find project {self.project}")
        for run_response in self.last_response["project"]["runs"]["edges"]:
            run = Run(
                self.client,
                self.entity,
                self.project,
                run_response["node"]["name"],
                run_response["node"],
                include_sweeps=self._include_sweeps,
                lazy=self._lazy,
                service_api=self._service_api,
            )
            objs.append(run)

            if self._include_sweeps and run.sweep_name:
                if run.sweep_name in self._sweeps:
                    sweep = self._sweeps[run.sweep_name]
                else:
                    sweep = public.Sweep.get(
                        self.client,
                        self.entity,
                        self.project,
                        run.sweep_name,
                        withRuns=False,
                    )
                    # A None result is cached too, so a missing sweep is
                    # looked up only once.
                    self._sweeps[run.sweep_name] = sweep

                if sweep is None:
                    continue
                run.sweep = sweep

        return objs

    def _iter_run_histories(
        self,
        samples: int,
        keys: list[str] | None,
        x_axis: str,
        stream: str,
    ) -> Iterator[tuple[Run, list[dict[str, Any]]]]:
        """Yield `(run, history_rows)` for every run that has history rows."""
        for run in self:
            history_data = run.history(
                samples=samples,
                keys=keys,
                x_axis=x_axis,
                pandas=False,
                stream=stream,
            )
            if history_data:
                yield run, history_data

    @normalize_exceptions
    def histories(
        self,
        samples: int = 500,
        keys: list[str] | None = None,
        x_axis: str = "_step",
        format: Literal["default", "pandas", "polars"] = "default",
        stream: Literal["default", "system"] = "default",
    ) -> list[dict[str, Any]] | pd.DataFrame | pl.DataFrame:
        """Return sampled history metrics for all runs that fit the filters conditions.

        Args:
            samples: The number of samples to return per run
            keys: Only return metrics for specific keys
            x_axis: Use this metric as the xAxis defaults to _step
            format: Format to return data in, options are "default", "pandas",
                "polars"
            stream: "default" for metrics, "system" for machine metrics

        Returns:
            pandas.DataFrame: If `format="pandas"`, returns a `pandas.DataFrame`
                of history metrics.
            polars.DataFrame: If `format="polars"`, returns a `polars.DataFrame`
                of history metrics.
            list of dicts: If `format="default"`, returns a list of dicts
                containing history metrics with a `run_id` key.

        Raises:
            ValueError: If `format` is not one of the supported values.
        """
        if format not in ("default", "pandas", "polars"):
            raise ValueError(
                f"Invalid format: {format}. Must be one of 'default', 'pandas', 'polars'"
            )

        if format == "default":
            rows: list[dict[str, Any]] = []
            for run, history_data in self._iter_run_histories(
                samples, keys, x_axis, stream
            ):
                for entry in history_data:
                    entry["run_id"] = run.id
                rows.extend(history_data)
            return rows

        if format == "pandas":
            # Resolve the optional dependency before any run is fetched.
            pd = util.get_module(
                "pandas", required="Exporting pandas DataFrame requires pandas"
            )
            frames = []
            for run, history_data in self._iter_run_histories(
                samples, keys, x_axis, stream
            ):
                df = pd.DataFrame.from_records(history_data)
                df["run_id"] = run.id
                frames.append(df)
            if not frames:
                return pd.DataFrame()
            combined_df = pd.concat(frames)
            combined_df.reset_index(drop=True, inplace=True)
            # sort columns for consistency
            combined_df = combined_df[(sorted(combined_df.columns))]
            return combined_df

        # Only "polars" remains after the validation above.
        pl = util.get_module(
            "polars", required="Exporting polars DataFrame requires polars"
        )
        frames = []
        for run, history_data in self._iter_run_histories(
            samples, keys, x_axis, stream
        ):
            df = pl.from_records(history_data)
            df = df.with_columns(pl.lit(run.id).alias("run_id"))
            frames.append(df)
        if not frames:
            return pl.DataFrame()
        combined_df = pl.concat(frames, how="vertical")
        # sort columns for consistency
        combined_df = combined_df.select(sorted(combined_df.columns))
        return combined_df

    def __repr__(self) -> str:
        return f"<{nameof(type(self))} {self.entity}/{self.project}>"

    def upgrade_to_full(self) -> None:
        """Upgrade this Runs collection from lazy to full mode.

        This switches to fetching full run data and
        upgrades any already-loaded Run objects to have full data.
        Uses parallel loading for better performance when upgrading multiple runs.
        """
        if not self._lazy:
            return  # Already in full mode

        # Switch to full mode
        self._lazy = False

        # Regenerate query with full fragment
        self.QUERY = _create_runs_query(lazy=False)

        # Upgrade any existing runs that have been loaded - use parallel loading for performance
        lazy_runs = [run for run in self.objects if run._lazy]
        if lazy_runs:
            from concurrent.futures import ThreadPoolExecutor

            # Limit workers to avoid overwhelming the server
            max_workers = min(len(lazy_runs), 10)
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = [executor.submit(run.load_full_data) for run in lazy_runs]
                # Wait for all to complete; result() re-raises any worker error.
                for future in futures:
                    future.result()
class AgentRuns(SizedPaginator["Run"]):
    """Paginated, lazily loaded `Run` objects belonging to one sweep agent.

    <!-- lazydoc-ignore-class: internal -->
    """

    def __init__(
        self,
        client: RetryingClient,
        entity: str,
        project: str,
        sweep_id: str,
        agent_key: str,
        *,
        total_runs: int,
        order: str = "+created_at",
        per_page: int = 50,
        service_api: ServiceApi | None = None,
    ) -> None:
        self.QUERY = gql(GET_AGENT_RUNS_GQL)
        self.entity = entity
        self.project = project
        self._sweep_id = sweep_id
        self._agent_key = agent_key
        self.order = order
        self._sweeps: dict[str, public.Sweep] = {}
        self._service_api = service_api
        self._total_runs = total_runs
        # Needed before super().__init__ because the initial variables read it.
        self.per_page = per_page

        initial_variables = {
            "project": self.project,
            "entity": self.entity,
            "order": self.order,
            "agentID": self._agent_key,
            "sweep": self._sweep_id,
            "after": None,
            "before": None,
            "first": self.per_page,
            "last": None,
        }
        super().__init__(client, initial_variables, per_page)

    @override
    def update_variables(self) -> None:
        """Map paginator state to GetAgentRuns variables (after/first, not cursor/perPage)."""
        forward_page = {
            "first": self.per_page,
            "after": self.cursor,
            "before": None,
            "last": None,
        }
        self.variables.update(forward_page)

    @property
    @override
    def _length(self) -> int:
        # The total is supplied by the caller rather than read from a response.
        return self._total_runs

    def _parsed(self) -> GetAgentRuns:
        """Validate the last response into the generated `GetAgentRuns` model."""
        assert self.last_response is not None
        return GetAgentRuns.model_validate(self.last_response)

    def _agent_runs_connection(self):
        """Return the agent's runs connection, raising if any parent is missing."""
        project = self._parsed().project
        if not project:
            raise ValueError(f"Could not find project {self.project!r} for agent runs.")

        sweep = project.sweep
        if not sweep:
            raise ValueError(f"Could not find sweep {self._sweep_id!r} for agent runs.")

        agent = sweep.agent
        if not agent:
            raise ValueError(
                f"Could not find agent {self._agent_key!r} for agent runs."
            )
        return agent.runs

    @property
    @override
    def more(self) -> bool:
        # Nothing fetched yet: there is at least the first page to try.
        if self.last_response is None:
            return True
        return bool(self._agent_runs_connection().page_info.has_next_page)

    @property
    @override
    def cursor(self) -> str | None:
        if not self.last_response:
            return None
        edges = self._agent_runs_connection().edges
        if not edges:
            return None
        return edges[-1].cursor

    @override
    def convert_objects(self) -> list[Run]:
        """Convert the current GraphQL page into :class:`Run` instances for this agent."""
        nodes = (
            edge.node.model_dump(by_alias=True)
            for edge in self._agent_runs_connection().edges
        )
        return [
            Run(
                self.client,
                self.entity,
                self.project,
                node["name"],
                node,
                include_sweeps=False,
                lazy=True,
                service_api=self._service_api,
            )
            for node in nodes
        ]

    @override
    def __repr__(self) -> str:
        return f"<{nameof(type(self))} {self.entity}/{self.project} agent={self._agent_key!r}>"
- class Run(Attrs):
- """A single run associated with an entity and project.
- Args:
- client: The W&B API client.
- entity: The entity associated with the run.
- project: The project associated with the run.
- run_id: The unique identifier for the run.
- attrs: The attributes of the run.
- include_sweeps: Whether to include sweeps in the run.
- Attributes:
- tags ([str]): a list of tags associated with the run
- url (str): the url of this run
- id (str): unique identifier for the run (defaults to eight characters)
- name (str): the name of the run
- state (str): one of: running, finished, crashed, killed, preempting, preempted
- config (dict): a dict of hyperparameters associated with the run
- created_at (str): ISO timestamp when the run was started
- system_metrics (dict): the latest system metrics recorded for the run
- summary (dict): A mutable dict-like property that holds the current summary.
- Calling update will persist any changes.
- project (str): the project associated with the run
- entity (str): the name of the entity associated with the run
- project_internal_id (int): the internal id of the project
- user (str): the name of the user who created the run
- path (str): Unique identifier [entity]/[project]/[run_id]
- notes (str): Notes about the run
- read_only (boolean): Whether the run is editable
- history_keys (str): History metric keys logged with `wandb.Run.log({"key": "value"})`
- metadata (str): Metadata about the run from wandb-metadata.json
- """
def __init__(
    self,
    client: RetryingClient,
    entity: str,
    project: str,
    run_id: str,
    attrs: Mapping | None = None,
    include_sweeps: bool = True,
    lazy: bool = True,
    service_api: ServiceApi | None = None,
):
    """Initialize a Run object.

    Run is always initialized by calling api.runs() where api is an instance of
    wandb.Api.

    Args:
        client: The API client stored on the run as `client`.
        entity: The entity that owns the run.
        project: The project the run belongs to.
        run_id: The run ID; stored under the "name" key of the attrs.
        attrs: Attributes already known for the run. When empty or None,
            construction ends with a forced load.
        include_sweeps: Whether sweep information is looked up for the run.
        lazy: Whether loads use the lightweight fragment instead of the
            full one.
        service_api: Optional service API handle kept on the run.
    """
    _attrs = attrs or {}
    # Copy into a plain dict so later writes do not touch the caller's mapping.
    super().__init__(dict(_attrs))
    self.client = client
    self._entity = entity
    self.project = project
    self._files = {}
    self._base_dir = env.get_dir(tempfile.gettempdir())
    # Goes through the `id` property setter, which writes self._attrs["name"].
    self.id = run_id
    self.sweep = None
    self._include_sweeps = include_sweeps
    self._lazy = lazy
    self._full_data_loaded = False  # Track if we've loaded full data
    self.dir = os.path.join(self._base_dir, *self.path)
    # Best effort: any OSError from creating the directory (including
    # "already exists") is ignored.
    try:
        os.makedirs(self.dir)
    except OSError:
        pass
    self._summary = None
    self._metadata: dict[str, Any] | None = None
    self._state = _attrs.get("state", "not found")
    self.server_provides_internal_id_field: bool | None = None
    self._is_loaded: bool = False
    self._service_api: ServiceApi | None = service_api

    # Uses the caller-supplied attrs (not self._attrs, which now always holds
    # "name") to decide whether a server fetch must be forced.
    self.load(force=not _attrs)
@property
def state(self) -> str:
    """The run's state string as last loaded or polled.

    Examples of values handled in this module are `running`, `pending`,
    `finished`, `crashed` and `failed`.
    """
    return self._state
@property
def entity(self) -> str:
    """The entity this run was constructed with (read-only)."""
    return self._entity
@property
def username(self) -> str:
    """Deprecated alias of `entity`.

    Every access emits a deprecation warning through `wandb.termwarn` and
    then returns the same value as `entity`.
    """
    wandb.termwarn("Run.username is deprecated. Please use Run.entity instead.")
    return self._entity
@property
def storage_id(self) -> str:
    """The run's storage identifier, read from the "id" attribute.

    Kept separate from `id` for compatibility with wandb.Run, where
    `storage_id` holds the storage ID and `id` holds the run name.
    """
    return self._attrs.get("id")
@property
def id(self) -> str:
    """The run ID, stored under the "name" attribute."""
    return self._attrs.get("name")


@id.setter
def id(self, new_id: str) -> None:
    """Replace the run ID by overwriting the "name" attribute."""
    self._attrs["name"] = new_id
@property
def name(self) -> str | None:
    """The run's display name, or None when no "displayName" attribute is set."""
    return self._attrs.get("displayName")


@name.setter
def name(self, new_name: str) -> None:
    """Change the display name by overwriting the "displayName" attribute."""
    self._attrs["displayName"] = new_name
@classmethod
def create(
    cls,
    api: public.Api,
    run_id: str | None = None,
    project: str | None = None,
    entity: str | None = None,
    state: Literal["running", "pending"] = "running",
) -> Self:
    """Create a run in the given project through the public API.

    Prefer `wandb.init()` for normal use; it has more robust logic for
    creating and updating runs. `wandb.apis.public.Run.create` is meant for
    special cases such as registering a run in the "pending" state for a job
    that may not be schedulable yet (for example, a Kubernetes cluster with
    insufficient GPUs or high contention). Such pending runs can later be
    resumed and tracked by W&B.

    Runs created this way have limited functionality. Calling `update()` on
    one of them may not work as expected.

    Args:
        api: The W&B API instance.
        run_id: Optional run ID. A random ID is generated when omitted.
        project: Optional project name. Falls back to the project in the API
            settings, then to "uncategorized".
        entity: Optional entity (user or team) name.
        state: Initial state of the run: "pending" for runs that will be
            resumed later, "running" for immediate execution.

    Returns:
        A Run object representing the created run.

    Example:
        Register a pending run to execute later

        ```python
        import wandb
        from wandb.apis.public import Run

        api = wandb.Api()
        run = Run.create(
            api=api,
            project="project",
            entity="entity",
            state="pending",
            run_id="my-pending-run",
        )
        ```
    """
    api._sentry.message("Invoking Run.create", level="info")
    if not run_id:
        run_id = runid.generate_id()
    if not project:
        project = api.settings.get("project") or "uncategorized"

    upsert_mutation = gql(
        """
        mutation UpsertBucket($project: String, $entity: String, $name: String!, $state: String) {
            upsertBucket(input: {modelName: $project, entityName: $entity, name: $name, state: $state}) {
                bucket {
                    project {
                        name
                        entity { name }
                    }
                    id
                    name
                }
                inserted
            }
        }
        """
    )
    response = api.client.execute(
        upsert_mutation,
        variable_values={
            "entity": entity,
            "project": project,
            "name": run_id,
            "state": state,
        },
    )
    bucket = response["upsertBucket"]["bucket"]
    bucket_project = bucket["project"]

    # Seed the attrs with empty JSON documents so the new Run does not need a
    # server fetch to populate config and metrics.
    initial_attrs = {
        "id": bucket["id"],
        "config": "{}",
        "systemMetrics": "{}",
        "summaryMetrics": "{}",
        "tags": [],
        "description": None,
        "notes": None,
        "state": state,
    }
    return cls(
        api.client,
        bucket_project["entity"]["name"],
        bucket_project["name"],
        bucket["name"],
        initial_attrs,
        lazy=False,  # Created runs should have full data available immediately
    )
def _load_with_fragment(
    self, fragment: str, fragment_name: str, force: bool = False
) -> dict[str, Any]:
    """Load run data using specified GraphQL fragment.

    Args:
        fragment: Text of the fragment definition appended to the query.
        fragment_name: Name used to spread the fragment inside the query.
        force: When True, fetch from the server even if attrs are present
            and redo the post-processing below.

    Returns:
        The run's attribute dict (`self._attrs`).

    Raises:
        ValueError: If a fetch is performed and the response has no
            project or no run.
    """
    query = gql(
        f"""#graphql
        query Run($project: String!, $entity: String!, $name: String!) {{
            project(name: $project, entityName: $entity) {{
                run(name: $name) {{
                    projectId
                    ...{fragment_name}
                }}
            }}
        }}
        {fragment}
        """
    )
    # Hit the server only when asked to, or when nothing is known yet.
    if force or not self._attrs:
        response = self._exec(query)
        if (
            response is None
            or response.get("project") is None
            or response["project"].get("run") is None
        ):
            raise ValueError("Could not find run {}".format(self))
        # The fetched run replaces the previous attrs wholesale.
        self._attrs = response["project"]["run"]

        self._state = self._attrs["state"]
        if self._attrs.get("user"):
            self.user = public.User(self.client, self._attrs["user"])

        if self._include_sweeps and self.sweep_name and not self.sweep:
            # There may be a lot of runs. Don't bother pulling them all
            # just for the sake of this one.
            self.sweep = public.Sweep.get(
                self.client,
                self.entity,
                self.project,
                self.sweep_name,
                withRuns=False,
            )

    # Post-process once per object, or again whenever a reload is forced.
    if not self._is_loaded or force:
        # Always set _project_internal_id if projectId is available, regardless of fragment type
        if "projectId" in self._attrs:
            self._project_internal_id = int(self._attrs["projectId"])
        else:
            self._project_internal_id = None

        # Always call _load_from_attrs when using the full fragment or when the fields are actually present
        if fragment_name == RUN_FRAGMENT_NAME or (
            "config" in self._attrs
            or "summaryMetrics" in self._attrs
            or "systemMetrics" in self._attrs
        ):
            self._load_from_attrs()
        # Only mark as loaded for lightweight fragments, not full fragments
        if fragment_name == LIGHTWEIGHT_RUN_FRAGMENT_NAME:
            self._is_loaded = True

    return self._attrs
def _load_from_attrs(self) -> dict[str, Any]:
    """Normalize the raw attribute dict in place and derive dependent state.

    Converts the "config", "summaryMetrics" and "systemMetrics" attributes
    to dicts when they are present, splits config into a user-facing
    "config" and a complete "rawconfig", looks up the sweep if needed, and
    wraps the "user" attribute in a `public.User`.

    Returns:
        The run's attribute dict (`self._attrs`) after normalization.
    """
    # NOTE(review): this does not look idempotent. A second call on the same
    # attrs re-applies the "value" unwrapping to an already-unwrapped config
    # and rebuilds "rawconfig" from user keys only. load() followed by
    # _load_with_fragment() can call it twice - confirm this is intended.

    # Snapshot before mutating: only persist config/rawconfig when the response
    # included a config field (lazy runs omit it until load_full_data()).
    had_config_field = "config" in self._attrs
    self._state = self._attrs.get("state", None)

    # Only convert fields if they exist in _attrs
    if had_config_field:
        self._attrs["config"] = _convert_to_dict(self._attrs.get("config"))
    if "summaryMetrics" in self._attrs:
        self._attrs["summaryMetrics"] = _convert_to_dict(
            self._attrs.get("summaryMetrics")
        )
    if "systemMetrics" in self._attrs:
        self._attrs["systemMetrics"] = _convert_to_dict(
            self._attrs.get("systemMetrics")
        )

    # Only check for sweeps if sweep_name is available (not in lazy mode or if it exists)
    if self._include_sweeps and self._attrs.get("sweepName") and not self.sweep:
        # There may be a lot of runs. Don't bother pulling them all
        self.sweep = public.Sweep.get(
            self.client,
            self.entity,
            self.project,
            self._attrs["sweepName"],
            withRuns=False,
        )

    # config_user collects user keys; config_raw collects the internal keys
    # and is merged with the user keys further down.
    config_user, config_raw = {}, {}
    if self._attrs.get("config"):
        try:
            # config is already converted to dict by _convert_to_dict
            for key, value in self._attrs.get("config", {}).items():
                config = config_raw if key in WANDB_INTERNAL_KEYS else config_user
                # Entries shaped like {"value": ...} are unwrapped to the
                # inner value; anything else is kept as is.
                if isinstance(value, dict) and "value" in value:
                    config[key] = value["value"]
                else:
                    config[key] = value
        except (TypeError, AttributeError):
            # Handle case where config is malformed or not a dict
            pass

    if had_config_field:
        # rawconfig = internal keys + user keys; config = user keys only.
        config_raw.update(config_user)
        self._attrs["config"] = config_user
        self._attrs["rawconfig"] = config_raw

    # Unlike _load_with_fragment, this checks key presence rather than
    # truthiness, so a None user is still passed to public.User.
    if "user" in self._attrs:
        self.user = public.User(self.client, self._attrs["user"])

    return self._attrs
def load(self, force: bool = False) -> dict[str, Any]:
    """Load the run's data with the fragment that matches its lazy mode.

    Attributes already present are normalized first. The lightweight
    fragment is then used for lazy runs and the full fragment otherwise.

    Args:
        force: Passed through to `_load_with_fragment`; when True a server
            fetch is performed.

    Returns:
        The run's attribute dict.
    """
    # Load any provided attrs
    if self._attrs:
        self._load_from_attrs()

    if self._lazy:
        fragment = LIGHTWEIGHT_RUN_FRAGMENT
        fragment_name = LIGHTWEIGHT_RUN_FRAGMENT_NAME
    else:
        fragment = RUN_FRAGMENT
        fragment_name = RUN_FRAGMENT_NAME
    return self._load_with_fragment(fragment, fragment_name, force)
@normalize_exceptions
def wait_until_finished(self) -> None:
    """Block until the server reports the run as finished, crashed or failed.

    Polls the run's state every five seconds. On reaching one of those
    states, the state is stored on this object and the method returns.
    """
    state_query = gql(
        """
        query RunState($project: String!, $entity: String!, $name: String!) {
            project(name: $project, entityName: $entity) {
                run(name: $name) {
                    state
                }
            }
        }
        """
    )
    terminal_states = ("finished", "crashed", "failed")
    while True:
        response = self._exec(state_query)
        state = response["project"]["run"]["state"]
        if state not in terminal_states:
            time.sleep(5)
            continue
        # Keep both the attrs dict and the cached state in sync.
        self._attrs["state"] = state
        self._state = state
        return
@normalize_exceptions
def update(self) -> None:
    """Send this run's editable fields to the W&B backend.

    Submits tags, description, notes, display name, config, group and job
    type through the `upsertBucket` mutation, then updates the summary.
    """
    mutation = gql(
        f"""
        mutation UpsertBucket($id: String!, $description: String, $display_name: String, $notes: String, $tags: [String!], $config: JSONString!, $groupName: String, $jobType: String) {{
            upsertBucket(input: {{id: $id, description: $description, displayName: $display_name, notes: $notes, tags: $tags, config: $config, groupName: $groupName, jobType: $jobType}}) {{
                bucket {{
                    ...RunFragment
                }}
            }}
        }}
        {RUN_FRAGMENT}
        """
    )
    variables = {
        "id": self.storage_id,
        "tags": self.tags,
        "description": self.description,
        "notes": self.notes,
        "display_name": self.display_name,
        "config": self.json_config,
        "groupName": self.group,
        "jobType": self.job_type,
    }
    # The mutation's response is not used.
    self._exec(mutation, **variables)
    self.summary.update()
@normalize_exceptions
def delete(self, delete_artifacts: bool = False) -> None:
    """Delete the given run from the wandb backend.

    Args:
        delete_artifacts (bool, optional): Whether to delete the artifacts
            associated with the run.
    """
    # The `deleteArtifacts` variable is only declared and used in the
    # mutation text when the caller asked for artifact deletion.
    if delete_artifacts:
        variable_declaration = "$deleteArtifacts: Boolean"
        input_field = "deleteArtifacts: $deleteArtifacts"
    else:
        variable_declaration = ""
        input_field = ""

    delete_mutation = gql(
        """
        mutation DeleteRun(
            $id: ID!,
            {}
        ) {{
            deleteRun(input: {{
                id: $id,
                {}
            }}) {{
                clientMutationId
            }}
        }}
        """.format(variable_declaration, input_field)
    )
    variables = {
        "id": self.storage_id,
        "deleteArtifacts": delete_artifacts,
    }
    self.client.execute(delete_mutation, variable_values=variables)
def save(self) -> None:
    """Persist changes to the run object to the W&B backend.

    Thin alias that delegates to `update()`.
    """
    self.update()
@normalize_exceptions
def update_state(self, state: Literal["pending"]) -> bool:
    """Update the state of a run.

    Allows transitioning runs from 'failed' or 'crashed' to 'pending'.

    Args:
        state: The target run state. Only `"pending"` is supported.

    Returns:
        `True` if the server reported success, in which case the state cached
        on this object is updated too. `False` if the response carries no
        truthy `success` flag (including a missing or null payload).

    Raises:
        `wandb.Error`: If the requested state transition is not allowed, or the server
            does not support this operation.
    """
    mutation = gql(
        """
        mutation UpdateRunState($input: UpdateRunStateInput!) {
            updateRunState(input: $input) {
                success
            }
        }
        """
    )
    try:
        result = self.client.execute(
            mutation,
            variable_values={
                "input": {
                    "id": self.storage_id,
                    "state": state,
                }
            },
        )
    except Exception as e:
        error_msg = str(e)
        # An error that mentions the mutation or its input type is reported
        # as the server not supporting the operation.
        if "UpdateRunStateInput" in error_msg or "updateRunState" in error_msg:
            raise wandb.Error(
                "The server does not support the update_state operation. "
                "Please ensure your W&B server is updated to a version that "
                "supports run state transitions."
            ) from e
        if "invalid state transition" in error_msg.lower():
            raise wandb.Error(
                f"Invalid state transition: cannot change run from '{self.state}' "
                f"to '{state}'. Only runs in 'failed' or 'crashed' state can be "
                "transitioned to 'pending'."
            ) from e
        raise

    # The payload can be absent *or* explicitly null. `dict.get(key, {})`
    # only covers the absent case, so normalise both to an empty mapping
    # instead of failing on `None.get(...)`.
    payload = (result or {}).get("updateRunState") or {}
    if not payload.get("success"):
        return False

    self._attrs["state"] = state
    self._state = state
    return True
@property
def json_config(self) -> str:
    """Serialize the run config to the JSON string sent to the backend.

    Every config entry is wrapped as ``{"value": <value>, "desc": None}``.
    If the raw config has a ``_wandb`` entry it is emitted first, and a
    ``_wandb`` key in the user config replaces it.

    <!-- lazydoc-ignore: internal -->
    """
    raw = self.rawconfig
    internal_entries = (
        {"_wandb": {"value": raw["_wandb"], "desc": None}} if "_wandb" in raw else {}
    )
    user_entries = {
        key: {"value": value, "desc": None} for key, value in self.config.items()
    }
    return json.dumps({**internal_entries, **user_entries})
def _exec(self, query: Document, **kwargs: Any) -> dict[str, Any]:
    """Run a query for this run through the client.

    The variables always start with this run's ``entity``, ``project`` and
    ``name`` (the run id); keyword arguments are merged on top and win on
    key collisions.
    """
    variable_values = {
        "entity": self.entity,
        "project": self.project,
        "name": self.id,
        **kwargs,
    }
    return self.client.execute(query, variable_values=variable_values)
def _sampled_history(
    self,
    keys: list[str],
    x_axis: str = "_step",
    samples: int = 500,
) -> list[dict[str, Any]]:
    """Fetch sampled history rows for the given keys.

    Args:
        keys: Metric keys to request; ``x_axis`` is prepended to them.
        x_axis: Key sent first in the spec. Defaults to ``"_step"``.
        samples: Sample count placed in the spec.

    Returns:
        The first (and only) entry of the ``sampledHistory`` response.
    """
    sampled_query = gql(
        """
        query RunSampledHistory($project: String!, $entity: String!, $name: String!, $specs: [JSONString!]!) {
            project(name: $project, entityName: $entity) {
                run(name: $name) { sampledHistory(specs: $specs) }
            }
        }
        """
    )
    encoded_spec = json.dumps({"keys": [x_axis] + keys, "samples": samples})
    run_payload = self._exec(sampled_query, specs=[encoded_spec])["project"]["run"]
    # One result list comes back per spec; exactly one spec was sent.
    return run_payload["sampledHistory"][0]
def _full_history(
    self,
    samples: int = 500,
    stream: Literal["default", "system"] = "default",
) -> list[dict[str, Any]]:
    """Fetch history rows from the run's ``history`` or ``events`` field.

    Args:
        samples: Value sent as the ``$samples`` query variable.
        stream: ``"default"`` queries the ``history`` field; any other
            value queries ``events``.

    Returns:
        The returned lines, each decoded from JSON.
    """
    if stream == "default":
        field_name = "history"
    else:
        field_name = "events"

    history_query = gql(
        """
        query RunFullHistory($project: String!, $entity: String!, $name: String!, $samples: Int) {{
            project(name: $project, entityName: $entity) {{
                run(name: $name) {{ {}(samples: $samples) }}
            }}
        }}
        """.format(field_name)
    )
    raw_lines = self._exec(history_query, samples=samples)["project"]["run"][field_name]
    return [json.loads(raw_line) for raw_line in raw_lines]
@normalize_exceptions
def files(
    self,
    names: list[str] | None = None,
    pattern: str | None = None,
    per_page: int = 50,
) -> public.Files:
    """Returns a `Files` object for all files in the run which match the given criteria.

    You can specify a list of exact file names to match, or a pattern to match against.

    NOTE(review): the previous docstring said both that the pattern is
    ignored and that a ValueError is raised when `names` and `pattern` are
    given together. That decision is made inside `public.Files`, not here;
    confirm which statement is correct.

    Args:
        names (list): names of the requested files, if empty returns all files
        pattern (str, optional): Pattern to match when returning files from W&B.
            This pattern uses mySQL's LIKE syntax,
            so matching all files that end with .json would be "%.json".
        per_page (int): number of results per page.

    Returns:
        A `Files` object, which is an iterator over `File` objects.
    """
    # `None` (or any falsy value) is forwarded as an empty list of names.
    requested_names = names if names else []
    return public.Files(
        self.client,
        self,
        requested_names,
        pattern=pattern,
        per_page=per_page,
    )
@normalize_exceptions
def file(self, name: str) -> public.File:
    """Return the file with the given name from this run.

    Args:
        name (str): name of requested file.

    Returns:
        A `File` matching the name argument (the first entry of a `Files`
        lookup restricted to that single name).
    """
    matching_files = public.Files(self.client, self, [name])
    return matching_files[0]
@normalize_exceptions
def upload_file(self, path: str, root: str = ".") -> public.File:
    """Upload a local file to W&B, associating it with this run.

    Args:
        path (str): Path to the file to upload. Can be absolute or relative.
        root (str): The root path to save the file relative to. For example,
            if you want to have the file saved in the run as "my_dir/file.txt"
            and you're currently in "my_dir" you would set root to "../".
            Defaults to current directory (".").

    Returns:
        A `File` object representing the uploaded file.
    """
    internal_api = InternalApi(
        default_settings={"entity": self.entity, "project": self.project},
        retry_timedelta=RETRY_TIMEDELTA,
    )
    internal_api.set_current_run_id(self.id)

    absolute_root = os.path.abspath(root)
    # Name of the file relative to the root; this is what the run stores.
    relative_name = os.path.relpath(path, absolute_root)
    safe_upload_path = util.make_file_path_upload_safe(relative_name)
    local_path = os.path.join(absolute_root, relative_name)

    with open(local_path, "rb") as file_handle:
        internal_api.push({LogicalPath(safe_upload_path): file_handle})

    return public.Files(self.client, self, [relative_name])[0]
@normalize_exceptions
def history(
    self,
    samples: int = 500,
    keys: list[str] | None = None,
    x_axis: str = "_step",
    pandas: bool = True,
    stream: Literal["default", "system"] = "default",
) -> list[dict[str, Any]] | pd.DataFrame:
    """Return sampled history metrics for a run.

    This is simpler and faster if you are ok with the history records being sampled.

    Invalid arguments are reported with `wandb.termerror` and yield an
    empty list instead of raising.

    Args:
        samples : (int, optional) The number of samples to return
        pandas : (bool, optional) Return a pandas dataframe
        keys : (list, optional) Only return metrics for specific keys
        x_axis : (str, optional) Use this metric as the xAxis defaults to _step
        stream : (str, optional) "default" for metrics, "system" for machine metrics

    Returns:
        pandas.DataFrame: If pandas=True returns a `pandas.DataFrame` of history
            metrics. If pandas cannot be loaded, a warning is printed and the
            list of dicts is returned instead.
        list of dicts: If pandas=False returns a list of dicts of history metrics.
    """
    if keys is not None:
        if not isinstance(keys, list):
            wandb.termerror("keys must be specified in a list")
            return []
        # Only the first element is inspected.
        if keys and not isinstance(keys[0], str):
            wandb.termerror("keys argument must be a list of strings")
            return []

    if keys:
        if stream != "default":
            wandb.termerror("stream must be default when specifying keys")
            return []
        records = self._sampled_history(keys=keys, x_axis=x_axis, samples=samples)
    else:
        records = self._full_history(samples=samples, stream=stream)

    if not pandas:
        return records

    pandas_module = util.get_module("pandas")
    if not pandas_module:
        wandb.termwarn("Unable to load pandas, call history with pandas=False")
        return records
    return pandas_module.DataFrame.from_records(records)
@normalize_exceptions
def scan_history(
    self,
    keys: list[str] | None = None,
    page_size: int = 1_000,
    min_step: int | None = None,
    max_step: int | None = None,
) -> Iterator[dict[str, Any]]:
    """Returns an iterable collection of all history records for a run.

    Invalid `keys` are reported with `wandb.termerror` and yield an empty
    list instead of raising.

    Args:
        keys ([str], optional): only fetch these keys, and only fetch rows that have all of keys defined.
        page_size (int, optional): size of pages to fetch from the api.
        min_step (int, optional): lower step bound passed to the scan.
            Defaults to 0.
        max_step (int, optional): upper step bound passed to the scan.
            Defaults to the run's last history step + 1, and is clamped to
            that value when it lies beyond the last step.

    Returns:
        An iterable collection over history records (dict).

    Example:
        Export all the loss values for an example run

        ```python
        run = api.run("entity/project-name/run-id")
        history = run.scan_history(keys=["Loss"])
        losses = [row["Loss"] for row in history]
        ```
    """
    if keys is not None:
        if not isinstance(keys, list):
            wandb.termerror("keys must be specified in a list")
            return []
        # Only the first element is inspected.
        if keys and not isinstance(keys[0], str):
            wandb.termerror("keys argument must be a list of strings")
            return []

    last_step = self.lastHistoryStep

    if min_step is None:
        min_step = 0
    # A missing upper bound, or one past the last logged step, becomes
    # "one past the last step".
    if max_step is None or max_step > last_step:
        max_step = last_step + 1

    scan_arguments = {
        "run": self,
        "client": self.client,
        "page_size": page_size,
        "min_step": min_step,
        "max_step": max_step,
    }
    if keys is None:
        return public.HistoryScan(**scan_arguments)
    return public.SampledHistoryScan(keys=keys, **scan_arguments)
@normalize_exceptions
def logged_artifacts(self, per_page: int = 100) -> public.RunArtifacts:
    """Fetches all artifacts logged by this run.

    Builds a `RunArtifacts` collection in "logged" mode for this run, i.e.
    the output artifacts logged during the run. The result is paginated and
    can be iterated over or collected into a single list.

    Args:
        per_page: Number of artifacts to fetch per API request.

    Returns:
        An iterable collection of all Artifact objects logged as outputs during this run.

    Example:
        ```python
        import wandb
        import tempfile

        with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as tmp:
            tmp.write("This is a test artifact")
            tmp_path = tmp.name

        run = wandb.init(project="artifact-example")
        artifact = wandb.Artifact("test_artifact", type="dataset")
        artifact.add_file(tmp_path)
        run.log_artifact(artifact)
        run.finish()

        api = wandb.Api()
        finished_run = api.run(f"{run.entity}/{run.project}/{run.id}")
        for logged_artifact in finished_run.logged_artifacts():
            print(logged_artifact.name)
        ```
    """
    logged = public.RunArtifacts(
        self.client,
        self,
        mode="logged",
        per_page=per_page,
    )
    return logged
@normalize_exceptions
def used_artifacts(self, per_page: int = 100) -> public.RunArtifacts:
    """Fetches artifacts explicitly used by this run.

    Builds a `RunArtifacts` collection in "used" mode for this run, i.e. the
    input artifacts that were explicitly declared as used, typically via
    `run.use_artifact()`. The result is paginated and can be iterated over
    or collected into a single list.

    Args:
        per_page: Number of artifacts to fetch per API request.

    Returns:
        An iterable collection of Artifact objects explicitly used as inputs in this run.

    Example:
        ```python
        import wandb

        run = wandb.init(project="artifact-example")
        run.use_artifact("test_artifact:latest")
        run.finish()

        api = wandb.Api()
        finished_run = api.run(f"{run.entity}/{run.project}/{run.id}")
        for used_artifact in finished_run.used_artifacts():
            print(used_artifact.name)
        # prints: test_artifact
        ```
    """
    used = public.RunArtifacts(
        self.client,
        self,
        mode="used",
        per_page=per_page,
    )
    return used
@normalize_exceptions
def use_artifact(
    self,
    artifact: wandb.Artifact,
    use_as: str | None = None,
) -> wandb.Artifact:
    """Declare an artifact as an input to a run.

    Args:
        artifact (`Artifact`): An artifact returned from
            `wandb.Api().artifact(name)`
        use_as (string, optional): A string identifying
            how the artifact is used in the script. Used
            to easily differentiate artifacts used in a
            run, when using the beta wandb launch
            feature's artifact swapping functionality.
            Falls back to the artifact's name when not given.

    Returns:
        An `Artifact` object.

    Raises:
        ValueError: If `artifact` is not a `wandb.Artifact`, or is still a
            draft.
    """
    internal_api = InternalApi(
        default_settings={"entity": self.entity, "project": self.project},
        retry_timedelta=RETRY_TIMEDELTA,
    )
    internal_api.set_current_run_id(self.id)

    if not isinstance(artifact, wandb.Artifact):
        raise ValueError("You must pass a wandb.Api().artifact() to use_artifact")
    if artifact.is_draft():
        raise ValueError(
            "Only existing artifacts are accepted by this api. "
            "Manually create one with `wandb artifact put`"
        )

    internal_api.use_artifact(
        artifact.id,
        use_as=use_as or artifact.name,
        artifact_entity_name=artifact.entity,
        artifact_project_name=artifact.project,
    )
    return artifact
@normalize_exceptions
def log_artifact(
    self,
    artifact: wandb.Artifact,
    aliases: Collection[str] | None = None,
    tags: Collection[str] | None = None,
) -> wandb.Artifact:
    """Declare an artifact as output of a run.

    Args:
        artifact (`Artifact`): An artifact returned from
            `wandb.Api().artifact(name)`.
        aliases (list, optional): Aliases to apply to this artifact.
        tags: (list, optional) Tags to apply to this artifact, if any.

    Returns:
        An `Artifact` object (the same one that was passed in).

    Raises:
        TypeError: If `artifact` is not a `wandb.Artifact`.
        ValueError: If the artifact is still a draft, or its source entity
            or project differs from this run's.
    """
    api = InternalApi(
        default_settings={"entity": self.entity, "project": self.project},
        retry_timedelta=RETRY_TIMEDELTA,
    )
    api.set_current_run_id(self.id)

    if not isinstance(artifact, wandb.Artifact):
        # The message previously named `use_artifact`, which pointed
        # callers at the wrong method.
        raise TypeError("You must pass a wandb.Api().artifact() to log_artifact")
    if artifact.is_draft():
        raise ValueError(
            "Only existing artifacts are accepted by this api. "
            "Manually create one with `wandb artifact put`"
        )
    if (
        self.entity != artifact.source_entity
        or self.project != artifact.source_project
    ):
        raise ValueError("A run can't log an artifact to a different project.")

    # The collection name is the part of the source name before the first ":".
    artifact_collection_name = artifact.source_name.split(":")[0]
    api.create_artifact(
        artifact.type,
        artifact_collection_name,
        artifact.digest,
        entity_name=self.entity,
        project_name=self.project,
        aliases=aliases,
        tags=tags,
    )
    return artifact
def load_full_data(self, force: bool = False) -> dict[str, Any]:
    """Load full run data including heavy fields like config, systemMetrics, summaryMetrics.

    Useful when runs were listed with lazy=True and the full data is
    needed for specific runs afterwards.

    Args:
        force: Force reload even if the run is not in lazy mode.

    Returns:
        The loaded run attributes. For a non-lazy run without `force`, the
        current attributes are returned without querying.
    """
    needs_query = self._lazy or force
    if not needs_query:
        # Already in full mode, nothing to reload.
        return self._attrs

    loaded_attrs = self._load_with_fragment(
        RUN_FRAGMENT,
        RUN_FRAGMENT_NAME,
        force=True,
    )
    self._full_data_loaded = True
    return loaded_attrs
@property
def config(self) -> dict[str, Any]:
    """Get run config. Auto-loads full data if in lazy mode.

    The stored value is passed through `_convert_to_dict` and written back
    to `_attrs["config"]` before it is returned.
    """
    if "config" not in self._attrs and self._lazy and not self._full_data_loaded:
        self.load_full_data()

    # Defensive: normalise whatever is stored into a dict on each access.
    as_dict = _convert_to_dict(self._attrs.get("config", {}))
    self._attrs["config"] = as_dict
    return as_dict
@property
def summary(self) -> HTTPSummary:
    """Get run summary metrics. Auto-loads full data if in lazy mode.

    The `HTTPSummary` wrapper is created on first access from
    `summary_metrics` and reused afterwards.
    """
    summary_missing = "summaryMetrics" not in self._attrs
    if summary_missing and self._lazy and not self._full_data_loaded:
        self.load_full_data()

    if self._summary is not None:
        return self._summary

    from wandb.old.summary import HTTPSummary

    # TODO: fix the outdir issue
    self._summary = HTTPSummary(self, self.client, summary=self.summary_metrics)
    return self._summary
@property
def system_metrics(self) -> dict[str, Any]:
    """Get run system metrics. Auto-loads full data if in lazy mode.

    The stored value is passed through `_convert_to_dict` and written back
    to `_attrs["systemMetrics"]` before it is returned.
    """
    metrics_missing = "systemMetrics" not in self._attrs
    if metrics_missing and self._lazy and not self._full_data_loaded:
        self.load_full_data()

    # Defensive: normalise whatever is stored into a dict on each access.
    as_dict = _convert_to_dict(self._attrs.get("systemMetrics", {}))
    self._attrs["systemMetrics"] = as_dict
    return as_dict
@property
def summary_metrics(self) -> dict[str, Any]:
    """Get run summary metrics. Auto-loads full data if in lazy mode.

    The stored value is passed through `_convert_to_dict` and written back
    to `_attrs["summaryMetrics"]` before it is returned.
    """
    metrics_missing = "summaryMetrics" not in self._attrs
    if metrics_missing and self._lazy and not self._full_data_loaded:
        self.load_full_data()

    # Defensive: normalise whatever is stored into a dict on each access.
    as_dict = _convert_to_dict(self._attrs.get("summaryMetrics", {}))
    self._attrs["summaryMetrics"] = as_dict
    return as_dict
@property
def rawconfig(self) -> dict[str, Any]:
    """Get raw run config including internal keys. Auto-loads full data if in lazy mode.

    Returns an empty dict when no raw config is stored.
    """
    if "rawconfig" not in self._attrs and self._lazy and not self._full_data_loaded:
        self.load_full_data()
    # Re-read `_attrs`: loading may have populated or replaced it.
    return self._attrs.get("rawconfig", {})
@property
def sweep_name(self) -> str | None:
    """Get sweep name, or `None` when the run attributes have no `sweepName`.

    Read straight from the stored attributes without triggering a full
    load, since `sweepName` is part of the lightweight fragment.
    """
    attrs = self._attrs
    return attrs["sweepName"] if "sweepName" in attrs else None
@property
def path(self) -> list[str]:
    """The path of the run: `[entity, project, run_id]`.

    Each component is converted to `str` and URL-quoted with
    `urllib.parse.quote_plus`. A new list is built on every access.
    """
    components = (self.entity, self.project, self.id)
    return [urllib.parse.quote_plus(str(component)) for component in components]
@property
def url(self) -> str:
    """The URL of the run.

    Built from the client's `app_url` followed by the run path with a
    "runs" segment inserted before the run id. For SaaS users, it takes the
    form of `https://wandb.ai/entity/project/runs/run_id`.
    """
    segments = self.path
    # Splice "runs" in at index 2, i.e. between project and run id.
    segments[2:2] = ["runs"]
    return self.client.app_url + "/".join(segments)
@property
def metadata(self) -> dict[str, Any] | None:
    """Metadata about the run from wandb-metadata.json.

    On first access the run's `wandb-metadata.json` file is fetched through
    the client's HTTP session (5 second timeout) and parsed. A successful
    result is cached; failures are not, so the next access retries.

    Returns:
        The parsed metadata, or `None` if the file doesn't exist, can't be
        downloaded, or can't be parsed.
    """
    if self._metadata is None:
        try:
            f = self.file("wandb-metadata.json")
            session = self.client._client.transport.session
            response = session.get(f.url, timeout=5)
            response.raise_for_status()
            self._metadata = json_util.loads(response.content)
        except Exception:
            # Best effort: file doesn't exist, or can't be downloaded, or
            # can't be parsed. `Exception` rather than a bare `except` so
            # KeyboardInterrupt / SystemExit still propagate.
            pass
    return self._metadata
@property
def lastHistoryStep(self) -> int:  # noqa: N802
    """Returns the last step logged in the run's history.

    Queries the run's `historyKeys` on every access. Returns -1 when the
    response, the project, the run, the history keys, or the `lastStep`
    entry is missing.
    """
    history_keys_query = gql(
        """
        query RunHistoryKeys($project: String!, $entity: String!, $name: String!) {
            project(name: $project, entityName: $entity) {
                run(name: $name) { historyKeys }
            }
        }
        """
    )
    response = self._exec(history_keys_query)

    # Walk down the response one level at a time, stopping at the first
    # missing level.
    project = response.get("project") if response is not None else None
    run = project.get("run") if project is not None else None
    history_keys = run.get("historyKeys") if run is not None else None

    if history_keys is None:
        return -1
    return history_keys.get("lastStep", -1)
def to_html(self, height: int = 420, hidden: bool = False) -> str:
    """Generate HTML containing an iframe displaying this run.

    Args:
        height: Height of the iframe in pixels.
        hidden: When true, the iframe gets `display:none;` and the markup
            is prefixed with `ipython.toggle_button()`.

    Returns:
        The HTML markup as a string.
    """
    iframe_src = self.url + "?jupyter=true"

    css_rules = ["border:none", "width:100%", f"height:{height}px"]
    if hidden:
        css_rules.append("display:none")
    css = ";".join(css_rules) + ";"

    toggle = ipython.toggle_button() if hidden else ""
    return toggle + f"<iframe src={iframe_src!r} style={css!r}></iframe>"
def _repr_html_(self) -> str:
    """Return the HTML representation of this run.

    When `ipython.in_vscode_notebook()` is true, the HTML-escaped string
    representation is returned; otherwise the iframe from `to_html()`.
    """
    if not ipython.in_vscode_notebook():
        return self.to_html()

    import html

    return html.escape(self._string_representation())
def __repr__(self) -> str:
    """Return the run's string representation."""
    representation = self._string_representation()
    return representation
def _string_representation(self) -> str:
    """Format the run as `<ClassName entity/project/run_id (state)>`."""
    class_name = nameof(type(self))
    joined_path = "/".join(self.path)
    return f"<{class_name} {joined_path} ({self.state})>"
def beta_scan_history(
    self,
    keys: list[str] | None = None,
    page_size: int = 1_000,
    min_step: int = 0,
    max_step: int | None = None,
    use_cache: bool = True,
) -> public.BetaHistoryScan:
    """Returns an iterable collection of all history records for a run.

    This function is still in development and may not work as expected.
    It uses wandb-core to read history from a run's exported
    parquet history locally.

    Args:
        keys: list of metrics to read from the run's history.
            if no keys are provided then all metrics will be returned.
        page_size: the number of history records to read at a time.
        min_step: The minimum step to start reading history from (inclusive).
        max_step: The maximum step to read history up to (exclusive).
            When `None`, defaults to the run's last history step + 1.
        use_cache: When set to True, checks the WANDB_CACHE_DIR for a run history.
            If the run history is not found in the cache, it will be downloaded from the server.
            If set to False, the run history will be downloaded every time.

    Returns:
        A BetaHistoryScan object,
        which can be iterated over to get history records.
    """
    if self._service_api is None:
        # Create the service API once and keep it on the run for reuse.
        settings = wandb_setup.singleton().settings.model_copy()
        self._service_api = ServiceApi(settings=settings)

    # Only substitute the default when max_step was not supplied. A plain
    # `max_step or ...` would also replace an explicit 0.
    if max_step is None:
        max_step = self.lastHistoryStep + 1

    beta_history_scan = public.BetaHistoryScan(
        service_api=self._service_api,
        run=self,
        min_step=min_step,
        max_step=max_step,
        keys=keys,
        page_size=page_size,
        use_cache=use_cache,
    )
    return beta_history_scan
def download_history_exports(
    self,
    download_dir: pathlib.Path | str,
    require_complete_history: bool = True,
) -> runhistory.DownloadHistoryResult:
    """Download any parquet history files for the run to the provided directory.

    Args:
        download_dir: The directory to download the history files to.
        require_complete_history: Whether to require the complete history to be downloaded.
            If true, and the run contains data that has not been exported to parquet files yet,
            an IncompleteRunHistoryError will be raised.

    Returns:
        A DownloadHistoryResult.

    Raises:
        IncompleteRunHistoryError: If require_complete_history is True
            and the run contains data not yet exported to parquet files.
        WandbApiFailedError: If the API request fails for reasons other than
            incomplete history.
    """
    api_request = apb.ApiRequest(
        read_run_history_request=apb.ReadRunHistoryRequest(
            download_run_history_init=apb.DownloadRunHistoryInit(
                entity=self.entity,
                project=self.project,
                run_id=self.id,
                download_dir=str(download_dir),
                require_complete_history=require_complete_history,
            ),
        )
    )

    if self._service_api is None:
        # Create the service API once and keep it on the run for reuse.
        self._service_api = ServiceApi(
            settings=wandb_setup.singleton().settings.model_copy()
        )

    response: apb.ApiResponse
    try:
        response = self._service_api.send_api_request(api_request)
    except WandbApiFailedError as e:
        failure = e.response
        history_incomplete = (
            failure is not None
            and failure.error_type == apb.ErrorType.INCOMPLETE_RUN_HISTORY_ERROR
        )
        if history_incomplete:
            raise runhistory.IncompleteRunHistoryError() from None
        raise WandbApiFailedError("Failed to download history") from e

    init_response = response.read_run_history_response.download_run_history_init
    contains_live_data = init_response.contains_live_data
    request_id = init_response.request_id

    def _wait_for_download():
        # Invoked by the asyncer; reports progress while the download runs.
        return runhistory.wait_for_download_with_progress(
            self._service_api,
            request_id,
            contains_live_data,
        )

    return wandb_setup.singleton().asyncer.run(_wait_for_download)
|