file_transfer.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481
  1. import fnmatch
  2. import io
  3. import os
  4. import shutil
  5. import tarfile
  6. from typing import Dict, Generator, List, Optional, Tuple, Union
  7. import ray
  8. from ray.air._internal.filelock import TempFileLock
  9. from ray.air.util.node import _force_on_node, _get_node_id_from_node_ip
  10. from ray.util.annotations import DeveloperAPI
  11. _DEFAULT_CHUNK_SIZE_BYTES = 500 * 1024 * 1024 # 500 MiB
  12. _DEFAULT_MAX_SIZE_BYTES = 1 * 1024 * 1024 * 1024 # 1 GiB
  13. @DeveloperAPI
  14. def sync_dir_between_nodes(
  15. source_ip: str,
  16. source_path: str,
  17. target_ip: str,
  18. target_path: str,
  19. force_all: bool = False,
  20. exclude: Optional[List] = None,
  21. chunk_size_bytes: int = _DEFAULT_CHUNK_SIZE_BYTES,
  22. max_size_bytes: Optional[int] = _DEFAULT_MAX_SIZE_BYTES,
  23. return_futures: bool = False,
  24. ) -> Union[
  25. None,
  26. Tuple[ray.ObjectRef, ray.ActorID, ray.ObjectRef],
  27. Tuple[ray.ObjectRef, None, None],
  28. ]:
  29. """Synchronize directory on source node to directory on target node.
  30. Per default, this function will collect information about already existing
  31. files in the target directory. Only files that differ in either mtime or
  32. filesize will be transferred, unless ``force_all=True``.
  33. If ``source_ip==target_ip``, shutil will be used to copy the directory. Otherwise,
  34. the directory will be packed and sent through the Ray Object Store to the target
  35. node.
  36. Args:
  37. source_ip: IP of source node.
  38. source_path: Path to directory on source node.
  39. target_ip: IP of target node.
  40. target_path: Path to directory on target node.
  41. force_all: If True, all files will be transferred (not just differing files).
  42. Ignored if ``source_ip==target_ip``.
  43. exclude: Pattern of files to exclude, e.g.
  44. ``["*/checkpoint_*]`` to exclude trial checkpoints.
  45. chunk_size_bytes: Chunk size for data transfer. Ignored if
  46. ``source_ip==target_ip``.
  47. max_size_bytes: If packed data exceeds this value, raise an error before
  48. transfer. If ``None``, no limit is enforced. Ignored if
  49. ``source_ip==target_ip``.
  50. return_futures: If True, returns a tuple of the unpack future,
  51. the pack actor, and the files_stats future. If False (default) will
  52. block until synchronization finished and return None.
  53. Returns:
  54. None, or Tuple of unpack future, pack actor, and files_stats future.
  55. If ``source_ip==target_ip``, pack actor and files_stats future will be None.
  56. """
  57. if source_ip != target_ip:
  58. return _sync_dir_between_different_nodes(
  59. source_ip=source_ip,
  60. source_path=source_path,
  61. target_ip=target_ip,
  62. target_path=target_path,
  63. force_all=force_all,
  64. exclude=exclude,
  65. chunk_size_bytes=chunk_size_bytes,
  66. max_size_bytes=max_size_bytes,
  67. return_futures=return_futures,
  68. )
  69. elif source_path != target_path:
  70. ret = _sync_dir_on_same_node(
  71. ip=source_ip,
  72. source_path=source_path,
  73. target_path=target_path,
  74. exclude=exclude,
  75. return_futures=return_futures,
  76. )
  77. if return_futures:
  78. return ret, None, None
  79. return ret
  80. def _sync_dir_on_same_node(
  81. ip: str,
  82. source_path: str,
  83. target_path: str,
  84. exclude: Optional[List] = None,
  85. return_futures: bool = False,
  86. ) -> Optional[ray.ObjectRef]:
  87. """Synchronize directory to another directory on the same node.
  88. Per default, this function will collect information about already existing
  89. files in the target directory. All files will be copied over.
  90. Args:
  91. ip: IP of the node.
  92. source_path: Path to source directory.
  93. target_path: Path to target directory.
  94. exclude: Pattern of files to exclude, e.g.
  95. ``["*/checkpoint_*]`` to exclude trial checkpoints.
  96. return_futures: If True, returns a future of the copy task.
  97. Returns:
  98. None, or future of the copy task.
  99. """
  100. node_id = _get_node_id_from_node_ip(ip)
  101. copy_on_node = _remote_copy_dir.options(num_cpus=0, **_force_on_node(node_id))
  102. copy_future = copy_on_node.remote(
  103. source_dir=source_path, target_dir=target_path, exclude=exclude
  104. )
  105. if return_futures:
  106. return copy_future
  107. return ray.get(copy_future)
  108. def _sync_dir_between_different_nodes(
  109. source_ip: str,
  110. source_path: str,
  111. target_ip: str,
  112. target_path: str,
  113. force_all: bool = False,
  114. exclude: Optional[List] = None,
  115. chunk_size_bytes: int = _DEFAULT_CHUNK_SIZE_BYTES,
  116. max_size_bytes: Optional[int] = _DEFAULT_MAX_SIZE_BYTES,
  117. return_futures: bool = False,
  118. ) -> Union[None, Tuple[ray.ObjectRef, ray.ActorID, ray.ObjectRef]]:
  119. """Synchronize directory on source node to directory on target node.
  120. Per default, this function will collect information about already existing
  121. files in the target directory. Only files that differ in either mtime or
  122. filesize will be transferred, unless ``force_all=True``.
  123. Args:
  124. source_ip: IP of source node.
  125. source_path: Path to directory on source node.
  126. target_ip: IP of target node.
  127. target_path: Path to directory on target node.
  128. force_all: If True, all files will be transferred (not just differing files).
  129. exclude: Pattern of files to exclude, e.g.
  130. ``["*/checkpoint_*]`` to exclude trial checkpoints.
  131. chunk_size_bytes: Chunk size for data transfer.
  132. max_size_bytes: If packed data exceeds this value, raise an error before
  133. transfer. If ``None``, no limit is enforced.
  134. return_futures: If True, returns a tuple of the unpack future,
  135. the pack actor, and the files_stats future. If False (default) will
  136. block until synchronization finished and return None.
  137. Returns:
  138. None, or Tuple of unpack future, pack actor, and files_stats future.
  139. """
  140. source_node_id = _get_node_id_from_node_ip(source_ip)
  141. target_node_id = _get_node_id_from_node_ip(target_ip)
  142. pack_actor_on_source_node = _PackActor.options(
  143. num_cpus=0, **_force_on_node(source_node_id)
  144. )
  145. unpack_on_target_node = _unpack_from_actor.options(
  146. num_cpus=0, **_force_on_node(target_node_id)
  147. )
  148. if force_all:
  149. files_stats = None
  150. else:
  151. files_stats = _remote_get_recursive_files_and_stats.options(
  152. num_cpus=0, **_force_on_node(target_node_id)
  153. ).remote(target_path)
  154. pack_actor = pack_actor_on_source_node.remote(
  155. source_dir=source_path,
  156. files_stats=files_stats,
  157. chunk_size_bytes=chunk_size_bytes,
  158. max_size_bytes=max_size_bytes,
  159. exclude=exclude,
  160. )
  161. unpack_future = unpack_on_target_node.remote(pack_actor, target_path)
  162. if return_futures:
  163. return unpack_future, pack_actor, files_stats
  164. return ray.get(unpack_future)
  165. def _get_recursive_files_and_stats(path: str) -> Dict[str, Tuple[float, int]]:
  166. """Return dict of files mapping to stats in ``path``.
  167. This function scans a directory ``path`` recursively and returns a dict
  168. mapping each contained file to a tuple of (mtime, filesize).
  169. mtime and filesize are returned from ``os.lstat`` and are usually a
  170. floating point number (timestamp) and an int (filesize in bytes).
  171. """
  172. files_stats = {}
  173. for root, dirs, files in os.walk(path, topdown=False):
  174. rel_root = os.path.relpath(root, path)
  175. for file in files:
  176. try:
  177. key = os.path.join(rel_root, file)
  178. stat = os.lstat(os.path.join(path, key))
  179. files_stats[key] = stat.st_mtime, stat.st_size
  180. except FileNotFoundError:
  181. # Race condition: If a file is deleted while executing this
  182. # method, just continue and don't include the file in the stats
  183. pass
  184. return files_stats
