# optimizers.py (2.9 KB)
  1. from typing import Callable, List
  2. from .ruleset import Ruleset
  3. from ray.data._internal.logical.interfaces import (
  4. LogicalPlan,
  5. Optimizer,
  6. PhysicalPlan,
  7. Plan,
  8. Rule,
  9. )
  10. from ray.data._internal.logical.rules import (
  11. CombineShuffles,
  12. ConfigureMapTaskMemoryUsingOutputSize,
  13. FuseOperators,
  14. InheritBatchFormatRule,
  15. InheritTargetMaxBlockSizeRule,
  16. LimitPushdownRule,
  17. PredicatePushdown,
  18. ProjectionPushdown,
  19. SetReadParallelismRule,
  20. )
  21. from ray.util.annotations import DeveloperAPI
  22. _LOGICAL_RULESET = Ruleset(
  23. [
  24. InheritBatchFormatRule,
  25. LimitPushdownRule,
  26. ProjectionPushdown,
  27. PredicatePushdown,
  28. CombineShuffles,
  29. ]
  30. )
  31. _PHYSICAL_RULESET = Ruleset(
  32. [
  33. InheritTargetMaxBlockSizeRule,
  34. SetReadParallelismRule,
  35. FuseOperators,
  36. ConfigureMapTaskMemoryUsingOutputSize,
  37. ]
  38. )
  39. @DeveloperAPI
  40. def get_logical_ruleset() -> Ruleset:
  41. return _LOGICAL_RULESET
  42. @DeveloperAPI
  43. def get_physical_ruleset() -> Ruleset:
  44. return _PHYSICAL_RULESET
  45. class LogicalOptimizer(Optimizer):
  46. """The optimizer for logical operators."""
  47. @property
  48. def rules(self) -> List[Rule]:
  49. return [rule_cls() for rule_cls in get_logical_ruleset()]
  50. class PhysicalOptimizer(Optimizer):
  51. """The optimizer for physical operators."""
  52. @property
  53. def rules(self) -> List[Rule]:
  54. return [rule_cls() for rule_cls in get_physical_ruleset()]
  55. def get_plan_conversion_fns() -> List[Callable[[Plan], Plan]]:
  56. """Get the list of transformation functions to convert a logical plan
  57. to an optimized physical plan.
  58. This returns the 3 transformation steps:
  59. 1. Logical optimization
  60. 2. Planning (logical -> physical operators)
  61. 3. Physical optimization
  62. Returns:
  63. A list of transformation functions, each taking a Plan and returning a Plan.
  64. """
  65. from ray.data._internal.planner import create_planner
  66. return [
  67. LogicalOptimizer().optimize, # Logical optimization
  68. create_planner().plan, # Planning
  69. PhysicalOptimizer().optimize, # Physical optimization
  70. ]
  71. def get_execution_plan(logical_plan: LogicalPlan) -> PhysicalPlan:
  72. """Get the physical execution plan for the provided logical plan.
  73. This process has 3 steps:
  74. (1) logical optimization: optimize logical operators.
  75. (2) planning: convert logical to physical operators.
  76. (3) physical optimization: optimize physical operators.
  77. """
  78. # 1. Get planning functions
  79. optimize_logical, plan, optimize_physical = get_plan_conversion_fns()
  80. # 2. Logical -> Logical (Optimized)
  81. optimized_logical_plan = optimize_logical(logical_plan)
  82. # 3. Rewire Logical -> Logical (Optimized)
  83. logical_plan._dag = optimized_logical_plan.dag
  84. # 4. Logical (Optimized) -> Physical
  85. physical_plan = plan(optimized_logical_plan)
  86. # 5. Physical (Optimized) -> Physical
  87. return optimize_physical(physical_plan)