# _read_write.py
from __future__ import annotations

import atexit
import logging
import math
import os
import pathlib
import sqlite3
import threading
import time
from contextlib import contextmanager, suppress
from typing import TYPE_CHECKING, Literal
from weakref import WeakValueDictionary

from ._api import AcquireReturnProxy
from ._error import Timeout

if TYPE_CHECKING:
    from collections.abc import Generator
  16. _LOGGER = logging.getLogger("filelock")
  17. _all_connections: set[sqlite3.Connection] = set()
  18. _all_connections_lock = threading.Lock()
  19. def _cleanup_connections() -> None:
  20. with _all_connections_lock:
  21. for con in list(_all_connections):
  22. with suppress(Exception):
  23. con.close()
  24. _all_connections.clear()
  25. atexit.register(_cleanup_connections)
  26. # sqlite3_busy_timeout() accepts a C int, max 2_147_483_647 on 32-bit. Use a lower value to be safe (~23 days).
  27. _MAX_SQLITE_TIMEOUT_MS = 2_000_000_000 - 1
  28. def timeout_for_sqlite(timeout: float, *, blocking: bool, already_waited: float) -> int:
  29. if blocking is False:
  30. return 0
  31. if timeout == -1:
  32. return _MAX_SQLITE_TIMEOUT_MS
  33. if timeout < 0:
  34. msg = "timeout must be a non-negative number or -1"
  35. raise ValueError(msg)
  36. remaining = max(timeout - already_waited, 0) if timeout > 0 else timeout
  37. timeout_ms = int(remaining * 1000)
  38. if timeout_ms > _MAX_SQLITE_TIMEOUT_MS or timeout_ms < 0:
  39. _LOGGER.warning("timeout %s is too large for SQLite, using %s ms instead", timeout, _MAX_SQLITE_TIMEOUT_MS)
  40. return _MAX_SQLITE_TIMEOUT_MS
  41. return timeout_ms
  42. class _ReadWriteLockMeta(type):
  43. """
  44. Metaclass that handles singleton resolution when is_singleton=True.
  45. Singleton logic lives here rather than in ReadWriteLock.get_lock so that ``ReadWriteLock(path)`` transparently
  46. returns cached instances without a 2-arg ``super()`` call that type checkers cannot verify.
  47. """
  48. _instances: WeakValueDictionary[pathlib.Path, ReadWriteLock]
  49. _instances_lock: threading.Lock
  50. def __call__(
  51. cls,
  52. lock_file: str | os.PathLike[str],
  53. timeout: float = -1,
  54. *,
  55. blocking: bool = True,
  56. is_singleton: bool = True,
  57. ) -> ReadWriteLock:
  58. if not is_singleton:
  59. return super().__call__(lock_file, timeout, blocking=blocking, is_singleton=is_singleton)
  60. normalized = pathlib.Path(lock_file).resolve()
  61. with cls._instances_lock:
  62. if normalized not in cls._instances:
  63. instance = super().__call__(lock_file, timeout, blocking=blocking, is_singleton=is_singleton)
  64. cls._instances[normalized] = instance
  65. else:
  66. instance = cls._instances[normalized]
  67. if instance.timeout != timeout or instance.blocking != blocking:
  68. msg = (
  69. f"Singleton lock created with timeout={instance.timeout}, blocking={instance.blocking},"
  70. f" cannot be changed to timeout={timeout}, blocking={blocking}"
  71. )
  72. raise ValueError(msg)
  73. return instance
  74. class ReadWriteLock(metaclass=_ReadWriteLockMeta):
  75. """
  76. Cross-process read-write lock backed by SQLite.
  77. Allows concurrent shared readers or a single exclusive writer. The lock is reentrant within the same mode (multiple
  78. ``acquire_read`` calls nest, as do multiple ``acquire_write`` calls from the same thread), but upgrading from read
  79. to write or downgrading from write to read raises :class:`RuntimeError`. Write locks are pinned to the thread that
  80. acquired them.
  81. By default, ``is_singleton=True``: calling ``ReadWriteLock(path)`` with the same resolved path returns the same
  82. instance. The lock file must use a ``.db`` extension (SQLite database).
  83. :param lock_file: path to the SQLite database file used as the lock
  84. :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
  85. :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
  86. :param is_singleton: if ``True``, reuse existing instances for the same resolved path
  87. .. versionadded:: 3.21.0
  88. """
  89. _instances: WeakValueDictionary[pathlib.Path, ReadWriteLock] = WeakValueDictionary()
  90. _instances_lock = threading.Lock()
  91. @classmethod
  92. def get_lock(
  93. cls, lock_file: str | os.PathLike[str], timeout: float = -1, *, blocking: bool = True
  94. ) -> ReadWriteLock:
  95. """
  96. Return the singleton :class:`ReadWriteLock` for *lock_file*.
  97. :param lock_file: path to the SQLite database file used as the lock
  98. :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
  99. :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
  100. :returns: the singleton lock instance
  101. :raises ValueError: if an instance already exists for this path with different *timeout* or *blocking* values
  102. """
  103. return cls(lock_file, timeout, blocking=blocking)
  104. def __init__(
  105. self,
  106. lock_file: str | os.PathLike[str],
  107. timeout: float = -1,
  108. *,
  109. blocking: bool = True,
  110. is_singleton: bool = True, # noqa: ARG002 # consumed by _ReadWriteLockMeta.__call__
  111. ) -> None:
  112. self.lock_file = os.fspath(lock_file)
  113. self.timeout = timeout
  114. self.blocking = blocking
  115. self._transaction_lock = threading.Lock() # serializes the (possibly blocking) SQLite transaction work
  116. self._internal_lock = threading.Lock() # protects _lock_level / _current_mode updates and rollback
  117. self._lock_level = 0
  118. self._current_mode: Literal["read", "write"] | None = None
  119. self._write_thread_id: int | None = None
  120. self._con = sqlite3.connect(self.lock_file, check_same_thread=False)
  121. with _all_connections_lock:
  122. _all_connections.add(self._con)
  123. def _acquire_transaction_lock(self, *, blocking: bool, timeout: float) -> None:
  124. if timeout == -1:
  125. # blocking=True with no timeout means wait indefinitely per threading.Lock.acquire semantics
  126. acquired = self._transaction_lock.acquire(blocking)
  127. else:
  128. acquired = self._transaction_lock.acquire(blocking, timeout)
  129. if not acquired:
  130. raise Timeout(self.lock_file) from None
  131. def _validate_reentrant(self, mode: Literal["read", "write"], opposite: str, direction: str) -> AcquireReturnProxy:
  132. if self._current_mode != mode:
  133. msg = (
  134. f"Cannot acquire {mode} lock on {self.lock_file} (lock id: {id(self)}): "
  135. f"already holding a {opposite} lock ({direction} not allowed)"
  136. )
  137. raise RuntimeError(msg)
  138. if mode == "write" and (cur := threading.get_ident()) != self._write_thread_id:
  139. msg = (
  140. f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}) "
  141. f"from thread {cur} while it is held by thread {self._write_thread_id}"
  142. )
  143. raise RuntimeError(msg)
  144. self._lock_level += 1
  145. return AcquireReturnProxy(lock=self)
  146. def _configure_and_begin(
  147. self, mode: Literal["read", "write"], timeout: float, *, blocking: bool, start_time: float
  148. ) -> None:
  149. waited = time.perf_counter() - start_time
  150. timeout_ms = timeout_for_sqlite(timeout, blocking=blocking, already_waited=waited)
  151. self._con.execute(f"PRAGMA busy_timeout={timeout_ms};").close()
  152. # Use legacy journal mode (not WAL) because WAL does not block readers when a concurrent EXCLUSIVE
  153. # write transaction is active, making read-write locking impossible without modifying table data.
  154. # MEMORY is safe here since no actual writes happen — crashes cannot corrupt the DB.
  155. # See https://sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
  156. #
  157. # Set here (not in __init__) because this pragma itself may block on a locked database,
  158. # so it must run after busy_timeout is configured above.
  159. self._con.execute("PRAGMA journal_mode=MEMORY;").close()
  160. # Recompute remaining timeout after the potentially blocking journal_mode pragma.
  161. waited = time.perf_counter() - start_time
  162. if (recomputed := timeout_for_sqlite(timeout, blocking=blocking, already_waited=waited)) != timeout_ms:
  163. self._con.execute(f"PRAGMA busy_timeout={recomputed};").close()
  164. stmt = "BEGIN EXCLUSIVE TRANSACTION;" if mode == "write" else "BEGIN TRANSACTION;"
  165. self._con.execute(stmt).close()
  166. if mode == "read":
  167. # A SELECT is needed to force SQLite to actually acquire the SHARED lock on the database.
  168. # https://www.sqlite.org/lockingv3.html#transaction_control
  169. self._con.execute("SELECT name FROM sqlite_schema LIMIT 1;").close()
  170. def _acquire(self, mode: Literal["read", "write"], timeout: float, *, blocking: bool) -> AcquireReturnProxy:
  171. opposite = "write" if mode == "read" else "read"
  172. direction = "downgrade" if mode == "read" else "upgrade"
  173. with self._internal_lock:
  174. if self._lock_level > 0:
  175. return self._validate_reentrant(mode, opposite, direction)
  176. start_time = time.perf_counter()
  177. self._acquire_transaction_lock(blocking=blocking, timeout=timeout)
  178. try:
  179. # Double-check: another thread may have acquired the lock while we waited on _transaction_lock.
  180. with self._internal_lock:
  181. if self._lock_level > 0:
  182. return self._validate_reentrant(mode, opposite, direction)
  183. self._configure_and_begin(mode, timeout, blocking=blocking, start_time=start_time)
  184. with self._internal_lock:
  185. self._current_mode = mode
  186. self._lock_level = 1
  187. if mode == "write":
  188. self._write_thread_id = threading.get_ident()
  189. return AcquireReturnProxy(lock=self)
  190. except sqlite3.OperationalError as exc:
  191. if "database is locked" not in str(exc):
  192. raise
  193. raise Timeout(self.lock_file) from None
  194. finally:
  195. self._transaction_lock.release()
  196. def acquire_read(self, timeout: float = -1, *, blocking: bool = True) -> AcquireReturnProxy:
  197. """
  198. Acquire a shared read lock.
  199. If this instance already holds a read lock, the lock level is incremented (reentrant). Attempting to acquire a
  200. read lock while holding a write lock raises :class:`RuntimeError` (downgrade not allowed).
  201. :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
  202. :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
  203. :returns: a proxy that can be used as a context manager to release the lock
  204. :raises RuntimeError: if a write lock is already held on this instance
  205. :raises Timeout: if the lock cannot be acquired within *timeout* seconds
  206. """
  207. return self._acquire("read", timeout, blocking=blocking)
  208. def acquire_write(self, timeout: float = -1, *, blocking: bool = True) -> AcquireReturnProxy:
  209. """
  210. Acquire an exclusive write lock.
  211. If this instance already holds a write lock from the same thread, the lock level is incremented (reentrant).
  212. Attempting to acquire a write lock while holding a read lock raises :class:`RuntimeError` (upgrade not allowed).
  213. Write locks are pinned to the acquiring thread: a different thread trying to re-enter also raises
  214. :class:`RuntimeError`.
  215. :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
  216. :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
  217. :returns: a proxy that can be used as a context manager to release the lock
  218. :raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread
  219. :raises Timeout: if the lock cannot be acquired within *timeout* seconds
  220. """
  221. return self._acquire("write", timeout, blocking=blocking)
  222. def release(self, *, force: bool = False) -> None:
  223. """
  224. Release one level of the current lock.
  225. When the lock level reaches zero the underlying SQLite transaction is rolled back, releasing the database lock.
  226. :param force: if ``True``, release the lock completely regardless of the current lock level
  227. :raises RuntimeError: if no lock is currently held and *force* is ``False``
  228. """
  229. should_rollback = False
  230. with self._internal_lock:
  231. if self._lock_level == 0:
  232. if force:
  233. return
  234. msg = f"Cannot release a lock on {self.lock_file} (lock id: {id(self)}) that is not held"
  235. raise RuntimeError(msg)
  236. if force:
  237. self._lock_level = 0
  238. else:
  239. self._lock_level -= 1
  240. if self._lock_level == 0:
  241. self._current_mode = None
  242. self._write_thread_id = None
  243. should_rollback = True
  244. if should_rollback:
  245. self._con.rollback()
  246. @contextmanager
  247. def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]:
  248. """
  249. Context manager that acquires and releases a shared read lock.
  250. Falls back to instance defaults for *timeout* and *blocking* when ``None``.
  251. :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
  252. :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default
  253. """
  254. if timeout is None:
  255. timeout = self.timeout
  256. if blocking is None:
  257. blocking = self.blocking
  258. self.acquire_read(timeout, blocking=blocking)
  259. try:
  260. yield
  261. finally:
  262. self.release()
  263. @contextmanager
  264. def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]:
  265. """
  266. Context manager that acquires and releases an exclusive write lock.
  267. Falls back to instance defaults for *timeout* and *blocking* when ``None``.
  268. :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
  269. :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default
  270. """
  271. if timeout is None:
  272. timeout = self.timeout
  273. if blocking is None:
  274. blocking = self.blocking
  275. self.acquire_write(timeout, blocking=blocking)
  276. try:
  277. yield
  278. finally:
  279. self.release()
  280. def close(self) -> None:
  281. """
  282. Release the lock (if held) and close the underlying SQLite connection.
  283. After calling this method, the lock instance is no longer usable.
  284. """
  285. self.release(force=True)
  286. self._con.close()
  287. with _all_connections_lock:
  288. _all_connections.discard(self._con)