| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- from typing import Callable, List
- from .ruleset import Ruleset
- from ray.data._internal.logical.interfaces import (
- LogicalPlan,
- Optimizer,
- PhysicalPlan,
- Plan,
- Rule,
- )
- from ray.data._internal.logical.rules import (
- CombineShuffles,
- ConfigureMapTaskMemoryUsingOutputSize,
- FuseOperators,
- InheritBatchFormatRule,
- InheritTargetMaxBlockSizeRule,
- LimitPushdownRule,
- PredicatePushdown,
- ProjectionPushdown,
- SetReadParallelismRule,
- )
- from ray.util.annotations import DeveloperAPI
# Rule classes for the logical optimization pass. `LogicalOptimizer.rules`
# iterates this ruleset and instantiates each class with no arguments.
# NOTE(review): the listing order presumably matters to `Ruleset`; confirm
# before reordering entries.
_LOGICAL_RULESET = Ruleset(
    [
        InheritBatchFormatRule,
        LimitPushdownRule,
        ProjectionPushdown,
        PredicatePushdown,
        CombineShuffles,
    ]
)
# Rule classes for the physical optimization pass. `PhysicalOptimizer.rules`
# iterates this ruleset and instantiates each class with no arguments.
# NOTE(review): the listing order presumably matters to `Ruleset`; confirm
# before reordering entries.
_PHYSICAL_RULESET = Ruleset(
    [
        InheritTargetMaxBlockSizeRule,
        SetReadParallelismRule,
        FuseOperators,
        ConfigureMapTaskMemoryUsingOutputSize,
    ]
)
@DeveloperAPI
def get_logical_ruleset() -> Ruleset:
    """Return the module-level ruleset used for logical optimization.

    The same ``Ruleset`` object is returned on every call (no copy is made).
    """
    ruleset = _LOGICAL_RULESET
    return ruleset
@DeveloperAPI
def get_physical_ruleset() -> Ruleset:
    """Return the module-level ruleset used for physical optimization.

    The same ``Ruleset`` object is returned on every call (no copy is made).
    """
    ruleset = _PHYSICAL_RULESET
    return ruleset
class LogicalOptimizer(Optimizer):
    """Optimizer whose rules are drawn from the logical ruleset."""

    @property
    def rules(self) -> List[Rule]:
        """Build a new list of rule instances on each access.

        Returns:
            One instance per rule class yielded by ``get_logical_ruleset()``,
            in iteration order.
        """
        instances: List[Rule] = []
        for rule_type in get_logical_ruleset():
            # Each entry is a rule class; construct it with no arguments.
            instances.append(rule_type())
        return instances
class PhysicalOptimizer(Optimizer):
    """Optimizer whose rules are drawn from the physical ruleset."""

    @property
    def rules(self) -> List[Rule]:
        """Build a new list of rule instances on each access.

        Returns:
            One instance per rule class yielded by ``get_physical_ruleset()``,
            in iteration order.
        """
        instances: List[Rule] = []
        for rule_type in get_physical_ruleset():
            # Each entry is a rule class; construct it with no arguments.
            instances.append(rule_type())
        return instances
def get_plan_conversion_fns() -> List[Callable[[Plan], Plan]]:
    """Build the callables that turn a logical plan into an optimized
    physical plan.

    The returned list always has three entries, in this order:

    1. Logical optimization (``LogicalOptimizer().optimize``)
    2. Planning, logical -> physical operators (``create_planner().plan``)
    3. Physical optimization (``PhysicalOptimizer().optimize``)

    Returns:
        A list of transformation functions, each taking a Plan and
        returning a Plan.
    """
    # Kept as a function-scope import, matching the original placement.
    from ray.data._internal.planner import create_planner

    # Fresh optimizer/planner objects are created on every call.
    logical_step = LogicalOptimizer().optimize
    planning_step = create_planner().plan
    physical_step = PhysicalOptimizer().optimize

    return [logical_step, planning_step, physical_step]
def get_execution_plan(logical_plan: LogicalPlan) -> PhysicalPlan:
    """Produce the optimized physical plan for ``logical_plan``.

    The three stages come from ``get_plan_conversion_fns()``:

    (1) logical optimization: optimize logical operators.
    (2) planning: convert logical to physical operators.
    (3) physical optimization: optimize physical operators.

    Side effect: ``logical_plan._dag`` is overwritten with the DAG of the
    optimized logical plan before planning runs.

    Args:
        logical_plan: The logical plan to convert.

    Returns:
        The result of the physical optimization stage.
    """
    run_logical_opt, run_planner, run_physical_opt = get_plan_conversion_fns()

    # Logical -> Logical (optimized).
    optimized = run_logical_opt(logical_plan)

    # Write the optimized DAG back onto the caller's plan object.
    logical_plan._dag = optimized.dag

    # Logical (optimized) -> Physical -> Physical (optimized).
    return run_physical_opt(run_planner(optimized))
|