Skip to content

Commit 6ac1c67

Browse files
committed
Make sure events are not duplicated on retry
1 parent 1e532ae commit 6ac1c67

File tree

2 files changed

+22
-35
lines changed

2 files changed

+22
-35
lines changed

spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/AbstractEventHandler.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,23 +40,23 @@ protected AbstractEventHandler(Publisher<InstanceEvent> publisher, Class<T> even
4040
}
4141

4242
public void start() {
43-
subscription = Flux.from(publisher)
44-
.log(log.getName(), Level.FINEST)
45-
.doOnSubscribe(s -> log.debug("Subscribed to {} events", eventType))
46-
.ofType(eventType)
47-
.cast(eventType)
48-
.transform(this::handle)
49-
.retryWhen(Retry.any()
50-
.retryMax(Long.MAX_VALUE)
51-
.doOnRetry(ctx -> log.warn("Unexpected error", ctx.exception())))
52-
.subscribe();
43+
this.subscription = Flux.from(this.publisher)
44+
.log(this.log.getName(), Level.FINEST)
45+
.doOnSubscribe(s -> this.log.debug("Subscribed to {} events", this.eventType))
46+
.ofType(this.eventType)
47+
.cast(this.eventType)
48+
.transform(this::handle)
49+
.retryWhen(Retry.any()
50+
.retryMax(Long.MAX_VALUE)
51+
.doOnRetry(ctx -> this.log.warn("Unexpected error", ctx.exception())))
52+
.subscribe();
5353
}
5454

5555
protected abstract Publisher<Void> handle(Flux<T> publisher);
5656

5757
public void stop() {
58-
if (subscription != null) {
59-
subscription.dispose();
58+
if (this.subscription != null) {
59+
this.subscription.dispose();
6060
}
6161
}
6262
}

spring-boot-admin-server/src/test/java/de/codecentric/boot/admin/server/services/AbstractEventHandlerTest.java

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,10 @@
3737
public class AbstractEventHandlerTest {
3838
private static final Logger log = LoggerFactory.getLogger(AbstractEventHandlerTest.class);
3939
private static final Registration registration = Registration.create("foo", "http://health").build();
40-
private static final InstanceRegisteredEvent event = new InstanceRegisteredEvent(InstanceId.of("id"),
41-
0L,
42-
registration
43-
);
44-
private static final InstanceRegisteredEvent errorEvent = new InstanceRegisteredEvent(InstanceId.of("err"),
45-
0L,
46-
registration
47-
);
48-
private static final InstanceDeregisteredEvent ignoredEvent = new InstanceDeregisteredEvent(InstanceId.of("id"),
49-
1L
50-
);
40+
private static final InstanceRegisteredEvent firstEvent = new InstanceRegisteredEvent(InstanceId.of("id"), 0L, registration);
41+
private static final InstanceRegisteredEvent secondEvent = new InstanceRegisteredEvent(InstanceId.of("id"), 1L, registration);
42+
private static final InstanceRegisteredEvent errorEvent = new InstanceRegisteredEvent(InstanceId.of("err"), 2L, registration);
43+
private static final InstanceDeregisteredEvent ignoredEvent = new InstanceDeregisteredEvent(InstanceId.of("id"), 2L);
5144

5245
@Test
5346
public void should_resubscribe_after_error() {
@@ -58,14 +51,10 @@ public void should_resubscribe_after_error() {
5851

5952
StepVerifier.create(eventHandler.getFlux())
6053
.expectSubscription()
61-
.then(() -> testPublisher.next(event))
62-
.expectNext(event)
63-
.then(() -> testPublisher.next(errorEvent))
64-
.expectNoEvent(Duration.ofMillis(100L))
65-
.then(() -> testPublisher.next(event))
66-
.expectNext(event)
54+
.then(() -> testPublisher.next(firstEvent, errorEvent, secondEvent))
55+
.expectNext(firstEvent, secondEvent)
6756
.thenCancel()
68-
.verify(Duration.ofSeconds(5));
57+
.verify(Duration.ofSeconds(1));
6958

7059
}
7160

@@ -78,12 +67,10 @@ public void should_filter() {
7867

7968
StepVerifier.create(eventHandler.getFlux())
8069
.expectSubscription()
81-
.then(() -> testPublisher.next(event))
82-
.expectNext(event)
83-
.then(() -> testPublisher.next(ignoredEvent))
84-
.expectNoEvent(Duration.ofMillis(100L))
70+
.then(() -> testPublisher.next(firstEvent, ignoredEvent, secondEvent))
71+
.expectNext(firstEvent, secondEvent)
8572
.thenCancel()
86-
.verify(Duration.ofSeconds(5));
73+
.verify(Duration.ofSeconds(1));
8774
}
8875

8976

0 commit comments

Comments
 (0)