2929import org .apache .lucene .store .LockObtainFailedException ;
3030import org .apache .lucene .store .NativeFSLockFactory ;
3131import org .elasticsearch .action .admin .cluster .node .stats .NodesStatsResponse ;
32+ import org .elasticsearch .action .admin .indices .flush .FlushRequest ;
33+ import org .elasticsearch .action .admin .indices .recovery .RecoveryResponse ;
3234import org .elasticsearch .action .index .IndexRequestBuilder ;
3335import org .elasticsearch .action .search .SearchPhaseExecutionException ;
34- import org .elasticsearch .action .search .SearchResponse ;
3536import org .elasticsearch .cli .MockTerminal ;
3637import org .elasticsearch .cluster .ClusterState ;
3738import org .elasticsearch .cluster .routing .GroupShardsIterator ;
4344import org .elasticsearch .common .unit .ByteSizeUnit ;
4445import org .elasticsearch .common .unit .ByteSizeValue ;
4546import org .elasticsearch .common .unit .TimeValue ;
46- import org .elasticsearch .env .Environment ;
4747import org .elasticsearch .index .Index ;
4848import org .elasticsearch .index .IndexSettings ;
4949import org .elasticsearch .index .MockEngineFactoryPlugin ;
50+ import org .elasticsearch .index .shard .IndexShard ;
51+ import org .elasticsearch .index .shard .ShardId ;
52+ import org .elasticsearch .indices .IndicesService ;
53+ import org .elasticsearch .indices .recovery .RecoveryState ;
5054import org .elasticsearch .monitor .fs .FsInfo ;
5155import org .elasticsearch .plugins .Plugin ;
5256import org .elasticsearch .test .ESIntegTestCase ;
57+ import org .elasticsearch .test .InternalTestCluster ;
5358import org .elasticsearch .test .engine .MockEngineSupport ;
54- import org .elasticsearch .test .hamcrest .ElasticsearchAssertions ;
5559import org .elasticsearch .test .transport .MockTransportService ;
5660
5761import java .io .IOException ;
6367import java .nio .file .StandardOpenOption ;
6468import java .util .Arrays ;
6569import java .util .Collection ;
66- import java .util .HashMap ;
6770import java .util .List ;
6871import java .util .Set ;
6972import java .util .TreeSet ;
7275import static org .elasticsearch .common .util .CollectionUtils .iterableAsArrayList ;
7376import static org .elasticsearch .index .query .QueryBuilders .matchAllQuery ;
7477import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
78+ import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertHitCount ;
7579import static org .hamcrest .Matchers .containsString ;
80+ import static org .hamcrest .Matchers .equalTo ;
81+ import static org .hamcrest .Matchers .greaterThan ;
7682import static org .hamcrest .Matchers .notNullValue ;
7783
7884@ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .SUITE , numDataNodes = 0 )
@@ -84,23 +90,40 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
8490 }
8591
8692 public void testCorruptTranslogTruncation () throws Exception {
87- internalCluster ().startNodes (1 , Settings .EMPTY );
93+ internalCluster ().startNodes (2 , Settings .EMPTY );
8894
89- assertAcked (prepareCreate ("test" ).setSettings (Settings .builder ()
90- .put ("index.number_of_shards" , 1 )
91- .put ("index.number_of_replicas" , 0 )
92- .put ("index.refresh_interval" , "-1" )
93- .put (MockEngineSupport .DISABLE_FLUSH_ON_CLOSE .getKey (), true ) // never flush - always recover from translog
94- ));
95+ final String replicaNode = internalCluster ().getNodeNames ()[1 ];
96+
97+ assertAcked (prepareCreate ("test" ).setSettings (Settings .builder ()
98+ .put ("index.number_of_shards" , 1 )
99+ .put ("index.number_of_replicas" , 1 )
100+ .put ("index.refresh_interval" , "-1" )
101+ .put (MockEngineSupport .DISABLE_FLUSH_ON_CLOSE .getKey (), true ) // never flush - always recover from translog
102+ .put ("index.routing.allocation.exclude._name" , replicaNode )
103+ ));
95104 ensureYellow ();
96105
106+ assertAcked (client ().admin ().indices ().prepareUpdateSettings ("test" ).setSettings (Settings .builder ()
107+ .put ("index.routing.allocation.exclude._name" , (String )null )
108+ ));
109+
97110 // Index some documents
98- int numDocs = scaledRandomIntBetween (100 , 1000 );
99- IndexRequestBuilder [] builders = new IndexRequestBuilder [numDocs ];
111+ logger .info ("--> indexing more doc to be kept" );
112+ int numDocsToKeep = randomIntBetween (0 , 100 );
113+ IndexRequestBuilder [] builders = new IndexRequestBuilder [numDocsToKeep ];
100114 for (int i = 0 ; i < builders .length ; i ++) {
101115 builders [i ] = client ().prepareIndex ("test" , "type" ).setSource ("foo" , "bar" );
102116 }
117+ indexRandom (false , false , false , Arrays .asList (builders ));
118+ flush ("test" );
103119 disableTranslogFlush ("test" );
120+ // having no extra docs is an interesting case for seq no based recoveries - test it more often
121+ int numDocsToTruncate = randomBoolean () ? 0 : randomIntBetween (0 , 100 );
122+ logger .info ("--> indexing [{}] more doc to be truncated" , numDocsToTruncate );
123+ builders = new IndexRequestBuilder [numDocsToTruncate ];
124+ for (int i = 0 ; i < builders .length ; i ++) {
125+ builders [i ] = client ().prepareIndex ("test" , "type" ).setSource ("foo" , "bar" );
126+ }
104127 indexRandom (false , false , false , Arrays .asList (builders ));
105128 Set <Path > translogDirs = getTranslogDirs ("test" );
106129
@@ -120,17 +143,32 @@ public void testCorruptTranslogTruncation() throws Exception {
120143 }
121144 }
122145
146+ final boolean expectSeqNoRecovery ;
147+ if (randomBoolean () && numDocsToTruncate > 0 ) {
148+ // flush the replica, so it will have more docs than what the primary will have
149+ Index index = resolveIndex ("test" );
150+ IndexShard replica = internalCluster ().getInstance (IndicesService .class , replicaNode ).getShardOrNull (new ShardId (index , 0 ));
151+ replica .flush (new FlushRequest ());
152+ expectSeqNoRecovery = false ;
153+ logger .info ("--> ops based recovery disabled by flushing replica" );
154+ } else {
155+ expectSeqNoRecovery = true ;
156+ }
157+
158+ // shut down the replica node to be tested later
159+ internalCluster ().stopRandomNode (InternalTestCluster .nameFilter (replicaNode ));
160+
123161 // Corrupt the translog file(s)
124162 logger .info ("--> corrupting translog" );
125163 corruptRandomTranslogFiles ("test" );
126164
127165 // Restart the single node
128166 logger .info ("--> restarting node" );
129- internalCluster ().fullRestart ();
167+ internalCluster ().restartRandomDataNode ();
130168 client ().admin ().cluster ().prepareHealth ().setWaitForYellowStatus ()
131- .setTimeout (new TimeValue (1000 , TimeUnit .MILLISECONDS ))
132- .setWaitForEvents (Priority .LANGUID )
133- .get ();
169+ .setTimeout (new TimeValue (1000 , TimeUnit .MILLISECONDS ))
170+ .setWaitForEvents (Priority .LANGUID )
171+ .get ();
134172
135173 try {
136174 client ().prepareSearch ("test" ).setQuery (matchAllQuery ()).get ();
@@ -149,7 +187,7 @@ public void testCorruptTranslogTruncation() throws Exception {
149187 assertBusy (() -> {
150188 logger .info ("--> checking that lock has been released for {}" , idxLocation );
151189 try (Directory dir = FSDirectory .open (idxLocation , NativeFSLockFactory .INSTANCE );
152- Lock writeLock = dir .obtainLock (IndexWriter .WRITE_LOCK_NAME )) {
190+ Lock writeLock = dir .obtainLock (IndexWriter .WRITE_LOCK_NAME )) {
153191 // Great, do nothing, we just wanted to obtain the lock
154192 } catch (LockObtainFailedException lofe ) {
155193 logger .info ("--> failed acquiring lock for {}" , idxLocation );
@@ -171,26 +209,140 @@ public void testCorruptTranslogTruncation() throws Exception {
171209 ensureYellow ("test" );
172210
173211 // Run a search and make sure it succeeds
174- SearchResponse resp = client ().prepareSearch ("test" ).setQuery (matchAllQuery ()).get ();
175- ElasticsearchAssertions .assertNoFailures (resp );
212+ assertHitCount (client ().prepareSearch ("test" ).setQuery (matchAllQuery ()).get (), numDocsToKeep );
213+
214+ logger .info ("--> starting the replica node to test recovery" );
215+ internalCluster ().startNode ();
216+ ensureGreen ("test" );
217+ assertHitCount (client ().prepareSearch ("test" ).setPreference ("_replica" ).setQuery (matchAllQuery ()).get (), numDocsToKeep );
218+ final RecoveryResponse recoveryResponse = client ().admin ().indices ().prepareRecoveries ("test" ).setActiveOnly (false ).get ();
219+ final RecoveryState replicaRecoveryState = recoveryResponse .shardRecoveryStates ().get ("test" ).stream ()
220+ .filter (recoveryState -> recoveryState .getPrimary () == false ).findFirst ().get ();
221+ assertThat (replicaRecoveryState .getIndex ().toString (), replicaRecoveryState .getIndex ().recoveredFileCount (),
222+ expectSeqNoRecovery ? equalTo (0 ) : greaterThan (0 ));
223+ }
224+
225+ public void testCorruptTranslogTruncationOfReplica () throws Exception {
226+ internalCluster ().startNodes (2 , Settings .EMPTY );
227+
228+ final String primaryNode = internalCluster ().getNodeNames ()[0 ];
229+ final String replicaNode = internalCluster ().getNodeNames ()[1 ];
230+
231+ assertAcked (prepareCreate ("test" ).setSettings (Settings .builder ()
232+ .put ("index.number_of_shards" , 1 )
233+ .put ("index.number_of_replicas" , 1 )
234+ .put ("index.refresh_interval" , "-1" )
235+ .put (MockEngineSupport .DISABLE_FLUSH_ON_CLOSE .getKey (), true ) // never flush - always recover from translog
236+ .put ("index.routing.allocation.exclude._name" , replicaNode )
237+ ));
238+ ensureYellow ();
239+
240+ assertAcked (client ().admin ().indices ().prepareUpdateSettings ("test" ).setSettings (Settings .builder ()
241+ .put ("index.routing.allocation.exclude._name" , (String )null )
242+ ));
243+ ensureGreen ();
244+
245+ // Index some documents
246+ logger .info ("--> indexing more doc to be kept" );
247+ int numDocsToKeep = randomIntBetween (0 , 100 );
248+ IndexRequestBuilder [] builders = new IndexRequestBuilder [numDocsToKeep ];
249+ for (int i = 0 ; i < builders .length ; i ++) {
250+ builders [i ] = client ().prepareIndex ("test" , "type" ).setSource ("foo" , "bar" );
251+ }
252+ indexRandom (false , false , false , Arrays .asList (builders ));
253+ flush ("test" );
254+ disableTranslogFlush ("test" );
255+ // having no extra docs is an interesting case for seq no based recoveries - test it more often
256+ int numDocsToTruncate = randomBoolean () ? 0 : randomIntBetween (0 , 100 );
257+ logger .info ("--> indexing [{}] more doc to be truncated" , numDocsToTruncate );
258+ builders = new IndexRequestBuilder [numDocsToTruncate ];
259+ for (int i = 0 ; i < builders .length ; i ++) {
260+ builders [i ] = client ().prepareIndex ("test" , "type" ).setSource ("foo" , "bar" );
261+ }
262+ indexRandom (false , false , false , Arrays .asList (builders ));
263+ final int totalDocs = numDocsToKeep + numDocsToTruncate ;
264+
265+
266+ // sample the replica node translog dirs
267+ final ShardId shardId = new ShardId (resolveIndex ("test" ), 0 );
268+ Set <Path > translogDirs = getTranslogDirs (replicaNode , shardId );
269+
270+ // stop the cluster nodes. we don't use full restart so the node start up order will be the same
271+ // and shard roles will be maintained
272+ internalCluster ().stopRandomDataNode ();
273+ internalCluster ().stopRandomDataNode ();
274+
275+ // Corrupt the translog file(s)
276+ logger .info ("--> corrupting translog" );
277+ corruptTranslogFiles (translogDirs );
278+
279+ // Restart the single node
280+ logger .info ("--> starting node" );
281+ internalCluster ().startNode ();
282+
283+ ensureYellow ();
284+
285+ // Run a search and make sure it succeeds
286+ assertHitCount (client ().prepareSearch ("test" ).setQuery (matchAllQuery ()).get (), totalDocs );
287+
288+ TruncateTranslogCommand ttc = new TruncateTranslogCommand ();
289+ MockTerminal t = new MockTerminal ();
290+ OptionParser parser = ttc .getParser ();
291+
292+ for (Path translogDir : translogDirs ) {
293+ final Path idxLocation = translogDir .getParent ().resolve ("index" );
294+ assertBusy (() -> {
295+ logger .info ("--> checking that lock has been released for {}" , idxLocation );
296+ try (Directory dir = FSDirectory .open (idxLocation , NativeFSLockFactory .INSTANCE );
297+ Lock writeLock = dir .obtainLock (IndexWriter .WRITE_LOCK_NAME )) {
298+ // Great, do nothing, we just wanted to obtain the lock
299+ } catch (LockObtainFailedException lofe ) {
300+ logger .info ("--> failed acquiring lock for {}" , idxLocation );
301+ fail ("still waiting for lock release at [" + idxLocation + "]" );
302+ } catch (IOException ioe ) {
303+ fail ("Got an IOException: " + ioe );
304+ }
305+ });
306+
307+ OptionSet options = parser .parse ("-d" , translogDir .toAbsolutePath ().toString (), "-b" );
308+ logger .info ("--> running truncate translog command for [{}]" , translogDir .toAbsolutePath ());
309+ ttc .execute (t , options , null /* TODO: env should be real here, and ttc should actually use it... */ );
310+ logger .info ("--> output:\n {}" , t .getOutput ());
311+ }
312+
313+ logger .info ("--> starting the replica node to test recovery" );
314+ internalCluster ().startNode ();
315+ ensureGreen ("test" );
316+ assertHitCount (client ().prepareSearch ("test" ).setPreference ("_replica" ).setQuery (matchAllQuery ()).get (), totalDocs );
317+
318+ final RecoveryResponse recoveryResponse = client ().admin ().indices ().prepareRecoveries ("test" ).setActiveOnly (false ).get ();
319+ final RecoveryState replicaRecoveryState = recoveryResponse .shardRecoveryStates ().get ("test" ).stream ()
320+ .filter (recoveryState -> recoveryState .getPrimary () == false ).findFirst ().get ();
321+ // the replica translog was disabled so it doesn't know what hte global checkpoint is and thus can't do ops based recovery
322+ assertThat (replicaRecoveryState .getIndex ().toString (), replicaRecoveryState .getIndex ().recoveredFileCount (), greaterThan (0 ));
176323 }
177324
178325 private Set <Path > getTranslogDirs (String indexName ) throws IOException {
179326 ClusterState state = client ().admin ().cluster ().prepareState ().get ().getState ();
180327 GroupShardsIterator shardIterators = state .getRoutingTable ().activePrimaryShardsGrouped (new String []{indexName }, false );
181- final Index idx = state .metaData ().index (indexName ).getIndex ();
182328 List <ShardIterator > iterators = iterableAsArrayList (shardIterators );
183329 ShardIterator shardIterator = RandomPicks .randomFrom (random (), iterators );
184330 ShardRouting shardRouting = shardIterator .nextOrNull ();
185331 assertNotNull (shardRouting );
186332 assertTrue (shardRouting .primary ());
187333 assertTrue (shardRouting .assignedToNode ());
188334 String nodeId = shardRouting .currentNodeId ();
335+ ShardId shardId = shardRouting .shardId ();
336+ return getTranslogDirs (nodeId , shardId );
337+ }
338+
339+ private Set <Path > getTranslogDirs (String nodeId , ShardId shardId ) {
189340 NodesStatsResponse nodeStatses = client ().admin ().cluster ().prepareNodesStats (nodeId ).setFs (true ).get ();
190341 Set <Path > translogDirs = new TreeSet <>(); // treeset makes sure iteration order is deterministic
191342 for (FsInfo .Path fsPath : nodeStatses .getNodes ().get (0 ).getFs ()) {
192343 String path = fsPath .getPath ();
193- final String relativeDataLocationPath = "indices/" + idx .getUUID () +"/" + Integer .toString (shardRouting .getId ()) + "/translog" ;
344+ final String relativeDataLocationPath = "indices/" + shardId .getIndex ().getUUID () +"/" + Integer .toString (shardId .getId ())
345+ + "/translog" ;
194346 Path translogPath = PathUtils .get (path ).resolve (relativeDataLocationPath );
195347 if (Files .isDirectory (translogPath )) {
196348 translogDirs .add (translogPath );
@@ -201,6 +353,10 @@ private Set<Path> getTranslogDirs(String indexName) throws IOException {
201353
202354 private void corruptRandomTranslogFiles (String indexName ) throws IOException {
203355 Set <Path > translogDirs = getTranslogDirs (indexName );
356+ corruptTranslogFiles (translogDirs );
357+ }
358+
359+ private void corruptTranslogFiles (Set <Path > translogDirs ) throws IOException {
204360 Set <Path > files = new TreeSet <>(); // treeset makes sure iteration order is deterministic
205361 for (Path translogDir : translogDirs ) {
206362 if (Files .isDirectory (translogDir )) {
0 commit comments