Skip to content

Commit 2e3707e

Browse files
authored
fix: projection for CooperativeExec and CoalesceBatchesExec (#19400)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> close #19398 ## Rationale for this change see issue #19398 ## What changes are included in this PR? impl `try_swapping_with_projection` for `CooperativeExec` and `CoalesceBatchesExec` ## Are these changes tested? add test case ## Are there any user-facing changes?
1 parent 91cfb69 commit 2e3707e

File tree

3 files changed

+128
-1
lines changed

3 files changed

+128
-1
lines changed

datafusion/core/tests/physical_optimizer/projection_pushdown.rs

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ use datafusion_physical_expr_common::sort_expr::{
4545
use datafusion_physical_optimizer::PhysicalOptimizerRule;
4646
use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
4747
use datafusion_physical_optimizer::projection_pushdown::ProjectionPushdown;
48+
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
4849
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
50+
use datafusion_physical_plan::coop::CooperativeExec;
4951
use datafusion_physical_plan::filter::FilterExec;
5052
use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
5153
use datafusion_physical_plan::joins::{
@@ -1677,3 +1679,102 @@ fn test_partition_col_projection_pushdown_expr() -> Result<()> {
16771679

16781680
Ok(())
16791681
}
1682+
1683+
#[test]
1684+
fn test_coalesce_batches_after_projection() -> Result<()> {
1685+
let csv = create_simple_csv_exec();
1686+
let filter = Arc::new(FilterExec::try_new(
1687+
Arc::new(BinaryExpr::new(
1688+
Arc::new(Column::new("c", 2)),
1689+
Operator::Gt,
1690+
Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
1691+
)),
1692+
csv,
1693+
)?);
1694+
let coalesce_batches: Arc<dyn ExecutionPlan> =
1695+
Arc::new(CoalesceBatchesExec::new(filter, 8192));
1696+
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
1697+
vec![
1698+
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a"),
1699+
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b"),
1700+
],
1701+
coalesce_batches,
1702+
)?);
1703+
1704+
let initial = displayable(projection.as_ref()).indent(true).to_string();
1705+
let actual = initial.trim();
1706+
1707+
assert_snapshot!(
1708+
actual,
1709+
@r"
1710+
ProjectionExec: expr=[a@0 as a, b@1 as b]
1711+
CoalesceBatchesExec: target_batch_size=8192
1712+
FilterExec: c@2 > 0
1713+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
1714+
"
1715+
);
1716+
1717+
let after_optimize =
1718+
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
1719+
1720+
let after_optimize_string = displayable(after_optimize.as_ref())
1721+
.indent(true)
1722+
.to_string();
1723+
let actual = after_optimize_string.trim();
1724+
1725+
// Projection should be pushed down through CoalesceBatchesExec
1726+
assert_snapshot!(
1727+
actual,
1728+
@r"
1729+
CoalesceBatchesExec: target_batch_size=8192
1730+
FilterExec: c@2 > 0, projection=[a@0, b@1]
1731+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
1732+
"
1733+
);
1734+
1735+
Ok(())
1736+
}
1737+
1738+
#[test]
1739+
fn test_cooperative_exec_after_projection() -> Result<()> {
1740+
let csv = create_simple_csv_exec();
1741+
let cooperative: Arc<dyn ExecutionPlan> = Arc::new(CooperativeExec::new(csv));
1742+
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
1743+
vec![
1744+
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a"),
1745+
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b"),
1746+
],
1747+
cooperative,
1748+
)?);
1749+
1750+
let initial = displayable(projection.as_ref()).indent(true).to_string();
1751+
let actual = initial.trim();
1752+
1753+
assert_snapshot!(
1754+
actual,
1755+
@r"
1756+
ProjectionExec: expr=[a@0 as a, b@1 as b]
1757+
CooperativeExec
1758+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
1759+
"
1760+
);
1761+
1762+
let after_optimize =
1763+
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
1764+
1765+
let after_optimize_string = displayable(after_optimize.as_ref())
1766+
.indent(true)
1767+
.to_string();
1768+
let actual = after_optimize_string.trim();
1769+
1770+
// Projection should be pushed down through CooperativeExec
1771+
assert_snapshot!(
1772+
actual,
1773+
@r"
1774+
CooperativeExec
1775+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b], file_type=csv, has_header=false
1776+
"
1777+
);
1778+
1779+
Ok(())
1780+
}

datafusion/physical-plan/src/coalesce_batches.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use std::task::{Context, Poll};
2424

2525
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
2626
use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
27+
use crate::projection::ProjectionExec;
2728
use crate::{
2829
DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
2930
};
@@ -226,6 +227,18 @@ impl ExecutionPlan for CoalesceBatchesExec {
226227
CardinalityEffect::Equal
227228
}
228229

230+
fn try_swapping_with_projection(
231+
&self,
232+
projection: &ProjectionExec,
233+
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
234+
match self.input.try_swapping_with_projection(projection)? {
235+
Some(new_input) => Ok(Some(
236+
Arc::new(self.clone()).with_new_children(vec![new_input])?,
237+
)),
238+
None => Ok(None),
239+
}
240+
}
241+
229242
fn gather_filters_for_pushdown(
230243
&self,
231244
_phase: FilterPushdownPhase,

datafusion/physical-plan/src/coop.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ use crate::filter_pushdown::{
7979
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
8080
FilterPushdownPropagation,
8181
};
82+
use crate::projection::ProjectionExec;
8283
use crate::{
8384
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream,
8485
SendableRecordBatchStream,
@@ -207,7 +208,7 @@ where
207208
/// An execution plan decorator that enables cooperative multitasking.
208209
/// It wraps the streams produced by its input execution plan using the [`make_cooperative`] function,
209210
/// which makes the stream participate in Tokio cooperative scheduling.
210-
#[derive(Debug)]
211+
#[derive(Debug, Clone)]
211212
pub struct CooperativeExec {
212213
input: Arc<dyn ExecutionPlan>,
213214
properties: PlanProperties,
@@ -298,6 +299,18 @@ impl ExecutionPlan for CooperativeExec {
298299
Equal
299300
}
300301

302+
fn try_swapping_with_projection(
303+
&self,
304+
projection: &ProjectionExec,
305+
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
306+
match self.input.try_swapping_with_projection(projection)? {
307+
Some(new_input) => Ok(Some(
308+
Arc::new(self.clone()).with_new_children(vec![new_input])?,
309+
)),
310+
None => Ok(None),
311+
}
312+
}
313+
301314
fn gather_filters_for_pushdown(
302315
&self,
303316
_phase: FilterPushdownPhase,

0 commit comments

Comments
 (0)