Skip to content

Commit 60343fc

Browse files
authored
Merge pull request #2 from SourceLabOrg/sp/exponentialBackoff
Implements an Exponential Backoff Failure Handler
2 parents 16f0d4e + d66ea93 commit 60343fc

File tree

7 files changed

+1002
-4
lines changed

7 files changed

+1002
-4
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ will handle Failed Tuples. The following implementations are provided out of th
7777
| Implementation | Description |
7878
|----------------|-------------|
7979
| [NoRetryHandler](src/main/java/org/sourcelab/storm/spout/redis/failhandler/NoRetryHandler.java) | Will never retry failed tuples. |
80-
| [RetryFailedTuples](src/main/java/org/sourcelab/storm/spout/redis/failhandler/RetryFailedTuples.java) | Rudimentary implementation that can be configured to replay failed tuples forever, or for a configured number of attempts. |
80+
| [ExponentialBackoffFailureHandler](src/main/java/org/sourcelab/storm/spout/redis/failhandler/ExponentialBackoffFailureHandler.java) | Will attempt to retry failed messages using an exponential backoff strategy. |
81+
| [RetryFailedTuples](src/main/java/org/sourcelab/storm/spout/redis/failhandler/RetryFailedTuples.java) | Rudimentary implementation that can be configured to replay failed tuples for a configured number of attempts. |
8182

8283
#### Example Topology
8384

