Skip to content

Commit 8b84522

Browse files
HashJoin can preserve the right ordering when join type is Right (#11276)
* Preserve right hash order * Update joins.slt * better sorting alg * remove result * Review * add plan test * Address review feedback * Expand test coverage and update docstring --------- Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
1 parent 1ea6545 commit 8b84522

File tree

4 files changed

+295
-31
lines changed

4 files changed

+295
-31
lines changed

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,10 @@ impl HashJoinExec {
433433
false,
434434
matches!(
435435
join_type,
436-
JoinType::Inner | JoinType::RightAnti | JoinType::RightSemi
436+
JoinType::Inner
437+
| JoinType::Right
438+
| JoinType::RightAnti
439+
| JoinType::RightSemi
437440
),
438441
]
439442
}
@@ -779,6 +782,7 @@ impl ExecutionPlan for HashJoinExec {
779782
build_side: BuildSide::Initial(BuildSideInitialState { left_fut }),
780783
batch_size,
781784
hashes_buffer: vec![],
785+
right_side_ordered: self.right.output_ordering().is_some(),
782786
}))
783787
}
784788

@@ -1107,6 +1111,8 @@ struct HashJoinStream {
11071111
batch_size: usize,
11081112
/// Scratch space for computing hashes
11091113
hashes_buffer: Vec<u64>,
1114+
/// Specifies whether the right side has an ordering to potentially preserve
1115+
right_side_ordered: bool,
11101116
}
11111117

