Skip to content

Commit 3f5533e

Browse files
christophstroblmp911de
authored andcommitted
DATAREDIS-1119 - Support XINFO via ReactiveStreamCommands.
Original pull request: spring-projects#519.
1 parent c010472 commit 3f5533e

File tree

7 files changed

+335
-10
lines changed

7 files changed

+335
-10
lines changed

src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@
4141
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
4242
import org.springframework.data.redis.connection.stream.ReadOffset;
4343
import org.springframework.data.redis.connection.stream.RecordId;
44+
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumer;
45+
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoGroup;
46+
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoStream;
4447
import org.springframework.data.redis.connection.stream.StreamOffset;
4548
import org.springframework.data.redis.connection.stream.StreamReadOptions;
4649
import org.springframework.data.redis.connection.stream.StreamRecords;
@@ -1030,6 +1033,101 @@ default Flux<ByteBufferRecord> xRead(StreamReadOptions readOptions, StreamOffset
10301033
*/
10311034
Flux<CommandResponse<ReadCommand, Flux<ByteBufferRecord>>> read(Publisher<ReadCommand> commands);
10321035

1036+
/**
1037+
* @author Christoph Strobl
1038+
* @since 2.3
1039+
*/
1040+
class XInfoCommand extends KeyCommand {
1041+
1042+
private final @Nullable String groupName;
1043+
1044+
public XInfoCommand(@Nullable ByteBuffer key, @Nullable String groupName) {
1045+
1046+
super(key);
1047+
this.groupName = groupName;
1048+
}
1049+
1050+
public static XInfoCommand xInfo() {
1051+
return new XInfoCommand(null, null);
1052+
}
1053+
1054+
public XInfoCommand of(ByteBuffer key) {
1055+
return new XInfoCommand(key, groupName);
1056+
}
1057+
1058+
public XInfoCommand consumersIn(String groupName) {
1059+
return new XInfoCommand(getKey(), groupName);
1060+
}
1061+
1062+
public String getGroupName() {
1063+
return groupName;
1064+
}
1065+
}
1066+
1067+
/**
1068+
* Obtain general information about the stream stored at the specified {@literal key}.
1069+
*
1070+
* @param key the {@literal key} the stream is stored at.
1071+
* @return a {@link Mono} emitting {@link XInfoStream} when ready.
1072+
* @since 2.3
1073+
*/
1074+
default Mono<XInfoStream> xInfo(ByteBuffer key) {
1075+
return xInfo(Mono.just(XInfoCommand.xInfo().of(key))).next().map(CommandResponse::getOutput);
1076+
}
1077+
1078+
/**
1079+
* Obtain general information about the stream stored at the specified {@literal key}.
1080+
*
1081+
* @param commands must not be {@literal null}.
1082+
* @return never {@literal null}.
1083+
* @since 2.3
1084+
*/
1085+
Flux<CommandResponse<XInfoCommand, XInfoStream>> xInfo(Publisher<XInfoCommand> commands);
1086+
1087+
/**
1088+
* Obtain general information about the stream stored at the specified {@literal key}.
1089+
*
1090+
* @param key the {@literal key} the stream is stored at.
1091+
* @return a {@link Flux} emitting consumer group info one by one.
1092+
* @since 2.3
1093+
*/
1094+
default Flux<XInfoGroup> xInfoGroups(ByteBuffer key) {
1095+
return xInfoGroups(Mono.just(XInfoCommand.xInfo().of(key))).next().flatMapMany(CommandResponse::getOutput);
1096+
}
1097+
1098+
/**
1099+
* Obtain general information about the stream stored at the specified {@literal key}.
1100+
*
1101+
* @param commands must not be {@literal null}.
1102+
* @return never {@literal null}.
1103+
* @since 2.3
1104+
*/
1105+
Flux<CommandResponse<XInfoCommand, Flux<XInfoGroup>>> xInfoGroups(Publisher<XInfoCommand> commands);
1106+
1107+
/**
1108+
* Obtain information about every consumer in a specific {@literal consumer group} for the stream stored at the
1109+
* specified {@literal key}.
1110+
*
1111+
* @param key the {@literal key} the stream is stored at.
1112+
* @param groupName name of the {@literal consumer group}.
1113+
* @return a {@link Flux} emitting consumer info one by one.
1114+
* @since 2.3
1115+
*/
1116+
default Flux<XInfoConsumer> xInfoConsumers(ByteBuffer key, String groupName) {
1117+
return xInfoConsumers(Mono.just(XInfoCommand.xInfo().of(key).consumersIn(groupName))).next()
1118+
.flatMapMany(CommandResponse::getOutput);
1119+
}
1120+
1121+
/**
1122+
* Obtain information about every consumer in a specific {@literal consumer group} for the stream stored at the
1123+
* specified {@literal key}.
1124+
*
1125+
* @param commands must not be {@literal null}.
1126+
* @return never {@literal null}.
1127+
* @since 2.3
1128+
*/
1129+
Flux<CommandResponse<XInfoCommand, Flux<XInfoConsumer>>> xInfoConsumers(Publisher<XInfoCommand> commands);
1130+
10331131
class GroupCommand extends KeyCommand {
10341132

10351133
private final GroupCommandAction action;

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.lettuce.core.XReadArgs;
2121
import io.lettuce.core.XReadArgs.StreamOffset;
2222
import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands;
23+
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumer;
2324
import reactor.core.publisher.Flux;
2425

2526
import java.nio.ByteBuffer;
@@ -39,6 +40,8 @@
3940
import org.springframework.data.redis.connection.stream.PendingMessages;
4041
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
4142
import org.springframework.data.redis.connection.stream.RecordId;
43+
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoGroup;
44+
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoStream;
4245
import org.springframework.data.redis.connection.stream.StreamReadOptions;
4346
import org.springframework.data.redis.connection.stream.StreamRecords;
4447
import org.springframework.data.redis.util.ByteUtils;
@@ -344,6 +347,41 @@ private static Flux<ByteBufferRecord> doRead(ReadCommand command, StreamReadOpti
344347
.map(it -> StreamRecords.newRecord().in(it.getStream()).withId(it.getId()).ofBuffer(it.getBody()));
345348
}
346349

350+
@Override
351+
public Flux<CommandResponse<XInfoCommand, XInfoStream>> xInfo(Publisher<XInfoCommand> commands) {
352+
353+
return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
354+
355+
Assert.notNull(command.getKey(), "Key must not be null!");
356+
357+
return cmd.xinfoStream(command.getKey()).collectList().map(XInfoStream::fromList)
358+
.map(it -> new CommandResponse<>(command, it));
359+
}));
360+
361+
}
362+
363+
@Override
364+
public Flux<CommandResponse<XInfoCommand, Flux<XInfoGroup>>> xInfoGroups(Publisher<XInfoCommand> commands) {
365+
return connection.execute(cmd -> Flux.from(commands).map(command -> {
366+
367+
Assert.notNull(command.getKey(), "Key must not be null!");
368+
369+
return new CommandResponse(command, cmd.xinfoGroups(command.getKey()).map(it -> XInfoGroup.fromList((List<Object>) it)));
370+
}));
371+
}
372+
373+
@Override
374+
public Flux<CommandResponse<XInfoCommand, Flux<XInfoConsumer>>> xInfoConsumers(Publisher<XInfoCommand> commands) {
375+
376+
return connection.execute(cmd -> Flux.from(commands).map(command -> {
377+
378+
Assert.notNull(command.getKey(), "Key must not be null!");
379+
380+
ByteBuffer groupName = ByteUtils.getByteBuffer(command.getGroupName());
381+
return new CommandResponse(command, cmd.xinfoConsumers(command.getKey(), groupName).map(it -> new XInfoConsumer(command.getGroupName(), (List<Object>) it)));
382+
}));
383+
}
384+
347385
/*
348386
* (non-Javadoc)
349387
* @see org.springframework.data.redis.connection.ReactiveStreamCommands#xRevRange(org.reactivestreams.Publisher)

src/main/java/org/springframework/data/redis/connection/stream/StreamInfo.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,10 @@ public static class XInfoGroup extends XInfoObject {
315315
private XInfoGroup(List<Object> raw) {
316316
super(raw, DEFAULT_TYPE_HINTS);
317317
}
318+
319+
public static XInfoGroup fromList(List<Object> raw) {
320+
return new XInfoGroup(raw);
321+
}
318322

319323
/**
320324
* The {@literal consumer group} name. Corresponds to {@literal name}.

src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
*/
1616
package org.springframework.data.redis.core;
1717

