| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644 |
- import copy
- import itertools
- import logging
- from typing import TYPE_CHECKING, Iterator, List, Optional, Tuple, Type, Union
- import pyarrow
- import ray
- from ray._private.internal_api import get_memory_info_reply, get_state_from_address
- from ray.data._internal.execution.interfaces import RefBundle
- from ray.data._internal.logical.interfaces import SourceOperator
- from ray.data._internal.logical.interfaces.logical_operator import LogicalOperator
- from ray.data._internal.logical.interfaces.logical_plan import LogicalPlan
- from ray.data._internal.logical.interfaces.operator import Operator
- from ray.data._internal.logical.operators import Read
- from ray.data._internal.logical.optimizers import get_plan_conversion_fns
- from ray.data._internal.stats import DatasetStats
- from ray.data.block import BlockMetadataWithSchema, _take_first_non_empty_schema
- from ray.data.context import DataContext
- from ray.data.exceptions import omit_traceback_stdout
- from ray.util.debug import log_once
- if TYPE_CHECKING:
- from ray.data._internal.execution.streaming_executor import (
- StreamingExecutor,
- )
- from ray.data.dataset import Dataset
logger = logging.getLogger(__name__)

# Remote args that an operator may take over from the previous operator when
# it does not specify them itself. Currently this is only the scheduling
# strategy.
INHERITABLE_REMOTE_ARGS = ["scheduling_strategy"]
class ExecutionPlan:
    """A lazy execution plan for a Dataset.

    The plan describes a chain of ``List[RefBundle]`` --> ``List[RefBundle]``
    operators. Before execution, logical plan optimizations (operator fusion,
    for instance) are applied to cut down on Ray task overhead and data copies.

    The cached result of running the operator chain -- a computed list of
    blocks together with their metadata -- is kept as a snapshot in
    ``self._snapshot_bundle``.
    """

    def __init__(
        self,
        stats: DatasetStats,
        data_context: DataContext,
    ):
        """Create a plan with no transformation operators.

        Args:
            stats: Stats for the base blocks.
            data_context: :class:`~ray.data.context.DataContext`
                object to use for execution.
        """
        self._in_stats = stats
        self._context = data_context

        # Snapshot of a computed prefix of operators: the last operator covered,
        # its output blocks and the stats gathered while producing them.
        self._snapshot_operator: Optional[LogicalOperator] = None
        self._snapshot_stats = None
        self._snapshot_bundle = None

        # Metadata-only snapshot for the final operator's output bundles; the
        # source of truth for the Dataset's schema and count. It is calculated
        # and cached when the plan is executed as an iterator
        # (`execute_to_iterator()`), which avoids keeping every output block in
        # memory the way `self._snapshot_bundle` does.
        # TODO(scottjlee): To keep the caching logic consistent, update `execute()`
        # to also store the metadata in `_snapshot_metadata` instead of
        # `_snapshot_bundle`. For example, we could store the blocks in
        # `self._snapshot_blocks` and the metadata in `self._snapshot_metadata`.
        self._snapshot_metadata_schema: Optional["BlockMetadataWithSchema"] = None

        # Cached schema.
        self._schema = None

        # Set when a Dataset is constructed with this plan.
        self._dataset_uuid = None
        self._dataset_name = None

        # Index of the current execution; -1 until an executor is created.
        self._run_index = -1
        self._has_started_execution = False
def get_dataset_id(self) -> str:
    """Build the ID identifying the current execution of this dataset.

    The ID has the form ``<name>_<uuid>_<run index>``; ``<name>`` falls back
    to ``"dataset"`` when no dataset name is set.
    """
    name = self._dataset_name or "dataset"
    return f"{name}_{self._dataset_uuid}_{self._run_index}"
def create_executor(self) -> "StreamingExecutor":
    """Create a streaming executor for a new execution of this plan.

    Every call advances the run index, so the executor is handed a dataset ID
    belonging to this execution.
    """
    from ray.data._internal.execution.streaming_executor import StreamingExecutor

    # Advance the index first: `get_dataset_id()` reads it.
    self._run_index += 1
    return StreamingExecutor(self._context, self.get_dataset_id())
def __repr__(self) -> str:
    """Return a debugging representation with the dataset UUID and the
    operator the current snapshot belongs to."""
    fields = ", ".join(
        (
            f"dataset_uuid={self._dataset_uuid}",
            f"snapshot_operator={self._snapshot_operator}",
        )
    )
    return f"ExecutionPlan({fields})"
