Skip to content

Commit e7aef53

Browse files
authored
Expand start and end time to nanoseconds during coordinator rewrite when needed (#96035)
Expand index.time_series.start_time and end_time to nanoseconds if timestamp field's resolution is set to nanoseconds. When creating coordinator rewrite context. Closes #96030
1 parent 98bc34a commit e7aef53

File tree

6 files changed

+79
-92
lines changed

6 files changed

+79
-92
lines changed

docs/changelog/96035.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
pr: 96035
2+
summary: Expand start and end time to nanoseconds during coordinator rewrite when
3+
needed
4+
area: TSDB
5+
type: bug
6+
issues:
7+
- 96030

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBIndexingIT.java

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ public class TSDBIndexingIT extends ESSingleNodeTestCase {
4545
{
4646
"_doc":{
4747
"properties": {
48+
"@timestamp" : {
49+
"type": "date"
50+
},
4851
"metricset": {
4952
"type": "keyword",
5053
"time_series_dimension": true
@@ -86,28 +89,18 @@ protected Settings nodeSettings() {
8689
}
8790

8891
public void testTimeRanges() throws Exception {
89-
var mappingTemplate = """
90-
{
91-
"_doc":{
92-
"properties": {
93-
"metricset": {
94-
"type": "keyword",
95-
"time_series_dimension": true
96-
}
97-
}
98-
}
99-
}""";
10092
var templateSettings = Settings.builder().put("index.mode", "time_series");
10193
if (randomBoolean()) {
10294
templateSettings.put("index.routing_path", "metricset");
10395
}
96+
var mapping = new CompressedXContent(randomBoolean() ? MAPPING_TEMPLATE : MAPPING_TEMPLATE.replace("date", "date_nanos"));
10497

10598
if (randomBoolean()) {
10699
var request = new PutComposableIndexTemplateAction.Request("id");
107100
request.indexTemplate(
108101
new ComposableIndexTemplate(
109102
List.of("k8s*"),
110-
new Template(templateSettings.build(), new CompressedXContent(mappingTemplate), null),
103+
new Template(templateSettings.build(), mapping, null),
111104
null,
112105
null,
113106
null,
@@ -119,9 +112,7 @@ public void testTimeRanges() throws Exception {
119112
client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet();
120113
} else {
121114
var putComponentTemplateRequest = new PutComponentTemplateAction.Request("1");
122-
putComponentTemplateRequest.componentTemplate(
123-
new ComponentTemplate(new Template(null, new CompressedXContent(mappingTemplate), null), null, null)
124-
);
115+
putComponentTemplateRequest.componentTemplate(new ComponentTemplate(new Template(null, mapping, null), null, null));
125116
client().execute(PutComponentTemplateAction.INSTANCE, putComponentTemplateRequest).actionGet();
126117

127118
var putTemplateRequest = new PutComposableIndexTemplateAction.Request("id");
@@ -376,13 +367,14 @@ public void testInvalidTsdbTemplatesMissingSettings() throws Exception {
376367

377368
public void testSkippingShards() throws Exception {
378369
Instant time = Instant.now();
370+
var mapping = new CompressedXContent(randomBoolean() ? MAPPING_TEMPLATE : MAPPING_TEMPLATE.replace("date", "date_nanos"));
379371
{
380372
var templateSettings = Settings.builder().put("index.mode", "time_series").put("index.routing_path", "metricset").build();
381373
var request = new PutComposableIndexTemplateAction.Request("id1");
382374
request.indexTemplate(
383375
new ComposableIndexTemplate(
384376
List.of("pattern-1"),
385-
new Template(templateSettings, new CompressedXContent(MAPPING_TEMPLATE), null),
377+
new Template(templateSettings, mapping, null),
386378
null,
387379
null,
388380
null,
@@ -401,7 +393,7 @@ public void testSkippingShards() throws Exception {
401393
request.indexTemplate(
402394
new ComposableIndexTemplate(
403395
List.of("pattern-2"),
404-
new Template(null, new CompressedXContent(MAPPING_TEMPLATE), null),
396+
new Template(null, mapping, null),
405397
null,
406398
null,
407399
null,

modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java

Lines changed: 41 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.time.Instant;
2020
import java.time.temporal.ChronoUnit;
2121
import java.util.HashSet;
22-
import java.util.List;
2322
import java.util.Map;
2423
import java.util.Set;
2524

@@ -213,77 +212,19 @@ public void setup() throws IOException {
213212
}
214213

215214
public void testTsdbDataStreams() throws Exception {
216-
var bulkRequest = new Request("POST", "/k8s/_bulk");
217-
bulkRequest.setJsonEntity(BULK.replace("$now", formatInstant(Instant.now())));
218-
bulkRequest.addParameter("refresh", "true");
219-
var response = client().performRequest(bulkRequest);
220-
assertOK(response);
221-
var responseBody = entityAsMap(response);
222-
assertThat("errors in response:\n " + responseBody, responseBody.get("errors"), equalTo(false));
223-
224-
var getDataStreamsRequest = new Request("GET", "/_data_stream");
225-
response = client().performRequest(getDataStreamsRequest);
226-
assertOK(response);
227-
var dataStreams = entityAsMap(response);
228-
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
229-
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("k8s"));
230-
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(1));
231-
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.template"), equalTo("1"));
232-
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.indices"), hasSize(1));
233-
String firstBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.0.index_name");
234-
assertThat(firstBackingIndex, backingIndexEqualTo("k8s", 1));
235-
236-
var indices = getIndex(firstBackingIndex);
237-
var escapedBackingIndex = firstBackingIndex.replace(".", "\\.");
238-
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("k8s"));
239-
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.mode"), equalTo("time_series"));
240-
String startTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time");
241-
assertThat(startTimeFirstBackingIndex, notNullValue());
242-
String endTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time");
243-
assertThat(endTimeFirstBackingIndex, notNullValue());
244-
List<?> routingPaths = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.routing_path");
245-
assertThat(routingPaths, containsInAnyOrder("metricset", "k8s.pod.uid", "pod.labels.*"));
246-
247-
var rolloverRequest = new Request("POST", "/k8s/_rollover");
248-
assertOK(client().performRequest(rolloverRequest));
249-
250-
response = client().performRequest(getDataStreamsRequest);
251-
assertOK(response);
252-
dataStreams = entityAsMap(response);
253-
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("k8s"));
254-
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(2));
255-
String secondBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.1.index_name");
256-
assertThat(secondBackingIndex, backingIndexEqualTo("k8s", 2));
257-
258-
indices = getIndex(secondBackingIndex);
259-
escapedBackingIndex = secondBackingIndex.replace(".", "\\.");
260-
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("k8s"));
261-
String startTimeSecondBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time");
262-
assertThat(startTimeSecondBackingIndex, equalTo(endTimeFirstBackingIndex));
263-
String endTimeSecondBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time");
264-
assertThat(endTimeSecondBackingIndex, notNullValue());
265-
266-
var indexRequest = new Request("POST", "/k8s/_doc");
267-
Instant time = parseInstant(startTimeFirstBackingIndex);
268-
indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(time)));
269-
response = client().performRequest(indexRequest);
270-
assertOK(response);
271-
assertThat(entityAsMap(response).get("_index"), equalTo(firstBackingIndex));
272-
273-
indexRequest = new Request("POST", "/k8s/_doc");
274-
time = parseInstant(endTimeSecondBackingIndex).minusMillis(1);
275-
indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(time)));
276-
response = client().performRequest(indexRequest);
277-
assertOK(response);
278-
assertThat(entityAsMap(response).get("_index"), equalTo(secondBackingIndex));
215+
assertTsdbDataStream();
279216
}
280217

281218
public void testTsdbDataStreamsNanos() throws Exception {
282-
// Create a template
219+
// Overwrite template to use date_nanos field type:
283220
var putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1");
284221
putComposableIndexTemplateRequest.setJsonEntity(TEMPLATE.replace("date", "date_nanos"));
285222
assertOK(client().performRequest(putComposableIndexTemplateRequest));
286223

224+
assertTsdbDataStream();
225+
}
226+
227+
private void assertTsdbDataStream() throws IOException {
287228
var bulkRequest = new Request("POST", "/k8s/_bulk");
288229
bulkRequest.setJsonEntity(BULK.replace("$now", formatInstantNanos(Instant.now())));
289230
bulkRequest.addParameter("refresh", "true");
@@ -333,18 +274,53 @@ public void testTsdbDataStreamsNanos() throws Exception {
333274
assertThat(endTimeSecondBackingIndex, notNullValue());
334275

335276
var indexRequest = new Request("POST", "/k8s/_doc");
277+
indexRequest.addParameter("refresh", "true");
336278
Instant time = parseInstant(startTimeFirstBackingIndex);
337279
indexRequest.setJsonEntity(DOC.replace("$time", formatInstantNanos(time)));
338280
response = client().performRequest(indexRequest);
339281
assertOK(response);
340282
assertThat(entityAsMap(response).get("_index"), equalTo(firstBackingIndex));
341283

342284
indexRequest = new Request("POST", "/k8s/_doc");
285+
indexRequest.addParameter("refresh", "true");
343286
time = parseInstant(endTimeSecondBackingIndex).minusMillis(1);
344287
indexRequest.setJsonEntity(DOC.replace("$time", formatInstantNanos(time)));
345288
response = client().performRequest(indexRequest);
346289
assertOK(response);
347290
assertThat(entityAsMap(response).get("_index"), equalTo(secondBackingIndex));
291+
292+
var searchRequest = new Request("GET", "k8s/_search");
293+
searchRequest.setJsonEntity("""
294+
{
295+
"query": {
296+
"range":{
297+
"@timestamp":{
298+
"gte": "now-7d",
299+
"lte": "now+7d"
300+
}
301+
}
302+
},
303+
"sort": [
304+
{
305+
"@timestamp": {
306+
"order": "desc"
307+
}
308+
}
309+
]
310+
}
311+
""");
312+
response = client().performRequest(searchRequest);
313+
assertOK(response);
314+
responseBody = entityAsMap(response);
315+
try {
316+
assertThat(ObjectPath.evaluate(responseBody, "hits.total.value"), equalTo(10));
317+
assertThat(ObjectPath.evaluate(responseBody, "hits.total.relation"), equalTo("eq"));
318+
assertThat(ObjectPath.evaluate(responseBody, "hits.hits.0._index"), equalTo(secondBackingIndex));
319+
assertThat(ObjectPath.evaluate(responseBody, "hits.hits.1._index"), equalTo(firstBackingIndex));
320+
} catch (Exception | AssertionError e) {
321+
logger.error("search response body causing assertion error [" + responseBody + "]", e);
322+
throw e;
323+
}
348324
}
349325

350326
public void testSimulateTsdbDataStreamTemplate() throws Exception {

server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.elasticsearch.index.Index;
4444
import org.elasticsearch.index.IndexMode;
4545
import org.elasticsearch.index.IndexSettings;
46+
import org.elasticsearch.index.mapper.DateFieldMapper;
4647
import org.elasticsearch.index.mapper.MapperService;
4748
import org.elasticsearch.index.seqno.SequenceNumbers;
4849
import org.elasticsearch.index.shard.IndexLongFieldRange;
@@ -1297,14 +1298,27 @@ public IndexLongFieldRange getTimestampRange() {
12971298
}
12981299

12991300
/**
1301+
* @return whether this index has a time series timestamp range
1302+
*/
1303+
public boolean hasTimeSeriesTimestampRange() {
1304+
return indexMode != null && indexMode.getTimestampBound(this) != null;
1305+
}
1306+
1307+
/**
1308+
* @param dateFieldType the date field type of '@timestamp' field which is
1309+
* used to convert the start and end times recorded in index metadata
1310+
* to the right format that is being used by '@timestamp' field.
1311+
* For example, the '@timestamp' can be configured with nanosecond precision.
13001312
* @return the time range this index represents if this index is in time series mode.
13011313
* Otherwise <code>null</code> is returned.
13021314
*/
13031315
@Nullable
1304-
public IndexLongFieldRange getTimeSeriesTimestampRange() {
1316+
public IndexLongFieldRange getTimeSeriesTimestampRange(DateFieldMapper.DateFieldType dateFieldType) {
13051317
var bounds = indexMode != null ? indexMode.getTimestampBound(this) : null;
13061318
if (bounds != null) {
1307-
return IndexLongFieldRange.NO_SHARDS.extendWithShardRange(0, 1, ShardLongFieldRange.of(bounds.startTime(), bounds.endTime()));
1319+
long start = dateFieldType.resolution().convert(Instant.ofEpochMilli(bounds.startTime()));
1320+
long end = dateFieldType.resolution().convert(Instant.ofEpochMilli(bounds.endTime()));
1321+
return IndexLongFieldRange.NO_SHARDS.extendWithShardRange(0, 1, ShardLongFieldRange.of(start, end));
13081322
} else {
13091323
return null;
13101324
}

server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContextProvider.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,20 +49,18 @@ public CoordinatorRewriteContext getCoordinatorRewriteContext(Index index) {
4949
if (indexMetadata == null) {
5050
return null;
5151
}
52+
DateFieldMapper.DateFieldType dateFieldType = mappingSupplier.apply(index);
53+
if (dateFieldType == null) {
54+
return null;
55+
}
5256
IndexLongFieldRange timestampRange = indexMetadata.getTimestampRange();
5357
if (timestampRange.containsAllShardRanges() == false) {
54-
timestampRange = indexMetadata.getTimeSeriesTimestampRange();
58+
timestampRange = indexMetadata.getTimeSeriesTimestampRange(dateFieldType);
5559
if (timestampRange == null) {
5660
return null;
5761
}
5862
}
5963

60-
DateFieldMapper.DateFieldType dateFieldType = mappingSupplier.apply(index);
61-
62-
if (dateFieldType == null) {
63-
return null;
64-
}
65-
6664
return new CoordinatorRewriteContext(parserConfig, client, nowInMillis, timestampRange, dateFieldType);
6765
}
6866
}

server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ private static boolean hasUsefulTimestampField(IndexMetadata indexMetadata) {
141141
return false;
142142
}
143143

144-
if (indexMetadata.getTimeSeriesTimestampRange() != null) {
144+
if (indexMetadata.hasTimeSeriesTimestampRange()) {
145145
// Tsdb indices have @timestamp field and index.time_series.start_time / index.time_series.end_time range
146146
return true;
147147
}

0 commit comments

Comments
 (0)