| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302 |
- #
- # Copyright (c) 2012-2017 The ANTLR Project. All rights reserved.
- # Use of this file is governed by the BSD 3-clause license that
- # can be found in the LICENSE.txt file in the project root.
- # This implementation of {@link TokenStream} loads tokens from a
- # {@link TokenSource} on-demand, and places the tokens in a buffer to provide
- # access to any previous token by index.
- #
- # <p>
- # This token stream ignores the value of {@link Token#getChannel}. If your
- # parser requires the token stream to filter tokens to only those on a
- # particular channel, such as {@link Token#DEFAULT_CHANNEL} or
- # {@link Token#HIDDEN_CHANNEL}, use a filtering token stream such as
- # {@link CommonTokenStream}.</p>
- from io import StringIO
- from antlr4.Token import Token
- from antlr4.error.Errors import IllegalStateException
- # Forward declaration: Lexer is used below only as a parameter annotation; the real class is imported inside the methods that need it.
- Lexer = None
class TokenStream:
    """Marker base class that only exists to give Parser parameters a meaningful type."""
class BufferedTokenStream(TokenStream):
    """Token stream that pulls tokens from a token source on demand and buffers them.

    Every token fetched from ``tokenSource`` is appended to ``tokens``, so any
    previously fetched token stays reachable by index. The navigation methods
    of this class (``consume``, ``LT``, ``LA``, ``seek``) do not look at token
    channels; subclasses can add filtering by overriding ``adjustSeekIndex``.
    """

    __slots__ = ('tokenSource', 'tokens', 'index', 'fetchedEOF')

    def __init__(self, tokenSource:Lexer):
        """Create an empty stream reading from ``tokenSource``; nothing is fetched yet."""
        # The token source from which tokens for this stream are fetched.
        self.tokenSource = tokenSource

        # All tokens fetched from the token source so far. The list is a
        # complete view of the input once fetchedEOF is True.
        self.tokens = []

        # Index into tokens of the current token (the next token to consume);
        # tokens[index] should be LT(1).
        #
        # Set to -1 when the stream is constructed or when setTokenSource is
        # called, meaning the first token has not been fetched yet.
        self.index = -1

        # True once the EOF token has been fetched from tokenSource and
        # appended to tokens. This makes two checks cheap:
        #
        # <ul>
        # <li>consume: the "do not consume EOF" check can usually be decided
        # from fetchedEOF and index instead of calling LA.</li>
        # <li>fetch: never appending a second EOF token is trivial.</li>
        # </ul>
        self.fetchedEOF = False

    def mark(self):
        """Return a marker for the current position; always 0 for this stream."""
        return 0

    def release(self, marker:int):
        """Release a marker returned by ``mark``; a no-op here."""
        # no resources to release
        pass

    def reset(self):
        """Move the stream position back to token index 0."""
        self.seek(0)

    def seek(self, index:int):
        """Set the current position to ``adjustSeekIndex(index)``, initializing the stream first."""
        self.lazyInit()
        self.index = self.adjustSeekIndex(index)

    def get(self, index:int):
        """Return the buffered token at ``index`` (plain list indexing on ``tokens``)."""
        self.lazyInit()
        return self.tokens[index]

    def consume(self):
        """Advance the current position by one token.

        Raises:
            IllegalStateException: if the current token is EOF.
        """
        if self.index < 0:
            # not yet initialized: the LA(1) check below must run
            skipEofCheck = False
        elif self.fetchedEOF:
            # the last token in tokens is EOF; skip the check if index points
            # at any fetched token except the last.
            skipEofCheck = self.index < len(self.tokens) - 1
        else:
            # no EOF token in tokens; skip the check if index points at a
            # fetched token.
            skipEofCheck = self.index < len(self.tokens)

        if not skipEofCheck and self.LA(1) == Token.EOF:
            raise IllegalStateException("cannot consume EOF")

        # only move when the next index actually holds a token
        if self.sync(self.index + 1):
            self.index = self.adjustSeekIndex(self.index + 1)

    def sync(self, i:int):
        """Make sure index ``i`` in ``tokens`` has a token, fetching more if needed.

        Returns:
            True if a token is located at index ``i``, otherwise False.
        """
        needed = i - len(self.tokens) + 1  # how many more elements we need
        if needed > 0:
            return self.fetch(needed) >= needed
        return True

    def fetch(self, n:int):
        """Append up to ``n`` tokens from the token source to the buffer.

        Fetching stops early once the EOF token has been appended, and does
        nothing if EOF was already fetched.

        Returns:
            The actual number of tokens added to the buffer.
        """
        if self.fetchedEOF:
            return 0
        for count in range(1, n + 1):
            t = self.tokenSource.nextToken()
            t.tokenIndex = len(self.tokens)
            self.tokens.append(t)
            if t.type == Token.EOF:
                self.fetchedEOF = True
                return count
        return n

    def getTokens(self, start:int, stop:int, types:set=None):
        """Return the buffered tokens from ``start`` to ``stop``, inclusive.

        Only tokens already in the buffer are considered; ``stop`` is clamped
        to the last buffered index, and the scan ends at the EOF token, which
        is never included.

        Args:
            start: index of the first token to return.
            stop: index of the last token to return (inclusive).
            types: if given, only tokens whose ``type`` is in this collection
                are returned.

        Returns:
            A list of tokens, or None if ``start`` or ``stop`` is negative.
        """
        if start < 0 or stop < 0:
            return None
        self.lazyInit()
        stop = min(stop, len(self.tokens) - 1)
        subset = []
        # stop + 1 so the token at ``stop`` itself is part of the result
        for t in self.tokens[start:stop + 1]:
            if t.type == Token.EOF:
                break
            if types is None or t.type in types:
                subset.append(t)
        return subset

    def LA(self, i:int):
        """Return the token type of ``LT(i)``."""
        return self.LT(i).type

    def LB(self, k:int):
        """Return the token ``k`` positions before the current one, or None if that is before index 0."""
        if (self.index - k) < 0:
            return None
        return self.tokens[self.index - k]

    def LT(self, k:int):
        """Return the token at lookahead offset ``k`` relative to the current position.

        ``k == 0`` yields None, a negative ``k`` looks backwards via ``LB``,
        and a positive ``k`` past the end of input yields the last buffered
        token.
        """
        self.lazyInit()
        if k == 0:
            return None
        if k < 0:
            return self.LB(-k)
        i = self.index + k - 1
        self.sync(i)
        if i >= len(self.tokens):
            # past the end: return the last token (EOF must be last token)
            return self.tokens[len(self.tokens) - 1]
        return self.tokens[i]

    def adjustSeekIndex(self, i:int):
        """Hook that lets subclasses adjust the target index of a position change.

        The default implementation simply returns ``i``. If an exception is
        raised here, the current stream index should not be changed. For
        example, a channel-filtering subclass can override this so that the
        seek target is always an on-channel token.

        Args:
            i: the target token index.

        Returns:
            The adjusted target token index.
        """
        return i

    def lazyInit(self):
        """Run ``setup`` if the stream has not been initialized yet (index is -1)."""
        if self.index == -1:
            self.setup()

    def setup(self):
        """Fetch the first token and position the stream at ``adjustSeekIndex(0)``."""
        self.sync(0)
        self.index = self.adjustSeekIndex(0)

    def setTokenSource(self, tokenSource:Lexer):
        """Reset this token stream by setting its token source and clearing the buffer."""
        self.tokenSource = tokenSource
        self.tokens = []
        self.index = -1
        self.fetchedEOF = False

    def nextTokenOnChannel(self, i:int, channel:int):
        """Return the index of the next token on ``channel`` at or after ``i``.

        Returns ``i`` if tokens[i] is on channel. Returns the index of the EOF
        token if there are no tokens on channel between ``i`` and EOF, and the
        last buffered index if ``i`` lies beyond the buffer after syncing.
        """
        self.sync(i)
        if i >= len(self.tokens):
            return len(self.tokens) - 1
        token = self.tokens[i]
        while token.channel != channel:
            if token.type == Token.EOF:
                return i
            i += 1
            self.sync(i)
            token = self.tokens[i]
        return i

    def previousTokenOnChannel(self, i:int, channel:int):
        """Return the index of the previous token on ``channel`` at or before ``i``.

        Returns ``i`` if tokens[i] is on channel, and -1 if there are no
        tokens on channel between ``i`` and 0. No tokens are fetched here, so
        ``i`` must not exceed the last buffered index.
        """
        while i >= 0 and self.tokens[i].channel != channel:
            i -= 1
        return i

    def getHiddenTokensToRight(self, tokenIndex:int, channel:int=-1):
        """Collect the tokens on ``channel`` to the right of ``tokenIndex``.

        Collection runs up to the next token on DEFAULT_TOKEN_CHANNEL or EOF.
        If ``channel`` is -1, any non-default-channel token is collected.

        Returns:
            A list of tokens, or None if there are none.

        Raises:
            Exception: if ``tokenIndex`` is outside the buffered range.
        """
        self.lazyInit()
        if tokenIndex < 0 or tokenIndex >= len(self.tokens):
            raise Exception(str(tokenIndex) + " not in 0.." + str(len(self.tokens)-1))
        from antlr4.Lexer import Lexer
        nextOnChannel = self.nextTokenOnChannel(tokenIndex + 1, Lexer.DEFAULT_TOKEN_CHANNEL)
        from_ = tokenIndex + 1
        # if none on channel to the right, nextOnChannel == -1, so use the last token
        to = (len(self.tokens) - 1) if nextOnChannel == -1 else nextOnChannel
        return self.filterForChannel(from_, to, channel)

    def getHiddenTokensToLeft(self, tokenIndex:int, channel:int=-1):
        """Collect the tokens on ``channel`` to the left of ``tokenIndex``.

        Collection runs back to the previous token on DEFAULT_TOKEN_CHANNEL.
        If ``channel`` is -1, any non-default-channel token is collected.

        Returns:
            A list of tokens, or None if there are none.

        Raises:
            Exception: if ``tokenIndex`` is outside the buffered range.
        """
        self.lazyInit()
        if tokenIndex < 0 or tokenIndex >= len(self.tokens):
            raise Exception(str(tokenIndex) + " not in 0.." + str(len(self.tokens)-1))
        from antlr4.Lexer import Lexer
        prevOnChannel = self.previousTokenOnChannel(tokenIndex - 1, Lexer.DEFAULT_TOKEN_CHANNEL)
        if prevOnChannel == tokenIndex - 1:
            # the immediate left neighbour is on the default channel: nothing hidden
            return None
        # if none on channel to the left, prevOnChannel == -1, so from_ == 0
        from_ = prevOnChannel + 1
        to = tokenIndex - 1
        return self.filterForChannel(from_, to, channel)

    def filterForChannel(self, left:int, right:int, channel:int):
        """Return the tokens in ``left..right`` (inclusive) that match ``channel``.

        With ``channel == -1`` every token that is not on
        DEFAULT_TOKEN_CHANNEL matches; otherwise only tokens whose channel
        equals ``channel`` match.

        Returns:
            A non-empty list of tokens, or None if nothing matched.
        """
        span = range(left, right + 1)
        if channel == -1:
            # resolve the default channel once instead of on every iteration
            from antlr4.Lexer import Lexer
            default_channel = Lexer.DEFAULT_TOKEN_CHANNEL
            hidden = [self.tokens[i] for i in span
                      if self.tokens[i].channel != default_channel]
        else:
            hidden = [self.tokens[i] for i in span
                      if self.tokens[i].channel == channel]
        return hidden or None

    def getSourceName(self):
        """Return the source name reported by the token source."""
        return self.tokenSource.getSourceName()

    def getText(self, start:int=None, stop:int=None):
        """Return the concatenated text of the tokens from ``start`` to ``stop``, inclusive.

        The whole input is buffered first. ``start`` and ``stop`` may each be
        a token index, a Token (its ``tokenIndex`` is used) or None (meaning
        the first / last buffered token). Concatenation stops at the EOF
        token.

        Returns:
            The joined token text, or "" when the range is negative or empty.
        """
        self.lazyInit()
        self.fill()
        if isinstance(start, Token):
            start = start.tokenIndex
        elif start is None:
            start = 0
        if isinstance(stop, Token):
            stop = stop.tokenIndex
        elif stop is None or stop >= len(self.tokens):
            stop = len(self.tokens) - 1
        if start < 0 or stop < 0 or stop < start:
            return ""
        parts = []
        for i in range(start, stop + 1):
            t = self.tokens[i]
            if t.type == Token.EOF:
                break
            parts.append(t.text)
        return "".join(parts)

    def fill(self):
        """Fetch all remaining tokens from the token source until EOF."""
        self.lazyInit()
        # fetch returns fewer than requested only once EOF has been reached
        while self.fetch(1000) == 1000:
            pass
|