filesystem.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569
  1. from __future__ import annotations
  2. import contextlib
  3. import ctypes
  4. import dataclasses
  5. import errno
  6. import logging
  7. import os
  8. import platform
  9. import re
  10. import shutil
  11. import tempfile
  12. import threading
  13. from collections.abc import Generator, Iterable
  14. from pathlib import Path, PurePath
  15. from typing import IO, Any, BinaryIO, Literal, NewType, TypedDict
  16. from wandb.sdk.lib.paths import StrPath
  17. from wandb.sdk.wandb_settings import Settings
# Distinct string type for glob patterns, so type checkers can tell them apart
# from arbitrary strings.
GlobStr = NewType("GlobStr", str)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# The policy names accepted by the file-saving helpers in this module.
PolicyName = Literal["now", "live", "end"]

# https://en.wikipedia.org/wiki/Filename#Comparison_of_filename_limitations
# The 32 ASCII control characters (code points 0-31) followed by punctuation
# that is replaced with "-" when a path cannot be created as given.
PROBLEMATIC_PATH_CHARS = "".join(chr(i) for i in range(32)) + ':"*<>?|'
class FilesDict(TypedDict):
    """Typed dictionary holding a collection of (glob, policy) pairs."""

    # Each item pairs a glob pattern with the policy name to apply to it.
    files: Iterable[tuple[GlobStr, PolicyName]]
  25. def mkdir_exists_ok(dir_name: StrPath) -> None:
  26. """Create `dir_name` and any parent directories if they don't exist.
  27. Raises:
  28. FileExistsError: if `dir_name` exists and is not a directory.
  29. PermissionError: if `dir_name` is not writable.
  30. """
  31. try:
  32. os.makedirs(dir_name, exist_ok=True)
  33. except FileExistsError as e:
  34. raise FileExistsError(f"{dir_name!s} exists and is not a directory") from e
  35. except PermissionError as e:
  36. raise PermissionError(f"{dir_name!s} is not writable") from e
  37. def path_fallbacks(path: StrPath) -> Generator[str, None, None]:
  38. """Yield variations of `path` that may exist on the filesystem.
  39. Return a sequence of paths that should be checked in order for existence or
  40. create-ability. Essentially, keep replacing "suspect" characters until we run out.
  41. """
  42. path = str(path)
  43. root, tail = os.path.splitdrive(path)
  44. yield os.path.join(root, tail)
  45. for char in PROBLEMATIC_PATH_CHARS:
  46. if char in tail:
  47. tail = tail.replace(char, "-")
  48. yield os.path.join(root, tail)
  49. def mkdir_allow_fallback(dir_name: StrPath) -> StrPath:
  50. """Create `dir_name`, removing invalid path characters if necessary.
  51. Returns:
  52. The path to the created directory, which may not be the original path.
  53. """
  54. for new_name in path_fallbacks(dir_name):
  55. try:
  56. os.makedirs(new_name, exist_ok=True)
  57. if Path(new_name) != Path(dir_name):
  58. logger.warning(f"Creating '{new_name}' instead of '{dir_name}'")
  59. return Path(new_name) if isinstance(dir_name, Path) else new_name
  60. except (ValueError, NotADirectoryError):
  61. pass
  62. except OSError as e:
  63. if e.errno != 22:
  64. raise
  65. raise OSError(f"Unable to create directory '{dir_name}'")
  66. def files_in(path: StrPath) -> Generator[os.DirEntry, None, None]:
  67. """Yield a directory entry for each file under a given path (recursive)."""
  68. if not os.path.isdir(path):
  69. return
  70. for entry in os.scandir(path):
  71. if entry.is_dir():
  72. yield from files_in(entry.path)
  73. else:
  74. yield entry
  75. class WriteSerializingFile:
  76. """Wrapper for a file object that serializes writes."""
  77. def __init__(self, f: BinaryIO) -> None:
  78. self.lock = threading.Lock()
  79. self.f = f
  80. def write(self, *args, **kargs) -> None: # type: ignore
  81. self.lock.acquire()
  82. try:
  83. self.f.write(*args, **kargs)
  84. self.f.flush()
  85. finally:
  86. self.lock.release()
  87. def close(self) -> None:
  88. self.lock.acquire() # wait for pending writes
  89. try:
  90. self.f.close()
  91. finally:
  92. self.lock.release()
