| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- """Utilities"""
- from __future__ import annotations
- import asyncio
- import sys
- import typing as t
- from collections.abc import Mapping
- from contextvars import copy_context
- from functools import partial, wraps
- if t.TYPE_CHECKING:
- from collections.abc import Callable
- from contextvars import Context
class LazyDict(Mapping[str, t.Any]):
    """Lazily evaluated read-only dictionary.

    Initialised with a dictionary of key-value pairs where the values are either
    constants or callables. Callables are evaluated each time the respective item is
    read; nothing is cached.

    Missing keys raise ``KeyError``, so the mixin methods inherited from
    ``Mapping`` (``in``, ``get``, ``items`` ...) behave as for a regular mapping.
    """

    def __init__(self, dict):
        # The parameter name shadows the builtin, but it is kept so that
        # existing keyword callers (``LazyDict(dict=...)``) keep working.
        self._dict = dict

    def __getitem__(self, key):
        """Return the value for *key*, calling it first if it is callable.

        Raises:
            KeyError: if *key* is not present in the underlying dictionary.
        """
        # Index rather than ``.get()``: Mapping.__contains__ and Mapping.get
        # detect absent keys by catching the KeyError raised here.
        item = self._dict[key]
        return item() if callable(item) else item

    def __len__(self):
        """Return the number of keys in the underlying dictionary."""
        return len(self._dict)

    def __iter__(self):
        """Iterate over the keys of the underlying dictionary."""
        return iter(self._dict)
# Type parameters used only in the coroutine annotations of _async_in_context.
T = t.TypeVar("T")
U = t.TypeVar("U")
V = t.TypeVar("V")


def _async_in_context(
    f: Callable[..., t.Coroutine[T, U, V]], context: Context | None = None
) -> Callable[..., t.Coroutine[T, U, V]]:
    """
    Wrap coroutine function ``f`` so its calls share a persistent ContextVar Context.

    Backports asyncio.create_task(context=...) behavior from Python 3.11.

    ``context`` is the Context to start from; when it is None, a copy of the
    context current at wrapping time is used. The returned coroutine function
    carries ``f``'s metadata (via ``functools.wraps``) and awaits ``f`` inside
    a new task on every call.
    """
    shared_ctx = copy_context() if context is None else context

    if sys.version_info >= (3, 11):

        @wraps(f)
        async def run_in_context(*args, **kwargs):
            # create_task accepts the context directly on 3.11+
            return await asyncio.create_task(f(*args, **kwargs), context=shared_ctx)

        return run_in_context

    # Backport path: not needed once 3.11 is the minimum supported version.
    # latest_ctx is rebound after every call so the next call starts from it.
    latest_ctx = shared_ctx  # type: ignore[unreachable]

    async def call_and_snapshot(*args, **kwargs):
        """await ``f``, then record the context as it is once ``f`` has finished"""
        nonlocal latest_ctx
        try:
            return await f(*args, **kwargs)
        finally:
            # keep context changes around for future calls
            latest_ctx = copy_context()

    @wraps(f)
    async def run_in_context_pre311(*args, **kwargs):
        entry_ctx = latest_ctx
        spawn_task = partial(asyncio.create_task, call_and_snapshot(*args, **kwargs))
        # create the task from inside the most recently recorded context
        return await entry_ctx.run(spawn_task)

    return run_in_context_pre311
|