| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570 |
- #
- # Copyright (c) 2012-2017 The ANTLR Project. All rights reserved.
- # Use of this file is governed by the BSD 3-clause license that
- # can be found in the LICENSE.txt file in the project root.
- #/
- # When we hit an accept state in either the DFA or the ATN, we
- # have to notify the character stream to start buffering characters
- # via {@link IntStream#mark} and record the current state. The current sim state
- # includes the current index into the input, the current line,
- # and current character position in that line. Note that the Lexer is
- # tracking the starting line and character position of the token. These
- # variables track the "state" of the simulator when it hits an accept state.
- #
- # <p>We track these variables separately for the DFA and ATN simulation
- # because the DFA simulation often has to fail over to the ATN
- # simulation. If the ATN simulation fails, we need the DFA to fall
- # back to its previously accepted state, if any. If the ATN succeeds,
- # then the ATN does the accept and the DFA simulator that invoked it
- # can simply return the predicted token type.</p>
- #/
- from antlr4.PredictionContext import PredictionContextCache, SingletonPredictionContext, PredictionContext
- from antlr4.InputStream import InputStream
- from antlr4.Token import Token
- from antlr4.atn.ATN import ATN
- from antlr4.atn.ATNConfig import LexerATNConfig
- from antlr4.atn.ATNSimulator import ATNSimulator
- from antlr4.atn.ATNConfigSet import ATNConfigSet, OrderedATNConfigSet
- from antlr4.atn.ATNState import RuleStopState, ATNState
- from antlr4.atn.LexerActionExecutor import LexerActionExecutor
- from antlr4.atn.Transition import Transition
- from antlr4.dfa.DFAState import DFAState
- from antlr4.error.Errors import LexerNoViableAltException, UnsupportedOperationException
class SimState(object):
    """Snapshot of the simulator position recorded at an accept state.

    Holds the input index, the line, the column and the accepting DFA state.
    """

    __slots__ = ('index', 'line', 'column', 'dfaState')

    def __init__(self):
        # A fresh snapshot starts out in the "nothing captured" condition.
        self.reset()

    def reset(self):
        """Put every field back to its 'no accept state captured' value."""
        self.dfaState = None
        self.column = -1
        self.line = 0
        self.index = -1
# Forward declarations: both names appear in annotations further down
# before their real definitions exist, so bind placeholders first.
Lexer = LexerATNSimulator = None
class LexerATNSimulator(ATNSimulator):
    # Per-instance attributes; the values are assigned in __init__.
    __slots__ = (
        'decisionToDFA',
        'recog',
        'startIndex',
        'line',
        'column',
        'mode',
        'DEFAULT_MODE',
        'MAX_CHAR_VALUE',
        'prevAccept',
    )

    # Class-wide switches that turn on trace printing.
    debug = False
    dfa_debug = False

    # Only symbols inside [MIN_DFA_EDGE, MAX_DFA_EDGE] get cached DFA edges.
    MIN_DFA_EDGE = 0
    MAX_DFA_EDGE = 127  # forces unicode to stay in ATN

    # Placeholder; the real sentinel is assigned once the module is loaded.
    ERROR = None
def __init__(self, recog:Lexer, atn:ATN, decisionToDFA:list, sharedContextCache:PredictionContextCache):
    """Set up a simulator bound to ``recog`` that works on ``atn``.

    ``decisionToDFA`` is indexed by lexer mode; ``sharedContextCache`` is
    handed to the base-class initializer together with ``atn``.
    """
    super().__init__(atn, sharedContextCache)
    self.recog = recog
    self.decisionToDFA = decisionToDFA

    # The current token's starting index into the character stream.
    # Shared across DFA to ATN simulation in case the ATN fails and the
    # DFA did not have a previous accept state. In this case, we use the
    # ATN-generated exception object.
    self.startIndex = -1

    # Line number 1..n within the input.
    self.line = 1

    # Index of the character relative to the beginning of the line, 0..n-1.
    self.column = 0

    # Imported here rather than at module level; the constants are cached
    # on the instance to avoid further imports.
    from antlr4.Lexer import Lexer
    default_mode = Lexer.DEFAULT_MODE
    self.mode = default_mode
    self.DEFAULT_MODE = default_mode
    self.MAX_CHAR_VALUE = Lexer.MAX_CHAR_VALUE

    # Used during DFA/ATN exec to record the most recent accept configuration info.
    self.prevAccept = SimState()
