sftp.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. import datetime
  2. import logging
  3. import os
  4. import types
  5. import uuid
  6. from stat import S_ISDIR, S_ISLNK
  7. import paramiko
  8. from .. import AbstractFileSystem
  9. from ..utils import infer_storage_options
  # Module-level logger; the SFTPFileSystem methods below emit their
  # DEBUG-level progress messages (connect, mkdir, ls, put, open, mv, ...) here.
  logger = logging.getLogger("fsspec.sftp")
  class SFTPFileSystem(AbstractFileSystem):
      """Files over SFTP/SSH

      Peer-to-peer filesystem over SSH using paramiko.

      Note: if using this with the ``open`` or ``open_files``, with full URLs,
      there is no way to tell if a path is relative, so all paths are assumed
      to be absolute.
      """

      protocol = "sftp", "ssh"

      def __init__(self, host, **ssh_kwargs):
          """Connect to ``host`` over SSH and open an SFTP session.

          Parameters
          ----------
          host: str
              Hostname or IP as a string
          temppath: str
              Location on the server to put files, when within a transaction.
              Taken from ``ssh_kwargs``; defaults to ``"/tmp"``.
          ssh_kwargs: dict
              Parameters passed on to connection. See details in
              https://docs.paramiko.org/en/3.3/api/client.html#paramiko.client.SSHClient.connect
              May include port, username, password...
          """
          # ``_cached`` is provided by the base class; presumably True when this
          # instance came back from an instance cache and is already connected.
          if self._cached:
              return
          # The base class receives every option, including ``temppath``,
          # before it is removed below.
          super().__init__(**ssh_kwargs)
          # ``temppath`` is not a paramiko ``connect`` argument, so pop it
          # before the remaining kwargs are stored for ``_connect``.
          self.temppath = ssh_kwargs.pop("temppath", "/tmp")  # remote temp directory
          self.host = host
          self.ssh_kwargs = ssh_kwargs
          self._connect()

      def _connect(self):
          """Open the SSH connection to ``self.host`` and start an SFTP session.

          Sets ``self.client`` (the ``paramiko.SSHClient``) and ``self.ftp``
          (the SFTP client every other method talks to).
          """
          logger.debug("Connecting to SFTP server %s", self.host)
          self.client = paramiko.SSHClient()
          # NOTE(review): AutoAddPolicy accepts unknown host keys without any
          # verification, so the first connection is not protected against
          # man-in-the-middle attacks.
          self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
          self.client.connect(self.host, **self.ssh_kwargs)
          self.ftp = self.client.open_sftp()

      @classmethod
      def _strip_protocol(cls, path):
          """Return only the path component of ``path`` (URLs are parsed).

          Plain paths without a ``scheme://`` prefix are returned as-is.
          """
          return infer_storage_options(path)["path"]

      @staticmethod
      def _get_kwargs_from_urls(urlpath):
          """Extract connection options (host, user, port, ...) from a URL.

          Everything ``infer_storage_options`` finds except ``path`` and
          ``protocol`` is returned, ready to be passed to ``__init__``.
          """
          out = infer_storage_options(urlpath)
          out.pop("path", None)
          out.pop("protocol", None)
          return out

      def mkdir(self, path, create_parents=True, mode=511):
          """Create the directory ``path`` on the server.

          Parameters
          ----------
          path: str
              Directory to create; a URL is reduced to its path component.
          create_parents: bool
              If True, missing parent directories are created as well.
          mode: int
              Permission bits for new directories (511 == 0o777).

          Raises
          ------
          FileExistsError
              If ``path`` already exists.
          """
          path = self._strip_protocol(path)
          logger.debug("Creating folder %s", path)
          if self.exists(path):
              raise FileExistsError(f"File exists: {path}")

          if create_parents:
              self.makedirs(path)
          else:
              self.ftp.mkdir(path, mode)

      def makedirs(self, path, exist_ok=False, mode=511):
          """Create ``path`` together with any missing parent directories.

          Parameters
          ----------
          path: str
              Directory to create; a URL is reduced to its path component.
          exist_ok: bool
              If False, raise ``FileExistsError`` when ``path`` already exists.
          mode: int
              Permission bits for newly created directories (511 == 0o777).
          """
          path = self._strip_protocol(path)
          if self.exists(path) and not exist_ok:
              raise FileExistsError(f"File exists: {path}")

          # Rebuild the path one component at a time and create each missing
          # level. Empty components (leading, trailing or repeated "/") are
          # skipped; an absolute path keeps exactly one leading "/" (the old
          # code produced "//a/b" here).
          root = "/" if path.startswith("/") else ""
          current = ""
          for part in path.split("/"):
              if not part:
                  continue
              current = f"{current}/{part}" if current else f"{root}{part}"
              if not self.exists(current):
                  self.ftp.mkdir(current, mode)

      def rmdir(self, path):
          """Remove the remote directory ``path`` via SFTP ``rmdir``."""
          path = self._strip_protocol(path)
          logger.debug("Removing folder %s", path)
          self.ftp.rmdir(path)

      def info(self, path):
          """Return the info dict (see ``_decode_stat``) for ``path``.

          The ``name`` entry is set to the stripped ``path`` itself.
          """
          path = self._strip_protocol(path)
          stat = self._decode_stat(self.ftp.stat(path))
          stat["name"] = path
          return stat

      @staticmethod
      def _decode_stat(stat, parent_path=None):
          """Convert a paramiko stat result into an fsspec info dict.

          Parameters
          ----------
          stat: paramiko.SFTPAttributes
              Result of ``ftp.stat`` or one entry of ``ftp.listdir_iter``.
          parent_path: str, optional
              Directory the entry was listed from. When given, ``name`` becomes
              ``parent_path/stat.filename``; otherwise it is left empty.

          Returns
          -------
          dict
              Keys ``name``, ``size``, ``type`` ("directory", "link" or
              "file"), ``uid``, ``gid``, ``time`` (access time) and ``mtime``
              (modification time); both times are UTC-aware datetimes.
          """
          if S_ISDIR(stat.st_mode):
              t = "directory"
          elif S_ISLNK(stat.st_mode):
              t = "link"
          else:
              t = "file"
          out = {
              "name": "",
              "size": stat.st_size,
              "type": t,
              "uid": stat.st_uid,
              "gid": stat.st_gid,
              "time": datetime.datetime.fromtimestamp(
                  stat.st_atime, tz=datetime.timezone.utc
              ),
              "mtime": datetime.datetime.fromtimestamp(
                  stat.st_mtime, tz=datetime.timezone.utc
              ),
          }
          if parent_path:
              out["name"] = "/".join([parent_path.rstrip("/"), stat.filename])
          return out

      def ls(self, path, detail=False):
          """List the entries of the remote directory ``path``.

          Returns
          -------
          list
              With ``detail=True`` the info dicts in server order; otherwise
              the full entry paths, sorted.
          """
          path = self._strip_protocol(path)
          logger.debug("Listing folder %s", path)
          stats = [self._decode_stat(stat, path) for stat in self.ftp.listdir_iter(path)]
          if detail:
              return stats
          return sorted(stat["name"] for stat in stats)

      def put(self, lpath, rpath, callback=None, **kwargs):
          """Upload the local file ``lpath`` to ``rpath`` on the server.

          ``callback`` and any extra keyword arguments are accepted for
          signature compatibility but are not used.
          """
          rpath = self._strip_protocol(rpath)
          logger.debug("Put file %s into %s", lpath, rpath)
          self.ftp.put(lpath, rpath)

      def get_file(self, rpath, lpath, **kwargs):
          """Download the remote ``rpath`` to the local path ``lpath``.

          For a remote directory only the local directory ``lpath`` is created
          (its contents are not copied here); a file is fetched with
          ``ftp.get``.
          """
          if self.isdir(rpath):
              os.makedirs(lpath, exist_ok=True)
          else:
              self.ftp.get(self._strip_protocol(rpath), lpath)

      def _open(self, path, mode="rb", block_size=None, **kwargs):
          """Open a remote file through the SFTP session.

          block_size: int or None
              If 0, no buffering, if 1, line buffering, if >1, buffer that many
              bytes, if None use default from paramiko.

          When ``kwargs["autocommit"]`` is False, data is written to a uniquely
          named file under ``self.temppath``. The returned file object then
          gets ``commit`` (move the temp file onto ``path``) and ``discard``
          (delete the temp file) methods.
          """
          logger.debug("Opening file %s", path)
          # Only None selects paramiko's default (-1); any explicit value,
          # including 0, is forwarded unchanged so the meanings above hold.
          bufsize = -1 if block_size is None else block_size
          if kwargs.get("autocommit", True) is False:
              # writes to temporary file, move on commit
              path2 = "/".join([self.temppath, str(uuid.uuid4())])
              f = self.ftp.open(path2, mode, bufsize=bufsize)
              f.temppath = path2
              f.targetpath = path
              f.fs = self
              f.commit = types.MethodType(commit_a_file, f)
              f.discard = types.MethodType(discard_a_file, f)
          else:
              f = self.ftp.open(path, mode, bufsize=bufsize)
          return f

      def _rm(self, path):
          """Delete one remote entry: ``rmdir`` for a directory, else ``remove``."""
          # Strip like the other path-taking methods so a URL never reaches
          # the raw SFTP calls below.
          path = self._strip_protocol(path)
          if self.isdir(path):
              self.ftp.rmdir(path)
          else:
              self.ftp.remove(path)

      def mv(self, old, new, **kwargs):
          """Rename ``old`` to ``new`` on the server using ``posix_rename``.

          Extra keyword arguments from the generic ``mv`` signature (such as
          ``recursive`` or ``maxdepth``) are accepted and ignored. A single
          server-side rename already moves a directory as a whole.
          """
          new = self._strip_protocol(new)
          old = self._strip_protocol(old)
          logger.debug("Renaming %s into %s", old, new)
          self.ftp.posix_rename(old, new)
  def commit_a_file(self):
      """Finalise a transactional write.

      Bound as ``commit`` onto file objects opened with ``autocommit=False``:
      moves the temporary file at ``self.temppath`` onto ``self.targetpath``
      via the owning filesystem's ``mv``.
      """
      move = self.fs.mv
      move(self.temppath, self.targetpath)
  def discard_a_file(self):
      """Abandon a transactional write.

      Bound as ``discard`` onto file objects opened with ``autocommit=False``:
      deletes the temporary file at ``self.temppath`` via the owning
      filesystem's ``_rm``, leaving the target path untouched.
      """
      remove = self.fs._rm
      remove(self.temppath)