Skip to content

Commit 1996355

Browse files
committed
allow for configuring underlying library
1 parent 41c95ea commit 1996355

File tree

8 files changed

+168
-40
lines changed

8 files changed

+168
-40
lines changed

src/main/java/org/sourcelab/storm/spout/redis/RedisStreamSpout.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.slf4j.Logger;
99
import org.slf4j.LoggerFactory;
1010
import org.sourcelab.storm.spout.redis.client.Client;
11+
import org.sourcelab.storm.spout.redis.client.ClientFactory;
1112
import org.sourcelab.storm.spout.redis.client.Consumer;
1213
import org.sourcelab.storm.spout.redis.client.lettuce.LettuceClient;
1314
import org.sourcelab.storm.spout.redis.funnel.ConsumerFunnel;
@@ -85,7 +86,7 @@ public void open(
8586

8687
// Create consumer and client
8788
final int taskIndex = topologyContext.getThisTaskIndex();
88-
final Client client = new LettuceClient(config, taskIndex);
89+
final Client client = new ClientFactory().createClient(config, taskIndex);
8990
final Consumer consumer = new Consumer(config, client, (ConsumerFunnel) funnel);
9091

9192
// Create background consuming thread.

src/main/java/org/sourcelab/storm/spout/redis/RedisStreamSpoutConfig.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.sourcelab.storm.spout.redis;
22

3+
import org.sourcelab.storm.spout.redis.client.ClientType;
34
import org.sourcelab.storm.spout.redis.failhandler.NoRetryHandler;
45

56
import java.io.Serializable;
@@ -70,6 +71,11 @@ public class RedisStreamSpoutConfig implements Serializable {
7071
*/
7172
private final boolean metricsEnabled;
7273

74+
/**
75+
* Defines which underlying client library/implementation to use.
76+
*/
77+
private final ClientType clientType;
78+
7379
/**
7480
* Constructor.
7581
* Use Builder instance.
@@ -85,7 +91,7 @@ private RedisStreamSpoutConfig(
8591

8692
// Other settings
8793
final int maxConsumePerRead, final int maxTupleQueueSize, final int maxAckQueueSize, final long consumerDelayMillis,
88-
final boolean metricsEnabled
94+
final boolean metricsEnabled, final ClientType clientType
8995
) {
9096
// Connection
9197
if (redisCluster != null && redisServer != null) {
@@ -117,6 +123,9 @@ private RedisStreamSpoutConfig(
117123
this.maxAckQueueSize = maxAckQueueSize;
118124
this.consumerDelayMillis = consumerDelayMillis;
119125
this.metricsEnabled = metricsEnabled;
126+
127+
// Client type implementation
128+
this.clientType = Objects.requireNonNull(clientType);
120129
}
121130

122131
public String getStreamKey() {
@@ -185,6 +194,10 @@ public boolean isMetricsEnabled() {
185194
return metricsEnabled;
186195
}
187196

197+
public ClientType getClientType() {
198+
return clientType;
199+
}
200+
188201
/**
189202
* Create a new Builder instance.
190203
* @return Builder for Configuration instance.
@@ -229,6 +242,12 @@ public static final class Builder {
229242
private long consumerDelayMillis = 1000L;
230243
private boolean metricsEnabled = true;
231244

245+
/**
246+
* Underlying library to use.
247+
* Defaults to using Lettuce.
248+
*/
249+
private ClientType clientType = ClientType.LETTUCE;
250+
232251
private Builder() {
233252
}
234253

@@ -396,6 +415,19 @@ public Builder withMetricsEnabled(final boolean enabled) {
396415
return this;
397416
}
398417

418+
public Builder withLettuceClientLibrary() {
419+
return withClientType(ClientType.LETTUCE);
420+
}
421+
422+
public Builder withJedisClientLibrary() {
423+
return withClientType(ClientType.JEDIS);
424+
}
425+
426+
public Builder withClientType(final ClientType clientType) {
427+
this.clientType = Objects.requireNonNull(clientType);
428+
return this;
429+
}
430+
399431
/**
400432
* Creates new Configuration instance.
401433
* @return Configuration instance.
@@ -416,7 +448,10 @@ public RedisStreamSpoutConfig build() {
416448
tupleConverter, failureHandler,
417449
// Other settings
418450
maxConsumePerRead, maxTupleQueueSize, maxAckQueueSize, consumerDelayMillis,
419-
metricsEnabled
451+
metricsEnabled,
452+
453+
// Underlying client type
454+
clientType
420455
);
421456
}
422457
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package org.sourcelab.storm.spout.redis.client;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
import org.sourcelab.storm.spout.redis.RedisStreamSpoutConfig;
6+
import org.sourcelab.storm.spout.redis.client.jedis.JedisClient;
7+
import org.sourcelab.storm.spout.redis.client.lettuce.LettuceClient;
8+
9+
import java.util.Objects;
10+
11+
/**
12+
* Factory for creating the appropriate Client instance based on config.
13+
*/
14+
public class ClientFactory {
15+
private static final Logger logger = LoggerFactory.getLogger(ClientFactory.class);
16+
17+
/**
18+
* Create the appropriate client intance based on configuration.
19+
* @param config Spout configuration.
20+
* @param instanceId Instance id of spout.
21+
* @return Client.
22+
*/
23+
public Client createClient(final RedisStreamSpoutConfig config, final int instanceId) {
24+
Objects.requireNonNull(config);
25+
26+
switch (config.getClientType()) {
27+
case JEDIS:
28+
logger.info("Using Jedis client library.");
29+
return new JedisClient(config, instanceId);
30+
case LETTUCE:
31+
logger.info("Using Lettuce client library.");
32+
return new LettuceClient(config, instanceId);
33+
default:
34+
throw new IllegalStateException("Unknown/Unhandled Client Type");
35+
}
36+
}
37+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package org.sourcelab.storm.spout.redis.client;
2+
3+
/**
4+
* Defines allowed implementations.
5+
*/
6+
public enum ClientType {
7+
LETTUCE,
8+
JEDIS;
9+
}

src/main/java/org/sourcelab/storm/spout/redis/client/jedis/JedisClient.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,17 +45,6 @@ public JedisClient(final RedisStreamSpoutConfig config, final int instanceId) {
4545
);
4646
}
4747

48-
private static JedisAdapter createAdapter(final RedisStreamSpoutConfig config, final int instanceId) {
49-
final String connectStr = config.getConnectString().replaceAll("redis://", "");
50-
if (config.isConnectingToCluster()) {
51-
logger.info("Connecting to RedisCluster at {}", config.getConnectStringMasked());
52-
return new JedisClusterAdapter(new JedisCluster(HostAndPort.parseString(connectStr)), config, instanceId);
53-
} else {
54-
logger.info("Connecting to RedisCluster at {}", config.getConnectStringMasked());
55-
return new JedisRedisAdapter(new Jedis(HostAndPort.parseString(connectStr)), config, instanceId);
56-
}
57-
}
58-
5948
/**
6049
* Protected constructor for injecting a RedisClient instance, typically for tests.
6150
* @param adapter JedisAdapter instance.
@@ -111,4 +100,20 @@ public void commitMessage(final String msgId) {
111100
public void disconnect() {
112101
adapter.close();
113102
}
103+
104+
/**
105+
* Factory method for creating the appropriate adapter based on configuration.
106+
* @param config Spout configuration.
107+
* @return Appropriate Adapter.
108+
*/
109+
private static JedisAdapter createAdapter(final RedisStreamSpoutConfig config, final int instanceId) {
110+
final String connectStr = config.getConnectString().replaceAll("redis://", "");
111+
if (config.isConnectingToCluster()) {
112+
logger.info("Connecting to RedisCluster at {}", config.getConnectStringMasked());
113+
return new JedisClusterAdapter(new JedisCluster(HostAndPort.parseString(connectStr)), config, instanceId);
114+
} else {
115+
logger.info("Connecting to RedisCluster at {}", config.getConnectStringMasked());
116+
return new JedisRedisAdapter(new Jedis(HostAndPort.parseString(connectStr)), config, instanceId);
117+
}
118+
}
114119
}

src/main/java/org/sourcelab/storm/spout/redis/client/lettuce/LettuceClient.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -67,16 +67,6 @@ public LettuceClient(final RedisStreamSpoutConfig config, final int instanceId)
6767
);
6868
}
6969