def explain(self) -> str:
    """Return a string representation of the logical and physical plan.

    The linked logical plan is passed through an identity step followed by the
    functions from ``get_plan_conversion_fns()``; the plan produced by each
    step is rendered under its own banner.
    """
    stage_titles = (
        "Logical Plan",
        "Logical Plan (Optimized)",
        "Physical Plan",
        "Physical Plan (Optimized)",
    )
    # The identity step renders the plan as linked; every later conversion is
    # applied to the result of the previous stage.
    stage_conversions = [lambda x: x] + get_plan_conversion_fns()

    rendered = []
    current_plan = self._logical_plan
    for stage_title, convert in zip(stage_titles, stage_conversions):
        current_plan = convert(current_plan)
        stage_text, _ = self.generate_plan_string(
            current_plan.dag, show_op_repr=True
        )
        rendered.append(f"\n-------- {stage_title} --------\n{stage_text}")
    return "".join(rendered)
@staticmethod
def generate_plan_string(
    op: Operator,
    curr_str: str = "",
    depth: int = 0,
    including_source: bool = True,
    show_op_repr: bool = False,
) -> Tuple[str, int]:
    """Traverse (DFS) the Plan DAG and return a string representation of the
    operators.

    Args:
        op: Operator to render, followed by its input dependencies.
        curr_str: Text accumulated so far; this operator's line is appended.
        depth: Depth of ``op`` in the traversal. Depth 0 is printed without a
            ``+-`` marker; deeper operators are indented 3 spaces per level
            beyond the first.
        including_source: If False, ``SourceOperator`` instances are not
            printed.
        show_op_repr: If True, print ``repr(op)`` instead of ``op.name``.

    Returns:
        Tuple of the accumulated string and the maximum depth reached. A
        skipped source operator still reports its own depth.
    """
    if not including_source and isinstance(op, SourceOperator):
        return curr_str, depth

    # For logical plan, only show the operator name like "Aggregate".
    # But for physical plan, show the operator class name as well like
    # "AllToAllOperator[Aggregate]".
    op_str = repr(op) if show_op_repr else op.name

    if depth == 0:
        curr_str += f"{op_str}\n"
    else:
        leading_indent = " " * ((depth - 1) * 3)
        curr_str += f"{leading_indent}+- {op_str}\n"

    max_depth = depth
    # `input_op` rather than `input`, so the builtin is not shadowed.
    for input_op in op.input_dependencies:
        curr_str, input_max_depth = ExecutionPlan.generate_plan_string(
            input_op, curr_str, depth + 1, including_source, show_op_repr
        )
        max_depth = max(max_depth, input_max_depth)
    return curr_str, max_depth
