| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- """Module for starting up the service process (wandb-core)."""
- from __future__ import annotations
- import os
- import pathlib
- import platform
- import subprocess
- import tempfile
- from typing import TYPE_CHECKING
- from wandb.analytics import get_sentry
- from wandb.env import core_debug, dcgm_profiling_enabled, error_reporting_enabled
- from wandb.errors import WandbCoreNotAvailableError
- from wandb.sdk.lib.service import ipc_support
- from wandb.util import get_core_path
- from . import service_port_file, service_token
- if TYPE_CHECKING:
- from wandb.sdk.wandb_settings import Settings
- DEFAULT_DETACHED_IDLE_TIMEOUT = "10m"
- def start(settings: Settings) -> ServiceProcess:
- """Start the internal service process.
- Returns:
- A handle to the process.
- """
- return _start(
- settings,
- detached=False,
- idle_timeout=None,
- )
- def start_detached(
- settings: Settings,
- *,
- idle_timeout: str = DEFAULT_DETACHED_IDLE_TIMEOUT,
- ) -> ServiceProcess:
- """Start the internal service process in detached mode.
- In detached mode, the service process does not automatically exit when the
- starting process exits.
- Args:
- settings: SDK settings.
- idle_timeout: How long the service should stay alive with no connected
- clients before shutting down. This uses Go duration syntax, for
- example ``30s`` or ``10m``. Use ``0`` to disable idle shutdown.
- Returns:
- A handle to the process.
- """
- return _start(
- settings,
- detached=True,
- idle_timeout=idle_timeout,
- )
- def _start(
- settings: Settings,
- *,
- detached: bool,
- idle_timeout: str | None,
- ) -> ServiceProcess:
- get_sentry().configure_scope(tags=dict(settings), process_context="service")
- try:
- return _launch_server(
- settings,
- detached=detached,
- idle_timeout=idle_timeout,
- )
- except Exception as e:
- get_sentry().reraise(e)
- class ServiceProcess:
- """A handle to a process running the internal service."""
- def __init__(
- self,
- *,
- connection_token: service_token.ServiceToken,
- process: subprocess.Popen,
- ) -> None:
- self._token = connection_token
- self._process = process
- @property
- def token(self) -> service_token.ServiceToken:
- """A token for connecting to the process."""
- return self._token
- def join(self) -> int:
- """Wait for the process to end and return its exit code."""
- return self._process.wait()
- def _launch_server(
- settings: Settings,
- *,
- detached: bool,
- idle_timeout: str | None,
- ) -> ServiceProcess:
- """Launch server and set ports."""
- if platform.system() == "Windows":
- creationflags: int = subprocess.CREATE_NEW_PROCESS_GROUP # type: ignore[attr-defined]
- start_new_session = False
- else:
- creationflags = 0
- start_new_session = True
- pid = str(os.getpid())
- with tempfile.TemporaryDirectory() as tmpdir:
- port_file = pathlib.Path(tmpdir, f"port-{pid}.txt")
- service_args: list[str] = []
- try:
- core_path = get_core_path()
- except WandbCoreNotAvailableError as e:
- get_sentry().reraise(e)
- service_args.append(core_path)
- if not error_reporting_enabled():
- service_args.append("--no-observability")
- if core_debug(default="False"):
- service_args.extend(["--log-level", "-4"])
- if dcgm_profiling_enabled():
- service_args.append("--enable-dcgm-profiling")
- service_args.extend(["--port-filename", str(port_file)])
- service_args.extend(["--pid", pid])
- if detached:
- service_args.extend(["--detached", "--idle-timeout", idle_timeout or "0"])
- if not ipc_support.SUPPORTS_UNIX:
- service_args.append("--listen-on-localhost")
- proc = subprocess.Popen(
- service_args,
- env=os.environ,
- close_fds=True,
- creationflags=creationflags,
- start_new_session=start_new_session,
- stdin=subprocess.DEVNULL if detached else None,
- stdout=subprocess.DEVNULL if detached else None,
- stderr=subprocess.DEVNULL if detached else None,
- )
- token = service_port_file.poll_for_token(
- port_file,
- proc,
- timeout=settings.x_service_wait,
- )
- return ServiceProcess(connection_token=token, process=proc)
|