| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- """Implements a fully blocking kernel client.
- Useful for test suites and blocking terminal interfaces.
- """
- import sys
- # -----------------------------------------------------------------------------
- # Copyright (C) 2012 The IPython Development Team
- #
- # Distributed under the terms of the BSD License. The full license is in
- # the file LICENSE, distributed as part of this software.
- # -----------------------------------------------------------------------------
- from queue import Empty, Queue
- # IPython imports
- from traitlets import Type
- # Local imports
- from .channels import InProcessChannel
- from .client import InProcessKernelClient
class BlockingInProcessChannel(InProcessChannel):
    """In-process channel that buffers incoming messages in a queue.

    Messages handed to :meth:`call_handlers` are stored until a consumer
    retrieves them with :meth:`get_msg` or :meth:`get_msgs`.
    """

    def __init__(self, *args, **kwds):
        """Set up the parent channel and create the empty message queue."""
        super().__init__(*args, **kwds)
        self._in_queue: Queue[object] = Queue()

    def call_handlers(self, msg):
        """Enqueue ``msg`` so that it can be fetched later."""
        self._in_queue.put(msg)

    def get_msg(self, block=True, timeout=None):
        """Return the next queued message.

        Parameters
        ----------
        block : bool
            Whether to wait for a message when the queue is empty.
        timeout : float or None
            Maximum number of seconds to wait. ``None`` is replaced by one
            week (604800 seconds).

        Raises
        ------
        queue.Empty
            If no message could be obtained.
        """
        # Queue.get(timeout=None) cannot be interrupted, so substitute a
        # very long (one week) finite wait instead.
        wait = 604800 if timeout is None else timeout
        return self._in_queue.get(block, wait)

    def get_msgs(self):
        """Return a list of every message that is ready right now."""
        ready = []
        try:
            # Keep pulling without blocking until the queue reports Empty.
            while True:
                ready.append(self.get_msg(block=False))
        except Empty:
            return ready

    def msg_ready(self):
        """Return True when at least one message is waiting in the queue."""
        queue_is_empty = self._in_queue.empty()
        return not queue_is_empty
class BlockingInProcessStdInChannel(BlockingInProcessChannel):
    """A blocking in-process stdin channel."""

    def call_handlers(self, msg):
        """Answer an ``input_request`` message by prompting for input directly.

        Overridden for the in-process channel: instead of queueing the
        message, the prompt found in ``msg["content"]["prompt"]`` is written
        out, one reply is read through the kernel's ``_sys_raw_input``
        callable, and the reply is passed to ``self.client.input``.
        Messages of any other type are ignored.
        """
        msg_type = msg["header"]["msg_type"]
        if msg_type != "input_request":
            return

        _raw_input = self.client.kernel._sys_raw_input
        prompt = msg["content"]["prompt"]
        # Write the prompt to the original stdout and flush it in one call.
        # When ``sys.__stdout__`` is None, print() falls back to
        # ``sys.stdout``, so no assertion on the stream is needed and the
        # behaviour no longer differs under ``python -O``.
        print(prompt, end="", file=sys.__stdout__, flush=True)
        self.client.input(_raw_input())
class BlockingInProcessKernelClient(InProcessKernelClient):
    """A blocking in-process kernel client."""

    # The classes to use for the various channels.
    shell_channel_class = Type(BlockingInProcessChannel)
    iopub_channel_class = Type(BlockingInProcessChannel)
    stdin_channel_class = Type(BlockingInProcessStdInChannel)

    def wait_for_ready(self):
        """Wait for a kernel info reply on the shell channel.

        Repeatedly sends a kernel info request and waits up to one second
        for a shell message. Once a ``kernel_info_reply`` arrives and a
        message can also be read from the IOPub channel within 0.2 seconds,
        the reply is passed to ``_handle_kernel_info_reply``. Afterwards the
        IOPub channel is drained until it stays quiet for 0.2 seconds.
        """
        while True:
            self.kernel_info()
            try:
                reply = self.shell_channel.get_msg(block=True, timeout=1)
            except Empty:
                # Nothing on the shell channel yet; ask again.
                continue
            if reply["msg_type"] != "kernel_info_reply":
                continue
            # Checking that IOPub is connected. If it is not connected,
            # start over.
            try:
                self.iopub_channel.get_msg(block=True, timeout=0.2)
            except Empty:
                continue
            self._handle_kernel_info_reply(reply)
            break

        # Flush IOPub channel, silently discarding whatever is pending.
        while True:
            try:
                self.iopub_channel.get_msg(block=True, timeout=0.2)
            except Empty:
                break
|