Skip to content

Commit af5f470

Browse files
authored
Support pushing down empty projections into joins (#20191)
## Which issue does this PR close? - Closes #20190. ## Rationale for this change We should push down empty projections into HashJoinExec ## What changes are included in this PR? 1. try_embed_projection should embed empty projections 2. build_batch_empty_build_side should support empty schemas ## Are these changes tested? Yes ## Are there any user-facing changes? No
1 parent 2cca3fa commit af5f470

File tree

6 files changed

+182
-64
lines changed

6 files changed

+182
-64
lines changed

datafusion/core/tests/physical_optimizer/projection_pushdown.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1723,3 +1723,47 @@ fn test_cooperative_exec_after_projection() -> Result<()> {
17231723

17241724
Ok(())
17251725
}
1726+
1727+
#[test]
1728+
fn test_hash_join_empty_projection_embeds() -> Result<()> {
1729+
let left_csv = create_simple_csv_exec();
1730+
let right_csv = create_simple_csv_exec();
1731+
1732+
let join = Arc::new(HashJoinExec::try_new(
1733+
left_csv,
1734+
right_csv,
1735+
vec![(Arc::new(Column::new("a", 0)), Arc::new(Column::new("a", 0)))],
1736+
None,
1737+
&JoinType::Right,
1738+
None,
1739+
PartitionMode::CollectLeft,
1740+
NullEquality::NullEqualsNothing,
1741+
false,
1742+
)?);
1743+
1744+
// Empty projection: no columns needed from the join output
1745+
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
1746+
vec![] as Vec<ProjectionExpr>,
1747+
join,
1748+
)?);
1749+
1750+
let after_optimize =
1751+
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
1752+
let after_optimize_string = displayable(after_optimize.as_ref())
1753+
.indent(true)
1754+
.to_string();
1755+
let actual = after_optimize_string.trim();
1756+
1757+
// The empty projection should be embedded into the HashJoinExec,
1758+
// resulting in projection=[] on the join and no ProjectionExec wrapper.
1759+
assert_snapshot!(
1760+
actual,
1761+
@r"
1762+
HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@0, a@0)], projection=[]
1763+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
1764+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
1765+
"
1766+
);
1767+
1768+
Ok(())
1769+
}

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -977,6 +977,17 @@ pub(crate) fn apply_join_filter_to_indices(
977977
))
978978
}
979979

980+
/// Creates a [RecordBatch] with zero columns but the given row count.
981+
/// Used when a join has an empty projection (e.g. `SELECT count(1) ...`).
982+
fn new_empty_schema_batch(schema: &Schema, row_count: usize) -> Result<RecordBatch> {
983+
let options = RecordBatchOptions::new().with_row_count(Some(row_count));
984+
Ok(RecordBatch::try_new_with_options(
985+
Arc::new(schema.clone()),
986+
vec![],
987+
&options,
988+
)?)
989+
}
990+
980991
/// Returns a new [RecordBatch] by combining the `left` and `right` according to `indices`.
981992
/// The resulting batch has [Schema] `schema`.
982993
pub(crate) fn build_batch_from_indices(
@@ -989,15 +1000,7 @@ pub(crate) fn build_batch_from_indices(
9891000
build_side: JoinSide,
9901001
) -> Result<RecordBatch> {
9911002
if schema.fields().is_empty() {
992-
let options = RecordBatchOptions::new()
993-
.with_match_field_names(true)
994-
.with_row_count(Some(build_indices.len()));
995-
996-
return Ok(RecordBatch::try_new_with_options(
997-
Arc::new(schema.clone()),
998-
vec![],
999-
&options,
1000-
)?);
1003+
return new_empty_schema_batch(schema, build_indices.len());
10011004
}
10021005

10031006
// build the columns of the new [RecordBatch]:
@@ -1057,6 +1060,9 @@ pub(crate) fn build_batch_empty_build_side(
10571060
// the remaining joins will return data for the right columns and null for the left ones
10581061
JoinType::Right | JoinType::Full | JoinType::RightAnti | JoinType::RightMark => {
10591062
let num_rows = probe_batch.num_rows();
1063+
if schema.fields().is_empty() {
1064+
return new_empty_schema_batch(schema, num_rows);
1065+
}
10601066
let mut columns: Vec<Arc<dyn Array>> =
10611067
Vec::with_capacity(schema.fields().len());
10621068

@@ -2889,4 +2895,35 @@ mod tests {
28892895

28902896
Ok(())
28912897
}
2898+
2899+
#[test]
2900+
fn test_build_batch_empty_build_side_empty_schema() -> Result<()> {
2901+
// When the output schema has no fields (empty projection pushed into
2902+
// the join), build_batch_empty_build_side should return a RecordBatch
2903+
// with the correct row count but no columns.
2904+
let empty_schema = Schema::empty();
2905+
2906+
let build_batch = RecordBatch::try_new(
2907+
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
2908+
vec![Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3]))],
2909+
)?;
2910+
2911+
let probe_batch = RecordBatch::try_new(
2912+
Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, true)])),
2913+
vec![Arc::new(arrow::array::Int32Array::from(vec![4, 5, 6, 7]))],
2914+
)?;
2915+
2916+
let result = build_batch_empty_build_side(
2917+
&empty_schema,
2918+
&build_batch,
2919+
&probe_batch,
2920+
&[], // no column indices with empty projection
2921+
JoinType::Right,
2922+
)?;
2923+
2924+
assert_eq!(result.num_rows(), 4);
2925+
assert_eq!(result.num_columns(), 0);
2926+
2927+
Ok(())
2928+
}
28922929
}

