| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- import contextlib
- import os
- import shutil
- import stat
- import tempfile
- import time
- from collections.abc import Callable, Generator
- from functools import partial
- from pathlib import Path
- import yaml
- from filelock import BaseFileLock, FileLock, SoftFileLock, Timeout
- from .. import constants
- from . import logging
# Module-level logger, obtained from the package's own `logging` helper and named after this module.
logger = logging.get_logger(__name__)

# `yaml_dump` is `yaml.dump` with `allow_unicode=True` preset, so non-ASCII characters are
# written as-is instead of being emitted as escape sequences. `stream=None` is preset too,
# presumably so the YAML text is returned rather than written to a stream (this matches the
# `str` return annotation). Both are keyword presets of a `functools.partial`, so a caller
# can still override them by passing the keyword explicitly.
#
# Example:
# ```py
# >>> yaml.dump({"emoji": "👀", "some unicode": "日本か"})
# 'emoji: "\\U0001F440"\nsome unicode: "\\u65E5\\u672C\\u304B"\n'
#
# >>> yaml_dump({"emoji": "👀", "some unicode": "日本か"})
# 'emoji: "👀"\nsome unicode: "日本か"\n'
# ```
yaml_dump: Callable[..., str] = partial(yaml.dump, stream=None, allow_unicode=True) # type: ignore
- @contextlib.contextmanager
- def SoftTemporaryDirectory(
- suffix: str | None = None,
- prefix: str | None = None,
- dir: Path | str | None = None,
- **kwargs,
- ) -> Generator[Path, None, None]:
- """
- Context manager to create a temporary directory and safely delete it.
- If tmp directory cannot be deleted normally, we set the WRITE permission and retry.
- If cleanup still fails, we give up but don't raise an exception. This is equivalent
- to `tempfile.TemporaryDirectory(..., ignore_cleanup_errors=True)` introduced in
- Python 3.10.
- See https://www.scivision.dev/python-tempfile-permission-error-windows/.
- """
- tmpdir = tempfile.TemporaryDirectory(prefix=prefix, suffix=suffix, dir=dir, **kwargs)
- yield Path(tmpdir.name).resolve()
- try:
- # First once with normal cleanup
- shutil.rmtree(tmpdir.name)
- except Exception:
- # If failed, try to set write permission and retry
- try:
- shutil.rmtree(tmpdir.name, onerror=_set_write_permission_and_retry)
- except Exception:
- pass
- # And finally, cleanup the tmpdir.
- # If it fails again, give up but do not throw error
- try:
- tmpdir.cleanup()
- except Exception:
- pass
def _set_write_permission_and_retry(func, path, excinfo):
    """
    Error callback for `shutil.rmtree(..., onerror=...)`.

    Sets the mode of `path` to `stat.S_IWRITE`, then calls `func(path)` again.
    `excinfo` is accepted only to satisfy the callback signature and is not used.
    Any exception raised by the `chmod` or by the retried call propagates to the caller.
    """
    writable_mode = stat.S_IWRITE
    os.chmod(path, writable_mode)
    func(path)
@contextlib.contextmanager
def WeakFileLock(lock_file: str | Path, *, timeout: float | None = None) -> Generator[BaseFileLock, None, None]:
    """A filelock with some custom logic.

    This filelock is weaker than the default filelock in that:
    1. It won't raise an exception if release fails.
    2. It will default to a SoftFileLock if the filesystem does not support flock.
    3. Lock files are created with mode 0o664 (group-writable) instead of the default 0o644.
       This allows multiple users sharing a cache directory to wait for locks. The same mode
       is used for the SoftFileLock fallback.

    An INFO log message is emitted every `constants.FILELOCK_LOG_EVERY_SECONDS` seconds while
    the lock is not acquired.
    If a timeout is provided, a `filelock.Timeout` exception is raised if the lock is not acquired within the timeout.

    Args:
        lock_file: Path of the lock file.
        timeout: Maximum number of seconds to wait for the lock, or `None` to wait indefinitely.

    Yields:
        The acquired lock object (a `FileLock`, or a `SoftFileLock` after fallback).

    Raises:
        filelock.Timeout: If `timeout` is set and elapses before the lock is acquired.
        NotImplementedError: If acquiring fails with a `NotImplementedError` that is not the
            "use SoftFileLock instead" one.
    """
    log_interval = constants.FILELOCK_LOG_EVERY_SECONDS
    lock_mode = 0o664
    lock = FileLock(lock_file, timeout=log_interval, mode=lock_mode)
    # Monotonic clock: the elapsed-time computation must not be affected by wall-clock changes.
    start_time = time.monotonic()

    while True:
        elapsed_time = time.monotonic() - start_time
        if timeout is not None and elapsed_time >= timeout:
            raise Timeout(str(lock_file))

        # Wait at most one log interval per attempt, and never past the caller's deadline.
        attempt_timeout = log_interval if timeout is None else min(log_interval, timeout - elapsed_time)
        try:
            lock.acquire(timeout=attempt_timeout)
        except Timeout:
            logger.info(
                f"Still waiting to acquire lock on {lock_file} (elapsed: {time.monotonic() - start_time:.1f} seconds)"
            )
        except NotImplementedError as e:
            if "use SoftFileLock instead" not in str(e):
                # Unrelated failure: re-raise instead of swallowing it and retrying in a tight loop.
                raise
            logger.warning(
                "FileSystem does not appear to support flock. Falling back to SoftFileLock for %s", lock_file
            )
            # Keep the group-writable mode on the fallback lock as well.
            lock = SoftFileLock(lock_file, timeout=log_interval, mode=lock_mode)
            continue
        else:
            break

    try:
        yield lock
    finally:
        try:
            lock.release()
        except OSError:
            # Release failed: fall back to removing the lock file, ignoring any further error.
            try:
                Path(lock_file).unlink()
            except OSError:
                pass
|