| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- """An output widget mimic."""
- from __future__ import annotations
- from typing import Any
- from jupyter_client.client import KernelClient
- from nbformat import NotebookNode
- from nbformat.v4 import output_from_msg
- from .jsonutil import json_clean
- class OutputWidget:
- """This class mimics a front end output widget"""
- def __init__(
- self, comm_id: str, state: dict[str, Any], kernel_client: KernelClient, executor: Any
- ) -> None:
- """Initialize the widget."""
- self.comm_id: str = comm_id
- self.state: dict[str, Any] = state
- self.kernel_client: KernelClient = kernel_client
- self.executor = executor
- self.topic: bytes = ("comm-%s" % self.comm_id).encode("ascii")
- self.outputs: list[NotebookNode] = self.state["outputs"]
- self.clear_before_next_output: bool = False
- def clear_output(self, outs: list[NotebookNode], msg: dict[str, Any], cell_index: int) -> None:
- """Clear output."""
- self.parent_header = msg["parent_header"]
- content = msg["content"]
- if content.get("wait"):
- self.clear_before_next_output = True
- else:
- self.outputs = []
- # sync back the state to the kernel
- self.sync_state()
- if hasattr(self.executor, "widget_state"):
- # sync the state to the nbconvert state as well, since that is used for testing
- self.executor.widget_state[self.comm_id]["outputs"] = self.outputs
- def sync_state(self) -> None:
- """Sync state."""
- state = {"outputs": self.outputs}
- msg = {"method": "update", "state": state, "buffer_paths": []}
- self.send(msg)
- def _publish_msg(
- self,
- msg_type: str,
- data: dict[str, Any] | None = None,
- metadata: dict[str, Any] | None = None,
- buffers: list[Any] | None = None,
- **keys: Any,
- ) -> None:
- """Helper for sending a comm message on IOPub"""
- data = {} if data is None else data
- metadata = {} if metadata is None else metadata
- content = json_clean(dict(data=data, comm_id=self.comm_id, **keys))
- msg = self.kernel_client.session.msg(
- msg_type, content=content, parent=self.parent_header, metadata=metadata
- )
- self.kernel_client.shell_channel.send(msg)
- def send(
- self,
- data: dict[str, Any] | None = None,
- metadata: dict[str, Any] | None = None,
- buffers: list[Any] | None = None,
- ) -> None:
- """Send a comm message."""
- self._publish_msg("comm_msg", data=data, metadata=metadata, buffers=buffers)
- def output(
- self, outs: list[NotebookNode], msg: dict[str, Any], display_id: str | None, cell_index: int
- ) -> None:
- """Handle output."""
- if self.clear_before_next_output:
- self.outputs = []
- self.clear_before_next_output = False
- self.parent_header = msg["parent_header"]
- output = output_from_msg(msg) # type:ignore[no-untyped-call]
- if self.outputs:
- # try to coalesce/merge output text
- last_output = self.outputs[-1]
- if (
- last_output["output_type"] == "stream"
- and output["output_type"] == "stream"
- and last_output["name"] == output["name"]
- ):
- last_output["text"] += output["text"]
- else:
- self.outputs.append(output)
- else:
- self.outputs.append(output)
- self.sync_state()
- if hasattr(self.executor, "widget_state"):
- # sync the state to the nbconvert state as well, since that is used for testing
- self.executor.widget_state[self.comm_id]["outputs"] = self.outputs
- def set_state(self, state: dict[str, Any]) -> None:
- """Set the state."""
- if "msg_id" in state:
- msg_id = state.get("msg_id")
- if msg_id:
- self.executor.register_output_hook(msg_id, self)
- self.msg_id = msg_id
- else:
- self.executor.remove_output_hook(self.msg_id, self)
- self.msg_id = msg_id
- def handle_msg(self, msg: dict[str, Any]) -> None:
- """Handle a message."""
- content = msg["content"]
- comm_id = content["comm_id"]
- if comm_id != self.comm_id:
- raise AssertionError("Mismatched comm id")
- data = content["data"]
- if "state" in data:
- self.set_state(data["state"])
|