_windows.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. from __future__ import annotations
  2. import os
  3. import sys
  4. from contextlib import suppress
  5. from errno import EACCES
  6. from pathlib import Path
  7. from typing import cast
  8. from ._api import BaseFileLock
  9. from ._util import ensure_directory_exists, raise_on_not_writable_file
  10. if sys.platform == "win32": # pragma: win32 cover
  11. import ctypes
  12. import msvcrt
  13. from ctypes import wintypes
  14. # Windows API constants for reparse point detection
  15. FILE_ATTRIBUTE_REPARSE_POINT = 0x00000400
  16. INVALID_FILE_ATTRIBUTES = 0xFFFFFFFF
  17. # Load kernel32.dll
  18. _kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
  19. _kernel32.GetFileAttributesW.argtypes = [wintypes.LPCWSTR]
  20. _kernel32.GetFileAttributesW.restype = wintypes.DWORD
  21. def _is_reparse_point(path: str) -> bool:
  22. """
  23. Check if a path is a reparse point (symlink, junction, etc.) on Windows.
  24. :param path: Path to check
  25. :returns: True if path is a reparse point, False otherwise
  26. :raises OSError: If GetFileAttributesW fails for reasons other than file-not-found
  27. """
  28. attrs = _kernel32.GetFileAttributesW(path)
  29. if attrs == INVALID_FILE_ATTRIBUTES:
  30. # File doesn't exist yet - that's fine, we'll create it
  31. err = ctypes.get_last_error()
  32. if err == 2: # noqa: PLR2004 # ERROR_FILE_NOT_FOUND
  33. return False
  34. if err == 3: # noqa: PLR2004 # ERROR_PATH_NOT_FOUND
  35. return False
  36. # Some other error - let caller handle it
  37. return False
  38. return bool(attrs & FILE_ATTRIBUTE_REPARSE_POINT)
  39. class WindowsFileLock(BaseFileLock):
  40. """
  41. Uses the :func:`msvcrt.locking` function to hard lock the lock file on Windows systems.
  42. Lock file cleanup: Windows attempts to delete the lock file after release, but deletion is
  43. not guaranteed in multi-threaded scenarios where another thread holds an open handle. The lock
  44. file may persist on disk, which does not affect lock correctness.
  45. """
  46. def _acquire(self) -> None:
  47. raise_on_not_writable_file(self.lock_file)
  48. ensure_directory_exists(self.lock_file)
  49. # Security check: Refuse to open reparse points (symlinks, junctions)
  50. # This prevents TOCTOU symlink attacks (CVE-TBD)
  51. if _is_reparse_point(self.lock_file):
  52. msg = f"Lock file is a reparse point (symlink/junction): {self.lock_file}"
  53. raise OSError(msg)
  54. flags = (
  55. os.O_RDWR # open for read and write
  56. | os.O_CREAT # create file if not exists
  57. )
  58. try:
  59. fd = os.open(self.lock_file, flags, self._open_mode())
  60. except OSError as exception:
  61. if exception.errno != EACCES: # has no access to this lock
  62. raise
  63. else:
  64. try:
  65. msvcrt.locking(fd, msvcrt.LK_NBLCK, 1)
  66. except OSError as exception:
  67. os.close(fd) # close file first
  68. if exception.errno != EACCES: # file is already locked
  69. raise
  70. else:
  71. self._context.lock_file_fd = fd
  72. def _release(self) -> None:
  73. fd = cast("int", self._context.lock_file_fd)
  74. self._context.lock_file_fd = None
  75. msvcrt.locking(fd, msvcrt.LK_UNLCK, 1)
  76. os.close(fd)
  77. with suppress(OSError):
  78. Path(self.lock_file).unlink()
  79. else: # pragma: win32 no cover
  80. class WindowsFileLock(BaseFileLock):
  81. """Uses the :func:`msvcrt.locking` function to hard lock the lock file on Windows systems."""
  82. def _acquire(self) -> None:
  83. raise NotImplementedError
  84. def _release(self) -> None:
  85. raise NotImplementedError
  86. __all__ = [
  87. "WindowsFileLock",
  88. ]