Skip to content

Commit fdc914a

Browse files
committed
More refactoring, still rough
1 parent 3e40684 commit fdc914a

14 files changed

+1013
-125
lines changed

src/main/java/org/sourcelab/storm/spout/redis/RedisStreamSpoutConfig.java

Lines changed: 155 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,12 @@
33
import org.sourcelab.storm.spout.redis.failhandler.NoRetryHandler;
44

55
import java.io.Serializable;
6+
import java.util.ArrayList;
7+
import java.util.Arrays;
8+
import java.util.Collections;
9+
import java.util.List;
610
import java.util.Objects;
11+
import java.util.stream.Collectors;
712

813
/**
914
* Configuration properties for the spout.
@@ -12,9 +17,8 @@ public class RedisStreamSpoutConfig implements Serializable {
1217
/**
1318
* Redis server details.
1419
*/
15-
private final String host;
16-
private final int port;
17-
private final String password;
20+
private final RedisServer redisServer;
21+
private final RedisCluster redisCluster;
1822

1923
/**
2024
* The Redis key to stream from.
@@ -69,11 +73,12 @@ public class RedisStreamSpoutConfig implements Serializable {
6973

7074
/**
7175
* Constructor.
72-
* See Builder instance.
76+
* Use Builder instance.
7377
*/
74-
public RedisStreamSpoutConfig(
78+
private RedisStreamSpoutConfig(
7579
// Redis Connection Properties
76-
final String host, final int port, final String password,
80+
final RedisServer redisServer,
81+
final RedisCluster redisCluster,
7782
// Consumer properties
7883
final String streamKey, final String groupName, final String consumerIdPrefix,
7984
// Classes
@@ -83,10 +88,15 @@ public RedisStreamSpoutConfig(
8388
final int maxConsumePerRead, final int maxTupleQueueSize, final int maxAckQueueSize, final long consumerDelayMillis,
8489
final boolean metricsEnabled
8590
) {
86-
// Connection Details.
87-
this.host = Objects.requireNonNull(host);
88-
this.port = port;
89-
this.password = password;
91+
// Connection
92+
if (redisCluster != null && redisServer != null) {
93+
throw new IllegalStateException("TODO");
94+
} else if (redisCluster == null && redisServer == null) {
95+
throw new IllegalStateException("TODO");
96+
}
97+
98+
this.redisCluster = redisCluster;
99+
this.redisServer = redisServer;
90100

91101
// Consumer Details
92102
this.groupName = Objects.requireNonNull(groupName);
@@ -105,18 +115,6 @@ public RedisStreamSpoutConfig(
105115
this.metricsEnabled = metricsEnabled;
106116
}
107117

108-
public String getHost() {
109-
return host;
110-
}
111-
112-
public int getPort() {
113-
return port;
114-
}
115-
116-
public String getPassword() {
117-
return password;
118-
}
119-
120118
public String getStreamKey() {
121119
return streamKey;
122120
}
@@ -133,18 +131,19 @@ public int getMaxConsumePerRead() {
133131
return maxConsumePerRead;
134132
}
135133

134+
public boolean isConnectingToCluster() {
135+
return redisCluster != null;
136+
}
137+
136138
/**
137139
* Build a Redis connection string based on configured properties.
138140
* @return Redis Connection string.
139141
*/
140142
public String getConnectString() {
141-
String connectStr = "redis://";
142-
if (getPassword() != null && !getPassword().trim().isEmpty()) {
143-
connectStr += getPassword() + "@";
143+
if (!isConnectingToCluster()) {
144+
return redisServer.getConnectString();
144145
}
145-
connectStr += getHost() + ":" + getPort();
146-
147-
return connectStr;
146+
return redisCluster.getConnectString();
148147
}
149148

150149
public int getMaxTupleQueueSize() {
@@ -186,9 +185,8 @@ public static final class Builder {
186185
/**
187186
* Connection details.
188187
*/
189-
private String host;
190-
private int port;
191-
private String password;
188+
private final List<RedisServer> clusterServers = new ArrayList<>();
189+
private RedisServer redisServer = null;
192190

193191
/**
194192
* Consumer details.
@@ -219,36 +217,52 @@ public static final class Builder {
219217
private Builder() {
220218
}
221219

222-
public Builder withHost(final String host) {
223-
this.host = host;
220+
public Builder withServer(final RedisServer redisServer) {
221+
if (!clusterServers.isEmpty()) {
222+
// Cannot define both cluster servers and redis server instances.
223+
throw new IllegalArgumentException("TODO");
224+
}
225+
this.redisServer = Objects.requireNonNull(redisServer);
224226
return this;
225227
}
226228

227-
/**
228-
* Set the port parameter. Attempts to handle input in both
229-
* Number or String input.
230-
*
231-
* @param port Port value.
232-
* @return Builder instance.
233-
* @throws IllegalArgumentException if passed a non-number representation value.
234-
*/
235-
public Builder withPort(final Object port) {
236-
Objects.requireNonNull(port);
237-
if (port instanceof Number) {
238-
return withPort(((Number) port).intValue());
239-
} else if (port instanceof String) {
240-
return withPort(Integer.parseInt((String) port));
241-
}
242-
throw new IllegalArgumentException("Port must be a Number!");
229+
public Builder withServer(final String host, final int port, final String password) {
230+
return withServer(new RedisServer(host, port, password));
243231
}
244232

245-
public Builder withPort(final int port) {
246-
this.port = port;
233+
public Builder withServer(final String host, final int port) {
234+
return withServer(host, port, null);
235+
}
236+
237+
238+
public Builder withClusters(final RedisServer...clusterServers) {
239+
Arrays.stream(clusterServers)
240+
.forEach(this::withCluster);
247241
return this;
248242
}
249243

250-
public Builder withPassword(final String password) {
251-
this.password = password;
244+
public Builder withCluster(final RedisServer clusterServer) {
245+
if (redisServer != null) {
246+
// Cannot define both cluster servers and redis server instances.
247+
throw new IllegalArgumentException("TODO");
248+
}
249+
clusterServers.add(Objects.requireNonNull(clusterServer));
250+
return this;
251+
}
252+
253+
public Builder withCluster(final String host, final int port, final String password) {
254+
return withCluster(new RedisServer(host, port, password));
255+
}
256+
257+
public Builder withCluster(final String host, final int port) {
258+
return withCluster(host, port, null);
259+
}
260+
261+
public Builder withCluster(final RedisCluster redisCluster) {
262+
Objects.requireNonNull(redisCluster);
263+
264+
this.clusterServers.clear();
265+
this.clusterServers.addAll(redisCluster.getServers());
252266
return this;
253267
}
254268

@@ -320,9 +334,15 @@ public Builder withMetricsEnabled(final boolean enabled) {
320334
* @return Configuration instance.
321335
*/
322336
public RedisStreamSpoutConfig build() {
337+
RedisCluster redisCluster = null;
338+
if (!clusterServers.isEmpty()) {
339+
redisCluster = new RedisCluster(clusterServers);
340+
}
341+
323342
return new RedisStreamSpoutConfig(
324343
// Redis connection properties
325-
host, port, password,
344+
redisServer, redisCluster,
345+
326346
// Consumer Properties
327347
streamKey, groupName, consumerIdPrefix,
328348
// Classes
@@ -333,4 +353,85 @@ public RedisStreamSpoutConfig build() {
333353
);
334354
}
335355
}
356+
357+
public static class RedisCluster {
358+
private final List<RedisServer> servers;
359+
360+
public RedisCluster(final RedisServer...servers) {
361+
this(Arrays.asList(servers));
362+
}
363+
364+
public RedisCluster(final List<RedisServer> servers) {
365+
Objects.requireNonNull(servers);
366+
this.servers = Collections.unmodifiableList(new ArrayList<>(servers));
367+
}
368+
369+
public List<RedisServer> getServers() {
370+
return servers;
371+
}
372+
373+
@Override
374+
public String toString() {
375+
return "RedisCluster{"
376+
+ "servers=" + servers
377+
+ '}';
378+
}
379+
380+
public String getConnectString() {
381+
return getServers().stream()
382+
.map(RedisServer::getConnectString)
383+
.collect(Collectors.joining(","));
384+
}
385+
}
386+
387+
public static class RedisServer {
388+
private final String host;
389+
private final int port;
390+
private final String password;
391+
392+
public RedisServer(final String host, final int port) {
393+
this(host, port, null);
394+
}
395+
396+
public RedisServer(final String host, final int port, final String password) {
397+
this.host = host;
398+
this.port = port;
399+
this.password = password;
400+
}
401+
402+
public String getHost() {
403+
return host;
404+
}
405+
406+
public int getPort() {
407+
return port;
408+
}
409+
410+
public String getPassword() {
411+
return password;
412+
}
413+
414+
public boolean hasPassword() {
415+
return password != null;
416+
}
417+
418+
public String getConnectString() {
419+
String connectStr = "redis://";
420+
421+
if (getPassword() != null && !getPassword().trim().isEmpty()) {
422+
connectStr += getPassword() + "@";
423+
}
424+
connectStr += getHost() + ":" + getPort();
425+
426+
return connectStr;
427+
}
428+
429+
@Override
430+
public String toString() {
431+
return "RedisServer{"
432+
+ "host='" + host + '\''
433+
+ ", port=" + port
434+
+ '}';
435+
}
436+
}
336437
}

src/main/java/org/sourcelab/storm/spout/redis/client/LettuceClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,9 @@ public LettuceClient(final RedisStreamSpoutConfig config, final int instanceId)
5858
this(
5959
config,
6060
instanceId,
61-
//new LettuceRedisClient(RedisClient.create(config.getConnectString()))
62-
new LettuceClusterClient(RedisClusterClient.create(config.getConnectString()))
61+
config.isConnectingToCluster() ?
62+
new LettuceClusterClient(RedisClusterClient.create(config.getConnectString()))
63+
: new LettuceRedisClient(RedisClient.create(config.getConnectString()))
6364
);
6465
}
6566

0 commit comments

Comments
 (0)