heapq.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. """
  2. Python polyfills for heapq
  3. """
  4. from __future__ import annotations
  5. import heapq
  6. import importlib
  7. import sys
  8. from typing import TYPE_CHECKING, TypeVar
  9. from ..decorators import substitute_in_graph
  10. if TYPE_CHECKING:
  11. from types import ModuleType
  12. _T = TypeVar("_T")
  13. # Partially copied from CPython test/support/import_helper.py
  14. # https://github.com/python/cpython/blob/bb8791c0b75b5970d109e5557bfcca8a578a02af/Lib/test/support/import_helper.py
  15. def _save_and_remove_modules(names: set[str]) -> dict[str, ModuleType]:
  16. orig_modules = {}
  17. prefixes = tuple(name + "." for name in names)
  18. for modname in list(sys.modules):
  19. if modname in names or modname.startswith(prefixes):
  20. orig_modules[modname] = sys.modules.pop(modname)
  21. return orig_modules
  22. def import_fresh_module(name: str, blocked: list[str]) -> ModuleType:
  23. # Keep track of modules saved for later restoration as well
  24. # as those which just need a blocking entry removed
  25. names = {name, *blocked}
  26. orig_modules = _save_and_remove_modules(names)
  27. for modname in blocked:
  28. sys.modules[modname] = None # type: ignore[assignment]
  29. try:
  30. return importlib.import_module(name)
  31. finally:
  32. _save_and_remove_modules(names)
  33. sys.modules.update(orig_modules)
  34. # Import the pure Python heapq module, blocking the C extension
  35. py_heapq = import_fresh_module("heapq", blocked=["_heapq"])
  36. __all__ = [
  37. "_heapify_max",
  38. "_heappop_max",
  39. "_heapreplace_max",
  40. "heapify",
  41. "heappop",
  42. "heappush",
  43. "heappushpop",
  44. "heapreplace",
  45. "merge",
  46. "nlargest",
  47. "nsmallest",
  48. ]
  49. @substitute_in_graph(heapq._heapify_max)
  50. def _heapify_max(heap: list[_T], /) -> None:
  51. return py_heapq._heapify_max(heap)
  52. @substitute_in_graph(heapq._heappop_max) # type: ignore[attr-defined]
  53. def _heappop_max(heap: list[_T]) -> _T:
  54. return py_heapq._heappop_max(heap)
  55. @substitute_in_graph(heapq._heapreplace_max) # type: ignore[attr-defined]
  56. def _heapreplace_max(heap: list[_T], item: _T) -> _T:
  57. return py_heapq._heapreplace_max(heap, item)
  58. @substitute_in_graph(heapq.heapify)
  59. def heapify(heap: list[_T], /) -> None:
  60. return py_heapq.heapify(heap)
  61. @substitute_in_graph(heapq.heappop)
  62. def heappop(heap: list[_T], /) -> _T:
  63. return py_heapq.heappop(heap)
  64. @substitute_in_graph(heapq.heappush)
  65. def heappush(heap: list[_T], item: _T) -> None:
  66. return py_heapq.heappush(heap, item)
  67. @substitute_in_graph(heapq.heappushpop)
  68. def heappushpop(heap: list[_T], item: _T) -> _T:
  69. return py_heapq.heappushpop(heap, item)
  70. @substitute_in_graph(heapq.heapreplace)
  71. def heapreplace(heap: list[_T], item: _T) -> _T:
  72. return py_heapq.heapreplace(heap, item)
  73. @substitute_in_graph(heapq.merge) # type: ignore[arg-type]
  74. def merge(*iterables, key=None, reverse=False): # type: ignore[no-untyped-def]
  75. return py_heapq.merge(*iterables, key=key, reverse=reverse)
  76. @substitute_in_graph(heapq.nlargest) # type: ignore[arg-type]
  77. def nlargest(n, iterable, key=None): # type: ignore[no-untyped-def]
  78. return py_heapq.nlargest(n, iterable, key=key)
  79. @substitute_in_graph(heapq.nsmallest) # type: ignore[arg-type]
  80. def nsmallest(n, iterable, key=None): # type: ignore[no-untyped-def]
  81. return py_heapq.nsmallest(n, iterable, key=key)