client.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
"""Implements a fully blocking kernel client.

Useful for test suites and blocking terminal interfaces.
"""
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
  6. from __future__ import annotations
  7. import typing as t
  8. from traitlets import Type
  9. from ..channels import HBChannel, ZMQSocketChannel
  10. from ..client import KernelClient, reqrep
  11. from ..utils import run_sync
  12. def wrapped(meth: t.Callable, channel: str) -> t.Callable:
  13. """Wrap a method on a channel and handle replies."""
  14. def _(self: BlockingKernelClient, *args: t.Any, **kwargs: t.Any) -> t.Any:
  15. reply = kwargs.pop("reply", False)
  16. timeout = kwargs.pop("timeout", None)
  17. msg_id = meth(self, *args, **kwargs)
  18. if not reply:
  19. return msg_id
  20. return self._recv_reply(msg_id, timeout=timeout, channel=channel)
  21. return _
  22. class BlockingKernelClient(KernelClient):
  23. """A KernelClient with blocking APIs
  24. ``get_[channel]_msg()`` methods wait for and return messages on channels,
  25. raising :exc:`queue.Empty` if no message arrives within ``timeout`` seconds.
  26. """
  27. # --------------------------------------------------------------------------
  28. # Channel proxy methods
  29. # --------------------------------------------------------------------------
  30. get_shell_msg = run_sync(KernelClient._async_get_shell_msg)
  31. get_iopub_msg = run_sync(KernelClient._async_get_iopub_msg)
  32. get_stdin_msg = run_sync(KernelClient._async_get_stdin_msg)
  33. get_control_msg = run_sync(KernelClient._async_get_control_msg)
  34. wait_for_ready = run_sync(KernelClient._async_wait_for_ready)
  35. # The classes to use for the various channels
  36. shell_channel_class = Type(ZMQSocketChannel) # type:ignore[assignment]
  37. iopub_channel_class = Type(ZMQSocketChannel) # type:ignore[assignment]
  38. stdin_channel_class = Type(ZMQSocketChannel) # type:ignore[assignment]
  39. hb_channel_class = Type(HBChannel) # type:ignore[assignment]
  40. control_channel_class = Type(ZMQSocketChannel) # type:ignore[assignment]
  41. _recv_reply = run_sync(KernelClient._async_recv_reply)
  42. # replies come on the shell channel
  43. execute = reqrep(wrapped, KernelClient.execute)
  44. history = reqrep(wrapped, KernelClient.history)
  45. complete = reqrep(wrapped, KernelClient.complete)
  46. inspect = reqrep(wrapped, KernelClient.inspect)
  47. kernel_info = reqrep(wrapped, KernelClient.kernel_info)
  48. comm_info = reqrep(wrapped, KernelClient.comm_info)
  49. is_alive = run_sync(KernelClient._async_is_alive)
  50. execute_interactive = run_sync(KernelClient._async_execute_interactive)
  51. # replies come on the control channel
  52. shutdown = reqrep(wrapped, KernelClient.shutdown, channel="control")