Skip to content

Commit dea6403

Browse files
christophstroblThomas Darimont
authored andcommitted
DATAREDIS-166 - Corrupted i/o streams subscribing to both patterns and channels.
The issue has already been fixed in DATAREDIS-242 (see PR: spring-projects#25), which now allows to reenable the previously failing test. Additionally the tests now run against all supported client libraries except JRedis which does not support 'publish'. Original pull request: spring-projects#30
1 parent d4c5990 commit dea6403

File tree

1 file changed

+72
-28
lines changed

1 file changed

+72
-28
lines changed

src/test/java/org/springframework/data/redis/listener/PubSubResubscribeTests.java

Lines changed: 72 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2011-2013 the original author or authors.
2+
* Copyright 2011-2014 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,14 +16,13 @@
1616

1717
package org.springframework.data.redis.listener;
1818

19-
import static org.junit.Assert.assertEquals;
20-
import static org.junit.Assert.assertFalse;
21-
import static org.junit.Assert.assertNull;
22-
import static org.junit.Assert.assertTrue;
23-
import static org.junit.Assume.assumeTrue;
19+
import static org.hamcrest.core.Is.*;
20+
import static org.junit.Assert.*;
21+
import static org.junit.Assume.*;
2422

2523
import java.util.ArrayList;
2624
import java.util.Arrays;
25+
import java.util.Collection;
2726
import java.util.HashSet;
2827
import java.util.LinkedHashSet;
2928
import java.util.List;
@@ -36,38 +35,50 @@
3635
import org.junit.AfterClass;
3736
import org.junit.Before;
3837
import org.junit.BeforeClass;
39-
import org.junit.Ignore;
4038
import org.junit.Test;
41-
import org.springframework.beans.factory.DisposableBean;
39+
import org.junit.runner.RunWith;
40+
import org.junit.runners.Parameterized;
41+
import org.junit.runners.Parameterized.Parameters;
4242
import org.springframework.core.task.SimpleAsyncTaskExecutor;
4343
import org.springframework.core.task.SyncTaskExecutor;
4444
import org.springframework.data.redis.ConnectionFactoryTracker;
4545
import org.springframework.data.redis.RedisTestProfileValueSource;
4646
import org.springframework.data.redis.SettingsUtils;
47+
import org.springframework.data.redis.connection.ConnectionUtils;
4748
import org.springframework.data.redis.connection.RedisConnectionFactory;
4849
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
50+
import org.springframework.data.redis.connection.jredis.JredisConnectionFactory;
51+
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
52+
import org.springframework.data.redis.connection.srp.SrpConnectionFactory;
4953
import org.springframework.data.redis.core.RedisTemplate;
5054
import org.springframework.data.redis.core.StringRedisTemplate;
5155
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
5256

5357
/**
5458
* @author Costin Leau
5559
* @author Jennifer Hickey
60+
* @author Christoph Strobl
5661
*/
62+
@RunWith(Parameterized.class)
5763
public class PubSubResubscribeTests {
5864

5965
private static final String CHANNEL = "pubsub::test";
6066

61-
protected RedisMessageListenerContainer container;
62-
protected RedisConnectionFactory factory;
67+
private final BlockingDeque<String> bag = new LinkedBlockingDeque<String>(99);
68+
private final Object handler = new MessageHandler("handler1", bag);
69+
private final MessageListenerAdapter adapter = new MessageListenerAdapter(handler);
6370

64-
@SuppressWarnings("rawtypes") protected RedisTemplate template;
71+
private RedisMessageListenerContainer container;
72+
private RedisConnectionFactory factory;
6573

66-
private final BlockingDeque<String> bag = new LinkedBlockingDeque<String>(99);
74+
@SuppressWarnings("rawtypes")//
75+
private RedisTemplate template;
6776

68-
private final Object handler = new MessageHandler("handler1", bag);
77+
public PubSubResubscribeTests(RedisConnectionFactory connectionFactory) {
6978

70-
private final MessageListenerAdapter adapter = new MessageListenerAdapter(handler);
79+
this.factory = connectionFactory;
80+
ConnectionFactoryTracker.add(factory);
81+
}
7182

7283
@BeforeClass
7384
public static void shouldRun() {
@@ -79,21 +90,52 @@ public static void cleanUp() {
7990
ConnectionFactoryTracker.cleanUp();
8091
}
8192

82-
@Before
83-
public void setUp() throws Exception {
93+
@Parameters
94+
public static Collection<Object[]> testParams() {
95+
96+
int port = SettingsUtils.getPort();
97+
String host = SettingsUtils.getHost();
98+
99+
// Jedis
84100
JedisConnectionFactory jedisConnFactory = new JedisConnectionFactory();
85101
jedisConnFactory.setUsePool(true);
86-
jedisConnFactory.setPort(SettingsUtils.getPort());
87-
jedisConnFactory.setHostName(SettingsUtils.getHost());
102+
jedisConnFactory.setPort(port);
103+
jedisConnFactory.setHostName(host);
88104
jedisConnFactory.setDatabase(2);
89105
jedisConnFactory.afterPropertiesSet();
90106

91-
factory = jedisConnFactory;
107+
// Lettuce
108+
LettuceConnectionFactory lettuceConnFactory = new LettuceConnectionFactory();
109+
lettuceConnFactory.setPort(port);
110+
lettuceConnFactory.setHostName(host);
111+
lettuceConnFactory.setDatabase(2);
112+
lettuceConnFactory.setValidateConnection(true);
113+
lettuceConnFactory.afterPropertiesSet();
114+
115+
// SRP
116+
SrpConnectionFactory srpConnFactory = new SrpConnectionFactory();
117+
srpConnFactory.setPort(port);
118+
srpConnFactory.setHostName(host);
119+
srpConnFactory.afterPropertiesSet();
120+
121+
// JRedis
122+
JredisConnectionFactory jRedisConnectionFactory = new JredisConnectionFactory();
123+
jRedisConnectionFactory.setPort(port);
124+
jRedisConnectionFactory.setHostName(host);
125+
jRedisConnectionFactory.setDatabase(2);
126+
jRedisConnectionFactory.afterPropertiesSet();
127+
128+
return Arrays.asList(new Object[][] { { jedisConnFactory }, { lettuceConnFactory }, { srpConnFactory },
129+
{ jRedisConnectionFactory } });
130+
}
92131

93-
template = new StringRedisTemplate(jedisConnFactory);
94-
ConnectionFactoryTracker.add(template.getConnectionFactory());
132+
@Before
133+
public void setUp() throws Exception {
95134

96-
bag.clear();
135+
// JredisConnection#publish is currently not supported -> tests would fail
136+
assumeThat(ConnectionUtils.isJredis(factory), is(false));
137+
138+
template = new StringRedisTemplate(factory);
97139

98140
adapter.setSerializer(template.getValueSerializer());
99141
adapter.afterPropertiesSet();
@@ -111,14 +153,13 @@ public void setUp() throws Exception {
111153
}
112154

113155
@After
114-
public void tearDown() throws Exception {
115-
container.destroy();
116-
((DisposableBean) factory).destroy();
117-
Thread.sleep(1000);
156+
public void tearDown() {
157+
bag.clear();
118158
}
119159

120160
@Test
121161
public void testContainerPatternResubscribe() throws Exception {
162+
122163
String payload1 = "do";
123164
String payload2 = "re mi";
124165

@@ -171,6 +212,7 @@ public void testContainerPatternResubscribe() throws Exception {
171212

172213
@Test
173214
public void testContainerChannelResubscribe() throws Exception {
215+
174216
String payload1 = "do";
175217
String payload2 = "re mi";
176218

@@ -208,8 +250,9 @@ public void testContainerChannelResubscribe() throws Exception {
208250
*
209251
* @throws Exception
210252
*/
211-
@Ignore("DATAREDIS-166 Intermittent corrupted input/output streams subscribing to both patterns and channels in RMLC")
253+
@Test
212254
public void testInitializeContainerWithMultipleTopicsIncludingPattern() throws Exception {
255+
213256
container.removeMessageListener(adapter);
214257
container.stop();
215258
container.addMessageListener(adapter,
@@ -230,17 +273,18 @@ public void testInitializeContainerWithMultipleTopicsIncludingPattern() throws E
230273
}
231274

232275
private class MessageHandler {
276+
233277
private final BlockingDeque<String> bag;
234278
private final String name;
235279

236280
public MessageHandler(String name, BlockingDeque<String> bag) {
281+
237282
this.bag = bag;
238283
this.name = name;
239284
}
240285

241286
@SuppressWarnings("unused")
242287
public void handleMessage(String message) {
243-
System.out.println(name + ": " + message);
244288
bag.add(message);
245289
}
246290
}

0 commit comments

Comments
 (0)