_windows.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. from __future__ import annotations
  2. import os
  3. import sys
  4. from errno import EACCES
  5. from typing import cast
  6. from ._api import BaseFileLock
  7. from ._util import ensure_directory_exists, raise_on_not_writable_file
  8. if sys.platform == "win32": # pragma: win32 cover
  9. import ctypes
  10. import msvcrt
  11. from ctypes import wintypes
  12. # Windows API constants for reparse point detection
  13. FILE_ATTRIBUTE_REPARSE_POINT = 0x00000400
  14. INVALID_FILE_ATTRIBUTES = 0xFFFFFFFF
  15. # Load kernel32.dll
  16. _kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
  17. _kernel32.GetFileAttributesW.argtypes = [wintypes.LPCWSTR]
  18. _kernel32.GetFileAttributesW.restype = wintypes.DWORD
  19. def _is_reparse_point(path: str) -> bool:
  20. """
  21. Check if a path is a reparse point (symlink, junction, etc.) on Windows.
  22. :param path: Path to check
  23. :returns: True if path is a reparse point, False otherwise
  24. :raises OSError: If GetFileAttributesW fails for reasons other than file-not-found
  25. """
  26. attrs = _kernel32.GetFileAttributesW(path)
  27. if attrs == INVALID_FILE_ATTRIBUTES:
  28. # File doesn't exist yet - that's fine, we'll create it
  29. err = ctypes.get_last_error()
  30. if err == 2: # noqa: PLR2004 # ERROR_FILE_NOT_FOUND
  31. return False
  32. if err == 3: # noqa: PLR2004 # ERROR_PATH_NOT_FOUND
  33. return False
  34. # Some other error - let caller handle it
  35. return False
  36. return bool(attrs & FILE_ATTRIBUTE_REPARSE_POINT)
  37. class WindowsFileLock(BaseFileLock):
  38. """Uses the :func:`msvcrt.locking` function to hard lock the lock file on Windows systems."""
  39. def _acquire(self) -> None:
  40. raise_on_not_writable_file(self.lock_file)
  41. ensure_directory_exists(self.lock_file)
  42. # Security check: Refuse to open reparse points (symlinks, junctions)
  43. # This prevents TOCTOU symlink attacks (CVE-TBD)
  44. if _is_reparse_point(self.lock_file):
  45. msg = f"Lock file is a reparse point (symlink/junction): {self.lock_file}"
  46. raise OSError(msg)
  47. flags = (
  48. os.O_RDWR # open for read and write
  49. | os.O_CREAT # create file if not exists
  50. )
  51. try:
  52. fd = os.open(self.lock_file, flags, self._open_mode())
  53. except OSError as exception:
  54. if exception.errno != EACCES: # has no access to this lock
  55. raise
  56. else:
  57. try:
  58. msvcrt.locking(fd, msvcrt.LK_NBLCK, 1)
  59. except OSError as exception:
  60. os.close(fd) # close file first
  61. if exception.errno != EACCES: # file is already locked
  62. raise
  63. else:
  64. self._context.lock_file_fd = fd
  65. def _release(self) -> None:
  66. fd = cast("int", self._context.lock_file_fd)
  67. self._context.lock_file_fd = None
  68. msvcrt.locking(fd, msvcrt.LK_UNLCK, 1)
  69. os.close(fd)
  70. else: # pragma: win32 no cover
  71. class WindowsFileLock(BaseFileLock):
  72. """Uses the :func:`msvcrt.locking` function to hard lock the lock file on Windows systems."""
  73. def _acquire(self) -> None:
  74. raise NotImplementedError
  75. def _release(self) -> None:
  76. raise NotImplementedError
  77. __all__ = [
  78. "WindowsFileLock",
  79. ]