Skip to content

Commit fccaeb2

Browse files
christophstroblmp911de
authored andcommitted
DATAREDIS-1079 - Fix StreamMessageListenerContainer autoAck on receive.
Original pull request: spring-projects#508.
1 parent 75fcb0e commit fccaeb2

File tree

6 files changed

+93
-20
lines changed

6 files changed

+93
-20
lines changed

src/main/asciidoc/reference/redis-streams.adoc

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,31 @@ Please refer to the Javadoc of the various message listener containers for a ful
163163

164164
NOTE: Demand-driven consumption uses backpressure signals to activate and deactivate polling. `StreamReceiver` subscriptions pause polling if the demand is satisfied until subscribers signal further demand. Depending on the `ReadOffset` strategy, this can cause messages to be skipped.
165165

166+
[[redis.streams.acknowledge]]
167+
=== `Acknowledge` strategies
168+
169+
When you read with messages via a `Consumer Group`, the server will remember that a given message was delivered and add it to the Pending Entries List (PEL). A list of messages delivered but not yet acknowledged. +
170+
Messages have to be acknowledged via `StreamOperations.acknowledge` in order to be removed from the Pending Entries List as shown in the snippet below.
171+
172+
====
173+
[source,java]
174+
----
175+
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = ...
176+
177+
container.receive(Consumer.from("my-group", "my-consumer"), <1>
178+
StreamOffset.create("my-stream", ReadOffset.lastConsumed()),
179+
msg -> {
180+
181+
// ...
182+
redisTemplate.opsForStream().acknowledge("my-group", msg); <2>
183+
});
184+
----
185+
<1> Read as _my-consumer_ from group _my-group_. Received messages are not acknowledged.
186+
<2> Acknowledged the message after processing.
187+
====
188+
189+
TIP: To auto acknowledge messages on receive use `receiveAutoAck` instead of `receive`.
190+
166191
[[redis.streams.receive.readoffset]]
167192
=== `ReadOffset` strategies
168193

src/main/java/org/springframework/data/redis/connection/stream/StreamReadOptions.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
* Options for reading messages from a Redis Stream.
2929
*
3030
* @author Mark Paluch
31+
* @author Christoph Strobl
3132
* @see 2.2
3233
*/
3334
@EqualsAndHashCode
@@ -57,14 +58,23 @@ public static StreamReadOptions empty() {
5758
}
5859

5960
/**
60-
* Disable auto-acknowledgement when reading in the context of a consumer group.
61+
* Enable auto-acknowledgement by setting the {@code NOACK} flag when reading in the context of a consumer group.
6162
*
6263
* @return {@link StreamReadOptions} with {@code noack} applied.
6364
*/
6465
public StreamReadOptions noack() {
6566
return new StreamReadOptions(block, count, true);
6667
}
6768