# Wrap the function with ``ray.remote`` once at module import, so the same
# remote function object is reused for every call instead of re-wrapping
# (and re-exporting) it per sync.
_remote_get_recursive_files_and_stats = ray.remote(_get_recursive_files_and_stats)
  187. def _pack_dir(
  188. source_dir: str,
  189. exclude: Optional[List] = None,
  190. files_stats: Optional[Dict[str, Tuple[float, int]]] = None,
  191. ) -> io.BytesIO:
  192. """Pack whole directory contents into an uncompressed tarfile.
  193. This function accepts a ``files_stats`` argument. If given, only files
  194. whose stats differ from these stats will be packed.
  195. The main use case for this is that we can collect information about files
  196. already existing in the target directory, and only pack files that have
  197. been updated. This is similar to how cloud syncing utilities decide
  198. which files to transfer.
  199. Args:
  200. source_dir: Path to local directory to pack into tarfile.
  201. exclude: Pattern of files to exclude, e.g.
  202. ``["*/checkpoint_*]`` to exclude trial checkpoints.
  203. files_stats: Dict of relative filenames mapping to a tuple of
  204. (mtime, filesize). Only files that differ from these stats
  205. will be packed.
  206. Returns:
  207. Tarfile as a stream object.
  208. """
  209. def _should_exclude(candidate: str) -> bool:
  210. if not exclude:
  211. return False
  212. for excl in exclude:
  213. if fnmatch.fnmatch(candidate, excl):
  214. return True
  215. return False
  216. stream = io.BytesIO()
  217. with tarfile.open(fileobj=stream, mode="w", format=tarfile.PAX_FORMAT) as tar:
  218. if not files_stats and not exclude:
  219. # If no `files_stats` is passed, pack whole directory
  220. tar.add(source_dir, arcname="", recursive=True)
  221. else:
  222. files_stats = files_stats or {}
  223. # Otherwise, only pack differing files
  224. tar.add(source_dir, arcname="", recursive=False)
  225. for root, dirs, files in os.walk(source_dir, topdown=False):
  226. rel_root = os.path.relpath(root, source_dir)
  227. # Always add all directories
  228. for dir in dirs:
  229. key = os.path.join(rel_root, dir)
  230. tar.add(os.path.join(source_dir, key), arcname=key, recursive=False)
  231. # Add files where our information differs
  232. for file in files:
  233. key = os.path.join(rel_root, file)
  234. stat = os.lstat(os.path.join(source_dir, key))
  235. file_stat = stat.st_mtime, stat.st_size
  236. if _should_exclude(key):
  237. # If the file matches an exclude pattern, skip
  238. continue
  239. if key in files_stats and files_stats[key] == file_stat:
  240. # If the file did not change, skip
  241. continue
  242. tar.add(os.path.join(source_dir, key), arcname=key)
  243. return stream
  244. def _gib_string(num_bytes: float) -> str:
  245. return f"{float(num_bytes / 1024 ** 3):.2f}GiB"
  246. @ray.remote
  247. class _PackActor:
  248. """Actor wrapping around a packing job.
  249. This actor is used for chunking the packed data into smaller chunks that
  250. can be transferred via the object store more efficiently.
  251. The actor will start packing the directory when initialized, and separate
  252. chunks can be received by calling the remote ``next()`` task.
  253. Args:
  254. source_dir: Path to local directory to pack into tarfile.
  255. exclude: Pattern of files to exclude, e.g.
  256. ``["*/checkpoint_*]`` to exclude trial checkpoints.
  257. files_stats: Dict of relative filenames mapping to a tuple of
  258. (mtime, filesize). Only files that differ from these stats
  259. will be packed.
  260. chunk_size_bytes: Cut bytes stream into chunks of this size in bytes.
  261. max_size_bytes: If packed data exceeds this value, raise an error before
  262. transfer. If ``None``, no limit is enforced.
  263. """
  264. def __init__(
  265. self,
  266. source_dir: str,
  267. exclude: Optional[List] = None,
  268. files_stats: Optional[Dict[str, Tuple[float, int]]] = None,
  269. chunk_size_bytes: int = _DEFAULT_CHUNK_SIZE_BYTES,
  270. max_size_bytes: Optional[int] = _DEFAULT_MAX_SIZE_BYTES,
  271. ):
  272. self.stream = _pack_dir(
  273. source_dir=source_dir, exclude=exclude, files_stats=files_stats
  274. )
  275. # Get buffer size
  276. self.stream.seek(0, 2)
  277. file_size = self.stream.tell()
  278. if max_size_bytes and file_size > max_size_bytes:
  279. raise RuntimeError(
  280. f"Packed directory {source_dir} content has a size of "
  281. f"{_gib_string(file_size)}, which exceeds the limit "
  282. f"of {_gib_string(max_size_bytes)}. Please check the directory "
  283. f"contents. If you want to transfer everything, you can increase "
  284. f"or disable the limit by passing the `max_size` argument."
  285. )
  286. self.chunk_size = chunk_size_bytes
  287. self.max_size = max_size_bytes
  288. self.iter = None
  289. def get_full_data(self) -> bytes:
  290. return self.stream.getvalue()
  291. def _chunk_generator(self) -> Generator[bytes, None, None]:
  292. self.stream.seek(0)
  293. data = self.stream.read(self.chunk_size)
  294. while data:
  295. yield data
  296. data = self.stream.read(self.chunk_size)
  297. def next(self) -> Optional[bytes]:
  298. if not self.iter:
  299. self.iter = iter(self._chunk_generator())
  300. try:
  301. return next(self.iter)
  302. except StopIteration:
  303. return None
  304. def _iter_remote(actor: ray.ActorID) -> Generator[bytes, None, None]:
  305. """Iterate over actor task and return as generator."""
  306. while True:
  307. buffer = ray.get(actor.next.remote())
  308. if buffer is None:
  309. return
  310. yield buffer
