interface_queue.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. from __future__ import annotations
  2. import logging
  3. from multiprocessing.process import BaseProcess
  4. from typing import TYPE_CHECKING
  5. from typing_extensions import override
  6. from .interface_shared import InterfaceShared
  7. if TYPE_CHECKING:
  8. from queue import Queue
  9. from wandb.proto import wandb_internal_pb2 as pb
  10. from wandb.sdk.mailbox.mailbox_handle import MailboxHandle
  11. logger = logging.getLogger("wandb")
  12. class InterfaceQueue(InterfaceShared):
  13. """Legacy implementation of InterfaceShared.
  14. This was used by legacy-service to pass messages back to itself before
  15. the existence of wandb-core. It may be removed once legacy-service is
  16. completely removed (including its use in `wandb sync`).
  17. Since it was used by the internal service, it does not implement
  18. the "deliver" methods, which are only used in the client.
  19. """
  20. def __init__(
  21. self,
  22. record_q: Queue[pb.Record] | None = None,
  23. result_q: Queue[pb.Result] | None = None,
  24. process: BaseProcess | None = None,
  25. ) -> None:
  26. self.record_q = record_q
  27. self.result_q = result_q
  28. self._process = process
  29. super().__init__()
  30. @override
  31. def _publish(self, record: pb.Record, *, nowait: bool = False) -> None:
  32. if self._process and not self._process.is_alive():
  33. raise Exception("The wandb backend process has shutdown")
  34. if self.record_q:
  35. self.record_q.put(record)
  36. @override
  37. async def deliver_async(
  38. self,
  39. record: pb.Record,
  40. ) -> MailboxHandle[pb.Result]:
  41. raise NotImplementedError
  42. @override
  43. def _deliver(self, record: pb.Record) -> MailboxHandle[pb.Result]:
  44. raise NotImplementedError