Skip to content

Commit 5f6c726

Browse files
authored
Merge pull request #6 from SourceLabOrg/sp/jedisImplementation
Add implementation using Jedis client library
2 parents ba98ff3 + f466614 commit 5f6c726

24 files changed

+971
-129
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,11 @@
22
The format is based on [Keep a Changelog](http://keepachangelog.com/)
33
and this project adheres to [Semantic Versioning](http://semver.org/).
44

5+
## 1.1.0 (07/24/2020)
6+
- Add Jedis implementation. Spout defaults to using the Lettuce redis library, but you can configure
7+
to use the Jedis library instead via the `RedisStreamSpoutConfig.withJedisClientLibrary()` method.
8+
- Bugfix on Spout deploy, consumer thread started during `open()` lifecycle call instead of `activate()`.
9+
- Bugfix on Spout restart, resume consuming first from consumers personal pending list.
10+
511
## 1.0.0 (07/20/2020)
612
- Initial release!

pom.xml

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>org.sourcelab.storm.spout</groupId>
88
<artifactId>redis-stream-spout</artifactId>
9-
<version>1.0.0</version>
9+
<version>1.1.0</version>
1010

1111
<!-- Module Description and Ownership -->
1212
<name>Redis Streams Spout for Apache Storm.</name>
@@ -48,6 +48,7 @@
4848

4949
<!-- Redis Client -->
5050
<lettuceVersion>5.3.1.RELEASE</lettuceVersion>
51+
<jedisVersion>3.2.0</jedisVersion>
5152

5253
<!-- Define which version of JUnit 5 to -->
5354
<junit5Version>5.6.2</junit5Version>
@@ -78,6 +79,11 @@
7879
<artifactId>lettuce-core</artifactId>
7980
<version>${lettuceVersion}</version>
8081
</dependency>
82+
<dependency>
83+
<groupId>redis.clients</groupId>
84+
<artifactId>jedis</artifactId>
85+
<version>${jedisVersion}</version>
86+
</dependency>
8187

8288
<!-- Testing Dependencies -->
8389
<dependency>
@@ -120,6 +126,14 @@
120126
<version>${testContainersVersion}</version>
121127
<scope>test</scope>
122128
</dependency>
129+
130+
<!-- Async Testing Helper -->
131+
<dependency>
132+
<groupId>org.awaitility</groupId>
133+
<artifactId>awaitility</artifactId>
134+
<version>4.0.3</version>
135+
<scope>test</scope>
136+
</dependency>
123137
</dependencies>
124138

125139
<build>

src/main/java/org/sourcelab/storm/spout/redis/RedisStreamSpout.java

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@
88
import org.slf4j.Logger;
99
import org.slf4j.LoggerFactory;
1010
import org.sourcelab.storm.spout.redis.client.Client;
11+
import org.sourcelab.storm.spout.redis.client.ClientFactory;
1112
import org.sourcelab.storm.spout.redis.client.Consumer;
12-
import org.sourcelab.storm.spout.redis.client.LettuceClient;
13+
import org.sourcelab.storm.spout.redis.client.lettuce.LettuceClient;
1314
import org.sourcelab.storm.spout.redis.funnel.ConsumerFunnel;
1415
import org.sourcelab.storm.spout.redis.funnel.MemoryFunnel;
1516
import org.sourcelab.storm.spout.redis.funnel.SpoutFunnel;
@@ -83,16 +84,8 @@ public void open(
8384
// Create funnel instance.
8485
this.funnel = new MemoryFunnel(config, spoutConfig, topologyContext);
8586

86-
// Create consumer and client
87-
final int taskIndex = topologyContext.getThisTaskIndex();
88-
final Client client = new LettuceClient(config, taskIndex);
89-
final Consumer consumer = new Consumer(config, client, (ConsumerFunnel) funnel);
90-
91-
// Create background consuming thread.
92-
consumerThread = new Thread(
93-
consumer,
94-
"RedisStreamSpout-ConsumerThread[" + taskIndex + "]"
95-
);
87+
// Create and start consumer thread.
88+
createAndStartConsumerThread();
9689
}
9790

9891
@Override
@@ -103,12 +96,15 @@ public void close() {
10396

10497
@Override
10598
public void activate() {
106-
if (consumerThread.isAlive()) {
99+
// If the thread is already running and alive
100+
if (consumerThread != null && consumerThread.isAlive()) {
107101
// No-op. It's already running, and deactivate() is a no-op for us.
108102
return;
109103
}
110-
// Start thread, this should return immediately, but start a background processing thread.
111-
consumerThread.start();
104+
105+
// If we haven't created the consumer thread yet, or it has previously died.
106+
// Create and start it
107+
createAndStartConsumerThread();
112108
}
113109

114110
@Override
@@ -186,4 +182,21 @@ public void declareOutputFields(final OutputFieldsDeclarer declarer) {
186182
public Map<String, Object> getComponentConfiguration() {
187183
return new HashMap<>();
188184
}
185+
186+
/**
187+
* Create background consumer thread.
188+
*/
189+
private void createAndStartConsumerThread() {
190+
// Create consumer and client
191+
final int taskIndex = topologyContext.getThisTaskIndex();
192+
final Client client = new ClientFactory().createClient(config, taskIndex);
193+
final Consumer consumer = new Consumer(config, client, (ConsumerFunnel) funnel);
194+
195+
// Create background consuming thread.
196+
consumerThread = new Thread(
197+
consumer,
198+
"RedisStreamSpout-ConsumerThread[" + taskIndex + "]"
199+
);
200+
consumerThread.start();
201+
}
189202
}

src/main/java/org/sourcelab/storm/spout/redis/RedisStreamSpoutConfig.java

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.sourcelab.storm.spout.redis;
22

3+
import org.sourcelab.storm.spout.redis.client.ClientType;
34
import org.sourcelab.storm.spout.redis.failhandler.NoRetryHandler;
45

56
import java.io.Serializable;
@@ -70,6 +71,11 @@ public class RedisStreamSpoutConfig implements Serializable {
7071
*/
7172
private final boolean metricsEnabled;
7273

74+
/**
75+
* Defines which underlying client library/implementation to use.
76+
*/
77+
private final ClientType clientType;
78+
7379
/**
7480
* Constructor.
7581
* Use Builder instance.
@@ -85,7 +91,7 @@ private RedisStreamSpoutConfig(
8591

8692
// Other settings
8793
final int maxConsumePerRead, final int maxTupleQueueSize, final int maxAckQueueSize, final long consumerDelayMillis,
88-
final boolean metricsEnabled
94+
final boolean metricsEnabled, final ClientType clientType
8995
) {
9096
// Connection
9197
if (redisCluster != null && redisServer != null) {
@@ -117,6 +123,9 @@ private RedisStreamSpoutConfig(
117123
this.maxAckQueueSize = maxAckQueueSize;
118124
this.consumerDelayMillis = consumerDelayMillis;
119125
this.metricsEnabled = metricsEnabled;
126+
127+
// Client type implementation
128+
this.clientType = Objects.requireNonNull(clientType);
120129
}
121130

122131
public String getStreamKey() {
@@ -150,6 +159,17 @@ public String getConnectString() {
150159
return redisCluster.getConnectString();
151160
}
152161

162+
/**
163+
* The URI for connecting to this Redis Server instance with the password masked.
164+
* @return URI for the server.
165+
*/
166+
public String getConnectStringMasked() {
167+
if (!isConnectingToCluster()) {
168+
return redisServer.getConnectStringMasked();
169+
}
170+
return redisCluster.getConnectStringMasked();
171+
}
172+
153173
public int getMaxTupleQueueSize() {
154174
return maxTupleQueueSize;
155175
}
@@ -174,6 +194,10 @@ public boolean isMetricsEnabled() {
174194
return metricsEnabled;
175195
}
176196

197+
public ClientType getClientType() {
198+
return clientType;
199+
}
200+
177201
/**
178202
* Create a new Builder instance.
179203
* @return Builder for Configuration instance.
@@ -218,6 +242,12 @@ public static final class Builder {
218242
private long consumerDelayMillis = 1000L;
219243
private boolean metricsEnabled = true;
220244

245+
/**
246+
* Underlying library to use.
247+
* Defaults to using Lettuce.
248+
*/
249+
private ClientType clientType = ClientType.LETTUCE;
250+
221251
private Builder() {
222252
}
223253

@@ -385,6 +415,27 @@ public Builder withMetricsEnabled(final boolean enabled) {
385415
return this;
386416
}
387417

418+
/**
419+
* Configure the spout to use the Lettuce client library for communicating with redis.
420+
* @return Builder instance.
421+
*/
422+
public Builder withLettuceClientLibrary() {
423+
return withClientType(ClientType.LETTUCE);
424+
}
425+
426+
/**
427+
* Configure the spout to use the Jedis client library for communicating with redis.
428+
* @return Builder instance.
429+
*/
430+
public Builder withJedisClientLibrary() {
431+
return withClientType(ClientType.JEDIS);
432+
}
433+
434+
public Builder withClientType(final ClientType clientType) {
435+
this.clientType = Objects.requireNonNull(clientType);
436+
return this;
437+
}
438+
388439
/**
389440
* Creates new Configuration instance.
390441
* @return Configuration instance.
@@ -405,7 +456,10 @@ public RedisStreamSpoutConfig build() {
405456
tupleConverter, failureHandler,
406457
// Other settings
407458
maxConsumePerRead, maxTupleQueueSize, maxAckQueueSize, consumerDelayMillis,
408-
metricsEnabled
459+
metricsEnabled,
460+
461+
// Underlying client type
462+
clientType
409463
);
410464
}
411465
}
@@ -445,6 +499,16 @@ public String getConnectString() {
445499
.map(RedisServer::getConnectString)
446500
.collect(Collectors.joining(","));
447501
}
502+
503+
/**
504+
* The URI for connecting to this Redis Server instance with the password masked.
505+
* @return URI for the server.
506+
*/
507+
public String getConnectStringMasked() {
508+
return getServers().stream()
509+
.map(RedisServer::getConnectStringMasked)
510+
.collect(Collectors.joining(","));
511+
}
448512
}
449513

450514
/**
@@ -503,6 +567,21 @@ public String getConnectString() {
503567
return connectStr;
504568
}
505569

570+
/**
571+
* The URI for connecting to this Redis Server instance with the password masked.
572+
* @return URI for the server.
573+
*/
574+
public String getConnectStringMasked() {
575+
String connectStr = "redis://";
576+
577+
if (getPassword() != null && !getPassword().trim().isEmpty()) {
578+
connectStr += "XXXXXX@";
579+
}
580+
connectStr += getHost() + ":" + getPort();
581+
582+
return connectStr;
583+
}
584+
506585
@Override
507586
public String toString() {
508587
return "RedisServer{"
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package org.sourcelab.storm.spout.redis.client;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
import org.sourcelab.storm.spout.redis.RedisStreamSpoutConfig;
6+
import org.sourcelab.storm.spout.redis.client.jedis.JedisClient;
7+
import org.sourcelab.storm.spout.redis.client.lettuce.LettuceClient;
8+
9+
import java.util.Objects;
10+
11+
/**
12+
* Factory for creating the appropriate Client instance based on config.
13+
*/
14+
public class ClientFactory {
15+
private static final Logger logger = LoggerFactory.getLogger(ClientFactory.class);
16+
17+
/**
18+
* Create the appropriate client intance based on configuration.
19+
* @param config Spout configuration.
20+
* @param instanceId Instance id of spout.
21+
* @return Client.
22+
*/
23+
public Client createClient(final RedisStreamSpoutConfig config, final int instanceId) {
24+
Objects.requireNonNull(config);
25+
26+
switch (config.getClientType()) {
27+
case JEDIS:
28+
logger.info("Using Jedis client library.");
29+
return new JedisClient(config, instanceId);
30+
case LETTUCE:
31+
logger.info("Using Lettuce client library.");
32+
return new LettuceClient(config, instanceId);
33+
default:
34+
throw new IllegalStateException("Unknown/Unhandled Client Type");
35+
}
36+
}
37+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package org.sourcelab.storm.spout.redis.client;
2+
3+
/**
4+
* Defines allowed implementations.
5+
*/
6+
public enum ClientType {
7+
LETTUCE,
8+
JEDIS;
9+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package org.sourcelab.storm.spout.redis.client.jedis;
2+
3+
import redis.clients.jedis.StreamEntry;
4+
5+
import java.util.List;
6+
import java.util.Map;
7+
8+
/**
9+
* Adapter to allow usage of both Jedis and JedisCluster.
10+
*/
11+
public interface JedisAdapter {
12+
/**
13+
* Call connect.
14+
*/
15+
void connect();
16+
17+
/**
18+
* Consume next batch of messages.
19+
* @return List of messages consumed.
20+
*/
21+
List<Map.Entry<String, List<StreamEntry>>> consume();
22+
23+
/**
24+
* Mark the provided messageId as acknowledged/completed.
25+
* @param msgId Id of the message.
26+
*/
27+
void commit(final String msgId);
28+
29+
/**
30+
* Disconnect client.
31+
*/
32+
void close();
33+
34+
/**
35+
* Advance the last offset consumed from PPL.
36+
* @param lastMsgId Id of the last msg consumed.
37+
*/
38+
void advancePplOffset(final String lastMsgId);
39+
40+
/**
41+
* Switch to consuming from latest messages.
42+
*/
43+
void switchToConsumerGroupMessages();
44+
}

0 commit comments

Comments
 (0)