base.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. """Kernel connection helpers."""
  2. import json
  3. import struct
  4. from typing import Any
  5. from jupyter_client.session import Session
  6. from tornado.websocket import WebSocketHandler
  7. from traitlets import Float, Instance, Unicode, default
  8. from traitlets.config import LoggingConfigurable
  9. try:
  10. from jupyter_client.jsonutil import json_default
  11. except ImportError:
  12. from jupyter_client.jsonutil import date_default as json_default
  13. from jupyter_client.jsonutil import extract_dates
  14. from jupyter_server.transutils import _i18n
  15. from .abc import KernelWebsocketConnectionABC
  16. def serialize_binary_message(msg):
  17. """serialize a message as a binary blob
  18. Header:
  19. 4 bytes: number of msg parts (nbufs) as 32b int
  20. 4 * nbufs bytes: offset for each buffer as integer as 32b int
  21. Offsets are from the start of the buffer, including the header.
  22. Returns
  23. -------
  24. The message serialized to bytes.
  25. """
  26. # don't modify msg or buffer list in-place
  27. msg = msg.copy()
  28. buffers = list(msg.pop("buffers"))
  29. bmsg = json.dumps(msg, default=json_default).encode("utf8")
  30. buffers.insert(0, bmsg)
  31. nbufs = len(buffers)
  32. offsets = [4 * (nbufs + 1)]
  33. for buf in buffers[:-1]:
  34. offsets.append(offsets[-1] + len(buf))
  35. offsets_buf = struct.pack("!" + "I" * (nbufs + 1), nbufs, *offsets)
  36. buffers.insert(0, offsets_buf)
  37. return b"".join(buffers)
  38. def deserialize_binary_message(bmsg):
  39. """deserialize a message from a binary blog
  40. Header:
  41. 4 bytes: number of msg parts (nbufs) as 32b int
  42. 4 * nbufs bytes: offset for each buffer as integer as 32b int
  43. Offsets are from the start of the buffer, including the header.
  44. Returns
  45. -------
  46. message dictionary
  47. """
  48. nbufs = struct.unpack("!i", bmsg[:4])[0]
  49. offsets = list(struct.unpack("!" + "I" * nbufs, bmsg[4 : 4 * (nbufs + 1)]))
  50. offsets.append(None)
  51. bufs = []
  52. for start, stop in zip(offsets[:-1], offsets[1:]):
  53. bufs.append(bmsg[start:stop])
  54. msg = json.loads(bufs[0].decode("utf8"))
  55. msg["header"] = extract_dates(msg["header"])
  56. msg["parent_header"] = extract_dates(msg["parent_header"])
  57. msg["buffers"] = bufs[1:]
  58. return msg
  59. def serialize_msg_to_ws_v1(msg_or_list, channel, pack=None):
  60. """Serialize a message using the v1 protocol."""
  61. if pack:
  62. msg_list = [
  63. pack(msg_or_list["header"]),
  64. pack(msg_or_list["parent_header"]),
  65. pack(msg_or_list["metadata"]),
  66. pack(msg_or_list["content"]),
  67. ]
  68. else:
  69. msg_list = msg_or_list
  70. channel = channel.encode("utf-8")
  71. offsets: list[Any] = []
  72. offsets.append(8 * (1 + 1 + len(msg_list) + 1))
  73. offsets.append(len(channel) + offsets[-1])
  74. for msg in msg_list:
  75. offsets.append(len(msg) + offsets[-1])
  76. offset_number = len(offsets).to_bytes(8, byteorder="little")
  77. offsets = [offset.to_bytes(8, byteorder="little") for offset in offsets]
  78. bin_msg = b"".join([offset_number, *offsets, channel, *msg_list])
  79. return bin_msg
  80. def deserialize_msg_from_ws_v1(ws_msg):
  81. """Deserialize a message using the v1 protocol."""
  82. offset_number = int.from_bytes(ws_msg[:8], "little")
  83. offsets = [
  84. int.from_bytes(ws_msg[8 * (i + 1) : 8 * (i + 2)], "little") for i in range(offset_number)
  85. ]
  86. channel = ws_msg[offsets[0] : offsets[1]].decode("utf-8")
  87. msg_list = [ws_msg[offsets[i] : offsets[i + 1]] for i in range(1, offset_number - 1)]
  88. return channel, msg_list
  89. class BaseKernelWebsocketConnection(LoggingConfigurable):
  90. """A configurable base class for connecting Kernel WebSockets to ZMQ sockets."""
  91. kernel_ws_protocol = Unicode(
  92. None,
  93. allow_none=True,
  94. config=True,
  95. help=_i18n(
  96. "Preferred kernel message protocol over websocket to use (default: None). "
  97. "If an empty string is passed, select the legacy protocol. If None, "
  98. "the selected protocol will depend on what the front-end supports "
  99. "(usually the most recent protocol supported by the back-end and the "
  100. "front-end)."
  101. ),
  102. )
  103. @property
  104. def kernel_manager(self):
  105. """The kernel manager."""
  106. return self.parent
  107. @property
  108. def multi_kernel_manager(self):
  109. """The multi kernel manager."""
  110. return self.kernel_manager.parent
  111. @property
  112. def kernel_id(self):
  113. """The kernel id."""
  114. return self.kernel_manager.kernel_id
  115. @property
  116. def session_id(self):
  117. """The session id."""
  118. return self.session.session
  119. kernel_info_timeout = Float()
  120. @default("kernel_info_timeout")
  121. def _default_kernel_info_timeout(self):
  122. return self.multi_kernel_manager.kernel_info_timeout
  123. session = Instance(klass=Session, config=True)
  124. @default("session")
  125. def _default_session(self):
  126. return Session(config=self.config)
  127. websocket_handler = Instance(WebSocketHandler)
  128. async def connect(self):
  129. """Handle a connect."""
  130. raise NotImplementedError
  131. async def disconnect(self):
  132. """Handle a disconnect."""
  133. raise NotImplementedError
  134. def handle_incoming_message(self, incoming_msg: str) -> None:
  135. """Handle an incoming message."""
  136. raise NotImplementedError
  137. def handle_outgoing_message(self, stream: str, outgoing_msg: list[Any]) -> None:
  138. """Handle an outgoing message."""
  139. raise NotImplementedError
# Register the base class with KernelWebsocketConnectionABC so it counts as an
# implementation of that interface without inheriting from it.
# NOTE(review): presumably abc-style virtual-subclass registration — confirm
# against the definition in .abc.
KernelWebsocketConnectionABC.register(BaseKernelWebsocketConnection)