Skip to content

Commit bf701c9

Browse files
authored
Ignore closed indices for reindex (#120244)
1 parent f0ba669 commit bf701c9

File tree

5 files changed

+224
-50
lines changed

5 files changed

+224
-50
lines changed

docs/changelog/120244.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 120244
2+
summary: Ignore closed indices for reindex
3+
area: Data streams
4+
type: enhancement
5+
issues: []

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/deprecation/DeprecatedIndexPredicate.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public static Predicate<Index> getReindexRequiredPredicate(Metadata metadata) {
3636
public static boolean reindexRequired(IndexMetadata indexMetadata) {
3737
return creationVersionBeforeMinimumWritableVersion(indexMetadata)
3838
&& isNotSearchableSnapshot(indexMetadata)
39+
&& isNotClosed(indexMetadata)
3940
&& isNotVerifiedReadOnly(indexMetadata);
4041
}
4142

@@ -52,4 +53,8 @@ private static boolean creationVersionBeforeMinimumWritableVersion(IndexMetadata
5253
return metadata.getCreationVersion().before(MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE);
5354
}
5455

56+
private static boolean isNotClosed(IndexMetadata indexMetadata) {
57+
return indexMetadata.getState().equals(IndexMetadata.State.CLOSE) == false;
58+
}
59+
5560
}

x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationChecksTests.java

Lines changed: 153 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -41,56 +41,79 @@ public void testOldIndicesCheck() {
4141
int oldIndexCount = randomIntBetween(1, 100);
4242
int newIndexCount = randomIntBetween(1, 100);
4343

44-
List<Index> allIndices = new ArrayList<>();
4544
Map<String, IndexMetadata> nameToIndexMetadata = new HashMap<>();
4645
Set<String> expectedIndices = new HashSet<>();
4746

48-
for (int i = 0; i < oldIndexCount; i++) {
49-
Settings.Builder settings = settings(IndexVersion.fromId(7170099));
47+
DataStream dataStream = createTestDataStream(oldIndexCount, 0, newIndexCount, 0, nameToIndexMetadata, expectedIndices);
5048

51-
String indexName = "old-data-stream-index-" + i;
52-
if (expectedIndices.isEmpty() == false && randomIntBetween(0, 2) == 0) {
53-
settings.put(INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_STORE_TYPE);
54-
} else {
55-
expectedIndices.add(indexName);
56-
}
49+
Metadata metadata = Metadata.builder().indices(nameToIndexMetadata).build();
50+
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).build();
5751

58-
Settings.Builder settingsBuilder = settings;
59-
IndexMetadata oldIndexMetadata = IndexMetadata.builder(indexName)
60-
.settings(settingsBuilder)
61-
.numberOfShards(1)
62-
.numberOfReplicas(0)
63-
.build();
64-
allIndices.add(oldIndexMetadata.getIndex());
65-
nameToIndexMetadata.put(oldIndexMetadata.getIndex().getName(), oldIndexMetadata);
66-
}
52+
DeprecationIssue expected = new DeprecationIssue(
53+
DeprecationIssue.Level.CRITICAL,
54+
"Old data stream with a compatibility version < 9.0",
55+
"https://www.elastic.co/guide/en/elasticsearch/reference/master/breaking-changes-9.0.html",
56+
"This data stream has backing indices that were created before Elasticsearch 9.0.0",
57+
false,
58+
ofEntries(
59+
entry("reindex_required", true),
60+
entry("total_backing_indices", oldIndexCount + newIndexCount),
61+
entry("indices_requiring_upgrade_count", expectedIndices.size()),
62+
entry("indices_requiring_upgrade", expectedIndices)
63+
)
64+
);
6765

68-
for (int i = 0; i < newIndexCount; i++) {
69-
Settings.Builder settingsBuilder = settings(IndexVersion.current());
70-
IndexMetadata newIndexMetadata = IndexMetadata.builder("new-data-stream-index-" + i)
71-
.settings(settingsBuilder)
72-
.numberOfShards(1)
73-
.numberOfReplicas(0)
74-
.build();
75-
allIndices.add(newIndexMetadata.getIndex());
76-
nameToIndexMetadata.put(newIndexMetadata.getIndex().getName(), newIndexMetadata);
77-
}
66+
List<DeprecationIssue> issues = DeprecationChecks.filterChecks(DATA_STREAM_CHECKS, c -> c.apply(dataStream, clusterState));
7867

79-
DataStream dataStream = new DataStream(
80-
randomAlphaOfLength(10),
81-
allIndices,
82-
randomNegativeLong(),
83-
Map.of(),
84-
randomBoolean(),
85-
false,
86-
false,
87-
randomBoolean(),
88-
randomFrom(IndexMode.values()),
89-
null,
90-
randomFrom(DataStreamOptions.EMPTY, DataStreamOptions.FAILURE_STORE_DISABLED, DataStreamOptions.FAILURE_STORE_ENABLED, null),
91-
List.of(),
92-
randomBoolean(),
93-
null
68+
assertThat(issues, equalTo(singletonList(expected)));
69+
}
70+
71+
public void testOldIndicesCheckWithOnlyClosedOrNewIndices() {
72+
// This tests what happens when any old indices that we have are closed. We expect no deprecation warning.
73+
int oldClosedIndexCount = randomIntBetween(1, 100);
74+
int newOpenIndexCount = randomIntBetween(0, 100);
75+
int newClosedIndexCount = randomIntBetween(0, 100);
76+
77+
Map<String, IndexMetadata> nameToIndexMetadata = new HashMap<>();
78+
Set<String> expectedIndices = new HashSet<>();
79+
80+
DataStream dataStream = createTestDataStream(
81+
0,
82+
oldClosedIndexCount,
83+
newOpenIndexCount,
84+
newClosedIndexCount,
85+
nameToIndexMetadata,
86+
expectedIndices
87+
);
88+
89+
Metadata metadata = Metadata.builder().indices(nameToIndexMetadata).build();
90+
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).build();
91+
92+
List<DeprecationIssue> issues = DeprecationChecks.filterChecks(DATA_STREAM_CHECKS, c -> c.apply(dataStream, clusterState));
93+
94+
assertThat(issues.size(), equalTo(0));
95+
}
96+
97+
public void testOldIndicesCheckWithClosedAndOpenIndices() {
98+
/*
99+
* This tests what happens when a data stream has old indices, and some are open and some are closed. We expect a deprecation
100+
* warning that includes information about the old ones only.
101+
*/
102+
int oldOpenIndexCount = randomIntBetween(1, 100);
103+
int oldClosedIndexCount = randomIntBetween(1, 100);
104+
int newOpenIndexCount = randomIntBetween(0, 100);
105+
int newClosedIndexCount = randomIntBetween(0, 100);
106+
107+
Map<String, IndexMetadata> nameToIndexMetadata = new HashMap<>();
108+
Set<String> expectedIndices = new HashSet<>();
109+
110+
DataStream dataStream = createTestDataStream(
111+
oldOpenIndexCount,
112+
oldClosedIndexCount,
113+
newOpenIndexCount,
114+
newClosedIndexCount,
115+
nameToIndexMetadata,
116+
expectedIndices
94117
);
95118

96119
Metadata metadata = Metadata.builder().indices(nameToIndexMetadata).build();
@@ -104,7 +127,7 @@ public void testOldIndicesCheck() {
104127
false,
105128
ofEntries(
106129
entry("reindex_required", true),
107-
entry("total_backing_indices", oldIndexCount + newIndexCount),
130+
entry("total_backing_indices", oldOpenIndexCount + oldClosedIndexCount + newOpenIndexCount + newClosedIndexCount),
108131
entry("indices_requiring_upgrade_count", expectedIndices.size()),
109132
entry("indices_requiring_upgrade", expectedIndices)
110133
)
@@ -115,4 +138,90 @@ public void testOldIndicesCheck() {
115138
assertThat(issues, equalTo(singletonList(expected)));
116139
}
117140

141+
/*
142+
* This creates a test DataStream with the given counts. The nameToIndexMetadata Map and the expectedIndices Set are mutable collections
143+
* that will be populated by this method.
144+
*/
145+
private DataStream createTestDataStream(
146+
int oldOpenIndexCount,
147+
int oldClosedIndexCount,
148+
int newOpenIndexCount,
149+
int newClosedIndexCount,
150+
Map<String, IndexMetadata> nameToIndexMetadata,
151+
Set<String> expectedIndices
152+
) {
153+
List<Index> allIndices = new ArrayList<>();
154+
155+
for (int i = 0; i < oldOpenIndexCount; i++) {
156+
allIndices.add(createOldIndex(i, false, nameToIndexMetadata, expectedIndices));
157+
}
158+
for (int i = 0; i < oldClosedIndexCount; i++) {
159+
allIndices.add(createOldIndex(i, true, nameToIndexMetadata, null));
160+
}
161+
for (int i = 0; i < newOpenIndexCount; i++) {
162+
allIndices.add(createNewIndex(i, false, nameToIndexMetadata));
163+
}
164+
for (int i = 0; i < newClosedIndexCount; i++) {
165+
allIndices.add(createNewIndex(i, true, nameToIndexMetadata));
166+
}
167+
168+
DataStream dataStream = new DataStream(
169+
randomAlphaOfLength(10),
170+
allIndices,
171+
randomNegativeLong(),
172+
Map.of(),
173+
randomBoolean(),
174+
false,
175+
false,
176+
randomBoolean(),
177+
randomFrom(IndexMode.values()),
178+
null,
179+
randomFrom(DataStreamOptions.EMPTY, DataStreamOptions.FAILURE_STORE_DISABLED, DataStreamOptions.FAILURE_STORE_ENABLED, null),
180+
List.of(),
181+
randomBoolean(),
182+
null
183+
);
184+
return dataStream;
185+
}
186+
187+
private Index createOldIndex(
188+
int suffix,
189+
boolean isClosed,
190+
Map<String, IndexMetadata> nameToIndexMetadata,
191+
Set<String> expectedIndices
192+
) {
193+
return createIndex(true, suffix, isClosed, nameToIndexMetadata, expectedIndices);
194+
}
195+
196+
private Index createNewIndex(int suffix, boolean isClosed, Map<String, IndexMetadata> nameToIndexMetadata) {
197+
return createIndex(false, suffix, isClosed, nameToIndexMetadata, null);
198+
}
199+
200+
private Index createIndex(
201+
boolean isOld,
202+
int suffix,
203+
boolean isClosed,
204+
Map<String, IndexMetadata> nameToIndexMetadata,
205+
Set<String> expectedIndices
206+
) {
207+
Settings.Builder settingsBuilder = isOld ? settings(IndexVersion.fromId(7170099)) : settings(IndexVersion.current());
208+
String indexName = (isOld ? "old-" : "new-") + (isClosed ? "closed-" : "") + "data-stream-index-" + suffix;
209+
if (isOld && isClosed == false) { // we only expect warnings on open old indices
210+
if (expectedIndices.isEmpty() == false && randomIntBetween(0, 2) == 0) {
211+
settingsBuilder.put(INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_STORE_TYPE);
212+
} else {
213+
expectedIndices.add(indexName);
214+
}
215+
}
216+
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName)
217+
.settings(settingsBuilder)
218+
.numberOfShards(1)
219+
.numberOfReplicas(0);
220+
if (isClosed) {
221+
indexMetadataBuilder.state(IndexMetadata.State.CLOSE);
222+
}
223+
IndexMetadata indexMetadata = indexMetadataBuilder.build();
224+
nameToIndexMetadata.put(indexMetadata.getIndex().getName(), indexMetadata);
225+
return indexMetadata.getIndex();
226+
}
118227
}