def _unpack_dir(stream: io.BytesIO, target_dir: str, *, _retry: bool = True) -> None:
    """Unpack a tarfile stream into ``target_dir`` under a per-directory file lock.

    The lock file is ``f"{target_dir}.lock"``, using the normalized
    ``target_dir``. If that lock is already held, this call waits until it is
    released and does NOT extract anything itself. It only tries again (once,
    when ``_retry`` is True) if ``target_dir`` no longer exists afterwards,
    i.e. the lock holder removed it.

    Args:
        stream: Tar data. It is rewound with ``seek(0)`` before reading, so
            the same stream can be reused for the retry.
        target_dir: Directory to extract into.
        _retry: Internal flag; whether one more attempt is allowed.

    Raises:
        RuntimeError: If, on the retry attempt, the lock is contended again
            and ``target_dir`` is still missing afterwards.
    """
    stream.seek(0)
    target_dir = os.path.normpath(target_dir)
    try:
        # Timeout 0 means there will be only one attempt to acquire
        # the file lock. If it cannot be acquired, a TimeoutError
        # will be thrown.
        with TempFileLock(f"{target_dir}.lock", timeout=0):
            with tarfile.open(fileobj=stream) as tar:
                # NOTE(review): `extractall` is called without a `filter=`
                # argument, so member paths are not sanitized. This assumes
                # the archive comes from `_pack_dir` on a trusted cluster
                # node; confirm before feeding it other data.
                tar.extractall(target_dir)
    except TimeoutError:
        # Someone else holds the lock: block until they are done, but do
        # not extract anything ourselves.
        with TempFileLock(f"{target_dir}.lock"):
            pass
        # If the lock holder deleted the directory, recreate it by
        # unpacking once more (at most one retry).
        if not os.path.exists(target_dir):
            if _retry:
                _unpack_dir(stream, target_dir, _retry=False)
            else:
                raise RuntimeError(
                    f"Target directory {target_dir} does not exist "
                    "and couldn't be recreated. "
                    "Please raise an issue on GitHub: "
                    "https://github.com/ray-project/ray/issues"
                )
  338. @ray.remote
  339. def _unpack_from_actor(pack_actor: ray.ActorID, target_dir: str) -> None:
  340. """Iterate over chunks received from pack actor and unpack."""
  341. stream = io.BytesIO()
  342. for buffer in _iter_remote(pack_actor):
  343. stream.write(buffer)
  344. _unpack_dir(stream, target_dir=target_dir)
  345. def _copy_dir(
  346. source_dir: str,
  347. target_dir: str,
  348. *,
  349. exclude: Optional[List] = None,
  350. _retry: bool = True,
  351. ) -> None:
  352. """Copy dir with shutil on the actor."""
  353. target_dir = os.path.normpath(target_dir)
  354. try:
  355. # Timeout 0 means there will be only one attempt to acquire
  356. # the file lock. If it cannot be acquired, a TimeoutError
  357. # will be thrown.
  358. with TempFileLock(f"{target_dir}.lock", timeout=0):
  359. _delete_path_unsafe(target_dir)
  360. _ignore_func = None
  361. if exclude:
  362. def _ignore(path, names):
  363. ignored_names = set()
  364. rel_path = os.path.relpath(path, source_dir)
  365. for name in names:
  366. candidate = os.path.join(rel_path, name)
  367. for excl in exclude:
  368. if fnmatch.fnmatch(candidate, excl):
  369. ignored_names.add(name)
  370. break
  371. return ignored_names
  372. _ignore_func = _ignore
  373. shutil.copytree(source_dir, target_dir, ignore=_ignore_func)
  374. except TimeoutError:
  375. # wait, but do not do anything
  376. with TempFileLock(f"{target_dir}.lock"):
  377. pass
  378. # if the dir was locked due to being deleted,
  379. # recreate
  380. if not os.path.exists(target_dir):
  381. if _retry:
  382. _copy_dir(source_dir, target_dir, _retry=False)
  383. else:
  384. raise RuntimeError(
  385. f"Target directory {target_dir} does not exist "
  386. "and couldn't be recreated. "
  387. "Please raise an issue on GitHub: "
  388. "https://github.com/ray-project/ray/issues"
  389. )
# Wrap `_copy_dir` with ``ray.remote`` once at module import, so every
# same-node sync reuses this single remote function object.
_remote_copy_dir = ray.remote(_copy_dir)
  392. def _delete_path_unsafe(target_path: str):
  393. """Delete path (files and directories). No filelock."""
  394. if os.path.exists(target_path):
  395. if os.path.isdir(target_path):
  396. shutil.rmtree(target_path)
  397. else:
  398. os.remove(target_path)
  399. return True
  400. return False