| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- from __future__ import annotations
- import logging
- import secrets
- import string
- import threading
- from collections.abc import Awaitable
- from typing import Callable
- from wandb.proto import wandb_internal_pb2 as pb
- from wandb.proto import wandb_server_pb2 as spb
- from wandb.sdk.lib import asyncio_manager
- from .mailbox_handle import MailboxHandle
- from .response_handle import MailboxResponseHandle
- _logger = logging.getLogger(__name__)
- class MailboxClosedError(Exception):
- """The mailbox has been closed and cannot be used."""
- class Mailbox:
- """Matches service responses to requests.
- The mailbox can set an address on a server request and create a handle for
- waiting for a response to that record. Responses are delivered by calling
- `deliver()`. The `close()` method abandons all handles in case the
- service process becomes unreachable.
- """
- def __init__(
- self,
- asyncer: asyncio_manager.AsyncioManager,
- cancel: Callable[[str], Awaitable[None]],
- ) -> None:
- """Create a mailbox.
- Args:
- asyncer: Asyncio runner for scheduling async operations.
- cancel: A callback that can be used to cancel a request by ID.
- """
- self._asyncer = asyncer
- self._cancel = cancel
- self._handles: dict[str, MailboxResponseHandle] = {}
- self._handles_lock = threading.Lock()
- self._closed = False
- def require_response(
- self,
- request: spb.ServerRequest | pb.Record,
- ) -> MailboxHandle[spb.ServerResponse]:
- """Set a response address on a request.
- Args:
- request: The request on which to set a request ID or mailbox slot.
- This is mutated. An address must not already be set.
- Returns:
- A handle for waiting for the response to the request.
- Raises:
- MailboxClosedError: If the mailbox has been closed, in which case
- no new responses are expected to be delivered and new handles
- cannot be created.
- """
- if isinstance(request, spb.ServerRequest):
- if (address := request.request_id) or (
- address := request.record_publish.control.mailbox_slot
- ):
- raise ValueError(f"Request already has an address ({address})")
- address = self._new_address()
- request.request_id = address
- if request.HasField("record_publish"):
- request.record_publish.control.mailbox_slot = address
- if request.HasField("record_communicate"):
- request.record_communicate.control.mailbox_slot = address
- else:
- if address := request.control.mailbox_slot:
- raise ValueError(f"Request already has an address ({address})")
- address = self._new_address()
- request.control.mailbox_slot = address
- with self._handles_lock:
- if self._closed:
- raise MailboxClosedError()
- handle = MailboxResponseHandle(
- address,
- asyncer=self._asyncer,
- cancel=self._cancel,
- )
- self._handles[address] = handle
- return handle
- def _new_address(self) -> str:
- """Returns an unused address for a request.
- Assumes `_handles_lock` is held.
- """
- def generate():
- return "".join(
- secrets.choice(string.ascii_lowercase + string.digits)
- for _ in range(12)
- )
- address = generate()
- # Being extra cautious. This loop will almost never be entered.
- while address in self._handles:
- address = generate()
- return address
- async def deliver(self, response: spb.ServerResponse) -> None:
- """Deliver a response from the service.
- If the response address is invalid, this does nothing.
- It is a no-op if the mailbox has been closed.
- """
- address = response.request_id
- if not address:
- kind: str | None = response.WhichOneof("server_response_type")
- if kind == "result_communicate":
- result_type = response.result_communicate.WhichOneof("result_type")
- kind = f"result_communicate.{result_type}"
- _logger.error(f"Received response with no mailbox slot: {kind}")
- return
- with self._handles_lock:
- # NOTE: If the mailbox is closed, this returns None because
- # we clear the dict.
- handle = self._handles.pop(address, None)
- # It is not an error if there is no handle for the address:
- # handles can be abandoned if the result is no longer needed.
- if handle:
- await handle.deliver(response)
- def close(self) -> None:
- """Indicate no further responses will be delivered.
- Abandons all handles.
- """
- with self._handles_lock:
- self._closed = True
- _logger.info(
- f"Closing mailbox, abandoning {len(self._handles)} handles.",
- )
- for handle in self._handles.values():
- handle.abandon()
- self._handles.clear()
|