local_process.py 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. from __future__ import annotations
  2. import logging
  3. import shlex
  4. from typing import Any
  5. import wandb
  6. from .._project_spec import LaunchProject
  7. from ..errors import LaunchError
  8. from ..utils import (
  9. LOG_PREFIX,
  10. MAX_ENV_LENGTHS,
  11. PROJECT_SYNCHRONOUS,
  12. sanitize_wandb_api_key,
  13. validate_wandb_python_deps,
  14. )
  15. from .abstract import AbstractRun, AbstractRunner
  16. from .local_container import _run_entry_point
  17. _logger = logging.getLogger(__name__)
  18. class LocalProcessRunner(AbstractRunner):
  19. """Runner class, uses a project to create a LocallySubmittedRun.
  20. LocalProcessRunner is very similar to a LocalContainerRunner, except it does not
  21. run the command inside a docker container. Instead, it runs the
  22. command specified as a process directly on the bare metal machine.
  23. """
  24. async def run( # type: ignore
  25. self,
  26. launch_project: LaunchProject,
  27. *args,
  28. **kwargs,
  29. ) -> AbstractRun | None:
  30. if args is not None:
  31. _msg = f"{LOG_PREFIX}LocalProcessRunner.run received unused args {args}"
  32. _logger.warning(_msg)
  33. if kwargs is not None:
  34. _msg = f"{LOG_PREFIX}LocalProcessRunner.run received unused kwargs {kwargs}"
  35. _logger.warning(_msg)
  36. synchronous: bool = self.backend_config[PROJECT_SYNCHRONOUS]
  37. entry_point = (
  38. launch_project.override_entrypoint or launch_project.get_job_entry_point()
  39. )
  40. cmd: list[Any] = []
  41. if launch_project.project_dir is None:
  42. raise LaunchError("Launch LocalProcessRunner received empty project dir")
  43. if launch_project.job:
  44. assert launch_project._job_artifact is not None
  45. try:
  46. validate_wandb_python_deps(
  47. "requirements.frozen.txt",
  48. launch_project.project_dir,
  49. )
  50. except Exception:
  51. wandb.termwarn("Unable to validate python dependencies")
  52. env_vars = launch_project.get_env_vars_dict(
  53. self._api, MAX_ENV_LENGTHS[self.__class__.__name__]
  54. )
  55. for env_key, env_value in env_vars.items():
  56. cmd += [f"{shlex.quote(env_key)}={shlex.quote(env_value)}"]
  57. if entry_point is not None:
  58. cmd += entry_point.command
  59. cmd += launch_project.override_args
  60. command_str = " ".join(cmd).strip()
  61. _msg = f"{LOG_PREFIX}Launching run as a local-process with command {sanitize_wandb_api_key(command_str)}"
  62. wandb.termlog(_msg)
  63. run = _run_entry_point(command_str, launch_project.project_dir)
  64. if synchronous:
  65. await run.wait()
  66. return run