def copyState(self, simulator:LexerATNSimulator):
    """Copy the position and mode fields of ``simulator`` onto this instance.

    Copies ``column``, ``line``, ``mode`` and ``startIndex``, in that order.
    """
    for field in ('column', 'line', 'mode', 'startIndex'):
        setattr(self, field, getattr(simulator, field))
def match(self, input:InputStream, mode:int):
    """Match one token from ``input`` in lexer mode ``mode``.

    Records the start index, clears the previous accept snapshot, then
    continues from the mode's cached DFA start state if there is one and
    otherwise goes through ``matchATN``. The stream marker taken on entry
    is always released.

    Returns whatever ``execATN`` / ``matchATN`` returns.
    """
    self.mode = mode
    marker = input.mark()
    try:
        self.startIndex = input.index
        self.prevAccept.reset()
        start = self.decisionToDFA[mode].s0
        if start is not None:
            return self.execATN(input, start)
        return self.matchATN(input)
    finally:
        input.release(marker)
def reset(self):
    """Return the simulator to its initial position and default mode."""
    self.prevAccept.reset()
    self.startIndex = -1
    self.line, self.column = 1, 0
    self.mode = self.DEFAULT_MODE
def matchATN(self, input:InputStream):
    """Build the start DFA state for the current mode from the ATN and run from it.

    The start closure is turned into a DFA state. That state is stored as
    the mode's ``s0`` unless computing the closure set
    ``hasSemanticContext``. Returns the result of ``execATN``.
    """
    mode_at_entry = self.mode
    atn_start = self.atn.modeToStartState[mode_at_entry]

    if LexerATNSimulator.debug:
        print("matchATN mode " + str(self.mode) + " start: " + str(atn_start))

    start_configs = self.computeStartState(input, atn_start)
    # Remember the flag, then clear it before the set is used for a DFA state.
    predicate_dependent = start_configs.hasSemanticContext
    start_configs.hasSemanticContext = False

    start_dfa_state = self.addDFAState(start_configs)
    if not predicate_dependent:
        self.decisionToDFA[self.mode].s0 = start_dfa_state

    prediction = self.execATN(input, start_dfa_state)

    if LexerATNSimulator.debug:
        print("DFA after matchATN: " + str(self.decisionToDFA[mode_at_entry].toLexerString()))

    return prediction
def execATN(self, input:InputStream, ds0:DFAState):
    """Simulate from DFA state ``ds0`` until no further symbol can be matched.

    Every accept state reached along the way is captured into
    ``self.prevAccept``. The final decision (accept, EOF or error) is
    delegated to ``failOrAccept``, whose result is returned.
    """
    if LexerATNSimulator.debug:
        print("start state closure=" + str(ds0.configs))

    # An accepting start state allows zero-length tokens.
    if ds0.isAcceptState:
        self.captureSimState(self.prevAccept, input, ds0)

    symbol = input.LA(1)
    current = ds0  # the current/"from" DFA state

    while True:
        if LexerATNSimulator.debug:
            print("execATN loop starting closure:", str(current.configs))

        # Walking src->trg, src->trg, the previous target is kept so the
        # DFA state need not be looked up again, which is expensive. When
        # the current state already has an edge for this symbol its
        # configuration set is reused instead of recomputing reach/closure.
        # This is DFA simulation carried out inside the ATN simulation;
        # it pays off for loops in the DFA (e.g. .* in comments), where a
        # character leads back to a state that already has many edges.
        successor = self.getExistingTargetState(current, symbol)
        if successor is None:
            successor = self.computeTargetState(input, current, symbol)

        if successor == self.ERROR:
            break

        at_eof = (symbol == Token.EOF)

        # Consume a real input element *before* capturing the accept state
        # so index, line and column reflect the end of the token.
        if not at_eof:
            self.consume(input)

        if successor.isAcceptState:
            self.captureSimState(self.prevAccept, input, successor)
            if at_eof:
                break

        symbol = input.LA(1)
        current = successor  # flip; the target becomes the new source state

    return self.failOrAccept(self.prevAccept, input, current.configs, symbol)
