stdio.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. """ Language Server stdio-mode readers
  2. Parts of this code are derived from:
  3. > https://github.com/palantir/python-jsonrpc-server/blob/0.2.0/pyls_jsonrpc/streams.py#L83 # noqa
  4. > https://github.com/palantir/python-jsonrpc-server/blob/45ed1931e4b2e5100cc61b3992c16d6f68af2e80/pyls_jsonrpc/streams.py # noqa
  5. > > MIT License https://github.com/palantir/python-jsonrpc-server/blob/0.2.0/LICENSE
  6. > > Copyright 2018 Palantir Technologies, Inc.
  7. """
  8. # pylint: disable=broad-except
  9. import asyncio
  10. import io
  11. import os
  12. from concurrent.futures import ThreadPoolExecutor
  13. from typing import List, Optional, Text
  14. from tornado.concurrent import run_on_executor
  15. from tornado.gen import convert_yielded
  16. from tornado.httputil import HTTPHeaders
  17. from tornado.ioloop import IOLoop
  18. from tornado.queues import Queue
  19. from traitlets import Float, Instance, default
  20. from traitlets.config import LoggingConfigurable
  21. from .non_blocking import make_non_blocking
  22. class LspStdIoBase(LoggingConfigurable):
  23. """Non-blocking, queued base for communicating with stdio Language Servers"""
  24. executor = None
  25. stream = Instance( # type:ignore[assignment]
  26. io.RawIOBase, help="the stream to read/write"
  27. ) # type: io.RawIOBase
  28. queue = Instance(Queue, help="queue to get/put")
  29. def __repr__(self): # pragma: no cover
  30. return "<{}(parent={})>".format(self.__class__.__name__, self.parent)
  31. def __init__(self, **kwargs):
  32. super().__init__(**kwargs)
  33. self.log.debug("%s initialized", self)
  34. self.executor = ThreadPoolExecutor(max_workers=1)
  35. def close(self):
  36. self.stream.close()
  37. self.log.debug("%s closed", self)
  38. class LspStdIoReader(LspStdIoBase):
  39. """Language Server stdio Reader
  40. Because non-blocking (but still synchronous) IO is used, rudimentary
  41. exponential backoff is used.
  42. """
  43. max_wait = Float(help="maximum time to wait on idle stream").tag(config=True)
  44. min_wait = Float(0.05, help="minimum time to wait on idle stream").tag(config=True)
  45. next_wait = Float(0.05, help="next time to wait on idle stream").tag(config=True)
  46. @default("max_wait")
  47. def _default_max_wait(self):
  48. return 0.1 if os.name == "nt" else self.min_wait * 2
  49. async def sleep(self):
  50. """Simple exponential backoff for sleeping"""
  51. if self.stream.closed: # pragma: no cover
  52. return
  53. self.next_wait = min(self.next_wait * 2, self.max_wait)
  54. try:
  55. await asyncio.sleep(self.next_wait)
  56. except Exception: # pragma: no cover
  57. pass
  58. def wake(self):
  59. """Reset the wait time"""
  60. self.wait = self.min_wait
  61. async def read(self) -> None:
  62. """Read from a Language Server until it is closed"""
  63. make_non_blocking(self.stream)
  64. while not self.stream.closed:
  65. message = None
  66. try:
  67. message = await self.read_one()
  68. if not message:
  69. await self.sleep()
  70. continue
  71. else:
  72. self.wake()
  73. IOLoop.current().add_callback(self.queue.put_nowait, message)
  74. except Exception as e: # pragma: no cover
  75. self.log.exception(
  76. "%s couldn't enqueue message: %s (%s)", self, message, e
  77. )
  78. await self.sleep()
  79. async def _read_content(
  80. self, length: int, max_parts=1000, max_empties=200
  81. ) -> Optional[bytes]:
  82. """Read the full length of the message unless exceeding max_parts or
  83. max_empties empty reads occur.
  84. See https://github.com/jupyter-lsp/jupyterlab-lsp/issues/450
  85. Crucial docs or read():
  86. "If the argument is positive, and the underlying raw
  87. stream is not interactive, multiple raw reads may be issued
  88. to satisfy the byte count (unless EOF is reached first)"
  89. Args:
  90. - length: the content length
  91. - max_parts: prevent absurdly long messages (1000 parts is several MBs):
  92. 1 part is usually sufficient but not enough for some long
  93. messages 2 or 3 parts are often needed.
  94. """
  95. raw = None
  96. raw_parts: List[bytes] = []
  97. received_size = 0
  98. while received_size < length and len(raw_parts) < max_parts and max_empties > 0:
  99. part = None
  100. try:
  101. part = self.stream.read(length - received_size)
  102. except OSError: # pragma: no cover
  103. pass
  104. if part is None:
  105. max_empties -= 1
  106. await self.sleep()
  107. continue
  108. received_size += len(part)
  109. raw_parts.append(part)
  110. if raw_parts:
  111. raw = b"".join(raw_parts)
  112. if len(raw) != length: # pragma: no cover
  113. self.log.warning(
  114. f"Readout and content-length mismatch: {len(raw)} vs {length};"
  115. f"remaining empties: {max_empties}; remaining parts: {max_parts}"
  116. )
  117. return raw
  118. async def read_one(self) -> Text:
  119. """Read a single message"""
  120. message = ""
  121. headers = HTTPHeaders()
  122. line = await convert_yielded(self._readline())
  123. if line:
  124. while line and line.strip():
  125. headers.parse_line(line)
  126. line = await convert_yielded(self._readline())
  127. content_length = int(headers.get("content-length", "0"))
  128. if content_length:
  129. raw = await self._read_content(length=content_length)
  130. if raw is not None:
  131. message = raw.decode("utf-8").strip()
  132. else: # pragma: no cover
  133. self.log.warning(
  134. "%s failed to read message of length %s",
  135. self,
  136. content_length,
  137. )
  138. return message
  139. @run_on_executor
  140. def _readline(self) -> Text:
  141. """Read a line (or immediately return None)"""
  142. try:
  143. return self.stream.readline().decode("utf-8").strip()
  144. except OSError: # pragma: no cover
  145. return ""
  146. class LspStdIoWriter(LspStdIoBase):
  147. """Language Server stdio Writer"""
  148. async def write(self) -> None:
  149. """Write to a Language Server until it closes"""
  150. while not self.stream.closed:
  151. message = await self.queue.get()
  152. try:
  153. body = message.encode("utf-8")
  154. response = "Content-Length: {}\r\n\r\n{}".format(len(body), message)
  155. await convert_yielded(self._write_one(response.encode("utf-8")))
  156. except Exception: # pragma: no cover
  157. self.log.exception("%s couldn't write message: %s", self, response)
  158. finally:
  159. self.queue.task_done()
  160. @run_on_executor
  161. def _write_one(self, message) -> None:
  162. self.stream.write(message)
  163. self.stream.flush()