# _handler.py (12 KB)
import functools
import json
import multiprocessing
import os
import threading
from contextlib import contextmanager
from threading import Thread

from ._colorizer import Colorizer
from ._locks_machinery import create_handler_lock
  10. def prepare_colored_format(format_, ansi_level):
  11. colored = Colorizer.prepare_format(format_)
  12. return colored, colored.colorize(ansi_level)
  13. def prepare_stripped_format(format_):
  14. colored = Colorizer.prepare_format(format_)
  15. return colored.strip()
  16. def memoize(function):
  17. return functools.lru_cache(maxsize=64)(function)
  18. class Message(str):
  19. __slots__ = ("record",)
  20. class Handler:
  21. def __init__(
  22. self,
  23. *,
  24. sink,
  25. name,
  26. levelno,
  27. formatter,
  28. is_formatter_dynamic,
  29. filter_,
  30. colorize,
  31. serialize,
  32. enqueue,
  33. multiprocessing_context,
  34. error_interceptor,
  35. exception_formatter,
  36. id_,
  37. levels_ansi_codes
  38. ):
  39. self._name = name
  40. self._sink = sink
  41. self._levelno = levelno
  42. self._formatter = formatter
  43. self._is_formatter_dynamic = is_formatter_dynamic
  44. self._filter = filter_
  45. self._colorize = colorize
  46. self._serialize = serialize
  47. self._enqueue = enqueue
  48. self._multiprocessing_context = multiprocessing_context
  49. self._error_interceptor = error_interceptor
  50. self._exception_formatter = exception_formatter
  51. self._id = id_
  52. self._levels_ansi_codes = levels_ansi_codes # Warning, reference shared among handlers
  53. self._decolorized_format = None
  54. self._precolorized_formats = {}
  55. self._memoize_dynamic_format = None
  56. self._stopped = False
  57. self._lock = create_handler_lock()
  58. self._lock_acquired = threading.local()
  59. self._queue = None
  60. self._queue_lock = None
  61. self._confirmation_event = None
  62. self._confirmation_lock = None
  63. self._owner_process_pid = None
  64. self._thread = None
  65. if self._is_formatter_dynamic:
  66. if self._colorize:
  67. self._memoize_dynamic_format = memoize(prepare_colored_format)
  68. else:
  69. self._memoize_dynamic_format = memoize(prepare_stripped_format)
  70. else:
  71. if self._colorize:
  72. for level_name in self._levels_ansi_codes:
  73. self.update_format(level_name)
  74. else:
  75. self._decolorized_format = self._formatter.strip()
  76. if self._enqueue:
  77. if self._multiprocessing_context is None:
  78. self._queue = multiprocessing.SimpleQueue()
  79. self._confirmation_event = multiprocessing.Event()
  80. self._confirmation_lock = multiprocessing.Lock()
  81. else:
  82. self._queue = self._multiprocessing_context.SimpleQueue()
  83. self._confirmation_event = self._multiprocessing_context.Event()
  84. self._confirmation_lock = self._multiprocessing_context.Lock()
  85. self._queue_lock = create_handler_lock()
  86. self._owner_process_pid = os.getpid()
  87. self._thread = Thread(
  88. target=self._queued_writer, daemon=True, name="loguru-writer-%d" % self._id
  89. )
  90. self._thread.start()
  91. def __repr__(self):
  92. return "(id=%d, level=%d, sink=%s)" % (self._id, self._levelno, self._name)
  93. @contextmanager
  94. def _protected_lock(self):
  95. """Acquire the lock, but fail fast if its already acquired by the current thread."""
  96. if getattr(self._lock_acquired, "acquired", False):
  97. raise RuntimeError(
  98. "Could not acquire internal lock because it was already in use (deadlock avoided). "
  99. "This likely happened because the logger was re-used inside a sink, a signal "
  100. "handler or a '__del__' method. This is not permitted because the logger and its "
  101. "handlers are not re-entrant."
  102. )
  103. self._lock_acquired.acquired = True
  104. try:
  105. with self._lock:
  106. yield
  107. finally:
  108. self._lock_acquired.acquired = False
  109. def emit(self, record, level_id, from_decorator, is_raw, colored_message):
  110. try:
  111. if self._levelno > record["level"].no:
  112. return
  113. if self._filter is not None:
  114. if not self._filter(record):
  115. return
  116. if self._is_formatter_dynamic:
  117. dynamic_format = self._formatter(record)
  118. formatter_record = record.copy()
  119. if not record["exception"]:
  120. formatter_record["exception"] = ""
  121. else:
  122. type_, value, tb = record["exception"]
  123. formatter = self._exception_formatter
  124. lines = formatter.format_exception(type_, value, tb, from_decorator=from_decorator)
  125. formatter_record["exception"] = "".join(lines)
  126. if colored_message is not None and colored_message.stripped != record["message"]:
  127. colored_message = None
  128. if is_raw:
  129. if colored_message is None or not self._colorize:
  130. formatted = record["message"]
  131. else:
  132. ansi_level = self._levels_ansi_codes[level_id]
  133. formatted = colored_message.colorize(ansi_level)
  134. elif self._is_formatter_dynamic:
  135. if not self._colorize:
  136. precomputed_format = self._memoize_dynamic_format(dynamic_format)
  137. formatted = precomputed_format.format_map(formatter_record)
  138. elif colored_message is None:
  139. ansi_level = self._levels_ansi_codes[level_id]
  140. _, precomputed_format = self._memoize_dynamic_format(dynamic_format, ansi_level)
  141. formatted = precomputed_format.format_map(formatter_record)
  142. else:
  143. ansi_level = self._levels_ansi_codes[level_id]
  144. formatter, precomputed_format = self._memoize_dynamic_format(
  145. dynamic_format, ansi_level
  146. )
  147. coloring_message = formatter.make_coloring_message(
  148. record["message"], ansi_level=ansi_level, colored_message=colored_message
  149. )
  150. formatter_record["message"] = coloring_message
  151. formatted = precomputed_format.format_map(formatter_record)
  152. else:
  153. if not self._colorize:
  154. precomputed_format = self._decolorized_format
  155. formatted = precomputed_format.format_map(formatter_record)
  156. elif colored_message is None:
  157. ansi_level = self._levels_ansi_codes[level_id]
  158. precomputed_format = self._precolorized_formats[level_id]
  159. formatted = precomputed_format.format_map(formatter_record)
  160. else:
  161. ansi_level = self._levels_ansi_codes[level_id]
  162. precomputed_format = self._precolorized_formats[level_id]
  163. coloring_message = self._formatter.make_coloring_message(
  164. record["message"], ansi_level=ansi_level, colored_message=colored_message
  165. )
  166. formatter_record["message"] = coloring_message
  167. formatted = precomputed_format.format_map(formatter_record)
  168. if self._serialize:
  169. formatted = self._serialize_record(formatted, record)
  170. str_record = Message(formatted)
  171. str_record.record = record
  172. with self._protected_lock():
  173. if self._stopped:
  174. return
  175. if self._enqueue:
  176. self._queue.put(str_record)
  177. else:
  178. self._sink.write(str_record)
  179. except Exception:
  180. if not self._error_interceptor.should_catch():
  181. raise
  182. self._error_interceptor.print(record)
  183. def stop(self):
  184. with self._protected_lock():
  185. self._stopped = True
  186. if self._enqueue:
  187. if self._owner_process_pid != os.getpid():
  188. return
  189. self._queue.put(None)
  190. self._thread.join()
  191. if hasattr(self._queue, "close"):
  192. self._queue.close()
  193. self._sink.stop()
  194. def complete_queue(self):
  195. if not self._enqueue:
  196. return
  197. with self._confirmation_lock:
  198. self._queue.put(True)
  199. self._confirmation_event.wait()
  200. self._confirmation_event.clear()
  201. def tasks_to_complete(self):
  202. if self._enqueue and self._owner_process_pid != os.getpid():
  203. return []
  204. lock = self._queue_lock if self._enqueue else self._protected_lock()
  205. with lock:
  206. return self._sink.tasks_to_complete()
  207. def update_format(self, level_id):
  208. if not self._colorize or self._is_formatter_dynamic:
  209. return
  210. ansi_code = self._levels_ansi_codes[level_id]
  211. self._precolorized_formats[level_id] = self._formatter.colorize(ansi_code)
  212. @property
  213. def levelno(self):
  214. return self._levelno
  215. @staticmethod
  216. def _serialize_record(text, record):
  217. exception = record["exception"]
  218. if exception is not None:
  219. exception = {
  220. "type": None if exception.type is None else exception.type.__name__,
  221. "value": exception.value,
  222. "traceback": bool(exception.traceback),
  223. }
  224. serializable = {
  225. "text": text,
  226. "record": {
  227. "elapsed": {
  228. "repr": record["elapsed"],
  229. "seconds": record["elapsed"].total_seconds(),
  230. },
  231. "exception": exception,
  232. "extra": record["extra"],
  233. "file": {"name": record["file"].name, "path": record["file"].path},
  234. "function": record["function"],
  235. "level": {
  236. "icon": record["level"].icon,
  237. "name": record["level"].name,
  238. "no": record["level"].no,
  239. },
  240. "line": record["line"],
  241. "message": record["message"],
  242. "module": record["module"],
  243. "name": record["name"],
  244. "process": {"id": record["process"].id, "name": record["process"].name},
  245. "thread": {"id": record["thread"].id, "name": record["thread"].name},
  246. "time": {"repr": record["time"], "timestamp": record["time"].timestamp()},
  247. },
  248. }
  249. return json.dumps(serializable, default=str, ensure_ascii=False) + "\n"
  250. def _queued_writer(self):
  251. message = None
  252. queue = self._queue
  253. # We need to use a lock to protect sink during fork.
  254. # Particularly, writing to stderr may lead to deadlock in child process.
  255. lock = self._queue_lock
  256. while True:
  257. try:
  258. message = queue.get()
  259. except Exception:
  260. with lock:
  261. self._error_interceptor.print(None)
  262. continue
  263. if message is None:
  264. break
  265. if message is True:
  266. self._confirmation_event.set()
  267. continue
  268. with lock:
  269. try:
  270. self._sink.write(message)
  271. except Exception:
  272. self._error_interceptor.print(message.record)
  273. def __getstate__(self):
  274. state = self.__dict__.copy()
  275. state["_lock"] = None
  276. state["_lock_acquired"] = None
  277. state["_memoize_dynamic_format"] = None
  278. if self._enqueue:
  279. state["_sink"] = None
  280. state["_thread"] = None
  281. state["_owner_process"] = None
  282. state["_queue_lock"] = None
  283. return state
  284. def __setstate__(self, state):
  285. self.__dict__.update(state)
  286. self._lock = create_handler_lock()
  287. self._lock_acquired = threading.local()
  288. if self._enqueue:
  289. self._queue_lock = create_handler_lock()
  290. if self._is_formatter_dynamic:
  291. if self._colorize:
  292. self._memoize_dynamic_format = memoize(prepare_colored_format)
  293. else:
  294. self._memoize_dynamic_format = memoize(prepare_stripped_format)