11121118
impl RecordBatchStream for HashJoinStream {
@@ -1449,6 +1455,7 @@ impl HashJoinStream {
14491455
right_indices,
14501456
index_alignment_range_start..index_alignment_range_end,
14511457
self.join_type,
1458+
self.right_side_ordered,
14521459
);
14531460

14541461
let result = build_batch_from_indices(
@@ -1542,7 +1549,6 @@ impl Stream for HashJoinStream {
15421549

15431550
#[cfg(test)]
15441551
mod tests {
1545-
15461552
use super::*;
15471553
use crate::{
15481554
common, expressions::Column, memory::MemoryExec, repartition::RepartitionExec,

datafusion/physical-plan/src/joins/nested_loop_join.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -604,6 +604,7 @@ fn join_left_and_right_batch(
604604
right_side,
605605
0..right_batch.num_rows(),
606606
join_type,
607+
false,
607608
);
608609

609610
build_batch_from_indices(
@@ -647,7 +648,6 @@ impl RecordBatchStream for NestedLoopJoinStream {
647648

648649
#[cfg(test)]
649650
mod tests {
650-
651651
use super::*;
652652
use crate::{
653653
common, expressions::Column, memory::MemoryExec, repartition::RepartitionExec,

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 107 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ use arrow::array::{
3434
UInt32BufferBuilder, UInt32Builder, UInt64Array, UInt64BufferBuilder,
3535
};
3636
use arrow::compute;
37-
use arrow::datatypes::{Field, Schema, SchemaBuilder};
37+
use arrow::datatypes::{Field, Schema, SchemaBuilder, UInt32Type, UInt64Type};
3838
use arrow::record_batch::{RecordBatch, RecordBatchOptions};
39+
use arrow_array::builder::UInt64Builder;
3940
use arrow_array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray};
4041
use arrow_buffer::ArrowNativeType;
4142
use datafusion_common::cast::as_boolean_array;
@@ -1284,6 +1285,7 @@ pub(crate) fn adjust_indices_by_join_type(
12841285
right_indices: UInt32Array,
12851286
adjust_range: Range<usize>,
12861287
join_type: JoinType,
1288+
preserve_order_for_right: bool,
12871289
) -> (UInt64Array, UInt32Array) {
12881290
match join_type {
12891291
JoinType::Inner => {
@@ -1295,12 +1297,17 @@ pub(crate) fn adjust_indices_by_join_type(
12951297
(left_indices, right_indices)
12961298
// unmatched left row will be produced in the end of loop, and it has been set in the left visited bitmap
12971299
}
1298-
JoinType::Right | JoinType::Full => {
1299-
// matched
1300-
// unmatched right row will be produced in this batch
1301-
let right_unmatched_indices = get_anti_indices(adjust_range, &right_indices);
1300+
JoinType::Right => {
13021301
// combine the matched and unmatched right result together
1303-
append_right_indices(left_indices, right_indices, right_unmatched_indices)
1302+
append_right_indices(
1303+
left_indices,
1304+
right_indices,
1305+
adjust_range,
1306+
preserve_order_for_right,
1307+
)
1308+
}
1309+
JoinType::Full => {
1310+
append_right_indices(left_indices, right_indices, adjust_range, false)
13041311
}
13051312
JoinType::RightSemi => {
13061313
// need to remove the duplicated record in the right side
@@ -1326,30 +1333,48 @@ pub(crate) fn adjust_indices_by_join_type(
13261333
}
13271334
}
13281335

1329-
/// Appends the `right_unmatched_indices` to the `right_indices`,
1330-
/// and fills Null to tail of `left_indices` to
1331-
/// keep the length of `right_indices` and `left_indices` consistent.
1336+
/// Appends right indices to left indices based on the specified order mode.
1337+
///
1338+
/// The function operates in two modes:
1339+
/// 1. If `preserve_order_for_right` is true, probe matched and unmatched indices
1340+
/// are inserted in order using the `append_probe_indices_in_order()` method.
1341+
/// 2. Otherwise, unmatched probe indices are simply appended after matched ones.
1342+
///
1343+
/// # Parameters
1344+
/// - `left_indices`: UInt64Array of left indices.
1345+
/// - `right_indices`: UInt32Array of right indices.
1346+
/// - `adjust_range`: Range to adjust the right indices.
1347+
/// - `preserve_order_for_right`: Boolean flag to determine the mode of operation.
1348+
///
1349+
/// # Returns
1350+
/// A tuple of updated `UInt64Array` and `UInt32Array`.
13321351
pub(crate) fn append_right_indices(
13331352
left_indices: UInt64Array,
13341353
right_indices: UInt32Array,
1335-
right_unmatched_indices: UInt32Array,
1354+
adjust_range: Range<usize>,
1355+
preserve_order_for_right: bool,
13361356
) -> (UInt64Array, UInt32Array) {
1337-
// left_indices, right_indices and right_unmatched_indices must not contain the null value
1338-
if right_unmatched_indices.is_empty() {
1339-
(left_indices, right_indices)
1357+
if preserve_order_for_right {
1358+
append_probe_indices_in_order(left_indices, right_indices, adjust_range)
13401359
} else {
1341-
let unmatched_size = right_unmatched_indices.len();
1342-
// the new left indices: left_indices + null array
1343-
// the new right indices: right_indices + right_unmatched_indices
1344-
let new_left_indices = left_indices
1345-
.iter()
1346-
.chain(std::iter::repeat(None).take(unmatched_size))
1347-
.collect::<UInt64Array>();
1348-
let new_right_indices = right_indices
1349-
.iter()
1350-
.chain(right_unmatched_indices.iter())
1351-
.collect::<UInt32Array>();
1352-
(new_left_indices, new_right_indices)
1360+
let right_unmatched_indices = get_anti_indices(adjust_range, &right_indices);
1361+
1362+
if right_unmatched_indices.is_empty() {
1363+
(left_indices, right_indices)
1364+
} else {
1365+
let unmatched_size = right_unmatched_indices.len();
1366+
// the new left indices: left_indices + null array
1367+
// the new right indices: right_indices + right_unmatched_indices
1368+
let new_left_indices = left_indices
1369+
.iter()
1370+
.chain(std::iter::repeat(None).take(unmatched_size))
1371+
.collect();
1372+
let new_right_indices = right_indices
1373+
.iter()
1374+
.chain(right_unmatched_indices.iter())
1375+
.collect();
1376+
(new_left_indices, new_right_indices)
1377+
}
13531378
}
13541379
}
13551380

@@ -1379,7 +1404,7 @@ where
13791404
.filter_map(|idx| {
13801405
(!bitmap.get_bit(idx - offset)).then_some(T::Native::from_usize(idx))
13811406
})
1382-
.collect::<PrimitiveArray<T>>()
1407+
.collect()
13831408
}
13841409

13851410
/// Returns intersection of `range` and `input_indices` omitting duplicates
@@ -1408,7 +1433,61 @@ where
14081433
.filter_map(|idx| {
14091434
(bitmap.get_bit(idx - offset)).then_some(T::Native::from_usize(idx))
14101435
})
1411-
.collect::<PrimitiveArray<T>>()
1436+
.collect()
1437+
}
1438+
1439+
/// Appends probe indices in order by considering the given build indices.
1440+
///
1441+
/// This function constructs new build and probe indices by iterating through
1442+
/// the provided indices, and appends any missing values between previous and
1443+
/// current probe index with a corresponding null build index.
1444+
///
1445+
/// # Parameters
1446+
///
1447+
/// - `build_indices`: `PrimitiveArray` of `UInt64Type` containing build indices.
1448+
/// - `probe_indices`: `PrimitiveArray` of `UInt32Type` containing probe indices.
1449+
/// - `range`: The range of indices to consider.
1450+
///
1451+
/// # Returns
1452+
///
1453+
/// A tuple of two arrays:
1454+
/// - A `PrimitiveArray` of `UInt64Type` with the newly constructed build indices.
1455+
/// - A `PrimitiveArray` of `UInt32Type` with the newly constructed probe indices.
1456+
fn append_probe_indices_in_order(
1457+
build_indices: PrimitiveArray<UInt64Type>,
1458+
probe_indices: PrimitiveArray<UInt32Type>,
1459+
range: Range<usize>,
1460+
) -> (PrimitiveArray<UInt64Type>, PrimitiveArray<UInt32Type>) {
1461+
// Builders for new indices:
1462+
let mut new_build_indices = UInt64Builder::new();
1463+
let mut new_probe_indices = UInt32Builder::new();
1464+
// Set previous index as the start index for the initial loop:
1465+
let mut prev_index = range.start as u32;
1466+
// Zip the two iterators.
1467+
debug_assert!(build_indices.len() == probe_indices.len());
1468+
for (build_index, probe_index) in build_indices
1469+
.values()
1470+
.into_iter()
1471+
.zip(probe_indices.values().into_iter())
1472+
{
1473+
// Append values between previous and current probe index with null build index:
1474+
for value in prev_index..*probe_index {
1475+
new_probe_indices.append_value(value);
1476+
new_build_indices.append_null();
1477+
}
1478+
// Append current indices:
1479+
new_probe_indices.append_value(*probe_index);
1480+
new_build_indices.append_value(*build_index);
1481+
// Set current probe index as previous for the next iteration:
1482+
prev_index = probe_index + 1;
1483+
}
1484+
// Append remaining probe indices after the last valid probe index with null build index.
1485+
for value in prev_index..range.end as u32 {
1486+
new_probe_indices.append_value(value);
1487+
new_build_indices.append_null();
1488+
}
1489+
// Build arrays and return:
1490+
(new_build_indices.finish(), new_probe_indices.finish())
14121491
}
14131492

14141493
/// Metrics for build & probe joins
@@ -2475,7 +2554,7 @@ mod tests {
24752554
&on_columns,
24762555
left_columns_len,
24772556
maintains_input_order,
2478-
probe_side
2557+
probe_side,
24792558
),
24802559
expected[i]
24812560
);

0 commit comments

Comments
 (0)