workspaces_handler.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. """Tornado handlers for frontend config storage."""
  2. # Copyright (c) Jupyter Development Team.
  3. # Distributed under the terms of the Modified BSD License.
  4. from __future__ import annotations
  5. import hashlib
  6. import json
  7. import re
  8. import unicodedata
  9. import urllib
  10. from pathlib import Path
  11. from typing import Any
  12. from jupyter_server import _tz as tz
  13. from jupyter_server.base.handlers import APIHandler
  14. from jupyter_server.extension.handler import ExtensionHandlerJinjaMixin, ExtensionHandlerMixin
  15. from jupyter_server.utils import url_path_join as ujoin
  16. from tornado import web
  17. from traitlets.config import LoggingConfigurable
# The JupyterLab workspace file extension.
# It is appended to a workspace slug to build the on-disk file name, and it is
# the suffix used to recognise workspace files when listing a directory.
WORKSPACE_EXTENSION = ".jupyterlab-workspace"
  20. def _list_workspaces(directory: Path, prefix: str) -> list[dict[str, Any]]:
  21. """
  22. Return the list of workspaces in a given directory beginning with the
  23. given prefix.
  24. """
  25. workspaces: list = []
  26. if not directory.exists():
  27. return workspaces
  28. items = [
  29. item
  30. for item in directory.iterdir()
  31. if item.name.startswith(prefix) and item.name.endswith(WORKSPACE_EXTENSION)
  32. ]
  33. items.sort()
  34. for slug in items:
  35. workspace_path: Path = directory / slug
  36. if workspace_path.exists():
  37. workspace = _load_with_file_times(workspace_path)
  38. workspaces.append(workspace)
  39. return workspaces
  40. def _load_with_file_times(workspace_path: Path) -> dict:
  41. """
  42. Load workspace JSON from disk, overwriting the `created` and `last_modified`
  43. metadata with current file stat information
  44. """
  45. stat = workspace_path.stat()
  46. with workspace_path.open(encoding="utf-8") as fid:
  47. workspace = json.load(fid)
  48. workspace["metadata"].update(
  49. last_modified=tz.utcfromtimestamp(stat.st_mtime).isoformat(),
  50. created=tz.utcfromtimestamp(stat.st_ctime).isoformat(),
  51. )
  52. return workspace
  53. def slugify(
  54. raw: str, base: str = "", sign: bool = True, max_length: int = 128 - len(WORKSPACE_EXTENSION)
  55. ) -> str:
  56. """
  57. Use the common superset of raw and base values to build a slug shorter
  58. than max_length. By default, base value is an empty string.
  59. Convert spaces to hyphens. Remove characters that aren't alphanumerics
  60. underscores, or hyphens. Convert to lowercase. Strip leading and trailing
  61. whitespace.
  62. Add an optional short signature suffix to prevent collisions.
  63. Modified from Django utils:
  64. https://github.com/django/django/blob/master/django/utils/text.py
  65. """
  66. raw = raw if raw.startswith("/") else "/" + raw
  67. signature = ""
  68. if sign:
  69. data = raw[1:] # Remove initial slash that always exists for digest.
  70. signature = "-" + hashlib.sha256(data.encode("utf-8")).hexdigest()[:4]
  71. base = (base if base.startswith("/") else "/" + base).lower()
  72. raw = raw.lower()
  73. common = 0
  74. limit = min(len(base), len(raw))
  75. while common < limit and base[common] == raw[common]:
  76. common += 1
  77. value = ujoin(base[common:], raw)
  78. value = urllib.parse.unquote(value)
  79. value = unicodedata.normalize("NFKC", value).encode("ascii", "ignore").decode("ascii")
  80. value = re.sub(r"[^\w\s-]", "", value).strip()
  81. value = re.sub(r"[-\s]+", "-", value)
  82. return value[: max_length - len(signature)] + signature
  83. class WorkspacesManager(LoggingConfigurable):
  84. """A manager for workspaces."""
  85. def __init__(self, path: str) -> None:
  86. """Initialize a workspaces manager with content in ``path``."""
  87. super()
  88. if not path:
  89. msg = "Workspaces directory is not set"
  90. raise ValueError(msg)
  91. self.workspaces_dir = Path(path)
  92. def delete(self, space_name: str) -> None:
  93. """Remove a workspace ``space_name``."""
  94. slug = slugify(space_name)
  95. workspace_path = self.workspaces_dir / (slug + WORKSPACE_EXTENSION)
  96. if not workspace_path.exists():
  97. msg = f"Workspace {space_name!r} ({slug!r}) not found"
  98. raise FileNotFoundError(msg)
  99. # to delete the workspace file.
  100. workspace_path.unlink()
  101. def list_workspaces(self) -> list:
  102. """List all available workspaces."""
  103. prefix = slugify("", sign=False)
  104. return _list_workspaces(self.workspaces_dir, prefix)
  105. def load(self, space_name: str) -> dict:
  106. """Load the workspace ``space_name``."""
  107. slug = slugify(space_name)
  108. workspace_path = self.workspaces_dir / (slug + WORKSPACE_EXTENSION)
  109. if workspace_path.exists():
  110. # to load and parse the workspace file.
  111. return _load_with_file_times(workspace_path)
  112. _id = space_name if space_name.startswith("/") else "/" + space_name
  113. return dict(data=dict(), metadata=dict(id=_id))
  114. def save(self, space_name: str, raw: str) -> Path:
  115. """Save the ``raw`` data as workspace ``space_name``."""
  116. if not self.workspaces_dir.exists():
  117. self.workspaces_dir.mkdir(parents=True)
  118. workspace = {}
  119. # Make sure the data is valid JSON.
  120. try:
  121. decoder = json.JSONDecoder()
  122. workspace = decoder.decode(raw)
  123. except Exception as e:
  124. raise ValueError(str(e)) from e
  125. # Make sure metadata ID matches the workspace name.
  126. # Transparently support an optional initial root `/`.
  127. metadata_id = workspace["metadata"]["id"]
  128. metadata_id = metadata_id if metadata_id.startswith("/") else "/" + metadata_id
  129. metadata_id = urllib.parse.unquote(metadata_id)
  130. if metadata_id != "/" + space_name:
  131. message = f"Workspace metadata ID mismatch: expected {space_name!r} got {metadata_id!r}"
  132. raise ValueError(message)
  133. slug = slugify(space_name)
  134. workspace_path = self.workspaces_dir / (slug + WORKSPACE_EXTENSION)
  135. # Write the workspace data to a file.
  136. workspace_path.write_text(raw, encoding="utf-8")
  137. return workspace_path
  138. class WorkspacesHandler(ExtensionHandlerMixin, ExtensionHandlerJinjaMixin, APIHandler):
  139. """A workspaces API handler."""
  140. def initialize(self, name: str, manager: WorkspacesManager, **kwargs: Any) -> None: # noqa: ARG002
  141. """Initialize the handler."""
  142. super().initialize(name)
  143. self.manager = manager
  144. @web.authenticated
  145. def delete(self, space_name: str) -> None:
  146. """Remove a workspace"""
  147. if not space_name:
  148. raise web.HTTPError(400, "Workspace name is required for DELETE")
  149. try:
  150. self.manager.delete(space_name)
  151. return self.set_status(204)
  152. except FileNotFoundError as e:
  153. raise web.HTTPError(404, str(e)) from e
  154. except Exception as e: # pragma: no cover
  155. raise web.HTTPError(500, str(e)) from e
  156. @web.authenticated
  157. async def get(self, space_name: str = "") -> Any:
  158. """Get workspace(s) data"""
  159. try:
  160. if not space_name:
  161. workspaces = self.manager.list_workspaces()
  162. ids = []
  163. values = []
  164. for workspace in workspaces:
  165. ids.append(workspace["metadata"]["id"])
  166. values.append(workspace)
  167. return self.finish(json.dumps({"workspaces": {"ids": ids, "values": values}}))
  168. workspace = self.manager.load(space_name)
  169. return self.finish(json.dumps(workspace))
  170. except Exception as e: # pragma: no cover
  171. raise web.HTTPError(500, str(e)) from e
  172. @web.authenticated
  173. def put(self, space_name: str = "") -> None:
  174. """Update workspace data"""
  175. if not space_name:
  176. raise web.HTTPError(400, "Workspace name is required for PUT.")
  177. raw = self.request.body.strip().decode("utf-8")
  178. # Make sure the data is valid JSON.
  179. try:
  180. self.manager.save(space_name, raw)
  181. except ValueError as e:
  182. raise web.HTTPError(400, str(e)) from e
  183. except Exception as e: # pragma: no cover
  184. raise web.HTTPError(500, str(e)) from e
  185. self.set_status(204)