Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
b588e47
feature: include unit support for time series rate aggregation
salvatore-campagna Jun 6, 2023
4805ffd
Update docs/changelog/96605.yaml
salvatore-campagna Jun 6, 2023
aadecb2
test: missing week unit
salvatore-campagna Jun 6, 2023
176de63
Merge branch 'main' into feature/94630-rate-aggregation-unit
salvatore-campagna Jun 6, 2023
55ec0d2
fix: transport version id
salvatore-campagna Jun 6, 2023
c2dea03
fix: use default seconds unit
salvatore-campagna Jun 6, 2023
10005b6
Update docs/changelog/96605.yaml
salvatore-campagna Jun 6, 2023
e01ef1b
fix: no need for a variable
salvatore-campagna Jun 6, 2023
b9f517c
fix: null check not required
salvatore-campagna Jun 7, 2023
4508db3
Merge branch 'main' into feature/94630-rate-aggregation-unit
salvatore-campagna Jun 8, 2023
aaafc13
fix: checkstyle unnecessary newline
salvatore-campagna Jun 8, 2023
dfcdd91
Merge branch 'main' into feature/94630-rate-aggregation-unit
salvatore-campagna Jun 8, 2023
e112807
fix: calculate result rate using seconds not milliseconds
salvatore-campagna Jun 13, 2023
e47ff38
fix: test results
salvatore-campagna Jun 13, 2023
67f911c
fix: test results
salvatore-campagna Jun 13, 2023
32d6eab
Merge branch 'main' into feature/94630-rate-aggregation-unit
salvatore-campagna Jun 13, 2023
ab23393
fix: use new transport version
salvatore-campagna Jun 13, 2023
58dcc5b
fix: use new transport version
salvatore-campagna Jun 13, 2023
b592787
Merge branch 'main' into feature/94630-rate-aggregation-unit
salvatore-campagna Jun 19, 2023
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/96605.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 96605
summary: "Feature: include unit support for time series rate aggregation"
area: TSDB
type: enhancement
issues:
- 94630
4 changes: 3 additions & 1 deletion server/src/main/java/org/elasticsearch/TransportVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,17 @@ private static TransportVersion registerTransportVersion(int id, String uniqueId
public static final TransportVersion V_8_500_006 = registerTransportVersion(8_500_006, "7BB5621A-80AC-425F-BA88-75543C442F23");
public static final TransportVersion V_8_500_007 = registerTransportVersion(8_500_007, "77261d43-4149-40af-89c5-7e71e0454fce");
public static final TransportVersion V_8_500_008 = registerTransportVersion(8_500_008, "8884ab9d-94cd-4bac-aff8-01f2c394f47c");

public static final TransportVersion V_8_500_009 = registerTransportVersion(8_500_009, "35091358-fd41-4106-a6e2-d2a1315494c1");
public static final TransportVersion V_8_500_010 = registerTransportVersion(8_500_010, "9818C628-1EEC-439B-B943-468F61460675");
public static final TransportVersion V_8_500_011 = registerTransportVersion(8_500_011, "2209F28D-B52E-4BC4-9889-E780F291C32E");
public static final TransportVersion V_8_500_012 = registerTransportVersion(8_500_012, "BB6F4AF1-A860-4FD4-A138-8150FFBE0ABD");
public static final TransportVersion V_8_500_013 = registerTransportVersion(8_500_013, "f65b85ac-db5e-4558-a487-a1dde4f6a33a");
public static final TransportVersion V_8_500_014 = registerTransportVersion(8_500_014, "D115A2E1-1739-4A02-AB7B-64F6EA157EFB");
public static final TransportVersion V_8_500_015 = registerTransportVersion(8_500_015, "651216c9-d54f-4189-9fe1-48d82d276863");

private static class CurrentHolder {
private static final TransportVersion CURRENT = findCurrent(V_8_500_014);
private static final TransportVersion CURRENT = findCurrent(V_8_500_015);

// finds the pluggable current version, or uses the given fallback
private static TransportVersion findCurrent(TransportVersion fallback) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.analytics.rate;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
Expand All @@ -19,17 +21,21 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class InternalResetTrackingRate extends InternalNumericMetricsAggregation.SingleValue implements Rate {

public static final String NAME = "rate_with_resets";
private static final int MILLIS_IN_SECOND = 1_000;

private final double startValue;
private final double endValue;
private final long startTime;
private final long endTime;
private final double resetCompensation;

private final Rounding.DateTimeUnit rateUnit;

protected InternalResetTrackingRate(
String name,
DocValueFormat format,
Expand All @@ -38,14 +44,16 @@ protected InternalResetTrackingRate(
double endValue,
long startTime,
long endTime,
double resetCompensation
double resetCompensation,
Rounding.DateTimeUnit rateUnit
) {
super(name, format, metadata);
this.startValue = startValue;
this.endValue = endValue;
this.startTime = startTime;
this.endTime = endTime;
this.resetCompensation = resetCompensation;
this.rateUnit = Objects.requireNonNull(rateUnit);
}

public InternalResetTrackingRate(StreamInput in) throws IOException {
Expand All @@ -55,6 +63,11 @@ public InternalResetTrackingRate(StreamInput in) throws IOException {
this.startTime = in.readLong();
this.endTime = in.readLong();
this.resetCompensation = in.readDouble();
if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_500_015)) {
this.rateUnit = Rounding.DateTimeUnit.resolve(in.readByte());
} else {
this.rateUnit = Rounding.DateTimeUnit.SECOND_OF_MINUTE;
}
}

@Override
Expand All @@ -69,6 +82,11 @@ protected void doWriteTo(StreamOutput out) throws IOException {
out.writeLong(startTime);
out.writeLong(endTime);
out.writeDouble(resetCompensation);
if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_500_015) && rateUnit != null) {
out.writeByte(rateUnit.getId());
} else {
out.writeByte(Rounding.DateTimeUnit.SECOND_OF_MINUTE.getId());
}
}

@Override
Expand Down Expand Up @@ -98,7 +116,8 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Aggreg
endValue,
toReduce.get(0).startTime,
toReduce.get(endIndex).endTime,
resetComp
resetComp,
toReduce.get(0).rateUnit
);
}

