_zipfs.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. from __future__ import annotations
  2. import io
  3. import os
  4. import shutil
  5. import stat
  6. import typing
  7. import zipfile
  8. from datetime import datetime
  9. from ._base import FS
  10. from ._errors import FileExpected, ResourceNotFound, ResourceReadOnly
  11. from ._info import Info
  12. from ._path import dirname, forcedir, normpath, relpath
  13. from ._tempfs import TempFS
  14. if typing.TYPE_CHECKING:
  15. from collections.abc import Collection
  16. from typing import IO, Any
  17. from ._subfs import SubFS
  18. class ZipFS(FS):
  19. """Read and write zip files."""
  20. def __new__(
  21. cls, file: str | os.PathLike, write: bool = False, encoding: str = "utf-8"
  22. ):
  23. if write:
  24. return WriteZipFS(file, encoding)
  25. else:
  26. return ReadZipFS(file, encoding)
  27. if typing.TYPE_CHECKING:
  28. def __init__(
  29. self, file: str | os.PathLike, write: bool = False, encoding: str = "utf-8"
  30. ):
  31. pass
  32. class ReadZipFS(FS):
  33. """A readable zip file."""
  34. def __init__(self, file: str | os.PathLike, encoding: str = "utf-8"):
  35. super().__init__()
  36. self._file = os.fspath(file)
  37. self.encoding = encoding # unused
  38. self._zip = zipfile.ZipFile(file, "r")
  39. self._directory_fs = None
  40. def __repr__(self) -> str:
  41. return f"ReadZipFS({self._file!r})"
  42. def __str__(self) -> str:
  43. return f"<zipfs '{self._file}'>"
  44. def _path_to_zip_name(self, path: str) -> str:
  45. """Convert a path to a zip file name."""
  46. path = relpath(normpath(path))
  47. if self._directory.isdir(path):
  48. path = forcedir(path)
  49. return path
  50. @property
  51. def _directory(self) -> TempFS:
  52. if self._directory_fs is None:
  53. self._directory_fs = _fs = TempFS()
  54. for zip_name in self._zip.namelist():
  55. resource_name = zip_name
  56. if resource_name.endswith("/"):
  57. _fs.makedirs(resource_name, recreate=True)
  58. else:
  59. _fs.makedirs(dirname(resource_name), recreate=True)
  60. _fs.create(resource_name)
  61. return self._directory_fs
  62. def close(self):
  63. super(ReadZipFS, self).close()
  64. self._zip.close()
  65. if self._directory_fs is not None:
  66. self._directory_fs.close()
  67. def getinfo(self, path: str, namespaces: Collection[str] | None = None) -> Info:
  68. namespaces = namespaces or ()
  69. raw_info = {}
  70. if path == "/":
  71. raw_info["basic"] = {"name": "", "is_dir": True}
  72. if "details" in namespaces:
  73. raw_info["details"] = {"type": stat.S_IFDIR}
  74. else:
  75. basic_info = self._directory.getinfo(path)
  76. raw_info["basic"] = {"name": basic_info.name, "is_dir": basic_info.is_dir}
  77. if "details" in namespaces:
  78. zip_name = self._path_to_zip_name(path)
  79. try:
  80. zip_info = self._zip.getinfo(zip_name)
  81. except KeyError:
  82. pass
  83. else:
  84. if "details" in namespaces:
  85. raw_info["details"] = {
  86. "size": zip_info.file_size,
  87. "type": int(
  88. stat.S_IFDIR if basic_info.is_dir else stat.S_IFREG
  89. ),
  90. "modified": datetime(*zip_info.date_time).timestamp(),
  91. }
  92. return Info(raw_info)
  93. def exists(self, path: str) -> bool:
  94. self.check()
  95. return self._directory.exists(path)
  96. def isdir(self, path: str) -> bool:
  97. self.check()
  98. return self._directory.isdir(path)
  99. def isfile(self, path: str) -> bool:
  100. self.check()
  101. return self._directory.isfile(path)
  102. def listdir(self, path: str) -> str:
  103. self.check()
  104. return self._directory.listdir(path)
  105. def makedir(self, path: str, recreate: bool = False) -> SubFS:
  106. self.check()
  107. raise ResourceReadOnly(path)
  108. def makedirs(self, path: str, recreate: bool = False) -> SubFS:
  109. self.check()
  110. raise ResourceReadOnly(path)
  111. def remove(self, path: str):
  112. self.check()
  113. raise ResourceReadOnly(path)
  114. def removedir(self, path: str):
  115. self.check()
  116. raise ResourceReadOnly(path)
  117. def removetree(self, path: str):
  118. self.check()
  119. raise ResourceReadOnly(path)
  120. def movedir(self, src: str, dst: str, create: bool = False):
  121. self.check()
  122. raise ResourceReadOnly(src)
  123. def readbytes(self, path: str) -> bytes:
  124. self.check()
  125. if not self._directory.isfile(path):
  126. raise ResourceNotFound(path)
  127. zip_name = self._path_to_zip_name(path)
  128. zip_bytes = self._zip.read(zip_name)
  129. return zip_bytes
  130. def open(self, path: str, mode: str = "rb", **kwargs) -> IO[Any]:
  131. self.check()
  132. if self._directory.isdir(path):
  133. raise FileExpected(f"{path!r} is a directory")
  134. zip_mode = mode[0]
  135. if zip_mode == "r" and not self._directory.exists(path):
  136. raise ResourceNotFound(f"No such file or directory: {path!r}")
  137. if any(m in mode for m in "wax+"):
  138. raise ResourceReadOnly(path)
  139. zip_name = self._path_to_zip_name(path)
  140. stream = self._zip.open(zip_name, zip_mode)
  141. if "b" in mode:
  142. if kwargs:
  143. raise ValueError("encoding args invalid for binary operation")
  144. return stream
  145. # Text mode
  146. return io.TextIOWrapper(stream, **kwargs)
  147. class WriteZipFS(TempFS):
  148. """A writable zip file."""
  149. def __init__(self, file: str | os.PathLike, encoding: str = "utf-8"):
  150. super().__init__()
  151. self._file = os.fspath(file)
  152. self.encoding = encoding # unused
  153. def __repr__(self) -> str:
  154. return f"WriteZipFS({self._file!r})"
  155. def __str__(self) -> str:
  156. return f"<zipfs-write '{self._file}'>"
  157. def close(self):
  158. base_name = os.path.splitext(self._file)[0]
  159. shutil.make_archive(base_name, format="zip", root_dir=self._temp_dir)
  160. if self._file != base_name + ".zip":
  161. shutil.move(base_name + ".zip", self._file)
  162. super().close()