output.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. # Copyright (c) Microsoft Corporation. All rights reserved.
  2. # Licensed under the MIT License. See LICENSE in the project root
  3. # for license information.
  4. import codecs
  5. import os
  6. import threading
  7. from debugpy import launcher
  8. from debugpy.common import log
  9. class CaptureOutput(object):
  10. """Captures output from the specified file descriptor, and tees it into another
  11. file descriptor while generating DAP "output" events for it.
  12. """
  13. instances = {}
  14. """Keys are output categories, values are CaptureOutput instances."""
  15. def __init__(self, whose, category, fd, stream):
  16. assert category not in self.instances
  17. self.instances[category] = self
  18. log.info("Capturing {0} of {1}.", category, whose)
  19. self.category = category
  20. self._whose = whose
  21. self._fd = fd
  22. self._decoder = codecs.getincrementaldecoder("utf-8")(errors="surrogateescape")
  23. if stream is None:
  24. # Can happen if running under pythonw.exe.
  25. self._stream = None
  26. else:
  27. self._stream = stream.buffer
  28. encoding = stream.encoding
  29. if encoding is None or encoding == "cp65001":
  30. encoding = "utf-8"
  31. try:
  32. self._encode = codecs.getencoder(encoding)
  33. except Exception:
  34. log.swallow_exception(
  35. "Unsupported {0} encoding {1!r}; falling back to UTF-8.",
  36. category,
  37. encoding,
  38. level="warning",
  39. )
  40. self._encode = codecs.getencoder("utf-8")
  41. else:
  42. log.info("Using encoding {0!r} for {1}", encoding, category)
  43. self._worker_thread = threading.Thread(target=self._worker, name=category)
  44. self._worker_thread.start()
  45. def __del__(self):
  46. fd = self._fd
  47. if fd is not None:
  48. try:
  49. os.close(fd)
  50. except Exception:
  51. pass
  52. def _worker(self):
  53. while self._fd is not None:
  54. try:
  55. s = os.read(self._fd, 0x1000)
  56. except Exception:
  57. break
  58. if not len(s):
  59. break
  60. self._process_chunk(s)
  61. # Flush any remaining data in the incremental decoder.
  62. self._process_chunk(b"", final=True)
  63. def _process_chunk(self, s, final=False):
  64. s = self._decoder.decode(s, final=final)
  65. if len(s) == 0:
  66. return
  67. try:
  68. launcher.channel.send_event(
  69. "output", {"category": self.category, "output": s.replace("\r\n", "\n")}
  70. )
  71. except Exception:
  72. pass # channel to adapter is already closed
  73. if self._stream is None:
  74. return
  75. try:
  76. s, _ = self._encode(s, "surrogateescape")
  77. size = len(s)
  78. i = 0
  79. while i < size:
  80. written = self._stream.write(s[i:])
  81. self._stream.flush()
  82. if written == 0:
  83. # This means that the output stream was closed from the other end.
  84. # Do the same to the debuggee, so that it knows as well.
  85. os.close(self._fd)
  86. self._fd = None
  87. break
  88. i += written
  89. except Exception:
  90. log.swallow_exception("Error printing {0!r} to {1}", s, self.category)
  91. def wait_for_remaining_output():
  92. """Waits for all remaining output to be captured and propagated."""
  93. for category, instance in CaptureOutput.instances.items():
  94. log.info("Waiting for remaining {0} of {1}.", category, instance._whose)
  95. instance._worker_thread.join()