Expand All @@ -109,7 +128,8 @@ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) th

@Override
public double value() {
return (endValue - startValue + resetCompensation) / (endTime - startTime);
long rateUnitSeconds = rateUnit.getField().getBaseUnit().getDuration().toSeconds();
return (endValue - startValue + resetCompensation) / (endTime - startTime) * MILLIS_IN_SECOND * rateUnitSeconds;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class TimeSeriesRateAggregator extends NumericMetricsAggregator.SingleVal
private double currentStartValue = -1;
private int currentTsid = -1;

private final Rounding.DateTimeUnit rateUnit;

// Unused parameters are so that the constructor implements `RateAggregatorSupplier`
protected TimeSeriesRateAggregator(
String name,
Expand All @@ -62,11 +64,12 @@ protected TimeSeriesRateAggregator(
this.startTimes = bigArrays().newLongArray(1, true);
this.endTimes = bigArrays().newLongArray(1, true);
this.resetCompensations = bigArrays().newDoubleArray(1, true);
this.rateUnit = rateUnit;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just do rateUnit == null ? Rounding.DateTimeUnit.SECOND_OF_MINUTE : rateUnit here?

}

@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalResetTrackingRate(name, DocValueFormat.RAW, metadata(), 0, 0, 0, 0, 0);
return new InternalResetTrackingRate(name, DocValueFormat.RAW, metadata(), 0, 0, 0, 0, 0, Rounding.DateTimeUnit.SECOND_OF_MINUTE);
}

private void calculateLastBucket() {
Expand Down Expand Up @@ -138,7 +141,8 @@ public InternalResetTrackingRate buildAggregation(long owningBucketOrd) {
endValues.get(owningBucketOrd),
startTimes.get(owningBucketOrd),
endTimes.get(owningBucketOrd),
resetCompensations.get(owningBucketOrd)
resetCompensations.get(owningBucketOrd),
rateUnit == null ? Rounding.DateTimeUnit.SECOND_OF_MINUTE : rateUnit
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.analytics.rate;

import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.aggregations.Aggregation;
Expand All @@ -28,39 +29,77 @@

public class InternalResetTrackingRateTests extends InternalAggregationTestCase<InternalResetTrackingRate> {

private static final int MILLIS_IN_SECOND = 1_000;
private static final int SECONDS_IN_MINUTE = 60;
private static final int MILLIS_IN_MINUTE = MILLIS_IN_SECOND * SECONDS_IN_MINUTE;
private static final int MINUTES_IN_HOUR = 60;
private static final int MILLIS_IN_HOUR = MILLIS_IN_SECOND * SECONDS_IN_MINUTE * MINUTES_IN_HOUR;
private static final int HOURS_IN_DAY = 24;
private static final int MILLIS_IN_DAY = MILLIS_IN_SECOND * SECONDS_IN_MINUTE * MINUTES_IN_HOUR * HOURS_IN_DAY;
private static final int DAYS_IN_WEEK = 7;
private static final int MILLIS_IN_WEEK = MILLIS_IN_SECOND * SECONDS_IN_MINUTE * MINUTES_IN_HOUR * HOURS_IN_DAY * DAYS_IN_WEEK;
private static final int MONTHS_IN_QUARTER = 3;
private static final int MONTHS_IN_YEAR = 12;

@Override
protected SearchPlugin registerPlugin() {
return new AnalyticsPlugin();
}

@Override
protected InternalResetTrackingRate createTestInstance(String name, Map<String, Object> metadata) {
return new InternalResetTrackingRate(name, null, metadata, 0, 0, 0, 0, 0);
return new InternalResetTrackingRate(name, null, metadata, 0, 0, 0, 0, 0, Rounding.DateTimeUnit.SECOND_OF_MINUTE);
}

private static InternalResetTrackingRate rate(double startValue, double endValue, long startTime, long endTime, double resetComp) {
return new InternalResetTrackingRate("n", null, null, startValue, endValue, startTime, endTime, resetComp);
private static InternalResetTrackingRate rate(
double startValue,
double endValue,
long startTime,
long endTime,
double resetComp,
Rounding.DateTimeUnit rateUnit
) {
return new InternalResetTrackingRate("n", null, null, startValue, endValue, startTime, endTime, resetComp, rateUnit);
}

public void testReduction() {
List<InternalAggregation> rates = List.of(
rate(0, 10, 1000, 2000, 0),
rate(10, 20, 2000, 3000, 0),
rate(20, 5, 3000, 4000, 25), // internal reset
rate(5, 15, 4000, 5000, 0),
rate(0, 10, 5000, 6000, 0) // cross-boundary reset
);
InternalAggregation reduced = rates.get(0).reduce(rates, null);
assertThat(reduced, instanceOf(Rate.class));
assertThat(((Rate) reduced).getValue(), equalTo(0.01));
public void testReductionSecond() {
testReduction(Rounding.DateTimeUnit.SECOND_OF_MINUTE, 0.01 * MILLIS_IN_SECOND);
}

public void testReductionMinute() {
testReduction(Rounding.DateTimeUnit.MINUTES_OF_HOUR, 0.01 * MILLIS_IN_MINUTE);
}

public void testReductionHour() {
testReduction(Rounding.DateTimeUnit.HOUR_OF_DAY, 0.01 * MILLIS_IN_HOUR);
}

public void testReductionDay() {
testReduction(Rounding.DateTimeUnit.DAY_OF_MONTH, 0.01 * MILLIS_IN_DAY);
}

public void testReductionWeek() {
testReduction(Rounding.DateTimeUnit.WEEK_OF_WEEKYEAR, 0.01 * MILLIS_IN_WEEK);
}

public void testReductionMonth() {
testReduction(Rounding.DateTimeUnit.MONTH_OF_YEAR, 26297.46 * MILLIS_IN_SECOND);
}

public void testReductionQuarter() {
testReduction(Rounding.DateTimeUnit.QUARTER_OF_YEAR, 26297.46 * MILLIS_IN_SECOND * MONTHS_IN_QUARTER);
}

public void testReductionYear() {
testReduction(Rounding.DateTimeUnit.YEAR_OF_CENTURY, 26297.46 * MILLIS_IN_SECOND * MONTHS_IN_YEAR);
}

@Override
protected void assertReduced(InternalResetTrackingRate reduced, List<InternalResetTrackingRate> inputs) {
for (InternalResetTrackingRate input : inputs) {
assertEquals(0.01f, input.getValue(), 0.001);
assertEquals(0.01f * MILLIS_IN_SECOND, input.getValue(), 0.01);
}
assertEquals(0.01f, reduced.getValue(), 0.001);
assertEquals(0.01f * MILLIS_IN_SECOND, reduced.getValue(), 0.01);
}

// Buckets must always be in-order so that we can detect resets between consecutive buckets
Expand Down Expand Up @@ -88,7 +127,7 @@ protected BuilderAndToReduce<InternalResetTrackingRate> randomResultsToReduce(St
currentValue = 0;
}
if (randomInt(45) == 0) {
internalRates.add(rate(startValue, currentValue, startTime, endTime, resetComp));
internalRates.add(rate(startValue, currentValue, startTime, endTime, resetComp, Rounding.DateTimeUnit.SECOND_OF_MINUTE));
startValue = currentValue;
resetComp = 0;
startTime = endTime;
Expand All @@ -98,7 +137,7 @@ protected BuilderAndToReduce<InternalResetTrackingRate> randomResultsToReduce(St
endTime += 1000;
currentValue += 10;
}
internalRates.add(rate(startValue, currentValue, startTime, endTime, resetComp));
internalRates.add(rate(startValue, currentValue, startTime, endTime, resetComp, Rounding.DateTimeUnit.SECOND_OF_MINUTE));
return new BuilderAndToReduce<>(mock(RateAggregationBuilder.class), internalRates);
}

Expand All @@ -119,12 +158,42 @@ protected List<NamedXContentRegistry.Entry> getNamedXContents() {
}

public void testIncludes() {
InternalResetTrackingRate big = new InternalResetTrackingRate("n", null, null, 0, 0, 1000, 3000, 0);
InternalResetTrackingRate small = new InternalResetTrackingRate("n", null, null, 0, 0, 1500, 2500, 0);
InternalResetTrackingRate big = new InternalResetTrackingRate(
"n",
null,
null,
0,
0,
1000,
3000,
0,
Rounding.DateTimeUnit.SECOND_OF_MINUTE
);
InternalResetTrackingRate small = new InternalResetTrackingRate(
"n",
null,
null,
0,
0,
1500,
2500,
0,
Rounding.DateTimeUnit.SECOND_OF_MINUTE
);
assertTrue(big.includes(small));
assertFalse(small.includes(big));

InternalResetTrackingRate unrelated = new InternalResetTrackingRate("n", null, null, 0, 0, 100000, 1000010, 0);
InternalResetTrackingRate unrelated = new InternalResetTrackingRate(
"n",
null,
null,
0,
0,
100000,
1000010,
0,
Rounding.DateTimeUnit.SECOND_OF_MINUTE
);
assertFalse(big.includes(unrelated));
assertFalse(unrelated.includes(big));
assertFalse(small.includes(unrelated));
Expand All @@ -135,4 +204,17 @@ public void testIncludes() {
protected InternalResetTrackingRate mutateInstance(InternalResetTrackingRate instance) {
return null;// TODO implement https://github.com/elastic/elasticsearch/issues/25929
}

private static void testReduction(final Rounding.DateTimeUnit dateTimeUnit, double operand) {
List<InternalAggregation> rates = List.of(
rate(0, 10, 1000, 2000, 0, dateTimeUnit),
rate(10, 20, 2000, 3000, 0, dateTimeUnit),
rate(20, 5, 3000, 4000, 25, dateTimeUnit), // internal reset
rate(5, 15, 4000, 5000, 0, dateTimeUnit),
rate(0, 10, 5000, 6000, 0, dateTimeUnit) // cross-boundary reset
);
InternalAggregation reduced = rates.get(0).reduce(rates, null);
assertThat(reduced, instanceOf(Rate.class));
assertThat(((Rate) reduced).getValue(), equalTo(operand));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

public class TimeSeriesRateAggregatorTests extends AggregatorTestCase {

private static final int MILLIS_IN_SECOND = 1_000;

@Override
protected List<SearchPlugin> getSearchPlugins() {
return List.of(new AggregationsPlugin(), new AnalyticsPlugin());
Expand All @@ -51,8 +53,14 @@ public void testSimple() throws IOException {
tsBuilder.subAggregation(builder);
Consumer<InternalTimeSeries> verifier = r -> {
assertThat(r.getBuckets(), hasSize(2));
assertThat(((Rate) r.getBucketByKey("{dim=1}").getAggregations().asList().get(0)).getValue(), closeTo(59.0 / 3000.0, 0.00001));
assertThat(((Rate) r.getBucketByKey("{dim=2}").getAggregations().asList().get(0)).getValue(), closeTo(206.0 / 4000.0, 0.00001));
assertThat(
((Rate) r.getBucketByKey("{dim=1}").getAggregations().asList().get(0)).getValue(),
closeTo(59.0 / 3000.0 * MILLIS_IN_SECOND, 0.00001)
);
assertThat(
((Rate) r.getBucketByKey("{dim=2}").getAggregations().asList().get(0)).getValue(),
closeTo(206.0 / 4000.0 * MILLIS_IN_SECOND, 0.00001)
);
};
AggTestConfig aggTestConfig = new AggTestConfig(tsBuilder, timeStampField(), counterField("counter_field"))
.withSplitLeavesIntoSeperateAggregators(false);
Expand All @@ -77,20 +85,20 @@ public void testNestedWithinDateHistogram() throws IOException {
InternalDateHistogram hb = r.getBucketByKey("{dim=1}").getAggregations().get("date");
{
Rate rate = hb.getBuckets().get(1).getAggregations().get("counter_field");
assertThat(rate.getValue(), closeTo((60 - 37 + 14) / 2000.0, 0.00001));
assertThat(rate.getValue(), closeTo((60 - 37 + 14) / 2000.0 * MILLIS_IN_SECOND, 0.00001));
}
{
Rate rate = hb.getBuckets().get(0).getAggregations().get("counter_field");
assertThat(rate.getValue(), closeTo((37 - 15) / 1000.0, 0.00001));
assertThat(rate.getValue(), closeTo((37 - 15) / 1000.0 * MILLIS_IN_SECOND, 0.00001));
}
hb = r.getBucketByKey("{dim=2}").getAggregations().get("date");
{
Rate rate = hb.getBuckets().get(0).getAggregations().get("counter_field");
assertThat(rate.getValue(), closeTo((150 - 74) / 1000.0, 0.00001));
assertThat(rate.getValue(), closeTo((150 - 74) / 1000.0 * MILLIS_IN_SECOND, 0.00001));
}
{
Rate rate = hb.getBuckets().get(1).getAggregations().get("counter_field");
assertThat(rate.getValue(), closeTo(90 / 2000.0, 0.00001));
assertThat(rate.getValue(), closeTo(90 / 2000.0 * MILLIS_IN_SECOND, 0.00001));
}
};

Expand Down
Loading