| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859 |
- from __future__ import annotations
- import logging
- from multiprocessing.process import BaseProcess
- from typing import TYPE_CHECKING
- from typing_extensions import override
- from .interface_shared import InterfaceShared
- if TYPE_CHECKING:
- from queue import Queue
- from wandb.proto import wandb_internal_pb2 as pb
- from wandb.sdk.mailbox.mailbox_handle import MailboxHandle
- logger = logging.getLogger("wandb")
class InterfaceQueue(InterfaceShared):
    """Legacy implementation of InterfaceShared.

    This was used by legacy-service to pass messages back to itself before
    the existence of wandb-core. It may be removed once legacy-service is
    completely removed (including its use in `wandb sync`).

    Since it was used by the internal service, it does not implement
    the "deliver" methods, which are only used in the client; both
    `deliver_async` and `_deliver` raise `NotImplementedError`.
    """

    def __init__(
        self,
        record_q: Queue[pb.Record] | None = None,
        result_q: Queue[pb.Result] | None = None,
        process: BaseProcess | None = None,
    ) -> None:
        """Store the queues and the optional process handle.

        Args:
            record_q: Queue that `_publish` puts records on. When None,
                published records are not forwarded anywhere.
            result_q: Stored as `self.result_q`; not read by this class.
            process: Optional process whose liveness is checked before
                every publish. When None, no liveness check is made.
        """
        # The attributes are assigned before calling the base initializer,
        # preserving the original ordering.
        # NOTE(review): presumably the base __init__ may rely on them -- confirm.
        self.record_q = record_q
        self.result_q = result_q
        self._process = process
        super().__init__()

    @override
    def _publish(self, record: pb.Record, *, nowait: bool = False) -> None:
        """Put `record` on the record queue, if one was provided.

        Args:
            record: The record to enqueue.
            nowait: Accepted for interface compatibility; it is not used
                here, the record is always added with `put(record)`.

        Raises:
            Exception: If a process was provided and it is no longer alive.
        """
        # Compare against None explicitly: a process or queue object that
        # defines __bool__/__len__ could otherwise be treated as "missing"
        # (e.g. an empty queue-like object) and the record silently dropped.
        if self._process is not None and not self._process.is_alive():
            raise Exception("The wandb backend process has shutdown")

        if self.record_q is not None:
            self.record_q.put(record)

    @override
    async def deliver_async(
        self,
        record: pb.Record,
    ) -> MailboxHandle[pb.Result]:
        """Not supported by this implementation.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    @override
    def _deliver(self, record: pb.Record) -> MailboxHandle[pb.Result]:
        """Not supported by this implementation.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError
|