Skip to content

Commit 20f02a9

Browse files
committed
Reduction
1 parent 2895f1e commit 20f02a9

File tree

2 files changed

+36
-67
lines changed

2 files changed

+36
-67
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

Lines changed: 17 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -29,25 +29,16 @@
2929
import org.elasticsearch.xpack.esql.optimizer.LocalLogicalPlanOptimizer;
3030
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
3131
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalPlanOptimizer;
32-
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
3332
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
3433
import org.elasticsearch.xpack.esql.plan.logical.Filter;
35-
import org.elasticsearch.xpack.esql.plan.logical.Limit;
36-
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
37-
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
38-
import org.elasticsearch.xpack.esql.plan.logical.TopN;
39-
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
4034
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
4135
import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
4236
import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
4337
import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
4438
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
4539
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
4640
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
47-
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
48-
import org.elasticsearch.xpack.esql.plan.physical.OrderExec;
4941
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
50-
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
5142
import org.elasticsearch.xpack.esql.planner.mapper.LocalMapper;
5243
import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
5344
import org.elasticsearch.xpack.esql.session.Configuration;
@@ -83,29 +74,25 @@ public static Tuple<PhysicalPlan, PhysicalPlan> breakPlanBetweenCoordinatorAndDa
8374
return new Tuple<>(coordinatorPlan, dataNodePlan.get());
8475
}
8576

