context.py 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897
  1. import copy
  2. import enum
  3. import logging
  4. import os
  5. import threading
  6. import warnings
  7. from dataclasses import dataclass, field
  8. from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
  9. from ray._private.ray_constants import env_bool, env_float, env_integer
  10. from ray.data._internal.logging import update_dataset_logger_for_worker
  11. from ray.data.checkpoint.interfaces import CheckpointBackend, CheckpointConfig
  12. from ray.util.annotations import DeveloperAPI
  13. from ray.util.scheduling_strategies import SchedulingStrategyT
  14. if TYPE_CHECKING:
  15. from ray.data._internal.execution.interfaces import ExecutionOptions
  16. from ray.data._internal.issue_detection.issue_detector_configuration import (
  17. IssueDetectorsConfiguration,
  18. )
  19. logger = logging.getLogger(__name__)
  20. # The context singleton on this process.
  21. _default_context: "Optional[DataContext]" = None
  22. _context_lock = threading.Lock()
  23. @DeveloperAPI(stability="alpha")
  24. class ShuffleStrategy(str, enum.Enum):
  25. """Shuffle strategy determines shuffling algorithm employed by operations
  26. like aggregate, repartition, etc"""
  27. SORT_SHUFFLE_PULL_BASED = "sort_shuffle_pull_based"
  28. SORT_SHUFFLE_PUSH_BASED = "sort_shuffle_push_based"
  29. HASH_SHUFFLE = "hash_shuffle"
  30. # We chose 128MiB for default: With streaming execution and num_cpus many concurrent
  31. # tasks, the memory footprint will be about 2 * num_cpus * target_max_block_size ~= RAM
  32. # * DEFAULT_OBJECT_STORE_MEMORY_LIMIT_FRACTION * 0.3 (default object store memory
  33. # fraction set by Ray core), assuming typical memory:core ratio of 4:1.
  34. DEFAULT_TARGET_MAX_BLOCK_SIZE = 128 * 1024 * 1024
  35. # We set a higher target block size because we have to materialize
  36. # all input blocks anyway, so there is no performance advantage to having
  37. # smaller blocks. Setting a larger block size allows avoiding overhead from an
  38. # excessive number of partitions.
  39. # We choose 1GiB as 4x less than the typical memory:core ratio (4:1).
  40. DEFAULT_SHUFFLE_TARGET_MAX_BLOCK_SIZE = 1024 * 1024 * 1024
  41. # We will attempt to slice blocks whose size exceeds this factor *
  42. # target_max_block_size. We will warn the user if slicing fails and we produce
  43. # blocks larger than this threshold.
  44. MAX_SAFE_BLOCK_SIZE_FACTOR = 1.5
  45. DEFAULT_TARGET_MIN_BLOCK_SIZE = 1 * 1024 * 1024
  46. # This default appears to work well with most file sizes on remote storage systems,
  47. # which is very sensitive to the buffer size.
  48. DEFAULT_STREAMING_READ_BUFFER_SIZE = 32 * 1024 * 1024
  49. DEFAULT_ENABLE_PANDAS_BLOCK = True
  50. DEFAULT_PANDAS_BLOCK_IGNORE_METADATA = env_bool(
  51. "RAY_DATA_PANDAS_BLOCK_IGNORE_METADATA", False
  52. )
  53. DEFAULT_READ_OP_MIN_NUM_BLOCKS = 200
  54. DEFAULT_ACTOR_PREFETCHER_ENABLED = False
  55. DEFAULT_USE_PUSH_BASED_SHUFFLE = bool(
  56. os.environ.get("RAY_DATA_PUSH_BASED_SHUFFLE", None)
  57. )
  58. DEFAULT_SHUFFLE_STRATEGY = os.environ.get(
  59. "RAY_DATA_DEFAULT_SHUFFLE_STRATEGY", ShuffleStrategy.HASH_SHUFFLE
  60. )
  61. DEFAULT_MAX_HASH_SHUFFLE_AGGREGATORS = env_integer(
  62. "RAY_DATA_MAX_HASH_SHUFFLE_AGGREGATORS", 128
  63. )
  64. DEFAULT_SCHEDULING_STRATEGY = "SPREAD"
  65. # This default enables locality-based scheduling in Ray for tasks where arg data
  66. # transfer is a bottleneck.
  67. DEFAULT_SCHEDULING_STRATEGY_LARGE_ARGS = "DEFAULT"
  68. DEFAULT_LARGE_ARGS_THRESHOLD = 50 * 1024 * 1024
  69. DEFAULT_USE_POLARS = False
  70. DEFAULT_USE_POLARS_SORT = False
  71. DEFAULT_EAGER_FREE = bool(int(os.environ.get("RAY_DATA_EAGER_FREE", "0")))
  72. DEFAULT_DECODING_SIZE_ESTIMATION_ENABLED = True
  73. DEFAULT_MIN_PARALLELISM = env_integer("RAY_DATA_DEFAULT_MIN_PARALLELISM", 200)
  74. DEFAULT_ENABLE_TENSOR_EXTENSION_CASTING = env_bool(
  75. "RAY_DATA_ENABLE_TENSOR_EXTENSION_CASTING",
  76. True,
  77. )
  78. # NOTE: V1 tensor type format only supports tensors of no more than 2Gb in
  79. # total cumulative size (due to it internally utilizing int32 offsets)
  80. #
  81. # V2 in turn relies on int64 offsets, therefore having a limit of ~9Eb (exabytes)
  82. DEFAULT_USE_ARROW_TENSOR_V2 = env_bool("RAY_DATA_USE_ARROW_TENSOR_V2", True)
  83. DEFAULT_AUTO_LOG_STATS = False
  84. DEFAULT_VERBOSE_STATS_LOG = False
  85. DEFAULT_TRACE_ALLOCATIONS = bool(int(os.environ.get("RAY_DATA_TRACE_ALLOCATIONS", "0")))
  86. DEFAULT_LOG_INTERNAL_STACK_TRACE_TO_STDOUT = env_bool(
  87. "RAY_DATA_LOG_INTERNAL_STACK_TRACE_TO_STDOUT", False
  88. )
  89. DEFAULT_RAY_DATA_RAISE_ORIGINAL_MAP_EXCEPTION = env_bool(
  90. "RAY_DATA_RAISE_ORIGINAL_MAP_EXCEPTION", False
  91. )
  92. DEFAULT_USE_RAY_TQDM = bool(int(os.environ.get("RAY_TQDM", "1")))
  93. # Globally enable or disable all progress bars.
  94. # If this is False, both the global and operator-level progress bars are disabled.
  95. DEFAULT_ENABLE_PROGRESS_BARS = not bool(
  96. env_integer("RAY_DATA_DISABLE_PROGRESS_BARS", 0)
  97. )
  98. DEFAULT_ENABLE_PROGRESS_BAR_NAME_TRUNCATION = env_bool(
  99. "RAY_DATA_ENABLE_PROGRESS_BAR_NAME_TRUNCATION", True
  100. )
  101. # Globally enable or disable experimental rich progress bars. This is a new
  102. # interface to replace the old tqdm progress bar implementation.
  103. DEFAULT_ENABLE_RICH_PROGRESS_BARS = bool(
  104. env_integer("RAY_DATA_ENABLE_RICH_PROGRESS_BARS", 0)
  105. )
  106. DEFAULT_ENFORCE_SCHEMAS = env_bool("RAY_DATA_ENFORCE_SCHEMAS", False)
  107. DEFAULT_ENABLE_GET_OBJECT_LOCATIONS_FOR_METRICS = False
  108. # `write_file_retry_on_errors` is deprecated in favor of `retried_io_errors`. You
  109. # shouldn't need to modify `DEFAULT_WRITE_FILE_RETRY_ON_ERRORS`.
  110. DEFAULT_WRITE_FILE_RETRY_ON_ERRORS = (
  111. "AWS Error INTERNAL_FAILURE",
  112. "AWS Error NETWORK_CONNECTION",
  113. "AWS Error SLOW_DOWN",
  114. "AWS Error UNKNOWN (HTTP status 503)",
  115. )
  116. DEFAULT_RETRIED_IO_ERRORS = (
  117. "AWS Error INTERNAL_FAILURE",
  118. "AWS Error NETWORK_CONNECTION",
  119. "AWS Error SLOW_DOWN",
  120. "AWS Error UNKNOWN (HTTP status 503)",
  121. "AWS Error SERVICE_UNAVAILABLE",
  122. )
  123. DEFAULT_ICEBERG_WRITE_FILE_MAX_ATTEMPTS = env_integer(
  124. "RAY_DATA_ICEBERG_WRITE_FILE_MAX_ATTEMPTS", 10
  125. )
  126. DEFAULT_ICEBERG_WRITE_FILE_RETRY_MAX_BACKOFF_S = env_integer(
  127. "RAY_DATA_ICEBERG_WRITE_FILE_RETRY_MAX_BACKOFF_S", 32
  128. )
  129. DEFAULT_ICEBERG_CATALOG_MAX_ATTEMPTS = env_integer(
  130. "RAY_DATA_ICEBERG_CATALOG_MAX_ATTEMPTS", 5
  131. )
  132. DEFAULT_ICEBERG_CATALOG_RETRY_MAX_BACKOFF_S = env_integer(
  133. "RAY_DATA_ICEBERG_CATALOG_RETRY_MAX_BACKOFF_S", 16
  134. )
  135. DEFAULT_ICEBERG_CATALOG_RETRIED_ERRORS = (
  136. "429",
  137. "503",
  138. "502",
  139. "500",
  140. "Too Many Requests",
  141. "Service Unavailable",
  142. "Internal Server Error",
  143. "Connection reset",
  144. "Connection refused",
  145. "Connection timed out",
  146. "Read timed out",
  147. "UNAVAILABLE",
  148. "DEADLINE_EXCEEDED",
  149. )
  150. DEFAULT_WARN_ON_DRIVER_MEMORY_USAGE_BYTES = 2 * 1024 * 1024 * 1024
  151. DEFAULT_ACTOR_TASK_RETRY_ON_ERRORS = False
  152. DEFAULT_ACTOR_INIT_RETRY_ON_ERRORS = False
  153. DEFAULT_ACTOR_INIT_MAX_RETRIES = 3
  154. DEFAULT_ENABLE_OP_RESOURCE_RESERVATION = env_bool(
  155. "RAY_DATA_ENABLE_OP_RESOURCE_RESERVATION", True
  156. )
  157. DEFAULT_OP_RESOURCE_RESERVATION_RATIO = float(
  158. os.environ.get("RAY_DATA_OP_RESERVATION_RATIO", "0.5")
  159. )
  160. DEFAULT_MAX_ERRORED_BLOCKS = 0
  161. # Use this to prefix important warning messages for the user.
  162. WARN_PREFIX = "⚠️ "
  163. # Use this to prefix important success messages for the user.
  164. OK_PREFIX = "✔️ "
  165. # The default batch size for batch transformations before it was changed to `None`.
  166. LEGACY_DEFAULT_BATCH_SIZE = 1024
  167. # Default value of the max number of blocks that can be buffered at the
  168. # streaming generator of each `DataOpTask`.
  169. # Note, if this value is too large, we'll need to allocate more memory
  170. # buffer for the pending task outputs, which may lead to bad performance
  171. # as we may not have enough memory buffer for the operator outputs.
  172. # If the value is too small, the task may be frequently blocked due to
  173. # streaming generator backpressure.
  174. DEFAULT_MAX_NUM_BLOCKS_IN_STREAMING_GEN_BUFFER = 2
  175. # Default value for whether or not to try to create directories for write
  176. # calls if the URI is an S3 URI.
  177. DEFAULT_S3_TRY_CREATE_DIR = False
  178. DEFAULT_WAIT_FOR_MIN_ACTORS_S = env_integer(
  179. "RAY_DATA_DEFAULT_WAIT_FOR_MIN_ACTORS_S", -1
  180. )
  181. DEFAULT_ACTOR_MAX_TASKS_IN_FLIGHT_TO_MAX_CONCURRENCY_FACTOR = env_integer(
  182. "RAY_DATA_ACTOR_DEFAULT_MAX_TASKS_IN_FLIGHT_TO_MAX_CONCURRENCY_FACTOR", 2
  183. )
  184. # Enable per node metrics reporting for Ray Data, disabled by default.
  185. DEFAULT_ENABLE_PER_NODE_METRICS = bool(
  186. int(os.environ.get("RAY_DATA_PER_NODE_METRICS", "0"))
  187. )
  188. DEFAULT_MIN_HASH_SHUFFLE_AGGREGATOR_WAIT_TIME_IN_S = env_integer(
  189. "RAY_DATA_MIN_HASH_SHUFFLE_AGGREGATOR_WAIT_TIME_IN_S", 300
  190. )
  191. DEFAULT_HASH_SHUFFLE_AGGREGATOR_HEALTH_WARNING_INTERVAL_S = env_integer(
  192. "RAY_DATA_HASH_SHUFFLE_AGGREGATOR_HEALTH_WARNING_INTERVAL_S", 30
  193. )
  194. DEFAULT_ACTOR_POOL_UTIL_UPSCALING_THRESHOLD: float = env_float(
  195. "RAY_DATA_DEFAULT_ACTOR_POOL_UTIL_UPSCALING_THRESHOLD",
  196. 1.75,
  197. )
  198. DEFAULT_ACTOR_POOL_UTIL_DOWNSCALING_THRESHOLD: float = env_float(
  199. "RAY_DATA_DEFAULT_ACTOR_POOL_UTIL_DOWNSCALING_THRESHOLD",
  200. 0.5,
  201. )
  202. DEFAULT_ACTOR_POOL_MAX_UPSCALING_DELTA: int = env_integer(
  203. "RAY_DATA_DEFAULT_ACTOR_POOL_MAX_UPSCALING_DELTA",
  204. 1,
  205. )
  206. # Disable dynamic output queue size backpressure by default.
  207. DEFAULT_ENABLE_DYNAMIC_OUTPUT_QUEUE_SIZE_BACKPRESSURE: bool = env_bool(
  208. "RAY_DATA_ENABLE_DYNAMIC_OUTPUT_QUEUE_SIZE_BACKPRESSURE", False
  209. )
  210. DEFAULT_DOWNSTREAM_CAPACITY_BACKPRESSURE_RATIO: float = env_float(
  211. "RAY_DATA_DOWNSTREAM_CAPACITY_BACKPRESSURE_RATIO", 10.0
  212. )
  213. @DeveloperAPI
  214. @dataclass
  215. class IcebergConfig:
  216. """Configuration for Iceberg datasource operations.
  217. Args:
  218. write_file_max_attempts: Maximum number of retry attempts when writing
  219. Iceberg data files to storage. Defaults to 10.
  220. write_file_retry_max_backoff_s: Maximum backoff time in seconds between
  221. Iceberg write retry attempts. Uses exponential backoff with jitter.
  222. Defaults to 32.
  223. catalog_max_attempts: Maximum number of retry attempts for Iceberg
  224. catalog operations (load catalog, load table, commit transactions).
  225. Defaults to 5.
  226. catalog_retry_max_backoff_s: Maximum backoff time in seconds between
  227. Iceberg catalog retry attempts. Defaults to 16.
  228. catalog_retried_errors: A list of substrings of error messages that
  229. should trigger a retry for Iceberg catalog operations. Includes common
  230. HTTP error codes and connection errors.
  231. """
  232. write_file_max_attempts: int = DEFAULT_ICEBERG_WRITE_FILE_MAX_ATTEMPTS
  233. write_file_retry_max_backoff_s: int = DEFAULT_ICEBERG_WRITE_FILE_RETRY_MAX_BACKOFF_S
  234. catalog_max_attempts: int = DEFAULT_ICEBERG_CATALOG_MAX_ATTEMPTS
  235. catalog_retry_max_backoff_s: int = DEFAULT_ICEBERG_CATALOG_RETRY_MAX_BACKOFF_S
  236. catalog_retried_errors: List[str] = field(
  237. default_factory=lambda: list(DEFAULT_ICEBERG_CATALOG_RETRIED_ERRORS)
  238. )
  239. @DeveloperAPI
  240. @dataclass
  241. class AutoscalingConfig:
  242. """Configuration for autoscaling of Ray Data.
  243. Args:
  244. actor_pool_util_upscaling_threshold: Actor Pool utilization threshold for upscaling.
  245. Once Actor Pool exceeds this utilization threshold it will start adding new actors.
  246. Actor Pool utilization is defined as ratio of number of submitted tasks to the
  247. number of available concurrency-slots to run them in the current set of actors.
  248. This utilization value could exceed 100%, when the number of submitted tasks
  249. exceed available concurrency-slots to run them in the current set of actors.
  250. This is possible when `max_tasks_in_flight_per_actor`
  251. (defaults to 2 x of `max_concurrency`) > Actor's `max_concurrency`
  252. and allows to overlap task execution with the fetching of the blocks
  253. for the next task providing for ability to negotiate a trade-off
  254. between autoscaling speed and resource efficiency (i.e.,
  255. making tasks wait instead of immediately triggering execution).
  256. actor_pool_util_downscaling_threshold: Actor Pool utilization threshold for downscaling.
  257. actor_pool_max_upscaling_delta: Maximum number of actors to scale up in a single scaling decision.
  258. This limits how many actors can be added at once to prevent resource contention
  259. and scheduling pressure. Defaults to 1 for conservative scaling.
  260. """
  261. actor_pool_util_upscaling_threshold: float = (
  262. DEFAULT_ACTOR_POOL_UTIL_UPSCALING_THRESHOLD
  263. )
  264. # Actor Pool utilization threshold for downscaling
  265. actor_pool_util_downscaling_threshold: float = (
  266. DEFAULT_ACTOR_POOL_UTIL_DOWNSCALING_THRESHOLD
  267. )
  268. # Maximum number of actors to scale up in a single scaling decision
  269. actor_pool_max_upscaling_delta: int = DEFAULT_ACTOR_POOL_MAX_UPSCALING_DELTA
  270. def _execution_options_factory() -> "ExecutionOptions":
  271. # Lazily import to avoid circular dependencies.
  272. from ray.data._internal.execution.interfaces import ExecutionOptions
  273. return ExecutionOptions()
  274. def _deduce_default_shuffle_algorithm() -> ShuffleStrategy:
  275. if DEFAULT_USE_PUSH_BASED_SHUFFLE:
  276. logger.warning(
  277. "RAY_DATA_PUSH_BASED_SHUFFLE is deprecated, please use "
  278. "RAY_DATA_DEFAULT_SHUFFLE_STRATEGY to set shuffling strategy"
  279. )
  280. return ShuffleStrategy.SORT_SHUFFLE_PUSH_BASED
  281. else:
  282. vs = [s for s in ShuffleStrategy] # noqa: C416
  283. assert DEFAULT_SHUFFLE_STRATEGY in vs, (
  284. f"RAY_DATA_DEFAULT_SHUFFLE_STRATEGY has to be one of the [{','.join(vs)}] "
  285. f"(got {DEFAULT_SHUFFLE_STRATEGY})"
  286. )
  287. return DEFAULT_SHUFFLE_STRATEGY
  288. def _issue_detectors_config_factory() -> "IssueDetectorsConfiguration":
  289. # Lazily import to avoid circular dependencies.
  290. from ray.data._internal.issue_detection.issue_detector_configuration import (
  291. IssueDetectorsConfiguration,
  292. )
  293. return IssueDetectorsConfiguration()
  294. @DeveloperAPI
  295. @dataclass
  296. class DataContext:
  297. """Global settings for Ray Data.
  298. Configure this class to enable advanced features and tune performance.
  299. .. warning::
  300. Apply changes before creating a :class:`~ray.data.Dataset`. Changes made after
  301. won't take effect.
  302. .. note::
  303. This object is automatically propagated to workers. Access it from the driver
  304. and remote workers with :meth:`DataContext.get_current()`.
  305. Examples:
  306. >>> from ray.data import DataContext
  307. >>> DataContext.get_current().enable_progress_bars = False
  308. Args:
  309. target_max_block_size: The max target block size in bytes for reads and
  310. transformations. If `None`, this means the block size is infinite.
  311. target_min_block_size: Ray Data avoids creating blocks smaller than this
  312. size in bytes on read. This takes precedence over
  313. ``read_op_min_num_blocks``.
  314. streaming_read_buffer_size: Buffer size when doing streaming reads from local or
  315. remote storage.
  316. enable_pandas_block: Whether pandas block format is enabled.
  317. actor_prefetcher_enabled: Whether to use actor based block prefetcher.
  318. autoscaling_config: Autoscaling configuration.
  319. use_push_based_shuffle: Whether to use push-based shuffle.
  320. pipeline_push_based_shuffle_reduce_tasks:
  321. scheduling_strategy: The global scheduling strategy. For tasks with large args,
  322. ``scheduling_strategy_large_args`` takes precedence.
  323. scheduling_strategy_large_args: Scheduling strategy for tasks with large args.
  324. large_args_threshold: Size in bytes after which point task arguments are
  325. considered large. Choose a value so that the data transfer overhead is
  326. significant in comparison to task scheduling (i.e., low tens of ms).
  327. use_polars: Whether to use Polars for tabular dataset sorts, groupbys, and
  328. aggregations.
  329. eager_free: Whether to eagerly free memory.
  330. decoding_size_estimation: Whether to estimate in-memory decoding data size for
  331. data source.
  332. min_parallelism: This setting is deprecated. Use ``read_op_min_num_blocks``
  333. instead.
  334. read_op_min_num_blocks: Minimum number of read output blocks for a dataset.
  335. enable_tensor_extension_casting: Whether to automatically cast NumPy ndarray
  336. columns in Pandas DataFrames to tensor extension columns.
  337. use_arrow_tensor_v2: Config enabling V2 version of ArrowTensorArray supporting
  338. tensors > 2Gb in size (off by default)
  339. enable_fallback_to_arrow_object_ext_type: Enables fallback to serialize column
  340. values not suppported by Arrow natively (like user-defined custom Python
  341. classes for ex, etc) using `ArrowPythonObjectType` (simply serializing
  342. these as bytes)
  343. enable_auto_log_stats: Whether to automatically log stats after execution. If
  344. disabled, you can still manually print stats with ``Dataset.stats()``.
  345. verbose_stats_logs: Whether stats logs should be verbose. This includes fields
  346. such as `extra_metrics` in the stats output, which are excluded by default.
  347. trace_allocations: Whether to trace allocations / eager free. This adds
  348. significant performance overheads and should only be used for debugging.
  349. execution_options: The
  350. :class:`~ray.data._internal.execution.interfaces.execution_options.ExecutionOptions`
  351. to use.
  352. use_ray_tqdm: Whether to enable distributed tqdm.
  353. enable_progress_bars: Whether to enable progress bars.
  354. enable_operator_progress_bars: Whether to enable progress bars for individual
  355. operators during execution.
  356. enable_progress_bar_name_truncation: If True, the name of the progress bar
  357. (often the operator name) will be truncated if it exceeds
  358. `ProgressBar.MAX_NAME_LENGTH`. Otherwise, the full operator name is shown.
  359. enable_rich_progress_bars: Whether to use the new rich progress bars instead
  360. of the tqdm TUI.
  361. enable_get_object_locations_for_metrics: Whether to enable
  362. ``get_object_locations`` for metrics. This is useful for tracking whether
  363. the object input of a task is local (cache hit) or not local (cache miss)
  364. to the node that task is running on.
  365. write_file_retry_on_errors: A list of substrings of error messages that should
  366. trigger a retry when writing files. This is useful for handling transient
  367. errors when writing to remote storage systems.
  368. warn_on_driver_memory_usage_bytes: If driver memory exceeds this threshold,
  369. Ray Data warns you. For now, this only applies to shuffle ops because most
  370. other ops are unlikely to use as much driver memory.
  371. actor_task_retry_on_errors: The application-level errors that actor task should
  372. retry. This follows same format as :ref:`retry_exceptions <task-retries>` in
  373. Ray Core. Default to `False` to not retry on any errors. Set to `True` to
  374. retry all errors, or set to a list of errors to retry.
  375. actor_init_retry_on_errors: Whether to retry when actor initialization fails.
  376. Default to `False` to not retry on any errors. Set to `True` to retry
  377. all errors.
  378. actor_init_max_retries: Maximum number of consecutive retries for actor
  379. initialization failures. The counter resets when an actor successfully
  380. initializes. Default is 3. Set to -1 for infinite retries.
  381. op_resource_reservation_enabled: Whether to enable resource reservation for
  382. operators to prevent resource contention.
  383. op_resource_reservation_ratio: The ratio of the total resources to reserve for
  384. each operator.
  385. max_errored_blocks: Max number of blocks that are allowed to have errors,
  386. unlimited if negative. This option allows application-level exceptions in
  387. block processing tasks. These exceptions may be caused by UDFs (e.g., due to
  388. corrupted data samples) or IO errors. Data in the failed blocks are dropped.
  389. This option can be useful to prevent a long-running job from failing due to
  390. a small number of bad blocks.
  391. log_internal_stack_trace_to_stdout: Whether to include internal Ray Data/Ray
  392. Core code stack frames when logging to stdout. The full stack trace is
  393. always written to the Ray Data log file.
  394. raise_original_map_exception: Whether to raise the original exception
  395. encountered in map UDF instead of wrapping it in a `UserCodeException`.
  396. print_on_execution_start: If ``True``, print execution information when
  397. execution starts.
  398. s3_try_create_dir: If ``True``, try to create directories on S3 when a write
  399. call is made with a S3 URI.
  400. wait_for_min_actors_s: The default time to wait for minimum requested
  401. actors to start before raising a timeout, in seconds.
  402. max_tasks_in_flight_per_actor: Max number of tasks that could be submitted
  403. for execution to individual actor at the same time. Note that only up to
  404. `max_concurrency` number of these tasks will be executing concurrently
  405. while remaining ones will be waiting in the Actor's queue. Buffering
  406. tasks in the queue allows us to overlap pulling of the blocks (which are
  407. tasks arguments) with the execution of the prior tasks maximizing
  408. individual Actor's utilization
  409. retried_io_errors: A list of substrings of error messages that should
  410. trigger a retry when reading or writing files. This is useful for handling
  411. transient errors when reading from remote storage systems.
  412. iceberg_config: Configuration for Iceberg datasource operations including
  413. retry settings for file writes and catalog operations. See
  414. :class:`IcebergConfig` for details.
  415. default_hash_shuffle_parallelism: Default parallelism level for hash-based
  416. shuffle operations if the number of partitions is unspecifed.
  417. max_hash_shuffle_aggregators: Maximum number of aggregating actors that can be
  418. provisioned for hash-shuffle aggregations.
  419. min_hash_shuffle_aggregator_wait_time_in_s: Minimum time to wait for hash
  420. shuffle aggregators to become available, in seconds.
  421. hash_shuffle_aggregator_health_warning_interval_s: Interval for health warning
  422. checks on hash shuffle aggregators, in seconds.
  423. max_hash_shuffle_finalization_batch_size: Maximum batch size for concurrent
  424. hash-shuffle finalization tasks. If `None`, defaults to
  425. `max_hash_shuffle_aggregators`.
  426. join_operator_actor_num_cpus_per_partition_override: Override CPU allocation
  427. per partition for join operator actors.
  428. hash_shuffle_operator_actor_num_cpus_per_partition_override: Override CPU
  429. allocation per partition for hash shuffle operator actors.
  430. hash_aggregate_operator_actor_num_cpus_per_partition_override: Override CPU
  431. allocation per partition for hash aggregate operator actors.
  432. use_polars_sort: Whether to use Polars for tabular dataset sorting operations.
  433. enable_per_node_metrics: Enable per node metrics reporting for Ray Data,
  434. disabled by default.
  435. override_object_store_memory_limit_fraction: Override the fraction of object
  436. store memory limit. If `None`, uses Ray's default.
  437. memory_usage_poll_interval_s: The interval to poll the USS of map tasks. If `None`,
  438. map tasks won't record memory stats.
  439. dataset_logger_id: Optional logger ID for dataset operations. If `None`, uses
  440. default logging configuration.
  441. issue_detectors_config: Configuration for issue detection and monitoring during
  442. dataset operations.
  443. downstream_capacity_backpressure_ratio: Ratio for downstream capacity
  444. backpressure control. A higher ratio causes backpressure to kick-in
  445. later. If `None`, this backpressure policy is disabled.
  446. enable_dynamic_output_queue_size_backpressure: Whether to cap the concurrency
  447. of an operator based on its and downstream operators' queue size.
  448. enforce_schemas: Whether to enforce schema consistency across dataset operations.
  449. pandas_block_ignore_metadata: Whether to ignore pandas metadata when converting
  450. between Arrow and pandas formats for better type inference.
  451. """
  452. # `None` means the block size is infinite.
  453. target_max_block_size: Optional[int] = DEFAULT_TARGET_MAX_BLOCK_SIZE
  454. target_min_block_size: int = DEFAULT_TARGET_MIN_BLOCK_SIZE
  455. streaming_read_buffer_size: int = DEFAULT_STREAMING_READ_BUFFER_SIZE
  456. enable_pandas_block: bool = DEFAULT_ENABLE_PANDAS_BLOCK
  457. actor_prefetcher_enabled: bool = DEFAULT_ACTOR_PREFETCHER_ENABLED
  458. autoscaling_config: AutoscalingConfig = field(default_factory=AutoscalingConfig)
  459. ################################################################
  460. # Sort-based shuffling configuration
  461. ################################################################
  462. use_push_based_shuffle: bool = DEFAULT_USE_PUSH_BASED_SHUFFLE
  463. _shuffle_strategy: ShuffleStrategy = _deduce_default_shuffle_algorithm()
  464. pipeline_push_based_shuffle_reduce_tasks: bool = True
  465. ################################################################
  466. # Hash-based shuffling configuration
  467. ################################################################
  468. # Default hash-shuffle parallelism level (will be used when not
  469. # provided explicitly)
  470. default_hash_shuffle_parallelism: int = DEFAULT_MIN_PARALLELISM
  471. # Max number of aggregators (actors) that could be provisioned
  472. # to perform aggregations on partitions produced during hash-shuffling
  473. #
  474. # When unset defaults to the smaller of
  475. # - Total # of CPUs available in the cluster * 2
  476. # - DEFAULT_MAX_HASH_SHUFFLE_AGGREGATORS (128 by default)
  477. max_hash_shuffle_aggregators: Optional[int] = None
  478. min_hash_shuffle_aggregator_wait_time_in_s: int = (
  479. DEFAULT_MIN_HASH_SHUFFLE_AGGREGATOR_WAIT_TIME_IN_S
  480. )
  481. hash_shuffle_aggregator_health_warning_interval_s: int = (
  482. DEFAULT_HASH_SHUFFLE_AGGREGATOR_HEALTH_WARNING_INTERVAL_S
  483. )
  484. # Max number of *concurrent* hash-shuffle finalization tasks running
  485. # at the same time. This config is helpful to control concurrency of
  486. # finalization tasks to prevent single aggregator running multiple tasks
  487. # concurrently (for ex, to prevent it failing w/ OOM)
  488. #
  489. # When unset defaults to `DataContext.max_hash_shuffle_aggregators`
  490. max_hash_shuffle_finalization_batch_size: Optional[int] = None
  # (Advanced) The following settings override the `num_cpus` allocation for the
  # Join/Aggregate/Shuffle workers (utilizing hash-shuffle). They default to
  # `None`, hence the `Optional[float]` annotations.
  join_operator_actor_num_cpus_override: Optional[float] = None
  hash_shuffle_operator_actor_num_cpus_override: Optional[float] = None
  hash_aggregate_operator_actor_num_cpus_override: Optional[float] = None

  scheduling_strategy: SchedulingStrategyT = DEFAULT_SCHEDULING_STRATEGY
  scheduling_strategy_large_args: SchedulingStrategyT = (
      DEFAULT_SCHEDULING_STRATEGY_LARGE_ARGS
  )
  large_args_threshold: int = DEFAULT_LARGE_ARGS_THRESHOLD
  use_polars: bool = DEFAULT_USE_POLARS
  use_polars_sort: bool = DEFAULT_USE_POLARS_SORT
  eager_free: bool = DEFAULT_EAGER_FREE
  decoding_size_estimation: bool = DEFAULT_DECODING_SIZE_ESTIMATION_ENABLED
  min_parallelism: int = DEFAULT_MIN_PARALLELISM
  read_op_min_num_blocks: int = DEFAULT_READ_OP_MIN_NUM_BLOCKS
  enable_tensor_extension_casting: bool = DEFAULT_ENABLE_TENSOR_EXTENSION_CASTING
  use_arrow_tensor_v2: bool = DEFAULT_USE_ARROW_TENSOR_V2
  enable_fallback_to_arrow_object_ext_type: Optional[bool] = None
  enable_auto_log_stats: bool = DEFAULT_AUTO_LOG_STATS
  verbose_stats_logs: bool = DEFAULT_VERBOSE_STATS_LOG
  trace_allocations: bool = DEFAULT_TRACE_ALLOCATIONS
  execution_options: "ExecutionOptions" = field(
      default_factory=_execution_options_factory
  )
  use_ray_tqdm: bool = DEFAULT_USE_RAY_TQDM
  enable_progress_bars: bool = DEFAULT_ENABLE_PROGRESS_BARS
  # By default, enable the progress bar for operator-level progress.
  enable_operator_progress_bars: bool = True
  enable_progress_bar_name_truncation: bool = (
      DEFAULT_ENABLE_PROGRESS_BAR_NAME_TRUNCATION
  )
  enable_rich_progress_bars: bool = DEFAULT_ENABLE_RICH_PROGRESS_BARS
  enable_get_object_locations_for_metrics: bool = (
      DEFAULT_ENABLE_GET_OBJECT_LOCATIONS_FOR_METRICS
  )
  write_file_retry_on_errors: List[str] = DEFAULT_WRITE_FILE_RETRY_ON_ERRORS
  warn_on_driver_memory_usage_bytes: int = DEFAULT_WARN_ON_DRIVER_MEMORY_USAGE_BYTES
  actor_task_retry_on_errors: Union[
      bool, List[BaseException]
  ] = DEFAULT_ACTOR_TASK_RETRY_ON_ERRORS
  actor_init_retry_on_errors: bool = DEFAULT_ACTOR_INIT_RETRY_ON_ERRORS
  actor_init_max_retries: int = DEFAULT_ACTOR_INIT_MAX_RETRIES
  op_resource_reservation_enabled: bool = DEFAULT_ENABLE_OP_RESOURCE_RESERVATION
  op_resource_reservation_ratio: float = DEFAULT_OP_RESOURCE_RESERVATION_RATIO
  max_errored_blocks: int = DEFAULT_MAX_ERRORED_BLOCKS
  log_internal_stack_trace_to_stdout: bool = (
      DEFAULT_LOG_INTERNAL_STACK_TRACE_TO_STDOUT
  )
  raise_original_map_exception: bool = DEFAULT_RAY_DATA_RAISE_ORIGINAL_MAP_EXCEPTION
  print_on_execution_start: bool = True
  s3_try_create_dir: bool = DEFAULT_S3_TRY_CREATE_DIR
  # Timeout threshold (in seconds) for how long it should take for actors in the
  # Actor Pool to start up. Exceeding this threshold will lead to execution being
  # terminated with an exception due to the inability to secure the minimum
  # required capacity.
  #
  # Setting a non-positive value here (i.e. <= 0) disables this functionality
  # (defaults to -1).
  wait_for_min_actors_s: int = DEFAULT_WAIT_FOR_MIN_ACTORS_S
  # This setting serves as a global override.
  max_tasks_in_flight_per_actor: Optional[int] = None
  retried_io_errors: List[str] = field(
      default_factory=lambda: list(DEFAULT_RETRIED_IO_ERRORS)
  )
  iceberg_config: IcebergConfig = field(default_factory=IcebergConfig)
  enable_per_node_metrics: bool = DEFAULT_ENABLE_PER_NODE_METRICS
  override_object_store_memory_limit_fraction: Optional[float] = None
  memory_usage_poll_interval_s: Optional[float] = 1
  dataset_logger_id: Optional[str] = None
  # This is a temporary workaround to allow actors to perform cleanup
  # until https://github.com/ray-project/ray/issues/53169 is fixed.
  # This hook is known to have a race condition bug in fault tolerance:
  # after the hook is triggered and the UDF is deleted, another retry task
  # may still be scheduled to this actor, and it will fail.
  _enable_actor_pool_on_exit_hook: bool = False
  issue_detectors_config: "IssueDetectorsConfiguration" = field(
      default_factory=_issue_detectors_config_factory
  )
  downstream_capacity_backpressure_ratio: Optional[
      float
  ] = DEFAULT_DOWNSTREAM_CAPACITY_BACKPRESSURE_RATIO
  enable_dynamic_output_queue_size_backpressure: bool = (
      DEFAULT_ENABLE_DYNAMIC_OUTPUT_QUEUE_SIZE_BACKPRESSURE
  )
  enforce_schemas: bool = DEFAULT_ENFORCE_SCHEMAS
  pandas_block_ignore_metadata: bool = DEFAULT_PANDAS_BLOCK_IGNORE_METADATA
  _checkpoint_config: Optional[CheckpointConfig] = None
  def __post_init__(self):
      """Set up the non-field runtime state once the dataclass fields exist.

      Creates the empty containers used for task-pool remote args and
      key-value configs, copies the hash-shuffle aggregator settings into the
      hash-shuffle issue detector config, and resets the streaming-generator
      buffer size and execution index.
      """
      # The additional ray remote args that should be added to
      # the task-pool-based data tasks.
      self._task_pool_data_task_remote_args: Dict[str, Any] = {}

      # The extra key-value style configs.
      # These configs are managed by individual components or plugins via
      # `set_config`, `get_config` and `remove_config`.
      # A dict is used instead of individual fields to decouple the
      # DataContext from the plugin implementations, as well as to avoid
      # circular dependencies.
      self._kv_configs: Dict[str, Any] = {}

      # Sync the hash shuffle aggregator fields to their detector config.
      detector_config = self.issue_detectors_config.hash_shuffle_detector_config
      detector_config.detection_time_interval_s = (
          self.hash_shuffle_aggregator_health_warning_interval_s
      )
      detector_config.min_wait_time_s = self.min_hash_shuffle_aggregator_wait_time_in_s

      self._max_num_blocks_in_streaming_gen_buffer = (
          DEFAULT_MAX_NUM_BLOCKS_IN_STREAMING_GEN_BUFFER
      )

      # Unique id of the current execution of the data pipeline.
      # This value increments only upon re-execution of the exact same pipeline.
      self._execution_idx = 0
  def __setattr__(self, name: str, value: Any) -> None:
      """Set an attribute, warning when a deprecated setting is assigned.

      Deprecated names are still stored on the instance. In addition:

      * ``target_shuffle_max_block_size`` also assigns the value to
        ``target_max_block_size``.
      * ``use_polars`` also assigns the value to ``use_polars_sort``.

      All warnings are issued with ``stacklevel=2`` so that they are
      attributed to the code performing the assignment rather than to this
      method.

      Args:
          name: Name of the attribute being assigned.
          value: Value being assigned.
      """
      if (
          name == "write_file_retry_on_errors"
          and value != DEFAULT_WRITE_FILE_RETRY_ON_ERRORS
      ):
          # Only warn when the deprecated setting is actually customized.
          warnings.warn(
              "`write_file_retry_on_errors` is deprecated! Configure "
              "`retried_io_errors` instead.",
              DeprecationWarning,
              stacklevel=2,
          )
      elif name == "use_push_based_shuffle":
          warnings.warn(
              "`use_push_based_shuffle` is deprecated! Configure "
              "`shuffle_strategy` instead.",
              DeprecationWarning,
              stacklevel=2,
          )
      elif name == "target_shuffle_max_block_size":
          # NOTE(review): unlike the sibling branches, no category is passed
          # here, so this is emitted as the default `UserWarning` -- confirm
          # whether that is intentional before changing it.
          warnings.warn(
              "`target_shuffle_max_block_size` is deprecated! Configure `target_max_block_size` instead.",
              stacklevel=2,
          )
          # Forward the value to the replacement setting.
          self.target_max_block_size = value
      elif name == "use_polars":
          warnings.warn(
              "`use_polars` is deprecated, please configure "
              "`use_polars_sort` instead.",
              DeprecationWarning,
              stacklevel=2,
          )
          # Forward the value to the replacement setting.
          self.use_polars_sort = value

      super().__setattr__(name, value)
  @staticmethod
  def get_current() -> "DataContext":
      """Return the current DataContext, creating it on first use.

      When a Dataset is created, the current DataContext will be sealed.
      Changes to `DataContext.get_current()` will not impact existing Datasets.

      Examples:

          .. testcode::

              import ray

              context = ray.data.DataContext.get_current()

              context.target_max_block_size = 100 * 1024 ** 2
              ds1 = ray.data.range(1)
              context.target_max_block_size = 1 * 1024 ** 2
              ds2 = ray.data.range(1)

              # ds1's target_max_block_size will be 100MB
              ds1.take_all()
              # ds2's target_max_block_size will be 1MB
              ds2.take_all()

      Developer notes: Avoid using `DataContext.get_current()` in data
      internal components, use the DataContext object captured in the
      Dataset and pass it around as arguments.
      """
      global _default_context

      # The check-and-create sequence runs under the module-level lock.
      with _context_lock:
          current = _default_context
          if current is None:
              current = DataContext()
              _default_context = current
          return current
  @staticmethod
  def _set_current(context: "DataContext") -> None:
      """Install ``context`` as the current context in a remote worker.

      This is used internally by Dataset to propagate the driver context to
      remote workers used for parallelization.

      Args:
          context: The DataContext to make current.
      """
      global _default_context

      previous = _default_context
      # Refresh the worker's dataset logger when there is no current context
      # yet, or when the incoming context carries a different logger id.
      needs_logger_update = (
          not previous or previous.dataset_logger_id != context.dataset_logger_id
      )
      if needs_logger_update:
          update_dataset_logger_for_worker(context.dataset_logger_id)

      _default_context = context
  @property
  def shuffle_strategy(self) -> ShuffleStrategy:
      """Return the effective shuffle strategy.

      When the deprecated ``use_push_based_shuffle`` flag is truthy, a warning
      is logged and ``ShuffleStrategy.SORT_SHUFFLE_PUSH_BASED`` is returned;
      otherwise the stored ``_shuffle_strategy`` is returned.
      """
      if not self.use_push_based_shuffle:
          return self._shuffle_strategy

      # The deprecated flag takes precedence over the stored strategy.
      logger.warning(
          "`use_push_based_shuffle` is deprecated, please configure "
          "`shuffle_strategy` instead.",
      )
      return ShuffleStrategy.SORT_SHUFFLE_PUSH_BASED
  @shuffle_strategy.setter
  def shuffle_strategy(self, value: ShuffleStrategy) -> None:
      """Store ``value`` as the configured shuffle strategy.

      Args:
          value: The shuffle strategy to store in ``_shuffle_strategy``.
      """
      self._shuffle_strategy = value
  def get_config(self, key: str, default: Any = None) -> Any:
      """Look up a key-value style config.

      Args:
          key: The key of the config.
          default: The value to return if the key is not found.

      Returns:
          The value stored for ``key``, or ``default`` if the key is not found.
      """
      kv_store = self._kv_configs
      return kv_store.get(key, default)
  def set_config(self, key: str, value: Any) -> None:
      """Store a key-value style config, replacing any existing entry.

      Args:
          key: The key of the config.
          value: The value of the config.
      """
      kv_store = self._kv_configs
      kv_store[key] = value
  def remove_config(self, key: str) -> None:
      """Delete a key-value style config; a missing key is ignored.

      Args:
          key: The key of the config.
      """
      kv_store = self._kv_configs
      # Passing a default to `pop` avoids a KeyError for unknown keys.
      kv_store.pop(key, None)
  def copy(self) -> "DataContext":
      """Return a deep copy of this DataContext."""
      duplicate = copy.deepcopy(self)
      return duplicate
  def set_dataset_logger_id(self, dataset_id: str) -> None:
      """Record ``dataset_id`` as the current dataset logger id.

      This is used internally to propagate the current dataset logger id to
      remote workers.

      Args:
          dataset_id: The id to store in ``dataset_logger_id``.
      """
      self.dataset_logger_id = dataset_id
  @property
  def checkpoint_config(self) -> Optional[CheckpointConfig]:
      """Return the stored checkpoint configuration (may be ``None``)."""
      current_config = self._checkpoint_config
      return current_config
  @checkpoint_config.setter
  def checkpoint_config(
      self, value: Optional[Union[CheckpointConfig, Dict[str, Any]]]
  ) -> None:
      """Set the checkpoint configuration.

      Args:
          value: One of:

              * ``None`` -- clears the checkpoint configuration.
              * a ``CheckpointConfig`` -- stored as-is.
              * a ``dict`` -- passed as keyword arguments to
                ``CheckpointConfig``. If it contains ``"override_backend"``,
                that entry must be a string and is looked up via
                ``CheckpointBackend[...]`` before construction. The dict
                passed in is not modified.

      Raises:
          TypeError: If ``value`` is none of the accepted types, or if
              ``"override_backend"`` is present but not a string.
      """
      if value is None:
          self._checkpoint_config = None
      elif isinstance(value, dict):
          # Work on a shallow copy so the caller's dict is left untouched;
          # otherwise re-using the same dict would fail the string check
          # below, since the entry would already hold the converted value.
          config_kwargs = dict(value)
          if "override_backend" in config_kwargs:
              backend_name = config_kwargs["override_backend"]
              if not isinstance(backend_name, str):
                  raise TypeError(
                      "Expected 'override_backend' to be a string,"
                      f" but got {type(backend_name)}."
                  )
              # NOTE(review): an unknown name presumably raises from this
              # lookup (KeyError if CheckpointBackend is an Enum) -- confirm.
              config_kwargs["override_backend"] = CheckpointBackend[backend_name]
          self._checkpoint_config = CheckpointConfig(**config_kwargs)
      elif isinstance(value, CheckpointConfig):
          self._checkpoint_config = value
      else:
          raise TypeError(
              "checkpoint_config must be a CheckpointConfig instance, a dict, or None."
          )
  # Backwards-compatibility alias: `DatasetContext` refers to the same class
  # object as `DataContext`.
  DatasetContext = DataContext