Skip to content

Commit 4b53a97

Browse files
committed
testing recovering
1 parent 6cdded5 commit 4b53a97

File tree

8 files changed

+25
-17
lines changed

8 files changed

+25
-17
lines changed

src/main/java/com/mageddo/kafka/client/BatchConsumer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ public void consume(
2626
commitFirstRecord(consumer, records, partition);
2727
}
2828
})
29-
.onExhausted(() -> {
29+
.onExhausted((lastFailure) -> {
3030
log.info("status=exhausted-tries, records={}", records.count());
31-
records.forEach(record -> Consumers.doRecoverWhenAvailable(consumer, consumingConfig, record));
31+
records.forEach(record -> Consumers.doRecoverWhenAvailable(consumer, consumingConfig, record, lastFailure));
3232
})
3333
.build();
3434

src/main/java/com/mageddo/kafka/client/ConsumeHandler.java

Lines changed: 0 additions & 4 deletions
This file was deleted.

src/main/java/com/mageddo/kafka/client/ConsumerConfig.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.mageddo.kafka.client;
22

3+
import lombok.AccessLevel;
34
import lombok.AllArgsConstructor;
45
import lombok.Builder;
56
import lombok.Data;
@@ -14,7 +15,7 @@
1415
import java.util.Map;
1516

1617
@Data
17-
@Builder(toBuilder = true)
18+
@Builder(toBuilder = true, access = AccessLevel.PRIVATE)
1819
@Accessors(chain = true)
1920
@NoArgsConstructor
2021
@AllArgsConstructor
@@ -49,4 +50,7 @@ public ConsumerConfig<K, V> withProp(String k, Object v) {
4950
return this;
5051
}
5152

53+
public ConsumerConfig<K, V> copy(){
54+
return this.toBuilder().build();
55+
}
5256
}

src/main/java/com/mageddo/kafka/client/Consumers.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@ public static <K, V> void commitSyncRecord(Consumer<K, V> consumer, ConsumerReco
2424
public static <K, V> void doRecoverWhenAvailable(
2525
Consumer<K, V> consumer,
2626
ConsumingConfig<K, V> consumingConfig,
27-
ConsumerRecord<K, V> record
27+
ConsumerRecord<K, V> record,
28+
Throwable lastFailure
2829
) {
2930
if (consumingConfig.getRecoverCallback() != null) {
30-
consumingConfig.getRecoverCallback().recover(record);
31+
consumingConfig.getRecoverCallback().recover(record, lastFailure);
3132
commitSyncRecord(consumer, record);
3233
} else {
3334
log.warn("status=no recover callback was specified");

src/main/java/com/mageddo/kafka/client/RecordConsumer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ public void consume(
2121
final AtomicBoolean recovered = new AtomicBoolean();
2222
Retrier
2323
.builder()
24-
.onExhausted(() -> {
24+
.onExhausted((lastFailure) -> {
2525
log.info("exhausted tries");
26-
Consumers.doRecoverWhenAvailable(consumer, consumingConfig, record);
26+
Consumers.doRecoverWhenAvailable(consumer, consumingConfig, record, lastFailure);
2727
recovered.set(true);
2828
})
2929
.onRetry(() -> {

src/main/java/com/mageddo/kafka/client/RecoverCallback.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,5 @@
33
import org.apache.kafka.clients.consumer.ConsumerRecord;
44

55
public interface RecoverCallback<K, V> {
6-
void recover(ConsumerRecord<K, V> record);
6+
void recover(ConsumerRecord<K, V> record, Throwable lastFailure);
77
}

src/main/java/com/mageddo/kafka/client/Retrier.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.mageddo.kafka.client;
22

33
import java.util.ArrayList;
4+
import java.util.function.Consumer;
45

56
import lombok.extern.slf4j.Slf4j;
67

@@ -19,7 +20,7 @@ public class Retrier {
1920

2021
private RetryPolicy retryPolicy;
2122
private Callback onRetry;
22-
private Callback onExhausted;
23+
private Consumer<Throwable> onExhausted;
2324

2425
public void run(final Callback run) {
2526
Failsafe
@@ -28,7 +29,7 @@ public void run(final Callback run) {
2829
if(log.isTraceEnabled()){
2930
log.trace("status=onExhausted, ctx={}", ctx);
3031
}
31-
onExhausted.call();
32+
onExhausted.accept(ctx.getLastFailure());
3233
return null;
3334
}),
3435
retryPolicyToFailSafeRetryPolicy(this.retryPolicy)

src/test/java/StockPriceMDB.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import java.net.http.HttpClient;
1+
22
import java.time.Duration;
33
import java.util.Collections;
44
import java.util.LinkedHashMap;
@@ -64,13 +64,19 @@ public static void main(String[] args) {
6464
.withProp(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName())
6565
.setRetryPolicy(RetryPolicy
6666
.builder()
67-
.maxTries(3)
67+
.maxTries(1)
6868
.delay(Duration.ofSeconds(29))
6969
.build()
7070
)
71+
.setRecoverCallback((record, lastFailure) -> {
72+
log.info("status=recovering, record={}", new String(record.value()));
73+
})
7174
.setBatchCallback((consumer, records, e) -> {
7275
for (final ConsumerRecord<String, byte[]> record : records) {
73-
// throw new RuntimeException("an error occurred");
76+
final double randomValue = Math.random();
77+
if(randomValue > 0.5 && randomValue < 0.8){
78+
throw new RuntimeException("an error occurred");
79+
}
7480
log.info("key={}, value={}", record.key(), new String(record.value()));
7581
}
7682
});

0 commit comments

Comments
 (0)