Skip to content

Commit 18e6bc0

Browse files
authored
Merge pull request #5 from SourceLabOrg/sp/supportRedisCluster
Add support for communicating with both Redis instances and RedisCluster instances.
2 parents fab26fb + c27392c commit 18e6bc0

22 files changed

+1524
-779
lines changed

README.md

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ This project is an [Apache Storm](https://storm.apache.org/) Spout for consuming
66

77
### Features
88

9-
- Ability to consume from Redis Streams while maintaing state.
9+
- Ability to consume from Redis Streams while maintaining state.
10+
- Ability to consume from a single Redis server or a RedisCluster.
1011
- Parallelism supported via unique Consumer Ids.
1112

1213
### Usage & Configuration
@@ -30,9 +31,6 @@ The spout is configured using the [RedisStreamSpoutConfig](src/main/java/org/sou
3031

3132
| Property | Required | Description |
3233
|----------|----------|-------------|
33-
| `Host` | Required | The hostname to connect to Redis at. |
34-
| `Port` | Required | The port to connect to Redis at. |
35-
| `Password` | optional | Password to connect to Redis using. |
3634
| `Group Name` | Required | The Consumer group name the Spout should use. |
3735
| `Consumer Id Prefix` | Required | A prefix to use for generating unique Consumer Ids within the Consumer Group. To support multiple parallel consumers, the Spout instance will be appended to the end of this value. |
3836
| `Stream Key` | Required | The Redis key to consume messages from. |
@@ -44,17 +42,24 @@ The spout is configured using the [RedisStreamSpoutConfig](src/main/java/org/sou
4442
```java
4543
// Create config
4644
final RedisStreamSpoutConfig.Builder config = RedisStreamSpoutConfig.newBuilder()
47-
// Set Connection Properties
48-
.withHost("localhost")
49-
.withPort(6179)
45+
// If you want to connect to a single Redis instance:
46+
.withServer("localhost", 6759)
47+
48+
// OR if you want to talk to a RedisCluster:
49+
.withClusterNode("node1.hostname.com", 6759)
50+
.withClusterNode("node2.hostname.com", 6759)
51+
...
52+
5053
// Consumer Properties
5154
.withGroupName("StormConsumerGroup")
5255
.withConsumerIdPrefix("StormConsumer")
5356
.withStreamKey("RedisStreamKeyName")
54-
// Tuple Handler Class
57+
58+
// Tuple Converter instance (see note below)
5559
.withTupleConverter(..Your TupleConvertor implementation...)
56-
// Failure Handler
57-
.withFailureHandler(new RetryFailedTuples(10));
60+
61+
// Failure Handler instance (see note below)
62+
.withFailureHandler(new ExponentialBackoffFailureHandler(...));
5863

5964

6065
// Create Spout

src/main/java/org/sourcelab/storm/spout/redis/RedisStreamSpout.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
/**
2222
* Redis Stream based Spout for Apache Storm 2.2.x.
2323
*/
24-
public class RedisStreamSpout implements IRichSpout {
24+
public class RedisStreamSpout implements IRichSpout, AutoCloseable {
2525
private static final Logger logger = LoggerFactory.getLogger(RedisStreamSpout.class);
2626

2727
/**

0 commit comments

Comments
 (0)