Skip to content
5 changes: 5 additions & 0 deletions docs/changelog/101845.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 101845
summary: Introduce new endpoint to expose data stream lifecycle stats
area: Data streams
type: enhancement
issues: []
4 changes: 4 additions & 0 deletions docs/reference/data-streams/data-stream-apis.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ preview:[]
preview:[]
* <<data-streams-explain-lifecycle,Explain data stream lifecycle>>
preview:[]
* <<data-streams-get-lifecycle-stats, Get data stream lifecycle stats>>
preview:[]

The following API is available for <<tsds,time series data streams>>:

Expand Down Expand Up @@ -55,4 +57,6 @@ include::{es-repo-dir}/data-streams/lifecycle/apis/delete-lifecycle.asciidoc[]

include::{es-repo-dir}/data-streams/lifecycle/apis/explain-lifecycle.asciidoc[]

include::{es-repo-dir}/data-streams/lifecycle/apis/get-lifecycle-stats.asciidoc[]

include::{es-repo-dir}/indices/downsample-data-stream.asciidoc[]
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
[[data-streams-get-lifecycle-stats]]
=== Get data stream lifecycle stats
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice ! ❤️

++++
<titleabbrev>Get Data Stream Lifecycle Stats</titleabbrev>
++++

preview::[]

Gets stats about the execution of data stream lifecycle.

[[get-lifecycle-stats-api-prereqs]]
==== {api-prereq-title}

* If the {es} {security-features} are enabled, you must have the `monitor` or
`manage` <<privileges-list-cluster,cluster privilege>> to use this API.

[[data-streams-get-lifecycle-stats-request]]
==== {api-request-title}

`GET _lifecycle/stats`

[[data-streams-get-lifecycle-stats-desc]]
==== {api-description-title}

Gets stats about the execution of the data stream lifecycle. The data stream level stats include only stats about data streams
managed by the data stream lifecycle.

[[get-lifecycle-stats-api-response-body]]
==== {api-response-body-title}

`last_run_duration_in_millis`::
(Optional, long)
The duration of the last data stream lifecycle execution.
`time_between_starts_in_millis`::
(Optional, long)
The time passed between the start of the last two data stream lifecycle executions. This should amount approximately to
<<data-streams-lifecycle-poll-interval,`data_streams.lifecycle.poll_interval`>>.
`data_stream_count`::
(integer)
The count of data streams currently being managed by the data stream lifecycle.
`data_streams`::
(array of objects)
Contains information about the retrieved data stream lifecycles.
+
.Properties of objects in `data_streams`
[%collapsible%open]
====
`name`::
(string)
The name of the data stream.
`backing_indices_in_total`::
(integer)
The count of the backing indices of this data stream that are managed by the data stream lifecycle.
`backing_indices_in_error`::
(integer)
The count of the backing indices of this data stream that are managed by the data stream lifecycle and have encountered an error.
====

[[data-streams-get-lifecycle-stats-example]]
==== {api-examples-title}

Let's retrieve the data stream lifecycle stats of a cluster that has already executed the lifecycle more than once:

[source,console]
--------------------------------------------------
GET _lifecycle/stats?human&pretty
--------------------------------------------------
// TEST[skip:this is for demonstration purposes only, we cannot ensure that DSL has run]

The response will look like the following:

[source,console-result]
--------------------------------------------------
{
"last_run_duration_in_millis": 2,
"last_run_duration": "2ms",
"time_between_starts_in_millis": 9998,
"time_between_starts": "9.99s",
"data_stream_count": 2,
"data_streams": [
{
"name": "my-data-stream",
"backing_indices_in_total": 2,
"backing_indices_in_error": 0
},
{
"name": "my-other-stream",
"backing_indices_in_total": 2,
"backing_indices_in_error": 1
}
]
}
--------------------------------------------------
Original file line number Diff line number Diff line change
Expand Up @@ -622,35 +622,6 @@ public void testDataLifecycleServiceConfiguresTheMergePolicy() throws Exception
});
}

private static List<String> getBackingIndices(String dataStreamName) {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { dataStreamName });
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
return getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices().stream().map(Index::getName).toList();
}

