gist.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. import requests
  2. from ..spec import AbstractFileSystem
  3. from ..utils import infer_storage_options
  4. from .memory import MemoryFile
  5. class GistFileSystem(AbstractFileSystem):
  6. """
  7. Interface to files in a single GitHub Gist.
  8. Provides read-only access to a gist's files. Gists do not contain
  9. subdirectories, so file listing is straightforward.
  10. Parameters
  11. ----------
  12. gist_id: str
  13. The ID of the gist you want to access (the long hex value from the URL).
  14. filenames: list[str] (optional)
  15. If provided, only make a file system representing these files, and do not fetch
  16. the list of all files for this gist.
  17. sha: str (optional)
  18. If provided, fetch a particular revision of the gist. If omitted,
  19. the latest revision is used.
  20. username: str (optional)
  21. GitHub username for authentication.
  22. token: str (optional)
  23. GitHub personal access token (required if username is given), or.
  24. timeout: (float, float) or float, optional
  25. Connect and read timeouts for requests (default 60s each).
  26. kwargs: dict
  27. Stored on `self.request_kw` and passed to `requests.get` when fetching Gist
  28. metadata or reading ("opening") a file.
  29. """
  30. protocol = "gist"
  31. gist_url = "https://api.github.com/gists/{gist_id}"
  32. gist_rev_url = "https://api.github.com/gists/{gist_id}/{sha}"
  33. def __init__(
  34. self,
  35. gist_id,
  36. filenames=None,
  37. sha=None,
  38. username=None,
  39. token=None,
  40. timeout=None,
  41. **kwargs,
  42. ):
  43. super().__init__()
  44. self.gist_id = gist_id
  45. self.filenames = filenames
  46. self.sha = sha # revision of the gist (optional)
  47. if username is not None and token is None:
  48. raise ValueError("User auth requires a token")
  49. self.username = username
  50. self.token = token
  51. self.request_kw = kwargs
  52. # Default timeouts to 60s connect/read if none provided
  53. self.timeout = timeout if timeout is not None else (60, 60)
  54. # We use a single-level "directory" cache, because a gist is essentially flat
  55. self.dircache[""] = self._fetch_file_list()
  56. @property
  57. def kw(self):
  58. """Auth parameters passed to 'requests' if we have username/token."""
  59. kw = {
  60. "headers": {
  61. "Accept": "application/vnd.github+json",
  62. "X-GitHub-Api-Version": "2022-11-28",
  63. }
  64. }
  65. kw.update(self.request_kw)
  66. if self.username and self.token:
  67. kw["auth"] = (self.username, self.token)
  68. elif self.token:
  69. kw["headers"]["Authorization"] = f"Bearer {self.token}"
  70. return kw
  71. def _fetch_gist_metadata(self):
  72. """
  73. Fetch the JSON metadata for this gist (possibly for a specific revision).
  74. """
  75. if self.sha:
  76. url = self.gist_rev_url.format(gist_id=self.gist_id, sha=self.sha)
  77. else:
  78. url = self.gist_url.format(gist_id=self.gist_id)
  79. r = requests.get(url, timeout=self.timeout, **self.kw)
  80. if r.status_code == 404:
  81. raise FileNotFoundError(
  82. f"Gist not found: {self.gist_id}@{self.sha or 'latest'}"
  83. )
  84. r.raise_for_status()
  85. return r.json()
  86. def _fetch_file_list(self):
  87. """
  88. Returns a list of dicts describing each file in the gist. These get stored
  89. in self.dircache[""].
  90. """
  91. meta = self._fetch_gist_metadata()
  92. if self.filenames:
  93. available_files = meta.get("files", {})
  94. files = {}
  95. for fn in self.filenames:
  96. if fn not in available_files:
  97. raise FileNotFoundError(fn)
  98. files[fn] = available_files[fn]
  99. else:
  100. files = meta.get("files", {})
  101. out = []
  102. for fname, finfo in files.items():
  103. if finfo is None:
  104. # Occasionally GitHub returns a file entry with null if it was deleted
  105. continue
  106. # Build a directory entry
  107. out.append(
  108. {
  109. "name": fname, # file's name
  110. "type": "file", # gists have no subdirectories
  111. "size": finfo.get("size", 0), # file size in bytes
  112. "raw_url": finfo.get("raw_url"),
  113. }
  114. )
  115. return out
  116. @classmethod
  117. def _strip_protocol(cls, path):
  118. """
  119. Remove 'gist://' from the path, if present.
  120. """
  121. # The default infer_storage_options can handle gist://username:token@id/file
  122. # or gist://id/file, but let's ensure we handle a normal usage too.
  123. # We'll just strip the protocol prefix if it exists.
  124. path = infer_storage_options(path).get("path", path)
  125. return path.lstrip("/")
  126. @staticmethod
  127. def _get_kwargs_from_urls(path):
  128. """
  129. Parse 'gist://' style URLs into GistFileSystem constructor kwargs.
  130. For example:
  131. gist://:TOKEN@<gist_id>/file.txt
  132. gist://username:TOKEN@<gist_id>/file.txt
  133. """
  134. so = infer_storage_options(path)
  135. out = {}
  136. if "username" in so and so["username"]:
  137. out["username"] = so["username"]
  138. if "password" in so and so["password"]:
  139. out["token"] = so["password"]
  140. if "host" in so and so["host"]:
  141. # We interpret 'host' as the gist ID
  142. out["gist_id"] = so["host"]
  143. # Extract SHA and filename from path
  144. if "path" in so and so["path"]:
  145. path_parts = so["path"].rsplit("/", 2)[-2:]
  146. if len(path_parts) == 2:
  147. if path_parts[0]: # SHA present
  148. out["sha"] = path_parts[0]
  149. if path_parts[1]: # filename also present
  150. out["filenames"] = [path_parts[1]]
  151. return out
  152. def ls(self, path="", detail=False, **kwargs):
  153. """
  154. List files in the gist. Gists are single-level, so any 'path' is basically
  155. the filename, or empty for all files.
  156. Parameters
  157. ----------
  158. path : str, optional
  159. The filename to list. If empty, returns all files in the gist.
  160. detail : bool, default False
  161. If True, return a list of dicts; if False, return a list of filenames.
  162. """
  163. path = self._strip_protocol(path or "")
  164. # If path is empty, return all
  165. if path == "":
  166. results = self.dircache[""]
  167. else:
  168. # We want just the single file with this name
  169. all_files = self.dircache[""]
  170. results = [f for f in all_files if f["name"] == path]
  171. if not results:
  172. raise FileNotFoundError(path)
  173. if detail:
  174. return results
  175. else:
  176. return sorted(f["name"] for f in results)
  177. def _open(self, path, mode="rb", block_size=None, **kwargs):
  178. """
  179. Read a single file from the gist.
  180. """
  181. if mode != "rb":
  182. raise NotImplementedError("GitHub Gist FS is read-only (no write).")
  183. path = self._strip_protocol(path)
  184. # Find the file entry in our dircache
  185. matches = [f for f in self.dircache[""] if f["name"] == path]
  186. if not matches:
  187. raise FileNotFoundError(path)
  188. finfo = matches[0]
  189. raw_url = finfo.get("raw_url")
  190. if not raw_url:
  191. raise FileNotFoundError(f"No raw_url for file: {path}")
  192. r = requests.get(raw_url, timeout=self.timeout, **self.kw)
  193. if r.status_code == 404:
  194. raise FileNotFoundError(path)
  195. r.raise_for_status()
  196. return MemoryFile(path, None, r.content)
  197. def cat(self, path, recursive=False, on_error="raise", **kwargs):
  198. """
  199. Return {path: contents} for the given file or files. If 'recursive' is True,
  200. and path is empty, returns all files in the gist.
  201. """
  202. paths = self.expand_path(path, recursive=recursive)
  203. out = {}
  204. for p in paths:
  205. try:
  206. with self.open(p, "rb") as f:
  207. out[p] = f.read()
  208. except FileNotFoundError as e:
  209. if on_error == "raise":
  210. raise e
  211. elif on_error == "omit":
  212. pass # skip
  213. else:
  214. out[p] = e
  215. if len(paths) == 1 and paths[0] == path:
  216. return out[path]
  217. return out