70-
private static LettuceAdapter createAdapter(final RedisStreamSpoutConfig config) {
71-
if (config.isConnectingToCluster()) {
72-
logger.info("Connecting to RedisCluster at {}", config.getConnectStringMasked());
73-
return new LettuceClusterAdapter(RedisClusterClient.create(config.getConnectString()));
74-
} else {
75-
logger.info("Connecting to Redis server at {}", config.getConnectStringMasked());
76-
return new LettuceRedisAdapter(RedisClient.create(config.getConnectString()));
77-
}
78-
}
79-
8070
/**
8171
* Protected constructor for injecting a RedisClient instance, typically for tests.
8272
* @param config Configuration.
@@ -188,4 +178,19 @@ public void commitMessage(final String msgId) {
188178
public void disconnect() {
189179
adapter.shutdown();
190180
}
181+
182+
/**
183+
* Factory method for creating the appropriate adapter based on configuration.
184+
* @param config Spout configuration.
185+
* @return Appropriate Adapter.
186+
*/
187+
private static LettuceAdapter createAdapter(final RedisStreamSpoutConfig config) {
188+
if (config.isConnectingToCluster()) {
189+
logger.info("Connecting to RedisCluster at {}", config.getConnectStringMasked());
190+
return new LettuceClusterAdapter(RedisClusterClient.create(config.getConnectString()));
191+
} else {
192+
logger.info("Connecting to Redis server at {}", config.getConnectStringMasked());
193+
return new LettuceRedisAdapter(RedisClient.create(config.getConnectString()));
194+
}
195+
}
191196
}

