|
17 | 17 |
|
18 | 18 | import static org.assertj.core.api.Assertions.*;
|
19 | 19 | import static org.junit.Assume.*;
|
| 20 | +import static org.mockito.ArgumentMatchers.*; |
20 | 21 |
|
21 | 22 | import lombok.AllArgsConstructor;
|
22 | 23 | import lombok.Data;
|
23 | 24 | import reactor.core.publisher.Flux;
|
24 | 25 | import reactor.test.StepVerifier;
|
25 | 26 |
|
| 27 | +import java.nio.ByteBuffer; |
26 | 28 | import java.time.Duration;
|
27 | 29 | import java.util.Collections;
|
28 | 30 |
|
29 | 31 | import org.junit.AfterClass;
|
30 | 32 | import org.junit.Before;
|
31 | 33 | import org.junit.BeforeClass;
|
32 | 34 | import org.junit.Test;
|
| 35 | +import org.mockito.Mockito; |
33 | 36 | import org.springframework.data.redis.ConnectionFactoryTracker;
|
34 | 37 | import org.springframework.data.redis.RedisSystemException;
|
35 | 38 | import org.springframework.data.redis.RedisVersionUtils;
|
|
55 | 58 | * Integration tests for {@link StreamReceiver}.
|
56 | 59 | *
|
57 | 60 | * @author Mark Paluch
|
| 61 | + * @author Eddie McDaniel |
58 | 62 | */
|
59 | 63 | public class StreamReceiverIntegrationTests {
|
60 | 64 |
|
@@ -168,6 +172,31 @@ public void shouldReceiveObjectHashRecords() {
|
168 | 172 | .verify(Duration.ofSeconds(5));
|
169 | 173 | }
|
170 | 174 |
|
| 175 | +@Test // DATAREDIS-1172 |
| 176 | +public void shouldReceiveCustomHashValueRecords() { |
| 177 | + |
| 178 | +SerializationPair<Integer> serializationPair = Mockito.mock(SerializationPair.class); |
| 179 | +Mockito.when(serializationPair.read(any(ByteBuffer.class))).thenReturn(345920); |
| 180 | + |
| 181 | +StreamReceiverOptions<String, MapRecord<String, String, Integer>> receiverOptions = StreamReceiverOptions.builder() |
| 182 | +.<String, Integer>hashValueSerializer(serializationPair).build(); |
| 183 | + |
| 184 | +StreamReceiver<String, MapRecord<String, String, Integer>> receiver = StreamReceiver.create(connectionFactory, |
| 185 | +receiverOptions); |
| 186 | + |
| 187 | +Flux<MapRecord<String, String, Integer>> messages = receiver.receive(StreamOffset.fromStart("my-stream")); |
| 188 | + |
| 189 | +messages.as(StepVerifier::create) |
| 190 | +.then(() -> reactiveRedisTemplate.opsForStream() |
| 191 | +.add("my-stream", Collections.singletonMap("Jesse", "Pinkman")).subscribe()) |
| 192 | +.consumeNextWith(it -> { |
| 193 | +assertThat(it.getStream()).isEqualTo("my-stream"); |
| 194 | +assertThat(it.getValue()).contains(entry("Jesse", 345920)); |
| 195 | +}) |
| 196 | +.thenCancel() |
| 197 | +.verify(Duration.ofSeconds(5)); |
| 198 | +} |
| 199 | + |
171 | 200 | @Test // DATAREDIS-864
|
172 | 201 | public void latestModeLosesMessages() {
|
173 | 202 |
|
|
0 commit comments