ftp.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. import os
  2. import ssl
  3. import uuid
  4. from ftplib import FTP, FTP_TLS, Error, error_perm
  5. from typing import Any
  6. from ..spec import AbstractBufferedFile, AbstractFileSystem
  7. from ..utils import infer_storage_options, isfilelike
  8. SECURITY_PROTOCOL_MAP = {
  9. "tls": ssl.PROTOCOL_TLS,
  10. "tlsv1": ssl.PROTOCOL_TLSv1,
  11. "tlsv1_1": ssl.PROTOCOL_TLSv1_1,
  12. "tlsv1_2": ssl.PROTOCOL_TLSv1_2,
  13. "sslv23": ssl.PROTOCOL_SSLv23,
  14. }
  15. class ImplicitFTPTLS(FTP_TLS):
  16. """
  17. FTP_TLS subclass that automatically wraps sockets in SSL
  18. to support implicit FTPS.
  19. """
  20. def __init__(self, *args, **kwargs):
  21. super().__init__(*args, **kwargs)
  22. self._sock = None
  23. @property
  24. def sock(self):
  25. """Return the socket."""
  26. return self._sock
  27. @sock.setter
  28. def sock(self, value):
  29. """When modifying the socket, ensure that it is ssl wrapped."""
  30. if value is not None and not isinstance(value, ssl.SSLSocket):
  31. value = self.context.wrap_socket(value)
  32. self._sock = value
  33. class FTPFileSystem(AbstractFileSystem):
  34. """A filesystem over classic FTP"""
  35. root_marker = "/"
  36. cachable = False
  37. protocol = "ftp"
  38. def __init__(
  39. self,
  40. host,
  41. port=21,
  42. username=None,
  43. password=None,
  44. acct=None,
  45. block_size=None,
  46. tempdir=None,
  47. timeout=30,
  48. encoding="utf-8",
  49. tls=False,
  50. **kwargs,
  51. ):
  52. """
  53. You can use _get_kwargs_from_urls to get some kwargs from
  54. a reasonable FTP url.
  55. Authentication will be anonymous if username/password are not
  56. given.
  57. Parameters
  58. ----------
  59. host: str
  60. The remote server name/ip to connect to
  61. port: int
  62. Port to connect with
  63. username: str or None
  64. If authenticating, the user's identifier
  65. password: str of None
  66. User's password on the server, if using
  67. acct: str or None
  68. Some servers also need an "account" string for auth
  69. block_size: int or None
  70. If given, the read-ahead or write buffer size.
  71. tempdir: str
  72. Directory on remote to put temporary files when in a transaction
  73. timeout: int
  74. Timeout of the ftp connection in seconds
  75. encoding: str
  76. Encoding to use for directories and filenames in FTP connection
  77. tls: bool or str
  78. Enable FTP-TLS for secure connections:
  79. - False: Plain FTP (default)
  80. - True: Explicit TLS (FTPS with AUTH TLS command)
  81. - "tls": Auto-negotiate highest protocol
  82. - "tlsv1": TLS v1.0
  83. - "tlsv1_1": TLS v1.1
  84. - "tlsv1_2": TLS v1.2
  85. """
  86. super().__init__(**kwargs)
  87. self.host = host
  88. self.port = port
  89. self.tempdir = tempdir or "/tmp"
  90. self.cred = username or "", password or "", acct or ""
  91. self.timeout = timeout
  92. self.encoding = encoding
  93. if block_size is not None:
  94. self.blocksize = block_size
  95. else:
  96. self.blocksize = 2**16
  97. self.tls = tls
  98. self._connect()
  99. if isinstance(self.tls, bool) and self.tls:
  100. self.ftp.prot_p()
  101. def _connect(self):
  102. security = None
  103. if self.tls:
  104. if isinstance(self.tls, str):
  105. ftp_cls = ImplicitFTPTLS
  106. security = SECURITY_PROTOCOL_MAP.get(
  107. self.tls,
  108. f"Not supported {self.tls} protocol",
  109. )
  110. if isinstance(security, str):
  111. raise ValueError(security)
  112. else:
  113. ftp_cls = FTP_TLS
  114. else:
  115. ftp_cls = FTP
  116. self.ftp = ftp_cls(timeout=self.timeout, encoding=self.encoding)
  117. if security:
  118. self.ftp.ssl_version = security
  119. self.ftp.connect(self.host, self.port)
  120. self.ftp.login(*self.cred)
  121. @classmethod
  122. def _strip_protocol(cls, path):
  123. return "/" + infer_storage_options(path)["path"].lstrip("/").rstrip("/")
  124. @staticmethod
  125. def _get_kwargs_from_urls(urlpath):
  126. out = infer_storage_options(urlpath)
  127. out.pop("path", None)
  128. out.pop("protocol", None)
  129. return out
  130. def ls(self, path, detail=True, **kwargs):
  131. path = self._strip_protocol(path)
  132. out = []
  133. if path not in self.dircache:
  134. try:
  135. try:
  136. out = [
  137. (fn, details)
  138. for (fn, details) in self.ftp.mlsd(path)
  139. if fn not in [".", ".."]
  140. and details["type"] not in ["pdir", "cdir"]
  141. ]
  142. except error_perm:
  143. out = _mlsd2(self.ftp, path) # Not platform independent
  144. for fn, details in out:
  145. details["name"] = "/".join(
  146. ["" if path == "/" else path, fn.lstrip("/")]
  147. )
  148. if details["type"] == "file":
  149. details["size"] = int(details["size"])
  150. else:
  151. details["size"] = 0
  152. if details["type"] == "dir":
  153. details["type"] = "directory"
  154. self.dircache[path] = out
  155. except Error:
  156. try:
  157. info = self.info(path)
  158. if info["type"] == "file":
  159. out = [(path, info)]
  160. except (Error, IndexError) as exc:
  161. raise FileNotFoundError(path) from exc
  162. files = self.dircache.get(path, out)
  163. if not detail:
  164. return sorted([fn for fn, details in files])
  165. return [details for fn, details in files]
  166. def info(self, path, **kwargs):
  167. # implement with direct method
  168. path = self._strip_protocol(path)
  169. if path == "/":
  170. # special case, since this dir has no real entry
  171. return {"name": "/", "size": 0, "type": "directory"}
  172. files = self.ls(self._parent(path).lstrip("/"), True)
  173. try:
  174. out = next(f for f in files if f["name"] == path)
  175. except StopIteration as exc:
  176. raise FileNotFoundError(path) from exc
  177. return out
  178. def get_file(self, rpath, lpath, **kwargs):
  179. if self.isdir(rpath):
  180. if not os.path.exists(lpath):
  181. os.mkdir(lpath)
  182. return
  183. if isfilelike(lpath):
  184. outfile = lpath
  185. else:
  186. outfile = open(lpath, "wb")
  187. def cb(x):
  188. outfile.write(x)
  189. self.ftp.retrbinary(
  190. f"RETR {rpath}",
  191. blocksize=self.blocksize,
  192. callback=cb,
  193. )
  194. if not isfilelike(lpath):
  195. outfile.close()
  196. def cat_file(self, path, start=None, end=None, **kwargs):
  197. if end is not None:
  198. return super().cat_file(path, start, end, **kwargs)
  199. out = []
  200. def cb(x):
  201. out.append(x)
  202. try:
  203. self.ftp.retrbinary(
  204. f"RETR {path}",
  205. blocksize=self.blocksize,
  206. rest=start,
  207. callback=cb,
  208. )
  209. except (Error, error_perm) as orig_exc:
  210. raise FileNotFoundError(path) from orig_exc
  211. return b"".join(out)
  212. def _open(
  213. self,
  214. path,
  215. mode="rb",
  216. block_size=None,
  217. cache_options=None,
  218. autocommit=True,
  219. **kwargs,
  220. ):
  221. path = self._strip_protocol(path)
  222. block_size = block_size or self.blocksize
  223. return FTPFile(
  224. self,
  225. path,
  226. mode=mode,
  227. block_size=block_size,
  228. tempdir=self.tempdir,
  229. autocommit=autocommit,
  230. cache_options=cache_options,
  231. )
  232. def _rm(self, path):
  233. path = self._strip_protocol(path)
  234. self.ftp.delete(path)
  235. self.invalidate_cache(self._parent(path))
  236. def rm(self, path, recursive=False, maxdepth=None):
  237. paths = self.expand_path(path, recursive=recursive, maxdepth=maxdepth)
  238. for p in reversed(paths):
  239. if self.isfile(p):
  240. self.rm_file(p)
  241. else:
  242. self.rmdir(p)
  243. def mkdir(self, path: str, create_parents: bool = True, **kwargs: Any) -> None:
  244. path = self._strip_protocol(path)
  245. parent = self._parent(path)
  246. if parent != self.root_marker and not self.exists(parent) and create_parents:
  247. self.mkdir(parent, create_parents=create_parents)
  248. self.ftp.mkd(path)
  249. self.invalidate_cache(self._parent(path))
  250. def makedirs(self, path: str, exist_ok: bool = False) -> None:
  251. path = self._strip_protocol(path)
  252. if self.exists(path):
  253. # NB: "/" does not "exist" as it has no directory entry
  254. if not exist_ok:
  255. raise FileExistsError(f"{path} exists without `exist_ok`")
  256. # exists_ok=True -> no-op
  257. else:
  258. self.mkdir(path, create_parents=True)
  259. def rmdir(self, path):
  260. path = self._strip_protocol(path)
  261. self.ftp.rmd(path)
  262. self.invalidate_cache(self._parent(path))
  263. def mv(self, path1, path2, **kwargs):
  264. path1 = self._strip_protocol(path1)
  265. path2 = self._strip_protocol(path2)
  266. self.ftp.rename(path1, path2)
  267. self.invalidate_cache(self._parent(path1))
  268. self.invalidate_cache(self._parent(path2))
  269. def __del__(self):
  270. self.ftp.close()
  271. def invalidate_cache(self, path=None):
  272. if path is None:
  273. self.dircache.clear()
  274. else:
  275. self.dircache.pop(path, None)
  276. super().invalidate_cache(path)
  277. class TransferDone(Exception):
  278. """Internal exception to break out of transfer"""
  279. pass
  280. class FTPFile(AbstractBufferedFile):
  281. """Interact with a remote FTP file with read/write buffering"""
  282. def __init__(
  283. self,
  284. fs,
  285. path,
  286. mode="rb",
  287. block_size="default",
  288. autocommit=True,
  289. cache_type="readahead",
  290. cache_options=None,
  291. **kwargs,
  292. ):
  293. super().__init__(
  294. fs,
  295. path,
  296. mode=mode,
  297. block_size=block_size,
  298. autocommit=autocommit,
  299. cache_type=cache_type,
  300. cache_options=cache_options,
  301. **kwargs,
  302. )
  303. if not autocommit:
  304. self.target = self.path
  305. self.path = "/".join([kwargs["tempdir"], str(uuid.uuid4())])
  306. def commit(self):
  307. self.fs.mv(self.path, self.target)
  308. def discard(self):
  309. self.fs.rm(self.path)
  310. def _fetch_range(self, start, end):
  311. """Get bytes between given byte limits
  312. Implemented by raising an exception in the fetch callback when the
  313. number of bytes received reaches the requested amount.
  314. Will fail if the server does not respect the REST command on
  315. retrieve requests.
  316. """
  317. out = []
  318. total = [0]
  319. def callback(x):
  320. total[0] += len(x)
  321. if total[0] > end - start:
  322. out.append(x[: (end - start) - total[0]])
  323. if end < self.size:
  324. raise TransferDone
  325. else:
  326. out.append(x)
  327. if total[0] == end - start and end < self.size:
  328. raise TransferDone
  329. try:
  330. self.fs.ftp.retrbinary(
  331. f"RETR {self.path}",
  332. blocksize=self.blocksize,
  333. rest=start,
  334. callback=callback,
  335. )
  336. except TransferDone:
  337. try:
  338. # stop transfer, we got enough bytes for this block
  339. self.fs.ftp.abort()
  340. self.fs.ftp.getmultiline()
  341. except Error:
  342. self.fs._connect()
  343. return b"".join(out)
  344. def _upload_chunk(self, final=False):
  345. self.buffer.seek(0)
  346. self.fs.ftp.storbinary(
  347. f"STOR {self.path}", self.buffer, blocksize=self.blocksize, rest=self.offset
  348. )
  349. return True
  350. def _mlsd2(ftp, path="."):
  351. """
  352. Fall back to using `dir` instead of `mlsd` if not supported.
  353. This parses a Linux style `ls -l` response to `dir`, but the response may
  354. be platform dependent.
  355. Parameters
  356. ----------
  357. ftp: ftplib.FTP
  358. path: str
  359. Expects to be given path, but defaults to ".".
  360. """
  361. lines = []
  362. minfo = []
  363. ftp.dir(path, lines.append)
  364. for line in lines:
  365. split_line = line.split()
  366. if len(split_line) < 9:
  367. continue
  368. this = (
  369. split_line[-1],
  370. {
  371. "modify": " ".join(split_line[5:8]),
  372. "unix.owner": split_line[2],
  373. "unix.group": split_line[3],
  374. "unix.mode": split_line[0],
  375. "size": split_line[4],
  376. },
  377. )
  378. if this[1]["unix.mode"][0] == "d":
  379. this[1]["type"] = "dir"
  380. else:
  381. this[1]["type"] = "file"
  382. minfo.append(this)
  383. return minfo