file_baton.py 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. # mypy: allow-untyped-defs
  2. import os
  3. import time
  4. import warnings
  5. class FileBaton:
  6. """A primitive, file-based synchronization utility."""
  7. def __init__(self, lock_file_path, wait_seconds=0.1, warn_after_seconds=None) -> None:
  8. """
  9. Create a new :class:`FileBaton`.
  10. Args:
  11. lock_file_path: The path to the file used for locking.
  12. wait_seconds: The seconds to periodically sleep (spin) when
  13. calling ``wait()``.
  14. warn_after_seconds: The seconds to wait before showing
  15. lock file path to warn existing lock file.
  16. """
  17. self.lock_file_path = lock_file_path
  18. self.wait_seconds = wait_seconds
  19. self.fd = None
  20. self.warn_after_seconds = warn_after_seconds
  21. def try_acquire(self) -> bool | None:
  22. """
  23. Try to atomically create a file under exclusive access.
  24. Returns:
  25. True if the file could be created, else False.
  26. """
  27. try:
  28. # pyrefly: ignore [bad-assignment]
  29. self.fd = os.open(self.lock_file_path, os.O_CREAT | os.O_EXCL)
  30. return True
  31. except FileExistsError:
  32. return False
  33. def wait(self) -> None:
  34. """
  35. Periodically sleeps for a certain amount until the baton is released.
  36. The amount of time slept depends on the ``wait_seconds`` parameter
  37. passed to the constructor.
  38. """
  39. has_warned = False
  40. start_time = time.time()
  41. while os.path.exists(self.lock_file_path):
  42. time.sleep(self.wait_seconds)
  43. if self.warn_after_seconds is not None:
  44. if time.time() - start_time > self.warn_after_seconds and not has_warned:
  45. warnings.warn(f'Waited on lock file "{self.lock_file_path}" for '
  46. f'{self.warn_after_seconds} seconds.', stacklevel=2)
  47. has_warned = True
  48. def release(self) -> None:
  49. """Release the baton and removes its file."""
  50. if self.fd is not None:
  51. os.close(self.fd)
  52. os.remove(self.lock_file_path)