# dir_watcher.py (16 KB)
  1. from __future__ import annotations
  2. import abc
  3. import fnmatch
  4. import glob
  5. import logging
  6. import os
  7. import queue
  8. import time
  9. from collections.abc import Mapping, MutableMapping, MutableSet
  10. from typing import TYPE_CHECKING, Any
  11. from wandb import util
  12. from wandb.sdk.lib.filesystem import GlobStr
  13. from wandb.sdk.lib.paths import LogicalPath
  14. if TYPE_CHECKING:
  15. import wandb.vendor.watchdog_0_9_0.observers.api as wd_api
  16. import wandb.vendor.watchdog_0_9_0.observers.polling as wd_polling
  17. import wandb.vendor.watchdog_0_9_0.watchdog.events as wd_events
  18. from wandb.sdk.internal.file_pusher import FilePusher
  19. from wandb.sdk.internal.settings_static import SettingsStatic
  20. from wandb.sdk.lib.filesystem import PolicyName
  21. else:
  22. wd_polling = util.vendor_import("wandb_watchdog.observers.polling")
  23. wd_events = util.vendor_import("wandb_watchdog.events")
# Alias used in annotations throughout this module for filesystem paths,
# which are currently handled as plain strings.
PathStr = str  # TODO(spencerpearson): would be nice to use Path here

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
  26. class FileEventHandler(abc.ABC):
  27. def __init__(
  28. self,
  29. file_path: PathStr,
  30. save_name: LogicalPath,
  31. file_pusher: FilePusher,
  32. *args: Any,
  33. **kwargs: Any,
  34. ) -> None:
  35. self.file_path = file_path
  36. # Convert windows paths to unix paths
  37. self.save_name = LogicalPath(save_name)
  38. self._file_pusher = file_pusher
  39. self._last_sync: float | None = None
  40. @property
  41. @abc.abstractmethod
  42. def policy(self) -> PolicyName:
  43. raise NotImplementedError
  44. @abc.abstractmethod
  45. def on_modified(self, force: bool = False) -> None:
  46. raise NotImplementedError
  47. @abc.abstractmethod
  48. def finish(self) -> None:
  49. raise NotImplementedError
  50. def on_renamed(self, new_path: PathStr, new_name: LogicalPath) -> None:
  51. self.file_path = new_path
  52. self.save_name = new_name
  53. self.on_modified()
  54. class PolicyNow(FileEventHandler):
  55. """This policy only uploads files now."""
  56. def on_modified(self, force: bool = False) -> None:
  57. # only upload if we've never uploaded or when .save is called
  58. if self._last_sync is None or force:
  59. self._file_pusher.file_changed(self.save_name, self.file_path)
  60. self._last_sync = os.path.getmtime(self.file_path)
  61. def finish(self) -> None:
  62. pass
  63. @property
  64. def policy(self) -> PolicyName:
  65. return "now"
  66. class PolicyEnd(FileEventHandler):
  67. """This policy only updates at the end of the run."""
  68. def on_modified(self, force: bool = False) -> None:
  69. pass
  70. # TODO: make sure we call this
  71. def finish(self) -> None:
  72. # We use copy=False to avoid possibly expensive copies, and because
  73. # user files shouldn't still be changing at the end of the run.
  74. self._last_sync = os.path.getmtime(self.file_path)
  75. self._file_pusher.file_changed(self.save_name, self.file_path, copy=False)
  76. @property
  77. def policy(self) -> PolicyName:
  78. return "end"
  79. class PolicyLive(FileEventHandler):
  80. """Event handler that uploads respecting throttling.
  81. Uploads files every RATE_LIMIT_SECONDS, which changes as the size increases to deal
  82. with throttling.
  83. """
  84. RATE_LIMIT_SECONDS = 15
  85. unit_dict = dict(util.POW_10_BYTES)
  86. # Wait to upload until size has increased 20% from last upload
  87. RATE_LIMIT_SIZE_INCREASE = 1.2
  88. def __init__(
  89. self,
  90. file_path: PathStr,
  91. save_name: LogicalPath,
  92. file_pusher: FilePusher,
  93. settings: SettingsStatic | None = None,
  94. *args: Any,
  95. **kwargs: Any,
  96. ) -> None:
  97. super().__init__(file_path, save_name, file_pusher, *args, **kwargs)
  98. self._last_uploaded_time: float | None = None
  99. self._last_uploaded_size: int = 0
  100. if settings is not None:
  101. if settings.x_live_policy_rate_limit is not None:
  102. self.RATE_LIMIT_SECONDS = settings.x_live_policy_rate_limit
  103. self._min_wait_time: float | None = settings.x_live_policy_wait_time
  104. else:
  105. self._min_wait_time = None
  106. @property
  107. def current_size(self) -> int:
  108. return os.path.getsize(self.file_path)
  109. @classmethod
  110. def min_wait_for_size(cls, size: int) -> float:
  111. if size < 10 * cls.unit_dict["MB"]:
  112. return 60
  113. elif size < 100 * cls.unit_dict["MB"]:
  114. return 5 * 60
  115. elif size < cls.unit_dict["GB"]:
  116. return 10 * 60
  117. else:
  118. return 20 * 60
  119. def should_update(self) -> bool:
  120. if self._last_uploaded_time is not None:
  121. # Check rate limit by time elapsed
  122. time_elapsed = time.time() - self._last_uploaded_time
  123. # if more than 15 seconds has passed potentially upload it
  124. if time_elapsed < self.RATE_LIMIT_SECONDS:
  125. return False
  126. # Check rate limit by size increase
  127. if float(self._last_uploaded_size) > 0:
  128. size_increase = self.current_size / float(self._last_uploaded_size)
  129. if size_increase < self.RATE_LIMIT_SIZE_INCREASE:
  130. return False
  131. return time_elapsed > (
  132. self._min_wait_time or self.min_wait_for_size(self.current_size)
  133. )
  134. # if the file has never been uploaded, we'll upload it
  135. return True
  136. def on_modified(self, force: bool = False) -> None:
  137. if self.current_size == 0:
  138. return
  139. if self._last_sync == os.path.getmtime(self.file_path):
  140. return
  141. if force or self.should_update():
  142. self.save_file()
  143. def save_file(self) -> None:
  144. self._last_sync = os.path.getmtime(self.file_path)
  145. self._last_uploaded_time = time.time()
  146. self._last_uploaded_size = self.current_size
  147. self._file_pusher.file_changed(self.save_name, self.file_path)
  148. def finish(self) -> None:
  149. self.on_modified(force=True)
  150. @property
  151. def policy(self) -> PolicyName:
  152. return "live"
  153. class DirWatcher:
  154. def __init__(
  155. self,
  156. settings: SettingsStatic,
  157. file_pusher: FilePusher,
  158. file_dir: PathStr | None = None,
  159. ) -> None:
  160. self._file_count = 0
  161. self._dir = file_dir or settings.files_dir
  162. self._settings = settings
  163. self._savename_file_policies: MutableMapping[LogicalPath, PolicyName] = {}
  164. self._user_file_policies: Mapping[PolicyName, MutableSet[GlobStr]] = {
  165. "end": set(),
  166. "live": set(),
  167. "now": set(),
  168. }
  169. self._file_pusher = file_pusher
  170. self._file_event_handlers: MutableMapping[LogicalPath, FileEventHandler] = {}
  171. self._file_observer = wd_polling.PollingObserver()
  172. self._file_observer.schedule(
  173. self._per_file_event_handler(), self._dir, recursive=True
  174. )
  175. self._file_observer.start()
  176. logger.info("watching files in: %s", settings.files_dir)
  177. @property
  178. def emitter(self) -> wd_api.EventEmitter | None:
  179. try:
  180. return next(iter(self._file_observer.emitters))
  181. except StopIteration:
  182. return None
  183. def update_policy(self, path: GlobStr, policy: PolicyName) -> None:
  184. # When we're dealing with one of our own media files, there's no need
  185. # to store the policy in memory. _get_file_event_handler will always
  186. # return PolicyNow. Using the path makes syncing historic runs much
  187. # faster if the name happens to include glob escapable characters. In
  188. # the future we may add a flag to "files" records that indicates it's
  189. # policy is not dynamic and doesn't need to be stored / checked.
  190. save_name = LogicalPath(
  191. os.path.relpath(os.path.join(self._dir, path), self._dir)
  192. )
  193. if save_name.startswith("media/"):
  194. pass
  195. elif path == glob.escape(path):
  196. self._savename_file_policies[save_name] = policy
  197. else:
  198. self._user_file_policies[policy].add(path)
  199. for src_path in glob.glob(os.path.join(self._dir, path)):
  200. save_name = LogicalPath(os.path.relpath(src_path, self._dir))
  201. feh = self._get_file_event_handler(src_path, save_name)
  202. # handle the case where the policy changed
  203. if feh.policy != policy:
  204. try:
  205. del self._file_event_handlers[save_name]
  206. except KeyError:
  207. # TODO: probably should do locking, but this handles moved files for now
  208. pass
  209. feh = self._get_file_event_handler(src_path, save_name)
  210. feh.on_modified(force=True)
  211. def _per_file_event_handler(self) -> wd_events.FileSystemEventHandler:
  212. """Create a Watchdog file event handler that does different things for every file."""
  213. file_event_handler = wd_events.PatternMatchingEventHandler()
  214. file_event_handler.on_created = self._on_file_created
  215. file_event_handler.on_modified = self._on_file_modified
  216. file_event_handler.on_moved = self._on_file_moved
  217. file_event_handler._patterns = [os.path.join(self._dir, os.path.normpath("*"))]
  218. # Ignore hidden files/folders
  219. # TODO: what other files should we skip?
  220. file_event_handler._ignore_patterns = [
  221. "*.tmp",
  222. "*.wandb",
  223. "wandb-summary.json",
  224. os.path.join(self._dir, ".*"),
  225. os.path.join(self._dir, "*/.*"),
  226. ]
  227. for glb in self._settings.ignore_globs:
  228. file_event_handler._ignore_patterns.append(os.path.join(self._dir, glb))
  229. return file_event_handler
  230. def _on_file_created(self, event: wd_events.FileCreatedEvent) -> None:
  231. logger.info("file/dir created: %s", event.src_path)
  232. if os.path.isdir(event.src_path):
  233. return None
  234. self._file_count += 1
  235. # We do the directory scan less often as it grows
  236. if self._file_count % 100 == 0:
  237. emitter = self.emitter
  238. if emitter:
  239. emitter._timeout = int(self._file_count / 100) + 1
  240. save_name = LogicalPath(os.path.relpath(event.src_path, self._dir))
  241. self._get_file_event_handler(event.src_path, save_name).on_modified()
  242. # TODO(spencerpearson): this pattern repeats so many times we should have a method/function for it
  243. # def _save_name(self, path: PathStr) -> LogicalPath:
  244. # return LogicalPath(os.path.relpath(path, self._dir))
  245. def _on_file_modified(self, event: wd_events.FileModifiedEvent) -> None:
  246. logger.info(f"file/dir modified: {event.src_path}")
  247. if os.path.isdir(event.src_path):
  248. return None
  249. save_name = LogicalPath(os.path.relpath(event.src_path, self._dir))
  250. self._get_file_event_handler(event.src_path, save_name).on_modified()
  251. def _on_file_moved(self, event: wd_events.FileMovedEvent) -> None:
  252. # TODO: test me...
  253. logger.info(f"file/dir moved: {event.src_path} -> {event.dest_path}")
  254. if os.path.isdir(event.dest_path):
  255. return None
  256. old_save_name = LogicalPath(os.path.relpath(event.src_path, self._dir))
  257. new_save_name = LogicalPath(os.path.relpath(event.dest_path, self._dir))
  258. # We have to move the existing file handler to the new name
  259. handler = self._get_file_event_handler(event.src_path, old_save_name)
  260. self._file_event_handlers[new_save_name] = handler
  261. del self._file_event_handlers[old_save_name]
  262. handler.on_renamed(event.dest_path, new_save_name)
  263. def _get_file_event_handler(
  264. self, file_path: PathStr, save_name: LogicalPath
  265. ) -> FileEventHandler:
  266. """Get or create an event handler for a particular file.
  267. file_path: the file's actual path
  268. save_name: its path relative to the run directory (aka the watch directory)
  269. """
  270. # Always return PolicyNow for any of our media files.
  271. if save_name.startswith("media/"):
  272. return PolicyNow(file_path, save_name, self._file_pusher, self._settings)
  273. if save_name not in self._file_event_handlers:
  274. # TODO: we can use PolicyIgnore if there are files we never want to sync
  275. if "tfevents" in save_name or "graph.pbtxt" in save_name:
  276. self._file_event_handlers[save_name] = PolicyLive(
  277. file_path, save_name, self._file_pusher, self._settings
  278. )
  279. elif save_name in self._savename_file_policies:
  280. policy_name = self._savename_file_policies[save_name]
  281. make_handler = (
  282. PolicyLive
  283. if policy_name == "live"
  284. else PolicyNow
  285. if policy_name == "now"
  286. else PolicyEnd
  287. )
  288. self._file_event_handlers[save_name] = make_handler(
  289. file_path, save_name, self._file_pusher, self._settings
  290. )
  291. else:
  292. make_handler = PolicyEnd
  293. for policy, globs in self._user_file_policies.items():
  294. if policy == "end":
  295. continue
  296. # Convert set to list to avoid RuntimeError's
  297. # TODO: we may need to add locks
  298. for g in list(globs):
  299. paths = glob.glob(os.path.join(self._dir, g))
  300. if any(save_name in p for p in paths):
  301. if policy == "live":
  302. make_handler = PolicyLive
  303. elif policy == "now":
  304. make_handler = PolicyNow
  305. self._file_event_handlers[save_name] = make_handler(
  306. file_path, save_name, self._file_pusher, self._settings
  307. )
  308. return self._file_event_handlers[save_name]
  309. def finish(self) -> None:
  310. logger.info("shutting down directory watcher")
  311. try:
  312. # avoid hanging if we crashed before the observer was started
  313. if self._file_observer.is_alive():
  314. # rather unfortunately we need to manually do a final scan of the dir
  315. # with `queue_events`, then iterate through all events before stopping
  316. # the observer to catch all files written. First we need to prevent the
  317. # existing thread from consuming our final events, then we process them
  318. self._file_observer._timeout = 0
  319. self._file_observer._stopped_event.set()
  320. self._file_observer.join()
  321. self.emitter.queue_events(0) # type: ignore[union-attr]
  322. while True:
  323. try:
  324. self._file_observer.dispatch_events(
  325. self._file_observer.event_queue, 0
  326. )
  327. except queue.Empty:
  328. break
  329. # Calling stop unschedules any inflight events so we handled them above
  330. self._file_observer.stop()
  331. # TODO: py2 TypeError: PyCObject_AsVoidPtr called with null pointer
  332. except TypeError:
  333. pass
  334. # TODO: py3 SystemError: <built-in function stop> returned an error
  335. except SystemError:
  336. pass
  337. # Ensure we've at least noticed every file in the run directory. Sometimes
  338. # we miss things because asynchronously watching filesystems isn't reliable.
  339. logger.info("scan: %s", self._dir)
  340. for dirpath, _, filenames in os.walk(self._dir):
  341. for fname in filenames:
  342. file_path = os.path.join(dirpath, fname)
  343. save_name = LogicalPath(os.path.relpath(file_path, self._dir))
  344. ignored = False
  345. for glb in self._settings.ignore_globs:
  346. if len(fnmatch.filter([save_name], glb)) > 0:
  347. ignored = True
  348. logger.info("ignored: %s matching glob %s", save_name, glb)
  349. break
  350. if ignored:
  351. continue
  352. logger.info("scan save: %s %s", file_path, save_name)
  353. self._get_file_event_handler(file_path, save_name).finish()