def get_plan_as_string(self, dataset_cls: Type["Dataset"]) -> str:
    """Create a cosmetic string representation of this execution plan.

    The result lists the operators that have no computed output yet (if any)
    as an indented tree, followed by a ``<dataset_cls name>(...)`` summary with
    the dataset name (when set), the block count (only for
    ``MaterializedDataset``), the row count and the schema. A summary longer
    than 80 characters is spread over several lines.

    Args:
        dataset_cls: Dataset class whose name heads the summary.

    Returns:
        The string representation of this execution plan.
    """
    # NOTE: this is used for Dataset.__repr__ to give a user-facing string
    # representation. Ideally ExecutionPlan.__repr__ should be replaced with this
    # method as well.

    from ray.data.dataset import MaterializedDataset

    # Do not force execution for schema, as this method is expected to be very
    # cheap.
    plan_str = ""
    plan_max_depth = 0
    if not self.has_computed_output():
        # The dataset summary stands in for the source here, so the source
        # operator is left out of the generated tree.
        plan_str, plan_max_depth = self.generate_plan_string(
            self._logical_plan.dag, including_source=False
        )

        if self._snapshot_bundle is not None:
            # This plan has executed some but not all operators.
            schema = self._snapshot_bundle.schema
            count = self._snapshot_bundle.num_rows()
        elif self._snapshot_metadata_schema is not None:
            schema = self._snapshot_metadata_schema.schema
            count = self._snapshot_metadata_schema.metadata.num_rows
        else:
            # This plan hasn't executed any operators.
            has_n_ary_operator = False
            dag = self._logical_plan.dag

            # Follow the single-input chain down to the source operator.
            while not isinstance(dag, SourceOperator):
                if len(dag.input_dependencies) > 1:
                    has_n_ary_operator = True
                    break

                dag = dag.input_dependencies[0]

            # TODO(@bveeramani): Handle schemas for n-ary operators like `Union`.
            if has_n_ary_operator:
                schema = None
                count = None
            else:
                assert isinstance(dag, SourceOperator), dag
                # Ask a throwaway plan containing only the source for its
                # schema and row count.
                plan = ExecutionPlan(
                    DatasetStats(metadata={}, parent=None),
                    self._context,
                )
                plan.link_logical_plan(LogicalPlan(dag, plan._context))
                schema = plan.schema()
                count = plan.meta_count()
    else:
        # Get schema of output blocks.
        schema = self.schema(fetch_if_missing=False)
        count = self._snapshot_bundle.num_rows()

    # Only a schema exposing `names`/`types` can be listed field by field; a
    # missing schema or a plain `type` is rendered as one opaque string.
    has_fields = schema is not None and not isinstance(schema, type)

    if schema is None:
        schema_str = "Unknown schema"
    elif isinstance(schema, type):
        schema_str = str(schema)
    else:
        fields = []
        for n, t in zip(schema.names, schema.types):
            if hasattr(t, "__name__"):
                t = t.__name__
            fields.append(f"{n}: {t}")
        schema_str = "{" + ", ".join(fields) + "}"

    if count is None:
        count = "?"

    num_blocks = None
    if dataset_cls == MaterializedDataset:
        num_blocks = self.initial_num_blocks()
        assert num_blocks is not None

    name_str = (
        "name={}, ".format(self._dataset_name)
        if self._dataset_name is not None
        else ""
    )
    num_blocks_str = f"num_blocks={num_blocks}, " if num_blocks else ""

    dataset_str = "{}({}{}num_rows={}, schema={})".format(
        dataset_cls.__name__,
        name_str,
        num_blocks_str,
        count,
        schema_str,
    )

    # If the resulting string representation fits in one line, use it directly.
    SCHEMA_LINE_CHAR_LIMIT = 80
    MIN_FIELD_LENGTH = 10
    INDENT_STR = " " * 3
    trailing_space = INDENT_STR * plan_max_depth

    if len(dataset_str) > SCHEMA_LINE_CHAR_LIMIT:
        # If the resulting string representation exceeds the line char limit,
        # first try breaking up each `Dataset` parameter into its own line
        # and check if each line fits within the line limit. We check the
        # `schema` param's length, since this is likely the longest string.
        schema_str_on_new_line = f"{trailing_space}{INDENT_STR}schema={schema_str}"
        # Without the `has_fields` guard, a deep plan with an unknown (or
        # `type`) schema would reach `schema.names` and raise AttributeError.
        if has_fields and len(schema_str_on_new_line) > SCHEMA_LINE_CHAR_LIMIT:
            # If the schema cannot fit on a single line, break up each field
            # into its own line.
            field_lines = []
            for n, t in zip(schema.names, schema.types):
                if hasattr(t, "__name__"):
                    t = t.__name__
                col_str = f"{trailing_space}{INDENT_STR * 2}{n}: {t}"
                # If the field line exceeds the char limit, abbreviate
                # the field name to fit while maintaining the full type
                if len(col_str) > SCHEMA_LINE_CHAR_LIMIT:
                    shortened_suffix = f"...: {str(t)}"
                    # Show at least 10 characters of the field name, even if
                    # we have already hit the line limit with the type.
                    chars_left_for_col_name = max(
                        SCHEMA_LINE_CHAR_LIMIT - len(shortened_suffix),
                        MIN_FIELD_LENGTH,
                    )
                    col_str = (
                        f"{col_str[:chars_left_for_col_name]}{shortened_suffix}"
                    )
                field_lines.append(col_str)
            schema_str = (
                "{\n"
                + ",\n".join(field_lines)
                + f"\n{trailing_space}{INDENT_STR}"
                + "}"
            )
        name_str = (
            f"\n{trailing_space}{INDENT_STR}name={self._dataset_name},"
            if self._dataset_name is not None
            else ""
        )
        num_blocks_str = (
            f"\n{trailing_space}{INDENT_STR}num_blocks={num_blocks},"
            if num_blocks
            else ""
        )
        dataset_str = (
            f"{dataset_cls.__name__}("
            f"{name_str}"
            f"{num_blocks_str}"
            f"\n{trailing_space}{INDENT_STR}num_rows={count},"
            f"\n{trailing_space}{INDENT_STR}schema={schema_str}"
            f"\n{trailing_space})"
        )

    if plan_max_depth == 0:
        plan_str += dataset_str
    else:
        # Hang the summary off the deepest level of the operator tree.
        plan_str += f"{INDENT_STR * (plan_max_depth - 1)}+- {dataset_str}"
    return plan_str
