Skip to content

Commit 6851186

Browse files
mp911dechristophstrobl
authored andcommitted
DATAREDIS-1053 - Subscribe to cleanup publishers when canceling a listenTo() stream.
Cleanup publishers are now subscribed to instead of setting the subscribeOn Scheduler to ensure connections get closed after canceling the stream. Original Pull Request: spring-projects#488
1 parent f2bdeff commit 6851186

File tree

3 files changed

+42
-8
lines changed

3 files changed

+42
-8
lines changed

src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ public Flux<? extends Message<String, V>> listenTo(Topic... topics) {
235235
return container
236236
.receive(Arrays.asList(topics), getSerializationContext().getStringSerializationPair(),
237237
getSerializationContext().getValueSerializationPair()) //
238-
.doFinally((signalType) -> container.destroyLater().subscribeOn(Schedulers.elastic()));
238+
.doFinally((signalType) -> container.destroyLater().subscribe());
239239
}
240240

241241
// -------------------------------------------------------------------------

src/main/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainer.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import reactor.core.publisher.Flux;
1919
import reactor.core.publisher.Mono;
2020
import reactor.core.publisher.MonoProcessor;
21-
import reactor.core.scheduler.Schedulers;
2221

2322
import java.nio.ByteBuffer;
2423
import java.util.Arrays;
@@ -97,6 +96,12 @@ public void destroy() {
9796
* @return the {@link Mono} signalling container termination.
9897
*/
9998
public Mono<Void> destroyLater() {
99+
return Mono.defer(this::doDestroy);
100+
}
101+
102+
private Mono<Void> doDestroy() {
103+
104+
ReactiveRedisConnection connection = this.connection;
100105

101106
if (connection != null) {
102107

@@ -116,12 +121,8 @@ public Mono<Void> destroyLater() {
116121
}
117122
}
118123

119-
if (terminationSignals != null) {
120-
return terminationSignals.collectList()
121-
.doFinally(signalType -> connection.closeLater().subscribeOn(Schedulers.immediate()))
122-
.flatMap(all -> Mono.empty());
123-
}
124124
this.connection = null;
125+
return terminationSignals != null ? terminationSignals.then(connection.closeLater()) : connection.closeLater();
125126
}
126127

127128
return Mono.empty();
@@ -344,7 +345,7 @@ boolean hasRegistration() {
344345

345346
/**
346347
* Unregister a subscriber and decrement subscriber count.
347-
*
348+
*
348349
* @return {@literal true} if this was the last unregistered subscriber.
349350
*/
350351
boolean unregister() {

src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateUnitTests.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,21 @@
1515
*/
1616
package org.springframework.data.redis.core;
1717

18+
import static org.assertj.core.api.Assertions.*;
1819
import static org.mockito.Mockito.*;
1920

21+
import reactor.core.publisher.Flux;
2022
import reactor.core.publisher.Mono;
2123
import reactor.test.StepVerifier;
2224

25+
import java.util.concurrent.atomic.AtomicBoolean;
26+
2327
import org.junit.Test;
2428

29+
import org.springframework.data.redis.connection.ReactivePubSubCommands;
2530
import org.springframework.data.redis.connection.ReactiveRedisConnection;
2631
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
32+
import org.springframework.data.redis.connection.ReactiveSubscription;
2733
import org.springframework.data.redis.serializer.RedisSerializationContext;
2834

2935
/**
@@ -52,4 +58,31 @@ public void closeShouldUseAsyncRelease() {
5258
verify(connectionMock).closeLater();
5359
verifyNoMoreInteractions(connectionMock);
5460
}
61+
62+
@Test // DATAREDIS-999
63+
public void listenToShouldSubscribeToChannel() {
64+
65+
AtomicBoolean closed = new AtomicBoolean();
66+
when(connectionFactoryMock.getReactiveConnection()).thenReturn(connectionMock);
67+
when(connectionMock.closeLater()).thenReturn(Mono.<Void> empty().doOnSubscribe(ignore -> closed.set(true)));
68+
69+
ReactivePubSubCommands pubSubCommands = mock(ReactivePubSubCommands.class);
70+
ReactiveSubscription subscription = mock(ReactiveSubscription.class);
71+
72+
when(connectionMock.pubSubCommands()).thenReturn(pubSubCommands);
73+
when(pubSubCommands.subscribe(any())).thenReturn(Mono.empty());
74+
when(pubSubCommands.createSubscription()).thenReturn(Mono.just(subscription));
75+
when(subscription.receive()).thenReturn(Flux.create(sink -> {}));
76+
77+
ReactiveRedisTemplate<String, String> template = new ReactiveRedisTemplate<>(connectionFactoryMock,
78+
RedisSerializationContext.string());
79+
80+
template.listenToChannel("channel") //
81+
.as(StepVerifier::create) //
82+
.thenAwait() //
83+
.thenCancel() //
84+
.verify();
85+
86+
assertThat(closed).isTrue();
87+
}
5588
}

0 commit comments

Comments
 (0)