3131import org .elasticsearch .common .inject .Inject ;
3232import org .elasticsearch .common .settings .Settings ;
3333import org .elasticsearch .common .unit .ByteSizeValue ;
34+ import org .elasticsearch .common .unit .TimeValue ;
3435import org .elasticsearch .index .deletionpolicy .SnapshotIndexCommit ;
3536import org .elasticsearch .index .engine .Engine ;
3637import org .elasticsearch .index .shard .IllegalIndexShardStateException ;
@@ -60,11 +61,12 @@ public static class Actions {
6061 }
6162
6263 private final TransportService transportService ;
63-
6464 private final IndicesService indicesService ;
65-
6665 private final RecoverySettings recoverySettings ;
6766
// Timeout applied to the short recovery transport requests sent to the target node
// (FILES_INFO, FILE_CHUNK, CLEAN_FILES, PREPARE_TRANSLOG). Read in the constructor from the
// component setting "internal_action_timeout", defaulting to 15 minutes.
67+ private final TimeValue internalActionTimeout ;
// Timeout applied to the recovery requests given a longer allowance (TRANSLOG_OPS, FINALIZE).
// Computed in the constructor as twice internalActionTimeout (in milliseconds).
68+ private final TimeValue internalActionLongTimeout ;
69+
6870
6971 @ Inject
7072 public RecoverySource (Settings settings , TransportService transportService , IndicesService indicesService ,
@@ -76,6 +78,8 @@ public RecoverySource(Settings settings, TransportService transportService, Indi
7678 this .recoverySettings = recoverySettings ;
7779
// NOTE(review): the START_RECOVERY handler is registered here, before the two timeout fields
// below are assigned. If the transport service can dispatch a request to the handler before
// this constructor returns, the handler could observe unassigned (null) timeouts -- TODO
// confirm, and consider moving the registration to the end of the constructor.
7880 transportService .registerHandler (Actions .START_RECOVERY , new StartRecoveryTransportRequestHandler ());
// Base timeout for internal recovery actions; configurable, 15 minutes by default.
81+ this .internalActionTimeout = componentSettings .getAsTime ("internal_action_timeout" , TimeValue .timeValueMinutes (15 ));
// The long timeout is derived from the base timeout: twice its value in milliseconds.
82+ this .internalActionLongTimeout = new TimeValue (internalActionTimeout .millis () * 2 );
7983 }
8084
8185 private RecoveryResponse recover (final StartRecoveryRequest request ) {
@@ -123,7 +127,7 @@ public void phase1(final SnapshotIndexCommit snapshot) throws ElasticSearchExcep
123127
124128 RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest (request .recoveryId (), request .shardId (), response .phase1FileNames , response .phase1FileSizes ,
125129 response .phase1ExistingFileNames , response .phase1ExistingFileSizes , response .phase1TotalSize , response .phase1ExistingTotalSize );
126- transportService .submitRequest (request .targetNode (), RecoveryTarget .Actions .FILES_INFO , recoveryInfoFilesRequest , EmptyTransportResponseHandler .INSTANCE_SAME ).txGet ();
130+ transportService .submitRequest (request .targetNode (), RecoveryTarget .Actions .FILES_INFO , recoveryInfoFilesRequest , TransportRequestOptions . options (). withTimeout ( internalActionTimeout ), EmptyTransportResponseHandler .INSTANCE_SAME ).txGet ();
127131
128132 final CountDownLatch latch = new CountDownLatch (response .phase1FileNames .size ());
129133 final AtomicReference <Exception > lastException = new AtomicReference <Exception >();
@@ -159,10 +163,9 @@ public void run() {
159163 indexInput .readBytes (buf , 0 , toRead , false );
160164 BytesArray content = new BytesArray (buf , 0 , toRead );
161165 transportService .submitRequest (request .targetNode (), RecoveryTarget .Actions .FILE_CHUNK , new RecoveryFileChunkRequest (request .recoveryId (), request .shardId (), name , position , len , md .checksum (), content ),
162- TransportRequestOptions .options ().withCompress (shouldCompressRequest ).withLowType (), EmptyTransportResponseHandler .INSTANCE_SAME ).txGet ();
166+ TransportRequestOptions .options ().withCompress (shouldCompressRequest ).withLowType (). withTimeout ( internalActionTimeout ) , EmptyTransportResponseHandler .INSTANCE_SAME ).txGet ();
163167 readCount += toRead ;
164168 }
165- indexInput .close ();
166169 } catch (Exception e ) {
167170 lastException .set (e );
168171 } finally {
@@ -187,7 +190,7 @@ public void run() {
187190
188191 // now, set the clean files request
189192 Set <String > snapshotFiles = Sets .newHashSet (snapshot .getFiles ());
190- transportService .submitRequest (request .targetNode (), RecoveryTarget .Actions .CLEAN_FILES , new RecoveryCleanFilesRequest (request .recoveryId (), shard .shardId (), snapshotFiles ), EmptyTransportResponseHandler .INSTANCE_SAME ).txGet ();
193+ transportService .submitRequest (request .targetNode (), RecoveryTarget .Actions .CLEAN_FILES , new RecoveryCleanFilesRequest (request .recoveryId (), shard .shardId (), snapshotFiles ), TransportRequestOptions . options (). withTimeout ( internalActionTimeout ), EmptyTransportResponseHandler .INSTANCE_SAME ).txGet ();
191194
192195 stopWatch .stop ();
193196 logger .trace ("[{}][{}] recovery [phase1] to {}: took [{}]" , request .shardId ().index ().name (), request .shardId ().id (), request .targetNode (), stopWatch .totalTime ());
@@ -204,7 +207,7 @@ public void phase2(Translog.Snapshot snapshot) throws ElasticSearchException {
204207 }
205208 logger .trace ("[{}][{}] recovery [phase2] to {}: start" , request .shardId ().index ().name (), request .shardId ().id (), request .targetNode ());
206209 StopWatch stopWatch = new StopWatch ().start ();
207- transportService .submitRequest (request .targetNode (), RecoveryTarget .Actions .PREPARE_TRANSLOG , new RecoveryPrepareForTranslogOperationsRequest (request .recoveryId (), request .shardId ()), EmptyTransportResponseHandler .INSTANCE_SAME ).txGet ();
210+ transportService .submitRequest (request .targetNode (), RecoveryTarget .Actions .PREPARE_TRANSLOG , new RecoveryPrepareForTranslogOperationsRequest (request .recoveryId (), request .shardId ()), TransportRequestOptions . options (). withTimeout ( internalActionTimeout ), EmptyTransportResponseHandler .INSTANCE_SAME ).txGet ();
208211 stopWatch .stop ();
209212 response .startTime = stopWatch .totalTime ().millis ();
210213 logger .trace ("[{}][{}] recovery [phase2] to {}: start took [{}]" , request .shardId ().index ().name (), request .shardId ().id (), request .targetNode (), stopWatch .totalTime ());
@@ -226,7 +229,7 @@ public void phase3(Translog.Snapshot snapshot) throws ElasticSearchException {
226229 logger .trace ("[{}][{}] recovery [phase3] to {}: sending transaction log operations" , request .shardId ().index ().name (), request .shardId ().id (), request .targetNode ());
227230 StopWatch stopWatch = new StopWatch ().start ();
228231 int totalOperations = sendSnapshot (snapshot );
229- transportService .submitRequest (request .targetNode (), RecoveryTarget .Actions .FINALIZE , new RecoveryFinalizeRecoveryRequest (request .recoveryId (), request .shardId ()), EmptyTransportResponseHandler .INSTANCE_SAME ).txGet ();
232+ transportService .submitRequest (request .targetNode (), RecoveryTarget .Actions .FINALIZE , new RecoveryFinalizeRecoveryRequest (request .recoveryId (), request .shardId ()), TransportRequestOptions . options (). withTimeout ( internalActionLongTimeout ), EmptyTransportResponseHandler .INSTANCE_SAME ).txGet ();
230233 if (request .markAsRelocated ()) {
231234 // TODO what happens if the recovery process fails afterwards, we need to mark this back to started
232235 try {
@@ -264,7 +267,7 @@ private int sendSnapshot(Translog.Snapshot snapshot) throws ElasticSearchExcepti
264267 }
265268
266269 RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest (request .recoveryId (), request .shardId (), operations );
267- transportService .submitRequest (request .targetNode (), RecoveryTarget .Actions .TRANSLOG_OPS , translogOperationsRequest , TransportRequestOptions .options ().withCompress (recoverySettings .compress ()).withLowType (), EmptyTransportResponseHandler .INSTANCE_SAME ).txGet ();
270+ transportService .submitRequest (request .targetNode (), RecoveryTarget .Actions .TRANSLOG_OPS , translogOperationsRequest , TransportRequestOptions .options ().withCompress (recoverySettings .compress ()).withLowType (). withTimeout ( internalActionLongTimeout ) , EmptyTransportResponseHandler .INSTANCE_SAME ).txGet ();
268271 ops = 0 ;
269272 size = 0 ;
270273 operations .clear ();
@@ -273,7 +276,7 @@ private int sendSnapshot(Translog.Snapshot snapshot) throws ElasticSearchExcepti
273276 // send the leftover
274277 if (!operations .isEmpty ()) {
275278 RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest (request .recoveryId (), request .shardId (), operations );
276- transportService .submitRequest (request .targetNode (), RecoveryTarget .Actions .TRANSLOG_OPS , translogOperationsRequest , TransportRequestOptions .options ().withCompress (recoverySettings .compress ()).withLowType (), EmptyTransportResponseHandler .INSTANCE_SAME ).txGet ();
279+ transportService .submitRequest (request .targetNode (), RecoveryTarget .Actions .TRANSLOG_OPS , translogOperationsRequest , TransportRequestOptions .options ().withCompress (recoverySettings .compress ()).withLowType (). withTimeout ( internalActionLongTimeout ) , EmptyTransportResponseHandler .INSTANCE_SAME ).txGet ();
277280 }
278281 return totalOperations ;
279282 }
0 commit comments