18-
import org.springframework.data.redis.connection.stream.PendingMessages;
19-
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
2018
import reactor.core.publisher.Flux;
2119
import reactor.core.publisher.Mono;
2220

@@ -36,9 +34,14 @@
3634
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
3735
import org.springframework.data.redis.connection.stream.Consumer;
3836
import org.springframework.data.redis.connection.stream.MapRecord;
37+
import org.springframework.data.redis.connection.stream.PendingMessages;
38+
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
3939
import org.springframework.data.redis.connection.stream.ReadOffset;
4040
import org.springframework.data.redis.connection.stream.Record;
4141
import org.springframework.data.redis.connection.stream.RecordId;
42+
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumer;
43+
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoGroup;
44+
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoStream;
4245
import org.springframework.data.redis.connection.stream.StreamOffset;
4346
import org.springframework.data.redis.connection.stream.StreamReadOptions;
4447
import org.springframework.data.redis.hash.HashMapper;
@@ -185,6 +188,43 @@ public Mono<String> destroyGroup(K key, String group) {
185188
return createMono(connection -> connection.xGroupDestroy(rawKey(key), group));
186189
}
187190

191+
/*
192+
* (non-Javadoc)
193+
* @see org.springframework.data.redis.core.ReactiveStreamOperations#consumers(java.lang.Object)
194+
*/
195+
@Override
196+
public Flux<XInfoConsumer> consumers(K key, String group) {
197+
198+
Assert.notNull(key, "Key must not be null!");
199+
Assert.notNull(group, "Group must not be null!");
200+
201+
return createFlux(connection -> connection.xInfoConsumers(rawKey(key), group));
202+
}
203+
204+
/*
205+
* (non-Javadoc)
206+
* @see org.springframework.data.redis.core.ReactiveStreamOperations#info(java.lang.Object)
207+
*/
208+
@Override
209+
public Mono<XInfoStream> info(K key) {
210+
211+
Assert.notNull(key, "Key must not be null!");
212+
213+
return createMono(connection -> connection.xInfo(rawKey(key)));
214+
}
215+
216+
/*
217+
* (non-Javadoc)
218+
* @see org.springframework.data.redis.core.ReactiveStreamOperations#groups(java.lang.Object)
219+
*/
220+
@Override
221+
public Flux<XInfoGroup> groups(K key) {
222+
223+
Assert.notNull(key, "Key must not be null!");
224+
225+
return createFlux(connection -> connection.xInfoGroups(rawKey(key)));
226+
}
227+
188228
/*
189229
* (non-Javadoc)
190230
* @see org.springframework.data.redis.core.StreamOperations#pending(java.lang.Object, java.lang.String, org.springframework.data.domain.Range, java.lang.Long)
@@ -218,7 +258,6 @@ public Mono<PendingMessagesSummary> pending(K key, String group) {
218258
return createMono(connection -> connection.xPending(rawKey, group));
219259
}
220260

221-
222261
/*
223262
* (non-Javadoc)
224263
* @see org.springframework.data.redis.core.ReactiveStreamOperations#size(java.lang.Object)

src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
import org.springframework.data.domain.Range;
2626
import org.springframework.data.redis.connection.RedisZSetCommands.Limit;
2727
import org.springframework.data.redis.connection.stream.*;
28+
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumer;
29+
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoGroup;
30+
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoStream;
2831
import org.springframework.data.redis.hash.HashMapper;
2932
import org.springframework.lang.Nullable;
3033
import org.springframework.util.Assert;
@@ -194,6 +197,36 @@ default Mono<String> createGroup(K key, String group) {
194197
*/
195198
Mono<String> destroyGroup(K key, String group);
196199

