# dir_util.py (6.7 KB)
  1. """distutils.dir_util
  2. Utility functions for manipulating directories and directory trees."""
  3. import functools
  4. import itertools
  5. import os
  6. import pathlib
  7. from . import file_util
  8. from ._log import log
  9. from .errors import DistutilsFileError, DistutilsInternalError
  10. class SkipRepeatAbsolutePaths(set):
  11. """
  12. Cache for mkpath.
  13. In addition to cheapening redundant calls, eliminates redundant
  14. "creating /foo/bar/baz" messages in dry-run mode.
  15. """
  16. def __init__(self):
  17. SkipRepeatAbsolutePaths.instance = self
  18. @classmethod
  19. def clear(cls):
  20. super(cls, cls.instance).clear()
  21. def wrap(self, func):
  22. @functools.wraps(func)
  23. def wrapper(path, *args, **kwargs):
  24. if path.absolute() in self:
  25. return
  26. result = func(path, *args, **kwargs)
  27. self.add(path.absolute())
  28. return result
  29. return wrapper
# Python 3.8 compatibility: the decorator is bound to a plain name first,
# presumably because decorator syntax before Python 3.9 (PEP 614) does not
# accept an expression such as ``SkipRepeatAbsolutePaths().wrap`` after ``@``.
# Constructing the instance here also registers it as
# ``SkipRepeatAbsolutePaths.instance``.
wrapper = SkipRepeatAbsolutePaths().wrap
  32. @functools.singledispatch
  33. @wrapper
  34. def mkpath(name: pathlib.Path, mode=0o777, verbose=True) -> None:
  35. """Create a directory and any missing ancestor directories.
  36. If the directory already exists (or if 'name' is the empty string, which
  37. means the current directory, which of course exists), then do nothing.
  38. Raise DistutilsFileError if unable to create some directory along the way
  39. (eg. some sub-path exists, but is a file rather than a directory).
  40. If 'verbose' is true, log the directory created.
  41. """
  42. if verbose and not name.is_dir():
  43. log.info("creating %s", name)
  44. try:
  45. name.mkdir(mode=mode, parents=True, exist_ok=True)
  46. except OSError as exc:
  47. raise DistutilsFileError(f"could not create '{name}': {exc.args[-1]}")
  48. @mkpath.register
  49. def _(name: str, *args, **kwargs):
  50. return mkpath(pathlib.Path(name), *args, **kwargs)
  51. @mkpath.register
  52. def _(name: None, *args, **kwargs):
  53. """
  54. Detect a common bug -- name is None.
  55. """
  56. raise DistutilsInternalError(f"mkpath: 'name' must be a string (got {name!r})")
  57. def create_tree(base_dir, files, mode=0o777, verbose=True):
  58. """Create all the empty directories under 'base_dir' needed to put 'files'
  59. there.
  60. 'base_dir' is just the name of a directory which doesn't necessarily
  61. exist yet; 'files' is a list of filenames to be interpreted relative to
  62. 'base_dir'. 'base_dir' + the directory portion of every file in 'files'
  63. will be created if it doesn't already exist. 'mode' and 'verbose'
  64. flags are as for 'mkpath()'.
  65. """
  66. # First get the list of directories to create
  67. need_dir = set(os.path.join(base_dir, os.path.dirname(file)) for file in files)
  68. # Now create them
  69. for dir in sorted(need_dir):
  70. mkpath(dir, mode, verbose=verbose)
  71. def copy_tree(
  72. src,
  73. dst,
  74. preserve_mode=True,
  75. preserve_times=True,
  76. preserve_symlinks=False,
  77. update=False,
  78. verbose=True,
  79. ):
  80. """Copy an entire directory tree 'src' to a new location 'dst'.
  81. Both 'src' and 'dst' must be directory names. If 'src' is not a
  82. directory, raise DistutilsFileError. If 'dst' does not exist, it is
  83. created with 'mkpath()'. The end result of the copy is that every
  84. file in 'src' is copied to 'dst', and directories under 'src' are
  85. recursively copied to 'dst'. Return the list of files that were
  86. copied or might have been copied, using their output name. The
  87. return value is unaffected by 'update': it is simply
  88. the list of all files under 'src', with the names changed to be
  89. under 'dst'.
  90. 'preserve_mode' and 'preserve_times' are the same as for
  91. 'copy_file'; note that they only apply to regular files, not to
  92. directories. If 'preserve_symlinks' is true, symlinks will be
  93. copied as symlinks (on platforms that support them!); otherwise
  94. (the default), the destination of the symlink will be copied.
  95. 'update' and 'verbose' are the same as for 'copy_file'.
  96. """
  97. if not os.path.isdir(src):
  98. raise DistutilsFileError(f"cannot copy tree '{src}': not a directory")
  99. try:
  100. names = os.listdir(src)
  101. except OSError as e:
  102. raise DistutilsFileError(f"error listing files in '{src}': {e.strerror}")
  103. mkpath(dst, verbose=verbose)
  104. copy_one = functools.partial(
  105. _copy_one,
  106. src=src,
  107. dst=dst,
  108. preserve_symlinks=preserve_symlinks,
  109. verbose=verbose,
  110. preserve_mode=preserve_mode,
  111. preserve_times=preserve_times,
  112. update=update,
  113. )
  114. return list(itertools.chain.from_iterable(map(copy_one, names)))
  115. def _copy_one(
  116. name,
  117. *,
  118. src,
  119. dst,
  120. preserve_symlinks,
  121. verbose,
  122. preserve_mode,
  123. preserve_times,
  124. update,
  125. ):
  126. src_name = os.path.join(src, name)
  127. dst_name = os.path.join(dst, name)
  128. if name.startswith('.nfs'):
  129. # skip NFS rename files
  130. return
  131. if preserve_symlinks and os.path.islink(src_name):
  132. link_dest = os.readlink(src_name)
  133. if verbose >= 1:
  134. log.info("linking %s -> %s", dst_name, link_dest)
  135. os.symlink(link_dest, dst_name)
  136. yield dst_name
  137. elif os.path.isdir(src_name):
  138. yield from copy_tree(
  139. src_name,
  140. dst_name,
  141. preserve_mode,
  142. preserve_times,
  143. preserve_symlinks,
  144. update,
  145. verbose=verbose,
  146. )
  147. else:
  148. file_util.copy_file(
  149. src_name,
  150. dst_name,
  151. preserve_mode,
  152. preserve_times,
  153. update,
  154. verbose=verbose,
  155. )
  156. yield dst_name
  157. def _build_cmdtuple(path, cmdtuples):
  158. """Helper for remove_tree()."""
  159. for f in os.listdir(path):
  160. real_f = os.path.join(path, f)
  161. if os.path.isdir(real_f) and not os.path.islink(real_f):
  162. _build_cmdtuple(real_f, cmdtuples)
  163. else:
  164. cmdtuples.append((os.remove, real_f))
  165. cmdtuples.append((os.rmdir, path))
  166. def remove_tree(directory, verbose=True):
  167. """Recursively remove an entire directory tree.
  168. Any errors are ignored (apart from being reported to stdout if 'verbose'
  169. is true).
  170. """
  171. if verbose >= 1:
  172. log.info("removing '%s' (and everything under it)", directory)
  173. cmdtuples = []
  174. _build_cmdtuple(directory, cmdtuples)
  175. for cmd in cmdtuples:
  176. try:
  177. cmd[0](cmd[1])
  178. # Clear the cache
  179. SkipRepeatAbsolutePaths.clear()
  180. except OSError as exc:
  181. log.warning("error removing %s: %s", directory, exc)
  182. def ensure_relative(path):
  183. """Take the full path 'path', and make it a relative path.
  184. This is useful to make 'path' the second argument to os.path.join().
  185. """
  186. drive, path = os.path.splitdrive(path)
  187. if path[0:1] == os.sep:
  188. path = drive + path[1:]
  189. return path