util.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514
  1. # util.py
  2. import contextlib
  3. import re
  4. from functools import lru_cache, wraps
  5. import inspect
  6. import itertools
  7. import types
  8. from typing import Callable, Union, Iterable, TypeVar, cast, Any
  9. import warnings
  10. from .warnings import PyparsingDeprecationWarning, PyparsingDiagnosticWarning
# A single backslash character, built with chr() so that no escaping is
# needed in the source text.
_bslash = chr(92)
# Type variable bound to Callable; used in this module to annotate
# replaced_by_pep8(), which accepts a callable and returns one of the same type.
C = TypeVar("C", bound=Callable)
  13. class __config_flags:
  14. """Internal class for defining compatibility and debugging flags"""
  15. _all_names: list[str] = []
  16. _fixed_names: list[str] = []
  17. _type_desc = "configuration"
  18. @classmethod
  19. def _set(cls, dname, value):
  20. if dname in cls._fixed_names:
  21. warnings.warn(
  22. f"{cls.__name__}.{dname} {cls._type_desc} is {str(getattr(cls, dname)).upper()}"
  23. f" and cannot be overridden",
  24. PyparsingDiagnosticWarning,
  25. stacklevel=3,
  26. )
  27. return
  28. if dname in cls._all_names:
  29. setattr(cls, dname, value)
  30. else:
  31. raise ValueError(f"no such {cls._type_desc} {dname!r}")
  32. enable = classmethod(lambda cls, name: cls._set(name, True))
  33. disable = classmethod(lambda cls, name: cls._set(name, False))
  34. @lru_cache(maxsize=128)
  35. def col(loc: int, strg: str) -> int:
  36. """
  37. Returns current column within a string, counting newlines as line separators.
  38. The first column is number 1.
  39. Note: the default parsing behavior is to expand tabs in the input string
  40. before starting the parsing process. See
  41. :meth:`ParserElement.parse_string` for more
  42. information on parsing strings containing ``<TAB>`` s, and suggested
  43. methods to maintain a consistent view of the parsed string, the parse
  44. location, and line and column positions within the parsed string.
  45. """
  46. s = strg
  47. return 1 if 0 < loc < len(s) and s[loc - 1] == "\n" else loc - s.rfind("\n", 0, loc)
  48. @lru_cache(maxsize=128)
  49. def lineno(loc: int, strg: str) -> int:
  50. """Returns current line number within a string, counting newlines as line separators.
  51. The first line is number 1.
  52. Note - the default parsing behavior is to expand tabs in the input string
  53. before starting the parsing process. See :meth:`ParserElement.parse_string`
  54. for more information on parsing strings containing ``<TAB>`` s, and
  55. suggested methods to maintain a consistent view of the parsed string, the
  56. parse location, and line and column positions within the parsed string.
  57. """
  58. return strg.count("\n", 0, loc) + 1
  59. @lru_cache(maxsize=128)
  60. def line(loc: int, strg: str) -> str:
  61. """
  62. Returns the line of text containing loc within a string, counting newlines as line separators.
  63. """
  64. last_cr = strg.rfind("\n", 0, loc)
  65. next_cr = strg.find("\n", loc)
  66. return strg[last_cr + 1 : next_cr] if next_cr >= 0 else strg[last_cr + 1 :]
  67. class _UnboundedCache:
  68. def __init__(self):
  69. cache = {}
  70. cache_get = cache.get
  71. self.not_in_cache = not_in_cache = object()
  72. def get(_, key):
  73. return cache_get(key, not_in_cache)
  74. def set_(_, key, value):
  75. cache[key] = value
  76. def clear(_):
  77. cache.clear()
  78. self.size = None
  79. self.get = types.MethodType(get, self)
  80. self.set = types.MethodType(set_, self)
  81. self.clear = types.MethodType(clear, self)
  82. class _FifoCache:
  83. def __init__(self, size):
  84. cache = {}
  85. self.size = size
  86. self.not_in_cache = not_in_cache = object()
  87. cache_get = cache.get
  88. cache_pop = cache.pop
  89. def get(_, key):
  90. return cache_get(key, not_in_cache)
  91. def set_(_, key, value):
  92. cache[key] = value
  93. while len(cache) > size:
  94. # pop oldest element in cache by getting the first key
  95. cache_pop(next(iter(cache)))
  96. def clear(_):
  97. cache.clear()
  98. self.get = types.MethodType(get, self)
  99. self.set = types.MethodType(set_, self)
  100. self.clear = types.MethodType(clear, self)
  101. class LRUMemo:
  102. """
  103. A memoizing mapping that retains `capacity` deleted items
  104. The memo tracks retained items by their access order; once `capacity` items
  105. are retained, the least recently used item is discarded.
  106. """
  107. def __init__(self, capacity):
  108. self._capacity = capacity
  109. self._active = {}
  110. self._memory = {}
  111. def __getitem__(self, key):
  112. try:
  113. return self._active[key]
  114. except KeyError:
  115. self._memory[key] = self._memory.pop(key)
  116. return self._memory[key]
  117. def __setitem__(self, key, value):
  118. self._memory.pop(key, None)
  119. self._active[key] = value
  120. def __delitem__(self, key):
  121. try:
  122. value = self._active.pop(key)
  123. except KeyError:
  124. pass
  125. else:
  126. oldest_keys = list(self._memory)[: -(self._capacity + 1)]
  127. for key_to_delete in oldest_keys:
  128. self._memory.pop(key_to_delete)
  129. self._memory[key] = value
  130. def clear(self):
  131. self._active.clear()
  132. self._memory.clear()
  133. class UnboundedMemo(dict):
  134. """
  135. A memoizing mapping that retains all deleted items
  136. """
  137. def __delitem__(self, key):
  138. pass
  139. def _escape_regex_range_chars(s: str) -> str:
  140. # escape these chars: ^-[]
  141. for c in r"\^-[]":
  142. s = s.replace(c, _bslash + c)
  143. s = s.replace("\n", r"\n")
  144. s = s.replace("\t", r"\t")
  145. return str(s)
  146. class _GroupConsecutive:
  147. """
  148. Used as a callable `key` for itertools.groupby to group
  149. characters that are consecutive:
  150. .. testcode::
  151. from itertools import groupby
  152. from pyparsing.util import _GroupConsecutive
  153. grouped = groupby("abcdejkmpqrs", key=_GroupConsecutive())
  154. for index, group in grouped:
  155. print(tuple([index, list(group)]))
  156. prints:
  157. .. testoutput::
  158. (0, ['a', 'b', 'c', 'd', 'e'])
  159. (1, ['j', 'k'])
  160. (2, ['m'])
  161. (3, ['p', 'q', 'r', 's'])
  162. """
  163. def __init__(self) -> None:
  164. self.prev = 0
  165. self.counter = itertools.count()
  166. self.value = -1
  167. def __call__(self, char: str) -> int:
  168. c_int = ord(char)
  169. self.prev, prev = c_int, self.prev
  170. if c_int - prev > 1:
  171. self.value = next(self.counter)
  172. return self.value
  173. def _is_iterable(obj, _str_type=(str, bytes), _iter_exception=Exception):
  174. # str's are iterable, but in pyparsing, we don't want to iterate over them
  175. if isinstance(obj, _str_type):
  176. return False
  177. try:
  178. iter(obj)
  179. except _iter_exception: # noqa
  180. return False
  181. else:
  182. return True
  183. def _escape_re_range_char(c: str) -> str:
  184. return fr"\{c}" if c in r"\^-][" else c
  185. def _collapse_string_to_ranges(
  186. s: Union[str, Iterable[str]], re_escape: bool = True
  187. ) -> str:
  188. r"""
  189. Take a string or list of single-character strings, and return
  190. a string of the consecutive characters in that string collapsed
  191. into groups, as might be used in a regular expression '[a-z]'
  192. character set::
  193. 'a' -> 'a' -> '[a]'
  194. 'bc' -> 'bc' -> '[bc]'
  195. 'defgh' -> 'd-h' -> '[d-h]'
  196. 'fdgeh' -> 'd-h' -> '[d-h]'
  197. 'jklnpqrtu' -> 'j-lnp-rtu' -> '[j-lnp-rtu]'
  198. Duplicates get collapsed out::
  199. 'aaa' -> 'a' -> '[a]'
  200. 'bcbccb' -> 'bc' -> '[bc]'
  201. 'defghhgf' -> 'd-h' -> '[d-h]'
  202. 'jklnpqrjjjtu' -> 'j-lnp-rtu' -> '[j-lnp-rtu]'
  203. Spaces are preserved::
  204. 'ab c' -> ' a-c' -> '[ a-c]'
  205. Characters that are significant when defining regex ranges
  206. get escaped::
  207. 'acde[]-' -> r'\-\[\]ac-e' -> r'[\-\[\]ac-e]'
  208. """
  209. # Developer notes:
  210. # - Do not optimize this code assuming that the given input string
  211. # or internal lists will be short (such as in loading generators into
  212. # lists to make it easier to find the last element); this method is also
  213. # used to generate regex ranges for character sets in the pyparsing.unicode
  214. # classes, and these can be _very_ long lists of strings
  215. escape_re_range_char: Callable[[str], str]
  216. if re_escape:
  217. escape_re_range_char = _escape_re_range_char
  218. else:
  219. escape_re_range_char = lambda ss: ss
  220. ret = []
  221. # reduce input string to remove duplicates, and put in sorted order
  222. s_chars: list[str] = sorted(set(s))
  223. if len(s_chars) > 2:
  224. # find groups of characters that are consecutive (can be collapsed
  225. # down to "<first>-<last>")
  226. for _, chars in itertools.groupby(s_chars, key=_GroupConsecutive()):
  227. # _ is unimportant, is just used to identify groups
  228. # chars is an iterator of one or more consecutive characters
  229. # that comprise the current group
  230. first = last = next(chars)
  231. with contextlib.suppress(ValueError):
  232. *_, last = chars
  233. if first == last:
  234. # there was only a single char in this group
  235. ret.append(escape_re_range_char(first))
  236. elif last == chr(ord(first) + 1):
  237. # there were only 2 characters in this group
  238. # 'a','b' -> 'ab'
  239. ret.append(f"{escape_re_range_char(first)}{escape_re_range_char(last)}")
  240. else:
  241. # there were > 2 characters in this group, make into a range
  242. # 'c','d','e' -> 'c-e'
  243. ret.append(
  244. f"{escape_re_range_char(first)}-{escape_re_range_char(last)}"
  245. )
  246. else:
  247. # only 1 or 2 chars were given to form into groups
  248. # 'a' -> ['a']
  249. # 'bc' -> ['b', 'c']
  250. # 'dg' -> ['d', 'g']
  251. # no need to list them with "-", just return as a list
  252. # (after escaping)
  253. ret = [escape_re_range_char(c) for c in s_chars]
  254. return "".join(ret)
  255. def _flatten(ll: Iterable) -> list:
  256. ret = []
  257. for i in ll:
  258. # Developer notes:
  259. # - do not collapse this section of code, isinstance checks are done
  260. # in optimal order
  261. if isinstance(i, str):
  262. ret.append(i)
  263. elif isinstance(i, Iterable):
  264. ret.extend(_flatten(i))
  265. else:
  266. ret.append(i)
  267. return ret
  268. def _convert_escaped_numerics_to_char(s: str) -> str:
  269. if s == "0":
  270. return "\0"
  271. if s.isdigit() and len(s) == 3:
  272. return chr(int(s, 8))
  273. elif s.startswith(("u", "x")):
  274. return chr(int(s[1:], 16))
  275. return s
  276. def make_compressed_re(
  277. word_list: Iterable[str],
  278. max_level: int = 2,
  279. *,
  280. non_capturing_groups: bool = True,
  281. _level: int = 1,
  282. ) -> str:
  283. """
  284. Create a regular expression string from a list of words, collapsing by common
  285. prefixes and optional suffixes.
  286. Calls itself recursively to build nested sublists for each group of suffixes
  287. that have a shared prefix.
  288. """
  289. def get_suffixes_from_common_prefixes(namelist: list[str]):
  290. if len(namelist) > 1:
  291. for prefix, suffixes in itertools.groupby(namelist, key=lambda s: s[:1]):
  292. yield prefix, sorted([s[1:] for s in suffixes], key=len, reverse=True)
  293. else:
  294. yield namelist[0][0], [namelist[0][1:]]
  295. if _level == 1:
  296. if not word_list:
  297. raise ValueError("no words given to make_compressed_re()")
  298. if "" in word_list:
  299. raise ValueError("word list cannot contain empty string")
  300. else:
  301. # internal recursive call, just return empty string if no words
  302. if not word_list:
  303. return ""
  304. # dedupe the word list
  305. word_list = list({}.fromkeys(word_list))
  306. if max_level == 0:
  307. if any(len(wd) > 1 for wd in word_list):
  308. return "|".join(
  309. sorted([re.escape(wd) for wd in word_list], key=len, reverse=True)
  310. )
  311. else:
  312. return f"[{''.join(_escape_regex_range_chars(wd) for wd in word_list)}]"
  313. ret = []
  314. sep = ""
  315. ncgroup = "?:" if non_capturing_groups else ""
  316. for initial, suffixes in get_suffixes_from_common_prefixes(sorted(word_list)):
  317. ret.append(sep)
  318. sep = "|"
  319. initial = re.escape(initial)
  320. trailing = ""
  321. if "" in suffixes:
  322. trailing = "?"
  323. suffixes.remove("")
  324. if len(suffixes) > 1:
  325. if all(len(s) == 1 for s in suffixes):
  326. ret.append(
  327. f"{initial}[{''.join(_escape_regex_range_chars(s) for s in suffixes)}]{trailing}"
  328. )
  329. else:
  330. if _level < max_level:
  331. suffix_re = make_compressed_re(
  332. sorted(suffixes),
  333. max_level,
  334. non_capturing_groups=non_capturing_groups,
  335. _level=_level + 1,
  336. )
  337. ret.append(f"{initial}({ncgroup}{suffix_re}){trailing}")
  338. else:
  339. if all(len(s) == 1 for s in suffixes):
  340. ret.append(
  341. f"{initial}[{''.join(_escape_regex_range_chars(s) for s in suffixes)}]{trailing}"
  342. )
  343. else:
  344. suffixes.sort(key=len, reverse=True)
  345. ret.append(
  346. f"{initial}({ncgroup}{'|'.join(re.escape(s) for s in suffixes)}){trailing}"
  347. )
  348. else:
  349. if suffixes:
  350. suffix = re.escape(suffixes[0])
  351. if len(suffix) > 1 and trailing:
  352. ret.append(f"{initial}({ncgroup}{suffix}){trailing}")
  353. else:
  354. ret.append(f"{initial}{suffix}{trailing}")
  355. else:
  356. ret.append(initial)
  357. return "".join(ret)
  358. def replaced_by_pep8(compat_name: str, fn: C) -> C:
  359. # Unwrap staticmethod/classmethod
  360. fn = getattr(fn, "__func__", fn)
  361. # (Presence of 'self' arg in signature is used by explain_exception() methods, so we take
  362. # some extra steps to add it if present in decorated function.)
  363. if ["self"] == list(inspect.signature(fn).parameters)[:1]:
  364. @wraps(fn)
  365. def _inner(self, *args, **kwargs):
  366. warnings.warn(
  367. f"{compat_name!r} deprecated - use {fn.__name__!r}",
  368. PyparsingDeprecationWarning,
  369. stacklevel=2,
  370. )
  371. return fn(self, *args, **kwargs)
  372. else:
  373. @wraps(fn)
  374. def _inner(*args, **kwargs):
  375. warnings.warn(
  376. f"{compat_name!r} deprecated - use {fn.__name__!r}",
  377. PyparsingDeprecationWarning,
  378. stacklevel=2,
  379. )
  380. return fn(*args, **kwargs)
  381. _inner.__doc__ = f"""
  382. .. deprecated:: 3.0.0
  383. Use :class:`{fn.__name__}` instead
  384. """
  385. _inner.__name__ = compat_name
  386. _inner.__annotations__ = fn.__annotations__
  387. if isinstance(fn, types.FunctionType):
  388. _inner.__kwdefaults__ = fn.__kwdefaults__ # type: ignore [attr-defined]
  389. elif isinstance(fn, type) and hasattr(fn, "__init__"):
  390. _inner.__kwdefaults__ = fn.__init__.__kwdefaults__ # type: ignore [misc,attr-defined]
  391. else:
  392. _inner.__kwdefaults__ = None # type: ignore [attr-defined]
  393. _inner.__qualname__ = fn.__qualname__
  394. return cast(C, _inner)
  395. def _to_pep8_name(s: str, _re_sub_pattern=re.compile(r"([a-z])([A-Z])")) -> str:
  396. s = _re_sub_pattern.sub(r"\1_\2", s)
  397. return s.lower()
  398. def deprecate_argument(
  399. kwargs: dict[str, Any], arg_name: str, default_value=None, *, new_name: str = ""
  400. ) -> Any:
  401. if arg_name in kwargs:
  402. new_name = new_name or _to_pep8_name(arg_name)
  403. warnings.warn(
  404. f"{arg_name!r} argument is deprecated, use {new_name!r}",
  405. category=PyparsingDeprecationWarning,
  406. stacklevel=3,
  407. )
  408. else:
  409. kwargs[arg_name] = default_value
  410. return kwargs[arg_name]