webhdfs.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503
  1. # https://hadoop.apache.org/docs/r1.0.4/webhdfs.html
  2. import logging
  3. import os
  4. import secrets
  5. import shutil
  6. import tempfile
  7. import uuid
  8. from contextlib import suppress
  9. from datetime import datetime
  10. from urllib.parse import quote
  11. import requests
  12. from ..spec import AbstractBufferedFile, AbstractFileSystem
  13. from ..utils import infer_storage_options, tokenize
# Module-level logger; the fixed name "webhdfs" (rather than ``__name__``) is the
# handle callers use to configure logging for this backend.
logger = logging.getLogger("webhdfs")
  15. class WebHDFS(AbstractFileSystem):
  16. """
  17. Interface to HDFS over HTTP using the WebHDFS API. Supports also HttpFS gateways.
  18. Four auth mechanisms are supported:
  19. insecure: no auth is done, and the user is assumed to be whoever they
  20. say they are (parameter ``user``), or a predefined value such as
  21. "dr.who" if not given
  22. spnego: when kerberos authentication is enabled, auth is negotiated by
  23. requests_kerberos https://github.com/requests/requests-kerberos .
  24. This establishes a session based on existing kinit login and/or
  25. specified principal/password; parameters are passed with ``kerb_kwargs``
  26. token: uses an existing Hadoop delegation token from another secured
  27. service. Indeed, this client can also generate such tokens when
  28. not insecure. Note that tokens expire, but can be renewed (by a
  29. previously specified user) and may allow for proxying.
  30. basic-auth: used when both parameter ``user`` and parameter ``password``
  31. are provided.
  32. """
  33. tempdir = str(tempfile.gettempdir())
  34. protocol = "webhdfs", "webHDFS"
  35. def __init__(
  36. self,
  37. host,
  38. port=50070,
  39. kerberos=False,
  40. token=None,
  41. user=None,
  42. password=None,
  43. proxy_to=None,
  44. kerb_kwargs=None,
  45. data_proxy=None,
  46. use_https=False,
  47. session_cert=None,
  48. session_verify=True,
  49. **kwargs,
  50. ):
  51. """
  52. Parameters
  53. ----------
  54. host: str
  55. Name-node address
  56. port: int
  57. Port for webHDFS
  58. kerberos: bool
  59. Whether to authenticate with kerberos for this connection
  60. token: str or None
  61. If given, use this token on every call to authenticate. A user
  62. and user-proxy may be encoded in the token and should not be also
  63. given
  64. user: str or None
  65. If given, assert the user name to connect with
  66. password: str or None
  67. If given, assert the password to use for basic auth. If password
  68. is provided, user must be provided also
  69. proxy_to: str or None
  70. If given, the user has the authority to proxy, and this value is
  71. the user in who's name actions are taken
  72. kerb_kwargs: dict
  73. Any extra arguments for HTTPKerberosAuth, see
  74. `<https://github.com/requests/requests-kerberos/blob/master/requests_kerberos/kerberos_.py>`_
  75. data_proxy: dict, callable or None
  76. If given, map data-node addresses. This can be necessary if the
  77. HDFS cluster is behind a proxy, running on Docker or otherwise has
  78. a mismatch between the host-names given by the name-node and the
  79. address by which to refer to them from the client. If a dict,
  80. maps host names ``host->data_proxy[host]``; if a callable, full
  81. URLs are passed, and function must conform to
  82. ``url->data_proxy(url)``.
  83. use_https: bool
  84. Whether to connect to the Name-node using HTTPS instead of HTTP
  85. session_cert: str or Tuple[str, str] or None
  86. Path to a certificate file, or tuple of (cert, key) files to use
  87. for the requests.Session
  88. session_verify: str, bool or None
  89. Path to a certificate file to use for verifying the requests.Session.
  90. kwargs
  91. """
  92. if self._cached:
  93. return
  94. super().__init__(**kwargs)
  95. self.url = f"{'https' if use_https else 'http'}://{host}:{port}/webhdfs/v1"
  96. self.kerb = kerberos
  97. self.kerb_kwargs = kerb_kwargs or {}
  98. self.pars = {}
  99. self.proxy = data_proxy or {}
  100. if token is not None:
  101. if user is not None or proxy_to is not None:
  102. raise ValueError(
  103. "If passing a delegation token, must not set "
  104. "user or proxy_to, as these are encoded in the"
  105. " token"
  106. )
  107. self.pars["delegation"] = token
  108. self.user = user
  109. self.password = password
  110. if password is not None:
  111. if user is None:
  112. raise ValueError(
  113. "If passing a password, the user must also be"
  114. "set in order to set up the basic-auth"
  115. )
  116. else:
  117. if user is not None:
  118. self.pars["user.name"] = user
  119. if proxy_to is not None:
  120. self.pars["doas"] = proxy_to
  121. if kerberos and user is not None:
  122. raise ValueError(
  123. "If using Kerberos auth, do not specify the "
  124. "user, this is handled by kinit."
  125. )
  126. self.session_cert = session_cert
  127. self.session_verify = session_verify
  128. self._connect()
  129. self._fsid = f"webhdfs_{tokenize(host, port)}"
  130. @property
  131. def fsid(self):
  132. return self._fsid
  133. def _connect(self):
  134. self.session = requests.Session()
  135. if self.session_cert:
  136. self.session.cert = self.session_cert
  137. self.session.verify = self.session_verify
  138. if self.kerb:
  139. from requests_kerberos import HTTPKerberosAuth
  140. self.session.auth = HTTPKerberosAuth(**self.kerb_kwargs)
  141. if self.user is not None and self.password is not None:
  142. from requests.auth import HTTPBasicAuth
  143. self.session.auth = HTTPBasicAuth(self.user, self.password)
  144. def _call(self, op, method="get", path=None, data=None, redirect=True, **kwargs):
  145. path = self._strip_protocol(path) if path is not None else ""
  146. url = self._apply_proxy(self.url + quote(path, safe="/="))
  147. args = kwargs.copy()
  148. args.update(self.pars)
  149. args["op"] = op.upper()
  150. logger.debug("sending %s with %s", url, method)
  151. out = self.session.request(
  152. method=method.upper(),
  153. url=url,
  154. params=args,
  155. data=data,
  156. allow_redirects=redirect,
  157. )
  158. if out.status_code in [400, 401, 403, 404, 500]:
  159. try:
  160. err = out.json()
  161. msg = err["RemoteException"]["message"]
  162. exp = err["RemoteException"]["exception"]
  163. except (ValueError, KeyError):
  164. pass
  165. else:
  166. if exp in ["IllegalArgumentException", "UnsupportedOperationException"]:
  167. raise ValueError(msg)
  168. elif exp in ["SecurityException", "AccessControlException"]:
  169. raise PermissionError(msg)
  170. elif exp in ["FileNotFoundException"]:
  171. raise FileNotFoundError(msg)
  172. else:
  173. raise RuntimeError(msg)
  174. out.raise_for_status()
  175. return out
  176. def _open(
  177. self,
  178. path,
  179. mode="rb",
  180. block_size=None,
  181. autocommit=True,
  182. replication=None,
  183. permissions=None,
  184. **kwargs,
  185. ):
  186. """
  187. Parameters
  188. ----------
  189. path: str
  190. File location
  191. mode: str
  192. 'rb', 'wb', etc.
  193. block_size: int
  194. Client buffer size for read-ahead or write buffer
  195. autocommit: bool
  196. If False, writes to temporary file that only gets put in final
  197. location upon commit
  198. replication: int
  199. Number of copies of file on the cluster, write mode only
  200. permissions: str or int
  201. posix permissions, write mode only
  202. kwargs
  203. Returns
  204. -------
  205. WebHDFile instance
  206. """
  207. block_size = block_size or self.blocksize
  208. return WebHDFile(
  209. self,
  210. path,
  211. mode=mode,
  212. block_size=block_size,
  213. tempdir=self.tempdir,
  214. autocommit=autocommit,
  215. replication=replication,
  216. permissions=permissions,
  217. )
  218. @staticmethod
  219. def _process_info(info):
  220. info["type"] = info["type"].lower()
  221. info["size"] = info["length"]
  222. return info
  223. @classmethod
  224. def _strip_protocol(cls, path):
  225. return infer_storage_options(path)["path"]
  226. @staticmethod
  227. def _get_kwargs_from_urls(urlpath):
  228. out = infer_storage_options(urlpath)
  229. out.pop("path", None)
  230. out.pop("protocol", None)
  231. if "username" in out:
  232. out["user"] = out.pop("username")
  233. return out
  234. def info(self, path):
  235. out = self._call("GETFILESTATUS", path=path)
  236. info = out.json()["FileStatus"]
  237. info["name"] = path
  238. return self._process_info(info)
  239. def created(self, path):
  240. """Return the created timestamp of a file as a datetime.datetime"""
  241. # The API does not provide creation time, so we use modification time
  242. info = self.info(path)
  243. mtime = info.get("modificationTime", None)
  244. if mtime is not None:
  245. return datetime.fromtimestamp(mtime / 1000)
  246. raise RuntimeError("Could not retrieve creation time (modification time).")
  247. def modified(self, path):
  248. """Return the modified timestamp of a file as a datetime.datetime"""
  249. info = self.info(path)
  250. mtime = info.get("modificationTime", None)
  251. if mtime is not None:
  252. return datetime.fromtimestamp(mtime / 1000)
  253. raise RuntimeError("Could not retrieve modification time.")
  254. def ls(self, path, detail=False, **kwargs):
  255. out = self._call("LISTSTATUS", path=path)
  256. infos = out.json()["FileStatuses"]["FileStatus"]
  257. for info in infos:
  258. self._process_info(info)
  259. info["name"] = path.rstrip("/") + "/" + info["pathSuffix"]
  260. if detail:
  261. return sorted(infos, key=lambda i: i["name"])
  262. else:
  263. return sorted(info["name"] for info in infos)
  264. def content_summary(self, path):
  265. """Total numbers of files, directories and bytes under path"""
  266. out = self._call("GETCONTENTSUMMARY", path=path)
  267. return out.json()["ContentSummary"]
  268. def ukey(self, path):
  269. """Checksum info of file, giving method and result"""
  270. out = self._call("GETFILECHECKSUM", path=path, redirect=False)
  271. if "Location" in out.headers:
  272. location = self._apply_proxy(out.headers["Location"])
  273. out2 = self.session.get(location)
  274. out2.raise_for_status()
  275. return out2.json()["FileChecksum"]
  276. else:
  277. out.raise_for_status()
  278. return out.json()["FileChecksum"]
  279. def home_directory(self):
  280. """Get user's home directory"""
  281. out = self._call("GETHOMEDIRECTORY")
  282. return out.json()["Path"]
  283. def get_delegation_token(self, renewer=None):
  284. """Retrieve token which can give the same authority to other uses
  285. Parameters
  286. ----------
  287. renewer: str or None
  288. User who may use this token; if None, will be current user
  289. """
  290. if renewer:
  291. out = self._call("GETDELEGATIONTOKEN", renewer=renewer)
  292. else:
  293. out = self._call("GETDELEGATIONTOKEN")
  294. t = out.json()["Token"]
  295. if t is None:
  296. raise ValueError("No token available for this user/security context")
  297. return t["urlString"]
  298. def renew_delegation_token(self, token):
  299. """Make token live longer. Returns new expiry time"""
  300. out = self._call("RENEWDELEGATIONTOKEN", method="put", token=token)
  301. return out.json()["long"]
  302. def cancel_delegation_token(self, token):
  303. """Stop the token from being useful"""
  304. self._call("CANCELDELEGATIONTOKEN", method="put", token=token)
  305. def chmod(self, path, mod):
  306. """Set the permission at path
  307. Parameters
  308. ----------
  309. path: str
  310. location to set (file or directory)
  311. mod: str or int
  312. posix epresentation or permission, give as oct string, e.g, '777'
  313. or 0o777
  314. """
  315. self._call("SETPERMISSION", method="put", path=path, permission=mod)
  316. def chown(self, path, owner=None, group=None):
  317. """Change owning user and/or group"""
  318. kwargs = {}
  319. if owner is not None:
  320. kwargs["owner"] = owner
  321. if group is not None:
  322. kwargs["group"] = group
  323. self._call("SETOWNER", method="put", path=path, **kwargs)
  324. def set_replication(self, path, replication):
  325. """
  326. Set file replication factor
  327. Parameters
  328. ----------
  329. path: str
  330. File location (not for directories)
  331. replication: int
  332. Number of copies of file on the cluster. Should be smaller than
  333. number of data nodes; normally 3 on most systems.
  334. """
  335. self._call("SETREPLICATION", path=path, method="put", replication=replication)
  336. def mkdir(self, path, **kwargs):
  337. self._call("MKDIRS", method="put", path=path)
  338. def makedirs(self, path, exist_ok=False):
  339. if exist_ok is False and self.exists(path):
  340. raise FileExistsError(path)
  341. self.mkdir(path)
  342. def mv(self, path1, path2, **kwargs):
  343. self._call("RENAME", method="put", path=path1, destination=path2)
  344. def rm(self, path, recursive=False, **kwargs):
  345. self._call(
  346. "DELETE",
  347. method="delete",
  348. path=path,
  349. recursive="true" if recursive else "false",
  350. )
  351. def rm_file(self, path, **kwargs):
  352. self.rm(path)
  353. def cp_file(self, lpath, rpath, **kwargs):
  354. with self.open(lpath) as lstream:
  355. tmp_fname = "/".join([self._parent(rpath), f".tmp.{secrets.token_hex(16)}"])
  356. # Perform an atomic copy (stream to a temporary file and
  357. # move it to the actual destination).
  358. try:
  359. with self.open(tmp_fname, "wb") as rstream:
  360. shutil.copyfileobj(lstream, rstream)
  361. self.mv(tmp_fname, rpath)
  362. except BaseException:
  363. with suppress(FileNotFoundError):
  364. self.rm(tmp_fname)
  365. raise
  366. def _apply_proxy(self, location):
  367. if self.proxy and callable(self.proxy):
  368. location = self.proxy(location)
  369. elif self.proxy:
  370. # as a dict
  371. for k, v in self.proxy.items():
  372. location = location.replace(k, v, 1)
  373. return location
  374. class WebHDFile(AbstractBufferedFile):
  375. """A file living in HDFS over webHDFS"""
  376. def __init__(self, fs, path, **kwargs):
  377. super().__init__(fs, path, **kwargs)
  378. kwargs = kwargs.copy()
  379. if kwargs.get("permissions", None) is None:
  380. kwargs.pop("permissions", None)
  381. if kwargs.get("replication", None) is None:
  382. kwargs.pop("replication", None)
  383. self.permissions = kwargs.pop("permissions", 511)
  384. tempdir = kwargs.pop("tempdir")
  385. if kwargs.pop("autocommit", False) is False:
  386. self.target = self.path
  387. self.path = os.path.join(tempdir, str(uuid.uuid4()))
  388. def _upload_chunk(self, final=False):
  389. """Write one part of a multi-block file upload
  390. Parameters
  391. ==========
  392. final: bool
  393. This is the last block, so should complete file, if
  394. self.autocommit is True.
  395. """
  396. out = self.fs.session.post(
  397. self.location,
  398. data=self.buffer.getvalue(),
  399. headers={"content-type": "application/octet-stream"},
  400. )
  401. out.raise_for_status()
  402. return True
  403. def _initiate_upload(self):
  404. """Create remote file/upload"""
  405. kwargs = self.kwargs.copy()
  406. if "a" in self.mode:
  407. op, method = "APPEND", "POST"
  408. else:
  409. op, method = "CREATE", "PUT"
  410. kwargs["overwrite"] = "true"
  411. out = self.fs._call(op, method, self.path, redirect=False, **kwargs)
  412. location = self.fs._apply_proxy(out.headers["Location"])
  413. if "w" in self.mode:
  414. # create empty file to append to
  415. out2 = self.fs.session.put(
  416. location, headers={"content-type": "application/octet-stream"}
  417. )
  418. out2.raise_for_status()
  419. # after creating empty file, change location to append to
  420. out2 = self.fs._call("APPEND", "POST", self.path, redirect=False, **kwargs)
  421. self.location = self.fs._apply_proxy(out2.headers["Location"])
  422. def _fetch_range(self, start, end):
  423. start = max(start, 0)
  424. end = min(self.size, end)
  425. if start >= end or start >= self.size:
  426. return b""
  427. out = self.fs._call(
  428. "OPEN", path=self.path, offset=start, length=end - start, redirect=False
  429. )
  430. out.raise_for_status()
  431. if "Location" in out.headers:
  432. location = out.headers["Location"]
  433. out2 = self.fs.session.get(self.fs._apply_proxy(location))
  434. return out2.content
  435. else:
  436. return out.content
  437. def commit(self):
  438. self.fs.mv(self.path, self.target)
  439. def discard(self):
  440. self.fs.rm(self.path)