filecheckpoints.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. """
  2. File-based Checkpoints implementations.
  3. """
  4. import os
  5. import shutil
  6. import tempfile
  7. from anyio.to_thread import run_sync
  8. from jupyter_core.utils import ensure_dir_exists
  9. from tornado.web import HTTPError
  10. from traitlets import Unicode
  11. from jupyter_server import _tz as tz
  12. from .checkpoints import (
  13. AsyncCheckpoints,
  14. AsyncGenericCheckpointsMixin,
  15. Checkpoints,
  16. GenericCheckpointsMixin,
  17. )
  18. from .fileio import AsyncFileManagerMixin, FileManagerMixin
  19. class FileCheckpoints(FileManagerMixin, Checkpoints):
  20. """
  21. A Checkpoints that caches checkpoints for files in adjacent
  22. directories.
  23. Only works with FileContentsManager. Use GenericFileCheckpoints if
  24. you want file-based checkpoints with another ContentsManager.
  25. """
  26. checkpoint_dir = Unicode(
  27. ".ipynb_checkpoints",
  28. config=True,
  29. help="""The directory name in which to keep file checkpoints
  30. This is a path relative to the file's own directory.
  31. By default, it is .ipynb_checkpoints
  32. """,
  33. )
  34. root_dir = Unicode(config=True)
  35. def _root_dir_default(self):
  36. if not self.parent:
  37. return os.getcwd()
  38. return self.parent.root_dir
  39. # ContentsManager-dependent checkpoint API
  40. def create_checkpoint(self, contents_mgr, path):
  41. """Create a checkpoint."""
  42. checkpoint_id = "checkpoint"
  43. src_path = contents_mgr._get_os_path(path)
  44. dest_path = self.checkpoint_path(checkpoint_id, path)
  45. self._copy(src_path, dest_path)
  46. return self.checkpoint_model(checkpoint_id, dest_path)
  47. def restore_checkpoint(self, contents_mgr, checkpoint_id, path):
  48. """Restore a checkpoint."""
  49. src_path = self.checkpoint_path(checkpoint_id, path)
  50. dest_path = contents_mgr._get_os_path(path)
  51. self._copy(src_path, dest_path)
  52. # ContentsManager-independent checkpoint API
  53. def rename_checkpoint(self, checkpoint_id, old_path, new_path):
  54. """Rename a checkpoint from old_path to new_path."""
  55. old_cp_path = self.checkpoint_path(checkpoint_id, old_path)
  56. new_cp_path = self.checkpoint_path(checkpoint_id, new_path)
  57. if os.path.isfile(old_cp_path):
  58. self.log.debug(
  59. "Renaming checkpoint %s -> %s",
  60. old_cp_path,
  61. new_cp_path,
  62. )
  63. with self.perm_to_403():
  64. shutil.move(old_cp_path, new_cp_path)
  65. def delete_checkpoint(self, checkpoint_id, path):
  66. """delete a file's checkpoint"""
  67. path = path.strip("/")
  68. cp_path = self.checkpoint_path(checkpoint_id, path)
  69. if not os.path.isfile(cp_path):
  70. self.no_such_checkpoint(path, checkpoint_id)
  71. self.log.debug("unlinking %s", cp_path)
  72. with self.perm_to_403():
  73. os.unlink(cp_path)
  74. def list_checkpoints(self, path):
  75. """list the checkpoints for a given file
  76. This contents manager currently only supports one checkpoint per file.
  77. """
  78. path = path.strip("/")
  79. checkpoint_id = "checkpoint"
  80. os_path = self.checkpoint_path(checkpoint_id, path)
  81. if not os.path.isfile(os_path):
  82. return []
  83. else:
  84. return [self.checkpoint_model(checkpoint_id, os_path)]
  85. # Checkpoint-related utilities
  86. def checkpoint_path(self, checkpoint_id, path):
  87. """find the path to a checkpoint"""
  88. path = path.strip("/")
  89. parent, name = ("/" + path).rsplit("/", 1)
  90. parent = parent.strip("/")
  91. basename, ext = os.path.splitext(name)
  92. filename = f"{basename}-{checkpoint_id}{ext}"
  93. os_path = self._get_os_path(path=parent)
  94. cp_dir = os.path.join(os_path, self.checkpoint_dir)
  95. # If parent directory isn't writable, use system temp
  96. if not os.access(os.path.dirname(cp_dir), os.W_OK):
  97. rel = os.path.relpath(os_path, start=self.root_dir)
  98. cp_dir = os.path.join(tempfile.gettempdir(), "jupyter_checkpoints", rel)
  99. with self.perm_to_403():
  100. ensure_dir_exists(cp_dir)
  101. cp_path = os.path.join(cp_dir, filename)
  102. return cp_path
  103. def checkpoint_model(self, checkpoint_id, os_path):
  104. """construct the info dict for a given checkpoint"""
  105. stats = os.stat(os_path)
  106. last_modified = tz.utcfromtimestamp(stats.st_mtime)
  107. info = {
  108. "id": checkpoint_id,
  109. "last_modified": last_modified,
  110. }
  111. return info
  112. # Error Handling
  113. def no_such_checkpoint(self, path, checkpoint_id):
  114. raise HTTPError(404, f"Checkpoint does not exist: {path}@{checkpoint_id}")
  115. class AsyncFileCheckpoints(FileCheckpoints, AsyncFileManagerMixin, AsyncCheckpoints):
  116. async def create_checkpoint(self, contents_mgr, path):
  117. """Create a checkpoint."""
  118. checkpoint_id = "checkpoint"
  119. src_path = contents_mgr._get_os_path(path)
  120. dest_path = self.checkpoint_path(checkpoint_id, path)
  121. await self._copy(src_path, dest_path)
  122. return await self.checkpoint_model(checkpoint_id, dest_path)
  123. async def restore_checkpoint(self, contents_mgr, checkpoint_id, path):
  124. """Restore a checkpoint."""
  125. src_path = self.checkpoint_path(checkpoint_id, path)
  126. dest_path = contents_mgr._get_os_path(path)
  127. await self._copy(src_path, dest_path)
  128. async def checkpoint_model(self, checkpoint_id, os_path):
  129. """construct the info dict for a given checkpoint"""
  130. stats = await run_sync(os.stat, os_path)
  131. last_modified = tz.utcfromtimestamp(stats.st_mtime)
  132. info = {
  133. "id": checkpoint_id,
  134. "last_modified": last_modified,
  135. }
  136. return info
  137. # ContentsManager-independent checkpoint API
  138. async def rename_checkpoint(self, checkpoint_id, old_path, new_path):
  139. """Rename a checkpoint from old_path to new_path."""
  140. old_cp_path = self.checkpoint_path(checkpoint_id, old_path)
  141. new_cp_path = self.checkpoint_path(checkpoint_id, new_path)
  142. if os.path.isfile(old_cp_path):
  143. self.log.debug(
  144. "Renaming checkpoint %s -> %s",
  145. old_cp_path,
  146. new_cp_path,
  147. )
  148. with self.perm_to_403():
  149. await run_sync(shutil.move, old_cp_path, new_cp_path)
  150. async def delete_checkpoint(self, checkpoint_id, path):
  151. """delete a file's checkpoint"""
  152. path = path.strip("/")
  153. cp_path = self.checkpoint_path(checkpoint_id, path)
  154. if not os.path.isfile(cp_path):
  155. self.no_such_checkpoint(path, checkpoint_id)
  156. self.log.debug("unlinking %s", cp_path)
  157. with self.perm_to_403():
  158. await run_sync(os.unlink, cp_path)
  159. async def list_checkpoints(self, path):
  160. """list the checkpoints for a given file
  161. This contents manager currently only supports one checkpoint per file.
  162. """
  163. path = path.strip("/")
  164. checkpoint_id = "checkpoint"
  165. os_path = self.checkpoint_path(checkpoint_id, path)
  166. if not os.path.isfile(os_path):
  167. return []
  168. else:
  169. return [await self.checkpoint_model(checkpoint_id, os_path)]
  170. class GenericFileCheckpoints(GenericCheckpointsMixin, FileCheckpoints):
  171. """
  172. Local filesystem Checkpoints that works with any conforming
  173. ContentsManager.
  174. """
  175. def create_file_checkpoint(self, content, format, path):
  176. """Create a checkpoint from the current content of a file."""
  177. path = path.strip("/")
  178. # only the one checkpoint ID:
  179. checkpoint_id = "checkpoint"
  180. os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
  181. self.log.debug("creating checkpoint for %s", path)
  182. with self.perm_to_403():
  183. self._save_file(os_checkpoint_path, content, format=format)
  184. # return the checkpoint info
  185. return self.checkpoint_model(checkpoint_id, os_checkpoint_path)
  186. def create_notebook_checkpoint(self, nb, path):
  187. """Create a checkpoint from the current content of a notebook."""
  188. path = path.strip("/")
  189. # only the one checkpoint ID:
  190. checkpoint_id = "checkpoint"
  191. os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
  192. self.log.debug("creating checkpoint for %s", path)
  193. with self.perm_to_403():
  194. self._save_notebook(os_checkpoint_path, nb)
  195. # return the checkpoint info
  196. return self.checkpoint_model(checkpoint_id, os_checkpoint_path)
  197. def get_notebook_checkpoint(self, checkpoint_id, path):
  198. """Get a checkpoint for a notebook."""
  199. path = path.strip("/")
  200. self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
  201. os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
  202. if not os.path.isfile(os_checkpoint_path):
  203. self.no_such_checkpoint(path, checkpoint_id)
  204. return {
  205. "type": "notebook",
  206. "content": self._read_notebook(
  207. os_checkpoint_path,
  208. as_version=4,
  209. ),
  210. }
  211. def get_file_checkpoint(self, checkpoint_id, path):
  212. """Get a checkpoint for a file."""
  213. path = path.strip("/")
  214. self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
  215. os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
  216. if not os.path.isfile(os_checkpoint_path):
  217. self.no_such_checkpoint(path, checkpoint_id)
  218. content, format = self._read_file(os_checkpoint_path, format=None) # type: ignore[misc]
  219. return {
  220. "type": "file",
  221. "content": content,
  222. "format": format,
  223. }
  224. class AsyncGenericFileCheckpoints(AsyncGenericCheckpointsMixin, AsyncFileCheckpoints):
  225. """
  226. Asynchronous Local filesystem Checkpoints that works with any conforming
  227. ContentsManager.
  228. """
  229. async def create_file_checkpoint(self, content, format, path):
  230. """Create a checkpoint from the current content of a file."""
  231. path = path.strip("/")
  232. # only the one checkpoint ID:
  233. checkpoint_id = "checkpoint"
  234. os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
  235. self.log.debug("creating checkpoint for %s", path)
  236. with self.perm_to_403():
  237. await self._save_file(os_checkpoint_path, content, format=format)
  238. # return the checkpoint info
  239. return await self.checkpoint_model(checkpoint_id, os_checkpoint_path)
  240. async def create_notebook_checkpoint(self, nb, path):
  241. """Create a checkpoint from the current content of a notebook."""
  242. path = path.strip("/")
  243. # only the one checkpoint ID:
  244. checkpoint_id = "checkpoint"
  245. os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
  246. self.log.debug("creating checkpoint for %s", path)
  247. with self.perm_to_403():
  248. await self._save_notebook(os_checkpoint_path, nb)
  249. # return the checkpoint info
  250. return await self.checkpoint_model(checkpoint_id, os_checkpoint_path)
  251. async def get_notebook_checkpoint(self, checkpoint_id, path):
  252. """Get a checkpoint for a notebook."""
  253. path = path.strip("/")
  254. self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
  255. os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
  256. if not os.path.isfile(os_checkpoint_path):
  257. self.no_such_checkpoint(path, checkpoint_id)
  258. return {
  259. "type": "notebook",
  260. "content": await self._read_notebook(
  261. os_checkpoint_path,
  262. as_version=4,
  263. ),
  264. }
  265. async def get_file_checkpoint(self, checkpoint_id, path):
  266. """Get a checkpoint for a file."""
  267. path = path.strip("/")
  268. self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
  269. os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
  270. if not os.path.isfile(os_checkpoint_path):
  271. self.no_such_checkpoint(path, checkpoint_id)
  272. content, format = await self._read_file(os_checkpoint_path, format=None) # type: ignore[misc]
  273. return {
  274. "type": "file",
  275. "content": content,
  276. "format": format,
  277. }