static void indexDocs(String dataStream, int numDocs) {
BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < numDocs; i++) {
String value = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis());
bulkRequest.add(
new IndexRequest(dataStream).opType(DocWriteRequest.OpType.CREATE)
.source(String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, value), XContentType.JSON)
);
}
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
assertThat(bulkResponse.getItems().length, equalTo(numDocs));
String backingIndexPrefix = DataStream.BACKING_INDEX_PREFIX + dataStream;
for (BulkItemResponse itemResponse : bulkResponse) {
assertThat(itemResponse.getFailureMessage(), nullValue());
assertThat(itemResponse.status(), equalTo(RestStatus.CREATED));
assertThat(itemResponse.getIndex(), startsWith(backingIndexPrefix));
}
indicesAdmin().refresh(new RefreshRequest(dataStream)).actionGet();
}

public void testReenableDataStreamLifecycle() throws Exception {
// start with a lifecycle that's not enabled
DataStreamLifecycle lifecycle = new DataStreamLifecycle(null, null, false);
Expand Down Expand Up @@ -700,6 +671,35 @@ public void testReenableDataStreamLifecycle() throws Exception {
});
}

private static List<String> getBackingIndices(String dataStreamName) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just moved this helper function in the bottom to be after all the test cases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this note, this is really helpful when reviewing ❤️

GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { dataStreamName });
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
return getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices().stream().map(Index::getName).toList();
}

/**
 * Bulk-indexes {@code numDocs} documents into the given data stream, each carrying the
 * current time in its timestamp field, then refreshes the stream so the documents are
 * visible to searches. Asserts that every document was created in one of the stream's
 * backing indices.
 *
 * @param dataStream the target data stream name
 * @param numDocs    how many documents to create
 */
static void indexDocs(String dataStream, int numDocs) {
    BulkRequest bulk = new BulkRequest();
    for (int docId = 0; docId < numDocs; docId++) {
        String timestamp = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis());
        IndexRequest doc = new IndexRequest(dataStream).opType(DocWriteRequest.OpType.CREATE)
            .source(String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, timestamp), XContentType.JSON);
        bulk.add(doc);
    }
    BulkResponse bulkResponse = client().bulk(bulk).actionGet();
    assertThat(bulkResponse.getItems().length, equalTo(numDocs));
    // Every item must have landed in a backing index of the target data stream.
    String expectedIndexPrefix = DataStream.BACKING_INDEX_PREFIX + dataStream;
    for (BulkItemResponse item : bulkResponse) {
        assertThat(item.getFailureMessage(), nullValue());
        assertThat(item.status(), equalTo(RestStatus.CREATED));
        assertThat(item.getIndex(), startsWith(expectedIndexPrefix));
    }
    // Make the newly indexed documents searchable.
    indicesAdmin().refresh(new RefreshRequest(dataStream)).actionGet();
}

