capture.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. # encoding: utf-8
  2. """IO capturing utilities."""
  3. # Copyright (c) IPython Development Team.
  4. # Distributed under the terms of the Modified BSD License.
  5. import sys
  6. from io import StringIO
  7. from types import TracebackType
  8. from typing import Any, List, Optional, Type
  9. #-----------------------------------------------------------------------------
  10. # Classes and functions
  11. #-----------------------------------------------------------------------------
  12. class RichOutput:
  13. def __init__(self, data=None, metadata=None, transient=None, update=False):
  14. self.data = data or {}
  15. self.metadata = metadata or {}
  16. self.transient = transient or {}
  17. self.update = update
  18. def display(self):
  19. from IPython.display import publish_display_data
  20. publish_display_data(data=self.data, metadata=self.metadata,
  21. transient=self.transient, update=self.update)
  22. def _repr_mime_(self, mime):
  23. if mime not in self.data:
  24. return
  25. data = self.data[mime]
  26. if mime in self.metadata:
  27. return data, self.metadata[mime]
  28. else:
  29. return data
  30. def _repr_mimebundle_(self, include=None, exclude=None):
  31. return self.data, self.metadata
  32. def _repr_html_(self):
  33. return self._repr_mime_("text/html")
  34. def _repr_latex_(self):
  35. return self._repr_mime_("text/latex")
  36. def _repr_json_(self):
  37. return self._repr_mime_("application/json")
  38. def _repr_javascript_(self):
  39. return self._repr_mime_("application/javascript")
  40. def _repr_png_(self):
  41. return self._repr_mime_("image/png")
  42. def _repr_jpeg_(self):
  43. return self._repr_mime_("image/jpeg")
  44. def _repr_svg_(self):
  45. return self._repr_mime_("image/svg+xml")
  46. class CapturedIO:
  47. """Simple object for containing captured stdout/err and rich display StringIO objects
  48. Each instance `c` has three attributes:
  49. - ``c.stdout`` : standard output as a string
  50. - ``c.stderr`` : standard error as a string
  51. - ``c.outputs``: a list of rich display outputs
  52. Additionally, there's a ``c.show()`` method which will print all of the
  53. above in the same order, and can be invoked simply via ``c()``.
  54. """
  55. def __init__(self, stdout: StringIO, stderr: StringIO, outputs: Optional[List[Any]]=None):
  56. self._stdout = stdout
  57. self._stderr = stderr
  58. if outputs is None:
  59. outputs = []
  60. self._outputs = outputs
  61. def __str__(self):
  62. return self.stdout
  63. @property
  64. def stdout(self) -> str:
  65. "Captured standard output"
  66. if not self._stdout:
  67. return ''
  68. return self._stdout.getvalue()
  69. @property
  70. def stderr(self) -> str:
  71. "Captured standard error"
  72. if not self._stderr:
  73. return ''
  74. return self._stderr.getvalue()
  75. @property
  76. def outputs(self):
  77. """A list of the captured rich display outputs, if any.
  78. If you have a CapturedIO object ``c``, these can be displayed in IPython
  79. using::
  80. from IPython.display import display
  81. for o in c.outputs:
  82. display(o)
  83. """
  84. return [ RichOutput(**kargs) for kargs in self._outputs ]
  85. def show(self):
  86. """write my output to sys.stdout/err as appropriate"""
  87. sys.stdout.write(self.stdout)
  88. sys.stderr.write(self.stderr)
  89. sys.stdout.flush()
  90. sys.stderr.flush()
  91. for kargs in self._outputs:
  92. RichOutput(**kargs).display()
  93. __call__ = show
  94. class capture_output:
  95. """context manager for capturing stdout/err"""
  96. stdout = True
  97. stderr = True
  98. display = True
  99. def __init__(self, stdout: bool=True, stderr: bool=True, display: bool=True):
  100. self.stdout = stdout
  101. self.stderr = stderr
  102. self.display = display
  103. self.shell = None
  104. def __enter__(self) -> CapturedIO:
  105. from IPython.core.getipython import get_ipython
  106. from IPython.core.displaypub import CapturingDisplayPublisher
  107. from IPython.core.displayhook import CapturingDisplayHook
  108. self.sys_stdout = sys.stdout
  109. self.sys_stderr = sys.stderr
  110. if self.display:
  111. self.shell = get_ipython()
  112. if self.shell is None:
  113. self.save_display_pub = None
  114. self.display = False
  115. stdout = stderr = outputs = None
  116. if self.stdout:
  117. stdout = sys.stdout = StringIO()
  118. if self.stderr:
  119. stderr = sys.stderr = StringIO()
  120. if self.display:
  121. self.save_display_pub = self.shell.display_pub
  122. self.shell.display_pub = CapturingDisplayPublisher()
  123. outputs = self.shell.display_pub.outputs
  124. self.save_display_hook = sys.displayhook
  125. sys.displayhook = CapturingDisplayHook(shell=self.shell,
  126. outputs=outputs)
  127. return CapturedIO(stdout, stderr, outputs)
  128. def __exit__(self, exc_type: Optional[Type[BaseException]], exc_value: Optional[BaseException], traceback: Optional[TracebackType]):
  129. sys.stdout = self.sys_stdout
  130. sys.stderr = self.sys_stderr
  131. if self.display and self.shell:
  132. self.shell.display_pub = self.save_display_pub
  133. sys.displayhook = self.save_display_hook