| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546 |
- """Base class for a kernel that talks to frontends over 0MQ."""
- # Copyright (c) IPython Development Team.
- # Distributed under the terms of the Modified BSD License.
- from __future__ import annotations
- import asyncio
- import inspect
- import logging
- import os
- import socket
- import sys
- import threading
- import time
- import typing as t
- import uuid
- import warnings
- from collections.abc import Mapping
- from contextvars import Context, ContextVar, copy_context
- from datetime import datetime
- from functools import partial
- from signal import SIGINT, SIGTERM, Signals, default_int_handler, signal
- from .thread import CONTROL_THREAD_NAME
- if sys.platform != "win32":
- from signal import SIGKILL
- else:
- SIGKILL = "windown-SIGKILL-sentinel"
- try:
- # jupyter_client >= 5, use tz-aware now
- from jupyter_client.session import utcnow as now
- except ImportError:
- # jupyter_client < 5, use local now()
- now = datetime.now
- import psutil
- import zmq
- from IPython.core.error import StdinNotImplementedError
- from jupyter_client.session import Session
- from tornado import ioloop
- from traitlets.config.configurable import SingletonConfigurable
- from traitlets.traitlets import (
- Any,
- Bool,
- Dict,
- Float,
- Instance,
- Integer,
- List,
- Unicode,
- default,
- observe,
- )
- from zmq.eventloop.zmqstream import ZMQStream
- from ipykernel.jsonutil import json_clean
- from ._version import kernel_protocol_version
- from .iostream import OutStream
- from .utils import LazyDict, _async_in_context
- _AWAITABLE_MESSAGE: str = (
- "For consistency across implementations, it is recommended that `{func_name}`"
- " either be a coroutine function (`async def`) or return an awaitable object"
- " (like an `asyncio.Future`). It might become a requirement in the future."
- " Coroutine functions and awaitables have been supported since"
- " ipykernel 6.0 (2021). {target} does not seem to return an awaitable"
- )
- T = t.TypeVar("T")
- def _accepts_parameters(meth, param_names):
- parameters = inspect.signature(meth).parameters
- accepts = dict.fromkeys(param_names, False)
- for param in param_names:
- param_spec = parameters.get(param)
- accepts[param] = (
- param_spec
- and param_spec.kind in [param_spec.KEYWORD_ONLY, param_spec.POSITIONAL_OR_KEYWORD]
- ) or any(p.kind == p.VAR_KEYWORD for p in parameters.values())
- return accepts
- class Kernel(SingletonConfigurable):
- """The base kernel class."""
- # ---------------------------------------------------------------------------
- # Kernel interface
- # ---------------------------------------------------------------------------
- # attribute to override with a GUI
- eventloop = Any(None)
- processes: dict[str, psutil.Process] = {}
- @observe("eventloop")
- def _update_eventloop(self, change):
- """schedule call to eventloop from IOLoop"""
- loop = ioloop.IOLoop.current()
- if change.new is not None:
- loop.add_callback(self.enter_eventloop)
- session = Instance(Session, allow_none=True)
- profile_dir = Instance("IPython.core.profiledir.ProfileDir", allow_none=True)
- shell_stream = Instance(ZMQStream, allow_none=True)
- shell_streams: List[t.Any] = List(
- help="""Deprecated shell_streams alias. Use shell_stream
- .. versionchanged:: 6.0
- shell_streams is deprecated. Use shell_stream.
- """
- )
- implementation: str
- implementation_version: str
- banner: str
- @default("shell_streams")
- def _shell_streams_default(self): # pragma: no cover
- warnings.warn(
- "Kernel.shell_streams is deprecated in ipykernel 6.0. Use Kernel.shell_stream",
- DeprecationWarning,
- stacklevel=2,
- )
- if self.shell_stream is not None:
- return [self.shell_stream]
- return []
- @observe("shell_streams")
- def _shell_streams_changed(self, change): # pragma: no cover
- warnings.warn(
- "Kernel.shell_streams is deprecated in ipykernel 6.0. Use Kernel.shell_stream",
- DeprecationWarning,
- stacklevel=2,
- )
- if len(change.new) > 1:
- warnings.warn(
- "Kernel only supports one shell stream. Additional streams will be ignored.",
- RuntimeWarning,
- stacklevel=2,
- )
- if change.new:
- self.shell_stream = change.new[0]
- control_stream = Instance(ZMQStream, allow_none=True)
- debug_shell_socket = Any()
- control_thread = Any()
- shell_channel_thread = Any()
- iopub_socket = Any()
- iopub_thread = Any()
- stdin_socket = Any()
- log: logging.Logger = Instance(logging.Logger, allow_none=True) # type:ignore[assignment]
- # identities:
- int_id = Integer(-1)
- ident = Unicode()
- @default("ident")
- def _default_ident(self):
- return str(uuid.uuid4())
- # This should be overridden by wrapper kernels that implement any real
- # language.
- language_info: dict[str, object] = {}
- # any links that should go in the help menu
- help_links: List[dict[str, str]] = List()
- # Experimental option to break in non-user code.
- # The ipykernel source is in the call stack, so the user
- # has to manipulate the step-over and step-into in a wize way.
- debug_just_my_code = Bool(
- False,
- help="""Set to False if you want to debug python standard and dependent libraries.
- """,
- ).tag(config=True)
- # Experimental option to filter internal frames from the stack trace and stepping.
- filter_internal_frames = Bool(
- True,
- help="""Set to False if you want to debug kernel modules.
- """,
- ).tag(config=True)
- # track associations with current request
- # Private interface
- _darwin_app_nap = Bool(
- True,
- help="""Whether to use appnope for compatibility with OS X App Nap.
- Only affects OS X >= 10.9.
- """,
- ).tag(config=True)
- # track associations with current request
- _allow_stdin = Bool(False)
- _control_parent: Dict[str, t.Any] = Dict({})
- _control_parent_ident: bytes = b""
- _shell_parent: ContextVar[dict[str, Any]]
- _shell_parent_ident: ContextVar[bytes]
- _shell_context: Context
- # Kept for backward-compatibility, accesses _control_parent_ident and _shell_parent_ident,
- # see https://github.com/jupyterlab/jupyterlab/issues/17785
- _parent_ident: Mapping[str, bytes]
- # Asyncio lock for main shell thread.
- _main_asyncio_lock: asyncio.Lock
- @property
- def _parent_header(self):
- warnings.warn(
- "Kernel._parent_header is deprecated in ipykernel 6. Use .get_parent()",
- DeprecationWarning,
- stacklevel=2,
- )
- return self.get_parent()
- # Time to sleep after flushing the stdout/err buffers in each execute
- # cycle. While this introduces a hard limit on the minimal latency of the
- # execute cycle, it helps prevent output synchronization problems for
- # clients.
- # Units are in seconds. The minimum zmq latency on local host is probably
- # ~150 microseconds, set this to 500us for now. We may need to increase it
- # a little if it's not enough after more interactive testing.
- _execute_sleep = Float(0.0005).tag(config=True)
- # Frequency of the kernel's event loop.
- # Units are in seconds, kernel subclasses for GUI toolkits may need to
- # adapt to milliseconds.
- _poll_interval = Float(0.01).tag(config=True)
- stop_on_error_timeout = Float(
- 0.0,
- config=True,
- help="""time (in seconds) to wait for messages to arrive
- when aborting queued requests after an error.
- Requests that arrive within this window after an error
- will be cancelled.
- Increase in the event of unusually slow network
- causing significant delays,
- which can manifest as e.g. "Run all" in a notebook
- aborting some, but not all, messages after an error.
- """,
- )
- # If the shutdown was requested over the network, we leave here the
- # necessary reply message so it can be sent by our registered atexit
- # handler. This ensures that the reply is only sent to clients truly at
- # the end of our shutdown process (which happens after the underlying
- # IPython shell's own shutdown).
- _shutdown_message = None
- # This is a dict of port number that the kernel is listening on. It is set
- # by record_ports and used by connect_request.
- _recorded_ports = Dict()
- # Track execution count here. For IPython, we override this to use the
- # execution count we store in the shell.
- execution_count = 0
- # Asyncio lock to ensure only one control queue message is processed at a time.
- _control_lock = Instance(asyncio.Lock)
- msg_types = [
- "execute_request",
- "complete_request",
- "inspect_request",
- "history_request",
- "comm_info_request",
- "kernel_info_request",
- "connect_request",
- "shutdown_request",
- "is_complete_request",
- "interrupt_request",
- ]
- # control channel accepts all shell messages
- # and some of its own
- control_msg_types = [
- *msg_types,
- "debug_request",
- "usage_request",
- "create_subshell_request",
- "delete_subshell_request",
- "list_subshell_request",
- ]
- def __init__(self, **kwargs):
- """Initialize the kernel."""
- super().__init__(**kwargs)
- # Kernel application may swap stdout and stderr to OutStream,
- # which is the case in `IPKernelApp.init_io`, hence `sys.stdout`
- # can already by different from TextIO at initialization time.
- self._stdout: OutStream | t.TextIO = sys.stdout
- self._stderr: OutStream | t.TextIO = sys.stderr
- # Build dict of handlers for message types
- self.shell_handlers = {}
- for msg_type in self.msg_types:
- self.shell_handlers[msg_type] = getattr(self, msg_type)
- self.control_handlers = {}
- for msg_type in self.control_msg_types:
- self.control_handlers[msg_type] = getattr(self, msg_type)
- # Storing the accepted parameters for do_execute, used in execute_request
- self._do_exec_accepted_params = _accepts_parameters(
- self.do_execute, ["cell_meta", "cell_id"]
- )
- self._control_parent = {}
- self._control_parent_ident = b""
- self._shell_parent = ContextVar("shell_parent")
- self._shell_parent.set({})
- self._shell_parent_ident = ContextVar("shell_parent_ident")
- self._shell_parent_ident.set(b"")
- self._shell_context = copy_context()
- # For backward compatibility so that _parent_ident["shell"] and _parent_ident["control"]
- # work as they used to for ipykernel >= 7
- self._parent_ident = LazyDict(
- {
- "control": lambda: self._control_parent_ident,
- "shell": lambda: self._get_shell_context_var(self._shell_parent_ident),
- }
- )
- self._main_asyncio_lock = asyncio.Lock()
- async def dispatch_control(self, msg):
- """Dispatch a control request, ensuring only one message is processed at a time."""
- # Ensure only one control message is processed at a time
- async with self._control_lock:
- await self.process_control(msg)
- async def process_control(self, msg):
- """dispatch control requests"""
- if not self.session:
- return
- idents, msg = self.session.feed_identities(msg, copy=False)
- try:
- msg = self.session.deserialize(msg, content=True, copy=False)
- except Exception:
- self.log.error("Invalid Control Message", exc_info=True) # noqa: G201
- return
- self.log.debug("Control received: %s", msg)
- # Set the parent message for side effects.
- self.set_parent(idents, msg, channel="control")
- self._publish_status("busy", "control")
- header = msg["header"]
- msg_type = header["msg_type"]
- handler = self.control_handlers.get(msg_type, None)
- if handler is None:
- self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
- else:
- try:
- result = handler(self.control_stream, idents, msg)
- if inspect.isawaitable(result):
- await result
- except Exception:
- self.log.error("Exception in control handler:", exc_info=True) # noqa: G201
- if sys.stdout is not None:
- sys.stdout.flush()
- if sys.stderr is not None:
- sys.stderr.flush()
- self._publish_status_and_flush("idle", "control", self.control_stream)
- def should_handle(self, stream, msg, idents):
- """Check whether a shell-channel message should be handled
- Allows subclasses to prevent handling of certain messages (e.g. aborted requests).
- .. versionchanged:: 7
- Subclass should_handle _may_ be async.
- Base class implementation is not async.
- """
- return True
- async def dispatch_shell(self, msg, /, subshell_id: str | None = None):
- """dispatch shell requests"""
- if len(msg) == 1 and msg[0].buffer == b"stop aborting":
- # Dummy "stop aborting" message to stop aborting execute requests on this subshell.
- # This dummy message implementation allows the subshell to abort messages that are
- # already queued in the zmq sockets/streams without having to know any of their
- # details in advance.
- if subshell_id is None:
- self._aborting = False
- else:
- self.shell_channel_thread.manager.set_subshell_aborting(subshell_id, False)
- return
- if not self.session:
- return
- if self._supports_kernel_subshells:
- assert threading.current_thread() not in (
- self.control_thread,
- self.shell_channel_thread,
- )
- idents, msg = self.session.feed_identities(msg, copy=False)
- try:
- msg = self.session.deserialize(msg, content=True, copy=False)
- except Exception:
- self.log.error("Invalid Message", exc_info=True) # noqa: G201
- return
- # Set the parent message for side effects.
- self.set_parent(idents, msg, channel="shell")
- self._publish_status("busy", "shell")
- msg_type = msg["header"]["msg_type"]
- assert msg["header"].get("subshell_id") == subshell_id
- if self._supports_kernel_subshells:
- stream = self.shell_channel_thread.manager.get_subshell_to_shell_channel_socket(
- subshell_id
- )
- else:
- stream = self.shell_stream
- # Only abort execute requests
- if msg_type == "execute_request":
- if subshell_id is None:
- aborting = self._aborting # type:ignore[unreachable]
- else:
- aborting = self.shell_channel_thread.manager.get_subshell_aborting(subshell_id)
- if aborting:
- self._send_abort_reply(stream, msg, idents)
- self._publish_status_and_flush("idle", "shell", stream)
- return
- # Print some info about this message and leave a '--->' marker, so it's
- # easier to trace visually the message chain when debugging. Each
- # handler prints its message at the end.
- self.log.debug("\n*** MESSAGE TYPE:%s***", msg_type)
- self.log.debug(" Content: %s\n --->\n ", msg["content"])
- should_handle: bool | t.Awaitable[bool] = self.should_handle(stream, msg, idents)
- if inspect.isawaitable(should_handle):
- should_handle = await should_handle
- if not should_handle:
- self._publish_status_and_flush("idle", "shell", stream)
- self.log.debug("Not handling %s:%s", msg_type, msg["header"].get("msg_id"))
- return
- handler = self.shell_handlers.get(msg_type, None)
- if handler is None:
- self.log.warning("Unknown message type: %r", msg_type)
- else:
- self.log.debug("%s: %s", msg_type, msg)
- try:
- self.pre_handler_hook()
- except Exception:
- self.log.debug("Unable to signal in pre_handler_hook:", exc_info=True)
- try:
- result = handler(stream, idents, msg)
- if inspect.isawaitable(result):
- await result
- except Exception:
- self.log.error("Exception in message handler:", exc_info=True) # noqa: G201
- except KeyboardInterrupt:
- # Ctrl-c shouldn't crash the kernel here.
- self.log.error("KeyboardInterrupt caught in kernel.")
- finally:
- try:
- self.post_handler_hook()
- except Exception:
- self.log.debug("Unable to signal in post_handler_hook:", exc_info=True)
- if sys.stdout is not None:
- sys.stdout.flush()
- if sys.stderr is not None:
- sys.stderr.flush()
- self._publish_status_and_flush("idle", "shell", stream)
- def pre_handler_hook(self):
- """Hook to execute before calling message handler"""
- # ensure default_int_handler during handler call
- self.saved_sigint_handler = signal(SIGINT, default_int_handler)
- def post_handler_hook(self):
- """Hook to execute after calling message handler"""
- signal(SIGINT, self.saved_sigint_handler)
- def enter_eventloop(self):
- """enter eventloop"""
- self.log.info("Entering eventloop %s", self.eventloop)
- # record handle, so we can check when this changes
- eventloop = self.eventloop
- if eventloop is None:
- self.log.info("Exiting as there is no eventloop")
- return
- async def advance_eventloop():
- # check if eventloop changed:
- if self.eventloop is not eventloop:
- self.log.info("exiting eventloop %s", eventloop)
- return
- self.log.debug("Advancing eventloop %s", eventloop)
- try:
- eventloop(self)
- except KeyboardInterrupt:
- # Ctrl-C shouldn't crash the kernel
- self.log.error("KeyboardInterrupt caught in kernel")
- if self.eventloop is eventloop:
- # schedule advance again
- schedule_next()
- def schedule_next():
- """Schedule the next advance of the eventloop"""
- # call_later allows the io_loop to process other events if needed.
- # Going through schedule_dispatch ensures all other dispatches on msg_queue
- # are processed before we enter the eventloop, even if the previous dispatch was
- # already consumed from the queue by process_one and the queue is
- # technically empty.
- self.log.debug("Scheduling eventloop advance")
- self.io_loop.call_later(0.001, advance_eventloop)
- # begin polling the eventloop
- schedule_next()
- async def _create_control_lock(self):
- # This can be removed when minimum python increases to 3.10
- self._control_lock = asyncio.Lock()
- def start(self):
- """register dispatchers for streams"""
- self.io_loop = ioloop.IOLoop.current()
- if self.control_stream:
- self.control_stream.on_recv(self.dispatch_control, copy=False)
- if self.control_thread and sys.version_info < (3, 10):
- # Before Python 3.10 we need to ensure the _control_lock is created in the
- # thread that uses it. When our minimum python is 3.10 we can remove this
- # and always use the else below, or just assign it where it is declared.
- self.control_thread.io_loop.add_callback(self._create_control_lock)
- else:
- self._control_lock = asyncio.Lock()
- if self.shell_stream:
- if self.shell_channel_thread:
- self.shell_channel_thread.manager.set_on_recv_callback(self.shell_main)
- self.shell_stream.on_recv(self.shell_channel_thread_main, copy=False)
- else:
- self.shell_stream.on_recv(
- _async_in_context(partial(self.shell_main, None)),
- copy=False,
- )
- # publish idle status
- self._publish_status("starting", "shell")
- async def shell_channel_thread_main(self, msg):
- """Handler for shell messages received on shell_channel_thread"""
- assert threading.current_thread() == self.shell_channel_thread
- async with self.shell_channel_thread.asyncio_lock:
- if self.session is None:
- return
- # deserialize only the header to get subshell_id
- # Keep original message to send to subshell_id unmodified.
- _, msg2 = self.session.feed_identities(msg, copy=False)
- try:
- msg3 = self.session.deserialize(msg2, content=False, copy=False)
- subshell_id = msg3["header"].get("subshell_id")
- # Find inproc pair socket to use to send message to correct subshell.
- subshell_manager = self.shell_channel_thread.manager
- socket = subshell_manager.get_shell_channel_to_subshell_socket(subshell_id)
- assert socket is not None
- socket.send_multipart(msg, copy=False)
- except Exception:
- self.log.error("Invalid message", exc_info=True) # noqa: G201
- async def shell_main(self, subshell_id: str | None, msg):
- """Handler of shell messages for a single subshell"""
- if self._supports_kernel_subshells:
- if subshell_id is None:
- assert threading.current_thread() == self.shell_channel_thread.parent_thread
- asyncio_lock = self._main_asyncio_lock
- else:
- assert threading.current_thread() not in (
- self.shell_channel_thread,
- self.shell_channel_thread.parent_thread,
- )
- asyncio_lock = self.shell_channel_thread.manager.get_subshell_asyncio_lock(
- subshell_id
- )
- else:
- assert subshell_id is None
- asyncio_lock = self._main_asyncio_lock
- # Whilst executing a shell message, do not accept any other shell messages on the
- # same subshell, so that cells are run sequentially. Without this we can run multiple
- # async cells at the same time which would be a nice feature to have but is an API
- # change.
- assert asyncio_lock is not None
- async with asyncio_lock:
- await self.dispatch_shell(msg, subshell_id=subshell_id)
- def record_ports(self, ports):
- """Record the ports that this kernel is using.
- The creator of the Kernel instance must call this methods if they
- want the :meth:`connect_request` method to return the port numbers.
- """
- self._recorded_ports = ports
- # ---------------------------------------------------------------------------
- # Kernel request handlers
- # ---------------------------------------------------------------------------
- def _publish_execute_input(self, code, parent, execution_count):
- """Publish the code request on the iopub stream."""
- if not self.session:
- return
- self.session.send(
- self.iopub_socket,
- "execute_input",
- {"code": code, "execution_count": execution_count},
- parent=parent,
- ident=self._topic("execute_input"),
- )
- def _publish_status(self, status, channel, parent=None):
- """send status (busy/idle) on IOPub"""
- if not self.session:
- return
- self.session.send(
- self.iopub_socket,
- "status",
- {"execution_state": status},
- parent=parent or self.get_parent(channel),
- ident=self._topic("status"),
- )
- def _publish_status_and_flush(self, status, channel, stream, parent=None):
- """send status on IOPub and flush specified stream to ensure reply is sent before handling the next reply"""
- self._publish_status(status, channel, parent)
- if stream and hasattr(stream, "flush") and not self._supports_kernel_subshells:
- stream.flush(zmq.POLLOUT)
- def _publish_debug_event(self, event):
- if not self.session:
- return
- self.session.send(
- self.iopub_socket,
- "debug_event",
- event,
- parent=self.get_parent(),
- ident=self._topic("debug_event"),
- )
- def set_parent(self, ident, parent, channel="shell"):
- """Set the current parent request
- Side effects (IOPub messages) and replies are associated with
- the request that caused them via the parent_header.
- The parent identity is used to route input_request messages
- on the stdin channel.
- """
- if channel == "control":
- self._control_parent_ident = ident
- self._control_parent = parent
- else:
- self._shell_parent_ident.set(ident)
- self._shell_parent.set(parent)
- # preserve the last call to set_parent
- self._shell_context = copy_context()
- def get_parent(self, channel=None):
- """Get the parent request associated with a channel.
- .. versionadded:: 6
- Parameters
- ----------
- channel : str
- the name of the channel ('shell' or 'control')
- Returns
- -------
- message : dict
- the parent message for the most recent request on the channel.
- """
- if channel is None:
- # If a channel is not specified, get information from current thread
- if threading.current_thread().name == CONTROL_THREAD_NAME:
- channel = "control"
- else:
- channel = "shell"
- if channel == "control":
- return self._control_parent
- return self._get_shell_context_var(self._shell_parent)
- def _get_shell_context_var(self, var: ContextVar[T]) -> T:
- """Lookup a ContextVar, falling back on the shell context
- Allows for user-launched Threads to still resolve to the shell's main context
- necessary for e.g. display from threads.
- """
- try:
- return var.get()
- except LookupError:
- return self._shell_context[var]
- def send_response(
- self,
- stream,
- msg_or_type,
- content=None,
- ident=None,
- buffers=None,
- track=False,
- header=None,
- metadata=None,
- channel=None,
- ):
- """Send a response to the message we're currently processing.
- This accepts all the parameters of :meth:`jupyter_client.session.Session.send`
- except ``parent``.
- This relies on :meth:`set_parent` having been called for the current
- message.
- """
- if not self.session:
- return None
- return self.session.send(
- stream,
- msg_or_type,
- content,
- self.get_parent(channel),
- ident,
- buffers,
- track,
- header,
- metadata,
- )
- def init_metadata(self, parent):
- """Initialize metadata.
- Run at the beginning of execution requests.
- """
- # FIXME: `started` is part of ipyparallel
- # Remove for ipykernel 5.0
- return {
- "started": now(),
- }
- def finish_metadata(self, parent, metadata, reply_content):
- """Finish populating metadata.
- Run after completing an execution request.
- """
- return metadata
- async def execute_request(self, stream, ident, parent):
- """handle an execute_request"""
- if not self.session:
- return
- try:
- content = parent["content"]
- code = content["code"]
- silent = content.get("silent", False)
- store_history = content.get("store_history", not silent)
- user_expressions = content.get("user_expressions", {})
- allow_stdin = content.get("allow_stdin", False)
- cell_meta = parent.get("metadata", {})
- cell_id = cell_meta.get("cellId")
- except Exception:
- self.log.error("Got bad msg: ")
- self.log.error("%s", parent)
- return
- stop_on_error = content.get("stop_on_error", True)
- metadata = self.init_metadata(parent)
- # Re-broadcast our input for the benefit of listening clients, and
- # start computing output
- if not silent:
- self.execution_count += 1
- self._publish_execute_input(code, parent, self.execution_count)
- # Arguments based on the do_execute signature
- do_execute_args = {
- "code": code,
- "silent": silent,
- "store_history": store_history,
- "user_expressions": user_expressions,
- "allow_stdin": allow_stdin,
- }
- if self._do_exec_accepted_params["cell_meta"]:
- do_execute_args["cell_meta"] = cell_meta
- if self._do_exec_accepted_params["cell_id"]:
- do_execute_args["cell_id"] = cell_id
- subshell_id = parent["header"].get("subshell_id")
- # Call do_execute with the appropriate arguments
- reply_content = self.do_execute(**do_execute_args)
- if inspect.isawaitable(reply_content):
- reply_content = await reply_content
- else:
- warnings.warn(
- _AWAITABLE_MESSAGE.format(func_name="do_execute", target=self.do_execute),
- PendingDeprecationWarning,
- stacklevel=1,
- )
- # Flush output before sending the reply.
- if sys.stdout is not None:
- sys.stdout.flush()
- if sys.stderr is not None:
- sys.stderr.flush()
- # FIXME: on rare occasions, the flush doesn't seem to make it to the
- # clients... This seems to mitigate the problem, but we definitely need
- # to better understand what's going on.
- if self._execute_sleep:
- time.sleep(self._execute_sleep)
- # Send the reply.
- reply_content = json_clean(reply_content)
- metadata = self.finish_metadata(parent, metadata, reply_content)
- reply_msg: dict[str, t.Any] = self.session.send( # type:ignore[assignment]
- stream,
- "execute_reply",
- reply_content,
- parent,
- metadata=metadata,
- ident=ident,
- )
- self.log.debug("%s", reply_msg)
- if not silent and reply_msg["content"]["status"] == "error" and stop_on_error:
- subshell_id = parent["header"].get("subshell_id")
- self._abort_queues(subshell_id)
- def do_execute(
- self,
- code,
- silent,
- store_history=True,
- user_expressions=None,
- allow_stdin=False,
- *,
- cell_meta=None,
- cell_id=None,
- ):
- """Execute user code. Must be overridden by subclasses."""
- raise NotImplementedError
- async def complete_request(self, stream, ident, parent):
- """Handle a completion request."""
- if not self.session:
- return
- content = parent["content"]
- code = content["code"]
- cursor_pos = content["cursor_pos"]
- matches = self.do_complete(code, cursor_pos)
- if inspect.isawaitable(matches):
- matches = await matches
- else:
- warnings.warn(
- _AWAITABLE_MESSAGE.format(func_name="do_complete", target=self.do_complete),
- PendingDeprecationWarning,
- stacklevel=1,
- )
- matches = json_clean(matches)
- self.session.send(stream, "complete_reply", matches, parent, ident)
- def do_complete(self, code, cursor_pos):
- """Override in subclasses to find completions."""
- return {
- "matches": [],
- "cursor_end": cursor_pos,
- "cursor_start": cursor_pos,
- "metadata": {},
- "status": "ok",
- }
- async def inspect_request(self, stream, ident, parent):
- """Handle an inspect request."""
- if not self.session:
- return
- content = parent["content"]
- reply_content = self.do_inspect(
- content["code"],
- content["cursor_pos"],
- content.get("detail_level", 0),
- set(content.get("omit_sections", [])),
- )
- if inspect.isawaitable(reply_content):
- reply_content = await reply_content
- else:
- warnings.warn(
- _AWAITABLE_MESSAGE.format(func_name="do_inspect", target=self.do_inspect),
- PendingDeprecationWarning,
- stacklevel=1,
- )
- # Before we send this object over, we scrub it for JSON usage
- reply_content = json_clean(reply_content)
- msg = self.session.send(stream, "inspect_reply", reply_content, parent, ident)
- self.log.debug("%s", msg)
- def do_inspect(self, code, cursor_pos, detail_level=0, omit_sections=()):
- """Override in subclasses to allow introspection."""
- return {"status": "ok", "data": {}, "metadata": {}, "found": False}
- async def history_request(self, stream, ident, parent):
- """Handle a history request."""
- if not self.session:
- return
- content = parent["content"]
- reply_content = self.do_history(**content)
- if inspect.isawaitable(reply_content):
- reply_content = await reply_content
- else:
- warnings.warn(
- _AWAITABLE_MESSAGE.format(func_name="do_history", target=self.do_history),
- PendingDeprecationWarning,
- stacklevel=1,
- )
- reply_content = json_clean(reply_content)
- msg = self.session.send(stream, "history_reply", reply_content, parent, ident)
- self.log.debug("%s", msg)
- def do_history(
- self,
- hist_access_type,
- output,
- raw,
- session=None,
- start=None,
- stop=None,
- n=None,
- pattern=None,
- unique=False,
- ):
- """Override in subclasses to access history."""
- return {"status": "ok", "history": []}
- async def connect_request(self, stream, ident, parent):
- """Handle a connect request."""
- if not self.session:
- return
- content = self._recorded_ports.copy() if self._recorded_ports else {}
- content["status"] = "ok"
- msg = self.session.send(stream, "connect_reply", content, parent, ident)
- self.log.debug("%s", msg)
- @property
- def kernel_info(self):
- from .debugger import _is_debugpy_available
- supported_features: list[str] = []
- if self._supports_kernel_subshells:
- supported_features.append("kernel subshells")
- if _is_debugpy_available:
- supported_features.append("debugger")
- return {
- "protocol_version": kernel_protocol_version,
- "implementation": self.implementation,
- "implementation_version": self.implementation_version,
- "language_info": self.language_info,
- "banner": self.banner,
- "help_links": self.help_links,
- "supported_features": supported_features,
- }
- async def kernel_info_request(self, stream, ident, parent):
- """Handle a kernel info request."""
- if not self.session:
- return
- content = {"status": "ok"}
- content.update(self.kernel_info)
- msg = self.session.send(stream, "kernel_info_reply", content, parent, ident)
- self.log.debug("%s", msg)
- async def comm_info_request(self, stream, ident, parent):
- """Handle a comm info request."""
- if not self.session:
- return
- content = parent["content"]
- target_name = content.get("target_name", None)
- # Should this be moved to ipkernel?
- if hasattr(self, "comm_manager"):
- comms = {
- k: dict(target_name=v.target_name)
- for (k, v) in self.comm_manager.comms.items()
- if v.target_name == target_name or target_name is None
- }
- else:
- comms = {}
- reply_content = dict(comms=comms, status="ok")
- msg = self.session.send(stream, "comm_info_reply", reply_content, parent, ident)
- self.log.debug("%s", msg)
- def _send_interrupt_children(self):
- if os.name == "nt":
- self.log.error("Interrupt message not supported on Windows")
- else:
- pid = os.getpid()
- pgid = os.getpgid(pid)
- # Prefer process-group over process
- # but only if the kernel is the leader of the process group
- if pgid and pgid == pid and hasattr(os, "killpg"):
- try:
- os.killpg(pgid, SIGINT)
- except OSError:
- os.kill(pid, SIGINT)
- raise
- else:
- os.kill(pid, SIGINT)
- async def interrupt_request(self, stream, ident, parent):
- """Handle an interrupt request."""
- if not self.session:
- return
- content: dict[str, t.Any] = {"status": "ok"}
- try:
- self._send_interrupt_children()
- except OSError as err:
- import traceback
- content = {
- "status": "error",
- "traceback": traceback.format_stack(),
- "ename": str(type(err).__name__),
- "evalue": str(err),
- }
- self.session.send(stream, "interrupt_reply", content, parent, ident=ident)
- return
- async def shutdown_request(self, stream, ident, parent):
- """Handle a shutdown request."""
- if not self.session:
- return
- content = self.do_shutdown(parent["content"]["restart"])
- if inspect.isawaitable(content):
- content = await content
- else:
- warnings.warn(
- _AWAITABLE_MESSAGE.format(func_name="do_shutdown", target=self.do_shutdown),
- PendingDeprecationWarning,
- stacklevel=1,
- )
- self.session.send(stream, "shutdown_reply", content, parent, ident=ident)
- # same content, but different msg_id for broadcasting on IOPub
- self._shutdown_message = self.session.msg("shutdown_reply", content, parent)
- await self._at_shutdown()
- self.log.debug("Stopping control ioloop")
- if self.control_stream:
- control_io_loop = self.control_stream.io_loop
- control_io_loop.add_callback(control_io_loop.stop)
- self.log.debug("Stopping shell ioloop")
- self.io_loop.add_callback(self.io_loop.stop)
- if self.shell_stream and self.shell_stream.io_loop != self.io_loop:
- shell_io_loop = self.shell_stream.io_loop
- shell_io_loop.add_callback(shell_io_loop.stop)
- def do_shutdown(self, restart):
- """Override in subclasses to do things when the frontend shuts down the
- kernel.
- """
- return {"status": "ok", "restart": restart}
- async def is_complete_request(self, stream, ident, parent):
- """Handle an is_complete request."""
- if not self.session:
- return
- content = parent["content"]
- code = content["code"]
- reply_content = self.do_is_complete(code)
- if inspect.isawaitable(reply_content):
- reply_content = await reply_content
- else:
- warnings.warn(
- _AWAITABLE_MESSAGE.format(func_name="do_is_complete", target=self.do_is_complete),
- PendingDeprecationWarning,
- stacklevel=1,
- )
- reply_content = json_clean(reply_content)
- reply_msg = self.session.send(stream, "is_complete_reply", reply_content, parent, ident)
- self.log.debug("%s", reply_msg)
- def do_is_complete(self, code):
- """Override in subclasses to find completions."""
- return {"status": "unknown"}
- async def debug_request(self, stream, ident, parent):
- """Handle a debug request."""
- if not self.session:
- return
- content = parent["content"]
- reply_content = self.do_debug_request(content)
- if inspect.isawaitable(reply_content):
- reply_content = await reply_content
- else:
- warnings.warn(
- _AWAITABLE_MESSAGE.format(
- func_name="do_debug_request", target=self.do_debug_request
- ),
- PendingDeprecationWarning,
- stacklevel=1,
- )
- reply_content = json_clean(reply_content)
- reply_msg = self.session.send(stream, "debug_reply", reply_content, parent, ident)
- self.log.debug("%s", reply_msg)
- def get_process_metric_value(self, process, name, attribute=None):
- """Get the process metric value."""
- try:
- metric_value = getattr(process, name)()
- if attribute is not None: # ... a named tuple
- return getattr(metric_value, attribute)
- # ... or a number
- return metric_value
- # Avoid littering logs with stack traces
- # complaining about dead processes
- except BaseException:
- return 0
- async def usage_request(self, stream, ident, parent):
- """Handle a usage request."""
- if not self.session:
- return
- reply_content = {"hostname": socket.gethostname(), "pid": os.getpid()}
- current_process = psutil.Process()
- all_processes = [current_process, *current_process.children(recursive=True)]
- # Ensure 1) self.processes is updated to only current subprocesses
- # and 2) we reuse processes when possible (needed for accurate CPU)
- self.processes = {
- process.pid: self.processes.get(process.pid, process) # type:ignore[misc,call-overload]
- for process in all_processes
- }
- reply_content["kernel_cpu"] = sum(
- [
- self.get_process_metric_value(process, "cpu_percent", None)
- for process in self.processes.values()
- ]
- )
- mem_info_type = "pss" if hasattr(current_process.memory_full_info(), "pss") else "rss"
- reply_content["kernel_memory"] = sum(
- [
- self.get_process_metric_value(process, "memory_full_info", mem_info_type)
- for process in self.processes.values()
- ]
- )
- cpu_percent = psutil.cpu_percent()
- # https://psutil.readthedocs.io/en/latest/index.html?highlight=cpu#psutil.cpu_percent
- # The first time cpu_percent is called it will return a meaningless 0.0 value which you are supposed to ignore.
- if cpu_percent is not None and cpu_percent != 0.0: # type:ignore[redundant-expr]
- reply_content["host_cpu_percent"] = cpu_percent
- reply_content["cpu_count"] = psutil.cpu_count(logical=True)
- reply_content["host_virtual_memory"] = dict(psutil.virtual_memory()._asdict())
- reply_msg = self.session.send(stream, "usage_reply", reply_content, parent, ident)
- self.log.debug("%s", reply_msg)
- async def do_debug_request(self, msg):
- raise NotImplementedError
- async def create_subshell_request(self, socket, ident, parent) -> None:
- """Handle a create subshell request.
- .. versionadded:: 7
- """
- if not self.session:
- return
- if not self._supports_kernel_subshells:
- self.log.error("Subshells are not supported by this kernel")
- return
- assert threading.current_thread().name == CONTROL_THREAD_NAME
- # This should only be called in the control thread if it exists.
- # Request is passed to shell channel thread to process.
- control_socket = self.shell_channel_thread.manager.control_to_shell_channel.from_socket
- control_socket.send_json({"type": "create"})
- reply = control_socket.recv_json()
- self.session.send(socket, "create_subshell_reply", reply, parent, ident)
- async def delete_subshell_request(self, socket, ident, parent) -> None:
- """Handle a delete subshell request.
- .. versionadded:: 7
- """
- if not self.session:
- return
- if not self._supports_kernel_subshells:
- self.log.error("KERNEL SUBSHELLS NOT SUPPORTED")
- return
- assert threading.current_thread().name == CONTROL_THREAD_NAME
- try:
- content = parent["content"]
- subshell_id = content["subshell_id"]
- except Exception:
- self.log.error("Got bad msg from parent: %s", parent)
- return
- # This should only be called in the control thread if it exists.
- # Request is passed to shell channel thread to process.
- control_socket = self.shell_channel_thread.manager.control_to_shell_channel.from_socket
- control_socket.send_json({"type": "delete", "subshell_id": subshell_id})
- reply = control_socket.recv_json()
- self.session.send(socket, "delete_subshell_reply", reply, parent, ident)
- async def list_subshell_request(self, socket, ident, parent) -> None:
- """Handle a list subshell request.
- .. versionadded:: 7
- """
- if not self.session:
- return
- if not self._supports_kernel_subshells:
- self.log.error("Subshells are not supported by this kernel")
- return
- assert threading.current_thread().name == CONTROL_THREAD_NAME
- # This should only be called in the control thread if it exists.
- # Request is passed to shell channel thread to process.
- control_socket = self.shell_channel_thread.manager.control_to_shell_channel.from_socket
- control_socket.send_json({"type": "list"})
- reply = control_socket.recv_json()
- self.session.send(socket, "list_subshell_reply", reply, parent, ident)
- # ---------------------------------------------------------------------------
- # Protected interface
- # ---------------------------------------------------------------------------
- def _topic(self, topic):
- """prefixed topic for IOPub messages"""
- base = "kernel.%s" % self.ident
- return (f"{base}.{topic}").encode()
- _aborting = Bool(False)
- def _post_dummy_stop_aborting_message(self, subshell_id: str | None) -> None:
- """Post a dummy message to the correct subshell that when handled will unset
- the _aborting flag.
- """
- subshell_manager = self.shell_channel_thread.manager
- socket = subshell_manager.get_shell_channel_to_subshell_socket(subshell_id)
- assert socket is not None
- msg = b"stop aborting" # Magic string for dummy message.
- socket.send(msg, copy=False)
- def _abort_queues(self, subshell_id: str | None = None):
- # while this flag is true,
- # execute requests will be aborted
- if subshell_id is None:
- self._aborting = True
- else:
- self.shell_channel_thread.manager.set_subshell_aborting(subshell_id, True)
- self.log.info("Aborting queue")
- if self.shell_channel_thread:
- # Only really need to do this if there are messages already queued
- self.shell_channel_thread.io_loop.add_callback(
- self._post_dummy_stop_aborting_message, subshell_id
- )
- return
- # flush streams, so all currently waiting messages
- # are added to the queue
- if self.shell_stream and not self._supports_kernel_subshells:
- self.shell_stream.flush()
- # Callback to signal that we are done aborting
- # dispatch functions _must_ be async
- async def stop_aborting():
- self.log.info("Finishing abort")
- self._aborting = False
- if self.stop_on_error_timeout:
- # if we have a delay, give messages this long to arrive on the queue
- # before we stop aborting requests
- self.io_loop.call_later(self.stop_on_error_timeout, stop_aborting)
- # If we have an eventloop, it may interfere with the call_later above.
- # If the loop has a _schedule_exit method, we call that so the loop exits
- # after stop_on_error_timeout, returning to the main io_loop and letting
- # the call_later fire.
- if self.eventloop is not None and hasattr(self.eventloop, "_schedule_exit"):
- self.eventloop._schedule_exit(self.stop_on_error_timeout + 0.01)
- else:
- self.io_loop.add_callback(stop_aborting)
- def _send_abort_reply(self, stream, msg, idents):
- """Send a reply to an aborted request"""
- if not self.session:
- return
- self.log.info("Aborting %s: %s", msg["header"]["msg_id"], msg["header"]["msg_type"])
- reply_type = msg["header"]["msg_type"].rsplit("_", 1)[0] + "_reply"
- status = {"status": "aborted"}
- md = self.init_metadata(msg)
- md = self.finish_metadata(msg, md, status)
- md.update(status)
- self.session.send(
- stream,
- reply_type,
- metadata=md,
- content=status,
- parent=msg,
- ident=idents,
- )
- def _no_raw_input(self):
- """Raise StdinNotImplementedError if active frontend doesn't support
- stdin."""
- msg = "raw_input was called, but this frontend does not support stdin."
- raise StdinNotImplementedError(msg)
- def getpass(self, prompt="", stream=None):
- """Forward getpass to frontends
- Raises
- ------
- StdinNotImplementedError if active frontend doesn't support stdin.
- """
- if not self._allow_stdin:
- msg = "getpass was called, but this frontend does not support input requests."
- raise StdinNotImplementedError(msg)
- if stream is not None:
- import warnings
- warnings.warn(
- "The `stream` parameter of `getpass.getpass` will have no effect when using ipykernel",
- UserWarning,
- stacklevel=2,
- )
- return self._input_request(
- prompt,
- self._get_shell_context_var(self._shell_parent_ident),
- self.get_parent("shell"),
- password=True,
- )
- def raw_input(self, prompt=""):
- """Forward raw_input to frontends
- Raises
- ------
- StdinNotImplementedError if active frontend doesn't support stdin.
- """
- if not self._allow_stdin:
- msg = "raw_input was called, but this frontend does not support input requests."
- raise StdinNotImplementedError(msg)
- return self._input_request(
- str(prompt),
- self._get_shell_context_var(self._shell_parent_ident),
- self.get_parent("shell"),
- password=False,
- )
- def _input_request(self, prompt, ident, parent, password=False):
- # Flush output before making the request.
- if sys.stdout is not None:
- sys.stdout.flush()
- if sys.stderr is not None:
- sys.stderr.flush()
- # flush the stdin socket, to purge stale replies
- while True:
- try:
- self.stdin_socket.recv_multipart(zmq.NOBLOCK)
- except zmq.ZMQError as e:
- if e.errno == zmq.EAGAIN:
- break
- raise
- # Send the input request.
- assert self.session is not None
- content = json_clean(dict(prompt=prompt, password=password))
- self.session.send(self.stdin_socket, "input_request", content, parent, ident=ident)
- # Await a response.
- while True:
- try:
- # Use polling with select() so KeyboardInterrupts can get
- # through; doing a blocking recv() means stdin reads are
- # uninterruptible on Windows. We need a timeout because
- # zmq.select() is also uninterruptible, but at least this
- # way reads get noticed immediately and KeyboardInterrupts
- # get noticed fairly quickly by human response time standards.
- rlist, _, xlist = zmq.select([self.stdin_socket], [], [self.stdin_socket], 0.01)
- if rlist or xlist:
- ident, reply = self.session.recv(self.stdin_socket)
- if (ident, reply) != (None, None):
- break
- except KeyboardInterrupt:
- # re-raise KeyboardInterrupt, to truncate traceback
- msg = "Interrupted by user"
- raise KeyboardInterrupt(msg) from None
- except Exception:
- self.log.warning("Invalid Message:", exc_info=True)
- try:
- value = reply["content"]["value"] # type:ignore[index]
- except Exception:
- self.log.error("Bad input_reply: %s", parent)
- value = ""
- if value == "\x04":
- # EOF
- raise EOFError
- return value
- def _signal_children(self, signum):
- """
- Send a signal to all our children
- Like `killpg`, but does not include the current process
- (or possible parents).
- """
- sig_rep = f"{Signals(signum)!r}"
- for p in self._process_children():
- self.log.debug("Sending %s to subprocess %s", sig_rep, p)
- try:
- if signum == SIGTERM:
- p.terminate()
- elif signum == SIGKILL:
- p.kill()
- else:
- p.send_signal(signum)
- except psutil.NoSuchProcess:
- pass
- def _process_children(self):
- """Retrieve child processes in the kernel's process group
- Avoids:
- - including parents and self with killpg
- - including all children that may have forked-off a new group
- """
- kernel_process = psutil.Process()
- all_children = kernel_process.children(recursive=True)
- if os.name == "nt":
- return all_children
- kernel_pgid = os.getpgrp()
- process_group_children = []
- for child in all_children:
- try:
- child_pgid = os.getpgid(child.pid)
- except OSError:
- pass
- else:
- if child_pgid == kernel_pgid:
- process_group_children.append(child)
- return process_group_children
- async def _progressively_terminate_all_children(self):
- sleeps = (0.01, 0.03, 0.1, 0.3, 1, 3, 10)
- if not self._process_children():
- self.log.debug("Kernel has no children.")
- return
- for signum in (SIGTERM, SIGKILL):
- for delay in sleeps:
- children = self._process_children()
- if not children:
- self.log.debug("No more children, continuing shutdown routine.")
- return
- # signals only children, not current process
- self._signal_children(signum)
- self.log.debug(
- "Will sleep %s sec before checking for children and retrying. %s",
- delay,
- children,
- )
- await asyncio.sleep(delay)
- async def _at_shutdown(self):
- """Actions taken at shutdown by the kernel, called by python's atexit."""
- try:
- await self._progressively_terminate_all_children()
- except Exception as e:
- self.log.exception("Exception during subprocesses termination %s", e)
- finally:
- if self._shutdown_message is not None and self.session:
- self.session.send(
- self.iopub_socket,
- self._shutdown_message,
- ident=self._topic("shutdown"),
- )
- self.log.debug("%s", self._shutdown_message)
- if self.control_stream:
- self.control_stream.flush(zmq.POLLOUT)
- @property
- def _supports_kernel_subshells(self):
- return self.shell_channel_thread is not None
|