src/test/java/org/sourcelab/storm/spout/redis/AbstractRedisStreamSpoutIntegrationTest.java

Lines changed: 51 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
import org.junit.jupiter.api.AfterEach;
1010
import org.junit.jupiter.api.BeforeEach;
1111
import org.junit.jupiter.api.Test;
12+
import org.junit.jupiter.params.ParameterizedTest;
13+
import org.junit.jupiter.params.provider.EnumSource;
14+
import org.sourcelab.storm.spout.redis.client.ClientType;
1215
import org.sourcelab.storm.spout.redis.example.TestTupleConverter;
1316
import org.sourcelab.storm.spout.redis.failhandler.RetryFailedTuples;
1417
import org.sourcelab.storm.spout.redis.util.outputcollector.EmittedTuple;
@@ -101,8 +104,12 @@ void cleanup() {
101104
/**
102105
* Most basic lifecycle smoke test.
103106
*/
104-
@Test
105-
void smokeTest_openAndClose() {
107+
@ParameterizedTest
108+
@EnumSource(ClientType.class)
109+
void smokeTest_openAndClose(final ClientType clientType) {
110+
// Inject client type into config
111+
configBuilder.withClientType(clientType);
112+
106113
// Create spout
107114
try (final RedisStreamSpout spout = new RedisStreamSpout(configBuilder.build())) {
108115

@@ -121,8 +128,12 @@ void smokeTest_openAndClose() {
121128
/**
122129
* Basic lifecycle smoke test.
123130
*/
124-
@Test
125-
void smokeTest_openActivateDeactivateAndClose() throws InterruptedException {
131+
@ParameterizedTest
132+
@EnumSource(ClientType.class)
133+
void smokeTest_openActivateDeactivateAndClose(final ClientType clientType) throws InterruptedException {
134+
// Inject client type into config
135+
configBuilder.withClientType(clientType);
136+
126137
// Create spout
127138
try (final RedisStreamSpout spout = new RedisStreamSpout(configBuilder.build())) {
128139
final StubSpoutCollector collector = new StubSpoutCollector();
@@ -150,7 +161,12 @@ void smokeTest_openActivateDeactivateAndClose() throws InterruptedException {
150161
*
151162
* Disabled for now.
152163
*/
153-
void smokeTest_configureInvalidRedisHost() throws InterruptedException {
164+
@ParameterizedTest
165+
@EnumSource(ClientType.class)
166+
void smokeTest_configureInvalidRedisHost(final ClientType clientType) throws InterruptedException {
167+
// Inject client type into config
168+
configBuilder.withClientType(clientType);
169+
154170
// Lets override the redis host with something invalid
155171
configBuilder
156172
.withServer(getTestContainer().getHost(), 124);
@@ -186,8 +202,12 @@ void smokeTest_configureInvalidRedisHost() throws InterruptedException {
186202
/**
187203
* Basic usage test.
188204
*/
189-
@Test
190-
void smokeTest_consumeAndAckMessages() throws InterruptedException {
205+
@ParameterizedTest
206+
@EnumSource(ClientType.class)
207+
void smokeTest_consumeAndAckMessages(final ClientType clientType) throws InterruptedException {
208+
// Inject client type into config
209+
configBuilder.withClientType(clientType);
210+
191211
// Create spout
192212
try (final RedisStreamSpout spout = new RedisStreamSpout(configBuilder.build())) {
193213
final StubSpoutCollector collector = new StubSpoutCollector();
@@ -274,8 +294,12 @@ void smokeTest_consumeAndAckMessages() throws InterruptedException {
274294
/**
275295
* Basic usage with retry failure handler.
276296
*/
277-
@Test
278-
void smokeTest_consumeFailAndAckMessages() throws InterruptedException {
297+
@ParameterizedTest
298+
@EnumSource(ClientType.class)
299+
void smokeTest_consumeFailAndAckMessages(final ClientType clientType) throws InterruptedException {
300+
// Inject client type into config
301+
configBuilder.withClientType(clientType);
302+
279303
// Swap out failure handler
280304
configBuilder.withFailureHandler(new RetryFailedTuples(2));
281305

@@ -397,8 +421,12 @@ void smokeTest_consumeFailAndAckMessages() throws InterruptedException {
397421
/**
398422
* Verify declareOutputFields using TestTupleConverter.
399423
*/
400-
@Test
401-
void test_declareOutputFields() {
424+
@ParameterizedTest
425+
@EnumSource(ClientType.class)
426+
void test_declareOutputFields(final ClientType clientType) {
427+
// Inject client type into config
428+
configBuilder.withClientType(clientType);
429+
402430
// Create a test implementation
403431
final TupleConverter converter = new DummyTupleConverter() ;
404432

@@ -444,8 +472,12 @@ void test_declareOutputFields() {
444472
/**
445473
* Verify spout emits tuples down the correct stream.
446474
*/
447-
@Test
448-
void test_EmitDownSeparateStreams() {
475+
@ParameterizedTest
476+
@EnumSource(ClientType.class)
477+
void test_EmitDownSeparateStreams(final ClientType clientType) {
478+
// Inject client type into config
479+
configBuilder.withClientType(clientType);
480+
449481
// Create a test implementation
450482
final TupleConverter converter = new DummyTupleConverter() ;
451483

@@ -503,8 +535,12 @@ void test_EmitDownSeparateStreams() {
503535
* Verify if tuple converter instance returns null, then the message
504536
* is simply acked and nothing is emitted.
505537
*/
506-
@Test
507-
void test_NullConversionJustGetsAckedNothingEmitted() {
538+
@ParameterizedTest
539+
@EnumSource(ClientType.class)
540+
void test_NullConversionJustGetsAckedNothingEmitted(final ClientType clientType) {
541+
// Inject client type into config
542+
configBuilder.withClientType(clientType);
543+
508544
// Create a test implementation
509545
final TupleConverter converter = new NullTupleConverter() ;
510546

src/test/java/org/sourcelab/storm/spout/redis/RedisStreamSpout_ClusterIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import org.testcontainers.junit.jupiter.Testcontainers;
77

88
/**
9-
*
9+
* Runs Spout integration tests against a RedisCluster.
1010
*/
1111
@Testcontainers
1212
@Tag("Integration")

0 commit comments

Comments
 (0)