Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/94954.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 94954
summary: Asset tracking - geo_line for TSDB
area: Geo
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public abstract class StreamingGeometrySimplifier<T extends Geometry> {
protected int length;
protected int objCount = 0;
protected String description;
protected PointConstructor pointConstructor = new PointConstructor() {
};
protected PointResetter pointResetter = new PointResetter() {
};

protected final PriorityQueue<PointError> queue = new PriorityQueue<>();

Expand All @@ -63,16 +67,24 @@ public void reset() {

/**
* Consume a single point on the stream of points to be simplified.
* The internal PointError objects will be re-used from a pool of at most <code>maxPoints + 1</code> objects.
*/
public void consume(double x, double y) {
PointError pointError = makePointErrorFor(length, x, y);
consume(makePointErrorFor(length, x, y));
}

/**
* Consume a single point on the stream of points to be simplified.
* No memory management of the points passed in is done, so only use this call if you manage these objects from calling code.
*/
public void consume(PointError pointError) {
if (length > 1) {
// we need at least three points to calculate the error of the middle point
points[length - 1].error = calculator.calculateError(points[length - 2], points[length - 1], pointError);
queue.add(points[length - 1]);
}
if (length == maxPoints) {
// Remove point with lowest error
// Remove point with the lowest error
PointError toRemove = queue.remove();
removeAndAdd(toRemove.index, pointError);
notifyMonitorPointRemoved(toRemove);
Expand All @@ -88,20 +100,42 @@ public void consume(double x, double y) {
*/
public abstract T produce();

/**
* Override this method to use alternative objects within the algorithm.
* This is useful is calling code wants to make use of the algorithms maximum objects creation logic,
* while also using alternative objects.
*/
public interface PointConstructor {
default PointError newPoint(int index, double x, double y) {
return new PointError(index, x, y);
}
}

/**
* Override this method to use alternative objects within the algorithm.
* This is useful is calling code wants to make use of the algorithms maximum objects creation logic,
* while also using alternative objects.
*/
public interface PointResetter {
default PointError resetPoint(PointError point, int index, double x, double y) {
return point.reset(index, x, y);
}
}

private PointError makePointErrorFor(int index, double x, double y) {
if (index == maxPoints) {
if (lastRemoved == null) {
this.objCount++;
return new PointError(index, x, y);
return pointConstructor.newPoint(index, x, y);
} else {
return lastRemoved.reset(index, x, y);
return pointResetter.resetPoint(lastRemoved, index, x, y);
}
} else {
if (points[index] == null) {
this.objCount++;
return new PointError(index, x, y);
return pointConstructor.newPoint(index, x, y);
} else {
return points[index].reset(index, x, y);
return points[index] = pointResetter.resetPoint(points[index], index, x, y);
}
}
}
Expand Down Expand Up @@ -152,7 +186,7 @@ public static class PointError implements SimplificationErrorCalculator.PointLik
private double y;
double error = 0;

PointError(int index, double x, double y) {
protected PointError(int index, double x, double y) {
this.index = index;
this.x = x;
this.y = y;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ multi-valued fields unsupported:
- close_to: { aggregations.view_port.bounds.bottom_right.lat: { value: 48.860000, error: 0.00001 } }
- close_to: { aggregations.view_port.bounds.bottom_right.lon: { value: 4.914722, error: 0.00001 } }

---
"geo_centroid on position field":
- skip:
version: " - 8.7.99"
reason: position metric introduced in 8.8.0
features: close_to

- do:
search:
rest_total_hits_as_int: true
Expand All @@ -121,7 +128,6 @@ multi-valued fields unsupported:
- match: { aggregations.centroid.location.lon: 3.9662131341174245 }
- match: { aggregations.centroid.count: 6 }


---
"geo bounding box query":
- skip:
Expand Down Expand Up @@ -230,28 +236,6 @@ multi-valued fields unsupported:
lon: 5
- match: { hits.total.value: 4 }

---
"bounds agg":
- skip:
version: " - 8.7.99"
reason: position metric introduced in 8.8.0
features: close_to

- do:
search:
index: locations
body:
aggs:
bounds:
geo_bounds:
field: "location"
wrap_longitude: false
- match: { hits.total.value: 6 }
- close_to: { aggregations.bounds.bounds.top_left.lat: { value: 52.374081, error: 0.00001 } }
- close_to: { aggregations.bounds.bounds.top_left.lon: { value: 2.327000, error: 0.00001 } }
- close_to: { aggregations.bounds.bounds.bottom_right.lat: { value: 48.860000, error: 0.00001 } }
- close_to: { aggregations.bounds.bounds.bottom_right.lon: { value: 4.914722, error: 0.00001 } }

---
"geo_distance sort":
- skip:
Expand Down Expand Up @@ -282,7 +266,6 @@ multi-valued fields unsupported:
- close_to: { hits.hits.5._source.location.lat: { value: 48.860000, error: 0.00001 } }
- close_to: { hits.hits.5._source.location.lon: { value: 2.327000, error: 0.00001 } }


---
"distance_feature query":
- skip:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,19 @@ public static void reverseSubArray(double[] array, int offset, int length) {
end--;
}
}

/**
* Reverse the {@code length} values on the array starting from {@code offset}.
*/
public static void reverseSubArray(long[] array, int offset, int length) {
int start = offset;
int end = offset + length;
while (end > start) {
final long scratch = array[start];
array[start] = array[end - 1];
array[end - 1] = scratch;
start++;
end--;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.elasticsearch.script.SortedNumericDocValuesLongFieldScript;
import org.elasticsearch.script.field.GeoPointDocValuesField;
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.TimeSeriesValuesSourceType;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.elasticsearch.search.lookup.FieldValues;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.search.runtime.GeoPointScriptFieldDistanceFeatureQuery;
Expand Down Expand Up @@ -207,7 +209,8 @@ public FieldMapper build(MapperBuilderContext context) {
geoParser,
scriptValues(),
meta.get(),
metric.get()
metric.get(),
indexMode
);
if (this.script.get() == null) {
return new GeoPointFieldMapper(name, ft, multiFieldsBuilder.build(this, context), copyTo.build(), geoParser, this);
Expand Down Expand Up @@ -361,6 +364,7 @@ public static class GeoPointFieldType extends AbstractGeometryFieldType<GeoPoint
);

private final FieldValues<GeoPoint> scriptValues;
private final IndexMode indexMode;

private GeoPointFieldType(
String name,
Expand All @@ -370,16 +374,23 @@ private GeoPointFieldType(
Parser<GeoPoint> parser,
FieldValues<GeoPoint> scriptValues,
Map<String, String> meta,
TimeSeriesParams.MetricType metricType
TimeSeriesParams.MetricType metricType,
IndexMode indexMode
) {
super(name, indexed, stored, hasDocValues, parser, meta);
this.scriptValues = scriptValues;
this.metricType = metricType;
this.indexMode = indexMode;
}

// only used in test
public GeoPointFieldType(String name, TimeSeriesParams.MetricType metricType, IndexMode indexMode) {
this(name, true, false, true, null, null, Collections.emptyMap(), metricType, indexMode);
}

// only used in test
public GeoPointFieldType(String name) {
this(name, true, false, true, null, null, Collections.emptyMap(), null);
this(name, null, null);
}

@Override
Expand Down Expand Up @@ -443,8 +454,12 @@ public IndexFieldData.Builder fielddataBuilder(FieldDataContext fieldDataContext
failIfNoDocValues();
}

ValuesSourceType valuesSourceType = indexMode == IndexMode.TIME_SERIES && metricType == TimeSeriesParams.MetricType.POSITION
? TimeSeriesValuesSourceType.POSITION
: CoreValuesSourceType.GEOPOINT;

if ((operation == FielddataOperation.SEARCH || operation == FielddataOperation.SCRIPT) && hasDocValues()) {
return new LatLonPointIndexFieldData.Builder(name(), CoreValuesSourceType.GEOPOINT, GeoPointDocValuesField::new);
return new LatLonPointIndexFieldData.Builder(name(), valuesSourceType, GeoPointDocValuesField::new);
}

if (operation == FielddataOperation.SCRIPT) {
Expand All @@ -453,7 +468,7 @@ public IndexFieldData.Builder fielddataBuilder(FieldDataContext fieldDataContext

return new SourceValueFetcherMultiGeoPointIndexFieldData.Builder(
name(),
CoreValuesSourceType.GEOPOINT,
valuesSourceType,
valueFetcher(sourcePaths, null, null),
searchLookup,
GeoPointDocValuesField::new
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.search.aggregations.NonCollectingAggregator;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.TimeSeriesValuesSourceType;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
Expand Down Expand Up @@ -62,5 +63,6 @@ protected Aggregator doCreateInternal(Aggregator parent, CardinalityUpperBound c

static void registerAggregators(ValuesSourceRegistry.Builder builder) {
builder.register(GeoBoundsAggregationBuilder.REGISTRY_KEY, CoreValuesSourceType.GEOPOINT, GeoBoundsAggregator::new, true);
builder.register(GeoBoundsAggregationBuilder.REGISTRY_KEY, TimeSeriesValuesSourceType.POSITION, GeoBoundsAggregator::new, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.search.aggregations.NonCollectingAggregator;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.TimeSeriesValuesSourceType;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
Expand Down Expand Up @@ -58,5 +59,6 @@ protected Aggregator doCreateInternal(Aggregator parent, CardinalityUpperBound c

static void registerAggregators(ValuesSourceRegistry.Builder builder) {
builder.register(GeoCentroidAggregationBuilder.REGISTRY_KEY, CoreValuesSourceType.GEOPOINT, GeoCentroidAggregator::new, true);
builder.register(GeoCentroidAggregationBuilder.REGISTRY_KEY, TimeSeriesValuesSourceType.POSITION, GeoCentroidAggregator::new, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.script.AggregationScript;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionException;

import java.util.Locale;

import static org.elasticsearch.search.aggregations.support.CoreValuesSourceType.GEOPOINT;

/**
* Holds {@link ValuesSourceType} implementations for time series fields
*/
Expand Down Expand Up @@ -52,6 +55,32 @@ public ValuesSource replaceMissing(
) {
throw new IllegalArgumentException("Cannot replace missing values for time-series counters");
}
},
POSITION {
@Override
public ValuesSource getEmpty() {
return ValuesSource.GeoPoint.EMPTY;
}

@Override
public ValuesSource getScript(AggregationScript.LeafFactory script, ValueType scriptValueType) {
throw new AggregationExecutionException("value source of type [" + this.value() + "] is not supported by scripts");
}

@Override
public ValuesSource getField(FieldContext fieldContext, AggregationScript.LeafFactory script) {
return GEOPOINT.getField(fieldContext, script);
}

@Override
public ValuesSource replaceMissing(
ValuesSource valuesSource,
Object rawMissing,
DocValueFormat docValueFormat,
AggregationContext context
) {
return GEOPOINT.replaceMissing(valuesSource, rawMissing, docValueFormat, context);
}
};

public static ValuesSourceType fromString(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
public class ValuesSourceConfig {

/**
* Given the query context and other information, decide on the input {@link ValuesSource} for this aggretation run, and construct a new
* Given the query context and other information, decide on the input {@link ValuesSource} for this aggregation run, and construct a new
* {@link ValuesSourceConfig} based on that {@link ValuesSourceType}
*
* @param context - the query context
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/spatial/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies {
testImplementation(testArtifact(project(xpackModule('core'))))
testImplementation project(path: ':modules:percolator')
testImplementation project(path: xpackModule('vector-tile'))
testImplementation project(path: ':modules:aggregations')
}

testClusters.configureEach {
Expand Down
Loading