mailbox_handle.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. from __future__ import annotations
  2. import abc
  3. from typing import Callable, Generic
  4. from typing_extensions import TypeVar, override
  5. from wandb.sdk.lib import asyncio_manager
  6. _T = TypeVar("_T")
  7. _S = TypeVar("_S")
  8. class HandleAbandonedError(Exception):
  9. """The handle has no response and has been abandoned."""
  10. class ServerResponseError(Exception):
  11. """The handle received a generic error response.
  12. In practice, this corresponds to receiving the ServerErrorResponse message.
  13. """
  14. class MailboxHandle(abc.ABC, Generic[_T]):
  15. """A handle for waiting on a response to a request."""
  16. def __init__(self, asyncer: asyncio_manager.AsyncioManager) -> None:
  17. self._asyncer = asyncer
  18. @property
  19. def asyncer(self) -> asyncio_manager.AsyncioManager:
  20. """The asyncio thread to which the handle belongs.
  21. The handle's async methods must be run using this object.
  22. """
  23. return self._asyncer
  24. def map(self, fn: Callable[[_T], _S]) -> MailboxHandle[_S]:
  25. """Returns a transformed handle.
  26. Methods on the returned handle call methods on this handle, but the
  27. response type is derived using the given function.
  28. Args:
  29. fn: A function to apply to this handle's result to get the new
  30. handle's result. The function should be pure and fast.
  31. """
  32. return _MailboxMappedHandle(self, fn)
  33. @abc.abstractmethod
  34. def cancel(self) -> None:
  35. """Cancel the handle, requesting any associated work to not complete.
  36. Any calls to `wait_or` or `wait_async` will raise `HandleAbandonedError`
  37. if they aren't resolved within a short time.
  38. Cancellation is best-effort. Most exceptions are logged and suppressed.
  39. """
  40. @abc.abstractmethod
  41. def wait_or(self, *, timeout: float | None) -> _T:
  42. """Wait for a response or a timeout.
  43. It is an error to call this from an async function.
  44. On error, including KeyboardInterrupt or a timeout,
  45. the handle cancels itself.
  46. Args:
  47. timeout: A finite number of seconds or None to never time out.
  48. If less than or equal to zero, times out immediately unless
  49. the response is available.
  50. Returns:
  51. The response if it arrives before the timeout or has already arrived.
  52. Raises:
  53. TimeoutError: If the timeout is reached.
  54. HandleAbandonedError: If the handle becomes abandoned.
  55. ServerResponseError: If the handle received an error response.
  56. """
  57. @abc.abstractmethod
  58. async def wait_async(self, *, timeout: float | None) -> _T:
  59. """Wait for a response or timeout.
  60. This must run in an `asyncio` event loop.
  61. On error, including asyncio cancellation, KeyboardInterrupt or
  62. a timeout, the handle cancels itself.
  63. Args:
  64. timeout: A finite number of seconds or None to never time out.
  65. Returns:
  66. The response if it arrives before the timeout or has already arrived.
  67. Raises:
  68. TimeoutError: If the timeout is reached.
  69. HandleAbandonedError: If the handle becomes abandoned.
  70. ServerResponseError: If the handle received an error response.
  71. """
  72. class _MailboxMappedHandle(Generic[_S], MailboxHandle[_S]):
  73. """A mailbox handle whose result is derived from another handle."""
  74. def __init__(
  75. self,
  76. handle: MailboxHandle[_T],
  77. fn: Callable[[_T], _S],
  78. ) -> None:
  79. super().__init__(handle.asyncer)
  80. self._handle = handle
  81. self._fn = fn
  82. @override
  83. def cancel(self) -> None:
  84. self._handle.cancel()
  85. @override
  86. def wait_or(self, *, timeout: float | None) -> _S:
  87. return self._fn(self._handle.wait_or(timeout=timeout))
  88. @override
  89. async def wait_async(self, *, timeout: float | None) -> _S:
  90. response = await self._handle.wait_async(timeout=timeout)
  91. return self._fn(response)