6565import java .util .concurrent .ConcurrentHashMap ;
6666import java .util .concurrent .ConcurrentMap ;
6767import java .util .concurrent .LinkedBlockingQueue ;
68+ import java .util .concurrent .Semaphore ;
6869import java .util .concurrent .TimeUnit ;
6970import java .util .concurrent .atomic .AtomicBoolean ;
7071import java .util .concurrent .atomic .AtomicLong ;
@@ -533,6 +534,7 @@ public CompletableFuture<Boolean> revive(PopConsumerRecord record) {
533534 });
534535 }
535536
537+ @ SuppressWarnings ("StatementWithEmptyBody" )
536538 public void clearCache (String groupId , String topicId , int queueId ) {
537539 while (consumerLockService .tryLock (groupId , topicId )) {
538540 }
@@ -551,12 +553,26 @@ public long revive(AtomicLong currentTime, int maxCount) {
551553 List <PopConsumerRecord > consumerRecords = this .popConsumerStore .scanExpiredRecords (
552554 currentTime .get () - TimeUnit .SECONDS .toMillis (3 ), upperTime , maxCount );
553555 long scanCostTime = stopwatch .elapsed (TimeUnit .MILLISECONDS );
556+
557+ // When reading messages from local storage, the current thread is used
558+ // directly for data retrieval. When reading original messages from remote
559+ // storage (such as distributed file systems), so concurrency needs to be
560+ // controlled via semaphore.
561+ Semaphore semaphore = new Semaphore (brokerConfig .getPopReviveConcurrency ());
554562 Queue <PopConsumerRecord > failureList = new LinkedBlockingQueue <>();
555563 List <CompletableFuture <?>> futureList = new ArrayList <>(consumerRecords .size ());
556564
557565 // could merge read operation here
558566 for (PopConsumerRecord record : consumerRecords ) {
559- futureList .add (this .revive (record ).thenAccept (result -> {
567+ CompletableFuture <Boolean > future ;
568+ try {
569+ semaphore .acquire ();
570+ future = this .revive (record );
571+ } catch (Exception e ) {
572+ semaphore .release ();
573+ throw new RuntimeException (e );
574+ }
575+ futureList .add (future .thenAccept (result -> {
560576 if (!result ) {
561577 if (record .getAttemptTimes () < brokerConfig .getPopReviveMaxAttemptTimes ()) {
562578 long backoffInterval = 1000L * REWRITE_INTERVALS_IN_SECONDS [
@@ -572,7 +588,7 @@ public long revive(AtomicLong currentTime, int maxCount) {
572588 log .error ("PopConsumerService drop record, message may be lost, record={}" , record );
573589 }
574590 }
575- }));
591+ }). whenComplete (( result , ex ) -> semaphore . release ()) );
576592 }
577593
578594 CompletableFuture .allOf (futureList .toArray (new CompletableFuture [0 ])).join ();
0 commit comments