| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203 |
- """Async wrapper around :class:`ReadWriteLock` for use with ``asyncio``."""
- from __future__ import annotations
- import asyncio
- import functools
- from contextlib import asynccontextmanager
- from typing import TYPE_CHECKING
- from ._read_write import ReadWriteLock
- if TYPE_CHECKING:
- import os
- from collections.abc import AsyncGenerator, Callable
- from concurrent import futures
- from types import TracebackType
- class AsyncAcquireReadWriteReturnProxy:
- """Context-aware object that releases the async read/write lock on exit."""
- def __init__(self, lock: AsyncReadWriteLock) -> None:
- self.lock = lock
- async def __aenter__(self) -> AsyncReadWriteLock:
- return self.lock
- async def __aexit__(
- self,
- exc_type: type[BaseException] | None,
- exc_value: BaseException | None,
- traceback: TracebackType | None,
- ) -> None:
- await self.lock.release()
- class AsyncReadWriteLock:
- """
- Async wrapper around :class:`ReadWriteLock` for use in ``asyncio`` applications.
- Because Python's :mod:`sqlite3` module has no async API, all blocking SQLite operations are dispatched to a thread
- pool via ``loop.run_in_executor()``. Reentrancy, upgrade/downgrade rules, and singleton behavior are delegated
- to the underlying :class:`ReadWriteLock`.
- :param lock_file: path to the SQLite database file used as the lock
- :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
- :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
- :param is_singleton: if ``True``, reuse existing :class:`ReadWriteLock` instances for the same resolved path
- :param loop: event loop for ``run_in_executor``; ``None`` uses the running loop
- :param executor: executor for ``run_in_executor``; ``None`` uses the default executor
- .. versionadded:: 3.21.0
- """
- def __init__( # noqa: PLR0913
- self,
- lock_file: str | os.PathLike[str],
- timeout: float = -1,
- *,
- blocking: bool = True,
- is_singleton: bool = True,
- loop: asyncio.AbstractEventLoop | None = None,
- executor: futures.Executor | None = None,
- ) -> None:
- self._lock = ReadWriteLock(lock_file, timeout, blocking=blocking, is_singleton=is_singleton)
- self._loop = loop
- self._executor = executor
- @property
- def lock_file(self) -> str:
- """:returns: the path to the lock file."""
- return self._lock.lock_file
- @property
- def timeout(self) -> float:
- """:returns: the default timeout."""
- return self._lock.timeout
- @property
- def blocking(self) -> bool:
- """:returns: whether blocking is enabled by default."""
- return self._lock.blocking
- @property
- def loop(self) -> asyncio.AbstractEventLoop | None:
- """:returns: the event loop (or ``None`` for the running loop)."""
- return self._loop
- @property
- def executor(self) -> futures.Executor | None:
- """:returns: the executor (or ``None`` for the default)."""
- return self._executor
- async def _run(self, func: Callable[..., object], *args: object, **kwargs: object) -> object:
- loop = self._loop or asyncio.get_running_loop()
- return await loop.run_in_executor(self._executor, functools.partial(func, *args, **kwargs))
- async def acquire_read(self, timeout: float = -1, *, blocking: bool = True) -> AsyncAcquireReadWriteReturnProxy:
- """
- Acquire a shared read lock.
- See :meth:`ReadWriteLock.acquire_read` for full semantics.
- :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
- :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
- :returns: a proxy that can be used as an async context manager to release the lock
- :raises RuntimeError: if a write lock is already held on this instance
- :raises Timeout: if the lock cannot be acquired within *timeout* seconds
- """
- await self._run(self._lock.acquire_read, timeout, blocking=blocking)
- return AsyncAcquireReadWriteReturnProxy(lock=self)
- async def acquire_write(self, timeout: float = -1, *, blocking: bool = True) -> AsyncAcquireReadWriteReturnProxy:
- """
- Acquire an exclusive write lock.
- See :meth:`ReadWriteLock.acquire_write` for full semantics.
- :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
- :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
- :returns: a proxy that can be used as an async context manager to release the lock
- :raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread
- :raises Timeout: if the lock cannot be acquired within *timeout* seconds
- """
- await self._run(self._lock.acquire_write, timeout, blocking=blocking)
- return AsyncAcquireReadWriteReturnProxy(lock=self)
- async def release(self, *, force: bool = False) -> None:
- """
- Release one level of the current lock.
- See :meth:`ReadWriteLock.release` for full semantics.
- :param force: if ``True``, release the lock completely regardless of the current lock level
- :raises RuntimeError: if no lock is currently held and *force* is ``False``
- """
- await self._run(self._lock.release, force=force)
- @asynccontextmanager
- async def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]:
- """
- Async context manager that acquires and releases a shared read lock.
- Falls back to instance defaults for *timeout* and *blocking* when ``None``.
- :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
- :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default
- """
- if timeout is None:
- timeout = self._lock.timeout
- if blocking is None:
- blocking = self._lock.blocking
- await self.acquire_read(timeout, blocking=blocking)
- try:
- yield
- finally:
- await self.release()
- @asynccontextmanager
- async def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]:
- """
- Async context manager that acquires and releases an exclusive write lock.
- Falls back to instance defaults for *timeout* and *blocking* when ``None``.
- :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
- :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default
- """
- if timeout is None:
- timeout = self._lock.timeout
- if blocking is None:
- blocking = self._lock.blocking
- await self.acquire_write(timeout, blocking=blocking)
- try:
- yield
- finally:
- await self.release()
- async def close(self) -> None:
- """
- Release the lock (if held) and close the underlying SQLite connection.
- After calling this method, the lock instance is no longer usable.
- """
- await self._run(self._lock.close)
- __all__ = [
- "AsyncAcquireReadWriteReturnProxy",
- "AsyncReadWriteLock",
- ]
|