src/main/java/org/sourcelab/storm/spout/redis/FailureHandler.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,22 @@
88
* Does NOT need to be thread safe as only accessed via a single thread.
99
*/
1010
public interface FailureHandler extends Serializable {
11-
void open(Map<String, Object> stormConfig);
11+
/**
12+
* Lifecycle method. Called once the Spout has started.
13+
* @param stormConfig Configuration map passed from the spout.
14+
*/
15+
void open(final Map<String, Object> stormConfig);
1216

1317
/**
1418
* Handle a failed message.
15-
* If the implementation wants to replay this tuple again in the future, it should return value of TRUE.
16-
* If the implementation does NOT want to handle/replay this tuple in the future, it should return a value of FALSE.
19+
* A return value of TRUE means:
20+
* This implementation wants to replay this message again in the future.
21+
* The spout will NOT mark the message as processed/ack the message.
22+
*
23+
* A return value of FALSE means:
24+
* This implementation does NOT want to replay this message in the future.
25+
* The spout WILL mark the message as processed/ack the message.
26+
*
1727
* @param message The failed message.
1828
* @return True if the implementation will replay this tuple again later, false if not.
1929
*/

src/main/java/org/sourcelab/storm/spout/redis/Message.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,23 @@ public Map<String, String> getBody() {
3131
return body;
3232
}
3333

34+
@Override
35+
public boolean equals(final Object other) {
36+
if (this == other) {
37+
return true;
38+
}
39+
if (other == null || getClass() != other.getClass()) {
40+
return false;
41+
}
42+
final Message message = (Message) other;
43+
return getId().equals(message.getId());
44+
}
45+
46+
@Override
47+
public int hashCode() {
48+
return Objects.hash(id);
49+
}
50+
3451
@Override
3552
public String toString() {
3653
return "Message{"
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
package org.sourcelab.storm.spout.redis.failhandler;
2+
3+
import java.io.Serializable;
4+
import java.time.Duration;
5+
import java.util.Objects;
6+
import java.util.concurrent.TimeUnit;
7+
8+
/**
9+
* Configuration for {@link ExponentialBackoffFailureHandler}.
10+
*/
11+
public class ExponentialBackoffConfig implements Serializable {
12+
/**
13+
* Define our retry limit.
14+
* A value of less than 0 will mean we'll retry forever
15+
* A value of 0 means we'll never retry.
16+
* A value of greater than 0 sets an upper bound of number of retries.
17+
*/
18+
private final int retryLimit;
19+
20+
/**
21+
* Initial delay after a tuple fails for the first time, in milliseconds.
22+
*/
23+
private final long initialRetryDelayMs;
24+
25+
/**
26+
* Each time we fail, double our delay, so 4, 8, 16 seconds, etc.
27+
*/
28+
private final double retryDelayMultiplier;
29+
30+
/**
31+
* Maximum delay between successive retries, defaults to 15 minutes.
32+
*/
33+
private final long retryDelayMaxMs;
34+
35+
/**
36+
* Constructor. See Builder instance.
37+
*/
38+
public ExponentialBackoffConfig(
39+
final int retryLimit,
40+
final long initialRetryDelayMs,
41+
final double retryDelayMultiplier,
42+
final long retryDelayMaxMs
43+
) {
44+
this.retryLimit = retryLimit;
45+
this.initialRetryDelayMs = initialRetryDelayMs;
46+
this.retryDelayMultiplier = retryDelayMultiplier;
47+
this.retryDelayMaxMs = retryDelayMaxMs;
48+
}
49+
50+
/**
51+
* New Builder instance for ExponentialBackoffConfig.
52+
* @return New Builder instance for ExponentialBackoffConfig.
53+
*/
54+
public static Builder newBuilder() {
55+
return new Builder();
56+
}
57+
58+
public int getRetryLimit() {
59+
return retryLimit;
60+
}
61+
62+
public long getInitialRetryDelayMs() {
63+
return initialRetryDelayMs;
64+
}
65+
66+
public double getRetryDelayMultiplier() {
67+
return retryDelayMultiplier;
68+
}
69+
70+
public long getRetryDelayMaxMs() {
71+
return retryDelayMaxMs;
72+
}
73+
74+
@Override
75+
public String toString() {
76+
return "ExponentialBackoffConfig{"
77+
+ "retryLimit=" + retryLimit
78+
+ ", initialRetryDelayMs=" + initialRetryDelayMs
79+
+ ", retryDelayMultiplier=" + retryDelayMultiplier
80+
+ ", retryDelayMaxMs=" + retryDelayMaxMs
81+
+ '}';
82+
}
83+
84+
/**
85+
* Builder for {@link ExponentialBackoffConfig}.
86+
*/
87+
public static final class Builder {
88+
/**
89+
* Default retry limit is 10 attempts.
90+
*/
91+
private int retryLimit = 10;
92+
93+
/**
94+
* The first time a message has failed, defines the minimum delay.
95+
* Default initial delay is 2 seconds (2000 milliseconds).
96+
*/
97+
private long initialRetryDelayMs = TimeUnit.SECONDS.toMillis(2);
98+
99+
/**
100+
* Default multiplier to 2.0.
101+
*/
102+
private double retryDelayMultiplier = 2.0;
103+
104+
/**
105+
* Upperbound that we'll limit retries within.
106+
* Defaults to a max of 15 minutes.
107+
*/
108+
private long retryDelayMaxMs = TimeUnit.MINUTES.toMillis(15);
109+
110+
private Builder() {
111+
}
112+
113+
public Builder withRetryForever() {
114+
return withRetryLimit(-1);
115+
}
116+
117+
public Builder withRetryNever() {
118+
return withRetryLimit(0);
119+
}
120+
121+
public Builder withRetryLimit(int retryLimit) {
122+
this.retryLimit = retryLimit;
123+
return this;
124+
}
125+
126+
public Builder withInitialRetryDelay(final Duration duration) {
127+
Objects.requireNonNull(duration);
128+
return withInitialRetryDelayMs(duration.toMillis());
129+
}
130+
131+
public Builder withInitialRetryDelayMs(long initialRetryDelayMs) {
132+
this.initialRetryDelayMs = initialRetryDelayMs;
133+
return this;
134+
}
135+
136+
public Builder withRetryDelayMultiplier(double retryDelayMultiplier) {
137+
this.retryDelayMultiplier = retryDelayMultiplier;
138+
return this;
139+
}
140+
141+
public Builder withRetryDelayMax(final Duration duration) {
142+
Objects.requireNonNull(duration);
143+
return withRetryDelayMaxMs(duration.toMillis());
144+
}
145+
146+
public Builder withRetryDelayMaxMs(long retryDelayMaxMs) {
147+
this.retryDelayMaxMs = retryDelayMaxMs;
148+
return this;
149+
}
150+
151+
/**
152+
* Create new ExponentialBackoffConfig instance.
153+
* @return ExponentialBackoffConfig.
154+
*/
155+
public ExponentialBackoffConfig build() {
156+
return new ExponentialBackoffConfig(retryLimit, initialRetryDelayMs, retryDelayMultiplier, retryDelayMaxMs);
157+
}
158+
}
159+
}

0 commit comments

Comments
 (0)