abstract.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. """Implementation of the abstract runner class.
  2. This class defines the interface that the W&B launch runner uses to manage the lifecycle
  3. of runs launched in different environments (e.g. runs launched locally or in a cluster).
  4. """
  5. from __future__ import annotations
  6. import logging
  7. import os
  8. import shutil
  9. import subprocess
  10. import sys
  11. from abc import ABC, abstractmethod
  12. from typing import Any, Literal
  13. import wandb
  14. from wandb.apis.internal import Api
  15. from wandb.sdk.lib import runid
  16. from .._project_spec import LaunchProject
  17. _logger = logging.getLogger(__name__)
  18. State = Literal[
  19. "unknown",
  20. "starting",
  21. "running",
  22. "failed",
  23. "finished",
  24. "stopping",
  25. "stopped",
  26. "preempted",
  27. ]
  28. class Status:
  29. def __init__(self, state: State = "unknown", messages: list[str] = None): # type: ignore
  30. self.state = state
  31. self.messages = messages or []
  32. def __repr__(self) -> State:
  33. return self.state
  34. def __str__(self) -> str:
  35. return self.state
  36. def __eq__(self, __value: object) -> bool:
  37. if isinstance(__value, Status):
  38. return self.state == __value.state
  39. else:
  40. return self.state == __value
  41. def __hash__(self) -> int:
  42. return hash(self.state)
  43. class AbstractRun(ABC):
  44. """Wrapper around a W&B launch run.
  45. A launched run is a subprocess running an entry point
  46. command, that exposes methods for waiting on and cancelling the run.
  47. This class defines the interface that the W&B launch runner uses to manage the lifecycle
  48. of runs launched in different environments (e.g. runs launched locally or in a cluster).
  49. ``AbstractRun`` is not thread-safe. That is, concurrent calls to wait() / cancel()
  50. from multiple threads may inadvertently kill resources (e.g. local processes) unrelated to the
  51. run.
  52. """
  53. def __init__(self) -> None:
  54. self._status = Status()
  55. @property
  56. def status(self) -> Status:
  57. return self._status
  58. @abstractmethod
  59. async def get_logs(self) -> str | None:
  60. """Return the logs associated with the run."""
  61. def _run_cmd(
  62. self, cmd: list[str], output_only: bool | None = False
  63. ) -> subprocess.Popen[bytes] | bytes | None:
  64. """Run the command and returns a popen object or the stdout of the command.
  65. Arguments:
  66. cmd: The command to run
  67. output_only: If true just return the stdout bytes
  68. """
  69. try:
  70. env = os.environ
  71. popen = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE)
  72. if output_only:
  73. popen.wait()
  74. if popen.stdout is not None:
  75. return popen.stdout.read()
  76. return popen
  77. except subprocess.CalledProcessError as e:
  78. wandb.termerror(f"Command failed: {e}")
  79. return None
  80. @abstractmethod
  81. async def wait(self) -> bool:
  82. """Wait for the run to finish, returning True if the run succeeded and false otherwise.
  83. Note that in some cases, we may wait until the remote job completes rather than until the W&B run completes.
  84. """
  85. @abstractmethod
  86. async def get_status(self) -> Status:
  87. """Get status of the run."""
  88. @abstractmethod
  89. async def cancel(self) -> None:
  90. """Cancel the run (interrupts the command subprocess, cancels the run, etc).
  91. Cancels the run and waits for it to terminate. The W&B run status may not be
  92. set correctly upon run cancellation.
  93. """
  94. @property
  95. @abstractmethod
  96. def id(self) -> str | None:
  97. pass
  98. class AbstractRunner(ABC):
  99. """Abstract plugin class defining the interface needed to execute W&B Launches.
  100. You can define subclasses of ``AbstractRunner`` and expose them as third-party
  101. plugins to enable running W&B projects against custom execution backends
  102. (e.g. to run projects against your team's in-house cluster or job scheduler).
  103. """
  104. _type: str
  105. def __init__(
  106. self,
  107. api: Api,
  108. backend_config: dict[str, Any],
  109. ) -> None:
  110. self._api = api
  111. self.backend_config = backend_config
  112. self._cwd = os.getcwd()
  113. self._namespace = runid.generate_id()
  114. def find_executable(
  115. self,
  116. cmd: str,
  117. ) -> str | None:
  118. """Cross platform utility for checking if a program is available."""
  119. return shutil.which(cmd)
  120. @property
  121. def api_key(self) -> Any:
  122. return self._api.api_key
  123. def verify(self) -> bool:
  124. """This is called on first boot to verify the needed commands, and permissions are available.
  125. For now just call `wandb.termerror` and `sys.exit(1)`
  126. """
  127. if self._api.api_key is None:
  128. wandb.termerror(
  129. "Couldn't find W&B api key, run wandb login or set WANDB_API_KEY"
  130. )
  131. sys.exit(1)
  132. return True
  133. @abstractmethod
  134. async def run(
  135. self,
  136. launch_project: LaunchProject,
  137. image_uri: str,
  138. ) -> AbstractRun | None:
  139. """Submit an LaunchProject to be run.
  140. Returns a SubmittedRun object to track the execution
  141. Arguments:
  142. launch_project: Object of _project_spec.LaunchProject class representing a wandb launch project
  143. Returns:
  144. A :py:class:`wandb.sdk.launch.runners.SubmittedRun`. This function is expected to run
  145. the project asynchronously, i.e. it should trigger project execution and then
  146. immediately return a `SubmittedRun` to track execution status.
  147. """