- # Get an existing target state for an edge in the DFA. If the target state
- # for the edge has not yet been computed or is otherwise not available,
- # this method returns {@code null}.
- #
- # @param s The current DFA state
- # @param t The next input symbol
- # @return The existing target DFA state for the given input symbol
- # {@code t}, or {@code null} if the target state for this edge is not
- # already cached
def getExistingTargetState(self, s:DFAState, t:int):
    """Look up the cached DFA edge leaving ``s`` on symbol ``t``.

    Returns ``None`` when ``s`` has no edge table, when ``t`` lies outside
    ``[MIN_DFA_EDGE, MAX_DFA_EDGE]``, or when the slot for ``t`` is empty.
    """
    edges = s.edges
    if edges is None:
        return None
    if not (self.MIN_DFA_EDGE <= t <= self.MAX_DFA_EDGE):
        return None

    cached = edges[t - self.MIN_DFA_EDGE]
    if LexerATNSimulator.debug and cached is not None:
        print("reuse state", str(s.stateNumber), "edge to", str(cached.stateNumber))
    return cached
- # Compute a target state for an edge in the DFA, and attempt to add the
- # computed state and corresponding edge to the DFA.
- #
- # @param input The input stream
- # @param s The current DFA state
- # @param t The next input symbol
- #
- # @return The computed target DFA state for the given input symbol
- # {@code t}. If {@code t} does not lead to a valid DFA state, this method
- # returns {@link #ERROR}.
def computeTargetState(self, input:InputStream, s:DFAState, t:int):
    """Compute the DFA state reached from ``s`` on ``t`` and try to cache the edge.

    Returns the state produced by ``addDFAEdge``, or ``self.ERROR`` when no
    configuration is reachable on ``t``.
    """
    reach = OrderedATNConfigSet()
    # Fill reach starting from the closure of s, following t transitions.
    self.getReachableConfigSet(input, s.configs, reach, t)

    if len(reach) != 0:
        # Add an edge from s to the target DFA state found/created for reach.
        return self.addDFAEdge(s, t, cfgs=reach)

    # We got nowhere on t from s. Unless a predicate was involved, record
    # that as an ERROR edge: throwing the knowledge away would cause a
    # failover from the DFA later.
    if not reach.hasSemanticContext:
        self.addDFAEdge(s, t, self.ERROR)

    # Stop when no more characters can be matched.
    return self.ERROR
def failOrAccept(self, prevAccept:SimState, input:InputStream, reach:ATNConfigSet, t:int):
    """Finish a match attempt from the recorded accept snapshot.

    :param prevAccept: snapshot of the most recent accept state, if any
    :param input: the character stream being lexed
    :param reach: configuration set handed to the exception on failure
    :param t: the lookahead symbol at the point simulation stopped
    :return: the prediction of the accepted DFA state, or ``Token.EOF``
        when nothing was accepted and EOF was the first symbol
    :raises LexerNoViableAltException: when nothing was accepted and the
        input is not at EOF on the token's first character
    """
    # Use the snapshot that was passed in for both the test and the data,
    # so the guard and the body always look at the same object. The
    # accepting state is bound before accept() runs.
    accepted = prevAccept.dfaState
    if accepted is not None:
        self.accept(input, accepted.lexerActionExecutor, self.startIndex,
                    prevAccept.index, prevAccept.line, prevAccept.column)
        return accepted.prediction

    # If no accept and EOF is the first char, return EOF.
    if t == Token.EOF and input.index == self.startIndex:
        return Token.EOF
    raise LexerNoViableAltException(self.recog, input, self.startIndex, reach)
