lock.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. """holds locking functionality that works across processes."""
  2. from __future__ import annotations
  3. import logging
  4. import os
  5. from abc import ABC, abstractmethod
  6. from contextlib import contextmanager, suppress
  7. from pathlib import Path
  8. from threading import Lock, RLock
  9. from typing import TYPE_CHECKING
  10. from filelock import FileLock, Timeout
  11. if TYPE_CHECKING:
  12. from collections.abc import Iterator
  13. from types import TracebackType
  14. LOGGER = logging.getLogger(__name__)
  15. class _CountedFileLock(FileLock):
  16. def __init__(self, lock_file: str) -> None:
  17. parent = os.path.dirname(lock_file)
  18. with suppress(OSError):
  19. os.makedirs(parent, exist_ok=True)
  20. super().__init__(lock_file)
  21. self.count = 0
  22. self.thread_safe = RLock()
  23. def acquire( # ty: ignore[invalid-method-override]
  24. self,
  25. timeout: float | None = None,
  26. poll_interval: float = 0.05,
  27. ) -> None:
  28. if not self.thread_safe.acquire(timeout=-1 if timeout is None else timeout):
  29. raise Timeout(self.lock_file)
  30. if self.count == 0:
  31. try:
  32. super().acquire(timeout, poll_interval)
  33. except BaseException:
  34. self.thread_safe.release()
  35. raise
  36. self.count += 1
  37. def release(self, force: bool = False) -> None: # noqa: FBT002
  38. with self.thread_safe:
  39. if self.count > 0:
  40. if self.count == 1:
  41. super().release(force=force)
  42. self.count -= 1
  43. if self.count == 0:
  44. # if we have no more users of this lock, release the thread lock
  45. self.thread_safe.release()
  46. _lock_store = {}
  47. _store_lock = Lock()
  48. class PathLockBase(ABC):
  49. def __init__(self, folder: str | Path) -> None:
  50. path = Path(folder)
  51. self.path = path.resolve() if path.exists() else path
  52. def __repr__(self) -> str:
  53. return f"{self.__class__.__name__}({self.path})"
  54. def __truediv__(self, other: str) -> PathLockBase:
  55. return type(self)(self.path / other)
  56. @abstractmethod
  57. def __enter__(self) -> None:
  58. raise NotImplementedError
  59. @abstractmethod
  60. def __exit__(
  61. self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
  62. ) -> None:
  63. raise NotImplementedError
  64. @abstractmethod
  65. @contextmanager
  66. def lock_for_key(self, name: str, no_block: bool = False) -> Iterator[None]: # noqa: FBT002
  67. raise NotImplementedError
  68. @abstractmethod
  69. @contextmanager
  70. def non_reentrant_lock_for_key(self, name: str) -> Iterator[None]:
  71. raise NotImplementedError
  72. class ReentrantFileLock(PathLockBase):
  73. def __init__(self, folder: str | Path) -> None:
  74. super().__init__(folder)
  75. self._lock = None
  76. def _create_lock(self, name: str = "") -> _CountedFileLock:
  77. lock_file = str(self.path / f"{name}.lock")
  78. with _store_lock:
  79. if lock_file not in _lock_store:
  80. _lock_store[lock_file] = _CountedFileLock(lock_file)
  81. return _lock_store[lock_file]
  82. @staticmethod
  83. def _del_lock(lock: _CountedFileLock | None) -> None:
  84. if lock is not None:
  85. with _store_lock, lock.thread_safe:
  86. if lock.count == 0:
  87. _lock_store.pop(lock.lock_file, None)
  88. def __del__(self) -> None:
  89. self._del_lock(self._lock)
  90. def __enter__(self) -> None:
  91. self._lock = self._create_lock()
  92. self._lock_file(self._lock)
  93. def __exit__(
  94. self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
  95. ) -> None:
  96. self._release(self._lock) # ty: ignore[invalid-argument-type]
  97. self._del_lock(self._lock)
  98. self._lock = None
  99. def _lock_file(self, lock: _CountedFileLock, no_block: bool = False) -> None: # noqa: FBT002
  100. # multiple processes might be trying to get a first lock... so we cannot check if this directory exist without
  101. # a lock, but that lock might then become expensive, and it's not clear where that lock should live.
  102. # Instead here we just ignore if we fail to create the directory.
  103. with suppress(OSError):
  104. os.makedirs(str(self.path), exist_ok=True)
  105. try:
  106. lock.acquire(0.0001)
  107. except Timeout:
  108. if no_block:
  109. raise
  110. LOGGER.debug("lock file %s present, will block until released", lock.lock_file)
  111. lock.release() # release the acquire try from above
  112. lock.acquire()
  113. @staticmethod
  114. def _release(lock: _CountedFileLock) -> None:
  115. lock.release()
  116. @contextmanager
  117. def lock_for_key(self, name: str, no_block: bool = False) -> Iterator[None]: # noqa: FBT002
  118. lock = self._create_lock(name)
  119. try:
  120. try:
  121. self._lock_file(lock, no_block)
  122. yield
  123. finally:
  124. self._release(lock)
  125. finally:
  126. self._del_lock(lock)
  127. lock = None
  128. @contextmanager
  129. def non_reentrant_lock_for_key(self, name: str) -> Iterator[None]:
  130. with _CountedFileLock(str(self.path / f"{name}.lock")):
  131. yield
  132. class NoOpFileLock(PathLockBase):
  133. def __enter__(self) -> None:
  134. raise NotImplementedError
  135. def __exit__(
  136. self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
  137. ) -> None:
  138. raise NotImplementedError
  139. @contextmanager
  140. def lock_for_key(self, name: str, no_block: bool = False) -> Iterator[None]: # noqa: ARG002, FBT002
  141. yield
  142. @contextmanager
  143. def non_reentrant_lock_for_key(self, name: str) -> Iterator[None]: # noqa: ARG002
  144. yield
  145. __all__ = [
  146. "NoOpFileLock",
  147. "ReentrantFileLock",
  148. "Timeout",
  149. ]