x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecksTests.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,22 @@ public void testOldIndicesCheckSnapshotIgnored() {
116116
assertThat(issues, empty());
117117
}
118118

119+
public void testOldIndicesCheckClosedIgnored() {
120+
IndexVersion createdWith = IndexVersion.fromId(7170099);
121+
Settings.Builder settings = settings(createdWith);
122+
IndexMetadata indexMetadata = IndexMetadata.builder("test")
123+
.settings(settings)
124+
.numberOfShards(1)
125+
.numberOfReplicas(0)
126+
.state(IndexMetadata.State.CLOSE)
127+
.build();
128+
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE)
129+
.metadata(Metadata.builder().put(indexMetadata, true))
130+
.build();
131+
List<DeprecationIssue> issues = DeprecationChecks.filterChecks(INDEX_SETTINGS_CHECKS, c -> c.apply(indexMetadata, clusterState));
132+
assertThat(issues, empty());
133+
}
134+
119135
public void testTranslogRetentionSettings() {
120136
Settings.Builder settings = settings(IndexVersion.current());
121137
settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), randomPositiveTimeValue());

x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.io.IOException;
2525
import java.nio.charset.StandardCharsets;
2626
import java.time.Instant;
27+
import java.util.HashSet;
2728
import java.util.List;
2829
import java.util.Map;
2930
import java.util.Set;
@@ -252,16 +253,27 @@ private static void createAndRolloverDataStream(String dataStreamName, int numRo
252253
assertOK(client().performRequest(putIndexTemplateRequest));
253254
bulkLoadData(dataStreamName);
254255
for (int i = 0; i < numRollovers; i++) {
255-
rollover(dataStreamName);
256+
String oldIndexName = rollover(dataStreamName);
257+
if (randomBoolean()) {
258+
closeIndex(oldIndexName);
259+
}
256260
bulkLoadData(dataStreamName);
257261
}
258262
}
259263

