Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/120231.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 120231
summary: Add sanity check to `ReindexDatastreamIndexAction`
area: Data streams
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest;
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse;
import org.elasticsearch.action.admin.indices.readonly.TransportAddIndexBlockAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.IndicesOptions;
Expand All @@ -25,12 +29,14 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -108,6 +114,7 @@ protected void doExecute(
.<AcknowledgedResponse>andThen(l -> createIndex(sourceIndex, destIndexName, l, taskId))
.<BulkByScrollResponse>andThen(l -> reindex(sourceIndexName, destIndexName, l, taskId))
.<AcknowledgedResponse>andThen(l -> copyOldSourceSettingsToDest(settingsBefore, destIndexName, l, taskId))
.<AcknowledgedResponse>andThen(l -> sanityCheck(sourceIndexName, destIndexName, l, taskId))
.andThenApply(ignored -> new ReindexDataStreamIndexAction.Response(destIndexName))
.addListener(listener);
}
Expand Down Expand Up @@ -188,21 +195,6 @@ private void reindex(String sourceIndexName, String destIndexName, ActionListene
client.execute(ReindexAction.INSTANCE, reindexRequest, listener);
}

private void addBlockIfFromSource(
IndexMetadata.APIBlock block,
Settings settingsBefore,
String destIndexName,
ActionListener<AddIndexBlockResponse> listener,
TaskId parentTaskId
) {
if (settingsBefore.getAsBoolean(block.settingName(), false)) {
var errorMessage = String.format(Locale.ROOT, "Add [%s] block to index [%s] was not acknowledged", block.name(), destIndexName);
addBlockToIndex(block, destIndexName, failIfNotAcknowledged(listener, errorMessage), parentTaskId);
} else {
listener.onResponse(null);
}
}

private void updateSettings(
String index,
Settings.Builder settings,
Expand Down Expand Up @@ -270,4 +262,50 @@ private void addBlockToIndex(
addIndexBlockRequest.setParentTask(parentTaskId);
client.admin().indices().execute(TransportAddIndexBlockAction.TYPE, addIndexBlockRequest, listener);
}

private void getIndexDocCount(String index, TaskId parentTaskId, ActionListener<Long> listener) {
SearchRequest countRequest = new SearchRequest(index);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(0).trackTotalHits(true);
countRequest.allowPartialSearchResults(false);
countRequest.source(searchSourceBuilder);
countRequest.setParentTask(parentTaskId);
client.search(countRequest, listener.delegateFailure((delegate, response) -> {
var totalHits = response.getHits().getTotalHits();
assert totalHits.relation() == TotalHits.Relation.EQUAL_TO;
delegate.onResponse(totalHits.value());
}));
}

private void sanityCheck(
String sourceIndexName,
String destIndexName,
ActionListener<AcknowledgedResponse> listener,
TaskId parentTaskId
) {
if (Assertions.ENABLED) {
logger.debug("Comparing source [{}] and dest [{}] doc counts", sourceIndexName, destIndexName);
client.execute(
RefreshAction.INSTANCE,
new RefreshRequest(destIndexName),
listener.delegateFailureAndWrap((delegate, ignored) -> {
getIndexDocCount(sourceIndexName, parentTaskId, delegate.delegateFailureAndWrap((delegate1, sourceCount) -> {
getIndexDocCount(destIndexName, parentTaskId, delegate1.delegateFailureAndWrap((delegate2, destCount) -> {
assert sourceCount == destCount
: String.format(
Locale.ROOT,
"source index [%s] has %d docs and dest [%s] has %d docs",
sourceIndexName,
sourceCount,
destIndexName,
destCount
);
delegate2.onResponse(null);
}));
}));
})
);
} else {
listener.onResponse(null);
}
}
}