licenses_handler.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. """Manager and Tornado handlers for license reporting."""
  2. # Copyright (c) Jupyter Development Team.
  3. # Distributed under the terms of the Modified BSD License.
  4. from __future__ import annotations
  5. import asyncio
  6. import csv
  7. import io
  8. import json
  9. import mimetypes
  10. import re
  11. from concurrent.futures import ThreadPoolExecutor
  12. from pathlib import Path
  13. from typing import TYPE_CHECKING, Any
  14. from jupyter_server.base.handlers import APIHandler
  15. from tornado import web
  16. from traitlets import List, Unicode
  17. from traitlets.config import LoggingConfigurable
  18. from .config import get_federated_extensions
  # File name of the license report looked up inside the built app and each
  # federated extension. The same literal is duplicated in @jupyterlab/builder,
  # so the two must be kept in sync.
  DEFAULT_THIRD_PARTY_LICENSE_FILE = "third-party-licenses.json"

  # Placeholder rendered when a package entry is missing a field.
  UNKNOWN_PACKAGE_NAME = "UNKNOWN"

  # Python < 3.8 ships without a markdown mapping
  # (https://bugs.python.org/issue39324); register one so that
  # ``mimetypes.guess_extension("text/markdown")`` yields ``.md``.
  if not mimetypes.guess_extension("text/markdown"):  # pragma: no cover
      mimetypes.add_type("text/markdown", ".md")
  class LicensesManager(LoggingConfigurable):
      """Collect and render third-party license information for the frontend.

      Covers the frontend code distributed by the application itself as well
      as by any federated extensions found on the configured labextension paths.
      """

      # One shared worker keeps the blocking file-system scan off the event
      # loop; with ``max_workers=1`` reports are produced one at a time.
      executor = ThreadPoolExecutor(max_workers=1)

      third_party_licenses_files = List(
          Unicode(),
          default_value=[
              DEFAULT_THIRD_PARTY_LICENSE_FILE,
              f"static/{DEFAULT_THIRD_PARTY_LICENSE_FILE}",
          ],
          help="the license report data in built app and federated extensions",
      )

      @property
      def federated_extensions(self) -> dict[str, Any]:
          """Lazily load the currently-available federated extensions.

          This is expensive, but probably the only way to be sure to get
          up-to-date license information for extensions installed interactively.
          """
          if TYPE_CHECKING:
              from .app import LabServerApp

              assert isinstance(self.parent, LabServerApp)

          per_paths = [
              self.parent.labextensions_path,
              self.parent.extra_labextensions_path,
          ]
          labextensions_path = [extension for extensions in per_paths for extension in extensions]
          return get_federated_extensions(labextensions_path)

      async def report_async(
          self, report_format: str = "markdown", bundles_pattern: str = ".*", full_text: bool = False
      ) -> tuple[str, str]:
          """Build a report on the worker thread and await its result.

          Asynchronous wrapper around the potentially slow job of locating and
          encoding all of the licenses; see :meth:`report` for the arguments,
          return value and exceptions.
          """
          return await asyncio.wrap_future(
              self.executor.submit(
                  self.report,
                  report_format=report_format,
                  bundles_pattern=bundles_pattern,
                  full_text=full_text,
              )
          )

      def report(self, report_format: str, bundles_pattern: str, full_text: bool) -> tuple[str, str]:
          """Create a human- or machine-readable report.

          Parameters
          ----------
          report_format:
              One of ``"json"``, ``"csv"`` or ``"markdown"``.
          bundles_pattern:
              Regular expression matched (with ``re.match``) against bundle names.
          full_text:
              Whether the markdown report includes the extracted license text.

          Returns
          -------
          A ``(report body, mime type)`` pair.

          Raises
          ------
          ValueError
              If ``report_format`` is not one of the supported formats.
          re.error
              If ``bundles_pattern`` is not a valid regular expression.
          """
          bundles = self.bundles(bundles_pattern=bundles_pattern)
          if report_format == "json":
              return self.report_json(bundles), "application/json"
          if report_format == "csv":
              return self.report_csv(bundles), "text/csv"
          if report_format == "markdown":
              return (
                  self.report_markdown(bundles, full_text=full_text),
                  "text/markdown",
              )

          msg = f"Unsupported report format {report_format}."
          raise ValueError(msg)

      def report_json(self, bundles: dict[str, Any]) -> str:
          """Create a JSON report.

          TODO: SPDX
          """
          return json.dumps({"bundles": bundles}, indent=2, sort_keys=True)

      def report_csv(self, bundles: dict[str, Any]) -> str:
          """Create a CSV report with one row per package, prefixed by its bundle."""
          outfile = io.StringIO()
          fieldnames = ["name", "versionInfo", "licenseId", "extractedText"]
          writer = csv.DictWriter(outfile, fieldnames=["bundle", *fieldnames])
          writer.writeheader()
          for bundle_name, bundle in bundles.items():
              # Tolerate a bundle without a "packages" key, as report_markdown does.
              for package in bundle.get("packages", []):
                  writer.writerow(
                      {
                          "bundle": bundle_name,
                          **{field: package.get(field, "") for field in fieldnames},
                      }
                  )
          return outfile.getvalue()

      @staticmethod
      def _package_field(package: dict[str, Any], key: str) -> str:
          """Return ``package[key]`` as stripped text, or the UNKNOWN placeholder.

          A missing key and an explicit ``None`` (JSON ``null``) both map to the
          placeholder; any other non-string value is converted with ``str``.
          """
          value = package.get(key)
          if value is None:
              return UNKNOWN_PACKAGE_NAME
          return str(value).strip()

      def report_markdown(self, bundles: dict[str, Any], full_text: bool = True) -> str:
          """Create a markdown report.

          Each bundle becomes a level-one heading and each of its packages a
          level-two heading holding the name, version and license id. When
          ``full_text`` is true the extracted license text follows each package.
          """
          lines: list[str] = []

          # Pad to the widest *decorated* name so that the column actually lines
          # up: the value that gets padded below includes the "**" markers.
          decorated_names = [
              f"**{self._package_field(package, 'name')}**"
              for bundle in bundles.values()
              for package in bundle.get("packages", [])
          ]
          name_width = max(map(len, decorated_names), default=1)

          for bundle_name, bundle in bundles.items():
              # TODO: parametrize template
              lines += [f"# {bundle_name}", ""]

              packages = bundle.get("packages", [])
              if not packages:
                  lines += ["> No licenses found", ""]
                  continue

              for package in packages:
                  name = self._package_field(package, "name")
                  version_info = self._package_field(package, "versionInfo")
                  license_id = self._package_field(package, "licenseId")
                  # ``or ""`` also covers an explicit null in the report file.
                  extracted_text = package.get("extractedText") or ""

                  columns = [
                      f"**{name}**".ljust(name_width),
                      f"`{version_info}`".ljust(20),
                      license_id,
                  ]
                  lines += ["## " + "\t".join(columns)]

                  if full_text:
                      if not extracted_text:
                          lines += ["", "> No license text available", ""]
                      else:
                          # A real opening tag: the former self-closing "<pre/>"
                          # does not start a raw HTML block in CommonMark, so the
                          # license text was re-interpreted as markdown.
                          lines += ["", "", "<pre>", str(extracted_text), "</pre>", ""]

          return "\n".join(lines)

      def license_bundle(self, path: Path, bundle: str | None) -> dict[str, Any]:
          """Return the merged content of a package's license bundles.

          Every file name in ``third_party_licenses_files`` is tried relative to
          ``path``; the ``packages`` lists of all readable files are concatenated.
          Unreadable or malformed files are logged and skipped rather than raised.
          """
          bundle_json: dict = {"packages": []}
          checked_paths = []

          for license_file in self.third_party_licenses_files:
              licenses_path = path / license_file
              self.log.debug("Loading licenses from %s", licenses_path)
              if not licenses_path.exists():
                  checked_paths += [licenses_path]
                  continue

              # Deliberately best-effort: one broken report must not prevent
              # the remaining bundles from being listed.
              try:
                  file_json = json.loads(licenses_path.read_text(encoding="utf-8"))
              except Exception as err:
                  self.log.warning(
                      "Failed to open third-party licenses for %s: %s\n%s",
                      bundle,
                      licenses_path,
                      err,
                  )
                  continue

              try:
                  bundle_json["packages"].extend(file_json["packages"])
              except Exception as err:
                  self.log.warning(
                      "Failed to find packages for %s: %s\n%s",
                      bundle,
                      licenses_path,
                      err,
                  )
                  continue

          if not bundle_json["packages"]:
              self.log.warning("Third-party licenses not found for %s: %s", bundle, checked_paths)

          return bundle_json

      def app_static_info(self) -> tuple[Path | None, str | None]:
          """Get the static directory and package name for this app.

          The ``package.json`` will usually be in `static_dir`, but may also
          appear in the parent of `static_dir`. Returns ``(None, None)`` when
          neither location has one.
          """
          if TYPE_CHECKING:
              from .app import LabServerApp

              assert isinstance(self.parent, LabServerApp)

          path = Path(self.parent.static_dir)
          package_json = path / "package.json"
          if not package_json.exists():
              parent_package_json = path.parent / "package.json"
              if parent_package_json.exists():
                  package_json = parent_package_json
              else:
                  return None, None
          name = json.loads(package_json.read_text(encoding="utf-8"))["name"]
          return path, name

      def bundles(self, bundles_pattern: str = ".*") -> dict[str, Any]:
          """Read all of the licenses whose bundle name matches ``bundles_pattern``.

          Raises ``re.error`` if the pattern is not a valid regular expression.

          TODO: schema
          """
          # Compile once instead of re-resolving the pattern for every name.
          matches = re.compile(bundles_pattern).match

          bundles = {
              name: self.license_bundle(Path(ext["ext_path"]), name)
              for name, ext in self.federated_extensions.items()
              if matches(name)
          }

          app_path, app_name = self.app_static_info()
          if app_path is not None:
              assert app_name is not None
              if matches(app_name):
                  bundles[app_name] = self.license_bundle(app_path, app_name)

          if not bundles:
              self.log.warning("No license bundles found at all")

          return bundles
  class LicensesHandler(APIHandler):
      """A handler for serving licenses used by the application."""

      def initialize(self, manager: LicensesManager) -> None:
          """Initialize the handler with the manager that builds the reports."""
          super().initialize()
          self.manager = manager

      def _get_json_flag(self, name: str, default: str) -> bool:
          """Parse query argument ``name`` as JSON and coerce it to a bool.

          Raises a 400 ``HTTPError`` when the value is not valid JSON, instead
          of letting the decode error surface as an internal server error.
          """
          raw = self.get_argument(name, default)
          try:
              return bool(json.loads(raw))
          except ValueError as err:
              msg = f"Invalid value for '{name}': expected a JSON literal such as true or false"
              raise web.HTTPError(400, msg) from err

      @web.authenticated
      async def get(self, _args: Any) -> None:
          """Return all the frontend licenses.

          Query arguments:

          - ``full_text``: JSON boolean, include license texts (default ``true``)
          - ``format``: report format passed to the manager (default ``json``)
          - ``bundles``: regular expression selecting bundle names (default ``.*``)
          - ``download``: JSON boolean, send the report as an attachment
            (default ``0``)

          Malformed arguments are answered with HTTP 400.
          """
          full_text = self._get_json_flag("full_text", "true")
          report_format = self.get_argument("format", "json")
          bundles_pattern = self.get_argument("bundles", ".*")
          download = self._get_json_flag("download", "0")

          # Validate the user-supplied pattern up front so that a typo is
          # reported as a client error rather than failing inside the worker.
          try:
              re.compile(bundles_pattern)
          except re.error as err:
              msg = f"Invalid bundles pattern: {err}"
              raise web.HTTPError(400, msg) from err

          try:
              report, mime = await self.manager.report_async(
                  report_format=report_format,
                  bundles_pattern=bundles_pattern,
                  full_text=full_text,
              )
          except (json.JSONDecodeError, UnicodeError):
              # ValueError subclasses caused by unreadable files on the
              # server, not by the request: keep them as server errors.
              raise
          except ValueError as err:
              # The manager signals an unsupported report format this way.
              raise web.HTTPError(400, str(err)) from err

          if TYPE_CHECKING:
              from .app import LabServerApp

              assert isinstance(self.manager.parent, LabServerApp)

          if download:
              # guess_extension may return None for an unregistered mime type;
              # fall back to no extension instead of a literal "None" suffix.
              extension = mimetypes.guess_extension(mime) or ""
              filename = "{}-licenses{}".format(self.manager.parent.app_name.lower(), extension)
              self.set_attachment_header(filename)
          self.write(report)
          await self.finish(_mime_type=mime)

      async def finish(  # type:ignore[override]
          self, _mime_type: str, *args: Any, **kwargs: Any
      ) -> Any:
          """Overload the regular finish, which (sensibly) always sets JSON.

          Sets ``Content-Type`` to ``_mime_type`` and then calls the ``finish``
          of the class after ``APIHandler`` in the MRO, bypassing
          ``APIHandler.finish`` itself.
          """
          self.update_api_activity()
          self.set_header("Content-Type", _mime_type)
          return await super(APIHandler, self).finish(*args, **kwargs)