datafusion/physical-plan/src/projection.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -553,6 +553,15 @@ impl RecordBatchStream for ProjectionStream {
553553
}
554554
}
555555

556+
/// Trait for execution plans that can embed a projection, avoiding a separate
557+
/// [`ProjectionExec`] wrapper.
558+
///
559+
/// # Empty projections
560+
///
561+
/// `Some(vec![])` is a valid projection that produces zero output columns while
562+
/// preserving the correct row count. Implementors must ensure that runtime batch
563+
/// construction still returns batches with the right number of rows even when no
564+
/// columns are selected (e.g. for `SELECT count(1) … JOIN …`).
556565
pub trait EmbeddedProjection: ExecutionPlan + Sized {
557566
fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self>;
558567
}
@@ -563,6 +572,15 @@ pub fn try_embed_projection<Exec: EmbeddedProjection + 'static>(
563572
projection: &ProjectionExec,
564573
execution_plan: &Exec,
565574
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
575+
// If the projection has no expressions at all (e.g., ProjectionExec: expr=[]),
576+
// embed an empty projection into the execution plan so it outputs zero columns.
577+
// This avoids allocating throwaway null arrays for build-side columns
578+
// when no output columns are actually needed (e.g., count(1) over a right join).
579+
if projection.expr().is_empty() {
580+
let new_execution_plan = Arc::new(execution_plan.with_projection(Some(vec![]))?);
581+
return Ok(Some(new_execution_plan));
582+
}
583+
566584
// Collect all column indices from the given projection expressions.
567585
let projection_index = collect_column_indices(projection.expr());
568586

datafusion/sqllogictest/test_files/array.slt

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6606,10 +6606,9 @@ physical_plan
66066606
02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
66076607
03)----CoalescePartitionsExec
66086608
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
6609-
05)--------ProjectionExec: expr=[]
6610-
06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
6611-
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6612-
08)--------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
6609+
05)--------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c]), projection=[]
6610+
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6611+
07)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
66136612

66146613
query I
66156614
with test AS (SELECT substr(md5(i::text)::text, 1, 32) as needle FROM generate_series(1, 100000) t(i))
@@ -6634,10 +6633,9 @@ physical_plan
66346633
02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
66356634
03)----CoalescePartitionsExec
66366635
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
6637-
05)--------ProjectionExec: expr=[]
6638-
06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
6639-
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6640-
08)--------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
6636+
05)--------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c]), projection=[]
6637+
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6638+
07)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
66416639

66426640
query I
66436641
with test AS (SELECT substr(md5(i::text)::text, 1, 32) as needle FROM generate_series(1, 100000) t(i))
@@ -6662,10 +6660,9 @@ physical_plan
66626660
02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
66636661
03)----CoalescePartitionsExec
66646662
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
6665-
05)--------ProjectionExec: expr=[]
6666-
06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
6667-
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6668-
08)--------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
6663+
05)--------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c]), projection=[]
6664+
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6665+
07)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
66696666

66706667
query I
66716668
with test AS (SELECT substr(md5(i::text)::text, 1, 32) as needle FROM generate_series(1, 100000) t(i))
@@ -6690,10 +6687,9 @@ physical_plan
66906687
02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
66916688
03)----CoalescePartitionsExec
66926689
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
6693-
05)--------ProjectionExec: expr=[]
6694-
06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
6695-
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6696-
08)--------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
6690+
05)--------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c]), projection=[]
6691+
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6692+
07)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
66976693

66986694
query I
66996695
with test AS (SELECT substr(md5(i::text)::text, 1, 32) as needle FROM generate_series(1, 100000) t(i))
@@ -6718,10 +6714,9 @@ physical_plan
67186714
02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
67196715
03)----CoalescePartitionsExec
67206716
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
6721-
05)--------ProjectionExec: expr=[]
6722-
06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
6723-
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6724-
08)--------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
6717+
05)--------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c]), projection=[]
6718+
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6719+
07)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
67256720

67266721
query I
67276722
with test AS (SELECT substr(md5(i::text)::text, 1, 32) as needle FROM generate_series(1, 100000) t(i))
@@ -6748,10 +6743,9 @@ physical_plan
67486743
02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
67496744
03)----CoalescePartitionsExec
67506745
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
6751-
05)--------ProjectionExec: expr=[]
6752-
06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IS NOT NULL OR NULL
6753-
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6754-
08)--------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
6746+
05)--------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IS NOT NULL OR NULL, projection=[]
6747+
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6748+
07)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
67556749

67566750
# any operator
67576751
query ?

0 commit comments

Comments
 (0)