| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- from __future__ import annotations
- import os
- import sys
- from errno import EACCES
- from typing import cast
- from ._api import BaseFileLock
- from ._util import ensure_directory_exists, raise_on_not_writable_file
- if sys.platform == "win32": # pragma: win32 cover
- import ctypes
- import msvcrt
- from ctypes import wintypes
- # Windows API constants for reparse point detection
- FILE_ATTRIBUTE_REPARSE_POINT = 0x00000400
- INVALID_FILE_ATTRIBUTES = 0xFFFFFFFF
- # Load kernel32.dll
- _kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
- _kernel32.GetFileAttributesW.argtypes = [wintypes.LPCWSTR]
- _kernel32.GetFileAttributesW.restype = wintypes.DWORD
- def _is_reparse_point(path: str) -> bool:
- """
- Check if a path is a reparse point (symlink, junction, etc.) on Windows.
- :param path: Path to check
- :returns: True if path is a reparse point, False otherwise
- :raises OSError: If GetFileAttributesW fails for reasons other than file-not-found
- """
- attrs = _kernel32.GetFileAttributesW(path)
- if attrs == INVALID_FILE_ATTRIBUTES:
- # File doesn't exist yet - that's fine, we'll create it
- err = ctypes.get_last_error()
- if err == 2: # noqa: PLR2004 # ERROR_FILE_NOT_FOUND
- return False
- if err == 3: # noqa: PLR2004 # ERROR_PATH_NOT_FOUND
- return False
- # Some other error - let caller handle it
- return False
- return bool(attrs & FILE_ATTRIBUTE_REPARSE_POINT)
- class WindowsFileLock(BaseFileLock):
- """Uses the :func:`msvcrt.locking` function to hard lock the lock file on Windows systems."""
- def _acquire(self) -> None:
- raise_on_not_writable_file(self.lock_file)
- ensure_directory_exists(self.lock_file)
- # Security check: Refuse to open reparse points (symlinks, junctions)
- # This prevents TOCTOU symlink attacks (CVE-TBD)
- if _is_reparse_point(self.lock_file):
- msg = f"Lock file is a reparse point (symlink/junction): {self.lock_file}"
- raise OSError(msg)
- flags = (
- os.O_RDWR # open for read and write
- | os.O_CREAT # create file if not exists
- )
- try:
- fd = os.open(self.lock_file, flags, self._open_mode())
- except OSError as exception:
- if exception.errno != EACCES: # has no access to this lock
- raise
- else:
- try:
- msvcrt.locking(fd, msvcrt.LK_NBLCK, 1)
- except OSError as exception:
- os.close(fd) # close file first
- if exception.errno != EACCES: # file is already locked
- raise
- else:
- self._context.lock_file_fd = fd
- def _release(self) -> None:
- fd = cast("int", self._context.lock_file_fd)
- self._context.lock_file_fd = None
- msvcrt.locking(fd, msvcrt.LK_UNLCK, 1)
- os.close(fd)
- else: # pragma: win32 no cover
- class WindowsFileLock(BaseFileLock):
- """Uses the :func:`msvcrt.locking` function to hard lock the lock file on Windows systems."""
- def _acquire(self) -> None:
- raise NotImplementedError
- def _release(self) -> None:
- raise NotImplementedError
- __all__ = [
- "WindowsFileLock",
- ]
|