storage.py 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724
  1. # Try import ray[train] core requirements (defined in setup.py)
  2. # isort: off
  3. try:
  4. import fsspec # noqa
  5. from fsspec.implementations.local import LocalFileSystem
  6. except (ImportError, ModuleNotFoundError) as e:
  7. raise RuntimeError(
  8. "fsspec is a required dependency of Ray Train and Ray Tune. "
  9. "Please install with: `pip install fsspec`"
  10. ) from e
  11. try:
  12. import pyarrow
  13. import pyarrow.fs
  14. except (ImportError, ModuleNotFoundError) as e:
  15. raise RuntimeError(
  16. "pyarrow is a required dependency of Ray Train and Ray Tune. "
  17. "Please install with: `pip install pyarrow`"
  18. ) from e
  19. try:
  20. # check if Arrow has S3 support
  21. from pyarrow.fs import S3FileSystem
  22. except ImportError:
  23. S3FileSystem = None
  24. # isort: on
  25. import fnmatch
  26. import logging
  27. import os
  28. import shutil
  29. from pathlib import Path
  30. from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Tuple, Type, Union
  31. from ray.air._internal.filelock import TempFileLock
  32. from ray.train._internal.syncer import SyncConfig, Syncer, _BackgroundSyncer
  33. from ray.train.constants import _get_ray_train_session_dir
  34. from ray.util.annotations import DeveloperAPI
  35. if TYPE_CHECKING:
  36. from ray.train._checkpoint import Checkpoint
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Name of the empty marker file that `StorageContext` writes into the experiment
# directory and later looks up to verify that the storage location is usable.
_VALIDATE_STORAGE_MARKER_FILENAME = ".validate_storage_marker"
  39. class _ExcludingLocalFilesystem(LocalFileSystem):
  40. """LocalFileSystem wrapper to exclude files according to patterns.
  41. Args:
  42. root_path: Root path to strip when matching with the exclude pattern.
  43. Ex: root_path="/tmp/a/b/c", exclude=["*a*"], will exclude
  44. /tmp/a/b/c/_a_.txt but not ALL of /tmp/a/*.
  45. exclude: List of patterns that are applied to files returned by
  46. ``self.find()``. If a file path matches this pattern, it will
  47. be excluded.
  48. """
  49. def __init__(self, root_path: Path, exclude: List[str], **kwargs):
  50. super().__init__(**kwargs)
  51. self._exclude = exclude
  52. self._root_path = root_path
  53. @property
  54. def fsid(self):
  55. return "_excluding_local"
  56. def _should_exclude(self, path: str) -> bool:
  57. """Return True if `path` (relative to `root_path`) matches any of the
  58. `self._exclude` patterns."""
  59. path = Path(path)
  60. relative_path = path.relative_to(self._root_path).as_posix()
  61. match_candidates = [relative_path]
  62. if path.is_dir():
  63. # Everything is in posix path format ('/')
  64. match_candidates.append(relative_path + "/")
  65. for excl in self._exclude:
  66. if any(fnmatch.fnmatch(candidate, excl) for candidate in match_candidates):
  67. return True
  68. return False
  69. def find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs):
  70. """Call parent find() and exclude from result."""
  71. paths = super().find(
  72. path, maxdepth=maxdepth, withdirs=withdirs, detail=detail, **kwargs
  73. )
  74. if detail:
  75. return {
  76. path: out
  77. for path, out in paths.items()
  78. if not self._should_exclude(path)
  79. }
  80. else:
  81. return [path for path in paths if not self._should_exclude(path)]
  82. def _pyarrow_fs_copy_files(
  83. source, destination, source_filesystem=None, destination_filesystem=None, **kwargs
  84. ):
  85. if S3FileSystem and isinstance(destination_filesystem, pyarrow.fs.S3FileSystem):
  86. # Workaround multi-threading issue with pyarrow. Note that use_threads=True
  87. # is safe for download, just not for uploads, see:
  88. # https://github.com/apache/arrow/issues/32372
  89. kwargs.setdefault("use_threads", False)
  90. # Use a large chunk size to speed up large checkpoint transfers.
  91. kwargs.setdefault("chunk_size", 64 * 1024 * 1024)
  92. return pyarrow.fs.copy_files(
  93. source,
  94. destination,
  95. source_filesystem=source_filesystem,
  96. destination_filesystem=destination_filesystem,
  97. **kwargs,
  98. )
  99. # TODO(justinvyu): Add unit tests for all these utils.
  100. def _delete_fs_path(fs: pyarrow.fs.FileSystem, fs_path: str):
  101. is_dir = _is_directory(fs, fs_path)
  102. try:
  103. if is_dir:
  104. fs.delete_dir(fs_path)
  105. else:
  106. fs.delete_file(fs_path)
  107. except Exception:
  108. logger.exception(f"Caught exception when deleting path at ({fs}, {fs_path}):")
  109. def _download_from_fs_path(
  110. fs: pyarrow.fs.FileSystem,
  111. fs_path: str,
  112. local_path: str,
  113. filelock: bool = True,
  114. ):
  115. """Downloads a directory or file from (fs, fs_path) to a local path.
  116. If fs_path points to a directory:
  117. - The full directory contents are downloaded directly into `local_path`,
  118. rather than to a subdirectory of `local_path`.
  119. If fs_path points to a file:
  120. - The file is downloaded to `local_path`, which is expected to be a file path.
  121. If the download fails, the `local_path` contents are
  122. cleaned up before raising, if the directory did not previously exist.
  123. NOTE: This method creates `local_path`'s parent directories if they do not
  124. already exist. If the download fails, this does NOT clean up all the parent
  125. directories that were created.
  126. Args:
  127. fs: The filesystem to download from.
  128. fs_path: The filesystem path (either a directory or a file) to download.
  129. local_path: The local path to download to.
  130. filelock: Whether to require a file lock before downloading, useful for
  131. multiple downloads to the same directory that may be happening in parallel.
  132. Raises:
  133. FileNotFoundError: if (fs, fs_path) doesn't exist.
  134. """
  135. _local_path = Path(local_path).resolve()
  136. exists_before = _local_path.exists()
  137. if _is_directory(fs=fs, fs_path=fs_path):
  138. _local_path.mkdir(parents=True, exist_ok=True)
  139. else:
  140. _local_path.parent.mkdir(parents=True, exist_ok=True)
  141. try:
  142. if filelock:
  143. with TempFileLock(f"{os.path.normpath(local_path)}.lock"):
  144. _pyarrow_fs_copy_files(fs_path, local_path, source_filesystem=fs)
  145. else:
  146. _pyarrow_fs_copy_files(fs_path, local_path, source_filesystem=fs)
  147. except Exception as e:
  148. # Clean up the directory if downloading was unsuccessful
  149. if not exists_before:
  150. shutil.rmtree(local_path, ignore_errors=True)
  151. raise e
  152. def _upload_to_fs_path(
  153. local_path: str,
  154. fs: pyarrow.fs.FileSystem,
  155. fs_path: str,
  156. exclude: Optional[List[str]] = None,
  157. ) -> None:
  158. """Uploads a local directory or file to (fs, fs_path).
  159. NOTE: This will create all necessary parent directories at the destination.
  160. Args:
  161. local_path: The local path to upload.
  162. fs: The filesystem to upload to.
  163. fs_path: The filesystem path where the dir/file will be uploaded to.
  164. exclude: A list of filename matches to exclude from upload. This includes
  165. all files under subdirectories as well.
  166. This pattern will match with the relative paths of all files under
  167. `local_path`.
  168. Ex: ["*.png"] to exclude all .png images.
  169. """
  170. if not exclude:
  171. # TODO(justinvyu): uploading a single file doesn't work
  172. # (since we always create a directory at fs_path)
  173. _create_directory(fs=fs, fs_path=fs_path)
  174. _pyarrow_fs_copy_files(local_path, fs_path, destination_filesystem=fs)
  175. return
  176. _upload_to_uri_with_exclude_fsspec(
  177. local_path=local_path, fs=fs, fs_path=fs_path, exclude=exclude
  178. )
  179. def _upload_to_uri_with_exclude_fsspec(
  180. local_path: str, fs: "pyarrow.fs", fs_path: str, exclude: Optional[List[str]]
  181. ) -> None:
  182. local_fs = _ExcludingLocalFilesystem(root_path=local_path, exclude=exclude)
  183. handler = pyarrow.fs.FSSpecHandler(local_fs)
  184. source_fs = pyarrow.fs.PyFileSystem(handler)
  185. _create_directory(fs=fs, fs_path=fs_path)
  186. _pyarrow_fs_copy_files(
  187. local_path, fs_path, source_filesystem=source_fs, destination_filesystem=fs
  188. )
  189. def _list_at_fs_path(
  190. fs: pyarrow.fs.FileSystem,
  191. fs_path: str,
  192. file_filter: Optional[Callable[[pyarrow.fs.FileInfo], bool]] = None,
  193. ) -> List[str]:
  194. """Returns the list of filenames at (fs, fs_path), similar to os.listdir.
  195. If the path doesn't exist, returns an empty list.
  196. """
  197. if file_filter is None:
  198. file_filter = lambda x: True # noqa: E731
  199. selector = pyarrow.fs.FileSelector(fs_path, allow_not_found=True, recursive=False)
  200. return [
  201. os.path.relpath(file_info.path.lstrip("/"), start=fs_path.lstrip("/"))
  202. for file_info in fs.get_file_info(selector)
  203. if file_filter(file_info)
  204. ]
  205. def _exists_at_fs_path(fs: pyarrow.fs.FileSystem, fs_path: str) -> bool:
  206. """Returns True if (fs, fs_path) exists."""
  207. valid = fs.get_file_info(fs_path)
  208. return valid.type != pyarrow.fs.FileType.NotFound
  209. def _is_directory(fs: pyarrow.fs.FileSystem, fs_path: str) -> bool:
  210. """Checks if (fs, fs_path) is a directory or a file.
  211. Raises:
  212. FileNotFoundError: if (fs, fs_path) doesn't exist.
  213. """
  214. file_info = fs.get_file_info(fs_path)
  215. if file_info.type == pyarrow.fs.FileType.NotFound:
  216. raise FileNotFoundError(f"Path not found: ({fs}, {fs_path})")
  217. return not file_info.is_file
  218. def _create_directory(fs: pyarrow.fs.FileSystem, fs_path: str) -> None:
  219. """Create directory at (fs, fs_path).
  220. Some external filesystems require directories to already exist, or at least
  221. the `netloc` to be created (e.g. PyArrows ``mock://`` filesystem).
  222. Generally this should be done before and outside of Ray applications. This
  223. utility is thus primarily used in testing, e.g. of ``mock://` URIs.
  224. """
  225. try:
  226. fs.create_dir(fs_path)
  227. except Exception:
  228. logger.exception(
  229. f"Caught exception when creating directory at ({fs}, {fs_path}):"
  230. )
  231. def get_fs_and_path(
  232. storage_path: Union[str, os.PathLike],
  233. storage_filesystem: Optional[pyarrow.fs.FileSystem] = None,
  234. ) -> Tuple[pyarrow.fs.FileSystem, str]:
  235. """Returns the fs and path from a storage path and an optional custom fs.
  236. Args:
  237. storage_path: A storage path or URI. (ex: s3://bucket/path or /tmp/ray_results)
  238. storage_filesystem: A custom filesystem to use. If not provided,
  239. this will be auto-resolved by pyarrow. If provided, the storage_path
  240. is assumed to be prefix-stripped already, and must be a valid path
  241. on the filesystem.
  242. """
  243. storage_path = str(storage_path)
  244. if storage_filesystem:
  245. return storage_filesystem, storage_path
  246. return pyarrow.fs.FileSystem.from_uri(storage_path)
  247. class _FilesystemSyncer(_BackgroundSyncer):
  248. """Syncer between local filesystem and a `storage_filesystem`."""
  249. def __init__(self, storage_filesystem: Optional["pyarrow.fs.FileSystem"], **kwargs):
  250. self.storage_filesystem = storage_filesystem
  251. super().__init__(**kwargs)
  252. def _sync_up_command(
  253. self, local_path: str, uri: str, exclude: Optional[List] = None
  254. ) -> Tuple[Callable, Dict]:
  255. # TODO(justinvyu): Defer this cleanup up as part of the
  256. # external-facing Syncer deprecation.
  257. fs_path = uri
  258. return (
  259. _upload_to_fs_path,
  260. dict(
  261. local_path=local_path,
  262. fs=self.storage_filesystem,
  263. fs_path=fs_path,
  264. exclude=exclude,
  265. ),
  266. )
  267. def _sync_down_command(self, uri: str, local_path: str) -> Tuple[Callable, Dict]:
  268. fs_path = uri
  269. return (
  270. _download_from_fs_path,
  271. dict(
  272. fs=self.storage_filesystem,
  273. fs_path=fs_path,
  274. local_path=local_path,
  275. ),
  276. )
  277. def _delete_command(self, uri: str) -> Tuple[Callable, Dict]:
  278. fs_path = uri
  279. return _delete_fs_path, dict(fs=self.storage_filesystem, fs_path=fs_path)
  280. @DeveloperAPI
  281. class StorageContext:
  282. """Shared context that holds the source of truth for all paths and
  283. storage utilities, passed along from the driver to workers.
  284. This object defines a few types of paths:
  285. 1. *_fs_path: A path on the `storage_filesystem`. This is a regular path
  286. which has been prefix-stripped by pyarrow.fs.FileSystem.from_uri and
  287. can be joined with `Path(...).as_posix()`.
  288. 2. *_driver_staging_path: The temporary staging directory on the local filesystem
  289. where driver artifacts are saved to before persisting them to storage.
  290. 3. trial_working_directory: The local filesystem path that the remote
  291. actors' working directories are moved to by default.
  292. This is separated from the driver staging path so that driver syncing
  293. does not implicitly upload the trial working directory, for trials on the
  294. driver node.
  295. Example with storage_path="mock:///bucket/path?param=1":
  296. >>> import ray
  297. >>> from ray.train._internal.storage import StorageContext
  298. >>> import os
  299. >>> _ = ray.init()
  300. >>> storage = StorageContext(
  301. ... storage_path="mock://netloc/bucket/path?param=1",
  302. ... experiment_dir_name="exp_name",
  303. ... )
  304. >>> storage.storage_filesystem # Auto-resolved # doctest: +ELLIPSIS
  305. <pyarrow._fs._MockFileSystem object...
  306. >>> storage.experiment_fs_path
  307. 'bucket/path/exp_name'
  308. >>> storage.experiment_driver_staging_path # doctest: +ELLIPSIS
  309. '/tmp/ray/session_.../artifacts/.../exp_name/driver_artifacts'
  310. >>> storage.trial_dir_name = "trial_dir"
  311. >>> storage.trial_fs_path
  312. 'bucket/path/exp_name/trial_dir'
  313. >>> storage.trial_driver_staging_path # doctest: +ELLIPSIS
  314. '/tmp/ray/session_.../artifacts/.../exp_name/driver_artifacts/trial_dir'
  315. >>> storage.trial_working_directory # doctest: +ELLIPSIS
  316. '/tmp/ray/session_.../artifacts/.../exp_name/working_dirs/trial_dir'
  317. >>> storage.current_checkpoint_index = 1
  318. >>> storage.checkpoint_fs_path
  319. 'bucket/path/exp_name/trial_dir/checkpoint_000001'
  320. >>> ray.shutdown()
  321. Example with storage_path="/tmp/ray_results":
  322. >>> from ray.train._internal.storage import StorageContext
  323. >>> storage = StorageContext(
  324. ... storage_path="/tmp/ray_results",
  325. ... experiment_dir_name="exp_name",
  326. ... )
  327. >>> storage.storage_fs_path
  328. '/tmp/ray_results'
  329. >>> storage.experiment_fs_path
  330. '/tmp/ray_results/exp_name'
  331. >>> storage.storage_filesystem # Auto-resolved # doctest: +ELLIPSIS
  332. <pyarrow._fs.LocalFileSystem object...
  333. Internal Usage Examples:
  334. - To copy files to the trial directory on the storage filesystem:
  335. pyarrow.fs.copy_files(
  336. local_dir,
  337. Path(storage.trial_fs_path, "subdir").as_posix(),
  338. destination_filesystem=storage.filesystem
  339. )
  340. .. warning::
  341. This is an experimental developer API and is subject to change
  342. without notice between versions.
  343. """
  344. def __init__(
  345. self,
  346. storage_path: Union[str, os.PathLike],
  347. experiment_dir_name: str,
  348. sync_config: Optional[SyncConfig] = None,
  349. storage_filesystem: Optional[pyarrow.fs.FileSystem] = None,
  350. trial_dir_name: Optional[str] = None,
  351. current_checkpoint_index: int = -1,
  352. ):
  353. from ray.tune.utils import date_str
  354. self.custom_fs_provided = storage_filesystem is not None
  355. # Invariant: (`storage_filesystem`, `storage_path`) is the location where
  356. # *all* results can be accessed.
  357. self.experiment_dir_name = experiment_dir_name
  358. self.trial_dir_name = trial_dir_name
  359. self.current_checkpoint_index = current_checkpoint_index
  360. self.sync_config = sync_config or SyncConfig()
  361. self.storage_filesystem, self.storage_fs_path = get_fs_and_path(
  362. storage_path, storage_filesystem
  363. )
  364. self.storage_fs_path = Path(self.storage_fs_path).as_posix()
  365. self.syncer: Syncer = _FilesystemSyncer(
  366. storage_filesystem=self.storage_filesystem,
  367. sync_period=self.sync_config.sync_period,
  368. sync_timeout=self.sync_config.sync_timeout,
  369. )
  370. self._create_validation_file()
  371. self._check_validation_file()
  372. # Timestamp is used to create a unique session directory for the current
  373. # training job. This is used to avoid conflicts when multiple training jobs
  374. # run with the same name in the same cluster.
  375. # This is set ONCE at the creation of the storage context, on the driver.
  376. self._timestamp = date_str()
  377. def __str__(self):
  378. return (
  379. "StorageContext<\n"
  380. f" storage_filesystem='{self.storage_filesystem.type_name}',\n"
  381. f" storage_fs_path='{self.storage_fs_path}',\n"
  382. f" experiment_dir_name='{self.experiment_dir_name}',\n"
  383. f" trial_dir_name='{self.trial_dir_name}',\n"
  384. f" current_checkpoint_index={self.current_checkpoint_index},\n"
  385. ">"
  386. )
  387. def _create_validation_file(self):
  388. """On the creation of a storage context, create a validation file at the
  389. storage path to verify that the storage path can be written to.
  390. This validation file is also used to check whether the storage path is
  391. accessible by all nodes in the cluster."""
  392. valid_file = Path(
  393. self.experiment_fs_path, _VALIDATE_STORAGE_MARKER_FILENAME
  394. ).as_posix()
  395. self.storage_filesystem.create_dir(self.experiment_fs_path)
  396. with self.storage_filesystem.open_output_stream(valid_file):
  397. pass
  398. def _check_validation_file(self):
  399. """Checks that the validation file exists at the storage path."""
  400. valid_file = Path(
  401. self.experiment_fs_path, _VALIDATE_STORAGE_MARKER_FILENAME
  402. ).as_posix()
  403. if not _exists_at_fs_path(fs=self.storage_filesystem, fs_path=valid_file):
  404. raise RuntimeError(
  405. f"Unable to set up cluster storage with the following settings:\n{self}"
  406. "\nCheck that all nodes in the cluster have read/write access "
  407. "to the configured storage path. `RunConfig(storage_path)` should be "
  408. "set to a cloud storage URI or a shared filesystem path accessible "
  409. "by all nodes in your cluster ('s3://bucket' or '/mnt/nfs'). "
  410. "A local path on the head node is not accessible by worker nodes. "
  411. "See: https://docs.ray.io/en/latest/train/user-guides/persistent-storage.html" # noqa: E501
  412. )
  413. def _update_checkpoint_index(self, metrics: Dict):
  414. # Per default, increase by 1. This can be overwritten to customize checkpoint
  415. # directories.
  416. self.current_checkpoint_index += 1
  417. def persist_current_checkpoint(self, checkpoint: "Checkpoint") -> "Checkpoint":
  418. """Persists a given checkpoint to the current checkpoint path on the filesystem.
  419. "Current" is defined by the `current_checkpoint_index` attribute of the
  420. storage context.
  421. This method copies the checkpoint files to the storage location.
  422. It's up to the user to delete the original checkpoint files if desired.
  423. For example, the original directory is typically a local temp directory.
  424. Args:
  425. checkpoint: The checkpoint to persist to (fs, checkpoint_fs_path).
  426. Returns:
  427. Checkpoint: A Checkpoint pointing to the persisted checkpoint location.
  428. """
  429. # TODO(justinvyu): Fix this cyclical import.
  430. logger.debug(
  431. "Copying checkpoint files to storage path:\n"
  432. "({source_fs}, {source}) -> ({dest_fs}, {destination})".format(
  433. source=checkpoint.path,
  434. destination=self.checkpoint_fs_path,
  435. source_fs=checkpoint.filesystem,
  436. dest_fs=self.storage_filesystem,
  437. )
  438. )
  439. # Raise an error if the storage path is not accessible when
  440. # attempting to upload a checkpoint from a remote worker.
  441. # Ex: If storage_path is a local path, then a validation marker
  442. # will only exist on the head node but not the worker nodes.
  443. self._check_validation_file()
  444. self.storage_filesystem.create_dir(self.checkpoint_fs_path)
  445. _pyarrow_fs_copy_files(
  446. source=checkpoint.path,
  447. destination=self.checkpoint_fs_path,
  448. source_filesystem=checkpoint.filesystem,
  449. destination_filesystem=self.storage_filesystem,
  450. )
  451. persisted_checkpoint = checkpoint.__class__(
  452. filesystem=self.storage_filesystem,
  453. path=self.checkpoint_fs_path,
  454. )
  455. logger.info(f"Checkpoint successfully created at: {persisted_checkpoint}")
  456. return persisted_checkpoint
  457. def persist_artifacts(self, force: bool = False) -> None:
  458. """Persists all artifacts within `trial_local_dir` to storage.
  459. This method possibly launches a background task to sync the trial dir,
  460. depending on the `sync_period` + `sync_artifacts_on_checkpoint`
  461. settings of `SyncConfig`.
  462. `(local_fs, trial_working_dir) -> (storage_filesystem, trial_fs_path)`
  463. Args:
  464. force: If True, wait for a previous sync to finish, launch a new one,
  465. and wait for that one to finish. By the end of a `force=True` call, the
  466. latest version of the trial artifacts will be persisted.
  467. """
  468. if not self.sync_config.sync_artifacts:
  469. return
  470. # Skip if there are no artifacts to sync
  471. is_empty = not any(os.scandir(self.trial_working_directory))
  472. if is_empty:
  473. return
  474. if force:
  475. self.syncer.wait()
  476. self.syncer.sync_up(
  477. local_dir=self.trial_working_directory, remote_dir=self.trial_fs_path
  478. )
  479. self.syncer.wait()
  480. else:
  481. self.syncer.sync_up_if_needed(
  482. local_dir=self.trial_working_directory, remote_dir=self.trial_fs_path
  483. )
  484. @property
  485. def experiment_fs_path(self) -> str:
  486. """The path on the `storage_filesystem` to the experiment directory.
  487. NOTE: This does not have a URI prefix anymore, since it has been stripped
  488. by pyarrow.fs.FileSystem.from_uri already. The URI scheme information is
  489. kept in `storage_filesystem` instead.
  490. """
  491. return Path(self.storage_fs_path, self.experiment_dir_name).as_posix()
  492. def _get_session_path(self) -> str:
  493. """The Ray Train/Tune session local directory used to stage files
  494. before persisting to the storage filesystem."""
  495. return Path(
  496. _get_ray_train_session_dir(), self._timestamp, self.experiment_dir_name
  497. ).as_posix()
  498. @property
  499. def experiment_driver_staging_path(self) -> str:
  500. """The local filesystem path of the experiment directory on the driver node.
  501. The driver is the node where `Trainer.fit`/`Tuner.fit` is being called.
  502. This path is of the form:
  503. `/tmp/ray/session_<session_id>/artifacts/<ray-train-job-timestamp>/
  504. <experiment_dir_name>/driver_artifacts`
  505. This should be used as the temporary staging location for files *on the driver*
  506. before syncing them to `experiment_fs_path`.
  507. For example, the search algorithm should dump its state to this directory.
  508. See `trial_driver_staging_path` for writing trial-specific artifacts.
  509. The directory is synced to
  510. `{storage_path}/{experiment_dir_name}` periodically.
  511. See `_ExperimentCheckpointManager.checkpoint` for where that happens.
  512. """
  513. return Path(self._get_session_path(), "driver_artifacts").as_posix()
  514. @property
  515. def trial_fs_path(self) -> str:
  516. """The trial directory path on the `storage_filesystem`.
  517. Raises a ValueError if `trial_dir_name` is not set beforehand.
  518. """
  519. if self.trial_dir_name is None:
  520. raise RuntimeError(
  521. "Should not access `trial_fs_path` without setting `trial_dir_name`"
  522. )
  523. return Path(self.experiment_fs_path, self.trial_dir_name).as_posix()
  524. @property
  525. def trial_driver_staging_path(self) -> str:
  526. """The local filesystem path of the trial directory on the driver.
  527. The driver is the node where `Trainer.fit`/`Tuner.fit` is being called.
  528. This path is of the form:
  529. `/tmp/ray/session_<session_id>/artifacts/<ray-train-job-timestamp>/
  530. <experiment_dir_name>/driver_artifacts/<trial_dir_name>`
  531. This should be used as the temporary location for files on the driver
  532. before persisting them to `trial_fs_path`.
  533. For example, callbacks (e.g., JsonLoggerCallback) should write trial-specific
  534. logfiles within this directory.
  535. """
  536. if self.trial_dir_name is None:
  537. raise RuntimeError(
  538. "Should not access `trial_driver_staging_path` "
  539. "without setting `trial_dir_name`"
  540. )
  541. return Path(self.experiment_driver_staging_path, self.trial_dir_name).as_posix()
  542. @property
  543. def trial_working_directory(self) -> str:
  544. """The local filesystem path to trial working directory.
  545. This path is of the form:
  546. `/tmp/ray/session_<session_id>/artifacts/<ray-train-job-timestamp>/
  547. <experiment_dir_name>/working_dirs/<trial_dir_name>`
  548. Ray Train/Tune moves the remote actor's working directory to this path
  549. by default, unless disabled by `RAY_CHDIR_TO_TRIAL_DIR` environment variable.
  550. Writing files to this directory allows users to persist training artifacts
  551. if `SyncConfig(sync_artifacts=True)` is set.
  552. """
  553. if self.trial_dir_name is None:
  554. raise RuntimeError(
  555. "Cannot access `trial_working_directory` without "
  556. "setting `trial_dir_name`"
  557. )
  558. return Path(
  559. self._get_session_path(), "working_dirs", self.trial_dir_name
  560. ).as_posix()
  561. @property
  562. def checkpoint_fs_path(self) -> str:
  563. """The current checkpoint directory path on the `storage_filesystem`.
  564. "Current" refers to the checkpoint that is currently being created/persisted.
  565. The user of this class is responsible for setting the `current_checkpoint_index`
  566. (e.g., incrementing when needed).
  567. """
  568. return Path(self.trial_fs_path, self.checkpoint_dir_name).as_posix()
  569. @property
  570. def checkpoint_dir_name(self) -> str:
  571. """The current checkpoint directory name, based on the checkpoint index."""
  572. return StorageContext._make_checkpoint_dir_name(self.current_checkpoint_index)
  573. @staticmethod
  574. def get_experiment_dir_name(run_obj: Union[str, Callable, Type]) -> str:
  575. from ray.tune.experiment import Experiment
  576. from ray.tune.utils import date_str
  577. run_identifier = Experiment.get_trainable_name(run_obj)
  578. if bool(int(os.environ.get("TUNE_DISABLE_DATED_SUBDIR", 0))):
  579. dir_name = run_identifier
  580. else:
  581. dir_name = "{}_{}".format(run_identifier, date_str())
  582. return dir_name
  583. @staticmethod
  584. def _make_checkpoint_dir_name(index: int):
  585. """Get the name of the checkpoint directory, given an index."""
  586. return f"checkpoint_{index:06d}"