asyn_wrapper.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. import asyncio
  2. import functools
  3. import inspect
  4. import fsspec
  5. from fsspec.asyn import AsyncFileSystem, running_async
  6. from .chained import ChainedFileSystem
  7. def async_wrapper(func, obj=None, semaphore=None):
  8. """
  9. Wraps a synchronous function to make it awaitable.
  10. Parameters
  11. ----------
  12. func : callable
  13. The synchronous function to wrap.
  14. obj : object, optional
  15. The instance to bind the function to, if applicable.
  16. semaphore : asyncio.Semaphore, optional
  17. A semaphore to limit concurrent calls.
  18. Returns
  19. -------
  20. coroutine
  21. An awaitable version of the function.
  22. """
  23. @functools.wraps(func)
  24. async def wrapper(*args, **kwargs):
  25. if semaphore:
  26. async with semaphore:
  27. return await asyncio.to_thread(func, *args, **kwargs)
  28. return await asyncio.to_thread(func, *args, **kwargs)
  29. return wrapper
  30. class AsyncFileSystemWrapper(AsyncFileSystem, ChainedFileSystem):
  31. """
  32. A wrapper class to convert a synchronous filesystem into an asynchronous one.
  33. This class takes an existing synchronous filesystem implementation and wraps all
  34. its methods to provide an asynchronous interface.
  35. Parameters
  36. ----------
  37. sync_fs : AbstractFileSystem
  38. The synchronous filesystem instance to wrap.
  39. """
  40. protocol = "asyncwrapper", "async_wrapper"
  41. cachable = False
  42. def __init__(
  43. self,
  44. fs=None,
  45. asynchronous=None,
  46. target_protocol=None,
  47. target_options=None,
  48. semaphore=None,
  49. max_concurrent_tasks=None,
  50. **kwargs,
  51. ):
  52. if asynchronous is None:
  53. asynchronous = running_async()
  54. super().__init__(asynchronous=asynchronous, **kwargs)
  55. if fs is not None:
  56. self.sync_fs = fs
  57. else:
  58. self.sync_fs = fsspec.filesystem(target_protocol, **target_options)
  59. self.protocol = self.sync_fs.protocol
  60. self.semaphore = semaphore
  61. self._wrap_all_sync_methods()
  62. @property
  63. def fsid(self):
  64. return f"async_{self.sync_fs.fsid}"
  65. def _wrap_all_sync_methods(self):
  66. """
  67. Wrap all synchronous methods of the underlying filesystem with asynchronous versions.
  68. """
  69. excluded_methods = {"open"}
  70. for method_name in dir(self.sync_fs):
  71. if method_name.startswith("_") or method_name in excluded_methods:
  72. continue
  73. attr = inspect.getattr_static(self.sync_fs, method_name)
  74. if isinstance(attr, property):
  75. continue
  76. method = getattr(self.sync_fs, method_name)
  77. if callable(method) and not inspect.iscoroutinefunction(method):
  78. async_method = async_wrapper(method, obj=self, semaphore=self.semaphore)
  79. setattr(self, f"_{method_name}", async_method)
  80. @classmethod
  81. def wrap_class(cls, sync_fs_class):
  82. """
  83. Create a new class that can be used to instantiate an AsyncFileSystemWrapper
  84. with lazy instantiation of the underlying synchronous filesystem.
  85. Parameters
  86. ----------
  87. sync_fs_class : type
  88. The class of the synchronous filesystem to wrap.
  89. Returns
  90. -------
  91. type
  92. A new class that wraps the provided synchronous filesystem class.
  93. """
  94. class GeneratedAsyncFileSystemWrapper(cls):
  95. def __init__(self, *args, **kwargs):
  96. sync_fs = sync_fs_class(*args, **kwargs)
  97. super().__init__(sync_fs)
  98. GeneratedAsyncFileSystemWrapper.__name__ = (
  99. f"Async{sync_fs_class.__name__}Wrapper"
  100. )
  101. return GeneratedAsyncFileSystemWrapper