http_sync.py 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937
  1. """This file is largely copied from http.py"""
  2. import io
  3. import logging
  4. import re
  5. import urllib.error
  6. import urllib.parse
  7. from copy import copy
  8. from json import dumps, loads
  9. from urllib.parse import urlparse
  10. try:
  11. import yarl
  12. except (ImportError, ModuleNotFoundError, OSError):
  13. yarl = False
  14. from fsspec.callbacks import _DEFAULT_CALLBACK
  15. from fsspec.registry import register_implementation
  16. from fsspec.spec import AbstractBufferedFile, AbstractFileSystem
  17. from fsspec.utils import DEFAULT_BLOCK_SIZE, isfilelike, nullcontext, tokenize
  18. from ..caching import AllBytes
  # Matches an HTML anchor tag and captures its href target in the "url" group.
  # Pattern source: https://stackoverflow.com/a/15926317/3821154
  ex = re.compile(
      r"""<(a|A)\s+(?:[^>]*?\s+)?(href|HREF)=["'](?P<url>[^"']+)"""
  )

  # Matches anything in free text that looks like an absolute http(s) URL.
  ex2 = re.compile(
      r"""(?P<url>http[s]?://[-a-zA-Z0-9@:%_+.~#?&/=]+)"""
  )

  # Module logger, registered under the name "fsspec.http".
  logger = logging.getLogger("fsspec.http")
  class JsHttpException(urllib.error.HTTPError):
      """``HTTPError`` subclass used by this module for failed HTTP responses."""
  class StreamIO(io.BytesIO):
      """In-memory byte buffer standing in for a streamed response body.

      A stand-in subclass on which attributes can be set; the intention is
      for it to eventually stream for real.
      """
  class ResponseProxy:
      """Looks like a requests response, backed by a finished JS XMLHttpRequest.

      Parameters
      ----------
      req:
          The request object. This class reads its ``response``, ``status``,
          ``statusText`` and ``responseURL`` attributes and calls its
          ``getAllResponseHeaders()`` method.
      stream: bool
          If true, ``raw`` is a file-like ``StreamIO``; otherwise it is the
          body as ``bytes``.
      """

      def __init__(self, req, stream=False):
          self.request = req
          self.stream = stream
          self._data = None
          self._headers = None

      @property
      def raw(self):
          """The response body, converted from the JS buffer on first access."""
          if self._data is None:
              payload = self.request.response.to_bytes()
              self._data = StreamIO(payload) if self.stream else payload
          return self._data

      def close(self):
          """Drop the cached body."""
          # Reset rather than delete the attribute: ``raw`` tests
          # ``self._data is None`` and must keep working after close().
          self._data = None

      @property
      def headers(self):
          """Response headers as a plain dict of name -> value (parsed once)."""
          if self._headers is None:
              parsed = {}
              block = self.request.getAllResponseHeaders().strip()
              for line in block.split("\r\n"):
                  # Split on the first colon only, so values that themselves
                  # contain ": " stay intact; lines without a colon (such as
                  # the empty string left by an empty block) are skipped.
                  name, found, value = line.partition(":")
                  if found:
                      parsed[name.strip()] = value.strip()
              self._headers = parsed
          return self._headers

      @property
      def status_code(self):
          """Numeric HTTP status of the response."""
          return int(self.request.status)

      def raise_for_status(self):
          """Raise JsHttpException if the status is 400 or above."""
          if not self.ok:
              raise JsHttpException(
                  self.url, self.status_code, self.reason, self.headers, None
              )

      def iter_content(self, chunksize, *_, **__):
          """Yield the body in pieces of at most ``chunksize`` bytes.

          Extra positional and keyword arguments are accepted and ignored.
          """
          body = self.raw
          # ``raw`` is plain bytes unless stream=True was requested; wrap it so
          # that both cases can be read incrementally.
          reader = body if hasattr(body, "read") else io.BytesIO(body)
          while True:
              chunk = reader.read(chunksize)
              if not chunk:
                  break
              yield chunk

      @property
      def reason(self):
          """Status text reported by the request object."""
          return self.request.statusText

      @property
      def ok(self):
          """True when the status code is below 400."""
          return self.status_code < 400

      @property
      def url(self):
          """Final URL of the response."""
          # responseURL is a property of the XMLHttpRequest itself, not of the
          # response body object.
          return self.request.responseURL

      @property
      def text(self):
          """The body decoded with the default codec."""
          # TODO: encoding from headers
          return self.content.decode()

      @property
      def content(self):
          """The whole body as bytes; also switches off stream mode."""
          self.stream = False
          body = self.raw
          if isinstance(body, io.BytesIO):
              # The body was already materialised as a stream object; hand
              # back all of its bytes regardless of the read position.
              return body.getvalue()
          return body

      def json(self):
          """Parse the body as JSON."""
          return loads(self.text)
  class RequestsSessionShim:
      """Minimal stand-in for ``requests.Session`` built on JS XMLHttpRequest.

      Requests are issued synchronously and the result is wrapped in a
      ``ResponseProxy``.
      """

      def __init__(self):
          # Session-wide headers, sent with every request (per-call headers
          # take precedence on a name clash).
          self.headers = {}

      def request(
          self,
          method,
          url,
          params=None,
          data=None,
          headers=None,
          cookies=None,
          files=None,
          auth=None,
          timeout=None,
          allow_redirects=None,
          proxies=None,
          hooks=None,
          stream=None,
          verify=None,
          cert=None,
          json=None,
      ):
          """Perform one blocking HTTP request.

          Parameters
          ----------
          method: str
              HTTP verb handed to ``XMLHttpRequest.open``.
          url: str
              Target URL; ``params`` (if any) are url-encoded and appended.
          data: bytes, str, file-like or iterator of bytes
              Request body. File-likes are read and iterators joined before
              sending.
          json: object
              Serialised with ``json.dumps`` and sent as ``application/json``.
              Mutually exclusive with ``data``.
          headers: dict
              Extra request headers, merged over the session ``headers``.
          auth: sequence
              Extra positional arguments for ``XMLHttpRequest.open``.
          timeout:
              Assigned to the request's ``timeout`` attribute unchanged.
          stream: bool
              Passed on to ``ResponseProxy``.
          allow_redirects:
              Accepted for signature compatibility; not used.
          cert, verify, proxies, files, cookies, hooks:
              Not supported.

          Raises
          ------
          NotImplementedError
              If any of the unsupported arguments is truthy.
          ValueError
              If both ``data`` and ``json`` are given.
          """
          from js import Blob, XMLHttpRequest

          logger.debug("JS request: %s %s", method, url)

          if cert or verify or proxies or files or cookies or hooks:
              raise NotImplementedError
          if data and json:
              raise ValueError("Use json= or data=, not both")
          req = XMLHttpRequest.new()
          extra = auth if auth else ()
          if params:
              # Do not start a second query string if the URL already has one
              joiner = "&" if "?" in url else "?"
              url = f"{url}{joiner}{urllib.parse.urlencode(params)}"
          # Third argument False => synchronous request
          req.open(method, url, False, *extra)
          if timeout:
              req.timeout = timeout
          merged_headers = {**self.headers, **(headers or {})}
          for key, value in merged_headers.items():
              req.setRequestHeader(key, value)

          req.setRequestHeader("Accept", "application/octet-stream")
          req.responseType = "arraybuffer"
          # NOTE(review): the Blob options are passed as a Python dict; confirm
          # whether the JS bridge needs an explicit conversion to a JS object.
          if json:
              blob = Blob.new([dumps(json)], {"type": "application/json"})
              req.send(blob)
          elif data:
              if isinstance(data, io.IOBase):
                  data = data.read()
              elif hasattr(data, "__next__"):
                  # e.g. the chunk generator produced by put_file
                  data = b"".join(data)
              blob = Blob.new([data], {"type": "application/octet-stream"})
              req.send(blob)
          else:
              req.send(None)
          return ResponseProxy(req, stream=stream)

      def get(self, url, **kwargs):
          """Shortcut for ``request("GET", ...)``."""
          return self.request("GET", url, **kwargs)

      def head(self, url, **kwargs):
          """Shortcut for ``request("HEAD", ...)``."""
          return self.request("HEAD", url, **kwargs)

      def post(self, url, **kwargs):
          """Shortcut for ``request("POST", ...)``."""
          return self.request("POST", url, **kwargs)

      def put(self, url, **kwargs):
          """Shortcut for ``request("PUT", ...)``."""
          return self.request("PUT", url, **kwargs)

      def patch(self, url, **kwargs):
          """Shortcut for ``request("PATCH", ...)``."""
          return self.request("PATCH", url, **kwargs)

      def delete(self, url, **kwargs):
          """Shortcut for ``request("DELETE", ...)``."""
          return self.request("DELETE", url, **kwargs)
  class HTTPFileSystem(AbstractFileSystem):
      """
      Simple File-System for fetching data via HTTP(S)

      This is the BLOCKING version of the normal HTTPFileSystem. It uses
      requests in normal python and the JS runtime in pyodide.

      ***This implementation is extremely experimental, do not use unless
      you are testing pyodide/pyscript integration***
      """

      protocol = ("http", "https", "sync-http", "sync-https")
      sep = "/"

      def __init__(
          self,
          simple_links=True,
          block_size=None,
          same_scheme=True,
          cache_type="readahead",
          cache_options=None,
          client_kwargs=None,
          encoded=False,
          **storage_options,
      ):
          """

          Parameters
          ----------
          block_size: int
              Blocks to read bytes; if 0, will default to raw requests file-like
              objects instead of HTTPFile instances
          simple_links: bool
              If True, will consider both HTML <a> tags and anything that looks
              like a URL; if False, will consider only the former.
          same_scheme: True
              When doing ls/glob, if this is True, only consider paths that have
              http/https matching the input URLs.
          cache_type, cache_options: defaults used in open
          client_kwargs: dict
              Given to ``requests.Session(...)`` when the JS runtime is not
              available; unused with the JS session.
          encoded: bool
              Passed as ``encoded=`` to ``yarl.URL`` when yarl is installed.
          storage_options: key-value
              Any other parameters passed on to requests
          """
          super().__init__(**storage_options)
          self.block_size = block_size if block_size is not None else DEFAULT_BLOCK_SIZE
          self.simple_links = simple_links
          self.same_schema = same_scheme
          self.cache_type = cache_type
          self.cache_options = cache_options
          self.client_kwargs = client_kwargs or {}
          self.encoded = encoded

          try:
              import js  # noqa: F401

              logger.debug("Starting JS session")
              self.session = RequestsSessionShim()
              self.js = True
          except Exception as e:
              import requests

              logger.debug("Starting cpython session because of: %s", e)
              # NOTE(review): requests.Session() is believed to accept no
              # constructor arguments, in which case any non-empty
              # client_kwargs fails here - confirm the intended handling.
              self.session = requests.Session(**(client_kwargs or {}))
              self.js = False

          # Keys that configure the filesystem itself must not leak into the
          # keyword arguments of every HTTP call.
          request_options = copy(storage_options)
          self.use_listings_cache = request_options.pop("use_listings_cache", False)
          request_options.pop("listings_expiry_time", None)
          request_options.pop("max_paths", None)
          request_options.pop("skip_instance_cache", None)
          self.kwargs = request_options

      @property
      def fsid(self):
          return "sync-http"

      def encode_url(self, url):
          """Wrap ``url`` in ``yarl.URL`` if yarl is available, else return it as-is."""
          if yarl:
              return yarl.URL(url, encoded=self.encoded)
          return url

      @classmethod
      def _strip_protocol(cls, path: str) -> str:
          """For HTTP, we always want to keep the full URL"""
          # Only the "sync-" marker is removed, leaving a regular http(s) URL
          path = path.replace("sync-http://", "http://").replace(
              "sync-https://", "https://"
          )
          return path

      @classmethod
      def _parent(cls, path):
          # override, since _strip_protocol is different for URLs
          par = super()._parent(path)
          if len(par) > 7:  # "http://..."
              return par
          return ""

      def _ls_real(self, url, detail=True, **kwargs):
          """Fetch ``url`` and list the links found in the returned text.

          Returns a list of info dicts if ``detail`` is true, otherwise a sorted
          list of URLs. Raises FileNotFoundError on a 404 response.
          """
          # ignoring URL-encoded arguments
          kw = self.kwargs.copy()
          kw.update(kwargs)
          logger.debug(url)
          r = self.session.get(self.encode_url(url), **kw)
          self._raise_not_found_for_status(r, url)
          text = r.text
          links = [u[2] for u in ex.findall(text)]
          if self.simple_links:
              links = ex2.findall(text) + links
          out = set()
          parts = urlparse(url)
          prefix = url.rstrip("/") + "/"
          for link in links:
              if isinstance(link, tuple):
                  link = link[1]
              if link.startswith("/") and len(link) > 1:
                  # absolute URL on this server
                  link = parts.scheme + "://" + parts.netloc + link
              if link.startswith("http"):
                  if self.same_schema and link.startswith(prefix):
                      out.add(link)
                  elif link.replace("https", "http").startswith(
                      url.replace("https", "http").rstrip("/") + "/"
                  ):
                      # allowed to cross http <-> https
                      out.add(link)
              else:
                  if link not in ["..", "../"]:
                      # Ignore FTP-like "parent"
                      out.add("/".join([url.rstrip("/"), link.lstrip("/")]))
          if not out and url.endswith("/"):
              # Nothing found below "url/": retry without the trailing slash
              out = self._ls_real(url.rstrip("/"), detail=False, **kwargs)
          if detail:
              return [
                  {
                      "name": u,
                      "size": None,
                      "type": "directory" if u.endswith("/") else "file",
                  }
                  for u in out
              ]
          else:
              return sorted(out)

      def ls(self, url, detail=True, **kwargs):
          """List the links under ``url``, going through the listings cache.

          Returns a list of info dicts if ``detail`` is true, otherwise a sorted
          list of URLs.
          """
          if self.use_listings_cache and url in self.dircache:
              out = self.dircache[url]
          else:
              # Always store the detailed form, so that a cached entry can
              # serve both detail=True and detail=False callers.
              out = self._ls_real(url, detail=True, **kwargs)
              self.dircache[url] = out
          if detail:
              return out
          return sorted(entry["name"] for entry in out)

      def _raise_not_found_for_status(self, response, url):
          """
          Raises FileNotFoundError for 404s, otherwise uses raise_for_status.
          """
          if response.status_code == 404:
              raise FileNotFoundError(url)
          response.raise_for_status()

      def cat_file(self, url, start=None, end=None, **kwargs):
          """Return the bytes of ``url``, optionally limited to ``start``..``end``.

          When either limit is given, a "Range" header is added to the request.
          """
          kw = self.kwargs.copy()
          kw.update(kwargs)
          logger.debug(url)

          if start is not None or end is not None:
              if start == end:
                  return b""
              headers = kw.pop("headers", {}).copy()

              headers["Range"] = self._process_limits(url, start, end)
              kw["headers"] = headers
          r = self.session.get(self.encode_url(url), **kw)
          self._raise_not_found_for_status(r, url)
          return r.content

      def get_file(
          self, rpath, lpath, chunk_size=5 * 2**20, callback=_DEFAULT_CALLBACK, **kwargs
      ):
          """Download ``rpath`` into ``lpath`` (a local path or a file-like).

          The callback's size is set from the Content-Length header (None if it
          is missing or unparsable) and updated after every written chunk.
          """
          kw = self.kwargs.copy()
          kw.update(kwargs)
          logger.debug(rpath)
          r = self.session.get(self.encode_url(rpath), **kw)
          try:
              size = int(
                  r.headers.get("content-length", None)
                  or r.headers.get("Content-Length", None)
              )
          except (ValueError, KeyError, TypeError):
              size = None

          callback.set_size(size)
          self._raise_not_found_for_status(r, rpath)
          opened_here = not isfilelike(lpath)
          if opened_here:
              lpath = open(lpath, "wb")
          try:
              for chunk in r.iter_content(chunk_size, decode_unicode=False):
                  lpath.write(chunk)
                  callback.relative_update(len(chunk))
          finally:
              # Only close what this method opened; a caller-supplied
              # file-like stays under the caller's control.
              if opened_here:
                  lpath.close()

      def put_file(
          self,
          lpath,
          rpath,
          chunk_size=5 * 2**20,
          callback=_DEFAULT_CALLBACK,
          method="post",
          **kwargs,
      ):
          """Upload ``lpath`` (a local path or a file-like) to ``rpath``.

          ``method`` must be "post" or "put" (case-insensitive); anything else
          raises ValueError.
          """

          def gen_chunks():
              # Support passing arbitrary file-like objects
              # and use them instead of streams.
              if isinstance(lpath, io.IOBase):
                  context = nullcontext(lpath)
                  use_seek = False  # might not support seeking
              else:
                  context = open(lpath, "rb")
                  use_seek = True

              with context as f:
                  if use_seek:
                      callback.set_size(f.seek(0, 2))
                      f.seek(0)
                  else:
                      callback.set_size(getattr(f, "size", None))

                  chunk = f.read(chunk_size)
                  while chunk:
                      yield chunk
                      callback.relative_update(len(chunk))
                      chunk = f.read(chunk_size)

          kw = self.kwargs.copy()
          kw.update(kwargs)

          method = method.lower()
          if method not in ("post", "put"):
              raise ValueError(
                  f"method has to be either 'post' or 'put', not: {method!r}"
              )

          meth = getattr(self.session, method)
          resp = meth(rpath, data=gen_chunks(), **kw)
          self._raise_not_found_for_status(resp, rpath)

      def _process_limits(self, url, start, end):
          """Helper for "Range"-based _cat_file

          Turns python-style ``start``/``end`` offsets (``end`` exclusive,
          negative values counted from the end of the file) into the value of
          a "Range" header. The file size is looked up via ``info`` only when
          a negative offset has to be converted to an absolute one.
          """
          size = None
          suff = False
          if start is not None and start < 0:
              # if start is negative and end None, end is the "suffix length"
              if end is None:
                  end = -start
                  start = ""
                  suff = True
              else:
                  size = size or self.info(url)["size"]
                  start = size + start
          elif start is None:
              start = 0
          if not suff:
              if end is not None and end < 0:
                  if start is not None:
                      size = size or self.info(url)["size"]
                      end = size + end
              elif end is None:
                  end = ""
              # Only an absolute end offset is adjusted; a suffix length
              # ("bytes=-N") must be sent unchanged.
              if isinstance(end, int):
                  end -= 1  # bytes range is inclusive
          return f"bytes={start}-{end}"

      def exists(self, path, strict=False, **kwargs):
          """Report whether a GET on ``path`` answers with a status below 400.

          With ``strict`` true, errors other than FileNotFoundError are
          re-raised instead of being reported as False.
          """
          kw = self.kwargs.copy()
          kw.update(kwargs)
          try:
              logger.debug(path)
              r = self.session.get(self.encode_url(path), **kw)
              if strict:
                  self._raise_not_found_for_status(r, path)
              return r.status_code < 400
          except FileNotFoundError:
              return False
          except Exception:
              if strict:
                  raise
              return False

      def isfile(self, path, **kwargs):
          return self.exists(path, **kwargs)

      def _open(
          self,
          path,
          mode="rb",
          block_size=None,
          autocommit=None,  # XXX: This differs from the base class.
          cache_type=None,
          cache_options=None,
          size=None,
          **kwargs,
      ):
          """Make a file-like object

          Parameters
          ----------
          path: str
              Full URL with protocol
          mode: string
              must be "rb"
          block_size: int or None
              Bytes to download in one request; use instance value if None. If
              zero, will return a streaming Requests file-like instance.
          kwargs: key-value
              Any other parameters, passed to requests calls
          """
          if mode != "rb":
              raise NotImplementedError
          block_size = block_size if block_size is not None else self.block_size
          kw = self.kwargs.copy()
          kw.update(kwargs)
          size = size or self.info(path, **kwargs)["size"]
          if block_size and size:
              return HTTPFile(
                  self,
                  path,
                  session=self.session,
                  block_size=block_size,
                  mode=mode,
                  size=size,
                  cache_type=cache_type or self.cache_type,
                  cache_options=cache_options or self.cache_options,
                  **kw,
              )
          else:
              # No block size or unknown size: fall back to plain streaming
              return HTTPStreamFile(
                  self,
                  path,
                  mode=mode,
                  session=self.session,
                  **kw,
              )

      def ukey(self, url):
          """Unique identifier; assume HTTP files are static, unchanging"""
          return tokenize(url, self.kwargs, self.protocol)

      def info(self, url, **kwargs):
          """Get info of URL

          Tries to access location via HEAD, and then GET methods, but does
          not fetch the data.

          It is possible that the server does not supply any size information, in
          which case size will be given as None (and certain operations on the
          corresponding file will not work).

          Raises FileNotFoundError if the GET attempt fails as well.
          """
          info = {}
          # Merge up front: expanding both mappings in the call would raise
          # TypeError if they shared a key (for instance "headers").
          request_kwargs = {**self.kwargs, **kwargs}
          for policy in ["head", "get"]:
              try:
                  info.update(
                      _file_info(
                          self.encode_url(url),
                          size_policy=policy,
                          session=self.session,
                          **request_kwargs,
                      )
                  )
                  if info.get("size") is not None:
                      break
              except Exception as exc:
                  if policy == "get":
                      # If get failed, then raise a FileNotFoundError
                      raise FileNotFoundError(url) from exc
                  logger.debug(str(exc))

          return {"name": url, "size": None, **info, "type": "file"}

      def glob(self, path, maxdepth=None, **kwargs):
          """
          Find files by glob-matching.

          This implementation is identical to the one in AbstractFileSystem,
          but "?" is not considered as a character for globbing, because it is
          so common in URLs, often identifying the "query" part.
          """
          ends = path.endswith("/")
          path = self._strip_protocol(path)
          indstar = path.find("*") if path.find("*") >= 0 else len(path)
          indbrace = path.find("[") if path.find("[") >= 0 else len(path)

          ind = min(indstar, indbrace)

          detail = kwargs.pop("detail", False)

          if not has_magic(path):
              root = path
              depth = 1
              if ends:
                  path += "/*"
              elif self.exists(path):
                  if not detail:
                      return [path]
                  else:
                      return {path: self.info(path)}
              else:
                  if not detail:
                      return []  # glob of non-existent returns empty
                  else:
                      return {}
          elif "/" in path[:ind]:
              ind2 = path[:ind].rindex("/")
              root = path[: ind2 + 1]
              depth = None if "**" in path else path[ind2 + 1 :].count("/") + 1
          else:
              root = ""
              depth = None if "**" in path else path[ind + 1 :].count("/") + 1

          allpaths = self.find(
              root, maxdepth=maxdepth or depth, withdirs=True, detail=True, **kwargs
          )
          # Escape characters special to python regex, leaving our supported
          # special characters in place.
          # See https://www.gnu.org/software/bash/manual/html_node/Pattern-Matching.html
          # for shell globbing details.
          pattern = (
              "^"
              + (
                  path.replace("\\", r"\\")
                  .replace(".", r"\.")
                  .replace("+", r"\+")
                  .replace("//", "/")
                  .replace("(", r"\(")
                  .replace(")", r"\)")
                  .replace("|", r"\|")
                  .replace("^", r"\^")
                  .replace("$", r"\$")
                  .replace("{", r"\{")
                  .replace("}", r"\}")
                  .rstrip("/")
              )
              + "$"
          )
          # "**" may span path separators, a single "*" may not
          pattern = re.sub("[*]{2}", "=PLACEHOLDER=", pattern)
          pattern = re.sub("[*]", "[^/]*", pattern)
          pattern = re.compile(pattern.replace("=PLACEHOLDER=", ".*"))
          out = {
              p: allpaths[p]
              for p in sorted(allpaths)
              if pattern.match(p.replace("//", "/").rstrip("/"))
          }
          if detail:
              return out
          else:
              return list(out)

      def isdir(self, path):
          # override, since all URLs are (also) files
          try:
              return bool(self.ls(path))
          except (FileNotFoundError, ValueError):
              return False
  class HTTPFile(AbstractBufferedFile):
      """
      A file-like object pointing to a remote HTTP(S) resource

      Supports only reading, with read-ahead of a predetermined block-size.

      In the case that the server does not supply the filesize, only reading of
      the complete file in one go is supported.

      Parameters
      ----------
      url: str
          Full URL of the remote resource, including the protocol
      session: requests.Session or None
          All calls will be made within this session, to avoid restarting
          connections where the server allows this
      block_size: int or None
          The amount of read-ahead to do, in bytes. Default is 5MB, or the value
          configured for the FileSystem creating this file
      size: None or int
          If given, this is the size of the file in bytes, and we don't attempt
          to call the server to find the value.
      kwargs: all other key-values are passed to requests calls.
      """

      def __init__(
          self,
          fs,
          url,
          session=None,
          block_size=None,
          mode="rb",
          cache_type="bytes",
          cache_options=None,
          size=None,
          **kwargs,
      ):
          if mode != "rb":
              raise NotImplementedError("File mode not supported")
          self.url = url
          self.session = session
          # Set before the base-class constructor runs, so the known size is
          # available without another request.
          self.details = {"name": url, "size": size, "type": "file"}
          super().__init__(
              fs=fs,
              path=url,
              mode=mode,
              block_size=block_size,
              cache_type=cache_type,
              cache_options=cache_options,
              **kwargs,
          )

      def read(self, length=-1):
          """Read bytes from file

          Parameters
          ----------
          length: int
              Read up to this many bytes. If negative, read all content to end of
              file. If the server has not supplied the filesize, attempting to
              read only part of the data will raise a ValueError.
          """
          if (
              (length < 0 and self.loc == 0)  # explicit read all
              # but not when the size is known and fits into a block anyways
              and not (self.size is not None and self.size <= self.blocksize)
          ):
              self._fetch_all()
          if self.size is None:
              if length < 0:
                  self._fetch_all()
          else:
              length = min(self.size - self.loc, length)
          return super().read(length)

      def _fetch_all(self):
          """Read whole file in one shot, without caching

          This is only called when position is still at zero,
          and read() is called without a byte-count.
          """
          logger.debug(f"Fetch all for {self}")
          if not isinstance(self.cache, AllBytes):
              r = self.session.get(self.fs.encode_url(self.url), **self.kwargs)
              r.raise_for_status()
              out = r.content
              self.cache = AllBytes(size=len(out), fetcher=None, blocksize=None, data=out)
              self.size = len(out)

      def _parse_content_range(self, headers):
          """Parse the Content-Range header

          Returns ``(start, end, total)``; each element is None when the header
          is missing, unparsable or gives ``*`` for that part.
          """
          # Header names may arrive lower-cased, so try both spellings as is
          # done for Content-Length in _fetch_range.
          s = headers.get("Content-Range") or headers.get("content-range") or ""
          m = re.match(r"bytes (\d+-\d+|\*)/(\d+|\*)", s)
          if not m:
              return None, None, None

          if m[1] == "*":
              start = end = None
          else:
              start, end = [int(x) for x in m[1].split("-")]
          total = None if m[2] == "*" else int(m[2])
          return start, end, total

      def _fetch_range(self, start, end):
          """Download a block of data

          The expectation is that the server returns only the requested bytes,
          with HTTP code 206. If this is not the case, we first check the headers,
          and then read the output - when the whole file was sent although only
          its beginning was wanted, the result is cut to the requested length.

          Returns ``b""`` for a 416 response. Raises ValueError if the server
          ignored the range and ``start`` is not zero.
          """
          logger.debug(f"Fetch range for {self}: {start}-{end}")
          kwargs = self.kwargs.copy()
          headers = kwargs.pop("headers", {}).copy()
          headers["Range"] = f"bytes={start}-{end - 1}"
          logger.debug("%s : %s", self.url, headers["Range"])
          r = self.session.get(self.fs.encode_url(self.url), headers=headers, **kwargs)
          if r.status_code == 416:
              # range request outside file
              return b""
          r.raise_for_status()

          # If the server has handled the range request, it should reply
          # with status 206 (partial content). But we'll guess that a suitable
          # Content-Range header or a Content-Length no more than the
          # requested range also mean we have got the desired range.
          cl = r.headers.get("Content-Length", r.headers.get("content-length", end + 1))
          response_is_range = (
              r.status_code == 206
              or self._parse_content_range(r.headers)[0] == start
              or int(cl) <= end - start
          )

          if response_is_range:
              # partial content, as expected
              out = r.content
          elif start > 0:
              raise ValueError(
                  "The HTTP server doesn't appear to support range requests. "
                  "Only reading this file from the beginning is supported. "
                  "Open with block_size=0 for a streaming file interface."
              )
          else:
              # Response is not a range, but we want the start of the file,
              # so we can read the required amount anyway.
              wanted = end - start
              received = 0
              chunks = []
              for chunk in r.iter_content(2**20, False):
                  chunks.append(chunk)
                  received += len(chunk)
                  if received >= wanted:
                      # enough data; the rest would be sliced off anyway
                      break
              out = b"".join(chunks)[:wanted]
          return out
  # Glob characters recognised for URLs: "*" and "[" ("?" is not one of them).
  magic_check = re.compile("([*[])")


  def has_magic(s):
      """Tell whether ``s`` contains a ``*`` or ``[`` glob character."""
      return magic_check.search(s) is not None
  class HTTPStreamFile(AbstractBufferedFile):
      """Forward-only reader over a single GET response.

      The request is made on construction; data is then consumed from the
      response's ``iter_content`` iterator. Seeking is not possible.

      Parameters
      ----------
      fs:
          Owning filesystem; its ``encode_url`` and
          ``_raise_not_found_for_status`` are used.
      url: str
          Full URL of the remote resource
      mode: str
          Must be "rb", otherwise ValueError is raised
      session:
          Session object providing ``get``
      kwargs: passed both to the base class and to the GET request
      """

      def __init__(self, fs, url, mode="rb", session=None, **kwargs):
          self.url = url
          self.session = session
          if mode != "rb":
              raise ValueError
          self.details = {"name": url, "size": None}
          super().__init__(fs=fs, path=url, mode=mode, cache_type="readahead", **kwargs)

          r = self.session.get(self.fs.encode_url(url), stream=True, **kwargs)
          # Keep the response before checking its status, so that close()
          # can release it even when the check below raises.
          self.r = r
          self.fs._raise_not_found_for_status(r, url)
          self.it = r.iter_content(1024, False)
          self.leftover = b""

      def seek(self, *args, **kwargs):
          raise ValueError("Cannot seek streaming HTTP file")

      def read(self, num=-1):
          """Read up to ``num`` bytes, or everything that is left if negative."""
          bufs = [self.leftover]
          leng = len(self.leftover)
          while leng < num or num < 0:
              # An exhausted iterator and an empty chunk both end the read
              piece = next(self.it, b"")
              if not piece:
                  break
              bufs.append(piece)
              leng += len(piece)
          out = b"".join(bufs)
          if num >= 0:
              # Bytes beyond the request are kept for the next call
              self.leftover = out[num:]
              out = out[:num]
          else:
              self.leftover = b""
          self.loc += len(out)
          return out

      def close(self):
          """Close the underlying response, if one was obtained."""
          # The attribute is missing when __init__ failed before the request
          # was made; closing must not raise in that situation.
          response = getattr(self, "r", None)
          if response is not None:
              response.close()
          self.closed = True
  def get_range(session, url, start, end, **kwargs):
      """Fetch bytes ``start`` (inclusive) up to ``end`` (exclusive) of ``url``.

      A "Range" header is added to a copy of any ``headers`` given in
      ``kwargs``; remaining keyword arguments go to ``session.get``. The
      response's ``raise_for_status`` is called before its content is returned.
      """
      # explicit get a range when we know it must be safe
      request_kwargs = dict(kwargs)
      range_headers = request_kwargs.pop("headers", {}).copy()
      # HTTP byte ranges include the last position, hence the "- 1"
      range_headers["Range"] = f"bytes={start}-{end - 1}"
      response = session.get(url, headers=range_headers, **request_kwargs)
      response.raise_for_status()
      return response.content
  def _file_info(url, session, size_policy="head", **kwargs):
      """Call HEAD on the server to get details about the file (size/checksum etc.)

      Redirects are allowed unless ``allow_redirects`` is passed in ``kwargs``.

      Parameters
      ----------
      url:
          Location to query
      session:
          Object providing ``head`` and ``get`` methods
      size_policy: "head" or "get"
          Which request method to use
      kwargs: passed to the request call

      Returns
      -------
      dict with "size" (when the server reports one) and any of "ETag",
      "Content-MD5" and "Digest" that the response carries.

      Raises
      ------
      TypeError
          If ``size_policy`` is neither "head" nor "get".
      """
      logger.debug("Retrieve file size for %s", url)
      kwargs = kwargs.copy()
      ar = kwargs.pop("allow_redirects", True)
      head = kwargs.get("headers", {}).copy()
      # TODO: not allowed in JS
      # head["Accept-Encoding"] = "identity"
      kwargs["headers"] = head

      if size_policy == "head":
          r = session.head(url, allow_redirects=ar, **kwargs)
      elif size_policy == "get":
          r = session.get(url, allow_redirects=ar, **kwargs)
      else:
          raise TypeError(f'size_policy must be "head" or "get", got {size_policy}')
      r.raise_for_status()

      # TODO:
      #  recognise lack of 'Accept-Ranges',
      #                 or 'Accept-Ranges': 'none' (not 'bytes')
      #  to mean streaming only, no random access => return None

      # Header names may arrive in any letter case; compare lower-cased.
      lowered = {str(name).lower(): value for name, value in r.headers.items()}

      info = {}
      if "content-length" in lowered:
          info["size"] = int(lowered["content-length"])
      elif "content-range" in lowered:
          total = lowered["content-range"].rpartition("/")[2].strip()
          # The total may be "*" (unknown); then no size is reported
          if total.isdigit():
              info["size"] = int(total)

      for checksum_field in ["ETag", "Content-MD5", "Digest"]:
          value = lowered.get(checksum_field.lower())
          if value:
              info[checksum_field] = value

      return info
  # importing this is enough to register it
  def register():
      """Register HTTPFileSystem for "http", "https", "sync-http" and "sync-https".

      Existing registrations for these names are replaced (``clobber=True``).
      """
      for scheme in ("http", "https", "sync-http", "sync-https"):
          register_implementation(scheme, HTTPFileSystem, clobber=True)


  # Executed at import time, see the comment above register()
  register()
  def unregister():
      """Register ``fsspec.implementations.http.HTTPFileSystem`` for "http"/"https".

      Only these two names are re-registered (with ``clobber=True``); the
      "sync-http" and "sync-https" names are not touched here.
      """
      from fsspec.implementations.http import HTTPFileSystem as _StandardHTTPFileSystem

      for scheme in ("http", "https"):
          register_implementation(scheme, _StandardHTTPFileSystem, clobber=True)