Describe the bug
This is a regression found on branch-53.
When a physical right join (e.g., a RightAnti join) is executed with an empty output schema (e.g., from count(*) queries), it returns incorrect results. The issue comes from the build_batch_from_indices function:
datafusion/datafusion/physical-plan/src/joins/utils.rs
Lines 913 to 925 in d1a3058

```rust
pub(crate) fn apply_join_filter_to_indices(
    build_input_buffer: &RecordBatch,
    probe_batch: &RecordBatch,
    build_indices: UInt64Array,
    probe_indices: UInt32Array,
    filter: &JoinFilter,
    build_side: JoinSide,
    max_intermediate_size: Option<usize>,
) -> Result<(UInt64Array, UInt32Array)> {
    if build_indices.is_empty() && probe_indices.is_empty() {
        return Ok((build_indices, probe_indices));
    };
```
If the output schema is empty, build_batch_from_indices returns a zero-column batch with build_indices.len() rows. However, for join types such as RightAnti, the output rows come from the probe side, so the row count should be taken from the probe side instead.
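A minimal std-only model of the RightAnti case, assuming (this is not stated in the issue) that the planner turns the repro's `t1 left anti join t2` into a RightAnti join that builds on t2 and probes t1. The function `right_anti_probe_indices` is hypothetical, not DataFusion code. It only illustrates that the rows a RightAnti join emits are the unmatched probe rows, so a zero-column output batch has to take its row count from them.

```rust
use std::collections::HashSet;

/// Hypothetical helper (not part of DataFusion): returns the indices of
/// probe-side rows whose key has no match on the build side, which is
/// exactly the set of rows a RightAnti join emits.
fn right_anti_probe_indices(build_keys: &[i32], probe_keys: &[i32]) -> Vec<u32> {
    let build: HashSet<i32> = build_keys.iter().copied().collect();
    probe_keys
        .iter()
        .enumerate()
        .filter(|&(_, k)| !build.contains(k))
        .map(|(i, _)| i as u32)
        .collect()
}

fn main() {
    // Assumed physical plan for the repro: build side = t2.k, probe side = t1.k.
    let build_keys = vec![1];
    let probe_keys: Vec<i32> = (1..=100).collect();

    let probe_indices = right_anti_probe_indices(&build_keys, &probe_keys);

    // With an empty projection the batch has no columns, so its row count is
    // all that count(*) sees; it must equal the number of surviving probe rows.
    println!("output rows: {}", probe_indices.len()); // prints "output rows: 99"
}
```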
One possible fix is to pass the join type to build_batch_from_indices, since I don't think that information is available from its current arguments. However, this might require a large number of changes.
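A rough sketch of what passing the join type could enable. Everything here is hypothetical: the `JoinType` enum is a simplified stand-in for DataFusion's (which has more variants), `empty_schema_row_count` is not an existing function, and the sketch assumes the build side is the left input and that for Inner joins the two index arrays are aligned pairwise. The point is only that the join type, not build_indices alone, decides which side's length becomes the row count of an empty-schema batch.

```rust
/// Simplified, hypothetical stand-in for a join type enum.
#[allow(dead_code)] // not every variant is constructed in main
#[derive(Clone, Copy, Debug, PartialEq)]
enum JoinType {
    Inner,
    LeftSemi,
    LeftAnti,
    RightSemi,
    RightAnti,
}

/// Hypothetical helper: number of rows a zero-column (empty schema) output
/// batch should report. Assumes the build side is the left input.
fn empty_schema_row_count(join_type: JoinType, build_indices: &[u64], probe_indices: &[u32]) -> usize {
    match join_type {
        // One output row per (build, probe) pair; both arrays have the same length.
        JoinType::Inner => {
            debug_assert_eq!(build_indices.len(), probe_indices.len());
            build_indices.len()
        }
        // Only build-side (left) rows appear in the output.
        JoinType::LeftSemi | JoinType::LeftAnti => build_indices.len(),
        // Only probe-side (right) rows appear: the case described in this issue.
        JoinType::RightSemi | JoinType::RightAnti => probe_indices.len(),
    }
}

fn main() {
    // Illustrative indices only: one build index, 99 probe indices.
    let build_indices: Vec<u64> = vec![0];
    let probe_indices: Vec<u32> = (1..100).collect();

    // Current behavior: the row count always comes from the build side.
    let build_side_count = build_indices.len();
    let join_aware_count = empty_schema_row_count(JoinType::RightAnti, &build_indices, &probe_indices);
    println!("build-side count: {build_side_count}, join-type-aware count: {join_aware_count}");
}
```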
To Reproduce
```sql
create table t1 (k int, v int);
create table t2 (k int, v int);
insert into t1 select i as k, i as v from generate_series(1, 100) t(i);
insert into t2 values (1, 1);

-- select * is ok
with t as (
select *
from t1
left anti join t2 on t1.k = t2.k
)
select *
from t;
+----+----+
| k  | v  |
+----+----+
| 2  | 2  |
| 3  | 3  |
| 4  | 4  |
| 5  | 5  |
...
+----+----+
99 row(s) fetched.

-- select count(*) is wrong
with t as (
select *
from t1
left anti join t2 on t1.k = t2.k
)
select count(*)
from t;
+----------+
| count(*) |
+----------+
| 1        |
+----------+
1 row(s) fetched.
```

Expected behavior
Return the correct number of rows: count(*) should return 99, the same number of rows that select * returns.
Additional context
This bug was exposed by the empty projection optimization introduced in #20191, but the underlying behavior of always taking the row count from the build side already existed before it.