Skip to content

Commit 7da09ea

Browse files
mp911dechristophstrobl
authored andcommitted
DATAREDIS-415 - Handle InterruptedException correctly in RedisMessageListenerContainer.
Set interrupted bit when catching InterruptedException. Exit loops when catching InterruptedException to free resources. Original Pull Request: spring-projects#168
1 parent 80f8867 commit 7da09ea

File tree

3 files changed

+129
-9
lines changed

3 files changed

+129
-9
lines changed

src/main/java/org/springframework/data/redis/connection/ClusterCommandExecutor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,11 @@ private <T> Map<RedisClusterNode, T> collectResults(Map<RedisClusterNode, Future
242242
exceptions.put(entry.getKey(), ex != null ? ex : e.getCause());
243243
} catch (InterruptedException e) {
244244

245+
Thread.currentThread().interrupt();
246+
245247
RuntimeException ex = convertToDataAccessExeption((Exception) e.getCause());
246248
exceptions.put(entry.getKey(), ex != null ? ex : e.getCause());
249+
break;
247250
}
248251
}
249252
}

src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
* @author Jennifer Hickey
6868
* @author Way Joke
6969
* @author Thomas Darimont
70+
* @author Mark Paluch
7071
*/
7172
public class RedisMessageListenerContainer implements InitializingBean, DisposableBean, BeanNameAware, SmartLifecycle {
7273

@@ -204,6 +205,9 @@ public void start() {
204205
monitor.wait(initWait);
205206
} catch (InterruptedException e) {
206207
// stop waiting
208+
Thread.currentThread().interrupt();
209+
running = false;
210+
return;
207211
}
208212
}
209213
}
@@ -658,6 +662,7 @@ protected void sleepBeforeRecoveryAttempt() {
658662
Thread.sleep(this.recoveryInterval);
659663
} catch (InterruptedException interEx) {
660664
logger.debug("Thread interrupted while sleeping the recovery interval");
665+
Thread.currentThread().interrupt();
661666
}
662667
}
663668
}
@@ -700,7 +705,8 @@ public void run() {
700705
try {
701706
Thread.sleep(WAIT);
702707
} catch (InterruptedException ex) {
703-
done = true;
708+
Thread.currentThread().interrupt();
709+
return;
704710
}
705711
}
706712
}
@@ -730,7 +736,7 @@ public void run() {
730736

731737
boolean asyncConnection = ConnectionUtils.isAsync(connectionFactory);
732738

733-
// NB: async drivers' Xsubscribe calls block, so we notify the RDMLC before performing the actual subscription.
739+
// NB: sync drivers' Xsubscribe calls block, so we notify the RDMLC before performing the actual subscription.
734740
if (!asyncConnection) {
735741
synchronized (monitor) {
736742
monitor.notify();
@@ -849,6 +855,7 @@ void cancel() {
849855
localMonitor.wait(subscriptionWait);
850856
} catch (InterruptedException e) {
851857
// Stop waiting
858+
Thread.currentThread().interrupt();
852859
}
853860
}
854861
if (!subscriptionTaskRunning) {
@@ -1004,16 +1011,18 @@ static boolean waitFor(Condition condition, long timeout) {
10041011

10051012
long startTime = System.currentTimeMillis();
10061013

1007-
while (!timedOut(startTime, timeout)) {
1008-
if (condition.passes()) {
1009-
return true;
1010-
}
1011-
try {
1014+
try {
1015+
while (!timedOut(startTime, timeout)) {
1016+
if (condition.passes()) {
1017+
return true;
1018+
}
1019+
10121020
Thread.sleep(100);
1013-
} catch (InterruptedException e) {
1014-
Thread.currentThread().interrupt();
10151021
}
1022+
} catch (InterruptedException e) {
1023+
Thread.currentThread().interrupt();
10161024
}
1025+
10171026
return false;
10181027
}
10191028

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright 2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.data.redis.listener;
18+
19+
import static org.hamcrest.core.Is.is;
20+
import static org.junit.Assert.assertThat;
21+
import static org.mockito.Matchers.any;
22+
import static org.mockito.Mockito.doAnswer;
23+
24+
import java.util.concurrent.Executor;
25+
26+
import org.junit.After;
27+
import org.junit.Before;
28+
import org.junit.Test;
29+
import org.junit.runner.RunWith;
30+
import org.mockito.Mock;
31+
import org.mockito.invocation.InvocationOnMock;
32+
import org.mockito.runners.MockitoJUnitRunner;
33+
import org.mockito.stubbing.Answer;
34+
import org.springframework.core.task.SyncTaskExecutor;
35+
import org.springframework.data.redis.SettingsUtils;
36+
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
37+
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
38+
39+
/**
40+
* @author Mark Paluch
41+
*/
42+
@RunWith(MockitoJUnitRunner.class)
43+
public class RedisMessageListenerContainerTest {
44+
45+
private JedisConnectionFactory connectionFactory;
46+
private RedisMessageListenerContainer container;
47+
48+
private final Object handler = new Object() {
49+
@SuppressWarnings("unused")
50+
public void handleMessage(Object message) {
51+
}
52+
};
53+
54+
private final MessageListenerAdapter adapter = new MessageListenerAdapter(handler);
55+
56+
@Mock private Executor executor;
57+
58+
@Before
59+
public void before() throws Exception {
60+
61+
connectionFactory = new JedisConnectionFactory();
62+
connectionFactory.setPort(SettingsUtils.getPort());
63+
connectionFactory.setHostName(SettingsUtils.getHost());
64+
connectionFactory.setDatabase(2);
65+
66+
connectionFactory.afterPropertiesSet();
67+
68+
container = new RedisMessageListenerContainer();
69+
container.setConnectionFactory(connectionFactory);
70+
container.setBeanName("container");
71+
container.setTaskExecutor(new SyncTaskExecutor());
72+
container.setSubscriptionExecutor(executor);
73+
container.afterPropertiesSet();
74+
}
75+
76+
/*
77+
* @see DATAREDIS-415
78+
*/
79+
@Test
80+
public void interruptAtStart() throws Exception {
81+
82+
final Thread main = Thread.currentThread();
83+
84+
// interrupt thread once Executor.execute is called
85+
doAnswer(new Answer() {
86+
@Override
87+
public Object answer(final InvocationOnMock invocationOnMock) throws Throwable {
88+
main.interrupt();
89+
return null;
90+
}
91+
}).when(executor).execute(any(Runnable.class));
92+
93+
container.addMessageListener(adapter, new ChannelTopic("a"));
94+
container.start();
95+
96+
// reset the interrupted flag to not destroy the teardown
97+
assertThat(Thread.interrupted(), is(true));
98+
99+
assertThat(container.isRunning(), is(false));
100+
}
101+
102+
@After
103+
public void tearDown() throws Exception {
104+
105+
container.destroy();
106+
connectionFactory.destroy();
107+
}
108+
}

0 commit comments

Comments
 (0)