| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- import re
- import threading
- from typing import Any
- from antlr4 import CommonTokenStream, InputStream, ParserRuleContext
- from antlr4.error.ErrorListener import ErrorListener
- from .errors import GrammarParseError
- # Import from visitor in order to check the presence of generated grammar
- # files in a single place.
- from .grammar_visitor import ( # type: ignore
- OmegaConfGrammarLexer,
- OmegaConfGrammarParser,
- )
# Used to cache grammar objects to avoid re-creating them on each call to `parse()`.
# We use a per-thread cache (`threading.local()`) to make it thread-safe.
# `parse()` stores a `(lexer, token_stream, parser)` tuple in its `data` attribute
# the first time it runs in a given thread, and re-uses that tuple afterwards.
_grammar_cache = threading.local()
# Regex used to cheaply recognize typical interpolations. It is assembled from the
# smaller fragments below (each one is annotated with examples of what it matches).
# See test `test_match_simple_interpolation_pattern` for examples.

# --- Node interpolations: ${foo.bar} ---
_config_key = r"[$\w]+"  # foo, $0, $bar, $foo_$bar123$
_key_maybe_brackets = rf"{_config_key}|\[{_config_key}\]"  # foo, [foo], [$bar]
_node_access = rf"\.{_key_maybe_brackets}"  # .foo, [foo], [$bar]
_node_path = rf"(\.)*({_key_maybe_brackets})({_node_access})*"  # [foo].bar, .foo[bar]
_node_inter = rf"\${{\s*{_node_path}\s*}}"  # node interpolation ${foo.bar}

# --- Resolver interpolations: ${foo:bar} ---
_id = r"[a-zA-Z_][\w\-]*"  # foo, foo_bar, foo-bar, abc123
_resolver_name = rf"({_id}(\.{_id})*)?"  # foo, ns.bar3, ns_1.ns_2.b0z
_arg = r"[a-zA-Z_0-9/\-\+.$%*@?|]+"  # string representing a resolver argument
_args = rf"{_arg}(\s*,\s*{_arg})*"  # list of resolver arguments
# NOTE(review): once `_args` is substituted, the `?` below directly follows the
# trailing `(...)*` group of `_args`, so it makes that group lazy rather than making
# the whole argument list optional. As a result a resolver call without arguments
# such as `${foo:}` is not matched by this pattern (a false negative).
_resolver_inter = rf"\${{\s*{_resolver_name}\s*:\s*{_args}?\s*}}"  # ${foo:bar}

# --- Full pattern: interpolations possibly surrounded by regular text ---
_inter = f"({_node_inter}|{_resolver_inter})"  # any kind of interpolation
_outer = r"([^$]|\$(?!{))+"  # any character except $ (unless not followed by {)
SIMPLE_INTERPOLATION_PATTERN = re.compile(
    f"({_outer})?({_inter}({_outer})?)+$",
    flags=re.ASCII,
)

