File tree Expand file tree Collapse file tree 3 files changed +32
-19
lines changed
src/main/java/org/sourcelab/storm/spout/redis/client/jedis Expand file tree Collapse file tree 3 files changed +32
-19
lines changed Original file line number Diff line number Diff line change 9
9
* Adapter to allow usage of both Jedis and JedisCluster.
10
10
*/
11
11
public interface JedisAdapter {
12
- /**
13
- * Is the underlying client connected?
14
- * @return true if connected, false if not.
15
- */
16
- boolean isConnected ();
17
-
18
12
/**
19
13
* Call connect.
20
14
*/
21
15
void connect ();
22
16
17
+ /**
18
+ * Consume next batch of messages.
19
+ * @return List of messages consumed.
20
+ */
23
21
List <Map .Entry <String , List <StreamEntry >>> consume ();
24
22
23
+ /**
24
+ * Mark the provided messageId as acknowledged/completed.
25
+ * @param msgId Id of the message.
26
+ */
25
27
void commit (final String msgId );
26
28
27
29
/**
28
- * Call shutdown .
30
+ * Disconnect client .
29
31
*/
30
32
void close ();
31
33
34
+ /**
35
+ * Advance the last offset consumed from PPL.
36
+ * @param lastMsgId Id of the last msg consumed.
37
+ */
32
38
void advancePplOffset (final String lastMsgId );
33
39
40
+ /**
41
+ * Switch to consuming from latest messages.
42
+ */
34
43
void switchToConsumerGroupMessages ();
35
44
}
Original file line number Diff line number Diff line change 15
15
import java .util .Objects ;
16
16
17
17
/**
18
- *
18
+ * Adapter for talking to a RedisCluster.
19
+ * If you need to talk to a single Redis instance {@link JedisRedisAdapter}.
19
20
*/
20
21
public class JedisClusterAdapter implements JedisAdapter {
21
22
private static final Logger logger = LoggerFactory .getLogger (JedisClusterAdapter .class );
@@ -38,17 +39,18 @@ public class JedisClusterAdapter implements JedisAdapter {
38
39
*/
39
40
private Map .Entry <String , StreamEntryID > streamPositionKey ;
40
41
42
+ /**
43
+ * Constructor.
44
+ * @param jedisCluster Underlying Jedis Cluster client instance.
45
+ * @param config Spout configuration.
46
+ * @param instanceId Spout instance Id.
47
+ */
41
48
public JedisClusterAdapter (final JedisCluster jedisCluster , final RedisStreamSpoutConfig config , final int instanceId ) {
42
49
this .jedisCluster = Objects .requireNonNull (jedisCluster );
43
50
this .config = Objects .requireNonNull (config );
44
51
this .consumerId = config .getConsumerIdPrefix () + instanceId ;
45
52
}
46
53
47
- @ Override
48
- public boolean isConnected () {
49
- return true ;
50
- }
51
-
52
54
@ Override
53
55
public void connect () {
54
56
// Attempt to create consumer group
Original file line number Diff line number Diff line change 15
15
import java .util .Objects ;
16
16
17
17
/**
18
- *
18
+ * Adapter for talking to a single Redis instance.
19
+ * If you need to talk to a RedisCluster {@link JedisClusterAdapter}.
19
20
*/
20
21
public class JedisRedisAdapter implements JedisAdapter {
21
22
private static final Logger logger = LoggerFactory .getLogger (JedisRedisAdapter .class );
@@ -38,17 +39,18 @@ public class JedisRedisAdapter implements JedisAdapter {
38
39
*/
39
40
private Map .Entry <String , StreamEntryID > streamPositionKey ;
40
41
42
+ /**
43
+ * Constructor.
44
+ * @param jedis Underlying Jedis client instance.
45
+ * @param config Spout configuration.
46
+ * @param instanceId Spout instance Id.
47
+ */
41
48
public JedisRedisAdapter (final Jedis jedis , final RedisStreamSpoutConfig config , final int instanceId ) {
42
49
this .jedis = Objects .requireNonNull (jedis );
43
50
this .config = Objects .requireNonNull (config );
44
51
this .consumerId = config .getConsumerIdPrefix () + instanceId ;
45
52
}
46
53
47
- @ Override
48
- public boolean isConnected () {
49
- return jedis .isConnected ();
50
- }
51
-
52
54
@ Override
53
55
public void connect () {
54
56
jedis .connect ();
You can’t perform that action at this time.
0 commit comments