_async_read_write.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. """Async wrapper around :class:`ReadWriteLock` for use with ``asyncio``."""
  2. from __future__ import annotations
  3. import asyncio
  4. import functools
  5. from contextlib import asynccontextmanager
  6. from typing import TYPE_CHECKING
  7. from ._read_write import ReadWriteLock
  8. if TYPE_CHECKING:
  9. import os
  10. from collections.abc import AsyncGenerator, Callable
  11. from concurrent import futures
  12. from types import TracebackType
  13. class AsyncAcquireReadWriteReturnProxy:
  14. """Context-aware object that releases the async read/write lock on exit."""
  15. def __init__(self, lock: AsyncReadWriteLock) -> None:
  16. self.lock = lock
  17. async def __aenter__(self) -> AsyncReadWriteLock:
  18. return self.lock
  19. async def __aexit__(
  20. self,
  21. exc_type: type[BaseException] | None,
  22. exc_value: BaseException | None,
  23. traceback: TracebackType | None,
  24. ) -> None:
  25. await self.lock.release()
  26. class AsyncReadWriteLock:
  27. """
  28. Async wrapper around :class:`ReadWriteLock` for use in ``asyncio`` applications.
  29. Because Python's :mod:`sqlite3` module has no async API, all blocking SQLite operations are dispatched to a thread
  30. pool via ``loop.run_in_executor()``. Reentrancy, upgrade/downgrade rules, and singleton behavior are delegated
  31. to the underlying :class:`ReadWriteLock`.
  32. :param lock_file: path to the SQLite database file used as the lock
  33. :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
  34. :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
  35. :param is_singleton: if ``True``, reuse existing :class:`ReadWriteLock` instances for the same resolved path
  36. :param loop: event loop for ``run_in_executor``; ``None`` uses the running loop
  37. :param executor: executor for ``run_in_executor``; ``None`` uses the default executor
  38. .. versionadded:: 3.21.0
  39. """
  40. def __init__( # noqa: PLR0913
  41. self,
  42. lock_file: str | os.PathLike[str],
  43. timeout: float = -1,
  44. *,
  45. blocking: bool = True,
  46. is_singleton: bool = True,
  47. loop: asyncio.AbstractEventLoop | None = None,
  48. executor: futures.Executor | None = None,
  49. ) -> None:
  50. self._lock = ReadWriteLock(lock_file, timeout, blocking=blocking, is_singleton=is_singleton)
  51. self._loop = loop
  52. self._executor = executor
  53. @property
  54. def lock_file(self) -> str:
  55. """:returns: the path to the lock file."""
  56. return self._lock.lock_file
  57. @property
  58. def timeout(self) -> float:
  59. """:returns: the default timeout."""
  60. return self._lock.timeout
  61. @property
  62. def blocking(self) -> bool:
  63. """:returns: whether blocking is enabled by default."""
  64. return self._lock.blocking
  65. @property
  66. def loop(self) -> asyncio.AbstractEventLoop | None:
  67. """:returns: the event loop (or ``None`` for the running loop)."""
  68. return self._loop
  69. @property
  70. def executor(self) -> futures.Executor | None:
  71. """:returns: the executor (or ``None`` for the default)."""
  72. return self._executor
  73. async def _run(self, func: Callable[..., object], *args: object, **kwargs: object) -> object:
  74. loop = self._loop or asyncio.get_running_loop()
  75. return await loop.run_in_executor(self._executor, functools.partial(func, *args, **kwargs))
  76. async def acquire_read(self, timeout: float = -1, *, blocking: bool = True) -> AsyncAcquireReadWriteReturnProxy:
  77. """
  78. Acquire a shared read lock.
  79. See :meth:`ReadWriteLock.acquire_read` for full semantics.
  80. :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
  81. :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
  82. :returns: a proxy that can be used as an async context manager to release the lock
  83. :raises RuntimeError: if a write lock is already held on this instance
  84. :raises Timeout: if the lock cannot be acquired within *timeout* seconds
  85. """
  86. await self._run(self._lock.acquire_read, timeout, blocking=blocking)
  87. return AsyncAcquireReadWriteReturnProxy(lock=self)
  88. async def acquire_write(self, timeout: float = -1, *, blocking: bool = True) -> AsyncAcquireReadWriteReturnProxy:
  89. """
  90. Acquire an exclusive write lock.
  91. See :meth:`ReadWriteLock.acquire_write` for full semantics.
  92. :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
  93. :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
  94. :returns: a proxy that can be used as an async context manager to release the lock
  95. :raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread
  96. :raises Timeout: if the lock cannot be acquired within *timeout* seconds
  97. """
  98. await self._run(self._lock.acquire_write, timeout, blocking=blocking)
  99. return AsyncAcquireReadWriteReturnProxy(lock=self)
  100. async def release(self, *, force: bool = False) -> None:
  101. """
  102. Release one level of the current lock.
  103. See :meth:`ReadWriteLock.release` for full semantics.
  104. :param force: if ``True``, release the lock completely regardless of the current lock level
  105. :raises RuntimeError: if no lock is currently held and *force* is ``False``
  106. """
  107. await self._run(self._lock.release, force=force)
  108. @asynccontextmanager
  109. async def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]:
  110. """
  111. Async context manager that acquires and releases a shared read lock.
  112. Falls back to instance defaults for *timeout* and *blocking* when ``None``.
  113. :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
  114. :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default
  115. """
  116. if timeout is None:
  117. timeout = self._lock.timeout
  118. if blocking is None:
  119. blocking = self._lock.blocking
  120. await self.acquire_read(timeout, blocking=blocking)
  121. try:
  122. yield
  123. finally:
  124. await self.release()
  125. @asynccontextmanager
  126. async def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]:
  127. """
  128. Async context manager that acquires and releases an exclusive write lock.
  129. Falls back to instance defaults for *timeout* and *blocking* when ``None``.
  130. :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
  131. :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default
  132. """
  133. if timeout is None:
  134. timeout = self._lock.timeout
  135. if blocking is None:
  136. blocking = self._lock.blocking
  137. await self.acquire_write(timeout, blocking=blocking)
  138. try:
  139. yield
  140. finally:
  141. await self.release()
  142. async def close(self) -> None:
  143. """
  144. Release the lock (if held) and close the underlying SQLite connection.
  145. After calling this method, the lock instance is no longer usable.
  146. """
  147. await self._run(self._lock.close)
  148. __all__ = [
  149. "AsyncAcquireReadWriteReturnProxy",
  150. "AsyncReadWriteLock",
  151. ]