260264
private void upgradeDataStream(String dataStreamName, int numRolloversOnOldCluster) throws Exception {
261265
Set<String> indicesNeedingUpgrade = getDataStreamIndices(dataStreamName);
266+
Set<String> closedOldIndices = getClosedIndices(dataStreamName);
262267
final int explicitRolloverOnNewClusterCount = randomIntBetween(0, 2);
263268
for (int i = 0; i < explicitRolloverOnNewClusterCount; i++) {
264-
rollover(dataStreamName);
269+
String oldIndexName = rollover(dataStreamName);
270+
if (randomBoolean()) {
271+
if (i == 0) {
272+
// Since this is the first rollover on the new cluster, the old index came from the old cluster
273+
closedOldIndices.add(oldIndexName);
274+
}
275+
closeIndex(oldIndexName);
276+
}
265277
}
266278
Request reindexRequest = new Request("POST", "/_migration/reindex");
267279
reindexRequest.setJsonEntity(Strings.format("""
@@ -304,12 +316,14 @@ private void upgradeDataStream(String dataStreamName, int numRolloversOnOldClust
304316
*/
305317
assertThat(
306318
statusResponseMap.get("total_indices_requiring_upgrade"),
307-
equalTo(originalWriteIndex + numRolloversOnOldCluster)
319+
equalTo(originalWriteIndex + numRolloversOnOldCluster - closedOldIndices.size())
308320
);
309-
assertThat(statusResponseMap.get("successes"), equalTo(numRolloversOnOldCluster + 1));
321+
assertThat(statusResponseMap.get("successes"), equalTo(numRolloversOnOldCluster + 1 - closedOldIndices.size()));
310322
// We expect all the original indices to have been deleted
311323
for (String oldIndex : indicesNeedingUpgrade) {
312-
assertThat(indexExists(oldIndex), equalTo(false));
324+
if (closedOldIndices.contains(oldIndex) == false) {
325+
assertThat(indexExists(oldIndex), equalTo(false));
326+
}
313327
}
314328
assertThat(getDataStreamIndices(dataStreamName).size(), equalTo(expectedTotalIndicesInDataStream));
315329
}
@@ -329,6 +343,29 @@ private Set<String> getDataStreamIndices(String dataStreamName) throws IOExcepti
329343
return indices.stream().map(index -> index.get("index_name").toString()).collect(Collectors.toSet());
330344
}
331345

346+
@SuppressWarnings("unchecked")
347+
private Set<String> getClosedIndices(String dataStreamName) throws IOException {
348+
Set<String> allIndices = getDataStreamIndices(dataStreamName);
349+
Set<String> closedIndices = new HashSet<>();
350+
Response response = client().performRequest(new Request("GET", "_cluster/state/blocks/indices"));
351+
Map<String, Object> responseMap = XContentHelper.convertToMap(JsonXContent.jsonXContent, response.getEntity().getContent(), false);
352+
Map<String, Object> blocks = (Map<String, Object>) responseMap.get("blocks");
353+
Map<String, Object> indices = (Map<String, Object>) blocks.get("indices");
354+
for (Map.Entry<String, Object> indexEntry : indices.entrySet()) {
355+
String indexName = indexEntry.getKey();
356+
if (allIndices.contains(indexName)) {
357+
Map<String, Object> blocksForIndex = (Map<String, Object>) indexEntry.getValue();
358+
for (Map.Entry<String, Object> blockEntry : blocksForIndex.entrySet()) {
359+
Map<String, String> block = (Map<String, String>) blockEntry.getValue();
360+
if ("index closed".equals(block.get("description"))) {
361+
closedIndices.add(indexName);
362+
}
363+
}
364+
}
365+
}
366+
return closedIndices;
367+
}
368+
332369
/*
333370
* Similar to isOriginalClusterCurrent, but returns true if the major versions of the clusters are the same. So true
334371
* for 8.6 and 8.17, but false for 7.17 and 8.18.
@@ -370,9 +407,11 @@ static String formatInstant(Instant instant) {
370407
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
371408
}
372409

373-
private static void rollover(String dataStreamName) throws IOException {
410+
private static String rollover(String dataStreamName) throws IOException {
374411
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
375412
Response rolloverResponse = client().performRequest(rolloverRequest);
376413
assertOK(rolloverResponse);
414+
String oldIndexName = (String) entityAsMap(rolloverResponse).get("old_index");
415+
return oldIndexName;
377416
}
378417
}

0 commit comments

Comments
 (0)