job_builder.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640
  1. """job builder."""
  2. from __future__ import annotations
  3. import json
  4. import logging
  5. import os
  6. import re
  7. import sys
  8. from typing import TYPE_CHECKING, Any, Callable, Literal, TypedDict
  9. import wandb
  10. from wandb.sdk.artifacts._internal_artifact import InternalArtifact
  11. from wandb.sdk.artifacts.artifact import Artifact
  12. from wandb.sdk.data_types._dtypes import TypeRegistry
  13. from wandb.sdk.internal.internal_api import Api
  14. from wandb.sdk.lib.filenames import DIFF_FNAME, METADATA_FNAME, REQUIREMENTS_FNAME
  15. from wandb.util import make_artifact_name_safe
  16. from .settings_static import SettingsStatic
  _logger = logging.getLogger(__name__)

  if TYPE_CHECKING:
      # Needed only for type annotations; never imported at runtime.
      from wandb.proto.wandb_internal_pb2 import ArtifactRecord

  # Name under which the run's requirements file is stored in the job artifact.
  FROZEN_REQUIREMENTS_FNAME = "requirements.frozen.txt"
  # Name of the job specification file.
  JOB_FNAME = "wandb-job.json"
  # Artifact type used for job artifacts.
  JOB_ARTIFACT_TYPE = "job"
  # Levels accepted by JobBuilder._log_if_verbose.
  LOG_LEVEL = Literal["log", "warn", "error"]
  class Version:
      """A ``major.minor.patch`` version number supporting ``<`` and ``==``.

      ``repr()`` (and therefore ``str()``) renders the dotted form, e.g.
      ``"0.17.0"``.
      """

      def __init__(self, major: int, minor: int, patch: int):
          self._major = major
          self._minor = minor
          self._patch = patch

      def _as_tuple(self) -> tuple[int, int, int]:
          """Return the components in comparison order."""
          return (self._major, self._minor, self._patch)

      def __repr__(self) -> str:
          return f"{self._major}.{self._minor}.{self._patch}"

      def __lt__(self, other: object) -> bool:
          # Tuple comparison is lexicographic: major, then minor, then patch.
          if not isinstance(other, Version):
              return NotImplemented
          return self._as_tuple() < other._as_tuple()

      def __eq__(self, other: object) -> bool:
          if not isinstance(other, Version):
              return NotImplemented
          return self._as_tuple() == other._as_tuple()

      def __hash__(self) -> int:
          # Defining __eq__ alone sets __hash__ to None; hash consistently with
          # __eq__ so versions can be put in sets or used as dict keys.
          return hash(self._as_tuple())
  # Oldest wandb version that supports each listed key of the ``source`` dict
  # in wandb-job.json. Keys not listed here impose no version requirement.
  SOURCE_KEYS_MIN_SUPPORTED_VERSION = dict(
      dockerfile=Version(0, 17, 0),
      build_context=Version(0, 17, 0),
  )
  class GitInfo(TypedDict):
      """Git coordinates copied from the run metadata's ``git`` entry."""

      remote: str
      commit: str


  class GitSourceDict(TypedDict):
      """``source`` dict of a job whose source_type is "repo"."""

      git: GitInfo
      entrypoint: list[str]  # command used to start the program
      notebook: bool  # whether the run was a notebook run
      build_context: str | None  # None-valued keys are dropped before saving
      dockerfile: str | None


  class ArtifactSourceDict(TypedDict):
      """``source`` dict of a job whose source_type is "artifact"."""

      artifact: str  # "wandb-artifact://_id/<code artifact id>"
      entrypoint: list[str]
      notebook: bool
      build_context: str | None
      dockerfile: str | None


  class ImageSourceDict(TypedDict):
      """``source`` dict of a job whose source_type is "image"."""

      image: str


  class JobSourceDict(TypedDict, total=False):
      """Contents written to wandb-job.json.

      Declared with ``total=False`` because optional entries such as
      ``services`` are only present when set.
      """

      _version: str
      source_type: str
      source: GitSourceDict | ArtifactSourceDict | ImageSourceDict
      input_types: dict[str, Any]
      output_types: dict[str, Any]
      runtime: str | None
      services: dict[str, str]


  class ArtifactInfoForJob(TypedDict):
      """Server id and name of the logged code artifact a job refers to."""

      id: str
      name: str
  def get_min_supported_for_source_dict(
      source: GitSourceDict | ArtifactSourceDict | ImageSourceDict,
      requirements: dict[str, Version] | None = None,
  ) -> Version | None:
      """Get the minimum wandb version that supports every key of a source dict.

      Each key listed in ``requirements`` needs at least the mapped version, so
      the dict as a whole needs the newest (maximum) of those per-key minimums.
      Keys without an entry impose no requirement.

      Args:
          source: The ``source`` dict destined for wandb-job.json.
          requirements: Per-key minimum versions. Defaults to
              ``SOURCE_KEYS_MIN_SUPPORTED_VERSION``.

      Returns:
          The required minimum version, or None if no key has a requirement.
      """
      if requirements is None:
          requirements = SOURCE_KEYS_MIN_SUPPORTED_VERSION
      required: Version | None = None
      for key in source:
          key_min = requirements.get(key)
          if key_min is None:
              continue
          # A reader must satisfy every key's minimum, so keep the largest.
          if required is None or required < key_min:
              required = key_min
      return required
  class JobBuilder:
      """Assemble a job artifact from the files and metadata of a run.

      The builder reads the requirements file (REQUIREMENTS_FNAME) and the run
      metadata file (METADATA_FNAME) from ``files_dir``, chooses a source type
      ("repo", "artifact" or "image"), and packages the resulting job spec,
      the frozen requirements and, for repo jobs, the diff file into an
      artifact of type JOB_ARTIFACT_TYPE.
      """

      _settings: SettingsStatic
      _files_dir: str
      _metadatafile_path: str | None
      _requirements_path: str | None
      _config: dict[str, Any] | None
      _summary: dict[str, Any] | None
      _logged_code_artifact: ArtifactInfoForJob | None
      _disable: bool
      _partial_source_id: str | None  # Partial job source artifact id.
      _aliases: list[str]
      _job_seq_id: str | None
      _job_version_alias: str | None
      _is_notebook_run: bool
      _verbose: bool
      _services: dict[str, str]
      _partial: bool

      def __init__(
          self,
          settings: SettingsStatic,
          verbose: bool = False,
          *,
          files_dir: str,
      ):
          """Instantiate a JobBuilder.

          Args:
              settings: Parameters for the job builder.
                  In a run, this is the run's settings.
                  Otherwise, this is a set of undocumented parameters,
                  all of which should be made explicit like files_dir.
              verbose: If True, messages passed to ``_log_if_verbose`` are also
                  printed to the terminal, not only written to the logger.
              files_dir: The directory where to write files.
                  In a run, this should be the run's files directory.
          """
          self._settings = settings
          self._files_dir = files_dir
          self._metadatafile_path = None
          self._requirements_path = None
          self._config = None
          self._summary = None
          self._logged_code_artifact = None
          self._job_seq_id = None
          self._job_version_alias = None
          # Either setting turns job creation off.
          self._disable = settings.disable_job_creation or settings.x_disable_machine_info
          self._partial_source_id = None
          self._aliases = []
          # An explicit source type from settings overrides auto-detection.
          self._source_type: Literal["repo", "artifact", "image"] | None = (
              settings.job_source  # type: ignore[assignment]
          )
          self._is_notebook_run = self._get_is_notebook_run()
          self._verbose = verbose
          self._partial = False
          self._services = {}

      def set_config(self, config: dict[str, Any]) -> None:
          """Set the run config used to derive the job's input types."""
          self._config = config

      def set_summary(self, summary: dict[str, Any]) -> None:
          """Set the run summary used to derive the job's output types."""
          self._summary = summary

      @property
      def disable(self) -> bool:
          """Whether job creation is disabled."""
          return self._disable

      @disable.setter
      def disable(self, val: bool) -> None:
          self._disable = val

      @property
      def input_types(self) -> dict[str, Any]:
          """JSON type description of the run config."""
          return TypeRegistry.type_of(self._config).to_json()

      @property
      def output_types(self) -> dict[str, Any]:
          """JSON type description of the run summary."""
          return TypeRegistry.type_of(self._summary).to_json()

      def set_partial_source_id(self, source_id: str) -> None:
          """Mark the run as using an existing (partial) job artifact."""
          self._partial_source_id = source_id

      def _handle_server_artifact(
          self, res: dict | None, artifact: ArtifactRecord
      ) -> None:
          """Record server-assigned info after an artifact has been saved.

          For ``job`` artifacts this derives the version alias of the saved job
          and remembers its sequence id. For ``code`` artifacts it remembers the
          artifact id and name so an artifact-sourced job can reference it.
          Malformed responses are logged and otherwise ignored.

          Args:
              res: The server response for the saved artifact, or None.
              artifact: The artifact record that was saved.
          """
          if res is None:
              return
          if artifact.type == "job":
              try:
                  sequence = res["artifactSequence"]
                  latest = sequence["latestArtifact"]
                  if latest is None:
                      self._job_version_alias = "v0"
                  elif latest["id"] == res["id"]:
                      # The saved artifact is itself the latest version.
                      self._job_version_alias = f"v{latest['versionIndex']}"
                  else:
                      # The saved artifact follows the current latest version.
                      self._job_version_alias = f"v{latest['versionIndex'] + 1}"
                  self._job_seq_id = sequence["id"]
              except KeyError as e:
                  _logger.info("Malformed response from ArtifactSaver.save %s", e)
          if artifact.type == "code":
              try:
                  artifact_id = res["id"]
              except KeyError as e:
                  _logger.info("Malformed response from ArtifactSaver.save %s", e)
                  return
              self._logged_code_artifact = ArtifactInfoForJob(
                  {
                      "id": artifact_id,
                      "name": artifact.name,
                  }
              )

      def _build_repo_job_source(
          self,
          program_relpath: str,
          metadata: dict[str, Any],
      ) -> tuple[GitSourceDict | None, str | None]:
          """Build the source dict and job name for a git-repo sourced job.

          Args:
              program_relpath: Program path as recorded in the run metadata.
              metadata: Parsed run metadata; needs ``git.remote`` and
                  ``git.commit`` (and ``root`` for notebook runs).

          Returns:
              ``(source, name)``, or ``(None, None)`` when git info is missing or
              a notebook run's program cannot be located.
          """
          git_info: dict[str, str] = metadata.get("git") or {}
          remote = git_info.get("remote")
          commit = git_info.get("commit")
          if remote is None or commit is None:
              # A source type forced via settings can reach here without git
              # info; returning (None, None) lets the caller warn and skip job
              # creation instead of raising.
              return None, None
          root = metadata.get("root")
          if self._is_notebook_run:
              if not os.path.exists(
                  os.path.join(os.getcwd(), os.path.basename(program_relpath))
              ):
                  return None, None
              if root is None or self._settings.x_jupyter_root is None:
                  _logger.info("target path does not exist, exiting")
                  return None, None
              # git notebooks set the root to the git root,
              # x_jupyter_root is the path where the jupyter server was started,
              # and program_relpath is relative to x_jupyter_root. Combine them
              # into a path relative to the git root.
              full_program_path = os.path.normpath(
                  os.path.join(
                      os.path.relpath(str(self._settings.x_jupyter_root), root),
                      program_relpath,
                  )
              )
              # If the notebook server was started above the git repo, the path
              # begins with ".." components (normpath leaves them only at the
              # front). Each ".." is matched by one directory leading back down
              # to the repo root, so drop both -- assuming the notebook lives
              # inside the repo. Split on os.sep because normpath uses it.
              if full_program_path.startswith(".."):
                  parts = full_program_path.split(os.sep)
                  up_levels = parts.count("..")
                  full_program_path = "/".join(parts[2 * up_levels :])
          else:
              full_program_path = program_relpath
          entrypoint = self._get_entrypoint(full_program_path, metadata)
          # TODO: update executable to a method that supports pex
          source: GitSourceDict = {
              "git": {"remote": remote, "commit": commit},
              "entrypoint": entrypoint,
              "notebook": self._is_notebook_run,
              "build_context": metadata.get("build_context"),
              "dockerfile": metadata.get("dockerfile"),
          }
          name = self._make_job_name(f"{remote}_{program_relpath}")
          return source, name

      def _log_if_verbose(self, message: str, level: LOG_LEVEL) -> None:
          """Write ``message`` to the module logger and, if verbose, the terminal.

          Args:
              message: Text to emit.
              level: "log", "warn" or "error"; selects the logger method and the
                  matching ``wandb.termlog``/``termwarn``/``termerror`` printer.
                  Any other value is ignored.
          """
          term_func: Callable[[Any], None] | None = None
          if level == "log":
              _logger.info(message)
              term_func = wandb.termlog
          elif level == "warn":
              _logger.warning(message)
              term_func = wandb.termwarn
          elif level == "error":
              _logger.error(message)
              term_func = wandb.termerror
          if self._verbose and term_func is not None:
              term_func(message)

      def _build_artifact_job_source(
          self,
          program_relpath: str,
          metadata: dict[str, Any],
      ) -> tuple[ArtifactSourceDict | None, str | None]:
          """Build the source dict and job name for a code-artifact sourced job.

          Args:
              program_relpath: Program path as recorded in the run metadata.
              metadata: Parsed run metadata.

          Returns:
              ``(source, name)``, or ``(None, None)`` when no code artifact was
              logged or a non-colab notebook's program cannot be found.
          """
          if self._logged_code_artifact is None:
              # A source type forced via settings can reach here without a logged
              # code artifact; let the caller warn instead of raising.
              return None, None
          # TODO: should we just always exit early if the path doesn't exist?
          if self._is_notebook_run and not self._is_colab_run():
              full_program_relpath = os.path.relpath(program_relpath, os.getcwd())
              # A job built from a path that doesn't resolve would fail.
              if not os.path.exists(full_program_relpath):
                  # When users call log code in a notebook the code artifact starts
                  # at the directory the notebook is in instead of the jupyter core.
                  if not os.path.exists(os.path.basename(program_relpath)):
                      _logger.info("target path does not exist, exiting")
                      self._log_if_verbose(
                          "No program path found when generating artifact job source for a non-colab notebook run. See https://docs.wandb.ai/platform/launch/create-job",
                          "warn",
                      )
                      return None, None
                  full_program_relpath = os.path.basename(program_relpath)
          else:
              full_program_relpath = program_relpath
          entrypoint = self._get_entrypoint(full_program_relpath, metadata)
          # TODO: update executable to a method that supports pex
          source: ArtifactSourceDict = {
              "entrypoint": entrypoint,
              "notebook": self._is_notebook_run,
              "artifact": f"wandb-artifact://_id/{self._logged_code_artifact['id']}",
              "build_context": metadata.get("build_context"),
              "dockerfile": metadata.get("dockerfile"),
          }
          artifact_basename, *_ = self._logged_code_artifact["name"].split(":")
          name = self._make_job_name(artifact_basename)
          return source, name

      @staticmethod
      def _split_image_tag(image_name: str) -> tuple[str, str | None]:
          """Split ``image_name`` into ``(name_without_tag, tag)``.

          Only the text after the last ":" is considered, and only if it looks
          like a tag (alphanumerics, "_", "-", "."). Otherwise the tag is None
          and the name is returned unchanged; e.g. "host:5000/img" has no tag
          because "/" follows the colon.
          """
          base, sep, tag = image_name.rpartition(":")
          if sep and re.fullmatch(r"[a-zA-Z0-9_\-\.]+", tag):
              return base, tag
          return image_name, None

      def _build_image_job_source(
          self, metadata: dict[str, Any]
      ) -> tuple[ImageSourceDict, str]:
          """Build the source dict and job name for an image sourced job.

          A recognizable image tag is appended to ``self._aliases`` and removed
          from the name used to derive the job name.
          """
          image_name = metadata.get("docker")
          assert isinstance(image_name, str)
          # Strip only the trailing tag; str.replace would also remove an
          # identical ":<tag>" substring elsewhere (e.g. in a registry port).
          raw_image_name, tag = self._split_image_tag(image_name)
          if tag is not None:
              self._aliases.append(tag)
          source: ImageSourceDict = {
              "image": image_name,
          }
          name = self._make_job_name(raw_image_name)
          return source, name

      def _make_job_name(self, input_str: str) -> str:
          """Use job name from settings if provided, else use programmatic name."""
          if self._settings.job_name:
              return self._settings.job_name
          return make_artifact_name_safe(f"job-{input_str}")

      def _get_entrypoint(
          self,
          program_relpath: str,
          metadata: dict[str, Any],
      ) -> list[str]:
          """Return the job entrypoint command.

          A partial job built from the CLI reuses ``metadata["entrypoint"]``;
          otherwise the command is the interpreter basename plus the program.
          """
          if self._partial and metadata.get("entrypoint"):
              entrypoint: list[str] = metadata["entrypoint"]
              return entrypoint
          # job is being built from a run
          return [os.path.basename(sys.executable), program_relpath]

      def _get_is_notebook_run(self) -> bool:
          """Whether the settings mark this as a jupyter run."""
          return hasattr(self._settings, "_jupyter") and bool(self._settings._jupyter)

      def _is_colab_run(self) -> bool:
          """Whether the settings mark this as a colab run."""
          return hasattr(self._settings, "_colab") and bool(self._settings._colab)

      def _build_job_source(
          self,
          source_type: str,
          program_relpath: str | None,
          metadata: dict[str, Any],
      ) -> tuple[
          GitSourceDict | ArtifactSourceDict | ImageSourceDict | None,
          str | None,
      ]:
          """Construct a job source dict and name from the current run.

          Args:
              source_type: The type of source to build the job from. One of
                  "repo", "artifact", or "image".
              program_relpath: Program path, if known.
              metadata: Parsed run metadata.

          Returns:
              ``(source, name)``, or ``(None, None)`` (after a warning) when the
              required information is missing.
          """
          source: GitSourceDict | ArtifactSourceDict | ImageSourceDict | None = None
          name: str | None = None
          if source_type == "repo":
              source, name = self._build_repo_job_source(program_relpath or "", metadata)
          elif source_type == "artifact":
              source, name = self._build_artifact_job_source(
                  program_relpath or "", metadata
              )
          elif source_type == "image" and self._has_image_job_ingredients(metadata):
              source, name = self._build_image_job_source(metadata)
          if source is None:
              if source_type:
                  self._log_if_verbose(
                      f"Source type is set to '{source_type}' but some required information is missing "
                      "from the environment. A job will not be created from this run. See "
                      "https://docs.wandb.ai/platform/launch/create-job",
                      "warn",
                  )
              return None, None
          return source, name

      def build(
          self,
          api: Api,
          build_context: str | None = None,
          dockerfile: str | None = None,
          base_image: str | None = None,
      ) -> Artifact | None:
          """Build a job artifact from the current run.

          Args:
              api: The API object to use to create the job artifact.
              build_context: Path within the job source code to the image build
                  context. Saved as part of the job for future builds.
              dockerfile: Path within the build context to the Dockerfile.
                  Saved as part of the job for future builds.
              base_image: The base image used to run the job code.

          Returns:
              The job artifact if it was successfully built, otherwise None.
              When a partial job source id is set, that job's metadata is
              updated instead and None is returned.
          """
          _logger.info("Attempting to build job artifact")
          # If a partial job was used, write the input/output types to the
          # metadata rather than building a new job version.
          if self._partial_source_id is not None:
              new_metadata = {
                  "input_types": {"@wandb.config": self.input_types},
                  "output_types": self.output_types,
              }
              api.update_artifact_metadata(self._partial_source_id, new_metadata)
              return None

          if not os.path.exists(os.path.join(self._files_dir, REQUIREMENTS_FNAME)):
              self._log_if_verbose(
                  "No requirements.txt found, not creating job artifact. See https://docs.wandb.ai/platform/launch/create-job",
                  "warn",
              )
              return None

          metadata = self._handle_metadata_file()
          if metadata is None:
              self._log_if_verbose(
                  f"Ensure read and write access to run files dir: {self._files_dir}, control this via the WANDB_DIR env var. See https://docs.wandb.ai/models/track/environment-variables",
                  "warn",
              )
              return None

          runtime: str | None = metadata.get("python")
          # can't build a job without a python version
          if runtime is None:
              self._log_if_verbose(
                  "No python version found in metadata, not creating job artifact. "
                  "See https://docs.wandb.ai/platform/launch/create-job",
                  "warn",
              )
              return None

          input_types = self.input_types
          output_types = self.output_types

          source_type = self._get_source_type(metadata)
          if not source_type:
              # Not enough information to build a job; warn only if the user
              # signalled that they wanted one.
              if (
                  self._settings.job_name
                  or self._settings.job_source
                  or self._source_type
              ):
                  self._log_if_verbose(
                      "No source type found, not creating job artifact", "warn"
                  )
              return None

          program_relpath = self._get_program_relpath(source_type, metadata)
          if not self._partial and source_type != "image" and not program_relpath:
              self._log_if_verbose(
                  "No program path found, not creating job artifact. "
                  "See https://docs.wandb.ai/platform/launch/create-job",
                  "warn",
              )
              return None

          source, name = self._build_job_source(source_type, program_relpath, metadata)
          if source is None or name is None:
              return None

          # Explicit arguments override values taken from the metadata.
          if build_context:
              source["build_context"] = build_context  # type: ignore[typeddict-item]
          if dockerfile:
              source["dockerfile"] = dockerfile  # type: ignore[typeddict-item]
          if base_image:
              source["base_image"] = base_image  # type: ignore[typeddict-item]

          # The TypedDicts require every key to be present, but keys still set
          # to None should not appear in the saved job.
          for key in [k for k, v in source.items() if v is None]:
              source.pop(key)  # type: ignore[literal-required,misc]

          source_info: JobSourceDict = {
              "_version": str(get_min_supported_for_source_dict(source) or "v0"),
              "source_type": source_type,
              "source": source,
              "input_types": input_types,
              "output_types": output_types,
              "runtime": runtime,
          }
          if self._services:
              source_info["services"] = self._services

          return self._create_job_artifact(name, source_type, source_info)

      def _create_job_artifact(
          self, name: str, source_type: str, source_info: JobSourceDict
      ) -> Artifact:
          """Package the job spec, frozen requirements and (repo jobs) the diff."""
          artifact = InternalArtifact(name, JOB_ARTIFACT_TYPE)
          _logger.info("adding wandb-job metadata file")
          with artifact.new_file(JOB_FNAME) as f:
              f.write(json.dumps(source_info, indent=4))
          artifact.add_file(
              os.path.join(self._files_dir, REQUIREMENTS_FNAME),
              name=FROZEN_REQUIREMENTS_FNAME,
          )
          diff_path = os.path.join(self._files_dir, DIFF_FNAME)
          if source_type == "repo" and os.path.exists(diff_path):
              artifact.add_file(diff_path, name=DIFF_FNAME)
          return artifact

      def _get_source_type(self, metadata: dict[str, Any]) -> str | None:
          """Pick the job source type.

          A source type from settings wins; otherwise the first of repo,
          artifact and image whose ingredients are available is chosen.
          """
          if self._source_type:
              return self._source_type
          if self._has_git_job_ingredients(metadata):
              _logger.info("is repo sourced job")
              return "repo"
          if self._has_artifact_job_ingredients():
              _logger.info("is artifact sourced job")
              return "artifact"
          if self._has_image_job_ingredients(metadata):
              _logger.info("is image sourced job")
              return "image"
          _logger.info("no source found")
          return None

      def _get_program_relpath(
          self, source_type: str, metadata: dict[str, Any]
      ) -> str | None:
          """Return the program path to use for the job entrypoint.

          Notebook runs use ``metadata["program"]``; artifact jobs prefer
          ``codePathLocal`` over ``codePath``; all others use ``codePath``.
          """
          if self._is_notebook_run:
              _logger.info("run is notebook based run")
              program = metadata.get("program")
              if not program:
                  self._log_if_verbose(
                      "Notebook 'program' path not found in metadata. See https://docs.wandb.ai/platform/launch/create-job",
                      "warn",
                  )
              return program
          if source_type == "artifact" or self._settings.job_source == "artifact":
              # if the job is set to be an artifact, use relpath guaranteed
              # to be correct. 'codePath' uses the root path when in git repo
              # fallback to codePath if strictly local relpath not present
              return metadata.get("codePathLocal") or metadata.get("codePath")
          return metadata.get("codePath")

      def _handle_metadata_file(
          self,
      ) -> dict | None:
          """Load the run metadata file from the files directory.

          Returns:
              The parsed metadata, or None if the file is missing, unreadable,
              not valid UTF-8 JSON, or not a JSON object.
          """
          path = os.path.join(self._files_dir, METADATA_FNAME)
          try:
              with open(path, encoding="utf-8") as f:
                  metadata = json.load(f)
          except FileNotFoundError:
              return None
          except (OSError, ValueError) as e:
              # build() treats None as "metadata unavailable" and warns, so a
              # corrupt or unreadable file must not crash job creation.
              _logger.info("Unable to read run metadata file %s: %s", path, e)
              return None
          if not isinstance(metadata, dict):
              return None
          return metadata

      def _has_git_job_ingredients(self, metadata: dict[str, Any]) -> bool:
          """Whether the metadata has enough git info for a repo job."""
          git_info: dict[str, str] = metadata.get("git") or {}
          if self._is_notebook_run and metadata.get("root") is None:
              return False
          return git_info.get("remote") is not None and git_info.get("commit") is not None

      def _has_artifact_job_ingredients(self) -> bool:
          """Whether a code artifact has been logged for this run."""
          return self._logged_code_artifact is not None

      def _has_image_job_ingredients(self, metadata: dict[str, Any]) -> bool:
          """Whether the metadata records a docker image."""
          return metadata.get("docker") is not None