- # Given a starting configuration set, figure out all ATN configurations
- # we can reach upon input {@code t}. Parameter {@code reach} is a return
- # parameter.
def getReachableConfigSet(self, input:InputStream, closure:ATNConfigSet, reach:ATNConfigSet, t:int):
    """Collect into ``reach`` every configuration reachable from ``closure`` on ``t``.

    ``reach`` is an output parameter; nothing is returned.
    """
    eof_as_epsilon = (t == Token.EOF)

    # Alternative whose remaining configs have lower priority than a
    # config that already reached an accept state for the same rule.
    skip_alt = ATN.INVALID_ALT_NUMBER

    for cfg in closure:
        alt_already_accepted = (cfg.alt == skip_alt)
        if alt_already_accepted and cfg.passedThroughNonGreedyDecision:
            continue

        if LexerATNSimulator.debug:
            print("testing", self.getTokenName(t), "at", str(cfg))

        for trans in cfg.state.transitions:
            target = self.getReachableTarget(trans, t)
            if target is None:
                continue

            executor = cfg.lexerActionExecutor
            if executor is not None:
                executor = executor.fixOffsetBeforeMatch(input.index - self.startIndex)

            candidate = LexerATNConfig(state=target, lexerActionExecutor=executor, config=cfg)
            if self.closure(input, candidate, reach, alt_already_accepted, True, eof_as_epsilon):
                # Any remaining configs for this alt have a lower priority
                # than the one that just reached an accept state.
                skip_alt = cfg.alt
def accept(self, input:InputStream, lexerActionExecutor:LexerActionExecutor, startIndex:int, index:int, line:int, charPos:int):
    """Commit an accepted token: reposition the input and run its actions.

    Seeks ``input`` to ``index`` and restores ``line`` / ``charPos`` on the
    simulator. If both an executor and a recognizer are present, the
    executor is run with ``startIndex``.
    """
    if LexerATNSimulator.debug:
        print("ACTION", lexerActionExecutor)

    # Seek to just after the last character of the token.
    input.seek(index)
    self.line, self.column = line, charPos

    if lexerActionExecutor is None or self.recog is None:
        return
    lexerActionExecutor.execute(self.recog, input, startIndex)
def getReachableTarget(self, trans:Transition, t:int):
    """Return ``trans.target`` if ``trans`` matches ``t`` within ``0..MAX_CHAR_VALUE``, else ``None``."""
    return trans.target if trans.matches(t, 0, self.MAX_CHAR_VALUE) else None
def computeStartState(self, input:InputStream, p:ATNState):
    """Build the start configuration set for ATN state ``p``.

    Each outgoing transition of ``p`` becomes one alternative, numbered
    from 1, whose closure is added to the returned ordered set.
    """
    start_configs = OrderedATNConfigSet()
    empty_context = PredictionContext.EMPTY
    for alt, transition in enumerate(p.transitions, 1):
        seed = LexerATNConfig(state=transition.target, alt=alt, context=empty_context)
        self.closure(input, seed, start_configs, False, False, False)
    return start_configs
