temp_dir.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. from __future__ import annotations
  2. import errno
  3. import itertools
  4. import logging
  5. import os.path
  6. import tempfile
  7. import traceback
  8. from collections.abc import Generator
  9. from contextlib import ExitStack, contextmanager
  10. from pathlib import Path
  11. from typing import (
  12. Any,
  13. Callable,
  14. TypeVar,
  15. )
  16. from pip._internal.utils.misc import enum, rmtree
  17. logger = logging.getLogger(__name__)
  18. _T = TypeVar("_T", bound="TempDirectory")
  19. # Kinds of temporary directories. Only needed for ones that are
  20. # globally-managed.
  21. tempdir_kinds = enum(
  22. BUILD_ENV="build-env",
  23. EPHEM_WHEEL_CACHE="ephem-wheel-cache",
  24. REQ_BUILD="req-build",
  25. )
  26. _tempdir_manager: ExitStack | None = None
  27. @contextmanager
  28. def global_tempdir_manager() -> Generator[None, None, None]:
  29. global _tempdir_manager
  30. with ExitStack() as stack:
  31. old_tempdir_manager, _tempdir_manager = _tempdir_manager, stack
  32. try:
  33. yield
  34. finally:
  35. _tempdir_manager = old_tempdir_manager
  36. class TempDirectoryTypeRegistry:
  37. """Manages temp directory behavior"""
  38. def __init__(self) -> None:
  39. self._should_delete: dict[str, bool] = {}
  40. def set_delete(self, kind: str, value: bool) -> None:
  41. """Indicate whether a TempDirectory of the given kind should be
  42. auto-deleted.
  43. """
  44. self._should_delete[kind] = value
  45. def get_delete(self, kind: str) -> bool:
  46. """Get configured auto-delete flag for a given TempDirectory type,
  47. default True.
  48. """
  49. return self._should_delete.get(kind, True)
  50. _tempdir_registry: TempDirectoryTypeRegistry | None = None
  51. @contextmanager
  52. def tempdir_registry() -> Generator[TempDirectoryTypeRegistry, None, None]:
  53. """Provides a scoped global tempdir registry that can be used to dictate
  54. whether directories should be deleted.
  55. """
  56. global _tempdir_registry
  57. old_tempdir_registry = _tempdir_registry
  58. _tempdir_registry = TempDirectoryTypeRegistry()
  59. try:
  60. yield _tempdir_registry
  61. finally:
  62. _tempdir_registry = old_tempdir_registry
  63. class _Default:
  64. pass
  65. _default = _Default()
  66. class TempDirectory:
  67. """Helper class that owns and cleans up a temporary directory.
  68. This class can be used as a context manager or as an OO representation of a
  69. temporary directory.
  70. Attributes:
  71. path
  72. Location to the created temporary directory
  73. delete
  74. Whether the directory should be deleted when exiting
  75. (when used as a contextmanager)
  76. Methods:
  77. cleanup()
  78. Deletes the temporary directory
  79. When used as a context manager, if the delete attribute is True, on
  80. exiting the context the temporary directory is deleted.
  81. """
  82. def __init__(
  83. self,
  84. path: str | None = None,
  85. delete: bool | None | _Default = _default,
  86. kind: str = "temp",
  87. globally_managed: bool = False,
  88. ignore_cleanup_errors: bool = True,
  89. ):
  90. super().__init__()
  91. if delete is _default:
  92. if path is not None:
  93. # If we were given an explicit directory, resolve delete option
  94. # now.
  95. delete = False
  96. else:
  97. # Otherwise, we wait until cleanup and see what
  98. # tempdir_registry says.
  99. delete = None
  100. # The only time we specify path is in for editables where it
  101. # is the value of the --src option.
  102. if path is None:
  103. path = self._create(kind)
  104. self._path = path
  105. self._deleted = False
  106. self.delete = delete
  107. self.kind = kind
  108. self.ignore_cleanup_errors = ignore_cleanup_errors
  109. if globally_managed:
  110. assert _tempdir_manager is not None
  111. _tempdir_manager.enter_context(self)
  112. @property
  113. def path(self) -> str:
  114. assert not self._deleted, f"Attempted to access deleted path: {self._path}"
  115. return self._path
  116. def __repr__(self) -> str:
  117. return f"<{self.__class__.__name__} {self.path!r}>"
  118. def __enter__(self: _T) -> _T:
  119. return self
  120. def __exit__(self, exc: Any, value: Any, tb: Any) -> None:
  121. if self.delete is not None:
  122. delete = self.delete
  123. elif _tempdir_registry:
  124. delete = _tempdir_registry.get_delete(self.kind)
  125. else:
  126. delete = True
  127. if delete:
  128. self.cleanup()
  129. def _create(self, kind: str) -> str:
  130. """Create a temporary directory and store its path in self.path"""
  131. # We realpath here because some systems have their default tmpdir
  132. # symlinked to another directory. This tends to confuse build
  133. # scripts, so we canonicalize the path by traversing potential
  134. # symlinks here.
  135. path = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-"))
  136. logger.debug("Created temporary directory: %s", path)
  137. return path
  138. def cleanup(self) -> None:
  139. """Remove the temporary directory created and reset state"""
  140. self._deleted = True
  141. if not os.path.exists(self._path):
  142. return
  143. errors: list[BaseException] = []
  144. def onerror(
  145. func: Callable[..., Any],
  146. path: Path,
  147. exc_val: BaseException,
  148. ) -> None:
  149. """Log a warning for a `rmtree` error and continue"""
  150. formatted_exc = "\n".join(
  151. traceback.format_exception_only(type(exc_val), exc_val)
  152. )
  153. formatted_exc = formatted_exc.rstrip() # remove trailing new line
  154. if func in (os.unlink, os.remove, os.rmdir):
  155. logger.debug(
  156. "Failed to remove a temporary file '%s' due to %s.\n",
  157. path,
  158. formatted_exc,
  159. )
  160. else:
  161. logger.debug("%s failed with %s.", func.__qualname__, formatted_exc)
  162. errors.append(exc_val)
  163. if self.ignore_cleanup_errors:
  164. try:
  165. # first try with @retry; retrying to handle ephemeral errors
  166. rmtree(self._path, ignore_errors=False)
  167. except OSError:
  168. # last pass ignore/log all errors
  169. rmtree(self._path, onexc=onerror)
  170. if errors:
  171. logger.warning(
  172. "Failed to remove contents in a temporary directory '%s'.\n"
  173. "You can safely remove it manually.",
  174. self._path,
  175. )
  176. else:
  177. rmtree(self._path)
  178. class AdjacentTempDirectory(TempDirectory):
  179. """Helper class that creates a temporary directory adjacent to a real one.
  180. Attributes:
  181. original
  182. The original directory to create a temp directory for.
  183. path
  184. After calling create() or entering, contains the full
  185. path to the temporary directory.
  186. delete
  187. Whether the directory should be deleted when exiting
  188. (when used as a contextmanager)
  189. """
  190. # The characters that may be used to name the temp directory
  191. # We always prepend a ~ and then rotate through these until
  192. # a usable name is found.
  193. # pkg_resources raises a different error for .dist-info folder
  194. # with leading '-' and invalid metadata
  195. LEADING_CHARS = "-~.=%0123456789"
  196. def __init__(self, original: str, delete: bool | None = None) -> None:
  197. self.original = original.rstrip("/\\")
  198. super().__init__(delete=delete)
  199. @classmethod
  200. def _generate_names(cls, name: str) -> Generator[str, None, None]:
  201. """Generates a series of temporary names.
  202. The algorithm replaces the leading characters in the name
  203. with ones that are valid filesystem characters, but are not
  204. valid package names (for both Python and pip definitions of
  205. package).
  206. """
  207. for i in range(1, len(name)):
  208. for candidate in itertools.combinations_with_replacement(
  209. cls.LEADING_CHARS, i - 1
  210. ):
  211. new_name = "~" + "".join(candidate) + name[i:]
  212. if new_name != name:
  213. yield new_name
  214. # If we make it this far, we will have to make a longer name
  215. for i in range(len(cls.LEADING_CHARS)):
  216. for candidate in itertools.combinations_with_replacement(
  217. cls.LEADING_CHARS, i
  218. ):
  219. new_name = "~" + "".join(candidate) + name
  220. if new_name != name:
  221. yield new_name
  222. def _create(self, kind: str) -> str:
  223. root, name = os.path.split(self.original)
  224. for candidate in self._generate_names(name):
  225. path = os.path.join(root, candidate)
  226. try:
  227. os.mkdir(path)
  228. except OSError as ex:
  229. # Continue if the name exists already
  230. if ex.errno != errno.EEXIST:
  231. raise
  232. else:
  233. path = os.path.realpath(path)
  234. break
  235. else:
  236. # Final fallback on the default behavior.
  237. path = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-"))
  238. logger.debug("Created temporary directory: %s", path)
  239. return path