Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 96 additions & 3 deletions datafusion/core/tests/physical_optimizer/pushdown_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,26 @@
//! 4. Early termination is enabled for TopK queries
//! 5. Prefix matching works correctly

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::prelude::SessionContext;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Result, assert_batches_eq};
use datafusion_physical_expr::expressions;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::pushdown_sort::PushdownSort;
use datafusion_physical_plan::collect;
use std::sync::Arc;

use crate::physical_optimizer::test_utils::{
OptimizationTest, TestScan, coalesce_partitions_exec, parquet_exec,
parquet_exec_with_sort, projection_exec, projection_exec_with_alias,
OptimizationTest, TestScan, coalesce_partitions_exec, inexact_memory_exec,
parquet_exec, parquet_exec_with_sort, projection_exec, projection_exec_with_alias,
repartition_exec, schema, simple_projection_exec, sort_exec, sort_exec_with_fetch,
sort_expr, sort_expr_named, test_scan_with_ordering,
sort_exec_with_fetch_and_preserve_partitioning, sort_expr, sort_expr_named,
test_scan_with_ordering,
};

#[test]
Expand Down Expand Up @@ -119,6 +127,91 @@ fn test_sort_with_limit_phase1() {
);
}

#[test]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have positive tests now to add spm, may be also add some negative test which will not add spm.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, added a single-partition negative regression test that keeps the standalone TopK without adding SortPreservingMergeExec.

fn test_standalone_inexact_partitioned_topk_adds_global_merge() {
// Inexact pushdown keeps the SortExec. If that SortExec is a
// partition-preserving TopK, it still needs a final merge across partitions
// to preserve the global ORDER BY ... LIMIT semantics.
let schema = schema();
let a = sort_expr("a", &schema);
let source = Arc::new(TestScan::new(schema.clone(), vec![]).with_partition_count(2));

let ordering = LexOrdering::new(vec![a]).unwrap();
let plan = sort_exec_with_fetch_and_preserve_partitioning(ordering, Some(10), source);

insta::assert_snapshot!(
OptimizationTest::new(plan, PushdownSort::new(), true),
@r"
OptimizationTest:
input:
- SortExec: TopK(fetch=10), expr=[a@0 ASC], preserve_partitioning=[true]
- TestScan
output:
Ok:
- SortPreservingMergeExec: [a@0 ASC], fetch=10
- SortExec: TopK(fetch=10), expr=[a@0 ASC], preserve_partitioning=[true]
- TestScan: requested_ordering=[a@0 ASC]
"
);
}

#[test]
fn test_standalone_inexact_single_partition_topk_no_global_merge() {
let schema = schema();
let a = sort_expr("a", &schema);
let source = Arc::new(TestScan::new(schema.clone(), vec![]).with_partition_count(1));

let ordering = LexOrdering::new(vec![a]).unwrap();
let plan = sort_exec_with_fetch_and_preserve_partitioning(ordering, Some(10), source);

insta::assert_snapshot!(
OptimizationTest::new(plan, PushdownSort::new(), true),
@r"
OptimizationTest:
input:
- SortExec: TopK(fetch=10), expr=[a@0 ASC], preserve_partitioning=[true]
- TestScan
output:
Ok:
- SortExec: TopK(fetch=10), expr=[a@0 ASC], preserve_partitioning=[true]
- TestScan: requested_ordering=[a@0 ASC]
"
);
}

#[tokio::test]
async fn test_standalone_inexact_partitioned_topk_returns_global_limit() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
let partition_0 = RecordBatch::try_new(
Arc::clone(&schema),
vec![Arc::new(Int64Array::from(vec![1, 100])) as ArrayRef],
)?;
let partition_1 = RecordBatch::try_new(
Arc::clone(&schema),
vec![Arc::new(Int64Array::from(vec![2, 3])) as ArrayRef],
)?;
let source = inexact_memory_exec(
&[vec![partition_0], vec![partition_1]],
Arc::clone(&schema),
)?;

let ordering = LexOrdering::new(vec![sort_expr("a", &schema)]).unwrap();
let plan = sort_exec_with_fetch_and_preserve_partitioning(ordering, Some(3), source);

let mut config = ConfigOptions::new();
config.optimizer.enable_sort_pushdown = true;
let optimized = PushdownSort::new().optimize(plan, &config)?;

let ctx = SessionContext::new();
let batches = collect(optimized, ctx.task_ctx()).await?;

let expected = [
"+---+", "| a |", "+---+", "| 1 |", "| 2 |", "| 3 |", "+---+",
];
assert_batches_eq!(expected, &batches);
Ok(())
}