69+
/**
70+
* Enable auto-acknowledgement by setting the {@code NOACK} flag when reading in the context of a consumer group.
71+
*
72+
* @return new instance of {@link StreamReadOptions} with {@code noack} applied.
73+
*/
74+
public StreamReadOptions autoAcknowledge() {
75+
return new StreamReadOptions(block, count, true);
76+
}
77+
6878
/**
6979
* Use a blocking read and supply the {@link Duration timeout} after which the call will terminate if no message was
7080
* read.

src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
* This message container creates long-running tasks that are executed on {@link Executor}.
4646
*
4747
* @author Mark Paluch
48+
* @author Christoph Strobl
4849
* @since 2.2
4950
*/
5051
class DefaultStreamMessageListenerContainer<K, V extends Record<K, ?>> implements StreamMessageListenerContainer<K, V> {
@@ -223,7 +224,7 @@ private StreamPollTask<K, V> getReadTask(StreamReadRequest<K> streamRequest, Str
223224

224225
ConsumerStreamReadRequest<K> consumerStreamRequest = (ConsumerStreamReadRequest<K>) streamRequest;
225226

226-
StreamReadOptions readOptions = consumerStreamRequest.isAutoAck() ? this.readOptions : this.readOptions.noack();
227+
StreamReadOptions readOptions = consumerStreamRequest.isAutoAck() ? this.readOptions.autoAcknowledge() : this.readOptions;
227228
Consumer consumer = consumerStreamRequest.getConsumer();
228229

229230
if (this.containerOptions.getHashMapper() != null) {

src/main/java/org/springframework/data/redis/stream/DefaultStreamReceiver.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
* Default implementation of {@link StreamReceiver}.
4848
*
4949
* @author Mark Paluch
50+
* @author Christoph Strobl
5051
* @since 2.2
5152
*/
5253
class DefaultStreamReceiver<K, V extends Record<K, ?>> implements StreamReceiver<K, V> {
@@ -134,7 +135,7 @@ public Flux<V> receiveAutoAck(Consumer consumer, StreamOffset<K> streamOffset) {
134135
}
135136

136137
BiFunction<K, ReadOffset, Flux<? extends Record<?, ?>>> readFunction = getConsumeReadFunction(consumer,
137-
this.readOptions);
138+
this.readOptions.autoAcknowledge());
138139

139140
return Flux.defer(() -> {
140141

@@ -157,7 +158,7 @@ public Flux<V> receive(Consumer consumer, StreamOffset<K> streamOffset) {
157158
}
158159

159160
BiFunction<K, ReadOffset, Flux<? extends Record<?, ?>>> readFunction = getConsumeReadFunction(consumer,
160-
this.readOptions.noack());
161+
this.readOptions);
161162

162163
return Flux.defer(() -> {
163164
PollState pollState = PollState.consumer(consumer, streamOffset.getOffset());

src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
* </pre>
105105
*
106106
* @author Mark Paluch
107+
* @author Christoph Strobl
107108
* @param <K> Stream key and Stream field type.
108109
* @param <V> Stream value type.
109110
* @since 2.2
@@ -196,7 +197,7 @@ default Subscription receive(StreamOffset<K> streamOffset, StreamListener<K, V>
196197
* @see ReadOffset#lastConsumed()
197198
*/
198199
default Subscription receive(Consumer consumer, StreamOffset<K> streamOffset, StreamListener<K, V> listener) {
199-
return register(StreamReadRequest.builder(streamOffset).consumer(consumer).build(), listener);
200+
return register(StreamReadRequest.builder(streamOffset).consumer(consumer).autoAck(false).build(), listener);
200201
}
201202

202203
/**

src/test/java/org/springframework/data/redis/stream/StreamMessageListenerContainerIntegrationTests.java

Lines changed: 50 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818
import static org.assertj.core.api.Assertions.*;
1919
import static org.junit.Assume.*;
2020

21+
import io.lettuce.core.codec.StringCodec;
22+
import io.lettuce.core.output.NestedMultiOutput;
2123
import lombok.AllArgsConstructor;
2224
import lombok.Data;
2325

2426
import java.time.Duration;
2527
import java.util.Collections;
28+
import java.util.List;
2629
import java.util.concurrent.BlockingQueue;
2730
import java.util.concurrent.LinkedBlockingQueue;
2831
import java.util.concurrent.TimeUnit;
@@ -38,6 +41,7 @@
3841
import org.springframework.data.redis.connection.RedisConnectionFactory;
3942
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
4043
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
44+
import org.springframework.data.redis.connection.lettuce.LettuceConnection;
4145
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
4246
import org.springframework.data.redis.connection.lettuce.LettuceTestClientResources;
4347
import org.springframework.data.redis.connection.stream.Consumer;
@@ -50,11 +54,13 @@
5054
import org.springframework.data.redis.core.StringRedisTemplate;
5155
import org.springframework.data.redis.stream.StreamMessageListenerContainer.StreamMessageListenerContainerOptions;
5256
import org.springframework.data.redis.stream.StreamMessageListenerContainer.StreamReadRequest;
57+
import org.springframework.util.NumberUtils;
5358

5459
/**
5560
* Integration tests for {@link StreamMessageListenerContainer}.
5661
*
5762
* @author Mark Paluch
63+
* @author Christoph Strobl
5864
*/
5965
public class StreamMessageListenerContainerIntegrationTests {
6066

@@ -103,8 +109,7 @@ public void before() {
103109
public void shouldReceiveMapMessages() throws InterruptedException {
104110

105111
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
106-
.create(connectionFactory,
107-
containerOptions);
112+
.create(connectionFactory, containerOptions);
108113
BlockingQueue<MapRecord<String, String, String>> queue = new LinkedBlockingQueue<>();
109114

110115
container.start();
@@ -174,12 +179,11 @@ public void shouldReceiveObjectHashRecords() throws InterruptedException {
174179
assertThat(subscription.isActive()).isFalse();
175180
}
176181

177-
@Test // DATAREDIS-864
182+
@Test // DATAREDIS-864, DATAREDIS-1079
178183
public void shouldReceiveMessagesInConsumerGroup() throws InterruptedException {
179184

180185
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
181-
.create(connectionFactory,
182-
containerOptions);
186+
.create(connectionFactory, containerOptions);
183187
BlockingQueue<MapRecord<String, String, String>> queue = new LinkedBlockingQueue<>();
184188
RecordId messageId = redisTemplate.opsForStream().add("my-stream", Collections.singletonMap("key", "value1"));
185189
redisTemplate.opsForStream().createGroup("my-stream", ReadOffset.from(messageId), "my-group");
@@ -196,6 +200,34 @@ public void shouldReceiveMessagesInConsumerGroup() throws InterruptedException {
196200
assertThat(message).isNotNull();
197201
assertThat(message.getValue()).containsEntry("key", "value2");
198202

203+
assertThat(getNumberOfPending("my-stream", "my-group")).isOne();
204+
205+
cancelAwait(subscription);
206+
}
207+
208+
@Test // DATAREDIS-1079
209+
public void shouldReceiveAndAckMessagesInConsumerGroup() throws InterruptedException {
210+
211+
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
212+
.create(connectionFactory, containerOptions);
213+
BlockingQueue<MapRecord<String, String, String>> queue = new LinkedBlockingQueue<>();
214+
RecordId messageId = redisTemplate.opsForStream().add("my-stream", Collections.singletonMap("key", "value1"));
215+
redisTemplate.opsForStream().createGroup("my-stream", ReadOffset.from(messageId), "my-group");
216+
217+
container.start();
218+
Subscription subscription = container.receiveAutoAck(Consumer.from("my-group", "my-consumer"),
219+
StreamOffset.create("my-stream", ReadOffset.lastConsumed()), queue::add);
220+
221+
subscription.await(Duration.ofSeconds(2));
222+
223+
redisTemplate.opsForStream().add("my-stream", Collections.singletonMap("key", "value2"));
224+
225+
MapRecord<String, String, String> message = queue.poll(1, TimeUnit.SECONDS);
226+
assertThat(message).isNotNull();
227+
assertThat(message.getValue()).containsEntry("key", "value2");
228+
229+
assertThat(getNumberOfPending("my-stream", "my-group")).isZero();
230+
199231
cancelAwait(subscription);
200232
}
201233

@@ -207,8 +239,7 @@ public void shouldUseCustomErrorHandler() throws InterruptedException {
207239
StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainerOptions
208240
.builder().errorHandler(failures::add).pollTimeout(Duration.ofMillis(100)).build();
209241
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
210-
.create(connectionFactory,
211-
containerOptions);
242+
.create(connectionFactory, containerOptions);
212243

213244
container.start();
214245
Subscription subscription = container.receive(Consumer.from("my-group", "my-consumer"),
@@ -229,8 +260,7 @@ public void errorShouldStopListening() throws InterruptedException {
229260
BlockingQueue<Throwable> failures = new LinkedBlockingQueue<>();
230261

231262
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
232-
.create(connectionFactory,
233-
containerOptions);
263+
.create(connectionFactory, containerOptions);
234264

235265
StreamReadRequest<String> readRequest = StreamReadRequest
236266
.builder(StreamOffset.create("my-stream", ReadOffset.lastConsumed())).errorHandler(failures::add)
@@ -260,8 +290,7 @@ public void customizedCancelPredicateShouldNotStopListening() throws Interrupted
260290
BlockingQueue<Throwable> failures = new LinkedBlockingQueue<>();
261291

262292
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
263-
.create(connectionFactory,
264-
containerOptions);
293+
.create(connectionFactory, containerOptions);
265294

266295
StreamReadRequest<String> readRequest = StreamReadRequest
267296
.builder(StreamOffset.create("my-stream", ReadOffset.lastConsumed())) //
@@ -291,8 +320,7 @@ public void customizedCancelPredicateShouldNotStopListening() throws Interrupted
291320
public void cancelledStreamShouldNotReceiveMessages() throws InterruptedException {
292321

293322
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
294-
.create(connectionFactory,
295-
containerOptions);
323+
.create(connectionFactory, containerOptions);
296324
BlockingQueue<MapRecord<String, String, String>> queue = new LinkedBlockingQueue<>();
297325

298326
container.start();
@@ -310,8 +338,7 @@ public void cancelledStreamShouldNotReceiveMessages() throws InterruptedExceptio
310338
public void containerRestartShouldRestartSubscription() throws InterruptedException {
311339

312340
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
313-
.create(connectionFactory,
314-
containerOptions);
341+
.create(connectionFactory, containerOptions);
315342
BlockingQueue<MapRecord<String, String, String>> queue = new LinkedBlockingQueue<>();
316343

317344
container.start();
@@ -345,6 +372,14 @@ private static void cancelAwait(Subscription subscription) throws InterruptedExc
345372
}
346373
}
347374

375+
private Integer getNumberOfPending(String stream, String group) {
376+
377+
String value = ((List) ((LettuceConnection) connectionFactory.getConnection()).execute("XPENDING",
378+
new NestedMultiOutput(StringCodec.UTF8), new byte[][] { stream.getBytes(), group.getBytes() })).iterator()
379+
.next().toString();
380+
return NumberUtils.parseNumber(value, Integer.class);
381+
}
382+
348383
@Data
349384
@AllArgsConstructor
350385
static class LoginEvent {

0 commit comments

Comments
 (0)