_file_sink.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429
  1. import datetime
  2. import decimal
  3. import glob
  4. import numbers
  5. import os
  6. import shutil
  7. import string
  8. from functools import partial
  9. from stat import ST_DEV, ST_INO
  10. from . import _string_parsers as string_parsers
  11. from ._ctime_functions import get_ctime, set_ctime
  12. from ._datetime import aware_now
  13. def generate_rename_path(root, ext, creation_time):
  14. creation_datetime = datetime.datetime.fromtimestamp(creation_time)
  15. date = FileDateFormatter(creation_datetime)
  16. renamed_path = "{}.{}{}".format(root, date, ext)
  17. counter = 1
  18. while os.path.exists(renamed_path):
  19. counter += 1
  20. renamed_path = "{}.{}.{}{}".format(root, date, counter, ext)
  21. return renamed_path
  22. class FileDateFormatter:
  23. def __init__(self, datetime=None):
  24. self.datetime = datetime or aware_now()
  25. def __format__(self, spec):
  26. if not spec:
  27. spec = "%Y-%m-%d_%H-%M-%S_%f"
  28. return self.datetime.__format__(spec)
  29. class Compression:
  30. @staticmethod
  31. def add_compress(path_in, path_out, opener, **kwargs):
  32. with opener(path_out, **kwargs) as f_comp:
  33. f_comp.add(path_in, os.path.basename(path_in))
  34. @staticmethod
  35. def write_compress(path_in, path_out, opener, **kwargs):
  36. with opener(path_out, **kwargs) as f_comp:
  37. f_comp.write(path_in, os.path.basename(path_in))
  38. @staticmethod
  39. def copy_compress(path_in, path_out, opener, **kwargs):
  40. with open(path_in, "rb") as f_in:
  41. with opener(path_out, **kwargs) as f_out:
  42. shutil.copyfileobj(f_in, f_out)
  43. @staticmethod
  44. def compression(path_in, ext, compress_function):
  45. path_out = "{}{}".format(path_in, ext)
  46. if os.path.exists(path_out):
  47. creation_time = get_ctime(path_out)
  48. root, ext_before = os.path.splitext(path_in)
  49. renamed_path = generate_rename_path(root, ext_before + ext, creation_time)
  50. os.rename(path_out, renamed_path)
  51. compress_function(path_in, path_out)
  52. os.remove(path_in)
  53. class Retention:
  54. @staticmethod
  55. def retention_count(logs, number):
  56. def key_log(log):
  57. return (-os.stat(log).st_mtime, log)
  58. for log in sorted(logs, key=key_log)[number:]:
  59. os.remove(log)
  60. @staticmethod
  61. def retention_age(logs, seconds):
  62. t = datetime.datetime.now().timestamp()
  63. for log in logs:
  64. if os.stat(log).st_mtime <= t - seconds:
  65. os.remove(log)
  66. class Rotation:
  67. @staticmethod
  68. def forward_day(t):
  69. return t + datetime.timedelta(days=1)
  70. @staticmethod
  71. def forward_weekday(t, weekday):
  72. while True:
  73. t += datetime.timedelta(days=1)
  74. if t.weekday() == weekday:
  75. return t
  76. @staticmethod
  77. def forward_interval(t, interval):
  78. return t + interval
  79. @staticmethod
  80. def rotation_size(message, file, size_limit):
  81. file.seek(0, 2)
  82. return file.tell() + len(message) > size_limit
  83. class RotationTime:
  84. def __init__(self, step_forward, time_init=None):
  85. self._step_forward = step_forward
  86. self._time_init = time_init
  87. self._limit = None
  88. def __call__(self, message, file):
  89. record_time = message.record["time"]
  90. if self._limit is None:
  91. filepath = os.path.realpath(file.name)
  92. creation_time = get_ctime(filepath)
  93. set_ctime(filepath, creation_time)
  94. start_time = datetime.datetime.fromtimestamp(
  95. creation_time, tz=datetime.timezone.utc
  96. )
  97. time_init = self._time_init
  98. if time_init is None:
  99. limit = start_time.astimezone(record_time.tzinfo).replace(tzinfo=None)
  100. limit = self._step_forward(limit)
  101. else:
  102. tzinfo = record_time.tzinfo if time_init.tzinfo is None else time_init.tzinfo
  103. limit = start_time.astimezone(tzinfo).replace(
  104. hour=time_init.hour,
  105. minute=time_init.minute,
  106. second=time_init.second,
  107. microsecond=time_init.microsecond,
  108. )
  109. if limit <= start_time:
  110. limit = self._step_forward(limit)
  111. if time_init.tzinfo is None:
  112. limit = limit.replace(tzinfo=None)
  113. self._limit = limit
  114. if self._limit.tzinfo is None:
  115. record_time = record_time.replace(tzinfo=None)
  116. if record_time >= self._limit:
  117. while self._limit <= record_time:
  118. self._limit = self._step_forward(self._limit)
  119. return True
  120. return False