class CRDedupedFile(WriteSerializingFile):
    """Serialized file wrapper that collapses carriage-return overwrites.

    Text that is followed by a carriage return within the same logical line is
    discarded, and only what comes after the last carriage return is kept.
    NOTE(review): presumably meant for progress-bar style output, so that only
    the final state of each line is stored -- confirm with callers.
    """

    def __init__(self, f: BinaryIO) -> None:
        super().__init__(f=f)
        # Last line seen so far; held back because a later write may still
        # overwrite it with a carriage return.
        self._buff = b""

    def write(self, data) -> None:  # type: ignore
        """Process `data` (bytes) and write the lines that are now final."""
        # Split into logical lines on either "\r\n" or "\n".
        lines = re.split(b"\r\n|\n", data)
        ret = []  # type: ignore
        for line in lines:
            if line[:1] == b"\r":
                # A leading carriage return overwrites the previous line: drop
                # the last line collected in this call, or failing that the
                # line carried over from the previous call.
                if ret:
                    ret.pop()
                elif self._buff:
                    self._buff = b""
            # Keep only the text after the last carriage return in the line.
            line = line.split(b"\r")[-1]
            if line:
                ret.append(line)
        # Put the line carried over from the previous call back in front.
        if self._buff:
            ret.insert(0, self._buff)
        # Hold back the newest line; everything before it is final.
        if ret:
            self._buff = ret.pop()
        super().write(b"\n".join(ret) + b"\n")

    def close(self) -> None:
        """Write out any held-back line, then close the underlying file."""
        if self._buff:
            super().write(self._buff)
        super().close()
  118. def copy_or_overwrite_changed(source_path: StrPath, target_path: StrPath) -> StrPath:
  119. """Copy source_path to target_path, unless it already exists with the same mtime.
  120. We liberally add write permissions to deal with the case of multiple users needing
  121. to share the same cache or run directory.
  122. Args:
  123. source_path: The path to the file to copy.
  124. target_path: The path to copy the file to.
  125. Returns:
  126. The path to the copied file (which may be different from target_path).
  127. """
  128. return_type = type(target_path)
  129. target_path = system_preferred_path(target_path, warn=True)
  130. need_copy = (
  131. not os.path.isfile(target_path)
  132. or os.stat(source_path).st_mtime != os.stat(target_path).st_mtime
  133. )
  134. permissions_plus_write = os.stat(source_path).st_mode
  135. if need_copy:
  136. dir_name, file_name = os.path.split(target_path)
  137. target_path = os.path.join(mkdir_allow_fallback(dir_name), file_name)
  138. try:
  139. # Use copy2 to preserve file metadata (including modified time).
  140. shutil.copy2(source_path, target_path)
  141. except PermissionError:
  142. # If the file is read-only try to make it writable.
  143. try:
  144. os.chmod(target_path, permissions_plus_write)
  145. shutil.copy2(source_path, target_path)
  146. except PermissionError as e:
  147. raise PermissionError("Unable to overwrite '{target_path!s}'") from e
  148. # Prevent future permissions issues by universal write permissions now.
  149. os.chmod(target_path, permissions_plus_write)
  150. return return_type(target_path) # type: ignore # 'os.PathLike' is abstract.
  151. @contextlib.contextmanager
  152. def safe_open(
  153. path: StrPath, mode: str = "r", *args: Any, **kwargs: Any
  154. ) -> Generator[IO, None, None]:
  155. """Open a file, ensuring any changes only apply atomically after close.
  156. This context manager ensures that even unsuccessful writes will not leave a "dirty"
  157. file or overwrite good data, and that all temp data is cleaned up.
  158. The semantics and behavior are intended to be nearly identical to the built-in
  159. open() function. Differences:
  160. - It creates any parent directories that don't exist, rather than raising.
  161. - In 'x' mode, it checks at the beginning AND end of the write and fails if the
  162. file exists either time.
  163. """
  164. path = Path(path).resolve()
  165. path.parent.mkdir(parents=True, exist_ok=True)
  166. if "x" in mode and path.exists():
  167. raise FileExistsError(f"{path!s} already exists")
  168. if "r" in mode and "+" not in mode:
  169. # This is read-only, so we can just open the original file.
  170. # TODO (hugh): create a reflink and read from that.
  171. with path.open(mode, *args, **kwargs) as f:
  172. yield f
  173. return
  174. with tempfile.TemporaryDirectory(dir=path.parent) as tmp_dir:
  175. tmp_path = Path(tmp_dir) / path.name
  176. if ("r" in mode or "a" in mode) and path.exists():
  177. # We need to copy the original file in order to support reads and appends.
  178. # TODO (hugh): use reflinks to avoid the copy on platforms that support it.
  179. shutil.copy2(path, tmp_path)
  180. with tmp_path.open(mode, *args, **kwargs) as f:
  181. yield f
  182. f.flush()
  183. os.fsync(f.fileno())
  184. if "x" in mode:
  185. # Ensure that if another process has beaten us to writing the file we raise
  186. # rather than overwrite. os.link() atomically creates a hard link to the
  187. # target file and will raise FileExistsError if the target already exists.
  188. os.link(tmp_path, path)
  189. os.unlink(tmp_path)
  190. else:
  191. tmp_path.replace(path)
  192. def safe_copy(source_path: StrPath, target_path: StrPath) -> StrPath:
  193. """Copy a file atomically.
  194. Copying is not usually atomic, and on operating systems that allow multiple
  195. writers to the same file, the result can get corrupted. If two writers copy
  196. to the same file, the contents can become interleaved.
  197. We mitigate the issue somewhat by copying to a temporary file first and
  198. then renaming. Renaming is atomic: if process 1 renames file A to X and
  199. process 2 renames file B to X, then X will either contain the contents
  200. of A or the contents of B, not some mixture of both.
  201. """
  202. # TODO (hugh): check that there is enough free space.
  203. output_path = Path(target_path).resolve()
  204. output_path.parent.mkdir(parents=True, exist_ok=True)
  205. with tempfile.TemporaryDirectory(dir=output_path.parent) as tmp_dir:
  206. tmp_path = (Path(tmp_dir) / Path(source_path).name).with_suffix(".tmp")
  207. shutil.copy2(source_path, tmp_path)
  208. tmp_path.replace(output_path)
  209. return target_path
  210. def _reflink_linux(existing_path: Path, new_path: Path) -> None:
  211. """Create a reflink to `existing_path` at `new_path` on Linux."""
  212. import fcntl
  213. FICLONE = 0x40049409 # magic number from <linux/fs.h> # noqa: N806
  214. with open(existing_path, "rb") as t_f, open(new_path, "wb+") as l_f:
  215. fcntl.ioctl(l_f.fileno(), FICLONE, t_f.fileno())
