Skip to content

Commit d6ea867

Browse files
mp911dechristophstrobl
authored andcommitted
DATAREDIS-645 - Introduce absent values in ReactiveStringCommands.
We now return AbsentByteBufferResponse to indicate absence of a Redis key when retrieving the key as top-level element. CommandResponse indicates via isPresent its presence or absence of a response value. Omission of absent values can cause a stream with a different response structure compared to the request. Retaining the stream sequence is required for response to request correlation. GET and GETSET commands use presence of CommandResponse to filter response emission. Both commands return a single element so absence does not interferes with the request structure. Previously, we just returned an empty byte-array that caused downstream errors deserializing and emitting null values that are prohibited in Reactive Streams. Original Pull Request: spring-projects#249
1 parent 18a6440 commit d6ea867

File tree

5 files changed

+49
-14
lines changed

5 files changed

+49
-14
lines changed

src/main/java/org/springframework/data/redis/connection/ReactiveRedisConnection.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,14 @@ class CommandResponse<I, O> {
245245

246246
private final I input;
247247
private final O output;
248+
249+
/**
250+
* @return {@literal true} if the response is present. An absent {@link CommandResponse} maps to Redis
251+
* {@literal (nil)}.
252+
*/
253+
public boolean isPresent() {
254+
return true;
255+
}
248256
}
249257

250258
/**
@@ -267,6 +275,26 @@ public ByteBufferResponse(I input, ByteBuffer output) {
267275
}
268276
}
269277

278+
/**
279+
* {@link CommandResponse} implementation for {@link ByteBuffer} responses for absent keys.
280+
*/
281+
class AbsentByteBufferResponse<I> extends ByteBufferResponse<I> {
282+
283+
private final static ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new byte[0]);
284+
285+
public AbsentByteBufferResponse(I input) {
286+
super(input, EMPTY_BYTE_BUFFER);
287+
}
288+
289+
/* (non-Javadoc)
290+
* @see org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse#isPresent()
291+
*/
292+
@Override
293+
public boolean isPresent() {
294+
return false;
295+
}
296+
}
297+
270298
/**
271299
* {@link CommandResponse} implementation for {@link List} responses.
272300
*/

src/main/java/org/springframework/data/redis/connection/ReactiveStringCommands.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,8 @@ default Mono<ByteBuffer> get(ByteBuffer key) {
199199

200200
Assert.notNull(key, "Key must not be null!");
201201

202-
return get(Mono.just(new KeyCommand(key))).next().map(CommandResponse::getOutput);
202+
return get(Mono.just(new KeyCommand(key))).next().filter(CommandResponse::isPresent)
203+
.map(CommandResponse::getOutput);
203204
}
204205

