Skip to content

Commit 9d62ead

Browse files
DATAREDIS-1053 - Polishing.
Use early return and fix issue reference in tests. Original Pull Request: spring-projects#488
1 parent 6851186 commit 9d62ead

File tree

2 files changed

+18
-20
lines changed

2 files changed

+18
-20
lines changed

src/main/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainer.java

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -101,31 +101,30 @@ public Mono<Void> destroyLater() {
101101

102102
private Mono<Void> doDestroy() {
103103

104-
ReactiveRedisConnection connection = this.connection;
104+
if (this.connection == null) {
105+
return Mono.empty();
106+
}
105107

106-
if (connection != null) {
108+
ReactiveRedisConnection connection = this.connection;
107109

108-
Flux<Void> terminationSignals = null;
109-
while (!subscriptions.isEmpty()) {
110+
Flux<Void> terminationSignals = null;
111+
while (!subscriptions.isEmpty()) {
110112

111-
Map<ReactiveSubscription, Subscribers> local = new HashMap<>(subscriptions);
112-
List<Mono<Void>> monos = local.keySet().stream() //
113-
.peek(subscriptions::remove) //
114-
.map(ReactiveSubscription::cancel) //
115-
.collect(Collectors.toList());
113+
Map<ReactiveSubscription, Subscribers> local = new HashMap<>(subscriptions);
114+
List<Mono<Void>> monos = local.keySet().stream() //
115+
.peek(subscriptions::remove) //
116+
.map(ReactiveSubscription::cancel) //
117+
.collect(Collectors.toList());
116118

117-
if (terminationSignals == null) {
118-
terminationSignals = Flux.concat(monos);
119-
} else {
120-
terminationSignals = terminationSignals.mergeWith(Flux.concat(monos));
121-
}
119+
if (terminationSignals == null) {
120+
terminationSignals = Flux.concat(monos);
121+
} else {
122+
terminationSignals = terminationSignals.mergeWith(Flux.concat(monos));
122123
}
123-
124-
this.connection = null;
125-
return terminationSignals != null ? terminationSignals.then(connection.closeLater()) : connection.closeLater();
126124
}
127125

128-
return Mono.empty();
126+
this.connection = null;
127+
return terminationSignals != null ? terminationSignals.then(connection.closeLater()) : connection.closeLater();
129128
}
130129

131130
/**

src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateUnitTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.concurrent.atomic.AtomicBoolean;
2626

2727
import org.junit.Test;
28-
2928
import org.springframework.data.redis.connection.ReactivePubSubCommands;
3029
import org.springframework.data.redis.connection.ReactiveRedisConnection;
3130
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
@@ -59,7 +58,7 @@ public void closeShouldUseAsyncRelease() {
5958
verifyNoMoreInteractions(connectionMock);
6059
}
6160

62-
@Test // DATAREDIS-999
61+
@Test // DATAREDIS-1053
6362
public void listenToShouldSubscribeToChannel() {
6463

6564
AtomicBoolean closed = new AtomicBoolean();

0 commit comments

Comments
 (0)