# service_process.py
  1. """Module for starting up the service process (wandb-core)."""
  2. from __future__ import annotations
  3. import os
  4. import pathlib
  5. import platform
  6. import subprocess
  7. import tempfile
  8. from typing import TYPE_CHECKING
  9. from wandb.analytics import get_sentry
  10. from wandb.env import core_debug, dcgm_profiling_enabled, error_reporting_enabled
  11. from wandb.errors import WandbCoreNotAvailableError
  12. from wandb.sdk.lib.service import ipc_support
  13. from wandb.util import get_core_path
  14. from . import service_port_file, service_token
  15. if TYPE_CHECKING:
  16. from wandb.sdk.wandb_settings import Settings
  17. DEFAULT_DETACHED_IDLE_TIMEOUT = "10m"
  18. def start(settings: Settings) -> ServiceProcess:
  19. """Start the internal service process.
  20. Returns:
  21. A handle to the process.
  22. """
  23. return _start(
  24. settings,
  25. detached=False,
  26. idle_timeout=None,
  27. )
  28. def start_detached(
  29. settings: Settings,
  30. *,
  31. idle_timeout: str = DEFAULT_DETACHED_IDLE_TIMEOUT,
  32. ) -> ServiceProcess:
  33. """Start the internal service process in detached mode.
  34. In detached mode, the service process does not automatically exit when the
  35. starting process exits.
  36. Args:
  37. settings: SDK settings.
  38. idle_timeout: How long the service should stay alive with no connected
  39. clients before shutting down. This uses Go duration syntax, for
  40. example ``30s`` or ``10m``. Use ``0`` to disable idle shutdown.
  41. Returns:
  42. A handle to the process.
  43. """
  44. return _start(
  45. settings,
  46. detached=True,
  47. idle_timeout=idle_timeout,
  48. )
  49. def _start(
  50. settings: Settings,
  51. *,
  52. detached: bool,
  53. idle_timeout: str | None,
  54. ) -> ServiceProcess:
  55. get_sentry().configure_scope(tags=dict(settings), process_context="service")
  56. try:
  57. return _launch_server(
  58. settings,
  59. detached=detached,
  60. idle_timeout=idle_timeout,
  61. )
  62. except Exception as e:
  63. get_sentry().reraise(e)
  64. class ServiceProcess:
  65. """A handle to a process running the internal service."""
  66. def __init__(
  67. self,
  68. *,
  69. connection_token: service_token.ServiceToken,
  70. process: subprocess.Popen,
  71. ) -> None:
  72. self._token = connection_token
  73. self._process = process
  74. @property
  75. def token(self) -> service_token.ServiceToken:
  76. """A token for connecting to the process."""
  77. return self._token
  78. def join(self) -> int:
  79. """Wait for the process to end and return its exit code."""
  80. return self._process.wait()
  81. def _launch_server(
  82. settings: Settings,
  83. *,
  84. detached: bool,
  85. idle_timeout: str | None,
  86. ) -> ServiceProcess:
  87. """Launch server and set ports."""
  88. if platform.system() == "Windows":
  89. creationflags: int = subprocess.CREATE_NEW_PROCESS_GROUP # type: ignore[attr-defined]
  90. start_new_session = False
  91. else:
  92. creationflags = 0
  93. start_new_session = True
  94. pid = str(os.getpid())
  95. with tempfile.TemporaryDirectory() as tmpdir:
  96. port_file = pathlib.Path(tmpdir, f"port-{pid}.txt")
  97. service_args: list[str] = []
  98. try:
  99. core_path = get_core_path()
  100. except WandbCoreNotAvailableError as e:
  101. get_sentry().reraise(e)
  102. service_args.append(core_path)
  103. if not error_reporting_enabled():
  104. service_args.append("--no-observability")
  105. if core_debug(default="False"):
  106. service_args.extend(["--log-level", "-4"])
  107. if dcgm_profiling_enabled():
  108. service_args.append("--enable-dcgm-profiling")
  109. service_args.extend(["--port-filename", str(port_file)])
  110. service_args.extend(["--pid", pid])
  111. if detached:
  112. service_args.extend(["--detached", "--idle-timeout", idle_timeout or "0"])
  113. if not ipc_support.SUPPORTS_UNIX:
  114. service_args.append("--listen-on-localhost")
  115. proc = subprocess.Popen(
  116. service_args,
  117. env=os.environ,
  118. close_fds=True,
  119. creationflags=creationflags,
  120. start_new_session=start_new_session,
  121. stdin=subprocess.DEVNULL if detached else None,
  122. stdout=subprocess.DEVNULL if detached else None,
  123. stderr=subprocess.DEVNULL if detached else None,
  124. )
  125. token = service_port_file.poll_for_token(
  126. port_file,
  127. proc,
  128. timeout=settings.x_service_wait,
  129. )
  130. return ServiceProcess(connection_token=token, process=proc)