Skip to content

Commit df59704

Browse files
Fix time-series geo_line to include reduce phase in MergedGeoLines (#96953)
Recently we added support for line-simplification in geo-lines when in time-series aggregations: #94954. This work did not, however, cover the case of `MergedGeoLines`, which occurs when the same geo_line covers multiple shards on data nodes, and needs to be merged on the coordinating node. The original work would use line-simplification and time-ordering on the data node, but then revert to re-sorting (unnecessary) and truncation (incorrect) on the coordinating node. This PR rectifies that, and adds two fields to the InternalGeoLine: * nonOverlapping * re-sorting is *NOT* required because the sort value ranges do not overlap, so we only sort the set of incoming geo_lines (easy/fast) instead of every single point (complex/slow). * simplified * The approach to memory limiting is by line-simplification instead of truncation. * If the data nodes are doing line-simplification, we want the coordinating node to do that as well For time-series, both of the above are true, and at this point all other cases will have both false. The reason for two booleans is: * They are not necessarily related concepts * We consider supporting line-simplification as a user-requested option in the non-time-series geo_line in future (much nicer than truncation) Fixes #96983
1 parent bdd38c6 commit df59704

File tree

10 files changed

+486
-147
lines changed

10 files changed

+486
-147
lines changed

docs/changelog/96953.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 96953
2+
summary: Fix time-series geo_line to include reduce phase in MergedGeoLines
3+
area: Geo
4+
type: bug
5+
issues:
6+
- 96983

server/src/main/java/org/elasticsearch/TransportVersion.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,10 @@ private static TransportVersion registerTransportVersion(int id, String uniqueId
146146
// 8.10.0
147147
public static final TransportVersion V_8_500_019 = registerTransportVersion(8_500_019, "09bae57f-cab8-423c-aab3-c9778509ffe3");
148148

149+
public static final TransportVersion V_8_500_020 = registerTransportVersion(8_500_020, "ECB42C26-B258-42E5-A835-E31AF84A76DE");
150+
149151
private static class CurrentHolder {
150-
private static final TransportVersion CURRENT = findCurrent(V_8_500_019);
152+
private static final TransportVersion CURRENT = findCurrent(V_8_500_020);
151153

152154
// finds the pluggable current version, or uses the given fallback
153155
private static TransportVersion findCurrent(TransportVersion fallback) {

x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public ScoreMode scoreMode() {
6161

6262
@Override
6363
public InternalAggregation buildEmptyAggregation() {
64-
return new InternalGeoLine(name, new long[0], new double[0], metadata(), true, includeSorts, sortOrder, size);
64+
return new InternalGeoLine(name, new long[0], new double[0], metadata(), true, includeSorts, sortOrder, size, true, false);
6565
}
6666

6767
static class Empty extends GeoLineAggregator {

x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineBucketedSort.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ InternalGeoLine buildAggregation(
6868
double[] sortVals = getSortValues(bucket);
6969
long[] bucketLine = getPoints(bucket);
7070
PathArraySorter.forOrder(sortOrder).apply(bucketLine, sortVals).sort();
71-
return new InternalGeoLine(name, bucketLine, sortVals, metadata, complete, includeSorts, sortOrder, size);
71+
return new InternalGeoLine(name, bucketLine, sortVals, metadata, complete, includeSorts, sortOrder, size, false, false);
7272
}
7373

7474
long sizeOf(long bucket) {

x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/InternalGeoLine.java

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package org.elasticsearch.xpack.spatial.search.aggregations;
88

99
import org.apache.lucene.geo.GeoEncodingUtils;
10+
import org.elasticsearch.TransportVersion;
1011
import org.elasticsearch.common.Strings;
1112
import org.elasticsearch.common.io.stream.StreamInput;
1213
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -30,12 +31,14 @@
3031
public class InternalGeoLine extends InternalAggregation implements GeoShapeMetricAggregation {
3132
private static final double SCALE = Math.pow(10, 6);
3233

33-
private long[] line;
34-
private double[] sortVals;
35-
private boolean complete;
36-
private boolean includeSorts;
37-
private SortOrder sortOrder;
38-
private int size;
34+
private final long[] line;
35+
private final double[] sortVals;
36+
private final boolean complete;
37+
private final boolean includeSorts;
38+
private final SortOrder sortOrder;
39+
private final int size;
40+
private final boolean nonOverlapping;
41+
private final boolean simplified;
3942

4043
/**
4144
* A geo_line representing the bucket for a {@link GeoLineAggregationBuilder}. The values of <code>line</code> and <code>sortVals</code>
@@ -49,6 +52,8 @@ public class InternalGeoLine extends InternalAggregation implements GeoShapeMetr
4952
* @param includeSorts true iff the sort-values should be rendered in xContent as properties of the line-string. False otherwise.
5053
* @param sortOrder the {@link SortOrder} for the line. Whether the points are to be plotted in asc or desc order
5154
* @param size the max length of the line-string.
55+
* @param nonOverlapping true iff the geo_line will not overlap with other geo_lines at reduce phase, allowing a simpler reduce.
56+
* @param simplified true iff the geo_line was created by line simplification (not truncation) so we should do so in reduce also.
5257
*/
5358
InternalGeoLine(
5459
String name,
@@ -58,7 +63,9 @@ public class InternalGeoLine extends InternalAggregation implements GeoShapeMetr
5863
boolean complete,
5964
boolean includeSorts,
6065
SortOrder sortOrder,
61-
int size
66+
int size,
67+
boolean nonOverlapping,
68+
boolean simplified
6269
) {
6370
super(name, metadata);
6471
this.line = line;
@@ -67,6 +74,8 @@ public class InternalGeoLine extends InternalAggregation implements GeoShapeMetr
6774
this.includeSorts = includeSorts;
6875
this.sortOrder = sortOrder;
6976
this.size = size;
77+
this.nonOverlapping = nonOverlapping;
78+
this.simplified = simplified;
7079
}
7180

7281
/**
@@ -80,6 +89,13 @@ public InternalGeoLine(StreamInput in) throws IOException {
8089
this.includeSorts = in.readBoolean();
8190
this.sortOrder = SortOrder.readFromStream(in);
8291
this.size = in.readVInt();
92+
if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_500_020)) {
93+
nonOverlapping = in.readBoolean();
94+
simplified = in.readBoolean();
95+
} else {
96+
nonOverlapping = false;
97+
simplified = false;
98+
}
8399
}
84100

85101
@Override
@@ -90,6 +106,10 @@ protected void doWriteTo(StreamOutput out) throws IOException {
90106
out.writeBoolean(includeSorts);
91107
sortOrder.writeTo(out);
92108
out.writeVInt(size);
109+
if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_500_020)) {
110+
out.writeBoolean(nonOverlapping);
111+
out.writeBoolean(simplified);
112+
}
93113
}
94114

95115
@Override
@@ -108,7 +128,9 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Aggreg
108128
reducedComplete &= mergedSize <= size;
109129
int finalSize = Math.min(mergedSize, size);
110130

111-
MergedGeoLines mergedGeoLines = new MergedGeoLines(internalGeoLines, finalSize, sortOrder);
131+
MergedGeoLines mergedGeoLines = nonOverlapping
132+
? new MergedGeoLines.NonOverlapping(internalGeoLines, finalSize, sortOrder, simplified)
133+
: new MergedGeoLines.Overlapping(internalGeoLines, finalSize, sortOrder, simplified);
112134
mergedGeoLines.merge();
113135
return new InternalGeoLine(
114136
name,
@@ -118,7 +140,9 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Aggreg
118140
reducedComplete,
119141
reducedIncludeSorts,
120142
sortOrder,
121-
size
143+
size,
144+
nonOverlapping,
145+
simplified
122146
);
123147
}
124148

0 commit comments

Comments
 (0)