205206
/**
@@ -225,7 +226,8 @@ default Mono<ByteBuffer> getSet(ByteBuffer key, ByteBuffer value) {
225226
Assert.notNull(key, "Key must not be null!");
226227
Assert.notNull(value, "Value must not be null!");
227228

228-
return getSet(Mono.just(SetCommand.set(key).value(value))).next().map(ByteBufferResponse::getOutput);
229+
return getSet(Mono.just(SetCommand.set(key).value(value))).next().filter(CommandResponse::isPresent)
230+
.map(ByteBufferResponse::getOutput);
229231
}
230232

231233
/**

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStringCommands.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import org.reactivestreams.Publisher;
2626
import org.springframework.data.domain.Range;
27+
import org.springframework.data.redis.connection.ReactiveRedisConnection.AbsentByteBufferResponse;
2728
import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse;
2829
import org.springframework.data.redis.connection.ReactiveRedisConnection.ByteBufferResponse;
2930
import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand;
@@ -113,7 +114,7 @@ public Flux<ByteBufferResponse<SetCommand>> getSet(Publisher<SetCommand> command
113114
}
114115

115116
return cmd.getset(command.getKey(), command.getValue()).map((value) -> new ByteBufferResponse<>(command, value))
116-
.defaultIfEmpty(new ByteBufferResponse<>(command, EMPTY_BYTE_BUFFER));
117+
.defaultIfEmpty(new AbsentByteBufferResponse<>(command));
117118
}));
118119
}
119120

@@ -129,7 +130,7 @@ public Flux<ByteBufferResponse<KeyCommand>> get(Publisher<KeyCommand> commands)
129130
Assert.notNull(command.getKey(), "Key must not be null!");
130131

131132
return cmd.get(command.getKey()).map((value) -> new ByteBufferResponse<>(command, value))
132-
.defaultIfEmpty(new ByteBufferResponse<>(command, EMPTY_BYTE_BUFFER));
133+
.defaultIfEmpty(new AbsentByteBufferResponse<>(command));
133134
}));
134135
}
135136

src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStringCommandsTests.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,11 @@
2020
import static org.hamcrest.core.IsEqual.*;
2121
import static org.hamcrest.core.IsNull.*;
2222
import static org.junit.Assert.*;
23-
import static org.junit.Assume.assumeThat;
23+
import static org.junit.Assume.*;
24+
25+
import reactor.core.publisher.Flux;
26+
import reactor.core.publisher.Mono;
27+
import reactor.test.TestSubscriber;
2428

2529
import java.nio.ByteBuffer;
2630
import java.util.Arrays;
@@ -37,14 +41,11 @@
3741
import org.springframework.data.redis.connection.ReactiveStringCommands.SetCommand;
3842
import org.springframework.data.redis.connection.RedisStringCommands.BitOperation;
3943
import org.springframework.data.redis.core.types.Expiration;
40-
4144
import org.springframework.data.redis.test.util.LettuceRedisClientProvider;
42-
import reactor.core.publisher.Flux;
43-
import reactor.core.publisher.Mono;
44-
import reactor.test.TestSubscriber;
4545

4646
/**
4747
* @author Christoph Strobl
48+
* @author Mark Paluch
4849
*/
4950
public class LettuceReactiveStringCommandsTests extends LettuceReactiveCommandsTestsBase {
5051

@@ -59,14 +60,13 @@ public void getSetShouldReturnPreviousValueCorrectly() {
5960
assertThat(nativeCommands.get(KEY_1), is(equalTo(VALUE_2)));
6061
}
6162

62-
@Test // DATAREDIS-525
63+
@Test // DATAREDIS-525, DATAREDIS-645
6364
public void getSetShouldReturnPreviousValueCorrectlyWhenNoExists() {
6465

6566
Mono<ByteBuffer> result = connection.stringCommands().getSet(KEY_1_BBUFFER, VALUE_2_BBUFFER);
6667

6768
ByteBuffer value = result.block();
68-
assertThat(value, is(notNullValue()));
69-
assertThat(value, is(equalTo(ByteBuffer.allocate(0))));
69+
assertThat(value, is(nullValue()));
7070
assertThat(nativeCommands.get(KEY_1), is(equalTo(VALUE_2)));
7171
}
7272

@@ -104,11 +104,11 @@ public void getShouldRetriveValueCorrectly() {
104104
assertThat(result.block(), is(equalTo(VALUE_1_BBUFFER)));
105105
}
106106

107-
@Test // DATAREDIS-525
107+
@Test // DATAREDIS-525, DATAREDIS-645
108108
public void getShouldRetriveNullValueCorrectly() {
109109

110110
Mono<ByteBuffer> result = connection.stringCommands().get(KEY_1_BBUFFER);
111-
assertThat(result.block(), is(equalTo(ByteBuffer.allocate(0))));
111+
assertThat(result.block(), is(nullValue()));
112112
}
113113

114114
@Test // DATAREDIS-525

src/test/java/org/springframework/data/redis/core/DefaultReactiveValueOperationsIntegrationTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,8 @@ public void get() {
195195
K key = keyFactory.instance();
196196
V value = valueFactory.instance();
197197

198+
StepVerifier.create(valueOperations.get(key)).verifyComplete();
199+
198200
StepVerifier.create(valueOperations.set(key, value)).expectNext(true).verifyComplete();
199201

200202
StepVerifier.create(valueOperations.get(key)).expectNext(value).verifyComplete();
@@ -207,6 +209,8 @@ public void getAndSet() {
207209
V value = valueFactory.instance();
208210
V nextValue = valueFactory.instance();
209211

212+
StepVerifier.create(valueOperations.getAndSet(key, nextValue)).verifyComplete();
213+
210214
StepVerifier.create(valueOperations.set(key, value)).expectNext(true).verifyComplete();
211215

212216
StepVerifier.create(valueOperations.getAndSet(key, nextValue)).expectNext(value).verifyComplete();

0 commit comments

Comments
 (0)