output_widget.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. """An output widget mimic."""
  2. from __future__ import annotations
  3. from typing import Any
  4. from jupyter_client.client import KernelClient
  5. from nbformat import NotebookNode
  6. from nbformat.v4 import output_from_msg
  7. from .jsonutil import json_clean
  8. class OutputWidget:
  9. """This class mimics a front end output widget"""
  10. def __init__(
  11. self, comm_id: str, state: dict[str, Any], kernel_client: KernelClient, executor: Any
  12. ) -> None:
  13. """Initialize the widget."""
  14. self.comm_id: str = comm_id
  15. self.state: dict[str, Any] = state
  16. self.kernel_client: KernelClient = kernel_client
  17. self.executor = executor
  18. self.topic: bytes = ("comm-%s" % self.comm_id).encode("ascii")
  19. self.outputs: list[NotebookNode] = self.state["outputs"]
  20. self.clear_before_next_output: bool = False
  21. def clear_output(self, outs: list[NotebookNode], msg: dict[str, Any], cell_index: int) -> None:
  22. """Clear output."""
  23. self.parent_header = msg["parent_header"]
  24. content = msg["content"]
  25. if content.get("wait"):
  26. self.clear_before_next_output = True
  27. else:
  28. self.outputs = []
  29. # sync back the state to the kernel
  30. self.sync_state()
  31. if hasattr(self.executor, "widget_state"):
  32. # sync the state to the nbconvert state as well, since that is used for testing
  33. self.executor.widget_state[self.comm_id]["outputs"] = self.outputs
  34. def sync_state(self) -> None:
  35. """Sync state."""
  36. state = {"outputs": self.outputs}
  37. msg = {"method": "update", "state": state, "buffer_paths": []}
  38. self.send(msg)
  39. def _publish_msg(
  40. self,
  41. msg_type: str,
  42. data: dict[str, Any] | None = None,
  43. metadata: dict[str, Any] | None = None,
  44. buffers: list[Any] | None = None,
  45. **keys: Any,
  46. ) -> None:
  47. """Helper for sending a comm message on IOPub"""
  48. data = {} if data is None else data
  49. metadata = {} if metadata is None else metadata
  50. content = json_clean(dict(data=data, comm_id=self.comm_id, **keys))
  51. msg = self.kernel_client.session.msg(
  52. msg_type, content=content, parent=self.parent_header, metadata=metadata
  53. )
  54. self.kernel_client.shell_channel.send(msg)
  55. def send(
  56. self,
  57. data: dict[str, Any] | None = None,
  58. metadata: dict[str, Any] | None = None,
  59. buffers: list[Any] | None = None,
  60. ) -> None:
  61. """Send a comm message."""
  62. self._publish_msg("comm_msg", data=data, metadata=metadata, buffers=buffers)
  63. def output(
  64. self, outs: list[NotebookNode], msg: dict[str, Any], display_id: str | None, cell_index: int
  65. ) -> None:
  66. """Handle output."""
  67. if self.clear_before_next_output:
  68. self.outputs = []
  69. self.clear_before_next_output = False
  70. self.parent_header = msg["parent_header"]
  71. output = output_from_msg(msg) # type:ignore[no-untyped-call]
  72. if self.outputs:
  73. # try to coalesce/merge output text
  74. last_output = self.outputs[-1]
  75. if (
  76. last_output["output_type"] == "stream"
  77. and output["output_type"] == "stream"
  78. and last_output["name"] == output["name"]
  79. ):
  80. last_output["text"] += output["text"]
  81. else:
  82. self.outputs.append(output)
  83. else:
  84. self.outputs.append(output)
  85. self.sync_state()
  86. if hasattr(self.executor, "widget_state"):
  87. # sync the state to the nbconvert state as well, since that is used for testing
  88. self.executor.widget_state[self.comm_id]["outputs"] = self.outputs
  89. def set_state(self, state: dict[str, Any]) -> None:
  90. """Set the state."""
  91. if "msg_id" in state:
  92. msg_id = state.get("msg_id")
  93. if msg_id:
  94. self.executor.register_output_hook(msg_id, self)
  95. self.msg_id = msg_id
  96. else:
  97. self.executor.remove_output_hook(self.msg_id, self)
  98. self.msg_id = msg_id
  99. def handle_msg(self, msg: dict[str, Any]) -> None:
  100. """Handle a message."""
  101. content = msg["content"]
  102. comm_id = content["comm_id"]
  103. if comm_id != self.comm_id:
  104. raise AssertionError("Mismatched comm id")
  105. data = content["data"]
  106. if "state" in data:
  107. self.set_state(data["state"])