plan.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644
  1. import copy
  2. import itertools
  3. import logging
  4. from typing import TYPE_CHECKING, Iterator, List, Optional, Tuple, Type, Union
  5. import pyarrow
  6. import ray
  7. from ray._private.internal_api import get_memory_info_reply, get_state_from_address
  8. from ray.data._internal.execution.interfaces import RefBundle
  9. from ray.data._internal.logical.interfaces import SourceOperator
  10. from ray.data._internal.logical.interfaces.logical_operator import LogicalOperator
  11. from ray.data._internal.logical.interfaces.logical_plan import LogicalPlan
  12. from ray.data._internal.logical.interfaces.operator import Operator
  13. from ray.data._internal.logical.operators import Read
  14. from ray.data._internal.logical.optimizers import get_plan_conversion_fns
  15. from ray.data._internal.stats import DatasetStats
  16. from ray.data.block import BlockMetadataWithSchema, _take_first_non_empty_schema
  17. from ray.data.context import DataContext
  18. from ray.data.exceptions import omit_traceback_stdout
  19. from ray.util.debug import log_once
  20. if TYPE_CHECKING:
  21. from ray.data._internal.execution.streaming_executor import (
  22. StreamingExecutor,
  23. )
  24. from ray.data.dataset import Dataset
# Remote args that an operator may inherit from the previous operator when it
# does not specify them itself. Currently only the scheduling strategy.
INHERITABLE_REMOTE_ARGS = ["scheduling_strategy"]

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
  28. class ExecutionPlan:
  29. """A lazy execution plan for a Dataset.
  30. This lazy execution plan builds up a chain of ``List[RefBundle]`` -->
  31. ``List[RefBundle]`` operators. Prior to execution, we apply a set of logical
  32. plan optimizations, such as operator fusion, in order to reduce Ray task
  33. overhead and data copies.
  34. Internally, the execution plan holds a snapshot of a computed list of
  35. blocks and their associated metadata under ``self._snapshot_bundle``,
  36. where this snapshot is the cached output of executing the operator chain."""
  37. def __init__(
  38. self,
  39. stats: DatasetStats,
  40. data_context: DataContext,
  41. ):
  42. """Create a plan with no transformation operators.
  43. Args:
  44. stats: Stats for the base blocks.
  45. data_context: :class:`~ray.data.context.DataContext`
  46. object to use for execution.
  47. """
  48. self._in_stats = stats
  49. # A computed snapshot of some prefix of operators and their corresponding
  50. # output blocks and stats.
  51. self._snapshot_operator: Optional[LogicalOperator] = None
  52. self._snapshot_stats = None
  53. self._snapshot_bundle = None
  54. # Snapshot of only metadata corresponding to the final operator's
  55. # output bundles, used as the source of truth for the Dataset's schema
  56. # and count. This is calculated and cached when the plan is executed as an
  57. # iterator (`execute_to_iterator()`), and avoids caching
  58. # all of the output blocks in memory like in `self.snapshot_bundle`.
  59. # TODO(scottjlee): To keep the caching logic consistent, update `execute()`
  60. # to also store the metadata in `_snapshot_metadata` instead of
  61. # `_snapshot_bundle`. For example, we could store the blocks in
  62. # `self._snapshot_blocks` and the metadata in `self._snapshot_metadata`.
  63. self._snapshot_metadata_schema: Optional["BlockMetadataWithSchema"] = None
  64. # Cached schema.
  65. self._schema = None
  66. # Set when a Dataset is constructed with this plan
  67. self._dataset_uuid = None
  68. # Index of the current execution.
  69. self._run_index = -1
  70. self._dataset_name = None
  71. self._has_started_execution = False
  72. self._context = data_context
  73. def get_dataset_id(self) -> str:
  74. """Unique ID of the dataset, including the dataset name,
  75. UUID, and current execution index.
  76. """
  77. return (
  78. f"{self._dataset_name or 'dataset'}_{self._dataset_uuid}_{self._run_index}"
  79. )
  80. def create_executor(self) -> "StreamingExecutor":
  81. """Create an executor for this plan."""
  82. from ray.data._internal.execution.streaming_executor import StreamingExecutor
  83. self._run_index += 1
  84. executor = StreamingExecutor(self._context, self.get_dataset_id())
  85. return executor
  86. def __repr__(self) -> str:
  87. return (
  88. f"ExecutionPlan("
  89. f"dataset_uuid={self._dataset_uuid}, "
  90. f"snapshot_operator={self._snapshot_operator}"
  91. f")"
  92. )
  93. def explain(self) -> str:
  94. """Return a string representation of the logical and physical plan."""
  95. convert_fns = [lambda x: x] + get_plan_conversion_fns()
  96. titles: List[str] = [
  97. "Logical Plan",
  98. "Logical Plan (Optimized)",
  99. "Physical Plan",
  100. "Physical Plan (Optimized)",
  101. ]
  102. # 1. Set initial plan
  103. plan = self._logical_plan
  104. sections = []
  105. for title, convert_fn in zip(titles, convert_fns):
  106. # 2. Convert plan to new plan
  107. plan = convert_fn(plan)
  108. # 3. Generate plan str from new plan.
  109. plan_str, _ = self.generate_plan_string(plan.dag, show_op_repr=True)
  110. banner = f"\n-------- {title} --------\n"
  111. section = f"{banner}{plan_str}"
  112. sections.append(section)
  113. return "".join(sections)
  114. @staticmethod
  115. def generate_plan_string(
  116. op: Operator,
  117. curr_str: str = "",
  118. depth: int = 0,
  119. including_source: bool = True,
  120. show_op_repr: bool = False,
  121. ):
  122. """Traverse (DFS) the Plan DAG and
  123. return a string representation of the operators."""
  124. if not including_source and isinstance(op, SourceOperator):
  125. return curr_str, depth
  126. curr_max_depth = depth
  127. # For logical plan, only show the operator name like "Aggregate".
  128. # But for physical plan, show the operator class name as well like "AllToAllOperator[Aggregate]".
  129. op_str = repr(op) if show_op_repr else op.name
  130. if depth == 0:
  131. curr_str += f"{op_str}\n"
  132. else:
  133. trailing_space = " " * ((depth - 1) * 3)
  134. curr_str += f"{trailing_space}+- {op_str}\n"
  135. for input in op.input_dependencies:
  136. curr_str, input_max_depth = ExecutionPlan.generate_plan_string(
  137. input, curr_str, depth + 1, including_source, show_op_repr
  138. )
  139. curr_max_depth = max(curr_max_depth, input_max_depth)
  140. return curr_str, curr_max_depth
    def get_plan_as_string(self, dataset_cls: Type["Dataset"]) -> str:
        """Create a cosmetic string representation of this execution plan.

        The result is the operator tree (omitted when the plan output is
        already computed) followed by a one-line or multi-line summary of the
        form ``<DatasetClass>(name=..., num_blocks=..., num_rows=..., schema=...)``.

        Args:
            dataset_cls: Dataset class whose ``__name__`` heads the summary.
                ``num_blocks`` is only looked up when this is
                ``MaterializedDataset``.

        Returns:
            The string representation of this execution plan.
        """
        # NOTE: this is used for Dataset.__repr__ to give a user-facing string
        # representation. Ideally ExecutionPlan.__repr__ should be replaced with this
        # method as well.

        from ray.data.dataset import MaterializedDataset

        # Do not force execution for schema, as this method is expected to be very
        # cheap.
        plan_str = ""
        plan_max_depth = 0
        if not self.has_computed_output():
            # using dataset as source here, so don't generate source operator in generate_plan_string
            plan_str, plan_max_depth = self.generate_plan_string(
                self._logical_plan.dag, including_source=False
            )

            if self._snapshot_bundle is not None:
                # This plan has executed some but not all operators.
                schema = self._snapshot_bundle.schema
                count = self._snapshot_bundle.num_rows()
            elif self._snapshot_metadata_schema is not None:
                # Only metadata was cached; use it for schema and row count.
                schema = self._snapshot_metadata_schema.schema
                count = self._snapshot_metadata_schema.metadata.num_rows
            else:
                # This plan hasn't executed any operators.
                has_n_ary_operator = False
                dag = self._logical_plan.dag

                # Walk down the first-input chain until a source operator is
                # reached, bailing out if any operator has several inputs.
                while not isinstance(dag, SourceOperator):
                    if len(dag.input_dependencies) > 1:
                        has_n_ary_operator = True
                        break

                    dag = dag.input_dependencies[0]

                # TODO(@bveeramani): Handle schemas for n-ary operators like `Union`.
                if has_n_ary_operator:
                    schema = None
                    count = None
                else:
                    assert isinstance(dag, SourceOperator), dag
                    # Build a throwaway plan over just the source operator and
                    # ask it for the schema and row count.
                    plan = ExecutionPlan(
                        DatasetStats(metadata={}, parent=None),
                        self._context,
                    )
                    plan.link_logical_plan(LogicalPlan(dag, plan._context))
                    schema = plan.schema()
                    count = plan.meta_count()
        else:
            # Get schema of output blocks.
            schema = self.schema(fetch_if_missing=False)
            count = self._snapshot_bundle.num_rows()

        # Render the schema on a single line first.
        if schema is None:
            schema_str = "Unknown schema"
        elif isinstance(schema, type):
            schema_str = str(schema)
        else:
            schema_str = []
            for n, t in zip(schema.names, schema.types):
                if hasattr(t, "__name__"):
                    t = t.__name__
                schema_str.append(f"{n}: {t}")
            schema_str = ", ".join(schema_str)
            schema_str = "{" + schema_str + "}"

        if count is None:
            count = "?"

        num_blocks = None
        if dataset_cls == MaterializedDataset:
            num_blocks = self.initial_num_blocks()
            assert num_blocks is not None

        name_str = (
            "name={}, ".format(self._dataset_name)
            if self._dataset_name is not None
            else ""
        )
        # Truthiness check: `num_blocks` is omitted when it is None or 0.
        num_blocks_str = f"num_blocks={num_blocks}, " if num_blocks else ""

        dataset_str = "{}({}{}num_rows={}, schema={})".format(
            dataset_cls.__name__,
            name_str,
            num_blocks_str,
            count,
            schema_str,
        )

        # If the resulting string representation fits in one line, use it directly.
        SCHEMA_LINE_CHAR_LIMIT = 80
        MIN_FIELD_LENGTH = 10
        INDENT_STR = " " * 3
        trailing_space = INDENT_STR * plan_max_depth

        if len(dataset_str) > SCHEMA_LINE_CHAR_LIMIT:
            # If the resulting string representation exceeds the line char limit,
            # first try breaking up each `Dataset` parameter into its own line
            # and check if each line fits within the line limit. We check the
            # `schema` param's length, since this is likely the longest string.
            schema_str_on_new_line = f"{trailing_space}{INDENT_STR}schema={schema_str}"
            if len(schema_str_on_new_line) > SCHEMA_LINE_CHAR_LIMIT:
                # If the schema cannot fit on a single line, break up each field
                # into its own line.
                # NOTE(review): this loop reads `schema.names` / `schema.types`,
                # so it assumes `schema` is neither None nor a plain `type`
                # whenever this branch is reached -- confirm.
                schema_str = []
                for n, t in zip(schema.names, schema.types):
                    if hasattr(t, "__name__"):
                        t = t.__name__
                    col_str = f"{trailing_space}{INDENT_STR * 2}{n}: {t}"
                    # If the field line exceeds the char limit, abbreviate
                    # the field name to fit while maintaining the full type
                    if len(col_str) > SCHEMA_LINE_CHAR_LIMIT:
                        shortened_suffix = f"...: {str(t)}"
                        # Show at least 10 characters of the field name, even if
                        # we have already hit the line limit with the type.
                        chars_left_for_col_name = max(
                            SCHEMA_LINE_CHAR_LIMIT - len(shortened_suffix),
                            MIN_FIELD_LENGTH,
                        )
                        col_str = (
                            f"{col_str[:chars_left_for_col_name]}{shortened_suffix}"
                        )
                    schema_str.append(col_str)
                schema_str = ",\n".join(schema_str)
                schema_str = (
                    "{\n" + schema_str + f"\n{trailing_space}{INDENT_STR}" + "}"
                )
            # Re-render every parameter on its own indented line.
            name_str = (
                f"\n{trailing_space}{INDENT_STR}name={self._dataset_name},"
                if self._dataset_name is not None
                else ""
            )
            num_blocks_str = (
                f"\n{trailing_space}{INDENT_STR}num_blocks={num_blocks},"
                if num_blocks
                else ""
            )
            dataset_str = (
                f"{dataset_cls.__name__}("
                f"{name_str}"
                f"{num_blocks_str}"
                f"\n{trailing_space}{INDENT_STR}num_rows={count},"
                f"\n{trailing_space}{INDENT_STR}schema={schema_str}"
                f"\n{trailing_space})"
            )

        # Attach the dataset summary as the last (deepest) line of the tree.
        if plan_max_depth == 0:
            plan_str += dataset_str
        else:
            plan_str += f"{INDENT_STR * (plan_max_depth - 1)}+- {dataset_str}"
        return plan_str
  283. def link_logical_plan(self, logical_plan: "LogicalPlan"):
  284. """Link the logical plan into this execution plan.
  285. This is used for triggering execution for optimizer code path in this legacy
  286. execution plan.
  287. """
  288. self._logical_plan = logical_plan
  289. self._logical_plan._context = self._context
  290. def copy(self) -> "ExecutionPlan":
  291. """Create a shallow copy of this execution plan.
  292. This copy can be executed without mutating the original, but clearing the copy
  293. will also clear the original.
  294. Returns:
  295. A shallow copy of this execution plan.
  296. """
  297. plan_copy = ExecutionPlan(
  298. self._in_stats,
  299. data_context=self._context,
  300. )
  301. if self._snapshot_bundle is not None:
  302. # Copy over the existing snapshot.
  303. plan_copy._snapshot_bundle = self._snapshot_bundle
  304. plan_copy._snapshot_operator = self._snapshot_operator
  305. plan_copy._snapshot_stats = self._snapshot_stats
  306. plan_copy._dataset_name = self._dataset_name
  307. return plan_copy
  308. def deep_copy(self) -> "ExecutionPlan":
  309. """Create a deep copy of this execution plan.
  310. This copy can be executed AND cleared without mutating the original.
  311. Returns:
  312. A deep copy of this execution plan.
  313. """
  314. plan_copy = ExecutionPlan(
  315. copy.copy(self._in_stats),
  316. data_context=self._context.copy(),
  317. )
  318. if self._snapshot_bundle:
  319. # Copy over the existing snapshot.
  320. plan_copy._snapshot_bundle = copy.copy(self._snapshot_bundle)
  321. plan_copy._snapshot_operator = copy.copy(self._snapshot_operator)
  322. plan_copy._snapshot_stats = copy.copy(self._snapshot_stats)
  323. plan_copy._dataset_name = self._dataset_name
  324. return plan_copy
  325. def initial_num_blocks(self) -> Optional[int]:
  326. """Get the estimated number of blocks from the logical plan
  327. after applying execution plan optimizations, but prior to
  328. fully executing the dataset."""
  329. return self._logical_plan.dag.estimated_num_outputs()
  330. def schema(
  331. self, fetch_if_missing: bool = False
  332. ) -> Union[type, "pyarrow.lib.Schema"]:
  333. """Get the schema after applying all execution plan optimizations,
  334. but prior to fully executing the dataset
  335. (unless `fetch_if_missing` is set to True).
  336. Args:
  337. fetch_if_missing: Whether to execute the plan to fetch the schema.
  338. Returns:
  339. The schema of the output dataset.
  340. """
  341. if self._schema is not None:
  342. return self._schema
  343. schema = None
  344. if self.has_computed_output():
  345. schema = self._snapshot_bundle.schema
  346. else:
  347. schema = self._logical_plan.dag.infer_schema()
  348. if schema is None and fetch_if_missing:
  349. # For consistency with the previous implementation, we fetch the schema if
  350. # the plan is read-only even if `fetch_if_missing` is False.
  351. iter_ref_bundles, _, executor = self.execute_to_iterator()
  352. # Make sure executor is fully shutdown upon exiting
  353. with executor:
  354. schema = _take_first_non_empty_schema(
  355. bundle.schema for bundle in iter_ref_bundles
  356. )
  357. self.cache_schema(schema)
  358. return self._schema
    def cache_schema(self, schema: Union[type, "pyarrow.lib.Schema"]) -> None:
        """Store ``schema`` as the cached schema returned by ``schema()``.

        Args:
            schema: The schema to cache. ``schema()`` may also pass None here
                when no schema could be determined.
        """
        self._schema = schema
  361. def input_files(self) -> Optional[List[str]]:
  362. """Get the input files of the dataset, if available."""
  363. return self._logical_plan.dag.infer_metadata().input_files
  364. def meta_count(self) -> Optional[int]:
  365. """Get the number of rows after applying all plan optimizations, if possible.
  366. This method will never trigger any computation.
  367. Returns:
  368. The number of records of the result Dataset, or None.
  369. """
  370. dag = self._logical_plan.dag
  371. if self.has_computed_output():
  372. num_rows = sum(m.num_rows for m in self._snapshot_bundle.metadata)
  373. elif dag.infer_metadata().num_rows is not None:
  374. num_rows = dag.infer_metadata().num_rows
  375. else:
  376. num_rows = None
  377. return num_rows
  378. @omit_traceback_stdout
  379. def execute_to_iterator(
  380. self,
  381. ) -> Tuple[Iterator[RefBundle], DatasetStats, Optional["StreamingExecutor"]]:
  382. """Execute this plan, returning an iterator.
  383. This will use streaming execution to generate outputs.
  384. NOTE: Executor will be shutdown upon either of the 2 following conditions:
  385. - Iterator is fully exhausted (ie until StopIteration is raised)
  386. - Executor instances is garbage-collected
  387. Returns:
  388. Tuple of iterator over output RefBundles, DatasetStats, and the executor.
  389. """
  390. self._has_started_execution = True
  391. if self.has_computed_output():
  392. bundle = self.execute()
  393. return iter([bundle]), self._snapshot_stats, None
  394. from ray.data._internal.execution.legacy_compat import (
  395. execute_to_legacy_bundle_iterator,
  396. )
  397. executor = self.create_executor()
  398. bundle_iter = execute_to_legacy_bundle_iterator(executor, self)
  399. # Since the generator doesn't run any code until we try to fetch the first
  400. # value, force execution of one bundle before we call get_stats().
  401. gen = iter(bundle_iter)
  402. try:
  403. bundle_iter = itertools.chain([next(gen)], gen)
  404. except StopIteration:
  405. pass
  406. self._snapshot_stats = executor.get_stats()
  407. return bundle_iter, self._snapshot_stats, executor
    @omit_traceback_stdout
    def execute(
        self,
        preserve_order: bool = False,
    ) -> RefBundle:
        """Executes this plan (eagerly).

        If the output is not yet computed, the plan is run (or, for a source
        operator that already holds its output data, the data is used
        directly), stats are collected, and the result is stored as this
        plan's snapshot. If the output is already computed, the stored
        snapshot is returned unchanged.

        Args:
            preserve_order: Whether to preserve order in execution.

        Returns:
            The snapshot ``RefBundle`` holding the blocks of the output dataset.
        """
        self._has_started_execution = True

        # Always used the saved context for execution.
        context = self._context

        # Warn (once per process) when no CPU is reported as available.
        if not ray.available_resources().get("CPU"):
            if log_once("cpu_warning"):
                logger.warning(
                    "Warning: The Ray cluster currently does not have "
                    "any available CPUs. The Dataset job will hang unless more CPUs "
                    "are freed up. A common reason is that cluster resources are "
                    "used by Actors or Tune trials; see the following link "
                    "for more details: "
                    "https://docs.ray.io/en/latest/data/data-internals.html#ray-data-and-tune"  # noqa: E501
                )
        if not self.has_computed_output():
            from ray.data._internal.execution.legacy_compat import (
                _get_initial_stats_from_plan,
                execute_to_legacy_block_list,
            )

            if (
                isinstance(self._logical_plan.dag, SourceOperator)
                and self._logical_plan.dag.output_data() is not None
            ):
                # If the data is already materialized (e.g., `from_pandas`), we can
                # skip execution and directly return the output data. This avoids
                # recording unnecessary metrics for an empty plan execution.
                stats = _get_initial_stats_from_plan(self)

                # TODO(@bveeramani): Make `ExecutionPlan.execute()` return
                # `List[RefBundle]` instead of `RefBundle`. Among other reasons, it'd
                # allow us to remove the unwrapping logic below.
                output_bundles = self._logical_plan.dag.output_data()
                # The merged bundle owns its blocks only if every input bundle does.
                owns_blocks = all(bundle.owns_blocks for bundle in output_bundles)
                schema = _take_first_non_empty_schema(
                    bundle.schema for bundle in output_bundles
                )
                # Flatten all (block, metadata) pairs into a single bundle.
                bundle = RefBundle(
                    [
                        (block, metadata)
                        for bundle in output_bundles
                        for block, metadata in bundle.blocks
                    ],
                    owns_blocks=owns_blocks,
                    schema=schema,
                )
            else:
                # Make sure executor is properly shutdown
                with self.create_executor() as executor:
                    blocks = execute_to_legacy_block_list(
                        executor,
                        self,
                        dataset_uuid=self._dataset_uuid,
                        preserve_order=preserve_order,
                    )
                    bundle = RefBundle(
                        tuple(blocks.iter_blocks_with_metadata()),
                        owns_blocks=blocks._owned_by_consumer,
                        schema=blocks.get_schema(),
                    )

                stats = executor.get_stats()
                # NOTE(review): the summary string is built even when
                # auto-logging is disabled -- confirm this is intentional.
                stats_summary_string = stats.to_summary().to_string(
                    include_parent=False
                )
                if context.enable_auto_log_stats:
                    logger.info(stats_summary_string)

            # Retrieve memory-related stats from ray.
            # Best effort: any failure here is logged at debug level and ignored.
            try:
                reply = get_memory_info_reply(
                    get_state_from_address(ray.get_runtime_context().gcs_address)
                )
                if reply.store_stats.spill_time_total_s > 0:
                    stats.global_bytes_spilled = int(
                        reply.store_stats.spilled_bytes_total
                    )
                if reply.store_stats.restore_time_total_s > 0:
                    stats.global_bytes_restored = int(
                        reply.store_stats.restored_bytes_total
                    )
            except Exception as e:
                logger.debug(
                    "Skipping recording memory spilled and restored statistics due to "
                    f"exception: {e}"
                )

            stats.dataset_bytes_spilled = 0

            # Sum "obj_store_mem_spilled" over these stats and, recursively,
            # all of their parents.
            def collect_stats(cur_stats):
                stats.dataset_bytes_spilled += cur_stats.extra_metrics.get(
                    "obj_store_mem_spilled", 0
                )
                for parent in cur_stats.parents:
                    collect_stats(parent)

            collect_stats(stats)

            # Set the snapshot to the output of the final operator.
            self._snapshot_bundle = bundle
            self._snapshot_operator = self._logical_plan.dag
            self._snapshot_stats = stats
            self._snapshot_stats.dataset_uuid = self._dataset_uuid

        return self._snapshot_bundle
    @property
    def has_started_execution(self) -> bool:
        """Return ``True`` if this plan has been partially or fully executed.

        The underlying flag starts as False and is set at the beginning of
        ``execute()`` and ``execute_to_iterator()``.
        """
        return self._has_started_execution
  518. def clear_snapshot(self) -> None:
  519. """Clear the snapshot kept in the plan to the beginning state."""
  520. self._snapshot_bundle = None
  521. self._snapshot_operator = None
  522. self._snapshot_stats = None
  523. def stats(self) -> DatasetStats:
  524. """Return stats for this plan.
  525. If the plan isn't executed, an empty stats object will be returned.
  526. """
  527. if not self._snapshot_stats:
  528. return DatasetStats(metadata={}, parent=None)
  529. return self._snapshot_stats
  530. def has_lazy_input(self) -> bool:
  531. """Return whether this plan has lazy input blocks."""
  532. return all(isinstance(op, Read) for op in self._logical_plan.sources())
  533. def has_computed_output(self) -> bool:
  534. """Whether this plan has a computed snapshot for the final operator, i.e. for
  535. the output of this plan.
  536. """
  537. return (
  538. self._snapshot_bundle is not None
  539. and self._snapshot_operator == self._logical_plan.dag
  540. )
  541. def require_preserve_order(self) -> bool:
  542. """Whether this plan requires to preserve order."""
  543. from ray.data._internal.logical.operators import Zip
  544. for op in self._logical_plan.dag.post_order_iter():
  545. if isinstance(op, Zip):
  546. return True
  547. return False