- # Since the alternatives within any lexer decision are ordered by
- # preference, this method stops pursuing the closure as soon as an accept
- # state is reached. After the first accept state is reached by depth-first
- # search from {@code config}, all other (potentially reachable) states for
- # this rule would have a lower priority.
- #
- # @return {@code true} if an accept state is reached, otherwise
- # {@code false}.
def closure(self, input:InputStream, config:LexerATNConfig, configs:ATNConfigSet, currentAltReachedAcceptState:bool,
            speculative:bool, treatEofAsEpsilon:bool):
    """Add ``config`` and everything reachable through epsilon moves to ``configs``.

    Returns ``True`` if an accept state was reached for the current
    alternative, otherwise the incoming ``currentAltReachedAcceptState``
    as updated by the recursive calls.
    """
    if LexerATNSimulator.debug:
        print("closure(" + str(config) + ")")

    if isinstance(config.state, RuleStopState):
        if LexerATNSimulator.debug:
            if self.recog is not None:
                print("closure at", self.recog.symbolicNames[config.state.ruleIndex], "rule stop", str(config))
            else:
                print("closure at rule stop", str(config))

        context = config.context

        if context is None or context.hasEmptyPath():
            if context is None or context.isEmpty():
                configs.add(config)
                return True
            # The context has an empty path among others: record a config
            # carrying only the empty context.
            configs.add(LexerATNConfig(state=config.state, config=config, context=PredictionContext.EMPTY))
            currentAltReachedAcceptState = True

        if context is not None and not context.isEmpty():
            for i in range(len(context)):
                return_state_number = context.getReturnState(i)
                if return_state_number == PredictionContext.EMPTY_RETURN_STATE:
                    continue
                parent_context = context.getParent(i)  # "pop" return state
                follow = LexerATNConfig(state=self.atn.states[return_state_number],
                                        config=config, context=parent_context)
                currentAltReachedAcceptState = self.closure(
                    input, follow, configs,
                    currentAltReachedAcceptState, speculative, treatEofAsEpsilon)

        return currentAltReachedAcceptState

    # Optimization: states with only epsilon transitions are not recorded.
    if not config.state.epsilonOnlyTransitions:
        if not currentAltReachedAcceptState or not config.passedThroughNonGreedyDecision:
            configs.add(config)

    for transition in config.state.transitions:
        successor = self.getEpsilonTarget(input, config, transition, configs, speculative, treatEofAsEpsilon)
        if successor is not None:
            currentAltReachedAcceptState = self.closure(
                input, successor, configs,
                currentAltReachedAcceptState, speculative, treatEofAsEpsilon)

    return currentAltReachedAcceptState
- # side-effect: can alter configs.hasSemanticContext
def getEpsilonTarget(self, input:InputStream, config:LexerATNConfig, t:Transition, configs:ATNConfigSet,
                     speculative:bool, treatEofAsEpsilon:bool):
    """Return the configuration reached from ``config`` over transition ``t`` without consuming input.

    Returns ``None`` when ``t`` cannot be traversed that way. Side effect:
    sets ``configs.hasSemanticContext`` when a predicate transition is seen.

    Raises ``UnsupportedOperationException`` for precedence predicates.
    """
    kind = t.serializationType
    successor = None

    if kind == Transition.RULE:
        pushed = SingletonPredictionContext.create(config.context, t.followState.stateNumber)
        successor = LexerATNConfig(state=t.target, config=config, context=pushed)

    elif kind == Transition.PRECEDENCE:
        raise UnsupportedOperationException("Precedence predicates are not supported in lexers.")

    elif kind == Transition.PREDICATE:
        # Track traversing semantic predicates. If we traverse one, we
        # cannot add a DFA state for this "reach" computation because the
        # DFA would not test the predicate again in the future. Rather
        # than collecting predicates like v3 and testing them on
        # prediction, v4 tests them on the fly every time using the ATN,
        # not the DFA. This is slower, but predicates are not used that
        # often. A key element of the mechanism is not adding DFA states
        # that see predicates immediately afterwards in the ATN. For
        # example,
        #   a : ID {p1}? | ID {p2}? ;
        # should create the start state for rule 'a' (to save start state
        # competition), but should not create the target of the ID state.
        # The ATN states following the ID reference include states
        # reached by traversing predicates. Since this is when we test
        # them, we cannot cache the DFA state target of ID.
        if LexerATNSimulator.debug:
            print("EVAL rule "+ str(t.ruleIndex) + ":" + str(t.predIndex))
        configs.hasSemanticContext = True
        if self.evaluatePredicate(input, t.ruleIndex, t.predIndex, speculative):
            successor = LexerATNConfig(state=t.target, config=config)

    elif kind == Transition.ACTION:
        in_start_rule = config.context is None or config.context.hasEmptyPath()
        if in_start_rule:
            # Execute actions anywhere in the start rule for a token.
            #
            # TODO: if the entry rule is invoked recursively, some
            # actions may be executed during the recursive call. The
            # problem can appear when hasEmptyPath() is true but
            # isEmpty() is false. In this case, the config needs to be
            # split into two contexts - one with just the empty path
            # and another with everything but the empty path.
            # Unfortunately, the current algorithm does not allow
            # getEpsilonTarget to return two configurations, so
            # additional modifications are needed before we can support
            # the split operation.
            executor = LexerActionExecutor.append(config.lexerActionExecutor,
                                                  self.atn.lexerActions[t.actionIndex])
            successor = LexerATNConfig(state=t.target, config=config, lexerActionExecutor=executor)
        else:
            # Ignore actions in referenced rules.
            successor = LexerATNConfig(state=t.target, config=config)

    elif kind == Transition.EPSILON:
        successor = LexerATNConfig(state=t.target, config=config)

    elif kind in (Transition.ATOM, Transition.RANGE, Transition.SET):
        # A matching transition is only followed here when EOF is being
        # treated as an epsilon move and the transition matches EOF.
        if treatEofAsEpsilon and t.matches(Token.EOF, 0, self.MAX_CHAR_VALUE):
            successor = LexerATNConfig(state=t.target, config=config)

    return successor
