| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- import inspect
- import logging
- import weakref
- from ._asyncio_loop import get_running_loop, get_task_loop
class StreamSink:
    """Sink that forwards every message to a stream-like object.

    The stream must provide ``write()``. ``flush()``, ``stop()`` and an
    ``async def complete()`` are optional: their presence is probed once, at
    construction time, and remembered in boolean flags.
    """

    def __init__(self, stream):
        """Wrap *stream* and record which optional methods it offers."""
        flush_attr = getattr(stream, "flush", None)
        stop_attr = getattr(stream, "stop", None)
        complete_attr = getattr(stream, "complete", None)

        self._stream = stream
        self._flushable = callable(flush_attr)
        self._stoppable = callable(stop_attr)
        # Only a coroutine function qualifies; a plain callable "complete" is ignored.
        self._completable = inspect.iscoroutinefunction(complete_attr)

    def write(self, message):
        """Write *message* to the stream, then flush it if the stream can flush."""
        self._stream.write(message)
        if not self._flushable:
            return
        self._stream.flush()

    def stop(self):
        """Call the stream's ``stop()`` when it has one; otherwise do nothing."""
        if not self._stoppable:
            return
        self._stream.stop()

    def tasks_to_complete(self):
        """Return a list holding the stream's ``complete()`` coroutine, or an empty list."""
        return [self._stream.complete()] if self._completable else []
class StandardSink:
    """Sink that converts each message into a ``logging.LogRecord`` for a handler.

    The wrapped handler only needs ``handle(record)`` and ``close()``.
    """

    def __init__(self, handler):
        """Remember the handler that will receive the generated records."""
        self._handler = handler

    def write(self, message):
        """Build a ``LogRecord`` from ``message.record`` and pass it to the handler.

        The record's message is the plain-string form of *message*; name, level,
        file path, line, function and ``extra`` are read from ``message.record``.
        """
        source = message.record
        text = str(message)
        error = source["exception"]

        # Positional arguments for makeRecord():
        # name, level, fn, lno, msg, args, exc_info, func, extra.
        positional = (
            source["name"],
            source["level"].no,
            source["file"].path,
            source["line"],
            text,
            (),
            (error.type, error.value, error.traceback) if error else None,
            source["function"],
            {"extra": source["extra"]},
        )
        log_record = logging.getLogger().makeRecord(*positional)

        if error:
            # NOTE(review): pre-filling exc_text presumably keeps the handler's
            # formatter from rendering the traceback itself - confirm intent.
            log_record.exc_text = "\n"

        # Use the level name carried by the message's record rather than the one
        # makeRecord() derived from the numeric level.
        log_record.levelname = source["level"].name
        self._handler.handle(log_record)

    def stop(self):
        """Close the wrapped handler."""
        self._handler.close()

    def tasks_to_complete(self):
        """Return an empty list: this sink never has anything to await."""
        return []
class AsyncSink:
    """Sink that runs a coroutine function as an event-loop task for each message.

    Created tasks are kept in a ``weakref.WeakSet`` so they can later be
    cancelled by ``stop()`` or awaited through ``tasks_to_complete()``.
    """

    def __init__(self, function, loop, error_interceptor):
        """Store the coroutine function, the optional loop and the error interceptor."""
        self._function = function
        self._loop = loop
        self._error_interceptor = error_interceptor
        # Weak references only: the sink itself never keeps a task alive.
        self._tasks = weakref.WeakSet()

    def write(self, message):
        """Schedule ``function(message)`` as a task on the configured or running loop.

        When no loop was configured and none is running, the message is dropped.
        """
        loop = self._loop
        if not loop:
            try:
                loop = get_running_loop()
            except RuntimeError:
                return

        task = loop.create_task(self._function(message))

        def check_exception(future):
            # Done callback: nothing to report for cancelled or successful tasks.
            if future.cancelled():
                return
            if future.exception() is None:
                return
            if self._error_interceptor.should_catch():
                self._error_interceptor.print(message.record, exception=future.exception())
                return
            raise future.exception()

        task.add_done_callback(check_exception)
        self._tasks.add(task)

    def stop(self):
        """Request cancellation of every task still tracked by this sink."""
        for pending in self._tasks:
            pending.cancel()

    def tasks_to_complete(self):
        """Return one not-yet-awaited waiter coroutine per tracked task."""
        # "self._tasks" may be mutated by "write()", so this method has to run under the
        # same lock as "write()" (which happens to be the handler lock) to iterate safely.
        # Awaiting the tasks while holding that lock could deadlock, hence the two steps:
        # gather the waiters here, and let the caller await them once the lock is released.
        return list(map(self._complete_task, self._tasks))

    async def _complete_task(self, task):
        """Await *task* if it belongs to the running loop, ignoring its failure."""
        current_loop = get_running_loop()
        owning_loop = get_task_loop(task)
        if owning_loop is current_loop:
            try:
                await task
            except Exception:
                pass  # Handled in "check_exception()"

    def __getstate__(self):
        """Return a copy of the instance state with the task set replaced by ``None``."""
        return {**self.__dict__, "_tasks": None}

    def __setstate__(self, state):
        """Restore *state* and start again with an empty task set."""
        vars(self).update(state)
        self._tasks = weakref.WeakSet()
class CallableSink:
    """Sink that hands each message to a plain callable."""

    def __init__(self, function):
        """Remember the callable invoked on every ``write()``."""
        self._function = function

    def write(self, message):
        """Call the wrapped function with *message*; its return value is discarded."""
        callback = self._function
        callback(message)

    def stop(self):
        """Do nothing: a bare callable has no resource to release."""

    def tasks_to_complete(self):
        """Return an empty list: this sink never has anything to await."""
        return []
|