static void putComposableIndexTemplate(
String id,
@Nullable String mappings,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */
package org.elasticsearch.datastreams.lifecycle;

import org.elasticsearch.client.Request;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.datastreams.DisabledSecurityDataStreamTestCase;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;

/**
 * Integration tests for the {@code GET _lifecycle/stats} endpoint, which exposes
 * execution stats of the data stream lifecycle service.
 */
public class DataStreamLifecycleStatsIT extends DisabledSecurityDataStreamTestCase {

    @Before
    public void updateClusterSettings() throws IOException {
        // Speed the lifecycle up so it runs (and rolls over) within the test's assertBusy window.
        updateClusterSettings(
            Settings.builder()
                .put("data_streams.lifecycle.poll_interval", "1s")
                .put("cluster.lifecycle.default.rollover", "min_docs=1,max_docs=1")
                .build()
        );
    }

    @After
    public void cleanUp() throws IOException {
        adminClient().performRequest(new Request("DELETE", "_data_stream/*?expand_wildcards=hidden"));
    }

    @SuppressWarnings("unchecked")
    public void testStats() throws Exception {
        // Check empty stats and wait until we have 2 executions, so that both the run duration
        // and the time between the last two run starts are populated.
        assertBusy(() -> {
            Request request = new Request("GET", "/_lifecycle/stats");
            Map<String, Object> response = entityAsMap(client().performRequest(request));
            assertThat(response.get("data_stream_count"), is(0));
            assertThat(response.get("data_streams"), is(List.of()));
            assertThat(response.containsKey("last_run_duration_in_millis"), is(true));
            assertThat(response.containsKey("time_between_starts_in_millis"), is(true));
        });

        // Create a template so that subsequently indexed documents create lifecycle-managed data streams.
        Request putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1");
        putComposableIndexTemplateRequest.setJsonEntity("""
            {
              "index_patterns": ["my-data-stream-*"],
              "data_stream": {},
              "template": {
                "lifecycle": {}
              }
            }
            """);
        assertOK(client().performRequest(putComposableIndexTemplateRequest));

        // Create two data streams with one doc each
        Request createDocRequest = new Request("POST", "/my-data-stream-1/_doc?refresh=true");
        createDocRequest.setJsonEntity("{ \"@timestamp\": \"2022-12-12\"}");
        assertOK(client().performRequest(createDocRequest));
        createDocRequest = new Request("POST", "/my-data-stream-2/_doc?refresh=true");
        createDocRequest.setJsonEntity("{ \"@timestamp\": \"2022-12-12\"}");
        assertOK(client().performRequest(createDocRequest));

        Request request = new Request("GET", "/_lifecycle/stats");
        Map<String, Object> response = entityAsMap(client().performRequest(request));
        assertThat(response.get("data_stream_count"), is(2));
        List<Map<String, Object>> dataStreams = (List<Map<String, Object>>) response.get("data_streams");
        assertThat(dataStreams.get(0).get("name"), is("my-data-stream-1"));
        assertThat((Integer) dataStreams.get(0).get("backing_indices_in_total"), greaterThanOrEqualTo(1));
        assertThat((Integer) dataStreams.get(0).get("backing_indices_in_error"), is(0));
        assertThat(dataStreams.get(1).get("name"), is("my-data-stream-2"));
        assertThat((Integer) dataStreams.get(1).get("backing_indices_in_total"), greaterThanOrEqualTo(1));
        // Fixed copy-paste bug: this assertion previously re-checked dataStreams.get(0),
        // leaving the second stream's error count untested.
        assertThat((Integer) dataStreams.get(1).get("backing_indices_in_error"), is(0));
        assertThat(response.containsKey("last_run_duration_in_millis"), is(true));
        assertThat(response.containsKey("time_between_starts_in_millis"), is(true));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,14 @@
import org.elasticsearch.datastreams.lifecycle.action.DeleteDataStreamLifecycleAction;
import org.elasticsearch.datastreams.lifecycle.action.ExplainDataStreamLifecycleAction;
import org.elasticsearch.datastreams.lifecycle.action.GetDataStreamLifecycleAction;
import org.elasticsearch.datastreams.lifecycle.action.GetDataStreamLifecycleStatsAction;
import org.elasticsearch.datastreams.lifecycle.action.PutDataStreamLifecycleAction;
import org.elasticsearch.datastreams.lifecycle.action.TransportDeleteDataStreamLifecycleAction;
import org.elasticsearch.datastreams.lifecycle.action.TransportExplainDataStreamLifecycleAction;
import org.elasticsearch.datastreams.lifecycle.action.TransportGetDataStreamLifecycleAction;
import org.elasticsearch.datastreams.lifecycle.action.TransportGetDataStreamLifecycleStatsAction;
import org.elasticsearch.datastreams.lifecycle.action.TransportPutDataStreamLifecycleAction;
import org.elasticsearch.datastreams.lifecycle.rest.RestDataStreamLifecycleStatsAction;
import org.elasticsearch.datastreams.lifecycle.rest.RestDeleteDataStreamLifecycleAction;
import org.elasticsearch.datastreams.lifecycle.rest.RestExplainDataStreamLifecycleAction;
import org.elasticsearch.datastreams.lifecycle.rest.RestGetDataStreamLifecycleAction;
Expand Down Expand Up @@ -189,6 +192,7 @@ public Collection<?> createComponents(PluginServices services) {
actions.add(new ActionHandler<>(GetDataStreamLifecycleAction.INSTANCE, TransportGetDataStreamLifecycleAction.class));
actions.add(new ActionHandler<>(DeleteDataStreamLifecycleAction.INSTANCE, TransportDeleteDataStreamLifecycleAction.class));
actions.add(new ActionHandler<>(ExplainDataStreamLifecycleAction.INSTANCE, TransportExplainDataStreamLifecycleAction.class));
actions.add(new ActionHandler<>(GetDataStreamLifecycleStatsAction.INSTANCE, TransportGetDataStreamLifecycleStatsAction.class));
return actions;
}

Expand Down Expand Up @@ -218,6 +222,7 @@ public List<RestHandler> getRestHandlers(
handlers.add(new RestGetDataStreamLifecycleAction());
handlers.add(new RestDeleteDataStreamLifecycleAction());
handlers.add(new RestExplainDataStreamLifecycleAction());
handlers.add(new RestDataStreamLifecycleStatsAction());
return handlers;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.Nullable;

import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -87,7 +87,7 @@ public ErrorEntry getError(String indexName) {
/**
* Return an immutable view (a snapshot) of the tracked indices at the moment this method is called.
*/
public List<String> getAllIndices() {
return List.copyOf(indexNameToError.keySet());
public Set<String> getAllIndices() {
return Set.copyOf(indexNameToError.keySet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,13 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
*/
private volatile int signallingErrorRetryInterval;

/**
* The following stats are tracking how the data stream lifecycle runs are performing time wise
*/
private volatile Long lastRunStartedAt = null;
private volatile Long lastRunDuration = null;
private volatile Long timeBetweenStarts = null;

private static final SimpleBatchedExecutor<UpdateForceMergeCompleteTask, Void> FORCE_MERGE_STATE_UPDATE_TASK_EXECUTOR =
new SimpleBatchedExecutor<>() {
@Override
Expand Down Expand Up @@ -299,6 +306,11 @@ public void triggered(SchedulerEngine.Event event) {
*/
// default visibility for testing purposes
void run(ClusterState state) {
long startTime = nowSupplier.getAsLong();
if (lastRunStartedAt != null) {
timeBetweenStarts = startTime - lastRunStartedAt;
}
lastRunStartedAt = startTime;
int affectedIndices = 0;
int affectedDataStreams = 0;
for (DataStream dataStream : state.metadata().dataStreams().values()) {
Expand Down Expand Up @@ -396,8 +408,10 @@ void run(ClusterState state) {
affectedIndices += indicesToExcludeForRemainingRun.size();
affectedDataStreams++;
}
lastRunDuration = nowSupplier.getAsLong() - lastRunStartedAt;
logger.trace(
"Data stream lifecycle service performed operations on [{}] indices, part of [{}] data streams",
"Data stream lifecycle service run for {} and performed operations on [{}] indices, part of [{}] data streams",
TimeValue.timeValueMillis(lastRunDuration).toHumanReadableString(2),
affectedIndices,
affectedDataStreams
);
Expand Down Expand Up @@ -1193,6 +1207,22 @@ static TimeValue getRetentionConfiguration(DataStream dataStream) {
return dataStream.getLifecycle().getEffectiveDataRetention();
}

/**
 * The wall-clock duration of the most recently completed data stream lifecycle run,
 * recorded at the end of each run.
 *
 * @return the duration of the last run in millis or null if the service hasn't completed a run yet.
 */
@Nullable
public Long getLastRunDuration() {
    return lastRunDuration;
}

/**
 * The interval between the start times of the two most recent lifecycle runs, recorded at
 * the beginning of each run; approximates the configured poll interval.
 *
 * @return the time passed between the start times of the last two consecutive runs or null if the service hasn't started twice yet.
 */
@Nullable
public Long getTimeBetweenStarts() {
    return timeBetweenStarts;
}

/**
* Action listener that records the encountered failure using the provided recordError callback for the
* provided target index. If the listener is notified of success it will clear the recorded entry for the provided
Expand Down
Loading