- # Evaluate a predicate specified in the lexer.
- #
- # <p>If {@code speculative} is {@code true}, this method was called before
- # {@link #consume} for the matched character. This method should call
- # {@link #consume} before evaluating the predicate to ensure position
- # sensitive values, including {@link Lexer#getText}, {@link Lexer#getLine},
- # and {@link Lexer#getcolumn}, properly reflect the current
- # lexer state. This method should restore {@code input} and the simulator
- # to the original state before returning (i.e. undo the actions made by the
- # call to {@link #consume}.</p>
- #
- # @param input The input stream.
- # @param ruleIndex The rule containing the predicate.
- # @param predIndex The index of the predicate within the rule.
- # @param speculative {@code true} if the current index in {@code input} is
- # one character before the predicate's location.
- #
- # @return {@code true} if the specified predicate evaluates to
- # {@code true}.
- #/
def evaluatePredicate(self, input:InputStream, ruleIndex:int, predIndex:int, speculative:bool):
    """Evaluate lexer predicate ``predIndex`` of rule ``ruleIndex``.

    Without a recognizer the predicate is assumed true. In speculative
    mode one symbol is consumed before the predicate runs; afterwards the
    simulator's line/column and the input position are restored.

    Returns the result of ``recog.sempred`` (or ``True`` without a recognizer).
    """
    recognizer = self.recog
    if recognizer is None:
        # Assume true if no recognizer was provided.
        return True

    if speculative:
        column_before = self.column
        line_before = self.line
        index_before = input.index
        marker = input.mark()
        try:
            self.consume(input)
            return self.recog.sempred(None, ruleIndex, predIndex)
        finally:
            # Undo the effects of the speculative consume.
            self.column = column_before
            self.line = line_before
            input.seek(index_before)
            input.release(marker)

    return recognizer.sempred(None, ruleIndex, predIndex)
def captureSimState(self, settings:SimState, input:InputStream, dfaState:DFAState):
    """Record the current input index, line, column and ``dfaState`` into ``settings``."""
    settings.index = input.index
    settings.line, settings.column = self.line, self.column
    settings.dfaState = dfaState