86-
public static PhysicalPlan dataNodeReductionPlan(LogicalPlan plan, PhysicalPlan unused) {
87-
var pipelineBreakers = plan.collectFirstChildren(Mapper::isPipelineBreaker);
77+
public static PhysicalPlan reductionPlan(PhysicalPlan plan) {
78+
// find the logical fragment
79+
var fragments = plan.collectFirstChildren(p -> p instanceof FragmentExec);
80+
if (fragments.isEmpty()) {
81+
return null;
82+
}
83+
final FragmentExec fragment = (FragmentExec) fragments.getFirst();
8884

89-
if (pipelineBreakers.isEmpty() == false) {
90-
UnaryPlan pipelineBreaker = (UnaryPlan) pipelineBreakers.get(0);
91-
if (pipelineBreaker instanceof TopN) {
92-
LocalMapper mapper = new LocalMapper();
93-
var physicalPlan = EstimatesRowSize.estimateRowSize(0, mapper.map(plan));
94-
return physicalPlan.collectFirstChildren(TopNExec.class::isInstance).get(0);
95-
} else if (pipelineBreaker instanceof Limit limit) {
96-
return new LimitExec(limit.source(), unused, limit.limit());
97-
} else if (pipelineBreaker instanceof OrderBy order) {
98-
return new OrderExec(order.source(), unused, order.order());
99-
} else if (pipelineBreaker instanceof Aggregate) {
100-
LocalMapper mapper = new LocalMapper();
101-
var physicalPlan = EstimatesRowSize.estimateRowSize(0, mapper.map(plan));
102-
var aggregate = (AggregateExec) physicalPlan.collectFirstChildren(AggregateExec.class::isInstance).get(0);
103-
return aggregate.withMode(AggregatorMode.INITIAL);
104-
} else {
105-
throw new EsqlIllegalArgumentException("unsupported unary physical plan node [" + pipelineBreaker.nodeName() + "]");
106-
}
85+
final var pipelineBreakers = fragment.fragment().collectFirstChildren(Mapper::isPipelineBreaker);
86+
if (pipelineBreakers.isEmpty()) {
87+
return null;
88+
}
89+
final var pipelineBreaker = pipelineBreakers.getFirst();
90+
final LocalMapper mapper = new LocalMapper();
91+
PhysicalPlan reducePlan = mapper.map(pipelineBreaker);
92+
if (reducePlan instanceof AggregateExec agg) {
93+
reducePlan = agg.withMode(AggregatorMode.INITIAL); // force the aggregation to emit intermediate outputs
10794
}
108-
return null;
95+
return EstimatesRowSize.estimateRowSize(fragment.estimatedRowSize(), reducePlan);
10996
}
11097

11198
/**

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 19 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,10 @@
6060
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
6161
import org.elasticsearch.xpack.esql.action.EsqlSearchShardsAction;
6262
import org.elasticsearch.xpack.esql.core.expression.Attribute;
63-
import org.elasticsearch.xpack.esql.core.util.Holder;
6463
import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
6564
import org.elasticsearch.xpack.esql.enrich.LookupFromIndexService;
6665
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
6766
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
68-
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
6967
import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
7068
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
7169
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
@@ -780,35 +778,24 @@ private void runComputeOnDataNode(
780778
}
781779
}
782780

781+
private static PhysicalPlan reductionPlan(ExchangeSinkExec plan, boolean enable) {
782+
PhysicalPlan reducePlan = new ExchangeSourceExec(plan.source(), plan.output(), plan.isIntermediateAgg());
783+
if (enable) {
784+
PhysicalPlan p = PlannerUtils.reductionPlan(plan);
785+
if (p != null) {
786+
reducePlan = p.replaceChildren(List.of(reducePlan));
787+
}
788+
}
789+
return new ExchangeSinkExec(plan.source(), plan.output(), plan.isIntermediateAgg(), reducePlan);
790+
}
791+
783792
private class DataNodeRequestHandler implements TransportRequestHandler<DataNodeRequest> {
784793
@Override
785794
public void messageReceived(DataNodeRequest request, TransportChannel channel, Task task) {
786795
final ActionListener<ComputeResponse> listener = new ChannelActionListener<>(channel);
787-
final ExchangeSinkExec reducePlan;
796+
final PhysicalPlan reductionPlan;
788797
if (request.plan() instanceof ExchangeSinkExec plan) {
789-
var fragments = plan.collectFirstChildren(FragmentExec.class::isInstance);
790-
if (fragments.isEmpty()) {
791-
listener.onFailure(new IllegalStateException("expected a fragment plan for a remote compute; got " + request.plan()));
792-
return;
793-
}
794-
var localExchangeSource = new ExchangeSourceExec(plan.source(), plan.output(), plan.isIntermediateAgg());
795-
Holder<PhysicalPlan> reducePlanHolder = new Holder<>();
796-
if (request.pragmas().nodeLevelReduction()) {
797-
PhysicalPlan dataNodePlan = request.plan();
798-
request.plan()
799-
.forEachUp(
800-
FragmentExec.class,
801-
f -> { reducePlanHolder.set(PlannerUtils.dataNodeReductionPlan(f.fragment(), dataNodePlan)); }
802-
);
803-
}
804-
reducePlan = new ExchangeSinkExec(
805-
plan.source(),
806-
plan.output(),
807-
plan.isIntermediateAgg(),
808-
reducePlanHolder.get() != null
809-
? reducePlanHolder.get().replaceChildren(List.of(localExchangeSource))
810-
: localExchangeSource
811-
);
798+
reductionPlan = reductionPlan(plan, request.pragmas().nodeLevelReduction());
812799
} else {
813800
listener.onFailure(new IllegalStateException("expected exchange sink for a remote compute; got " + request.plan()));
814801
return;
@@ -825,7 +812,7 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T
825812
request.indicesOptions()
826813
);
827814
try (var computeListener = ComputeListener.create(transportService, (CancellableTask) task, listener)) {
828-
runComputeOnDataNode((CancellableTask) task, sessionId, reducePlan, request, computeListener);
815+
runComputeOnDataNode((CancellableTask) task, sessionId, reductionPlan, request, computeListener);
829816
}
830817
}
831818
}
@@ -871,10 +858,10 @@ public void messageReceived(ClusterComputeRequest request, TransportChannel chan
871858
* Performs a compute on a remote cluster. The output pages are placed in an exchange sink specified by
872859
* {@code globalSessionId}. The coordinator on the main cluster will poll pages from there.
873860
* <p>
874-
* Currently, the coordinator on the remote cluster simply collects pages from data nodes in the remote cluster
875-
* and places them in the exchange sink. We can achieve this by using a single exchange buffer to minimize overhead.
876-
* However, here we use two exchange buffers so that we can run an actual plan on this coordinator to perform partial
877-
* reduce operations, such as limit, topN, and partial-to-partial aggregation in the future.
861+
* Currently, the coordinator on the remote cluster polls pages from data nodes within the remote cluster
862+
* and performs cluster-level reduction before sending pages to the querying cluster. This reduction aims
863+
* to minimize data transfers across clusters but may require additional CPU resources for operations like
864+
* aggregations.
878865
*/
879866
void runComputeOnRemoteCluster(
880867
String clusterAlias,
@@ -892,19 +879,14 @@ void runComputeOnRemoteCluster(
892879
() -> exchangeService.finishSinkHandler(globalSessionId, new TaskCancelledException(parentTask.getReasonCancelled()))
893880
);
894881
final String localSessionId = clusterAlias + ":" + globalSessionId;
882+
final PhysicalPlan coordinatorPlan = reductionPlan(plan, true);
895883
var exchangeSource = new ExchangeSourceHandler(
896884
configuration.pragmas().exchangeBufferSize(),
897885
transportService.getThreadPool().executor(ThreadPool.Names.SEARCH),
898886
computeListener.acquireAvoid()
899887
);
900888
try (Releasable ignored = exchangeSource.addEmptySink()) {
901889
exchangeSink.addCompletionListener(computeListener.acquireAvoid());
902-
PhysicalPlan coordinatorPlan = new ExchangeSinkExec(
903-
plan.source(),
904-
plan.output(),
905-
plan.isIntermediateAgg(),
906-
new ExchangeSourceExec(plan.source(), plan.output(), plan.isIntermediateAgg())
907-
);
908890
runCompute(
909891
parentTask,
910892
new ComputeContext(localSessionId, clusterAlias, List.of(), configuration, exchangeSource, exchangeSink),

0 commit comments

Comments
 (0)