| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506 |
- # Copyright (c) Microsoft Corporation. All rights reserved.
- # Licensed under the MIT License. See LICENSE in the project root
- # for license information.
- """An implementation of the session and presentation layers as used in the Debug
- Adapter Protocol (DAP): channels and their lifetime, JSON messages, requests,
- responses, and events.
- https://microsoft.github.io/debug-adapter-protocol/overview#base-protocol
- """
- from __future__ import annotations
- import collections
- import contextlib
- import functools
- import itertools
- import os
- import socket
- import sys
- import threading
- from debugpy.common import json, log, util
- from debugpy.common.util import hide_thread_from_debugger
- class JsonIOError(IOError):
- """Indicates that a read or write operation on JsonIOStream has failed."""
- def __init__(self, *args, **kwargs):
- stream = kwargs.pop("stream")
- cause = kwargs.pop("cause", None)
- if not len(args) and cause is not None:
- args = [str(cause)]
- super().__init__(*args, **kwargs)
- self.stream = stream
- """The stream that couldn't be read or written.
- Set by JsonIOStream.read_json() and JsonIOStream.write_json().
- JsonMessageChannel relies on this value to decide whether a NoMoreMessages
- instance that bubbles up to the message loop is related to that loop.
- """
- self.cause = cause
- """The underlying exception, if any."""
- class NoMoreMessages(JsonIOError, EOFError):
- """Indicates that there are no more messages that can be read from or written
- to a stream.
- """
- def __init__(self, *args, **kwargs):
- args = args if len(args) else ["No more messages"]
- super().__init__(*args, **kwargs)
- class JsonIOStream(object):
- """Implements a JSON value stream over two byte streams (input and output).
- Each value is encoded as a DAP packet, with metadata headers and a JSON payload.
- """
- MAX_BODY_SIZE = 0xFFFFFF
- json_decoder_factory = json.JsonDecoder
- """Used by read_json() when decoder is None."""
- json_encoder_factory = json.JsonEncoder
- """Used by write_json() when encoder is None."""
- @classmethod
- def from_stdio(cls, name="stdio"):
- """Creates a new instance that receives messages from sys.stdin, and sends
- them to sys.stdout.
- """
- return cls(sys.stdin.buffer, sys.stdout.buffer, name)
- @classmethod
- def from_process(cls, process, name="stdio"):
- """Creates a new instance that receives messages from process.stdin, and sends
- them to process.stdout.
- """
- return cls(process.stdout, process.stdin, name)
- @classmethod
- def from_socket(cls, sock, name=None):
- """Creates a new instance that sends and receives messages over a socket."""
- sock.settimeout(None) # make socket blocking
- if name is None:
- name = repr(sock)
- # TODO: investigate switching to buffered sockets; readline() on unbuffered
- # sockets is very slow! Although the implementation of readline() itself is
- # native code, it calls read(1) in a loop - and that then ultimately calls
- # SocketIO.readinto(), which is implemented in Python.
- socket_io = sock.makefile("rwb", 0)
- # SocketIO.close() doesn't close the underlying socket.
- def cleanup():
- try:
- sock.shutdown(socket.SHUT_RDWR)
- except Exception: # pragma: no cover
- pass
- sock.close()
- return cls(socket_io, socket_io, name, cleanup)
- def __init__(self, reader, writer, name=None, cleanup=lambda: None):
- """Creates a new JsonIOStream.
- reader must be a BytesIO-like object, from which incoming messages will be
- read by read_json().
- writer must be a BytesIO-like object, into which outgoing messages will be
- written by write_json().
- cleanup must be a callable; it will be invoked without arguments when the
- stream is closed.
- reader.readline() must treat "\n" as the line terminator, and must leave "\r"
- as is - it must not replace "\r\n" with "\n" automatically, as TextIO does.
- """
- if name is None:
- name = f"reader={reader!r}, writer={writer!r}"
- self.name = name
- self._reader = reader
- self._writer = writer
- self._cleanup = cleanup
- self._closed = False
- def close(self):
- """Closes the stream, the reader, and the writer."""
- if self._closed:
- return
- self._closed = True
- log.debug("Closing {0} message stream", self.name)
- try:
- try:
- # Close the writer first, so that the other end of the connection has
- # its message loop waiting on read() unblocked. If there is an exception
- # while closing the writer, we still want to try to close the reader -
- # only one exception can bubble up, so if both fail, it'll be the one
- # from reader.
- try:
- self._writer.close()
- finally:
- if self._reader is not self._writer:
- self._reader.close()
- finally:
- self._cleanup()
- except Exception: # pragma: no cover
- log.reraise_exception("Error while closing {0} message stream", self.name)
- def _log_message(self, dir, data, logger=log.debug):
- return logger("{0} {1} {2}", self.name, dir, data)
- def _read_line(self, reader):
- line = b""
- while True:
- try:
- line += reader.readline()
- except Exception as exc:
- raise NoMoreMessages(str(exc), stream=self)
- if not line:
- raise NoMoreMessages(stream=self)
- if line.endswith(b"\r\n"):
- line = line[0:-2]
- return line
- def read_json(self, decoder=None):
- """Read a single JSON value from reader.
- Returns JSON value as parsed by decoder.decode(), or raises NoMoreMessages
- if there are no more values to be read.
- """
- decoder = decoder if decoder is not None else self.json_decoder_factory()
- reader = self._reader
- read_line = functools.partial(self._read_line, reader)
- # If any error occurs while reading and parsing the message, log the original
- # raw message data as is, so that it's possible to diagnose missing or invalid
- # headers, encoding issues, JSON syntax errors etc.
- def log_message_and_reraise_exception(format_string="", *args, **kwargs):
- if format_string:
- format_string += "\n\n"
- format_string += "{name} -->\n{raw_lines}"
- raw_lines = b"".join(raw_chunks).split(b"\n")
- raw_lines = "\n".join(repr(line) for line in raw_lines)
- log.reraise_exception(
- format_string, *args, name=self.name, raw_lines=raw_lines, **kwargs
- )
- raw_chunks = []
- headers = {}
- while True:
- try:
- line = read_line()
- except Exception: # pragma: no cover
- # Only log it if we have already read some headers, and are looking
- # for a blank line terminating them. If this is the very first read,
- # there's no message data to log in any case, and the caller might
- # be anticipating the error - e.g. NoMoreMessages on disconnect.
- if headers:
- log_message_and_reraise_exception(
- "Error while reading message headers:"
- )
- else:
- raise
- raw_chunks += [line, b"\n"]
- if line == b"":
- break
- key, _, value = line.partition(b":")
- headers[key] = value
- try:
- length = int(headers[b"Content-Length"])
- if not (0 <= length <= self.MAX_BODY_SIZE):
- raise ValueError
- except (KeyError, ValueError): # pragma: no cover
- try:
- raise IOError("Content-Length is missing or invalid:")
- except Exception:
- log_message_and_reraise_exception()
- body_start = len(raw_chunks)
- body_remaining = length
- while body_remaining > 0:
- try:
- chunk = reader.read(body_remaining)
- if not chunk:
- raise EOFError
- except Exception as exc:
- # Not logged due to https://github.com/microsoft/ptvsd/issues/1699
- raise NoMoreMessages(str(exc), stream=self)
- raw_chunks.append(chunk)
- body_remaining -= len(chunk)
- assert body_remaining == 0
- body = b"".join(raw_chunks[body_start:])
- try:
- body = body.decode("utf-8")
- except Exception: # pragma: no cover
- log_message_and_reraise_exception()
- try:
- body = decoder.decode(body)
- except Exception: # pragma: no cover
- log_message_and_reraise_exception()
- # If parsed successfully, log as JSON for readability.
- self._log_message("-->", body)
- return body
- def write_json(self, value, encoder=None):
- """Write a single JSON value into writer.
- Value is written as encoded by encoder.encode().
- """
- if self._closed:
- # Don't log this - it's a common pattern to write to a stream while
- # anticipating EOFError from it in case it got closed concurrently.
- raise NoMoreMessages(stream=self)
- encoder = encoder if encoder is not None else self.json_encoder_factory()
- writer = self._writer
- # Format the value as a message, and try to log any failures using as much
- # information as we already have at the point of the failure. For example,
- # if it fails after it is serialized to JSON, log that JSON.
- try:
- body = encoder.encode(value)
- except Exception: # pragma: no cover
- self._log_message("<--", repr(value), logger=log.reraise_exception)
- body = body.encode("utf-8")
- header = f"Content-Length: {len(body)}\r\n\r\n".encode("ascii")
- data = header + body
- data_written = 0
- try:
- while data_written < len(data):
- written = writer.write(data[data_written:])
- if written is not None:
- data_written += written
- writer.flush()
- except Exception as exc: # pragma: no cover
- self._log_message("<--", value, logger=log.swallow_exception)
- raise JsonIOError(stream=self, cause=exc)
- self._log_message("<--", value)
- def __repr__(self):
- return f"{type(self).__name__}({self.name!r})"
- class MessageDict(collections.OrderedDict):
- """A specialized dict that is used for JSON message payloads - Request.arguments,
- Response.body, and Event.body.
- For all members that normally throw KeyError when a requested key is missing, this
- dict raises InvalidMessageError instead. Thus, a message handler can skip checks
- for missing properties, and just work directly with the payload on the assumption
- that it is valid according to the protocol specification; if anything is missing,
- it will be reported automatically in the proper manner.
- If the value for the requested key is itself a dict, it is returned as is, and not
- automatically converted to MessageDict. Thus, to enable convenient chaining - e.g.
- d["a"]["b"]["c"] - the dict must consistently use MessageDict instances rather than
- vanilla dicts for all its values, recursively. This is guaranteed for the payload
- of all freshly received messages (unless and until it is mutated), but there is no
- such guarantee for outgoing messages.
- """
- def __init__(self, message, items=None):
- assert message is None or isinstance(message, Message)
- if items is None:
- super().__init__()
- else:
- super().__init__(items)
- self.message = message
- """The Message object that owns this dict.
- For any instance exposed via a Message object corresponding to some incoming
- message, it is guaranteed to reference that Message object. There is no similar
- guarantee for outgoing messages.
- """
- def __repr__(self):
- try:
- return format(json.repr(self))
- except Exception: # pragma: no cover
- return super().__repr__()
- def __call__(self, key, validate, optional=False):
- """Like get(), but with validation.
- The item is first retrieved as if with self.get(key, default=()) - the default
- value is () rather than None, so that JSON nulls are distinguishable from
- missing properties.
- If optional=True, and the value is (), it's returned as is. Otherwise, the
- item is validated by invoking validate(item) on it.
- If validate=False, it's treated as if it were (lambda x: x) - i.e. any value
- is considered valid, and is returned unchanged. If validate is a type or a
- tuple, it's treated as json.of_type(validate). Otherwise, if validate is not
- callable(), it's treated as json.default(validate).
- If validate() returns successfully, the item is substituted with the value
- it returns - thus, the validator can e.g. replace () with a suitable default
- value for the property.
- If validate() raises TypeError or ValueError, raises InvalidMessageError with
- the same text that applies_to(self.messages).
- See debugpy.common.json for reusable validators.
- """
- if not validate:
- validate = lambda x: x
- elif isinstance(validate, type) or isinstance(validate, tuple):
- validate = json.of_type(validate, optional=optional)
- elif not callable(validate):
- validate = json.default(validate)
- value = self.get(key, ())
- try:
- value = validate(value)
- except (TypeError, ValueError) as exc:
- message = Message if self.message is None else self.message
- err = str(exc)
- if not err.startswith("["):
- err = " " + err
- raise message.isnt_valid("{0}{1}", json.repr(key), err)
- return value
- def _invalid_if_no_key(func):
- def wrap(self, key, *args, **kwargs):
- try:
- return func(self, key, *args, **kwargs)
- except KeyError:
- message = Message if self.message is None else self.message
- raise message.isnt_valid("missing property {0!r}", key)
- return wrap
- __getitem__ = _invalid_if_no_key(collections.OrderedDict.__getitem__)
- __delitem__ = _invalid_if_no_key(collections.OrderedDict.__delitem__)
- pop = _invalid_if_no_key(collections.OrderedDict.pop)
- del _invalid_if_no_key
- def _payload(value):
- """JSON validator for message payload.
- If that value is missing or null, it is treated as if it were {}.
- """
- if value is not None and value != ():
- if isinstance(value, dict): # can be int, str, list...
- assert isinstance(value, MessageDict)
- return value
- # Missing payload. Construct a dummy MessageDict, and make it look like it was
- # deserialized. See JsonMessageChannel._parse_incoming_message for why it needs
- # to have associate_with().
- def associate_with(message):
- value.message = message
- value = MessageDict(None)
- value.associate_with = associate_with
- return value
- class Message(object):
- """Represents a fully parsed incoming or outgoing message.
- https://microsoft.github.io/debug-adapter-protocol/specification#protocolmessage
- """
- def __init__(self, channel, seq, json=None):
- self.channel = channel
- self.seq = seq
- """Sequence number of the message in its channel.
- This can be None for synthesized Responses.
- """
- self.json = json
- """For incoming messages, the MessageDict containing raw JSON from which
- this message was originally parsed.
- """
- def __str__(self):
- return json.repr(self.json) if self.json is not None else repr(self)
- def describe(self):
- """A brief description of the message that is enough to identify it.
- Examples:
- '#1 request "launch" from IDE'
- '#2 response to #1 request "launch" from IDE'.
- """
- raise NotImplementedError
- @property
- def payload(self) -> MessageDict:
- """Payload of the message - self.body or self.arguments, depending on the
- message type.
- """
- raise NotImplementedError
- def __call__(self, *args, **kwargs):
- """Same as self.payload(...)."""
- return self.payload(*args, **kwargs)
- def __contains__(self, key):
- """Same as (key in self.payload)."""
- return key in self.payload
- def is_event(self, *event):
- """Returns True if this message is an Event of one of the specified types."""
- if not isinstance(self, Event):
- return False
- return event == () or self.event in event
- def is_request(self, *command):
- """Returns True if this message is a Request of one of the specified types."""
- if not isinstance(self, Request):
- return False
- return command == () or self.command in command
- def is_response(self, *command):
- """Returns True if this message is a Response to a request of one of the
- specified types.
- """
- if not isinstance(self, Response):
- return False
- return command == () or self.request.command in command
- def error(self, exc_type, format_string, *args, **kwargs):
- """Returns a new exception of the specified type from the point at which it is
- invoked, with the specified formatted message as the reason.
- The resulting exception will have its cause set to the Message object on which
- error() was called. Additionally, if that message is a Request, a failure
- response is immediately sent.
- """
- assert issubclass(exc_type, MessageHandlingError)
- silent = kwargs.pop("silent", False)
- reason = format_string.format(*args, **kwargs)
- exc = exc_type(reason, self, silent) # will log it
- if isinstance(self, Request):
- self.respond(exc)
- return exc
- def isnt_valid(self, *args, **kwargs):
- """Same as self.error(InvalidMessageError, ...)."""
- return self.error(InvalidMessageError, *args, **kwargs)
- def cant_handle(self, *args, **kwargs):
- """Same as self.error(MessageHandlingError, ...)."""
- return self.error(MessageHandlingError, *args, **kwargs)
- class Event(Message):
- """Represents an incoming event.
- https://microsoft.github.io/debug-adapter-protocol/specification#event
- It is guaranteed that body is a MessageDict associated with this Event, and so
- are all the nested dicts in it. If "body" was missing or null in JSON, body is
- an empty dict.
- To handle the event, JsonMessageChannel tries to find a handler for this event in
- JsonMessageChannel.handlers. Given event="X", if handlers.X_event exists, then it
- is the specific handler for this event. Otherwise, handlers.event must exist, and
- it is the generic handler for this event. A missing handler is a fatal error.
- No further incoming messages are processed until the handler returns, except for
- responses to requests that have wait_for_response() invoked on them.
- To report failure to handle the event, the handler must raise an instance of
- MessageHandlingError that applies_to() the Event object it was handling. Any such
- failure is logged, after which the message loop moves on to the next message.
- Helper methods Message.isnt_valid() and Message.cant_handle() can be used to raise
- the appropriate exception type that applies_to() the Event object.
- """
- def __init__(self, channel, seq, event, body, json=None):
- super().__init__(channel, seq, json)
- self.event = event
- if isinstance(body, MessageDict) and hasattr(body, "associate_with"):
- body.associate_with(self)
- self.body = body
- def describe(self):
- return f"#{self.seq} event {json.repr(self.event)} from {self.channel}"
- @property
- def payload(self):
- return self.body
- @staticmethod
- def _parse(channel, message_dict):
- seq = message_dict("seq", int)
- event = message_dict("event", str)
- body = message_dict("body", _payload)
- message = Event(channel, seq, event, body, json=message_dict)
- channel._enqueue_handlers(message, message._handle)
- def _handle(self):
- channel = self.channel
- handler = channel._get_handler_for("event", self.event)
- try:
- try:
- result = handler(self)
- assert (
- result is None
- ), f"Handler {util.srcnameof(handler)} tried to respond to {self.describe()}."
- except MessageHandlingError as exc:
- if not exc.applies_to(self):
- raise
- log.error(
- "Handler {0}\ncouldn't handle {1}:\n{2}",
- util.srcnameof(handler),
- self.describe(),
- str(exc),
- )
- except Exception:
- log.reraise_exception(
- "Handler {0}\ncouldn't handle {1}:",
- util.srcnameof(handler),
- self.describe(),
- )
- NO_RESPONSE = object()
- """Can be returned from a request handler in lieu of the response body, to indicate
- that no response is to be sent.
- Request.respond() must be invoked explicitly at some later point to provide a response.
- """
- class Request(Message):
- """Represents an incoming or an outgoing request.
- Incoming requests are represented directly by instances of this class.
- Outgoing requests are represented by instances of OutgoingRequest, which provides
- additional functionality to handle responses.
- For incoming requests, it is guaranteed that arguments is a MessageDict associated
- with this Request, and so are all the nested dicts in it. If "arguments" was missing
- or null in JSON, arguments is an empty dict.
- To handle the request, JsonMessageChannel tries to find a handler for this request
- in JsonMessageChannel.handlers. Given command="X", if handlers.X_request exists,
- then it is the specific handler for this request. Otherwise, handlers.request must
- exist, and it is the generic handler for this request. A missing handler is a fatal
- error.
- The handler is then invoked with the Request object as its sole argument.
- If the handler itself invokes respond() on the Request at any point, then it must
- not return any value.
- Otherwise, if the handler returns NO_RESPONSE, no response to the request is sent.
- It must be sent manually at some later point via respond().
- Otherwise, a response to the request is sent with the returned value as the body.
- To fail the request, the handler can return an instance of MessageHandlingError,
- or respond() with one, or raise one such that it applies_to() the Request object
- being handled.
- Helper methods Message.isnt_valid() and Message.cant_handle() can be used to raise
- the appropriate exception type that applies_to() the Request object.
- """
- def __init__(self, channel, seq, command, arguments, json=None):
- super().__init__(channel, seq, json)
- self.command = command
- if isinstance(arguments, MessageDict) and hasattr(arguments, "associate_with"):
- arguments.associate_with(self)
- self.arguments = arguments
- self.response = None
- """Response to this request.
- For incoming requests, it is set as soon as the request handler returns.
- For outgoing requests, it is set as soon as the response is received, and
- before self._handle_response is invoked.
- """
- def describe(self):
- return f"#{self.seq} request {json.repr(self.command)} from {self.channel}"
- @property
- def payload(self):
- return self.arguments
- def respond(self, body):
- assert self.response is None
- d = {"type": "response", "request_seq": self.seq, "command": self.command}
- if isinstance(body, Exception):
- d["success"] = False
- d["message"] = str(body)
- else:
- d["success"] = True
- if body is not None and body != {}:
- d["body"] = body
- with self.channel._send_message(d) as seq:
- pass
- self.response = Response(self.channel, seq, self, body)
- @staticmethod
- def _parse(channel, message_dict):
- seq = message_dict("seq", int)
- command = message_dict("command", str)
- arguments = message_dict("arguments", _payload)
- message = Request(channel, seq, command, arguments, json=message_dict)
- channel._enqueue_handlers(message, message._handle)
- def _handle(self):
- channel = self.channel
- handler = channel._get_handler_for("request", self.command)
- try:
- try:
- result = handler(self)
- except MessageHandlingError as exc:
- if not exc.applies_to(self):
- raise
- result = exc
- log.error(
- "Handler {0}\ncouldn't handle {1}:\n{2}",
- util.srcnameof(handler),
- self.describe(),
- str(exc),
- )
- if result is NO_RESPONSE:
- assert self.response is None, (
- "Handler {0} for {1} must not return NO_RESPONSE if it has already "
- "invoked request.respond().".format(
- util.srcnameof(handler), self.describe()
- )
- )
- elif self.response is not None:
- assert result is None or result is self.response.body, (
- "Handler {0} for {1} must not return a response body if it has "
- "already invoked request.respond().".format(
- util.srcnameof(handler), self.describe()
- )
- )
- else:
- assert result is not None, (
- "Handler {0} for {1} must either call request.respond() before it "
- "returns, or return the response body, or return NO_RESPONSE.".format(
- util.srcnameof(handler), self.describe()
- )
- )
- try:
- self.respond(result)
- except NoMoreMessages:
- log.warning(
- "Channel was closed before the response from handler {0} to {1} could be sent",
- util.srcnameof(handler),
- self.describe(),
- )
- except Exception:
- log.reraise_exception(
- "Handler {0}\ncouldn't handle {1}:",
- util.srcnameof(handler),
- self.describe(),
- )
- class OutgoingRequest(Request):
- """Represents an outgoing request, for which it is possible to wait for a
- response to be received, and register a response handler.
- """
- _parse = _handle = None
- def __init__(self, channel, seq, command, arguments):
- super().__init__(channel, seq, command, arguments)
- self._response_handlers = []
- def describe(self):
- return f"{self.seq} request {json.repr(self.command)} to {self.channel}"
- def wait_for_response(self, raise_if_failed=True):
- """Waits until a response is received for this request, records the Response
- object for it in self.response, and returns response.body.
- If no response was received from the other party before the channel closed,
- self.response is a synthesized Response with body=NoMoreMessages().
- If raise_if_failed=True and response.success is False, raises response.body
- instead of returning.
- """
- with self.channel:
- while self.response is None:
- self.channel._handlers_enqueued.wait()
- if raise_if_failed and not self.response.success:
- raise self.response.body
- return self.response.body
- def on_response(self, response_handler):
- """Registers a handler to invoke when a response is received for this request.
- The handler is invoked with Response as its sole argument.
- If response has already been received, invokes the handler immediately.
- It is guaranteed that self.response is set before the handler is invoked.
- If no response was received from the other party before the channel closed,
- self.response is a dummy Response with body=NoMoreMessages().
- The handler is always invoked asynchronously on an unspecified background
- thread - thus, the caller of on_response() can never be blocked or deadlocked
- by the handler.
- No further incoming messages are processed until the handler returns, except for
- responses to requests that have wait_for_response() invoked on them.
- """
- with self.channel:
- self._response_handlers.append(response_handler)
- self._enqueue_response_handlers()
- def _enqueue_response_handlers(self):
- response = self.response
- if response is None:
- # Response._parse() will submit the handlers when response is received.
- return
- def run_handlers():
- for handler in handlers:
- try:
- try:
- handler(response)
- except MessageHandlingError as exc:
- if not exc.applies_to(response):
- raise
- log.error(
- "Handler {0}\ncouldn't handle {1}:\n{2}",
- util.srcnameof(handler),
- response.describe(),
- str(exc),
- )
- except Exception:
- log.reraise_exception(
- "Handler {0}\ncouldn't handle {1}:",
- util.srcnameof(handler),
- response.describe(),
- )
- handlers = self._response_handlers[:]
- self.channel._enqueue_handlers(response, run_handlers)
- del self._response_handlers[:]
- class Response(Message):
- """Represents an incoming or an outgoing response to a Request.
- https://microsoft.github.io/debug-adapter-protocol/specification#response
- error_message corresponds to "message" in JSON, and is renamed for clarity.
- If success is False, body is None. Otherwise, it is a MessageDict associated
- with this Response, and so are all the nested dicts in it. If "body" was missing
- or null in JSON, body is an empty dict.
- If this is a response to an outgoing request, it will be handled by the handler
- registered via self.request.on_response(), if any.
- Regardless of whether there is such a handler, OutgoingRequest.wait_for_response()
- can also be used to retrieve and handle the response. If there is a handler, it is
- executed before wait_for_response() returns.
- No further incoming messages are processed until the handler returns, except for
- responses to requests that have wait_for_response() invoked on them.
- To report failure to handle the event, the handler must raise an instance of
- MessageHandlingError that applies_to() the Response object it was handling. Any
- such failure is logged, after which the message loop moves on to the next message.
- Helper methods Message.isnt_valid() and Message.cant_handle() can be used to raise
- the appropriate exception type that applies_to() the Response object.
- """
- def __init__(self, channel, seq, request, body, json=None):
- super().__init__(channel, seq, json)
- self.request = request
- """The request to which this is the response."""
- if isinstance(body, MessageDict) and hasattr(body, "associate_with"):
- body.associate_with(self)
- self.body = body
- """Body of the response if the request was successful, or an instance
- of some class derived from Exception it it was not.
- If a response was received from the other side, but request failed, it is an
- instance of MessageHandlingError containing the received error message. If the
- error message starts with InvalidMessageError.PREFIX, then it's an instance of
- the InvalidMessageError specifically, and that prefix is stripped.
- If no response was received from the other party before the channel closed,
- it is an instance of NoMoreMessages.
- """
- def describe(self):
- return f"#{self.seq} response to {self.request.describe()}"
- @property
- def payload(self):
- return self.body
- @property
- def success(self):
- """Whether the request succeeded or not."""
- return not isinstance(self.body, Exception)
- @property
- def result(self):
- """Result of the request. Returns the value of response.body, unless it
- is an exception, in which case it is raised instead.
- """
- if self.success:
- return self.body
- else:
- raise self.body
- @staticmethod
- def _parse(channel, message_dict, body=None):
- seq = message_dict("seq", int) if (body is None) else None
- request_seq = message_dict("request_seq", int)
- command = message_dict("command", str)
- success = message_dict("success", bool)
- if body is None:
- if success:
- body = message_dict("body", _payload)
- else:
- error_message = message_dict("message", str)
- exc_type = MessageHandlingError
- if error_message.startswith(InvalidMessageError.PREFIX):
- error_message = error_message[len(InvalidMessageError.PREFIX) :]
- exc_type = InvalidMessageError
- body = exc_type(error_message, silent=True)
- try:
- with channel:
- request = channel._sent_requests.pop(request_seq)
- known_request = True
- except KeyError:
- # Synthetic Request that only has seq and command as specified in response
- # JSON, for error reporting purposes.
- request = OutgoingRequest(channel, request_seq, command, "<unknown>")
- known_request = False
- if not success:
- body.cause = request
- response = Response(channel, seq, request, body, json=message_dict)
- with channel:
- request.response = response
- request._enqueue_response_handlers()
- if known_request:
- return response
- else:
- raise response.isnt_valid(
- "request_seq={0} does not match any known request", request_seq
- )
- class Disconnect(Message):
- """A dummy message used to represent disconnect. It's always the last message
- received from any channel.
- """
- def __init__(self, channel):
- super().__init__(channel, None)
- def describe(self):
- return f"disconnect from {self.channel}"
- class MessageHandlingError(Exception):
- """Indicates that a message couldn't be handled for some reason.
- If the reason is a contract violation - i.e. the message that was handled did not
- conform to the protocol specification - InvalidMessageError, which is a subclass,
- should be used instead.
- If any message handler raises an exception not derived from this class, it will
- escape the message loop unhandled, and terminate the process.
- If any message handler raises this exception, but applies_to(message) is False, it
- is treated as if it was a generic exception, as desribed above. Thus, if a request
- handler issues another request of its own, and that one fails, the failure is not
- silently propagated. However, a request that is delegated via Request.delegate()
- will also propagate failures back automatically. For manual propagation, catch the
- exception, and call exc.propagate().
- If any event handler raises this exception, and applies_to(event) is True, the
- exception is silently swallowed by the message loop.
- If any request handler raises this exception, and applies_to(request) is True, the
- exception is silently swallowed by the message loop, and a failure response is sent
- with "message" set to str(reason).
- Note that, while errors are not logged when they're swallowed by the message loop,
- by that time they have already been logged by their __init__ (when instantiated).
- """
- def __init__(self, reason, cause=None, silent=False):
- """Creates a new instance of this class, and immediately logs the exception.
- Message handling errors are logged immediately unless silent=True, so that the
- precise context in which they occured can be determined from the surrounding
- log entries.
- """
- self.reason = reason
- """Why it couldn't be handled. This can be any object, but usually it's either
- str or Exception.
- """
- assert cause is None or isinstance(cause, Message)
- self.cause = cause
- """The Message object for the message that couldn't be handled. For responses
- to unknown requests, this is a synthetic Request.
- """
- if not silent:
- try:
- raise self
- except MessageHandlingError:
- log.swallow_exception()
- def __hash__(self):
- return hash((self.reason, id(self.cause)))
- def __eq__(self, other):
- if not isinstance(other, MessageHandlingError):
- return NotImplemented
- if type(self) is not type(other):
- return NotImplemented
- if self.reason != other.reason:
- return False
- if self.cause is not None and other.cause is not None:
- if self.cause.seq != other.cause.seq:
- return False
- return True
- def __ne__(self, other):
- return not self == other
- def __str__(self):
- return str(self.reason)
- def __repr__(self):
- s = type(self).__name__
- if self.cause is None:
- s += f"reason={self.reason!r})"
- else:
- s += f"channel={self.cause.channel.name!r}, cause={self.cause.seq!r}, reason={self.reason!r})"
- return s
- def applies_to(self, message):
- """Whether this MessageHandlingError can be treated as a reason why the
- handling of message failed.
- If self.cause is None, this is always true.
- If self.cause is not None, this is only true if cause is message.
- """
- return self.cause is None or self.cause is message
- def propagate(self, new_cause):
- """Propagates this error, raising a new instance of the same class with the
- same reason, but a different cause.
- """
- raise type(self)(self.reason, new_cause, silent=True)
- class InvalidMessageError(MessageHandlingError):
- """Indicates that an incoming message did not follow the protocol specification -
- for example, it was missing properties that are required, or the message itself
- is not allowed in the current state.
- Raised by MessageDict in lieu of KeyError for missing keys.
- """
- PREFIX = "Invalid message: "
- """Automatically prepended to the "message" property in JSON responses, when the
- handler raises InvalidMessageError.
- If a failed response has "message" property that starts with this prefix, it is
- reported as InvalidMessageError rather than MessageHandlingError.
- """
- def __str__(self):
- return InvalidMessageError.PREFIX + str(self.reason)
- class JsonMessageChannel(object):
- """Implements a JSON message channel on top of a raw JSON message stream, with
- support for DAP requests, responses, and events.
- The channel can be locked for exclusive use via the with-statement::
- with channel:
- channel.send_request(...)
- # No interleaving messages can be sent here from other threads.
- channel.send_event(...)
- """
- def __init__(self, stream, handlers=None, name=None):
- self.stream = stream
- self.handlers = handlers
- self.name = name if name is not None else stream.name
- self.started = False
- self._lock = threading.RLock()
- self._closed = False
- self._seq_iter = itertools.count(1)
- self._sent_requests = {} # {seq: Request}
- self._handler_queue = [] # [(what, handler)]
- self._handlers_enqueued = threading.Condition(self._lock)
- self._handler_thread = None
- self._parser_thread = None
- def __str__(self):
- return self.name
- def __repr__(self):
- return f"{type(self).__name__}({self.name!r})"
- def __enter__(self):
- self._lock.acquire()
- return self
- def __exit__(self, exc_type, exc_value, exc_tb):
- self._lock.release()
- def close(self):
- """Closes the underlying stream.
- This does not immediately terminate any handlers that are already executing,
- but they will be unable to respond. No new request or event handlers will
- execute after this method is called, even for messages that have already been
- received. However, response handlers will continue to executed for any request
- that is still pending, as will any handlers registered via on_response().
- """
- with self:
- if not self._closed:
- self._closed = True
- self.stream.close()
- def start(self):
- """Starts a message loop which parses incoming messages and invokes handlers
- for them on a background thread, until the channel is closed.
- Incoming messages, including responses to requests, will not be processed at
- all until this is invoked.
- """
- assert not self.started
- self.started = True
- self._parser_thread = threading.Thread(
- target=self._parse_incoming_messages, name=f"{self} message parser"
- )
- hide_thread_from_debugger(self._parser_thread)
- self._parser_thread.daemon = True
- self._parser_thread.start()
- def wait(self):
- """Waits for the message loop to terminate, and for all enqueued Response
- message handlers to finish executing.
- """
- parser_thread = self._parser_thread
- try:
- if parser_thread is not None:
- parser_thread.join()
- except AssertionError:
- log.debug("Handled error joining parser thread.")
- try:
- handler_thread = self._handler_thread
- if handler_thread is not None:
- handler_thread.join()
- except AssertionError:
- log.debug("Handled error joining handler thread.")
- # Order of keys for _prettify() - follows the order of properties in
- # https://microsoft.github.io/debug-adapter-protocol/specification
- _prettify_order = (
- "seq",
- "type",
- "request_seq",
- "success",
- "command",
- "event",
- "message",
- "arguments",
- "body",
- "error",
- )
- def _prettify(self, message_dict):
- """Reorders items in a MessageDict such that it is more readable."""
- for key in self._prettify_order:
- if key not in message_dict:
- continue
- value = message_dict[key]
- del message_dict[key]
- message_dict[key] = value
- @contextlib.contextmanager
- def _send_message(self, message):
- """Sends a new message to the other party.
- Generates a new sequence number for the message, and provides it to the
- caller before the message is sent, using the context manager protocol::
- with send_message(...) as seq:
- # The message hasn't been sent yet.
- ...
- # Now the message has been sent.
- Safe to call concurrently for the same channel from different threads.
- """
- assert "seq" not in message
- with self:
- seq = next(self._seq_iter)
- message = MessageDict(None, message)
- message["seq"] = seq
- self._prettify(message)
- with self:
- yield seq
- self.stream.write_json(message)
- def send_request(self, command, arguments=None, on_before_send=None):
- """Sends a new request, and returns the OutgoingRequest object for it.
- If arguments is None or {}, "arguments" will be omitted in JSON.
- If on_before_send is not None, invokes on_before_send() with the request
- object as the sole argument, before the request actually gets sent.
- Does not wait for response - use OutgoingRequest.wait_for_response().
- Safe to call concurrently for the same channel from different threads.
- """
- d = {"type": "request", "command": command}
- if arguments is not None and arguments != {}:
- d["arguments"] = arguments
- with self._send_message(d) as seq:
- request = OutgoingRequest(self, seq, command, arguments)
- if on_before_send is not None:
- on_before_send(request)
- self._sent_requests[seq] = request
- return request
- def send_event(self, event, body=None):
- """Sends a new event.
- If body is None or {}, "body" will be omitted in JSON.
- Safe to call concurrently for the same channel from different threads.
- """
- d = {"type": "event", "event": event}
- if body is not None and body != {}:
- d["body"] = body
- with self._send_message(d):
- pass
- def request(self, *args, **kwargs):
- """Same as send_request(...).wait_for_response()"""
- return self.send_request(*args, **kwargs).wait_for_response()
- def propagate(self, message):
- """Sends a new message with the same type and payload.
- If it was a request, returns the new OutgoingRequest object for it.
- """
- assert message.is_request() or message.is_event()
- if message.is_request():
- return self.send_request(message.command, message.arguments)
- else:
- self.send_event(message.event, message.body)
- def delegate(self, message):
- """Like propagate(message).wait_for_response(), but will also propagate
- any resulting MessageHandlingError back.
- """
- try:
- result = self.propagate(message)
- if result.is_request():
- result = result.wait_for_response()
- return result
- except MessageHandlingError as exc:
- exc.propagate(message)
- def _parse_incoming_messages(self):
- log.debug("Starting message loop for channel {0}", self)
- try:
- while True:
- self._parse_incoming_message()
- except NoMoreMessages as exc:
- log.debug("Exiting message loop for channel {0}: {1}", self, exc)
- with self:
- # Generate dummy responses for all outstanding requests.
- err_message = str(exc)
- # Response._parse() will remove items from _sent_requests, so
- # make a snapshot before iterating.
- sent_requests = list(self._sent_requests.values())
- for request in sent_requests:
- response_json = MessageDict(
- None,
- {
- "seq": -1,
- "request_seq": request.seq,
- "command": request.command,
- "success": False,
- "message": err_message,
- },
- )
- Response._parse(self, response_json, body=exc)
- assert not len(self._sent_requests)
- self._enqueue_handlers(Disconnect(self), self._handle_disconnect)
- self.close()
- _message_parsers = {
- "event": Event._parse,
- "request": Request._parse,
- "response": Response._parse,
- }
- def _parse_incoming_message(self):
- """Reads incoming messages, parses them, and puts handlers into the queue
- for _run_handlers() to invoke, until the channel is closed.
- """
- # Set up a dedicated decoder for this message, to create MessageDict instances
- # for all JSON objects, and track them so that they can be later wired up to
- # the Message they belong to, once it is instantiated.
- def object_hook(d):
- d = MessageDict(None, d)
- if "seq" in d:
- self._prettify(d)
- d.associate_with = associate_with
- message_dicts.append(d)
- return d
- # A hack to work around circular dependency between messages, and instances of
- # MessageDict in their payload. We need to set message for all of them, but it
- # cannot be done until the actual Message is created - which happens after the
- # dicts are created during deserialization.
- #
- # So, upon deserialization, every dict in the message payload gets a method
- # that can be called to set MessageDict.message for *all* dicts belonging to
- # that message. This method can then be invoked on the top-level dict by the
- # parser, after it has parsed enough of the dict to create the appropriate
- # instance of Event, Request, or Response for this message.
- def associate_with(message):
- for d in message_dicts:
- d.message = message
- del d.associate_with
- message_dicts = []
- decoder = self.stream.json_decoder_factory(object_hook=object_hook)
- message_dict = self.stream.read_json(decoder)
- assert isinstance(message_dict, MessageDict) # make sure stream used decoder
- msg_type = message_dict("type", json.enum("event", "request", "response"))
- parser = self._message_parsers[msg_type]
- try:
- parser(self, message_dict)
- except InvalidMessageError as exc:
- log.error(
- "Failed to parse message in channel {0}: {1} in:\n{2}",
- self,
- str(exc),
- json.repr(message_dict),
- )
- except Exception as exc:
- if isinstance(exc, NoMoreMessages) and exc.stream is self.stream:
- raise
- log.swallow_exception(
- "Fatal error in channel {0} while parsing:\n{1}",
- self,
- json.repr(message_dict),
- )
- os._exit(1)
- def _enqueue_handlers(self, what, *handlers):
- """Enqueues handlers for _run_handlers() to run.
- `what` is the Message being handled, and is used for logging purposes.
- If the background thread with _run_handlers() isn't running yet, starts it.
- """
- with self:
- self._handler_queue.extend((what, handler) for handler in handlers)
- self._handlers_enqueued.notify_all()
- # If there is anything to handle, but there's no handler thread yet,
- # spin it up. This will normally happen only once, on the first call
- # to _enqueue_handlers(), and that thread will run all the handlers
- # for parsed messages. However, this can also happen is somebody calls
- # Request.on_response() - possibly concurrently from multiple threads -
- # after the channel has already been closed, and the initial handler
- # thread has exited. In this case, we spin up a new thread just to run
- # the enqueued response handlers, and it will exit as soon as it's out
- # of handlers to run.
- if len(self._handler_queue) and self._handler_thread is None:
- self._handler_thread = threading.Thread(
- target=self._run_handlers,
- name=f"{self} message handler",
- )
- hide_thread_from_debugger(self._handler_thread)
- self._handler_thread.start()
- def _run_handlers(self):
- """Runs enqueued handlers until the channel is closed, or until the handler
- queue is empty once the channel is closed.
- """
- while True:
- with self:
- closed = self._closed
- if closed:
- # Wait for the parser thread to wrap up and enqueue any remaining
- # handlers, if it is still running.
- self._parser_thread.join()
- # From this point on, _enqueue_handlers() can only get called
- # from Request.on_response().
- with self:
- if not closed and not len(self._handler_queue):
- # Wait for something to process.
- self._handlers_enqueued.wait()
- # Make a snapshot before releasing the lock.
- handlers = self._handler_queue[:]
- del self._handler_queue[:]
- if closed and not len(handlers):
- # Nothing to process, channel is closed, and parser thread is
- # not running anymore - time to quit! If Request.on_response()
- # needs to call _enqueue_handlers() later, it will spin up
- # a new handler thread.
- self._handler_thread = None
- return
- for what, handler in handlers:
- # If the channel is closed, we don't want to process any more events
- # or requests - only responses and the final disconnect handler. This
- # is to guarantee that if a handler calls close() on its own channel,
- # the corresponding request or event is the last thing to be processed.
- if closed and handler in (Event._handle, Request._handle):
- continue
- with log.prefixed("/handling {0}/\n", what.describe()):
- try:
- handler()
- except Exception:
- # It's already logged by the handler, so just fail fast.
- self.close()
- os._exit(1)
- def _get_handler_for(self, type, name):
- """Returns the handler for a message of a given type."""
- with self:
- handlers = self.handlers
- for handler_name in (name + "_" + type, type):
- try:
- return getattr(handlers, handler_name)
- except AttributeError:
- continue
- raise AttributeError(
- "handler object {0} for channel {1} has no handler for {2} {3!r}".format(
- util.srcnameof(handlers),
- self,
- type,
- name,
- )
- )
- def _handle_disconnect(self):
- handler = getattr(self.handlers, "disconnect", lambda: None)
- try:
- handler()
- except Exception:
- log.reraise_exception(
- "Handler {0}\ncouldn't handle disconnect from {1}:",
- util.srcnameof(handler),
- self,
- )
- class MessageHandlers(object):
- """A simple delegating message handlers object for use with JsonMessageChannel.
- For every argument provided, the object gets an attribute with the corresponding
- name and value.
- """
- def __init__(self, **kwargs):
- for name, func in kwargs.items():
- setattr(self, name, func)
|