blocking.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. """Implements a fully blocking kernel client.
  2. Useful for test suites and blocking terminal interfaces.
  3. """
  4. import sys
  5. # -----------------------------------------------------------------------------
  6. # Copyright (C) 2012 The IPython Development Team
  7. #
  8. # Distributed under the terms of the BSD License. The full license is in
  9. # the file LICENSE, distributed as part of this software.
  10. # -----------------------------------------------------------------------------
  11. from queue import Empty, Queue
  12. # IPython imports
  13. from traitlets import Type
  14. # Local imports
  15. from .channels import InProcessChannel
  16. from .client import InProcessKernelClient
  17. class BlockingInProcessChannel(InProcessChannel):
  18. """A blocking in-process channel."""
  19. def __init__(self, *args, **kwds):
  20. """Initialize the channel."""
  21. super().__init__(*args, **kwds)
  22. self._in_queue: Queue[object] = Queue()
  23. def call_handlers(self, msg):
  24. """Call the handlers for a message."""
  25. self._in_queue.put(msg)
  26. def get_msg(self, block=True, timeout=None):
  27. """Gets a message if there is one that is ready."""
  28. if timeout is None:
  29. # Queue.get(timeout=None) has stupid uninteruptible
  30. # behavior, so wait for a week instead
  31. timeout = 604800
  32. return self._in_queue.get(block, timeout)
  33. def get_msgs(self):
  34. """Get all messages that are currently ready."""
  35. msgs = []
  36. while True:
  37. try:
  38. msgs.append(self.get_msg(block=False))
  39. except Empty:
  40. break
  41. return msgs
  42. def msg_ready(self):
  43. """Is there a message that has been received?"""
  44. return not self._in_queue.empty()
  45. class BlockingInProcessStdInChannel(BlockingInProcessChannel):
  46. """A blocking in-process stdin channel."""
  47. def call_handlers(self, msg):
  48. """Overridden for the in-process channel.
  49. This methods simply calls raw_input directly.
  50. """
  51. msg_type = msg["header"]["msg_type"]
  52. if msg_type == "input_request":
  53. _raw_input = self.client.kernel._sys_raw_input
  54. prompt = msg["content"]["prompt"]
  55. print(prompt, end="", file=sys.__stdout__)
  56. assert sys.__stdout__ is not None
  57. sys.__stdout__.flush()
  58. self.client.input(_raw_input())
  59. class BlockingInProcessKernelClient(InProcessKernelClient):
  60. """A blocking in-process kernel client."""
  61. # The classes to use for the various channels.
  62. shell_channel_class = Type(BlockingInProcessChannel)
  63. iopub_channel_class = Type(BlockingInProcessChannel)
  64. stdin_channel_class = Type(BlockingInProcessStdInChannel)
  65. def wait_for_ready(self):
  66. """Wait for kernel info reply on shell channel."""
  67. while True:
  68. self.kernel_info()
  69. try:
  70. msg = self.shell_channel.get_msg(block=True, timeout=1)
  71. except Empty:
  72. pass
  73. else:
  74. if msg["msg_type"] == "kernel_info_reply":
  75. # Checking that IOPub is connected. If it is not connected, start over.
  76. try:
  77. self.iopub_channel.get_msg(block=True, timeout=0.2)
  78. except Empty:
  79. pass
  80. else:
  81. self._handle_kernel_info_reply(msg)
  82. break
  83. # Flush IOPub channel
  84. while True:
  85. try:
  86. msg = self.iopub_channel.get_msg(block=True, timeout=0.2)
  87. print(msg["msg_type"])
  88. except Empty:
  89. break