35 commits
346f797
Prototype of reindex fully wired up in x-pack
masseyke Dec 3, 2024
37a2620
merging main
masseyke Dec 3, 2024
e549475
merging main
masseyke Dec 4, 2024
8986744
Adding an assertion to the upgrade test
masseyke Dec 5, 2024
abd3eb4
excluding headers from api requests
masseyke Dec 5, 2024
cd6fac8
merging main
masseyke Dec 6, 2024
ab0b47b
deleting ReindexDataStreamClient
masseyke Dec 6, 2024
d13258e
merging main
masseyke Dec 9, 2024
8c71cfe
moving RestReindexDataStreamAction
masseyke Dec 9, 2024
480ea11
using ModifyDataStreamsAction to swap indices
masseyke Dec 10, 2024
c3f703b
Merge branch 'main' into reindex-data-stream-fully-wired-up-xpack
masseyke Dec 10, 2024
e097504
removing SwapDataStreamIndexAction
masseyke Dec 10, 2024
147f1de
merging main
masseyke Dec 10, 2024
b7330a8
removing renamed action
masseyke Dec 10, 2024
bbacdd2
deleting RestGetReindexDataStreamStatusAction
masseyke Dec 10, 2024
2a26e0a
Merge branch 'main' into reindex-data-stream-fully-wired-up-xpack
masseyke Dec 10, 2024
6b1231b
Merge branch 'main' into reindex-data-stream-fully-wired-up-xpack
masseyke Dec 11, 2024
a5528c5
updating the upgrade test
masseyke Dec 11, 2024
e0bd400
Merge branch 'main' into reindex-data-stream-fully-wired-up-xpack
masseyke Dec 11, 2024
4124b16
merging main
masseyke Dec 12, 2024
09d67f0
fixing the merge
masseyke Dec 12, 2024
2e71ce0
Copying over Parker's work, adding rollover, fixing a bug
masseyke Dec 13, 2024
c41bf85
satisfying checkstyle
masseyke Dec 13, 2024
c330e31
merging main
masseyke Dec 13, 2024
1ff7bba
Limiting the number of indices that can be processed concurrently
masseyke Dec 13, 2024
0dbe121
Merge branch 'main' into data-stream-reindex-prototype-with-limiting
masseyke Dec 16, 2024
50e2344
changing concurrency to 1
masseyke Dec 17, 2024
cc0f02d
improving the upgrade test
masseyke Dec 17, 2024
0bb5b0c
adding a comment
masseyke Dec 17, 2024
3b2f962
Merge branch 'main' into data-stream-reindex-prototype-with-limiting
masseyke Dec 17, 2024
96601d9
shortening lines in the upgrade test
masseyke Dec 17, 2024
d610e9d
making data stream and template name unique
masseyke Dec 17, 2024
ebdd921
[CI] Auto commit changes from spotless
Dec 17, 2024
bc4c3d8
Merge branch 'main' into data-stream-reindex-prototype-with-limiting
masseyke Dec 17, 2024
fa3db8e
Fixing upgrade test when the original cluster is the current version
masseyke Dec 17, 2024
ReindexDataStreamPersistentTaskExecutor.java
@@ -9,8 +9,15 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
import org.elasticsearch.action.support.CountDownActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
@@ -20,9 +27,13 @@
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.migrate.action.ReindexDataStreamIndexAction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.getOldIndexVersionPredicate;

@@ -72,22 +83,112 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
reindexClient.execute(GetDataStreamAction.INSTANCE, request, ActionListener.wrap(response -> {
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = response.getDataStreams();
if (dataStreamInfos.size() == 1) {
- List<Index> indices = dataStreamInfos.getFirst().getDataStream().getIndices();
- List<Index> indicesToBeReindexed = indices.stream()
-     .filter(getOldIndexVersionPredicate(clusterService.state().metadata()))
-     .toList();
- reindexDataStreamTask.setPendingIndicesCount(indicesToBeReindexed.size());
- for (Index index : indicesToBeReindexed) {
-     reindexDataStreamTask.incrementInProgressIndicesCount();
-     // TODO This is just a placeholder. This is where the real data stream reindex logic will go
-     reindexDataStreamTask.reindexSucceeded();
- }
-
- completeSuccessfulPersistentTask(reindexDataStreamTask);
+ DataStream dataStream = dataStreamInfos.getFirst().getDataStream();
+ if (getOldIndexVersionPredicate(clusterService.state().metadata()).test(dataStream.getWriteIndex())) {
+     reindexClient.execute(
+         RolloverAction.INSTANCE,
+         new RolloverRequest(sourceDataStream, null),
+         ActionListener.wrap(
+             rolloverResponse -> reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream),
+             e -> completeFailedPersistentTask(reindexDataStreamTask, e)
+         )
+     );
+ } else {
+     reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream);
+ }
} else {
completeFailedPersistentTask(reindexDataStreamTask, new ElasticsearchException("data stream does not exist"));
}
- }, reindexDataStreamTask::markAsFailed));
+ }, exception -> completeFailedPersistentTask(reindexDataStreamTask, exception)));
}

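// Filters the data stream's backing indices down to old-version, non-write indices and reindexes them, with at most
// maxConcurrentIndices in flight at a time; the persistent task is completed once every one of them has either
// succeeded or failed.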
private void reindexIndices(
DataStream dataStream,
ReindexDataStreamTask reindexDataStreamTask,
ExecuteWithHeadersClient reindexClient,
String sourceDataStream
) {
List<Index> indices = dataStream.getIndices();
List<Index> indicesToBeReindexed = indices.stream()
.filter(getOldIndexVersionPredicate(clusterService.state().metadata()))
.filter(index -> index.getName().equals(dataStream.getWriteIndex().getName()) == false)
.toList();
reindexDataStreamTask.setPendingIndicesCount(indicesToBeReindexed.size());
// The CountDownActionListener is initialized to one more than the number of indices so that the count is never 0, even when there are no indices
CountDownActionListener listener = new CountDownActionListener(indicesToBeReindexed.size() + 1, ActionListener.wrap(response1 -> {
completeSuccessfulPersistentTask(reindexDataStreamTask);
}, exception -> { completeFailedPersistentTask(reindexDataStreamTask, exception); }));
List<Index> indicesRemaining = Collections.synchronizedList(new ArrayList<>(indicesToBeReindexed));
final int maxConcurrentIndices = 1;
for (int i = 0; i < maxConcurrentIndices; i++) {
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener);
}
// This takes care of the additional latch count referenced above:
listener.onResponse(null);
}

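// Pops the next index off the shared work list, if any remain, and reindexes it; on success it recurses to pick up
// more work, so at most maxConcurrentIndices reindex operations are ever in flight at once.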
private void maybeProcessNextIndex(
List<Index> indicesRemaining,
ReindexDataStreamTask reindexDataStreamTask,
ExecuteWithHeadersClient reindexClient,
String sourceDataStream,
CountDownActionListener listener
) {
if (indicesRemaining.isEmpty()) {
return;
}
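// Another worker can drain the list between the isEmpty() check above and removeFirst() below, so a
// NoSuchElementException here simply means there is no work left.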
Index index;
try {
index = indicesRemaining.removeFirst();
} catch (NoSuchElementException e) {
return;
}
reindexDataStreamTask.incrementInProgressIndicesCount();
reindexClient.execute(
ReindexDataStreamIndexAction.INSTANCE,
new ReindexDataStreamIndexAction.Request(index.getName()),
ActionListener.wrap(response1 -> {
updateDataStream(sourceDataStream, index.getName(), response1.getDestIndex(), ActionListener.wrap(unused -> {
reindexDataStreamTask.reindexSucceeded();
listener.onResponse(null);
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener);
}, exception -> {
reindexDataStreamTask.reindexFailed(index.getName(), exception);
listener.onResponse(null);
}), reindexClient);
}, exception -> {
reindexDataStreamTask.reindexFailed(index.getName(), exception);
listener.onResponse(null);
})
);
}

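// Swaps the old backing index for the newly reindexed one in a single ModifyDataStreamsAction request (the remove
// and add are submitted together).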
private void updateDataStream(
String dataStream,
String oldIndex,
String newIndex,
ActionListener<Void> listener,
ExecuteWithHeadersClient reindexClient
) {
reindexClient.execute(
ModifyDataStreamsAction.INSTANCE,
new ModifyDataStreamsAction.Request(
TimeValue.MAX_VALUE,
TimeValue.MAX_VALUE,
List.of(DataStreamAction.removeBackingIndex(dataStream, oldIndex), DataStreamAction.addBackingIndex(dataStream, newIndex))
),
new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse response) {
listener.onResponse(null);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}
);
}

private void completeSuccessfulPersistentTask(ReindexDataStreamTask persistentTask) {
@@ -105,6 +206,9 @@ private TimeValue getTimeToLive(ReindexDataStreamTask reindexDataStreamTask) {
PersistentTasksCustomMetadata.PersistentTask<?> persistentTask = persistentTasksCustomMetadata.getTask(
reindexDataStreamTask.getPersistentTaskId()
);
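// The persistent task may already have been removed, in which case it should expire immediately.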
if (persistentTask == null) {
return TimeValue.timeValueMillis(0);
}
PersistentTaskState state = persistentTask.getState();
final long completionTime;
if (state == null) {
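An aside on the pattern above: reindexIndices sizes its CountDownActionListener one higher than the number of indices so that the listener still completes when there is nothing to reindex. A minimal standalone sketch of that pattern follows; the class and method names are illustrative, not part of this change:

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.CountDownActionListener;

import java.util.List;

class CountDownPatternSketch {
    // Completes `done` exactly once, even when `work` is empty: the countdown is sized one
    // higher than the item count, and the extra slot is counted down once all work has been
    // kicked off, mirroring the trailing listener.onResponse(null) in reindexIndices.
    static void runAll(List<Runnable> work, ActionListener<Void> done) {
        CountDownActionListener listener = new CountDownActionListener(work.size() + 1, done);
        for (Runnable item : work) {
            item.run();                 // illustrative; the real per-index work in this PR is async
            listener.onResponse(null);  // one countdown per completed item
        }
        listener.onResponse(null);      // the extra count: fires even when work is empty
    }
}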
DataStreamsUpgradeIT.java
@@ -11,18 +11,99 @@
import org.elasticsearch.client.Response;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.FormatNames;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.core.Strings;
import org.elasticsearch.test.rest.ObjectPath;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.hamcrest.Matchers;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.upgrades.IndexingIT.assertCount;
import static org.hamcrest.Matchers.equalTo;

public class DataStreamsUpgradeIT extends AbstractUpgradeTestCase {

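// A time-series-mode index template: "metricset" and the "pod.labels.*" dynamic template are dimensions, so every
// backing index created from it is a TSDB index.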
static final String TEMPLATE = """
{
"settings":{
"index": {
"mode": "time_series"
}
},
"mappings":{
"dynamic_templates": [
{
"labels": {
"path_match": "pod.labels.*",
"mapping": {
"type": "keyword",
"time_series_dimension": true
}
}
}
],
"properties": {
"@timestamp" : {
"type": "date"
},
"metricset": {
"type": "keyword",
"time_series_dimension": true
},
"k8s": {
"properties": {
"pod": {
"properties": {
"name": {
"type": "keyword"
},
"network": {
"properties": {
"tx": {
"type": "long"
},
"rx": {
"type": "long"
}
}
}
}
}
}
}
}
}
}
""";

private static final String BULK = """
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "network": {"tx": 2001818691, "rx": 802133794}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "hamster", "network": {"tx": 2005177954, "rx": 801479970}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cow", "network": {"tx": 2006223737, "rx": 802337279}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "rat", "network": {"tx": 2012916202, "rx": 803685721}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "network": {"tx": 1434521831, "rx": 530575198}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "tiger", "network": {"tx": 1434577921, "rx": 530600088}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "lion", "network": {"tx": 1434587694, "rx": 530604797}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "elephant", "network": {"tx": 1434595272, "rx": 530605511}}}}
""";

public void testDataStreams() throws IOException {
if (CLUSTER_TYPE == ClusterType.OLD) {
String requestBody = """
@@ -164,4 +245,102 @@ public void testDataStreamValidationDoesNotBreakUpgrade() throws Exception {
}
}

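// Creates a time-series data stream on the old cluster and rolls it over numRollovers times, then, once the whole
// cluster has been upgraded, reindexes it through the migration API and verifies that every old backing index was
// upgraded.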
public void testUpgradeDataStream() throws Exception {
String dataStreamName = "reindex_test_data_stream";
int numRollovers = 10;
if (CLUSTER_TYPE == ClusterType.OLD) {
final String INDEX_TEMPLATE = """
{
"index_patterns": ["$PATTERN"],
"template": $TEMPLATE,
"data_stream": {
}
}""";
// Add composable index template
String templateName = "reindex_test_data_stream_template";
var putIndexTemplateRequest = new Request("POST", "/_index_template/" + templateName);
putIndexTemplateRequest.setJsonEntity(INDEX_TEMPLATE.replace("$TEMPLATE", TEMPLATE).replace("$PATTERN", dataStreamName));
assertOK(client().performRequest(putIndexTemplateRequest));
performOldClusterOperations(templateName, dataStreamName, numRollovers);
} else if (CLUSTER_TYPE == ClusterType.MIXED) {
// nothing
} else if (CLUSTER_TYPE == ClusterType.UPGRADED) {
performUpgradedClusterOperations(dataStreamName, numRollovers);
}
}

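// Kicks off the upgrade reindex through the migration API, polls the status endpoint for up to 60 seconds until it
// reports completion, and finally cancels the task to clean it up.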
private void performUpgradedClusterOperations(String dataStreamName, int numRollovers) throws Exception {
Request reindexRequest = new Request("POST", "/_migration/reindex");
reindexRequest.setJsonEntity(Strings.format("""
{
"mode": "upgrade",
"source": {
"index": "%s"
}
}""", dataStreamName));
Response reindexResponse = client().performRequest(reindexRequest);
assertOK(reindexResponse);
assertBusy(() -> {
Request statusRequest = new Request("GET", "_migration/reindex/" + dataStreamName + "/_status");
Response statusResponse = client().performRequest(statusRequest);
Map<String, Object> statusResponseMap = XContentHelper.convertToMap(
JsonXContent.jsonXContent,
statusResponse.getEntity().getContent(),
false
);
assertOK(statusResponse);
assertThat(statusResponseMap.get("complete"), equalTo(true));
if (isOriginalClusterCurrent()) {
// If the original cluster was the same as this one, we don't want any indices reindexed:
assertThat(statusResponseMap.get("successes"), equalTo(0));
} else {
assertThat(statusResponseMap.get("successes"), equalTo(numRollovers + 1));
}
}, 60, TimeUnit.SECONDS);
Request cancelRequest = new Request("POST", "_migration/reindex/" + dataStreamName + "/_cancel");
Response cancelResponse = client().performRequest(cancelRequest);
assertOK(cancelResponse);
}

private static void performOldClusterOperations(String templateName, String dataStreamName, int numRollovers) throws IOException {
bulkLoadData(dataStreamName);
for (int i = 0; i < numRollovers; i++) {
rollover(dataStreamName);
bulkLoadData(dataStreamName);
}
}

private static void bulkLoadData(String dataStreamName) throws IOException {
var bulkRequest = new Request("POST", "/" + dataStreamName + "/_bulk");
bulkRequest.setJsonEntity(BULK.replace("$now", formatInstant(Instant.now())));
var response = client().performRequest(bulkRequest);
assertOK(response);
}

private static void rollover(String dataStreamName) throws IOException {
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
Response rolloverResponse = client().performRequest(rolloverRequest);
assertOK(rolloverResponse);
}

private static Map<String, Object> getDataStream(String dataStreamName) throws IOException {
var getDataStreamsRequest = new Request("GET", "/_data_stream/" + dataStreamName);
var response = client().performRequest(getDataStreamsRequest);
assertOK(response);
return entityAsMap(response);
}

static String formatInstant(Instant instant) {
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
}

private static void assertSearch(String dataStreamName, int expectedHitCount) throws IOException {
var searchRequest = new Request("GET", dataStreamName + "/_search");
var response = client().performRequest(searchRequest);
assertOK(response);
var responseBody = entityAsMap(response);
assertThat(ObjectPath.evaluate(responseBody, "hits.total.value"), equalTo(expectedHitCount));
}

}