class FileSink:
    """Sink writing messages to a file, with optional rotation, retention and compression."""

    def __init__(
        self,
        path,
        *,
        rotation=None,
        retention=None,
        compression=None,
        delay=False,
        watch=False,
        mode="a",
        buffering=1,
        encoding="utf8",
        **kwargs
    ):
        """Configure the sink and, unless *delay* is true, open the file immediately.

        Args:
            path: Target file path. A ``{time}`` placeholder is filled in with a
                ``FileDateFormatter`` each time a new file is created.
            rotation: None; a string (size, duration, frequency or day/time);
                a number (size limit); a ``datetime.time``; a
                ``datetime.timedelta``; or a callable ``(message, file) -> bool``.
            retention: None; a duration string; an ``int`` (number of files to
                keep); a ``datetime.timedelta`` (maximum age); or a callable
                receiving the list of matching log paths.
            compression: None; one of the extensions ``gz``, ``bz2``, ``xz``,
                ``lzma``, ``tar``, ``tar.gz``, ``tar.bz2``, ``tar.xz``, ``zip``
                (a leading dot is allowed); or a callable receiving the path of
                the finished file.
            delay: If true, the file is not created until the first ``write``.
            watch: If true, the file is reopened when it disappears or is
                replaced (detected by comparing device and inode numbers).
            mode, buffering, encoding, **kwargs: Forwarded to ``open()``.

        Raises:
            ValueError: If a rotation/retention string or a compression
                extension cannot be parsed.
            TypeError: If rotation, retention or compression has an
                unsupported type.
        """
        self.encoding = encoding
        # Keyword arguments passed to open() each time a file is (re)created.
        self._kwargs = {**kwargs, "mode": mode, "buffering": buffering, "encoding": self.encoding}
        self._path = str(path)
        # Patterns used to find this sink's files when applying retention.
        self._glob_patterns = self._make_glob_patterns(self._path)
        self._rotation_function = self._make_rotation_function(rotation)
        self._retention_function = self._make_retention_function(retention)
        self._compression_function = self._make_compression_function(compression)
        self._file = None
        self._file_path = None
        self._watch = watch
        # Device/inode of the open file; -1 when not watching or no file is open.
        self._file_dev = -1
        self._file_ino = -1

        if not delay:
            path = self._create_path()
            self._create_dirs(path)
            self._create_file(path)

    def write(self, message):
        """Write *message* to the current file.

        Before writing, this opens a file if none is open, reopens it when
        watching and it was removed or replaced, and rotates it when the
        rotation function returns True.
        """
        if self._file is None:
            # No open file: delay=True at construction, or after stop().
            path = self._create_path()
            self._create_dirs(path)
            self._create_file(path)

        if self._watch:
            self._reopen_if_needed()

        if self._rotation_function is not None and self._rotation_function(message, self._file):
            self._terminate_file(is_rotating=True)

        self._file.write(message)

    def stop(self):
        """Close the file.

        Compression and retention are applied here only when no rotation is
        configured.
        """
        if self._watch:
            self._reopen_if_needed()

        self._terminate_file(is_rotating=False)

    def tasks_to_complete(self):
        """Return the pending tasks to await; this sink never has any."""
        return []

    def _create_path(self):
        """Return the absolute path for a new file, with ``{time}`` set to now."""
        path = self._path.format_map({"time": FileDateFormatter()})
        return os.path.abspath(path)

    def _create_dirs(self, path):
        """Ensure the parent directory of *path* exists."""
        dirname = os.path.dirname(path)
        os.makedirs(dirname, exist_ok=True)

    def _create_file(self, path):
        """Open *path* with the stored open() kwargs; when watching, record its device/inode."""
        self._file = open(path, **self._kwargs)
        self._file_path = path

        if self._watch:
            fileno = self._file.fileno()
            result = os.fstat(fileno)
            self._file_dev = result[ST_DEV]
            self._file_ino = result[ST_INO]

    def _close_file(self):
        """Flush and close the current file, then reset all per-file state."""
        self._file.flush()
        self._file.close()

        self._file = None
        self._file_path = None
        self._file_dev = -1
        self._file_ino = -1

    def _reopen_if_needed(self):
        """Reopen the file when the path no longer refers to the file being held open.

        This happens when the path is missing, or when its device/inode differs
        from the ones recorded at open time (e.g. the file was moved or deleted
        by another tool).
        """
        # Implemented based on standard library:
        # https://github.com/python/cpython/blob/cb589d1b/Lib/logging/handlers.py#L486
        if not self._file:
            return

        filepath = self._file_path

        try:
            result = os.stat(filepath)
        except FileNotFoundError:
            result = None

        if not result or result[ST_DEV] != self._file_dev or result[ST_INO] != self._file_ino:
            self._close_file()
            self._create_dirs(filepath)
            self._create_file(filepath)

    def _terminate_file(self, *, is_rotating=False):
        """Close the current file and run the end-of-file housekeeping.

        When *is_rotating* is true, a new file is opened afterwards. Compression
        and retention run on every rotation. They also run on a non-rotating
        termination when no rotation function is configured.
        """
        old_path = self._file_path

        if self._file is not None:
            self._close_file()

        if is_rotating:
            new_path = self._create_path()
            self._create_dirs(new_path)

            if new_path == old_path:
                # The new file would reuse the same name (e.g. the path has no
                # {time} field). Move the finished file aside to a timestamped
                # name based on its creation time.
                creation_time = get_ctime(old_path)
                root, ext = os.path.splitext(old_path)
                renamed_path = generate_rename_path(root, ext, creation_time)
                os.rename(old_path, renamed_path)
                old_path = renamed_path

        if is_rotating or self._rotation_function is None:
            if self._compression_function is not None and old_path is not None:
                self._compression_function(old_path)

            if self._retention_function is not None:
                # Every existing regular file matching any pattern; a set
                # because the patterns can overlap.
                logs = {
                    file
                    for pattern in self._glob_patterns
                    for file in glob.glob(pattern)
                    if os.path.isfile(file)
                }
                self._retention_function(list(logs))

        if is_rotating:
            self._create_file(new_path)
            # NOTE(review): presumably records the creation time so that later
            # get_ctime() calls (rename above, RotationTime) see it — confirm.
            set_ctime(new_path, datetime.datetime.now().timestamp())

    @staticmethod
    def _make_glob_patterns(path):
        """Return glob patterns matching the log file and its renamed/compressed variants.

        Literal parts of *path* are glob-escaped, and each ``{...}`` field
        becomes ``*``. The patterns cover:
          - the file itself;
          - the file followed by an extra extension (compressed);
          - when the path has an extension, ``root.*ext``, the form produced by
            ``generate_rename_path``;
          - the same ``root.*ext`` form followed by an extra extension.
        """
        formatter = string.Formatter()
        tokens = formatter.parse(path)
        # `"*" * (name is not None)` yields "*" after a replacement field, "" otherwise.
        escaped = "".join(glob.escape(text) + "*" * (name is not None) for text, name, *_ in tokens)

        root, ext = os.path.splitext(escaped)

        if not ext:
            return [escaped, escaped + ".*"]

        return [escaped, escaped + ".*", root + ".*" + ext, root + ".*" + ext + ".*"]

    @staticmethod
    def _make_rotation_function(rotation):
        """Turn the user's *rotation* argument into a ``(message, file) -> bool`` callable.

        Strings are tried, in order, as a size, a duration, a frequency and a
        day/time specification. Numbers are byte-size limits. A
        ``datetime.time`` rotates daily at that time. A ``datetime.timedelta``
        rotates at fixed intervals. Callables are returned unchanged.

        Returns None when *rotation* is None.

        Raises:
            ValueError: If a string matches none of the formats.
            TypeError: If the type is unsupported.
        """
        if rotation is None:
            return None
        if isinstance(rotation, str):
            size = string_parsers.parse_size(rotation)
            if size is not None:
                return FileSink._make_rotation_function(size)
            interval = string_parsers.parse_duration(rotation)
            if interval is not None:
                return FileSink._make_rotation_function(interval)
            frequency = string_parsers.parse_frequency(rotation)
            if frequency is not None:
                return Rotation.RotationTime(frequency)
            daytime = string_parsers.parse_daytime(rotation)
            if daytime is not None:
                day, time = daytime
                if day is None:
                    # Time of day only: rotate daily at that time.
                    return FileSink._make_rotation_function(time)
                if time is None:
                    # Weekday only: rotate at midnight on that day.
                    time = datetime.time(0, 0, 0)
                step_forward = partial(Rotation.forward_weekday, weekday=day)
                return Rotation.RotationTime(step_forward, time)
            raise ValueError("Cannot parse rotation from: '%s'" % rotation)
        if isinstance(rotation, (numbers.Real, decimal.Decimal)):
            return partial(Rotation.rotation_size, size_limit=rotation)
        if isinstance(rotation, datetime.time):
            return Rotation.RotationTime(Rotation.forward_day, rotation)
        if isinstance(rotation, datetime.timedelta):
            step_forward = partial(Rotation.forward_interval, interval=rotation)
            return Rotation.RotationTime(step_forward)
        if callable(rotation):
            return rotation
        raise TypeError("Cannot infer rotation for objects of type: '%s'" % type(rotation).__name__)

    @staticmethod
    def _make_retention_function(retention):
        """Turn the user's *retention* argument into a callable taking a list of log paths.

        A duration string or ``timedelta`` deletes files older than that age. An
        ``int`` keeps that many most recent files. Callables are returned
        unchanged.

        Returns None when *retention* is None.

        Raises:
            ValueError: If a string cannot be parsed as a duration.
            TypeError: If the type is unsupported.
        """
        if retention is None:
            return None
        if isinstance(retention, str):
            interval = string_parsers.parse_duration(retention)
            if interval is None:
                raise ValueError("Cannot parse retention from: '%s'" % retention)
            return FileSink._make_retention_function(interval)
        if isinstance(retention, int):
            return partial(Retention.retention_count, number=retention)
        if isinstance(retention, datetime.timedelta):
            return partial(Retention.retention_age, seconds=retention.total_seconds())
        if callable(retention):
            return retention
        raise TypeError(
            "Cannot infer retention for objects of type: '%s'" % type(retention).__name__
        )

    @staticmethod
    def _make_compression_function(compression):
        """Turn the user's *compression* argument into a callable taking the path to compress.

        Extension strings select a stdlib archiver. The archive is written to
        ``path + "." + ext`` and the source file is removed afterwards. Archive
        modules are imported only when selected. Callables are returned
        unchanged.

        Returns None when *compression* is None.

        Raises:
            ValueError: If the extension is not supported.
            TypeError: If the type is unsupported.
        """
        if compression is None:
            return None
        if isinstance(compression, str):
            ext = compression.strip().lstrip(".")

            if ext == "gz":
                import gzip

                compress = partial(Compression.copy_compress, opener=gzip.open, mode="wb")

            elif ext == "bz2":
                import bz2

                compress = partial(Compression.copy_compress, opener=bz2.open, mode="wb")

            elif ext == "xz":
                import lzma

                compress = partial(
                    Compression.copy_compress, opener=lzma.open, mode="wb", format=lzma.FORMAT_XZ
                )

            elif ext == "lzma":
                import lzma

                compress = partial(
                    Compression.copy_compress, opener=lzma.open, mode="wb", format=lzma.FORMAT_ALONE
                )

            elif ext == "tar":
                import tarfile

                compress = partial(Compression.add_compress, opener=tarfile.open, mode="w:")

            elif ext == "tar.gz":
                # NOTE(review): gzip/bz2/lzma are not used directly below;
                # presumably imported so a missing codec fails at configuration
                # time rather than at the first rotation.
                import gzip
                import tarfile

                compress = partial(Compression.add_compress, opener=tarfile.open, mode="w:gz")

            elif ext == "tar.bz2":
                import bz2
                import tarfile

                compress = partial(Compression.add_compress, opener=tarfile.open, mode="w:bz2")

            elif ext == "tar.xz":
                import lzma
                import tarfile

                compress = partial(Compression.add_compress, opener=tarfile.open, mode="w:xz")

            elif ext == "zip":
                import zipfile

                compress = partial(
                    Compression.write_compress,
                    opener=zipfile.ZipFile,
                    mode="w",
                    compression=zipfile.ZIP_DEFLATED,
                )

            else:
                raise ValueError("Invalid compression format: '%s'" % ext)

            return partial(Compression.compression, ext="." + ext, compress_function=compress)
        if callable(compression):
            return compression
        raise TypeError(
            "Cannot infer compression for objects of type: '%s'" % type(compression).__name__
        )