| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- # Copyright (c) Microsoft Corporation. All rights reserved.
- # Licensed under the MIT License. See LICENSE in the project root
- # for license information.
- import codecs
- import os
- import threading
- from debugpy import launcher
- from debugpy.common import log
- class CaptureOutput(object):
- """Captures output from the specified file descriptor, and tees it into another
- file descriptor while generating DAP "output" events for it.
- """
- instances = {}
- """Keys are output categories, values are CaptureOutput instances."""
- def __init__(self, whose, category, fd, stream):
- assert category not in self.instances
- self.instances[category] = self
- log.info("Capturing {0} of {1}.", category, whose)
- self.category = category
- self._whose = whose
- self._fd = fd
- self._decoder = codecs.getincrementaldecoder("utf-8")(errors="surrogateescape")
- if stream is None:
- # Can happen if running under pythonw.exe.
- self._stream = None
- else:
- self._stream = stream.buffer
- encoding = stream.encoding
- if encoding is None or encoding == "cp65001":
- encoding = "utf-8"
- try:
- self._encode = codecs.getencoder(encoding)
- except Exception:
- log.swallow_exception(
- "Unsupported {0} encoding {1!r}; falling back to UTF-8.",
- category,
- encoding,
- level="warning",
- )
- self._encode = codecs.getencoder("utf-8")
- else:
- log.info("Using encoding {0!r} for {1}", encoding, category)
- self._worker_thread = threading.Thread(target=self._worker, name=category)
- self._worker_thread.start()
- def __del__(self):
- fd = self._fd
- if fd is not None:
- try:
- os.close(fd)
- except Exception:
- pass
- def _worker(self):
- while self._fd is not None:
- try:
- s = os.read(self._fd, 0x1000)
- except Exception:
- break
- if not len(s):
- break
- self._process_chunk(s)
- # Flush any remaining data in the incremental decoder.
- self._process_chunk(b"", final=True)
- def _process_chunk(self, s, final=False):
- s = self._decoder.decode(s, final=final)
- if len(s) == 0:
- return
- try:
- launcher.channel.send_event(
- "output", {"category": self.category, "output": s.replace("\r\n", "\n")}
- )
- except Exception:
- pass # channel to adapter is already closed
- if self._stream is None:
- return
- try:
- s, _ = self._encode(s, "surrogateescape")
- size = len(s)
- i = 0
- while i < size:
- written = self._stream.write(s[i:])
- self._stream.flush()
- if written == 0:
- # This means that the output stream was closed from the other end.
- # Do the same to the debuggee, so that it knows as well.
- os.close(self._fd)
- self._fd = None
- break
- i += written
- except Exception:
- log.swallow_exception("Error printing {0!r} to {1}", s, self.category)
- def wait_for_remaining_output():
- """Waits for all remaining output to be captured and propagated."""
- for category, instance in CaptureOutput.instances.items():
- log.info("Waiting for remaining {0} of {1}.", category, instance._whose)
- instance._worker_thread.join()
|