| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637 |
- """Event loop integration for the ZeroMQ-based kernels."""
- # Copyright (c) IPython Development Team.
- # Distributed under the terms of the Modified BSD License.
- import os
- import platform
- import sys
- from functools import partial
- import zmq
- from packaging.version import Version as V
- from traitlets.config.application import Application
- def _use_appnope():
- """Should we use appnope for dealing with OS X app nap?
- Checks if we are on OS X 10.9 or greater.
- """
- return sys.platform == "darwin" and V(platform.mac_ver()[0]) >= V("10.9")
- # mapping of keys to loop functions
- loop_map = {
- "inline": None,
- "nbagg": None,
- "webagg": None,
- "notebook": None,
- "ipympl": None,
- "widget": None,
- None: None,
- }
- def register_integration(*toolkitnames):
- """Decorator to register an event loop to integrate with the IPython kernel
- The decorator takes names to register the event loop as for the %gui magic.
- You can provide alternative names for the same toolkit.
- The decorated function should take a single argument, the IPython kernel
- instance, arrange for the event loop to yield the asyncio loop when a
- message is received by the main shell zmq stream or at least every
- ``kernel._poll_interval`` seconds, and start the event loop.
- :mod:`ipykernel.eventloops` provides and registers such functions
- for a few common event loops.
- """
- def decorator(func):
- """Integration registration decorator."""
- for name in toolkitnames:
- loop_map[name] = func
- func.exit_hook = lambda kernel: None # noqa: ARG005
- def exit_decorator(exit_func):
- """@func.exit is now a decorator
- to register a function to be called on exit
- """
- func.exit_hook = exit_func
- return exit_func
- func.exit = exit_decorator
- return func
- return decorator
- def get_shell_stream(kernel):
- # Return the zmq stream that receives messages for the main shell.
- if kernel._supports_kernel_subshells:
- manager = kernel.shell_channel_thread.manager
- socket_pair = manager.get_shell_channel_to_subshell_pair(None)
- return socket_pair.to_stream
- return kernel.shell_stream
- def _notify_stream_qt(kernel):
- import operator
- from functools import lru_cache
- from IPython.external.qt_for_kernel import QtCore
- try:
- from IPython.external.qt_for_kernel import enum_helper
- except ImportError:
- @lru_cache(None)
- def enum_helper(name):
- return operator.attrgetter(name.rpartition(".")[0])(sys.modules[QtCore.__package__])
- def exit_loop():
- """fall back to main loop"""
- kernel._qt_notifier.setEnabled(False)
- kernel.app.qt_event_loop.quit()
- def process_stream_events_wrap(shell_stream, *args, **kwargs):
- """fall back to main loop when there's a socket event"""
- # call flush to ensure that the stream doesn't lose events
- # due to our consuming of the edge-triggered FD
- # flush returns the number of events consumed.
- # if there were any, wake it up
- if shell_stream.flush(limit=1):
- exit_loop()
- shell_stream = get_shell_stream(kernel)
- process_stream_events = partial(process_stream_events_wrap, shell_stream)
- if not hasattr(kernel, "_qt_notifier"):
- fd = shell_stream.getsockopt(zmq.FD)
- kernel._qt_notifier = QtCore.QSocketNotifier(
- fd, enum_helper("QtCore.QSocketNotifier.Type").Read, kernel.app.qt_event_loop
- )
- kernel._qt_notifier.activated.connect(process_stream_events)
- else:
- kernel._qt_notifier.setEnabled(True)
- # allow for scheduling exits from the loop in case a timeout needs to
- # be set from the kernel level
- def _schedule_exit(delay):
- """schedule fall back to main loop in [delay] seconds"""
- # The signatures of QtCore.QTimer.singleShot are inconsistent between PySide and PyQt
- # if setting the TimerType, so we create a timer explicitly and store it
- # to avoid a memory leak.
- # PreciseTimer is needed so we exit after _at least_ the specified delay, not within 5% of it
- if not hasattr(kernel, "_qt_timer"):
- kernel._qt_timer = QtCore.QTimer(kernel.app)
- kernel._qt_timer.setSingleShot(True)
- kernel._qt_timer.setTimerType(enum_helper("QtCore.Qt.TimerType").PreciseTimer)
- kernel._qt_timer.timeout.connect(exit_loop)
- kernel._qt_timer.start(int(1000 * delay))
- loop_qt._schedule_exit = _schedule_exit
- # there may already be unprocessed events waiting.
- # these events will not wake zmq's edge-triggered FD
- # since edge-triggered notification only occurs on new i/o activity.
- # process all the waiting events immediately
- # so we start in a clean state ensuring that any new i/o events will notify.
- # schedule first call on the eventloop as soon as it's running,
- # so we don't block here processing events
- QtCore.QTimer.singleShot(0, process_stream_events)
- @register_integration("qt", "qt5", "qt6")
- def loop_qt(kernel):
- """Event loop for all supported versions of Qt."""
- _notify_stream_qt(kernel) # install hook to stop event loop.
- # Start the event loop.
- kernel.app._in_event_loop = True
- # `exec` blocks until there's ZMQ activity.
- el = kernel.app.qt_event_loop # for brevity
- el.exec() if hasattr(el, "exec") else el.exec_()
- kernel.app._in_event_loop = False
- # NOTE: To be removed in version 7
- loop_qt5 = loop_qt
- # exit and watch are the same for qt 4 and 5
- @loop_qt.exit
- def loop_qt_exit(kernel):
- kernel.app.exit()
- def _loop_wx(app):
- """Inner-loop for running the Wx eventloop
- Pulled from guisupport.start_event_loop in IPython < 5.2,
- since IPython 5.2 only checks `get_ipython().active_eventloop` is defined,
- rather than if the eventloop is actually running.
- """
- app._in_event_loop = True
- app.MainLoop()
- app._in_event_loop = False
- @register_integration("wx")
- def loop_wx(kernel):
- """Start a kernel with wx event loop support."""
- import wx
- # We have to put the wx.Timer in a wx.Frame for it to fire properly.
- # We make the Frame hidden when we create it in the main app below.
- class TimerFrame(wx.Frame): # type:ignore[misc]
- def __init__(self, kernel):
- self.kernel = kernel
- self.shell_stream = get_shell_stream(kernel)
- wx.Frame.__init__(self, None, -1)
- self.timer = wx.Timer(self)
- self.Bind(wx.EVT_CLOSE, self.on_exit)
- self.Bind(wx.EVT_TIMER, self.on_timer)
- # Units for the timer are in milliseconds
- self.timer.Start(int(1000 * self.kernel._poll_interval))
- def wake(self):
- """wake from wx"""
- try:
- if self.shell_stream.flush(limit=1):
- self.kernel.app.ExitMainLoop()
- except Exception:
- pass
- def on_timer(self, event):
- self.wake()
- def on_exit(self, event):
- self.timer.Stop()
- self.wake()
- self.Destroy()
- # We need a custom wx.App to create our Frame subclass that has the
- # wx.Timer to defer back to the tornado event loop.
- class IPWxApp(wx.App): # type:ignore[misc]
- def OnInit(self):
- self.frame = TimerFrame(kernel)
- self.frame.Show(False)
- return True
- # The redirect=False here makes sure that wx doesn't replace
- # sys.stdout/stderr with its own classes.
- if not (getattr(kernel, "app", None) and isinstance(kernel.app, wx.App)):
- kernel.app = IPWxApp(redirect=False)
- # The import of wx on Linux sets the handler for signal.SIGINT
- # to 0. This is a bug in wx or gtk. We fix by just setting it
- # back to the Python default.
- import signal
- if not callable(signal.getsignal(signal.SIGINT)):
- signal.signal(signal.SIGINT, signal.default_int_handler)
- _loop_wx(kernel.app)
- @loop_wx.exit
- def loop_wx_exit(kernel):
- """Exit the wx loop."""
- import wx
- wx.Exit()
- @register_integration("tk")
- def loop_tk(kernel):
- """Start a kernel with the Tk event loop."""
- from tkinter import READABLE, Tk
- app = Tk()
- # Capability detection:
- # per https://docs.python.org/3/library/tkinter.html#file-handlers
- # file handlers are not available on Windows
- if hasattr(app, "createfilehandler"):
- # A basic wrapper for structural similarity with the Windows version
- class BasicAppWrapper:
- def __init__(self, app):
- self.app = app
- self.app.withdraw()
- def exit_loop():
- """fall back to main loop"""
- app.tk.deletefilehandler(shell_stream.getsockopt(zmq.FD))
- app.quit()
- app.destroy()
- del kernel.app_wrapper
- def process_stream_events_wrap(shell_stream, *a, **kw):
- """fall back to main loop when there's a socket event"""
- if shell_stream.flush(limit=1):
- exit_loop()
- # allow for scheduling exits from the loop in case a timeout needs to
- # be set from the kernel level
- def _schedule_exit(delay):
- """schedule fall back to main loop in [delay] seconds"""
- app.after(int(1000 * delay), exit_loop)
- loop_tk._schedule_exit = _schedule_exit
- # For Tkinter, we create a Tk object and call its withdraw method.
- kernel.app_wrapper = BasicAppWrapper(app)
- shell_stream = get_shell_stream(kernel)
- process_stream_events = partial(process_stream_events_wrap, shell_stream)
- app.tk.createfilehandler(shell_stream.getsockopt(zmq.FD), READABLE, process_stream_events)
- # schedule initial call after start
- app.after(0, process_stream_events)
- app.mainloop()
- else:
- import asyncio
- import nest_asyncio
- nest_asyncio.apply()
- # Tk uses milliseconds
- poll_interval = int(1000 * kernel._poll_interval)
- shell_stream = get_shell_stream(kernel)
- class TimedAppWrapper:
- def __init__(self, app, shell_stream):
- self.app = app
- self.shell_stream = shell_stream
- self.app.withdraw()
- async def func(self):
- self.shell_stream.flush(limit=1)
- def on_timer(self):
- loop = asyncio.get_event_loop()
- try:
- loop.run_until_complete(self.func())
- except Exception:
- kernel.log.exception("Error in message handler")
- self.app.after(poll_interval, self.on_timer)
- def start(self):
- self.on_timer() # Call it once to get things going.
- self.app.mainloop()
- kernel.app_wrapper = TimedAppWrapper(app, shell_stream)
- kernel.app_wrapper.start()
- @loop_tk.exit
- def loop_tk_exit(kernel):
- """Exit the tk loop."""
- try:
- kernel.app_wrapper.app.quit()
- kernel.app_wrapper.app.destroy()
- del kernel.app_wrapper
- kernel.eventloop = None
- except (RuntimeError, AttributeError):
- pass
- @register_integration("gtk")
- def loop_gtk(kernel):
- """Start the kernel, coordinating with the GTK event loop"""
- from .gui.gtkembed import GTKEmbed
- gtk_kernel = GTKEmbed(kernel)
- gtk_kernel.start()
- kernel._gtk = gtk_kernel
- @loop_gtk.exit
- def loop_gtk_exit(kernel):
- """Exit the gtk loop."""
- kernel._gtk.stop()
- @register_integration("gtk3")
- def loop_gtk3(kernel):
- """Start the kernel, coordinating with the GTK event loop"""
- from .gui.gtk3embed import GTKEmbed
- gtk_kernel = GTKEmbed(kernel)
- gtk_kernel.start()
- kernel._gtk = gtk_kernel
- @loop_gtk3.exit
- def loop_gtk3_exit(kernel):
- """Exit the gtk3 loop."""
- kernel._gtk.stop()
- @register_integration("osx", "macosx")
- def loop_cocoa(kernel):
- """Start the kernel, coordinating with the Cocoa CFRunLoop event loop
- via the matplotlib MacOSX backend.
- """
- from ._eventloop_macos import mainloop, stop
- real_excepthook = sys.excepthook
- shell_stream = get_shell_stream(kernel)
- def handle_int(etype, value, tb):
- """don't let KeyboardInterrupts look like crashes"""
- # wake the eventloop when we get a signal
- stop()
- if etype is KeyboardInterrupt:
- print("KeyboardInterrupt caught in CFRunLoop", file=sys.__stdout__)
- else:
- real_excepthook(etype, value, tb)
- while not kernel.shell.exit_now:
- try:
- # double nested try/except, to properly catch KeyboardInterrupt
- # due to pyzmq Issue #130
- try:
- # don't let interrupts during mainloop invoke crash_handler:
- sys.excepthook = handle_int
- mainloop(kernel._poll_interval)
- if shell_stream.flush(limit=1):
- # events to process, return control to kernel
- return
- except BaseException:
- raise
- except KeyboardInterrupt:
- # Ctrl-C shouldn't crash the kernel
- print("KeyboardInterrupt caught in kernel", file=sys.__stdout__)
- finally:
- # ensure excepthook is restored
- sys.excepthook = real_excepthook
- @loop_cocoa.exit
- def loop_cocoa_exit(kernel):
- """Exit the cocoa loop."""
- from ._eventloop_macos import stop
- stop()
- @register_integration("asyncio")
- def loop_asyncio(kernel):
- """Start a kernel with asyncio event loop support."""
- import asyncio
- loop = asyncio.get_event_loop()
- # loop is already running (e.g. tornado 5), nothing left to do
- if loop.is_running():
- return
- if loop.is_closed():
- # main loop is closed, create a new one
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
- loop._should_close = False # type:ignore[attr-defined]
- # pause eventloop when there's an event on a zmq socket
- def process_stream_events(shell_stream):
- """fall back to main loop when there's a socket event"""
- if shell_stream.flush(limit=1):
- loop.stop()
- shell_stream = get_shell_stream(kernel)
- notifier = partial(process_stream_events, shell_stream)
- loop.add_reader(shell_stream.getsockopt(zmq.FD), notifier)
- loop.call_soon(notifier)
- while True:
- error = None
- try:
- loop.run_forever()
- except KeyboardInterrupt:
- continue
- except Exception as e:
- error = e
- if loop._should_close: # type:ignore[attr-defined]
- loop.close()
- if error is not None:
- raise error
- break
- @loop_asyncio.exit
- def loop_asyncio_exit(kernel):
- """Exit hook for asyncio"""
- import asyncio
- loop = asyncio.get_event_loop()
- async def close_loop():
- if hasattr(loop, "shutdown_asyncgens"):
- yield loop.shutdown_asyncgens()
- loop._should_close = True # type:ignore[attr-defined]
- loop.stop()
- if loop.is_running():
- close_loop()
- elif not loop.is_closed():
- loop.run_until_complete(close_loop) # type:ignore[arg-type]
- loop.close()
- def set_qt_api_env_from_gui(gui):
- """
- Sets the QT_API environment variable by trying to import PyQtx or PySidex.
- The user can generically request `qt` or a specific Qt version, e.g. `qt6`.
- For a generic Qt request, we let the mechanism in IPython choose the best
- available version by leaving the `QT_API` environment variable blank.
- For specific versions, we check to see whether the PyQt or PySide
- implementations are present and set `QT_API` accordingly to indicate to
- IPython which version we want. If neither implementation is present, we
- leave the environment variable set so IPython will generate a helpful error
- message.
- Notes
- -----
- - If the environment variable is already set, it will be used unchanged,
- regardless of what the user requested.
- """
- qt_api = os.environ.get("QT_API", None)
- from IPython.external.qt_loaders import (
- QT_API_PYQT5,
- QT_API_PYQT6,
- QT_API_PYSIDE2,
- QT_API_PYSIDE6,
- loaded_api,
- )
- loaded = loaded_api()
- qt_env2gui = {
- QT_API_PYSIDE2: "qt5",
- QT_API_PYQT5: "qt5",
- QT_API_PYSIDE6: "qt6",
- QT_API_PYQT6: "qt6",
- }
- if loaded is not None and gui != "qt" and qt_env2gui[loaded] != gui:
- print(f"Cannot switch Qt versions for this session; you must use {qt_env2gui[loaded]}.")
- return
- if qt_api is not None and gui != "qt":
- if qt_env2gui[qt_api] != gui:
- print(
- f'Request for "{gui}" will be ignored because `QT_API` '
- f'environment variable is set to "{qt_api}"'
- )
- return
- else:
- if gui == "qt5":
- try:
- import PyQt5 # noqa: F401
- os.environ["QT_API"] = "pyqt5"
- except ImportError:
- try:
- import PySide2 # noqa: F401
- os.environ["QT_API"] = "pyside2"
- except ImportError:
- os.environ["QT_API"] = "pyqt5"
- elif gui == "qt6":
- try:
- import PyQt6 # noqa: F401
- os.environ["QT_API"] = "pyqt6"
- except ImportError:
- try:
- import PySide6 # noqa: F401
- os.environ["QT_API"] = "pyside6"
- except ImportError:
- os.environ["QT_API"] = "pyqt6"
- elif gui == "qt":
- # Don't set QT_API; let IPython logic choose the version.
- if "QT_API" in os.environ:
- del os.environ["QT_API"]
- else:
- print(f'Unrecognized Qt version: {gui}. Should be "qt5", "qt6", or "qt".')
- return
- # Do the actual import now that the environment variable is set to make sure it works.
- try:
- pass
- except Exception as e:
- # Clear the environment variable for the next attempt.
- if "QT_API" in os.environ:
- del os.environ["QT_API"]
- print(f"QT_API couldn't be set due to error {e}")
- return
- def make_qt_app_for_kernel(gui, kernel):
- """Sets the `QT_API` environment variable if it isn't already set."""
- if hasattr(kernel, "app"):
- # Kernel is already running a Qt event loop, so there's no need to
- # create another app for it.
- return
- set_qt_api_env_from_gui(gui)
- # This import is guaranteed to work now:
- from IPython.external.qt_for_kernel import QtCore
- from IPython.lib.guisupport import get_app_qt4
- kernel.app = get_app_qt4([" "])
- kernel.app.qt_event_loop = QtCore.QEventLoop(kernel.app)
- def enable_gui(gui, kernel=None):
- """Enable integration with a given GUI"""
- if gui not in loop_map:
- e = f"Invalid GUI request {gui!r}, valid ones are:{loop_map.keys()}"
- raise ValueError(e)
- if kernel is None:
- if Application.initialized():
- kernel = getattr(Application.instance(), "kernel", None)
- if kernel is None:
- msg = (
- "You didn't specify a kernel,"
- " and no IPython Application with a kernel appears to be running."
- )
- raise RuntimeError(msg)
- if gui is None:
- # User wants to turn off integration; clear any evidence if Qt was the last one.
- if hasattr(kernel, "app"):
- delattr(kernel, "app")
- if hasattr(kernel, "_qt_notifier"):
- delattr(kernel, "_qt_notifier")
- if hasattr(kernel, "_qt_timer"):
- delattr(kernel, "_qt_timer")
- else:
- if gui.startswith("qt"):
- # Prepare the kernel here so any exceptions are displayed in the client.
- make_qt_app_for_kernel(gui, kernel)
- loop = loop_map[gui]
- if (
- loop and kernel.eventloop is not None and kernel.eventloop is not loop # type:ignore[unreachable]
- ):
- msg = "Cannot activate multiple GUI eventloops" # type:ignore[unreachable]
- raise RuntimeError(msg)
- kernel.eventloop = loop
- # We set `eventloop`; the function the user chose is executed in `Kernel.enter_eventloop`, thus
- # any exceptions raised during the event loop will not be shown in the client.
|