session.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. """ A session for managing a language server process
  2. """
  3. import asyncio
  4. import atexit
  5. import os
  6. import string
  7. import subprocess
  8. from datetime import datetime, timezone
  9. from tornado.ioloop import IOLoop
  10. from tornado.queues import Queue
  11. from tornado.websocket import WebSocketHandler
  12. from traitlets import Bunch, Instance, Set, Unicode, UseEnum, observe
  13. from traitlets.config import LoggingConfigurable
  14. from . import stdio
  15. from .schema import LANGUAGE_SERVER_SPEC
  16. from .specs.utils import censored_spec
  17. from .trait_types import Schema
  18. from .types import SessionStatus
  19. class LanguageServerSession(LoggingConfigurable):
  20. """Manage a session for a connection to a language server"""
  21. language_server = Unicode(help="the language server implementation name")
  22. spec = Schema(LANGUAGE_SERVER_SPEC)
  23. # run-time specifics
  24. process = Instance(
  25. subprocess.Popen, help="the language server subprocess", allow_none=True
  26. )
  27. writer = Instance(stdio.LspStdIoWriter, help="the JSON-RPC writer", allow_none=True)
  28. reader = Instance(stdio.LspStdIoReader, help="the JSON-RPC reader", allow_none=True)
  29. from_lsp = Instance(
  30. Queue, help="a queue for string messages from the server", allow_none=True
  31. )
  32. to_lsp = Instance(
  33. Queue, help="a queue for string message to the server", allow_none=True
  34. )
  35. handlers = Set(
  36. trait=Instance(WebSocketHandler),
  37. default_value=[],
  38. help="the currently subscribed websockets",
  39. )
  40. status = UseEnum(SessionStatus, default_value=SessionStatus.NOT_STARTED)
  41. last_handler_message_at = Instance(datetime, allow_none=True)
  42. last_server_message_at = Instance(datetime, allow_none=True)
  43. _tasks = None
  44. _skip_serialize = ["argv", "debug_argv"]
  45. def __init__(self, *args, **kwargs):
  46. """set up the required traitlets and exit behavior for a session"""
  47. super().__init__(*args, **kwargs)
  48. atexit.register(self.stop)
  49. def __repr__(self): # pragma: no cover
  50. return (
  51. "<LanguageServerSession(" "language_server={language_server}, argv={argv})>"
  52. ).format(language_server=self.language_server, **self.spec)
  53. def to_json(self):
  54. return dict(
  55. handler_count=len(self.handlers),
  56. status=self.status.value,
  57. last_server_message_at=(
  58. self.last_server_message_at.isoformat()
  59. if self.last_server_message_at
  60. else None
  61. ),
  62. last_handler_message_at=(
  63. self.last_handler_message_at.isoformat()
  64. if self.last_handler_message_at
  65. else None
  66. ),
  67. spec=censored_spec(self.spec),
  68. )
  69. def initialize(self):
  70. """(re)initialize a language server session"""
  71. self.stop()
  72. self.status = SessionStatus.STARTING
  73. self.init_queues()
  74. self.init_process()
  75. self.init_writer()
  76. self.init_reader()
  77. loop = asyncio.get_event_loop()
  78. self._tasks = [
  79. loop.create_task(coro())
  80. for coro in [self._read_lsp, self._write_lsp, self._broadcast_from_lsp]
  81. ]
  82. self.status = SessionStatus.STARTED
  83. def stop(self):
  84. """clean up all of the state of the session"""
  85. self.status = SessionStatus.STOPPING
  86. if self.process:
  87. self.process.terminate()
  88. self.process = None
  89. if self.reader:
  90. self.reader.close()
  91. self.reader = None
  92. if self.writer:
  93. self.writer.close()
  94. self.writer = None
  95. if self._tasks:
  96. [task.cancel() for task in self._tasks]
  97. self.status = SessionStatus.STOPPED
  98. @observe("handlers")
  99. def _on_handlers(self, change: Bunch):
  100. """re-initialize if someone starts listening, or stop if nobody is"""
  101. if change["new"] and not self.process:
  102. self.initialize()
  103. elif not change["new"] and self.process:
  104. self.stop()
  105. def write(self, message):
  106. """wrapper around the write queue to keep it mostly internal"""
  107. self.last_handler_message_at = self.now()
  108. IOLoop.current().add_callback(self.to_lsp.put_nowait, message)
  109. def now(self):
  110. return datetime.now(timezone.utc)
  111. def init_process(self):
  112. """start the language server subprocess"""
  113. self.process = subprocess.Popen(
  114. self.spec["argv"],
  115. stdin=subprocess.PIPE,
  116. stdout=subprocess.PIPE,
  117. env=self.substitute_env(self.spec.get("env", {}), os.environ),
  118. bufsize=0,
  119. )
  120. def init_queues(self):
  121. """create the queues"""
  122. self.from_lsp = Queue()
  123. self.to_lsp = Queue()
  124. def init_reader(self):
  125. """create the stdout reader (from the language server)"""
  126. self.reader = stdio.LspStdIoReader(
  127. stream=self.process.stdout, queue=self.from_lsp, parent=self
  128. )
  129. def init_writer(self):
  130. """create the stdin writer (to the language server)"""
  131. self.writer = stdio.LspStdIoWriter(
  132. stream=self.process.stdin, queue=self.to_lsp, parent=self
  133. )
  134. def substitute_env(self, env, base):
  135. final_env = base.copy()
  136. for key, value in env.items():
  137. final_env.update({key: string.Template(value).safe_substitute(base)})
  138. return final_env
  139. async def _read_lsp(self):
  140. await self.reader.read()
  141. async def _write_lsp(self):
  142. await self.writer.write()
  143. async def _broadcast_from_lsp(self):
  144. """loop for reading messages from the queue of messages from the language
  145. server
  146. """
  147. async for message in self.from_lsp:
  148. self.last_server_message_at = self.now()
  149. await self.parent.on_server_message(message, self)
  150. self.from_lsp.task_done()