rwbase.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. """Base classes and utilities for readers and writers."""
  2. # Copyright (c) IPython Development Team.
  3. # Distributed under the terms of the Modified BSD License.
  4. from __future__ import annotations
  5. def _is_json_mime(mime):
  6. """Is a key a JSON mime-type that should be left alone?"""
  7. return mime == "application/json" or (
  8. mime.startswith("application/") and mime.endswith("+json")
  9. )
  10. def _rejoin_mimebundle(data):
  11. """Rejoin the multi-line string fields in a mimebundle (in-place)"""
  12. for key, value in list(data.items()):
  13. if (
  14. not _is_json_mime(key)
  15. and isinstance(value, list)
  16. and all(isinstance(line, str) for line in value)
  17. ):
  18. data[key] = "".join(value)
  19. return data
  20. def rejoin_lines(nb):
  21. """rejoin multiline text into strings
  22. For reversing effects of ``split_lines(nb)``.
  23. This only rejoins lines that have been split, so if text objects were not split
  24. they will pass through unchanged.
  25. Used when reading JSON files that may have been passed through split_lines.
  26. """
  27. for cell in nb.cells:
  28. if "source" in cell and isinstance(cell.source, list):
  29. cell.source = "".join(cell.source)
  30. attachments = cell.get("attachments", {})
  31. for _, attachment in attachments.items():
  32. _rejoin_mimebundle(attachment)
  33. if cell.get("cell_type", None) == "code":
  34. for output in cell.get("outputs", []):
  35. output_type = output.get("output_type", "")
  36. if output_type in {"execute_result", "display_data"}:
  37. _rejoin_mimebundle(output.get("data", {}))
  38. elif output_type and isinstance(output.get("text", ""), list):
  39. output.text = "".join(output.text)
  40. return nb
  41. _non_text_split_mimes = {
  42. "application/javascript",
  43. "image/svg+xml",
  44. }
  45. def _split_mimebundle(data):
  46. """Split multi-line string fields in a mimebundle (in-place)"""
  47. for key, value in list(data.items()):
  48. if isinstance(value, str) and (key.startswith("text/") or key in _non_text_split_mimes):
  49. data[key] = value.splitlines(True)
  50. return data
  51. def split_lines(nb):
  52. """split likely multiline text into lists of strings
  53. For file output more friendly to line-based VCS. ``rejoin_lines(nb)`` will
  54. reverse the effects of ``split_lines(nb)``.
  55. Used when writing JSON files.
  56. """
  57. for cell in nb.cells:
  58. source = cell.get("source", None)
  59. if isinstance(source, str):
  60. cell["source"] = source.splitlines(True)
  61. attachments = cell.get("attachments", {})
  62. for _, attachment in attachments.items():
  63. _split_mimebundle(attachment)
  64. if cell.cell_type == "code":
  65. for output in cell.outputs:
  66. if output.output_type in {"execute_result", "display_data"}:
  67. _split_mimebundle(output.get("data", {}))
  68. elif output.output_type == "stream" and isinstance(output.text, str):
  69. output.text = output.text.splitlines(True)
  70. return nb
  71. def strip_transient(nb):
  72. """Strip transient values that shouldn't be stored in files.
  73. This should be called in *both* read and write.
  74. """
  75. nb.metadata.pop("orig_nbformat", None)
  76. nb.metadata.pop("orig_nbformat_minor", None)
  77. nb.metadata.pop("signature", None)
  78. for cell in nb.cells:
  79. cell.metadata.pop("trusted", None)
  80. return nb
  81. class NotebookReader:
  82. """A class for reading notebooks."""
  83. def reads(self, s, **kwargs):
  84. """Read a notebook from a string."""
  85. msg = "reads must be implemented in a subclass"
  86. raise NotImplementedError(msg)
  87. def read(self, fp, **kwargs):
  88. """Read a notebook from a file like object"""
  89. nbs = fp.read()
  90. return self.reads(nbs, **kwargs)
  91. class NotebookWriter:
  92. """A class for writing notebooks."""
  93. def writes(self, nb, **kwargs):
  94. """Write a notebook to a string."""
  95. msg = "writes must be implemented in a subclass"
  96. raise NotImplementedError(msg)
  97. def write(self, nb, fp, **kwargs):
  98. """Write a notebook to a file like object"""
  99. nbs = self.writes(nb, **kwargs)
  100. return fp.write(nbs)