def link_logical_plan(self, logical_plan: "LogicalPlan"):
    """Link the logical plan into this execution plan.

    This is used for triggering execution for optimizer code path in this legacy
    execution plan. The logical plan's context is replaced by this plan's
    context.
    """
    # This plan's context overrides whatever the logical plan carried.
    logical_plan._context = self._context
    self._logical_plan = logical_plan
def copy(self) -> "ExecutionPlan":
    """Create a shallow copy of this execution plan.

    This copy can be executed without mutating the original, but clearing the copy
    will also clear the original.

    Returns:
        A shallow copy of this execution plan.
    """
    duplicate = ExecutionPlan(self._in_stats, data_context=self._context)
    duplicate._dataset_name = self._dataset_name
    if self._snapshot_bundle is None:
        # Nothing computed yet, so there is no snapshot to carry over.
        return duplicate

    # Share the existing snapshot objects with the copy.
    duplicate._snapshot_bundle = self._snapshot_bundle
    duplicate._snapshot_operator = self._snapshot_operator
    duplicate._snapshot_stats = self._snapshot_stats
    return duplicate
def deep_copy(self) -> "ExecutionPlan":
    """Create a deep copy of this execution plan.

    This copy can be executed AND cleared without mutating the original.
    The input stats and the snapshot attributes are each duplicated with
    ``copy.copy`` (one level deep) and the data context with its own
    ``copy()`` method.

    Returns:
        A deep copy of this execution plan.
    """
    plan_copy = ExecutionPlan(
        copy.copy(self._in_stats),
        data_context=self._context.copy(),
    )
    # Compare against None, as `copy()` does: a truthiness test would skip a
    # snapshot whose bundle evaluates as falsy (e.g. one without blocks).
    if self._snapshot_bundle is not None:
        # Copy over the existing snapshot.
        plan_copy._snapshot_bundle = copy.copy(self._snapshot_bundle)
        plan_copy._snapshot_operator = copy.copy(self._snapshot_operator)
        plan_copy._snapshot_stats = copy.copy(self._snapshot_stats)
    plan_copy._dataset_name = self._dataset_name
    return plan_copy
def initial_num_blocks(self) -> Optional[int]:
    """Get the estimated number of blocks from the logical plan
    after applying execution plan optimizations, but prior to
    fully executing the dataset.

    Returns:
        Whatever ``estimated_num_outputs()`` of the logical plan's DAG reports.
    """
    dag = self._logical_plan.dag
    return dag.estimated_num_outputs()
def schema(
    self, fetch_if_missing: bool = False
) -> Optional[Union[type, "pyarrow.lib.Schema"]]:
    """Get the schema after applying all execution plan optimizations,
    but prior to fully executing the dataset
    (unless `fetch_if_missing` is set to True).

    A cached schema is returned directly. Otherwise the schema comes from the
    computed snapshot if there is one, else from the logical plan's
    ``infer_schema()``. The result (even ``None``) is stored through
    ``cache_schema()``.

    Args:
        fetch_if_missing: Whether to execute the plan to fetch the schema
            when it cannot be determined otherwise.

    Returns:
        The schema of the output dataset, or None if it is unknown.
    """
    if self._schema is not None:
        return self._schema

    if self.has_computed_output():
        schema = self._snapshot_bundle.schema
    else:
        schema = self._logical_plan.dag.infer_schema()

    # Execution is only triggered when the caller explicitly asked for it.
    if schema is None and fetch_if_missing:
        import contextlib

        iter_ref_bundles, _, executor = self.execute_to_iterator()

        # `execute_to_iterator()` returns no executor when it serves the cached
        # snapshot; entering `None` as a context manager would raise, so use a
        # no-op context in that case.
        shutdown_guard = (
            executor if executor is not None else contextlib.nullcontext()
        )
        # Make sure executor is fully shutdown upon exiting
        with shutdown_guard:
            schema = _take_first_non_empty_schema(
                bundle.schema for bundle in iter_ref_bundles
            )
    self.cache_schema(schema)
    return self._schema
def cache_schema(self, schema: Union[type, "pyarrow.lib.Schema"]):
    """Store ``schema`` as this plan's cached schema.

    Args:
        schema: The schema to cache.
    """
    self._schema = schema
