| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- from __future__ import annotations
- import abc
- from typing import Callable, Generic
- from typing_extensions import TypeVar, override
- from wandb.sdk.lib import asyncio_manager
- _T = TypeVar("_T")
- _S = TypeVar("_S")
class HandleAbandonedError(Exception):
    """The handle has no response and has been abandoned."""
class ServerResponseError(Exception):
    """The handle received a generic error response.

    In practice, this corresponds to receiving the ServerErrorResponse message.
    """
class MailboxHandle(abc.ABC, Generic[_T]):
    """A handle for waiting on a response to a request.

    This is an abstract interface: subclasses must implement `cancel`,
    `wait_or` and `wait_async`. The type parameter is the type of the
    response the handle eventually produces.
    """

    def __init__(self, asyncer: asyncio_manager.AsyncioManager) -> None:
        """Initialize the handle.

        Args:
            asyncer: The asyncio thread to which the handle belongs.
        """
        self._asyncer = asyncer

    @property
    def asyncer(self) -> asyncio_manager.AsyncioManager:
        """The asyncio thread to which the handle belongs.

        The handle's async methods must be run using this object.
        """
        return self._asyncer

    def map(self, fn: Callable[[_T], _S]) -> MailboxHandle[_S]:
        """Returns a transformed handle.

        Methods on the returned handle call methods on this handle, but the
        response type is derived using the given function.

        Args:
            fn: A function to apply to this handle's result to get the new
                handle's result. The function should be pure and fast.

        Returns:
            A new handle that wraps this one and shares its `asyncer`.
        """
        return _MailboxMappedHandle(self, fn)

    @abc.abstractmethod
    def cancel(self) -> None:
        """Cancel the handle, requesting any associated work to not complete.

        Any calls to `wait_or` or `wait_async` will raise
        `HandleAbandonedError` if they aren't resolved within a short time.

        Cancellation is best-effort. Most exceptions are logged and
        suppressed.
        """

    @abc.abstractmethod
    def wait_or(self, *, timeout: float | None) -> _T:
        """Wait for a response or a timeout.

        It is an error to call this from an async function.

        On error, including KeyboardInterrupt or a timeout,
        the handle cancels itself.

        Args:
            timeout: A finite number of seconds or None to never time out.
                If less than or equal to zero, times out immediately unless
                the response is available.

        Returns:
            The response if it arrives before the timeout or has already
            arrived.

        Raises:
            TimeoutError: If the timeout is reached.
            HandleAbandonedError: If the handle becomes abandoned.
            ServerResponseError: If the handle received an error response.
        """

    @abc.abstractmethod
    async def wait_async(self, *, timeout: float | None) -> _T:
        """Wait for a response or timeout.

        This must run in an `asyncio` event loop.

        On error, including asyncio cancellation, KeyboardInterrupt or
        a timeout, the handle cancels itself.

        Args:
            timeout: A finite number of seconds or None to never time out.

        Returns:
            The response if it arrives before the timeout or has already
            arrived.

        Raises:
            TimeoutError: If the timeout is reached.
            HandleAbandonedError: If the handle becomes abandoned.
            ServerResponseError: If the handle received an error response.
        """
class _MailboxMappedHandle(Generic[_S], MailboxHandle[_S]):
    """A mailbox handle whose result is derived from another handle.

    Every operation is delegated to the wrapped handle. The wrapped handle's
    response is passed through `fn` before being returned, so exceptions
    raised by the wrapped handle or by `fn` propagate to the caller unchanged.
    """

    def __init__(
        self,
        handle: MailboxHandle[_T],
        fn: Callable[[_T], _S],
    ) -> None:
        """Wrap `handle`, transforming its result with `fn`.

        Args:
            handle: The handle to delegate to. Its `asyncer` is reused for
                this handle.
            fn: The function applied to the wrapped handle's response.
        """
        super().__init__(handle.asyncer)
        self._handle = handle
        self._fn = fn

    @override
    def cancel(self) -> None:
        """Cancel the wrapped handle."""
        self._handle.cancel()

    @override
    def wait_or(self, *, timeout: float | None) -> _S:
        """Wait on the wrapped handle and return its transformed response.

        Args:
            timeout: Forwarded unchanged to the wrapped handle's `wait_or`.

        Returns:
            The result of applying `fn` to the wrapped handle's response.
        """
        return self._fn(self._handle.wait_or(timeout=timeout))

    @override
    async def wait_async(self, *, timeout: float | None) -> _S:
        """Asynchronously wait on the wrapped handle and transform its response.

        Args:
            timeout: Forwarded unchanged to the wrapped handle's
                `wait_async`.

        Returns:
            The result of applying `fn` to the wrapped handle's response.
        """
        response = await self._handle.wait_async(timeout=timeout)
        return self._fn(response)
|