Skip to content

Commit 70322f7

Browse files
Jokewayjencompgeek
authored andcommitted
Resubscribe message listener on connection failure
DATAREDIS-231
1 parent 5d00542 commit 70322f7

File tree

1 file changed

+68
-2
lines changed

1 file changed

+68
-2
lines changed

src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.springframework.context.SmartLifecycle;
3535
import org.springframework.core.task.SimpleAsyncTaskExecutor;
3636
import org.springframework.core.task.TaskExecutor;
37+
import org.springframework.data.redis.RedisConnectionFailureException;
3738
import org.springframework.data.redis.connection.Message;
3839
import org.springframework.data.redis.connection.MessageListener;
3940
import org.springframework.data.redis.connection.RedisConnection;
@@ -65,6 +66,7 @@
6566
*
6667
*
6768
* @author Costin Leau
69+
* @author Way Joke
6870
*/
6971
public class RedisMessageListenerContainer implements InitializingBean, DisposableBean, BeanNameAware, SmartLifecycle {
7072

@@ -116,8 +118,13 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
116118
private final SubscriptionTask subscriptionTask = new SubscriptionTask();
117119

118120
private volatile RedisSerializer<String> serializer = new StringRedisSerializer();
119-
120-
121+
122+
// will recovery redis listening after recovery tnterval setting
123+
private long recoveryInterval = 3000L;
124+
// will recovery redis listening until reach the limit ( -1 will always try again)
125+
private int multipleRetries = -1;
126+
// the retried times
127+
private int numberOfRetries = 0;
121128

122129
public void afterPropertiesSet() {
123130
if (taskExecutor == null) {
@@ -632,7 +639,45 @@ private void remove(MessageListener listener, Topic topic, ByteArrayWrapper hold
632639
}
633640
}
634641

642+
/**
643+
* Handle subscription task exception, will recovery redis listening on RedisConnectionFailureExceptions.
644+
* @param t Throwable exception
645+
*/
646+
protected void refreshConnectionUntilSuccessful(Throwable t) {
647+
// we need to specifically deal with RedisConnectionFailureExceptions
648+
// so we aren't attempting to reconnect if other types of failures occur.
649+
if (t instanceof RedisConnectionFailureException) {
650+
651+
if (multipleRetries > 0 && numberOfRetries > multipleRetries) {
652+
logger.error("Retry count has reached the upper limit " + multipleRetries);
653+
return;
654+
}
655+
656+
subscriptionExecutor.execute(new Runnable() {
635657

658+
public void run() {
659+
if (multipleRetries > 0) {
660+
numberOfRetries++;
661+
}
662+
try {
663+
stop();
664+
} catch (Throwable ex) {
665+
// as we know the stop will throw JedisConnectionException on jedis,
666+
// catch Throwable will not need to import any jedis classes.
667+
// The exception will be ignored.
668+
}
669+
logger.error("Restarting subscription task after " + recoveryInterval + "ms.");
670+
try {
671+
Thread.sleep(recoveryInterval);
672+
} catch (InterruptedException e) {
673+
return;
674+
}
675+
start();
676+
}
677+
});
678+
}
679+
}
680+
636681
/**
637682
* Runnable used for Redis subscription. Implemented as a dedicated class to provide as many hints
638683
* as possible to the underlying thread pool.
@@ -721,6 +766,9 @@ public void run() {
721766
else {
722767
connection.pSubscribe(new DispatchMessageListener(), unwrap(patternMapping.keySet()));
723768
}
769+
} catch (Throwable t) {
770+
logger.error("SubscriptionTask abort with exception:", t);
771+
refreshConnectionUntilSuccessful(t);
724772
} finally {
725773
// this block is executed once the subscription thread has ended, this may or may not mean
726774
// the connection has been unsubscribed, depending on driver
@@ -870,4 +918,22 @@ public void run() {
870918
});
871919
}
872920
}
921+
922+
/**
923+
* Set the recovery interval value(ms), must be greater than zero
924+
* @param recoveryInterval
925+
*/
926+
public void setRecoveryInterval(long recoveryInterval) {
927+
if (recoveryInterval > 0) {
928+
this.recoveryInterval = recoveryInterval;
929+
}
930+
}
931+
932+
/**
933+
* Set the multiple retries times, less than zero on behalf of the infinite.
934+
* @param multipleRetries
935+
*/
936+
public void setMultipleRetries(int multipleRetries) {
937+
this.multipleRetries = multipleRetries;
938+
}
873939
}

0 commit comments

Comments
 (0)