builtins.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. """
  2. Python polyfills for builtins
  3. """
  4. from __future__ import annotations
  5. import builtins
  6. import functools
  7. import operator
  8. from collections.abc import Callable
  9. from typing import TYPE_CHECKING, TypeVar
  10. from ..decorators import substitute_in_graph
  11. if TYPE_CHECKING:
  12. from collections.abc import Iterable
  13. __all__ = [
  14. "all",
  15. "any",
  16. "enumerate",
  17. "sum",
  18. ]
  19. _T = TypeVar("_T")
  20. @substitute_in_graph(builtins.all, can_constant_fold_through=True)
  21. def all(iterable: Iterable[object], /) -> bool:
  22. for elem in iterable:
  23. if not elem:
  24. return False
  25. return True
  26. @substitute_in_graph(builtins.any, can_constant_fold_through=True)
  27. def any(iterable: Iterable[object], /) -> bool:
  28. for elem in iterable:
  29. if elem:
  30. return True
  31. return False
  32. @substitute_in_graph(builtins.enumerate, is_embedded_type=True) # type: ignore[arg-type]
  33. def enumerate(iterable: Iterable[_T], start: int = 0) -> Iterable[tuple[int, _T]]:
  34. if not isinstance(start, int):
  35. raise TypeError(
  36. f"{type(start).__name__!r} object cannot be interpreted as an integer"
  37. )
  38. for x in iterable:
  39. yield start, x
  40. start += 1
  41. @substitute_in_graph(builtins.sum, can_constant_fold_through=True) # type: ignore[arg-type]
  42. def sum(iterable: Iterable[_T], /, start: _T = 0) -> _T: # type: ignore[assignment]
  43. return functools.reduce(operator.add, iterable, start)
  44. class _CallableIterator:
  45. def __init__(self, fn, sentinel): # type: ignore[no-untyped-def]
  46. self.fn = fn
  47. self.sentinel = sentinel
  48. def __iter__(self): # type: ignore[no-untyped-def]
  49. return self
  50. def __next__(self): # type: ignore[no-untyped-def]
  51. # The iterator created in this case will call object with no arguments
  52. # for each call to its __next__() method;
  53. r = self.fn()
  54. # If the value returned is equal to sentinel, StopIteration will be raised
  55. if r == self.sentinel:
  56. raise StopIteration
  57. # otherwise the value will be returned.
  58. return r
  59. _sentinel_missing = object()
  60. # TODO(guilhermeleobas): use substitute_in_graph for iter()
  61. def iter_(fn_or_iterable, sentinel=_sentinel_missing, /): # type: ignore[no-untyped-def]
  62. # Without a second argument, object must be a collection object which supports
  63. # the iterable (__iter__) or the sequence protocol (__getitem__ with an integer
  64. # starting at 0)
  65. if sentinel is _sentinel_missing:
  66. iterable = fn_or_iterable
  67. if hasattr(iterable, "__iter__"):
  68. iterator = iterable.__iter__()
  69. if hasattr(iterator, "__next__"):
  70. return iterator
  71. else:
  72. raise TypeError(f"'{type(iterator)}' object is not iterable")
  73. if hasattr(iterable, "__getitem__"):
  74. # Needs to be a new function to avoid iter becoming a generator
  75. def sequence_protocol(iterable): # type: ignore[no-untyped-def]
  76. i = 0
  77. while True:
  78. try:
  79. yield iterable.__getitem__(i)
  80. i += 1
  81. except IndexError:
  82. break
  83. return sequence_protocol(iterable)
  84. raise TypeError(f"'{type(iterable)}' object is not iterable")
  85. else:
  86. # If the second argument, sentinel, is given, then object must be a
  87. # callable object.
  88. fn = fn_or_iterable
  89. if not isinstance(fn, Callable): # type: ignore[arg-type]
  90. raise TypeError("iter(v, w): v must be a callable")
  91. return _CallableIterator(fn, sentinel)