def _reflink_macos(existing_path: Path, new_path: Path) -> None:
    """Create a reflink to `existing_path` at `new_path` on macOS via `clonefile`.

    Raises:
        OSError: if the C library cannot be loaded, if it does not provide
            `clonefile` (errno ENOTSUP), or if the `clonefile` call itself
            reports a failure (carrying the errno it set).
    """
    try:
        # use_errno=True lets ctypes.get_errno() report errno set by the library.
        clib = ctypes.CDLL("libc.dylib", use_errno=True)
    except (FileNotFoundError, OSError) as e:
        # Only a "not found" failure is worth a second attempt; anything else
        # is re-raised unchanged.
        if ctypes.get_errno() != errno.ENOENT and not isinstance(e, FileNotFoundError):
            raise
        # Before macOS 11 (<Nov 2020) clib was in libSystem.dylib, so we can try there.
        clib = ctypes.CDLL("/usr/lib/libSystem.dylib", use_errno=True)

    try:
        clonefile = clib.clonefile
    except AttributeError:
        # The loaded library does not export the symbol at all.
        raise OSError(errno.ENOTSUP, "'clonefile' is not available on this system")

    # Declare the C signature: two byte-string paths and an int, returning int.
    clonefile.argtypes = (ctypes.c_char_p, ctypes.c_char_p, ctypes.c_int)
    clonefile.restype = ctypes.c_int

    # The third argument is passed as 0.
    # NOTE(review): presumably the `flags` parameter of clonefile(2) -- confirm.
    if clonefile(os.fsencode(existing_path), os.fsencode(new_path), ctypes.c_int(0)):
        # Anything other than 0 is an error.
        err = ctypes.get_errno()
        raise OSError(err, os.strerror(err), existing_path)
  234. def reflink(existing_path: StrPath, new_path: StrPath, overwrite: bool = False) -> None:
  235. """Create a reflink to `existing_path` at `new_path`.
  236. A reflink (reflective link) is a copy-on-write reference to a file. Once linked, the
  237. file and link are both "real" files (not symbolic or hard links) and each can be
  238. modified independently without affecting the other; however, they share the same
  239. underlying data blocks on disk so until one is modified they are "zero-cost" copies.
  240. Reflinks have all the functionality of copies, so we should use them wherever they
  241. are supported if we would otherwise copy a file. (This is not particularly radical--
  242. GNU `cp` defaults to `reflink=auto`, using it whenever available) However, support
  243. for them is limited to a small number of filesystems. They should work on:
  244. - Linux with a Btrfs or XFS filesystem (NOT ext4)
  245. - macOS 10.13 or later with an APFS filesystem (called clone files)
  246. Reflinks are also supported on Solaris and Windows with ReFSv2, but we haven't
  247. implemented support for them.
  248. Like hard links, a reflink can only be created on the same filesystem as the target.
  249. """
  250. if platform.system() == "Linux":
  251. link_fn = _reflink_linux
  252. elif platform.system() == "Darwin":
  253. link_fn = _reflink_macos
  254. else:
  255. raise OSError(
  256. errno.ENOTSUP, f"reflinks are not supported on {platform.system()}"
  257. )
  258. new_path = Path(new_path).resolve()
  259. existing_path = Path(existing_path).resolve()
  260. if new_path.exists():
  261. if not overwrite:
  262. raise FileExistsError(f"{new_path} already exists")
  263. logger.warning(f"Overwriting existing file {new_path}.")
  264. new_path.unlink()
  265. # Create any missing parent directories.
  266. new_path.parent.mkdir(parents=True, exist_ok=True)
  267. try:
  268. link_fn(existing_path, new_path)
  269. except OSError as e:
  270. base_msg = f"failed to create reflink from {existing_path} to {new_path}."
  271. if e.errno in (errno.EPERM, errno.EACCES):
  272. raise PermissionError(f"Insufficient permissions; {base_msg}") from e
  273. if e.errno == errno.ENOENT:
  274. raise FileNotFoundError(f"File not found; {base_msg}") from e
  275. if e.errno == errno.EXDEV:
  276. raise ValueError(f"Cannot link across filesystems; {base_msg}") from e
  277. if e.errno == errno.EISDIR:
  278. raise IsADirectoryError(f"Cannot reflink a directory; {base_msg}") from e
  279. if e.errno in (errno.EOPNOTSUPP, errno.ENOTSUP):
  280. raise OSError(
  281. errno.ENOTSUP,
  282. f"Filesystem does not support reflinks; {base_msg}",
  283. ) from e
  284. if e.errno == errno.EINVAL:
  285. raise ValueError(f"Cannot link file ranges; {base_msg}") from e
  286. raise
  287. def check_exists(path: StrPath) -> StrPath | None:
  288. """Look for variations of `path` and return the first found.
  289. This exists to support former behavior around system-dependent paths; we used to use
  290. ':' in Artifact paths unless we were on Windows, but this has issues when e.g. a
  291. Linux machine is accessing an NTFS filesystem; we might need to look for the
  292. alternate path. This checks all the possible directories we would consider creating.
  293. """
  294. for dest in path_fallbacks(path):
  295. if os.path.exists(dest):
  296. return Path(dest) if isinstance(path, Path) else dest
  297. return None
  298. def system_preferred_path(path: StrPath, warn: bool = False) -> StrPath:
  299. """Replace ':' with '-' in paths on Windows.
  300. Args:
  301. path: The path to convert.
  302. warn: Whether to warn if ':' is replaced.
  303. """
  304. if platform.system() != "Windows":
  305. return path
  306. head, tail = os.path.splitdrive(path)
  307. if warn and ":" in tail:
  308. logger.warning(f"Replacing ':' in {tail} with '-'")
  309. new_path = head + tail.replace(":", "-")
  310. return Path(new_path) if isinstance(path, Path) else new_path
  311. @dataclasses.dataclass
  312. class LinkStats:
  313. """Tracks statistics about file linking operations."""
  314. n_symlink: int = 0
  315. n_hardlink: int = 0
  316. n_copy: int = 0
  317. n_copy_downgraded: int = 0
  318. def record(
  319. self,
  320. mode: Literal["symlink", "hardlink", "copy"],
  321. downgraded: bool = False,
  322. ) -> None:
  323. """Record a file operation.
  324. Args:
  325. mode: Type of operation - "symlink", "hardlink", or "copy".
  326. downgraded: Whether policy was downgraded from "live" to "now".
  327. """
  328. if mode == "symlink":
  329. self.n_symlink += 1
  330. elif mode == "hardlink":
  331. self.n_hardlink += 1
  332. elif mode == "copy":
  333. self.n_copy += 1
  334. if downgraded:
  335. self.n_copy_downgraded += 1
  336. def emit_warnings(self) -> None:
  337. """Emit appropriate warnings based on recorded operations."""
  338. from wandb import termwarn
  339. if self.n_symlink:
  340. termwarn(
  341. f"Symlinked {self.n_symlink} file{'s' if self.n_symlink > 1 else ''} "
  342. "into the W&B run directory; call wandb.save again to sync new files."
  343. )
  344. if self.n_hardlink:
  345. termwarn(
  346. f"Linked {self.n_hardlink} file{'s' if self.n_hardlink > 1 else ''} "
  347. "into the W&B run directory (hardlinks); call wandb.save again to sync new files."
  348. )
  349. if self.n_copy:
  350. if self.n_copy_downgraded:
  351. termwarn(
  352. f"Copied {self.n_copy} file{'s' if self.n_copy > 1 else ''} "
  353. "(cross-volume or links unavailable). "
  354. "Downgrading policy to 'now' for those files because live updates "
  355. "won't propagate from the originals. Re-run wandb.save to resync, "
  356. "or place your run directory on the same drive to enable hardlinks."
  357. )
  358. else:
  359. termwarn(
  360. f"Copied {self.n_copy} file{'s' if self.n_copy > 1 else ''} "
  361. "into the W&B run directory; call wandb.save again to sync new files."
  362. )
  363. def validate_glob_path(glob_path: PurePath, base_path: PurePath) -> None:
  364. """Validate that glob path is within base path.
  365. Args:
  366. glob_path: The glob pattern path to validate.
  367. base_path: The base path that should contain the glob.
  368. Raises:
  369. ValueError: If validation fails.
  370. """
  371. if not str(glob_path).startswith(str(base_path)):
  372. raise ValueError("Glob may not walk above the base path")
  373. if glob_path == base_path:
  374. raise ValueError("Glob cannot be the same as the base path")
  375. relative_glob = glob_path.relative_to(base_path)
  376. if relative_glob.parts and relative_glob.parts[0] == "*":
  377. raise ValueError("Glob may not start with '*' relative to the base path")
  378. def are_paths_on_same_drive(path1: Path, path2: Path) -> bool:
  379. """Check if two paths are on the same drive.
  380. This check is only relevant on Windows,
  381. since the concept of drives only exists on Windows.
  382. """
  383. if platform.system() != "Windows":
  384. return True
  385. try:
  386. path1_drive = path1.resolve().drive
  387. path2_drive = path2.resolve().drive
  388. except OSError:
  389. # If either path is not a valid Windows path, an OSError is raised.
  390. return False
  391. return path1_drive == path2_drive
  392. def unlink_path(path: Path) -> None:
  393. """Best-effort removal of a pre-existing file/symlink/dir.
  394. Args:
  395. path: Path to remove.
  396. """
  397. with contextlib.suppress(FileNotFoundError):
  398. if path.is_symlink() or path.is_file():
  399. path.unlink()
  400. def link_or_copy(
  401. settings: Settings,
  402. src: Path,
  403. dst: Path,
  404. ) -> Literal["symlink", "hardlink", "copy"]:
  405. """Link or copy a file using the best available method.
  406. Tries strategies in order: symlink -> hardlink -> copy.
  407. Args:
  408. src: Source file path (should be resolved/absolute).
  409. dst: Destination file path.
  410. allow_symlink: Whether to attempt symlinks before hardlinks.
  411. Returns:
  412. The strategy that succeeded.
  413. """
  414. if settings.symlink:
  415. try:
  416. dst.symlink_to(src, target_is_directory=src.is_dir())
  417. except (OSError, NotImplementedError):
  418. pass
  419. else:
  420. return "symlink"
  421. if are_paths_on_same_drive(src, dst.parent):
  422. try:
  423. os.link(str(src), str(dst))
  424. except OSError:
  425. pass
  426. else:
  427. return "hardlink"
  428. tmp = dst.with_name(dst.name + ".tmp~wandb")
  429. shutil.copy2(str(src), str(tmp))
  430. os.replace(str(tmp), str(dst))
  431. return "copy"
  432. def link_or_copy_with_policy(
  433. settings: Settings,
  434. src: Path,
  435. dst: Path,
  436. requested_policy: PolicyName,
  437. stats: LinkStats,
  438. ) -> PolicyName:
  439. """Link or copy a file using the best available method.
  440. Tries strategies in order: symlink -> hardlink -> copy.
  441. Updates stats and returns the effective policy.
  442. Args:
  443. settings: wandb settings object with symlink preference.
  444. src: Source file path.
  445. dst: Destination file path.
  446. requested_policy: Requested upload policy ("live", "now", or "end").
  447. stats: Stats object to update with operation type.
  448. Returns:
  449. Effective policy after any necessary downgrade.
  450. """
  451. mode = link_or_copy(settings, src, dst)
  452. effective_policy = requested_policy
  453. downgraded = False
  454. if mode == "copy" and requested_policy == "live":
  455. downgraded = True
  456. effective_policy = "now"
  457. stats.record(mode, downgraded=downgraded)
  458. return effective_policy