# _simple_sinks.py (3.7 KB)
  1. import inspect
  2. import logging
  3. import weakref
  4. from ._asyncio_loop import get_running_loop, get_task_loop
  5. class StreamSink:
  6. def __init__(self, stream):
  7. self._stream = stream
  8. self._flushable = callable(getattr(stream, "flush", None))
  9. self._stoppable = callable(getattr(stream, "stop", None))
  10. self._completable = inspect.iscoroutinefunction(getattr(stream, "complete", None))
  11. def write(self, message):
  12. self._stream.write(message)
  13. if self._flushable:
  14. self._stream.flush()
  15. def stop(self):
  16. if self._stoppable:
  17. self._stream.stop()
  18. def tasks_to_complete(self):
  19. if not self._completable:
  20. return []
  21. return [self._stream.complete()]
  22. class StandardSink:
  23. def __init__(self, handler):
  24. self._handler = handler
  25. def write(self, message):
  26. raw_record = message.record
  27. message = str(message)
  28. exc = raw_record["exception"]
  29. record = logging.getLogger().makeRecord(
  30. raw_record["name"],
  31. raw_record["level"].no,
  32. raw_record["file"].path,
  33. raw_record["line"],
  34. message,
  35. (),
  36. (exc.type, exc.value, exc.traceback) if exc else None,
  37. raw_record["function"],
  38. {"extra": raw_record["extra"]},
  39. )
  40. if exc:
  41. record.exc_text = "\n"
  42. record.levelname = raw_record["level"].name
  43. self._handler.handle(record)
  44. def stop(self):
  45. self._handler.close()
  46. def tasks_to_complete(self):
  47. return []
  48. class AsyncSink:
  49. def __init__(self, function, loop, error_interceptor):
  50. self._function = function
  51. self._loop = loop
  52. self._error_interceptor = error_interceptor
  53. self._tasks = weakref.WeakSet()
  54. def write(self, message):
  55. try:
  56. loop = self._loop or get_running_loop()
  57. except RuntimeError:
  58. return
  59. coroutine = self._function(message)
  60. task = loop.create_task(coroutine)
  61. def check_exception(future):
  62. if future.cancelled() or future.exception() is None:
  63. return
  64. if not self._error_interceptor.should_catch():
  65. raise future.exception()
  66. self._error_interceptor.print(message.record, exception=future.exception())
  67. task.add_done_callback(check_exception)
  68. self._tasks.add(task)
  69. def stop(self):
  70. for task in self._tasks:
  71. task.cancel()
  72. def tasks_to_complete(self):
  73. # To avoid errors due to "self._tasks" being mutated while iterated, the
  74. # "tasks_to_complete()" method must be protected by the same lock as "write()" (which
  75. # happens to be the handler lock). However, the tasks must not be awaited while the lock is
  76. # acquired as this could lead to a deadlock. Therefore, we first need to collect the tasks
  77. # to complete, then return them so that they can be awaited outside of the lock.
  78. return [self._complete_task(task) for task in self._tasks]
  79. async def _complete_task(self, task):
  80. loop = get_running_loop()
  81. if get_task_loop(task) is not loop:
  82. return
  83. try:
  84. await task
  85. except Exception:
  86. pass # Handled in "check_exception()"
  87. def __getstate__(self):
  88. state = self.__dict__.copy()
  89. state["_tasks"] = None
  90. return state
  91. def __setstate__(self, state):
  92. self.__dict__.update(state)
  93. self._tasks = weakref.WeakSet()
  94. class CallableSink:
  95. def __init__(self, function):
  96. self._function = function
  97. def write(self, message):
  98. self._function(message)
  99. def stop(self):
  100. pass
  101. def tasks_to_complete(self):
  102. return []