# NOTE: SIMPLE_INTERPOLATION_PATTERN must not generate false positive matches:
# it must not accept anything that isn't a valid interpolation (per the
# interpolation grammar defined in `omegaconf/grammar/*.g4`).
class OmegaConfErrorListener(ErrorListener):  # type: ignore
    """
    ANTLR error listener that raises a `GrammarParseError` from each callback
    overridden here (syntax errors as well as the diagnostic reports).
    """

    def syntaxError(
        self,
        recognizer: Any,
        offending_symbol: Any,
        line: Any,
        column: Any,
        msg: Any,
        e: Any,
    ) -> None:
        """
        Raise a `GrammarParseError` chained to `e`.

        The error message is `msg` when it is provided, `str(e)` otherwise.
        """
        error_text = msg if msg is not None else str(e)
        raise GrammarParseError(error_text) from e

    def reportAmbiguity(
        self,
        recognizer: Any,
        dfa: Any,
        startIndex: Any,
        stopIndex: Any,
        exact: Any,
        ambigAlts: Any,
        configs: Any,
    ) -> None:
        """Raise a `GrammarParseError` whenever an ambiguity is reported."""
        raise GrammarParseError("ANTLR error: Ambiguity")  # pragma: no cover

    def reportAttemptingFullContext(
        self,
        recognizer: Any,
        dfa: Any,
        startIndex: Any,
        stopIndex: Any,
        conflictingAlts: Any,
        configs: Any,
    ) -> None:
        """
        Raise a `GrammarParseError` whenever a full-context attempt is reported.

        For now an error is raised to be on the safe side. This report is however
        mostly a performance warning, so this may be relaxed in the future if the
        grammar has to change in a way that makes the warning unavoidable (another
        option would be to switch to SLL parsing mode).
        """
        raise GrammarParseError(
            "ANTLR error: Attempting Full Context"
        )  # pragma: no cover

    def reportContextSensitivity(
        self,
        recognizer: Any,
        dfa: Any,
        startIndex: Any,
        stopIndex: Any,
        prediction: Any,
        configs: Any,
    ) -> None:
        """Raise a `GrammarParseError` whenever a context sensitivity is reported."""
        raise GrammarParseError("ANTLR error: ContextSensitivity")  # pragma: no cover
def parse(
    value: str, parser_rule: str = "configValue", lexer_mode: str = "DEFAULT_MODE"
) -> ParserRuleContext:
    """
    Parse interpolated string `value` (and return the parse tree).

    :param value: The string to parse.
    :param parser_rule: Name of the `OmegaConfGrammarParser` method (grammar rule)
        that is called to perform the parsing.
    :param lexer_mode: Name of the `OmegaConfGrammarLexer` attribute holding the
        mode the lexer is switched to before processing `value`.
    :return: The result of calling the selected parser rule.
    :raise GrammarParseError: If antlr raises its bare "Empty Stack" exception.
        Errors raised by `OmegaConfErrorListener` during parsing propagate as-is.
    """
    mode = getattr(OmegaConfGrammarLexer, lexer_mode)
    stream = InputStream(value)

    grammar_objects = getattr(_grammar_cache, "data", None)
    if grammar_objects is not None:
        # This thread already built its grammar objects: re-use them.
        lexer, token_stream, parser = grammar_objects

        # Swap in the new input stream in place of the previous one.
        lexer.inputStream = stream

        # Get the lexer / token stream / parser ready for the new input.
        lexer.mode(mode)
        token_stream.setTokenSource(lexer)
        parser.reset()
    else:
        # First call in this thread: build the grammar objects from scratch. The same
        # listener instance is shared by the lexer and the parser.
        listener = OmegaConfErrorListener()

        lexer = OmegaConfGrammarLexer(stream)
        lexer.removeErrorListeners()
        lexer.addErrorListener(listener)
        lexer.mode(mode)

        token_stream = CommonTokenStream(lexer)

        parser = OmegaConfGrammarParser(token_stream)
        parser.removeErrorListeners()
        parser.addErrorListener(listener)

        # The two lines below could be enabled in the future if we decide to switch
        # to SLL prediction mode. Warning though, it has not been fully tested yet!
        # from antlr4 import PredictionMode
        # parser._interp.predictionMode = PredictionMode.SLL

        # Although `stream` ends up implicitly cached within the lexer, it is
        # replaced by a new input stream the next time the lexer is re-used.
        _grammar_cache.data = lexer, token_stream, parser

    try:
        rule = getattr(parser, parser_rule)
        return rule()
    except Exception as exc:
        antlr_empty_stack = type(exc) is Exception and str(exc) == "Empty Stack"
        if not antlr_empty_stack:
            raise
        # This exception is raised by antlr when trying to pop a mode while no mode
        # has been pushed. It is converted into a `GrammarParseError` to facilitate
        # exception handling from the caller.
        raise GrammarParseError("Empty Stack")
|