def input_files(self) -> Optional[List[str]]:
    """Get the input files of the dataset, if available.

    Returns:
        The ``input_files`` entry of the metadata inferred by the logical
        plan's DAG.
    """
    inferred_metadata = self._logical_plan.dag.infer_metadata()
    return inferred_metadata.input_files
def meta_count(self) -> Optional[int]:
    """Get the number of rows after applying all plan optimizations, if possible.

    This method will never trigger any computation.

    Returns:
        The number of records of the result Dataset, or None. With a computed
        snapshot this is the sum of the per-block row counts, or None as soon
        as one block has no row count; otherwise it is the row count inferred
        by the logical plan's DAG.
    """
    dag = self._logical_plan.dag
    if self.has_computed_output():
        total = 0
        for block_metadata in self._snapshot_bundle.metadata:
            # A block without a row count makes the total unknown; summing
            # `None` would raise a TypeError.
            if block_metadata.num_rows is None:
                return None
            total += block_metadata.num_rows
        return total
    # Already None when the row count cannot be inferred.
    return dag.infer_metadata().num_rows
@omit_traceback_stdout
def execute_to_iterator(
    self,
) -> Tuple[Iterator[RefBundle], DatasetStats, Optional["StreamingExecutor"]]:
    """Execute this plan, returning an iterator.

    This will use streaming execution to generate outputs. When the plan's
    output is already computed, the cached snapshot is served instead and no
    executor is returned.

    NOTE: Executor will be shutdown upon either of the 2 following conditions:

        - Iterator is fully exhausted (ie until StopIteration is raised)
        - Executor instance is garbage-collected

    Returns:
        Tuple of iterator over output RefBundles, DatasetStats, and the executor
        (None when the cached snapshot was used).
    """
    self._has_started_execution = True

    if self.has_computed_output():
        # `execute()` hands back the cached snapshot bundle in this case.
        return iter([self.execute()]), self._snapshot_stats, None

    from ray.data._internal.execution.legacy_compat import (
        execute_to_legacy_bundle_iterator,
    )

    executor = self.create_executor()
    bundle_iter = execute_to_legacy_bundle_iterator(executor, self)

    # Since the generator doesn't run any code until we try to fetch the first
    # value, force execution of one bundle before we call get_stats().
    gen = iter(bundle_iter)
    try:
        first_bundle = next(gen)
    except StopIteration:
        # No output at all: hand back the iterator unchanged.
        pass
    else:
        # Put the bundle that was pulled eagerly back in front.
        bundle_iter = itertools.chain([first_bundle], gen)

    self._snapshot_stats = executor.get_stats()
    return bundle_iter, self._snapshot_stats, executor