def addDFAEdge(self, from_:DFAState, tk:int, to:DFAState=None, cfgs:ATNConfigSet=None) -> DFAState:
    """Connect ``from_`` to a target DFA state on symbol ``tk``.

    The target is ``to`` when given; otherwise it is created (or found)
    from ``cfgs`` via ``addDFAState``. The edge is only stored when ``tk``
    lies within ``[MIN_DFA_EDGE, MAX_DFA_EDGE]`` and the configuration set
    was not flagged with ``hasSemanticContext``.

    Returns the target state.
    """
    if to is None and cfgs is not None:
        # Leading to this call, ATNConfigSet.hasSemanticContext is used as
        # a marker indicating dynamic predicate evaluation makes this edge
        # dependent on the specific input sequence, so the static edge in
        # the DFA should be omitted. The target DFAState is still created
        # since execATN has the ability to resynchronize with the DFA
        # state cache following the predicate evaluation step.
        #
        # TJP notes: next time through the DFA, we see a pred again and
        # eval. If that gets us to a previously created (but dangling)
        # DFA state, we can continue in pure DFA mode from there.
        predicate_dependent = cfgs.hasSemanticContext
        cfgs.hasSemanticContext = False
        to = self.addDFAState(cfgs)
        if predicate_dependent:
            return to

    # Only track edges within the DFA bounds.
    if not (self.MIN_DFA_EDGE <= tk <= self.MAX_DFA_EDGE):
        return to

    if LexerATNSimulator.debug:
        print("EDGE " + str(from_) + " -> " + str(to) + " upon "+ chr(tk))

    if from_.edges is None:
        # Allocate one slot per symbol in the tracked range.
        slot_count = self.MAX_DFA_EDGE - self.MIN_DFA_EDGE + 1
        from_.edges = [None] * slot_count

    from_.edges[tk - self.MIN_DFA_EDGE] = to  # connect
    return to
- # Add a new DFA state if there isn't one with this set of
- # configurations already. This method also detects the first
- # configuration containing an ATN rule stop state. Later, when
- # traversing the DFA, we will know which rule to accept.
def addDFAState(self, configs:ATNConfigSet) -> DFAState:
    """Return the DFA state for ``configs`` in the current mode's DFA, creating it if needed.

    The first configuration sitting on a rule stop state, if any, makes
    the state accepting and supplies its action executor and prediction.
    """
    candidate = DFAState(configs=configs)

    accepting_cfg = None
    for cfg in configs:
        if isinstance(cfg.state, RuleStopState):
            accepting_cfg = cfg
            break

    if accepting_cfg is not None:
        candidate.isAcceptState = True
        candidate.lexerActionExecutor = accepting_cfg.lexerActionExecutor
        candidate.prediction = self.atn.ruleToTokenType[accepting_cfg.state.ruleIndex]

    dfa = self.decisionToDFA[self.mode]
    known = dfa.states.get(candidate)
    if known is not None:
        return known

    # New state: number it, freeze its configuration set and register it.
    candidate.stateNumber = len(dfa.states)
    configs.setReadonly(True)
    candidate.configs = configs
    dfa.states[candidate] = candidate
    return candidate
def getDFA(self, mode:int):
    """Return the entry of ``decisionToDFA`` for lexer mode ``mode``."""
    dfa_for_mode = self.decisionToDFA[mode]
    return dfa_for_mode
- # Get the text matched so far for the current token.
def getText(self, input:InputStream):
    """Return the text matched so far for the current token."""
    # input.index is the first lookahead char, so it is not included.
    last_matched = input.index - 1
    return input.getText(self.startIndex, last_matched)
def consume(self, input:InputStream):
    """Advance ``input`` by one symbol, updating ``line`` and ``column`` first.

    A newline increments the line and resets the column to 0; any other
    symbol increments the column.
    """
    at_newline = input.LA(1) == ord('\n')
    if at_newline:
        self.line += 1
        self.column = 0
    else:
        self.column += 1
    input.consume()
def getTokenName(self, t:int):
    """Return a printable name for symbol ``t``: ``"EOF"`` for -1, else the quoted character."""
    return "EOF" if t == -1 else "'" + chr(t) + "'"
# Install the shared ERROR sentinel now that the class exists: a DFA state
# built from the number 0x7FFFFFFF and an empty configuration set.
LexerATNSimulator.ERROR = DFAState(
    0x7FFFFFFF,
    ATNConfigSet(),
)

# Remove the module-level Lexer placeholder.
del Lexer
|