client.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. """Implements an async kernel client"""
  2. # Copyright (c) Jupyter Development Team.
  3. # Distributed under the terms of the Modified BSD License.
  4. from __future__ import annotations
  5. import typing as t
  6. import zmq.asyncio
  7. from traitlets import Instance, Type
  8. from ..channels import AsyncZMQSocketChannel, HBChannel
  9. from ..client import KernelClient, reqrep
  10. def wrapped(meth: t.Callable, channel: str) -> t.Callable:
  11. """Wrap a method on a channel and handle replies."""
  12. def _(self: AsyncKernelClient, *args: t.Any, **kwargs: t.Any) -> t.Any:
  13. reply = kwargs.pop("reply", False)
  14. timeout = kwargs.pop("timeout", None)
  15. msg_id = meth(self, *args, **kwargs)
  16. if not reply:
  17. return msg_id
  18. return self._recv_reply(msg_id, timeout=timeout, channel=channel)
  19. return _
  20. class AsyncKernelClient(KernelClient):
  21. """A KernelClient with async APIs
  22. ``get_[channel]_msg()`` methods wait for and return messages on channels,
  23. raising :exc:`queue.Empty` if no message arrives within ``timeout`` seconds.
  24. """
  25. context = Instance(zmq.asyncio.Context) # type:ignore[assignment]
  26. def _context_default(self) -> zmq.asyncio.Context:
  27. self._created_context = True
  28. return zmq.asyncio.Context()
  29. # --------------------------------------------------------------------------
  30. # Channel proxy methods
  31. # --------------------------------------------------------------------------
  32. get_shell_msg = KernelClient._async_get_shell_msg
  33. get_iopub_msg = KernelClient._async_get_iopub_msg
  34. get_stdin_msg = KernelClient._async_get_stdin_msg
  35. get_control_msg = KernelClient._async_get_control_msg
  36. wait_for_ready = KernelClient._async_wait_for_ready
  37. # The classes to use for the various channels
  38. shell_channel_class = Type(AsyncZMQSocketChannel) # type:ignore[assignment]
  39. iopub_channel_class = Type(AsyncZMQSocketChannel) # type:ignore[assignment]
  40. stdin_channel_class = Type(AsyncZMQSocketChannel) # type:ignore[assignment]
  41. hb_channel_class = Type(HBChannel) # type:ignore[assignment]
  42. control_channel_class = Type(AsyncZMQSocketChannel) # type:ignore[assignment]
  43. _recv_reply = KernelClient._async_recv_reply
  44. # replies come on the shell channel
  45. execute = reqrep(wrapped, KernelClient.execute)
  46. history = reqrep(wrapped, KernelClient.history)
  47. complete = reqrep(wrapped, KernelClient.complete)
  48. is_complete = reqrep(wrapped, KernelClient.is_complete)
  49. inspect = reqrep(wrapped, KernelClient.inspect)
  50. kernel_info = reqrep(wrapped, KernelClient.kernel_info)
  51. comm_info = reqrep(wrapped, KernelClient.comm_info)
  52. is_alive = KernelClient._async_is_alive
  53. execute_interactive = KernelClient._async_execute_interactive
  54. # replies come on the control channel
  55. shutdown = reqrep(wrapped, KernelClient.shutdown, channel="control")