| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- """Comm package.
- Copyright (c) IPython Development Team.
- Distributed under the terms of the Modified BSD License.
- This package provides a way to register a Kernel Comm implementation, as per
- the Jupyter kernel protocol.
- It also provides a base Comm implementation and a default CommManager for the IPython case.
- """
- from __future__ import annotations
- from typing import Any
- from .base_comm import BaseComm, BuffersType, CommManager, MaybeDict
- __version__ = "0.2.3"
- __all__ = [
- "__version__",
- "create_comm",
- "get_comm_manager",
- ]
- _comm_manager = None
- class DummyComm(BaseComm):
- def publish_msg(
- self,
- msg_type: str,
- data: MaybeDict = None,
- metadata: MaybeDict = None,
- buffers: BuffersType = None,
- **keys: Any,
- ) -> None:
- pass
- def _create_comm(*args: Any, **kwargs: Any) -> BaseComm:
- """Create a Comm.
- This method is intended to be replaced, so that it returns your Comm instance.
- """
- return DummyComm(*args, **kwargs)
- def _get_comm_manager() -> CommManager:
- """Get the current Comm manager, creates one if there is none.
- This method is intended to be replaced if needed (if you want to manage multiple CommManagers).
- """
- global _comm_manager # noqa: PLW0603
- if _comm_manager is None:
- _comm_manager = CommManager()
- return _comm_manager
- create_comm = _create_comm
- get_comm_manager = _get_comm_manager
|