| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- from copy import deepcopy, copy
- from typing import Dict, Any, Generic, List
- from ..lexer import Token, LexerThread
- from ..common import ParserCallbacks
- from .lalr_analysis import Shift, ParseTableBase, StateT
- from lark.exceptions import UnexpectedToken
- ###{standalone
class ParseConf(Generic[StateT]):
    """Configuration shared by a parse run for a single start rule.

    Holds the parse table, the callbacks, the name of the start rule, and
    three values read from the parse table at construction time:

    * ``start_state`` -- ``parse_table.start_states[start]``
    * ``end_state``   -- ``parse_table.end_states[start]``
    * ``states``      -- ``parse_table.states`` (the per-state action table)
    """
    __slots__ = 'parse_table', 'callbacks', 'start', 'start_state', 'end_state', 'states'

    parse_table: ParseTableBase[StateT]
    callbacks: ParserCallbacks
    start: str
    start_state: StateT
    end_state: StateT
    states: Dict[StateT, Dict[str, tuple]]

    def __init__(self, parse_table: ParseTableBase[StateT], callbacks: ParserCallbacks, start: str):
        """Store the arguments and look up the states belonging to ``start``.

        Indexing ``start_states`` / ``end_states`` with an unknown ``start``
        propagates whatever error that lookup raises.
        """
        self.parse_table = parse_table

        # Cache the lookups once so the parsing loop does not repeat them.
        self.start_state = parse_table.start_states[start]
        self.end_state = parse_table.end_states[start]
        self.states = parse_table.states

        self.callbacks = callbacks
        self.start = start
class ParserState(Generic[StateT]):
    """State of a parse in progress.

    Keeps two stacks that grow and shrink together while tokens are fed:
    ``state_stack`` (parser states) and ``value_stack`` (the values produced
    for shifted tokens and reduced rules).
    """
    __slots__ = 'parse_conf', 'lexer', 'state_stack', 'value_stack'

    parse_conf: ParseConf[StateT]
    lexer: LexerThread
    state_stack: List[StateT]
    value_stack: list

    def __init__(self, parse_conf: ParseConf[StateT], lexer: LexerThread, state_stack=None, value_stack=None):
        """Create a parser state.

        A falsy ``state_stack`` (``None`` or empty) is replaced by a new stack
        holding only ``parse_conf.start_state``; a falsy ``value_stack`` is
        replaced by a new empty list.
        """
        self.parse_conf = parse_conf
        self.lexer = lexer
        self.state_stack = state_stack if state_stack else [parse_conf.start_state]
        self.value_stack = value_stack if value_stack else []

    @property
    def position(self) -> StateT:
        """The state currently on top of the state stack."""
        stack = self.state_stack
        return stack[-1]

    # Necessary for match_examples() to work
    def __eq__(self, other) -> bool:
        """Compare by state-stack depth and top state only.

        Returns ``NotImplemented`` when ``other`` is not a ``ParserState``.
        """
        if isinstance(other, ParserState):
            same_depth = len(self.state_stack) == len(other.state_stack)
            return same_depth and self.position == other.position
        return NotImplemented

    def __copy__(self):
        """Support ``copy.copy()`` by delegating to :meth:`copy`."""
        return self.copy()

    def copy(self, deepcopy_values=True) -> 'ParserState[StateT]':
        """Return a new state of the same type with its own stacks.

        The state stack is shallow-copied. The value stack is deep-copied
        when ``deepcopy_values`` is true and shallow-copied otherwise.
        ``parse_conf`` and ``lexer`` are passed through as-is.
        """
        states_snapshot = copy(self.state_stack)
        if deepcopy_values:
            values_snapshot = deepcopy(self.value_stack)
        else:
            values_snapshot = copy(self.value_stack)

        return type(self)(
            self.parse_conf,
            self.lexer,  # XXX copy
            states_snapshot,
            values_snapshot,
        )

    def feed_token(self, token: Token, is_end=False) -> Any:
        """Advance the parser with ``token``.

        Performs reductions for as long as the action table asks for them,
        then shifts ``token`` and returns ``None``. When ``is_end`` is true,
        returns the top of the value stack as soon as a reduction lands on
        ``parse_conf.end_state``.

        Raises:
            UnexpectedToken: the current state has no action for
                ``token.type``. The ``expected`` set passed along contains
                the upper-case keys of the current state's action table.
        """
        state_stack = self.state_stack
        value_stack = self.value_stack
        conf = self.parse_conf
        states = conf.states
        end_state = conf.end_state
        callbacks = conf.callbacks

        while True:
            current = state_stack[-1]
            try:
                action, arg = states[current][token.type]
            except KeyError:
                expected = {name for name in states[current] if name.isupper()}
                raise UnexpectedToken(token, expected, state=self, interactive_parser=None)

            assert arg != end_state

            if action is Shift:
                # Shift exactly once, then hand control back to the caller.
                assert not is_end
                state_stack.append(arg)
                if token.type in callbacks:
                    shifted_value = callbacks[token.type](token)
                else:
                    shifted_value = token
                value_stack.append(shifted_value)
                return

            # Any other action is a reduce; `arg` is the rule to reduce by.
            rule = arg
            arity = len(rule.expansion)
            if arity:
                children = value_stack[-arity:]
                del state_stack[-arity:]
                del value_stack[-arity:]
            else:
                # Empty expansion: nothing to pop (a `[-0:]` slice would take everything).
                children = []

            if callbacks:
                value = callbacks[rule](children)
            else:
                value = children

            # Transition on the rule's origin from the state uncovered by the pop.
            goto_action, goto_state = states[state_stack[-1]][rule.origin.name]
            assert goto_action is Shift
            state_stack.append(goto_state)
            value_stack.append(value)

            if is_end and goto_state == end_state:
                return value
- ###}
|