@omit_traceback_stdout
def execute(
    self,
    preserve_order: bool = False,
) -> RefBundle:
    """Executes this plan (eagerly).

    If the plan's output is already computed, the cached snapshot bundle is
    returned as is. Otherwise the output is produced, stats are collected, and
    the result is stored as the plan's snapshot.

    Args:
        preserve_order: Whether to preserve order in execution.

    Returns:
        The blocks of the output dataset.
    """
    # NOTE(review): the block nesting below was reconstructed from a listing
    # that had lost its indentation -- confirm against the canonical file, in
    # particular that the stats lookup follows (rather than sits inside) the
    # `with` block and that the memory-stats section applies to both branches.
    self._has_started_execution = True

    # Always use the saved context for execution.
    context = self._context

    # Warn (once) when the cluster reports no available CPUs.
    if not ray.available_resources().get("CPU"):
        if log_once("cpu_warning"):
            logger.warning(
                "Warning: The Ray cluster currently does not have "
                "any available CPUs. The Dataset job will hang unless more CPUs "
                "are freed up. A common reason is that cluster resources are "
                "used by Actors or Tune trials; see the following link "
                "for more details: "
                "https://docs.ray.io/en/latest/data/data-internals.html#ray-data-and-tune"  # noqa: E501
            )
    if not self.has_computed_output():
        from ray.data._internal.execution.legacy_compat import (
            _get_initial_stats_from_plan,
            execute_to_legacy_block_list,
        )

        if (
            isinstance(self._logical_plan.dag, SourceOperator)
            and self._logical_plan.dag.output_data() is not None
        ):
            # If the data is already materialized (e.g., `from_pandas`), we can
            # skip execution and directly return the output data. This avoids
            # recording unnecessary metrics for an empty plan execution.
            stats = _get_initial_stats_from_plan(self)

            # TODO(@bveeramani): Make `ExecutionPlan.execute()` return
            # `List[RefBundle]` instead of `RefBundle`. Among other reasons, it'd
            # allow us to remove the unwrapping logic below.
            output_bundles = self._logical_plan.dag.output_data()
            # The merged bundle owns its blocks only if every input bundle does.
            owns_blocks = all(bundle.owns_blocks for bundle in output_bundles)
            schema = _take_first_non_empty_schema(
                bundle.schema for bundle in output_bundles
            )
            # Flatten the (block, metadata) pairs of all bundles into one bundle.
            bundle = RefBundle(
                [
                    (block, metadata)
                    for bundle in output_bundles
                    for block, metadata in bundle.blocks
                ],
                owns_blocks=owns_blocks,
                schema=schema,
            )
        else:
            # Make sure executor is properly shutdown
            with self.create_executor() as executor:
                blocks = execute_to_legacy_block_list(
                    executor,
                    self,
                    dataset_uuid=self._dataset_uuid,
                    preserve_order=preserve_order,
                )
                bundle = RefBundle(
                    tuple(blocks.iter_blocks_with_metadata()),
                    owns_blocks=blocks._owned_by_consumer,
                    schema=blocks.get_schema(),
                )

            stats = executor.get_stats()
            stats_summary_string = stats.to_summary().to_string(
                include_parent=False
            )
            if context.enable_auto_log_stats:
                logger.info(stats_summary_string)

        # Retrieve memory-related stats from ray.
        # Best effort: any failure is only logged at debug level.
        try:
            reply = get_memory_info_reply(
                get_state_from_address(ray.get_runtime_context().gcs_address)
            )
            if reply.store_stats.spill_time_total_s > 0:
                stats.global_bytes_spilled = int(
                    reply.store_stats.spilled_bytes_total
                )
            if reply.store_stats.restore_time_total_s > 0:
                stats.global_bytes_restored = int(
                    reply.store_stats.restored_bytes_total
                )
        except Exception as e:
            logger.debug(
                "Skipping recording memory spilled and restored statistics due to "
                f"exception: {e}"
            )

        stats.dataset_bytes_spilled = 0

        # Add up "obj_store_mem_spilled" over these stats and all their parents.
        def collect_stats(cur_stats):
            stats.dataset_bytes_spilled += cur_stats.extra_metrics.get(
                "obj_store_mem_spilled", 0
            )
            for parent in cur_stats.parents:
                collect_stats(parent)

        collect_stats(stats)

        # Set the snapshot to the output of the final operator.
        self._snapshot_bundle = bundle
        self._snapshot_operator = self._logical_plan.dag
        self._snapshot_stats = stats
        self._snapshot_stats.dataset_uuid = self._dataset_uuid

    return self._snapshot_bundle
@property
def has_started_execution(self) -> bool:
    """Return ``True`` if this plan has been partially or fully executed.

    The underlying flag is set by ``execute()`` and ``execute_to_iterator()``.
    """
    started = self._has_started_execution
    return started
def clear_snapshot(self) -> None:
    """Clear the snapshot kept in the plan to the beginning state.

    Resets the snapshot bundle, the snapshot operator and the snapshot stats
    to ``None``.
    """
    self._snapshot_bundle = self._snapshot_operator = self._snapshot_stats = None
def stats(self) -> DatasetStats:
    """Return stats for this plan.

    If the plan isn't executed, an empty stats object will be returned.
    """
    snapshot_stats = self._snapshot_stats
    if snapshot_stats:
        return snapshot_stats
    # No snapshot stats recorded: hand out a fresh, empty stats object.
    return DatasetStats(metadata={}, parent=None)
def has_lazy_input(self) -> bool:
    """Return whether this plan has lazy input blocks.

    This is the case when every source of the logical plan is a ``Read``
    operator (and trivially when the plan reports no sources).
    """
    for source_op in self._logical_plan.sources():
        if not isinstance(source_op, Read):
            return False
    return True
def has_computed_output(self) -> bool:
    """Whether this plan has a computed snapshot for the final operator, i.e. for
    the output of this plan.
    """
    if self._snapshot_bundle is None:
        return False
    # A snapshot only counts if it belongs to the plan's current final operator.
    return self._snapshot_operator == self._logical_plan.dag
def require_preserve_order(self) -> bool:
    """Whether this plan requires to preserve order.

    True as soon as the logical plan's DAG contains a ``Zip`` operator.
    """
    from ray.data._internal.logical.operators import Zip

    return any(
        isinstance(op, Zip) for op in self._logical_plan.dag.post_order_iter()
    )
|