#[test]
fn test_sort_multiple_columns_phase1() {
// Phase 1: Sort on multiple columns - reverse multi-column ordering
Expand Down
94 changes: 93 additions & 1 deletion datafusion/core/tests/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use arrow::record_batch::RecordBatch;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::physical_plan::ParquetSource;
use datafusion::datasource::source::DataSourceExec;
use datafusion::datasource::source::{DataSource, DataSourceExec};
use datafusion_common::config::ConfigOptions;
use datafusion_common::stats::Precision;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
Expand All @@ -43,6 +43,7 @@ use datafusion_functions_aggregate::count::count_udaf;
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions::{self, col};
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::{
LexOrdering, OrderingRequirements, PhysicalSortExpr,
Expand Down Expand Up @@ -230,6 +231,16 @@ pub fn memory_exec(schema: &SchemaRef) -> Arc<dyn ExecutionPlan> {
MemorySourceConfig::try_new_exec(&[vec![]], Arc::clone(schema), None).unwrap()
}

pub fn inexact_memory_exec(
partitions: &[Vec<RecordBatch>],
schema: SchemaRef,
) -> Result<Arc<dyn ExecutionPlan>> {
let source = InexactMemorySource {
inner: MemorySourceConfig::try_new(partitions, schema, None)?,
};
Ok(Arc::new(DataSourceExec::new(Arc::new(source))))
}

pub fn hash_join_exec(
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
Expand Down Expand Up @@ -373,6 +384,18 @@ pub fn sort_exec_with_preserve_partitioning(
Arc::new(SortExec::new(ordering, input).with_preserve_partitioning(true))
}

pub fn sort_exec_with_fetch_and_preserve_partitioning(
ordering: LexOrdering,
fetch: Option<usize>,
input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
Arc::new(
SortExec::new(ordering, input)
.with_fetch(fetch)
.with_preserve_partitioning(true),
)
}

pub fn sort_exec_with_fetch(
ordering: LexOrdering,
fetch: Option<usize>,
Expand Down Expand Up @@ -893,6 +916,18 @@ impl TestScan {
self.supports_fetch = supports;
self
}

/// Set the number of output partitions reported by this scan.
pub fn with_partition_count(mut self, partition_count: usize) -> Self {
let eq_properties = self.plan_properties.equivalence_properties().clone();
self.plan_properties = Arc::new(PlanProperties::new(
eq_properties,
Partitioning::UnknownPartitioning(partition_count),
EmissionType::Incremental,
Boundedness::Bounded,
));
self
}
}

impl DisplayAs for TestScan {
Expand Down Expand Up @@ -1028,3 +1063,60 @@ pub fn test_scan_with_ordering(
) -> Arc<dyn ExecutionPlan> {
Arc::new(TestScan::with_ordering(schema, ordering))
}

#[derive(Debug, Clone)]
struct InexactMemorySource {
inner: MemorySourceConfig,
}

impl DataSource for InexactMemorySource {
fn open(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
self.inner.open(partition, context)
}

fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
self.inner.fmt_as(t, f)
}

fn output_partitioning(&self) -> Partitioning {
self.inner.output_partitioning()
}

fn eq_properties(&self) -> EquivalenceProperties {
self.inner.eq_properties()
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
self.inner.partition_statistics(partition)
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
let mut new_source = self.clone();
new_source.inner = new_source.inner.with_limit(limit);
Some(Arc::new(new_source))
}

fn fetch(&self) -> Option<usize> {
self.inner.fetch()
}

fn try_swapping_with_projection(
&self,
_projection: &ProjectionExprs,
) -> Result<Option<Arc<dyn DataSource>>> {
Ok(None)
}

fn try_pushdown_sort(
&self,
_order: &[PhysicalSortExpr],
) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
Ok(SortOrderPushdownResult::Inexact {
inner: Arc::new(self.clone()),
})
}
}
48 changes: 40 additions & 8 deletions datafusion/physical-optimizer/src/pushdown_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,15 @@
use crate::PhysicalOptimizerRule;
use datafusion_common::Result;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_plan::ExecutionPlan;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
use datafusion_physical_plan::SortOrderPushdownResult;
use datafusion_physical_plan::buffer::BufferExec;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
use std::sync::Arc;

/// A PhysicalOptimizerRule that attempts to push down sort requirements to data sources.
Expand Down Expand Up @@ -133,7 +135,15 @@ impl PhysicalOptimizerRule for PushdownSort {
Arc::new(new_sort),
)
.with_fetch(spm.fetch());
return Ok(Transformed::yes(Arc::new(new_spm)));
// The replacement already has the required
// `SortPreservingMergeExec` parent. Do not descend
// into its `SortExec` child and treat it as a
// standalone TopK.
return Ok(Transformed::new(
Arc::new(new_spm),
true,
TreeNodeRecursion::Jump,
));
}
SortOrderPushdownResult::Unsupported => {
return Ok(Transformed::no(plan));
Expand Down Expand Up @@ -172,13 +182,35 @@ impl PhysicalOptimizerRule for PushdownSort {
// Data source is optimized for the ordering but not perfectly sorted
// Keep the Sort operator but use the optimized input
// Benefits: TopK queries can terminate early, better cache locality
Ok(Transformed::yes(Arc::new(
// A standalone multi-partition TopK still needs a global
// merge; otherwise a later coalesce can concatenate
// locally sorted partitions.
let preserve_partitioning = sort_exec.preserve_partitioning();
let needs_global_topk =
preserve_partitioning && sort_exec.fetch().is_some();
let input_partitions = inner.output_partitioning().partition_count();
let new_sort: Arc<dyn ExecutionPlan> = Arc::new(
SortExec::new(required_ordering.clone(), inner)
.with_fetch(sort_exec.fetch())
.with_preserve_partitioning(
sort_exec.preserve_partitioning(),
),
)))
.with_preserve_partitioning(preserve_partitioning),
);
if needs_global_topk && input_partitions > 1 {
let new_spm = SortPreservingMergeExec::new(
required_ordering.clone(),
new_sort,
)
.with_fetch(sort_exec.fetch());
// Do not descend into the newly inserted
// `SortExec`, or this standalone branch will wrap it
// in another `SortPreservingMergeExec`.
Ok(Transformed::new(
Arc::new(new_spm),
true,
TreeNodeRecursion::Jump,
))
} else {
Ok(Transformed::yes(new_sort))
}
}
SortOrderPushdownResult::Unsupported => {
// Cannot optimize for this ordering - no change
Expand Down
Loading