interfaces.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. import os
  2. import warnings
  3. from enum import Enum
  4. from typing import TYPE_CHECKING, Optional, Tuple
  5. import pyarrow
  6. from ray.util.annotations import DeveloperAPI, PublicAPI
  7. if TYPE_CHECKING:
  8. from ray.data.datasource import PathPartitionFilter
  9. @PublicAPI(stability="alpha")
  10. class CheckpointBackend(Enum):
  11. """Supported backends for storing and reading checkpoint files.
  12. Currently, only one type of backend is supported:
  13. * Batch-based backends: CLOUD_OBJECT_STORAGE and FILE_STORAGE.
  14. Their differences are as follows:
  15. 1. Writing checkpoints: Batch-based backends write a checkpoint file
  16. for each block.
  17. 2. Loading checkpoints and filtering input data: Batch-based backends
  18. load all checkpoint data into memory prior to dataset execution.
  19. The checkpoint data is then passed to each read task to perform filtering.
  20. """
  21. CLOUD_OBJECT_STORAGE = "CLOUD_OBJECT_STORAGE"
  22. """
  23. Batch-based checkpoint backend that uses cloud object storage, such as
  24. AWS S3, Google Cloud Storage, etc.
  25. """
  26. FILE_STORAGE = "FILE_STORAGE"
  27. """
  28. Batch based checkpoint backend that uses file system storage.
  29. Note, when using this backend, the checkpoint path must be a network-mounted
  30. file system (e.g. `/mnt/cluster_storage/`).
  31. """
  32. @PublicAPI(stability="beta")
  33. class CheckpointConfig:
  34. """Configuration for checkpointing.
  35. Args:
  36. id_column: Name of the ID column in the input dataset.
  37. ID values must be unique across all rows in the dataset and must persist
  38. during all operators.
  39. checkpoint_path: Path to store the checkpoint data. It can be a path to a cloud
  40. object storage (e.g. `s3://bucket/path`) or a file system path.
  41. If the latter, the path must be a network-mounted file system (e.g.
  42. `/mnt/cluster_storage/`) that is accessible to the entire cluster.
  43. If not set, defaults to `RAY_DATA_CHECKPOINT_PATH_BUCKET/ray_data_checkpoint`.
  44. delete_checkpoint_on_success: If true, automatically delete checkpoint
  45. data when the dataset execution succeeds. Only supported for
  46. batch-based backend currently.
  47. override_filesystem: Override the :class:`pyarrow.fs.FileSystem` object used to
  48. read/write checkpoint data. Use this when you want to use custom credentials.
  49. override_backend: Override the :class:`CheckpointBackend` object used to
  50. access the checkpoint backend storage.
  51. filter_num_threads: Number of threads used to filter checkpointed rows.
  52. write_num_threads: Number of threads used to write checkpoint files for
  53. completed rows.
  54. checkpoint_path_partition_filter: Filter for checkpoint files to load during
  55. restoration when reading from `checkpoint_path`.
  56. """
  57. DEFAULT_CHECKPOINT_PATH_BUCKET_ENV_VAR = "RAY_DATA_CHECKPOINT_PATH_BUCKET"
  58. DEFAULT_CHECKPOINT_PATH_DIR = "ray_data_checkpoint"
  59. def __init__(
  60. self,
  61. id_column: Optional[str] = None,
  62. checkpoint_path: Optional[str] = None,
  63. *,
  64. delete_checkpoint_on_success: bool = True,
  65. override_filesystem: Optional["pyarrow.fs.FileSystem"] = None,
  66. override_backend: Optional[CheckpointBackend] = None,
  67. filter_num_threads: int = 3,
  68. write_num_threads: int = 3,
  69. checkpoint_path_partition_filter: Optional["PathPartitionFilter"] = None,
  70. ):
  71. self.id_column: Optional[str] = id_column
  72. if not isinstance(self.id_column, str) or len(self.id_column) == 0:
  73. raise InvalidCheckpointingConfig(
  74. "Checkpoint ID column must be a non-empty string, "
  75. f"but got {self.id_column}"
  76. )
  77. if override_backend is not None:
  78. warnings.warn(
  79. "`override_backend` is deprecated and will be removed in August 2025.",
  80. FutureWarning,
  81. stacklevel=2,
  82. )
  83. self.checkpoint_path: str = (
  84. checkpoint_path or self._get_default_checkpoint_path()
  85. )
  86. inferred_backend, inferred_fs = self._infer_backend_and_fs(
  87. self.checkpoint_path,
  88. override_filesystem,
  89. override_backend,
  90. )
  91. self.filesystem: "pyarrow.fs.FileSystem" = inferred_fs
  92. self.backend: CheckpointBackend = inferred_backend
  93. self.delete_checkpoint_on_success: bool = delete_checkpoint_on_success
  94. self.filter_num_threads: int = filter_num_threads
  95. self.write_num_threads: int = write_num_threads
  96. self.checkpoint_path_partition_filter = checkpoint_path_partition_filter
  97. def _get_default_checkpoint_path(self) -> str:
  98. artifact_storage = os.environ.get(self.DEFAULT_CHECKPOINT_PATH_BUCKET_ENV_VAR)
  99. if artifact_storage is None:
  100. raise InvalidCheckpointingConfig(
  101. f"`{self.DEFAULT_CHECKPOINT_PATH_BUCKET_ENV_VAR}` env var is not set, "
  102. "please explicitly set `CheckpointConfig.checkpoint_path`."
  103. )
  104. return f"{artifact_storage}/{self.DEFAULT_CHECKPOINT_PATH_DIR}"
  105. def _infer_backend_and_fs(
  106. self,
  107. checkpoint_path: str,
  108. override_filesystem: Optional["pyarrow.fs.FileSystem"] = None,
  109. override_backend: Optional[CheckpointBackend] = None,
  110. ) -> Tuple[CheckpointBackend, "pyarrow.fs.FileSystem"]:
  111. try:
  112. if override_filesystem is not None:
  113. assert isinstance(override_filesystem, pyarrow.fs.FileSystem), (
  114. "override_filesystem must be an instance of "
  115. f"`pyarrow.fs.FileSystem`, but got {type(override_filesystem)}"
  116. )
  117. fs = override_filesystem
  118. else:
  119. fs, _ = pyarrow.fs.FileSystem.from_uri(checkpoint_path)
  120. if override_backend is not None:
  121. assert isinstance(override_backend, CheckpointBackend), (
  122. "override_backend must be an instance of `CheckpointBackend`, "
  123. f"but got {type(override_backend)}"
  124. )
  125. backend = override_backend
  126. else:
  127. if isinstance(fs, pyarrow.fs.LocalFileSystem):
  128. backend = CheckpointBackend.FILE_STORAGE
  129. else:
  130. backend = CheckpointBackend.CLOUD_OBJECT_STORAGE
  131. return backend, fs
  132. except Exception as e:
  133. raise InvalidCheckpointingConfig(
  134. f"Invalid checkpoint path: {checkpoint_path}. "
  135. ) from e
  136. @DeveloperAPI
  137. class InvalidCheckpointingConfig(Exception):
  138. """Exception which indicates that the checkpointing
  139. configuration is invalid."""
  140. pass
  141. @DeveloperAPI
  142. class InvalidCheckpointingOperators(Exception):
  143. """Exception which indicates that the DAG is not eligible for checkpointing,
  144. due to one or more incompatible operators."""
  145. pass