200+
/**
201+
* Obtain information about every consumer in a specific {@literal consumer group} for the stream stored at the
202+
* specified {@literal key}.
203+
*
204+
* @param key the {@literal key} the stream is stored at.
205+
* @param group name of the {@literal consumer group}.
206+
* @return {@literal null} when used in pipeline / transaction.
207+
* @since 2.3
208+
*/
209+
Flux<XInfoConsumer> consumers(K key, String group);
210+
211+
/**
212+
* Obtain information about {@literal consumer groups} associated with the stream stored at the specified
213+
* {@literal key}.
214+
*
215+
* @param key the {@literal key} the stream is stored at.
216+
* @return {@literal null} when used in pipeline / transaction.
217+
* @since 2.3
218+
*/
219+
Flux<XInfoGroup> groups(K key);
220+
221+
/**
222+
* Obtain general information about the stream stored at the specified {@literal key}.
223+
*
224+
* @param key the {@literal key} the stream is stored at.
225+
* @return {@literal null} when used in pipeline / transaction.
226+
* @since 2.3
227+
*/
228+
Mono<XInfoStream> info(K key);
229+
197230
/**
198231
* Obtain the {@link PendingMessagesSummary} for a given {@literal consumer group}.
199232
*

src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3246,7 +3246,7 @@ public void xClaim() throws InterruptedException {
32463246
assertThat(claimed).containsAll(messages);
32473247
}
32483248

3249-
@Test // DATAREDIS-1084
3249+
@Test // DATAREDIS-1119
32503250
@IfProfileValue(name = "redisVersion", value = "5.0")
32513251
@WithRedisDriver({ RedisDriver.LETTUCE })
32523252
public void xinfo() {
@@ -3274,7 +3274,7 @@ public void xinfo() {
32743274
assertThat(info.lastEntryId()).isEqualTo(lastRecord.getValue());
32753275
}
32763276

3277-
@Test // DATAREDIS-1084
3277+
@Test // DATAREDIS-1119
32783278
@IfProfileValue(name = "redisVersion", value = "5.0")
32793279
@WithRedisDriver({ RedisDriver.LETTUCE })
32803280
public void xinfoNoGroup() {
@@ -3299,7 +3299,7 @@ public void xinfoNoGroup() {
32993299
assertThat(info.lastEntryId()).isEqualTo(lastRecord.getValue());
33003300
}
33013301

3302-
@Test // DATAREDIS-1084
3302+
@Test // DATAREDIS-1119
33033303
@IfProfileValue(name = "redisVersion", value = "5.0")
33043304
@WithRedisDriver({ RedisDriver.LETTUCE })
33053305
public void xinfoGroups() {
@@ -3324,7 +3324,7 @@ public void xinfoGroups() {
33243324
assertThat(info.get(0).lastDeliveredId()).isEqualTo(lastRecord.getValue());
33253325
}
33263326

3327-
@Test // DATAREDIS-1084
3327+
@Test // DATAREDIS-1119
33283328
@IfProfileValue(name = "redisVersion", value = "5.0")
33293329
@WithRedisDriver({ RedisDriver.LETTUCE })
33303330
public void xinfoGroupsNoGroup() {
@@ -3342,7 +3342,7 @@ public void xinfoGroupsNoGroup() {
33423342

33433343
}
33443344

3345-
@Test // DATAREDIS-1084
3345+
@Test // DATAREDIS-1119
33463346
@IfProfileValue(name = "redisVersion", value = "5.0")
33473347
@WithRedisDriver({ RedisDriver.LETTUCE })
33483348
public void xinfoGroupsNoConsumer() {
@@ -3365,7 +3365,7 @@ public void xinfoGroupsNoConsumer() {
33653365
assertThat(info.get(0).lastDeliveredId()).isEqualTo("0-0");
33663366
}
33673367

3368-
@Test // DATAREDIS-1084
3368+
@Test // DATAREDIS-1119
33693369
@IfProfileValue(name = "redisVersion", value = "5.0")
33703370
@WithRedisDriver({ RedisDriver.LETTUCE })
33713371
public void xinfoConsumers() {
@@ -3389,7 +3389,7 @@ public void xinfoConsumers() {
33893389
assertThat(info.get(0).idleTimeMs()).isCloseTo(1L, Offset.offset(200L));
33903390
}
33913391

3392-
@Test // DATAREDIS-1084
3392+
@Test // DATAREDIS-1119
33933393
@IfProfileValue(name = "redisVersion", value = "5.0")
33943394
@WithRedisDriver({ RedisDriver.LETTUCE })
33953395
public void xinfoConsumersNoConsumer() {

0 commit comments

Comments
 (0)