abc.py 921 B

1234567891011121314151617181920212223242526272829
  1. from abc import ABC, abstractmethod
  2. from typing import Any
  class KernelWebsocketConnectionABC(ABC):
      """Minimal interface for a kernel websocket connection.

      This class defines a minimal interface that should be used to
      bridge the connection between Jupyter Server's websocket API and
      a kernel's ZMQ socket interface.

      All four methods are abstract: a subclass must override every one
      of them before it can be instantiated.
      """

      # Handler object on the websocket side of the bridge. It is typed as
      # ``Any`` here, so this interface places no constraint on its type.
      # NOTE(review): presumably the server's websocket request handler --
      # confirm against the concrete implementations.
      websocket_handler: Any

      @abstractmethod
      async def connect(self):
          """Connect the kernel websocket to the kernel ZMQ connections.

          Declared as a coroutine function; the result of calling it must
          be awaited.
          """

      @abstractmethod
      async def disconnect(self):
          """Disconnect the kernel websocket from the kernel ZMQ connections.

          Declared as a coroutine function; the result of calling it must
          be awaited.
          """

      @abstractmethod
      def handle_incoming_message(self, incoming_msg: str) -> None:
          """Broker the incoming websocket message to the appropriate ZMQ channel.

          Args:
              incoming_msg: The message received from the websocket, as a
                  string. NOTE(review): the encoding of the message is not
                  specified by this interface -- confirm with implementers.
          """

      @abstractmethod
      def handle_outgoing_message(self, stream: str, outgoing_msg: list[Any]) -> None:
          """Broker outgoing ZMQ messages to the kernel websocket.

          Args:
              stream: String identifying where the message came from.
                  NOTE(review): presumably the ZMQ channel name -- confirm.
              outgoing_msg: The outgoing message as a list.
                  NOTE(review): presumably the multipart ZMQ frames --
                  confirm against the concrete implementations.
          """